Commit d3ec7ab3 authored by ZZH's avatar ZZH

change to share sub 2026-6-25 15:22

parent ea9455a4
......@@ -8,6 +8,7 @@ import time
import asyncio
import signal
import pendulum
import argparse
from utils.time_format import CST, YMD_Hms
from gmqtt import Client as MQTTClient
from ems_water_grp.constants import SCADA_FIELDS_MAP
......@@ -16,21 +17,19 @@ from ems_water_grp.helper import load_mtype_from_sm
from infra.config.settings import SETTING
from infra.logger.logger import Logger
log_name = f"meter3rd_forward"
Logger.init_logger_path(f"./ems_water_grp", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
class Meter3rdForward:
sub_host = "172.18.4.82"
sub_uname = "nyconsumer"
sub_pwd = "nyconsumer!321"
sub_topic = "factory/data"
# sub_topic = "factory/data"
sub_topic = "$share/qkup/factory/data"
pub_username = "pot_emqx_super"
pub_topic_pfx = "eems/td/"
def __init__(self):
def __init__(self, worker_id):
self.worker_id = worker_id
self.client_sub = None
self.client_pub = None
......@@ -69,7 +68,6 @@ class Meter3rdForward:
data = json.loads(msg_str)
fId = data.get("fId")
if fId:
logger.info(f"fId:{fId}, {data}")
self.d_last_msg[fId] = msg_str
except Exception as e:
logger.error(f"Decode msg:{payload} error: {e}")
......@@ -126,8 +124,8 @@ class Meter3rdForward:
async def snapshot_sampler(self):
while not self.stop_event.is_set():
try:
sec_to_min = 60 - (time.time() % 60) + 5
await asyncio.sleep(sec_to_min)
# sec_to_min = 60 - (time.time() % 60) + 5
# await asyncio.sleep(sec_to_min)
if not self.d_last_msg:
continue
......@@ -138,6 +136,7 @@ class Meter3rdForward:
if fid in SCADA_FIELDS_MAP:
await self.parse_and_forward(fid, raw_msg)
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Processing loop error: {e}")
await asyncio.sleep(1)
......@@ -159,7 +158,8 @@ class Meter3rdForward:
pass
async def start(self):
self.client_sub = MQTTClient(admin_client_id("Meter3rdForward"))
client_id = admin_client_id(f"Meter3rdForward{self.worker_id}")
self.client_sub = MQTTClient(client_id)
self.client_sub.set_auth_credentials(self.sub_uname, self.sub_pwd)
self.client_sub.on_connect = self.on_sub_connect
self.client_sub.on_subscribe = self.on_sub_subscribe
......@@ -200,10 +200,10 @@ class Meter3rdForward:
await self.stop()
async def main():
async def main(worker_id):
while True:
try:
srv = Meter3rdForward()
srv = Meter3rdForward(worker_id)
await srv.start()
break
except Exception as e:
......@@ -212,7 +212,21 @@ async def main():
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
"log_num",
metavar="log_num",
type=str,
help="log file number"
)
args = parser.parse_args()
log_num = args.log_num
log_name = f"meter3rd_forward_{log_num}"
Logger.init_logger_path(f"./ems_water_grp", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
try:
asyncio.run(main())
asyncio.run(main(log_num))
except KeyboardInterrupt:
pass
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment