Commit 61200aa0 authored by wang.wenrong's avatar wang.wenrong

Merge branch 'develop' into wwr

parents 9fc31e09 8663e19b
......@@ -19,7 +19,7 @@ async def point_day_power_dao(cid, start, end):
ORDER BY pp.create_time
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, args=(cid, ))
data = await conn.fetchall(sql, args=(cid,))
return data
......@@ -31,7 +31,7 @@ async def get_total_kwh_dao(cid, start, end):
and pp.create_time<='{end}' and m.demolished=0
"""
async with MysqlUtil() as conn:
total_kwh = await conn.fetchone(sql, args=(cid, ))
total_kwh = await conn.fetchone(sql, args=(cid,))
return total_kwh
......@@ -40,7 +40,7 @@ async def get_kwh_charge(table_name, name, value, start, end):
f"FROM {table_name} where create_time>='{start}' and " \
f"create_time<='{end}' and {name} = %s"
async with MysqlUtil() as conn:
datas = await conn.fetchone(sql, args=(value, ))
datas = await conn.fetchone(sql, args=(value,))
return datas
......@@ -138,132 +138,46 @@ async def power_charge_p_aggs(date_start, date_end, cid_list, interval):
"""
date_histogram,
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"cid": cid_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
},
"p": {
"sum": {
"field": "p"
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re["aggregations"]["quarter_time"]["buckets"]
if interval == "hour":
time_fmt = "%%Y-%%m-%%d %%H"
elif interval == "day":
time_fmt = "%%Y-%%m-%%d"
else:
time_fmt = "%%Y-%%m-%%d %%H:%%i"
sql = f"""
select date_format(create_time,"{time_fmt}") as create_time,sum(kwh)
kwh,sum(charge) charge,sum(p) p
from company_15min_power
where cid in %s and create_time >= %s and create_time <= %s
group by date_format(create_time,"{time_fmt}")
"""
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,
args=(cid_list, date_start, date_end))
return results or []
async def power_charge_p_cid_aggs(date_start, date_end, cid_list, interval):
"""
excel下载, 按照cid,date_histogram两次聚合,求电量电费
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"cid": cid_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"cids": {
"terms": {
"field": "cid",
"size": 1000
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
},
"p": {
"sum": {
"field": "p"
}
}
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re["aggregations"]["cids"]["buckets"]
if interval == "hour":
time_fmt = "%%Y-%%m-%%d %%H"
elif interval == "day":
time_fmt = "%%Y-%%m-%%d"
else:
time_fmt = "%%Y-%%m-%%d %%H:%%i"
sql = f"""
select cid,date_format(create_time,"{time_fmt}") as create_time,
sum(kwh) kwh,sum(charge) charge,sum(p) p
from company_15min_power
where cid in %s and create_time >= %s and create_time <= %s
group by cid,date_format(create_time,"{time_fmt}")
"""
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,
args=(cid_list, date_start, date_end))
return results or []
async def query_charge_aggs_points(date_start, date_end, point_list):
......@@ -348,7 +262,7 @@ async def query_charge_aggs_points_new15(start, end, point_list):
f"where pid in %s and create_time BETWEEN '{start}' and '{end}' " \
f"GROUP BY pid"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(point_list, ))
datas = await conn.fetchall(sql, args=(point_list,))
return datas
......@@ -495,7 +409,6 @@ async def point_aggs_kwh(point_list, start=None, end=None):
return data
async def point_aggs_kwh_new15(point_list, start=None, end=None):
"""1.5版本根据pid,求电量电费"""
if start and end:
......@@ -506,7 +419,7 @@ async def point_aggs_kwh_new15(point_list, start=None, end=None):
sql = "SELECT sum(kwh) kwh,sum(charge) charge FROM point_15min_power" \
" where pid in %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(point_list, ))
data = await conn.fetchone(sql, args=(point_list,))
return data
......
......@@ -229,7 +229,8 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
continue
except Exception as e:
log.error(e)
log.info(f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
log.info(
f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
continue
charge = round_2(info.get("charge")["value"])
......@@ -241,7 +242,8 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
except Exception as e:
log.error(e)
log.info("本次有数据, 上周期没有数据")
log.info(f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
log.info(
f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
continue
price = round_2(charge / kwh)
price_last = round_2(charge_last / kwh_last)
......
......@@ -113,11 +113,11 @@ def by_slots(slots, es_re_dic):
for slot in slots:
if slot in es_re_dic:
# 1.每个时间点,电量信息
kwh_value = round_2(es_re_dic[slot].get("kwh").get("value"))
kwh_value = round_2(es_re_dic[slot].get("kwh"))
# 2.每个时间点,电费信息
charge_value = round_2(es_re_dic[slot].get("charge").get("value"))
charge_value = round_2(es_re_dic[slot].get("charge"))
# 3. 功率
p_value = round_2(es_re_dic[slot].get("p").get("value"))
p_value = round_2(es_re_dic[slot].get("p"))
# 4. 电价
try:
price_value = round_2(charge_value / kwh_value)
......@@ -141,14 +141,16 @@ async def power_charge_p_proxy(cid_list, start, end, date_type, interval):
# 1. 96个点数据
slots_96 = SLOTS_15MIN
es_re_96 = await power_charge_p_aggs(start, end, cid_list, "15m")
es_re_96_dic = es_process(es_re_96, fmat="HH:mm") # 为了es结果和slots对应
es_re_96_dic = es_process(es_re_96, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_96, es_re_96_dic)
# 96个点,工厂电量电费
# 2. 24个点数据
slots_24 = SLOTS[date_type]
es_re_24 = await power_charge_p_aggs(start, end, cid_list, "hour")
es_re_24_dic = es_process(es_re_24, fmat="HH:mm") # 为了es结果和slots对应
es_re_24_dic = es_process(es_re_24, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24, es_re_24_dic)
# elif date_type == "month":
......
import pendulum
from unify_api.constants import POINT_LEVEL_MAP
from unify_api.modules.common.components.common_cps import LevelResp
from unify_api.modules.common.procedures.points import points_by_storeys
......@@ -6,6 +8,8 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, query_charge_aggs_points_new15
from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points
from unify_api.modules.home_page.procedures.count_info_pds import current_load, \
current_load_new15
from unify_api.utils.common_utils import round_2, division_two
......@@ -101,4 +105,11 @@ async def kwh_card_level_service(cid, point_list, start, end):
)
async def load_info_service(cid_list):
# 实时负荷
cur_load = await current_load_new15(cid_list)
yesterday_dt = pendulum.now(tz="Asia/Shanghai").subtract(days=1)
yes_load = await current_load_new15(cid_list, yesterday_dt)
load_percent = round((cur_load - yes_load) / yes_load,
2) if cur_load and yes_load else ""
return cur_load, yes_load, load_percent
......@@ -16,14 +16,16 @@ from unify_api.modules.elec_charge.components.elec_charge_cps import \
power_overview_example, PricePolicyReq, PricePolicyResp, \
PricePolicy, AverPriceReq, PowerViewRes, Spvf, AverPriceResp, ChargeKwh, \
IndexChargeReq, IndexChargeResp, PopReq, PopResp, MtpResp, PspResp, \
IpspResp, KpReq, KpResp, KclReq, ProductProxyReq
IpspResp, KpReq, KpResp, KclReq, ProductProxyReq, LoadInfoReq, LoadInfoResp
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, get_kwh_charge, query_charge_aggs_points_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy_new15, \
quarters_trans, power_overview_proxy, total_value, \
power_aggs_cid_proxy_new15, \
power_index_cid_proxy_new15, power_overview_proxy15
from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service
kwh_points_service, kwh_card_level_service, load_info_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.common_utils import round_2, round_4, NumListHelper
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.request_util import filed_value_from_list
......@@ -289,7 +291,7 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
proxy_id = body.proxy_id
host = req.host
product = PRODUCT.get(host)
user_id = req.ctx.user_id
user_id = jwt_user(req)
# 全部工厂
if not cid_list:
log.info(f"power_overview_proxy根据用户userId:{user_id} "
......@@ -314,7 +316,8 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
# 获取上一周期开始结束时间
start_last, end_last = last_time_str(start, end, date_type)
power, charge = await power_overview_proxy15(start, end, cid_list)
power_last, charge_last = await power_overview_proxy15(start_last, end_last,
power_last, charge_last = await power_overview_proxy15(start_last,
end_last,
cid_list)
if not all([power, charge, power_last, charge_last]):
return PopResp(power=Spvf(), charge=Spvf())
......@@ -351,7 +354,7 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
# 1. 获取参数
host = req.host
product = PRODUCT.get(host)
user_id = req.ctx.user_id
user_id = jwt_user(req)
# cid_list = await get_cids(user_id, product)
proxy_id = body.proxy_id
cid_list = await get_proxy_cids(user_id, product, proxy_id) \
......@@ -364,7 +367,7 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
today_start, today_end, month_start, month_end = today_month_date()
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
"month")
"month", True)
this_month_p, this_month_charge = await power_overview_proxy15(
month_start, month_end, cid_list)
last_month_p, last_month_charge = await power_overview_proxy15(
......@@ -541,3 +544,31 @@ async def post_kwh_card_level(req, body: KclReq) -> LevelResp:
start = body.start
end = body.end
return await kwh_card_level_service(cid, point_list, start, end)
@summary("获取知电管理版首页负荷信息")
async def post_load_info(request, body: LoadInfoReq) -> LoadInfoResp:
# 1. 获取company_id
# 1. 获取参数
product = PRODUCT.get(request.host)
user_id = jwt_user(request)
proxy_id = body.proxy_id
cid_list = await get_proxy_cids(user_id, product, proxy_id) \
if proxy_id else None
# 全部工厂
if not cid_list:
log.info(f"未查询到工厂, userId:{user_id} product:{product}")
return LoadInfoResp()
try:
# 实时负荷,昨日同时负荷,对比昨日
current_load, yesterday_load, load_percent = await load_info_service(
cid_list)
except Exception as e:
log.exception(e)
return LoadInfoResp().server_error()
return LoadInfoResp(
current_load=current_load,
yesterday_load=yesterday_load,
load_percent=load_percent
)
import copy
import json
from itertools import groupby
from operator import itemgetter
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
......@@ -50,7 +53,8 @@ async def post_power_statis_proxy(req,
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
......@@ -63,13 +67,15 @@ async def post_power_statis_proxy(req,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, point_id,
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list,
point_id,
last_start,
last_end,
date_type)
......@@ -79,7 +85,8 @@ async def post_power_statis_proxy(req,
last_aver_price=last_aver_price)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -92,7 +99,8 @@ async def post_power_statis_proxy(req,
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end,
date_type)
# 负荷曲线
......@@ -102,7 +110,8 @@ async def post_power_statis_proxy(req,
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
......@@ -427,7 +436,8 @@ async def get_power_statist_download(req):
slots_24 = SLOTS[date_type]
# 1. 96个点, 电量\电费\平均功率\电价
es_re_96 = await power_charge_p_aggs(start, end, cid_list, "15m")
es_re_96_dic = es_process(es_re_96, fmat="HH:mm") # 为了es结果和slots对应
es_re_96_dic = es_process(es_re_96, "HH:mm", time_key="create_time")
# 为了es结果和slots对应
kwh_96, charge_96, p_96, price_96 = by_slots(slots_96, es_re_96_dic)
p_96.append("")
price_96.append("")
......@@ -445,13 +455,13 @@ async def get_power_statist_download(req):
power_96_dic = {"电量(万kWh)": slots_96_zj}
charge_96_dic = {"电费(万元)": slots_96_zj}
has_power_cids = [] # es查询有数据的工厂
for info in res_cid_96:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_96, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic = es_process(info["quarter_time"]["buckets"],
fmat="HH:mm")
info_dic = es_process(cid_one, fmat="HH:mm",
time_key="create_time")
kwh_96, charge_96, p_96, price_96 = by_slots(slots_96,
info_dic)
power_96_dic[cid_name] = division_down(kwh_96)
......@@ -470,7 +480,8 @@ async def get_power_statist_download(req):
df2.to_excel(writer, sheet_name="96个点电费", index=False)
# 3. 24个点, 电量\电费\平均功率\电价
es_re_24 = await power_charge_p_aggs(start, end, cid_list, "hour")
es_re_24_dic = es_process(es_re_24, fmat="HH:mm") # 为了es结果和slots对应
es_re_24_dic = es_process(es_re_24, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24, es_re_24_dic)
slots_24_zj = copy.deepcopy(slots_24)
slots_24_zj.append("总计")
......@@ -491,13 +502,13 @@ async def get_power_statist_download(req):
power_24_dic = {"电量(万kWh)": slots_24_zj}
charge_24_dic = {"电费(万元)": slots_24_zj}
has_power_cids_24 = [] # es查询有数据的工厂
for info in res_cid_24:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_24, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids_24.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic_24 = es_process(info["quarter_time"]["buckets"],
fmat="HH:mm")
info_dic_24 = es_process(cid_one, fmat="HH:mm",
time_key="create_time")
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24,
info_dic_24)
power_24_dic[cid_name] = division_down(kwh_24)
......@@ -521,7 +532,8 @@ async def get_power_statist_download(req):
slots_m_p = copy.deepcopy(slots_m)
slots_m_p.append("总计")
es_re_m = await power_charge_p_aggs(start, end, cid_list, "day")
es_re_m_dic = es_process(es_re_m, fmat="MM-DD") # 为了es结果和slots对应
es_re_m_dic = es_process(es_re_m, fmat="MM-DD",
time_key="create_time") # 为了es结果和slots对应
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, es_re_m_dic)
price_m.append("")
dict_m = {
......@@ -537,13 +549,13 @@ async def get_power_statist_download(req):
power_m_dic = {"电量(万kWh)": slots_m_p}
charge_m_dic = {"电费(万元)": slots_m_p}
has_power_cids = [] # es查询有数据的工厂
for info in res_cid_m:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_m, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic = es_process(info["quarter_time"]["buckets"],
fmat="MM-DD")
info_dic = es_process(cid_one, fmat="MM-DD",
time_key="create_time")
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, info_dic)
power_m_dic[cid_name] = division_down(kwh_m)
charge_m_dic[cid_name] = division_down(charge_m)
......
......@@ -5,7 +5,7 @@ async def monitor_point_join_by_points(points):
"""monitor和point关联"""
sql = "SELECT m.mtid,p.ctnum,m.name, m.m_type, p.pid,p.cid " \
"FROM monitor m inner join point p on m.mtid = p.mtid " \
"WHERE p.pid in %s and m.demolished = 0 order by field(p.pid,{})".\
"WHERE p.pid in %s and m.demolished = 0 order by field(p.pid,{})". \
format(str(points).replace("[", "").replace("]", ""))
async with MysqlUtil() as conn:
monitor_point_list = await conn.fetchall(sql, args=(tuple(points),))
......@@ -26,7 +26,7 @@ async def get_electric_datas_dao(table_name, pid, start, end):
sql = f"SELECT * FROM {table_name} where pid=%s and create_time " \
f"BETWEEN '{start}' and '{end}' ORDER BY create_time desc"
async with MysqlUtil() as conn:
electric_datas = await conn.fetchall(sql, args=(pid, ))
electric_datas = await conn.fetchall(sql, args=(pid,))
return electric_datas
......@@ -35,7 +35,7 @@ async def get_qual_history_dao(table_name, pid, start, end, date_format):
f"p.* FROM {table_name} p where p.pid=%s and p.create_time " \
f"BETWEEN '{start}' and '{end}' order by p.create_time"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid, ))
datas = await conn.fetchall(sql, args=(pid,))
return datas
......@@ -51,6 +51,12 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format):
async def get_elec_mtid_sid_by_cid(cid):
if isinstance(cid, tuple):
cid_tuple = cid
elif isinstance(cid, list):
cid_tuple = tuple(cid)
else:
cid_tuple = (cid,)
sql = (
f"""
SELECT
......@@ -59,9 +65,9 @@ async def get_elec_mtid_sid_by_cid(cid):
FROM
monitor
WHERE
cid = {cid};
cid in %s;
"""
)
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
datas = await conn.fetchall(sql, args=(cid_tuple,))
return datas if datas else []
......@@ -212,7 +212,8 @@ async def elec_card_level_service(point_list):
# point_mid = await batch_get_wiring_type(point_list)
# # 3. 获取redis数据
# res_redis = await elec_current_data(point_mid)
mtids = [monitor["mtid"] for monitor in monitor_point_list if monitor["mtid"]]
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
results = await elec_current_data_new15(mtids, cid)
# 4. 返回数据
......@@ -773,22 +774,22 @@ async def elec_index_service_new15(cid, point_id, start, end):
"ua_dev_mean", "ua_dev_min", "ua_dev_max",
"freq_dev_mean", "freq_dev_min", "freq_dev_max"]
datas = await get_electric_datas_dao(table_name, point_id, start, end)
if not datas:
return ElecIndexResponse(
ctnum=ctnum, common_indexes=[],
elec_qual_indexes=[]
)
# if not datas:
# return ElecIndexResponse(
# ctnum=ctnum, common_indexes=[],
# elec_qual_indexes=[]
# )
df = pd.DataFrame(list(datas))
# 常规参数统计
common_indexes = []
_common_items = {i.rsplit("_", 1)[0] for i in common_items}
for item in _common_items:
item_name = item.rsplit("_", 1)[0]
if datas:
max_item_name = f"{item}_max"
max_value = df[max_item_name].max()
if not pd.isna(max_value):
max_datas = df.loc[df[max_item_name].idxmax()].to_dict()
max_time = max_datas.get(f"{item_name}_max_time")
max_time = max_datas.get(f"{item}_max_time")
max_time = '' if pd.isnull(max_time) else str(max_time)
else:
max_value, max_time = "", ""
......@@ -796,7 +797,7 @@ async def elec_index_service_new15(cid, point_id, start, end):
min_value = df[min_item_name].min()
if not pd.isna(min_value):
min_datas = df.loc[df[min_item_name].idxmin()].to_dict()
min_time = min_datas.get(f"{item_name}_min_time")
min_time = min_datas.get(f"{item}_min_time")
min_time = '' if pd.isnull(min_time) else str(min_time)
else:
min_value, min_time = "", ""
......@@ -807,24 +808,33 @@ async def elec_index_service_new15(cid, point_id, start, end):
else:
avg_value = ""
elec_index = ElecIndex(
stats_index=item_name,
stats_index=item,
max=max_value,
max_time=max_time or "",
min=min_value,
min_time=min_time or "",
avg=avg_value,
)
else:
elec_index = ElecIndex(
stats_index=item,
max="",
max_time="",
min="",
min_time="",
avg="",
)
common_indexes.append(elec_index)
# 电能质量统计
elec_qual_indexes = []
_elec_qual_items = {i.rsplit("_", 1)[0] for i in elec_qual_items}
for item in _elec_qual_items:
item_name = item.rsplit("_", 1)[0]
if datas:
max_item_name = f"{item}_max"
max_value = df[max_item_name].max()
if not pd.isna(max_value):
max_datas = df.loc[df[max_item_name].idxmax()].to_dict()
max_time = max_datas.get(f"{item_name}_max_time")
max_time = max_datas.get(f"{item}_max_time")
max_time = '' if pd.isnull(max_time) else str(max_time)
else:
max_value, max_time = "", ""
......@@ -832,7 +842,7 @@ async def elec_index_service_new15(cid, point_id, start, end):
min_value = df[min_item_name].min()
if not pd.isna(min_value):
min_datas = df.loc[df[min_item_name].idxmin()].to_dict()
min_time = min_datas.get(f"{item_name}_min_time")
min_time = min_datas.get(f"{item}_min_time")
min_time = '' if pd.isnull(min_time) else str(min_time)
else:
min_value, min_time = "", ""
......@@ -843,13 +853,22 @@ async def elec_index_service_new15(cid, point_id, start, end):
else:
avg_value = ""
elec_index = ElecIndex(
stats_index=item_name,
stats_index=item,
max=max_value,
max_time=max_time,
min=min_value,
min_time=min_time,
avg=avg_value,
)
else:
elec_index = ElecIndex(
stats_index=item,
max="",
max_time="",
min="",
min_time="",
avg="",
)
elec_qual_indexes.append(elec_index)
# 小程序需要这漏电流和温度
if cid:
......@@ -877,6 +896,7 @@ async def elec_index_service_new15(cid, point_id, start, end):
elec_qual_indexes=elec_qual_indexes
)
async def elec_current_service_new15(point_id):
# 获取mtid
meter_info = await get_meter_by_point_new15(point_id)
......@@ -903,7 +923,7 @@ async def elec_current_service_new15(point_id):
time_str = str(res["ts"])[0:19]
else:
time_str = time_format.get_datetime_str(0)
return time_str,res
return time_str, res
def get_sdu_i_and_u(res, ctnum):
......
......@@ -3,6 +3,10 @@ import time
from datetime import datetime, timedelta
from math import sqrt
import pendulum
from pot_libs.settings import SETTING
from unify_api.modules.electric.dao.electric_dao import \
get_elec_mtid_sid_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.common.components.query import Range, Equal, Filter, PageRequest
......@@ -35,7 +39,9 @@ from unify_api.modules.home_page.dao.count_info_dao import (
from unify_api.modules.electric_optimization.dao.power_index import (
price_policy_by_cid
)
from unify_api.utils.taos_new import parse_td_columns
from unify_api.utils.taos_new import parse_td_columns, get_td_table_name, \
td3_tbl_compate, get_td_engine_data
async def other_info(company_id):
"""
......@@ -337,7 +343,7 @@ async def electric_use_info_new15(cid):
f"GROUP BY importance"
first_score, second_score, third_score = 0, 0, 0
async with MysqlUtil() as conn:
score_datas = await conn.fetchall(score_sql, args=(cid, ))
score_datas = await conn.fetchall(score_sql, args=(cid,))
alarm_datas = await conn.fetchall(alarm_sql, args=(cid, score_events))
for data in score_datas:
if data["importance"] == Importance.First.value:
......@@ -598,39 +604,46 @@ async def current_load(company_id):
return ""
async def current_load_new15(cid):
async def current_load_new15(cid, end_dt=None):
"""实时负荷"""
import re
from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
datas = await get_elec_mtid_sid_by_cid(cid)
td_mt_tables = tuple(
(get_td_table_name("electric", data["mtid"]) for data in datas if
data["mtid"]))
td_mt_tables = td3_tbl_compate(td_mt_tables)
if not end_dt:
end_dt = pendulum.now(tz="Asia/Shanghai")
start_dt = end_dt.subtract(minutes=2)
sql = f"select last_row(mdptime, pttl) from electric_stb " \
f"where TBNAME IN {td_mt_tables} and ts>='{str(start_dt)}' and ts " \
f"<='{str(end_dt)}' group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
select last_row(*) from electric_stb
where cpyid={cid}
group by tbname
"""
is_succ, results = await get_td_engine_data(url, sql)
now_tt = int(time.time())
if is_succ:
if not is_succ:
return ""
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
(f"s{data['sid'].lower()}_e" for data in datas if data["sid"]))
td_s_tables = td3_tbl_compate(td_s_tables)
sql = f"select last_row(mdptime, pttl) from electric_stb " \
f"where TBNAME IN {td_s_tables} group by tbname"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
return ""
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
total = 0
for item in datas:
# 这里是有可能item为None的
if item:
mdptime_tt = None
if "mdptime" in item:
mdptime_dt = pendulum.parse(item["mdptime"])
item_tt = item.get("timestamp") or mdptime_dt.int_timestamp
if item_tt:
# 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60:
if not item:
continue
total += item["pttl"]
return total
return ""
async def power_count_info(company_id):
......@@ -769,7 +782,8 @@ async def company_power_use_info_new15(company_id, start, end):
async def get_company_charge_price(company_id, es_time_start, es_time_end):
power_use_info = await company_power_use_info_new15(company_id, es_time_start,
power_use_info = await company_power_use_info_new15(company_id,
es_time_start,
es_time_end)
if power_use_info["kwh"]:
unit_price = power_use_info["charge"] / power_use_info["kwh"]
......@@ -835,7 +849,7 @@ async def power_charge_price_new15(cid):
year = now.year - 1
last_month_start = datetime(year=year, month=last_month, day=1)
else:
last_month_start = datetime(year=now.year, month=now.month-1, day=1)
last_month_start = datetime(year=now.year, month=now.month - 1, day=1)
last_month_end = datetime(year=now.year, month=now.month, day=1)
last_month_datas = await company_power_use_info_new15(
cid, str(last_month_start), str(last_month_end)
......@@ -934,7 +948,7 @@ async def power_factor_new15(cid):
"""首页获取实时功率因数, 上月功率因数"""
point_sql = "select pid,inlid from point where cid=%s and add_to_company=1"
async with MysqlUtil() as conn:
points = await conn.fetchall(point_sql, args=(cid, ))
points = await conn.fetchall(point_sql, args=(cid,))
point_ids = [i["pid"] for i in points]
now = datetime.now()
if now.month == 1:
......@@ -961,7 +975,7 @@ async def power_factor_new15(cid):
dt = pendulum.now(tz="Asia/Shanghai")
tstamp = dt.int_timestamp // (15 * 60) * (15 * 60)
dt = pendulum.from_timestamp(tstamp, tz="Asia/Shanghai")
end_dt = (dt-timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M:%S")
end_dt = (dt - timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M:%S")
str_dt = dt.strftime("%Y-%m-%d %H:%M:%S")
electric_sql = f"SELECT pid,create_time,pttl_mean,qttl_mean FROM " \
f"`point_15min_electric` where create_time in " \
......@@ -1415,7 +1429,7 @@ async def electric_use_info_points_sdu_new15(start, end, points):
f"where pid in %s and event_datetime BETWEEN %s and %s " \
f"and event_type in %s group by importance"
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,args=(points,start,end,
results = await conn.fetchall(sql, args=(points, start, end,
SDU_ALARM_LIST))
first_alarm_cnt = 0
......@@ -1470,7 +1484,8 @@ async def optimization_count_info_new(company_id: int):
)
es_end_time = pendulum.datetime(now.year, now.month, 1).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
power_use_info = await company_power_use_info_new15(company_id, es_start_time,
power_use_info = await company_power_use_info_new15(company_id,
es_start_time,
es_end_time)
month_charge = power_use_info["charge"]
count_info_map = {
......
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_kwh_p_dao(table_name, terms, start, end):
sql = f"SELECT create_time,kwh,p FROM {table_name} WHERE cid in %s " \
f"and create_time BETWEEN '{start}' and '{end}'"
async def get_kwh_p_dao(terms, start_time, end_time,
table_name="company_15min_power",
time_fmt="%%Y-%%m-%%d"):
"""
负荷实际数据
:param terms:
:param start_time:
:param end_time:
:param table_name:
:param time_fmt:
:return:
"""
sql = f"""
select p,kwh,DATE_FORMAT(create_time,"{time_fmt}") as cal_time
from {table_name} where cid in %s and create_time >= %s
and create_time <= %s
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(terms, ))
return datas
result = await conn.fetchall(sql, args=(terms, start_time, end_time))
return result or []
async def get_pred_p_dao(terms, start, end):
sql = f"SELECT create_time,p FROM company_day_ahead_predict " \
f"WHERE cid in %s and create_time BETWEEN '{start}' and '{end}'"
async def get_pred_p_dao(terms, start_time, end_time,
time_fmt="%%Y-%%m-%%d"):
"""
负荷预测数据
:param terms:
:param start_time:
:param end_time:
:param time_fmt:
:return:
"""
sql = f"""
select avg(p) p ,count(*) p_count,DATE_FORMAT(create_time,
"{time_fmt}") as cal_time
from company_day_ahead_predict
where cid in %s and create_time >= %s and create_time <= %s
group by cal_time
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(terms,))
return datas
\ No newline at end of file
result = await conn.fetchall(sql, args=(terms, start_time, end_time))
return result or []
......@@ -5,6 +5,7 @@ from unify_api.modules.load_analysis.components.load_forecast_cps import (
)
from unify_api.modules.load_analysis.service.load_forecast_service import \
load_forecast_service, load_forecast_service_new15
from unify_api.utils.time_format import time_pick_transf_new
@summary("负荷预测")
......@@ -17,4 +18,8 @@ async def post_load_forecast(req, body: ForecastReq) -> ForecastResp:
# 管理版本多个工厂的情况, 兼容能力最强的参数cids, 保留旧有的cid:
cids = body.cids
# return await load_forecast_service(cid, cids, start, end)
return await load_forecast_service_new15(cid, cids, start, end)
terms = cids if cids else [cid]
# 获取时间差
interval, slots = time_pick_transf_new(start, end)
return await load_forecast_service_new15(terms, start, end,
interval, slots)
......@@ -60,6 +60,41 @@ def time_pick_transf(start, end, is_range=0):
return intervel, slots
def time_pick_transf_new(start, end):
"""获取intervel和slots, 详细显示时间轴信息,新接口都使用这个来获取时间"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
diff = end_f.int_timestamp - start_f.int_timestamp
# 1. 计算intervel
# 1.1 区间48小时之内, 返回15min
if diff <= 48 * 3600:
intervel = 15 * 60
# 1.2 区间在60天以内, 返回1day
elif 48 * 3600 < diff <= 60 * 86400:
intervel = 86400
# 1.3 选择年, 返回1个月
else:
intervel = 30 * 86400
# 2. 计算slots
# 2.1 取到点的个数, 比如15min的96个点
slots = []
slot_num = round((end_f.int_timestamp - start_f.int_timestamp) / intervel)
for i in range(slot_num):
# 区间48小时之内
if diff <= 48 * 3600:
dt = start_f.add(minutes=15 * i).format("YYYY-MM-DD HH:mm")
dt_str = str(dt)
# 区间在60天以内
elif 48 * 3600 < diff <= 60 * 86400:
dt = start_f.add(days=1 * i).format("YYYY-MM-DD")
dt_str = str(dt)
else:
dt = start_f.add(months=1 * i).format("YYYY-MM")
dt_str = str(dt)
slots.append(dt_str)
return intervel, slots
def power_slots(start, end):
"""电量电费,用电统计,time=range slots计算"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
......@@ -115,6 +150,7 @@ def proxy_power_slots(start, end, date_format="MM-DD", is_duration=False):
slots.append(dt)
return slots
def day_of_month(start):
"""这个月有几天"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
......@@ -269,7 +305,7 @@ def start_end_date():
def time_str_to_str(date_str, format="HH:mm"):
"""YYYY-MM-DD HH:mm:ss格式转换为format格式"""
start_f = my_pendulum.from_format(date_str, 'YYYY-MM-DD HH:mm:ss')
start_f = my_pendulum.parse(date_str)
start_f = start_f.format(format)
return start_f
......@@ -311,7 +347,7 @@ def convert_to_es_str(str1, format="YYYY-MM-DD HH:mm:ss"):
return str(es_date)
def last_time_str(start, end, date_type):
def last_time_str(start, end, date_type, date_end=False):
"""年月日, 获取上一周期时间"""
if date_type not in ("day", "month", "year"):
return None
......@@ -319,12 +355,21 @@ def last_time_str(start, end, date_type):
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
if date_type == "day":
start_last = start_f.subtract(days=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(days=1)
elif date_type == "month":
start_last = start_f.subtract(months=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(months=1)
else:
start_last = start_f.subtract(years=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(years=1)
return start_last.format("YYYY-MM-DD HH:mm:ss"), end_last.format(
"YYYY-MM-DD HH:mm:ss")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment