1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from pot_libs.utils.exc_util import BusinessException
from unify_api.constants import SLOTS
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
histogram_aggs_points
from unify_api.modules.tsp_water.dao.drop_dust_dao import sum_water_group
from unify_api.modules.tsp_water.dao.tsp_dao import tsp_histogram_avg
from unify_api.utils import time_format
from unify_api.utils.common_utils import round_2n
from unify_api.utils.es_query_body import es_process
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def per_hour_wave(start, end, tsp_id=None):
"""PM2.5/PM10/TSP每小时或每天曲线数据"""
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
interval = "day"
fmt = "MM-DD"
# 需求是每小时一个点
elif interval == 15 * 60:
slots = SLOTS["day"]
interval = "hour"
fmt = "HH:mm"
else:
raise BusinessException(message="time range not day or month")
# 1. 查询es
es_res = await tsp_histogram_avg(start, end, interval, tsp_id)
es_dic = es_process(es_res, fmat=fmt)
# 2. 组装数据
pm25_list = []
pm10_list = []
tsp_list = []
for slot in slots:
if slot in es_dic:
pm25_value = round_2n(es_dic[slot]["pm25"].get("value"))
pm10_value = round_2n(es_dic[slot]["pm10"].get("value"))
tsp_value = round_2n(es_dic[slot]["tsp"].get("value"))
else:
pm25_value, pm10_value, tsp_value = None, None, None
pm25_list.append(pm25_value)
pm10_list.append(pm10_value)
tsp_list.append(tsp_value)
return pm25_list, pm10_list, tsp_list, slots
async def per_hour_wave_new15(start, end, tsp_id=None):
interval, slots = time_format.time_pick_transf(start, end)
mid_sql = f"tsp_id={tsp_id} and" if tsp_id else ""
if interval == 24 * 3600:
sql = f'SELECT DATE_FORMAT(create_time,"%m-%d") date_time, ' \
f'AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp ' \
f'FROM `tsp_day_record` where {mid_sql} ' \
f'create_time BETWEEN "{start}" and "{end}" GROUP BY date_time ' \
f'ORDER BY date_time'
elif interval == 15 * 60:
sql = f'SELECT DATE_FORMAT(create_time,"%%H:00") date_time, ' \
f'AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp ' \
f'FROM `tsp_15min_record` where {mid_sql} ' \
f'create_time BETWEEN "{start}" and "{end}" GROUP BY date_time ' \
f'ORDER BY date_time'
else:
raise BusinessException(message="time range not day or month")
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
datas_map = {data["date_time"]: data for data in datas}
pm25_list = []
pm10_list = []
tsp_list = []
for slot in slots:
slot_data = datas_map.get(slot)
if slot_data:
pm25_value = round_2n(slot_data.get("pm25"))
pm10_value = round_2n(slot_data.get("pm10"))
tsp_value = round_2n(slot_data.get("tsp"))
else:
pm25_value, pm10_value, tsp_value = None, None, None
pm25_list.append(pm25_value)
pm10_list.append(pm10_value)
tsp_list.append(tsp_value)
return pm25_list, pm10_list, tsp_list, slots
async def per_hour_kwh_wave(start, end, tsp_id_list=None):
"""每小时或每天电量曲线数据"""
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
interval = "day"
fmt = "MM-DD"
# 需求是每小时一个点
elif interval == 15 * 60:
slots = SLOTS["day"]
interval = "hour"
fmt = "HH:mm"
else:
raise BusinessException(message="time range not day or month")
# 1. 查询es
es_res = await histogram_aggs_points(start, end, tsp_id_list, interval)
es_dic = es_process(es_res, fmat=fmt)
# 2. 组装数据
kwh_list = []
for slot in slots:
if slot in es_dic:
kwh_value = round_2n(es_dic[slot]["kwh"].get("value"))
else:
kwh_value = None
kwh_list.append(kwh_value)
return kwh_list, slots
async def per_hour_kwh_wave_new15(start, end, pids):
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
sql = f'SELECT DATE_FORMAT(create_time,"%%m-%%d") date_time, ' \
f'sum(kwh) kwh,sum(charge) charge,sum(p) p ' \
f'FROM `point_1day_power` where pid in %s and ' \
f'create_time BETWEEN "{start}" and "{end}" GROUP BY date_time' \
f' ORDER BY date_time'
elif interval == 15 * 60:
sql = f'SELECT DATE_FORMAT(create_time,"%%H:00") date_time, ' \
f'sum(kwh) kwh,sum(charge) charge,sum(p) p ' \
f'FROM `point_15min_power` where pid in %s and ' \
f'create_time BETWEEN "{start}" and "{end}" GROUP BY date_time' \
f' ORDER BY date_time'
else:
raise BusinessException(message="time range not day or month")
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pids, ))
datas_map = {data["date_time"]: data for data in datas}
# 2. 组装数据
kwh_list = []
for slot in slots:
slot_data = datas_map.get(slot)
kwh_value = round_2n(slot_data.get("kwh")) if slot_data else None
kwh_list.append(kwh_value)
return kwh_list, slots
async def per_hour_water_wave(start, end, tsp_id_list=None):
"""每小时或每天水量曲线数据"""
interval, slots = time_format.time_pick_transf(start, end)
if interval == 24 * 3600:
date_type = "month"
# 需求是每小时一个点
elif interval == 15 * 60:
slots = SLOTS["day"]
date_type = "day"
else:
raise BusinessException(message="time range not day or month")
# 1. 查询mysql
water_info = await sum_water_group(start, end, date_type=date_type)
# 2. 组装数据
water_list = [None for _ in range(len(slots))]
for index, info in enumerate(water_info):
water_list[index] = round_2n(info["water"])
return water_list