import asyncio
import json

from hbmqtt.mqtt.constants import QOS_1

from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import ADIO_KEY
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
    switch_control_param_to_config
from unify_api.modules.linkage_control.components.linkage_control_cps import \
    SoesResp
from unify_api.modules.linkage_control.dao.linkage_control_dao import \
    change_sensor_record_by_location_id
from unify_api.utils.response_code import RET
from unify_api.utils.time_format import srv_time


async def switch_operation_emq_service(location_id, switch, switch_end,
                                       user_id):
    """Operate a linkage-control switch (open / close) through EMQ.

    Steps performed, in order:
      1. Build the control message for the device bound to ``location_id``.
      2. Send it over MQTT and wait for the device response.
      3. Map a timeout or a non-200 device status to a failure response.
      4. Store the resulting switch state in the ``ADIO_KEY`` redis hash.
      5. Insert a row into ``linkage_control_operation_record``.

    Args:
        location_id: Location whose sensor record selects the device
            (``sid``) and ``field`` to control.
        switch: Requested action. The value ``"01"`` is treated as the
            "open -> closed" operation when classifying failures.
        switch_end: Value written to redis as the new switch state.
        user_id: Operator recorded in the operation record table.

    Returns:
        SoesResp: ``RET.ok`` on success; ``RET.close_fail`` when a close
        request (``switch == "01"``) times out or the device answers with a
        non-200 status; ``RET.emq_time_out`` for any other timeout;
        ``RET.hardware_time_out`` for any other non-200 device status.
    """
    # 1. Build the message used to talk to the device.
    change_sensor_dic = await change_sensor_record_by_location_id(location_id)
    sid = change_sensor_dic["sid"]
    field = change_sensor_dic["field"]
    pub_json = await switch_control_param_to_config(sid=sid, field=field,
                                                    switch=switch)

    # 2. Call EMQ and wait for the device response.
    try:
        async with MqttUtil() as emq:
            res_data = await emq.device_response(
                sid=pub_json.get("sid"),
                request_id=pub_json.get("request_id"),
                data=pub_json,
            )
    # 3.1 Hardware timeout.
    # A timeout while going from open to closed is treated as a failed
    # trip-to-close and returns 50003; any other timeout returns 50004.
    # asyncio.TimeoutError is caught explicitly as well: before Python 3.11
    # it is a distinct class from the builtin TimeoutError, so an
    # asyncio-level timeout would otherwise escape this handler.
    # NOTE(review): assumes device_response may raise either flavour -
    # confirm against MqttUtil.
    except (TimeoutError, asyncio.TimeoutError):
        if switch == "01":
            log.warning("linkage control status Hardware_timeout 50003")
            return SoesResp(code=RET.close_fail,
                            message="close fail for time out")
        log.warning("linkage control status 50004")
        return SoesResp(code=RET.emq_time_out, message="emq time out")

    # 3.2 Hardware answered, but with a non-200 status.
    if res_data["status_code"] != 200:
        if switch == "01":
            log.warning("linkage control status Hardware_500 50003")
            return SoesResp(code=RET.close_fail,
                            message="close fail for hardware 500")
        log.warning("linkage control status 50005")
        return SoesResp(code=RET.hardware_time_out, message="hardware 500")

    # 4. Update redis with the new switch state and the server timestamp.
    now_date, timestamp = srv_time()
    redis_payload = json.dumps({
        "value": switch_end,
        "timestamp": int(timestamp),
    })
    # The hset return value is intentionally ignored (original note: it
    # returns 0).
    await RedisUtils().hset(ADIO_KEY, str(location_id), redis_payload)

    # 5. Record the operation in the linkage-control operation table.
    insert_sql = "INSERT INTO linkage_control_operation_record " \
                 "(`time`, location_id, `type`, action, user_id) VALUES %s"
    async with MysqlUtil() as conn:
        # The execute return value is intentionally ignored (original note:
        # it returns 1 on success).
        await conn.execute(insert_sql, args=(
            (now_date, location_id, 'remote', switch, user_id),))
    return SoesResp(code=RET.ok, message="success")