taos_new.py 2.61 KB
Newer Older
wang.wenrong's avatar
wang.wenrong committed
1 2 3 4 5 6 7 8
import json
import re

from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from pot_libs.settings import SETTING
import base64
from pot_libs.mysql_util.mysql_util import MysqlUtil
ZZH's avatar
ZZH committed
9
from unify_api.constants import S030_TOPIC, WG_TOPIC, TD_TBL_POSTFIX
wang.wenrong's avatar
wang.wenrong committed
10 11 12


async def get_td_engine_data(url, sql):
wang.wenrong's avatar
wang.wenrong committed
13 14
    if "?" not in url:
        url += "?tz=Asia/Shanghai"
wang.wenrong's avatar
wang.wenrong committed
15 16 17 18 19 20
    token = get_token()
    log.info(f"token:{token},sql:{sql}")
    resp_str, status = await AioHttpUtils().post_data(
        url, data=sql, timeout=50,
        headers={"Authorization": f"Basic {token}"}
    )
wang.wenrong's avatar
wang.wenrong committed
21 22
    results = json.loads(resp_str)
    if results["code"] != 0:
wang.wenrong's avatar
wang.wenrong committed
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
        return False, None
    return True, results


def get_token():
    user_password = f"{SETTING.td_user}:{SETTING.td_pwd}"
    token = base64.b64encode(user_password.encode()).decode()
    return token


async def insert_into_tidb(schema, tables, value):
    sql = (
        f'''
         INSERT INTO {schema}.{tables} VALUES({value});
        '''
    )
    async with MysqlUtil() as conn:
        datas = await conn.fetchone(sql)
    return datas if datas else {}


def test_td_engine():
    """
    td_eignne insert into TiDB
    """
    from pot_libs.settings import SETTING
    import requests
    token = get_token()
    url = f"{SETTING.stb_url}db_water?tz=Asia/Shanghai"
    print(token)
    h = {"Authorization": f"Basic {token}"}
    sql = f"select * from db_water.water_stb limit 10 "
    r = requests.post(url, data=sql, headers=h)
    print(r.status_code)
    print(r.content)
    a_list, a_dict = [], {}
    for data in json.loads(r.content)['data']:
        index = 0
        for head in json.loads(r.content)['head']:
            a_dict.update({head: data[index]})
            index += 1
        a_list.append(a_dict)
    print(a_list)


# td 3.0
def td3_tbl_compate(td_tables):
    if len(td_tables) > 1:
        return tuple(td_tables)
wang.wenrong's avatar
wang.wenrong committed
72

wang.wenrong's avatar
wang.wenrong committed
73 74 75 76 77
    return f"('{td_tables[0]}')"


def parse_td_columns(rsp_data):
    head = []
wang.wenrong's avatar
wang.wenrong committed
78 79
    if not rsp_data.get("column_meta"):
        return []
wang.wenrong's avatar
wang.wenrong committed
80 81 82 83 84
    for col in rsp_data["column_meta"]:
        r = re.findall(r'last_row\((.*)\)', col[0])
        tbl_field = r[0] if r else col[0]
        head.append(tbl_field)
    return head
wang.wenrong's avatar
wang.wenrong committed
85 86


ZZH's avatar
ZZH committed
87 88 89 90 91 92 93 94 95 96 97 98 99 100
def get_td_table_name(topic, tbl_id):
    db_name = SETTING.mysql_db
    if topic in S030_TOPIC:
        tbl_name = f"mt{tbl_id}_{TD_TBL_POSTFIX[topic]}"
        if db_name == "bromake":
            tbl_name += "_bromake"
        elif db_name == "electric_ops":
            tbl_name += "_electric_ops"
    elif topic in WG_TOPIC:
        tbl_name = f"{topic}{tbl_id}"
    else:
        return None

    return tbl_name