Commit fd3450a0 authored by lcn

Update the Andian U management edition

parent 91a67783
......@@ -473,4 +473,7 @@ ELECTRIC_PARAM_MAP = {
"unbalanceI", # 三相电流不平衡度
"unbalanceU", # 三相电压不平衡度
"overPR"
}
\ No newline at end of file
}
CST = "Asia/Shanghai"
\ No newline at end of file
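The hunk above adds a CST timezone constant next to ELECTRIC_PARAM_MAP; later hunks in this commit pass it to pendulum when building MySQL-friendly time strings. A minimal sketch of that pattern (illustrative only):

import pendulum

CST = "Asia/Shanghai"

day_start = pendulum.now(tz=CST).start_of("day")
print(day_start.format("YYYY-MM-DD HH:mm:ss"))  # e.g. "2024-01-01 00:00:00"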
......@@ -67,7 +67,7 @@ async def new_list_alarm_service(cid, storeys, offset, page_size, start, end,
url = None
redirect_type = ""
cid = int(res.get("cid")) if res.get("cid") else res.get("cid")
storey_name = point_storey_map[point_id]["storey_name"]
room_name = point_storey_map[point_id]["room_name"]
alarm = Alarm(
......@@ -138,14 +138,14 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
"""报警信息分页列表"""
if not point_list or not event_type or not importance:
return ListAlarmResponse(total=0, rows=[])
# # 1. ES query result
# es_res = await list_alarm_zdu_dao(cid, point_list, page_num, page_size,
# start, end, importance, event_type)
results = await list_alarm_zdu_dao_new15(cid, point_list, start, end,
importance, event_type)
real_total = len(results)
results = results[(page_num-1)*page_size: page_num*page_size]
results = results[(page_num - 1) * page_size: page_num * page_size]
# 2. Get the factory's description info for each alarm type
event_dic = await company_extend_dao(cid)
event_dic_map = {event["key"]: event for event in event_dic}
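The zdu list service now fetches the full result set from list_alarm_zdu_dao_new15 and paginates in memory; the bracket expression above must be a colon slice, not a comma (a tuple index raises TypeError on a list). A standalone sketch with a hypothetical helper name:

def paginate(rows, page_num, page_size):
    # 1-based page numbers, half-open slice
    return rows[(page_num - 1) * page_size: page_num * page_size]

assert paginate(list(range(10)), page_num=2, page_size=3) == [3, 4, 5]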
......@@ -171,7 +171,7 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
else:
url = None
redirect_type = ""
alarm = Alarm(
name=res.get("name"),
importance=res.get("importance"),
......@@ -187,7 +187,7 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
content=res.get("message"),
)
rows.append(alarm)
total = real_total if real_total < constants.ES_TOTAL_LIMIT \
else constants.ES_TOTAL_LIMIT
return ListAlarmResponse(total=total, rows=rows)
......@@ -225,7 +225,7 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end):
else:
url = None
redirect_type = ""
alarm = Alarm(
name=source.get("name"),
importance=source.get("importance"),
......@@ -241,7 +241,7 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end):
content=source.get("message"),
)
rows.append(alarm)
# total: the mini program does not paginate; returned but unused
total = es_res["hits"]["total"]
return ListAlarmResponse(total=total, rows=rows)
......@@ -253,7 +253,10 @@ async def list_alarm_service_new15(cid, point_id, start, end, importance,
if point_id:
li.append(f"pid={point_id}")
else:
li.append(f"cid={cid}")
if not isinstance(cid, list):
cid = [cid]
cid_where = str(tuple(cid)).replace(",)", ")")
li.append(f"cid in {cid_where}")
if start and end:
li.append(f"event_datetime BETWEEN '{start}' and '{end}'")
if importance:
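The new branch renders a Python cid list as a SQL IN list. str(tuple([7])) yields "(7,)", which is invalid SQL, hence the trailing ",)" replace. A sketch of the transformation (helper name hypothetical; binding the list as a query parameter would avoid manual rendering and quoting concerns entirely):

def render_in_clause(cids):
    if not isinstance(cids, list):
        cids = [cids]
    return str(tuple(cids)).replace(",)", ")")

assert render_in_clause(7) == "(7)"
assert render_in_clause([7, 8]) == "(7, 8)"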
......@@ -270,7 +273,7 @@ async def list_alarm_service_new15(cid, point_id, start, end, importance,
li.append(f"event_type in {str(tuple(alarm_type)).strip(',')}")
mid_sql = " and ".join(li)
total = await get_total_list_alarm_dao(mid_sql)
mid_sql2 = " and ".join(["point_1min_event."+i for i in li])
mid_sql2 = " and ".join(["point_1min_event." + i for i in li])
datas = await get_list_alarm_dao(mid_sql2, page_size, page_num)
rows = []
for data in datas:
......@@ -279,7 +282,10 @@ async def list_alarm_service_new15(cid, point_id, start, end, importance,
type_str = constants.EVENT_TYPE_MAP.get(event_type)
location_id = data.get("lid")
es_id = data.get("id")
if location_id and (event_type in constants.TEMP_SCOPE_URL_TYPE):
if point_id and data.get("event_mode") == "scope":
url = "/scope_details?doc_id=%s" % es_id
redirect_type = "scope"
elif location_id and event_type in constants.TEMP_SCOPE_URL_TYPE:
url = "/temp_trend?doc_id=%s" % es_id
redirect_type = "temp_trend"
else:
......@@ -300,4 +306,4 @@ async def list_alarm_service_new15(cid, point_id, start, end, importance,
company_name=data.get("fullname") or '',
)
rows.append(alarm)
return ListAlarmResponse(total=total, rows=rows)
\ No newline at end of file
return ListAlarmResponse(total=total, rows=rows)
......@@ -18,6 +18,7 @@ from unify_api.modules.alarm_manager.service.list_alarm_service import \
wx_list_alarm_zdu_service, list_alarm_service_new15
from unify_api.modules.common.procedures.cids import get_cid_info, get_cids, \
get_proxy_cids
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils import time_format
from unify_api import constants
from pot_libs.common.components.query import PageRequest, Equal, Range, Filter, \
......@@ -55,7 +56,31 @@ async def post_list_alarm(req, body: PageRequest) -> ListAlarmResponse:
alarm_type = in_group.group
elif in_group.field == 'importance':
importance = in_group.group
return await list_alarm_service_new15(cid, point_id, start, end,
cids = []
if req.json.get("product") == Product.AndianUManage.value:
proxy_id = req.json.get("proxy_id")
product = req.json.get("product")
user_id = jwt_user(req)
req_cids = req.json.get("cids")
# cids = await get_cids(user_id, product)
proxy_cids = await get_proxy_cids(user_id, product, proxy_id)
cids = list(set(req_cids or []) & set(proxy_cids))
if req.json.get("product") in [Product.RecognitionElectric.value,
Product.IntelligentU.value]:
if not cid:
product = req.json.get("product")
user_id = jwt_user(req)
cids = await get_cids(user_id, product)
else:
cids = [cid]
if not cids and cid:
cids = [cid]
if not cids:
raise BusinessException(message=f"你没有工厂权限")
return await list_alarm_service_new15(cids, point_id, start, end,
importance, page_size, page_num,
alarm_type)
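The handler now resolves the effective cid list before delegating: for the proxy product it intersects the requested cids with the user's proxy cids; for the recognition/intelligent products it falls back to all cids the user may access when no single cid is given; an empty result is rejected. A hypothetical distillation of those rules (the product strings stand in for the Product enum values, and the real handler awaits get_proxy_cids/get_cids):

def resolve_cids(product, cid, req_cids, proxy_cids, user_cids):
    if product == "AndianUManage":
        cids = list(set(req_cids or []) & set(proxy_cids))
    elif product in ("RecognitionElectric", "IntelligentU"):
        cids = [cid] if cid else list(user_cids)
    else:
        cids = []
    if not cids and cid:
        cids = [cid]
    if not cids:
        raise PermissionError("no factory permission")
    return cids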
......@@ -77,19 +102,19 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
ranges = [_range]
else:
ranges = []
# TODO: when no monitoring point is selected, take cid from req as the filter condition
if req.json.get("cid"):
cid = req.json["cid"]
equal = Equal(field="cid", value=cid)
body.filter.equals.append(equal)
if req.json.get("product") == Product.AndianUManage.value:
proxy_id = req.json.get("proxy_id")
product = req.json.get("product")
user_id = req.ctx.user_id
req_cids = req.json.get("cids")
# cids = await get_cids(user_id, product)
cids = await get_proxy_cids(user_id, product, proxy_id)
if req_cids:
......@@ -100,7 +125,7 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
else:
in_group = InGroup(field="cid", group=cids)
body.filter.in_groups.append(in_group)
if req.json.get("product") in [Product.RecognitionElectric.value,
Product.IntelligentU.value]:
req_cid = req.json.get("cid")
......@@ -112,14 +137,14 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
cids = [req_cid]
in_group = InGroup(field="cid", group=cids)
body.filter.in_groups.append(in_group)
filter = Filter(
equals=body.filter.equals,
ranges=ranges,
in_groups=body.filter.in_groups,
keywords=body.filter.keywords,
)
# Re-wrap the PageRequest
page_request = PageRequest(
page_size=body.page_size, page_num=body.page_num, sort=body.sort,
......@@ -127,7 +152,8 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
)
query_body = EsQuery().query(page_request)
if not query_body.get("query"):
query = {"bool": {"must_not": [{"terms": {"mode.keyword": ["scope"]}}]}}
query = {
"bool": {"must_not": [{"terms": {"mode.keyword": ["scope"]}}]}}
query_body["query"] = query
else:
must_not = [{"terms": {"mode.keyword": ["scope"]}}]
......@@ -140,7 +166,7 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
log.warning(
"Can not find data on es(index: %s): %s" % (index, query_body))
raise DBException
cid_info_map = await get_cid_info(all=True)
rows = []
for info in es_results["hits"]["hits"]:
......@@ -181,7 +207,7 @@ async def post_list_alarm_bak(req, body: PageRequest) -> ListAlarmResponse:
company_name=cid_info_map.get(cid, {}).get("fullname", ""),
)
rows.append(alarm)
real_total = es_results["hits"]["total"]
total = real_total if real_total < constants.ES_TOTAL_LIMIT else constants.ES_TOTAL_LIMIT
return ListAlarmResponse(total=total, rows=rows)
......@@ -206,7 +232,6 @@ async def post_new_list_alarm(req, body: NlaReq) -> ListAlarmResponse:
product)
@summary("小程序消息列表")
async def post_wx_list_alarm(req, body: WlaReq) -> ListAlarmResponse:
# 1. Get the parameters
......
......@@ -7,72 +7,53 @@ from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import CST
def point_day2month(dt):
if isinstance(dt, int) or isinstance(dt, float):
dt = pendulum.from_timestamp(dt, tz="Asia/Shanghai")
es_index = f"{constants.POINT_1MIN_INDEX}_{dt.year}_{dt.month}"
elif isinstance(dt, datetime):
es_index = f"{constants.POINT_1MIN_INDEX}_{dt.year}_{dt.month}"
else:
es_index = constants.POINT_1MIN_INDEX
return es_index
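point_day2month maps a timestamp or datetime to the month-sharded ES index name. A usage sketch, assuming POINT_1MIN_INDEX is the string "point_1min_index" (an illustrative value, not confirmed by this diff):

import pendulum

ts = pendulum.datetime(2023, 7, 15, tz="Asia/Shanghai").int_timestamp
print(point_day2month(ts))  # -> "point_1min_index_2023_7"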
async def today_alarm_cnt(cids):
filters = [
{"terms": {"cid": cids}},
{"term": {"mode": "alarm"}},
]
start_time = pendulum.today(tz="Asia/Shanghai")
es_end_time = start_time.subtract(days=-1).format("YYYY-MM-DDTHH:mm:ss+08:00")
es_start_time = start_time.format("YYYY-MM-DDTHH:mm:ss+08:00")
filters.append({"range": {"datetime": {"gte": es_start_time, "lt": es_end_time,}}},)
query_body = {
"query": {"bool": {"filter": filters}},
"size": 0,
"aggs": {
"cid_aggs": {
"terms": {"field": "cid", "size": 10000},
"aggs": {
"date_alarms": {
"date_histogram": {
"field": "datetime",
"order": {"_key": "desc"},
"min_doc_count": 0,
"interval": "day",
"format": "yyyy-MM-dd",
"time_zone": "+08:00",
}
}
},
}
},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body, index=constants.POINT_1MIN_EVENT)
cid_buckets = es_result.get("aggregations", {}).get("cid_aggs", {}).get("buckets", [])
cid_bucket_map = {bucket["key"]: bucket for bucket in cid_buckets}
now_time = datetime.now()
es_end_time = start_time.add(days=1).format("YYYY-MM-DD HH:mm:ss")
es_start_time = start_time.format("YYYY-MM-DD HH:mm:ss")
sql = f"""
select cid,count(*) count
from point_1min_event pe
left join event_type et on pe.event_type = et.e_type
where cid in %s and et.mode = 'alarm' and event_datetime >= %s
and event_datetime < %s
group by cid
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql,
args=(cids, es_start_time, es_end_time))
cid_bucket_map = {i["cid"]: i["count"] for i in datas}
cid_alarm_map = {cid: {"today_alarm_count": 0} for cid in cids}
for cid in cids:
bucket = {}
if cid in cid_bucket_map:
bucket = cid_bucket_map[cid]
date_alarm_bucket = bucket.get("date_alarms", {}).get("buckets", [])
for i in date_alarm_bucket:
if i["key_as_string"] == str(now_time)[:10]:
cid_alarm_map[cid]["today_alarm_count"] += i["doc_count"]
alarm_count = cid_bucket_map.get("cid") or 0
cid_alarm_map[cid]["today_alarm_count"] += alarm_count
return cid_alarm_map
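today_alarm_cnt now answers from one grouped MySQL query instead of an ES date histogram. A sketch of the map assembly with illustrative rows, showing why the lookup must use the cid variable rather than the string "cid":

rows = [{"cid": 101, "count": 5}]
cids = [101, 102]
cid_bucket_map = {r["cid"]: r["count"] for r in rows}
cid_alarm_map = {c: {"today_alarm_count": cid_bucket_map.get(c) or 0} for c in cids}
assert cid_alarm_map == {101: {"today_alarm_count": 5}, 102: {"today_alarm_count": 0}}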
async def proxy_safe_run_info(cids, start_time_str=None, end_time_str=None):
async def proxy_safe_run_info(cids, start_time_str=None,
end_time_str=None):
"""
Batch-fetch each factory's safe-run days and today's alarm count; when a month is requested, compute the safe-run days within that month
:param cids:
......@@ -83,83 +64,67 @@ async def proxy_safe_run_info(cids, start_time_str=None, end_time_str=None):
# {"term": {"mode": "alarm"}},
{"term": {"importance": 1}},
]
where = ""
start_dt, end_dt, start_ts, end_ts = None, None, 0, 0
now_dt = pendulum.now(tz=CST)
if start_time_str and end_time_str:
start_dt = datetime.strptime(start_time_str, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S")
now = datetime.now()
if end_dt > now:
end_dt = now
es_start_str = datetime(
year=start_dt.year, month=start_dt.month, day=start_dt.day
).strftime("%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filters.append({"range": {"datetime": {"gte": es_start_str, "lt": es_end_str,}}},)
query_body = {
"query": {"bool": {"filter": filters}},
"size": 0,
"aggs": {
"cid_aggs": {
"terms": {"field": "cid", "size": 10000},
"aggs": {
"date_alarms": {
"date_histogram": {
"field": "datetime",
"order": {"_key": "desc"},
"min_doc_count": 0,
"interval": "day",
"format": "yyyy-MM-dd",
"time_zone": "+08:00",
}
}
},
}
},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body, index=constants.POINT_1MIN_EVENT)
now_time = datetime.now()
start_dt = pendulum.parse(start_time_str)
end_dt = pendulum.parse(end_time_str)
start_ts = start_dt.int_timestamp
end_ts = end_dt.int_timestamp
now_ts = now_dt.int_timestamp
if end_ts > now_ts:
end_time_str = now_dt.format("YYYY-MM-DD HH:mm:ss")
where += f" and event_datetime>= '{start_time_str}' and " \
f"event_datetime < '{end_time_str}' "
sql = f"""
select cid,date_format(event_datetime,"%%Y-%%m-%%d") fmt_day,
count(*) count
from point_1min_event
where cid in %s {where}
group by cid,date_format(event_datetime,"%%Y-%%m-%%d")
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, args=(cids,))
# Get the factory installation time (create_time)
async with MysqlUtil() as conn:
company_sql = "select cid, create_time from company where cid in %s"
companys = await conn.fetchall(company_sql, (cids,))
create_time_timestamp_map = {
company["cid"]: datetime.fromtimestamp(company["create_time"]) for company in companys
company["cid"]: pendulum.from_timestamp(
company["create_time"], tz=CST) for company in companys
}
cid_alarm_map = {cid: {"today_alarm_count": 0, "safe_run_days": 0} for cid in cids}
cid_buckets = es_result.get("aggregations", {}).get("cid_aggs", {}).get("buckets", [])
cid_bucket_map = {bucket["key"]: bucket for bucket in cid_buckets}
cid_alarm_map = {cid: {"today_alarm_count": 0, "safe_run_days": 0} for cid
in cids}
cid_alarm_count_dict = dict()
for data in datas:
cid = data.get("cid")
if cid not in cid_alarm_count_dict:
cid_alarm_count_dict[cid] = 0
elif data.get("count") > 0:
cid_alarm_count_dict[cid] += 1
for cid in cids:
create_time = create_time_timestamp_map[cid]
total_days = (now_time - create_time).days + 1
create_dt = create_time_timestamp_map[cid]
total_days = (now_dt - create_dt).days + 1
if start_time_str and end_time_str:
# For safe-run days over a given period, the total-days logic differs slightly
total_days = (end_dt.date() - start_dt.date()).days + 1
if create_time > start_dt and create_time < end_dt:
total_days = (end_dt.date() - create_time.date()).days + 1
elif create_time > end_dt:
total_days = (end_dt - start_dt).days + 1
create_ts = create_dt.int_timestamp
if start_ts < create_ts < end_ts:
total_days = (end_dt - create_dt).days + 1
elif create_ts > end_ts:
total_days = 0
has_alarm_days = 0
bucket = {}
if cid in cid_bucket_map:
bucket = cid_bucket_map[cid]
date_alarm_bucket = bucket.get("date_alarms", {}).get("buckets", [])
for i in date_alarm_bucket:
if i["doc_count"] != 0:
# No alarms counts as safe running; accumulate the cumulative safe-run days
has_alarm_days += 1
has_alarm_days = cid_alarm_count_dict.get("cid") or 0
safe_run_days = total_days - has_alarm_days
cid_alarm_map[cid]["safe_run_days"] = safe_run_days
cid_alarm_map[cid]["total_days"] = total_days
today_alarm_map = await today_alarm_cnt(cids)
for cid in cid_alarm_map:
cid_alarm_map[cid]["today_alarm_count"] = today_alarm_map[cid]["today_alarm_count"]
cid_alarm_map[cid]["today_alarm_count"] = today_alarm_map[cid][
"today_alarm_count"]
return cid_alarm_map
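A worked sketch of the safe-run arithmetic above, with illustrative dates: the window total is clamped to the install date when the factory was created mid-window, and days that saw at least one alarm are subtracted.

import pendulum

CST = "Asia/Shanghai"
start_dt = pendulum.datetime(2024, 1, 1, tz=CST)
end_dt = pendulum.datetime(2024, 1, 31, tz=CST)
create_dt = pendulum.datetime(2024, 1, 10, tz=CST)

total_days = (end_dt - start_dt).days + 1              # 31
if start_dt.int_timestamp < create_dt.int_timestamp < end_dt.int_timestamp:
    total_days = (end_dt - create_dt).days + 1         # 22: clamp to install date
has_alarm_days = 4                                     # days with >= 1 alarm
safe_run_days = total_days - has_alarm_days            # 18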
......@@ -178,9 +143,10 @@ async def alarm_time_distribution(company_ids, start, end):
HOUR (pevent.event_datetime)
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(company_ids, ))
time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0, "morning_alarm_cnt": 0}
datas = await conn.fetchall(sql, args=(company_ids,))
time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0,
"morning_alarm_cnt": 0}
for data in datas:
hour = int(data["event_hour"])
if hour >= 6 and hour < 18:
......@@ -195,14 +161,15 @@ async def alarm_time_distribution(company_ids, start, end):
async def alarm_time_distribution_old(company_ids, start, end):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month, day=start_dt.day).strftime(
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00"
)
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str, }}},
{"terms": {"cid": company_ids}}]
query_body = {
"query": {"bool": {"filter": filter_list}},
"size": 0,
......@@ -224,13 +191,15 @@ async def alarm_time_distribution_old(company_ids, start, end):
}
},
}
log.info("alarm time distribute query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body, index=constants.POINT_1MIN_EVENT)
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
print(f"alarm time distribute es_result = {es_result}")
buckets = es_result["aggregations"]["cid_aggs"]["buckets"] or []
time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0, "morning_alarm_cnt": 0}
time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0,
"morning_alarm_cnt": 0}
for i in buckets:
cid_buckets = i.get("time_alarms", {}).get("buckets", [])
for item in cid_buckets:
......
......@@ -9,7 +9,8 @@ from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.dao.health_score_dao import \
health_score_points_aggs, get_point_dats_dao, get_mean_datas_dao
from unify_api.modules.common.procedures.points import get_points
from unify_api.modules.common.procedures.points import get_points, \
get_points_new15
from unify_api.modules.electric.procedures.electric_util import \
batch_get_wiring_type
from unify_api.modules.home_page.procedures import point_inlines
......@@ -30,12 +31,12 @@ async def load_health_radar(cid, param_point_id=None):
# if json_score:
# score_info = json.loads(json_score)
# return score_info
# Compute the time range for the last 7 days
today = pendulum.today()
start_time = str(today.subtract(days=7))
end_time = str(today.subtract(seconds=1))
inline_point_ids = []
point_ids = []
# 1. Get all incoming-line (inline) data for this factory
......@@ -48,7 +49,7 @@ async def load_health_radar(cid, param_point_id=None):
inline_point_ids.append(pid)
else:
point_ids.append(pid)
# Rework the poorly performing code below
stats = {point_id: {} for point_id in inline_point_ids + point_ids}
point_info_map = await batch_get_wiring_type(inline_point_ids + point_ids)
......@@ -65,7 +66,7 @@ async def load_health_radar(cid, param_point_id=None):
else:
stats_items = ["uab_mean", "freq_mean", "ubl_mean", "costtl_mean",
"lf_mean"]
for item in stats_items:
point_v = es_dic.get(point_id)
if not point_v:
......@@ -117,20 +118,23 @@ async def load_health_radar(cid, param_point_id=None):
"SELECT pid, mtid FROM point WHERE pid IN %s order by pid, create_time asc"
)
async with MysqlUtil() as conn:
change_meter_records = await conn.fetchall(sql, args=(tuple(all_point_ids),))
change_meter_records = await conn.fetchall(sql, args=(
tuple(all_point_ids),))
point_mid_map = {
i["pid"]: i["mtid"] for i in change_meter_records if i["mtid"] is not None
i["pid"]: i["mtid"] for i in change_meter_records if
i["mtid"] is not None
}
# Get the standard voltage from meter_param_record
all_mids = list(point_mid_map.values())
meter_param_map = {}
if all_mids:
async with MysqlUtil() as conn:
sql = "SELECT mtid, vc, voltage_side, ctnum FROM point WHERE mtid IN %s order by mtid, create_time asc"
meter_param_records = await conn.fetchall(sql, args=(tuple(all_mids),))
meter_param_records = await conn.fetchall(sql,
args=(tuple(all_mids),))
meter_param_map = {i["mtid"]: i for i in meter_param_records}
log.info(f"all_mids={all_mids}")
# Voltage deviation score
total, total_score = 0, 0
......@@ -138,7 +142,7 @@ async def load_health_radar(cid, param_point_id=None):
ua_mean = stats.get(point_id, {}).get("ua_mean")
if ua_mean is None:
continue
mtid = point_mid_map.get(point_id)
if not mtid:
# pid has no mid; the device was dismantled
......@@ -153,47 +157,47 @@ async def load_health_radar(cid, param_point_id=None):
stand_voltage = meter_vc / sqrt(3) if ctnum == 3 else meter_vc
else:
stand_voltage = 400 if ctnum == 3 else 10000
v_dev = (ua_mean - stand_voltage) / stand_voltage
score = get_dev_score(dev_type="v", cur=v_dev)
if score is None:
continue
total_score += score
total += 1
v_score = total_score / total if total else 100
# Frequency deviation score
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
freq_mean = stats.get(point_id, {}).get("freq_mean")
if freq_mean is None:
continue
freq_dev = freq_mean - FREQ_STANDARD
score = get_dev_score(dev_type="freq", cur=freq_dev)
if score is None:
continue
total_score += score
total += 1
freq_score = total_score / total if total else 100
# Three-phase [voltage] unbalance score
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ubl_avg = stats.get(point_id, {}).get("ubl_mean")
if ubl_avg is None:
continue
score = get_dev_score(dev_type="ubl", cur=ubl_avg)
if score is None:
continue
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# Power factor: when inline-level power factors exist, use only the inline-level ones
total, total_score = 0, 0
if inline_point_ids:
......@@ -204,15 +208,15 @@ async def load_health_radar(cid, param_point_id=None):
costtl_mean = stats.get(point_id, {}).get("costtl_mean")
if costtl_mean is None:
continue
score = get_dev_score(dev_type="costtl", cur=costtl_mean)
if score is None:
continue
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# (Voltage) harmonic distortion rate
# Voltage harmonic distortion: only three-meter-method metering points are counted; if all monitoring points use the two-meter method, take the mean of all other indicators
total, total_score = 0, 0
......@@ -220,15 +224,15 @@ async def load_health_radar(cid, param_point_id=None):
thdua_mean = stats.get(point_id, {}).get("thdua_mean")
if thdua_mean is None:
continue
score = get_dev_score(dev_type="thdu", cur=thdua_mean)
if score is None:
continue
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# Load factor
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -239,7 +243,7 @@ async def load_health_radar(cid, param_point_id=None):
score = get_dev_score(dev_type="lf", cur=lf_mean)
if score is None:
continue
total_score += score
total += 1
lf_score = total_score / total if total else 100
......@@ -253,8 +257,9 @@ async def load_health_radar(cid, param_point_id=None):
lf_score,
)
if not thdu_score:
thdu_score = (v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
thdu_score = (
v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
# Store in redis
score_info = {
"v_score": v_score,
......@@ -264,13 +269,13 @@ async def load_health_radar(cid, param_point_id=None):
"thdu_score": thdu_score,
"lf_score": lf_score,
}
# now_ts = pendulum.now().int_timestamp
# tomorrow_ts = pendulum.tomorrow().int_timestamp
# exp_ts = tomorrow_ts - now_ts
#
# await RedisClient().setex(redis_key, exp_ts, json.dumps(score_info))
return score_info
......@@ -332,7 +337,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
freq_score = total_score / total if total else 100
# Three-phase [voltage] unbalance score
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -345,7 +350,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# Power factor: when inline-level power factors exist, use only the inline-level ones
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -358,7 +363,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# (Voltage) harmonic distortion rate
# Voltage harmonic distortion: only three-meter-method metering points are counted; if all monitoring points use the two-meter method, take the mean of all other indicators
total, total_score = 0, 0
......@@ -372,7 +377,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# Load factor
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -386,7 +391,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
lf_score = total_score / total if total else 100
if not thdu_score:
thdu_score = (v_score + freq_score + ubl_score + costtl_score +
lf_score) / 5.0
......@@ -405,18 +410,18 @@ async def load_health_index(cid, point_id=None):
"""用电健康指数"""
# score_info = await load_health_radar(cid, point_id)
score_info = await load_health_radar_new15(cid, point_id)
if score_info is None:
log.error("load_health_index fail")
return 0
v_score = score_info["v_score"]
freq_score = score_info["freq_score"]
if v_score <= 60 or freq_score <= 60:
# If either the voltage-deviation or frequency-deviation score is below 60, the overall health index is 0
log.info("v_score or freq_score less 60")
return 0
sub_dev = (1 - (v_score + freq_score) / 200.0) * 20
sub_lf = (1 - score_info["lf_score"] / 100.0) * 20
sub_costtl = (1 - score_info["costtl_score"] / 100.0) * 20
......@@ -438,58 +443,30 @@ async def load_manage_health_radar(cids, recent_days=30):
# if json_score:
# score_info = json.loads(json_score)
# return score_info
# Compute the time range for the last 30 days
today = pendulum.today()
start_time = str(today.subtract(days=recent_days))
end_time = str(today.subtract(seconds=1))
company_point_map = await get_points(cids)
all_point_map = {
point_id: point_info
for i in company_point_map.values()
for point_id, point_info in i.items()
}
start_time = today.subtract(days=recent_days).format("YYYY-MM-DD HH:mm:ss")
end_time = today.subtract(seconds=1).format("YYYY-MM-DD HH:mm:ss")
company_point_map = await get_points_new15(cids)
all_point_map = dict()
for cid, points in company_point_map.items():
for pid, point_info in points.items():
all_point_map[pid] = point_info
all_point_ids = list(all_point_map.keys())
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"pid": all_point_ids}},
{"range": {"quarter_time": {"gte": start_time, "lte": end_time,}}},
],
}
},
"size": 0,
"aggs": {},
}
for point_id in all_point_ids:
ctnum = all_point_map[point_id]["meter_param"]["ctnum"]
if ctnum == 3:
stats_items = [
"ua_mean",
"freq_mean",
"ubl_mean",
"costtl_mean",
"thdua_mean",
"lf_mean",
]
else:
stats_items = ["uab_mean", "freq_mean", "ubl_mean", "costtl_mean", "lf_mean"]
aggs_stats = {}
for stats_item in stats_items:
aggs_stats[stats_item] = {"stats": {"field": stats_item}}
query_body["aggs"][f"point_id_{point_id}_aggs"] = {
"filter": {"term": {"pid": point_id}},
"aggs": aggs_stats,
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body, index=POINT_15MIN_INDEX)
sql = f"""
select pid,avg(ua_mean) ua_mean,avg(uab_mean) uab_mean,avg(freq_mean) freq_mean,
avg(ubl_mean) ubl_mean,avg(costtl_mean) costtl_mean,
avg(thdua_mean) thdua_mean,avg(lf_mean) lf_mean from
point_15min_electric
where pid in %s and create_time >= %s and create_time <=%s
group by pid
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql,
args=(all_point_ids, start_time, end_time))
data_map = {i['pid']: i for i in datas}
# Compute each company's health index separately
company_score_map = {}
for cid in cids:
......@@ -508,28 +485,23 @@ async def load_manage_health_radar(cids, recent_days=30):
continue
inline_point_ids, point_ids = [], []
for point_id, point_item in point_map.items():
if point_item["inlid_belongedto"]:
if point_item["inlid"]:
inline_point_ids.append(point_id)
else:
point_ids.append(point_id)
# 1. Voltage deviation score
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ua_mean = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("ua_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
ua_mean = data_point_map.get("ua_mean")
if ua_mean is None:
continue
point_info = all_point_map[point_id]
if not point_info["meter_param"]:
# Device without parameters; dismantled?
continue
meter_param = point_info["meter_param"]
meter_vc, ctnum = meter_param.get("vc"), meter_param.get("ctnum") or 3
meter_vc, ctnum = point_info.get("vc"), point_info.get(
"ctnum") or 3
if meter_vc:
stand_voltage = meter_vc / sqrt(3) if ctnum == 3 else meter_vc
else:
......@@ -538,23 +510,21 @@ async def load_manage_health_radar(cids, recent_days=30):
score = get_dev_score(dev_type="v", cur=v_dev)
if score is None:
continue
total_score += score
total += 1
v_score = total_score / total if total else 100
# 2. Frequency deviation score
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
freq_mean = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("freq_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
freq_mean = data_point_map.get("freq_mean")
if freq_mean is None:
continue
freq_dev = freq_mean - FREQ_STANDARD
score = get_dev_score(dev_type="freq", cur=freq_dev)
if score is None:
......@@ -562,16 +532,14 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
freq_score = total_score / total if total else 100
# 3. Three-phase [voltage] unbalance score
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ubl_avg = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("ubl_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
ubl_avg = data_point_map.get("ubl_mean")
if ubl_avg is None:
continue
score = get_dev_score(dev_type="ubl", cur=ubl_avg)
......@@ -580,7 +548,7 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# 4. Power factor: when inline-level power factors exist, use only the inline-level ones
total, total_score = 0, 0
if inline_point_ids:
......@@ -588,12 +556,10 @@ async def load_manage_health_radar(cids, recent_days=30):
else:
ids = point_ids
for point_id in ids:
costtl_mean = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("costtl_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
costtl_mean = data_point_map.get("costtl_mean")
if costtl_mean is None:
continue
score = get_dev_score(dev_type="costtl", cur=costtl_mean)
......@@ -602,17 +568,15 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# 4. (Voltage) harmonic distortion rate
# Voltage harmonic distortion: only three-meter-method metering points are counted; if all monitoring points use the two-meter method, take the mean of all other indicators
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
thdua_mean = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("thdua_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
thdua_mean = data_point_map.get("thdua_mean")
if thdua_mean is None:
continue
score = get_dev_score(dev_type="thdu", cur=thdua_mean)
......@@ -621,16 +585,14 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# 5. Load factor
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
lf_mean = (
es_result.get("aggregations", {})
.get(f"point_id_{point_id}_aggs", {})
.get("lf_mean", {})
.get("avg")
)
data_point_map = data_map.get(point_id)
if not data_point_map:
continue
lf_mean = data_point_map.get("lf_mean")
if lf_mean is None:
score = 100
else:
......@@ -652,8 +614,9 @@ async def load_manage_health_radar(cids, recent_days=30):
lf_score,
)
if not thdu_score:
thdu_score = (v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
thdu_score = (
v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
company_score_map[cid] = {
"v_score": v_score,
"freq_score": freq_score,
......@@ -678,7 +641,7 @@ async def load_manage_health_index(company_score_info):
log.error(f"cid = {cid}load_health_index fail")
company_index_map[cid] = 0
continue
v_score = score_info["v_score"]
freq_score = score_info["freq_score"]
if v_score <= 60 or freq_score <= 60:
......@@ -686,11 +649,12 @@ async def load_manage_health_index(company_score_info):
log.info(f"cid = {cid} v_score or freq_score less 60")
company_index_map[cid] = 0
continue
sub_dev = (1 - (v_score + freq_score) / 200.0) * 20
sub_lf = (1 - score_info["lf_score"] / 100.0) * 20
sub_costtl = (1 - score_info["costtl_score"] / 100.0) * 20
sub_thdu = (1 - score_info["thdu_score"] / 100.0) * 20
sub_ubl = (1 - score_info["ubl_score"] / 100.0) * 20
company_index_map[cid] = 100 - sub_dev - sub_lf - sub_costtl - sub_thdu - sub_ubl
company_index_map[
cid] = 100 - sub_dev - sub_lf - sub_costtl - sub_thdu - sub_ubl
return company_index_map
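A worked example of the index formula above: each penalty term is (1 - score/100) * 20, with the voltage and frequency scores pooled, so uniform sub-scores of 90 give 100 - 5 * 2 = 90.

scores = {"v_score": 90, "freq_score": 90, "lf_score": 90,
          "costtl_score": 90, "thdu_score": 90, "ubl_score": 90}
sub_dev = (1 - (scores["v_score"] + scores["freq_score"]) / 200.0) * 20
others = sum((1 - scores[k] / 100.0) * 20
             for k in ("lf_score", "costtl_score", "thdu_score", "ubl_score"))
assert 100 - sub_dev - others == 90.0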
......@@ -21,26 +21,26 @@ async def get_points(company_ids):
company_point_ids_map = defaultdict(list)
for point in points:
company_point_ids_map[point["cid"]].append(point["pid"])
point_map = {i["pid"]: i for i in points}
point_ids = list(point_map.keys())
pid_field, start_time_field = "pid", "start_time"
sql = f"SELECT pid, mid FROM change_meter_record WHERE pid in %s ORDER BY {pid_field}, {start_time_field}"
records = await conn.fetchall(sql, args=(point_ids,))
newest_point_meter_relation = {i["pid"]: i["mid"] for i in records if
i["mid"]}
valid_mids = list(newest_point_meter_relation.values())
newest_record_map = {i["pid"]: point_map.get(i["pid"]) for i in records
if i["mid"]}
# Query meter parameters by the valid meter ids
async with MysqlUtil() as conn:
mid_field, start_time_field = "mid", "start_time"
mp_sql = f"SELECT vc, mid, ctnum FROM meter_param_record WHERE mid in %s ORDER BY {mid_field}, {start_time_field}"
mps = await conn.fetchall(mp_sql, args=(valid_mids,))
meter_param_map = {i["mid"]: i for i in mps}
for cid, point_ids in company_point_ids_map.items():
for point_id in point_ids:
if point_id in newest_record_map:
......@@ -56,13 +56,14 @@ async def get_points(company_ids):
async def get_points_new15(cids):
sql = "SELECT p.pid,p.cid,p.inlid FROM `point` p INNER JOIN monitor m " \
"on m.mtid=p.mtid where p.cid in %s and m.demolished=0;"
sql = "SELECT p.pid,p.cid,p.inlid,vc,ctnum " \
"FROM `point` p INNER JOIN " \
"monitor m on m.mtid=p.mtid where p.cid in %s and m.demolished=0;"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql, args=(cids,))
company_point_map = defaultdict(dict)
for point in points:
company_point_map[point["cid"]][point["pid"]] = points
company_point_map[point["cid"]][point["pid"]] = point
return company_point_map
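get_points_new15 returns cid -> {pid -> point row}, and after this fix each pid maps to its own row (now carrying vc/ctnum directly) rather than the whole result list. A shape sketch with illustrative values:

from collections import defaultdict

points = [{"pid": 2001, "cid": 101, "inlid": 9, "vc": 400, "ctnum": 3}]
company_point_map = defaultdict(dict)
for point in points:
    company_point_map[point["cid"]][point["pid"]] = point
assert company_point_map[101][2001]["ctnum"] == 3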
......@@ -121,7 +122,7 @@ async def list_point(cid):
for res in result:
pid = res.get("pid")
points[pid] = res
sql = "SELECT id, `group`, item FROM location WHERE cid=%s and `type` in %s"
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(
......@@ -131,7 +132,7 @@ async def list_point(cid):
group = res.get("group")
item = res.get("item")
groups.setdefault(group, []).append((id, item))
for pid, point_info in points.items():
name = point_info.get("name")
add_to_company = point_info["add_to_company"]
......@@ -145,7 +146,7 @@ async def list_point(cid):
comm_point = {"name": name, "point_id": pid, "locations": locations,
"add_to_company": add_to_company}
list_point.append(comm_point)
async with MysqlUtil() as conn:
sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
inlines = await conn.fetchall(sql, args=(cid,))
......
import random
from datetime import datetime, timedelta
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api import constants
from unify_api.constants import COMPANY_1DAY_POWER, EVENT_TYPE_MAP, Importance
from unify_api.constants import COMPANY_1DAY_POWER, EVENT_TYPE_MAP, Importance, \
CST
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_aggs_type
from unify_api.modules.common.procedures.cids import get_cid_info
......@@ -19,19 +20,17 @@ from unify_api.modules.common.procedures.health_score import (
load_manage_health_radar,
load_manage_health_index,
)
from unify_api.modules.common.procedures.points import get_points
from unify_api.modules.common.procedures.points import get_points, \
get_points_new15
from unify_api.modules.home_page.procedures.count_info_pds import \
datetime_to_timestamp
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.time_format import last30_day_range_today
async def proxy_alarm_score(cids):
now = datetime.now()
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
datetime(now.year, now.month, now.day) - timedelta(30))
now_dt = pendulum.now()
end_time = now_dt.format("YYYY-MM-DD HH:mm:ss")
start_time = now_dt.subtract(days=30).format("YYYY-MM-DD HH:mm:ss")
score_events = [
i
for i in EVENT_TYPE_MAP.keys()
......@@ -49,50 +48,38 @@ async def proxy_alarm_score(cids):
"under_rms_u",
]
]
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"cid": cids}},
{"terms": {"type.keyword": score_events, }},
{"range": {"time": {"gte": start_timestamp,
"lte": end_timestamp, }}},
],
}
},
"size": 0,
"aggs": {},
}
for cid in cids:
query_body["aggs"][f"cid_{cid}_aggs"] = {
"filter": {"term": {"cid": cid}},
"aggs": {"importance": {"terms": {"field": "importance"}}},
}
log.info("cal_score_safe_electric query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
sql = f"""
select cid,importance,count(*) count from point_1min_event
where cid in %s and event_datetime >=%s and event_datetime <= %s
and event_type in %s
group by cid,importance
"""
log.info("cal_score_safe_electric sql={}".format(sql))
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(
cids, start_time, end_time, score_events))
data_map = {"{}-{}".format(i["cid"], i["importance"]): i["count"] for i in
datas}
cid_alarm_score_map = {}
for cid in cids:
cid_aggs_info = es_result.get("aggregations", {}).get(
f"cid_{cid}_aggs", {})
if not cid_aggs_info:
cid_alarm_score_map["alarm_score"] = 0
first_key = "{}-{}".format(cid, Importance.First.value)
second_key = "{}-{}".format(cid, Importance.Second.value)
third_key = "{}-{}".format(cid, Importance.Third.value)
if first_key not in data_map and second_key not in data_map and \
third_key not in data_map:
cid_alarm_score_map[cid] = 100
continue
first_alarm_cnt = 0
second_alarm_cnt = 0
third_alarm_cnt = 0
for bucket in cid_aggs_info.get("importance", {}).get("buckets", []):
if bucket["key"] == Importance.First.value:
first_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Second.value:
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
company_point_map = await get_points(cids)
if first_key in data_map:
first_alarm_cnt = data_map.get(first_key)
if second_key in data_map:
second_alarm_cnt = data_map.get(second_key)
if third_key in data_map:
third_alarm_cnt = data_map.get(third_key)
company_point_map = await get_points_new15(cids)
point_len = len(company_point_map.get(cid) or {})
alarm_score = (
(
......@@ -138,7 +125,7 @@ async def alarm_percentage_count(cids):
FROM
point_1min_event pevent
WHERE
cid = %s
cid in %s
AND pevent.event_datetime >= '{start}'
AND pevent.event_datetime <= '{end}'
GROUP BY
......@@ -151,7 +138,7 @@ async def alarm_percentage_count(cids):
FROM
point_1min_event pevent
WHERE
cid = %s
cid in %s
AND pevent.event_datetime >= '{start}'
AND pevent.event_datetime <= '{end}'
GROUP BY
......@@ -160,7 +147,7 @@ async def alarm_percentage_count(cids):
async with MysqlUtil() as conn:
event_type_data = await conn.fetchall(event_type_sql, args=(cids,))
importance_data = await conn.fetchall(importance_sql, args=(cids,))
first_alarm_cnt, second_alarm_cnt, third_alarm_cnt = 0, 0, 0
for bucket in importance_data:
if bucket["importance"] == Importance.First.value:
......@@ -169,7 +156,7 @@ async def alarm_percentage_count(cids):
second_alarm_cnt += bucket["doc_count"]
elif bucket["importance"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
temperature_cnt, residual_current_cnt, electric_param_cnt = 0, 0, 0
for bucket in event_type_data:
if bucket["event_type"] in [
......@@ -186,7 +173,7 @@ async def alarm_percentage_count(cids):
residual_current_cnt += bucket["doc_count"]
else:
electric_param_cnt += bucket["doc_count"]
time_distribution_map = await alarm_time_distribution(cids, start,
end)
alarm_percentage_map = {
......@@ -208,7 +195,7 @@ async def alarm_percentage_count_old(cids):
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
datetime(now.year, now.month, now.day) - timedelta(30))
query_body = {
"query": {
"bool": {
......@@ -237,12 +224,12 @@ async def alarm_percentage_count_old(cids):
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
importance_buckets = (
es_result.get("aggregations", {})
.get("importance_aggs", {})
.get("importance", {})
.get("buckets", [])
.get("importance_aggs", {})
.get("importance", {})
.get("buckets", [])
)
first_alarm_cnt, second_alarm_cnt, third_alarm_cnt = 0, 0, 0
for bucket in importance_buckets:
......@@ -252,13 +239,13 @@ async def alarm_percentage_count_old(cids):
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
type_buckets = (
es_result.get("aggregations", {}).get("type_aggs", {}).get("type",
{}).get(
"buckets", [])
)
temperature_cnt, residual_current_cnt, electric_param_cnt = 0, 0, 0
for bucket in type_buckets:
if bucket["key"] in [
......@@ -275,7 +262,7 @@ async def alarm_percentage_count_old(cids):
residual_current_cnt += bucket["doc_count"]
else:
electric_param_cnt += bucket["doc_count"]
start_dt_str = str(datetime.fromtimestamp(start_timestamp))
end_dt_str = str(datetime.fromtimestamp(end_timestamp))
time_distribution_map = await alarm_time_distribution(cids, start_dt_str,
......@@ -301,47 +288,24 @@ async def proxy_today_alarm_cnt(cids, group_field="importance"):
:param group_field: example: "importance"
:return:
"""
now = datetime.now()
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
datetime(now.year, now.month, now.day))
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"cid": cids}},
{"range": {"time": {"gte": start_timestamp,
"lte": end_timestamp, }}},
],
}
},
"size": 0,
"aggs": {},
}
int_group_field = [
"importance",
]
for cid in cids:
query_body["aggs"][f"cid_{cid}_aggs"] = {
"filter": {"term": {"cid": cid}},
"aggs": {
f"{group_field}": {
"terms": {
"field": f"{group_field}"
if group_field in int_group_field
else f"{group_field}.keyword",
"size": 10000,
}
}
},
}
log.info("alarm aggs query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
start_time = pendulum.now(tz=CST).start_of(unit="day").format(
"YYYY-MM-DD HH:mm:ss")
end_time = pendulum.now(tz=CST).format("YYYY-MM-DD HH:mm:ss")
if group_field == "type":
# Need to map to the type in the event_type table
group_field = "event_type"
sql = f"""
select cid,{group_field},count(*) count from
point_1min_event
where event_datetime >=%s and event_datetime <= %s
and cid in %s
group by cid,{group_field}
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(start_time, end_time, cids))
log.info("alarm aggs sql={}".format(sql))
cid_alarm_map = {
cid: {
"first_alarm_cnt": 0,
......@@ -355,27 +319,22 @@ async def proxy_today_alarm_cnt(cids, group_field="importance"):
}
for cid in cids
}
for cid in cids:
cid_buckets = (
es_result.get("aggregations", {})
.get(f"cid_{cid}_aggs", {})
.get(group_field, {})
.get("buckets", [])
)
alarm_type_map = {
Importance.First.value: "first_alarm_cnt",
Importance.Second.value: "second_alarm_cnt",
Importance.Third.value: "third_alarm_cnt",
"ele_overload": "overuse_eleprod",
"high_power_app": "high_p_eleprod",
"illegal_ele_app": "illegal_eleprod",
"power_quality_low": "electric_quantity",
"ele_car_battery": "ele_car_battery",
}
for bucket in cid_buckets:
if bucket["key"] in alarm_type_map:
_key = alarm_type_map[bucket["key"]]
cid_alarm_map[cid][_key] += bucket["doc_count"]
alarm_type_map = {
Importance.First.value: "first_alarm_cnt",
Importance.Second.value: "second_alarm_cnt",
Importance.Third.value: "third_alarm_cnt",
"ele_overload": "overuse_eleprod",
"high_power_app": "high_p_eleprod",
"illegal_ele_app": "illegal_eleprod",
"power_quality_low": "electric_quantity",
"ele_car_battery": "ele_car_battery",
}
for data in datas:
cid = data.get("cid")
key = data.get(group_field)
if key in alarm_type_map:
value = alarm_type_map.get(key)
cid_alarm_map[cid][value] = data.get("count")
return cid_alarm_map
......@@ -385,53 +344,22 @@ async def proxy_today_spfv_cnt(cids):
:param cids:
:return:
"""
now = datetime.now()
start_time = datetime(now.year, now.month, now.day)
end_time = datetime(now.year, now.month, now.day, now.hour, now.minute,
now.second)
es_start = datetime.strftime(start_time, "%Y-%m-%dT%H:%M:%S+08:00")
es_end = datetime.strftime(end_time, "%Y-%m-%dT%H:%M:%S+08:00")
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"cid": cids}},
{"range": {
"quarter_time": {"gte": es_start, "lte": es_end}}},
]
}
},
"size": 0,
"aggs": {
"cid_aggs": {
# Note: if size is not set here, the aggregation returns only 10 buckets, i.e. at most 10 cids
"terms": {"field": "cid", "size": 10000},
"aggs": {
"spfv_aggs": {
"terms": {"field": "spfv.keyword"},
"aggs": {"kwh": {"sum": {"field": "kwh"}}},
}
},
}
},
}
log.info("spfv aggs query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.COMPANY_15MIN_POWER)
cid_buckets = es_result.get("aggregations", {}).get("cid_aggs", {}).get(
"buckets", [])
start_time = pendulum.now(tz=CST).start_of(unit="day").format(
"YYYY-MM-DD HH:mm:ss")
end_time = pendulum.now(tz=CST).format("YYYY-MM-DD HH:mm:ss")
sql = f"""
select cid,spfv,sum(kwh) kwh from company_15min_power
where cid in %s and create_time >=%s and create_time <=%s
group by cid,spfv
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(cids, start_time, end_time))
cid_spfv_map = {cid: {"s": 0, "p": 0, "f": 0, "v": 0} for cid in cids}
for bucket in cid_buckets:
cid = bucket.get("key")
spvf_buckets = bucket.get("spfv_aggs", {}).get("buckets", [])
for i in spvf_buckets:
cid_spfv_map[cid][i["key"]] += round(i["kwh"]["value"])
for data in datas:
cid = data.get("cid")
spfv = data.get("spfv")
cid_spfv_map[cid][spfv] = data.get("kwh")
log.info(f"cid_spfv_map = {cid_spfv_map}")
return cid_spfv_map
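proxy_today_spfv_cnt now sums kWh per tariff period straight from company_15min_power; s/p/f/v presumably denote the sharp/peak/flat/valley periods. A shape sketch with illustrative rows:

rows = [{"cid": 101, "spfv": "p", "kwh": 12.5}]
cid_spfv_map = {cid: {"s": 0, "p": 0, "f": 0, "v": 0} for cid in [101]}
for r in rows:
    cid_spfv_map[r["cid"]][r["spfv"]] = r["kwh"]
assert cid_spfv_map == {101: {"s": 0, "p": 12.5, "f": 0, "v": 0}}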
......@@ -444,13 +372,13 @@ async def proxy_map_info(cids):
"""
# 1. Today's alarm statistics
cid_alarm_map = await proxy_today_alarm_cnt(cids)
# 2. Today's electricity usage
cid_power_spfv_map = await proxy_today_spfv_cnt(cids)
# 3. Safe-run days
cid_safe_run_map = await proxy_safe_run_info(cids)
# 4. Safety ranking
cid_alarm_score_map = await proxy_alarm_score(cids)
electric_index_map = {
......@@ -461,21 +389,22 @@ async def proxy_map_info(cids):
[(round(i), cid) for cid, i in electric_index_map.items()],
reverse=True
)
# 5. Health ranking
company_score_map = await load_manage_health_radar(cids, recent_days=7)
company_score_map = await load_manage_health_radar(cids,
recent_days=7)
company_index_map = await load_manage_health_index(company_score_map)
health_index_list = sorted(
[(round(i), cid) for cid, i in company_index_map.items()], reverse=True
)
# 6. Assemble the response data
cid_info_map = {cid: {} for cid in cids}
async with MysqlUtil() as conn:
company_sql = "select cid, shortname, longitude, latitude from company where cid in %s"
companys = await conn.fetchall(company_sql, args=(cids,))
company_map = {i["cid"]: i for i in companys}
print(f"company_index_map = {company_index_map}")
for cid in cids:
cid_info_map[cid]["reg_today_alarm_info"] = {}
......@@ -501,7 +430,7 @@ async def proxy_map_info(cids):
cid_info_map[cid]["longitude"] = company_map[cid].get(
"longitude") or ""
cid_info_map[cid]["latitude"] = company_map[cid].get("latitude") or ""
return cid_info_map
......@@ -513,13 +442,13 @@ async def reg_map_info(cids):
"""
# 1. Today's alarm statistics
cid_alarm_map = await proxy_today_alarm_cnt(cids, group_field="type")
# 2. Today's electricity usage
cid_power_spfv_map = await proxy_today_spfv_cnt(cids)
# 3. Safe-run days
cid_safe_run_map = await proxy_safe_run_info(cids)
# 4. Safety ranking
cid_alarm_score_map = await proxy_alarm_score(cids)
electric_index_map = {
......@@ -530,11 +459,11 @@ async def reg_map_info(cids):
[(round(i), cid) for cid, i in electric_index_map.items()],
reverse=True
)
# 6. Assemble the response data
cid_info_map = {cid: {} for cid in cids}
company_map = await get_cid_info(cids)
for cid in cids:
cid_info_map[cid]["reg_today_alarm_info"] = cid_alarm_map[cid]
cid_info_map[cid]["today_alarm_info"] = {}
......@@ -547,7 +476,7 @@ async def reg_map_info(cids):
)
cid_info_map[cid]["electric_index"] = round(electric_index_map[cid])
cid_info_map[cid]["electric_score"] = cid_alarm_score_map[cid]
cid_info_map[cid]["company_cnt"] = len(cids)
cid_info_map[cid]["company_name"] = company_map[cid]["shortname"]
cid_info_map[cid]["today_alarm_cnt"] = cid_safe_run_map[cid][
......@@ -555,7 +484,7 @@ async def reg_map_info(cids):
cid_info_map[cid]["longitude"] = company_map[cid].get(
"longitude") or ""
cid_info_map[cid]["latitude"] = company_map[cid].get("latitude") or ""
return cid_info_map
......@@ -583,11 +512,11 @@ async def alarm_safe_power(cid, start, end):
temperature_cnt, residual_current_cnt, lr_cnt, \
power_factor_cnt, under_u_cnt, over_u_cnt, over_i_cnt \
= 0, 0, 0, 0, 0, 0, 0
for bucket in es_res:
# Temperature
if bucket["event_type"] in ("overTemp", "overTempRange1min",
"overTempRange15min"):
"overTempRange15min"):
temperature_cnt += bucket["doc_count"]
# Residual current
elif bucket["event_type"] in ("overResidualCurrent",):
......@@ -607,7 +536,7 @@ async def alarm_safe_power(cid, start, end):
# Overcurrent
elif bucket["event_type"] in ("overI",):
over_i_cnt += bucket["doc_count"]
alarm_map = {
"temperature_cnt": temperature_cnt,
"residual_current_cnt": residual_current_cnt,
......
......@@ -175,61 +175,19 @@ async def alarm_summary(company_ids, start, end, date_type):
:param date_type:
:return:
"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00"
)
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
if date_type == "day":
_format = "yyyy-MM-dd HH:mm:ss"
_min = start_dt.strftime("%Y-%m-%d %H:%M:%S")
_max = end_dt.strftime("%Y-%m-%d %H:%M:%S")
else:
# date_type == "month"
_format = "yyyy-MM-dd"
_min = start_dt.strftime("%Y-%m-%d")
_max = end_dt.strftime("%Y-%m-%d")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str, }}},
{"term": {"mode": "alarm"}},
]
filter_list.append({"terms": {"cid": company_ids}})
query_body = {
"query": {"bool": {"filter": filter_list}},
"size": 0,
"aggs": {
"cid_aggs": {
"terms": {"field": "cid", "size": 10000},
"aggs": {
"date_alarms": {
"date_histogram": {
"field": "datetime",
"order": {"_key": "desc"},
"min_doc_count": 1,
"interval": "day",
"format": "yyyy-MM-dd",
"time_zone": "+08:00",
}
}
},
}
},
}
log.info("alarm_summary query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
print(f"es_result = {es_result}")
buckets = es_result["aggregations"]["cid_aggs"]["buckets"] or []
sql = f"""
select cid,count(*) count from point_1min_event
where cid in %s and event_mode = 'alarm' and event_datetime >= %s
and event_datetime <= %s
group by cid
"""
log.info("alarm_summary sql={}".format(sql))
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(company_ids, start, end))
print(f"datas = {datas}")
total_alarm_cnt, alarm_company_cnt = sum(
[i["doc_count"] for i in buckets]), len(buckets)
cid_alarmcnt_list = [i["doc_count"] for i in buckets]
[i["count"] for i in datas]), len(datas)
cid_alarmcnt_list = [i["count"] for i in datas]
safe_run_map = await proxy_safe_run_info(company_ids, start_time_str=start,
end_time_str=end)
......
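alarm_summary now derives both aggregates from the grouped rows: the alarm total is the sum of per-cid counts, and the alarm-company count is the number of rows. A sketch with illustrative data:

datas = [{"cid": 101, "count": 3}, {"cid": 102, "count": 1}]
total_alarm_cnt = sum(i["count"] for i in datas)  # 4
alarm_company_cnt = len(datas)                    # 2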
......@@ -48,6 +48,7 @@ from unify_api.modules.home_page.service.count_info_service import \
safe_run_sdu, safe_run_sdu_new15
from unify_api.modules.elec_charge.components.elec_charge_cps import \
ProductProxyReq
from unify_api.modules.users.procedures.jwt_user import jwt_user
@summary("代理版首页统计信息-安电U")
......@@ -55,7 +56,7 @@ async def post_count_info_proxy(req) -> CountInfoProxyResp:
# 1. Get cid_list
host = req.host
product = PRODUCT.get(host)
user_id = req.ctx.user_id
user_id = jwt_user(req)
proxy_id = req.json.get("proxy_id")
# cid_list = await get_cids(user_id, product)
cid_list = await get_proxy_cids(user_id, product, proxy_id)
......@@ -105,7 +106,7 @@ async def post_security_level_count(
async def post_alarm_percentage_count(
request, body: ProxySecurityLevelCntReq
) -> ProxyAlarmPercentageCntResp:
user_id = request.ctx.user_id
user_id = jwt_user(request)
product = body.product
req_cid = body.cid
if not req_cid:
......@@ -137,7 +138,7 @@ async def post_alarm_percentage_count(
@summary("代理版本首页地图数据")
async def post_proxy_map_info(request,
body: ProxySecurityLevelCntReq) -> ProxyIndexMapResp:
user_id = request.ctx.user_id
user_id = jwt_user(request)
product = body.product
req_cid = body.cid
if not req_cid:
......
......@@ -23,6 +23,7 @@ from unify_api.modules.home_page.components.security_info_cps import (
from unify_api.modules.home_page.procedures.security_info_pds import (
alarm_summary, alarm_count_info_new15,
)
from unify_api.modules.users.procedures.jwt_user import jwt_user
@summary("获取首页今日或者近30天安全报警统计信息")
......@@ -164,7 +165,7 @@ async def post_alarm_summary(request, body: SecurityCommonReq) -> AlarmSummaryRe
if not req_cids:
raise BusinessException(message=f"暂无工厂")
if product == Product.AndianUManage.value:
user_id = request.ctx.user_id
user_id = jwt_user(request)
# cids = await get_cids(user_id, product)
proxy_id = body.proxy_id
cids = await get_proxy_cids(user_id, product, proxy_id)
......