Commit de51a7ff authored by ZZH's avatar ZZH

remove es 2023-5-29

parent 220a3fe5
......@@ -264,57 +264,6 @@ async def sdu_alarm_importance_dao_new15(start, end, points, is_sdu=None):
async def sdu_alarm_behavior_dao(start, end, points):
"""用电行为统计, 目前只有违规电器
如果还有其他统计, 则可先根据type分组, 再根据appliance分组
"""
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body = {
"size": 0,
"query": {
"bool": {
"filter": [
{
"terms": {
"point_id": points
}
},
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
},
{
"terms": {
"type.keyword": [
"illegal_ele_app"
]
}
}
]
}
},
"aggs": {
"appliance": {
"terms": {
"field": "appliance.keyword",
"size": 100
}
}
}
}
log.info("alarm_behavior query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["appliance"]["buckets"]
async def sdu_alarm_behavior_dao_new15(start, end, points):
sql = f"SELECT appliance, count(1) doc_count " \
f"FROM `point_1min_event` WHERE pid in %s " \
f"and event_type = 'illegal_ele_app' and " \
......@@ -324,44 +273,6 @@ async def sdu_alarm_behavior_dao_new15(start, end, points):
return datas
async def sdu_alarm_aggs_date(cid):
"""sdu求安全运行
根据每日聚合,再根据points聚合
"""
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"cid": cid
}
},
{"terms": {
"type.keyword": SDU_ONE_TWO_GRADE_ALARM}}
]
}
},
"aggs": {
"date_day": {
"date_histogram": {
"field": "datetime",
"interval": "day",
"time_zone": "+08:00",
"format": "yyyy-MM-dd",
"min_doc_count": 0
}
}
}
}
log.info("query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["date_day"]["buckets"]
async def zdu_alarm_aggs_date_impotent(cid, start, end):
"""zdu求安全运行
根据每日聚合,再根据等级聚合
......@@ -383,48 +294,6 @@ async def zdu_alarm_aggs_date_impotent(cid, start, end):
return len(datas) if datas else 0
async def sdu_alarm_aggs_date_importance(cid): # todo: 扬尘es待改
"""按日期,再按等级聚合"""
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"cid": cid
}
}
]
}
},
"aggs": {
"date_day": {
"date_histogram": {
"field": "datetime",
"interval": "day",
"time_zone": "+08:00",
"format": "yyyy-MM-dd",
"min_doc_count": 0
},
"aggs": {
"importance": {
"terms": {
"field": "importance",
"size": 10
}
}
}
}
}
}
log.info("query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["date_day"]["buckets"]
async def sdu_alarm_aggs_type(cid, start, end):
sql = f"""
SELECT
......@@ -444,51 +313,6 @@ async def sdu_alarm_aggs_type(cid, start, end):
return datas if datas else []
async def sdu_alarm_aggs_type_old(cid, start, end):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
"""根据类型聚合"""
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {
"cid": cid
}
},
{
"range": {
"datetime": {
"gte": es_start_str,
"lte": es_end_str
}
}
}
]
}
},
"aggs": {
"type": {
"terms": {
"field": "type.keyword",
"size": 40
}
}
}
}
log.info("query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
return es_result["aggregations"]["type"]["buckets"]
async def alarm_aggs_importance(cid, start, end):
"""按报警等级聚合"""
sql = f"""
......
......@@ -5,10 +5,10 @@ from unify_api.modules.alarm_manager.components.alarm_static_cps import \
ZsResp, TimeCount, ZasResp
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_statistics_dao, sdu_alarm_type_dao, sdu_alarm_importance_dao, \
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao, \
sdu_alarm_limit_type_dao, \
zdu_alarm_aggs_date_impotent, sdu_alarm_type_dao_new15, \
sdu_alarm_importance_dao_new15, sdu_alarm_statistics_dao_new15, \
sdu_alarm_behavior_dao_new15, sdu_alarm_limit_type_dao_new15
sdu_alarm_behavior_dao, sdu_alarm_limit_type_dao_new15
from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
new_alarm_content_info, risk_distribution, zdu_summary_info, \
new_alarm_content_info_new15, risk_distribution_new15, \
......@@ -294,79 +294,15 @@ async def sdu_electric_behave_service(cid, start, end, storeys, product):
return SebResp(return_data=return_list)
async def sdu_index_alarm_ranking_service(cid, start, end, product):
"""首页-报警违规排名-新版识电u, 近30天"""
async def sdu_index_alarm_rank(cid, start, end, product):
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
point_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
# 1. 违规电器排名
behavior_res = await sdu_alarm_behavior_dao(start, end, point_list)
behavior_illegal_app = []
if behavior_res:
for i in behavior_res:
tmp_dic = {"name": i["key"], "value": i["doc_count"]}
behavior_illegal_app.append(tmp_dic)
behavior_illegal_app = sorted(behavior_illegal_app,
key=lambda x: x["value"], reverse=True)
# 2. 报警排名, 违规行为
es_type_res = await sdu_alarm_limit_type_dao(start, end, point_list)
alarm_ranking = []
illegal_behavior = []
for buck in es_type_res:
point_name = points_map.get(buck["key"])
# 具体报警等级
# power_quality_low:能质量偏低--III级报警
# ele_overload: 线路过载--II级报警
# illegal_ele_app: 违规电器接入--I级报警
im1, im2, im3 = 0, 0, 0
if buck.get("type") and buck.get("type").get("buckets"):
for im in buck["type"]["buckets"]:
im3 = im["doc_count"] \
if im["key"] == "power_quality_low" else im3
im2 = im["doc_count"] \
if im["key"] == "ele_overload" else im2
im1 = im["doc_count"] \
if im["key"] == "illegal_ele_app" else im1
# 报警排名
alarm_dic = {
"name": point_name, "value": buck["doc_count"],
"im1": im1, "im2": im2, "im3": im3
}
alarm_ranking.append(alarm_dic)
# 违规行为
illegal_count = 0
type_buck = buck["type"]["buckets"]
for i in type_buck:
if i.get("key") == "illegal_ele_app":
illegal_count = i.get("doc_count")
illegal_dic = {"name": point_name, "value": illegal_count}
illegal_behavior.append(illegal_dic)
# 3. 排序
if len(alarm_ranking) > 1:
alarm_ranking = sorted(alarm_ranking, key=lambda x: x["value"],
reverse=True)
if len(illegal_behavior) > 1:
illegal_behavior = sorted(illegal_behavior, key=lambda x: x["value"],
reverse=True)
return SiarResp(
illegal_app=behavior_illegal_app[:5],
illegal_behavior=illegal_behavior[:5],
alarm_ranking=alarm_ranking[:5]
)
async def sdu_index_alarm_ranking_service_new15(cid, start, end, product):
points = await points_by_cid([cid])
if not points:
raise ParamException(message=f"{cid}没有points")
point_list = [i["pid"] for i in points]
points_map = {i["pid"]: i["name"] for i in points}
# 1. 违规电器排名
behavior_res = await sdu_alarm_behavior_dao_new15(start, end, point_list)
behavior_illegal_app = []
if behavior_res:
for i in behavior_res:
tmp_dic = {"name": i["appliance"], "value": i["doc_count"]}
......
......@@ -9,11 +9,11 @@ from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \
from unify_api.modules.alarm_manager.service.alarm_static_service import \
sdu_alarm_statistics_service, sdu_alarm_statistics_sort_service, \
sdu_app_statistics_sort_service, sdu_electric_behave_service, \
sdu_index_alarm_ranking_service, zdu_level_distribution_service, \
zdu_level_distribution_service, \
zdu_content_distribution_service, zdu_summary_service, \
zdu_alarm_sort_service_2, sdu_alarm_statistics_sort_service_new15, \
sdu_app_statistics_sort_service_new15, \
sdu_index_alarm_ranking_service_new15
sdu_index_alarm_rank
from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.count_info_pds import \
......@@ -177,7 +177,7 @@ async def post_sdu_index_alarm_ranking(req, body: SiasReq) -> SiarResp:
product = body.product
# 最近30天, 不包含今天
start, end = last30_day_range()
return await sdu_index_alarm_ranking_service_new15(cid, start, end, product)
return await sdu_index_alarm_rank(cid, start, end, product)
@summary("报警统计-报警等级-智电u")
......
......@@ -21,26 +21,26 @@ async def get_points(company_ids):
company_point_ids_map = defaultdict(list)
for point in points:
company_point_ids_map[point["cid"]].append(point["pid"])
point_map = {i["pid"]: i for i in points}
point_ids = list(point_map.keys())
pid_field, start_time_field = "pid", "start_time"
sql = f"SELECT pid, mid FROM change_meter_record WHERE pid in %s ORDER BY {pid_field}, {start_time_field}"
records = await conn.fetchall(sql, args=(point_ids,))
newest_point_meter_relation = {i["pid"]: i["mid"] for i in records if
i["mid"]}
valid_mids = list(newest_point_meter_relation.values())
newest_record_map = {i["pid"]: point_map.get(i["pid"]) for i in records
if i["mid"]}
# 根据有效的meter id查询meter参数
async with MysqlUtil() as conn:
mid_field, start_time_field = "mid", "start_time"
mp_sql = f"SELECT vc, mid, ctnum FROM meter_param_record WHERE mid in %s ORDER BY {mid_field}, {start_time_field}"
mps = await conn.fetchall(mp_sql, args=(valid_mids,))
meter_param_map = {i["mid"]: i for i in mps}
for cid, point_ids in company_point_ids_map.items():
for point_id in point_ids:
if point_id in newest_record_map:
......@@ -85,23 +85,6 @@ async def proxy_points(cid_list):
async def get_meter_by_point(point_id):
"""
通过point_id获取sid
:param point_id:
:return: sid
"""
async with MysqlUtil() as conn:
sql = "SELECT mid from change_meter_record where pid = %s order by start_time desc limit 1"
point_meter_info = await conn.fetchone(sql, args=(point_id,))
if not point_meter_info:
return None
newest_mid = point_meter_info["mid"]
meter_sql = "SELECT sid, meter_no from meter where mid = %s"
meter_info = await conn.fetchone(meter_sql, args=(newest_mid,))
return meter_info
async def get_meter_by_point_new15(point_id):
"""
根据point获取设备数据
"""
......@@ -122,7 +105,7 @@ async def list_point(cid):
for res in result:
pid = res.get("pid")
points[pid] = res
sql = "SELECT id, `group`, item FROM location WHERE cid=%s and `type` in %s"
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(
......@@ -132,7 +115,7 @@ async def list_point(cid):
group = res.get("group")
item = res.get("item")
groups.setdefault(group, []).append((id, item))
for pid, point_info in points.items():
name = point_info.get("name")
add_to_company = point_info["add_to_company"]
......@@ -146,7 +129,7 @@ async def list_point(cid):
comm_point = {"name": name, "point_id": pid, "locations": locations,
"add_to_company": add_to_company}
list_point.append(comm_point)
async with MysqlUtil() as conn:
sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
inlines = await conn.fetchall(sql, args=(cid,))
......
......@@ -5,7 +5,7 @@ from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, \
LF_THRESHOLD, THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
from unify_api.modules.common.procedures.points import points_by_storeys, \
get_meter_by_point_new15
get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.modules.electric.dao.electric_dao import \
......@@ -899,7 +899,7 @@ async def elec_index_service_new15(cid, point_id, start, end):
async def elec_current_service_new15(point_id):
# 获取mtid
meter_info = await get_meter_by_point_new15(point_id)
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(
message="没有该监测点的monitor信息,请联系运维人员!")
......
......@@ -9,7 +9,7 @@ import re
from pot_libs.settings import SETTING
from datetime import datetime
from unify_api.modules.common.procedures.points import get_meter_by_point_new15
from unify_api.modules.common.procedures.points import get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
......@@ -873,7 +873,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
raise ParamException(
message="param exception, equals is NULL, no point_id")
# 获取mtid
meter_info = await get_meter_by_point_new15(point_id)
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(
message="没有该监测点的monitor信息,请联系运维人员!")
......
......@@ -2,8 +2,6 @@ import ast
from unify_api.constants import SDU_ONE_TWO_GRADE_ALARM
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import CO2_N
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_aggs_date, sdu_alarm_aggs_date_importance
from unify_api.modules.carbon_neutral.service.carbon_reduce_service import \
carbon_emission_index_service
from unify_api.modules.common.dao.common_dao import monitor_by_cid, tsp_by_cid, \
......@@ -21,10 +19,10 @@ from unify_api.modules.home_page.dao.count_info_dao import \
alarm_aggs_point_location
from unify_api.modules.home_page.procedures.count_info_pds import other_info, \
electric_use_info, cid_alarm_importance_count, \
alarm_importance_count_total, power_factor, current_load, \
alarm_importance_count_total, current_load, \
get_company_charge_price, health_status_res, carbon_status_res_web, \
optimization_count_info, economic_index_desc, electric_use_info_new15, \
power_factor_new15, current_load_new15
optimization_count_info, economic_index_desc, \
cal_power_factor, current_load_new15
from unify_api.modules.home_page.procedures.count_info_proxy_pds import \
alarm_percentage_count, alarm_safe_power
from unify_api.modules.tsp_water.dao.drop_dust_dao import \
......@@ -170,7 +168,8 @@ async def info_yang_chen_service(cid):
if pm25_max_list and max(pm25_max_list) < 35:
air_quality += 1
# 3. 安全运行天数, 从接入平台算起,未出现一级报警则加一天
alarm_es = await sdu_alarm_aggs_date_importance(cid)
# alarm_es = await sdu_alarm_aggs_date_importance(cid)
alarm_es = []
safe_operation_days = 0
for alarm in alarm_es:
in_bucket = alarm["importance"]["buckets"]
......@@ -269,7 +268,7 @@ async def alarm_price_costtl_service(cid):
# 1. 今日报警
imp_dic = await alarm_importance_count_total(cid, today_start, today_end)
# 2. 实时功率因数, 上月功率因数
cos_ttl, last_month_cos = await power_factor_new15(cid)
cos_ttl, last_month_cos = await cal_power_factor(cid)
# 3. 实时负荷
cur_load = await current_load_new15(cid)
# 4. 平均电价
......@@ -400,7 +399,7 @@ async def all_index_info_service(cid):
health_index = round(health_index)
health_status = health_status_res(health_index, "web")
# 2. 安全指数
elec_info = await electric_use_info_new15(cid)
elec_info = await electric_use_info(cid)
safety_index = elec_info.electric_use_score
safety_status = safety_ratio_res(safety_index, "web")
# 3. 碳排指数
......
......@@ -24,10 +24,10 @@ from unify_api.modules.home_page.procedures.count_info_pds import (
normal_rate_of_location, normal_rate_of_location_new15,
other_info, other_info_new15,
power_count_info, power_count_info_new15,
electric_use_info, electric_use_info_new15,
electric_use_info,
datetime_to_timestamp,
power_charge_price, power_charge_price_new15,
power_factor, power_factor_new15,
cal_power_factor,
optimization_count_info, optimization_count_info_new
)
from unify_api.modules.home_page.service.count_info_service import \
......@@ -79,7 +79,7 @@ async def post_count_info(request, body: CountInfoReq) -> CountInfoResp:
# 用电安全指数, 报警分, 近30天报警1,2,3级数目
# electric_info = await electric_use_info(company_id)
electric_info = await electric_use_info_new15(company_id)
electric_info = await electric_use_info(company_id)
# 昨日平均电价, 上月平均电价
# yestoday_price, last_month_price = await power_charge_price(
......@@ -89,7 +89,7 @@ async def post_count_info(request, body: CountInfoReq) -> CountInfoResp:
# 实时功率因数, 上月功率因数
# cos_ttl, last_month_cos = await power_factor(company_id)
cos_ttl, last_month_cos = await power_factor_new15(company_id)
cos_ttl, last_month_cos = await cal_power_factor(company_id)
# 其实异常捕获这个东西最好是在框架内部做一次就够了
except (ElasticsearchException, MySQLError, RedisError) as e:
......
......@@ -33,8 +33,7 @@ from unify_api.modules.home_page.components.count_info_proxy_cps import (
AlarmRankingReq,
AipResp, CisResp, CisReq,
)
from unify_api.modules.home_page.procedures.count_info_pds import other_info, \
electric_use_info
from unify_api.modules.home_page.procedures.count_info_pds import other_info
from unify_api.modules.home_page.procedures.count_info_proxy_pds import (
security_level_count,
alarm_percentage_count,
......
import json
from datetime import datetime, timedelta
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.exc_util import BusinessException
from unify_api import constants
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_behavior_dao, sdu_alarm_behavior_dao_new15
from unify_api.modules.common.procedures.points import get_meter_by_point, \
get_meter_by_point_new15
sdu_alarm_behavior_dao
from unify_api.modules.common.procedures.points import \
get_meter_by_point
from unify_api.modules.home_page.procedures.count_info_pds import \
electric_use_info_points_sdu, electric_use_info_points_sdu_new15
electric_use_info_points_sdu
from unify_api.modules.shidianu.components.algorithm_cps import WcResp, AbcResp
from unify_api.modules.shidianu.dao.analysis_result_dao import \
query_sdu_power_wave, query_sdu_recog_record
......@@ -19,67 +18,11 @@ from unify_api.utils.time_format import last30_day_range, \
get_start_end_by_tz_time, day_slots, get_start_end_by_tz_time_new
async def wave_curve_service(point_id, req_date, product):
# 1. 获取曲线数据和slots
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")
sid, meter_no = meter_info["sid"], meter_info["meter_no"]
dt = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
time_slot = [
datetime.strftime(dt + timedelta(minutes=i),
"%Y-%m-%d %H:%M:%S").split(" ")[1][:5]
for i in range(1440)
]
p_list = await get_p_list(sid, meter_no, req_date, time_slot)
# 2. 获取用电设备识别结果
start, end = get_start_end_by_tz_time(req_date)
device_data = await query_sdu_recog_record(point_id, start, end)
electric_actions = {}
if device_data:
for i in device_data:
recog_dt = str(i["recog_dt"])
recog_dt = recog_dt.split(" ")[1][:5]
act_info = json.loads(i["act_info"])
# 拼凑返回格式 04:02 违规电器,疑似电动车电池
tmp_dic = {}
(key, value), = act_info.items()
type_str = constants.SDU_EVENT_TYPE_MAP.get(key)
# 违规,大功率,正常电器, 只保留1个级别最高的展示
if recog_dt not in electric_actions:
electric_actions[recog_dt] = [
{"type": type_str, "value": value, "type_str": key}
]
else:
# 如果是电动车电池, 优先级最高, 同一分钟第一次已经是电动车电池continue
if electric_actions[recog_dt][0]["value"] == "电动车电池":
continue
# 如果本次正常电器, 不需要处理了
# 如果本次违规电器, 替换原来的
elif key == "illegal_ele_app":
electric_actions[recog_dt] = [
{"type": type_str, "value": value, "type_str": key}
]
# 如果本次是大功率电器, 且原本是正常电器, 则替换
elif key == "high_power_app" and electric_actions[recog_dt][0][
"type_str"] == "normal_app":
electric_actions[recog_dt] = [
{"type": type_str, "value": value, "type_str": key}
]
# electric_actions[recog_dt].append(
# {"type": type_str, "value": value})
return WcResp(
time_slot=time_slot,
p_slot=p_list,
electric_actions=electric_actions
)
async def wave_curve_service_new15(point_id, req_date, product):
# 1,获取slots
time_slot = day_slots()
# 2. 获取sid
meter_info = await get_meter_by_point_new15(point_id)
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(message="没有该监测点的monitor信息,请联系运维人员!")
mtid, meter_no = meter_info["mtid"], meter_info["meter_no"]
......@@ -127,6 +70,7 @@ async def wave_curve_service_new15(point_id, req_date, product):
electric_actions=electric_actions
)
async def alarm_behavior_curve_service(point_id, req_date, product):
# 1. 获取功率波动, 如果没有查询到功率波动,返回None
wave_data = await query_sdu_power_wave(point_id, req_date + " 00:00:00")
......@@ -136,9 +80,7 @@ async def alarm_behavior_curve_service(point_id, req_date, product):
power_swing = None
# 2. 安全评价
start, end = last30_day_range()
# alarm_res = await electric_use_info_points_sdu(start, end, [point_id])
alarm_res = await electric_use_info_points_sdu_new15(start, end,
[point_id])
alarm_res = await electric_use_info_points_sdu(start, end, [point_id])
safety_eval = {"first_alarm_cnt": alarm_res.first_alarm_cnt,
"second_alarm_cnt": alarm_res.second_alarm_cnt,
"third_alarm_cnt": alarm_res.third_alarm_cnt,
......@@ -147,7 +89,7 @@ async def alarm_behavior_curve_service(point_id, req_date, product):
}
# 3. 行为统计
# behavior_res = await sdu_alarm_behavior_dao(start, end, [point_id])
behavior_res = await sdu_alarm_behavior_dao_new15(start, end, [point_id])
behavior_res = await sdu_alarm_behavior_dao(start, end, [point_id])
behavior_illegal_app = []
if behavior_res:
for i in behavior_res:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment