Commit fb18f2f4 authored by lcn's avatar lcn

修复Bug

parent 80208422
......@@ -217,7 +217,7 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
for info in es_re_this:
cid = info.get("key")
cid_name = com_dic[cid]["shortname"]
kwh = round_2(info.get("kwh")["value"])
if kwh == 0:
continue
......@@ -229,9 +229,10 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
continue
except Exception as e:
log.error(e)
log.info(f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
log.info(
f"本次有电量数据, 上周期没有电量数据, cid:{cid}, start:{start}, end:{end}")
continue
charge = round_2(info.get("charge")["value"])
try:
charge_last = es_re_last_dic[cid]["charge"]["value"]
......@@ -241,7 +242,8 @@ async def power_aggs_cid_proxy(start, end, cid_list, date_type):
except Exception as e:
log.error(e)
log.info("本次有数据, 上周期没有数据")
log.info(f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
log.info(
f"本次有电费数据, 上周期没有电费数据, cid:{cid}, start:{start}, end:{end}")
continue
price = round_2(charge / kwh)
price_last = round_2(charge_last / kwh_last)
......
import pendulum
from unify_api.constants import POINT_LEVEL_MAP
from unify_api.modules.common.components.common_cps import LevelResp
from unify_api.modules.common.procedures.points import points_by_storeys
......@@ -6,6 +8,8 @@ from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, query_charge_aggs_points_new15
from unify_api.modules.electric.dao.electric_dao import \
monitor_point_join_by_points
from unify_api.modules.home_page.procedures.count_info_pds import current_load, \
current_load_new15
from unify_api.utils.common_utils import round_2, division_two
......@@ -86,11 +90,11 @@ async def kwh_card_level_service(cid, point_list, start, end):
kwh = round_2(es_res[point_id]["kwh"])
charge = round_2(es_res[point_id]["charge"])
price = round_2(division_two(charge, kwh))
res_dic["kwh"] = kwh
res_dic["charge"] = charge
res_dic["price"] = price
ret_data[m_type].append(res_dic)
return LevelResp(
inline=ret_data["inline"],
......@@ -101,4 +105,11 @@ async def kwh_card_level_service(cid, point_list, start, end):
)
async def load_info_service(cid_list):
# 实时负荷
cur_load = await current_load_new15(cid_list)
yesterday_dt = pendulum.now(tz="Asia/Shanghai").subtract(days=1)
yes_load = await current_load_new15(cid_list, yesterday_dt)
load_percent = round((cur_load - yes_load) / yes_load,
2) if cur_load and yes_load else ""
return cur_load, yes_load, load_percent
......@@ -16,14 +16,16 @@ from unify_api.modules.elec_charge.components.elec_charge_cps import \
power_overview_example, PricePolicyReq, PricePolicyResp, \
PricePolicy, AverPriceReq, PowerViewRes, Spvf, AverPriceResp, ChargeKwh, \
IndexChargeReq, IndexChargeResp, PopReq, PopResp, MtpResp, PspResp, \
IpspResp, KpReq, KpResp, KclReq, ProductProxyReq
IpspResp, KpReq, KpResp, KclReq, ProductProxyReq, LoadInfoReq, LoadInfoResp
from unify_api.modules.elec_charge.dao.elec_charge_dao import \
query_charge_aggs_points, get_kwh_charge, query_charge_aggs_points_new15
from unify_api.modules.elec_charge.procedures.elec_charge_pds import \
quarters_trans, power_overview_proxy, total_value, power_aggs_cid_proxy_new15, \
quarters_trans, power_overview_proxy, total_value, \
power_aggs_cid_proxy_new15, \
power_index_cid_proxy_new15, power_overview_proxy15
from unify_api.modules.elec_charge.service.elec_charge_service import \
kwh_points_service, kwh_card_level_service
kwh_points_service, kwh_card_level_service, load_info_service
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.utils.common_utils import round_2, round_4, NumListHelper
from unify_api.utils.es_query_body import agg_statistics
from unify_api.utils.request_util import filed_value_from_list
......@@ -289,7 +291,7 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
proxy_id = body.proxy_id
host = req.host
product = PRODUCT.get(host)
user_id = req.ctx.user_id
user_id = jwt_user(req)
# 全部工厂
if not cid_list:
log.info(f"power_overview_proxy根据用户userId:{user_id} "
......@@ -314,13 +316,14 @@ async def post_power_overview_proxy(req, body: PopReq) -> PopResp:
# 获取上一周期开始结束时间
start_last, end_last = last_time_str(start, end, date_type)
power, charge = await power_overview_proxy15(start, end, cid_list)
power_last, charge_last = await power_overview_proxy15(start_last, end_last,
cid_list)
power_last, charge_last = await power_overview_proxy15(start_last,
end_last,
cid_list)
if not all([power, charge, power_last, charge_last]):
return PopResp(power=Spvf(), charge=Spvf())
total_power = total_value(power)
total_charge = total_value(charge)
total_power_last = total_value(power_last)
total_charge_last = total_value(charge_last)
# 增长率
......@@ -351,7 +354,7 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
# 1. 获取参数
host = req.host
product = PRODUCT.get(host)
user_id = req.ctx.user_id
user_id = jwt_user(req)
# cid_list = await get_cids(user_id, product)
proxy_id = body.proxy_id
cid_list = await get_proxy_cids(user_id, product, proxy_id) \
......@@ -364,7 +367,7 @@ async def post_month_today_proxy(req, body: ProductProxyReq) -> MtpResp:
today_start, today_end, month_start, month_end = today_month_date()
# 2. 本月/上月数据
last_month_start, last_month_end = last_time_str(month_start, month_end,
"month")
"month", True)
this_month_p, this_month_charge = await power_overview_proxy15(
month_start, month_end, cid_list)
last_month_p, last_month_charge = await power_overview_proxy15(
......@@ -541,3 +544,31 @@ async def post_kwh_card_level(req, body: KclReq) -> LevelResp:
start = body.start
end = body.end
return await kwh_card_level_service(cid, point_list, start, end)
@summary("获取知电管理版首页负荷信息")
async def post_load_info(request, body: LoadInfoReq) -> LoadInfoResp:
# 1. 获取company_id
# 1. 获取参数
product = PRODUCT.get(request.host)
user_id = jwt_user(request)
proxy_id = body.proxy_id
cid_list = await get_proxy_cids(user_id, product, proxy_id) \
if proxy_id else None
# 全部工厂
if not cid_list:
log.info(f"未查询到工厂, userId:{user_id} product:{product}")
return LoadInfoResp()
try:
# 实时负荷,昨日同时负荷,对比昨日
current_load, yesterday_load, load_percent = await load_info_service(
cid_list)
except Exception as e:
log.exception(e)
return LoadInfoResp().server_error()
return LoadInfoResp(
current_load=current_load,
yesterday_load=yesterday_load,
load_percent=load_percent
)
......@@ -5,7 +5,7 @@ async def monitor_point_join_by_points(points):
"""monitor和point关联"""
sql = "SELECT m.mtid,p.ctnum,m.name, m.m_type, p.pid,p.cid " \
"FROM monitor m inner join point p on m.mtid = p.mtid " \
"WHERE p.pid in %s and m.demolished = 0 order by field(p.pid,{})".\
"WHERE p.pid in %s and m.demolished = 0 order by field(p.pid,{})". \
format(str(points).replace("[", "").replace("]", ""))
async with MysqlUtil() as conn:
monitor_point_list = await conn.fetchall(sql, args=(tuple(points),))
......@@ -26,7 +26,7 @@ async def get_electric_datas_dao(table_name, pid, start, end):
sql = f"SELECT * FROM {table_name} where pid=%s and create_time " \
f"BETWEEN '{start}' and '{end}' ORDER BY create_time desc"
async with MysqlUtil() as conn:
electric_datas = await conn.fetchall(sql, args=(pid, ))
electric_datas = await conn.fetchall(sql, args=(pid,))
return electric_datas
......@@ -35,7 +35,7 @@ async def get_qual_history_dao(table_name, pid, start, end, date_format):
f"p.* FROM {table_name} p where p.pid=%s and p.create_time " \
f"BETWEEN '{start}' and '{end}' order by p.create_time"
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(pid, ))
datas = await conn.fetchall(sql, args=(pid,))
return datas
......@@ -51,6 +51,12 @@ async def get_elec_history_dao(table_name, pid, start, end, date_format):
async def get_elec_mtid_sid_by_cid(cid):
if isinstance(cid, tuple):
cid_tuple = cid
elif isinstance(cid, list):
cid_tuple = tuple(cid)
else:
cid_tuple = (cid,)
sql = (
f"""
SELECT
......@@ -59,9 +65,9 @@ async def get_elec_mtid_sid_by_cid(cid):
FROM
monitor
WHERE
cid = {cid};
cid in %s;
"""
)
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql)
datas = await conn.fetchall(sql, args=(cid_tuple,))
return datas if datas else []
......@@ -3,6 +3,10 @@ import time
from datetime import datetime, timedelta
from math import sqrt
import pendulum
from pot_libs.settings import SETTING
from unify_api.modules.electric.dao.electric_dao import \
get_elec_mtid_sid_by_cid
from unify_api.utils.common_utils import round_2
from pot_libs.aredis_util.aredis_utils import RedisUtils
from pot_libs.common.components.query import Range, Equal, Filter, PageRequest
......@@ -35,7 +39,9 @@ from unify_api.modules.home_page.dao.count_info_dao import (
from unify_api.modules.electric_optimization.dao.power_index import (
price_policy_by_cid
)
from unify_api.utils.taos_new import parse_td_columns
from unify_api.utils.taos_new import parse_td_columns, get_td_table_name, \
td3_tbl_compate, get_td_engine_data
async def other_info(company_id):
"""
......@@ -52,7 +58,7 @@ async def other_info(company_id):
GROUP BY
DATE(pevent.event_datetime)
"""
now_time = datetime.now()
# 获取到工厂安装时间create_time
async with MysqlUtil() as conn:
......@@ -61,7 +67,7 @@ async def other_info(company_id):
company = await conn.fetchone(company_sql, (company_id,))
create_time_timestamp = company["create_time"]
create_time = datetime.fromtimestamp(create_time_timestamp)
today_alarm_count = 0
alarm_count = 0
if not alarm_data:
......@@ -72,7 +78,7 @@ async def other_info(company_id):
# 系统安全运行天数: 当前时间 - 工厂安装时间 + 1
safe_run_days = (now_time - create_time).days + 1
return today_alarm_count, safe_run_days, alarm_count
# 5. 构造返回
# 如果每天都有报警, 防止安全运行天数-1天, 所以total_days +2
total_days = (now_time - create_time).days + 2
......@@ -119,20 +125,20 @@ async def other_info_old(company_id):
}
},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
now_time = datetime.now()
# 获取到工厂安装时间create_time
async with MysqlUtil() as conn:
company_sql = "select create_time from company where cid = %s"
company = await conn.fetchone(company_sql, (company_id,))
create_time_timestamp = company["create_time"]
create_time = datetime.fromtimestamp(create_time_timestamp)
today_alarm_count = 0
alarm_count = 0
date_buckets = es_result.get("aggregations", {}).get("date_alarms",
......@@ -146,7 +152,7 @@ async def other_info_old(company_id):
# 系统安全运行天数: 当前时间 - 工厂安装时间 + 1
safe_run_days = (now_time - create_time).days + 1
return today_alarm_count, safe_run_days, alarm_count
# 5. 构造返回
# 如果每天都有报警, 防止安全运行天数-1天, 所以total_days +2
total_days = (now_time - create_time).days + 2
......@@ -214,7 +220,7 @@ async def electric_use_info(company_id):
end_timestamp = datetime_to_timestamp(now)
start_timestamp = datetime_to_timestamp(
datetime(now.year, now.month, now.day) - timedelta(30))
score_events = [
i
for i in EVENT_TYPE_MAP.keys()
......@@ -256,12 +262,12 @@ async def electric_use_info(company_id):
},
},
}
log.info("cal_score_safe_electric query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
score_buckets = (
es_result.get("aggregations", {}).get("score_aggs", {}).get("types",
{}).get(
......@@ -277,7 +283,7 @@ async def electric_use_info(company_id):
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
company_point_map = await get_points([company_id])
point_len = len(company_point_map.get(company_id) or {})
alarm_score = (
......@@ -290,15 +296,15 @@ async def electric_use_info(company_id):
f"alarm_score:{alarm_score}")
if alarm_score >= 15:
alarm_score = 15
electric_use_score = get_electric_index(alarm_score)
log.info(
"point_len={} alarm_score={} electric_use_score={}".format(
point_len, alarm_score, electric_use_score
)
)
alarm_buckets = (
es_result.get("aggregations", {}).get("alarm_aggs", {}).get("types",
{}).get(
......@@ -337,7 +343,7 @@ async def electric_use_info_new15(cid):
f"GROUP BY importance"
first_score, second_score, third_score = 0, 0, 0
async with MysqlUtil() as conn:
score_datas = await conn.fetchall(score_sql, args=(cid, ))
score_datas = await conn.fetchall(score_sql, args=(cid,))
alarm_datas = await conn.fetchall(alarm_sql, args=(cid, score_events))
for data in score_datas:
if data["importance"] == Importance.First.value:
......@@ -389,27 +395,27 @@ async def normal_rate_of_location(company_id):
)
for location in locations:
location_map[location["id"]] = location
# todo批量hmget
count_info_map = {
"residual_current": {"total": 0, "normal": 0},
"temperature": {"total": 0, "normal": 0},
}
print(f"len(location_map)={len(location_map)}")
location_ids = list(location_map.keys())
adio_currents = []
if location_ids:
adio_currents = await RedisUtils().hmget("adio_current",
*location_ids)
*location_ids)
adio_info_map = {}
for index, item_byte in enumerate(adio_currents):
if item_byte:
item = json.loads(item_byte.decode())
adio_info_map[location_ids[index]] = item
for location_id, location_info in location_map.items():
audio_info = adio_info_map.get(location_id)
count_info_map[location_info["type"]]["total"] += 1
......@@ -420,7 +426,7 @@ async def normal_rate_of_location(company_id):
# 超过4小时的值不统计在normal里
log.warn(f"adio_current location_id={location_id} has expire!")
continue
print(
"threshold={} location_info['type'] = {} audio_info['value']={}".format(
location_info["threshold"], location_info["type"],
......@@ -447,7 +453,7 @@ async def normal_rate_of_location(company_id):
)
+ "%"
)
if count_info_map["residual_current"]["total"] == 0:
residual_current_qr = "100%"
else:
......@@ -463,7 +469,7 @@ async def normal_rate_of_location(company_id):
)
+ "%"
)
return temperature_qr, residual_current_qr
......@@ -488,13 +494,13 @@ async def normal_rate_of_location_new15(cid):
adio_currents = []
if location_ids:
adio_currents = await RedisUtils().hmget("adio_current",
*location_ids)
*location_ids)
adio_info_map = {}
for index, item_byte in enumerate(adio_currents):
if item_byte:
item = json.loads(item_byte.decode())
adio_info_map[location_ids[index]] = item
for location_id, location_info in location_map.items():
audio_info = adio_info_map.get(location_id)
count_info_map[location_info["type"]]["total"] += 1
......@@ -525,7 +531,7 @@ async def normal_rate_of_location_new15(cid):
)
+ "%"
)
if count_info_map["residual_current"]["total"] == 0:
residual_current_qr = "100%"
else:
......@@ -541,7 +547,7 @@ async def normal_rate_of_location_new15(cid):
)
+ "%"
)
return temperature_qr, residual_current_qr
......@@ -556,10 +562,10 @@ async def current_load(company_id):
"and add_to_company = 1"
points = await conn.fetchall(point_sql, args=(company_id,))
point_ids = [p["pid"] for p in points]
if not point_ids:
return ""
async with MysqlUtil() as conn:
meter_sql = (
"SELECT pid, mid FROM change_meter_record WHERE pid in %s ORDER BY pid, start_time"
......@@ -569,15 +575,15 @@ async def current_load(company_id):
# 正序排序,最后这个map存储的是按照start_time是最近的mid
change_meter_map = {m["pid"]: m["mid"] for m in change_meters if
m["mid"] is not None}
newest_mids = list(change_meter_map.values())
meterdata_currents = []
if newest_mids:
meterdata_currents = await RedisUtils().hmget(METERDATA_CURRENT_KEY,
*newest_mids)
*newest_mids)
now_tt = int(time.time())
if meterdata_currents:
total = 0
for item in meterdata_currents:
......@@ -598,39 +604,46 @@ async def current_load(company_id):
return ""
async def current_load_new15(cid):
async def current_load_new15(cid, end_dt=None):
"""实时负荷"""
import re
from pot_libs.settings import SETTING
from unify_api.modules.common.service.td_engine_service import \
get_td_engine_data
datas = await get_elec_mtid_sid_by_cid(cid)
td_mt_tables = tuple(
(get_td_table_name("electric", data["mtid"]) for data in datas if
data["mtid"]))
td_mt_tables = td3_tbl_compate(td_mt_tables)
if not end_dt:
end_dt = pendulum.now(tz="Asia/Shanghai")
start_dt = end_dt.subtract(minutes=2)
sql = f"select last_row(mdptime, pttl) from electric_stb " \
f"where TBNAME IN {td_mt_tables} and ts>='{str(start_dt)}' and ts " \
f"<='{str(end_dt)}' group by tbname"
url = f"{SETTING.stb_url}db_electric?tz=Asia/Shanghai"
sql = f"""
select last_row(*) from electric_stb
where cpyid={cid}
group by tbname
"""
is_succ, results = await get_td_engine_data(url, sql)
now_tt = int(time.time())
if is_succ:
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
total = 0
for item in datas:
# 这里是有可能item为None的
if item:
mdptime_tt = None
if "mdptime" in item:
mdptime_dt = pendulum.parse(item["mdptime"])
item_tt = item.get("timestamp") or mdptime_dt.int_timestamp
if item_tt:
# 小于2分钟内的数据相加为实时负荷
if now_tt - item_tt <= 2 * 60:
total += item["pttl"]
return total
return ""
if not is_succ:
return ""
if not results["data"]: # 兼容:mt表(2.0架构)里面拿不到数据再从sid表(1.0架构)里面拿
td_s_tables = tuple(
(f"s{data['sid'].lower()}_e" for data in datas if data["sid"]))
td_s_tables = td3_tbl_compate(td_s_tables)
sql = f"select last_row(mdptime, pttl) from electric_stb " \
f"where TBNAME IN {td_s_tables} group by tbname"
is_succ, results = await get_td_engine_data(url, sql)
if not is_succ:
return ""
head = parse_td_columns(results)
datas = []
for res in results["data"]:
datas.append(dict(zip(head, res)))
total = 0
for item in datas:
if not item:
continue
total += item["pttl"]
return total
async def power_count_info(company_id):
......@@ -644,7 +657,7 @@ async def power_count_info(company_id):
now = datetime.now()
start_time = (now - timedelta(30)).strftime("%Y-%m-%d %H:%M:%S")
end_time = now.strftime("%Y-%m-%d %H:%M:%S")
max_30d_load, _time = await pttl_max(company_id, start_time, end_time, -1)
cur_load = await current_load(company_id)
return cur_load, max_30d_load
......@@ -655,7 +668,7 @@ async def power_count_info_new15(cid):
now = datetime.now()
start_time = (now - timedelta(30)).strftime("%Y-%m-%d 00:00:00")
end_time = now.strftime("%Y-%m-%d %H:%M:%S")
max_30d_load, _time = await pttl_max_new15(cid, start_time, end_time, -1)
cur_load = await current_load_new15(cid)
return round_2(cur_load), round_2(max_30d_load)
......@@ -691,7 +704,7 @@ async def get_max_aiao_of_filed(company_id, start_time, end_time,
index=constants.LOCATION_15MIN_AIAO)
value_max = es_results.get("aggregations", {}).get("value_max_max", {})
rc_max_hits = value_max.get("hits", {}).get("hits")
max_info, location_map = {}, {}
if rc_max_hits:
max_info = rc_max_hits[0]["_source"]
......@@ -706,7 +719,7 @@ async def get_max_aiao_of_filed(company_id, start_time, end_time,
if max_info
else None
)
return MaxResidualCurrent(
max=round(max_info["value_max"], 2) if max_info else None,
location_name=f"{location_map['group']}_{'漏电流' if location_map['item'] == 'default' else location_map['item']}"
......@@ -769,8 +782,9 @@ async def company_power_use_info_new15(company_id, start, end):
async def get_company_charge_price(company_id, es_time_start, es_time_end):
power_use_info = await company_power_use_info_new15(company_id, es_time_start,
es_time_end)
power_use_info = await company_power_use_info_new15(company_id,
es_time_start,
es_time_end)
if power_use_info["kwh"]:
unit_price = power_use_info["charge"] / power_use_info["kwh"]
else:
......@@ -790,7 +804,7 @@ async def power_charge_price(company_id):
yestoday_start = datetime(yestoday.year, yestoday.month, yestoday.day, 0,
0, 0)
yestoday_end = yestoday_start + timedelta(1)
es_yestoday_start = datetime.strftime(yestoday_start,
"%Y-%m-%dT%H:%M:%S+08:00")
es_yestoday_end = datetime.strftime(yestoday_end,
......@@ -798,7 +812,7 @@ async def power_charge_price(company_id):
yestoday_price = await get_company_charge_price(company_id,
es_yestoday_start,
es_yestoday_end)
if now.month == 1:
last_month = 12
year = now.year - 1
......@@ -813,7 +827,7 @@ async def power_charge_price(company_id):
last_month_price = await get_company_charge_price(
company_id, es_last_month_start, es_last_month_end
)
return yestoday_price, last_month_price
......@@ -835,7 +849,7 @@ async def power_charge_price_new15(cid):
year = now.year - 1
last_month_start = datetime(year=year, month=last_month, day=1)
else:
last_month_start = datetime(year=now.year, month=now.month-1, day=1)
last_month_start = datetime(year=now.year, month=now.month - 1, day=1)
last_month_end = datetime(year=now.year, month=now.month, day=1)
last_month_datas = await company_power_use_info_new15(
cid, str(last_month_start), str(last_month_end)
......@@ -859,19 +873,19 @@ async def power_factor(company_id):
)
points = await conn.fetchall(point_sql, args=(company_id, 1))
point_ids = [i["pid"] for i in points]
now = datetime.now()
if now.month == 1:
last_month_dt = datetime(year=now.year - 1, month=12, day=1)
else:
last_month_dt = datetime(year=now.year, month=now.month - 1, day=1)
# 首页功率因数取所有进线中最小的
async with MysqlUtil() as conn:
sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
inlines = await conn.fetchall(sql, args=(company_id,))
inline_ids = [inline["inlid"] for inline in inlines]
power_factor_results = []
sql = "SELECT inlid, save_charge pf_cost, `kpi_x`, `save_charge` " \
"FROM algo_power_factor_result WHERE inlid in %s and month=%s"
......@@ -883,7 +897,7 @@ async def power_factor(company_id):
type(i["kpi_x"]) in [int, float]
]
last_month_cos = min(pf_kpi_x_list) if len(pf_kpi_x_list) else ""
async with EsUtil() as es:
dt = pendulum.now(tz="Asia/Shanghai")
tstamp = dt.int_timestamp // (15 * 60) * (15 * 60)
......@@ -913,18 +927,18 @@ async def power_factor(company_id):
"qttl_mean": item["qttl_mean"],
}
)
total_pttl, total_qttl = 0, 0
for point_id, records in point_map.items():
total_pttl += records[0]["pttl_mean"]
total_qttl += records[0]["qttl_mean"]
# 计算实时功率的公式
cos_ttl = ""
l = sqrt(total_pttl * total_pttl + total_qttl * total_qttl)
if l:
cos_ttl = round(total_pttl / l, 2)
if type(last_month_cos) in [int, float]:
last_month_cos = round(last_month_cos, 2)
return cos_ttl, last_month_cos
......@@ -934,7 +948,7 @@ async def power_factor_new15(cid):
"""首页获取实时功率因数, 上月功率因数"""
point_sql = "select pid,inlid from point where cid=%s and add_to_company=1"
async with MysqlUtil() as conn:
points = await conn.fetchall(point_sql, args=(cid, ))
points = await conn.fetchall(point_sql, args=(cid,))
point_ids = [i["pid"] for i in points]
now = datetime.now()
if now.month == 1:
......@@ -957,11 +971,11 @@ async def power_factor_new15(cid):
type(i["kpi_x"]) in [int, float]
]
last_month_cos = min(pf_kpi_x_list) if len(pf_kpi_x_list) else ""
dt = pendulum.now(tz="Asia/Shanghai")
tstamp = dt.int_timestamp // (15 * 60) * (15 * 60)
dt = pendulum.from_timestamp(tstamp, tz="Asia/Shanghai")
end_dt = (dt-timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M:%S")
end_dt = (dt - timedelta(minutes=15)).strftime("%Y-%m-%d %H:%M:%S")
str_dt = dt.strftime("%Y-%m-%d %H:%M:%S")
electric_sql = f"SELECT pid,create_time,pttl_mean,qttl_mean FROM " \
f"`point_15min_electric` where create_time in " \
......@@ -985,7 +999,7 @@ async def power_factor_new15(cid):
l = sqrt(total_pttl * total_pttl + total_qttl * total_qttl)
if l:
cos_ttl = round(total_pttl / l, 2)
if type(last_month_cos) in [int, float]:
last_month_cos = round(last_month_cos, 2)
return cos_ttl, last_month_cos
......@@ -997,12 +1011,12 @@ async def optimization_count_info(company_id: int):
:param company_id:
:return:
"""
async with MysqlUtil() as conn:
sql = "SELECT inlid, `name` FROM inline WHERE cid=%s"
inlines = await conn.fetchall(sql, args=(company_id,))
inline_ids = [inline["inlid"] for inline in inlines]
# 获取公司上月用电
# now = datetime.now()
# es_start_time = (
......@@ -1014,12 +1028,12 @@ async def optimization_count_info(company_id: int):
# "%Y-%m-%dT%H:%M:%S+08:00")
# power_use_info = await company_power_use_info(company_id, es_start_time,
# es_end_time)
now = datetime.now()
start_time = (
pendulum.datetime(now.year, now.month, 1)
.subtract(months=1)
.strftime("%Y-%m-%d %H:%M:%S")
.subtract(months=1)
.strftime("%Y-%m-%d %H:%M:%S")
)
end_time = pendulum.datetime(now.year, now.month, 1).strftime(
"%Y-%m-%d %H:%M:%S")
......@@ -1043,13 +1057,13 @@ async def optimization_count_info(company_id: int):
}
)
return count_info_map
now = datetime.now()
if now.month == 1:
last_month_dt = datetime(year=now.year - 1, month=12, day=1)
else:
last_month_dt = datetime(year=now.year, month=now.month - 1, day=1)
# 功率因数
async with MysqlUtil() as conn:
sql = "SELECT inlid, `cos`, save_charge pf_cost, kpi_x, save_charge " \
......@@ -1063,7 +1077,7 @@ async def optimization_count_info(company_id: int):
2,
)
total_pf_save = 0 if total_pf_save <= 0 else total_pf_save
pf_kpi_x_list = [
i["kpi_x"] for i in power_factor_results if
type(i["kpi_x"]) in [int, float]
......@@ -1079,20 +1093,20 @@ async def optimization_count_info(company_id: int):
pf_desc = "空间适中"
else:
pf_desc = "空间较大"
count_info_map["power_factor"] = {
"save_charge": total_pf_save if pf_kpi_x != "" else "",
"kpi_x": pf_kpi_x,
"desc": pf_desc,
}
# 移峰填谷指数
async with MysqlUtil() as conn:
sql = "select `score`, `cost_save` from `algo_plsi_result` " \
"where `inlid` in %s and `month` = %s"
last_month_str = datetime.strftime(last_month_dt, "%Y-%m")
pcvfs = await conn.fetchall(sql, args=(inline_ids, last_month_str))
pcvf_kpi_x_list = [i["score"] for i in pcvfs if
type(i["score"]) in [int, float]]
pcvf_kpi_x = min(pcvf_kpi_x_list) if len(pcvf_kpi_x_list) else ""
......@@ -1100,7 +1114,7 @@ async def optimization_count_info(company_id: int):
sum([i["cost_save"] for i in pcvfs if
i["cost_save"] and i["cost_save"] >= 0]), 2
)
if pcvf_kpi_x == "":
pcvf_desc = ""
elif pcvf_kpi_x >= 90:
......@@ -1111,14 +1125,14 @@ async def optimization_count_info(company_id: int):
pcvf_desc = "空间适中"
else:
pcvf_desc = "空间较大"
total_pcvf_save = 0 if total_pcvf_save <= 0 else total_pcvf_save
count_info_map["pcvf"] = {
"save_charge": total_pcvf_save if pcvf_kpi_x != "" else "",
"kpi_x": pcvf_kpi_x,
"desc": pcvf_desc,
}
# 经济运行
async with MysqlUtil() as conn:
sql = "select `kpi_x`, `save_charge`, `mean_load_factor` " \
......@@ -1155,13 +1169,13 @@ async def optimization_count_info(company_id: int):
economic_desc = "空间适中"
else:
economic_desc = "空间较大"
count_info_map["power_save"] = {
"save_charge": total_economic_save if economic_kpi_x != "" else "",
"kpi_x": economic_kpi_x,
"desc": economic_desc,
}
# 最大需量
async with MysqlUtil() as conn:
sql = (
......@@ -1173,7 +1187,7 @@ async def optimization_count_info(company_id: int):
)
last_month_str = datetime.strftime(last_month_dt, "%Y-%m")
md_spaces = await conn.fetchall(sql, args=(inline_ids, last_month_str))
md_space_kpi_x_list = [i["kpi_x"] for i in md_spaces if
type(i["kpi_x"]) in [int, float]]
md_space_kpi_x = max(md_space_kpi_x_list) if len(
......@@ -1202,7 +1216,7 @@ async def optimization_count_info(company_id: int):
"kpi_x": md_space_kpi_x,
"desc": md_space_desc,
}
total_save_cost = 0
for _, item in count_info_map.items():
total_save_cost += (
......@@ -1211,12 +1225,12 @@ async def optimization_count_info(company_id: int):
)
save_percent = total_save_cost / month_charge if month_charge else ""
count_info_map["save_percent"] = save_percent
# 计算最大需量
async with MysqlUtil() as conn:
sql = "select `price_md`,`price_tc` from `price_policy` where `cid`=%s"
price_policy = await conn.fetchone(sql, args=(company_id,))
total_md_space_charge = sum(
[i["inline_md_charge"] for i in md_spaces if i["inline_md_charge"]])
total_md_space_p = (
......@@ -1225,7 +1239,7 @@ async def optimization_count_info(company_id: int):
else ""
)
count_info_map["md_space_p"] = total_md_space_p
# 经济运行最低负载率
mean_load_factors = [
i["mean_load_factor"] for i in economic_operations if
......@@ -1260,12 +1274,12 @@ async def electric_use_info_sdu(cid):
}
}
}
log.info("cal_score_safe_electric query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
score_buckets = (
es_result.get("aggregations", {}).get("alarm_aggs", {}).get("buckets",
[])
......@@ -1280,7 +1294,7 @@ async def electric_use_info_sdu(cid):
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
company_point_map = await get_points([cid])
point_len = len(company_point_map.get(cid) or {})
alarm_score = (
......@@ -1291,9 +1305,9 @@ async def electric_use_info_sdu(cid):
)
if alarm_score >= 15:
alarm_score = 15
electric_use_score = get_electric_index(alarm_score)
log.info(
"point_len={} alarm_score={} electric_use_score={}".format(
point_len, alarm_score, electric_use_score
......@@ -1366,12 +1380,12 @@ async def electric_use_info_points_sdu(start, end, points):
}
}
}
log.info("electric_use_info_points query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
score_buckets = (
es_result.get("aggregations", {}).get("alarm_aggs", {}).get("buckets",
[])
......@@ -1386,15 +1400,15 @@ async def electric_use_info_points_sdu(start, end, points):
second_alarm_cnt += bucket["doc_count"]
elif bucket["key"] == Importance.Third.value:
third_alarm_cnt += bucket["doc_count"]
alarm_score = (first_alarm_cnt * 2 + second_alarm_cnt * 1 +
third_alarm_cnt * 0.5) / len(points)
if alarm_score >= 15:
alarm_score = 15
electric_use_score = get_electric_index(alarm_score)
log.info(
"point_len={} alarm_score={} electric_use_score={}".format(
len(points), alarm_score, electric_use_score
......@@ -1415,9 +1429,9 @@ async def electric_use_info_points_sdu_new15(start, end, points):
f"where pid in %s and event_datetime BETWEEN %s and %s " \
f"and event_type in %s group by importance"
async with MysqlUtil() as conn:
results = await conn.fetchall(sql,args=(points,start,end,
SDU_ALARM_LIST))
results = await conn.fetchall(sql, args=(points, start, end,
SDU_ALARM_LIST))
first_alarm_cnt = 0
second_alarm_cnt = 0
third_alarm_cnt = 0
......@@ -1428,15 +1442,15 @@ async def electric_use_info_points_sdu_new15(start, end, points):
second_alarm_cnt += result["doc_count"]
elif result["importance"] == Importance.Third.value:
third_alarm_cnt += result["doc_count"]
alarm_score = (first_alarm_cnt * 2 + second_alarm_cnt * 1 +
third_alarm_cnt * 0.5) / len(points)
if alarm_score >= 15:
alarm_score = 15
electric_use_score = get_electric_index(alarm_score)
log.info(
"point_len={} alarm_score={} electric_use_score={}".format(
len(points), alarm_score, electric_use_score
......@@ -1457,21 +1471,22 @@ async def optimization_count_info_new(company_id: int):
:param company_id:
:return:
"""
inlines = await get_inline_by_cid(company_id)
inline_ids = [inline["inlid"] for inline in inlines]
# 获取公司上月用电
now = datetime.now()
es_start_time = (
pendulum.datetime(now.year, now.month, 1)
.subtract(months=1)
.strftime("%Y-%m-%dT%H:%M:%S+08:00")
.subtract(months=1)
.strftime("%Y-%m-%dT%H:%M:%S+08:00")
)
es_end_time = pendulum.datetime(now.year, now.month, 1).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
power_use_info = await company_power_use_info_new15(company_id, es_start_time,
es_end_time)
power_use_info = await company_power_use_info_new15(company_id,
es_start_time,
es_end_time)
month_charge = power_use_info["charge"]
count_info_map = {
"avg_price": power_use_info["charge"] / power_use_info["kwh"]
......@@ -1497,7 +1512,7 @@ async def optimization_count_info_new(company_id: int):
last_month_dt = datetime(year=now.year, month=now.month - 1, day=1)
last_month_str = datetime.strftime(last_month_dt, "%Y-%m")
# 功率因数
power_factor_results = await get_power_factor_kpi(inline_ids,
last_month_dt)
total_pf_save = round(
......@@ -1510,7 +1525,7 @@ async def optimization_count_info_new(company_id: int):
(i["name"], i["kpi_x"]) for i in power_factor_results if
type(i["kpi_x"]) in [int, float]
]
if len(pf_kpi_x_list):
pf_kpi_x_num = [pf_kpi[1] for pf_kpi in pf_kpi_x_list]
pf_kpi_x = min(pf_kpi_x_num)
......@@ -1545,12 +1560,12 @@ async def optimization_count_info_new(company_id: int):
pcvfs = await get_pcvf_kpi(inline_ids, last_month_str)
pcvf_kpi_x_list = [(i["name"], i["score"]) for i in pcvfs if
type(i["score"]) in [int, float]]
if len(pcvf_kpi_x_list):
pcvf_kpi_x_num = [pcvf_kpi[1] for pcvf_kpi in pcvf_kpi_x_list]
pcvf_kpi_x = min(pcvf_kpi_x_num)
pcvf_kpi_x_name = []
if pcvf_kpi_x < 70:
for index, kpi_num in enumerate(pcvf_kpi_x_num):
if kpi_num < 70:
......@@ -1564,7 +1579,7 @@ async def optimization_count_info_new(company_id: int):
f"引入新能源,转移高峰电量至低谷"
else:
pcvf_desc = "平均电价处于较低水平,请继续保持"
else:
pcvf_kpi_x = ""
pcvf_desc = ""
......@@ -1582,7 +1597,7 @@ async def optimization_count_info_new(company_id: int):
sum([i["cost_save"] for i in pcvfs if
i["cost_save"] and i["cost_save"] >= 0]), 2
)
total_pcvf_save = 0 if total_pcvf_save <= 0 else total_pcvf_save
count_info_map["pcvf"] = {
"save_charge": total_pcvf_save if pcvf_kpi_x != "" else "",
......@@ -1590,7 +1605,7 @@ async def optimization_count_info_new(company_id: int):
"desc": pcvf_desc,
"space": pcvf_space
}
# 经济运行
economic_operations = await get_economic_kpi(inline_ids, last_month_str)
economic_kpi_x_list = [
......@@ -1651,14 +1666,14 @@ async def optimization_count_info_new(company_id: int):
"desc": economic_desc,
"space": econ_space
}
# 容量、需量价格
price_policy = await price_policy_by_cid(company_id)
price_md = price_policy["price_md"] if price_policy["price_md"] else 0
price_tc = price_policy["price_tc"] if price_policy["price_tc"] else 0
# 最大需量
md_spaces = await get_md_space(inline_ids, last_month_dt)
md_space_kpi_x_list = [i["kpi_x"] for i in md_spaces if
type(i["kpi_x"]) in [int, float]]
md_space_kpi_x = max(md_space_kpi_x_list) if len(
......@@ -1688,13 +1703,13 @@ async def optimization_count_info_new(company_id: int):
md_space_tc_runtimes[index]["tc_runtime"] * price_tc >= \
price_md * item["inline_md_predict"]:
md_space_name.append(md_space_tc_runtimes[index]["name"])
if len(md_space_name):
md_space_desc = f"若次月负荷无较大变动,建议{'、'.join(md_space_name)}" \
f"选择按最大需量计费"
else:
md_space_desc = "不存在容改需空间"
count_info_map["md_space"] = {
"save_charge": total_md_space_save if md_space_kpi_x != "" else "",
"kpi_x": md_space_kpi_x,
......@@ -1736,7 +1751,7 @@ async def cid_alarm_importance_count(cid, start, end):
point_list = [i["pid"] for i in monitor_point_list]
es_res = await sdu_alarm_importance_dao_new15(start, end, point_list)
es_res_key = {i["key"]: i for i in es_res}
res_list = []
for info in monitor_point_list:
name = info.get("name")
......@@ -1752,7 +1767,7 @@ async def cid_alarm_importance_count(cid, start, end):
tmp_dic["second"] += b["doc_count"]
elif b["key"] == Importance.Third.value:
tmp_dic["third"] += b["doc_count"]
tmp_dic["alarm_count"] = tmp_dic["first"] + tmp_dic["second"] + \
tmp_dic[
"third"]
......
......@@ -115,6 +115,7 @@ def proxy_power_slots(start, end, date_format="MM-DD", is_duration=False):
slots.append(dt)
return slots
def day_of_month(start):
"""这个月有几天"""
start_f = my_pendulum.from_format(start, 'YYYY-MM-DD HH:mm:ss')
......@@ -311,7 +312,7 @@ def convert_to_es_str(str1, format="YYYY-MM-DD HH:mm:ss"):
return str(es_date)
def last_time_str(start, end, date_type):
def last_time_str(start, end, date_type, date_end=False):
"""年月日, 获取上一周期时间"""
if date_type not in ("day", "month", "year"):
return None
......@@ -319,13 +320,22 @@ def last_time_str(start, end, date_type):
end_f = my_pendulum.from_format(end, 'YYYY-MM-DD HH:mm:ss')
if date_type == "day":
start_last = start_f.subtract(days=1)
end_last = end_f.subtract(days=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(days=1)
elif date_type == "month":
start_last = start_f.subtract(months=1)
end_last = end_f.subtract(months=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(months=1)
else:
start_last = start_f.subtract(years=1)
end_last = end_f.subtract(years=1)
if date_end:
end_last = start_last.end_of(unit=date_type)
else:
end_last = end_f.subtract(years=1)
return start_last.format("YYYY-MM-DD HH:mm:ss"), end_last.format(
"YYYY-MM-DD HH:mm:ss")
......@@ -356,7 +366,7 @@ def get_datetime_str(timestamp):
'''
if not timestamp:
timestamp = time.time()
time_array = time.localtime(timestamp)
return time.strftime("%Y-%m-%d %H:%M:%S", time_array)
......@@ -671,7 +681,7 @@ def get_last_3month_start_end(date=None):
:param dt: datetime obj
:return:
"""
if not date:
dt = get_current_date_time()
else:
......@@ -733,7 +743,7 @@ def get_this_year_start_end(date=None):
:param dt: datetime obj
:return:
"""
if not date:
dt = get_current_date_time()
else:
......@@ -749,7 +759,7 @@ def get_last_year_start_end(date=None):
:param dt: datetime obj
:return:
"""
if not date:
dt = get_current_date_time()
else:
......@@ -765,7 +775,7 @@ def get_last_month_start_end(date=None):
:param dt: datetime obj
:return:
"""
if not date:
dt = get_current_date_time()
else:
......@@ -873,7 +883,7 @@ def get_current_start_n_end_dt(date_type, include_end_day=False):
start_year += 1
start_month = start_month - 12
start = datetime.datetime(start_year, start_month, 1)
elif date_type == "season":
season_number = int((current_dt.month - 1) / 3)
start = datetime.datetime(current_dt.year, season_number * 3 + 1, 1)
......@@ -984,7 +994,7 @@ def get_previous_slot(slots, date_type, compare_type=None):
return get_slots_between_date(
start - datetime.timedelta(days=days),
end - datetime.timedelta(days=days), date_type)
elif date_type == "hour":
hours = int((end - start).total_seconds() / 3600) + 1
return get_slots_between_date(start - datetime.timedelta(hours=hours),
......@@ -1032,8 +1042,8 @@ def get_slots_between_date(start, end, interval_type, growth_type=None):
while start <= end:
date_dts.append([start, start + datetime.timedelta(days=1)])
start += datetime.timedelta(days=1)
elif interval_type == "month":
start_months = start.year * 12 + start.month
end_months = end.year * 12 + end.month
......@@ -1145,7 +1155,7 @@ def deco_convert_date_to_dt(f):
if kwarg in kwargs and kwargs[kwarg]:
kwargs[kwarg] = convert_to_dt(kwargs[kwarg])
return f(*args, **kwargs)
return deco
......@@ -1432,7 +1442,7 @@ def get_time_duration_by_str(duration_str):
days = int(duration_str / (60 * 60 * 24))
if days > 0:
return_str += "%s天" % str(days)
hours = int(duration_str % (60 * 60 * 24) / (60 * 60))
if hours > 0:
return_str += "%s时" % str(hours)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment