Commit 9fc31e09 authored by wang.wenrong's avatar wang.wenrong

zdu-content-distribution

parent 80208422
......@@ -445,4 +445,32 @@ TDENGINE_CHILD_SUBFIX_LIST = {
"db_scope": "scp",
"db_adio": "adi",
"db_electric": "ele",
}
# 温度
TEMPERATURE_MAP = {
"overTemp",
"overTempRange1min",
"overTempRange15min",
"overTempTrendDaily",
"overTempTrendQuarterly"
}
# 漏电流
RESIDUAL_CURRENT_MAP = {
"overResidualCurrent"
}
# 电参数
ELECTRIC_PARAM_MAP = {
"overTHDI", # 电流总谐波有效值越限
"overTHDU", # 电压总谐波畸变率越限
"overU", # 过压
"underU", # 欠压
"overI",
"underPhasePF", # 单相功率因数越下限
"underTotalPF", # 总功率因数越下限
"unbalanceI", # 三相电流不平衡度
"unbalanceU", # 三相电压不平衡度
"overPR"
}
\ No newline at end of file
......@@ -7,9 +7,9 @@ from pot_libs.sanic_api.column import Str, Int, Opt, List, Float, Dict
@dataclass
class SduAlarmReq(Model):
cid: Cid
start: str = Str("开始时间").eg("2020-07-30 00:00:00")
end: str = Str("结束时间").eg("2020-07-30 23:59:59")
cid: int = Int("cid").eg(1)
start: str = Str("开始时间").eg("2023-02-01 00:00:00")
end: str = Str("结束时间").eg("2023-02-28 23:59:59")
product: int = Int("product 1-知电 2-安电 3-安电管理 4-识电U").eg(4)
......
......@@ -3,7 +3,7 @@ from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
from unify_api.constants import POINT_1MIN_EVENT, SDU_ALARM_LIST, Product, \
EVENT_TYPE_MAP
EVENT_TYPE_MAP, TEMPERATURE_MAP, RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP
from unify_api.utils.time_format import convert_es_str, end_now_str
index = POINT_1MIN_EVENT
......@@ -315,6 +315,9 @@ async def wx_list_alarm_zdu_dao(cid, points, start, end):
async def zdu_alarm_sort_dao(cid, start, end, page_size, page_num):
"""
智电告警排名
"""
sql = f"""
SELECT
pt.`name`,
......@@ -336,8 +339,36 @@ async def zdu_alarm_sort_dao(cid, start, end, page_size, page_num):
pid
ORDER BY
event_count DESC
) a ON pt.pid = a.pid limit {(page_num-1)*page_size}, {page_size}
) a ON pt.pid = a.pid limit {(page_num - 1) * page_size}, {page_size}
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, args=())
return data
async def alarm_content_time_distribution_dao(cid, start, end,):
"""
智电温度漏电流电参数分布
"""
mid_li = [f"cid={cid}"]
if start and end:
mid_li.append(f"event_datetime BETWEEN '{start}' and '{end}'")
sql = f"""
SELECT
event_type,
DATE_FORMAT( event_datetime, '%m-%d' ) dat,
count(*) coun
FROM
power_iot.point_1min_event
WHERE
{' and '.join(mid_li)}
GROUP BY
DATE_FORMAT( event_datetime, '%m-%d' ),
event_type
ORDER BY
dat ASC
"""
async with MysqlUtil() as conn:
elec_data = await conn.fetchall(sql, )
return elec_data
......@@ -5,7 +5,11 @@ from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import SDU_ALARM_LIST
from unify_api.utils.time_format import proxy_power_slots
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
alarm_content_time_distribution_dao
from unify_api.utils.time_format import proxy_power_slots, time_pick_transf_new
from unify_api.constants import EVENT_TYPE_MAP, TEMPERATURE_MAP, \
RESIDUAL_CURRENT_MAP, ELECTRIC_PARAM_MAP
async def alarm_content_info(cid, start, end, points):
......@@ -443,3 +447,80 @@ async def zdu_summary_info(cid, start, end):
total_alarm_cnt += info["doc_count"]
return total_alarm_cnt, len(buckets) # 总监测点数
async def alarm_content_time_distribution_pds(cid, start, end, ):
electric_param_detail = {
"harmonic": 0,
"voltage": 0,
"current": 0,
"power_factor": 0,
"threephase_imbalance": 0,
"load_rate": 0,
}
intervel, slots = time_pick_transf_new(start, end)
slots = [slot[5::] for slot in slots]
temperature, residual_current, electric_param = \
{"slots": slots, "value": [0] * len(slots)},\
{"slots": slots, "value": [0] * len(slots)}, \
{"slots": slots, "value": [0] * len(slots)}
elec_data = await alarm_content_time_distribution_dao(cid, start, end, )
if elec_data:
for index, slot in enumerate(slots):
for elec_info in elec_data:
if slot == elec_info.get("dat"):
if elec_info.get("event_type") in TEMPERATURE_MAP:
temperature['value'][index] += elec_info.get("coun", 0)
elif elec_info.get("event_type") in RESIDUAL_CURRENT_MAP:
residual_current["value"][index] += elec_info.get(
"coun", 0)
else:
electric_param["value"][index] += elec_info.get("coun",
0)
if elec_info.get("event_type") in [
"overTHDI", # 电流总谐波有效值越限
"overTHDU", # 电压总谐波畸变率越限
]:
electric_param_detail["harmonic"] += elec_info.get(
"coun", 0)
elif elec_info.get("event_type") in [
"overU", # 过压
"underU", # 欠压
]:
electric_param_detail["voltage"] += elec_info.get(
"coun", 0)
elif elec_info.get("event_type") in [
"overI",
]:
electric_param_detail["current"] += elec_info.get(
"coun", 0)
elif elec_info.get("event_type") in [
"underPhasePF", # 单相功率因数越下限
"underTotalPF", # 总功率因数越下限
]:
electric_param_detail[
"power_factor"] += elec_info.get(
"coun", 0)
elif elec_info.get("event_type") in [
"unbalanceI", # 三相电流不平衡度
"unbalanceU", # 三相电压不平衡度
]:
electric_param_detail["threephase_imbalance"] += \
elec_info.get("coun", 0)
elif elec_info.get("event_type") in ["overPR"]:
electric_param_detail[
"load_rate"] += elec_info.get(
"coun", 0)
else:
temperature['value'][index] += 0
residual_current['value'][index] += 0
electric_param['value'][index] += 0
return {
"temperature": temperature,
"residual_current": residual_current,
"electric_param": electric_param,
"electric_param_detail": electric_param_detail,
}
......@@ -11,7 +11,8 @@ from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_behavior_dao_new15, sdu_alarm_limit_type_dao_new15
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
new_alarm_content_info, risk_distribution, zdu_summary_info, \
new_alarm_content_info_new15, risk_distribution_new15
new_alarm_content_info_new15, risk_distribution_new15, \
alarm_content_time_distribution_pds
from unify_api.modules.common.dao.common_dao import points_by_cid, \
monitor_point_join, points_monitor_by_cid
from unify_api.modules.common.procedures.common_cps import \
......@@ -20,10 +21,10 @@ from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_content_time_distribution, alarm_count_info_new15
alarm_count_info_new15
from unify_api.utils.common_utils import round_1, division_two
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
zdu_alarm_sort_dao
zdu_alarm_sort_dao, alarm_content_time_distribution_dao
async def sdu_alarm_statistics_service(cids, start, end, product):
......@@ -441,8 +442,7 @@ async def zdu_level_distribution_service(cid, start, end, product):
async def zdu_content_distribution_service(cid, start, end, product):
"""报警统计-报警内容-智电u"""
alarm_info_map = await alarm_content_time_distribution([cid], start,
end, "month")
alarm_info_map = await alarm_content_time_distribution_pds(cid, start, end)
temperature, residual_current, electric_param, electric_param_detail = (
alarm_info_map["temperature"],
alarm_info_map["residual_current"],
......
......@@ -166,139 +166,6 @@ async def alarm_count_info_new15(company_ids, start, end, date_type):
}
async def alarm_content_time_distribution(company_ids, start, end, date_type):
"""
电参数,温度,漏电流时间分布
:param company_ids:
:param start:
:param end:
:param date_type:
:return:
"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00"
)
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
if date_type == "day":
interval = "hour"
_format = "yyyy-MM-dd HH:mm:ss"
_min = start_dt.strftime("%Y-%m-%d %H:%M:%S")
_max = end_dt.strftime("%Y-%m-%d %H:%M:%S")
else:
# date_type == "month"
interval = "day"
_format = "yyyy-MM-dd"
_min = start_dt.strftime("%Y-%m-%d")
_max = end_dt.strftime("%Y-%m-%d")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str, }}},
{"terms": {"cid": company_ids}}]
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list, }},
"aggs": {
"alarm_cnt": {
"date_histogram": {
"field": "datetime",
"interval": interval,
"time_zone": "+08:00",
"format": _format,
"min_doc_count": 0,
"extended_bounds": {"min": _min, "max": _max, },
},
"aggs": {"type_cnt": {
"terms": {"field": "type.keyword", "size": 10000}}},
}
},
}
log.info("alarm_count_info query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
temperature = {"slots": [], "value": [0] * len(buckets)}
residual_current = {"slots": [], "value": [0] * len(buckets)}
electric_param = {"slots": [], "value": [0] * len(buckets)}
electric_param_detail = {
"harmonic": 0,
"voltage": 0,
"current": 0,
"power_factor": 0,
"threephase_imbalance": 0,
"load_rate": 0,
}
for index, bucket in enumerate(buckets):
if date_type == "day":
time_str = bucket["key_as_string"][11:16]
else:
time_str = bucket["key_as_string"][5:10]
temperature["slots"].append(time_str)
residual_current["slots"].append(time_str)
electric_param["slots"].append(time_str)
if bucket["type_cnt"]["buckets"]:
for item in bucket["type_cnt"]["buckets"]:
if item["key"] in [
"overTemp",
"overTempRange1min",
"overTempRange15min",
"overTempTrendDaily",
"overTempTrendQuarterly",
]:
temperature["value"][index] += item["doc_count"]
elif item["key"] in [
"overResidualCurrent",
]:
residual_current["value"][index] += item["doc_count"]
else:
electric_param["value"][index] += item["doc_count"]
if item["key"] in [
"overTHDI", # 电流总谐波有效值越限
"overTHDU", # 电压总谐波畸变率越限
]:
electric_param_detail["harmonic"] += item["doc_count"]
elif item["key"] in [
"overU", # 过压
"underU", # 欠压
]:
electric_param_detail["voltage"] += item["doc_count"]
elif item["key"] in [
"overI",
]:
electric_param_detail["current"] += item["doc_count"]
elif item["key"] in [
"underPhasePF", # 单相功率因数越下限
"underTotalPF", # 总功率因数越下限
]:
electric_param_detail["power_factor"] += item[
"doc_count"]
elif item["key"] in [
"unbalanceI", # 三相电流不平衡度
"unbalanceU", # 三相电压不平衡度
]:
electric_param_detail["threephase_imbalance"] += item[
"doc_count"]
elif item["key"] in ["overPR"]:
electric_param_detail["load_rate"] += item["doc_count"]
log.info(f"temperature={temperature}")
log.info(f"residual_current={residual_current}")
log.info(f"electric_param={electric_param}")
return {
"temperature": temperature,
"residual_current": residual_current,
"electric_param": electric_param,
"electric_param_detail": electric_param_detail,
}
async def alarm_summary(company_ids, start, end, date_type):
"""
电参数,温度,漏电流时间分布
......
......@@ -6,6 +6,8 @@ from pot_libs.logger import log
from pot_libs.sanic_api import summary
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import Product
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
alarm_content_time_distribution_pds
from unify_api.modules.common.procedures.cids import get_cids, get_proxy_cids
from unify_api.modules.home_page.components.security_info_cps import (
......@@ -19,7 +21,6 @@ from unify_api.modules.home_page.components.security_info_cps import (
AlarmSummaryResp,
)
from unify_api.modules.home_page.procedures.security_info_pds import (
alarm_content_time_distribution,
alarm_summary, alarm_count_info_new15,
)
......@@ -127,7 +128,7 @@ async def post_alarm_content_distribution(
else:
raise BusinessException(message=f"暂时不支持其他产品")
alarm_info_map = await alarm_content_time_distribution(req_cids, start, end, date_type)
alarm_info_map = await alarm_content_time_distribution_pds(req_cids, start, end, date_type)
temperature, residual_current, electric_param, electric_param_detail = (
alarm_info_map["temperature"],
alarm_info_map["residual_current"],
......
......@@ -1443,3 +1443,38 @@ def get_time_duration_by_str(duration_str):
if seconds > 0:
return_str += "%s秒" % str(seconds)
return return_str
def time_pick_transf_new(start, end):
"""获取intervel和slots, 详细显示时间轴信息,新接口都使用这个来获取时间"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
diff = end_f.int_timestamp - start_f.int_timestamp
# 1. 计算intervel
# 1.1 区间48小时之内, 返回15min
if diff <= 48 * 3600:
intervel = 15 * 60
# 1.2 区间在60天以内, 返回1day
elif 48 * 3600 < diff <= 60 * 86400:
intervel = 86400
# 1.3 选择年, 返回1个月
else:
intervel = 30 * 86400
# 2. 计算slots
# 2.1 取到点的个数, 比如15min的96个点
slots = []
slot_num = round((end_f.int_timestamp - start_f.int_timestamp) / intervel)
for i in range(slot_num):
# 区间48小时之内
if diff <= 48 * 3600:
dt = start_f.add(minutes=15 * i).format("YYYY-MM-DD HH:mm")
dt_str = str(dt)
# 区间在60天以内
elif 48 * 3600 < diff <= 60 * 86400:
dt = start_f.add(days=1 * i).format("YYYY-MM-DD")
dt_str = str(dt)
else:
dt = start_f.add(months=1 * i).format("YYYY-MM")
dt_str = str(dt)
slots.append(dt_str)
return intervel, slots
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment