Commit 9e291eec authored by ZZH's avatar ZZH

opt ems collector 2026-4-30 17:57

parent 9d4ca45c
......@@ -7,8 +7,11 @@ import asyncio
import json
import time
import math
import signal
from typing import Dict, List, Optional, Tuple
from gmqtt import Client as MQTTClient
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from utils.encrypt import gen_sign
from utils.constants import USER_AES
......@@ -234,8 +237,7 @@ class WaterGrpService:
self.api_client = WaterGrpClient()
self.mqtt_client = None
self.topology_cache = []
self.last_sync_ts = 0
self._stop_event = asyncio.Event()
@staticmethod
def on_subscribe(client, mid, qos, properties):
......@@ -255,8 +257,11 @@ class WaterGrpService:
async def stop(self):
logger.info("Shutting down Service...")
self._stop_event.set()
await self.api_client.close()
await self.mqtt_client.disconnect()
if self.mqtt_client:
await self.mqtt_client.disconnect()
async def sync_once_data(self):
logger.info(f"Start poll real data)")
......@@ -317,7 +322,24 @@ class WaterGrpService:
await self.mqtt_client.connect(SETTING.mqtt_host, SETTING.mqtt_port)
await self.main_loop()
# await self.main_loop()
scheduler = AsyncIOScheduler()
scheduler.add_job(self.sync_once_data, CronTrigger(second="2"))
scheduler.start()
logger.info("Scheduler started, job aligned to 2nd of every minute")
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, self._stop_event.set)
logger.info("Service Engine started. Waiting for stop signal...")
try:
await self._stop_event.wait()
except (KeyboardInterrupt, SystemExit):
logger.info("Service interruption received.")
finally:
await self.stop()
async def tools():
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment