alarm_setting_service.py 3.05 KB
Newer Older
lcn's avatar
lcn committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
from hbmqtt.mqtt.constants import QOS_1
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.common.components.responses import Success
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import EVENT_TYPE_SYNC_DEVICE
from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \
    post_update_alarm_emq_dao
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
    change_param_to_config


async def post_update_alarm_emq_service(set_id, threshold, duration, enable,
                                        cid, user_id, alarm_cid_dic):
    """更新告警设置-直接与装置通信"""
    # 1. 判断黑名单
    # 逻辑: 工厂为99且user_id在[88, 16]中
    if cid in alarm_cid_dic:
        user_lis = alarm_cid_dic.get(cid)
        if user_id in user_lis:
            return Success(success=0, message="用户工厂在黑名单")
    # 2. 根据id查询alarm_setting
    alarm_dic = await post_update_alarm_emq_dao(set_id)
    if not alarm_dic:
        raise BusinessException(message=f"正在设置的报警设置id={set_id}不存在")

    if not EVENT_TYPE_SYNC_DEVICE[alarm_dic["etype"]]:
        raise BusinessException(
            message=f"{alarm_dic} event_type表sync_device值不为1")
    point_id = alarm_dic["pid"]
    req_dic = {
        "point_id": point_id,
        "location_id": alarm_dic["lid"],
        "event_type": alarm_dic["etype"],
        "threshold": threshold,
        "duration": duration,
        "enable": enable,
    }
    # location_id和point_id只存在1个, 过滤掉另外一个参数
    req_json = {k: v for k, v in req_dic.items() if v is not None}
    # 3. 转换为与装置通信的报文
    pub_json = await change_param_to_config(req_json, method="config")
    if not pub_json:
        raise BusinessException(
            message=f"{point_id} 找不到meter信息")
    # 4. 调用emq
    try:
        # client_name = "sync_web_client_1"
        async with MqttUtil() as emq:
            res_data = await emq.device_response(
                sid=pub_json.get("sid"),
                request_id=pub_json.get("request_id"),
                data=pub_json
            )
    except TimeoutError:
        return Success(message="time out 15s with device", success=0)

    log.info(f'res_data: {res_data}')
    if res_data["status_code"] != 200:
        return Success(message="操作失败status=error, status_code=500", success=0)
    # 5. update alarm_setting数据
    if duration:
        sql = "UPDATE soe_config_record SET threshold=%s, duration=%s, " \
              "`enable`=%s WHERE id=%s"
        sql_args = (threshold, duration, enable, set_id)
    else:
        sql = "UPDATE soe_config_record SET threshold=%s,enable=%s WHERE id=%s"
        sql_args = (threshold, enable, set_id)
    async with MysqlUtil() as conn:
        modified_cnt = await conn.execute(sql, args=sql_args)
        log.info(f"UPDATE alarm_setting modified_cnt={modified_cnt}")
    return Success(success=1, message="操作成功")