import json import base64 import pendulum from unify_api.modules.electric.dao.electric_dao import \ get_elec_mtid_sid_by_cid from unify_api import constants from pot_libs.common.components.responses import success_res from unify_api.modules.shidianu.components.open_data_cps import ( BasicInfoResp, StbDataResp ) from unify_api.modules.alarm_manager.components.list_alarm import \ ListAlarmResponse, Alarm from unify_api.modules.common.dao.common_dao import load_compy_storey_points from unify_api.modules.shidianu.dao.open_data_dao import \ get_user_product_auth, load_lg_sdu_events, monitor_point_company from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils from pot_libs.settings import SETTING from pot_libs.utils.pendulum_wrapper import my_pendulum from pot_libs.aredis_util.aredis_utils import RedisUtils from unify_api.utils.taos_new import get_td_engine_data, parse_td_columns from unify_api.utils.time_format import ( CST, convert_dt_to_timestr, convert_to_dt ) from unify_api.modules.electric.service.electric_service import ( batch_load_rt_ele_with_hr ) async def basic_info_longgang_service(user_id, pg_size, pg_num): # 1.验证权限 cids = [223] is_auth = await get_power(user_id, cids) if not is_auth: return success_res(code=4001, msg="您没有权限访问") rows = [] storey_points = await load_compy_storey_points(cids, 1, 3000) total = len(storey_points) if not storey_points: return success_res(code=4009, msg="此厂还未装任何装置") page_start = (pg_num - 1) * pg_size page_end = (pg_num - 1) * pg_size + pg_size if page_start > total: return BasicInfoResp(rows=[], total=total) for info in storey_points[page_start:page_end]: room_name = info["room_name"] or '' storey_name = info["storey_name"] or '' compy_addr = info["address"] or '' rows.append( { "cid": info["cid"], "pid": info["pid"], "mtid": info["mtid"], "storey_id": info.get("storey_id", ""), "meter_no": info["meter_no"] + "相", "equipment_address": f"{compy_addr}{storey_name}{room_name}", "equipment_code": info["sid"], "root_address": f"{compy_addr}{storey_name}{room_name}", "longitude": "", "latitude": "", "insurer": "黄锦成", "insurer_phone": "13580428480" } ) return BasicInfoResp(rows=rows, total=total) async def stb_data_longgang_service(user_id, d_type): topic2db = {"electric": "db_electric", "soe": "db_soe", "appliance": "db_appliance"} if d_type not in topic2db.keys(): return success_res(code=4002, msg="type错误") access_lim_key = f"access_data_{d_type}:{user_id}" if await RedisUtils().get(access_lim_key): return success_res(code=4011, msg="访问频繁,请60s后访问") cids = [223] if not await get_power(user_id, cids): return success_res(code=4001, msg="您没有权限访问") if d_type == "appliance": return success_res(code=4003, msg="未查找到数据") elif d_type == "electric": lst_mtid_sid = await get_elec_mtid_sid_by_cid(cids[0]) mtids = [mtid_sid["mtid"] for mtid_sid in lst_mtid_sid] d_rt_ele = await batch_load_rt_ele_with_hr(mtids) datas = list(d_rt_ele.values()) else: db_name = topic2db[d_type] stb_url = f"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai" sql = f"select last_row(*) from {db_name}.{d_type}_stb " \ f"where cpyid={cids[0]} group by tbname;" is_succ, results = await get_td_engine_data(stb_url, sql) if not is_succ: return success_res(code=4003, msg="未查找到数据") head = parse_td_columns(results) datas = [dict(zip(head, r)) for r in results["data"]] for data in datas: data["ts"] = data.pop("ts_origin") await RedisUtils().setex(access_lim_key, 60, 1) return StbDataResp(rows=datas, total=len(datas)) async def get_power(user_id, cids): user_info = await get_user_product_auth(user_id) if isinstance(cids, list): if user_info and user_info.get("cid_ext"): cid_ext = json.loads(user_info["cid_ext"]) for cid in cids: user_cid_power = cid_ext.get(str(cid), "") if not user_cid_power or (4 not in user_cid_power): return False return True return False elif isinstance(cids, int): if user_info and user_info.get("cid_ext"): cid_ext = json.loads(user_info["cid_ext"]) user_cid_power = cid_ext.get(str(cids), "") if user_cid_power and (4 in user_cid_power): return True return False def get_token(): user_password = f"{SETTING.td_user}:{SETTING.td_pwd}" token = base64.b64encode(user_password.encode()).decode() return token async def get_requests(token, stb_url, sql): resp_str, status = await AioHttpUtils().post( stb_url, data=sql, timeout=50, headers={"Authorization": f"Basic {token}"} ) if status == 200: return resp_str else: return False async def supplement_data_service(user_id, cid, start, end, d_type): if not await get_power(user_id, cid): return success_res(code=4001, msg="您没有权限访问") topic2db = {"electric": "db_electric", "soe": "db_soe", "appliance": "db_appliance"} if d_type not in topic2db.keys(): return success_res(code=4002, msg="type错误") now = pendulum.now(tz=CST) try: start = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss') end = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss') if (end - start).in_hours() > 6: return success_res(code=4003, msg="时间跨度不能大于6小时") if (now - start).in_months() > 1: return success_res(code=4010, msg="查询日期不能大于1个月") except Exception as e: success_res(code=4004, msg="开始时间或者结束时间错误") db_name = topic2db[d_type] stb_url = f"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai" if d_type == "electric": sql = f"select * from {d_type}_stb where cpyid={cid} and " \ f"ts >= '{start}' and ts <= '{end}'" else: sql = f"select * from {d_type}_stb where cpyid={cid} and " \ f"ts_origin >= '{start}' and ts_origin <= '{end}'" is_succ, results = await get_td_engine_data(stb_url, sql) if not is_succ: return success_res(code=4003, msg="未查找到数据") head = parse_td_columns(results) datas = [dict(zip(head, r)) for r in results["data"]] if d_type == "electric": datas = [data.pop("ts_received") for data in datas] else: for data in datas: data["ts"] = data["ts_origin"] data.pop("ts_origin") total = results.get("rows", 0) or 0 return StbDataResp(rows=datas, total=total) # 告警结果和分析结果 async def result_longgang_service(user_id, importance, pg_size, pg_num): cid = 223 cids = [223] is_auth = await get_power(user_id, cids) if not is_auth: return success_res(code=4001, msg="您没有权限访问") total, events = await load_lg_sdu_events(cid, pg_num, pg_size, importance) if not events: return ListAlarmResponse(total=0, rows=[]) # 2. 构建返回数据 monitor_point_list = await monitor_point_company(cids) mp_map = {} for mp_list in monitor_point_list: if mp_list["pid"]: mp_map[mp_list["pid"]] = { "sid": mp_list["sid"], "mtid": mp_list["mtid"], "storey_name": mp_list["storey_name"], "room_name": mp_list["room_name"], "fullname": mp_list["fullname"], } rows = [] for event in events: point_id = event.get("pid") if point_id in mp_map.keys(): type = event.get("event_type") type_str = constants.SDU_EVENT_TYPE_MAP.get(type, type) point_id = event.get("pid") sid = mp_map.get(point_id).get("sid") date_time = event.get("event_datetime") dt = convert_to_dt(date_time) date_time = convert_dt_to_timestr(dt) event_duration = event.get("event_duration") storey_name = mp_map.get(point_id).get("storey_name") room_name = mp_map.get(point_id).get("room_name") company_name = mp_map.get(point_id).get("fullname") alarm = Alarm( es_id=event["id"], name=event.get("name"), importance=event.get("importance"), date_time=date_time, type=type, type_name=type_str, description=event.get("message"), event_duration=event_duration, company_name=company_name, point_id=point_id, storey_name=storey_name, room_name=room_name, storey_room_name=storey_name + room_name, sid=sid, mtid=event.get("mtid") ) rows.append(alarm) return ListAlarmResponse(total=total, rows=rows)