Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
153e0c79
Commit
153e0c79
authored
Apr 12, 2023
by
wang.wenrong
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'wwr' into 'develop'
anshiU See merge request
!11
parents
a19ac9e7
7fb7f69a
Show whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
2646 additions
and
0 deletions
+2646
-0
__init__.py
unify_api/modules/anshiu/__init__.py
+0
-0
__init__.py
unify_api/modules/anshiu/components/__init__.py
+0
-0
equip_management_cps.py
unify_api/modules/anshiu/components/equip_management_cps.py
+170
-0
fine_monitor_cps.py
unify_api/modules/anshiu/components/fine_monitor_cps.py
+97
-0
scope_operations_cps.py
unify_api/modules/anshiu/components/scope_operations_cps.py
+509
-0
__init__.py
unify_api/modules/anshiu/dao/__init__.py
+0
-0
__init__.py
unify_api/modules/anshiu/procedures/__init__.py
+0
-0
equip_management_pds.py
unify_api/modules/anshiu/procedures/equip_management_pds.py
+175
-0
fine_monitor_pds.py
unify_api/modules/anshiu/procedures/fine_monitor_pds.py
+79
-0
scope_operations_pds.py
unify_api/modules/anshiu/procedures/scope_operations_pds.py
+73
-0
__init__.py
unify_api/modules/anshiu/service/__init__.py
+0
-0
equip_management_serv.py
unify_api/modules/anshiu/service/equip_management_serv.py
+50
-0
fine_monitor_serv.py
unify_api/modules/anshiu/service/fine_monitor_serv.py
+515
-0
scope_operations_serv.py
unify_api/modules/anshiu/service/scope_operations_serv.py
+534
-0
__init__.py
unify_api/modules/anshiu/views/__init__.py
+0
-0
equip_management.py
unify_api/modules/anshiu/views/equip_management.py
+131
-0
fine_monitor.py
unify_api/modules/anshiu/views/fine_monitor.py
+112
-0
scope_operations.py
unify_api/modules/anshiu/views/scope_operations.py
+158
-0
time_format.py
unify_api/utils/time_format.py
+43
-0
No files found.
unify_api/modules/anshiu/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/components/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/components/equip_management_cps.py
0 → 100644
View file @
153e0c79
from
dataclasses
import
dataclass
from
pot_libs.sanic_api
import
Model
from
pot_libs.common.components.fields
import
Cid
from
pot_libs.sanic_api.column
import
Opt
,
Float
,
Int
,
Str
,
List
,
Enum
from
unify_api.utils.response_code
import
DbErr
,
ParamErr
,
JwtErr
,
UserErr
@
dataclass
class
EquipManagementTotalReq
(
Model
):
'''
设备管理汇总-请求格式
'''
cid
:
Cid
@
dataclass
class
EquipManagementListReq
(
Model
):
'''
设备管理列表-请求格式
'''
cid
:
Cid
is_download
:
int
=
Int
(
"是否是下载 0-不是 1-是"
)
.
eg
(
0
)
page_size
:
int
=
Opt
(
Int
(
"页大小"
)
.
eg
(
10
))
page_num
:
int
=
Opt
(
Int
(
"页码"
)
.
eg
(
1
))
@
dataclass
class
EquipRunStatusReq
(
Model
):
'''
运行统计状态
'''
point_id
:
int
=
Opt
(
Int
(
"监测点"
)
.
eg
(
260
))
@
dataclass
class
EquipManagementTotalResp
(
Model
):
'''
设备管理汇总-返回格式
'''
installed_number
:
int
=
Int
(
"安装点数"
)
.
eg
(
1
)
start_time
:
str
=
Str
(
"启用时间"
)
.
eg
(
"2022-06-16"
)
@
dataclass
class
EquipManagementInfo
(
Model
,
DbErr
,
UserErr
,
JwtErr
):
'''
设备管理列表详情-返回格式
'''
installed_location
:
str
=
Opt
(
Str
(
"安装位置"
)
.
eg
(
"3#变压器"
))
device_number
:
str
=
Opt
(
Str
(
"设备编号"
)
.
eg
(
"A1904000083"
))
wiring_type
:
str
=
Opt
(
Str
(
"接线形式"
)
.
eg
(
"三表法"
))
ct_change
:
str
=
Opt
(
Int
(
"ct变比"
)
.
eg
(
1000
))
pt_change
:
str
=
Opt
(
Int
(
"pt变比"
)
.
eg
(
1000
))
rated_voltage
:
int
=
Opt
(
Int
(
"额定电压"
)
.
eg
(
400
))
start_time
:
str
=
Str
(
"接线时间"
)
.
eg
(
"2022-06-17 16:42"
)
@
dataclass
class
EquipManagementListResp
(
Model
,
DbErr
,
UserErr
,
JwtErr
):
'''
设备管理列表-返回格式
'''
rows
:
list
=
List
(
"设备信息"
)
.
items
(
EquipManagementInfo
)
total
:
int
=
Int
(
"总量"
)
page_num
:
int
=
Int
(
"当前页"
)
.
eg
(
1
)
@
dataclass
class
EquipRunReq
(
Model
):
'''
运行统计-请求格式
'''
cid
:
Cid
page_size
:
int
=
Opt
(
Int
(
"每页记录数"
)
.
eg
(
20
))
page_num
:
int
=
Opt
(
Int
(
"当前页码"
)
.
eg
(
1
))
is_download
:
int
=
Int
(
"是否是下载 0-不是 1-是"
)
.
eg
(
0
)
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2020-05-01 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2020-05-01 23:59:59"
))
point_ids
:
list
=
Opt
(
List
(
"监测点 []表示没有 [-1]表示全部"
)
.
eg
([
260
,
261
,
268
]))
sort_field
:
list
=
Enum
(
'排序字段'
)
.
of
(
'point_name'
,
'start_time'
,
'end_time'
,
'run_time'
)
sort_type
:
list
=
Enum
(
'排序方向'
)
.
of
(
'asc'
,
'desc'
)
@
classmethod
def
example
(
cls
):
return
{
"cid"
:
154
,
"point_ids"
:
[
182
,
421
,
422
,
423
,
1752
,
1753
,
1754
],
"start"
:
"2022-06-28 00:00:00"
,
"end"
:
"2022-06-28 23:59:59"
,
"is_download"
:
0
,
"page_num"
:
1
,
"page_size"
:
20
,
"sort_field"
:
"start_time"
,
"sort_type"
:
"desc"
}
@
dataclass
class
EquipRunInfo
(
Model
,
DbErr
,
UserErr
,
JwtErr
):
'''
运行统计列表详情-返回格式
'''
point_name
:
str
=
Opt
(
Str
(
"监测点"
)
.
eg
(
"3#变压器"
))
start_time
:
str
=
Str
(
"开启时间"
)
.
eg
(
"2022-06-17 16:42"
)
end_time
:
str
=
Str
(
"关闭时间"
)
.
eg
(
"2022-06-21 16:42"
)
run_time
:
str
=
Str
(
"运行时长"
)
.
eg
(
"1小时20分"
)
@
dataclass
class
EquipRunListResp
(
Model
,
DbErr
,
UserErr
,
ParamErr
):
'''
运行统计-返回格式
'''
rows
:
list
=
List
(
"运行信息"
)
.
items
(
EquipRunInfo
)
total
:
int
=
Int
(
"总量"
)
page_num
:
int
=
Int
(
"当前页"
)
.
eg
(
1
)
@
dataclass
class
EquipRunStatisticsReq
(
Model
):
'''
运行统计数据-请求格式
'''
cid
:
Cid
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2020-05-01 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2020-05-01 23:59:59"
))
point_ids
:
list
=
Opt
(
List
(
"监测点 []表示没有 [-1]表示全部"
)
.
eg
([
260
,
261
,
268
]))
@
classmethod
def
example
(
cls
):
return
{
"cid"
:
84
,
"point_ids"
:
[
261
,
262
,
266
,
268
],
"start"
:
"2021-09-06 15:04:51"
,
"end"
:
"2021-12-06 15:04:51"
,
}
@
dataclass
class
EquipRunStatisticsResp
(
Model
,
ParamErr
):
'''
运行统计数据-返回格式
'''
count
:
int
=
Int
(
"运行次数"
)
.
eg
(
23
)
run_all_time
:
str
=
Str
(
"运行时长"
)
.
eg
(
"11小时21分"
)
run_avg_time
:
str
=
Str
(
"平均时长"
)
.
eg
(
"1小时05分"
)
run_max_time
:
str
=
Str
(
"最长时长"
)
.
eg
(
"2小时23分"
)
@
dataclass
class
EquipRunStatusResp
(
Model
,
ParamErr
):
'''
运行统计状态-返回格式
'''
is_run
:
int
=
Int
(
"是否运行:0-否、1-是 2-非动力设备"
)
.
eg
(
1
)
unify_api/modules/anshiu/components/fine_monitor_cps.py
0 → 100644
View file @
153e0c79
from
dataclasses
import
dataclass
from
pot_libs.sanic_api
import
Model
from
pot_libs.sanic_api.column
import
List
,
Str
,
Int
,
Opt
,
Float
from
pot_libs.common.components.fields
import
Cid
,
Item
,
DateTime
from
unify_api.utils.response_code
import
DbErr
,
ParamErr
@
dataclass
class
FineMonitorChartReq
(
Model
):
'''
精确监测-图表请求
'''
pid
:
int
=
Int
(
"监测点id"
)
.
eg
(
261
)
start
:
str
=
Str
(
"开始时间"
)
.
eg
(
"2022-06-20 00:00:00"
)
end
:
str
=
Str
(
"结束时间"
)
.
eg
(
"2022-06-20 23:59:59"
)
location_ids
:
list
=
List
(
"location_ids"
)
.
eg
([
487
,
488
,
489
,
490
,
491
])
@
dataclass
class
FineMonitorInfoReq
(
Model
):
'''
精确监测-指标统计请求
'''
pid
:
int
=
Int
(
"监测点id"
)
.
eg
(
261
)
start
:
str
=
Str
(
"开始时间"
)
.
eg
(
"2022-06-20 00:00:00"
)
end
:
str
=
Str
(
"结束时间"
)
.
eg
(
"2022-06-20 23:59:59"
)
location_ids
:
list
=
List
(
"location_ids"
)
.
eg
([
487
,
488
,
489
,
490
,
491
])
@
dataclass
class
Chart
(
Model
,
DbErr
):
'''图表格式'''
item
:
str
=
Opt
(
Str
(
'类型'
)
.
eg
(
'A相'
))
value_slots
:
list
=
Opt
(
List
(
'数据列表'
)
.
items
(
Float
()))
@
dataclass
class
Statistics
(
Model
,
DbErr
):
type
:
str
=
Str
(
'指标类型'
)
.
eg
(
'temperature'
)
item
:
Item
=
Item
max
:
float
=
Float
(
"最大值"
)
max_time
:
DateTime
=
DateTime
min
:
float
=
Float
(
"最小值"
)
min_time
:
DateTime
=
DateTime
avg
:
float
=
Float
(
'平均值'
)
@
dataclass
class
FineMonitorChartResp
(
Model
,
DbErr
,
ParamErr
):
'''
精确监测-图表返回
'''
time_slots
:
list
=
List
(
"横坐标时序"
)
.
items
(
Float
())
residual_current
:
list
=
List
(
"漏电流"
)
.
items
(
Chart
)
temperature
:
list
=
List
(
"温度"
)
.
items
(
Chart
)
power
:
list
=
List
(
"功率(p表示有功,q表示无功)"
)
.
items
(
Chart
)
i
:
list
=
List
(
"电流"
)
.
items
(
Chart
)
v
:
list
=
List
(
"电压"
)
.
items
(
Chart
)
ctnum
:
int
=
Int
(
"接线方式:包含两表法、三表法"
)
.
eg
(
3
)
@
classmethod
def
example
(
cls
):
return
{
"time_slots"
:
[
"00:00"
,
"00:15"
],
"ctnum"
:
3
,
"residual_current"
:
[{
"item"
:
"漏电流"
,
"value_slots"
:
[
3.26
,
3.31
]
}],
"temperature"
:
[{
"item"
:
"A相"
,
"value_slots"
:
[
27.48
,
27.43
]
},
{
"item"
:
"B相"
,
"value_slots"
:
[
28.04
,
28.03
]
}],
"power"
:
[{
"item"
:
"p"
,
"value_slots"
:
[]
}],
"i"
:
[{
"item"
:
"ia"
,
"value_slots"
:
[
12
,
34
]
}],
"v"
:
[{
"item"
:
"ua"
,
"value_slots"
:
[]
}]
}
@
dataclass
class
FineMonitorInfoResp
(
Model
,
DbErr
,
ParamErr
):
'''
精确监测-指标统计返回
'''
info_list
:
list
=
List
(
"指标统计列表"
)
.
items
(
Statistics
)
unify_api/modules/anshiu/components/scope_operations_cps.py
0 → 100644
View file @
153e0c79
from
dataclasses
import
dataclass
from
pot_libs.common.components.fields
import
Cid
,
Item
from
pot_libs.sanic_api
import
Model
from
pot_libs.sanic_api.column
import
Int
,
List
,
Opt
,
Str
,
Float
,
Dict
,
Enum
from
unify_api.utils.response_code
import
DbErr
,
ServerErr
,
ParamErr
@
dataclass
class
ScopeListReq
(
Model
):
'''
录波识别记录-请求格式
'''
cid
:
Cid
page_size
:
int
=
Opt
(
Int
(
"每页记录数"
)
.
eg
(
20
))
page_num
:
int
=
Opt
(
Int
(
"当前页码"
)
.
eg
(
1
))
is_download
:
int
=
Int
(
"是否是下载 0-不是 1-是"
)
.
eg
(
0
)
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2022-06-22 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2022-06-22 23:59:59"
))
scope_g
:
str
=
Opt
(
List
(
"录波颗粒度"
)
.
eg
([
"2s"
]))
pids
:
list
=
Opt
(
List
(
"检测点 []表示没有 [-1]表示全部"
)
.
eg
([
260
,
261
,
268
]))
@
dataclass
class
ScopeListDownloadReq
(
Model
):
'''
2s录波识记录下载-请求格式
'''
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2022-06-22 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2022-06-22 23:59:59"
))
pids
:
list
=
Opt
(
List
(
"监测点 []表示没有 [-1]表示全部"
)
.
eg
([
260
,
261
,
268
]))
@
dataclass
class
ScopeListItem
(
Model
):
'''
录播识别记录-每一条数据
'''
check_dt
:
str
=
Str
(
"触发时间"
)
.
eg
(
"2021-07-01 23:59:59"
)
point
:
str
=
Str
(
'监测点名称'
)
.
eg
(
'土建区总进线'
)
message
:
str
=
Str
(
'触发原因'
)
.
eg
(
'B相电压波动'
)
scope_type
:
int
=
Opt
(
Int
(
'录波颗粒度'
)
.
eg
(
'0.25ms'
))
doc_id
:
str
=
Opt
(
Str
(
'该条信息id'
)
.
eg
(
'213_over_gap_i__1604302769'
))
@
dataclass
class
ScopeListResp
(
Model
):
'''
录波识别记录-返回格式
'''
total
:
int
=
Opt
(
Int
(
"总条数"
)
.
eg
(
20
))
rows
:
list
=
Opt
(
List
(
"总数据"
)
.
items
(
ScopeListItem
))
page_num
:
int
=
Int
(
"当前页"
)
.
eg
(
1
)
@
dataclass
class
ScopeItemDownload
(
Model
):
"""
录波下载数据详情
"""
datetime
:
str
=
Str
(
"时间"
)
.
eg
(
"2021-07-01 23:59:59"
)
ua
:
float
=
Float
(
'A相电压'
)
ub
:
float
=
Float
(
'B相电压'
)
uc
:
float
=
Float
(
'C相电压'
)
ia
:
float
=
Float
(
'A相电流'
)
ib
:
float
=
Float
(
'B相电流'
)
ic
:
float
=
Float
(
'C相电流'
)
pttl
:
float
=
Float
(
'功率'
)
lc
:
float
=
Float
(
'漏电流'
)
@
dataclass
class
ScopeListDownloadResp
(
Model
):
"""
录波下载数据
"""
rows
:
list
=
Opt
(
List
(
"数据列表"
)
.
items
(
ScopeItemDownload
))
@
dataclass
class
ScopeContent
(
Model
):
'''
识别详情图表-返回格式
'''
item
:
Item
value_datas
:
list
=
List
()
.
items
(
Float
())
@
dataclass
class
ScopeDetailRep
(
Model
):
'''
识别详情-请求格式
'''
id
:
str
=
Opt
(
Str
(
"doc_id"
)
.
eg
(
"423_over_gap_i__1655991008"
))
@
classmethod
def
example
(
cls
):
return
{
"id"
:
"423_over_gap_i__1655991008"
}
@
dataclass
class
ScopeDetailsResp
(
Model
,
DbErr
):
'''
识别详情-返回格式
'''
point
:
str
=
Str
(
'监测点名称'
)
.
eg
(
'土建区总进线'
)
contin_time
:
int
=
Opt
(
Int
(
'持续时间'
)
.
eg
(
2
))
scope_type
:
str
=
Str
(
'录波颗粒度'
)
.
eg
(
'0.25ms'
)
check_dt
:
str
=
Str
(
"发生时间"
)
.
eg
(
"2021-07-01 23:59:59"
)
ctnum
:
str
=
Str
(
"接线法 2-两表法 3-三表法"
)
.
eg
(
3
)
location
:
int
=
Opt
(
Int
(
'位置'
)
.
eg
(
240
))
item
:
str
=
Item
type
:
str
=
Opt
(
Str
()
.
eg
(
'电流波动'
))
i
:
list
=
List
(
"电流事件录波"
)
.
items
(
ScopeContent
)
v
:
list
=
List
(
"电压事件录波"
)
.
items
(
ScopeContent
)
residual_current
:
list
=
Opt
(
List
(
"2s录波漏电流"
)
.
items
(
ScopeContent
))
p
:
list
=
Opt
(
List
(
"2s录波功率"
)
.
items
(
ScopeContent
))
@
classmethod
def
example
(
cls
):
return
{
"point"
:
"钻孔配电柜"
,
"contin_time"
:
"400"
,
"check_dt"
:
"2022-06-23 21:30:08"
,
"scope_type"
:
"0.25ms"
,
"item"
:
"A相"
,
"type"
:
"电流波动"
,
"i"
:
[{
"item"
:
"ia"
,
"value_datas"
:
[
1.25
,
-
0.34
]
}],
"v"
:
[{
"item"
:
"ua"
,
"value_datas"
:
[
86.06
,
123.34
]
}],
"residual_current"
:
[{
"item"
:
"漏电流"
,
"value_datas"
:
[
86.06
,
123.34
]
}],
"power"
:
[{
"item"
:
"p"
,
"value_datas"
:
[
86.06
,
123.34
]
}],
}
@
dataclass
class
GetScopeConfigReq
(
Model
):
'''
获取录播配置-请求格式
'''
pid
:
int
=
Int
(
"监测点id"
)
.
eg
(
1754
)
@
dataclass
class
SetScopeConfigReq
(
Model
):
'''
设置录播配置-请求格式
'''
pid
:
int
=
Int
(
"监测点id"
)
.
eg
(
20
)
scope_type
:
str
=
Enum
(
"录波颗粒度"
)
.
of
(
'0.25ms'
,
'0.2s'
,
'2s'
)
type
:
str
=
Enum
(
"提交类型"
)
.
of
(
"state"
,
"i"
,
"v"
,
"residual_current"
,
"power"
,
"time"
)
state
:
int
=
Opt
(
Enum
(
"开启关闭状态"
)
.
of
(
0
,
1
))
umax
:
int
=
Opt
(
Int
(
"电压上限"
)
.
eg
(
430
))
umin
:
int
=
Opt
(
Int
(
"电压下限"
)
.
eg
(
360
))
ugap
:
int
=
Opt
(
Int
(
"电压波动阈值"
)
.
eg
(
50
))
imax
:
int
=
Opt
(
Int
(
"电流上限"
)
.
eg
(
40
))
igap
:
int
=
Opt
(
Int
(
"电流波动阈值"
)
.
eg
(
5
))
lcmax
:
int
=
Opt
(
Int
(
"漏电流上限"
)
.
eg
(
40
))
lcgap
:
int
=
Opt
(
Int
(
"漏电流波动阈值"
)
.
eg
(
40
))
pttlmax
:
int
=
Opt
(
Int
(
"功率上限"
)
.
eg
(
5
))
pttlgap
:
int
=
Opt
(
Int
(
"功率波动阈值"
)
.
eg
(
40
))
one_time
:
list
=
Opt
(
List
(
"第一段时间"
)
.
items
(
Str
()))
two_time
:
list
=
Opt
(
List
(
"第二段时间"
)
.
items
(
Str
()))
three_time
:
list
=
Opt
(
List
(
"第三段时间"
)
.
items
(
Str
()))
@
dataclass
class
GetScopeConfigResp
(
Model
,
ServerErr
):
'''
获取录播配置-返回格式
'''
pid
:
int
=
Int
(
"监测点id"
)
.
eg
(
20
)
rows
:
list
=
List
(
"配置内容"
)
.
items
(
SetScopeConfigReq
)
@
classmethod
def
example
(
cls
):
return
{
"pid"
:
1754
,
"rows"
:
{
"0.25ms"
:
{
"state"
:
1
,
"configs"
:
{
"umin"
:
{
"name"
:
"越下限阈值"
,
"type"
:
"v"
,
"value"
:
85
},
"umax"
:
{
"name"
:
"越上限阈值"
,
"type"
:
"v"
,
"value"
:
115
},
"ugap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"v"
,
"value"
:
25
},
"imax"
:
{
"name"
:
"越限阈值"
,
"type"
:
"i"
,
"value"
:
5
},
"igap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"i"
,
"value"
:
20
}
}
},
"0.2s"
:
{
"state"
:
1
,
"configs"
:
{
"umin"
:
{
"name"
:
"越下限阈值"
,
"type"
:
"v"
,
"value"
:
187
},
"umax"
:
{
"name"
:
"越上限阈值"
,
"type"
:
"v"
,
"value"
:
253
},
"ugap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"v"
,
"value"
:
50
},
"imax"
:
{
"name"
:
"越限阈值"
,
"type"
:
"i"
,
"value"
:
5
},
"igap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"i"
,
"value"
:
1
}
}
},
"2s"
:
{
"state"
:
1
,
"configs"
:
{
"umin"
:
{
"name"
:
"越下限阈值"
,
"type"
:
"v"
,
"value"
:
187
},
"umax"
:
{
"name"
:
"越上限阈值"
,
"type"
:
"v"
,
"value"
:
253
},
"ugap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"v"
,
"value"
:
50
},
"imax"
:
{
"name"
:
"越限阈值"
,
"type"
:
"i"
,
"value"
:
5
},
"igap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"i"
,
"value"
:
1
},
"lcmax"
:
{
"name"
:
"越限阈值"
,
"type"
:
"residual_current"
,
"value"
:
1
},
"lcgap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"residual_current"
,
"value"
:
1
},
"pttlmax"
:
{
"name"
:
"越限阈值"
,
"type"
:
"power"
,
"value"
:
5
},
"pttlgap"
:
{
"name"
:
"波动阈值"
,
"type"
:
"power"
,
"value"
:
2
},
"one_time"
:
{
"name"
:
"第一个时间段"
,
"type"
:
"time"
,
"value"
:
[
"08:00"
,
"08:30"
]
},
"two_time"
:
{
"name"
:
"第二个时间段"
,
"type"
:
"time"
,
"value"
:
[
"09:00"
,
"10:00"
]
},
"three_time"
:
{
"name"
:
"第三个时间段"
,
"type"
:
"time"
,
"value"
:
[
"14:00"
,
"15:00"
]
}
}
}
}
}
@
dataclass
class
InitScopeConfigReq
(
Model
):
'''
初始化配置信息--请求
'''
pids
:
list
=
List
(
'pids'
)
@
classmethod
def
example
(
cls
):
return
{
'pids'
:
[
238
,
240
,
242
,
330
,
343
,
749
,
1463
,
2248
]
}
@
dataclass
class
FlushScopeEsDataReq
(
Model
):
'''
刷新es录波数据--请求
'''
scope_type_list
:
list
=
List
(
'录波类型'
)
start_time
:
str
=
Str
(
'开始时间'
)
end_time
:
str
=
Str
(
'结束时间'
)
@
classmethod
def
example
(
cls
):
return
{
'scope_type_list'
:
[
'200ms'
,
'2s'
,
'0.25ms'
],
'start_time'
:
'2022-07-04 17:40:24'
,
'end_time'
:
'2022-07-21 18:40:24'
,
}
@
dataclass
class
GetScopeConfigList
(
Model
):
'''
获取录播配置列表
'''
state
:
int
=
Enum
(
"状态 0-关 1-开"
)
.
of
(
0
,
1
)
configs
:
list
=
List
(
"返回配置"
)
@
dataclass
class
SetScopeConfigResp
(
Model
):
'''
设置录播配置-返回格式
'''
success
:
int
=
Int
(
'是否操作成功 0-否 1-是'
)
.
eg
(
1
)
message
:
str
=
Str
(
"返回信息"
)
.
eg
(
"操作成功"
)
# 识别记录列表请求举例
scope_list_req_example
=
{
"范例1"
:
{
"cid"
:
114
,
"page_size"
:
20
,
"page_num"
:
1
,
"is_download"
:
0
,
"start"
:
"2022-06-22 00:00:00"
,
"end"
:
"2022-06-22 23:59:59"
,
"pids"
:
[]
},
"范例2"
:
{
"cid"
:
44
,
"page_size"
:
20
,
"page_num"
:
1
,
"is_download"
:
0
,
"start"
:
"2022-06-22 00:00:00"
,
"end"
:
"2022-06-22 23:59:59"
,
"pids"
:
[
260
,
261
,
268
]
}
}
'''
设置录波配置格式
'''
set_scope_config_example
=
{
"0.25ms设置电压范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"0.25ms"
,
"type"
:
"v"
,
"umax"
:
430
,
"umin"
:
360
,
"ugap"
:
50
,
},
"0.25ms设置状态范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"0.25ms"
,
"type"
:
"state"
,
"state"
:
1
,
},
"0.2s设置电流范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"0.2s"
,
"type"
:
"i"
,
"imax"
:
40
,
"igap"
:
5
,
},
"2s设置漏电流范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"2s"
,
"type"
:
"residual_current"
,
"lcmax"
:
30
,
"lcgap"
:
12
},
"2s设置功率范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"2s"
,
"type"
:
"power"
,
"pttlmax"
:
5
,
"pttlgap"
:
40
,
},
"2s设置时间段范例"
:
{
"pid"
:
1754
,
"scope_type"
:
"2s"
,
"type"
:
"time"
,
"one_time"
:
[
'08:00'
,
'09:00'
],
"two_time"
:
[
'10:00'
,
'10:30'
],
"three_time"
:
[
'13:00'
,
'14:00'
],
},
}
'''
初始化录播的配置
'''
init_scope_config_example
=
{
"coef"
:
{
"Kua"
:
2.843364
,
"Bua"
:
2070.924316
,
"Kub"
:
2.849909
,
"Bub"
:
2058.210205
,
"Kuc"
:
2.854909
,
"Buc"
:
2054.158447
,
"Kia"
:
9.65814
,
"Bia"
:
2056.642578
,
"Kib"
:
9.652307
,
"Bib"
:
2054.431152
,
"Kic"
:
9.65714
,
"Bic"
:
2056.80249
},
"threshold"
:
{
"0.25ms"
:
{
"en_scope"
:
0
,
"umin"
:
187
,
"umax"
:
253
,
"ugap"
:
11.5
,
"igap"
:
8
,
"imax"
:
167
,
},
"0.2s"
:
{
"en_wave_200ms"
:
0
,
"umin"
:
187
,
"umax"
:
253
,
"ugap"
:
11.5
,
"igap"
:
8
,
"imax"
:
167
,
"scop_limit"
:
50
},
"2s"
:
{
"en_electric_2s"
:
0
,
"start_time_I"
:
"11:00:00"
,
"stop_time_I"
:
"12:00:00"
,
"start_time_II"
:
"14:00:00"
,
"stop_time_II"
:
"15:00:00"
,
"start_time_III"
:
"15:00:00"
,
"stop_time_III"
:
"16:00:00"
,
"umin"
:
187
,
"umax"
:
253
,
"ugap"
:
11.5
,
"igap"
:
15
,
"imax"
:
167
,
"lcmax"
:
30
,
"lcgap"
:
12
,
"pttlgap"
:
10
,
"pttlmax"
:
110
}
}
}
unify_api/modules/anshiu/dao/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/procedures/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/procedures/equip_management_pds.py
0 → 100644
View file @
153e0c79
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.logger
import
log
from
unify_api.modules.product_info.procedures.hardware_pds
import
(
get_user_hardware_info
,
hardware_statistics
)
async
def
check_company_exist
(
company_id
):
'''
判断工厂是否存在
'''
raw_sql
=
"select count(*) as company_count from company where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
company_count
=
await
conn
.
fetchone
(
sql
=
raw_sql
,
args
=
(
company_id
))
return
company_count
.
get
(
'company_count'
)
>
0
async
def
equip_management_list
(
company_id
,
page_num
,
page_size
):
'''
获取设备管理的监测点列表,先保留老的写法,后面1.0改版的时候统一改
'''
datas
=
await
get_user_hardware_info
(
company_id
,
page_num
,
page_size
)
return_fields
=
(
"installed_location"
,
"device_number"
,
"wiring_type"
,
"ct_change"
,
"pt_change"
,
"rated_voltage"
,
"start_time"
)
return_datas
=
[]
for
data
in
datas
.
get
(
'rows'
):
return_one
=
{}
for
return_field
in
return_fields
:
return_one
[
return_field
]
=
data
.
get
(
return_field
)
return_datas
.
append
(
return_one
)
datas
[
'rows'
]
=
return_datas
return
datas
async
def
equip_management_total
(
company_id
):
'''
获取设备管理的汇总信息
'''
datas
=
await
hardware_statistics
(
company_id
)
return
datas
async
def
equip_run_list
(
company_id
,
point_ids
,
start_time
,
end_time
,
page_num
,
page_size
,
sort_field
,
sort_type
):
'''
获取设备运行记录
'''
async
with
MysqlUtil
()
as
conn
:
raw_sql
=
"SELECT {} from scope_equip_run_record s "
\
"left join (select pid,max(id) max_id "
\
"from scope_equip_run_record group by pid) sp "
\
"on s.pid = sp.pid "
\
"left join point p on s.pid=p.pid "
\
"left join monitor_reuse r on p.mtid = r.mtid "
\
"where "
\
"(p.cid=
%
s or r.cid =
%
s) and s.start_time "
\
"BETWEEN
%
s and
%
s and "
\
"(s.end_time > 0 or (s.end_time = 0 and s.id = sp.max_id)) "
if
point_ids
:
raw_sql
+=
" and s.pid in
%
s"
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
tuple
(
point_ids
)
)
else
:
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
)
# 先总数
count_sql
=
raw_sql
.
format
(
"count(*) as run_count"
,
""
)
count_result
=
await
conn
.
fetchone
(
sql
=
count_sql
,
args
=
args
)
list_result
=
[]
if
count_result
.
get
(
"run_count"
,
0
)
>
0
:
# 排序字段处理
if
sort_field
==
'point_name'
:
sort_field
=
'p.name'
elif
sort_field
==
'run_time'
:
sort_field
=
'(s.end_time-s.start_time)'
# 再分页列表
raw_sql
=
raw_sql
.
format
(
"s.pid,p.name point_name,s.start_time,s.end_time"
,
)
raw_sql
+=
" order by {} {} LIMIT
%
s OFFSET
%
s"
.
format
(
sort_field
,
sort_type
)
if
point_ids
:
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
tuple
(
point_ids
),
page_size
,
(
page_num
-
1
)
*
page_size
)
else
:
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
page_size
,
(
page_num
-
1
)
*
page_size
)
list_result
=
await
conn
.
fetchall
(
sql
=
raw_sql
,
args
=
args
)
return
list_result
,
count_result
.
get
(
"run_count"
,
0
)
async
def
equip_run_statistics
(
company_id
,
point_ids
,
start_time
,
end_time
):
'''
获取运行统计数据
'''
dura_time
=
"case when end_time > 0 then end_time-start_time else 0 end"
async
with
MysqlUtil
()
as
conn
:
count_sql
=
f
"SELECT count(*) as total_count,"
\
f
"avg({dura_time}) as avg_time,"
\
f
"sum({dura_time}) as all_time,"
\
f
"max({dura_time}) as max_time "
\
"from scope_equip_run_record s "
\
"left join (select pid,max(id) max_id from "
\
"scope_equip_run_record group by pid) sp "
\
"on s.pid = sp.pid "
\
"left join point p on s.pid=p.pid "
\
"left join monitor_reuse r on p.mtid = r.mtid "
\
"where (p.cid=
%
s or r.cid =
%
s) "
\
"and s.start_time BETWEEN
%
s and
%
s and (s.end_time > 0 "
\
"or (s.end_time = 0 and s.id = sp.max_id)) "
if
point_ids
:
count_sql
+=
" and s.pid in
%
s"
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
tuple
(
point_ids
)
)
else
:
args
=
(
company_id
,
company_id
,
start_time
,
end_time
,
)
count_result
=
await
conn
.
fetchone
(
sql
=
count_sql
,
args
=
args
)
return
count_result
async
def
get_equip_run_status
(
point_id
):
'''
获取当前设备是否正在运行
'''
async
with
MysqlUtil
()
as
conn
:
# 是否非动力设备
power_equip_sql
=
"select is_power_equipment from monitor m "
\
"left join point p on m.mtid = p.mtid "
\
"where p.pid =
%
s"
power_equip_result
=
await
conn
.
fetchone
(
sql
=
power_equip_sql
,
args
=
(
point_id
,))
if
power_equip_result
.
get
(
"is_power_equipment"
,
0
)
==
0
:
return
2
raw_sql
=
"select count(*) run_count from scope_equip_run_record s "
\
"left join (select pid,max(id) max_id from "
\
"scope_equip_run_record group by pid) sp "
\
"on s.pid = sp.pid "
\
"WHERE s.pid=
%
s and start_time < unix_timestamp(NOW()) "
\
"and (end_time > unix_timestamp(NOW()) or "
\
"(end_time=0 and id=max_id)) "
result
=
await
conn
.
fetchone
(
sql
=
raw_sql
,
args
=
(
point_id
,))
return
1
if
result
.
get
(
"run_count"
)
>
0
else
0
unify_api/modules/anshiu/procedures/fine_monitor_pds.py
0 → 100644
View file @
153e0c79
from
pot_libs.logger
import
log
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.es_util.es_utils
import
EsUtil
from
unify_api
import
constants
async
def
get_location_by_ids
(
location_ids
):
'''
根据location_id获取各项温度信息
'''
async
with
MysqlUtil
()
as
conn
:
sql
=
"select id,item,type from location where id in
%
s"
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
location_ids
),))
location_info
=
{
res
.
get
(
'id'
):
res
for
res
in
result
}
return
location_info
async
def
get_threshold_by_location
(
location_ids
,
type
=
'overResidualCurrent'
,
default_threshold
=
30
):
'''
根据location_id获取阈值
'''
async
with
MysqlUtil
()
as
conn
:
sql
=
"select threshold from alarm_setting where location_id in
%
s "
\
"and type =
%
s limit 1 "
settings
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
location_ids
),
type
))
if
settings
:
return
settings
[
0
][
"threshold"
]
return
default_threshold
async
def
get_es_aiao_15min_data
(
query_body
):
'''
从es中获取环境相关数据(15min)
'''
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
LOCATION_15MIN_AIAO
)
return
es_results
.
get
(
"aggregations"
,
{})
async
def
get_es_point_15min_data
(
query_body
):
'''
从es中获取电力相关数据(15min)
'''
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_15MIN_INDEX
)
return
es_results
.
get
(
"aggregations"
,
{})
async
def
get_es_aiao_1min_data
(
query_body
,
start
):
'''
从es中获取环境相关数据(1min)
'''
async
with
EsUtil
()
as
es
:
# 环境相关qmin的数据需要分表查询
p_database
=
"poweriot_location_1min_aiao_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
p_database
)
return
es_results
.
get
(
"hits"
,
{})
.
get
(
"hits"
,
{})
async
def
get_es_point_1min_data
(
query_body
,
start
):
'''
从es中获取电气相关数据(1min)
'''
async
with
EsUtil
()
as
es
:
# 电气相关数据1min的需要分表查询
p_database
=
"poweriot_point_1min_index_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
p_database
)
return
es_results
.
get
(
"hits"
,
{})
.
get
(
"hits"
,
{})
unify_api/modules/anshiu/procedures/scope_operations_pds.py
0 → 100644
View file @
153e0c79
import
json
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.logger
import
log
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
unify_api
import
constants
async
def
get_scope_config_by_pid
(
pid
):
'''
获取某个设备的录波配置
'''
async
with
MysqlUtil
()
as
conn
:
sql
=
"select threshold from scope_config_record where pid =
%
s"
result
=
await
conn
.
fetchone
(
sql
=
sql
,
args
=
(
pid
,))
return
result
.
get
(
'threshold'
,
{})
if
result
else
{}
async
def
set_scope_config_by_pid
(
pid
,
config
):
'''
设置录波配置
'''
try
:
async
with
MysqlUtil
()
as
conn
:
sql
=
"update scope_config_record set threshold=
%
s where pid=
%
s"
result
=
await
conn
.
execute
(
sql
=
sql
,
args
=
(
config
,
pid
))
except
Exception
as
e
:
log
.
error
(
'set_scope_config_by_pid update error:'
+
str
(
e
))
return
-
1
return
result
async
def
add_scope_config_by_pid
(
pid
,
configs
):
'''
增加录波配置
'''
try
:
async
with
MysqlUtil
()
as
conn
:
sql
=
"INSERT INTO `power_iot`.`scope_config_record` (`pid`, "
\
"`coef`, `threshold`, `vol_cur`) VALUES (
%
s,
%
s,
%
s,
%
s)"
result
=
await
conn
.
execute
(
sql
=
sql
,
args
=
(
pid
,
json
.
dumps
(
configs
[
"coef"
]),
json
.
dumps
(
configs
[
"threshold"
]),
json
.
dumps
(
configs
)))
except
Exception
as
e
:
log
.
error
(
'add_scope_config_by_pid add error:'
+
str
(
e
))
return
-
1
return
result
async
def
get_scope_list_by_pid
(
pids
,
start_dt
,
end_dt
,
scope_g
=
"2s"
):
"""
ES查询2s录波记录数据
"""
query_body
=
{
"size"
:
10000
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"terms"
:
{
"point_id"
:
pids
}},
{
"term"
:
{
"scope_g"
:
{
"value"
:
scope_g
}}},
{
"range"
:
{
"datetime"
:
{
"gte"
:
start_dt
,
"lt"
:
end_dt
}}}
]
}
},
"sort"
:
[{
"datetime"
:
{
"order"
:
"asc"
}}]
}
async
with
EsUtil
()
as
es
:
datas
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_1MIN_SCOPE
)
return
datas
[
"hits"
][
"hits"
]
\ No newline at end of file
unify_api/modules/anshiu/service/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/service/equip_management_serv.py
0 → 100644
View file @
153e0c79
from
unify_api.modules.anshiu.procedures.equip_management_pds
import
\
equip_run_list
,
equip_run_statistics
from
unify_api.modules.anshiu.components.equip_management_cps
import
\
EquipRunInfo
from
unify_api.utils.time_format
import
get_time_duration
,
get_datetime_str
,
\
get_date_timestamp
,
get_time_duration_by_str
async
def
equip_run_list_serv
(
company_id
,
point_ids
,
start_time
,
end_time
,
page_num
,
page_size
,
sort_field
,
sort_type
):
'''
获取运行统计列表
'''
start_time
=
get_date_timestamp
(
start_time
)
end_time
=
get_date_timestamp
(
end_time
)
rows
,
total
=
await
equip_run_list
(
company_id
,
point_ids
,
start_time
,
end_time
,
page_num
,
page_size
,
sort_field
,
sort_type
)
run_lists
=
[]
for
row
in
rows
:
run_time
=
get_time_duration
(
row
[
'start_time'
],
row
[
'end_time'
])
if
\
row
[
'start_time'
]
and
row
[
'end_time'
]
else
''
start_time_one
=
get_datetime_str
(
row
[
'start_time'
])
if
row
[
'start_time'
]
else
''
end_time_one
=
get_datetime_str
(
row
[
'end_time'
])
if
row
[
'end_time'
]
\
else
''
equip_run_info
=
EquipRunInfo
(
point_name
=
row
[
'point_name'
],
start_time
=
start_time_one
,
end_time
=
end_time_one
,
run_time
=
run_time
)
run_lists
.
append
(
equip_run_info
)
return
run_lists
,
total
async
def
equip_run_statistics_serv
(
company_id
,
point_ids
,
start_time
,
end_time
):
'''
运行统计数据
'''
start_time
=
get_date_timestamp
(
start_time
)
end_time
=
get_date_timestamp
(
end_time
)
data
=
await
equip_run_statistics
(
company_id
,
point_ids
,
start_time
,
end_time
)
data
[
'all_time'
]
=
get_time_duration_by_str
(
data
[
'all_time'
])
if
data
[
'total_count'
]
else
''
data
[
'avg_time'
]
=
get_time_duration_by_str
(
data
[
'avg_time'
])
if
data
[
'total_count'
]
else
''
data
[
'max_time'
]
=
get_time_duration_by_str
(
data
[
'max_time'
])
if
data
[
'total_count'
]
else
''
return
data
unify_api/modules/anshiu/service/fine_monitor_serv.py
0 → 100644
View file @
153e0c79
from
operator
import
itemgetter
from
itertools
import
groupby
from
pot_libs.common.components.query
import
Range
,
Equal
,
Filter
,
InGroup
,
\
Sort
from
pot_libs.es_util
import
es_helper
from
pot_libs.es_util.es_query
import
EsQuery
from
pot_libs.logger
import
log
from
pot_libs.common.components.query
import
PageRequest
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
unify_api.utils
import
time_format
from
unify_api.modules.electric.procedures.electric_util
import
(
get_wiring_type
)
from
unify_api.modules.anshiu.components.fine_monitor_cps
import
(
Statistics
,
Chart
)
from
unify_api.modules.anshiu.procedures.fine_monitor_pds
import
(
get_es_aiao_1min_data
,
get_es_point_1min_data
,
get_es_point_15min_data
,
get_es_aiao_15min_data
,
get_threshold_by_location
)
from
unify_api.utils.time_format
import
convert_timestamp_to_str
async
def
get_adio_chart_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
,
intervel
,
slots
):
'''
获取环境(温度与漏电流)的曲线数据
'''
# 工况标准,取其中一个漏电流阈值
residual_current_threhold
=
await
get_threshold_by_location
(
location_group
)
# 组装es请求信息
range
=
Range
(
field
=
"time"
,
start
=
start_timestamp
,
end
=
end_timestamp
)
in_group
=
InGroup
(
field
=
"location_id"
,
group
=
location_group
)
filter
=
Filter
(
equals
=
[],
ranges
=
[
range
],
in_groups
=
[
in_group
],
keywords
=
[])
if
intervel
>
60
:
# 取时间间隔为15min的数据
temp
,
res
=
await
get_adio_15min_chart_data
(
filter
,
residual_current_threhold
,
location_info
,
slots
,
intervel
)
else
:
# 取时间间隔为1min的数据
temp
,
res
=
await
get_adio_1min_chart_data
(
filter
,
residual_current_threhold
,
location_info
,
slots
,
start_timestamp
)
return
temp
,
res
async
def
get_adio_15min_chart_data
(
filter
,
residual_current_threhold
,
location_info
,
slots
,
intervel
):
'''
获取15min环境的曲线数据
'''
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
# 组装es请求信息
query_body
=
EsQuery
.
aggr_history
(
page_request
,
interval
=
intervel
,
stats_items
=
[
"value_max"
,
]
)
# 分组字段
format_field
=
"location_id_{}_aggs"
for
location_id
,
_
in
location_info
.
items
():
group_field
=
format_field
.
format
(
location_id
)
query_body
[
'aggs'
][
group_field
]
=
{
"filter"
:
{
"term"
:
{
"location_id"
:
location_id
}},
"aggs"
:
{
group_field
:
query_body
[
'aggs'
][
'aggs_name'
]
}
}
del
query_body
[
'aggs'
][
'aggs_name'
]
aggs_res
=
await
get_es_aiao_15min_data
(
query_body
)
# 时间类型
date_type
=
"day"
if
intervel
==
24
*
3600
else
"minute"
# 温度及漏电流
temperature_list
=
[]
residual_currents_list
=
[]
for
location_id
,
item_info
in
location_info
.
items
():
group_field
=
format_field
.
format
(
location_id
)
data_buckets
=
(
aggs_res
.
get
(
group_field
,
{})
.
get
(
group_field
,
{})
.
get
(
"buckets"
,
[])
)
# 时间戳转换为时分并作为键值返回
_data
=
es_helper
.
process_es_aggs_aiao
(
data_buckets
,
time_key
=
"key"
,
value_key
=
"value_max"
,
date_type
=
date_type
,
)
# 取最大值
his_data
=
[
_data
[
slot
]
.
get
(
'max'
,
""
)
if
slot
in
_data
else
""
for
slot
in
slots
]
adio_his
=
Chart
(
item
=
''
,
value_slots
=
his_data
)
if
item_info
.
get
(
"type"
)
==
"residual_current"
:
adio_his
.
item
=
"漏电流"
adio_his
.
threhold
=
residual_current_threhold
residual_currents_list
.
append
(
adio_his
)
else
:
adio_his
.
item
=
item_info
.
get
(
"item"
,
""
)
temperature_list
.
append
(
adio_his
)
return
temperature_list
,
residual_currents_list
async
def
get_adio_1min_chart_data
(
filter
,
residual_current_threhold
,
location_info
,
slots
,
start_timestamp
):
'''
获取1min环境的曲线数据
'''
sort
=
Sort
(
field
=
'location_id'
,
direction
=
'asc'
)
page_request
=
PageRequest
(
page_size
=
10000
,
page_num
=
1
,
sort
=
sort
,
filter
=
filter
)
query_body
=
EsQuery
.
query
(
page_request
)
es_time
=
convert_timestamp_to_str
(
start_timestamp
)
es_res
=
await
get_es_aiao_1min_data
(
query_body
,
es_time
)
# 温度及漏电流
temperature_list
=
[]
residual_currents_list
=
[]
es_res
=
[
v
.
get
(
'_source'
)
for
v
in
es_res
]
for
location_id
,
item
in
groupby
(
es_res
,
key
=
itemgetter
(
'location_id'
)):
# 转成数组的形式
item_dict
=
[
value
for
value
in
item
]
# 获取location类型
item_location_info
=
location_info
.
get
(
location_id
,
{})
# 时间戳转换为时分并作为键值返回
_data
=
es_helper
.
process_es_aggs_aiao
(
item_dict
,
time_key
=
"time"
,
value_key
=
"value"
,
date_type
=
"minute"
,
)
his_data
=
[
_data
.
get
(
slot
,
""
)
for
slot
in
slots
]
adio_his
=
Chart
(
item
=
''
,
value_slots
=
his_data
)
if
item_location_info
.
get
(
"type"
)
==
"residual_current"
:
adio_his
.
item
=
"漏电流"
adio_his
.
threhold
=
residual_current_threhold
residual_currents_list
.
append
(
adio_his
)
else
:
adio_his
.
item
=
item_location_info
.
get
(
"item"
,
""
)
temperature_list
.
append
(
adio_his
)
return
temperature_list
,
residual_currents_list
async
def
get_point_chart_data
(
point_id
,
start_timestamp
,
end_timestamp
,
\
intervel
,
slots
):
'''
获取电气量
'''
# 获取当前监测点的接表法
ctnum
,
_
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum , 监测点已经拆除"
)
# 返回的数值不在2,3中的,一般是装置点已经拆除。默认先给一个默认值3
ctnum
=
3
if
intervel
>
60
:
elec_data
=
await
get_point_15min_chart_data
(
point_id
,
ctnum
,
slots
,
start_timestamp
,
end_timestamp
,
intervel
)
else
:
elec_data
=
await
get_point_1min_chart_data
(
point_id
,
ctnum
,
slots
,
start_timestamp
,
end_timestamp
)
# 功率、电流、电压
power
,
i
,
u
=
[],
[],
[]
for
item
,
value_slots
in
elec_data
.
items
():
data
=
Chart
(
item
=
item
.
replace
(
"_mean"
,
""
),
value_slots
=
value_slots
)
if
item
.
startswith
(
"p"
)
or
item
.
startswith
(
"q"
):
power
.
append
(
data
)
elif
item
.
startswith
(
"i"
):
i
.
append
(
data
)
elif
item
.
startswith
(
"u"
):
u
.
append
((
data
))
return
power
,
i
,
u
,
ctnum
async
def
get_point_15min_chart_data
(
point_id
,
ctnum
,
slots
,
start_timestamp
,
end_timestamp
,
intervel
):
'''
获取15min电气量
'''
es_start
=
my_pendulum
.
from_timestamp
(
start_timestamp
)
es_end
=
my_pendulum
.
from_timestamp
(
end_timestamp
)
range
=
Range
(
field
=
"quarter_time"
,
start
=
es_start
,
end
=
es_end
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
# 统计字段
if
ctnum
==
2
:
stats_items
=
[
"pttl_mean"
,
"qttl_mean"
,
"uab_mean"
,
"ucb_mean"
,
"ia_mean"
,
"ic_mean"
,
]
else
:
stats_items
=
[
"pttl_mean"
,
"qttl_mean"
,
"ua_mean"
,
"ub_mean"
,
"uc_mean"
,
"ia_mean"
,
"ib_mean"
,
"ic_mean"
,
]
query_body
=
EsQuery
.
aggr_history_new
(
page_request
,
interval
=
intervel
,
stats_items
=
stats_items
,
histogram_field
=
"quarter_time"
,
)
aggs_res
=
await
get_es_point_15min_data
(
query_body
)
data_buckets
=
aggs_res
.
get
(
"aggs_name"
,
{})
.
get
(
"buckets"
,
[])
time_bucket_map
=
{
i
[
"key_as_string"
]:
i
for
i
in
data_buckets
}
log
.
info
(
f
"post_elec_history slots={slots} _data = "
f
"{list(time_bucket_map.keys())}, query_body={query_body}"
)
elec_data
=
{
stats_item
:
[]
for
stats_item
in
stats_items
}
for
slot
in
slots
:
if
slot
in
time_bucket_map
:
for
stats_item
in
stats_items
:
# 获取平均值
value
=
time_bucket_map
[
slot
]
.
get
(
stats_item
,
{})
.
get
(
"avg"
,
""
)
elec_data
[
stats_item
]
.
append
(
value
)
else
:
for
stats_item
in
stats_items
:
elec_data
[
stats_item
]
.
append
(
""
)
return
elec_data
async
def
get_point_1min_chart_data
(
point_id
,
ctnum
,
slots
,
start_timestamp
,
end_timestamp
):
'''
获取1min电气量
'''
range
=
Range
(
field
=
"time"
,
start
=
start_timestamp
,
end
=
end_timestamp
)
equal
=
Equal
(
field
=
"point_id"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
sort
=
Sort
(
field
=
'time'
,
direction
=
'asc'
)
page_request
=
PageRequest
(
page_size
=
10000
,
page_num
=
1
,
sort
=
sort
,
filter
=
filter
)
# 统计字段
if
ctnum
==
2
:
stats_items
=
[
"pttl"
,
"qttl"
,
"uab"
,
"ucb"
,
"ia"
,
"ic"
,
]
else
:
stats_items
=
[
"pttl"
,
"qttl"
,
"ua"
,
"ub"
,
"uc"
,
"ia"
,
"ib"
,
"ic"
,
]
query_body
=
EsQuery
.
query
(
page_request
)
es_time
=
convert_timestamp_to_str
(
start_timestamp
)
es_res
=
await
get_es_point_1min_data
(
query_body
,
es_time
)
_data
=
{}
for
res
in
es_res
:
_time
=
convert_timestamp_to_str
(
res
.
get
(
'_source'
,
{})
.
get
(
'time'
,
0
),
date_type
=
'minute'
)
_data
[
_time
]
=
res
.
get
(
'_source'
)
elec_data
=
{}
for
slot
in
slots
:
for
stats_item
in
stats_items
:
data
=
_data
.
get
(
slot
)
.
get
(
stats_item
,
""
)
if
slot
in
_data
else
""
elec_data
.
setdefault
(
stats_item
,
[])
.
append
(
data
)
return
elec_data
async
def
get_adio_info_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
):
'''
获取环境相关数据
'''
range
=
Range
(
field
=
"time"
,
start
=
start_timestamp
,
end
=
end_timestamp
)
# 分别统计各个location温度最大值、最小值、平均值
stats_info
=
{}
for
location_id
in
location_group
:
equal
=
Equal
(
field
=
"location_id"
,
value
=
location_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
query_body
=
EsQuery
.
aggr_index
(
page_request
,
stats_items
=
[
"value_max"
,
"value_min"
,
"value_avg"
]
)
aggregations
=
await
get_es_aiao_15min_data
(
query_body
)
# 最大值, 这里叫法有点奇怪,但是最大值应该取15min的最大值聚合结果
max_info
=
aggregations
.
get
(
"value_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max
=
source
.
get
(
"value_max"
,
0
)
max
=
round
(
max
,
2
)
if
max
is
not
None
else
""
max_ts
=
source
.
get
(
"value_max_time"
,
0
)
max_value_time
=
str
(
time_format
.
convert_to_dt
(
max_ts
))
else
:
max
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
"value_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min
=
source
.
get
(
"value_min"
,
0
)
min
=
round
(
min
,
2
)
if
min
is
not
None
else
""
min_ts
=
source
.
get
(
"value_min_time"
,
0
)
min_value_time
=
str
(
time_format
.
convert_to_dt
(
min_ts
))
else
:
min
=
""
min_value_time
=
""
avg
=
aggregations
.
get
(
"value_avg_avg"
,
{})
.
get
(
"value"
)
avg
=
round
(
avg
,
2
)
if
avg
is
not
None
else
""
stats_info
[
location_id
]
=
{
"max"
:
{
"value"
:
max
,
"time"
:
max_value_time
},
"min"
:
{
"value"
:
min
,
"time"
:
min_value_time
},
"avg"
:
avg
,
}
# 返回
adio_indexes
=
[]
for
location_id
,
info
in
location_info
.
items
():
item
,
type
=
info
[
"item"
],
info
[
"type"
]
# 漏电流的item更改一下
item
=
'漏电流'
if
type
==
'residual_current'
else
item
_info
=
stats_info
[
location_id
]
adio_index
=
Statistics
(
type
=
type
,
item
=
item
,
max
=
_info
[
"max"
][
"value"
],
max_time
=
_info
[
"max"
][
"time"
],
min
=
_info
[
"min"
][
"value"
],
min_time
=
_info
[
"min"
][
"time"
],
avg
=
_info
[
"avg"
],
)
adio_indexes
.
append
(
adio_index
)
return
adio_indexes
async
def
get_point_info_data
(
point_id
,
start_time
,
end_time
):
# 2. 获取几表法
ctnum
,
_
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum, 装置点已经被拆!"
)
# 给默认值3表法
ctnum
=
3
range
=
Range
(
field
=
"quarter_time"
,
start
=
start_time
,
end
=
end_time
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if
ctnum
==
2
:
stats_items
=
[
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"uab_mean"
,
"uab_min"
,
"uab_max"
,
"ucb_mean"
,
"ucb_min"
,
"ucb_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
]
else
:
stats_items
=
[
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"ua_mean"
,
"ua_min"
,
"ua_max"
,
"ub_mean"
,
"ub_min"
,
"ub_max"
,
"uc_mean"
,
"uc_min"
,
"uc_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ib_mean"
,
"ib_min"
,
"ib_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
]
query_body
=
EsQuery
.
aggr_index
(
page_request
,
stats_items
=
stats_items
)
aggregations
=
await
get_es_point_15min_data
(
query_body
)
# 常规参数统计
common_indexes
=
[]
_stats_items
=
{
i
.
rsplit
(
"_"
,
1
)[
0
]
for
i
in
stats_items
}
for
item
in
_stats_items
:
# 最大值
max_info
=
aggregations
.
get
(
f
"{item}_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max_value
=
source
.
get
(
f
"{item}_max"
,
""
)
max_dt
=
source
.
get
(
f
"{item}_max_time"
)
if
max_dt
is
None
:
log
.
error
(
f
"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}"
)
max_value_time
=
str
(
time_format
.
convert_to_dt
(
max_dt
)
if
max_dt
else
""
)
else
:
max_value
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
f
"{item}_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min_value
=
source
.
get
(
f
"{item}_min"
)
min_value
=
min_value
if
min_value
is
not
None
else
""
min_dt
=
source
.
get
(
f
"{item}_min_time"
)
min_value_time
=
str
(
time_format
.
convert_to_dt
(
min_dt
)
if
min_dt
else
""
)
else
:
min_value
=
""
min_value_time
=
""
# 平均值
avg
=
aggregations
.
get
(
f
"{item}_mean_avg"
,
{})
.
get
(
"value"
)
avg
=
round
(
avg
,
2
)
if
avg
is
not
None
else
""
elec_index
=
Statistics
(
item
=
item
,
max
=
max_value
,
max_time
=
max_value_time
,
min
=
min_value
,
min_time
=
min_value_time
,
avg
=
avg
,
)
common_indexes
.
append
(
elec_index
)
return
common_indexes
unify_api/modules/anshiu/service/scope_operations_serv.py
0 → 100644
View file @
153e0c79
import
json
import
time
import
pandas
as
pd
from
pandas.core.dtypes.inference
import
is_number
from
pot_libs.aiomqtt_util.hbmqtt_utils
import
MqttUtil
from
pot_libs.aredis_util
import
aredis_utils
from
pot_libs.common.components.query
import
PageRequest
,
Filter
,
InGroup
,
\
Range
,
Equal
,
Sort
from
pot_libs.es_util.es_query
import
EsQuery
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.logger
import
log
from
pot_libs.utils.exc_util
import
BusinessException
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
pot_libs.utils.time_format
import
convert_dt_to_timestr
,
\
convert_to_es_str
,
time_str_to_str
from
unify_api.modules.anshiu.components.scope_operations_cps
import
\
ScopeListItem
,
ScopeContent
,
ScopeDetailsResp
,
GetScopeConfigList
,
\
init_scope_config_example
,
ScopeItemDownload
from
unify_api.modules.anshiu.procedures.scope_operations_pds
import
\
get_scope_config_by_pid
,
set_scope_config_by_pid
,
add_scope_config_by_pid
,
\
get_scope_list_by_pid
from
unify_api.modules.common.dao.common_dao
import
point_by_points
,
\
points_by_cid
from
unify_api.modules.device_cloud.procedures.mqtt_helper
import
\
change_param_to_config
from
unify_api.modules.zhiwei_u
import
config
from
unify_api.modules.zhiwei_u.dao.data_es_dao
import
query_search_scope_pids
,
\
query_search_scope
from
unify_api.modules.zhiwei_u.service.scope_operations_service
import
\
scope_detail_service
from
unify_api.utils
import
time_format
from
unify_api.utils.time_format
import
get_time_duration
,
\
get_current_datetime_str
,
convert_str_to_timestamp
async
def
search_scope_service
(
pids
,
cid
,
page_num
,
page_size
,
start
,
end
,
scope_g
):
'''
获取录波记录
'''
if
len
(
pids
)
==
0
:
datas
=
await
query_search_scope
([
cid
],
''
,
page_num
,
page_size
,
start
,
end
,
scope_g
)
points
=
await
points_by_cid
([
cid
])
else
:
datas
=
await
query_search_scope_pids
(
pids
,
page_num
,
page_size
,
start
,
end
,
scope_g
)
points
=
await
point_by_points
(
list
(
set
(
pids
)))
if
not
datas
[
"hits"
][
"hits"
]:
return
[],
0
total
=
datas
[
"hits"
][
"total"
]
# 获取监测点数据
points
=
{
point
[
'pid'
]:
point
for
point
in
points
}
# 获取录波明细数据以获取颗粒度
# scope_type_items = await get_scope_type(pids, cid, start, end)
rows
=
[]
for
info
in
datas
[
"hits"
][
"hits"
]:
pid
=
info
[
"_source"
][
"point_id"
]
message
=
info
[
"_source"
][
"message"
]
check_dt
=
info
[
"_source"
][
"datetime"
]
# scope_type_key = "%s_%s" % (pid, info["_source"]["time"])
# scope_type = scope_type_items.get(scope_type_key, 0)
scope_type
=
info
[
"_source"
]
.
get
(
"scope_g"
,
''
)
if
scope_type
==
'200ms'
:
scope_type
=
'0.2s'
dt
=
time_format
.
convert_to_dt
(
check_dt
)
check_dt
=
time_format
.
convert_dt_to_timestr
(
dt
)
scope_list_item
=
ScopeListItem
(
check_dt
=
check_dt
,
scope_type
=
scope_type
,
point
=
points
[
pid
]
.
get
(
"name"
),
message
=
message
,
doc_id
=
info
[
"_id"
]
)
rows
.
append
(
scope_list_item
)
return
rows
,
total
async
def
scope_list_download_data
(
pids
,
start
,
end
):
"""
获取录波下载数据
"""
duration
=
get_time_duration
(
start
,
end
,
False
,
False
)
if
duration
>
24
*
60
*
60
:
raise
BusinessException
(
message
=
'只能支持下载一天的录波数据'
)
start_dt
=
convert_to_es_str
(
start
)
end_dt
=
convert_to_es_str
(
end
)
datas
=
await
get_scope_list_by_pid
(
pids
,
start_dt
,
end_dt
)
scope_data
=
[]
scope_fields
=
[
"ua"
,
"ub"
,
"uc"
,
"ia"
,
"ib"
,
"ic"
,
"pttl"
,
"lc"
,
]
for
hit
in
datas
:
_source
=
hit
[
"_source"
]
context
=
json
.
loads
(
_source
[
"context"
])
# 只导出录波的数据
wave_data
=
context
.
get
(
"ia"
)
if
not
wave_data
:
continue
for
i
,
_
in
enumerate
(
wave_data
):
datetime
=
_source
[
"datetime"
][:
19
]
.
replace
(
'T'
,
' '
)
one_scope_data
=
{}
for
field
in
scope_fields
:
if
field
in
context
:
value
=
context
.
get
(
field
)[
i
]
value
=
""
if
pd
.
isna
(
value
)
else
value
else
:
value
=
None
one_scope_data
[
field
]
=
value
item
=
ScopeItemDownload
(
datetime
=
datetime
,
**
one_scope_data
)
scope_data
.
append
(
item
)
return
scope_data
async
def
scope_detail_data
(
doc_id
):
'''
识别详情
'''
# 暂时先默认查询所有的
wave_range
=
"all"
scope_detail_obj
=
await
scope_detail_service
(
doc_id
,
wave_range
)
# 录波颗粒度
scope_type
=
scope_detail_obj
.
scope_g
if
scope_type
==
'200ms'
:
scope_type
=
'0.2s'
if
scope_detail_obj
.
ctnum
==
2
:
i_fields
=
[
'ia'
,
'ic'
]
v_fields
=
[
'uab'
,
'ucb'
]
else
:
i_fields
=
[
'ia'
,
'ib'
,
'ic'
]
v_fields
=
[
'ua'
,
'ub'
,
'uc'
]
i
=
[
ScopeContent
(
item
=
item
,
value_datas
=
getattr
(
scope_detail_obj
.
contents
,
item
,
[]))
for
item
in
i_fields
]
v
=
[
ScopeContent
(
item
=
item
,
value_datas
=
getattr
(
scope_detail_obj
.
contents
,
item
,
[]))
for
item
in
v_fields
]
# 漏电流及功率
if
scope_type
==
'2s'
:
residual_current
=
[
float
(
str
(
v
)
.
replace
(
'nan'
,
'0'
))
for
v
in
scope_detail_obj
.
contents
.
lc
]
residual_current
=
[
ScopeContent
(
item
=
'漏电流'
,
value_datas
=
residual_current
)]
power
=
[
float
(
str
(
v
)
.
replace
(
'nan'
,
'0'
))
for
v
in
scope_detail_obj
.
contents
.
pttl
]
power
=
[
ScopeContent
(
item
=
'总有功功率'
,
value_datas
=
power
)]
else
:
residual_current
=
[]
power
=
[]
# 当触发项为漏电流或者功率的时候 item为空
if
scope_detail_obj
.
type
.
find
(
'漏电流'
)
>=
0
or
scope_detail_obj
.
type
.
find
(
'功率'
)
>=
0
:
item
=
''
else
:
item
=
scope_detail_obj
.
item
return
ScopeDetailsResp
(
point
=
scope_detail_obj
.
alarm
.
point
,
contin_time
=
scope_detail_obj
.
alarm
.
contin_time
,
scope_type
=
scope_type
,
check_dt
=
scope_detail_obj
.
alarm
.
check_dt
,
ctnum
=
scope_detail_obj
.
ctnum
,
location
=
scope_detail_obj
.
location
,
item
=
item
,
type
=
scope_detail_obj
.
type
,
i
=
i
,
v
=
v
,
residual_current
=
residual_current
,
p
=
power
)
async
def
get_scope_type
(
pids
,
cid
,
start
,
end
):
'''
获取录波颗粒度
'''
if
len
(
pids
)
==
0
:
equal
=
[
Equal
(
field
=
"cid"
,
value
=
cid
)]
groups
=
[]
else
:
equal
=
[]
groups
=
[
InGroup
(
field
=
'point_id'
,
group
=
pids
)]
if
start
and
end
:
start_time
=
convert_str_to_timestamp
(
start
)
end_time
=
convert_str_to_timestamp
(
end
)
range
=
[
Range
(
field
=
'time'
,
start
=
start_time
,
end
=
end_time
)]
else
:
range
=
[]
filter
=
Filter
(
equals
=
equal
,
keywords
=
[],
ranges
=
range
,
in_groups
=
groups
)
page_request
=
PageRequest
(
page_size
=
10000
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
query_body
=
EsQuery
.
query
(
page_request
)
query_body
[
'_source'
]
=
[
"point_id"
,
"time"
,
"scope_g"
]
rows
=
{}
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
config
.
POINT_1MIN_SCOPE
)
if
not
es_results
[
"hits"
][
"hits"
]:
return
{}
for
info
in
es_results
[
"hits"
][
"hits"
]:
scope_type_key
=
"
%
s_
%
s"
%
(
info
[
"_source"
][
"point_id"
],
info
[
"_source"
][
"time"
])
rows
[
scope_type_key
]
=
info
[
"_source"
]
.
get
(
"scope_g"
,
'0.25ms'
)
or
'0.25ms'
return
rows
async
def
set_scope_config_serv
(
pid
,
type
,
scope_type
,
args
):
'''
设置某个设备的录波配置
'''
# 先查看缓存中是否存在,如果存在直接返回
redis_key
=
f
"scope_config:{pid}"
redis_result
=
await
aredis_utils
.
RedisClient
()
.
get
(
redis_key
)
log
.
info
(
f
'set_scope_config_serv redis_result:{redis_result}'
)
if
redis_result
:
raise
Exception
(
'两次操作间隔3分钟,请稍后再试。'
)
# 先获取录波的配置
configs
=
await
get_scope_config_by_pid
(
pid
)
# 如果存在,则解析,如果不存在,初始化
if
configs
:
configs
=
json
.
loads
(
configs
)
if
not
configs
or
'2s'
not
in
configs
or
'0.25ms'
not
in
configs
or
\
'0.2s'
not
in
configs
:
try
:
configs
=
await
init_scope_config
(
pid
)
except
Exception
as
e
:
raise
Exception
(
'初始化配置信息出错:'
+
str
(
e
))
# # 重置配置信息
if
type
==
'time'
:
if
scope_type
!=
'2s'
:
raise
Exception
(
'只有录波颗粒度为2s的支持时间设置'
)
# 时间配置
for
field
,
item
in
args
.
items
():
if
field
==
'one_time'
:
start_field
,
stop_field
=
'start_time_I'
,
'stop_time_I'
elif
field
==
'two_time'
:
start_field
,
stop_field
=
'start_time_II'
,
'stop_time_II'
elif
field
==
'three_time'
:
start_field
,
stop_field
=
'start_time_III'
,
'stop_time_III'
else
:
continue
# 验证时间差
if
len
(
item
)
not
in
[
0
,
2
]:
log
.
error
(
'set_scope_config_serv time_param error'
+
str
(
item
))
raise
Exception
(
'参数错误'
)
if
len
(
item
)
==
2
and
(
item
[
0
]
or
item
[
1
]):
try
:
today
=
get_current_datetime_str
(
"
%
Y-
%
m-
%
d"
)
duration
=
get_time_duration
(
'
%
s
%
s:00'
%
(
today
,
item
[
0
]),
'
%
s
%
s:00'
%
(
today
,
item
[
1
]),
False
,
False
)
except
Exception
as
e
:
log
.
error
(
'set_scope_config_serv time_format error'
+
str
(
item
))
raise
Exception
(
'时间格式错误'
)
if
duration
>
3600
:
log
.
error
(
'set_scope_config_serv time_format error'
+
str
(
item
))
raise
Exception
(
'时间差不能大于一个小时'
)
start_value
=
item
[
0
]
if
len
(
item
)
==
2
and
item
[
0
]
else
'00:00'
stop_value
=
item
[
1
]
if
len
(
item
)
==
2
and
item
[
1
]
else
'00:00'
configs
[
scope_type
][
start_field
]
=
f
'{start_value}:00'
configs
[
scope_type
][
stop_field
]
=
f
'{stop_value}:00'
else
:
# 其他配置
if
type
==
'state'
:
# 状态对应字段
state_fields
=
{
'0.25ms'
:
'en_scope'
,
'0.2s'
:
'en_wave_200ms'
,
'2s'
:
'en_electric_2s'
}
args
[
state_fields
[
scope_type
]]
=
args
[
'state'
]
for
field
,
item
in
args
.
items
():
if
field
in
configs
[
scope_type
]:
configs
[
scope_type
][
field
]
=
item
# 配置下发(只有正式环境才开启)
await
set_mqtt_scope_config
(
pid
,
configs
,
scope_type
)
# 将数据存入缓存中
await
aredis_utils
.
RedisClient
()
.
setex
(
redis_key
,
180
,
pid
)
# 保存配置信息
new_configs
=
json
.
dumps
(
configs
)
update_count
=
await
set_scope_config_by_pid
(
pid
,
new_configs
)
if
update_count
<
0
:
log
.
error
(
f
'set_scope_config_serv update sql error pid:{pid},'
f
'new_configs={new_configs}'
)
raise
Exception
(
'操作失败'
)
async
def
get_scope_config_serv
(
pid
):
'''
获取录波配置
'''
configs
=
await
get_scope_config_by_pid
(
pid
)
# 如果存在,则解析,
if
configs
:
configs
=
json
.
loads
(
configs
)
# 如果不存在,初始化
if
not
configs
or
'2s'
not
in
configs
or
'0.25ms'
not
in
configs
or
\
'0.2s'
not
in
configs
:
try
:
configs
=
await
init_scope_config
(
pid
)
except
Exception
as
e
:
configs
=
init_scope_config_example
[
"threshold"
]
# raise Exception('初始化配置信息出错:' + str(e))
# 格式化数据返回
return_data
=
{}
# 字段解释
fields
=
{
'umin'
:
[
'越下限阈值'
,
'v'
],
'umax'
:
[
'越上限阈值'
,
'v'
],
'ugap'
:
[
'波动阈值'
,
'v'
],
'imax'
:
[
'越限阈值'
,
'i'
],
'igap'
:
[
'波动阈值'
,
'i'
],
'lcmax'
:
[
'越限阈值'
,
'residual_current'
],
'lcgap'
:
[
'波动阈值'
,
'residual_current'
],
'pttlmax'
:
[
'越限阈值'
,
'power'
],
'pttlgap'
:
[
'波动阈值'
,
'power'
],
}
for
scope_type
,
config
in
configs
.
items
():
one_config
=
{}
for
field
,
data
in
fields
.
items
():
if
field
in
config
:
one_config
[
field
]
=
{
'name'
:
data
[
0
],
'type'
:
data
[
1
],
'value'
:
config
.
get
(
field
)}
# 状态显示
if
scope_type
==
'0.25ms'
:
state
=
config
.
get
(
'en_scope'
,
0
)
elif
scope_type
==
'0.2s'
:
state
=
config
.
get
(
'en_wave_200ms'
,
0
)
elif
scope_type
==
'2s'
:
state
=
config
.
get
(
'en_electric_2s'
,
0
)
else
:
state
=
config
.
get
(
'state'
,
0
)
# 时间段显示
if
scope_type
==
'2s'
:
config
=
{
conf_key
:
conf_v
.
replace
(
'00:00:00'
,
''
)[
0
:
5
]
for
conf_key
,
conf_v
in
config
.
items
()
if
isinstance
(
conf_v
,
str
)}
one_config
[
'one_time'
]
=
{
'name'
:
'第一个时间段'
,
'type'
:
'time'
,
'value'
:
[
config
.
get
(
'start_time_I'
),
config
.
get
(
'stop_time_I'
)]}
one_config
[
'two_time'
]
=
{
'name'
:
'第二个时间段'
,
'type'
:
'time'
,
'value'
:
[
config
.
get
(
'start_time_II'
),
config
.
get
(
'stop_time_II'
)]}
one_config
[
'three_time'
]
=
{
'name'
:
'第三个时间段'
,
'type'
:
'time'
,
'value'
:
[
config
.
get
(
'start_time_III'
),
config
.
get
(
'stop_time_III'
)]}
one_data
=
GetScopeConfigList
(
state
=
state
,
configs
=
one_config
)
return_data
[
scope_type
]
=
one_data
return
return_data
async
def
init_scope_config
(
pid
,
update_sql
=
True
):
'''
初始化录波配置
'''
# 获取配置信息
new_configs
=
await
get_mqtt_scope_config
(
pid
)
# 更新数据库配置信息
if
update_sql
:
configs
=
await
get_scope_config_by_pid
(
pid
)
if
configs
:
update_count
=
await
set_scope_config_by_pid
(
pid
,
json
.
dumps
(
new_configs
[
"threshold"
]))
else
:
update_count
=
await
add_scope_config_by_pid
(
pid
,
new_configs
)
if
update_count
<
0
:
log
.
error
(
f
'set_scope_config_serv update sql error pid:{pid},'
f
'new_configs={new_configs}'
)
return
new_configs
[
"threshold"
]
async
def
get_mqtt_scope_config
(
pid
):
'''
获取录波配置信息
'''
# 封装数据格式
pub_dict
=
dict
()
pub_dict
[
'point_id'
]
=
pid
pub_dict
[
'scope_type'
]
=
''
pub_json
=
await
change_param_to_config
(
pub_dict
,
method
=
'get'
,
type
=
'scope'
)
if
not
pub_json
:
log
.
error
(
'get_mqtt_scope_config pub_json error pid:'
+
str
(
pid
))
raise
Exception
(
'找不到设备信息'
)
# 从协议中获取配置信息
res_data
=
await
mqtt_func
(
pub_json
,
pid
)
if
not
res_data
.
get
(
'data'
):
raise
Exception
(
'请求数据为空'
)
sid
=
pub_json
.
get
(
"sid"
)
# 转换数据字段
trans_fields
=
{
"0.25ms"
:
{
"scopeEnable"
:
"en_scope"
,
"scoplimit"
:
"scop_limit"
},
"0.2s"
:
{
"scopeEnable"
:
"en_wave_200ms"
,
"scoplimit"
:
"scop_limit"
},
"2s"
:
{
"scopeEnable"
:
"en_electric_2s"
}}
# 封装成数据库保存格式
configs
=
{
"0.25ms"
:
res_data
.
get
(
'data'
)
.
get
(
"scope"
)
.
get
(
"electric"
)
.
get
(
sid
)
.
get
(
"threshold"
),
"0.2s"
:
res_data
.
get
(
'data'
)
.
get
(
"wave_200ms"
)
.
get
(
"electric"
)
.
get
(
sid
)
.
get
(
"threshold"
),
"2s"
:
res_data
.
get
(
'data'
)
.
get
(
"electric_2s"
)
.
get
(
"electric"
)
.
get
(
sid
)
.
get
(
"threshold"
),
}
for
type
,
config
in
configs
.
items
():
trans_field
=
trans_fields
.
get
(
type
)
if
trans_field
:
for
key
,
trans_key
in
trans_field
.
items
():
if
key
in
config
:
configs
[
type
][
trans_key
]
=
config
[
key
]
del
configs
[
type
][
key
]
return
{
"coef"
:
res_data
.
get
(
'data'
)
.
get
(
"scope"
)
.
get
(
"electric"
)
.
get
(
sid
)
.
get
(
"coef"
),
'threshold'
:
configs
}
async
def
set_mqtt_scope_config
(
pid
,
configs
,
scope_type
):
'''
设置录波配置信息
'''
# 封装配置下发信息
pub_dict
=
configs
[
scope_type
]
pub_dict
[
'point_id'
]
=
pid
pub_dict
[
'scope_type'
]
=
scope_type
pub_json
=
await
change_param_to_config
(
pub_dict
,
method
=
'config'
,
type
=
'scope'
)
if
not
pub_json
:
log
.
error
(
'set_scope_config_serv pub_json error'
+
str
(
configs
[
scope_type
]))
raise
Exception
(
'找不到设备信息'
)
# 将配置信息下发
await
mqtt_func
(
pub_json
,
pid
)
async
def
mqtt_func
(
pub_json
,
pid
):
'''
请求mqtt服务器
'''
try
:
async
with
MqttUtil
()
as
emq
:
res_data
=
await
emq
.
device_response
(
sid
=
pub_json
.
get
(
"sid"
),
request_id
=
pub_json
.
get
(
"request_id"
),
data
=
pub_json
)
except
TimeoutError
:
log
.
error
(
'mqtt_func timeout error pid'
+
str
(
pid
))
raise
Exception
(
'请求超时'
)
except
Exception
as
e
:
log
.
error
(
'mqtt_func error:'
+
str
(
e
))
raise
Exception
(
'请求错误'
)
log
.
info
(
f
'mqtt_func res_data:{res_data}'
)
if
res_data
.
get
(
'status_code'
)
!=
200
:
log
.
error
(
'mqtt_func mqtt error pid'
+
str
(
pid
))
raise
Exception
(
'请求失败'
)
return
res_data
async
def
flush_scope_es_data
(
scope_g
,
start_time
,
end_time
):
'''
刷新es的颗粒度
'''
equal
=
Equal
(
field
=
'scope_g'
,
value
=
scope_g
)
sort
=
Sort
(
field
=
'time'
,
direction
=
'desc'
)
start_dt
=
my_pendulum
.
from_format
(
start_time
,
'YYYY-MM-DD HH:mm:ss'
)
end_dt
=
my_pendulum
.
from_format
(
end_time
,
'YYYY-MM-DD HH:mm:ss'
)
while
start_dt
<
end_dt
:
try
:
# 一天取一次
start_timestamp
=
start_dt
.
timestamp
()
print
(
start_dt
.
strftime
(
'YYYY-MM-DD HH:mm:ss'
))
every_end_dt
=
start_dt
.
add
(
days
=
1
)
end_timestamp
=
every_end_dt
.
timestamp
()
range
=
Range
(
field
=
'time'
,
start
=
start_timestamp
,
end
=
end_timestamp
)
# 先查出来
filter
=
Filter
(
equals
=
[
equal
],
keywords
=
[],
ranges
=
[
range
],
in_groups
=
[])
page_request
=
PageRequest
(
page_size
=
10000
,
page_num
=
1
,
sort
=
sort
,
filter
=
filter
)
query_body
=
EsQuery
.
query
(
page_request
)
query_body
[
'_source'
]
=
[
'_id'
]
async
with
EsUtil
()
as
es
:
search_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
config
.
POINT_1MIN_SCOPE
)
if
not
search_results
or
not
search_results
[
'hits'
][
'hits'
]:
continue
doc_ids
=
[
data
[
'_id'
]
for
data
in
search_results
[
'hits'
][
'hits'
]]
# 然后再刷新
update_body
=
{
"query"
:
{
"bool"
:
{
"must"
:
{
"terms"
:
{
"doc_id.keyword"
:
doc_ids
}
}
}
},
"script"
:
{
"source"
:
"ctx._source.scope_g = '
%
s'"
%
scope_g
}
}
async
with
EsUtil
()
as
es
:
update_results
=
await
es
.
update_by_query
(
body
=
update_body
,
index
=
config
.
SCOPE_DATABASE
)
if
not
update_results
or
update_results
.
get
(
'updated'
)
>
0
:
log
.
error
(
f
"flush_scope_es_data update_by_query count error:"
f
"{update_results.get('updated')}"
)
except
Exception
as
e
:
log
.
error
(
f
"flush_scope_es_data error :{str(e)}"
)
continue
finally
:
# 递增
start_dt
=
every_end_dt
unify_api/modules/anshiu/views/__init__.py
0 → 100644
View file @
153e0c79
unify_api/modules/anshiu/views/equip_management.py
0 → 100644
View file @
153e0c79
from
pot_libs.sanic_api
import
summary
,
description
,
examples
from
pot_libs.logger
import
log
from
unify_api.modules.anshiu.service.equip_management_serv
import
\
equip_run_list_serv
,
equip_run_statistics_serv
from
unify_api.modules.anshiu.components.equip_management_cps
import
(
EquipManagementTotalReq
,
EquipManagementListReq
,
EquipManagementTotalResp
,
EquipManagementListResp
,
EquipRunReq
,
EquipRunListResp
,
EquipRunStatusReq
,
EquipRunStatusResp
,
EquipRunStatisticsReq
,
EquipRunStatisticsResp
)
from
unify_api.modules.anshiu.procedures.equip_management_pds
import
(
check_company_exist
,
equip_management_list
,
equip_management_total
,
equip_run_list
,
get_equip_run_status
)
@
summary
(
"设备管理-获取设备统计信息"
)
async
def
post_equip_management_total
(
request
,
body
:
EquipManagementTotalReq
)
->
EquipManagementTotalResp
:
company_id
=
body
.
cid
total_info
=
await
equip_management_total
(
company_id
)
return
EquipManagementTotalResp
(
installed_number
=
total_info
[
"installed_number"
],
start_time
=
total_info
[
"start_time"
])
@
summary
(
"设备管理-获取设备列表/下载"
)
@
description
(
"列表的时候正常传页码,下载的时候is_download=1"
)
async
def
post_equip_management_list
(
request
,
body
:
EquipManagementListReq
)
->
EquipManagementListResp
:
company_id
=
body
.
cid
is_download
=
body
.
is_download
page_size
,
page_num
=
body
.
page_size
,
body
.
page_num
log
.
info
(
f
"post_equip_management_list company_id={company_id},page_size={page_size},page_num={page_num}"
)
# 下载(限制最大10000条)
if
is_download
==
1
:
page_num
,
page_size
=
1
,
10000
company_exist
=
await
check_company_exist
(
company_id
)
if
not
company_exist
:
return
EquipManagementListResp
.
user_error
()
page_map
=
await
equip_management_list
(
company_id
,
page_num
,
page_size
)
return
EquipManagementListResp
(
rows
=
page_map
[
"rows"
],
total
=
page_map
[
"total"
],
page_num
=
page_num
)
@
summary
(
"运行统计-列表"
)
@
description
(
"列表的时候正常传页码,下载的时候is_download=1"
)
async
def
post_equip_run_list
(
request
,
body
:
EquipRunReq
)
->
EquipRunListResp
:
try
:
company_id
=
body
.
cid
point_ids
=
body
.
point_ids
page_size
=
body
.
page_size
page_num
=
body
.
page_num
start_time
=
body
.
start
end_time
=
body
.
end
is_download
=
body
.
is_download
sort_field
=
body
.
sort_field
if
body
.
sort_field
and
is_download
==
0
\
else
'start_time'
sort_type
=
body
.
sort_type
if
body
.
sort_type
and
is_download
==
0
\
else
'desc'
# 未选中监测点直接返回
if
len
(
point_ids
)
==
0
:
return
EquipRunListResp
(
rows
=
[],
total
=
0
,
page_num
=
page_num
)
# 监测点选中全部
if
point_ids
[
0
]
==
-
1
:
point_ids
=
[]
# 未选中监测点且未选中工厂提示错误
if
len
(
point_ids
)
==
0
and
not
company_id
:
log
.
error
(
"post_scope_list_param_error pids:
%
s cid:
%
s"
%
(
point_ids
,
company_id
))
return
EquipRunListResp
.
error_param
()
# 下载(限制最大10000条)
if
is_download
==
1
:
page_num
,
page_size
=
1
,
10000
except
Exception
as
e
:
log
.
error
(
"post_equip_run_list_error :"
+
e
)
return
EquipRunListResp
.
param_error
()
# 从数据库中获取数据
rows
,
total
=
await
equip_run_list_serv
(
company_id
,
point_ids
,
start_time
,
end_time
,
page_num
,
page_size
,
sort_field
,
sort_type
)
return
EquipRunListResp
(
rows
=
rows
,
total
=
total
,
page_num
=
page_num
)
@
summary
(
"运行统计-获取统计数据"
)
async
def
post_equip_run_statistics
(
request
,
body
:
EquipRunStatisticsReq
)
->
EquipRunStatisticsResp
:
'''
获取运行统计数据
'''
cid
=
body
.
cid
start_time
=
body
.
start
end_time
=
body
.
end
point_ids
=
body
.
point_ids
# 未选中监测点直接返回
if
len
(
point_ids
)
==
0
:
return
EquipRunStatisticsResp
(
count
=
0
,
run_all_time
=
''
,
run_avg_time
=
''
,
run_max_time
=
''
)
# 监测点选中全部
if
point_ids
[
0
]
==
-
1
:
point_ids
=
[]
# 未选中监测点且未选中工厂提示错误
if
len
(
point_ids
)
==
0
and
not
cid
:
log
.
error
(
"post_scope_list_param_error pids:
%
s cid:
%
s"
%
(
point_ids
,
cid
))
return
EquipRunStatisticsResp
.
error_param
()
data
=
await
equip_run_statistics_serv
(
cid
,
point_ids
,
start_time
,
end_time
)
return
EquipRunStatisticsResp
(
count
=
data
[
'total_count'
],
run_all_time
=
data
[
'all_time'
],
run_avg_time
=
data
[
'avg_time'
],
run_max_time
=
data
[
'max_time'
])
@
summary
(
'获取检测点运行状态'
)
async
def
post_equit_run_status
(
request
,
body
:
EquipRunStatusReq
)
->
EquipRunStatusResp
:
point_id
=
body
.
point_id
if
not
point_id
or
point_id
<=
0
:
log
.
error
(
f
'get_equit_run_status_param_error point:{point_id}'
)
return
EquipRunStatusResp
.
param_error
()
status
=
await
get_equip_run_status
(
point_id
)
return
EquipRunStatusResp
(
is_run
=
status
)
unify_api/modules/anshiu/views/fine_monitor.py
0 → 100644
View file @
153e0c79
from
pot_libs.sanic_api
import
summary
,
examples
from
pot_libs.common.components.query
import
PageRequest
from
pot_libs.logger
import
log
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
unify_api.utils.request_util
import
filed_value_from_list
from
unify_api.utils
import
time_format
from
unify_api.modules.anshiu.components.fine_monitor_cps
import
(
FineMonitorChartReq
,
FineMonitorInfoReq
,
FineMonitorChartResp
,
FineMonitorInfoResp
)
from
unify_api.modules.anshiu.procedures.fine_monitor_pds
import
(
get_location_by_ids
,
get_threshold_by_location
)
from
unify_api.modules.anshiu.service.fine_monitor_serv
import
(
get_adio_chart_data
,
get_point_chart_data
,
get_adio_info_data
,
get_point_info_data
)
@
summary
(
"精细监测-五个图表"
)
async
def
post_fine_monitor_chart
(
request
,
body
:
FineMonitorChartReq
)
->
FineMonitorChartResp
:
try
:
date_start
=
body
.
start
date_end
=
body
.
end
# 起始时间转化为时间戳
start_timestamp
=
time_format
.
get_date_timestamp
(
date_start
)
end_timestamp
=
time_format
.
get_date_timestamp
(
date_end
)
# 计算间隔与坐标点
intervel
,
slots
=
time_format
.
time_pick_transf
(
date_start
,
date_end
)
# 获取监测点
point_id
=
body
.
pid
if
not
point_id
or
point_id
<=
0
:
raise
Exception
(
'point_error point_id:{}'
.
format
(
point_id
))
# 获取location点
location_group
=
body
.
location_ids
if
not
location_group
:
raise
Exception
(
'in_groups is NULL, no location_id'
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_chart_error '
+
str
(
e
))
return
FineMonitorChartResp
.
param_error
()
# 获取location表的信息
try
:
location_info
=
await
get_location_by_ids
(
location_group
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_chart_error '
+
e
)
return
FineMonitorChartResp
.
db_error
()
# 获取温度及漏电流数据
temperature_list
,
residual_currents_list
=
await
get_adio_chart_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
,
intervel
,
slots
)
# 电力数据 power_list、电流曲线、电压曲线
power_list
,
i_list
,
v_list
,
ctnum
=
await
get_point_chart_data
(
point_id
,
start_timestamp
,
end_timestamp
,
\
intervel
,
slots
)
# 获取温度与漏电流的曲线数据
# 获取用电数据
return
FineMonitorChartResp
(
time_slots
=
slots
,
temperature
=
temperature_list
,
residual_current
=
residual_currents_list
,
power
=
power_list
,
i
=
i_list
,
v
=
v_list
,
ctnum
=
ctnum
)
@
summary
(
"精细监测-指标统计"
)
async
def
post_fine_monitor_info
(
request
,
body
:
FineMonitorInfoReq
)
->
FineMonitorInfoResp
:
try
:
date_start
=
body
.
start
date_end
=
body
.
end
# 起始时间转化为时间戳
start_timestamp
=
time_format
.
get_date_timestamp
(
date_start
)
end_timestamp
=
time_format
.
get_date_timestamp
(
date_end
)
# 起始时间转化为es时间格式
es_start_dt
=
my_pendulum
.
from_format
(
date_start
,
"YYYY-MM-DD HH:mm:ss"
)
es_end_dt
=
my_pendulum
.
from_format
(
date_end
,
"YYYY-MM-DD HH:mm:ss"
)
# 获取监测点
point_id
=
body
.
pid
if
not
point_id
or
point_id
<=
0
:
raise
Exception
(
'point_error point_id:{}'
.
format
(
point_id
))
# 获取location点
location_group
=
body
.
location_ids
if
not
location_group
:
raise
Exception
(
'in_groups is NULL, no location_id'
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_info '
+
str
(
e
))
return
FineMonitorInfoResp
.
param_error
()
# 获取location表的信息
try
:
location_info
=
await
get_location_by_ids
(
location_group
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_chart_error '
+
e
)
return
FineMonitorChartResp
.
db_error
()
info_list
=
[]
# 环境相关数据
adio_list
=
await
get_adio_info_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
)
# 用电相关数据
point_list
=
await
get_point_info_data
(
point_id
,
es_start_dt
,
es_end_dt
)
info_list
.
extend
(
adio_list
)
info_list
.
extend
(
point_list
)
return
FineMonitorInfoResp
(
info_list
=
info_list
)
unify_api/modules/anshiu/views/scope_operations.py
0 → 100644
View file @
153e0c79
from
pot_libs.common.components.responses
import
Success
from
pot_libs.logger
import
log
from
pot_libs.sanic_api
import
summary
,
description
,
examples
from
pot_libs.utils.exc_util
import
BusinessException
from
unify_api.modules.anshiu.components.scope_operations_cps
import
\
ScopeListReq
,
ScopeListResp
,
GetScopeConfigReq
,
GetScopeConfigResp
,
\
SetScopeConfigReq
,
SetScopeConfigResp
,
\
scope_list_req_example
,
ScopeDetailsResp
,
ScopeDetailRep
,
\
set_scope_config_example
,
InitScopeConfigReq
,
FlushScopeEsDataReq
,
\
ScopeListDownloadReq
,
ScopeListDownloadResp
from
unify_api.modules.anshiu.service.scope_operations_serv
import
\
search_scope_service
,
scope_detail_data
,
get_scope_config_serv
,
\
set_scope_config_serv
,
init_scope_config
,
flush_scope_es_data
,
\
scope_list_download_data
@
summary
(
"识别记录-列表"
)
@
description
(
"列表的时候正常传页码,下载的时候is_download=1"
)
@
examples
(
scope_list_req_example
)
async
def
post_scope_list
(
request
,
body
:
ScopeListReq
)
->
ScopeListResp
:
'''
识别记录
'''
cid
=
body
.
cid
page_size
=
body
.
page_size
page_num
=
body
.
page_num
start
=
body
.
start
end
=
body
.
end
scope_g
=
body
.
scope_g
pids
=
body
.
pids
is_download
=
body
.
is_download
# 监测点选中全部
if
pids
and
pids
[
0
]
==
-
1
:
pids
=
[]
# 未选中监测点且未选中工厂提示错误
if
len
(
pids
)
==
0
and
not
cid
:
log
.
error
(
"post_scope_list_param_error pids:
%
s cid:
%
s"
%
(
pids
,
cid
))
return
ScopeListResp
.
error_param
()
if
page_num
*
page_size
>
30000
:
log
.
error
(
f
"post_scope_list_param_error page_too_large page_num"
f
":{page_num},page_size:{page_size}"
)
raise
BusinessException
(
message
=
'只能查询前
%
s条数据,建议缩小查询范围'
%
30000
)
# 下载(限制最大10000条)
if
is_download
==
1
:
page_num
,
page_size
=
1
,
10000
# 替换scope_g
if
scope_g
:
scope_g
=
[
'200ms'
if
i
==
'0.2s'
else
i
for
i
in
scope_g
]
rows
,
total
=
await
search_scope_service
(
pids
,
cid
,
page_num
,
page_size
,
start
,
end
,
scope_g
)
return
ScopeListResp
(
rows
=
rows
,
total
=
total
,
page_num
=
page_num
)
@
summary
(
"2s录波数据下载"
)
async
def
post_scope_list_download
(
request
,
body
:
ScopeListDownloadReq
)
->
ScopeListDownloadResp
:
start
=
body
.
start
end
=
body
.
end
pids
=
body
.
pids
if
len
(
pids
)
>
1
:
raise
BusinessException
(
message
=
"只允许下载一个监测点的数据"
)
datas
=
await
scope_list_download_data
(
pids
,
start
,
end
)
return
ScopeListDownloadResp
(
rows
=
datas
)
@
summary
(
"识别记录-详情"
)
async
def
post_scope_detail
(
request
,
body
:
ScopeDetailRep
)
->
ScopeDetailsResp
:
'''
识别详情
'''
doc_id
=
body
.
id
return
await
scope_detail_data
(
doc_id
)
@
summary
(
"识别设置-获取配置信息"
)
async
def
post_get_scope_config
(
request
,
body
:
GetScopeConfigReq
)
->
GetScopeConfigResp
:
'''
识别设置-配置展示
'''
pid
=
body
.
pid
try
:
data
=
await
get_scope_config_serv
(
pid
)
except
Exception
as
e
:
log
.
error
(
'post_get_scope_config error:'
+
str
(
e
))
return
GetScopeConfigResp
.
server_error
()
return
GetScopeConfigResp
(
pid
=
pid
,
rows
=
data
)
@
summary
(
"识别设置-设置配置信息"
)
@
examples
(
set_scope_config_example
)
async
def
post_set_scope_config
(
request
,
body
:
SetScopeConfigReq
)
->
SetScopeConfigResp
:
'''
识别设置-配置设置
'''
pid
=
body
.
pid
type
=
body
.
type
scope_type
=
body
.
scope_type
# 每一种类型需要的字段
fields
=
{
'state'
:
[
'state'
],
'i'
:
[
'imax'
,
'igap'
],
'v'
:
[
'umax'
,
'umin'
,
'ugap'
],
'residual_current'
:
[
'lcmax'
,
'lcgap'
],
'power'
:
[
'pttlmax'
,
'pttlgap'
],
'time'
:
[
'one_time'
,
'two_time'
,
'three_time'
]}
args
=
{}
for
field
in
fields
.
get
(
type
):
args
[
field
]
=
getattr
(
body
,
field
)
try
:
await
set_scope_config_serv
(
pid
,
type
,
scope_type
,
args
)
except
Exception
as
e
:
log
.
error
(
'post_set_scope_config'
+
str
(
e
))
return
SetScopeConfigResp
(
success
=
0
,
message
=
str
(
e
))
return
SetScopeConfigResp
(
success
=
1
,
message
=
'操作成功'
)
@
summary
(
"识别设置-初始化设备配置信息(开发专用!)"
)
async
def
post_init_scope_config
(
request
,
body
:
InitScopeConfigReq
)
->
Success
:
pids
=
body
.
pids
# user_id = request.ctx.user_id
# if user_id not in ['100653']:
# return Success(success=0, message='无此操作权限')
error_list
=
[]
for
pid
in
pids
:
try
:
await
init_scope_config
(
pid
)
except
Exception
as
e
:
log
.
error
(
f
'{pid}:post_init_scope_config error {str(e)}'
)
error_list
.
append
(
str
(
e
))
continue
if
error_list
:
log
.
error
(
f
"post_init_scope_config error:{','.join(error_list)}"
)
else
:
log
.
info
(
f
"post_init_scope_config success total_success_count:"
f
"{str(len(pids))}"
)
return
Success
(
success
=
1
,
message
=
','
.
join
(
error_list
))
@
summary
(
"刷新es录波数据(开发专用!)"
)
async
def
post_flush_scope_es_data
(
request
,
body
:
FlushScopeEsDataReq
)
->
\
Success
:
scope_type_list
=
body
.
scope_type_list
start_time
=
body
.
start_time
end_time
=
body
.
end_time
try
:
for
scope_g
in
scope_type_list
:
if
scope_g
not
in
[
'200ms'
,
'2s'
,
'0.25ms'
]:
continue
await
flush_scope_es_data
(
scope_g
,
start_time
,
end_time
)
except
Exception
as
e
:
log
.
error
(
f
'post_flush_scope_es_data error {str(e)}'
)
return
Success
(
success
=
0
,
message
=
str
(
e
))
return
Success
(
success
=
1
,
message
=
'操作成功'
)
unify_api/utils/time_format.py
View file @
153e0c79
...
@@ -1400,3 +1400,46 @@ def get_start_end_by_tz_time_new(time_str, from_format='YYYY-MM-DD',
...
@@ -1400,3 +1400,46 @@ def get_start_end_by_tz_time_new(time_str, from_format='YYYY-MM-DD',
start
=
date
.
start_of
(
"day"
)
.
format
(
to_format
)
start
=
date
.
start_of
(
"day"
)
.
format
(
to_format
)
end
=
date
.
end_of
(
"day"
)
.
format
(
to_format
)
end
=
date
.
end_of
(
"day"
)
.
format
(
to_format
)
return
start
,
end
return
start
,
end
def
get_time_duration
(
start_time
,
end_time
,
is_timestamp
=
True
,
is_need_trans
=
True
):
"""
根据开始、结束时间获取格式化的时间差
"""
if
is_timestamp
:
start_time
=
convert_timestamp_to_dt
(
start_time
)
end_time
=
convert_timestamp_to_dt
(
end_time
)
else
:
start_time
=
convert_to_dt
(
start_time
)
end_time
=
convert_to_dt
(
end_time
)
# 计算时间差
duration
=
end_time
-
start_time
# 如果不需要转换 则直接返回
duration_str
=
duration
.
seconds
+
duration
.
days
*
24
*
60
*
60
if
not
is_need_trans
:
return
duration_str
# 返回
return_str
=
get_time_duration_by_str
(
duration_str
)
return
return_str
def
get_time_duration_by_str
(
duration_str
):
"""
根据时间戳获取格式化的时间差
"""
return_str
=
''
days
=
int
(
duration_str
/
(
60
*
60
*
24
))
if
days
>
0
:
return_str
+=
"
%
s天"
%
str
(
days
)
hours
=
int
(
duration_str
%
(
60
*
60
*
24
)
/
(
60
*
60
))
if
hours
>
0
:
return_str
+=
"
%
s时"
%
str
(
hours
)
minutes
=
int
(
duration_str
%
(
60
*
60
*
24
)
%
(
60
*
60
)
/
60
)
if
minutes
>
0
:
return_str
+=
"
%
s分"
%
str
(
minutes
)
seconds
=
int
(
duration_str
%
(
60
*
60
*
24
)
%
(
60
*
60
)
%
60
)
if
seconds
>
0
:
return_str
+=
"
%
s秒"
%
str
(
seconds
)
return
return_str
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment