from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.procedures.cids import get_cid_info
from unify_api.modules.common.procedures.power_cps import inline_power_use_info

# Single definition of the `inline` lookup shared by every entry point below.
_INLINE_SQL = (
    "select `inlid`, `name`, cid cid, "
    "`inline_tc`, `tc_runtime` from `inline` "
    "where cid in %s order by cid"
)


def _is_number(value):
    """Return True only when ``value`` is exactly an ``int`` or ``float``.

    This is the exact-type check the module has always applied before
    grading or summing a value (``bool``, ``None`` and other types fail it).
    """
    return type(value) in (int, float)


def _total_save_charge(rows, field="save_charge"):
    """Sum the non-negative numeric ``field`` of ``rows``, rounded to 2 places.

    Each contribution is rounded to 2 decimals before summing and the total
    is rounded again; rows whose value is not a plain number or is negative
    are ignored.
    """
    return round(
        sum(
            round(row[field], 2)
            for row in rows
            if _is_number(row[field]) and row[field] >= 0
        ),
        2,
    )


def _positive_or_zero(value):
    """Return ``value`` when it is greater than zero, otherwise ``0``.

    ``None`` (e.g. a NULL column) is treated as ``0`` instead of raising
    ``TypeError`` on the comparison.
    """
    if value is None:
        return 0
    return value if value > 0 else 0


def _sort_and_paginate(rows, sortable_fields, sort_field, sort_direction,
                       page_size, page_num):
    """Sort ``rows`` in place (when allowed) and return one page of them.

    :param rows: list of row dicts
    :param sortable_fields: field names that may be used as the sort key
    :param sort_field: requested sort key; ignored if not in
        ``sortable_fields``
    :param sort_direction: ``"desc"`` for descending, anything else ascending
    :param page_size: rows per page
    :param page_num: 1-based page number
    :return: ``{"rows": <page slice>, "total": <len(rows)>}``
    """
    if sort_field in sortable_fields:
        rows.sort(
            key=lambda row: row[sort_field],
            reverse=sort_direction == "desc",
        )
    start = page_size * (page_num - 1)
    return {
        "rows": rows[start: start + page_size],
        "total": len(rows),
    }


def _space_summary(total_cnt, big, proper, small, none, total_save_charge):
    """Build the result dict shared by the three "space" summaries.

    ``exits_space_cnt`` is the number of rows graded big, proper or small.
    """
    return {
        "total_cnt": total_cnt,
        "exits_space_cnt": big + proper + small,
        "big_space_cnt": big,
        "proper_space_cnt": proper,
        "small_space_cnt": small,
        "no_space_cnt": none,
        "total_save_charge": total_save_charge,
    }


async def get_inline_datas_dao(cids):
    """Load the ``inline`` rows belonging to ``cids``.

    :param cids: sequence of cid values; when empty no query is issued
    :return: ``(inline_map, inline_cid_map)`` where ``inline_map`` maps
        ``inlid`` to the full row and ``inline_cid_map`` maps ``inlid`` to
        its ``cid``
    """
    if not cids:
        return {}, {}
    async with MysqlUtil() as conn:
        inlines = await conn.fetchall(_INLINE_SQL, args=(cids,))
    inline_map = {row["inlid"]: row for row in inlines}
    inline_cid_map = {row["inlid"]: row["cid"] for row in inlines}
    return inline_map, inline_cid_map


async def get_power_factor_results(inline_ids, month_str):
    """Fetch ``algo_power_factor_result`` rows for the inlines and month.

    :param inline_ids: sequence of ``inlid`` values; when empty no query is
        issued and ``[]`` is returned
    :param month_str: value matched against the ``month`` column
    :return: list of rows with ``inlid``, ``cos``, ``pf_cost``,
        ``save_charge`` and ``kpi_x``
    """
    if not inline_ids:
        return []
    pf_sql = (
        "select inlid,cos,save_charge pf_cost,save_charge, kpi_x "
        " from algo_power_factor_result where inlid in %s "
        "and month = %s"
    )
    async with MysqlUtil() as conn:
        return await conn.fetchall(pf_sql, args=(inline_ids, month_str))


async def proxy_power_factor_summary(cids, month_str, inline_map=None):
    """Count power-factor results per grade and total the save charge.

    :param cids: cids used to look up inlines when ``inline_map`` is falsy
    :param month_str: month to summarise
    :param inline_map: optional pre-loaded ``inlid -> row`` map
    :return: dict with ``total_cnt``, ``unqualified_cnt``, the five
        ``*_level_cnt`` counters and ``total_save_charge``
    """
    if not inline_map:
        inline_map, _ = await get_inline_datas_dao(cids)
    inline_ids = list(inline_map.keys())
    power_factors = await get_power_factor_results(inline_ids, month_str)

    unqualified_cnt = 0
    first_level_cnt = 0
    second_level_cnt = 0
    third_level_cnt = 0
    fourth_level_cnt = 0
    fifth_level_cnt = 0
    for pf_res in power_factors:
        power_factor = pf_res["kpi_x"]
        if power_factor is None:
            # A missing kpi_x cannot be graded; log and skip it (as the other
            # summaries in this module do) instead of raising TypeError.
            log.error(f"pf_res={pf_res} kpi_x is missing, cannot grade power factor")
            continue
        if power_factor < 0.9:
            unqualified_cnt += 1
        if power_factor < 0.8:
            fifth_level_cnt += 1
        elif power_factor < 0.85:
            fourth_level_cnt += 1
        elif power_factor < 0.9:
            third_level_cnt += 1
        elif power_factor < 0.95:
            second_level_cnt += 1
        else:
            first_level_cnt += 1

    return {
        "total_cnt": len(power_factors),
        "unqualified_cnt": unqualified_cnt,
        "first_level_cnt": first_level_cnt,
        "second_level_cnt": second_level_cnt,
        "third_level_cnt": third_level_cnt,
        "fourth_level_cnt": fourth_level_cnt,
        "fifth_level_cnt": fifth_level_cnt,
        "total_save_charge": _total_save_charge(power_factors),
    }


async def proxy_power_factor_list(
        cids,
        month_str,
        page_size=10,
        page_num=1,
        sort_field="company_name",
        sort_direction="asc",
        status_list=None,
):
    """Return one page of per-inline power-factor rows.

    :param cids: cids whose inlines are listed
    :param month_str: month to list
    :param page_size: rows per page
    :param page_num: 1-based page number
    :param sort_field: one of ``company_name``, ``power_factor``,
        ``save_charge``; other values leave the rows unsorted
    :param sort_direction: ``"desc"`` for descending, otherwise ascending
    :param status_list: optional list of status strings to keep
    :return: ``{"rows": [...], "total": <row count before paging>}``
    """
    inline_map, inline_cid_map = await get_inline_datas_dao(cids)
    company_map = await get_cid_info(cids)
    power_factors = await get_power_factor_results(
        list(inline_map.keys()), month_str)

    all_list = []
    for pf_res in power_factors:
        # NOTE(review): a NULL kpi_x still raises here, as it always has;
        # confirm whether such rows should be skipped or shown as "-".
        status = "奖励" if pf_res["kpi_x"] >= 0.9 else "罚款"
        if status_list and status not in status_list:
            continue
        inline_id = pf_res["inlid"]
        cid = inline_cid_map[inline_id]
        all_list.append(
            {
                "cid": cid,
                "company_name": company_map[cid]["shortname"],
                "inline_id": inline_id,
                "inline_name": inline_map[inline_id]["name"],
                "power_factor": pf_res["kpi_x"],
                "status": status,
                "save_charge": _positive_or_zero(pf_res["save_charge"]),
            }
        )
    return _sort_and_paginate(
        all_list,
        ("company_name", "power_factor", "save_charge"),
        sort_field, sort_direction, page_size, page_num,
    )


async def get_md_space_result(inline_ids, month_str):
    """Fetch valid ``algo_md_space_analysis_result`` rows for the inlines.

    :param inline_ids: sequence of ``inlid`` values; when empty no query is
        issued and ``[]`` is returned (avoids sending an empty ``IN`` list)
    :param month_str: value matched against the ``month`` column
    :return: list of rows; the inline id is exposed as ``related_inlids``
    """
    if not inline_ids:
        return []
    sql = (
        "select `month`, `inline_md_charge`, `inline_tc_charge`, "
        "`inline_md_predict`, `save_charge`, `kpi_x`, "
        "inlid `related_inlids` "
        "from `algo_md_space_analysis_result` "
        "inner join`algo_md_space_analysis_unit`"
        " on `algo_md_space_analysis_result`.space_analysis_id=`algo_md_space_analysis_unit`.id "
        "where inlid in %s and `month` = %s and valid=1;"
    )
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql, args=(inline_ids, month_str))


async def proxy_md_space_list(
        cids,
        month_str,
        page_size=10,
        page_num=1,
        sort_field="company_name",
        sort_direction="asc",
        status_list=None,
):
    """Return one page of per-inline md-space rows.

    :param cids: cids whose inlines are listed
    :param month_str: month to list
    :param page_size: rows per page
    :param page_num: 1-based page number
    :param sort_field: ``company_name`` or ``save_charge``; other values
        leave the rows unsorted
    :param sort_direction: ``"desc"`` for descending, otherwise ascending
    :param status_list: optional list of status strings to keep
    :return: ``{"rows": [...], "total": <row count before paging>}``
    """
    inline_map, inline_cid_map = await get_inline_datas_dao(cids)
    company_map = await get_cid_info(cids)
    md_spaces = await get_md_space_result(list(inline_map.keys()), month_str)

    all_list = []
    for mdp_res in md_spaces:
        md_space_kpi_x = mdp_res.get("kpi_x")
        if _is_number(md_space_kpi_x):
            status = "有空间" if md_space_kpi_x > 0 else "无空间"
        else:
            status = "-"
        if status_list and status not in status_list:
            continue
        inline_id = mdp_res["related_inlids"]
        cid = inline_cid_map[inline_id]
        inline = inline_map[inline_id]
        all_list.append(
            {
                "cid": cid,
                "company_name": company_map[cid]["shortname"],
                "inline_id": inline_id,
                "inline_name": inline["name"],
                "inline_tc": inline["inline_tc"],
                "tc_runtime": inline["tc_runtime"],
                "status": status,
                "save_charge": _positive_or_zero(mdp_res["save_charge"]),
            }
        )
    return _sort_and_paginate(
        all_list,
        ("company_name", "save_charge"),
        sort_field, sort_direction, page_size, page_num,
    )


async def proxy_md_space_summary(cids, month_str, inline_map=None):
    """Count md-space results per space grade and total the save charge.

    Grades by ``kpi_x``: ``<= 0`` none, ``(0, 0.1]`` small, ``(0.1, 0.2]``
    proper, above that big. Rows whose ``kpi_x`` is not a plain number are
    logged and left out of the grade counters (but still in ``total_cnt``).

    :param cids: cids used to look up inlines when ``inline_map`` is falsy
    :param month_str: month to summarise
    :param inline_map: optional pre-loaded ``inlid -> row`` map
    :return: dict built by ``_space_summary``
    """
    if not inline_map:
        inline_map, _ = await get_inline_datas_dao(cids)
    md_spaces = await get_md_space_result(list(inline_map.keys()), month_str)

    big = proper = small = none = 0
    for mdp_res in md_spaces:
        md_space_kpi_x = mdp_res.get("kpi_x")
        if not _is_number(md_space_kpi_x):
            log.error(f"mdp_res={mdp_res} 无法判断需量空间")
            continue
        if md_space_kpi_x <= 0:
            none += 1
        elif md_space_kpi_x <= 0.1:
            small += 1
        elif md_space_kpi_x <= 0.2:
            proper += 1
        else:
            big += 1

    return _space_summary(
        len(md_spaces), big, proper, small, none,
        _total_save_charge(md_spaces),
    )


async def get_power_save_result(inline_ids, month_str):
    """Fetch ``algo_economic_operation_result`` rows for the inlines.

    :param inline_ids: sequence of ``inlid`` values; when empty no query is
        issued and ``[]`` is returned (avoids sending an empty ``IN`` list)
    :param month_str: value matched against the ``month`` column
    :return: list of rows with ``inlid``, ``cost_loss_percent``,
        ``mean_load_factor``, ``kpi_x`` and ``save_charge``
    """
    if not inline_ids:
        return []
    sql = (
        "select `inlid`, `cost_loss_percent`, `mean_load_factor`, "
        "`kpi_x`, `save_charge` from `algo_economic_operation_result` "
        "where `inlid` in %s and `month` = %s"
    )
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql, args=(inline_ids, month_str))


async def proxy_power_save_list(
        cids,
        month_str,
        page_size=10,
        page_num=1,
        sort_field="company_name",
        sort_direction="asc",
):
    """Return one page of per-inline economic-operation rows.

    :param cids: cids whose inlines are listed
    :param month_str: month to list
    :param page_size: rows per page
    :param page_num: 1-based page number
    :param sort_field: one of ``company_name``, ``avg_load_rate``,
        ``save_charge``; other values leave the rows unsorted
    :param sort_direction: ``"desc"`` for descending, otherwise ascending
    :return: ``{"rows": [...], "total": <row count before paging>}``
    """
    inline_map, inline_cid_map = await get_inline_datas_dao(cids)
    company_map = await get_cid_info(cids)
    power_save_res_list = await get_power_save_result(
        list(inline_map.keys()), month_str)

    all_list = []
    for power_save_res in power_save_res_list:
        inline_id = power_save_res["inlid"]
        cid = inline_cid_map[inline_id]
        inline = inline_map[inline_id]
        all_list.append(
            {
                "cid": cid,
                "company_name": company_map[cid]["shortname"],
                "inline_id": inline_id,
                "inline_name": inline["name"],
                "inline_tc": inline["inline_tc"],
                "tc_runtime": inline["tc_runtime"],
                "avg_load_rate": power_save_res["mean_load_factor"],
                "cos_loss_rate": power_save_res["cost_loss_percent"],
                "save_charge": _positive_or_zero(
                    power_save_res["save_charge"]),
            }
        )
    return _sort_and_paginate(
        all_list,
        ("company_name", "avg_load_rate", "save_charge"),
        sort_field, sort_direction, page_size, page_num,
    )


async def proxy_power_save_summary(
        cids, month_str, inline_map=None,
):
    """Count economic-operation results per space grade and total the charge.

    Grades by ``kpi_x``: ``<= 0.6`` none, ``(0.6, 0.7]`` small,
    ``(0.7, 0.8]`` proper, above that big. Rows whose ``kpi_x`` is not a
    plain number are logged and left out of the grade counters.

    :param cids: cids used to look up inlines when ``inline_map`` is falsy
    :param month_str: month to summarise
    :param inline_map: optional pre-loaded ``inlid -> row`` map
    :return: dict built by ``_space_summary``
    """
    if not inline_map:
        # For reuse and fewer DB round-trips: only look the inlines up when
        # the caller has not already supplied them.
        inline_map, _ = await get_inline_datas_dao(cids)
    power_save_res_list = await get_power_save_result(
        list(inline_map.keys()), month_str)

    big = proper = small = none = 0
    for power_save_res in power_save_res_list:
        kpi_x = power_save_res.get("kpi_x")
        if not _is_number(kpi_x):
            log.error(
                f"power_save_res = {power_save_res} kpi_x={kpi_x}无法评价经济运行空间")
            continue
        if kpi_x <= 0.6:
            none += 1
        elif kpi_x <= 0.7:
            small += 1
        elif kpi_x <= 0.8:
            proper += 1
        else:
            big += 1

    return _space_summary(
        len(power_save_res_list), big, proper, small, none,
        _total_save_charge(power_save_res_list),
    )


async def get_pcvf_result(inline_ids, month_str):
    """Fetch ``algo_plsi_result`` rows for the inlines and month.

    :param inline_ids: sequence of ``inlid`` values; when empty no query is
        issued and ``[]`` is returned (avoids sending an empty ``IN`` list)
    :param month_str: value matched against the ``month`` column
    :return: list of rows with ``inlid``, ``score`` and ``cost_save``
    """
    if not inline_ids:
        return []
    sql = (
        "select `inlid`, `score`, `cost_save` from `algo_plsi_result` "
        "where `inlid` in %s and `month` = %s"
    )
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql, args=(inline_ids, month_str))


async def proxy_pcvf_list(
        cids,
        month_str,
        page_size=10,
        page_num=1,
        sort_field="company_name",
        sort_direction="asc",
):
    """
    Peak-shaving / valley-filling index list.

    Each row carries the index score plus ``power``, ``charge`` and
    ``avg_price`` taken from ``inline_power_use_info`` (all ``0`` when that
    lookup has no entry for the inline).

    :param cids: cids whose inlines are listed
    :param month_str: month to list
    :param page_size: rows per page
    :param page_num: 1-based page number
    :param sort_field: one of ``company_name``, ``avg_price``,
        ``pcvf_index``, ``save_charge``; other values leave rows unsorted
    :param sort_direction: ``"desc"`` for descending, otherwise ascending
    :return: ``{"rows": [...], "total": <row count before paging>}``
    """
    inline_map, inline_cid_map = await get_inline_datas_dao(cids)
    company_map = await get_cid_info(cids)
    power_pcvfs = await get_pcvf_result(list(inline_map.keys()), month_str)

    all_list = []
    for power_pcvf in power_pcvfs:
        inline_id = power_pcvf["inlid"]
        cid = inline_cid_map[inline_id]
        all_list.append(
            {
                "cid": cid,
                "company_name": company_map[cid]["shortname"],
                "inline_id": inline_id,
                "inline_name": inline_map[inline_id]["name"],
                "pcvf_index": power_pcvf["score"],
                "save_charge": _positive_or_zero(power_pcvf["cost_save"]),
            }
        )

    all_inline_ids = [row["inline_id"] for row in all_list]
    if not all_inline_ids:
        return {"rows": [], "total": 0}

    power_info_map = await inline_power_use_info(all_inline_ids, month_str)
    for row in all_list:
        power = 0
        charge = 0
        avg_price = 0
        inline_id = row["inline_id"]
        if inline_id in power_info_map:
            charge = round(power_info_map[inline_id]["charge"], 2)
            power = round(power_info_map[inline_id]["kwh"], 2)
            # Guard against division by zero when no energy was recorded.
            avg_price = round(charge / power, 2) if power else 0
        row["avg_price"] = avg_price
        row["power"] = power
        row["charge"] = charge

    return _sort_and_paginate(
        all_list,
        ("company_name", "avg_price", "pcvf_index", "save_charge"),
        sort_field, sort_direction, page_size, page_num,
    )


async def proxy_pcvf_summary(
        cids, month_str, inline_map=None,
):
    """Count peak/valley results per space grade and total ``cost_save``.

    Grades by ``score``: ``>= 90`` none, ``(80, 90)`` small, ``(70, 80]``
    proper, ``<= 70`` big. Rows whose ``score`` is not a plain number are
    logged and left out of the grade counters.

    :param cids: cids used to look up inlines when ``inline_map`` is falsy
    :param month_str: month to summarise
    :param inline_map: optional pre-loaded ``inlid -> row`` map
    :return: dict built by ``_space_summary``
    """
    if not inline_map:
        inline_map, _ = await get_inline_datas_dao(cids)
    power_pcvf_res_list = await get_pcvf_result(
        list(inline_map.keys()), month_str)

    big = proper = small = none = 0
    for power_pcvf_res in power_pcvf_res_list:
        score = power_pcvf_res.get("score")
        if not _is_number(score):
            log.error(
                f"power_save_res = {power_pcvf_res} score={score}无法评价移峰填谷空间")
            continue
        if score >= 90:
            none += 1
        elif score > 80:
            small += 1
        elif score > 70:
            proper += 1
        else:
            big += 1

    return _space_summary(
        len(power_pcvf_res_list), big, proper, small, none,
        _total_save_charge(power_pcvf_res_list, field="cost_save"),
    )


async def proxy_electric_optimization_summary(cids, month_str):
    """Combine the four per-topic summaries into one overview.

    The inlines are loaded once and handed to every sub-summary so each of
    them can skip its own lookup.

    :param cids: cids to summarise
    :param month_str: month to summarise
    :return: dict keyed by ``power_factor``, ``md_space``, ``pcvf`` and
        ``power_save``, each holding ``can_optimiz_cnt`` and ``save_charge``
    """
    inline_map, _ = await get_inline_datas_dao(cids)
    inline_ids = list(inline_map.keys())

    # The sub-summaries only read their first argument when ``inline_map`` is
    # falsy; in that case ``inline_ids`` is empty too, so their fallback
    # lookup issues no query.
    power_factor_summary_map = await proxy_power_factor_summary(
        inline_ids, month_str, inline_map=inline_map
    )
    md_space_summary_map = await proxy_md_space_summary(
        inline_ids, month_str, inline_map=inline_map
    )
    power_save_summary_map = await proxy_power_save_summary(
        inline_ids, month_str, inline_map=inline_map
    )
    power_pcvf_summary_map = await proxy_pcvf_summary(
        inline_ids, month_str, inline_map=inline_map
    )

    return {
        "power_factor": {
            "can_optimiz_cnt": power_factor_summary_map["unqualified_cnt"],
            "save_charge": power_factor_summary_map["total_save_charge"],
        },
        "md_space": {
            "can_optimiz_cnt": md_space_summary_map["exits_space_cnt"],
            "save_charge": md_space_summary_map["total_save_charge"],
        },
        "pcvf": {
            "can_optimiz_cnt": power_pcvf_summary_map["exits_space_cnt"],
            "save_charge": power_pcvf_summary_map["total_save_charge"],
        },
        "power_save": {
            "can_optimiz_cnt": power_save_summary_map["exits_space_cnt"],
            "save_charge": power_save_summary_map["total_save_charge"],
        },
    }