# adio.py 11.2 KB
# -*- coding:utf-8 -*-
#
# Author:jing
# Date: 2020/7/9
import json
import time
from datetime import datetime
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.es_util.es_query import EsQuery
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.sanic_api import summary, description, examples
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
from unify_api.utils import time_format
from unify_api import constants
from unify_api.utils.common_utils import round_2
from unify_api.modules.adio.components.adio import (
    AdioHistoryResponse,
    AdioCurrentResponse,
    AdioIndexResponse,
    AdioIndex,
    adio_index_example,
    adio_history_example,
    AdioHistory,
    AdioCurrent,
    adio_current_example,
)
from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from unify_api.modules.adio.dao.adio_dao import get_location_dao, \
    get_location_15min_dao, get_adio_current_data


@summary("返回安全监测历史曲线")
@description("包含温度曲线和漏电流曲线,取最大值")
@examples(adio_history_example)
async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse:
    """Return historical temperature and residual-current curves.

    The time range is read from ``body.filter.ranges[0]`` and the location
    ids from ``body.filter.in_groups[0].group``. For every location one
    series aligned with the time slots is built from the ``value_max``
    column; slots without a row stay ``""``.

    Args:
        req: Incoming request object (not used by this handler).
        body: Page request carrying the range and in-group filters.

    Returns:
        An ``AdioHistoryResponse``. The curve lists are empty when the
        request parameters are missing or malformed; ``time_slots`` is
        still filled when only the location group is missing.
    """
    # 1. Read the requested time range.
    try:
        date_start = body.filter.ranges[0].start
        date_end = body.filter.ranges[0].end
    except (AttributeError, IndexError, TypeError):
        log.error("para error, ranges is NULL")
        return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[])

    # 2. Derive the sampling interval and the slot labels,
    #    e.g. interval = 900, slots = ['00:00', '00:15', '00:30', ...]
    try:
        intervel, slots = time_format.time_pick_transf(date_start, date_end)
    except Exception:
        # The helper is opaque from here, so any failure is treated as a
        # malformed date instead of guessing which exception it raises.
        log.error("para error, date format error")
        return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[])

    # 3. Read the requested location ids.
    try:
        location_group = body.filter.in_groups[0].group
    except (AttributeError, IndexError, TypeError):
        location_group = None
    if not location_group:
        log.warning("para exception, in_groups is NULL, no location_id")
        return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=slots)

    # 4. Configured residual-current threshold; falls back to 30 when no
    #    configuration row exists.
    threshold_sql = "select threshold from soe_config_record where lid in %s " \
                    "and etype = %s limit 1"
    async with MysqlUtil() as conn:
        settings = await conn.fetchall(
            threshold_sql,
            args=(tuple(location_group), "overResidualCurrent"),
        )
    residual_current_threhold = settings[0]["threshold"] if settings else 30

    # 5. Daily interval reads the 1-day table, everything else the 15-minute
    #    table. '%%' is required because the statement is %-formatted with
    #    the bound arguments before it is sent to MySQL.
    if intervel == 24 * 3600:
        table_name, date_format = "location_1day_aiao", "%%m-%%d"
    else:
        table_name, date_format = "location_15min_aiao", "%%H:%%i"
    # The range bounds are bound as parameters rather than interpolated into
    # the SQL text, so request-supplied values cannot alter the statement.
    data_sql = (
        f"SELECT lid,value_max,DATE_FORMAT(create_time, '{date_format}') "
        f"date_time,ad_field FROM {table_name} WHERE lid in %s and "
        "create_time BETWEEN %s and %s "
        "order by create_time"
    )
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(
            data_sql,
            args=(tuple(location_group), str(date_start), str(date_end)),
        )

    type_name = {
        "residual_current": "漏电流", "temp1": "A相", "temp2": "B相",
        "temp3": "C相", "temp4": "N线",
    }

    # Slot label -> position, keeping the first occurrence of a label
    # (the same position list.index() would return).
    slot_position = {}
    for position, slot in enumerate(slots):
        slot_position.setdefault(slot, position)

    location_info = {lid: ["" for _ in slots] for lid in location_group}
    location_type_name = {}
    skipped_rows = 0
    for data in datas or []:
        lid = data.get("lid")
        series = location_info.get(lid)
        if series is None:
            skipped_rows += 1
            continue
        # The first row seen for a location decides its ad_field.
        if lid not in location_type_name:
            location_type_name[lid] = data.get("ad_field")
        position = slot_position.get(data.get("date_time"))
        if position is None:
            # A row whose label is not on the time axis cannot be plotted.
            skipped_rows += 1
            continue
        series[position] = data.get("value_max")
    if skipped_rows:
        log.warning(
            f"adio history: {skipped_rows} rows did not match any "
            f"time slot or requested location and were skipped"
        )

    # 6. Split the series into residual-current and temperature curves.
    temp_list, residual_currents = [], []
    for lid, value_slots in location_info.items():
        item_name = location_type_name.get(lid)
        adio_his = AdioHistory(item=type_name.get(item_name),
                               value_slots=value_slots)
        if item_name == "residual_current":
            # Attribute name kept exactly as the original code spells it.
            adio_his.threhold = residual_current_threhold
            residual_currents.append(adio_his)
        else:
            temp_list.append(adio_his)
    return AdioHistoryResponse(
        temperature=temp_list, residual_current=residual_currents,
        time_slots=slots
    )


@summary("返回安全监测实时参数")
@description("包含温度和漏电流")
@examples(adio_current_example)
async def post_adio_current(req, body: PageRequest) -> AdioCurrentResponse:
    """Return the current temperature and residual-current readings.

    Location ids come from ``body.filter.in_groups[0].group``. The ``mtid``
    of the first location returned by ``get_location_dao`` is used to fetch
    one record via ``get_adio_current_data``; every location is then mapped
    onto a field of that record.

    Args:
        req: Incoming request object (not used by this handler).
        body: Page request carrying the in-group filter.

    Returns:
        An ``AdioCurrentResponse``; both lists are empty when the location
        group is missing, no location info is found, or no data is returned.
    """
    try:
        location_group = body.filter.in_groups[0].group
    except (AttributeError, IndexError, TypeError):
        location_group = None
    if not location_group:
        log.warning("para exception, in_groups is NULL, no location_id")
        return AdioCurrentResponse(temperature=[], residual_current=[])

    # Load the location table rows for the requested ids.
    location_info = await get_location_dao(location_group)
    if not location_info:
        log.warning("location_id error location_info empty")
        return AdioCurrentResponse(temperature=[], residual_current=[])

    # Only the first location's mtid is used for the lookup.
    # NOTE(review): presumably all requested locations share one meter.
    mtid = next(iter(location_info.values()))['mtid']
    # Original comment says this record is read from TDengine.
    aido_data = await get_adio_current_data(mtid)
    if not aido_data:
        return AdioCurrentResponse(temperature=[], residual_current=[])

    # The timestamp is the same for every location, so compute it once.
    # A missing 'ts' yields an empty string instead of a TypeError.
    raw_ts = aido_data.get('ts')
    time_str = raw_ts[:19] if raw_ts else ""

    # Maps the location's display item to the field name in the record.
    trans_field = {"A相": "temp1", "B相": "temp2", "C相": "temp3",
                   "N线": "temp4"}
    temperature = []
    residual_current = []
    for item_info in location_info.values():
        if item_info.get("type") == "residual_current":
            residual_current.append(AdioCurrent(
                type="residual_current",
                item="漏电流",
                real_time=time_str,
                value=aido_data.get("residual_current")
            ))
            continue
        item = item_info.get("item", "")
        temperature.append(AdioCurrent(
            type="temperature",
            item=item,
            real_time=time_str,
            value=aido_data.get(trans_field.get(item)),
        ))

    return AdioCurrentResponse(temperature=temperature,
                               residual_current=residual_current)


@summary("返回安全监测实时参数(老的)")
@description("包含温度和漏电流(老的)")
@examples(adio_current_example)
async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse:
    """Return current readings from the Redis ``adio_current`` hash (old path).

    For every location returned by ``get_location_dao`` the hash field with
    that location id is read and JSON-decoded. A reading whose timestamp is
    older than ``constants.REAL_EXP_TIME`` seconds is reported with an empty
    value.

    Args:
        req: Incoming request object (not used by this handler).
        body: Page request carrying the in-group filter.

    Returns:
        An ``AdioCurrentResponse``; both lists are empty when the location
        group or location info is missing. A Redis or decoding failure
        returns ``AdioCurrentResponse().db_error()``.
    """
    try:
        location_group = body.filter.in_groups[0].group
    except (AttributeError, IndexError, TypeError):
        location_group = None
    if not location_group:
        log.warning("para exception, in_groups is NULL, no location_id")
        return AdioCurrentResponse(temperature=[], residual_current=[])

    # Load the location table rows for the requested ids.
    location_info = await get_location_dao(location_group)
    if not location_info:
        # Same guard as post_adio_current: nothing to report, and calling
        # .items() on a None result would raise.
        log.warning("location_id error location_info empty")
        return AdioCurrentResponse(temperature=[], residual_current=[])

    temperature = []
    residual_current = []
    for location_id, item_info in location_info.items():
        try:
            adio_info = await RedisUtils().hget("adio_current", location_id)
            if adio_info:
                adio_info = json.loads(adio_info)
        except Exception:
            log.error("redis error")
            return AdioCurrentResponse().db_error()

        time_now = int(time.time())
        if adio_info:
            real_tt = adio_info.get("timestamp", 0)
            if (time_now - real_tt) <= constants.REAL_EXP_TIME:
                adio_value = round(adio_info.get("value", 0), 2)
            else:
                # Stale reading (original comment: older than 4 hours).
                adio_value = ""
            time_str = time_format.get_datetime_str(real_tt)
        else:
            # No cached reading: report an empty value stamped with "now".
            adio_value = ""
            time_str = time_format.get_datetime_str(time_now)

        if item_info.get("type") == "residual_current":
            residual_current.append(AdioCurrent(
                type="residual_current", item="漏电流", real_time=time_str, value=adio_value
            ))
        else:
            temperature.append(AdioCurrent(
                type="temperature",
                item=item_info.get("item", ""),
                real_time=time_str,
                value=adio_value,
            ))

    return AdioCurrentResponse(temperature=temperature, residual_current=residual_current)


@summary("返回安全监测指标统计")
@description("温度和漏电流的最高、最低、平均值")
@examples(adio_index_example)
async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse:
    """Return max / min / average statistics for each requested location.

    Location ids come from ``body.filter.in_groups[0].group`` and the time
    range from ``body.filter.ranges[0]``. For each location the rows from
    ``get_location_15min_dao`` are reduced to the largest ``value_max``
    (with its ``value_max_time``), the smallest ``value_min`` (with its
    ``value_min_time``) and the mean of ``value_avg``.

    Args:
        req: Incoming request object (not used by this handler).
        body: Page request carrying the in-group and range filters.

    Returns:
        An ``AdioIndexResponse``; ``adio_indexes`` is empty when the
        location group or the range is missing.
    """
    try:
        location_group = body.filter.in_groups[0].group
    except (AttributeError, IndexError, TypeError):
        location_group = None
    if not location_group:
        log.warning("para exception, in_groups is NULL, no location_id")
        return AdioIndexResponse(adio_indexes=[])

    # Validate the time range before touching the database.
    try:
        start = body.filter.ranges[0].start
        end = body.filter.ranges[0].end
    except (AttributeError, IndexError, TypeError):
        log.error("para error, ranges is NULL")
        return AdioIndexResponse(adio_indexes=[])

    # Load the location table rows for the requested ids.
    location_info = await get_location_dao(location_group) or {}

    adio_indexes = []
    for lid in location_group:
        info = location_info.get(lid)
        if not info:
            # Unknown location id: skip it instead of failing the request.
            log.warning(f"location_id error, no location info for lid {lid}")
            continue

        datas = await get_location_15min_dao(lid, start, end) or []
        # Rows with a NULL aggregate cannot be compared or summed.
        max_rows = [d for d in datas if d.get("value_max") is not None]
        min_rows = [d for d in datas if d.get("value_min") is not None]
        avg_values = [d.get("value_avg") for d in datas
                      if d.get("value_avg") is not None]

        # max()/min() return the first extreme row on ties, which matches
        # looking the extreme value up with list.index().
        if max_rows:
            peak = max(max_rows, key=lambda d: d.get("value_max"))
            value_max_max = peak.get("value_max")
            value_max_time_data = peak.get("value_max_time")
        else:
            value_max_max, value_max_time_data = "", ""
        if min_rows:
            trough = min(min_rows, key=lambda d: d.get("value_min"))
            value_min_min = trough.get("value_min")
            value_min_time_data = trough.get("value_min_time")
        else:
            value_min_min, value_min_time_data = "", ""
        value_avg_data = sum(avg_values) / len(avg_values) \
            if avg_values else 0

        adio_indexes.append(AdioIndex(
            type=info["type"],
            item=info["item"],
            max=round_2(value_max_max),
            max_time=str(value_max_time_data) if value_max_time_data else "",
            min=round_2(value_min_min),
            min_time=str(value_min_time_data) if value_min_time_data else "",
            avg=round_2(value_avg_data),
        ))
    return AdioIndexResponse(adio_indexes=adio_indexes)