# -*- coding: utf-8 -*- """ calc distributed photovoltaic optimation for companies. This is a tornado process and responds request from web back server. """ import pendulum import datetime import pandas as pd from pot_libs.mysql_util.mysql_util import MysqlUtil from unify_api.constants import ENERGY_INVESTMENT_PV from unify_api.modules.energy_optimize.service.energy_store_optimize import \ AutoDic from unify_api.modules.energy_optimize.service.ess_utils import \ PricePolicyHelper from unify_api.modules.energy_optimize.service.pv_optimation_tool import \ PvOptimizationTool class PhotovoltaicOptimize(object): def __init__(self, inlid): self._inlid = inlid async def calc_inline(self, pv_params): rlt = {'rlt_flag': True} inl_info = await self._get_inline_info() inline_vc = inl_info['inline_vc'] cid = inl_info['cid'] city = await self._get_company_city(cid) df_pv = await self._construct_pv_curve(city) # construct df_pv if len(df_pv) == 0: rlt['rlt_flag'] = False rlt['message'] = "无光伏典型出力曲线,请联系工作人员!" return rlt max_dt = await self._find_kwh_max_day() # 00:00:00 of the max kwh day if not max_dt: rlt['rlt_flag'] = False rlt['message'] = '暂无' return rlt pp = await self._get_company_price_policy(cid) pp_info_d = PricePolicyHelper.map_price_policy(pp, inline_vc, max_dt.int_timestamp) time_str_d = PricePolicyHelper.quarter_chars_2_time_str( pp_info_d['quarters']) # construct pv_system price_type = pp_info_d['price_type'] pv_system = await self._construct_pv_system(pv_params, price_type) # construct price price = self._construct_price(pv_params, pp_info_d, time_str_d) # construct env_benifit env_benifit = self._construct_env_benifit() # construct inline_var tc_runtime = inl_info['tc_runtime'] inline_var = await self._construct_inline_var(tc_runtime) # construct df_load df_load = await self._construct_load_curve(max_dt) # logger.info('pv_system: %s', pv_system) # logger.info('price: %s', price) # logger.info('env_benifit: %s', env_benifit) # logger.info('inline_var: %s', inline_var) # logger.info('df_load: %s', df_load) # logger.info('df_pv: %s', df_pv) pv_ot = PvOptimizationTool(pv_system, price, env_benifit, df_load, df_pv, inline_var) pv_ot.output() # assemble return value rlt['install_cap'] = self._assemble_install_cap(pv_ot) rlt['invest_evaluate'] = self._assemble_invest_evaluate(pv_ot) rlt['opt_analysis'] = pv_ot.opt_analysis rlt['opt_curve'] = self._assemble_opt_curve(pv_ot) return rlt def _assemble_install_cap(self, pv_ot): install_cap = { 'capacity': pv_ot.invest_capacity["capacity"], 'install_space': pv_ot.pv_system["install_space"], 'ttl_invest': pv_ot.invest_capacity["ttl_invest"], 'first_year_ttl_kwh': pv_ot.invest_capacity["first_year_ttl_kwh"] } return install_cap def _assemble_invest_evaluate(self, pv_ot): cost_per_kwh = (pv_ot.price["rmb_per_wp"] - pv_ot.price["first_install_subsidy"]) invest_income = pv_ot.invest_evaluate["invest_income"] first_year_ttl_kwh = pv_ot.invest_capacity["first_year_ttl_kwh"] first_year_month_income = invest_income["first_year_month_income"] first_year_income = invest_income["first_year_income"] first_year_income_rate = invest_income["first_year_income_rate"] invest_income_year = invest_income["invest_income_year"] i_and_r = { 'user_type': pv_ot.pv_system["user_type"], 'ttl_invest': pv_ot.invest_capacity["ttl_invest"], 'capacity': pv_ot.invest_capacity["capacity"], 'cost_per_kwh': cost_per_kwh, 'install_space': pv_ot.pv_system["install_space"], 'peak_sunshine_hours': pv_ot.pv_system["peak_sunshine_hours"], 'self_use_ratio': pv_ot.pv_system["self_use_ratio"], 'self_use_price_discout': pv_ot.price["self_use_price_discout"], 'sel_use_per_kwh': pv_ot.price["sel_use_per_kwh"], 'local_subsidy': pv_ot.price["local_subsidy"], 'first_install_subsidy': pv_ot.price['first_install_subsidy'], 'state_subsidy': pv_ot.price["state_subsidy"], 'first_year_ttl_kwh': first_year_ttl_kwh, 'first_year_month_income': first_year_month_income, 'first_year_income': first_year_income, 'first_year_income_rate': first_year_income_rate, 'invest_income_year': invest_income_year } env_benifit = pv_ot.invest_evaluate["env_benifit_per_year"] c_footprint = { 'families': env_benifit['one_family_kwh'], 'tree': env_benifit['tree'], 'coal': env_benifit['Coal'], 'CO2': env_benifit['CO2'], 'SO2': env_benifit['SO2'], 'NOx': env_benifit['NOx'], 'smoke': env_benifit['Smoke'], 'H2O': env_benifit['H2O'] } invest_evaluate = {'i_and_r': i_and_r, 'carbon_footprint': c_footprint} return invest_evaluate def _assemble_opt_curve(self, pv_ot): rlt = [] for idx, row in pv_ot.opt_curve.iterrows(): tmpd = {} tmpd['quarter_time'] = idx tmpd['load_curve'] = row['load_curve'] tmpd['load_pv_curve'] = row['load_pv_curve'] tmpd['pv_curve'] = row['pv_curve'] rlt.append(tmpd) return rlt async def _construct_pv_system(self, pv_params, price_type): area = pv_params['install_space'] # ratio fixed, convert to decimal, web backend just pass us a # percent, we need to divide 100. ratio = pv_params['self_use_ratio'] / 100 sql = ("select pv.peak_sunshine_hours from inline i, company c, " "algo_distributed_pv_quick_check_list pv " "where i.inlid=%s and i.cid=c.cid and " "c.city=pv.city;") async with MysqlUtil() as conn: hours = await conn.fetchone(sql, (self._inlid,)) or {} hours = hours.get("peak_sunshine_hours") or 0 annual_hours = hours * 365 # peak_sunshine_hours means annual_peak_sunshine_hours, the name # in algorithm is misleading pv_system = { "user_type": price_type, # 工商业 "install_space": area, # 安装面积m2 "efficiency": 0.8, # 光伏发电效率 "first_year_decay_rate": 2.5, # 首年衰减率 "first10_year_decay_rate": 0.8, # 第2-10年衰减率 "other_year_decay_rate": 0.7, # 第11-25年衰减率 "evaluate_year": 25, # 评估年限 "self_use_ratio": ratio, # 自发自用比例 "peak_sunshine_hours": annual_hours # 年峰值日照小时数 } return pv_system def _construct_price(self, pv_params, pp_info_d, time_str_d): cons_p = pv_params['rmb_per_wp'] user_p = pv_params['sel_use_per_kwh'] user_p_dis = pv_params['self_use_price_discout'] first_sbsy = pv_params['first_install_subsidy'] local_sbsy = pv_params['local_subsidy'] local_year = pv_params['local_subsidy_year'] price_md = pp_info_d['price_md'] price_d = { "rmb_per_wp": cons_p, # 建设单价 "self_use_price_discout": user_p_dis, # 自发自用电价折扣 "sel_use_per_kwh": user_p, # 自发自用电价 # "state_subsidy": 0.42, # 国家补贴 "state_subsidy": 0.0, # 国家补贴 "local_subsidy": local_sbsy, # 地方补贴 "local_subsidy_year": local_year, # 地方补贴年限 "first_install_subsidy": first_sbsy, # 初始安装补贴 "coal_in_grid": 0.43, # 脱硫电价 "max_demand": price_md, "spfv_price": {} } sfpv_price = price_d['spfv_price'] if pp_info_d['price_s']: sct = self._construct_section('s', pp_info_d, time_str_d) sfpv_price['section_s'] = sct if pp_info_d['price_p']: sct = self._construct_section('p', pp_info_d, time_str_d) sfpv_price['section_p'] = sct if pp_info_d['price_f']: sct = self._construct_section('f', pp_info_d, time_str_d) sfpv_price['section_f'] = sct if pp_info_d['price_v']: sct = self._construct_section('v', pp_info_d, time_str_d) sfpv_price['section_v'] = sct return price_d def _construct_env_benifit(self): env_benifit_param = { "one_family_kwh": 3600, # 一户家庭一年用3600度电 "CO2": 0.592, # 二氧化碳排放kg/kWh "SO2": 0.0002, # 二氧化硫排放kg/kWh "NOx": 0.00019, # 氮氧化合物kg/kWh "Smoke": 0.00004, # 烟尘kg/kWh "Coal": 0.3076, # 煤耗kg/kWh "H2O": 0.06, # 纯净水m3/kWh "tree": 18.3 # 1棵树1年可吸收18.3千克CO2 } return env_benifit_param def _construct_section(self, p_char, pp_info_d, time_str_d): """ contruct section_x for price_d.""" section = {'price': pp_info_d['price_' + p_char]} time_range_str = ';'.join(time_str_d[p_char]) section['time_range'] = time_range_str return section async def _build_kwh_charge_sum_lastest_30(self, p_char): """ build es query sentance for get kwh sum and charge sum within lastest 30 days for specified p_char. """ sql = f""" select sum(kwh) kwh,sum(charge) charge from inline_15min_power where inlid = %s and spfv = %s and create_time >= % s and create_time < %s """ dt = pendulum.now() dt_1_month_ago = dt.subtract(days=30) start_time = dt_1_month_ago.format("YYYY-MM-DD HH:mm:ss") end_time = dt.format("YYYY-MM-DD HH:mm:ss") async with MysqlUtil() as conn: result = await conn.fetchone(sql, args=(self._inlid, p_char, start_time, end_time)) return result or {} async def _construct_inline_var(self, inline_tc): inline_var = {'inline_capacity': inline_tc} result = await self._build_kwh_charge_sum_lastest_30("s") # search_rlt = self._es.search(inline_15min_power_esindex, q) charge_s = result.get("charge") or 0 kwh_s = result.get("kwh") or 0 result = await self._build_kwh_charge_sum_lastest_30("p") # search_rlt = self._es.search(inline_15min_power_esindex, q) charge_p = result.get("charge") or 0 kwh_p = result.get("kwh") or 0 # add 's' and 'p', because algorithm needs these charge_sp = charge_s + charge_p kwh_sp = kwh_s + kwh_p inline_var['peak_charge'] = charge_sp inline_var['peak_kwh'] = kwh_sp result = await self._build_kwh_charge_sum_lastest_30("f") # search_rlt = self._es.search(inline_15min_power_esindex, q) charge_f = result.get("charge") or 0 kwh_f = result.get("kwh") or 0 inline_var['flat_charge'] = charge_f inline_var['flat_kwh'] = kwh_f return inline_var async def _build_load_curve(self, start_dt): end_dt = start_dt.add(days=1) start_time = start_dt.format("YYYY-MM-DD HH:mm:ss") end_time = end_dt.format("YYYY-MM-DD HH:mm:ss") sql = f""" select create_time,p from inline_15min_power where inlid = %s and create_time >= %s and create_time < %s order by create_time asc limit 100 """ async with MysqlUtil() as conn: results = await conn.fetchall(sql, args=(self._inlid, start_time, end_time)) return results or [] async def _construct_load_curve(self, start_dt): hits_list = await self._build_load_curve(start_dt) # hits_list is already sorted by quarter_time asc kw_list = [] for item in hits_list: kw_list.append({ 'quarter_time': item.get('create_time'), 'load_curve': item.get('p') }) df = pd.DataFrame(kw_list) return df async def _construct_pv_curve(self, city): sql = "select hour, p from algo_distributed_pv where city=%s " \ "order by hour asc" async with MysqlUtil() as conn: sql_rlt = await conn.fetchall(sql, args=(city,)) pv_list = [] for item in sql_rlt: dt = pendulum.datetime(2019, 1, 1, tz='Asia/Shanghai') dt = dt + item['hour'] # item['hour'] is a timedelta object qrt_dt = datetime.datetime(year=dt.year, month=dt.month, day=dt.day, hour=dt.hour, minute=dt.minute, second=dt.second) pv_list.append({'quarter_time': qrt_dt, 'pv_curve': item['p']}) df = pd.DataFrame(pv_list) return df async def _get_inline_info(self): """ get inline_vc, tc_runtime, cid from redis. :return: a dict """ sql = "SELECT inline_vc, tc_runtime, cid cid from " \ "inline where inlid = %s" async with MysqlUtil() as conn: info = await conn.fetchone(sql, args=(self._inlid,)) rlt = {'inline_vc': info['inline_vc'], 'tc_runtime': info['tc_runtime'], 'cid': info['cid']} return rlt async def _get_company_price_policy(self, cid): result = AutoDic() sql = 'SELECT * FROM price_policy where cid = %s' async with MysqlUtil() as conn: policies = await conn.fetchall(sql, (cid,)) for policy in policies: result[str(policy['inline_vc'])][str(policy['start_month'])][ policy['time_range']] = policy return result async def _get_company_city(self, cid): sql = "SELECT city from company where cid = %s" async with MysqlUtil() as conn: c_info = await conn.fetchone(sql, (cid,)) # company_j = self._r_cache.hget(company_hashname, str(cid)) # c_info = json.loads(company_j) return c_info['city'] async def _find_kwh_max_day(self): sql = f""" select create_time from inline_1day_power where inlid = %s and create_time >=%s and create_time < %s order by kwh desc limit 1; """ # search_rlt = self._es.search(inline_1day_power_esindex, q) dt = pendulum.now() dt_half_year_ago = dt.subtract(months=6) start_time = dt_half_year_ago.format("YYYY-MM-DD HH:mm:ss") end_time = dt.format("YYYY-MM-DD HH:mm:ss") async with MysqlUtil() as conn: result = await conn.fetchone(sql, args=(self._inlid, start_time, end_time)) if not result: return None max_dt = result.get("create_time").strftime("%Y-%m-%d %H:%M:%S") return pendulum.parse(max_dt) async def pv_out_result(inlid, params): """结果输出函数""" # get cid sql = "select cid from inline where inlid = %s" async with MysqlUtil() as conn: cid_info = await conn.fetchone(sql=sql, args=(inlid,)) cid = cid_info.get("cid") # get proxy_id sql = "select cpm.proxy from company c inner join company_proxy_map cpm " \ "on cpm.cid=c.cid where c.cid = %s" async with MysqlUtil() as conn: proxy_res = await conn.fetchone(sql, args=(cid,)) proxy_id = proxy_res["proxy"] sql = "insert into energy_investment_analysis_record " \ "(cid, analysis_type, inlid, proxy, time) " \ "values (%s, %s, %s, %s, %s)" ts = pendulum.now().int_timestamp async with MysqlUtil() as conn: await conn.execute(sql, args=( cid, ENERGY_INVESTMENT_PV, inlid, proxy_id, ts)) # handle request pv_optimize = PhotovoltaicOptimize(inlid) algo_rlt = await pv_optimize.calc_inline(params) return algo_rlt