Commit 984a952d authored by ZZH's avatar ZZH

remove es 2023-6-2

parent 0bc47465
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import POINT_15MIN_INDEX, INDEX from unify_api.utils.time_format import range_to_type
from unify_api.utils.es_query_body import EsQueryBody
from unify_api.utils.time_format import power_slots, range_to_type
async def pttl_max(cid, start, end, point_id=None, inline_id=None): async def load_pttl_max(cid, start, end, point_id=None, inline_id=None):
# 根据进线,找point
if inline_id:
sql = "SELECT pid from `point` WHERE cid = %s " \
"and inlid_belongedto = %s and add_to_company = 1"
async with MysqlUtil() as conn:
point_info = await conn.fetchall(sql=sql,
args=(cid, inline_id))
point_list = [point.get("pid") for point in point_info]
terms = {"pid": point_list}
elif point_id == -1: # 选的全部
# 1.找出工厂所有pid,point表add_to_company字段为1
sql = "SELECT pid from `point` WHERE cid = %s " \
"and add_to_company = 1"
async with MysqlUtil() as conn:
point_info = await conn.fetchall(sql=sql, args=(cid,))
point_list = [point.get("pid") for point in point_info]
terms = {"pid": point_list}
else:
terms = {"pid": [point_id]}
# 1. 根据时间范围,取不同的index
date_type = range_to_type(start, end)
index = INDEX[date_type]
if date_type == "day":
date_key = "hour"
elif date_type == "month":
date_key = "day"
else:
date_key = "month"
# 2. 构造query_body
eqb = EsQueryBody(terms=terms, start=start, end=end,
date_key=date_key)
query = eqb.query()
query["aggs"] = {
"time_column": {
"date_histogram": {
"field": date_key,
"interval": date_key,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm"
},
"aggs": {
"pttl_max": {
"sum": {
"field": "pttl_max"
}
}
}
}
}
log.info(index + f"====={query}")
async with EsUtil() as es:
es_re = await es.search_origin(body=query, index=index)
if not es_re["aggregations"]["time_column"]["buckets"]:
return "", ""
# 2.返回
es_re = es_re["aggregations"]["time_column"]["buckets"]
# 最大需量
max_val = 0
max_val_time = ""
for res in es_re:
mdp_max_value = res["pttl_max"]["value"]
if mdp_max_value and mdp_max_value > max_val:
max_val = mdp_max_value
max_val_time = res["key_as_string"]
# 根据时间范围, 返回不同时间格式
if max_val_time:
if date_type == "day":
max_val_time = max_val_time.split(" ")[1]
elif date_type == "month":
max_val_time = max_val_time.split("-", 1)[1].split(" ")[0]
else:
max_val_time = max_val_time[:7]
return max_val, max_val_time
async def pttl_max_new15(cid, start, end, point_id=None, inline_id=None):
# 根据进线,找point # 根据进线,找point
if inline_id: if inline_id:
sql = "SELECT pid from `point` WHERE cid = %s " \ sql = "SELECT pid from `point` WHERE cid = %s " \
...@@ -140,73 +60,7 @@ async def pttl_max_new15(cid, start, end, point_id=None, inline_id=None): ...@@ -140,73 +60,7 @@ async def pttl_max_new15(cid, start, end, point_id=None, inline_id=None):
return max_val, max_val_time return max_val, max_val_time
async def pttl_max_15min(cid, start, end, point_id=None, inline_id=None): async def load_pttl_max_15min(cid, start, end, point_id=None, inline_id=None):
"""负荷分布,最高负荷需要拿15min"""
# 根据进线,找point
if inline_id:
sql = "SELECT pid from `point` WHERE cid = %s " \
"and inlid_belongedto = %s"
async with MysqlUtil() as conn:
point_info = await conn.fetchall(sql=sql,
args=(cid, inline_id))
point_list = [point.get("pid") for point in point_info]
terms = {"pid": point_list}
elif point_id == -1: # 选的全部
# 1.找出工厂所有pid,point表add_to_company字段为1
sql = "SELECT pid from `point` WHERE cid = %s"
async with MysqlUtil() as conn:
point_info = await conn.fetchall(sql=sql, args=(cid,))
point_list = [point.get("pid") for point in point_info]
terms = {"pid": point_list}
else:
terms = {"pid": [point_id]}
# 1. 根据时间范围,取不同的index
index = POINT_15MIN_INDEX
date_key = "quarter_time"
interval = "15m"
# 2. 构造query_body
eqb = EsQueryBody(terms=terms, start=start, end=end,
date_key=date_key)
query = eqb.query()
query["aggs"] = {
"time_column": {
"date_histogram": {
"field": date_key,
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm"
},
"aggs": {
"pttl_max": {
"sum": {
"field": "pttl_max"
}
}
}
}
}
log.info(index + f"====={query}")
async with EsUtil() as es:
es_re = await es.search_origin(body=query, index=index)
if not es_re["aggregations"]["time_column"]["buckets"]:
return "", ""
# 2.返回
es_re = es_re["aggregations"]["time_column"]["buckets"]
# 最大需量
max_val = 0
max_val_time = ""
for res in es_re:
mdp_max_value = res["pttl_max"]["value"]
if mdp_max_value and mdp_max_value > max_val:
max_val = mdp_max_value
max_val_time = res["key_as_string"]
# 根据时间范围, 返回不同时间格式
if max_val_time:
max_val_time = max_val_time[5:]
return max_val, max_val_time
async def pttl_max_15min_new15(cid, start, end, point_id=None, inline_id=None):
if inline_id: if inline_id:
sql = "SELECT pid from `point` WHERE cid = %s and inlid = %s" sql = "SELECT pid from `point` WHERE cid = %s and inlid = %s"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.utils.time_format import get_start_end_by_tz_time_new from unify_api.utils.time_format import get_start_end_by_tz_time_new
def convert_es_str(str1: object) -> object: async def load_compy_power(cid, start, end):
"""str date转换为str es日期格式"""
es_date = my_pendulum.from_format(str1, 'YYYY-MM-DD')
return str(es_date)
async def query_search_kwh_p(cid, start, end, interval):
query_body = {
"query": {
"bool": {
"filter": [
{
"term": {
"cid": cid
}
},
{
"range": {
"quarter_time": {
"gte": convert_es_str(start),
"lte": convert_es_str(end)
}
}
}
]
}
},
"aggs": {"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": interval,
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"p": {
"stats": {
"field": "p"
}
},
"kwh": {
"stats": {
"field": "kwh"
}
}
}
}
},
"sort": [
{
"quarter_time": {
"order": "asc"
}
}
]
}
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body,
index="poweriot_company_15min_power")
return es_re["aggregations"]["quarter_time"]["buckets"]
async def query_search_kwh_p_new15(cid, start, end):
start, _ = get_start_end_by_tz_time_new(start) start, _ = get_start_end_by_tz_time_new(start)
_, end = get_start_end_by_tz_time_new(end) _, end = get_start_end_by_tz_time_new(end)
sql = f""" sql = f"""
...@@ -75,53 +11,11 @@ async def query_search_kwh_p_new15(cid, start, end): ...@@ -75,53 +11,11 @@ async def query_search_kwh_p_new15(cid, start, end):
order by create_time asc order by create_time asc
""" """
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, datas = await conn.fetchall(sql=sql, args=(cid, start, end))
args=(cid, start, end))
return datas return datas
async def query_spfv_price(cid, start, end): async def load_spfv_price(cid, start, end):
query_body = {
"query": {
"bool": {
"filter": [
{
"term": {
"cid": cid
}
},
{
"range": {
"quarter_time": {
"gte": convert_es_str(start),
"lte": convert_es_str(end)
}
}
}
]
}
},
"aggs": {
"charge": {
"stats": {
"field": "charge"
}
},
"kwh": {
"stats": {
"field": "kwh"
}
}
}
}
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body,
index="poweriot_company_15min_power")
return es_re["aggregations"]["charge"]["avg"], es_re["aggregations"]["kwh"]["avg"]
async def query_spfv_price_new15(cid, start, end):
start, _ = get_start_end_by_tz_time_new(start) start, _ = get_start_end_by_tz_time_new(start)
_, end = get_start_end_by_tz_time_new(end) _, end = get_start_end_by_tz_time_new(end)
sql = f""" sql = f"""
......
...@@ -2,8 +2,7 @@ from unify_api.modules.elec_charge.common.utils import \ ...@@ -2,8 +2,7 @@ from unify_api.modules.elec_charge.common.utils import \
power_charge, max_min_time, power_charge_new15 power_charge, max_min_time, power_charge_new15
from pot_libs.sanic_api import summary from pot_libs.sanic_api import summary
from pot_libs.utils.pendulum_wrapper import my_pendulum from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.common.procedures.pttl_max import pttl_max, \ from unify_api.modules.common.procedures.pttl_max import load_pttl_max
pttl_max_new15
from unify_api.modules.elec_charge.components.elec_statistics_cps import \ from unify_api.modules.elec_charge.components.elec_statistics_cps import \
PcStatiReq, PcStatiResp, MaxpReq, MaxpResp, PcmResp PcStatiReq, PcStatiResp, MaxpReq, MaxpResp, PcmResp
from unify_api.utils.common_utils import round_2 from unify_api.utils.common_utils import round_2
...@@ -31,7 +30,7 @@ async def post_max_p(req, body: MaxpReq) -> MaxpResp: ...@@ -31,7 +30,7 @@ async def post_max_p(req, body: MaxpReq) -> MaxpResp:
point_id = body.point_id point_id = body.point_id
start = body.start start = body.start
end = body.end end = body.end
max_val, max_val_time = await pttl_max_new15(cid, start, end, point_id=point_id) max_val, max_val_time = await load_pttl_max(cid, start, end, point_id=point_id)
return MaxpResp(maxp=max_val, date_time=max_val_time) return MaxpResp(maxp=max_val, date_time=max_val_time)
...@@ -121,7 +120,7 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type): ...@@ -121,7 +120,7 @@ async def power_charge_min_max_service_new15(cid, pid, start, end, date_type):
min_kwh = {"value": "", "time": ""} min_kwh = {"value": "", "time": ""}
max_charge = {"value": "", "time": ""} max_charge = {"value": "", "time": ""}
min_charge = {"value": "", "time": ""} min_charge = {"value": "", "time": ""}
max_val, max_val_time = await pttl_max_new15(cid, start, end, point_id=pid) max_val, max_val_time = await load_pttl_max(cid, start, end, point_id=pid)
max_p["value"] = round_2(max_val) max_p["value"] = round_2(max_val)
max_p["time"] = max_val_time max_p["time"] = max_val_time
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点 # 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
......
...@@ -20,9 +20,7 @@ from unify_api.modules.common.dao.common_dao import monitor_point_join ...@@ -20,9 +20,7 @@ from unify_api.modules.common.dao.common_dao import monitor_point_join
from unify_api.modules.common.procedures.common_utils import get_electric_index from unify_api.modules.common.procedures.common_utils import get_electric_index
from unify_api.modules.common.procedures.points import proxy_points, \ from unify_api.modules.common.procedures.points import proxy_points, \
get_points_num get_points_num
from unify_api.modules.common.procedures.pttl_max import pttl_max, \ from unify_api.modules.common.procedures.pttl_max import load_pttl_max
pttl_max_new15
from unify_api.modules.electric.views.electric import METERDATA_CURRENT_KEY
from unify_api.modules.home_page.components.count_info_cps import ( from unify_api.modules.home_page.components.count_info_cps import (
MaxResidualCurrent, MaxResidualCurrent,
ElectricInfo, ElectricInfo,
...@@ -308,7 +306,7 @@ async def power_count_info(cid): ...@@ -308,7 +306,7 @@ async def power_count_info(cid):
start_time = (now - timedelta(30)).strftime("%Y-%m-%d 00:00:00") start_time = (now - timedelta(30)).strftime("%Y-%m-%d 00:00:00")
end_time = now.strftime("%Y-%m-%d %H:%M:%S") end_time = now.strftime("%Y-%m-%d %H:%M:%S")
max_30d_load, _time = await pttl_max_new15(cid, start_time, end_time, -1) max_30d_load, _time = await load_pttl_max(cid, start_time, end_time, -1)
cur_load = await real_time_load(cid) cur_load = await real_time_load(cid)
return round_2(cur_load), round_2(max_30d_load) return round_2(cur_load), round_2(max_30d_load)
......
...@@ -3,8 +3,7 @@ import re ...@@ -3,8 +3,7 @@ import re
from unify_api.utils.common_utils import round_2 from unify_api.utils.common_utils import round_2
from pot_libs.mysql_util import mysql_util from pot_libs.mysql_util import mysql_util
from pot_libs.sanic_api import summary from pot_libs.sanic_api import summary
from unify_api.modules.common.procedures.pttl_max import pttl_max, \ from unify_api.modules.common.procedures.pttl_max import load_pttl_max_15min
pttl_max_15min, pttl_max_15min_new15
from unify_api.modules.load_analysis.components.load_distribution_cps import \ from unify_api.modules.load_analysis.components.load_distribution_cps import \
DistributionReq, DistributionResp, LrBins, MaxpResp DistributionReq, DistributionResp, LrBins, MaxpResp
...@@ -53,11 +52,9 @@ async def post_load_distribution(req, ...@@ -53,11 +52,9 @@ async def post_load_distribution(req,
# 峰谷差 # 峰谷差
peak_valley = result["peak_valley"] peak_valley = result["peak_valley"]
# 最高负荷 # 最高负荷
# max_val, max_val_time = await pttl_max_15min(cid=cid, start=start, max_val, max_val_time = await load_pttl_max_15min(cid=cid, start=start,
# end=end, inline_id=inline_id) end=end,
max_val, max_val_time = await pttl_max_15min_new15(cid=cid, start=start, inline_id=inline_id)
end=end,
inline_id=inline_id)
max_p = MaxpResp(maxp=round_2(max_val), date_time=max_val_time) max_p = MaxpResp(maxp=round_2(max_val), date_time=max_val_time)
return DistributionResp(base_load=base_load, mean_load_rate=mean_load_rate, return DistributionResp(base_load=base_load, mean_load_rate=mean_load_rate,
peak_valley=peak_valley, max_load=max_p, peak_valley=peak_valley, max_load=max_p,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment