Commit 7e380ceb authored by ZZH's avatar ZZH

remove es 2023-5-29

parent 95140e9a
......@@ -36,35 +36,10 @@ async def alarm_count(company_ids):
return total_alarm_cnt
async def alarm_count_sdu_new15(cids):
async def load_alarm_cnt_sdu(cids):
sql = f"select count(1) doc_count from point_1min_event " \
f"where cid in %s and importance in (1, 2, 3) " \
f"and event_type in {tuple(SDU_ALARM_LIST)}"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(cids,))
return data["doc_count"] or 0
async def alarm_count_sdu_new(company_ids):
"""新版识电u报警类型只包含3种"""
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"cid": company_ids}},
{"terms": {"type.keyword": SDU_ALARM_LIST}}
]
}
},
"size": 0,
"aggs": {"cid_alarm_aggs": {"terms": {"field": "cid", "size": 10000}}},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
cid_alarm_buckets = (
es_result.get("aggregations", {}).get("cid_alarm_aggs", {}).get(
"buckets", [])
)
total_alarm_cnt = sum([i["doc_count"] for i in cid_alarm_buckets])
return total_alarm_cnt
......@@ -5,19 +5,21 @@ from unify_api import constants
async def power_use_count(company_ids):
query_body = {
"query": {"bool": {"filter": [{"terms": {"cid": company_ids}},]}},
"query": {"bool": {"filter": [{"terms": {"cid": company_ids}}, ]}},
"size": 0,
"aggs": {"kwh": {"sum": {"field": "kwh"}}},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body, index=constants.COMPANY_15MIN_POWER)
es_result = await es.search_origin(body=query_body,
index=constants.COMPANY_15MIN_POWER)
total_power = round(es_result.get("aggregations", {}).get("kwh", {}).get("value") or 0, 2)
total_power = round(
es_result.get("aggregations", {}).get("kwh", {}).get("value") or 0, 2)
return total_power
async def power_use_count_new15(cids):
async def load_cmpy_power(cids):
sql = "SELECT COUNT(kwh) kwh FROM `company_1day_power` where cid in %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(cids,))
......@@ -26,43 +28,11 @@ async def power_use_count_new15(cids):
async def inline_power_use_info(inline_ids, month_str):
"""
获取进线用电信息
:param inline_id:
:param es_start_time:
:param es_end_time:
:return:
"""
async with EsUtil() as es:
filters = [
{"terms": {"inlid": inline_ids}},
{"term": {"month": f"{month_str}-01T00:00:00+08:00"}},
]
query_body = {
"query": {"bool": {"filter": filters}},
"size": 10000,
}
es_result = await es.search_origin(body=query_body, index=constants.INLINE_1MONTH_POWER)
power_infos = es_result["hits"]["hits"]
inline_power_info_map = {
i["_source"]["inlid"]: {
"kwh": i["_source"]["kwh"],
"charge": i["_source"]["charge"],
"p": i["_source"]["p"],
}
for i in power_infos
}
return inline_power_info_map
async def inline_power_use_info_new15(inline_ids, month_str):
sql = "SELECT inlid, sum(kwh) kwh, sum(charge) charge, sum(p) p FROM " \
"`inline_1day_power` where inlid in %s and " \
f"DATE_FORMAT(create_time, '%%Y-%%m')='{month_str}' GROUP BY inlid"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(inline_ids, ))
datas = await conn.fetchall(sql, args=(inline_ids,))
inline_power_info_map = {
i["inlid"]: {
"kwh": i["kwh"],
......
......@@ -73,7 +73,7 @@ async def query_charge_aggs(date_start, date_end, cid_list):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -180,83 +180,7 @@ async def power_charge_p_cid_aggs(date_start, date_end, cid_list, interval):
return results or []
async def query_charge_aggs_points(date_start, date_end, point_list):
"""
返回: [
{
"key" : 32,
"doc_count" : 44,
"charge" : {
"value" : 8658.496337890625
},
"kwh" : {
"value" : 25314.447940826416
}
},
{
"key" : 44,
"doc_count" : 43,
"charge" : {
"value" : 13868.499267578125
},
"kwh" : {
"value" : 31743.359497070312
}
}
]
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"pid": point_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"points": {
"terms": {
"field": "pid",
"size": 10000
},
"aggs": {
"kwh": {
"sum": {
"field": "kwh"
}
},
"charge": {
"sum": {
"field": "charge"
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index_point)
return es_re["aggregations"]["points"]["buckets"]
async def query_charge_aggs_points_new15(start, end, point_list):
async def query_charge_aggs_points(start, end, point_list):
sql = f"SELECT pid,sum(kwh) kwh,SUM(charge) charge " \
f"FROM `point_15min_power` " \
f"where pid in %s and create_time BETWEEN '{start}' and '{end}' " \
......@@ -270,7 +194,7 @@ async def histogram_aggs_points(date_start, date_end, point_list, interval):
"""date_histogram"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -332,7 +256,7 @@ async def power_charge_p_point_aggs(date_start, date_end, pid_list, interval):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......@@ -429,7 +353,7 @@ async def extended_bounds_agg(date_start, date_end, cid_list, interval):
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
......
......@@ -5,7 +5,7 @@ from unify_api.modules.common.components.common_cps import LevelResp
from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.elec_charge.components.elec_charge_cps import KpResp
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, query_charge_aggs_points_new15
query_charge_aggs_points
from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points
from unify_api.modules.home_page.procedures.count_info_pds import current_load, \
......@@ -20,7 +20,7 @@ async def kwh_points_service(cid, start, end, storeys):
# 获取point_id列表
points = [i.get("point_id") for i in point_list]
# 2. es查询数据
es_res = await query_charge_aggs_points_new15(start, end, points)
es_res = await query_charge_aggs_points(start, end, points)
es_res = {i["pid"]: i for i in es_res if es_res}
# 3. 构造返回
kwh_data = {}
......@@ -64,7 +64,7 @@ async def kwh_card_level_service(cid, point_list, start, end):
# 1. 获取每个point_id的详细信息
monitor_point_list = await monitor_point_join_by_points(point_list)
# 2. es查询数据
es_res = await query_charge_aggs_points_new15(start, end, point_list)
es_res = await query_charge_aggs_points(start, end, point_list)
es_res = {i["pid"]: i for i in es_res if es_res}
# 3. 返回数据
ret_data = {
......@@ -90,11 +90,11 @@ async def kwh_card_level_service(cid, point_list, start, end):
kwh = round_2(es_res[point_id]["kwh"])
charge = round_2(es_res[point_id]["charge"])
price = round_2(division_two(charge, kwh))
res_dic["kwh"] = kwh
res_dic["charge"] = charge
res_dic["price"] = price
ret_data[m_type].append(res_dic)
return LevelResp(
inline=ret_data["inline"],
......
......@@ -18,11 +18,10 @@ from unify_api.modules.elec_charge.components.elec_charge_cps import \
IndexChargeReq, IndexChargeResp, PopReq, PopResp, MtpResp, PspResp, \
IpspResp, KpReq, KpResp, KclReq, ProductProxyReq, LoadInfoReq, LoadInfoResp
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, get_kwh_charge, query_charge_aggs_points_new15
get_kwh_charge, query_charge_aggs_points
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
quarters_trans, power_overview_proxy, total_value, \
power_aggs_cid_proxy_new15, \
power_index_cid_proxy_new15, power_overview_proxy15
quarters_trans, total_value, \
power_aggs_cid_proxy, power_index_cid_proxy, power_overview_proxy
from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service, load_info_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
......@@ -153,7 +152,7 @@ async def post_price_policy(req, body: PricePolicyReq) -> PricePolicyResp:
quarters = price.get("quarters")
price_info_dic = price
break
if len(quarters) != 96:
log.error("quarters config fail")
return PricePolicyResp(price_info=[])
......@@ -315,15 +314,15 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
end = now_date
# 获取上一周期开始结束时间
start_last, end_last = last_time_str(start, end, date_type)
power, charge = await power_overview_proxy15(start, end, cid_list)
power_last, charge_last = await power_overview_proxy15(start_last,
end_last,
cid_list)
power, charge = await power_overview_proxy(start, end, cid_list)
power_last, charge_last = await power_overview_proxy(start_last,
end_last,
cid_list)
if not all([power, charge, power_last, charge_last]):
return PopResp(power=Spvf(), charge=Spvf())
total_power = total_value(power)
total_charge = total_value(charge)
total_power_last = total_value(power_last)
total_charge_last = total_value(charge_last)
# 增长率
......@@ -368,30 +367,30 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
"month", True)
this_month_p, this_month_charge = await power_overview_proxy15(
this_month_p, this_month_charge = await power_overview_proxy(
month_start, month_end, cid_list)
last_month_p, last_month_charge = await power_overview_proxy15(
last_month_p, last_month_charge = await power_overview_proxy(
last_month_start, last_month_end, cid_list)
if not all([this_month_p, this_month_charge, last_month_p,
last_month_charge]):
return MtpResp()
this_month_total_power = total_value(this_month_p)
last_month_total_power = total_value(last_month_p)
month_power_rate = (this_month_total_power -
last_month_total_power) / last_month_total_power
# 2. 今日/昨日数据
last_day_start, last_day_end = last_time_str(today_start, today_end, "day")
this_day_p, this_day_charge = await power_overview_proxy15(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy15(
this_day_p, this_day_charge = await power_overview_proxy(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy(
last_day_start, last_day_end, cid_list)
if not all([this_day_p, this_day_charge, last_day_p, last_day_charge]):
return MtpResp()
this_day_total_power = total_value(this_day_p)
last_day_total_power = total_value(last_day_p)
day_power_rate = \
......@@ -415,7 +414,7 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
date_type = body.date_type
if date_type == "range":
date_type = "month"
# 如果end是今天, 则end=当前时间, 避免增长率错误
end_tmp = end.split(" ")[0]
now_date, timestamp = srv_time()
......@@ -427,9 +426,9 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
month_end = str(now.end_of('month').format("YYYY-MM-DD HH:mm:ss"))
if date_type == "month" and end == month_end:
end = now_date
# 2. 查询工厂电量电费信息
kwh_list, charge_list, price_list = await power_aggs_cid_proxy_new15(
kwh_list, charge_list, price_list = await power_aggs_cid_proxy(
start, end, cid_list, date_type)
kwh_list_st = sorted(kwh_list, key=lambda i: i['value'], reverse=True)
charge_list_st = sorted(charge_list, key=lambda i: i['value'],
......@@ -453,7 +452,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
return IpspResp()
today_start, today_end, month_start, month_end = today_month_date()
# 2. 获取今日数据
kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy_new15(
kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy(
today_start, today_end, cid_list, "day")
kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'],
reverse=True)[:5]
......@@ -462,7 +461,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
price_list_d_st = sorted(price_list_d, key=lambda i: i['value'],
reverse=True)[:5]
# 2. 获取本月数据
kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy_new15(
kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy(
month_start, month_end, cid_list, "month")
kwh_list_m_st = sorted(kwh_list_m, key=lambda i: i['value'],
reverse=True)[:5]
......@@ -501,7 +500,7 @@ async def get_kwh_points_download(req):
# 获取point_id列表
points = [i.get("point_id") for i in point_list]
# 3. es查询数据
es_res = await query_charge_aggs_points_new15(start, end, points)
es_res = await query_charge_aggs_points(start, end, points)
es_res = {i["pid"]: i for i in es_res if es_res}
# 4.返回数据
storey_name_list = []
......@@ -566,7 +565,7 @@ async def post_load_info(request, body: LoadInfoReq) -> LoadInfoResp:
except Exception as e:
log.exception(e)
return LoadInfoResp().server_error()
return LoadInfoResp(
current_load=current_load,
yesterday_load=yesterday_load,
......
......@@ -48,33 +48,7 @@ async def month_power_loadrate(inline_id: int, month_list: list):
return power_loadrate
async def inline_power_use_info(inline_id, es_start_time, es_end_time):
"""
获取进线用电信息
:param inline_id:
:param es_start_time:
:param es_end_time:
:return:
"""
async with EsUtil() as es:
filters = [
{"term": {"inlid": inline_id}},
{"range": {"quarter_time": {"gte": es_start_time, "lt": es_end_time,}}},
]
query_body = {
"query": {"bool": {"filter": filters}},
"size": 0,
"aggs": {"kwh": {"sum": {"field": "kwh"}}, "charge": {"sum": {"field": "charge"}},},
}
es_result = await es.search_origin(body=query_body,
index=constants.INLINE_15MIN_POWER)
return {
"charge": es_result["aggregations"]["charge"]["value"],
"kwh": es_result["aggregations"]["kwh"]["value"],
}
async def inline_power_use_info_new15(inline_id, start, end):
async def load_inline_power(inline_id, start, end):
"""
1.5版本获取进线用电信息
:param inline_id:
......@@ -85,5 +59,5 @@ async def inline_power_use_info_new15(inline_id, start, end):
sql = f"SELECT sum(kwh) kwh, sum(charge) charge FROM inline_15min_power" \
f" where inlid=%s and create_time BETWEEN '{start}' and '{end}'"
async with MysqlUtil() as conn:
datas = await conn.fetchone(sql, args=(inline_id, ))
datas = await conn.fetchone(sql, args=(inline_id,))
return datas
......@@ -9,8 +9,8 @@ from unify_api.modules.electric_optimization.procedures.optimization_pds import
month_md_space
)
from unify_api.modules.electric_optimization.procedures.optimization_pds import (
inline_power_use_info, month_power_pcvf, month_power_factors,
month_power_loadrate, inline_power_use_info_new15
month_power_pcvf, month_power_factors, month_power_loadrate,
load_inline_power
)
from unify_api.constants import ADD_ELE_PRICE
from unify_api.modules.elec_charge.dao.elec_charge_dao import (
......@@ -142,8 +142,7 @@ async def power_peakcut_service(inline_id):
)
end_time = pendulum.datetime(now.year, now.month, 1).strftime(
"%Y-%m-%d %H:%M:%S")
power_use_info = await inline_power_use_info_new15(inline_id, start_time,
end_time)
power_use_info = await load_inline_power(inline_id, start_time, end_time)
month_kwh = power_use_info.get("kwh") or 0
month_charge = power_use_info.get("charge") or 0
avg_price = month_kwh / month_charge if month_charge else ""
......
......@@ -13,7 +13,7 @@ from unify_api.modules.common.procedures.points import proxy_points, list_point
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, point_aggs_kwh
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
proxy_power, proxy_power15
load_proxy_power
from unify_api.modules.home_page.components.count_info_proxy_cps import \
IycResp, IycmResp, RtrResp, AlarmLevelCnt, AlarmContentCnt, CmResp, \
ApcResp, AsiResp, HsiResp, AiiResp
......@@ -51,7 +51,7 @@ async def post_zd_info_factory_service(cid_list):
safe_operation_days += safe_run_days
total_alarm += alarm_count
# 4. 累计监测用电
total_power = await proxy_power15(cid_list)
total_power = await load_proxy_power(cid_list)
# 5. 日均碳排放 = 累计用电 * 0.754 / 总运行天数
# 运行天数
sql = "select create_time from company where cid = %s"
......@@ -127,28 +127,6 @@ async def risk_cost_service(cid):
async def safe_run_sdu(cid, total_tenant):
"""sdu安全运行天数"""
# 1. 每一个point总安装天数
# 安装时间create_time
async with MysqlUtil() as conn:
company_sql = "select create_time from company where cid = %s"
company = await conn.fetchone(company_sql, (cid,))
create_time_timestamp = company["create_time"]
create_time = datetime.fromtimestamp(create_time_timestamp)
now_time = datetime.now()
# +2表示包过其实日期
total_days = (now_time - create_time).days + 2
# 2. 根据每日聚合结果
es_res = await sdu_alarm_aggs_date(cid)
for buckets in es_res:
doc_count = buckets["doc_count"]
# 当工厂某天I级、II级报警总数小于总户数 * 5 % 时,即为安全运行
if doc_count > total_tenant * 0.05:
total_days -= 1
return total_days
async def safe_run_sdu_new15(cid, total_tenant):
# 1. 每一个point总安装天数
# 安装时间create_time
company_sql = "select create_time from company where cid = %s"
......
......@@ -5,15 +5,15 @@ from pot_libs.sanic_api import summary
from unify_api.constants import Product, PRODUCT
from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.modules.common.procedures.alarm_cps import alarm_count, \
alarm_count_sdu_new, alarm_count_sdu_new15
load_alarm_cnt_sdu
from unify_api.modules.common.procedures.cids import get_cids, get_cid_info, \
get_proxy_cids
from unify_api.modules.common.procedures.common_cps import proxy_safe_run_info
from unify_api.modules.common.procedures.points import proxy_points, get_points
from unify_api.modules.common.procedures.power_cps import power_use_count, \
power_use_count_new15
load_cmpy_power
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
proxy_power, proxy_power15
load_proxy_power
from unify_api.modules.home_page.components.count_info_proxy_cps import (
CountInfoProxyResp,
ProxySecurityLevelCntResp,
......@@ -45,7 +45,7 @@ from unify_api.modules.home_page.procedures.count_info_proxy_pds import (
from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info_new15
from unify_api.modules.home_page.service.count_info_service import \
safe_run_sdu, safe_run_sdu_new15
safe_run_sdu
from unify_api.modules.elec_charge.components.elec_charge_cps import \
ProductProxyReq
from unify_api.modules.users.procedures.jwt_user import jwt_user
......@@ -79,7 +79,7 @@ async def post_count_info_proxy(req) -> CountInfoProxyResp:
safe_operation_days += safe_run_days
total_alarm += alarm_count
# 4. 累计监测用电
total_power = await proxy_power15(cid_list)
total_power = await load_proxy_power(cid_list)
return CountInfoProxyResp(
total_cid=total_cid,
total_monitor=total_monitor,
......@@ -278,7 +278,7 @@ async def post_zhidian_info_proxy(req, body: ProductProxyReq) -> AipResp:
# 2. 监测点位
total_monitor = await proxy_points(cid_list)
# 3. 累计监测用电
total_power = await proxy_power15(cid_list)
total_power = await load_proxy_power(cid_list)
# 4. 用户接入总时长, 每个工厂接入时长总和
total_run_day = await total_run_day_proxy(cid_list)
return AipResp(
......@@ -298,13 +298,13 @@ async def post_count_info_sdu_new(req, body: CisReq) -> CisResp:
total_tenant = len(monitor_list)
# 2. 安全运行天数: 以天计,当工厂某天I级、II级报警总数小于总户数*5%时,即为安全运行,
# 展示自接入累加安全运行天数
safe_day = await safe_run_sdu_new15(cid, total_tenant)
safe_day = await safe_run_sdu(cid, total_tenant)
# 3. 在线率
online_rate = 88 + random.choice([1, 1.5, 2, 2.5, 3, 3.5, 4])
# 4.累计用电
total_power = await power_use_count_new15(cids)
total_power = await load_cmpy_power(cids)
# 5. 累计报警
total_alarm = await alarm_count_sdu_new15(cids)
total_alarm = await load_alarm_cnt_sdu(cids)
return CisResp(
total_tenant=total_tenant,
online_rate=online_rate,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment