Commit 677ae78f authored by wang.wenrong's avatar wang.wenrong

Merge branch 'wwr' into 'develop'

del_es_dao

See merge request !38
parents d01ffbff 438e22c3
...@@ -190,6 +190,20 @@ async def get_mtid_by_pid_dao(pid): ...@@ -190,6 +190,20 @@ async def get_mtid_by_pid_dao(pid):
return data return data
async def get_mtids_by_pids_dao(pid):
sql = f"""
SELECT
mtid
FROM
point
WHERE
pid in %s
"""
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(pid,))
return data
async def get_point_monitor_dao(id_value, field="m.mtid"): async def get_point_monitor_dao(id_value, field="m.mtid"):
sql = f"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc," \ sql = f"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc," \
f"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \ f"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \
......
...@@ -112,202 +112,6 @@ async def get_point_chart_data(point_id, date_start, date_end, intervel, ...@@ -112,202 +112,6 @@ async def get_point_chart_data(point_id, date_start, date_end, intervel,
return power, i, u, ctnum return power, i, u, ctnum
async def get_adio_info_data(location_group, location_info, start_timestamp,
end_timestamp):
'''
获取环境相关数据
'''
range = Range(field="time", start=start_timestamp, end=end_timestamp)
# 分别统计各个location温度最大值、最小值、平均值
stats_info = {}
for location_id in location_group:
equal = Equal(field="location_id", value=location_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[],
keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
query_body = EsQuery.aggr_index(
page_request, stats_items=["value_max", "value_min", "value_avg"]
)
aggregations = await get_es_aiao_15min_data(query_body)
# 最大值, 这里叫法有点奇怪,但是最大值应该取15min的最大值聚合结果
max_info = aggregations.get("value_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max = source.get("value_max", 0)
max = round(max, 2) if max is not None else ""
max_ts = source.get("value_max_time", 0)
max_value_time = str(time_format.convert_to_dt(max_ts))
else:
max = ""
max_value_time = ""
# 最小值
min_info = aggregations.get("value_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min = source.get("value_min", 0)
min = round(min, 2) if min is not None else ""
min_ts = source.get("value_min_time", 0)
min_value_time = str(time_format.convert_to_dt(min_ts))
else:
min = ""
min_value_time = ""
avg = aggregations.get("value_avg_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
stats_info[location_id] = {
"max": {"value": max, "time": max_value_time},
"min": {"value": min, "time": min_value_time},
"avg": avg,
}
# 返回
adio_indexes = []
for location_id, info in location_info.items():
item, type = info["item"], info["type"]
# 漏电流的item更改一下
item = '漏电流' if type == 'residual_current' else item
_info = stats_info[location_id]
adio_index = Statistics(
type=type,
item=item,
max=_info["max"]["value"],
max_time=_info["max"]["time"],
min=_info["min"]["value"],
min_time=_info["min"]["time"],
avg=_info["avg"],
)
adio_indexes.append(adio_index)
return adio_indexes
async def get_point_info_data(point_id, start_time,
end_time):
# 2. 获取几表法
ctnum, _ = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(
f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum, 装置点已经被拆!")
# 给默认值3表法
ctnum = 3
range = Range(field="quarter_time", start=start_time,
end=end_time)
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if ctnum == 2:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"uab_mean",
"uab_min",
"uab_max",
"ucb_mean",
"ucb_min",
"ucb_max",
"ia_mean",
"ia_min",
"ia_max",
"ic_mean",
"ic_min",
"ic_max",
]
else:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"ua_mean",
"ua_min",
"ua_max",
"ub_mean",
"ub_min",
"ub_max",
"uc_mean",
"uc_min",
"uc_max",
"ia_mean",
"ia_min",
"ia_max",
"ib_mean",
"ib_min",
"ib_max",
"ic_mean",
"ic_min",
"ic_max",
]
query_body = EsQuery.aggr_index(page_request, stats_items=stats_items)
aggregations = await get_es_point_15min_data(query_body)
# 常规参数统计
common_indexes = []
_stats_items = {i.rsplit("_", 1)[0] for i in stats_items}
for item in _stats_items:
# 最大值
max_info = aggregations.get(f"{item}_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max_value = source.get(f"{item}_max", "")
max_dt = source.get(f"{item}_max_time")
if max_dt is None:
log.error(
f"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}")
max_value_time = str(
time_format.convert_to_dt(max_dt) if max_dt else "")
else:
max_value = ""
max_value_time = ""
# 最小值
min_info = aggregations.get(f"{item}_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min_value = source.get(f"{item}_min")
min_value = min_value if min_value is not None else ""
min_dt = source.get(f"{item}_min_time")
min_value_time = str(time_format.convert_to_dt(min_dt) if min_dt
else "")
else:
min_value = ""
min_value_time = ""
# 平均值
avg = aggregations.get(f"{item}_mean_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
elec_index = Statistics(
item=item,
max=max_value,
max_time=max_value_time,
min=min_value,
min_time=min_value_time,
avg=avg,
)
common_indexes.append(elec_index)
return common_indexes
async def electric_index_list_service(mtid, start_time, end_time, async def electric_index_list_service(mtid, start_time, end_time,
param_types=None): param_types=None):
""" """
......
from pot_libs.sanic_api import summary, examples from pot_libs.sanic_api import summary
from pot_libs.common.components.query import PageRequest
from pot_libs.logger import log from pot_libs.logger import log
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao
from unify_api.utils.request_util import filed_value_from_list
from unify_api.utils import time_format from unify_api.utils import time_format
from unify_api.modules.anshiu.components.fine_monitor_cps import ( from unify_api.modules.anshiu.components.fine_monitor_cps import (
FineMonitorChartReq, FineMonitorInfoReq, FineMonitorChartResp, FineMonitorChartReq, FineMonitorInfoReq, FineMonitorChartResp,
FineMonitorInfoResp, ElectricIndexListResp FineMonitorInfoResp
) )
from unify_api.modules.anshiu.procedures.fine_monitor_pds import ( from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_location_by_ids, get_threshold_by_location, get_mtid_by_location_ids get_mtid_by_location_ids
) )
from unify_api.modules.anshiu.service.fine_monitor_serv import ( from unify_api.modules.anshiu.service.fine_monitor_serv import (
get_adio_chart_data, get_point_chart_data, get_adio_info_data, get_adio_chart_data, get_point_chart_data,
get_point_info_data, electric_index_list_service electric_index_list_service
) )
...@@ -70,45 +67,6 @@ async def post_fine_monitor_chart(request, ...@@ -70,45 +67,6 @@ async def post_fine_monitor_chart(request,
ctnum=ctnum) ctnum=ctnum)
@summary("精细监测-指标统计1")
async def post_fine_monitor_info1(request,
body: FineMonitorInfoReq) -> FineMonitorInfoResp:
try:
date_start = body.start
date_end = body.end
# 获取监测点
point_id = body.pid
if not point_id or point_id <= 0:
raise Exception('point_error point_id:{}'.format(point_id))
# 获取location点
location_group = body.location_ids
if not location_group:
raise Exception('in_groups is NULL, no location_id')
except Exception as e:
log.error('get_fine_monitor_info ' + str(e))
return FineMonitorInfoResp.param_error()
# 获取location表的信息
try:
location_info = await get_location_by_ids(location_group)
except Exception as e:
log.error('get_fine_monitor_chart_error ' + e)
return FineMonitorChartResp.db_error()
info_list = []
# 环境相关数据
adio_list = await get_adio_info_data(location_group,
location_info, date_start,
date_end)
# 用电相关数据
point_list = await get_point_info_data(point_id, date_start,
date_end)
info_list.extend(adio_list)
info_list.extend(point_list)
return FineMonitorInfoResp(info_list=info_list)
@summary("精细监测-指标统计") @summary("精细监测-指标统计")
async def post_fine_monitor_info(request, async def post_fine_monitor_info(request,
body: FineMonitorInfoReq) -> FineMonitorInfoResp: body: FineMonitorInfoReq) -> FineMonitorInfoResp:
......
...@@ -47,7 +47,9 @@ async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp: ...@@ -47,7 +47,9 @@ async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp:
# 替换scope_g # 替换scope_g
if scope_g: if scope_g:
scope_g = ['200ms' if i == '0.2s' else i for i in scope_g] scope_g = ['200ms' if i == '0.2s' else i for i in scope_g]
rows, total = await search_scope_service(pids, cid, page_num, page_size, rows, total = await search_scope_service(pids, cid,
(page_num - 1) * page_size,
page_size,
start, end, scope_g) start, end, scope_g)
return ScopeListResp(rows=rows, total=total, page_num=page_num) return ScopeListResp(rows=rows, total=total, page_num=page_num)
......
...@@ -327,7 +327,7 @@ async def get_all_username(): ...@@ -327,7 +327,7 @@ async def get_all_username():
async def monitor_by_mtid(mtid): async def monitor_by_mtid(mtid):
sql = "select * from monitor where mtid = %s " sql = "select * from monitor where mtid = %s "
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
monitor_dic = await conn.fetchone(sql, args=(mtid, )) monitor_dic = await conn.fetchone(sql, args=(mtid,))
return monitor_dic return monitor_dic
...@@ -357,3 +357,11 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"): ...@@ -357,3 +357,11 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
result = await conn.fetchone(sql, (mtid,)) result = await conn.fetchone(sql, (mtid,))
return result return result
async def sql_point_15min_index_new15(start, end, pid):
sql = f"SELECT pttl_mean, create_time FROM `point_15min_electric` " \
f"where pid=%s and create_time BETWEEN '{start}' and '{end}'"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,))
return datas
This diff is collapsed.
# from unify_api.modules.common.dao.common_es_dao import \
# query_point_15min_index_aggs_pid
# from unify_api.modules.elec_charge.dao.elec_charge_dao import \ # from unify_api.modules.elec_charge.dao.elec_charge_dao import \
# query_charge_aggs_points # query_charge_aggs_points
# from unify_api.utils.time_format import start_end_date # from unify_api.utils.time_format import start_end_date
......
from unify_api.constants import SLOTS_15MIN, DUST_STATE from unify_api.constants import SLOTS_15MIN, DUST_STATE
from unify_api.modules.common.dao.common_dao import storey_pl_by_cid, \ from unify_api.modules.common.dao.common_dao import storey_pl_by_cid, \
storey_wp_by_cid storey_wp_by_cid
from unify_api.modules.common.dao.common_es_dao import \ from unify_api.modules.common.dao.common_dao import \
query_point_15min_index, sql_point_15min_index_new15 sql_point_15min_index_new15
from unify_api.modules.common.procedures.points import points_by_storeys from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.tsp_water.components.drop_dust_cps import DdwResp, \ from unify_api.modules.tsp_water.components.drop_dust_cps import DdwResp, \
DdResp, IrmResp, IosResp, ItiResp, WsStatiResp DdResp, IrmResp, IosResp, ItiResp, WsStatiResp
...@@ -108,7 +108,6 @@ async def post_drop_dust_wave_service(point_id, start, end): ...@@ -108,7 +108,6 @@ async def post_drop_dust_wave_service(point_id, start, end):
"""降尘措施-雾炮-运行曲线""" """降尘措施-雾炮-运行曲线"""
# 1. 获取聚合信息 # 1. 获取聚合信息
slots_list = SLOTS_15MIN slots_list = SLOTS_15MIN
# es_re = await query_point_15min_index(start, end, point_id)
sql_re = await sql_point_15min_index_new15(start, end, point_id) sql_re = await sql_point_15min_index_new15(start, end, point_id)
if not sql_re: if not sql_re:
return DdwResp(slots=[], value=[]) return DdwResp(slots=[], value=[])
......
from pot_libs.es_util.es_utils import EsUtil from pot_libs.es_util.es_utils import EsUtil
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao, \
get_sid_by_mtid_dao, get_mtids_by_pids_dao
from unify_api.utils.time_format import convert_es_str from unify_api.utils.time_format import convert_es_str
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE
...@@ -90,61 +92,52 @@ async def get_search_scope(cid, pid, start, end): ...@@ -90,61 +92,52 @@ async def get_search_scope(cid, pid, start, end):
async def query_search_scope(cid, pid, page_num, page_size, async def query_search_scope(cid, pid, page_num, page_size,
start, end): start_time, end_time, scope_g):
query_body = { """
"from": (page_num - 1) * page_size, 查询录波列表
"size": page_size, """
"query": { if len(pid) > 1:
"bool": {
"must": [ mtid = get_mtids_by_pids_dao(pid)
{ else:
"term": { mtid = get_mtid_by_pid_dao(pid)
"mode.keyword": "scope" mtid = mtid['mtid']
} where = ""
} if start_time:
] where += f" and event_datetime >= '{start_time}'"
} if end_time:
}, where += f" and event_datetime <= '{end_time}'"
"sort": [ if mtid:
{ where += f" and pe.mtid={mtid}"
"datetime": {
"order": "desc" join_sql = ""
}
} sql = f"""
] SELECT
} pt.event_id doc_id,
if start and end: pt.event_datetime check_dt,
start_es = convert_es_str(start) pt.`name` point,
end_es = convert_es_str(end) pt.message message,
query_body["query"]["bool"]["must"].append( pe.scope_g scope_type
{ FROM
"range": { point_1min_event pt
"datetime": { LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid
"gte": start_es, AND pe.create_time = pt.event_datetime
"lte": end_es WHERE
} {where}
} pe.cid = 183
} AND pt.event_mode = '{scope_g}'
) AND pe.pid in (1463, 2248)
ORDER BY
pe.create_time DESC
LIMIT limit {page_num} offset {page_size} """
async with MysqlUtil() as conn:
if cid: if cid:
query_body["query"]["bool"]["must"].append( data = await conn.fetchall(sql, args=(cid, ))
{ else:
"terms": { data = await conn.fetchall(sql)
"cid": cid return data
}
}
)
if pid:
query_body["query"]["bool"]["must"].append(
{
"term": {
"point_id": pid
}
}
)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
return es_re
async def get_scope_pids(pids, start, end): async def get_scope_pids(pids, start, end):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment