import json import pendulum import io import pandas as pd from pot_libs.common.components.query import PageRequest from pot_libs.logger import log from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.sanic_api import summary, examples from pot_libs.utils.exc_util import ParamException from unify_api.constants import PRODUCT from unify_api.modules.common.components.common_cps import LevelResp from unify_api.modules.common.dao.common_dao import company_by_cids from unify_api.modules.common.procedures.cids import get_proxy_cids from unify_api.modules.common.procedures.points import points_by_storeys from unify_api.modules.elec_charge.components.elec_charge_cps import \ power_overview_example, PricePolicyReq, PricePolicyResp, \ PricePolicy, AverPriceReq, PowerViewRes, Spvf, AverPriceResp, ChargeKwh, \ IndexChargeReq, IndexChargeResp, PopReq, PopResp, MtpResp, PspResp, \ IpspResp, KpReq, KpResp, KclReq, ProductProxyReq, LoadInfoReq, LoadInfoResp from unify_api.modules.elec_charge.dao.elec_charge_dao import \ get_kwh_charge, query_charge_aggs_points from unify_api.modules.elec_charge.procedures.elec_charge_pds import \ quarters_trans, total_value, \ power_aggs_cid_proxy, power_index_cid_proxy, power_overview_proxy from unify_api.modules.elec_charge.service.elec_charge_service import \ kwh_points_service, kwh_card_level_service, load_info_service from unify_api.modules.users.procedures.jwt_user import jwt_user from unify_api.utils.common_utils import round_2, round_4, NumListHelper from unify_api.utils.request_util import filed_value_from_list from unify_api.utils.time_format import last_time_str, today_month_date, \ srv_time async def power_overview(start, end, point_id, cid): if point_id == -1: # 选的全部 sql = f"SELECT spfv, sum(p), sum(kwh), sum(charge) " \ f"FROM company_15min_power where create_time " \ f"BETWEEN '{start}' and '{end}' AND cid=%s GROUP BY spfv" async with MysqlUtil() as conn: datas = await conn.fetchall(sql=sql, args=(cid,)) else: sql = f"SELECT spfv, sum(p), sum(kwh), sum(charge) " \ f"FROM point_15min_power where create_time " \ f"BETWEEN '{start}' and '{end}' AND pid=%s GROUP BY spfv" async with MysqlUtil() as conn: datas = await conn.fetchall(sql=sql, args=(point_id,)) pv1 = Spvf() # 电量对象 pv2 = Spvf() # 电费对象 if not datas: return pv1, pv2 # 4. 构造返回 for data in datas: if data.get("spfv") == "s": pv1.s = round(data.get("sum(kwh)"), 2) pv2.s = round(data.get("sum(charge)"), 2) elif data.get("spfv") == "p": pv1.p = round(data.get("sum(kwh)"), 2) pv2.p = round(data.get("sum(charge)"), 2) elif data.get("spfv") == "f": pv1.f = round(data.get("sum(kwh)"), 2) pv2.f = round(data.get("sum(charge)"), 2) elif data.get("spfv") == "v": pv1.v = round(data.get("sum(kwh)"), 2) pv2.v = round(data.get("sum(charge)"), 2) return pv1, pv2 @summary('电量电费信息') @examples(power_overview_example) async def post_power_overview(req, body: PageRequest) -> PowerViewRes: """电量电费信息""" # todo 获取参数是否需要修改 # 1. 获取参数 point_id = filed_value_from_list(body.filter.equals, "point_id") cid = filed_value_from_list(body.filter.equals, "cid") date_start = body.filter.ranges[0].start date_end = body.filter.ranges[0].end if date_start == date_end: return PowerViewRes(power=Spvf(), charge=Spvf()) pv1, pv2 = await power_overview(date_start, date_end, point_id, cid) return PowerViewRes(power=pv1, charge=pv2) @summary('分时电价') async def post_price_policy(req, body: PricePolicyReq) -> PricePolicyResp: """分时电价""" cid = body.cid # 1. 查询price_policy表 sql = "select time_range, quarters, price_s, price_p, price_f, price_v " \ "from price_policy where cid = %s ORDER BY start_month DESC" async with MysqlUtil() as conn: price_info = await conn.fetchall(sql=sql, args=(cid,)) if not price_info: return PricePolicyResp(price_info=[]) # 2. 解析判断月份 quarters = "" price_info_dic = None for price in price_info: time_range = price.get("time_range") time_list = NumListHelper.parse_comma_delimited_num(time_range) now_month = pendulum.now().month if now_month in time_list: quarters = price.get("quarters") price_info_dic = price break if len(quarters) != 96: log.error("quarters config fail") return PricePolicyResp(price_info=[]) # 2. 分时电价转换工具 dic_re = quarters_trans(quarters) # 3. 构造返回 res_list = [] for i in "spfv": pp = PricePolicy() pp.term = i # spfv -> 尖峰平谷 pp.period = dic_re.get(i) # 收费时段 pp.duration = dic_re.get(i + "dt") # 收费时长 ele_price = price_info_dic.get("price_" + i) pp.ele_price = round(ele_price, 2) if ele_price else None if not pp.duration or not pp.ele_price: continue res_list.append(pp) return PricePolicyResp(price_info=res_list) async def avg_ele_price(start, end, point_id, cid, date_type): if point_id == -1: # 选的全部 table_name = "company_15min_power" name = "cid" value = cid else: table_name = "point_15min_power" name = "pid" value = point_id this_datas = await get_kwh_charge(table_name, name, value, start, end) this_ck = ChargeKwh() sum_charge = this_datas.get("sum_charge") sum_kwh = this_datas.get("sum_kwh") this_ck.charge = round(sum_charge, 2) if sum_charge else sum_charge this_ck.kwh = round(sum_kwh, 2) if sum_kwh else sum_kwh if date_type == "range": return this_ck, None # 上周期电量电费 start_last, end_last = last_time_str(start, end, date_type) last_datas = await get_kwh_charge(table_name, name, value, start_last, end_last) last_ck = ChargeKwh() last_sum_charge = last_datas.get("sum_charge") last_sum_kwh = last_datas.get("sum_kwh") last_ck.charge = round(last_sum_charge, 2) if last_sum_charge else last_sum_charge last_ck.kwh = round(last_sum_kwh, 2) if last_sum_kwh else last_sum_kwh return this_ck, last_ck @summary('平均电价和增长率') async def post_aver_elec_price(req, body: AverPriceReq) -> AverPriceResp: """平均电价, 增长率""" # 1.获取参数 cid = body.cid point_id = body.point_id start = body.start end = body.end date_type = body.date_type if start == end: return AverPriceResp(this_power=ChargeKwh(), last_power=ChargeKwh()) this_ck, last_ck = await avg_ele_price(start, end, point_id, cid, date_type) return AverPriceResp(this_power=this_ck, last_power=last_ck) @summary('首页今日本月电量电费') async def post_index_charge(req, body: IndexChargeReq) -> IndexChargeResp: cid = body.cid pid = body.point_id s_today, e_today, s_month, e_month = today_month_date() # 1. 今日电量电费spvf kwh_t, charge_t = await power_overview(s_today, e_today, pid, cid) today_spvf = PowerViewRes(power=kwh_t, charge=charge_t) # 2. 本月电量电费spvf kwh_m, charge_m = await power_overview(s_month, e_month, pid, cid) month_spvf = PowerViewRes(power=kwh_m, charge=charge_m) # 3. 今日平均电价和增长率 this_ck_t, last_ck_t = await avg_ele_price(s_today, e_today, pid, cid, date_type="day") today_power = AverPriceResp(this_power=this_ck_t, last_power=last_ck_t) # 4. 本月平均电价和增长率 this_ck_m, last_ck_m = await avg_ele_price(s_month, e_month, pid, cid, date_type="month") month_power = AverPriceResp(this_power=this_ck_m, last_power=last_ck_m) return IndexChargeResp(today_spvf=today_spvf, month_spvf=month_spvf, today_power=today_power, month_power=month_power) @summary('电量电费信息-管理版') async def post_power_overview_proxy(req, body: PopReq) -> PopResp: # 1. 获取参数 cid_list = body.cid_list proxy_id = body.proxy_id host = req.host product = PRODUCT.get(host) user_id = jwt_user(req) # 全部工厂 if not cid_list: log.info(f"power_overview_proxy根据用户userId:{user_id} " f"product:{product}查询所有工厂, proxy:{proxy_id}") cid_list = await get_proxy_cids(user_id, product, proxy_id) start = body.start end = body.end date_type = body.date_type if date_type == "range": date_type = "month" # 如果end是今天, 则end=当前时间, 避免增长率错误 end_tmp = end.split(" ")[0] now_date, timestamp = srv_time() now_tmp = now_date.split(" ")[0] if now_tmp == end_tmp: end = now_date # 如果date_type是month, 且end是本月, 则end=当前时间 now = pendulum.now() month_end = str(now.end_of('month').format("YYYY-MM-DD HH:mm:ss")) if date_type == "month" and end == month_end: end = now_date # 获取上一周期开始结束时间 start_last, end_last = last_time_str(start, end, date_type) power, charge = await power_overview_proxy(start, end, cid_list) power_last, charge_last = await power_overview_proxy(start_last, end_last, cid_list) if not all([power, charge, power_last, charge_last]): return PopResp(power=Spvf(), charge=Spvf()) total_power = total_value(power) total_charge = total_value(charge) total_power_last = total_value(power_last) total_charge_last = total_value(charge_last) # 增长率 power_rate = (total_power - total_power_last) / total_power_last \ if total_charge_last else 0 charge_rate = (total_charge - total_charge_last) / total_charge_last \ if total_charge_last else 0 # 平均电价, 对比上期 avg_price = (total_charge / total_power) if total_power else 0 avg_price_last = (total_charge_last / total_power_last) \ if total_power_last else 0 price_rate = (avg_price - avg_price_last) / avg_price_last \ if avg_price_last else 0 return PopResp(power=power, charge=charge, this_power=round_2(total_power), this_charge=round_2(total_charge), last_power=round_2(total_power_last), last_charge=round_2(total_charge_last), power_rate=round(power_rate, 4), charge_rate=round(charge_rate, 4), avg_price=round_2(avg_price), price_rate=round(price_rate, 4) ) @summary('本月今日用电-管理版首页') async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp: # 1. 获取参数 host = req.host product = PRODUCT.get(host) user_id = jwt_user(req) # cid_list = await get_cids(user_id, product) proxy_id = body.proxy_id cid_list = await get_proxy_cids(user_id, product, proxy_id) \ if proxy_id else None # 全部工厂 if not cid_list: log.info(f"未查询到工厂userId:{user_id},product:{product}" f",proxy_id:{proxy_id}") return MtpResp() s_today, e_today, s_month, e_month = today_month_date() # 2. 本月/上月数据 last_month_start, last_month_end = last_time_str(s_month, e_month, "month", True) this_month_p, this_month_charge = await power_overview_proxy( s_month, e_month, cid_list) last_month_p, last_month_charge = await power_overview_proxy( last_month_start, last_month_end, cid_list) if not all([this_month_p, this_month_charge, last_month_p, last_month_charge]): return MtpResp() this_month_total_power = total_value(this_month_p) last_month_total_power = total_value(last_month_p) month_power_rate = (this_month_total_power - last_month_total_power) / last_month_total_power # 2. 今日/昨日数据 last_day_start, last_day_end = last_time_str(s_today, e_today, "day") this_day_p, this_day_charge = await power_overview_proxy(s_today, e_today, cid_list) last_day_p, last_day_charge = await power_overview_proxy(last_day_start, last_day_end, cid_list) if not all([this_day_p, this_day_charge, last_day_p, last_day_charge]): return MtpResp() this_day_total_power = total_value(this_day_p) last_day_total_power = total_value(last_day_p) day_power_rate = \ (this_day_total_power - last_day_total_power) / last_day_total_power \ if last_day_total_power else 0 return MtpResp(this_month_power=round_2(this_month_total_power), last_month_power=round_2(last_month_total_power), month_power_rate=round_4(month_power_rate), this_day_power=round_2(this_day_total_power), last_day_power=round_2(last_day_total_power), day_power_rate=round_4(day_power_rate) ) @summary('用电排名-管理版') async def post_power_sort_proxy(req, body: PopReq) -> PspResp: # 1. 获取参数 cid_list = body.cid_list start = body.start end = body.end date_type = body.date_type if date_type == "range": date_type = "month" # 如果end是今天, 则end=当前时间, 避免增长率错误 end_tmp = end.split(" ")[0] now_date, timestamp = srv_time() now_tmp = now_date.split(" ")[0] if now_tmp == end_tmp: end = now_date # 如果date_type是month, 且end是本月, 则end=当前时间 now = pendulum.now() month_end = str(now.end_of('month').format("YYYY-MM-DD HH:mm:ss")) if date_type == "month" and end == month_end: end = now_date # 2. 查询工厂电量电费信息 kwh_list, charge_list, price_list = await power_aggs_cid_proxy(start, end, cid_list, date_type) kwh_list_st = sorted(kwh_list, key=lambda i: i['value'], reverse=True) charge_list_st = sorted(charge_list, key=lambda i: i['value'], reverse=True) price_list_st = sorted(price_list, key=lambda i: i['value'], reverse=True) return PspResp(kwh=kwh_list_st, charge=charge_list_st, price=price_list_st) @summary('首页本月今日-用电排名-管理版') async def post_index_power_sort_proxy(req) -> IpspResp: # 1. 获取参数 product = PRODUCT.get(req.host) user_id = req.ctx.user_id proxy_id = req.json.get("proxy_id") # cid_list = await get_cids(user_id, product) cid_list = await get_proxy_cids(user_id, product, proxy_id) \ if proxy_id else None # 全部工厂 if not cid_list: log.info(f"未查询到工厂, userId:{user_id} product:{product}") return IpspResp() s_today, e_today, s_month, e_month = today_month_date() # 2. 获取今日数据 kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy( s_today, e_today, cid_list, "day") kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'], reverse=True)[ :5] charge_list_d_st = sorted(charge_list_d, key=lambda i: i['value'], reverse=True)[:5] price_list_d_st = sorted(price_list_d, key=lambda i: i['value'], reverse=True)[:5] # 2. 获取本月数据 kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy( s_month, e_month, cid_list, "month") kwh_list_m_st = sorted(kwh_list_m, key=lambda i: i['value'], reverse=True)[:5] charge_list_m_st = sorted(charge_list_m, key=lambda i: i['value'], reverse=True)[:5] price_list_m_st = sorted(price_list_m, key=lambda i: i['value'], reverse=True)[:5] return IpspResp(day_kwh=kwh_list_d_st, month_kwh=kwh_list_m_st, day_charge=charge_list_d_st, month_charge=charge_list_m_st, day_price=price_list_d_st, month_price=price_list_m_st ) @summary('电量统计-监测点') async def post_kwh_points(req, body: KpReq) -> KpResp: """监测点电量""" cid = body.cid storeys = body.storeys start = body.start end = body.end return await kwh_points_service(cid, start, end, storeys) @summary('电量统计-监测点-下载excel') async def get_kwh_points_download(req): output = io.BytesIO() writer = pd.ExcelWriter(output, engine='xlsxwriter') # 1. 获取参数 args = req.args cid = args.get("cid") storeys = json.loads(args.get("storeys")) start = args.get("start") end = args.get("end") # 2.根据storeys获取points信息 point_list = await points_by_storeys(storeys) # 获取point_id列表 points = [i.get("point_id") for i in point_list] # 3. es查询数据 es_res = await query_charge_aggs_points(start, end, points) es_res = {i["pid"]: i for i in es_res if es_res} # 4.返回数据 storey_name_list = [] room_name_list = [] kwh_list = [] for info in point_list: storey_name = info.get("storey_name") point_id = info.get("point_id") room_name = info.get("room_name") if point_id in es_res: kwh = round_2(es_res[point_id]["kwh"]) else: kwh = "" storey_name_list.append(storey_name) room_name_list.append(room_name) kwh_list.append(kwh) dict_tmp = { "楼层/楼栋": storey_name_list, "户号": room_name_list, "电量(kWh)": kwh_list, } df = pd.DataFrame(dict_tmp) df.to_excel(writer, sheet_name="电量信息", index=False) writer.save() output.seek(0) # 获取cid_name cid_info = await company_by_cids([cid]) if cid_info: cid_name = cid_info[0]["shortname"] else: raise ParamException(message=f"cid找不到工厂") excel_name = f"{cid_name}- {start[:10]}~{end[:10]}电量信息.xlsx" return output, excel_name @summary("电量电费-卡片信息-level") async def post_kwh_card_level(req, body: KclReq) -> LevelResp: cid = body.cid point_list = body.point_list start = body.start end = body.end return await kwh_card_level_service(cid, point_list, start, end) @summary("获取知电管理版首页负荷信息") async def post_load_info(request, body: LoadInfoReq) -> LoadInfoResp: # 1. 获取company_id # 1. 获取参数 product = PRODUCT.get(request.host) user_id = jwt_user(request) proxy_id = body.proxy_id cid_list = await get_proxy_cids(user_id, product, proxy_id) \ if proxy_id else None # 全部工厂 if not cid_list: log.info(f"未查询到工厂, userId:{user_id} product:{product}") return LoadInfoResp() try: # 实时负荷,昨日同时负荷,对比昨日 current_load, yesterday_load, load_percent = await load_info_service( cid_list) except Exception as e: log.exception(e) return LoadInfoResp().server_error() return LoadInfoResp( current_load=current_load, yesterday_load=yesterday_load, load_percent=load_percent )