import json
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby

from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_1MIN_INDEX, PRODUCT, ExtendModule
from unify_api.modules.common.procedures.common_cps import point_day2month
from unify_api.modules.common.service.td_engine_service import \
    get_td_engine_data
from unify_api.modules.shidianu.components.algorithm_cps import (
    AlgorithmOutput,
    RunPeriodtem,
    ElectricActionItem,
    RunTimeItem,
)
from unify_api.modules.common.procedures.points import get_meter_by_point
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.shidianu.procedures.power_param import power_p
from unify_api.modules.shidianu.service.IntergratePipe import hisPieceFind
from unify_api.modules.shidianu.service.electricProportion import \
    electricProportion
import numpy as np
import pandas as pd
import asyncio
from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, \
    params_mining
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth
from unify_api.utils.time_format import (
    last_n_day, srv_time, get_day_start, YMD_Hms
)
from unify_api.utils.taos_new import parse_td_columns

# Suffix carried by the keys of state_dict["电量与时长"]; it is removed to
# obtain the bare device name.
_RUN_SUFFIX = "运行"

# Action names that are shown as-is, without an event-type prefix.
_UNPREFIXED_ACTIONS = ("大功率设备", "正常电器")

# Event-type key (as stored in act_info / act_info2) -> display prefix.
_EVENT_NAME_MAP = {
    "ele_overload": "用电过载",
    "high_power_app": "大功率电器",
    "illegal_ele_app": "违规电器",
    "ele_car_battery": "违规电器",
    "power_quality_low": "用电质量低",
}


def _is_nan(value):
    """Return True when ``value`` renders as ``nan`` (float or numpy NaN)."""
    return str(value) == "nan"


def _loads_or_default(raw, default):
    """Decode a JSON column, returning ``default`` for NULL/empty values."""
    return json.loads(raw) if raw else default


async def output_result(sid, meter_sn, date_time):
    """Run the appliance-recognition algorithm for one meter and one day.

    Steps, in order:

    1. Look up the stored power parameters for ``sid`` + ``meter_sn`` in
       ``sdu_power_param``.
    2. When no row exists, derive the parameters from the last 7 days of
       data (``params_mining`` + ``paramAnalysis``) and insert them.
    3. Load the day's power data through ``power_p`` and feed it, together
       with the parameters, to ``hisPieceFind`` and ``electricProportion``.

    :param sid: device sid used in the ``sdu_power_param`` lookup
    :param meter_sn: meter serial; the lower-cased form is passed to the
        algorithm helpers
    :param date_time: the day to analyse, forwarded to ``last_n_day`` and
        ``power_p``
    :return: ``(result_dict, state_dict, p_list)`` where ``result_dict``
        comes from ``hisPieceFind``, ``state_dict`` from
        ``electricProportion`` and ``p_list`` from ``power_p``
    """
    sn = meter_sn.lower()

    # 1. Query sdu_power_param by sid + meter_sn.
    sql = "select * from sdu_power_param where sid = %s and meter_sn = %s"
    async with MysqlUtil() as conn:
        param_res = await conn.fetchone(sql=sql, args=(sid, meter_sn))

    # 2. Use the stored parameters, or compute and persist them.
    if param_res:
        log.info(f"{sid}_{meter_sn}:功率参数已存在")
        power_param = [
            param_res["fridge"],
            param_res["air_conditioner"],
            param_res["calorifier"],
            param_res["rice_cooker"],
            param_res["induction_cooker"],
        ]
    else:
        log.info(f"{sid}_{meter_sn}:功率参数不存在, 开始使用历史数据计算")
        # 2.1 Dates of the most recent 7 days.
        date_time_list = last_n_day(date_time, 7)
        # 2.2 Mining step (original note: produces pa100-pa3000).
        pa_list = await params_mining(date_time_list, sid, sn)
        # 2.3 Comma-separated parameter string.
        param_str = paramAnalysis(pa_list)
        log.info(f"历史数据计算得出参数功率:{param_str}")
        # 2.4 Persist the parameters. Index 5 is written to is_demo,
        # indexes 0-4 are the five appliance parameters.
        param_list = param_str.split(",")
        insert_sql = (
            "INSERT INTO sdu_power_param (sid, meter_sn, is_demo, "
            "fridge, air_conditioner, calorifier, rice_cooker, "
            "induction_cooker, create_time) "
            "VALUES %s"
        )
        _, timestamp = srv_time()
        async with MysqlUtil() as conn:
            await conn.execute(
                insert_sql,
                args=(
                    (
                        sid,
                        meter_sn,
                        param_list[5],
                        param_list[0],
                        param_list[1],
                        param_list[2],
                        param_list[3],
                        param_list[4],
                        timestamp,
                    ),
                ),
            )
        # Drop the trailing element and convert the remaining strings to int.
        power_param = [int(value) for value in param_list[:-1]]
    log.info(f"power_param:{power_param}")

    # 3. Prepare the algorithm input.
    # 3.1 Power data of the requested day as a DataFrame; gaps become 0.2.
    p_list, each_data_list = await power_p(sid, sn, date_time)
    each_data = pd.DataFrame(each_data_list, columns=["time", "p", "i"])
    each_data = each_data.fillna(value=0.2)
    # 3.2 Power parameters as an array.
    params = np.array(power_param)

    # Appliance recognition.
    result_dict, power_pa, day_time = hisPieceFind(params, each_data)
    # Power fluctuation, energy share and running duration.
    state_dict = electricProportion(
        stageComponent=result_dict["设备运行情况"],
        timestage=result_dict["运行时间段"],
        power=power_pa,
        timeAll=day_time,
    )
    return result_dict, state_dict, p_list


def get_runtime_list(state_dict):
    """Build the per-device ``RunTimeItem`` list from the algorithm state.

    Each ``RunTimeItem`` carries ``device``, ``run_time``, ``pchange_times``
    and ``power`` (original field notes: running hours, number of power
    fluctuations, energy in kWh).

    :param state_dict: dict with ``"电量与时长"`` mapping a device
        description to a ``(power_list, runtimes)`` pair, and ``"波动"``
        mapping a device name to its fluctuation count
    :return: list of ``RunTimeItem`` sorted by ``run_time`` descending
    """
    runtime_list = []
    for rundev_desc, power_time_list in state_dict["电量与时长"].items():
        power_list, runtimes = power_time_list
        power_list = [i for i in power_list if not _is_nan(i)]
        # Filter the run-time values themselves; the previous code iterated
        # power_list here, so run_time always equalled the power sum.
        runtimes = [i for i in runtimes if not _is_nan(i)]
        # Remove the exact suffix; str.rstrip() would strip any trailing
        # run of the characters "运"/"行", eating into the device name.
        if rundev_desc.endswith(_RUN_SUFFIX):
            rundev = rundev_desc[:-len(_RUN_SUFFIX)]
        else:
            rundev = rundev_desc
        runtime_list.append(
            RunTimeItem(
                device=rundev,
                power=round(sum(power_list), 2),
                run_time=round(sum(runtimes), 2),
                pchange_times=state_dict["波动"].get(rundev, 0),
            )
        )
    runtime_list.sort(key=lambda x: x.run_time, reverse=True)
    return runtime_list


async def get_curve_p(mtid, meter_sn, start, end, time_slots):
    """Fetch the per-minute power curve from TDengine.

    Queries ``mt{mtid}_ele`` for the per-minute maximum of the
    ``p{meter_sn}`` column between ``start`` and ``end`` and aligns the
    values to ``time_slots``.

    :param mtid: id used to build the table name ``mt{mtid}_ele``
    :param meter_sn: meter serial; lower-cased for the column name and
        upper-cased for the ``meter_sn`` filter
    :param start: inclusive lower timestamp bound
    :param end: inclusive upper timestamp bound
    :param time_slots: iterable of ``"HH:MM"`` strings
    :return: one entry per slot: the value rounded to 4 decimals when
        numeric and truthy, the raw value otherwise, ``""`` when the slot
        has no data; ``[]`` when the query fails
    """
    sn = meter_sn.lower()
    p_field = f"p{sn}"
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    # NOTE(review): the statement is assembled by string formatting;
    # confirm that mtid, meter_sn, start and end never carry untrusted text.
    sql = f"select ts, max({p_field}) {p_field} from mt{mtid}_ele " \
          f"where meter_sn='{sn.upper()}' " \
          f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
    status, results = await get_td_engine_data(url, sql)
    if not status:
        return []

    head = parse_td_columns(results)
    datas = {}
    # A response without a "data" key is treated as an empty result.
    for one in results.get('data') or []:
        data = dict(zip(head, one))
        # Characters 11..15 of the timestamp text are "HH:MM".
        slot = str(data['ts'])[11:16]
        datas[slot] = data[p_field]

    p_list = []
    for slot in time_slots:
        p = datas.get(slot, "")
        if p and isinstance(p, (int, float)):
            p = round(p, 4)
        p_list.append(p)
    return p_list


def time_interval_class(item_obj):
    """Return the grouping key of an action item: its ``action_time``."""
    return item_obj.action_time


async def algorithm_result_to_front(pid, req_date, user_id, product, detail):
    """Assemble the stored recognition result of one point for the front end.

    Reads the day's row from ``sdu_dev_run_anal``, the power curve from
    TDengine, and the user's product authorisation, then packs everything
    into an ``AlgorithmOutput``.

    :param pid: monitoring point id
    :param req_date: day in ``YYYY-MM-DD`` format
    :param user_id: id of the requesting user
    :param product: product identifier passed to ``get_product_auth``
    :param detail: ``1`` requests the detailed result (``act_info2``),
        which is only served when the company has the
        ``ExtendModule.AlgorithmResultDetail`` module
    :return: ``AlgorithmOutput``; all result lists are empty when no
        analysis row exists for the day
    :raises BusinessException: when the point has no meter info, the point
        does not exist, or the user has no permission on the point's cid
    """
    meter_info = await get_meter_by_point(pid)
    if not meter_info:
        raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")
    meter_no = meter_info["meter_no"]
    mtid = meter_info["mtid"]

    # 1440 "HH:MM" slots covering the requested day.
    day_begin = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    time_slot = [
        (day_begin + timedelta(minutes=i)).strftime("%H:%M")
        for i in range(1440)
    ]

    s_dt = get_day_start(req_date, dt_fmt="YYYY-MM-DD")
    s_dts, e_dts = s_dt.format(YMD_Hms), s_dt.add(days=1).format(YMD_Hms)
    p_list = await get_curve_p(mtid, meter_no, s_dts, e_dts, time_slot)

    async with MysqlUtil() as conn:
        sql = "select * from sdu_dev_run_anal where pid=%s and cal_day=%s"
        res = await conn.fetchone(sql, args=(pid, req_date))
    if not res:
        return AlgorithmOutput(
            time_slot=time_slot,
            p_slot=p_list,
            electric_actions=[],
            run_period_list=[],
            runtime_list=[],
            electric_action_groups=[],
        )

    # Shape of act_info / act_info2: event type -> list of
    # [timestamp, [[action_name, count], ...]], for example
    #   {"ele_overload": [],
    #    "high_power_app": [["2020-12-22 19:49:43", [["电磁炉", 1]]]],
    #    "illegal_ele_app": [["2020-12-22 02:45:37", [["热得快", 1]]]],
    #    "ele_car_battery": [], "power_quality_low": []}
    event_type_map = {event_type: set() for event_type in _EVENT_NAME_MAP}

    # The extension modules configured for the company decide which
    # result column is served.
    async with MysqlUtil() as conn:
        point_sql = "select pid, cid from point where pid=%s"
        point_map = await conn.fetchone(point_sql, args=(pid,))
    if not point_map:
        raise BusinessException(message="没有该监测点的信息,请联系运维人员!")
    cid = point_map["cid"]

    product_auth_map = await get_product_auth(user_id, product)
    cid_ext_module_map = product_auth_map["product"]
    if str(cid) not in cid_ext_module_map:
        log.error(f"用户user_id = {user_id} 工厂cid={cid}的权限")
        raise BusinessException(
            message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")

    act_info = _loads_or_default(res["act_info"], {})
    if ExtendModule.AlgorithmResultDetail.value in cid_ext_module_map[
            str(cid)] and detail == 1:
        # The detail module grants access to the detailed recognition data.
        act_info = _loads_or_default(res["act_info2"], {})

    action_list, action_time_list = [], []
    for event_type, event_list in act_info.items():
        for event_time, actions in event_list:
            names = [item[0] for item in actions]
            if event_type in event_type_map:
                event_type_map[event_type].update(names)
            # One timestamp entry per action keeps both lists aligned.
            action_time_list.extend([event_time] * len(names))
            action_list.extend(names)

    result_dict = {
        "设备运行情况": _loads_or_default(res["dev_run_info"], []),
        "运行时间段": _loads_or_default(res["dev_run_tp"], []),
        "行为列表": action_list,
        "行为时间": action_time_list,
    }
    state_dict = {
        "电量与时长": _loads_or_default(res["ele_quan_dur"], {}),
        "波动": _loads_or_default(res["dev_wave"], {}),
    }
    log.info(f"算法结果 result_dict={result_dict} state_dict={state_dict}")

    electric_action_list = [
        ElectricActionItem(
            action_name=action_name,
            action_time=action_time.split(" ")[1][:5],
        )
        for action_name, action_time in zip(action_list, action_time_list)
    ]

    # Collect the display labels per "HH:MM" slot. Accumulating into a
    # dict of sets needs no pre-sorting, unlike itertools.groupby.
    electric_action_group_map = defaultdict(set)
    for action in electric_action_list:
        slot_key = time_interval_class(action)
        for _event_type, event_set in event_type_map.items():
            if action.action_name not in event_set:
                continue
            if action.action_name in _UNPREFIXED_ACTIONS:
                label = action.action_name
            else:
                label = f"{_EVENT_NAME_MAP[_event_type]}{action.action_name}"
            electric_action_group_map[slot_key].add(label)

    electric_action_groups = [
        list(electric_action_group_map[slot])
        if slot in electric_action_group_map else []
        for slot in time_slot
    ]

    return AlgorithmOutput(
        time_slot=time_slot,
        p_slot=p_list,
        electric_actions=electric_action_list,
        run_period_list=[
            RunPeriodtem(
                running_devices=running_devices,
                time_period=[
                    i.split(" ")[1][:5]
                    for i in result_dict["运行时间段"][index]
                ],
            )
            for index, running_devices in enumerate(
                result_dict["设备运行情况"])
        ],
        runtime_list=get_runtime_list(state_dict),
        electric_action_groups=electric_action_groups,
    )


async def main():
    """Manual check: run the algorithm for a sample meter and print it."""
    result_dict, state_dict, p_list = await output_result(
        "A2004000192", "A", "2020-12-02")
    print(result_dict)
    print(state_dict)


if __name__ == "__main__":
    asyncio.run(main())