from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from unify_api.constants import COMPANY_15MIN_POWER, POINT_15MIN_POWER
from unify_api.utils.time_format import convert_es_str
from pot_libs.mysql_util.mysql_util import MysqlUtil

# Elasticsearch index names used by the aggregation queries in this module.
index = COMPANY_15MIN_POWER
index_point = POINT_15MIN_POWER


def _mysql_time_fmt(interval):
    """Return the MySQL DATE_FORMAT pattern for an aggregation interval.

    The percent signs are doubled on purpose: the SQL is later run with
    driver-side ``%s`` parameters, and the driver's %-formatting turns
    ``%%`` back into the single ``%`` that DATE_FORMAT expects.

    Args:
        interval: "hour", "day", or anything else (treated as per-minute).
    """
    if interval == "hour":
        return "%%Y-%%m-%%d %%H"
    if interval == "day":
        return "%%Y-%%m-%%d"
    return "%%Y-%%m-%%d %%H:%%i"


def _sum_metric_aggs():
    """Build the ES sub-aggregations that sum the kwh, charge and p fields."""
    return {
        "kwh": {"sum": {"field": "kwh"}},
        "charge": {"sum": {"field": "charge"}},
        "p": {"sum": {"field": "p"}},
    }


def _terms_and_range_query(field, values, start_es, end_es):
    """Build an ES bool query: ``field`` in ``values`` and quarter_time in range.

    Both bounds of the range are inclusive (gte / lte).
    """
    return {
        "bool": {
            "must": [
                {"terms": {field: values}},
                {
                    "range": {
                        "quarter_time": {"gte": start_es, "lte": end_es}
                    }
                },
            ]
        }
    }


def _quarter_time_histogram(interval, extended_bounds=None):
    """Build a date_histogram on quarter_time with the sum sub-aggregations.

    Args:
        interval: ES date_histogram interval, passed through unchanged.
        extended_bounds: optional ``{"min": ..., "max": ...}`` dict; only
            added to the histogram when given.
    """
    histogram = {
        "field": "quarter_time",
        "interval": interval,
        "time_zone": "+08:00",
        "format": "yyyy-MM-dd HH:mm:ss",
    }
    if extended_bounds is not None:
        histogram["extended_bounds"] = extended_bounds
    return {"date_histogram": histogram, "aggs": _sum_metric_aggs()}


async def point_day_power_dao(cid, start, end):
    """Fetch per-day sums of kwh, p and charge for one cid.

    Reads point_1day_power joined to point and monitor, keeps only rows
    whose monitor has demolished=0, and groups/orders by create_time.

    Args:
        cid: value matched against point.cid.
        start: inclusive lower bound for create_time.
        end: inclusive upper bound for create_time.

    Returns:
        Whatever ``MysqlUtil.fetchall`` returns for the query.
    """
    # start/end are bound as parameters instead of being formatted into the
    # SQL text, so quoting and escaping are handled by the driver.
    sql = """
        SELECT DATE_FORMAT(pp.create_time, '%%Y-%%m-%%d') create_time,
               SUM(pp.kwh) kwh, SUM(pp.p) p, SUM(pp.charge) charge
        from point_1day_power pp
        LEFT JOIN point p on pp.pid=p.pid
        LEFT JOIN monitor m on m.mtid=p.mtid
        where p.cid=%s and pp.create_time>=%s and pp.create_time<=%s
          and m.demolished=0
        GROUP BY pp.create_time
        ORDER BY pp.create_time
    """
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql, args=(cid, start, end))
    return data


async def get_total_kwh_dao(cid, start, end):
    """Fetch the total kwh of one cid over an inclusive create_time range.

    Uses the same joins and demolished=0 filter as ``point_day_power_dao``.

    Returns:
        Whatever ``MysqlUtil.fetchone`` returns; the summed column is
        aliased ``total_kwh``.
    """
    sql = """
        SELECT sum(pp.kwh) total_kwh
        from point_1day_power pp
        LEFT JOIN point p ON p.pid=pp.pid
        LEFT JOIN monitor m on m.mtid=p.mtid
        where p.cid=%s and pp.create_time>=%s and pp.create_time<=%s
          and m.demolished=0
    """
    async with MysqlUtil() as conn:
        total_kwh = await conn.fetchone(sql, args=(cid, start, end))
    return total_kwh


async def get_kwh_charge(table_name, name, value, start, end):
    """Sum kwh and charge from ``table_name`` where ``name`` equals ``value``.

    Args:
        table_name: table to read from (interpolated into the SQL text).
        name: column compared against ``value`` (interpolated as well).
        value: bound parameter compared with the ``name`` column.
        start: inclusive lower bound for create_time.
        end: inclusive upper bound for create_time.

    Returns:
        Whatever ``MysqlUtil.fetchone`` returns; columns are aliased
        ``sum_kwh`` and ``sum_charge``.
    """
    # NOTE(review): identifiers cannot be bound as parameters, so table_name
    # and name are still formatted into the statement. They must only ever
    # come from trusted, hard-coded call sites - never from user input.
    sql = (
        f"SELECT sum(kwh) sum_kwh, sum(charge) sum_charge "
        f"FROM {table_name} where create_time>=%s and "
        f"create_time<=%s and {name} = %s"
    )
    async with MysqlUtil() as conn:
        datas = await conn.fetchone(sql, args=(start, end, value))
    return datas


async def load_cmpy_charge(date_start, date_end, cids):
    """Fetch summed kwh and charge per cid from company_15min_power.

    Args:
        date_start: lower bound for create_time (BETWEEN, inclusive).
        date_end: upper bound for create_time (BETWEEN, inclusive).
        cids: collection of cid values used in the ``cid in`` filter.

    Returns:
        Whatever ``MysqlUtil.fetchall`` returns, grouped by cid.
    """
    sql = """
        select cid,sum(kwh) kwh,sum(charge) charge
        from company_15min_power
        where cid in %s and create_time BETWEEN %s and %s
        group by cid
    """
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql=sql,
                                    args=(cids, date_start, date_end))
    return datas


async def power_charge_p_aggs(date_start, date_end, cid_list, interval):
    """Aggregate kwh, charge and p over time buckets for a set of cids.

    Rows of company_15min_power are bucketed with DATE_FORMAT according to
    ``interval`` ("hour", "day", otherwise per-minute).

    Returns:
        The fetched rows, or an empty list when the result is falsy.
    """
    # time_fmt comes from a fixed set of literals, so formatting it into the
    # SQL text is safe; all caller-supplied values are bound parameters.
    time_fmt = _mysql_time_fmt(interval)
    sql = f"""
        select date_format(create_time,"{time_fmt}") as create_time,
               sum(kwh) kwh,sum(charge) charge,sum(p) p
        from company_15min_power
        where cid in %s and create_time >= %s and create_time <= %s
        group by date_format(create_time,"{time_fmt}")
    """
    async with MysqlUtil() as conn:
        results = await conn.fetchall(sql,
                                      args=(cid_list, date_start, date_end))
    return results or []


async def power_charge_p_cid_aggs(date_start, date_end, cid_list, interval):
    """Excel download: aggregate kwh, charge and p by cid and time bucket.

    Same bucketing as ``power_charge_p_aggs``, with an additional grouping
    by cid.

    Returns:
        The fetched rows, or an empty list when the result is falsy.
    """
    time_fmt = _mysql_time_fmt(interval)
    sql = f"""
        select cid,date_format(create_time,"{time_fmt}") as create_time,
               sum(kwh) kwh,sum(charge) charge,sum(p) p
        from company_15min_power
        where cid in %s and create_time >= %s and create_time <= %s
        group by cid,date_format(create_time,"{time_fmt}")
    """
    async with MysqlUtil() as conn:
        results = await conn.fetchall(sql,
                                      args=(cid_list, date_start, date_end))
    return results or []


async def query_charge_aggs_points(start, end, point_list):
    """Fetch summed kwh and charge per pid from point_15min_power.

    Args:
        start: lower bound for create_time (BETWEEN, inclusive).
        end: upper bound for create_time (BETWEEN, inclusive).
        point_list: collection of pid values used in the ``pid in`` filter.

    Returns:
        Whatever ``MysqlUtil.fetchall`` returns, grouped by pid.
    """
    sql = (
        "SELECT pid,sum(kwh) kwh,SUM(charge) charge "
        "FROM `point_15min_power` "
        "where pid in %s and create_time BETWEEN %s and %s "
        "GROUP BY pid"
    )
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(point_list, start, end))
    return datas


async def histogram_aggs_points(date_start, date_end, point_list, interval):
    """Run an ES date_histogram over quarter_time for a set of pids.

    Each bucket carries sums of kwh, charge and p. The date bounds are
    converted with ``convert_es_str`` before being used in the range filter.

    Returns:
        The ``buckets`` list of the ``quarter_time`` aggregation.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 0,
        "query": _terms_and_range_query("pid", point_list, start_es, end_es),
        "aggs": {"quarter_time": _quarter_time_histogram(interval)},
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=index_point)
    return es_re["aggregations"]["quarter_time"]["buckets"]


async def power_charge_p_point_aggs(date_start, date_end, pid_list, interval):
    """Excel download: aggregate kwh, charge and p by pid and time bucket.

    Runs a terms aggregation on pid (size 10000) with a nested
    date_histogram on quarter_time.

    Returns:
        The ``buckets`` list of the ``pids`` terms aggregation.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    query_body = {
        "size": 0,
        "query": _terms_and_range_query("pid", pid_list, start_es, end_es),
        "aggs": {
            "pids": {
                "terms": {"field": "pid", "size": 10000},
                "aggs": {"quarter_time": _quarter_time_histogram(interval)},
            }
        },
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=index_point)
    return es_re["aggregations"]["pids"]["buckets"]


async def point_kwh_charge(point_list, start=None, end=None):
    """Version 1.5: sum kwh and charge for the given pids.

    Args:
        point_list: collection of pid values used in the ``pid in`` filter.
        start: optional lower bound for create_time.
        end: optional upper bound for create_time.

    The time filter is applied only when both ``start`` and ``end`` are
    truthy; otherwise all rows for the pids are summed.

    Returns:
        Whatever ``MysqlUtil.fetchone`` returns; columns are aliased
        ``kwh`` and ``charge``.
    """
    if start and end:
        # Bound as parameters: the previous text interpolation left the
        # bounds unquoted, which yields invalid SQL for date strings.
        sql = (
            "SELECT sum(kwh) kwh, sum(charge) charge FROM "
            "`point_15min_power` "
            "where pid in %s and create_time BETWEEN %s and %s"
        )
        args = (point_list, start, end)
    else:
        sql = (
            "SELECT sum(kwh) kwh,sum(charge) charge FROM point_15min_power"
            " where pid in %s"
        )
        args = (point_list,)
    async with MysqlUtil() as conn:
        data = await conn.fetchone(sql, args=args)
    return data


async def extended_bounds_agg(date_start, date_end, cid_list, interval):
    """Run an ES date_histogram over quarter_time for a set of cids.

    Like ``histogram_aggs_points`` but filtered by cid, queried against the
    company index, and with ``extended_bounds`` set to the requested range.

    Returns:
        The ``buckets`` list of the ``quarter_time`` aggregation.
    """
    start_es = convert_es_str(date_start)
    end_es = convert_es_str(date_end)
    # extended_bounds takes the raw date_start/date_end, not the converted
    # values used by the range filter.
    # NOTE(review): presumably they already match the histogram "format" - confirm.
    bounds = {"min": date_start, "max": date_end}
    query_body = {
        "size": 0,
        "query": _terms_and_range_query("cid", cid_list, start_es, end_es),
        "aggs": {
            "quarter_time": _quarter_time_histogram(interval, bounds),
        },
    }
    log.info(query_body)
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=index)
    return es_re["aggregations"]["quarter_time"]["buckets"]