linkage_control.py 9.44 KB
import json

from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from unify_api.constants import ADIO_KEY, LINK_CONT_EXP_TIME
from unify_api.modules.linkage_control.components.linkage_control_cps import \
    LinLocationReq, LinLocationResp, LinkageLocation, SwitchOpReq, \
    SwitchOpResp, SwitchRecordReq, SwitchRecordResp, SwitchRecord, SoesResp
from unify_api.modules.linkage_control.service.linkage_control_service import \
    switch_operation_emq_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.time_format import srv_time


@summary('状态监测')
async def post_linkage_location(req, body: LinLocationReq) -> LinLocationResp:
    """联动控制,工厂所有开关监测点,group+item"""
    cid = body.cid
    # 1. 根据cid查出所有(location.group)
    sql = "select * from location where cid = %s and ad_type = %s"
    async with MysqlUtil() as conn:
        loc_info = await conn.fetchall(sql=sql, args=(cid, "switch"))
    if not loc_info:
        return LinLocationResp(location_info=[])
    # 2. 查询用户是否有控制权限
    user_id = jwt_user(req)
    if not user_id:
        return LinLocationResp.missing_jwt()
    auth_sql = "select linkage_control from linkage_control_authority " \
               "where user_id = %s and cid = %s"
    async with MysqlUtil() as conn:
        auth_info = await conn.fetchone(sql=auth_sql, args=(user_id, cid))
    log.info(f"auth_info:{auth_info}")
    try:
        auth_info = auth_info["linkage_control"]
    except:
        # 说明查询不到auth_info记录,auth_info为None报错
        auth_info = None
    if not auth_info:
        authority = 0
    else:
        authority = 1
    # 3. 查询开关状态redis, 拼接好返回list
    location_info = []
    for info in loc_info:
        ll = LinkageLocation()
        ll.location_id = info.get("id")
        if info.get("item") == "default":
            ll.name = info.get("group")
        else:
            ll.name = info.get("group") + "_" + info.get("item")
        # 开关状态
        val = await RedisUtils().hget(ADIO_KEY, info.get("id"))
        try:
            val_dic = json.loads(val)
        except:
            ll.status = 2  # 2表示通信中断
            location_info.append(ll)
            continue
        now_date, timestamp = srv_time()
        if timestamp - val_dic.get("timestamp") <= LINK_CONT_EXP_TIME and \
                val_dic.get("value") is not None:  # 5min内
            ll.status = val_dic.get("value")
        else:
            ll.status = 2  # 超过5min,2表示通信中断
        location_info.append(ll)
    # 4. 返回
    return LinLocationResp(authority=authority,
                           location_info=location_info)


# @summary('开断/闭合操作')
# async def post_switch_operation(req, body: SwitchOpReq) -> SwitchOpResp:
#     """联动控制开关, 开断/闭合操作"""
#     cid = body.cid
#     location_id = body.location_id
#     switch = body.switch
#     switch_end = int(switch[1])
#     # 1. 调用业务后台接口,传数给装置
#     req_json = {
#         "switch": switch_end,  # 0-开断 1-闭合
#         "location_id": location_id
#     }
#     try:
#         resp, status = await AioHttpUtils().post(SETTING.linkage_control_url,
#                                                  req_json, timeout=50)
#     except Exception as e:
#         log.exception(e)
#         # 1.1 自己超时, 返回50001  (超时,程序会报requests.exceptions.Timeout异常)
#         log.error("联动控制超时")
#         return SwitchOpResp.timeout_self_err()
#     resp = json.loads(resp)
#     # 1.2 业务500, 返回50002
#     if status != 200:
#         log.warning("linkage control status 50002")
#         return SwitchOpResp.service_err()
#     # 1.3 硬件超时,有timeout,
#     #      如果是断开到闭合,有timeout,则认为是跳闸到闭合失败,返回50003
#     #      其他,硬件超时, 返回50004
#     if resp.get("timeout"):
#         if switch == "01":
#             log.warning("linkage control status Hardware_timeout 50003")
#             return SwitchOpResp.hardware_timeout_opentoclose_err()
#         log.warning("linkage control status 50004")
#         return SwitchOpResp.hardware_timeout_err()
#     # 1.4. 硬件500, status_code500, 返回5005
#     if resp.get("status_code") == 500:
#         if switch == "01":
#             log.warning("linkage control status Hardware_500 50003")
#             return SwitchOpResp.hardware_timeout_opentoclose_err()
#         log.warning("linkage control status 50005")
#         return SwitchOpResp.hardware_control_err()
#     # 2. 更新redis
#     now_date, timestamp = srv_time()
#     redis_dic = {
#         "value": switch_end,
#         "timestamp": int(timestamp)
#     }
#     redis_dic = json.dumps(redis_dic)
#     # val返回0
#     val = await RedisClient().hset(ADIO_KEY, str(location_id), redis_dic)
#     # 3. 更新操作到联动控制记录表
#     user_id = jwt_user(req)
#     if not user_id:
#         return LinLocationResp.missing_jwt()
#     insert_sql = "INSERT INTO linkage_control_operation_record " \
#                  "(`time`, location_id, `type`, action, user_id) VALUES %s"
#     async with MysqlUtil() as conn:
#         # 成功res返回1
#         res = await conn.execute(insert_sql, args=(
#             (now_date, location_id, 'remote', switch, user_id),))
#     if res != 1:
#         return SwitchOpResp.db_error()
#     return SwitchOpResp.success()


