import json from pot_libs.utils.exc_util import BusinessException from unify_api import constants from unify_api.modules.alarm_manager.dao.list_static_dao import \ sdu_alarm_behavior_dao from unify_api.modules.common.procedures.points import \ get_meter_by_point from unify_api.modules.home_page.procedures.count_info_pds import \ electric_use_info_points_sdu from unify_api.modules.shidianu.components.algorithm_cps import WcResp, AbcResp from unify_api.modules.shidianu.dao.analysis_result_dao import \ query_sdu_power_wave, query_sdu_recog_record from unify_api.modules.shidianu.procedures.output_result import get_curve_p from unify_api.utils.time_format import last30_day_range, \ day_slots, get_start_end_by_tz_time_new async def wave_curve_srv(point_id, req_date, product): # 1,获取slots time_slot = day_slots() # 2. 获取sid meter_info = await get_meter_by_point(point_id) if not meter_info: raise BusinessException(message="没有该监测点的monitor信息,请联系运维人员!") mtid, meter_no = meter_info["mtid"], meter_info["meter_no"] start, end = get_start_end_by_tz_time_new(req_date) # 获取设备数据 p_list = await get_curve_p(mtid, meter_no, start, end, time_slot) # 3. 获取用电设备识别结果 device_data = await query_sdu_recog_record(point_id, start, end) electric_actions = {} if device_data: for i in device_data: recog_dt = str(i["recog_dt"]) recog_dt = recog_dt.split(" ")[1][:5] act_info = json.loads(i["act_info"]) # 拼凑返回格式 04:02 违规电器,疑似电动车电池 tmp_dic = {} (key, value), = act_info.items() type_str = constants.SDU_EVENT_TYPE_MAP.get(key) # 违规,大功率,正常电器, 只保留1个级别最高的展示 if recog_dt not in electric_actions: electric_actions[recog_dt] = [ {"type": type_str, "value": value, "type_str": key} ] else: # 如果是电动车电池, 优先级最高, 同一分钟第一次已经是电动车电池continue if electric_actions[recog_dt][0]["value"] == "电动车电池": continue # 如果本次正常电器, 不需要处理了 # 如果本次违规电器, 替换原来的 elif key == "illegal_ele_app": electric_actions[recog_dt] = [ {"type": type_str, "value": value, "type_str": key} ] # 如果本次是大功率电器, 且原本是正常电器, 则替换 elif key == "high_power_app" and electric_actions[recog_dt][0][ "type_str"] == "normal_app": electric_actions[recog_dt] = [ {"type": type_str, "value": value, "type_str": key} ] # electric_actions[recog_dt].append( # {"type": type_str, "value": value}) return WcResp( time_slot=time_slot, p_slot=p_list, electric_actions=electric_actions ) async def alarm_behavior_curve_service(point_id, req_date, product): # 1. 获取功率波动, 如果没有查询到功率波动,返回None wave_data = await query_sdu_power_wave(point_id, req_date + " 00:00:00") if wave_data: power_swing = json.loads(wave_data["power_dist"]) else: power_swing = None # 2. 安全评价 start, end = last30_day_range() alarm_res = await electric_use_info_points_sdu(start, end, [point_id]) safety_eval = {"first_alarm_cnt": alarm_res.first_alarm_cnt, "second_alarm_cnt": alarm_res.second_alarm_cnt, "third_alarm_cnt": alarm_res.third_alarm_cnt, "alarm_score": alarm_res.alarm_score, "electric_use_score": alarm_res.electric_use_score, } # 3. 行为统计 # behavior_res = await sdu_alarm_behavior_dao(start, end, [point_id]) behavior_res = await sdu_alarm_behavior_dao(start, end, [point_id]) behavior_illegal_app = [] if behavior_res: for i in behavior_res: tmp_dic = {"name": i["appliance"], "value": i["doc_count"]} behavior_illegal_app.append(tmp_dic) behavior_illegal_app = sorted(behavior_illegal_app, key=lambda x: x["value"], reverse=True) return AbcResp(power_swing=power_swing, safety_eval=safety_eval, behavior_illegal_app=behavior_illegal_app[:5])