"""Electricity usage / charge statistics helpers (slot series builders)."""
from pot_libs.es_util.es_helper import process_sql_data_aiao
from pot_libs.logger import log
from unify_api.constants import SLOTS
from unify_api.utils.es_query_body import es_process, sql_time_process
from unify_api.utils.time_format import time_pick_transf, power_slots
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
    SlotValue
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.elec_charge.dao.elec_statistics_dao import \
    get_sum_by_interval, get_power_charge_dao

# Format of the ``start`` / ``end`` strings parsed for custom ranges.
_RANGE_TIME_FORMAT = 'YYYY-MM-DD HH:mm:ss'
# Custom ranges up to this many minutes (48 hours) are bucketed hourly.
_HOURLY_RANGE_LIMIT_MINUTES = 48 * 60


def _range_is_hourly(start, end):
    """Return True when the custom range start..end is at most 48 hours."""
    start_f = my_pendulum.from_format(start, _RANGE_TIME_FORMAT)
    end_f = my_pendulum.from_format(end, _RANGE_TIME_FORMAT)
    return (end_f - start_f).in_minutes() <= _HOURLY_RANGE_LIMIT_MINUTES


def interval_type(date_type, start, end):
    """Map a date type to the name of its aggregation interval.

    "day" -> "hour", "month" -> "day", "year" -> "month". Any other value
    is treated as a custom range: "hour" when the range spans at most 48
    hours, otherwise "day". ``start`` / ``end`` are only parsed for custom
    ranges and must then be 'YYYY-MM-DD HH:mm:ss' strings.
    """
    fixed = {"day": "hour", "month": "day", "year": "month"}
    if date_type in fixed:
        return fixed[date_type]
    return "hour" if _range_is_hourly(start, end) else "day"


def interval_type_to_mysql(date_type, start, end):
    """Map a date type to a date-format pattern used for SQL grouping.

    Same bucketing rules as ``interval_type``. The percent signs are
    doubled in the returned pattern.
    NOTE(review): presumably because the pattern is embedded in a query
    that goes through %-style parameter substitution -- confirm in the DAO.
    """
    hourly = '%%Y-%%m-%%d %%H:00:00'
    daily = '%%Y-%%m-%%d 00:00:00'
    fixed = {"day": hourly, "month": daily, "year": "%%Y-%%m-01 00:00:00"}
    if date_type in fixed:
        return fixed[date_type]
    return hourly if _range_is_hourly(start, end) else daily


def _round_or_blank(value):
    """Normalise one reading: 0 -> 0.0, missing/empty -> "", else 2 dp."""
    # Zero is a legitimate reading and must not be mistaken for "no data".
    if value == 0:
        return 0.0
    return round(value, 2) if value else ""


def _slot_series(slots, records, field):
    """Build a SlotValue holding ``field`` of ``records`` for every slot.

    Slots that have no record get "" as their value.
    """
    sv = SlotValue()
    sv.slots = slots
    sv.value = [
        _round_or_blank(records[slot].get(field)) if slot in records else ""
        for slot in slots
    ]
    return sv


def _power_table_and_filter(cid, point_id):
    """Return (table name, SQL filter) for a company or a single point.

    ``point_id == -1`` means "all points", i.e. the company-level table.
    """
    # NOTE(review): the ids are interpolated straight into the SQL text
    # because the DAO accepts a ready-made filter string. Callers must pass
    # trusted integer ids; consider a parameterised filter in the DAO.
    if point_id == -1:
        return "company_15min_power", f"cid = {cid}"
    return "point_15min_power", f"pid = {point_id}"


async def load_power_charge(cid_list, point_id, start, end, date_type,
                            proxy_slots=1):
    """Return (kwh, charge) SlotValue series aggregated per interval.

    :param cid_list: company ids, used when ``point_id`` is -1
    :param point_id: point id, or -1 for all points (company table)
    :param start: range start; parsed as 'YYYY-MM-DD HH:mm:ss' for
        custom ranges
    :param end: range end, same format as ``start``
    :param date_type: "day", "month", "year"; anything else is a custom
        range
    :param proxy_slots: unused here; kept for backward compatibility
    :return: tuple of two SlotValue objects sharing the same slots
    """
    if point_id == -1:  # all points selected
        table_name = "company_15min_power"
        sql_mid = "cid in %s"
    else:
        table_name = "point_15min_power"
        sql_mid = "pid = %s"
    # Convert date_type into the pattern used by the aggregation query.
    trans_type = interval_type_to_mysql(date_type, start, end)
    datas = await get_sum_by_interval(table_name, trans_type, start, end,
                                      sql_mid, cid_list, point_id)
    # Pick the slots and the matching key format for the query rows.
    if date_type == "day":
        slots = SLOTS[date_type]
        fmat = "HH:mm"
    elif date_type == "month":
        _, slots = time_pick_transf(start, end)
        fmat = "MM-DD"
    elif date_type == "year":
        _, slots = time_pick_transf(start, end)
        fmat = "YYYY-MM"
    else:  # custom range
        # power_slots also reports which slot type it produced so that the
        # row keys can be formatted the same way as the slots.
        slots, re_type = power_slots(start, end)
        fmat = "YYYY-MM-DD HH:mm" if re_type == "day" else "YYYY-MM-DD"
    es_re = es_process(datas, fmat=fmat, time_key="time_interval")
    kwh_sv = _slot_series(slots, es_re, "kwh")  # energy series
    charge_sv = _slot_series(slots, es_re, "charge")  # charge series
    return kwh_sv, charge_sv


async def power_charge_download(cid, point_id, start, end):
    """Return (kwh, charge) SlotValue series for a download of start..end.

    Rows are keyed by their ``create_time`` formatted as
    '%Y-%m-%d %H:%M' and matched against range slots.

    :param cid: company id, used when ``point_id`` is -1
    :param point_id: point id, or -1 for all points (company table)
    """
    table_name, sql_mid = _power_table_and_filter(cid, point_id)
    datas = await get_power_charge_dao(table_name, start, end, sql_mid)
    es_re = process_sql_data_aiao(datas, key="create_time",
                                  key_format="%Y-%m-%d %H:%M")
    _, slots = time_pick_transf(start, end, is_range=1)
    kwh_sv = _slot_series(slots, es_re, "kwh")  # energy series
    charge_sv = _slot_series(slots, es_re, "charge")  # charge series
    return kwh_sv, charge_sv


async def today_yesterday_load(cid, point_id, start, end, is_range=0):
    """Return the load ("p") SlotValue series for today / yesterday.

    :param cid: company id, used when ``point_id`` is -1
    :param point_id: point id, or -1 for all points (company table)
    :param is_range: when truthy, slots and row keys carry the full date
        ('%Y-%m-%d %H:%M'); otherwise only the time of day ('%H:%M')
    """
    table_name, sql_mid = _power_table_and_filter(cid, point_id)
    datas = await get_power_charge_dao(table_name, start, end, sql_mid)
    if is_range:
        _, slots = time_pick_transf(start, end, is_range)
        es_re = sql_time_process(datas, fmt="%Y-%m-%d %H:%M",
                                 time_key="create_time")
    else:
        _, slots = time_pick_transf(start, end)
        es_re = sql_time_process(datas, fmt="%H:%M",
                                 time_key="create_time")
    return _slot_series(slots, es_re, "p")


def aver_price(kwh_obj, charge_obj):
    """Derive the average price series: charge / kwh for every slot.

    A slot gets "" when its charge is blank, when its kwh is blank or
    zero, or when the division fails for any other reason.

    :param kwh_obj: SlotValue with the energy values
    :param charge_obj: SlotValue with the charge values
    :return: SlotValue with the same slots as ``kwh_obj``
    """
    sv = SlotValue()
    sv.slots = kwh_obj.slots
    charge_list = charge_obj.value
    kwh_list = kwh_obj.value
    price_list = []
    for idx, charge in enumerate(charge_list):
        kwh = kwh_list[idx] if idx < len(kwh_list) else ""
        # Blank or zero energy is an expected data condition, not an
        # error: report "no price" without logging a stack trace.
        if charge == "" or kwh == "" or kwh == 0:
            price_list.append("")
            continue
        try:
            price_list.append(round(charge / kwh, 2))
        except (ArithmeticError, TypeError) as e:
            log.exception(e)
            price_list.append("")
    sv.value = price_list
    return sv


def _slot_time_label(slot_list, index, add_one_index, in_2_day):
    """Format the time label of the slot at ``index``.

    Without ``add_one_index`` the slot itself is the label. Otherwise the
    label is an interval from this slot to the next one; the last slot
    has no successor and is closed with "24:00".
    """
    current = slot_list[index]
    if not add_one_index:
        # Year / month views: the slot text (e.g. 01-20) is the label.
        return current
    has_next = index + 1 < len(slot_list)
    if in_2_day:
        # Custom range within two days: the first five characters (the
        # year and its dash) are stripped from both ends of the interval.
        if has_next:
            return current[5:] + " - " + slot_list[index + 1][5:]
        return current[5:] + " - " + current[5:10] + " 24:00"
    # Day view, e.g. 11:00-12:00.
    if has_next:
        return current + "-" + slot_list[index + 1]
    return current + "-24:00"


def max_min_time(li_sv, add_one_index=False, in_2_day=False):
    """Return the max and min values of a series and when they occurred.

    :param li_sv: SlotValue; blank ("") values are ignored
    :param add_one_index: format times as "slot-next slot" intervals
    :param in_2_day: with ``add_one_index``, use the two-day range format
    :return: (max value, max time, min value, min time); four empty
        strings when the series has no non-blank value
    """
    value_list = li_sv.value
    slot_list = li_sv.slots
    present = [v for v in value_list if v != ""]
    if not present:
        return "", "", "", ""
    max_v = max(present)
    min_v = min(present)
    # Positions must come from the unfiltered list: slots line up with
    # value_list, and blanks before the extreme would otherwise shift the
    # reported time to an earlier slot.
    max_index = value_list.index(max_v)
    min_index = value_list.index(min_v)
    max_time = _slot_time_label(slot_list, max_index, add_one_index,
                                in_2_day)
    min_time = _slot_time_label(slot_list, min_index, add_one_index,
                                in_2_day)
    return max_v, max_time, min_v, min_time