import json

from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str

# Redis hash that is looked up by tsp_id for the current reading.
TSP_CURRENT = "meterdata_tsp_current"
# Elasticsearch index queried by the aggregation helpers below.
TSP_15MIN = "poweriot_tsp_15min"


async def meterdata_tsp_current(tsp_id):
    """Fetch the real-time data stored in Redis for ``tsp_id``.

    Reads field ``tsp_id`` from the ``TSP_CURRENT`` hash and JSON-decodes it.

    :param tsp_id: field name inside the ``TSP_CURRENT`` hash.
    :return: the decoded JSON value, or ``{}`` when the hash lookup
        returns a falsy value (e.g. the field is missing).
    """
    res = await RedisUtils().hget(TSP_CURRENT, tsp_id)
    res = json.loads(res) if res else {}
    return res


async def tsp_histogram_tsp_id(date_start, date_end, tsp_id, interval):
    """TSP info - historical curve for a single ``tsp_id``.

    Builds a ``date_histogram`` on ``quarter_time`` restricted to one
    ``tsp_id`` and to the ``[date_start, date_end]`` range, computing
    ``stats`` of ``pm25_max``, ``pm10_max`` and ``tsp_max`` per bucket.

    :param date_start: range start; passed through ``convert_es_str``.
    :param date_end: range end; passed through ``convert_es_str``.
    :param tsp_id: value matched by the ``term`` filter on ``tsp_id``.
    :param interval: value used as the histogram ``interval``.
    :return: the ``buckets`` list of the ``quarter_time`` aggregation.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "tsp_id": tsp_id
                        }
                    },
                    {
                        "range": {
                            "quarter_time": {
                                "gte": start_es,
                                "lte": end_es
                            }
                        }
                    }
                ]
            }
        },
        "aggs": {
            "quarter_time": {
                "date_histogram": {
                    "field": "quarter_time",
                    "interval": interval,
                    "time_zone": "+08:00",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "aggs": {
                    "pm25": {
                        "stats": {
                            "field": "pm25_max"
                        }
                    },
                    "pm10": {
                        "stats": {
                            "field": "pm10_max"
                        }
                    },
                    "tsp": {
                        "stats": {
                            "field": "tsp_max"
                        }
                    }
                }
            }
        }
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
    return es_re["aggregations"]["quarter_time"]["buckets"]


async def tsp_by_tsp_id_dao(start, end, tsp_list):
    """Average pm25 / pm10 / tsp per ``tsp_id`` from ``tsp_day_record``.

    :param start: lower bound of ``create_time`` (inclusive, SQL BETWEEN).
    :param end: upper bound of ``create_time`` (inclusive, SQL BETWEEN).
    :param tsp_list: collection of tsp ids used in the ``IN`` clause.
    :return: the rows returned by ``MysqlUtil.fetchall``; an empty list
        when ``tsp_list`` is empty.
    """
    # An empty id list would presumably render as "IN ()", which MySQL
    # rejects, and no row could match anyway, so skip the round trip.
    if not tsp_list:
        return []
    # All values are bound as parameters rather than interpolated into the
    # SQL text, so quoting/escaping is left to the database driver.
    sql = (
        'SELECT tsp_id, '
        'AVG(pm25_mean) pm25,AVG(pm10_mean) pm10,AVG(tsp_mean) tsp '
        'FROM `tsp_day_record` where tsp_id in %s and '
        'create_time BETWEEN %s and %s GROUP BY tsp_id '
    )
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(tsp_list, start, end))
    return datas


async def tsp_histogram_day_tsp_id(interval):  # TODO: dust (TSP) ES query pending rework
    """Air quality "excellent" - bucket by time, then by ``tsp_id``.

    Runs a ``date_histogram`` on ``quarter_time`` with a nested ``terms``
    aggregation on ``tsp_id`` (up to 1000 terms), averaging ``pm25_mean``,
    ``pm10_mean`` and ``tsp_mean`` in each sub-bucket. No query filter is
    applied, so the whole ``TSP_15MIN`` index is aggregated.

    :param interval: value used as the histogram ``interval``.
    :return: the ``buckets`` list of the ``quarter_time`` aggregation.
    """
    query_body = {
        "size": 0,
        "aggs": {
            "quarter_time": {
                "date_histogram": {
                    "field": "quarter_time",
                    "interval": interval,
                    "time_zone": "+08:00",
                    "format": "yyyy-MM-dd HH:mm:ss"
                },
                "aggs": {
                    "tsps": {
                        "terms": {
                            "field": "tsp_id",
                            "size": 1000
                        },
                        "aggs": {
                            "pm25": {
                                "avg": {
                                    "field": "pm25_mean"
                                }
                            },
                            "pm10": {
                                "avg": {
                                    "field": "pm10_mean"
                                }
                            },
                            "tsp": {
                                "avg": {
                                    "field": "tsp_mean"
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
    return es_re["aggregations"]["quarter_time"]["buckets"]


async def range_max_value(date_start, date_end):
    """Today's highest PM2.5 (plus pm10 / tsp aggregates) over a range.

    Filters ``quarter_time`` to ``[date_start, date_end]`` and computes
    ``max(pm25_max)``, ``avg(pm10_max)`` and ``avg(tsp_max)``.

    NOTE(review): pm10 and tsp use ``avg`` while pm25 uses ``max``;
    confirm with callers whether that asymmetry is intended.

    :param date_start: range start; passed through ``convert_es_str``.
    :param date_end: range end; passed through ``convert_es_str``.
    :return: the unmodified response of ``es.search_origin``.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 0,
        "query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "quarter_time": {
                                "gte": start_es,
                                "lte": end_es
                            }
                        }
                    }
                ]
            }
        },
        "aggs": {
            "pm25": {
                "max": {
                    "field": "pm25_max"
                }
            },
            "pm10": {
                "avg": {
                    "field": "pm10_max"
                }
            },
            "tsp": {
                "avg": {
                    "field": "tsp_max"
                }
            }
        }
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=TSP_15MIN)
    return es_re