data_es_dao.py 5.91 KB
Newer Older
lcn's avatar
lcn committed
1
from pot_libs.es_util.es_utils import EsUtil
lcn's avatar
lcn committed
2
from pot_libs.settings import SETTING
wang.wenrong's avatar
wang.wenrong committed
3 4
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao, \
    get_sid_by_mtid_dao, get_mtids_by_pids_dao
lcn's avatar
lcn committed
5 6 7
from unify_api.utils.common_utils import make_tdengine_data_as_list
from unify_api.utils.taos_new import get_td_engine_data
from unify_api.utils.time_format import convert_es_str, CST
lcn's avatar
lcn committed
8 9 10 11 12
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE

from pot_libs.mysql_util.mysql_util import MysqlUtil


lcn's avatar
lcn committed
13 14
async def query_point_1min_index(date_start, date_end,
                                 mtid, fields=None):
    """Query the 1-minute electric data of one point (1440 rows per day).

    Selects from table ``mt<mtid>_ele`` in database ``db_electric`` every
    row whose ``ts`` lies in ``[date_start, date_end]`` (both inclusive),
    ordered by ``ts`` ascending.

    Args:
        date_start: Lower bound for ``ts``; placed in the SQL inside single
            quotes.
        date_end: Upper bound for ``ts``; placed in the SQL inside single
            quotes.
        mtid: Meter id used to build the table name.
        fields: Optional sequence of column names. ``ts`` is always
            selected first. When empty or ``None`` all columns are
            selected. The caller's sequence is left untouched.

    Returns:
        The rows converted by ``make_tdengine_data_as_list``, or ``[]``
        when ``get_td_engine_data`` reports failure.
    """
    if fields:
        # Build a fresh list rather than fields.insert(0, 'ts'): the in-place
        # insert added another 'ts' to the caller's list on every call and
        # failed outright for tuples.
        fields_str = ",".join(["ts", *fields])
    else:
        fields_str = "*"
    db = "db_electric"
    url = f"{SETTING.stb_url}{db}"
    table_name = "mt%s_ele" % mtid
    # NOTE(review): mtid, the column names and both date bounds are
    # interpolated directly into the statement - confirm callers only pass
    # validated values.
    sql = f"select {fields_str} from {table_name} where ts >= '{date_start}'" \
          f" and ts <= '{date_end}' order by ts asc"
    is_succ, tdengine_data = await get_td_engine_data(url, sql)
    if not is_succ:
        return []
    return make_tdengine_data_as_list(tdengine_data)


async def query_location_1min_index(date_start, date_end, mtid,
                                    fields=None):
    """Query the 1-minute data of a location (1440 * n rows per day).

    Selects from table ``mt<mtid>_adi`` in database ``db_adio`` every row
    whose ``ts`` lies in ``[date_start, date_end]`` (both inclusive),
    ordered by ``ts`` ascending.

    Args:
        date_start: Lower bound for ``ts``; placed in the SQL inside single
            quotes.
        date_end: Upper bound for ``ts``; placed in the SQL inside single
            quotes.
        mtid: Meter id used to build the table name.
        fields: Optional sequence of column names. ``ts`` is always
            selected first. When empty or ``None`` all columns are
            selected. The caller's sequence is left untouched.

    Returns:
        The rows converted by ``make_tdengine_data_as_list``, or ``[]``
        when ``get_td_engine_data`` reports failure.
    """
    if fields:
        # Build a fresh list rather than fields.insert(0, 'ts'): the in-place
        # insert added another 'ts' to the caller's list on every call and
        # failed outright for tuples.
        fields_str = ",".join(["ts", *fields])
    else:
        fields_str = "*"
    db = "db_adio"
    url = f"{SETTING.stb_url}{db}"
    table_name = "mt%s_adi" % mtid
    # NOTE(review): mtid, the column names and both date bounds are
    # interpolated directly into the statement - confirm callers only pass
    # validated values.
    sql = f"select {fields_str} from {table_name} where ts >= '{date_start}'" \
          f" and ts <= '{date_end}' order by ts asc"
    is_succ, tdengine_data = await get_td_engine_data(url, sql)
    if not is_succ:
        return []
    return make_tdengine_data_as_list(tdengine_data)
lcn's avatar
lcn committed
51 52 53 54 55


async def get_search_scope(cid, pid, start, end):
    """Fetch up to 5000 ``point_1min_event`` rows of one point, newest first.

    Args:
        cid: Optional collection of company ids. When truthy the rows are
            restricted with ``cid in (...)``.
        pid: Point id matched with ``pid = ...``.
        start: Optional lower bound for ``event_datetime`` (inclusive).
        end: Optional upper bound for ``event_datetime`` (inclusive). The
            time filter is applied only when both ``start`` and ``end`` are
            truthy.

    Returns:
        The result of ``MysqlUtil.fetchall`` for the query.
    """
    # Every value is bound as a query parameter instead of being formatted
    # into the SQL text, so quoting and escaping are left to the driver.
    # The order of `args` must follow the order of the %s placeholders.
    where_list = ["pid = %s"]
    args = [pid]
    if cid:
        where_list.append("cid in %s")
        args.append(cid)
    if start and end:
        where_list.append(
            "event_datetime >= %s and event_datetime <= %s")
        args.extend((start, end))
    where_str = " and ".join(where_list)
    sql = f"select * from point_1min_event where {where_str} " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql, args=tuple(args))
    return data

wang.wenrong's avatar
wang.wenrong committed
70

lcn's avatar
lcn committed
71
async def query_search_scope(cid, pid, page_num, page_size,
                             start_time, end_time, scope_g):
    """Query one page of the scope (waveform recording) list and its total.

    Args:
        cid: Optional company id; adds ``pe.cid=<cid>`` when truthy.
        pid: Sized collection of point ids. More than one entry is resolved
            through ``get_mtids_by_pids_dao``, otherwise through
            ``get_mtid_by_pid_dao``.
        page_num: First value of the SQL ``LIMIT`` clause.
        page_size: Second value of the SQL ``LIMIT`` clause.
        start_time: Optional lower bound for ``pt.event_datetime``.
        end_time: Optional upper bound for ``pe.create_time``.
        scope_g: Optional sequence of scope types to filter on.

    Returns:
        A ``(data, total)`` tuple: the ``fetchall`` result of the page
        query and the ``fetchone`` result of the count query.
    """
    # Pick the lookup that matches the number of requested points.
    if len(pid) > 1:
        mtid = await get_mtids_by_pids_dao(pid)
    else:
        mtid = await get_mtid_by_pid_dao(pid)

    # Collect the optional filters; each fragment carries its own leading
    # " and " so the pieces can simply be concatenated.
    conditions = []
    if cid:
        conditions.append(f" and pe.cid={cid} ")
    if start_time:
        conditions.append(f" and pt.event_datetime >= '{start_time}' ")
    if end_time:
        # NOTE(review): filtering on pe.* in WHERE drops the unmatched rows
        # of the LEFT JOIN - confirm that is intended.
        conditions.append(f" and pe.create_time <= '{end_time}' ")
    if mtid:
        # NOTE(review): the single-item branch indexes mtid by key, so it
        # presumably expects a one-key mapping - verify against both DAOs.
        conditions.append(
            f" and pe.mtid = {mtid['mtid']} " if len(mtid) == 1
            else f" and pe.mtid in {tuple(mtid)} ")
    if scope_g:
        conditions.append(
            f" AND pe.scope_g =  {scope_g[0]}  " if len(scope_g) == 1
            else f" AND pe.scope_g in  {tuple(scope_g)}  ")
    where = "".join(conditions)

    # NOTE(review): page_num is used directly as the LIMIT offset -
    # presumably the caller has already converted the page number; confirm.
    sql = f"""
                SELECT
                    pt.event_id doc_id,
                    DATE_FORMAT(pt.event_datetime, '%Y-%m-%d %H:%i:%s') check_dt,
                    pt.`name` point,
                    pt.message message,
                    pe.scope_g scope_type
                FROM
                    point_1min_event pt
                    LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid 
                    AND pe.create_time = pt.event_datetime 
                WHERE
                    pt.event_mode = 'scope'
                    {where}
                ORDER BY
                    pe.create_time DESC 
                    LIMIT  {page_num} , {page_size} """

    total_sql = f"""
                SELECT
                    count(*) total
                FROM
                    point_1min_event pt
                    LEFT JOIN point_1min_scope pe ON pt.mtid = pe.mtid 
                    AND pe.create_time = pt.event_datetime 
                WHERE
                    pt.event_mode = 'scope'
                    {where}
            """
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql)
        total = await conn.fetchone(total_sql)

    return data, total
lcn's avatar
lcn committed
136 137 138 139 140 141 142 143 144 145 146 147 148 149


async def get_scope_pids(pids, start, end):
    """Fetch up to 5000 ``point_1min_event`` rows of several points, newest first.

    Args:
        pids: Sequence (list or tuple) of point ids matched with
            ``pid in (...)``. An empty value yields an empty result
            without querying.
        start: Optional lower bound for ``event_datetime`` (inclusive).
        end: Optional upper bound for ``event_datetime`` (inclusive). The
            time filter is applied only when both ``start`` and ``end`` are
            truthy.

    Returns:
        The result of ``MysqlUtil.fetchall`` for the query, or ``[]`` when
        ``pids`` is empty.
    """
    if not pids:
        # "pid in ()" is not valid SQL; nothing can match anyway.
        return []
    # Bind the values as parameters (same style as get_search_scope's
    # "cid in %s") instead of formatting them into the statement: the
    # formatted version left start/end unquoted and rendered a list as
    # "[1, 2]" or a one-element tuple as "(1,)", neither of which is valid
    # SQL.
    where_list = ["pid in %s"]
    args = [pids]
    if start and end:
        where_list.append(
            "event_datetime >= %s and event_datetime <= %s")
        args.extend((start, end))
    where_str = " and ".join(where_list)
    sql = f"select * from point_1min_event where {where_str}  " \
          f"ORDER BY event_datetime desc limit 5000"
    async with MysqlUtil() as conn:
        data = await conn.fetchall(sql, args=tuple(args))
    return data

wang.wenrong's avatar
wang.wenrong committed
150

lcn's avatar
lcn committed
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
async def query_search_scope_pids(pids, page_num, page_size, start, end):
    """Search the scope index for the given points, newest first.

    Args:
        pids: Point ids matched with a ``terms`` filter on ``point_id``.
        page_num: 1-based page number; the offset sent to the search is
            ``(page_num - 1) * page_size``.
        page_size: Number of hits requested.
        start: Optional lower bound for ``datetime`` (inclusive).
        end: Optional upper bound for ``datetime`` (inclusive). The range
            filter is added only when both ``start`` and ``end`` are truthy;
            both are converted with ``convert_es_str`` first.

    Returns:
        The raw result of ``EsUtil.search_origin`` on ``SCOPE_DATABASE``.
    """
    offset = (page_num - 1) * page_size

    # Mandatory filters: scope-mode documents of the requested points.
    must_filters = [
        {"term": {"mode.keyword": "scope"}},
        {"terms": {"point_id": pids}},
    ]
    if start and end:
        start_es = convert_es_str(start)
        end_es = convert_es_str(end)
        time_window = {"gte": start_es, "lte": end_es}
        must_filters.append({"range": {"datetime": time_window}})

    query_body = {
        "from": offset,
        "size": page_size,
        "query": {"bool": {"must": must_filters}},
        "sort": [{"datetime": {"order": "desc"}}],
    }
    async with EsUtil() as es:
        es_re = await es.search_origin(body=query_body, index=SCOPE_DATABASE)
    return es_re