import pendulum from unify_api.constants import POINT_LEVEL_MAP from unify_api.modules.common.components.common_cps import LevelResp from unify_api.modules.common.procedures.points import points_by_storeys from unify_api.modules.elec_charge.components.elec_charge_cps import KpResp from unify_api.modules.elec_charge.dao.elec_charge_dao import \ query_charge_aggs_points from unify_api.modules.electric.dao.electric_dao import \ monitor_point_join_by_points from unify_api.modules.home_page.procedures.count_info_pds import real_time_load from unify_api.utils.common_utils import round_2, division_two async def kwh_points_service(cid, start, end, storeys): """获取points电量""" # 1.根据storeys获取points信息 point_list = await points_by_storeys(storeys) # 获取point_id列表 points = [i.get("point_id") for i in point_list] # 2. es查询数据 es_res = await query_charge_aggs_points(start, end, points) es_res = {i["pid"]: i for i in es_res if es_res} # 3. 构造返回 kwh_data = {} # 构造返回 for info in point_list: storey_name = info.get("storey_name") storey_id = info.get("storey_id") point_id = info.get("point_id") room_name = info.get("room_name") # 初始化返回dic res_dic = { "room_name": room_name, "storey_id": storey_id, "point_id": point_id, } if point_id in es_res.keys(): kwh = round_2(es_res[point_id]["kwh"]) else: kwh = "" res_dic["value"] = kwh # 组装返回格式为dic if storey_name in kwh_data: kwh_data[storey_name].append(res_dic) else: kwh_data[storey_name] = [res_dic] # 转换成list格式, 可以按照storey_name排序 if kwh_data: # 房间排序, 并返回数据转化为list kwh_list = [{"name": key, "storey_id": value[0]["storey_id"], "room_data": sorted(value, key=lambda i: i["room_name"])} for key, value in kwh_data.items()] # 楼层排序 kwh_list = sorted(kwh_list, key=lambda x: x["storey_id"]) else: kwh_list = [] return KpResp(kwh_data=kwh_list) async def kwh_card_level_service(cid, point_list, start, end): """电量电费-卡片信息-level""" # 1. 获取每个point_id的详细信息 monitor_point_list = await monitor_point_join_by_points(point_list) # 2. es查询数据 es_res = await query_charge_aggs_points(start, end, point_list) es_res = {i["pid"]: i for i in es_res if es_res} # 3. 返回数据 ret_data = { "inline": [], "transformer": [], "feeder": [], "power_dist": [], "device": [] } for info in monitor_point_list: m_name = info.get("name") m_type = POINT_LEVEL_MAP[info.get("m_type")] point_id = info.get("pid") # 初始化返回dic res_dic = { "name": m_name, "point_id": point_id, "kwh": "", "charge": "", "price": "", } if point_id in es_res.keys(): kwh = round_2(es_res[point_id]["kwh"]) charge = round_2(es_res[point_id]["charge"]) price = round_2(division_two(charge, kwh)) res_dic["kwh"] = kwh res_dic["charge"] = charge res_dic["price"] = price ret_data[m_type].append(res_dic) return LevelResp( inline=ret_data["inline"], transformer=ret_data["transformer"], feeder=ret_data["feeder"], power_dist=ret_data["power_dist"], device=ret_data["device"], ) async def load_info_service(cid_list): # 实时负荷 cur_load = await real_time_load(cid_list) yesterday_dt = pendulum.now(tz="Asia/Shanghai").subtract(days=1) yes_load = await real_time_load(cid_list, yesterday_dt) load_percent = round((cur_load - yes_load) / yes_load, 2) if cur_load and yes_load else "" return cur_load, yes_load, load_percent