Commit fc7587ad authored by lcn's avatar lcn

bug修复

parent ac110535
...@@ -86,5 +86,5 @@ async def load_pttl_max_15min(cid, start, end, point_id=None, inline_id=None): ...@@ -86,5 +86,5 @@ async def load_pttl_max_15min(cid, start, end, point_id=None, inline_id=None):
max_val_time = str(res["create_time"]) max_val_time = str(res["create_time"])
# 根据时间范围, 返回不同时间格式 # 根据时间范围, 返回不同时间格式
if max_val_time: if max_val_time:
max_val_time = max_val_time[5:] max_val_time = max_val_time[5:16]
return max_val, max_val_time return max_val, max_val_time
...@@ -31,21 +31,14 @@ async def dev_demolish(cid, sid): ...@@ -31,21 +31,14 @@ async def dev_demolish(cid, sid):
log.error(f"no monitor to demolish found, cid:{cid} sid:{sid}") log.error(f"no monitor to demolish found, cid:{cid} sid:{sid}")
return False, "该装置已经被拆除" return False, "该装置已经被拆除"
sql = "SELECT pid FROM power_iot.point WHERE mtid IN %s" sql = "SELECT pid,mtid FROM power_iot.point WHERE mtid IN %s"
pids = [re["pid"] for re in mtid_pid_list = await mysql_u.fetchall(sql, (tuple(mtids),)) or []
await mysql_u.fetchall(sql, (tuple(mtids),))] mtid_pid_map = {re["mtid"]: re["pid"] for re in mtid_pid_list}
if not pids: if not mtid_pid_map:
log.error(f"no pids found, mtids:{mtids}") log.error(f"no mtid_pid_map found, mtids:{mtids}")
return False, "该装置已经被拆除" return False, "该装置已经被拆除"
# start_time, now_ts
sql = "SELECT start_time FROM change_meter_record " \
"WHERE pid=%s ORDER BY start_time desc LIMIT 1;"
start_ts = await mysql_u.fetch_value(sql, (pids[0],))
now_ts = int(time.time()) now_ts = int(time.time())
log.info(f"begin to demolish monitor:{mtids}, cid:{cid} sid:{sid} "
f"start_time:{start_ts} now_ts:{now_ts}")
async with MysqlUtil() as mysql_u: async with MysqlUtil() as mysql_u:
try: try:
...@@ -54,20 +47,21 @@ async def dev_demolish(cid, sid): ...@@ -54,20 +47,21 @@ async def dev_demolish(cid, sid):
"WHERE cid=%s AND sid=%s;" "WHERE cid=%s AND sid=%s;"
await mysql_u.execute(sql, args=(cid, sid)) await mysql_u.execute(sql, args=(cid, sid))
# 2. update change_meter_record: add new line with mid is null # 2. update monitor_his_record
for pid in pids:
sql = "INSERT INTO power_iot.change_meter_record " \
"(pid, start_time) VALUES (%s, %s);"
await mysql_u.execute(sql, args=(pid, now_ts))
# 3. update change_sensor_record: add new line with sid, field is null
for lid in await _load_lids(mtids):
sql = "INSERT INTO power_iot.change_sensor_record " \
"(location_id, start_time) VALUES (%s, %s);"
await mysql_u.execute(sql, args=(lid, now_ts))
# 4. update monitor_his_record
for mtid in mtids: for mtid in mtids:
# get install time
record_sql = "select max(install_ts) install_ts from " \
"power_iot.monitor_his_record " \
"WHERE mtid=%s AND sid=%s"
start_ts = await mysql_u.fetch_value(record_sql,
(mtid, sid))
if not start_ts:
sql = "SELECT create_time FROM point WHERE pid=%s;"
start_ts = await mysql_u.fetch_value(sql, (
mtid_pid_map.get(mtid),))
log.info(
f"begin to demolish monitor:{mtid}, cid:{cid} sid:{sid} "
f"start_time:{start_ts} now_ts:{now_ts}")
sql = "UPDATE power_iot.monitor_his_record " \ sql = "UPDATE power_iot.monitor_his_record " \
"SET demolished=1, demolished_ts=%s " \ "SET demolished=1, demolished_ts=%s " \
"WHERE mtid=%s AND sid=%s AND install_ts=%s;" "WHERE mtid=%s AND sid=%s AND install_ts=%s;"
......
...@@ -25,99 +25,25 @@ from pot_libs.logger import log ...@@ -25,99 +25,25 @@ from pot_libs.logger import log
async def _load_o_sid_infos(cid, sid): async def _load_o_sid_infos(cid, sid):
# to simply load related info,load info forward # to simply load related info,load info forward
async with MysqlUtil() as mysql_u: async with MysqlUtil() as mysql_u:
sql = "SELECT mtid FROM power_iot.monitor " \ sql = "select pid,m.mtid,sid,meter_no from monitor m " \
"WHERE cid=%s AND sid=%s AND demolished=0" "left join point p on m.mtid = p.mtid " \
mtids = [re["mtid"] for re in await mysql_u.fetchall(sql, (cid, sid))] "where m.cid=%s AND m.sid=%s AND demolished=0"
if not mtids: results = await mysql_u.fetchall(sql, (cid, sid)) or []
log.error(f"no monitor to replace found, cid:{cid} sid:{sid}") return {re["meter_no"]: re for re in results}
return None
lid_fields = {}
sql = "SELECT id FROM power_iot.location WHERE mtid IN %s"
lids = [re["id"] for re in
await mysql_u.fetchall(sql, (tuple(mtids),))]
for lid in lids:
sql = "SELECT field FROM power_iot.change_sensor_record " \
"WHERE location_id=%s ORDER BY start_time DESC LIMIT 1;"
field = await mysql_u.fetch_value(sql, (lid,))
lid_fields[lid] = field
sql = "SELECT pid, mtid FROM power_iot.point WHERE mtid IN %s"
pid_mtids = await mysql_u.fetchall(sql, (tuple(mtids),))
if not pid_mtids:
log.error(f"no monitor in point found, cid:{cid} sid:{sid}")
return None
o_meter_infos = dict(meter_no={}, lid_fields=lid_fields)
for item in pid_mtids:
pid, mtid = item["pid"], item["mtid"]
sql = "SELECT mid FROM power_iot.change_meter_record " \
"WHERE pid=%s ORDER BY start_time DESC LIMIT 1;"
mid = await mysql_u.fetch_value(sql, (pid,))
if mid is None:
log.error(f"old sid:{sid} demolished")
return None
sql = "SELECT meter_no FROM power_iot.meter WHERE mid=%s;"
meter_no = await mysql_u.fetch_value(sql, (mid,))
o_meter_infos["meter_no"][meter_no] = dict(pid=pid, mid=mid,
mtid=mtid)
return o_meter_infos
async def _check_can_replace_in(new_sid): async def _check_can_replace_in(new_sid):
meter_info, lid_fields = None, {}
async with MysqlUtil() as mysql_u: async with MysqlUtil() as mysql_u:
# 1. check new sid is in sysyem # 1. check new sid is in sysyem
sql = "SELECT 1 FROM power_iot.monitor " \ sql = "SELECT count(*) count FROM power_iot.monitor " \
"WHERE sid=%s AND demolished=0" "WHERE sid=%s AND demolished=0"
if await mysql_u.fetchone(sql, (new_sid,)): # 2. get demolished meter_no
log.error(f"new_sid:{new_sid} in monitor, not surpport replace") meter_sql = "select group_concat(meter_no) meter_infos " \
return False, meter_info, lid_fields "from power_iot.monitor " \
"WHERE sid=%s AND demolished=1"
# 2. # check new sid in meter result = await mysql_u.fetch_value(sql, (new_sid,))
sql = "SELECT mid, sid, meter_no FROM power_iot.meter WHERE sid=%s;" meter_result = await mysql_u.fetch_value(meter_sql, (new_sid,))
_meter_info = await mysql_u.fetchall(sql, (new_sid,)) return bool(result), meter_result or ""
if not _meter_info:
log.error(f"new sid:{new_sid} not in meter table")
return False, meter_info, lid_fields
# 3. check new sid in change_meter_record
meter_info = {re['meter_no']: re['mid'] for re in _meter_info}
for mid in meter_info.values():
sql = "SELECT pid FROM change_meter_record " \
"WHERE mid=%s ORDER BY start_time DESC LIMIT 1;"
pid = await mysql_u.fetch_value(sql, (mid,))
if not pid:
# new sid
continue
sql = "SELECT mid FROM change_meter_record " \
"WHERE pid=%s ORDER BY start_time DESC LIMIT 1;"
if await mysql_u.fetch_value(sql, (pid,)) == mid:
# if _mid is None or _mid != mid: # demolished or replaced
log.error(f"{new_sid} in system, demolish first")
return False, meter_info, lid_fields
# 4. check new sid in change_sensor_record
now_ts = int(time.time())
sql = "SELECT * FROM change_sensor_record " \
"WHERE sid=%s ORDER BY start_time ASC"
for re in await mysql_u.fetchall(sql, (new_sid,)):
if re['start_time'] < now_ts:
lid_fields[re["field"]] = re["location_id"]
for lid in lid_fields.values():
sql = "SELECT sid FROM change_sensor_record " \
"WHERE location_id=%s ORDER BY start_time DESC LIMIT 1;"
_sid = await mysql_u.fetch_value(sql, (lid,))
if _sid is None or _sid != new_sid:
continue
log.error(f"{new_sid} in change_sensor_record, demolish first")
return False, meter_info, lid_fields
return True, meter_info, lid_fields
async def _load_new_param_sql(mid, new_mid, start_ts): async def _load_new_param_sql(mid, new_mid, start_ts):
...@@ -152,20 +78,25 @@ async def dev_replace(cid, sid, new_sid): ...@@ -152,20 +78,25 @@ async def dev_replace(cid, sid, new_sid):
return False, "旧装置被拆除,或配置有误,无法更换" return False, "旧装置被拆除,或配置有误,无法更换"
# check new sid in meter、change_meter_record、change_sensor_record # check new sid in meter、change_meter_record、change_sensor_record
can_repalce_in, n_meter_infos, _ = await _check_can_replace_in(new_sid) has_new, n_meter_infos = await _check_can_replace_in(new_sid)
if not can_repalce_in: if has_new:
return False, "新装置已存在,或配置有误,无法更换" return False, "新装置已存在,或配置有误,无法更换"
log.info(f"begin to replace dev:cid:{cid} sid:{sid} new_sid:{new_sid}") log.info(f"begin to replace dev:cid:{cid} sid:{sid} new_sid:{new_sid}")
async with MysqlUtil() as mysql_u: async with MysqlUtil() as mysql_u:
try: try:
new_ts = int(time.time()) # demolished time,new record time new_ts = int(time.time()) # demolished time,new record time
for o_meter_no, v in o_meter_infos["meter_no"].items(): for o_meter_no, v in o_meter_infos.items():
pid, mid, mtid = v['pid'], v['mid'], v['mtid'] pid, mtid = v['pid'], v['mtid']
# get install time # get install time
sql = "SELECT start_time FROM change_meter_record " \ record_sql = "select max(install_ts) install_ts from " \
"WHERE pid=%s ORDER BY start_time DESC LIMIT 1;" "power_iot.monitor_his_record " \
"WHERE mtid=%s AND sid=%s and meter_no=%s"
start_ts = await mysql_u.fetch_value(record_sql,
(mtid, sid, o_meter_no))
if not start_ts:
sql = "SELECT create_time FROM point WHERE pid=%s;"
start_ts = await mysql_u.fetch_value(sql, (pid,)) start_ts = await mysql_u.fetch_value(sql, (pid,))
# 1. insert new meter_param_record # 1. insert new meter_param_record
...@@ -177,40 +108,28 @@ async def dev_replace(cid, sid, new_sid): ...@@ -177,40 +108,28 @@ async def dev_replace(cid, sid, new_sid):
return False, "新装置不支持sdu" return False, "新装置不支持sdu"
else: else:
_meter_no = new_sid _meter_no = new_sid
new_mid = n_meter_infos[_meter_no]
sql, params = await _load_new_param_sql(mid, new_mid, new_ts)
await mysql_u.execute(sql, args=tuple(params))
# 2. update monitor # 2. update monitor
sql = "UPDATE power_iot.monitor SET sid=%s, meter_no=%s " \ sql = "UPDATE power_iot.monitor SET sid=%s, meter_no=%s " \
"WHERE mtid=%s" "WHERE mtid=%s"
await mysql_u.execute(sql, args=(new_sid, _meter_no, mtid)) await mysql_u.execute(sql, args=(new_sid, _meter_no, mtid))
# 3. update change_meter_record:add new record with new mid # 3. update old monitor_his_record
sql = "INSERT INTO power_iot.change_meter_record " \
"(pid, start_time, mid) VALUES (%s, %s, %s);"
await mysql_u.execute(sql, args=(pid, new_ts, new_mid))
# 4. update old monitor_his_record
sql = "UPDATE power_iot.monitor_his_record " \ sql = "UPDATE power_iot.monitor_his_record " \
"SET demolished=1, demolished_ts=%s " \ "SET demolished=1, demolished_ts=%s " \
"WHERE mtid=%s AND sid=%s AND install_ts=%s" "WHERE mtid=%s AND sid=%s AND install_ts=%s"
await mysql_u.execute(sql, (new_ts, mtid, sid, start_ts)) await mysql_u.execute(sql, (new_ts, mtid, sid, start_ts))
# 5. insert new monitor_his_record # 4. insert new monitor_his_record
sql = "INSERT INTO power_iot.monitor_his_record " \ sql = "INSERT INTO power_iot.monitor_his_record " \
"(mtid, sid, meter_no, install_ts) " \ "(mtid, sid, meter_no, install_ts) " \
"VALUES (%s, %s, %s, %s);" "VALUES (%s, %s, %s, %s);"
await mysql_u.execute(sql, args=(mtid, new_sid, _meter_no, await mysql_u.execute(sql, args=(mtid, new_sid, _meter_no,
new_ts)) new_ts))
# 5. update devops
# 2. update change_sensor_record:insert new record with new sid + field # sql = "update devops.db_map_meter set sid=%s " \
lid_fields = o_meter_infos["lid_fields"] # "where db=%s and sid=%s;"
for lid, field in lid_fields.items(): # await mysql_u.execute(sql, args=(new_sid, 'power_iot', sid))
sql = "INSERT INTO power_iot.change_sensor_record " \
"(location_id, start_time, sid, field) " \
"VALUES (%s, %s, %s, %s);"
await mysql_u.execute(sql, args=(lid, new_ts, new_sid, field))
except Exception as e: except Exception as e:
log.error(f"device demolish fail e:{e}") log.error(f"device demolish fail e:{e}")
await mysql_u.rollback() await mysql_u.rollback()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment