Commit eb80aa41 authored by ZZH's avatar ZZH

fix meter data collector 2026-5-22 15:15

parent e057286e
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
DATE:2026/4/13 14:31 DATE:2026/4/13 14:31
""" """
from infra.mysql.aiomysql_util import MysqlUtil
EMS_DEVS = [ EMS_DEVS = [
# 荷坳水厂水光储一体化站 # 荷坳水厂水光储一体化站
...@@ -203,3 +204,15 @@ def transfer_realtime(pyd, datas): ...@@ -203,3 +204,15 @@ def transfer_realtime(pyd, datas):
tags[FIELD_MAP[equipmentType][brief_code]] = r["value"] tags[FIELD_MAP[equipmentType][brief_code]] = r["value"]
rlt["images"] = [dict(t=t, tags=tags)] rlt["images"] = [dict(t=t, tags=tags)]
return rlt return rlt
async def load_mtype_from_sm(sid):
async with MysqlUtil("water_works") as conn:
sql = f"SELECT m_type from water_works.monitor WHERE sid=%s"
return await conn.fetch_value(sql, (sid,))
if __name__ == '__main__':
import asyncio
asyncio.run(load_mtype_from_sm("HA_JXG2"))
...@@ -12,6 +12,7 @@ from utils.time_format import CST, YMD_Hms ...@@ -12,6 +12,7 @@ from utils.time_format import CST, YMD_Hms
from gmqtt import Client as MQTTClient from gmqtt import Client as MQTTClient
from ems_water_grp.constants import SCADA_FIELDS_MAP from ems_water_grp.constants import SCADA_FIELDS_MAP
from utils.utils import admin_client_id, mqtt_pwd from utils.utils import admin_client_id, mqtt_pwd
from ems_water_grp.helper import load_mtype_from_sm
from infra.config.settings import SETTING from infra.config.settings import SETTING
from infra.logger.logger import Logger from infra.logger.logger import Logger
...@@ -27,7 +28,7 @@ class Meter3rdForward: ...@@ -27,7 +28,7 @@ class Meter3rdForward:
sub_topic = "factory/data" sub_topic = "factory/data"
pub_username = "pot_emqx_super" pub_username = "pot_emqx_super"
pub_topic_prefix = "eems/td/" pub_topic_pfx = "eems/td/"
def __init__(self): def __init__(self):
self.client_sub = None self.client_sub = None
...@@ -72,6 +73,16 @@ class Meter3rdForward: ...@@ -72,6 +73,16 @@ class Meter3rdForward:
except Exception as e: except Exception as e:
logger.error(f"Decode msg:{payload} error: {e}") logger.error(f"Decode msg:{payload} error: {e}")
@staticmethod
async def parse_topic(sid):
m_type = await load_mtype_from_sm(sid)
if m_type:
if m_type == 137:
return "pcc_ele"
elif m_type == 136:
return "load_ele"
return ""
async def parse_and_forward(self, fid, raw_payload): async def parse_and_forward(self, fid, raw_payload):
""" 重新封装消息,推送至EMQX """ """ 重新封装消息,推送至EMQX """
try: try:
...@@ -96,9 +107,13 @@ class Meter3rdForward: ...@@ -96,9 +107,13 @@ class Meter3rdForward:
cnt = 0 cnt = 0
for sid, payload in d_pyds.items(): for sid, payload in d_pyds.items():
logger.info(f"{sid}: {payload}") logger.info(f"{sid}: {payload}")
self.client_pub.publish( topic = await self.parse_topic(sid)
f"{self.pub_topic_prefix}/load_ele/{sid}", if not topic:
json.dumps(payload), qos=1) logger.error(f"Parse topic {sid} failed, pyd:{payload}")
continue
topic = f"{self.pub_topic_pfx}/{topic}/{sid}"
self.client_pub.publish(topic, json.dumps(payload), qos=1)
cnt += 1 cnt += 1
logger.info(f"forward fid:{fid} {cnt} msgs") logger.info(f"forward fid:{fid} {cnt} msgs")
......
...@@ -8,7 +8,7 @@ import asyncio ...@@ -8,7 +8,7 @@ import asyncio
import aiomysql import aiomysql
from aiomysql import DictCursor, Cursor from aiomysql import DictCursor, Cursor
from infra.config.settings import SETTING from infra.config.settings import SETTING
from infra.logger import Logger from infra.logger.logger import Logger
log_name = f"aiomysql_util" log_name = f"aiomysql_util"
Logger.init_logger_path(f"./common_util", f"{log_name}.log", log_name) Logger.init_logger_path(f"./common_util", f"{log_name}.log", log_name)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment