Commit c1ce4f04 authored by ZZH's avatar ZZH

add water meter forward 2026-6-23 17:16

parent 86978577
...@@ -350,14 +350,6 @@ SCADA_FIELDS_MAP = { ...@@ -350,14 +350,6 @@ SCADA_FIELDS_MAP = {
"FCBF_10DP_TSCJ_WNGH2_ZYGGL_AI": {"sid": "ZXC_10DP_TSCJ_WNGH2", "FCBF_10DP_TSCJ_WNGH2_ZYGGL_AI": {"sid": "ZXC_10DP_TSCJ_WNGH2",
"field": "pttl"}, "field": "pttl"},
# 水表
# DN1400出厂水累计流量
"5f31a1d0-9d19-6287-3b35-3a0ca7259ce3": {"sid": "ZXC_SB_DN1400",
"field": "meter_value"},
# DN2000出厂水累计流量
"66a7964e-7bcf-57a4-77f5-3a0ca724ef58": {"sid": "ZXC_SB_DN2000",
"field": "meter_value"}
}, },
905: { 905: {
# 獭湖水厂 # 獭湖水厂
...@@ -462,11 +454,6 @@ SCADA_FIELDS_MAP = { ...@@ -462,11 +454,6 @@ SCADA_FIELDS_MAP = {
"field": "costtl"}, "field": "costtl"},
"ZNPD_THSC_d0523152727__P": {"sid": "TH_d0523152727", "field": "pttl"}, "ZNPD_THSC_d0523152727__P": {"sid": "TH_d0523152727", "field": "pttl"},
# 水表
# 出厂水累计流量
"02f21875-7cf5-d145-b8d8-3a0c137ae145": {"sid": "TH_SB_1",
"field": "meter_value"}
}, },
909: { 909: {
# 1#变压器电表 # 1#变压器电表
...@@ -574,13 +561,6 @@ SCADA_FIELDS_MAP = { ...@@ -574,13 +561,6 @@ SCADA_FIELDS_MAP = {
"NK_DB52_PF": {"sid": "NK_DB52", "field": "costtl"}, "NK_DB52_PF": {"sid": "NK_DB52", "field": "costtl"},
"NK_DB52_PZ": {"sid": "NK_DB52", "field": "pttl"}, "NK_DB52_PZ": {"sid": "NK_DB52", "field": "pttl"},
# 水表
# 送水泵房1#流量计累计流量
"8d11d6c4-ba07-4a2c-a37a-8cd89a712431": {"sid": "NK_SB_SSBF1",
"field": "meter_value"},
# 送水泵房2#流量计累计流量
"4119f05a-1233-40aa-b2fd-2c60bfda012d": {"sid": "NK_SB_SSBF2",
"field": "meter_value"}
}, },
911: { 911: {
# 荷坳水厂 # 荷坳水厂
...@@ -604,11 +584,6 @@ SCADA_FIELDS_MAP = { ...@@ -604,11 +584,6 @@ SCADA_FIELDS_MAP = {
"HA_SSBF_SSB3_GLYS": {"sid": "HA_SSB3", "field": "costtl"}, "HA_SSBF_SSB3_GLYS": {"sid": "HA_SSB3", "field": "costtl"},
"HA_SSBF_SSB3_YGGL": {"sid": "HA_SSB3", "field": "pttl"}, "HA_SSBF_SSB3_YGGL": {"sid": "HA_SSB3", "field": "pttl"},
# 水表
# 出厂水累计流量读数
"d243a49f-4c2b-b992-95ce-3a0bea8781ef": {"sid": "HA_SB_1",
"field": "meter_value"},
}, },
1501: { 1501: {
...@@ -845,14 +820,6 @@ SCADA_FIELDS_MAP = { ...@@ -845,14 +820,6 @@ SCADA_FIELDS_MAP = {
"DB_P19_ZXYGDD": {"sid": "MK_DB_P19", "field": "kwhttl_p"}, "DB_P19_ZXYGDD": {"sid": "MK_DB_P19", "field": "kwhttl_p"},
"DB_P19_GLYS": {"sid": "MK_DB_P19", "field": "costtl"}, "DB_P19_GLYS": {"sid": "MK_DB_P19", "field": "costtl"},
# 水表
# 木古出水累计流量
"66efe33d-589f-0a41-574b-3a0593d34e3f": {"sid": "MK_SB_GMCS",
"field": "meter_value"},
# 平湖出水累计流量
"cdf1f5ac-b590-9b69-f0f2-3a0593d1aa84": {"sid": "MK_SB_PHCS",
"field": "meter_value"}
}, },
908: { 908: {
...@@ -1023,15 +990,6 @@ SCADA_FIELDS_MAP = { ...@@ -1023,15 +990,6 @@ SCADA_FIELDS_MAP = {
"field": "costtl"}, "field": "costtl"},
"ZNPD_EGLSC_d0522150639__P": {"sid": "EGL_d0522150639", "ZNPD_EGLSC_d0522150639__P": {"sid": "EGL_d0522150639",
"field": "pttl"}, "field": "pttl"},
# 水表
# 供平湖出厂水累计流量
"b63f01a4-dfae-495b-b8a0-827e9931adb1": {"sid": "EGL_SB_PH",
"field": "meter_value"},
# 供良安田出厂水累计流量
"f2967293-982d-4092-b7af-18fa3f20b1f7": {"sid": "EGL_SB_LAT",
"field": "meter_value"}
}, },
903: { 903: {
...@@ -1126,14 +1084,6 @@ SCADA_FIELDS_MAP = { ...@@ -1126,14 +1084,6 @@ SCADA_FIELDS_MAP = {
"field": "costtl"}, "field": "costtl"},
"ZNPD_SHSC_d0426210030__P": {"sid": "SH_d0426210030", "field": "pttl"}, "ZNPD_SHSC_d0426210030__P": {"sid": "SH_d0426210030", "field": "pttl"},
# 水表
# 出厂水DN800累计流量
"4292ac6a-cf4d-fc0f-972e-3a0985efda44": {"sid": "SH_SB_DN800",
"field": "meter_value"},
# 出厂水DN1200累计流量
"42b30f4a-ba68-dc52-1d40-3a0985f0e40f": {"sid": "SH_SB_DN1200",
"field": "meter_value"}
}, },
901: { 901: {
# 塘岭水厂 # 塘岭水厂
...@@ -1171,12 +1121,6 @@ SCADA_FIELDS_MAP = { ...@@ -1171,12 +1121,6 @@ SCADA_FIELDS_MAP = {
"SSBF_DB1_SSB4_ZYGDN_AI": {"sid": "TL_DB1_SSB4", "field": "kwhttl_p"}, "SSBF_DB1_SSB4_ZYGDN_AI": {"sid": "TL_DB1_SSB4", "field": "kwhttl_p"},
"SSBF_DB1_SSB4_ZGLYS_AI": {"sid": "TL_DB1_SSB4", "field": "costtl"}, "SSBF_DB1_SSB4_ZGLYS_AI": {"sid": "TL_DB1_SSB4", "field": "costtl"},
"SSBF_DB1_SSB4_ZYGGL_AI": {"sid": "TL_DB1_SSB4", "field": "pttl"}, "SSBF_DB1_SSB4_ZYGGL_AI": {"sid": "TL_DB1_SSB4", "field": "pttl"},
# 水表
# 塘岭出厂水累计流量
"8b63a7d8-6746-2373-8439-3a0b3651b7a0": {"sid": "TL_SB_1",
"field": "meter_value"},
}, },
902: { 902: {
...@@ -1190,10 +1134,6 @@ SCADA_FIELDS_MAP = { ...@@ -1190,10 +1134,6 @@ SCADA_FIELDS_MAP = {
"630KVAJXZG_PSum": {"sid": "TX_630KVAJXZG", "field": "costtl"}, "630KVAJXZG_PSum": {"sid": "TX_630KVAJXZG", "field": "costtl"},
"630KVAJXZG_P": {"sid": "TX_630KVAJXZG", "field": "pttl"}, "630KVAJXZG_P": {"sid": "TX_630KVAJXZG", "field": "pttl"},
# 水表
# 出厂水累计流量
"189ddc12-1d12-349d-a2e0-3a0ea113eb97": {"sid": "TX_SB_1",
"field": "meter_value"},
}, },
904: { 904: {
...@@ -1278,10 +1218,61 @@ SCADA_FIELDS_MAP = { ...@@ -1278,10 +1218,61 @@ SCADA_FIELDS_MAP = {
"PD_P2_PF": {"sid": "KZ_PD_P2", "field": "costtl"}, "PD_P2_PF": {"sid": "KZ_PD_P2", "field": "costtl"},
"PD_P2_PT": {"sid": "KZ_PD_P2", "field": "pttl"}, "PD_P2_PT": {"sid": "KZ_PD_P2", "field": "pttl"},
# 水表
# 出厂水累计流量
"c3561e90-0144-2b12-90c9-3a0dc41ae323": {"sid": "KZ_SB_1",
"field": "meter_value"},
}, },
} }
WATER_SCADA_MAP = {
# 中心城
# DN1400出厂水累计流量
"5f31a1d0-9d19-6287-3b35-3a0ca7259ce3": {"sid": "ZXC_SB_DN1400",
"cid": 907},
# DN2000出厂水累计流量
"66a7964e-7bcf-57a4-77f5-3a0ca724ef58": {"sid": "ZXC_SB_DN2000",
"cid": 907},
# 獭湖水厂
# 出厂水累计流量
"02f21875-7cf5-d145-b8d8-3a0c137ae145": {"sid": "TH_SB_1", "cid": 905},
# 南坑水厂
# 送水泵房1#流量计累计流量
"8d11d6c4-ba07-4a2c-a37a-8cd89a712431": {"sid": "NK_SB_SSBF1", "cid": 910},
# 送水泵房2#流量计累计流量
"4119f05a-1233-40aa-b2fd-2c60bfda012d": {"sid": "NK_SB_SSBF2", "cid": 910},
# 荷坳水厂
# 出厂水累计流量读数
"d243a49f-4c2b-b992-95ce-3a0bea8781ef": {"sid": "HA_SB_1", "cid": 911},
# 苗坑水厂
# 木古出水累计流量
"66efe33d-589f-0a41-574b-3a0593d34e3f": {"sid": "MK_SB_GMCS", "cid": 1501},
# 平湖出水累计流量
"cdf1f5ac-b590-9b69-f0f2-3a0593d1aa84": {"sid": "MK_SB_PHCS", "cid": 1501},
# 鹅公岭水厂
# 供平湖出厂水累计流量
"b63f01a4-dfae-495b-b8a0-827e9931adb1": {"sid": "EGL_SB_PH", "cid": 908},
# 供良安田出厂水累计流量
"f2967293-982d-4092-b7af-18fa3f20b1f7": {"sid": "EGL_SB_LAT", "cid": 908},
# 沙湖水厂
# 出厂水DN800累计流量
"4292ac6a-cf4d-fc0f-972e-3a0985efda44": {"sid": "SH_SB_DN800", "cid": 903},
# 出厂水DN1200累计流量
"42b30f4a-ba68-dc52-1d40-3a0985f0e40f": {"sid": "SH_SB_DN1200",
"cid": 903},
# 塘岭水厂
# 塘岭出厂水累计流量
"8b63a7d8-6746-2373-8439-3a0b3651b7a0": {"sid": "TL_SB_1", "cid": 901},
# 田心水厂
# 出厂水累计流量
"189ddc12-1d12-349d-a2e0-3a0ea113eb97": {"sid": "TX_SB_1", "cid": 902},
# 坑梓水厂
# 出厂水累计流量
"c3561e90-0144-2b12-90c9-3a0dc41ae323": {"sid": "KZ_SB_1", "cid": 904},
}
# -*- coding:utf-8 -*-
"""
DATE:2026/6/23 16:09
功能:定时从 InfluxDB 接口读取最后一条数据,并通过 MQTT 转发到 EMQX 平台。
"""
import asyncio
import json
import signal
from gmqtt import Client as MQTTClient
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from ems_water_grp.constants import WATER_SCADA_MAP
from utils.utils import admin_client_id, mqtt_pwd
from infra.http.httpx_util import HttpxClient
from infra.config.settings import SETTING
from infra.logger.logger import Logger
# 初始化日志
log_name = f"fwd_water_meter"
Logger.init_logger_path(f"./ems_water_grp", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
# InfluxDB 请求接口地址
INFLUX_URL = "http://172.18.4.77/influx/origin/queryLast"
# 查询载荷
INFLUX_PAYLOAD = {
"facIds": [
{
"factoryId": 907,
"ids": [
"5f31a1d0-9d19-6287-3b35-3a0ca7259ce3",
"66a7964e-7bcf-57a4-77f5-3a0ca724ef58"
]
},
{
"factoryId": 905,
"ids": [
"02f21875-7cf5-d145-b8d8-3a0c137ae145"
]
},
{
"factoryId": 910,
"ids": [
"8d11d6c4-ba07-4a2c-a37a-8cd89a712431",
"4119f05a-1233-40aa-b2fd-2c60bfda012d"
]
},
{
"factoryId": 911,
"ids": [
"d243a49f-4c2b-b992-95ce-3a0bea8781ef"
]
},
{
"factoryId": 1501,
"ids": [
"66efe33d-589f-0a41-574b-3a0593d34e3f",
"cdf1f5ac-b590-9b69-f0f2-3a0593d1aa84"
]
},
{
"factoryId": 908,
"ids": [
"b63f01a4-dfae-495b-b8a0-827e9931adb1",
"f2967293-982d-4092-b7af-18fa3f20b1f7"
]
},
{
"factoryId": 903,
"ids": [
"4292ac6a-cf4d-fc0f-972e-3a0985efda44",
"42b30f4a-ba68-dc52-1d40-3a0985f0e40f"
]
},
{
"factoryId": 901,
"ids": [
"8b63a7d8-6746-2373-8439-3a0b3651b7a0"
]
},
{
"factoryId": 902,
"ids": [
"189ddc12-1d12-349d-a2e0-3a0ea113eb97"
]
},
{
"factoryId": 904,
"ids": [
"c3561e90-0144-2b12-90c9-3a0dc41ae323"
]
}
],
"dataTypeId": 1,
"isPostProcess": 0
}
class FwdWaterMeterSrv:
username = "pot_emqx_super"
topic_prefix = "eems/td/water"
def __init__(self):
self.http_client = HttpxClient()
self.mqtt_client = None
self._stop_event = asyncio.Event()
@staticmethod
def on_connect(client, flags, rc, properties):
logger.info(f"Connected to EMQX success, rc: {rc}")
@staticmethod
def on_disconnect(client, packet, exc=None):
logger.warning(f"Disconnected from EMQX, exc: {exc}")
async def fwd_msg(self, records):
pub_cnt = 0
try:
for item in records:
uid = item.get("id")
if uid not in WATER_SCADA_MAP:
continue
d_map = WATER_SCADA_MAP[uid]
cid, sid = d_map["cid"], d_map["sid"]
try:
val = float(item.get("value", 0))
except (ValueError, TypeError):
val = item.get("value")
payload = {"cid": cid, "mid": sid, "nm": sid,
"images": [{"t": item.get("time", ""),
"tags": {"meter_value": val}}]}
# 动态生成 Topic 并推送
topic = f"{self.topic_prefix}/{sid}"
if self.mqtt_client:
self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
pub_cnt += 1
else:
logger.error("MQTT client is not initialized.")
except Exception as e:
logger.error(e)
logger.info(f"Forwarded {pub_cnt} valid records to EMQX.")
async def fetch_and_forward(self):
""" 核心逻辑:发起 HTTP 请求拉取数据,随后转发至 MQTT """
logger.info("Start fetching data from InfluxDB...")
try:
resp_str, status_code = await self.http_client.post(
INFLUX_URL, headers={"Content-Type": "application/json"},
json=INFLUX_PAYLOAD
)
if status_code != 200:
logger.error(f"Fetch fail, Code:{status_code}, {resp_str}")
return
try:
resp_data = json.loads(resp_str)
except json.JSONDecodeError:
logger.error(f"decode rsp fail, resp: {resp_str}")
return
if resp_data.get("code") != 0:
logger.error(f"Parser code err: {resp_data.get('message')}")
return
records = resp_data.get("data", [])
if not records:
logger.error("Fetch success but no data in response")
return
await self.fwd_msg(records)
except Exception as e:
logger.error(f"Exception occurred in fetch_and_forward: {e}")
async def stop(self):
logger.info("Shutting down InfluxForward Service...")
self._stop_event.set()
await self.http_client.close()
try:
if self.mqtt_client:
await self.mqtt_client.disconnect()
except Exception as e:
logger.error(f"Error during MQTT disconnect: {e}")
async def start(self):
self.mqtt_client = MQTTClient(admin_client_id("FwdWaterMeterSrv"))
self.mqtt_client.set_auth_credentials(self.username,
mqtt_pwd(self.username))
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_disconnect = self.on_disconnect
await self.mqtt_client.connect(SETTING.mqtt_host, SETTING.mqtt_port)
scheduler = AsyncIOScheduler()
scheduler.add_job(self.fetch_and_forward, CronTrigger(minute="5"))
scheduler.start()
logger.info("Scheduler started, job aligned to run every 2 minutes.")
# 监听系统退出信号
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, self._stop_event.set)
logger.info("Fwd Water Meter Srv started. Waiting for stop signal...")
try:
await self._stop_event.wait()
except (KeyboardInterrupt, SystemExit):
logger.info("Service interruption received.")
finally:
await self.stop()
async def main():
while True:
try:
srv = FwdWaterMeterSrv()
await srv.start()
break
except Exception as e:
logger.error(f"Fatal error occurred, retrying in 30 seconds: {e}")
await asyncio.sleep(30)
if __name__ == "__main__":
logger.info("Fwd Water Meter Srv is starting...")
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("进程被用户手动终止。")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment