1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from hbmqtt.mqtt.constants import QOS_1
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.common.components.responses import Success
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import EVENT_TYPE_SYNC_DEVICE
from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \
post_update_alarm_emq_dao
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
change_param_to_config
async def post_update_alarm_emq_service(set_id, threshold, duration, enable,
                                        cid, user_id, alarm_cid_dic):
    """Update an alarm setting by talking to the device, then persist it.

    Flow:
        1. Reject the request when ``user_id`` is blacklisted for ``cid``.
        2. Load the alarm setting row for ``set_id`` and check that its
           event type is flagged in ``EVENT_TYPE_SYNC_DEVICE``.
        3. Convert the request into the device config message.
        4. Send the message through ``MqttUtil`` and wait for the reply.
        5. When the device answers with status code 200, update
           ``soe_config_record``.

    Args:
        set_id: Id of the alarm setting; used for the DAO lookup and as the
            ``WHERE id=%s`` value of the final UPDATE.
        threshold: New threshold, sent to the device and written to the table.
        duration: New duration. Sent to the device whenever it is not None,
            but written to the table only when it is truthy.
        enable: New enable flag, sent to the device and written to the table.
        cid: Key looked up in ``alarm_cid_dic`` for the blacklist check.
        user_id: User checked against the blacklist entry of ``cid``.
        alarm_cid_dic: Mapping of cid to a collection of blacklisted user ids.

    Returns:
        ``Success(success=1, ...)`` once the table has been updated, or
        ``Success(success=0, ...)`` with an explanatory message when the user
        is blacklisted, the device call times out, or the device reply has a
        status code other than 200.

    Raises:
        BusinessException: When no setting exists for ``set_id``, when the
            event type is not flagged in ``EVENT_TYPE_SYNC_DEVICE``, or when
            ``change_param_to_config`` returns nothing.
    """
    # Imported locally so the timeout handler below can name
    # asyncio.TimeoutError without relying on a module-level import.
    import asyncio

    # 1. Blacklist check.
    # Example from the original note: factory 99 with user_id in [88, 16].
    if cid in alarm_cid_dic and user_id in alarm_cid_dic.get(cid):
        return Success(success=0, message="用户工厂在黑名单")

    # 2. Look up the alarm setting by id.
    alarm_dic = await post_update_alarm_emq_dao(set_id)
    if not alarm_dic:
        raise BusinessException(message=f"正在设置的报警设置id={set_id}不存在")
    # NOTE(review): an etype missing from EVENT_TYPE_SYNC_DEVICE raises
    # KeyError here rather than BusinessException - confirm that is intended.
    if not EVENT_TYPE_SYNC_DEVICE[alarm_dic["etype"]]:
        raise BusinessException(
            message=f"{alarm_dic} event_type表sync_device值不为1")

    point_id = alarm_dic["pid"]
    req_dic = {
        "point_id": point_id,
        "location_id": alarm_dic["lid"],
        "event_type": alarm_dic["etype"],
        "threshold": threshold,
        "duration": duration,
        "enable": enable,
    }
    # Only one of location_id / point_id is expected to be set; drop every
    # parameter whose value is None.
    req_json = {k: v for k, v in req_dic.items() if v is not None}

    # 3. Convert the request into the message format used with the device.
    pub_json = await change_param_to_config(req_json, method="config")
    if not pub_json:
        raise BusinessException(
            message=f"{point_id} 找不到meter信息")

    # 4. Send the message to the device over EMQ and wait for its response.
    try:
        async with MqttUtil() as emq:
            res_data = await emq.device_response(
                sid=pub_json.get("sid"),
                request_id=pub_json.get("request_id"),
                data=pub_json
            )
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is not the builtin
        # TimeoutError, so both are listed; on 3.11+ they are the same class.
        return Success(message="time out 15s with device", success=0)

    log.info(f'res_data: {res_data}')
    if res_data["status_code"] != 200:
        return Success(message="操作失败status=error, status_code=500", success=0)

    # 5. Persist the new values.
    # NOTE(review): a duration of 0 is sent to the device above but is not
    # written to the table here - confirm whether that is intended.
    if duration:
        sql = "UPDATE soe_config_record SET threshold=%s, duration=%s, " \
              "`enable`=%s WHERE id=%s"
        sql_args = (threshold, duration, enable, set_id)
    else:
        sql = "UPDATE soe_config_record SET threshold=%s,enable=%s WHERE id=%s"
        sql_args = (threshold, enable, set_id)
    async with MysqlUtil() as conn:
        modified_cnt = await conn.execute(sql, args=sql_args)
    log.info(f"UPDATE alarm_setting modified_cnt={modified_cnt}")
    return Success(success=1, message="操作成功")