Commit 8080048e authored by ZZH's avatar ZZH

remove es 2023-7-20

parent ea3c9232
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.logger import log
from unify_api.constants import POINT_1MIN_EVENT, SDU_ALARM_LIST
from unify_api.utils.time_format import convert_es_str
......@@ -43,58 +41,32 @@ async def new_list_alarm_dao(cid, points, start, end, importance, offset,
return total.get('total_count', 0), data
async def wx_list_alarm_dao(cids, start, end):
    """小程序消息列表, 取当前范围最新40条 (mini-program alarm list, newest 40).

    MySQL replacement for the former ES query against point_1min_event.

    :param cids: a company id, or a list/tuple of company ids
    :param start: range start, "YYYY-MM-DD HH:MM:SS" (falsy to skip range)
    :param end: range end, same format
    :return: tuple ``(total_count, rows)`` — rows is a list of event dicts
    """
    # The old ES query filtered with `terms cid: cids` (all companies);
    # keep that behaviour instead of silently dropping all but cids[0].
    cid_list = list(cids) if isinstance(cids, (list, tuple)) else [cids]
    cid_in = ", ".join(str(int(c)) for c in cid_list)
    # Build IN-lists by hand: f"{tuple(lst)}" renders a single-element
    # list as "('x',)" — the trailing comma is invalid SQL.
    type_in = ", ".join("'%s'" % t for t in SDU_ALARM_LIST)
    cond_lst = [f"cid in ({cid_in})", f"event_type in ({type_in})"]
    if start and end:
        s_dts = convert_es_str(start)
        e_dts = convert_es_str(end)
        cond_lst.append(f"event_datetime BETWEEN '{s_dts}' and '{e_dts}'")
    else:
        # No explicit range: everything up to "now".
        end_date = pendulum.now().strftime("%Y-%m-%d %H:%M:%S")
        cond_lst.append(f"event_datetime < '{end_date}'")
    cond_str = " AND ".join(cond_lst)
    async with MysqlUtil() as conn:
        sql = f"select count(*) cnt from point_1min_event WHERE {cond_str};"
        total = await conn.fetchone(sql)
        # Guard against a None row so .get() cannot blow up.
        total_count = total.get("cnt", 0) if total else 0
        if total_count <= 0:
            return 0, []
        # NOTE(review): in the diff the fetchall appeared after the
        # `async with` block (stripped indentation makes this ambiguous);
        # it is kept inside here so the connection is guaranteed open.
        sql = (f"select * from point_1min_event WHERE {cond_str} "
               f"order by event_datetime desc limit 40;")
        data = await conn.fetchall(sql)
    return total_count, data
async def list_alarm_zdu_dao(cid, points, start, end, importance, event_type):
......@@ -123,50 +95,17 @@ async def list_alarm_zdu_dao(cid, points, start, end, importance, event_type):
async def wx_list_alarm_zdu_dao(cid, points, start, end):
    """智电u, wx小程序, 取当前范围最新100条 (zhidian-u, newest 100 events).

    :param cid: company id
    :param points: list of point ids to filter on
    :param start: range start string; applied only when both start and
        end are truthy (passed into SQL as-is, matching the original)
    :param end: range end string
    :return: list of event rows, newest first (max 100)
    """
    # The old ES query used `terms point_id: []` which matches nothing;
    # an empty IN-list would instead be a MySQL syntax error, so
    # short-circuit to the equivalent empty result.
    if not points:
        return []
    cond_lst = [f"cid={int(cid)}"]
    # Hand-built IN list: f"{tuple([p])}" renders "(p,)", whose trailing
    # comma is invalid SQL for a single point. int() also keeps the
    # interpolated values numeric (basic injection hardening).
    pid_in = ", ".join(str(int(p)) for p in points)
    cond_lst.append(f"pid in ({pid_in})")
    if start and end:
        cond_lst.append(f"event_datetime BETWEEN '{start}' and '{end}'")
    sql = (f"select * from point_1min_event where {' and '.join(cond_lst)} "
           f"order by event_datetime desc limit 100;")
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql)
async def zdu_alarm_sort_dao(cid, start, end, page_size, page_num):
......
......@@ -2,8 +2,7 @@ from unify_api import constants
from unify_api.modules.alarm_manager.components.list_alarm import \
ListAlarmResponse, Alarm
from unify_api.modules.alarm_manager.dao.alarm_setting_dao import \
company_extend_dao, list_alarm_data_dao, get_list_alarm_dao, \
get_total_list_alarm_dao
company_extend_dao, get_list_alarm_dao, get_total_list_alarm_dao
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
wx_list_alarm_dao, \
wx_list_alarm_zdu_dao, list_alarm_zdu_dao, new_list_alarm_dao
......@@ -35,8 +34,8 @@ async def new_list_alarm_service(cid, storeys, offset, page_size, start, end,
for i in point_list if i["point_id"] in points]
# 2. 查询结果
total, results = await new_list_alarm_dao(cid, points, start, end,
importance,
offset, page_size)
importance,
offset, page_size)
if not results:
return ListAlarmResponse(total=0, rows=[])
# 2. 构建返回数据
......@@ -94,43 +93,36 @@ async def new_list_alarm_service(cid, storeys, offset, page_size, start, end,
async def wx_list_alarm_service(cids, product, start, end):
    """小程序消息列表 (mini-program alarm message list).

    :param cids: company id(s), forwarded to the dao
    :param product: kept for interface compatibility; no longer used now
        that the dao always queries the SDU alarm list
    :param start: range start string
    :param end: range end string
    :return: ListAlarmResponse(total, rows)
    """
    # 1. query MySQL (formerly ES)
    total, results = await wx_list_alarm_dao(cids, start, end)
    if not results:
        return ListAlarmResponse(total=0, rows=[])
    # 2. build response rows
    cid_info_map = await get_cid_info(all=True)
    rows = []
    for res in results:
        # avoid shadowing the builtin `type`
        event_type = res.get("event_type")
        type_str = constants.SDU_EVENT_TYPE_MAP.get(event_type, event_type)
        dt = time_format.convert_to_dt(res.get("event_datetime"))
        date_time = time_format.convert_dt_to_timestr(dt)
        raw_cid = res.get("cid")
        cid = int(raw_cid) if raw_cid else raw_cid
        alarm = Alarm(
            point_id=res.get("pid"),
            name=res.get("name"),
            importance=res.get("importance"),
            date_time=date_time,
            type=event_type,
            type_name=type_str,
            description=res.get("message"),
            es_id=res.get("id"),
            event_duration=res.get("event_duration"),
            company_name=cid_info_map.get(cid, {}).get("fullname", ""),
        )
        rows.append(alarm)
    # total: the mini-program does not paginate; returned but unused
    return ListAlarmResponse(total=total, rows=rows)
async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
......@@ -193,26 +185,26 @@ async def list_alarm_zdu_service(cid, point_list, page_num, page_size, start,
async def wx_list_alarm_zdu_service(cid, point_list, start, end):
"""小程序消息列表-智电u"""
# 1. es查询结果
es_res = await wx_list_alarm_zdu_dao(cid, point_list, start, end)
if not es_res["hits"]["hits"]:
results = await wx_list_alarm_zdu_dao(cid, point_list, start, end)
if not results:
return ListAlarmResponse(total=0, rows=[])
# 2. 获取工厂, 报警type对应的描述信息
event_dic = await company_extend_dao(cid)
event_dic_map = {event["key"]: event for event in event_dic}
# 3. 构建返回数据
rows = []
for info in es_res["hits"]["hits"]:
es_id = info["_id"]
source = info["_source"]
type = source.get("type")
mode = source.get("mode")
for res in results:
es_id = res.get("id")
type = res.get("event_type")
mode = res.get("event_mode")
type_str = constants.EVENT_TYPE_MAP.get(type, type)
point_id = source.get("point_id")
location_id = source.get("location_id")
date_time = source.get("datetime")
point_id = res.get("pid")
location_id = res.get("lid")
date_time = res.get("event_datetime")
dt = time_format.convert_to_dt(date_time)
date_time = time_format.convert_dt_to_timestr(dt)
event_duration = source.get("event_duration")
event_duration = res.get("event_duration")
if point_id and mode == "scope":
url = "/scope_details?doc_id=%s" % es_id
redirect_type = "scope"
......@@ -224,8 +216,8 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end):
redirect_type = ""
alarm = Alarm(
name=source.get("name"),
importance=source.get("importance"),
name=res.get("name"),
importance=res.get("importance"),
date_time=date_time,
type=type,
type_name=type_str,
......@@ -235,13 +227,12 @@ async def wx_list_alarm_zdu_service(cid, point_list, start, end):
url=url,
event_duration=event_duration,
point_id=point_id,
content=source.get("message"),
content=res.get("message"),
)
rows.append(alarm)
# total小程序不分页, 返回了但是不用
total = es_res["hits"]["total"]
return ListAlarmResponse(total=total, rows=rows)
return ListAlarmResponse(total=len(results), rows=rows)
async def list_alarm_service(cid, point_id, start, end, importance, page_size,
......
import calendar
from datetime import datetime
import pendulum
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import CST
......@@ -14,13 +9,13 @@ def point_day2month(dt):
if isinstance(dt, int) or isinstance(dt, float):
dt = pendulum.from_timestamp(dt, tz="Asia/Shanghai")
es_index = f"{constants.POINT_1MIN_INDEX}_{dt.year}_{dt.month}"
elif isinstance(dt, datetime):
es_index = f"{constants.POINT_1MIN_INDEX}_{dt.year}_{dt.month}"
else:
es_index = constants.POINT_1MIN_INDEX
return es_index
......@@ -28,7 +23,7 @@ async def today_alarm_cnt(cids):
start_time = pendulum.today(tz="Asia/Shanghai")
es_end_time = start_time.subtract(days=-1).format("YYYY-MM-DD HH:mm:ss")
es_start_time = start_time.format("YYYY-MM-DD HH:mm:ss")
sql = f"""
select cid,count(*) count
from point_1min_event pe
......@@ -40,9 +35,9 @@ async def today_alarm_cnt(cids):
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql,
args=(cids, es_start_time, es_end_time))
cid_bucket_map = {i["cid"]: i["count"] for i in datas}
cid_alarm_map = {cid: {"today_alarm_count": 0} for cid in cids}
for cid in cids:
alarm_count = cid_bucket_map.get("cid") or 0
......@@ -50,10 +45,8 @@ async def today_alarm_cnt(cids):
return cid_alarm_map
async def proxy_safe_run_info(cids, start_time_str=None,
end_time_str=None):
end_time_str=None):
"""
批量获取 各个工厂的安全运行天数以及今日报警数, 如果是获取月份的,那么计算这个月的安全运行天数
:param cids:
......@@ -86,12 +79,12 @@ async def proxy_safe_run_info(cids, start_time_str=None,
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql=sql, args=(cids,))
# 获取到工厂安装时间create_time
async with MysqlUtil() as conn:
company_sql = "select cid, create_time from company where cid in %s"
companys = await conn.fetchall(company_sql, (cids,))
create_time_timestamp_map = {
company["cid"]: pendulum.from_timestamp(
company["create_time"], tz=CST) for company in companys
......@@ -116,7 +109,7 @@ async def proxy_safe_run_info(cids, start_time_str=None,
total_days = (end_dt - create_dt).days + 1
elif create_ts > end_ts:
total_days = 0
has_alarm_days = cid_alarm_count_dict.get("cid") or 0
safe_run_days = total_days - has_alarm_days
cid_alarm_map[cid]["safe_run_days"] = safe_run_days
......@@ -144,70 +137,15 @@ async def alarm_time_distribution(company_ids, start, end):
"""
async with MysqlUtil() as conn:
datas = await conn.fetchall(sql, args=(company_ids,))
time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0,
"morning_alarm_cnt": 0}
for data in datas:
hour = int(data["event_hour"])
if hour >= 6 and hour < 18:
if 6 <= hour < 18:
time_distribution_map["day_alarm_cnt"] += data["event_count"]
elif hour >= 18 and hour <= 23:
elif 18 <= hour <= 23:
time_distribution_map["night_alarm_cnt"] += data["event_count"]
else:
time_distribution_map["morning_alarm_cnt"] += data["event_count"]
return time_distribution_map
async def alarm_time_distribution_old(company_ids, start, end):
    """Legacy ES implementation of alarm_time_distribution.

    Buckets alarm counts per company into day (06-17), night (18-23)
    and early-morning (00-05) slots via an ES date_histogram aggregation.

    :param company_ids: list of company ids (cid terms filter)
    :param start: range start, "%Y-%m-%d %H:%M:%S" (truncated to 00:00)
    :param end: range end, same format
    :return: dict with day_alarm_cnt / night_alarm_cnt / morning_alarm_cnt
    """
    start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
    # start is truncated to midnight of its day, end is used as-is
    es_start_str = datetime(year=start_dt.year, month=start_dt.month,
                            day=start_dt.day).strftime(
        "%Y-%m-%dT%H:%M:%S+08:00"
    )
    es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
    filter_list = [
        {"range": {"datetime": {"gte": es_start_str, "lte": es_end_str, }}},
        {"terms": {"cid": company_ids}}]
    query_body = {
        "query": {"bool": {"filter": filter_list}},
        "size": 0,
        "aggs": {
            "cid_aggs": {
                "terms": {"field": "cid", "size": 10000},
                "aggs": {
                    "time_alarms": {
                        "date_histogram": {
                            "field": "datetime",
                            "order": {"_key": "desc"},
                            "min_doc_count": 1,
                            "interval": "hour",
                            "format": "HH",
                            "time_zone": "+08:00",
                        }
                    }
                },
            }
        },
    }
    log.info("alarm time distribute query_body={}".format(query_body))
    async with EsUtil() as es:
        es_result = await es.search_origin(body=query_body,
                                           index=constants.POINT_1MIN_EVENT)
    # use the logger rather than a stray print on the service path
    log.info(f"alarm time distribute es_result = {es_result}")
    buckets = es_result["aggregations"]["cid_aggs"]["buckets"] or []
    time_distribution_map = {"day_alarm_cnt": 0, "night_alarm_cnt": 0,
                             "morning_alarm_cnt": 0}
    for i in buckets:
        cid_buckets = i.get("time_alarms", {}).get("buckets", [])
        for item in cid_buckets:
            hour = int(item["key_as_string"])
            # chained comparisons, consistent with alarm_time_distribution
            if 6 <= hour < 18:
                time_distribution_map["day_alarm_cnt"] += item["doc_count"]
            elif 18 <= hour <= 23:
                time_distribution_map["night_alarm_cnt"] += item["doc_count"]
            else:
                time_distribution_map["morning_alarm_cnt"] += item["doc_count"]
    return time_distribution_map
from datetime import datetime
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
async def month_md_space(inline_id: int, month_list: list):
......
......@@ -5,12 +5,9 @@ This is a tornado process and responds request from web back server.
"""
import pendulum
import datetime
import pandas as pd
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api.constants import ENERGY_INVESTMENT_ESS, \
INLINE_15MIN_POWER_ESINDEX, INLINE_1DAY_POWER_ESINDEX
from unify_api.constants import ENERGY_INVESTMENT_ESS
from unify_api.modules.energy_optimize.service.ess_optimation_tool import \
EssOptimizationTool
from unify_api.modules.energy_optimize.service.ess_utils import \
......@@ -27,11 +24,11 @@ class AutoDic(dict):
class EnergyStoreOptimize(object):
def __init__(self, inlid):
self._inlid = inlid
# self._r_cache = redis.Redis(host="172.18.1.253", db=1)
async def calc_inline(self, ess_system):
rlt = {'rlt_flag': True}
inl_info = await self._get_inline_info()
......@@ -47,7 +44,7 @@ class EnergyStoreOptimize(object):
max_dt.int_timestamp)
time_str_d = PricePolicyHelper.quarter_chars_2_time_str(
pp_info_d['quarters'])
# construct inline_var
inline_var = {'inline_capacity': inl_info['tc_runtime'],
'capacity_price': pp_info_d['price_tc'],
......@@ -68,10 +65,10 @@ class EnergyStoreOptimize(object):
sct = await self._contruct_section('v', pp_info_d, time_str_d,
max_dt)
inline_var['section_v'] = sct
# contruct df_curve
df_curve = await self._get_kw_curve(max_dt)
# handle return
if len(df_curve) == 0:
rlt['rlt_flag'] = False
......@@ -90,7 +87,7 @@ class EnergyStoreOptimize(object):
eot.economic_evaluate)
rlt['opt_curve'] = self.convert_opt_curve(eot.opt_curve)
return rlt
def convert_economic_evaluate(self, economic_evaluate):
invest_income_table = economic_evaluate['invest_income_table']
table = []
......@@ -106,7 +103,7 @@ class EnergyStoreOptimize(object):
table.append(tmp_d)
economic_evaluate['invest_income_table'] = table
return economic_evaluate
def convert_opt_curve(self, opt_curve):
rlt = []
for idx, row in opt_curve.iterrows():
......@@ -117,7 +114,7 @@ class EnergyStoreOptimize(object):
tmp_d['load_bat_curve'] = row['load_bat_curve']
rlt.append(tmp_d)
return rlt
async def _contruct_section(self, p_char, pp_info_d, time_str_d, max_dt):
""" contruct section_x for inline_var."""
section = {'price': pp_info_d['price_' + p_char]}
......@@ -126,7 +123,7 @@ class EnergyStoreOptimize(object):
time_range_str = ';'.join(time_str_d[p_char])
section['time_range'] = time_range_str
return section
async def _get_inline_info(self):
""" get inline_vc, tc_runtime, cid from redis.
:return: a dict
......@@ -141,7 +138,7 @@ class EnergyStoreOptimize(object):
'tc_runtime': info['tc_runtime'],
'cid': info['cid']}
return rlt
async def _get_company_price_policy(self, cid):
# pp_json = await RedisUtils(db=1).hget(PRICE_POLICY_HASHNAME, str(cid))
# pp_json = self._r_cache.hget(PRICE_POLICY_HASHNAME, str(cid))
......@@ -155,7 +152,7 @@ class EnergyStoreOptimize(object):
return result
# pp = json.loads(pp_json)
# return pp
async def _build_max_kwh_day(self):
""" build es query sentance for find max kwh day."""
dt = pendulum.now()
......@@ -170,9 +167,9 @@ class EnergyStoreOptimize(object):
async with MysqlUtil() as conn:
result = await conn.fetchone(sql, args=(self._inlid, start_time,
end_time))
return result.get("create_time") if result else None
async def _find_kwh_max_day(self):
""" find the max kwh day in latest 6 months.
:return: a dt object, or None if no doc
......@@ -183,7 +180,7 @@ class EnergyStoreOptimize(object):
return create_time
create_time = create_time.strftime("%Y-%m-%d %H:%M:%S")
return pendulum.parse(create_time, tz='Asia/Shanghai')
async def _build_aggs_kwh(self, p_char, start_dt):
end_dt = start_dt.add(days=1)
start_time = start_dt.format("YYYY-MM-DD HH:mm:ss")
......@@ -198,12 +195,12 @@ class EnergyStoreOptimize(object):
result = await conn.fetchone(sql, args=(self._inlid, p_char,
start_time,
end_time))
return result.get("kwh") if result else 0
async def _get_total_kwh_of_one_pchar(self, p_char, start_dt):
return await self._build_aggs_kwh(p_char, start_dt)
async def _build_kw_curve(self, start_dt):
end_dt = start_dt.add(days=1)
start_time = start_dt.format("YYYY-MM-DD HH:mm:ss")
......@@ -219,7 +216,7 @@ class EnergyStoreOptimize(object):
start_time,
end_time))
return results or []
async def _get_kw_curve(self, start_dt):
hits_list = await self._build_kw_curve(start_dt)
kw_list = []
......
from pot_libs.logger import log
from unify_api.utils.time_format import convert_es_str
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.es_util.es_query import EsQuery
from pot_libs.common.components.query import PageRequest, Equal, Filter
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
ScopeDetailsResponse
from pot_libs.mysql_util.mysql_util import MysqlUtil
......
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.settings import SETTING
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao, \
get_sid_by_mtid_dao, get_mtids_by_pids_dao
from unify_api.utils.common_utils import make_tdengine_data_as_list
from unify_api.utils.taos_new import get_td_engine_data
from unify_api.utils.time_format import convert_es_str, CST
from unify_api.modules.zhiwei_u.config import SCOPE_DATABASE
from pot_libs.mysql_util.mysql_util import MysqlUtil
......
......@@ -21,16 +21,9 @@ from unify_api.modules.zhiwei_u.dao.scope_operations_dao import \
select_cname_by_cids, select_sid_by_pids
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
SearchScopeResq, Alarm
from pot_libs.common.components.query import (
PageRequest,
Equal,
Filter
)
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username
from pot_libs.common.components.responses import Success
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.es_util.es_query import EsQuery
from pot_libs.logger import log
from unify_api.modules.zhiwei_u.components.scope_operations_cps import \
ScopeDetailsResponse, ScopeContent
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment