1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import pendulum
from unify_api.modules.common.dao.common_dao import (
get_point_monitor_dao, get_location_monitor_dao
)
from uuid import uuid4
async def get_point_soe_config(event_data, with_params=False):
point_id = event_data.get('point_id')
event_settings = {k: v for k, v in event_data.items() if k in [
'threshold', 'duration', 'enable'
]}
datas = await get_point_monitor_dao(point_id)
sid = datas.get("sid")
meter_sn = datas.get("meter_no")
config = {
'soe': {
"electric": {meter_sn: {event_data['event_type']: event_settings}}
}
}
if with_params:
keys_li = ["ctr", "ptr", "ctnum", "vc", "tc", "imax"]
params = {k: v for k, v in datas.items() if k in keys_li
and (v is not None)}
params['point_id'] = point_id
config['params'] = {'electric': {meter_sn: params}}
return sid, config
async def get_location_soe_config(event, with_params):
location_id = event.get('location_id')
event_settings = {k: v for k, v in event.items() if k in [
'threshold', 'duration', 'enable'
]}
datas = await get_location_monitor_dao(location_id)
sid = datas.get("sid")
field = datas.get("ad_field")
if field == "residual_current":
config = {'soe': {"adio": {event['event_type']: event_settings}}}
else:
config = {
'soe': {"adio": {field: {event['event_type']: event_settings}}}}
if with_params:
config['params'] = {'adio': {field: {'location_id': location_id}}}
return sid, config
async def change_param_to_config(req_json, method):
"""
web参数, 转换为与装置通信的报文
soe method: config
"""
sid, config = None, None
point_id = req_json.get("point_id")
if point_id is not None:
sid, config = await get_point_soe_config(req_json, with_params=True)
location_id = req_json.get('location_id')
if location_id is not None:
sid, config = await get_location_soe_config(req_json, with_params=True)
request_data = {}
if sid:
request_id = str(uuid4())
request_data = dict(
request_id=request_id,
time=pendulum.now().to_datetime_string(),
method=method,
sid=sid, data=config)
return request_data
async def switch_control_param_to_config(sid, field, switch,
command="switch_control",
method="command"):
"""
联动控制, 转换为与装置通信的报文
soe method: command
"""
request_id = str(uuid4())
request_data = dict(
request_id=request_id,
time=pendulum.now().to_datetime_string(),
method=method,
sid=sid,
data={'command': command,
'params': {'field': field, 'value': switch}}
)
return request_data