import datetime import json import io import pandas as pd from pot_libs.qingstor_util.qs_client import QsClient from unify_api.modules.zhiwei_u.procedures.scope_operations_pds import \ get_event_info_by_event_id, get_scope_info_pds from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin from unify_api.modules.zhiwei_u import config from unify_api.utils import time_format from pot_libs.utils.exc_util import DBException from dataclasses import fields from unify_api.modules.zhiwei_u.dao.data_es_dao import get_scope_pids, \ get_search_scope from unify_api.modules.zhiwei_u.dao.order_operations_dao import \ select_cname_by_cid from unify_api.modules.zhiwei_u.dao.scope_operations_dao import \ sid_to_order_dao, select_sid_by_pid, select_cids_by_prod_id, \ select_cname_by_cids, select_sid_by_pids from unify_api.modules.zhiwei_u.components.scope_operations_cps import \ SearchScopeResq, Alarm from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username from pot_libs.common.components.responses import Success from unify_api.modules.electric.procedures.electric_util import get_wiring_type from pot_libs.logger import log from unify_api.modules.zhiwei_u.components.scope_operations_cps import \ ScopeDetailsResponse, ScopeContent from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg from unify_api.modules.scope_analyse.service.scope_analyse_service import \ get_threhold async def search_scope_service(prod_id, cid, pid, sid, page_num, page_size, start, end): start_num = (page_num - 1) * page_size if not pid and sid: pids_datas = await sid_to_order_dao(sid) pids = [data["pid"] for data in pids_datas] # todo databases should be point_1min_scope, not point_1min_event datas, total = await get_scope_pids(pids, start, end, page_size, start_num) else: if prod_id and not cid and not pid: cid_list = await select_cids_by_prod_id(prod_id) cid = [c["cid"] for c in cid_list] elif cid: cid = [cid] datas, total = await get_search_scope(cid, pid, start, end, page_size, start_num) if not datas: return SearchScopeResq(total=0, rows=[]) pids, cids = [], [] for info in datas: pids.append(info["pid"]) cids.append(info["cid"]) companys = await select_cname_by_cids(list(set(cids))) points = await select_sid_by_pids(list(set(pids))) rows = [] for info in datas: cid = info["cid"] for index, c in enumerate(companys): if cid == c["cid"]: company = companys[index] break else: company = [] pid = info["pid"] message = info["message"] check_dt = info["event_datetime"] dt = time_format.convert_to_dt(check_dt) check_dt = time_format.convert_dt_to_timestr(dt) for index, p in enumerate(points): if pid == p["pid"]: point = points[index] break else: point = [] doc_id = str(info["id"]) alarm = Alarm(check_dt=check_dt, prod_id=company.get("product"), product=config.PRODUCT[company.get("product")], cid=cid, shortname=company.get("shortname"), pid=pid, point=point.get("name"), sid=point.get("sid"), message=message, doc_id=doc_id ) rows.append(alarm) return SearchScopeResq(total=total, rows=rows) async def load_context(url): async with QsClient() as client: context_bytes = await client.get_file(key=url) context = json.loads(context_bytes.decode("utf-8")) return context async def scope_detail_service(id, wave_range, download=None, user_id=None): # 1.根据es_id查询point_1min_event对应的point属性 try: mysql_results = await get_event_info_by_event_id(id) except Exception as e: log.error(f"query error {e}") return ScopeDetailsResponse().db_error() try: source = mysql_results point_name = source.get("name") point_id = source.get("point_id") mtid = source.get("mtid") cmp_time = source.get("time") cid = source.get("cid") company = await select_cname_by_cid(cid) message = source.get("message") check_dt = source.get("datetime") dt = time_format.convert_to_dt(check_dt) check_dt = time_format.convert_dt_to_timestr(dt) point = await select_sid_by_pid(point_id) except Exception: log.warning(f"query err {id}") raise DBException # 4.接线法:二表/三表 ctnum, _ = await get_wiring_type(point_id) if not ctnum: ctnum = 3 try: mysql_results = await get_scope_info_pds(mtid, check_dt) except Exception as e: log.error(f"es query error {e}") return ScopeDetailsResponse().db_error() if not mysql_results: alarm = Alarm(check_dt=check_dt, pid=point_id, cid=cid, point=point_name, sid=point["sid"], message=message, prod_id=company["product"], product=config.PRODUCT[company["product"]], shortname=company["shortname"], contin_time=0, result="暂无数据") return ScopeDetailsResponse( alarm=alarm, ctnum=ctnum, group=point_name, item={}, scope_type="", type="", date_time="", contents={} ) source = mysql_results scope_type = source.get("fault_type") fault_type = config.EVENT_TYPE_MAP.get(scope_type) date_time = datetime.datetime.strftime(source.get("datetime"), "%Y-%m-%d %H:%M:%S") position = list(json.loads(source.get("result")).keys()) position_tmp = [ pos + "相" if len(pos) == 1 else pos + "线" if len(pos) == 2 else '' for pos in position] position_tmp = [p for p in position_tmp if p] # 去掉空字符串 position_str = "|".join(position_tmp) # 3.曲线数据 res_dic = json.loads(source.get("result")) location = [d for d in res_dic.values()][0].get("location") wave_data = await load_context(source.get("context_url")) length = wave_data.get("ia") or wave_data.get("ileak_rms") contin_time = len(length) if wave_range == "100ms": if location <= 200: for key, value in wave_data.items(): wave_data[key] = value[:400] elif location >= 1400: for key, value in wave_data.items(): wave_data[key] = value[1200:] else: for key, value in wave_data.items(): wave_data[key] = value[location - 200:location + 200] scope_content = ScopeContent( **{k: v for k, v in wave_data.items() if k in [ for field in fields(ScopeContent)]} ) # 下载报表 if download: download_auth = None if user_id: user_info = await get_username(user_id) download_auth = user_info.get("role", None) if scope_type == "over_res_cur": wave_data = { "序号": [i for i in range(1, contin_time + 1)], "ileak_rms": wave_data.get("ileak_rms"), "ileak": wave_data.get("ileak"), } table_name = f'{company["shortname"]}_{point_name}_' \ f'{check_dt[:10]}_漏电流录波' return await excl_download(wave_data, table_name, "sheet1") elif download_auth == 2: wave_data = { "序号": [i for i in range(1, contin_time + 1)], "ia": wave_data.get("ia"), "ib": wave_data.get("ib"), "ic": wave_data.get("ic"), "ua": wave_data.get("ua"), "ub": wave_data.get("ub"), "uc": wave_data.get("uc"), } table_name = f'{company["shortname"]}_{point_name}_' \ f'{check_dt[:10]}_电压录波/电流录波' return await excl_download(wave_data, table_name, "sheet1") else: return Success(success=0, message="您的权限不足") # 结论分析 try: log.warning(f"wave_data:{wave_data}") if scope_type == "over_res_cur": threhold = await get_threhold(cid, point["sid"]) result = leakage_reg(ileak_rms=wave_data["ileak_rms"], leak_hold=threhold)"actionFile 漏电流 sid:{point['sid']}结论:{result}") else: res = actionFilemin(wave_data, ctnum) log.warning(f"结论分析 res:{res}") if res == "nofault": result = "不存在故障" else: result = "" for r in res: result += "%s : %0.0f%%;" % (r[0], r[1] * 100) except Exception as e: result = "" log.error(f"actionFile error:{e}") alarm = Alarm(check_dt=check_dt, pid=point_id, cid=cid, point=point_name, sid=point["sid"], message=message, prod_id=company["product"], product=config.PRODUCT[company["product"]], shortname=company["shortname"], contin_time=int(contin_time * 0.25), result=result) return ScopeDetailsResponse( alarm=alarm, ctnum=ctnum, group=point_name, item=position_str, scope_type=scope_type, type=fault_type, date_time=date_time, location=location, contents=scope_content ) async def excl_download(dict_data, table_name, sheet_name): output = io.BytesIO() excel_name = table_name + '.xlsx' writer = pd.ExcelWriter(output, engine='xlsxwriter') df = pd.DataFrame(dict_data) df.to_excel(writer, sheet_name=sheet_name, index=False) return output, excel_name async def dataframe_excl_download(data, table_name, sheet_name="sheet1"): output = io.BytesIO() excel_name = table_name + '.xlsx' writer = pd.ExcelWriter(output, engine='xlsxwriter') data.to_excel(writer, sheet_name=sheet_name, index=False) return output, excel_name