es_query_body.py 4.32 KB
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str, time_str_to_str, \
    esstr_to_dthoutstr, time_str_to_str1


def es_process(data, fmat, time_key="key_as_string"):
    res = {}
    for _item in data:
        key = time_str_to_str(_item[time_key], fmat)
        res[key] = _item
    return res


def es_time_process(data, time_key="quarter_time", fmt=None):
    res = {}
    for _item in data:
        key = esstr_to_dthoutstr(_item["_source"][time_key], fmt)
        res[key] = _item["_source"]
    return res


def sql_time_process(data, time_key="quarter_time", fmt="%H:%M"):
    res = {}
    for _item in data:
        key = _item[time_key].strftime(fmt)
        res[key] = _item
    return res


def es_process1(data, fmat, time_key="key_as_string"):
    res = {}
    for _item in data:
        key = time_str_to_str1(_item[time_key], fmat)
        res[key] = _item
    return res


def agg_statistics(date_key=None, matchs=None, terms=None, start=None,
                   end=None, aggs_key=()):
    # (date_key="datetime", matchs={"cid": 34}, terms={"location_id": [1]},
    # start=start, end=end, aggs_key=["kwh", "charge"])
    """统计"""
    aggs_body = {}
    for agg in aggs_key:
        aggs_body[agg] = {"sum": {"field": agg}}
    query_body = {
        "size": 0,
        "query": {
            "bool": {"must": []}
        },
        "aggs": aggs_body
    }
    if matchs:
        for k in ["day", "hour", "quarter_time", "month"]:
            if k in matchs:
                matchs[k] = convert_es_str(matchs[k])
        for key in matchs:
            query_body["query"]["bool"]["must"].append(
                {"match": {key: matchs[key]}})
    if terms:
        for key in terms:
            query_body["query"]["bool"]["must"].append(
                {"terms": {key: terms[key]}})
    if start and end:
        start = convert_es_str(start)
        end = convert_es_str(end)
        query_body["query"]["bool"]["must"].append(
            {"range": {date_key: {"gte": start, "lte": end}}})
    log.info("query_body:{}".format(query_body))
    return query_body


class EsQueryBody(object):
    """构造es query body"""

    def __init__(self, matchs=None, terms=None, start=None, end=None,
                 date_key=None, size=0):
        self.matchs = matchs
        self.terms = terms
        self.start = start
        self.end = end
        self.date_key = date_key
        self.size = size

    def query(self):
        """条件查询"""
        query_body = {
            "size": self.size,
            "query": {
                "bool": {"must": []}
            }
        }
        if self.matchs:
            for k in ["day", "hour", "quarter_time", "month"]:
                if k in self.matchs:
                    self.matchs[k] = convert_es_str(self.matchs[k])
            for key in self.matchs:
                query_body["query"]["bool"]["must"].append(
                    {"match": {key: self.matchs[key]}})
        if self.terms:
            for key in self.terms:
                query_body["query"]["bool"]["must"].append(
                    {"terms": {key: self.terms[key]}})
        if self.start and self.end:
            start = convert_es_str(self.start)
            end = convert_es_str(self.end)
            query_body["query"]["bool"]["must"].append(
                {"range": {self.date_key: {"gte": start, "lte": end}}})
        return query_body

    def query_agg_histogram(self, interval):
        """直方图, 聚合电量电费信息"""
        query = self.query()
        query["aggs"] = {
            "quarter_time": {
                "date_histogram": {
                    "field": "quarter_time",
                    "interval": interval,
                    "time_zone": "+08:00",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "aggs": {
                    "kwh": {
                        "stats": {
                            "field": "kwh"
                        }
                    },
                    "p": {
                        "stats": {
                            "field": "p"
                        }
                    },
                    "charge": {
                        "stats": {
                            "field": "charge"
                        }
                    }
                }
            }
        }
        return query