Commit 9d4ca45c authored by ZZH's avatar ZZH

opt ems collector 2026-4-30 17:32

parent da0ba8f2
......@@ -269,36 +269,42 @@ class WaterGrpService:
results = await asyncio.gather(*tasks, return_exceptions=True)
pub_cnt = 0
for pyd, data_lst in results:
for item in results:
if isinstance(item, Exception):
logger.error(f"Task generated an exception: {item}")
continue
try:
pyd, data_lst = item
if not data_lst:
continue
payload = transfer_realtime(pyd, data_lst)
topic = ETYPE_TOPIC[pyd["equipmentType"]]
topic = f"{self.topic_prefix}{topic}/{pyd['equipmentCode']}"
logger.info(f"Pub {topic}, pyd: {payload}")
self.mqtt_client.publish(topic, payload)
self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
pub_cnt += 1
except Exception as e:
logger.error(f"Parse and forward data:{data_lst} fail, {e}")
logger.error(f"Parse and forward data fail, {e}")
continue
logger.info(f"Poll finished, forward {pub_cnt} datas")
async def main_loop(self):
while True:
try:
try:
while True:
start_ts = time.time()
await self.sync_once_data()
try:
await self.sync_once_data()
except Exception as e:
logger.error(f"Sync data fail in this loop: {e}")
sleep_ts = max(0.1, POLL_INTVL - (time.time() - start_ts))
if sleep_ts > 0:
await asyncio.sleep(sleep_ts)
except Exception as e:
logger.error(f"Sync data fail, will retry next loop: {e}")
finally:
await self.stop()
finally:
await self.stop()
async def start(self):
self.mqtt_client = MQTTClient(admin_client_id("WaterGrpService"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment