Commit e7df191b authored by lcn's avatar lcn

修复BUG

parent 6614a20a
......@@ -122,6 +122,18 @@ async def query_charge_aggs(date_start, date_end, cid_list):
return es_re["aggregations"]["cids"]["buckets"]
async def query_charge_new15(date_start, date_end, cid_list):
sql = f"""
select cid,sum(kwh) kwh,sum(charge) charge from company_15min_power
where cid in %s and create_time BETWEEN %s and %s
group by cid
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, args=(cid_list, date_start,
date_end))
return datas
async def power_charge_p_aggs(date_start, date_end, cid_list, interval):
"""
date_histogram,
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import COMPANY_1DAY_POWER, COMPANY_15MIN_POWER
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.elec_charge.components.elec_charge_cps import Spvf, \
PowerViewRes
from unify_api.modules.elec_charge.dao.elec_charge_dao import query_charge_aggs
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs, query_charge_new15
from unify_api.utils.common_utils import round_2, process_es_data, round_4
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.time_format import convert_es_str, last_time_str
......@@ -26,9 +28,9 @@ def quarters_trans(quarters):
}
temp_value = ""
temp_index = 0
start = my_pendulum.from_format("00:00", 'HH:mm')
for index, value in enumerate(quarters):
if index == 0:
temp_value = value
......@@ -37,12 +39,12 @@ def quarters_trans(quarters):
# dic2[temp_value].append(str(temp_index) + "-" + str(index-1))
# 计算时长
dic2[temp_value + "dt"] += (index - temp_index) * 15 / 60
# 转换为时间范围
minute_s = start.add(minutes=temp_index * 15).format("HH:mm")
minute_e = start.add(minutes=index * 15).format("HH:mm")
dic2[temp_value].append(str(minute_s) + "-" + str(minute_e))
# 重置temp_value和temp_index
temp_value = value
temp_index = index
......@@ -73,6 +75,17 @@ async def proxy_power(cid_list):
return 0
async def proxy_power15(cid_list):
"""渠道版累计用电"""
sql = f"""
select sum(kwh) kwh from company_1day_power where cid in %s
"""
# 3.返回数据
async with MysqlUtil() as conn:
res = await conn.fetchone(sql=sql, args=(cid_list,))
return round(res.get("kwh", 0), 2) if res else 0
async def power_overview_proxy(date_start, date_end, cid_list):
"""渠道版, 抽离电量电费信息,供调用"""
pv1 = {} # 电量
......@@ -127,7 +140,7 @@ async def power_overview_proxy(date_start, date_end, cid_list):
}
}
}
log.info(query_body)
# 3. 查询es
async with EsUtil() as es:
......@@ -154,6 +167,26 @@ async def power_overview_proxy(date_start, date_end, cid_list):
return pv1, pv2
async def power_overview_proxy15(date_start, date_end, cid_list):
"""渠道版, 抽离电量电费信息,供调用"""
pv1 = {} # 电量
pv2 = {} # 电费
sql = f"""
select spfv,sum(p) as p,sum(kwh) as kwh,sum(charge) as charge
from company_15min_power
where cid in %s and create_time BETWEEN %s and %s
group by spfv
"""
async with MysqlUtil() as conn:
res = await conn.fetchall(sql=sql,
args=(cid_list, date_start, date_end))
# 4. 构造返回
for info in res:
pv1[info.get("spfv")] = round_2(info.get("kwh"))
pv2[info.get("spfv")] = round_2(info.get("charge"))
return pv1, pv2
def total_value(dict_total):
# spfv是对象
if not dict_total:
......@@ -222,6 +255,69 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
return kwh_list, charge_list, price_list
async def power_aggs_cid_proxy_new15(start, end, cid_list, date_type):
"""渠道版,电量电费信息,根据cid聚合,再聚合求出sum电量/电费"""
# 1. 求出上周期时间
start_last, end_last = last_time_str(start, end, date_type)
# 2. 获取es结果
re_this = await query_charge_new15(start, end, cid_list)
re_last = await query_charge_new15(start_last, end_last, cid_list)
if not re_this:
log.info(
f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
return [], [], []
re_last_dic = process_es_data(re_last, key="cid")
# 3. 构造返回
kwh_list = []
charge_list = []
price_list = []
# 3.1 查询出cid和工厂名对应关系
company_list = await company_by_cids(cid_list)
# 把cid提出来
com_dic = process_es_data(company_list, key="cid")
for info in re_this:
cid = info.get("cid")
cid_name = com_dic[cid]["shortname"]
kwh = round_2(info.get("kwh"))
if kwh == 0:
continue
# 上一周期如果没有数据, 此数据不参与统计
try:
kwh_last = re_last_dic[cid]["kwh"]
kwh_rate = round_4((kwh - kwh_last) / kwh_last)
if kwh_last == 0:
continue
except Exception as e:
log.error(e)
log.info(
f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
continue
charge = round_2(info.get("charge"))
try:
charge_last = re_last_dic[cid]["charge"]
charge_rate = round_4((charge - charge_last) / charge_last)
if charge_last == 0:
continue
except Exception as e:
log.error(e)
log.info("本次有数据, 上周期没有数据")
log.info(
f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
continue
price = round_2(charge / kwh)
price_last = round_2(charge_last / kwh_last)
price_rate = round_4((price - price_last) / price_last)
# 构造kwh
kwh_list.append({"name": cid_name, "value": kwh, "rate": kwh_rate})
charge_list.append(
{"name": cid_name, "value": charge, "rate": charge_rate})
price_list.append(
{"name": cid_name, "value": price, "rate": price_rate})
return kwh_list, charge_list, price_list
async def power_index_aggs_cid_proxy(start, end, cid_list, date_type):
"""power_aggs_cid_proxy缩减版, 没有增长率"""
# 1. 获取es结果
......@@ -247,7 +343,40 @@ async def power_index_aggs_cid_proxy(start, end, cid_list, date_type):
if kwh == 0:
continue
price = round_2(charge / kwh)
# 构造kwh
kwh_list.append({"name": cid_name, "value": kwh})
charge_list.append({"name": cid_name, "value": charge})
price_list.append({"name": cid_name, "value": price})
return kwh_list, charge_list, price_list
async def power_index_cid_proxy_new15(start, end, cid_list, date_type):
"""power_aggs_cid_proxy缩减版, 没有增长率"""
# 1. 获取es结果
res = await query_charge_new15(start, end, cid_list)
if not res:
log.info(
f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
return [], [], []
# 3. 构造返回
kwh_list = []
charge_list = []
price_list = []
# 3.1 查询出cid和工厂名对应关系
company_list = await company_by_cids(cid_list)
# 把cid提出来
com_dic = process_es_data(company_list, key="cid")
for info in res:
cid = info.get("cid")
cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
kwh = round_2(info.get("kwh"))
charge = round_2(info.get("charge"))
# 值为0不参与排名统计
if kwh == 0:
continue
price = round_2(charge / kwh)
# 构造kwh
kwh_list.append({"name": cid_name, "value": kwh})
charge_list.append({"name": cid_name, "value": charge})
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import COMPANY_15MIN_POWER, SLOTS, SLOTS_15MIN
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
SlotValue
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
power_charge_p_aggs
from unify_api.utils.common_utils import round_2
from unify_api.utils.es_query_body import EsQueryBody, es_process
from unify_api.utils.es_query_body import EsQueryBody, es_process, \
sql_time_process
from unify_api.utils.time_format import time_pick_transf
......@@ -65,6 +67,42 @@ async def proxy_today_yesterday_p(cid_list, start, end):
return sv
async def proxy_today_yesterday_p_new15(cid_list, start, end):
"""proxy首页今日和昨日负荷"""
sql = f"""
select create_time,sum(p) as p from company_15min_power
where cid in %s and create_time between %s and %s
group by create_time
order by create_time asc
"""
# 2.获取slots
intervel, slots = time_pick_transf(start, end)
# 3. 查询es
async with MysqlUtil() as conn:
res = await conn.fetchall(sql=sql, args=(cid_list, start, end))
if not res:
return SlotValue(slots=slots, value=[])
# 4.为了es结果和slots对应
es_re = sql_time_process(res, fmt="%H:%M", time_key='create_time')
sv = SlotValue() # 今日负荷对象
sv.slots = slots
tmp_list = []
# 5.拼接返回
for slot in slots:
if slot in es_re:
# 1.每个时间点,电量信息
value = es_re[slot].get("p")
# 值为0是正常数据
if value == 0:
tmp_list.append(0.0)
else:
tmp_list.append(value or "")
else:
tmp_list.append("")
sv.value = tmp_list
return sv
def by_slots(slots, es_re_dic):
# 拼接slot, value返回
kwh_list = []
......
......@@ -20,8 +20,8 @@ from unify_api.modules.elec_charge.components.elec_charge_cps import \
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, get_kwh_charge, query_charge_aggs_points_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy, \
power_index_aggs_cid_proxy
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy_new15, \
power_index_cid_proxy_new15, power_overview_proxy15
from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service
from unify_api.utils.common_utils import round_2, round_4, NumListHelper
......@@ -151,7 +151,7 @@ async def post_price_policy(req, body: PricePolicyReq) -> PricePolicyResp:
quarters = price.get("quarters")
price_info_dic = price
break
if len(quarters) != 96:
log.error("quarters config fail")
return PricePolicyResp(price_info=[])
......@@ -191,11 +191,13 @@ async def aver_elec_price_new15(start, end, point_id, cid, date_type):
return this_ck, None
# 上周期电量电费
start_last, end_last = last_time_str(start, end, date_type)
last_datas = await get_kwh_charge(table_name, name, value, start_last, end_last)
last_datas = await get_kwh_charge(table_name, name, value, start_last,
end_last)
last_ck = ChargeKwh()
last_sum_charge = last_datas.get("sum_charge")
last_sum_kwh = last_datas.get("sum_kwh")
this_ck.charge = round(last_sum_charge, 2) if last_sum_charge else last_sum_charge
this_ck.charge = round(last_sum_charge,
2) if last_sum_charge else last_sum_charge
this_ck.kwh = round(last_sum_kwh, 2) if last_sum_kwh else last_sum_kwh
return this_ck, last_ck
......@@ -311,8 +313,8 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
end = now_date
# 获取上一周期开始结束时间
start_last, end_last = last_time_str(start, end, date_type)
power, charge = await power_overview_proxy(start, end, cid_list)
power_last, charge_last = await power_overview_proxy(start_last, end_last,
power, charge = await power_overview_proxy15(start, end, cid_list)
power_last, charge_last = await power_overview_proxy15(start_last, end_last,
cid_list)
if not all([power, charge, power_last, charge_last]):
return PopResp(power=Spvf(), charge=Spvf())
......@@ -363,30 +365,30 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
"month")
this_month_p, this_month_charge = await power_overview_proxy(
this_month_p, this_month_charge = await power_overview_proxy15(
month_start, month_end, cid_list)
last_month_p, last_month_charge = await power_overview_proxy(
last_month_p, last_month_charge = await power_overview_proxy15(
last_month_start, last_month_end, cid_list)
if not all([this_month_p, this_month_charge, last_month_p,
last_month_charge]):
return MtpResp()
this_month_total_power = total_value(this_month_p)
last_month_total_power = total_value(last_month_p)
month_power_rate = (this_month_total_power -
last_month_total_power) / last_month_total_power
# 2. 今日/昨日数据
last_day_start, last_day_end = last_time_str(today_start, today_end, "day")
this_day_p, this_day_charge = await power_overview_proxy(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy(
this_day_p, this_day_charge = await power_overview_proxy15(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy15(
last_day_start, last_day_end, cid_list)
if not all([this_day_p, this_day_charge, last_day_p, last_day_charge]):
return MtpResp()
this_day_total_power = total_value(this_day_p)
last_day_total_power = total_value(last_day_p)
day_power_rate = \
......@@ -410,7 +412,7 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
date_type = body.date_type
if date_type == "range":
date_type = "month"
# 如果end是今天, 则end=当前时间, 避免增长率错误
end_tmp = end.split(" ")[0]
now_date, timestamp = srv_time()
......@@ -424,9 +426,8 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
end = now_date
# 2. 查询工厂电量电费信息
kwh_list, charge_list, price_list = await power_aggs_cid_proxy(start, end,
cid_list,
date_type)
kwh_list, charge_list, price_list = await power_aggs_cid_proxy_new15(
start, end, cid_list, date_type)
kwh_list_st = sorted(kwh_list, key=lambda i: i['value'], reverse=True)
charge_list_st = sorted(charge_list, key=lambda i: i['value'],
reverse=True)
......@@ -449,7 +450,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
return IpspResp()
today_start, today_end, month_start, month_end = today_month_date()
# 2. 获取今日数据
kwh_list_d, charge_list_d, price_list_d = await power_index_aggs_cid_proxy(
kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy_new15(
today_start, today_end, cid_list, "day")
kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'],
reverse=True)[:5]
......@@ -458,7 +459,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
price_list_d_st = sorted(price_list_d, key=lambda i: i['value'],
reverse=True)[:5]
# 2. 获取本月数据
kwh_list_m, charge_list_m, price_list_m = await power_index_aggs_cid_proxy(
kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy_new15(
month_start, month_end, cid_list, "month")
kwh_list_m_st = sorted(kwh_list_m, key=lambda i: i['value'],
reverse=True)[:5]
......
......@@ -17,8 +17,9 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
power_charge_p_aggs, power_charge_p_cid_aggs, histogram_aggs_points, \
power_charge_p_point_aggs
from unify_api.modules.elec_charge.procedures.elec_statis_proxy_pds import \
proxy_today_yesterday_p, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, power_charge
proxy_today_yesterday_p_new15, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, \
power_charge, power_charge_new15
from unify_api.utils.common_utils import process_es_data
from unify_api.utils.es_query_body import es_process
from unify_api.utils.time_format import last_time_str, proxy_power_slots, \
......@@ -49,26 +50,26 @@ async def post_power_statis_proxy(req,
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p(cid_list, start, end)
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p(cid_list,
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge(cid_list, point_id,
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, point_id,
last_start,
last_end,
date_type)
......@@ -78,7 +79,7 @@ async def post_power_statis_proxy(req,
last_aver_price=last_aver_price)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -91,17 +92,17 @@ async def post_power_statis_proxy(req,
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
# 负荷曲线
this_p = await proxy_today_yesterday_p(cid_list, start,
this_p = await proxy_today_yesterday_p_new15(cid_list, start,
end)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
......@@ -359,26 +360,26 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
date_type = "month"
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, -1, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p(cid_list, start, end)
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p(cid_list,
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge(cid_list, -1, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge(cid_list, -1,
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, -1,
last_start,
last_end,
date_type)
......
......@@ -13,7 +13,7 @@ from unify_api.modules.common.procedures.points import proxy_points, get_points
from unify_api.modules.common.procedures.power_cps import power_use_count, \
power_use_count_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
proxy_power
proxy_power, proxy_power15
from unify_api.modules.home_page.components.count_info_proxy_cps import (
CountInfoProxyResp,
ProxySecurityLevelCntResp,
......@@ -277,7 +277,7 @@ async def post_zhidian_info_proxy(req, body: ProductProxyReq) -> AipResp:
# 2. 监测点位
total_monitor = await proxy_points(cid_list)
# 3. 累计监测用电
total_power = await proxy_power(cid_list)
total_power = await proxy_power15(cid_list)
# 4. 用户接入总时长, 每个工厂接入时长总和
total_run_day = await total_run_day_proxy(cid_list)
return AipResp(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment