from collections import defaultdict

from pot_libs.utils.exc_util import ParamException
from unify_api.constants import Importance
from unify_api.modules.alarm_manager.components.alarm_static_cps import \
    SduAlarmResp, RiskCount, ContentName, SassResp, AppResp, SebResp, \
    SiarResp, ZsResp, TimeCount, ZasResp
from unify_api.modules.alarm_manager.dao.list_static_dao import \
    zdu_alarm_aggs_date_impotent, sdu_alarm_type_dao, \
    sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \
    sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
    sdu_alarm_content_info, risk_distribution, \
    alarm_content_time_distribution_pds
from unify_api.modules.common.dao.common_dao import points_by_cid, \
    monitor_point_join, points_monitor_by_cid
from unify_api.modules.common.procedures.common_cps import \
    alarm_time_distribution
from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.home_page.components.security_info_cps import \
    SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.security_info_pds import \
    alarm_count_info
from unify_api.utils.common_utils import round_1, division_two
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
    zdu_alarm_sort_dao, zdu_summary_dao


def _sum_doc_count_by_pid(rows, field):
    """Aggregate DAO rows into ``{pid: {row[field]: summed doc_count}}``.

    ``rows`` may be ``None`` or empty, in which case an empty mapping is
    returned. Rows sharing the same pid and the same ``field`` value have
    their ``doc_count`` added together, so no row is silently dropped.
    """
    grouped = defaultdict(lambda: defaultdict(int))
    for row in rows or []:
        grouped[row["pid"]][row.get(field)] += row.get("doc_count") or 0
    return grouped


async def sdu_alarm_statistics_service(cids, start, end, product):
    """Build the alarm statistics overview (currently used by sdu).

    :param cids: company ids passed to the point and alarm lookups.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param product: accepted for interface compatibility; not used here.
    :return: ``SduAlarmResp`` with totals, per-content series, the ranking
        of illegal appliances and the safe/risk user distribution.
    :raises ParamException: when no points exist for ``cids``.
    """
    points = await points_monitor_by_cid(cids)
    if not points:
        raise ParamException(message=f"{cids}没有points")
    point_id_list = [point["pid"] for point in points]

    # 1. Alarm statistics grouped by alarm content.
    alarm_info_map = await sdu_alarm_content_info(cids, start, end, points)
    ele_overload = alarm_info_map["ele_overload"]
    illegal_ele_app = alarm_info_map["illegal_ele_app"]
    power_quality = alarm_info_map["power_quality"]
    illegal_app_dic = alarm_info_map["illegal_app_dic"]
    total_alarm_cnt = alarm_info_map["total_alarm_cnt"]
    alarm_points_cnt = alarm_info_map["alarm_points_cnt"]

    # Illegal appliances ordered by alarm count, highest first.
    illegal_app_list = sorted(
        ({"name": name, "value": count}
         for name, count in illegal_app_dic.items()),
        key=lambda item: item["value"],
        reverse=True,
    )

    # Totals per alarm content; used by the caller to compute proportions.
    content_totals = ContentName(
        ele_overload=sum(ele_overload["value"]),
        illegal_ele_app=sum(illegal_ele_app["value"]),
        power_quality=sum(power_quality["value"]),
    )

    # 2. Risk distribution. Per the original author's note: a user with a
    # level I or level II alarm in the period counts as a risk user, all
    # others as safe users (the older product only considered level I).
    security_user, risk_user = await risk_distribution(
        start, end, point_id_list, is_new=True)
    risk_counts = RiskCount(security_user=security_user, risk_user=risk_user)

    # Average alarms per alarming point; guard against division by zero.
    if alarm_points_cnt == 0:
        aver_alarm = 0
    else:
        aver_alarm = round(total_alarm_cnt / alarm_points_cnt, 1)

    return SduAlarmResp(
        total_alarm_cnt=total_alarm_cnt,
        alarm_points_cnt=alarm_points_cnt,
        aver_alarm=aver_alarm,
        ele_overload=ele_overload,
        illegal_ele_app=illegal_ele_app,
        power_quality=power_quality,
        illegal_app_list=illegal_app_list,
        content_distribution=content_totals,
        risk_distribution=risk_counts
    )


async def sdu_alarm_statistics_sort_service(cid, start, end, page_size,
                                            page_num, sort):
    """Rank the points of a company by alarm count, with pagination.

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param page_size: number of ranking entries per page.
    :param page_num: 1-based page number.
    :param sort: ``"desc"`` for highest count first; any other value sorts
        ascending.
    :return: ``SassResp`` with the total number of ranked points and the
        requested page of ``{"name", "value"}`` entries.
    :raises ParamException: when the company has no points.
    """
    points = await points_by_cid([cid])
    if not points:
        raise ParamException(message=f"{cid}没有points")
    name_by_pid = {point["pid"]: point["name"] for point in points}

    buckets = await sdu_alarm_statistics_dao(cid, start, end)
    if not buckets:
        return SassResp(alarm_ranking_total=0, alarm_ranking=[])

    # Accumulate per point name. The previous implementation tested the pid
    # against a dict keyed by name, so repeated buckets for one point
    # overwrote each other instead of being summed.
    count_by_name = defaultdict(int)
    for bucket in buckets:
        pid = bucket["pid"]
        if not pid:
            continue
        if pid not in name_by_pid:
            # Alarm for a point that is not among this company's points
            # (no name to display); skip it rather than fail the request.
            continue
        count_by_name[name_by_pid[pid]] += bucket["doc_count"]

    points_alarm_list = [{"name": name, "value": count}
                         for name, count in count_by_name.items()]
    points_alarm_list.sort(key=lambda item: item["value"],
                           reverse=(sort == "desc"))

    page_start = (page_num - 1) * page_size
    page_end = page_num * page_size
    return SassResp(
        alarm_ranking_total=len(points_alarm_list),
        alarm_ranking=points_alarm_list[page_start:page_end]
    )


async def sdu_app_statistics_sort_service(cid, start, end):
    """Rank appliances by alarm count (alarm statistics, sdu).

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :return: ``AppResp`` whose ``ele_app_ranking`` lists
        ``{"name", "value", "type"}`` entries, highest count first.
    """
    buckets = await sdu_alarm_statistics_dao(cid, start, end)
    if not buckets:
        return AppResp(ele_app_ranking=[])

    # Sum alarm counts per recognised appliance; buckets without an
    # appliance are ignored.
    count_by_appliance = defaultdict(int)
    for bucket in buckets:
        appliance = bucket.get("appliance")
        if appliance:
            count_by_appliance[appliance] += bucket["doc_count"]

    # The current version only displays the "违规" (illegal) type.
    ele_app_list = [{"name": name, "value": count, "type": "违规"}
                    for name, count in count_by_appliance.items()]
    ele_app_list.sort(key=lambda item: item["value"], reverse=True)
    return AppResp(
        ele_app_ranking=ele_app_list,
    )


async def sdu_electric_behave_service(cid, start, end, storeys, product):
    """Summarise electricity-usage behaviour per room, grouped by storey.

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param storeys: storeys used to select the points.
    :param product: accepted for interface compatibility; not used here.
    :return: ``SebResp`` whose ``return_data`` is a list of storeys (sorted
        by ``storey_id``), each holding its rooms (sorted by ``room_name``)
        with illegal / high-power / normal counts and an alarm score.
    """
    # 1. Points belonging to the requested storeys.
    point_list = await points_by_storeys(cid, storeys)
    points = [info.get("point_id") for info in point_list]

    # 2. Alarm counts per point and event type. Rows are summed per pid so
    # that several rows for one point are all counted, and a None result
    # from the DAO is treated as "no data".
    # NOTE(review): assumes the DAO may return one row per (pid,
    # event_type); confirm against the DAO query.
    type_rows = await sdu_alarm_type_dao(start, end, points)
    type_counts_by_pid = _sum_doc_count_by_pid(type_rows, "event_type")

    # 2.2 Alarm counts per point and importance level, restricted to the
    # new sdu alarm types (is_sdu=True), used for the alarm score.
    imp_rows = await sdu_alarm_importance_dao(start, end, points,
                                              is_sdu=True)
    imp_counts_by_pid = _sum_doc_count_by_pid(imp_rows, "importance")

    # 3. Build one entry per room, grouped by storey name.
    rooms_by_storey = {}
    for info in point_list:
        storey_name = info.get("storey_name")
        point_id = info.get("point_id")

        # 3.1 Counts per event type; 0 when the point has no alarms.
        type_counts = type_counts_by_pid.get(point_id, {})
        illegal_ele_app = type_counts.get("illegal_ele_app", 0)
        high_power_app = type_counts.get("high_power_app", 0)
        normal_app = type_counts.get("normal_app", 0)

        # 3.2 Alarm score: level I weighs 2, level II 1, level III 0.5,
        # capped at 15.
        imp_counts = imp_counts_by_pid.get(point_id, {})
        first_alarm_cnt = imp_counts.get(Importance.First.value, 0)
        second_alarm_cnt = imp_counts.get(Importance.Second.value, 0)
        third_alarm_cnt = imp_counts.get(Importance.Third.value, 0)
        alarm_score = first_alarm_cnt * 2 + second_alarm_cnt * 1 + \
            third_alarm_cnt * 0.5
        if alarm_score >= 15:
            alarm_score = 15

        room = {
            "room_name": info.get("room_name"),
            "storey_id": info.get("storey_id"),
            "point_id": point_id,
            "illegal_ele_app": illegal_ele_app,  # illegal appliance
            "high_power_app": high_power_app,  # high-power appliance
            "normal_app": normal_app,  # normal usage
            "alarm_score": alarm_score  # alarm score
        }
        # 3.3 Group rooms under their storey.
        rooms_by_storey.setdefault(storey_name, []).append(room)

    # Convert to a list: rooms sorted by name, storeys sorted by id. The
    # storey id is taken from the first room of each storey.
    return_list = [
        {"name": storey_name,
         "storey_id": rooms[0]["storey_id"],
         "room_data": sorted(rooms, key=lambda room: room["room_name"])}
        for storey_name, rooms in rooms_by_storey.items()
    ]
    return_list.sort(key=lambda storey: storey["storey_id"])
    return SebResp(return_data=return_list)


