Commit 1becd439 authored by ZZH's avatar ZZH

remove es 2023-5-30

parent de51a7ff
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from unify_api import constants from unify_api.constants import SDU_ALARM_LIST
from unify_api.constants import SDU_ALARM_LIST, SDU_ONE_TWO_GRADE_ALARM
from datetime import datetime
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.utils.time_format import convert_es_str
async def sdu_alarm_statistics_dao(cid, start, end, is_sdu=True): async def sdu_alarm_statistics_dao(cid, start, end, is_sdu=True):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str}}},
{"term": {"cid": cid}},
]
if is_sdu:
# 新版sdu, 限制报警类型
filter_list.append({"terms": {"type.keyword": SDU_ALARM_LIST}})
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list}},
"aggs": {
"alarm_cnt": {
"date_histogram": {
"field": "datetime",
"interval": "day",
"time_zone": "+08:00",
"format": "yyyy-MM-dd",
"min_doc_count": 0,
"extended_bounds": {"min": start_dt.strftime("%Y-%m-%d"),
"max": end_dt.strftime("%Y-%m-%d")},
},
"aggs": {
"point_cnt": {
"terms": {"field": "point_id"}},
"appliance_cnt": {
"terms": {"field": "appliance.keyword"}},
}
},
}
}
log.info("alarm_content_info query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["alarm_cnt"]["buckets"]
async def sdu_alarm_statistics_dao_new15(cid, start, end, is_sdu=True):
mid_sql = "" mid_sql = ""
if is_sdu: if is_sdu:
# 新版sdu, 限制报警类型 # 新版sdu, 限制报警类型
...@@ -66,56 +18,6 @@ async def sdu_alarm_statistics_dao_new15(cid, start, end, is_sdu=True): ...@@ -66,56 +18,6 @@ async def sdu_alarm_statistics_dao_new15(cid, start, end, is_sdu=True):
async def sdu_alarm_type_dao(start, end, points): async def sdu_alarm_type_dao(start, end, points):
"""根据points分组, 再根据报警类型分组"""
start_es = convert_es_str(start)
end_es = convert_es_str(end)
# 所有报警类型
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"point_id": points
}
},
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"points": {
"terms": {
"field": "point_id",
"size": 1000
},
"aggs": {
"type": {
"terms": {
"field": "type.keyword",
"size": 20
}
}
}
}
}
}
log.info("electric_behave query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["points"]["buckets"]
async def sdu_alarm_type_dao_new15(start, end, points):
if points and len(points) == 1: if points and len(points) == 1:
mid_str = f"pid={points[0]}" mid_str = f"pid={points[0]}"
else: else:
...@@ -130,58 +32,6 @@ async def sdu_alarm_type_dao_new15(start, end, points): ...@@ -130,58 +32,6 @@ async def sdu_alarm_type_dao_new15(start, end, points):
async def sdu_alarm_limit_type_dao(start, end, points): async def sdu_alarm_limit_type_dao(start, end, points):
"""根据points分组, 再根据报警类型分组, 限制type"""
start_es = convert_es_str(start)
end_es = convert_es_str(end)
# 所有报警类型
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"point_id": points
}
},
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
},
# 新版sdu, 限制报警类型
{"terms": {"type.keyword": SDU_ALARM_LIST}}
]
}
},
"aggs": {
"points": {
"terms": {
"field": "point_id",
"size": 1000
},
"aggs": {
"type": {
"terms": {
"field": "type.keyword",
"size": 20
}
}
}
}
}
}
log.info("electric_behave query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["points"]["buckets"]
async def sdu_alarm_limit_type_dao_new15(start, end, points):
sql = f"SELECT pid, event_type, count(1) doc_count " \ sql = f"SELECT pid, event_type, count(1) doc_count " \
f"FROM `point_1min_event` WHERE pid in %s and " \ f"FROM `point_1min_event` WHERE pid in %s and " \
f"event_datetime BETWEEN '{start}' and '{end}' " \ f"event_datetime BETWEEN '{start}' and '{end}' " \
...@@ -192,61 +42,6 @@ async def sdu_alarm_limit_type_dao_new15(start, end, points): ...@@ -192,61 +42,6 @@ async def sdu_alarm_limit_type_dao_new15(start, end, points):
async def sdu_alarm_importance_dao(start, end, points, is_sdu=None): async def sdu_alarm_importance_dao(start, end, points, is_sdu=None):
"""根据points分组, 再根据报警等级分组"""
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"point_id": points
}
},
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"points": {
"terms": {
"field": "point_id",
"size": 1000
},
"aggs": {
"importance": {
"terms": {
"field": "importance",
"size": 10
}
}
}
}
}
}
if is_sdu:
query_body["query"]["bool"]["must"].append(
# 新版sdu, 限制报警类型
{"terms": {"type.keyword": SDU_ALARM_LIST}}
)
log.info("electric_behave query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["points"]["buckets"]
async def sdu_alarm_importance_dao_new15(start, end, points, is_sdu=None):
if points and len(points) == 1: if points and len(points) == 1:
mid_str = f"pid={points[0]}" mid_str = f"pid={points[0]}"
else: else:
......
...@@ -4,11 +4,9 @@ from unify_api.modules.alarm_manager.components.alarm_static_cps import \ ...@@ -4,11 +4,9 @@ from unify_api.modules.alarm_manager.components.alarm_static_cps import \
SduAlarmResp, RiskCount, ContentName, SassResp, AppResp, SebResp, SiarResp, \ SduAlarmResp, RiskCount, ContentName, SassResp, AppResp, SebResp, SiarResp, \
ZsResp, TimeCount, ZasResp ZsResp, TimeCount, ZasResp
from unify_api.modules.alarm_manager.dao.list_static_dao import \ from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_statistics_dao, sdu_alarm_type_dao, sdu_alarm_importance_dao, \ zdu_alarm_aggs_date_impotent, sdu_alarm_type_dao, \
sdu_alarm_limit_type_dao, \ sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \
zdu_alarm_aggs_date_impotent, sdu_alarm_type_dao_new15, \ sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao
sdu_alarm_importance_dao_new15, sdu_alarm_statistics_dao_new15, \
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao_new15
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \ from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
new_alarm_content_info, risk_distribution, zdu_summary_info, \ new_alarm_content_info, risk_distribution, zdu_summary_info, \
new_alarm_content_info_new15, risk_distribution_new15, \ new_alarm_content_info_new15, risk_distribution_new15, \
...@@ -81,51 +79,13 @@ async def sdu_alarm_statistics_service(cids, start, end, product): ...@@ -81,51 +79,13 @@ async def sdu_alarm_statistics_service(cids, start, end, product):
async def sdu_alarm_statistics_sort_service(cid, start, end, page_size, async def sdu_alarm_statistics_sort_service(cid, start, end, page_size,
page_num, sort): page_num, sort):
"""报警统计-报警记录-排名-识电u"""
# 获取point信息 # 获取point信息
points = await points_by_cid([cid]) points = await points_by_cid([cid])
if not points: if not points:
raise ParamException(message=f"{cid}没有points") raise ParamException(message=f"{cid}没有points")
# point_id_list = [i["pid"] for i in points] # point_id_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points} points_map = {i["pid"]: i["name"] for i in points}
# 1.调用函数获取报警统计信息
buckets = await sdu_alarm_statistics_dao(cid, start, end) buckets = await sdu_alarm_statistics_dao(cid, start, end)
# point报警统计
points_dic = {}
for bucket in buckets:
# 2.2 point报警数量
if bucket["point_cnt"]["buckets"]:
for item in bucket["point_cnt"]["buckets"]:
point_t = item["key"]
# point和point_name映射
point_name = points_map[point_t]
if point_name not in points_dic:
points_dic[point_name] = item["doc_count"]
else:
points_dic[point_name] += item["doc_count"]
# point报警统计
points_alarm_list = []
for key, val in points_dic.items():
points_alarm_list.append({"name": key, "value": val})
reverse = True if sort == "desc" else False
points_alarm_list.sort(key=lambda x: x["value"], reverse=reverse)
points_alarm_list_size = points_alarm_list[
(page_num - 1) * page_size: page_num * page_size]
return SassResp(
alarm_ranking_total=len(points_alarm_list),
alarm_ranking=points_alarm_list_size
)
async def sdu_alarm_statistics_sort_service_new15(cid, start, end, page_size,
page_num, sort):
# 获取point信息
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
# point_id_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
buckets = await sdu_alarm_statistics_dao_new15(cid, start, end)
if not buckets: if not buckets:
return SassResp(alarm_ranking_total=0, alarm_ranking=[]) return SassResp(alarm_ranking_total=0, alarm_ranking=[])
points_dic = {} points_dic = {}
...@@ -153,41 +113,9 @@ async def sdu_alarm_statistics_sort_service_new15(cid, start, end, page_size, ...@@ -153,41 +113,9 @@ async def sdu_alarm_statistics_sort_service_new15(cid, start, end, page_size,
async def sdu_app_statistics_sort_service(cid, start, end): async def sdu_app_statistics_sort_service(cid, start, end):
"""报警统计-报警记录-排名-识电u""" """报警统计-报警记录-排名-识电u"""
# 获取point信息
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
# point_id_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
# 1.调用函数获取报警统计信息
buckets = await sdu_alarm_statistics_dao(cid, start, end)
# 违规电器统计
illegal_app_dic = {}
for bucket in buckets:
# 1.1 电器识别
if bucket["appliance_cnt"]["buckets"]:
for item in bucket["appliance_cnt"]["buckets"]:
illegal = item["key"]
if illegal not in illegal_app_dic:
illegal_app_dic[illegal] = item["doc_count"]
else:
illegal_app_dic[illegal] += item["doc_count"]
# 排序
ele_app_list = []
for key, val in illegal_app_dic.items():
# 目前版本只展示违规
ele_app_list.append({"name": key, "value": val, "type": "违规"})
ele_app_list.sort(key=lambda x: x["value"], reverse=True)
return AppResp(
ele_app_ranking=ele_app_list,
)
async def sdu_app_statistics_sort_service_new15(cid, start, end):
"""报警统计-报警记录-排名-识电u"""
# 1.调用函数获取报警统计信息 # 1.调用函数获取报警统计信息
buckets = await sdu_alarm_statistics_dao_new15(cid, start, end) buckets = await sdu_alarm_statistics_dao(cid, start, end)
if not buckets: if not buckets:
return AppResp(ele_app_ranking=[]) return AppResp(ele_app_ranking=[])
# 违规电器统计 # 违规电器统计
...@@ -218,12 +146,11 @@ async def sdu_electric_behave_service(cid, start, end, storeys, product): ...@@ -218,12 +146,11 @@ async def sdu_electric_behave_service(cid, start, end, storeys, product):
# 获取point_id列表 # 获取point_id列表
points = [i.get("point_id") for i in point_list] points = [i.get("point_id") for i in point_list]
# 2. es查询违规/大功率/正常次数 # 2. es查询违规/大功率/正常次数
# es_type_res = await sdu_alarm_type_dao(start, end, points) es_type_res = await sdu_alarm_type_dao(start, end, points)
es_type_res = await sdu_alarm_type_dao_new15(start, end, points)
es_type_res = {i["pid"]: i for i in es_type_res if es_type_res} es_type_res = {i["pid"]: i for i in es_type_res if es_type_res}
# 2.2 es查询报警等级, 计算报警分, 需要限制报警类型为sdu新版 # 2.2 es查询报警等级, 计算报警分, 需要限制报警类型为sdu新版
es_imp_res = await sdu_alarm_importance_dao_new15(start, end, points, es_imp_res = await sdu_alarm_importance_dao(start, end, points,
is_sdu=True) is_sdu=True)
es_imp_res = {i["pid"]: i for i in es_imp_res if es_imp_res} es_imp_res = {i["pid"]: i for i in es_imp_res if es_imp_res}
# 3. 构造返回 # 3. 构造返回
return_data = {} return_data = {}
...@@ -310,7 +237,7 @@ async def sdu_index_alarm_rank(cid, start, end, product): ...@@ -310,7 +237,7 @@ async def sdu_index_alarm_rank(cid, start, end, product):
behavior_illegal_app = sorted(behavior_illegal_app, behavior_illegal_app = sorted(behavior_illegal_app,
key=lambda x: x["value"], reverse=True) key=lambda x: x["value"], reverse=True)
# 2. 报警排名, 违规行为 # 2. 报警排名, 违规行为
es_type_res = await sdu_alarm_limit_type_dao_new15(start, end, point_list) es_type_res = await sdu_alarm_limit_type_dao(start, end, point_list)
alarm_ranking = [] alarm_ranking = []
illegal_behavior = [] illegal_behavior = []
mid_goods = {} mid_goods = {}
......
...@@ -7,13 +7,11 @@ from unify_api.modules.alarm_manager.components.alarm_static_cps import \ ...@@ -7,13 +7,11 @@ from unify_api.modules.alarm_manager.components.alarm_static_cps import \
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \ from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
alarm_content_info, risk_distribution alarm_content_info, risk_distribution
from unify_api.modules.alarm_manager.service.alarm_static_service import \ from unify_api.modules.alarm_manager.service.alarm_static_service import \
sdu_alarm_statistics_service, sdu_alarm_statistics_sort_service, \ sdu_alarm_statistics_service, sdu_electric_behave_service, \
sdu_app_statistics_sort_service, sdu_electric_behave_service, \
zdu_level_distribution_service, \ zdu_level_distribution_service, \
zdu_content_distribution_service, zdu_summary_service, \ zdu_content_distribution_service, zdu_summary_service, \
zdu_alarm_sort_service_2, sdu_alarm_statistics_sort_service_new15, \ zdu_alarm_sort_service_2, sdu_alarm_statistics_sort_service, \
sdu_app_statistics_sort_service_new15, \ sdu_app_statistics_sort_service, sdu_index_alarm_rank
sdu_index_alarm_rank
from unify_api.modules.home_page.components.security_info_cps import \ from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, AlarmContentDistributionResp SecurityCountResp, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.count_info_pds import \ from unify_api.modules.home_page.procedures.count_info_pds import \
...@@ -100,8 +98,7 @@ async def post_sdu_alarm_statistics_wx(req, body: SasReq) -> SduAlarmResp: ...@@ -100,8 +98,7 @@ async def post_sdu_alarm_statistics_wx(req, body: SasReq) -> SduAlarmResp:
@summary("报警统计-报警记录-排名-识电u") @summary("报警统计-报警记录-排名-识电u")
async def post_sdu_alarm_statistics_sort(req, async def post_sdu_alarm_statistics_sort(req, body: SassReq) -> SassResp:
body: SassReq) -> SassResp:
"""目前用于 识电u->报警统计""" """目前用于 识电u->报警统计"""
product = body.product product = body.product
cid = body.cid cid = body.cid
...@@ -110,11 +107,8 @@ async def post_sdu_alarm_statistics_sort(req, ...@@ -110,11 +107,8 @@ async def post_sdu_alarm_statistics_sort(req,
page_size = body.page_size page_size = body.page_size
page_num = body.page_num page_num = body.page_num
sort = body.sort sort = body.sort
# return await sdu_alarm_statistics_sort_service(cid, start, end, page_size, return await sdu_alarm_statistics_sort_service(cid, start, end, page_size,
# page_num, sort) page_num, sort)
return await sdu_alarm_statistics_sort_service_new15(cid, start, end,
page_size, page_num,
sort)
@summary("报警统计-电器识别-排名-识电u") @summary("报警统计-电器识别-排名-识电u")
...@@ -124,7 +118,7 @@ async def post_sdu_app_statistics_sort(req, body: AppReq) -> AppResp: ...@@ -124,7 +118,7 @@ async def post_sdu_app_statistics_sort(req, body: AppReq) -> AppResp:
cid = body.cid cid = body.cid
start = body.start start = body.start
end = body.end end = body.end
return await sdu_app_statistics_sort_service_new15(cid, start, end) return await sdu_app_statistics_sort_service(cid, start, end)
@summary("首页-运行趋势-识电u") @summary("首页-运行趋势-识电u")
......
...@@ -17,8 +17,8 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil ...@@ -17,8 +17,8 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants from unify_api import constants
from unify_api.constants import Importance, EVENT_TYPE_MAP, SDU_ALARM_LIST from unify_api.constants import Importance, EVENT_TYPE_MAP, SDU_ALARM_LIST
from unify_api.modules.alarm_manager.dao.list_static_dao import \ from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_importance_dao, alarm_aggs_importance, \ alarm_aggs_importance, \
sdu_alarm_importance_dao_new15 sdu_alarm_importance_dao
from unify_api.modules.common.dao.common_dao import monitor_point_join, \ from unify_api.modules.common.dao.common_dao import monitor_point_join, \
monitor_by_cid monitor_by_cid
from unify_api.modules.common.procedures.common_utils import get_electric_index from unify_api.modules.common.procedures.common_utils import get_electric_index
...@@ -1484,7 +1484,7 @@ async def cid_alarm_importance_count(cid, start, end): ...@@ -1484,7 +1484,7 @@ async def cid_alarm_importance_count(cid, start, end):
"""计算工厂报警数, 按报警等级""" """计算工厂报警数, 按报警等级"""
monitor_point_list = await monitor_point_join(cid) monitor_point_list = await monitor_point_join(cid)
point_list = [i["pid"] for i in monitor_point_list] point_list = [i["pid"] for i in monitor_point_list]
es_res = await sdu_alarm_importance_dao_new15(start, end, point_list) es_res = await sdu_alarm_importance_dao(start, end, point_list)
es_res_key = {i["key"]: i for i in es_res} es_res_key = {i["key"]: i for i in es_res}
res_list = [] res_list = []
...@@ -1504,8 +1504,7 @@ async def cid_alarm_importance_count(cid, start, end): ...@@ -1504,8 +1504,7 @@ async def cid_alarm_importance_count(cid, start, end):
tmp_dic["third"] += b["doc_count"] tmp_dic["third"] += b["doc_count"]
tmp_dic["alarm_count"] = tmp_dic["first"] + tmp_dic["second"] + \ tmp_dic["alarm_count"] = tmp_dic["first"] + tmp_dic["second"] + \
tmp_dic[ tmp_dic["third"]
"third"]
res_list.append(tmp_dic) res_list.append(tmp_dic)
# 按照报警数, 排top5 # 按照报警数, 排top5
res_list_sort = sorted(res_list, key=lambda i: i["alarm_count"], res_list_sort = sorted(res_list, key=lambda i: i["alarm_count"],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment