Commit 5c6c1e29 authored by lcn's avatar lcn

bug修复

parent 354b291f
......@@ -20,3 +20,11 @@ async def elec_current_data(mtids, cid):
if data["mtid"] in mtids:
res_map[data["mtid"]] = data
return res_map
def trans_electric_tdengine_data(results):
head = parse_td_columns(results)
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
return res
......@@ -5,7 +5,11 @@
import time
import json
from dataclasses import fields
import pendulum
from pot_libs.settings import SETTING
from unify_api.constants import CST
from unify_api.modules.common.procedures.points import get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
......@@ -17,6 +21,8 @@ from pot_libs.utils.exc_util import (
)
from unify_api.modules.common.components.common_cps import CidPointsReq
from unify_api.modules.common.procedures import health_score
from unify_api.modules.electric.procedures.electric_pds import \
trans_electric_tdengine_data
from unify_api.modules.electric.service.electric_service import (
elec_current_storeys_service, qual_current_storeys_service,
elec_card_level_service, qual_current_level_service, elec_index_service,
......@@ -72,15 +78,15 @@ async def post_elec_history(req, body: PageRequest) -> ElecHistoryResponse:
except:
log.error("param error, ranges is NULL")
raise ParamException(message="param error, ranges is NULL")
try:
intervel, slots = time_format.time_pick_transf(date_start, date_end)
except:
log.error("param error, date format error")
raise ParamException(message="param error, date format error")
point_id = filed_value_from_list(body.filter.equals, "point_id")
if point_id <= 0 or not point_id:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
......@@ -193,16 +199,16 @@ async def post_elec_current(req, body: PageRequest) -> ElecCurrentResponse:
async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
point_id = filed_value_from_list(body.filter.equals, "point_id")
if point_id <= 0 or not point_id:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
raise ParamException(message=msg)
ctnum, mid = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum")
if not mid:
log.error(f"post_elec_current pid={point_id} 没有mid 装置已经被拆除")
# 拆了,没数据,返回空
......@@ -211,7 +217,7 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
now_ts = int(time.time())
real_tt = now_ts
res = None
result = await RedisUtils().hget(METERDATA_CURRENT_KEY, mid)
result2 = await RedisUtils().hget(METERDATA_CURRENT_HR_KEY, mid)
if result and result2:
......@@ -232,7 +238,7 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
log.info("1 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
elif result2:
res = json.loads(result2)
real_tt = res.get("timestamp")
......@@ -242,7 +248,7 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
log.info("2 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
elif result:
res = json.loads(result)
real_tt = res.get("timestamp")
......@@ -252,14 +258,14 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
log.info("3 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
else:
log.error("realtime_elec_qual not exist")
time_str = time_format.get_datetime_str(real_tt)
except Exception as e:
log.exception(e)
raise DBException
# 加些随机变化(防止数据一直不变化)
if res:
for k in res.keys():
......@@ -267,14 +273,14 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
else:
log.error("realtime_elec_qual not exist")
return ElecCurrentResponse()
if not ctnum:
# 如果已经拆掉了,还有报文,用报文中的ctnum
ctnum = res.get("ctnum") or 3
if "ctnum" in res:
del res["ctnum"]
# 识电U只有一项有数据,返回具体的项
res["sdu_i"] = None
res["sdu_u"] = None
......@@ -294,7 +300,7 @@ async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
res["sdu_u"] = "uc"
if ctnum == 2:
res["sdu_u"] = "ucb"
return ElecCurrentResponse(
ctnum=ctnum,
real_time=time_str,
......@@ -312,7 +318,7 @@ async def post_elec_index(req, body: PageRequest) -> ElecIndexResponse:
cid = req.json.get("cid")
# 1. 获取point_id
point_id = filed_value_from_list(body.filter.equals, "point_id")
if not point_id or point_id <= 0:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
......@@ -339,20 +345,20 @@ async def post_qual_history(req, body: PageRequest) -> QualHistoryResponse:
except:
log.error("param error, ranges is NULL")
raise ParamException(message="param error, ranges is NULL")
try:
intervel, slots = time_format.time_pick_transf(date_start, date_end)
except:
log.error("param error, date format error")
raise ParamException(message="param error, date format error")
point_id = filed_value_from_list(body.filter.equals, "point_id")
if not point_id or point_id <= 0:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
raise ParamException(message=msg)
return await qual_history_service(date_start, date_end, intervel, slots,
point_id)
......@@ -411,7 +417,7 @@ async def qual_history_service(start, end, intervel, slots, pid):
else:
for stats_item in stats_items:
elec_data[stats_item].append("")
voltage_dev = VoltageDev(
**{
k.rsplit("_", 1)[0]: v
......@@ -474,7 +480,7 @@ async def qual_history_service(start, end, intervel, slots, pid):
):
sdu_i = "ic"
sdu_u = "uc"
return QualHistoryResponse(
ctnum=ctnum,
voltage_dev=voltage_dev,
......@@ -499,7 +505,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
message="param exception, equals is NULL, no point_id")
if point_id <= 0 or not point_id or cid <= 0:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
......@@ -510,61 +516,73 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
raise BusinessException(
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 过期时间
last_ts = pendulum.now(tz=CST).subtract(
seconds=constants.REAL_EXP_TIME).format("YYYY-MM-DD HH:mm:ss")
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
url = f"{SETTING.stb_url}db_electric?tz={CST}"
sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_ts}'"
hr_sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_ts}' " \
f"and harmonic is not null "
is_succ, results = await get_td_engine_data(url, sql)
if is_succ:
head = parse_td_columns(results)
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
ctnum = res.get("ctnum") or 3
is_succ2, results2 = await get_td_engine_data(url, hr_sql)
if not is_succ:
raise BusinessException(message="数据查询失败!")
res = trans_electric_tdengine_data(results)
if is_succ2:
res2 = trans_electric_tdengine_data(results2)
harmonic = json.loads(res2.get("harmonic")) if res2.get(
"harmonic") else {}
else:
harmonic = json.loads(res.get("harmonic")) if res.get(
"harmonic") else {}
voltage_harmonic_dict, current_harmonic_dict = {}, {}
for k in [field.name for field in fields(VoltageHarmonicRate)]:
if k in harmonic.keys():
voltage_harmonic_dict[k] = harmonic.get(k) or ''
else:
voltage_harmonic_dict[k] = ''
voltage_harmonic = VoltageHarmonicRate(**voltage_harmonic_dict)
for k in [field.name for field in fields(CurrentHarmonicRate)]:
if k in harmonic.keys():
current_harmonic_dict[k] = harmonic.get(k) or ''
else:
current_harmonic_dict[k] = ''
current_harmonic = CurrentHarmonicRate(**current_harmonic_dict)
health_index = await health_score.load_health_index(cid, point_id)
ret_items = [
"ua_dev",
"ub_dev",
"uc_dev",
"uab_dev",
"ucb_dev",
"freq_dev",
"ubl",
"ibl",
"sdu_i",
"sdu_u",
]
# 识电U只有一项有数据,返回具体的项
res = get_sdu_i_and_u(res, ctnum)
if res.get("ts"):
time_str = str(res["ts"])[0:-4]
ctnum = res.get("ctnum") or 3
# 加些随机变化(防止数据一直不变化)
for k in res.keys():
res[k] = add_random_change(res[k])
voltage_harmonic_dict, current_harmonic_dict = {}, {}
for k in [field.name for field in fields(VoltageHarmonicRate)]:
if k in harmonic.keys():
voltage_harmonic_dict[k] = harmonic.get(k) or ''
else:
voltage_harmonic_dict[k] = ''
voltage_harmonic = VoltageHarmonicRate(**voltage_harmonic_dict)
for k in [field.name for field in fields(CurrentHarmonicRate)]:
if k in harmonic.keys():
current_harmonic_dict[k] = harmonic.get(k) or ''
else:
time_str = time_format.get_datetime_str(0)
return QualCurrentResponse(
ctnum=ctnum,
real_time=time_str,
health_index=health_index,
voltage_harmonic=voltage_harmonic,
current_harmonic=current_harmonic,
**{k: v for k, v in res.items() if k in ret_items},
)
current_harmonic_dict[k] = ''
current_harmonic = CurrentHarmonicRate(**current_harmonic_dict)
health_index = await health_score.load_health_index(cid, point_id)
ret_items = [
"ua_dev",
"ub_dev",
"uc_dev",
"uab_dev",
"ucb_dev",
"freq_dev",
"ubl",
"ibl",
"sdu_i",
"sdu_u",
]
# 识电U只有一项有数据,返回具体的项
res = get_sdu_i_and_u(res, ctnum)
if res.get("ts"):
time_str = str(res["ts"])[0:-4]
else:
raise BusinessException(message="数据查询失败!")
time_str = time_format.get_datetime_str(0)
return QualCurrentResponse(
ctnum=ctnum,
real_time=time_str,
health_index=health_index,
voltage_harmonic=voltage_harmonic,
current_harmonic=current_harmonic,
**{k: v for k, v in res.items() if k in ret_items},
)
@summary("用电监测-实时监测-楼层")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment