Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
2bd87ca5
Commit
2bd87ca5
authored
Jul 19, 2023
by
ZZH
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
longgang data 2023-7-19
parent
4100febc
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
306 additions
and
194 deletions
+306
-194
select_company_cps.py
unify_api/modules/common/components/select_company_cps.py
+1
-1
common_dao.py
unify_api/modules/common/dao/common_dao.py
+12
-12
open_data_cps.py
unify_api/modules/shidianu/components/open_data_cps.py
+59
-3
open_data_dao.py
unify_api/modules/shidianu/dao/open_data_dao.py
+26
-45
open_data_service.py
unify_api/modules/shidianu/service/open_data_service.py
+80
-117
open_data.py
unify_api/modules/shidianu/views/open_data.py
+128
-16
No files found.
unify_api/modules/common/components/select_company_cps.py
View file @
2bd87ca5
...
...
@@ -29,7 +29,7 @@ class CompanyResponse(Model, DbErr):
@
dataclass
class
CmReq
(
Model
):
cid
:
int
=
Int
(
"公司id"
)
.
eg
(
66
)
cid
:
int
=
Opt
(
Int
(
"公司id"
)
.
eg
(
66
)
)
@
dataclass
...
...
unify_api/modules/common/dao/common_dao.py
View file @
2bd87ca5
...
...
@@ -103,19 +103,19 @@ async def item_by_mitd_dao(mtids):
return
datas
async
def
monitor_point_storey_join_in
(
cid
,
page_num
,
page
_size
):
async
def
load_compy_storey_points
(
cid
,
pg_num
,
pg
_size
):
"""monitor和point和storey联合查询, 分页"""
sql
=
"SELECT monitor.cid,
c.address,point.name,
point.create_time, "
\
"monitor.sid,
srm.room_name, srm.storey_name, monitor.longitud
e, "
\
"
point.mtid,monitor.latitude,point.pid FROM monitor
"
\
"
inner join point on monitor.mtid = point.mtid inner join
"
\
"
storey_room_map srm on point.pid = srm.point_id left join
"
\
"
company c on c.cid = monitor.cid WHERE monitor.cid in
%
s
"
\
"
and monitor.demolished = 0 order by point.pid limit
%
s,
%
s"
async
with
MysqlUtil
()
as
conn
:
monitor_point_storey_list
=
await
conn
.
fetchall
(
sql
,
args
=
(
cid
,
(
page_num
-
1
)
*
page_size
,
page_size
))
return
monitor_point_storey_list
sql
=
"SELECT monitor.cid,
c.address, point.name,
point.create_time, "
\
"monitor.sid,
monitor.meter_no, srm.room_name, srm.storey_nam
e, "
\
"
monitor.longitude, point.mtid, monitor.latitude, point.pid
"
\
"
FROM monitor
"
\
"
inner join point on monitor.mtid = point.mtid
"
\
"
inner join storey_room_map srm on point.pid = srm.point_id
"
\
"
left join company c on c.cid = monitor.cid "
\
"WHERE monitor.cid in
%
s and monitor.demolished = 0 "
\
"order by point.pid limit
%
s,
%
s"
async
with
MysqlUtil
()
as
conn
:
return
await
conn
.
fetchall
(
sql
,
(
cid
,
(
pg_num
-
1
)
*
pg_size
,
pg_size
))
async
def
meter_param_by_mid
(
mtid
):
...
...
unify_api/modules/shidianu/components/open_data_cps.py
View file @
2bd87ca5
from
dataclasses
import
dataclass
from
pot_libs.sanic_api
import
Model
from
pot_libs.sanic_api.column
import
Float
,
Str
,
List
,
Int
,
Dict
,
Opt
from
pot_libs.common.components.fields
import
Cid
@
dataclass
class
BasicInfoReq
(
Model
):
# cid: int = Int("工厂id").eg(78)
# cid: int = Opt(Int("工厂id").eg(78))
page_size
:
int
=
Opt
(
Int
(
"页面大小"
)
.
eg
(
10
))
page_num
:
int
=
Opt
(
Int
(
"当前页面"
)
.
eg
(
1
))
...
...
@@ -46,3 +44,61 @@ class SupplementReq(Model):
start
:
str
=
Str
(
"开始时间"
)
.
eg
(
"2021-12-29 00:00:00"
)
end
:
str
=
Str
(
"开始时间"
)
.
eg
(
"2021-12-29 06:00:00"
)
type
:
str
=
Str
(
"类型"
)
.
eg
(
"appliance or electric"
)
@
dataclass
class
RiskCount
(
Model
):
security_user
:
int
=
Opt
(
Int
(
"安全用户"
)
.
eg
(
10
))
risk_user
:
int
=
Opt
(
Int
(
"风险用户"
)
.
eg
(
2
))
@
dataclass
class
ContentName
(
Model
):
ele_overload
:
int
=
Opt
(
Int
(
"用电超载"
)
.
eg
(
10
))
high_power_app
:
int
=
Opt
(
Int
(
"大功率电器"
)
.
eg
(
2
))
illegal_ele_app
:
int
=
Opt
(
Int
(
"违规电器"
)
.
eg
(
2
))
power_quality
:
int
=
Opt
(
Int
(
"电能质量"
)
.
eg
(
2
))
ele_car_battery
:
int
=
Opt
(
Int
(
"电能质量"
)
.
eg
(
2
))
@
dataclass
class
HomeDataResp
(
Model
):
risk_distribution
:
RiskCount
=
Opt
(
RiskCount
)
content_distribution
:
ContentName
=
Opt
(
ContentName
)
electric_use_score
:
float
=
Opt
(
Float
(
"用电安全指数"
)
.
eg
(
90.3
))
total_tenant
:
int
=
Opt
(
Int
(
"接入住户"
)
.
eg
(
20
))
online_rate
:
float
=
Opt
(
Float
(
"在线率"
)
.
eg
(
0.8
))
safe_day
:
float
=
Opt
(
Float
(
"平均安全运行"
)
.
eg
(
11.1
))
total_power
:
float
=
Opt
(
Float
(
"累计监测用电"
)
.
eg
(
96000
))
total_alarm
:
int
=
Opt
(
Int
(
"累计报警次数"
)
.
eg
(
5
))
@
dataclass
class
HomeLstAlarmReq
(
Model
):
cid
:
Opt
(
Cid
)
importance
:
list
=
Opt
(
List
(
"报警等级,默认:[1, 2, 3]"
)
.
eg
([
1
,
2
,
3
]))
page_size
:
int
=
Opt
(
Int
(
"每页记录数"
)
.
eg
(
10
))
page_num
:
int
=
Opt
(
Int
(
"当前页码"
)
.
eg
(
1
))
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2021-02-01 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2021-02-28 23:59:59"
))
@
dataclass
class
HomeAlarmStatsReq
(
Model
):
cid
:
Opt
(
Cid
)
importance
:
list
=
Opt
(
List
(
"报警等级,默认:[1, 2, 3]"
)
.
eg
([
1
,
2
,
3
]))
page_size
:
int
=
Opt
(
Int
(
"每页记录数"
)
.
eg
(
10
))
page_num
:
int
=
Opt
(
Int
(
"当前页码"
)
.
eg
(
1
))
start
:
str
=
Opt
(
Str
(
"开始时间"
)
.
eg
(
"2021-02-01 00:00:00"
))
end
:
str
=
Opt
(
Str
(
"结束时间"
)
.
eg
(
"2021-02-28 23:59:59"
))
@
dataclass
class
HomeAlarmStatsResp
(
Model
):
ele_overload
:
dict
=
Dict
(
"线路过载"
)
.
eg
(
{
"slots"
:
[
"00-01"
,
"00-02"
,
"00-03"
],
"value"
:
[
1
,
2
,
3
]})
illegal_ele_app
:
dict
=
Dict
(
"违规电器"
)
.
eg
(
{
"slots"
:
[
"00-01"
,
"00-02"
,
"00-03"
],
"value"
:
[
1
,
2
,
3
]})
power_quality
:
dict
=
Dict
(
"电能质量"
)
.
eg
(
{
"slots"
:
[
"00-01"
,
"00-02"
,
"00-03"
],
"value"
:
[
1
,
2
,
3
]})
unify_api/modules/shidianu/dao/open_data_dao.py
View file @
2bd87ca5
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.es_util.es_utils
import
EsUtil
from
unify_api.constants
import
POINT_1MIN_EVENT
,
SDU_ALARM_LIST
from
unify_api.constants
import
SDU_ALARM_LIST
import
pendulum
from
unify_api.utils.time_format
import
CST
async
def
get_user_product_auth
(
user_id
):
sql
=
"SELECT * from user_product_auth where user_id=
%
s and product=4"
async
with
MysqlUtil
()
as
conn
:
user_info
=
await
conn
.
fetchone
(
sql
,
args
=
(
user_id
,))
return
user_info
return
await
conn
.
fetchone
(
sql
,
args
=
(
user_id
,))
async
def
get_basic_info_by_mtid
(
mtid
,
cid
):
...
...
@@ -16,8 +16,7 @@ async def get_basic_info_by_mtid(mtid, cid):
"on p.pid=s.point_id LEFT JOIN monitor m on m.mtid=p.mtid "
\
"where m.mtid=
%
s and m.demolished=0 and s.cid =
%
s"
async
with
MysqlUtil
()
as
conn
:
info
=
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,
cid
))
return
info
return
await
conn
.
fetchone
(
sql
,
args
=
(
mtid
,
cid
))
async
def
monitor_point_company
(
cids
):
...
...
@@ -26,45 +25,27 @@ async def monitor_point_company(cids):
"on c.cid=m.cid INNER JOIN point p on m.mtid=p.mtid "
\
"INNER JOIN storey_room_map s on s.point_id=p.pid where c.cid in
%
s"
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
cids
,
))
return
datas
return
await
conn
.
fetchall
(
sql
,
args
=
(
cids
,))
async
def
result_longgang_by_cid
(
cids
,
page_num
,
page_size
,
importance
):
alarm_list
=
SDU_ALARM_LIST
async
def
load_lg_sdu_events
(
cid
,
pg_num
,
pg_size
,
importance
):
cond_lst
=
[
f
"cid={cid}"
,
f
"event_type in {tuple(SDU_ALARM_LIST)}"
]
if
len
(
importance
)
>
0
:
importance
=
str
(
tuple
(
importance
))
.
replace
(
',)'
,
')'
)
cond_lst
.
append
(
f
"importance in {importance}"
)
query_body
=
{
"from"
:
(
page_num
-
1
)
*
page_size
,
"size"
:
page_size
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"terms"
:
{
"cid"
:
cids
}
},
{
"terms"
:
{
"type.keyword"
:
alarm_list
}
},
{
"terms"
:
{
"importance"
:
importance
}
}
]
}
},
"sort"
:
[
{
"datetime"
:
{
"order"
:
"desc"
}
}
]
}
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
POINT_1MIN_EVENT
)
return
es_re
time_format
=
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
end_date
=
str
(
pendulum
.
now
(
tz
=
CST
)
.
strftime
(
time_format
))
cond_lst
.
append
(
f
"event_datetime < '{end_date}'"
)
cond_str
=
" AND "
.
join
(
cond_lst
)
async
with
MysqlUtil
()
as
conn
:
sql
=
f
"select count(*) cnt from point_1min_event WHERE {cond_str};"
total
=
await
conn
.
fetchone
(
sql
)
total_count
=
total
.
get
(
"cnt"
,
0
)
if
total_count
<=
0
:
return
0
,
[]
sql
=
f
"select * from point_1min_event WHERE {cond_str} "
\
f
"order by event_datetime desc limit
%
s,
%
s;"
data
=
await
conn
.
fetchall
(
sql
,
args
=
((
pg_num
-
1
)
*
pg_size
,
pg_size
))
return
total_count
,
data
unify_api/modules/shidianu/service/open_data_service.py
View file @
2bd87ca5
...
...
@@ -9,103 +9,76 @@ from unify_api.modules.shidianu.components.open_data_cps import (
)
from
unify_api.modules.alarm_manager.components.list_alarm
import
\
ListAlarmResponse
,
Alarm
from
unify_api.modules.common.dao.common_dao
import
meter_by_mids
,
\
monitor_point_storey_join_in
,
points_monitor_by_cid
from
unify_api.modules.common.procedures.points
import
point_to_mid
from
unify_api.modules.common.dao.common_dao
import
load_compy_storey_points
from
unify_api.modules.shidianu.dao.open_data_dao
import
\
get_user_product_auth
,
result_longgang_by_cid
,
monitor_point_company
get_user_product_auth
,
load_lg_sdu_events
,
monitor_point_company
from
pot_libs.aiohttp_util.aiohttp_utils
import
AioHttpUtils
from
pot_libs.settings
import
SETTING
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
pot_libs.aredis_util.aredis_utils
import
RedisUtils
from
unify_api.utils
import
time_format
from
unify_api.utils.time_format
import
(
CST
,
convert_dt_to_timestr
,
convert_to_dt
)
async
def
basic_info_longgang_service
(
user_id
,
p
age_size
,
page
_num
):
async
def
basic_info_longgang_service
(
user_id
,
p
g_size
,
pg
_num
):
# 1.验证权限
cids
=
[
223
]
is_auth
=
await
get_power
(
user_id
,
cids
)
if
not
is_auth
:
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
# 2. 信息
# company = await company_by_cids(cids)
# if not company:
# return success_res(code=4008, msg="没有该cid信息")
rows
=
[]
# company_address = company[0]['address'] or ''
monitor_point_storey_list
=
await
monitor_point_storey_join_in
(
cids
,
1
,
3000
)
total
=
len
(
monitor_point_storey_list
)
if
not
monitor_point_storey_list
:
rows
=
[]
storey_points
=
await
load_compy_storey_points
(
cids
,
1
,
3000
)
total
=
len
(
storey_points
)
if
not
storey_points
:
return
success_res
(
code
=
4009
,
msg
=
"此厂还未装任何装置"
)
page_start
=
(
page_num
-
1
)
*
page_size
page_end
=
(
page_num
-
1
)
*
page_size
+
page_size
page_start
=
(
pg_num
-
1
)
*
pg_size
page_end
=
(
pg_num
-
1
)
*
pg_size
+
pg_size
if
page_start
>
total
:
return
BasicInfoResp
(
rows
=
[],
total
=
total
)
# 3. 增加相序字段
pids
=
[
i
[
"pid"
]
for
i
in
monitor_point_storey_list
]
# 3.1 point和mid对应关系
point_mid
,
p_num
=
await
point_to_mid
(
pids
)
# 3.2 mid查询相序
mids
=
point_mid
.
values
()
meter_list
=
await
meter_by_mids
(
mids
=
mids
)
# 把mid提出来, {mid: meter_no}
mid_meter_dic
=
{
i
[
"mid"
]:
i
[
"meter_no"
]
for
i
in
meter_list
}
for
info
in
monitor_point_storey_list
[
page_start
:
page_end
]:
cid
=
info
[
"cid"
]
storey_id
=
info
.
get
(
"storey_id"
)
or
''
for
info
in
storey_points
[
page_start
:
page_end
]:
room_name
=
info
[
"room_name"
]
or
''
storey_name
=
info
[
"storey_name"
]
or
''
company_address
=
info
[
"address"
]
or
''
# 增加相序字段
pid
=
info
[
"pid"
]
meter_no
=
mid_meter_dic
[
point_mid
[
pid
]]
+
"相"
# 创建时间
# create_time = info["create_time"]
# create_time = datetime.strftime(
# datetime.fromtimestamp(create_time), "%Y-%m-%d %H:%M")
compy_addr
=
info
[
"address"
]
or
''
rows
.
append
(
{
"cid"
:
cid
,
"cid"
:
info
[
"cid"
]
,
"pid"
:
info
[
"pid"
],
"mtid"
:
info
[
"mtid"
],
"storey_id"
:
storey_id
,
"meter_no"
:
meter_no
,
"equipment_address"
:
f
"{comp
any_address
}{storey_name}{room_name}"
,
"storey_id"
:
info
.
get
(
"storey_id"
,
""
)
,
"meter_no"
:
info
[
"meter_no"
]
+
"相"
,
"equipment_address"
:
f
"{comp
y_addr
}{storey_name}{room_name}"
,
"equipment_code"
:
info
[
"sid"
],
"root_address"
:
f
"{comp
any_address
}{storey_name}{room_name}"
,
"longitude"
:
info
.
get
(
"longitude"
,
''
)
or
''
,
"latitude"
:
info
.
get
(
"latitude"
,
''
)
or
''
,
"root_address"
:
f
"{comp
y_addr
}{storey_name}{room_name}"
,
"longitude"
:
info
.
get
(
"longitude"
,
''
),
"latitude"
:
info
.
get
(
"latitude"
,
''
),
"insurer"
:
"黄锦成"
,
"insurer_phone"
:
"13580428480"
}
)
return
BasicInfoResp
(
rows
=
rows
,
total
=
total
)
async
def
stb_data_longgang_service
(
user_id
,
type
):
db_name
=
{
"soe"
:
{
"super"
:
"db_soe"
,
"suffix"
:
"soe"
},
"electric"
:
{
"super"
:
"db_electric"
,
"suffix"
:
"ele"
},
"appliance"
:
{
"super"
:
"db_appliance"
,
"suffix"
:
"app"
}
}
if
type
not
in
db_name
.
keys
():
async
def
stb_data_longgang_service
(
user_id
,
d_type
):
topic2db
=
{
"electric"
:
"db_electric"
,
"soe"
:
"db_soe"
,
"appliance"
:
"db_appliance"
}
if
d_type
not
in
topic2db
.
keys
():
return
success_res
(
code
=
4002
,
msg
=
"type错误"
)
# 频率
is_frequency
=
await
RedisUtils
()
.
get
(
f
"get_data_{type}:{user_id}"
)
if
is_frequency
:
access_lim_key
=
f
"access_data_{d_type}:{user_id}"
if
await
RedisUtils
()
.
get
(
access_lim_key
)
:
return
success_res
(
code
=
4011
,
msg
=
"访问频繁,请10s后访问"
)
# 权限
# cids = SETTING.get_new_datas_by_cids
cids
=
[
223
]
is_auth
=
await
get_power
(
user_id
,
cids
)
if
not
is_auth
:
if
not
await
get_power
(
user_id
,
cids
):
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
token
=
get_token
()
stb_url
=
f
"{SETTING.stb_url}{db_name[type]['super']}"
td_token
=
get_token
()
db_name
=
topic2db
[
d_type
]
stb_url
=
f
"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai"
sql
=
f
"""
select last_row(*) from electric_stb
where cpyid = {cids[0]}
...
...
@@ -113,32 +86,25 @@ async def stb_data_longgang_service(user_id, type):
"""
resp_str
,
status
=
await
AioHttpUtils
()
.
post_data
(
stb_url
,
data
=
sql
,
timeout
=
50
,
headers
=
{
"Authorization"
:
f
"Basic {token}"
}
headers
=
{
"Authorization"
:
f
"Basic {t
d_t
oken}"
}
)
if
not
resp_str
or
status
!=
200
:
return
success_res
(
code
=
4003
,
msg
=
"未查找到数据"
)
results
=
json
.
loads
(
resp_str
)
head
=
[
re
.
findall
(
r'last_row\((.*)\)'
,
i
)[
0
]
if
"("
in
i
else
i
for
i
in
results
[
"head"
]]
datas
=
[]
for
res
in
results
[
"data"
]:
datas
.
append
(
dict
(
zip
(
head
,
res
)))
if
type
==
"electric"
:
datas
=
[
dict
(
zip
(
head
,
r
))
for
r
in
results
[
"data"
]]
if
d_type
==
"electric"
:
[
data
.
pop
(
"ts_received"
)
for
data
in
datas
]
else
:
for
data
in
datas
:
data
[
"ts"
]
=
data
[
"ts_origin"
]
data
.
pop
(
"ts_origin"
)
total
=
results
.
get
(
"rows"
,
0
)
or
0
# 对数据进行压缩
# datas_str = json.dumps(datas)
# datas_bytes = zlib.compress(datas_str.encode("utf-8"))
# encoded = base64.b64encode(bytearray(datas_bytes)).decode()
# log.info(f"stb_data 压缩前{len(datas_str)},压缩后{len(datas_bytes)},"
# f"输出大小{len(encoded)}")
await
RedisUtils
()
.
setex
(
f
"get_data_{type}:{user_id}"
,
10
,
1
)
# return StbDataResp(rows=encoded, total=total)
return
StbDataResp
(
rows
=
datas
,
total
=
total
)
await
RedisUtils
()
.
setex
(
access_lim_key
,
10
,
1
)
return
StbDataResp
(
rows
=
datas
,
total
=
results
.
get
(
"rows"
,
0
))
async
def
get_power
(
user_id
,
cids
):
...
...
@@ -178,32 +144,32 @@ async def get_requests(token, stb_url, sql):
return
False
async
def
supplement_data_service
(
user_id
,
cid
,
start
,
end
,
type
):
is_auth
=
await
get_power
(
user_id
,
cid
)
if
not
is_auth
:
async
def
supplement_data_service
(
user_id
,
cid
,
start
,
end
,
d_type
):
if
not
await
get_power
(
user_id
,
cid
):
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
db_name
=
{
"soe"
:
"db_soe"
,
"electric"
:
"db_electric"
,
"appliance"
:
"db_appliance"
}
if
type
not
in
db_name
.
keys
():
topic2db
=
{
"electric"
:
"db_electric"
,
"soe"
:
"db_soe"
,
"appliance"
:
"db_appliance"
}
if
d_type
not
in
topic2db
.
keys
():
return
success_res
(
code
=
4002
,
msg
=
"type错误"
)
now
=
pendulum
.
now
()
now
=
pendulum
.
now
(
tz
=
CST
)
try
:
start
=
my_pendulum
.
from_format
(
start
,
'YYYY-MM-DD HH:mm:ss'
)
end
=
my_pendulum
.
from_format
(
end
,
'YYYY-MM-DD HH:mm:ss'
)
hour
=
(
end
-
start
)
.
in_hours
()
if
hour
>
6
:
if
(
end
-
start
)
.
in_hours
()
>
6
:
return
success_res
(
code
=
4003
,
msg
=
"时间跨度不能大于6小时"
)
if
(
now
-
start
)
.
in_months
()
<
1
:
if
(
now
-
start
)
.
in_months
()
>
1
:
return
success_res
(
code
=
4010
,
msg
=
"查询日期不能大于1个月"
)
except
Exception
as
e
:
success_res
(
code
=
4004
,
msg
=
"开始时间或者结束时间错误"
)
token
=
get_token
()
stb_url
=
f
"{SETTING.stb_url}{db_name[type]}"
table_name
=
f
"{type}_stb"
if
type
==
"electric"
:
db_name
=
topic2db
[
d_type
]
stb_url
=
f
"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai"
table_name
=
f
"{d_type}_stb"
if
d_type
==
"electric"
:
sql
=
f
"select * from {table_name} where cpyid={cid} and "
\
f
"ts >= '{start}' and ts <= '{end}'"
else
:
...
...
@@ -215,11 +181,12 @@ async def supplement_data_service(user_id, cid, start, end, type):
)
if
not
resp_str
or
status
!=
200
:
return
success_res
(
code
=
4003
,
msg
=
"未查找到数据"
)
results
=
json
.
loads
(
resp_str
)
datas
=
[]
for
res
in
results
[
"data"
]:
datas
.
append
(
dict
(
zip
(
results
[
"head"
],
res
)))
if
type
==
"electric"
:
if
d_
type
==
"electric"
:
datas
=
[
data
.
pop
(
"ts_received"
)
for
data
in
datas
]
else
:
for
data
in
datas
:
...
...
@@ -230,16 +197,17 @@ async def supplement_data_service(user_id, cid, start, end, type):
# 告警结果和分析结果
async
def
result_longgang_service
(
user_id
,
importance
,
p
age_size
,
page
_num
):
# cids = SETTING.get_new_datas_by_cids
async
def
result_longgang_service
(
user_id
,
importance
,
p
g_size
,
pg
_num
):
cid
=
223
cids
=
[
223
]
is_auth
=
await
get_power
(
user_id
,
cids
)
if
not
is_auth
:
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
es_res
=
await
result_longgang_by_cid
(
cids
,
page_num
,
page_size
,
importance
)
if
not
e
s_res
[
"hits"
][
"hits"
]
:
total
,
events
=
await
load_lg_sdu_events
(
cid
,
pg_num
,
pg_size
,
importance
)
if
not
e
vents
:
return
ListAlarmResponse
(
total
=
0
,
rows
=
[])
# 2. 构建返回数据
monitor_point_list
=
await
monitor_point_company
(
cids
)
mp_map
=
{}
...
...
@@ -253,31 +221,28 @@ async def result_longgang_service(user_id, importance, page_size, page_num):
}
rows
=
[]
for
info
in
es_res
[
"hits"
][
"hits"
]:
es_id
=
info
[
"_id"
]
source
=
info
[
"_source"
]
point_id
=
source
.
get
(
"point_id"
)
for
event
in
events
:
point_id
=
event
.
get
(
"pid"
)
if
point_id
in
mp_map
.
keys
():
type
=
source
.
get
(
"
type"
)
type
=
event
.
get
(
"event_
type"
)
type_str
=
constants
.
SDU_EVENT_TYPE_MAP
.
get
(
type
,
type
)
point_id
=
source
.
get
(
"point_
id"
)
point_id
=
event
.
get
(
"p
id"
)
sid
=
mp_map
.
get
(
point_id
)
.
get
(
"sid"
)
mtid
=
mp_map
.
get
(
point_id
)
.
get
(
"mtid"
)
date_time
=
source
.
get
(
"datetime"
)
dt
=
time_format
.
convert_to_dt
(
date_time
)
date_time
=
time_format
.
convert_dt_to_timestr
(
dt
)
event_duration
=
source
.
get
(
"event_duration"
)
date_time
=
event
.
get
(
"event_datetime"
)
dt
=
convert_to_dt
(
date_time
)
date_time
=
convert_dt_to_timestr
(
dt
)
event_duration
=
event
.
get
(
"event_duration"
)
storey_name
=
mp_map
.
get
(
point_id
)
.
get
(
"storey_name"
)
room_name
=
mp_map
.
get
(
point_id
)
.
get
(
"room_name"
)
company_name
=
mp_map
.
get
(
point_id
)
.
get
(
"fullname"
)
alarm
=
Alarm
(
es_id
=
e
s_id
,
name
=
source
.
get
(
"name"
),
importance
=
source
.
get
(
"importance"
),
es_id
=
e
vent
[
"id"
]
,
name
=
event
.
get
(
"name"
),
importance
=
event
.
get
(
"importance"
),
date_time
=
date_time
,
type
=
type
,
type_name
=
type_str
,
description
=
source
.
get
(
"message"
),
description
=
event
.
get
(
"message"
),
event_duration
=
event_duration
,
company_name
=
company_name
,
point_id
=
point_id
,
...
...
@@ -285,10 +250,8 @@ async def result_longgang_service(user_id, importance, page_size, page_num):
room_name
=
room_name
,
storey_room_name
=
storey_name
+
room_name
,
sid
=
sid
,
mtid
=
mtid
mtid
=
event
.
get
(
"mtid"
)
)
rows
.
append
(
alarm
)
real_total
=
es_res
[
"hits"
][
"total"
]
total
=
real_total
if
real_total
<
constants
.
ES_TOTAL_LIMIT
else
constants
.
ES_TOTAL_LIMIT
return
ListAlarmResponse
(
total
=
total
,
rows
=
rows
)
unify_api/modules/shidianu/views/open_data.py
View file @
2bd87ca5
import
random
from
pot_libs.sanic_api
import
summary
from
unify_api.modules.shidianu.components.open_data_cps
import
(
BasicInfoReq
,
BasicInfoResp
,
StbDataReq
,
StbDataResp
,
SupplementReq
BasicInfoReq
,
BasicInfoResp
,
StbDataReq
,
StbDataResp
,
SupplementReq
,
HomeDataResp
,
HomeLstAlarmReq
,
HomeAlarmStatsReq
,
HomeAlarmStatsResp
)
from
unify_api.modules.alarm_manager.components.list_alarm
import
\
ListAlarmResponse
from
unify_api.modules.shidianu.service.open_data_service
import
\
basic_info_longgang_service
,
stb_data_longgang_service
,
\
from
unify_api.modules.shidianu.service.open_data_service
import
(
basic_info_longgang_service
,
stb_data_longgang_service
,
supplement_data_service
,
result_longgang_service
)
from
pot_libs.settings
import
SETTING
from
unify_api.utils.time_format
import
last30_day_range
from
unify_api.modules.common.dao.common_dao
import
monitor_by_cid
from
unify_api.modules.home_page.service.count_info_service
import
safe_run_sdu
from
unify_api.modules.common.procedures.power_cps
import
power_use_count
from
unify_api.modules.common.procedures.alarm_cps
import
alarm_count_sdu_new
from
unify_api.modules.shidianu.service.open_data_service
import
get_power
from
unify_api.modules.common.components.select_company_cps
import
CmReq
from
pot_libs.common.components.responses
import
success_res
from
unify_api.modules.alarm_manager.service.alarm_static_service
import
(
sdu_alarm_statistics_service
)
from
unify_api.modules.home_page.procedures.count_info_pds
import
(
electric_use_info_sdu
)
from
unify_api.modules.common.dao.common_dao
import
storey_by_cid
from
unify_api.modules.alarm_manager.service.list_alarm_service
import
\
new_list_alarm_service
# 数据对外开放接口
@
summary
(
"获取装置列表"
)
async
def
post_basic_info_longgang
2
(
req
,
body
:
BasicInfoReq
)
->
BasicInfoResp
:
async
def
post_basic_info_longgang
(
req
,
body
:
BasicInfoReq
)
->
BasicInfoResp
:
user_id
=
req
.
ctx
.
user_id
# user_id = 10086
page_size
=
body
.
page_size
or
10
...
...
@@ -20,7 +41,7 @@ async def post_basic_info_longgang2(req, body: BasicInfoReq) -> BasicInfoResp:
@
summary
(
"查询数据"
)
async
def
post_stb_data_longgang
2
(
req
,
body
:
StbDataReq
)
->
StbDataResp
:
async
def
post_stb_data_longgang
(
req
,
body
:
StbDataReq
)
->
StbDataResp
:
user_id
=
req
.
ctx
.
user_id
# user_id = 10086
# cid = body.cid
...
...
@@ -29,27 +50,25 @@ async def post_stb_data_longgang2(req, body: StbDataReq) -> StbDataResp:
@
summary
(
"获取告警结果"
)
async
def
post_alarm_result_longgang
2
(
req
,
body
:
BasicInfoReq
)
->
\
async
def
post_alarm_result_longgang
(
req
,
body
:
BasicInfoReq
)
->
\
ListAlarmResponse
:
user_id
=
req
.
ctx
.
user_id
# user_id = 10086
p
age
_size
=
body
.
page_size
or
10
p
age
_num
=
body
.
page_num
or
1
p
g
_size
=
body
.
page_size
or
10
p
g
_num
=
body
.
page_num
or
1
importance
=
[
2
,
3
]
return
await
result_longgang_service
(
user_id
,
importance
,
page_size
,
page_num
)
return
await
result_longgang_service
(
user_id
,
importance
,
pg_size
,
pg_num
)
@
summary
(
"获取分析结果"
)
async
def
post_analyse_result_longgang
2
(
req
,
body
:
BasicInfoReq
)
->
\
async
def
post_analyse_result_longgang
(
req
,
body
:
BasicInfoReq
)
->
\
ListAlarmResponse
:
user_id
=
req
.
ctx
.
user_id
# user_id = 10086
p
age
_size
=
body
.
page_size
or
10
p
age
_num
=
body
.
page_num
or
1
p
g
_size
=
body
.
page_size
or
10
p
g
_num
=
body
.
page_num
or
1
importance
=
[
1
]
return
await
result_longgang_service
(
user_id
,
importance
,
page_size
,
page_num
)
return
await
result_longgang_service
(
user_id
,
importance
,
pg_size
,
pg_num
)
@
summary
(
"补充数据"
)
...
...
@@ -60,4 +79,97 @@ async def post_supplement_data(req, body: SupplementReq) -> StbDataResp:
start
=
body
.
start
end
=
body
.
end
type
=
body
.
type
return
await
supplement_data_service
(
user_id
,
cid
,
start
,
end
,
type
)
\ No newline at end of file
return
await
supplement_data_service
(
user_id
,
cid
,
start
,
end
,
type
)
@
summary
(
"首页信息"
)
async
def
post_home_page_data
(
req
,
body
:
CmReq
)
->
HomeDataResp
:
user_id
=
req
.
ctx
.
user_id
cid
=
223
cids
=
[
223
]
is_auth
=
await
get_power
(
user_id
,
cids
)
if
not
is_auth
and
not
SETTING
.
debug_mode
:
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
start
,
end
=
last30_day_range
()
product
=
4
# 安全和报警统计
res
=
await
sdu_alarm_statistics_service
([
cid
],
start
,
end
,
product
)
# 安全指数
alarm_res
=
await
electric_use_info_sdu
(
cid
)
electric_use_score
=
round
(
alarm_res
.
electric_use_score
)
# 1. 接入住户,从monitor表取,解决拆除逻辑
monitor_list
=
await
monitor_by_cid
(
cid
)
total_tenant
=
len
(
monitor_list
)
# 2. 安全运行天数: 以天计,当工厂某天I级、II级报警总数小于总户数*5%时,即为安全运行,
# 展示自接入累加安全运行天数
safe_day
=
await
safe_run_sdu
(
cid
,
total_tenant
)
# 3. 在线率
online_rate
=
88
+
random
.
choice
([
1
,
1.5
,
2
,
2.5
,
3
,
3.5
,
4
])
# 4.累计用电
total_power
=
await
power_use_count
(
cids
)
# 5. 累计报警
total_alarm
=
await
alarm_count_sdu_new
(
cids
)
return
HomeDataResp
(
risk_distribution
=
res
.
risk_distribution
,
content_distribution
=
res
.
content_distribution
,
electric_use_score
=
electric_use_score
,
total_tenant
=
total_tenant
,
online_rate
=
online_rate
,
safe_day
=
safe_day
,
total_power
=
total_power
,
total_alarm
=
total_alarm
,
)
@
summary
(
"首页-最近报警"
)
async
def
post_home_page_lst_alarm
(
req
,
body
:
HomeLstAlarmReq
)
->
\
ListAlarmResponse
:
user_id
=
req
.
ctx
.
user_id
# cid = body.cid
cid
=
223
is_auth
=
await
get_power
(
user_id
,
[
cid
])
if
not
is_auth
and
not
SETTING
.
debug_mode
:
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
importance
=
body
.
importance
page_size
=
body
.
page_size
page_num
=
body
.
page_num
start
=
body
.
start
end
=
body
.
end
product
=
4
point_ids
=
None
storeys
=
await
storey_by_cid
(
cid
)
storey_ids
=
[
item
[
"storey_id"
]
for
item
in
storeys
]
return
await
new_list_alarm_service
(
cid
,
storey_ids
,
page_num
,
page_size
,
start
,
end
,
product
,
importance
,
point_ids
)
@
summary
(
"首页-运行趋势"
)
async
def
post_home_page_alarm_stats
(
req
,
body
:
HomeAlarmStatsReq
)
->
\
HomeAlarmStatsResp
:
user_id
=
req
.
ctx
.
user_id
# cid = body.cid
cid
=
223
is_auth
=
await
get_power
(
user_id
,
[
cid
])
if
not
is_auth
and
not
SETTING
.
debug_mode
:
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
# 1. 获取参数
start
=
body
.
start
end
=
body
.
end
product
=
4
sr
=
await
sdu_alarm_statistics_service
([
cid
],
start
,
end
,
product
)
return
HomeAlarmStatsResp
(
ele_overload
=
sr
.
ele_overload
,
illegal_ele_app
=
sr
.
illegal_ele_app
,
power_quality
=
sr
.
power_quality
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment