Commit 4af81566 authored by ZZH's avatar ZZH

remove es 2023-6-2

parent cccfde1f
......@@ -61,15 +61,6 @@ async def points_monitor_by_cid(cids):
return points
async def mid_by_pid(pid):
sql = "select mtid from point where pid=%s " \
"order by create_time desc limit 1 "
async with MysqlUtil() as conn:
mid_dic = await conn.fetchone(sql, args=(pid,))
return mid_dic
async def get_point_monitor_dao(pid):
sql = "SELECT m.meter_no,m.mtid,m.sid," \
"p.ctr,p.ptr,p.ctnum,,,p.imax" \
......@@ -88,14 +79,6 @@ async def get_location_monitor_dao(lid):
return datas
async def meter_by_mid(mid):
sql = "select mid,sid,meter_no from meter where mid=%s"
async with MysqlUtil() as conn:
meter_dic = await conn.fetchone(sql, args=(mid,))
return meter_dic
async def meter_by_mids(mids):
sql = "select * from meter where mid in %s"
......@@ -135,15 +118,6 @@ async def meter_param_by_mid(mtid):
return meter_param_dic
async def change_sensor_by_location(location_id):
sql = "select location_id,sid,field from change_sensor_record " \
"where location_id=%s order by start_time desc limit 1"
async with MysqlUtil() as conn:
sensor_dic = await conn.fetchone(sql, args=(location_id,))
return sensor_dic
async def tsp_by_cid(cid):
sql = "SELECT tsp_id, name FROM tsp WHERE cid = %s " \
......@@ -12,7 +12,7 @@ from unify_api.modules.common.procedures.points import load_compy_points
from unify_api.modules.electric.procedures.electric_util import \
from unify_api.modules.home_page.procedures import point_inlines
from unify_api.constants import FREQ_STANDARD, POINT_15MIN_INDEX
from unify_api.constants import FREQ_STANDARD
from unify_api.modules.home_page.procedures.dev_grade import get_dev_score
......@@ -29,12 +29,12 @@ async def load_health_radar(cid, param_point_id=None):
# if json_score:
# score_info = json.loads(json_score)
# return score_info
# 计算最近7天时间起始
today =
start_time = str(today.subtract(days=7))
end_time = str(today.subtract(seconds=1))
inline_point_ids = []
point_ids = []
# 1. 获取该工厂所有进线数据
......@@ -47,7 +47,7 @@ async def load_health_radar(cid, param_point_id=None):
# 对如下性能差代码做修改
stats = {point_id: {} for point_id in inline_point_ids + point_ids}
point_info_map = await batch_get_wiring_type(inline_point_ids + point_ids)
......@@ -64,7 +64,7 @@ async def load_health_radar(cid, param_point_id=None):
stats_items = ["uab_mean", "freq_mean", "ubl_mean", "costtl_mean",
for item in stats_items:
point_v = es_dic.get(point_id)
if not point_v:
......@@ -122,7 +122,7 @@ async def load_health_radar(cid, param_point_id=None):
i["pid"]: i["mtid"] for i in change_meter_records if
i["mtid"] is not None
# 获取meter_param_record中的标准电压
all_mids = list(point_mid_map.values())
meter_param_map = {}
......@@ -132,7 +132,7 @@ async def load_health_radar(cid, param_point_id=None):
meter_param_records = await conn.fetchall(sql,
meter_param_map = {i["mtid"]: i for i in meter_param_records}"all_mids={all_mids}")
# 电压偏差评分
total, total_score = 0, 0
......@@ -140,7 +140,7 @@ async def load_health_radar(cid, param_point_id=None):
ua_mean = stats.get(point_id, {}).get("ua_mean")
if ua_mean is None:
mtid = point_mid_map.get(point_id)
if not mtid:
# pid没有mid,拆了
......@@ -155,47 +155,47 @@ async def load_health_radar(cid, param_point_id=None):
stand_voltage = meter_vc / sqrt(3) if ctnum == 3 else meter_vc
stand_voltage = 400 if ctnum == 3 else 10000
v_dev = (ua_mean - stand_voltage) / stand_voltage
score = get_dev_score(dev_type="v", cur=v_dev)
if score is None:
total_score += score
total += 1
v_score = total_score / total if total else 100
# 频率偏差评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
freq_mean = stats.get(point_id, {}).get("freq_mean")
if freq_mean is None:
freq_dev = freq_mean - FREQ_STANDARD
score = get_dev_score(dev_type="freq", cur=freq_dev)
if score is None:
total_score += score
total += 1
freq_score = total_score / total if total else 100
# 三相[电压]不平衡评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
ubl_avg = stats.get(point_id, {}).get("ubl_mean")
if ubl_avg is None:
score = get_dev_score(dev_type="ubl", cur=ubl_avg)
if score is None:
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# 功率因数:有进线级功率因数时,只计算进线级功率因数
total, total_score = 0, 0
if inline_point_ids:
......@@ -206,15 +206,15 @@ async def load_health_radar(cid, param_point_id=None):
costtl_mean = stats.get(point_id, {}).get("costtl_mean")
if costtl_mean is None:
score = get_dev_score(dev_type="costtl", cur=costtl_mean)
if score is None:
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# (电压)谐波畸变率
# 电压谐波畸变:只计算三表法计量点,如果所有监测点都是二表法,则取其他所有指标均值
total, total_score = 0, 0
......@@ -222,15 +222,15 @@ async def load_health_radar(cid, param_point_id=None):
thdua_mean = stats.get(point_id, {}).get("thdua_mean")
if thdua_mean is None:
score = get_dev_score(dev_type="thdu", cur=thdua_mean)
if score is None:
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# 负载率
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -241,7 +241,7 @@ async def load_health_radar(cid, param_point_id=None):
score = get_dev_score(dev_type="lf", cur=lf_mean)
if score is None:
total_score += score
total += 1
lf_score = total_score / total if total else 100
......@@ -257,7 +257,7 @@ async def load_health_radar(cid, param_point_id=None):
if not thdu_score:
thdu_score = (
v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
# 存入redis
score_info = {
"v_score": v_score,
......@@ -267,13 +267,13 @@ async def load_health_radar(cid, param_point_id=None):
"thdu_score": thdu_score,
"lf_score": lf_score,
# now_ts =
# tomorrow_ts = pendulum.tomorrow().int_timestamp
# exp_ts = tomorrow_ts - now_ts
# await RedisClient().setex(redis_key, exp_ts, json.dumps(score_info))
return score_info
......@@ -335,7 +335,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
freq_score = total_score / total if total else 100
# 三相[电压]不平衡评分
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -348,7 +348,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# 功率因数:有进线级功率因数时,只计算进线级功率因数
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -361,7 +361,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# (电压)谐波畸变率
# 电压谐波畸变:只计算三表法计量点,如果所有监测点都是二表法,则取其他所有指标均值
total, total_score = 0, 0
......@@ -375,7 +375,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# 负载率
total, total_score = 0, 0
for index, data in points_datas.items():
......@@ -389,7 +389,7 @@ async def load_health_radar_new15(cid, param_point_id=None):
total_score += score
total += 1
lf_score = total_score / total if total else 100
if not thdu_score:
thdu_score = (v_score + freq_score + ubl_score + costtl_score +
lf_score) / 5.0
......@@ -408,18 +408,18 @@ async def load_health_index(cid, point_id=None):
# score_info = await load_health_radar(cid, point_id)
score_info = await load_health_radar_new15(cid, point_id)
if score_info is None:
log.error("load_health_index fail")
return 0
v_score = score_info["v_score"]
freq_score = score_info["freq_score"]
if v_score <= 60 or freq_score <= 60:
# 电压偏差/频率偏差评分,有一个低于60分,则整体健康指数为0"v_score or freq_score less 60")
return 0
sub_dev = (1 - (v_score + freq_score) / 200.0) * 20
sub_lf = (1 - score_info["lf_score"] / 100.0) * 20
sub_costtl = (1 - score_info["costtl_score"] / 100.0) * 20
......@@ -441,7 +441,7 @@ async def load_manage_health_radar(cids, recent_days=30):
# if json_score:
# score_info = json.loads(json_score)
# return score_info
# 计算最近30天时间起始
today =
start_time = today.subtract(days=recent_days).format("YYYY-MM-DD HH:mm:ss")
......@@ -464,7 +464,7 @@ async def load_manage_health_radar(cids, recent_days=30):
datas = await conn.fetchall(sql,
args=(all_point_ids, start_time, end_time))
data_map = {i['pid']: i for i in datas}
# 单独计算每个公司的健康指数
company_score_map = {}
for cid in cids:
......@@ -487,7 +487,7 @@ async def load_manage_health_radar(cids, recent_days=30):
# 1. 电压偏差评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -508,11 +508,11 @@ async def load_manage_health_radar(cids, recent_days=30):
score = get_dev_score(dev_type="v", cur=v_dev)
if score is None:
total_score += score
total += 1
v_score = total_score / total if total else 100
# 2. 频率偏差评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -522,7 +522,7 @@ async def load_manage_health_radar(cids, recent_days=30):
freq_mean = data_point_map.get("freq_mean")
if freq_mean is None:
freq_dev = freq_mean - FREQ_STANDARD
score = get_dev_score(dev_type="freq", cur=freq_dev)
if score is None:
......@@ -530,7 +530,7 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
freq_score = total_score / total if total else 100
# 3. 三相[电压]不平衡评分
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -546,7 +546,7 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
ubl_score = total_score / total if total else 100
# 4. 功率因数:有进线级功率因数时,只计算进线级功率因数
total, total_score = 0, 0
if inline_point_ids:
......@@ -566,7 +566,7 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
costtl_score = total_score / total if total else 100
# 4.(电压)谐波畸变率
# 电压谐波畸变:只计算三表法计量点,如果所有监测点都是二表法,则取其他所有指标均值
total, total_score = 0, 0
......@@ -583,7 +583,7 @@ async def load_manage_health_radar(cids, recent_days=30):
total_score += score
total += 1
thdu_score = total_score / total if total else 100
# 5. 负载率
total, total_score = 0, 0
for point_id in inline_point_ids + point_ids:
......@@ -614,7 +614,7 @@ async def load_manage_health_radar(cids, recent_days=30):
if not thdu_score:
thdu_score = (
v_score + freq_score + ubl_score + costtl_score + lf_score) / 5.0
company_score_map[cid] = {
"v_score": v_score,
"freq_score": freq_score,
......@@ -639,7 +639,7 @@ async def load_manage_health_index(company_score_info):
log.error(f"cid = {cid}load_health_index fail")
company_index_map[cid] = 0
v_score = score_info["v_score"]
freq_score = score_info["freq_score"]
if v_score <= 60 or freq_score <= 60:
......@@ -647,7 +647,7 @@ async def load_manage_health_index(company_score_info):"cid = {cid} v_score or freq_score less 60")
company_index_map[cid] = 0
sub_dev = (1 - (v_score + freq_score) / 200.0) * 20
sub_lf = (1 - score_info["lf_score"] / 100.0) * 20
sub_costtl = (1 - score_info["costtl_score"] / 100.0) * 20
import pendulum
from pot_libs.logger import log
from unify_api.modules.common.dao.common_dao import mid_by_pid, meter_by_mid, \
meter_param_by_mid, change_sensor_by_location, get_point_monitor_dao, \
from unify_api.modules.common.dao.common_dao import (
get_point_monitor_dao, get_location_monitor_dao
from uuid import uuid4
async def get_point_soe_config(event_data, with_params=False):
with_params: 用于同步配置参数信息,保证adio事件可以得到正确的数值
point_id = event_data.get('point_id')
event_settings = {k: v for k, v in event_data.items() if k in [
'threshold', 'duration', 'enable'
# 根据pid获取mid
mid_dic = await mid_by_pid(point_id)
mid = mid_dic.get("mid")
if mid is None:'=======no mid of point id: {point_id}')
return None, None
meter_dic = await meter_by_mid(mid)
sid = meter_dic.get("sid")
meter_sn = meter_dic.get("meter_no")
config = {
'soe': {
"electric": {
meter_sn: {event_data['event_type']: event_settings}}}
if with_params:
params = await meter_param_by_mid(mid)
params = {k: v for k, v in params.items() if v is not None}
params['point_id'] = point_id
config['params'] = {'electric': {meter_sn: params}}
return sid, config
async def get_point_soe_config_new15(event_data, with_params=False):
point_id = event_data.get('point_id')
event_settings = {k: v for k, v in event_data.items() if k in [
'threshold', 'duration', 'enable'
......@@ -50,7 +16,9 @@ async def get_point_soe_config_new15(event_data, with_params=False):
config = {
'soe': {
"electric": {
meter_sn: {event_data['event_type']: event_settings}}}
meter_sn: {event_data['event_type']: event_settings}
if with_params:
keys_li = ["ctr", "ptr", "ctnum", "vc", "tc", "imax"]
......@@ -62,29 +30,6 @@ async def get_point_soe_config_new15(event_data, with_params=False):
async def get_location_soe_config(event, with_params):
location_id = event.get('location_id')
event_settings = {k: v for k, v in event.items() if k in [
'threshold', 'duration', 'enable'
sensor_dic = await change_sensor_by_location(location_id)
sid = sensor_dic.get("sid")
field = sensor_dic.get("field")
except:'=======no sid of location: {location_id}')
return None, None
if field == "residual_current":
config = {'soe': {"adio": {event['event_type']: event_settings}}}
config = {
'soe': {"adio": {field: {event['event_type']: event_settings}}}}
if with_params:
config['params'] = {'adio': {field: {'location_id': location_id}}}
return sid, config
async def get_location_soe_config_new15(event, with_params):
location_id = event.get('location_id')
event_settings = {k: v for k, v in event.items() if k in [
'threshold', 'duration', 'enable'
......@@ -110,24 +55,19 @@ async def change_param_to_config(req_json, method):
sid, config = None, None
point_id = req_json.get("point_id")
if point_id is not None:
# sid, config = await get_point_soe_config(req_json, with_params=True)
sid, config = await get_point_soe_config_new15(req_json,
sid, config = await get_point_soe_config(req_json, with_params=True)
location_id = req_json.get('location_id')
if location_id is not None:
# sid, config = await get_location_soe_config(req_json,
# with_params=True)
sid, config = await get_location_soe_config_new15(req_json,
sid, config = await get_location_soe_config(req_json, with_params=True)
request_data = {}
if sid:
request_id = str(uuid4())
request_data = dict(
sid=sid, data=config)
request_data = dict(request_id=request_id,,
sid=sid, data=config)
return request_data
......@@ -142,12 +82,11 @@ async def switch_control_param_to_config(sid, field, switch,
if switch == "00":
switch = "10"
request_id = str(uuid4())
request_data = dict(
data={'command': command,
'params': {'field': field, 'value': switch}}
request_data = dict(request_id=request_id,,
data={'command': command,
'params': {'field': field, 'value': switch}}
return request_data
import pendulum
from pot_libs.logger import log
from unify_api.modules.common.dao.common_dao import mid_by_pid, meter_by_mid, \
meter_param_by_mid, change_sensor_by_location
from unify_api.modules.common.dao.common_dao import (
get_point_monitor_dao, get_location_monitor_dao
from uuid import uuid4
async def get_point_soe_config(event_data, with_params=False):
with_params: 用于同步配置参数信息,保证adio事件可以得到正确的数值
point_id = event_data.get('point_id')
event_settings = {k: v for k, v in event_data.items() if k in [
'threshold', 'duration', 'enable'
# 根据pid获取mid
mid_dic = await mid_by_pid(point_id)
mid = mid_dic.get("mid")
if mid is None:'=======no mid of point id: {point_id}')
return None, None
meter_dic = await meter_by_mid(mid)
sid = meter_dic.get("sid")
meter_sn = meter_dic.get("meter_no")
datas = await get_point_monitor_dao(point_id)
sid = datas.get("sid")
meter_sn = datas.get("meter_no")
config = {
'soe': {
"electric": {
meter_sn: {event_data['event_type']: event_settings}}}
"electric": {meter_sn: {event_data['event_type']: event_settings}}
if with_params:
params = await meter_param_by_mid(mid)
params = {k: v for k, v in params.items() if v is not None}
keys_li = ["ctr", "ptr", "ctnum", "vc", "tc", "imax"]
params = {k: v for k, v in datas.items() if k in keys_li
and (v is not None)}
params['point_id'] = point_id
config['params'] = {'electric': {meter_sn: params}}
return sid, config
......@@ -43,14 +32,9 @@ async def get_location_soe_config(event, with_params):
event_settings = {k: v for k, v in event.items() if k in [
'threshold', 'duration', 'enable'
sensor_dic = await change_sensor_by_location(location_id)
sid = sensor_dic.get("sid")
field = sensor_dic.get("field")
except:'=======no sid of location: {location_id}')
return None, None
datas = await get_location_monitor_dao(location_id)
sid = datas.get("sid")
field = datas.get("ad_field")
if field == "residual_current":
config = {'soe': {"adio": {event['event_type']: event_settings}}}
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import COMPANY_15MIN_POWER, SLOTS, SLOTS_15MIN
from unify_api.constants import SLOTS, SLOTS_15MIN
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
from unify_api.utils.common_utils import round_2
from unify_api.utils.es_query_body import EsQueryBody, es_process, \
from unify_api.utils.es_query_body import es_process, sql_time_process
from unify_api.utils.time_format import time_pick_transf
async def proxy_today_yesterday_p(cid_list, start, end):
terms = {"cid": cid_list}
# 1.构造query_body
query = EsQueryBody(terms=terms, start=start, end=end,
date_key="quarter_time", size=0)
query_body = query.query()
query_body["aggs"] = {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": "15m",
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
"aggs": {
"p": {
"stats": {
"field": "p"
} + f"====={query_body}")
# 2.获取slots
intervel, slots = time_pick_transf(start, end)
# 3. 查询es
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=index)
es_re = es_re["aggregations"]["quarter_time"]["buckets"]
if not es_re:
return SlotValue(slots=slots, value=[])
# 4.为了es结果和slots对应
es_re = es_process(es_re, fmat="HH:mm")
sv = SlotValue() # 今日负荷对象
sv.slots = slots
tmp_list = []
# 5.拼接返回
for slot in slots:
if slot in es_re:
# 1.每个时间点,电量信息
value = es_re[slot].get("p").get("sum")
# 值为0是正常数据
if value == 0:
tmp_list.append(value or "")
sv.value = tmp_list
return sv
async def proxy_today_yesterday_p_new15(cid_list, start, end):
async def proxy_today_yesterday_load(cid_list, start, end):
sql = f"""
select create_time,sum(p) as p from company_15min_power
......@@ -145,14 +87,14 @@ async def power_charge_p_proxy(cid_list, start, end, date_type, interval):
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_96, es_re_96_dic)
# 96个点,工厂电量电费
# 2. 24个点数据
slots_24 = SLOTS[date_type]
es_re_24 = await power_charge_p_aggs(start, end, cid_list, "hour")
es_re_24_dic = es_process(es_re_24, fmat="HH:mm",
time_key="create_time") # 为了es结果和slots对应
kwh_24, charge_24, p_24, price_24 = by_slots(slots_24, es_re_24_dic)
# elif date_type == "month":
# intervel, slots = time_pick_transf(start, end)
# es_re = es_process(es_re, fmat="MM-DD") # 为了es结果和slots对应
......@@ -20,7 +20,7 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
power_charge_p_aggs, power_charge_p_cid_aggs, histogram_aggs_points, \
from unify_api.modules.elec_charge.procedures.elec_statis_proxy_pds import \
proxy_today_yesterday_p_new15, by_slots
proxy_today_yesterday_load, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, \
from unify_api.utils.common_utils import process_es_data
......@@ -56,9 +56,9 @@ async def post_power_statis_proxy(req, body: StatisProxyReq) -> PcStatiResp:
end, date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
today_p = await proxy_today_yesterday_load(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p_new15(cid_list, ysd_start,
yesterday_p = await proxy_today_yesterday_load(cid_list, ysd_start,
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
......@@ -97,7 +97,7 @@ async def post_power_statis_proxy(req, body: StatisProxyReq) -> PcStatiResp:
kwh_sv, charge_sv = await load_power_charge(cid_list, point_id,
start, end, date_type)
# 负荷曲线
this_p = await proxy_today_yesterday_p_new15(cid_list, start,
this_p = await proxy_today_yesterday_load(cid_list, start,
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
......@@ -365,9 +365,9 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
today_p = await proxy_today_yesterday_load(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
yesterday_p = await proxy_today_yesterday_load(cid_list,
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
......@@ -37,61 +37,7 @@ from unify_api.utils.taos_new import parse_td_columns, get_td_table_name, \
td3_tbl_compate, get_td_engine_data
async def other_info(company_id):
alarm_sql = f"""
DATE(pevent.event_datetime) event_date,
COUNT(*) event_count
point_1min_event pevent
cid = %s
now_time =
# 获取到工厂安装时间create_time
async with MysqlUtil() as conn:
company_sql = "select create_time from company where cid = %s"
alarm_data = await conn.fetchall(alarm_sql, (company_id,))
company = await conn.fetchone(company_sql, (company_id,))
create_time_timestamp = company["create_time"]
create_time = datetime.fromtimestamp(create_time_timestamp)
today_alarm_count = 0
alarm_count = 0
if not alarm_data:
"No alarm data %s" % (company_id)
# 1. 增加逻辑,新增工厂如果还没有事件产生
# 系统安全运行天数: 当前时间 - 工厂安装时间 + 1
safe_run_days = (now_time - create_time).days + 1
return today_alarm_count, safe_run_days, alarm_count
# 5. 构造返回
# 如果每天都有报警, 防止安全运行天数-1天, 所以total_days +2
total_days = (now_time - create_time).days + 2
has_alarm_days = 0
for data in alarm_data:
if data["event_date"].strftime("%Y-%m-%d") == str(now_time)[:10]:
today_alarm_count += data["event_count"]
if data["event_count"] != 0:
# 没有报警,看做是安全运行了,统计累计安全运行的天数
has_alarm_days += 1
alarm_count += data["event_count"]
safe_run_days = total_days - has_alarm_days
f"today_alarm_count={today_alarm_count} safe_run_days={safe_run_days}")
return today_alarm_count, safe_run_days, alarm_count
async def other_info_new15(cid):
async def other_info(cid):
today_alarm_count, alarm_count, has_alarm_days = 0, 0, 0
sql = "SELECT DATE_FORMAT(event_datetime,'%%Y-%%m-%%d') event_date, " \
"count(id) doc_count FROM `point_1min_event` where cid=%s " \
import json
import time
import re
import pendulum
from unify_api.modules.electric.dao.electric_dao import \
from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.aredis_util.aredis_utils import RedisUtils
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from unify_api.modules.home_page.procedures import point_inlines
from unify_api.modules.home_page.procedures.dev_grade import get_dev_grade
from unify_api.utils import time_format
from unify_api.modules.electric.views.electric import (
from unify_api.constants import VOLTAGE_STANDARD, REAL_EXP_TIME, \
from pot_libs.utils.exc_util import ParamException, DBException, \
from unify_api.constants import REAL_EXP_TIME
from pot_libs.utils.exc_util import ParamException, BusinessException
from unify_api.modules.home_page.components.health_index import \
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import \
from unify_api.modules.common.service.td_engine_service import \
......@@ -33,155 +20,7 @@ from unify_api.utils.taos_new import parse_td_columns, td3_tbl_compate, \
async def health_ctl_rate_service(cid):
if cid <= 0:
log.error("param error")
raise ParamException(message="参数错误, cid参数必须是一个正整数!")
# 获取工厂所有监测点数据
point_infos = await point_inlines.get_all_points(cid)
if not point_infos:
log.error("cid:%s no point data" % cid)
raise BusinessException(message="工厂没有任何监测点!")
point_ids = point_infos.keys()
stats = {"lf": 0, "costtl": 0, "freq_dev": 0, "thdu": 0, "v_dev": 0,
"ubl": 0}
now_ts = int(time.time())
real_tt = now_ts
valid_mids = []
total = 0
for point_id in point_ids:
ctnum, mid = await get_wiring_type(point_id)
if not mid:
# 换表: point_id 有效, mid(拆掉)为None的情况
log.warn(f"cid={ctnum} point_id={point_id} 已经换表了")
res = None
result = await RedisUtils().hget(METERDATA_CURRENT_KEY, mid)
result2 = await RedisUtils().hget(METERDATA_CURRENT_HR_KEY, mid)
if result and result2:
res = json.loads(result)
res2 = json.loads(result2)
real1_tt = res.get("timestamp")
real_tt = res2.get("timestamp")
if (
and real_tt
and now_ts - real1_tt <= REAL_EXP_TIME
and now_ts - real_tt <= REAL_EXP_TIME
for k in res2.keys():
if not k in res.keys():
res[k] = res2[k]
"1 realtime_power_qual of mid %s is expired." % mid)
elif result2:
res = json.loads(result2)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= REAL_EXP_TIME:
res = res
"2 realtime_power_qual of mid %s is expired." % mid)
elif result:
res = json.loads(result)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= REAL_EXP_TIME:
res = res
"3 realtime_power_qual of mid %s is expired." % mid)
log.error("realtime_power_quality not exist")
except Exception as e:
raise DBException
total += 1
if res:
if ctnum != res.get("ctnum"):"res = {res}")
f"health_ctl_rate mysql ctnum={ctnum} payload ctnum={res.get('ctnum')} point_id={point_id} ctnum wrong!"
# 如果发现配置错误的点,以报文的配置为准
ctnum = res.get("ctnum")
# 电压偏差
v_dev = res.get("ua_dev") if ctnum == 3 else res.get("uab_dev")
grade = get_dev_grade(dev_type="v", cur=v_dev)
if grade and grade >= 60:
stats["v_dev"] += 1
# 频率偏差
freq_dev = res.get("freq_dev")
grade = get_dev_grade(dev_type="freq", cur=freq_dev)
if grade and grade >= 60:
stats["freq_dev"] += 1
# 三相电压不平衡度
ubl = res.get("ubl")
grade = get_dev_grade(dev_type="ubl", cur=ubl)
if grade and grade >= 60:
stats["ubl"] += 1
# 功率因数
costtl = res.get("costtl")
grade = get_dev_grade(dev_type="costtl", cur=costtl)
if grade and grade >= 60:
stats["costtl"] += 1
# (电压)谐波畸变率
thdu = res.get("thdua") if ctnum == 3 else res.get("thduab")
grade = get_dev_grade(dev_type="thdu", cur=thdu)
if grade and grade >= 60:
stats["thdu"] += 1
# 负载率
lf = res.get("lf")
if lf is None:
stats["lf"] += 1
grade = get_dev_grade(dev_type="lf", cur=lf)
if grade and grade >= 60:
stats["lf"] += 1
# 没有数据指标可以知道这个是否合格的时候
stats["v_dev"] += 1
stats["freq_dev"] += 1
stats["ubl"] += 1
stats["costtl"] += 1
stats["thdu"] += 1
stats["lf"] += 1
total = 1.0 * total
time_str = time_format.get_datetime_str(real_tt)
if not valid_mids or total == 0:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
return HealthCtlRateRes(
lf=stats["lf"] / total,
costtl=stats["costtl"] / total,
thdu=stats["thdu"] / total,
v_dev=stats["v_dev"] / total,
freq_dev=stats["freq_dev"] / total,
ubl=stats["ubl"] / total,
async def health_ctl_rate_service_new15(cid):
async def health_ctl_rate_srv(cid):
if cid <= 0:
log.error("param error")
raise ParamException(message="参数错误, cid参数必须是一个正整数!")
......@@ -210,10 +49,8 @@ async def health_ctl_rate_service_new15(cid):
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
return HealthCtlRateRes(real_time=time_str, lf=1, costtl=1, thdu=1,
v_dev=1, freq_dev=1, ubl=1)
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
......@@ -225,11 +62,8 @@ async def health_ctl_rate_service_new15(cid):
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1,
return HealthCtlRateRes(real_time=time_str, lf=1, costtl=1, thdu=1,
v_dev=1, freq_dev=1, ubl=1)
head = parse_td_columns(results)
datas = []
......@@ -275,10 +109,8 @@ async def health_ctl_rate_service_new15(cid):
if grade and grade >= 60:
stats["lf"] += 1
if total == 0:
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
return HealthCtlRateRes(real_time=time_str, lf=1, costtl=1, thdu=1,
v_dev=1, freq_dev=1, ubl=1)
return HealthCtlRateRes(
lf=round_2(stats["lf"] / total),
......@@ -20,7 +20,7 @@ from unify_api.modules.home_page.components.count_info_proxy_cps import \
HsiResp, AiiResp
from unify_api.modules.home_page.procedures.count_info_pds import (
get_max_aiao_of_filed, normal_rate_of_location,
other_info, other_info_new15,
......@@ -62,7 +62,7 @@ async def post_count_info(request, body: CountInfoReq) -> CountInfoResp:
# 今日报警数和累计安全运行天数
# today_alarm_count, safe_run_days, alarm_count = await other_info(
# company_id)
today_alarm_count, safe_run_days, alarm_count = await other_info_new15(
today_alarm_count, safe_run_days, alarm_count = await other_info(
# 实时负荷和近30日最高负荷
......@@ -3,13 +3,13 @@
from pot_libs.logger import log
from pot_libs.sanic_api import summary, description
from pot_libs.utils.exc_util import ParamException, DBException, BusinessException
from pot_libs.utils.exc_util import ParamException
from unify_api.modules.common.procedures import health_score
from unify_api.modules.common.procedures.cids import get_cids, get_proxy_cids
from unify_api.modules.common.procedures.health_score import load_manage_health_radar
from unify_api.modules.common.procedures.cids import get_proxy_cids
from unify_api.modules.common.procedures.health_score import \
from unify_api.modules.home_page.service.health_index_service import \
health_ctl_rate_service, health_ctl_rate_service_new15
from unify_api.modules.home_page.components.health_index import (
......@@ -74,12 +74,13 @@ async def post_health_radar(req, body: QueryDetails) -> HealthRadarResp:
async def post_health_ctl_rate(req, body: QueryDetails) -> HealthCtlRateRes:
cid = body.cid
# return await health_ctl_rate_service(cid)
return await health_ctl_rate_service_new15(cid)
return await health_ctl_rate_srv(cid)
async def post_manage_health_index(req, body: ManageHealthIndexReq) -> ManageHealthIndexResp:
async def post_manage_health_index(req,
body: ManageHealthIndexReq) -> ManageHealthIndexResp:
product = body.product
user_id = req.ctx.user_id
......@@ -90,7 +91,8 @@ async def post_manage_health_index(req, body: ManageHealthIndexReq) -> ManageHea
if not cids:
return ManageHealthIndexResp(**cid_map)
company_score_map = await health_score.load_manage_health_radar(cids)
company_index_map = await health_score.load_manage_health_index(company_score_map)
company_index_map = await health_score.load_manage_health_index(
for cid, health_index in company_index_map.items():
health_index = round(health_index)
......@@ -3,7 +3,7 @@ from datetime import datetime
from unify_api.modules.common.dao.common_dao import monitor_by_cid, \
company_by_cids, monitor_point_storey_join, company_by_cid, \
detection_point_by_cid, monitor_page_by_cid, start_time_by_cids, \
tcs_runtime_by_cids, meter_by_mids, item_by_mitd_dao
tcs_runtime_by_cids, item_by_mitd_dao
from unify_api.modules.common.procedures.points import point_to_mid
from unify_api.modules.product_info.components.hardware_cps import HisResp, \
HlsResp, HardwareInfoManResq, HardwareInfoListResq
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment