#!/usr/bin/python # -*- coding: utf-8 -*- import pandas as pd import numpy as np import copy import datetime import numpy_financial as npf class EssEvaluateTool(object): """储能测算工具 price= { "epc_price": 1.4, #epc单价,元/Wh "bank_interest": 0.085, # 折现率 "capacity_price": 23.0, #容量电价 "max_demand_price": 32.0, #需量电价 "peak_valley_price": 0.9, #峰谷价差,元/kWh "peak_flat_price": 0.3, #峰平价差,元/kWh "kwh_subsidy": 0.3, #度电补贴,元/kWh #"section_s":{"time_range": "14:00-17:00;19:00-22:00"}, "section_f":{"time_range": "12:00-17:00;19:00-22:00"}, "section_p":{"time_range": "08:00-12:00;17:00-19:00;22:00-24:00"}, "section_v":{"time_range": "00:00-8:00"} } ess_system ={ "capacity": 40000.0, #工厂容量,kVA, "rule":1, # 一充一放或两充两放 "install_capacity":5000, # kWh,100-5000,000kWh "bat_efficiency": 0.95, #电池效率 "pcs_efficiency": 0.95, #pcs转换效率 "DOD": 0.9, #放电深度 "decay_rate": 0.03, #衰减率 "maintenance_ratio_per_year":0.02, # 年运维费用占静态投资额比例 "year_use_days": 330.0, #一年可利用时间 "evaluate_year": 10, #评估年限 "subsidy_year": 10, #补贴年限 #"invest_income_rate": (15, 12, 10, 8, 6), #投资收益率 "loop_time": 5000 #循环次数 } max_demand_var={"flag":True, "pmax":0} df_curve:pandas.Series """ def __init__(self, ess_system, price, max_demand_var, df_curve): self.ess_system = ess_system self.price = price self.max_demand_var = max_demand_var self.df_curve = df_curve self.all_index_sections = 0 self.sf_flag = False self.time_sections = self._check_time_section() self.curve = self._merge_curve(df_curve) self.invest_capacity = self.cal_invest_capacity() self.pcs_capacity = self.invest_capacity / 2 self.invest_charge = self.cal_invest_charge() self.maintenance_year_charge = self.cal_maintenance_year_charge() self.first_year_dc_kwh = self.cal_first_year_dc_kwh() self.month_average_dc_kwh = self.first_year_dc_kwh / 12 self.first_year_dc_benefit = self.cal_first_year_dc_benefit() self.month_dc_benefit = self.first_year_dc_benefit / 12 self.first_year_subsidy = self.cal_first_year_subsidy() self.max_demand_benifit = self.cal_max_demand_benefit() self.evaluate_table = 0 self.opt_curve = 0 self.static_period = 0 self.dynamic_period = 0 self.total_subsidy = 0 self.irr = 0 def cal_invest_capacity(self): rst = self.ess_system["install_capacity"] if rst > self.ess_system["capacity"] / 2: rst = self.ess_system["capacity"] / 2 return rst def cal_invest_charge(self): rst = self.invest_capacity * self.price["epc_price"] * 1000 return rst def cal_maintenance_year_charge(self): rst = self.ess_system[ "maintenance_ratio_per_year"] * self.invest_charge return rst def cal_first_year_dc_benefit(self): kwh = self.first_year_dc_kwh if self.ess_system["rule"] <= 1.5: rst = self.price["peak_valley_price"] * kwh else: rst = (self.price["peak_valley_price"] + self.price[ "peak_flat_price"]) * kwh return rst def cal_first_year_dc_kwh(self): kwh = self.invest_capacity * self.ess_system["DOD"] * self.ess_system[ "pcs_efficiency"] * self.ess_system["bat_efficiency"] if self.ess_system["rule"] <= 1.5: rst = kwh * self.ess_system["year_use_days"] else: rst = kwh * self.ess_system["year_use_days"] return rst def cal_first_year_subsidy(self): kwh = self.first_year_dc_kwh rst = self.price["kwh_subsidy"] * kwh return rst def cal_max_demand_benefit(self): if self.max_demand_var["flag"]: rst = self.invest_capacity / 2 * self.price[ "max_demand_price"] * 12 else: base_charge = self.price["capacity_price"] * self.ess_system[ "capacity"] max_demand_charge = self.price["max_demand_price"] * ( self.max_demand_var["pmax"] - self.invest_capacity / 2) if base_charge < max_demand_charge: rst = 0 else: rst = (base_charge - max_demand_charge) * 12 return rst def _merge_curve(self, df_curve): df = copy.deepcopy(self.time_sections) # index,quarter_time,spfv_flag df["quarter_time"] = df["quarter_time"].apply( lambda x: datetime.datetime.strftime(x, "%H:%M:%S")) load_curve = copy.deepcopy(df_curve) load_curve["quarter_time"] = load_curve["quarter_time"].apply( lambda x: datetime.datetime.strftime(x, "%H:%M:%S")) df_new = pd.merge(df[["quarter_time", "spfv_flag"]], load_curve[["quarter_time", "p"]], how="left", on="quarter_time") df_new.rename(columns={"p": "load_curve"}, inplace=True) return df_new def time_15min_parse(self, time_range, key): time_sections = time_range.split(";") sections = [] str_sections1 = [] index_section = {} for section in time_sections: str_sections = [] section_range = section.split("-") start_time = pd.Timestamp(section_range[0]) if section_range[1] == "24:00": section_range[1] = "00:00" stop_time = pd.Timestamp(section_range[1]) + pd.Timedelta( days=1) else: stop_time = pd.Timestamp(section_range[1]) current_time = start_time # + pd.Timedelta(seconds=900) while True: if current_time < stop_time: sections.append(current_time) str_sections.append(current_time.strftime("%H:%M:%S")) current_time += pd.Timedelta(seconds=900) else: break str_sections1.append(str_sections) index_section[key] = str_sections1 return sections, index_section def _check_time_section(self): param = {"section_s": 3, "section_p": 2, "section_f": 1, "section_v": 0} hour = [] spfv_flag = [] all_index_sections = {} for key in self.price.keys(): if key in param.keys(): time_point, index_section = self.time_15min_parse( self.price[key]["time_range"], key) hour.extend(time_point) num_15mins = len(time_point) spfv_flag.extend([param[key]] * num_15mins) all_index_sections.update(index_section) self.all_index_sections = all_index_sections time_section = pd.DataFrame( {"quarter_time": hour, "spfv_flag": spfv_flag}) time_section = time_section.sort_values(by="quarter_time") time_section.index = range(96) index_sf = list(time_section["spfv_flag"]).index(1) if index_sf: if time_section["spfv_flag"][index_sf - 1] == 0: self.sf_flag = True return time_section def cal_opt_curve(self): df = copy.deepcopy(self.curve) df.set_index("quarter_time", inplace=True) df["bat_curve"] = 0 p_charge = 4 * self.invest_capacity / len( self.all_index_sections["section_v"][0]) if p_charge > self.invest_capacity / 2: p_charge = self.invest_capacity / 2 df.loc[self.all_index_sections["section_v"][0], "bat_curve"] = p_charge if self.ess_system["rule"] <= 1.5: if "section_s" in self.all_index_sections.keys(): p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_s"][0]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_s"][ 0], "bat_curve"] = p_discharge else: p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_p"][0]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_p"][ 0], "bat_curve"] = p_discharge else: p_charge = 4 * self.invest_capacity / len( self.all_index_sections["section_f"][0]) if p_charge > self.invest_capacity / 2: p_charge = self.invest_capacity / 2 if self.sf_flag: df.loc[self.all_index_sections["section_f"][ 1], "bat_curve"] = p_charge else: df.loc[self.all_index_sections["section_f"][ 0], "bat_curve"] = p_charge if "section_s" in self.all_index_sections.keys(): p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_s"][0]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_s"][ 0], "bat_curve"] = p_discharge p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_p"][0]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_p"][ 0], "bat_curve"] = p_discharge else: p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_p"][0]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_p"][ 0], "bat_curve"] = p_discharge p_discharge = -4 * self.invest_capacity / len( self.all_index_sections["section_p"][1]) if -p_discharge > self.invest_capacity / 2: p_discharge = -self.invest_capacity / 2 df.loc[self.all_index_sections["section_p"][ 1], "bat_curve"] = p_discharge df["load_bat_curve"] = df["load_curve"] + df["bat_curve"] self.opt_curve = df[["load_curve", "bat_curve", "load_bat_curve"]] def cal_evaluate_table(self): table_title = ["年份", "剩余容量", "固定成本", "累计充放数", "充放电收益", "补贴收益", "维护成本", "累计总成本", "本年收益", "累计收益"] decay_rate = ([0] + [self.ess_system["decay_rate"]] * ( self.ess_system["evaluate_year"] - 1)) bank_interest = [1 / (1 + self.price["bank_interest"])] * \ self.ess_system["evaluate_year"] charge_output = [- self.invest_charge] + [0] * ( self.ess_system["evaluate_year"] - 1) delta_year = max( self.ess_system["evaluate_year"] - self.ess_system["subsidy_year"], 0) subsidy_coef = [1] * self.ess_system["subsidy_year"] + [0] * delta_year data = pd.DataFrame( {"年份": range(1, (self.ess_system["evaluate_year"] + 1)), "年衰减率": decay_rate, "固定成本": charge_output, "折现率": bank_interest, "补贴系数": subsidy_coef}, columns=["年份", "年衰减率", "固定成本", "折现率", "补贴系数"]) data["剩余容量"] = 1 - data["年衰减率"] data["剩余容量"] = data["剩余容量"].cumprod() data["累计充放数"] = self.ess_system["year_use_days"] * self.ess_system[ "rule"] data["累计充放数"] = data["累计充放数"].cumsum() data["维护成本"] = -self.maintenance_year_charge index_last_temp = data["累计充放数"][ data["累计充放数"] > self.ess_system["loop_time"]] if len(index_last_temp) > 0: index_last = index_last_temp.index[0] data.loc[index_last, "累计充放数"] = self.ess_system["loop_time"] data.loc[index_last, "维护成本"] = (-( self.ess_system["loop_time"] - data.loc[ index_last - 1, "累计充放数"]) / self.ess_system["year_use_days"] / self.ess_system[ "rule"] * self.maintenance_year_charge) else: index_last = len(data) data.loc[ data["累计充放数"] > self.ess_system["loop_time"], ["剩余容量", "累计充放数", "维护成本"]] = 0 data["折现率"] = data["折现率"].cumprod() data["充放电收益"] = data["剩余容量"] * self.first_year_dc_benefit data["补贴收益"] = data["剩余容量"] * self.first_year_subsidy * data["补贴系数"] self.total_subsidy = data["补贴收益"].sum() data["累计总成本"] = (data["维护成本"] + data["固定成本"]).cumsum() data["本年收益"] = data["充放电收益"] + data["维护成本"] + data["补贴收益"] data["本年折现收益"] = data["本年收益"] * data["折现率"] data["累计收益"] = (data["本年收益"] + data["固定成本"]).cumsum() data["折现累计收益"] = (data["本年折现收益"] + data["固定成本"]).cumsum() static_temp = data["累计收益"][data["累计收益"] > 0] if len(static_temp) > 0: static_index = list(static_temp.index)[0] static_index = static_index - 1 if static_index > 0 else 0 self.static_period = data.loc[static_index, "年份"] - data.loc[ static_index, "累计收益"] / (data.loc[ static_index + 1, "本年收益"] + 0.001) else: self.static_period = -1 dynamic_temp = data["折现累计收益"][data["折现累计收益"] > 0] if len(dynamic_temp) > 0: dynamic_index = list(dynamic_temp.index)[0] dynamic_index = dynamic_index - 1 if dynamic_index > 0 else 0 self.dynamic_period = data.loc[dynamic_index, "年份"] - data.loc[ dynamic_index, "折现累计收益"] / (data.loc[ dynamic_index + 1, "本年折现收益"] + 0.001) else: self.dynamic_period = -1 self.irr = npf.irr([- self.invest_charge] + list(data["本年收益"].values)) return data.loc[:index_last, table_title] @staticmethod def fix_decimal_points(v, decimal_num=4): """ 将浮点型的值固定小数位, 默认保留4位小数位 :param v: 浮点型的值 """ rlt = v fmt = "%.df" fmt = fmt.replace('d', str(decimal_num)) if isinstance(v, float): s = fmt % v rlt = float(s) return rlt def output(self): self.input_param_process() self.cal_opt_curve() data = self.cal_evaluate_table() for key in data.columns: data[key] = data[key].apply(lambda x: self.fix_decimal_points(x)) for key in ["load_curve", "bat_curve"]: self.opt_curve[key] = self.opt_curve[key].apply( lambda x: self.fix_decimal_points(x)) self.evaluate_table = data def input_param_process(self): if self.ess_system["rule"] <= 1.5: self.ess_system["rule"] = 1 elif self.ess_system["rule"] > 1.5: self.ess_system["rule"] = 2 if self.ess_system["bat_efficiency"] <= 0: self.ess_system["bat_efficiency"] = 0.0 elif self.ess_system["bat_efficiency"] >= 1: self.ess_system["bat_efficiency"] = 1 if self.ess_system["pcs_efficiency"] <= 0: self.ess_system["pcs_efficiency"] = 0.0 elif self.ess_system["pcs_efficiency"] >= 1: self.ess_system["pcs_efficiency"] = 1 if self.ess_system["DOD"] <= 0: self.ess_system["DOD"] = 0.0 elif self.ess_system["DOD"] >= 1: self.ess_system["DOD"] = 1 if self.ess_system["decay_rate"] <= 0: self.ess_system["decay_rate"] = 0.0 elif self.ess_system["decay_rate"] >= 1: self.ess_system["decay_rate"] = 1 if self.ess_system["maintenance_ratio_per_year"] <= 0: self.ess_system["maintenance_ratio_per_year"] = 0.0 elif self.ess_system["maintenance_ratio_per_year"] >= 1: self.ess_system["maintenance_ratio_per_year"] = 1 if self.ess_system["subsidy_year"] >= self.ess_system["evaluate_year"]: self.ess_system["subsidy_year"] = self.ess_system["evaluate_year"] if __name__ == "__main__": pd.set_option('display.max_rows', 200) import os path = os.path.split(__file__)[0] os.chdir(path) price = { "epc_price": 1.4, # epc单价,元/Wh "bank_interest": 0.085, # 折现率 "capacity_price": 23.0, # 容量电费 "max_demand_price": 32.0, # 需量电费 "peak_valley_price": 0.9, # 峰谷价差,元/kWh "peak_flat_price": 0.3, # 峰平价差,元/kWh "kwh_subsidy": 0.1, # 度电补贴 "section_s": {"time_range": "10:00-12:00;17:00-18:00"}, "section_f": {"time_range": "12:00-13:00;19:00-22:00"}, "section_p": {"time_range": "13:00-17:00;18:00-19:00"}, "section_v": {"time_range": "00:00-10:00;22:00-24:00"} } ess_system = { "capacity": 40000.0, # 工厂容量,kVA, "rule": 2, # 一充一放或两充两放 "install_capacity": 5000, # kWh,100-5000,000kWh "bat_efficiency": 0.95, # 电池效率 "pcs_efficiency": 0.95, # pcs转换效率 "DOD": 0.9, # 放电深度 "decay_rate": 0.03, # 衰减率 "maintenance_ratio_per_year": 0.02, # 年运维费用占静态投资额比例 "year_use_days": 330.0, # 一年可利用时间 "evaluate_year": 7, # 评估年限 "subsidy_year": 10, # 补贴年限 # "invest_income_rate": (15, 12, 10, 8, 6), #投资收益率 "loop_time": 5000 # 循环次数 } max_demand_var = {"flag": True, "pmax": 0} df_load = {} df_load["quarter_time"] = pd.date_range("2019-01-01", "2019-01-02", freq="0.25H")[:-1] df_load["p"] = ([2000.0] * 24 + [2050.0, 2100.0, 2150.0, 2200.0] + [2650.0, 3100.0, 3550.0, 4000.0] + [ 4500.0, 5200.0, 6000.0] + [6800.0, 7900.0, 8600.0, 8800.0] + [9200.0, 8500.0, 7700.0, 7200.0] + [7000.0, 6900.0, 6800.0, 6700.0] + list( np.arange(6500.0, 7501.0, 125.0)) + list( np.arange(7500.0, 7001.0, -62.5)) + list( np.arange(7000.0, 4001.0, -375.0)) + list( np.arange(4000.0, 2001.0, -83.3))) df_curve = pd.DataFrame(df_load) # df_curve = pd.read_csv("load1.csv", index_col=0) df_curve.loc[:, "quarter_time"] = pd.to_datetime( df_curve.loc[:, "quarter_time"]) import time t1 = time.time() obj = EssEvaluateTool(ess_system, price, max_demand_var, df_curve) obj.output() print( "**********************************************************************************************************") print(obj.evaluate_table) print(obj.evaluate_table["累计充放数"]) print(obj.max_demand_benifit) print(obj.invest_capacity) print(obj.invest_charge) print(obj.static_period) print(obj.dynamic_period) print(obj.irr) print(obj.total_subsidy) print(obj.pcs_capacity) print(obj.month_average_dc_kwh) print(obj.month_dc_benefit)