Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
e493023d
Commit
e493023d
authored
May 10, 2023
by
wang.wenrong
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'wwr' into 'develop'
common_monitor See merge request
!36
parents
1b9dee42
1e77b43e
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
404 additions
and
5 deletions
+404
-5
fine_monitor_serv.py
unify_api/modules/anshiu/service/fine_monitor_serv.py
+1
-1
common_dao.py
unify_api/modules/common/dao/common_dao.py
+18
-4
common_es_dao.py
unify_api/modules/common/dao/common_es_dao.py
+283
-0
list_point_dao.py
unify_api/modules/common/dao/list_point_dao.py
+102
-0
No files found.
unify_api/modules/anshiu/service/fine_monitor_serv.py
View file @
e493023d
...
...
@@ -2,7 +2,7 @@ from operator import itemgetter
from
itertools
import
groupby
from
pot_libs.common.components.query
import
Range
,
Equal
,
Filter
,
InGroup
,
\
Sort
from
pot_lib
s.common.dao.common_dao
import
get_fields_by_mtid
from
unify_api.module
s.common.dao.common_dao
import
get_fields_by_mtid
from
pot_libs.es_util
import
es_helper
from
pot_libs.es_util.es_query
import
EsQuery
from
pot_libs.logger
import
log
...
...
unify_api/modules/common/dao/common_dao.py
View file @
e493023d
...
...
@@ -107,7 +107,7 @@ async def meter_by_mids(mids):
async
def
item_by_mitd_dao
(
mtids
):
sql
=
"select mtid, meter_no from monitor where mtid in
%
s"
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
mtids
,
))
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
mtids
,))
return
datas
...
...
@@ -268,7 +268,7 @@ async def monitor_page_by_cid(cids, page_num, page_size):
"where a.cid in
%
s and a.demolished = 0 GROUP BY a.cid limit
%
s,
%
s;"
async
with
MysqlUtil
()
as
conn
:
info_list
=
await
conn
.
fetchall
(
sql
=
sql
,
args
=
(
tuple
(
cids
),
(
page_num
-
1
)
*
page_size
,
page_size
))
tuple
(
cids
),
(
page_num
-
1
)
*
page_size
,
page_size
))
return
info_list
...
...
@@ -334,7 +334,7 @@ async def monitor_by_mtid(mtid):
async
def
search_iccid
(
sid
):
sql
=
"select iccid, sid from sid_iccid where sid =
%
s "
async
with
MysqlUtil
()
as
conn
:
iccid
=
await
conn
.
fetchone
(
sql
,
args
=
(
sid
,
))
iccid
=
await
conn
.
fetchone
(
sql
,
args
=
(
sid
,))
return
iccid
...
...
@@ -342,4 +342,18 @@ async def save_iccid(sid, iccid):
sql
=
"INSERT INTO `sid_iccid` (`sid`, `iccid`) VALUES (
%
s,
%
s) "
async
with
MysqlUtil
()
as
conn
:
await
conn
.
execute
(
sql
,
args
=
(
sid
,
iccid
))
log
.
info
(
sql
%
(
sid
,
iccid
))
\ No newline at end of file
log
.
info
(
sql
%
(
sid
,
iccid
))
async
def
get_fields_by_mtid
(
mtid
,
table_name
=
"monitor"
,
fields
=
"m_type"
):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql
=
f
"select {fields} from {table_name} where mtid =
%
s"
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchone
(
sql
,
(
mtid
,))
return
result
unify_api/modules/common/dao/common_es_dao.py
View file @
e493023d
...
...
@@ -2,6 +2,11 @@ from pot_libs.es_util.es_utils import EsUtil
from
pot_libs.logger
import
log
from
unify_api.utils.time_format
import
convert_es_str
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
import
json
from
pot_libs.aredis_util.aredis_utils
import
RedisUtils
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.logger
import
log
point_15min_index
=
"poweriot_point_15min_index"
...
...
@@ -110,3 +115,281 @@ async def sql_point_15min_index_new15(start, end, pid):
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
pid
,))
return
datas
async
def
company_by_cids
(
cids
):
"""根据cids查询company信息"""
sql
=
"SELECT * from company where cid in
%
s"
async
with
MysqlUtil
()
as
conn
:
company_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
company_list
async
def
point_by_points
(
point_list
):
sql
=
"SELECT * from point where pid in
%
s"
async
with
MysqlUtil
()
as
conn
:
point_info_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
point_list
),))
return
point_info_list
async
def
get_history_pids_by_mtids
(
cid
,
create_time
,
mtids
=
None
):
"""
获取截止到某个日期之前安装的监测点
:param cid:
:param create_time:
:param mtids:
:return:
"""
where
=
""
if
mtids
:
where
+=
f
" and m.mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql
=
f
"SELECT p.pid,m.mtid from point p left join monitor m on "
\
f
"p.mtid=m.mtid where m.demolished = 0 and p.cid =
%
s and "
\
f
"p.create_time <=
%
s {where}"
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
create_time
))
return
result
or
[]
async
def
point_by_mtids
(
mtids
):
sql
=
"SELECT pid, mtid from point where mtid IN
%
s"
async
with
MysqlUtil
()
as
conn
:
point_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
mtids
,))
return
point_list
async
def
points_by_cid
(
cids
,
m_type
=
None
):
"""根据cid查询points"""
type_sql
=
""
if
m_type
:
type_sql
=
f
"and m.m_type = {m_type}"
sql
=
f
"SELECT p.* FROM `monitor` m LEFT JOIN point p "
\
f
"on m.mtid=p.mtid WHERE p.cid in
%
s {type_sql}"
async
with
MysqlUtil
()
as
conn
:
points
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
points
async
def
points_monitor_by_cid
(
cids
,
m_type
=
None
):
type_sql
=
""
if
m_type
:
type_sql
=
f
"and m.m_type = {m_type}"
sql
=
"SELECT p.pid, m.* FROM `monitor` m LEFT JOIN point p "
\
f
"on m.mtid=p.mtid where m.cid in
%
s and m.demolished=0 {type_sql}"
async
with
MysqlUtil
()
as
conn
:
points
=
await
conn
.
fetchall
(
sql
,
args
=
(
tuple
(
cids
),))
return
points
async
def
monitor_by_cid
(
cid
,
m_type
=
None
):
"""根据cid查询monitor"""
type_sql
=
""
if
m_type
:
type_sql
=
f
"and monitor.m_type = {m_type}"
sql
=
f
"SELECT * FROM monitor WHERE cid =
%
s and demolished = 0 {type_sql}"
async
with
MysqlUtil
()
as
conn
:
monitor_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_list
async
def
monitor_point_join
(
cid
):
"""monitor和point关联"""
sql
=
"SELECT m.mtid, p.pid, p.name, p.add_to_company FROM monitor m "
\
"inner join point p on m.mtid = p.mtid "
\
"WHERE m.cid =
%
s and m.demolished = 0"
async
with
MysqlUtil
()
as
conn
:
monitor_point_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_point_list
async
def
monitor_location_join
(
cid
):
"""monitor和location关联"""
sql
=
"SELECT m.mtid, l.id, l.item FROM monitor m "
\
"inner join location l on m.mtid = l.mtid "
\
"WHERE m.cid =
%
s and m.demolished = 0"
async
with
MysqlUtil
()
as
conn
:
monitor_location_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
monitor_location_list
async
def
company_model_by_cid
(
cid
):
"""根据cid查询company_model信息"""
sql
=
"SELECT * from company_model where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
company_model_dic
=
await
conn
.
fetchone
(
sql
,
args
=
(
cid
,))
return
company_model_dic
async
def
inline_zdu_all_by_cid
(
cid
):
"""根据cid查询inline_zdu信息"""
sql
=
"SELECT * from inline where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
inline_zdu_dic
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
inline_zdu_dic
async
def
company_extend_by_cid
(
cid
):
"""根据cids查询company信息"""
sql
=
"SELECT * from company_extend where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
company_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
company_list
async
def
user_by_user_id
(
user_id
,
is_delete
=
None
):
sql
=
"SELECT * FROM user where user_id =
%
s"
if
is_delete
is
not
None
:
sql
+=
f
' AND is_delete = {is_delete} '
async
with
MysqlUtil
()
as
conn
:
user_dic
=
await
conn
.
fetchone
(
sql
=
sql
,
args
=
(
user_id
,))
return
user_dic
async
def
user_by_phone_number
(
phone
,
is_delete
=
True
):
sql
=
"SELECT * FROM user where phone_number =
%
s"
if
is_delete
:
sql
+=
" and is_delete=0"
else
:
sql
+=
" order by is_delete asc"
async
with
MysqlUtil
()
as
conn
:
user_dic
=
await
conn
.
fetchone
(
sql
=
sql
,
args
=
(
phone
,))
return
user_dic
async
def
get_user_list_by_cid_auth
(
cid
,
auth
,
symbol
=
"="
):
"""
根据公司权限 获取用户列表
:param cid:
:param auth: 权限
:param symbol: 符号j
:return:
"""
sql
=
f
"""
select distinct(u.user_id) user_id from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid =
%
s and u.auth {symbol}
%
s
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
auth
))
return
[
user
.
get
(
"user_id"
)
for
user
in
result
]
if
result
else
[]
async
def
get_user_info_list_by_cid_auth
(
cid
,
auth
,
symbol
=
"="
):
"""
根据公司权限 获取用户列表 有可能会有重复的user_info
:param cid:
:param auth: 权限
:param symbol: 比较符号 >, <, =......
:return:
"""
sql
=
f
"""
select u.user_id, u.name from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid =
%
s and u.auth {symbol}
%
s
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
auth
))
return
result
if
result
else
[]
async
def
get_common_type
(
cid
):
"""获取不同设备对象的类型type_id"""
sql
=
"select DISTINCT m_type from monitor where cid =
%
s"
async
with
MysqlUtil
()
as
coon
:
common_type
=
await
coon
.
fetchall
(
sql
,
args
=
(
cid
,))
common_type_list
=
[]
for
comm_type
in
common_type
:
common_type_list
.
append
(
comm_type
.
get
(
'm_type'
))
return
common_type_list
async
def
get_fields_by_mtid
(
mtid
,
table_name
=
"monitor"
,
fields
=
"m_type"
):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql
=
f
"select {fields} from {table_name} where mtid =
%
s"
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchone
(
sql
,
(
mtid
,))
return
result
async
def
get_point_monitor_dao
(
id_value
,
field
=
"m.mtid"
):
sql
=
f
"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc,"
\
f
"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid "
\
f
"where m.demolished = 0 and {field}=
%
s;"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
id_value
,))
return
data
async
def
get_location_monitor_dao
(
lid
):
sql
=
"SELECT l.lid,l.ad_field,m.sid FROM `location` l "
\
"INNER JOIN monitor m on l.mtid=m.mtid where l.lid=
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
lid
,))
return
data
async
def
get_monitor_status
(
topic
,
mtids
,
db
=
"bromake"
,
default_status
=
None
):
"""
从redis获取设备状态
:param default_status: 当获取不到设备状态时的默认状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list
=
[
f
"status:{topic}:{db}:{mtid}"
for
mtid
in
mtids
]
status_list_tmp
=
await
RedisUtils
()
.
mget
(
key_list
)
status_list
=
[]
for
status
in
status_list_tmp
:
if
isinstance
(
status
,
str
):
status
=
json
.
loads
(
status
)
if
status
is
None
:
status
=
default_status
status_list
.
append
(
status
)
return
dict
(
zip
(
mtids
,
status_list
))
async
def
get_monitor_online_count
(
topic
,
mtids
,
db
=
"bromake"
):
"""
从redis获取设备状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list
=
[
f
"status:{topic}:{db}:{mtid}"
for
mtid
in
mtids
]
status_list_tmp
=
await
RedisUtils
()
.
mget
(
key_list
)
online_count
=
0
for
status
in
status_list_tmp
:
if
status
is
None
:
continue
online_count
+=
1
return
online_count
async
def
get_tc_runtime
(
inline_ids
):
sql
=
"SELECT inlid, name, tc_runtime FROM `inline` where inlid in
%
s;"
async
with
MysqlUtil
()
as
conn
:
tc_runtimes
=
await
conn
.
fetchall
(
sql
,
args
=
(
inline_ids
,))
return
tc_runtimes
or
[]
async
def
get_pid_by_mtid
(
mtid
):
sql
=
"select pid from point where mtid =
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,))
return
data
async
def
get_point_install_date_by_cid
(
cid
):
sql
=
"select min(create_time) install_time from point where cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetch_value
(
sql
,
args
=
(
cid
,))
return
data
unify_api/modules/common/dao/list_point_dao.py
0 → 100644
View file @
e493023d
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
async
def
tree_monitor_dao
(
cid
,
group
=
1
):
sql
=
f
"""
select a.id,a.parent_id,a.mtid, a.name ,(case when a.mtid=0 then
0 else 5 end) as `level`, ifnull(b.m_type,0) m_type
from monitor_tree a
left join monitor b on a.mtid = b.mtid
where a.cid =
%
s and a.class=
%
s and (b.demolished = 0 or a.mtid=0)
order by b.m_type asc,LENGTH(a.name),a.name,a.id asc
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
group
))
return
result
async
def
tree_build_dao
(
cid
,
group
=
1
):
sql
=
f
"""
select a.id,a.parent_id,ifnull(a.mtid,0) mtid,a.name,a.level,
ifnull(b.m_type,0) m_type,a.class as `group`
from build_tree a
left join monitor b on a.mtid = b.mtid
where a.is_delete = 0 and a.cid =
%
s and a.class=
%
s and (
b.demolished = 0 or a.mtid is null)
order by a.level desc,LENGTH(a.name),a.name
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
group
))
return
result
async
def
monitor_name_list_dao
(
cid
,
mtids
=
None
):
"""
获取监测点列表
:param cid:
:param mtids:
:return:
"""
where
=
""
if
mtids
:
where
+=
f
" and mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql
=
f
"""
select mtid, name from monitor
where cid =
%
s and demolished = 0 {where}
order by mtid asc
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
result
or
[]
async
def
tree_name_by_ids
(
cid
,
table_name
=
"monitor_tree"
,
ids
=
None
):
"""
根据id列表获取树名称
:param cid:
:param table_name:
:param ids:
:return:
"""
where
=
""
if
ids
:
where
+=
f
" and a.id in {str(tuple(ids)).replace(',)', ')')}"
if
table_name
==
"monitor_tree"
:
sql
=
f
"""
select a.id, a.name, a.mtid from monitor_tree a
where a.cid =
%
s {where}
order by a.name, a.id asc
"""
else
:
sql
=
f
"""
select a.id, concat(ifnull(b.name,""),"-",a.name) as name,
a.mtid from build_tree a
left join build_tree b on a.parent_id = b.id
where a.cid =
%
s {where}
order by a.name, a.id asc
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
result
or
[]
async
def
dynamic_env_list_dao
(
cid
,
positions
=
None
):
"""
获取动环列表
:param cid:
:param positions:
:return:
"""
where
=
""
if
positions
:
where
+=
f
" and position in {str(tuple(positions)).replace(',)', ')')}"
sql
=
f
"""
select position as id,position as name,group_concat(mtid) mtids
from dynamic_env_device
where cid =
%
s {where}
group by position
order by position asc
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,))
return
result
or
[]
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment