Commit 444ec2cc authored by wang.wenrong's avatar wang.wenrong

Merge branch 'wwr' into 'develop'

安识U-精细监测

See merge request !34
parents 4a5aa641 7cbc839a
......@@ -2,27 +2,175 @@ from pot_libs.settings import SETTING
from unify_api.utils.common_utils import make_tdengine_data_as_list
from unify_api.utils.taos_new import get_td_table_name, get_td_engine_data
from unify_api.utils.exc_util import BusinessException
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_aiao_1min_data(monitor_info, start_time, end_time, su_table):
async def get_aiao_1min_dao(mtid, start_time, end_time):
# 查1min温度漏电流
sid_data = await get_sid_by_mtid_dao(mtid)
sid = sid_data['sid'].lower()
su_table = "new_adio_stb"
td_mt_table = get_td_table_name(su_table, mtid)
url = f"{SETTING.stb_url}db_adio"
# td的精度过高,采用 >= start and < end的形式查询
sql = f"select last_row( ts, temp1, temp2, temp3, temp4, residual_current) " \
f"from adio_stb where TBNAME = '{td_mt_table}' and " \
f"ts >= '{start_time}' AND ts <'{end_time}' " \
f"Interval(60s) order by ts asc"
is_succ, results = await get_td_engine_data(url, sql)
if not results:
su_table = "old_adio_stb"
td_mt_table = get_td_table_name(su_table, sid)
sql = f"select last_row( ts, temp1, temp2, temp3, temp4, residual_current) " \
f"from adio_stb where TBNAME = '{td_mt_table}' and " \
f"ts >= '{start_time}' AND ts <'{end_time}' " \
f"Interval(60s) order by ts asc"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
raise BusinessException()
td_datas = make_tdengine_data_as_list(results)
if not td_datas:
return ""
return td_datas
async def get_sid_by_mtid_dao(mtid):
# 查15min温度漏电流
sql = f"""
SELECT
sid
FROM
monitor
WHERE
mtid = {mtid}
"""
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, )
return data
async def get_aiao_15min_dao(mtid, start_time, end_time):
# 查15min温度漏电流
sql = f"""
SELECT
DATE_FORMAT(create_time,'%H:%i') create_time,
value_avg,
ad_field
FROM
location_15min_aiao
WHERE
create_time > "{start_time}"
AND create_time < "{end_time}"
AND mtid = {mtid}
"""
:param monitor_info: pass
:param start_time: 开始时间
:param end_time: 结束时间
:param su_table: 超级表名
:return select_val对应在数据库中对应的值
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
return data
async def get_aiao_1day_dao(mtid, start_time, end_time):
sql = f"""
SELECT
DATE_FORMAT(create_time,'%m-%d') create_time,
value_avg,
ad_field
FROM
location_1day_aiao
WHERE
create_time > "{start_time}"
AND create_time < "{end_time}"
AND mtid = {mtid}
"""
mtid = monitor_info["mtid"]
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
return data
async def get_point_1min_chart_dao(mtid, ctnum, start_time, end_time):
if ctnum == 2:
stats_items = [
"pttl",
"qttl",
"uab",
"ucb",
"ia",
"ic",
]
else:
stats_items = [
"pttl",
"qttl",
"ua",
"ub",
"uc",
"ia",
"ib",
"ic",
]
# 查1min温度漏电流
sid_data = await get_sid_by_mtid_dao(mtid)
sid = sid_data['sid'].lower()
su_table = "new_electric_stb"
td_mt_table = get_td_table_name(su_table, mtid)
url = f"{SETTING.stb_url}db_adio"
url = f"{SETTING.stb_url}db_electric"
# td的精度过高,采用 >= start and < end的形式查询
sql = f" select temp1, temp2, temp3, temp4 from {su_table} " \
f" where TBNAME = '{td_mt_table}' " \
f" and ts >= '{start_time}' AND ts <'{end_time}' "
stats_items.insert(0, 'ts')
sql = f"select last_row({','.join(stats_items)}) " \
f"from electric_stb where TBNAME = '{td_mt_table}' and " \
f"ts >= '{start_time}' AND ts <'{end_time}' " \
f"Interval(60s) order by ts asc"
is_succ, results = await get_td_engine_data(url, sql)
if not results:
su_table = "old_electric_stb"
td_mt_table = get_td_table_name(su_table, sid)
sql = f"select last_row( {','.join(stats_items)}) " \
f"from electric_stb where TBNAME = '{td_mt_table}' and " \
f"ts >= '{start_time}' AND ts <'{end_time}' " \
f"Interval(60s) order by ts asc"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
raise BusinessException()
td_datas = make_tdengine_data_as_list(results)
if not td_datas:
return ""
return td_datas[0]
return td_datas
async def get_point_15min_chart_dao(mtid, stats_items, date_start, date_end):
# 查15min温度漏电流
sql = f"""
SELECT
DATE_FORMAT(create_time,'%H:%i') create_time,
{','.join(stats_items)}
FROM
point_15min_electric
WHERE
create_time > "{date_start}"
AND create_time < "{date_end}"
AND mtid = {mtid}
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
return data
async def get_point_1day_chart_dao(mtid, stats_items, date_start, date_end):
sql = f"""
SELECT
DATE_FORMAT(create_time,'%m-%d') create_time,
{','.join(stats_items)}
FROM
point_1day_electric
WHERE
create_time > "{date_start}"
AND create_time < "{date_end}"
AND mtid = {mtid}
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
return data
......@@ -2,7 +2,6 @@ from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.es_util.es_utils import EsUtil
from unify_api import constants
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_data
async def get_location_by_ids(location_ids):
......@@ -16,6 +15,30 @@ async def get_location_by_ids(location_ids):
return location_info
async def get_mtid_by_location_ids(location_ids):
"""
根据location——id获取mtid
"""
async with MysqlUtil() as conn:
sql = f"""
SELECT
m.mtid,
m.`name`,
m.m_type,
l.lid,
l.item,
l.ad_type
FROM
monitor m
INNER JOIN location l ON m.mtid = l.mtid
WHERE
l.lid in {tuple(location_ids)}
AND m.demolished = 0
"""
result = await conn.fetchall(sql, )
return result
async def get_threshold_by_location(location_ids, type='overResidualCurrent',
default_threshold=30):
'''
......@@ -64,3 +87,243 @@ async def get_es_point_1min_data(query_body, start):
index=p_database)
return es_results.get("hits", {}).get("hits", {})
async def get_aiao_1min_pds(slots, temp_res_data, res_curr_th):
temp, res, s_value, a_value, b_value, c_value, n_value = [], [], {
"item": "漏电流", "threhold": res_curr_th}, {"item": "A相"}, {
"item": "B相"}, {
"item": "C相"}, {
"item": "N相"}
temp1, temp2, temp3, temp4, res_curr = {}, {}, {}, {}, {}
[temp1.update({i.get("ts"): i.get("temp1")}) for i in temp_res_data]
[temp2.update({i.get("ts"): i.get("temp2")}) for i in temp_res_data]
[temp3.update({i.get("ts"): i.get("temp3")}) for i in temp_res_data]
[temp4.update({i.get("ts"): i.get("temp4")}) for i in temp_res_data]
[res_curr.update({i.get("ts"): i.get("residual_current")}) for i in
temp_res_data]
a_slot, b_slot, c_slot, n_slot, res_slot = [], [], [], [], []
[a_slot.append(temp1.get(i, "")) for i in slots]
[b_slot.append(temp2.get(i, "")) for i in slots]
[c_slot.append(temp3.get(i, "")) for i in slots]
[n_slot.append(temp4.get(i, "")) for i in slots]
[res_slot.append(res_curr.get(i, "")) for i in slots]
a_value.update({"value_slots": a_slot})
b_value.update({"value_slots": b_slot})
c_value.update({"value_slots": c_slot})
n_value.update({"value_slots": n_slot})
s_value.update({"value_slots": res_slot})
temp.append(a_value)
temp.append(b_value)
temp.append(c_value)
temp.append(n_value)
res.append(s_value)
return temp, res
async def get_aiao_data_pds(slots, temp_res_data, res_curr_th):
temp, res, s_value, a_value, b_value, c_value, n_value = [], [], {
"item": "漏电流", "threhold": res_curr_th}, {"item": "A相"}, {
"item": "B相"}, {
"item": "C相"}, {
"item": "N相"}
temp1, temp2, temp3, temp4, res_curr = {}, {}, {}, {}, {}
for i in temp_res_data:
if i.get('ad_field') == 'temp1':
temp1.update({i.get("create_time"): i.get("value_avg")})
elif i.get('ad_field') == "temp2":
temp2.update({i.get("create_time"): i.get("value_avg")})
elif i.get('ad_field') == "temp3":
temp3.update({i.get("create_time"): i.get("value_avg")})
elif i.get('ad_field') == "temp4":
temp4.update({i.get("create_time"): i.get("value_avg")})
else:
res_curr.update({i.get("create_time"): i.get("value_avg")})
a_slot, b_slot, c_slot, n_slot, res_slot = [], [], [], [], []
[a_slot.append(temp1.get(i, "")) for i in slots]
[b_slot.append(temp2.get(i, "")) for i in slots]
[c_slot.append(temp3.get(i, "")) for i in slots]
[n_slot.append(temp4.get(i, "")) for i in slots]
[res_slot.append(res_curr.get(i, "")) for i in slots]
a_value.update({"value_slots": a_slot})
b_value.update({"value_slots": b_slot})
c_value.update({"value_slots": c_slot})
n_value.update({"value_slots": n_slot})
s_value.update({"value_slots": res_slot})
temp.append(a_value)
temp.append(b_value)
temp.append(c_value)
temp.append(n_value)
res.append(s_value)
return temp, res
async def get_point_1min_chart_pds(ctnum, slots, data):
if ctnum == 3:
i, v, power, ia_value, ib_value, ic_value, ua_value, ub_value, uc_value, pttl_value, qttl_value \
= [], [], [], {"item": "ia"}, {"item": "ib"}, {"item": "ic"}, {
"item": "ua"}, {"item": "ub"}, {"item": "uc"}, {"item": "pttl"}, {
"item": "qttl"}
ia_dict, ib_dict, ic_dict, pttl_dict, qttl_dict, ua_dict, ub_dict, uc_dict \
= {}, {}, {}, {}, {}, {}, {}, {}
[ia_dict.update({i.get("ts"): i.get("ia")}) for i in data]
[ib_dict.update({i.get("ts"): i.get("ib")}) for i in data]
[ic_dict.update({i.get("ts"): i.get("ic")}) for i in data]
[pttl_dict.update({i.get("ts"): i.get("pttl")}) for i in data]
[qttl_dict.update({i.get("ts"): i.get("qttl")}) for i in data]
[ua_dict.update({i.get("ts"): i.get("ua")}) for i in data]
[ub_dict.update({i.get("ts"): i.get("ub")}) for i in data]
[uc_dict.update({i.get("ts"): i.get("uc")}) for i in data]
ia_list, ib_list, ic_list, pttl_list, qttl_list, ua_list, ub_list, uc_list \
= [], [], [], [], [], [], [], []
[ia_list.append(ia_dict.get(i, "")) for i in slots]
[ib_list.append(ia_dict.get(i, "")) for i in slots]
[ic_list.append(ia_dict.get(i, "")) for i in slots]
[pttl_list.append(ia_dict.get(i, "")) for i in slots]
[qttl_list.append(ia_dict.get(i, "")) for i in slots]
[ua_list.append(ia_dict.get(i, "")) for i in slots]
[ub_list.append(ia_dict.get(i, "")) for i in slots]
[uc_list.append(ia_dict.get(i, "")) for i in slots]
ia_value.update({"value_slots": ia_list})
ib_value.update({"value_slots": ib_list})
ic_value.update({"value_slots": ic_list})
pttl_value.update({"value_slots": pttl_list})
qttl_value.update({"value_slots": qttl_list})
ua_value.update({"value_slots": ua_list})
ub_value.update({"value_slots": ub_list})
uc_value.update({"value_slots": uc_list})
i.append(ia_value)
i.append(ib_value)
i.append(ic_value)
v.append(ua_value)
v.append(ub_value)
v.append(uc_value)
power.append(pttl_value)
power.append(qttl_value)
else:
i, v, power, ia_value, ic_value, uab_value, ucb_value, pttl_value, qttl_value \
= [], [], [], {"item": "ia"}, {"item": "ic"}, {
"item": "uab"}, {"item": "ucb"}, {"item": "pttl"}, {"item": "qttl"}
ia_dict, ic_dict, pttl_dict, qttl_dict, uab_dict, ucb_dict, \
= {}, {}, {}, {}, {}, {}
[ia_dict.update({i.get("ts"): i.get("ia")}) for i in data]
[ic_dict.update({i.get("ts"): i.get("ic")}) for i in data]
[pttl_dict.update({i.get("ts"): i.get("pttl")}) for i in data]
[qttl_dict.update({i.get("ts"): i.get("qttl")}) for i in data]
[uab_dict.update({i.get("ts"): i.get("ua")}) for i in data]
[ucb_dict.update({i.get("ts"): i.get("ub")}) for i in data]
ia_list, ic_list, pttl_list, qttl_list, uab_list, ucb_list \
= [], [], [], [], [], []
[ia_list.append(ia_dict.get(i, "")) for i in slots]
[ic_list.append(ia_dict.get(i, "")) for i in slots]
[pttl_list.append(ia_dict.get(i, "")) for i in slots]
[qttl_list.append(ia_dict.get(i, "")) for i in slots]
[uab_list.append(ia_dict.get(i, "")) for i in slots]
[ucb_list.append(ia_dict.get(i, "")) for i in slots]
ia_value.update({"value_slots": ia_list})
ic_value.update({"value_slots": ic_list})
pttl_value.update({"value_slots": pttl_list})
qttl_value.update({"value_slots": qttl_list})
uab_value.update({"value_slots": uab_list})
ucb_value.update({"value_slots": ucb_list})
i.append(ia_value)
i.append(ic_value)
v.append(uab_value)
v.append(ucb_value)
power.append(pttl_value)
power.append(qttl_value)
return i, v, power
async def get_point_data_chart_pds(ctnum, slots, data):
if ctnum == 3:
i, v, power, ia_value, ib_value, ic_value, ua_value, ub_value, uc_value, pttl_value, qttl_value \
= [], [], [], {"item": "ia"}, {"item": "ib"}, {"item": "ic"}, {
"item": "ua"}, {"item": "ub"}, {"item": "uc"}, {"item": "pttl"}, {
"item": "qttl"}
ia_dict, ib_dict, ic_dict, pttl_dict, qttl_dict, ua_dict, ub_dict, uc_dict \
= {}, {}, {}, {}, {}, {}, {}, {}
[ia_dict.update({i.get("create_time"): i.get("ia_mean")}) for i in data]
[ib_dict.update({i.get("create_time"): i.get("ib_mean")}) for i in data]
[ic_dict.update({i.get("create_time"): i.get("ic_mean")}) for i in data]
[pttl_dict.update({i.get("create_time"): i.get("pttl_mean")}) for i in data]
[qttl_dict.update({i.get("create_time"): i.get("qttl_mean")}) for i in data]
[ua_dict.update({i.get("create_time"): i.get("ua_mean")}) for i in data]
[ub_dict.update({i.get("create_time"): i.get("ub_mean")}) for i in data]
[uc_dict.update({i.get("create_time"): i.get("uc_mean")}) for i in data]
ia_list, ib_list, ic_list, pttl_list, qttl_list, ua_list, ub_list, uc_list \
= [], [], [], [], [], [], [], []
[ia_list.append(ia_dict.get(i, "")) for i in slots]
[ib_list.append(ia_dict.get(i, "")) for i in slots]
[ic_list.append(ia_dict.get(i, "")) for i in slots]
[pttl_list.append(ia_dict.get(i, "")) for i in slots]
[qttl_list.append(ia_dict.get(i, "")) for i in slots]
[ua_list.append(ia_dict.get(i, "")) for i in slots]
[ub_list.append(ia_dict.get(i, "")) for i in slots]
[uc_list.append(ia_dict.get(i, "")) for i in slots]
ia_value.update({"value_slots": ia_list})
ib_value.update({"value_slots": ib_list})
ic_value.update({"value_slots": ic_list})
pttl_value.update({"value_slots": pttl_list})
qttl_value.update({"value_slots": qttl_list})
ua_value.update({"value_slots": ua_list})
ub_value.update({"value_slots": ub_list})
uc_value.update({"value_slots": uc_list})
i.append(ia_value)
i.append(ib_value)
i.append(ic_value)
v.append(ua_value)
v.append(ub_value)
v.append(uc_value)
power.append(pttl_value)
power.append(qttl_value)
else:
i, v, power, ia_value, ic_value, uab_value, ucb_value, pttl_value, qttl_value \
= [], [], [], {"item": "ia"}, {"item": "ic"}, {
"item": "uab"}, {"item": "ucb"}, {"item": "pttl"}, {"item": "qttl"}
ia_dict, ic_dict, pttl_dict, qttl_dict, uab_dict, ucb_dict, \
= {}, {}, {}, {}, {}, {}
[ia_dict.update({i.get("create_time"): i.get("ia_mean")}) for i in data]
[ic_dict.update({i.get("create_time"): i.get("ic_mean")}) for i in data]
[pttl_dict.update({i.get("create_time"): i.get("pttl_mean")}) for i in data]
[qttl_dict.update({i.get("create_time"): i.get("qttl_mean")}) for i in data]
[uab_dict.update({i.get("create_time"): i.get("ua_mean")}) for i in data]
[ucb_dict.update({i.get("create_time"): i.get("ub_mean")}) for i in data]
ia_list, ic_list, pttl_list, qttl_list, uab_list, ucb_list \
= [], [], [], [], [], []
[ia_list.append(ia_dict.get(i, "")) for i in slots]
[ic_list.append(ia_dict.get(i, "")) for i in slots]
[pttl_list.append(ia_dict.get(i, "")) for i in slots]
[qttl_list.append(ia_dict.get(i, "")) for i in slots]
[uab_list.append(ia_dict.get(i, "")) for i in slots]
[ucb_list.append(ia_dict.get(i, "")) for i in slots]
ia_value.update({"value_slots": ia_list})
ic_value.update({"value_slots": ic_list})
pttl_value.update({"value_slots": pttl_list})
qttl_value.update({"value_slots": qttl_list})
uab_value.update({"value_slots": uab_list})
ucb_value.update({"value_slots": ucb_list})
i.append(ia_value)
i.append(ic_value)
v.append(uab_value)
v.append(ucb_value)
power.append(pttl_value)
power.append(qttl_value)
return i, v, power
......@@ -7,7 +7,9 @@ from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from pot_libs.common.components.query import PageRequest
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_data
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_dao, \
get_aiao_1day_dao, get_aiao_15min_dao, get_point_15min_chart_dao, \
get_point_1day_chart_dao, get_point_1min_chart_dao
from unify_api.utils import time_format
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type
......@@ -17,148 +19,47 @@ from unify_api.modules.anshiu.components.fine_monitor_cps import (
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_es_point_1min_data, get_es_point_15min_data,
get_es_aiao_15min_data, get_threshold_by_location
get_es_aiao_15min_data, get_threshold_by_location, get_aiao_1min_pds,
get_aiao_data_pds, get_point_1min_chart_pds, get_point_data_chart_pds
)
from unify_api.utils.time_format import convert_timestamp_to_str
async def get_adio_chart_data(location_group, location_info,
start_timestamp,
end_timestamp, intervel, slots):
async def get_adio_chart_data(location_group, location_info, date_start,
date_end, intervel, slots):
'''
获取环境(温度与漏电流)的曲线数据
'''
temp, res = [], []
# 工况标准,取其中一个漏电流阈值
residual_current_threhold = await get_threshold_by_location(
res_curr_th = await get_threshold_by_location(
location_group)
if intervel > 60:
mtid = location_info[0]['mtid']
if 15 * 60 >= intervel > 60:
# 取时间间隔为15min的数据
temp, res = await get_adio_15min_chart_data(
filter,
residual_current_threhold,
location_info,
slots,
intervel
)
temp_res_data = await get_aiao_15min_dao(mtid, date_start, date_end)
temp, res = await get_aiao_data_pds(slots, temp_res_data, res_curr_th)
elif intervel == 86400:
# 取时间间隔为1day的数据
temp_res_data = await get_aiao_1day_dao(mtid, date_start, date_end)
temp, res = await get_aiao_data_pds(slots, temp_res_data, res_curr_th)
else:
# 取时间间隔为1min的数据
temp, res = await get_adio_1min_chart_data(
residual_current_threhold,
location_info,
slots,
start_timestamp
)
return temp, res
async def get_adio_15min_chart_data(filter, residual_current_threhold,
location_info, slots, intervel):
'''
获取15min环境的曲线数据
'''
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# 组装es请求信息
query_body = EsQuery.aggr_history(
page_request,
interval=intervel,
stats_items=[
"value_max",
]
)
# 分组字段
format_field = "location_id_{}_aggs"
for location_id, _ in location_info.items():
group_field = format_field.format(location_id)
query_body['aggs'][group_field] = {
"filter": {"term": {"location_id": location_id}},
"aggs": {
group_field: query_body['aggs']['aggs_name']
}
}
del query_body['aggs']['aggs_name']
aggs_res = await get_es_aiao_15min_data(query_body)
# 时间类型
date_type = "day" if intervel == 24 * 3600 else "minute"
# 温度及漏电流
temperature_list = []
residual_currents_list = []
for location_id, item_info in location_info.items():
group_field = format_field.format(location_id)
data_buckets = (
aggs_res.get(group_field, {}).get(group_field, {}).get("buckets",
[])
)
# 时间戳转换为时分并作为键值返回
_data = es_helper.process_es_aggs_aiao(
data_buckets, time_key="key", value_key="value_max",
date_type=date_type,
)
# 取最大值
his_data = [_data[slot].get('max', "") if slot in _data else "" for slot in slots]
adio_his = Chart(item='', value_slots=his_data)
if item_info.get("type") == "residual_current":
adio_his.item = "漏电流"
adio_his.threhold = residual_current_threhold
residual_currents_list.append(adio_his)
else:
adio_his.item = item_info.get("item", "")
temperature_list.append(adio_his)
return temperature_list, residual_currents_list
async def get_adio_1min_chart_data(residual_current_threhold,
location_info, slots, start_timestamp):
'''
获取1min环境的曲线数据
'''
data = await get_aiao_1min_data(monitor_info, start_time, end_time, su_table)
# 温度及漏电流
temperature_list = []
residual_currents_list = []
es_res = [v.get('_source') for v in es_res]
for location_id, item in groupby(es_res, key=itemgetter('location_id')):
# 转成数组的形式
item_dict = [value for value in item]
# 获取location类型
item_location_info = location_info.get(location_id, {})
# 时间戳转换为时分并作为键值返回
_data = es_helper.process_es_aggs_aiao(
item_dict, time_key="time", value_key="value",
date_type="minute",
)
his_data = [_data.get(slot, "") for slot in slots]
adio_his = Chart(item='', value_slots=his_data)
if item_location_info.get("type") == "residual_current":
adio_his.item = "漏电流"
adio_his.threhold = residual_current_threhold
residual_currents_list.append(adio_his)
else:
adio_his.item = item_location_info.get("item", "")
temperature_list.append(adio_his)
temp_res_data = await get_aiao_1min_dao(mtid, date_start, date_end)
temp, res = await get_aiao_1min_pds(slots, temp_res_data, res_curr_th)
return temperature_list, residual_currents_list
return temp, res
async def get_point_chart_data(point_id, start_timestamp, end_timestamp, \
intervel, slots):
async def get_point_chart_data(point_id, date_start, date_end, intervel,
slots):
'''
获取电气量
'''
# 获取当前监测点的接表法
ctnum, _ = await get_wiring_type(point_id)
ctnum, mtid = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(
......@@ -166,46 +67,6 @@ async def get_point_chart_data(point_id, start_timestamp, end_timestamp, \
# 返回的数值不在2,3中的,一般是装置点已经拆除。默认先给一个默认值3
ctnum = 3
if intervel > 60:
elec_data = await get_point_15min_chart_data(point_id,
ctnum,
slots,
start_timestamp,
end_timestamp,
intervel)
else:
elec_data = await get_point_1min_chart_data(point_id,
ctnum,
slots,
start_timestamp,
end_timestamp)
# 功率、电流、电压
power, i, u = [], [], []
for item, value_slots in elec_data.items():
data = Chart(item=item.replace("_mean", ""), value_slots=value_slots)
if item.startswith("p") or item.startswith("q"):
power.append(data)
elif item.startswith("i"):
i.append(data)
elif item.startswith("u"):
u.append((data))
return power, i, u, ctnum
async def get_point_15min_chart_data(point_id, ctnum, slots, start_timestamp,
end_timestamp, intervel):
'''
获取15min电气量
'''
es_start = my_pendulum.from_timestamp(start_timestamp)
es_end = my_pendulum.from_timestamp(end_timestamp)
range = Range(field="quarter_time", start=es_start, end=es_end)
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# 统计字段
if ctnum == 2:
stats_items = [
"pttl_mean",
......@@ -227,82 +88,21 @@ async def get_point_15min_chart_data(point_id, ctnum, slots, start_timestamp,
"ic_mean",
]
query_body = EsQuery.aggr_history_new(
page_request,
interval=intervel,
stats_items=stats_items,
histogram_field="quarter_time",
)
aggs_res = await get_es_point_15min_data(query_body)
data_buckets = aggs_res.get("aggs_name", {}).get("buckets", [])
if 15 * 60 >= intervel > 60:
elec_data = await get_point_15min_chart_dao(mtid, stats_items, date_start,
date_end)
power, i, u = await get_point_data_chart_pds(ctnum, slots, elec_data)
time_bucket_map = {i["key_as_string"]: i for i in data_buckets}
log.info(f"post_elec_history slots={slots} _data = "
f"{list(time_bucket_map.keys())}, query_body={query_body}")
elec_data = {stats_item: [] for stats_item in stats_items}
for slot in slots:
if slot in time_bucket_map:
for stats_item in stats_items:
# 获取平均值
value = time_bucket_map[slot].get(stats_item, {}).get("avg",
"")
elec_data[stats_item].append(value)
elif intervel == 86400:
elec_data = await get_point_1day_chart_dao(mtid, stats_items, date_start,
date_end)
power, i, u = await get_point_data_chart_pds(ctnum, slots, elec_data)
else:
for stats_item in stats_items:
elec_data[stats_item].append("")
return elec_data
elec_data = await get_point_1min_chart_dao(mtid, ctnum, date_start,
date_end)
power, i, u = await get_point_1min_chart_pds(ctnum, slots, elec_data)
async def get_point_1min_chart_data(point_id, ctnum, slots, start_timestamp,
end_timestamp):
'''
获取1min电气量
'''
range = Range(field="time", start=start_timestamp, end=end_timestamp)
equal = Equal(field="point_id", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
sort = Sort(field='time', direction='asc')
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
# 统计字段
if ctnum == 2:
stats_items = [
"pttl",
"qttl",
"uab",
"ucb",
"ia",
"ic",
]
else:
stats_items = [
"pttl",
"qttl",
"ua",
"ub",
"uc",
"ia",
"ib",
"ic",
]
query_body = EsQuery.query(page_request)
es_time = convert_timestamp_to_str(start_timestamp)
es_res = await get_es_point_1min_data(query_body, es_time)
_data = {}
for res in es_res:
_time = convert_timestamp_to_str(res.get('_source', {}).get('time', 0),
date_type='minute')
_data[_time] = res.get('_source')
elec_data = {}
for slot in slots:
for stats_item in stats_items:
data = _data.get(slot).get(stats_item, "") if slot in _data else ""
elec_data.setdefault(stats_item, []).append(data)
return elec_data
return power, i, u, ctnum
async def get_adio_info_data(location_group, location_info, start_timestamp,
......
......@@ -9,7 +9,7 @@ from unify_api.modules.anshiu.components.fine_monitor_cps import (
FineMonitorInfoResp
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_location_by_ids, get_threshold_by_location
get_location_by_ids, get_threshold_by_location, get_mtid_by_location_ids
)
from unify_api.modules.anshiu.service.fine_monitor_serv import (
get_adio_chart_data, get_point_chart_data, get_adio_info_data,
......@@ -43,7 +43,7 @@ async def post_fine_monitor_chart(request,
# 获取location表的信息
try:
location_info = await get_location_by_ids(location_group)
location_info = await get_mtid_by_location_ids(location_group)
except Exception as e:
log.error('get_fine_monitor_chart_error ' + e)
return FineMonitorChartResp.db_error()
......@@ -54,16 +54,19 @@ async def post_fine_monitor_chart(request,
intervel, slots)
# 电力数据 power_list、电流曲线、电压曲线
power_list, i_list, v_list, ctnum = await get_point_chart_data(point_id,
start_timestamp,
end_timestamp, \
intervel, slots)
power_list, i_list, v_list, ctnum = await get_point_chart_data(
point_id,
date_start,
date_end,
intervel,
slots)
# 获取温度与漏电流的曲线数据
# 获取用电数据
return FineMonitorChartResp(time_slots=slots,
temperature=temperature_list,
residual_current=residual_currents_list,
power=power_list, i=i_list, v=v_list,ctnum=ctnum)
power=power_list, i=i_list, v=v_list,
ctnum=ctnum)
@summary("精细监测-指标统计")
......
import argparse
from dataclasses import dataclass
import pendulum
from chinese_calendar import is_workday, is_holiday
from datetime import datetime
import re
......@@ -178,5 +180,7 @@ def make_tdengine_data_as_list(tdengine_data):
meta[0] for meta in tdengine_data["column_meta"]]
result = []
for res in tdengine_data["data"]:
res[0] = pendulum.parse(res[0]).format("HH:mm")
result.append(dict(zip(head, res)))
return result
......@@ -154,6 +154,8 @@ def get_td_table_name(topic, id):
"smoke_soe_stb": "smoke_soe_stb%s", # 烟感报警
"old_adio_stb": "s_%s_a", # 安电--旧 这里是%s是sid
"new_adio_stb": "mt%s_adi", # 安电——新
"old_electric_stb": "s_%s_a",
"new_electric_stb": "mt%s_ele",
}
table_name = topic_map.get(topic)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment