from pot_libs.logger import log
from pot_libs.sanic_api import summary
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.components.common_cps import CidReq
from unify_api.modules.common.procedures.points import point_to_mid
from unify_api.modules.common.service.list_point_service import \
    list_storey_service, list_tsp_point_service, list_point_level_service, \
    list_point_inline_service
from unify_api.modules.common.components.list_points_cps import (
    ListPointRequest, ListPointResponse, CommonPoint, CommonLocation, Inline,
    LsRep, LtpRep, LplResp, LpiResp)


@summary('获取监测点,进线列表')
async def post_list_point(req, body: ListPointRequest) -> ListPointResponse:
    """Return the monitoring points and inlines visible to a company.

    Steps:
      1. Validate ``body.cid`` (must be truthy and non-negative).
      2. Fetch points owned by the company plus points whose monitor is
         reused into the company, optionally restricted to power equipment.
      3. Keep only the points that ``point_to_mid`` returns a mapping for.
      4. Fetch temperature / residual-current locations and attach them to
         the point whose name equals the location's monitor name.
      5. Fetch the company's inlines.

    Args:
        req: The incoming request object (unused here).
        body: Request payload; ``cid`` and ``is_power_equipment`` are read.

    Returns:
        ``ListPointResponse().param_error()`` for an invalid ``cid``,
        ``ListPointResponse().db_error()`` if the location query raises,
        otherwise a populated ``ListPointResponse``. ``power_show_all`` is 1
        when at least one returned point has ``add_to_company == 1``.
    """
    cid = body.cid
    if not cid or cid < 0:
        return ListPointResponse().param_error()

    # Points that belong to this company, plus points of other companies
    # whose monitor has been reused into this company.
    point_sql = (
        "SELECT p.pid,p.mtid, p.name, p.add_to_company FROM point p "
        "left join monitor m on p.mtid = m.mtid "
        "left join monitor_reuse mr on p.mtid = mr.mtid "
        "WHERE m.demolished = 0 and (p.cid=%s or mr.cid = %s) "
    )
    if body.is_power_equipment:
        # Restrict to power equipment only.
        point_sql += " and m.is_power_equipment = 1 "

    async with MysqlUtil() as conn:
        point_rows = await conn.fetchall(point_sql, args=(cid, cid))
    if not point_rows:
        return ListPointResponse(points=[], inlines=[], power_show_all=0)

    # Drop points that point_to_mid does not return a mapping for
    # (presumably dismantled/unbound points -- confirm against point_to_mid).
    point_mid_map, _ = await point_to_mid(
        [row["pid"] for row in point_rows])
    # Keyed by pid, so duplicate rows produced by the joins collapse.
    points = {
        row.get("pid"): row
        for row in point_rows
        if row["pid"] in point_mid_map
    }

    # Monitor name -> [(location id, item), ...]
    groups = {}
    # Only query locations when some point survived the filter: with no
    # mtids the `IN %s` placeholder has nothing to expand to (an empty
    # `IN ()` is invalid MySQL), and the rows would be unused anyway because
    # they are only attached to surviving points below.
    if points:
        mtids = [info["mtid"] for info in points.values()]
        location_sql = (
            "SELECT ln.lid, mr.`name` `group`, ln.item FROM location ln LEFT JOIN"
            " monitor mr on ln.mtid = mr.mtid WHERE ( ln.cid=%s or ln.mtid in "
            "%s ) and ln.`ad_type` in %s"
        )
        try:
            async with MysqlUtil() as conn:
                location_rows = await conn.fetchall(location_sql, args=(
                    cid, mtids, ["temperature", "residual_current"]))
            for row in location_rows:
                groups.setdefault(row.get("group"), []).append(
                    (row.get("lid"), row.get("item")))
        except Exception as e:
            log.exception(e)
            return ListPointResponse().db_error()

    list_point = []
    for pid, point_info in points.items():
        name = point_info.get("name")
        # Locations are matched to a point by monitor name == point name.
        locations = [
            CommonLocation(location_id=location_id, item=item)
            for location_id, item in groups.get(name, [])
        ]
        list_point.append(CommonPoint(
            name=name,
            point_id=pid,
            locations=locations,
            add_to_company=point_info["add_to_company"],
        ))

    async with MysqlUtil() as conn:
        inline_rows = await conn.fetchall(
            "SELECT inlid, `name` FROM inline WHERE cid=%s", args=(cid,))
    inline_list = [
        Inline(inline_id=row["inlid"], name=row["name"])
        for row in inline_rows
    ]

    # Test the predicate itself rather than the truthiness of the objects.
    has_company_point = any(
        point.add_to_company == 1 for point in list_point)
    return ListPointResponse(
        points=list_point,
        inlines=inline_list,
        power_show_all=1 if has_company_point else 0,
    )


@summary('获取楼层-识电u')
async def post_list_storey(req, body: ListPointRequest) -> LsRep:
    """Delegate to ``list_storey_service`` with the request's ``cid``."""
    cid = body.cid
    return await list_storey_service(cid)


@summary('获取TSP监测点-扬尘')
async def post_list_tsp_point(req, body: ListPointRequest) -> LtpRep:
    """Delegate to ``list_tsp_point_service`` with the request's ``cid``."""
    cid = body.cid
    return await list_tsp_point_service(cid)


@summary('新版监测点-包含进线变压器层级')
async def post_list_point_level(req, body: CidReq) -> LplResp:
    """Delegate to ``list_point_level_service`` with the request's ``cid``."""
    cid = body.cid
    return await list_point_level_service(cid)


@summary('新版监测点-进线')
async def post_list_point_inline(req, body: CidReq) -> LpiResp:
    """Delegate to ``list_point_inline_service`` with the request's ``cid``."""
    cid = body.cid
    return await list_point_inline_service(cid)