Commit 521761f1 authored by peng.xiaozhe's avatar peng.xiaozhe

Merge branch 'develop' of git.soejh.com:chaonan/unify_api2 into develop

parents 8dd52daa 67020038
......@@ -67,15 +67,15 @@ class ScopeItemDownload(Model):
ic: float = Float('C相电流')
pttl: float = Float('功率')
lc: float = Float('漏电流')
@dataclass
class ScopeListDownloadResp(Model):
"""
录波下载数据
"""
rows: list = Opt(List("数据列表").items(ScopeItemDownload))
@dataclass
class ScopeContent(Model):
......@@ -92,7 +92,7 @@ class ScopeDetailRep(Model):
识别详情-请求格式
'''
id: str = Opt(Str("doc_id").eg("423_over_gap_i__1655991008"))
@classmethod
def example(cls):
return {
......@@ -117,7 +117,7 @@ class ScopeDetailsResp(Model, DbErr):
v: list = List("电压事件录波").items(ScopeContent)
residual_current: list = Opt(List("2s录波漏电流").items(ScopeContent))
p: list = Opt(List("2s录波功率").items(ScopeContent))
@classmethod
def example(cls):
return {
......@@ -185,7 +185,7 @@ class GetScopeConfigResp(Model, ServerErr):
'''
pid: int = Int("监测点id").eg(20)
rows: list = List("配置内容").items(SetScopeConfigReq)
@classmethod
def example(cls):
return {
......@@ -335,7 +335,7 @@ class InitScopeConfigReq(Model):
初始化配置信息--请求
'''
pids: list = List('pids')
@classmethod
def example(cls):
return {
......@@ -351,7 +351,7 @@ class FlushScopeEsDataReq(Model):
scope_type_list: list = List('录波类型')
start_time: str = Str('开始时间')
end_time: str = Str('结束时间')
@classmethod
def example(cls):
return {
......@@ -399,7 +399,7 @@ scope_list_req_example = {
"end": "2022-06-22 23:59:59",
"pids": [260, 261, 268]
}
}
'''
......@@ -507,3 +507,30 @@ init_scope_config_example = {
}
}
}
@dataclass
class ScopeDetail(Model):
name: str = Str("名称").eg("ia")
value: list = List("值")
@dataclass
class ScopeDetailResp(Model):
name: str = Str("监测点名称").eg("华侨新村270栋101")
ctnum: int = Int("接线方式 2-两表法 3-三表法").eg(1)
sid: str = Str("硬件编号").eg("A2270457094")
event_datetime: str = Str("触发时间").eg("2022-05-26 15:55:03")
duration: str = Str("录波时长").eg("400ms")
message: str = Str("触发原因").eg("漏电流越限")
scope_g: str = Str("录波颗粒度 0.25ms/0.2s/2s").eg("0.25ms")
scope_type: str = Str("录波类型").eg("over_res_cur")
conclusion: str = Str("分析结论").eg("xxxxxxx")
u_list: list = List("电压曲线").items(ScopeDetail)
u_slots: list = List("电压横坐标")
i_list: list = List("电流曲线").items(ScopeDetail)
i_slots: list = List("电流横坐标")
lc_list: list = List("漏电流曲线").items(ScopeDetail)
lc_slots: list = List("漏电流横坐标")
power_list: list = List("功率曲线").items(ScopeDetail)
power_slots: list = List("功率横坐标")
......@@ -183,10 +183,25 @@ async def get_mtid_by_pid_dao(pid):
FROM
point
WHERE
pid = {pid}
pid = %s
"""
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, )
data = await conn.fetchone(sql, args=(pid,))
return data
async def get_mtids_by_pids_dao(pid):
sql = f"""
SELECT
mtid
FROM
point
WHERE
pid in %s
"""
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, args=(pid,))
data = [i['mtid'] for i in data if i['mtid']]
return data
......
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_scope_event_by_event_id(event_id):
"""
获取录波报警详情
:param event_id:
:return:
"""
sql = f"""
select pe.*,m.sid from point_1min_event pe
left join monitor m on pe.mtid = m.mtid
where pe.event_id =%s
"""
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, args=(event_id,))
return result
async def get_scope_detail_by_pid(pid, event_datetime):
"""
获取录波详情
:param pid:
:param event_datetime:
:return:
"""
sql = f"""
select * from point_1min_scope
where pid =%s and create_time=%s
"""
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, args=(pid, event_datetime))
return result
async def get_threshold_by_mtid(mtid):
"""
根据mtid获取阈值
:param mtid:
:return:
"""
sql = f"""
select threshold from soe_config_record scr
left join location l on scr.lid = l.lid
where l.ad_type='residual_current' and scr.etype='overResidualCurrent'
and l.mtid=%s
"""
async with MysqlUtil() as conn:
threshold = await conn.fetch_value(sql, args=(mtid,))
return threshold
from operator import itemgetter
from itertools import groupby
from pot_libs.common.components.query import Range, Equal, Filter, InGroup, \
Sort
from unify_api.modules.common.dao.common_dao import get_fields_by_mtid
from pot_libs.es_util import es_helper
from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from pot_libs.common.components.query import PageRequest
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_aiao_1min_dao, \
get_aiao_1day_dao, get_aiao_15min_dao, get_point_15min_chart_dao, \
get_point_1day_chart_dao, get_point_1min_chart_dao, get_point_monitor_dao, \
electric_index_list_dao, electric_index_location_dao
from unify_api.utils import time_format
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type
)
from unify_api.modules.anshiu.components.fine_monitor_cps import (
Statistics, Chart, ElectricIndexParam
ElectricIndexParam
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_es_point_15min_data,
get_es_aiao_15min_data, get_threshold_by_location, get_aiao_1min_pds,
get_threshold_by_location, get_aiao_1min_pds,
get_aiao_data_pds, get_point_1min_chart_pds, get_point_data_chart_pds,
GENERAL_PARAM_FIELD_2, ELECTRIC_QUALITY_FIELD_2,
GENERAL_PARAM_FIELD_3, ELECTRIC_QUALITY_FIELD_3, cal_electic_value,
cal_aiao_value, cal_pt_value
)
from unify_api.utils.time_format import convert_timestamp_to_str, get_time_diff
from unify_api.utils.time_format import get_time_diff
async def get_adio_chart_data(location_group, location_info, date_start,
......@@ -112,202 +102,6 @@ async def get_point_chart_data(point_id, date_start, date_end, intervel,
return power, i, u, ctnum
async def get_adio_info_data(location_group, location_info, start_timestamp,
end_timestamp):
'''
获取环境相关数据
'''
range = Range(field="time", start=start_timestamp, end=end_timestamp)
# 分别统计各个location温度最大值、最小值、平均值
stats_info = {}
for location_id in location_group:
equal = Equal(field="location_id", value=location_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[],
keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
query_body = EsQuery.aggr_index(
page_request, stats_items=["value_max", "value_min", "value_avg"]
)
aggregations = await get_es_aiao_15min_data(query_body)
# 最大值, 这里叫法有点奇怪,但是最大值应该取15min的最大值聚合结果
max_info = aggregations.get("value_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max = source.get("value_max", 0)
max = round(max, 2) if max is not None else ""
max_ts = source.get("value_max_time", 0)
max_value_time = str(time_format.convert_to_dt(max_ts))
else:
max = ""
max_value_time = ""
# 最小值
min_info = aggregations.get("value_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min = source.get("value_min", 0)
min = round(min, 2) if min is not None else ""
min_ts = source.get("value_min_time", 0)
min_value_time = str(time_format.convert_to_dt(min_ts))
else:
min = ""
min_value_time = ""
avg = aggregations.get("value_avg_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
stats_info[location_id] = {
"max": {"value": max, "time": max_value_time},
"min": {"value": min, "time": min_value_time},
"avg": avg,
}
# 返回
adio_indexes = []
for location_id, info in location_info.items():
item, type = info["item"], info["type"]
# 漏电流的item更改一下
item = '漏电流' if type == 'residual_current' else item
_info = stats_info[location_id]
adio_index = Statistics(
type=type,
item=item,
max=_info["max"]["value"],
max_time=_info["max"]["time"],
min=_info["min"]["value"],
min_time=_info["min"]["time"],
avg=_info["avg"],
)
adio_indexes.append(adio_index)
return adio_indexes
async def get_point_info_data(point_id, start_time,
end_time):
# 2. 获取几表法
ctnum, _ = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(
f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum, 装置点已经被拆!")
# 给默认值3表法
ctnum = 3
range = Range(field="quarter_time", start=start_time,
end=end_time)
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if ctnum == 2:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"uab_mean",
"uab_min",
"uab_max",
"ucb_mean",
"ucb_min",
"ucb_max",
"ia_mean",
"ia_min",
"ia_max",
"ic_mean",
"ic_min",
"ic_max",
]
else:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"ua_mean",
"ua_min",
"ua_max",
"ub_mean",
"ub_min",
"ub_max",
"uc_mean",
"uc_min",
"uc_max",
"ia_mean",
"ia_min",
"ia_max",
"ib_mean",
"ib_min",
"ib_max",
"ic_mean",
"ic_min",
"ic_max",
]
query_body = EsQuery.aggr_index(page_request, stats_items=stats_items)
aggregations = await get_es_point_15min_data(query_body)
# 常规参数统计
common_indexes = []
_stats_items = {i.rsplit("_", 1)[0] for i in stats_items}
for item in _stats_items:
# 最大值
max_info = aggregations.get(f"{item}_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max_value = source.get(f"{item}_max", "")
max_dt = source.get(f"{item}_max_time")
if max_dt is None:
log.error(
f"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}")
max_value_time = str(
time_format.convert_to_dt(max_dt) if max_dt else "")
else:
max_value = ""
max_value_time = ""
# 最小值
min_info = aggregations.get(f"{item}_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min_value = source.get(f"{item}_min")
min_value = min_value if min_value is not None else ""
min_dt = source.get(f"{item}_min_time")
min_value_time = str(time_format.convert_to_dt(min_dt) if min_dt
else "")
else:
min_value = ""
min_value_time = ""
# 平均值
avg = aggregations.get(f"{item}_mean_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
elec_index = Statistics(
item=item,
max=max_value,
max_time=max_value_time,
min=min_value,
min_time=min_value_time,
avg=avg,
)
common_indexes.append(elec_index)
return common_indexes
async def electric_index_list_service(mtid, start_time, end_time,
param_types=None):
"""
......
import json
import time
import math
from pot_libs.qingstor_util.qs_client import QsClient
import pandas as pd
from pandas.core.dtypes.inference import is_number
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin
from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg
from unify_api.utils.log_utils import LOGGER
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.aredis_util import aredis_utils
from pot_libs.common.components.query import PageRequest, Filter, InGroup, \
......@@ -17,7 +20,9 @@ from pot_libs.utils.time_format import convert_dt_to_timestr, \
convert_to_es_str, time_str_to_str
from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeListItem, ScopeContent, ScopeDetailsResp, GetScopeConfigList, \
init_scope_config_example, ScopeItemDownload
init_scope_config_example, ScopeItemDownload, ScopeDetail
from unify_api.modules.anshiu.dao.scope_operations_dao import \
get_scope_event_by_event_id, get_scope_detail_by_pid, get_threshold_by_mtid
from unify_api.modules.anshiu.procedures.scope_operations_pds import \
get_scope_config_by_pid, set_scope_config_by_pid, add_scope_config_by_pid, \
get_scope_list_by_pid
......@@ -25,6 +30,8 @@ from unify_api.modules.common.dao.common_dao import point_by_points, \
points_by_cid
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
change_param_to_config
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type_new15
from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope_pids, \
query_search_scope
......@@ -40,43 +47,13 @@ async def search_scope_service(pids, cid, page_num, page_size, start, end,
'''
获取录波记录
'''
if len(pids) == 0:
datas = await query_search_scope([cid], '', page_num, page_size,
start, end, scope_g)
points = await points_by_cid([cid])
else:
datas = await query_search_scope_pids(pids, page_num, page_size,
start, end, scope_g)
points = await point_by_points(list(set(pids)))
if not datas["hits"]["hits"]:
return [], 0
total = datas["hits"]["total"]
# 获取监测点数据
points = {point['pid']: point for point in points}
# 获取录波明细数据以获取颗粒度
# scope_type_items = await get_scope_type(pids, cid, start, end)
rows = []
for info in datas["hits"]["hits"]:
pid = info["_source"]["point_id"]
message = info["_source"]["message"]
check_dt = info["_source"]["datetime"]
# scope_type_key = "%s_%s" % (pid, info["_source"]["time"])
# scope_type = scope_type_items.get(scope_type_key, 0)
scope_type = info["_source"].get("scope_g", '')
if scope_type == '200ms':
scope_type = '0.2s'
dt = time_format.convert_to_dt(check_dt)
check_dt = time_format.convert_dt_to_timestr(dt)
scope_list_item = ScopeListItem(check_dt=check_dt,
scope_type=scope_type,
point=points[pid].get("name"),
message=message,
doc_id=info["_id"]
)
rows.append(scope_list_item)
return rows, total
datas = await query_search_scope(cid, pids, page_num, page_size,
start, end, scope_g)
total = len(datas)
return datas, total
async def scope_list_download_data(pids, start, end):
......@@ -350,7 +327,7 @@ async def get_scope_config_serv(pid):
'value':
[config.get('start_time_III'),
config.get('stop_time_III')]}
one_data = GetScopeConfigList(state=state,
configs=one_config)
return_data[scope_type] = one_data
......@@ -397,7 +374,7 @@ async def get_mqtt_scope_config(pid):
res_data = await mqtt_func(pub_json, pid)
if not res_data.get('data'):
raise Exception('请求数据为空')
sid = pub_json.get("sid")
# 转换数据字段
trans_fields = {"0.25ms": {"scopeEnable": "en_scope",
......@@ -492,7 +469,7 @@ async def flush_scope_es_data(scope_g, start_time, end_time):
in_groups=[])
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
query_body = EsQuery.query(page_request)
query_body['_source'] = ['_id']
async with EsUtil() as es:
......@@ -532,3 +509,120 @@ async def flush_scope_es_data(scope_g, start_time, end_time):
finally:
# 递增
start_dt = every_end_dt
# 1min_event 里面的event_datetime 对应 tdengine 里面scope的ts_origin
async def scope_detail_service(event_id):
# 获取报警信息
event_data = await get_scope_event_by_event_id(event_id)
if not event_data:
return {}, [], [], [], [], [], [], [], []
event_datetime = str(event_data.get("event_datetime"))
pid = event_data.get("pid")
mtid = event_data.get("mtid")
sid = event_data.get("sid")
event_type = event_data.get("event_type")
# 获取录播数据
scope_data = await get_scope_detail_by_pid(pid, event_datetime)
if not scope_data or not scope_data.get("url"):
return {}, [], [], [], [], [], [], [], []
# 接线法:二表/三表
ctnum = await get_wiring_type_new15(pid) or 3
# 查询录波详细数据 还不确定在哪儿拿,现在脚本是minio里边,但是家义说未来要在青云里面
# 测试数据存储
# filedata/electric_ops/scope/A2004000519/2022/6/27/17/1656321462.json
# 获取录波曲线数据
try:
async with QsClient() as qs:
url = scope_data.get("url")
wave_data = await qs.get_object(url)
except Exception as e:
LOGGER.error(f"scope_detail_service error message:{str(e)}")
return {}, [], [], [], [], [], [], [], []
# 录波颗粒度
scope_g = scope_data.get("scope_g")
if ctnum == 2:
i_fields = ['ia', 'ic']
v_fields = ['uab', 'ucb']
else:
i_fields = ['ia', 'ib', 'ic']
v_fields = ['ua', 'ub', 'uc']
u_list, i_list = [], []
residual_current, power = [], []
u_count, i_count, lc_count, power_count = 0, 0, 0, 0
for k, v in wave_data.items():
v = [value if not math.isnan(value) else '' for value in v]
if k in i_fields:
i_list.append(ScopeDetail(name=k, value=v))
u_count = len(v)
if k in v_fields:
u_list.append(ScopeDetail(name=k, value=v))
i_count = len(v)
# 2s颗粒度的会有漏电流及功率
if k in ("lc", "ileak_rms"):
residual_current.append(ScopeDetail(name='漏电流', value=v))
lc_count = len(v)
if k == "pttl":
power.append(ScopeDetail(name='总有功功率', value=v))
power_count = len(v)
u_slots = [i for i in range(1, u_count + 1)]
i_slots = [i for i in range(1, i_count + 1)]
lc_slots = [i for i in range(1, lc_count + 1)]
power_slots = [i for i in range(1, power_count + 1)]
# 结论分析
result = await get_scope_conclusion(wave_data, event_type, mtid, sid,
ctnum)
# 返回信息
data = dict()
data["name"] = event_data.get("name")
data["sid"] = sid
data["event_datetime"] = str(event_data.get("event_datetime"))
# data["duration"] = event_data.get("duration")
if scope_g == '0.25ms':
# data["duration"] = "400ms"
data["duration"] = str(int(i_count * 0.25)) + "ms"
elif scope_g == '200ms':
data["duration"] = "10s"
else:
data["duration"] = "10min"
data["message"] = event_data.get("message")
data["conclusion"] = result
data["scope_g"] = scope_g
data["scope_type"] = event_data.get("event_type")
data["ctnum"] = ctnum
return data or {}, u_list, i_list, residual_current, power, u_slots, \
i_slots, lc_slots, power_slots
async def get_scope_conclusion(wave_data, event_type, mtid, sid, ctnum):
"""
结论分析
:param wave_data:
:param event_type:
:param mtid:
:param sid:
:param ctnum:
:return:
"""
try:
lc_data = wave_data.get("ileak_rms") or wave_data.get("lc")
LOGGER.info(f"wave_data:{wave_data}")
if event_type == "over_res_cur":
threshold = await get_threshold_by_mtid(mtid) or 30
result = leakage_reg(ileak_rms=lc_data,
leak_hold=threshold)
LOGGER.info(f"actionFile 漏电流 sid:{sid}结论:{result}")
else:
res = actionFilemin(wave_data, ctnum)
LOGGER.info(f"结论分析 res:{res}")
if res == "nofault":
result = "不存在故障"
else:
result = ""
for r in res:
result += "%s : %0.0f%%;" % (r[0], r[1] * 100)
except Exception as e:
result = ""
LOGGER.error(f"actionFile error:{e}")
return result
from pot_libs.sanic_api import summary, examples
from pot_libs.common.components.query import PageRequest
from pot_libs.sanic_api import summary
from pot_libs.logger import log
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao
from unify_api.utils.request_util import filed_value_from_list
from unify_api.utils import time_format
from unify_api.modules.anshiu.components.fine_monitor_cps import (
FineMonitorChartReq, FineMonitorInfoReq, FineMonitorChartResp,
FineMonitorInfoResp, ElectricIndexListResp
FineMonitorInfoResp
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_location_by_ids, get_threshold_by_location, get_mtid_by_location_ids
get_mtid_by_location_ids
)
from unify_api.modules.anshiu.service.fine_monitor_serv import (
get_adio_chart_data, get_point_chart_data, get_adio_info_data,
get_point_info_data, electric_index_list_service
get_adio_chart_data, get_point_chart_data,
electric_index_list_service
)
......@@ -70,45 +67,6 @@ async def post_fine_monitor_chart(request,
ctnum=ctnum)
@summary("精细监测-指标统计1")
async def post_fine_monitor_info1(request,
body: FineMonitorInfoReq) -> FineMonitorInfoResp:
try:
date_start = body.start
date_end = body.end
# 获取监测点
point_id = body.pid
if not point_id or point_id <= 0:
raise Exception('point_error point_id:{}'.format(point_id))
# 获取location点
location_group = body.location_ids
if not location_group:
raise Exception('in_groups is NULL, no location_id')
except Exception as e:
log.error('get_fine_monitor_info ' + str(e))
return FineMonitorInfoResp.param_error()
# 获取location表的信息
try:
location_info = await get_location_by_ids(location_group)
except Exception as e:
log.error('get_fine_monitor_chart_error ' + e)
return FineMonitorChartResp.db_error()
info_list = []
# 环境相关数据
adio_list = await get_adio_info_data(location_group,
location_info, date_start,
date_end)
# 用电相关数据
point_list = await get_point_info_data(point_id, date_start,
date_end)
info_list.extend(adio_list)
info_list.extend(point_list)
return FineMonitorInfoResp(info_list=info_list)
@summary("精细监测-指标统计")
async def post_fine_monitor_info(request,
body: FineMonitorInfoReq) -> FineMonitorInfoResp:
......
......@@ -7,11 +7,11 @@ from unify_api.modules.anshiu.components.scope_operations_cps import \
SetScopeConfigReq, SetScopeConfigResp, \
scope_list_req_example, ScopeDetailsResp, ScopeDetailRep, \
set_scope_config_example, InitScopeConfigReq, FlushScopeEsDataReq, \
ScopeListDownloadReq, ScopeListDownloadResp
ScopeListDownloadReq, ScopeListDownloadResp, ScopeDetailResp
from unify_api.modules.anshiu.service.scope_operations_serv import \
search_scope_service, scope_detail_data, get_scope_config_serv, \
set_scope_config_serv, init_scope_config, flush_scope_es_data, \
scope_list_download_data
scope_list_download_data, scope_detail_service
@summary("识别记录-列表")
......@@ -47,7 +47,9 @@ async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp:
# 替换scope_g
if scope_g:
scope_g = ['200ms' if i == '0.2s' else i for i in scope_g]
rows, total = await search_scope_service(pids, cid, page_num, page_size,
rows, total = await search_scope_service(pids, cid,
(page_num - 1) * page_size,
page_size,
start, end, scope_g)
return ScopeListResp(rows=rows, total=total, page_num=page_num)
......@@ -65,7 +67,7 @@ async def post_scope_list_download(request,
@summary("识别记录-详情")
async def post_scope_detail(request,
async def post_scope_detail1(request,
body: ScopeDetailRep) -> ScopeDetailsResp:
'''
识别详情
......@@ -74,6 +76,37 @@ async def post_scope_detail(request,
return await scope_detail_data(doc_id)
@summary('数据统计-录波查询-录波详情')
async def post_scope_detail(req, body: ScopeDetailRep) -> ScopeDetailResp:
# 1,获取参数
event_id = body.id
# 2,获取信息
data, u_list, i_list, residual_current, power, u_slots, \
i_slots, lc_slots, power_slots = await scope_detail_service(event_id)
# 3,返回信息
return ScopeDetailResp(
name=data.get("name"),
ctnum=data.get("ctnum"),
sid=data.get("sid"),
event_datetime=data.get("event_datetime"),
duration=data.get("duration"),
message=data.get("message"),
scope_g=data.get("scope_g"),
scope_type=data.get("scope_type"),
conclusion=data.get("conclusion"),
u_list=u_list,
u_slots=u_slots,
i_list=i_list,
i_slots=i_slots,
lc_list=residual_current,
lc_slots=lc_slots,
power_list=power,
power_slots=power_slots,
)
@summary("识别设置-获取配置信息")
async def post_get_scope_config(request,
body: GetScopeConfigReq) -> GetScopeConfigResp:
......
......@@ -327,7 +327,7 @@ async def get_all_username():
async def monitor_by_mtid(mtid):
sql = "select * from monitor where mtid = %s "
async with MysqlUtil() as conn:
monitor_dic = await conn.fetchone(sql, args=(mtid, ))
monitor_dic = await conn.fetchone(sql, args=(mtid,))
return monitor_dic
......@@ -357,3 +357,11 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, (mtid,))
return result
async def sql_point_15min_index_new15(start, end, pid):
sql = f"SELECT pttl_mean, create_time FROM `point_15min_electric` " \
f"where pid=%s and create_time BETWEEN '{start}' and '{end}'"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,))
return datas
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str
from pot_libs.mysql_util.mysql_util import MysqlUtil
import json
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
point_15min_index = "poweriot_point_15min_index"
async def query_point_15min_index(date_start, date_end, point_id):
"""point点某一天96个点数据"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 100,
"_source": ["pttl_mean", "quarter_time"],
"query": {
"bool": {
"must": [
{
"term": {
"pid": point_id
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search(body=query_body, index=point_15min_index)
return es_re
async def query_point_15min_index_aggs_pid(date_start, date_end, point_list):
"""
1. 根据pid聚合
2. 根据15分钟聚合
3. 拿到hits数据
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"pid": point_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"points": {
"terms": {
"field": "pid",
"size": 1000
},
"aggs": {
"quarter_time": {
"date_histogram": {
"field": "quarter_time",
"interval": "15m",
"time_zone": "+08:00",
"format": "yyyy-MM-dd HH:mm:ss"
},
"aggs": {
"pttl_mean": {
"top_hits": {
"size": 1,
"_source": ["pttl_mean", "quarter_time"]
}
}
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body,
index=point_15min_index)
return es_re["aggregations"]["points"]["buckets"]
async def sql_point_15min_index_new15(start, end, pid):
sql = f"SELECT pttl_mean, create_time FROM `point_15min_electric` " \
f"where pid=%s and create_time BETWEEN '{start}' and '{end}'"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,))
return datas
async def company_by_cids(cids):
"""根据cids查询company信息"""
sql = "SELECT * from company where cid in %s"
async with MysqlUtil() as conn:
company_list = await conn.fetchall(sql, args=(tuple(cids),))
return company_list
async def point_by_points(point_list):
sql = "SELECT * from point where pid in %s"
async with MysqlUtil() as conn:
point_info_list = await conn.fetchall(sql, args=(tuple(point_list),))
return point_info_list
async def get_history_pids_by_mtids(cid, create_time, mtids=None):
"""
获取截止到某个日期之前安装的监测点
:param cid:
:param create_time:
:param mtids:
:return:
"""
where = ""
if mtids:
where += f" and m.mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql = f"SELECT p.pid,m.mtid from point p left join monitor m on " \
f"p.mtid=m.mtid where m.demolished = 0 and p.cid = %s and " \
f"p.create_time <= %s {where}"
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, create_time))
return result or []
async def point_by_mtids(mtids):
sql = "SELECT pid, mtid from point where mtid IN %s"
async with MysqlUtil() as conn:
point_list = await conn.fetchall(sql, args=(mtids,))
return point_list
async def points_by_cid(cids, m_type=None):
"""根据cid查询points"""
type_sql = ""
if m_type:
type_sql = f"and m.m_type = {m_type}"
sql = f"SELECT p.* FROM `monitor` m LEFT JOIN point p " \
f"on m.mtid=p.mtid WHERE p.cid in %s {type_sql}"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql, args=(tuple(cids),))
return points
async def points_monitor_by_cid(cids, m_type=None):
type_sql = ""
if m_type:
type_sql = f"and m.m_type = {m_type}"
sql = "SELECT p.pid, m.* FROM `monitor` m LEFT JOIN point p " \
f"on m.mtid=p.mtid where m.cid in %s and m.demolished=0 {type_sql}"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql, args=(tuple(cids),))
return points
async def monitor_by_cid(cid, m_type=None):
"""根据cid查询monitor"""
type_sql = ""
if m_type:
type_sql = f"and monitor.m_type = {m_type}"
sql = f"SELECT * FROM monitor WHERE cid = %s and demolished = 0 {type_sql}"
async with MysqlUtil() as conn:
monitor_list = await conn.fetchall(sql, args=(cid,))
return monitor_list
async def monitor_point_join(cid):
"""monitor和point关联"""
sql = "SELECT m.mtid, p.pid, p.name, p.add_to_company FROM monitor m " \
"inner join point p on m.mtid = p.mtid " \
"WHERE m.cid = %s and m.demolished = 0"
async with MysqlUtil() as conn:
monitor_point_list = await conn.fetchall(sql, args=(cid,))
return monitor_point_list
async def monitor_location_join(cid):
"""monitor和location关联"""
sql = "SELECT m.mtid, l.id, l.item FROM monitor m " \
"inner join location l on m.mtid = l.mtid " \
"WHERE m.cid = %s and m.demolished = 0"
async with MysqlUtil() as conn:
monitor_location_list = await conn.fetchall(sql, args=(cid,))
return monitor_location_list
async def company_model_by_cid(cid):
"""根据cid查询company_model信息"""
sql = "SELECT * from company_model where cid = %s"
async with MysqlUtil() as conn:
company_model_dic = await conn.fetchone(sql, args=(cid,))
return company_model_dic
async def inline_zdu_all_by_cid(cid):
"""根据cid查询inline_zdu信息"""
sql = "SELECT * from inline where cid = %s"
async with MysqlUtil() as conn:
inline_zdu_dic = await conn.fetchall(sql, args=(cid,))
return inline_zdu_dic
async def company_extend_by_cid(cid):
"""根据cids查询company信息"""
sql = "SELECT * from company_extend where cid = %s"
async with MysqlUtil() as conn:
company_list = await conn.fetchall(sql, args=(cid,))
return company_list
async def user_by_user_id(user_id, is_delete=None):
sql = "SELECT * FROM user where user_id = %s"
if is_delete is not None:
sql += f' AND is_delete = {is_delete} '
async with MysqlUtil() as conn:
user_dic = await conn.fetchone(sql=sql, args=(user_id,))
return user_dic
async def user_by_phone_number(phone, is_delete=True):
sql = "SELECT * FROM user where phone_number = %s"
if is_delete:
sql += " and is_delete=0"
else:
sql += " order by is_delete asc"
async with MysqlUtil() as conn:
user_dic = await conn.fetchone(sql=sql, args=(phone,))
return user_dic
async def get_user_list_by_cid_auth(cid, auth, symbol="="):
"""
根据公司权限 获取用户列表
:param cid:
:param auth: 权限
:param symbol: 符号j
:return:
"""
sql = f"""
select distinct(u.user_id) user_id from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid = %s and u.auth {symbol} %s
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, auth))
return [user.get("user_id") for user in result] if result else []
async def get_user_info_list_by_cid_auth(cid, auth, symbol="="):
"""
根据公司权限 获取用户列表 有可能会有重复的user_info
:param cid:
:param auth: 权限
:param symbol: 比较符号 >, <, =......
:return:
"""
sql = f"""
select u.user_id, u.name from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid = %s and u.auth {symbol} %s
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, auth))
return result if result else []
async def get_common_type(cid):
"""获取不同设备对象的类型type_id"""
sql = "select DISTINCT m_type from monitor where cid = %s"
async with MysqlUtil() as coon:
common_type = await coon.fetchall(sql, args=(cid,))
common_type_list = []
for comm_type in common_type:
common_type_list.append(comm_type.get('m_type'))
return common_type_list
async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql = f"select {fields} from {table_name} where mtid = %s"
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, (mtid,))
return result
async def get_point_monitor_dao(id_value, field="m.mtid"):
sql = f"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc," \
f"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \
f"where m.demolished = 0 and {field}=%s;"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(id_value,))
return data
async def get_location_monitor_dao(lid):
sql = "SELECT l.lid,l.ad_field,m.sid FROM `location` l " \
"INNER JOIN monitor m on l.mtid=m.mtid where l.lid=%s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(lid,))
return data
async def get_monitor_status(topic, mtids, db="bromake", default_status=None):
"""
从redis获取设备状态
:param default_status: 当获取不到设备状态时的默认状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list = [f"status:{topic}:{db}:{mtid}" for mtid in mtids]
status_list_tmp = await RedisUtils().mget(key_list)
status_list = []
for status in status_list_tmp:
if isinstance(status, str):
status = json.loads(status)
if status is None:
status = default_status
status_list.append(status)
return dict(zip(mtids, status_list))
async def get_monitor_online_count(topic, mtids, db="bromake"):
"""
从redis获取设备状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list = [f"status:{topic}:{db}:{mtid}" for mtid in mtids]
status_list_tmp = await RedisUtils().mget(key_list)
online_count = 0
for status in status_list_tmp:
if status is None:
continue
online_count += 1
return online_count
async def get_tc_runtime(inline_ids):
sql = "SELECT inlid, name, tc_runtime FROM `inline` where inlid in %s;"
async with MysqlUtil() as conn:
tc_runtimes = await conn.fetchall(sql, args=(inline_ids,))
return tc_runtimes or []
async def get_pid_by_mtid(mtid):
sql = "select pid from point where mtid = %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(mtid,))
return data
async def get_point_install_date_by_cid(cid):
sql = "select min(create_time) install_time from point where cid = %s"
async with MysqlUtil() as conn:
data = await conn.fetch_value(sql, args=(cid,))
return data
# from unify_api.modules.common.dao.common_es_dao import \
# query_point_15min_index_aggs_pid
# from unify_api.modules.elec_charge.dao.elec_charge_dao import \
# query_charge_aggs_points
# from unify_api.utils.time_format import start_end_date
......
from unify_api.constants import SLOTS_15MIN, DUST_STATE
from unify_api.modules.common.dao.common_dao import storey_pl_by_cid, \
storey_wp_by_cid
from unify_api.modules.common.dao.common_es_dao import \
query_point_15min_index, sql_point_15min_index_new15
from unify_api.modules.common.dao.common_dao import \
sql_point_15min_index_new15
from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.tsp_water.components.drop_dust_cps import DdwResp, \
DdResp, IrmResp, IosResp, ItiResp, WsStatiResp
......@@ -108,7 +108,6 @@ async def post_drop_dust_wave_service(point_id, start, end):
"""降尘措施-雾炮-运行曲线"""
# 1. 获取聚合信息
slots_list = SLOTS_15MIN
# es_re = await query_point_15min_index(start, end, point_id)
sql_re = await sql_point_15min_index_new15(start, end, point_id)
if not sql_re:
return DdwResp(slots=[], value=[])
......
from pot_libs.es_util.es_utils import EsUtil
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao, \
get_sid_by_mtid_dao, get_mtids_by_pids_dao
from unify_api.utils.time_format import convert_es_str
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE
......@@ -72,79 +74,76 @@ async def query_location_1min_index(l_database, date_start, date_end,
async def get_search_scope(cid, pid, start, end):
where_list = [f"pid = {pid}"]
if cid:
where_list.append(f"cid in %s")
if start and end:
where_list.append(f"event_datetime >= '{start}' and event_datetime <='{end}' ")
where_list.append(
f"event_datetime >= '{start}' and event_datetime <='{end}' ")
where_str = " and ".join(where_list)
sql = f"select * from point_1min_event where {where_str} " \
f"ORDER BY event_datetime desc limit 5000"
async with MysqlUtil() as conn:
if cid:
data = await conn.fetchall(sql, args=(cid, ))
data = await conn.fetchall(sql, args=(cid,))
else:
data = await conn.fetchall(sql)
return data
async def query_search_scope(cid, pid, page_num, page_size,
start, end):
query_body = {
"from": (page_num - 1) * page_size,
"size": page_size,
"query": {
"bool": {
"must": [
{
"term": {
"mode.keyword": "scope"
}
}
]
}
},
"sort": [
{
"datetime": {
"order": "desc"
}
}
]
}
if start and end:
start_es = convert_es_str(start)
end_es = convert_es_str(end)
query_body["query"]["bool"]["must"].append(
{
"range": {
"datetime": {
"gte": start_es,
"lte": end_es
}
}
}
)
start_time, end_time, scope_g):
"""
查询录波列表
"""
if len(pid) > 1:
mtid = await get_mtids_by_pids_dao(pid)
else:
mtid = await get_mtid_by_pid_dao(pid)
where = ""
if cid:
query_body["query"]["bool"]["must"].append(
{
"terms": {
"cid": cid
}
}
)
if pid:
query_body["query"]["bool"]["must"].append(
{
"term": {
"point_id": pid
}
}
)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
return es_re
where += f" and pe.cid={cid} "
if start_time:
where += f" and pt.event_datetime >= '{start_time}' "
if end_time:
where += f" and pe.create_time <= '{end_time}' "
if mtid:
if len(mtid) == 1:
where += f" and pe.mtid = {mtid['mtid']} "
else:
where += f" and pe.mtid in {tuple(mtid)} "
if scope_g:
if len(scope_g) == 1:
where += f" AND pe.scope_g = {scope_g[0]} "
else:
where += f" AND pe.scope_g in {tuple(scope_g)} "
sql = f"""
SELECT
pt.event_id doc_id,
DATE_FORMAT(pt.event_datetime, '%Y-%m-%d %H:%i:%s') check_dt,
pt.`name` point,
pt.message message,
pe.scope_g scope_type
FROM
point_1min_event pt
LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid
AND pe.create_time = pt.event_datetime
WHERE
pt.event_mode = 'scope'
{where}
ORDER BY
pe.create_time DESC
LIMIT {page_num} , {page_size} """
async with MysqlUtil() as conn:
data = await conn.fetchall(sql, )
return data
async def get_scope_pids(pids, start, end):
......
......@@ -20,7 +20,7 @@ async def post_search_scope(req, body: SearchScopeRep) -> SearchScopeResq:
@summary('录波查询-详情')
async def post_scope_detail(req, body: ScopeDetailRep) -> SearchScopeResq:
async def post_scope_detai1(req, body: ScopeDetailRep) -> SearchScopeResq:
id = body.id
wave_range = body.wave_range
return await scope_detail_service(id, wave_range)
......
import traceback
import re
from pot_libs.logger import Logger
from pot_libs.settings import SETTING
class CLog(object):
def info(self, msg, save_file=None,
group_name=SETTING.mysql_db): # group_name组名一般为项目名,日志存放在此文件夹下
if not save_file:
save_file = get_save_file()
log_obj = Logger.get_logger(save_file, group_name)
log_obj.info(msg, stacklevel=2)
def error(self, msg, save_file=None, group_name=SETTING.mysql_db):
if not save_file:
save_file = get_save_file()
log_obj = Logger.get_logger(save_file, group_name)
log_obj.error(msg, stacklevel=2)
def warning(self, msg, save_file=None, group_name=SETTING.mysql_db):
if not save_file:
save_file = get_save_file()
log_obj = Logger.get_logger(save_file, group_name)
log_obj.warning(msg, stacklevel=2)
def get_save_file():
"""获取日志存储文件路径,以model_name作为存储文件名"""
stack_list = traceback.extract_stack()
filename = stack_list[-3].filename # 调用位置路径
re_str = f"(.*?){SETTING.mysql_db}/{SETTING.mysql_db}(.*?$)"
res_obj = re.search(re_str, filename)
if not res_obj:
return "default"
tmp_dir = res_obj.group(2)
file_name_list = tmp_dir.split("/")
if not file_name_list or len(file_name_list) == 1:
return "default"
save_file = file_name_list[1] # save_file = modules,script,services,tests,utils
if save_file != "modules":
return save_file.split(".")[0] # 防止出现save_file=xxx.py的情况
return file_name_list[2].split(".")[0]
LOGGER = CLog()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment