import asyncio
import hashlib
import json
import time

import aioredis

from pot_libs.aredis_util.aredis_utils import RedisClient, RedisUtils
from pot_libs.settings import SETTING
from pot_libs.utils import time_format
from unify_api import constants
from unify_api.constants import POINT_LEVEL_MAP
from unify_api.modules.adio.components.adio import AdioIndex
from unify_api.modules.adio.components.adio_card_cps import AcResp
from unify_api.modules.adio.dao.adio_card_dao import \
    monitor_location_join_by_locations, alarm_setting_by_locations
from unify_api.modules.adio.dao.adio_dao import get_location_15min_dao, \
    get_location_dao
from unify_api.modules.common.procedures.list_point_pds import \
    monitor_map_point_location
from unify_api.utils.common_utils import round_2

# Redis hash that holds the latest reading of every location, keyed by
# location id.
ADIO_CURRENT = "adio_current"


async def _fetch_current_values(location_list):
    """Read the latest raw reading of each location from redis.

    Issues one ``HGET adio_current <location_id>`` per location inside a
    single MULTI/EXEC and returns ``{str(location_id): raw_value}``.
    Keys are normalised to ``str`` so the lookup does not depend on whether
    the caller and the database use ints or strings for location ids.

    The connection pool is always closed before returning, including when
    the transaction raises.
    """
    if not location_list:
        return {}
    redis = await aioredis.create_redis_pool(SETTING.redis_single)
    try:
        tr = redis.multi_exec()
        for location_id in location_list:
            tr.hget(ADIO_CURRENT, location_id)
        res_redis = await tr.execute()
    finally:
        # A pool is created per call, so it must be released per call.
        redis.close()
        await redis.wait_closed()
    return {
        str(location_id): raw
        for location_id, raw in zip(location_list, res_redis)
    }


def _fresh_value(raw, now):
    """Decode one redis reading; return ``""`` when missing or expired.

    ``raw`` is the JSON payload read from redis (or a falsy value when the
    hash field does not exist). A reading older than
    ``constants.REAL_EXP_TIME`` seconds relative to ``now`` is treated as
    absent. Otherwise the ``value`` field rounded to 2 decimals is returned.
    """
    if not raw:
        return ""
    adio_info = json.loads(raw)
    if now - adio_info.get("timestamp", 0) > constants.REAL_EXP_TIME:
        return ""
    return round(adio_info.get("value", 0), 2)


def _merge_into_card(card, location_type, location_item, adio_value,
                     threshold, location_id):
    """Add one more location's reading to an already existing card."""
    if location_type == "temperature":
        card.setdefault(location_type, []).append(
            {"item": location_item, "value": adio_value}
        )
        # Keep the smallest temperature threshold. The card may not have a
        # "t_threshold" yet when its first location was a residual-current
        # one, so the key is read tolerantly instead of indexed directly.
        current = card.get("t_threshold")
        if current is None or (threshold is not None
                               and threshold < current):
            card["t_threshold"] = threshold
    else:
        card.setdefault(location_type, []).append(
            {"item": "漏电流", "value": adio_value}
        )
        card["r_threshold"] = threshold
    card["location_id"].append(location_id)


def _new_card(m_name, location_type, location_item, adio_value, threshold,
              location_id, point_id):
    """Build the card for the first location seen of a monitor."""
    if location_type == "temperature":
        card = {
            "name": m_name,
            "t_threshold": threshold,
            "temperature": [{"item": location_item, "value": adio_value}],
        }
    else:
        card = {
            "name": m_name,
            "r_threshold": threshold,
            "residual_current": [{"item": "漏电流", "value": adio_value}],
        }
    card["location_id"] = [location_id]
    card["point_id"] = point_id
    return card


async def post_adio_card_service(location_list, cid):
    """Safety monitoring - card info, grouped by level.

    For every location in ``location_list`` the monitor it belongs to, its
    alarm threshold and its latest redis reading are combined into one card
    per monitor. Cards are grouped by monitor level (``inline``,
    ``transformer``, ``feeder``, ``power_dist``, ``device``).

    :param location_list: location ids to build cards for
    :param cid: company id used to resolve the monitor -> point mapping
    :return: ``AcResp`` whose five fields are lists of card dicts
    :raises KeyError: when a location has no alarm setting, a monitor has no
        point mapping, or a monitor type is not one of the five levels
    """
    # monitor -> point / monitor -> locations for this cid (only the point
    # mapping is used here).
    monitor_p_dic, _monitor_l_dic = await monitor_map_point_location(cid)
    # 1. Detail rows for every location id. This is a MySQL IN query, so the
    #    rows do NOT come back in location_list order.
    monitor_location_list = await monitor_location_join_by_locations(
        location_list)
    # 2. Temperature / residual-current alarm thresholds.
    alarm_setting_list = await alarm_setting_by_locations(location_list)
    alarm_dic = {alarm["lid"]: alarm for alarm in alarm_setting_list}
    # 3. Latest readings from redis, keyed by location id. Because the rows
    #    of step 1 are in a different order than location_list, readings are
    #    looked up by id, never by position.
    redis_by_lid = await _fetch_current_values(location_list)

    # 4. Build the response.
    ret_data = {
        "inline": {},
        "transformer": {},
        "feeder": {},
        "power_dist": {},
        "device": {},
    }
    time_now = int(time.time())
    for info in monitor_location_list:
        m_name = info.get("name")
        mtid = info.get("mtid")
        m_type = POINT_LEVEL_MAP.get(info.get("m_type"))
        location_id = info.get("lid")
        location_type = info.get("ad_type")
        location_item = info.get("item")

        adio_value = _fresh_value(
            redis_by_lid.get(str(location_id)), time_now)
        threshold = alarm_dic[location_id]["threshold"]

        cards = ret_data[m_type]
        if m_name in cards:
            _merge_into_card(cards[m_name], location_type, location_item,
                             adio_value, threshold, location_id)
        else:
            cards[m_name] = _new_card(
                m_name, location_type, location_item, adio_value,
                threshold, location_id, monitor_p_dic[mtid])

    # Each level is returned as a plain list of cards.
    return AcResp(
        inline=list(ret_data["inline"].values()),
        transformer=list(ret_data["transformer"].values()),
        feeder=list(ret_data["feeder"].values()),
        power_dist=list(ret_data["power_dist"].values()),
        device=list(ret_data["device"].values()),
    )


async def adio_index_service(location_group, start, end):
    """Return one ``AdioIndex`` (max/min/avg summary) per location.

    :param location_group: location ids to summarise
    :param start: start of the time range, passed through to
        ``time_format.time_pick_transf`` and the DAO
    :param end: end of the time range
    :return: list of ``AdioIndex`` in ``location_group`` order
    """
    # Load the location table info once for the whole group.
    location_info = await get_location_dao(location_group)
    intervel, _ = time_format.time_pick_transf(start, end)
    tasks = [
        get_location_15min_service(location_info, lid, start, end, intervel)
        for lid in location_group
    ]
    adio_index_result = await asyncio.gather(*tasks)
    return [AdioIndex(**data) for data in adio_index_result]


def _pick_extreme(rows, value_key, time_key, chooser):
    """Return ``(value, time)`` of the extreme row, or ``("", "")``.

    ``chooser`` is ``max`` or ``min``. Rows whose ``value_key`` is None are
    ignored. On ties the first row wins, which matches looking the extreme
    value up with ``list.index``.
    """
    pairs = [
        (row.get(value_key), row.get(time_key))
        for row in rows
        if row.get(value_key) is not None
    ]
    if not pairs:
        return "", ""
    return chooser(pairs, key=lambda pair: pair[0])


async def get_location_15min_service(location_info, lid, start, end,
                                     intervel):
    """Summarise the aggregated readings of one location over a range.

    Reads from ``location_1day_aiao`` when ``intervel`` is one day
    (``24 * 3600``), otherwise from ``location_15min_aiao``.

    :param location_info: mapping ``lid -> {"type": ..., "item": ...}``
    :param lid: location id
    :param start: start of the time range, passed to the DAO
    :param end: end of the time range, passed to the DAO
    :param intervel: aggregation interval in seconds
    :return: dict with ``type``, ``item``, ``max``, ``max_time``, ``min``,
        ``min_time`` and ``avg``; max/min are ``round_2("")`` and the times
        are ``""`` when there is no usable data, avg is ``round_2(0)``
    :raises KeyError: when ``lid`` is missing from ``location_info``
    """
    if intervel == 24 * 3600:
        table_name = "location_1day_aiao"
    else:
        table_name = "location_15min_aiao"
    rows = list(await get_location_15min_dao(lid, start, end, table_name))

    value_max_max, value_max_time_data = _pick_extreme(
        rows, "value_max", "value_max_time", max)
    value_min_min, value_min_time_data = _pick_extreme(
        rows, "value_min", "value_min_time", min)

    # NULL averages are skipped so that sum() cannot fail on None.
    value_avg_list = [
        row.get("value_avg") for row in rows
        if row.get("value_avg") is not None
    ]
    value_avg_data = sum(value_avg_list) / len(value_avg_list) \
        if value_avg_list else 0

    return {
        "type": location_info[lid]["type"],
        "item": location_info[lid]["item"],
        "max": round_2(value_max_max),
        "max_time": str(value_max_time_data) if value_max_time_data else "",
        "min": round_2(value_min_min),
        "min_time": str(value_min_time_data) if value_min_time_data else "",
        "avg": round_2(value_avg_data),
    }