Commit b04fd32c authored by ZZH's avatar ZZH

remove es 2023-6-1

parent bf1a6c4f
import pandas as pd
from collections import defaultdict
from datetime import datetime
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
async def location_stats_statics(
cid, pid, start_timestamp, end_timestamp, _type="residual_current"
):
groups = {}
if _type == "residual_current":
args = (cid, "residual_current")
else:
args = (cid, "temperature")
async with MysqlUtil() as conn:
sql = "SELECT mtid, id, `group`, item, `type` FROM location WHERE cid=%s and `type`=%s"
locations = await conn.fetchall(sql, args=args)
for location in locations:
group = location.get("mtid")
groups.setdefault(group, []).append(location)
location_map = {}
async with MysqlUtil() as conn:
sql = "SELECT pid, name, mtid FROM point WHERE pid=%s"
point = await conn.fetchone(sql, args=(pid,))
if point:
# point_name = point["name"]
mtid = point["mtid"]
locations = groups.get(mtid, [])
for l in locations:
location_map.setdefault(l["item"], []).append(l["id"])
if not location_map:
log.info(f"{pid}无任何location_id")
return {}
all_location_ids = []
for item, item_location_ids in location_map.items():
all_location_ids.extend(item_location_ids)
aggs = {
"value_avg": {"avg": {"field": "value_avg"}},
"value_max": {"top_hits": {"sort": [{"value_max": {"order": "desc"}}], "size": 1}},
"value_min": {"top_hits": {"sort": [{"value_min": {"order": "asc"}}], "size": 1}},
}
query_body = {
"query": {
"bool": {
"filter": [
{"term": {"cid": cid}},
{"term": {"type.keyword": _type}},
{"terms": {"location_id": all_location_ids}},
{"range": {"time": {"gte": start_timestamp, "lte": end_timestamp,}}},
]
}
},
"size": 0,
"aggs": {},
}
if _type == "residual_current":
info_map = defaultdict(dict)
query_body["aggs"] = aggs
async with EsUtil() as es:
print("------debug", query_body)
es_results = await es.search_origin(body=query_body, index=constants.LOCATION_15MIN_AIAO)
value_min_hits = (
es_results.get("aggregations", {})
.get("value_min", {})
.get("hits", {})
.get("hits", [])
)
if value_min_hits:
min_dt = value_min_hits[0]["_source"]["value_min_time"]
info_map["漏电流"]["value_min"] = {
"value": value_min_hits[0]["_source"]["value_min"],
"time": str(datetime.strptime(min_dt, "%Y-%m-%dT%H:%M:%S+08:00")),
}
value_max_hits = (
es_results.get("aggregations", {})
.get("value_max", {})
.get("hits", {})
.get("hits", [])
)
if value_max_hits:
max_dt = value_max_hits[0]["_source"]["value_max_time"]
info_map["漏电流"]["value_max"] = {
"value": value_max_hits[0]["_source"]["value_max"],
"time": str(datetime.strptime(max_dt, "%Y-%m-%dT%H:%M:%S+08:00")),
}
info_map["漏电流"]["value_avg"] = (
es_results.get("aggregations", {}).get("value_avg", {}).get("value")
)
return info_map
elif _type == "temperature":
for item, location_ids in location_map.items():
# 温度,漏电流
query_body["aggs"][f"{item}_aggs"] = {
"filter": {"terms": {"location_id": location_ids}},
"aggs": aggs,
}
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body, index=constants.LOCATION_15MIN_AIAO)
info_map = defaultdict(dict)
if es_results:
for item in location_map.keys():
value_min_hits = (
es_results.get("aggregations", {})
.get(f"{item}_aggs")
.get("value_min", {})
.get("hits", {})
.get("hits", [])
)
if value_min_hits:
min_dt = value_min_hits[0]["_source"]["value_min_time"]
info_map[f"{item}温度"]["value_min"] = {
"value": value_min_hits[0]["_source"]["value_min"],
"time": str(datetime.strptime(min_dt, "%Y-%m-%dT%H:%M:%S+08:00")),
}
value_max_hits = (
es_results.get("aggregations", {})
.get(f"{item}_aggs")
.get("value_max", {})
.get("hits", {})
.get("hits", [])
)
if value_max_hits:
max_dt = value_max_hits[0]["_source"]["value_max_time"]
info_map[f"{item}温度"]["value_max"] = {
"value": value_max_hits[0]["_source"]["value_max"],
"time": str(datetime.strptime(max_dt, "%Y-%m-%dT%H:%M:%S+08:00")),
}
info_map[f"{item}温度"]["value_avg"] = (
es_results.get("aggregations", {})
.get(f"{item}_aggs")
.get("value_avg", {})
.get("value")
)
return info_map
async def location_stats_statics_new15(table_name, cid, start, end):
async def location_stats_statics(table_name, cid, start, end):
sql = "SELECT mtid,lid,item,ad_type FROM location WHERE cid=%s"
location_map = {}
async with MysqlUtil() as conn:
locations = await conn.fetchall(sql, args=(cid, ))
locations = await conn.fetchall(sql, args=(cid,))
if not locations:
return location_map
for loca in locations:
......@@ -164,14 +15,15 @@ async def location_stats_statics_new15(table_name, cid, start, end):
f" BETWEEN '{start}' and '{end}' order by create_time desc"
lids = list(location_map.keys())
async with MysqlUtil() as conn:
results = await conn.fetchall(datas_sql, args=(lids, ))
results = await conn.fetchall(datas_sql, args=(lids,))
if not results:
return {}
df = pd.DataFrame(list(results))
for lid in lids:
max_value = df.loc[df["lid"]==lid].value_max.max()
max_value = df.loc[df["lid"] == lid].value_max.max()
if not pd.isna(max_value):
max_datas = df.loc[df.loc[df["lid"]==lid].value_max.idxmax()].to_dict()
max_datas = df.loc[
df.loc[df["lid"] == lid].value_max.idxmax()].to_dict()
max_value_time = max_datas.get("value_max_time")
max_value_time = '' if pd.isnull(max_value_time) else str(
max_value_time)
......@@ -180,7 +32,8 @@ async def location_stats_statics_new15(table_name, cid, start, end):
max_value, max_value_time = "", ""
min_value = df.loc[df["lid"] == lid].value_min.min()
if not pd.isna(min_value):
min_datas = df.loc[df.loc[df["lid"]==lid].value_min.idxmin()].to_dict()
min_datas = df.loc[
df.loc[df["lid"] == lid].value_min.idxmin()].to_dict()
min_value_time = min_datas.get("value_min_time")
min_value_time = '' if pd.isnull(min_value_time) else str(
min_value_time)
......
import json
import time
import aioredis
import re
from pot_libs.settings import SETTING
from unify_api.modules.electric.procedures.electric_util import \
add_random_change
from pot_libs.logger import log
from unify_api import constants
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.utils.taos_new import parse_td_columns
METERDATA_CURRENT_KEY = "meterdata_current"
METERDATA_CURRENT_HR_KEY = "meterdata_hr_current"
async def elec_current_data(point_mid):
"""用电监测实时值"""
now_ts = int(time.time())
res = None
# pipline查询redis
redis = await aioredis.create_redis_pool(SETTING.redis_single)
tr1 = redis.multi_exec()
for point, mid_ctnum in point_mid.items():
mid = mid_ctnum["mid"]
tr1.hget(METERDATA_CURRENT_KEY, mid)
res_p1 = await tr1.execute()
tr2 = redis.multi_exec()
for point, mid_ctnum in point_mid.items():
mid = mid_ctnum["mid"]
tr2.hget(METERDATA_CURRENT_HR_KEY, mid)
res_p2 = await tr2.execute()
i = 0
ret_dic = {}
for point, mid_ctnum in point_mid.items():
result = res_p1[i]
result2 = res_p2[i]
i += 1
mid = mid_ctnum["mid"]
ctnum = mid_ctnum["ctnum"]
# 原本的计算逻辑
if result and result2:
res = json.loads(result)
res2 = json.loads(result2)
real1_tt = res.get("timestamp")
real_tt = res2.get("timestamp")
if (
real1_tt
and real_tt
and now_ts - real1_tt <= constants.REAL_EXP_TIME
and now_ts - real_tt <= constants.REAL_EXP_TIME
):
for k in res2.keys():
if not k in res.keys():
res[k] = res2[k]
else:
log.info("1 realtime_elec_qual of mid %s is expired." % mid)
res = None
elif result2:
res = json.loads(result2)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("2 realtime_elec_qual of mid %s is expired." % mid)
res = None
elif result:
res = json.loads(result)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("3 realtime_elec_qual of mid %s is expired." % mid)
res = None
else:
log.error("realtime_elec_qual not exist")
# 加些随机变化(防止数据一直不变化)
if res:
for k in res.keys():
res[k] = add_random_change(res[k])
else:
log.error("realtime_elec_qual not exist")
ret_dic[mid] = None
continue
if not ctnum:
# 如果已经拆掉了,还有报文,用报文中的ctnum
ctnum = res.get("ctnum") or 3
if "ctnum" in res:
del res["ctnum"]
# 识电U只有一项有数据,返回具体的项
res["sdu_i"] = None
res["sdu_u"] = None
if "ia" in res and "ib" not in res and "ic" not in res:
res["sdu_i"] = "ia"
if ctnum == 3:
res["sdu_u"] = "ua"
if ctnum == 2:
res["sdu_u"] = "uab"
if "ib" in res and "ia" not in res and "ic" not in res:
res["sdu_i"] = "ib"
if ctnum == 3:
res["sdu_u"] = "ub"
if "ic" in res and "ia" not in res and "ib" not in res:
res["sdu_i"] = "ic"
if ctnum == 3:
res["sdu_u"] = "uc"
if ctnum == 2:
res["sdu_u"] = "ucb"
ret_dic[mid] = res
return ret_dic
async def qual_current_data(point_mid):
"""电能质量实时值"""
now_ts = int(time.time())
res = None
# pipline查询redis
redis = await aioredis.create_redis_pool(SETTING.redis_single)
tr1 = redis.multi_exec()
for point, mid_ctnum in point_mid.items():
mid = mid_ctnum["mid"]
tr1.hget(METERDATA_CURRENT_KEY, mid)
res_p1 = await tr1.execute()
tr2 = redis.multi_exec()
for point, mid_ctnum in point_mid.items():
mid = mid_ctnum["mid"]
tr2.hget(METERDATA_CURRENT_HR_KEY, mid)
res_p2 = await tr2.execute()
i = 0
ret_dic = {}
for point, mid_ctnum in point_mid.items():
result = res_p1[i]
result2 = res_p2[i]
i += 1
mid = mid_ctnum["mid"]
ctnum = mid_ctnum["ctnum"]
# 原本的计算逻辑
if result and result2:
res = json.loads(result)
res2 = json.loads(result2)
real1_tt = res.get("timestamp")
real_tt = res2.get("timestamp")
if (
real1_tt
and real_tt
and now_ts - real1_tt <= constants.REAL_EXP_TIME
and now_ts - real_tt <= constants.REAL_EXP_TIME
):
for k in res2.keys():
if not k in res.keys():
res[k] = res2[k]
else:
log.info("1 realtime_power_qual of mid %s is expired." % mid)
res = None
elif result2:
res = json.loads(result2)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("2 realtime_power_qual of mid %s is expired." % mid)
res = None
elif result:
res = json.loads(result)
real_tt = res.get("timestamp")
if real_tt and now_ts - real_tt <= constants.REAL_EXP_TIME:
res = res
else:
log.info("3 realtime_power_qual of mid %s is expired." % mid)
res = None
else:
log.error("realtime_elec_qual not exist")
# 加些随机变化(防止数据一直不变化)
if res:
for k in res.keys():
res[k] = add_random_change(res[k])
else:
log.error("realtime_power_quality not exist")
ret_dic[mid] = None
continue
if not ctnum:
# 如果已经拆掉了,还有报文,用报文中的ctnum
ctnum = res.get("ctnum") or 3
if "ctnum" in res:
del res["ctnum"]
# 识电U只有一项有数据,返回具体的项
res["sdu_i"] = None
res["sdu_u"] = None
if ctnum == 3:
if "ua_dev" in res and "ub_dev" not in res and "uc_dev" not in res:
res["sdu_i"] = "ia"
res["sdu_u"] = "ua"
if "ub_dev" in res and "ua_dev" not in res and "uc_dev" not in res:
res["sdu_i"] = "ib"
res["sdu_u"] = "ub"
if "uc_dev" in res and "ub_dev" not in res and "ua_dev" not in res:
res["sdu_i"] = "ic"
res["sdu_u"] = "uc"
if ctnum == 2:
if "uab_dev" in res and "ucb_dev" not in res:
res["sdu_i"] = "ia"
res["sdu_u"] = "uab"
if "ucb_dev" in res and "uab_dev" not in res:
res["sdu_i"] = "ic"
res["sdu_u"] = "ucb"
ret_dic[mid] = res
return ret_dic
async def elec_current_data_new15(mtids, cid):
async def elec_current_data(mtids, cid):
res_map = {}
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
......
import pandas as pd
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, \
LF_THRESHOLD, THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
from unify_api.constants import (
POINT_LEVEL_MAP, U_THRESHOLD, COSTTL_THRESHOLD, LF_THRESHOLD,
THDU_THRESHOLD, BL_THRESHOLD, THDI_THRESHOLD
)
from unify_api.modules.common.procedures.points import points_by_storeys, \
get_meter_by_point
from unify_api.modules.common.service.td_engine_service import \
......@@ -11,22 +12,14 @@ from unify_api.modules.common.service.td_engine_service import \
from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points, get_electric_datas_dao
from unify_api.modules.electric.procedures.electric_pds import \
elec_current_data, qual_current_data, elec_current_data_new15
from unify_api.modules.electric.procedures.electric_util import \
batch_get_wiring_type
elec_current_data
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2, round_4, multiplication_two
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type, get_wiring_type_new15
from pot_libs.utils.pendulum_wrapper import my_pendulum
from datetime import datetime
from pot_libs.es_util.es_query import EsQuery
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.modules.common.procedures.location_temp_rcurrent import \
location_stats_statics, location_stats_statics_new15
from unify_api import constants
from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
location_stats_statics
from unify_api.modules.electric.components.electric import (
ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)
......@@ -44,7 +37,7 @@ async def elec_current_storeys_service(storeys):
# point_mid = await batch_get_wiring_type(points)
# # 3. 获取redis数据
# res = await elec_current_data(point_mid)
res = await elec_current_data_new15(mtids, cid)
res = await elec_current_data(mtids, cid)
# 4. 返回数据
elec_data = {}
for info in point_list:
......@@ -131,7 +124,7 @@ async def qual_current_storeys_service(storeys):
# res = await qual_current_data(point_mid)
mtids = [point["mtid"] for point in point_list if point["mtid"]]
cid = point_list[0]['cid'] if len(point_list) > 0 else 0
res = await elec_current_data_new15(mtids, cid)
res = await elec_current_data(mtids, cid)
# 4. 返回数据
qual_data = {}
for info in point_list:
......@@ -215,7 +208,7 @@ async def elec_card_level_service(point_list):
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
results = await elec_current_data_new15(mtids, cid)
results = await elec_current_data(mtids, cid)
# 4. 返回数据
ret_data = {
"inline": [],
......@@ -287,7 +280,7 @@ async def elec_card_level_service(point_list):
"ub_dev": "",
"uc_dev": "",
}
ret_data[m_type].append(res_dic)
return EclResp(
inline=ret_data["inline"],
......@@ -309,7 +302,7 @@ async def qual_current_level_service(point_list):
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
res = await elec_current_data_new15(mtids, cid)
res = await elec_current_data(mtids, cid)
# 4. 返回数据
ret_data = {
"inline": [],
......@@ -327,15 +320,15 @@ async def qual_current_level_service(point_list):
# 初始化返回dic
if res.get(mtid):
time_str = res[mtid]["ts"][:-4]
fdia = round_2(res[mtid].get("fdia"))
fdib = round_2(res[mtid].get("fdib"))
fdic = round_2(res[mtid].get("fdic"))
thdia = round_4(res[mtid].get("thdia"))
thdib = round_4(res[mtid].get("thdib"))
thdic = round_4(res[mtid].get("thdic"))
res_dic = {
"name": m_name,
"point_id": point_id,
......@@ -345,7 +338,7 @@ async def qual_current_level_service(point_list):
"thdia": thdia,
"thdib": thdib,
"thdic": thdic,
"thdua": round_4(res[mtid].get("thdua")),
"thdub": round_4(res[mtid].get("thdub")),
"thduc": round_4(res[mtid].get("thduc")),
......@@ -386,7 +379,7 @@ async def qual_current_level_service(point_list):
"thdia": "",
"thdib": "",
"thdic": "",
"thdua": "",
"thdub": "",
"thduc": "",
......@@ -410,7 +403,7 @@ async def qual_current_level_service(point_list):
"thdib_virtual": "",
"thdic_virtual": "",
}
ret_data[m_type].append(res_dic)
return QclResp(
inline=ret_data["inline"],
......@@ -562,8 +555,8 @@ async def elec_index_service(cid, point_id, start, end):
elec_qual_indexes.append(elec_index)
# 小程序需要这漏电流和温度
if cid:
location_datas = await location_stats_statics_new15(redi_table_name,
cid, start, end)
location_datas = await location_stats_statics(redi_table_name, cid,
start, end)
if location_datas:
for key, value in location_datas.items():
if value["ad_type"] == "residual_current":
......@@ -604,11 +597,11 @@ async def elec_current_service(point_id):
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
# 识电U只有一项有数据,返回具体的项
ctnum = res.get("ctnum") or 3
res = get_sdu_i_and_u(res, ctnum)
if res.get("ts"):
time_str = str(res["ts"])[0:19]
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment