Commit a2e7dd6a authored by ZZH's avatar ZZH

remove es 2023-6-5

parent f0fe05e6
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
from unify_api.modules.product_info.procedures.hardware_pds import (
get_user_hardware_info, hardware_statistics)
......
......@@ -5,8 +5,7 @@ from unify_api.modules.product_info.components.hardware_cps import (
HardwareInfoManResq, HardwareInfoListReq, HardwareInfoListResq
)
from unify_api.modules.product_info.procedures.hardware_pds import (
company_available, get_user_hardware_info, hardware_statistics,
get_user_hardware_info_new15, hardware_statistics_new15
company_available, get_user_hardware_info_new15, hardware_statistics_new15
)
from pot_libs.logger import log
from unify_api.modules.product_info.service.hardware_info import \
......@@ -16,7 +15,8 @@ from unify_api.modules.common.procedures.cids import get_cids, get_proxy_cids
@summary("获取硬件信息")
async def post_hardware_info_list(request, body: HardwareInfoReq) -> HardwareInfoRespList:
async def post_hardware_info_list(request,
body: HardwareInfoReq) -> HardwareInfoRespList:
company_id = body.cid
page_size, page_num = body.page_size, body.page_num
log.info(
......@@ -27,18 +27,21 @@ async def post_hardware_info_list(request, body: HardwareInfoReq) -> HardwareInf
return HardwareInfoRespList.user_error()
# page_map = await get_user_hardware_info(company_id, page_num, page_size)
page_map = await get_user_hardware_info_new15(company_id, page_num, page_size)
page_map = await get_user_hardware_info_new15(company_id, page_num,
page_size)
return HardwareInfoRespList(rows=page_map["rows"], total=page_map["total"])
@summary("硬件统计信息")
async def post_hardware_info_count(request, body: HardwareInfoCountReq) -> HardwareInfoCountResp:
async def post_hardware_info_count(request,
body: HardwareInfoCountReq) -> HardwareInfoCountResp:
company_id = body.cid
# statistics_info_map = await hardware_statistics(company_id)
statistics_info_map = await hardware_statistics_new15(company_id)
return HardwareInfoCountResp(
installed_number=statistics_info_map["installed_number"],
legal_measurement_number=statistics_info_map["legal_measurement_number"],
legal_measurement_number=statistics_info_map[
"legal_measurement_number"],
start_time=statistics_info_map["start_time"],
power_capacity=statistics_info_map["power_capacity"],
)
......@@ -58,7 +61,8 @@ async def post_hardware_list_sdu(req, body: HlsReq) -> HlsResp:
@summary("管理版-关于我们-统计")
async def post_hardware_info_management(req, body: HardwareInfoManReq) -> HardwareInfoManResq:
async def post_hardware_info_management(req,
body: HardwareInfoManReq) -> HardwareInfoManResq:
product = body.product
# user_id = 88 if req.ctx else req.ctx.user_id
try:
......
import json
from datetime import datetime, timedelta
from pot_libs.utils.exc_util import BusinessException
from unify_api import constants
from unify_api.modules.alarm_manager.dao.list_static_dao import \
......@@ -12,13 +10,12 @@ from unify_api.modules.home_page.procedures.count_info_pds import \
from unify_api.modules.shidianu.components.algorithm_cps import WcResp, AbcResp
from unify_api.modules.shidianu.dao.analysis_result_dao import \
query_sdu_power_wave, query_sdu_recog_record
from unify_api.modules.shidianu.procedures.output_result import get_p_list, \
get_curve_p
from unify_api.modules.shidianu.procedures.output_result import get_curve_p
from unify_api.utils.time_format import last30_day_range, \
get_start_end_by_tz_time, day_slots, get_start_end_by_tz_time_new
day_slots, get_start_end_by_tz_time_new
async def wave_curve_service_new15(point_id, req_date, product):
async def wave_curve_srv(point_id, req_date, product):
# 1,获取slots
time_slot = day_slots()
# 2. 获取sid
......
......@@ -2,14 +2,10 @@ import json
from collections import defaultdict
from datetime import datetime, timedelta
from itertools import groupby
import requests
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.settings import SETTING
from pot_libs.utils.exc_util import BusinessException
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import POINT_1MIN_INDEX, PRODUCT, ExtendModule
from unify_api.modules.common.procedures.common_cps import point_day2month
from unify_api.modules.common.service.td_engine_service import \
......@@ -24,15 +20,19 @@ from unify_api.modules.common.procedures.points import get_meter_by_point
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.modules.shidianu.procedures.power_param import power_p
from unify_api.modules.shidianu.service.IntergratePipe import hisPieceFind
from unify_api.modules.shidianu.service.electricProportion import electricProportion
from unify_api.modules.shidianu.service.electricProportion import \
electricProportion
import numpy as np
import pandas as pd
import asyncio
from unify_api.modules.shidianu.service.open_data_service import get_token
from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, params_mining
from unify_api.modules.users.procedures.user_product_auth import get_product_auth
from unify_api.utils.time_format import last_n_day, srv_time
from unify_api.modules.shidianu.service.paramAnaly import paramAnalysis, \
params_mining
from unify_api.modules.users.procedures.user_product_auth import \
get_product_auth
from unify_api.utils.time_format import (
last_n_day, srv_time, get_day_start, YMD_Hms
)
from unify_api.utils.taos_new import parse_td_columns
......@@ -143,60 +143,6 @@ def get_runtime_list(state_dict):
return runtime_list
async def get_p_list(sid, meter_sn, req_date, time_slots):
sn = meter_sn.lower()
year, month, day = req_date.split("-")
start_time = datetime(year=int(year), month=int(month), day=int(day))
end_time = start_time + timedelta(days=1)
es_start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S+08:00")
es_end_time = end_time.strftime("%Y-%m-%dT%H:%M:%S+08:00")
es_index = point_day2month(start_time)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{"term": {"sid.keyword": sid}},
{"term": {"meter_sn.keyword": {"value": str.upper(sn)}}},
{"range": {"datetime": {"gte": es_start_time, "lt": es_end_time,}}},
]
}
},
"sort": [{"datetime": {"order": "asc"}}],
"aggs": {
"datetime": {
"date_histogram": {
"field": "datetime",
"interval": "minute",
"time_zone": "+08:00",
"format": "HH:mm",
},
"aggs": {
f"p{sn}": {"stats": {"field": f"p{sn}"}},
f"i{sn}": {"stats": {"field": f"i{sn}"}},
},
}
},
}
log.info(f"query_body={query_body}")
async with EsUtil() as es:
res = await es.search_origin(body=query_body, index=es_index)
buckets = res["aggregations"]["datetime"]["buckets"]
es_result_map = {i["key_as_string"]: i for i in buckets}
p_list = []
for slot in time_slots:
if slot in es_result_map:
p = es_result_map[slot].get("p" + sn).get("max")
if p is None:
p = ""
p_list.append(round(p, 4) if type(p) in [int, float] else p)
else:
p_list.append("")
return p_list
async def get_curve_p(mtid, meter_sn, start, end, time_slots):
'''
从tdenine中获取相关数据
......@@ -218,7 +164,7 @@ async def get_curve_p(mtid, meter_sn, start, end, time_slots):
datas[slot] = data[p_field]
p_list = []
for slot in time_slots:
p = datas.get(slot,"")
p = datas.get(slot, "")
if p:
p = round(p, 4) if type(p) in [int, float] else p
p_list.append(p)
......@@ -230,20 +176,26 @@ def time_interval_class(item_obj):
return time_str
async def algorithm_result_to_front(point_id, req_date, user_id, product, detail):
meter_info = await get_meter_by_point(point_id)
async def algorithm_result_to_front(pid, req_date, user_id, product, detail):
meter_info = await get_meter_by_point(pid)
if not meter_info:
raise BusinessException(message="没有该监测点的meter信息,请联系运维人员!")
sid, meter_no = meter_info["sid"], meter_info["meter_no"]
mtid = meter_info["mtid"]
dt = datetime.strptime(req_date + " 00:00:00", "%Y-%m-%d %H:%M:%S")
time_slot = [
datetime.strftime(dt + timedelta(minutes=i), "%Y-%m-%d %H:%M:%S").split(" ")[1][:5]
datetime.strftime(dt + timedelta(minutes=i),
"%Y-%m-%d %H:%M:%S").split(" ")[1][:5]
for i in range(1440)
]
p_list = await get_p_list(sid, meter_no, req_date, time_slot)
s_dt = get_day_start(req_date, dt_fmt="YYYY-MM-DD")
s_dts, e_dts = s_dt.format(YMD_Hms), s_dt.add(days=1).format(YMD_Hms)
p_list = await get_curve_p(mtid, meter_no, s_dts, e_dts, time_slot)
async with MysqlUtil() as conn:
sql = "select * from sdu_dev_run_anal where pid=%s and cal_day=%s"
res = await conn.fetchone(sql, args=(point_id, req_date))
res = await conn.fetchone(sql, args=(pid, req_date))
if not res:
return AlgorithmOutput(
time_slot=time_slot,
......@@ -279,7 +231,7 @@ async def algorithm_result_to_front(point_id, req_date, user_id, product, detail
# 根据配置扩展模块取不同的算法结果字段
async with MysqlUtil() as conn:
point_sql = "select pid, cid from point where pid=%s"
point_map = await conn.fetchone(point_sql, args=(point_id,))
point_map = await conn.fetchone(point_sql, args=(pid,))
if not point_map:
raise BusinessException(message="没有该监测点的信息,请联系运维人员!")
cid = point_map["cid"]
......@@ -288,10 +240,12 @@ async def algorithm_result_to_front(point_id, req_date, user_id, product, detail
cid_ext_module_map = product_auth_map["product"]
if str(cid) not in cid_ext_module_map:
log.error(f"用户user_id = {user_id} 工厂cid={cid}的权限")
raise BusinessException(message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")
raise BusinessException(
message=f"用户user_id = {user_id} 没有工厂cid={cid}的权限")
act_info = json.loads(res["act_info"]) if res["act_info"] else {}
if ExtendModule.AlgorithmResultDetail.value in cid_ext_module_map[str(cid)] and detail == 1:
if ExtendModule.AlgorithmResultDetail.value in cid_ext_module_map[
str(cid)] and detail == 1:
# 如果识电U配置了2那么给权限看新字段(内部人看的详细识别数据)
act_info = json.loads(res["act_info2"]) if res["act_info2"] else {}
for event_type, event_list in act_info.items():
......@@ -304,20 +258,25 @@ async def algorithm_result_to_front(point_id, req_date, user_id, product, detail
action_list.append(item[0])
result_dict = {
"设备运行情况": json.loads(res["dev_run_info"]) if res["dev_run_info"] is not None else [],
"运行时间段": json.loads(res["dev_run_tp"]) if res["dev_run_tp"] is not None else [],
"设备运行情况": json.loads(res["dev_run_info"]) if res[
"dev_run_info"] is not None else [],
"运行时间段": json.loads(res["dev_run_tp"]) if res[
"dev_run_tp"] is not None else [],
"行为列表": action_list,
"行为时间": action_time_list,
}
state_dict = {
"电量与时长": json.loads(res["ele_quan_dur"]) if res["ele_quan_dur"] is not None else {},
"波动": json.loads(res["dev_wave"]) if res["dev_wave"] is not None else {},
"电量与时长": json.loads(res["ele_quan_dur"]) if res[
"ele_quan_dur"] is not None else {},
"波动": json.loads(res["dev_wave"]) if res[
"dev_wave"] is not None else {},
}
log.info(f"算法结果 result_dict={result_dict} state_dict={state_dict}")
electric_action_list = [
ElectricActionItem(
action_name=action_name, action_time=result_dict["行为时间"][index].split(" ")[1][:5],
action_name=action_name,
action_time=result_dict["行为时间"][index].split(" ")[1][:5],
)
for index, action_name in enumerate(result_dict["行为列表"])
]
......@@ -349,7 +308,8 @@ async def algorithm_result_to_front(point_id, req_date, user_id, product, detail
run_period_list=[
RunPeriodtem(
running_devices=running_devices,
time_period=[i.split(" ")[1][:5] for i in result_dict["运行时间段"][index]],
time_period=[i.split(" ")[1][:5] for i in
result_dict["运行时间段"][index]],
)
for index, running_devices in enumerate(result_dict["设备运行情况"])
],
......@@ -359,7 +319,8 @@ async def algorithm_result_to_front(point_id, req_date, user_id, product, detail
async def main():
result_dict, state_dict, p_list = await output_result("A2004000192", "A", "2020-12-02")
result_dict, state_dict, p_list = await output_result("A2004000192", "A",
"2020-12-02")
print(result_dict)
print(state_dict)
......
from pot_libs.sanic_api import summary, description
from unify_api.constants import PRODUCT
from unify_api.modules.shidianu.components.algorithm_cps import (
AlgorithmOutput,
AlgorithmInput, WcReq, WcResp, AbcResp, AbcReq,
)
from unify_api.modules.shidianu.procedures.analysis_result_service import \
wave_curve_service_new15, alarm_behavior_curve_service
wave_curve_srv, alarm_behavior_curve_service
from unify_api.modules.shidianu.procedures.output_result import algorithm_result_to_front
from unify_api.utils.time_format import srv_time
......@@ -26,7 +25,7 @@ async def post_wave_curve(req, body: WcReq) -> WcResp:
point_id = body.point_id
req_date = body.req_date
product = body.product
return await wave_curve_service_new15(point_id, req_date, product)
return await wave_curve_srv(point_id, req_date, product)
@summary("识电u-用电行为-近30天统计")
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment