import time
from unify_api.modules.electric.dao.electric_dao import \
    get_elec_mtid_sid_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.logger import log
from unify_api.modules.home_page.procedures.dev_grade import get_dev_grade
from unify_api.utils import time_format
from pot_libs.utils.exc_util import ParamException, BusinessException
from unify_api.modules.home_page.components.health_index import \
    HealthCtlRateRes
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import \
    select_point_dao
from unify_api.modules.electric.service.electric_service import (
    batch_load_rt_ele_with_hr
)

# Lowest grade returned by get_dev_grade that still counts as "qualified".
_QUALIFIED_GRADE = 60


def _is_qualified(dev_type, value):
    """Return True when get_dev_grade rates *value* at the pass mark or above.

    A falsy grade (None or 0) is treated as not qualified.
    """
    grade = get_dev_grade(dev_type=dev_type, cur=value)
    return bool(grade and grade >= _QUALIFIED_GRADE)


async def health_ctl_rate_srv(cid):
    """Compute, per indicator, the share of meters whose reading qualifies.

    For every real-time record loaded for the meters of ``cid`` the
    function grades six indicators (voltage deviation, frequency
    deviation, voltage unbalance, power factor, voltage harmonic
    distortion, load rate) and counts the records whose grade reaches
    ``_QUALIFIED_GRADE``.

    Args:
        cid: Identifier used to look up monitoring points and meters;
            must be greater than 0.

    Returns:
        HealthCtlRateRes carrying the current time string and, for each
        indicator, ``round_2(qualified_count / record_count)``. When no
        real-time record is available every ratio is reported as 1.

    Raises:
        ParamException: ``cid`` is not greater than 0.
        BusinessException: ``select_point_dao(cid)`` returned nothing.
    """
    if cid <= 0:
        log.error("param error")
        raise ParamException(message="参数错误, cid参数必须是一个正整数!")

    if not await select_point_dao(cid):
        log.error("cid:%s no point data" % cid)
        raise BusinessException(message="工厂没有任何监测点!")

    stats = {"lf": 0, "costtl": 0, "freq_dev": 0, "thdu": 0, "v_dev": 0,
             "ubl": 0}

    lst_mtid_sid = await get_elec_mtid_sid_by_cid(cid)
    mtids = [mtid_sid["mtid"] for mtid_sid in lst_mtid_sid]
    d_rt_ele = await batch_load_rt_ele_with_hr(mtids)

    for rt_ele in d_rt_ele.values():
        # ctnum == 3 reads the "ua"-based fields, anything else the
        # "uab"-based ones (presumably phase vs. line quantities -- confirm).
        use_ua = rt_ele.get("ctnum") == 3

        # (stats key, get_dev_grade dev_type, current reading)
        checks = (
            # voltage deviation
            ("v_dev", "v", rt_ele.get("ua_dev" if use_ua else "uab_dev")),
            # frequency deviation
            ("freq_dev", "freq", rt_ele.get("freq_dev")),
            # three-phase voltage unbalance
            ("ubl", "ubl", rt_ele.get("ubl")),
            # power factor
            ("costtl", "costtl", rt_ele.get("costtl")),
            # (voltage) harmonic distortion rate
            ("thdu", "thdu", rt_ele.get("thdua" if use_ua else "thduab")),
        )
        for stat_key, dev_type, value in checks:
            if _is_qualified(dev_type, value):
                stats[stat_key] += 1

        # load rate: a record without an "lf" value is counted as qualified
        # and is never passed to get_dev_grade.
        lf = rt_ele.get("lf")
        if lf is None or _is_qualified("lf", lf):
            stats["lf"] += 1

    time_str = time_format.get_datetime_str(int(time.time()))
    total = len(d_rt_ele)
    if total == 0:
        # Nothing to grade: report every indicator as fully qualified
        # instead of dividing by zero.
        return HealthCtlRateRes(real_time=time_str, lf=1, costtl=1, thdu=1,
                                v_dev=1, freq_dev=1, ubl=1)

    return HealthCtlRateRes(
        real_time=time_str,
        lf=round_2(stats["lf"] / total),
        costtl=round_2(stats["costtl"] / total),
        thdu=round_2(stats["thdu"] / total),
        v_dev=round_2(stats["v_dev"] / total),
        freq_dev=round_2(stats["freq_dev"] / total),
        ubl=round_2(stats["ubl"] / total),
    )