Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
E
ems_collector
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ZZH
ems_collector
Commits
6e6e8a21
Commit
6e6e8a21
authored
Apr 28, 2026
by
ZZH
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
opt ems data sync 2026-4-28 16:49
parent
6965a6b3
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
254 additions
and
253 deletions
+254
-253
ems.py
src/ems_water_grp/ems.py
+83
-153
helper.py
src/ems_water_grp/helper.py
+171
-100
No files found.
src/ems_water_grp/ems.py
View file @
6e6e8a21
...
...
@@ -4,7 +4,6 @@ DATE:2026/4/9 15:43
"""
import
asyncio
import
aiohttp
import
json
import
time
import
math
...
...
@@ -14,30 +13,23 @@ from gmqtt import Client as MQTTClient
from
utils.encrypt
import
gen_sign
from
utils.constants
import
USER_AES
from
utils.utils
import
admin_client_id
,
mqtt_pwd
from
ems_water_grp.helper
import
FIELD_MAP
,
transfer_realtime
from
ems_water_grp.helper
import
(
EMS_DEVS
,
FIELD_MAP
,
ETYPE_TOPIC
,
transfer_realtime
)
from
infra.http.httpx_util
import
HttpxClient
from
infra.config.settings
import
SETTING
from
infra.logger.logger
import
Logger
log_name
=
f
"water_grp"
Logger
.
init_logger_path
(
f
"./ems_water_grp"
,
f
"{log_name}.log"
,
log_name
)
logger
=
Logger
.
getLogger
(
log_name
)
BASE_URL
=
"https://energy.sunwoda.com/energy-data-openapi/open-api"
# EMQX / MQTT 配置
MQTT_BROKER
=
"127.0.0.1"
MQTT_PORT
=
1883
MQTT_USER
=
"admin"
MQTT_PWD
=
"password"
MQTT_CLIENT_ID
=
"sunwoda_collector_01"
TOPIC_PREFIX
=
"eems/td/"
# 调度配置
POLL_INTVL
=
60
# 实时数据拉取间隔 (秒)
TOPOLOGY_SYNC_INTVL
=
86400
# 拓扑结构(场站/设备/测点)同步间隔 (秒),默认1天
class
WaterGrpClient
:
BASE_URL
=
"https://energy.sunwoda.com/energy-data-openapi/open-api"
def
__init__
(
self
):
self
.
user
=
"WATER_GRP"
self
.
app_id
=
USER_AES
[
self
.
user
][
"AppID"
]
...
...
@@ -49,6 +41,9 @@ class WaterGrpClient:
self
.
http
=
HttpxClient
()
async
def
close
(
self
):
await
self
.
http
.
close
()
async
def
_ensure_token
(
self
):
"""确保 Token 有效,过期则自动重新获取"""
cur_time
=
time
.
time
()
...
...
@@ -59,7 +54,7 @@ class WaterGrpClient:
logger
.
info
(
"Token 不存在或即将过期,正在重新获取..."
)
# 1. 获取 Code
code_url
=
f
"{BASE_URL}/getCode"
code_url
=
f
"{
self.
BASE_URL}/getCode"
resp
,
status_code
=
await
self
.
http
.
post
(
code_url
,
json
=
{
"appId"
:
self
.
app_id
})
if
status_code
!=
200
:
...
...
@@ -72,7 +67,7 @@ class WaterGrpClient:
auth_code
=
code_data
[
"data"
][
"code"
]
# 2. 获取 Token
token_url
=
f
"{BASE_URL}/getToken"
token_url
=
f
"{
self.
BASE_URL}/getToken"
header
=
{
"appId"
:
self
.
app_id
,
"sign"
:
self
.
sign
,
"code"
:
auth_code
}
...
...
@@ -94,16 +89,19 @@ class WaterGrpClient:
self
.
token_expire_time
=
cur_time
+
timeout
logger
.
info
(
f
"Token 获取成功,有效期 {timeout} 秒"
)
def
get_auth_headers
(
self
):
async
def
get_auth_headers
(
self
):
"""生成通用请求头"""
if
not
self
.
token
:
await
self
.
_ensure_token
()
auth
=
{
"appId"
:
self
.
app_id
,
"sign"
:
self
.
sign
,
"token"
:
self
.
token
}
return
{
"Authorization"
:
json
.
dumps
(
auth
),
"AppId"
:
self
.
app_id
,
"Content-Type"
:
"application/json"
}
async
def
fetch_sites
(
self
):
""" 获取全部站点列表 """
headers
=
self
.
get_auth_headers
()
url
=
f
"{BASE_URL}/v1/site/queryList"
headers
=
await
self
.
get_auth_headers
()
url
=
f
"{
self.
BASE_URL}/v1/site/queryList"
resp
,
_
=
await
self
.
http
.
post
(
url
,
headers
=
headers
,
json
=
{})
sites
=
json
.
loads
(
resp
)
.
get
(
"datas"
,
[])
logger
.
info
(
f
"Total fetch {len(sites)} site,{sites}"
)
...
...
@@ -114,9 +112,9 @@ class WaterGrpClient:
EQUIP_PAGE_SIZE
=
100
total_equipments
=
[]
page
=
1
url
=
f
"{BASE_URL}/v1/equipment/queryListBySiteId"
url
=
f
"{
self.
BASE_URL}/v1/equipment/queryListBySiteId"
if
not
headers
:
headers
=
self
.
get_auth_headers
()
headers
=
await
self
.
get_auth_headers
()
while
True
:
pyd
=
{
"siteId"
:
site_id
,
"num"
:
page
,
"size"
:
100
}
...
...
@@ -132,7 +130,6 @@ class WaterGrpClient:
f
"累计{len(total_equipments)}/{total}"
)
# 判断是否已拉完
total_pages
=
math
.
ceil
(
total
/
EQUIP_PAGE_SIZE
)
if
page
>=
total_pages
or
not
datas
:
break
...
...
@@ -142,21 +139,18 @@ class WaterGrpClient:
logger
.
info
(
f
"站点 {site_id} 共获取到 {len(total_equipments)} 台设备"
)
return
total_equipments
async
def
fetch_dev_
realts_
fields
(
self
,
site_id
,
equip_id
,
headers
=
None
):
async
def
fetch_dev_fields
(
self
,
site_id
,
equip_id
,
headers
=
None
):
""" 获取指定设备的测点列表 """
if
not
headers
:
headers
=
self
.
get_auth_headers
()
url
=
f
"{BASE_URL}/v1/sqlthte/queryListByNodeId"
headers
=
await
self
.
get_auth_headers
()
url
=
f
"{self.BASE_URL}/v1/sqlthte/queryListByNodeId"
pyd
=
{
"siteId"
:
site_id
,
"equipmentId"
:
equip_id
}
resp
,
_
=
await
self
.
http
.
post
(
url
,
headers
=
headers
,
json
=
pyd
)
resp
=
json
.
loads
(
resp
)
fields
=
[]
for
node
in
resp
.
get
(
"data"
,
[]):
# parameterBriefCode:为空,说明不是实时测点
# if equip_id in [2641,13691]:
if
equip_id
in
[
2625
]:
print
(
"node"
,
node
)
code
=
node
.
get
(
"parameterBriefCode"
)
if
code
:
fields
.
append
(
code
)
...
...
@@ -170,45 +164,33 @@ class WaterGrpClient:
parameterName:测点描述
parameterBriefCode:测点简码
"""
await
self
.
_ensure_token
()
headers
=
self
.
get_auth_headers
()
headers
=
await
self
.
get_auth_headers
()
device_infos
=
[]
sites
=
await
self
.
fetch_sites
()
for
site
in
sites
:
site_id
=
site
[
"id"
]
site_name
=
site
.
get
(
"name"
,
str
(
site_id
))
cid
=
site
[
"id"
]
logger
.
info
(
f
"发现站点: {site['name']} (ID: {site_id})"
)
try
:
equipments
=
await
self
.
fetch_equipments
(
site_id
,
headers
)
except
Exception
as
e
:
logger
.
error
(
f
"sync {site_name} equipments fail, {e}"
)
continue
print
(
site_id
,
site_name
,
len
(
equipments
))
print
(
equipments
)
print
(
site_id
,
site_name
,
len
(
equipments
),
equipments
)
for
equip
in
equipments
:
print
(
equip
)
equip_id
=
equip
[
"id"
]
equip_code
=
equip
[
"code"
]
equip_name
=
equip
.
get
(
"name"
,
str
(
equip_id
))
sid
=
equip
[
"code"
]
equip_type
=
equip
.
get
(
"equipmentType"
)
# if sid in ["SEWIRE00004", "SEWIRE00009"]:
# print("equip", equip)
# 如果设备没有 code 或 type,API 无法查询实时数据,跳过
if
not
equip_code
or
not
equip_type
:
continue
try
:
fields
=
await
self
.
fetch_dev_
realts_
fields
(
fields
=
await
self
.
fetch_dev_fields
(
site_id
,
equip_id
,
headers
)
# if sid == "SEWIRE00004":
# print("fields", fields)
except
Exception
as
e
:
logger
.
error
(
f
"fetch {equip_name} fields fail, {e}"
)
continue
...
...
@@ -226,23 +208,21 @@ class WaterGrpClient:
async
def
fetch_realtime
(
self
,
payload
,
headers
=
None
):
""" 拉取设备的实时数据 """
await
self
.
_ensure_token
()
url
=
f
"{BASE_URL}/v1/sqlthte/realTimeData"
url
=
f
"{self.BASE_URL}/v1/sqlthte/realTimeData"
if
not
headers
:
headers
=
self
.
get_auth_headers
()
headers
=
await
self
.
get_auth_headers
()
try
:
resp
,
status_code
=
await
self
.
http
.
post
(
url
,
headers
=
headers
,
json
=
payload
)
print
(
status_code
,
resp
)
if
status_code
==
200
:
resp
=
json
.
loads
(
resp
)
if
resp
.
get
(
"code"
)
==
10000
:
return
resp
.
get
(
"data"
,
[])
return
payload
,
resp
.
get
(
"data"
,
[])
except
Exception
as
e
:
logger
.
error
(
f
"fetch {payload.get('equipmentCode')} real fail {e}"
)
return
[]
return
payload
,
[]
class
WaterGrpService
:
...
...
@@ -275,73 +255,42 @@ class WaterGrpService:
async
def
stop
(
self
):
logger
.
info
(
"Shutting down Service..."
)
await
HttpxC
lient
.
close
()
await
self
.
api_c
lient
.
close
()
await
self
.
mqtt_client
.
disconnect
()
async
def
start
(
self
):
self
.
mqtt_client
=
MQTTClient
(
admin_client_id
(
"WaterGrpService"
))
self
.
mqtt_client
.
set_auth_credentials
(
self
.
username
,
mqtt_pwd
(
self
.
username
))
self
.
mqtt_client
.
on_connect
=
self
.
on_connect
self
.
mqtt_client
.
on_subscribe
=
self
.
on_subscribe
self
.
mqtt_client
.
on_disconnect
=
self
.
on_disconnect
await
self
.
mqtt_client
.
connect
(
MQTT_BROKER
,
MQTT_PORT
)
await
self
.
main_loop
()
def
pack_device_info
(
self
):
# 封装成对方平台请求格式
return
async
def
sync_once_data
(
self
):
logger
.
info
(
f
"Start poll real data)"
)
headers
=
self
.
api_client
.
get_auth_headers
()
headers
=
await
self
.
api_client
.
get_auth_headers
()
tasks
=
[]
for
pyd
in
EMS_DEVS
:
pyd
[
"briefCodeList"
]
=
list
(
FIELD_MAP
[
pyd
[
"equipmentType"
]]
.
keys
())
tasks
.
append
(
self
.
api_client
.
fetch_realtime
(
pyd
,
headers
))
tasks
=
[
self
.
api_client
.
fetch_realtime
(
pyd
,
headers
)
for
pyd
in
self
.
topology_cache
]
results
=
await
asyncio
.
gather
(
*
tasks
,
return_exceptions
=
True
)
pub_cnt
=
0
for
data_list
in
results
:
if
isinstance
(
data_list
,
list
):
for
data_point
in
data_list
:
keyword
,
val
=
data_point
.
get
(
"keyword"
),
data_point
.
get
(
"value"
)
if
keyword
is
not
None
and
val
is
not
None
:
topic
=
f
"{TOPIC_PREFIX}{keyword}"
payload
=
json
.
dumps
({
"value"
:
val
,
"originValue"
:
data_point
.
get
(
"originValue"
),
"pushTime"
:
data_point
.
get
(
"pushTime"
),
"collectTime"
:
int
(
time
.
time
()
*
1000
)
})
self
.
mqtt_client
.
publish
(
topic
,
payload
,
qos
=
1
)
for
pyd
,
data_lst
in
results
:
try
:
payload
=
transfer_realtime
(
pyd
,
data_lst
)
topic
=
ETYPE_TOPIC
[
pyd
[
"equipmentType"
]]
topic
=
f
"{self.topic_prefix}{topic}/{pyd['equipmentCode']}"
self
.
mqtt_client
.
publish
(
topic
,
json
.
dumps
(
payload
),
qos
=
1
)
pub_cnt
+=
1
logger
.
info
(
f
"Poll finished, pub {pub_cnt} points."
)
except
Exception
as
e
:
logger
.
error
(
f
"Parse and forward data:{data_lst} fail, {e}"
)
continue
logger
.
info
(
f
"Poll finished, forward {pub_cnt} datas"
)
async
def
main_loop
(
self
):
while
True
:
try
:
loop_
start_ts
=
time
.
time
()
start_ts
=
time
.
time
()
# 1. 定期同步拓扑 (解决之前会卡死的 Bug)
if
not
self
.
topology_cache
or
(
loop_start_ts
-
self
.
last_sync_ts
>
TOPOLOGY_SYNC_INTVL
):
try
:
self
.
topology_cache
=
await
self
.
api_client
.
sync_device_infos
()
self
.
last_sync_ts
=
loop_start_ts
except
Exception
as
e
:
logger
.
error
(
f
"Sync topology fail, will retry next loop: {e}"
)
if
self
.
topology_cache
:
await
self
.
sync_once_data
()
# 3. 耗时补偿计算
sleep_ts
=
max
(
0.1
,
POLL_INTVL
-
(
time
.
time
()
-
loop_start_ts
))
sleep_ts
=
max
(
0.1
,
POLL_INTVL
-
(
time
.
time
()
-
start_ts
))
if
sleep_ts
>
0
:
await
asyncio
.
sleep
(
sleep_ts
)
except
Exception
as
e
:
logger
.
error
(
f
"Sync data fail, will retry next loop: {e}"
)
...
...
@@ -349,62 +298,43 @@ class WaterGrpService:
finally
:
await
self
.
stop
()
async
def
start
(
self
):
self
.
mqtt_client
=
MQTTClient
(
admin_client_id
(
"WaterGrpService"
))
self
.
mqtt_client
.
set_auth_credentials
(
self
.
username
,
mqtt_pwd
(
self
.
username
))
async
def
main
():
self
.
mqtt_client
.
on_connect
=
self
.
on_connect
self
.
mqtt_client
.
on_subscribe
=
self
.
on_subscribe
self
.
mqtt_client
.
on_disconnect
=
self
.
on_disconnect
await
self
.
mqtt_client
.
connect
(
SETTING
.
mqtt_host
,
SETTING
.
mqtt_port
)
await
self
.
main_loop
()
async
def
tools
():
api_client
=
WaterGrpClient
()
topology_cache
=
await
api_client
.
sync_device_infos
()
# print(topology_cache)
# pyd = {
# "siteId": 185,
# "equipmentCode": "SESPGI00002",
# "equipmentType": 221,
# "briefCodeList": [
# "TemAir",
# "StWork",
# "Edaily"
# ]
# }
dev_info
=
await
api_client
.
sync_device_infos
()
pyd
=
{
# 并网逆变器
"siteId"
:
185
,
"equipmentCode"
:
"SE
WIRE
00001"
,
"equipmentType"
:
2
5
1
,
"briefCodeList"
:
list
(
FIELD_MAP
[
2
44
]
.
keys
())
"equipmentCode"
:
"SE
SPGI
00001"
,
"equipmentType"
:
2
2
1
,
"briefCodeList"
:
list
(
FIELD_MAP
[
2
21
]
.
keys
())
}
pyd
=
{
"siteId"
:
185
,
"equipmentCode"
:
"SESBMS00004"
,
"equipmentType"
:
244
,
"briefCodeList"
:
list
(
FIELD_MAP
[
244
]
.
keys
())
}
pyd
=
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPCS00001"
,
"equipmentType"
:
243
,
"briefCodeList"
:
list
(
FIELD_MAP
[
243
]
.
keys
())
},
pyd
=
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPCS00001"
,
"equipmentType"
:
243
,
"briefCodeList"
:
[
'CharKwhD'
,
'DiscKwhD'
,
'CharKwhM'
,
'DiscKwhM'
,
'CharKwhY'
,
'DiscKwhY'
,
'EChargeT'
,
'EDischT'
]
}
# pyd = {
# "siteId": 185,
# "equipmentCode": "SEWIRE00004",
# "equipmentType": 251,
# "briefCodeList": ['ECharD', 'CharKwhD', 'EDischarD', 'DiscKwhD',
# 'EChargeT','EDischT' ]
# }
rlt
=
await
api_client
.
fetch_realtime
(
pyd
)
print
(
rlt
)
for
r
in
rlt
:
print
(
r
)
pyd
,
rlt
=
await
api_client
.
fetch_realtime
(
pyd
)
rlt
=
transfer_realtime
(
pyd
,
rlt
)
print
(
rlt
)
async
def
main
():
try
:
srv
=
WaterGrpService
()
await
srv
.
start
()
except
Exception
as
e
:
logger
.
error
(
f
"Sync data fail, will retry next loop: {e}"
)
if
__name__
==
"__main__"
:
logger
.
info
(
"水务集团采集服务 (基于 httpx) 正在启动..."
)
try
:
...
...
src/ems_water_grp/helper.py
View file @
6e6e8a21
...
...
@@ -4,113 +4,184 @@ DATE:2026/4/13 14:31
"""
EMS_DEVS
=
[
# 荷坳水厂水光储一体化站
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPGI00001"
,
"equipmentType"
:
221
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPGI00002"
,
"equipmentType"
:
221
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SESBMS00003"
,
"equipmentType"
:
244
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SESBMS00004"
,
"equipmentType"
:
244
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPCS00001"
,
"equipmentType"
:
243
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SESPCS00002"
,
"equipmentType"
:
243
},
{
"siteId"
:
185
,
"equipmentCode"
:
"SEWIRE00001"
,
"equipmentType"
:
251
},
]
ETYPE_TOPIC
=
{
221
:
"pv_inverter_ele"
,
244
:
"bms_stack_ele"
,
243
:
"acdc_ov_ele"
,
251
:
"pcc_ele"
}
EMS_WATER_MAP
=
{
185
:
911
,
# 荷坳-水光储-水厂
}
FIELD_MAP
=
{
221
:
{
# 并网逆变器
"Udc1"
:
"mppt1_u"
,
"Idc1"
:
"mppt1_i"
,
"Udc2"
:
"mppt2_u"
,
"Idc2"
:
"mppt2_i"
,
"Udc3"
:
"mppt3_u"
,
"Idc3"
:
"mppt3_i"
,
"Udc4"
:
"mppt4_u"
,
"Idc4"
:
"mppt4_i"
,
"Udc5"
:
"mppt5_u"
,
"Idc5"
:
"mppt5_i"
,
"Udc6"
:
"mppt6_u"
,
"Idc6"
:
"mppt6_i"
,
"Udc7"
:
"mppt7_u"
,
"Idc7"
:
"mppt7_i"
,
"Udc8"
:
"mppt8_u"
,
"Idc8"
:
"mppt8_i"
,
"Pdc"
:
"pttl_dc"
,
# 总直流功率 -> 输入功率
# ================= 交流侧 (AC) 电压、电流 =================
"Uab"
:
"uab"
,
# A-B线电压
"Ubc"
:
"ubc"
,
# B-C线电压
"Uca"
:
"uca"
,
# C-A线电压
"Ua"
:
"ua"
,
# A相电压
"Ub"
:
"ub"
,
# B相电压
"Uc"
:
"uc"
,
# C相电压
"Ia"
:
"ia"
,
# A相电流
"Ib"
:
"ib"
,
# B相电流
"Ic"
:
"ic"
,
# C相电流
# ================= 功率与频率 =================
"P"
:
"pttl"
,
# 总有功功率
"Q"
:
"qttl"
,
# 总无功功率
"Fac"
:
"costtl"
,
# 功率因数
"Freq"
:
"freq"
,
# 电网频率
# ================= 状态与效率 =================
"TemAir"
:
"pv_temp"
,
# 机内空气温度 -> 内部温度
"InvEffi"
:
"inv_eff"
,
# 逆变器转换效率 -> 转换效率
# ================= 发电量统计 =================
"Edaily"
:
"power_day"
,
# 日发电量 -> 当日发电量
"EMonth"
:
"power_month"
,
# 月发电量 -> 当月发电量
"EYear"
:
"power_year"
,
# 年发电量 -> 当年发电量
"Etotal"
:
"power_cumulate"
,
# 总发电量 -> 累计发电量
244
:
{
# 电池堆
'TotalVoltage'
:
'u'
,
'TotalCurrent'
:
'i'
,
'OperP'
:
'pttl'
,
},
'SOC'
:
'soc'
,
'SOH'
:
'soh'
,
'SOCMax'
:
'cell_soc_max'
,
'SOCMin'
:
'cell_soc_min'
,
244
:
{
# 电池堆
"TotalVoltage"
:
"u"
,
"TotalCurrent"
:
"i"
,
"OperP"
:
"pttl"
,
"SOC"
:
"soc"
,
"SOH"
:
"soh"
,
"SOCMax"
:
"cell_soc_max"
,
"SOCMin"
:
"cell_soc_min"
,
# ================= 单体电压、温度极值及定位 =================
'UcellAvg'
:
'cell_u_mean'
,
'UcellMax'
:
'cell_u_max'
,
'UcellNumMax'
:
'cell_u_max_cell_no'
,
'UcellGNumMax'
:
'cell_u_max_rack_no'
,
'UcellMin'
:
'cell_u_min'
,
'UcellNumMin'
:
'cell_u_min_cell_no'
,
'UcellGNumMin'
:
'cell_u_min_rack_no'
,
'TcellAvg'
:
'cell_temp_mean'
,
'TcellMax'
:
'cell_temp_max'
,
'TcellNumMax'
:
'cell_temp_max_cell_no'
,
'TcellGNumMax'
:
'cell_temp_max_rack_no'
,
'TcellMin'
:
'cell_temp_min'
,
'TcellNumMin'
:
'cell_temp_min_cell_no'
,
'TcellGNumMin'
:
'cell_temp_min_rack_no'
,
"UcellAvg"
:
"cell_u_mean"
,
"UcellMax"
:
"cell_u_max"
,
"UcellNumMax"
:
"cell_u_max_cell_no"
,
"UcellGNumMax"
:
"cell_u_max_rack_no"
,
"UcellMin"
:
"cell_u_min"
,
"UcellNumMin"
:
"cell_u_min_cell_no"
,
"UcellGNumMin"
:
"cell_u_min_rack_no"
,
"TcellAvg"
:
"cell_temp_mean"
,
"TcellMax"
:
"cell_temp_max"
,
"TcellNumMax"
:
"cell_temp_max_cell_no"
,
"TcellGNumMax"
:
"cell_temp_max_rack_no"
,
"TcellMin"
:
"cell_temp_min"
,
"TcellNumMin"
:
"cell_temp_min_cell_no"
,
"TcellGNumMin"
:
"cell_temp_min_rack_no"
,
# ================= 簇级 聚合极值 =================
'UgroupAvg'
:
'rack_u_mean'
,
'UgroupMax'
:
'rack_u_max'
,
'UgroupNumMax'
:
'rack_u_max_rack_no'
,
'UgroupMin'
:
'rack_u_min'
,
'UgroupNumMin'
:
'rack_u_min_rack_no'
,
'IgroupMax'
:
'rack_i_max'
,
'IgroupNumMax'
:
'rack_i_max_rack_no'
,
'IgroupMin'
:
'rack_i_min'
,
'IgroupNumMin'
:
'rack_i_min_rack_no'
,
'StChargEng'
:
'chg_cap_max_e'
,
# 堆最大可充电量
'StDischargEng'
:
'disc_cap_max_e'
,
# 堆最大可放电量
'ChargEngD'
:
'chg_cap_day'
,
# 日充电电量
'DischargEngD'
:
'disc_cap_day'
,
# 日放电电量
'ChargEngT'
:
'accum_chg_cap'
,
# 堆累计充电电量
'DischargEngT'
:
'accum_disc_cap'
,
# 堆累计放电电量
'CharHourT'
:
'accum_chg_time'
,
# 堆累计充电时间
'DischarHourT'
:
'accum_disc_time'
,
# 堆累计放电时间
"UgroupAvg"
:
"rack_u_mean"
,
"UgroupMax"
:
"rack_u_max"
,
"UgroupNumMax"
:
"rack_u_max_rack_no"
,
"UgroupMin"
:
"rack_u_min"
,
"UgroupNumMin"
:
"rack_u_min_rack_no"
,
"IgroupMax"
:
"rack_i_max"
,
"IgroupNumMax"
:
"rack_i_max_rack_no"
,
"IgroupMin"
:
"rack_i_min"
,
"IgroupNumMin"
:
"rack_i_min_rack_no"
,
"StChargEng"
:
"chg_cap_max_e"
,
# 堆最大可充电量
"StDischargEng"
:
"disc_cap_max_e"
,
# 堆最大可放电量
"ChargEngD"
:
"chg_cap_day"
,
# 日充电电量
"DischargEngD"
:
"disc_cap_day"
,
# 日放电电量
"ChargEngT"
:
"accum_chg_cap"
,
# 堆累计充电电量
"DischargEngT"
:
"accum_disc_cap"
,
# 堆累计放电电量
"CharHourT"
:
"accum_chg_time"
,
# 堆累计充电时间
"DischarHourT"
:
"accum_disc_time"
,
# 堆累计放电时间
},
243
:
{
# 变流器
'Uab'
:
'uab'
,
'Ubc'
:
'ubc'
,
'Uca'
:
'uca'
,
'Ia'
:
'ia'
,
'Ib'
:
'ib'
,
'Ic'
:
'ic'
,
'Psum'
:
'pttl'
,
'Qsum'
:
'qttl'
,
'TotalAppaP'
:
'sttl'
,
'Cos'
:
'costtl'
,
'Vdc'
:
'udc'
,
'Idc'
:
'idc'
,
'Pdc'
:
'pdc'
,
243
:
{
# 储能变流器
"Uab"
:
"uab"
,
"Ubc"
:
"ubc"
,
"Uca"
:
"uca"
,
"Ia"
:
"ia"
,
"Ib"
:
"ib"
,
"Ic"
:
"ic"
,
"Psum"
:
"pttl"
,
"Qsum"
:
"qttl"
,
"TotalAppaP"
:
"sttl"
,
"Cos"
:
"costtl"
,
"Vdc"
:
"udc"
,
"Idc"
:
"idc"
,
"Pdc"
:
"pdc"
,
# 正向-充电(用电),负向-放电(发电)
'CharKwhD'
:
'kwhttl_day_n'
,
# 日累计充电量
'DiscKwhD'
:
'kwhttl_day_p'
,
# 日累计放电量
'CharKwhM'
:
'kwhttl_month_n'
,
# 月累计充电量 -> PCS 交流反向有功总电能(月)
'DiscKwhM'
:
'kwhttl_month_p'
,
# 月累计放电量 -> PCS 交流正向有功总电能(月)
'CharKwhY'
:
'kwhttl_year_n'
,
# 年累计充电量 -> PCS 交流反向有功总电能(年)
'DiscKwhY'
:
'kwhttl_year_p'
,
# 年累计放电量 -> PCS 交流正向有功总电能(年)
'EChargeT'
:
'kwhttl_sum_n'
,
# 总充电量 -> PCS 交流反向有功总电能(总)
'EDischT'
:
'kwhttl_sum_p'
,
# 总放电量 -> PCS 交流正向有功总电能(总)
# 'CharKwhD-DiscKwhD': "kwhttl_day", # PCS有功总电能累计值(日)
"CharKwhD"
:
"kwhttl_day_n"
,
# 日累计充电量
"DiscKwhD"
:
"kwhttl_day_p"
,
# 日累计放电量
"CharKwhM"
:
"kwhttl_month_n"
,
# 月累计充电量 -> PCS 交流反向有功总电能(月)
"DiscKwhM"
:
"kwhttl_month_p"
,
# 月累计放电量 -> PCS 交流正向有功总电能(月)
"CharKwhY"
:
"kwhttl_year_n"
,
# 年累计充电量 -> PCS 交流反向有功总电能(年)
"DiscKwhY"
:
"kwhttl_year_p"
,
# 年累计放电量 -> PCS 交流正向有功总电能(年)
"EChargeT"
:
"kwhttl_sum_n"
,
# 总充电量 -> PCS 交流反向有功总电能(总)
"EDischT"
:
"kwhttl_sum_p"
,
# 总放电量 -> PCS 交流正向有功总电能(总)
},
251
:
{
# 并网电表
'Ua'
:
'ua'
,
'Ub'
:
'ub'
,
'Uc'
:
'uc'
,
'Uab'
:
'uab'
,
'Ubc'
:
'ubc'
,
'Uca'
:
'uca'
,
'Ia'
:
'ia'
,
'Ib'
:
'ib'
,
'Ic'
:
'ic'
,
'Freq'
:
'freq'
,
'Pa'
:
'pa'
,
'Pb'
:
'pb'
,
'Pc'
:
'pc'
,
'P'
:
'pttl'
,
'Qa'
:
'qa'
,
'Qb'
:
'qb'
,
'Qc'
:
'qc'
,
'Q'
:
'qttl'
,
'Sa'
:
'sa'
,
'Sb'
:
'sb'
,
'Sc'
:
'sc'
,
'S'
:
'sttl'
,
'FacA'
:
'cosa'
,
'FacB'
:
'cosb'
,
'FacC'
:
'cosc'
,
'Fac'
:
'costtl'
,
251
:
{
# 并网电表
"Ua"
:
"ua"
,
"Ub"
:
"ub"
,
"Uc"
:
"uc"
,
"Uab"
:
"uab"
,
"Ubc"
:
"ubc"
,
"Uca"
:
"uca"
,
"Ia"
:
"ia"
,
"Ib"
:
"ib"
,
"Ic"
:
"ic"
,
"Freq"
:
"freq"
,
"Pa"
:
"pa"
,
"Pb"
:
"pb"
,
"Pc"
:
"pc"
,
"P"
:
"pttl"
,
"Qa"
:
"qa"
,
"Qb"
:
"qb"
,
"Qc"
:
"qc"
,
"Q"
:
"qttl"
,
"Sa"
:
"sa"
,
"Sb"
:
"sb"
,
"Sc"
:
"sc"
,
"S"
:
"sttl"
,
"FacA"
:
"cosa"
,
"FacB"
:
"cosb"
,
"FacC"
:
"cosc"
,
"Fac"
:
"costtl"
,
"EletD"
:
"power_day"
,
"EletM"
:
"power_month"
,
...
...
@@ -122,13 +193,13 @@ FIELD_MAP = {
def
transfer_realtime
(
pyd
,
datas
):
cid
,
sid
,
=
pyd
[
"siteId"
],
pyd
[
"equipmentCode"
]
e
_t
ype
=
pyd
[
"equipmentType"
]
rlt
=
dict
(
cid
=
cid
,
mid
=
sid
,
nm
=
sid
,
e_type
=
e_typ
e
)
cid
,
equipmentCode
,
=
pyd
[
"siteId"
],
pyd
[
"equipmentCode"
]
e
quipmentT
ype
=
pyd
[
"equipmentType"
]
rlt
=
dict
(
cid
=
cid
,
mid
=
equipmentCode
,
nm
=
equipmentCod
e
)
t
=
datas
[
0
][
"pushTime"
]
tags
=
{}
for
r
in
datas
:
brief_code
=
r
[
"keyword"
]
.
split
(
"/"
)[
-
1
]
tags
[
FIELD_MAP
[
e
_t
ype
][
brief_code
]]
=
r
[
"value"
]
tags
[
FIELD_MAP
[
e
quipmentT
ype
][
brief_code
]]
=
r
[
"value"
]
rlt
[
"images"
]
=
[
dict
(
t
=
t
,
tags
=
tags
)]
return
rlt
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment