Commit af2f7607 authored by lcn's avatar lcn

bug修复

parent 21b257c1
...@@ -16,84 +16,11 @@ async def scope_by_sql(mid_sql): ...@@ -16,84 +16,11 @@ async def scope_by_sql(mid_sql):
return data return data
async def scope_by_es(cid, point_id, page_num, page_size, start, end): async def detail_data_by_es(pid, create_time):
query_body = { sql = f"select * from point_1min_scope where pid=%s and create_time=%s"
"from": (page_num - 1) * page_size, async with MysqlUtil() as conn:
"size": page_size, data = await conn.fetchone(sql, args=(pid, create_time))
"query": { return data
"bool": {
"must": [
{
"term": {
"cid": cid
}
},
{
"terms": {
"point_id": point_id
}
}
]
}
},
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
if start and end:
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body["query"]["bool"]["must"].append(
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body,
index="poweriot_point_1min_scope")
if es_re["hits"]:
total = es_re["hits"]["total"]
data = [get_source(hit) for hit in es_re['hits']['hits']]
else:
data, total = [], 0
return data, total
async def detail_data_by_es(scope_id):
query_body = {
"query": {
"bool": {
"must": [
{
"term": {
"_id": scope_id
}
}
]
}
}
}
try:
async with EsUtil() as es:
es_results = await es.search_origin(
body=query_body,
index="poweriot_point_1min_scope")
except:
log.error("es query error")
return ScopeDetailsResponse().db_error()
result = None
if es_results["hits"]["total"]:
result = es_results["hits"]["hits"][0]["_source"]
return result
def get_source(hit): def get_source(hit):
...@@ -102,29 +29,10 @@ def get_source(hit): ...@@ -102,29 +29,10 @@ def get_source(hit):
return result return result
async def event_data_by_es(scope_id): async def event_data_by_es(mtid, event_datetime, event_type):
query_body = { sql = f"select * from point_1min_event where mtid=%s and " \
"query": { f"event_datetime=%s and event_type=%s"
"bool": { async with MysqlUtil() as conn:
"must": [ data = await conn.fetchone(sql,
{ args=(mtid, event_datetime, event_type))
"term": { return data
"doc_id.keyword": scope_id
}
}
]
}
}
}
try:
async with EsUtil() as es:
es_results = await es.search_origin(
body=query_body,
index="poweriot_point_1min_event")
except:
log.error("es query error")
return ScopeDetailsResponse().db_error()
result = None
if es_results["hits"]["total"]:
result = es_results["hits"]["hits"][0]["_source"]
return result
...@@ -33,7 +33,7 @@ async def get_point_name_dao(pid): ...@@ -33,7 +33,7 @@ async def get_point_name_dao(pid):
async def get_trigger_params_dao(type_id): async def get_trigger_params_dao(type_id):
sql = "select name from event_type WHERE id=%s" sql = "select name from event_type WHERE e_type=%s"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(type_id, )) data = await conn.fetchone(sql, args=(type_id, ))
return data return data
import json import json
import logging import logging
from pot_libs.qingstor_util.qs_client import QsClient
from unify_api.utils import time_format from unify_api.utils import time_format
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import \ from unify_api.modules.zhiwei_u.dao.warning_operations_dao import \
select_point_dao select_point_dao
...@@ -8,9 +9,8 @@ from unify_api.modules.scope_analyse.dao.scope_record_mysql_dao import \ ...@@ -8,9 +9,8 @@ from unify_api.modules.scope_analyse.dao.scope_record_mysql_dao import \
get_mtid_dao, get_location_id_dao, get_threhold_dao, get_point_name_dao, \ get_mtid_dao, get_location_id_dao, get_threhold_dao, get_point_name_dao, \
get_trigger_params_dao get_trigger_params_dao
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFile from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFile
from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg
from unify_api.modules.scope_analyse.dao.scope_record_dao import \ from unify_api.modules.scope_analyse.dao.scope_record_dao import \
scope_by_es, detail_data_by_es, event_data_by_es, scope_by_sql detail_data_by_es, event_data_by_es, scope_by_sql
from unify_api.modules.scope_analyse.components.scope_analyse_cps import \ from unify_api.modules.scope_analyse.components.scope_analyse_cps import \
ScopeRecordResp, ScopeRecord, ScopeAnalyseResp, LeakageCurrentResp ScopeRecordResp, ScopeRecord, ScopeAnalyseResp, LeakageCurrentResp
from pot_libs.logger import log from pot_libs.logger import log
...@@ -19,7 +19,6 @@ from unify_api.utils.response_code import RET ...@@ -19,7 +19,6 @@ from unify_api.utils.response_code import RET
async def scope_record_service(cid, point_id, page_num, page_size, start, end): async def scope_record_service(cid, point_id, page_num, page_size, start, end):
# datas, total = await scope_by_es(cid, point_id, page_num, page_size, start, end)
li = [f"cid={cid}"] li = [f"cid={cid}"]
if point_id: if point_id:
if len(point_id) == 1: if len(point_id) == 1:
...@@ -36,8 +35,8 @@ async def scope_record_service(cid, point_id, page_num, page_size, start, end): ...@@ -36,8 +35,8 @@ async def scope_record_service(cid, point_id, page_num, page_size, start, end):
# 动态漏电流阈值 # 动态漏电流阈值
rows = [] rows = []
if datas: if datas:
start = (page_num-1) * page_size start = (page_num - 1) * page_size
datas = datas[start: start+page_num] datas = datas[start: start + page_num]
for data in datas: for data in datas:
# 漏电流 # 漏电流
if data["fault_type"] in ("over_res_cur", "overResidualCurrent"): if data["fault_type"] in ("over_res_cur", "overResidualCurrent"):
...@@ -65,37 +64,46 @@ async def scope_record_service(cid, point_id, page_num, page_size, start, end): ...@@ -65,37 +64,46 @@ async def scope_record_service(cid, point_id, page_num, page_size, start, end):
record_type_name, probability = "不存在故障", "" record_type_name, probability = "不存在故障", ""
fault_type = await get_trigger_params_dao(data["fault_type"]) fault_type = await get_trigger_params_dao(data["fault_type"])
reason = fault_type.get("name") reason = fault_type.get("name")
dt = time_format.convert_to_dt(data["datetime"]) dt = data["create_time"]
check_dt = time_format.convert_dt_to_timestr(dt) check_dt = time_format.convert_dt_to_timestr(dt)
sr = ScopeRecord(trigger_time=check_dt, point_id=data["point_id"], check_timestamp = int(time_format.convert_dt_to_timestamp(dt))
point_name=point_dict.get(data["point_id"]), sr = ScopeRecord(trigger_time=check_dt, point_id=data["pid"],
point_name=point_dict.get(data["pid"]),
record_type=data["fault_type"], record_type=data["fault_type"],
record_type_name=record_type_name, record_type_name=record_type_name,
probability=probability, reason=reason, probability=probability, reason=reason,
scope_id=data["_id"]) scope_id="{}_{}".format(data["pid"],
check_timestamp))
rows.append(sr) rows.append(sr)
return ScopeRecordResp(rows=rows, total=total) return ScopeRecordResp(rows=rows, total=total)
# 故障诊断-波形分析 # 故障诊断-波形分析
async def scope_analyse_service(scope_id): async def scope_analyse_service(pid, create_time):
data = await detail_data_by_es(scope_id) check_dt = time_format.get_datetime_str(create_time)
data = await detail_data_by_es(pid, check_dt)
if not data: if not data:
log.info(f"波形分析 没有数据 scope_id:{scope_id}") log.info(f"波形分析 没有数据 pid:{pid},create_time:{create_time}")
return success_res(code=RET.not_data, msg="没有找到该数据") return success_res(code=RET.not_data, msg="没有找到该数据")
point_name = await get_point_name_dao(data["point_id"]) point_name = await get_point_name_dao(pid)
dt = time_format.convert_to_dt(data["datetime"]) try:
check_dt = time_format.convert_dt_to_timestr(dt) async with QsClient() as qs:
context = json.loads(data.get("context")) context = await qs.get_object(data["url"])
log.info(f"波形分析 scope_id:{scope_id}, type:{data['fault_type']}") except Exception as e:
res = data.get("result") log.error(f"录波地址无效 url:{data['url']} message:{str(e)}")
if res: return success_res(code=RET.not_data, msg="录波地址有误")
log.info(
f"波形分析 pid:{pid},create_time:{create_time}, type:{data['fault_type']}")
res = data.get("index_loc")
try:
res_dic = json.loads(res) res_dic = json.loads(res)
trigger_point = [d for d in res_dic.values()][0].get("location") trigger_point = [d for d in res_dic.values()][0].get("location")
else: except AttributeError as e:
trigger_point = None log.error(
f"录波出发位置有误 index_loc:{data['index_loc']} message:{str(e)}")
trigger_point = 0
# 漏电流 # 漏电流
if data["fault_type"] == "over_res_cur": if data["fault_type"] in ("over_res_cur", "overResidualCurrent"):
threhold = await get_threhold(data["cid"], data["sid"]) threhold = await get_threhold(data["cid"], data["sid"])
# result = leakage_reg(ileak_rms=context["ileak_rms"], # result = leakage_reg(ileak_rms=context["ileak_rms"],
# leak_hold=threhold) # leak_hold=threhold)
...@@ -113,7 +121,7 @@ async def scope_analyse_service(scope_id): ...@@ -113,7 +121,7 @@ async def scope_analyse_service(scope_id):
point_name=point_name["name"], point_name=point_name["name"],
trigger_time=check_dt, trigger_time=check_dt,
trigger_point=trigger_point, trigger_point=trigger_point,
trigger_params=trigger_params, trigger_params="-".join(trigger_params),
reason=reason, reason=reason,
contents=context) contents=context)
else: else:
...@@ -139,15 +147,16 @@ async def scope_analyse_service(scope_id): ...@@ -139,15 +147,16 @@ async def scope_analyse_service(scope_id):
# trigger_params[0] = trigger_params[0] + "相" # trigger_params[0] = trigger_params[0] + "相"
fault_type = await get_trigger_params_dao(data["fault_type"]) fault_type = await get_trigger_params_dao(data["fault_type"])
trigger_params = [fault_type.get("name")] trigger_params = [fault_type.get("name")]
event = await event_data_by_es(scope_id) event = await event_data_by_es(data["mtid"], check_dt,
data["fault_type"])
if event: if event:
trigger_params.insert(0, f'{event.get("phase")}相') trigger_params.insert(0, f'{event.get("phase")}相')
logging.info(f"scope_id:{scope_id}, fault_type:{data['fault_type']}, " logging.info(f"pid:{pid},create_time:{create_time}, "
f"result:{result}") f"fault_type:{data['fault_type']}, result:{result}")
return ScopeAnalyseResp(point_id=data["point_id"], return ScopeAnalyseResp(point_id=pid,
point_name=point_name["name"], point_name=point_name["name"],
trigger_time=check_dt, trigger_time=check_dt,
trigger_params=["漏电流越限"], trigger_params="-".join(trigger_params),
trigger_point=trigger_point, trigger_point=trigger_point,
reason=fina_reason, reason=fina_reason,
contents=context) contents=context)
......
from pot_libs.sanic_api import summary from pot_libs.sanic_api import summary
from unify_api.modules.scope_analyse.components.scope_analyse_cps import \ from unify_api.modules.scope_analyse.components.scope_analyse_cps import \
ScopeRecordReq, ScopeRecordResp, ScopeAnalyseReq, ScopeAnalyseResp,\ ScopeRecordReq, ScopeRecordResp, ScopeAnalyseReq, ScopeAnalyseResp, \
LeakageCurrentResp LeakageCurrentResp
from unify_api.modules.scope_analyse.service.scope_analyse_service import * from unify_api.modules.scope_analyse.service.scope_analyse_service import *
...@@ -14,15 +14,16 @@ async def post_scope_record(req, body: ScopeRecordReq) -> ScopeRecordResp: ...@@ -14,15 +14,16 @@ async def post_scope_record(req, body: ScopeRecordReq) -> ScopeRecordResp:
end = body.end end = body.end
page_size = body.page_size page_size = body.page_size
page_num = body.page_num page_num = body.page_num
return await scope_record_service(cid, point_id, page_num, page_size, start, end) return await scope_record_service(cid, point_id, page_num, page_size,
start, end)
# 故障诊断-波形分析 # 故障诊断-波形分析
@summary('故障诊断-波形分析') @summary('故障诊断-波形分析')
async def post_scope_analyse(req, body: ScopeAnalyseReq) -> ScopeAnalyseResp: async def post_scope_analyse(req, body: ScopeAnalyseReq) -> ScopeAnalyseResp:
scope_id = body.scope_id scope_id = body.scope_id
return await scope_analyse_service(scope_id) pid, create_time = tuple(scope_id.split("_"))
return await scope_analyse_service(pid, int(create_time))
# 故障诊断-波形分析-漏电流 # 故障诊断-波形分析-漏电流
# @summary('故障诊断-波形分析-漏电流') # @summary('故障诊断-波形分析-漏电流')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment