1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
import json
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_1MIN_INDEX, PRODUCT, ExtendModule
from unify_api.modules.common.procedures.common_cps import point_day2month
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.modules.shidianu.components.algorithm_cps import (
AlgorithmOutput,
RunPeriodtem,
ElectricActionItem,
RunTimeItem,
)
from unify_api.modules.common.procedures.points import get_meter_by_point
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.shidianu.procedures.power_param import power_p
from unify_api.modules.shidianu.service.IntergratePipe import hisPieceFind
from unify_api.modules.shidianu.service.electricProportion import \
electricProportion
import numpy as np
import pandas as pd
import asyncio
from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, \
params_mining
from unify_api.modules.users.procedures.user_product_auth import \
get_product_auth
from unify_api.utils.time_format import (
last_n_day, srv_time, get_day_start, YMD_Hms
)
from unify_api.utils.taos_new import parse_td_columns
async def output_result(sid, meter_sn, date_time):
    """Run the shidianu appliance-recognition algorithms for one meter and day.

    The five per-appliance power parameters are read from the
    ``sdu_power_param`` table. When no row exists for ``sid`` + ``meter_sn``
    they are mined from the last 7 days of data, stored in that table, and
    then used for the current run.

    :param sid: device id used to look up parameters and power data
    :param meter_sn: meter serial; its lower-cased form is passed to the
        mining and power-data helpers
    :param date_time: day to analyse, forwarded to ``last_n_day`` and
        ``power_p`` (presumably a "YYYY-MM-DD" string, as used by ``main``)
    :return: ``(result_dict, state_dict, p_list)`` where ``result_dict`` is
        the first value returned by ``hisPieceFind``, ``state_dict`` is the
        result of ``electricProportion`` and ``p_list`` is the first value
        returned by ``power_p``
    """
    sn = meter_sn.lower()

    # 1. Look up stored power parameters by sid + meter_sn.
    query = "select * from sdu_power_param where sid = %s and meter_sn = %s"
    async with MysqlUtil() as conn:
        param_res = await conn.fetchone(sql=query, args=(sid, meter_sn))

    # 2. Use the stored parameters, or derive them from historical data.
    if not param_res:
        log.info(f"{sid}_{meter_sn}:功率参数不存在, 开始使用历史数据计算")
        # 2.1 The last 7 days relative to the requested date.
        history_days = last_n_day(date_time, 7)
        # 2.2 Mine the raw statistics from the historical data.
        pa_list = await params_mining(history_days, sid, sn)
        # 2.3 Turn them into the comma-separated power-parameter string.
        param_str = paramAnalysis(pa_list)
        log.info(f"历史数据计算得出参数功率:{param_str}")
        # 2.4 Persist the new parameters in sdu_power_param.
        param_list = param_str.split(",")
        insert_sql = (
            "INSERT INTO sdu_power_param (sid, meter_sn, is_demo, "
            "fridge, air_conditioner, calorifier, rice_cooker, "
            "induction_cooker, create_time) "
            "VALUES %s"
        )
        _, timestamp = srv_time()
        # Column order: sid, meter_sn, is_demo (6th value), the five
        # appliance parameters (first five values), create_time.
        row = (sid, meter_sn, param_list[5], *param_list[:5], timestamp)
        async with MysqlUtil() as conn:
            await conn.execute(insert_sql, args=(row,))
        # Drop the trailing is_demo value and convert the rest to int.
        power_param = [int(value) for value in param_list[:-1]]
    else:
        log.info(f"{sid}_{meter_sn}:功率参数已存在")
        power_param = [
            param_res[column]
            for column in (
                "fridge",
                "air_conditioner",
                "calorifier",
                "rice_cooker",
                "induction_cooker",
            )
        ]
    log.info(f"power_param:{power_param}")

    # 3. Prepare the algorithm inputs.
    # 3.1 Power data of the requested day as a DataFrame; gaps become 0.2.
    p_list, each_data_list = await power_p(sid, sn, date_time)
    each_data = pd.DataFrame(
        each_data_list, columns=["time", "p", "i"]
    ).fillna(value=0.2)
    # 3.2 Power parameters as an array.
    params = np.array(power_param)

    # Device recognition.
    result_dict, power_pa, day_time = hisPieceFind(params, each_data)
    # Power fluctuation, energy analysis and running time.
    state_dict = electricProportion(
        stageComponent=result_dict["设备运行情况"],
        timestage=result_dict["运行时间段"],
        power=power_pa,
        timeAll=day_time,
    )
    return result_dict, state_dict, p_list
def get_runtime_list(state_dict):
    """Build the per-device runtime summary, longest-running device first.

    Each entry is a ``RunTimeItem`` with the fields:

    * ``device``        - device name (the key with a trailing "运行" removed)
    * ``run_time``      - total running time, rounded to 2 decimals (hours)
    * ``pchange_times`` - power fluctuation count, 0 when not recorded
    * ``power``         - total energy used, rounded to 2 decimals (kWh)

    :param state_dict: dict with "电量与时长", mapping a device description to
        a ``[power_list, runtimes]`` pair, and "波动", mapping a device name
        to its fluctuation count
    :return: list of ``RunTimeItem`` sorted by ``run_time`` descending
    """
    suffix = "运行"

    def _without_nan(values):
        # NaN entries are recognised by their string form, which also covers
        # NaN values that are not plain Python floats.
        return [value for value in values if str(value) != "nan"]

    runtime_list = []
    for rundev_desc, power_time_list in state_dict["电量与时长"].items():
        power_list, runtimes = power_time_list
        power_list = _without_nan(power_list)
        # Filter the runtimes themselves; the previous code rebuilt this list
        # from power_list, so run_time always equalled the energy total.
        runtimes = _without_nan(runtimes)
        # Remove the literal suffix only. str.rstrip() treats its argument as
        # a set of characters and could eat into the device name.
        if rundev_desc.endswith(suffix):
            rundev = rundev_desc[:-len(suffix)]
        else:
            rundev = rundev_desc
        runtime_list.append(
            RunTimeItem(
                device=rundev,
                power=round(sum(power_list), 2),
                run_time=round(sum(runtimes), 2),
                pchange_times=state_dict["波动"].get(rundev, 0),
            )
        )
    runtime_list.sort(key=lambda item: item.run_time, reverse=True)
    return runtime_list
async def get_curve_p(mtid, meter_sn, start, end, time_slots):
    """Fetch the per-minute maximum power curve of a meter from TDengine.

    The query groups samples into 1-minute windows and the result is mapped
    onto ``time_slots`` by the "HH:MM" part of each window timestamp.

    NOTE: the SQL is built by string interpolation, so ``mtid``, ``meter_sn``,
    ``start`` and ``end`` must come from trusted sources.

    :param mtid: meter id, used to build the table name ``mt{mtid}_ele``
    :param meter_sn: meter serial; lower-cased for the ``p{sn}`` column and
        upper-cased for the ``meter_sn`` filter
    :param start: inclusive lower time bound of the query
    :param end: inclusive upper time bound of the query
    :param time_slots: iterable of "HH:MM" strings to report
    :return: one value per entry of ``time_slots`` (numbers rounded to 4
        decimals, ``""`` when the slot has no data), or ``[]`` when the
        query fails
    """
    sn = meter_sn.lower()
    p_field = f"p{sn}"
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    sql = (
        f"select ts, max({p_field}) {p_field} from mt{mtid}_ele "
        f"where meter_sn='{sn.upper()}' "
        f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
    )
    status, results = await get_td_engine_data(url, sql)
    if not status:
        return []

    head = parse_td_columns(results)
    # slot -> (timestamp string, value). Because the upper bound is inclusive,
    # a range ending at the next midnight can return a second "00:00" window;
    # keeping the earliest timestamp per slot stops that trailing window from
    # replacing the value of the requested day.
    earliest = {}
    # A successful response without a 'data' entry is treated as empty.
    for one in results.get('data') or []:
        row = dict(zip(head, one))
        ts = str(row['ts'])
        slot = ts[11:16]
        if slot not in earliest or ts < earliest[slot][0]:
            earliest[slot] = (ts, row[p_field])

    p_list = []
    for slot in time_slots:
        p = earliest[slot][1] if slot in earliest else ""
        # Falsy values (0, None, "") are passed through unchanged.
        if p and isinstance(p, (int, float)):
            p = round(p, 4)
        p_list.append(p)
    return p_list
def time_interval_class(item_obj):
    """Return the grouping key of an action item: its ``action_time``."""
    return item_obj.action_time
async def algorithm_result_to_front(pid, req_date, user_id, product, detail):
    """Assemble the stored recognition result of one day for the front end.

    Combines the per-minute power curve read through ``get_curve_p`` with the
    row of ``sdu_dev_run_anal`` for ``pid`` and ``req_date``.

    :param pid: monitoring point id
    :param req_date: requested day as a "YYYY-MM-DD" string
    :param user_id: user whose product permissions are checked
    :param product: product passed to ``get_product_auth``
    :param detail: when equal to 1 and the point's cid has the
        ``ExtendModule.AlgorithmResultDetail`` module, the detailed
        ``act_info2`` column is used instead of ``act_info``
    :return: ``AlgorithmOutput``; only the time slots and power curve are
        filled when no analysis row exists for that day
    :raises BusinessException: when the point has no meter info, the point
        itself is not found, or the user has no permission for its cid
    """
    meter_info = await get_meter_by_point(pid)
    if not meter_info:
        raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")
    sid = meter_info["sid"]
    meter_no = meter_info["meter_no"]
    mtid = meter_info["mtid"]

    # One "HH:MM" label for each of the 1440 minutes of the requested day.
    dt = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
    time_slot = []
    for minute in range(1440):
        stamp = (dt + timedelta(minutes=minute)).strftime("%Y-%m-%d %H:%M:%S")
        time_slot.append(stamp.split(" ")[1][:5])

    s_dt = get_day_start(req_date, dt_fmt="YYYY-MM-DD")
    s_dts = s_dt.format(YMD_Hms)
    e_dts = s_dt.add(days=1).format(YMD_Hms)
    p_list = await get_curve_p(mtid, meter_no, s_dts, e_dts, time_slot)

    async with MysqlUtil() as conn:
        sql = "select * from sdu_dev_run_anal where pid=%s and cal_day=%s"
        res = await conn.fetchone(sql, args=(pid, req_date))
    if not res:
        return AlgorithmOutput(
            time_slot=time_slot,
            p_slot=p_list,
            electric_actions=[],
            run_period_list=[],
            runtime_list=[],
            electric_action_groups=[],
        )

    action_list = []
    action_time_list = []
    # act_info maps an event type to a list of [time, [[action_name, n], ...]]
    # entries, for example:
    #   {"ele_overload": [],
    #    "high_power_app": [["2020-12-22 00:01:46", [["<name>", 1]]], ...],
    #    "illegal_ele_app": [["2020-12-22 02:45:37", [["<name>", 1]]], ...]}
    event_name_map = {
        "ele_overload": "用电过载",
        "high_power_app": "大功率电器",
        "illegal_ele_app": "违规电器",
        "ele_car_battery": "违规电器",
        "power_quality_low": "用电质量低",
    }
    # Action names seen for each known event type.
    event_type_map = {
        event_key: set()
        for event_key in (
            "ele_overload",
            "power_quality_low",
            "high_power_app",
            "illegal_ele_app",
            "ele_car_battery",
        )
    }

    # Which result column is used depends on the extension modules
    # configured for the point's cid.
    async with MysqlUtil() as conn:
        point_sql = "select pid, cid from point where pid=%s"
        point_map = await conn.fetchone(point_sql, args=(pid,))
    if not point_map:
        raise BusinessException(message="没有该监测点的信息,请联系运维人员!")
    cid = point_map["cid"]

    # Fetch the user's product permissions.
    product_auth_map = await get_product_auth(user_id, product)
    cid_ext_module_map = product_auth_map["product"]
    if str(cid) not in cid_ext_module_map:
        log.error(f"用户user_id = {user_id} 工厂cid={cid}的权限")
        raise BusinessException(
            message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")

    if res["act_info"]:
        act_info = json.loads(res["act_info"])
    else:
        act_info = {}
    detail_module = ExtendModule.AlgorithmResultDetail.value
    if detail_module in cid_ext_module_map[str(cid)] and detail == 1:
        # The detailed recognition data is only shown to cids that have the
        # detail module enabled.
        if res["act_info2"]:
            act_info = json.loads(res["act_info2"])
        else:
            act_info = {}

    for event_type, event_list in act_info.items():
        for entry in event_list:
            names = [item[0] for item in entry[1]]
            if event_type in event_type_map:
                event_type_map[event_type].update(names)
            # One time stamp per action so both lists stay index-aligned.
            action_time_list.extend([entry[0]] * len(entry[1]))
            action_list.extend(names)

    dev_run_info = res["dev_run_info"]
    dev_run_tp = res["dev_run_tp"]
    result_dict = {
        "设备运行情况": json.loads(dev_run_info)
        if dev_run_info is not None else [],
        "运行时间段": json.loads(dev_run_tp)
        if dev_run_tp is not None else [],
        "行为列表": action_list,
        "行为时间": action_time_list,
    }
    ele_quan_dur = res["ele_quan_dur"]
    dev_wave = res["dev_wave"]
    state_dict = {
        "电量与时长": json.loads(ele_quan_dur)
        if ele_quan_dur is not None else {},
        "波动": json.loads(dev_wave) if dev_wave is not None else {},
    }
    log.info(f"算法结果 result_dict={result_dict} state_dict={state_dict}")

    electric_action_list = [
        ElectricActionItem(
            action_name=action_name,
            action_time=action_time.split(" ")[1][:5],
        )
        for action_name, action_time in zip(
            result_dict["行为列表"], result_dict["行为时间"]
        )
    ]

    # Collect, per "HH:MM" slot, the labels of all actions at that minute.
    electric_action_group_map = defaultdict(set)
    for action in electric_action_list:
        time_interval_str = time_interval_class(action)
        for _event_type, event_set in event_type_map.items():
            if action.action_name not in event_set:
                continue
            if action.action_name in ["大功率设备", "正常电器"]:
                label = action.action_name
            else:
                label = f"{event_name_map[_event_type]}{action.action_name}"
            electric_action_group_map[time_interval_str].add(label)

    electric_action_groups = [
        list(electric_action_group_map.get(slot, ()))
        for slot in time_slot
    ]

    run_period_list = [
        RunPeriodtem(
            running_devices=running_devices,
            time_period=[
                moment.split(" ")[1][:5]
                for moment in result_dict["运行时间段"][index]
            ],
        )
        for index, running_devices in enumerate(result_dict["设备运行情况"])
    ]
    return AlgorithmOutput(
        time_slot=time_slot,
        p_slot=p_list,
        electric_actions=electric_action_list,
        run_period_list=run_period_list,
        runtime_list=get_runtime_list(state_dict),
        electric_action_groups=electric_action_groups,
    )
async def main():
    """Manual check: run ``output_result`` for a fixed meter and print it."""
    outcome = await output_result("A2004000192", "A", "2020-12-02")
    result_dict, state_dict, _ = outcome
    print(result_dict)
    print(state_dict)
# Script entry point: drive the manual check on the default event loop.
if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(main())