import json import time import re import pendulum from unify_api.modules.electric.dao.electric_dao import \ get_elec_mtid_sid_by_cid from unify_api.modules.common.dao.common_dao import monitor_by_cid from unify_api.utils.common_utils import round_2 from pot_libs.logger import log from pot_libs.settings import SETTING from pot_libs.aredis_util.aredis_utils import RedisUtils from unify_api.modules.electric.procedures.electric_util import get_wiring_type from unify_api.modules.home_page.procedures import point_inlines from unify_api.modules.home_page.procedures.dev_grade import get_dev_grade from unify_api.utils import time_format from unify_api.modules.electric.views.electric import ( METERDATA_CURRENT_KEY, METERDATA_CURRENT_HR_KEY, ) from unify_api.constants import VOLTAGE_STANDARD, REAL_EXP_TIME, \ FREQ_STANDARD, Product from pot_libs.utils.exc_util import ParamException, DBException, \ BusinessException from unify_api.modules.home_page.components.health_index import \ HealthCtlRateRes from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\ select_point_dao from unify_api.modules.common.service.td_engine_service import \ get_td_engine_data from unify_api.utils.taos_new import parse_td_columns, td3_tbl_compate, \ get_td_table_name async def health_ctl_rate_service(cid): if cid <= 0: log.error("param error") raise ParamException(message="参数错误, cid参数必须是一个正整数!") # 获取工厂所有监测点数据 point_infos = await point_inlines.get_all_points(cid) if not point_infos: log.error("cid:%s no point data" % cid) raise BusinessException(message="工厂没有任何监测点!") point_ids = point_infos.keys() stats = {"lf": 0, "costtl": 0, "freq_dev": 0, "thdu": 0, "v_dev": 0, "ubl": 0} now_ts = int(time.time()) real_tt = now_ts valid_mids = [] total = 0 for point_id in point_ids: ctnum, mid = await get_wiring_type(point_id) if not mid: # 换表: point_id 有效, mid(拆掉)为None的情况 log.warn(f"cid={ctnum} point_id={point_id} 已经换表了") continue valid_mids.append(mid) try: res = None result = await RedisUtils().hget(METERDATA_CURRENT_KEY, mid) result2 = await RedisUtils().hget(METERDATA_CURRENT_HR_KEY, mid) if result and result2: res = json.loads(result) res2 = json.loads(result2) real1_tt = res.get("timestamp") real_tt = res2.get("timestamp") if ( real1_tt and real_tt and now_ts - real1_tt <= REAL_EXP_TIME and now_ts - real_tt <= REAL_EXP_TIME ): for k in res2.keys(): if not k in res.keys(): res[k] = res2[k] else: log.info( "1 realtime_power_qual of mid %s is expired." % mid) continue elif result2: res = json.loads(result2) real_tt = res.get("timestamp") if real_tt and now_ts - real_tt <= REAL_EXP_TIME: res = res else: log.info( "2 realtime_power_qual of mid %s is expired." % mid) continue elif result: res = json.loads(result) real_tt = res.get("timestamp") if real_tt and now_ts - real_tt <= REAL_EXP_TIME: res = res else: log.info( "3 realtime_power_qual of mid %s is expired." % mid) continue else: log.error("realtime_power_quality not exist") except Exception as e: log.exception(e) raise DBException total += 1 if res: if ctnum != res.get("ctnum"): log.info(f"res = {res}") log.error( f"health_ctl_rate mysql ctnum={ctnum} payload ctnum={res.get('ctnum')} point_id={point_id} ctnum wrong!" ) # 如果发现配置错误的点,以报文的配置为准 ctnum = res.get("ctnum") # 电压偏差 v_dev = res.get("ua_dev") if ctnum == 3 else res.get("uab_dev") grade = get_dev_grade(dev_type="v", cur=v_dev) if grade and grade >= 60: stats["v_dev"] += 1 # 频率偏差 freq_dev = res.get("freq_dev") grade = get_dev_grade(dev_type="freq", cur=freq_dev) if grade and grade >= 60: stats["freq_dev"] += 1 # 三相电压不平衡度 ubl = res.get("ubl") grade = get_dev_grade(dev_type="ubl", cur=ubl) if grade and grade >= 60: stats["ubl"] += 1 # 功率因数 costtl = res.get("costtl") grade = get_dev_grade(dev_type="costtl", cur=costtl) if grade and grade >= 60: stats["costtl"] += 1 # (电压)谐波畸变率 thdu = res.get("thdua") if ctnum == 3 else res.get("thduab") grade = get_dev_grade(dev_type="thdu", cur=thdu) if grade and grade >= 60: stats["thdu"] += 1 # 负载率 lf = res.get("lf") if lf is None: stats["lf"] += 1 else: grade = get_dev_grade(dev_type="lf", cur=lf) if grade and grade >= 60: stats["lf"] += 1 else: # 没有数据指标可以知道这个是否合格的时候 stats["v_dev"] += 1 stats["freq_dev"] += 1 stats["ubl"] += 1 stats["costtl"] += 1 stats["thdu"] += 1 stats["lf"] += 1 total = 1.0 * total time_str = time_format.get_datetime_str(real_tt) if not valid_mids or total == 0: log.warn(f"cid={cid} 无任何有效mid") return HealthCtlRateRes( real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1, ubl=1, ) return HealthCtlRateRes( real_time=time_str, lf=stats["lf"] / total, costtl=stats["costtl"] / total, thdu=stats["thdu"] / total, v_dev=stats["v_dev"] / total, freq_dev=stats["freq_dev"] / total, ubl=stats["ubl"] / total, ) async def health_ctl_rate_service_new15(cid): if cid <= 0: log.error("param error") raise ParamException(message="参数错误, cid参数必须是一个正整数!") point_infos = await select_point_dao(cid) if not point_infos: log.error("cid:%s no point da;ta" % cid) raise BusinessException(message="工厂没有任何监测点!") stats = {"lf": 0, "costtl": 0, "freq_dev": 0, "thdu": 0, "v_dev": 0, "ubl": 0} now_ts = int(time.time()) real_tt = now_ts total = 0 datas = await get_elec_mtid_sid_by_cid(cid) td_mt_tables = tuple( (get_td_table_name("electric", data["mtid"]) for data in datas if data["mtid"])) td_mt_tables = td3_tbl_compate(td_mt_tables) sql = f"select last_row(*) from electric_stb " \ f"where TBNAME IN {td_mt_tables} group by tbname" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai" is_succ, results = await get_td_engine_data(url, sql) time_str = time_format.get_datetime_str(real_tt) if not is_succ: log.warn(f"cid={cid} 无任何有效mid") return HealthCtlRateRes( real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1, ubl=1, ) if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿 td_s_tables = tuple( (f"s{data['sid'].lower()}_e" for data in datas if data["sid"])) td_s_tables = td3_tbl_compate(td_s_tables) sql = f"select last_row(*) from electric_stb " \ f"where TBNAME IN {td_s_tables} group by tbname" is_succ, results = await get_td_engine_data(url, sql) if not is_succ: log.warn(f"cid={cid} 无任何有效mid") return HealthCtlRateRes( real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1, ubl=1, ) head = parse_td_columns(results) datas = [] for res in results["data"]: datas.append(dict(zip(head, res))) for data in datas: real_tt = pendulum.parse(data["ts"]).int_timestamp if now_ts - real_tt > REAL_EXP_TIME: continue total += 1 ctnum = data.get("ctnum") # 电压偏差 v_dev = data.get("ua_dev") if ctnum == 3 else data.get("uab_dev") grade = get_dev_grade(dev_type="v", cur=v_dev) if grade and grade >= 60: stats["v_dev"] += 1 # 频率偏差 freq_dev = data.get("freq_dev") grade = get_dev_grade(dev_type="freq", cur=freq_dev) if grade and grade >= 60: stats["freq_dev"] += 1 # 三相电压不平衡度 ubl = data.get("ubl") grade = get_dev_grade(dev_type="ubl", cur=ubl) if grade and grade >= 60: stats["ubl"] += 1 # 功率因数 costtl = data.get("costtl") grade = get_dev_grade(dev_type="costtl", cur=costtl) if grade and grade >= 60: stats["costtl"] += 1 # (电压)谐波畸变率 thdu = data.get("thdua") if ctnum == 3 else data.get("thduab") grade = get_dev_grade(dev_type="thdu", cur=thdu) if grade and grade >= 60: stats["thdu"] += 1 # 负载率 lf = data.get("lf") if lf is None: stats["lf"] += 1 else: grade = get_dev_grade(dev_type="lf", cur=lf) if grade and grade >= 60: stats["lf"] += 1 if total == 0: return HealthCtlRateRes( real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1, ubl=1, ) return HealthCtlRateRes( real_time=time_str, lf=round_2(stats["lf"] / total), costtl=round_2(stats["costtl"] / total), thdu=round_2(stats["thdu"] / total), v_dev=round_2(stats["v_dev"] / total), freq_dev=round_2(stats["freq_dev"] / total), ubl=round_2(stats["ubl"] / total), )