Commit 313d2526 authored by lcn's avatar lcn

修复Bug

parent 995fd2d3
......@@ -19,7 +19,7 @@ async def point_day_power_dao(cid, start, end):
ORDER BY pp.create_time
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, args=(cid, ))
data = await conn.fetchall(sql, args=(cid,))
return data
......@@ -31,7 +31,7 @@ async def get_total_kwh_dao(cid, start, end):
and pp.create_time<='{end}' and m.demolished=0
"""
async with MysqlUtil() as conn:
total_kwh = await conn.fetchone(sql, args=(cid, ))
total_kwh = await conn.fetchone(sql, args=(cid,))
return total_kwh
......@@ -40,7 +40,7 @@ async def get_kwh_charge(table_name, name, value, start, end):
f"FROM {table_name} where create_time>='{start}' and " \
f"create_time<='{end}' and {name} = %s"
async with MysqlUtil() as conn:
datas = await conn.fetchone(sql, args=(value, ))
datas = await conn.fetchone(sql, args=(value,))
return datas
......@@ -73,7 +73,7 @@ async def query_charge_aggs(date_start, date_end, cid_list):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -138,132 +138,46 @@ async def power_charge_p_aggs(date_start, date_end, cid_list, interval):
"""
date_histogram,
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"cid": cid_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
},
"p": {
"sum": {
"field": "p"
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re["aggregations"]["quarter_time"]["buckets"]
if interval == "hour":
time_fmt = "%%Y-%%m-%%d %%H"
elif interval == "day":
time_fmt = "%%Y-%%m-%%d"
else:
time_fmt = "%%Y-%%m-%%d %%H:%%i"
sql = f"""
select date_format(create_time,"{time_fmt}") as create_time,sum(kwh)
kwh,sum(charge) charge,sum(p) p
from company_15min_power
where cid in %s and create_time >= %s and create_time <= %s
group by date_format(create_time,"{time_fmt}")
"""
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,
args=(cid_list, date_start, date_end))
return results or []
async def power_charge_p_cid_aggs(date_start, date_end, cid_list, interval):
"""
excel下载, 按照cid,date_histogram两次聚合,求电量电费
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"cid": cid_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"cids": {
"terms": {
"field": "cid",
"size": 1000
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
},
"p": {
"sum": {
"field": "p"
}
}
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re["aggregations"]["cids"]["buckets"]
if interval == "hour":
time_fmt = "%%Y-%%m-%%d %%H"
elif interval == "day":
time_fmt = "%%Y-%%m-%%d"
else:
time_fmt = "%%Y-%%m-%%d %%H:%%i"
sql = f"""
select cid,date_format(create_time,"{time_fmt}") as create_time,
sum(kwh) kwh,sum(charge) charge,sum(p) p
from company_15min_power
where cid in %s and create_time >= %s and create_time <= %s
group by cid,date_format(create_time,"{time_fmt}")
"""
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,
args=(cid_list, date_start, date_end))
return results or []
async def query_charge_aggs_points(date_start, date_end, point_list):
......@@ -293,7 +207,7 @@ async def query_charge_aggs_points(date_start, date_end, point_list):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -348,7 +262,7 @@ async def query_charge_aggs_points_new15(start, end, point_list):
f"where pid in %s and create_time BETWEEN '{start}' and '{end}' " \
f"GROUP BY pid"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(point_list, ))
datas = await conn.fetchall(sql, args=(point_list,))
return datas
......@@ -356,7 +270,7 @@ async def histogram_aggs_points(date_start, date_end, point_list, interval):
"""date_histogram"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -418,7 +332,7 @@ async def power_charge_p_point_aggs(date_start, date_end, pid_list, interval):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -495,7 +409,6 @@ async def point_aggs_kwh(point_list, start=None, end=None):
return data
async def point_aggs_kwh_new15(point_list, start=None, end=None):
"""1.5版本根据pid,求电量电费"""
if start and end:
......@@ -506,7 +419,7 @@ async def point_aggs_kwh_new15(point_list, start=None, end=None):
sql = "SELECT sum(kwh) kwh,sum(charge) charge FROM point_15min_power" \
" where pid in %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(point_list, ))
data = await conn.fetchone(sql, args=(point_list,))
return data
......@@ -516,7 +429,7 @@ async def extended_bounds_agg(date_start, date_end, cid_list, interval):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......
......@@ -113,11 +113,11 @@ def by_slots(slots, es_re_dic):
for slot in slots:
if slot in es_re_dic:
# 1.每个时间点,电量信息
kwh_value = round_2(es_re_dic[slot].get("kwh").get("value"))
kwh_value = round_2(es_re_dic[slot].get("kwh"))
# 2.每个时间点,电费信息
charge_value = round_2(es_re_dic[slot].get("charge").get("value"))
charge_value = round_2(es_re_dic[slot].get("charge"))
# 3. 功率
p_value = round_2(es_re_dic[slot].get("p").get("value"))
p_value = round_2(es_re_dic[slot].get("p"))
# 4. 电价
try:
price_value = round_2(charge_value / kwh_value)
......@@ -141,16 +141,18 @@ async def power_charge_p_proxy(cid_list, start, end, date_type, interval):
# 1. 96个点数据
slots_96 = SLOTS_15MIN
es_re_96 = await power_charge_p_aggs(start, end, cid_list, "15m")
es_re_96_dic = es_process(es_re_96, fmat="HH:mm") # 为了es结果和slots对应
es_re_96_dic = es_process(es_re_96, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_96, es_re_96_dic)
# 96个点,工厂电量电费
# 2. 24个点数据
slots_24 = SLOTS[date_type]
es_re_24 = await power_charge_p_aggs(start, end, cid_list, "hour")
es_re_24_dic = es_process(es_re_24, fmat="HH:mm") # 为了es结果和slots对应
es_re_24_dic = es_process(es_re_24, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24, es_re_24_dic)
# elif date_type == "month":
# intervel, slots = time_pick_transf(start, end)
# es_re = es_process(es_re, fmat="MM-DD") # 为了es结果和slots对应
import copy
import json
from itertools import groupby
from operator import itemgetter
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
......@@ -50,37 +53,41 @@ async def post_power_statis_proxy(req,
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, point_id,
last_start,
last_end,
date_type)
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list,
point_id,
last_start,
last_end,
date_type)
last_aver_price = aver_price(last_kwh_sv, last_charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price,
last_aver_price=last_aver_price)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price)
......@@ -92,18 +99,20 @@ async def post_power_statis_proxy(req,
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end,
date_type)
# 负荷曲线
this_p = await proxy_today_yesterday_p_new15(cid_list, start,
end)
end)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end, date_type)
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id,
start,
end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -361,28 +370,28 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, -1,
last_start,
last_end,
date_type)
last_start,
last_end,
date_type)
last_aver_price = aver_price(last_kwh_sv, last_charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
this_aver_price=this_aver_price,
......@@ -427,7 +436,8 @@ async def get_power_statist_download(req):
slots_24 = SLOTS[date_type]
# 1. 96个点, 电量\电费\平均功率\电价
es_re_96 = await power_charge_p_aggs(start, end, cid_list, "15m")
es_re_96_dic = es_process(es_re_96, fmat="HH:mm") # 为了es结果和slots对应
es_re_96_dic = es_process(es_re_96, "HH:mm", time_key="create_time")
# 为了es结果和slots对应
kwh_96, charge_96, p_96, price_96 = by_slots(slots_96, es_re_96_dic)
p_96.append("")
price_96.append("")
......@@ -445,32 +455,33 @@ async def get_power_statist_download(req):
power_96_dic = {"电量(万kWh)": slots_96_zj}
charge_96_dic = {"电费(万元)": slots_96_zj}
has_power_cids = [] # es查询有数据的工厂
for info in res_cid_96:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_96, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic = es_process(info["quarter_time"]["buckets"],
fmat="HH:mm")
info_dic = es_process(cid_one, fmat="HH:mm",
time_key="create_time")
kwh_96, charge_96, p_96, price_96 = by_slots(slots_96,
info_dic)
power_96_dic[cid_name] = division_down(kwh_96)
charge_96_dic[cid_name] = division_down(charge_96)
# 没有电量数据的工厂, 设为空字符串
for cid in cid_list:
if cid not in has_power_cids:
cid_name = com_dic[cid]["shortname"]
power_96_dic[cid_name] = list(str(' ') * 97)
charge_96_dic[cid_name] = list(str(' ') * 97)
df1 = pd.DataFrame(power_96_dic)
df1.to_excel(writer, sheet_name="96个点电量", index=False)
df2 = pd.DataFrame(charge_96_dic)
df2.to_excel(writer, sheet_name="96个点电费", index=False)
# 3. 24个点, 电量\电费\平均功率\电价
es_re_24 = await power_charge_p_aggs(start, end, cid_list, "hour")
es_re_24_dic = es_process(es_re_24, fmat="HH:mm") # 为了es结果和slots对应
es_re_24_dic = es_process(es_re_24, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24, es_re_24_dic)
slots_24_zj = copy.deepcopy(slots_24)
slots_24_zj.append("总计")
......@@ -491,25 +502,25 @@ async def get_power_statist_download(req):
power_24_dic = {"电量(万kWh)": slots_24_zj}
charge_24_dic = {"电费(万元)": slots_24_zj}
has_power_cids_24 = [] # es查询有数据的工厂
for info in res_cid_24:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_24, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids_24.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic_24 = es_process(info["quarter_time"]["buckets"],
fmat="HH:mm")
info_dic_24 = es_process(cid_one, fmat="HH:mm",
time_key="create_time")
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24,
info_dic_24)
power_24_dic[cid_name] = division_down(kwh_24)
charge_24_dic[cid_name] = division_down(charge_24)
# 没有电量数据的工厂, 设为空字符串
for cid in cid_list:
if cid not in has_power_cids_24:
cid_name = com_dic[cid]["shortname"]
power_24_dic[cid_name] = list(str(' ') * 25)
charge_24_dic[cid_name] = list(str(' ') * 25)
df3 = pd.DataFrame(power_24_dic)
df3.to_excel(writer, sheet_name="24个点电量", index=False)
df4 = pd.DataFrame(charge_24_dic)
......@@ -521,7 +532,8 @@ async def get_power_statist_download(req):
slots_m_p = copy.deepcopy(slots_m)
slots_m_p.append("总计")
es_re_m = await power_charge_p_aggs(start, end, cid_list, "day")
es_re_m_dic = es_process(es_re_m, fmat="MM-DD") # 为了es结果和slots对应
es_re_m_dic = es_process(es_re_m, fmat="MM-DD",
time_key="create_time") # 为了es结果和slots对应
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, es_re_m_dic)
price_m.append("")
dict_m = {
......@@ -537,24 +549,24 @@ async def get_power_statist_download(req):
power_m_dic = {"电量(万kWh)": slots_m_p}
charge_m_dic = {"电费(万元)": slots_m_p}
has_power_cids = [] # es查询有数据的工厂
for info in res_cid_m:
cid = info.get("key")
for cid, cid_info in groupby(res_cid_m, key=itemgetter("cid")):
cid_one = [one for one in cid_info]
has_power_cids.append(cid)
cid_name = com_dic[cid]["shortname"]
# 把slots作为key提出来
info_dic = es_process(info["quarter_time"]["buckets"],
fmat="MM-DD")
info_dic = es_process(cid_one, fmat="MM-DD",
time_key="create_time")
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, info_dic)
power_m_dic[cid_name] = division_down(kwh_m)
charge_m_dic[cid_name] = division_down(charge_m)
# 没有电量数据的工厂, 设为空字符串
for cid in cid_list:
if cid not in has_power_cids:
cid_name = com_dic[cid]["shortname"]
power_m_dic[cid_name] = list(str(' ') * (len(slots_m) + 1))
charge_m_dic[cid_name] = list(str(' ') * (len(slots_m) + 1))
df1 = pd.DataFrame(power_m_dic)
df1.to_excel(writer, sheet_name="电量", index=False)
df2 = pd.DataFrame(charge_m_dic)
......@@ -583,7 +595,7 @@ async def get_power_company_download(req):
start = args.get("start")
end = args.get("end")
date_type = args.get("date_type")
# 参数为point_id, 转换为point_list
if point_id == -1: # 选的全部
# 1.找出工厂所有pid,point表add_to_company字段为1
......@@ -593,7 +605,7 @@ async def get_power_company_download(req):
point_list = [point.get("pid") for point in point_info]
else:
point_list = [point_id]
# 查询point
point_info_list = await point_by_points(point_list)
pid_dic = {i["pid"]: i for i in point_info_list}
......@@ -637,14 +649,14 @@ async def get_power_company_download(req):
info_dic)
power_96_dic[pid_name] = division_down(kwh_96)
charge_96_dic[pid_name] = division_down(charge_96)
# 没有电量数据的监测点, 设为空字符串
for pid in point_list:
if pid not in has_power_pids:
pid_name = pid_dic[pid]["name"]
power_96_dic[pid_name] = list(str(' ') * 97)
charge_96_dic[pid_name] = list(str(' ') * 97)
df1 = pd.DataFrame(power_96_dic)
df1.to_excel(writer, sheet_name="96个点电量", index=False)
df2 = pd.DataFrame(charge_96_dic)
......@@ -683,14 +695,14 @@ async def get_power_company_download(req):
info_dic_24)
power_24_dic[pid_name] = division_down(kwh_24)
charge_24_dic[pid_name] = division_down(charge_24)
# 没有电量数据的工厂, 设为空字符串
for pid in point_list:
if pid not in has_power_pids_24:
pid_name = pid_dic[pid]["name"]
power_24_dic[pid_name] = list(str(' ') * 25)
charge_24_dic[pid_name] = list(str(' ') * 25)
df3 = pd.DataFrame(power_24_dic)
df3.to_excel(writer, sheet_name="24个点电量", index=False)
df4 = pd.DataFrame(charge_24_dic)
......@@ -728,14 +740,14 @@ async def get_power_company_download(req):
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, info_dic)
power_m_dic[pid_name] = division_down(kwh_m)
charge_m_dic[pid_name] = division_down(charge_m)
# 没有电量数据的工厂, 设为空字符串
for pid in point_list:
if pid not in has_power_pids:
pid_name = pid_dic[pid]["name"]
power_m_dic[pid_name] = list(str(' ') * (len(slots_m) + 1))
charge_m_dic[pid_name] = list(str(' ') * (len(slots_m) + 1))
df1 = pd.DataFrame(power_m_dic)
df1.to_excel(writer, sheet_name="电量", index=False)
df2 = pd.DataFrame(charge_m_dic)
......@@ -774,14 +786,14 @@ async def get_power_company_download(req):
kwh_m, charge_m, p_m, price_m = by_slots(slots_m, info_dic)
power_m_dic[pid_name] = division_down(kwh_m)
charge_m_dic[pid_name] = division_down(charge_m)
# 没有电量数据的point, 设为空字符串
for pid in point_list:
if pid not in has_power_pids:
pid_name = pid_dic[pid]["name"]
power_m_dic[pid_name] = list(str(' ') * (len(slots_m) + 1))
charge_m_dic[pid_name] = list(str(' ') * (len(slots_m) + 1))
df1 = pd.DataFrame(power_m_dic)
df1.to_excel(writer, sheet_name="电量", index=False)
df2 = pd.DataFrame(charge_m_dic)
......
......@@ -305,7 +305,7 @@ def start_end_date():
def time_str_to_str(date_str, format="HH:mm"):
"""YYYY-MM-DD HH:mm:ss格式转换为format格式"""
start_f = my_pendulum.from_format(date_str, 'YYYY-MM-DD HH:mm:ss')
start_f = my_pendulum.parse(date_str)
start_f = start_f.format(format)
return start_f
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment