1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding:utf-8 -*-
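"""
date: 2021/7/21 09:50
Device replacement. Before replacing, the data must be validated: the old
device must be removable, and the new device (if it was ever connected to the
system) must be in the demolished state.
Currently the device-management UI cannot search for devices that have been
demolished or replaced.
The following operations are required:
meter_param_record: inherit the attributes of the replaced device's mid record
    and insert a new record whose mid is the replacement device's mid
change_meter_record: insert a new record whose mid is the new device's mid
change_sensor_record: insert a new record whose sid and field columns carry
    the new device's attributes
monitor: update the sid column to the new sid
monitor_his_record: update the old device's demolished and demolished_time
    fields, then insert one more record with the same mtid, the new sid and
    install_time
Notes:
1. This interface does not allow swapping in a device that is already active
   in the system: the replacement may only be a previously demolished device
   or a brand-new device.
2. If meters are removed or replaced manually, make sure that
   change_meter_record.start_time is strictly identical to install_ts.
"""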
import time
# from asyncio import get_event_loop
from pot_libs.mysql_util.mysql_trans_util import MysqlUtil
from pot_libs.logger import log
async def _load_o_sid_infos(cid, sid):
    """Load the non-demolished monitor rows of the old device.

    Runs one query joining ``monitor`` with ``point`` on ``mtid`` and
    filtering by ``cid``, ``sid`` and ``demolished=0``.

    Args:
        cid: value matched against ``monitor.cid``.
        sid: value matched against ``monitor.sid`` (the old device).

    Returns:
        dict mapping each row's ``meter_no`` to the row itself (each row
        carries ``pid``, ``mtid``, ``sid`` and ``meter_no``). Empty when the
        query yields nothing. Rows sharing a ``meter_no`` overwrite each
        other, so the last one wins.
    """
    query = (
        "select pid,m.mtid,sid,meter_no from monitor m "
        "left join point p on m.mtid = p.mtid "
        "where m.cid=%s AND m.sid=%s AND demolished=0"
    )
    async with MysqlUtil() as mysql_u:
        rows = await mysql_u.fetchall(query, (cid, sid))
        infos_by_meter_no = {}
        # A falsy result (None / empty) is treated as "no rows".
        for row in rows or []:
            infos_by_meter_no[row["meter_no"]] = row
        return infos_by_meter_no
async def _check_can_replace_in(new_sid):
    """Inspect the state of the replacement device in ``power_iot.monitor``.

    Args:
        new_sid: sid of the device that is about to be swapped in.

    Returns:
        A ``(in_system, meter_infos)`` tuple:

        * ``in_system`` - truthiness of the count of rows with this sid and
          ``demolished=0``.
        * ``meter_infos`` - the ``group_concat(meter_no)`` value over the
          rows with this sid and ``demolished=1``, or ``""`` when that value
          is falsy.
    """
    # 1. Is the new sid still active in the system?
    active_count_sql = (
        "SELECT count(*) count FROM power_iot.monitor "
        "WHERE sid=%s AND demolished=0"
    )
    # 2. Which meter_no values did it have when it was demolished?
    demolished_meter_sql = (
        "select group_concat(meter_no) meter_infos "
        "from power_iot.monitor "
        "WHERE sid=%s AND demolished=1"
    )
    query_args = (new_sid,)
    async with MysqlUtil() as mysql_u:
        active_count = await mysql_u.fetch_value(active_count_sql, query_args)
        demolished_meters = await mysql_u.fetch_value(demolished_meter_sql,
                                                      query_args)
    in_system = bool(active_count)
    if not demolished_meters:
        demolished_meters = ""
    return in_system, demolished_meters
async def _load_new_param_sql(mid, new_mid, start_ts):
    """Build an INSERT that clones a ``meter_param_record`` row for a new mid.

    Reads one row of ``power_iot.meter_param_record`` for ``mid`` and builds
    an INSERT statement that copies every non-NULL column, with two
    substitutions: ``mid`` becomes ``new_mid`` and a non-NULL ``start_time``
    becomes ``start_ts``. The statement is only built, never executed.

    Args:
        mid: mid whose parameter record is used as the template.
        new_mid: mid written into the generated INSERT.
        start_ts: value used in place of the template's ``start_time``.

    Returns:
        ``(sql, params)`` - the INSERT text with ``%s`` placeholders and the
        matching parameter list (``new_mid`` first).

    NOTE(review): ``record.items()`` is called unconditionally; if
    ``fetchone`` yields ``None`` for a missing row this raises
    ``AttributeError`` - confirm callers guarantee the row exists.
    """
    select_sql = "SELECT * FROM power_iot.meter_param_record WHERE mid=%s;"
    columns = ["mid"]
    params = [new_mid]
    async with MysqlUtil() as mysql_u:
        record = await mysql_u.fetchone(select_sql, (mid,))
        for column, value in record.items():
            # mid is already supplied; NULL columns are left to DB defaults.
            if column == "mid" or value is None:
                continue
            columns.append(column)
            params.append(start_ts if column == "start_time" else value)
    key_str = ", ".join(columns)
    value_str = ", ".join(["%s"] * len(columns))
    insert_sql = (f"INSERT INTO power_iot.meter_param_record({key_str})"
                  f" VALUES ({value_str});")
    return insert_sql, params
async def dev_replace(cid, sid, new_sid):
    """Replace device ``sid`` with device ``new_sid`` for company ``cid``.

    Steps:

    1. Reject the request when both sids are equal, when the old device has
       no active monitor rows, or when the new device is still active.
    2. Decide the ``meter_no`` each monitor row gets after the swap. Rows
       whose ``meter_no`` is ``A``/``B``/``C`` keep it, but only if the new
       device has a demolished row with exactly that ``meter_no``; all other
       rows get ``new_sid`` as ``meter_no``. This validation runs before any
       write, so a rejected request never leaves a half-applied transaction.
    3. In one transaction, per monitor row: point ``monitor`` at the new
       device, mark the current ``monitor_his_record`` row as demolished and
       insert a new ``monitor_his_record`` row for the new device.

    Only ``monitor`` and ``monitor_his_record`` are written here;
    ``meter_param_record``, ``change_meter_record`` and
    ``change_sensor_record`` are not touched by this function.

    Args:
        cid: company id the old device belongs to.
        sid: sid of the device being replaced.
        new_sid: sid of the replacement device.

    Returns:
        ``(ok, message)`` - ``ok`` is ``True`` on success, otherwise
        ``False`` with a message describing the reason. Database errors
        during the write phase are logged, rolled back and reported as
        ``(False, "装置更换异常")`` instead of being raised.
    """
    if sid == new_sid:
        return False, "sid相同"

    # The old device must still have active (demolished=0) monitor rows.
    o_meter_infos = await _load_o_sid_infos(cid, sid)
    if not o_meter_infos:
        return False, "旧装置被拆除,或配置有误,无法更换"

    # The new device must not be active anywhere in the system.
    has_new, n_meter_infos = await _check_can_replace_in(new_sid)
    if has_new:
        return False, "新装置已存在,或配置有误,无法更换"

    # n_meter_infos is a comma-joined GROUP_CONCAT string. Split it so the
    # check below is an exact match: a substring test would accept 'A' for a
    # device whose only meter_no is e.g. 'A2004000316'.
    if isinstance(n_meter_infos, bytes):
        # Defensive: presumably only hit if the driver hands back bytes.
        n_meter_infos = n_meter_infos.decode()
    supported_meter_nos = {m for m in (n_meter_infos or "").split(",") if m}

    # Resolve (and validate) every new meter_no before touching the database.
    new_meter_nos = {}
    for o_meter_no in o_meter_infos:
        if o_meter_no in ['A', 'B', 'C']:
            if o_meter_no not in supported_meter_nos:
                log.error(f"new sid:{new_sid} not support sdu")
                return False, "新装置不支持sdu"
            new_meter_nos[o_meter_no] = o_meter_no
        else:
            new_meter_nos[o_meter_no] = new_sid

    log.info(f"begin to replace dev:cid:{cid} sid:{sid} new_sid:{new_sid}")
    async with MysqlUtil() as mysql_u:
        try:
            # Demolish time of the old record and install time of the new one.
            new_ts = int(time.time())
            for o_meter_no, v in o_meter_infos.items():
                pid, mtid = v['pid'], v['mtid']
                _meter_no = new_meter_nos[o_meter_no]

                # Install time of the record currently in use; fall back to
                # the point's create_time when no history row exists.
                record_sql = "select max(install_ts) install_ts from " \
                             "power_iot.monitor_his_record " \
                             "WHERE mtid=%s AND sid=%s and meter_no=%s"
                start_ts = await mysql_u.fetch_value(record_sql,
                                                     (mtid, sid, o_meter_no))
                if not start_ts:
                    sql = "SELECT create_time FROM point WHERE pid=%s;"
                    start_ts = await mysql_u.fetch_value(sql, (pid,))

                # 1. point the monitor row at the new device
                sql = "UPDATE power_iot.monitor SET sid=%s, meter_no=%s " \
                      "WHERE mtid=%s"
                await mysql_u.execute(sql, args=(new_sid, _meter_no, mtid))

                # 2. close the old monitor_his_record row
                sql = "UPDATE power_iot.monitor_his_record " \
                      "SET demolished=1, demolished_ts=%s " \
                      "WHERE mtid=%s AND sid=%s AND install_ts=%s"
                await mysql_u.execute(sql, (new_ts, mtid, sid, start_ts))

                # 3. open a new monitor_his_record row for the new device
                sql = "INSERT INTO power_iot.monitor_his_record " \
                      "(mtid, sid, meter_no, install_ts) " \
                      "VALUES (%s, %s, %s, %s);"
                await mysql_u.execute(sql, args=(mtid, new_sid, _meter_no,
                                                 new_ts))

                # The devops.db_map_meter sid update is currently disabled:
                # sql = "update devops.db_map_meter set sid=%s " \
                #       "where db=%s and sid=%s;"
                # await mysql_u.execute(sql, args=(new_sid, 'power_iot', sid))
        except Exception as e:
            log.error(f"device demolish fail e:{e}")
            await mysql_u.rollback()
            return False, "装置更换异常"
        else:
            await mysql_u.commit()
            log.info(f"finish replace dev:cid:{cid} o_sid:{sid} "
                     f"new_sid:{new_sid} new_ts:{new_ts}")
    return True, "操作成功"
# if __name__ == '__main__':
# cid = 126
# sid = 'A2004000944'
# new_sid = 'A2004000316'
# get_event_loop().run_until_complete(dev_replace(cid, sid, new_sid))