Commit b93d4e1b authored by ZZH's avatar ZZH

opt meter 3rd 2026-4-24 14:23

parent 7e30f302
This diff is collapsed.
......@@ -3,71 +3,158 @@
DATE:2026/4/22 14:58
"""
import json
import time
import asyncio
import signal
import pendulum
from utils.time_format import CST, YMD_Hms
from gmqtt import Client as MQTTClient
from utils.utils import admin_client_id
from ems_water_grp.constants import SCADA_FIELDS_MAP
from utils.utils import admin_client_id, mqtt_pwd
from infra.config.settings import SETTING
from infra.logger.logger import Logger
log_name = f"meter_3rd"
log_name = f"meter3rd_forward"
Logger.init_logger_path(f"./ems_water_grp", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
MQTT_BROKER = "172.18.4.82"
MQTT_USER = "nyconsumer"
MQTT_PWD = "nyconsumer!321"
TOPIC = "factory/data"
class Meter3rdForward:
sub_host = "172.18.4.82"
sub_uname = "nyconsumer"
sub_pwd = "nyconsumer!321"
sub_topic = "factory/data"
class Meter3rdSrv:
topic = "factory/data"
pub_username = "pot_emqx_super"
pub_topic_prefix = "eems/td/"
def __init__(self):
self.mqtt_client = None
self.client_sub = None
self.client_pub = None
self.stop_event = asyncio.Event()
self.d_last_msg = {}
@property
def pub_pwd(self):
return mqtt_pwd(self.pub_username)
@staticmethod
def on_subscribe(client, mid, qos, properties):
logger.info(f"Sub to {TOPIC} success, mid: {mid}")
def on_sub_subscribe(client, mid, qos, properties):
logger.info(f"Sub to {Meter3rdForward.sub_topic} success, mid: {mid}")
@staticmethod
def on_connect(client, flags, rc, properties):
logger.info(f"Connected to EMQX success, rc: {rc}")
client.subscribe(TOPIC, qos=1)
def on_sub_connect(client, flags, rc, properties):
logger.info(f"Connected to {Meter3rdForward.sub_host} success rc:{rc}")
client.subscribe(Meter3rdForward.sub_topic, qos=1)
@staticmethod
def on_disconnect(client, packet, exc=None):
logger.warning(f"Disconnected from EMQX, exc: {exc}")
def on_sub_disconnect(client, packet, exc=None):
logger.warning(f"Disconnected from {Meter3rdForward.sub_host} {exc}")
@staticmethod
def on_message(client, topic, payload, qos, properties):
payload = payload.decode("utf-8")
logger.info(f"Received message: topic={topic}, payload={payload}")
def on_pub_connect(client, flags, rc, properties):
logger.info(f"Connected to {SETTING.mqtt_host} success, rc: {rc}")
@staticmethod
def on_pub_disconnect(client, packet, exc=None):
logger.warning(f"Disconnected from EMQX {SETTING.mqtt_host} exc {exc}")
def on_sub_message(self, client, topic, payload, qos, properties):
try:
msg_str = payload.decode("utf-8")
data = json.loads(msg_str)
cid = data.get("fId")
if cid:
self.d_last_msg[cid] = msg_str
except Exception as e:
logger.error(f"Decode msg:{payload} error: {e}")
async def parse_and_forward(self, cid, raw_payload):
""" 重新封装消息,推送至EMQX """
try:
d_pyds = {}
data = json.loads(raw_payload)
ts = pendulum.from_timestamp(data["time"], tz=CST).format(YMD_Hms)
for item in data["items"]:
try:
scada_id = item["n"]
if scada_id in SCADA_FIELDS_MAP[cid].keys():
tmp = SCADA_FIELDS_MAP[cid][scada_id]
mid = tmp["sid"]
tag = {tmp["field"]: item["v"]}
if mid in d_pyds:
d_pyds[mid]["images"][0]["tags"].update(tag)
else:
d_pyds[mid] = {"cid": cid, "mid": mid, "nm": mid,
"images": [{"t": ts, "tags": tag}]}
except Exception as e:
continue
for sid, payload in d_pyds.items():
logger.info(f"{sid}: {payload}")
self.client_pub.publish(
f"{self.pub_topic_prefix}/load_ele/{sid}",
json.dumps(payload), qos=1)
except Exception as e:
logger.error(f"parse_and_forward error: {e}")
async def snapshot_sampler(self):
while not self.stop_event.is_set():
try:
sec_to_min = 60 - (time.time() % 60) + 5
await asyncio.sleep(sec_to_min)
if not self.d_last_msg:
continue
cur_batch = dict(self.d_last_msg)
self.d_last_msg.clear()
for cid, raw_msg in cur_batch.items():
if cid in SCADA_FIELDS_MAP:
await self.parse_and_forward(cid, raw_msg)
except Exception as e:
logger.error(f"Processing loop error: {e}")
await asyncio.sleep(1)
async def stop(self):
logger.info("Shutting down Service...")
if self.mqtt_client:
await self.mqtt_client.disconnect()
logger.info("Shutting down Meter3rdForward Service...")
if self.client_sub:
await self.client_sub.disconnect()
if self.client_pub:
await self.client_pub.disconnect()
self.stop_event.set()
async def start(self):
self.mqtt_client = MQTTClient(admin_client_id("WaterGrpService"))
self.mqtt_client.set_auth_credentials(MQTT_USER, MQTT_PWD)
self.client_sub = MQTTClient(admin_client_id("Meter3rdForward"))
self.client_sub.set_auth_credentials(self.sub_uname, self.sub_pwd)
self.client_sub.on_connect = self.on_sub_connect
self.client_sub.on_subscribe = self.on_sub_subscribe
self.client_sub.on_disconnect = self.on_sub_disconnect
self.client_sub.on_message = self.on_sub_message
self.client_pub = MQTTClient(admin_client_id("Meter3rdForward"))
self.client_pub.set_auth_credentials(self.pub_username, self.pub_pwd)
self.client_pub.on_connect = self.on_pub_connect
self.client_pub.on_disconnect = self.on_pub_disconnect
self.mqtt_client.on_connect = self.on_connect
self.mqtt_client.on_subscribe = self.on_subscribe
self.mqtt_client.on_disconnect = self.on_disconnect
self.mqtt_client.on_message = self.on_message
await asyncio.gather(
self.client_sub.connect(self.sub_host, 1883),
self.client_pub.connect(SETTING.mqtt_host, SETTING.mqtt_port)
)
await self.mqtt_client.connect(MQTT_BROKER, 1883)
asyncio.create_task(self.snapshot_sampler())
await self.stop_event.wait()
async def main():
srv = Meter3rdSrv()
srv = Meter3rdForward()
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(srv.stop()))
......@@ -76,4 +163,7 @@ async def main():
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment