import datetime
import time
import math
import pandas as pd
from pot_libs.utils.exc_util import BusinessException
from unify_api import constants
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import PM2_5, PM10, TSP, SLOTS
from unify_api.utils.common_utils import round_2n
from unify_api.modules.common.dao.common_dao import tsp_by_cid, \
    storey_wp_by_cid, storey_pl_by_cid
from unify_api.modules.tsp_water.components.drop_dust_cps import DtResp, \
    ThResp, TisResp, DeResp, SaResp, TcdResp, TpdResp, AdResp
from unify_api.modules.tsp_water.dao.tsp_dao import meterdata_tsp_current, \
    tsp_histogram_tsp_id, tsp_by_tsp_id_dao
from unify_api.modules.tsp_water.dao.tsp_map_dao import \
    get_predict_data_day_dao, get_predict_data_month_dao, get_page_data, \
    get_contrast_data_day_dao, get_contrast_data_month_dao, get_cid_tsp_dao
from unify_api.modules.tsp_water.procedures.drop_dust_pds import \
    pm2_5_trans_grade, pm10_trans_grade, tsp_trans_grade
from unify_api.modules.tsp_water.procedures.tsp_pds import \
    per_hour_water_wave, per_hour_kwh_wave, per_hour_wave
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2, correlation, round_0
from unify_api.utils.time_format import start_end_date


async def real_time_service(tsp_id):
    """TSP info - real-time parameters.

    :param tsp_id: id of the TSP device to look up.
    :return: ``DtResp`` carrying ``pm2_5``, ``pm10`` and ``tsp``; an empty
        ``DtResp`` when the latest reading is older than
        ``constants.REAL_EXP_TIME`` seconds.
    :raises BusinessException: when no current reading exists for the device.
    """
    # 1. Fetch the current (real-time) reading for this tsp_id.
    tsp_dic = await meterdata_tsp_current(tsp_id)
    if not tsp_dic:
        raise BusinessException(message=f"tsp_id: {tsp_id} no redis data")
    # 2. Discard readings that fall outside the freshness window.
    now_ts = int(time.time())
    tsp_ts = tsp_dic["timestamp"]
    if now_ts - tsp_ts > constants.REAL_EXP_TIME:
        return DtResp()
    pm2_5 = tsp_dic.get("pm25", "")
    pm10 = tsp_dic.get("pm10", "")
    tsp = tsp_dic.get("tsp", "")
    return DtResp(pm2_5=pm2_5, pm10=pm10, tsp=tsp)


async def tsp_history_service(tsp_id, start, end):
    """Return measured, predicted and contrast curves for one device.

    :param tsp_id: id of the TSP device.
    :param start: range start, ``"%Y-%m-%d %H:%M:%S"``.
    :param end: range end, ``"%Y-%m-%d %H:%M:%S"``.
    :return: ``ThResp`` with one value list per metric plus the time axis.
    """
    interval, slots = time_format.time_pick_transf(start, end)
    # Measured data
    pm25_list, pm10_list, tsp_list = await \
        get_tsp_data(tsp_id, start, end, slots, interval)
    # Predicted data
    pm25_predict, pm10_predict, tsp_predict, _ = \
        await get_predict_data(tsp_id, start, end, slots)
    # Contrast data
    pm25_contrast, pm10_contrast, _ = \
        await get_contrast_data(tsp_id, start, end, slots)
    return ThResp(
        pm2_5={"threshold": PM2_5, "value_slots": pm25_list},
        pm10={"threshold": PM10, "value_slots": pm10_list},
        tsp={"threshold": TSP, "value_slots": tsp_list},
        time_slots=slots,
        pm2_5_predict={"value_slots": pm25_predict},
        pm10_predict={"value_slots": pm10_predict},
        tsp_predict={"value_slots": tsp_predict},
        pm2_5_contrast={"value_slots": pm25_contrast},
        pm10_contrast={"value_slots": pm10_contrast},
    )


async def get_tsp_data(tsp_id, start, end, slots, interval):
    """Load averaged PM2.5 / PM10 / TSP values aligned to ``slots``.

    :param tsp_id: id of the TSP device.
    :param start: range start (passed to SQL ``BETWEEN``).
    :param end: range end (passed to SQL ``BETWEEN``).
    :param slots: time-axis labels; the result lists follow this order.
    :param interval: ``24 * 3600`` selects the daily table, ``15 * 60`` the
        15-minute table.
    :return: ``(pm25_list, pm10_list, tsp_list)``; a slot with no row holds
        ``None`` in all three lists.
    :raises BusinessException: when ``interval`` is neither of the two
        supported values.
    """
    if interval == 24 * 3600:
        time_fmt, table_name = "%m-%d", "tsp_day_record"
    elif interval == 15 * 60:
        # NOTE(review): the 15-minute table is bucketed per hour ("%H:00");
        # confirm this matches the slot labels from time_pick_transf.
        time_fmt, table_name = "%H:00", "tsp_15min_record"
    else:
        raise BusinessException(message="time range not day or month")
    # table_name comes from the fixed choices above; every caller-supplied
    # value (and the DATE_FORMAT pattern, so its "%" needs no escaping) is
    # bound as a parameter instead of being formatted into the SQL text.
    sql = f'SELECT DATE_FORMAT(create_time,%s) date_time, ' \
          f'AVG(pm25_max) pm25,AVG(pm10_max) pm10,AVG(tsp_max) tsp ' \
          f'FROM `{table_name}` where tsp_id=%s and ' \
          f'create_time BETWEEN %s and %s GROUP BY date_time' \
          f' ORDER BY date_time'
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql,
                                    args=(time_fmt, tsp_id, start, end))
    datas_map = {data["date_time"]: data for data in datas}
    # 2. Assemble one value per slot.
    pm25_list = []
    pm10_list = []
    tsp_list = []
    for slot in slots:
        slot_data = datas_map.get(slot)
        if slot_data:
            pm25_value = round_2n(slot_data.get("pm25"))
            pm10_value = round_2n(slot_data.get("pm10"))
            tsp_value = round_2n(slot_data.get("tsp"))
        else:
            pm25_value, pm10_value, tsp_value = None, None, None
        pm25_list.append(pm25_value)
        pm10_list.append(pm10_value)
        tsp_list.append(tsp_value)
    return pm25_list, pm10_list, tsp_list


def _pad_missing_slots(slots, present_slots, *series):
    """Insert ``""`` into every list of ``series`` for each absent slot.

    Slots are visited in ascending position so that an earlier insertion
    never shifts the index of a later one.
    """
    present = set(present_slots)
    for index, slot in enumerate(slots):
        if slot not in present:
            for values in series:
                values.insert(index, "")


# TSP predicted data
async def get_predict_data(tsp_id, start, end, slots):
    """Load predicted PM2.5 / PM10 / TSP values for the range.

    :param tsp_id: id of the TSP device.
    :param start: range start, ``"%Y-%m-%d %H:%M:%S"``.
    :param end: range end, ``"%Y-%m-%d %H:%M:%S"``.
    :param slots: time-axis labels used to pad missing points with ``""``.
    :return: ``(pm25_predict, pm10_predict, tsp_predict, date_predict)``.
    """
    start_f = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    end_f = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    # Compare full calendar dates: day-of-month alone would treat e.g.
    # 01-05 .. 02-05 as a single day.
    if start_f.date() == end_f.date():
        # Single day: 15-minute data
        predict_data = await get_predict_data_day_dao(tsp_id, start_f, end_f)
        predict_slots = ["%02d:%02d" % (data["quarter_time"].hour,
                                        data["quarter_time"].minute)
                         for data in predict_data]
        date_predict = [data["quarter_time"].strftime("%Y-%m-%d %H:%M:%S")
                        for data in predict_data]
    else:
        # Month range: one point per day. quarter_time is sliced as a string
        # here; [5:] drops its first five characters (presumably "YYYY-").
        predict_data = await get_predict_data_month_dao(tsp_id, start_f, end_f)
        predict_slots = [data["quarter_time"][5:] for data in predict_data]
        date_predict = [data["quarter_time"] for data in predict_data]
    pm25_predict = [round(data["pm25"]) for data in predict_data]
    pm10_predict = [round(data["pm10"]) for data in predict_data]
    tsp_predict = [round(data["tsp"]) for data in predict_data]
    # Padding for missing points; per the original note this rarely runs.
    if len(predict_data) != len(slots):
        _pad_missing_slots(slots, predict_slots, pm25_predict, pm10_predict,
                           tsp_predict, date_predict)
    return pm25_predict, pm10_predict, tsp_predict, date_predict


# Contrast data
async def get_contrast_data(tsp_id, start, end, slots):
    """Load contrast PM2.5 / PM10 values for the range.

    :param tsp_id: id of the TSP device.
    :param start: range start, ``"%Y-%m-%d %H:%M:%S"``.
    :param end: range end, ``"%Y-%m-%d %H:%M:%S"``.
    :param slots: time-axis labels used to pad missing points with ``""``.
    :return: ``(pm25_contrast, pm10_contrast, date_contrast)``.
    """
    beg_f = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    end_f = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    # Compare full calendar dates, not just the day-of-month.
    if beg_f.date() == end_f.date():
        # Single day: 15-minute data
        contrast_data = await get_contrast_data_day_dao(tsp_id, beg_f, end_f)
        contrast_slots = ["%02d:%02d" % (data["quarter_time"].hour,
                                         data["quarter_time"].minute)
                          for data in contrast_data]
        date_contrast = [data["quarter_time"].strftime("%Y-%m-%d %H:%M:%S")
                         for data in contrast_data]
    else:
        # Month range: one point per day
        contrast_data = await get_contrast_data_month_dao(tsp_id, beg_f,
                                                          end_f)
        contrast_slots = [data["quarter_time"][5:] for data in contrast_data]
        date_contrast = [data["quarter_time"] for data in contrast_data]
    pm25_contrast = [round(data["pm25"]) for data in contrast_data]
    pm10_contrast = [round(data["pm10"]) for data in contrast_data]
    # Padding for missing points; per the original note this rarely runs.
    if len(contrast_data) != len(slots):
        _pad_missing_slots(slots, contrast_slots, pm25_contrast,
                           pm10_contrast, date_contrast)
    return pm25_contrast, pm10_contrast, date_contrast


def _deviation_summary(actual, reference, times):
    """Summarise the relative deviation ``|actual - reference| / actual``.

    Points where either value is falsy (``None``, ``""`` or ``0``) are
    skipped. ``zip`` stops at the shortest input, so lists of unequal length
    cannot raise ``IndexError``.

    :return: dict with ``max``, ``min``, ``avg``, ``max_time`` and
        ``min_time``; every field is ``""`` when no point qualifies.
    """
    deviations, stamps = [], []
    for real, ref, stamp in zip(actual, reference, times):
        if ref and real:
            stamps.append(stamp)
            deviations.append(round(math.fabs((real - ref) / real), 3))
    if not deviations:
        return {"max": "", "min": "", "avg": "",
                "max_time": "", "min_time": ""}
    highest, lowest = max(deviations), min(deviations)
    return {
        "max": highest,
        "min": lowest,
        "avg": round(sum(deviations) / len(deviations), 3),
        "max_time": stamps[deviations.index(highest)],
        "min_time": stamps[deviations.index(lowest)],
    }


# Prediction deviation
async def tsp_predict_deviation_service(tsp_id, start, end):
    """Compare measured values against predictions.

    :param tsp_id: id of the TSP device.
    :param start: range start, ``"%Y-%m-%d %H:%M:%S"``.
    :param end: range end, ``"%Y-%m-%d %H:%M:%S"``.
    :return: ``TpdResp`` with one deviation summary per metric.
    """
    interval, slots = time_format.time_pick_transf(start, end)
    # Measured data
    pm25, pm10, tsp = await get_tsp_data(tsp_id, start, end, slots, interval)
    # Predicted data
    pm25_predict, pm10_predict, tsp_predict, date_predict = \
        await get_predict_data(tsp_id, start, end, slots)
    return TpdResp(
        pm2_5=_deviation_summary(pm25, pm25_predict, date_predict),
        pm10=_deviation_summary(pm10, pm10_predict, date_predict),
        tsp=_deviation_summary(tsp, tsp_predict, date_predict),
    )


# Contrast deviation
async def tsp_contrast_deviation_service(tsp_id, start, end):
    """Compare measured values against the contrast series.

    :param tsp_id: id of the TSP device.
    :param start: range start, ``"%Y-%m-%d %H:%M:%S"``.
    :param end: range end, ``"%Y-%m-%d %H:%M:%S"``.
    :return: ``TcdResp`` with a deviation summary for PM2.5 and PM10.
    """
    interval, slots = time_format.time_pick_transf(start, end)
    # Measured data (the TSP series has no contrast counterpart here)
    pm25, pm10, _ = await get_tsp_data(tsp_id, start, end, slots, interval)
    # Contrast data
    pm25_contrast, pm10_contrast, date_contrast = \
        await get_contrast_data(tsp_id, start, end, slots)
    return TcdResp(
        pm2_5=_deviation_summary(pm25, pm25_contrast, date_contrast),
        pm10=_deviation_summary(pm10, pm10_contrast, date_contrast),
    )


async def tsp_index_statistics_service(tsp_id, start, end):
    """Return max / min / average statistics for each metric.

    Reads the 15-minute table when both ``start`` and ``end`` fall on
    today's date, otherwise the daily table.

    :param tsp_id: id of the TSP device.
    :param start: range start; its first 10 characters are compared with
        today's date.
    :param end: range end; same convention as ``start``.
    :return: ``TisResp`` with ``max``, ``max_time``, ``min``, ``min_time``
        and ``avg`` per metric; an empty ``TisResp`` when no rows match.
    """
    now = str(datetime.datetime.now())
    if start[:10] == now[:10] and end[:10] == now[:10]:
        table_name = "tsp_15min_record"
    else:
        table_name = "tsp_day_record"
    # table_name is one of the two literals above; all caller-supplied
    # values are bound as parameters.
    sql = f"SELECT pm25_max,pm25_max_time,pm25_min,pm25_min_time," \
          f"pm10_max,pm10_max_time,pm10_min,pm10_min_time," \
          f"tsp_max,tsp_max_time,tsp_min,tsp_min_time" \
          f" FROM {table_name} where tsp_id=%s and create_time " \
          f"BETWEEN %s and %s ORDER BY create_time"
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(tsp_id, start, end))
    if not datas:
        return TisResp()
    df = pd.DataFrame(list(datas))
    stats = {}
    for metric in ("pm25", "pm10", "tsp"):
        max_col, min_col = f"{metric}_max", f"{metric}_min"
        max_value, max_time = get_max_min_time(df, df[max_col].max(), max_col)
        min_value, min_time = get_max_min_time(df, df[min_col].min(), min_col)
        avg_value = df[max_col].mean()
        # NaN is truthy, so it has to be tested explicitly; a zero average
        # keeps mapping to "" as before.
        if pd.isna(avg_value) or not avg_value:
            avg_value = ""
        else:
            avg_value = round(avg_value, 2)
        stats[metric] = {"max": max_value, "max_time": max_time,
                         "min": min_value, "min_time": min_time,
                         "avg": avg_value}
    return TisResp(pm2_5=stats["pm25"], pm10=stats["pm10"],
                   tsp=stats["tsp"])


def get_max_min_time(df, max_value, name):
    """Return the rounded extreme value and the time it was recorded.

    Despite the parameter name, ``max_value`` may be a maximum or a minimum.
    The row is located by equality with that value, so the ``*_min`` columns
    resolve to the row holding the minimum.

    :param df: DataFrame containing columns ``name`` and ``f"{name}_time"``.
    :param max_value: the extreme value already computed from ``df[name]``.
    :param name: column name, e.g. ``"pm25_max"`` or ``"pm25_min"``.
    :return: ``(value, time)``; both are ``""`` when ``max_value`` is NaN or
        None, and ``time`` is ``""`` when the timestamp is missing.
    """
    if pd.isna(max_value):
        return "", ""
    matches = df.loc[df[name] == max_value]
    if matches.empty:
        extreme_time = ""
    else:
        # First matching row, i.e. the earliest occurrence in df order.
        raw_time = matches.iloc[0].to_dict().get(f"{name}_time")
        extreme_time = "" if pd.isnull(raw_time) else str(raw_time)
    return round_2(max_value), extreme_time


async def day_env_service(cid):
    """Today's environment.

    For each metric, takes the highest per-device value returned for today
    across all TSP devices of the company, and maps it to a grade.

    :param cid: company id.
    :return: ``DeResp`` with ``data`` and ``grade`` per metric; both are
        ``""`` when there are no devices or no data for today.
    """
    today_start, today_end, _, _ = start_end_date()
    # 1. Resolve the company's tsp ids.
    tsp_list = await tsp_by_cid(cid)
    tsp_id_list = [i["tsp_id"] for i in tsp_list]
    # 2. Query today's values; skip the query when there is nothing to ask
    #    for, so the DAO is never called with an empty id list.
    sql_res = None
    if tsp_id_list:
        sql_res = await tsp_by_tsp_id_dao(today_start, today_end,
                                          tsp_id_list)
    if not sql_res:
        return DeResp(pm2_5={"data": "", "grade": ""},
                      pm10={"data": "", "grade": ""},
                      tsp={"data": "", "grade": ""})

    def _peak(key):
        # Missing / zero values count as 0; the result is never below 0.
        return max([0] + [round_2(info[key]) if info[key] else 0
                          for info in sql_res])

    pm2_5_max = _peak("pm25")
    pm10_max = _peak("pm10")
    tsp_max = _peak("tsp")
    # Translate each peak value into its grade.
    pm2_5_grade = pm2_5_trans_grade(pm2_5_max)
    pm10_grade = pm10_trans_grade(pm10_max)
    tsp_grade = tsp_trans_grade(tsp_max)
    # 3. Return
    return DeResp(
        pm2_5={"data": pm2_5_max, "grade": pm2_5_grade},
        pm10={"data": pm10_max, "grade": pm10_grade},
        tsp={"data": tsp_max, "grade": tsp_grade}
    )


async def stat_analysis_service(cid, tsp_id, start, end):
    """Statistical analysis - dust.

    Correlates the dust series with fog-gun energy use and with spray water
    use over the same range.

    :param cid: company id, used to find the fog-gun points.
    :param tsp_id: id of the TSP device.
    :param start: range start.
    :param end: range end.
    :return: ``SaResp`` with the raw series and six ``{"r", "name"}``
        correlation entries.
    """
    # 1. Load the per-slot tsp series.
    pm25_list, pm10_list, tsp_list, slots = await per_hour_wave(
        start, end, tsp_id)
    # 2. Load fog-gun energy data. ``slots`` is replaced by the axis
    #    returned from this call.
    storey_list = await storey_wp_by_cid(cid)
    point_list = [storey["point_id"] for storey in storey_list]
    kwh_res, slots = await per_hour_kwh_wave(start, end, point_list)
    r_gun_pm25_value, r_gun_pm25_info = correlation(kwh_res, pm25_list)
    r_gun_pm10_value, r_gun_pm10_info = correlation(kwh_res, pm10_list)
    r_gun_tsp_value, r_gun_tsp_info = correlation(kwh_res, tsp_list)
    # 3. Load spray water data.
    water_res = await per_hour_water_wave(start, end)
    r_water_pm25_value, r_water_pm25_info = correlation(water_res, pm25_list)
    r_water_pm10_value, r_water_pm10_info = correlation(water_res, pm10_list)
    r_water_tsp_value, r_water_tsp_info = correlation(water_res, tsp_list)
    return SaResp(
        pm2_5=pm25_list, pm10=pm10_list, tsp=tsp_list,
        time_slots=slots,
        fog_gun=kwh_res,
        water=water_res,
        r_gun_pm25={"r": r_gun_pm25_value, "name": r_gun_pm25_info},
        r_gun_pm10={"r": r_gun_pm10_value, "name": r_gun_pm10_info},
        r_gun_tsp={"r": r_gun_tsp_value, "name": r_gun_tsp_info},
        r_water_pm25={"r": r_water_pm25_value, "name": r_water_pm25_info},
        r_water_pm10={"r": r_water_pm10_value, "name": r_water_pm10_info},
        r_water_tsp={"r": r_water_tsp_value, "name": r_water_tsp_info},
    )


async def analysis_describe_service(cid, start, end, page_num, page_size,
                                    measure_type):
    """Return measure totals, the effective rate and one page of records.

    :param cid: company id.
    :param start: range start.
    :param end: range end.
    :param page_num: page number passed to ``get_page_data``.
    :param page_size: page size passed to ``get_page_data``.
    :param measure_type: measure type filter passed to both DAOs.
    :return: ``AdResp``; ``effective_rate`` is a string such as ``"57.0%"``,
        or ``0`` when there are no measures.
    """
    data = await get_cid_tsp_dao(cid, start, end, measure_type)
    page_date = await get_page_data(cid, start, end, page_num, page_size,
                                    measure_type)
    page_list = []
    for page in page_date:
        start_datetime = page["start_datetime"].strftime("%Y-%m-%d %H:%M:%S")
        end_datetime = page["end_datetime"].strftime("%Y-%m-%d %H:%M:%S")
        page_list.append({
            "datetime": f"{start_datetime[:16]}-{end_datetime[11:16]}",
            "effective": page["measure_msg"],
            "is_effective": page["is_valid"],
            "message": page["effect"]
        })
    measures = data["measures"]
    effect = data["effect"] or 0  # a missing count is treated as 0
    if measures:
        # The outer round() strips float artefacts such as
        # 56.99999999999999 produced by ``round(x, 2) * 100``.
        percent = round(round(effect / measures, 2) * 100, 2)
        effective_rate = f"{percent}%"
    else:
        effective_rate = 0
    return AdResp(
        all_count=measures or 0,
        effective_count=effect,
        effective_rate=effective_rate,
        page_data=page_list
    )