Commit 15ba9fa7 authored by ZZH's avatar ZZH

remove es 2023-6-2

parent 4af81566
import json
import pendulum
from pot_libs.common.components.query import PageRequest, Filter, Equal, Range
from pot_libs.es_util.es_query import EsQuery
from pot_libs.es_util.es_utils import EsUtil
import io
import pandas as pd
from pot_libs.common.components.query import PageRequest
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary, examples
from pot_libs.utils.exc_util import ParamException
from unify_api.constants import COMPANY_15MIN_POWER, POINT_15MIN_POWER, PRODUCT
from unify_api.constants import PRODUCT
from unify_api.modules.common.components.common_cps import LevelResp
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.common.procedures.cids import get_cids, get_proxy_cids
from unify_api.modules.common.procedures.cids import get_proxy_cids
from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.elec_charge.components.elec_charge_cps import \
power_overview_example, PricePolicyReq, PricePolicyResp, \
......@@ -26,15 +26,12 @@ from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service, load_info_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.common_utils import round_2, round_4, NumListHelper
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.request_util import filed_value_from_list
from unify_api.utils.time_format import convert_es_str, last_time_str, \
today_month_date, srv_time
import io
import pandas as pd
from unify_api.utils.time_format import last_time_str, today_month_date, \
srv_time
async def power_overview_new15(start, end, point_id, cid):
async def power_overview(start, end, point_id, cid):
if point_id == -1: # 选的全部
sql = f"SELECT spfv, sum(p), sum(kwh), sum(charge) " \
f"FROM company_15min_power where create_time " \
......@@ -68,51 +65,6 @@ async def power_overview_new15(start, end, point_id, cid):
return pv1, pv2
async def power_overview(date_start, date_end, point_id, cid):
"""抽离电量电费信息,供调用"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
# 2. 获取query_body # es时间查询字段:quarter_time
query_range = Range(field="quarter_time", start=start_es,
end=end_es)
if point_id == -1: # 选的全部
equal = Equal(field="cid", value=cid)
index = COMPANY_15MIN_POWER
else:
equal = Equal(field="pid", value=point_id)
index = POINT_15MIN_POWER
query_filter = Filter(equals=[equal], ranges=[query_range], in_groups=[],
keywords=[])
page_request = PageRequest(
page_size=10, page_num=1, filter=query_filter, sort=None)
query_body = EsQuery.agg_group_by(page_request, "spfv.keyword", "sum",
("kwh", "charge", "p"))
log.info(query_body)
# 3. 查询es
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
if not es_re:
return PowerViewRes(charge=Spvf(), power=Spvf())
es_re = es_re["aggregations"]["spfvs"]["buckets"]
# 4. 构造返回
pv1 = Spvf() # 电量对象
pv2 = Spvf() # 电费对象
for info in es_re:
if info.get("key") == "s":
pv1.s = info.get("kwh")["value"]
pv2.s = info.get("charge")["value"]
elif info.get("key") == "p":
pv1.p = info.get("kwh")["value"]
pv2.p = info.get("charge")["value"]
elif info.get("key") == "f":
pv1.f = info.get("kwh")["value"]
pv2.f = info.get("charge")["value"]
elif info.get("key") == "v":
pv1.v = info.get("kwh")["value"]
pv2.v = info.get("charge")["value"]
return pv1, pv2
@summary('电量电费信息')
@examples(power_overview_example)
async def post_power_overview(req, body: PageRequest) -> PowerViewRes:
......@@ -125,8 +77,8 @@ async def post_power_overview(req, body: PageRequest) -> PowerViewRes:
date_end = body.filter.ranges[0].end
if date_start == date_end:
return PowerViewRes(power=Spvf(), charge=Spvf())
pv1, pv2 = await power_overview_new15(start=date_start, end=date_end,
point_id=point_id, cid=cid)
pv1, pv2 = await power_overview(date_start, date_end, point_id, cid)
return PowerViewRes(power=pv1, charge=pv2)
......@@ -173,7 +125,7 @@ async def post_price_policy(req, body: PricePolicyReq) -> PricePolicyResp:
return PricePolicyResp(price_info=res_list)
async def aver_elec_price_new15(start, end, point_id, cid, date_type):
async def avg_ele_price(start, end, point_id, cid, date_type):
if point_id == -1: # 选的全部
table_name = "company_15min_power"
name = "cid"
......@@ -203,43 +155,6 @@ async def aver_elec_price_new15(start, end, point_id, cid, date_type):
return this_ck, last_ck
async def aver_elec_price(start, end, point_id, cid, date_type):
"""平均电价和增长率, 抽离功能"""
if point_id == -1: # 选的全部
matchs = {"cid": cid}
index = COMPANY_15MIN_POWER
else:
matchs = {"pid": point_id}
index = POINT_15MIN_POWER
# 2.构造query_body
query_this = agg_statistics(matchs=matchs,
aggs_key=["kwh", "charge"],
start=start, end=end, date_key="quarter_time")
# 3.返回数据
async with EsUtil() as es:
this_re = await es.search_origin(body=query_this,
index=index)
# 本周期电量电费
this_ck = ChargeKwh()
this_ck.charge = this_re["aggregations"]["charge"]["value"]
this_ck.kwh = this_re["aggregations"]["kwh"]["value"]
if date_type == "range":
return this_ck, None
# 上周期电量电费
start_last, end_last = last_time_str(start, end, date_type)
query_last = agg_statistics(matchs=matchs,
aggs_key=["kwh", "charge"],
start=start_last, end=end_last,
date_key="quarter_time")
async with EsUtil() as es:
last_re = await es.search_origin(body=query_last,
index=index)
last_ck = ChargeKwh()
last_ck.charge = last_re["aggregations"]["charge"]["value"]
last_ck.kwh = last_re["aggregations"]["kwh"]["value"]
return this_ck, last_ck
@summary('平均电价和增长率')
async def post_aver_elec_price(req, body: AverPriceReq) -> AverPriceResp:
"""平均电价, 增长率"""
......@@ -251,7 +166,7 @@ async def post_aver_elec_price(req, body: AverPriceReq) -> AverPriceResp:
date_type = body.date_type
if start == end:
return AverPriceResp(this_power=ChargeKwh(), last_power=ChargeKwh())
this_ck, last_ck = await aver_elec_price_new15(start, end, point_id, cid,
this_ck, last_ck = await avg_ele_price(start, end, point_id, cid,
date_type)
return AverPriceResp(this_power=this_ck, last_power=last_ck)
......@@ -262,20 +177,20 @@ async def post_index_charge(req, body: IndexChargeReq) -> IndexChargeResp:
point_id = body.point_id
today_start, today_end, month_start, month_end = today_month_date()
# 1. 今日电量电费spvf
kwh_t, charge_t = await power_overview_new15(today_start, today_end,
point_id, cid)
kwh_t, charge_t = await power_overview(today_start, today_end, point_id,
cid)
today_spvf = PowerViewRes(power=kwh_t, charge=charge_t)
# 2. 本月电量电费spvf
kwh_m, charge_m = await power_overview_new15(month_start, month_end,
point_id, cid)
kwh_m, charge_m = await power_overview(month_start, month_end, point_id,
cid)
month_spvf = PowerViewRes(power=kwh_m, charge=charge_m)
# 3. 今日平均电价和增长率
this_ck_t, last_ck_t = await aver_elec_price_new15(today_start, today_end,
this_ck_t, last_ck_t = await avg_ele_price(today_start, today_end,
point_id, cid,
date_type="day")
today_power = AverPriceResp(this_power=this_ck_t, last_power=last_ck_t)
# 4. 本月平均电价和增长率
this_ck_m, last_ck_m = await aver_elec_price_new15(month_start, month_end,
this_ck_m, last_ck_m = await avg_ele_price(month_start, month_end,
point_id, cid,
date_type="month")
month_power = AverPriceResp(this_power=this_ck_m, last_power=last_ck_m)
......
from datetime import datetime
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.constants import COMPANY_15MIN_POWER, COMPANY_DAY_AHEAD_PRE
from unify_api.modules.elec_charge.common.utils import interval_type
from unify_api.modules.load_analysis.components.load_forecast_cps import (
ForecastResp,
LoadValue,
ForecastResp, LoadValue,
)
from unify_api.utils.common_utils import choose_list
from unify_api.utils.es_query_body import EsQueryBody, es_process, \
sql_time_process
from unify_api.utils.time_format import time_pick_transf
from unify_api.modules.load_analysis.dao.load_forecast_dao import \
get_kwh_p_dao, get_pred_p_dao
async def load_forecast_service(cid, cids, start, end):
"""负荷预测"""
terms = {"cid": [cid]}
if cids:
terms = {"cid": cids}
# 2.es查询实时、预测数据
query = EsQueryBody(terms=terms, start=start, end=end,
date_key="quarter_time", size=500)
real_query = query.query()
# date_type转换, 用于聚合查询
trans_type = interval_type("range", start, end)
if trans_type != "day":
trans_type = "15m"
interval = trans_type
format = "yyyy-MM-dd HH:mm:ss"
real_query["aggs"] = {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": format,
},
"aggs": {"p": {"stats": {"field": "p"}},
"kwh": {"stats": {"field": "kwh"}}},
}
}
real_index = COMPANY_15MIN_POWER
log.info(real_index + f"====={real_query}")
async with EsUtil() as es:
real_re = await es.search_origin(body=real_query, index=real_index)
if trans_type == "day":
real_re = real_re["aggregations"]["quarter_time"]["buckets"]
else:
real_re = real_re["aggregations"]["quarter_time"]["buckets"]
if not real_re:
return ForecastResp()
# 获取slots
intervel, slots = time_pick_transf(start, end)
if trans_type == "day":
fmt = "MM-DD"
real_re = es_process(real_re, fmat=fmt)
else:
fmt = "HH:mm"
real_re = es_process(real_re, fmat=fmt)
lv_real = LoadValue()
lv_real.slots = slots
real_list = []
real_power_list, pred_power_list = [], []
for slot in slots:
if slot in real_re:
if trans_type == "day":
real_value = real_re[slot].get("p").get("sum")
real_power_val = (
real_re[slot].get("kwh").get("sum")
if type(real_re[slot].get("kwh").get("sum")) in [int,
float]
else ""
)
else:
real_value = real_re[slot].get("p").get("sum")
real_power_val = (
real_re[slot].get("kwh").get("sum")
if type(real_re[slot].get("kwh").get("sum")) in [int,
float]
else ""
)
real_power_list.append(real_power_val)
# 值为0是正常数据
if real_value in [0, 0.0]:
real_list.append(0.0)
else:
real_list.append(round(real_value, 2) if real_value else "")
else:
real_list.append("")
real_power_list.append("")
lv_real.value = real_list
# 3. 预测数据
pred_index = COMPANY_DAY_AHEAD_PRE
log.info(pred_index + f"====={real_query}")
async with EsUtil() as es:
pred_re = await es.search_origin(body=real_query, index=pred_index)
pred_re = pred_re["aggregations"]["quarter_time"]["buckets"]
if not pred_re:
return ForecastResp()
if trans_type == "day":
fmt = "MM-DD"
else:
fmt = "HH:mm"
pred_re = es_process(pred_re, fmat=fmt)
lv_pred = LoadValue()
lv_pred.slots = slots
pred_list = []
for slot in slots:
if slot in pred_re:
if trans_type == "day":
pred_value = pred_re[slot].get("p").get("sum")
pred_power_val = (
round(pred_re[slot].get("p").get("sum") * 0.25, 2)
if type(pred_re[slot].get("p").get("sum")) in [int, float]
else ""
)
else:
pred_value = pred_re[slot].get("p").get("sum")
pred_power_val = (
round(pred_re[slot].get("p").get("sum") * 0.25, 2)
if type(pred_re[slot].get("p").get("sum")) in [int, float]
else ""
)
pred_power_list.append(pred_power_val)
# 值为0是正常数据
if pred_value in [0, 0.0]:
pred_list.append(0.0)
else:
pred_list.append(round(pred_value, 2) if pred_value else "")
else:
pred_list.append("")
pred_power_list.append("")
lv_pred.value = pred_list
# 4.求偏差数据
deviation_list = [] # 偏差
deviation_list_abs = [] # 偏差取绝对值, 最大/最小/平均偏差都是绝对值后数据
for num, value in enumerate(real_list):
if not value or not pred_list[num]:
deviation_list.append("")
else:
# (预测-实际)/实际 * 100%
deviation = (pred_list[num] - value) / value
deviation_list.append(round(deviation, 4))
deviation_list_abs.append(abs(round(deviation, 4)))
# 取绝对值,并保留
deviation_list_tmp = [i for i in deviation_list_abs if i != ""]
log.info(f"deviation_list_tmp:{deviation_list_tmp}, "
f"deviation_list_abs:{deviation_list_abs}")
if not deviation_list_tmp:
return ForecastResp(
pred_data=lv_pred,
real_data=lv_real,
deviation_list=deviation_list_abs,
max_deviation=[],
min_deviation=[],
avg_deviation="",
total_deviation="",
real_power_list=LoadValue(slots=slots, value=real_power_list),
pred_power_list=LoadValue(slots=slots, value=pred_power_list),
deviation_power_list=[],
)
count, maxnum, minnum, average, max_index, min_index = choose_list(
deviation_list_tmp, 4)
# 求最大偏差和最小偏差时间
max_t = slots[max_index]
min_t = slots[min_index]
if trans_type == "day":
max_time = start[:4] + "-" + max_t
min_time = start[:4] + "-" + min_t
else:
max_time = start.split(" ")[0] + " " + max_t
min_time = start.split(" ")[0] + " " + min_t
# 最大偏差
max_deviation = [maxnum, max_time]
# 最小偏差
min_deviation = [minnum, min_time]
# 平均偏差
avg_deviation = average
# 总量偏差 = | (预测(总) - 实际(总)) / 实际(总) | * 100%
real_tmp = [i for i in real_list if i != ""]
pred_tmp = pred_list[: len(real_tmp)]
pred_tmp = [i for i in pred_tmp if i != ""]
total_deviation = abs((sum(pred_tmp) - sum(real_tmp)) / sum(real_tmp))
total_deviation = round(total_deviation, 4)
deviation_power_list = []
for index, real_power in enumerate(real_power_list):
if real_power == "" or pred_power_list[index] == "" or real_power == 0:
deviation_power_list.append("")
else:
# (预测-实际)/实际 * 100%
deviation = (pred_power_list[index] - real_power) / real_power
deviation_power_list.append(abs(round(deviation, 4)))
if (
start.split(" ")[0].rsplit("-", 1)[0] ==
str(datetime.now().date()).rsplit("-", 1)[0]
and trans_type == "day"
):
# 如果是本月的,那么当天的电量偏差没有意义, 置为空
today_str = str(datetime.now().date()).split("-", 1)[1]
index = slots.index(today_str)
deviation_power_list[index] = ""
return ForecastResp(
pred_data=lv_pred,
real_data=lv_real,
deviation_list=deviation_list_abs,
max_deviation=max_deviation,
min_deviation=min_deviation,
avg_deviation=avg_deviation,
total_deviation=total_deviation,
real_power_list=LoadValue(slots=slots, value=real_power_list),
pred_power_list=LoadValue(slots=slots, value=pred_power_list),
deviation_power_list=deviation_power_list,
)
async def load_forecast_service_new15(cids, start_time, end_time, interval,
slots):
async def load_forecast_srv(cids, s_time, e_time, interval, slots):
if interval == 15 * 60:
# 按日
time_fmt = "%%Y-%%m-%%d %%H:%%i"
now_time_fmt = "YYYY-MM-DD HH:mm"
to_fmt = "HH:mm"
real_table_name = "company_15min_power"
tbl_name = "company_15min_power"
elif interval == 86400:
# 按月
time_fmt = "%%Y-%%m-%%d"
now_time_fmt = "YYYY-MM-DD"
to_fmt = now_time_fmt
real_table_name = "company_1day_power"
tbl_name = "company_1day_power"
else:
# 按年
time_fmt = "%%Y-%%m"
now_time_fmt = "YYYY-MM"
to_fmt = now_time_fmt
real_table_name = "company_1day_power"
tbl_name = "company_1day_power"
now_time = pendulum.now().format(now_time_fmt)
now_month = pendulum.now().format("YYYY-MM")
# 1,获取实际数据
real_result = await get_kwh_p_dao(cids, start_time, end_time,
real_table_name, time_fmt)
real_result = await get_kwh_p_dao(cids, s_time, e_time, tbl_name, time_fmt)
# if not real_result:
# return [], [], [], [], [], [], [], 0, "", 0, "", 0, 0
......@@ -273,9 +51,9 @@ async def load_forecast_service_new15(cids, start_time, end_time, interval,
value, kwh = "", ""
real_list.append(value)
real_power_list.append(kwh)
# 2,获取预测数据
forecast_result = await get_pred_p_dao(cids, start_time, end_time,
time_fmt)
forecast_result = await get_pred_p_dao(cids, s_time, e_time, time_fmt)
# if not forecast_result:
# return [], [], [], [], [], [], [], 0, "", 0, "", 0, 0
forecast_data = {i["cal_time"]: i for i in forecast_result}
......@@ -298,6 +76,7 @@ async def load_forecast_service_new15(cids, start_time, end_time, interval,
else:
power_value = ""
forecast_power_list.append(power_value)
# 3.求偏差数据
deviation_list, deviation_power_list = [], [] # 偏差取绝对值, 最大/最小/平均偏差都是绝对值后数据
for num, value in enumerate(real_list):
......@@ -316,7 +95,7 @@ async def load_forecast_service_new15(cids, start_time, end_time, interval,
deviation_power = (forecast_power_list[num] - value) / value
deviation_power = abs(round(deviation_power, 4))
# 如果是本月的,那么当天的电量偏差没有意义, 置为空
if interval == 86400 and now_month == start_time[:7] and \
if interval == 86400 and now_month == s_time[:7] and \
num == slots.index(now_time):
deviation_power = ""
deviation_power_list.append(deviation_power)
......
from pot_libs.sanic_api import summary
from unify_api.modules.load_analysis.components.load_forecast_cps import (
ForecastReq,
ForecastResp,
ForecastReq, ForecastResp,
)
from unify_api.modules.load_analysis.service.load_forecast_service import \
load_forecast_service, load_forecast_service_new15
load_forecast_srv
from unify_api.utils.time_format import time_pick_transf_new
......@@ -17,9 +16,7 @@ async def post_load_forecast(req, body: ForecastReq) -> ForecastResp:
end = body.end
# 管理版本多个工厂的情况, 兼容能力最强的参数cids, 保留旧有的cid:
cids = body.cids
# return await load_forecast_service(cid, cids, start, end)
terms = cids if cids else [cid]
# 获取时间差
interval, slots = time_pick_transf_new(start, end)
return await load_forecast_service_new15(terms, start, end,
interval, slots)
return await load_forecast_srv(terms, start, end, interval, slots)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment