Commit 8dd52daa authored by peng.xiaozhe's avatar peng.xiaozhe

fix:scope-detail接口升级

parent d01ffbff
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_event_info_by_event_id(event_id):
"""获取报警信息"""
sql = f"""
SELECT
mtid, name, pid point_id, UNIX_TIMESTAMP(event_datetime) time,
cid, message, event_datetime datetime
FROM point_1min_event
WHERE event_id = "{event_id}"
"""
async with MysqlUtil() as conn:
data = await conn.fetchone(sql)
return data if data else {}
async def get_scope_info_pds(mtid, create_time):
"""获取报警西南西"""
sql = f"""
SELECT
fault_type, create_time datetime, index_loc result, url context_url
FROM point_1min_scope
WHERE mtid = "{mtid}"
AND create_time = "{create_time}"
"""
async with MysqlUtil() as conn:
data = await conn.fetchone(sql)
return data if data else {}
import datetime
import json import json
import io import io
import pandas as pd import pandas as pd
from pot_libs.qingstor_util.qs_client import QsClient
from procedures.scope_operations_pds import get_event_info_by_event_id, get_scope_info_pds
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin
from unify_api.modules.zhiwei_u import config from unify_api.modules.zhiwei_u import config
from unify_api.utils import time_format from unify_api.utils import time_format
...@@ -99,28 +103,27 @@ async def search_scope_service(prod_id, cid, pid, sid, page_num, page_size, ...@@ -99,28 +103,27 @@ async def search_scope_service(prod_id, cid, pid, sid, page_num, page_size,
return SearchScopeResq(total=total, rows=rows) return SearchScopeResq(total=total, rows=rows)
async def load_context(url):
async with QsClient() as client:
context_bytes = await client.get_file(key=url)
context = json.loads(context_bytes.decode("utf-8"))
return context
async def scope_detail_service(id, wave_range, download=None, user_id=None): async def scope_detail_service(id, wave_range, download=None, user_id=None):
# 1.根据es_id查询point_1min_event对应的point属性 # 1.根据es_id查询point_1min_event对应的point属性
es_id = Equal(field="_id", value=id)
filter = Filter(equals=[es_id], ranges=[], in_groups=[], keywords=[])
query_request = PageRequest(page_size=1, page_num=1, filter=filter,
sort=None)
query_body = EsQuery.query(query_request)
try: try:
async with EsUtil() as es: mysql_results = await get_event_info_by_event_id(id)
es_results = await es.search_origin( except Exception as e:
body=query_body, log.error(f"query error {e}")
index=config.SCOPE_DATABASE)
except:
log.error("es query error")
return ScopeDetailsResponse().db_error() return ScopeDetailsResponse().db_error()
try: try:
es_results = es_results["hits"]["hits"][0] source = mysql_results
source = es_results.get("_source", {})
point_name = source.get("name") point_name = source.get("name")
point_id = source.get("point_id") point_id = source.get("point_id")
mtid = source.get("mtid")
cmp_time = source.get("time") cmp_time = source.get("time")
cid = source.get("cid") cid = source.get("cid")
...@@ -132,37 +135,20 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None): ...@@ -132,37 +135,20 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None):
point = await select_sid_by_pid(point_id) point = await select_sid_by_pid(point_id)
except Exception: except Exception:
log.warning("can not find data on es(index: %s): %s" % ( log.warning(f"query err {id}")
config.POINT_1MIN_SCOPE, query_body))
raise DBException raise DBException
# 4.接线法:二表/三表 # 4.接线法:二表/三表
ctnum, _ = await get_wiring_type(point_id) ctnum, _ = await get_wiring_type(point_id)
if not ctnum: if not ctnum:
ctnum = 3 ctnum = 3
# 2.用point_1min_event的ponint_id、cmp_time查询point_1min_scope
point_equal = Equal(field="point_id", value=point_id)
time_equal = Equal(field="time", value=cmp_time)
filter = Filter(equals=[point_equal, time_equal], ranges=[], in_groups=[],
keywords=[])
query_request = PageRequest(page_size=1, page_num=1, filter=filter,
sort=None)
query_body = EsQuery.query(query_request)
try: try:
async with EsUtil() as es: mysql_results = await get_scope_info_pds(mtid, check_dt)
es_results = await es.search_origin(
body=query_body,
index=config.POINT_1MIN_SCOPE)
except Exception as e: except Exception as e:
log.error(f"es query error {e}") log.error(f"es query error {e}")
return ScopeDetailsResponse().db_error() return ScopeDetailsResponse().db_error()
if not es_results: if not mysql_results:
log.warning("can not find data on es(index: %s): %s" % (
config.POINT_1MIN_SCOPE, query_body))
return ScopeDetailsResponse().db_error()
if not es_results["hits"]["hits"]:
alarm = Alarm(check_dt=check_dt, pid=point_id, cid=cid, alarm = Alarm(check_dt=check_dt, pid=point_id, cid=cid,
point=point_name, sid=point["sid"], message=message, point=point_name, sid=point["sid"], message=message,
...@@ -180,12 +166,13 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None): ...@@ -180,12 +166,13 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None):
date_time="", date_time="",
contents={} contents={}
) )
es_results = es_results["hits"]["hits"][0] source = mysql_results
scope_type = es_results.get("_source", {}).get("fault_type") scope_type = source.get("fault_type")
fault_type = config.EVENT_TYPE_MAP.get(scope_type) fault_type = config.EVENT_TYPE_MAP.get(scope_type)
date_time = time_format.esstr_to_dtstr( # date_time = time_format.esstr_to_dtstr(
es_results["_source"].get("datetime")) # source.get("datetime"))
position = list(json.loads(es_results["_source"].get("result")).keys()) date_time = datetime.datetime.strftime(source.get("datetime"), "%Y-%m-%d %H:%M:%S")
position = list(json.loads(source.get("result")).keys())
position_tmp = [ position_tmp = [
pos + "相" if len(pos) == 1 else pos + "线" if len(pos) == 2 else '' for pos + "相" if len(pos) == 1 else pos + "线" if len(pos) == 2 else '' for
pos in position] pos in position]
...@@ -193,9 +180,9 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None): ...@@ -193,9 +180,9 @@ async def scope_detail_service(id, wave_range, download=None, user_id=None):
position_str = "|".join(position_tmp) position_str = "|".join(position_tmp)
# 3.曲线数据 # 3.曲线数据
res_dic = json.loads(es_results["_source"].get("result")) res_dic = json.loads(source.get("result"))
location = [d for d in res_dic.values()][0].get("location") location = [d for d in res_dic.values()][0].get("location")
wave_data = json.loads(es_results["_source"].get("context")) wave_data = await load_context(source.get("context_url"))
length = wave_data.get("ia") or wave_data.get("ileak_rms") length = wave_data.get("ia") or wave_data.get("ileak_rms")
contin_time = len(length) contin_time = len(length)
if wave_range == "100ms": if wave_range == "100ms":
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment