import json import math from pot_libs.aredis_util.aredis_utils import RedisUtils from pot_libs.qingstor_util.qs_client import QsClient from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg from unify_api.utils.log_utils import LOGGER from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil from pot_libs.aredis_util import aredis_utils from pot_libs.common.components.query import PageRequest, Filter, InGroup, \ Range, Equal, Sort from pot_libs.es_util.es_query import EsQuery from pot_libs.es_util.es_utils import EsUtil from pot_libs.logger import log from pot_libs.utils.exc_util import BusinessException from pot_libs.utils.pendulum_wrapper import my_pendulum from unify_api.modules.anshiu.components.scope_operations_cps import \ ScopeListItem, ScopeContent, ScopeDetailsResp, GetScopeConfigList, \ init_scope_config_example, ScopeItemDownload, ScopeDetail, ScopeDetails from unify_api.modules.anshiu.dao.scope_operations_dao import \ get_scope_event_by_event_id, get_scope_detail_by_pid, \ get_threshold_by_mtid, get_e_type_by_event_type from unify_api.modules.anshiu.procedures.scope_operations_pds import \ get_scope_config_by_pid, set_scope_config_by_pid, add_scope_config_by_pid, \ get_scope_list_by_pid from unify_api.modules.device_cloud.procedures.mqtt_helper import \ change_param_to_config from unify_api.modules.electric.procedures.electric_util import \ load_point_ctnum from unify_api.modules.zhiwei_u import config from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope from unify_api.utils.time_format import get_time_duration, \ get_current_datetime_str, convert_str_to_timestamp async def search_scope_service(pids, cid, page_num, page_size, start, end, scope_g): ''' 获取录波记录 ''' datas, total = await query_search_scope(cid, pids, page_num, page_size, start, end, scope_g) return datas, total["total"] async def scope_list_download_data(pids, start, end): """ 获取录波下载数据 """ duration = get_time_duration(start, end, False, False) if duration > 24 * 60 * 60: raise BusinessException(message='只能支持下载一天的录波数据') datas = await get_scope_list_by_pid(pids, start, end) return datas async def scope_detail_data(doc_id): ''' 识别详情 ''' # 暂时先默认查询所有的 wave_range = "all" scope_detail_obj = await scope_detail_service(doc_id, wave_range) # 录波颗粒度 scope_type = scope_detail_obj.scope_g if scope_type == '200ms': scope_type = '0.2s' if scope_detail_obj.ctnum == 2: i_fields = ['ia', 'ic'] v_fields = ['uab', 'ucb'] else: i_fields = ['ia', 'ib', 'ic'] v_fields = ['ua', 'ub', 'uc'] i = [ScopeContent(item=item, value_datas=getattr( scope_detail_obj.contents, item, [])) for item in i_fields] v = [ScopeContent(item=item, value_datas=getattr( scope_detail_obj.contents, item, [])) for item in v_fields] # 漏电流及功率 if scope_type == '2s': residual_current = [float(str(v).replace('nan', '0')) for v in scope_detail_obj.contents.lc] residual_current = [ ScopeContent(item='漏电流', value_datas=residual_current)] power = [float(str(v).replace('nan', '0')) for v in scope_detail_obj.contents.pttl] power = [ScopeContent(item='总有功功率', value_datas=power)] else: residual_current = [] power = [] # 当触发项为漏电流或者功率的时候 item为空 if scope_detail_obj.type.find('漏电流') >= 0 or scope_detail_obj.type.find( '功率') >= 0: item = '' else: item = scope_detail_obj.item return ScopeDetailsResp(point=scope_detail_obj.alarm.point, contin_time=scope_detail_obj.alarm.contin_time, scope_type=scope_type, check_dt=scope_detail_obj.alarm.check_dt, ctnum=scope_detail_obj.ctnum, location=scope_detail_obj.location, item=item, type=scope_detail_obj.type, i=i, v=v, residual_current=residual_current, p=power ) async def get_scope_type(pids, cid, start, end): ''' 获取录波颗粒度 ''' if len(pids) == 0: equal = [Equal(field="cid", value=cid)] groups = [] else: equal = [] groups = [InGroup(field='point_id', group=pids)] if start and end: start_time = convert_str_to_timestamp(start) end_time = convert_str_to_timestamp(end) range = [Range(field='time', start=start_time, end=end_time)] else: range = [] filter = Filter(equals=equal, keywords=[], ranges=range, in_groups=groups) page_request = PageRequest(page_size=10000, page_num=1, sort=None, filter=filter) query_body = EsQuery.query(page_request) query_body['_source'] = ["point_id", "time", "scope_g"] rows = {} async with EsUtil() as es: es_results = await es.search_origin( body=query_body, index=config.POINT_1MIN_SCOPE) if not es_results["hits"]["hits"]: return {} for info in es_results["hits"]["hits"]: scope_type_key = "%s_%s" % (info["_source"]["point_id"], info["_source"]["time"]) rows[scope_type_key] = info["_source"].get("scope_g", '0.25ms') or '0.25ms' return rows async def set_scope_config_serv(pid, type, scope_type, args): ''' 设置某个设备的录波配置 ''' # 先查看缓存中是否存在,如果存在直接返回 redis_key = f"scope_config:{pid}" redis_result = await RedisUtils().get(redis_key) log.info(f'set_scope_config_serv redis_result:{redis_result}') if redis_result: raise Exception('两次操作间隔3分钟,请稍后再试。') # 先获取录波的配置 configs = await get_scope_config_by_pid(pid) # 如果存在,则解析,如果不存在,初始化 if configs: configs = json.loads(configs) if not configs or '2s' not in configs or '0.25ms' not in configs or \ '0.2s' not in configs: try: configs = await init_scope_config(pid) except Exception as e: raise Exception('初始化配置信息出错:' + str(e)) # # 重置配置信息 if type == 'time': if scope_type != '2s': raise Exception('只有录波颗粒度为2s的支持时间设置') # 时间配置 for field, item in args.items(): if field == 'one_time': start_field, stop_field = 'start_time_I', 'stop_time_I' elif field == 'two_time': start_field, stop_field = 'start_time_II', 'stop_time_II' elif field == 'three_time': start_field, stop_field = 'start_time_III', 'stop_time_III' else: continue # 验证时间差 if len(item) not in [0, 2]: log.error('set_scope_config_serv time_param error' + str(item)) raise Exception('参数错误') if len(item) == 2 and (item[0] or item[1]): try: today = get_current_datetime_str("%Y-%m-%d") duration = get_time_duration('%s %s:00' % (today, item[0]), '%s %s:00' % (today, item[1]), False, False) except Exception as e: log.error('set_scope_config_serv time_format error' + str(item)) raise Exception('时间格式错误') if duration > 3600: log.error( 'set_scope_config_serv time_format error' + str(item)) raise Exception('时间差不能大于一个小时') start_value = item[0] if len(item) == 2 and item[0] else '00:00' stop_value = item[1] if len(item) == 2 and item[1] else '00:00' configs[scope_type][start_field] = f'{start_value}:00' configs[scope_type][stop_field] = f'{stop_value}:00' else: # 其他配置 if type == 'state': # 状态对应字段 state_fields = {'0.25ms': 'en_scope', '0.2s': 'en_wave_200ms', '2s': 'en_electric_2s'} args[state_fields[scope_type]] = args['state'] for field, item in args.items(): if field in configs[scope_type]: configs[scope_type][field] = item # 配置下发(只有正式环境才开启) await set_mqtt_scope_config(pid, configs, scope_type) # 将数据存入缓存中 await RedisUtils().setex(redis_key, 180, pid) # 保存配置信息 new_configs = json.dumps(configs) update_count = await set_scope_config_by_pid(pid, new_configs) if update_count < 0: log.error(f'set_scope_config_serv update sql error pid:{pid},' f'new_configs={new_configs}') raise Exception('操作失败') async def get_scope_config_serv(pid): ''' 获取录波配置 ''' configs = await get_scope_config_by_pid(pid) # 如果存在,则解析, if configs: configs = json.loads(configs) # 如果不存在,初始化 if not configs or '2s' not in configs or '0.25ms' not in configs or \ '0.2s' not in configs: try: configs = await init_scope_config(pid) except Exception as e: configs = init_scope_config_example["threshold"] # raise Exception('初始化配置信息出错:' + str(e)) # 格式化数据返回 return_data = {} # 字段解释 fields = { 'umin': ['越下限阈值', 'v'], 'umax': ['越上限阈值', 'v'], 'ugap': ['波动阈值', 'v'], 'imax': ['越限阈值', 'i'], 'igap': ['波动阈值', 'i'], 'lcmax': ['越限阈值', 'residual_current'], 'lcgap': ['波动阈值', 'residual_current'], 'pttlmax': ['越限阈值', 'power'], 'pttlgap': ['波动阈值', 'power'], } for scope_type, config in configs.items(): one_config = {} for field, data in fields.items(): if field in config: one_config[field] = {'name': data[0], 'type': data[1], 'value': config.get(field)} # 状态显示 if scope_type == '0.25ms': state = config.get('en_scope', 0) elif scope_type == '0.2s': state = config.get('en_wave_200ms', 0) elif scope_type == '2s': state = config.get('en_electric_2s', 0) else: state = config.get('state', 0) # 时间段显示 if scope_type == '2s': config = {conf_key: conf_v.replace('00:00:00', '')[0:5] for conf_key, conf_v in config.items() if isinstance( conf_v, str)} one_config['one_time'] = {'name': '第一个时间段', 'type': 'time', 'value': [config.get('start_time_I'), config.get('stop_time_I')]} one_config['two_time'] = {'name': '第二个时间段', 'type': 'time', 'value': [config.get('start_time_II'), config.get('stop_time_II')]} one_config['three_time'] = {'name': '第三个时间段', 'type': 'time', 'value': [config.get('start_time_III'), config.get('stop_time_III')]} one_data = GetScopeConfigList(state=state, configs=one_config) return_data[scope_type] = one_data return return_data async def init_scope_config(pid, update_sql=True): ''' 初始化录波配置 ''' # 获取配置信息 new_configs = await get_mqtt_scope_config(pid) # 更新数据库配置信息 if update_sql: configs = await get_scope_config_by_pid(pid) if configs: update_count = await set_scope_config_by_pid(pid, json.dumps( new_configs[ "threshold"])) else: update_count = await add_scope_config_by_pid(pid, new_configs) if update_count < 0: log.error(f'set_scope_config_serv update sql error pid:{pid},' f'new_configs={new_configs}') return new_configs["threshold"] async def get_mqtt_scope_config(pid): ''' 获取录波配置信息 ''' # 封装数据格式 pub_dict = dict() pub_dict['point_id'] = pid pub_dict['scope_type'] = '' pub_json = await change_param_to_config(pub_dict, method='get', type='scope') if not pub_json: log.error( 'get_mqtt_scope_config pub_json error pid:' + str(pid)) raise Exception('找不到设备信息') # 从协议中获取配置信息 res_data = await mqtt_func(pub_json, pid) if not res_data.get('data'): raise Exception('请求数据为空') sid = pub_json.get("sid") # 转换数据字段 trans_fields = {"0.25ms": {"scopeEnable": "en_scope", "scoplimit": "scop_limit"}, "0.2s": {"scopeEnable": "en_wave_200ms", "scoplimit": "scop_limit"}, "2s": {"scopeEnable": "en_electric_2s"}} # 封装成数据库保存格式 configs = { "0.25ms": res_data.get('data').get("scope").get("electric").get( sid).get("threshold"), "0.2s": res_data.get('data').get("wave_200ms").get("electric").get( sid).get("threshold"), "2s": res_data.get('data').get("electric_2s").get("electric").get( sid).get("threshold"), } for type, config in configs.items(): trans_field = trans_fields.get(type) if trans_field: for key, trans_key in trans_field.items(): if key in config: configs[type][trans_key] = config[key] del configs[type][key] return { "coef": res_data.get('data').get("scope").get("electric").get( sid).get("coef"), 'threshold': configs } async def set_mqtt_scope_config(pid, configs, scope_type): ''' 设置录波配置信息 ''' # 封装配置下发信息 pub_dict = configs[scope_type] pub_dict['point_id'] = pid pub_dict['scope_type'] = scope_type pub_json = await change_param_to_config(pub_dict, method='config', type='scope') if not pub_json: log.error( 'set_scope_config_serv pub_json error' + str(configs[scope_type])) raise Exception('找不到设备信息') # 将配置信息下发 await mqtt_func(pub_json, pid) async def mqtt_func(pub_json, pid): ''' 请求mqtt服务器 ''' try: async with MqttUtil() as emq: res_data = await emq.device_response( sid=pub_json.get("sid"), request_id=pub_json.get("request_id"), data=pub_json ) except TimeoutError: log.error('mqtt_func timeout error pid' + str(pid)) raise Exception('请求超时') except Exception as e: log.error('mqtt_func error:' + str(e)) raise Exception('请求错误') log.info(f'mqtt_func res_data:{res_data}') if res_data.get('status_code') != 200: log.error('mqtt_func mqtt error pid' + str(pid)) raise Exception('请求失败') return res_data async def flush_scope_es_data(scope_g, start_time, end_time): ''' 刷新es的颗粒度 ''' equal = Equal(field='scope_g', value=scope_g) sort = Sort(field='time', direction='desc') start_dt = my_pendulum.from_format(start_time, 'YYYY-MM-DD HH:mm:ss') end_dt = my_pendulum.from_format(end_time, 'YYYY-MM-DD HH:mm:ss') while start_dt < end_dt: try: # 一天取一次 start_timestamp = start_dt.timestamp() print(start_dt.strftime('YYYY-MM-DD HH:mm:ss')) every_end_dt = start_dt.add(days=1) end_timestamp = every_end_dt.timestamp() range = Range(field='time', start=start_timestamp, end=end_timestamp) # 先查出来 filter = Filter(equals=[equal], keywords=[], ranges=[range], in_groups=[]) page_request = PageRequest(page_size=10000, page_num=1, sort=sort, filter=filter) query_body = EsQuery.query(page_request) query_body['_source'] = ['_id'] async with EsUtil() as es: search_results = await es.search_origin( body=query_body, index=config.POINT_1MIN_SCOPE ) if not search_results or not search_results['hits']['hits']: continue doc_ids = [data['_id'] for data in search_results['hits']['hits']] # 然后再刷新 update_body = { "query": { "bool": { "must": { "terms": { "doc_id.keyword": doc_ids } } } }, "script": { "source": "ctx._source.scope_g = '%s'" % scope_g } } async with EsUtil() as es: update_results = await es.update_by_query( body=update_body, index=config.SCOPE_DATABASE ) if not update_results or update_results.get('updated') > 0: log.error(f"flush_scope_es_data update_by_query count error:" f"{update_results.get('updated')}") except Exception as e: log.error(f"flush_scope_es_data error :{str(e)}") continue finally: # 递增 start_dt = every_end_dt # 1min_event 里面的event_datetime 对应 tdengine 里面scope的ts_origin async def scope_detail_service(event_id): # 获取报警信息 event_data = await get_scope_event_by_event_id(event_id) if not event_data: return {}, [], [], [], [], [], [], [], [] event_datetime = str(event_data.get("event_datetime")) pid = event_data.get("pid") mtid = event_data.get("mtid") sid = event_data.get("sid") event_type = event_data.get("event_type") # 获取录播数据 scope_data = await get_scope_detail_by_pid(pid, event_datetime) if not scope_data or not scope_data.get("url"): return {}, [], [], [], [], [], [], [], [] # 接线法:二表/三表 ctnum = await load_point_ctnum(pid) or 3 # 查询录波详细数据 还不确定在哪儿拿,现在脚本是minio里边,但是家义说未来要在青云里面 # 测试数据存储 # filedata/electric_ops/scope/A2004000519/2022/6/27/17/1656321462.json # 获取录波曲线数据 try: async with QsClient() as qs: url = scope_data.get("url") wave_data = await qs.get_object(url) except Exception as e: LOGGER.error(f"scope_detail_service error message:{str(e)}") return {}, [], [], [], [] # 录波颗粒度 scope_g = scope_data.get("scope_g") if ctnum == 2: i_fields = ['ia', 'ic'] v_fields = ['uab', 'ucb'] else: i_fields = ['ia', 'ib', 'ic'] v_fields = ['ua', 'ub', 'uc'] u_list, i_list = [], [] residual_current, power = [], [] u_count, i_count, lc_count, power_count = 0, 0, 0, 0 for k, v in wave_data.items(): v = [value if not math.isnan(value) else '' for value in v] if k in i_fields: i_list.append(ScopeDetails(item=k, value_datas=v)) u_count = len(v) if k in v_fields: u_list.append(ScopeDetails(item=k, value_datas=v)) i_count = len(v) # 2s颗粒度的会有漏电流及功率 if k in ("lc", "ileak_rms"): residual_current.append(ScopeDetails(item='漏电流', value_datas=v)) lc_count = len(v) if k == "pttl": power.append(ScopeDetails(item='总有功功率', value_datas=v)) power_count = len(v) # 结论分析 result = await get_scope_conclusion(wave_data, event_type, mtid, sid, ctnum) # 返回信息 data = dict() data["name"] = event_data.get("name") data["sid"] = sid data["event_datetime"] = str(event_data.get("event_datetime")) # data["duration"] = event_data.get("duration") if scope_g == '0.25ms': # data["duration"] = "400ms" data["duration"] = str(int(i_count * 0.25)) + "ms" elif scope_g == '200ms': data["duration"] = "10s" else: data["duration"] = "10min" data["message"] = event_data.get("message") data["conclusion"] = result data["scope_g"] = scope_g event_type = event_data.get("event_type") e_type = await get_e_type_by_event_type(event_type) data["type"] = e_type["type"] data["ctnum"] = ctnum data["item"] = event_data["phase"] index_loc = json.loads(scope_data.get("index_loc")) data["location"] = index_loc[f"{data['item']}"][ 'location'] if index_loc.get( data['item']) else index_loc.get("index_loc") return data or {}, u_list, i_list, residual_current, power, async def get_scope_conclusion(wave_data, event_type, mtid, sid, ctnum): """ 结论分析 :param wave_data: :param event_type: :param mtid: :param sid: :param ctnum: :return: """ try: lc_data = wave_data.get("ileak_rms") or wave_data.get("lc") LOGGER.info(f"wave_data:{wave_data}") if event_type == "over_res_cur": threshold = await get_threshold_by_mtid(mtid) or 30 result = leakage_reg(ileak_rms=lc_data, leak_hold=threshold) LOGGER.info(f"actionFile 漏电流 sid:{sid}结论:{result}") else: res = actionFilemin(wave_data, ctnum) LOGGER.info(f"结论分析 res:{res}") if res == "nofault": result = "不存在故障" else: result = "" for r in res: result += "%s : %0.0f%%;" % (r[0], r[1] * 100) except Exception as e: result = "" LOGGER.error(f"actionFile error:{e}") return result