Commit 0c04e7d4 authored by ZZH's avatar ZZH

add ems_collector code structure 2026-4-22 15:24

parent d72b7e91
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
*.yaml
*.pickle
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# mac ds store
.DS_Store
# data files that do not need to save in sql
data_file/
# Pycharm files
.idea/
# config file
# system conf
src/system_config/system.conf
src/system_config/settings.yaml
#test
/test
/zz_test
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 15:14
"""
This diff is collapsed.
This diff is collapsed.
# -*- coding:utf-8 -*-
"""
DATE:2026/4/13 14:31
"""
# Mapping: equipment type code -> {vendor point keyword -> internal tag name}.
# Consumed by transfer_realtime() to rename third-party fields to our schema.
FIELD_MAP = {
    244: {  # battery stack
        'TotalVoltage': 'u',
        'TotalCurrent': 'i',
        'OperP': 'pttl',
        'SOC': 'soc',
        'SOH': 'soh',
        'SOCMax': 'cell_soc_max',
        'SOCMin': 'cell_soc_min',
        # ========== cell voltage/temperature extremes and their locations ==========
        'UcellAvg': 'cell_u_mean',
        'UcellMax': 'cell_u_max',
        'UcellNumMax': 'cell_u_max_cell_no',
        'UcellGNumMax': 'cell_u_max_rack_no',
        'UcellMin': 'cell_u_min',
        'UcellNumMin': 'cell_u_min_cell_no',
        'UcellGNumMin': 'cell_u_min_rack_no',
        'TcellAvg': 'cell_temp_mean',
        'TcellMax': 'cell_temp_max',
        'TcellNumMax': 'cell_temp_max_cell_no',
        'TcellGNumMax': 'cell_temp_max_rack_no',
        'TcellMin': 'cell_temp_min',
        'TcellNumMin': 'cell_temp_min_cell_no',
        'TcellGNumMin': 'cell_temp_min_rack_no',
        # ========== rack-level aggregated extremes ==========
        'UgroupAvg': 'rack_u_mean',
        'UgroupMax': 'rack_u_max',
        'UgroupNumMax': 'rack_u_max_rack_no',
        'UgroupMin': 'rack_u_min',
        'UgroupNumMin': 'rack_u_min_rack_no',
        'IgroupMax': 'rack_i_max',
        'IgroupNumMax': 'rack_i_max_rack_no',
        'IgroupMin': 'rack_i_min',
        'IgroupNumMin': 'rack_i_min_rack_no',
        'StChargEng': 'chg_cap_max_e',  # max chargeable energy of the stack
        'StDischargEng': 'disc_cap_max_e',  # max dischargeable energy of the stack
        'ChargEngD': 'chg_cap_day',  # daily charged energy
        'DischargEngD': 'disc_cap_day',  # daily discharged energy
        'ChargEngT': 'accum_chg_cap',  # accumulated charged energy of the stack
        'DischargEngT': 'accum_disc_cap',  # accumulated discharged energy of the stack
        'CharHourT': 'accum_chg_time',  # accumulated charging hours of the stack
        'DischarHourT': 'accum_disc_time',  # accumulated discharging hours of the stack
    },
    243: {  # power converter (PCS)
        'Uab': 'uab',
        'Ubc': 'ubc',
        'Uca': 'uca',
        'Ia': 'ia',
        'Ib': 'ib',
        'Ic': 'ic',
        'Psum': 'pttl',
        'Qsum': 'qttl',
        'TotalAppaP': 'sttl',
        'Cos': 'costtl',
        'Vdc': 'udc',
        'Idc': 'idc',
        'Pdc': 'pdc',
        # positive = charging (consuming), negative = discharging (generating)
        'CharKwhD': 'kwhttl_day_n',  # daily accumulated charged energy
        'DiscKwhD': 'kwhttl_day_p',  # daily accumulated discharged energy
        'CharKwhM': 'kwhttl_month_n',  # monthly charged energy -> PCS AC reverse active energy (month)
        'DiscKwhM': 'kwhttl_month_p',  # monthly discharged energy -> PCS AC forward active energy (month)
        'CharKwhY': 'kwhttl_year_n',  # yearly charged energy -> PCS AC reverse active energy (year)
        'DiscKwhY': 'kwhttl_year_p',  # yearly discharged energy -> PCS AC forward active energy (year)
        'EChargeT': 'kwhttl_sum_n',  # total charged energy -> PCS AC reverse active energy (total)
        'EDischT': 'kwhttl_sum_p',  # total discharged energy -> PCS AC forward active energy (total)
        # 'CharKwhD-DiscKwhD': "kwhttl_day",  # PCS accumulated active energy (day)
    },
    251: {  # grid-connection meter
        'Ua': 'ua',
        'Ub': 'ub',
        'Uc': 'uc',
        'Uab': 'uab',
        'Ubc': 'ubc',
        'Uca': 'uca',
        'Ia': 'ia',
        'Ib': 'ib',
        'Ic': 'ic',
        'Freq': 'freq',
        'Pa': 'pa',
        'Pb': 'pb',
        'Pc': 'pc',
        'P': 'pttl',
        'Qa': 'qa',
        'Qb': 'qb',
        'Qc': 'qc',
        'Q': 'qttl',
        'Sa': 'sa',
        'Sb': 'sb',
        'Sc': 'sc',
        'S': 'sttl',
        'FacA': 'cosa',
        'FacB': 'cosb',
        'FacC': 'cosc',
        'Fac': 'costtl',
        "EletD": "power_day",
        "EletM": "power_month",
        "EletY": "power_year",
    }
}
def transfer_realtime(pyd, datas):
    """Convert a third-party realtime push payload into the internal image format.

    :param pyd: envelope dict carrying "siteId", "equipmentCode", "equipmentType"
    :param datas: non-empty list of point dicts, each with "keyword", "value"
                  and "pushTime" (the first point's pushTime stamps the image)
    :return: dict with cid/mid/nm/e_type plus an "images" list of one entry
             whose "tags" maps internal field names to values
    """
    cid, sid = pyd["siteId"], pyd["equipmentCode"]
    e_type = pyd["equipmentType"]
    rlt = dict(cid=cid, mid=sid, nm=sid, e_type=e_type)
    t = datas[0]["pushTime"]
    # FIX: an unknown equipment type or keyword previously raised KeyError and
    # dropped the whole frame; unmapped points are now skipped instead.
    field_map = FIELD_MAP.get(e_type, {})
    tags = {}
    for r in datas:
        # Keywords look like "a/b/brief_code"; only the last segment is mapped.
        brief_code = r["keyword"].split("/")[-1]
        tag = field_map.get(brief_code)
        if tag is not None:
            tags[tag] = r["value"]
    rlt["images"] = [dict(t=t, tags=tags)]
    return rlt
# -*- coding:utf-8 -*-
"""
DATE:2026/4/22 14:58
"""
import asyncio
from gmqtt import Client as MQTTClient
from utils.utils import admin_client_id
from infra.logger.logger import Logger
# Module logger: writes to <base_path>/ems_water_grp/meter_3rd.log.
log_name = f"meter_3rd"
Logger.init_logger_path(f"./ems_water_grp", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
# Broker connection settings.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# settings.yaml (already git-ignored) or environment variables.
MQTT_BROKER = "172.18.4.82"
MQTT_USER = "nyconsumer"
MQTT_PWD = "nyconsumer!321"
TOPIC = "factory/data"
class Meter3rdSrv:
    """MQTT subscriber that receives third-party meter data on `factory/data`."""

    topic = "factory/data"

    def __init__(self):
        # gmqtt client, created in start().
        self.mqtt_client = None

    @staticmethod
    def on_subscribe(client, mid, qos, properties):
        logger.info(f"Sub to {TOPIC} success, mid: {mid}")

    @staticmethod
    def on_connect(client, flags, rc, properties):
        # (Re)subscribe on every successful connect so reconnects keep working.
        logger.info(f"Connected to EMQX success, rc: {rc}")
        client.subscribe(TOPIC, qos=1)

    @staticmethod
    def on_disconnect(client, packet, exc=None):
        logger.warning(f"Disconnected from EMQX, exc: {exc}")

    @staticmethod
    def on_message(client, topic, payload, qos, properties):
        payload = payload.decode("utf-8")
        # FIX: routine messages were logged with logger.error, flooding
        # error.log and stderr; downgraded to info.
        logger.info(f"Received message: topic={topic}, payload={payload}")

    async def stop(self):
        logger.info("Shutting down Service...")
        await self.mqtt_client.disconnect()

    async def start(self):
        """Create the gmqtt client, wire up callbacks and connect to the broker."""
        self.mqtt_client = MQTTClient(admin_client_id("WaterGrpService"))
        self.mqtt_client.set_auth_credentials(MQTT_USER, MQTT_PWD)
        self.mqtt_client.on_connect = self.on_connect
        self.mqtt_client.on_subscribe = self.on_subscribe
        self.mqtt_client.on_disconnect = self.on_disconnect
        # FIX: on_message was defined but never registered, so incoming
        # messages never reached the handler.
        self.mqtt_client.on_message = self.on_message
        await self.mqtt_client.connect(MQTT_BROKER, 1883)
async def main():
    """Entry point: bring up the third-party meter MQTT subscriber."""
    service = Meter3rdSrv()
    await service.start()


if __name__ == '__main__':
    asyncio.run(main())
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 15:17
"""
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 17:59
"""
# -*- coding:utf-8 -*-
"""
DATE:2024/4/30 16:56
"""
import os
import yaml
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
set_url = os.path.join(BASE_DIR, "config/settings.yaml")
class Setting(object):
    """Lazy, property-style accessor over config/settings.yaml.

    The YAML document is parsed once, on the first property access; every
    property below is a thin wrapper over `lazy_load` with its nested key path.
    """

    def __init__(self):
        # Parsed YAML document; populated on the first lazy_load call.
        self.config = None

    @classmethod
    def load(cls):
        """Read and parse the whole settings file."""
        with open(set_url) as file:
            return yaml.safe_load(file.read())

    def lazy_load(self, *keys):
        """Walk `keys` down the nested config mapping.

        Missing keys yield None (via dict.get). If an intermediate value is
        not a dict, that value is returned as-is and the remaining keys are
        ignored.
        """
        if self.config is None:
            self.config = self.load()
        value = self.config
        for key in keys:
            if isinstance(value, dict):
                value = value.get(key)
            else:
                return value
        else:
            # Normal loop exhaustion: the fully resolved value.
            return value

    # ---- logging / misc -------------------------------------------------
    @property
    def logger_base_path(self):
        return self.lazy_load("logger", "base_path")

    @property
    def debug_mode(self):
        return self.lazy_load("debug_mode")

    # ---- redis -----------------------------------------------------------
    @property
    def redis_nodes(self):
        return self.lazy_load("redis", "nodes")

    @property
    def redis_password(self):
        return self.lazy_load("redis", "password")

    # ---- mysql -----------------------------------------------------------
    @property
    def mysql_db(self):
        return self.lazy_load("mysql", "db")

    @property
    def mysql_user(self):
        return self.lazy_load("mysql", "user")

    @property
    def mysql_password(self):
        return self.lazy_load("mysql", "password")

    @property
    def mysql_port(self):
        return self.lazy_load("mysql", "port")

    @property
    def mysql_host(self):
        return self.lazy_load("mysql", "host")

    @property
    def mysql_charset(self):
        return self.lazy_load("mysql", "charset")

    # ---- tdengine --------------------------------------------------------
    @property
    def tdengine_host(self):
        return self.lazy_load("tdengine", "taosd_host")

    @property
    def tdengine_port(self):
        return self.lazy_load("tdengine", "taosd_port")

    @property
    def tdengine_user(self):
        return self.lazy_load("tdengine", "taosd_user")

    @property
    def tdengine_password(self):
        return self.lazy_load("tdengine", "taosd_passwd")

    # ---- kafka / mqtt ----------------------------------------------------
    @property
    def kafka_servers(self):
        return self.lazy_load("kafka_servers")

    @property
    def mqtt_host(self):
        return self.lazy_load("mqtt", "host")

    @property
    def mqtt_port(self):
        return self.lazy_load("mqtt", "port")

    @property
    def mqtt_user_name(self):
        return self.lazy_load("mqtt", "mqtt_user_name")

    @property
    def get_mqtt_pwd_encry_algo(self):
        return self.lazy_load('mqtt', 'encry_algo')

    @property
    def get_mqtt_host_encry_algo(self):
        return self.lazy_load('mqtt', 'host_type')

    # ---- web server ------------------------------------------------------
    @property
    def web_host(self):
        return self.lazy_load("servers", "web", "host")

    @property
    def web_port(self):
        return self.lazy_load("servers", "web", "port")

    # ---- qk API encryption -----------------------------------------------
    @property
    def qk_app_id(self):
        return self.lazy_load("qk_apis_encrypt", "AppID")

    @property
    def qk_app_scr(self):
        return self.lazy_load("qk_apis_encrypt", "AppSecret")

    @property
    def qk_token_key(self):
        return self.lazy_load("qk_apis_encrypt", "token_key")

    @property
    def qk_aes_key(self):
        return self.lazy_load("qk_apis_encrypt", "AESKey")

    @property
    def qk_aes_iv(self):
        return self.lazy_load("qk_apis_encrypt", "AESIV")

    @property
    def qk_token_exp(self):
        return self.lazy_load("qk_apis_encrypt", "TokenExpTime")

    # ---- third-party API encryption --------------------------------------
    @property
    def app_id_3rd(self):
        return self.lazy_load("encrypt_3rd", "AppID")

    @property
    def app_scr_3rd(self):
        return self.lazy_load("encrypt_3rd", "AppSecret")

    @property
    def aes_key_3rd(self):
        return self.lazy_load("encrypt_3rd", "AESKey")

    @property
    def aes_iv_3rd(self):
        return self.lazy_load("encrypt_3rd", "AESIV")

    @property
    def token_exp_3rd(self):
        return self.lazy_load("encrypt_3rd", "TokenExpTime")

    @property
    def token_key_3rd(self):
        return self.lazy_load("encrypt_3rd", "token_key")

    # ---- tidb ------------------------------------------------------------
    @property
    def tidb_host(self):
        return self.lazy_load("tidb", "host")

    @property
    def tidb_port(self):
        return self.lazy_load("tidb", "port")

    @property
    def tidb_server_user(self):
        return self.lazy_load("tidb", "user")

    @property
    def tidb_passwd(self):
        return self.lazy_load("tidb", "password")

    @property
    def tidb_default_db(self):
        return self.lazy_load("tidb", "db")

    @property
    def tidb_charset(self):
        return self.lazy_load("tidb", "charset")

    @property
    def stb_url(self):
        return self.lazy_load("stb_url")


# Module-level singleton shared across the project.
SETTING = Setting()
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 18:02
"""
# httpx_util.py —— 专为 httpx==0.11.1 编写
import asyncio
import logging
from enum import Enum
from typing import Optional, Tuple
import httpx
logger = logging.getLogger(__name__)
class HttpMethod(str, Enum):
    """Supported HTTP verbs; the str mixin makes members compare equal to
    their plain-string names."""

    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    DELETE = 'DELETE'
class HttpxClient:
    """
    Singleton wrapper around httpx.AsyncClient, adapted to httpx==0.11.1.
    Core design:
    1. The singleton guarantees a single connection pool per process
    2. An _initialized flag replaces is_closed (not available in 0.11.1)
    3. The asyncio lock is created lazily but deterministically, avoiding races
    4. Supports async-with syntax for lifecycle management
    """
    _instance: Optional["HttpxClient"] = None
    _client: Optional[httpx.AsyncClient] = None
    _lock: asyncio.Lock = None  # created lazily on first use
    _initialized: bool = False  # state flag replacing is_closed

    def __new__(cls) -> "HttpxClient":
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def _get_lock(cls) -> asyncio.Lock:
        """
        Safely obtain the lock instance.
        Note: asyncio.Lock() must be created after the event loop has started,
        hence the lazy initialization; the double check guarantees it is only
        created once.
        """
        if cls._lock is None:
            # First call happens inside the event loop; safe in a
            # single-threaded asyncio environment.
            cls._lock = asyncio.Lock()
        return cls._lock

    async def _ensure_client(self) -> None:
        """Ensure the client is initialized, using double-checked locking."""
        # Fast path: already initialized, return immediately (lock-free).
        if self._initialized and self._client is not None:
            return
        # Slow path: re-check after acquiring the lock.
        async with self._get_lock():
            if self._initialized and self._client is not None:
                return
            logger.info("正在初始化 HttpxClient 连接池 (httpx==0.11.1)...")
            try:
                # Correct API for httpx 0.11.1:
                # - Timeout takes a single float or keyword arguments
                # - the pool is controlled via pool_limits, class PoolLimits
                # - parameter names are hard_limit / soft_limit
                self.__class__._client = httpx.AsyncClient(
                    timeout=httpx.Timeout(
                        timeout=30.0,  # overall timeout
                        connect_timeout=10.0  # connect timeout (0.11.1 name)
                    ),
                    pool_limits=httpx.PoolLimits(
                        hard_limit=50,  # max connections
                        soft_limit=10  # max idle connections
                    ),
                    verify=False,  # skip SSL verification (set True to verify)
                )
                self.__class__._initialized = True
                logger.info("HttpxClient 连接池初始化成功")
            except Exception as e:
                logger.error(f"HttpxClient 初始化失败: {e}")
                raise

    async def request(
            self,
            method: HttpMethod,
            url: str,
            **kwargs
    ) -> httpx.Response:
        """Perform an HTTP request with unified timeout/connection handling."""
        await self._ensure_client()
        method_str = method.value if isinstance(method, HttpMethod) else str(method).upper()
        try:
            response = await self._client.request(method_str, url, **kwargs)
            return response
        except httpx.TimeoutException as e:
            logger.error(f"请求超时: {method_str} {url} - {e}")
            raise TimeoutError(f"请求超时: {url}") from e
        except httpx.HTTPError as e:
            logger.error(f"HTTP 请求失败: {method_str} {url} - {e}")
            raise ConnectionError(f"请求失败: {url}") from e
        except Exception as e:
            logger.error(f"未知请求异常: {method_str} {url} - {e}")
            raise

    async def get(self, url: str, **kwargs) -> Tuple[str, int]:
        """GET request; returns (response text, status code)."""
        resp = await self.request(HttpMethod.GET, url, **kwargs)
        return resp.text, resp.status_code

    async def post(self, url: str, **kwargs) -> Tuple[str, int]:
        """POST request; returns (response text, status code)."""
        resp = await self.request(HttpMethod.POST, url, **kwargs)
        return resp.text, resp.status_code

    async def __aenter__(self) -> "HttpxClient":
        await self._ensure_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    @classmethod
    async def close(cls) -> None:
        """Safely close the pool, releasing all underlying TCP connections."""
        async with cls._get_lock():
            if cls._client is not None and cls._initialized:
                try:
                    # 0.11.1 uses aclose(); fall back to close() if absent.
                    close_fn = getattr(cls._client, "aclose", None)
                    if close_fn is None:
                        close_fn = getattr(cls._client, "close", None)
                    if close_fn is not None:
                        result = close_fn()
                        # Accept both sync and async close implementations.
                        if asyncio.iscoroutine(result):
                            await result
                except Exception as e:
                    logger.warning(f"关闭 HttpxClient 时出现异常(可忽略): {e}")
                finally:
                    cls._client = None
                    cls._initialized = False
                    logger.info("HttpxClient 已关闭")
\ No newline at end of file
# -*- coding:utf-8 -*-
"""
DATE:2026/2/3 17:14
"""
import asyncio
from typing import Optional, Tuple
from enum import Enum
import httpx
from infra.logger.logger import Logger
# Module logger: writes to <base_path>/infra/httpx_util.log.
log_name = "httpx_util"
# FIX: the log file previously ended in ".logger"; every other module in this
# project uses the ".log" extension. Also dropped no-placeholder f-strings.
Logger.init_logger_path("./infra", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
class HttpMethod(str, Enum):
    """HTTP verb names, usable anywhere a plain method string is expected."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
class HttpxClient:
    """Singleton httpx.AsyncClient wrapper; version pinned to 0.11.1 for Sanic
    compatibility."""
    _instance: Optional['HttpxClient'] = None
    _client: Optional[httpx.AsyncClient] = None
    _lock: Optional[asyncio.Lock] = None  # created lazily inside the loop
    _initialized: bool = False  # replaces is_closed (absent in 0.11.1)

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def get_lock(cls) -> asyncio.Lock:
        # Lazily create the lock so it is bound to the running event loop.
        if cls._lock is None:
            cls._lock = asyncio.Lock()
        return cls._lock

    @classmethod
    async def _ensure_client(cls):
        """Create the shared client once (double-checked locking)."""
        # Fast path: lock-free check.
        if cls._initialized and cls._client is not None:
            return
        async with cls.get_lock():
            # Re-check under the lock.
            if cls._initialized and cls._client is not None:
                return
            try:
                # NOTE(review): http2=True requires the `h2` extra
                # (httpx[http2]) — confirm it is in the requirements.
                cls._client = httpx.AsyncClient(
                    timeout=httpx.Timeout(timeout=30.0, connect_timeout=10.0),
                    pool_limits=httpx.PoolLimits(
                        hard_limit=100, soft_limit=20
                    ),
                    http2=True,
                    verify=False,  # SSL verification disabled by default
                )
                cls._initialized = True
                logger.info("HttpxClient initialized successful")
            except Exception as e:
                logger.error(f"Failed to init HttpxClient: {e}")
                raise

    async def request(self, method, url, **kwargs) -> httpx.Response:
        """Perform a request; raises TimeoutError/ConnectionError on failure."""
        await self._ensure_client()
        method = method.value if isinstance(method, HttpMethod) else str(
            method).upper()
        try:
            return await self._client.request(method, url, **kwargs)
        except httpx.TimeoutException as e:
            logger.error(f"Request timeout: {method} {url}")
            raise TimeoutError(f"Request timeout: {url}") from e
        except httpx.HTTPError as e:
            logger.error(f"Request failed: {method} {url} - {e}")
            raise ConnectionError(f"Request failed: {url}") from e
        except Exception as e:
            logger.error(f"Unknown Request Exc, {method} {url}, {e}")
            raise

    async def get(self, url: str, **kwargs) -> Tuple[str, int]:
        # Returns (response text, status code).
        resp = await self.request(HttpMethod.GET, url, **kwargs)
        return resp.text, resp.status_code

    async def post(self, url: str, **kwargs) -> Tuple[str, int]:
        # Returns (response text, status code).
        resp = await self.request(HttpMethod.POST, url, **kwargs)
        return resp.text, resp.status_code

    async def __aenter__(self) -> "HttpxClient":
        await self._ensure_client()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        await self.close()

    @classmethod
    async def close(cls):
        """Close the shared client and reset the singleton state."""
        async with cls.get_lock():
            if cls._client is not None and cls._initialized:
                try:
                    # Prefer aclose() (0.11.1); fall back to close() if absent.
                    close_fn = getattr(cls._client, "aclose", None)
                    if close_fn is None:
                        close_fn = getattr(cls._client, "close", None)
                    if close_fn is not None:
                        result = close_fn()
                        # Accept both sync and async close implementations.
                        if asyncio.iscoroutine(result):
                            await result
                except Exception as e:
                    logger.warning(f"Close HttpxClient Exc, {e}")
                finally:
                    cls._client = None
                    cls._initialized = False
                    logger.info("HttpxClient Closed")
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 18:03
"""
# -*- coding:utf-8 -*-
"""
DATE:2022/2/16 13:49
"""
import logging
import os
import sys
from logging.handlers import RotatingFileHandler
from typing import Dict
from infra.config.settings import SETTING
class ErrorFilter(logging.Filter):
    """Pass only records at WARNING level or above (feeds the error.log handler)."""

    def filter(self, record):
        return record.levelno >= logging.WARNING
class Logger(object):
    """ Abstract python stand logging module for power iot.
    All logging messages are written to file. And default log file size is 3
    and default backupCount is 3.
    Default logger name is "root", and we combine system_config module in here
    to get the base log path.
    Every logger instance has its own path config, store in dict `log_path`.
    # NOTE: must call init_logger_path before getLogger for every logger.
    """
    # Root directory for all log files (read from settings.yaml at import).
    base_path = SETTING.logger_base_path
    loggers = {}  # name -> already-configured logging.Logger instance
    log_path: Dict[str, str] = {}  # name -> full log file path
    __fmt = ("%(asctime)s %(levelname)s %(filename)s-%(lineno)d "
             "pid-%(process)d: %(message)s")
    __console_fmt = ('%(asctime)s %(levelname)s File "%(pathname)s", line '
                     '%(lineno)d p-%(process)d-t-%(thread)d: %(message)s')
    __date_fmt = "%Y-%m-%d %H:%M:%S"
    formatter = logging.Formatter(fmt=__fmt, datefmt=__date_fmt)
    console_formatter = logging.Formatter(__console_fmt, __date_fmt)

    @classmethod
    def init_logger_path(cls, relative_dir, file_name, name='root'):
        """ Init the log file path.
        The log path is generated by base_path + relative_dir + file_name.
        :param relative_dir: the relative directories part of the log file
        :param file_name: the file name part of the log file
        :param name: logger name, default is 'root'
        # NOTE: the default logger is a logger named 'root', not the
        root logger, the root logger can not get through
        logging.getLogger('root'), only logging.getLogger() can get the root
        logger, namely:
        The root logger's name is 'root', but you can't access the root logger
        by name.
        """
        path_dir = os.path.join(cls.base_path, relative_dir)
        if not os.path.exists(path_dir):
            os.makedirs(path_dir, mode=0o755, exist_ok=True)
        path = os.path.join(path_dir, file_name)
        cls.log_path[name] = path

    @classmethod
    def update_logger_filename(cls, file_name, name='root'):
        """ Update the file name part of logger `name`. Used by command line
        parameter to overwrite the logger path set by `init_logger_path`.
        :param file_name: the file name part of logger `name`
        :param name: logger name, default is 'root'
        # NOTE: only update the file name part, the relative dir part keeps
        the same after calling init_logger_path.
        # NOTE: default logger is a logger named 'root', not the root logger
        """
        lg = cls.loggers.pop(name)
        lg.handlers = []  # drop old handlers so files are reopened at the new path
        path = cls.log_path.pop(name)
        # NOTE(review): '/'-based splitting assumes POSIX paths; switch to
        # os.path if Windows support is ever needed.
        pl = path.split('/')
        pl.pop()
        pl.append(file_name)
        path = '/'.join(pl)
        cls.log_path[name] = path
        return cls.getLogger(name)

    @classmethod
    def getLogger(cls, name="root"):
        """ Mimic the stand logging module's getLogger method.
        :param name: get the logger named `name`, default is 'root'
        # NOTE: default logger is a logger named 'root', not the root logger
        """
        if cls.loggers.get(name) is None:
            lg = logging.getLogger(name)
            lg.setLevel(logging.INFO)
            # KeyError here means init_logger_path was not called for `name`.
            log_path: str = cls.log_path[name]
            handler = RotatingFileHandler(
                log_path, mode='a', maxBytes=100 * 1024 * 1024, backupCount=3)
            handler.setFormatter(cls.formatter)
            lg.addHandler(handler)
            # console = logging.StreamHandler(sys.stdout)
            # console.setFormatter(cls.console_formatter)
            # lg.addHandler(console)
            # Duplicate lines are expected: stderr is kept mainly for its
            # color highlighting. The logger methods could later be wrapped
            # so different levels are treated differently.
            err = logging.StreamHandler(sys.stderr)
            err.setFormatter(cls.console_formatter)
            err.setLevel(logging.ERROR)
            lg.addHandler(err)
            # Errors (WARNING and above, via ErrorFilter) are additionally
            # written to a separate error.log in the same directory.
            err_path_list = log_path.split("/")
            err_path_list.pop()
            err_log_path = "/".join(err_path_list)
            err_file = os.path.join(err_log_path, "error.log")
            err_handler = RotatingFileHandler(err_file, mode='a',
                                              maxBytes=100 * 1024 * 1024,
                                              backupCount=3)
            err_handler.addFilter(ErrorFilter())
            err_handler.setFormatter(cls.formatter)
            lg.addHandler(err_handler)
            lg.propagate = False  # stop passing events to higher level loggers
            cls.loggers[name] = lg
        return logging.getLogger(name)
if __name__ == '__main__':
    # Manual smoke test: creates <base_path>/power/power_1day.log.
    Logger.init_logger_path("./power", "power_1day.log", "power_1day")
    logger = Logger.getLogger("power_1day")
    print(logger)
\ No newline at end of file
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 18:02
"""
# -*- coding:utf-8 -*-
"""
DATE:2025/6/20 13:36
"""
import sys
import asyncio
import aiomysql
from aiomysql import DictCursor, Cursor
from infra.config.settings import SETTING
from infra.logger import Logger
log_name = f"aiomysql_util"
Logger.init_logger_path(f"./common_util", f"{log_name}.log", log_name)
logger = Logger.getLogger(log_name)
class MysqlUtil(object):
    """Async MySQL helper over aiomysql, used as
    `async with MysqlUtil(db) as conn: ...`. One shared pool per (db, echo).
    """
    __pools = {}  # key: (db, echo) -> aiomysql pool
    _pool_lock = None  # asyncio.Lock, created lazily inside the event loop

    def __init__(self, db=SETTING.mysql_db, echo=False):
        # NOTE(review): the default `db` is evaluated once at import time.
        self.conn_manager = None  # pool.acquire() context manager
        self.conn = None  # live connection while inside the async context
        self.db = db
        self.echo = echo

    @classmethod
    def _get_lock(cls):
        # Lazily create the lock so it binds to the running event loop.
        if cls._pool_lock is None:
            cls._pool_lock = asyncio.Lock()
        return cls._pool_lock

    @classmethod
    async def init_pool(cls):
        """Eagerly create the default pool (call once at service startup)."""
        await cls.get_pool(db=SETTING.mysql_db, echo=False)
        logger.info("Default Mysql pool initialized successfully.")

    @classmethod
    async def get_pool(cls, db, echo):
        """Return a live pool for (db, echo), creating it under a lock if needed."""
        pool_key = (db, echo)
        pool = cls.__pools.get(pool_key)
        if pool and not cls.is_pool_closed(pool):
            return pool
        async with cls._get_lock():
            # Re-check under the lock (double-checked locking).
            pool = cls.__pools.get(pool_key)
            if not pool or cls.is_pool_closed(pool):
                try:
                    pool = await aiomysql.create_pool(
                        host=SETTING.mysql_host,
                        port=SETTING.mysql_port,
                        user=SETTING.mysql_user,
                        password=SETTING.mysql_password,
                        db=db,
                        autocommit=True,
                        cursorclass=DictCursor,
                        use_unicode=True,
                        echo=echo)
                    cls.__pools[pool_key] = pool
                    logger.info(f"Created MySQL pool for db={db}, echo={echo}")
                except Exception as e:
                    logger.error(f"Failed to create MySQL pool: {e}")
                    raise
        return pool

    @classmethod
    def is_pool_closed(cls, pool):
        # Tolerant across aiomysql versions: prefer .closed, fall back to
        # the private _closed attribute; treat anything unknown as closed.
        if pool is None:
            return True
        try:
            return pool.closed
        except AttributeError:
            return getattr(pool, "_closed", True)
        except Exception as e:
            logger.exception(e)
            return True

    async def __aenter__(self):
        pool = await self.get_pool(self.db, self.echo)
        self.conn_manager = pool.acquire()
        try:
            self.conn = await self.conn_manager.__aenter__()
            return self
        except Exception:
            # Release the acquisition if entering the connection context failed.
            await self.conn_manager.__aexit__(*sys.exc_info())
            raise

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn_manager:
            await self.conn_manager.__aexit__(exc_type, exc_val, exc_tb)

    async def fetchone(self, sql, args=None):
        """Run `sql` and return the first row as a dict (DictCursor), or None."""
        async with self.conn.cursor() as cur:
            await cur.execute(sql, args)
            return await cur.fetchone()

    async def fetch_value(self, sql, args=None):
        """Return the first column of the first row, or None when no rows.

        Uses a plain tuple Cursor so [0] indexing works; the broad except is
        deliberate best-effort behaviour.
        """
        async with self.conn.cursor(Cursor) as cur:
            await cur.execute(sql, args)
            try:
                return (await cur.fetchone())[0]
            except Exception:
                return None

    async def fetchall(self, sql, args=None):
        """Run `sql` and return all rows as a list of dicts."""
        async with self.conn.cursor() as cur:
            await cur.execute(sql, args)
            return await cur.fetchall()

    async def execute(self, sql, args=None):
        """Run a statement and return the affected row count."""
        async with self.conn.cursor() as cur:
            return await cur.execute(sql, args)

    async def insert_one(self, sql, args=None, return_auto_increment_id=False):
        """Insert a row; optionally return the AUTO_INCREMENT id instead of
        the affected row count."""
        async with self.conn.cursor() as cur:
            result = await cur.execute(sql, args)
            if return_auto_increment_id:
                return cur.lastrowid
            else:
                return result

    async def insert_many(self, sql, args=None):
        """executemany wrapper; returns the affected row count."""
        async with self.conn.cursor() as cur:
            return await cur.executemany(sql, args)

    @classmethod
    async def close_all_pools(cls):
        """Close every cached pool and forget them (call on shutdown)."""
        for pool in cls.__pools.values():
            pool.close()
            await pool.wait_closed()
        cls.__pools.clear()
async def main():
    """Manual smoke test: count rows in `location` inside the power_iot db."""
    async with MysqlUtil("power_iot") as conn:
        result = await conn.fetch_value('select count(*) from location ')
        print(result)


if __name__ == '__main__':
    # FIX: asyncio.run replaces the deprecated get_event_loop().
    # run_until_complete pattern (get_event_loop is deprecated when no loop
    # is running since Python 3.10); asyncio is already imported above.
    asyncio.run(main())
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 15:15
"""
# -*- coding:utf-8 -*-
"""
DATE:2026/4/9 15:33
"""
# Operator identity appId, operator secret app_scr, token_key.
# NOTE(review): credentials are hard-coded in source; consider moving them to
# settings.yaml (already git-ignored) or environment variables.
USER_AES = {
    "WATER_GRP": {
        "AppID": "b4e09d947ccd4695b67f4803d05e5b81",
        "AppSecret": "OWM4N2M5NzBkOWRiMDY5NjU0MTZjZjUzZDVjM2ExMDM="
    },
}
# -*- coding:utf-8 -*-
"""
DATE:2024/4/29 16:26
"""
from gmssl import sm3
import time
import base64
import hmac
from infra.config.settings import SETTING
from utils.constants import USER_AES
def gen_sign(app_id, app_scr):
    """Return the SM3 hash of app_id concatenated with app_scr."""
    payload = f"{app_id}{app_scr}".encode("utf-8")
    return sm3.sm3_hash(list(payload))
def gen_user_sign(user):
    """Return the SM3 signature for a known USER_AES user, or "" if unknown."""
    creds = USER_AES.get(user)
    if creds is None:
        return ""
    return gen_sign(creds["AppID"], creds["AppSecret"])
def gen_token(tkey, expire=None):
    """Generate a urlsafe-base64 token of the form "<expiry_ts>:<hmac_sha1>".

    :param tkey: HMAC key string
    :param expire: token lifetime in seconds; defaults to
        SETTING.token_exp_3rd. (FIX: previously the default was evaluated
        once at import time, so config changes were never picked up; it is
        now read per call.)
    :return: urlsafe base64 token string
    """
    if expire is None:
        expire = SETTING.token_exp_3rd
    exp_str = str(int(time.time()) + expire)
    exp_bytes = exp_str.encode("utf-8")
    key_sha = hmac.new(tkey.encode("utf-8"), exp_bytes, "sha1").hexdigest()
    token = exp_str + ":" + key_sha
    b64_token = base64.urlsafe_b64encode(token.encode("utf-8"))
    return b64_token.decode("utf-8")
def sm3_encrypt(msg_bytes: bytes):
    """Return the SM3 digest of `msg_bytes` (sm3_hash expects a list of ints)."""
    return sm3.sm3_hash(list(msg_bytes))
def load_public_params(app_id):
    """Build the public auth parameter set (sign + access token) for `app_id`.

    NOTE(review): the USER_AES entries visible in this file only carry
    "AppID"/"AppSecret" — the "token_key" lookup below would raise KeyError
    for them; confirm the intended USER_AES schema. Also note `app_id` is used
    as the USER_AES key here while gen_user_sign indexes by user name — verify
    both call sites agree.
    """
    app_scr = USER_AES[app_id]["AppSecret"]
    token_key = USER_AES[app_id]["token_key"]
    msg_bytes = f"{app_id}{app_scr}".encode("utf-8")
    sign = sm3_encrypt(msg_bytes)
    access_token = gen_token(token_key)
    return dict(app_id=app_id, app_scr=app_scr, sign=sign, tkey=token_key,
                access_token=access_token)
async def load_auth_info(app_id):
    """Build the full auth parameter set (incl. AES material) for `app_id`.

    NOTE(review): `load_operator_info` is neither defined nor imported in the
    visible source — calling this coroutine as-is raises NameError; confirm
    the missing import.
    """
    opr_info = await load_operator_info(app_id)
    app_scr = opr_info["opr_secret"]
    AES_key = opr_info["data_secret"]
    AES_iv = opr_info["data_secret_iv"]
    token_key = opr_info["token_key"]
    msg_bytes = f"{app_id}{app_scr}".encode("utf-8")
    sign = sm3_encrypt(msg_bytes)
    access_token = gen_token(token_key)
    return dict(app_id=app_id, app_scr=app_scr, sign=sign, tkey=token_key,
                AES_key=AES_key, AES_iv=AES_iv, access_token=access_token)
def verify_token(opr_info, token):
    """Validate a token produced by gen_token against opr_info["token_key"].

    :param opr_info: dict carrying the HMAC key under "token_key"
    :param token: urlsafe base64 string "<expiry_ts>:<hmac_sha1>"
    :return: True only when the token is well-formed, not expired and its
        HMAC matches; False otherwise.

    FIXES vs. original: malformed input (invalid base64, non-UTF8 bytes or a
    non-numeric expiry) used to raise instead of returning False; the digest
    comparison now uses hmac.compare_digest to avoid timing leaks.
    """
    try:
        token_str = base64.urlsafe_b64decode(token).decode("utf-8")
    except (ValueError, UnicodeDecodeError):
        # binascii.Error is a ValueError subclass.
        return False
    token_lst = token_str.split(":")
    if len(token_lst) != 2:
        return False
    exp_str, token_sha = token_lst
    try:
        if float(exp_str) < time.time():
            return False
    except ValueError:
        return False
    tkey = opr_info["token_key"]
    key_sha = hmac.new(tkey.encode("utf-8"), exp_str.encode("utf-8"), "sha1")
    # Constant-time comparison of the expected and presented digests.
    return hmac.compare_digest(key_sha.hexdigest(), token_sha)
# -*- coding:utf-8 -*-
"""
DATE:2024/4/30 13:44
"""
import pendulum
CST = "Asia/Shanghai"
YMD = "YYYYMMDD"
YM = "YYYYMM"
YMD_Hm = "YYYY-MM-DD HH:mm"
YMD_Hms = "YYYY-MM-DD HH:mm:ss"
def dt2timestamp(dt, dt_fmt):
    """Parse `dt` (anything str()-able) with `dt_fmt` in CST; return a unix timestamp."""
    parsed = pendulum.from_format(str(dt), dt_fmt, tz=CST)
    return parsed.int_timestamp
def timestamp2dts(timestamp, dt_fmt):
    """Format a unix timestamp as a CST datetime string using `dt_fmt`."""
    return pendulum.from_timestamp(timestamp, tz=CST).format(dt_fmt)
def gen_cur_hour():
    """Return the current CST time truncated down to the whole hour."""
    secs_per_hour = 60 * 60
    now_ts = pendulum.now(tz=CST).int_timestamp
    floored = now_ts - now_ts % secs_per_hour
    return pendulum.from_timestamp(floored, tz=CST)
def gen_cur_month_dt():
    """Return midnight on the first day of the current CST month."""
    today = pendulum.now(tz=CST)
    return pendulum.datetime(today.year, today.month, 1, tz=CST)
def gen_future_day_dt(days):
    """Return CST midnight `days` days from today (negative values go back)."""
    current = pendulum.now(tz=CST)
    midnight = pendulum.datetime(current.year, current.month, current.day, tz=CST)
    return midnight.add(days=days)
\ No newline at end of file
# -*- coding:utf-8 -*-
"""
DATE:2026/4/14 17:23
"""
import os
import socket
import hashlib
from infra.config.settings import SETTING
from infra.logger.logger import Logger
Logger.init_logger_path("./utils", "utils.log", "utils")
logger = Logger.getLogger("utils")
def admin_client_id(uid):
    """Build a per-process MQTT client id:
    /eems/admin/<uid>@<local-ip-with-underscores>_<pid>."""
    ip_tag = socket.gethostbyname(socket.gethostname()).replace(".", "_")
    return f"/eems/admin/{uid}@{ip_tag}_{os.getpid()}"
def mqtt_pwd(username):
    """Derive the MQTT password from `username` per the configured algorithm.

    Reads SETTING.get_mqtt_pwd_encry_algo; on any settings error falls back
    to "sha1" (logged). "plain" returns the username unchanged.
    GENERALIZED: previously any value other than "plain" silently produced
    sha1; now the configured hashlib algorithm name is honoured, with sha1
    kept as the fallback for unknown names (backward compatible for the
    existing "sha1"/"plain" configs).
    """
    encry_algo = "sha1"
    try:
        encry_algo = SETTING.get_mqtt_pwd_encry_algo
    except Exception as e:
        logger.exception(e)
    if encry_algo == "plain":
        return username
    try:
        return hashlib.new(encry_algo, username.encode("utf-8")).hexdigest()
    except (TypeError, ValueError):
        # Unknown/invalid algorithm name: keep the historical sha1 behaviour.
        return hashlib.sha1(username.encode("utf-8")).hexdigest()
if __name__ == '__main__':
    # Manual smoke test of the helpers above.
    print(admin_client_id("admin"))
    print(mqtt_pwd("pot_emqx_super"))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment