output_result.py 13.3 KB
Newer Older
lcn's avatar
lcn committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
import json
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_1MIN_INDEX, PRODUCT, ExtendModule
from unify_api.modules.common.procedures.common_cps import point_day2month
from unify_api.modules.common.service.td_engine_service import \
    get_td_engine_data
from unify_api.modules.shidianu.components.algorithm_cps import (
    AlgorithmOutput,
    RunPeriodtem,
    ElectricActionItem,
    RunTimeItem,
)
from unify_api.modules.common.procedures.points import get_meter_by_point
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.shidianu.procedures.power_param import power_p
from unify_api.modules.shidianu.service.IntergratePipe import hisPieceFind
ZZH's avatar
ZZH committed
23 24
from unify_api.modules.shidianu.service.electricProportion import \
    electricProportion
lcn's avatar
lcn committed
25 26 27 28
import numpy as np
import pandas as pd
import asyncio

ZZH's avatar
ZZH committed
29 30 31 32 33 34 35
from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, \
    params_mining
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth
from unify_api.utils.time_format import (
    last_n_day, srv_time, get_day_start, YMD_Hms
)
36
from unify_api.utils.taos_new import parse_td_columns
lcn's avatar
lcn committed
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145


async def output_result(sid, meter_sn, date_time):
    """识电u,算法识别结果"""
    sn = meter_sn.lower()
    # 1. 根据sid+meter_sn查询sdu_power_param表
    sql = "select * from sdu_power_param where sid = %s and meter_sn = %s"
    async with MysqlUtil() as conn:
        param_res = await conn.fetchone(sql=sql, args=(sid, meter_sn))
    # 2. 如果数据表没有生成好的"功率参数", 使用近半月功率数据计算得出"功率参数"
    if param_res:
        log.info(f"{sid}_{meter_sn}:功率参数已存在")
        power_param = [
            param_res["fridge"],
            param_res["air_conditioner"],
            param_res["calorifier"],
            param_res["rice_cooker"],
            param_res["induction_cooker"],
        ]
    else:
        log.info(f"{sid}_{meter_sn}:功率参数不存在, 开始使用历史数据计算")
        # 2.1 求最近7天日期
        date_time_list = last_n_day(date_time, 7)
        # 2.2 调算法,生成pa100-pa3000
        pa_list = await params_mining(date_time_list, sid, sn)
        # 2.3 调算法,生成功率参数power_param
        param_str = paramAnalysis(pa_list)
        log.info(f"历史数据计算得出参数功率:{param_str}")
        # 2.4 插入功率参数到sdu_power_param表
        param_list = param_str.split(",")
        insert_sql = (
            "INSERT INTO sdu_power_param (sid, meter_sn, is_demo, "
            "fridge, air_conditioner, calorifier, rice_cooker, "
            "induction_cooker, create_time) "
            "VALUES %s"
        )
        now_date, timestamp = srv_time()
        async with MysqlUtil() as conn:
            # 成功res返回1
            res = await conn.execute(
                insert_sql,
                args=(
                    (
                        sid,
                        meter_sn,
                        param_list[5],
                        param_list[0],
                        param_list[1],
                        param_list[2],
                        param_list[3],
                        param_list[4],
                        timestamp,
                    ),
                ),
            )
        param_list.pop()
        # list元素str,全部转换为int
        power_param = list(map(int, param_list))
    log.info(f"power_param:{power_param}")
    # 3. 准备算法输入数据
    # 3.1 查询es,获取查询日期功率数据dataframe
    p_list, each_data_list = await power_p(sid, sn, date_time)
    each_data = pd.DataFrame(each_data_list, columns=["time", "p", "i"])
    each_data = each_data.fillna(value=0.2)
    # 3.2 功率参数dataframe
    params = np.array(power_param)
    # 1. 设备识别算法
    result_dict, power_pa, day_time = hisPieceFind(params, each_data)
    # 2. 功率波动,电量分析,运行时长
    state_dict = electricProportion(
        stageComponent=result_dict["设备运行情况"],
        timestage=result_dict["运行时间段"],
        power=power_pa,
        timeAll=day_time,
    )
    return result_dict, state_dict, p_list


def get_runtime_list(state_dict):
    """
    获取
    @dataclass
    class RunTimeItem(Model):
        device: str = Str("设备").eg("冰箱")
        run_time: float = Float("运行时长, 单位小时").eg(1)
        pchange_times: float = Float("功率波动次数").eg(1)
        power: float = Float("用电 单位kwh").eg(100)
    组成的列表
    :param state_dict:
    :return:
    """
    runtime_list = []
    for rundev_desc, power_time_list in state_dict["电量与时长"].items():
        power_list, runtimes = power_time_list
        power_list = [i for i in power_list if str(i) != "nan"]
        runtimes = [i for i in power_list if str(i) != "nan"]
        rundev = rundev_desc.rstrip("运行")
        runtime_list.append(
            RunTimeItem(
                device=rundev,
                power=round(sum(power_list), 2),
                run_time=round(sum(runtimes), 2),
                pchange_times=state_dict["波动"].get(rundev, 0),
            )
        )
        runtime_list.sort(key=lambda x: x.run_time, reverse=True)
    return runtime_list


146
async def get_curve_p(mtid, meter_sn, start, end, time_slots):
lcn's avatar
lcn committed
147 148 149 150 151
    '''
        从tdenine中获取相关数据
    '''
    sn = meter_sn.lower()
    p_field = f"p{sn}"
wang.wenrong's avatar
wang.wenrong committed
152
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
153
    sql = f"select ts, max({p_field}) {p_field} from  mt{mtid}_ele " \
lcn's avatar
lcn committed
154 155 156 157 158 159
          f"where meter_sn='{str.upper(sn)}' " \
          f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
    status, results = await get_td_engine_data(url, sql)
    if not status:
        return []
    datas = {}
160
    head = parse_td_columns(results)
lcn's avatar
lcn committed
161
    for one in results.get('data'):
162
        data = dict(zip(head, one))
lcn's avatar
lcn committed
163 164 165 166
        slot = str(data['ts'])[11:16]
        datas[slot] = data[p_field]
    p_list = []
    for slot in time_slots:
ZZH's avatar
ZZH committed
167
        p = datas.get(slot, "")
lcn's avatar
lcn committed
168 169 170 171 172 173 174 175 176 177 178
        if p:
            p = round(p, 4) if type(p) in [int, float] else p
        p_list.append(p)
    return p_list


def time_interval_class(item_obj):
    time_str = item_obj.action_time
    return time_str


ZZH's avatar
ZZH committed
179 180
async def algorithm_result_to_front(pid, req_date, user_id, product, detail):
    meter_info = await get_meter_by_point(pid)
lcn's avatar
lcn committed
181 182
    if not meter_info:
        raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")
ZZH's avatar
ZZH committed
183

lcn's avatar
lcn committed
184
    sid, meter_no = meter_info["sid"], meter_info["meter_no"]
ZZH's avatar
ZZH committed
185
    mtid = meter_info["mtid"]
lcn's avatar
lcn committed
186 187
    dt = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    time_slot = [
ZZH's avatar
ZZH committed
188 189
        datetime.strftime(dt + timedelta(minutes=i),
                          "%Y-%m-%d %H:%M:%S").split(" ")[1][:5]
lcn's avatar
lcn committed
190 191
        for i in range(1440)
    ]
ZZH's avatar
ZZH committed
192 193 194 195

    s_dt = get_day_start(req_date, dt_fmt="YYYY-MM-DD")
    s_dts, e_dts = s_dt.format(YMD_Hms), s_dt.add(days=1).format(YMD_Hms)
    p_list = await get_curve_p(mtid, meter_no, s_dts, e_dts, time_slot)
lcn's avatar
lcn committed
196 197
    async with MysqlUtil() as conn:
        sql = "select * from sdu_dev_run_anal where pid=%s and cal_day=%s"
ZZH's avatar
ZZH committed
198
        res = await conn.fetchone(sql, args=(pid, req_date))
lcn's avatar
lcn committed
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
        if not res:
            return AlgorithmOutput(
                time_slot=time_slot,
                p_slot=p_list,
                electric_actions=[],
                run_period_list=[],
                runtime_list=[],
                electric_action_groups=[],
            )
        action_list, action_time_list = [], []
        """
        {
            "ele_overload": [],
            "power_quality_low": [], 
            "high_power_app": [["2020-12-22 00:01:46", [["\u7535\u5439\u98ce+\u7535\u78c1\u7089", 1]]], ["2020-12-22 02:26:41", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 19:49:43", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 20:23:35", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 21:40:20", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 22:42:27", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 23:31:41", [["\u7535\u78c1\u7089", 1]]], ["2020-12-22 23:45:49", [["\u7535\u70ed\u6cb9\u6c40", 1]]], ["2020-12-22 23:49:21", [["\u5fae\u6ce2\u7089", 1]]]], "ele_car_battery": [], "illegal_ele_app": [["2020-12-22 02:45:37", [["\u70ed\u5f97\u5feb", 1]]], ["2020-12-22 23:43:48", [["\u70ed\u5f97\u5feb", 1]]]]
        }
        """
        event_name_map = {
            "ele_overload": "用电过载",
            "high_power_app": "大功率电器",
            "illegal_ele_app": "违规电器",
            "ele_car_battery": "违规电器",
            "power_quality_low": "用电质量低",
        }
        event_type_map = {
            "ele_overload": set(),
            "power_quality_low": set(),
            "high_power_app": set(),
            "illegal_ele_app": set(),
            "ele_car_battery": set(),
        }

        # 根据配置扩展模块取不同的算法结果字段
        async with MysqlUtil() as conn:
wang.wenrong's avatar
wang.wenrong committed
233
            point_sql = "select pid, cid from point where pid=%s"
ZZH's avatar
ZZH committed
234
            point_map = await conn.fetchone(point_sql, args=(pid,))
lcn's avatar
lcn committed
235 236
        if not point_map:
            raise BusinessException(message="没有该监测点的信息,请联系运维人员!")
wang.wenrong's avatar
wang.wenrong committed
237
        cid = point_map["cid"]
lcn's avatar
lcn committed
238 239 240 241 242
        # 2.调用函数获取到用户信息
        product_auth_map = await get_product_auth(user_id, product)
        cid_ext_module_map = product_auth_map["product"]
        if str(cid) not in cid_ext_module_map:
            log.error(f"用户user_id = {user_id} 工厂cid={cid}的权限")
ZZH's avatar
ZZH committed
243 244
            raise BusinessException(
                message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")
lcn's avatar
lcn committed
245 246

        act_info = json.loads(res["act_info"]) if res["act_info"] else {}
ZZH's avatar
ZZH committed
247 248
        if ExtendModule.AlgorithmResultDetail.value in cid_ext_module_map[
            str(cid)] and detail == 1:
lcn's avatar
lcn committed
249 250 251 252 253 254 255 256 257 258 259 260
            # 如果识电U配置了2那么给权限看新字段(内部人看的详细识别数据)
            act_info = json.loads(res["act_info2"]) if res["act_info2"] else {}
        for event_type, event_list in act_info.items():
            for i in event_list:
                if event_type in event_type_map:
                    for item in i[1]:
                        event_type_map[event_type].add(item[0])
                action_time_list.extend([i[0]] * len(i[1]))
                for item in i[1]:
                    action_list.append(item[0])

        result_dict = {
ZZH's avatar
ZZH committed
261 262 263 264
            "设备运行情况": json.loads(res["dev_run_info"]) if res[
                                                             "dev_run_info"] is not None else [],
            "运行时间段": json.loads(res["dev_run_tp"]) if res[
                                                          "dev_run_tp"] is not None else [],
lcn's avatar
lcn committed
265 266 267 268
            "行为列表": action_list,
            "行为时间": action_time_list,
        }
        state_dict = {
ZZH's avatar
ZZH committed
269 270 271 272
            "电量与时长": json.loads(res["ele_quan_dur"]) if res[
                                                            "ele_quan_dur"] is not None else {},
            "波动": json.loads(res["dev_wave"]) if res[
                                                     "dev_wave"] is not None else {},
lcn's avatar
lcn committed
273 274 275 276 277
        }
    log.info(f"算法结果 result_dict={result_dict} state_dict={state_dict}")

    electric_action_list = [
        ElectricActionItem(
ZZH's avatar
ZZH committed
278 279
            action_name=action_name,
            action_time=result_dict["行为时间"][index].split(" ")[1][:5],
lcn's avatar
lcn committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
        )
        for index, action_name in enumerate(result_dict["行为列表"])
    ]

    action_groups = groupby(electric_action_list, key=time_interval_class)

    electric_action_group_map = defaultdict(set)
    for time_interval_str, electric_actions in action_groups:
        for i in electric_actions:
            for _event_type, event_set in event_type_map.items():
                if i.action_name in event_set:
                    electric_action_group_map[time_interval_str].add(
                        f"{event_name_map[_event_type]}{i.action_name}"
                        if i.action_name not in ["大功率设备", "正常电器"]
                        else i.action_name
                    )

    electric_action_groups = []
    for i in time_slot:
        if i not in electric_action_group_map:
            electric_action_groups.append([])
        else:
            electric_action_groups.append(list(electric_action_group_map[i]))

    return AlgorithmOutput(
        time_slot=time_slot,
        p_slot=p_list,
        electric_actions=electric_action_list,
        run_period_list=[
            RunPeriodtem(
                running_devices=running_devices,
ZZH's avatar
ZZH committed
311 312
                time_period=[i.split(" ")[1][:5] for i in
                             result_dict["运行时间段"][index]],
lcn's avatar
lcn committed
313 314 315 316 317 318 319 320 321
            )
            for index, running_devices in enumerate(result_dict["设备运行情况"])
        ],
        runtime_list=get_runtime_list(state_dict),
        electric_action_groups=electric_action_groups,
    )


async def main():
ZZH's avatar
ZZH committed
322 323
    result_dict, state_dict, p_list = await output_result("A2004000192", "A",
                                                          "2020-12-02")
lcn's avatar
lcn committed
324 325 326 327 328 329 330
    print(result_dict)
    print(state_dict)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())