Commit cccfde1f authored by ZZH's avatar ZZH

remove es 2023-6-2

parent d059b272
from pot_libs.es_util.es_helper import process_es_data_aiao, \
process_sql_data_aiao
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.es_util.es_helper import process_sql_data_aiao
from pot_libs.logger import log
from unify_api.constants import COMPANY_15MIN_POWER, \
POINT_15MIN_POWER, SLOTS
from unify_api.utils.es_query_body import EsQueryBody, es_process, \
es_time_process, sql_time_process
from unify_api.utils.time_format import time_pick_transf, \
power_slots, convert_to_es_str
from unify_api.constants import SLOTS
from unify_api.utils.es_query_body import es_process, sql_time_process
from unify_api.utils.time_format import time_pick_transf, power_slots
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
SlotValue
from pot_libs.utils.pendulum_wrapper import my_pendulum
......@@ -51,84 +46,7 @@ def interval_type_to_mysql(date_type, start, end):
return '%%Y-%%m-%%d 00:00:00'
async def power_charge(cid_list, point_id, start, end, date_type,
proxy_slots=1):
"""电量电费slot和value"""
if point_id == -1: # 选的全部
terms = {"cid": cid_list}
index = COMPANY_15MIN_POWER
eqb = EsQueryBody(terms=terms, start=start, end=end,
date_key="quarter_time")
else:
matchs = {"pid": point_id}
index = POINT_15MIN_POWER
eqb = EsQueryBody(matchs=matchs, start=start, end=end,
date_key="quarter_time")
# 2.构造query_body
# date_type转换, 用于聚合查询
trans_type = interval_type(date_type, start, end)
query_body = eqb.query_agg_histogram(interval=trans_type)
log.info(index + f"====={query_body}")
# 3.返回数据
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
if not es_re:
return SlotValue(slots=SLOTS[date_type], value=[])
es_re = es_re["aggregations"]["quarter_time"]["buckets"]
# intervel, slots = time_pick_transf(start, end)
# slots = get_slots_between_date(start, end, INTERVAL[date_type])
if date_type == "day":
slots = SLOTS[date_type]
es_re = es_process(es_re, fmat="HH:mm") # 为了es结果和slots对应
elif date_type == "month":
intervel, slots = time_pick_transf(start, end)
# # 知电u管理版, 本月多少天, 就多少slots
# if proxy_slots:
# slots = proxy_power_slots(start, end)
es_re = es_process(es_re, fmat="MM-DD") # 为了es结果和slots对应
elif date_type == "year":
intervel, slots = time_pick_transf(start, end)
es_re = es_process(es_re, fmat="YYYY-MM") # 为了es结果和slots对应
else: # range
# 在计算slots时,增加返回re_type,方便和es对应数据
slots, re_type = power_slots(start, end)
if re_type == "day":
es_re = es_process(es_re, fmat="YYYY-MM-DD HH:mm")
else:
es_re = es_process(es_re, fmat="YYYY-MM-DD")
# 拼接slot, value返回
kwh_sv = SlotValue() # 电量对象
kwh_sv.slots = slots
kwh_list = []
charge_sv = SlotValue() # 电费对象
charge_sv.slots = slots
charge_list = []
for slot in slots:
if slot in es_re:
# 1.每个时间点,电量信息
kwh_value = es_re[slot].get("kwh").get("sum")
# 值为0是正常数据
if kwh_value == 0:
kwh_list.append(0.0)
else:
kwh_list.append(round(kwh_value, 2) if kwh_value else "")
# 2.每个时间点,电费信息
charge_value = es_re[slot].get("charge").get("sum")
# 值为0是正常数据
if charge_value == 0:
charge_list.append(0.0)
else:
charge_list.append(
round(charge_value, 2) if charge_value else "")
else:
kwh_list.append("")
charge_list.append("")
kwh_sv.value = kwh_list
charge_sv.value = charge_list
return kwh_sv, charge_sv
async def power_charge_new15(cid_list, point_id, start, end, date_type,
async def load_power_charge(cid_list, point_id, start, end, date_type,
proxy_slots=1):
if point_id == -1: # 选的全部
table_name = "company_15min_power"
......@@ -191,62 +109,6 @@ async def power_charge_new15(cid_list, point_id, start, end, date_type,
async def power_charge_download(cid, point_id, start, end):
"""电量电费下载,需要15min每个点的数据"""
# 2. cid、pid条件
if point_id == -1: # 选的全部
matchs = {"cid": cid}
index = COMPANY_15MIN_POWER
else:
matchs = {"pid": point_id}
index = POINT_15MIN_POWER
# 2.构造query_body
eqb = EsQueryBody(matchs=matchs, start=start, end=end,
date_key="quarter_time", size=1000)
query_body = eqb.query()
log.info(index + f"====={query_body}")
# 3.查询es
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
es_re = es_re["hits"]["hits"]
if not es_re:
return SlotValue(slots=[], value=[]), SlotValue(slots=[], value=[])
es_re = process_es_data_aiao(es_re, key="quarter_time", key_format="")
# 4.获取slots
intervel, slots = time_pick_transf(start, end, is_range=1)
# 5.构造返回
kwh_sv = SlotValue() # 电量对象
kwh_sv.slots = slots
kwh_list = []
charge_sv = SlotValue() # 电费对象
charge_sv.slots = slots
charge_list = []
for slot in slots:
slot = convert_to_es_str(slot, format="YYYY-MM-DD HH:mm") # 转换为es格式
if slot in es_re:
# 1.每个时间点,电量信息
kwh_value = es_re[slot]["_source"].get("kwh")
# 值为0是正常数据
if kwh_value == 0:
kwh_list.append(0.0)
else:
kwh_list.append(round(kwh_value, 2) if kwh_value else "")
# 2.每个时间点,电费信息
charge_value = es_re[slot]["_source"].get("charge")
# 值为0是正常数据
if charge_value == 0:
charge_list.append(0.0)
else:
charge_list.append(
round(charge_value, 2) if charge_value else "")
else:
kwh_list.append("")
charge_list.append("")
kwh_sv.value = kwh_list
charge_sv.value = charge_list
return kwh_sv, charge_sv
async def power_charge_download_new15(cid, point_id, start, end):
if point_id == -1: # 选的全部
table_name = "company_15min_power"
sql_mid = f"cid = {cid}"
......@@ -290,55 +152,7 @@ async def power_charge_download_new15(cid, point_id, start, end):
return kwh_sv, charge_sv
async def today_yesterday_p(cid, point_id, start, end, is_range=0):
"""今日和昨日负荷"""
if point_id == -1: # 选的全部
matchs = {"cid": cid}
index = COMPANY_15MIN_POWER
else:
matchs = {"pid": point_id}
index = POINT_15MIN_POWER
# 1.构造query_body
query = EsQueryBody(matchs=matchs, start=start, end=end,
date_key="quarter_time", size=1000)
query_body = query.query()
log.info(index + f"====={query_body}")
# 2. 查询es
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
if not es_re:
return SlotValue()
# 3.获取slots
if is_range:
intervel, slots = time_pick_transf(start, end, is_range)
else:
intervel, slots = time_pick_transf(start, end)
# 4.为了es结果和slots对应
es_re = es_re["hits"]["hits"]
if is_range: # range时间格式包含年月日
es_re = es_time_process(es_re, fmt="%Y-%m-%d %H:%M")
else:
es_re = es_time_process(es_re, fmt="%H:%M")
sv = SlotValue() # 今日负荷对象
sv.slots = slots
tmp_list = []
# 5.拼接返回
for slot in slots:
if slot in es_re:
# 1.每个时间点,电量信息
value = es_re[slot].get("p")
# 值为0是正常数据
if value == 0:
tmp_list.append(0.0)
else:
tmp_list.append(round(value, 2) if value else "")
else:
tmp_list.append("")
sv.value = tmp_list
return sv
async def today_yesterday_p_new15(cid, point_id, start, end, is_range=0):
async def today_yesterday_load(cid, point_id, start, end, is_range=0):
"""今日和昨日负荷"""
if point_id == -1: # 选的全部
table_name = "company_15min_power"
......
from unify_api.modules.elec_charge.common.utils import \
power_charge, power_charge_download, today_yesterday_p, aver_price, \
power_charge_download_new15, power_charge_new15, today_yesterday_p_new15
aver_price, power_charge_download, load_power_charge, today_yesterday_load
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
SlotValue
from unify_api.utils.time_format import last_time_str
......@@ -11,111 +10,30 @@ from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.utils.common_utils import round_2
async def power_charge_stati_service(cid, point_id, start, end, date_type):
async def power_charge_stats_srv(cid, point_id, start, end, date_type):
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
# 智电u增加碳足迹 电量*0.754
co2_slot = kwh_sv.slots
co2_value = [round_2(i * CO2_N) if i else i for i in kwh_sv.value]
co2_sv = SlotValue(slots=co2_slot, value=co2_value)
# 需要增加15min电量电费
kwh_sv_15min, charge_sv_15min = await power_charge_download(cid,
point_id,
start,
end)
# 今日/昨日负荷曲线
today_p = await today_yesterday_p(cid, point_id, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await today_yesterday_p(cid, point_id, ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p, kwh_15min=kwh_sv_15min,
charge_15min=charge_sv_15min, co2=co2_sv)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 智电u增加碳足迹 电量*0.754
co2_slot = kwh_sv.slots
co2_value = [round_2(i * CO2_N) if i else i for i in kwh_sv.value]
co2_sv = SlotValue(slots=co2_slot, value=co2_value)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge([cid], point_id,
last_start,
last_end,
date_type)
last_aver_price = aver_price(last_kwh_sv, last_charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price,
last_aver_price=last_aver_price, co2=co2_sv)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price)
else:
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
diff_mm = (end_f - start_f).in_minutes()
if diff_mm <= 48 * 60:
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
# 负荷曲线
this_p = await today_yesterday_p(cid, point_id, start, end,
is_range=1)
# 需要增加15min电量电费
kwh_sv_15min, charge_sv_15min = await power_charge_download(cid,
point_id,
start,
end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p,
kwh_15min=kwh_sv_15min,
charge_15min=charge_sv_15min)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price)
async def power_charge_stati_service_new15(cid, point_id, start, end, date_type):
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], point_id, start,
kwh_sv, charge_sv = await load_power_charge([cid], point_id, start,
end, date_type)
# 智电u增加碳足迹 电量*0.754
co2_slot = kwh_sv.slots
co2_value = [round_2(i * CO2_N) if i else i for i in kwh_sv.value]
co2_sv = SlotValue(slots=co2_slot, value=co2_value)
# 需要增加15min电量电费
kwh_sv_15min, charge_sv_15min = await power_charge_download_new15(
kwh_sv_15min, charge_sv_15min = await power_charge_download(
cid, point_id, start, end)
# 今日/昨日负荷曲线
today_p = await today_yesterday_p_new15(cid, point_id, start, end)
today_p = await today_yesterday_load(cid, point_id, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await today_yesterday_p_new15(cid, point_id, ysd_start,
yesterday_p = await today_yesterday_load(cid, point_id, ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p, kwh_15min=kwh_sv_15min,
charge_15min=charge_sv_15min, co2=co2_sv)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15([cid], point_id, start,
kwh_sv, charge_sv = await load_power_charge([cid], point_id, start,
end, date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 智电u增加碳足迹 电量*0.754
......@@ -125,15 +43,17 @@ async def power_charge_stati_service_new15(cid, point_id, start, end, date_type)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(
[cid], point_id, last_start, last_end, date_type)
last_kwh_sv, last_charge_sv = await load_power_charge([cid], point_id,
last_start,
last_end,
date_type)
last_aver_price = aver_price(last_kwh_sv, last_charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price,
last_aver_price=last_aver_price, co2=co2_sv)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], point_id, start,
kwh_sv, charge_sv = await load_power_charge([cid], point_id, start,
end, date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -146,20 +66,20 @@ async def power_charge_stati_service_new15(cid, point_id, start, end, date_type)
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], point_id,
kwh_sv, charge_sv = await load_power_charge([cid], point_id,
start, end, date_type)
# 负荷曲线
this_p = await today_yesterday_p_new15(cid, point_id, start, end,
this_p = await today_yesterday_load(cid, point_id, start, end,
is_range=1)
# 需要增加15min电量电费
kwh_sv_15min, charge_sv_15min = await power_charge_download_new15(
kwh_sv_15min, charge_sv_15min = await power_charge_download(
cid, point_id, start, end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p,
kwh_15min=kwh_sv_15min,
charge_15min=charge_sv_15min)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], point_id,
kwh_sv, charge_sv = await load_power_charge([cid], point_id,
start, end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
......
......@@ -22,7 +22,7 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
from unify_api.modules.elec_charge.procedures.elec_statis_proxy_pds import \
proxy_today_yesterday_p_new15, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, \
power_charge, power_charge_new15
load_power_charge
from unify_api.utils.common_utils import process_es_data
from unify_api.utils.es_query_body import es_process
from unify_api.utils.time_format import last_time_str, proxy_power_slots, \
......@@ -32,8 +32,7 @@ import pandas as pd
@summary('代理版电量电费统计曲线')
async def post_power_statis_proxy(req,
body: StatisProxyReq) -> PcStatiResp:
async def post_power_statis_proxy(req, body: StatisProxyReq) -> PcStatiResp:
# 1.获取参数
product = req.json.get("product")
proxy_id = body.proxy_id
......@@ -53,28 +52,25 @@ async def post_power_statis_proxy(req,
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id, start,
end, date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
yesterday_p = await proxy_today_yesterday_p_new15(cid_list, ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id, start,
end, date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list,
last_kwh_sv, last_charge_sv = await load_power_charge(cid_list,
point_id,
last_start,
last_end,
......@@ -85,9 +81,8 @@ async def post_power_statis_proxy(req,
last_aver_price=last_aver_price)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id, start,
end, date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price)
......@@ -99,10 +94,8 @@ async def post_power_statis_proxy(req,
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end,
date_type)
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id,
start, end, date_type)
# 负荷曲线
this_p = await proxy_today_yesterday_p_new15(cid_list, start,
end)
......@@ -110,9 +103,8 @@ async def post_power_statis_proxy(req,
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end, date_type)
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id,
start, end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -369,7 +361,7 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
date_type = "month"
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
kwh_sv, charge_sv = await load_power_charge(cid_list, -1, start, end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
......@@ -382,13 +374,13 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
kwh_sv, charge_sv = await load_power_charge(cid_list, -1, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, -1,
last_kwh_sv, last_charge_sv = await load_power_charge(cid_list, -1,
last_start,
last_end,
date_type)
......
from unify_api.modules.elec_charge.common.utils import \
power_charge, max_min_time, power_charge_new15
max_min_time, load_power_charge
from pot_libs.sanic_api import summary
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.common.procedures.pttl_max import load_pttl_max
......@@ -7,7 +7,7 @@ from unify_api.modules.elec_charge.components.elec_statistics_cps import \
PcStatiReq, PcStatiResp, MaxpReq, MaxpResp, PcmResp
from unify_api.utils.common_utils import round_2
from unify_api.modules.elec_charge.service.elec_statistics_service import \
power_charge_stati_service, power_charge_stati_service_new15
power_charge_stats_srv
@summary('电量电费统计曲线')
......@@ -18,19 +18,16 @@ async def post_power_charge_stati(req, body: PcStatiReq) -> PcStatiResp:
start = body.start
end = body.end
date_type = body.date_type
# return await power_charge_stati_service(cid, point_id, start, end,
# date_type)
return await power_charge_stati_service_new15(cid, point_id, start, end,
date_type)
return await power_charge_stats_srv(cid, point_id, start, end, date_type)
@summary("小程序最高负荷/web最大需量")
async def post_max_p(req, body: MaxpReq) -> MaxpResp:
cid = body.cid
point_id = body.point_id
pid = body.point_id
start = body.start
end = body.end
max_val, max_val_time = await load_pttl_max(cid, start, end, point_id=point_id)
max_val, max_val_time = await load_pttl_max(cid, start, end, point_id=pid)
return MaxpResp(maxp=max_val, date_time=max_val_time)
......@@ -42,78 +39,10 @@ async def post_power_charge_min_max(req, body: PcStatiReq) -> PcmResp:
start = body.start
end = body.end
date_type = body.date_type
# return await power_charge_min_max_service(cid, pid, start, end, date_type)
return await power_charge_min_max_service_new15(cid, pid, start, end,
date_type)
async def power_charge_min_max_service(cid, point_id, start, end, date_type):
# 初始化返回值
max_p = {"value": "", "time": ""}
max_kwh = {"value": "", "time": ""}
min_kwh = {"value": "", "time": ""}
max_charge = {"value": "", "time": ""}
min_charge = {"value": "", "time": ""}
# 1. 最高负荷
max_val, max_val_time = await pttl_max(cid, start, end, point_id=point_id)
max_p["value"] = round_2(max_val)
max_p["time"] = max_val_time
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = \
max_min_time(kwh_sv, add_one_index=True)
c_max_v, c_max_time, c_min_v, c_min_time = \
max_min_time(charge_sv, add_one_index=True)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
else:
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
diff_mm = (end_f - start_f).in_minutes()
if diff_mm <= 48 * 60:
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = \
max_min_time(kwh_sv, add_one_index=True, in_2_day=True)
c_max_v, c_max_time, c_min_v, c_min_time = \
max_min_time(charge_sv, add_one_index=True, in_2_day=True)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge([cid], point_id, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
max_kwh["value"] = round_2(k_max_v)
max_kwh["time"] = k_max_time
min_kwh["value"] = round_2(k_min_v)
min_kwh["time"] = k_min_time
max_charge["value"] = round_2(c_max_v)
max_charge["time"] = c_max_time
min_charge["value"] = round_2(c_min_v)
min_charge["time"] = c_min_time
return PcmResp(max_p=max_p, max_kwh=max_kwh, min_kwh=min_kwh,
max_charge=max_charge, min_charge=min_charge)
return await power_charge_min_max_srv(cid, pid, start, end, date_type)
async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
async def power_charge_min_max_srv(cid, pid, start, end, date_type):
# 初始化返回值
max_p = {"value": "", "time": ""}
max_kwh = {"value": "", "time": ""}
......@@ -125,7 +54,7 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
max_p["time"] = max_val_time
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
kwh_sv, charge_sv = await power_charge_new15([cid], pid, start, end,
kwh_sv, charge_sv = await load_power_charge([cid], pid, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = \
max_min_time(kwh_sv, add_one_index=True)
......@@ -133,13 +62,13 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
max_min_time(charge_sv, add_one_index=True)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15([cid], pid, start, end,
kwh_sv, charge_sv = await load_power_charge([cid], pid, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], pid, start, end,
kwh_sv, charge_sv = await load_power_charge([cid], pid, start, end,
date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
......@@ -151,7 +80,7 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], pid, start,
kwh_sv, charge_sv = await load_power_charge([cid], pid, start,
end, date_type)
k_max_v, k_max_time, k_min_v, k_min_time = \
max_min_time(kwh_sv, add_one_index=True, in_2_day=True)
......@@ -159,7 +88,7 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
max_min_time(charge_sv, add_one_index=True, in_2_day=True)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge_new15([cid], pid, start,
kwh_sv, charge_sv = await load_power_charge([cid], pid, start,
end, date_type)
k_max_v, k_max_time, k_min_v, k_min_time = max_min_time(kwh_sv)
c_max_v, c_max_time, c_min_v, c_min_time = max_min_time(charge_sv)
......
......@@ -132,67 +132,6 @@ async def tsp_histogram_day_tsp_id(interval): # todo: 扬尘es 待改
return es_re["aggregations"]["quarter_time"]["buckets"]
async def tsp_histogram_avg(date_start, date_end, interval, tsp_id=None):
"""TSP信息-历史曲线-莫天是1h一个点的平均值"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"pm25": {
"avg": {
"field": "pm25_mean"
}
},
"pm10": {
"avg": {
"field": "pm10_mean"
}
},
"tsp": {
"avg": {
"field": "tsp_mean"
}
}
}
}
}
}
if tsp_id:
query_body["query"]["bool"]["must"].append(
{"term": {"tsp_id": tsp_id}}
)
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
return es_re["aggregations"]["quarter_time"]["buckets"]
async def range_max_value(date_start, date_end):
"""今日最高PM2.5"""
start_es = convert_es_str(date_start)
......
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import SLOTS
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
histogram_aggs_points
from unify_api.modules.tsp_water.dao.drop_dust_dao import sum_water_group
from unify_api.modules.tsp_water.dao.tsp_dao import tsp_histogram_avg
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2n
from unify_api.utils.es_query_body import es_process
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def per_hour_wave(start, end, tsp_id=None):
"""PM2.5/PM10/TSP每小时或每天曲线数据"""
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
interval = "day"
fmt = "MM-DD"
# 需求是每小时一个点
elif interval == 15 * 60:
slots = SLOTS["day"]
interval = "hour"
fmt = "HH:mm"
else:
raise BusinessException(message="time range not day or month")
# 1. 查询es
es_res = await tsp_histogram_avg(start, end, interval, tsp_id)
es_dic = es_process(es_res, fmat=fmt)
# 2. 组装数据
pm25_list = []
pm10_list = []
tsp_list = []
for slot in slots:
if slot in es_dic:
pm25_value = round_2n(es_dic[slot]["pm25"].get("value"))
pm10_value = round_2n(es_dic[slot]["pm10"].get("value"))
tsp_value = round_2n(es_dic[slot]["tsp"].get("value"))
else:
pm25_value, pm10_value, tsp_value = None, None, None
pm25_list.append(pm25_value)
pm10_list.append(pm10_value)
tsp_list.append(tsp_value)
return pm25_list, pm10_list, tsp_list, slots
async def per_hour_wave_new15(start, end, tsp_id=None):
interval, slots = time_format.time_pick_transf(start, end)
mid_sql = f"tsp_id={tsp_id} and" if tsp_id else ""
if interval == 24 * 3600:
......@@ -80,34 +43,7 @@ async def per_hour_wave_new15(start, end, tsp_id=None):
return pm25_list, pm10_list, tsp_list, slots
async def per_hour_kwh_wave(start, end, tsp_id_list=None):
"""每小时或每天电量曲线数据"""
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
interval = "day"
fmt = "MM-DD"
# 需求是每小时一个点
elif interval == 15 * 60:
slots = SLOTS["day"]
interval = "hour"
fmt = "HH:mm"
else:
raise BusinessException(message="time range not day or month")
# 1. 查询es
es_res = await histogram_aggs_points(start, end, tsp_id_list, interval)
es_dic = es_process(es_res, fmat=fmt)
# 2. 组装数据
kwh_list = []
for slot in slots:
if slot in es_dic:
kwh_value = round_2n(es_dic[slot]["kwh"].get("value"))
else:
kwh_value = None
kwh_list.append(kwh_value)
return kwh_list, slots
async def per_hour_kwh_wave_new15(start, end, pids):
async def per_hour_kwh_wave(start, end, pids):
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
sql = f'SELECT DATE_FORMAT(create_time,"%%m-%%d") date_time, ' \
......@@ -124,7 +60,7 @@ async def per_hour_kwh_wave_new15(start, end, pids):
else:
raise BusinessException(message="time range not day or month")
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pids, ))
datas = await conn.fetchall(sql, args=(pids,))
datas_map = {data["date_time"]: data for data in datas}
# 2. 组装数据
kwh_list = []
......
......@@ -15,8 +15,8 @@ from unify_api.modules.tsp_water.service.tsp_service import day_env_service,\
from unify_api.utils.common_utils import round_2
from unify_api.utils.time_format import srv_time, last7_day_range, \
start_end_date
from unify_api.modules.tsp_water.procedures.tsp_pds import per_hour_wave, \
per_hour_kwh_wave, per_hour_wave_new15, per_hour_kwh_wave_new15
from unify_api.modules.tsp_water.procedures.tsp_pds import \
per_hour_wave, per_hour_kwh_wave
async def post_drop_dust_service(storeys):
......@@ -248,7 +248,7 @@ async def index_today_info_service(cid):
async def index_water_service(start, end, date_type):
# 用电量 pm2.5 pm10 tsp
# pm25_list, pm10_list, tsp_list, slots = await per_hour_wave(start, end)
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave_new15(start, end)
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave(start, end)
water_info = await sum_water_group(start, end, date_type)
water_list = ['' for _ in range(len(slots))]
for index, info in enumerate(water_info):
......@@ -265,8 +265,8 @@ async def index_water_service(start, end, date_type):
async def index_electric_service(cid, start, end):
storey_list = await storey_wp_by_cid(cid)
point_list = [storey["point_id"] for storey in storey_list]
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave_new15(start, end)
kwh_res, slots = await per_hour_kwh_wave_new15(start, end, point_list)
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave(start, end)
kwh_res, slots = await per_hour_kwh_wave(start, end, point_list)
return WsStatiResp(
pm2_5=pm25_list,
pm10=pm10_list,
......
......@@ -18,9 +18,8 @@ from unify_api.modules.tsp_water.dao.tsp_map_dao import \
get_contrast_data_day_dao, get_contrast_data_month_dao, get_cid_tsp_dao
from unify_api.modules.tsp_water.procedures.drop_dust_pds import \
pm2_5_trans_grade, pm10_trans_grade, tsp_trans_grade
from unify_api.modules.tsp_water.procedures.tsp_pds import per_hour_wave, \
per_hour_kwh_wave, per_hour_water_wave, per_hour_kwh_wave_new15, \
per_hour_wave_new15
from unify_api.modules.tsp_water.procedures.tsp_pds import \
per_hour_water_wave, per_hour_kwh_wave, per_hour_wave
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2, correlation, round_0
from unify_api.utils.time_format import start_end_date
......@@ -373,12 +372,12 @@ async def day_env_service(cid):
async def stat_analysis_service(cid, tsp_id, start, end):
"""统计分析-扬尘"""
# 1. 查询es, 获取tsp信息
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave_new15(
pm25_list, pm10_list, tsp_list, slots = await per_hour_wave(
start, end, tsp_id)
# 2. 获取雾炮电量数据
storey_list = await storey_wp_by_cid(cid)
point_list = [storey["point_id"] for storey in storey_list]
kwh_res, slots = await per_hour_kwh_wave_new15(start, end, point_list)
kwh_res, slots = await per_hour_kwh_wave(start, end, point_list)
r_gun_pm25_value, r_gun_pm25_info = correlation(kwh_res, pm25_list)
r_gun_pm10_value, r_gun_pm10_info = correlation(kwh_res, pm10_list)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment