Commit d059b272 authored by ZZH's avatar ZZH

remove es 2023-6-2

parent 984a952d
......@@ -25,7 +25,7 @@ from unify_api.modules.anshiu.procedures.scope_operations_pds import \
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
change_param_to_config
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type_new15
load_point_ctnum
from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope
from unify_api.utils.time_format import get_time_duration, \
......@@ -490,7 +490,7 @@ async def scope_detail_service(event_id):
if not scope_data or not scope_data.get("url"):
return {}, [], [], [], [], [], [], [], []
# 接线法:二表/三表
ctnum = await get_wiring_type_new15(pid) or 3
ctnum = await load_point_ctnum(pid) or 3
# 查询录波详细数据 还不确定在哪儿拿,现在脚本是minio里边,但是家义说未来要在青云里面
# 测试数据存储
# filedata/electric_ops/scope/A2004000519/2022/6/27/17/1656321462.json
......
......@@ -8,8 +8,7 @@ from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.dao.health_score_dao import \
health_score_points_aggs, get_point_dats_dao, get_mean_datas_dao
from unify_api.modules.common.procedures.points import get_points, \
get_points_new15
from unify_api.modules.common.procedures.points import load_compy_points
from unify_api.modules.electric.procedures.electric_util import \
batch_get_wiring_type
from unify_api.modules.home_page.procedures import point_inlines
......@@ -447,7 +446,7 @@ async def load_manage_health_radar(cids, recent_days=30):
today = pendulum.today()
start_time = today.subtract(days=recent_days).format("YYYY-MM-DD HH:mm:ss")
end_time = str(today.subtract(seconds=1)).format("YYYY-MM-DD HH:mm:ss")
company_point_map = await get_points_new15(cids)
company_point_map = await load_compy_points(cids)
all_point_map = dict()
for cid, points in company_point_map.items():
for pid, point_info in points.items():
......
......@@ -6,56 +6,7 @@ from unify_api.modules.common.components.list_points_cps import CommonLocation,
from unify_api.modules.common.dao.common_dao import query_points_by_storey
async def get_points(company_ids):
"""
获取一个工厂的有效监测点
:param company_id:
:return: 返回{"cid": {"point_id": point_info}}
"""
company_point_map = defaultdict(dict)
async with MysqlUtil() as conn:
point_sql = (
"select pid, cid cid, inlid from point where cid in %s"
)
points = await conn.fetchall(point_sql, args=(company_ids,))
company_point_ids_map = defaultdict(list)
for point in points:
company_point_ids_map[point["cid"]].append(point["pid"])
point_map = {i["pid"]: i for i in points}
point_ids = list(point_map.keys())
pid_field, start_time_field = "pid", "start_time"
sql = f"SELECT pid, mid FROM change_meter_record WHERE pid in %s ORDER BY {pid_field}, {start_time_field}"
records = await conn.fetchall(sql, args=(point_ids,))
newest_point_meter_relation = {i["pid"]: i["mid"] for i in records if
i["mid"]}
valid_mids = list(newest_point_meter_relation.values())
newest_record_map = {i["pid"]: point_map.get(i["pid"]) for i in records
if i["mid"]}
# 根据有效的meter id查询meter参数
async with MysqlUtil() as conn:
mid_field, start_time_field = "mid", "start_time"
mp_sql = f"SELECT vc, mid, ctnum FROM meter_param_record WHERE mid in %s ORDER BY {mid_field}, {start_time_field}"
mps = await conn.fetchall(mp_sql, args=(valid_mids,))
meter_param_map = {i["mid"]: i for i in mps}
for cid, point_ids in company_point_ids_map.items():
for point_id in point_ids:
if point_id in newest_record_map:
mid = newest_point_meter_relation[point_id]
meter_param = meter_param_map.get(mid)
if not meter_param:
# 如果不存在参数,拆了?
continue
point_info = newest_record_map[point_id]
point_info["meter_param"] = meter_param
company_point_map[cid][point_id] = point_info
return company_point_map
async def get_points_new15(cids):
async def load_compy_points(cids):
sql = "SELECT p.pid,p.cid,p.inlid,vc,ctnum " \
"FROM `point` p INNER JOIN " \
"monitor m on m.mtid=p.mtid where p.cid in %s and m.demolished=0;"
......
......@@ -26,7 +26,7 @@ async def get_wiring_type(point_id):
return ctnum, mtid
async def get_wiring_type_new15(pid):
async def load_point_ctnum(pid):
sql = "SELECT ctnum FROM `point` WHERE pid=%s"
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, args=(pid,))
......
......@@ -4,7 +4,7 @@ import pendulum
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.procedures.common_cps import proxy_safe_run_info
from unify_api.modules.common.procedures.points import get_points, get_points_new15
from unify_api.modules.common.procedures.points import load_compy_points
async def proxy_electric_count_info(cids, month_str):
......@@ -14,8 +14,7 @@ async def proxy_electric_count_info(cids, month_str):
sql = "select * from safe_health_stats_cid where cid in %s and cal_month=%s"
safe_stats_list = await conn.fetchall(sql, args=(cids, month_str,))
# company_point_map = await get_points(cids)
company_point_map = await get_points_new15(cids)
company_point_map = await load_compy_points(cids)
cid_alarm_score_map = {}
alarm_content_map = {
......
......@@ -16,7 +16,7 @@ from unify_api.modules.electric.procedures.electric_pds import \
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2, round_4, multiplication_two
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type, get_wiring_type_new15
get_wiring_type, load_point_ctnum
from datetime import datetime
from unify_api.modules.common.procedures.location_temp_rcurrent import \
location_stats_statics
......@@ -415,7 +415,7 @@ async def qual_current_level_service(point_list):
async def elec_index_service(cid, point_id, start, end):
ctnum = await get_wiring_type_new15(point_id)
ctnum = await load_point_ctnum(point_id)
ctnum = ctnum if ctnum == 2 else 3
now = str(datetime.now())
if start[:10] == now[:10] and end[:10] == now[:10]:
......
......@@ -25,7 +25,7 @@ from unify_api.modules.electric.service.electric_service import (
from unify_api.utils import time_format
from unify_api import constants
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type, get_wiring_type_new15,
get_wiring_type, load_point_ctnum,
add_random_change,
)
......@@ -90,7 +90,7 @@ async def post_elec_history(req, body: PageRequest) -> ElecHistoryResponse:
async def elec_history_service(start, end, pid, intervel, slots):
ctnum = await get_wiring_type_new15(pid)
ctnum = await load_point_ctnum(pid)
if ctnum == 2:
stats_items = [
"lf_mean", "pttl_mean", "qttl_mean", "costtl_mean", "uab_mean",
......@@ -358,7 +358,7 @@ async def post_qual_history(req, body: PageRequest) -> QualHistoryResponse:
async def qual_history_service(start, end, intervel, slots, pid):
ctnum = await get_wiring_type_new15(pid)
ctnum = await load_point_ctnum(pid)
if intervel == 900:
table_name = "point_15min_electric"
date_fmt = "%%H:%%i"
......
from datetime import datetime, timedelta
from datetime import datetime
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import COMPANY_1DAY_POWER, EVENT_TYPE_MAP, Importance, \
CST
from unify_api.constants import EVENT_TYPE_MAP, Importance, CST
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_aggs_type
from unify_api.modules.common.procedures.cids import get_cid_info
from unify_api.modules.common.procedures.common_cps import (
proxy_safe_run_info,
alarm_time_distribution,
proxy_safe_run_info, alarm_time_distribution,
)
from unify_api.modules.common.procedures.common_utils import get_electric_index
from unify_api.modules.common.procedures.health_score import (
load_manage_health_radar,
load_manage_health_index,
load_manage_health_radar, load_manage_health_index,
)
from unify_api.modules.common.procedures.points import get_points, \
get_points_new15
from unify_api.modules.home_page.procedures.count_info_pds import \
datetime_to_timestamp
from unify_api.modules.common.procedures.points import load_compy_points
from unify_api.utils.time_format import last30_day_range_today
......@@ -60,7 +50,7 @@ async def proxy_alarm_score(cids):
cids, start_time, end_time, score_events))
data_map = {"{}-{}".format(i["cid"], i["importance"]): i["count"] for i in
datas}
cid_alarm_score_map = {}
for cid in cids:
first_key = "{}-{}".format(cid, Importance.First.value)
......@@ -79,7 +69,7 @@ async def proxy_alarm_score(cids):
second_alarm_cnt = data_map.get(second_key)
if third_key in data_map:
third_alarm_cnt = data_map.get(third_key)
company_point_map = await get_points_new15(cids)
company_point_map = await load_compy_points(cids)
point_len = len(company_point_map.get(cid) or {})
alarm_score = (
(
......@@ -105,11 +95,11 @@ async def security_level_count(cids):
for cid, alarm_score in cid_alarm_score_map.items():
if alarm_score <= 1:
security_level_map["security_cnt"] += 1
elif alarm_score > 1 and alarm_score <= 2:
elif 1 < alarm_score <= 2:
security_level_map["pretty_low_cnt"] += 1
elif alarm_score > 2 and alarm_score <= 5:
elif 2 < alarm_score <= 5:
security_level_map["medium_cnt"] += 1
elif alarm_score > 5 and alarm_score <= 10:
elif 5 < alarm_score <= 10:
security_level_map["pretty_high_cnt"] += 1
else:
security_level_map["high_cnt"] += 1
......@@ -147,7 +137,7 @@ async def alarm_percentage_count(cids):
async with MysqlUtil() as conn:
event_type_data = await conn.fetchall(event_type_sql, args=(cids,))
importance_data = await conn.fetchall(importance_sql, args=(cids,))
first_alarm_cnt, second_alarm_cnt, third_alarm_cnt = 0, 0, 0
for bucket in importance_data:
if bucket["importance"] == Importance.First.value:
......@@ -156,7 +146,7 @@ async def alarm_percentage_count(cids):
second_alarm_cnt += bucket["doc_count"]
elif bucket["importance"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
temperature_cnt, residual_current_cnt, electric_param_cnt = 0, 0, 0
for bucket in event_type_data:
if bucket["event_type"] in [
......@@ -173,7 +163,7 @@ async def alarm_percentage_count(cids):
residual_current_cnt += bucket["doc_count"]
else:
electric_param_cnt += bucket["doc_count"]
time_distribution_map = await alarm_time_distribution(cids, start,
end)
alarm_percentage_map = {
......@@ -190,97 +180,6 @@ async def alarm_percentage_count(cids):
return alarm_percentage_map
async def alarm_percentage_count_old(cids):
now = datetime.now()
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
datetime(now.year, now.month, now.day) - timedelta(30))
query_body = {
"query": {
"bool": {
"filter": [
{"terms": {"cid": cids}},
{"range": {"time": {"gte": start_timestamp,
"lte": end_timestamp, }}},
],
}
},
"size": 0,
"aggs": {
"importance_aggs": {
"filter": {"terms": {"cid": cids}},
"aggs": {"importance": {
"terms": {"field": "importance", "size": 10000}}},
},
"type_aggs": {
"filter": {"terms": {"cid": cids}},
"aggs": {"type": {
"terms": {"field": "type.keyword", "size": 10000}}},
},
},
}
log.info("cal_score_safe_electric query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
importance_buckets = (
es_result.get("aggregations", {})
.get("importance_aggs", {})
.get("importance", {})
.get("buckets", [])
)
first_alarm_cnt, second_alarm_cnt, third_alarm_cnt = 0, 0, 0
for bucket in importance_buckets:
if bucket["key"] == Importance.First.value:
first_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Second.value:
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
type_buckets = (
es_result.get("aggregations", {}).get("type_aggs", {}).get("type",
{}).get(
"buckets", [])
)
temperature_cnt, residual_current_cnt, electric_param_cnt = 0, 0, 0
for bucket in type_buckets:
if bucket["key"] in [
"overTemp",
"overTempRange1min",
"overTempRange15min",
"overTempTrendDaily",
"overTempTrendQuarterly",
]:
temperature_cnt += bucket["doc_count"]
elif bucket["key"] in [
"overResidualCurrent",
]:
residual_current_cnt += bucket["doc_count"]
else:
electric_param_cnt += bucket["doc_count"]
start_dt_str = str(datetime.fromtimestamp(start_timestamp))
end_dt_str = str(datetime.fromtimestamp(end_timestamp))
time_distribution_map = await alarm_time_distribution(cids, start_dt_str,
end_dt_str)
alarm_percentage_map = {
"first_alarm_cnt": first_alarm_cnt,
"second_alarm_cnt": second_alarm_cnt,
"third_alarm_cnt": third_alarm_cnt,
"temperature_cnt": temperature_cnt,
"residual_current_cnt": residual_current_cnt,
"electric_param_cnt": electric_param_cnt,
"day_alarm_cnt": time_distribution_map["day_alarm_cnt"],
"night_alarm_cnt": time_distribution_map["night_alarm_cnt"],
"morning_alarm_cnt": time_distribution_map["morning_alarm_cnt"],
}
return alarm_percentage_map
async def proxy_today_alarm_cnt(cids, group_field="importance"):
"""
代码复用,默认是按照importance字段分组
......@@ -303,9 +202,9 @@ async def proxy_today_alarm_cnt(cids, group_field="importance"):
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(start_time, end_time, cids))
log.info("alarm aggs sql={}".format(sql))
cid_alarm_map = {
cid: {
"first_alarm_cnt": 0,
......@@ -354,7 +253,7 @@ async def proxy_today_spfv_cnt(cids):
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(cids, start_time, end_time))
cid_spfv_map = {cid: {"s": 0, "p": 0, "f": 0, "v": 0} for cid in cids}
for data in datas:
cid = data.get("cid")
......@@ -372,13 +271,13 @@ async def proxy_map_info(cids):
"""
# 1. 今日报警统计
cid_alarm_map = await proxy_today_alarm_cnt(cids)
# 2. 今日用电
cid_power_spfv_map = await proxy_today_spfv_cnt(cids)
# 3. 安全运行天数
cid_safe_run_map = await proxy_safe_run_info(cids)
# 4. 安全排名
cid_alarm_score_map = await proxy_alarm_score(cids)
electric_index_map = {
......@@ -389,7 +288,7 @@ async def proxy_map_info(cids):
[(round(i), cid) for cid, i in electric_index_map.items()],
reverse=True
)
# 5. 健康排名
company_score_map = await load_manage_health_radar(cids,
recent_days=7)
......@@ -397,14 +296,14 @@ async def proxy_map_info(cids):
health_index_list = sorted(
[(round(i), cid) for cid, i in company_index_map.items()], reverse=True
)
# 6. 组装返回数据
cid_info_map = {cid: {} for cid in cids}
async with MysqlUtil() as conn:
company_sql = "select cid, shortname, longitude, latitude from company where cid in %s"
companys = await conn.fetchall(company_sql, args=(cids,))
company_map = {i["cid"]: i for i in companys}
print(f"company_index_map = {company_index_map}")
for cid in cids:
cid_info_map[cid]["reg_today_alarm_info"] = {}
......@@ -430,7 +329,7 @@ async def proxy_map_info(cids):
cid_info_map[cid]["longitude"] = company_map[cid].get(
"longitude") or ""
cid_info_map[cid]["latitude"] = company_map[cid].get("latitude") or ""
return cid_info_map
......@@ -442,13 +341,13 @@ async def reg_map_info(cids):
"""
# 1. 今日报警统计
cid_alarm_map = await proxy_today_alarm_cnt(cids, group_field="type")
# 2. 今日用电
cid_power_spfv_map = await proxy_today_spfv_cnt(cids)
# 3. 安全运行天数
cid_safe_run_map = await proxy_safe_run_info(cids)
# 4. 安全排名
cid_alarm_score_map = await proxy_alarm_score(cids)
electric_index_map = {
......@@ -459,11 +358,11 @@ async def reg_map_info(cids):
[(round(i), cid) for cid, i in electric_index_map.items()],
reverse=True
)
# 6. 组装返回数据
cid_info_map = {cid: {} for cid in cids}
company_map = await get_cid_info(cids)
for cid in cids:
cid_info_map[cid]["reg_today_alarm_info"] = cid_alarm_map[cid]
cid_info_map[cid]["today_alarm_info"] = {}
......@@ -476,7 +375,7 @@ async def reg_map_info(cids):
)
cid_info_map[cid]["electric_index"] = round(electric_index_map[cid])
cid_info_map[cid]["electric_score"] = cid_alarm_score_map[cid]
cid_info_map[cid]["company_cnt"] = len(cids)
cid_info_map[cid]["company_name"] = company_map[cid]["shortname"]
cid_info_map[cid]["today_alarm_cnt"] = cid_safe_run_map[cid][
......@@ -484,7 +383,7 @@ async def reg_map_info(cids):
cid_info_map[cid]["longitude"] = company_map[cid].get(
"longitude") or ""
cid_info_map[cid]["latitude"] = company_map[cid].get("latitude") or ""
return cid_info_map
......@@ -508,40 +407,38 @@ async def total_run_day_proxy(cids):
async def alarm_safe_power(cid, start, end):
"""状态监测, 安全和用电, 智电u"""
temp_cnt, over_rc_cnt, lr_cnt, pf_cnt = 0, 0, 0, 0
under_u_cnt, over_u_cnt, over_i_cnt = 0, 0, 0
es_res = await sdu_alarm_aggs_type(cid, start, end)
temperature_cnt, residual_current_cnt, lr_cnt, \
power_factor_cnt, under_u_cnt, over_u_cnt, over_i_cnt \
= 0, 0, 0, 0, 0, 0, 0
for bucket in es_res:
# 温度
if bucket["event_type"] in ("overTemp", "overTempRange1min",
"overTempRange15min"):
temperature_cnt += bucket["doc_count"]
e_type = bucket["event_type"]
if e_type in ("overTemp", "overTempRange1min", "overTempRange15min"):
temp_cnt += bucket["doc_count"]
# 漏电流
elif bucket["event_type"] in ("overResidualCurrent",):
residual_current_cnt += bucket["doc_count"]
elif e_type in ("overResidualCurrent",):
over_rc_cnt += bucket["doc_count"]
# 负载率
elif bucket["event_type"] in ("overPR",):
elif e_type in ("overPR",):
lr_cnt += bucket["doc_count"]
# 功率因数
elif bucket["event_type"] in ("underPhasePF", "underTotalPF"):
power_factor_cnt += bucket["doc_count"]
elif e_type in ("underPhasePF", "underTotalPF"):
pf_cnt += bucket["doc_count"]
# 欠压
elif bucket["event_type"] in ("underU",):
elif e_type in ("underU",):
under_u_cnt += bucket["doc_count"]
# 过压
elif bucket["event_type"] in ("overU",):
elif e_type in ("overU",):
over_u_cnt += bucket["doc_count"]
# 过流
elif bucket["event_type"] in ("overI",):
elif e_type in ("overI",):
over_i_cnt += bucket["doc_count"]
alarm_map = {
"temperature_cnt": temperature_cnt,
"residual_current_cnt": residual_current_cnt,
"temperature_cnt": temp_cnt,
"residual_current_cnt": over_rc_cnt,
"lr_cnt": lr_cnt,
"power_factor_cnt": power_factor_cnt,
"power_factor_cnt": pf_cnt,
"under_u_cnt": under_u_cnt,
"over_u_cnt": over_u_cnt,
"over_i_cnt": over_i_cnt,
......
......@@ -9,7 +9,7 @@ from unify_api.modules.common.procedures.alarm_cps import alarm_count, \
from unify_api.modules.common.procedures.cids import get_cids, get_cid_info, \
get_proxy_cids
from unify_api.modules.common.procedures.common_cps import proxy_safe_run_info
from unify_api.modules.common.procedures.points import proxy_points, get_points
from unify_api.modules.common.procedures.points import proxy_points
from unify_api.modules.common.procedures.power_cps import power_use_count, \
load_cmpy_power
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
......@@ -189,31 +189,6 @@ async def post_proxy_map_info(request,
)
@summary("识电U首页统计信息")
async def post_count_info_sdu(req) -> CountInfoSduResp:
user_id = req.ctx.user_id
product = req.json["product"]
cids = await get_cids(user_id, product=product)
company_point_map = await get_points(cids)
safe_run_map = await proxy_safe_run_info(cids)
total_tenant, safe_operation_days = 0, 0
for cid in cids:
total_tenant += len(company_point_map[cid])
safe_operation_days += safe_run_map[cid]["safe_run_days"]
total_power = await power_use_count(cids)
total_alarm = await alarm_count(cids)
online_rate = 93 + random.choice([1, 1.2, 1.5, 1.7, 2])
return CountInfoSduResp(
total_tenant=total_tenant,
online_rate=online_rate,
safe_operation_days=safe_operation_days,
total_power=total_power,
total_alarm=total_alarm,
)
@summary("识电U首页报警分布")
async def post_reg_alarm_distribution(request,
body: AlarmRankingReq) -> AlarmDistributionResp:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment