Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
fed8b6df
Commit
fed8b6df
authored
Jul 20, 2023
by
ZZH
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
code optimize 2023-7-20
parent
8080048e
Show whitespace changes
Inline
Side-by-side
Showing
8 changed files
with
48 additions
and
172 deletions
+48
-172
adio_dao.py
unify_api/modules/adio/dao/adio_dao.py
+0
-20
adio.py
unify_api/modules/adio/views/adio.py
+1
-8
alarm_static_pds.py
..._api/modules/alarm_manager/procedures/alarm_static_pds.py
+0
-4
cids.py
unify_api/modules/common/procedures/cids.py
+0
-1
electric_pds.py
unify_api/modules/electric/procedures/electric_pds.py
+0
-36
health_index_service.py
unify_api/modules/home_page/service/health_index_service.py
+25
-62
open_data_service.py
unify_api/modules/shidianu/service/open_data_service.py
+22
-13
taos_new.py
unify_api/utils/taos_new.py
+0
-28
No files found.
unify_api/modules/adio/dao/adio_dao.py
View file @
fed8b6df
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
pot_libs.settings
import
SETTING
from
unify_api.modules.common.service.td_engine_service
import
\
get_td_engine_data
from
unify_api.utils.taos_new
import
parse_td_columns
async
def
get_location_dao
(
lids
):
...
...
@@ -28,19 +24,3 @@ async def get_location_15min_dao(lid, start, end,
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,
args
=
(
lid
,))
return
result
async
def
get_adio_current_data
(
mtid
):
'''
获取安全监测实时数据
'''
url
=
f
"{SETTING.stb_url}db_adio?tz=Asia/Shanghai"
sql
=
f
"select last_row(*) from mt{mtid}_adi"
is_success
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
not
is_success
:
return
{}
if
not
results
[
'data'
]:
return
{}
head
=
parse_td_columns
(
results
)
res
=
dict
(
zip
(
head
,
results
[
'data'
][
0
]))
return
res
unify_api/modules/adio/views/adio.py
View file @
fed8b6df
# -*- coding:utf-8 -*-
#
# Author:jing
# Date: 2020/7/9
import
asyncio
import
json
import
time
...
...
@@ -17,7 +13,6 @@ from unify_api.modules.adio.service.adio_card import adio_index_service
from
unify_api.utils
import
time_format
from
unify_api.utils.time_format
import
CST
,
YMD_Hms
,
timestamp2dts
from
unify_api
import
constants
from
unify_api.utils.common_utils
import
round_2
from
unify_api.modules.adio.components.adio
import
(
AdioHistoryResponse
,
AdioCurrentResponse
,
...
...
@@ -30,9 +25,7 @@ from unify_api.modules.adio.components.adio import (
adio_current_example
,
)
from
pot_libs.common.components.query
import
PageRequest
from
unify_api.modules.adio.dao.adio_dao
import
(
get_location_dao
,
get_location_15min_dao
,
get_adio_current_data
)
from
unify_api.modules.adio.dao.adio_dao
import
get_location_dao
@
summary
(
"返回安全监测历史曲线"
)
...
...
unify_api/modules/alarm_manager/procedures/alarm_static_pds.py
View file @
fed8b6df
...
...
@@ -31,9 +31,6 @@ async def sdu_alarm_content_info(cids, start, end, points):
f
"{tuple(SDU_ALARM_LIST)} and event_datetime "
\
f
"BETWEEN '{start}' and '{end}' {mid_sql} "
\
f
"GROUP BY appliance"
log
.
info
(
f
"sql:{sql}"
)
log
.
info
(
f
"point_sql:{point_sql}"
)
log
.
info
(
f
"appliance_sql:{appliance_sql}"
)
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
)
point_datas
=
await
conn
.
fetchall
(
point_sql
)
...
...
@@ -49,7 +46,6 @@ async def sdu_alarm_content_info(cids, start, end, points):
}
# time_slots = list(set(str(data["dt"]) for data in datas))
time_slots
=
proxy_power_slots
(
start
,
end
,
"MM-DD"
,
True
)
log
.
info
(
f
"time_slots:{time_slots}"
)
# 线路过载
ele_overload
=
{
"slots"
:
time_slots
,
"value"
:
[
0
]
*
len
(
time_slots
)}
# 违规电器接入
...
...
unify_api/modules/common/procedures/cids.py
View file @
fed8b6df
...
...
@@ -31,7 +31,6 @@ async def get_cid_info(company_ids=None, proxy_ids=None, all=False):
async
with
MysqlUtil
()
as
conn
:
companys
=
await
conn
.
fetchall
(
company_sql
,
args
=
args
)
company_map
=
{
i
[
"cid"
]:
i
for
i
in
companys
}
print
(
"company_map"
,
company_map
)
return
company_map
...
...
unify_api/modules/electric/procedures/electric_pds.py
View file @
fed8b6df
import
pendulum
from
pot_libs.settings
import
SETTING
from
unify_api.constants
import
CST
from
unify_api.modules.common.service.td_engine_service
import
\
get_td_engine_data
from
unify_api.utils.taos_new
import
parse_td_columns
,
td3_tbl_compate
from
unify_api.utils.time_format
import
get_15min_ago
async
def
elec_current_data
(
mtids
,
cid
):
res_map
=
{}
last_15min_time
=
get_15min_ago
()
url
=
f
"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
table_name
=
[
"mt{}_ele"
.
format
(
mtid
)
for
mtid
in
mtids
]
td_tables
=
td3_tbl_compate
(
table_name
)
sql
=
f
"select last_row(*) from electric_stb "
\
f
"where TBNAME IN {td_tables} and ts >= '{last_15min_time}' "
\
f
"group by tbname"
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
is_succ
:
head
=
parse_td_columns
(
results
)
for
res
in
results
[
"data"
]:
data
=
dict
(
zip
(
head
,
res
))
if
data
[
"mtid"
]
in
mtids
:
res_map
[
data
[
"mtid"
]]
=
data
return
res_map
def
trans_electric_tdengine_data
(
results
):
head
=
parse_td_columns
(
results
)
if
not
results
[
"data"
]:
results
[
"data"
]
=
[
''
for
i
in
range
(
len
(
head
))]
res
=
dict
(
zip
(
head
,
results
[
"data"
][
0
]))
return
res
unify_api/modules/home_page/service/health_index_service.py
View file @
fed8b6df
import
time
import
pendulum
from
unify_api.modules.electric.dao.electric_dao
import
\
get_elec_mtid_sid_by_cid
from
unify_api.utils.common_utils
import
round_2
from
pot_libs.logger
import
log
from
pot_libs.settings
import
SETTING
from
unify_api.modules.home_page.procedures.dev_grade
import
get_dev_grade
from
unify_api.utils
import
time_format
from
unify_api.constants
import
REAL_EXP_TIME
from
pot_libs.utils.exc_util
import
ParamException
,
BusinessException
from
unify_api.modules.home_page.components.health_index
import
\
HealthCtlRateRes
from
unify_api.modules.zhiwei_u.dao.warning_operations_dao
import
\
select_point_dao
from
unify_api.modules.common.service.td_engine_service
import
\
get_td_engine_data
from
unify_api.utils.taos_new
import
parse_td_columns
,
td3_tbl_compate
,
\
get_td_table_name
from
unify_api.modules.electric.service.electric_service
import
(
batch_load_rt_ele_with_hr
)
async
def
health_ctl_rate_srv
(
cid
):
if
cid
<=
0
:
log
.
error
(
"param error"
)
raise
ParamException
(
message
=
"参数错误, cid参数必须是一个正整数!"
)
point_infos
=
await
select_point_dao
(
cid
)
if
not
point_infos
:
if
not
await
select_point_dao
(
cid
)
:
log
.
error
(
"cid:
%
s no point da;ta"
%
cid
)
raise
BusinessException
(
message
=
"工厂没有任何监测点!"
)
stats
=
{
"lf"
:
0
,
"costtl"
:
0
,
"freq_dev"
:
0
,
"thdu"
:
0
,
"v_dev"
:
0
,
"ubl"
:
0
}
now_ts
=
int
(
time
.
time
())
real_tt
=
now_ts
total
=
0
datas
=
await
get_elec_mtid_sid_by_cid
(
cid
)
td_mt_tables
=
tuple
(
(
get_td_table_name
(
"electric"
,
data
[
"mtid"
])
for
data
in
datas
if
data
[
"mtid"
]))
td_mt_tables
=
td3_tbl_compate
(
td_mt_tables
)
sql
=
f
"select last_row(*) from electric_stb "
\
f
"where TBNAME IN {td_mt_tables} group by tbname"
url
=
f
"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
time_str
=
time_format
.
get_datetime_str
(
real_tt
)
if
not
is_succ
:
log
.
warn
(
f
"cid={cid} 无任何有效mid"
)
return
HealthCtlRateRes
(
real_time
=
time_str
,
lf
=
1
,
costtl
=
1
,
thdu
=
1
,
v_dev
=
1
,
freq_dev
=
1
,
ubl
=
1
)
if
not
results
[
"data"
]:
# 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables
=
tuple
(
(
f
"s{data['sid'].lower()}_e"
for
data
in
datas
if
data
[
"sid"
]))
lst_mtid_sid
=
await
get_elec_mtid_sid_by_cid
(
cid
)
mtids
=
[
mtid_sid
[
"mtid"
]
for
mtid_sid
in
lst_mtid_sid
]
td_s_tables
=
td3_tbl_compate
(
td_s_tables
)
sql
=
f
"select last_row(*) from electric_stb "
\
f
"where TBNAME IN {td_s_tables} group by tbname"
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
not
is_succ
:
log
.
warn
(
f
"cid={cid} 无任何有效mid"
)
return
HealthCtlRateRes
(
real_time
=
time_str
,
lf
=
1
,
costtl
=
1
,
thdu
=
1
,
v_dev
=
1
,
freq_dev
=
1
,
ubl
=
1
)
head
=
parse_td_columns
(
results
)
datas
=
[]
for
res
in
results
[
"data"
]:
datas
.
append
(
dict
(
zip
(
head
,
res
)))
for
data
in
datas
:
real_tt
=
pendulum
.
parse
(
data
[
"ts"
])
.
int_timestamp
if
now_ts
-
real_tt
>
REAL_EXP_TIME
:
continue
total
+=
1
ctnum
=
data
.
get
(
"ctnum"
)
d_rt_ele
=
await
batch_load_rt_ele_with_hr
(
mtids
)
for
mtid
,
rt_ele
in
d_rt_ele
.
items
():
ctnum
=
rt_ele
.
get
(
"ctnum"
)
# 电压偏差
v_dev
=
data
.
get
(
"ua_dev"
)
if
ctnum
==
3
else
data
.
get
(
"uab_dev"
)
v_dev
=
rt_ele
.
get
(
"ua_dev"
)
if
ctnum
==
3
else
rt_ele
.
get
(
"uab_dev"
)
grade
=
get_dev_grade
(
dev_type
=
"v"
,
cur
=
v_dev
)
if
grade
and
grade
>=
60
:
stats
[
"v_dev"
]
+=
1
# 频率偏差
freq_dev
=
data
.
get
(
"freq_dev"
)
grade
=
get_dev_grade
(
dev_type
=
"freq"
,
cur
=
freq_dev
)
grade
=
get_dev_grade
(
dev_type
=
"freq"
,
cur
=
rt_ele
.
get
(
"freq_dev"
))
if
grade
and
grade
>=
60
:
stats
[
"freq_dev"
]
+=
1
# 三相电压不平衡度
ubl
=
data
.
get
(
"ubl"
)
grade
=
get_dev_grade
(
dev_type
=
"ubl"
,
cur
=
ubl
)
grade
=
get_dev_grade
(
dev_type
=
"ubl"
,
cur
=
rt_ele
.
get
(
"ubl"
))
if
grade
and
grade
>=
60
:
stats
[
"ubl"
]
+=
1
# 功率因数
costtl
=
data
.
get
(
"costtl"
)
grade
=
get_dev_grade
(
dev_type
=
"costtl"
,
cur
=
costtl
)
grade
=
get_dev_grade
(
dev_type
=
"costtl"
,
cur
=
rt_ele
.
get
(
"costtl"
))
if
grade
and
grade
>=
60
:
stats
[
"costtl"
]
+=
1
# (电压)谐波畸变率
thdu
=
data
.
get
(
"thdua"
)
if
ctnum
==
3
else
data
.
get
(
"thduab"
)
thdu
=
rt_ele
.
get
(
"thdua"
)
if
ctnum
==
3
else
rt_ele
.
get
(
"thduab"
)
grade
=
get_dev_grade
(
dev_type
=
"thdu"
,
cur
=
thdu
)
if
grade
and
grade
>=
60
:
stats
[
"thdu"
]
+=
1
# 负载率
lf
=
data
.
get
(
"lf"
)
lf
=
rt_ele
.
get
(
"lf"
)
if
lf
is
None
:
stats
[
"lf"
]
+=
1
else
:
grade
=
get_dev_grade
(
dev_type
=
"lf"
,
cur
=
lf
)
if
grade
and
grade
>=
60
:
stats
[
"lf"
]
+=
1
time_str
=
time_format
.
get_datetime_str
(
int
(
time
.
time
()))
total
=
len
(
d_rt_ele
)
if
total
==
0
:
return
HealthCtlRateRes
(
real_time
=
time_str
,
lf
=
1
,
costtl
=
1
,
thdu
=
1
,
v_dev
=
1
,
freq_dev
=
1
,
ubl
=
1
)
...
...
unify_api/modules/shidianu/service/open_data_service.py
View file @
fed8b6df
import
json
import
base64
import
re
import
pendulum
from
unify_api.modules.electric.dao.electric_dao
import
\
get_elec_mtid_sid_by_cid
from
unify_api
import
constants
from
pot_libs.common.components.responses
import
success_res
from
unify_api.modules.shidianu.components.open_data_cps
import
(
...
...
@@ -20,6 +21,9 @@ from unify_api.utils.taos_new import get_td_engine_data, parse_td_columns
from
unify_api.utils.time_format
import
(
CST
,
convert_dt_to_timestr
,
convert_to_dt
)
from
unify_api.modules.electric.service.electric_service
import
(
batch_load_rt_ele_with_hr
)
async
def
basic_info_longgang_service
(
user_id
,
pg_size
,
pg_num
):
...
...
@@ -77,6 +81,15 @@ async def stb_data_longgang_service(user_id, d_type):
if
not
await
get_power
(
user_id
,
cids
):
return
success_res
(
code
=
4001
,
msg
=
"您没有权限访问"
)
if
d_type
==
"appliance"
:
return
success_res
(
code
=
4003
,
msg
=
"未查找到数据"
)
elif
d_type
==
"electric"
:
lst_mtid_sid
=
await
get_elec_mtid_sid_by_cid
(
cids
[
0
])
mtids
=
[
mtid_sid
[
"mtid"
]
for
mtid_sid
in
lst_mtid_sid
]
d_rt_ele
=
await
batch_load_rt_ele_with_hr
(
mtids
)
datas
=
list
(
d_rt_ele
.
values
())
else
:
db_name
=
topic2db
[
d_type
]
stb_url
=
f
"{SETTING.stb_url}{db_name}?tz=Asia/Shanghai"
sql
=
f
"select last_row(*) from {db_name}.{d_type}_stb "
\
...
...
@@ -87,12 +100,8 @@ async def stb_data_longgang_service(user_id, d_type):
head
=
parse_td_columns
(
results
)
datas
=
[
dict
(
zip
(
head
,
r
))
for
r
in
results
[
"data"
]]
if
d_type
==
"electric"
:
[
data
.
pop
(
"ts_received"
)
for
data
in
datas
]
else
:
for
data
in
datas
:
data
[
"ts"
]
=
data
[
"ts_origin"
]
data
.
pop
(
"ts_origin"
)
data
[
"ts"
]
=
data
.
pop
(
"ts_origin"
)
await
RedisUtils
()
.
setex
(
access_lim_key
,
60
,
1
)
return
StbDataResp
(
rows
=
datas
,
total
=
results
.
get
(
"rows"
,
0
))
...
...
unify_api/utils/taos_new.py
View file @
fed8b6df
...
...
@@ -3,7 +3,6 @@ import re
from
pot_libs.logger
import
log
from
pot_libs.aiohttp_util.aiohttp_utils
import
AioHttpUtils
# from bromake.modules.shidianu.service.open_data_service import get_token
from
pot_libs.settings
import
SETTING
import
base64
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
...
...
@@ -66,33 +65,6 @@ def test_td_engine():
print
(
a_list
)
async
def
elec_current_data_new16
(
mtids
):
res_map
=
{}
url
=
f
"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
table_names
=
[
get_td_table_name
(
"electric"
,
mtid
)
for
mtid
in
mtids
]
if
len
(
table_names
)
>
1
:
for
table_name
in
table_names
:
sql
=
f
"select last_row(*) from {table_name} group by tbname "
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
is_succ
:
head
=
parse_td_columns
(
results
)
for
res
in
results
[
"data"
]:
data
=
dict
(
zip
(
head
,
res
))
res_map
[
data
[
"mtid"
]]
=
data
res_map
.
update
(
res_map
)
return
res_map
,
{}
else
:
sql
=
f
"select last_row(*) from "
+
""
.
join
(
table_names
)
+
"group by tbname "
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
if
is_succ
:
head
=
parse_td_columns
(
results
)
for
res
in
results
[
"data"
]:
data
=
dict
(
zip
(
head
,
res
))
res_map
[
data
[
"mtid"
]]
=
data
return
res_map
,
{}
# td 3.0
def
td3_tbl_compate
(
td_tables
):
if
len
(
td_tables
)
>
1
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment