# data_from_product.py (4.72 KB)
# Last committed by lcn.
import json

from aioelasticsearch import Elasticsearch
from pot_libs.settings import SETTING
import asyncio

from elasticsearch.helpers import async_scan


class EsQueryUtil:
    """Placeholder for Elasticsearch query helpers; it currently defines nothing."""


class EsUtil(object):
    """Thin async wrapper around an ``aioelasticsearch`` client.

    Use it as an async context manager so the client's own ``__aenter__`` /
    ``__aexit__`` run around the unit of work::

        async with EsUtil() as es:
            docs = await es.search(body=..., index=...)
    """

    # Mapping type sent with every search request.
    # NOTE(review): mapping types are deprecated in Elasticsearch 7.x and
    # removed in 8.x -- confirm the target cluster version.
    doc_type = "doc"

    def __init__(self, hosts="127.0.0.1:9200"):
        """Create the underlying client.

        :param hosts: host spec forwarded to ``Elasticsearch(hosts=...)``;
            defaults to the local node used previously.
        """
        self.client = Elasticsearch(hosts=hosts)

    async def __aenter__(self):
        await self.client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Delegate to the client; its return value decides whether an
        # exception raised in the ``async with`` body is suppressed.
        return await self.client.__aexit__(exc_type, exc_val, exc_tb)

    @classmethod
    def get_source(cls, hit):
        """Return a copy of ``hit["_source"]`` with the hit's id added under ``"_id"``.

        A shallow copy is returned so the raw search response is left unmodified.
        """
        result = dict(hit["_source"])
        result["_id"] = hit["_id"]
        return result

    async def search(self, body, index):
        """Search ``index`` with ``body`` and return each hit's source (plus ``_id``)."""
        result = await self.search_origin(body, index)
        return [self.get_source(hit) for hit in result["hits"]["hits"]]

    async def get_page(self, *, body, index, page, size):
        """Run a paginated query (``page`` is 1-based).

        ``from``/``size`` are set on a shallow copy of ``body``, so the caller's
        dict is not modified and can be reused for other pages.

        :return: ``(total, docs)`` -- ``total`` is ``result["hits"]["total"]`` as
            returned by the server, ``docs`` are the page's sources with ``_id``.
        """
        paged_body = {**body, "from": (page - 1) * size, "size": size}
        result = await self.search_origin(paged_body, index)
        hits = result["hits"]
        # NOTE(review): on Elasticsearch 7+ ``hits.total`` is an object
        # ({"value": ..., "relation": ...}) rather than an int -- confirm what
        # callers expect before relying on it as a count.
        return hits["total"], [self.get_source(hit) for hit in hits["hits"]]

    async def search_origin(self, body, index):
        """Search ``index`` with ``body`` and return the raw Elasticsearch response."""
        return await self.client.search(
            body=body, index=index, doc_type=self.doc_type
        )


async def test(
    location_id=280,
    start_time=1595952000,
    end_time=1596038399,
    index="poweriot_location_1min_aiao",
    interval=900,
    top_size=10,
):
    """Ad-hoc query: top ``value`` documents per ``time`` bucket for one location.

    Filters ``index`` on ``location_id`` and on ``start_time <= time <= end_time``,
    buckets the matches with a ``histogram`` aggregation of width ``interval`` on
    ``time``, and keeps the ``top_size`` hits with the highest ``value`` in each
    bucket. The defaults reproduce the previously hard-coded query.

    :return: the raw response from ``EsUtil.search_origin``.
    """
    query_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"location_id": location_id}},
                    {"range": {"time": {"gte": start_time, "lte": end_time}}},
                ]
            }
        },
        # Only the aggregation result is wanted, not the top-level hit list.
        "size": 0,
        "aggs": {
            "aggs_name": {
                # NOTE(review): presumably ``time`` is epoch seconds, which would
                # make the default 900 a 15-minute bucket -- confirm.
                "histogram": {"field": "time", "interval": interval},
                "aggs": {
                    "tops": {
                        "top_hits": {
                            "size": top_size,
                            "sort": [{"value": {"order": "desc"}}],
                        }
                    }
                },
            }
        },
    }
    async with EsUtil() as es:
        # Previously the response was assigned to an unused local and dropped.
        return await es.search_origin(body=query_body, index=index)


async def get_data_from_old(
    source_hosts="172.18.1.249:9200",
    source_index="poweriot_location_1min_aiao",
    target_index="test_index",
    fields=("proxy_id", "cid", "sid", "location_id", "type", "time", "field", "value"),
):
    """Copy every document of ``source_index`` on the old cluster into ``target_index``.

    Only the keys listed in ``fields`` are kept from each ``_source``. The original
    ``_id`` is reused and conflicting creates are ignored (see ``write_to_new_type``),
    so re-running does not duplicate documents. The defaults reproduce the
    previously hard-coded hosts and indices.
    """
    wanted = frozenset(fields)
    dev_client = Elasticsearch(hosts=source_hosts)
    try:
        # One destination client for the whole scan instead of one per document.
        async with EsUtil() as es:
            # NOTE(review): ``async_scan`` comes from ``elasticsearch.helpers`` while
            # the client is ``aioelasticsearch.Elasticsearch`` -- confirm the two
            # library versions are compatible.
            async for doc in async_scan(
                client=dev_client,
                query={"query": {"match_all": {}}},
                index=source_index,
            ):
                if not doc:
                    continue
                old_doc = {
                    key: value
                    for key, value in doc["_source"].items()
                    if key in wanted
                }
                await write_to_new_type(doc["_id"], old_doc, es=es, index=target_index)
    finally:
        # Release the source client even if the scan or a write fails.
        await dev_client.close()


async def write_to_new_type(old_doc_id, old_doc, es=None, index="test_index"):
    """Create ``old_doc`` with id ``old_doc_id`` in ``index``.

    A 409 conflict (the id already exists) is ignored, so the call can be repeated.

    :param es: an already-open ``EsUtil`` to reuse; when omitted, a client is
        opened and closed just for this one write (the original behaviour).
    :return: the ``create`` response (the error body when the 409 is ignored).
    """
    if es is None:
        async with EsUtil() as own_es:
            return await write_to_new_type(old_doc_id, old_doc, es=own_es, index=index)
    return await es.client.create(index=index, id=old_doc_id, body=old_doc, ignore=409)


async def main():
    """Create ``test_index`` with an explicit typed mapping, then migrate the old data."""

    def text_with_keyword():
        # Analyzed text field plus a ``.keyword`` sub-field (ignore_above 256).
        return {
            "type": "text",
            "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
        }

    properties = {
        "cid": {"type": "long"},
        "field": text_with_keyword(),
        "location_id": {"type": "long"},
        "proxy_id": {"type": "long"},
        "sid": text_with_keyword(),
        "time": {"type": "long"},
        "type": text_with_keyword(),
        "value": {"type": "float"},
    }
    index_settings = {
        "settings": {"number_of_shards": 1},
        "mappings": {"poweriot_location_1min_aiao": {"properties": properties}},
    }

    async with EsUtil() as es:
        # Elasticsearch >7.0 gotcha: a mapping keyed by a type name needs
        # include_type_name=True. See https://www.jianshu.com/p/0e7806fe1255
        # ignore=400 presumably keeps re-runs from failing when the index exists.
        await es.client.indices.create(
            index="test_index", body=index_settings, ignore=400, include_type_name=True
        )

    await get_data_from_old()


if __name__ == "__main__":
    # asyncio.run creates, runs and closes a fresh event loop. Calling
    # asyncio.get_event_loop() with no current loop is deprecated in recent
    # Python versions and raises RuntimeError starting with 3.14.
    asyncio.run(main())