Commit 5be6d7c4 authored by ZZH's avatar ZZH

change real time electric load from redis 2023-6-25

parent 27de7a5d
import json
import pandas as pd
import pendulum
from pot_libs.settings import SETTING
from pot_libs.logger import log
from pot_libs.utils.exc_util import BusinessException
from pot_libs.aredis_util.aredis_utils import RedisUtils
from unify_api.constants import (
POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, LF_THRESHOLD,
THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
)
from unify_api.modules.common.procedures.points import points_by_storeys, \
get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points, get_electric_datas_dao
from unify_api.modules.electric.procedures.electric_pds import \
......@@ -18,13 +20,13 @@ from unify_api.utils.common_utils import round_2, round_4, multiplication_two
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type, load_point_ctnum
from datetime import datetime
from unify_api.constants import REAL_EXP_TIME
from unify_api.modules.common.procedures.location_temp_rcurrent import \
location_stats_statics
from unify_api.modules.electric.components.electric import (
ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)
from unify_api.utils.taos_new import parse_td_columns
from unify_api.utils.time_format import get_15min_ago
from unify_api.utils.time_format import CST, YMD_Hms, timestamp2dts
async def elec_current_storeys_service(storeys):
......@@ -33,7 +35,7 @@ async def elec_current_storeys_service(storeys):
point_list = await points_by_storeys(storeys)
# mtids
mtids = [i.get("mtid") for i in point_list]
cid = point_list[0]['cid'] if len(point_list) > 0 else 0
cid = point_list[0]["cid"] if len(point_list) > 0 else 0
# 2.获取mid, ctnum
# point_mid = await batch_get_wiring_type(points)
# # 3. 获取redis数据
......@@ -70,8 +72,8 @@ async def elec_current_storeys_service(storeys):
"freq": round_2(res[mtid].get("freq")),
"costtl": round_2(res[mtid].get("costtl")),
"lf": round_2(res[mtid].get("lf")),
"sdu_i": res[mtid].get('sdu_i'),
"sdu_u": res[mtid].get('sdu_u'),
"sdu_i": res[mtid].get("sdu_i"),
"sdu_u": res[mtid].get("sdu_u"),
}
# redis数据过期,或者没有数据
else:
......@@ -124,7 +126,7 @@ async def qual_current_storeys_service(storeys):
# # 3. 获取redis数据
# res = await qual_current_data(point_mid)
mtids = [point["mtid"] for point in point_list if point["mtid"]]
cid = point_list[0]['cid'] if len(point_list) > 0 else 0
cid = point_list[0]["cid"] if len(point_list) > 0 else 0
res = await elec_current_data(mtids, cid)
# 4. 返回数据
qual_data = {}
......@@ -208,7 +210,7 @@ async def elec_card_level_service(point_list):
# res_redis = await elec_current_data(point_mid)
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
cid = monitor_point_list[0]["cid"] if len(monitor_point_list) > 0 else 0
results = await elec_current_data(mtids, cid)
# 4. 返回数据
ret_data = {
......@@ -302,7 +304,7 @@ async def qual_current_level_service(point_list):
# res_redis = await qual_current_data(point_mid)
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
cid = monitor_point_list[0]["cid"] if len(monitor_point_list) > 0 else 0
res = await elec_current_data(mtids, cid)
# 4. 返回数据
ret_data = {
......@@ -583,32 +585,49 @@ async def elec_index_service(cid, point_id, start, end):
async def elec_current_service(point_id):
# 获取mtid
meter_info = await get_meter_by_point(point_id)
if not meter_info:
raise BusinessException(
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
last_15min_time = get_15min_ago()
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_15min_time}'"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ or not results or results.get("code") > 0:
return '', {}
head = parse_td_columns(results)
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
p_info = await get_meter_by_point(point_id)
if not p_info or not p_info["mtid"]:
msg = f"没有监测点:{point_id} monitor信息,请联系运维人员!"
raise BusinessException(message=msg)
# 识电U只有一项有数据,返回具体的项
ctnum = res.get("ctnum") or 3
res = get_sdu_i_and_u(res, ctnum)
now_ts = pendulum.now(tz=CST).int_timestamp
d_ele_rt, ts = None, now_ts
try:
mtid = p_info["mtid"]
key = f"real_time:electric:{SETTING.mysql_db}:{mtid}"
key_hr = f"real_time:electric_hr:{SETTING.mysql_db}:{mtid}"
rt_ele, rt_ele_hr = await RedisUtils().mget([key, key_hr])
if rt_ele and rt_ele_hr:
rt_ele, rt_ele_hr = json.loads(rt_ele), json.loads(rt_ele_hr)
if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
if now_ts - rt_ele_hr["ts"] <= REAL_EXP_TIME:
for k in rt_ele_hr.keys():
if k not in rt_ele.keys():
rt_ele[k] = rt_ele_hr[k]
if res.get("ts"):
time_str = str(res["ts"])[0:19]
else:
time_str = time_format.get_datetime_str(0)
return time_str, res
d_ele_rt, ts = rt_ele, rt_ele["ts"]
elif rt_ele:
rt_ele = json.loads(rt_ele)
if now_ts - rt_ele["ts"] <= REAL_EXP_TIME:
d_ele_rt, ts = rt_ele, rt_ele["ts"]
elif rt_ele_hr:
rt_ele_hr = json.loads(rt_ele_hr)
if now_ts - rt_ele_hr["ts"] <= REAL_EXP_TIME:
d_ele_rt, ts = rt_ele_hr, rt_ele_hr["ts"]
except Exception as e:
log.error(f"parse real time electric error, pid:{point_id}")
log.exception(e)
time_str = timestamp2dts(ts, YMD_Hms)
if d_ele_rt is None:
return time_str, None
# 识电U只有一项有数据,返回具体的项
ctnum = d_ele_rt.get("ctnum") or 3
return time_str, get_sdu_i_and_u(d_ele_rt, ctnum)
def get_sdu_i_and_u(res, ctnum):
......@@ -617,15 +636,15 @@ def get_sdu_i_and_u(res, ctnum):
'''
res["sdu_i"] = None
res["sdu_u"] = None
meter_n = res.get('meter_sn', '').lower()
if meter_n == 'a':
meter_sn = res.get("meter_sn", "").lower()
if meter_sn == "a":
res["sdu_i"] = "ia"
res["sdu_u"] = "ua" if ctnum == 3 else "uab"
if meter_n == 'b':
if meter_sn == "b":
res["sdu_i"] = "ib"
if ctnum == 3:
res["sdu_u"] = "ub"
if meter_n == 'c':
if meter_sn == "c":
res["sdu_i"] = "ic"
res["sdu_u"] = "uc" if ctnum == 3 else "ucb"
return res
......@@ -2,7 +2,6 @@
#
# Author:jing
# Date: 2020/7/9
import time
import json
from dataclasses import fields
......@@ -13,12 +12,9 @@ from unify_api.constants import CST
from unify_api.modules.common.procedures.points import get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.sanic_api import summary, description, examples
from pot_libs.logger import log
from pot_libs.utils.exc_util import (
ParamException, DBException, BusinessException
)
from pot_libs.utils.exc_util import ParamException, BusinessException
from unify_api.modules.common.components.common_cps import CidPointsReq
from unify_api.modules.common.procedures import health_score
from unify_api.modules.electric.procedures.electric_pds import \
......@@ -31,8 +27,7 @@ from unify_api.modules.electric.service.electric_service import (
from unify_api.utils import time_format
from unify_api import constants
from unify_api.modules.electric.procedures.electric_util import (
get_wiring_type, load_point_ctnum,
add_random_change,
load_point_ctnum, add_random_change,
)
from pot_libs.common.components.query import PageRequest
......@@ -78,15 +73,15 @@ async def post_elec_history(req, body: PageRequest) -> ElecHistoryResponse:
except:
log.error("param error, ranges is NULL")
raise ParamException(message="param error, ranges is NULL")
try:
intervel, slots = time_format.time_pick_transf(date_start, date_end)
except:
log.error("param error, date format error")
raise ParamException(message="param error, date format error")
point_id = filed_value_from_list(body.filter.equals, "point_id")
if point_id <= 0 or not point_id:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
......@@ -179,137 +174,24 @@ async def post_elec_current(req, body: PageRequest) -> ElecCurrentResponse:
point_id = filed_value_from_list(body.filter.equals, "point_id")
if point_id <= 0 or not point_id:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
message="param exception, equals is Ncount_infoULL, no point_id")
raise ParamException(message="param exception, equals NULL, no pid")
try:
time_str, res = await elec_current_service(point_id)
# 加些随机变化(防止数据一直不变化)
for k, v in res.items():
if isinstance(v, (str, int)):
continue
res[k] = add_random_change(v)
except Exception as e:
log.error(f"post_elec_current elec_current_service error:"
f"{str(e)}")
log.error(f"post_elec_current service error:{str(e)}")
raise BusinessException(message=f"{str(e)}")
return ElecCurrentResponse(
real_time=time_str,
**{
k: v
for k, v in res.items()
if k in [field.name for field in fields(ElecCurrentResponse)]
},
)
async def post_elec_current_bak(req, body: PageRequest) -> ElecCurrentResponse:
point_id = filed_value_from_list(body.filter.equals, "point_id")
if point_id <= 0 or not point_id:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
raise ParamException(message=msg)
ctnum, mid = await get_wiring_type(point_id)
if ctnum not in [2, 3]:
log.error(f"elec_index point_id={point_id} ctnum={ctnum} 找不到ctnum")
if not mid:
log.error(f"post_elec_current pid={point_id} 没有mid 装置已经被拆除")
# 拆了,没数据,返回空
return ElecCurrentResponse()
try:
now_ts = int(time.time())
real_tt = now_ts
res = None
result = await RedisUtils().hget(METERDATA_CURRENT_KEY, mid)
result2 = await RedisUtils().hget(METERDATA_CURRENT_HR_KEY, mid)
if result and result2:
res = json.loads(result)
res2 = json.loads(result2)
real1_tt = res.get("timestamp")
real_tt = res2.get("timestamp")
if (
real1_tt
and real_tt
and now_ts - real1_tt <= constants.REAL_EXP_TIME
and now_ts - real_tt <= constants.REAL_EXP_TIME
):
for k in res2.keys():
if not k in res.keys():
res[k] = res2[k]
else:
log.info("1 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
elif result2:
res = json.loads(result2)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("2 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
elif result:
res = json.loads(result)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("3 realtime_elec_qual of mid %s is expired." % mid)
res = None
real_tt = now_ts
else:
log.error("realtime_elec_qual not exist")
time_str = time_format.get_datetime_str(real_tt)
except Exception as e:
log.exception(e)
raise DBException
# 加些随机变化(防止数据一直不变化)
if res:
for k in res.keys():
res[k] = add_random_change(res[k])
else:
log.error("realtime_elec_qual not exist")
return ElecCurrentResponse()
if not ctnum:
# 如果已经拆掉了,还有报文,用报文中的ctnum
ctnum = res.get("ctnum") or 3
if "ctnum" in res:
del res["ctnum"]
# 识电U只有一项有数据,返回具体的项
res["sdu_i"] = None
res["sdu_u"] = None
if "ia" in res and "ib" not in res and "ic" not in res:
res["sdu_i"] = "ia"
if ctnum == 3:
res["sdu_u"] = "ua"
if ctnum == 2:
res["sdu_u"] = "uab"
if "ib" in res and "ia" not in res and "ic" not in res:
res["sdu_i"] = "ib"
if ctnum == 3:
res["sdu_u"] = "ub"
if "ic" in res and "ia" not in res and "ib" not in res:
res["sdu_i"] = "ic"
if ctnum == 3:
res["sdu_u"] = "uc"
if ctnum == 2:
res["sdu_u"] = "ucb"
return ElecCurrentResponse(
ctnum=ctnum,
real_time=time_str,
**{
k: v
for k, v in res.items()
if k in [field.name for field in fields(ElecCurrentResponse)]
},
)
return ElecCurrentResponse(real_time=time_str,
**{k: v for k, v in res.items()
if k in [field.name for field in
fields(ElecCurrentResponse)]},
)
@summary("指标统计-指标统计-常规参数+电能质量")
......@@ -318,7 +200,7 @@ async def post_elec_index(req, body: PageRequest) -> ElecIndexResponse:
cid = req.json.get("cid")
# 1. 获取point_id
point_id = filed_value_from_list(body.filter.equals, "point_id")
if not point_id or point_id <= 0:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
......@@ -345,20 +227,20 @@ async def post_qual_history(req, body: PageRequest) -> QualHistoryResponse:
except:
log.error("param error, ranges is NULL")
raise ParamException(message="param error, ranges is NULL")
try:
intervel, slots = time_format.time_pick_transf(date_start, date_end)
except:
log.error("param error, date format error")
raise ParamException(message="param error, date format error")
point_id = filed_value_from_list(body.filter.equals, "point_id")
if not point_id or point_id <= 0:
msg = "param exception, equals is NULL, no point_id"
log.warning(msg)
raise ParamException(message=msg)
return await qual_history_service(date_start, date_end, intervel, slots,
point_id)
......@@ -417,7 +299,7 @@ async def qual_history_service(start, end, intervel, slots, pid):
else:
for stats_item in stats_items:
elec_data[stats_item].append("")
voltage_dev = VoltageDev(
**{
k.rsplit("_", 1)[0]: v
......@@ -480,7 +362,7 @@ async def qual_history_service(start, end, intervel, slots, pid):
):
sdu_i = "ic"
sdu_u = "uc"
return QualHistoryResponse(
ctnum=ctnum,
voltage_dev=voltage_dev,
......@@ -505,7 +387,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
message="param exception, equals is NULL, no point_id")
if point_id <= 0 or not point_id or cid <= 0:
log.warning("param exception, equals is NULL, no point_id")
raise ParamException(
......@@ -519,7 +401,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
# 过期时间
last_ts = pendulum.now(tz=CST).subtract(
seconds=constants.REAL_EXP_TIME).format("YYYY-MM-DD HH:mm:ss")
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric?tz={CST}"
sql = f"select last_row(*) from mt{mtid}_ele where ts>='{last_ts}'"
......@@ -541,7 +423,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
# 加些随机变化(防止数据一直不变化)
for k in res.keys():
res[k] = add_random_change(res[k])
voltage_harmonic_dict, current_harmonic_dict = {}, {}
for k in [field.name for field in fields(VoltageHarmonicRate)]:
if k in harmonic.keys():
......@@ -568,7 +450,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
"sdu_i",
"sdu_u",
]
# 识电U只有一项有数据,返回具体的项
res = get_sdu_i_and_u(res, ctnum)
if res.get("ts"):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment