Commit 857bdfdd authored by wang.wenrong's avatar wang.wenrong

初次修改

parent 1beae7d6
...@@ -32,7 +32,7 @@ async def get_adio_current_data(mtid): ...@@ -32,7 +32,7 @@ async def get_adio_current_data(mtid):
''' '''
获取安全监测实时数据 获取安全监测实时数据
''' '''
url = f"{SETTING.stb_url}db_adio" url = f"{SETTING.stb_url}db_adio?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_adi" sql = f"select last_row(*) from mt{mtid}_adi"
is_success, results = await get_td_engine_data(url, sql) is_success, results = await get_td_engine_data(url, sql)
if not is_success: if not is_success:
......
...@@ -48,3 +48,20 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format): ...@@ -48,3 +48,20 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format):
async with MysqlUtil() as conn: async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,)) datas = await conn.fetchall(sql, args=(pid,))
return datas return datas
async def get_elec_mtid_sid_by_cid(cid):
sql = (
f"""
SELECT
mtid,
sid
FROM
monitor
WHERE
cid = {cid};
"""
)
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
return datas if datas else []
...@@ -229,7 +229,7 @@ async def qual_current_data(point_mid): ...@@ -229,7 +229,7 @@ async def qual_current_data(point_mid):
async def elec_current_data_new15(mtids, cid): async def elec_current_data_new15(mtids, cid):
res_map = {} res_map = {}
url = f"{SETTING.stb_url}db_electric" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f""" sql = f"""
select tbname,last_row(*) from electric_stb select tbname,last_row(*) from electric_stb
where cpyid={cid} where cpyid={cid}
......
...@@ -30,6 +30,7 @@ from pot_libs.common.components.query import PageRequest, Range, Equal, Filter ...@@ -30,6 +30,7 @@ from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from unify_api.modules.electric.components.electric import ( from unify_api.modules.electric.components.electric import (
ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp, ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
) )
from unify_api.utils.taos_new import parse_td_columns
async def elec_current_storeys_service(storeys): async def elec_current_storeys_service(storeys):
...@@ -884,12 +885,12 @@ async def elec_current_service_new15(point_id): ...@@ -884,12 +885,12 @@ async def elec_current_service_new15(point_id):
message="没有该监测点的monitor信息,请联系运维人员!") message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"] mtid = meter_info["mtid"]
# 获取子表中的实时数据 # 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}" sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql) is_succ, results = await get_td_engine_data(url, sql)
if not is_succ: if not is_succ:
return '',{} return '', {}
head = results["head"] head = parse_td_columns(results)
if not results["data"]: if not results["data"]:
results["data"] = ['' for i in range(len(head))] results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0])) res = dict(zip(head, results["data"][0]))
......
...@@ -878,7 +878,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse: ...@@ -878,7 +878,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
message="没有该监测点的monitor信息,请联系运维人员!") message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"] mtid = meter_info["mtid"]
# 获取子表中的实时数据 # 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}" sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql) is_succ, results = await get_td_engine_data(url, sql)
log.info(f"is_succ:{is_succ}") log.info(f"is_succ:{is_succ}")
......
...@@ -35,7 +35,7 @@ from unify_api.modules.home_page.dao.count_info_dao import ( ...@@ -35,7 +35,7 @@ from unify_api.modules.home_page.dao.count_info_dao import (
from unify_api.modules.electric_optimization.dao.power_index import ( from unify_api.modules.electric_optimization.dao.power_index import (
price_policy_by_cid price_policy_by_cid
) )
from unify_api.utils.taos_new import parse_td_columns
async def other_info(company_id): async def other_info(company_id):
""" """
...@@ -604,7 +604,7 @@ async def current_load_new15(cid): ...@@ -604,7 +604,7 @@ async def current_load_new15(cid):
from pot_libs.settings import SETTING from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \ from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data get_td_engine_data
url = f"{SETTING.stb_url}db_electric" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f""" sql = f"""
select last_row(*) from electric_stb select last_row(*) from electric_stb
where cpyid={cid} where cpyid={cid}
...@@ -613,8 +613,7 @@ async def current_load_new15(cid): ...@@ -613,8 +613,7 @@ async def current_load_new15(cid):
is_succ, results = await get_td_engine_data(url, sql) is_succ, results = await get_td_engine_data(url, sql)
now_tt = int(time.time()) now_tt = int(time.time())
if is_succ: if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i head = parse_td_columns(results)
for i in results["head"]]
datas = [] datas = []
for res in results["data"]: for res in results["data"]:
datas.append(dict(zip(head, res))) datas.append(dict(zip(head, res)))
...@@ -624,10 +623,8 @@ async def current_load_new15(cid): ...@@ -624,10 +623,8 @@ async def current_load_new15(cid):
if item: if item:
mdptime_tt = None mdptime_tt = None
if "mdptime" in item: if "mdptime" in item:
mdptime = datetime.strptime(item["mdptime"], mdptime_dt = pendulum.parse(item["mdptime"])
"%Y-%m-%d %H:%M:%S.%f") item_tt = item.get("timestamp") or mdptime_dt.int_timestamp
mdptime_tt = time.mktime(mdptime.timetuple())
item_tt = item.get("timestamp") or mdptime_tt
if item_tt: if item_tt:
# 小于2分钟内的数据相加为实时负荷 # 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60: if now_tt - item_tt <= 2 * 60:
......
...@@ -2,6 +2,10 @@ import json ...@@ -2,6 +2,10 @@ import json
import time import time
import re import re
import pendulum
from unify_api.modules.electric.dao.electric_dao import \
get_elec_mtid_sid_by_cid
from unify_api.modules.common.dao.common_dao import monitor_by_cid from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.utils.common_utils import round_2 from unify_api.utils.common_utils import round_2
from pot_libs.logger import log from pot_libs.logger import log
...@@ -25,6 +29,8 @@ from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\ ...@@ -25,6 +29,8 @@ from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\
select_point_dao select_point_dao
from unify_api.modules.common.service.td_engine_service import \ from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data get_td_engine_data
from unify_api.utils.taos_new import parse_td_columns, td3_tbl_compate, \
get_td_table_name
async def health_ctl_rate_service(cid): async def health_ctl_rate_service(cid):
...@@ -188,23 +194,49 @@ async def health_ctl_rate_service_new15(cid): ...@@ -188,23 +194,49 @@ async def health_ctl_rate_service_new15(cid):
now_ts = int(time.time()) now_ts = int(time.time())
real_tt = now_ts real_tt = now_ts
total = 0 total = 0
url = f"{SETTING.stb_url}db_electric"
sql = f""" datas = await get_elec_mtid_sid_by_cid(cid)
select last_row(*) from electric_stb td_mt_tables = tuple(
where cpyid={cid} (get_td_table_name("electric", data["mtid"]) for data in datas if
group by tbname data["mtid"]))
"""
td_mt_tables = td3_tbl_compate(td_mt_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_mt_tables} group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
is_succ, results = await get_td_engine_data(url, sql) is_succ, results = await get_td_engine_data(url, sql)
time_str = time_format.get_datetime_str(real_tt) time_str = time_format.get_datetime_str(real_tt)
if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i if not is_succ:
for i in results["head"]] log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
ubl=1,
)
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
(f"s{data['sid'].lower()}_e" for data in datas if data["sid"]))
td_s_tables = td3_tbl_compate(td_s_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_s_tables} group by tbname"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1,
freq_dev=1,
ubl=1,
)
head = parse_td_columns(results)
datas = [] datas = []
for res in results["data"]: for res in results["data"]:
datas.append(dict(zip(head, res))) datas.append(dict(zip(head, res)))
for data in datas: for data in datas:
real_tt = int(time.mktime(time.strptime(data["ts"], real_tt = pendulum.parse(data["ts"]).int_timestamp
"%Y-%m-%d %H:%M:%S.%f")))
if now_ts - real_tt > REAL_EXP_TIME: if now_ts - real_tt > REAL_EXP_TIME:
continue continue
total += 1 total += 1
...@@ -242,12 +274,6 @@ async def health_ctl_rate_service_new15(cid): ...@@ -242,12 +274,6 @@ async def health_ctl_rate_service_new15(cid):
grade = get_dev_grade(dev_type="lf", cur=lf) grade = get_dev_grade(dev_type="lf", cur=lf)
if grade and grade >= 60: if grade and grade >= 60:
stats["lf"] += 1 stats["lf"] += 1
else:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
ubl=1,
)
if total == 0: if total == 0:
return HealthCtlRateRes( return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1, real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
...@@ -262,5 +288,3 @@ async def health_ctl_rate_service_new15(cid): ...@@ -262,5 +288,3 @@ async def health_ctl_rate_service_new15(cid):
freq_dev=round_2(stats["freq_dev"] / total), freq_dev=round_2(stats["freq_dev"] / total),
ubl=round_2(stats["ubl"] / total), ubl=round_2(stats["ubl"] / total),
) )
...@@ -202,7 +202,7 @@ async def get_p_list_new15(mtid, meter_sn, start, end, time_slots): ...@@ -202,7 +202,7 @@ async def get_p_list_new15(mtid, meter_sn, start, end, time_slots):
''' '''
sn = meter_sn.lower() sn = meter_sn.lower()
p_field = f"p{sn}" p_field = f"p{sn}"
url = f"{SETTING.stb_url}db_electric" url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select max({p_field}) {p_field} from mt{mtid}_ele " \ sql = f"select max({p_field}) {p_field} from mt{mtid}_ele " \
f"where meter_sn='{str.upper(sn)}' " \ f"where meter_sn='{str.upper(sn)}' " \
f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)" f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
......
import json
import re
from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
# from bromake.modules.shidianu.service.open_data_service import get_token
from pot_libs.settings import SETTING
import base64
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_td_engine_data(url, sql):
token = get_token()
log.info(f"token:{token},sql:{sql}")
resp_str, status = await AioHttpUtils().post_data(
url, data=sql, timeout=50,
headers={"Authorization": f"Basic {token}"}
)
log.info(f"resp_str:{resp_str},status:{status}")
if status != 200:
return False, None
results = json.loads(resp_str)
return True, results
def get_token():
user_password = f"{SETTING.td_user}:{SETTING.td_pwd}"
token = base64.b64encode(user_password.encode()).decode()
return token
async def insert_into_tidb(schema, tables, value):
sql = (
f'''
INSERT INTO {schema}.{tables} VALUES({value});
'''
)
async with MysqlUtil() as conn:
datas = await conn.fetchone(sql)
return datas if datas else {}
def test_td_engine():
"""
td_eignne insert into TiDB
"""
from pot_libs.settings import SETTING
import requests
token = get_token()
url = f"{SETTING.stb_url}db_water?tz=Asia/Shanghai"
print(token)
h = {"Authorization": f"Basic {token}"}
sql = f"select * from db_water.water_stb limit 10 "
r = requests.post(url, data=sql, headers=h)
print(r.status_code)
print(r.content)
a_list, a_dict = [], {}
for data in json.loads(r.content)['data']:
index = 0
for head in json.loads(r.content)['head']:
a_dict.update({head: data[index]})
index += 1
a_list.append(a_dict)
print(a_list)
async def elec_current_data_new16(mtids):
res_map = {}
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
table_names = [get_td_table_name("electric", mtid) for mtid in mtids]
if len(table_names) > 1:
for table_name in table_names:
sql = f"select last_row(*) from {table_name} group by tbname "
is_succ, results = await get_td_engine_data(url, sql)
if is_succ:
head = parse_td_columns(results)
for res in results["data"]:
data = dict(zip(head, res))
res_map[data["mtid"]] = data
res_map.update(res_map)
return res_map, {}
else:
sql = f"select last_row(*) from " + "".join(
table_names) + "group by tbname "
is_succ, results = await get_td_engine_data(url, sql)
if is_succ:
head = parse_td_columns(results)
for res in results["data"]:
data = dict(zip(head, res))
res_map[data["mtid"]] = data
return res_map, {}
def get_td_table_name(topic, id):
"""
:param topic: 需要查询的主题
:param id: 表命名使用的id
:return:
"""
topic_map = {
"water": "water_bromake%s",
"electric": "mt%s_ele_bromake",
"pv_ele": "pv_ele%s",
"pv_sts": "pv_sts%s",
"ws": "ws%s"
}
table_name = topic_map.get(topic)
return table_name % id
# td 3.0
def td3_tbl_compate(td_tables):
if len(td_tables) > 1:
return tuple(td_tables)
return f"('{td_tables[0]}')"
def parse_td_columns(rsp_data):
head = []
for col in rsp_data["column_meta"]:
r = re.findall(r'last_row\((.*)\)', col[0])
tbl_field = r[0] if r else col[0]
head.append(tbl_field)
return head
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment