import json
from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
from unify_api.modules.shidianu.service.open_data_service import get_token
async def get_td_engine_data(url, sql):
token = get_token()
log.info(f"token:{token},sql:{sql}")
resp_str, status = await AioHttpUtils().post_data(
url, data=sql, timeout=50,
headers={"Authorization": f"Basic {token}"}
)
log.info(f"resp_str:{resp_str},status:{status}")
if status != 200:
return False, None
results = json.loads(resp_str)
return True, results
def test_td_engine():
"""
from unify_api.modules.common.service.td_engine_service import test_td_engine
test_td_engine()
"""
from pot_libs.settings import SETTING
import requests
token = get_token()
url = f"{SETTING.stb_url}db_electric"
print(token)
h = {"Authorization": f"Basic {token}"}
sql = f"select last_row(*) from mt1332_ele where pid=1395"
r = requests.post(url, data=sql, headers=h)
print(r.status_code)
print(r.content)
return
-
lcn authored4bb601d6