import json import logging import math from pot_libs.qingstor_util.qs_client import QsClient from unify_api.utils import time_format from unify_api.modules.zhiwei_u.dao.warning_operations_dao import \ select_point_dao from unify_api.modules.scope_analyse.dao.scope_record_mysql_dao import \ get_mtid_dao, get_location_id_dao, get_threhold_dao, get_point_name_dao, \ get_trigger_params_dao from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFile from unify_api.modules.scope_analyse.dao.scope_record_dao import \ detail_data_by_es, event_data_by_es, scope_by_sql from unify_api.modules.scope_analyse.components.scope_analyse_cps import \ ScopeRecordResp, ScopeRecord, ScopeAnalyseResp, LeakageCurrentResp from pot_libs.logger import log from pot_libs.common.components.responses import success_res from unify_api.utils.response_code import RET async def scope_record_service(cid, point_id, offset, limit, start, end): li = [f"cid={cid}"] if point_id: if len(point_id) == 1: li.append(f"pid={point_id[0]}") else: li.append(f"pid in {tuple(point_id)}") if start and end: li.append(f"create_time BETWEEN '{start}' and '{end}'") mid_sql = " and ".join(li) datas, total = await scope_by_sql(mid_sql, offset, limit) # 获取监测点名称 point_dict = await get_point_dict(cid) # 动态漏电流阈值 rows = [] for data in datas: # 漏电流 if data["fault_type"] in ("over_res_cur", "overResidualCurrent"): probability = 1 record_type_name = "漏电流" reason = "漏电流越限" threhold = await get_threhold(data["cid"], data["sid"]) log.info(f"scope_record_service threhold:{threhold}") else: try: # 获取url地址数据,后续再接上 context = json.load(data.get("url")) ctnum = 2 if "uab" in context else 3 result = actionFile(context, ctnum) except: result = None log.info(f"actionFile:{result}") if isinstance(result, list): # record_type_name, probability, reason = result[0] record_type_name, probability, _ = result[0] probability = round(float(probability), 4) else: # record_type_name, probability, reason = "不存在故障", "", # result record_type_name, probability = "不存在故障", "" fault_type = await get_trigger_params_dao(data["fault_type"]) reason = fault_type.get("name") dt = data["create_time"] check_dt = time_format.convert_dt_to_timestr(dt) check_timestamp = int(time_format.convert_dt_to_timestamp(dt)) sr = ScopeRecord(trigger_time=check_dt, point_id=data["pid"], point_name=point_dict.get(data["pid"]), record_type=data["fault_type"], record_type_name=record_type_name, probability=probability, reason=reason, scope_id="{}_{}".format(data["pid"], check_timestamp)) rows.append(sr) return ScopeRecordResp(rows=rows, total=total) # 故障诊断-波形分析 async def scope_analyse_service(pid, create_time): check_dt = time_format.get_datetime_str(create_time) data = await detail_data_by_es(pid, check_dt) if not data: log.info(f"波形分析 没有数据 pid:{pid},create_time:{create_time}") return success_res(code=RET.not_data, msg="没有找到该数据") point_name = await get_point_name_dao(pid) try: async with QsClient() as qs: context = await qs.get_object(data["url"]) for k, v in context.items(): context[k] = [value if not math.isnan(value) else '' for value in v] if "lc" in context: del context["lc"] if "pttl" in context: del context["pttl"] except Exception as e: log.error(f"录波地址无效 url:{data['url']} message:{str(e)}") return success_res(code=RET.not_data, msg="录波地址有误") log.info( f"波形分析 pid:{pid},create_time:{create_time}, type:{data['fault_type']}") res = data.get("index_loc") try: res_dic = json.loads(res) trigger_point = [d for d in res_dic.values()][0].get("location") except AttributeError as e: log.error( f"录波出发位置有误 index_loc:{data['index_loc']} message:{str(e)}") trigger_point = 0 # 漏电流 if data["fault_type"] in ("over_res_cur", "overResidualCurrent"): threhold = await get_threhold(data["cid"], data["sid"]) # result = leakage_reg(ileak_rms=context["ileak_rms"], # leak_hold=threhold) # log.info(f"actionFile 漏电流 结论:{result}") trigger_params = ["漏电流越限"] reason = [{"title": "漏电流越限", "probability": 1, "suggest": "逐级排查找出漏电故障点修复,需做好防护措施防止触电"}] # if result == "漏电流越限": # reason = [{"title": "漏电流越限", "probability": 1, # "suggest": "逐级排查找出漏电故障点修复,需做好防护措施防止触电"}] # else: # reason = [] return LeakageCurrentResp(point_id=data["point_id"], point_name=point_name["name"], trigger_time=check_dt, trigger_point=trigger_point, trigger_params=trigger_params, reason=reason, contents=context) else: try: ctnum = 2 if "uab" in context else 3 result = actionFile(context, ctnum) log.info(f"actionFile 波形分析 结论 result:{result}") except: result = None fina_reason = [] if isinstance(result, list): record_type_name, probability, reason = result[0] trigger_params = reason.split("相") if len(trigger_params[0]) <= 2: trigger_params[0] = trigger_params[0] + "相" for r in result: temp = r[0].replace("A", "").replace("B", "").replace("C", "") fina_reason.append( {"title": r[0], "probability": round(float(r[1]), 4) if r[1] else "", "suggest": REASON_DICT.get(temp) }) # elif isinstance(result, str): # trigger_params = result.split("相") # if len(trigger_params[0]) <= 2: # trigger_params[0] = trigger_params[0] + "相" fault_type = await get_trigger_params_dao(data["fault_type"]) trigger_params = [fault_type.get("name")] event = await event_data_by_es(data["mtid"], check_dt, data["fault_type"]) if event: trigger_params.insert(0, f'{event.get("phase")}相') logging.info(f"pid:{pid},create_time:{create_time}, " f"fault_type:{data['fault_type']}, result:{result}") return ScopeAnalyseResp(point_id=pid, point_name=point_name["name"], trigger_time=check_dt, trigger_params=trigger_params, trigger_point=trigger_point, reason=fina_reason, contents=context) async def get_point_dict(cid): point_list = await select_point_dao(cid) point_dict = {} for point in point_list: point_dict[point["pid"]] = point["name"] return point_dict async def get_threhold(cid, sid): try: # mtid_info = await get_mtid_dao(data["cid"], data["sid"]) mtid_info = await get_mtid_dao(cid, sid) log.info(f"scope_record_service mtid:{mtid_info}") location = await get_location_id_dao(mtid_info["mtid"]) log.info(f"scope_record_service location:{location}") threhold = await get_threhold_dao(location["id"]) except: threhold = 30 return threhold REASON_DICT = { "单相断线": "请尽快找出断线故障点修复,避免三相负载长时间缺相运行", "单相接地": "逐级排查找出接地故障点修复,带电排查需疏散无关人员并做好防护措施防止触电", "两相短路": "请立即将短路故障点上级开关断开挂牌,再进行短路故障原因排查修复," "防止有人重复送电造成触电及二次短路事故", "两相接地": "逐级排查找出接地故障点修复,带电排查需疏散无关人员并做好防护措施防止触电", "三相短路": "请立即将短路故障点上级开关断开挂牌,再进行短路故障原因排查修复," "防止有人重复送电造成触电及二次短路事故", "三相断线": "请尽快找出断线故障点修复,排查需疏散无关人员并做好防护措施防止触电", "两相断线": "请尽快找出断线故障点修复,避免三相负载长时间缺相运行", "漏电越限": "逐级排查找出漏电故障点修复,需做好防护措施防止触电" }