@summary('状态记录')
async def post_switch_record_list(req,
                                  body: SwitchRecordReq) -> SwitchRecordResp:
    """联动控制, 状态记录"""
    cid = body.cid
    is_wechat = body.is_wechat
    location_list = body.location_list
    if not location_list:
        async with MysqlUtil() as conn:
            sql = "select lid from location where cid=%s"
            locations = await conn.fetchall(sql, args=(cid,))
            location_list = [l["lid"] for l in locations]

    if not location_list:
        return SwitchRecordResp(total=0, rows=[])
    page_num = body.page_num
    page_record_num = body.page_record_num
    # web日月筛选按钮
    start = body.start
    end = body.end
    # 1.查询联动控制记录表
    sql_total = "SELECT count(*) from linkage_control_operation_record " \
                "where location_id in %s"
    total_args = (tuple(location_list),)
    if start and end:
        sql_total = "SELECT count(*) from linkage_control_operation_record " \
                    "where location_id in %s and time >= %s and time <= %s"
        total_args = (tuple(location_list), start, end)
    base_sql = "select lr.`time`,lr.`type`,lr.`action`,lr.location_id, " \
               "location.`ad_type` group,location.item, lr.user_id " \
               "from linkage_control_operation_record lr left join location " \
               "on lr.location_id = location.id where location_id in %s "
    if is_wechat == 1:  # 小程序端
        sql = base_sql + "order by `time` desc limit 0, 20"
        args = (tuple(location_list),)
    else:  # web端
        if not start:
            sql = base_sql + "order by `time` desc limit %s, %s"
            args = (tuple(location_list), (page_num - 1) * page_record_num,
                    page_record_num)
        else:
            sql = base_sql + "and lr.time >= %s and lr.time <= %s " \
                             "order by `time` desc limit %s, %s"
            args = (tuple(location_list), start, end,
                    (page_num - 1) * page_record_num, page_record_num)

    async with MysqlUtil() as conn:
        total_res = await conn.fetch_value(sql_total, args=total_args)
        db_result = await conn.fetchall(sql, args=args)
    if not db_result:
        return SwitchRecordResp(total=0, rows=[])
    # 2. 构造返回
    record_info = []
    for info in db_result:
        # location名
        if info.get("item") == "default":
            location = info.get("group")
        else:
            location = info.get("group") + '_' + info.get("item")
        # 操作人
        if info.get("user_id"):
            op_sql = "select name from linkage_control_authority " \
                     "where user_id = %s"
            async with MysqlUtil() as conn:
                op_res = await conn.fetchone(op_sql, args=(
                    info.get("user_id"),))
            op_name = op_res.get("name")
        else:
            op_name = ""
        time_str = info.get("time").strftime('%Y-%m-%d %H:%M:%S')
        sr = SwitchRecord()
        sr.time = time_str  # 时间
        sr.location = location  # 开关
        sr.type = info.get("type")  # 类型
        sr.action_info = info.get("action")  # 动作情况
        sr.user = op_name  # 操作人
        record_info.append(sr)
    return SwitchRecordResp(
        total=total_res if total_res < 10000 else 10000,
        rows=record_info)


@summary('开断/闭合操作-直接与装置通信')
async def post_switch_operation(req, body: SwitchOpReq) -> SoesResp:
    """联动控制开关, 开断/闭合操作"""
    cid = body.cid
    location_id = body.location_id
    switch = body.switch
    switch_end = int(switch[1])
    user_id = req.ctx.user_id
    return await switch_operation_emq_service(location_id, switch, switch_end,
                                              user_id)