Commit edbbe5c2 authored by lcn's avatar lcn

修复Bug

parent fb18f2f4
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_kwh_p_dao(table_name, terms, start, end):
sql = f"SELECT create_time,kwh,p FROM {table_name} WHERE cid in %s " \
f"and create_time BETWEEN '{start}' and '{end}'"
async def get_kwh_p_dao(terms, start_time, end_time,
table_name="company_15min_power",
time_fmt="%%Y-%%m-%%d"):
"""
负荷实际数据
:param terms:
:param start_time:
:param end_time:
:param table_name:
:param time_fmt:
:return:
"""
sql = f"""
select p,kwh,DATE_FORMAT(create_time,"{time_fmt}") as cal_time
from {table_name} where cid in %s and create_time >= %s
and create_time <= %s
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(terms, ))
return datas
result = await conn.fetchall(sql, args=(terms, start_time, end_time))
return result or []
async def get_pred_p_dao(terms, start, end):
sql = f"SELECT create_time,p FROM company_day_ahead_predict " \
f"WHERE cid in %s and create_time BETWEEN '{start}' and '{end}'"
async def get_pred_p_dao(terms, start_time, end_time,
time_fmt="%%Y-%%m-%%d"):
"""
负荷预测数据
:param terms:
:param start_time:
:param end_time:
:param time_fmt:
:return:
"""
sql = f"""
select avg(p) p ,count(*) p_count,DATE_FORMAT(create_time,
"{time_fmt}") as cal_time
from company_day_ahead_predict
where cid in %s and create_time >= %s and create_time <= %s
group by cal_time
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(terms,))
return datas
\ No newline at end of file
result = await conn.fetchall(sql, args=(terms, start_time, end_time))
return result or []
from datetime import datetime
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.constants import COMPANY_15MIN_POWER, COMPANY_DAY_AHEAD_PRE
......@@ -30,7 +33,7 @@ async def load_forecast_service(cid, cids, start, end):
trans_type = "15m"
interval = trans_type
format = "yyyy-MM-dd HH:mm:ss"
real_query["aggs"] = {
"quarter_time": {
"date_histogram": {
......@@ -39,7 +42,8 @@ async def load_forecast_service(cid, cids, start, end):
"time_zone": "+08:00",
"format": format,
},
"aggs": {"p": {"stats": {"field": "p"}}, "kwh": {"stats": {"field": "kwh"}}},
"aggs": {"p": {"stats": {"field": "p"}},
"kwh": {"stats": {"field": "kwh"}}},
}
}
real_index = COMPANY_15MIN_POWER
......@@ -70,14 +74,16 @@ async def load_forecast_service(cid, cids, start, end):
real_value = real_re[slot].get("p").get("sum")
real_power_val = (
real_re[slot].get("kwh").get("sum")
if type(real_re[slot].get("kwh").get("sum")) in [int, float]
if type(real_re[slot].get("kwh").get("sum")) in [int,
float]
else ""
)
else:
real_value = real_re[slot].get("p").get("sum")
real_power_val = (
real_re[slot].get("kwh").get("sum")
if type(real_re[slot].get("kwh").get("sum")) in [int, float]
if type(real_re[slot].get("kwh").get("sum")) in [int,
float]
else ""
)
real_power_list.append(real_power_val)
......@@ -160,8 +166,9 @@ async def load_forecast_service(cid, cids, start, end):
pred_power_list=LoadValue(slots=slots, value=pred_power_list),
deviation_power_list=[],
)
count, maxnum, minnum, average, max_index, min_index = choose_list(deviation_list_tmp, 4)
count, maxnum, minnum, average, max_index, min_index = choose_list(
deviation_list_tmp, 4)
# 求最大偏差和最小偏差时间
max_t = slots[max_index]
min_t = slots[min_index]
......@@ -183,7 +190,7 @@ async def load_forecast_service(cid, cids, start, end):
pred_tmp = [i for i in pred_tmp if i != ""]
total_deviation = abs((sum(pred_tmp) - sum(real_tmp)) / sum(real_tmp))
total_deviation = round(total_deviation, 4)
deviation_power_list = []
for index, real_power in enumerate(real_power_list):
if real_power == "" or pred_power_list[index] == "" or real_power == 0:
......@@ -192,16 +199,17 @@ async def load_forecast_service(cid, cids, start, end):
# (预测-实际)/实际 * 100%
deviation = (pred_power_list[index] - real_power) / real_power
deviation_power_list.append(abs(round(deviation, 4)))
if (
start.split(" ")[0].rsplit("-", 1)[0] == str(datetime.now().date()).rsplit("-", 1)[0]
and trans_type == "day"
start.split(" ")[0].rsplit("-", 1)[0] ==
str(datetime.now().date()).rsplit("-", 1)[0]
and trans_type == "day"
):
# 如果是本月的,那么当天的电量偏差没有意义, 置为空
today_str = str(datetime.now().date()).split("-", 1)[1]
index = slots.index(today_str)
deviation_power_list[index] = ""
return ForecastResp(
pred_data=lv_pred,
real_data=lv_real,
......@@ -216,150 +224,139 @@ async def load_forecast_service(cid, cids, start, end):
)
async def load_forecast_service_new15(cid, cids, start, end):
terms = cids if cids else [cid]
time_type = interval_type("range", start, end)
if time_type == "day":
table_name = "company_1day_power"
fmt = "%m-%d"
async def load_forecast_service_new15(cids, start_time, end_time, interval,
slots):
if interval == 15 * 60:
# 按日
time_fmt = "%%Y-%%m-%%d %%H:%%i"
now_time_fmt = "YYYY-MM-DD HH:mm"
to_fmt = "HH:mm"
real_table_name = "company_15min_power"
elif interval == 86400:
# 按月
time_fmt = "%%Y-%%m-%%d"
now_time_fmt = "YYYY-MM-DD"
to_fmt = now_time_fmt
real_table_name = "company_1day_power"
else:
table_name = "company_15min_power"
fmt = "%H:%M"
datas = await get_kwh_p_dao(table_name, terms, start, end)
if not datas:
return ForecastResp()
# 获取slots
intervel, slots = time_pick_transf(start, end)
real_re = sql_time_process(datas, "create_time", fmt)
lv_real = LoadValue()
lv_real.slots = slots
real_list = []
real_power_list, pred_power_list = [], []
# 按年
time_fmt = "%%Y-%%m"
now_time_fmt = "YYYY-MM"
to_fmt = now_time_fmt
real_table_name = "company_1day_power"
now_time = pendulum.now().format(now_time_fmt)
now_month = pendulum.now().format("YYYY-MM")
# 1,获取实际数据
real_result = await get_kwh_p_dao(cids, start_time, end_time,
real_table_name, time_fmt)
# if not real_result:
# return [], [], [], [], [], [], [], 0, "", 0, "", 0, 0
real_data = {i["cal_time"]: i for i in real_result}
real_list, real_power_list = [], []
for slot in slots:
if slot in real_re:
real_value = real_re[slot].get("p")
real_power_val = (
real_re[slot].get("kwh")
if type(real_re[slot].get("kwh")) in [int, float] else ""
)
real_power_list.append(real_power_val)
# 值为0是正常数据
if real_value in [0, 0.0]:
real_list.append(0.0)
if slot in real_data:
p = real_data[slot]["p"]
kwh = real_data[slot]["kwh"]
if slot > now_time:
value, kwh = "", ""
elif p in [0, 0.0]:
value, kwh = 0.0, 0.0
elif p:
value = round(p, 2)
kwh = round(kwh, 2)
else:
real_list.append(round(real_value, 2) if real_value else "")
value, kwh = "", ""
else:
real_list.append("")
real_power_list.append("")
lv_real.value = real_list
# 3. 预测数据
pred_re = await get_pred_p_dao(terms, start, end)
if not pred_re:
return ForecastResp()
pred_re = sql_time_process(pred_re, "create_time", fmt)
lv_pred = LoadValue()
lv_pred.slots = slots
pred_list = []
value, kwh = "", ""
real_list.append(value)
real_power_list.append(kwh)
# 2,获取预测数据
forecast_result = await get_pred_p_dao(cids, start_time, end_time,
time_fmt)
# if not forecast_result:
# return [], [], [], [], [], [], [], 0, "", 0, "", 0, 0
forecast_data = {i["cal_time"]: i for i in forecast_result}
forecast_list, forecast_power_list = [], []
for slot in slots:
if slot in pred_re:
pred_value = pred_re[slot].get("p")
pred_power_val = (
round(pred_re[slot].get("p") * 0.25, 2)
if type(pred_re[slot].get("p")) in [int, float] else ""
)
pred_power_list.append(pred_power_val)
# 值为0是正常数据
if pred_value in [0, 0.0]:
pred_list.append(0.0)
if slot in forecast_data:
p = forecast_data[slot]["p"]
if p in [0, 0.0]:
value = 0.0
elif p:
value = round(p, 2)
else:
pred_list.append(round(pred_value, 2) if pred_value else "")
value = ""
else:
pred_list.append("")
pred_power_list.append("")
lv_pred.value = pred_list
# 4.求偏差数据
deviation_list = [] # 偏差
deviation_list_abs = [] # 偏差取绝对值, 最大/最小/平均偏差都是绝对值后数据
value = ""
forecast_list.append(value)
if value or value == 0.0:
p_count = forecast_data[slot]["p_count"]
power_value = round(value * 0.25 * p_count, 2)
else:
power_value = ""
forecast_power_list.append(power_value)
# 3.求偏差数据
deviation_list, deviation_power_list = [], [] # 偏差取绝对值, 最大/最小/平均偏差都是绝对值后数据
for num, value in enumerate(real_list):
if not value or not pred_list[num]:
if not value or not forecast_list[num]:
deviation_list.append("")
else:
# (预测-实际)/实际 * 100%
deviation = (pred_list[num] - value) / value
deviation_list.append(round(deviation, 4))
deviation_list_abs.append(abs(round(deviation, 4)))
# 取绝对值,并保留
deviation_list_tmp = [i for i in deviation_list_abs if i != ""]
log.info(f"deviation_list_tmp:{deviation_list_tmp}, "
f"deviation_list_abs:{deviation_list_abs}")
if not deviation_list_tmp:
return ForecastResp(
pred_data=lv_pred,
real_data=lv_real,
deviation_list=deviation_list_abs,
max_deviation=[],
min_deviation=[],
avg_deviation="",
total_deviation="",
real_power_list=LoadValue(slots=slots, value=real_power_list),
pred_power_list=LoadValue(slots=slots, value=pred_power_list),
deviation_power_list=[],
)
count, maxnum, minnum, average, max_index, min_index = choose_list(
deviation_list_tmp, 4)
# 求最大偏差和最小偏差时间
max_t = slots[max_index]
min_t = slots[min_index]
if time_type == "day":
max_time = start[:4] + "-" + max_t
min_time = start[:4] + "-" + min_t
else:
max_time = start.split(" ")[0] + " " + max_t
min_time = start.split(" ")[0] + " " + min_t
# 最大偏差
max_deviation = [maxnum, max_time]
# 最小偏差
min_deviation = [minnum, min_time]
# 平均偏差
avg_deviation = average
# 总量偏差 = | (预测(总) - 实际(总)) / 实际(总) | * 100%
real_tmp = [i for i in real_list if i != ""]
pred_tmp = pred_list[: len(real_tmp)]
pred_tmp = [i for i in pred_tmp if i != ""]
total_deviation = abs((sum(pred_tmp) - sum(real_tmp)) / sum(real_tmp))
total_deviation = round(total_deviation, 4)
deviation_power_list = []
for index, real_power in enumerate(real_power_list):
if real_power == "" or pred_power_list[index] == "" or real_power == 0:
deviation_power_list.append("")
deviation = (forecast_list[num] - value) / value
deviation_list.append(abs(round(deviation, 4)))
for num, value in enumerate(real_power_list):
if not value or not forecast_power_list[num]:
deviation_power = ""
else:
# (预测-实际)/实际 * 100%
deviation = (pred_power_list[index] - real_power) / real_power
deviation_power_list.append(abs(round(deviation, 4)))
if (
start.split(" ")[0].rsplit("-", 1)[0] ==
str(datetime.now().date()).rsplit("-", 1)[0]
and time_type == "day"
):
deviation_power = (forecast_power_list[num] - value) / value
deviation_power = abs(round(deviation_power, 4))
# 如果是本月的,那么当天的电量偏差没有意义, 置为空
today_str = str(datetime.now().date()).split("-", 1)[1]
index = slots.index(today_str)
deviation_power_list[index] = ""
if interval == 86400 and now_month == start_time[:7] and \
num == slots.index(now_time):
deviation_power = ""
deviation_power_list.append(deviation_power)
# 4,求偏差统计值
deviation_list_tmp = [i for i in deviation_list if i != ""]
if deviation_list_tmp:
count, max_deviation, min_deviation, average, max_index, min_index = \
choose_list(deviation_list_tmp, 4)
# 求最大偏差和最小偏差时间
max_time = slots[max_index]
min_time = slots[min_index]
# 平均偏差
avg_deviation = average
# 总量偏差 = | (预测(总) - 实际(总)) / 实际(总) | * 100%
real_tmp = [i for i in real_list if i != ""]
pred_tmp = forecast_list[: len(real_tmp)]
pred_tmp = [i for i in pred_tmp if i != ""]
total_deviation = abs((sum(pred_tmp) - sum(real_tmp)) / sum(real_tmp))
total_deviation = round(total_deviation, 4)
else:
max_deviation, max_time, min_deviation, min_time = 0, "", 0, ""
avg_deviation, total_deviation = 0, 0
# 重置 slots
if interval == 15 * 60:
for num, slot in enumerate(slots):
slot = pendulum.parse(slot).format("HH:mm")
slots[num] = slot
return ForecastResp(
pred_data=lv_pred,
real_data=lv_real,
deviation_list=deviation_list_abs,
pred_data=LoadValue(slots=slots, value=real_list),
real_data=LoadValue(slots=slots, value=forecast_list),
deviation_list=deviation_list,
max_deviation=max_deviation,
min_deviation=min_deviation,
avg_deviation=avg_deviation,
total_deviation=total_deviation,
real_power_list=LoadValue(slots=slots, value=real_power_list),
pred_power_list=LoadValue(slots=slots, value=pred_power_list),
pred_power_list=LoadValue(slots=slots, value=forecast_power_list),
deviation_power_list=deviation_power_list,
)
......@@ -5,6 +5,7 @@ from unify_api.modules.load_analysis.components.load_forecast_cps import (
)
from unify_api.modules.load_analysis.service.load_forecast_service import \
load_forecast_service, load_forecast_service_new15
from unify_api.utils.time_format import time_pick_transf_new
@summary("负荷预测")
......@@ -17,4 +18,8 @@ async def post_load_forecast(req, body: ForecastReq) -> ForecastResp:
# 管理版本多个工厂的情况, 兼容能力最强的参数cids, 保留旧有的cid:
cids = body.cids
# return await load_forecast_service(cid, cids, start, end)
return await load_forecast_service_new15(cid, cids, start, end)
terms = cids if cids else [cid]
# 获取时间差
interval, slots = time_pick_transf_new(start, end)
return await load_forecast_service_new15(terms, start, end,
interval, slots)
......@@ -60,6 +60,41 @@ def time_pick_transf(start, end, is_range=0):
return intervel, slots
def time_pick_transf_new(start, end):
"""获取intervel和slots, 详细显示时间轴信息,新接口都使用这个来获取时间"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
diff = end_f.int_timestamp - start_f.int_timestamp
# 1. 计算intervel
# 1.1 区间48小时之内, 返回15min
if diff <= 48 * 3600:
intervel = 15 * 60
# 1.2 区间在60天以内, 返回1day
elif 48 * 3600 < diff <= 60 * 86400:
intervel = 86400
# 1.3 选择年, 返回1个月
else:
intervel = 30 * 86400
# 2. 计算slots
# 2.1 取到点的个数, 比如15min的96个点
slots = []
slot_num = round((end_f.int_timestamp - start_f.int_timestamp) / intervel)
for i in range(slot_num):
# 区间48小时之内
if diff <= 48 * 3600:
dt = start_f.add(minutes=15 * i).format("YYYY-MM-DD HH:mm")
dt_str = str(dt)
# 区间在60天以内
elif 48 * 3600 < diff <= 60 * 86400:
dt = start_f.add(days=1 * i).format("YYYY-MM-DD")
dt_str = str(dt)
else:
dt = start_f.add(months=1 * i).format("YYYY-MM")
dt_str = str(dt)
slots.append(dt_str)
return intervel, slots
def power_slots(start, end):
"""电量电费,用电统计,time=range slots计算"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment