Commit 220a3fe5 authored by ZZH's avatar ZZH

remove es 2023-5-29

parent 7e380ceb
......@@ -44,93 +44,14 @@ async def get_kwh_charge(table_name, name, value, start, end):
return datas
async def query_charge_aggs(date_start, date_end, cid_list):
"""
参数: cid_list, start, end
查询: 根据参数query, aggs cid, aggs power/charge求sum
返回: [
{
"key" : 32,
"doc_count" : 44,
"charge" : {
"value" : 8658.496337890625
},
"kwh" : {
"value" : 25314.447940826416
}
},
{
"key" : 44,
"doc_count" : 43,
"charge" : {
"value" : 13868.499267578125
},
"kwh" : {
"value" : 31743.359497070312
}
}
]
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"cid": cid_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"cids": {
"terms": {
"field": "cid",
"size": 1000
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re["aggregations"]["cids"]["buckets"]
async def query_charge_new15(date_start, date_end, cid_list):
async def load_cmpy_charge(date_start, date_end, cids):
sql = f"""
select cid,sum(kwh) kwh,sum(charge) charge from company_15min_power
where cid in %s and create_time BETWEEN %s and %s
group by cid
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, args=(cid_list, date_start,
date_end))
datas = await conn.fetchall(sql=sql, args=(cids, date_start, date_end))
return datas
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import COMPANY_1DAY_POWER, COMPANY_15MIN_POWER
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.elec_charge.components.elec_charge_cps import Spvf, \
PowerViewRes
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs, query_charge_new15
from unify_api.modules.elec_charge.dao.elec_charge_dao import load_cmpy_charge
from unify_api.utils.common_utils import round_2, process_es_data, round_4
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.time_format import convert_es_str, last_time_str
from unify_api.utils.time_format import last_time_str
from functools import reduce
......@@ -100,8 +94,8 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
# 1. 求出上周期时间
start_last, end_last = last_time_str(start, end, date_type)
# 2. 获取es结果
re_this = await query_charge_new15(start, end, cid_list)
re_last = await query_charge_new15(start_last, end_last, cid_list)
re_this = await load_cmpy_charge(start, end, cid_list)
re_last = await load_cmpy_charge(start_last, end_last, cid_list)
if not re_this:
log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
return [], [], []
......@@ -160,10 +154,9 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
async def power_index_cid_proxy(start, end, cid_list, date_type):
"""power_aggs_cid_proxy缩减版, 没有增长率"""
# 1. 获取es结果
res = await query_charge_new15(start, end, cid_list)
res = await load_cmpy_charge(start, end, cid_list)
if not res:
log.info(
f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
log.info(f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
return [], [], []
# 3. 构造返回
kwh_list = []
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment