Commit d27c2ae6 authored by wang.wenrong's avatar wang.wenrong

2s scopedownloads

parent 347b59c5
......@@ -50,3 +50,29 @@ async def get_threshold_by_mtid(mtid):
async with MysqlUtil() as conn:
threshold = await conn.fetch_value(sql, args=(mtid,))
return threshold
async def get_scope_url_by_pid(mtid, start_dt, end_dt):
"""
获取录波详情
:param pid:
:param event_datetime:
:return:
"""
sql = f"""
SELECT
url,
DATE_FORMAT(create_time, '%Y-%m-%d %H:%i:%s') datetime
FROM
point_1min_scope
WHERE
create_time > '{start_dt}'
AND create_time < '{end_dt}'
AND mtid = {mtid}
AND scope_g = '2s'
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql,)
return result
import datetime
import json
import math
from pot_libs.qingstor_util.qs_client import QsClient
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeDetail
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao
from unify_api.modules.anshiu.dao.scope_operations_dao import \
get_scope_url_by_pid
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type_new15
from unify_api.utils.log_utils import LOGGER
async def get_scope_config_by_pid(pid):
......@@ -54,20 +66,38 @@ async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"):
"""
ES查询2s录波记录数据
"""
query_body = {
"size": 10000,
"query": {
"bool": {
"must": [
{"terms": {"point_id": pids}},
{"term": {"scope_g": {"value": scope_g}}},
{"range": {"datetime": {"gte": start_dt, "lt": end_dt}}}
]
}
},
"sort": [{"datetime": {"order": "asc"}}]
}
async with EsUtil() as es:
datas = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_SCOPE)
return datas["hits"]["hits"]
\ No newline at end of file
mtid = await get_mtid_by_pid_dao(pids)
mtid = mtid.get('mtid')
# 获取录波曲线数据
scope_url_data = await get_scope_url_by_pid(mtid, start_dt, end_dt)
scope_list = []
starttime_wm = datetime.datetime.now()
print(f'起始时间:{starttime_wm}')
for scope_data in scope_url_data:
try:
async with QsClient() as qs:
url = scope_data.get("url")
wave_data = await qs.get_object(url)
[scope_list.append({"datetime": scope_data['datetime'],
"ua": wave_data['ua'][i],
"ub": wave_data['ub'][i],
"uc": wave_data['uc'][i],
"ia": wave_data['ia'][i],
"ib": wave_data['ib'][i],
"ic": wave_data['ic'][i],
"lc": wave_data['lc'][i],
"pttl": wave_data['pttl'][i]}) for i in
range(len(wave_data.get('ua')))]
# for i in range(len(wave_data['ua'])):
# a_dict = dict(zip(wave_data.keys(),
# list(zip(*wave_data.values()))[i]))
# a_dict["datetime"] = scope_data['datetime']
# scope_list.append(a_dict)
except Exception as e:
LOGGER.error(f"scope_detail_service error message:{str(e)}")
return []
end_time_wm = datetime.datetime.now()
print(f'结束时间:{end_time_wm}')
return scope_list
......@@ -35,8 +35,6 @@ from unify_api.modules.electric.procedures.electric_util import \
from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope_pids, \
query_search_scope
from unify_api.modules.zhiwei_u.service.scope_operations_service import \
scope_detail_service
from unify_api.utils import time_format
from unify_api.utils.time_format import get_time_duration, \
get_current_datetime_str, convert_str_to_timestamp
......@@ -48,12 +46,10 @@ async def search_scope_service(pids, cid, page_num, page_size, start, end,
获取录波记录
'''
datas = await query_search_scope(cid, pids, page_num, page_size,
datas, total = await query_search_scope(cid, pids, page_num, page_size,
start, end, scope_g)
total = len(datas)
return datas, total
return datas, total["total"]
async def scope_list_download_data(pids, start, end):
......@@ -63,33 +59,8 @@ async def scope_list_download_data(pids, start, end):
duration = get_time_duration(start, end, False, False)
if duration > 24 * 60 * 60:
raise BusinessException(message='只能支持下载一天的录波数据')
start_dt = convert_to_es_str(start)
end_dt = convert_to_es_str(end)
datas = await get_scope_list_by_pid(pids, start_dt, end_dt)
scope_data = []
scope_fields = [
"ua", "ub", "uc", "ia", "ib", "ic", "pttl", "lc",
]
for hit in datas:
_source = hit["_source"]
context = json.loads(_source["context"])
# 只导出录波的数据
wave_data = context.get("ia")
if not wave_data:
continue
for i, _ in enumerate(wave_data):
datetime = _source["datetime"][:19].replace('T', ' ')
one_scope_data = {}
for field in scope_fields:
if field in context:
value = context.get(field)[i]
value = "" if pd.isna(value) else value
else:
value = None
one_scope_data[field] = value
item = ScopeItemDownload(datetime=datetime, **one_scope_data)
scope_data.append(item)
return scope_data
datas = await get_scope_list_by_pid(pids, start, end)
return datas
async def scope_detail_data(doc_id):
......
......@@ -139,11 +139,23 @@ async def query_search_scope(cid, pid, page_num, page_size,
pe.create_time DESC
LIMIT {page_num} , {page_size} """
total_sql = f"""
SELECT
count(*) total
FROM
point_1min_event pt
LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid
AND pe.create_time = pt.event_datetime
WHERE
pt.event_mode = 'scope'
{where}
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
total = await conn.fetchone(total_sql)
return data
return data, total
async def get_scope_pids(pids, start, end):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment