import asyncio

from hbmqtt.mqtt.constants import QOS_1
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.common.components.responses import Success
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import EVENT_TYPE_SYNC_DEVICE
from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \
    post_update_alarm_emq_dao
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
    change_param_to_config


async def post_update_alarm_emq_service(set_id, threshold, duration, enable,
                                        cid, user_id, alarm_cid_dic):
    """Update an alarm setting by communicating directly with the device.

    The new configuration is first published to the device over MQTT; the
    ``soe_config_record`` row is only updated after the device answers with
    ``status_code == 200``.

    Args:
        set_id: Id of the alarm-setting row to update.
        threshold: New threshold value.
        duration: New duration; when falsy the ``duration`` column is left
            untouched.
        enable: New enable flag.
        cid: Factory (company) id of the caller.
        user_id: Id of the calling user.
        alarm_cid_dic: Blacklist mapping of ``cid`` to a collection of
            user ids that are not allowed to change settings.

    Returns:
        ``Success(success=1, ...)`` when the device accepted the change and
        the database was updated; ``Success(success=0, ...)`` when the
        caller is blacklisted, the device timed out, or the device replied
        with a non-200 status code.

    Raises:
        BusinessException: If no setting exists for ``set_id``, the event
            type is not flagged in ``EVENT_TYPE_SYNC_DEVICE``, or the
            request could not be converted into a device message.
    """
    # 1. Blacklist check, e.g. factory 99 with user_id in [88, 16].
    if cid in alarm_cid_dic and user_id in alarm_cid_dic.get(cid):
        return Success(success=0, message="用户工厂在黑名单")

    # 2. Look up the alarm setting by id.
    alarm_dic = await post_update_alarm_emq_dao(set_id)
    if not alarm_dic:
        raise BusinessException(message=f"正在设置的报警设置id={set_id}不存在")
    if not EVENT_TYPE_SYNC_DEVICE[alarm_dic["etype"]]:
        raise BusinessException(
            message=f"{alarm_dic} event_type表sync_device值不为1")

    point_id = alarm_dic["pid"]
    req_dic = {
        "point_id": point_id,
        "location_id": alarm_dic["lid"],
        "event_type": alarm_dic["etype"],
        "threshold": threshold,
        "duration": duration,
        "enable": enable,
    }
    # Only one of location_id / point_id is set; drop whichever is None
    # (along with any other None-valued parameter).
    req_json = {k: v for k, v in req_dic.items() if v is not None}

    # 3. Convert the request into the message format the device expects.
    pub_json = await change_param_to_config(req_json, method="config")
    if not pub_json:
        raise BusinessException(
            message=f"{point_id} 找不到meter信息")

    # 4. Send the configuration to the device through EMQ.
    try:
        async with MqttUtil() as emq:
            res_data = await emq.device_response(
                sid=pub_json.get("sid"),
                request_id=pub_json.get("request_id"),
                data=pub_json,
            )
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is a different class from
        # the builtin TimeoutError, so both are caught to make sure an
        # asyncio-based timeout is reported instead of propagating.
        return Success(message="time out 15s with device", success=0)

    log.info(f'res_data: {res_data}')
    if res_data["status_code"] != 200:
        return Success(message="操作失败status=error, status_code=500",
                       success=0)

    # 5. Persist the new values now that the device has accepted them.
    # NOTE(review): a duration of 0 is sent to the device above (only None
    # is filtered) but is not written to the database here - confirm this
    # is intended.
    if duration:
        sql = "UPDATE soe_config_record SET threshold=%s, duration=%s, " \
              "`enable`=%s WHERE id=%s"
        sql_args = (threshold, duration, enable, set_id)
    else:
        sql = "UPDATE soe_config_record SET threshold=%s,enable=%s WHERE id=%s"
        sql_args = (threshold, enable, set_id)
    async with MysqlUtil() as conn:
        modified_cnt = await conn.execute(sql, args=sql_args)
    log.info(f"UPDATE alarm_setting modified_cnt={modified_cnt}")
    return Success(success=1, message="操作成功")