Commit 99c71089 authored by ZZH's avatar ZZH

change real time ele qua load from redis 2023-6-25

parent 5be6d7c4
# -*- coding:utf-8 -*-
#
# Author:jing
# Date: 2020/7/9
import json
from dataclasses import fields
import pendulum
from pot_libs.settings import SETTING
from unify_api.constants import CST
from unify_api.modules.common.procedures.points import get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from pot_libs.sanic_api import summary, description, examples
from pot_libs.logger import log
from pot_libs.utils.exc_util import ParamException, BusinessException
from unify_api.modules.common.components.common_cps import CidPointsReq
from unify_api.modules.common.procedures import health_score
from unify_api.modules.electric.procedures.electric_pds import \
trans_electric_tdengine_data
from unify_api.modules.electric.service.electric_service import (
elec_current_storeys_service, qual_current_storeys_service,
elec_card_level_service, qual_current_level_service, elec_index_service,
elec_current_service, get_sdu_i_and_u
elec_current_service
)
from unify_api.utils import time_format
from unify_api import constants
from unify_api.modules.electric.procedures.electric_util import (
load_point_ctnum, add_random_change,
load_point_ctnum, add_random_change
)
from pot_libs.common.components.query import PageRequest
from unify_api.modules.electric.components.electric import (
ElecHistoryResponse,
......@@ -56,10 +40,6 @@ from unify_api.utils.request_util import filed_value_from_list
from unify_api.modules.electric.dao.electric_dao import (
get_qual_history_dao, get_elec_history_dao
)
from unify_api.utils.taos_new import parse_td_columns
METERDATA_CURRENT_KEY = "meterdata_current"
METERDATA_CURRENT_HR_KEY = "meterdata_hr_current"
@summary("用电监测-实时监测-历史曲线")
......@@ -383,87 +363,48 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
try:
point_id = filed_value_from_list(body.filter.equals, "point_id")
cid = req.json["cid"]
except:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
message="param exception, equals is NULL, no point_id")
except Exception as e:
log.warning(f"param exception, equals is NULL, no pid, e:{e}")
raise ParamException(message="param exception, equals is NULL, no pid")
if point_id <= 0 or not point_id or cid <= 0:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
message="param exception, equals is NULL, no point_id")
# 获取mtid
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 过期时间
last_ts = pendulum.now(tz=CST).subtract(
seconds=constants.REAL_EXP_TIME).format("YYYY-MM-DD HH:mm:ss")
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric?tz={CST}"
sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_ts}'"
hr_sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_ts}' " \
f"and harmonic is not null "
is_succ, results = await get_td_engine_data(url, sql)
is_succ2, results2 = await get_td_engine_data(url, hr_sql)
if not is_succ:
raise BusinessException(message="数据查询失败!")
res = trans_electric_tdengine_data(results)
if is_succ2:
res2 = trans_electric_tdengine_data(results2)
harmonic = json.loads(res2.get("harmonic")) if res2.get(
"harmonic") else {}
else:
harmonic = json.loads(res.get("harmonic")) if res.get(
"harmonic") else {}
ctnum = res.get("ctnum") or 3
raise ParamException(message="param exception, equals is NULL, no pid")
try:
time_str, d_rt_ele = await elec_current_service(point_id)
if d_rt_ele is None:
d_rt_ele = {}
# 加些随机变化(防止数据一直不变化)
for k in res.keys():
res[k] = add_random_change(res[k])
for k, v in d_rt_ele.items():
if isinstance(v, (str, int)):
continue
voltage_harmonic_dict, current_harmonic_dict = {}, {}
d_rt_ele[k] = add_random_change(v)
except Exception as e:
log.error(f"post_qual_current service error:{str(e)}")
raise BusinessException(message=f"{str(e)}")
d_vol_harm, d_cur_harm = {}, {}
for k in [field.name for field in fields(VoltageHarmonicRate)]:
if k in harmonic.keys():
voltage_harmonic_dict[k] = harmonic.get(k) or ''
else:
voltage_harmonic_dict[k] = ''
voltage_harmonic = VoltageHarmonicRate(**voltage_harmonic_dict)
d_vol_harm[k] = d_rt_ele.get(k) or ""
voltage_harmonic = VoltageHarmonicRate(**d_vol_harm)
for k in [field.name for field in fields(CurrentHarmonicRate)]:
if k in harmonic.keys():
current_harmonic_dict[k] = harmonic.get(k) or ''
else:
current_harmonic_dict[k] = ''
current_harmonic = CurrentHarmonicRate(**current_harmonic_dict)
health_index = await health_score.load_health_index(cid, point_id)
ret_items = [
"ua_dev",
"ub_dev",
"uc_dev",
"uab_dev",
"ucb_dev",
"freq_dev",
"ubl",
"ibl",
"sdu_i",
"sdu_u",
]
d_cur_harm[k] = d_rt_ele.get(k) or ""
current_harmonic = CurrentHarmonicRate(**d_cur_harm)
# 识电U只有一项有数据,返回具体的项
res = get_sdu_i_and_u(res, ctnum)
if res.get("ts"):
time_str = str(res["ts"])[0:-4]
else:
time_str = time_format.get_datetime_str(0)
health_index = await health_score.load_health_index(cid, point_id)
ret_items = ["ua_dev", "ub_dev", "uc_dev", "uab_dev", "ucb_dev",
"freq_dev", "ubl", "ibl", "sdu_i", "sdu_u"]
return QualCurrentResponse(
ctnum=ctnum,
ctnum=d_rt_ele["ctnum"],
real_time=time_str,
health_index=health_index,
voltage_harmonic=voltage_harmonic,
current_harmonic=current_harmonic,
**{k: v for k, v in res.items() if k in ret_items},
**{k: v for k, v in d_rt_ele.items() if k in ret_items},
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment