Commit ebaa57cd authored by ZZH's avatar ZZH

remove es 2023-5-30

parent 1becd439
...@@ -12,239 +12,7 @@ from unify_api.constants import EVENT_TYPE_MAP, TEMPERATURE_MAP, \ ...@@ -12,239 +12,7 @@ from unify_api.constants import EVENT_TYPE_MAP, TEMPERATURE_MAP, \
RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP
async def alarm_content_info(cid, start, end, points): async def sdu_alarm_content_info(cids, start, end, points):
"""
识电U报警统计
"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str}}},
{"term": {"cid": cid}}]
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list}},
"aggs": {
"alarm_cnt": {
"date_histogram": {
"field": "datetime",
"interval": "day",
"time_zone": "+08:00",
"format": "yyyy-MM-dd",
"min_doc_count": 0,
"extended_bounds": {"min": start_dt.strftime("%Y-%m-%d"),
"max": end_dt.strftime("%Y-%m-%d")},
},
"aggs": {
"type_cnt": {
"terms": {"field": "type.keyword"}},
# 名字es和mysql可能会不一致, 使用point_id映射name
# "name_cnt": {
# "terms": {"field": "name.keyword"}},
"point_cnt": {
"terms": {"field": "point_id"}},
}
},
}
}
log.info("alarm_content_info query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
high_power_app = {"slots": [], "value": [0] * len(buckets)}
ele_overload = {"slots": [], "value": [0] * len(buckets)}
illegal_ele_app = {"slots": [], "value": [0] * len(buckets)}
power_quality = {"slots": [], "value": [0] * len(buckets)}
ele_car_battery = {"slots": [], "value": [0] * len(buckets)}
if not buckets:
return None
points_map = {i["pid"]: i["name"] for i in points}
# 初始化,存储监测点,报警数量统计
point_dic = {}
for i in points:
point_dic[i["name"]] = 0
# 总报警数
total_alarm_cnt = 0
for index, bucket in enumerate(buckets):
time_str = bucket["key_as_string"][5:10]
high_power_app["slots"].append(time_str)
ele_overload["slots"].append(time_str)
illegal_ele_app["slots"].append(time_str)
power_quality["slots"].append(time_str)
ele_car_battery["slots"].append(time_str)
# 1. 报警类型统计
if bucket["type_cnt"]["buckets"]:
for item in bucket["type_cnt"]["buckets"]:
total_alarm_cnt += item["doc_count"]
if item["key"] == "high_power_app":
high_power_app["value"][index] += item["doc_count"]
if item["key"] == "illegal_ele_app":
illegal_ele_app["value"][index] += item["doc_count"]
if item["key"] == "ele_overload":
ele_overload["value"][index] += item["doc_count"]
if item["key"] == "power_quality_low":
power_quality["value"][index] += item["doc_count"]
if item["key"] == "ele_car_battery":
ele_car_battery["value"][index] += item["doc_count"]
# 2. 监测点,报警数量统计
# if bucket["name_cnt"]["buckets"]:
# for item in bucket["name_cnt"]["buckets"]:
# point_dic[item["key"]] += item["doc_count"]
if bucket["point_cnt"]["buckets"]:
for item in bucket["point_cnt"]["buckets"]:
point_id = item["key"]
# 根据point_id map point_name
name = points_map[point_id]
point_dic[name] += item["doc_count"]
log.info(f"high_power_app={high_power_app}")
log.info(f"ele_overload={ele_overload}")
log.info(f"illegal_ele_app={illegal_ele_app}")
log.info(f"power_quality={power_quality}")
log.info(f"ele_car_battery={ele_car_battery}")
log.info(f"point_dic={point_dic}")
log.info(f"total_alarm_cnt={total_alarm_cnt}")
return {
"high_power_app": high_power_app,
"ele_overload": ele_overload,
"illegal_ele_app": illegal_ele_app,
"power_quality": power_quality,
"ele_car_battery": ele_car_battery,
"point_dic": point_dic,
"total_alarm_cnt": total_alarm_cnt
}
async def new_alarm_content_info(cids, start, end, points):
"""
识电U正式版, 报警统计
"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str}}},
{"terms": {"cid": cids}},
# 新版sdu, 限制报警类型
{"terms": {"type.keyword": SDU_ALARM_LIST}}
]
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list}},
"aggs": {
"alarm_cnt": {
"date_histogram": {
"field": "datetime",
"interval": "day",
"time_zone": "+08:00",
"format": "yyyy-MM-dd",
"min_doc_count": 0,
"extended_bounds": {"min": start_dt.strftime("%Y-%m-%d"),
"max": end_dt.strftime("%Y-%m-%d")},
},
"aggs": {
"type_cnt": {
"terms": {"field": "type.keyword"}},
# "name_cnt": {
# "terms": {"field": "name.keyword"}},
# 用于计算报警户数
"point_cnt": {
"terms": {"field": "point_id"}},
"appliance_cnt": {
"terms": {"field": "appliance.keyword"}},
}
},
}
}
log.info("alarm_content_info query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
# 线路过载
ele_overload = {"slots": [], "value": [0] * len(buckets)}
# 违规电器接入
illegal_ele_app = {"slots": [], "value": [0] * len(buckets)}
# 电能品质
power_quality = {"slots": [], "value": [0] * len(buckets)}
# 电动车电池, 归为违规电器
# ele_car_battery = {"slots": [], "value": [0] * len(buckets)}
# 初始化,存储监测点,报警数量统计
point_dic = {}
for i in points:
point_dic[i["name"]] = 0
# 总报警数
total_alarm_cnt = 0
# 报警户数, 存在报警的point
alarm_points = []
# 违规电器统计
illegal_app_dic = {}
for index, bucket in enumerate(buckets):
time_str = bucket["key_as_string"][5:10]
ele_overload["slots"].append(time_str)
illegal_ele_app["slots"].append(time_str)
power_quality["slots"].append(time_str)
# 1. 报警类型统计
if bucket["type_cnt"]["buckets"]:
for item in bucket["type_cnt"]["buckets"]:
total_alarm_cnt += item["doc_count"]
if item["key"] == "illegal_ele_app":
illegal_ele_app["value"][index] += item["doc_count"]
if item["key"] == "ele_overload":
ele_overload["value"][index] += item["doc_count"]
if item["key"] == "power_quality_low":
power_quality["value"][index] += item["doc_count"]
# 电动车电池, 归为违规电器, 业务后台已处理
# if item["key"] == "ele_car_battery":
# illegal_ele_app["value"][index] += item["doc_count"]
# 2. 监测点,报警数量统计
# if bucket["name_cnt"]["buckets"]:
# for item in bucket["name_cnt"]["buckets"]:
# point_dic[item["key"]] += item["doc_count"]
# 3. 计算报警户数point
if bucket["point_cnt"]["buckets"]:
for item in bucket["point_cnt"]["buckets"]:
point_t = item["key"]
if point_t not in alarm_points:
alarm_points.append(point_t)
# 4.小程序,违规电器统计
if bucket["appliance_cnt"]["buckets"]:
for item in bucket["appliance_cnt"]["buckets"]:
illegal = item["key"]
if illegal not in illegal_app_dic:
illegal_app_dic[illegal] = item["doc_count"]
else:
illegal_app_dic[illegal] += item["doc_count"]
log.info(f"ele_overload={ele_overload}")
log.info(f"illegal_ele_app={illegal_ele_app}")
log.info(f"power_quality={power_quality}")
log.info(f"illegal_app_dic={illegal_app_dic}")
log.info(f"total_alarm_cnt={total_alarm_cnt}")
return {
"ele_overload": ele_overload,
"illegal_ele_app": illegal_ele_app,
"power_quality": power_quality,
"illegal_app_dic": illegal_app_dic,
"total_alarm_cnt": total_alarm_cnt,
"alarm_points_cnt": len(alarm_points)
}
async def new_alarm_content_info_new15(cids, start, end, points):
# 总报警数 # 总报警数
total_alarm_cnt = 0 total_alarm_cnt = 0
# 报警户数, 存在报警的point # 报警户数, 存在报警的point
...@@ -301,7 +69,7 @@ async def new_alarm_content_info_new15(cids, start, end, points): ...@@ -301,7 +69,7 @@ async def new_alarm_content_info_new15(cids, start, end, points):
try: try:
index = time_slots.index(str(data["dt"])) index = time_slots.index(str(data["dt"]))
except Exception as e: except Exception as e:
log.error(f"new_alarm_content_info_new15 data {str(e)}") log.error(f"sdu_alarm_content_info data {str(e)}")
continue continue
if data["event_type"] == "illegal_ele_app": if data["event_type"] == "illegal_ele_app":
illegal_ele_app["value"][index] += data["doc_count"] illegal_ele_app["value"][index] += data["doc_count"]
...@@ -461,7 +229,7 @@ async def alarm_content_time_distribution_pds(cid, start, end, ): ...@@ -461,7 +229,7 @@ async def alarm_content_time_distribution_pds(cid, start, end, ):
intervel, slots = time_pick_transf_new(start, end) intervel, slots = time_pick_transf_new(start, end)
slots = [slot[5::] for slot in slots] slots = [slot[5::] for slot in slots]
temperature, residual_current, electric_param = \ temperature, residual_current, electric_param = \
{"slots": slots, "value": [0] * len(slots)},\ {"slots": slots, "value": [0] * len(slots)}, \
{"slots": slots, "value": [0] * len(slots)}, \ {"slots": slots, "value": [0] * len(slots)}, \
{"slots": slots, "value": [0] * len(slots)} {"slots": slots, "value": [0] * len(slots)}
......
...@@ -8,8 +8,8 @@ from unify_api.modules.alarm_manager.dao.list_static_dao import \ ...@@ -8,8 +8,8 @@ from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \ sdu_alarm_importance_dao, sdu_alarm_statistics_dao, \
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \ from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
new_alarm_content_info, risk_distribution, zdu_summary_info, \ risk_distribution, zdu_summary_info, \
new_alarm_content_info_new15, risk_distribution_new15, \ sdu_alarm_content_info, risk_distribution_new15, \
alarm_content_time_distribution_pds alarm_content_time_distribution_pds
from unify_api.modules.common.dao.common_dao import points_by_cid, \ from unify_api.modules.common.dao.common_dao import points_by_cid, \
monitor_point_join, points_monitor_by_cid monitor_point_join, points_monitor_by_cid
...@@ -22,7 +22,7 @@ from unify_api.modules.home_page.procedures.security_info_pds import \ ...@@ -22,7 +22,7 @@ from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info_new15 alarm_count_info_new15
from unify_api.utils.common_utils import round_1, division_two from unify_api.utils.common_utils import round_1, division_two
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \ from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
zdu_alarm_sort_dao, alarm_content_time_distribution_dao, zdu_summary_dao zdu_alarm_sort_dao, zdu_summary_dao
async def sdu_alarm_statistics_service(cids, start, end, product): async def sdu_alarm_statistics_service(cids, start, end, product):
...@@ -33,8 +33,7 @@ async def sdu_alarm_statistics_service(cids, start, end, product): ...@@ -33,8 +33,7 @@ async def sdu_alarm_statistics_service(cids, start, end, product):
raise ParamException(message=f"{cids}没有points") raise ParamException(message=f"{cids}没有points")
point_id_list = [i["pid"] for i in points] point_id_list = [i["pid"] for i in points]
# 1.调用函数获取报警统计信息 # 1.调用函数获取报警统计信息
alarm_info_map = await new_alarm_content_info_new15(cids, start, end, alarm_info_map = await sdu_alarm_content_info(cids, start, end, points)
points)
ele_overload, illegal_ele_app, power_quality, illegal_app_dic, \ ele_overload, illegal_ele_app, power_quality, illegal_app_dic, \
total_alarm_cnt, alarm_points_cnt = ( total_alarm_cnt, alarm_points_cnt = (
alarm_info_map["ele_overload"], alarm_info_map["ele_overload"],
......
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary from pot_libs.sanic_api import summary
from unify_api.modules.alarm_manager.components.alarm_static_cps import \ from unify_api.modules.alarm_manager.components.alarm_static_cps import \
SduAlarmReq, SduAlarmResp, ContentName, RiskCount, SasReq, SassReq, \ SduAlarmReq, SduAlarmResp, ContentName, RiskCount, SasReq, SassReq, \
SassResp, AppReq, AppResp, SisReq, SisResp, SiasReq, SiasResp, SebReq, \ SassResp, AppReq, AppResp, SisReq, SisResp, SiasReq, SiasResp, SebReq, \
SebResp, SiarResp, ZsResp, ZasReq, ZasResp SebResp, SiarResp, ZsResp, ZasReq, ZasResp
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
alarm_content_info, risk_distribution
from unify_api.modules.alarm_manager.service.alarm_static_service import \ from unify_api.modules.alarm_manager.service.alarm_static_service import \
sdu_alarm_statistics_service, sdu_electric_behave_service, \ sdu_alarm_statistics_service, sdu_electric_behave_service, \
zdu_level_distribution_service, \ zdu_level_distribution_service, \
...@@ -19,64 +16,6 @@ from unify_api.modules.home_page.procedures.count_info_pds import \ ...@@ -19,64 +16,6 @@ from unify_api.modules.home_page.procedures.count_info_pds import \
from unify_api.utils.time_format import last30_day_range from unify_api.utils.time_format import last30_day_range
@summary("报警统计")
async def post_alarm_statistics(req, body: SduAlarmReq) -> SduAlarmResp:
"""目前用于 识电u->报警统计"""
product = body.product
cid = body.cid
start = body.start
end = body.end
# 获取point信息
sql_point = "SELECT pid, name FROM point WHERE cid=%s"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql_point, args=(cid,))
if not points:
return SduAlarmResp()
point_id_list = [i["pid"] for i in points]
# 1.调用函数获取报警统计信息
alarm_info_map = await alarm_content_info(cid, start, end, points)
if not alarm_info_map:
return SduAlarmResp()
high_power_app, ele_overload, illegal_ele_app, power_quality, \
ele_car_battery, point_dic, total_alarm_cnt = (
alarm_info_map["high_power_app"],
alarm_info_map["ele_overload"],
alarm_info_map["illegal_ele_app"],
alarm_info_map["power_quality"],
alarm_info_map["ele_car_battery"],
alarm_info_map["point_dic"],
alarm_info_map["total_alarm_cnt"]
)
# point_dic排序
point_list = []
for key, val in point_dic.items():
point_list.append({"name": key, "value": val})
point_list.sort(key=lambda x: x["value"], reverse=True)
# 报警类型总数,用于求比例
cn = ContentName(ele_overload=sum(ele_overload["value"]),
high_power_app=sum(high_power_app["value"]),
illegal_ele_app=sum(illegal_ele_app["value"]),
power_quality=sum(power_quality["value"]),
ele_car_battery=sum(ele_car_battery["value"])
)
# 2.计算风险分布
# 其中当前时段为发生过I级报警为风险用户,其余为安全用户
security_user, risk_user = await risk_distribution(start, end,
point_id_list)
rc = RiskCount(security_user=security_user, risk_user=risk_user)
return SduAlarmResp(
total_alarm_cnt=total_alarm_cnt,
high_power_app=high_power_app,
ele_overload=ele_overload,
illegal_ele_app=illegal_ele_app,
power_quality=power_quality,
ele_car_battery=ele_car_battery,
alarm_ranking=point_list,
content_distribution=cn,
risk_distribution=rc
)
@summary("报警统计-识电u") @summary("报警统计-识电u")
async def post_sdu_alarm_statistics(req, body: SduAlarmReq) -> SduAlarmResp: async def post_sdu_alarm_statistics(req, body: SduAlarmReq) -> SduAlarmResp:
"""目前用于 识电u->报警统计""" """目前用于 识电u->报警统计"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment