"""Elasticsearch and MySQL query helpers for alarm aggregation and KPI data."""
import pendulum
from datetime import timedelta
import math
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.constants import POINT_1MIN_EVENT
from unify_api.utils.time_format import convert_es_str, CST
from pot_libs.mysql_util.mysql_util import MysqlUtil

index = POINT_1MIN_EVENT


async def alarm_aggs_point_location(date_start, date_end, cid):
    """Aggregate events of one company by point_id and by location_id.

    Args:
        date_start: Lower bound of the ``datetime`` range (inclusive); passed
            through ``convert_es_str`` before being sent to Elasticsearch.
        date_end: Upper bound of the ``datetime`` range (inclusive); converted
            the same way.
        cid: Value matched against the ``cid`` field.

    Returns:
        A ``(location_buckets, point_buckets)`` tuple holding the ``buckets``
        of the two terms aggregations. Each aggregation requests at most 500
        buckets.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 0,  # only the aggregations are needed, not the hits
        "query": {
            "bool": {
                "must": [
                    {"term": {"cid": cid}},
                    {
                        "range": {
                            "datetime": {
                                "gte": start_es,
                                "lte": end_es,
                            }
                        }
                    },
                ]
            }
        },
        "aggs": {
            "point": {"terms": {"field": "point_id", "size": 500}},
            "location": {"terms": {"field": "location_id", "size": 500}},
        },
    }
    log.info(f"index:{index} ---- query_body:{query_body}")
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=index)
    aggregations = es_re["aggregations"]
    location_list = aggregations["location"]["buckets"]
    point_list = aggregations["point"]["buckets"]
    return location_list, point_list


async def get_inline_by_cid(cid):
    """Return the ``inlid`` and ``name`` of every ``inline`` row for *cid*."""
    sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
    async with MysqlUtil() as conn:
        inlines = await conn.fetchall(sql, args=(cid,))
    return inlines


# Power factor
async def get_power_factor_kpi(inline_ids, month_str):
    """Fetch ``algo_power_factor_result`` rows for the given inlines and month.

    Each row is left-joined with ``inline`` to include the inline name.
    ``save_charge`` is selected twice: once aliased as ``pf_cost`` and once
    under its own name.

    Args:
        inline_ids: Collection of inline ids bound to the ``IN`` clause.
        month_str: Value compared with the ``month`` column.

    Returns:
        The rows returned by ``fetchall``; an empty list when *inline_ids*
        is empty (``IN ()`` is not valid SQL, so no query is issued).
    """
    if not inline_ids:
        return []
    sql = (
        "SELECT b.name, a.inlid, a.save_charge pf_cost, a.kpi_x, "
        "a.save_charge FROM algo_power_factor_result a LEFT JOIN inline b "
        "on a.inlid = b.inlid WHERE a.inlid in %s and a.`month`=%s"
    )
    async with MysqlUtil() as conn:
        power_factor_results = await conn.fetchall(
            sql, args=(inline_ids, month_str)
        )
    return power_factor_results


# Peak shifting / valley filling
async def get_pcvf_kpi(inline_ids, last_month_str):
    """Fetch ``algo_plsi_result`` score and cost saving per inline.

    Args:
        inline_ids: Collection of inline ids bound to the ``IN`` clause.
        last_month_str: Value compared with the ``month`` column.

    Returns:
        The rows returned by ``fetchall`` (inline name, score, cost_save);
        an empty list when *inline_ids* is empty.
    """
    if not inline_ids:
        return []
    sql = (
        "select b.name, a.score, a.cost_save "
        "from algo_plsi_result a left join inline b "
        "on a.inlid = b.inlid where a.inlid in %s and a.month = %s"
    )
    async with MysqlUtil() as conn:
        pcvfs = await conn.fetchall(sql, args=(inline_ids, last_month_str))
    return pcvfs


# Economic operation
async def get_economic_kpi(inline_ids, last_month_str):
    """Fetch ``algo_economic_operation_result`` KPI rows per inline.

    Args:
        inline_ids: Collection of inline ids bound to the ``IN`` clause.
        last_month_str: Value compared with the ``month`` column.

    Returns:
        The rows returned by ``fetchall`` (inline name, kpi_x, save_charge,
        mean_load_factor); an empty list when *inline_ids* is empty.
    """
    if not inline_ids:
        return []
    sql = (
        "select b.name, a.kpi_x, a.save_charge, a.mean_load_factor "
        "from algo_economic_operation_result a "
        "left join inline b on a.inlid = b.inlid "
        "where a.inlid in %s and a.month = %s"
    )
    async with MysqlUtil() as conn:
        econ = await conn.fetchall(sql, args=(inline_ids, last_month_str))
    return econ


# Maximum demand
async def get_md_space(inline_ids, last_month_str):
    """Fetch maximum-demand space analysis results for the given inlines.

    Joins ``algo_md_space_analysis_result`` with
    ``algo_md_space_analysis_unit`` and keeps only rows with ``valid=1``.

    Args:
        inline_ids: Collection of inline ids bound to the ``IN`` clause.
        last_month_str: Value compared with the ``month`` column.

    Returns:
        The rows returned by ``fetchall``; an empty list when *inline_ids*
        is empty.
    """
    if not inline_ids:
        return []
    sql = (
        "select a.inline_md_charge, a.kpi_x, a.save_charge, "
        "a.inline_md_predict, b.inlid,b.tc_runtime "
        "from algo_md_space_analysis_result a "
        "inner join algo_md_space_analysis_unit b "
        "on a.space_analysis_id=b.id "
        "where b.inlid in %s and a.month = %s and valid=1;"
    )
    async with MysqlUtil() as conn:
        md_spaces = await conn.fetchall(
            sql, args=(inline_ids, last_month_str)
        )
    return md_spaces


async def get_tc_runtime(inline_ids):
    """Return ``inlid``, ``name`` and ``tc_runtime`` for the given inlines.

    Returns an empty list when *inline_ids* is empty.
    """
    if not inline_ids:
        return []
    sql = "SELECT inlid, name, tc_runtime FROM `inline` where inlid in %s;"
    async with MysqlUtil() as conn:
        tc_runtimes = await conn.fetchall(sql, args=(inline_ids,))
    return tc_runtimes


async def compy_real_pf(cid):
    """Compute a company-level power factor from the latest 15-minute data.

    Points of *cid* flagged ``add_to_company=1`` are looked up, then their
    ``pttl_mean`` / ``qttl_mean`` values are summed over two timestamps: the
    current time floored to a 15-minute boundary and the boundary before it.
    The result is ``P / sqrt(P^2 + Q^2)`` over those sums.

    Returns:
        The ratio rounded to 2 decimals, or ``""`` when the company has no
        matching points or the magnitude is zero.
    """
    sql = "select pid, inlid from point where cid=%s and add_to_company=1"
    async with MysqlUtil() as conn:
        pids = [r["pid"] for r in await conn.fetchall(sql, (cid,))]
        if not pids:
            return ""

        # Floor "now" to the enclosing 15-minute boundary.
        slot_seconds = 15 * 60
        now_dt = pendulum.now(tz=CST)
        tstamp = now_dt.int_timestamp // slot_seconds * slot_seconds
        slot_dt = pendulum.from_timestamp(tstamp, tz=CST)
        prev_slot = (slot_dt - timedelta(minutes=15)).strftime(
            "%Y-%m-%d %H:%M:%S")
        cur_slot = slot_dt.strftime("%Y-%m-%d %H:%M:%S")
        sql = f"SELECT pid, create_time, pttl_mean, qttl_mean FROM " \
              f"`point_15min_electric` where create_time in " \
              f"('{cur_slot}', '{prev_slot}') and pid in %s;"

        total_pttl, total_qttl = 0, 0
        for r in await conn.fetchall(sql, (tuple(pids),)):
            # NULL columns come back as None; count them as 0 instead of
            # failing the whole computation with a TypeError.
            total_pttl += r["pttl_mean"] or 0
            total_qttl += r["qttl_mean"] or 0

    std_cos = math.sqrt(total_pttl * total_pttl + total_qttl * total_qttl)
    return round(total_pttl / std_cos, 2) if std_cos else ""


async def compy_lst_month_pf(cid):
    """Return the smallest ``cos`` recorded last month for a company's inlines.

    The month key is the first day of the previous month (``YYYY-MM-DD``) in
    the ``CST`` timezone.

    Returns:
        The minimum non-NULL ``cos`` from ``algo_power_factor_result``, or
        ``""`` when the company has no inlines or no usable values.
    """
    sql = "SELECT inlid FROM inline WHERE cid=%s;"
    async with MysqlUtil() as conn:
        inlids = [r["inlid"] for r in await conn.fetchall(sql, (cid,))]
        if not inlids:
            return ""

        now_dt = pendulum.now(tz=CST)
        cal_month = now_dt.subtract(months=1).start_of(unit="month").format(
            "YYYY-MM-DD")
        sql = "SELECT cos FROM algo_power_factor_result " \
              "WHERE inlid in %s and month=%s"
        # Pass a tuple for the IN clause, matching compy_real_pf.
        rows = await conn.fetchall(sql, (tuple(inlids), cal_month))

    # Skip NULL values: min() cannot compare None with numbers.
    cos_lst = [r["cos"] for r in rows if r["cos"] is not None]
    return min(cos_lst) if cos_lst else ""