Commit 262b5203 authored by wang.wenrong's avatar wang.wenrong

Merge branch 'wwr' into 'develop'

Wwr

See merge request !2
parents d3b3699e 857bdfdd
Subproject commit 05cfad9387871294a0215dabeccad37adbc487e3
Subproject commit a878a1f82788f11deea04f8e64f668a83ed3a449
......@@ -32,7 +32,7 @@ async def get_adio_current_data(mtid):
'''
获取安全监测实时数据
'''
url = f"{SETTING.stb_url}db_adio"
url = f"{SETTING.stb_url}db_adio?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_adi"
is_success, results = await get_td_engine_data(url, sql)
if not is_success:
......
......@@ -48,3 +48,20 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format):
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,))
return datas
async def get_elec_mtid_sid_by_cid(cid):
sql = (
f"""
SELECT
mtid,
sid
FROM
monitor
WHERE
cid = {cid};
"""
)
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
return datas if datas else []
......@@ -229,7 +229,7 @@ async def qual_current_data(point_mid):
async def elec_current_data_new15(mtids, cid):
res_map = {}
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
select tbname,last_row(*) from electric_stb
where cpyid={cid}
......
......@@ -30,6 +30,7 @@ from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from unify_api.modules.electric.components.electric import (
ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)
from unify_api.utils.taos_new import parse_td_columns
async def elec_current_storeys_service(storeys):
......@@ -884,12 +885,12 @@ async def elec_current_service_new15(point_id):
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
return '',{}
head = results["head"]
return '', {}
head = parse_td_columns(results)
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
......
......@@ -878,7 +878,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql)
log.info(f"is_succ:{is_succ}")
......
......@@ -35,7 +35,7 @@ from unify_api.modules.home_page.dao.count_info_dao import (
from unify_api.modules.electric_optimization.dao.power_index import (
price_policy_by_cid
)
from unify_api.utils.taos_new import parse_td_columns
async def other_info(company_id):
"""
......@@ -604,7 +604,7 @@ async def current_load_new15(cid):
from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
select last_row(*) from electric_stb
where cpyid={cid}
......@@ -613,8 +613,7 @@ async def current_load_new15(cid):
is_succ, results = await get_td_engine_data(url, sql)
now_tt = int(time.time())
if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i
for i in results["head"]]
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
......@@ -624,10 +623,8 @@ async def current_load_new15(cid):
if item:
mdptime_tt = None
if "mdptime" in item:
mdptime = datetime.strptime(item["mdptime"],
"%Y-%m-%d %H:%M:%S.%f")
mdptime_tt = time.mktime(mdptime.timetuple())
item_tt = item.get("timestamp") or mdptime_tt
mdptime_dt = pendulum.parse(item["mdptime"])
item_tt = item.get("timestamp") or mdptime_dt.int_timestamp
if item_tt:
# 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60:
......
......@@ -2,6 +2,10 @@ import json
import time
import re
import pendulum
from unify_api.modules.electric.dao.electric_dao import \
get_elec_mtid_sid_by_cid
from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.logger import log
......@@ -25,6 +29,8 @@ from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\
select_point_dao
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.utils.taos_new import parse_td_columns, td3_tbl_compate, \
get_td_table_name
async def health_ctl_rate_service(cid):
......@@ -188,23 +194,49 @@ async def health_ctl_rate_service_new15(cid):
now_ts = int(time.time())
real_tt = now_ts
total = 0
url = f"{SETTING.stb_url}db_electric"
sql = f"""
select last_row(*) from electric_stb
where cpyid={cid}
group by tbname
"""
datas = await get_elec_mtid_sid_by_cid(cid)
td_mt_tables = tuple(
(get_td_table_name("electric", data["mtid"]) for data in datas if
data["mtid"]))
td_mt_tables = td3_tbl_compate(td_mt_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_mt_tables} group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
is_succ, results = await get_td_engine_data(url, sql)
time_str = time_format.get_datetime_str(real_tt)
if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i
for i in results["head"]]
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
ubl=1,
)
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
(f"s{data['sid'].lower()}_e" for data in datas if data["sid"]))
td_s_tables = td3_tbl_compate(td_s_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_s_tables} group by tbname"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1,
freq_dev=1,
ubl=1,
)
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
for data in datas:
real_tt = int(time.mktime(time.strptime(data["ts"],
"%Y-%m-%d %H:%M:%S.%f")))
real_tt = pendulum.parse(data["ts"]).int_timestamp
if now_ts - real_tt > REAL_EXP_TIME:
continue
total += 1
......@@ -242,12 +274,6 @@ async def health_ctl_rate_service_new15(cid):
grade = get_dev_grade(dev_type="lf", cur=lf)
if grade and grade >= 60:
stats["lf"] += 1
else:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
ubl=1,
)
if total == 0:
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
......@@ -262,5 +288,3 @@ async def health_ctl_rate_service_new15(cid):
freq_dev=round_2(stats["freq_dev"] / total),
ubl=round_2(stats["ubl"] / total),
)
......@@ -202,7 +202,7 @@ async def get_p_list_new15(mtid, meter_sn, start, end, time_slots):
'''
sn = meter_sn.lower()
p_field = f"p{sn}"
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select max({p_field}) {p_field} from mt{mtid}_ele " \
f"where meter_sn='{str.upper(sn)}' " \
f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
......
logger:
base_path: /home/ubuntu/data/code/log
qingstor:
access_key_id: OTWZLLMOLNPQRLNWQEDL
access_key_secret: m2mdWqBsE1csLhV5RkGQcOME36P2NHWs5mBrgGlL
bucket: poweriot-develop-env
region: pek3B
sanic:
config:
REQUEST_TIMEOUT: 60
RESPONSE_TIMEOUT: 60
swagger:
doc:
# for swagger
API_VERSION:
API_TITLE:
API_DESCRIPTION:
API_TERMS_OF_SERVICE:
# API_LICENSE_NAME:
# API_LICENSE_URL
ui_config:
logger:
kafka_servers: 139.159.158.241:9092,121.37.19.225:9092,124.71.223.21:9092
servers:
web:
host: 0.0.0.0
port: 8095
admin:
host:
port:
base:
host:
port:
mysql:
db: power_iot
user: poweriot
password: power_iot123
port: 4000
host: 139.159.234.186
mysql_zhiwei_u:
db: devops
mqtt:
host: 172.18.1.3
port: 1883
es:
# 故意写错 为了捕获错误
host: 172.18.1.2491:9200
redis_single:
host: redis://172.18.1.253:6379
redis:
nodes:
- host: 172.18.2.2
port: 7001
- host: 172.18.2.2
port: 7002
- host: 172.18.2.3
port: 7001
- host: 172.18.2.3
port: 7002
- host: 172.18.2.4
port: 7001
- host: 172.18.2.4
port: 7002
wechat_open:
app_id: wxf23bec7698a6e4d4
secret_key: 3169cd62466d14c8ce3f7b1d4059f0e2
#联动控制,开关监测,传数给业务后台
linkage_control_url: http://172.18.1.9:15000/switch_control
# 事件设置,传数给装置
event_post_url: http://172.18.1.9:15000/config_soe
# sso服务
auth_url: http://124.71.43.237:9000/unify-api/auth
verify_url: http://124.71.43.237:9000/unify-api/verify
logout_url: http://124.71.43.237:9000/unify-api/logout
refresh_token_url: http://124.71.43.237:9000/unify-api/refresh_token
# 图像下载域名
download_img_url: https://www.ucyber.cn
# 产品域名
zhidianu_url: /zhidianu/
andianu_url: /andianu/
m_andianu_url: /m-andianu/
shidianu_url: /shidianu/
m_zhidianu_url: /m-zhidianu/
zdu_url: /intelligentu/
shidianu_new_url: /new-shidianu/
dust_url: /yangchen/
alarm_cid:
88: # cid
- 88 # user_id
app_name: unify_api
machine_num: 1
tz: Asia/Shanghai
debug_mode: 1
accesskey_id: LTAI4GA7ZZwKNC6qHX9BgDHy
accesskey_secret: zFy9L299km45fXSQGAV5jR03jVJOoh
wechat_open2:
app_id: wx7e1181a0feb75b00
secret_key: c4215dae034efc5f658c39c7cd32f0ceffff
# 微信注册, 开放平台设置, 测试和生产不同
wechat_mp:
app_id: wxde1b52ac47fc1dff
secret_key: d036cc96f25b873be01a3d7af75a2664
token: xiongguobao11
mqtt_user_name: richard_handsome
device_status:
host: 172.18.1.248
port: 8081
app_id: pot_sanic
app_secret: MzAwMTE2NTk0MDI3OTg4NzY5OTU5NTExMjk1NDIzMjgzMjA
# 知电u补充数据
td_engine:
taosd_host: dev-business.localdomain
taosd_port: 6030
taosd_user: pot
taosd_passwd: power_iot123
stb_url: http://124.71.97.95:6041/rest/sql/
# 华侨新村、新塘坑、横岗 cids
new_datas_cids: [164, 83, 112]
import json
import re
from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
# from bromake.modules.shidianu.service.open_data_service import get_token
from pot_libs.settings import SETTING
import base64
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_td_engine_data(url, sql):
token = get_token()
log.info(f"token:{token},sql:{sql}")
resp_str, status = await AioHttpUtils().post_data(
url, data=sql, timeout=50,
headers={"Authorization": f"Basic {token}"}
)
log.info(f"resp_str:{resp_str},status:{status}")
if status != 200:
return False, None
results = json.loads(resp_str)
return True, results
def get_token():
user_password = f"{SETTING.td_user}:{SETTING.td_pwd}"
token = base64.b64encode(user_password.encode()).decode()
return token
async def insert_into_tidb(schema, tables, value):
sql = (
f'''
INSERT INTO {schema}.{tables} VALUES({value});
'''
)
async with MysqlUtil() as conn:
datas = await conn.fetchone(sql)
return datas if datas else {}
def test_td_engine():
"""
td_eignne insert into TiDB
"""
from pot_libs.settings import SETTING
import requests
token = get_token()
url = f"{SETTING.stb_url}db_water?tz=Asia/Shanghai"
print(token)
h = {"Authorization": f"Basic {token}"}
sql = f"select * from db_water.water_stb limit 10 "
r = requests.post(url, data=sql, headers=h)
print(r.status_code)
print(r.content)
a_list, a_dict = [], {}
for data in json.loads(r.content)['data']:
index = 0
for head in json.loads(r.content)['head']:
a_dict.update({head: data[index]})
index += 1
a_list.append(a_dict)
print(a_list)
async def elec_current_data_new16(mtids):
res_map = {}
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
table_names = [get_td_table_name("electric", mtid) for mtid in mtids]
if len(table_names) > 1:
for table_name in table_names:
sql = f"select last_row(*) from {table_name} group by tbname "
is_succ, results = await get_td_engine_data(url, sql)
if is_succ:
head = parse_td_columns(results)
for res in results["data"]:
data = dict(zip(head, res))
res_map[data["mtid"]] = data
res_map.update(res_map)
return res_map, {}
else:
sql = f"select last_row(*) from " + "".join(
table_names) + "group by tbname "
is_succ, results = await get_td_engine_data(url, sql)
if is_succ:
head = parse_td_columns(results)
for res in results["data"]:
data = dict(zip(head, res))
res_map[data["mtid"]] = data
return res_map, {}
def get_td_table_name(topic, id):
"""
:param topic: 需要查询的主题
:param id: 表命名使用的id
:return:
"""
topic_map = {
"water": "water_bromake%s",
"electric": "mt%s_ele_bromake",
"pv_ele": "pv_ele%s",
"pv_sts": "pv_sts%s",
"ws": "ws%s"
}
table_name = topic_map.get(topic)
return table_name % id
# td 3.0
def td3_tbl_compate(td_tables):
if len(td_tables) > 1:
return tuple(td_tables)
return f"('{td_tables[0]}')"
def parse_td_columns(rsp_data):
head = []
for col in rsp_data["column_meta"]:
r = re.findall(r'last_row\((.*)\)', col[0])
tbl_field = r[0] if r else col[0]
head.append(tbl_field)
return head
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment