Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
U
unify_api2
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
chaonan
unify_api2
Commits
bf1a6c4f
Commit
bf1a6c4f
authored
Jun 01, 2023
by
ZZH
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
remove es 2023-6-1
parent
faf23a43
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
42 additions
and
720 deletions
+42
-720
electric_service.py
unify_api/modules/electric/service/electric_service.py
+2
-312
electric.py
unify_api/modules/electric/views/electric.py
+40
-408
No files found.
unify_api/modules/electric/service/electric_service.py
View file @
bf1a6c4f
...
...
@@ -421,317 +421,7 @@ async def qual_current_level_service(point_list):
)
async
def
elec_index_service
(
cid
,
point_id
,
date_start
,
date_end
):
# 2. 获取几表法
ctnum
,
_
=
3
,
0
# await get_wiring_type(point_id)
if
ctnum
not
in
[
2
,
3
]:
# 给默认值3表法
ctnum
=
3
# 起始时间分别转化为时间戳
start_tt
=
time_format
.
get_date_timestamp
(
date_start
)
end_tt
=
time_format
.
get_date_timestamp
(
date_end
)
es_start_dt
=
str
(
my_pendulum
.
from_format
(
date_start
,
"YYYY-MM-DD HH:mm:ss"
))
es_end_dt
=
str
(
my_pendulum
.
from_format
(
date_end
,
"YYYY-MM-DD HH:mm:ss"
))
range
=
Range
(
field
=
"quarter_time"
,
start
=
es_start_dt
,
end
=
es_end_dt
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
# TODO频率偏差和电压偏差后期直接通过硬件取值,暂时忽略
if
ctnum
==
2
:
common_items
=
[
"lf_mean"
,
"lf_min"
,
"lf_max"
,
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"costtl_mean"
,
"costtl_min"
,
"costtl_max"
,
"uab_mean"
,
"uab_min"
,
"uab_max"
,
"ucb_mean"
,
"ucb_min"
,
"ucb_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
"freq_mean"
,
"freq_min"
,
"freq_max"
,
]
elec_qual_items
=
[
"ubl_mean"
,
"ubl_min"
,
"ubl_max"
,
"ibl_mean"
,
"ibl_min"
,
"ibl_max"
,
"thduab_mean"
,
"thduab_min"
,
"thduab_max"
,
"thducb_mean"
,
"thducb_min"
,
"thducb_max"
,
"thdia_mean"
,
"thdia_min"
,
"thdia_max"
,
"thdic_mean"
,
"thdic_min"
,
"thdic_max"
,
"uab_dev_mean"
,
"uab_dev_min"
,
"uab_dev_max"
,
"freq_dev_mean"
,
"freq_dev_min"
,
"freq_dev_max"
,
]
else
:
common_items
=
[
"lf_mean"
,
"lf_min"
,
"lf_max"
,
"pttl_mean"
,
"pttl_min"
,
"pttl_max"
,
"qttl_mean"
,
"qttl_min"
,
"qttl_max"
,
"costtl_mean"
,
"costtl_min"
,
"costtl_max"
,
"ua_mean"
,
"ua_min"
,
"ua_max"
,
"ub_mean"
,
"ub_min"
,
"ub_max"
,
"uc_mean"
,
"uc_min"
,
"uc_max"
,
"ia_mean"
,
"ia_min"
,
"ia_max"
,
"ib_mean"
,
"ib_min"
,
"ib_max"
,
"ic_mean"
,
"ic_min"
,
"ic_max"
,
"freq_mean"
,
"freq_min"
,
"freq_max"
,
]
elec_qual_items
=
[
"ubl_mean"
,
"ubl_min"
,
"ubl_max"
,
"ibl_mean"
,
"ibl_min"
,
"ibl_max"
,
"thdua_mean"
,
"thdua_min"
,
"thdua_max"
,
"thdub_mean"
,
"thdub_min"
,
"thdub_max"
,
"thduc_mean"
,
"thduc_min"
,
"thduc_max"
,
"thdia_mean"
,
"thdia_min"
,
"thdia_max"
,
"thdib_mean"
,
"thdib_min"
,
"thdib_max"
,
"thdic_mean"
,
"thdic_min"
,
"thdic_max"
,
"ua_dev_mean"
,
"ua_dev_min"
,
"ua_dev_max"
,
"freq_dev_mean"
,
"freq_dev_min"
,
"freq_dev_max"
,
]
query_body
=
EsQuery
.
aggr_index
(
page_request
,
stats_items
=
common_items
+
elec_qual_items
)
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_15MIN_INDEX
)
aggregations
=
es_results
.
get
(
"aggregations"
,
{})
# 常规参数统计
common_indexes
=
[]
_common_items
=
{
i
.
rsplit
(
"_"
,
1
)[
0
]
for
i
in
common_items
}
for
item
in
_common_items
:
# 最大值
max_info
=
aggregations
.
get
(
f
"{item}_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max_value
=
source
.
get
(
f
"{item}_max"
)
max_value
=
max_value
if
max_value
is
not
None
else
""
max_dt
=
source
.
get
(
f
"{item}_max_time"
)
if
max_dt
is
None
:
log
.
error
(
f
"错误{item}_max_time: item={item} ctnum={ctnum} point_id={point_id}"
)
max_value_time
=
(
str
(
datetime
.
strptime
(
max_dt
,
"
%
Y-
%
m-
%
dT
%
H:
%
M:
%
S+08:00"
))
if
max_dt
else
""
)
else
:
max_value
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
f
"{item}_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min_value
=
source
.
get
(
f
"{item}_min"
)
min_value
=
min_value
if
min_value
is
not
None
else
""
min_dt
=
source
.
get
(
f
"{item}_min_time"
)
min_value_time
=
(
str
(
datetime
.
strptime
(
min_dt
,
"
%
Y-
%
m-
%
dT
%
H:
%
M:
%
S+08:00"
))
if
min_dt
else
""
)
else
:
min_value
=
""
min_value_time
=
""
avg
=
aggregations
.
get
(
f
"{item}_mean_avg"
,
{})
.
get
(
"value"
)
avg
=
round
(
avg
,
2
)
if
avg
is
not
None
else
""
elec_index
=
ElecIndex
(
stats_index
=
item
,
max
=
max_value
,
max_time
=
max_value_time
,
min
=
min_value
,
min_time
=
min_value_time
,
avg
=
avg
,
)
common_indexes
.
append
(
elec_index
)
# 电能质量统计
elec_qual_indexes
=
[]
_elec_qual_items
=
{
i
.
rsplit
(
"_"
,
1
)[
0
]
for
i
in
elec_qual_items
}
for
item
in
_elec_qual_items
:
# 最大值
max_info
=
aggregations
.
get
(
f
"{item}_max_max"
,
{})
hits
=
max_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
max_value
=
source
.
get
(
f
"{item}_max"
,
0
)
max_value
=
max_value
if
max_value
is
not
None
else
""
max_dt
=
source
.
get
(
f
"{item}_max_time"
,
0
)
max_value_time
=
str
(
datetime
.
strptime
(
max_dt
,
"
%
Y-
%
m-
%
dT
%
H:
%
M:
%
S+08:00"
))
if
max_dt
else
""
else
:
max_value
=
""
max_value_time
=
""
# 最小值
min_info
=
aggregations
.
get
(
f
"{item}_min_min"
,
{})
hits
=
min_info
.
get
(
"hits"
,
{})
.
get
(
"hits"
)
if
hits
:
source
=
hits
[
0
]
.
get
(
"_source"
,
{})
min_value
=
source
.
get
(
f
"{item}_min"
,
0
)
min_value
=
min_value
if
min_value
is
not
None
else
""
min_dt
=
source
.
get
(
f
"{item}_min_time"
)
min_value_time
=
str
(
datetime
.
strptime
(
min_dt
,
"
%
Y-
%
m-
%
dT
%
H:
%
M:
%
S+08:00"
))
if
min_dt
else
""
else
:
min_value
=
""
min_value_time
=
""
avg
=
aggregations
.
get
(
f
"{item}_mean_avg"
,
{})
.
get
(
"value"
)
avg
=
avg
if
avg
is
not
None
else
""
elec_index
=
ElecIndex
(
stats_index
=
item
,
max
=
max_value
,
max_time
=
max_value_time
,
min
=
min_value
,
min_time
=
min_value_time
,
avg
=
avg
,
)
elec_qual_indexes
.
append
(
elec_index
)
if
cid
:
# 小程序需要这漏电流和温度
residual_current_map
=
await
location_stats_statics
(
cid
,
point_id
,
start_tt
,
end_tt
,
_type
=
"residual_current"
)
if
residual_current_map
:
residual_current_map
=
residual_current_map
[
"漏电流"
]
common_indexes
.
append
(
ElecIndex
(
stats_index
=
"residual_current"
,
name
=
"漏电流(mA)"
,
max
=
residual_current_map
[
"value_max"
][
"value"
]
if
"value_max"
in
residual_current_map
else
None
,
max_time
=
residual_current_map
[
"value_max"
][
"time"
]
if
"value_max"
in
residual_current_map
else
None
,
min
=
residual_current_map
[
"value_min"
][
"value"
]
if
"value_min"
in
residual_current_map
else
None
,
min_time
=
residual_current_map
[
"value_min"
][
"time"
]
if
"value_min"
in
residual_current_map
else
None
,
avg
=
residual_current_map
[
"value_avg"
]
if
"value_avg"
in
residual_current_map
else
None
,
)
)
temp_map
=
await
location_stats_statics
(
cid
,
point_id
,
start_tt
,
end_tt
,
_type
=
"temperature"
)
if
temp_map
:
for
item_name
,
agg_info_map
in
temp_map
.
items
():
common_indexes
.
append
(
ElecIndex
(
stats_index
=
"temperature"
,
name
=
f
"{item_name}(℃)"
,
max
=
agg_info_map
[
"value_max"
][
"value"
]
if
"value_max"
in
agg_info_map
else
None
,
max_time
=
agg_info_map
[
"value_max"
][
"time"
]
if
"value_max"
in
agg_info_map
else
None
,
min
=
agg_info_map
[
"value_min"
][
"value"
]
if
"value_min"
in
agg_info_map
else
None
,
min_time
=
agg_info_map
[
"value_min"
][
"time"
]
if
"value_min"
in
agg_info_map
else
None
,
avg
=
agg_info_map
[
"value_avg"
]
if
"value_avg"
in
agg_info_map
else
None
,
)
)
return
ElecIndexResponse
(
ctnum
=
ctnum
,
common_indexes
=
common_indexes
,
elec_qual_indexes
=
elec_qual_indexes
)
async
def
elec_index_service_new15
(
cid
,
point_id
,
start
,
end
):
async
def
elec_index_service
(
cid
,
point_id
,
start
,
end
):
ctnum
=
await
get_wiring_type_new15
(
point_id
)
ctnum
=
ctnum
if
ctnum
==
2
else
3
now
=
str
(
datetime
.
now
())
...
...
@@ -897,7 +587,7 @@ async def elec_index_service_new15(cid, point_id, start, end):
)
async
def
elec_current_service
_new15
(
point_id
):
async
def
elec_current_service
(
point_id
):
# 获取mtid
meter_info
=
await
get_meter_by_point
(
point_id
)
if
not
meter_info
:
...
...
unify_api/modules/electric/views/electric.py
View file @
bf1a6c4f
...
...
@@ -5,27 +5,23 @@
import
time
import
json
from
dataclasses
import
fields
import
re
from
pot_libs.settings
import
SETTING
from
datetime
import
datetime
from
unify_api.modules.common.procedures.points
import
get_meter_by_point
from
unify_api.modules.common.service.td_engine_service
import
\
get_td_engine_data
from
pot_libs.aredis_util.aredis_utils
import
RedisUtils
from
pot_libs.es_util.es_query
import
EsQuery
from
pot_libs.es_util.es_utils
import
EsUtil
from
pot_libs.sanic_api
import
summary
,
description
,
examples
from
pot_libs.logger
import
log
from
pot_libs.utils.exc_util
import
ParamException
,
DBException
,
BusinessException
from
pot_libs.utils.pendulum_wrapper
import
my_pendulum
from
pot_libs.utils.exc_util
import
(
ParamException
,
DBException
,
BusinessException
)
from
unify_api.modules.common.components.common_cps
import
CidPointsReq
from
unify_api.modules.common.procedures
import
health_score
from
unify_api.modules.electric.service.electric_service
import
\
elec_current_storeys_service
,
qual_current_storeys_service
,
\
elec_card_level_service
,
qual_current_level_service
,
elec_index_service
,
\
elec_index_service_new15
,
elec_current_service_new15
,
get_sdu_i_and_u
from
unify_api.modules.electric.service.electric_service
import
(
elec_current_storeys_service
,
qual_current_storeys_service
,
elec_card_level_service
,
qual_current_level_service
,
elec_index_service
,
elec_current_service
,
get_sdu_i_and_u
)
from
unify_api.utils
import
time_format
from
unify_api
import
constants
from
unify_api.modules.electric.procedures.electric_util
import
(
...
...
@@ -33,7 +29,7 @@ from unify_api.modules.electric.procedures.electric_util import (
add_random_change
,
)
from
pot_libs.common.components.query
import
PageRequest
,
Range
,
Equal
,
Filter
from
pot_libs.common.components.query
import
PageRequest
from
unify_api.modules.electric.components.electric
import
(
ElecHistoryResponse
,
ElecCurrentResponse
,
...
...
@@ -56,8 +52,9 @@ from unify_api.modules.electric.components.electric import (
CurrentHarmonic
,
EcsReq
,
EscResp
,
QcsResp
,
EclResp
,
QclResp
,
)
from
unify_api.utils.request_util
import
filed_value_from_list
from
unify_api.modules.electric.dao.electric_dao
import
get_qual_history_dao
,
\
get_elec_history_dao
from
unify_api.modules.electric.dao.electric_dao
import
(
get_qual_history_dao
,
get_elec_history_dao
)
from
unify_api.utils.taos_new
import
parse_td_columns
METERDATA_CURRENT_KEY
=
"meterdata_current"
...
...
@@ -88,157 +85,11 @@ async def post_elec_history(req, body: PageRequest) -> ElecHistoryResponse:
log
.
warning
(
"param exception, equals is NULL, no point_id"
)
raise
ParamException
(
message
=
"param exception, equals is NULL, no point_id"
)
# return await elec_history_service(date_start, date_end, point_id,
# intervel, slots)
return
await
elec_history_service_new15
(
date_start
,
date_end
,
point_id
,
intervel
,
slots
)
async
def
elec_history_service
(
date_start
,
date_end
,
point_id
,
intervel
,
slots
):
# 2.获取曲线数据
# 2.1 起始时间分别转化为时间戳
es_start_dt
=
my_pendulum
.
from_format
(
date_start
,
"YYYY-MM-DD HH:mm:ss"
)
es_end_dt
=
my_pendulum
.
from_format
(
date_end
,
"YYYY-MM-DD HH:mm:ss"
)
range
=
Range
(
field
=
"quarter_time"
,
start
=
es_start_dt
,
end
=
es_end_dt
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
ctnum
,
_
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum , 监测点已经拆除"
)
# 这个实际上装置点已经拆除,无法获取到接表法,但是页面又可以选择, 只能给个默认值,但是不是根本解决办法,万一人家历史数据是二表法呢
# 那么就给默认值吧,没办法了
ctnum
=
3
query_body
=
EsQuery
.
aggr_history_new
(
page_request
,
interval
=
intervel
,
stats_items
=
[
"lf_mean"
,
"pttl_mean"
,
"qttl_mean"
,
"costtl_mean"
,
"uab_mean"
,
"ucb_mean"
,
"ua_mean"
,
"ub_mean"
,
"uc_mean"
,
"ia_mean"
,
"ib_mean"
,
"ic_mean"
,
"freq_mean"
,
],
histogram_field
=
"quarter_time"
,
)
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_15MIN_INDEX
)
if
ctnum
==
2
:
stats_items
=
[
"lf_mean"
,
"pttl_mean"
,
"qttl_mean"
,
"costtl_mean"
,
"uab_mean"
,
"ucb_mean"
,
"ia_mean"
,
"ic_mean"
,
"freq_mean"
,
]
else
:
stats_items
=
[
"lf_mean"
,
"pttl_mean"
,
"qttl_mean"
,
"costtl_mean"
,
"ua_mean"
,
"ub_mean"
,
"uc_mean"
,
"ia_mean"
,
"ib_mean"
,
"ic_mean"
,
"freq_mean"
,
]
return
await
elec_history_service
(
date_start
,
date_end
,
point_id
,
intervel
,
slots
)
aggs_res
=
es_results
.
get
(
"aggregations"
,
{})
data_buckets
=
aggs_res
.
get
(
"aggs_name"
,
{})
.
get
(
"buckets"
,
[])
time_bucket_map
=
{
i
[
"key_as_string"
]:
i
for
i
in
data_buckets
}
print
(
f
"post_elec_history slots={slots} _data = {list(time_bucket_map.keys())}, query_body={query_body}"
)
elec_data
=
{
stats_item
:
[]
for
stats_item
in
stats_items
}
for
slot
in
slots
:
if
slot
in
time_bucket_map
:
for
stats_item
in
stats_items
:
value
=
time_bucket_map
[
slot
]
.
get
(
stats_item
,
{})
.
get
(
"avg"
)
value
=
value
if
value
is
not
None
else
""
elec_data
[
stats_item
]
.
append
(
value
)
else
:
for
stats_item
in
stats_items
:
elec_data
[
stats_item
]
.
append
(
""
)
# 识电U只有一项有数据,返回具体的项
sdu_i
=
None
sdu_u
=
None
if
ctnum
==
2
:
u
=
U
(
uab
=
elec_data
[
"uab_mean"
],
ucb
=
elec_data
[
"ucb_mean"
])
i
=
I
(
ia
=
elec_data
[
"ia_mean"
],
ic
=
elec_data
[
"ic_mean"
])
if
any
(
elec_data
[
"uab_mean"
])
and
not
any
(
elec_data
[
"ucb_mean"
]):
sdu_i
=
"ia"
sdu_u
=
"uab"
if
any
(
elec_data
[
"ucb_mean"
])
and
not
any
(
elec_data
[
"uab_mean"
]):
sdu_i
=
"ic"
sdu_u
=
"ucb"
else
:
u
=
U
(
ua
=
elec_data
[
"ua_mean"
],
ub
=
elec_data
[
"ub_mean"
],
uc
=
elec_data
[
"uc_mean"
])
i
=
I
(
ia
=
elec_data
[
"ia_mean"
],
ib
=
elec_data
[
"ib_mean"
],
ic
=
elec_data
[
"ic_mean"
])
if
(
any
(
elec_data
[
"ua_mean"
])
and
not
any
(
elec_data
[
"ub_mean"
])
and
not
any
(
elec_data
[
"uc_mean"
])
):
sdu_i
=
"ia"
sdu_u
=
"ua"
if
(
any
(
elec_data
[
"ub_mean"
])
and
not
any
(
elec_data
[
"ua_mean"
])
and
not
any
(
elec_data
[
"uc_mean"
])
):
sdu_i
=
"ib"
sdu_u
=
"ub"
if
(
any
(
elec_data
[
"uc_mean"
])
and
not
any
(
elec_data
[
"ua_mean"
])
and
not
any
(
elec_data
[
"ub_mean"
])
):
sdu_i
=
"ic"
sdu_u
=
"uc"
return
ElecHistoryResponse
(
ctnum
=
ctnum
,
lf
=
elec_data
[
"lf_mean"
],
power
=
Power
(
p
=
elec_data
[
"pttl_mean"
],
q
=
elec_data
[
"qttl_mean"
]),
costtl
=
elec_data
[
"costtl_mean"
],
u
=
u
,
i
=
i
,
freq
=
elec_data
[
"freq_mean"
],
time_slots
=
slots
,
sdu_i
=
sdu_i
,
sdu_u
=
sdu_u
,
)
async
def
elec_history_service_new15
(
start
,
end
,
pid
,
intervel
,
slots
):
async
def
elec_history_service
(
start
,
end
,
pid
,
intervel
,
slots
):
ctnum
=
await
get_wiring_type_new15
(
pid
)
if
ctnum
==
2
:
stats_items
=
[
...
...
@@ -253,11 +104,11 @@ async def elec_history_service_new15(start, end, pid, intervel, slots):
]
if
intervel
==
900
:
table_name
=
"point_15min_electric"
date_f
orma
t
=
"
%%
H:
%%
i"
date_f
m
t
=
"
%%
H:
%%
i"
else
:
table_name
=
"point_1day_electric"
date_f
orma
t
=
"
%%
m-
%%
d"
datas
=
await
get_elec_history_dao
(
table_name
,
pid
,
start
,
end
,
date_f
orma
t
)
date_f
m
t
=
"
%%
m-
%%
d"
datas
=
await
get_elec_history_dao
(
table_name
,
pid
,
start
,
end
,
date_f
m
t
)
datas
=
{
data
[
"date_time"
]:
data
for
data
in
datas
}
elec_data
=
{
stats_item
:
[]
for
stats_item
in
stats_items
}
for
slot
in
slots
:
...
...
@@ -325,9 +176,9 @@ async def post_elec_current(req, body: PageRequest) -> ElecCurrentResponse:
raise
ParamException
(
message
=
"param exception, equals is Ncount_infoULL, no point_id"
)
try
:
time_str
,
res
=
await
elec_current_service
_new15
(
point_id
)
time_str
,
res
=
await
elec_current_service
(
point_id
)
except
Exception
as
e
:
log
.
error
(
f
"post_elec_current elec_current_service
_new15
error:"
log
.
error
(
f
"post_elec_current elec_current_service error:"
f
"{str(e)}"
)
raise
BusinessException
(
message
=
f
"{str(e)}"
)
return
ElecCurrentResponse
(
...
...
@@ -344,14 +195,13 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
point_id
=
filed_value_from_list
(
body
.
filter
.
equals
,
"point_id"
)
if
point_id
<=
0
or
not
point_id
:
log
.
warning
(
"param exception, equals is NULL, no point_id"
)
raise
ParamException
(
message
=
"param exception, equals is Ncount_infoULL, no point_id"
)
msg
=
"param exception, equals is NULL, no point_id"
log
.
warning
(
msg
)
raise
ParamException
(
message
=
msg
)
ctnum
,
mid
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum 装置已经拆除"
)
log
.
error
(
f
"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum"
)
if
not
mid
:
log
.
error
(
f
"post_elec_current pid={point_id} 没有mid 装置已经被拆除"
)
...
...
@@ -464,9 +314,9 @@ async def post_elec_index(req, body: PageRequest) -> ElecIndexResponse:
point_id
=
filed_value_from_list
(
body
.
filter
.
equals
,
"point_id"
)
if
not
point_id
or
point_id
<=
0
:
log
.
warning
(
"param exception, equals is NULL, no point_id"
)
raise
ParamException
(
message
=
"param exception, equals is NULL, no point_id"
)
msg
=
"param exception, equals is NULL, no point_id"
log
.
warning
(
msg
)
raise
ParamException
(
message
=
msg
)
# 3. 获取常规参数统计和电能质量统计
# 获取时间
try
:
...
...
@@ -475,8 +325,7 @@ async def post_elec_index(req, body: PageRequest) -> ElecIndexResponse:
except
:
log
.
error
(
"param error, ranges is NULL"
)
raise
ParamException
(
message
=
"param error, ranges is NULL"
)
# return await elec_index_service(cid, point_id, date_start, date_end)
return
await
elec_index_service_new15
(
cid
,
point_id
,
date_start
,
date_end
)
return
await
elec_index_service
(
cid
,
point_id
,
date_start
,
date_end
)
@
summary
(
"电能质量-历史曲线"
)
...
...
@@ -500,240 +349,23 @@ async def post_qual_history(req, body: PageRequest) -> QualHistoryResponse:
point_id
=
filed_value_from_list
(
body
.
filter
.
equals
,
"point_id"
)
if
not
point_id
or
point_id
<=
0
:
log
.
warning
(
"param exception, equals is NULL, no point_id"
)
raise
ParamException
(
message
=
"param exception, equals is NULL, no point_id"
)
# return await qual_history_service(date_start, date_end, intervel, slots,
# point_id)
return
await
qual_history_service_new15
(
date_start
,
date_end
,
intervel
,
slots
,
point_id
)
async
def
qual_history_service
(
date_start
,
date_end
,
intervel
,
slots
,
point_id
):
# 2.获取曲线数据
# 2.1 起始时间分别转化为时间戳
es_start_dt
=
str
(
my_pendulum
.
from_format
(
date_start
,
"YYYY-MM-DD HH:mm:ss"
))
es_end_dt
=
str
(
my_pendulum
.
from_format
(
date_end
,
"YYYY-MM-DD HH:mm:ss"
))
range
=
Range
(
field
=
"quarter_time"
,
start
=
es_start_dt
,
end
=
es_end_dt
)
equal
=
Equal
(
field
=
"pid"
,
value
=
point_id
)
filter
=
Filter
(
equals
=
[
equal
],
ranges
=
[
range
],
in_groups
=
[],
keywords
=
[])
page_request
=
PageRequest
(
page_size
=
1
,
page_num
=
1
,
sort
=
None
,
filter
=
filter
)
ctnum
,
_
=
await
get_wiring_type
(
point_id
)
if
ctnum
not
in
[
2
,
3
]:
log
.
error
(
f
"qual_history point_id={point_id} ctnum={ctnum} 无法获取到ctnum 装置已经被拆"
)
ctnum
=
3
if
ctnum
==
2
:
stats_items
=
[
"fdia_mean"
,
# 用于计算电流谐波有效值
"fdic_mean"
,
"uab_dev_mean"
,
"ucb_dev_mean"
,
"freq_dev_mean"
,
"ubl_mean"
,
"ibl_mean"
,
"thduab_mean"
,
"thducb_mean"
,
"thdia_mean"
,
"thdic_mean"
,
"hr3ia_mean"
,
"hr5ia_mean"
,
"hr7ia_mean"
,
"hr9ia_mean"
,
"hr11ia_mean"
,
"hr13ia_mean"
,
"hr3ic_mean"
,
"hr5ic_mean"
,
"hr7ic_mean"
,
"hr9ic_mean"
,
"hr11ic_mean"
,
"hr13ic_mean"
,
"hr3uab_mean"
,
"hr5uab_mean"
,
"hr7uab_mean"
,
"hr9uab_mean"
,
"hr11uab_mean"
,
"hr13uab_mean"
,
"hr3ucb_mean"
,
"hr5ucb_mean"
,
"hr7ucb_mean"
,
"hr9ucb_mean"
,
"hr11ucb_mean"
,
"hr13ucb_mean"
,
]
else
:
stats_items
=
[
"fdia_mean"
,
# 用于计算电流谐波有效值
"fdib_mean"
,
"fdic_mean"
,
"ua_dev_mean"
,
"ub_dev_mean"
,
"uc_dev_mean"
,
"freq_dev_mean"
,
"ubl_mean"
,
"ibl_mean"
,
"thdua_mean"
,
"thdub_mean"
,
"thduc_mean"
,
"thdia_mean"
,
"thdib_mean"
,
"thdic_mean"
,
"hr3ia_mean"
,
"hr5ia_mean"
,
"hr7ia_mean"
,
"hr9ia_mean"
,
"hr11ia_mean"
,
"hr13ia_mean"
,
"hr3ib_mean"
,
"hr5ib_mean"
,
"hr7ib_mean"
,
"hr9ib_mean"
,
"hr11ib_mean"
,
"hr13ib_mean"
,
"hr3ic_mean"
,
"hr5ic_mean"
,
"hr7ic_mean"
,
"hr9ic_mean"
,
"hr11ic_mean"
,
"hr13ic_mean"
,
"hr3ua_mean"
,
"hr5ua_mean"
,
"hr7ua_mean"
,
"hr9ua_mean"
,
"hr11ua_mean"
,
"hr13ua_mean"
,
"hr3ub_mean"
,
"hr5ub_mean"
,
"hr7ub_mean"
,
"hr9ub_mean"
,
"hr11ub_mean"
,
"hr13ub_mean"
,
"hr3uc_mean"
,
"hr5uc_mean"
,
"hr7uc_mean"
,
"hr9uc_mean"
,
"hr11uc_mean"
,
"hr13uc_mean"
,
]
query_body
=
EsQuery
.
aggr_history_new
(
page_request
,
interval
=
intervel
,
stats_items
=
stats_items
,
histogram_field
=
"quarter_time"
)
async
with
EsUtil
()
as
es
:
es_results
=
await
es
.
search_origin
(
body
=
query_body
,
index
=
constants
.
POINT_15MIN_INDEX
)
msg
=
"param exception, equals is NULL, no point_id"
log
.
warning
(
msg
)
raise
ParamException
(
message
=
msg
)
aggs_res
=
es_results
.
get
(
"aggregations"
,
{})
data_buckets
=
aggs_res
.
get
(
"aggs_name"
,
{})
.
get
(
"buckets"
,
[])
time_bucket_map
=
{
i
[
"key_as_string"
]:
i
for
i
in
data_buckets
}
log
.
info
(
f
"post_qual_history query_body={query_body}, slots={slots} time_bucket_map.keys={list(time_bucket_map.keys())}"
)
elec_data
=
{
stats_item
:
[]
for
stats_item
in
stats_items
}
for
slot
in
slots
:
if
slot
in
time_bucket_map
:
for
stats_item
in
stats_items
:
value
=
time_bucket_map
[
slot
]
.
get
(
stats_item
,
{})
.
get
(
"avg"
,
""
)
# TODO:电压偏差和频率偏差暂时采用手段计算方式,后期直接读ES数据
if
value
and
stats_item
==
"freq_dev_mean"
:
# 如果频率偏差保留两位小数之后为0了,那么直接返回0,防止出现-0.00 的情况
if
abs
(
value
)
<
0.005
:
value
=
0
elec_data
[
stats_item
]
.
append
(
value
)
else
:
for
stats_item
in
stats_items
:
elec_data
[
stats_item
]
.
append
(
""
)
voltage_dev
=
VoltageDev
(
**
{
k
.
rsplit
(
"_"
,
1
)[
0
]:
v
for
k
,
v
in
elec_data
.
items
()
if
k
.
rsplit
(
"_"
,
1
)[
0
]
in
[
field
.
name
for
field
in
fields
(
VoltageDev
)]
}
)
three_phase_unbalance
=
ThreePhaseImbalance
(
voltage
=
elec_data
[
"ubl_mean"
],
current
=
elec_data
[
"ibl_mean"
]
)
voltage_harmonic
=
VoltageHarmonic
(
**
{
k
.
rsplit
(
"_"
,
1
)[
0
]:
v
for
k
,
v
in
elec_data
.
items
()
if
k
.
rsplit
(
"_"
,
1
)[
0
]
in
[
field
.
name
for
field
in
fields
(
VoltageHarmonic
)]
}
)
# 新增电流谐波有效值 = 电流*畸变率 前端处理
current_harmonic
=
CurrentHarmonic
(
**
{
k
.
rsplit
(
"_"
,
1
)[
0
]:
v
for
k
,
v
in
elec_data
.
items
()
if
k
.
rsplit
(
"_"
,
1
)[
0
]
in
[
field
.
name
for
field
in
fields
(
CurrentHarmonic
)]
}
)
# 识电U只有一项有数据,返回具体的项
sdu_i
=
None
sdu_u
=
None
if
ctnum
==
2
:
if
any
(
elec_data
[
"uab_dev_mean"
])
and
not
any
(
elec_data
[
"ucb_dev_mean"
]):
sdu_i
=
"ia"
sdu_u
=
"uab"
if
any
(
elec_data
[
"ucb_dev_mean"
])
and
not
any
(
elec_data
[
"uab_dev_mean"
]):
sdu_i
=
"ic"
sdu_u
=
"ucb"
else
:
if
(
any
(
elec_data
[
"ua_dev_mean"
])
and
not
any
(
elec_data
[
"ub_dev_mean"
])
and
not
any
(
elec_data
[
"uc_dev_mean"
])
):
sdu_i
=
"ia"
sdu_u
=
"ua"
if
(
any
(
elec_data
[
"ub_dev_mean"
])
and
not
any
(
elec_data
[
"ua_dev_mean"
])
and
not
any
(
elec_data
[
"uc_dev_mean"
])
):
sdu_i
=
"ib"
sdu_u
=
"ub"
if
(
any
(
elec_data
[
"uc_dev_mean"
])
and
not
any
(
elec_data
[
"ua_dev_mean"
])
and
not
any
(
elec_data
[
"ub_dev_mean"
])
):
sdu_i
=
"ic"
sdu_u
=
"uc"
return
QualHistoryResponse
(
ctnum
=
ctnum
,
voltage_dev
=
voltage_dev
,
freq_dev
=
elec_data
[
"freq_dev_mean"
],
three_phase_unbalance
=
three_phase_unbalance
,
voltage_harmonic
=
voltage_harmonic
,
current_harmonic
=
current_harmonic
,
time_slots
=
slots
,
sdu_i
=
sdu_i
,
sdu_u
=
sdu_u
,
)
return
await
qual_history_service
(
date_start
,
date_end
,
intervel
,
slots
,
point_id
)
async
def
qual_history_service
_new15
(
start
,
end
,
intervel
,
slots
,
pid
):
async
def
qual_history_service
(
start
,
end
,
intervel
,
slots
,
pid
):
ctnum
=
await
get_wiring_type_new15
(
pid
)
if
intervel
==
900
:
table_name
=
"point_15min_electric"
date_f
orma
t
=
"
%%
H:
%%
i"
date_f
m
t
=
"
%%
H:
%%
i"
else
:
table_name
=
"point_1day_electric"
date_f
orma
t
=
"
%%
m-
%%
d"
datas
=
await
get_qual_history_dao
(
table_name
,
pid
,
start
,
end
,
date_f
orma
t
)
date_f
m
t
=
"
%%
m-
%%
d"
datas
=
await
get_qual_history_dao
(
table_name
,
pid
,
start
,
end
,
date_f
m
t
)
datas
=
{
data
[
"date_time"
]:
data
for
data
in
datas
}
if
ctnum
==
2
:
# 用于计算电流谐波有效值
...
...
@@ -882,14 +514,14 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
url
=
f
"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql
=
f
"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ
,
results
=
await
get_td_engine_data
(
url
,
sql
)
log
.
info
(
f
"is_succ:{is_succ}"
)
if
is_succ
:
head
=
parse_td_columns
(
results
)
if
not
results
[
"data"
]:
results
[
"data"
]
=
[
''
for
i
in
range
(
len
(
head
))]
res
=
dict
(
zip
(
head
,
results
[
"data"
][
0
]))
ctnum
=
res
.
get
(
"ctnum"
)
or
3
harmonic
=
json
.
loads
(
res
.
get
(
"harmonic"
))
if
res
.
get
(
"harmonic"
)
else
{}
harmonic
=
json
.
loads
(
res
.
get
(
"harmonic"
))
if
res
.
get
(
"harmonic"
)
else
{}
voltage_harmonic_dict
,
current_harmonic_dict
=
{},
{}
for
k
in
[
field
.
name
for
field
in
fields
(
VoltageHarmonicRate
)]:
if
k
in
harmonic
.
keys
():
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment