output_result.py 13.3 KB
import json
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_1MIN_INDEX, PRODUCT, ExtendModule
from unify_api.modules.common.procedures.common_cps import point_day2month
from unify_api.modules.common.service.td_engine_service import \
    get_td_engine_data
from unify_api.modules.shidianu.components.algorithm_cps import (
    AlgorithmOutput,
    RunPeriodtem,
    ElectricActionItem,
    RunTimeItem,
)
from unify_api.modules.common.procedures.points import get_meter_by_point
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.shidianu.procedures.power_param import power_p
from unify_api.modules.shidianu.service.IntergratePipe import hisPieceFind
from unify_api.modules.shidianu.service.electricProportion import \
    electricProportion
import numpy as np
import pandas as pd
import asyncio

from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, \
    params_mining
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth
from unify_api.utils.time_format import (
    last_n_day, srv_time, get_day_start, YMD_Hms
)
from unify_api.utils.taos_new import parse_td_columns


async def output_result(sid, meter_sn, date_time):
    """识电u,算法识别结果"""
    sn = meter_sn.lower()
    # 1. 根据sid+meter_sn查询sdu_power_param表
    sql = "select * from sdu_power_param where sid = %s and meter_sn = %s"
    async with MysqlUtil() as conn:
        param_res = await conn.fetchone(sql=sql, args=(sid, meter_sn))
    # 2. 如果数据表没有生成好的"功率参数", 使用近半月功率数据计算得出"功率参数"
    if param_res:
        log.info(f"{sid}_{meter_sn}:功率参数已存在")
        power_param = [
            param_res["fridge"],
            param_res["air_conditioner"],
            param_res["calorifier"],
            param_res["rice_cooker"],
            param_res["induction_cooker"],
        ]
    else:
        log.info(f"{sid}_{meter_sn}:功率参数不存在, 开始使用历史数据计算")
        # 2.1 求最近7天日期
        date_time_list = last_n_day(date_time, 7)
        # 2.2 调算法,生成pa100-pa3000
        pa_list = await params_mining(date_time_list, sid, sn)
        # 2.3 调算法,生成功率参数power_param
        param_str = paramAnalysis(pa_list)
        log.info(f"历史数据计算得出参数功率:{param_str}")
        # 2.4 插入功率参数到sdu_power_param表
        param_list = param_str.split(",")
        insert_sql = (
            "INSERT INTO sdu_power_param (sid, meter_sn, is_demo, "
            "fridge, air_conditioner, calorifier, rice_cooker, "
            "induction_cooker, create_time) "
            "VALUES %s"
        )
        now_date, timestamp = srv_time()
        async with MysqlUtil() as conn:
            # 成功res返回1
            res = await conn.execute(
                insert_sql,
                args=(
                    (
                        sid,
                        meter_sn,
                        param_list[5],
                        param_list[0],
                        param_list[1],
                        param_list[2],
                        param_list[3],
                        param_list[4],
                        timestamp,
                    ),
                ),
            )
        param_list.pop()
        # list元素str,全部转换为int
        power_param = list(map(int, param_list))
    log.info(f"power_param:{power_param}")
    # 3. 准备算法输入数据
    # 3.1 查询es,获取查询日期功率数据dataframe
    p_list, each_data_list = await power_p(sid, sn, date_time)
    each_data = pd.DataFrame(each_data_list, columns=["time", "p", "i"])
    each_data = each_data.fillna(value=0.2)
    # 3.2 功率参数dataframe
    params = np.array(power_param)
    # 1. 设备识别算法
    result_dict, power_pa, day_time = hisPieceFind(params, each_data)
    # 2. 功率波动,电量分析,运行时长
    state_dict = electricProportion(
        stageComponent=result_dict["设备运行情况"],
        timestage=result_dict["运行时间段"],
        power=power_pa,
        timeAll=day_time,
    )
    return result_dict, state_dict, p_list


def get_runtime_list(state_dict):
    """
    获取
    @dataclass
    class RunTimeItem(Model):
        device: str = Str("设备").eg("冰箱")
        run_time: float = Float("运行时长, 单位小时").eg(1)
        pchange_times: float = Float("功率波动次数").eg(1)
        power: float = Float("用电 单位kwh").eg(100)
    组成的列表
    :param state_dict:
    :return:
    """
    runtime_list = []
    for rundev_desc, power_time_list in state_dict["电量与时长"].items():
        power_list, runtimes = power_time_list
        power_list = [i for i in power_list if str(i) != "nan"]
        runtimes = [i for i in power_list if str(i) != "nan"]
        rundev = rundev_desc.rstrip("运行")
        runtime_list.append(
            RunTimeItem(
                device=rundev,
                power=round(sum(power_list), 2),
                run_time=round(sum(runtimes), 2),
                pchange_times=state_dict["波动"].get(rundev, 0),
            )
        )
        runtime_list.sort(key=lambda x: x.run_time, reverse=True)
    return runtime_list


async def get_curve_p(mtid, meter_sn, start, end, time_slots):
    '''
        从tdenine中获取相关数据
    '''
    sn = meter_sn.lower()
    p_field = f"p{sn}"
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    sql = f"select ts, max({p_field}) {p_field} from  mt{mtid}_ele " \
          f"where meter_sn='{str.upper(sn)}' " \
          f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
    status, results = await get_td_engine_data(url, sql)
    if not status:
        return []
    datas = {}
    head = parse_td_columns(results)
    for one in results.get('data'):
        data = dict(zip(head, one))
        slot = str(data['ts'])[11:16]
        datas[slot] = data[p_field]
    p_list = []
    for slot in time_slots:
        p = datas.get(slot, "")
        if p:
            p = round(p, 4) if type(p) in [int, float] else p
        p_list.append(p)
    return p_list


def time_interval_class(item_obj):
    time_str = item_obj.action_time
    return time_str


async def algorithm_result_to_front(pid, req_date, user_id, product, detail):
    meter_info = await get_meter_by_point(pid)
    if not meter_info:
        raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")

    sid, meter_no = meter_info["sid"], meter_info["meter_no"]
    mtid = meter_info["mtid"]
    dt = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    time_slot = [
        datetime.strftime(dt + timedelta(minutes=i),
                          "%Y-%m-%d %H:%M:%S").split(" ")[1][:5]
        for i in range(1440)
    ]

    s_dt = get_day_start(req_date, dt_fmt="YYYY-MM-DD")
    s_dts, e_dts = s_dt.format(YMD_Hms), s_dt.add(days=1).format(YMD_Hms)
    p_list = await get_curve_p(mtid, meter_no, s_dts, e_dts, time_slot)
    async with MysqlUtil() as conn:
        sql = "select * from sdu_dev_run_anal where pid=%s and cal_day=%s"
        res = await conn.fetchone(sql, args=(pid, req_date))
        if not res:
            return AlgorithmOutput(
                time_slot=time_slot,
                p_slot=p_list,
                electric_actions=[],
                run_period_list=[],
                runtime_list=[],
                electric_action_groups=[],
            )
        action_list, action_time_list = [], []
        """
        {
            "ele_overload": [],
            "power_quality_low": [], 
            "high_power_app": [["2020-12-22 00:01:46", [["\u7535\u5439\u98ce+\u7535\u78c1\u7089", 1]]], ["2020-12-22 02:26:41", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 19:49:43", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 20:23:35", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 21:40:20", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 22:42:27", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 23:31:41", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 23:45:49", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 23:49:21", [["\u5fae\u6ce2\u7089", 1]]]], "ele_car_battery": [], "illegal_ele_app": [["2020-12-22 02:45:37", [["\u70ed\u5f97\u5feb", 1]]], ["2020-12-22 23:43:48", [["\u70ed\u5f97\u5feb", 1]]]]
        }
        """
        event_name_map = {
            "ele_overload": "用电过载",
            "high_power_app": "大功率电器",
            "illegal_ele_app": "违规电器",
            "ele_car_battery": "违规电器",
            "power_quality_low": "用电质量低",
        }
        event_type_map = {
            "ele_overload": set(),
            "power_quality_low": set(),
            "high_power_app": set(),
            "illegal_ele_app": set(),
            "ele_car_battery": set(),
        }

        # 根据配置扩展模块取不同的算法结果字段
        async with MysqlUtil() as conn:
            point_sql = "select pid, cid from point where pid=%s"
            point_map = await conn.fetchone(point_sql, args=(pid,))
        if not point_map:
            raise BusinessException(message="没有该监测点的信息,请联系运维人员!")
        cid = point_map["cid"]
        # 2.调用函数获取到用户信息
        product_auth_map = await get_product_auth(user_id, product)
        cid_ext_module_map = product_auth_map["product"]
        if str(cid) not in cid_ext_module_map:
            log.error(f"用户user_id = {user_id} 工厂cid={cid}的权限")
            raise BusinessException(
                message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")

        act_info = json.loads(res["act_info"]) if res["act_info"] else {}
        if ExtendModule.AlgorithmResultDetail.value in cid_ext_module_map[
            str(cid)] and detail == 1:
            # 如果识电U配置了2那么给权限看新字段(内部人看的详细识别数据)
            act_info = json.loads(res["act_info2"]) if res["act_info2"] else {}
        for event_type, event_list in act_info.items():
            for i in event_list:
                if event_type in event_type_map:
                    for item in i[1]:
                        event_type_map[event_type].add(item[0])
                action_time_list.extend([i[0]] * len(i[1]))
                for item in i[1]:
                    action_list.append(item[0])

        result_dict = {
            "设备运行情况": json.loads(res["dev_run_info"]) if res[
                                                             "dev_run_info"] is not None else [],
            "运行时间段": json.loads(res["dev_run_tp"]) if res[
                                                          "dev_run_tp"] is not None else [],
            "行为列表": action_list,
            "行为时间": action_time_list,
        }
        state_dict = {
            "电量与时长": json.loads(res["ele_quan_dur"]) if res[
                                                            "ele_quan_dur"] is not None else {},
            "波动": json.loads(res["dev_wave"]) if res[
                                                     "dev_wave"] is not None else {},
        }
    log.info(f"算法结果 result_dict={result_dict} state_dict={state_dict}")

    electric_action_list = [
        ElectricActionItem(
            action_name=action_name,
            action_time=result_dict["行为时间"][index].split(" ")[1][:5],
        )
        for index, action_name in enumerate(result_dict["行为列表"])
    ]

    action_groups = groupby(electric_action_list, key=time_interval_class)

    electric_action_group_map = defaultdict(set)
    for time_interval_str, electric_actions in action_groups:
        for i in electric_actions:
            for _event_type, event_set in event_type_map.items():
                if i.action_name in event_set:
                    electric_action_group_map[time_interval_str].add(
                        f"{event_name_map[_event_type]}{i.action_name}"
                        if i.action_name not in ["大功率设备", "正常电器"]
                        else i.action_name
                    )

    electric_action_groups = []
    for i in time_slot:
        if i not in electric_action_group_map:
            electric_action_groups.append([])
        else:
            electric_action_groups.append(list(electric_action_group_map[i]))

    return AlgorithmOutput(
        time_slot=time_slot,
        p_slot=p_list,
        electric_actions=electric_action_list,
        run_period_list=[
            RunPeriodtem(
                running_devices=running_devices,
                time_period=[i.split(" ")[1][:5] for i in
                             result_dict["运行时间段"][index]],
            )
            for index, running_devices in enumerate(result_dict["设备运行情况"])
        ],
        runtime_list=get_runtime_list(state_dict),
        electric_action_groups=electric_action_groups,
    )


async def main():
    result_dict, state_dict, p_list = await output_result("A2004000192", "A",
                                                          "2020-12-02")
    print(result_dict)
    print(state_dict)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())