Commit 4b40411d authored by wang.wenrong's avatar wang.wenrong

Merge branch 'develop' into 'master'

Develop

See merge request !6
parents 93f53d31 884ef9e5
Subproject commit 05cfad9387871294a0215dabeccad37adbc487e3
Subproject commit a878a1f82788f11deea04f8e64f668a83ed3a449
import sys
sys.path.append(f'/home/ubuntu/data/code/unify_api_1.5/pot_libs')
sys.path.append(f'/home/ubuntu/data/code/unify_api_1.5')
\ No newline at end of file
sys.path.append(f'/home/ubuntu/data/code/unify_api2/pot_libs')
sys.path.append(f'/home/ubuntu/data/code/unify_api2')
\ No newline at end of file
......@@ -438,3 +438,11 @@ DOWNLOAD_ORDER = "filedata/zhiweiu/order"
SMS_LOGIN_TEMPLATE = "SMS_222195119"
SMS_SIGN_NAME = "清科优能"
# Maps a TDengine database name to the suffix used for its per-monitor
# child tables, e.g. db_adio -> table "mt<mtid>_adi".
# NOTE(review): "SUBFIX" looks like a typo for "SUFFIX", but the name is
# part of the settings interface — do not rename without updating callers.
TDENGINE_CHILD_SUBFIX_LIST = {
    "db_appliance": "app",
    "db_soe": "soe",
    "db_scope": "scp",
    "db_adio": "adi",
    "db_electric": "ele",
}
\ No newline at end of file
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
async def get_location_dao(lids):
location_info = {}
sql = "SELECT lid, item, ad_type FROM location WHERE lid IN %s"
sql = "SELECT lid, item, mtid, ad_type FROM location WHERE lid IN %s"
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(lids,))
if result:
......@@ -11,7 +14,8 @@ async def get_location_dao(lids):
id = res.get("lid")
item = res.get("item")
type = res.get("ad_type")
location_info[id] = {"item": item, "type": type}
mtid = res.get("mtid")
location_info[id] = {"item": item, "type": type, "mtid": mtid}
return location_info
......@@ -22,3 +26,19 @@ async def get_location_15min_dao(lid, start, end):
async with MysqlUtil() as conn:
result = await conn.fetchall(sql, args=(lid,))
return result
async def get_adio_current_data(mtid):
    """Fetch the latest safety-monitoring (adio) row for one monitor.

    Queries the TDengine child table ``mt<mtid>_adi`` through the REST
    endpoint and returns the newest row as a dict keyed by column name.

    Args:
        mtid: monitor id used to address the child table.

    Returns:
        dict of column -> value for the latest row, or {} when the
        request fails or no data exists.
    """
    url = f"{SETTING.stb_url}db_adio?tz=Asia/Shanghai"
    # int() guards the f-string interpolation: mtid is embedded directly
    # into the SQL text sent to TDengine.
    sql = f"select last_row(*) from mt{int(mtid)}_adi"
    is_success, results = await get_td_engine_data(url, sql)
    if not is_success:
        return {}
    # .get() avoids a KeyError when the response carries no 'data' key
    rows = results.get("data") or []
    if not rows:
        return {}
    # Pair the column headers with the first (latest) row's values
    return dict(zip(results["head"], rows[0]))
......@@ -27,7 +27,7 @@ from unify_api.modules.adio.components.adio import (
)
from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from unify_api.modules.adio.dao.adio_dao import get_location_dao, \
get_location_15min_dao
get_location_15min_dao, get_adio_current_data
@summary("返回安全监测历史曲线")
......@@ -120,6 +120,62 @@ async def post_adio_current(req, body: PageRequest) -> AdioCurrentResponse:
except:
log.warning("para exception, in_groups is NULL, no location_id")
return AdioCurrentResponse(temperature=[], residual_current=[])
# location_ids
location_group = in_group.group
if not location_group:
log.warning("para exception, in_groups is NULL, no location_id")
return AdioCurrentResponse(temperature=[], residual_current=[])
# 读取location表信息
location_info = await get_location_dao(location_group)
if not location_info:
log.warning("location_id error location_info empty")
return AdioCurrentResponse(temperature=[], residual_current=[])
# 获取mtid信息
mtid = list(location_info.values())[0]['mtid']
# 读取tdengine里面的数据
aido_data = await get_adio_current_data(mtid)
if not aido_data:
return AdioCurrentResponse(temperature=[], residual_current=[])
temperature = []
residual_current = []
trans_field = {"A相": "temp1", "B相": "temp2", "C相": "temp3",
"N线": "temp4"}
for location_id, item_info in location_info.items():
time_str = aido_data.get('ts')[:19]
item = item_info.get("item", "")
if item_info.get("type") == "residual_current":
adio_current = AdioCurrent(
type="residual_current",
item="漏电流",
real_time=time_str,
value=aido_data.get("residual_current")
)
residual_current.append(adio_current)
else:
type_filed = trans_field.get(item)
adio_current = AdioCurrent(
type="temperature",
item=item,
real_time=time_str,
value=aido_data.get(type_filed),
)
temperature.append(adio_current)
return AdioCurrentResponse(temperature=temperature,
residual_current=residual_current)
@summary("返回安全监测实时参数(老的)")
@description("包含温度和漏电流(老的)")
@examples(adio_current_example)
async def post_adio_current_bak(req, body: PageRequest) -> AdioCurrentResponse:
try:
in_group = body.filter.in_groups[0]
except:
log.warning("para exception, in_groups is NULL, no location_id")
return AdioCurrentResponse(temperature=[], residual_current=[])
# location_ids
location_group = in_group.group
......
......@@ -35,6 +35,7 @@ async def get_list_alarm_dao(mid_sql, page_size, page_num):
sql = f"SELECT point_1min_event.*, company.fullname " \
f"FROM `point_1min_event` LEFT JOIN company " \
f"on company.cid=point_1min_event.cid where {mid_sql} " \
f"order by event_datetime desc " \
f"limit {start}, {page_size}"
async with MysqlUtil() as conn:
data = await conn.fetchall(sql)
......
......@@ -459,6 +459,25 @@ async def sdu_alarm_aggs_date_importance(cid):
async def sdu_alarm_aggs_type(cid, start, end):
    """Count 1-minute alarm events for one company, grouped by event_type.

    Args:
        cid: company id.
        start, end: datetime strings bounding event_datetime (inclusive).

    Returns:
        list of rows {"doc_count": int, "event_type": ...}; [] when empty.
    """
    # Values are passed as bound parameters instead of being interpolated
    # into the SQL text (the original f-string was injectable).
    sql = """
        SELECT
            COUNT(*) doc_count,
            event_type
        FROM
            point_1min_event pevent
        WHERE
            cid = %s
            AND pevent.event_datetime >= %s
            AND pevent.event_datetime <= %s
        GROUP BY
            pevent.event_type
    """
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(cid, start, end))
    return datas if datas else []
async def sdu_alarm_aggs_type_old(cid, start, end):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
......@@ -505,44 +524,19 @@ async def sdu_alarm_aggs_type(cid, start, end):
async def alarm_aggs_importance(cid, start, end):
    """Aggregate alarm counts by importance level for one company.

    The block contained merge residue: the old Elasticsearch
    implementation returned early, leaving the new MySQL query
    unreachable — and that new query hard-coded ``cid = 54`` instead of
    using the caller's cid. This keeps only the MySQL version with the
    cid bug fixed and values bound as parameters.

    Args:
        cid: company id.
        start, end: datetime strings bounding event_datetime (inclusive).

    Returns:
        list of rows {"alarm_count": int, "importance": ...}; [] when empty.
    """
    sql = """
        SELECT
            COUNT(*) alarm_count,
            importance
        FROM
            point_1min_event pevent
        WHERE
            cid = %s
            AND pevent.event_datetime >= %s
            AND pevent.event_datetime <= %s
        GROUP BY
            pevent.importance
    """
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(cid, start, end))
    return datas if datas else []
......@@ -20,7 +20,7 @@ from unify_api.modules.common.procedures.points import points_by_storeys
from unify_api.modules.home_page.components.security_info_cps import \
SecurityCountResp, LevelCount, ContentCount, AlarmContentDistributionResp
from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info, alarm_content_time_distribution
alarm_content_time_distribution, alarm_count_info_new15
from unify_api.utils.common_utils import round_1, division_two
......@@ -418,7 +418,7 @@ async def sdu_index_alarm_ranking_service_new15(cid, start, end, product):
async def zdu_level_distribution_service(cid, start, end, product):
"""报警统计-报警等级-智电u"""
alarm_info_map = await alarm_count_info([cid], start, end, "month")
alarm_info_map = await alarm_count_info_new15([cid], start, end, "month")
first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"],
......
......@@ -260,14 +260,14 @@ async def list_alarm_service_new15(cid, point_id, start, end, importance,
if len(importance) == 1:
li.append(f"importance={importance[0]}")
else:
li.append(f"importance in {tuple(importance)}")
li.append(f"importance in {str(tuple(importance)).strip(',')}")
else:
li.append("importance in (1, 2, 3)")
if alarm_type:
if len(alarm_type) == 1:
li.append(f"event_type='{alarm_type[0]}'")
else:
li.append(f"event_type in {tuple(alarm_type)}")
li.append(f"event_type in {str(tuple(alarm_type)).strip(',')}")
mid_sql = " and ".join(li)
total = await get_total_list_alarm_dao(mid_sql)
mid_sql2 = " and ".join(["point_1min_event."+i for i in li])
......
......@@ -32,15 +32,29 @@ from unify_api.modules.alarm_manager.components.list_alarm import (
@summary("返回报警信息列表")
@description("筛选字段:监测点")
# @examples(list_alarm_example)
async def post_list_alarm(req, body: ListAlarmReq) -> ListAlarmResponse:
cid = body.cid
point_id = body.point_id
start = body.start
end = body.end
importance = body.importance
async def post_list_alarm(req, body: PageRequest) -> ListAlarmResponse:
cid = req.json.get("cid")
page_size = body.page_size
page_num = body.page_num
alarm_type = body.alarm_type
start, end = '', ''
point_id = None
alarm_type = []
importance = []
if body.filter.ranges:
_range = body.filter.ranges[0]
_, start, end = _range.field, _range.start, _range.end
if body.filter.equals:
for equal in body.filter.equals:
if equal.field == 'cid':
cid = equal.value
elif equal.field == 'point_id':
point_id = equal.value
if body.filter.in_groups:
for in_group in body.filter.in_groups:
if in_group.field == 'type':
alarm_type = in_group.group
elif in_group.field == 'importance':
importance = in_group.group
return await list_alarm_service_new15(cid, point_id, start, end,
importance, page_size, page_num,
alarm_type)
......
......@@ -103,7 +103,7 @@ async def carbon_emission_index_service(cid, start, end):
# 3. 碳排指数
# 3.1 能耗偏差
if energy_standard:
deviation = (energy_standard - conversion_year) / energy_standard
deviation = (energy_standard - conversion_year) / energy_standard if conversion_year else 1
# 3.2 碳排指数
c_index = carbon_index(deviation)
# 4. 达标情况
......
......@@ -36,7 +36,7 @@ async def storey_by_cid(cid):
async def query_points_by_storey(storeys):
"""根据storey_id查询point_id和room_name"""
sql = "SELECT s.storey_id,s.storey_name,s.point_id,s.room_name,m.mtid," \
"p.ctnum from storey_room_map s LEFT JOIN point p " \
"p.ctnum,p.cid from storey_room_map s LEFT JOIN point p " \
"on p.pid=s.point_id LEFT JOIN monitor m on m.mtid=p.mtid " \
"where s.storey_id in %s and m.demolished=0 " \
"order by s.storey_id, s.room_name"
......@@ -71,9 +71,10 @@ async def mid_by_pid(pid):
async def get_point_monitor_dao(pid):
    """Return meter + point configuration for one pid (non-demolished).

    Fixes merge residue: the duplicated old ``sql`` assignment was fused
    to the new one by a trailing backslash (a syntax error), and the new
    string lacked a space before FROM, producing ``p.imaxFROM`` —
    invalid SQL.

    Args:
        pid: point id.

    Returns:
        single row dict (meter_no, mtid, sid, ctr, ptr, ctnum, vc, tc,
        imax) or None when no match.
    """
    sql = "SELECT m.meter_no,m.mtid,m.sid," \
          "p.ctr,p.ptr,p.ctnum,p.vc,p.tc,p.imax " \
          "FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \
          "where p.pid=%s and m.demolished = 0"
    async with MysqlUtil() as conn:
        datas = await conn.fetchone(sql, args=(pid,))
    return datas
......
......@@ -3,8 +3,8 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
async def inline_zdu_by_cid(cid):
"""inline_zdu"""
sql = "SELECT * from inline_zdu iz inner join monitor m2 " \
"on iz.mtid = m2.mtid where iz.cid_belongedto = %s " \
sql = "SELECT * from inline iz inner join monitor m2 " \
"on iz.mtid = m2.mtid where iz.cid = %s " \
"order by iz.sort_num"
async with MysqlUtil() as conn:
inline_zdu_list = await conn.fetchall(sql, args=(cid,))
......
......@@ -164,6 +164,35 @@ async def proxy_safe_run_info(cids, start_time_str=None, end_time_str=None):
async def alarm_time_distribution(company_ids, start, end):
    """Bucket alarm events for the given companies by time of day.

    Buckets: day 06:00-17:59, night 18:00-23:59, morning 00:00-05:59.

    Args:
        company_ids: iterable of cids (expanded by the driver for IN %s).
        start, end: datetime strings bounding event_datetime (inclusive).

    Returns:
        {"day_alarm_cnt": int, "night_alarm_cnt": int,
         "morning_alarm_cnt": int}
    """
    # start/end are now bound parameters rather than interpolated into
    # the SQL text (injection hardening).
    sql = """
        SELECT
            HOUR (pevent.event_datetime) event_hour,
            COUNT(*) event_count
        FROM
            point_1min_event pevent
        WHERE
            cid IN %s
            AND pevent.event_datetime >= %s
            AND pevent.event_datetime <= %s
        GROUP BY
            HOUR (pevent.event_datetime)
    """
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(company_ids, start, end))
    time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0,
                             "morning_alarm_cnt": 0}
    for data in datas:
        hour = int(data["event_hour"])
        if 6 <= hour < 18:
            time_distribution_map["day_alarm_cnt"] += data["event_count"]
        elif 18 <= hour <= 23:
            time_distribution_map["night_alarm_cnt"] += data["event_count"]
        else:
            time_distribution_map["morning_alarm_cnt"] += data["event_count"]
    return time_distribution_map
async def alarm_time_distribution_old(company_ids, start, end):
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month, day=start_dt.day).strftime(
......
......@@ -156,6 +156,8 @@ async def location_stats_statics_new15(table_name, cid, start, end):
location_map = {}
async with MysqlUtil() as conn:
locations = await conn.fetchall(sql, args=(cid, ))
if not locations:
return location_map
for loca in locations:
location_map[loca["lid"]] = loca
datas_sql = f"SELECT * from {table_name} where lid in %s and create_time" \
......@@ -186,7 +188,10 @@ async def location_stats_statics_new15(table_name, cid, start, end):
else:
min_value, min_value_time = "", ""
mean_value = df.loc[df["lid"] == lid].value_avg.mean()
mean_value = round(mean_value, 2) if mean_value else ""
if not pd.isna(mean_value):
mean_value = round(mean_value, 2) if mean_value else ""
else:
mean_value = ''
location_map[lid].update({
"max_value": max_value, "max_value_time": max_value_time,
"min_value": min_value, "min_value_time": min_value_time,
......
......@@ -42,7 +42,12 @@ async def get_test(request):
def is_need_verify(path):
# return False
if SETTING.debug_mode == 1:
return False
if "swagger" in path:
return False
log.info("is_or_not_need_verify path:%s", path)
if "swagger" in path:
return False
......@@ -98,6 +103,8 @@ def is_need_verify(path):
async def auth_verify(request):
if SETTING.debug_mode == 1:
return False
path = request.path
if is_need_verify(path):
token = request.token
......@@ -119,11 +126,12 @@ async def auth_verify(request):
# 直接将请求转发到auth
resp_str, status = await AioHttpUtils().post(
SETTING.verify_url,
data={},
data={"db": SETTING.mysql_db},
timeout=50,
headers={"Authorization": f"Bearer {token}"}
)
log.info(f"request auth_url resp_str={resp_str} {type(resp_str)} status={status}")
log.info(
f"request auth_url resp_str={resp_str} {type(resp_str)} status={status}")
if status == 200:
payload = ujson.loads(resp_str)
setattr(request.ctx, "user_id", payload["user_id"])
......@@ -135,6 +143,10 @@ async def auth_verify(request):
def is_need_auth_verify_cid(path):
if SETTING.debug_mode == 1:
return False
if "swagger" in path:
return False
if re.match("^/unify-api/auth/", path):
return False
if re.match("^/api/common/wechat/mp", path):
......@@ -147,6 +159,8 @@ def is_need_auth_verify_cid(path):
async def auth_verify_cid(request):
"""增加工厂权限校验"""
if SETTING.debug_mode == 1:
return False
path = request.path
# if not re.match("^/unify-api/auth/?$", path):
if is_need_auth_verify_cid(path):
......@@ -178,7 +192,7 @@ async def auth_verify_cid(request):
# 3. 如果参数中包含cid或者cids,cid_list
if cid or cids or cid_list:
user_id = request.ctx.user_id
proxy_id = args.get("proxy_id")
# 2.2 从mysql获取
if proxy_id:
......
......@@ -132,7 +132,7 @@ async def pttl_max_new15(cid, start, end, point_id=None, inline_id=None):
# 根据时间范围, 返回不同时间格式
if max_val_time:
if date_type == "day":
max_val_time = str(max_val_time)[:10]
max_val_time = str(max_val_time)[11:16]
elif date_type == "month":
max_val_time = str(max_val_time)[5:10]
else:
......
......@@ -29,8 +29,7 @@ def test_td_engine():
url = f"{SETTING.stb_url}db_electric"
print(token)
h = {"Authorization": f"Basic {token}"}
sql = f"select last_row(*) from electric_stb where cpyid =204 " \
f"group by tbname"
sql = f"select last_row(*) from mt1332_ele where pid=1395"
r = requests.post(url, data=sql, headers=h)
print(r.status_code)
print(r.content)
......
......@@ -122,6 +122,18 @@ async def query_charge_aggs(date_start, date_end, cid_list):
return es_re["aggregations"]["cids"]["buckets"]
async def query_charge_new15(date_start, date_end, cid_list):
    """Sum kwh and charge per cid over [date_start, date_end]."""
    sql = f"""
    select cid,sum(kwh) kwh,sum(charge) charge from company_15min_power
    where cid in %s and create_time BETWEEN %s and %s
    group by cid
    """
    query_args = (cid_list, date_start, date_end)
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql=sql, args=query_args)
    return rows
async def power_charge_p_aggs(date_start, date_end, cid_list, interval):
"""
date_histogram,
......
......@@ -4,7 +4,7 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_annual_sunshine_hours(cid):
sql = "SELECT pv.annual_effective_hours FROM " \
"`algorithm_distributed_pv_quick_check_list` pv " \
"`algo_distributed_pv_quick_check_list` pv " \
"LEFT JOIN company c on c.city =pv.city where c.cid=%s"
async with MysqlUtil() as conn:
hours = await conn.fetchone(sql, args=(cid,))
......@@ -12,7 +12,7 @@ async def get_annual_sunshine_hours(cid):
async def get_p(cid):
sql = "SELECT pv.hour, pv.p FROM `algorithm_distributed_pv` pv " \
sql = "SELECT pv.hour, pv.p FROM `algo_distributed_pv` pv " \
"LEFT JOIN company c on c.city =pv.city where c.cid=%s"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(cid,))
......@@ -29,8 +29,8 @@ async def get_elec_price_dao(cid):
async def get_max_demand_by_inlid(inlids):
sql = """SELECT a.has_space,b.related_inlids FROM
`algorithm_md_space_analysis_result` a
LEFT JOIN algorithm_md_space_analysis_unit b on a.space_analysis_id=b.id
`algo_md_space_analysis_result` a
LEFT JOIN algo_md_space_analysis_unit b on a.space_analysis_id=b.id
WHERE b.related_inlids in %s ORDER BY a.create_time DESC LIMIT %s"""
async with MysqlUtil() as conn:
spaces = await conn.fetchall(sql, args=(inlids, len(inlids)))
......@@ -52,7 +52,7 @@ async def insert_price_policy_data_dao(cid, inline_id, start_month, quarters,
async def inset_algorithm_power_factor_dao(inline_id, start_time, std_cos):
    """Insert one power-factor record for an inline.

    Fixes merge residue: two overlapping triple-quoted ``sql``
    assignments were left in place, corrupting the INSERT text. Keeps
    the migrated ``algo_power_factor`` table name.

    Args:
        inline_id: inline id written to `inlid`.
        start_time: effective start time of the record.
        std_cos: standard power factor; `valid` is always inserted as 1.
    """
    sql = """INSERT INTO `algo_power_factor` (`inlid`, `start_time`,
    `valid`, `std_cos`) VALUES ( %s, %s, %s, %s); """
    async with MysqlUtil() as conn:
        await conn.execute(sql, args=(inline_id, start_time, 1, std_cos))
......@@ -60,7 +60,7 @@ async def inset_algorithm_power_factor_dao(inline_id, start_time, std_cos):
async def get_algorithm_power_factor_dao(cid):
sql = "select a.std_cos from algorithm_power_factor a " \
sql = "select a.std_cos from algo_power_factor a " \
"LEFT JOIN inline_zdu line on line.inlid =a.inlid " \
"WHERE line.cid_belongedto = %s ORDER BY a.start_time desc limit 1"
async with MysqlUtil() as conn:
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.utils.time_format import get_start_end_by_tz_time_new
def convert_es_str(str1: object) -> object:
......@@ -64,6 +66,20 @@ async def query_search_kwh_p(cid, start, end, interval):
return es_re["aggregations"]["quarter_time"]["buckets"]
async def query_search_kwh_p_new15(cid, start, end):
    """Return 15-min p/kwh rows for one cid, ordered by create_time.

    start/end are normalized through get_start_end_by_tz_time_new before
    being bound into the query.
    """
    start, _ = get_start_end_by_tz_time_new(start)
    _, end = get_start_end_by_tz_time_new(end)
    sql = f"""
    select p,kwh,create_time from company_15min_power
    where cid = %s and create_time BETWEEN %s and %s
    order by create_time asc
    """
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql=sql, args=(cid, start, end))
async def query_spfv_price(cid, start, end):
query_body = {
"query": {
......@@ -103,3 +119,15 @@ async def query_spfv_price(cid, start, end):
es_re = await es.search_origin(body=query_body,
index="poweriot_company_15min_power")
return es_re["aggregations"]["charge"]["avg"], es_re["aggregations"]["kwh"]["avg"]
async def query_spfv_price_new15(cid, start, end):
    """Average charge and kwh for one cid over the tz-normalized range.

    Returns:
        (avg_charge, avg_kwh); 0 for either when no rows match —
        fetchone can return None and AVG() is NULL on an empty set, both
        of which crashed or leaked None in the original.
    """
    start, _ = get_start_end_by_tz_time_new(start)
    _, end = get_start_end_by_tz_time_new(end)
    sql = f"""
    select avg(charge) charge,avg(kwh) kwh from company_15min_power
    where cid = %s and create_time BETWEEN %s and %s
    """
    async with MysqlUtil() as conn:
        data = await conn.fetchone(sql=sql, args=(cid, start, end))
    data = data or {}
    # `or 0` also maps a NULL average (None value under the key) to 0
    return data.get('charge') or 0, data.get('kwh') or 0
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import COMPANY_1DAY_POWER, COMPANY_15MIN_POWER
from unify_api.modules.common.dao.common_dao import company_by_cids
from unify_api.modules.elec_charge.components.elec_charge_cps import Spvf, \
PowerViewRes
from unify_api.modules.elec_charge.dao.elec_charge_dao import query_charge_aggs
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs, query_charge_new15
from unify_api.utils.common_utils import round_2, process_es_data, round_4
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.time_format import convert_es_str, last_time_str
......@@ -26,9 +28,9 @@ def quarters_trans(quarters):
}
temp_value = ""
temp_index = 0
start = my_pendulum.from_format("00:00", 'HH:mm')
for index, value in enumerate(quarters):
if index == 0:
temp_value = value
......@@ -37,12 +39,12 @@ def quarters_trans(quarters):
# dic2[temp_value].append(str(temp_index) + "-" + str(index-1))
# 计算时长
dic2[temp_value + "dt"] += (index - temp_index) * 15 / 60
# 转换为时间范围
minute_s = start.add(minutes=temp_index * 15).format("HH:mm")
minute_e = start.add(minutes=index * 15).format("HH:mm")
dic2[temp_value].append(str(minute_s) + "-" + str(minute_e))
# 重置temp_value和temp_index
temp_value = value
temp_index = index
......@@ -73,6 +75,17 @@ async def proxy_power(cid_list):
return 0
async def proxy_power15(cid_list):
    """Total accumulated kwh for the given companies (proxy edition).

    Returns:
        summed kwh rounded to 2 places; 0 when there are no rows or
        SUM() is NULL (the original called round(None, 2) in that case).
    """
    sql = f"""
    select sum(kwh) kwh from company_1day_power where cid in %s
    """
    async with MysqlUtil() as conn:
        res = await conn.fetchone(sql=sql, args=(cid_list,))
    # SUM() over zero rows yields NULL -> None; treat as 0
    kwh = (res or {}).get("kwh") or 0
    return round(kwh, 2)
async def power_overview_proxy(date_start, date_end, cid_list):
"""渠道版, 抽离电量电费信息,供调用"""
pv1 = {} # 电量
......@@ -127,7 +140,7 @@ async def power_overview_proxy(date_start, date_end, cid_list):
}
}
}
log.info(query_body)
# 3. 查询es
async with EsUtil() as es:
......@@ -154,6 +167,26 @@ async def power_overview_proxy(date_start, date_end, cid_list):
return pv1, pv2
async def power_overview_proxy15(date_start, date_end, cid_list):
    """Proxy edition: energy and charge totals grouped by spfv slot.

    Returns:
        (pv1, pv2) where pv1 maps spfv -> rounded kwh and pv2 maps
        spfv -> rounded charge.
    """
    sql = f"""
    select spfv,sum(p) as p,sum(kwh) as kwh,sum(charge) as charge
    from company_15min_power
    where cid in %s and create_time BETWEEN %s and %s
    group by spfv
    """
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql=sql,
                                   args=(cid_list, date_start, date_end))
    # One pass per mapping: spfv key -> rounded aggregate
    energy_by_spfv = {row.get("spfv"): round_2(row.get("kwh"))
                      for row in rows}
    charge_by_spfv = {row.get("spfv"): round_2(row.get("charge"))
                      for row in rows}
    return energy_by_spfv, charge_by_spfv
def total_value(dict_total):
# spfv是对象
if not dict_total:
......@@ -222,6 +255,69 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
return kwh_list, charge_list, price_list
async def power_aggs_cid_proxy_new15(start, end, cid_list, date_type):
    """Proxy edition: per-cid kwh/charge/price with growth rate vs the
    previous period.

    Args:
        start, end: current period bounds (strings).
        cid_list: cids to aggregate.
        date_type: period granularity, forwarded to last_time_str.

    Returns:
        (kwh_list, charge_list, price_list), each a list of
        {"name", "value", "rate"} dicts. A cid is skipped when it has
        zero kwh this period or no usable previous-period data.
    """
    # 1. Previous-period time range
    start_last, end_last = last_time_str(start, end, date_type)
    # 2. Current and previous aggregates
    re_this = await query_charge_new15(start, end, cid_list)
    re_last = await query_charge_new15(start_last, end_last, cid_list)
    if not re_this:
        log.info(
            f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
        return [], [], []
    re_last_dic = process_es_data(re_last, key="cid")
    # 3. Build the three ranking lists
    kwh_list = []
    charge_list = []
    price_list = []
    # 3.1 cid -> company shortname
    company_list = await company_by_cids(cid_list)
    com_dic = process_es_data(company_list, key="cid")
    for info in re_this:
        cid = info.get("cid")
        # Guard missing companies (consistent with the new15 sibling)
        # instead of letting com_dic[cid] raise KeyError.
        cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
        kwh = round_2(info.get("kwh"))
        if kwh == 0:
            continue
        # Skip when the previous period has no data; test the zero
        # divisor BEFORE dividing instead of relying on the caught
        # ZeroDivisionError the original logged as an error.
        last = re_last_dic.get(cid)
        if not last:
            log.info(
                f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
            continue
        kwh_last = last["kwh"]
        if kwh_last == 0:
            continue
        kwh_rate = round_4((kwh - kwh_last) / kwh_last)
        charge = round_2(info.get("charge"))
        charge_last = last["charge"]
        if charge_last == 0:
            continue
        charge_rate = round_4((charge - charge_last) / charge_last)
        price = round_2(charge / kwh)
        price_last = round_2(charge_last / kwh_last)
        price_rate = round_4((price - price_last) / price_last)
        kwh_list.append({"name": cid_name, "value": kwh, "rate": kwh_rate})
        charge_list.append(
            {"name": cid_name, "value": charge, "rate": charge_rate})
        price_list.append(
            {"name": cid_name, "value": price, "rate": price_rate})
    return kwh_list, charge_list, price_list
async def power_index_aggs_cid_proxy(start, end, cid_list, date_type):
"""power_aggs_cid_proxy缩减版, 没有增长率"""
# 1. 获取es结果
......@@ -247,7 +343,40 @@ async def power_index_aggs_cid_proxy(start, end, cid_list, date_type):
if kwh == 0:
continue
price = round_2(charge / kwh)
# 构造kwh
kwh_list.append({"name": cid_name, "value": kwh})
charge_list.append({"name": cid_name, "value": charge})
price_list.append({"name": cid_name, "value": price})
return kwh_list, charge_list, price_list
async def power_index_cid_proxy_new15(start, end, cid_list, date_type):
"""power_aggs_cid_proxy缩减版, 没有增长率"""
# 1. 获取es结果
res = await query_charge_new15(start, end, cid_list)
if not res:
log.info(
f"未查询到数据, cid_list:{cid_list}, start:{start}, end:{end}")
return [], [], []
# 3. 构造返回
kwh_list = []
charge_list = []
price_list = []
# 3.1 查询出cid和工厂名对应关系
company_list = await company_by_cids(cid_list)
# 把cid提出来
com_dic = process_es_data(company_list, key="cid")
for info in res:
cid = info.get("cid")
cid_name = com_dic[cid]["shortname"] if cid in com_dic else ''
kwh = round_2(info.get("kwh"))
charge = round_2(info.get("charge"))
# 值为0不参与排名统计
if kwh == 0:
continue
price = round_2(charge / kwh)
# 构造kwh
kwh_list.append({"name": cid_name, "value": kwh})
charge_list.append({"name": cid_name, "value": charge})
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import COMPANY_15MIN_POWER, SLOTS, SLOTS_15MIN
from unify_api.modules.elec_charge.components.elec_statistics_cps import \
SlotValue
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
power_charge_p_aggs
from unify_api.utils.common_utils import round_2
from unify_api.utils.es_query_body import EsQueryBody, es_process
from unify_api.utils.es_query_body import EsQueryBody, es_process, \
sql_time_process
from unify_api.utils.time_format import time_pick_transf
......@@ -65,6 +67,42 @@ async def proxy_today_yesterday_p(cid_list, start, end):
return sv
async def proxy_today_yesterday_p_new15(cid_list, start, end):
    """Proxy home page: load (p) curve for today/yesterday, one value
    per 15-minute slot, aligned to the standard slot grid."""
    sql = f"""
    select create_time,sum(p) as p from company_15min_power
    where cid in %s and create_time between %s and %s
    group by create_time
    order by create_time asc
    """
    # Slot grid ("HH:MM" labels) covering the requested range
    _interval, slots = time_pick_transf(start, end)
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql=sql, args=(cid_list, start, end))
    if not rows:
        return SlotValue(slots=slots, value=[])
    # Re-key rows by "HH:MM" so they line up with the slot labels
    by_slot = sql_time_process(rows, fmt="%H:%M", time_key='create_time')
    curve = SlotValue()
    curve.slots = slots
    values = []
    for slot in slots:
        if slot not in by_slot:
            values.append("")
            continue
        p_value = by_slot[slot].get("p")
        # 0 is valid data and stays 0.0; None/missing becomes ""
        values.append(0.0 if p_value == 0 else (p_value or ""))
    curve.value = values
    return curve
def by_slots(slots, es_re_dic):
# 拼接slot, value返回
kwh_list = []
......
......@@ -12,4 +12,4 @@ async def get_co2_price():
await aredis_utils.RedisUtils().set("co2_price", co2_price)
except:
co2_price = await aredis_utils.RedisUtils().get("co2_price")
return co2_price
return co2_price or 0
......@@ -20,8 +20,8 @@ from unify_api.modules.elec_charge.components.elec_charge_cps import \
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, get_kwh_charge, query_charge_aggs_points_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy, \
power_index_aggs_cid_proxy
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy_new15, \
power_index_cid_proxy_new15, power_overview_proxy15
from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service
from unify_api.utils.common_utils import round_2, round_4, NumListHelper
......@@ -151,7 +151,7 @@ async def post_price_policy(req, body: PricePolicyReq) -> PricePolicyResp:
quarters = price.get("quarters")
price_info_dic = price
break
if len(quarters) != 96:
log.error("quarters config fail")
return PricePolicyResp(price_info=[])
......@@ -191,12 +191,14 @@ async def aver_elec_price_new15(start, end, point_id, cid, date_type):
return this_ck, None
# 上周期电量电费
start_last, end_last = last_time_str(start, end, date_type)
last_datas = await get_kwh_charge(table_name, name, value, start_last, end_last)
last_datas = await get_kwh_charge(table_name, name, value, start_last,
end_last)
last_ck = ChargeKwh()
last_sum_charge = last_datas.get("sum_charge")
last_sum_kwh = last_datas.get("sum_kwh")
this_ck.charge = round(last_sum_charge, 2) if last_sum_charge else last_sum_charge
this_ck.kwh = round(last_sum_kwh, 2) if last_sum_kwh else last_sum_kwh
last_ck.charge = round(last_sum_charge,
2) if last_sum_charge else last_sum_charge
last_ck.kwh = round(last_sum_kwh, 2) if last_sum_kwh else last_sum_kwh
return this_ck, last_ck
......@@ -311,8 +313,8 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
end = now_date
# 获取上一周期开始结束时间
start_last, end_last = last_time_str(start, end, date_type)
power, charge = await power_overview_proxy(start, end, cid_list)
power_last, charge_last = await power_overview_proxy(start_last, end_last,
power, charge = await power_overview_proxy15(start, end, cid_list)
power_last, charge_last = await power_overview_proxy15(start_last, end_last,
cid_list)
if not all([power, charge, power_last, charge_last]):
return PopResp(power=Spvf(), charge=Spvf())
......@@ -363,30 +365,30 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
"month")
this_month_p, this_month_charge = await power_overview_proxy(
this_month_p, this_month_charge = await power_overview_proxy15(
month_start, month_end, cid_list)
last_month_p, last_month_charge = await power_overview_proxy(
last_month_p, last_month_charge = await power_overview_proxy15(
last_month_start, last_month_end, cid_list)
if not all([this_month_p, this_month_charge, last_month_p,
last_month_charge]):
return MtpResp()
this_month_total_power = total_value(this_month_p)
last_month_total_power = total_value(last_month_p)
month_power_rate = (this_month_total_power -
last_month_total_power) / last_month_total_power
# 2. 今日/昨日数据
last_day_start, last_day_end = last_time_str(today_start, today_end, "day")
this_day_p, this_day_charge = await power_overview_proxy(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy(
this_day_p, this_day_charge = await power_overview_proxy15(today_start,
today_end,
cid_list)
last_day_p, last_day_charge = await power_overview_proxy15(
last_day_start, last_day_end, cid_list)
if not all([this_day_p, this_day_charge, last_day_p, last_day_charge]):
return MtpResp()
this_day_total_power = total_value(this_day_p)
last_day_total_power = total_value(last_day_p)
day_power_rate = \
......@@ -410,7 +412,7 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
date_type = body.date_type
if date_type == "range":
date_type = "month"
# 如果end是今天, 则end=当前时间, 避免增长率错误
end_tmp = end.split(" ")[0]
now_date, timestamp = srv_time()
......@@ -424,9 +426,8 @@ async def post_power_sort_proxy(req, body: PopReq) -> PspResp:
end = now_date
# 2. 查询工厂电量电费信息
kwh_list, charge_list, price_list = await power_aggs_cid_proxy(start, end,
cid_list,
date_type)
kwh_list, charge_list, price_list = await power_aggs_cid_proxy_new15(
start, end, cid_list, date_type)
kwh_list_st = sorted(kwh_list, key=lambda i: i['value'], reverse=True)
charge_list_st = sorted(charge_list, key=lambda i: i['value'],
reverse=True)
......@@ -449,7 +450,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
return IpspResp()
today_start, today_end, month_start, month_end = today_month_date()
# 2. 获取今日数据
kwh_list_d, charge_list_d, price_list_d = await power_index_aggs_cid_proxy(
kwh_list_d, charge_list_d, price_list_d = await power_index_cid_proxy_new15(
today_start, today_end, cid_list, "day")
kwh_list_d_st = sorted(kwh_list_d, key=lambda i: i['value'],
reverse=True)[:5]
......@@ -458,7 +459,7 @@ async def post_index_power_sort_proxy(req) -> IpspResp:
price_list_d_st = sorted(price_list_d, key=lambda i: i['value'],
reverse=True)[:5]
# 2. 获取本月数据
kwh_list_m, charge_list_m, price_list_m = await power_index_aggs_cid_proxy(
kwh_list_m, charge_list_m, price_list_m = await power_index_cid_proxy_new15(
month_start, month_end, cid_list, "month")
kwh_list_m_st = sorted(kwh_list_m, key=lambda i: i['value'],
reverse=True)[:5]
......
......@@ -17,8 +17,9 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
power_charge_p_aggs, power_charge_p_cid_aggs, histogram_aggs_points, \
power_charge_p_point_aggs
from unify_api.modules.elec_charge.procedures.elec_statis_proxy_pds import \
proxy_today_yesterday_p, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, power_charge
proxy_today_yesterday_p_new15, by_slots
from unify_api.modules.elec_charge.common.utils import aver_price, \
power_charge, power_charge_new15
from unify_api.utils.common_utils import process_es_data
from unify_api.utils.es_query_body import es_process
from unify_api.utils.time_format import last_time_str, proxy_power_slots, \
......@@ -49,26 +50,26 @@ async def post_power_statis_proxy(req,
# 2. 如果是日统计,则需要增加今日/昨日负荷曲线, 15min一个点
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p(cid_list, start, end)
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p(cid_list,
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge(cid_list, point_id,
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, point_id,
last_start,
last_end,
date_type)
......@@ -78,7 +79,7 @@ async def post_power_statis_proxy(req,
last_aver_price=last_aver_price)
elif date_type == "year":
# 本月电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv,
......@@ -91,17 +92,17 @@ async def post_power_statis_proxy(req,
# 自定义选时范围,不需要最后时间的数据,解决bug
end = end_f.subtract(minutes=1).format("YYYY-MM-DD HH:mm:ss")
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end,
date_type)
# 负荷曲线
this_p = await proxy_today_yesterday_p(cid_list, start,
this_p = await proxy_today_yesterday_p_new15(cid_list, start,
end)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=this_p)
else:
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, point_id, start,
kwh_sv, charge_sv = await power_charge_new15(cid_list, point_id, start,
end, date_type)
# 平均电价
this_aver_price = aver_price(kwh_sv, charge_sv)
......@@ -359,26 +360,26 @@ async def post_power_statist_opt(req, body: PopReq) -> PcStatiResp:
date_type = "month"
if date_type == "day":
# 电量电费
kwh_sv, charge_sv = await power_charge(cid_list, -1, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
# 需要增加15min电量电费, 渠道版首页不需要下载,暂时去掉
# 今日/昨日负荷曲线
today_p = await proxy_today_yesterday_p(cid_list, start, end)
today_p = await proxy_today_yesterday_p_new15(cid_list, start, end)
ysd_start, ysd_end = last_time_str(start, end, "day")
yesterday_p = await proxy_today_yesterday_p(cid_list,
yesterday_p = await proxy_today_yesterday_p_new15(cid_list,
ysd_start,
ysd_end)
return PcStatiResp(kwh=kwh_sv, charge=charge_sv, today_p=today_p,
yesterday_p=yesterday_p)
elif date_type == "month":
# 本月电量电费, 平均电价
kwh_sv, charge_sv = await power_charge(cid_list, -1, start, end,
kwh_sv, charge_sv = await power_charge_new15(cid_list, -1, start, end,
date_type)
this_aver_price = aver_price(kwh_sv, charge_sv)
# 上月电量电费, 平均电价
last_start, last_end = last_time_str(start, end, "month")
# 需要增加15min电量电费
last_kwh_sv, last_charge_sv = await power_charge(cid_list, -1,
last_kwh_sv, last_charge_sv = await power_charge_new15(cid_list, -1,
last_start,
last_end,
date_type)
......
......@@ -31,7 +31,7 @@ async def post_max_p(req, body: MaxpReq) -> MaxpResp:
point_id = body.point_id
start = body.start
end = body.end
max_val, max_val_time = await pttl_max(cid, start, end, point_id=point_id)
max_val, max_val_time = await pttl_max_new15(cid, start, end, point_id=point_id)
return MaxpResp(maxp=max_val, date_time=max_val_time)
......
......@@ -3,7 +3,7 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
async def monitor_point_join_by_points(points):
"""monitor和point关联"""
sql = "SELECT m.mtid,p.ctnum,m.name, m.m_type, p.pid " \
sql = "SELECT m.mtid,p.ctnum,m.name, m.m_type, p.pid,p.cid " \
"FROM monitor m inner join point p on m.mtid = p.mtid " \
"WHERE p.pid in %s and m.demolished = 0 order by field(p.pid,{})".\
format(str(points).replace("[", "").replace("]", ""))
......@@ -48,3 +48,20 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format):
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid,))
return datas
async def get_elec_mtid_sid_by_cid(cid):
    """Fetch the (mtid, sid) pair of every monitor belonging to a company.

    :param cid: company id used to filter the ``monitor`` table
    :return: list of dict rows with keys ``mtid`` and ``sid``; an empty
        list when the company has no monitors
    """
    # Bind `cid` as a query parameter instead of f-string interpolation so
    # the statement is safe against SQL injection and benefits from proper
    # escaping by the driver.
    sql = "SELECT mtid, sid FROM monitor WHERE cid = %s"
    async with MysqlUtil() as conn:
        datas = await conn.fetchall(sql, args=(cid,))
    return datas if datas else []
......@@ -227,17 +227,20 @@ async def qual_current_data(point_mid):
return ret_dic
async def elec_current_data_new15(mtids, cid):
    """Return the latest TDengine electric record for each requested meter.

    Merge-conflict residue removed: the pre-merge signature/query lines had
    been left in place alongside the new ones (two consecutive ``def``
    lines), which is a syntax error. This is the intended post-merge body.

    :param mtids: iterable of meter ids (mtid) the caller cares about
    :param cid: company id; restricts the super-table scan to one company
    :return: dict mapping mtid -> latest-row dict (column name -> value)
    """
    res_map = {}
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    # last_row(*) grouped by child table (tbname) yields the newest record
    # of every meter of this company in a single query.
    sql = f"""
        select tbname,last_row(*) from electric_stb
        where cpyid={cid}
        group by tbname
    """
    is_succ, results = await get_td_engine_data(url, sql)
    if is_succ:
        # Column names come back wrapped as "last_row(xxx)"; unwrap them.
        head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i
                for i in results["head"]]
        for res in results["data"]:
            data = dict(zip(head, res))
            # Keep only the meters the caller actually asked for.
            if data["mtid"] in mtids:
                res_map[data["mtid"]] = data
    return res_map
......@@ -30,6 +30,7 @@ from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from unify_api.modules.electric.components.electric import (
ElecIndexResponse, ElecIndex, EscResp, QcsResp, EclResp, QclResp,
)
from unify_api.utils.taos_new import parse_td_columns
async def elec_current_storeys_service(storeys):
......@@ -38,11 +39,12 @@ async def elec_current_storeys_service(storeys):
point_list = await points_by_storeys(storeys)
# mtids
mtids = [i.get("mtid") for i in point_list]
cid = point_list[0]['cid'] if len(point_list) > 0 else 0
# 2.获取mid, ctnum
# point_mid = await batch_get_wiring_type(points)
# # 3. 获取redis数据
# res = await elec_current_data(point_mid)
res = await elec_current_data_new15(mtids)
res = await elec_current_data_new15(mtids, cid)
# 4. 返回数据
elec_data = {}
for info in point_list:
......@@ -128,7 +130,8 @@ async def qual_current_storeys_service(storeys):
# # 3. 获取redis数据
# res = await qual_current_data(point_mid)
mtids = [point["mtid"] for point in point_list if point["mtid"]]
res = await elec_current_data_new15(mtids)
cid = point_list[0]['cid'] if len(point_list) > 0 else 0
res = await elec_current_data_new15(mtids, cid)
# 4. 返回数据
qual_data = {}
for info in point_list:
......@@ -210,7 +213,8 @@ async def elec_card_level_service(point_list):
# # 3. 获取redis数据
# res_redis = await elec_current_data(point_mid)
mtids = [monitor["mtid"] for monitor in monitor_point_list if monitor["mtid"]]
results = await elec_current_data_new15(mtids)
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
results = await elec_current_data_new15(mtids, cid)
# 4. 返回数据
ret_data = {
"inline": [],
......@@ -303,7 +307,8 @@ async def qual_current_level_service(point_list):
# res_redis = await qual_current_data(point_mid)
mtids = [monitor["mtid"] for monitor in monitor_point_list if
monitor["mtid"]]
res = await elec_current_data_new15(mtids)
cid = monitor_point_list[0]['cid'] if len(monitor_point_list) > 0 else 0
res = await elec_current_data_new15(mtids, cid)
# 4. 返回数据
ret_data = {
"inline": [],
......@@ -797,7 +802,10 @@ async def elec_index_service_new15(cid, point_id, start, end):
min_value, min_time = "", ""
mean_item_name = f"{item}_mean"
avg_value = df[mean_item_name].mean()
avg_value = round(avg_value, 2) if avg_value else ""
if not pd.isna(avg_value):
avg_value = round(avg_value, 2) if avg_value else ""
else:
avg_value = ""
elec_index = ElecIndex(
stats_index=item_name,
max=max_value,
......@@ -830,7 +838,10 @@ async def elec_index_service_new15(cid, point_id, start, end):
min_value, min_time = "", ""
mean_item_name = f"{item}_mean"
avg_value = df[mean_item_name].mean()
avg_value = round(avg_value, 2) if avg_value else ""
if not pd.isna(avg_value):
avg_value = round(avg_value, 2) if avg_value else ""
else:
avg_value = ""
elec_index = ElecIndex(
stats_index=item_name,
max=max_value,
......@@ -874,12 +885,12 @@ async def elec_current_service_new15(point_id):
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric"
sql = f"select last(*) from mt{mtid}_ele where pid={point_id}"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
return '',{}
head = results["head"]
return '', {}
head = parse_td_columns(results)
if not results["data"]:
results["data"] = ['' for i in range(len(head))]
res = dict(zip(head, results["data"][0]))
......
......@@ -769,7 +769,7 @@ async def qual_history_service_new15(start, end, intervel, slots, pid):
for slot in slots:
if slot in datas.keys():
for stats_item in stats_items:
value = datas[slot].get(stats_item) or ""
value = datas[slot].get(stats_item, "")
if value and stats_item == "freq_dev_mean":
# 如果频率偏差保留两位小数之后为0了,那么直接返回0,防止出现-0.00 的情况
if abs(value) < 0.05:
......@@ -878,8 +878,8 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
message="没有该监测点的monitor信息,请联系运维人员!")
mtid = meter_info["mtid"]
# 获取子表中的实时数据
url = f"{SETTING.stb_url}db_electric"
sql = f"select last(*) from mt{mtid}_ele where pid={point_id}"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select last_row(*) from mt{mtid}_ele where pid={point_id}"
is_succ, results = await get_td_engine_data(url, sql)
log.info(f"is_succ:{is_succ}")
if is_succ:
......@@ -932,7 +932,7 @@ async def post_qual_current(req, body: PageRequest) -> QualCurrentResponse:
**{k: v for k, v in res.items() if k in ret_items},
)
else:
return BusinessException
raise BusinessException(message="数据查询失败!")
@summary("用电监测-实时监测-楼层")
......
......@@ -56,7 +56,7 @@ async def alarm_aggs_point_location(date_start, date_end, cid):
async def get_inline_by_cid(cid):
    """Fetch all inline (incoming power line) records of a company.

    :param cid: company id
    :return: list of dict rows with keys ``inlid`` and ``name``
    """
    # Merge residue removed: a superseded query filtering on the old
    # `cid_belongedto` column was left as a dead assignment above this one.
    sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
    async with MysqlUtil() as conn:
        inlines = await conn.fetchall(sql, args=(cid,))
    return inlines
......@@ -66,7 +66,7 @@ async def get_inline_by_cid(cid):
async def get_power_factor_kpi(inline_ids, month_str):
sql = "SELECT b.name, a.inlid, a.save_charge pf_cost, a.kpi_x, " \
"a.save_charge FROM algo_power_factor_result a LEFT JOIN inline b " \
"on a.inlid = b.inlid WHERE a.inlid in %s and a.res_time=%s"
"on a.inlid = b.inlid WHERE a.inlid in %s and a.`month`=%s"
async with MysqlUtil() as conn:
power_factor_results = await conn.fetchall(sql, args=(
inline_ids, month_str))
......@@ -76,7 +76,7 @@ async def get_power_factor_kpi(inline_ids, month_str):
# 移峰填谷
async def get_pcvf_kpi(inline_ids, last_month_str):
sql = "select b.name, a.score, a.cost_save " \
"from algorithm_plsi_result a left join inline b " \
"from algo_plsi_result a left join inline b " \
"on a.inlid = b.inlid where a.inlid in %s and a.month = %s"
async with MysqlUtil() as conn:
pcvfs = await conn.fetchall(sql, args=(inline_ids, last_month_str))
......@@ -86,7 +86,7 @@ async def get_pcvf_kpi(inline_ids, last_month_str):
# 经济运行
async def get_economic_kpi(inline_ids, last_month_str):
sql = "select b.name, a.kpi_x, a.save_charge, a.mean_load_factor " \
"from algorithm_economic_operation_result a " \
"from algo_economic_operation_result a " \
"left join inline b on a.inlid = b.inlid " \
"where a.inlid in %s and a.month = %s"
async with MysqlUtil() as conn:
......@@ -98,11 +98,11 @@ async def get_economic_kpi(inline_ids, last_month_str):
async def get_md_space(inline_ids, last_month_str):
sql = (
"select a.inline_md_charge, a.kpi_x, a.save_charge, "
"a.inline_md_predict, b.related_inlids,b.tc_runtime "
"from algorithm_md_space_analysis_result a "
"inner join algorithm_md_space_analysis_unit b "
"a.inline_md_predict, b.inlid,b.tc_runtime "
"from algo_md_space_analysis_result a "
"inner join algo_md_space_analysis_unit b "
"on a.space_analysis_id=b.id "
"where b.related_inlids in %s and a.month = %s and valid=1;"
"where b.inlid in %s and a.month = %s and valid=1;"
)
async with MysqlUtil() as conn:
md_spaces = await conn.fetchall(sql, args=(inline_ids, last_month_str))
......
......@@ -13,8 +13,10 @@ from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import Importance, EVENT_TYPE_MAP, SDU_ALARM_LIST
from unify_api.modules.alarm_manager.dao.list_static_dao import \
sdu_alarm_importance_dao, alarm_aggs_importance
from unify_api.modules.common.dao.common_dao import monitor_point_join
sdu_alarm_importance_dao, alarm_aggs_importance, \
sdu_alarm_importance_dao_new15
from unify_api.modules.common.dao.common_dao import monitor_point_join, \
monitor_by_cid
from unify_api.modules.common.procedures.common_utils import get_electric_index
from unify_api.modules.common.procedures.points import get_points, \
proxy_points, get_points_num
......@@ -33,9 +35,63 @@ from unify_api.modules.home_page.dao.count_info_dao import (
from unify_api.modules.electric_optimization.dao.power_index import (
price_policy_by_cid
)
from unify_api.utils.taos_new import parse_td_columns
async def other_info(company_id):
    """Home-page alarm statistics for a company.

    :param company_id: company id (cid)
    :return: tuple ``(today_alarm_count, safe_run_days, alarm_count)`` —
        today's alarm count, cumulative safe-run days since installation,
        and the all-time alarm count
    """
    alarm_sql = f"""
        SELECT
            DATE(pevent.event_datetime) event_date,
            COUNT(*) event_count
        FROM
            point_1min_event pevent
        WHERE
            cid = %s
        GROUP BY
            DATE(pevent.event_datetime)
    """
    now_time = datetime.now()
    async with MysqlUtil() as conn:
        # Installation time of the company (stored as a unix timestamp).
        company_sql = "select create_time from company where cid = %s"
        # BUG FIX: the query GROUPs BY day and the loop below iterates the
        # result set row by row, so fetchall is required; fetchone returned
        # a single dict and iterating it yielded column names, breaking
        # data["event_date"].
        alarm_data = await conn.fetchall(alarm_sql, (company_id,))
        company = await conn.fetchone(company_sql, (company_id,))
    create_time_timestamp = company["create_time"]
    create_time = datetime.fromtimestamp(create_time_timestamp)
    today_alarm_count = 0
    alarm_count = 0
    if not alarm_data:
        log.warn("No alarm data %s" % (company_id))
        # Newly installed factory with no events yet:
        # safe run days = current time - installation time + 1
        safe_run_days = (now_time - create_time).days + 1
        return today_alarm_count, safe_run_days, alarm_count
    # +2 guards against safe_run_days coming out one short when every
    # single day since installation had alarms.
    total_days = (now_time - create_time).days + 2
    has_alarm_days = 0
    for data in alarm_data:
        if data["event_date"].strftime("%Y-%m-%d") == str(now_time)[:10]:
            today_alarm_count += data["event_count"]
        if data["event_count"] != 0:
            # A day that produced alarms does not count as a safe-run day.
            has_alarm_days += 1
        alarm_count += data["event_count"]
    safe_run_days = total_days - has_alarm_days
    log.info(
        f"today_alarm_count={today_alarm_count} safe_run_days={safe_run_days}")
    return today_alarm_count, safe_run_days, alarm_count
async def other_info_old(company_id):
"""
今日报警数和累计安全运行天数,报警数
:param company_id:
......@@ -548,14 +604,16 @@ async def current_load_new15(cid):
from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
url = f"{SETTING.stb_url}db_electric"
sql = f"select last_row(*) from electric_stb where cpyid ={cid} " \
f"group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
select last_row(*) from electric_stb
where cpyid={cid}
group by tbname
"""
is_succ, results = await get_td_engine_data(url, sql)
now_tt = int(time.time())
if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i
for i in results["head"]]
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
......@@ -565,10 +623,8 @@ async def current_load_new15(cid):
if item:
mdptime_tt = None
if "mdptime" in item:
mdptime = datetime.strptime(item["mdptime"],
"%Y-%m-%d %H:%M:%S.%f")
mdptime_tt = time.mktime(mdptime.timetuple())
item_tt = item.get("timestamp") or mdptime_tt
mdptime_dt = pendulum.parse(item["mdptime"])
item_tt = item.get("timestamp") or mdptime_dt.int_timestamp
if item_tt:
# 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60:
......@@ -713,7 +769,7 @@ async def company_power_use_info_new15(company_id, start, end):
async def get_company_charge_price(company_id, es_time_start, es_time_end):
power_use_info = await company_power_use_info(company_id, es_time_start,
power_use_info = await company_power_use_info_new15(company_id, es_time_start,
es_time_end)
if power_use_info["kwh"]:
unit_price = power_use_info["charge"] / power_use_info["kwh"]
......@@ -1394,6 +1450,7 @@ async def electric_use_info_points_sdu_new15(start, end, points):
electric_use_score=electric_use_score,
)
async def optimization_count_info_new(company_id: int):
"""
首页用电经济指数和用电优化模块统计数据
......@@ -1413,7 +1470,7 @@ async def optimization_count_info_new(company_id: int):
)
es_end_time = pendulum.datetime(now.year, now.month, 1).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
power_use_info = await company_power_use_info(company_id, es_start_time,
power_use_info = await company_power_use_info_new15(company_id, es_start_time,
es_end_time)
month_charge = power_use_info["charge"]
count_info_map = {
......@@ -1600,7 +1657,7 @@ async def optimization_count_info_new(company_id: int):
price_md = price_policy["price_md"] if price_policy["price_md"] else 0
price_tc = price_policy["price_tc"] if price_policy["price_tc"] else 0
# 最大需量
md_spaces = await get_md_space(inline_ids, last_month_str)
md_spaces = await get_md_space(inline_ids, last_month_dt)
md_space_kpi_x_list = [i["kpi_x"] for i in md_spaces if
type(i["kpi_x"]) in [int, float]]
......@@ -1677,7 +1734,7 @@ async def cid_alarm_importance_count(cid, start, end):
"""计算工厂报警数, 按报警等级"""
monitor_point_list = await monitor_point_join(cid)
point_list = [i["pid"] for i in monitor_point_list]
es_res = await sdu_alarm_importance_dao(start, end, point_list)
es_res = await sdu_alarm_importance_dao_new15(start, end, point_list)
es_res_key = {i["key"]: i for i in es_res}
res_list = []
......@@ -1711,12 +1768,12 @@ async def alarm_importance_count_total(cid, start, end):
es_res = await alarm_aggs_importance(cid, start, end)
first_cnt, second_cnt, third_cnt = 0, 0, 0
for buckets in es_res:
if buckets["key"] == Importance.First.value:
first_cnt += buckets["doc_count"]
elif buckets["key"] == Importance.Second.value:
second_cnt += buckets["doc_count"]
elif buckets["key"] == Importance.Third.value:
third_cnt += buckets["doc_count"]
if buckets["importance"] == Importance.First.value:
first_cnt += buckets["alarm_count"]
elif buckets["importance"] == Importance.Second.value:
second_cnt += buckets["alarm_count"]
elif buckets["importance"] == Importance.Third.value:
third_cnt += buckets["alarm_count"]
return {
"first_cnt": first_cnt,
"second_cnt": second_cnt,
......
......@@ -23,6 +23,7 @@ from unify_api.modules.common.procedures.points import get_points
from unify_api.modules.home_page.procedures.count_info_pds import \
datetime_to_timestamp
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.time_format import last30_day_range_today
async def proxy_alarm_score(cids):
......@@ -129,6 +130,80 @@ async def security_level_count(cids):
async def alarm_percentage_count(cids):
    """Count last-30-day alarms for a set of companies, broken down by
    importance level, event type and time-of-day distribution.

    :param cids: sequence of company ids (bound to an ``IN %s`` filter)
    :return: dict with first/second/third importance counts, temperature /
        residual-current / electric-param counts and day/night/morning
        alarm counts
    """
    start, end = last30_day_range_today()
    # BUG FIX: `cids` is a sequence (from get_cids), so the filter must be
    # `cid in %s`; pymysql escapes a list as `(a,b,c)`, which made the old
    # `cid = %s` render invalid SQL for more than one company.
    importance_sql = f"""
        SELECT
            COUNT(*) doc_count,
            importance
        FROM
            point_1min_event pevent
        WHERE
            cid in %s
            AND pevent.event_datetime >= '{start}'
            AND pevent.event_datetime <= '{end}'
        GROUP BY
            pevent.importance
    """
    event_type_sql = f"""
        SELECT
            COUNT(*) doc_count,
            event_type
        FROM
            point_1min_event pevent
        WHERE
            cid in %s
            AND pevent.event_datetime >= '{start}'
            AND pevent.event_datetime <= '{end}'
        GROUP BY
            pevent.event_type;
    """
    async with MysqlUtil() as conn:
        event_type_data = await conn.fetchall(event_type_sql, args=(cids,))
        importance_data = await conn.fetchall(importance_sql, args=(cids,))
    # Tally alarms per importance level.
    first_alarm_cnt, second_alarm_cnt, third_alarm_cnt = 0, 0, 0
    for bucket in importance_data:
        if bucket["importance"] == Importance.First.value:
            first_alarm_cnt += bucket["doc_count"]
        elif bucket["importance"] == Importance.Second.value:
            second_alarm_cnt += bucket["doc_count"]
        elif bucket["importance"] == Importance.Third.value:
            third_alarm_cnt += bucket["doc_count"]
    # Bucket event types into temperature / residual current / electric.
    temperature_cnt, residual_current_cnt, electric_param_cnt = 0, 0, 0
    for bucket in event_type_data:
        if bucket["event_type"] in [
            "overTemp",
            "overTempRange1min",
            "overTempRange15min",
            "overTempTrendDaily",
            "overTempTrendQuarterly",
        ]:
            temperature_cnt += bucket["doc_count"]
        elif bucket["event_type"] in [
            "overResidualCurrent",
        ]:
            residual_current_cnt += bucket["doc_count"]
        else:
            electric_param_cnt += bucket["doc_count"]
    time_distribution_map = await alarm_time_distribution(cids, start,
                                                          end)
    alarm_percentage_map = {
        "first_alarm_cnt": first_alarm_cnt,
        "second_alarm_cnt": second_alarm_cnt,
        "third_alarm_cnt": third_alarm_cnt,
        "temperature_cnt": temperature_cnt,
        "residual_current_cnt": residual_current_cnt,
        "electric_param_cnt": electric_param_cnt,
        "day_alarm_cnt": time_distribution_map["day_alarm_cnt"],
        "night_alarm_cnt": time_distribution_map["night_alarm_cnt"],
        "morning_alarm_cnt": time_distribution_map["morning_alarm_cnt"],
    }
    return alarm_percentage_map
async def alarm_percentage_count_old(cids):
now = datetime.now()
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
......@@ -511,26 +586,26 @@ async def alarm_safe_power(cid, start, end):
for bucket in es_res:
# 温度
if bucket["key"] in ("overTemp", "overTempRange1min",
if bucket["event_type"] in ("overTemp", "overTempRange1min",
"overTempRange15min"):
temperature_cnt += bucket["doc_count"]
# 漏电流
elif bucket["key"] in ("overResidualCurrent",):
elif bucket["event_type"] in ("overResidualCurrent",):
residual_current_cnt += bucket["doc_count"]
# 负载率
elif bucket["key"] in ("overPR",):
elif bucket["event_type"] in ("overPR",):
lr_cnt += bucket["doc_count"]
# 功率因数
elif bucket["key"] in ("underPhasePF", "underTotalPF"):
elif bucket["event_type"] in ("underPhasePF", "underTotalPF"):
power_factor_cnt += bucket["doc_count"]
# 欠压
elif bucket["key"] in ("underU",):
elif bucket["event_type"] in ("underU",):
under_u_cnt += bucket["doc_count"]
# 过压
elif bucket["key"] in ("overU",):
elif bucket["event_type"] in ("overU",):
over_u_cnt += bucket["doc_count"]
# 过流
elif bucket["key"] in ("overI",):
elif bucket["event_type"] in ("overI",):
over_i_cnt += bucket["doc_count"]
alarm_map = {
......
......@@ -13,7 +13,7 @@ from unify_api.modules.common.procedures.points import proxy_points, list_point
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, point_aggs_kwh
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
proxy_power
proxy_power, proxy_power15
from unify_api.modules.home_page.components.count_info_proxy_cps import \
IycResp, IycmResp, RtrResp, AlarmLevelCnt, AlarmContentCnt, CmResp, \
ApcResp, AsiResp, HsiResp, AiiResp
......@@ -23,7 +23,8 @@ from unify_api.modules.home_page.procedures.count_info_pds import other_info, \
electric_use_info, cid_alarm_importance_count, \
alarm_importance_count_total, power_factor, current_load, \
get_company_charge_price, health_status_res, carbon_status_res_web, \
optimization_count_info, economic_index_desc
optimization_count_info, economic_index_desc, electric_use_info_new15, \
power_factor_new15, current_load_new15
from unify_api.modules.home_page.procedures.count_info_proxy_pds import \
alarm_percentage_count, alarm_safe_power
from unify_api.modules.tsp_water.dao.drop_dust_dao import \
......@@ -50,7 +51,7 @@ async def post_zd_info_factory_service(cid_list):
safe_operation_days += safe_run_days
total_alarm += alarm_count
# 4. 累计监测用电
total_power = await proxy_power(cid_list)
total_power = await proxy_power15(cid_list)
# 5. 日均碳排放 = 累计用电 * 0.754 / 总运行天数
# 运行天数
sql = "select create_time from company where cid = %s"
......@@ -290,9 +291,9 @@ async def alarm_price_costtl_service(cid):
# 1. 今日报警
imp_dic = await alarm_importance_count_total(cid, today_start, today_end)
# 2. 实时功率因数, 上月功率因数
cos_ttl, last_month_cos = await power_factor(cid)
cos_ttl, last_month_cos = await power_factor_new15(cid)
# 3. 实时负荷
cur_load = await current_load(cid)
cur_load = await current_load_new15(cid)
# 4. 平均电价
# 昨天
yesterday_start, yesterday_end = yesterday_range()
......@@ -421,7 +422,7 @@ async def all_index_info_service(cid):
health_index = round(health_index)
health_status = health_status_res(health_index, "web")
# 2. 安全指数
elec_info = await electric_use_info(cid)
elec_info = await electric_use_info_new15(cid)
safety_index = elec_info.electric_use_score
safety_status = safety_ratio_res(safety_index, "web")
# 3. 碳排指数
......
import json
import time
import re
import pendulum
from unify_api.modules.electric.dao.electric_dao import \
get_elec_mtid_sid_by_cid
from unify_api.modules.common.dao.common_dao import monitor_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.logger import log
from pot_libs.settings import SETTING
......@@ -23,6 +29,8 @@ from unify_api.modules.zhiwei_u.dao.warning_operations_dao import\
select_point_dao
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
from unify_api.utils.taos_new import parse_td_columns, td3_tbl_compate, \
get_td_table_name
async def health_ctl_rate_service(cid):
......@@ -186,63 +194,86 @@ async def health_ctl_rate_service_new15(cid):
now_ts = int(time.time())
real_tt = now_ts
total = 0
url = f"{SETTING.stb_url}db_electric"
sql = f"select last_row(*) from electric_stb where cpyid ={cid} " \
f"group by tbname"
datas = await get_elec_mtid_sid_by_cid(cid)
td_mt_tables = tuple(
(get_td_table_name("electric", data["mtid"]) for data in datas if
data["mtid"]))
td_mt_tables = td3_tbl_compate(td_mt_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_mt_tables} group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
is_succ, results = await get_td_engine_data(url, sql)
time_str = time_format.get_datetime_str(real_tt)
if is_succ:
head = [re.findall(r'last_row\((.*)\)', i)[0] if "(" in i else i
for i in results["head"]]
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
for data in datas:
real_tt = int(time.mktime(time.strptime(data["ts"],
"%Y-%m-%d %H:%M:%S.%f")))
if now_ts - real_tt > REAL_EXP_TIME:
continue
total += 1
ctnum = data.get("ctnum")
# 电压偏差
v_dev = data.get("ua_dev") if ctnum == 3 else data.get("uab_dev")
grade = get_dev_grade(dev_type="v", cur=v_dev)
if grade and grade >= 60:
stats["v_dev"] += 1
# 频率偏差
freq_dev = data.get("freq_dev")
grade = get_dev_grade(dev_type="freq", cur=freq_dev)
if grade and grade >= 60:
stats["freq_dev"] += 1
# 三相电压不平衡度
ubl = data.get("ubl")
grade = get_dev_grade(dev_type="ubl", cur=ubl)
if grade and grade >= 60:
stats["ubl"] += 1
# 功率因数
costtl = data.get("costtl")
grade = get_dev_grade(dev_type="costtl", cur=costtl)
if grade and grade >= 60:
stats["costtl"] += 1
# (电压)谐波畸变率
thdu = data.get("thdua") if ctnum == 3 else data.get("thduab")
grade = get_dev_grade(dev_type="thdu", cur=thdu)
if grade and grade >= 60:
stats["thdu"] += 1
# 负载率
lf = data.get("lf")
if lf is None:
stats["lf"] += 1
else:
grade = get_dev_grade(dev_type="lf", cur=lf)
if grade and grade >= 60:
stats["lf"] += 1
else:
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
ubl=1,
)
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
(f"s{data['sid'].lower()}_e" for data in datas if data["sid"]))
td_s_tables = td3_tbl_compate(td_s_tables)
sql = f"select last_row(*) from electric_stb " \
f"where TBNAME IN {td_s_tables} group by tbname"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
log.warn(f"cid={cid} 无任何有效mid")
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1,
freq_dev=1,
ubl=1,
)
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
for data in datas:
real_tt = pendulum.parse(data["ts"]).int_timestamp
if now_ts - real_tt > REAL_EXP_TIME:
continue
total += 1
ctnum = data.get("ctnum")
# 电压偏差
v_dev = data.get("ua_dev") if ctnum == 3 else data.get("uab_dev")
grade = get_dev_grade(dev_type="v", cur=v_dev)
if grade and grade >= 60:
stats["v_dev"] += 1
# 频率偏差
freq_dev = data.get("freq_dev")
grade = get_dev_grade(dev_type="freq", cur=freq_dev)
if grade and grade >= 60:
stats["freq_dev"] += 1
# 三相电压不平衡度
ubl = data.get("ubl")
grade = get_dev_grade(dev_type="ubl", cur=ubl)
if grade and grade >= 60:
stats["ubl"] += 1
# 功率因数
costtl = data.get("costtl")
grade = get_dev_grade(dev_type="costtl", cur=costtl)
if grade and grade >= 60:
stats["costtl"] += 1
# (电压)谐波畸变率
thdu = data.get("thdua") if ctnum == 3 else data.get("thduab")
grade = get_dev_grade(dev_type="thdu", cur=thdu)
if grade and grade >= 60:
stats["thdu"] += 1
# 负载率
lf = data.get("lf")
if lf is None:
stats["lf"] += 1
else:
grade = get_dev_grade(dev_type="lf", cur=lf)
if grade and grade >= 60:
stats["lf"] += 1
if total == 0:
return HealthCtlRateRes(
real_time=time_str, lf=1, costtl=1, thdu=1, v_dev=1, freq_dev=1,
......@@ -257,5 +288,3 @@ async def health_ctl_rate_service_new15(cid):
freq_dev=round_2(stats["freq_dev"] / total),
ubl=round_2(stats["ubl"] / total),
)
......@@ -13,7 +13,7 @@ from unify_api.modules.common.procedures.points import proxy_points, get_points
from unify_api.modules.common.procedures.power_cps import power_use_count, \
power_use_count_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
proxy_power
proxy_power, proxy_power15
from unify_api.modules.home_page.components.count_info_proxy_cps import (
CountInfoProxyResp,
ProxySecurityLevelCntResp,
......@@ -43,7 +43,7 @@ from unify_api.modules.home_page.procedures.count_info_proxy_pds import (
total_run_day_proxy,
)
from unify_api.modules.home_page.procedures.security_info_pds import \
alarm_count_info
alarm_count_info_new15
from unify_api.modules.home_page.service.count_info_service import \
safe_run_sdu, safe_run_sdu_new15
from unify_api.modules.elec_charge.components.elec_charge_cps import \
......@@ -225,7 +225,7 @@ async def post_reg_alarm_distribution(request,
if product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id
cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info(cids, start, end, date_type)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type)
type_alarm_cnt_map = alarm_info_map["type_alarm_cnt_map"]
return AlarmDistributionResp(
alarm_categories=RegAlarmCnt(
......@@ -250,7 +250,7 @@ async def post_reg_alarm_rank(request,
if product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id
cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info(cids, start, end, date_type)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type)
cid_alarm_cnt_map = alarm_info_map["cid_alarm_cnt_map"]
cid_info_map = await get_cid_info(all=True)
......@@ -277,7 +277,7 @@ async def post_zhidian_info_proxy(req, body: ProductProxyReq) -> AipResp:
# 2. 监测点位
total_monitor = await proxy_points(cid_list)
# 3. 累计监测用电
total_power = await proxy_power(cid_list)
total_power = await proxy_power15(cid_list)
# 4. 用户接入总时长, 每个工厂接入时长总和
total_run_day = await total_run_day_proxy(cid_list)
return AipResp(
......
......@@ -19,9 +19,8 @@ from unify_api.modules.home_page.components.security_info_cps import (
AlarmSummaryResp,
)
from unify_api.modules.home_page.procedures.security_info_pds import (
alarm_count_info,
alarm_content_time_distribution,
alarm_summary,
alarm_summary, alarm_count_info_new15,
)
......@@ -43,7 +42,7 @@ async def post_security_index(request, body: SecurityCountReq) -> SecurityCountR
elif product == Product.RecognitionElectric.value:
user_id = request.ctx.user_id
cids = await get_cids(user_id, product)
alarm_info_map = await alarm_count_info(cids, start, end, date_type)
alarm_info_map = await alarm_count_info_new15(cids, start, end, date_type)
first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"],
......@@ -84,7 +83,7 @@ async def post_alarm_level_distribution(request, body: SecurityCommonReq) -> Sec
else:
raise BusinessException(message=f"暂时不支持其他产品")
alarm_info_map = await alarm_count_info(req_cids, start, end, date_type)
alarm_info_map = await alarm_count_info_new15(req_cids, start, end, date_type)
first_alarm, second_alarm, third_alarm = (
alarm_info_map["first_alarm"],
alarm_info_map["second_alarm"],
......
......@@ -202,7 +202,7 @@ async def get_p_list_new15(mtid, meter_sn, start, end, time_slots):
'''
sn = meter_sn.lower()
p_field = f"p{sn}"
url = f"{SETTING.stb_url}db_electric"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"select max({p_field}) {p_field} from mt{mtid}_ele " \
f"where meter_sn='{str.upper(sn)}' " \
f"and ts >= '{start}' and ts <= '{end}' INTERVAL(1m)"
......
......@@ -9,8 +9,8 @@ from unify_api.modules.shidianu.components.open_data_cps import (
)
from unify_api.modules.alarm_manager.components.list_alarm import \
ListAlarmResponse, Alarm
from unify_api.modules.common.dao.common_dao import meter_by_mids,\
monitor_point_storey_join_in
from unify_api.modules.common.dao.common_dao import meter_by_mids, \
monitor_point_storey_join_in, points_monitor_by_cid
from unify_api.modules.common.procedures.points import point_to_mid
from unify_api.modules.shidianu.dao.open_data_dao import \
get_user_product_auth, result_longgang_by_cid, monitor_point_company
......@@ -87,9 +87,9 @@ async def basic_info_longgang_service(user_id, page_size, page_num):
async def stb_data_longgang_service(user_id, type):
db_name = {
"soe": "db_soe",
"electric": "db_electric",
"appliance": "db_appliance"
"soe": {"super": "db_soe", "suffix": "soe"},
"electric": {"super": "db_electric", "suffix": "ele"},
"appliance": {"super": "db_appliance", "suffix": "app"}
}
if type not in db_name.keys():
return success_res(code=4002, msg="type错误")
......@@ -105,10 +105,12 @@ async def stb_data_longgang_service(user_id, type):
return success_res(code=4001, msg="您没有权限访问")
token = get_token()
stb_url = f"{SETTING.stb_url}{db_name[type]}"
table_name = f"{type}_stb"
sql = f"select last_row(*) from {table_name} " \
f"where cpyid in {tuple(cids)} group by tbname"
stb_url = f"{SETTING.stb_url}{db_name[type]['super']}"
sql = f"""
select last_row(*) from electric_stb
where cpyid = {cids[0]}
group by tbname
"""
resp_str, status = await AioHttpUtils().post_data(
stb_url, data=sql, timeout=50,
headers={"Authorization": f"Basic {token}"}
......
......@@ -24,7 +24,11 @@ async def auth_phone_verify(phone, verify):
verify_server = await RedisUtils().get(f"sms:sms_{phone}")
log.info(f"auth_phone_verify phone:{phone}, verify_server:{verify_server},"
f"verify:{verify}")
if not verify_server or verify != verify_server.decode():
if not verify_server:
return False
if isinstance(verify_server, str) and verify != verify_server:
return False
if isinstance(verify_server, bytes) and verify != verify_server.decode():
return False
return True
......
......@@ -20,7 +20,8 @@ async def wechat_login(args, host):
"user_info": args.get("user_info"),
"client_name": args.get("client_name"),
"host": host,
"product": args.get("product")
"product": args.get("product"),
"db": SETTING.mysql_db
}
try:
......@@ -45,7 +46,8 @@ async def app_login(args, host):
"client_name": args.get("client_name"),
"unionid": args.get("unionid"),
"host": host,
"product": args.get("product")
"product": args.get("product"),
"db": SETTING.mysql_db
}
if not args.get("unionid"):
return 401, {"code": 40001, "data": None, "message": "unionid is None"}
......@@ -72,6 +74,7 @@ async def web_login(args, host):
"code": args['code'],
"client_name": args.get("client_name"),
"host": host,
"db": SETTING.mysql_db
}
try:
# auth_url = "http://0.0.0.0:9000/unify-api/auth"
......@@ -110,7 +113,8 @@ async def third_login(args, host):
"password": password,
"client_name": args.get("client_name"),
"host": host,
"product": args.get("product")
"product": args.get("product"),
"db": SETTING.mysql_db
}
try:
log.info(
......@@ -148,6 +152,7 @@ async def web_third_login(args, host):
"password": password,
"client_name": args.get("client_name"),
"host": host,
"db": SETTING.mysql_db
}
try:
log.info(
......@@ -197,6 +202,7 @@ async def validation_login(args, host):
"user_id": user["user_id"],
"client_name": client_name,
"host": host,
"db": SETTING.mysql_db
}
try:
log.info(SETTING.auth_url, f"request auth_url={SETTING.auth_url} "
......
......@@ -50,7 +50,12 @@ async def load_product_auth(user_id, product_id):
cid_sql = base_sql
args = (pro_id,)
else:
cid_sql = base_sql + " and proxy = %s"
proxy_id = proxy_id.split(',')
cid_sql = f"""
select c.cid from company c
left join company_proxy_map cpm on c.cid =cpm.cid
where product = %s and is_show = 1 and proxy in %s
"""
args = (pro_id, proxy_id)
async with mysql_util.MysqlUtil() as conn:
cids = await conn.fetchall(cid_sql, args=args)
......
......@@ -28,7 +28,7 @@ class AuthView(HTTPMethodView):
resp_str, status_code = await AioHttpUtils().post(
SETTING.auth_url,
{"user_name": "balabala", "password": "balabala",
"client_name": client_name},
"client_name": client_name, "db": SETTING.mysql_db},
timeout=50,
)
print(f"resp_str = {resp_str} status={status_code}")
......
......@@ -152,7 +152,8 @@ async def post_save_userinfo(request, body: SaveUserReq):
"phone": phone,
"client_name": "validation",
"host": request.host,
"user_id": user["user_id"]
"user_id": user["user_id"],
"db": SETTING.mysql_db
}
resp_str, status = await AioHttpUtils().post(
SETTING.auth_url,
......
......@@ -54,8 +54,13 @@ async def get_user_logout(request) -> Success:
# await jwt_utils.store_token_blacklist(token)
try:
log.info(f"request logout_url={SETTING.logout_url}")
resp_str, status = await AioHttpUtils().get(
request_body = {
"db": SETTING.mysql_db
}
resp_str, status = await AioHttpUtils().post(
SETTING.logout_url,
request_body,
timeout=50,
headers={"Authorization": f"Bearer {token}"},
)
log.info(f"request auth_url resp_str={resp_str} status={status}")
......
......@@ -21,7 +21,11 @@ class RefreshView(HTTPMethodView):
status=200)
resp, status_code = await AioHttpUtils().post(
SETTING.refresh_token_url,
{"refresh_token": refresh_token, "client_name": client_name},
{
"refresh_token": refresh_token,
"client_name": client_name,
"db": SETTING.mysql_db
},
timeout=50,
headers={"Authorization": f"Bearer {token}"}
)
......
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import sys
sys.path.append(f'/home/ubuntu/data/code/unify_api_1.5/unify_api/modules/'
sys.path.append(f'/home/ubuntu/data/code/unify_api2/unify_api/modules/'
f'zhiwei_u/fault_foreast')
from filterSteadyData import filterSteadyData, testFeat
from knnForecast import classify
......
......@@ -114,7 +114,7 @@ if __name__ == '__main__':
import matplotlib.pyplot as plt
# import matplotlib;matplotlib.use("TkAgg")
os.chdir(os.path.split(__file__)[0])
filename1 = r'/home/ubuntu/data/code/unify_api_1.5/unify_api/modules/zhiwei_u/fault_foreast'
filename1 = r'/home/ubuntu/data/code/unify_api2/unify_api/modules/zhiwei_u/fault_foreast'
# sid = "A1911000284"
# files=os.listdir(filename1)
# for file in files:
......
......@@ -5,8 +5,8 @@ import pendulum
import asyncio
import aioschedule as schedule
sys.path.append("/home/ubuntu/data/code/unify_api_1.5/pot_libs")
sys.path.append("/home/ubuntu/data/code/unify_api_1.5")
sys.path.append("/home/ubuntu/data/code/unify_api2/pot_libs")
sys.path.append("/home/ubuntu/data/code/unify_api2")
from pot_libs.logger import log
from pot_libs.aiokafka_util.kafka_util import KafkaUtils
from unify_api.modules.zhiwei_u.dao.install_sheet_dao import \
......
import sys
import json
import pytest
sys.path.append(f'/home/ubuntu/data/code/unify_api_1.5/pot_libs')
sys.path.append(f'/home/ubuntu/data/code/unify_api_1.5')
sys.path.append(f'/home/ubuntu/data/code/unify_api2/pot_libs')
sys.path.append(f'/home/ubuntu/data/code/unify_api2')
from unify_api.tests.constants_t import TOKEN, HTTP_PREFIX
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
......
......@@ -8,8 +8,8 @@ import pendulum
import logging
import sys
sys.path.append("/home/ubuntu/data/code/unify_api_1.5/pot_libs")
sys.path.append("/home/ubuntu/data/code/unify_api_1.5")
sys.path.append("/home/ubuntu/data/code/unify_api2/pot_libs")
sys.path.append("/home/ubuntu/data/code/unify_api2")
from pot_libs.aiomqtt_util.hbmqtt_utils import MqttUtil
logging.basicConfig(
level=logging.INFO,
......
import json
import re
from pot_libs.logger import log
from pot_libs.aiohttp_util.aiohttp_utils import AioHttpUtils
# from bromake.modules.shidianu.service.open_data_service import get_token
from pot_libs.settings import SETTING
import base64
from pot_libs.mysql_util.mysql_util import MysqlUtil
async def get_td_engine_data(url, sql):
    """Execute *sql* against a TDengine REST endpoint.

    Posts the raw SQL text to *url* with HTTP basic auth and returns a
    ``(success, payload)`` pair: ``(True, parsed_json)`` when the service
    answers 200, ``(False, None)`` for any other status.
    """
    auth_token = get_token()
    log.info(f"token:{auth_token},sql:{sql}")
    resp_str, status = await AioHttpUtils().post_data(
        url,
        data=sql,
        timeout=50,
        headers={"Authorization": f"Basic {auth_token}"},
    )
    log.info(f"resp_str:{resp_str},status:{status}")
    if status != 200:
        return False, None
    return True, json.loads(resp_str)
def get_token():
    """Build the HTTP basic-auth token for the TDengine REST service.

    Encodes ``"user:password"`` (from SETTING credentials) as base64 and
    returns it as a str, ready for an ``Authorization: Basic ...`` header.
    """
    credentials = f"{SETTING.td_user}:{SETTING.td_pwd}".encode()
    return base64.b64encode(credentials).decode()
async def insert_into_tidb(schema, tables, value):
    """Insert *value* into ``{schema}.{tables}`` and return the fetch result
    (an empty dict when the driver returns nothing falsy).

    NOTE(review): the statement is assembled by plain string interpolation,
    so schema, table and value MUST come from trusted code — this is open
    to SQL injection if any argument is user-controlled. Table/schema names
    cannot be bound as parameters, but *value* could be.
    """
    statement = (
        f'''
        INSERT INTO {schema}.{tables} VALUES({value});
        '''
    )
    async with MysqlUtil() as conn:
        row = await conn.fetchone(statement)
    return row if row else {}
def test_td_engine():
    """
    td_engine insert into TiDB

    Smoke test: query db_water through the TDengine REST API and convert
    the column-oriented response (head + data rows) into a list of row
    dicts.
    """
    from pot_libs.settings import SETTING
    import requests
    token = get_token()
    url = f"{SETTING.stb_url}db_water?tz=Asia/Shanghai"
    print(token)
    h = {"Authorization": f"Basic {token}"}
    sql = f"select * from db_water.water_stb limit 10 "
    r = requests.post(url, data=sql, headers=h)
    print(r.status_code)
    print(r.content)
    # BUG FIX: the previous code created one dict before the loop and
    # appended that same object for every row, so a_list ended up holding
    # N references to the identical (last) row. Build a fresh dict per row,
    # and parse the response body once instead of once per row.
    body = json.loads(r.content)
    a_list = [dict(zip(body['head'], data)) for data in body['data']]
    print(a_list)
async def elec_current_data_new16(mtids):
    """Fetch the latest electric reading for every meter in *mtids*.

    Queries each meter's TDengine child table (``mt{mtid}_ele_bromake``)
    for its last row and returns ``(res_map, {})`` where res_map maps
    mtid -> {column: value}. Meters whose query fails are simply omitted.
    """
    res_map = {}
    url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
    table_names = [get_td_table_name("electric", mtid) for mtid in mtids]
    # BUG FIX: the old single-table branch concatenated the table name and
    # "group by" without a separating space ("...mtX_ele_bromakegroup by"),
    # producing invalid SQL; the multi-table branch also contained a no-op
    # ``res_map.update(res_map)``. Both branches did the same per-table
    # work, so they are unified into one loop.
    for table_name in table_names:
        sql = f"select last_row(*) from {table_name} group by tbname "
        is_succ, results = await get_td_engine_data(url, sql)
        if not is_succ:
            continue
        head = parse_td_columns(results)
        for res in results["data"]:
            data = dict(zip(head, res))
            res_map[data["mtid"]] = data
    return res_map, {}
def get_td_table_name(topic, id):
    """Return the TDengine child-table name for *topic*, keyed by *id*.

    :param topic: 需要查询的主题 — one of "water", "electric", "pv_ele",
        "pv_sts", "ws"
    :param id: 表命名使用的id — interpolated into the per-topic template,
        e.g. ``get_td_table_name("electric", 5)`` -> ``"mt5_ele_bromake"``
    :return: concrete child-table name
    :raises ValueError: for an unknown topic (the old code crashed with an
        opaque ``TypeError`` from ``None % id`` instead)
    """
    topic_map = {
        "water": "water_bromake%s",
        "electric": "mt%s_ele_bromake",
        "pv_ele": "pv_ele%s",
        "pv_sts": "pv_sts%s",
        "ws": "ws%s"
    }
    template = topic_map.get(topic)
    if template is None:
        raise ValueError(f"unknown td table topic: {topic!r}")
    return template % id
# td 3.0
def td3_tbl_compate(td_tables):
    """TDengine 3.0 compatibility helper for IN-clause table lists.

    A multi-element list is returned as a tuple (whose repr renders as
    ``('a', 'b')`` when interpolated into SQL); a single-element list is
    rendered by hand, because a one-element tuple would carry a trailing
    comma.
    """
    multiple = len(td_tables) > 1
    return tuple(td_tables) if multiple else f"('{td_tables[0]}')"
def parse_td_columns(rsp_data):
    """Extract column names from a TDengine REST response.

    Aggregate queries wrap field names as ``last_row(field)`` in
    ``column_meta``; this strips the wrapper and returns the bare names
    in order.
    """
    pattern = re.compile(r'last_row\((.*)\)')

    def _field_name(raw):
        # Unwrap "last_row(x)" -> "x"; plain names pass through untouched.
        found = pattern.findall(raw)
        return found[0] if found else raw

    return [_field_name(meta[0]) for meta in rsp_data["column_meta"]]
......@@ -134,14 +134,19 @@ def year_slots(start, end):
return slots
def day_slots():
def day_slots(type='minutes'):
    """Return the time labels covering one day.

    type='minutes' (default): 1440 "HH:mm" labels, one per minute;
    any other value: 24 "HH" labels, one per hour.
    """
    day_start = my_pendulum.now().start_of('day')
    if type == 'minutes':
        return [day_start.add(minutes=i).format("HH:mm") for i in range(1440)]
    return [day_start.add(hours=i).format("HH") for i in range(24)]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment