# -*- coding:utf-8 -*- """ data:2021/7/21 09:50 装置替换:替换之前需要先做数据检测:旧装置是否可拆、新装置(如果曾经接入过系统,是否处于已拆除状态) 目前,设备管理界面不支持搜索已经拆掉、被替换掉的装置 需要如下操作: meter_param_record:继承被替换装置mid属性,新增一条记录mid为替换装置的mid change_meter_record:插入新记录,新记录mid为新装置mid change_sensor_record:插入新记录,sid、field字段为新装置属性 monitor:sid字段更新为新的sid; monitor_his_record:更新旧装置demolished, demolished_time字段;并再新增一条记录,mtid相同,新的sid,install_time 注意: 1. 本接口不允许直接替换已在系统的装置:仅允许替换已被拆掉装置 + 新装置 2. 如果是手动拆表、换表,请注意change_meter_record的start_time与install_ts时间需要严格一致 """ import time # from asyncio import get_event_loop from pot_libs.mysql_util.mysql_trans_util import MysqlUtil from pot_libs.logger import log async def _load_o_sid_infos(cid, sid): # to simply load related info,load info forward async with MysqlUtil() as mysql_u: sql = "select pid,m.mtid,sid,meter_no from monitor m " \ "left join point p on m.mtid = p.mtid " \ "where m.cid=%s AND m.sid=%s AND demolished=0" results = await mysql_u.fetchall(sql, (cid, sid)) or [] return {re["meter_no"]: re for re in results} async def _check_can_replace_in(new_sid): async with MysqlUtil() as mysql_u: # 1. check new sid is in sysyem sql = "SELECT count(*) count FROM power_iot.monitor " \ "WHERE sid=%s AND demolished=0" # 2. get demolished meter_no meter_sql = "select group_concat(meter_no) meter_infos " \ "from power_iot.monitor " \ "WHERE sid=%s AND demolished=1" result = await mysql_u.fetch_value(sql, (new_sid,)) meter_result = await mysql_u.fetch_value(meter_sql, (new_sid,)) return bool(result), meter_result or "" async def _load_new_param_sql(mid, new_mid, start_ts): key_str, value_str, params = "mid", "%s", [new_mid] sql = "SELECT * FROM power_iot.meter_param_record WHERE mid=%s;" async with MysqlUtil() as mysql_u: record = await mysql_u.fetchone(sql, (mid,)) for k, v in record.items(): if v is None or k == "mid": continue if k == "start_time": v = start_ts key_str += f", {k}" value_str += ", %s" params.append(v) sql = f"INSERT INTO power_iot.meter_param_record({key_str})" \ f" VALUES ({value_str});" # log.info(f"meter_param_record sql:{sql}") return sql, params async def dev_replace(cid, sid, new_sid): # check old sid in monitor if sid == new_sid: return False, "sid相同" o_meter_infos = await _load_o_sid_infos(cid, sid) if not o_meter_infos: return False, "旧装置被拆除,或配置有误,无法更换" # check new sid in meter、change_meter_record、change_sensor_record has_new, n_meter_infos = await _check_can_replace_in(new_sid) if has_new: return False, "新装置已存在,或配置有误,无法更换" log.info(f"begin to replace dev:cid:{cid} sid:{sid} new_sid:{new_sid}") async with MysqlUtil() as mysql_u: try: new_ts = int(time.time()) # demolished time,new record time for o_meter_no, v in o_meter_infos.items(): pid, mtid = v['pid'], v['mtid'] # get install time record_sql = "select max(install_ts) install_ts from " \ "power_iot.monitor_his_record " \ "WHERE mtid=%s AND sid=%s and meter_no=%s" start_ts = await mysql_u.fetch_value(record_sql, (mtid, sid, o_meter_no)) if not start_ts: sql = "SELECT create_time FROM point WHERE pid=%s;" start_ts = await mysql_u.fetch_value(sql, (pid,)) # 1. insert new meter_param_record if o_meter_no in ['A', 'B', 'C']: if o_meter_no in n_meter_infos: _meter_no = o_meter_no else: log.error(f"new sid:{new_sid} not support sdu") return False, "新装置不支持sdu" else: _meter_no = new_sid # 2. update monitor sql = "UPDATE power_iot.monitor SET sid=%s, meter_no=%s " \ "WHERE mtid=%s" await mysql_u.execute(sql, args=(new_sid, _meter_no, mtid)) # 3. update old monitor_his_record sql = "UPDATE power_iot.monitor_his_record " \ "SET demolished=1, demolished_ts=%s " \ "WHERE mtid=%s AND sid=%s AND install_ts=%s" await mysql_u.execute(sql, (new_ts, mtid, sid, start_ts)) # 4. insert new monitor_his_record sql = "INSERT INTO power_iot.monitor_his_record " \ "(mtid, sid, meter_no, install_ts) " \ "VALUES (%s, %s, %s, %s);" await mysql_u.execute(sql, args=(mtid, new_sid, _meter_no, new_ts)) # 5. update devops # sql = "update devops.db_map_meter set sid=%s " \ # "where db=%s and sid=%s;" # await mysql_u.execute(sql, args=(new_sid, 'power_iot', sid)) except Exception as e: log.error(f"device demolish fail e:{e}") await mysql_u.rollback() return False, "装置更换异常" else: await mysql_u.commit() log.info(f"finish replace dev:cid:{cid} o_sid:{sid} " f"new_sid:{new_sid} new_ts:{new_ts}") return True, "操作成功" # if __name__ == '__main__': # cid = 126 # sid = 'A2004000944' # new_sid = 'A2004000316' # get_event_loop().run_until_complete(dev_replace(cid, sid, new_sid))