1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str, time_str_to_str, \
esstr_to_dthoutstr, time_str_to_str1
def es_process(data, fmat, time_key="key_as_string"):
    """Index records by their reformatted time string.

    Each record's ``time_key`` value is passed to ``time_str_to_str``
    together with ``fmat``; the returned value becomes the dictionary key
    and the whole record becomes the value. When several records produce
    the same key, the last one wins.

    Args:
        data: Iterable of mappings that each contain ``time_key``.
        fmat: Format argument forwarded unchanged to ``time_str_to_str``.
        time_key: Name of the field that holds the time string.

    Returns:
        dict mapping each computed key to the record it came from.
    """
    return {
        time_str_to_str(record[time_key], fmat): record
        for record in data
    }
def es_time_process(data, time_key="quarter_time", fmt=None):
    """Index the ``_source`` payload of each hit by a formatted time key.

    For every element of ``data`` the ``"_source"`` mapping is extracted,
    its ``time_key`` value is passed to ``esstr_to_dthoutstr`` together
    with ``fmt``, and the result is used as the dictionary key for that
    ``_source`` mapping. Later hits overwrite earlier ones on key clashes.

    Args:
        data: Iterable of mappings that each contain a ``"_source"``
            mapping holding ``time_key``.
        time_key: Field inside ``_source`` that holds the time string.
        fmt: Format argument forwarded unchanged to ``esstr_to_dthoutstr``.

    Returns:
        dict mapping each computed key to the matching ``_source`` mapping.
    """
    payloads = (hit["_source"] for hit in data)
    return {
        esstr_to_dthoutstr(payload[time_key], fmt): payload
        for payload in payloads
    }
def sql_time_process(data, time_key="quarter_time", fmt="%H:%M"):
    """Index rows by the ``strftime`` rendering of one of their fields.

    Args:
        data: Iterable of mappings; each ``row[time_key]`` must provide a
            ``strftime`` method (for example a ``datetime`` object).
        time_key: Name of the field to format.
        fmt: ``strftime`` format string used to build the key.

    Returns:
        dict mapping each formatted time string to its row. When several
        rows format to the same string, the last one wins.
    """
    return {row[time_key].strftime(fmt): row for row in data}
def es_process1(data, fmat, time_key="key_as_string"):
    """Index records by a time string reformatted via ``time_str_to_str1``.

    Same shape as ``es_process`` but the key is produced by
    ``time_str_to_str1``. When several records produce the same key, the
    last one wins.

    Args:
        data: Iterable of mappings that each contain ``time_key``.
        fmat: Format argument forwarded unchanged to ``time_str_to_str1``.
        time_key: Name of the field that holds the time string.

    Returns:
        dict mapping each computed key to the record it came from.
    """
    return {
        time_str_to_str1(record[time_key], fmat): record
        for record in data
    }
def agg_statistics(date_key=None, matchs=None, terms=None, start=None,
                   end=None, aggs_key=()):
    """Build an Elasticsearch body that sums the requested fields.

    Example::

        agg_statistics(date_key="datetime", matchs={"cid": 34},
                       terms={"location_id": [1]}, start=start, end=end,
                       aggs_key=["kwh", "charge"])

    Args:
        date_key: Field name used for the range filter.
        matchs: Optional mapping of field -> value; each entry becomes a
            ``match`` clause. Values stored under ``day``, ``hour``,
            ``quarter_time`` or ``month`` are passed through
            ``convert_es_str`` first. The mapping itself is not modified.
        terms: Optional mapping of field -> list of values; each entry
            becomes a ``terms`` clause.
        start: Lower bound of the range filter (inclusive, ``gte``).
        end: Upper bound of the range filter (inclusive, ``lte``). The
            range clause is only added when both ``start`` and ``end`` are
            truthy.
        aggs_key: Iterable of field names; each gets a ``sum`` aggregation
            named after the field.

    Returns:
        dict with ``size`` 0, a ``bool``/``must`` query and the ``aggs``
        section. The body is also written to the log at info level.
    """
    time_fields = ("day", "hour", "quarter_time", "month")
    must = []

    for key, value in (matchs or {}).items():
        if key in time_fields:
            # Convert a local value only. Writing the result back into
            # ``matchs`` would alter the caller's dict, and a dict reused
            # for a second call would then be converted twice.
            value = convert_es_str(value)
        must.append({"match": {key: value}})

    for key, values in (terms or {}).items():
        must.append({"terms": {key: values}})

    if start and end:
        must.append({
            "range": {
                date_key: {
                    "gte": convert_es_str(start),
                    "lte": convert_es_str(end),
                }
            }
        })

    query_body = {
        "size": 0,
        "query": {
            "bool": {"must": must}
        },
        "aggs": {field: {"sum": {"field": field}} for field in aggs_key},
    }
    log.info("query_body:{}".format(query_body))
    return query_body
