Commit b3f0d6ca authored by ZZH's avatar ZZH

real time adio get from redis 2023-6-21

parent 9a60c699
......@@ -4,11 +4,16 @@
# Date: 2020/7/9
import json
import time
import pendulum
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.sanic_api import summary, description, examples
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.settings import SETTING
from pot_libs.logger import log
from unify_api.utils import time_format
from unify_api.utils.time_format import CST, YMD_Hms, timestamp2dts
from unify_api import constants
from unify_api.utils.common_utils import round_2
from unify_api.modules.adio.components.adio import (
......@@ -134,39 +139,36 @@ async def post_adio_current(req, body: PageRequest) -> AdioCurrentResponse:
if not location_info:
log.warning("location_id error location_info empty")
return AdioCurrentResponse(temperature=[], residual_current=[])
# 获取mtid信息
mtid = list(location_info.values())[0]['mtid']
# 读取tdengine里面的数据
aido_data = await get_adio_current_data(mtid)
if not aido_data:
# load real time adio
lids = list(location_info.keys())
prefix = f"real_time:adio:{SETTING.mysql_db}"
keys = [f"{prefix}:{lid}" for lid in lids]
rt_rlts = await RedisUtils().mget(keys)
rt_adios = [json.loads(r) for r in rt_rlts if r]
d_rt_adio = {adio["lid"]: adio for adio in rt_adios}
if not d_rt_adio:
return AdioCurrentResponse(temperature=[], residual_current=[])
temperature = []
residual_current = []
trans_field = {"A相": "temp1", "B相": "temp2", "C相": "temp3",
"N线": "temp4"}
for location_id, item_info in location_info.items():
time_str = aido_data.get('ts')[:19]
item = item_info.get("item", "")
if item_info.get("type") == "residual_current":
adio_current = AdioCurrent(
type="residual_current",
item="漏电流",
real_time=time_str,
value=aido_data.get("residual_current")
)
residual_current.append(adio_current)
else:
type_filed = trans_field.get(item)
adio_current = AdioCurrent(
type="temperature",
item=item,
real_time=time_str,
value=aido_data.get(type_filed),
)
temperature.append(adio_current)
return AdioCurrentResponse(temperature=temperature,
residual_current=residual_current)
temp_lst = []
rc_lst = []
for lid, loc_info in location_info.items():
if lid not in d_rt_adio:
ts = pendulum.now(tz=CST).int_timestamp
ad_value = ""
else:
ts, ad_value = d_rt_adio[lid]["ts"], d_rt_adio[lid]["v"]
time_str = timestamp2dts(ts, YMD_Hms)
if loc_info.get("type") == "residual_current":
adio_current = AdioCurrent(type="residual_current", item="漏电流",
real_time=time_str, value=ad_value)
rc_lst.append(adio_current)
else:
adio_current = AdioCurrent(type="temperature",
item=loc_info.get("item", ""),
real_time=time_str, value=ad_value, )
temp_lst.append(adio_current)
return AdioCurrentResponse(temperature=temp_lst, residual_current=rc_lst)
@summary("返回安全监测实时参数(老的)")
......
......@@ -174,24 +174,22 @@ async def post_aver_elec_price(req, body: AverPriceReq) -> AverPriceResp:
@summary('首页今日本月电量电费')
async def post_index_charge(req, body: IndexChargeReq) -> IndexChargeResp:
cid = body.cid
point_id = body.point_id
today_start, today_end, month_start, month_end = today_month_date()
pid = body.point_id
s_today, e_today, s_month, e_month = today_month_date()
# 1. 今日电量电费spvf
kwh_t, charge_t = await power_overview(today_start, today_end, point_id,
cid)
kwh_t, charge_t = await power_overview(s_today, e_today, pid, cid)
today_spvf = PowerViewRes(power=kwh_t, charge=charge_t)
# 2. 本月电量电费spvf
kwh_m, charge_m = await power_overview(month_start, month_end, point_id,
cid)
kwh_m, charge_m = await power_overview(s_month, e_month, pid, cid)
month_spvf = PowerViewRes(power=kwh_m, charge=charge_m)
# 3. 今日平均电价和增长率
this_ck_t, last_ck_t = await avg_ele_price(today_start, today_end,
point_id, cid,
this_ck_t, last_ck_t = await avg_ele_price(s_today, e_today, pid, cid,
date_type="day")
today_power = AverPriceResp(this_power=this_ck_t, last_power=last_ck_t)
# 4. 本月平均电价和增长率
this_ck_m, last_ck_m = await avg_ele_price(month_start, month_end,
point_id, cid,
this_ck_m, last_ck_m = await avg_ele_price(s_month, e_month, pid, cid,
date_type="month")
month_power = AverPriceResp(this_power=this_ck_m, last_power=last_ck_m)
return IndexChargeResp(today_spvf=today_spvf, month_spvf=month_spvf,
......@@ -278,12 +276,12 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
log.info(f"未查询到工厂userId:{user_id},product:{product}"
f",proxy_id:{proxy_id}")
return MtpResp()
today_start, today_end, month_start, month_end = today_month_date()
s_today, e_today, s_month, e_month = today_month_date()
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
last_month_start, last_month_end = last_time_str(s_month, e_month,
"month", True)
this_month_p, this_month_charge = await power_overview_proxy(
month_start, month_end, cid_list)
s_month, e_month, cid_list)
last_month_p, last_month_charge = await power_overview_proxy(
last_month_start, last_month_end, cid_list)
......@@ -296,12 +294,12 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
month_power_rate = (this_month_total_power -
last_month_total_power) / last_month_total_power
# 2. 今日/昨日数据
last_day_start, last_day_end = last_time_str(today_start, today_end, "day")
this_day_p, this_day_charge = await power_overview_proxy(today_start,
today_end,
last_day_start, last_day_end = last_time_str(s_today, e_today, "day")
this_day_p, this_day_charge = await power_overview_proxy(s_today, e_today,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy(last_day_start,
last_day_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy(
last_day_start, last_day_end, cid_list)
if not all([this_day_p, this_day_charge, last_day_p, last_day_charge]):
return MtpResp()
......@@ -343,8 +341,9 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
end = now_date
# 2. 查询工厂电量电费信息
kwh_list, charge_list, price_list = await power_aggs_cid_proxy(
start, end, cid_list, date_type)
kwh_list, charge_list, price_list = await power_aggs_cid_proxy(start, end,
cid_list,
date_type)
kwh_list_st = sorted(kwh_list, key=lambda i: i['value'], reverse=True)
charge_list_st = sorted(charge_list, key=lambda i: i['value'],
reverse=True)
......@@ -365,19 +364,19 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
if not cid_list:
log.info(f"未查询到工厂, userId:{user_id} product:{product}")
return IpspResp()
today_start, today_end, month_start, month_end = today_month_date()
s_today, e_today, s_month, e_month = today_month_date()
# 2. 获取今日数据
kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy(
today_start, today_end, cid_list, "day")
kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'],
reverse=True)[:5]
s_today, e_today, cid_list, "day")
kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'], reverse=True)[
:5]
charge_list_d_st = sorted(charge_list_d, key=lambda i: i['value'],
reverse=True)[:5]
price_list_d_st = sorted(price_list_d, key=lambda i: i['value'],
reverse=True)[:5]
# 2. 获取本月数据
kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy(
month_start, month_end, cid_list, "month")
s_month, e_month, cid_list, "month")
kwh_list_m_st = sorted(kwh_list_m, key=lambda i: i['value'],
reverse=True)[:5]
charge_list_m_st = sorted(charge_list_m, key=lambda i: i['value'],
......
......@@ -24,7 +24,7 @@ def check_value_is_null(value):
return value in (None, "null", "NULL", "")
async def get_user_hardware_info(company_id, page_num, page_size):
async def get_user_hardware_info(cid, page_num, page_size):
# 1. 获取总的point数量
async with MysqlUtil() as conn:
point_sql_tmp = "select p.pid, p.name, p.create_time, p.update_time " \
......@@ -33,7 +33,7 @@ async def get_user_hardware_info(company_id, page_num, page_size):
"left join monitor_reuse m on m.mtid = c.mtid " \
"where c.demolished=0 " \
"and (p.cid =%s or m.cid = %s) "
points_tmp = await conn.fetchall(sql=point_sql_tmp, args=(company_id,company_id))
points_tmp = await conn.fetchall(point_sql_tmp, (cid, cid))
point_ids_tmp = [point["pid"] for point in points_tmp]
# 拆表
point_mid_map_tmp, point_count = await point_to_mid(point_ids_tmp)
......@@ -44,7 +44,8 @@ async def get_user_hardware_info(company_id, page_num, page_size):
point_sql = "select pid, name, create_time, update_time, position " \
"from point where cid = %s or pid in %s " \
"limit %s offset %s"
points = await conn.fetchall(sql=point_sql, args=(company_id, tuple(point_id_tmp), page_size, offset))
points = await conn.fetchall(sql=point_sql, args=(
cid, tuple(point_id_tmp), page_size, offset))
point_ids = [point["pid"] for point in points]
# 获取所有poin_id和mid对应关系
point_mid_map, point_count_limit = await point_to_mid(point_ids)
......@@ -57,7 +58,8 @@ async def get_user_hardware_info(company_id, page_num, page_size):
meter_changes = []
if point_ids:
async with MysqlUtil() as conn:
meter_changes = await conn.fetchall(meter_change_sql, args=(tuple(point_ids),))
meter_changes = await conn.fetchall(meter_change_sql,
args=(tuple(point_ids),))
point_meter_map = defaultdict(dict)
for i in meter_changes:
point_meter_map[i["pid"]] = i
......@@ -70,7 +72,7 @@ async def get_user_hardware_info(company_id, page_num, page_size):
meters = await conn.fetchall(meter_sql, args=(tuple(meter_ids),))
async with MysqlUtil() as conn:
meter_param_record_sql ="""SELECT
meter_param_record_sql = """SELECT
mtid,
ptr,
ctr,
......@@ -105,7 +107,8 @@ async def get_user_hardware_info(company_id, page_num, page_size):
# 组装pid和meter的对应关系
for point_id, meter_change in point_meter_map.items():
point_meter_map[point_id] = meter_map[meter_change["mtid"]] if meter_change["mtid"] else {}
point_meter_map[point_id] = meter_map[meter_change["mtid"]] if \
meter_change["mtid"] else {}
datas = []
for point in points:
......@@ -147,7 +150,8 @@ async def get_user_hardware_info(company_id, page_num, page_size):
data["ct_change"] = meter_param["ctr"]
data["wiring_type"] = "两表法" if meter_param["ctnum"] == 2 else "三表法"
if not check_value_is_null(meter_param["voltage_side"]):
data["high_or_low_side"] = "高压侧" if meter_param["voltage_side"] == 1 else "低压侧"
data["high_or_low_side"] = "高压侧" if meter_param[
"voltage_side"] == 1 else "低压侧"
if not check_value_is_null(meter_param["tc"]):
data["inline_capacity"] = meter_param["tc"]
if not check_value_is_null(meter_param["vc"]):
......@@ -165,7 +169,7 @@ async def get_user_hardware_info_new15(company_id, page_num, page_size):
"on p.mtid=m.mtid where m.demolished=0 and p.cid=%s " \
"ORDER BY p.create_time desc"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, args=(company_id, ))
datas = await conn.fetchall(sql=sql, args=(company_id,))
results = []
if page_num > 0 and page_size > 0:
start = (page_num - 1) * page_size
......@@ -208,28 +212,20 @@ async def hardware_statistics(company_id):
:param company_id:
:return:
"""
# async with MysqlUtil() as conn:
# point_sql = "select count(*) as point_count from point where cid = %s"
# point_count_map = await conn.fetchone(sql=point_sql, args=(company_id,))
#
# point_count = point_count_map["point_count"]
async with MysqlUtil() as conn:
point_sql = "select p.pid from point p " \
sql = "select p.pid from point p " \
"left JOIN monitor c on p.mtid = c.mtid " \
"left join monitor_reuse m on c.mtid = m.mtid " \
"where c.demolished=0 and " \
"(p.cid =%s or m.cid=%s)"
points = await conn.fetchall(sql=point_sql,
args=(company_id, company_id))
points = await conn.fetchall(sql, (company_id, company_id))
point_ids = [point["pid"] for point in points]
mid_info, point_count = await point_to_mid(point_ids)
async with MysqlUtil() as conn:
inline_sql = "select count(*) as inline_count from inline where " \
"cid = %s"
inline_count_map = await conn.fetchone(sql=inline_sql,
args=(company_id,))
inline_count_map = await conn.fetchone(inline_sql, (company_id,))
inline_count = inline_count_map["inline_count"]
async with MysqlUtil() as conn:
......@@ -242,8 +238,7 @@ async def hardware_statistics(company_id):
# power_capacity供电容量字段计算错误,改为inline表的tc_runtime字段相加
async with MysqlUtil() as conn:
capacity_sql = "select tc_runtime from inline where cid = %s"
capacity_list = await conn.fetchall(sql=capacity_sql,
args=(company_id,))
capacity_list = await conn.fetchall(capacity_sql, (company_id,))
power_capacity = 0
for capacity in capacity_list:
if capacity and capacity.get("tc_runtime"):
......
......@@ -294,7 +294,7 @@ def last_15min_range():
def today_month_date():
"""今日本月和上一周期时间 今天10:00/昨日10:00"""
# today_start, today_end, month_start, month_end
now = pendulum.now()
now = pendulum.now(tz=CST)
# 今天开始到结束时间
today_end = str(now.format(YMD_Hms))
today_start = str(now.start_of('day').format(YMD_Hms))
......@@ -1491,3 +1491,8 @@ def get_time_diff(start, end):
def get_15min_ago(time_fmt="YYYY-MM-DD HH:mm:ss"):
return my_pendulum.now(tz=CST).subtract(minutes=15).format(time_fmt)
def timestamp2dts(timestamp, dt_fmt):
dt = pendulum.from_timestamp(timestamp, tz=CST)
return dt.format(dt_fmt)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment