Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
fbacfff2
Commit
fbacfff2
authored
Jun 25, 2023
by
lcn
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
bug修复
parent
99c71089
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
80 additions
and
103 deletions
+80
-103
power_cps.py
unify_api/modules/common/procedures/power_cps.py
+4
-3
electric_dao.py
unify_api/modules/electric/dao/electric_dao.py
+2
-0
data_es_dao.py
unify_api/modules/zhiwei_u/dao/data_es_dao.py
+47
-70
data_operation_service.py
unify_api/modules/zhiwei_u/service/data_operation_service.py
+27
-30
No files found.
unify_api/modules/common/procedures/power_cps.py
View file @
fbacfff2
...
...
@@ -9,11 +9,11 @@ async def power_use_count(company_ids):
"size"
:
0
,
"aggs"
:
{
"kwh"
:
{
"sum"
:
{
"field"
:
"kwh"
}}},
}
async
with
EsUtil
()
as
es
:
es_result
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
COMPANY_15MIN_POWER
)
total_power
=
round
(
es_result
.
get
(
"aggregations"
,
{})
.
get
(
"kwh"
,
{})
.
get
(
"value"
)
or
0
,
2
)
return
total_power
...
...
@@ -30,7 +30,8 @@ async def load_cmpy_power(cids):
async
def
inline_power_use_info
(
inline_ids
,
month_str
):
sql
=
"SELECT inlid, sum(kwh) kwh, sum(charge) charge, sum(p) p FROM "
\
"`inline_1day_power` where inlid in
%
s and "
\
f
"DATE_FORMAT(create_time, '
%%
Y-
%%
m')='{month_str}' GROUP BY inlid"
f
"DATE_FORMAT(create_time, '
%%
Y-
%%
m-
%%
d')='{month_str}' GROUP BY "
\
f
"inlid"
async
with
MysqlUtil
()
as
conn
:
datas
=
await
conn
.
fetchall
(
sql
,
args
=
(
inline_ids
,))
inline_power_info_map
=
{
...
...
unify_api/modules/electric/dao/electric_dao.py
View file @
fbacfff2
...
...
@@ -75,6 +75,8 @@ async def get_elec_mtid_sid_by_cid(cid):
async
def
load_add_to_compy_ids
(
cid
):
if
not
isinstance
(
cid
,
list
)
or
not
isinstance
(
cid
,
tuple
):
cid
=
[
cid
]
db
=
SETTING
.
mysql_db
sql
=
f
"SELECT monitor.mtid, monitor.sid FROM {db}.monitor "
\
f
"INNER JOIN {db}.point ON point.mtid=monitor.mtid "
\
...
...
unify_api/modules/zhiwei_u/dao/data_es_dao.py
View file @
fbacfff2
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.settings
import
SETTING
from
unify_api.modules.anshiu.dao.fine_monitor_dao
import
get_mtid_by_pid_dao
,
\
get_sid_by_mtid_dao
,
get_mtids_by_pids_dao
from
unify_api.utils.time_format
import
convert_es_str
from
unify_api.utils.common_utils
import
make_tdengine_data_as_list
from
unify_api.utils.taos_new
import
get_td_engine_data
from
unify_api.utils.time_format
import
convert_es_str
,
CST
from
unify_api.modules.zhiwei_u.config
import
SCOPE_DATABASE
from
pot_libs.mysql_util.mysql_util
import
MysqlUtil
async
def
query_point_1min_index
(
p_database
,
date_start
,
date_end
,
point_id
):
async
def
query_point_1min_index
(
date_start
,
date_end
,
mtid
,
fields
=
None
):
"""point点某一天1440个点数据"""
start_es
=
convert_es_str
(
date_start
)
end_es
=
convert_es_str
(
date_end
)
query_body
=
{
"size"
:
10000
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"term"
:
{
"point_id"
:
point_id
}
},
{
"range"
:
{
"datetime"
:
{
"gte"
:
start_es
,
"lte"
:
end_es
}
}
}
]
}
},
"sort"
:
[{
"datetime"
:
{
"order"
:
"asc"
}}]
}
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search
(
body
=
query_body
,
index
=
p_database
)
return
es_re
async
def
query_location_1min_index
(
l_database
,
date_start
,
date_end
,
location_id
):
if
fields
:
fields
.
insert
(
0
,
'ts'
)
fields_str
=
","
.
join
(
fields
)
else
:
fields_str
=
"*"
db
=
"db_electric"
url
=
f
"{SETTING.stb_url}{db}"
table_name
=
"mt
%
s_ele"
%
mtid
sql
=
f
"select {fields_str} from {table_name} where ts >= '{date_start}'"
\
f
" and ts <= '{date_end}' order by ts asc"
is_succ
,
tdengine_data
=
await
get_td_engine_data
(
url
,
sql
)
if
not
is_succ
:
return
[]
results
=
make_tdengine_data_as_list
(
tdengine_data
)
return
results
async
def
query_location_1min_index
(
date_start
,
date_end
,
mtid
,
fields
=
None
):
"""location点某一天1440*n个点数据"""
start_es
=
convert_es_str
(
date_start
)
end_es
=
convert_es_str
(
date_end
)
query_body
=
{
"size"
:
10000
,
"query"
:
{
"bool"
:
{
"must"
:
[
{
"terms"
:
{
"location_id"
:
location_id
}
},
{
"range"
:
{
"datetime"
:
{
"gte"
:
start_es
,
"lte"
:
end_es
}
}
}
]
}
},
"sort"
:
[{
"datetime"
:
{
"order"
:
"asc"
}}]
}
async
with
EsUtil
()
as
es
:
es_re
=
await
es
.
search
(
body
=
query_body
,
index
=
l_database
)
return
es_re
if
fields
:
fields
.
insert
(
0
,
'ts'
)
fields_str
=
","
.
join
(
fields
)
else
:
fields_str
=
"*"
db
=
"db_adio"
url
=
f
"{SETTING.stb_url}{db}"
table_name
=
"mt
%
s_adi"
%
mtid
sql
=
f
"select {fields_str} from {table_name} where ts >= '{date_start}'"
\
f
" and ts <= '{date_end}' order by ts asc"
is_succ
,
tdengine_data
=
await
get_td_engine_data
(
url
,
sql
)
if
not
is_succ
:
return
[]
results
=
make_tdengine_data_as_list
(
tdengine_data
)
return
results
async
def
get_search_scope
(
cid
,
pid
,
start
,
end
):
...
...
@@ -97,12 +74,12 @@ async def query_search_scope(cid, pid, page_num, page_size,
查询录波列表
"""
if
len
(
pid
)
>
1
:
mtid
=
await
get_mtids_by_pids_dao
(
pid
)
else
:
mtid
=
await
get_mtid_by_pid_dao
(
pid
)
where
=
""
if
cid
:
where
+=
f
" and pe.cid={cid} "
...
...
@@ -120,7 +97,7 @@ async def query_search_scope(cid, pid, page_num, page_size,
where
+=
f
" AND pe.scope_g = {scope_g[0]} "
else
:
where
+=
f
" AND pe.scope_g in {tuple(scope_g)} "
sql
=
f
"""
SELECT
pt.event_id doc_id,
...
...
@@ -138,7 +115,7 @@ async def query_search_scope(cid, pid, page_num, page_size,
ORDER BY
pe.create_time DESC
LIMIT {page_num} , {page_size} """
total_sql
=
f
"""
SELECT
count(*) total
...
...
@@ -151,10 +128,10 @@ async def query_search_scope(cid, pid, page_num, page_size,
{where}
"""
async
with
MysqlUtil
()
as
conn
:
data
=
await
conn
.
fetchall
(
sql
,
)
total
=
await
conn
.
fetchone
(
total_sql
)
return
data
,
total
...
...
unify_api/modules/zhiwei_u/service/data_operation_service.py
View file @
fbacfff2
...
...
@@ -41,7 +41,7 @@ async def sid_to_params_service(sid, pid):
data
=
await
get_mtid_by_sid
(
sid
)
else
:
data
=
await
get_mtids_by_pid
(
pid
)
datas
=
config
.
PHYSICAL
.
copy
()
if
len
(
data
)
==
1
:
location_id
=
await
get_params_by_mtid
(
data
[
0
]
.
get
(
"mtid"
))
...
...
@@ -58,25 +58,21 @@ async def sid_to_params_service(sid, pid):
async
def
get_temp_data
(
pid
,
start
,
end
,
slots
):
mt_data
=
await
get_mtid_by_pid
(
pid
)
location_data
=
await
get_locationid_by_mtid
(
mt_data
[
"mtid"
],
"temperature"
)
all_datas
=
{
"Atemp"
:
[
''
for
_
in
range
(
1440
)],
"Btemp"
:
[
''
for
_
in
range
(
1440
)],
"Ctemp"
:
[
''
for
_
in
range
(
1440
)],
"Ntemp"
:
[
''
for
_
in
range
(
1440
)]
}
location_id
=
[
i
.
get
(
"id"
)
for
i
in
location_data
]
location_temp
=
{
location
[
"id"
]:
location
[
"item"
][
0
]
+
location
[
"type"
][:
4
]
for
location
in
location_data
}
l_database
=
"poweriot_location_1min_aiao_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
datas
=
await
query_location_1min_index
(
l_database
,
start
,
end
,
location_id
)
datas
=
await
query_location_1min_index
(
start
,
end
,
mt_data
[
"mtid"
],
fields
=
[
"temp1"
,
"temp2"
,
"temp3"
,
"temp4"
])
for
data
in
datas
:
key
=
location_temp
[
data
[
"location_id"
]]
index
=
slots
.
index
(
data
[
"datetime"
][
11
:
16
])
all_datas
[
key
][
index
]
=
data
[
"value"
]
index
=
slots
.
index
(
data
[
"ts"
])
all_datas
[
"Atemp"
][
index
]
=
data
[
"temp1"
]
all_datas
[
"Btemp"
][
index
]
=
data
[
"temp2"
]
all_datas
[
"Ctemp"
][
index
]
=
data
[
"temp3"
]
all_datas
[
"Ntemp"
][
index
]
=
data
[
"temp4"
]
return
all_datas
...
...
@@ -101,13 +97,11 @@ async def data_operation_search_service(mtid, point, params, start, end,
threhold
=
await
get_residual_current_threhold
(
resi_data
[
0
][
"id"
])
residual_current_threhold
=
threhold
[
"threshold"
]
if
threhold
else
30
resi_mid_data
=
{
"resi"
:
[
''
for
_
in
range
(
1440
)]}
l_database
=
"poweriot_location_1min_aiao_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
resi_datas
=
await
query_location_1min_index
(
l_database
,
start
,
end
,
[
resi_data
[
0
][
"id"
]])
resi_datas
=
await
query_location_1min_index
(
start
,
end
,
mt_data
[
"mtid"
],
fields
=
[
"residual_current"
])
for
data
in
resi_datas
:
index
=
slots
.
index
(
data
[
"
datetime"
][
11
:
16
])
resi_mid_data
[
"resi"
][
index
]
=
data
[
"
value
"
]
index
=
slots
.
index
(
data
[
"
ts"
])
resi_mid_data
[
"resi"
][
index
]
=
data
[
"
residual_current
"
]
resi
=
RESI
(
threhold
=
residual_current_threhold
,
value_slots
=
resi_mid_data
.
get
(
"resi"
))
ctnum
,
_
=
await
get_wiring_type
(
point
)
...
...
@@ -125,10 +119,8 @@ async def data_operation_search_service(mtid, point, params, start, end,
words
.
append
(
p
)
for
i
in
words
:
all_datas
[
i
]
=
[]
p_database
=
"poweriot_point_1min_index_"
+
start
[:
4
]
+
"_"
+
\
str
(
int
(
start
[
5
:
7
]))
origin_datas
=
await
query_point_1min_index
(
p_database
,
start
,
end
,
point
)
new_data
=
{
d
[
"datetime"
][
11
:
16
]:
d
for
d
in
origin_datas
}
origin_datas
=
await
query_point_1min_index
(
start
,
end
,
mtid
,
words
)
new_data
=
{
d
[
"ts"
]:
d
for
d
in
origin_datas
}
for
slot
in
slots
:
if
slot
in
new_data
.
keys
():
for
i
in
all_datas
.
keys
():
...
...
@@ -232,7 +224,7 @@ async def data_operation_search_service(mtid, point, params, start, end,
dict_data
.
update
(
residual_current
)
dict_data2
=
{
config
.
FILE_KEY
[
k
]:
all_datas
.
get
(
k
)
for
k
in
key
}
dict_data
.
update
(
dict_data2
)
if
"fdie"
in
params
and
sdu_i
and
len
(
sdu_u
)
==
2
:
dict_data
[
config
.
FILE_KEY
[
f
"fdie{sdu_i[-1]}"
]]
=
\
get_nums
(
f
"fdi{sdu_i[-1]}"
,
f
"thdi{sdu_i[-1]}"
,
all_datas
)
...
...
@@ -246,7 +238,7 @@ async def data_operation_search_service(mtid, point, params, start, end,
for
i
in
range
(
3
,
15
,
2
):
dict_data
[
config
.
FILE_KEY
[
f
"fd{i}ia"
]]
=
\
get_nums
(
"fdia"
,
f
"hr{i}ia"
,
all_datas
)
dict_data
[
config
.
FILE_KEY
[
"fdiec"
]]
=
\
get_nums
(
"fdic"
,
"thdic"
,
all_datas
)
for
i
in
range
(
3
,
15
,
2
):
...
...
@@ -363,16 +355,21 @@ def get_download_data(sdu_u, params):
if
"thdu"
in
params
:
key
.
remove
(
"thdu"
)
key
+=
[
"thdua"
,
"hr3ua"
,
"hr5ua"
,
"hr7ua"
,
"hr9ua"
,
"hr11ua"
,
"hr13ua"
,
"thdub"
,
"hr3ub"
,
"hr5ub"
,
"hr7ub"
,
"hr9ub"
,
"hr11ub"
,
"hr13ub"
,
"thdua"
,
"hr3ua"
,
"hr5ua"
,
"hr7ua"
,
"hr9ua"
,
"hr11ua"
,
"hr13ua"
,
"thdub"
,
"hr3ub"
,
"hr5ub"
,
"hr7ub"
,
"hr9ub"
,
"hr11ub"
,
"hr13ub"
,
"thduc"
,
"hr3uc"
,
"hr5uc"
,
"hr7uc"
,
"hr9uc"
,
"hr11uc"
,
"hr13uc"
]
if
"thdi"
in
params
:
key
.
remove
(
"thdi"
)
key
+=
[
"thdia"
,
"hr3ia"
,
"hr5ia"
,
"hr7ia"
,
"hr9ia"
,
"hr11ia"
,
"hr13ia"
,
"thdic"
,
"hr3ic"
,
"hr5ic"
,
"hr7ic"
,
"hr9ic"
,
"hr11ic"
,
"hr13ic"
,
"thdib"
,
"hr3ib"
,
"hr5ib"
,
"hr7ib"
,
"hr9ib"
,
"hr11ib"
,
"hr13ib"
,
"thdia"
,
"hr3ia"
,
"hr5ia"
,
"hr7ia"
,
"hr9ia"
,
"hr11ia"
,
"hr13ia"
,
"thdic"
,
"hr3ic"
,
"hr5ic"
,
"hr7ic"
,
"hr9ic"
,
"hr11ic"
,
"hr13ic"
,
"thdib"
,
"hr3ib"
,
"hr5ib"
,
"hr7ib"
,
"hr9ib"
,
"hr11ib"
,
"hr13ib"
,
]
if
"fdie"
in
params
:
key
.
remove
(
"fdie"
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment