Commit 448a64af authored by ZZH's avatar ZZH

remove es 2023-6-2

parent b04fd32c
...@@ -4,10 +4,7 @@ ...@@ -4,10 +4,7 @@
# Date: 2020/7/9 # Date: 2020/7/9
import json import json
import time import time
from datetime import datetime
from pot_libs.aredis_util.aredis_utils import RedisUtils from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.es_util.es_query import EsQuery
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.sanic_api import summary, description, examples from pot_libs.sanic_api import summary, description, examples
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log from pot_libs.logger import log
...@@ -25,9 +22,10 @@ from unify_api.modules.adio.components.adio import ( ...@@ -25,9 +22,10 @@ from unify_api.modules.adio.components.adio import (
AdioCurrent, AdioCurrent,
adio_current_example, adio_current_example,
) )
from pot_libs.common.components.query import PageRequest, Range, Equal, Filter from pot_libs.common.components.query import PageRequest
from unify_api.modules.adio.dao.adio_dao import get_location_dao, \ from unify_api.modules.adio.dao.adio_dao import (
get_location_15min_dao, get_adio_current_data get_location_dao, get_location_15min_dao, get_adio_current_data
)
@summary("返回安全监测历史曲线") @summary("返回安全监测历史曲线")
...@@ -40,24 +38,28 @@ async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse: ...@@ -40,24 +38,28 @@ async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse:
date_end = body.filter.ranges[0].end date_end = body.filter.ranges[0].end
except: except:
log.error("para error, ranges is NULL") log.error("para error, ranges is NULL")
return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[]) return AdioHistoryResponse(temperature=[], residual_current=[],
time_slots=[])
try: try:
# 形如 interval = 900 slots=['00:00', '00:15', '00:30' # 形如 interval = 900 slots=['00:00', '00:15', '00:30'
intervel, slots = time_format.time_pick_transf(date_start, date_end) intervel, slots = time_format.time_pick_transf(date_start, date_end)
except: except:
log.error("para error, date format error") log.error("para error, date format error")
return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=[]) return AdioHistoryResponse(temperature=[], residual_current=[],
time_slots=[])
try: try:
location_group = body.filter.in_groups[0].group location_group = body.filter.in_groups[0].group
except: except:
log.warning("para exception, in_groups is NULL, no location_id") log.warning("para exception, in_groups is NULL, no location_id")
return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=slots) return AdioHistoryResponse(temperature=[], residual_current=[],
time_slots=slots)
if not location_group: if not location_group:
log.warning("para exception, in_groups is NULL, no location_id") log.warning("para exception, in_groups is NULL, no location_id")
return AdioHistoryResponse(temperature=[], residual_current=[], time_slots=slots) return AdioHistoryResponse(temperature=[], residual_current=[],
time_slots=slots)
# 3.获取温度曲线和漏电流曲线数据 # 3.获取温度曲线和漏电流曲线数据
# 动态漏电流阈值 # 动态漏电流阈值
...@@ -78,7 +80,7 @@ async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse: ...@@ -78,7 +80,7 @@ async def post_adio_history(req, body: PageRequest) -> AdioHistoryResponse:
f"create_time BETWEEN '{date_start}' and '{date_end}' " \ f"create_time BETWEEN '{date_start}' and '{date_end}' " \
f"order by create_time" f"order by create_time"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(location_group, )) datas = await conn.fetchall(sql, args=(location_group,))
type_name = { type_name = {
"residual_current": "漏电流", "temp1": "A相", "temp2": "B相", "residual_current": "漏电流", "temp1": "A相", "temp2": "B相",
"temp3": "C相", "temp4": "N线", "temp3": "C相", "temp4": "N线",
...@@ -210,7 +212,8 @@ async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse: ...@@ -210,7 +212,8 @@ async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse:
if item_info.get("type") == "residual_current": if item_info.get("type") == "residual_current":
adio_current = AdioCurrent( adio_current = AdioCurrent(
type="residual_current", item="漏电流", real_time=time_str, value=adio_value type="residual_current", item="漏电流", real_time=time_str,
value=adio_value
) )
residual_current.append(adio_current) residual_current.append(adio_current)
else: else:
...@@ -222,7 +225,8 @@ async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse: ...@@ -222,7 +225,8 @@ async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse:
) )
temperature.append(adio_current) temperature.append(adio_current)
return AdioCurrentResponse(temperature=temperature, residual_current=residual_current) return AdioCurrentResponse(temperature=temperature,
residual_current=residual_current)
@summary("返回安全监测指标统计") @summary("返回安全监测指标统计")
...@@ -273,7 +277,7 @@ async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse: ...@@ -273,7 +277,7 @@ async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse:
else: else:
value_min_min, value_min_time_data = "", "" value_min_min, value_min_time_data = "", ""
value_avg_list = [m for m in value_avg] value_avg_list = [m for m in value_avg]
value_avg_data = sum(value_avg_list)/len(value_avg_list) \ value_avg_data = sum(value_avg_list) / len(value_avg_list) \
if value_avg_list else 0 if value_avg_list else 0
adio_index = AdioIndex( adio_index = AdioIndex(
type=location_info[lid]["type"], type=location_info[lid]["type"],
...@@ -286,6 +290,3 @@ async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse: ...@@ -286,6 +290,3 @@ async def post_adio_index(req, body: PageRequest) -> AdioIndexResponse:
) )
adio_indexes.append(adio_index) adio_indexes.append(adio_index)
return AdioIndexResponse(adio_indexes=adio_indexes) return AdioIndexResponse(adio_indexes=adio_indexes)
...@@ -2,93 +2,13 @@ import pendulum ...@@ -2,93 +2,13 @@ import pendulum
from pot_libs.es_util.es_utils import EsUtil from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log from pot_libs.logger import log
from unify_api.constants import POINT_1MIN_EVENT, SDU_ALARM_LIST, Product, \ from unify_api.constants import POINT_1MIN_EVENT, SDU_ALARM_LIST
EVENT_TYPE_MAP, TEMPERATURE_MAP, RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP from unify_api.utils.time_format import convert_es_str
from unify_api.utils.time_format import convert_es_str, end_now_str
index = POINT_1MIN_EVENT index = POINT_1MIN_EVENT
async def new_list_alarm_dao(cid, points, page_num, page_size, start, end, async def new_list_alarm_dao(cid, points, start, end, importance, offset,
product, importance):
"""es报警信息分页列表, 可根据报警等级筛选"""
# alarm_list = list(EVENT_TYPE_MAP.keys()) # 安电u
#
# if product == Product.RecognitionElectric.value: # 识电u
alarm_list = SDU_ALARM_LIST
query_body = {
"from": (page_num - 1) * page_size,
"size": page_size,
"query": {
"bool": {
"must": [
{
"term": {
"cid": cid
}
},
{
"terms": {
"type.keyword": alarm_list
}
},
{
"terms": {
"point_id": points
}
},
{
"terms": {
"importance": importance
}
}
]
}
},
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
if start and end:
start_es = convert_es_str(start)
end_es = end_now_str(end)
query_body["query"]["bool"]["must"].append(
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
)
else:
# 有些脏数据超过现在时间的不显示
now_date = pendulum.now()
time_format = "%Y-%m-%dT%H:%M:%S+08:00"
end_date = now_date.strftime(time_format)
end_es = str(end_date)
query_body["query"]["bool"]["must"].append(
{
"range": {
"datetime": {
"lte": end_es
}
}
}
)
log.info(f"index:{index}--query_body:{query_body}")
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re
async def new_list_alarm_dao_new15(cid, points, start, end, importance, offset,
page_size): page_size):
mid_li = [f"cid={cid}", f"event_type in {tuple(SDU_ALARM_LIST)}"] mid_li = [f"cid={cid}", f"event_type in {tuple(SDU_ALARM_LIST)}"]
if len(points) == 1: if len(points) == 1:
...@@ -107,8 +27,7 @@ async def new_list_alarm_dao_new15(cid, points, start, end, importance, offset, ...@@ -107,8 +27,7 @@ async def new_list_alarm_dao_new15(cid, points, start, end, importance, offset,
mid_li.append(f"event_datetime < '{end_date}'") mid_li.append(f"event_datetime < '{end_date}'")
# 查询总数 # 查询总数
total_sql = f""" total_sql = f"""
select count(*) total_count from point_1min_event where select count(*) total_count from point_1min_event where {' and '.join(mid_li)}
{' and '.join(mid_li)}
""" """
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
total = await conn.fetchone(total_sql) total = await conn.fetchone(total_sql)
...@@ -178,71 +97,7 @@ async def wx_list_alarm_dao(cids, product, start, end): ...@@ -178,71 +97,7 @@ async def wx_list_alarm_dao(cids, product, start, end):
return es_re return es_re
async def list_alarm_zdu_dao(cid, points, page_num, page_size, start, end, async def list_alarm_zdu_dao(cid, points, start, end, importance, event_type):
importance, event_type):
"""智电u, es报警信息分页列表, 可根据报警等级筛选"""
query_body = {
"from": (page_num - 1) * page_size,
"size": page_size,
"query": {
"bool": {
"must": [
{
"term": {
"cid": cid
}
},
{
"terms": {
"point_id": points
}
},
{
"terms": {
"importance": importance
}
}
]
}
},
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
if start and end:
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body["query"]["bool"]["must"].append(
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
)
if event_type:
query_body["query"]["bool"]["must"].append(
{
"terms": {
"type.keyword": event_type
}
}
)
log.info(f"index:{index}--query_body:{query_body}")
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
return es_re
async def list_alarm_zdu_dao_new15(cid, points, start,
end, importance, event_type):
mid_li = [f"cid={cid}"] mid_li = [f"cid={cid}"]
if len(points) == 1: if len(points) == 1:
mid_li.append(f"pid={points[0]}") mid_li.append(f"pid={points[0]}")
......
...@@ -18,7 +18,7 @@ from unify_api.modules.common.procedures.points import points_by_storeys ...@@ -18,7 +18,7 @@ from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.home_page.components.security_info_cps import \ from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.security_info_pds import \ from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info_new15 alarm_count_info
from unify_api.utils.common_utils import round_1, division_two from unify_api.utils.common_utils import round_1, division_two
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \ from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
zdu_alarm_sort_dao, zdu_summary_dao zdu_alarm_sort_dao, zdu_summary_dao
...@@ -283,7 +283,7 @@ async def sdu_index_alarm_rank(cid, start, end, product): ...@@ -283,7 +283,7 @@ async def sdu_index_alarm_rank(cid, start, end, product):
async def zdu_level_distribution_service(cid, start, end, product): async def zdu_level_distribution_service(cid, start, end, product):
"""报警统计-报警等级-智电u""" """报警统计-报警等级-智电u"""
alarm_info_map = await alarm_count_info_new15([cid], start, end, "month") alarm_info_map = await alarm_count_info([cid], start, end, "month")
first_alarm, second_alarm, third_alarm = ( first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"], alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"], alarm_info_map["second_alarm"],
......
...@@ -5,8 +5,8 @@ from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \ ...@@ -5,8 +5,8 @@ from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \
company_extend_dao, list_alarm_data_dao, get_list_alarm_dao, \ company_extend_dao, list_alarm_data_dao, get_list_alarm_dao, \
get_total_list_alarm_dao get_total_list_alarm_dao
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \ from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
new_list_alarm_dao, wx_list_alarm_dao, list_alarm_zdu_dao, \ wx_list_alarm_dao, \
wx_list_alarm_zdu_dao, list_alarm_zdu_dao_new15, new_list_alarm_dao_new15 wx_list_alarm_zdu_dao, list_alarm_zdu_dao, new_list_alarm_dao
from unify_api.modules.common.procedures.cids import get_cid_info from unify_api.modules.common.procedures.cids import get_cid_info
from unify_api.modules.common.procedures.points import points_by_storeys from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.electric.dao.electric_dao import \ from unify_api.modules.electric.dao.electric_dao import \
...@@ -34,7 +34,7 @@ async def new_list_alarm_service(cid, storeys, offset, page_size, start, end, ...@@ -34,7 +34,7 @@ async def new_list_alarm_service(cid, storeys, offset, page_size, start, end,
"point_name": i.get("room_name")} "point_name": i.get("room_name")}
for i in point_list if i["point_id"] in points] for i in point_list if i["point_id"] in points]
# 2. 查询结果 # 2. 查询结果
total, results = await new_list_alarm_dao_new15(cid, points, start, end, total, results = await new_list_alarm_dao(cid, points, start, end,
importance, importance,
offset, page_size) offset, page_size)
if not results: if not results:
...@@ -139,11 +139,8 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start, ...@@ -139,11 +139,8 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
if not point_list or not event_type or not importance: if not point_list or not event_type or not importance:
return ListAlarmResponse(total=0, rows=[]) return ListAlarmResponse(total=0, rows=[])
# # 1. es查询结果 results = await list_alarm_zdu_dao(cid, point_list, start, end, importance,
# es_res = await list_alarm_zdu_dao(cid, point_list, page_num, page_size, event_type)
# start, end, importance, event_type)
results = await list_alarm_zdu_dao_new15(cid, point_list, start, end,
importance, event_type)
real_total = len(results) real_total = len(results)
results = results[(page_num - 1) * page_size, page_num * page_size] results = results[(page_num - 1) * page_size, page_num * page_size]
# 2. 获取工厂, 报警type对应的描述信息 # 2. 获取工厂, 报警type对应的描述信息
...@@ -247,8 +244,8 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end): ...@@ -247,8 +244,8 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end):
return ListAlarmResponse(total=total, rows=rows) return ListAlarmResponse(total=total, rows=rows)
async def list_alarm_service_new15(cid, point_id, start, end, importance, async def list_alarm_service(cid, point_id, start, end, importance, page_size,
page_size, page_num, alarm_type): page_num, alarm_type):
li = ["event_mode!='scope'"] li = ["event_mode!='scope'"]
if point_id: if point_id:
li.append(f"pid={point_id}") li.append(f"pid={point_id}")
......
...@@ -2,27 +2,18 @@ ...@@ -2,27 +2,18 @@
# #
# Author:jing # Author:jing
# Date: 2020/7/9 # Date: 2020/7/9
import random from pot_libs.sanic_api import summary, description
from datetime import datetime, timedelta from pot_libs.utils.exc_util import BusinessException
from pot_libs.sanic_api import summary, description, examples
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.es_util.es_query import EsQuery
from pot_libs.utils.exc_util import DBException, BusinessException
from unify_api.constants import Product from unify_api.constants import Product
from unify_api.modules.alarm_manager.components.alarm_static_cps import NlaReq, \ from unify_api.modules.alarm_manager.components.alarm_static_cps import NlaReq, \
WlaReq, LazReq WlaReq, LazReq
from unify_api.modules.alarm_manager.service.list_alarm_service import \ from unify_api.modules.alarm_manager.service.list_alarm_service import \
new_list_alarm_service, wx_list_alarm_service, list_alarm_zdu_service, \ new_list_alarm_service, wx_list_alarm_service, list_alarm_zdu_service, \
wx_list_alarm_zdu_service, list_alarm_service_new15 wx_list_alarm_zdu_service, list_alarm_service
from unify_api.modules.common.procedures.cids import get_cid_info, get_cids, \ from unify_api.modules.common.procedures.cids import get_cid_info, get_cids, \
get_proxy_cids get_proxy_cids
from unify_api.modules.users.procedures.jwt_user import jwt_user from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils import time_format from pot_libs.common.components.query import PageRequest
from unify_api import constants
from pot_libs.common.components.query import PageRequest, Equal, Range, Filter, \
InGroup
from unify_api.modules.alarm_manager.components.list_alarm import ( from unify_api.modules.alarm_manager.components.list_alarm import (
ListAlarmResponse, ListAlarmReq, ListAlarmResponse, ListAlarmReq,
Alarm, Alarm,
...@@ -83,139 +74,11 @@ async def post_list_alarm(req, body: PageRequest) -> ListAlarmResponse: ...@@ -83,139 +74,11 @@ async def post_list_alarm(req, body: PageRequest) -> ListAlarmResponse:
cids = [cid] cids = [cid]
if not cids: if not cids:
raise BusinessException(message=f"你没有工厂权限") raise BusinessException(message=f"你没有工厂权限")
return await list_alarm_service_new15(cids, point_id, start, end, return await list_alarm_service(cids, point_id, start, end,
importance, page_size, page_num, importance, page_size, page_num,
alarm_type) alarm_type)
@summary("上面的备份")
@description("筛选字段:监测点")
@examples(list_alarm_example)
async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
# 将"2020-05-02 17:32:31”格式的时间转化为"2020-05-02T17:32:31+08:00"
# 小程序首页展示报警信息列表最新40条,不需要传时间
if body.filter.ranges:
_range = body.filter.ranges[0]
field, start, end = _range.field, _range.start, _range.end
start_dt = time_format.convert_to_dt(start)
start_str = time_format.convert_dt_to_str(start_dt, date_type="tz")
end_dt = time_format.convert_to_dt(end)
end_str = time_format.convert_dt_to_str(end_dt, date_type="tz")
_range = Range(field=field, start=start_str, end=end_str)
ranges = [_range]
else:
ranges = []
# TODO:当没有选择监测点的时候,需要从req取cid,作为筛选条件
if req.json.get("cid"):
cid = req.json["cid"]
equal = Equal(field="cid", value=cid)
body.filter.equals.append(equal)
if req.json.get("product") == Product.AndianUManage.value:
proxy_id = req.json.get("proxy_id")
product = req.json.get("product")
user_id = req.ctx.user_id
req_cids = req.json.get("cids")
# cids = await get_cids(user_id, product)
cids = await get_proxy_cids(user_id, product, proxy_id)
if req_cids:
if any(i for i in req_cids if i not in cids):
raise BusinessException(
message=f"你没有工厂{set(req_cids) - set(cids)}没有权限")
in_group = InGroup(field="cid", group=req_cids)
else:
in_group = InGroup(field="cid", group=cids)
body.filter.in_groups.append(in_group)
if req.json.get("product") in [Product.RecognitionElectric.value,
Product.IntelligentU.value]:
req_cid = req.json.get("cid")
if not req_cid:
product = req.json.get("product")
user_id = req.ctx.user_id
cids = await get_cids(user_id, product)
else:
cids = [req_cid]
in_group = InGroup(field="cid", group=cids)
body.filter.in_groups.append(in_group)
filter = Filter(
equals=body.filter.equals,
ranges=ranges,
in_groups=body.filter.in_groups,
keywords=body.filter.keywords,
)
# 重新封装PageRequest
page_request = PageRequest(
page_size=body.page_size, page_num=body.page_num, sort=body.sort,
filter=filter
)
query_body = EsQuery().query(page_request)
if not query_body.get("query"):
query = {
"bool": {"must_not": [{"terms": {"mode.keyword": ["scope"]}}]}}
query_body["query"] = query
else:
must_not = [{"terms": {"mode.keyword": ["scope"]}}]
query_body["query"]["bool"]["must_not"] = must_not
index = "poweriot_point_1min_event"
log.info(f"index:{index}--query_body:{query_body}")
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body, index=index)
if not es_results:
log.warning(
"Can not find data on es(index: %s): %s" % (index, query_body))
raise DBException
cid_info_map = await get_cid_info(all=True)
rows = []
for info in es_results["hits"]["hits"]:
es_id = info["_id"]
source = info["_source"]
type = source.get("type")
mode = source.get("mode")
type_str = constants.EVENT_TYPE_MAP.get(type, type)
point_id = source.get("point_id")
location_id = source.get("location_id")
date_time = source.get("datetime")
dt = time_format.convert_to_dt(date_time)
date_time = time_format.convert_dt_to_timestr(dt)
event_duration = source.get("event_duration")
if point_id and mode == "scope":
url = "/scope_details?doc_id=%s" % es_id
redirect_type = "scope"
elif location_id and type in constants.TEMP_SCOPE_URL_TYPE:
url = "/temp_trend?doc_id=%s" % es_id
redirect_type = "temp_trend"
else:
url = None
redirect_type = ""
cid = int(source.get("cid")) if source.get("cid") else source.get(
"cid")
alarm = Alarm(
name=source.get("name"),
importance=source.get("importance"),
date_time=date_time,
type=type,
type_name=type_str,
description=source.get("message"),
redirect_type=redirect_type,
es_id=es_id,
url=url,
event_duration=round(event_duration)
if isinstance(event_duration, float) else event_duration,
company_name=cid_info_map.get(cid, {}).get("fullname", ""),
)
rows.append(alarm)
real_total = es_results["hits"]["total"]
total = real_total if real_total < constants.ES_TOTAL_LIMIT else constants.ES_TOTAL_LIMIT
return ListAlarmResponse(total=total, rows=rows)
@summary("报警记录-列表分页") @summary("报警记录-列表分页")
async def post_new_list_alarm(req, body: NlaReq) -> ListAlarmResponse: async def post_new_list_alarm(req, body: NlaReq) -> ListAlarmResponse:
# 1. 获取参数 # 1. 获取参数
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
from math import sqrt from math import sqrt
import pendulum import pendulum
import datetime import datetime
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.dao.health_score_dao import \ from unify_api.modules.common.dao.health_score_dao import \
......
import calendar
import random
from collections import defaultdict
from datetime import datetime
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants from unify_api.constants import Importance
from unify_api.constants import Importance, Product
from unify_api.modules.common.procedures.common_cps import ( from unify_api.modules.common.procedures.common_cps import (
proxy_safe_run_info, proxy_safe_run_info,
alarm_time_distribution, alarm_time_distribution,
) )
from unify_api.utils.time_format import get_start_end_by_tz_time_new, \ from unify_api.utils.time_format import proxy_power_slots, day_slots
proxy_power_slots, day_slots
async def alarm_count_info(company_ids, start, end, date_type): async def alarm_count_info(company_ids, start, end, date_type):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00"
)
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
if date_type == "day":
interval = "hour"
_format = "yyyy-MM-dd HH:mm:ss"
_min = start_dt.strftime("%Y-%m-%d %H:%M:%S")
_max = end_dt.strftime("%Y-%m-%d %H:%M:%S")
else:
# date_type == "month"
interval = "day"
_format = "yyyy-MM-dd"
_min = start_dt.strftime("%Y-%m-%d")
_max = end_dt.strftime("%Y-%m-%d")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str, }}},
{"terms": {"cid": company_ids}}]
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list, }},
"aggs": {
"alarm_cnt": {
"date_histogram": {
"field": "datetime",
"interval": interval,
"time_zone": "+08:00",
"format": _format,
"min_doc_count": 0,
"extended_bounds": {"min": _min, "max": _max, },
},
"aggs": {"type_cnt": {"terms": {"field": "importance"}}},
},
"cid_aggs": {"terms": {"field": "cid"}},
"type_aggs": {"terms": {"field": "type.keyword"}},
},
}
log.info("alarm_count_info query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
first_alarm = {"slots": [], "value": [0] * len(buckets)}
second_alarm = {"slots": [], "value": [0] * len(buckets)}
third_alarm = {"slots": [], "value": [0] * len(buckets)}
cid_buckets = es_result["aggregations"]["cid_aggs"]["buckets"]
cid_alarm_cnt_map = {i["key"]: i["doc_count"] for i in cid_buckets}
type_buckets = es_result["aggregations"]["type_aggs"]["buckets"]
type_alarm_cnt_map = {i["key"]: i["doc_count"] for i in type_buckets}
for index, bucket in enumerate(buckets):
if date_type == "day":
time_str = bucket["key_as_string"][11:16]
else:
time_str = bucket["key_as_string"][5:10]
first_alarm["slots"].append(time_str)
second_alarm["slots"].append(time_str)
third_alarm["slots"].append(time_str)
if bucket["type_cnt"]["buckets"]:
for item in bucket["type_cnt"]["buckets"]:
if item["key"] == Importance.First.value:
first_alarm["value"][index] += item["doc_count"]
elif item["key"] == Importance.Second.value:
second_alarm["value"][index] += item["doc_count"]
elif item["key"] == Importance.Third.value:
third_alarm["value"][index] += item["doc_count"]
log.info(f"first_alarm={first_alarm}")
log.info(f"second_alarm={second_alarm}")
log.info(f"third_alarm={third_alarm}")
return {
"first_alarm": first_alarm,
"second_alarm": second_alarm,
"third_alarm": third_alarm,
"cid_alarm_cnt_map": cid_alarm_cnt_map,
"type_alarm_cnt_map": type_alarm_cnt_map,
}
async def alarm_count_info_new15(company_ids, start, end, date_type):
if date_type == "day": if date_type == "day":
date_fmt = "DATE_FORMAT(event_datetime,'%%H')" date_fmt = "DATE_FORMAT(event_datetime,'%%H')"
slots = day_slots('hours') slots = day_slots('hours')
......
...@@ -42,7 +42,7 @@ from unify_api.modules.home_page.procedures.count_info_proxy_pds import ( ...@@ -42,7 +42,7 @@ from unify_api.modules.home_page.procedures.count_info_proxy_pds import (
total_run_day_proxy, total_run_day_proxy,
) )
from unify_api.modules.home_page.procedures.security_info_pds import \ from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info_new15 alarm_count_info
from unify_api.modules.home_page.service.count_info_service import \ from unify_api.modules.home_page.service.count_info_service import \
safe_run_sdu safe_run_sdu
from unify_api.modules.elec_charge.components.elec_charge_cps import \ from unify_api.modules.elec_charge.components.elec_charge_cps import \
...@@ -225,7 +225,7 @@ async def post_reg_alarm_distribution(request, ...@@ -225,7 +225,7 @@ async def post_reg_alarm_distribution(request,
if product == Product.RecognitionElectric.value: if product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id user_id = request.ctx.user_id
cids = await get_cids(user_id, product) cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type) alarm_info_map = await alarm_count_info(cids, start, end, date_type)
type_alarm_cnt_map = alarm_info_map["type_alarm_cnt_map"] type_alarm_cnt_map = alarm_info_map["type_alarm_cnt_map"]
return AlarmDistributionResp( return AlarmDistributionResp(
alarm_categories=RegAlarmCnt( alarm_categories=RegAlarmCnt(
...@@ -250,7 +250,7 @@ async def post_reg_alarm_rank(request, ...@@ -250,7 +250,7 @@ async def post_reg_alarm_rank(request,
if product == Product.RecognitionElectric.value: if product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id user_id = request.ctx.user_id
cids = await get_cids(user_id, product) cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type) alarm_info_map = await alarm_count_info(cids, start, end, date_type)
cid_alarm_cnt_map = alarm_info_map["cid_alarm_cnt_map"] cid_alarm_cnt_map = alarm_info_map["cid_alarm_cnt_map"]
cid_info_map = await get_cid_info(all=True) cid_info_map = await get_cid_info(all=True)
......
...@@ -21,7 +21,7 @@ from unify_api.modules.home_page.components.security_info_cps import ( ...@@ -21,7 +21,7 @@ from unify_api.modules.home_page.components.security_info_cps import (
AlarmSummaryResp, AlarmSummaryResp,
) )
from unify_api.modules.home_page.procedures.security_info_pds import ( from unify_api.modules.home_page.procedures.security_info_pds import (
alarm_summary, alarm_count_info_new15, alarm_summary, alarm_count_info,
) )
from unify_api.modules.users.procedures.jwt_user import jwt_user from unify_api.modules.users.procedures.jwt_user import jwt_user
...@@ -44,7 +44,7 @@ async def post_security_index(request, body: SecurityCountReq) -> SecurityCountR ...@@ -44,7 +44,7 @@ async def post_security_index(request, body: SecurityCountReq) -> SecurityCountR
elif product == Product.RecognitionElectric.value: elif product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id user_id = request.ctx.user_id
cids = await get_cids(user_id, product) cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type) alarm_info_map = await alarm_count_info(cids, start, end, date_type)
first_alarm, second_alarm, third_alarm = ( first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"], alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"], alarm_info_map["second_alarm"],
...@@ -85,7 +85,7 @@ async def post_alarm_level_distribution(request, body: SecurityCommonReq) -> Sec ...@@ -85,7 +85,7 @@ async def post_alarm_level_distribution(request, body: SecurityCommonReq) -> Sec
else: else:
raise BusinessException(message=f"暂时不支持其他产品") raise BusinessException(message=f"暂时不支持其他产品")
alarm_info_map = await alarm_count_info_new15(req_cids, start, end, date_type) alarm_info_map = await alarm_count_info(req_cids, start, end, date_type)
first_alarm, second_alarm, third_alarm = ( first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"], alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"], alarm_info_map["second_alarm"],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment