import json
import time
from datetime import datetime, timedelta

import pendulum

from pot_libs.settings import SETTING
from unify_api.modules.electric.dao.electric_dao import load_add_to_compy_ids
from unify_api.utils.common_utils import round_2
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import Importance, SDU_ALARM_LIST
from unify_api.modules.alarm_manager.dao.list_static_dao import \
    alarm_aggs_importance, \
    sdu_alarm_importance_dao
from unify_api.modules.common.dao.common_dao import monitor_point_join
from unify_api.modules.common.procedures.common_utils import get_electric_index
from unify_api.modules.common.procedures.points import proxy_points, \
    get_points_num
from unify_api.modules.common.procedures.pttl_max import load_pttl_max
from unify_api.modules.home_page.components.count_info_cps import (
    MaxResidualCurrent,
    ElectricInfo,
)
from unify_api.utils.time_format import last30_day_range
from unify_api.modules.home_page.dao.count_info_dao import (
    get_inline_by_cid, get_power_factor_kpi, get_pcvf_kpi, get_economic_kpi,
    get_md_space, get_tc_runtime, compy_real_pf, compy_lst_month_pf
)
from unify_api.modules.electric_optimization.dao.power_index import (
    price_policy_by_cid
)
from unify_api.utils.taos_new import parse_td_columns, get_td_table_name, \
    td3_tbl_compate, get_td_engine_data
from unify_api.utils.time_format import CST


async def other_info(cid):
    """Return today's alarm count, safe-run days and total alarm count.

    :param cid: company id used to filter ``point_1min_event`` and
        ``company``.
    :return: tuple ``(today_alarm_count, safe_run_days, alarm_count)``.
    """
    today_alarm_count, alarm_count, has_alarm_days = 0, 0, 0
    # "%%" is a literal "%" because the statement is executed with args.
    sql = "SELECT DATE_FORMAT(event_datetime,'%%Y-%%m-%%d') event_date, " \
          "count(id) doc_count FROM `point_1min_event` where cid=%s " \
          "and event_mode='alarm' GROUP BY event_date ORDER BY event_date desc"
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(cid,))
    now_time = datetime.now()
    # Fetch the company's installation time (create_time).
    async with MysqlUtil() as conn:
        company_sql = "select create_time from company where cid = %s"
        company = await conn.fetchone(company_sql, (cid,))
    # NOTE(review): assumes a company row exists for cid and that
    # create_time is a unix timestamp - confirm against the schema.
    create_time_timestamp = company["create_time"]
    create_time = datetime.fromtimestamp(create_time_timestamp)
    if not datas:
        # A newly added company may have no events yet.
        # Safe-run days: now - installation time + 1.
        safe_run_days = (now_time - create_time).days + 1
        return today_alarm_count, safe_run_days, alarm_count
    # If every single day had an alarm, the subtraction below must not end
    # up at -1 safe days, hence total_days uses + 2.
    total_days = (now_time - create_time).days + 2
    today_str = str(now_time)[:10]
    for data in datas:
        if data["event_date"] == today_str:
            today_alarm_count = data["doc_count"]
        if data["doc_count"]:
            # Count of days that had at least one alarm.
            has_alarm_days += 1
        alarm_count += data["doc_count"]
    safe_run_days = total_days - has_alarm_days
    log.info(
        f"today_alarm_count={today_alarm_count} safe_run_days={safe_run_days}")
    return today_alarm_count, safe_run_days, alarm_count


def datetime_to_timestamp(dt):
    """Convert ``dt`` to a float timestamp via ``time.mktime`` (local time)."""
    ans_time = time.mktime(dt.timetuple())
    return ans_time


async def electric_use_info(cid):
    """Compute the electricity-safety index (section 1.5) for a company.

    Counts events of the last 30 days grouped by importance, once restricted
    to the scoring event types and once for all events.

    :param cid: company id.
    :return: ``ElectricInfo`` holding the per-importance alarm counts (all
        events), the alarm score (capped at 15) and the value produced by
        ``get_electric_index`` for that score.
    """
    now = str(datetime.now())
    start = str(datetime.now() - timedelta(30))
    score_events = ['overU', 'underTotalPF', 'underPhasePF', 'overTHDI',
                    'overTHDU', 'underU', 'unbalanceI', 'overPR',
                    'overResidualCurrent', 'unbalanceU', 'overI']
    score_sql = f"select importance, count(importance) doc_count from " \
                f"point_1min_event where cid=%s and event_datetime BETWEEN" \
                f"'{start}' and '{now}' and event_type in %s " \
                f"GROUP BY importance"
    alarm_sql = f"select importance, count(importance) doc_count from " \
                f"point_1min_event where cid=%s and event_datetime BETWEEN" \
                f"'{start}' and '{now}' " \
                f"GROUP BY importance"
    first_score, second_score, third_score = 0, 0, 0
    async with MysqlUtil() as conn:
        score_datas = await conn.fetchall(score_sql,
                                          args=(cid, score_events))
        alarm_datas = await conn.fetchall(alarm_sql, args=(cid,))
    for data in score_datas:
        if data["importance"] == Importance.First.value:
            first_score += data["doc_count"]
        elif data["importance"] == Importance.Second.value:
            second_score += data["doc_count"]
        elif data["importance"] == Importance.Third.value:
            third_score += data["doc_count"]
    point_len = await proxy_points([cid])
    # Weighted event count per point; 0 when there are no points.
    alarm_score = (
        (first_score * 2 + second_score * 1 + third_score * 0.5) / point_len
        if point_len else 0
    )
    alarm_score = 15 if alarm_score >= 15 else alarm_score
    electric_use_score = get_electric_index(alarm_score)
    first_alarm, second_alarm, third_alarm = 0, 0, 0
    for data in alarm_datas:
        if data["importance"] == Importance.First.value:
            first_alarm += data["doc_count"]
        elif data["importance"] == Importance.Second.value:
            second_alarm += data["doc_count"]
        elif data["importance"] == Importance.Third.value:
            third_alarm += data["doc_count"]
    return ElectricInfo(
        first_alarm_cnt=first_alarm,
        second_alarm_cnt=second_alarm,
        third_alarm_cnt=third_alarm,
        alarm_score=alarm_score,
        electric_use_score=electric_use_score,
    )


async def normal_rate_of_location(cid):
    """Return the compliance rates for temperature and residual current.

    A location counts as "normal" when its cached real-time value is fresh
    (not older than ``constants.REAL_EXP_TIME``) and below its configured
    threshold.

    :param cid: company id.
    :return: tuple ``(temp_qr, rc_qr)`` of percentage strings such as
        ``"87%"``; ``"100%"`` for a category without any location.
    """
    d_stats = {"residual_current": {"total": 0, "normal": 0},
               "temperature": {"total": 0, "normal": 0}, }
    sql = "select l.lid, l.ad_type type, s.threshold from " \
          "location l left join soe_config_record s on l.lid=s.lid " \
          "where l.cid = %s and s.enable=1 and l.ad_type in %s " \
          "and s.etype in %s;"
    async with MysqlUtil() as conn:
        locs = await conn.fetchall(sql, (cid,
                                         ("residual_current", "temperature"),
                                         ("overTemp", "overResidualCurrent")))
    loc_infos = {loc["lid"]: loc for loc in locs}
    lids = list(loc_infos.keys())
    if not lids:
        return "100%", "100%"
    prefix = f"real_time:adio:{SETTING.mysql_db}"
    keys = [f"{prefix}:{lid}" for lid in lids]
    rt_rlts = await RedisUtils().mget(keys)
    rt_adios = [json.loads(r) for r in rt_rlts if r]
    d_rt_adio = {adio["lid"]: adio for adio in rt_adios}
    now_ts = pendulum.now(tz=CST).int_timestamp
    for lid, d_loc in loc_infos.items():
        d_stats[d_loc["type"]]["total"] += 1
        # Locations without cached data or without a numeric threshold are
        # counted in "total" but never in "normal".
        if lid not in d_rt_adio:
            continue
        if not isinstance(d_loc["threshold"], (float, int)):
            continue
        try:
            d_adio = d_rt_adio[lid]
            if (now_ts - d_adio["ts"]) > constants.REAL_EXP_TIME:
                log.warn(f"adio_current location_id={lid} has expire!")
                continue
            if d_adio["v"] < d_loc["threshold"]:
                d_stats[d_loc["type"]]["normal"] += 1
        except Exception as e:
            # Best effort: a malformed cache entry must not break the page.
            log.exception(f"parse real time adio:{d_adio} exc, e:{e}")
    if d_stats["temperature"]["total"] == 0:
        temp_qr = "100%"
    else:
        norm = d_stats["temperature"]["normal"]
        total = d_stats["temperature"]["total"]
        temp_qr = str(round((norm / total) * 100)) + "%"
    if d_stats["residual_current"]["total"] == 0:
        rc_qr = "100%"
    else:
        norm = d_stats["residual_current"]["normal"]
        total = d_stats["residual_current"]["total"]
        rc_qr = str(round((norm / total) * 100)) + "%"
    return temp_qr, rc_qr


async def real_time_load(cid, end_dt=None):
    """Return the company's real-time load.

    Sums the latest ``pttl`` value of every table derived from the ids
    returned by ``load_add_to_compy_ids`` within the 15 minutes before
    ``end_dt``.

    :param cid: company id.
    :param end_dt: end of the query window; defaults to now in ``CST``.
    :return: the summed ``pttl``; ``0`` when there are no ids; ``""`` when
        the time-series query fails.
    """
    td_tbls = []
    add_to_compy_ids = await load_add_to_compy_ids(cid)
    if not add_to_compy_ids:
        return 0
    for item in add_to_compy_ids:
        mtid, sid = item["mtid"], item["sid"]
        tbl = get_td_table_name("electric", mtid)
        td_tbls.append(tbl)
        td_tbls.append(f"s_{sid.lower()}_e")
    td_mt_tables = td3_tbl_compate(td_tbls)
    if not end_dt:
        end_dt = pendulum.now(tz=CST)
    s_dt = end_dt.subtract(minutes=15)
    sql = f"SELECT last_row(mdptime, pttl) FROM electric_stb " \
          f"WHERE TBNAME IN {td_mt_tables} " \
          f"AND ts>='{str(s_dt)}' AND ts <='{str(end_dt)}' group by tbname;"
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    is_succ, results = await get_td_engine_data(url, sql)
    if not is_succ:
        return ""
    head = parse_td_columns(results)
    datas = [dict(zip(head, res)) for res in results["data"]]
    total = 0
    for item in datas:
        if not item:
            continue
        total += item["pttl"]
    return total


async def power_count_info(cid):
    """Return the current load and the maximum load of the last 30 days.

    :param cid: company id.
    :return: tuple ``(current_load, max_30d_load)``, both passed through
        ``round_2``.
    """
    now = datetime.now()
    start_time = (now - timedelta(30)).strftime("%Y-%m-%d 00:00:00")
    end_time = now.strftime("%Y-%m-%d %H:%M:%S")
    max_30d_load, _time = await load_pttl_max(cid, start_time, end_time, -1)
    cur_load = await real_time_load(cid)
    return round_2(cur_load), round_2(max_30d_load)


async def get_max_aiao_of_filed(cid, start, end, filed="temperature"):
    """Return the largest 15-minute maximum of one analog type in a range.

    :param cid: company id.
    :param start: lower bound for ``create_time`` (interpolated into SQL).
    :param end: upper bound for ``create_time`` (interpolated into SQL).
    :param filed: value matched against ``ad_type``.
    :return: ``MaxResidualCurrent`` with ``max``, ``location_name`` and
        ``occur_time``; all three are ``None`` when no row is found.
    """
    value_max, location_name, occur_time = None, None, None
    sql = f"SELECT a.value_max,a.value_max_time,p.name,b.item FROM " \
          f"`location_15min_aiao` a LEFT JOIN location b on a.lid=b.lid " \
          f"LEFT JOIN point p on p.mtid=a.mtid where " \
          f"a.create_time BETWEEN '{start}' and '{end}' and a.cid=%s " \
          f" and a.ad_type=%s order by a.value_max desc limit 1"
    async with MysqlUtil() as conn:
        datas = await conn.fetchone(sql, args=(cid, filed))
    if datas:
        value_max = round(datas["value_max"], 2)
        item_name = '漏电流' if datas['item'] == 'default' else datas['item']
        location_name = f"{datas['name']}_{item_name}"
        occur_time = datas.get("value_max_time")
        occur_time = str(occur_time) if occur_time else None
    return MaxResidualCurrent(
        max=value_max,
        location_name=location_name,
        occur_time=occur_time,
    )


async def company_power_use_info(company_id, start, end):
    """Return summed ``kwh`` and ``charge`` of a company in a time range.

    :param company_id: company id.
    :param start: lower bound for ``create_time`` (interpolated into SQL).
    :param end: upper bound for ``create_time`` (interpolated into SQL).
    :return: the row fetched from ``company_15min_power`` with keys ``kwh``
        and ``charge``.
    """
    async with MysqlUtil() as conn:
        sql = f"SELECT sum(kwh) kwh, sum(charge) charge FROM " \
              f"`company_15min_power` where create_time BETWEEN " \
              f"'{start}' and '{end}' AND cid=%s"
        datas = await conn.fetchone(sql, args=(company_id,))
    return datas


async def get_company_charge_price(company_id, es_time_start, es_time_end):
    """Return the average unit price (charge / kwh) for a time range.

    :return: the unit price, or ``""`` when ``kwh`` is empty or zero.
    """
    power_use_info = await company_power_use_info(company_id, es_time_start,
                                                  es_time_end)
    if power_use_info["kwh"]:
        unit_price = power_use_info["charge"] / power_use_info["kwh"]
    else:
        unit_price = ""
    return unit_price


async def power_charge_price(cid):
    """Return yesterday's and last month's average electricity price.

    :param cid: company id.
    :return: tuple ``(yesterday_price, last_month_price)``, both passed
        through ``round_2``; a price is ``""`` before rounding when the
        corresponding ``kwh`` is empty or zero.
    """
    # Yesterday's average price.
    now = datetime.now()
    yestday = now - timedelta(1)
    yestday_start = datetime(yestday.year, yestday.month, yestday.day, 0, 0,
                             0)
    yestday_end = yestday_start + timedelta(1)
    yestday_datas = await company_power_use_info(cid, str(yestday_start),
                                                 str(yestday_end))
    if yestday_datas["kwh"]:
        yestoday_price = yestday_datas["charge"] / yestday_datas["kwh"]
    else:
        yestoday_price = ""
    # Last month's average price; January rolls back to December.
    if now.month == 1:
        last_month_start = datetime(year=now.year - 1, month=12, day=1)
    else:
        last_month_start = datetime(year=now.year, month=now.month - 1,
                                    day=1)
    # The end bound is needed for every month, so it is set after the branch.
    last_month_end = datetime(year=now.year, month=now.month, day=1)
    last_month_datas = await company_power_use_info(cid,
                                                    str(last_month_start),
                                                    str(last_month_end))
    if last_month_datas["kwh"]:
        last_month_price = \
            last_month_datas["charge"] / last_month_datas["kwh"]
    else:
        last_month_price = ""
    return round_2(yestoday_price), round_2(last_month_price)


async def cal_power_factor(cid):
    """Return the real-time and last-month power factor for the home page.

    :return: tuple ``(real_cos, lst_month_cos)``; the latter is rounded to
        two decimals when truthy.
    """
    real_cos = await compy_real_pf(cid)
    lst_month_cos = await compy_lst_month_pf(cid)
    if lst_month_cos:
        lst_month_cos = round(lst_month_cos, 2)
    return real_cos, lst_month_cos


async def optimization_count_info(company_id: int):
    """Collect economy-index and optimisation statistics for the home page.

    Reads last month's results for power factor, peak/valley shifting
    (pcvf), economic operation and maximum-demand space, and grades each
    KPI into a "space" label.

    :param company_id: company id.
    :return: dict with keys ``avg_price``, ``power_factor``, ``pcvf``,
        ``power_save``, ``md_space``, ``save_percent``, ``md_space_p`` and
        ``mean_load_factor``; missing values are ``""``.
    """
    async with MysqlUtil() as conn:
        sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
        inlines = await conn.fetchall(sql, args=(company_id,))
    inline_ids = [inline["inlid"] for inline in inlines]

    now = datetime.now()
    start_time = (
        pendulum.datetime(now.year, now.month, 1)
        .subtract(months=1)
        .strftime("%Y-%m-%d %H:%M:%S")
    )
    end_time = pendulum.datetime(now.year, now.month, 1).strftime(
        "%Y-%m-%d %H:%M:%S")
    power_use_info = await company_power_use_info(company_id, start_time,
                                                  end_time)
    month_charge = power_use_info.get("charge")
    month_kwh = power_use_info.get("kwh")
    count_info_map = {
        "avg_price": month_charge / month_kwh if month_kwh else ""
    }
    if not inline_ids:
        count_info_map.update(
            {
                "power_factor": {"save_charge": "", "kpi_x": "",
                                 "desc": "", },
                "pcvf": {"save_charge": "", "kpi_x": "", "desc": "", },
                "power_save": {"save_charge": "", "kpi_x": "", "desc": "", },
                "md_space": {"save_charge": "", "kpi_x": "", "desc": "", },
                "save_percent": 0,
                "md_space_p": "",
                "mean_load_factor": "",
            }
        )
        return count_info_map

    if now.month == 1:
        last_month_dt = datetime(year=now.year - 1, month=12, day=1)
    else:
        last_month_dt = datetime(year=now.year, month=now.month - 1, day=1)
    last_month_str = datetime.strftime(last_month_dt, "%Y-%m-%d")

    # Power factor.
    async with MysqlUtil() as conn:
        sql = "SELECT inlid, `cos`, save_charge pf_cost, kpi_x, save_charge " \
              "FROM algo_power_factor_result WHERE inlid in %s " \
              "and month=%s"
        power_factor_results = await conn.fetchall(sql, args=(
            inline_ids, last_month_str))
    total_pf_save = round(
        sum([i["pf_cost"] for i in power_factor_results if
             i["pf_cost"] and i["pf_cost"] >= 0]),
        2,
    )
    total_pf_save = 0 if total_pf_save <= 0 else total_pf_save
    pf_kpi_x_list = [
        i["kpi_x"] for i in power_factor_results if
        type(i["kpi_x"]) in [int, float]
    ]
    # The worst (lowest) inline decides the grade.
    pf_kpi_x = min(pf_kpi_x_list) if len(pf_kpi_x_list) else ""
    if pf_kpi_x == "":
        pf_desc = ""
    elif pf_kpi_x >= 0.9:
        pf_desc = "无空间"
    elif 0.85 < pf_kpi_x < 0.9:
        pf_desc = "空间较小"
    elif 0.8 < pf_kpi_x <= 0.85:
        pf_desc = "空间适中"
    else:
        pf_desc = "空间较大"
    count_info_map["power_factor"] = {
        "save_charge": total_pf_save if pf_kpi_x != "" else "",
        "kpi_x": pf_kpi_x,
        "desc": pf_desc,
    }

    # Peak-shaving / valley-filling index.
    async with MysqlUtil() as conn:
        sql = "select `score`, `cost_save` from `algo_plsi_result` " \
              "where `inlid` in %s and `month` = %s"
        pcvfs = await conn.fetchall(sql, args=(inline_ids, last_month_str))
    pcvf_kpi_x_list = [i["score"] for i in pcvfs if
                       type(i["score"]) in [int, float]]
    pcvf_kpi_x = min(pcvf_kpi_x_list) if len(pcvf_kpi_x_list) else ""
    total_pcvf_save = round(
        sum([i["cost_save"] for i in pcvfs if
             i["cost_save"] and i["cost_save"] >= 0]), 2
    )
    if pcvf_kpi_x == "":
        pcvf_desc = ""
    elif pcvf_kpi_x >= 90:
        pcvf_desc = "无空间"
    elif 80 < pcvf_kpi_x < 90:
        pcvf_desc = "空间较小"
    elif 70 < pcvf_kpi_x <= 80:
        pcvf_desc = "空间适中"
    else:
        pcvf_desc = "空间较大"
    total_pcvf_save = 0 if total_pcvf_save <= 0 else total_pcvf_save
    count_info_map["pcvf"] = {
        "save_charge": total_pcvf_save if pcvf_kpi_x != "" else "",
        "kpi_x": pcvf_kpi_x,
        "desc": pcvf_desc,
    }

    # Economic operation.
    async with MysqlUtil() as conn:
        sql = "select `kpi_x`, `save_charge`, `mean_load_factor` " \
              "from `algo_economic_operation_result` where " \
              "`inlid` in %s and `month` = %s"
        economic_operations = await conn.fetchall(sql, args=(
            inline_ids, last_month_str))
    economic_kpi_x_list = [
        i["kpi_x"] for i in economic_operations if
        type(i["kpi_x"]) in [int, float]
    ]
    # Here the highest value decides the grade.
    economic_kpi_x = max(
        economic_kpi_x_list) if economic_kpi_x_list else ""
    total_economic_save = round(
        sum(
            [
                i["save_charge"]
                for i in economic_operations
                if i["save_charge"] and i["save_charge"] >= 0
            ]
        ),
        2,
    )
    total_economic_save = 0 if total_economic_save <= 0 else \
        total_economic_save
    if economic_kpi_x == "":
        economic_desc = ""
    elif economic_kpi_x <= 0.6:
        economic_desc = "无空间"
    elif 0.6 < economic_kpi_x <= 0.7:
        economic_desc = "空间较小"
    elif 0.7 < economic_kpi_x <= 0.8:
        economic_desc = "空间适中"
    else:
        economic_desc = "空间较大"
    count_info_map["power_save"] = {
        "save_charge": total_economic_save if economic_kpi_x != "" else "",
        "kpi_x": economic_kpi_x,
        "desc": economic_desc,
    }

    # Maximum demand.
    async with MysqlUtil() as conn:
        sql = (
            "select a.inline_md_charge, a.kpi_x, a.save_charge "
            "from `algo_md_space_analysis_result` a "
            "inner join`algo_md_space_analysis_unit` b "
            " on a.space_analysis_id=b.id "
            "where b.inlid in %s and a.month = %s and b.valid=1;"
        )
        md_spaces = await conn.fetchall(sql,
                                        args=(inline_ids, last_month_str))
    md_space_kpi_x_list = [i["kpi_x"] for i in md_spaces if
                           type(i["kpi_x"]) in [int, float]]
    md_space_kpi_x = max(md_space_kpi_x_list) if len(
        md_space_kpi_x_list) else ""
    total_md_space_save = round(
        sum(
            [i["save_charge"] for i in md_spaces if
             i["save_charge"] and i["save_charge"] >= 0]
        ),
        2,
    )
    total_md_space_save = 0 if total_md_space_save <= 0 else \
        total_md_space_save
    if md_space_kpi_x == "":
        md_space_desc = ""
    elif md_space_kpi_x <= 0:
        md_space_desc = "无空间"
    elif 0 < md_space_kpi_x <= 0.1:
        md_space_desc = "空间较小"
    elif 0.1 < md_space_kpi_x <= 0.2:
        md_space_desc = "空间适中"
    else:
        md_space_desc = "空间较大"
    count_info_map["md_space"] = {
        "save_charge": total_md_space_save if md_space_kpi_x != "" else "",
        "kpi_x": md_space_kpi_x,
        "desc": md_space_desc,
    }

    # Saving rate: only the dict entries carry a "save_charge".
    total_save_cost = 0
    for _, item in count_info_map.items():
        total_save_cost += (
            item["save_charge"] if isinstance(item, dict) and item[
                "save_charge"] else 0
        )
    save_percent = total_save_cost / month_charge if month_charge else ""
    count_info_map["save_percent"] = save_percent

    # Maximum-demand power derived from the demand charge and unit price.
    async with MysqlUtil() as conn:
        sql = "select `price_md`,`price_tc` from `price_policy` where `cid`=%s"
        price_policy = await conn.fetchone(sql, args=(company_id,))
    total_md_space_charge = sum(
        [i["inline_md_charge"] for i in md_spaces if i["inline_md_charge"]])
    total_md_space_p = (
        total_md_space_charge / price_policy["price_md"]
        if price_policy and price_policy["price_md"]
        else ""
    )
    count_info_map["md_space_p"] = total_md_space_p

    # Lowest mean load factor of the economic-operation results.
    mean_load_factors = [
        i["mean_load_factor"] for i in economic_operations if
        i["mean_load_factor"]
    ]
    count_info_map["mean_load_factor"] = ""
    if mean_load_factors:
        count_info_map["mean_load_factor"] = min(mean_load_factors)
    return count_info_map


async def electric_use_info_sdu(cid):
    """Compute the electricity-safety index for an SDU company.

    Counts the events of the range returned by ``last30_day_range`` whose
    type is in ``SDU_ALARM_LIST``, grouped by importance.

    :param cid: company id.
    :return: ``ElectricInfo`` with the per-importance counts, the alarm
        score (capped at 15) and the ``get_electric_index`` result.
    """
    start, end = last30_day_range()
    first_alarm_cnt = 0
    second_alarm_cnt = 0
    third_alarm_cnt = 0
    # cid and the event-type list are bound as parameters: interpolating
    # tuple(SDU_ALARM_LIST) yields invalid SQL for a one-element list
    # ("('x',)") and bypasses escaping.
    sql = f"select importance,count(1) doc_count from point_1min_event " \
          f"where cid=%s and event_datetime BETWEEN '{start}' and '{end}' " \
          f"and event_type in %s GROUP BY importance"
    log.info(f"sql:{sql}")
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(cid, SDU_ALARM_LIST))
    for data in datas:
        if data["importance"] == Importance.First.value:
            first_alarm_cnt += data["doc_count"]
        elif data["importance"] == Importance.Second.value:
            second_alarm_cnt += data["doc_count"]
        elif data["importance"] == Importance.Third.value:
            third_alarm_cnt += data["doc_count"]
    point_len = await get_points_num(cid)
    alarm_score = (
        (
            first_alarm_cnt * 2 + second_alarm_cnt * 1 +
            third_alarm_cnt * 0.5) / point_len
        if point_len else 0
    )
    alarm_score = 15 if alarm_score >= 15 else alarm_score
    electric_use_score = get_electric_index(alarm_score)
    return ElectricInfo(
        first_alarm_cnt=first_alarm_cnt,
        second_alarm_cnt=second_alarm_cnt,
        third_alarm_cnt=third_alarm_cnt,
        alarm_score=alarm_score,
        electric_use_score=electric_use_score,
    )


async def electric_use_info_points_sdu(start, end, points):
    """Compute the SDU electricity-safety index for a set of points.

    :param start: lower bound for ``event_datetime``.
    :param end: upper bound for ``event_datetime``.
    :param points: collection of point ids (``pid``).
    :return: ``ElectricInfo`` with the per-importance counts, the alarm
        score (capped at 15) and the ``get_electric_index`` result. With no
        points, all counts and the alarm score are 0.
    """
    if not points:
        # No points: nothing to query and nothing to divide by. Mirrors the
        # "if point_len else 0" rule of the company-level variants.
        alarm_score = 0
        return ElectricInfo(
            first_alarm_cnt=0,
            second_alarm_cnt=0,
            third_alarm_cnt=0,
            alarm_score=alarm_score,
            electric_use_score=get_electric_index(alarm_score),
        )
    sql = "select importance,count(*) as doc_count from point_1min_event " \
          "where pid in %s and event_datetime BETWEEN %s and %s " \
          "and event_type in %s group by importance"
    async with MysqlUtil() as conn:
        results = await conn.fetchall(sql, args=(points, start, end,
                                                 SDU_ALARM_LIST))
    first_alarm_cnt = 0
    second_alarm_cnt = 0
    third_alarm_cnt = 0
    for result in results:
        if result["importance"] == Importance.First.value:
            first_alarm_cnt += result["doc_count"]
        elif result["importance"] == Importance.Second.value:
            second_alarm_cnt += result["doc_count"]
        elif result["importance"] == Importance.Third.value:
            third_alarm_cnt += result["doc_count"]
    alarm_score = (first_alarm_cnt * 2 + second_alarm_cnt * 1 +
                   third_alarm_cnt * 0.5) / len(points)
    if alarm_score >= 15:
        alarm_score = 15
    electric_use_score = get_electric_index(alarm_score)
    log.info(
        "point_len={} alarm_score={} electric_use_score={}".format(
            len(points), alarm_score, electric_use_score
        )
    )
    return ElectricInfo(
        first_alarm_cnt=first_alarm_cnt,
        second_alarm_cnt=second_alarm_cnt,
        third_alarm_cnt=third_alarm_cnt,
        alarm_score=alarm_score,
        electric_use_score=electric_use_score,
    )


async def optimization_count_info_new(company_id: int):
    """Collect economy-index and optimisation statistics for the home page.

    Variant of ``optimization_count_info`` that reads through the DAO
    helpers and adds, per category, a textual ``desc`` naming the affected
    inlines plus a ``space`` grade.

    :param company_id: company id.
    :return: dict with keys ``avg_price``, ``power_factor``, ``pcvf``,
        ``power_save``, ``md_space``, ``save_percent``, ``md_space_p`` and
        ``mean_load_factor``; missing values are ``""``.
    """
    inlines = await get_inline_by_cid(company_id)
    inline_ids = [inline["inlid"] for inline in inlines]

    # Company consumption of the previous month.
    now = datetime.now()
    es_start_time = (
        pendulum.datetime(now.year, now.month, 1)
        .subtract(months=1)
        .strftime("%Y-%m-%dT%H:%M:%S+08:00")
    )
    es_end_time = pendulum.datetime(now.year, now.month, 1).strftime(
        "%Y-%m-%dT%H:%M:%S+08:00")
    power_use_info = await company_power_use_info(company_id, es_start_time,
                                                  es_end_time)
    month_charge = power_use_info["charge"]
    count_info_map = {
        "avg_price": power_use_info["charge"] / power_use_info["kwh"]
        if power_use_info["kwh"]
        else ""
    }
    if not inline_ids:
        count_info_map.update(
            {
                "power_factor": {"save_charge": "", "kpi_x": "",
                                 "desc": "", },
                "pcvf": {"save_charge": "", "kpi_x": "", "desc": "", },
                "power_save": {"save_charge": "", "kpi_x": "", "desc": "", },
                "md_space": {"save_charge": "", "kpi_x": "", "desc": "", },
                "save_percent": 0,
                "md_space_p": "",
                "mean_load_factor": "",
            }
        )
        return count_info_map

    if now.month == 1:
        last_month_dt = datetime(year=now.year - 1, month=12, day=1)
    else:
        last_month_dt = datetime(year=now.year, month=now.month - 1, day=1)
    last_month_str = datetime.strftime(last_month_dt, "%Y-%m")

    # Power factor.
    power_factor_results = await get_power_factor_kpi(inline_ids,
                                                      last_month_dt)
    total_pf_save = round(
        sum([i["pf_cost"] for i in power_factor_results if
             i["pf_cost"] and i["pf_cost"] >= 0]),
        2,
    )
    total_pf_save = 0 if total_pf_save <= 0 else total_pf_save
    pf_kpi_x_list = [
        (i["name"], i["kpi_x"]) for i in power_factor_results if
        type(i["kpi_x"]) in [int, float]
    ]
    if len(pf_kpi_x_list):
        pf_kpi_x_num = [pf_kpi[1] for pf_kpi in pf_kpi_x_list]
        pf_kpi_x = min(pf_kpi_x_num)
        if pf_kpi_x >= 0.9:
            pf_desc = "上月功率因数比较合理,请继续保持"
        else:
            # Name every inline whose KPI is below 0.9.
            pf_kpi_x_name = []
            for index, kpi_num in enumerate(pf_kpi_x_num):
                if kpi_num < 0.9:
                    pf_kpi_x_name.append(pf_kpi_x_list[index][0])
            pf_desc = f"{'、'.join(pf_kpi_x_name)}可以进行无功补偿"
    else:
        pf_kpi_x = ""
        pf_desc = ""
    if pf_kpi_x == "":
        pf_space = ""
    elif pf_kpi_x >= 0.9:
        pf_space = "无空间"
    elif 0.85 < pf_kpi_x < 0.9:
        pf_space = "空间较小"
    elif 0.8 < pf_kpi_x <= 0.85:
        pf_space = "空间适中"
    else:
        pf_space = "空间较大"
    count_info_map["power_factor"] = {
        "save_charge": total_pf_save if pf_kpi_x != "" else "",
        "kpi_x": pf_kpi_x,
        "desc": pf_desc,
        "space": pf_space
    }

    # Peak-shaving / valley-filling index.
    pcvfs = await get_pcvf_kpi(inline_ids, last_month_str)
    pcvf_kpi_x_list = [(i["name"], i["score"]) for i in pcvfs if
                       type(i["score"]) in [int, float]]
    if len(pcvf_kpi_x_list):
        pcvf_kpi_x_num = [pcvf_kpi[1] for pcvf_kpi in pcvf_kpi_x_list]
        pcvf_kpi_x = min(pcvf_kpi_x_num)
        pcvf_kpi_x_name = []
        if pcvf_kpi_x < 70:
            for index, kpi_num in enumerate(pcvf_kpi_x_num):
                if kpi_num < 70:
                    pcvf_kpi_x_name.append(pcvf_kpi_x_list[index][0])
            # NOTE(review): this message is the same text as the
            # power-factor advice above - confirm it is intended here.
            pcvf_desc = f"{'、'.join(pcvf_kpi_x_name)}可以进行无功补偿"
        elif pcvf_kpi_x < 90:
            for index, kpi_num in enumerate(pcvf_kpi_x_num):
                if kpi_num < 90:
                    pcvf_kpi_x_name.append(pcvf_kpi_x_list[index][0])
            pcvf_desc = f"请合理调整{'、'.join(pcvf_kpi_x_name)}用电时间或" \
                        f"引入新能源,转移高峰电量至低谷"
        else:
            pcvf_desc = "平均电价处于较低水平,请继续保持"
    else:
        pcvf_kpi_x = ""
        pcvf_desc = ""
    if pcvf_kpi_x == "":
        pcvf_space = ""
    elif pcvf_kpi_x >= 90:
        pcvf_space = "无空间"
    elif 80 < pcvf_kpi_x < 90:
        pcvf_space = "空间较小"
    elif 70 < pcvf_kpi_x <= 80:
        pcvf_space = "空间适中"
    else:
        pcvf_space = "空间较大"
    total_pcvf_save = round(
        sum([i["cost_save"] for i in pcvfs if
             i["cost_save"] and i["cost_save"] >= 0]), 2
    )
    total_pcvf_save = 0 if total_pcvf_save <= 0 else total_pcvf_save
    count_info_map["pcvf"] = {
        "save_charge": total_pcvf_save if pcvf_kpi_x != "" else "",
        "kpi_x": pcvf_kpi_x,
        "desc": pcvf_desc,
        "space": pcvf_space
    }

    # Economic operation.
    economic_operations = await get_economic_kpi(inline_ids, last_month_str)
    economic_kpi_x_list = [
        (i["name"], i["kpi_x"]) for i in economic_operations if
        type(i["kpi_x"]) in [int, float]
    ]
    total_economic_save = round(
        sum(
            [
                i["save_charge"]
                for i in economic_operations
                if i["save_charge"] and i["save_charge"] >= 0
            ]
        ),
        2,
    )
    if len(economic_kpi_x_list):
        econ_kpi_x_num = [econ_kpi[1] for econ_kpi in economic_kpi_x_list]
        econ_kpi_x = max(econ_kpi_x_num)
        econ_kpi_x_min = min(econ_kpi_x_num)
        econ_kpi_x_name = []
        if econ_kpi_x >= 0.9:
            for index, kpi_max in enumerate(econ_kpi_x_num):
                if kpi_max >= 0.9:
                    econ_kpi_x_name.append(economic_kpi_x_list[index][0])
            economic_desc = f"{'、'.join(econ_kpi_x_name)}负载率超过" \
                            f"安全阈值,应考虑扩容"
        elif econ_kpi_x >= 0.7:
            for index, kpi_max in enumerate(econ_kpi_x_num):
                if kpi_max >= 0.7:
                    econ_kpi_x_name.append(economic_kpi_x_list[index][0])
            economic_desc = f"{'、'.join(econ_kpi_x_name)}负载率较高," \
                            f"可考虑调整负荷或扩容"
        elif econ_kpi_x_min < 0.6 and econ_kpi_x < 0.9:
            for index, kpi_max in enumerate(econ_kpi_x_num):
                if kpi_max < 0.6:
                    econ_kpi_x_name.append(economic_kpi_x_list[index][0])
            economic_desc = f"{'、'.join(econ_kpi_x_name)}负载率较低," \
                            f"请考虑容改需或更换小容量变压器"
        else:
            economic_desc = "进线负载率比较经济,请继续保持"
    else:
        econ_kpi_x = ""
        economic_desc = ""
    if econ_kpi_x == "":
        econ_space = ""
    elif econ_kpi_x <= 0.6:
        econ_space = "无空间"
    elif 0.6 < econ_kpi_x <= 0.7:
        econ_space = "空间较小"
    elif 0.7 < econ_kpi_x <= 0.8:
        econ_space = "空间适中"
    else:
        econ_space = "空间较大"
    count_info_map["power_save"] = {
        "save_charge": total_economic_save if econ_kpi_x != "" else "",
        "kpi_x": econ_kpi_x,
        "desc": economic_desc,
        "space": econ_space
    }

    # Capacity and demand prices. A missing price policy is treated like
    # missing prices, matching the guard used for md_space_p further down.
    price_policy = await price_policy_by_cid(company_id)
    price_md = price_policy["price_md"] \
        if price_policy and price_policy["price_md"] else 0
    price_tc = price_policy["price_tc"] \
        if price_policy and price_policy["price_tc"] else 0

    # Maximum demand.
    md_spaces = await get_md_space(inline_ids, last_month_dt)
    md_space_kpi_x_list = [i["kpi_x"] for i in md_spaces if
                           type(i["kpi_x"]) in [int, float]]
    md_space_kpi_x = max(md_space_kpi_x_list) if len(
        md_space_kpi_x_list) else ""
    total_md_space_save = round(
        sum(
            [i["save_charge"] for i in md_spaces if
             i["save_charge"] and i["save_charge"] >= 0]
        ),
        2,
    )
    total_md_space_save = 0 if total_md_space_save <= 0 else \
        total_md_space_save
    if md_space_kpi_x == "":
        md_space_space = ""
    elif md_space_kpi_x <= 0:
        md_space_space = "无空间"
    elif 0 < md_space_kpi_x <= 0.1:
        md_space_space = "空间较小"
    elif 0.1 < md_space_kpi_x <= 0.2:
        md_space_space = "空间适中"
    else:
        md_space_space = "空间较大"
    md_space_tc_runtimes = await get_tc_runtime(inline_ids)
    md_space_name = []
    # NOTE(review): md_spaces and md_space_tc_runtimes are paired by index;
    # this presumes both come back in the same order and length - confirm.
    for index, item in enumerate(md_spaces):
        if type(item["inline_md_predict"]) in [int, float] and \
                md_space_tc_runtimes[index]["tc_runtime"] * price_tc >= \
                price_md * item["inline_md_predict"]:
            md_space_name.append(md_space_tc_runtimes[index]["name"])
    if len(md_space_name):
        md_space_desc = f"若次月负荷无较大变动,建议{'、'.join(md_space_name)}" \
                        f"选择按最大需量计费"
    else:
        md_space_desc = "不存在容改需空间"
    count_info_map["md_space"] = {
        "save_charge": total_md_space_save if md_space_kpi_x != "" else "",
        "kpi_x": md_space_kpi_x,
        "desc": md_space_desc,
        "space": md_space_space
    }

    # Saving rate: only the dict entries carry a "save_charge".
    total_save_cost = 0
    for _, item in count_info_map.items():
        total_save_cost += (
            item["save_charge"] if isinstance(item, dict) and item[
                "save_charge"] else 0
        )
    save_percent = total_save_cost / month_charge if month_charge else ""
    count_info_map["save_percent"] = save_percent

    # Maximum-demand power derived from the demand charge and unit price.
    total_md_space_charge = sum(
        [i["inline_md_charge"] for i in md_spaces if i["inline_md_charge"]])
    total_md_space_p = (
        total_md_space_charge / price_policy["price_md"]
        if price_policy and price_policy["price_md"]
        else ""
    )
    count_info_map["md_space_p"] = total_md_space_p

    # Lowest mean load factor of the economic-operation results.
    mean_load_factors = [
        i["mean_load_factor"] for i in economic_operations if
        i["mean_load_factor"]
    ]
    count_info_map["mean_load_factor"] = ""
    if mean_load_factors:
        count_info_map["mean_load_factor"] = min(mean_load_factors)
    return count_info_map


async def cid_alarm_importance_count(cid, start, end):
    """Count a company's alarms per monitoring point and importance.

    :param cid: company id.
    :param start: range start handed to ``sdu_alarm_importance_dao``.
    :param end: range end handed to ``sdu_alarm_importance_dao``.
    :return: up to five dicts (``name``, ``alarm_count``, ``first``,
        ``second``, ``third``), sorted by ``alarm_count`` descending.
    """
    monitor_point_list = await monitor_point_join(cid)
    point_list = [i["pid"] for i in monitor_point_list]
    es_res = await sdu_alarm_importance_dao(start, end, point_list)
    es_res_key = {i["key"]: i for i in es_res}

    res_list = []
    for info in monitor_point_list:
        name = info.get("name")
        point_id = info.get("pid")
        tmp_dic = {"name": name, "alarm_count": 0,
                   "first": 0, "second": 0, "third": 0}
        if point_id in es_res_key:
            inner_bucket = es_res_key[point_id]["importance"]["buckets"]
            for b in inner_bucket:
                if b["key"] == Importance.First.value:
                    tmp_dic["first"] += b["doc_count"]
                elif b["key"] == Importance.Second.value:
                    tmp_dic["second"] += b["doc_count"]
                elif b["key"] == Importance.Third.value:
                    tmp_dic["third"] += b["doc_count"]
        tmp_dic["alarm_count"] = tmp_dic["first"] + tmp_dic["second"] + \
            tmp_dic["third"]
        res_list.append(tmp_dic)
    # Keep the five points with the most alarms.
    res_list_sort = sorted(res_list, key=lambda i: i["alarm_count"],
                           reverse=True)[:5]
    return res_list_sort


async def alarm_importance_count_total(cid, start, end):
    """Count a company's alarms per importance level.

    :return: dict with ``first_cnt``, ``second_cnt``, ``third_cnt`` and
        their sum ``total_cnt``.
    """
    es_res = await alarm_aggs_importance(cid, start, end)
    first_cnt, second_cnt, third_cnt = 0, 0, 0
    for buckets in es_res:
        if buckets["importance"] == Importance.First.value:
            first_cnt += buckets["alarm_count"]
        elif buckets["importance"] == Importance.Second.value:
            second_cnt += buckets["alarm_count"]
        elif buckets["importance"] == Importance.Third.value:
            third_cnt += buckets["alarm_count"]
    return {
        "first_cnt": first_cnt,
        "second_cnt": second_cnt,
        "third_cnt": third_cnt,
        "total_cnt": first_cnt + second_cnt + third_cnt
    }


def safety_ratio_res(score, client_name):
    """Map a safety index (0-100) to a risk description.

    Shared by wechat and web: ``client_name == "wechat"`` gets the short
    label, any other client the full sentence. Scores outside 0-100 are
    logged and yield ``""``.
    """
    if 90 <= score <= 100:
        return "风险较低" if client_name == "wechat" else "当前安全状况良好,请继续保持!"
    elif 80 <= score < 90:
        return "低风险" if client_name == "wechat" else "当前安全状况较好,请关注报警信息!"
    elif 60 <= score < 80:
        return "中风险" if client_name == "wechat" else "当前安全状况一般,请密切关注报警信息!"
    elif 40 <= score < 60:
        return "风险较高" if client_name == "wechat" else "当前安全状况较差,请重视报警信息!"
    elif 0 <= score < 40:
        return "高风险" if client_name == "wechat" else "当前安全状况很差,请重视报警信息并处理!"
    else:
        log.error(f"score:{score}, 安全指数不在0-100范围内")
        return ""


def health_status_res(score, client_name):
    """Map a health index (0-100) to a health-status description.

    ``client_name == "wechat"`` gets the short label, any other client the
    full sentence. Scores outside 0-100 are logged and yield ``""``.
    """
    if 90 <= score <= 100:
        return "状态优" if client_name == "wechat" else "当前健康状况良好,请继续保持!"
    elif 75 <= score < 90:
        return "状态较好" if client_name == "wechat" else "当前健康状况较好,请关注电能质量信息!"
    elif 60 <= score < 75:
        return "状况一般" if client_name == "wechat" else "当前健康状况一般,请密切关注电能质量信息!"
    elif 0 <= score < 60:
        return "状态差" if client_name == "wechat" else "当前健康状况差,请重视电能质量信息并改善!"
    else:
        log.error(f"score:{score}, 健康指数不在0-100范围内")
        return ""


def carbon_status_res(score):
    """Map a carbon index (0-100) to a pass/fail label (used by wechat).

    Scores outside 0-100 are logged and yield ``""``.
    """
    if 80 <= score <= 100:
        return "达标"
    elif 0 <= score < 80:
        return "不达标"
    else:
        log.error(f"score:{score}, 碳排指数不在0-100范围内")
        return ""


def carbon_status_res_web(score):
    """Map a carbon index (0-100) to a descriptive sentence (used by web).

    Non-``int`` scores yield ``""`` without logging; ``int`` scores outside
    0-100 are logged and yield ``""``.
    """
    if not isinstance(score, int):
        return ""
    if 90 <= score <= 100:
        return "当前能耗达标且水平很低,请继续保持!"
    elif 80 <= score < 90:
        return "当前能耗达标且水平较低,请继续保持!"
    elif 60 <= score < 80:
        return "当前能耗不达标,请关注能耗控制!"
    elif 0 <= score < 60:
        return "当前能耗不达标且水平较高,请重视能耗控制!"
    else:
        log.error(f"score:{score}, 碳排指数不在0-100范围内")
        return ""


def economic_index_desc(score, client_name):
    """Map an economy index (0-100) to a textual description.

    ``client_name == "wechat"`` gets the short label, any other client the
    full sentence. Scores outside 0-100 are logged and yield ``""``.
    """
    if 90 <= score <= 100:
        return "状态良好" if client_name == "wechat" else "当前经济性良好,请继续保持"
    elif 75 <= score < 90:
        return "存在空间" if client_name == "wechat" else "存在空间,建议关注优化措施"
    elif 60 <= score < 75:
        return "空间较大" if client_name == "wechat" else "空间较大,建议尝试优化措施!"
    elif 0 <= score < 60:
        return "空间很大" if client_name == "wechat" else "空间很大,建议采取优化措施!"
    else:
        log.error(f"score:{score}, 经济指数不在0-100范围内")
        return ""