Commit f0fe05e6 authored by ZZH's avatar ZZH

remove es 2023-6-5

parent 15ba9fa7
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api import constants
from pot_libs.mysql_util.mysql_util import MysqlUtil
......@@ -26,30 +23,3 @@ async def get_mean_datas_dao(pids, words, start, end):
datas = await conn.fetchall(sql, args=(pids,))
return datas
return []
async def health_score_points_aggs(start, end, point_list):
"""根据points分组, 再求平均值"""
sql = f"""
SELECT
pid,
avg( uab_mean ) uab_mean_avg,
avg( costtl_mean ) costtl_mean_avg,
avg( lf_mean ) lf_mean_avg,
avg( thduab_mean ) thduab_mean_avg,
avg( ubl_mean ) ubl_mean_avg,
avg( freq_mean ) freq_mean_avg,
avg( ua_mean ) ua_mean_avg
FROM
point_15min_electric
WHERE
create_time > "{start}"
AND create_time < "{end}"
AND pid in %s
GROUP BY
pid
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(tuple(point_list),)) if point_list else []
return datas
......@@ -7,277 +7,13 @@ import datetime
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.common.dao.health_score_dao import \
health_score_points_aggs, get_point_dats_dao, get_mean_datas_dao
get_point_dats_dao, get_mean_datas_dao
from unify_api.modules.common.procedures.points import load_compy_points
from unify_api.modules.electric.procedures.electric_util import \
batch_get_wiring_type
from unify_api.modules.home_page.procedures import point_inlines
from unify_api.constants import FREQ_STANDARD
from unify_api.modules.home_page.procedures.dev_grade import get_dev_score
async def load_health_radar(cid, param_point_id=None):
"""获取健康指数雷达-取15min数据
注意:
1.负载率的数据在写入ES15min数据时有做变压器判断逻辑,这里直接读取即可
2.功率因数:有进线级功率因数时,只计算进线级功率因数
3.电压谐波畸变:只计算三表法计量点,如果所有监测点都是二表法,则取其他所有指标均值
"""
# 先从redis取,没有则重新计算,过期时间戳为当天0点
# redis_key = "elec_score:health:%s" % cid
# json_score = await RedisClient().get(redis_key)
# if json_score:
# score_info = json.loads(json_score)
# return score_info
# 计算最近7天时间起始
today = pendulum.today()
start_time = str(today.subtract(days=7))
end_time = str(today.subtract(seconds=1))
inline_point_ids = []
point_ids = []
# 1. 获取该工厂所有进线数据
inline_infos = await point_inlines.get_point_inlines(cid)
for pid, inline in inline_infos.items():
if param_point_id and pid != param_point_id:
# 指定了监测点, 只算监测点的健康指数
continue
if inline:
inline_point_ids.append(pid)
else:
point_ids.append(pid)
# 对如下性能差代码做修改
stats = {point_id: {} for point_id in inline_point_ids + point_ids}
point_info_map = await batch_get_wiring_type(inline_point_ids + point_ids)
es_health = await health_score_points_aggs(
start_time, end_time, inline_point_ids + point_ids
)
es_dic = {i["pid"]: i for i in es_health if es_health}
# 统计所有点所有平均值
for point_id in inline_point_ids + point_ids:
ctnum = point_info_map[point_id]["ctnum"]
if ctnum == 3:
stats_items = ["ua_mean", "freq_mean", "ubl_mean", "costtl_mean",
"thdua_mean", "lf_mean"]
else:
stats_items = ["uab_mean", "freq_mean", "ubl_mean", "costtl_mean",
"lf_mean"]
for item in stats_items:
point_v = es_dic.get(point_id)
if not point_v:
stats[point_id][item] = None
else:
stats[point_id][item] = point_v[item + '_avg']
'''
range = Range(field="quarter_time", start=start_time, end=end_time)
stats = {point_id: {} for point_id in inline_point_ids + point_ids}
# 统计所有点所有平均值
for point_id in inline_point_ids + point_ids:
ctnum, mid = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.warn(f"health_radar point_id={point_id} ctnum={ctnum} 找不到ctnum")
continue
if ctnum == 3:
stats_items = [
"ua_mean",
"freq_mean",
"ubl_mean",
"costtl_mean",
"thdua_mean",
"lf_mean",
]
else:
stats_items = ["uab_mean", "freq_mean", "ubl_mean", "costtl_mean", "lf_mean"]
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None, filter=filter)
query_body = EsQuery.aggr_index(page_request, stats_items=stats_items)
print(query_body)
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body, index=POINT_15MIN_INDEX)
if not es_results:
log.warning("can not find data on es(index: %s): %s" % (POINT_15MIN_INDEX, query_body))
aggregations = es_results.get("aggregations", {})
for item in stats_items:
avg = aggregations.get("%s_avg" % item, {}).get("value")
stats[point_id][item] = avg
'''
# 获取所有poin_id和mtid对应关系
all_point_ids = inline_point_ids + point_ids
point_mid_map = {}
if all_point_ids:
sql = (
"SELECT pid, mtid FROM point WHERE pid IN %s order by pid, create_time asc"
)
async with MysqlUtil() as conn:
change_meter_records = await conn.fetchall(sql, args=(
tuple(all_point_ids),))
point_mid_map = {
i["pid"]: i["mtid"] for i in change_meter_records if
i["mtid"] is not None
}
# 获取meter_param_record中的标准电压
all_mids = list(point_mid_map.values())
meter_param_map = {}
if all_mids:
async with MysqlUtil() as conn:
sql = "SELECT mtid, vc, voltage_side, ctnum FROM point WHERE mtid IN %s order by mtid, create_time asc"
meter_param_records = await conn.fetchall(sql,
args=(tuple(all_mids),))
meter_param_map = {i["mtid"]: i for i in meter_param_records}
log.info(f"all_mids={all_mids}")
# 电压偏差评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ua_mean = stats.get(point_id, {}).get("ua_mean")
if ua_mean is None:
continue
mtid = point_mid_map.get(point_id)
if not mtid:
# pid没有mid,拆了
log.warning(f"pid={point_id} mtid={mtid} mid无效")
continue
meter_param = meter_param_map.get(mtid)
if not meter_param:
log.warning(f"pid={point_id} mtid={mtid} 没有表参数")
continue
meter_vc, ctnum = meter_param.get("vc"), meter_param.get("ctnum") or 3
if meter_vc:
stand_voltage = meter_vc / sqrt(3) if ctnum == 3 else meter_vc
else:
stand_voltage = 400 if ctnum == 3 else 10000
v_dev = (ua_mean - stand_voltage) / stand_voltage
score = get_dev_score(dev_type="v", cur=v_dev)
if score is None:
continue
total_score += score
total += 1
v_score = total_score / total if total else 100
# 频率偏差评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
freq_mean = stats.get(point_id, {}).get("freq_mean")
if freq_mean is None:
continue
freq_dev = freq_mean - FREQ_STANDARD
score = get_dev_score(dev_type="freq", cur=freq_dev)
if score is None:
continue
total_score += score
total += 1
freq_score = total_score / total if total else 100
# 三相[电压]不平衡评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ubl_avg = stats.get(point_id, {}).get("ubl_mean")
if ubl_avg is None:
continue
score = get_dev_score(dev_type="ubl", cur=ubl_avg)
if score is None:
continue
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# 功率因数:有进线级功率因数时,只计算进线级功率因数
total, total_score = 0, 0
if inline_point_ids:
ids = inline_point_ids
else:
ids = point_ids
for point_id in ids:
costtl_mean = stats.get(point_id, {}).get("costtl_mean")
if costtl_mean is None:
continue
score = get_dev_score(dev_type="costtl", cur=costtl_mean)
if score is None:
continue
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# (电压)谐波畸变率
# 电压谐波畸变:只计算三表法计量点,如果所有监测点都是二表法,则取其他所有指标均值
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
thdua_mean = stats.get(point_id, {}).get("thdua_mean")
if thdua_mean is None:
continue
score = get_dev_score(dev_type="thdu", cur=thdua_mean)
if score is None:
continue
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# 负载率
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
lf_mean = stats.get(point_id, {}).get("lf_mean")
if lf_mean is None:
score = 100
else:
score = get_dev_score(dev_type="lf", cur=lf_mean)
if score is None:
continue
total_score += score
total += 1
lf_score = total_score / total if total else 100
log.info(
"v_score:%s, freq_score:%s, ubl_score:%s, costtl_score:%s, " "thdu_score:%s, lf_score:%s",
v_score,
freq_score,
ubl_score,
costtl_score,
thdu_score,
lf_score,
)
if not thdu_score:
thdu_score = (
v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
# 存入redis
score_info = {
"v_score": v_score,
"freq_score": freq_score,
"ubl_score": ubl_score,
"costtl_score": costtl_score,
"thdu_score": thdu_score,
"lf_score": lf_score,
}
# now_ts = pendulum.now().int_timestamp
# tomorrow_ts = pendulum.tomorrow().int_timestamp
# exp_ts = tomorrow_ts - now_ts
#
# await RedisClient().setex(redis_key, exp_ts, json.dumps(score_info))
return score_info
async def load_health_radar_new15(cid, param_point_id=None):
# 计算最近7天时间起始
today = datetime.date.today()
start_time = (today - datetime.timedelta(days=7)).strftime(
......@@ -406,8 +142,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
async def load_health_index(cid, point_id=None):
"""用电健康指数"""
# score_info = await load_health_radar(cid, point_id)
score_info = await load_health_radar_new15(cid, point_id)
score_info = await load_health_radar(cid, point_id)
if score_info is None:
log.error("load_health_index fail")
......
......@@ -241,20 +241,7 @@ async def power_charge_p_point_aggs(date_start, date_end, pid_list, interval):
return es_re["aggregations"]["pids"]["buckets"]
async def point_aggs_kwh(point_list, start=None, end=None):
if start and end:
sql = f"SELECT sum(kwh) kwh, sum(charge) charge FROM " \
f"`point_15min_power` " \
f"where pid in %s and create_time BETWEEN {start} and {end}"
else:
sql = "SELECT sum(kwh) kwh,sum(charge) charge FROM point_15min_power" \
" where pid in %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(point_list,))
return data
async def point_aggs_kwh_new15(point_list, start=None, end=None):
async def point_kwh_charge(point_list, start=None, end=None):
"""1.5版本根据pid,求电量电费"""
if start and end:
sql = f"SELECT sum(kwh) kwh, sum(charge) charge FROM " \
......
......@@ -14,7 +14,7 @@ from unify_api.modules.electric_optimization.procedures.optimization_pds import
)
from unify_api.constants import ADD_ELE_PRICE
from unify_api.modules.elec_charge.dao.elec_charge_dao import (
point_aggs_kwh, point_aggs_kwh_new15
point_kwh_charge
)
......@@ -84,7 +84,7 @@ async def power_factor_service(inline_id):
save_charge = None
inline = await tc_by_inline_id(inline_id)
point_list = await pids_by_cid(inline.get("cid"))
charge_res = await point_aggs_kwh_new15(point_list)
charge_res = await point_kwh_charge(point_list)
# total_charge = charge_res["aggregations"]["charge"]["value"]
total_charge = charge_res.get("charge") or 0
if last_month_pf and 0.9 > last_month_pf >= 0:
......
......@@ -9,7 +9,7 @@ from unify_api.modules.common.dao.common_dao import monitor_by_cid, tsp_by_cid,
from unify_api.modules.common.procedures import health_score
from unify_api.modules.common.procedures.points import proxy_points, list_point
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, point_aggs_kwh
query_charge_aggs_points, point_kwh_charge
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
load_proxy_power
from unify_api.modules.home_page.components.count_info_proxy_cps import \
......@@ -183,8 +183,8 @@ async def info_yang_chen_service(cid):
# 获取全部雾炮监测点
storey_list = await storey_wp_by_cid(cid)
point_list = [storey["point_id"] for storey in storey_list]
kwh_res = await point_aggs_kwh(point_list)
total_kwh = kwh_res["aggregations"]["kwh"]["value"]
kwh_res = await point_kwh_charge(point_list)
total_kwh = kwh_res.get("kwh") or 0
return IycResp(
total_point=total_point,
air_quality=air_quality,
......@@ -352,26 +352,6 @@ async def alarm_safe_index_service(cid):
)
async def health_status_index_service(cid):
"""首页-健康指数等"""
# 1. 健康评分, 雷达
score_info = await health_score.load_health_radar(cid)
# 2. 健康指数
health_index = await health_score.load_health_index(cid)
# 3. 状态
health_status = health_status_res(health_index, "wechat")
return HsiResp(
health_index=round(health_index),
health_status=health_status,
v_dev=round(score_info["v_score"]),
freq_dev=round(score_info["freq_score"]),
ubl=round(score_info["ubl_score"]),
costtl=round(score_info["costtl_score"]),
thdu=round(score_info["thdu_score"]),
lf=round(score_info["lf_score"]),
)
async def economic_index_res(cid):
"""经济指数"""
count_info_map = await optimization_count_info(cid)
......
......@@ -31,8 +31,7 @@ from unify_api.modules.home_page.service.count_info_service import \
post_zd_info_factory_service, risk_cost_service, info_yang_chen_service, \
info_yang_chen_map_service, rank_type_ranking_service, \
condition_monitor_service, alarm_price_costtl_service, \
alarm_safe_index_service, health_status_index_service, \
all_index_info_service
alarm_safe_index_service, all_index_info_service
@summary("获取首页统计信息")
......@@ -316,7 +315,7 @@ async def post_health_status_index(req, body: CidReq) -> HsiResp:
# 1. 获取参数
cid = body.cid
# 2. 调service获取数据
return await health_status_index_service(cid)
# return await health_status_index_service(cid)
@summary("首页-全部指数")
......
......@@ -56,9 +56,7 @@ async def post_health_radar(req, body: QueryDetails) -> HealthRadarResp:
log.error("param error")
raise ParamException(message="参数错误, cid参数必须是一个正整数!")
# score_info = await health_score.load_health_radar(cid)
score_info = await health_score.load_health_radar_new15(cid)
score_info = await health_score.load_health_radar(cid)
return HealthRadarResp(
v_dev=score_info["v_score"],
freq_dev=score_info["freq_score"],
......@@ -73,7 +71,6 @@ async def post_health_radar(req, body: QueryDetails) -> HealthRadarResp:
@description("每分钟刷新一次")
async def post_health_ctl_rate(req, body: QueryDetails) -> HealthCtlRateRes:
cid = body.cid
# return await health_ctl_rate_service(cid)
return await health_ctl_rate_srv(cid)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment