1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
from pot_libs.utils.exc_util import ParamException
from unify_api.constants import Importance
from unify_api.modules.alarm_manager.components.alarm_static_cps import \
SduAlarmResp, RiskCount, ContentName, SassResp, AppResp, SebResp, SiarResp, \
ZsResp, TimeCount, ZasResp
from unify_api.modules.alarm_manager.dao.list_static_dao import \
zdu_alarm_aggs_date_impotent, sdu_alarm_type_dao, \
sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
sdu_alarm_content_info, risk_distribution, \
alarm_content_time_distribution_pds
from unify_api.modules.common.dao.common_dao import points_by_cid, \
monitor_point_join, points_monitor_by_cid
from unify_api.modules.common.procedures.common_cps import \
alarm_time_distribution
from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info
from unify_api.utils.common_utils import round_1, division_two
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
zdu_alarm_sort_dao, zdu_summary_dao
async def sdu_alarm_statistics_service(cids, start, end, product):
"""目前用于 识电u->报警统计"""
# 获取point信息
points = await points_monitor_by_cid(cids)
if not points:
raise ParamException(message=f"{cids}没有points")
point_id_list = [i["pid"] for i in points]
# 1.调用函数获取报警统计信息
alarm_info_map = await sdu_alarm_content_info(cids, start, end, points)
ele_overload, illegal_ele_app, power_quality, illegal_app_dic, \
total_alarm_cnt, alarm_points_cnt = (
alarm_info_map["ele_overload"],
alarm_info_map["illegal_ele_app"],
alarm_info_map["power_quality"],
alarm_info_map["illegal_app_dic"],
alarm_info_map["total_alarm_cnt"],
alarm_info_map["alarm_points_cnt"],
)
# point_dic排序
illegal_app_list = []
for key, val in illegal_app_dic.items():
illegal_app_list.append({"name": key, "value": val})
illegal_app_list.sort(key=lambda x: x["value"], reverse=True)
# 报警类型总数,用于求比例
cn = ContentName(ele_overload=sum(ele_overload["value"]),
illegal_ele_app=sum(illegal_ele_app["value"]),
power_quality=sum(power_quality["value"]),
)
# 2.计算风险分布, 与安电u不同, 安电u发生过I级则为风险用户
# 其中当前时段发生过I或II级报警为风险用户,其余为安全用户
security_user, risk_user = await risk_distribution(start, end,
point_id_list,
is_new=True)
rc = RiskCount(security_user=security_user, risk_user=risk_user)
if alarm_points_cnt == 0:
aver_alarm = 0
else:
aver_alarm = round(total_alarm_cnt / alarm_points_cnt, 1)
return SduAlarmResp(
total_alarm_cnt=total_alarm_cnt,
alarm_points_cnt=alarm_points_cnt,
aver_alarm=aver_alarm,
ele_overload=ele_overload,
illegal_ele_app=illegal_ele_app,
power_quality=power_quality,
illegal_app_list=illegal_app_list,
content_distribution=cn,
risk_distribution=rc
)
async def sdu_alarm_statistics_sort_service(cid, start, end, page_size,
page_num, sort):
# 获取point信息
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
# point_id_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
buckets = await sdu_alarm_statistics_dao(cid, start, end)
if not buckets:
return SassResp(alarm_ranking_total=0, alarm_ranking=[])
points_dic = {}
for bucket in buckets:
point_t = bucket["pid"]
if point_t:
point_name = points_map[point_t]
if point_t not in points_dic.keys():
points_dic[point_name] = bucket["doc_count"]
else:
points_dic[point_name] += bucket["doc_count"]
# point报警统计
points_alarm_list = []
for key, val in points_dic.items():
points_alarm_list.append({"name": key, "value": val})
reverse = True if sort == "desc" else False
points_alarm_list.sort(key=lambda x: x["value"], reverse=reverse)
points_alarm_list_size = points_alarm_list[
(page_num - 1) * page_size: page_num * page_size]
return SassResp(
alarm_ranking_total=len(points_alarm_list),
alarm_ranking=points_alarm_list_size
)
async def sdu_app_statistics_sort_service(cid, start, end):
"""报警统计-报警记录-排名-识电u"""
# 1.调用函数获取报警统计信息
buckets = await sdu_alarm_statistics_dao(cid, start, end)
if not buckets:
return AppResp(ele_app_ranking=[])
# 违规电器统计
illegal_app_dic = {}
for bucket in buckets:
# 1.1 电器识别
application = bucket.get("appliance")
if application:
if application not in illegal_app_dic.keys():
illegal_app_dic[application] = bucket["doc_count"]
else:
illegal_app_dic[application] += bucket["doc_count"]
# 排序
ele_app_list = []
for key, val in illegal_app_dic.items():
# 目前版本只展示违规
ele_app_list.append({"name": key, "value": val, "type": "违规"})
ele_app_list.sort(key=lambda x: x["value"], reverse=True)
return AppResp(
ele_app_ranking=ele_app_list,
)
async def sdu_electric_behave_service(cid, start, end, storeys, product):
"""近30天用电行为"""
# 1.根据storeys获取points信息
point_list = await points_by_storeys(cid, storeys)
# 获取point_id列表
points = [i.get("point_id") for i in point_list]
# 2. es查询违规/大功率/正常次数
es_type_res = await sdu_alarm_type_dao(start, end, points)
es_type_res = {i["pid"]: i for i in es_type_res if es_type_res}
# 2.2 es查询报警等级, 计算报警分, 需要限制报警类型为sdu新版
es_imp_res = await sdu_alarm_importance_dao(start, end, points,
is_sdu=True)
es_imp_res = {i["pid"]: i for i in es_imp_res if es_imp_res}
# 3. 构造返回
return_data = {}
for info in point_list:
storey_name = info.get("storey_name")
storey_id = info.get("storey_id")
point_id = info.get("point_id")
room_name = info.get("room_name")
# 没有报警数据的初始值
illegal_ele_app = 0
high_power_app = 0
normal_app = 0
# 3.1计算类型报警次数
type_res = es_type_res.get(point_id, {})
if type_res:
# 3.1违规次数
if type_res.get("event_type") == "illegal_ele_app":
illegal_ele_app = type_res.get("doc_count")
# 3.2大功率次数
if type_res.get("event_type") == "high_power_app":
high_power_app = type_res.get("doc_count")
# 3.1正常次数
if type_res.get("event_type") == "normal_app":
normal_app = type_res.get("doc_count")
# 3.2计算报警分
first_alarm_cnt = 0
second_alarm_cnt = 0
third_alarm_cnt = 0
imp_res = es_imp_res.get(point_id, {})
if imp_res:
if imp_res["importance"] == Importance.First.value:
first_alarm_cnt += imp_res["doc_count"]
elif imp_res["importance"] == Importance.Second.value:
second_alarm_cnt += imp_res["doc_count"]
elif imp_res["importance"] == Importance.Third.value:
third_alarm_cnt += imp_res["doc_count"]
alarm_score = first_alarm_cnt * 2 + second_alarm_cnt * 1 + \
third_alarm_cnt * 0.5
if alarm_score >= 15:
alarm_score = 15
# 初始化返回dic
res_dic = {
"room_name": room_name,
"storey_id": storey_id,
"point_id": point_id,
"illegal_ele_app": illegal_ele_app, # 违规
"high_power_app": high_power_app, # 大功率
"normal_app": normal_app, # 正常
"alarm_score": alarm_score # 报警分
}
# 3.3 组装返回格式为dic
if storey_name in return_data:
return_data[storey_name].append(res_dic)
else:
return_data[storey_name] = [res_dic]
# 转换成list格式, 可以按照storey_name排序
if return_data:
# 房间排序, 并返回数据转化为list
return_list = [{"name": key, "storey_id": value[0]["storey_id"],
"room_data": sorted(value,
key=lambda i: i["room_name"])}
for key, value in return_data.items()]
# 楼层排序
return_list = sorted(return_list, key=lambda x: x["storey_id"])
else:
return_list = []
return SebResp(return_data=return_list)
async def sdu_index_alarm_rank(cid, start, end, product):
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
point_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
# 1. 违规电器排名
behavior_res = await sdu_alarm_behavior_dao(start, end, point_list)
behavior_illegal_app = []
if behavior_res:
for i in behavior_res:
tmp_dic = {"name": i["appliance"], "value": i["doc_count"]}
behavior_illegal_app.append(tmp_dic)
behavior_illegal_app = sorted(behavior_illegal_app,
key=lambda x: x["value"], reverse=True)
# 2. 报警排名, 违规行为
es_type_res = await sdu_alarm_limit_type_dao(start, end, point_list)
alarm_ranking = []
illegal_behavior = []
mid_goods = {}
for rs in es_type_res:
event_type = rs.get("event_type")
point_name = points_map.get(rs.get("pid"))
im1, im2, im3 = 0, 0, 0
im3 = rs["doc_count"] if event_type == "power_quality_low" else im3
im2 = rs["doc_count"] if event_type == "ele_overload" else im2
if event_type == "illegal_ele_app":
im1 = rs["doc_count"]
illegal_dic = {"name": point_name, "value": rs["doc_count"]}
illegal_behavior.append(illegal_dic)
if rs["pid"] not in mid_goods:
mid_goods[rs["pid"]] = {"im1": im1, "im2": im2, "im3": im3}
else:
mid_goods[rs["pid"]] = {
"im1": im1 + mid_goods[rs["pid"]]["im1"],
"im2": im2 + mid_goods[rs["pid"]]["im2"],
"im3": im3 + mid_goods[rs["pid"]]["im3"]
}
for k, v in mid_goods.items():
point_name = points_map.get(k)
im1 = v.get("im1") or 0
im2 = v.get("im2") or 0
im3 = v.get("im3") or 0
alarm_dic = {
"name": point_name, "value": im1 + im2 + im3,
"im1": im1, "im2": im2, "im3": im3
}
alarm_ranking.append(alarm_dic)
# 3. 排序
if len(alarm_ranking) > 1:
alarm_ranking = sorted(alarm_ranking, key=lambda x: x["value"],
reverse=True)
if len(illegal_behavior) > 1:
illegal_behavior = sorted(illegal_behavior,
key=lambda x: x["value"], reverse=True)
return SiarResp(
illegal_app=behavior_illegal_app[:5],
illegal_behavior=illegal_behavior[:5],
alarm_ranking=alarm_ranking[:5]
)
async def zdu_level_distribution_service(cid, start, end, product):
"""报警统计-报警等级-智电u"""
alarm_info_map = await alarm_count_info([cid], start, end, "month")
first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"],
alarm_info_map["third_alarm"],
)
return SecurityCountResp(
first_alarm=first_alarm,
second_alarm=second_alarm,
third_alarm=third_alarm,
level_detail=LevelCount(
first_alarm_cnt=sum(first_alarm["value"]),
second_alarm_cnt=sum(second_alarm["value"]),
third_alarm_cnt=sum(third_alarm["value"]),
),
)
async def zdu_content_distribution_service(cid, start, end, product):
"""报警统计-报警内容-智电u"""
alarm_info_map = await alarm_content_time_distribution_pds(cid, start, end)
temperature, residual_current, electric_param, electric_param_detail = (
alarm_info_map["temperature"],
alarm_info_map["residual_current"],
alarm_info_map["electric_param"],
alarm_info_map["electric_param_detail"],
)
return AlarmContentDistributionResp(
temperature=temperature,
residual_current=residual_current,
electric_param=electric_param,
content_detail=ContentCount(
temperature_cnt=sum(temperature["value"]),
residual_current_cnt=sum(residual_current["value"]),
electric_param_cnt=sum(electric_param["value"]),
),
)
async def zdu_summary_service(cid, start, end, product):
"""报警统计-统计概况信息-智电u"""
# 1. 报警总数, 报警监测点数, 平均报警 = 报警总数除以报警监测点
total_alarm_cnt, alarm_points = await zdu_summary_dao(cid, start, end)
# 2. 安全运行, 本月累计安全运行天数,从月初算起, 未出现一级报警则加一天
safe_run = await zdu_alarm_aggs_date_impotent(cid, start, end)
# 3. 时段分布, 白天/黑夜/凌晨
time_distribution_map = await alarm_time_distribution([cid], start, end)
return ZsResp(
total_alarm_cnt=total_alarm_cnt,
alarm_points_cnt=alarm_points,
aver_alarm=round_1(division_two(total_alarm_cnt, alarm_points)),
safe_run=safe_run,
time_interval_distribution=TimeCount(
daytime_cnt=time_distribution_map["day_alarm_cnt"],
night_cnt=time_distribution_map["night_alarm_cnt"],
morning_cnt=time_distribution_map["morning_alarm_cnt"],
)
)
async def zdu_alarm_sort_service_2(cid, start, end, page_size, page_num):
"""报警统计-报警排名-智电u"""
points_alarm_list = await zdu_alarm_sort_dao(cid, start, end, page_size,
page_num)
return ZasResp(
total=len(points_alarm_list) if points_alarm_list else 0,
alarm_ranking=points_alarm_list
)