Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
438e22c3
Commit
438e22c3
authored
May 11, 2023
by
wang.wenrong
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
del_es_dao
parent
901ff9b5
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
81 additions
and
700 deletions
+81
-700
fine_monitor_dao.py
unify_api/modules/anshiu/dao/fine_monitor_dao.py
+14
-0
fine_monitor_serv.py
unify_api/modules/anshiu/service/fine_monitor_serv.py
+0
-196
fine_monitor.py
unify_api/modules/anshiu/views/fine_monitor.py
+5
-47
scope_operations.py
unify_api/modules/anshiu/views/scope_operations.py
+3
-1
common_dao.py
unify_api/modules/common/dao/common_dao.py
+9
-1
common_es_dao.py
unify_api/modules/common/dao/common_es_dao.py
+0
-395
drop_dust_pds.py
unify_api/modules/tsp_water/procedures/drop_dust_pds.py
+0
-2
drop_dust_service.py
unify_api/modules/tsp_water/service/drop_dust_service.py
+2
-3
data_es_dao.py
unify_api/modules/zhiwei_u/dao/data_es_dao.py
+48
-55
No files found.
unify_api/modules/anshiu/dao/fine_monitor_dao.py
View file @
438e22c3
...
...
@@ -190,6 +190,20 @@ async def get_mtid_by_pid_dao(pid):
return
data
async
def
get_mtids_by_pids_dao
(
pid
):
sql
=
f
"""
SELECT
mtid
FROM
point
WHERE
pid in
%
s
"""
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
pid
,))
return
data
async
def
get_point_monitor_dao
(
id_value
,
field
=
"m.mtid"
):
sql
=
f
"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc,"
\
f
"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid "
\
...
...
unify_api/modules/anshiu/service/fine_monitor_serv.py
View file @
438e22c3
...
...
@@ -112,202 +112,6 @@ async def get_point_chart_data(point_id, date_start, date_end, intervel,
return
power
,
i
,
u
,
ctnum
async
def
get_adio_info_data
(
location_group
,
location_info
,
start_timestamp
,
end_timestamp
):
'''
获取环境相关数据
'''
range
=
Range
(
field
=
"time"
,
start
=
start_timestamp
,
end
=
end_timestamp
)
# 分别统计各个location温度最大值、最小值、平均值
stats_info
=
{}
for
location_id
in
location_group
:
equal
=
Equal
(
field
=
"location_id"
,
value
=
location_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
query_body
=
EsQuery
.
aggr_index
(
page_request
,
stats_items
=
[
"value_max"
,
"value_min"
,
"value_avg"
]
)
aggregations
=
await
get_es_aiao_15min_data
(
query_body
)
# 最大值, 这里叫法有点奇怪,但是最大值应该取15min的最大值聚合结果
max_info
=
aggregations
.
get
(
"value_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max
=
source
.
get
(
"value_max"
,
0
)
max
=
round
(
max
,
2
)
if
max
is
not
None
else
""
max_ts
=
source
.
get
(
"value_max_time"
,
0
)
max_value_time
=
str
(
time_format
.
convert_to_dt
(
max_ts
))
else
:
max
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
"value_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min
=
source
.
get
(
"value_min"
,
0
)
min
=
round
(
min
,
2
)
if
min
is
not
None
else
""
min_ts
=
source
.
get
(
"value_min_time"
,
0
)
min_value_time
=
str
(
time_format
.
convert_to_dt
(
min_ts
))
else
:
min
=
""
min_value_time
=
""
avg
=
aggregations
.
get
(
"value_avg_avg"
,
{})
.
get
(
"value"
)
avg
=
round
(
avg
,
2
)
if
avg
is
not
None
else
""
stats_info
[
location_id
]
=
{
"max"
:
{
"value"
:
max
,
"time"
:
max_value_time
},
"min"
:
{
"value"
:
min
,
"time"
:
min_value_time
},
"avg"
:
avg
,
}
# 返回
adio_indexes
=
[]
for
location_id
,
info
in
location_info
.
items
():
item
,
type
=
info
[
"item"
],
info
[
"type"
]
# 漏电流的item更改一下
item
=
'漏电流'
if
type
==
'residual_current'
else
item
_info
=
stats_info
[
location_id
]
adio_index
=
Statistics
(
type
=
type
,
item
=
item
,
max
=
_info
[
"max"
][
"value"
],
max_time
=
_info
[
"max"
][
"time"
],
min
=
_info
[
"min"
][
"value"
],
min_time
=
_info
[
"min"
][
"time"
],
avg
=
_info
[
"avg"
],
)
adio_indexes
.
append
(
adio_index
)
return
adio_indexes
async
def
get_point_info_data
(
point_id
,
start_time
,
end_time
):
# 2. 获取几表法
ctnum
,
_
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum, 装置点已经被拆!"
)
# 给默认值3表法
ctnum
=
3
range
=
Range
(
field
=
"quarter_time"
,
start
=
start_time
,
end
=
end_time
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if
ctnum
==
2
:
stats_items
=
[
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"uab_mean"
,
"uab_min"
,
"uab_max"
,
"ucb_mean"
,
"ucb_min"
,
"ucb_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
]
else
:
stats_items
=
[
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"ua_mean"
,
"ua_min"
,
"ua_max"
,
"ub_mean"
,
"ub_min"
,
"ub_max"
,
"uc_mean"
,
"uc_min"
,
"uc_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ib_mean"
,
"ib_min"
,
"ib_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
]
query_body
=
EsQuery
.
aggr_index
(
page_request
,
stats_items
=
stats_items
)
aggregations
=
await
get_es_point_15min_data
(
query_body
)
# 常规参数统计
common_indexes
=
[]
_stats_items
=
{
i
.
rsplit
(
"_"
,
1
)[
0
]
for
i
in
stats_items
}
for
item
in
_stats_items
:
# 最大值
max_info
=
aggregations
.
get
(
f
"{item}_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max_value
=
source
.
get
(
f
"{item}_max"
,
""
)
max_dt
=
source
.
get
(
f
"{item}_max_time"
)
if
max_dt
is
None
:
log
.
error
(
f
"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}"
)
max_value_time
=
str
(
time_format
.
convert_to_dt
(
max_dt
)
if
max_dt
else
""
)
else
:
max_value
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
f
"{item}_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min_value
=
source
.
get
(
f
"{item}_min"
)
min_value
=
min_value
if
min_value
is
not
None
else
""
min_dt
=
source
.
get
(
f
"{item}_min_time"
)
min_value_time
=
str
(
time_format
.
convert_to_dt
(
min_dt
)
if
min_dt
else
""
)
else
:
min_value
=
""
min_value_time
=
""
# 平均值
avg
=
aggregations
.
get
(
f
"{item}_mean_avg"
,
{})
.
get
(
"value"
)
avg
=
round
(
avg
,
2
)
if
avg
is
not
None
else
""
elec_index
=
Statistics
(
item
=
item
,
max
=
max_value
,
max_time
=
max_value_time
,
min
=
min_value
,
min_time
=
min_value_time
,
avg
=
avg
,
)
common_indexes
.
append
(
elec_index
)
return
common_indexes
async
def
electric_index_list_service
(
mtid
,
start_time
,
end_time
,
param_types
=
None
):
"""
...
...
unify_api/modules/anshiu/views/fine_monitor.py
View file @
438e22c3
from
pot_libs.sanic_api
import
summary
,
examples
from
pot_libs.common.components.query
import
PageRequest
from
pot_libs.sanic_api
import
summary
from
pot_libs.logger
import
log
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_mtid_by_pid_dao
from
unify_api.utils.request_util
import
filed_value_from_list
from
unify_api.utils
import
time_format
from
unify_api.modules.anshiu.components.fine_monitor_cps
import
(
FineMonitorChartReq
,
FineMonitorInfoReq
,
FineMonitorChartResp
,
FineMonitorInfoResp
,
ElectricIndexListResp
FineMonitorInfoResp
)
from
unify_api.modules.anshiu.procedures.fine_monitor_pds
import
(
get_
location_by_ids
,
get_threshold_by_location
,
get_
mtid_by_location_ids
get_mtid_by_location_ids
)
from
unify_api.modules.anshiu.service.fine_monitor_serv
import
(
get_adio_chart_data
,
get_point_chart_data
,
get_adio_info_data
,
get_point_info_data
,
electric_index_list_service
get_adio_chart_data
,
get_point_chart_data
,
electric_index_list_service
)
...
...
@@ -70,45 +67,6 @@ async def post_fine_monitor_chart(request,
ctnum
=
ctnum
)
@
summary
(
"精细监测-指标统计1"
)
async
def
post_fine_monitor_info1
(
request
,
body
:
FineMonitorInfoReq
)
->
FineMonitorInfoResp
:
try
:
date_start
=
body
.
start
date_end
=
body
.
end
# 获取监测点
point_id
=
body
.
pid
if
not
point_id
or
point_id
<=
0
:
raise
Exception
(
'point_error point_id:{}'
.
format
(
point_id
))
# 获取location点
location_group
=
body
.
location_ids
if
not
location_group
:
raise
Exception
(
'in_groups is NULL, no location_id'
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_info '
+
str
(
e
))
return
FineMonitorInfoResp
.
param_error
()
# 获取location表的信息
try
:
location_info
=
await
get_location_by_ids
(
location_group
)
except
Exception
as
e
:
log
.
error
(
'get_fine_monitor_chart_error '
+
e
)
return
FineMonitorChartResp
.
db_error
()
info_list
=
[]
# 环境相关数据
adio_list
=
await
get_adio_info_data
(
location_group
,
location_info
,
date_start
,
date_end
)
# 用电相关数据
point_list
=
await
get_point_info_data
(
point_id
,
date_start
,
date_end
)
info_list
.
extend
(
adio_list
)
info_list
.
extend
(
point_list
)
return
FineMonitorInfoResp
(
info_list
=
info_list
)
@
summary
(
"精细监测-指标统计"
)
async
def
post_fine_monitor_info
(
request
,
body
:
FineMonitorInfoReq
)
->
FineMonitorInfoResp
:
...
...
unify_api/modules/anshiu/views/scope_operations.py
View file @
438e22c3
...
...
@@ -47,7 +47,9 @@ async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp:
# 替换scope_g
if
scope_g
:
scope_g
=
[
'200ms'
if
i
==
'0.2s'
else
i
for
i
in
scope_g
]
rows
,
total
=
await
search_scope_service
(
pids
,
cid
,
page_num
,
page_size
,
rows
,
total
=
await
search_scope_service
(
pids
,
cid
,
(
page_num
-
1
)
*
page_size
,
page_size
,
start
,
end
,
scope_g
)
return
ScopeListResp
(
rows
=
rows
,
total
=
total
,
page_num
=
page_num
)
...
...
unify_api/modules/common/dao/common_dao.py
View file @
438e22c3
...
...
@@ -327,7 +327,7 @@ async def get_all_username():
async
def
monitor_by_mtid
(
mtid
):
sql
=
"select * from monitor where mtid =
%
s "
async
with
MysqlUtil
()
as
conn
:
monitor_dic
=
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,
))
monitor_dic
=
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,))
return
monitor_dic
...
...
@@ -357,3 +357,11 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchone
(
sql
,
(
mtid
,))
return
result
async
def
sql_point_15min_index_new15
(
start
,
end
,
pid
):
sql
=
f
"SELECT pttl_mean, create_time FROM `point_15min_electric` "
\
f
"where pid=
%
s and create_time BETWEEN '{start}' and '{end}'"
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
pid
,))
return
datas
unify_api/modules/common/dao/common_es_dao.py
deleted
100644 → 0
View file @
901ff9b5
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.logger
import
log
from
unify_api.utils.time_format
import
convert_es_str
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
import
json
from
pot_libs.aredis_util.aredis_utils
import
RedisUtils
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.logger
import
log
point_15min_index
=
"poweriot_point_15min_index"
async
def
query_point_15min_index
(
date_start
,
date_end
,
point_id
):
"""point点某一天96个点数据"""
start_es
=
convert_es_str
(
date_start
)
end_es
=
convert_es_str
(
date_end
)
query_body
=
{
"size"
:
100
,
"_source"
:
[
"pttl_mean"
,
"quarter_time"
],
"query"
:
{
"bool"
:
{
"must"
:
[
{
"term"
:
{
"pid"
:
point_id
}
},
{
"range"
:
{
"quarter_time"
:
{
"gte"
:
start_es
,
"lte"
:
end_es
}
}
}
]
}
}
}
log
.
info
(
query_body
)
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search
(
body
=
query_body
,
index
=
point_15min_index
)
return
es_re
async
def
query_point_15min_index_aggs_pid
(
date_start
,
date_end
,
point_list
):
"""
1. 根据pid聚合
2. 根据15分钟聚合
3. 拿到hits数据
"""
start_es
=
convert_es_str
(
date_start
)
end_es
=
convert_es_str
(
date_end
)
query_body
=
{
"size"
:
0
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"terms"
:
{
"pid"
:
point_list
}
},
{
"range"
:
{
"quarter_time"
:
{
"gte"
:
start_es
,
"lte"
:
end_es
}
}
}
]
}
},
"aggs"
:
{
"points"
:
{
"terms"
:
{
"field"
:
"pid"
,
"size"
:
1000
},
"aggs"
:
{
"quarter_time"
:
{
"date_histogram"
:
{
"field"
:
"quarter_time"
,
"interval"
:
"15m"
,
"time_zone"
:
"+08:00"
,
"format"
:
"yyyy-MM-dd HH:mm:ss"
},
"aggs"
:
{
"pttl_mean"
:
{
"top_hits"
:
{
"size"
:
1
,
"_source"
:
[
"pttl_mean"
,
"quarter_time"
]
}
}
}
}
}
}
}
}
log
.
info
(
query_body
)
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
point_15min_index
)
return
es_re
[
"aggregations"
][
"points"
][
"buckets"
]
async
def
sql_point_15min_index_new15
(
start
,
end
,
pid
):
sql
=
f
"SELECT pttl_mean, create_time FROM `point_15min_electric` "
\
f
"where pid=
%
s and create_time BETWEEN '{start}' and '{end}'"
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
pid
,))
return
datas
async
def
company_by_cids
(
cids
):
"""根据cids查询company信息"""
sql
=
"SELECT * from company where cid in
%
s"
async
with
MysqlUtil
()
as
conn
:
company_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
company_list
async
def
point_by_points
(
point_list
):
sql
=
"SELECT * from point where pid in
%
s"
async
with
MysqlUtil
()
as
conn
:
point_info_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
point_list
),))
return
point_info_list
async
def
get_history_pids_by_mtids
(
cid
,
create_time
,
mtids
=
None
):
"""
获取截止到某个日期之前安装的监测点
:param cid:
:param create_time:
:param mtids:
:return:
"""
where
=
""
if
mtids
:
where
+=
f
" and m.mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql
=
f
"SELECT p.pid,m.mtid from point p left join monitor m on "
\
f
"p.mtid=m.mtid where m.demolished = 0 and p.cid =
%
s and "
\
f
"p.create_time <=
%
s {where}"
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
create_time
))
return
result
or
[]
async
def
point_by_mtids
(
mtids
):
sql
=
"SELECT pid, mtid from point where mtid IN
%
s"
async
with
MysqlUtil
()
as
conn
:
point_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
mtids
,))
return
point_list
async
def
points_by_cid
(
cids
,
m_type
=
None
):
"""根据cid查询points"""
type_sql
=
""
if
m_type
:
type_sql
=
f
"and m.m_type = {m_type}"
sql
=
f
"SELECT p.* FROM `monitor` m LEFT JOIN point p "
\
f
"on m.mtid=p.mtid WHERE p.cid in
%
s {type_sql}"
async
with
MysqlUtil
()
as
conn
:
points
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
points
async
def
points_monitor_by_cid
(
cids
,
m_type
=
None
):
type_sql
=
""
if
m_type
:
type_sql
=
f
"and m.m_type = {m_type}"
sql
=
"SELECT p.pid, m.* FROM `monitor` m LEFT JOIN point p "
\
f
"on m.mtid=p.mtid where m.cid in
%
s and m.demolished=0 {type_sql}"
async
with
MysqlUtil
()
as
conn
:
points
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
points
async
def
monitor_by_cid
(
cid
,
m_type
=
None
):
"""根据cid查询monitor"""
type_sql
=
""
if
m_type
:
type_sql
=
f
"and monitor.m_type = {m_type}"
sql
=
f
"SELECT * FROM monitor WHERE cid =
%
s and demolished = 0 {type_sql}"
async
with
MysqlUtil
()
as
conn
:
monitor_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_list
async
def
monitor_point_join
(
cid
):
"""monitor和point关联"""
sql
=
"SELECT m.mtid, p.pid, p.name, p.add_to_company FROM monitor m "
\
"inner join point p on m.mtid = p.mtid "
\
"WHERE m.cid =
%
s and m.demolished = 0"
async
with
MysqlUtil
()
as
conn
:
monitor_point_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_point_list
async
def
monitor_location_join
(
cid
):
"""monitor和location关联"""
sql
=
"SELECT m.mtid, l.id, l.item FROM monitor m "
\
"inner join location l on m.mtid = l.mtid "
\
"WHERE m.cid =
%
s and m.demolished = 0"
async
with
MysqlUtil
()
as
conn
:
monitor_location_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_location_list
async
def
company_model_by_cid
(
cid
):
"""根据cid查询company_model信息"""
sql
=
"SELECT * from company_model where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
company_model_dic
=
await
conn
.
fetchone
(
sql
,
args
=
(
cid
,))
return
company_model_dic
async
def
inline_zdu_all_by_cid
(
cid
):
"""根据cid查询inline_zdu信息"""
sql
=
"SELECT * from inline where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
inline_zdu_dic
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
inline_zdu_dic
async
def
company_extend_by_cid
(
cid
):
"""根据cids查询company信息"""
sql
=
"SELECT * from company_extend where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
company_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
company_list
async
def
user_by_user_id
(
user_id
,
is_delete
=
None
):
sql
=
"SELECT * FROM user where user_id =
%
s"
if
is_delete
is
not
None
:
sql
+=
f
' AND is_delete = {is_delete} '
async
with
MysqlUtil
()
as
conn
:
user_dic
=
await
conn
.
fetchone
(
sql
=
sql
,
args
=
(
user_id
,))
return
user_dic
async
def
user_by_phone_number
(
phone
,
is_delete
=
True
):
sql
=
"SELECT * FROM user where phone_number =
%
s"
if
is_delete
:
sql
+=
" and is_delete=0"
else
:
sql
+=
" order by is_delete asc"
async
with
MysqlUtil
()
as
conn
:
user_dic
=
await
conn
.
fetchone
(
sql
=
sql
,
args
=
(
phone
,))
return
user_dic
async
def
get_user_list_by_cid_auth
(
cid
,
auth
,
symbol
=
"="
):
"""
根据公司权限 获取用户列表
:param cid:
:param auth: 权限
:param symbol: 符号j
:return:
"""
sql
=
f
"""
select distinct(u.user_id) user_id from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid =
%
s and u.auth {symbol}
%
s
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
auth
))
return
[
user
.
get
(
"user_id"
)
for
user
in
result
]
if
result
else
[]
async
def
get_user_info_list_by_cid_auth
(
cid
,
auth
,
symbol
=
"="
):
"""
根据公司权限 获取用户列表 有可能会有重复的user_info
:param cid:
:param auth: 权限
:param symbol: 比较符号 >, <, =......
:return:
"""
sql
=
f
"""
select u.user_id, u.name from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid =
%
s and u.auth {symbol}
%
s
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
auth
))
return
result
if
result
else
[]
async
def
get_common_type
(
cid
):
"""获取不同设备对象的类型type_id"""
sql
=
"select DISTINCT m_type from monitor where cid =
%
s"
async
with
MysqlUtil
()
as
coon
:
common_type
=
await
coon
.
fetchall
(
sql
,
args
=
(
cid
,))
common_type_list
=
[]
for
comm_type
in
common_type
:
common_type_list
.
append
(
comm_type
.
get
(
'm_type'
))
return
common_type_list
async
def
get_fields_by_mtid
(
mtid
,
table_name
=
"monitor"
,
fields
=
"m_type"
):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql
=
f
"select {fields} from {table_name} where mtid =
%
s"
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchone
(
sql
,
(
mtid
,))
return
result
async
def
get_point_monitor_dao
(
id_value
,
field
=
"m.mtid"
):
sql
=
f
"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc,"
\
f
"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid "
\
f
"where m.demolished = 0 and {field}=
%
s;"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
id_value
,))
return
data
async
def
get_location_monitor_dao
(
lid
):
sql
=
"SELECT l.lid,l.ad_field,m.sid FROM `location` l "
\
"INNER JOIN monitor m on l.mtid=m.mtid where l.lid=
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
lid
,))
return
data
async
def
get_monitor_status
(
topic
,
mtids
,
db
=
"bromake"
,
default_status
=
None
):
"""
从redis获取设备状态
:param default_status: 当获取不到设备状态时的默认状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list
=
[
f
"status:{topic}:{db}:{mtid}"
for
mtid
in
mtids
]
status_list_tmp
=
await
RedisUtils
()
.
mget
(
key_list
)
status_list
=
[]
for
status
in
status_list_tmp
:
if
isinstance
(
status
,
str
):
status
=
json
.
loads
(
status
)
if
status
is
None
:
status
=
default_status
status_list
.
append
(
status
)
return
dict
(
zip
(
mtids
,
status_list
))
async
def
get_monitor_online_count
(
topic
,
mtids
,
db
=
"bromake"
):
"""
从redis获取设备状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list
=
[
f
"status:{topic}:{db}:{mtid}"
for
mtid
in
mtids
]
status_list_tmp
=
await
RedisUtils
()
.
mget
(
key_list
)
online_count
=
0
for
status
in
status_list_tmp
:
if
status
is
None
:
continue
online_count
+=
1
return
online_count
async
def
get_tc_runtime
(
inline_ids
):
sql
=
"SELECT inlid, name, tc_runtime FROM `inline` where inlid in
%
s;"
async
with
MysqlUtil
()
as
conn
:
tc_runtimes
=
await
conn
.
fetchall
(
sql
,
args
=
(
inline_ids
,))
return
tc_runtimes
or
[]
async
def
get_pid_by_mtid
(
mtid
):
sql
=
"select pid from point where mtid =
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,))
return
data
async
def
get_point_install_date_by_cid
(
cid
):
sql
=
"select min(create_time) install_time from point where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetch_value
(
sql
,
args
=
(
cid
,))
return
data
unify_api/modules/tsp_water/procedures/drop_dust_pds.py
View file @
438e22c3
# from unify_api.modules.common.dao.common_es_dao import \
# query_point_15min_index_aggs_pid
# from unify_api.modules.elec_charge.dao.elec_charge_dao import \
# query_charge_aggs_points
# from unify_api.utils.time_format import start_end_date
...
...
unify_api/modules/tsp_water/service/drop_dust_service.py
View file @
438e22c3
from
unify_api.constants
import
SLOTS_15MIN
,
DUST_STATE
from
unify_api.modules.common.dao.common_dao
import
storey_pl_by_cid
,
\
storey_wp_by_cid
from
unify_api.modules.common.dao.common_
es_
dao
import
\
query_point_15min_index
,
sql_point_15min_index_new15
from
unify_api.modules.common.dao.common_dao
import
\
sql_point_15min_index_new15
from
unify_api.modules.common.procedures.points
import
points_by_storeys
from
unify_api.modules.tsp_water.components.drop_dust_cps
import
DdwResp
,
\
DdResp
,
IrmResp
,
IosResp
,
ItiResp
,
WsStatiResp
...
...
@@ -108,7 +108,6 @@ async def post_drop_dust_wave_service(point_id, start, end):
"""降尘措施-雾炮-运行曲线"""
# 1. 获取聚合信息
slots_list
=
SLOTS_15MIN
# es_re = await query_point_15min_index(start, end, point_id)
sql_re
=
await
sql_point_15min_index_new15
(
start
,
end
,
point_id
)
if
not
sql_re
:
return
DdwResp
(
slots
=
[],
value
=
[])
...
...
unify_api/modules/zhiwei_u/dao/data_es_dao.py
View file @
438e22c3
from
pot_libs.es_util.es_utils
import
EsUtil
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_mtid_by_pid_dao
,
\
get_sid_by_mtid_dao
,
get_mtids_by_pids_dao
from
unify_api.utils.time_format
import
convert_es_str
from
unify_api.modules.zhiwei_u.config
import
SCOPE_DATABASE
...
...
@@ -90,61 +92,52 @@ async def get_search_scope(cid, pid, start, end):
async
def
query_search_scope
(
cid
,
pid
,
page_num
,
page_size
,
start
,
end
):
query_body
=
{
"from"
:
(
page_num
-
1
)
*
page_size
,
"size"
:
page_size
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"term"
:
{
"mode.keyword"
:
"scope"
}
}
]
}
},
"sort"
:
[
{
"datetime"
:
{
"order"
:
"desc"
}
}
]
}
if
start
and
end
:
start_es
=
convert_es_str
(
start
)
end_es
=
convert_es_str
(
end
)
query_body
[
"query"
][
"bool"
][
"must"
]
.
append
(
{
"range"
:
{
"datetime"
:
{
"gte"
:
start_es
,
"lte"
:
end_es
}
}
}
)
start_time
,
end_time
,
scope_g
):
"""
查询录波列表
"""
if
len
(
pid
)
>
1
:
mtid
=
get_mtids_by_pids_dao
(
pid
)
else
:
mtid
=
get_mtid_by_pid_dao
(
pid
)
mtid
=
mtid
[
'mtid'
]
where
=
""
if
start_time
:
where
+=
f
" and event_datetime >= '{start_time}'"
if
end_time
:
where
+=
f
" and event_datetime <= '{end_time}'"
if
mtid
:
where
+=
f
" and pe.mtid={mtid}"
join_sql
=
""
sql
=
f
"""
SELECT
pt.event_id doc_id,
pt.event_datetime check_dt,
pt.`name` point,
pt.message message,
pe.scope_g scope_type
FROM
point_1min_event pt
LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid
AND pe.create_time = pt.event_datetime
WHERE
{where}
pe.cid = 183
AND pt.event_mode = '{scope_g}'
AND pe.pid in (1463, 2248)
ORDER BY
pe.create_time DESC
LIMIT limit {page_num} offset {page_size} """
async
with
MysqlUtil
()
as
conn
:
if
cid
:
query_body
[
"query"
][
"bool"
][
"must"
]
.
append
(
{
"terms"
:
{
"cid"
:
cid
}
}
)
if
pid
:
query_body
[
"query"
][
"bool"
][
"must"
]
.
append
(
{
"term"
:
{
"point_id"
:
pid
}
}
)
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
SCOPE_DATABASE
)
return
es_re
data
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
))
else
:
data
=
await
conn
.
fetchall
(
sql
)
return
data
async
def
get_scope_pids
(
pids
,
start
,
end
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment