# taos_new.py
import json
import re

from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
# from bromake.modules.shidianu.service.open_data_service import get_token
from pot_libs.settings import SETTING
import base64
from pot_libs.mysql_util.mysql_util import MysqlUtil


async def get_td_engine_data(url, sql):
    """
    POST a SQL statement to a TD engine HTTP endpoint and decode the reply.

    :param url: endpoint URL; ``?tz=Asia/Shanghai`` is appended when the URL
        contains no ``?``
    :param sql: SQL text sent as the request body
    :return: ``(True, results)`` with the decoded JSON object when its
        ``code`` field equals 0, otherwise ``(False, None)``
    """
    if "?" not in url:
        url += "?tz=Asia/Shanghai"
    token = get_token()
    # The token is only a base64 encoding of "user:password", so it is kept
    # out of the log on purpose.
    log.info(f"sql:{sql}")
    resp_str, status = await AioHttpUtils().post_data(
        url, data=sql, timeout=50,
        headers={"Authorization": f"Basic {token}"}
    )
    # Log before parsing so the raw reply is visible even when it is not JSON.
    log.info(f"resp_str:{resp_str},status:{status}")
    try:
        results = json.loads(resp_str)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers a reply that
        # is not str/bytes (e.g. None).
        log.info(f"response is not valid JSON, status:{status}")
        return False, None
    if not isinstance(results, dict) or results.get("code") != 0:
        return False, None
    return True, results


def get_token():
    """
    Build the credential string placed after ``Basic`` in the Authorization
    header.

    :return: base64 text of ``"<SETTING.td_user>:<SETTING.td_pwd>"``
    """
    raw_credentials = f"{SETTING.td_user}:{SETTING.td_pwd}".encode()
    return base64.b64encode(raw_credentials).decode()


async def insert_into_tidb(schema, tables, value):
    """
    Execute ``INSERT INTO <schema>.<tables> VALUES(<value>)`` through MysqlUtil.

    :param schema: schema name placed before the dot
    :param tables: table name placed after the dot
    :param value: text placed verbatim between the VALUES parentheses
    :return: whatever ``fetchone`` yields, or ``{}`` when that is falsy
    """
    # NOTE(review): all three arguments are interpolated into the SQL text
    # as-is, so they must come from trusted code, never from user input.
    statement = (
        f'''
         INSERT INTO {schema}.{tables} VALUES({value});
        '''
    )
    async with MysqlUtil() as db:
        row = await db.fetchone(statement)
    return row or {}


def test_td_engine():
    """
    Manual smoke check: read ten rows from db_water.water_stb and print them.

    Prints the HTTP status code, the raw response body, and the rows as a
    list of ``{column: value}`` dicts keyed by the reply's ``head`` entries.
    Performs a real HTTP request against ``SETTING.stb_url``.
    """
    from pot_libs.settings import SETTING
    import requests
    # The token is deliberately not printed: it is a reversible encoding of
    # the account password.
    token = get_token()
    url = f"{SETTING.stb_url}db_water?tz=Asia/Shanghai"
    headers = {"Authorization": f"Basic {token}"}
    sql = "select * from db_water.water_stb limit 10 "
    r = requests.post(url, data=sql, headers=headers)
    print(r.status_code)
    print(r.content)
    # Decode the body once instead of once per row and per column.
    payload = json.loads(r.content)
    head = payload['head']
    # Build a fresh dict for every row; reusing a single dict would make
    # every list entry alias the last row.
    a_list = [dict(zip(head, row)) for row in payload['data']]
    print(a_list)


async def elec_current_data_new16(mtids):
    """
    Fetch the last row of the "electric" table belonging to each given id.

    One ``select last_row(*)`` query is issued per table. Tables whose query
    fails are skipped.

    :param mtids: ids substituted into the "electric" table-name template
    :return: ``(res_map, {})`` where ``res_map`` maps each returned row's
        ``mtid`` value to that row as a ``{column: value}`` dict; the second
        element is always an empty dict
    """
    res_map = {}
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    table_names = [get_td_table_name("electric", mtid) for mtid in mtids]
    # Every table is queried the same way whether there is one or many; the
    # result is returned only after the whole loop has run.
    for table_name in table_names:
        sql = f"select last_row(*) from {table_name} group by tbname "
        is_succ, results = await get_td_engine_data(url, sql)
        if not is_succ:
            continue
        head = parse_td_columns(results)
        for res in results["data"]:
            data = dict(zip(head, res))
            res_map[data["mtid"]] = data
    return res_map, {}


def get_td_table_name(topic, id):
    """
    Build a table name from the naming template registered for a topic.

    NOTE(review): this module defines ``get_td_table_name`` a second time
    further down; that later definition replaces this one when the module is
    imported.

    :param topic: the topic to query
    :param id: the id used in the table name
    :return: the topic's template with ``id`` substituted for ``%s``
    :raises TypeError: when ``topic`` has no template (``None % id``)
    """
    topic_map = {
        "electric": "mt%s_ele",
        "pv_ele": "pv_ele%s",
        "pv_sts": "pv_sts%s",
        "ws": "ws%s"
    }
    table_name = topic_map.get(topic)
    return table_name % id


# td 3.0
def td3_tbl_compate(td_tables):
    """
    Turn a sequence of table names into a value with a parenthesised form.

    :param td_tables: non-empty sequence of table names
    :return: ``tuple(td_tables)`` when there is more than one name, otherwise
        the string ``"('<name>')"`` for the single name
    :raises IndexError: when ``td_tables`` is empty
    """
    if len(td_tables) > 1:
        return tuple(td_tables)

    # NOTE(review): presumably the result is formatted into an SQL "in (...)"
    # clause, where a one-element tuple's repr "('x',)" would be invalid -
    # confirm against callers.
    return f"('{td_tables[0]}')"


def parse_td_columns(rsp_data):
    """
    Extract plain column names from a response's ``column_meta`` list.

    The first item of every ``column_meta`` entry is taken as the column
    name. A name of the form ``last_row(<field>)`` is reduced to ``<field>``.

    :param rsp_data: decoded response dict
    :return: list of column names, or ``[]`` when ``column_meta`` is missing
        or empty
    """
    column_meta = rsp_data.get("column_meta")
    if not column_meta:
        return []
    head = []
    for col in column_meta:
        wrapped = re.search(r'last_row\((.*)\)', col[0])
        head.append(wrapped.group(1) if wrapped else col[0])
    return head


def get_td_table_name(topic, id):
    """
    Build a table name from the naming template registered for a topic.

    :param topic: the topic to query
    :param id: the id used in the table name
    :return: the topic's template with ``id`` substituted for ``%s``
    :raises TypeError: when ``topic`` has no template (``None % id``)
    """
    topic_map = {
        "water": "water_ele_electric_ops%s",
        "electric": "mt%s_ele_electric_ops",
        "pv_ele": "pv_ele%s",
        "pv_sts": "pv_sts%s",
        "ws": "ws%s",
        "adi": "mt%s_adi_electric_ops",
        "pt_temp": "pt_temp%s_electric_ops",  # capacitor
        "ent_guard_sts_stb": "ent_guard_sts%s",  # door access control
        "ent_guard_soe_stb": "ent_guard_soe%s",  # door access alarm
        "indoor_temp_stb": "indoor_temp%s",  # indoor temperature
        "indoor_temp_soe_stb": "indoor_temp_soe%s",  # indoor temperature alarm
        "indoor_hum_stb": "indoor_hum%s",  # indoor humidity
        "indoor_hum_soe_stb": "indoor_hum_soe%s",  # indoor humidity alarm
        "water_fld_sts_stb": "water_fld_sts%s",  # water immersion
        "water_fld_soe_stb": "water_fld_soe%s",  # water immersion alarm
        "smoke_sts_stb": "smoke_sts_stb%s",  # smoke detector
        "smoke_soe_stb": "smoke_soe_stb%s",  # smoke detector alarm
        "old_adio_stb": "s_%s_a",  # electrical safety (old); here %s is the sid
        "new_adio_stb": "mt%s_adi",  # electrical safety (new)
        "old_electric_stb": "s_%s_a",
        "new_electric_stb": "mt%s_ele",

    }
    table_name = topic_map.get(topic)
    return table_name % id