Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
0e8c8368
Commit
0e8c8368
authored
Apr 26, 2023
by
wang.wenrong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
anshiU
parent
ae588b55
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
148 additions
and
57 deletions
+148
-57
fine_monitor_dao.py
unify_api/modules/anshiu/dao/fine_monitor_dao.py
+28
-0
fine_monitor_pds.py
unify_api/modules/anshiu/procedures/fine_monitor_pds.py
+4
-17
fine_monitor_serv.py
unify_api/modules/anshiu/service/fine_monitor_serv.py
+6
-20
fine_monitor.py
unify_api/modules/anshiu/views/fine_monitor.py
+1
-1
electric_util.py
unify_api/modules/electric/procedures/electric_util.py
+3
-8
common_utils.py
unify_api/utils/common_utils.py
+19
-0
exc_util.py
unify_api/utils/exc_util.py
+32
-0
taos_new.py
unify_api/utils/taos_new.py
+37
-3
time_format.py
unify_api/utils/time_format.py
+18
-8
No files found.
unify_api/modules/anshiu/dao/fine_monitor_dao.py
0 → 100644
View file @
0e8c8368
from
pot_libs.settings
import
SETTING
from
unify_api.utils.common_utils
import
make_tdengine_data_as_list
from
unify_api.utils.taos_new
import
get_td_table_name
,
get_td_engine_data
from
unify_api.utils.exc_util
import
BusinessException
async
def
get_aiao_1min_data
(
monitor_info
,
start_time
,
end_time
,
su_table
):
"""
:param monitor_info: pass
:param start_time: 开始时间
:param end_time: 结束时间
:param su_table: 超级表名
:return select_val对应在数据库中对应的值
"""
mtid
=
monitor_info
[
"mtid"
]
td_mt_table
=
get_td_table_name
(
su_table
,
mtid
)
url
=
f
"{SETTING.stb_url}db_adio"
# td的精度过高,采用 >= start and < end的形式查询
sql
=
f
" select temp1, temp2, temp3, temp4 from {su_table} "
\
f
" where TBNAME = '{td_mt_table}' "
\
f
" and ts >= '{start_time}' AND ts <'{end_time}' "
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
not
is_succ
:
raise
BusinessException
()
td_datas
=
make_tdengine_data_as_list
(
results
)
if
not
td_datas
:
return
""
return
td_datas
[
0
]
unify_api/modules/anshiu/procedures/fine_monitor_pds.py
View file @
0e8c8368
...
@@ -2,6 +2,7 @@ from pot_libs.logger import log
...
@@ -2,6 +2,7 @@ from pot_libs.logger import log
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.es_util.es_utils
import
EsUtil
from
unify_api
import
constants
from
unify_api
import
constants
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_aiao_1min_data
async
def
get_location_by_ids
(
location_ids
):
async
def
get_location_by_ids
(
location_ids
):
...
@@ -51,20 +52,6 @@ async def get_es_point_15min_data(query_body):
...
@@ -51,20 +52,6 @@ async def get_es_point_15min_data(query_body):
return
es_results
.
get
(
"aggregations"
,
{})
return
es_results
.
get
(
"aggregations"
,
{})
async
def
get_es_aiao_1min_data
(
query_body
,
start
):
'''
从es中获取环境相关数据(1min)
'''
async
with
EsUtil
()
as
es
:
# 环境相关qmin的数据需要分表查询
p_database
=
"poweriot_location_1min_aiao_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
p_database
)
return
es_results
.
get
(
"hits"
,
{})
.
get
(
"hits"
,
{})
async
def
get_es_point_1min_data
(
query_body
,
start
):
async
def
get_es_point_1min_data
(
query_body
,
start
):
'''
'''
从es中获取电气相关数据(1min)
从es中获取电气相关数据(1min)
...
...
unify_api/modules/anshiu/service/fine_monitor_serv.py
View file @
0e8c8368
...
@@ -7,6 +7,7 @@ from pot_libs.es_util.es_query import EsQuery
...
@@ -7,6 +7,7 @@ from pot_libs.es_util.es_query import EsQuery
from
pot_libs.logger
import
log
from
pot_libs.logger
import
log
from
pot_libs.common.components.query
import
PageRequest
from
pot_libs.common.components.query
import
PageRequest
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_aiao_1min_data
from
unify_api.utils
import
time_format
from
unify_api.utils
import
time_format
from
unify_api.modules.electric.procedures.electric_util
import
(
from
unify_api.modules.electric.procedures.electric_util
import
(
get_wiring_type
get_wiring_type
...
@@ -15,7 +16,7 @@ from unify_api.modules.anshiu.components.fine_monitor_cps import (
...
@@ -15,7 +16,7 @@ from unify_api.modules.anshiu.components.fine_monitor_cps import (
Statistics
,
Chart
Statistics
,
Chart
)
)
from
unify_api.modules.anshiu.procedures.fine_monitor_pds
import
(
from
unify_api.modules.anshiu.procedures.fine_monitor_pds
import
(
get_es_aiao_1min_data
,
get_es_point_1min_data
,
get_es_point_15min_data
,
get_es_point_1min_data
,
get_es_point_15min_data
,
get_es_aiao_15min_data
,
get_threshold_by_location
get_es_aiao_15min_data
,
get_threshold_by_location
)
)
from
unify_api.utils.time_format
import
convert_timestamp_to_str
from
unify_api.utils.time_format
import
convert_timestamp_to_str
...
@@ -25,19 +26,12 @@ async def get_adio_chart_data(location_group, location_info,
...
@@ -25,19 +26,12 @@ async def get_adio_chart_data(location_group, location_info,
start_timestamp
,
start_timestamp
,
end_timestamp
,
intervel
,
slots
):
end_timestamp
,
intervel
,
slots
):
'''
'''
获取环境(温度与漏电流)的曲线数据
todo:es获取liudianliu
获取环境(温度与漏电流)的曲线数据
'''
'''
# 工况标准,取其中一个漏电流阈值
# 工况标准,取其中一个漏电流阈值
residual_current_threhold
=
await
get_threshold_by_location
(
residual_current_threhold
=
await
get_threshold_by_location
(
location_group
)
location_group
)
# 组装es请求信息
range
=
Range
(
field
=
"time"
,
start
=
start_timestamp
,
end
=
end_timestamp
)
in_group
=
InGroup
(
field
=
"location_id"
,
group
=
location_group
)
filter
=
Filter
(
equals
=
[],
ranges
=
[
range
],
in_groups
=
[
in_group
],
keywords
=
[])
if
intervel
>
60
:
if
intervel
>
60
:
# 取时间间隔为15min的数据
# 取时间间隔为15min的数据
temp
,
res
=
await
get_adio_15min_chart_data
(
temp
,
res
=
await
get_adio_15min_chart_data
(
...
@@ -50,7 +44,6 @@ async def get_adio_chart_data(location_group, location_info,
...
@@ -50,7 +44,6 @@ async def get_adio_chart_data(location_group, location_info,
else
:
else
:
# 取时间间隔为1min的数据
# 取时间间隔为1min的数据
temp
,
res
=
await
get_adio_1min_chart_data
(
temp
,
res
=
await
get_adio_1min_chart_data
(
filter
,
residual_current_threhold
,
residual_current_threhold
,
location_info
,
location_info
,
slots
,
slots
,
...
@@ -108,8 +101,7 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
...
@@ -108,8 +101,7 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
date_type
=
date_type
,
date_type
=
date_type
,
)
)
# 取最大值
# 取最大值
his_data
=
[
_data
[
slot
]
.
get
(
'max'
,
""
)
if
slot
in
_data
else
""
for
his_data
=
[
_data
[
slot
]
.
get
(
'max'
,
""
)
if
slot
in
_data
else
""
for
slot
in
slots
]
slot
in
slots
]
adio_his
=
Chart
(
item
=
''
,
value_slots
=
his_data
)
adio_his
=
Chart
(
item
=
''
,
value_slots
=
his_data
)
if
item_info
.
get
(
"type"
)
==
"residual_current"
:
if
item_info
.
get
(
"type"
)
==
"residual_current"
:
...
@@ -123,18 +115,12 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
...
@@ -123,18 +115,12 @@ async def get_adio_15min_chart_data(filter, residual_current_threhold,
return
temperature_list
,
residual_currents_list
return
temperature_list
,
residual_currents_list
async
def
get_adio_1min_chart_data
(
filter
,
residual_current_threhold
,
async
def
get_adio_1min_chart_data
(
residual_current_threhold
,
location_info
,
slots
,
start_timestamp
):
location_info
,
slots
,
start_timestamp
):
'''
'''
获取1min环境的曲线数据
获取1min环境的曲线数据
'''
'''
sort
=
Sort
(
field
=
'location_id'
,
direction
=
'asc'
)
data
=
await
get_aiao_1min_data
(
monitor_info
,
start_time
,
end_time
,
su_table
)
page_request
=
PageRequest
(
page_size
=
10000
,
page_num
=
1
,
sort
=
sort
,
filter
=
filter
)
query_body
=
EsQuery
.
query
(
page_request
)
es_time
=
convert_timestamp_to_str
(
start_timestamp
)
es_res
=
await
get_es_aiao_1min_data
(
query_body
,
es_time
)
# 温度及漏电流
# 温度及漏电流
temperature_list
=
[]
temperature_list
=
[]
...
...
unify_api/modules/anshiu/views/fine_monitor.py
View file @
0e8c8368
...
@@ -50,7 +50,7 @@ async def post_fine_monitor_chart(request,
...
@@ -50,7 +50,7 @@ async def post_fine_monitor_chart(request,
# 获取温度及漏电流数据
# 获取温度及漏电流数据
temperature_list
,
residual_currents_list
=
await
get_adio_chart_data
(
temperature_list
,
residual_currents_list
=
await
get_adio_chart_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
,
location_group
,
location_info
,
date_start
,
date_end
,
intervel
,
slots
)
intervel
,
slots
)
# 电力数据 power_list、电流曲线、电压曲线
# 电力数据 power_list、电流曲线、电压曲线
...
...
unify_api/modules/electric/procedures/electric_util.py
View file @
0e8c8368
...
@@ -17,17 +17,12 @@ async def get_wiring_type(point_id):
...
@@ -17,17 +17,12 @@ async def get_wiring_type(point_id):
if
not
point_id
:
if
not
point_id
:
log
.
error
(
"para error point_id:
%
s exception"
%
point_id
)
log
.
error
(
"para error point_id:
%
s exception"
%
point_id
)
return
ctnum
,
mtid
return
ctnum
,
mtid
sql
=
"SELECT mtid, ctnum FROM point WHERE pid=
%
s "
# 根据point_id去change_meter_record查询最新的mid
sql
=
"SELECT mtid FROM point WHERE pid=
%
s "
async
with
MysqlUtil
()
as
conn
:
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchone
(
sql
,
args
=
(
point_id
,))
result
=
await
conn
.
fetchone
(
sql
,
args
=
(
point_id
,))
if
result
:
if
result
:
mtid
=
result
.
get
(
"mtid"
)
mtid
=
result
.
get
(
"mtid"
,
None
)
ctnum
=
result
.
get
(
"ctnum"
,
None
)
if
not
mtid
:
return
ctnum
,
mtid
return
ctnum
,
mtid
return
ctnum
,
mtid
...
...
unify_api/utils/common_utils.py
View file @
0e8c8368
...
@@ -161,3 +161,22 @@ def correlation(l1, l2):
...
@@ -161,3 +161,22 @@ def correlation(l1, l2):
else
:
else
:
log
.
info
(
f
"求{l1}, {l2}相关系数{r}, 超出判断范围"
)
log
.
info
(
f
"求{l1}, {l2}相关系数{r}, 超出判断范围"
)
return
""
,
""
return
""
,
""
def
make_tdengine_data_as_list
(
tdengine_data
):
"""
将tdengine查询到的数据装换成 列表形式 列表中的数据为字典形式 {字段-字段值、}
适合处理 多表last_row数据
:param tdengine_data: {
"column_meta":[["key1", type, len],["key2", type, len],["key3", type, len]],
"data":[["val1","val2","id1"], ["val3","val4,"id2""]], ....}
:return: {
{"key1":"val1", "key2":"val2"}, {"key1":"val3", "key2":"val4"}
]
"""
head
=
[
re
.
findall
(
r'last_row\((.*)\)'
,
meta
[
0
])[
0
]
if
"("
in
meta
[
0
]
else
meta
[
0
]
for
meta
in
tdengine_data
[
"column_meta"
]]
result
=
[]
for
res
in
tdengine_data
[
"data"
]:
result
.
append
(
dict
(
zip
(
head
,
res
)))
return
result
unify_api/utils/exc_util.py
0 → 100644
View file @
0e8c8368
# -*- coding:utf-8 -*-
class
DBException
(
Exception
):
"""include all type db exception"""
status_code
=
50001
message
=
"数据库操作失败!"
def
__init__
(
self
,
*
args
,
status_code
=
50001
,
message
=
"数据库操作失败!"
,
**
kwargs
):
self
.
status_code
=
status_code
self
.
message
=
message
class
ParamException
(
Exception
):
"""include all type param exception"""
status_code
=
50003
message
=
"参数错误!"
def
__init__
(
self
,
*
args
,
status_code
=
50003
,
message
=
"参数错误!"
,
**
kwargs
):
self
.
status_code
=
status_code
self
.
message
=
message
class
BusinessException
(
Exception
):
"""
业务异常, 比如装置点被拆了mid为None这种情况, 但是在页面端point_id又可选择
"""
status_code
=
50007
message
=
"业务错误!"
def
__init__
(
self
,
*
args
,
status_code
=
50007
,
message
=
"业务错误!"
,
**
kwargs
):
self
.
status_code
=
status_code
self
.
message
=
message
unify_api/utils/taos_new.py
View file @
0e8c8368
...
@@ -10,16 +10,18 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
...
@@ -10,16 +10,18 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
async
def
get_td_engine_data
(
url
,
sql
):
async
def
get_td_engine_data
(
url
,
sql
):
if
"?"
not
in
url
:
url
+=
"?tz=Asia/Shanghai"
token
=
get_token
()
token
=
get_token
()
log
.
info
(
f
"token:{token},sql:{sql}"
)
log
.
info
(
f
"token:{token},sql:{sql}"
)
resp_str
,
status
=
await
AioHttpUtils
()
.
post_data
(
resp_str
,
status
=
await
AioHttpUtils
()
.
post_data
(
url
,
data
=
sql
,
timeout
=
50
,
url
,
data
=
sql
,
timeout
=
50
,
headers
=
{
"Authorization"
:
f
"Basic {token}"
}
headers
=
{
"Authorization"
:
f
"Basic {token}"
}
)
)
results
=
json
.
loads
(
resp_str
)
log
.
info
(
f
"resp_str:{resp_str},status:{status}"
)
log
.
info
(
f
"resp_str:{resp_str},status:{status}"
)
if
status
!=
20
0
:
if
results
[
"code"
]
!=
0
:
return
False
,
None
return
False
,
None
results
=
json
.
loads
(
resp_str
)
return
True
,
results
return
True
,
results
...
@@ -124,3 +126,35 @@ def parse_td_columns(rsp_data):
...
@@ -124,3 +126,35 @@ def parse_td_columns(rsp_data):
tbl_field
=
r
[
0
]
if
r
else
col
[
0
]
tbl_field
=
r
[
0
]
if
r
else
col
[
0
]
head
.
append
(
tbl_field
)
head
.
append
(
tbl_field
)
return
head
return
head
def
get_td_table_name
(
topic
,
id
):
"""
:param topic: 需要查询的主题
:param id: 表命名使用的id
:return:
"""
topic_map
=
{
"water"
:
"water_ele_electric_ops
%
s"
,
"electric"
:
"mt
%
s_ele_electric_ops"
,
"pv_ele"
:
"pv_ele
%
s"
,
"pv_sts"
:
"pv_sts
%
s"
,
"ws"
:
"ws
%
s"
,
"adi"
:
"mt
%
s_adi_electric_ops"
,
"pt_temp"
:
"pt_temp
%
s_electric_ops"
,
# 电容
"ent_guard_sts_stb"
:
"ent_guard_sts
%
s"
,
# 门控
"ent_guard_soe_stb"
:
"ent_guard_soe
%
s"
,
# 门控报警
"indoor_temp_stb"
:
"indoor_temp
%
s"
,
# 室内温度
"indoor_temp_soe_stb"
:
"indoor_temp_soe
%
s"
,
# 室内温度报警
"indoor_hum_stb"
:
"indoor_hum
%
s"
,
# 室内湿度
"indoor_hum_soe_stb"
:
"indoor_hum_soe
%
s"
,
# 室内湿度报警
"water_fld_sts_stb"
:
"water_fld_sts
%
s"
,
# 水浸
"water_fld_soe_stb"
:
"water_fld_soe
%
s"
,
# 水浸报警
"smoke_sts_stb"
:
"smoke_sts_stb
%
s"
,
# 烟感
"smoke_soe_stb"
:
"smoke_soe_stb
%
s"
,
# 烟感报警
"old_adio_stb"
:
"mt
%
s_adi_electric_ops"
,
# 安电--旧
"new_adio_stb"
:
"mt
%
s_adi"
,
# 安电——新
}
table_name
=
topic_map
.
get
(
topic
)
return
table_name
%
id
unify_api/utils/time_format.py
View file @
0e8c8368
...
@@ -25,22 +25,32 @@ def time_pick_transf(start, end, is_range=0):
...
@@ -25,22 +25,32 @@ def time_pick_transf(start, end, is_range=0):
# diff = (end_time - start_time).seconds
# diff = (end_time - start_time).seconds
diff
=
end_f
.
int_timestamp
-
start_f
.
int_timestamp
diff
=
end_f
.
int_timestamp
-
start_f
.
int_timestamp
# 1. 计算intervel
# 1. 计算intervel
# 1.1 区间48小时之内, 返回15min
# 1.1 区间3小时之内, 返回1min
if
diff
<=
48
*
3600
:
if
diff
<=
3
*
3600
:
intervel
=
60
# 1.2 区间48小时之内, 返回15min
elif
diff
<=
48
*
3600
:
intervel
=
15
*
60
intervel
=
15
*
60
# 1.
2
区间在60天以内, 返回1day
# 1.
3
区间在60天以内, 返回1day
elif
48
*
3600
<
diff
<=
60
*
86400
:
elif
diff
<=
60
*
86400
:
intervel
=
86400
intervel
=
86400
# 1.
3
选择年, 返回1个月
# 1.
4
选择年, 返回1个月
else
:
else
:
intervel
=
30
*
86400
intervel
=
30
*
86400
# 2. 计算slots
# 2. 计算slots
# 2.1 取到点的个数, 比如15min的96个点
# 2.1 取到点的个数, 比如15min的96个点
slots
=
[]
slots
=
[]
slot_num
=
round
((
end_f
.
int_timestamp
-
start_f
.
int_timestamp
)
/
intervel
)
slot_num
=
round
(
(
end_f
.
int_timestamp
-
start_f
.
int_timestamp
)
/
intervel
)
for
i
in
range
(
slot_num
):
for
i
in
range
(
slot_num
):
# 区间48小时之内
# 区间3小时之内
if
diff
<
24
*
3600
:
if
diff
<=
3
*
3600
:
dt
=
start_f
.
add
(
minutes
=
1
*
i
)
.
format
(
"YYYY-MM-DD HH:mm"
)
dt_str
=
str
(
dt
)
.
split
()[
1
]
if
is_range
:
dt_str
=
str
(
dt
)
# 区间24小时之内
elif
diff
<
24
*
3600
:
dt
=
start_f
.
add
(
minutes
=
15
*
i
)
.
format
(
"YYYY-MM-DD HH:mm"
)
dt
=
start_f
.
add
(
minutes
=
15
*
i
)
.
format
(
"YYYY-MM-DD HH:mm"
)
dt_str
=
str
(
dt
)
.
split
()[
1
]
dt_str
=
str
(
dt
)
.
split
()[
1
]
if
is_range
:
if
is_range
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment