data_es_dao.py 5.54 KB
from pot_libs.es_util.es_utils import EsUtil
from unify_api.utils.time_format import convert_es_str
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE

from pot_libs.mysql_util.mysql_util import MysqlUtil


async def query_point_1min_index(p_database, date_start, date_end,
                                 point_id):
    """point点某一天1440个点数据"""
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 10000,
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "point_id": point_id
                        }
                    },
                    {
                        "range": {
                            "datetime": {
                                "gte": start_es,
                                "lte": end_es
                            }
                        }
                    }
                ]
            }
        },
        "sort": [{"datetime": {"order": "asc"}}]
    }
    async with EsUtil() as es:
        es_re = await es.search(body=query_body, index=p_database)
    return es_re


async def query_location_1min_index(l_database, date_start, date_end,
                                    location_id):
    """location点某一天1440*n个点数据"""
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 10000,
        "query": {
            "bool": {
                "must": [
                    {
                        "terms": {
                            "location_id": location_id
                        }
                    },
                    {
                        "range": {
                            "datetime": {
                                "gte": start_es,
                                "lte": end_es
                            }
                        }
                    }
                ]
            }
        },
        "sort": [{"datetime": {"order": "asc"}}]
    }
    async with EsUtil() as es:
        es_re = await es.search(body=query_body, index=l_database)
    return es_re


async def get_search_scope(cid, pid, start, end):

    where_list = [f"pid = {pid}"]
    if cid:
        where_list.append(f"cid in {cid}")
    if start and end:
        where_list.append(f"event_datetime>={start} and event_datetime<={end}")
    where_str = "and".join(where_list)
    sql = f"select * from point_1min_event where {where_str} " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql)
    return data

################
async def query_search_scope(cid, pid, page_num, page_size,
                             start, end):
    query_body = {
        "from": (page_num - 1) * page_size,
        "size": page_size,
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "mode.keyword": "scope"
                        }
                    }
                ]
            }
        },
        "sort": [
            {
                "datetime": {
                    "order": "desc"
                }
            }
        ]
    }
    if start and end:
        start_es = convert_es_str(start)
        end_es = convert_es_str(end)
        query_body["query"]["bool"]["must"].append(
            {
                "range": {
                    "datetime": {
                        "gte": start_es,
                        "lte": end_es
                    }
                }
            }
        )
    if cid:
        query_body["query"]["bool"]["must"].append(
            {
                "terms": {
                    "cid": cid
                }
            }
        )
    if pid:
        query_body["query"]["bool"]["must"].append(
            {
                "term": {
                    "point_id": pid
                }
            }
        )
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
    return es_re


async def get_scope_pids(pids, start, end):
    if start and end:
        where_str = f"pid in {pids} and event_datetime>={start} and " \
                    f"event_datetime<={end}"
    else:
        where_str = f"pid in {pids}"
    sql = f"select * from point_1min_event where {where_str}  " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql)
    return data

################################
async def query_search_scope_pids(pids, page_num, page_size, start, end):
    query_body = {
        "from": (page_num - 1) * page_size,
        "size": page_size,
        "query": {
            "bool": {
                "must": [
                    {"term": {"mode.keyword": "scope"}},
                    {"terms": {"point_id": pids}}
                ]
            }
        },
        "sort": [
            {
                "datetime": {
                    "order": "desc"
                }
            }
        ]
    }
    if start and end:
        start_es = convert_es_str(start)
        end_es = convert_es_str(end)
        query_body["query"]["bool"]["must"].append(
            {
                "range": {
                    "datetime": {
                        "gte": start_es,
                        "lte": end_es
                    }
                }
            }
        )
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
    return es_re