Commit 6f542157 authored by wang.wenrong's avatar wang.wenrong

Merge branch 'wwr' into 'develop'

anshiU

See merge request !32
parents 04644eed 0e8c8368
from pot_libs.settings import SETTING
from unify_api.utils.common_utils import make_tdengine_data_as_list
from unify_api.utils.taos_new import get_td_table_name, get_td_engine_data
from unify_api.utils.exc_util import BusinessException
async def get_aiao_1min_data(monitor_info, start_time, end_time, su_table):
"""
:param monitor_info: pass
:param start_time: 开始时间
:param end_time: 结束时间
:param su_table: 超级表名
:return select_val对应在数据库中对应的值
"""
mtid = monitor_info["mtid"]
td_mt_table = get_td_table_name(su_table, mtid)
url = f"{SETTING.stb_url}db_adio"
# td的精度过高,采用 >= start and < end的形式查询
sql = f" select temp1, temp2, temp3, temp4 from {su_table} " \
f" where TBNAME = '{td_mt_table}' " \
f" and ts >= '{start_time}' AND ts <'{end_time}' "
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
raise BusinessException()
td_datas = make_tdengine_data_as_list(results)
if not td_datas:
return ""
return td_datas[0]
......@@ -2,6 +2,7 @@ from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.es_util.es_utils import EsUtil
from unify_api import constants
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_data
async def get_location_by_ids(location_ids):
......@@ -36,7 +37,7 @@ async def get_es_aiao_15min_data(query_body):
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body,
index=constants.LOCATION_15MIN_AIAO)
return es_results.get("aggregations", {})
......@@ -47,22 +48,8 @@ async def get_es_point_15min_data(query_body):
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body,
index=constants.POINT_15MIN_INDEX)
return es_results.get("aggregations", {})
async def get_es_aiao_1min_data(query_body, start):
'''
从es中获取环境相关数据(1min)
'''
async with EsUtil() as es:
# 环境相关qmin的数据需要分表查询
p_database = "poweriot_location_1min_aiao_" + start[:4] + "_" + \
str(int(start[5:7]))
es_results = await es.search_origin(body=query_body,
index=p_database)
return es_results.get("hits", {}).get("hits", {})
return es_results.get("aggregations", {})
async def get_es_point_1min_data(query_body, start):
......@@ -75,5 +62,5 @@ async def get_es_point_1min_data(query_body, start):
str(int(start[5:7]))
es_results = await es.search_origin(body=query_body,
index=p_database)
return es_results.get("hits", {}).get("hits", {})
......@@ -7,6 +7,7 @@ from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from pot_libs.common.components.query import PageRequest
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_data
from unify_api.utils import time_format
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type
......@@ -15,7 +16,7 @@ from unify_api.modules.anshiu.components.fine_monitor_cps import (
Statistics, Chart
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_es_aiao_1min_data, get_es_point_1min_data, get_es_point_15min_data,
get_es_point_1min_data, get_es_point_15min_data,
get_es_aiao_15min_data, get_threshold_by_location
)
from unify_api.utils.time_format import convert_timestamp_to_str
......@@ -25,19 +26,12 @@ async def get_adio_chart_data(location_group, location_info,
start_timestamp,
end_timestamp, intervel, slots):
'''
获取环境(温度与漏电流)的曲线数据 todo:es获取liudianliu
获取环境(温度与漏电流)的曲线数据
'''
# 工况标准,取其中一个漏电流阈值
residual_current_threhold = await get_threshold_by_location(
location_group)
# 组装es请求信息
range = Range(field="time", start=start_timestamp, end=end_timestamp)
in_group = InGroup(field="location_id", group=location_group)
filter = Filter(equals=[], ranges=[range], in_groups=[in_group],
keywords=[])
if intervel > 60:
# 取时间间隔为15min的数据
temp, res = await get_adio_15min_chart_data(
......@@ -50,7 +44,6 @@ async def get_adio_chart_data(location_group, location_info,
else:
# 取时间间隔为1min的数据
temp, res = await get_adio_1min_chart_data(
filter,
residual_current_threhold,
location_info,
slots,
......@@ -108,8 +101,7 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
date_type=date_type,
)
# 取最大值
his_data = [_data[slot].get('max', "") if slot in _data else "" for
slot in slots]
his_data = [_data[slot].get('max', "") if slot in _data else "" for slot in slots]
adio_his = Chart(item='', value_slots=his_data)
if item_info.get("type") == "residual_current":
......@@ -123,18 +115,12 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
return temperature_list, residual_currents_list
async def get_adio_1min_chart_data(filter, residual_current_threhold,
async def get_adio_1min_chart_data(residual_current_threhold,
location_info, slots, start_timestamp):
'''
获取1min环境的曲线数据
'''
sort = Sort(field='location_id', direction='asc')
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
query_body = EsQuery.query(page_request)
es_time = convert_timestamp_to_str(start_timestamp)
es_res = await get_es_aiao_1min_data(query_body, es_time)
data = await get_aiao_1min_data(monitor_info, start_time, end_time, su_table)
# 温度及漏电流
temperature_list = []
......
......@@ -50,7 +50,7 @@ async def post_fine_monitor_chart(request,
# 获取温度及漏电流数据
temperature_list, residual_currents_list = await get_adio_chart_data(
location_group, location_info, start_timestamp, end_timestamp,
location_group, location_info, date_start, date_end,
intervel, slots)
# 电力数据 power_list、电流曲线、电压曲线
......
......@@ -17,17 +17,12 @@ async def get_wiring_type(point_id):
if not point_id:
log.error("para error point_id:%s exception" % point_id)
return ctnum, mtid
# 根据point_id去change_meter_record查询最新的mid
sql = "SELECT mtid FROM point WHERE pid=%s "
sql = "SELECT mtid, ctnum FROM point WHERE pid=%s "
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, args=(point_id,))
if result:
mtid = result.get("mtid")
if not mtid:
return ctnum, mtid
mtid = result.get("mtid", None)
ctnum = result.get("ctnum", None)
return ctnum, mtid
......
......@@ -161,3 +161,22 @@ def correlation(l1, l2):
else:
log.info(f"求{l1}, {l2}相关系数{r}, 超出判断范围")
return "", ""
def make_tdengine_data_as_list(tdengine_data):
"""
将tdengine查询到的数据装换成 列表形式 列表中的数据为字典形式 {字段-字段值、}
适合处理 多表last_row数据
:param tdengine_data: {
"column_meta":[["key1", type, len],["key2", type, len],["key3", type, len]],
"data":[["val1","val2","id1"], ["val3","val4,"id2""]], ....}
:return: {
{"key1":"val1", "key2":"val2"}, {"key1":"val3", "key2":"val4"}
]
"""
head = [re.findall(r'last_row\((.*)\)', meta[0])[0] if "(" in meta[0] else
meta[0] for meta in tdengine_data["column_meta"]]
result = []
for res in tdengine_data["data"]:
result.append(dict(zip(head, res)))
return result
# -*- coding:utf-8 -*-
class DBException(Exception):
"""include all type db exception"""
status_code = 50001
message = "数据库操作失败!"
def __init__(self, *args, status_code=50001, message="数据库操作失败!", **kwargs):
self.status_code = status_code
self.message = message
class ParamException(Exception):
"""include all type param exception"""
status_code = 50003
message = "参数错误!"
def __init__(self, *args, status_code=50003, message="参数错误!", **kwargs):
self.status_code = status_code
self.message = message
class BusinessException(Exception):
"""
业务异常, 比如装置点被拆了mid为None这种情况, 但是在页面端point_id又可选择
"""
status_code = 50007
message = "业务错误!"
def __init__(self, *args, status_code=50007, message="业务错误!", **kwargs):
self.status_code = status_code
self.message = message
......@@ -10,16 +10,18 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_td_engine_data(url, sql):
if "?" not in url:
url += "?tz=Asia/Shanghai"
token = get_token()
log.info(f"token:{token},sql:{sql}")
resp_str, status = await AioHttpUtils().post_data(
url, data=sql, timeout=50,
headers={"Authorization": f"Basic {token}"}
)
results = json.loads(resp_str)
log.info(f"resp_str:{resp_str},status:{status}")
if status != 200:
if results["code"] != 0:
return False, None
results = json.loads(resp_str)
return True, results
......@@ -111,7 +113,7 @@ def get_td_table_name(topic, id):
def td3_tbl_compate(td_tables):
if len(td_tables) > 1:
return tuple(td_tables)
return f"('{td_tables[0]}')"
......@@ -124,3 +126,35 @@ def parse_td_columns(rsp_data):
tbl_field = r[0] if r else col[0]
head.append(tbl_field)
return head
def get_td_table_name(topic, id):
"""
:param topic: 需要查询的主题
:param id: 表命名使用的id
:return:
"""
topic_map = {
"water": "water_ele_electric_ops%s",
"electric": "mt%s_ele_electric_ops",
"pv_ele": "pv_ele%s",
"pv_sts": "pv_sts%s",
"ws": "ws%s",
"adi": "mt%s_adi_electric_ops",
"pt_temp": "pt_temp%s_electric_ops", # 电容
"ent_guard_sts_stb": "ent_guard_sts%s", # 门控
"ent_guard_soe_stb": "ent_guard_soe%s", # 门控报警
"indoor_temp_stb": "indoor_temp%s", # 室内温度
"indoor_temp_soe_stb": "indoor_temp_soe%s", # 室内温度报警
"indoor_hum_stb": "indoor_hum%s", # 室内湿度
"indoor_hum_soe_stb": "indoor_hum_soe%s", # 室内湿度报警
"water_fld_sts_stb": "water_fld_sts%s", # 水浸
"water_fld_soe_stb": "water_fld_soe%s", # 水浸报警
"smoke_sts_stb": "smoke_sts_stb%s", # 烟感
"smoke_soe_stb": "smoke_soe_stb%s", # 烟感报警
"old_adio_stb": "mt%s_adi_electric_ops", # 安电--旧
"new_adio_stb": "mt%s_adi", # 安电——新
}
table_name = topic_map.get(topic)
return table_name % id
......@@ -25,22 +25,32 @@ def time_pick_transf(start, end, is_range=0):
# diff = (end_time - start_time).seconds
diff = end_f.int_timestamp - start_f.int_timestamp
# 1. 计算intervel
# 1.1 区间48小时之内, 返回15min
if diff <= 48 * 3600:
# 1.1 区间3小时之内, 返回1min
if diff <= 3 * 3600:
intervel = 60
# 1.2 区间48小时之内, 返回15min
elif diff <= 48 * 3600:
intervel = 15 * 60
# 1.2 区间在60天以内, 返回1day
elif 48 * 3600 < diff <= 60 * 86400:
# 1.3 区间在60天以内, 返回1day
elif diff <= 60 * 86400:
intervel = 86400
# 1.3 选择年, 返回1个月
# 1.4 选择年, 返回1个月
else:
intervel = 30 * 86400
# 2. 计算slots
# 2.1 取到点的个数, 比如15min的96个点
slots = []
slot_num = round((end_f.int_timestamp - start_f.int_timestamp) / intervel)
slot_num = round(
(end_f.int_timestamp - start_f.int_timestamp) / intervel)
for i in range(slot_num):
# 区间48小时之内
if diff < 24 * 3600:
# 区间3小时之内
if diff <= 3 * 3600:
dt = start_f.add(minutes=1 * i).format("YYYY-MM-DD HH:mm")
dt_str = str(dt).split()[1]
if is_range:
dt_str = str(dt)
# 区间24小时之内
elif diff < 24 * 3600:
dt = start_f.add(minutes=15 * i).format("YYYY-MM-DD HH:mm")
dt_str = str(dt).split()[1]
if is_range:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment