from uuid import uuid4

import pendulum

from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import EVENT_TYPE_UNIT_MAP


def _event_type_name(row):
    """Return the event-type name stored in a result row, or "" if absent.

    The point query selects et.`NAME` while the location query selects
    et.`name`, so both spellings of the key are accepted.
    NOTE(review): presumably the driver labels columns as written in the
    query -- confirm against MysqlUtil.fetchall.
    """
    if "name" in row:
        return row["name"]
    return row.get("NAME", "")


def _alarm_entry(row, name):
    """Build one alarm-setting dict from a soe_config_record result row."""
    event_type = row.get("etype", "")
    return {
        "id": row.get("id", ""),
        "type": name,
        "threshold": row.get("threshold", ""),
        "duration": row.get("duration", ""),
        "enable": row.get("enable", ""),
        "unit": EVENT_TYPE_UNIT_MAP[event_type],
    }


async def alarm_setting_pds(pid, locations):
    """Fetch the alarm settings of a point and/or a set of locations.

    Args:
        pid: Point id used to filter soe_config_record.pid; a falsy value
            skips the point query.
        locations: Iterable of location ids used in the ``lid IN`` filter;
            a falsy value skips the location query.

    Returns:
        A list of dicts with the keys ``id``, ``type``, ``threshold``,
        ``duration``, ``enable`` and ``unit``. Point-level entries come
        first, followed by location-level entries.

    Raises:
        KeyError: If a row's ``etype`` has no entry in EVENT_TYPE_UNIT_MAP.
    """
    sql_point = """
        SELECT sd.id, sd.etype, et.`NAME`, sd.threshold, sd.duration,
               sd.importance, sd.`enable`
        FROM soe_config_record sd
        INNER JOIN event_type et ON sd.etype = et.e_type
        WHERE sd.pid = %s
    """
    sql_location = """
        SELECT ln.item, ln.ad_type AS location_ytpe, sd.id, sd.etype,
               et.`name`, sd.importance, sd.threshold, sd.duration,
               sd.`enable`
        FROM soe_config_record sd
        INNER JOIN event_type et ON sd.etype = et.e_type
        INNER JOIN location ln ON ln.lid = sd.lid
        WHERE sd.lid IN %s
    """
    # Nothing to look up: avoid opening a database connection at all.
    if not pid and not locations:
        return []

    point_rows = []
    location_rows = []
    async with MysqlUtil() as conn:
        if pid:
            point_rows = await conn.fetchall(sql_point, args=(pid,))
        if locations:
            location_rows = await conn.fetchall(
                sql_location, args=(tuple(locations),)
            )

    alarm_list = [
        _alarm_entry(row, _event_type_name(row)) for row in point_rows
    ]

    for row in location_rows:
        base_name = _event_type_name(row) or ""
        item = row.get("item")
        # Location alarms are labelled "<item><event name>", except for the
        # "default" item (or a missing item), which keeps the bare event
        # name. The result is a plain string; an earlier trailing comma
        # turned it into a 1-tuple.
        if item and item != "default":
            name = f"{item}{base_name}"
        else:
            name = base_name
        # The event type is read from "etype" (the selected column) inside
        # _alarm_entry, so the unit lookup matches the point branch.
        alarm_list.append(_alarm_entry(row, name))

    return alarm_list


async def _send_device_request(sid, data):
    """Send ``data`` to device ``sid`` through MqttUtil.device_response.

    ``data`` must contain a ``request_id`` entry; the value returned by
    ``device_response`` is passed back unchanged.
    """
    async with MqttUtil() as emq:
        return await emq.device_response(
            sid=sid, request_id=data["request_id"], data=data
        )


async def device_get_v1(sid):
    """Query a device's configuration with the v1.0 ``get`` method.

    Args:
        sid: Device identifier.

    Returns:
        The response from ``MqttUtil.device_response``, or ``None`` when
        the request fails for any reason (the failure is logged).
    """
    data = {
        "sid": sid,
        "method": "get",
        "request_id": str(uuid4()),
        "time": pendulum.now().to_datetime_string(),
    }
    try:
        return await _send_device_request(sid, data)
    except Exception as e:
        # Deliberate best-effort: any failure is reported as "no data" so
        # the caller can decide what to do next.
        log.warning(e)
        log.warning(f"sid:{sid}在1.0协议未查询到数据")
        return None


async def device_get_config(sid, key):
    """Query a device's configuration with the v2.0 ``get-config`` method.

    Args:
        sid: Device identifier.
        key: Configuration key forwarded in the request payload.

    Returns:
        The response from ``MqttUtil.device_response``, or ``None`` when
        the request fails for any reason (the failure is logged).
    """
    data = {
        "sid": sid,
        "method": "get-config",
        "request_id": str(uuid4()),
        "key": key,
        "time": pendulum.now().to_datetime_string(),
    }
    try:
        return await _send_device_request(sid, data)
    except Exception as e:
        # Deliberate best-effort: any failure is reported as "no data".
        log.warning(e)
        log.warning(f"sid:{sid}在2.0协议未查询到数据")
        return None


async def device_demolish_pds(sid):
    """Issue the v2.0 device-demolish request (a ``restart`` command).

    Args:
        sid: Device identifier.

    Returns:
        ``True`` when the device answers with ``status_code`` 200 or when
        the request times out, ``False`` for any other status code.
    """
    data = {
        "sid": sid,
        "method": "command",
        "request_id": str(uuid4()),
        "time": pendulum.now().to_datetime_string(),
        "data": {
            "command": "restart",
            "params": {},
        },
    }
    try:
        result = await _send_device_request(sid, data)
    except TimeoutError:
        # v1.0 devices do not need a demolish command, so a timeout reply
        # is treated as success.
        # NOTE(review): before Python 3.11 asyncio.TimeoutError is not the
        # builtin TimeoutError -- confirm which one device_response raises.
        log.info(f"sid:{sid}拆除装置timeout")
        return True
    return result["status_code"] == 200