#!/usr/bin/python # -*- coding: utf-8 -*- import pandas as pd import numpy as np import copy import datetime class EssOptimizationTool(object): """储能优化工具 inline_var:{ "inline_capacity":1000, #进线容量,kVA, #"capacity_price":23.0, #容量电费 "max_demand_price":32.0, #需量电费 "section_s":{"price":0, "ttl_kwh": 0, "time_range": "14:00-17:00;19:00-22:00"}, "section_p":{"price":0.8888, "ttl_kwh": 0, "time_range": "14:00-17:00;19:00-22:00"}, "section_f":{"price":0.5503, "ttl_kwh": 0, "time_range": "8:00-14:00;17:00-19:00;22:00-24:00"}, "section_v":{"price":0.2900, "ttl_kwh": 0, "time_range":"00:00-8:00"} }#电量数据为完整月份计量中,峰时段最小电量的月份 ess_system:{ "cell_price":10000, #电池单价,元/kWh "pcs_price":10000, #pcs单价,元/kW "other_ttl_charge":0, #其他费用总价,元 "pcs_efficiency":90, #转换效率 "bat_efficiency":90, #充放电效率 "decay_rate":5.0, #衰减率 #"evaluate_year": "5-10", #评估年限 #"invest_income_rate":(15, 12, 10, 8, 6), #投资收益率 "DOD":90.0, #放电深度 "year_use_days": 330.0, #一年可利用时间 "charge_C_rate":0.5, #充放电倍率 #"res_value_bat":30, #电池残值 #"loop_time":3000-5000 #循环次数 }, df_curve:pandas.Series """ ess_params = { "evaluate_year": "5-10", # 评估年限 "invest_income_rate": (15, 12, 10, 8, 6), # 投资收益率 "res_value_bat": 30, # 电池残值 "loop_time": 3000 - 5000 # 循环次数 } def __init__(self, inline_var, ess_system, df_curve): self.inline_var = inline_var self.ess_system = self._update_system_params(ess_system) self.df_curve = df_curve self.time_sections = self._check_time_section() self.curve = self._merge_curve(df_curve) self.capacity = 0 self.delta_price = self.price_gap() self.flag = self._check_peak_valley_shifting() self.economic_evaluate = 0 self.opt_curve = 0 self.opt_analysis = 0 def price_gap(self): if "section_s" in self.inline_var.keys(): delta_price = (self.inline_var["section_s"]["price"] - self.inline_var["section_v"]["price"]) else: delta_price = (self.inline_var["section_p"]["price"] - self.inline_var["section_v"]["price"]) return delta_price def _update_system_params(self, data_dict): data = copy.deepcopy(data_dict) data.update(self.ess_params) return data def _merge_curve(self, df_curve): df = copy.deepcopy(self.time_sections) # index,quarter_time,spfv_flag df["time_point"] = df["quarter_time"].apply( lambda x: datetime.datetime.strftime(x, "%H:%M:%S")) load_curve = copy.deepcopy(df_curve) load_curve["time_point"] = load_curve["quarter_time"].apply( lambda x: datetime.datetime.strftime(x, "%H:%M:%S")) df_new = pd.merge(df[["time_point", "spfv_flag"]], load_curve[["time_point", "p"]], how="left", on="time_point") df_new.rename(columns={"p": "load_curve"}, inplace=True) return df_new def time_15min_parse(self, time_range): time_sections = time_range.split(";") sections = [] for section in time_sections: section_range = section.split("-") start_time = pd.Timestamp(section_range[0]) if section_range[1] == "24:00": section_range[1] = "00:00" stop_time = pd.Timestamp(section_range[1]) + pd.Timedelta( days=1) else: stop_time = pd.Timestamp(section_range[1]) current_time = start_time # + pd.Timedelta(seconds=900) while True: if current_time < stop_time: sections.append(current_time) current_time += pd.Timedelta(seconds=900) else: break return sections def _check_time_section(self): param = {"section_s": 3, "section_p": 2, "section_f": 1, "section_v": 0} hour = [] spfv_flag = [] for key in self.inline_var.keys(): if key not in ["inline_capacity", "capacity_price", "max_demand_price"]: time_point = self.time_15min_parse( self.inline_var[key]["time_range"]) hour.extend(time_point) num_15mins = len(time_point) spfv_flag.extend([param[key]] * num_15mins) time_section = pd.DataFrame( {"quarter_time": hour, "spfv_flag": spfv_flag}) time_section = time_section.sort_values(by="quarter_time") time_section.index = range(96) return time_section def _check_peak_valley_shifting(self): flag = False time_sections = copy.deepcopy(self.time_sections) len_peak = (time_sections["spfv_flag"] >= 2).sum() len_smooth = (time_sections["spfv_flag"] == 1).sum() len_valley = (time_sections["spfv_flag"] == 0).sum() df = copy.deepcopy(self.curve) df = df.set_index("time_point") load_max = df.loc[df["spfv_flag"] >= 2, "load_curve"].max() if "section_s" in self.inline_var.keys(): peak_kwh = self.inline_var["section_s"]["ttl_kwh"] + \ self.inline_var["section_p"]["ttl_kwh"] else: peak_kwh = self.inline_var["section_p"]["ttl_kwh"] pu_peak_kwh = peak_kwh / len_peak pu_smooth_kwh = self.inline_var["section_f"]["ttl_kwh"] / len_smooth pu_valley_kwh = self.inline_var["section_v"]["ttl_kwh"] / len_valley pu_avg = (peak_kwh + self.inline_var["section_f"]["ttl_kwh"] + self.inline_var["section_v"]["ttl_kwh"]) / ( len_peak + len_smooth + len_valley) if self.delta_price > 0.5 and pu_peak_kwh >= pu_avg: capacity = {} flag = True capacity["PCS"] = pu_peak_kwh - pu_avg capacity["PCS"] = capacity["PCS"] if capacity["PCS"] >= 0 else 0 capacity["battery"] = (capacity["PCS"] / ( self.ess_system["charge_C_rate"] * self.ess_system[ "DOD"] / 100.0 * self.ess_system["bat_efficiency"] / 100.0 * self.ess_system["pcs_efficiency"] / 100.0 + 0.00001)) capacity["battery"] = capacity["battery"] if capacity[ "battery"] >= 0 else 0 self.capacity = capacity return flag def cal_opt_curve(self): opt_curve = {} df = copy.deepcopy(self.curve) df = df.set_index("time_point") delta_P = 0 load_max = df.loc[df["spfv_flag"] >= 2, "load_curve"].max() pload = df.loc[df["spfv_flag"] >= 2, "load_curve"] while delta_P <= self.capacity["PCS"]: if (pload[pload >= load_max - delta_P] - ( load_max - delta_P)).sum() * 0.25 < self.capacity[ "battery"] * self.ess_system["DOD"] / 100: delta_P += 10 else: break smooth_max = load_max - delta_P peak_value = df.loc[df["spfv_flag"] >= 2, ["load_curve"]] shift_kwh = (peak_value.loc[peak_value[ "load_curve"] >= smooth_max, "load_curve"] - smooth_max).sum() peak_value.loc[ peak_value["load_curve"] >= smooth_max, "load_curve"] = smooth_max smooth_value = df.loc[df["spfv_flag"] == 1, ["load_curve"]] valley_value = df.loc[df["spfv_flag"] == 0, ["load_curve"]] delta_P = valley_value["load_curve"].min() shift_valley = 0 while shift_valley <= shift_kwh: delta_kwh = delta_P - valley_value["load_curve"] shift_valley = delta_kwh[delta_kwh >= 0].sum() delta_P += 10 # valley_value.loc[:,:] = (valley_value["load_curve"].sum() + shift_kwh)/((df["spfv_flag"]==0).sum()+0.0001) valley_value[valley_value <= delta_P] = delta_P try: valley_value.loc[valley_value["load_curve"] >= self.inline_var[ "inline_capacity"], "load_curve"] = self.inline_var[ "inline_capacity"] except Exception as e: pass psv_value = pd.concat([valley_value, smooth_value, peak_value], ignore_index=False) df["load_bat_curve"] = psv_value df["bat_curve"] = df["load_curve"] - df["load_bat_curve"] opt_curve["quarter_time"] = df.index opt_curve["load_curve"] = df["load_curve"].apply( lambda x: self.fix_decimal_points(x, decimal_num=2)) opt_curve["bat_curve"] = df["bat_curve"].apply( lambda x: self.fix_decimal_points(x, decimal_num=2)) opt_curve["load_bat_curve"] = df["load_bat_curve"].apply( lambda x: self.fix_decimal_points(x, decimal_num=2)) opt_curve = pd.DataFrame(opt_curve) opt_curve = opt_curve.set_index("quarter_time") self.opt_curve = opt_curve def _peak_valley_shifting(self): rst = {} if self.flag: peak_valley_flag = True peak_valley_kwh = (self.capacity["battery"] * self.ess_system[ "DOD"] / 100.0 * self.ess_system["pcs_efficiency"] / 100.0 * self.ess_system["bat_efficiency"] / 100.0 * self.ess_system["year_use_days"] / 12.0) # 度/月 else: peak_valley_flag = False peak_valley_kwh = 0.0 rst["peak_valley_flag"] = peak_valley_flag rst["peak_valley_kwh"] = self.fix_decimal_points(peak_valley_kwh, decimal_num=2) return rst def _max_demand_control(self): rst = {} if self.flag: diff = (self.opt_curve["load_curve"].max() - self.opt_curve[ "load_bat_curve"].max()) try: if self.inline_var["inline_capacity"]: percent_flag = (diff / self.inline_var[ "inline_capacity"] > 0.01) else: percent_flag = (diff > 10) if percent_flag: max_demand_flag = True max_demand_benifit = diff * self.inline_var[ "max_demand_price"] else: max_demand_flag = False max_demand_benifit = 0.0 except Exception as e: max_demand_flag = False max_demand_benifit = 0.0 else: max_demand_flag = False max_demand_benifit = 0.0 rst["max_demand_flag"] = max_demand_flag rst["max_demand_benifit"] = self.fix_decimal_points(max_demand_benifit, decimal_num=2) return rst def _economic_operation(self): rst = {} if self.flag: diff = (self.opt_curve["load_curve"].max() - self.opt_curve[ "load_bat_curve"].max()) try: if self.inline_var["inline_capacity"]: percent_flag = (diff / self.inline_var[ "inline_capacity"] > 0.01) else: percent_flag = (diff > 10) if percent_flag: economic_operation_flag = True reduce_peak = diff reduce_load_factor = self.opt_curve[ "load_bat_curve"].max() / ( self.inline_var[ "inline_capacity"] + 0.001) else: economic_operation_flag = False reduce_peak = 0.0 reduce_load_factor = 0.0 except Exception as e: economic_operation_flag = False reduce_peak = 0.0 reduce_load_factor = 0.0 else: economic_operation_flag = False reduce_peak = 0.0 reduce_load_factor = 0.0 rst["economic_operation_flag"] = economic_operation_flag rst["reduce_peak"] = self.fix_decimal_points(reduce_peak, decimal_num=2) rst["reduce_load_factor"] = self.fix_decimal_points(reduce_load_factor, decimal_num=4) return rst def optimize_analysis(self): opt_analysis = {} opt_analysis["peak_valley"] = self._peak_valley_shifting() opt_analysis["max_demand"] = self._max_demand_control() opt_analysis["economic_operation"] = self._economic_operation() return opt_analysis @staticmethod def present_value_annuity(rate, years, ttl_invest): percent = rate / 100.0 f = (1.0 - 1.0 / (1.0 + percent) ** years) / (percent + 0.0) rst = ttl_invest / f return np.round(rst, 2) def invest_income_table(self, ttl_invest): years = self.ess_system["evaluate_year"].split("-") rate = np.array(self.ess_system["invest_income_rate"]) df_dict = {} for year in range(int(years[0]), int(years[1]) + 1): df_dict[str(year)] = self.present_value_annuity(rate, year, ttl_invest) df = pd.DataFrame(df_dict, index=[str(i) + "%" for i in rate]) return df def cal_economic_evaluate(self): economic_value = {} invest_bat = self.ess_system["cell_price"] * self.capacity["battery"] invest_pcs = self.ess_system["pcs_price"] * self.capacity["PCS"] economic_value["ttl_invest"] = invest_bat + invest_pcs + \ self.ess_system["other_ttl_charge"] economic_value["year_use_days"] = self.ess_system["year_use_days"] economic_value["peak_valley_year_income"] = ( self.opt_analysis["peak_valley"]["peak_valley_kwh"] * (self.inline_var["section_p"]["price"] - self.inline_var["section_v"]["price"]) * 12.0) economic_value["max_demand_year_income"] = \ self.opt_analysis["max_demand"]["max_demand_benifit"] * 12.0 economic_value["ttl_income"] = economic_value[ "peak_valley_year_income"] + \ economic_value["max_demand_year_income"] economic_value["static_invest_years"] = economic_value["ttl_invest"] / ( economic_value["ttl_income"] + 0.0001) economic_value["invest_income_table"] = self.invest_income_table( economic_value["ttl_invest"]) return economic_value @staticmethod def fix_decimal_points(v, decimal_num=4): """ 将浮点型的值固定小数位, 默认保留4位小数位 :param v: 浮点型的值 """ rlt = v fmt = "%.df" fmt = fmt.replace('d', str(decimal_num)) if isinstance(v, float): s = fmt % v rlt = float(s) return rlt def fix_output_point(self): for key in self.capacity.keys(): self.capacity[key] = self.fix_decimal_points(self.capacity[key], decimal_num=2) for key in self.economic_evaluate.keys(): if key != "invest_income_table": self.economic_evaluate[key] = self.fix_decimal_points( self.economic_evaluate[key], decimal_num=2) def output(self): if self.flag: self.cal_opt_curve() self.opt_analysis = self.optimize_analysis() self.economic_evaluate = self.cal_economic_evaluate() self.fix_output_point() if __name__ == "__main__": # file = r"D:\工作资料\算法\load_curve.xlsx" # df_curve = pd.read_excel(file) import os path = "D:/工作资料/算法/优化工具/test" os.chdir(path) inline_var = { "inline_capacity": 40000.0, # 进线容量,kVA, # "capacity_price": 23.0, #容量电费 "max_demand_price": 32.0, # 需量电费 # "section_s":{"price":0, "ttl_kwh", 0, "time_range": "14:00-17:00;19:00-22:00"}, "section_p": {"price": 0.8286, "ttl_kwh": 122503.30078125, "time_range": "14:00-17:00;19:00-22:00"}, "section_f": {"price": 0.5022, "ttl_kwh": 225055.927734375, "time_range": "08:00-14:00;17:00-19:00;22:00-24:00"}, "section_v": {"price": 0.2511, "ttl_kwh": 69800.4599609375, "time_range": "00:00-8:00"} } # 电量数据为完整月份计量中,峰时段最小电量的月份 ess_system = { "cell_price": 2000, # 电池单价,元/kWh "pcs_price": 300, # pcs单价,元/kW "other_ttl_charge": 0, # 其他费用总价,元 "pcs_efficiency": 95, # pcs转换效率 "bat_efficiency": 95, # 充放电效率 "decay_rate": 5.0, # 衰减率 # "evaluate_year": "5-10", #评估年限 # "invest_income_rate": (15, 12, 10, 8, 6), #投资收益率 "DOD": 70.0, # 放电深度 "year_use_days": 330.0, # 一年可利用时间 "charge_C_rate": 0.8, # 充放电倍率 # "res_value_bat": 30, #电池残值 # "loop_time": "3000-5000" #循环次数 } df_load = {} df_load["quarter_time"] = pd.date_range("2019-01-01", "2019-01-02", freq="0.25H")[:-1] df_load["p"] = ([2000.0] * 24 + [2050.0, 2100.0, 2150.0, 2200.0] + [2650.0, 3100.0, 3550.0, 4000.0] + [ 4500.0, 5200.0, 6000.0] + [6800.0, 7900.0, 8600.0, 8800.0] + [9200.0, 8500.0, 7700.0, 7200.0] + [7000.0, 6900.0, 6800.0, 6700.0] + list( np.arange(6500.0, 7501.0, 125.0)) + list( np.arange(7500.0, 7001.0, -62.5)) + list( np.arange(7000.0, 4001.0, -375.0)) + list( np.arange(4000.0, 2001.0, -83.3))) df_curve = pd.DataFrame(df_load) # df_curve = pd.read_csv("load1.csv", index_col=0) df_curve.loc[:, "quarter_time"] = pd.to_datetime( df_curve.loc[:, "quarter_time"]) df_curve["p"].plot() import time t1 = time.time() obj = EssOptimizationTool(inline_var, ess_system, df_curve) obj.output() print("runtime", time.time() - t1) try: import matplotlib.pyplot as plt obj.opt_curve.plot() plt.show() except Exception: pass print( "**********************************************************************************************************") print("优化分析") print( "经济运行 存在空间{economic_operation_flag},可降低峰荷{reduce_peak}kW,最高负载率降低为{reduce_load_factor}".format( **obj.opt_analysis["economic_operation"])) print("需量控制 存在空间{max_demand_flag},可降低用电成本{max_demand_benifit}元/月".format( **obj.opt_analysis["max_demand"])) print( "移峰填谷{peak_valley_flag} 可减少峰段电网用电月{peak_valley_kwh}度/月,消纳光伏用电费用需根据用户与光伏电站协商电价确定".format( **obj.opt_analysis["peak_valley"])) print( "***********************************************************************************************************") print("容量规模") print("PCS功率 {PCS}kW 电池容量 {battery}kWh".format(**obj.capacity)) print( "************************************************************************************************************") print("经济测算") print("投资额:{ttl_invest}元 折算天数:{year_use_days}天 \n" "移峰填谷年收益:{peak_valley_year_income}元 需量控制年收益:{max_demand_year_income}元 年总收益:{ttl_income}元 静态投资回报年限:{static_invest_years}年".format( **obj.economic_evaluate)) print(obj.economic_evaluate["invest_income_table"]) pd.set_option('display.max_rows', None) # print(obj.opt_curve)