class EsQueryBody(object):
    """Builder for Elasticsearch query bodies.

    The constructor stores the filter criteria; ``query`` turns them into
    a ``bool``/``must`` query and ``query_agg_histogram`` adds a date
    histogram aggregation on top of it.
    """

    def __init__(self, matchs=None, terms=None, start=None, end=None,
                 date_key=None, size=0):
        """Store the filter criteria.

        Args:
            matchs: Optional mapping of field -> value for ``match``
                clauses.
            terms: Optional mapping of field -> list of values for
                ``terms`` clauses.
            start: Lower bound of the range filter (``gte``).
            end: Upper bound of the range filter (``lte``).
            date_key: Field name used for the range filter.
            size: Value of the ``size`` entry in the generated body.
        """
        self.matchs = matchs
        self.terms = terms
        self.start = start
        self.end = end
        self.date_key = date_key
        self.size = size

    def query(self):
        """Build the filtered query body.

        Values stored in ``matchs`` under ``day``, ``hour``,
        ``quarter_time`` or ``month`` are passed through
        ``convert_es_str``. The range clause is only added when both
        ``start`` and ``end`` are truthy.

        Returns:
            dict with ``size`` and a ``bool``/``must`` query.
        """
        time_fields = ("day", "hour", "quarter_time", "month")
        must = []

        for key, value in (self.matchs or {}).items():
            if key in time_fields:
                # Convert a local value only. Storing the result back in
                # ``self.matchs`` would make a second ``query()`` call on
                # the same instance convert an already converted value.
                value = convert_es_str(value)
            must.append({"match": {key: value}})

        for key, values in (self.terms or {}).items():
            must.append({"terms": {key: values}})

        if self.start and self.end:
            must.append({
                "range": {
                    self.date_key: {
                        "gte": convert_es_str(self.start),
                        "lte": convert_es_str(self.end),
                    }
                }
            })

        return {
            "size": self.size,
            "query": {
                "bool": {"must": must}
            }
        }

    def query_agg_histogram(self, interval):
        """Build the query plus a date histogram over ``quarter_time``.

        Each histogram bucket carries ``stats`` aggregations for the
        ``kwh``, ``p`` and ``charge`` fields.

        Args:
            interval: Bucket width, placed in the ``interval`` entry of
                the ``date_histogram``.

        Returns:
            The dict from ``query`` with an added ``aggs`` section.
        """
        # NOTE(review): Elasticsearch 7.2+ deprecates ``interval`` on
        # date_histogram in favour of ``calendar_interval`` /
        # ``fixed_interval``. Left unchanged because the target cluster
        # version is not visible from this file -- confirm before upgrading.
        query = self.query()
        query["aggs"] = {
            "quarter_time": {
                "date_histogram": {
                    "field": "quarter_time",
                    "interval": interval,
                    "time_zone": "+08:00",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "aggs": {
                    "kwh": {
                        "stats": {
                            "field": "kwh"
                        }
                    },
                    "p": {
                        "stats": {
                            "field": "p"
                        }
                    },
                    "charge": {
                        "stats": {
                            "field": "charge"
                        }
                    }
                }
            }
        }
        return query