async def sdu_index_alarm_rank(cid, start, end, product):
    """Build the top-5 rankings shown on the sdu index page.

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param product: accepted for interface compatibility; not used here.
    :return: ``SiarResp`` with the top five illegal appliances, the top
        five illegal-behaviour entries and the top five points by alarm
        count (each with ``im1``/``im2``/``im3`` breakdown).
    :raises ParamException: when the company has no points.
    """
    points = await points_by_cid([cid])
    if not points:
        raise ParamException(message=f"{cid}没有points")
    point_list = [point["pid"] for point in points]
    points_map = {point["pid"]: point["name"] for point in points}

    # 1. Illegal appliance ranking.
    behavior_res = await sdu_alarm_behavior_dao(start, end, point_list)
    behavior_illegal_app = [
        {"name": row["appliance"], "value": row["doc_count"]}
        for row in behavior_res or []
    ]
    behavior_illegal_app.sort(key=lambda item: item["value"], reverse=True)

    # 2. Alarm ranking and illegal behaviour. A None result from the DAO is
    # treated as "no data", matching the handling of behavior_res above.
    es_type_res = await sdu_alarm_limit_type_dao(start, end, point_list)
    illegal_behavior = []
    counts_by_pid = {}
    for row in es_type_res or []:
        event_type = row.get("event_type")
        doc_count = row["doc_count"]
        # Every pid seen gets an entry, even when its event type is none of
        # the three counted below (it then ranks with a value of 0).
        counts = counts_by_pid.setdefault(
            row["pid"], {"im1": 0, "im2": 0, "im3": 0})
        if event_type == "power_quality_low":
            counts["im3"] += doc_count
        if event_type == "ele_overload":
            counts["im2"] += doc_count
        if event_type == "illegal_ele_app":
            counts["im1"] += doc_count
            illegal_behavior.append(
                {"name": points_map.get(row.get("pid")),
                 "value": doc_count})

    alarm_ranking = []
    for pid, counts in counts_by_pid.items():
        im1 = counts.get("im1") or 0
        im2 = counts.get("im2") or 0
        im3 = counts.get("im3") or 0
        alarm_ranking.append({
            "name": points_map.get(pid),
            "value": im1 + im2 + im3,
            "im1": im1,
            "im2": im2,
            "im3": im3
        })

    # 3. Order each ranking by count, highest first, and keep the top five.
    alarm_ranking.sort(key=lambda item: item["value"], reverse=True)
    illegal_behavior.sort(key=lambda item: item["value"], reverse=True)
    return SiarResp(
        illegal_app=behavior_illegal_app[:5],
        illegal_behavior=illegal_behavior[:5],
        alarm_ranking=alarm_ranking[:5]
    )


async def zdu_level_distribution_service(cid, start, end, product):
    """Return the alarm distribution by level (alarm statistics, zdu).

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param product: accepted for interface compatibility; not used here.
    :return: ``SecurityCountResp`` with the three per-level series and
        their summed totals in ``level_detail``.
    """
    # NOTE(review): "month" is passed as a fixed fourth argument;
    # presumably the aggregation granularity - confirm in alarm_count_info.
    alarm_info_map = await alarm_count_info([cid], start, end, "month")
    first_alarm = alarm_info_map["first_alarm"]
    second_alarm = alarm_info_map["second_alarm"]
    third_alarm = alarm_info_map["third_alarm"]
    return SecurityCountResp(
        first_alarm=first_alarm,
        second_alarm=second_alarm,
        third_alarm=third_alarm,
        level_detail=LevelCount(
            first_alarm_cnt=sum(first_alarm["value"]),
            second_alarm_cnt=sum(second_alarm["value"]),
            third_alarm_cnt=sum(third_alarm["value"]),
        ),
    )


async def zdu_content_distribution_service(cid, start, end, product):
    """Return the alarm distribution by content (alarm statistics, zdu).

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param product: accepted for interface compatibility; not used here.
    :return: ``AlarmContentDistributionResp`` with the temperature,
        residual-current and electric-parameter series and their summed
        totals in ``content_detail``.
    """
    alarm_info_map = await alarm_content_time_distribution_pds(cid, start,
                                                               end)
    temperature = alarm_info_map["temperature"]
    residual_current = alarm_info_map["residual_current"]
    electric_param = alarm_info_map["electric_param"]
    # Read to keep the original requirement that this key is present; the
    # value itself is not part of the response.
    electric_param_detail = alarm_info_map["electric_param_detail"]
    return AlarmContentDistributionResp(
        temperature=temperature,
        residual_current=residual_current,
        electric_param=electric_param,
        content_detail=ContentCount(
            temperature_cnt=sum(temperature["value"]),
            residual_current_cnt=sum(residual_current["value"]),
            electric_param_cnt=sum(electric_param["value"]),
        ),
    )


async def zdu_summary_service(cid, start, end, product):
    """Return the alarm statistics summary (alarm statistics, zdu).

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param product: accepted for interface compatibility; not used here.
    :return: ``ZsResp`` with the alarm total, the number of alarming
        points, the average alarms per alarming point, the safe-run value
        and the alarm counts per time-of-day interval.
    """
    # 1. Total alarms and number of alarming points; the average is the
    # total divided by the number of alarming points.
    total_alarm_cnt, alarm_points = await zdu_summary_dao(cid, start, end)
    # 2. Safe running days. Per the original author's note: counted from
    # the start of the month, a day without a level I alarm adds one.
    safe_run = await zdu_alarm_aggs_date_impotent(cid, start, end)
    # 3. Time-of-day distribution: daytime / night / early morning.
    time_distribution_map = await alarm_time_distribution([cid], start, end)
    return ZsResp(
        total_alarm_cnt=total_alarm_cnt,
        alarm_points_cnt=alarm_points,
        aver_alarm=round_1(division_two(total_alarm_cnt, alarm_points)),
        safe_run=safe_run,
        time_interval_distribution=TimeCount(
            daytime_cnt=time_distribution_map["day_alarm_cnt"],
            night_cnt=time_distribution_map["night_alarm_cnt"],
            morning_cnt=time_distribution_map["morning_alarm_cnt"],
        )
    )


async def zdu_alarm_sort_service_2(cid, start, end, page_size, page_num):
    """Return the alarm ranking of points (alarm statistics, zdu).

    :param cid: company id.
    :param start: start of the statistics period.
    :param end: end of the statistics period.
    :param page_size: page size forwarded to the DAO.
    :param page_num: page number forwarded to the DAO.
    :return: ``ZasResp`` with the ranking returned by the DAO and its
        length as ``total`` (0 when the DAO returns nothing).
    """
    points_alarm_list = await zdu_alarm_sort_dao(cid, start, end,
                                                 page_size, page_num)
    # NOTE(review): total is the length of the list the DAO returned; if
    # the DAO already applies pagination this is the page length rather
    # than the overall number of ranked points - confirm.
    return ZasResp(
        total=len(points_alarm_list) if points_alarm_list else 0,
        alarm_ranking=points_alarm_list
    )