Commit 7321a21d authored by ZZH's avatar ZZH

opt 3rd data collector srv 2026-6-16 16:10

parent dd7f0e02
......@@ -362,7 +362,8 @@ async def tools():
"siteId": 185,
"equipmentCode": "SESBMS00003",
"equipmentType": 244,
"briefCodeList": ["StChargEng", "StDischargEng", "StCap","TotalVoltage"]
"briefCodeList": ["StChargEng", "StDischargEng", "StCap",
"TotalVoltage"]
}
pyd, rlt = await api_client.fetch_realtime(pyd)
print("rlt", rlt)
......@@ -371,11 +372,14 @@ async def tools():
async def main():
try:
srv = WaterGrpService()
await srv.start()
except Exception as e:
logger.error(f"Sync data fail, will retry next loop: {e}")
while True:
try:
srv = WaterGrpService()
await srv.start()
break
except Exception as e:
logger.error(f"Sync data fail, will retry next loop: {e}")
await asyncio.sleep(30)
if __name__ == "__main__":
......
......@@ -141,13 +141,19 @@ class Meter3rdForward:
async def stop(self):
logger.info("Shutting down Meter3rdForward Service...")
if self.client_sub:
await self.client_sub.disconnect()
self.stop_event.set()
if self.client_pub:
await self.client_pub.disconnect()
try:
if self.client_sub:
await self.client_sub.disconnect()
except Exception:
pass
self.stop_event.set()
try:
if self.client_pub:
await self.client_pub.disconnect()
except Exception:
pass
async def start(self):
self.client_sub = MQTTClient(admin_client_id("Meter3rdForward"))
......@@ -162,23 +168,44 @@ class Meter3rdForward:
self.client_pub.on_connect = self.on_pub_connect
self.client_pub.on_disconnect = self.on_pub_disconnect
await asyncio.gather(
self.client_sub.connect(self.sub_host, 1883),
self.client_pub.connect(SETTING.mqtt_host, SETTING.mqtt_port)
)
loop = asyncio.get_running_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, self.stop_event.set)
while not self.stop_event.is_set():
try:
await asyncio.gather(
self.client_sub.connect(self.sub_host, 1883),
self.client_pub.connect(SETTING.mqtt_host,
SETTING.mqtt_port)
)
logger.info("双向 MQTT 服务初始连接全部成功!")
break
except Exception as e:
logger.error(f"MQTT 初始连接失败,30秒后重试:{e}")
await asyncio.sleep(30)
if self.stop_event.is_set():
return
asyncio.create_task(self.snapshot_sampler())
await self.stop_event.wait()
logger.info("Meter3rdForward 核心服务启动,等待退出信号...")
try:
await self.stop_event.wait()
finally:
await self.stop()
async def main():
srv = Meter3rdForward()
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(srv.stop()))
await srv.start()
while True:
try:
srv = Meter3rdForward()
await srv.start()
break
except Exception as e:
logger.error(f"转发服务发生致命异常,10秒后尝试重新初始化: {e}")
await asyncio.sleep(10)
if __name__ == '__main__':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment