"""Service-layer helpers for real-time and statistical electric data."""
import json
import math
from datetime import datetime

import pandas as pd
import pendulum

from pot_libs.settings import SETTING
from pot_libs.logger import log
from pot_libs.utils.exc_util import BusinessException
from pot_libs.aredis_util.aredis_utils import RedisUtils
from unify_api.constants import (
    POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, LF_THRESHOLD,
    THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
)
from unify_api.modules.common.procedures.points import points_by_storeys, \
    get_meter_by_point
from unify_api.modules.electric.dao.electric_dao import \
    monitor_point_join_by_points, get_electric_datas_dao
from unify_api.utils.common_utils import round_2, round_4, multiplication_two
from unify_api.modules.electric.procedures.electric_util import \
    load_point_ctnum
from unify_api.constants import REAL_EXP_TIME, REAL_HR_EXP_TIME
from unify_api.utils.time_format import CST, YMD_Hms, timestamp2dts
from unify_api.modules.common.procedures.location_temp_rcurrent import \
    location_stats_statics
from unify_api.modules.electric.components.electric import (
    ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)


def _storeys_to_sorted_list(grouped):
    """Turn ``{storey_name: [room_dict, ...]}`` into a sorted list.

    Rooms inside a storey are ordered by ``room_name`` and the storeys
    themselves by ``storey_id`` (taken from the first room of each storey).
    An empty mapping yields an empty list.
    """
    storeys = [
        {
            "name": name,
            "storey_id": rooms[0]["storey_id"],
            "room_data": sorted(rooms, key=lambda room: room["room_name"]),
        }
        for name, rooms in grouped.items()
    ]
    return sorted(storeys, key=lambda storey: storey["storey_id"])


async def elec_current_storeys_service(cid, storeys):
    """Electricity monitoring - real-time monitoring - grouped by storey.

    Loads the points of the given storeys, fetches their latest real-time
    electric snapshot and returns an ``EscResp`` whose ``elec_data`` is a
    list of storeys, each carrying its sorted ``room_data``.
    Points without a usable snapshot get empty-string placeholders.
    """
    # 1. Resolve the points that belong to the requested storeys.
    point_list = await points_by_storeys(cid, storeys)
    # Skip points without a meter id: there is nothing to look up for them
    # (same filtering as the sibling services in this module).
    mtids = [p.get("mtid") for p in point_list if p.get("mtid")]
    d_rt_ele = await batch_load_rt_ele(mtids)

    elec_data = {}
    for info in point_list:
        storey_name = info.get("storey_name")
        storey_id = info.get("storey_id")
        point_id = info.get("point_id")
        room_name = info.get("room_name")
        mtid = info.get("mtid")
        ctnum = info.get("ctnum")
        rt_ele = d_rt_ele.get(mtid, None)
        if rt_ele:
            # A "ShiDian-U" meter only has data for one phase; annotate
            # which concrete current/voltage fields carry it.
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            time_str = timestamp2dts(rt_ele["ts"], YMD_Hms)
            res_dic = {
                "room_name": room_name,
                "storey_id": storey_id,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": time_str,
                "ua": round_2(rt_ele.get("ua")),
                "ia": round_2(rt_ele.get("ia")),
                "ub": round_2(rt_ele.get("ub")),
                "ib": round_2(rt_ele.get("ib")),
                "uc": round_2(rt_ele.get("uc")),
                "ic": round_2(rt_ele.get("ic")),
                "pttl": round_2(rt_ele.get("pttl")),
                "qttl": round_2(rt_ele.get("qttl")),
                "freq": round_2(rt_ele.get("freq")),
                "costtl": round_2(rt_ele.get("costtl")),
                "lf": round_2(rt_ele.get("lf")),
                "sdu_i": rt_ele.get("sdu_i"),
                "sdu_u": rt_ele.get("sdu_u"),
            }
        else:
            res_dic = {
                "room_name": room_name,
                "point_id": point_id,
                "storey_id": storey_id,
                "ctnum": ctnum,
                "real_time": "",
                "ua": "",
                "ia": "",
                "ub": "",
                "ib": "",
                "uc": "",
                "ic": "",
                "pttl": "",
                "qttl": "",
                "freq": "",
                "costtl": "",
                "lf": "",
                "sdu_i": "",
                "sdu_u": "",
            }
        elec_data.setdefault(storey_name, []).append(res_dic)

    # Convert to a list so rooms and storeys can be returned in order.
    return EscResp(elec_data=_storeys_to_sorted_list(elec_data))


async def qual_current_storeys_service(cid, storeys):
    """Power quality - real-time parameters - grouped by storey.

    Same shape as ``elec_current_storeys_service`` but the per-room payload
    carries harmonic-distortion and voltage-deviation fields, and the
    snapshot is merged with the "hr" real-time record.
    Returns a ``QcsResp``.
    """
    # 1. Resolve the points that belong to the requested storeys.
    point_list = await points_by_storeys(cid, storeys)
    mtids = [point["mtid"] for point in point_list if point["mtid"]]
    d_rt_ele = await batch_load_rt_ele_with_hr(mtids)

    # 4. Build the response data.
    qual_data = {}
    for info in point_list:
        storey_name = info.get("storey_name")
        storey_id = info.get("storey_id")
        point_id = info.get("point_id")
        room_name = info.get("room_name")
        mtid = info.get("mtid")
        # Anything that is not explicitly 2 is treated as 3.
        ctnum = info.get("ctnum") if info.get("ctnum") == 2 else 3
        rt_ele = d_rt_ele.get(mtid, None)
        if rt_ele:
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            time_str = timestamp2dts(rt_ele["ts"], YMD_Hms)
            res_dic = {
                "room_name": room_name,
                "storey_id": storey_id,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": time_str,
                # Current / voltage harmonic distortion rate
                "thdia": round_4(rt_ele.get("thdia")),
                "thdib": round_4(rt_ele.get("thdib")),
                "thdic": round_4(rt_ele.get("thdic")),
                "thdua": round_4(rt_ele.get("thdua")),
                "thdub": round_4(rt_ele.get("thdub")),
                "thduc": round_4(rt_ele.get("thduc")),
                # Voltage deviation
                "ua_dev": round_4(rt_ele.get("ua_dev")),
                "ub_dev": round_4(rt_ele.get("ub_dev")),
                "uc_dev": round_4(rt_ele.get("uc_dev")),
                "sdu_i": rt_ele.get("sdu_i"),
                "sdu_u": rt_ele.get("sdu_u"),
            }
        else:
            res_dic = {
                "room_name": room_name,
                "storey_id": storey_id,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": "",
                "thdia": "",
                "thdib": "",
                "thdic": "",
                "thdua": "",
                "thdub": "",
                "thduc": "",
                "ua_dev": "",
                "ub_dev": "",
                "uc_dev": "",
                "sdu_i": "",
                "sdu_u": "",
            }
        qual_data.setdefault(storey_name, []).append(res_dic)

    # Convert to a list so rooms and storeys can be returned in order.
    return QcsResp(qual_data=_storeys_to_sorted_list(qual_data))


async def elec_card_level_service(point_list):
    """Electricity monitoring - card information - grouped by level.

    Looks up every point in ``point_list``, attaches its latest real-time
    snapshot (or empty-string placeholders) and buckets the cards by the
    level name that ``POINT_LEVEL_MAP`` assigns to the point's ``m_type``.
    Returns an ``EclResp``. An ``m_type`` missing from ``POINT_LEVEL_MAP``
    raises ``KeyError``.
    """
    # 1. Fetch the detailed information of every point_id.
    d_point_info = await monitor_point_join_by_points(point_list)
    mtids = [p_info["mtid"] for p_info in d_point_info if p_info["mtid"]]
    d_rt_ele = await batch_load_rt_ele(mtids)

    # 4. Build the response data.
    ret_data = {
        "inline": [],
        "transformer": [],
        "feeder": [],
        "power_dist": [],
        "device": [],
    }
    for info in d_point_info:
        m_name = info.get("name")
        m_type = POINT_LEVEL_MAP[info.get("m_type")]
        point_id = info.get("pid")
        mtid = info.get("mtid")
        ctnum = info.get("ctnum") if info.get("ctnum") == 2 else 3
        rt_ele = d_rt_ele.get(mtid, None)
        if rt_ele:
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            time_str = timestamp2dts(rt_ele["ts"], YMD_Hms)
            res_dic = {
                "name": m_name,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": time_str,
                "ua": round_2(rt_ele.get("ua")),
                "ia": round_2(rt_ele.get("ia")),
                "ub": round_2(rt_ele.get("ub")),
                "ib": round_2(rt_ele.get("ib")),
                "uc": round_2(rt_ele.get("uc")),
                "ic": round_2(rt_ele.get("ic")),
                "pttl": round_2(rt_ele.get("pttl")),
                "qttl": round_2(rt_ele.get("qttl")),
                "freq": round_2(rt_ele.get("freq")),
                "costtl": round_2(rt_ele.get("costtl")),
                "lf": round_2(rt_ele.get("lf")),
                "sdu_i": rt_ele.get("sdu_i"),
                "sdu_u": rt_ele.get("sdu_u"),
                # Voltage deviation, used by the client to decide whether
                # the threshold is exceeded (value shown in red).
                "ua_dev": round_4(rt_ele.get("ua_dev")),
                "ub_dev": round_4(rt_ele.get("ub_dev")),
                "uc_dev": round_4(rt_ele.get("uc_dev")),
                "uab_dev": round_4(rt_ele.get("uab_dev")),
                "ucb_dev": round_4(rt_ele.get("ucb_dev")),
                # Thresholds
                "u_threshold": U_THRESHOLD,
                "costtl_threshold": COSTTL_THRESHOLD,
                "lf_threshold": LF_THRESHOLD,
            }
        else:
            res_dic = {
                "name": m_name,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": "",
                "ua": "",
                "ia": "",
                "ub": "",
                "ib": "",
                "uc": "",
                "ic": "",
                "pttl": "",
                "qttl": "",
                "freq": "",
                "costtl": "",
                "lf": "",
                "sdu_i": "",
                "sdu_u": "",
                "ua_dev": "",
                "ub_dev": "",
                "uc_dev": "",
                "uab_dev": "",
                "ucb_dev": "",
                "u_threshold": U_THRESHOLD,
                "costtl_threshold": COSTTL_THRESHOLD,
                "lf_threshold": LF_THRESHOLD,
            }
        ret_data[m_type].append(res_dic)

    return EclResp(
        inline=ret_data["inline"],
        transformer=ret_data["transformer"],
        feeder=ret_data["feeder"],
        power_dist=ret_data["power_dist"],
        device=ret_data["device"],
    )


async def qual_current_level_service(point_list):
    """Power quality - card information - grouped by level.

    Like ``elec_card_level_service`` but with power-quality fields
    (harmonic distortion, fundamental current, imbalance, voltage
    deviation) taken from the real-time snapshot merged with its "hr"
    record. Returns a ``QclResp``. An ``m_type`` missing from
    ``POINT_LEVEL_MAP`` raises ``KeyError``.
    """
    # 1. Fetch the detailed information of every point_id.
    d_point_info = await monitor_point_join_by_points(point_list)
    mtids = [p_info["mtid"] for p_info in d_point_info if p_info["mtid"]]
    d_rt_ele = await batch_load_rt_ele_with_hr(mtids)

    ret_data = {
        "inline": [],
        "transformer": [],
        "feeder": [],
        "power_dist": [],
        "device": [],
    }
    for info in d_point_info:
        m_name = info.get("name")
        m_type = POINT_LEVEL_MAP[info.get("m_type")]
        point_id = info.get("pid")
        mtid = info.get("mtid")
        ctnum = info.get("ctnum") if info.get("ctnum") == 2 else 3
        rt_ele = d_rt_ele.get(mtid, None)
        if rt_ele:
            rt_ele = get_sdu_i_and_u(rt_ele, ctnum)
            time_str = timestamp2dts(rt_ele["ts"], YMD_Hms)
            fdia = round_2(rt_ele.get("fdia"))
            fdib = round_2(rt_ele.get("fdib"))
            fdic = round_2(rt_ele.get("fdic"))
            thdia = round_4(rt_ele.get("thdia"))
            thdib = round_4(rt_ele.get("thdib"))
            thdic = round_4(rt_ele.get("thdic"))
            res_dic = {
                "name": m_name,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": time_str,
                # Current / voltage harmonic distortion rate
                "thdia": thdia,
                "thdib": thdib,
                "thdic": thdic,
                "thdua": round_4(rt_ele.get("thdua")),
                "thdub": round_4(rt_ele.get("thdub")),
                "thduc": round_4(rt_ele.get("thduc")),
                "thduab": round_4(rt_ele.get("thduab")),
                "thducb": round_4(rt_ele.get("thducb")),
                # Fundamental current
                "fdia": fdia,
                "fdib": fdib,
                "fdic": fdic,
                # Three-phase imbalance
                "ubl": round_4(rt_ele.get("ubl")),
                "ibl": round_4(rt_ele.get("ibl")),
                # Voltage deviation
                "ua_dev": round_4(rt_ele.get("ua_dev")),
                "ub_dev": round_4(rt_ele.get("ub_dev")),
                "uc_dev": round_4(rt_ele.get("uc_dev")),
                "uab_dev": round_4(rt_ele.get("uab_dev")),
                "ucb_dev": round_4(rt_ele.get("ucb_dev")),
                # Total harmonic current (effective value)
                #   = fundamental current * total current harmonic distortion
                "thdia_virtual": round_2(multiplication_two(fdia, thdia)),
                "thdib_virtual": round_2(multiplication_two(fdib, thdib)),
                "thdic_virtual": round_2(multiplication_two(fdic, thdic)),
                # Thresholds used by the web client
                "thdu_threshold": THDU_THRESHOLD,  # voltage THD
                "bl_threshold": BL_THRESHOLD,  # three-phase imbalance
                # Thresholds used by the mini-program
                "thdi_threshold": THDI_THRESHOLD,  # current THD
                "u_threshold": U_THRESHOLD,  # voltage deviation
            }
        else:
            res_dic = {
                "name": m_name,
                "point_id": point_id,
                "ctnum": ctnum,
                "real_time": "",
                # Current / voltage harmonic distortion rate
                "thdia": "",
                "thdib": "",
                "thdic": "",
                "thdua": "",
                "thdub": "",
                "thduc": "",
                "thduab": "",
                "thducb": "",
                # Fundamental current
                "fdia": "",
                "fdib": "",
                "fdic": "",
                # Three-phase imbalance
                "ubl": "",
                "ibl": "",
                # Voltage deviation
                "ua_dev": "",
                "ub_dev": "",
                "uc_dev": "",
                "uab_dev": "",
                "ucb_dev": "",
                # Total harmonic current (effective value)
                "thdia_virtual": "",
                "thdib_virtual": "",
                "thdic_virtual": "",
            }
        ret_data[m_type].append(res_dic)

    return QclResp(
        inline=ret_data["inline"],
        transformer=ret_data["transformer"],
        feeder=ret_data["feeder"],
        power_dist=ret_data["power_dist"],
        device=ret_data["device"],
    )


def _extreme_with_time(df, value_column, time_column, use_max):
    """Return ``(value, time_str)`` for the max or min of ``value_column``.

    ``time_str`` is read from ``time_column`` of the row holding the
    extreme. When the column has no non-NaN value both results are ``""``.
    """
    series = df[value_column]
    value = series.max() if use_max else series.min()
    if pd.isna(value):
        return "", ""
    row_label = series.idxmax() if use_max else series.idxmin()
    when = df.loc[row_label].to_dict().get(time_column)
    return value, ("" if pd.isnull(when) else str(when))


def _build_elec_index(df, item, digits):
    """Build the ``ElecIndex`` (max/min/avg) of one statistic ``item``.

    ``df`` is the DataFrame of fetched rows, or ``None`` when no rows were
    found, in which case every field is an empty string. ``digits`` is the
    rounding precision applied to the average.
    """
    if df is None:
        return ElecIndex(
            stats_index=item, max="", max_time="", min="", min_time="",
            avg="",
        )
    max_value, max_time = _extreme_with_time(
        df, f"{item}_max", f"{item}_max_time", use_max=True)
    min_value, min_time = _extreme_with_time(
        df, f"{item}_min", f"{item}_min_time", use_max=False)
    avg_value = df[f"{item}_mean"].mean()
    # NOTE(review): an average of exactly 0 is reported as "" just like a
    # missing one; kept as-is because clients may rely on it - confirm.
    if pd.isna(avg_value) or not avg_value:
        avg_value = ""
    else:
        avg_value = round(avg_value, digits)
    return ElecIndex(
        stats_index=item,
        max=max_value,
        max_time=max_time,
        min=min_value,
        min_time=min_time,
        avg=avg_value,
    )


def _stat_names(columns):
    """Strip the ``_mean/_min/_max`` suffix and de-duplicate, keeping order.

    A plain ``set`` would make the response order depend on string hashing
    and therefore differ between processes.
    """
    return list(dict.fromkeys(col.rsplit("_", 1)[0] for col in columns))


async def elec_index_service(cid, point_id, start, end):
    """Return max/min/avg statistics of a point between ``start`` and ``end``.

    When both ``start`` and ``end`` fall on today's date the 15-minute
    tables are queried, otherwise the daily tables. The set of statistics
    depends on the point's ``ctnum`` (2, or 3 for anything else). When
    ``cid`` is truthy, location statistics (residual current and
    temperature) are appended to the common indexes.

    Returns an ``ElecIndexResponse`` with ``ctnum``, ``common_indexes`` and
    ``elec_qual_indexes``.
    """
    ctnum = await load_point_ctnum(point_id)
    ctnum = ctnum if ctnum == 2 else 3

    # Compare only the "YYYY-MM-DD" prefix of the strings.
    now = str(datetime.now())
    if start[:10] == now[:10] and end[:10] == now[:10]:
        table_name = "point_15min_electric"
        redi_table_name = "location_15min_aiao"
    else:
        table_name = "point_1day_electric"
        redi_table_name = "location_1day_aiao"

    if ctnum == 2:
        common_items = [
            "lf_mean", "lf_min", "lf_max",
            "pttl_mean", "pttl_min", "pttl_max",
            "qttl_mean", "qttl_min", "qttl_max",
            "costtl_mean", "costtl_min", "costtl_max",
            "uab_mean", "uab_min", "uab_max",
            "ucb_mean", "ucb_min", "ucb_max",
            "ia_mean", "ia_min", "ia_max",
            "ic_mean", "ic_min", "ic_max",
            "freq_mean", "freq_min", "freq_max",
        ]
        elec_qual_items = [
            "ubl_mean", "ubl_min", "ubl_max",
            "ibl_mean", "ibl_min", "ibl_max",
            "thduab_mean", "thduab_min", "thduab_max",
            "thducb_mean", "thducb_min", "thducb_max",
            "thdia_mean", "thdia_min", "thdia_max",
            "thdic_mean", "thdic_min", "thdic_max",
            "uab_dev_mean", "uab_dev_min", "uab_dev_max",
            "freq_dev_mean", "freq_dev_min", "freq_dev_max",
        ]
    else:
        common_items = [
            "lf_mean", "lf_min", "lf_max",
            "pttl_mean", "pttl_min", "pttl_max",
            "qttl_mean", "qttl_min", "qttl_max",
            "costtl_mean", "costtl_min", "costtl_max",
            "ua_mean", "ua_min", "ua_max",
            "ub_mean", "ub_min", "ub_max",
            "uc_mean", "uc_min", "uc_max",
            "ia_mean", "ia_min", "ia_max",
            "ib_mean", "ib_min", "ib_max",
            "ic_mean", "ic_min", "ic_max",
            "freq_mean", "freq_min", "freq_max",
        ]
        elec_qual_items = [
            "ubl_mean", "ubl_min", "ubl_max",
            "ibl_mean", "ibl_min", "ibl_max",
            "thdua_mean", "thdua_min", "thdua_max",
            "thdub_mean", "thdub_min", "thdub_max",
            "thduc_mean", "thduc_min", "thduc_max",
            "thdia_mean", "thdia_min", "thdia_max",
            "thdib_mean", "thdib_min", "thdib_max",
            "thdic_mean", "thdic_min", "thdic_max",
            "ua_dev_mean", "ua_dev_min", "ua_dev_max",
            "freq_dev_mean", "freq_dev_min", "freq_dev_max",
        ]

    datas = await get_electric_datas_dao(table_name, point_id, start, end)
    # No rows (empty or None) -> every index is reported with empty values
    # instead of failing while building the DataFrame.
    df = pd.DataFrame(list(datas)) if datas else None

    # Common parameter statistics (average rounded to 2 digits).
    common_indexes = [
        _build_elec_index(df, item, 2) for item in _stat_names(common_items)
    ]
    # Power quality statistics (average rounded to 4 digits).
    elec_qual_indexes = [
        _build_elec_index(df, item, 4)
        for item in _stat_names(elec_qual_items)
    ]

    # The mini-program additionally needs leakage current and temperature.
    if cid:
        location_datas = await location_stats_statics(
            redi_table_name, cid, start, end)
        if location_datas:
            for value in location_datas.values():
                if value["ad_type"] == "residual_current":
                    name = "漏电流(mA)"
                else:
                    name = f"{value.get('item')}温度(℃)"
                common_indexes.append(
                    ElecIndex(
                        stats_index=value["ad_type"],
                        name=name,
                        max=value.get("max_value") or '',
                        max_time=value.get("max_value_time") or '',
                        min=value.get("min_value") or '',
                        min_time=value.get("min_value_time") or '',
                        avg=value.get("mean_value") or '',
                    )
                )

    return ElecIndexResponse(
        ctnum=ctnum,
        common_indexes=common_indexes,
        elec_qual_indexes=elec_qual_indexes,
    )


def _merge_missing(rt_ele, rt_ele_hr):
    """Copy into ``rt_ele`` every key of ``rt_ele_hr`` it does not have."""
    for key, value in rt_ele_hr.items():
        rt_ele.setdefault(key, value)


async def elec_current_service(point_id):
    """Return ``(time_str, snapshot)`` with the latest data of one point.

    The snapshot is read from the ``real_time:electric`` and
    ``real_time:electric_hr`` Redis keys of the point's meter. A record is
    only used while it is not older than its expiry constant; fields of a
    fresh "hr" record are merged into a fresh main record without
    overriding existing keys. ``snapshot`` is ``None`` when nothing usable
    is found, in which case ``time_str`` is the current time.

    Raises:
        BusinessException: the point has no meter (``mtid``) bound.
    """
    # Resolve the meter id of the point.
    p_info = await get_meter_by_point(point_id)
    if not p_info or not p_info["mtid"]:
        msg = f"没有监测点:{point_id} monitor信息,请联系运维人员!"
        raise BusinessException(message=msg)

    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele, ts = None, now_ts
    try:
        mtid = p_info["mtid"]
        key = f"real_time:electric:{SETTING.mysql_db}:{mtid}"
        key_hr = f"real_time:electric_hr:{SETTING.mysql_db}:{mtid}"
        rt_ele, rt_ele_hr = await RedisUtils().mget([key, key_hr])
        if rt_ele and rt_ele_hr:
            rt_ele, rt_ele_hr = json.loads(rt_ele), json.loads(rt_ele_hr)
            if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
                if now_ts - rt_ele_hr["ts"] <= REAL_HR_EXP_TIME:
                    _merge_missing(rt_ele, rt_ele_hr)
                d_rt_ele, ts = rt_ele, rt_ele["ts"]
        elif rt_ele:
            rt_ele = json.loads(rt_ele)
            if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
                d_rt_ele, ts = rt_ele, rt_ele["ts"]
        elif rt_ele_hr:
            rt_ele_hr = json.loads(rt_ele_hr)
            if now_ts - rt_ele_hr["ts"] <= REAL_EXP_TIME:
                d_rt_ele, ts = rt_ele_hr, rt_ele_hr["ts"]
    except Exception as e:
        # Best effort: a cache/parse failure degrades to "no data".
        log.error(f"parse real time electric error, pid:{point_id}")
        log.exception(e)

    time_str = timestamp2dts(ts, YMD_Hms)
    if d_rt_ele is None:
        return time_str, None

    # A "ShiDian-U" meter only has data for one phase; annotate which
    # concrete current/voltage fields carry it.
    ctnum = d_rt_ele.get("ctnum") or 3
    return time_str, get_sdu_i_and_u(d_rt_ele, ctnum)


async def _pipeline_get(keys, batch_size=500):
    """GET all ``keys`` through Redis pipelines of ``batch_size`` commands.

    Returns the raw values in the same order as ``keys``.
    """
    values = []
    for i in range(math.ceil(len(keys) / batch_size)):
        pipe = await RedisUtils().client.pipeline()
        for key in keys[batch_size * i:batch_size * (i + 1)]:
            await pipe.get(key)
        values += await pipe.execute()
    return values


async def batch_load_rt_ele(mtids):
    """Load the fresh real-time electric snapshot of every meter id.

    Returns ``{mtid: snapshot}``; a meter without a record newer than
    ``REAL_EXP_TIME`` maps to ``{}``. Errors are logged and whatever was
    collected so far is returned.
    """
    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele = {mtid: {} for mtid in mtids}
    try:
        db = SETTING.mysql_db
        keys = [f"real_time:electric:{db}:{mtid}" for mtid in mtids]
        for raw in await _pipeline_get(keys):
            if raw is None:
                continue
            rt_ele = json.loads(raw)
            if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
                d_rt_ele[rt_ele["mtid"]] = rt_ele
    except Exception as e:
        log.error(f"batch load real time electric error, mtids:{mtids}")
        log.exception(e)
    return d_rt_ele


async def batch_load_rt_ele_with_hr(mtids):
    """Load real-time snapshots merged with their "hr" records.

    For each meter id the ``real_time:electric`` and
    ``real_time:electric_hr`` records are read. A record whose embedded
    ``mtid`` does not match the requested one is logged and skipped.
    Returns ``{mtid: snapshot}`` with ``{}`` for meters without fresh data.
    Errors are logged and whatever was collected so far is returned.
    """
    now_ts = pendulum.now(tz=CST).int_timestamp
    d_rt_ele = {mtid: {} for mtid in mtids}
    try:
        db = SETTING.mysql_db
        keys = [f"real_time:electric:{db}:{mtid}" for mtid in mtids]
        key_hrs = [f"real_time:electric_hr:{db}:{mtid}" for mtid in mtids]
        lst_rt_ele = await _pipeline_get(keys)
        lst_rt_ele_hr = await _pipeline_get(key_hrs)

        for i, mtid in enumerate(mtids):
            rt_ele, rt_ele_hr = lst_rt_ele[i], lst_rt_ele_hr[i]
            if rt_ele and rt_ele_hr:
                rt_ele, rt_ele_hr = json.loads(rt_ele), json.loads(rt_ele_hr)
                if rt_ele["mtid"] != mtid or rt_ele_hr["mtid"] != mtid:
                    log.error(f"batch_load_rt_ele error, mtid:{mtid}")
                    continue
                if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
                    # Freshness of the hr record is judged with its own
                    # expiry constant, as elec_current_service does for the
                    # same merge (REAL_EXP_TIME was used here before).
                    if now_ts - rt_ele_hr["ts"] <= REAL_HR_EXP_TIME:
                        _merge_missing(rt_ele, rt_ele_hr)
                    d_rt_ele[mtid] = rt_ele
            elif rt_ele:
                rt_ele = json.loads(rt_ele)
                if rt_ele["mtid"] != mtid:
                    log.error(f"load_rt_ele error, mtid:{mtid}")
                    continue
                if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
                    d_rt_ele[mtid] = rt_ele
            elif rt_ele_hr:
                rt_ele_hr = json.loads(rt_ele_hr)
                if rt_ele_hr["mtid"] != mtid:
                    log.error(f"load_rt_ele_hr error, mtid:{mtid}")
                    continue
                if now_ts - rt_ele_hr["ts"] <= REAL_EXP_TIME:
                    d_rt_ele[mtid] = rt_ele_hr
    except Exception as e:
        log.error(f"batch load real time electric error, mtids:{mtids}")
        log.exception(e)
    return d_rt_ele


def get_sdu_i_and_u(res, ctnum):
    """Annotate ``res`` with the phase fields a "ShiDian-U" meter reports.

    Sets ``res["sdu_i"]`` / ``res["sdu_u"]`` to the names of the current
    and voltage fields that match ``res["meter_sn"]`` ("a", "b" or "c",
    case-insensitive). With ``ctnum == 3`` the phase voltages ``ua/ub/uc``
    are used, otherwise ``uab``/``ucb`` (phase "b" then has no voltage
    field). Both stay ``None`` when ``meter_sn`` is missing, ``None`` or
    any other value.

    ``res`` is modified in place and also returned.
    """
    res["sdu_i"] = None
    res["sdu_u"] = None
    # meter_sn may be present but None; treat that like "missing" instead
    # of failing on None.lower().
    meter_sn = str(res.get("meter_sn") or "").lower()
    if meter_sn == "a":
        res["sdu_i"] = "ia"
        res["sdu_u"] = "ua" if ctnum == 3 else "uab"
    elif meter_sn == "b":
        res["sdu_i"] = "ib"
        if ctnum == 3:
            res["sdu_u"] = "ub"
    elif meter_sn == "c":
        res["sdu_i"] = "ic"
        res["sdu_u"] = "uc" if ctnum == 3 else "ucb"
    return res