import asyncio import datetime import json from pot_libs.qingstor_util.qs_client import QsClient from pot_libs.logger import log from pot_libs.mysql_util.mysql_util import MysqlUtil from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao from unify_api.modules.anshiu.dao.scope_operations_dao import \ get_scope_url_by_pid from unify_api.utils.log_utils import LOGGER async def get_scope_config_by_pid(pid): ''' 获取某个设备的录波配置 ''' async with MysqlUtil() as conn: sql = "select threshold from scope_config_record where pid = %s" result = await conn.fetchone(sql=sql, args=(pid,)) return result.get('threshold', {}) if result else {} async def set_scope_config_by_pid(pid, config): ''' 设置录波配置 ''' try: async with MysqlUtil() as conn: sql = "update scope_config_record set threshold=%s where pid=%s" result = await conn.execute(sql=sql, args=(config, pid)) except Exception as e: log.error('set_scope_config_by_pid update error:' + str(e)) return -1 return result async def add_scope_config_by_pid(pid, configs): ''' 增加录波配置 ''' try: async with MysqlUtil() as conn: sql = "INSERT INTO `power_iot`.`scope_config_record` (`pid`, " \ "`coef`, `threshold`, `vol_cur`) VALUES (%s, %s, %s, %s)" result = await conn.execute(sql=sql, args=(pid, json.dumps( configs["coef"]), json.dumps(configs[ "threshold"]), json.dumps(configs))) except Exception as e: log.error('add_scope_config_by_pid add error:' + str(e)) return -1 return result async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"): """ ES查询2s录波记录数据 """ mtid = await get_mtid_by_pid_dao(pids) mtid = mtid.get('mtid') # 获取录波曲线数据 scope_url_data = await get_scope_url_by_pid(mtid, start_dt, end_dt) scope_list = [] starttime_wm = datetime.datetime.now() print(f'起始时间:{starttime_wm}') task_list = [] for scope_data in scope_url_data: try: url = scope_data.get("url") task_list.append(get_qs_datas(url)) except Exception as e: LOGGER.error(f"scope_detail_service error message:{str(e)}") return [] qs_dats = await asyncio.gather(*task_list) qs_data_map = {data[0]: data[1] for data in qs_dats} for scope_data in scope_url_data: url = scope_data.get("url") wave_data = qs_data_map[url] [scope_list.append({"datetime": scope_data['datetime'], "ua": wave_data['ua'][i], "ub": wave_data['ub'][i], "uc": wave_data['uc'][i], "ia": wave_data['ia'][i], "ib": wave_data['ib'][i], "ic": wave_data['ic'][i], "lc": wave_data['lc'][i], "pttl": wave_data['pttl'][i]}) for i in range(len(wave_data.get('ua')))] end_time_wm = datetime.datetime.now() print(f'结束时间:{end_time_wm}') return scope_list async def get_qs_datas(url): async with QsClient() as qs: wave_data = await qs.get_object(url) return url, wave_data