Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
9e06d118
Commit
9e06d118
authored
May 12, 2023
by
wang.wenrong
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'wwr' into 'develop'
2s scopedownloads See merge request
!41
parents
bba96b1a
d27c2ae6
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
91 additions
and
52 deletions
+91
-52
scope_operations_dao.py
unify_api/modules/anshiu/dao/scope_operations_dao.py
+26
-0
scope_operations_pds.py
unify_api/modules/anshiu/procedures/scope_operations_pds.py
+47
-17
scope_operations_serv.py
unify_api/modules/anshiu/service/scope_operations_serv.py
+5
-34
data_es_dao.py
unify_api/modules/zhiwei_u/dao/data_es_dao.py
+13
-1
No files found.
unify_api/modules/anshiu/dao/scope_operations_dao.py
View file @
9e06d118
...
...
@@ -50,3 +50,29 @@ async def get_threshold_by_mtid(mtid):
async
with
MysqlUtil
()
as
conn
:
threshold
=
await
conn
.
fetch_value
(
sql
,
args
=
(
mtid
,))
return
threshold
async
def
get_scope_url_by_pid
(
mtid
,
start_dt
,
end_dt
):
"""
获取录波详情
:param pid:
:param event_datetime:
:return:
"""
sql
=
f
"""
SELECT
url,
DATE_FORMAT(create_time, '
%
Y-
%
m-
%
d
%
H:
%
i:
%
s') datetime
FROM
point_1min_scope
WHERE
create_time > '{start_dt}'
AND create_time < '{end_dt}'
AND mtid = {mtid}
AND scope_g = '2s'
"""
async
with
MysqlUtil
()
as
conn
:
result
=
await
conn
.
fetchall
(
sql
,)
return
result
unify_api/modules/anshiu/procedures/scope_operations_pds.py
View file @
9e06d118
import
datetime
import
json
import
math
from
pot_libs.qingstor_util.qs_client
import
QsClient
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.logger
import
log
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
from
unify_api
import
constants
from
unify_api.modules.anshiu.components.scope_operations_cps
import
\
ScopeDetail
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_mtid_by_pid_dao
from
unify_api.modules.anshiu.dao.scope_operations_dao
import
\
get_scope_url_by_pid
from
unify_api.modules.electric.procedures.electric_util
import
\
get_wiring_type_new15
from
unify_api.utils.log_utils
import
LOGGER
async
def
get_scope_config_by_pid
(
pid
):
...
...
@@ -54,20 +66,38 @@ async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"):
"""
ES查询2s录波记录数据
"""
query_body
=
{
"size"
:
10000
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"terms"
:
{
"point_id"
:
pids
}},
{
"term"
:
{
"scope_g"
:
{
"value"
:
scope_g
}}},
{
"range"
:
{
"datetime"
:
{
"gte"
:
start_dt
,
"lt"
:
end_dt
}}}
]
}
},
"sort"
:
[{
"datetime"
:
{
"order"
:
"asc"
}}]
}
async
with
EsUtil
()
as
es
:
datas
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_1MIN_SCOPE
)
return
datas
[
"hits"
][
"hits"
]
\ No newline at end of file
mtid
=
await
get_mtid_by_pid_dao
(
pids
)
mtid
=
mtid
.
get
(
'mtid'
)
# 获取录波曲线数据
scope_url_data
=
await
get_scope_url_by_pid
(
mtid
,
start_dt
,
end_dt
)
scope_list
=
[]
starttime_wm
=
datetime
.
datetime
.
now
()
print
(
f
'起始时间:{starttime_wm}'
)
for
scope_data
in
scope_url_data
:
try
:
async
with
QsClient
()
as
qs
:
url
=
scope_data
.
get
(
"url"
)
wave_data
=
await
qs
.
get_object
(
url
)
[
scope_list
.
append
({
"datetime"
:
scope_data
[
'datetime'
],
"ua"
:
wave_data
[
'ua'
][
i
],
"ub"
:
wave_data
[
'ub'
][
i
],
"uc"
:
wave_data
[
'uc'
][
i
],
"ia"
:
wave_data
[
'ia'
][
i
],
"ib"
:
wave_data
[
'ib'
][
i
],
"ic"
:
wave_data
[
'ic'
][
i
],
"lc"
:
wave_data
[
'lc'
][
i
],
"pttl"
:
wave_data
[
'pttl'
][
i
]})
for
i
in
range
(
len
(
wave_data
.
get
(
'ua'
)))]
# for i in range(len(wave_data['ua'])):
# a_dict = dict(zip(wave_data.keys(),
# list(zip(*wave_data.values()))[i]))
# a_dict["datetime"] = scope_data['datetime']
# scope_list.append(a_dict)
except
Exception
as
e
:
LOGGER
.
error
(
f
"scope_detail_service error message:{str(e)}"
)
return
[]
end_time_wm
=
datetime
.
datetime
.
now
()
print
(
f
'结束时间:{end_time_wm}'
)
return
scope_list
unify_api/modules/anshiu/service/scope_operations_serv.py
View file @
9e06d118
...
...
@@ -35,8 +35,6 @@ from unify_api.modules.electric.procedures.electric_util import \
from
unify_api.modules.zhiwei_u
import
config
from
unify_api.modules.zhiwei_u.dao.data_es_dao
import
query_search_scope_pids
,
\
query_search_scope
from
unify_api.modules.zhiwei_u.service.scope_operations_service
import
\
scope_detail_service
from
unify_api.utils
import
time_format
from
unify_api.utils.time_format
import
get_time_duration
,
\
get_current_datetime_str
,
convert_str_to_timestamp
...
...
@@ -48,12 +46,10 @@ async def search_scope_service(pids, cid, page_num, page_size, start, end,
获取录波记录
'''
datas
=
await
query_search_scope
(
cid
,
pids
,
page_num
,
page_size
,
start
,
end
,
scope_g
)
datas
,
total
=
await
query_search_scope
(
cid
,
pids
,
page_num
,
page_size
,
start
,
end
,
scope_g
)
total
=
len
(
datas
)
return
datas
,
total
return
datas
,
total
[
"total"
]
async
def
scope_list_download_data
(
pids
,
start
,
end
):
...
...
@@ -63,33 +59,8 @@ async def scope_list_download_data(pids, start, end):
duration
=
get_time_duration
(
start
,
end
,
False
,
False
)
if
duration
>
24
*
60
*
60
:
raise
BusinessException
(
message
=
'只能支持下载一天的录波数据'
)
start_dt
=
convert_to_es_str
(
start
)
end_dt
=
convert_to_es_str
(
end
)
datas
=
await
get_scope_list_by_pid
(
pids
,
start_dt
,
end_dt
)
scope_data
=
[]
scope_fields
=
[
"ua"
,
"ub"
,
"uc"
,
"ia"
,
"ib"
,
"ic"
,
"pttl"
,
"lc"
,
]
for
hit
in
datas
:
_source
=
hit
[
"_source"
]
context
=
json
.
loads
(
_source
[
"context"
])
# 只导出录波的数据
wave_data
=
context
.
get
(
"ia"
)
if
not
wave_data
:
continue
for
i
,
_
in
enumerate
(
wave_data
):
datetime
=
_source
[
"datetime"
][:
19
]
.
replace
(
'T'
,
' '
)
one_scope_data
=
{}
for
field
in
scope_fields
:
if
field
in
context
:
value
=
context
.
get
(
field
)[
i
]
value
=
""
if
pd
.
isna
(
value
)
else
value
else
:
value
=
None
one_scope_data
[
field
]
=
value
item
=
ScopeItemDownload
(
datetime
=
datetime
,
**
one_scope_data
)
scope_data
.
append
(
item
)
return
scope_data
datas
=
await
get_scope_list_by_pid
(
pids
,
start
,
end
)
return
datas
async
def
scope_detail_data
(
doc_id
):
...
...
unify_api/modules/zhiwei_u/dao/data_es_dao.py
View file @
9e06d118
...
...
@@ -139,11 +139,23 @@ async def query_search_scope(cid, pid, page_num, page_size,
pe.create_time DESC
LIMIT {page_num} , {page_size} """
total_sql
=
f
"""
SELECT
count(*) total
FROM
point_1min_event pt
LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid
AND pe.create_time = pt.event_datetime
WHERE
pt.event_mode = 'scope'
{where}
"""
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchall
(
sql
,
)
total
=
await
conn
.
fetchone
(
total_sql
)
return
data
return
data
,
total
async
def
get_scope_pids
(
pids
,
start
,
end
):
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment