Commit 7fb7f69a authored by wang.wenrong's avatar wang.wenrong

anshiU

parent 6aff3dca
from dataclasses import dataclass
from pot_libs.sanic_api import Model
from pot_libs.common.components.fields import Cid
from pot_libs.sanic_api.column import Opt, Float, Int, Str, List, Enum
from unify_api.utils.response_code import DbErr, ParamErr, JwtErr, UserErr
@dataclass
class EquipManagementTotalReq(Model):
'''
设备管理汇总-请求格式
'''
cid: Cid
@dataclass
class EquipManagementListReq(Model):
'''
设备管理列表-请求格式
'''
cid: Cid
is_download: int = Int("是否是下载 0-不是 1-是").eg(0)
page_size: int = Opt(Int("页大小").eg(10))
page_num: int = Opt(Int("页码").eg(1))
@dataclass
class EquipRunStatusReq(Model):
'''
运行统计状态
'''
point_id: int = Opt(Int("监测点").eg(260))
@dataclass
class EquipManagementTotalResp(Model):
'''
设备管理汇总-返回格式
'''
installed_number: int = Int("安装点数").eg(1)
start_time: str = Str("启用时间").eg("2022-06-16")
@dataclass
class EquipManagementInfo(Model, DbErr, UserErr, JwtErr):
'''
设备管理列表详情-返回格式
'''
installed_location: str = Opt(Str("安装位置").eg("3#变压器"))
device_number: str = Opt(Str("设备编号").eg("A1904000083"))
wiring_type: str = Opt(Str("接线形式").eg("三表法"))
ct_change: str = Opt(Int("ct变比").eg(1000))
pt_change: str = Opt(Int("pt变比").eg(1000))
rated_voltage: int = Opt(Int("额定电压").eg(400))
start_time: str = Str("接线时间").eg("2022-06-17 16:42")
@dataclass
class EquipManagementListResp(Model, DbErr, UserErr, JwtErr):
'''
设备管理列表-返回格式
'''
rows: list = List("设备信息").items(EquipManagementInfo)
total: int = Int("总量")
page_num: int = Int("当前页").eg(1)
@dataclass
class EquipRunReq(Model):
'''
运行统计-请求格式
'''
cid: Cid
page_size: int = Opt(Int("每页记录数").eg(20))
page_num: int = Opt(Int("当前页码").eg(1))
is_download: int = Int("是否是下载 0-不是 1-是").eg(0)
start: str = Opt(Str("开始时间").eg("2020-05-01 00:00:00"))
end: str = Opt(Str("结束时间").eg("2020-05-01 23:59:59"))
point_ids: list = Opt(List("监测点 []表示没有 [-1]表示全部").eg([260, 261, 268]))
sort_field: list = Enum('排序字段').of('point_name', 'start_time', 'end_time',
'run_time')
sort_type: list = Enum('排序方向').of('asc', 'desc')
@classmethod
def example(cls):
return {
"cid": 154,
"point_ids": [
182,
421,
422,
423,
1752,
1753,
1754
],
"start": "2022-06-28 00:00:00",
"end": "2022-06-28 23:59:59",
"is_download": 0,
"page_num": 1,
"page_size": 20,
"sort_field": "start_time",
"sort_type": "desc"
}
@dataclass
class EquipRunInfo(Model, DbErr, UserErr, JwtErr):
'''
运行统计列表详情-返回格式
'''
point_name: str = Opt(Str("监测点").eg("3#变压器"))
start_time: str = Str("开启时间").eg("2022-06-17 16:42")
end_time: str = Str("关闭时间").eg("2022-06-21 16:42")
run_time: str = Str("运行时长").eg("1小时20分")
@dataclass
class EquipRunListResp(Model, DbErr, UserErr, ParamErr):
'''
运行统计-返回格式
'''
rows: list = List("运行信息").items(EquipRunInfo)
total: int = Int("总量")
page_num: int = Int("当前页").eg(1)
@dataclass
class EquipRunStatisticsReq(Model):
'''
运行统计数据-请求格式
'''
cid: Cid
start: str = Opt(Str("开始时间").eg("2020-05-01 00:00:00"))
end: str = Opt(Str("结束时间").eg("2020-05-01 23:59:59"))
point_ids: list = Opt(List("监测点 []表示没有 [-1]表示全部").eg([260, 261, 268]))
@classmethod
def example(cls):
return {
"cid": 84,
"point_ids": [
261,
262,
266,
268
],
"start": "2021-09-06 15:04:51",
"end": "2021-12-06 15:04:51",
}
@dataclass
class EquipRunStatisticsResp(Model, ParamErr):
'''
运行统计数据-返回格式
'''
count: int = Int("运行次数").eg(23)
run_all_time: str = Str("运行时长").eg("11小时21分")
run_avg_time: str = Str("平均时长").eg("1小时05分")
run_max_time: str = Str("最长时长").eg("2小时23分")
@dataclass
class EquipRunStatusResp(Model, ParamErr):
'''
运行统计状态-返回格式
'''
is_run: int = Int("是否运行:0-否、1-是 2-非动力设备").eg(1)
from dataclasses import dataclass
from pot_libs.sanic_api import Model
from pot_libs.sanic_api.column import List, Str, Int, Opt, Float
from pot_libs.common.components.fields import Cid, Item, DateTime
from unify_api.utils.response_code import DbErr, ParamErr
@dataclass
class FineMonitorChartReq(Model):
'''
精确监测-图表请求
'''
pid: int = Int("监测点id").eg(261)
start: str = Str("开始时间").eg("2022-06-20 00:00:00")
end: str = Str("结束时间").eg("2022-06-20 23:59:59")
location_ids: list = List("location_ids").eg([487, 488, 489, 490, 491])
@dataclass
class FineMonitorInfoReq(Model):
'''
精确监测-指标统计请求
'''
pid: int = Int("监测点id").eg(261)
start: str = Str("开始时间").eg("2022-06-20 00:00:00")
end: str = Str("结束时间").eg("2022-06-20 23:59:59")
location_ids: list = List("location_ids").eg([487, 488, 489, 490, 491])
@dataclass
class Chart(Model, DbErr):
'''图表格式'''
item: str = Opt(Str('类型').eg('A相'))
value_slots: list = Opt(List('数据列表').items(Float()))
@dataclass
class Statistics(Model, DbErr):
type: str = Str('指标类型').eg('temperature')
item: Item = Item
max: float = Float("最大值")
max_time: DateTime = DateTime
min: float = Float("最小值")
min_time: DateTime = DateTime
avg: float = Float('平均值')
@dataclass
class FineMonitorChartResp(Model, DbErr, ParamErr):
'''
精确监测-图表返回
'''
time_slots: list = List("横坐标时序").items(Float())
residual_current: list = List("漏电流").items(Chart)
temperature: list = List("温度").items(Chart)
power: list = List("功率(p表示有功,q表示无功)").items(Chart)
i: list = List("电流").items(Chart)
v: list = List("电压").items(Chart)
ctnum: int = Int("接线方式:包含两表法、三表法").eg(3)
@classmethod
def example(cls):
return {
"time_slots": ["00:00", "00:15"],
"ctnum": 3,
"residual_current": [{
"item": "漏电流",
"value_slots": [3.26, 3.31]
}],
"temperature": [{
"item": "A相",
"value_slots": [27.48, 27.43]
}, {
"item": "B相",
"value_slots": [28.04, 28.03]
}],
"power": [{
"item": "p",
"value_slots": []
}],
"i": [{
"item": "ia",
"value_slots": [12, 34]
}],
"v": [{
"item": "ua",
"value_slots": []
}]
}
@dataclass
class FineMonitorInfoResp(Model, DbErr, ParamErr):
'''
精确监测-指标统计返回
'''
info_list: list = List("指标统计列表").items(Statistics)
from dataclasses import dataclass
from pot_libs.common.components.fields import Cid, Item
from pot_libs.sanic_api import Model
from pot_libs.sanic_api.column import Int, List, Opt, Str, Float, Dict, Enum
from unify_api.utils.response_code import DbErr, ServerErr, ParamErr
@dataclass
class ScopeListReq(Model):
'''
录波识别记录-请求格式
'''
cid: Cid
page_size: int = Opt(Int("每页记录数").eg(20))
page_num: int = Opt(Int("当前页码").eg(1))
is_download: int = Int("是否是下载 0-不是 1-是").eg(0)
start: str = Opt(Str("开始时间").eg("2022-06-22 00:00:00"))
end: str = Opt(Str("结束时间").eg("2022-06-22 23:59:59"))
scope_g: str = Opt(List("录波颗粒度").eg(["2s"]))
pids: list = Opt(List("检测点 []表示没有 [-1]表示全部").eg([260, 261, 268]))
@dataclass
class ScopeListDownloadReq(Model):
'''
2s录波识记录下载-请求格式
'''
start: str = Opt(Str("开始时间").eg("2022-06-22 00:00:00"))
end: str = Opt(Str("结束时间").eg("2022-06-22 23:59:59"))
pids: list = Opt(List("监测点 []表示没有 [-1]表示全部").eg([260, 261, 268]))
@dataclass
class ScopeListItem(Model):
'''
录播识别记录-每一条数据
'''
check_dt: str = Str("触发时间").eg("2021-07-01 23:59:59")
point: str = Str('监测点名称').eg('土建区总进线')
message: str = Str('触发原因').eg('B相电压波动')
scope_type: int = Opt(Int('录波颗粒度').eg('0.25ms'))
doc_id: str = Opt(Str('该条信息id').eg('213_over_gap_i__1604302769'))
@dataclass
class ScopeListResp(Model):
'''
录波识别记录-返回格式
'''
total: int = Opt(Int("总条数").eg(20))
rows: list = Opt(List("总数据").items(ScopeListItem))
page_num: int = Int("当前页").eg(1)
@dataclass
class ScopeItemDownload(Model):
"""
录波下载数据详情
"""
datetime: str = Str("时间").eg("2021-07-01 23:59:59")
ua: float = Float('A相电压')
ub: float = Float('B相电压')
uc: float = Float('C相电压')
ia: float = Float('A相电流')
ib: float = Float('B相电流')
ic: float = Float('C相电流')
pttl: float = Float('功率')
lc: float = Float('漏电流')
@dataclass
class ScopeListDownloadResp(Model):
"""
录波下载数据
"""
rows: list = Opt(List("数据列表").items(ScopeItemDownload))
@dataclass
class ScopeContent(Model):
'''
识别详情图表-返回格式
'''
item: Item
value_datas: list = List().items(Float())
@dataclass
class ScopeDetailRep(Model):
'''
识别详情-请求格式
'''
id: str = Opt(Str("doc_id").eg("423_over_gap_i__1655991008"))
@classmethod
def example(cls):
return {
"id": "423_over_gap_i__1655991008"
}
@dataclass
class ScopeDetailsResp(Model, DbErr):
'''
识别详情-返回格式
'''
point: str = Str('监测点名称').eg('土建区总进线')
contin_time: int = Opt(Int('持续时间').eg(2))
scope_type: str = Str('录波颗粒度').eg('0.25ms')
check_dt: str = Str("发生时间").eg("2021-07-01 23:59:59")
ctnum: str = Str("接线法 2-两表法 3-三表法").eg(3)
location: int = Opt(Int('位置').eg(240))
item: str = Item
type: str = Opt(Str().eg('电流波动'))
i: list = List("电流事件录波").items(ScopeContent)
v: list = List("电压事件录波").items(ScopeContent)
residual_current: list = Opt(List("2s录波漏电流").items(ScopeContent))
p: list = Opt(List("2s录波功率").items(ScopeContent))
@classmethod
def example(cls):
return {
"point": "钻孔配电柜",
"contin_time": "400",
"check_dt": "2022-06-23 21:30:08",
"scope_type": "0.25ms",
"item": "A相",
"type": "电流波动",
"i": [{
"item": "ia",
"value_datas": [1.25, -0.34]
}],
"v": [{
"item": "ua",
"value_datas": [86.06, 123.34]
}],
"residual_current": [{
"item": "漏电流",
"value_datas": [86.06, 123.34]
}],
"power": [{
"item": "p",
"value_datas": [86.06, 123.34]
}],
}
@dataclass
class GetScopeConfigReq(Model):
'''
获取录播配置-请求格式
'''
pid: int = Int("监测点id").eg(1754)
@dataclass
class SetScopeConfigReq(Model):
'''
设置录播配置-请求格式
'''
pid: int = Int("监测点id").eg(20)
scope_type: str = Enum("录波颗粒度").of('0.25ms', '0.2s', '2s')
type: str = Enum("提交类型").of("state", "i", "v", "residual_current", "power",
"time")
state: int = Opt(Enum("开启关闭状态").of(0, 1))
umax: int = Opt(Int("电压上限").eg(430))
umin: int = Opt(Int("电压下限").eg(360))
ugap: int = Opt(Int("电压波动阈值").eg(50))
imax: int = Opt(Int("电流上限").eg(40))
igap: int = Opt(Int("电流波动阈值").eg(5))
lcmax: int = Opt(Int("漏电流上限").eg(40))
lcgap: int = Opt(Int("漏电流波动阈值").eg(40))
pttlmax: int = Opt(Int("功率上限").eg(5))
pttlgap: int = Opt(Int("功率波动阈值").eg(40))
one_time: list = Opt(List("第一段时间").items(Str()))
two_time: list = Opt(List("第二段时间").items(Str()))
three_time: list = Opt(List("第三段时间").items(Str()))
@dataclass
class GetScopeConfigResp(Model, ServerErr):
'''
获取录播配置-返回格式
'''
pid: int = Int("监测点id").eg(20)
rows: list = List("配置内容").items(SetScopeConfigReq)
@classmethod
def example(cls):
return {
"pid": 1754,
"rows": {
"0.25ms": {
"state": 1,
"configs": {
"umin": {
"name": "越下限阈值",
"type": "v",
"value": 85
},
"umax": {
"name": "越上限阈值",
"type": "v",
"value": 115
},
"ugap": {
"name": "波动阈值",
"type": "v",
"value": 25
},
"imax": {
"name": "越限阈值",
"type": "i",
"value": 5
},
"igap": {
"name": "波动阈值",
"type": "i",
"value": 20
}
}
},
"0.2s": {
"state": 1,
"configs": {
"umin": {
"name": "越下限阈值",
"type": "v",
"value": 187
},
"umax": {
"name": "越上限阈值",
"type": "v",
"value": 253
},
"ugap": {
"name": "波动阈值",
"type": "v",
"value": 50
},
"imax": {
"name": "越限阈值",
"type": "i",
"value": 5
},
"igap": {
"name": "波动阈值",
"type": "i",
"value": 1
}
}
},
"2s": {
"state": 1,
"configs": {
"umin": {
"name": "越下限阈值",
"type": "v",
"value": 187
},
"umax": {
"name": "越上限阈值",
"type": "v",
"value": 253
},
"ugap": {
"name": "波动阈值",
"type": "v",
"value": 50
},
"imax": {
"name": "越限阈值",
"type": "i",
"value": 5
},
"igap": {
"name": "波动阈值",
"type": "i",
"value": 1
},
"lcmax": {
"name": "越限阈值",
"type": "residual_current",
"value": 1
},
"lcgap": {
"name": "波动阈值",
"type": "residual_current",
"value": 1
},
"pttlmax": {
"name": "越限阈值",
"type": "power",
"value": 5
},
"pttlgap": {
"name": "波动阈值",
"type": "power",
"value": 2
},
"one_time": {
"name": "第一个时间段",
"type": "time",
"value": [
"08:00",
"08:30"
]
},
"two_time": {
"name": "第二个时间段",
"type": "time",
"value": [
"09:00",
"10:00"
]
},
"three_time": {
"name": "第三个时间段",
"type": "time",
"value": [
"14:00",
"15:00"
]
}
}
}
}
}
@dataclass
class InitScopeConfigReq(Model):
'''
初始化配置信息--请求
'''
pids: list = List('pids')
@classmethod
def example(cls):
return {
'pids': [238, 240, 242, 330, 343, 749, 1463, 2248]
}
@dataclass
class FlushScopeEsDataReq(Model):
'''
刷新es录波数据--请求
'''
scope_type_list: list = List('录波类型')
start_time: str = Str('开始时间')
end_time: str = Str('结束时间')
@classmethod
def example(cls):
return {
'scope_type_list': ['200ms', '2s', '0.25ms'],
'start_time': '2022-07-04 17:40:24',
'end_time': '2022-07-21 18:40:24',
}
@dataclass
class GetScopeConfigList(Model):
'''
获取录播配置列表
'''
state: int = Enum("状态 0-关 1-开").of(0, 1)
configs: list = List("返回配置")
@dataclass
class SetScopeConfigResp(Model):
'''
设置录播配置-返回格式
'''
success: int = Int('是否操作成功 0-否 1-是').eg(1)
message: str = Str("返回信息").eg("操作成功")
# 识别记录列表请求举例
scope_list_req_example = {
"范例1": {
"cid": 114,
"page_size": 20,
"page_num": 1,
"is_download": 0,
"start": "2022-06-22 00:00:00",
"end": "2022-06-22 23:59:59",
"pids": []
},
"范例2": {
"cid": 44,
"page_size": 20,
"page_num": 1,
"is_download": 0,
"start": "2022-06-22 00:00:00",
"end": "2022-06-22 23:59:59",
"pids": [260, 261, 268]
}
}
'''
设置录波配置格式
'''
set_scope_config_example = {
"0.25ms设置电压范例": {
"pid": 1754,
"scope_type": "0.25ms",
"type": "v",
"umax": 430,
"umin": 360,
"ugap": 50,
},
"0.25ms设置状态范例": {
"pid": 1754,
"scope_type": "0.25ms",
"type": "state",
"state": 1,
},
"0.2s设置电流范例": {
"pid": 1754,
"scope_type": "0.2s",
"type": "i",
"imax": 40,
"igap": 5,
},
"2s设置漏电流范例": {
"pid": 1754,
"scope_type": "2s",
"type": "residual_current",
"lcmax": 30,
"lcgap": 12
},
"2s设置功率范例": {
"pid": 1754,
"scope_type": "2s",
"type": "power",
"pttlmax": 5,
"pttlgap": 40,
},
"2s设置时间段范例": {
"pid": 1754,
"scope_type": "2s",
"type": "time",
"one_time": ['08:00', '09:00'],
"two_time": ['10:00', '10:30'],
"three_time": ['13:00', '14:00'],
},
}
'''
初始化录播的配置
'''
init_scope_config_example = {
"coef": {
"Kua": 2.843364,
"Bua": 2070.924316,
"Kub": 2.849909,
"Bub": 2058.210205,
"Kuc": 2.854909,
"Buc": 2054.158447,
"Kia": 9.65814,
"Bia": 2056.642578,
"Kib": 9.652307,
"Bib": 2054.431152,
"Kic": 9.65714,
"Bic": 2056.80249
},
"threshold": {
"0.25ms": {
"en_scope": 0,
"umin": 187,
"umax": 253,
"ugap": 11.5,
"igap": 8,
"imax": 167,
},
"0.2s": {
"en_wave_200ms": 0,
"umin": 187,
"umax": 253,
"ugap": 11.5,
"igap": 8,
"imax": 167,
"scop_limit": 50
},
"2s": {
"en_electric_2s": 0,
"start_time_I": "11:00:00",
"stop_time_I": "12:00:00",
"start_time_II": "14:00:00",
"stop_time_II": "15:00:00",
"start_time_III": "15:00:00",
"stop_time_III": "16:00:00",
"umin": 187,
"umax": 253,
"ugap": 11.5,
"igap": 15,
"imax": 167,
"lcmax": 30,
"lcgap": 12,
"pttlgap": 10,
"pttlmax": 110
}
}
}
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
from unify_api.modules.product_info.procedures.hardware_pds import (
get_user_hardware_info, hardware_statistics)
async def check_company_exist(company_id):
'''
判断工厂是否存在
'''
raw_sql = "select count(*) as company_count from company where cid = %s"
async with MysqlUtil() as conn:
company_count = await conn.fetchone(sql=raw_sql, args=(company_id))
return company_count.get('company_count') > 0
async def equip_management_list(company_id, page_num, page_size):
'''
获取设备管理的监测点列表,先保留老的写法,后面1.0改版的时候统一改
'''
datas = await get_user_hardware_info(company_id, page_num, page_size)
return_fields = (
"installed_location", "device_number", "wiring_type", "ct_change",
"pt_change", "rated_voltage", "start_time")
return_datas = []
for data in datas.get('rows'):
return_one = {}
for return_field in return_fields:
return_one[return_field] = data.get(return_field)
return_datas.append(return_one)
datas['rows'] = return_datas
return datas
async def equip_management_total(company_id):
'''
获取设备管理的汇总信息
'''
datas = await hardware_statistics(company_id)
return datas
async def equip_run_list(company_id, point_ids, start_time, end_time,
page_num, page_size, sort_field, sort_type):
'''
获取设备运行记录
'''
async with MysqlUtil() as conn:
raw_sql = "SELECT {} from scope_equip_run_record s " \
"left join (select pid,max(id) max_id " \
"from scope_equip_run_record group by pid) sp " \
"on s.pid = sp.pid " \
"left join point p on s.pid=p.pid " \
"left join monitor_reuse r on p.mtid = r.mtid " \
"where " \
"(p.cid=%s or r.cid = %s) and s.start_time " \
"BETWEEN %s and %s and " \
"(s.end_time > 0 or (s.end_time = 0 and s.id = sp.max_id)) "
if point_ids:
raw_sql += " and s.pid in %s"
args = (
company_id,
company_id,
start_time,
end_time,
tuple(point_ids)
)
else:
args = (
company_id,
company_id,
start_time,
end_time,
)
# 先总数
count_sql = raw_sql.format("count(*) as run_count", "")
count_result = await conn.fetchone(sql=count_sql, args=args)
list_result = []
if count_result.get("run_count", 0) > 0:
# 排序字段处理
if sort_field == 'point_name':
sort_field = 'p.name'
elif sort_field == 'run_time':
sort_field = '(s.end_time-s.start_time)'
# 再分页列表
raw_sql = raw_sql.format(
"s.pid,p.name point_name,s.start_time,s.end_time",
)
raw_sql += " order by {} {} LIMIT %s OFFSET %s".format(sort_field,
sort_type)
if point_ids:
args = (company_id,
company_id,
start_time,
end_time,
tuple(point_ids),
page_size,
(page_num - 1) * page_size
)
else:
args = (company_id,
company_id,
start_time,
end_time,
page_size,
(page_num - 1) * page_size
)
list_result = await conn.fetchall(sql=raw_sql,
args=args)
return list_result, count_result.get("run_count", 0)
async def equip_run_statistics(company_id, point_ids, start_time, end_time):
'''
获取运行统计数据
'''
dura_time = "case when end_time > 0 then end_time-start_time else 0 end"
async with MysqlUtil() as conn:
count_sql = f"SELECT count(*) as total_count," \
f"avg({dura_time}) as avg_time," \
f"sum({dura_time}) as all_time," \
f"max({dura_time}) as max_time " \
"from scope_equip_run_record s " \
"left join (select pid,max(id) max_id from " \
"scope_equip_run_record group by pid) sp " \
"on s.pid = sp.pid " \
"left join point p on s.pid=p.pid " \
"left join monitor_reuse r on p.mtid = r.mtid " \
"where (p.cid=%s or r.cid = %s) " \
"and s.start_time BETWEEN %s and %s and (s.end_time > 0 " \
"or (s.end_time = 0 and s.id = sp.max_id)) "
if point_ids:
count_sql += " and s.pid in %s"
args = (
company_id,
company_id,
start_time,
end_time,
tuple(point_ids)
)
else:
args = (
company_id,
company_id,
start_time,
end_time,
)
count_result = await conn.fetchone(sql=count_sql, args=args)
return count_result
async def get_equip_run_status(point_id):
'''
获取当前设备是否正在运行
'''
async with MysqlUtil() as conn:
# 是否非动力设备
power_equip_sql = "select is_power_equipment from monitor m " \
"left join point p on m.mtid = p.mtid " \
"where p.pid = %s"
power_equip_result = await conn.fetchone(sql=power_equip_sql,
args=(point_id,))
if power_equip_result.get("is_power_equipment", 0) == 0:
return 2
raw_sql = "select count(*) run_count from scope_equip_run_record s " \
"left join (select pid,max(id) max_id from " \
"scope_equip_run_record group by pid) sp " \
"on s.pid = sp.pid " \
"WHERE s.pid= %s and start_time < unix_timestamp(NOW()) " \
"and (end_time > unix_timestamp(NOW()) or " \
"(end_time=0 and id=max_id)) "
result = await conn.fetchone(sql=raw_sql, args=(point_id,))
return 1 if result.get("run_count") > 0 else 0
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.es_util.es_utils import EsUtil
from unify_api import constants
async def get_location_by_ids(location_ids):
'''
根据location_id获取各项温度信息
'''
async with MysqlUtil() as conn:
sql = "select id,item,type from location where id in %s"
result = await conn.fetchall(sql, args=(tuple(location_ids),))
location_info = {res.get('id'): res for res in result}
return location_info
async def get_threshold_by_location(location_ids, type='overResidualCurrent',
default_threshold=30):
'''
根据location_id获取阈值
'''
async with MysqlUtil() as conn:
sql = "select threshold from alarm_setting where location_id in %s " \
"and type = %s limit 1 "
settings = await conn.fetchall(sql, args=(tuple(location_ids), type))
if settings:
return settings[0]["threshold"]
return default_threshold
async def get_es_aiao_15min_data(query_body):
'''
从es中获取环境相关数据(15min)
'''
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body,
index=constants.LOCATION_15MIN_AIAO)
return es_results.get("aggregations", {})
async def get_es_point_15min_data(query_body):
'''
从es中获取电力相关数据(15min)
'''
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body,
index=constants.POINT_15MIN_INDEX)
return es_results.get("aggregations", {})
async def get_es_aiao_1min_data(query_body, start):
'''
从es中获取环境相关数据(1min)
'''
async with EsUtil() as es:
# 环境相关qmin的数据需要分表查询
p_database = "poweriot_location_1min_aiao_" + start[:4] + "_" + \
str(int(start[5:7]))
es_results = await es.search_origin(body=query_body,
index=p_database)
return es_results.get("hits", {}).get("hits", {})
async def get_es_point_1min_data(query_body, start):
'''
从es中获取电气相关数据(1min)
'''
async with EsUtil() as es:
# 电气相关数据1min的需要分表查询
p_database = "poweriot_point_1min_index_" + start[:4] + "_" + \
str(int(start[5:7]))
es_results = await es.search_origin(body=query_body,
index=p_database)
return es_results.get("hits", {}).get("hits", {})
import json
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
async def get_scope_config_by_pid(pid):
'''
获取某个设备的录波配置
'''
async with MysqlUtil() as conn:
sql = "select threshold from scope_config_record where pid = %s"
result = await conn.fetchone(sql=sql, args=(pid,))
return result.get('threshold', {}) if result else {}
async def set_scope_config_by_pid(pid, config):
'''
设置录波配置
'''
try:
async with MysqlUtil() as conn:
sql = "update scope_config_record set threshold=%s where pid=%s"
result = await conn.execute(sql=sql, args=(config, pid))
except Exception as e:
log.error('set_scope_config_by_pid update error:' + str(e))
return -1
return result
async def add_scope_config_by_pid(pid, configs):
'''
增加录波配置
'''
try:
async with MysqlUtil() as conn:
sql = "INSERT INTO `power_iot`.`scope_config_record` (`pid`, " \
"`coef`, `threshold`, `vol_cur`) VALUES (%s, %s, %s, %s)"
result = await conn.execute(sql=sql, args=(pid,
json.dumps(
configs["coef"]),
json.dumps(configs[
"threshold"]),
json.dumps(configs)))
except Exception as e:
log.error('add_scope_config_by_pid add error:' + str(e))
return -1
return result
async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"):
"""
ES查询2s录波记录数据
"""
query_body = {
"size": 10000,
"query": {
"bool": {
"must": [
{"terms": {"point_id": pids}},
{"term": {"scope_g": {"value": scope_g}}},
{"range": {"datetime": {"gte": start_dt, "lt": end_dt}}}
]
}
},
"sort": [{"datetime": {"order": "asc"}}]
}
async with EsUtil() as es:
datas = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_SCOPE)
return datas["hits"]["hits"]
\ No newline at end of file
from unify_api.modules.anshiu.procedures.equip_management_pds import \
equip_run_list, equip_run_statistics
from unify_api.modules.anshiu.components.equip_management_cps import \
EquipRunInfo
from unify_api.utils.time_format import get_time_duration, get_datetime_str, \
get_date_timestamp, get_time_duration_by_str
async def equip_run_list_serv(company_id, point_ids, start_time, end_time,
page_num, page_size, sort_field, sort_type):
'''
获取运行统计列表
'''
start_time = get_date_timestamp(start_time)
end_time = get_date_timestamp(end_time)
rows, total = await equip_run_list(company_id, point_ids, start_time,
end_time, page_num, page_size,
sort_field, sort_type)
run_lists = []
for row in rows:
run_time = get_time_duration(row['start_time'], row['end_time']) if \
row['start_time'] and row['end_time'] else ''
start_time_one = get_datetime_str(row['start_time']) if row[
'start_time'] else ''
end_time_one = get_datetime_str(row['end_time']) if row['end_time'] \
else ''
equip_run_info = EquipRunInfo(point_name=row['point_name'],
start_time=start_time_one,
end_time=end_time_one,
run_time=run_time)
run_lists.append(equip_run_info)
return run_lists, total
async def equip_run_statistics_serv(company_id, point_ids, start_time,
end_time):
'''
运行统计数据
'''
start_time = get_date_timestamp(start_time)
end_time = get_date_timestamp(end_time)
data = await equip_run_statistics(company_id, point_ids, start_time,
end_time)
data['all_time'] = get_time_duration_by_str(data['all_time']) if data[
'total_count'] else ''
data['avg_time'] = get_time_duration_by_str(data['avg_time']) if data[
'total_count'] else ''
data['max_time'] = get_time_duration_by_str(data['max_time']) if data[
'total_count'] else ''
return data
from operator import itemgetter
from itertools import groupby
from pot_libs.common.components.query import Range, Equal, Filter, InGroup, \
Sort
from pot_libs.es_util import es_helper
from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from pot_libs.common.components.query import PageRequest
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.utils import time_format
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type
)
from unify_api.modules.anshiu.components.fine_monitor_cps import (
Statistics, Chart
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_es_aiao_1min_data, get_es_point_1min_data, get_es_point_15min_data,
get_es_aiao_15min_data, get_threshold_by_location
)
from unify_api.utils.time_format import convert_timestamp_to_str
async def get_adio_chart_data(location_group, location_info,
start_timestamp,
end_timestamp, intervel, slots):
'''
获取环境(温度与漏电流)的曲线数据
'''
# 工况标准,取其中一个漏电流阈值
residual_current_threhold = await get_threshold_by_location(
location_group)
# 组装es请求信息
range = Range(field="time", start=start_timestamp, end=end_timestamp)
in_group = InGroup(field="location_id", group=location_group)
filter = Filter(equals=[], ranges=[range], in_groups=[in_group],
keywords=[])
if intervel > 60:
# 取时间间隔为15min的数据
temp, res = await get_adio_15min_chart_data(
filter,
residual_current_threhold,
location_info,
slots,
intervel
)
else:
# 取时间间隔为1min的数据
temp, res = await get_adio_1min_chart_data(
filter,
residual_current_threhold,
location_info,
slots,
start_timestamp
)
return temp, res
async def get_adio_15min_chart_data(filter, residual_current_threhold,
location_info, slots, intervel):
'''
获取15min环境的曲线数据
'''
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# 组装es请求信息
query_body = EsQuery.aggr_history(
page_request,
interval=intervel,
stats_items=[
"value_max",
]
)
# 分组字段
format_field = "location_id_{}_aggs"
for location_id, _ in location_info.items():
group_field = format_field.format(location_id)
query_body['aggs'][group_field] = {
"filter": {"term": {"location_id": location_id}},
"aggs": {
group_field: query_body['aggs']['aggs_name']
}
}
del query_body['aggs']['aggs_name']
aggs_res = await get_es_aiao_15min_data(query_body)
# 时间类型
date_type = "day" if intervel == 24 * 3600 else "minute"
# 温度及漏电流
temperature_list = []
residual_currents_list = []
for location_id, item_info in location_info.items():
group_field = format_field.format(location_id)
data_buckets = (
aggs_res.get(group_field, {}).get(group_field, {}).get("buckets",
[])
)
# 时间戳转换为时分并作为键值返回
_data = es_helper.process_es_aggs_aiao(
data_buckets, time_key="key", value_key="value_max",
date_type=date_type,
)
# 取最大值
his_data = [_data[slot].get('max', "") if slot in _data else "" for
slot in slots]
adio_his = Chart(item='', value_slots=his_data)
if item_info.get("type") == "residual_current":
adio_his.item = "漏电流"
adio_his.threhold = residual_current_threhold
residual_currents_list.append(adio_his)
else:
adio_his.item = item_info.get("item", "")
temperature_list.append(adio_his)
return temperature_list, residual_currents_list
async def get_adio_1min_chart_data(filter, residual_current_threhold,
location_info, slots, start_timestamp):
'''
获取1min环境的曲线数据
'''
sort = Sort(field='location_id', direction='asc')
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
query_body = EsQuery.query(page_request)
es_time = convert_timestamp_to_str(start_timestamp)
es_res = await get_es_aiao_1min_data(query_body, es_time)
# 温度及漏电流
temperature_list = []
residual_currents_list = []
es_res = [v.get('_source') for v in es_res]
for location_id, item in groupby(es_res, key=itemgetter('location_id')):
# 转成数组的形式
item_dict = [value for value in item]
# 获取location类型
item_location_info = location_info.get(location_id, {})
# 时间戳转换为时分并作为键值返回
_data = es_helper.process_es_aggs_aiao(
item_dict, time_key="time", value_key="value",
date_type="minute",
)
his_data = [_data.get(slot, "") for slot in slots]
adio_his = Chart(item='', value_slots=his_data)
if item_location_info.get("type") == "residual_current":
adio_his.item = "漏电流"
adio_his.threhold = residual_current_threhold
residual_currents_list.append(adio_his)
else:
adio_his.item = item_location_info.get("item", "")
temperature_list.append(adio_his)
return temperature_list, residual_currents_list
async def get_point_chart_data(point_id, start_timestamp, end_timestamp, \
intervel, slots):
'''
获取电气量
'''
# 获取当前监测点的接表法
ctnum, _ = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(
f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum , 监测点已经拆除")
# 返回的数值不在2,3中的,一般是装置点已经拆除。默认先给一个默认值3
ctnum = 3
if intervel > 60:
elec_data = await get_point_15min_chart_data(point_id,
ctnum,
slots,
start_timestamp,
end_timestamp,
intervel)
else:
elec_data = await get_point_1min_chart_data(point_id,
ctnum,
slots,
start_timestamp,
end_timestamp)
# 功率、电流、电压
power, i, u = [], [], []
for item, value_slots in elec_data.items():
data = Chart(item=item.replace("_mean", ""), value_slots=value_slots)
if item.startswith("p") or item.startswith("q"):
power.append(data)
elif item.startswith("i"):
i.append(data)
elif item.startswith("u"):
u.append((data))
return power, i, u, ctnum
async def get_point_15min_chart_data(point_id, ctnum, slots, start_timestamp,
end_timestamp, intervel):
'''
获取15min电气量
'''
es_start = my_pendulum.from_timestamp(start_timestamp)
es_end = my_pendulum.from_timestamp(end_timestamp)
range = Range(field="quarter_time", start=es_start, end=es_end)
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# 统计字段
if ctnum == 2:
stats_items = [
"pttl_mean",
"qttl_mean",
"uab_mean",
"ucb_mean",
"ia_mean",
"ic_mean",
]
else:
stats_items = [
"pttl_mean",
"qttl_mean",
"ua_mean",
"ub_mean",
"uc_mean",
"ia_mean",
"ib_mean",
"ic_mean",
]
query_body = EsQuery.aggr_history_new(
page_request,
interval=intervel,
stats_items=stats_items,
histogram_field="quarter_time",
)
aggs_res = await get_es_point_15min_data(query_body)
data_buckets = aggs_res.get("aggs_name", {}).get("buckets", [])
time_bucket_map = {i["key_as_string"]: i for i in data_buckets}
log.info(f"post_elec_history slots={slots} _data = "
f"{list(time_bucket_map.keys())}, query_body={query_body}")
elec_data = {stats_item: [] for stats_item in stats_items}
for slot in slots:
if slot in time_bucket_map:
for stats_item in stats_items:
# 获取平均值
value = time_bucket_map[slot].get(stats_item, {}).get("avg",
"")
elec_data[stats_item].append(value)
else:
for stats_item in stats_items:
elec_data[stats_item].append("")
return elec_data
async def get_point_1min_chart_data(point_id, ctnum, slots, start_timestamp,
end_timestamp):
'''
获取1min电气量
'''
range = Range(field="time", start=start_timestamp, end=end_timestamp)
equal = Equal(field="point_id", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
sort = Sort(field='time', direction='asc')
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
# 统计字段
if ctnum == 2:
stats_items = [
"pttl",
"qttl",
"uab",
"ucb",
"ia",
"ic",
]
else:
stats_items = [
"pttl",
"qttl",
"ua",
"ub",
"uc",
"ia",
"ib",
"ic",
]
query_body = EsQuery.query(page_request)
es_time = convert_timestamp_to_str(start_timestamp)
es_res = await get_es_point_1min_data(query_body, es_time)
_data = {}
for res in es_res:
_time = convert_timestamp_to_str(res.get('_source', {}).get('time', 0),
date_type='minute')
_data[_time] = res.get('_source')
elec_data = {}
for slot in slots:
for stats_item in stats_items:
data = _data.get(slot).get(stats_item, "") if slot in _data else ""
elec_data.setdefault(stats_item, []).append(data)
return elec_data
async def get_adio_info_data(location_group, location_info, start_timestamp,
end_timestamp):
'''
获取环境相关数据
'''
range = Range(field="time", start=start_timestamp, end=end_timestamp)
# 分别统计各个location温度最大值、最小值、平均值
stats_info = {}
for location_id in location_group:
equal = Equal(field="location_id", value=location_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[],
keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
query_body = EsQuery.aggr_index(
page_request, stats_items=["value_max", "value_min", "value_avg"]
)
aggregations = await get_es_aiao_15min_data(query_body)
# 最大值, 这里叫法有点奇怪,但是最大值应该取15min的最大值聚合结果
max_info = aggregations.get("value_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max = source.get("value_max", 0)
max = round(max, 2) if max is not None else ""
max_ts = source.get("value_max_time", 0)
max_value_time = str(time_format.convert_to_dt(max_ts))
else:
max = ""
max_value_time = ""
# 最小值
min_info = aggregations.get("value_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min = source.get("value_min", 0)
min = round(min, 2) if min is not None else ""
min_ts = source.get("value_min_time", 0)
min_value_time = str(time_format.convert_to_dt(min_ts))
else:
min = ""
min_value_time = ""
avg = aggregations.get("value_avg_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
stats_info[location_id] = {
"max": {"value": max, "time": max_value_time},
"min": {"value": min, "time": min_value_time},
"avg": avg,
}
# 返回
adio_indexes = []
for location_id, info in location_info.items():
item, type = info["item"], info["type"]
# 漏电流的item更改一下
item = '漏电流' if type == 'residual_current' else item
_info = stats_info[location_id]
adio_index = Statistics(
type=type,
item=item,
max=_info["max"]["value"],
max_time=_info["max"]["time"],
min=_info["min"]["value"],
min_time=_info["min"]["time"],
avg=_info["avg"],
)
adio_indexes.append(adio_index)
return adio_indexes
async def get_point_info_data(point_id, start_time,
end_time):
# 2. 获取几表法
ctnum, _ = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(
f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum, 装置点已经被拆!")
# 给默认值3表法
ctnum = 3
range = Range(field="quarter_time", start=start_time,
end=end_time)
equal = Equal(field="pid", value=point_id)
filter = Filter(equals=[equal], ranges=[range], in_groups=[], keywords=[])
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if ctnum == 2:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"uab_mean",
"uab_min",
"uab_max",
"ucb_mean",
"ucb_min",
"ucb_max",
"ia_mean",
"ia_min",
"ia_max",
"ic_mean",
"ic_min",
"ic_max",
]
else:
stats_items = [
"pttl_mean",
"pttl_min",
"pttl_max",
"qttl_mean",
"qttl_min",
"qttl_max",
"ua_mean",
"ua_min",
"ua_max",
"ub_mean",
"ub_min",
"ub_max",
"uc_mean",
"uc_min",
"uc_max",
"ia_mean",
"ia_min",
"ia_max",
"ib_mean",
"ib_min",
"ib_max",
"ic_mean",
"ic_min",
"ic_max",
]
query_body = EsQuery.aggr_index(page_request, stats_items=stats_items)
aggregations = await get_es_point_15min_data(query_body)
# 常规参数统计
common_indexes = []
_stats_items = {i.rsplit("_", 1)[0] for i in stats_items}
for item in _stats_items:
# 最大值
max_info = aggregations.get(f"{item}_max_max", {})
hits = max_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
max_value = source.get(f"{item}_max", "")
max_dt = source.get(f"{item}_max_time")
if max_dt is None:
log.error(
f"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}")
max_value_time = str(
time_format.convert_to_dt(max_dt) if max_dt else "")
else:
max_value = ""
max_value_time = ""
# 最小值
min_info = aggregations.get(f"{item}_min_min", {})
hits = min_info.get("hits", {}).get("hits")
if hits:
source = hits[0].get("_source", {})
min_value = source.get(f"{item}_min")
min_value = min_value if min_value is not None else ""
min_dt = source.get(f"{item}_min_time")
min_value_time = str(time_format.convert_to_dt(min_dt) if min_dt
else "")
else:
min_value = ""
min_value_time = ""
# 平均值
avg = aggregations.get(f"{item}_mean_avg", {}).get("value")
avg = round(avg, 2) if avg is not None else ""
elec_index = Statistics(
item=item,
max=max_value,
max_time=max_value_time,
min=min_value,
min_time=min_value_time,
avg=avg,
)
common_indexes.append(elec_index)
return common_indexes
import json
import time
import pandas as pd
from pandas.core.dtypes.inference import is_number
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
from pot_libs.aredis_util import aredis_utils
from pot_libs.common.components.query import PageRequest, Filter, InGroup, \
Range, Equal, Sort
from pot_libs.es_util.es_query import EsQuery
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.utils.exc_util import BusinessException
from pot_libs.utils.pendulum_wrapper import my_pendulum
from pot_libs.utils.time_format import convert_dt_to_timestr, \
convert_to_es_str, time_str_to_str
from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeListItem, ScopeContent, ScopeDetailsResp, GetScopeConfigList, \
init_scope_config_example, ScopeItemDownload
from unify_api.modules.anshiu.procedures.scope_operations_pds import \
get_scope_config_by_pid, set_scope_config_by_pid, add_scope_config_by_pid, \
get_scope_list_by_pid
from unify_api.modules.common.dao.common_dao import point_by_points, \
points_by_cid
from unify_api.modules.device_cloud.procedures.mqtt_helper import \
change_param_to_config
from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope_pids, \
query_search_scope
from unify_api.modules.zhiwei_u.service.scope_operations_service import \
scope_detail_service
from unify_api.utils import time_format
from unify_api.utils.time_format import get_time_duration, \
get_current_datetime_str, convert_str_to_timestamp
async def search_scope_service(pids, cid, page_num, page_size, start, end,
scope_g):
'''
获取录波记录
'''
if len(pids) == 0:
datas = await query_search_scope([cid], '', page_num, page_size,
start, end, scope_g)
points = await points_by_cid([cid])
else:
datas = await query_search_scope_pids(pids, page_num, page_size,
start, end, scope_g)
points = await point_by_points(list(set(pids)))
if not datas["hits"]["hits"]:
return [], 0
total = datas["hits"]["total"]
# 获取监测点数据
points = {point['pid']: point for point in points}
# 获取录波明细数据以获取颗粒度
# scope_type_items = await get_scope_type(pids, cid, start, end)
rows = []
for info in datas["hits"]["hits"]:
pid = info["_source"]["point_id"]
message = info["_source"]["message"]
check_dt = info["_source"]["datetime"]
# scope_type_key = "%s_%s" % (pid, info["_source"]["time"])
# scope_type = scope_type_items.get(scope_type_key, 0)
scope_type = info["_source"].get("scope_g", '')
if scope_type == '200ms':
scope_type = '0.2s'
dt = time_format.convert_to_dt(check_dt)
check_dt = time_format.convert_dt_to_timestr(dt)
scope_list_item = ScopeListItem(check_dt=check_dt,
scope_type=scope_type,
point=points[pid].get("name"),
message=message,
doc_id=info["_id"]
)
rows.append(scope_list_item)
return rows, total
async def scope_list_download_data(pids, start, end):
"""
获取录波下载数据
"""
duration = get_time_duration(start, end, False, False)
if duration > 24 * 60 * 60:
raise BusinessException(message='只能支持下载一天的录波数据')
start_dt = convert_to_es_str(start)
end_dt = convert_to_es_str(end)
datas = await get_scope_list_by_pid(pids, start_dt, end_dt)
scope_data = []
scope_fields = [
"ua", "ub", "uc", "ia", "ib", "ic", "pttl", "lc",
]
for hit in datas:
_source = hit["_source"]
context = json.loads(_source["context"])
# 只导出录波的数据
wave_data = context.get("ia")
if not wave_data:
continue
for i, _ in enumerate(wave_data):
datetime = _source["datetime"][:19].replace('T', ' ')
one_scope_data = {}
for field in scope_fields:
if field in context:
value = context.get(field)[i]
value = "" if pd.isna(value) else value
else:
value = None
one_scope_data[field] = value
item = ScopeItemDownload(datetime=datetime, **one_scope_data)
scope_data.append(item)
return scope_data
async def scope_detail_data(doc_id):
'''
识别详情
'''
# 暂时先默认查询所有的
wave_range = "all"
scope_detail_obj = await scope_detail_service(doc_id, wave_range)
# 录波颗粒度
scope_type = scope_detail_obj.scope_g
if scope_type == '200ms':
scope_type = '0.2s'
if scope_detail_obj.ctnum == 2:
i_fields = ['ia', 'ic']
v_fields = ['uab', 'ucb']
else:
i_fields = ['ia', 'ib', 'ic']
v_fields = ['ua', 'ub', 'uc']
i = [ScopeContent(item=item, value_datas=getattr(
scope_detail_obj.contents, item, [])) for item in i_fields]
v = [ScopeContent(item=item, value_datas=getattr(
scope_detail_obj.contents, item, [])) for item in v_fields]
# 漏电流及功率
if scope_type == '2s':
residual_current = [float(str(v).replace('nan', '0'))
for v in scope_detail_obj.contents.lc]
residual_current = [
ScopeContent(item='漏电流', value_datas=residual_current)]
power = [float(str(v).replace('nan', '0'))
for v in scope_detail_obj.contents.pttl]
power = [ScopeContent(item='总有功功率', value_datas=power)]
else:
residual_current = []
power = []
# 当触发项为漏电流或者功率的时候 item为空
if scope_detail_obj.type.find('漏电流') >= 0 or scope_detail_obj.type.find(
'功率') >= 0:
item = ''
else:
item = scope_detail_obj.item
return ScopeDetailsResp(point=scope_detail_obj.alarm.point,
contin_time=scope_detail_obj.alarm.contin_time,
scope_type=scope_type,
check_dt=scope_detail_obj.alarm.check_dt,
ctnum=scope_detail_obj.ctnum,
location=scope_detail_obj.location,
item=item,
type=scope_detail_obj.type,
i=i,
v=v,
residual_current=residual_current,
p=power
)
async def get_scope_type(pids, cid, start, end):
'''
获取录波颗粒度
'''
if len(pids) == 0:
equal = [Equal(field="cid", value=cid)]
groups = []
else:
equal = []
groups = [InGroup(field='point_id', group=pids)]
if start and end:
start_time = convert_str_to_timestamp(start)
end_time = convert_str_to_timestamp(end)
range = [Range(field='time', start=start_time, end=end_time)]
else:
range = []
filter = Filter(equals=equal, keywords=[], ranges=range,
in_groups=groups)
page_request = PageRequest(page_size=10000, page_num=1, sort=None,
filter=filter)
query_body = EsQuery.query(page_request)
query_body['_source'] = ["point_id", "time", "scope_g"]
rows = {}
async with EsUtil() as es:
es_results = await es.search_origin(
body=query_body,
index=config.POINT_1MIN_SCOPE)
if not es_results["hits"]["hits"]:
return {}
for info in es_results["hits"]["hits"]:
scope_type_key = "%s_%s" % (info["_source"]["point_id"],
info["_source"]["time"])
rows[scope_type_key] = info["_source"].get("scope_g",
'0.25ms') or '0.25ms'
return rows
async def set_scope_config_serv(pid, type, scope_type, args):
'''
设置某个设备的录波配置
'''
# 先查看缓存中是否存在,如果存在直接返回
redis_key = f"scope_config:{pid}"
redis_result = await aredis_utils.RedisClient().get(redis_key)
log.info(f'set_scope_config_serv redis_result:{redis_result}')
if redis_result:
raise Exception('两次操作间隔3分钟,请稍后再试。')
# 先获取录波的配置
configs = await get_scope_config_by_pid(pid)
# 如果存在,则解析,如果不存在,初始化
if configs:
configs = json.loads(configs)
if not configs or '2s' not in configs or '0.25ms' not in configs or \
'0.2s' not in configs:
try:
configs = await init_scope_config(pid)
except Exception as e:
raise Exception('初始化配置信息出错:' + str(e))
# # 重置配置信息
if type == 'time':
if scope_type != '2s':
raise Exception('只有录波颗粒度为2s的支持时间设置')
# 时间配置
for field, item in args.items():
if field == 'one_time':
start_field, stop_field = 'start_time_I', 'stop_time_I'
elif field == 'two_time':
start_field, stop_field = 'start_time_II', 'stop_time_II'
elif field == 'three_time':
start_field, stop_field = 'start_time_III', 'stop_time_III'
else:
continue
# 验证时间差
if len(item) not in [0, 2]:
log.error('set_scope_config_serv time_param error' + str(item))
raise Exception('参数错误')
if len(item) == 2 and (item[0] or item[1]):
try:
today = get_current_datetime_str("%Y-%m-%d")
duration = get_time_duration('%s %s:00' % (today, item[0]),
'%s %s:00' % (today, item[1]),
False, False)
except Exception as e:
log.error('set_scope_config_serv time_format error' +
str(item))
raise Exception('时间格式错误')
if duration > 3600:
log.error(
'set_scope_config_serv time_format error' +
str(item))
raise Exception('时间差不能大于一个小时')
start_value = item[0] if len(item) == 2 and item[0] else '00:00'
stop_value = item[1] if len(item) == 2 and item[1] else '00:00'
configs[scope_type][start_field] = f'{start_value}:00'
configs[scope_type][stop_field] = f'{stop_value}:00'
else:
# 其他配置
if type == 'state':
# 状态对应字段
state_fields = {'0.25ms': 'en_scope',
'0.2s': 'en_wave_200ms',
'2s': 'en_electric_2s'}
args[state_fields[scope_type]] = args['state']
for field, item in args.items():
if field in configs[scope_type]:
configs[scope_type][field] = item
# 配置下发(只有正式环境才开启)
await set_mqtt_scope_config(pid, configs, scope_type)
# 将数据存入缓存中
await aredis_utils.RedisClient().setex(redis_key, 180, pid)
# 保存配置信息
new_configs = json.dumps(configs)
update_count = await set_scope_config_by_pid(pid, new_configs)
if update_count < 0:
log.error(f'set_scope_config_serv update sql error pid:{pid},'
f'new_configs={new_configs}')
raise Exception('操作失败')
async def get_scope_config_serv(pid):
'''
获取录波配置
'''
configs = await get_scope_config_by_pid(pid)
# 如果存在,则解析,
if configs:
configs = json.loads(configs)
# 如果不存在,初始化
if not configs or '2s' not in configs or '0.25ms' not in configs or \
'0.2s' not in configs:
try:
configs = await init_scope_config(pid)
except Exception as e:
configs = init_scope_config_example["threshold"]
# raise Exception('初始化配置信息出错:' + str(e))
# 格式化数据返回
return_data = {}
# 字段解释
fields = {
'umin': ['越下限阈值', 'v'],
'umax': ['越上限阈值', 'v'],
'ugap': ['波动阈值', 'v'],
'imax': ['越限阈值', 'i'],
'igap': ['波动阈值', 'i'],
'lcmax': ['越限阈值', 'residual_current'],
'lcgap': ['波动阈值', 'residual_current'],
'pttlmax': ['越限阈值', 'power'],
'pttlgap': ['波动阈值', 'power'],
}
for scope_type, config in configs.items():
one_config = {}
for field, data in fields.items():
if field in config:
one_config[field] = {'name': data[0], 'type': data[1], 'value':
config.get(field)}
# 状态显示
if scope_type == '0.25ms':
state = config.get('en_scope', 0)
elif scope_type == '0.2s':
state = config.get('en_wave_200ms', 0)
elif scope_type == '2s':
state = config.get('en_electric_2s', 0)
else:
state = config.get('state', 0)
# 时间段显示
if scope_type == '2s':
config = {conf_key: conf_v.replace('00:00:00', '')[0:5] for
conf_key, conf_v in config.items() if isinstance(
conf_v, str)}
one_config['one_time'] = {'name': '第一个时间段', 'type': 'time',
'value':
[config.get('start_time_I'),
config.get('stop_time_I')]}
one_config['two_time'] = {'name': '第二个时间段', 'type': 'time',
'value':
[config.get('start_time_II'),
config.get('stop_time_II')]}
one_config['three_time'] = {'name': '第三个时间段', 'type': 'time',
'value':
[config.get('start_time_III'),
config.get('stop_time_III')]}
one_data = GetScopeConfigList(state=state,
configs=one_config)
return_data[scope_type] = one_data
return return_data
async def init_scope_config(pid, update_sql=True):
'''
初始化录波配置
'''
# 获取配置信息
new_configs = await get_mqtt_scope_config(pid)
# 更新数据库配置信息
if update_sql:
configs = await get_scope_config_by_pid(pid)
if configs:
update_count = await set_scope_config_by_pid(pid,
json.dumps(
new_configs[
"threshold"]))
else:
update_count = await add_scope_config_by_pid(pid, new_configs)
if update_count < 0:
log.error(f'set_scope_config_serv update sql error pid:{pid},'
f'new_configs={new_configs}')
return new_configs["threshold"]
async def get_mqtt_scope_config(pid):
'''
获取录波配置信息
'''
# 封装数据格式
pub_dict = dict()
pub_dict['point_id'] = pid
pub_dict['scope_type'] = ''
pub_json = await change_param_to_config(pub_dict,
method='get', type='scope')
if not pub_json:
log.error(
'get_mqtt_scope_config pub_json error pid:' + str(pid))
raise Exception('找不到设备信息')
# 从协议中获取配置信息
res_data = await mqtt_func(pub_json, pid)
if not res_data.get('data'):
raise Exception('请求数据为空')
sid = pub_json.get("sid")
# 转换数据字段
trans_fields = {"0.25ms": {"scopeEnable": "en_scope",
"scoplimit": "scop_limit"},
"0.2s": {"scopeEnable": "en_wave_200ms",
"scoplimit": "scop_limit"},
"2s": {"scopeEnable": "en_electric_2s"}}
# 封装成数据库保存格式
configs = {
"0.25ms": res_data.get('data').get("scope").get("electric").get(
sid).get("threshold"),
"0.2s": res_data.get('data').get("wave_200ms").get("electric").get(
sid).get("threshold"),
"2s": res_data.get('data').get("electric_2s").get("electric").get(
sid).get("threshold"),
}
for type, config in configs.items():
trans_field = trans_fields.get(type)
if trans_field:
for key, trans_key in trans_field.items():
if key in config:
configs[type][trans_key] = config[key]
del configs[type][key]
return {
"coef": res_data.get('data').get("scope").get("electric").get(
sid).get("coef"),
'threshold': configs
}
async def set_mqtt_scope_config(pid, configs, scope_type):
'''
设置录波配置信息
'''
# 封装配置下发信息
pub_dict = configs[scope_type]
pub_dict['point_id'] = pid
pub_dict['scope_type'] = scope_type
pub_json = await change_param_to_config(pub_dict,
method='config', type='scope')
if not pub_json:
log.error(
'set_scope_config_serv pub_json error' + str(configs[scope_type]))
raise Exception('找不到设备信息')
# 将配置信息下发
await mqtt_func(pub_json, pid)
async def mqtt_func(pub_json, pid):
'''
请求mqtt服务器
'''
try:
async with MqttUtil() as emq:
res_data = await emq.device_response(
sid=pub_json.get("sid"),
request_id=pub_json.get("request_id"),
data=pub_json
)
except TimeoutError:
log.error('mqtt_func timeout error pid' + str(pid))
raise Exception('请求超时')
except Exception as e:
log.error('mqtt_func error:' + str(e))
raise Exception('请求错误')
log.info(f'mqtt_func res_data:{res_data}')
if res_data.get('status_code') != 200:
log.error('mqtt_func mqtt error pid' + str(pid))
raise Exception('请求失败')
return res_data
async def flush_scope_es_data(scope_g, start_time, end_time):
'''
刷新es的颗粒度
'''
equal = Equal(field='scope_g', value=scope_g)
sort = Sort(field='time', direction='desc')
start_dt = my_pendulum.from_format(start_time, 'YYYY-MM-DD HH:mm:ss')
end_dt = my_pendulum.from_format(end_time, 'YYYY-MM-DD HH:mm:ss')
while start_dt < end_dt:
try:
# 一天取一次
start_timestamp = start_dt.timestamp()
print(start_dt.strftime('YYYY-MM-DD HH:mm:ss'))
every_end_dt = start_dt.add(days=1)
end_timestamp = every_end_dt.timestamp()
range = Range(field='time', start=start_timestamp,
end=end_timestamp)
# 先查出来
filter = Filter(equals=[equal], keywords=[], ranges=[range],
in_groups=[])
page_request = PageRequest(page_size=10000, page_num=1, sort=sort,
filter=filter)
query_body = EsQuery.query(page_request)
query_body['_source'] = ['_id']
async with EsUtil() as es:
search_results = await es.search_origin(
body=query_body,
index=config.POINT_1MIN_SCOPE
)
if not search_results or not search_results['hits']['hits']:
continue
doc_ids = [data['_id'] for data in search_results['hits']['hits']]
# 然后再刷新
update_body = {
"query": {
"bool": {
"must": {
"terms": {
"doc_id.keyword": doc_ids
}
}
}
},
"script": {
"source": "ctx._source.scope_g = '%s'" % scope_g
}
}
async with EsUtil() as es:
update_results = await es.update_by_query(
body=update_body,
index=config.SCOPE_DATABASE
)
if not update_results or update_results.get('updated') > 0:
log.error(f"flush_scope_es_data update_by_query count error:"
f"{update_results.get('updated')}")
except Exception as e:
log.error(f"flush_scope_es_data error :{str(e)}")
continue
finally:
# 递增
start_dt = every_end_dt
from pot_libs.sanic_api import summary, description, examples
from pot_libs.logger import log
from unify_api.modules.anshiu.service.equip_management_serv import \
equip_run_list_serv, equip_run_statistics_serv
from unify_api.modules.anshiu.components.equip_management_cps import (
EquipManagementTotalReq, EquipManagementListReq, EquipManagementTotalResp,
EquipManagementListResp, EquipRunReq, EquipRunListResp,
EquipRunStatusReq, EquipRunStatusResp, EquipRunStatisticsReq,
EquipRunStatisticsResp
)
from unify_api.modules.anshiu.procedures.equip_management_pds import (
check_company_exist, equip_management_list, equip_management_total,
equip_run_list, get_equip_run_status
)
@summary("设备管理-获取设备统计信息")
async def post_equip_management_total(request,
body: EquipManagementTotalReq) -> EquipManagementTotalResp:
company_id = body.cid
total_info = await equip_management_total(company_id)
return EquipManagementTotalResp(
installed_number=total_info["installed_number"],
start_time=total_info["start_time"])
@summary("设备管理-获取设备列表/下载")
@description("列表的时候正常传页码,下载的时候is_download=1")
async def post_equip_management_list(request,
body: EquipManagementListReq) -> EquipManagementListResp:
company_id = body.cid
is_download = body.is_download
page_size, page_num = body.page_size, body.page_num
log.info(
f"post_equip_management_list company_id={company_id},page_size={page_size},page_num={page_num}"
)
# 下载(限制最大10000条)
if is_download == 1:
page_num, page_size = 1, 10000
company_exist = await check_company_exist(company_id)
if not company_exist:
return EquipManagementListResp.user_error()
page_map = await equip_management_list(company_id, page_num, page_size)
return EquipManagementListResp(rows=page_map["rows"],
total=page_map["total"],
page_num=page_num)
@summary("运行统计-列表")
@description("列表的时候正常传页码,下载的时候is_download=1")
async def post_equip_run_list(request, body: EquipRunReq) -> EquipRunListResp:
try:
company_id = body.cid
point_ids = body.point_ids
page_size = body.page_size
page_num = body.page_num
start_time = body.start
end_time = body.end
is_download = body.is_download
sort_field = body.sort_field if body.sort_field and is_download == 0 \
else 'start_time'
sort_type = body.sort_type if body.sort_type and is_download == 0 \
else 'desc'
# 未选中监测点直接返回
if len(point_ids) == 0:
return EquipRunListResp(rows=[], total=0, page_num=page_num)
# 监测点选中全部
if point_ids[0] == -1:
point_ids = []
# 未选中监测点且未选中工厂提示错误
if len(point_ids) == 0 and not company_id:
log.error(
"post_scope_list_param_error pids:%s cid:%s" % (
point_ids, company_id))
return EquipRunListResp.error_param()
# 下载(限制最大10000条)
if is_download == 1:
page_num, page_size = 1, 10000
except Exception as e:
log.error("post_equip_run_list_error :" + e)
return EquipRunListResp.param_error()
# 从数据库中获取数据
rows, total = await equip_run_list_serv(company_id, point_ids, start_time,
end_time,
page_num,
page_size,
sort_field,
sort_type)
return EquipRunListResp(rows=rows, total=total, page_num=page_num)
@summary("运行统计-获取统计数据")
async def post_equip_run_statistics(request,
body: EquipRunStatisticsReq) -> EquipRunStatisticsResp:
'''
获取运行统计数据
'''
cid = body.cid
start_time = body.start
end_time = body.end
point_ids = body.point_ids
# 未选中监测点直接返回
if len(point_ids) == 0:
return EquipRunStatisticsResp(count=0,run_all_time='',run_avg_time='',run_max_time='')
# 监测点选中全部
if point_ids[0] == -1:
point_ids = []
# 未选中监测点且未选中工厂提示错误
if len(point_ids) == 0 and not cid:
log.error("post_scope_list_param_error pids:%s cid:%s" % (point_ids, cid))
return EquipRunStatisticsResp.error_param()
data = await equip_run_statistics_serv(cid, point_ids, start_time,
end_time)
return EquipRunStatisticsResp(count=data['total_count'],
run_all_time=data['all_time'],
run_avg_time=data['avg_time'],
run_max_time=data['max_time'])
@summary('获取检测点运行状态')
async def post_equit_run_status(request,
body: EquipRunStatusReq) -> EquipRunStatusResp:
point_id = body.point_id
if not point_id or point_id <= 0:
log.error(f'get_equit_run_status_param_error point:{point_id}')
return EquipRunStatusResp.param_error()
status = await get_equip_run_status(point_id)
return EquipRunStatusResp(is_run=status)
from pot_libs.sanic_api import summary, examples
from pot_libs.common.components.query import PageRequest
from pot_libs.logger import log
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.utils.request_util import filed_value_from_list
from unify_api.utils import time_format
from unify_api.modules.anshiu.components.fine_monitor_cps import (
FineMonitorChartReq, FineMonitorInfoReq, FineMonitorChartResp,
FineMonitorInfoResp
)
from unify_api.modules.anshiu.procedures.fine_monitor_pds import (
get_location_by_ids, get_threshold_by_location
)
from unify_api.modules.anshiu.service.fine_monitor_serv import (
get_adio_chart_data, get_point_chart_data, get_adio_info_data,
get_point_info_data
)
@summary("精细监测-五个图表")
async def post_fine_monitor_chart(request,
body: FineMonitorChartReq) -> FineMonitorChartResp:
try:
date_start = body.start
date_end = body.end
# 起始时间转化为时间戳
start_timestamp = time_format.get_date_timestamp(date_start)
end_timestamp = time_format.get_date_timestamp(date_end)
# 计算间隔与坐标点
intervel, slots = time_format.time_pick_transf(date_start, date_end)
# 获取监测点
point_id = body.pid
if not point_id or point_id <= 0:
raise Exception('point_error point_id:{}'.format(point_id))
# 获取location点
location_group = body.location_ids
if not location_group:
raise Exception('in_groups is NULL, no location_id')
except Exception as e:
log.error('get_fine_monitor_chart_error ' + str(e))
return FineMonitorChartResp.param_error()
# 获取location表的信息
try:
location_info = await get_location_by_ids(location_group)
except Exception as e:
log.error('get_fine_monitor_chart_error ' + e)
return FineMonitorChartResp.db_error()
# 获取温度及漏电流数据
temperature_list, residual_currents_list = await get_adio_chart_data(
location_group, location_info, start_timestamp, end_timestamp,
intervel, slots)
# 电力数据 power_list、电流曲线、电压曲线
power_list, i_list, v_list, ctnum = await get_point_chart_data(point_id,
start_timestamp,
end_timestamp, \
intervel, slots)
# 获取温度与漏电流的曲线数据
# 获取用电数据
return FineMonitorChartResp(time_slots=slots,
temperature=temperature_list,
residual_current=residual_currents_list,
power=power_list, i=i_list, v=v_list,ctnum=ctnum)
@summary("精细监测-指标统计")
async def post_fine_monitor_info(request,
body: FineMonitorInfoReq) -> FineMonitorInfoResp:
try:
date_start = body.start
date_end = body.end
# 起始时间转化为时间戳
start_timestamp = time_format.get_date_timestamp(date_start)
end_timestamp = time_format.get_date_timestamp(date_end)
# 起始时间转化为es时间格式
es_start_dt = my_pendulum.from_format(date_start,
"YYYY-MM-DD HH:mm:ss")
es_end_dt = my_pendulum.from_format(date_end, "YYYY-MM-DD HH:mm:ss")
# 获取监测点
point_id = body.pid
if not point_id or point_id <= 0:
raise Exception('point_error point_id:{}'.format(point_id))
# 获取location点
location_group = body.location_ids
if not location_group:
raise Exception('in_groups is NULL, no location_id')
except Exception as e:
log.error('get_fine_monitor_info ' + str(e))
return FineMonitorInfoResp.param_error()
# 获取location表的信息
try:
location_info = await get_location_by_ids(location_group)
except Exception as e:
log.error('get_fine_monitor_chart_error ' + e)
return FineMonitorChartResp.db_error()
info_list = []
# 环境相关数据
adio_list = await get_adio_info_data(location_group,
location_info, start_timestamp,
end_timestamp)
# 用电相关数据
point_list = await get_point_info_data(point_id, es_start_dt,
es_end_dt)
info_list.extend(adio_list)
info_list.extend(point_list)
return FineMonitorInfoResp(info_list=info_list)
from pot_libs.common.components.responses import Success
from pot_libs.logger import log
from pot_libs.sanic_api import summary, description, examples
from pot_libs.utils.exc_util import BusinessException
from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeListReq, ScopeListResp, GetScopeConfigReq, GetScopeConfigResp, \
SetScopeConfigReq, SetScopeConfigResp, \
scope_list_req_example, ScopeDetailsResp, ScopeDetailRep, \
set_scope_config_example, InitScopeConfigReq, FlushScopeEsDataReq, \
ScopeListDownloadReq, ScopeListDownloadResp
from unify_api.modules.anshiu.service.scope_operations_serv import \
search_scope_service, scope_detail_data, get_scope_config_serv, \
set_scope_config_serv, init_scope_config, flush_scope_es_data, \
scope_list_download_data
@summary("识别记录-列表")
@description("列表的时候正常传页码,下载的时候is_download=1")
@examples(scope_list_req_example)
async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp:
'''
识别记录
'''
cid = body.cid
page_size = body.page_size
page_num = body.page_num
start = body.start
end = body.end
scope_g = body.scope_g
pids = body.pids
is_download = body.is_download
# 监测点选中全部
if pids and pids[0] == -1:
pids = []
# 未选中监测点且未选中工厂提示错误
if len(pids) == 0 and not cid:
log.error("post_scope_list_param_error pids:%s cid:%s" % (pids, cid))
return ScopeListResp.error_param()
if page_num * page_size > 30000:
log.error(f"post_scope_list_param_error page_too_large page_num"
f":{page_num},page_size:{page_size}")
raise BusinessException(
message='只能查询前%s条数据,建议缩小查询范围' % 30000)
# 下载(限制最大10000条)
if is_download == 1:
page_num, page_size = 1, 10000
# 替换scope_g
if scope_g:
scope_g = ['200ms' if i == '0.2s' else i for i in scope_g]
rows, total = await search_scope_service(pids, cid, page_num, page_size,
start, end, scope_g)
return ScopeListResp(rows=rows, total=total, page_num=page_num)
@summary("2s录波数据下载")
async def post_scope_list_download(request,
body: ScopeListDownloadReq) -> ScopeListDownloadResp:
start = body.start
end = body.end
pids = body.pids
if len(pids) > 1:
raise BusinessException(message="只允许下载一个监测点的数据")
datas = await scope_list_download_data(pids, start, end)
return ScopeListDownloadResp(rows=datas)
@summary("识别记录-详情")
async def post_scope_detail(request,
body: ScopeDetailRep) -> ScopeDetailsResp:
'''
识别详情
'''
doc_id = body.id
return await scope_detail_data(doc_id)
@summary("识别设置-获取配置信息")
async def post_get_scope_config(request,
body: GetScopeConfigReq) -> GetScopeConfigResp:
'''
识别设置-配置展示
'''
pid = body.pid
try:
data = await get_scope_config_serv(pid)
except Exception as e:
log.error('post_get_scope_config error:' + str(e))
return GetScopeConfigResp.server_error()
return GetScopeConfigResp(pid=pid, rows=data)
@summary("识别设置-设置配置信息")
@examples(set_scope_config_example)
async def post_set_scope_config(request,
body: SetScopeConfigReq) -> SetScopeConfigResp:
'''
识别设置-配置设置
'''
pid = body.pid
type = body.type
scope_type = body.scope_type
# 每一种类型需要的字段
fields = {'state': ['state'],
'i': ['imax', 'igap'],
'v': ['umax', 'umin', 'ugap'],
'residual_current': ['lcmax', 'lcgap'],
'power': ['pttlmax', 'pttlgap'],
'time': ['one_time', 'two_time', 'three_time']}
args = {}
for field in fields.get(type):
args[field] = getattr(body, field)
try:
await set_scope_config_serv(pid, type, scope_type, args)
except Exception as e:
log.error('post_set_scope_config' + str(e))
return SetScopeConfigResp(success=0, message=str(e))
return SetScopeConfigResp(success=1, message='操作成功')
@summary("识别设置-初始化设备配置信息(开发专用!)")
async def post_init_scope_config(request,
body: InitScopeConfigReq) -> Success:
pids = body.pids
# user_id = request.ctx.user_id
# if user_id not in ['100653']:
# return Success(success=0, message='无此操作权限')
error_list = []
for pid in pids:
try:
await init_scope_config(pid)
except Exception as e:
log.error(f'{pid}:post_init_scope_config error {str(e)}')
error_list.append(str(e))
continue
if error_list:
log.error(f"post_init_scope_config error:{','.join(error_list)}")
else:
log.info(
f"post_init_scope_config success total_success_count:"
f"{str(len(pids))}")
return Success(success=1, message=','.join(error_list))
@summary("刷新es录波数据(开发专用!)")
async def post_flush_scope_es_data(request, body: FlushScopeEsDataReq) -> \
Success:
scope_type_list = body.scope_type_list
start_time = body.start_time
end_time = body.end_time
try:
for scope_g in scope_type_list:
if scope_g not in ['200ms', '2s', '0.25ms']:
continue
await flush_scope_es_data(scope_g, start_time, end_time)
except Exception as e:
log.error(f'post_flush_scope_es_data error {str(e)}')
return Success(success=0, message=str(e))
return Success(success=1, message='操作成功')
...@@ -1400,3 +1400,46 @@ def get_start_end_by_tz_time_new(time_str, from_format='YYYY-MM-DD', ...@@ -1400,3 +1400,46 @@ def get_start_end_by_tz_time_new(time_str, from_format='YYYY-MM-DD',
start = date.start_of("day").format(to_format) start = date.start_of("day").format(to_format)
end = date.end_of("day").format(to_format) end = date.end_of("day").format(to_format)
return start, end return start, end
def get_time_duration(start_time, end_time, is_timestamp=True,
is_need_trans=True):
"""
根据开始、结束时间获取格式化的时间差
"""
if is_timestamp:
start_time = convert_timestamp_to_dt(start_time)
end_time = convert_timestamp_to_dt(end_time)
else:
start_time = convert_to_dt(start_time)
end_time = convert_to_dt(end_time)
# 计算时间差
duration = end_time - start_time
# 如果不需要转换 则直接返回
duration_str = duration.seconds + duration.days * 24 * 60 * 60
if not is_need_trans:
return duration_str
# 返回
return_str = get_time_duration_by_str(duration_str)
return return_str
def get_time_duration_by_str(duration_str):
"""
根据时间戳获取格式化的时间差
"""
return_str = ''
days = int(duration_str / (60 * 60 * 24))
if days > 0:
return_str += "%s天" % str(days)
hours = int(duration_str % (60 * 60 * 24) / (60 * 60))
if hours > 0:
return_str += "%s时" % str(hours)
minutes = int(duration_str % (60 * 60 * 24) % (60 * 60) / 60)
if minutes > 0:
return_str += "%s分" % str(minutes)
seconds = int(duration_str % (60 * 60 * 24) % (60 * 60) % 60)
if seconds > 0:
return_str += "%s秒" % str(seconds)
return return_str
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment