Commit f1c2b7ac authored by ZZH's avatar ZZH

remove es 2023-5-30

parent ebaa57cd
......@@ -98,65 +98,6 @@ async def sdu_alarm_content_info(cids, start, end, points):
async def risk_distribution(start, end, point_id_list, is_new=False):
"""风险分布, 暂时识电u使用"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
importance_list = [1]
# 新版sdu, 其中当前时段发生过I或II级报警为风险用户,其余为安全用户
if is_new:
importance_list = [1, 2]
query_body = {
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"datetime": {
"gte": es_start_str,
"lte": es_end_str
}
}
},
{
"terms": {
"point_id": point_id_list
}
},
{
"terms": {
"importance": importance_list
}
},
# 新版sdu, 限制报警类型
{"terms": {"type.keyword": SDU_ALARM_LIST}}
]
}
},
"aggs": {
"alarm_cnt": {
"terms": {
"field": "point_id"
}
}
}
}
log.info("risk_distribution query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
total_user = len(point_id_list)
risk_user = len(buckets)
security_user = total_user - risk_user
return security_user, risk_user
async def risk_distribution_new15(start, end, point_id_list, is_new=False):
li = [f"event_datetime BETWEEN '{start}' and '{end}'",
f"event_type in {tuple(SDU_ALARM_LIST)}"]
if is_new:
......@@ -169,7 +110,7 @@ async def risk_distribution_new15(start, end, point_id_list, is_new=False):
li.append(f"pid in {tuple(point_id_list)}")
mid_sql = " and ".join(li)
sql = f"SELECT DISTINCT pid FROM `point_1min_event` where {mid_sql}"
log.info(f"risk_distribution_new15 sql:{sql}")
log.info(f"risk_distribution sql:{sql}")
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
total_user = len(point_id_list)
......
......@@ -8,8 +8,7 @@ from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
risk_distribution, zdu_summary_info, \
sdu_alarm_content_info, risk_distribution_new15, \
sdu_alarm_content_info, risk_distribution, \
alarm_content_time_distribution_pds
from unify_api.modules.common.dao.common_dao import points_by_cid, \
monitor_point_join, points_monitor_by_cid
......@@ -55,7 +54,7 @@ async def sdu_alarm_statistics_service(cids, start, end, product):
)
# 2.计算风险分布, 与安电u不同, 安电u发生过I级则为风险用户
# 其中当前时段发生过I或II级报警为风险用户,其余为安全用户
security_user, risk_user = await risk_distribution_new15(start, end,
security_user, risk_user = await risk_distribution(start, end,
point_id_list,
is_new=True)
rc = RiskCount(security_user=security_user, risk_user=risk_user)
......
......@@ -12,7 +12,7 @@ from unify_api.modules.alarm_manager.service.alarm_static_service import \
from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.count_info_pds import \
electric_use_info_sdu, electric_use_info_sdu_new15
electric_use_info_sdu
from unify_api.utils.time_format import last30_day_range
......@@ -83,7 +83,7 @@ async def post_sdu_index_alarm_statistics(req, body: SiasReq) -> SiasResp:
# 安全和报警统计
res = await sdu_alarm_statistics_service([cid], start, end, product)
# 安全指数
alarm_res = await electric_use_info_sdu_new15(cid)
alarm_res = await electric_use_info_sdu(cid)
electric_use_score = round(alarm_res.electric_use_score)
return SiasResp(
risk_distribution=res.risk_distribution,
......
......@@ -1052,77 +1052,6 @@ async def optimization_count_info(company_id: int):
async def electric_use_info_sdu(cid):
"""用电安全指数, 识电u"""
start, end = last30_day_range()
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body = {
"query": {
"bool": {
"filter": [
{"term": {"cid": cid}},
{"range": {
"datetime": {"gte": start_es, "lte": end_es, }}},
{"terms": {"type.keyword": SDU_ALARM_LIST}}
],
}
},
"size": 0,
"aggs": {
"alarm_aggs": {
"terms": {"field": "importance"}
}
}
}
log.info("cal_score_safe_electric query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
score_buckets = (
es_result.get("aggregations", {}).get("alarm_aggs", {}).get("buckets",
[])
)
first_alarm_cnt = 0
second_alarm_cnt = 0
third_alarm_cnt = 0
for bucket in score_buckets:
if bucket["key"] == Importance.First.value:
first_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Second.value:
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
company_point_map = await get_points([cid])
point_len = len(company_point_map.get(cid) or {})
alarm_score = (
(
first_alarm_cnt * 2 + second_alarm_cnt * 1 + third_alarm_cnt * 0.5) / point_len
if point_len
else 0
)
if alarm_score >= 15:
alarm_score = 15
electric_use_score = get_electric_index(alarm_score)
log.info(
"point_len={} alarm_score={} electric_use_score={}".format(
point_len, alarm_score, electric_use_score
)
)
return ElectricInfo(
first_alarm_cnt=first_alarm_cnt,
second_alarm_cnt=second_alarm_cnt,
third_alarm_cnt=third_alarm_cnt,
alarm_score=alarm_score,
electric_use_score=electric_use_score,
)
async def electric_use_info_sdu_new15(cid):
start, end = last30_day_range()
first_alarm_cnt = 0
second_alarm_cnt = 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment