Commit 0bc47465 authored by ZZH's avatar ZZH

remove es 2023-6-2

parent 448a64af
import asyncio import asyncio
import datetime import datetime
import json import json
import math
from pot_libs.qingstor_util.qs_client import QsClient from pot_libs.qingstor_util.qs_client import QsClient
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeDetail
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao
from unify_api.modules.anshiu.dao.scope_operations_dao import \ from unify_api.modules.anshiu.dao.scope_operations_dao import \
get_scope_url_by_pid get_scope_url_by_pid
from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type_new15
from unify_api.utils.log_utils import LOGGER from unify_api.utils.log_utils import LOGGER
......
import json import json
import math import math
from pot_libs.qingstor_util.qs_client import QsClient from pot_libs.qingstor_util.qs_client import QsClient
import pandas as pd
from pandas.core.dtypes.inference import is_number
from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin from unify_api.modules.zhiwei_u.fault_foreast.actionFile import actionFilemin
from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg from unify_api.modules.zhiwei_u.fault_foreast.test import leakage_reg
from unify_api.utils.log_utils import LOGGER from unify_api.utils.log_utils import LOGGER
...@@ -16,8 +13,6 @@ from pot_libs.es_util.es_utils import EsUtil ...@@ -16,8 +13,6 @@ from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log from pot_libs.logger import log
from pot_libs.utils.exc_util import BusinessException from pot_libs.utils.exc_util import BusinessException
from pot_libs.utils.pendulum_wrapper import my_pendulum from pot_libs.utils.pendulum_wrapper import my_pendulum
from pot_libs.utils.time_format import convert_dt_to_timestr, \
convert_to_es_str, time_str_to_str
from unify_api.modules.anshiu.components.scope_operations_cps import \ from unify_api.modules.anshiu.components.scope_operations_cps import \
ScopeListItem, ScopeContent, ScopeDetailsResp, GetScopeConfigList, \ ScopeListItem, ScopeContent, ScopeDetailsResp, GetScopeConfigList, \
init_scope_config_example, ScopeItemDownload, ScopeDetail, ScopeDetails init_scope_config_example, ScopeItemDownload, ScopeDetail, ScopeDetails
...@@ -27,16 +22,12 @@ from unify_api.modules.anshiu.dao.scope_operations_dao import \ ...@@ -27,16 +22,12 @@ from unify_api.modules.anshiu.dao.scope_operations_dao import \
from unify_api.modules.anshiu.procedures.scope_operations_pds import \ from unify_api.modules.anshiu.procedures.scope_operations_pds import \
get_scope_config_by_pid, set_scope_config_by_pid, add_scope_config_by_pid, \ get_scope_config_by_pid, set_scope_config_by_pid, add_scope_config_by_pid, \
get_scope_list_by_pid get_scope_list_by_pid
from unify_api.modules.common.dao.common_dao import point_by_points, \
points_by_cid
from unify_api.modules.device_cloud.procedures.mqtt_helper import \ from unify_api.modules.device_cloud.procedures.mqtt_helper import \
change_param_to_config change_param_to_config
from unify_api.modules.electric.procedures.electric_util import \ from unify_api.modules.electric.procedures.electric_util import \
get_wiring_type_new15 get_wiring_type_new15
from unify_api.modules.zhiwei_u import config from unify_api.modules.zhiwei_u import config
from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope_pids, \ from unify_api.modules.zhiwei_u.dao.data_es_dao import query_search_scope
query_search_scope
from unify_api.utils import time_format
from unify_api.utils.time_format import get_time_duration, \ from unify_api.utils.time_format import get_time_duration, \
get_current_datetime_str, convert_str_to_timestamp get_current_datetime_str, convert_str_to_timestamp
......
...@@ -359,7 +359,7 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"): ...@@ -359,7 +359,7 @@ async def get_fields_by_mtid(mtid, table_name="monitor", fields="m_type"):
return result return result
async def sql_point_15min_index_new15(start, end, pid): async def load_point_pttl_mean(start, end, pid):
sql = f"SELECT pttl_mean, create_time FROM `point_15min_electric` " \ sql = f"SELECT pttl_mean, create_time FROM `point_15min_electric` " \
f"where pid=%s and create_time BETWEEN '{start}' and '{end}'" f"where pid=%s and create_time BETWEEN '{start}' and '{end}'"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
......
...@@ -8,8 +8,7 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \ ...@@ -8,8 +8,7 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points query_charge_aggs_points
from unify_api.modules.electric.dao.electric_dao import \ from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points monitor_point_join_by_points
from unify_api.modules.home_page.procedures.count_info_pds import current_load, \ from unify_api.modules.home_page.procedures.count_info_pds import real_time_load
current_load_new15
from unify_api.utils.common_utils import round_2, division_two from unify_api.utils.common_utils import round_2, division_two
...@@ -107,9 +106,9 @@ async def kwh_card_level_service(cid, point_list, start, end): ...@@ -107,9 +106,9 @@ async def kwh_card_level_service(cid, point_list, start, end):
async def load_info_service(cid_list): async def load_info_service(cid_list):
# 实时负荷 # 实时负荷
cur_load = await current_load_new15(cid_list) cur_load = await real_time_load(cid_list)
yesterday_dt = pendulum.now(tz="Asia/Shanghai").subtract(days=1) yesterday_dt = pendulum.now(tz="Asia/Shanghai").subtract(days=1)
yes_load = await current_load_new15(cid_list, yesterday_dt) yes_load = await real_time_load(cid_list, yesterday_dt)
load_percent = round((cur_load - yes_load) / yes_load, load_percent = round((cur_load - yes_load) / yes_load,
2) if cur_load and yes_load else "" 2) if cur_load and yes_load else ""
return cur_load, yes_load, load_percent return cur_load, yes_load, load_percent
...@@ -260,60 +260,7 @@ async def normal_rate_of_location(cid): ...@@ -260,60 +260,7 @@ async def normal_rate_of_location(cid):
return temperature_qr, residual_current_qr return temperature_qr, residual_current_qr
async def current_load(company_id): async def real_time_load(cid, end_dt=None):
"""
实时负荷
:param company_id:
:return:
"""
async with MysqlUtil() as conn:
point_sql = "select pid from point where cid= %s " \
"and add_to_company = 1"
points = await conn.fetchall(point_sql, args=(company_id,))
point_ids = [p["pid"] for p in points]
if not point_ids:
return ""
async with MysqlUtil() as conn:
meter_sql = (
"SELECT pid, mid FROM change_meter_record WHERE pid in %s ORDER BY pid, start_time"
)
change_meters = await conn.fetchall(meter_sql,
args=(tuple(point_ids),))
# 正序排序,最后这个map存储的是按照start_time是最近的mid
change_meter_map = {m["pid"]: m["mid"] for m in change_meters if
m["mid"] is not None}
newest_mids = list(change_meter_map.values())
meterdata_currents = []
if newest_mids:
meterdata_currents = await RedisUtils().hmget(METERDATA_CURRENT_KEY,
*newest_mids)
now_tt = int(time.time())
if meterdata_currents:
total = 0
for item in meterdata_currents:
# 这里是有可能item为None的
if item:
item = json.loads(item.decode())
mdptime_tt = None
if "mdptime" in item:
mdptime = datetime.strptime(item["mdptime"],
"%Y-%m-%d %H:%M:%S")
mdptime_tt = time.mktime(mdptime.timetuple())
item_tt = item.get("timestamp") or mdptime_tt
if item_tt:
# 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60:
total += item["pttl"]
return total
return ""
async def current_load_new15(cid, end_dt=None):
"""实时负荷""" """实时负荷"""
datas = await get_elec_mtid_sid_by_cid(cid) datas = await get_elec_mtid_sid_by_cid(cid)
td_mt_tables = tuple( td_mt_tables = tuple(
...@@ -362,7 +309,7 @@ async def power_count_info(cid): ...@@ -362,7 +309,7 @@ async def power_count_info(cid):
end_time = now.strftime("%Y-%m-%d %H:%M:%S") end_time = now.strftime("%Y-%m-%d %H:%M:%S")
max_30d_load, _time = await pttl_max_new15(cid, start_time, end_time, -1) max_30d_load, _time = await pttl_max_new15(cid, start_time, end_time, -1)
cur_load = await current_load_new15(cid) cur_load = await real_time_load(cid)
return round_2(cur_load), round_2(max_30d_load) return round_2(cur_load), round_2(max_30d_load)
......
...@@ -19,10 +19,10 @@ from unify_api.modules.home_page.dao.count_info_dao import \ ...@@ -19,10 +19,10 @@ from unify_api.modules.home_page.dao.count_info_dao import \
alarm_aggs_point_location alarm_aggs_point_location
from unify_api.modules.home_page.procedures.count_info_pds import other_info, \ from unify_api.modules.home_page.procedures.count_info_pds import other_info, \
electric_use_info, cid_alarm_importance_count, \ electric_use_info, cid_alarm_importance_count, \
alarm_importance_count_total, current_load, \ alarm_importance_count_total, \
get_company_charge_price, health_status_res, carbon_status_res_web, \ get_company_charge_price, health_status_res, carbon_status_res_web, \
optimization_count_info, economic_index_desc, \ optimization_count_info, economic_index_desc, \
cal_power_factor, current_load_new15 cal_power_factor, real_time_load
from unify_api.modules.home_page.procedures.count_info_proxy_pds import \ from unify_api.modules.home_page.procedures.count_info_proxy_pds import \
alarm_percentage_count, alarm_safe_power alarm_percentage_count, alarm_safe_power
from unify_api.modules.tsp_water.dao.drop_dust_dao import \ from unify_api.modules.tsp_water.dao.drop_dust_dao import \
...@@ -270,7 +270,7 @@ async def alarm_price_costtl_service(cid): ...@@ -270,7 +270,7 @@ async def alarm_price_costtl_service(cid):
# 2. 实时功率因数, 上月功率因数 # 2. 实时功率因数, 上月功率因数
cos_ttl, last_month_cos = await cal_power_factor(cid) cos_ttl, last_month_cos = await cal_power_factor(cid)
# 3. 实时负荷 # 3. 实时负荷
cur_load = await current_load_new15(cid) cur_load = await real_time_load(cid)
# 4. 平均电价 # 4. 平均电价
# 昨天 # 昨天
yesterday_start, yesterday_end = yesterday_range() yesterday_start, yesterday_end = yesterday_range()
......
...@@ -12,7 +12,7 @@ TSP_15MIN = "poweriot_tsp_15min" ...@@ -12,7 +12,7 @@ TSP_15MIN = "poweriot_tsp_15min"
async def meterdata_tsp_current(tsp_id): async def meterdata_tsp_current(tsp_id):
"""根据tsp_id获取redis实时数据""" """根据tsp_id获取redis实时数据"""
res = await RedisUtils().hget(TSP_CURRENT, tsp_id) res = await RedisUtils().hget(TSP_CURRENT, tsp_id)
res =json.loads(res) if res else {} res = json.loads(res) if res else {}
return res return res
...@@ -76,201 +76,13 @@ async def tsp_histogram_tsp_id(date_start, date_end, tsp_id, interval): ...@@ -76,201 +76,13 @@ async def tsp_histogram_tsp_id(date_start, date_end, tsp_id, interval):
return es_re["aggregations"]["quarter_time"]["buckets"] return es_re["aggregations"]["quarter_time"]["buckets"]
async def tsp_index_statistics(date_start, date_end, tsp_id):
"""TSP信息-指标统计"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"query": {
"bool": {
"must": [
{
"term": {
"tsp_id": tsp_id
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"size": 0,
"aggs": {
"pm25_max": {
"top_hits": {
"sort": [
{
"pm25_max": {
"order": "desc"
}
}
],
"size": 1,
"_source": ["pm25_max", "pm25_max_time"]
}
},
"pm25_min": {
"top_hits": {
"sort": [
{
"pm25_min": {
"order": "asc"
}
}
],
"size": 1,
"_source": ["pm25_min", "pm25_min_time"]
}
},
"pm10_max": {
"top_hits": {
"sort": [
{
"pm10_max": {
"order": "desc"
}
}
],
"size": 1,
"_source": ["pm10_max", "pm10_max_time"]
}
},
"pm10_min": {
"top_hits": {
"sort": [
{
"pm10_min": {
"order": "asc"
}
}
],
"size": 1,
"_source": ["pm10_min", "pm10_min_time"]
}
},
"tsp_max": {
"top_hits": {
"sort": [
{
"tsp_max": {
"order": "desc"
}
}
],
"size": 1,
"_source": ["tsp_max", "tsp_max_time"]
}
},
"tsp_min": {
"top_hits": {
"sort": [
{
"tsp_min": {
"order": "asc"
}
}
],
"size": 1,
"_source": ["tsp_min", "tsp_min_time"]
}
},
"pm25_avg": {
"avg": {
"field": "pm25_max"
}
},
"pm10_avg": {
"avg": {
"field": "pm10_max"
}
},
"tsp_avg": {
"avg": {
"field": "tsp_max"
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
return es_re["aggregations"]
async def tsp_aggs_tsp_id(date_start, date_end, tsp_list):
"""1. 按tsp_id聚合
2. 分别求平均值
"""
start_es = convert_es_str(date_start)
end_es = convert_es_str(date_end)
query_body = {
"size": 0,
"query": {
"bool": {
"must": [
{
"terms": {
"tsp_id": tsp_list
}
},
{
"range": {
"quarter_time": {
"gte": start_es,
"lte": end_es
}
}
}
]
}
},
"aggs": {
"tsps": {
"terms": {
"field": "tsp_id",
"size": 1000
},
"aggs": {
"pm25": {
"avg": {
"field": "pm25_mean"
}
},
"pm10": {
"avg": {
"field": "pm10_mean"
}
},
"tsp": {
"avg": {
"field": "tsp_mean"
}
}
}
}
}
}
log.info(query_body)
async with EsUtil() as es:
es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
return es_re["aggregations"]["tsps"]["buckets"]
async def tsp_by_tsp_id_dao(start, end, tsp_list): async def tsp_by_tsp_id_dao(start, end, tsp_list):
sql = f'SELECT tsp_id, ' \ sql = f'SELECT tsp_id, ' \
f'AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp ' \ f'AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp ' \
f'FROM `tsp_day_record` where tsp_id in %s and ' \ f'FROM `tsp_day_record` where tsp_id in %s and ' \
f'create_time BETWEEN "{start}" and "{end}" GROUP BY tsp_id ' f'create_time BETWEEN "{start}" and "{end}" GROUP BY tsp_id '
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(tsp_list, )) datas = await conn.fetchall(sql, args=(tsp_list,))
return datas return datas
......
...@@ -2,7 +2,7 @@ from unify_api.constants import SLOTS_15MIN, DUST_STATE ...@@ -2,7 +2,7 @@ from unify_api.constants import SLOTS_15MIN, DUST_STATE
from unify_api.modules.common.dao.common_dao import storey_pl_by_cid, \ from unify_api.modules.common.dao.common_dao import storey_pl_by_cid, \
storey_wp_by_cid storey_wp_by_cid
from unify_api.modules.common.dao.common_dao import \ from unify_api.modules.common.dao.common_dao import \
sql_point_15min_index_new15 load_point_pttl_mean
from unify_api.modules.common.procedures.points import points_by_storeys from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.tsp_water.components.drop_dust_cps import DdwResp, \ from unify_api.modules.tsp_water.components.drop_dust_cps import DdwResp, \
DdResp, IrmResp, IosResp, ItiResp, WsStatiResp DdResp, IrmResp, IosResp, ItiResp, WsStatiResp
...@@ -11,7 +11,7 @@ from unify_api.modules.tsp_water.dao.drop_dust_dao import \ ...@@ -11,7 +11,7 @@ from unify_api.modules.tsp_water.dao.drop_dust_dao import \
dust_water_run_state_by_time, sum_water_runts_group, sum_kwh_runts_group, \ dust_water_run_state_by_time, sum_water_runts_group, sum_kwh_runts_group, \
dust_water_run_day_sum_water, dust_fogcan_run_day_sum_kwh, sum_water_group dust_water_run_day_sum_water, dust_fogcan_run_day_sum_kwh, sum_water_group
from unify_api.modules.tsp_water.service.tsp_service import day_env_service,\ from unify_api.modules.tsp_water.service.tsp_service import day_env_service,\
day_env_service_new15 day_env_service
from unify_api.utils.common_utils import round_2 from unify_api.utils.common_utils import round_2
from unify_api.utils.time_format import srv_time, last7_day_range, \ from unify_api.utils.time_format import srv_time, last7_day_range, \
start_end_date start_end_date
...@@ -108,7 +108,7 @@ async def post_drop_dust_wave_service(point_id, start, end): ...@@ -108,7 +108,7 @@ async def post_drop_dust_wave_service(point_id, start, end):
"""降尘措施-雾炮-运行曲线""" """降尘措施-雾炮-运行曲线"""
# 1. 获取聚合信息 # 1. 获取聚合信息
slots_list = SLOTS_15MIN slots_list = SLOTS_15MIN
sql_re = await sql_point_15min_index_new15(start, end, point_id) sql_re = await load_point_pttl_mean(start, end, point_id)
if not sql_re: if not sql_re:
return DdwResp(slots=[], value=[]) return DdwResp(slots=[], value=[])
es_re_dic = {str(i["create_time"])[-8:-3]: i for i in sql_re} es_re_dic = {str(i["create_time"])[-8:-3]: i for i in sql_re}
...@@ -228,7 +228,7 @@ async def index_today_info_service(cid): ...@@ -228,7 +228,7 @@ async def index_today_info_service(cid):
"""首页-今日数据-扬尘""" """首页-今日数据-扬尘"""
# 1. 环境信息 # 1. 环境信息
# dr = await day_env_service(cid) # dr = await day_env_service(cid)
dr = await day_env_service_new15(cid) dr = await day_env_service(cid)
# 2. 今日用水, 用电 # 2. 今日用水, 用电
today_start, today_end, m_start, m_end = start_end_date() today_start, today_end, m_start, m_end = start_end_date()
start = today_start.split(" ")[0] start = today_start.split(" ")[0]
......
...@@ -12,8 +12,7 @@ from unify_api.modules.common.dao.common_dao import tsp_by_cid, \ ...@@ -12,8 +12,7 @@ from unify_api.modules.common.dao.common_dao import tsp_by_cid, \
from unify_api.modules.tsp_water.components.drop_dust_cps import DtResp, \ from unify_api.modules.tsp_water.components.drop_dust_cps import DtResp, \
ThResp, TisResp, DeResp, SaResp, TcdResp, TpdResp, AdResp ThResp, TisResp, DeResp, SaResp, TcdResp, TpdResp, AdResp
from unify_api.modules.tsp_water.dao.tsp_dao import meterdata_tsp_current, \ from unify_api.modules.tsp_water.dao.tsp_dao import meterdata_tsp_current, \
tsp_histogram_tsp_id, tsp_index_statistics, tsp_aggs_tsp_id, \ tsp_histogram_tsp_id, tsp_by_tsp_id_dao
tsp_by_tsp_id_dao
from unify_api.modules.tsp_water.dao.tsp_map_dao import \ from unify_api.modules.tsp_water.dao.tsp_map_dao import \
get_predict_data_day_dao, get_predict_data_month_dao, get_page_data, \ get_predict_data_day_dao, get_predict_data_month_dao, get_page_data, \
get_contrast_data_day_dao, get_contrast_data_month_dao, get_cid_tsp_dao get_contrast_data_day_dao, get_contrast_data_month_dao, get_cid_tsp_dao
...@@ -24,8 +23,7 @@ from unify_api.modules.tsp_water.procedures.tsp_pds import per_hour_wave, \ ...@@ -24,8 +23,7 @@ from unify_api.modules.tsp_water.procedures.tsp_pds import per_hour_wave, \
per_hour_wave_new15 per_hour_wave_new15
from unify_api.utils import time_format from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2, correlation, round_0 from unify_api.utils.common_utils import round_2, correlation, round_0
from unify_api.utils.es_query_body import es_process from unify_api.utils.time_format import start_end_date
from unify_api.utils.time_format import esstr_to_dthoutstr, start_end_date
async def real_time_service(tsp_id): async def real_time_service(tsp_id):
...@@ -49,7 +47,7 @@ async def tsp_history_service(tsp_id, start, end): ...@@ -49,7 +47,7 @@ async def tsp_history_service(tsp_id, start, end):
interval, slots = time_format.time_pick_transf(start, end) interval, slots = time_format.time_pick_transf(start, end)
# 实时数据 # 实时数据
pm25_list, pm10_list, tsp_list = await \ pm25_list, pm10_list, tsp_list = await \
get_data_new15(tsp_id, start, end, slots, interval) get_tsp_data(tsp_id, start, end, slots, interval)
# 预测数据 # 预测数据
pm25_predict, pm10_predict, tsp_predict, _ = \ pm25_predict, pm10_predict, tsp_predict, _ = \
await get_predict_data(tsp_id, start, end, slots) await get_predict_data(tsp_id, start, end, slots)
...@@ -69,44 +67,7 @@ async def tsp_history_service(tsp_id, start, end): ...@@ -69,44 +67,7 @@ async def tsp_history_service(tsp_id, start, end):
) )
# tsp实时数据 async def get_tsp_data(tsp_id, start, end, slots, interval):
async def get_data(tsp_id, start, end, slots, interval):
"""TSP信息-历史曲线"""
# 1. 查询es
if interval == 24 * 3600:
interval = "day"
fmt = "MM-DD"
elif interval == 15 * 60:
interval = "15m"
fmt = "HH:mm"
else:
raise BusinessException(message="time range not day or month")
es_res = await tsp_histogram_tsp_id(start, end, tsp_id, interval)
if not es_res:
return ThResp(pm2_5={"threshold": PM2_5, "value_slots": []},
pm10={"threshold": PM10, "value_slots": []},
tsp={"threshold": TSP, "value_slots": []},
time_slots=[]
)
es_dic = es_process(es_res, fmat=fmt)
# 2. 组装数据
pm25_list = []
pm10_list = []
tsp_list = []
for slot in slots:
if slot in es_dic:
pm25_value = round_0(es_dic[slot]["pm25"].get("avg"))
pm10_value = round_0(es_dic[slot]["pm10"].get("avg"))
tsp_value = round_0(es_dic[slot]["tsp"].get("avg"))
else:
pm25_value, pm10_value, tsp_value = "", "", ""
pm25_list.append(pm25_value)
pm10_list.append(pm10_value)
tsp_list.append(tsp_value)
return pm25_list, pm10_list, tsp_list
async def get_data_new15(tsp_id, start, end, slots, interval):
if interval == 24 * 3600: if interval == 24 * 3600:
sql = f'SELECT DATE_FORMAT(create_time,"%m-%d") date_time, ' \ sql = f'SELECT DATE_FORMAT(create_time,"%m-%d") date_time, ' \
f'AVG(pm25_max) pm25,AVG(pm25_max) pm10,AVG(tsp_max) tsp ' \ f'AVG(pm25_max) pm25,AVG(pm25_max) pm10,AVG(tsp_max) tsp ' \
...@@ -210,7 +171,7 @@ async def get_contrast_data(tsp_id, start, end, slots): ...@@ -210,7 +171,7 @@ async def get_contrast_data(tsp_id, start, end, slots):
async def tsp_predict_deviation_service(tsp_id, start, end): async def tsp_predict_deviation_service(tsp_id, start, end):
interval, slots = time_format.time_pick_transf(start, end) interval, slots = time_format.time_pick_transf(start, end)
# 实时数据 # 实时数据
pm25, pm10, tsp = await get_data_new15(tsp_id, start, end, slots, interval) pm25, pm10, tsp = await get_tsp_data(tsp_id, start, end, slots, interval)
# 预测数据 # 预测数据
pm25_predict, pm10_predict, tsp_predict, date_predict = \ pm25_predict, pm10_predict, tsp_predict, date_predict = \
await get_predict_data(tsp_id, start, end, slots) await get_predict_data(tsp_id, start, end, slots)
...@@ -220,24 +181,25 @@ async def tsp_predict_deviation_service(tsp_id, start, end): ...@@ -220,24 +181,25 @@ async def tsp_predict_deviation_service(tsp_id, start, end):
if value and pm25[index]: if value and pm25[index]:
pm25_time.append(date_predict[index]) pm25_time.append(date_predict[index])
pm25_list.append( pm25_list.append(
round(math.fabs((pm25[index]-value)/pm25[index]), 3)) round(math.fabs((pm25[index] - value) / pm25[index]), 3))
for index, value in enumerate(pm10_predict): for index, value in enumerate(pm10_predict):
if value and pm10[index]: if value and pm10[index]:
pm10_time.append(date_predict[index]) pm10_time.append(date_predict[index])
pm10_list.append( pm10_list.append(
round(math.fabs((pm10[index]-value)/pm10[index]), 3)) round(math.fabs((pm10[index] - value) / pm10[index]), 3))
for index, value in enumerate(tsp_predict): for index, value in enumerate(tsp_predict):
if value and tsp[index]: if value and tsp[index]:
tsp_time.append(date_predict[index]) tsp_time.append(date_predict[index])
tsp_list.append(round(math.fabs((tsp[index]-value)/tsp[index]), 3)) tsp_list.append(
round(math.fabs((tsp[index] - value) / tsp[index]), 3))
pm25_max, pm25_min, pm25_avg = "", "", "" pm25_max, pm25_min, pm25_avg = "", "", ""
if pm25_list: if pm25_list:
pm25_max, pm25_min = max(pm25_list), min(pm25_list) pm25_max, pm25_min = max(pm25_list), min(pm25_list)
pm25_avg = round(sum(pm25_list)/len(pm25_list), 3) pm25_avg = round(sum(pm25_list) / len(pm25_list), 3)
pm10_max, pm10_min, pm10_avg = "", "", "" pm10_max, pm10_min, pm10_avg = "", "", ""
if pm10_list: if pm10_list:
pm10_max, pm10_min = max(pm10_list), min(pm10_list) pm10_max, pm10_min = max(pm10_list), min(pm10_list)
pm10_avg = round(sum(pm10_list)/len(pm10_list), 3) pm10_avg = round(sum(pm10_list) / len(pm10_list), 3)
tsp_max, tsp_min, tsp_avg = "", "", "" tsp_max, tsp_min, tsp_avg = "", "", ""
if tsp_list: if tsp_list:
tsp_max, tsp_min = max(tsp_list), min(tsp_list) tsp_max, tsp_min = max(tsp_list), min(tsp_list)
...@@ -245,12 +207,16 @@ async def tsp_predict_deviation_service(tsp_id, start, end): ...@@ -245,12 +207,16 @@ async def tsp_predict_deviation_service(tsp_id, start, end):
return TpdResp(pm2_5={ return TpdResp(pm2_5={
"max": pm25_max, "min": pm25_min, "avg": pm25_avg, "max": pm25_max, "min": pm25_min, "avg": pm25_avg,
"max_time": pm25_time[pm25_list.index(pm25_max)] if pm25_max != "" else "", "max_time": pm25_time[
"min_time": pm25_time[pm25_list.index(pm25_min)] if pm25_min != "" else "" pm25_list.index(pm25_max)] if pm25_max != "" else "",
"min_time": pm25_time[
pm25_list.index(pm25_min)] if pm25_min != "" else ""
}, pm10={ }, pm10={
"max": pm10_max, "min": pm10_min, "avg": pm10_avg, "max": pm10_max, "min": pm10_min, "avg": pm10_avg,
"max_time": pm10_time[pm10_list.index(pm10_max)] if pm10_max != "" else "", "max_time": pm10_time[
"min_time": pm10_time[pm10_list.index(pm10_min)] if pm10_min != "" else "", pm10_list.index(pm10_max)] if pm10_max != "" else "",
"min_time": pm10_time[
pm10_list.index(pm10_min)] if pm10_min != "" else "",
}, tsp={ }, tsp={
"max": tsp_max, "min": tsp_min, "avg": tsp_avg, "max": tsp_max, "min": tsp_min, "avg": tsp_avg,
"max_time": tsp_time[tsp_list.index(tsp_max)] if tsp_max != "" else "", "max_time": tsp_time[tsp_list.index(tsp_max)] if tsp_max != "" else "",
...@@ -262,7 +228,7 @@ async def tsp_predict_deviation_service(tsp_id, start, end): ...@@ -262,7 +228,7 @@ async def tsp_predict_deviation_service(tsp_id, start, end):
async def tsp_contrast_deviation_service(tsp_id, start, end): async def tsp_contrast_deviation_service(tsp_id, start, end):
interval, slots = time_format.time_pick_transf(start, end) interval, slots = time_format.time_pick_transf(start, end)
# 实时数据 # 实时数据
pm25, pm10, tsp = await get_data_new15(tsp_id, start, end, slots, interval) pm25, pm10, tsp = await get_tsp_data(tsp_id, start, end, slots, interval)
# 对比数据 # 对比数据
pm25_contrast, pm10_contrast, date_contrast = \ pm25_contrast, pm10_contrast, date_contrast = \
await get_contrast_data(tsp_id, start, end, slots) await get_contrast_data(tsp_id, start, end, slots)
...@@ -288,112 +254,20 @@ async def tsp_contrast_deviation_service(tsp_id, start, end): ...@@ -288,112 +254,20 @@ async def tsp_contrast_deviation_service(tsp_id, start, end):
pm10_avg = round(sum(pm10_list) / len(pm10_list), 3) pm10_avg = round(sum(pm10_list) / len(pm10_list), 3)
return TcdResp(pm2_5={ return TcdResp(pm2_5={
"max": pm25_max, "min": pm25_min, "avg": pm25_avg, "max": pm25_max, "min": pm25_min, "avg": pm25_avg,
"max_time": pm25_time[pm25_list.index(pm25_max)] if pm25_max != "" else "", "max_time": pm25_time[
"min_time": pm25_time[pm25_list.index(pm25_min)] if pm25_min != "" else "" pm25_list.index(pm25_max)] if pm25_max != "" else "",
"min_time": pm25_time[
pm25_list.index(pm25_min)] if pm25_min != "" else ""
}, pm10={ }, pm10={
"max": pm10_max, "min": pm10_min, "avg": pm10_avg, "max": pm10_max, "min": pm10_min, "avg": pm10_avg,
"max_time": pm10_time[pm10_list.index(pm10_max)] if pm10_max != "" else "", "max_time": pm10_time[
"min_time": pm10_time[pm10_list.index(pm10_min)] if pm10_min != "" else "", pm10_list.index(pm10_max)] if pm10_max != "" else "",
"min_time": pm10_time[
pm10_list.index(pm10_min)] if pm10_min != "" else "",
}) })
async def tsp_index_statistics_service(tsp_id, start, end): async def tsp_index_statistics_service(tsp_id, start, end):
"""TSP信息-指标统计"""
# 1. 查询es
es_res = await tsp_index_statistics(start, end, tsp_id)
if not es_res:
return TisResp()
# 2.1 pm25
# max
pm25_max = ""
pm25_max_time = ""
pm_25_max_hits = es_res["pm25_max"]["hits"]["hits"]
if pm_25_max_hits:
pm25_max = round_2(pm_25_max_hits[0]["_source"].get("pm25_max"))
pm25_max_time = esstr_to_dthoutstr(
pm_25_max_hits[0]["_source"].get("pm25_max_time"),
format="%Y-%m-%d %H:%M:%S")
# min
pm25_min = ""
pm25_min_time = ""
pm_25_min_hits = es_res["pm25_min"]["hits"]["hits"]
if pm_25_min_hits:
pm25_min = round_2(pm_25_min_hits[0]["_source"].get("pm25_min"))
pm25_min_time = esstr_to_dthoutstr(
pm_25_min_hits[0]["_source"].get("pm25_min_time"),
format="%Y-%m-%d %H:%M:%S")
# avg
pm25_avg = ""
if es_res["pm25_avg"].get("value"):
pm25_avg = round(es_res["pm25_avg"].get("value"))
# 2.2 pm10
# max
pm10_max = ""
pm10_max_time = ""
pm_10_max_hits = es_res["pm10_max"]["hits"]["hits"]
if pm_10_max_hits:
pm10_max = round_2(pm_10_max_hits[0]["_source"].get("pm10_max"))
pm10_max_time = esstr_to_dthoutstr(
pm_10_max_hits[0]["_source"].get("pm10_max_time"),
format="%Y-%m-%d %H:%M:%S")
# min
pm10_min = ""
pm10_min_time = ""
pm_10_min_hits = es_res["pm10_min"]["hits"]["hits"]
if pm_10_min_hits:
pm10_min = round_2(pm_10_min_hits[0]["_source"].get("pm10_min"))
pm10_min_time = esstr_to_dthoutstr(
pm_10_min_hits[0]["_source"].get("pm10_min_time"),
format="%Y-%m-%d %H:%M:%S")
# avg
pm10_avg = ""
if es_res["pm10_avg"].get("value"):
pm10_avg = round(es_res["pm10_avg"].get("value"))
# 2.3 tsp
# max
tsp_max = ""
tsp_max_time = ""
tsp_max_hits = es_res["tsp_max"]["hits"]["hits"]
if tsp_max_hits:
tsp_max = round_2(tsp_max_hits[0]["_source"].get("tsp_max"))
tsp_max_time = esstr_to_dthoutstr(
tsp_max_hits[0]["_source"].get("tsp_max_time"),
format="%Y-%m-%d %H:%M:%S")
# min
tsp_min = ""
tsp_min_time = ""
tsp_min_hits = es_res["tsp_min"]["hits"]["hits"]
if tsp_min_hits:
tsp_min = round_2(tsp_min_hits[0]["_source"].get("tsp_min"))
tsp_min_time = esstr_to_dthoutstr(
tsp_min_hits[0]["_source"].get("tsp_min_time"),
format="%Y-%m-%d %H:%M:%S")
# avg
tsp_avg = ""
if es_res["tsp_avg"].get("value"):
tsp_avg = round(es_res["tsp_avg"].get("value"))
return TisResp(pm2_5={"max": pm25_max,
"max_time": pm25_max_time,
"min": pm25_min,
"min_time": pm25_min_time,
"avg": pm25_avg},
pm10={"max": pm10_max,
"max_time": pm10_max_time,
"min": pm10_min,
"min_time": pm10_min_time,
"avg": pm10_avg},
tsp={"max": tsp_max,
"max_time": tsp_max_time,
"min": tsp_min,
"min_time": tsp_min_time,
"avg": tsp_avg},
)
async def tsp_index_statistics_service_new15(tsp_id, start, end):
now = str(datetime.datetime.now()) now = str(datetime.datetime.now())
if start[:10] == now[:10] and end[:10] == now[:10]: if start[:10] == now[:10] and end[:10] == now[:10]:
table_name = "tsp_15min_record" table_name = "tsp_15min_record"
...@@ -405,7 +279,7 @@ async def tsp_index_statistics_service_new15(tsp_id, start, end): ...@@ -405,7 +279,7 @@ async def tsp_index_statistics_service_new15(tsp_id, start, end):
f" FROM {table_name} where tsp_id=%s and create_time " \ f" FROM {table_name} where tsp_id=%s and create_time " \
f"BETWEEN '{start}' and '{end}' ORDER BY create_time" f"BETWEEN '{start}' and '{end}' ORDER BY create_time"
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(tsp_id, )) datas = await conn.fetchall(sql, args=(tsp_id,))
if not datas: if not datas:
return TisResp() return TisResp()
df = pd.DataFrame(list(datas)) df = pd.DataFrame(list(datas))
...@@ -457,48 +331,6 @@ def get_max_min_time(df, max_value, name): ...@@ -457,48 +331,6 @@ def get_max_min_time(df, max_value, name):
async def day_env_service(cid): async def day_env_service(cid):
"""当日环境"""
# 需求逻辑
# 求每个tsp装置pm2.5,pm10,tsp的平均值
# 取平均值高的pm2.5,pm10,tsp
today_start, today_end, m_start, m_end = start_end_date()
# 1. 根据cid取tsp_id_list
tsp_list = await tsp_by_cid(cid)
tsp_id_list = [i["tsp_id"] for i in tsp_list]
# 2. 取es数据
es_res = await tsp_aggs_tsp_id(today_start, today_end, tsp_id_list)
if not es_res:
return DeResp(pm2_5={"data": "", "grade": ""},
pm10={"data": "", "grade": ""},
tsp={"data": "", "grade": ""})
pm2_5_max = 0
pm10_max = 0
tsp_max = 0
for info in es_res:
pm2_5 = round(info["pm25"]["value"])
if pm2_5 > pm2_5_max:
pm2_5_max = pm2_5
pm10 = round(info["pm10"]["value"])
if pm10 > pm10_max:
pm10_max = pm10
tsp = round(info["tsp"]["value"])
if tsp > tsp_max:
tsp_max = tsp
# 调用函数,获取等级
pm2_5_grade = pm2_5_trans_grade(pm2_5_max)
pm10_grade = pm10_trans_grade(pm10_max)
tsp_grade = tsp_trans_grade(tsp_max)
# 3. 返回
return DeResp(
pm2_5={"data": pm2_5_max, "grade": pm2_5_grade},
pm10={"data": pm10_max, "grade": pm10_grade},
tsp={"data": tsp_max, "grade": tsp_grade}
)
async def day_env_service_new15(cid):
"""当日环境""" """当日环境"""
# 需求逻辑 # 需求逻辑
# 求每个tsp装置pm2.5,pm10,tsp的平均值 # 求每个tsp装置pm2.5,pm10,tsp的平均值
...@@ -589,7 +421,7 @@ async def analysis_describe_service(cid, start, end, page_num, page_size, ...@@ -589,7 +421,7 @@ async def analysis_describe_service(cid, start, end, page_num, page_size,
"is_effective": page["is_valid"], "is_effective": page["is_valid"],
"message": page["effect"] "message": page["effect"]
}) })
effective_rate = f"{round(data['effect']/data['measures'],2)*100}%" \ effective_rate = f"{round(data['effect'] / data['measures'], 2) * 100}%" \
if data['measures'] else 0 if data['measures'] else 0
return AdResp( return AdResp(
all_count=data["measures"] or 0, all_count=data["measures"] or 0,
......
...@@ -5,8 +5,7 @@ from unify_api.modules.tsp_water.components.drop_dust_cps import DtReq, \ ...@@ -5,8 +5,7 @@ from unify_api.modules.tsp_water.components.drop_dust_cps import DtReq, \
from unify_api.modules.tsp_water.service.tsp_service import \ from unify_api.modules.tsp_water.service.tsp_service import \
real_time_service, tsp_history_service, tsp_index_statistics_service, \ real_time_service, tsp_history_service, tsp_index_statistics_service, \
day_env_service, stat_analysis_service, tsp_predict_deviation_service, \ day_env_service, stat_analysis_service, tsp_predict_deviation_service, \
tsp_contrast_deviation_service, analysis_describe_service, \ tsp_contrast_deviation_service, analysis_describe_service
day_env_service_new15, tsp_index_statistics_service_new15
@summary("TSP信息-实时参数") @summary("TSP信息-实时参数")
...@@ -62,14 +61,13 @@ async def post_tsp_index_statistics(req, body: TisReq) -> TisResp: ...@@ -62,14 +61,13 @@ async def post_tsp_index_statistics(req, body: TisReq) -> TisResp:
start = body.start start = body.start
end = body.end end = body.end
# return await tsp_index_statistics_service(tsp_id, start, end) # return await tsp_index_statistics_service(tsp_id, start, end)
# return await tsp_index_statistics_service_new15(tsp_id, start, end)
return TisResp(pm2_5={}, pm10={}, tsp={}) return TisResp(pm2_5={}, pm10={}, tsp={})
@summary("当日环境") @summary("当日环境")
async def post_day_env(req, body: DeReq) -> DeResp: async def post_day_env(req, body: DeReq) -> DeResp:
cid = body.cid cid = body.cid
return await day_env_service_new15(cid) return await day_env_service(cid)
@summary("统计分析-扬尘") @summary("统计分析-扬尘")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment