linkage_control.py 9.53 KB
Newer Older
lcn's avatar
lcn committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import json

from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from unify_api.constants import ADIO_KEY, LINK_CONT_EXP_TIME
from unify_api.modules.linkage_control.components.linkage_control_cps import \
    LinLocationReq, LinLocationResp, LinkageLocation, SwitchOpReq, \
    SwitchOpResp, SwitchRecordReq, SwitchRecordResp, SwitchRecord, SoesResp
from unify_api.modules.linkage_control.service.linkage_control_service import \
    switch_operation_emq_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.time_format import srv_time


@summary('状态监测')
async def post_linkage_location(req, body: LinLocationReq) -> LinLocationResp:
    """联动控制,工厂所有开关监测点,group+item"""
    cid = body.cid
    # 1. 根据cid查出所有(location.group)
lcn's avatar
lcn committed
24 25
    sql = "select l.*,m.name  from location l left join monitor m on " \
          "l.mtid = m.mtid where l.cid = %s and l.ad_type = %s"
lcn's avatar
lcn committed
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
    async with MysqlUtil() as conn:
        loc_info = await conn.fetchall(sql=sql, args=(cid, "switch"))
    if not loc_info:
        return LinLocationResp(location_info=[])
    # 2. 查询用户是否有控制权限
    user_id = jwt_user(req)
    if not user_id:
        return LinLocationResp.missing_jwt()
    auth_sql = "select linkage_control from linkage_control_authority " \
               "where user_id = %s and cid = %s"
    async with MysqlUtil() as conn:
        auth_info = await conn.fetchone(sql=auth_sql, args=(user_id, cid))
    log.info(f"auth_info:{auth_info}")
    try:
        auth_info = auth_info["linkage_control"]
    except:
        # 说明查询不到auth_info记录,auth_info为None报错
        auth_info = None
    if not auth_info:
        authority = 0
    else:
        authority = 1
    # 3. 查询开关状态redis, 拼接好返回list
    location_info = []
    for info in loc_info:
        ll = LinkageLocation()
        ll.location_id = info.get("id")
        if info.get("item") == "default":
lcn's avatar
lcn committed
54
            ll.name = info.get("name")
lcn's avatar
lcn committed
55
        else:
lcn's avatar
lcn committed
56
            ll.name = info.get("name") + "_" + info.get("item")
lcn's avatar
lcn committed
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
        # 开关状态
        val = await RedisUtils().hget(ADIO_KEY, info.get("id"))
        try:
            val_dic = json.loads(val)
        except:
            ll.status = 2  # 2表示通信中断
            location_info.append(ll)
            continue
        now_date, timestamp = srv_time()
        if timestamp - val_dic.get("timestamp") <= LINK_CONT_EXP_TIME and \
                val_dic.get("value") is not None:  # 5min内
            ll.status = val_dic.get("value")
        else:
            ll.status = 2  # 超过5min,2表示通信中断
        location_info.append(ll)
    # 4. 返回
    return LinLocationResp(authority=authority,
                           location_info=location_info)


# @summary('开断/闭合操作')
# async def post_switch_operation(req, body: SwitchOpReq) -> SwitchOpResp:
#     """联动控制开关, 开断/闭合操作"""
#     cid = body.cid
#     location_id = body.location_id
#     switch = body.switch
#     switch_end = int(switch[1])
#     # 1. 调用业务后台接口,传数给装置
#     req_json = {
#         "switch": switch_end,  # 0-开断 1-闭合
#         "location_id": location_id
#     }
#     try:
#         resp, status = await AioHttpUtils().post(SETTING.linkage_control_url,
#                                                  req_json, timeout=50)
#     except Exception as e:
#         log.exception(e)
#         # 1.1 自己超时, 返回50001  (超时,程序会报requests.exceptions.Timeout异常)
#         log.error("联动控制超时")
#         return SwitchOpResp.timeout_self_err()
#     resp = json.loads(resp)
#     # 1.2 业务500, 返回50002
#     if status != 200:
#         log.warning("linkage control status 50002")
#         return SwitchOpResp.service_err()
#     # 1.3 硬件超时,有timeout,
#     #      如果是断开到闭合,有timeout,则认为是跳闸到闭合失败,返回50003
#     #      其他,硬件超时, 返回50004
#     if resp.get("timeout"):
#         if switch == "01":
#             log.warning("linkage control status Hardware_timeout 50003")
#             return SwitchOpResp.hardware_timeout_opentoclose_err()
#         log.warning("linkage control status 50004")
#         return SwitchOpResp.hardware_timeout_err()
#     # 1.4. 硬件500, status_code500, 返回5005
#     if resp.get("status_code") == 500:
#         if switch == "01":
#             log.warning("linkage control status Hardware_500 50003")
#             return SwitchOpResp.hardware_timeout_opentoclose_err()
#         log.warning("linkage control status 50005")
#         return SwitchOpResp.hardware_control_err()
#     # 2. 更新redis
#     now_date, timestamp = srv_time()
#     redis_dic = {
#         "value": switch_end,
#         "timestamp": int(timestamp)
#     }
#     redis_dic = json.dumps(redis_dic)
#     # val返回0
#     val = await RedisClient().hset(ADIO_KEY, str(location_id), redis_dic)
#     # 3. 更新操作到联动控制记录表
#     user_id = jwt_user(req)
#     if not user_id:
#         return LinLocationResp.missing_jwt()
#     insert_sql = "INSERT INTO linkage_control_operation_record " \
#                  "(`time`, location_id, `type`, action, user_id) VALUES %s"
#     async with MysqlUtil() as conn:
#         # 成功res返回1
#         res = await conn.execute(insert_sql, args=(
#             (now_date, location_id, 'remote', switch, user_id),))
#     if res != 1:
#         return SwitchOpResp.db_error()
#     return SwitchOpResp.success()


@summary('状态记录')
async def post_switch_record_list(req,
                                  body: SwitchRecordReq) -> SwitchRecordResp:
    """联动控制, 状态记录"""
    cid = body.cid
    is_wechat = body.is_wechat
    location_list = body.location_list
    if not location_list:
        async with MysqlUtil() as conn:
            sql = "select lid from location where cid=%s"
            locations = await conn.fetchall(sql, args=(cid,))
            location_list = [l["lid"] for l in locations]
lcn's avatar
lcn committed
154
    
lcn's avatar
lcn committed
155 156 157 158 159 160 161 162 163
    if not location_list:
        return SwitchRecordResp(total=0, rows=[])
    page_num = body.page_num
    page_record_num = body.page_record_num
    # web日月筛选按钮
    start = body.start
    end = body.end
    # 1.查询联动控制记录表
    sql_total = "SELECT count(*) from linkage_control_operation_record " \
lcn's avatar
lcn committed
164
                "where lid in %s"
lcn's avatar
lcn committed
165 166 167
    total_args = (tuple(location_list),)
    if start and end:
        sql_total = "SELECT count(*) from linkage_control_operation_record " \
lcn's avatar
lcn committed
168
                    "where lid in %s and time >= %s and time <= %s"
lcn's avatar
lcn committed
169
        total_args = (tuple(location_list), start, end)
lcn's avatar
lcn committed
170 171 172 173 174 175
    base_sql = "select lr.`time`,lr.`type`,lr.`action`,lr.lid, " \
               "location.`ad_type`,location.item, lr.user_id,m.name " \
               "from linkage_control_operation_record lr " \
               "left join location on lr.lid = location.lid " \
               "left join monitor m on location.mtid=m.mtid " \
               "where lr.lid in %s "
lcn's avatar
lcn committed
176 177 178 179 180 181 182 183 184 185 186 187 188
    if is_wechat == 1:  # 小程序端
        sql = base_sql + "order by `time` desc limit 0, 20"
        args = (tuple(location_list),)
    else:  # web端
        if not start:
            sql = base_sql + "order by `time` desc limit %s, %s"
            args = (tuple(location_list), (page_num - 1) * page_record_num,
                    page_record_num)
        else:
            sql = base_sql + "and lr.time >= %s and lr.time <= %s " \
                             "order by `time` desc limit %s, %s"
            args = (tuple(location_list), start, end,
                    (page_num - 1) * page_record_num, page_record_num)
lcn's avatar
lcn committed
189
    
lcn's avatar
lcn committed
190 191 192 193 194 195 196 197 198 199
    async with MysqlUtil() as conn:
        total_res = await conn.fetch_value(sql_total, args=total_args)
        db_result = await conn.fetchall(sql, args=args)
    if not db_result:
        return SwitchRecordResp(total=0, rows=[])
    # 2. 构造返回
    record_info = []
    for info in db_result:
        # location名
        if info.get("item") == "default":
lcn's avatar
lcn committed
200
            location = info.get("name")
lcn's avatar
lcn committed
201
        else:
lcn's avatar
lcn committed
202
            location = info.get("name") + '_' + info.get("item")
lcn's avatar
lcn committed
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
        # 操作人
        if info.get("user_id"):
            op_sql = "select name from linkage_control_authority " \
                     "where user_id = %s"
            async with MysqlUtil() as conn:
                op_res = await conn.fetchone(op_sql, args=(
                    info.get("user_id"),))
            op_name = op_res.get("name")
        else:
            op_name = ""
        time_str = info.get("time").strftime('%Y-%m-%d %H:%M:%S')
        sr = SwitchRecord()
        sr.time = time_str  # 时间
        sr.location = location  # 开关
        sr.type = info.get("type")  # 类型
        sr.action_info = info.get("action")  # 动作情况
        sr.user = op_name  # 操作人
        record_info.append(sr)
    return SwitchRecordResp(
lcn's avatar
lcn committed
222
        total=total_res,
lcn's avatar
lcn committed
223 224 225 226 227 228 229 230 231 232 233 234 235
        rows=record_info)


@summary('开断/闭合操作-直接与装置通信')
async def post_switch_operation(req, body: SwitchOpReq) -> SoesResp:
    """联动控制开关, 开断/闭合操作"""
    cid = body.cid
    location_id = body.location_id
    switch = body.switch
    switch_end = int(switch[1])
    user_id = req.ctx.user_id
    return await switch_operation_emq_service(location_id, switch, switch_end,
                                              user_id)