from functools import reduce

from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.elec_charge.dao.elec_charge_dao import load_cmpy_charge
from unify_api.utils.common_utils import round_2, process_es_data, round_4
from unify_api.utils.time_format import last_time_str

# A day is described as 96 quarter-hour slots (indices 0..95).
_QUARTER_MINUTES = 15
_LAST_QUARTER_INDEX = 95


def quarters_trans(quarters):
    """Convert a per-quarter-hour tariff sequence into ranges and durations.

    Consecutive slots carrying the same tariff code are merged into one
    "HH:mm-HH:mm" range.

    Args:
        quarters: Sequence of tariff codes, one per 15-minute slot. Each code
            must be one of "s", "p", "f", "v" (presumably sharp / peak /
            flat / valley -- confirm with the tariff model). The trailing
            segment is only emitted when slot index 95 is reached, i.e. the
            function expects a full day of 96 slots.

    Returns:
        dict with, for each code ``c``: ``c`` -> list of "HH:mm-HH:mm" range
        strings, and ``c + "dt"`` -> accumulated duration in hours.

    Raises:
        KeyError: if a slot holds a code other than "s", "p", "f" or "v".
    """
    dic2 = {
        "sdt": 0, "pdt": 0, "fdt": 0, "vdt": 0,
        "s": [], "p": [], "f": [], "v": [],
    }
    start = my_pendulum.from_format("00:00", 'HH:mm')

    def close_segment(code, first, end):
        # `first` is the first slot of the segment, `end` is one past its
        # last slot, so the segment covers (end - first) quarter hours.
        dic2[code + "dt"] += (end - first) * _QUARTER_MINUTES / 60
        minute_s = start.add(minutes=first * _QUARTER_MINUTES).format("HH:mm")
        minute_e = start.add(minutes=end * _QUARTER_MINUTES).format("HH:mm")
        dic2[code].append(str(minute_s) + "-" + str(minute_e))

    current_code = ""
    segment_start = 0
    for index, value in enumerate(quarters):
        if index == 0:
            current_code = value
        if value != current_code:
            # Tariff changed: close the segment that ended just before this
            # slot and start a new one here.
            close_segment(current_code, segment_start, index)
            current_code = value
            segment_start = index
        if index == _LAST_QUARTER_INDEX:
            # End of day: close the segment that is still open. Its end is
            # 24:00, which is formatted as "00:00".
            close_segment(current_code, segment_start, index + 1)
    return dic2


async def load_proxy_power(cid_list):
    """Return the accumulated energy (kWh) of the given companies.

    Args:
        cid_list: Company ids to sum over.

    Returns:
        The summed ``kwh`` of ``company_1day_power`` rounded to 2 decimals,
        or 0 when there are no ids or no matching rows.
    """
    # An empty "IN ()" list is not valid SQL, and the answer is 0 anyway.
    if not cid_list:
        return 0
    sql = """
        select sum(kwh) kwh from company_1day_power where cid in %s
    """
    async with MysqlUtil() as conn:
        res = await conn.fetchone(sql=sql, args=(cid_list,))
    # SUM() over zero rows still yields one row whose value is NULL, so the
    # key exists but holds None; round(None, 2) would raise TypeError.
    kwh = res.get("kwh") if res else None
    return round(kwh, 2) if kwh is not None else 0


async def power_overview_proxy(date_start, date_end, cid_list):
    """Aggregate energy and charge per tariff code for the given companies.

    Args:
        date_start: Lower bound (inclusive) for ``create_time``.
        date_end: Upper bound (inclusive) for ``create_time``.
        cid_list: Company ids to aggregate over.

    Returns:
        Tuple ``(pv1, pv2)``: ``pv1`` maps each ``spfv`` code to its summed
        kWh, ``pv2`` maps each ``spfv`` code to its summed charge, both
        passed through ``round_2``. Both dicts are empty when nothing
        matches.
    """
    pv1 = {}  # energy per spfv code
    pv2 = {}  # charge per spfv code
    # An empty "IN ()" list is not valid SQL; nothing to aggregate.
    if not cid_list:
        return pv1, pv2
    sql = """
        select spfv,sum(p) as p,sum(kwh) as kwh,sum(charge) as charge
        from company_15min_power
        where cid in %s and create_time BETWEEN %s and %s
        group by spfv
    """
    async with MysqlUtil() as conn:
        res = await conn.fetchall(
            sql=sql, args=(cid_list, date_start, date_end))
    for info in res or []:
        spfv = info.get("spfv")
        pv1[spfv] = round_2(info.get("kwh"))
        pv2[spfv] = round_2(info.get("charge"))
    return pv1, pv2


def total_value(dict_total):
    """Add up all values of a per-tariff dict.

    Args:
        dict_total: Mapping whose values support ``+`` with each other.

    Returns:
        The values folded with ``+``. For an empty or None mapping the
        2-tuple ``("", "")`` is returned instead.
        NOTE(review): the empty case returns a tuple while the normal case
        returns a single value; kept as is because callers are not visible
        here -- confirm which shape they expect.
    """
    if not dict_total:
        return "", ""
    return reduce(lambda x, y: x + y, dict_total.values())


def _last_value_and_rate(last_by_cid, cid, field, current):
    """Look up the previous-period value of `field` and the growth rate.

    Returns ``(previous, rate)``, or ``(None, None)`` when the previous
    value is missing, zero or cannot be used in the calculation.
    """
    try:
        previous = last_by_cid[cid][field]
        # Check for zero/None BEFORE dividing so that "no previous data" is
        # an ordinary skip rather than a logged ZeroDivisionError.
        if not previous:
            return None, None
        return previous, round_4((current - previous) / previous)
    except (KeyError, TypeError, ValueError, ArithmeticError) as e:
        log.error(e)
        return None, None


async def power_aggs_cid_proxy(start, end, cid_list, date_type):
    """Per-company energy, charge and unit price with period-over-period rate.

    Loads the per-company totals for ``[start, end]`` and for the previous
    period (derived with ``last_time_str``), then builds three parallel
    ranking lists.

    A company is left out of all three lists when its current energy is 0,
    or when its previous-period energy, charge or unit price is missing or
    zero, because no growth rate can be computed for it.

    Args:
        start: Start of the current period.
        end: End of the current period.
        cid_list: Company ids to evaluate.
        date_type: Period type forwarded to ``last_time_str``.

    Returns:
        Tuple ``(kwh_list, charge_list, price_list)``; every entry is a dict
        ``{"name": <company shortname>, "value": ..., "rate": ...}``. Three
        empty lists are returned when the current period has no data.
    """
    # 1. Work out the previous period.
    start_last, end_last = last_time_str(start, end, date_type)
    # 2. Load per-company totals for both periods.
    re_this = await load_cmpy_charge(start, end, cid_list)
    re_last = await load_cmpy_charge(start_last, end_last, cid_list)
    if not re_this:
        log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
        return [], [], []
    re_last_dic = process_es_data(re_last, key="cid")
    # 3. Build the result lists.
    kwh_list = []
    charge_list = []
    price_list = []
    # 3.1 Map each cid to its company record to resolve display names.
    company_list = await company_by_cids(cid_list)
    com_dic = process_es_data(company_list, key="cid")
    for info in re_this:
        cid = info.get("cid")
        # Same fallback as power_index_cid_proxy: an unknown company must
        # not abort the whole ranking with a KeyError.
        cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
        kwh = round_2(info.get("kwh"))
        if kwh == 0:
            continue
        # Without usable previous-period data the company is not ranked.
        kwh_last, kwh_rate = _last_value_and_rate(
            re_last_dic, cid, "kwh", kwh)
        if kwh_last is None:
            log.info(
                f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
            continue
        charge = round_2(info.get("charge"))
        charge_last, charge_rate = _last_value_and_rate(
            re_last_dic, cid, "charge", charge)
        if charge_last is None:
            log.info(
                f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
            continue
        price = round_2(charge / kwh)
        price_last = round_2(charge_last / kwh_last)
        # A tiny previous charge can round to a unit price of 0, which
        # would make the rate below divide by zero.
        if price_last == 0:
            log.info(
                f"上周期单价为0, 不参与统计, cid:{cid}, start:{start}, end:{end}")
            continue
        price_rate = round_4((price - price_last) / price_last)
        kwh_list.append({"name": cid_name, "value": kwh, "rate": kwh_rate})
        charge_list.append(
            {"name": cid_name, "value": charge, "rate": charge_rate})
        price_list.append(
            {"name": cid_name, "value": price, "rate": price_rate})
    return kwh_list, charge_list, price_list


async def power_index_cid_proxy(start, end, cid_list, date_type):
    """Per-company energy, charge and unit price without growth rates.

    Reduced variant of ``power_aggs_cid_proxy``: only the current period is
    loaded and no previous-period comparison is made.

    Args:
        start: Start of the period.
        end: End of the period.
        cid_list: Company ids to evaluate.
        date_type: Unused here; kept so the signature matches
            ``power_aggs_cid_proxy``.

    Returns:
        Tuple ``(kwh_list, charge_list, price_list)``; every entry is a dict
        ``{"name": <company shortname or ''>, "value": ...}``. Companies
        whose energy is 0 are left out. Three empty lists are returned when
        there is no data.
    """
    # 1. Load per-company totals.
    res = await load_cmpy_charge(start, end, cid_list)
    if not res:
        log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
        return [], [], []
    # 2. Build the result lists.
    kwh_list = []
    charge_list = []
    price_list = []
    # 2.1 Map each cid to its company record to resolve display names.
    company_list = await company_by_cids(cid_list)
    com_dic = process_es_data(company_list, key="cid")
    for info in res:
        cid = info.get("cid")
        cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
        kwh = round_2(info.get("kwh"))
        charge = round_2(info.get("charge"))
        # Zero energy is excluded from the ranking (and would make the
        # unit price divide by zero).
        if kwh == 0:
            continue
        price = round_2(charge / kwh)
        kwh_list.append({"name": cid_name, "value": kwh})
        charge_list.append({"name": cid_name, "value": charge})
        price_list.append({"name": cid_name, "value": price})
    return kwh_list, charge_list, price_list