"""Open-data services for the Longgang company (cid 223).

Exposes device basic info, latest/backfill time-series rows fetched over an
HTTP SQL endpoint, and alarm results, each guarded by a per-user permission
check.
"""
import base64
import json
import re

import pendulum

from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.common.components.responses import success_res
from pot_libs.settings import SETTING
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api import constants
from unify_api.modules.alarm_manager.components.list_alarm import (
    ListAlarmResponse, Alarm
)
from unify_api.modules.common.dao.common_dao import (
    meter_by_mids, monitor_point_storey_join_in, points_monitor_by_cid
)
from unify_api.modules.common.procedures.points import point_to_mid
from unify_api.modules.shidianu.components.open_data_cps import (
    BasicInfoResp, StbDataResp
)
from unify_api.modules.shidianu.dao.open_data_dao import (
    get_user_product_auth, result_longgang_by_cid, monitor_point_company
)
from unify_api.utils import time_format

# Company ids served by the Longgang endpoints.
_LONGGANG_CIDS = (223,)

# Data type -> database name appended to SETTING.stb_url.
_STB_DATABASES = {
    "soe": "db_soe",
    "electric": "db_electric",
    "appliance": "db_appliance",
}

# Value that must be present in a user's per-company grant list.
# NOTE(review): presumably a product/permission id - confirm with auth owner.
_REQUIRED_AUTH_CODE = 4

# Matches column headers of the form "last_row(<column>)".
_LAST_ROW_RE = re.compile(r'last_row\((.*)\)')


def _strip_last_row(column):
    """Return the inner column name of ``last_row(col)``, else *column*."""
    match = _LAST_ROW_RE.search(column)
    return match.group(1) if match else column


def _normalize_timestamps(rows, data_type):
    """Normalize the timestamp columns of *rows* in place.

    For ``"electric"`` rows the ``ts_received`` column is dropped; for every
    other type ``ts_origin`` is renamed to ``ts``.
    """
    if data_type == "electric":
        for row in rows:
            row.pop("ts_received", None)
    else:
        for row in rows:
            row["ts"] = row.pop("ts_origin")


async def basic_info_longgang_service(user_id, page_size, page_num):
    """Return one page of device basic info for the Longgang company.

    Args:
        user_id: id of the requesting user, used for the permission check.
        page_size: number of rows per page.
        page_num: 1-based page number.

    Returns:
        ``BasicInfoResp`` with the page rows and the total row count, or a
        ``success_res`` error (4001 no permission, 4009 no devices).
    """
    # 1. Permission check.
    cids = list(_LONGGANG_CIDS)
    if not await get_power(user_id, cids):
        return success_res(code=4001, msg="您没有权限访问")

    # 2. Load monitor points joined with storey info (first 3000 rows).
    monitor_points = await monitor_point_storey_join_in(cids, 1, 3000)
    if not monitor_points:
        return success_res(code=4009, msg="此厂还未装任何装置")
    total = len(monitor_points)

    page_start = (page_num - 1) * page_size
    page_end = page_start + page_size
    # ">=" (not ">"): a page starting exactly at `total` is empty too, so
    # skip the meter lookups below for it.
    if page_start >= total:
        return BasicInfoResp(rows=[], total=total)

    # 3. Resolve the phase label: point -> mid -> meter_no.
    pids = [point["pid"] for point in monitor_points]
    point_mid, p_num = await point_to_mid(pids)
    meter_list = await meter_by_mids(mids=point_mid.values())
    mid_meter_no = {meter["mid"]: meter["meter_no"] for meter in meter_list}

    rows = []
    for info in monitor_points[page_start:page_end]:
        pid = info["pid"]
        room_name = info["room_name"] or ''
        storey_name = info["storey_name"] or ''
        company_address = info["address"] or ''
        address = f"{company_address}{storey_name}{room_name}"

        # A point without a meter mapping gets an empty phase label instead
        # of failing the whole page with a KeyError.
        meter_no = mid_meter_no.get(point_mid.get(pid))
        phase = f"{meter_no}相" if meter_no else ''

        rows.append(
            {
                "cid": info["cid"],
                "pid": pid,
                "mtid": info["mtid"],
                "storey_id": info.get("storey_id") or '',
                "meter_no": phase,
                "equipment_address": address,
                "equipment_code": info["sid"],
                "root_address": address,
                "longitude": info.get("longitude", '') or '',
                "latitude": info.get("latitude", '') or '',
                "insurer": "黄锦成",
                "insurer_phone": "13580428480"
            }
        )
    return BasicInfoResp(rows=rows, total=total)


async def stb_data_longgang_service(user_id, type):
    """Return the latest row of every sub-table for the given data type.

    Args:
        user_id: id of the requesting user (permission + rate limiting).
        type: one of ``"soe"``, ``"electric"``, ``"appliance"``.

    Returns:
        ``StbDataResp`` with the rows, or a ``success_res`` error
        (4002 bad type, 4011 rate limited, 4001 no permission,
        4003 no data).
    """
    if type not in _STB_DATABASES:
        return success_res(code=4002, msg="type错误")

    # Rate limit: one successful call per user and type every 10 seconds.
    throttle_key = f"get_data_{type}:{user_id}"
    if await RedisUtils().get(throttle_key):
        return success_res(code=4011, msg="访问频繁,请10s后访问")

    # Permission check.
    cids = list(_LONGGANG_CIDS)
    if not await get_power(user_id, cids):
        return success_res(code=4001, msg="您没有权限访问")

    token = get_token()
    stb_url = f"{SETTING.stb_url}{_STB_DATABASES[type]}"
    # Query the super table matching the requested type. The table used to be
    # hard-coded to electric_stb even for the soe/appliance databases, whose
    # rows are expected to carry a ts_origin column (see below).
    sql = (f"select last_row(*) from {type}_stb "
           f"where cpyid = {cids[0]} group by tbname")
    resp_str, status = await AioHttpUtils().post_data(
        stb_url, data=sql, timeout=50,
        headers={"Authorization": f"Basic {token}"}
    )
    if not resp_str or status != 200:
        return success_res(code=4003, msg="未查找到数据")

    results = json.loads(resp_str)
    # Headers come back as "last_row(col)"; expose the plain column names.
    head = [_strip_last_row(column) for column in results["head"]]
    datas = [dict(zip(head, row)) for row in results["data"]]
    _normalize_timestamps(datas, type)
    total = results.get("rows", 0) or 0

    await RedisUtils().setex(throttle_key, 10, 1)
    return StbDataResp(rows=datas, total=total)


async def get_power(user_id, cids):
    """Check whether the user is granted access to every given company.

    Args:
        user_id: id of the user to check.
        cids: a single company id (``int``) or a ``list`` of company ids.

    Returns:
        ``True`` when the user's ``cid_ext`` grants contain
        ``_REQUIRED_AUTH_CODE`` for every cid, otherwise ``False``
        (including for unsupported ``cids`` types).
    """
    user_info = await get_user_product_auth(user_id)
    if isinstance(cids, int):
        cids = [cids]
    elif not isinstance(cids, list):
        return False
    if not user_info or not user_info.get("cid_ext"):
        return False

    # cid_ext is a JSON object keyed by the cid as a string.
    # NOTE(review): values are presumably lists of granted codes - confirm.
    cid_ext = json.loads(user_info["cid_ext"])
    for cid in cids:
        granted = cid_ext.get(str(cid), "")
        if not granted or _REQUIRED_AUTH_CODE not in granted:
            return False
    return True


def get_token():
    """Return the base64 ``user:password`` token for HTTP Basic auth."""
    user_password = f"{SETTING.td_user}:{SETTING.td_pwd}"
    return base64.b64encode(user_password.encode()).decode()


async def get_requests(token, stb_url, sql):
    """POST *sql* to *stb_url*; return the body on HTTP 200, else ``False``."""
    resp_str, status = await AioHttpUtils().post(
        stb_url, data=sql, timeout=50,
        headers={"Authorization": f"Basic {token}"}
    )
    if status == 200:
        return resp_str
    return False


async def supplement_data_service(user_id, cid, start, end, type):
    """Return historical rows of one company for a short time window.

    Args:
        user_id: id of the requesting user, used for the permission check.
        cid: company id to query.
        start: window start, formatted ``YYYY-MM-DD HH:mm:ss``.
        end: window end, formatted ``YYYY-MM-DD HH:mm:ss``.
        type: one of ``"soe"``, ``"electric"``, ``"appliance"``.

    Returns:
        ``StbDataResp`` with the rows, or a ``success_res`` error
        (4001 no permission, 4002 bad type, 4003 window longer than
        6 hours or no data, 4004 unparsable times, 4010 start older than
        one month).
    """
    if not await get_power(user_id, cid):
        return success_res(code=4001, msg="您没有权限访问")
    if type not in _STB_DATABASES:
        return success_res(code=4002, msg="type错误")

    now = pendulum.now()
    try:
        start = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
        end = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
        span_hours = (end - start).in_hours()
        age_months = (now - start).in_months()
    except Exception:
        # Any parse/arithmetic failure means the supplied times are unusable.
        # The error response must be returned; it used to be built and
        # dropped, letting the request continue with unparsed values.
        return success_res(code=4004, msg="开始时间或者结束时间错误")
    if span_hours > 6:
        return success_res(code=4003, msg="时间跨度不能大于6小时")
    # Reject windows starting a month or more ago, as the message states;
    # the comparison was inverted and rejected every recent query instead.
    if age_months >= 1:
        return success_res(code=4010, msg="查询日期不能大于1个月")

    token = get_token()
    stb_url = f"{SETTING.stb_url}{_STB_DATABASES[type]}"
    table_name = f"{type}_stb"
    # Electric rows are filtered on ts, all other types on ts_origin.
    time_column = "ts" if type == "electric" else "ts_origin"
    sql = f"select * from {table_name} where cpyid={cid} and " \
          f"{time_column} >= '{start}' and {time_column} <= '{end}'"
    resp_str, status = await AioHttpUtils().post_data(
        stb_url, data=sql, timeout=50,
        headers={"Authorization": f"Basic {token}"}
    )
    if not resp_str or status != 200:
        return success_res(code=4003, msg="未查找到数据")

    results = json.loads(resp_str)
    datas = [dict(zip(results["head"], row)) for row in results["data"]]
    # Strip/rename the timestamp columns in place; the row dicts themselves
    # must be kept (they used to be replaced by the popped values).
    _normalize_timestamps(datas, type)
    total = results.get("rows", 0) or 0
    return StbDataResp(rows=datas, total=total)


# Alarm results and analysis results
async def result_longgang_service(user_id, importance, page_size, page_num):
    """Return one page of alarm results for the Longgang company.

    Args:
        user_id: id of the requesting user, used for the permission check.
        importance: importance filter forwarded to the alarm query.
        page_size: number of rows per page.
        page_num: page number forwarded to the alarm query.

    Returns:
        ``ListAlarmResponse`` with the alarms whose point belongs to a known
        monitor point, or a ``success_res`` 4001 error without permission.
        ``total`` is capped at ``constants.ES_TOTAL_LIMIT``.
    """
    cids = list(_LONGGANG_CIDS)
    if not await get_power(user_id, cids):
        return success_res(code=4001, msg="您没有权限访问")

    es_res = await result_longgang_by_cid(cids, page_num, page_size,
                                          importance)
    hits = es_res["hits"]["hits"]
    if not hits:
        return ListAlarmResponse(total=0, rows=[])

    # Index monitor-point details by pid for the alarm rows below.
    monitor_point_list = await monitor_point_company(cids)
    mp_map = {
        mp["pid"]: {
            "sid": mp["sid"],
            "mtid": mp["mtid"],
            "storey_name": mp["storey_name"],
            "room_name": mp["room_name"],
            "fullname": mp["fullname"],
        }
        for mp in monitor_point_list if mp["pid"]
    }

    rows = []
    for hit in hits:
        source = hit["_source"]
        point_id = source.get("point_id")
        point = mp_map.get(point_id)
        if point is None:
            # Alarm for a point outside the known monitor points: skip it.
            continue

        event_type = source.get("type")
        dt = time_format.convert_to_dt(source.get("datetime"))
        storey_name = point["storey_name"]
        room_name = point["room_name"]
        rows.append(
            Alarm(
                es_id=hit["_id"],
                name=source.get("name"),
                importance=source.get("importance"),
                date_time=time_format.convert_dt_to_timestr(dt),
                type=event_type,
                type_name=constants.SDU_EVENT_TYPE_MAP.get(event_type,
                                                           event_type),
                description=source.get("message"),
                event_duration=source.get("event_duration"),
                company_name=point["fullname"],
                point_id=point_id,
                storey_name=storey_name,
                room_name=room_name,
                # Tolerate a missing storey/room name when concatenating.
                storey_room_name=f"{storey_name or ''}{room_name or ''}",
                sid=point["sid"],
                mtid=point["mtid"]
            )
        )

    total = min(es_res["hits"]["total"], constants.ES_TOTAL_LIMIT)
    return ListAlarmResponse(total=total, rows=rows)