# tsp_dao.py 5.29 KB
import json
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str

# Name of the Redis hash read by meterdata_tsp_current(); fields are tsp_id.
TSP_CURRENT = "meterdata_tsp_current"
# Elasticsearch index searched by the aggregation queries in this module.
TSP_15MIN = "poweriot_tsp_15min"


async def meterdata_tsp_current(tsp_id):
    """Fetch the real-time data stored in Redis for the given tsp_id.

    Reads field ``tsp_id`` from the ``TSP_CURRENT`` hash and decodes the
    stored value as JSON.

    Args:
        tsp_id: Field name to look up inside the ``TSP_CURRENT`` hash.

    Returns:
        The JSON-decoded value, or an empty dict when Redis returns
        nothing (or any falsy value) for that field.
    """
    raw = await RedisUtils().hget(TSP_CURRENT, tsp_id)
    if not raw:
        # Missing field / empty payload: callers get an empty mapping.
        return {}
    return json.loads(raw)


async def tsp_histogram_tsp_id(date_start, date_end, tsp_id, interval):
    """TSP info - historical curve for a single tsp_id.

    Searches the ``TSP_15MIN`` index for documents whose ``tsp_id`` matches
    and whose ``quarter_time`` lies between the two bounds (inclusive), then
    buckets them with a date histogram and computes ``stats`` over
    ``pm25_max``, ``pm10_max`` and ``tsp_max`` in every bucket.

    Args:
        date_start: Start of the time window; passed through
            ``convert_es_str`` before being sent to Elasticsearch.
        date_end: End of the time window; passed through ``convert_es_str``.
        tsp_id: Value matched by the ``term`` filter on ``tsp_id``.
        interval: Bucket width handed to the ``date_histogram`` aggregation.

    Returns:
        The ``buckets`` entry of the ``quarter_time`` aggregation in the
        Elasticsearch response.
    """
    time_window = {
        "gte": convert_es_str(date_start),
        "lte": convert_es_str(date_end),
    }
    filters = [
        {"term": {"tsp_id": tsp_id}},
        {"range": {"quarter_time": time_window}},
    ]

    # One "stats" sub-aggregation per pollutant, each over its `<name>_max`
    # field.
    per_bucket_stats = {
        metric: {"stats": {"field": f"{metric}_max"}}
        for metric in ("pm25", "pm10", "tsp")
    }
    histogram = {
        "field": "quarter_time",
        "interval": interval,
        "time_zone": "+08:00",
        "format": "yyyy-MM-dd HH:mm:ss",
    }

    query_body = {
        "size": 0,  # only aggregations are needed, no hits
        "query": {"bool": {"must": filters}},
        "aggs": {
            "quarter_time": {
                "date_histogram": histogram,
                "aggs": per_bucket_stats,
            }
        },
    }
    log.info(query_body)
    async with EsUtil() as es:
        response = await es.search_origin(body=query_body, index=TSP_15MIN)
    return response["aggregations"]["quarter_time"]["buckets"]


async def tsp_by_tsp_id_dao(start, end, tsp_list):
    """Average PM2.5 / PM10 / TSP per tsp_id from ``tsp_day_record``.

    Selects rows whose ``tsp_id`` is in ``tsp_list`` and whose
    ``create_time`` is between ``start`` and ``end``, grouped by ``tsp_id``.
    All three inputs are sent as bound query parameters rather than being
    interpolated into the SQL text.

    Args:
        start: Lower bound for ``create_time``.
        end: Upper bound for ``create_time``.
        tsp_list: Collection of tsp_id values for the ``IN`` clause.

    Returns:
        Whatever ``MysqlUtil.fetchall`` returns for the query (columns
        ``tsp_id``, ``pm25``, ``pm10``, ``tsp``), or an empty list when
        ``tsp_list`` is empty.
    """
    if not tsp_list:
        # Nothing to look up; an empty sequence would presumably render as
        # `IN ()`, which MySQL rejects as a syntax error.
        return []
    sql = (
        "SELECT tsp_id, "
        "AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp "
        "FROM `tsp_day_record` where tsp_id in %s and "
        "create_time BETWEEN %s and %s GROUP BY tsp_id "
    )
    async with MysqlUtil() as conn:
        # Let the driver quote/escape start and end instead of formatting
        # them into the statement (avoids SQL injection and quoting bugs).
        datas = await conn.fetchall(sql, args=(tsp_list, start, end))
    return datas


async def tsp_histogram_day_tsp_id(interval):  # todo: dust ES, to be changed
    """Air quality - aggregate by time bucket, then by tsp_id.

    Runs an aggregation-only search (no query filter) on the ``TSP_15MIN``
    index: documents are bucketed by a date histogram on ``quarter_time``,
    each bucket is split by ``tsp_id`` (at most 1000 terms), and the average
    of ``pm25_mean``, ``pm10_mean`` and ``tsp_mean`` is computed per tsp_id.

    Args:
        interval: Bucket width handed to the ``date_histogram`` aggregation.

    Returns:
        The ``buckets`` entry of the ``quarter_time`` aggregation in the
        Elasticsearch response.
    """
    # One "avg" sub-aggregation per pollutant, each over its `<name>_mean`
    # field.
    per_device_avgs = {
        metric: {"avg": {"field": f"{metric}_mean"}}
        for metric in ("pm25", "pm10", "tsp")
    }
    by_tsp_id = {
        "terms": {"field": "tsp_id", "size": 1000},
        "aggs": per_device_avgs,
    }
    histogram = {
        "field": "quarter_time",
        "interval": interval,
        "time_zone": "+08:00",
        "format": "yyyy-MM-dd HH:mm:ss",
    }

    query_body = {
        "size": 0,  # only aggregations are needed, no hits
        "aggs": {
            "quarter_time": {
                "date_histogram": histogram,
                "aggs": {"tsps": by_tsp_id},
            }
        },
    }
    log.info(query_body)
    async with EsUtil() as es:
        response = await es.search_origin(body=query_body, index=TSP_15MIN)
    return response["aggregations"]["quarter_time"]["buckets"]


async def range_max_value(date_start, date_end):
    """Today's highest PM2.5 (plus PM10 / TSP aggregates) for a time range.

    Runs an aggregation-only search on the ``TSP_15MIN`` index restricted to
    documents whose ``quarter_time`` lies between the two bounds
    (inclusive). It requests ``max`` of ``pm25_max`` and ``avg`` of
    ``pm10_max`` and ``tsp_max``.

    Args:
        date_start: Start of the time window; passed through
            ``convert_es_str`` before being sent to Elasticsearch.
        date_end: End of the time window; passed through ``convert_es_str``.

    Returns:
        The full Elasticsearch response as returned by ``search_origin``.
    """
    time_window = {
        "gte": convert_es_str(date_start),
        "lte": convert_es_str(date_end),
    }

    # (aggregation name, aggregation type); each targets `<name>_max`.
    # NOTE(review): pm10 and tsp use "avg" while pm25 uses "max" - confirm
    # this asymmetry is intended given the function name.
    agg_kinds = (("pm25", "max"), ("pm10", "avg"), ("tsp", "avg"))
    aggregations = {
        name: {kind: {"field": f"{name}_max"}}
        for name, kind in agg_kinds
    }

    query_body = {
        "size": 0,  # only aggregations are needed, no hits
        "query": {
            "bool": {
                "must": [
                    {"range": {"quarter_time": time_window}},
                ]
            }
        },
        "aggs": aggregations,
    }

    log.info(query_body)
    async with EsUtil() as es:
        response = await es.search_origin(body=query_body, index=TSP_15MIN)
    return response