Commit 1e77b43e authored by wang.wenrong's avatar wang.wenrong

common_monitor

parent 5835292c
...@@ -2,7 +2,7 @@ from operator import itemgetter ...@@ -2,7 +2,7 @@ from operator import itemgetter
from itertools import groupby from itertools import groupby
from pot_libs.common.components.query import Range, Equal, Filter, InGroup, \ from pot_libs.common.components.query import Range, Equal, Filter, InGroup, \
Sort Sort
from pot_libs.common.dao.common_dao import get_fields_by_mtid from unify_api.modules.common.dao.common_dao import get_fields_by_mtid
from pot_libs.es_util import es_helper from pot_libs.es_util import es_helper
from pot_libs.es_util.es_query import EsQuery from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log from pot_libs.logger import log
......
...@@ -107,7 +107,7 @@ async def meter_by_mids(mids): ...@@ -107,7 +107,7 @@ async def meter_by_mids(mids):
async def item_by_mitd_dao(mtids): async def item_by_mitd_dao(mtids):
sql = "select mtid, meter_no from monitor where mtid in %s" sql = "select mtid, meter_no from monitor where mtid in %s"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(mtids, )) datas = await conn.fetchall(sql, args=(mtids,))
return datas return datas
...@@ -268,7 +268,7 @@ async def monitor_page_by_cid(cids, page_num, page_size): ...@@ -268,7 +268,7 @@ async def monitor_page_by_cid(cids, page_num, page_size):
"where a.cid in %s and a.demolished = 0 GROUP BY a.cid limit %s, %s;" "where a.cid in %s and a.demolished = 0 GROUP BY a.cid limit %s, %s;"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
info_list = await conn.fetchall(sql=sql, args=( info_list = await conn.fetchall(sql=sql, args=(
tuple(cids), (page_num-1)*page_size, page_size)) tuple(cids), (page_num - 1) * page_size, page_size))
return info_list return info_list
...@@ -334,7 +334,7 @@ async def monitor_by_mtid(mtid): ...@@ -334,7 +334,7 @@ async def monitor_by_mtid(mtid):
async def search_iccid(sid): async def search_iccid(sid):
sql = "select iccid, sid from sid_iccid where sid = %s " sql = "select iccid, sid from sid_iccid where sid = %s "
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
iccid = await conn.fetchone(sql, args=(sid, )) iccid = await conn.fetchone(sql, args=(sid,))
return iccid return iccid
...@@ -343,3 +343,17 @@ async def save_iccid(sid, iccid): ...@@ -343,3 +343,17 @@ async def save_iccid(sid, iccid):
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
await conn.execute(sql, args=(sid, iccid)) await conn.execute(sql, args=(sid, iccid))
log.info(sql % (sid, iccid)) log.info(sql % (sid, iccid))
async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql = f"select {fields} from {table_name} where mtid = %s"
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, (mtid,))
return result
...@@ -2,6 +2,11 @@ from pot_libs.es_util.es_utils import EsUtil ...@@ -2,6 +2,11 @@ from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str from unify_api.utils.time_format import convert_es_str
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
import json
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
point_15min_index = "poweriot_point_15min_index" point_15min_index = "poweriot_point_15min_index"
...@@ -110,3 +115,281 @@ async def sql_point_15min_index_new15(start, end, pid): ...@@ -110,3 +115,281 @@ async def sql_point_15min_index_new15(start, end, pid):
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,)) datas = await conn.fetchall(sql, args=(pid,))
return datas return datas
async def company_by_cids(cids):
"""根据cids查询company信息"""
sql = "SELECT * from company where cid in %s"
async with MysqlUtil() as conn:
company_list = await conn.fetchall(sql, args=(tuple(cids),))
return company_list
async def point_by_points(point_list):
sql = "SELECT * from point where pid in %s"
async with MysqlUtil() as conn:
point_info_list = await conn.fetchall(sql, args=(tuple(point_list),))
return point_info_list
async def get_history_pids_by_mtids(cid, create_time, mtids=None):
"""
获取截止到某个日期之前安装的监测点
:param cid:
:param create_time:
:param mtids:
:return:
"""
where = ""
if mtids:
where += f" and m.mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql = f"SELECT p.pid,m.mtid from point p left join monitor m on " \
f"p.mtid=m.mtid where m.demolished = 0 and p.cid = %s and " \
f"p.create_time <= %s {where}"
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, create_time))
return result or []
async def point_by_mtids(mtids):
sql = "SELECT pid, mtid from point where mtid IN %s"
async with MysqlUtil() as conn:
point_list = await conn.fetchall(sql, args=(mtids,))
return point_list
async def points_by_cid(cids, m_type=None):
"""根据cid查询points"""
type_sql = ""
if m_type:
type_sql = f"and m.m_type = {m_type}"
sql = f"SELECT p.* FROM `monitor` m LEFT JOIN point p " \
f"on m.mtid=p.mtid WHERE p.cid in %s {type_sql}"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql, args=(tuple(cids),))
return points
async def points_monitor_by_cid(cids, m_type=None):
type_sql = ""
if m_type:
type_sql = f"and m.m_type = {m_type}"
sql = "SELECT p.pid, m.* FROM `monitor` m LEFT JOIN point p " \
f"on m.mtid=p.mtid where m.cid in %s and m.demolished=0 {type_sql}"
async with MysqlUtil() as conn:
points = await conn.fetchall(sql, args=(tuple(cids),))
return points
async def monitor_by_cid(cid, m_type=None):
"""根据cid查询monitor"""
type_sql = ""
if m_type:
type_sql = f"and monitor.m_type = {m_type}"
sql = f"SELECT * FROM monitor WHERE cid = %s and demolished = 0 {type_sql}"
async with MysqlUtil() as conn:
monitor_list = await conn.fetchall(sql, args=(cid,))
return monitor_list
async def monitor_point_join(cid):
"""monitor和point关联"""
sql = "SELECT m.mtid, p.pid, p.name, p.add_to_company FROM monitor m " \
"inner join point p on m.mtid = p.mtid " \
"WHERE m.cid = %s and m.demolished = 0"
async with MysqlUtil() as conn:
monitor_point_list = await conn.fetchall(sql, args=(cid,))
return monitor_point_list
async def monitor_location_join(cid):
"""monitor和location关联"""
sql = "SELECT m.mtid, l.id, l.item FROM monitor m " \
"inner join location l on m.mtid = l.mtid " \
"WHERE m.cid = %s and m.demolished = 0"
async with MysqlUtil() as conn:
monitor_location_list = await conn.fetchall(sql, args=(cid,))
return monitor_location_list
async def company_model_by_cid(cid):
"""根据cid查询company_model信息"""
sql = "SELECT * from company_model where cid = %s"
async with MysqlUtil() as conn:
company_model_dic = await conn.fetchone(sql, args=(cid,))
return company_model_dic
async def inline_zdu_all_by_cid(cid):
"""根据cid查询inline_zdu信息"""
sql = "SELECT * from inline where cid = %s"
async with MysqlUtil() as conn:
inline_zdu_dic = await conn.fetchall(sql, args=(cid,))
return inline_zdu_dic
async def company_extend_by_cid(cid):
"""根据cids查询company信息"""
sql = "SELECT * from company_extend where cid = %s"
async with MysqlUtil() as conn:
company_list = await conn.fetchall(sql, args=(cid,))
return company_list
async def user_by_user_id(user_id, is_delete=None):
sql = "SELECT * FROM user where user_id = %s"
if is_delete is not None:
sql += f' AND is_delete = {is_delete} '
async with MysqlUtil() as conn:
user_dic = await conn.fetchone(sql=sql, args=(user_id,))
return user_dic
async def user_by_phone_number(phone, is_delete=True):
sql = "SELECT * FROM user where phone_number = %s"
if is_delete:
sql += " and is_delete=0"
else:
sql += " order by is_delete asc"
async with MysqlUtil() as conn:
user_dic = await conn.fetchone(sql=sql, args=(phone,))
return user_dic
async def get_user_list_by_cid_auth(cid, auth, symbol="="):
"""
根据公司权限 获取用户列表
:param cid:
:param auth: 权限
:param symbol: 符号j
:return:
"""
sql = f"""
select distinct(u.user_id) user_id from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid = %s and u.auth {symbol} %s
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, auth))
return [user.get("user_id") for user in result] if result else []
async def get_user_info_list_by_cid_auth(cid, auth, symbol="="):
"""
根据公司权限 获取用户列表 有可能会有重复的user_info
:param cid:
:param auth: 权限
:param symbol: 比较符号 >, <, =......
:return:
"""
sql = f"""
select u.user_id, u.name from user u
left join user_proxy_company_auth upca on u.user_id = upca.user_id
where u.is_delete = 0 and upca.cid = %s and u.auth {symbol} %s
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, auth))
return result if result else []
async def get_common_type(cid):
"""获取不同设备对象的类型type_id"""
sql = "select DISTINCT m_type from monitor where cid = %s"
async with MysqlUtil() as coon:
common_type = await coon.fetchall(sql, args=(cid,))
common_type_list = []
for comm_type in common_type:
common_type_list.append(comm_type.get('m_type'))
return common_type_list
async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
"""
通过mtid获取设备表id
:param mtid:
:param table_name:
:param fields:
:return:
"""
sql = f"select {fields} from {table_name} where mtid = %s"
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, (mtid,))
return result
async def get_point_monitor_dao(id_value, field="m.mtid"):
sql = f"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc," \
f"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \
f"where m.demolished = 0 and {field}=%s;"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(id_value,))
return data
async def get_location_monitor_dao(lid):
sql = "SELECT l.lid,l.ad_field,m.sid FROM `location` l " \
"INNER JOIN monitor m on l.mtid=m.mtid where l.lid=%s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(lid,))
return data
async def get_monitor_status(topic, mtids, db="bromake", default_status=None):
"""
从redis获取设备状态
:param default_status: 当获取不到设备状态时的默认状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list = [f"status:{topic}:{db}:{mtid}" for mtid in mtids]
status_list_tmp = await RedisUtils().mget(key_list)
status_list = []
for status in status_list_tmp:
if isinstance(status, str):
status = json.loads(status)
if status is None:
status = default_status
status_list.append(status)
return dict(zip(mtids, status_list))
async def get_monitor_online_count(topic, mtids, db="bromake"):
"""
从redis获取设备状态
:param db: TIDB中的数据库名字
:param topic: 查询设备的主题类型
:param mtids: []
:return: {mtid: status} # status可能是int类型,也可能是list类型
"""
key_list = [f"status:{topic}:{db}:{mtid}" for mtid in mtids]
status_list_tmp = await RedisUtils().mget(key_list)
online_count = 0
for status in status_list_tmp:
if status is None:
continue
online_count += 1
return online_count
async def get_tc_runtime(inline_ids):
sql = "SELECT inlid, name, tc_runtime FROM `inline` where inlid in %s;"
async with MysqlUtil() as conn:
tc_runtimes = await conn.fetchall(sql, args=(inline_ids,))
return tc_runtimes or []
async def get_pid_by_mtid(mtid):
sql = "select pid from point where mtid = %s"
async with MysqlUtil() as conn:
data = await conn.fetchone(sql, args=(mtid,))
return data
async def get_point_install_date_by_cid(cid):
sql = "select min(create_time) install_time from point where cid = %s"
async with MysqlUtil() as conn:
data = await conn.fetch_value(sql, args=(cid,))
return data
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def tree_monitor_dao(cid, group=1):
sql = f"""
select a.id,a.parent_id,a.mtid, a.name ,(case when a.mtid=0 then
0 else 5 end) as `level`, ifnull(b.m_type,0) m_type
from monitor_tree a
left join monitor b on a.mtid = b.mtid
where a.cid = %s and a.class=%s and (b.demolished = 0 or a.mtid=0)
order by b.m_type asc,LENGTH(a.name),a.name,a.id asc
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, group))
return result
async def tree_build_dao(cid, group=1):
sql = f"""
select a.id,a.parent_id,ifnull(a.mtid,0) mtid,a.name,a.level,
ifnull(b.m_type,0) m_type,a.class as `group`
from build_tree a
left join monitor b on a.mtid = b.mtid
where a.is_delete = 0 and a.cid = %s and a.class=%s and (
b.demolished = 0 or a.mtid is null)
order by a.level desc,LENGTH(a.name),a.name
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid, group))
return result
async def monitor_name_list_dao(cid, mtids=None):
"""
获取监测点列表
:param cid:
:param mtids:
:return:
"""
where = ""
if mtids:
where += f" and mtid in {str(tuple(mtids)).replace(',)', ')')}"
sql = f"""
select mtid, name from monitor
where cid = %s and demolished = 0 {where}
order by mtid asc
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid,))
return result or []
async def tree_name_by_ids(cid, table_name="monitor_tree", ids=None):
"""
根据id列表获取树名称
:param cid:
:param table_name:
:param ids:
:return:
"""
where = ""
if ids:
where += f" and a.id in {str(tuple(ids)).replace(',)', ')')}"
if table_name == "monitor_tree":
sql = f"""
select a.id, a.name, a.mtid from monitor_tree a
where a.cid = %s {where}
order by a.name, a.id asc
"""
else:
sql = f"""
select a.id, concat(ifnull(b.name,""),"-",a.name) as name,
a.mtid from build_tree a
left join build_tree b on a.parent_id = b.id
where a.cid = %s {where}
order by a.name, a.id asc
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid,))
return result or []
async def dynamic_env_list_dao(cid, positions=None):
"""
获取动环列表
:param cid:
:param positions:
:return:
"""
where = ""
if positions:
where += f" and position in {str(tuple(positions)).replace(',)', ')')}"
sql = f"""
select position as id,position as name,group_concat(mtid) mtids
from dynamic_env_device
where cid = %s {where}
group by position
order by position asc
"""
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(cid,))
return result or []
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment