import json from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils from pot_libs.aredis_util.aredis_utils import RedisUtils from pot_libs.logger import log from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.sanic_api import summary from pot_libs.settings import SETTING from unify_api.constants import ADIO_KEY, LINK_CONT_EXP_TIME from unify_api.modules.linkage_control.components.linkage_control_cps import \ LinLocationReq, LinLocationResp, LinkageLocation, SwitchOpReq, \ SwitchOpResp, SwitchRecordReq, SwitchRecordResp, SwitchRecord, SoesResp from unify_api.modules.linkage_control.service.linkage_control_service import \ switch_operation_emq_service from unify_api.modules.users.procedures.jwt_user import jwt_user from unify_api.utils.time_format import srv_time @summary('状态监测') async def post_linkage_location(req, body: LinLocationReq) -> LinLocationResp: """联动控制,工厂所有开关监测点,group+item""" cid = body.cid # 1. 根据cid查出所有(location.group) sql = "select l.*,m.name from location l left join monitor m on " \ "l.mtid = m.mtid where l.cid = %s and l.ad_type = %s" async with MysqlUtil() as conn: loc_info = await conn.fetchall(sql=sql, args=(cid, "switch")) if not loc_info: return LinLocationResp(location_info=[]) # 2. 查询用户是否有控制权限 user_id = jwt_user(req) if not user_id: return LinLocationResp.missing_jwt() auth_sql = "select linkage_control from linkage_control_authority " \ "where user_id = %s and cid = %s" async with MysqlUtil() as conn: auth_info = await conn.fetchone(sql=auth_sql, args=(user_id, cid)) log.info(f"auth_info:{auth_info}") try: auth_info = auth_info["linkage_control"] except: # 说明查询不到auth_info记录,auth_info为None报错 auth_info = None if not auth_info: authority = 0 else: authority = 1 # 3. 查询开关状态redis, 拼接好返回list location_info = [] for info in loc_info: ll = LinkageLocation() ll.location_id = info.get("id") if info.get("item") == "default": ll.name = info.get("name") else: ll.name = info.get("name") + "_" + info.get("item") # 开关状态 val = await RedisUtils().hget(ADIO_KEY, info.get("id")) try: val_dic = json.loads(val) except: ll.status = 2 # 2表示通信中断 location_info.append(ll) continue now_date, timestamp = srv_time() if timestamp - val_dic.get("timestamp") <= LINK_CONT_EXP_TIME and \ val_dic.get("value") is not None: # 5min内 ll.status = val_dic.get("value") else: ll.status = 2 # 超过5min,2表示通信中断 location_info.append(ll) # 4. 返回 return LinLocationResp(authority=authority, location_info=location_info) # @summary('开断/闭合操作') # async def post_switch_operation(req, body: SwitchOpReq) -> SwitchOpResp: # """联动控制开关, 开断/闭合操作""" # cid = body.cid # location_id = body.location_id # switch = body.switch # switch_end = int(switch[1]) # # 1. 调用业务后台接口,传数给装置 # req_json = { # "switch": switch_end, # 0-开断 1-闭合 # "location_id": location_id # } # try: # resp, status = await AioHttpUtils().post(SETTING.linkage_control_url, # req_json, timeout=50) # except Exception as e: # log.exception(e) # # 1.1 自己超时, 返回50001 (超时,程序会报requests.exceptions.Timeout异常) # log.error("联动控制超时") # return SwitchOpResp.timeout_self_err() # resp = json.loads(resp) # # 1.2 业务500, 返回50002 # if status != 200: # log.warning("linkage control status 50002") # return SwitchOpResp.service_err() # # 1.3 硬件超时,有timeout, # # 如果是断开到闭合,有timeout,则认为是跳闸到闭合失败,返回50003 # # 其他,硬件超时, 返回50004 # if resp.get("timeout"): # if switch == "01": # log.warning("linkage control status Hardware_timeout 50003") # return SwitchOpResp.hardware_timeout_opentoclose_err() # log.warning("linkage control status 50004") # return SwitchOpResp.hardware_timeout_err() # # 1.4. 硬件500, status_code500, 返回5005 # if resp.get("status_code") == 500: # if switch == "01": # log.warning("linkage control status Hardware_500 50003") # return SwitchOpResp.hardware_timeout_opentoclose_err() # log.warning("linkage control status 50005") # return SwitchOpResp.hardware_control_err() # # 2. 更新redis # now_date, timestamp = srv_time() # redis_dic = { # "value": switch_end, # "timestamp": int(timestamp) # } # redis_dic = json.dumps(redis_dic) # # val返回0 # val = await RedisClient().hset(ADIO_KEY, str(location_id), redis_dic) # # 3. 更新操作到联动控制记录表 # user_id = jwt_user(req) # if not user_id: # return LinLocationResp.missing_jwt() # insert_sql = "INSERT INTO linkage_control_operation_record " \ # "(`time`, location_id, `type`, action, user_id) VALUES %s" # async with MysqlUtil() as conn: # # 成功res返回1 # res = await conn.execute(insert_sql, args=( # (now_date, location_id, 'remote', switch, user_id),)) # if res != 1: # return SwitchOpResp.db_error() # return SwitchOpResp.success() @summary('状态记录') async def post_switch_record_list(req, body: SwitchRecordReq) -> SwitchRecordResp: """联动控制, 状态记录""" cid = body.cid is_wechat = body.is_wechat location_list = body.location_list if not location_list: async with MysqlUtil() as conn: sql = "select lid from location where cid=%s" locations = await conn.fetchall(sql, args=(cid,)) location_list = [l["lid"] for l in locations] if not location_list: return SwitchRecordResp(total=0, rows=[]) page_num = body.page_num page_record_num = body.page_record_num # web日月筛选按钮 start = body.start end = body.end # 1.查询联动控制记录表 sql_total = "SELECT count(*) from linkage_control_operation_record " \ "where lid in %s" total_args = (tuple(location_list),) if start and end: sql_total = "SELECT count(*) from linkage_control_operation_record " \ "where lid in %s and time >= %s and time <= %s" total_args = (tuple(location_list), start, end) base_sql = "select lr.`time`,lr.`type`,lr.`action`,lr.lid, " \ "location.`ad_type`,location.item, lr.user_id,m.name " \ "from linkage_control_operation_record lr " \ "left join location on lr.lid = location.lid " \ "left join monitor m on location.mtid=m.mtid " \ "where lr.lid in %s " if is_wechat == 1: # 小程序端 sql = base_sql + "order by `time` desc limit 0, 20" args = (tuple(location_list),) else: # web端 if not start: sql = base_sql + "order by `time` desc limit %s, %s" args = (tuple(location_list), (page_num - 1) * page_record_num, page_record_num) else: sql = base_sql + "and lr.time >= %s and lr.time <= %s " \ "order by `time` desc limit %s, %s" args = (tuple(location_list), start, end, (page_num - 1) * page_record_num, page_record_num) async with MysqlUtil() as conn: total_res = await conn.fetch_value(sql_total, args=total_args) db_result = await conn.fetchall(sql, args=args) if not db_result: return SwitchRecordResp(total=0, rows=[]) # 2. 构造返回 record_info = [] for info in db_result: # location名 if info.get("item") == "default": location = info.get("name") else: location = info.get("name") + '_' + info.get("item") # 操作人 if info.get("user_id"): op_sql = "select name from linkage_control_authority " \ "where user_id = %s" async with MysqlUtil() as conn: op_res = await conn.fetchone(op_sql, args=( info.get("user_id"),)) op_name = op_res.get("name") else: op_name = "" time_str = info.get("time").strftime('%Y-%m-%d %H:%M:%S') sr = SwitchRecord() sr.time = time_str # 时间 sr.location = location # 开关 sr.type = info.get("type") # 类型 sr.action_info = info.get("action") # 动作情况 sr.user = op_name # 操作人 record_info.append(sr) return SwitchRecordResp( total=total_res, rows=record_info) @summary('开断/闭合操作-直接与装置通信') async def post_switch_operation(req, body: SwitchOpReq) -> SoesResp: """联动控制开关, 开断/闭合操作""" cid = body.cid location_id = body.location_id switch = body.switch switch_end = int(switch[1]) user_id = req.ctx.user_id return await switch_operation_emq_service(location_id, switch, switch_end, user_id)