# -*- coding:utf-8 -*- import json import time import pendulum from pot_libs.aredis_util.aredis_utils import RedisUtils from pot_libs.sanic_api import summary, description, examples from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.settings import SETTING from pot_libs.logger import log from unify_api.modules.adio.service.adio_card import adio_index_service from unify_api.utils import time_format from unify_api.utils.time_format import CST, YMD_Hms, timestamp2dts from unify_api import constants from unify_api.modules.adio.components.adio import ( AdioHistoryResponse, AdioCurrentResponse, AdioIndexResponse, AdioIndex, adio_index_example, adio_history_example, AdioHistory, AdioCurrent, adio_current_example, ) from pot_libs.common.components.query import PageRequest from unify_api.modules.adio.dao.adio_dao import get_location_dao @summary("返回安全监测历史曲线") @description("包含温度曲线和漏电流曲线,取最大值") @examples(adio_history_example) async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse: # 1.获取intervel和时间轴 try: date_start = body.filter.ranges[0].start date_end = body.filter.ranges[0].end except: log.error("para error, ranges is NULL") return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[]) try: # 形如 interval = 900 slots=['00:00', '00:15', '00:30' intervel, slots = time_format.time_pick_transf(date_start, date_end) except: log.error("para error, date format error") return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[]) try: location_group = body.filter.in_groups[0].group except: log.warning("para exception, in_groups is NULL, no location_id") return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=slots) if not location_group: log.warning("para exception, in_groups is NULL, no location_id") return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=slots) # 3.获取温度曲线和漏电流曲线数据 # 动态漏电流阈值 sql = "select threshold from soe_config_record where lid in %s " \ "and etype = %s limit 1" async with MysqlUtil() as conn: settings = await conn.fetchall(sql, args=(tuple(location_group), "overResidualCurrent")) residual_current_threhold = settings[0]["threshold"] if settings else 30 if intervel == 24 * 3600: table_name = "location_1day_aiao" date_format = "%%m-%%d" else: table_name = "location_15min_aiao" date_format = "%%H:%%i" sql = f"SELECT lid,value_max,DATE_FORMAT(create_time, '{date_format}') " \ f"date_time,ad_field FROM {table_name} WHERE lid in %s and " \ f"create_time BETWEEN '{date_start}' and '{date_end}' " \ f"order by create_time" async with MysqlUtil() as conn: datas = await conn.fetchall(sql, args=(location_group,)) type_name = { "residual_current": "漏电流", "temp1": "A相", "temp2": "B相", "temp3": "C相", "temp4": "N线", } location_info, location_type_name = {}, {} temp_list, residual_currents = [], [] for lid in location_group: location_info[lid] = ["" for _ in slots] for data in datas: lid = data.get("lid") value = data.get("value_max") mid_slot = data.get("date_time") if lid not in location_type_name.keys(): ad_field = data.get("ad_field") location_type_name[lid] = ad_field slot_index = slots.index(mid_slot) location_info[lid][slot_index] = value for key, value in location_info.items(): item_name = location_type_name.get(key) item = type_name.get(item_name) adio_his = AdioHistory(item=item, value_slots=value) if item_name == "residual_current": adio_his.threhold = residual_current_threhold residual_currents.append(adio_his) else: temp_list.append(adio_his) return AdioHistoryResponse( temperature=temp_list, residual_current=residual_currents, time_slots=slots ) @summary("返回安全监测实时参数") @description("包含温度和漏电流") @examples(adio_current_example) async def post_adio_current(req, body: PageRequest) -> AdioCurrentResponse: try: in_group = body.filter.in_groups[0] except: log.warning("para exception, in_groups is NULL, no location_id") return AdioCurrentResponse(temperature=[], residual_current=[]) # location_ids location_group = in_group.group if not location_group: log.warning("para exception, in_groups is NULL, no location_id") return AdioCurrentResponse(temperature=[], residual_current=[]) # 读取location表信息 location_info = await get_location_dao(location_group) if not location_info: log.warning("location_id error location_info empty") return AdioCurrentResponse(temperature=[], residual_current=[]) # load real time adio lids = list(location_info.keys()) prefix = f"real_time:adio:{SETTING.mysql_db}" keys = [f"{prefix}:{lid}" for lid in lids] rt_rlts = await RedisUtils().mget(keys) rt_adios = [json.loads(r) for r in rt_rlts if r] d_rt_adio = {adio["lid"]: adio for adio in rt_adios} if not d_rt_adio: return AdioCurrentResponse(temperature=[], residual_current=[]) temp_lst = [] rc_lst = [] for lid, loc_info in location_info.items(): if lid not in d_rt_adio: ts = pendulum.now(tz=CST).int_timestamp ad_value = "" else: ts, ad_value = d_rt_adio[lid]["ts"], d_rt_adio[lid]["v"] time_str = timestamp2dts(ts, YMD_Hms) if loc_info.get("type") == "residual_current": adio_current = AdioCurrent(type="residual_current", item="漏电流", real_time=time_str, value=ad_value) rc_lst.append(adio_current) else: adio_current = AdioCurrent(type="temperature", item=loc_info.get("item", ""), real_time=time_str, value=ad_value, ) temp_lst.append(adio_current) return AdioCurrentResponse(temperature=temp_lst, residual_current=rc_lst) @summary("返回安全监测实时参数(老的)") @description("包含温度和漏电流(老的)") @examples(adio_current_example) async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse: try: in_group = body.filter.in_groups[0] except: log.warning("para exception, in_groups is NULL, no location_id") return AdioCurrentResponse(temperature=[], residual_current=[]) # location_ids location_group = in_group.group if not location_group: log.warning("para exception, in_groups is NULL, no location_id") return AdioCurrentResponse(temperature=[], residual_current=[]) # 读取location表信息 location_info = await get_location_dao(location_group) temperature = [] residual_current = [] for location_id, item_info in location_info.items(): try: adio_info = await RedisUtils().hget("adio_current", location_id) if adio_info: adio_info = json.loads(adio_info) except Exception: log.error("redis error") return AdioCurrentResponse().db_error() time_now = int(time.time()) if adio_info: real_tt = adio_info.get("timestamp", 0) if (time_now - real_tt) <= constants.REAL_EXP_TIME: adio_value = round(adio_info.get("value", 0), 2) else: adio_value = "" # 超过4小时的值直接丢弃 time_str = time_format.get_datetime_str(real_tt) else: adio_value = "" time_str = time_format.get_datetime_str(time_now) if item_info.get("type") == "residual_current": adio_current = AdioCurrent( type="residual_current", item="漏电流", real_time=time_str, value=adio_value ) residual_current.append(adio_current) else: adio_current = AdioCurrent( type="temperature", item=item_info.get("item", ""), real_time=time_str, value=adio_value, ) temperature.append(adio_current) return AdioCurrentResponse(temperature=temperature, residual_current=residual_current) @summary("返回安全监测指标统计") @description("温度和漏电流的最高、最低、平均值") @examples(adio_index_example) async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse: try: location_group = body.filter.in_groups[0].group except: log.warning("para exception, in_groups is NULL, no location_id") return AdioIndexResponse(adio_indexes=[]) if not location_group: log.warning("para exception, in_groups is NULL, no location_id") return AdioIndexResponse(adio_indexes=[]) # 获取时间 try: start = body.filter.ranges[0].start end = body.filter.ranges[0].end except: log.error("para error, ranges is NULL") return AdioIndexResponse(adio_indexes=[]) adio_indexes = await adio_index_service(location_group, start, end) return AdioIndexResponse(adio_indexes=adio_indexes)