# -*- coding:utf-8 -*-
"""HTTP handlers for electric monitoring and power-quality endpoints.

Each ``post_*`` coroutine validates the request body and delegates to a
service function. ``elec_history_service`` and ``qual_history_service`` are
implemented here and build the history-curve responses from DAO rows.
"""
from dataclasses import fields

from pot_libs.sanic_api import summary, description, examples
from pot_libs.logger import log
from pot_libs.utils.exc_util import ParamException, BusinessException
from unify_api.modules.common.components.common_cps import CidPointsReq
from unify_api.modules.common.procedures import health_score
from unify_api.modules.electric.service.electric_service import (
    elec_current_storeys_service, qual_current_storeys_service,
    elec_card_level_service, qual_current_level_service, elec_index_service,
    elec_current_service
)
from unify_api.utils import time_format
from unify_api.modules.electric.procedures.electric_util import (
    load_point_ctnum, add_random_change
)
from pot_libs.common.components.query import PageRequest
from unify_api.modules.electric.components.electric import (
    ElecHistoryResponse,
    ElecCurrentResponse,
    qual_current_example,
    QualHistoryResponse,
    qual_history__example,
    ThreePhaseImbalance,
    QualCurrentResponse,
    VoltageHarmonicRate,
    CurrentHarmonicRate,
    VoltageDev,
    Power,
    U,
    I,
    elec_current_example,
    ElecIndexResponse,
    elec_index_example,
    elec_history_example,
    VoltageHarmonic,
    CurrentHarmonic,
    EcsReq,
    EscResp,
    QcsResp,
    EclResp,
    QclResp,
)
from unify_api.utils.request_util import filed_value_from_list
from unify_api.modules.electric.dao.electric_dao import (
    get_qual_history_dao, get_elec_history_dao
)

# Log text used whenever the request carries no usable point_id.
_NO_POINT_ID_MSG = "param exception, equals is NULL, no point_id"

# Columns read for the electric history curves, per CT count.
_ELEC_ITEMS_2CT = (
    "lf_mean", "pttl_mean", "qttl_mean", "costtl_mean",
    "uab_mean", "ucb_mean", "ia_mean", "ic_mean", "freq_mean",
)
_ELEC_ITEMS_3CT = (
    "lf_mean", "pttl_mean", "qttl_mean", "costtl_mean",
    "ua_mean", "ub_mean", "uc_mean",
    "ia_mean", "ib_mean", "ic_mean", "freq_mean",
)

# Columns read for the power-quality history curves, per CT count.
# The leading fdi* columns are used to compute current harmonic RMS values.
_QUAL_ITEMS_2CT = (
    "fdia_mean", "fdic_mean",
    "uab_dev_mean", "ucb_dev_mean",
    "freq_dev_mean", "ubl_mean", "ibl_mean",
    "thduab_mean", "thducb_mean", "thdia_mean", "thdic_mean",
    "hr3ia_mean", "hr5ia_mean", "hr7ia_mean",
    "hr9ia_mean", "hr11ia_mean", "hr13ia_mean",
    "hr3ic_mean", "hr5ic_mean", "hr7ic_mean",
    "hr9ic_mean", "hr11ic_mean", "hr13ic_mean",
    "hr3uab_mean", "hr5uab_mean", "hr7uab_mean",
    "hr9uab_mean", "hr11uab_mean", "hr13uab_mean",
    "hr3ucb_mean", "hr5ucb_mean", "hr7ucb_mean",
    "hr9ucb_mean", "hr11ucb_mean", "hr13ucb_mean",
)
_QUAL_ITEMS_3CT = (
    "fdia_mean", "fdib_mean", "fdic_mean",
    "ua_dev_mean", "ub_dev_mean", "uc_dev_mean",
    "freq_dev_mean", "ubl_mean", "ibl_mean",
    "thdua_mean", "thdub_mean", "thduc_mean",
    "thdia_mean", "thdib_mean", "thdic_mean",
    "hr3ia_mean", "hr5ia_mean", "hr7ia_mean",
    "hr9ia_mean", "hr11ia_mean", "hr13ia_mean",
    "hr3ib_mean", "hr5ib_mean", "hr7ib_mean",
    "hr9ib_mean", "hr11ib_mean", "hr13ib_mean",
    "hr3ic_mean", "hr5ic_mean", "hr7ic_mean",
    "hr9ic_mean", "hr11ic_mean", "hr13ic_mean",
    "hr3ua_mean", "hr5ua_mean", "hr7ua_mean",
    "hr9ua_mean", "hr11ua_mean", "hr13ua_mean",
    "hr3ub_mean", "hr5ub_mean", "hr7ub_mean",
    "hr9ub_mean", "hr11ub_mean", "hr13ub_mean",
    "hr3uc_mean", "hr5uc_mean", "hr7uc_mean",
    "hr9uc_mean", "hr11uc_mean", "hr13uc_mean",
)


def _require_date_range(body):
    """Return ``(start, end)`` of the first range filter in *body*.

    Raises:
        ParamException: if ``body.filter.ranges[0]`` cannot be read.
    """
    try:
        first_range = body.filter.ranges[0]
        return first_range.start, first_range.end
    except Exception as exc:
        log.error("param error, ranges is NULL")
        raise ParamException(message="param error, ranges is NULL") from exc


def _require_time_slots(date_start, date_end):
    """Return ``(intervel, slots)`` from ``time_format.time_pick_transf``.

    Raises:
        ParamException: if ``time_pick_transf`` raises for the given dates.
    """
    try:
        intervel, slots = time_format.time_pick_transf(date_start, date_end)
    except Exception as exc:
        log.error("param error, date format error")
        raise ParamException(
            message="param error, date format error") from exc
    return intervel, slots


def _require_point_id(body, error_message=_NO_POINT_ID_MSG):
    """Return the ``point_id`` equals-filter value of *body*.

    The falsy check runs before the ``<= 0`` comparison so that a missing
    value (``None``) is reported as a ParamException instead of raising
    ``TypeError`` from comparing ``None`` with an int.

    Raises:
        ParamException: with *error_message* if the value is falsy or <= 0.
    """
    point_id = filed_value_from_list(body.filter.equals, "point_id")
    if not point_id or point_id <= 0:
        log.warning(_NO_POINT_ID_MSG)
        raise ParamException(message=error_message)
    return point_id


def _history_table(intervel):
    """Return ``(table_name, date_fmt)`` for the given interval.

    An interval of 900 selects the 15-minute table; anything else selects
    the 1-day table.
    """
    # NOTE(review): the doubled "%" presumably survives a later
    # %-formatting of the SQL text in the DAO -- confirm against the DAO.
    if intervel == 900:
        return "point_15min_electric", "%%H:%%i"
    return "point_1day_electric", "%%m-%%d"


def _single_phase_sdu(candidates):
    """Return ``(sdu_i, sdu_u)`` when exactly one candidate series has data.

    Args:
        candidates: iterable of ``(i_name, u_name, series)`` tuples.

    Returns:
        The ``(i_name, u_name)`` of the only candidate whose series contains
        a truthy value, or ``(None, None)`` when zero or several do.
    """
    active = [
        (i_name, u_name)
        for i_name, u_name, series in candidates
        if any(series)
    ]
    if len(active) == 1:
        return active[0]
    return None, None


def _build_from_series(cls, series):
    """Instantiate dataclass *cls* from the matching entries of *series*.

    Each key has its last ``_``-separated suffix stripped (e.g.
    ``"hr3ia_mean"`` -> ``"hr3ia"``); entries whose stripped name is a
    field of *cls* are passed as keyword arguments.
    """
    field_names = {field.name for field in fields(cls)}
    kwargs = {}
    for key, values in series.items():
        name = key.rsplit("_", 1)[0]
        if name in field_names:
            kwargs[name] = values
    return cls(**kwargs)


async def _load_realtime(point_id, caller):
    """Load realtime data for *point_id* and apply random jitter.

    Args:
        point_id: point to query through ``elec_current_service``.
        caller: handler name used as prefix of the error log line.

    Returns:
        ``(time_str, data)`` where *data* is a dict (empty if the service
        returned a falsy value).

    Raises:
        BusinessException: wrapping any exception raised while loading.
    """
    try:
        time_str, data = await elec_current_service(point_id)
        data = data or {}
        # Add small random changes (so the data does not look frozen).
        # Only existing keys are reassigned, so the dict size is stable
        # during iteration.
        for key, value in data.items():
            if isinstance(value, (str, int)):
                continue
            data[key] = add_random_change(value)
    except Exception as e:
        log.error(f"{caller} service error:{str(e)}")
        raise BusinessException(message=str(e)) from e
    return time_str, data


@summary("用电监测-实时监测-历史曲线")
@description("包含负载率、有功/无功、功率因素、电压、电流、频率曲线")
@examples(elec_history_example)
async def post_elec_history(req, body: PageRequest) -> ElecHistoryResponse:
    """Return electric history curves for one point.

    Raises:
        ParamException: on a missing date range, an unparsable date range,
            or a missing / non-positive point_id.
    """
    # 1. Get the interval and the time axis.
    date_start, date_end = _require_date_range(body)
    intervel, slots = _require_time_slots(date_start, date_end)
    point_id = _require_point_id(body)
    return await elec_history_service(date_start, date_end, point_id,
                                      intervel, slots)


async def elec_history_service(start, end, pid, intervel, slots):
    """Build an ``ElecHistoryResponse`` for point *pid* over *slots*.

    Args:
        start, end: range bounds forwarded to ``get_elec_history_dao``.
        pid: point id.
        intervel: 900 selects the 15-minute table, otherwise the 1-day one.
        slots: time-axis labels; one value per slot is emitted per series,
            with ``""`` where no row or no value exists.
    """
    ctnum = await load_point_ctnum(pid)
    if ctnum == 2:
        stats_items = _ELEC_ITEMS_2CT
    else:
        # Any CT count other than 2 is handled as 3.
        ctnum = 3
        stats_items = _ELEC_ITEMS_3CT

    table_name, date_fmt = _history_table(intervel)
    rows = await get_elec_history_dao(table_name, pid, start, end, date_fmt)
    rows_by_slot = {row["date_time"]: row for row in rows}

    elec_data = {stats_item: [] for stats_item in stats_items}
    for slot in slots:
        if slot in rows_by_slot:
            row = rows_by_slot[slot]
            for stats_item in stats_items:
                value = row.get(stats_item)
                elec_data[stats_item].append("" if value is None else value)
        else:
            for stats_item in stats_items:
                elec_data[stats_item].append("")

    # For "Shidian U" devices only one item has data; return that item.
    if ctnum == 2:
        u = U(uab=elec_data["uab_mean"], ucb=elec_data["ucb_mean"])
        i = I(ia=elec_data["ia_mean"], ic=elec_data["ic_mean"])
        sdu_i, sdu_u = _single_phase_sdu((
            ("ia", "uab", elec_data["uab_mean"]),
            ("ic", "ucb", elec_data["ucb_mean"]),
        ))
    else:
        u = U(ua=elec_data["ua_mean"], ub=elec_data["ub_mean"],
              uc=elec_data["uc_mean"])
        i = I(ia=elec_data["ia_mean"], ib=elec_data["ib_mean"],
              ic=elec_data["ic_mean"])
        sdu_i, sdu_u = _single_phase_sdu((
            ("ia", "ua", elec_data["ua_mean"]),
            ("ib", "ub", elec_data["ub_mean"]),
            ("ic", "uc", elec_data["uc_mean"]),
        ))

    return ElecHistoryResponse(
        ctnum=ctnum,
        lf=elec_data["lf_mean"],
        power=Power(p=elec_data["pttl_mean"], q=elec_data["qttl_mean"]),
        costtl=elec_data["costtl_mean"],
        u=u,
        i=i,
        freq=elec_data["freq_mean"],
        time_slots=slots,
        sdu_i=sdu_i,
        sdu_u=sdu_u,
    )


@summary("用电监测-实时监测-实时参数")
@examples(elec_current_example)
async def post_elec_current(req, body: PageRequest) -> ElecCurrentResponse:
    """Return realtime electric parameters for one point.

    Raises:
        ParamException: on a missing / non-positive point_id.
        BusinessException: if loading the realtime data fails.
    """
    point_id = _require_point_id(
        body, error_message="param exception, equals NULL, no pid")
    time_str, res = await _load_realtime(point_id, "post_elec_current")
    field_names = {field.name for field in fields(ElecCurrentResponse)}
    return ElecCurrentResponse(
        real_time=time_str,
        **{k: v for k, v in res.items() if k in field_names},
    )


@summary("指标统计-指标统计-常规参数+电能质量")
@examples(elec_index_example)
async def post_elec_index(req, body: PageRequest) -> ElecIndexResponse:
    """Return regular-parameter and power-quality statistics for one point.

    Raises:
        ParamException: on a missing / non-positive point_id or a missing
            date range.
    """
    cid = req.json.get("cid")
    # 1. Get the point_id.
    point_id = _require_point_id(body)
    # 3. Get regular-parameter and power-quality statistics.
    # Get the time range.
    date_start, date_end = _require_date_range(body)
    return await elec_index_service(cid, point_id, date_start, date_end)


@summary("电能质量-历史曲线")
@description("包含电压偏差、频率偏差、三相不平衡、谐波畸变率曲线")
@examples(qual_history__example)
async def post_qual_history(req, body: PageRequest) -> QualHistoryResponse:
    """Return power-quality history curves for one point.

    Raises:
        ParamException: on a missing date range, an unparsable date range,
            or a missing / non-positive point_id.
    """
    # 1. Get the interval and the time axis.
    date_start, date_end = _require_date_range(body)
    intervel, slots = _require_time_slots(date_start, date_end)
    point_id = _require_point_id(body)
    return await qual_history_service(date_start, date_end, intervel, slots,
                                      point_id)


async def qual_history_service(start, end, intervel, slots, pid):
    """Build a ``QualHistoryResponse`` for point *pid* over *slots*.

    Args:
        start, end: range bounds forwarded to ``get_qual_history_dao``.
        intervel: 900 selects the 15-minute table, otherwise the 1-day one.
        slots: time-axis labels; one value per slot is emitted per series.
        pid: point id.
    """
    ctnum = await load_point_ctnum(pid)
    table_name, date_fmt = _history_table(intervel)
    rows = await get_qual_history_dao(table_name, pid, start, end, date_fmt)
    rows_by_slot = {row["date_time"]: row for row in rows}

    if ctnum == 2:
        stats_items = _QUAL_ITEMS_2CT
    else:
        # Any CT count other than 2 is handled as 3.
        ctnum = 3
        stats_items = _QUAL_ITEMS_3CT

    elec_data = {stats_item: [] for stats_item in stats_items}
    for slot in slots:
        if slot in rows_by_slot:
            row = rows_by_slot[slot]
            for stats_item in stats_items:
                # NOTE(review): unlike elec_history_service, a stored None
                # is passed through here rather than converted to "".
                value = row.get(stats_item, "")
                if value and stats_item == "freq_dev_mean":
                    # If the frequency deviation would display as 0, return
                    # 0 directly to avoid showing -0.00.
                    if abs(value) < 0.05:
                        value = 0
                elec_data[stats_item].append(value)
        else:
            for stats_item in stats_items:
                elec_data[stats_item].append("")

    voltage_dev = _build_from_series(VoltageDev, elec_data)
    three_phase_unbalance = ThreePhaseImbalance(
        voltage=elec_data["ubl_mean"], current=elec_data["ibl_mean"]
    )
    voltage_harmonic = _build_from_series(VoltageHarmonic, elec_data)
    # Current harmonic RMS = current * distortion rate; the front end
    # performs that computation.
    current_harmonic = _build_from_series(CurrentHarmonic, elec_data)

    # For "Shidian U" devices only one item has data; return that item.
    if ctnum == 2:
        sdu_i, sdu_u = _single_phase_sdu((
            ("ia", "uab", elec_data["uab_dev_mean"]),
            ("ic", "ucb", elec_data["ucb_dev_mean"]),
        ))
    else:
        sdu_i, sdu_u = _single_phase_sdu((
            ("ia", "ua", elec_data["ua_dev_mean"]),
            ("ib", "ub", elec_data["ub_dev_mean"]),
            ("ic", "uc", elec_data["uc_dev_mean"]),
        ))

    return QualHistoryResponse(
        ctnum=ctnum,
        voltage_dev=voltage_dev,
        freq_dev=elec_data["freq_dev_mean"],
        three_phase_unbalance=three_phase_unbalance,
        voltage_harmonic=voltage_harmonic,
        current_harmonic=current_harmonic,
        time_slots=slots,
        sdu_i=sdu_i,
        sdu_u=sdu_u,
    )


@summary("电能质量-实时参数")
@description("包含用电健康指数、电压偏差、频率偏差、三相不平衡度、谐波畸变率")
@examples(qual_current_example)
async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
    """Return realtime power-quality parameters for one point.

    Raises:
        ParamException: if point_id / cid cannot be read, are falsy, or
            are <= 0.
        BusinessException: if loading the realtime data fails.
    """
    try:
        point_id = filed_value_from_list(body.filter.equals, "point_id")
        cid = req.json["cid"]
    except Exception as e:
        log.warning(f"param exception, equals is NULL, no pid, e:{e}")
        raise ParamException(
            message="param exception, equals is NULL, no pid") from e

    # Falsy checks come first so None never reaches the "<=" comparison.
    if not point_id or point_id <= 0 or not cid or cid <= 0:
        log.warning("param exception, equals is NULL, no point_id")
        raise ParamException(message="param exception, equals is NULL, no pid")

    time_str, d_rt_ele = await _load_realtime(point_id, "post_qual_current")

    # Missing or falsy harmonic values are reported as "".
    voltage_harmonic = VoltageHarmonicRate(**{
        field.name: d_rt_ele.get(field.name) or ""
        for field in fields(VoltageHarmonicRate)
    })
    current_harmonic = CurrentHarmonicRate(**{
        field.name: d_rt_ele.get(field.name) or ""
        for field in fields(CurrentHarmonicRate)
    })

    health_index = await health_score.load_health_index(cid, point_id)
    ret_items = {"ua_dev", "ub_dev", "uc_dev", "uab_dev", "ucb_dev",
                 "freq_dev", "ubl", "ibl", "sdu_i", "sdu_u"}
    return QualCurrentResponse(
        ctnum=d_rt_ele.get("ctnum") or 3,
        real_time=time_str,
        health_index=health_index,
        voltage_harmonic=voltage_harmonic,
        current_harmonic=current_harmonic,
        **{k: v for k, v in d_rt_ele.items() if k in ret_items},
    )


@summary("用电监测-实时监测-楼层")
async def post_elec_current_storeys(req, body: EcsReq) -> EscResp:
    """Delegate to ``elec_current_storeys_service`` with cid and storeys."""
    cid, storeys = body.cid, body.storeys
    return await elec_current_storeys_service(cid, storeys)


@summary("电能质量-实时参数-楼层")
async def post_qual_current_storeys(req, body: EcsReq) -> QcsResp:
    """Delegate to ``qual_current_storeys_service`` with cid and storeys."""
    cid, storeys = body.cid, body.storeys
    return await qual_current_storeys_service(cid, storeys)


@summary("用电监测-卡片信息-level")
async def post_elec_card_level(req, body: CidPointsReq) -> EclResp:
    """Delegate to ``elec_card_level_service`` with the request's points."""
    point_list = body.point_list
    return await elec_card_level_service(point_list)


@summary("电能质量-卡片信息-level")
async def post_qual_current_level(req, body: CidPointsReq) -> QclResp:
    """Delegate to ``qual_current_level_service`` with the request's points."""
    point_list = body.point_list
    return await qual_current_level_service(point_list)