import json from pot_libs.logger import log from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils from unify_api.modules.shidianu.service.open_data_service import get_token async def get_td_engine_data(url, sql): token = get_token() log.info(f"token:{token},sql:{sql}") resp_str, status = await AioHttpUtils().post_data( url, data=sql, timeout=50, headers={"Authorization": f"Basic {token}"} ) # log.info(f"resp_str:{resp_str},status:{status}") if status != 200: return False, None results = json.loads(resp_str) return True, results def test_td_engine(): """ from unify_api.modules.common.service.td_engine_service import test_td_engine test_td_engine() """ from pot_libs.settings import SETTING import requests token = get_token() url = f"{SETTING.stb_url}db_electric" print(token) h = {"Authorization": f"Basic {token}"} sql = f"select last_row(*) from mt1332_ele where pid=1395" r = requests.post(url, data=sql, headers=h) print(r.status_code) print(r.content) return