from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import SDU_ALARM_LIST
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
    alarm_content_time_distribution_dao
from unify_api.utils.time_format import proxy_power_slots, time_pick_transf_new
from unify_api.constants import EVENT_TYPE_MAP, TEMPERATURE_MAP, \
    RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP

# Maps an electric-parameter event type to the ``electric_param_detail``
# bucket it is counted under.
_ELECTRIC_PARAM_DETAIL_BY_EVENT = {
    "overTHDI": "harmonic",  # total current harmonic RMS over limit
    "overTHDU": "harmonic",  # total voltage harmonic distortion over limit
    "overU": "voltage",  # overvoltage
    "underU": "voltage",  # undervoltage
    "overI": "current",
    "underPhasePF": "power_factor",  # single-phase PF below lower limit
    "underTotalPF": "power_factor",  # total PF below lower limit
    "unbalanceI": "threephase_imbalance",  # three-phase current imbalance
    "unbalanceU": "threephase_imbalance",  # three-phase voltage imbalance
    "overPR": "load_rate",
}


def _sql_in_list(values):
    """Render *values* as the parenthesised operand of a SQL ``IN``.

    Produces the same text as formatting ``tuple(values)`` for two or more
    items, but renders a single item as ``('a')`` instead of the Python
    tuple form ``('a',)``, whose trailing comma is not valid SQL.
    Callers must not pass an empty sequence (``IN ()`` is invalid SQL).
    """
    return "(" + ", ".join(repr(value) for value in values) + ")"


def _empty_sdu_alarm_content():
    """Return the payload ``sdu_alarm_content_info`` uses when nothing matched."""
    return {
        "ele_overload": {"slots": [], "value": []},
        "illegal_ele_app": {"slots": [], "value": []},
        "power_quality": {"slots": [], "value": []},
        "illegal_app_dic": {},
        "total_alarm_cnt": 0,
        "alarm_points_cnt": 0,
    }


async def sdu_alarm_content_info(cids, start, end, points):
    """Aggregate SDU alarm statistics for the given companies and period.

    Runs three queries against ``point_1min_event`` restricted to the event
    types in ``SDU_ALARM_LIST``: alarm counts per day and event type, the
    distinct alarming ``pid`` values, and alarm counts per appliance.

    Args:
        cids: Sequence of company ids to filter on. An empty sequence
            yields the empty payload without touching the database.
        start: Lower bound of ``event_datetime`` (interpolated into SQL).
        end: Upper bound of ``event_datetime`` (interpolated into SQL).
        points: Not used by the computation; kept so existing callers keep
            working.

    Returns:
        A dict with the per-day series ``ele_overload``, ``illegal_ele_app``
        and ``power_quality`` (each ``{"slots": [...], "value": [...]}``),
        ``illegal_app_dic`` (appliance -> alarm count), ``total_alarm_cnt``
        and ``alarm_points_cnt``.
    """
    # An empty id list would render as "cid in ()", which is invalid SQL.
    if not cids:
        return _empty_sdu_alarm_content()

    # NOTE(review): values are interpolated straight into the SQL text. If
    # start/end/cids can originate from untrusted input this is injectable;
    # switch to bound parameters once MysqlUtil's API for them is confirmed.
    if len(cids) == 1:
        mid_sql = f" and cid = {cids[0]}"
    else:
        mid_sql = f" and cid in {_sql_in_list(cids)}"
    where_sql = f"event_type in {_sql_in_list(SDU_ALARM_LIST)} and " \
                f"event_datetime BETWEEN '{start}' and '{end}' {mid_sql}"
    sql = f"SELECT DATE_FORMAT(event_datetime,'%m-%d') dt,event_type," \
          f"count(1) doc_count FROM `point_1min_event` where {where_sql} " \
          f"GROUP BY dt,event_type ORDER BY dt"
    point_sql = f"SELECT DISTINCT pid FROM `point_1min_event` " \
                f"where {where_sql} "
    appliance_sql = f"SELECT appliance,count(1) doc_count FROM " \
                    f"`point_1min_event` where {where_sql} " \
                    f"GROUP BY appliance"
    log.info(f"sql:{sql}")
    log.info(f"point_sql:{point_sql}")
    log.info(f"appliance_sql:{appliance_sql}")
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql)
        point_datas = await conn.fetchall(point_sql)
        appliance_datas = await conn.fetchall(appliance_sql)
    if not datas:
        return _empty_sdu_alarm_content()

    # presumably one "MM-DD" label per day in [start, end] - confirm against
    # proxy_power_slots
    time_slots = proxy_power_slots(start, end, "MM-DD", True)
    log.info(f"time_slots:{time_slots}")
    # Line overload
    ele_overload = {"slots": time_slots, "value": [0] * len(time_slots)}
    # Illegal appliance connected
    illegal_ele_app = {"slots": time_slots, "value": [0] * len(time_slots)}
    # Power quality
    power_quality = {"slots": time_slots, "value": [0] * len(time_slots)}
    series_by_event_type = {
        "illegal_ele_app": illegal_ele_app,
        "ele_overload": ele_overload,
        "power_quality_low": power_quality,
    }

    # Total number of alarms. Rows whose day is missing from time_slots are
    # still counted here; they are only left out of the per-day series.
    total_alarm_cnt = 0
    for data in datas:
        total_alarm_cnt += data["doc_count"]
        try:
            index = time_slots.index(str(data["dt"]))
        except ValueError as e:
            log.error(f"sdu_alarm_content_info data {str(e)}")
            continue
        series = series_by_event_type.get(data["event_type"])
        if series is not None:
            series["value"][index] += data["doc_count"]

    # Alarming households: the points that raised at least one alarm
    alarm_points = [point["pid"] for point in point_datas]

    # Illegal appliance statistics (rows without an appliance are skipped)
    illegal_app_dic = {}
    for item in appliance_datas:
        illegal = item["appliance"]
        if illegal:
            illegal_app_dic[illegal] = \
                illegal_app_dic.get(illegal, 0) + item["doc_count"]

    return {
        "ele_overload": ele_overload,
        "illegal_ele_app": illegal_ele_app,
        "power_quality": power_quality,
        "illegal_app_dic": illegal_app_dic,
        "total_alarm_cnt": total_alarm_cnt,
        "alarm_points_cnt": len(alarm_points)
    }


async def risk_distribution(start, end, point_id_list, is_new=False):
    """Count points with and without SDU alarms in the given period.

    Args:
        start: Lower bound of ``event_datetime`` (interpolated into SQL).
        end: Upper bound of ``event_datetime`` (interpolated into SQL).
        point_id_list: Sequence of point ids to inspect. An empty sequence
            yields ``(0, 0)`` without touching the database.
        is_new: When true, match ``importance in (1, 2)``; otherwise only
            ``importance=1``.

    Returns:
        A ``(security_user, risk_user)`` tuple: ``risk_user`` is the number
        of distinct ``pid`` rows returned by the query and ``security_user``
        is ``len(point_id_list)`` minus that.
    """
    # An empty id list would render as "pid in ()", which is invalid SQL.
    if not point_id_list:
        return 0, 0

    # NOTE(review): same string-built SQL caveat as sdu_alarm_content_info.
    conditions = [
        f"event_datetime BETWEEN '{start}' and '{end}'",
        f"event_type in {_sql_in_list(SDU_ALARM_LIST)}",
        "importance in (1, 2)" if is_new else "importance=1",
    ]
    if len(point_id_list) == 1:
        conditions.append(f"pid={point_id_list[0]}")
    else:
        conditions.append(f"pid in {_sql_in_list(point_id_list)}")
    mid_sql = " and ".join(conditions)
    sql = f"SELECT DISTINCT pid FROM `point_1min_event` where {mid_sql}"
    log.info(f"risk_distribution sql:{sql}")
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql)
    total_user = len(point_id_list)
    risk_user = len(datas)
    security_user = total_user - risk_user
    return security_user, risk_user


async def alarm_content_time_distribution_pds(cid, start, end, ):
    """Build per-slot alarm counts grouped by alarm category.

    Rows come from ``alarm_content_time_distribution_dao``; each row is read
    through its ``dat`` (slot label), ``event_type`` and ``coun`` (count)
    keys. A row is counted only when its ``dat`` equals one of the slot
    labels.

    Args:
        cid: Company id forwarded to the DAO.
        start: Period start, forwarded to the DAO and the slot helper.
        end: Period end, forwarded to the DAO and the slot helper.

    Returns:
        A dict with the series ``temperature``, ``residual_current`` and
        ``electric_param`` (each ``{"slots": [...], "value": [...]}``) and
        ``electric_param_detail``, the electric-parameter totals broken down
        by sub-category.
    """
    electric_param_detail = {
        "harmonic": 0,
        "voltage": 0,
        "current": 0,
        "power_factor": 0,
        "threephase_imbalance": 0,
        "load_rate": 0,
    }
    _interval, slots = time_pick_transf_new(start, end)
    # Drop the first five characters of every label
    # (presumably a "YYYY-" prefix - confirm against time_pick_transf_new).
    slots = [slot[5::] for slot in slots]
    temperature = {"slots": slots, "value": [0] * len(slots)}
    residual_current = {"slots": slots, "value": [0] * len(slots)}
    electric_param = {"slots": slots, "value": [0] * len(slots)}

    # Label -> every position carrying that label, so a repeated label is
    # credited at each of its positions, exactly like a full scan would.
    slot_indices = {}
    for index, slot in enumerate(slots):
        slot_indices.setdefault(slot, []).append(index)

    elec_data = await alarm_content_time_distribution_dao(cid, start, end, )
    for elec_info in elec_data or ():
        event_type = elec_info.get("event_type")
        count = elec_info.get("coun", 0)
        for index in slot_indices.get(elec_info.get("dat"), ()):
            if event_type in TEMPERATURE_MAP:
                temperature["value"][index] += count
            elif event_type in RESIDUAL_CURRENT_MAP:
                residual_current["value"][index] += count
            else:
                # Everything else is an electric-parameter alarm; known
                # types are additionally tallied per sub-category.
                electric_param["value"][index] += count
                detail_key = _ELECTRIC_PARAM_DETAIL_BY_EVENT.get(event_type)
                if detail_key is not None:
                    electric_param_detail[detail_key] += count

    return {
        "temperature": temperature,
        "residual_current": residual_current,
        "electric_param": electric_param,
        "electric_param_detail": electric_param_detail,
    }