from pot_libs.es_util.es_utils import EsUtil
from pot_libs.settings import SETTING
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao, \
    get_sid_by_mtid_dao, get_mtids_by_pids_dao
from unify_api.utils.common_utils import make_tdengine_data_as_list
from unify_api.utils.taos_new import get_td_engine_data
from unify_api.utils.time_format import convert_es_str, CST
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE
from pot_libs.mysql_util.mysql_util import MysqlUtil


async def _query_1min_index(db, table_name, date_start, date_end, fields):
    """Select rows of ``table_name`` in TDengine database ``db`` by ts range.

    :param db: database name appended to ``SETTING.stb_url``
    :param table_name: table to select from
    :param date_start: inclusive lower bound for ``ts``
    :param date_end: inclusive upper bound for ``ts``
    :param fields: optional list of column names; ``ts`` is always put first.
        A falsy value selects ``*``.
    :return: the rows converted by ``make_tdengine_data_as_list``, or ``[]``
        when the query reports failure
    """
    if fields:
        # Build a new list instead of fields.insert(0, 'ts'): the caller's
        # list must not be mutated, otherwise reusing it for several calls
        # would keep adding extra 'ts' columns.
        fields_str = ",".join(["ts", *fields])
    else:
        fields_str = "*"
    url = f"{SETTING.stb_url}{db}"
    sql = f"select {fields_str} from {table_name} where ts >= '{date_start}'" \
          f" and ts <= '{date_end}' order by ts asc"
    is_succ, tdengine_data = await get_td_engine_data(url, sql)
    if not is_succ:
        return []
    return make_tdengine_data_as_list(tdengine_data)


async def query_point_1min_index(date_start, date_end, mtid, fields=None):
    """Query per-minute "point" data (original note: 1440 samples for a day).

    Reads table ``mt<mtid>_ele`` of database ``db_electric`` for rows whose
    ``ts`` lies in ``[date_start, date_end]``, ordered by ``ts`` ascending.

    :param date_start: inclusive lower bound for ``ts``
    :param date_end: inclusive upper bound for ``ts``
    :param mtid: id used to build the table name
    :param fields: optional list of columns to select (``ts`` is prepended);
        all columns are selected when omitted. The list is not modified.
    :return: list of rows, or ``[]`` if the query fails
    """
    return await _query_1min_index(
        "db_electric", "mt%s_ele" % mtid, date_start, date_end, fields)


async def query_location_1min_index(date_start, date_end, mtid, fields=None):
    """Query per-minute "location" data (original note: 1440*n samples a day).

    Reads table ``mt<mtid>_adi`` of database ``db_adio`` for rows whose
    ``ts`` lies in ``[date_start, date_end]``, ordered by ``ts`` ascending.

    :param date_start: inclusive lower bound for ``ts``
    :param date_end: inclusive upper bound for ``ts``
    :param mtid: id used to build the table name
    :param fields: optional list of columns to select (``ts`` is prepended);
        all columns are selected when omitted. The list is not modified.
    :return: list of rows, or ``[]`` if the query fails
    """
    return await _query_1min_index(
        "db_adio", "mt%s_adi" % mtid, date_start, date_end, fields)


async def get_search_scope(cid, pid, start, end):
    """Fetch up to 5000 ``point_1min_event`` rows for one pid, newest first.

    :param cid: optional collection of cids for a ``cid in (...)`` filter;
        skipped when falsy
    :param pid: value matched against the ``pid`` column
    :param start: lower bound for ``event_datetime``
    :param end: upper bound for ``event_datetime``; the time filter is only
        applied when both ``start`` and ``end`` are truthy
    :return: whatever ``MysqlUtil.fetchall`` returns for the query
    """
    # Every value is bound through `args` (as the cid filter already was)
    # rather than being formatted into the SQL text.
    where_list = ["pid = %s"]
    args = [pid]
    if cid:
        where_list.append("cid in %s")
        args.append(cid)
    if start and end:
        where_list.append("event_datetime >= %s and event_datetime <= %s")
        args.extend((start, end))
    where_str = " and ".join(where_list)
    sql = f"select * from point_1min_event where {where_str} " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql, args=tuple(args))
    return data


async def query_search_scope(cid, pid, page_num, page_size, start_time,
                             end_time, scope_g):
    """Query the waveform-recording ("scope") event list with its total count.

    :param cid: optional value for the ``pe.cid`` filter
    :param pid: sized collection of pids; it decides which DAO resolves mtid
    :param page_num: first value of the ``LIMIT`` clause
    :param page_size: second value of the ``LIMIT`` clause
    :param start_time: optional lower bound on ``pt.event_datetime``
    :param end_time: optional upper bound on ``pe.create_time``
    :param scope_g: optional sequence for the ``pe.scope_g`` filter
    :return: ``(data, total)`` - the page rows and the ``count(*)`` row
    """
    if len(pid) > 1:
        mtid = await get_mtids_by_pids_dao(pid)
    else:
        mtid = await get_mtid_by_pid_dao(pid)

    # The statements below are executed without driver-side arguments, so
    # the '%' characters of the DATE_FORMAT pattern reach MySQL unchanged.
    conditions = []
    if cid:
        conditions.append(f" and pe.cid={cid} ")
    # NOTE(review): the start bound filters pt.event_datetime while the end
    # bound filters pe.create_time - confirm this asymmetry is intended.
    if start_time:
        conditions.append(f" and pt.event_datetime >= '{start_time}' ")
    if end_time:
        conditions.append(f" and pe.create_time <= '{end_time}' ")
    if mtid:
        # NOTE(review): the one-element branch indexes mtid by key 'mtid';
        # presumably the single-pid DAO returns a one-key mapping - verify.
        if len(mtid) == 1:
            conditions.append(f" and pe.mtid = {mtid['mtid']} ")
        else:
            conditions.append(f" and pe.mtid in {tuple(mtid)} ")
    if scope_g:
        # A one-element tuple would render as "(x,)", hence the '=' form.
        if len(scope_g) == 1:
            conditions.append(f" AND pe.scope_g = {scope_g[0]} ")
        else:
            conditions.append(f" AND pe.scope_g in {tuple(scope_g)} ")
    where = "".join(conditions)

    # NOTE(review): page_num is used directly as the LIMIT offset -
    # presumably the caller already converts the page number; verify.
    sql = f"""
        SELECT
            pt.event_id doc_id,
            DATE_FORMAT(pt.event_datetime, '%Y-%m-%d %H:%i:%s') check_dt,
            pt.`name` point,
            pt.message message,
            pe.scope_g scope_type
        FROM point_1min_event pt
        LEFT JOIN point_1min_scope pe
            ON pt.mtid = pe.mtid AND pe.create_time = pt.event_datetime
        WHERE pt.event_mode = 'scope' {where}
        ORDER BY pe.create_time DESC
        LIMIT {page_num} , {page_size}
    """
    total_sql = f"""
        SELECT count(*) total
        FROM point_1min_event pt
        LEFT JOIN point_1min_scope pe
            ON pt.mtid = pe.mtid AND pe.create_time = pt.event_datetime
        WHERE pt.event_mode = 'scope' {where}
    """
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql)
        total = await conn.fetchone(total_sql)
    return data, total


async def get_scope_pids(pids, start, end):
    """Fetch up to 5000 ``point_1min_event`` rows for several pids.

    Rows are ordered by ``event_datetime`` descending.

    :param pids: list/tuple of pids for the ``pid in (...)`` filter
    :param start: lower bound for ``event_datetime``
    :param end: upper bound for ``event_datetime``; the time filter is only
        applied when both ``start`` and ``end`` are truthy
    :return: the rows from ``MysqlUtil.fetchall``; ``[]`` when ``pids`` is
        empty
    """
    if not pids:
        # "pid in ()" is not valid SQL, and nothing could match anyway.
        return []
    # Values are bound through `args`: formatting them into the text left the
    # date bounds unquoted and rendered a Python list as "[1, 2]".
    where_list = ["pid in %s"]
    args = [pids]
    if start and end:
        where_list.append("event_datetime >= %s and event_datetime <= %s")
        args.extend((start, end))
    where_str = " and ".join(where_list)
    sql = f"select * from point_1min_event where {where_str} " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql, args=tuple(args))
    return data


async def query_search_scope_pids(pids, page_num, page_size, start, end):
    """Search the ``SCOPE_DATABASE`` index for scope records of several pids.

    :param pids: values for the ``point_id`` terms filter
    :param page_num: page number; the offset is ``(page_num - 1) * page_size``
    :param page_size: number of hits per page
    :param start: lower bound for ``datetime``
    :param end: upper bound for ``datetime``; the range filter is only added
        when both ``start`` and ``end`` are truthy
    :return: the result of ``EsUtil.search_origin``
    """
    must_clauses = [
        {"term": {"mode.keyword": "scope"}},
        {"terms": {"point_id": pids}},
    ]
    if start and end:
        must_clauses.append({
            "range": {
                "datetime": {
                    "gte": convert_es_str(start),
                    "lte": convert_es_str(end),
                }
            }
        })
    query_body = {
        "from": (page_num - 1) * page_size,
        "size": page_size,
        "query": {"bool": {"must": must_clauses}},
        "sort": [{"datetime": {"order": "desc"}}],
    }
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
    return es_re