Commit b127e317 authored by ZZH's avatar ZZH

remove es 2023-5-30

parent f1c2b7ac
from datetime import datetime
from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants
from unify_api.constants import SDU_ALARM_LIST
from unify_api.modules.alarm_manager.dao.list_alarm_dao import \
alarm_content_time_distribution_dao
......@@ -119,45 +115,6 @@ async def risk_distribution(start, end, point_id_list, is_new=False):
return security_user, risk_user
async def zdu_summary_info(cid, start, end):
"""
智电u, 报警总数/报警监测点数
"""
start_dt = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
end_dt = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
es_start_str = datetime(year=start_dt.year, month=start_dt.month,
day=start_dt.day).strftime(
"%Y-%m-%dT%H:%M:%S+08:00")
es_end_str = end_dt.strftime("%Y-%m-%dT%H:%M:%S+08:00")
filter_list = [
{"range": {"datetime": {"gte": es_start_str, "lte": es_end_str}}},
{"term": {"cid": cid}},
]
query_body = {
"size": 0,
"query": {"bool": {"filter": filter_list}},
"aggs": {
"alarm_cnt": {
"terms": {
"field": "point_id"
}
}
}
}
log.info("query_body query_body={}".format(query_body))
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
buckets = es_result["aggregations"]["alarm_cnt"]["buckets"]
# 总报警数
total_alarm_cnt = 0
for info in buckets:
total_alarm_cnt += info["doc_count"]
return total_alarm_cnt, len(buckets) # 总监测点数
async def alarm_content_time_distribution_pds(cid, start, end, ):
electric_param_detail = {
"harmonic": 0,
......
......@@ -325,7 +325,6 @@ async def zdu_content_distribution_service(cid, start, end, product):
async def zdu_summary_service(cid, start, end, product):
"""报警统计-统计概况信息-智电u"""
# 1. 报警总数, 报警监测点数, 平均报警 = 报警总数除以报警监测点
# total_alarm_cnt, alarm_points = await zdu_summary_info(cid, start, end)
total_alarm_cnt, alarm_points = await zdu_summary_dao(cid, start, end)
# 2. 安全运行, 本月累计安全运行天数,从月初算起, 未出现一级报警则加一天
safe_run = await zdu_alarm_aggs_date_impotent(cid, start, end)
......
......@@ -97,79 +97,6 @@ async def other_info(company_id):
return today_alarm_count, safe_run_days, alarm_count
async def other_info_old(company_id):
"""
今日报警数和累计安全运行天数,报警数
:param company_id:
:return:
"""
filters = [
{"term": {"cid": company_id}},
{"term": {"mode": "alarm"}},
# 报警包含scope类型
# {"terms": {"mode.keyword": ["alarm", "scope"]}},
]
query_body = {
"query": {"bool": {"filter": filters}},
"size": 0,
"aggs": {
"date_alarms": {
"date_histogram": {
"field": "datetime",
"order": {"_key": "desc"},
"min_doc_count": 0,
"interval": "day",
"format": "yyyy-MM-dd",
"time_zone": "+08:00",
}
}
},
}
async with EsUtil() as es:
es_result = await es.search_origin(body=query_body,
index=constants.POINT_1MIN_EVENT)
now_time = datetime.now()
# 获取到工厂安装时间create_time
async with MysqlUtil() as conn:
company_sql = "select create_time from company where cid = %s"
company = await conn.fetchone(company_sql, (company_id,))
create_time_timestamp = company["create_time"]
create_time = datetime.fromtimestamp(create_time_timestamp)
today_alarm_count = 0
alarm_count = 0
date_buckets = es_result.get("aggregations", {}).get("date_alarms",
{}).get("buckets", [])
if not date_buckets:
log.warn(
"Can not find data on es(index: %s): %s" % (
constants.POINT_1MIN_EVENT, query_body)
)
# 1. 增加逻辑,新增工厂如果还没有事件产生
# 系统安全运行天数: 当前时间 - 工厂安装时间 + 1
safe_run_days = (now_time - create_time).days + 1
return today_alarm_count, safe_run_days, alarm_count
# 5. 构造返回
# 如果每天都有报警, 防止安全运行天数-1天, 所以total_days +2
total_days = (now_time - create_time).days + 2
has_alarm_days = 0
for i in date_buckets:
if i["key_as_string"] == str(now_time)[:10]:
today_alarm_count += i["doc_count"]
if i["doc_count"] != 0:
# 没有报警,看做是安全运行了,统计累计安全运行的天数
has_alarm_days += 1
alarm_count += i["doc_count"]
safe_run_days = total_days - has_alarm_days
log.info(
f"today_alarm_count={today_alarm_count} safe_run_days={safe_run_days}")
return today_alarm_count, safe_run_days, alarm_count
async def other_info_new15(cid):
today_alarm_count, alarm_count, has_alarm_days = 0, 0, 0
sql = "SELECT DATE_FORMAT(event_datetime,'%%Y-%%m-%%d') event_date, " \
......@@ -529,24 +456,7 @@ async def current_load_new15(cid, end_dt=None):
return total
async def power_count_info(company_id):
"""
近30天負荷最大值
:param company_id:
:param start_time:
:param end_time:
:return:
"""
now = datetime.now()
start_time = (now - timedelta(30)).strftime("%Y-%m-%d %H:%M:%S")
end_time = now.strftime("%Y-%m-%d %H:%M:%S")
max_30d_load, _time = await pttl_max(company_id, start_time, end_time, -1)
cur_load = await current_load(company_id)
return cur_load, max_30d_load
async def power_count_info_new15(cid):
async def power_count_info(cid):
"""近30天負荷最大值"""
now = datetime.now()
start_time = (now - timedelta(30)).strftime("%Y-%m-%d 00:00:00")
......@@ -557,62 +467,7 @@ async def power_count_info_new15(cid):
return round_2(cur_load), round_2(max_30d_load)
async def get_max_aiao_of_filed(company_id, start_time, end_time,
filed="temperature"):
"""
近30日最高温度, 近30日最大漏电流
:param company_id:
:param start_time:
:param end_time:
:param filed:
:return:
"""
range = Range(field="time", start=start_time, end=end_time)
equal = Equal(field="cid", value=company_id)
# 这里内部调用,不做任何keyerror的假设
support_field_map = {
"residual_current": Equal(field="type", value="residual_current"),
"temperature": Equal(field="type", value="temperature"),
}
filter = Filter(
equals=[equal, support_field_map[filed]], ranges=[range], in_groups=[],
keywords=[],
)
page_request = PageRequest(page_size=1, page_num=1, sort=None,
filter=filter)
# 获取近30天漏电流最大值
query_body = EsQuery.aggr_index(page_request, stats_items=["value_max"])
async with EsUtil() as es:
es_results = await es.search_origin(body=query_body,
index=constants.LOCATION_15MIN_AIAO)
value_max = es_results.get("aggregations", {}).get("value_max_max", {})
rc_max_hits = value_max.get("hits", {}).get("hits")
max_info, location_map = {}, {}
if rc_max_hits:
max_info = rc_max_hits[0]["_source"]
location_id = max_info["location_id"]
async with MysqlUtil() as conn:
location_sql = "select `group`, `item`, `type` from location where id = %s"
location_map = await conn.fetchone(sql=location_sql,
args=(location_id,))
max_ts = max_info.get("value_max_time", 0)
max_occur_time = (
str(datetime.strptime(max_ts, "%Y-%m-%dT%H:%M:%S+08:00"))[:-3]
if max_info
else None
)
return MaxResidualCurrent(
max=round(max_info["value_max"], 2) if max_info else None,
location_name=f"{location_map['group']}_{'漏电流' if location_map['item'] == 'default' else location_map['item']}"
if location_map
else None,
occur_time=max_occur_time,
)
async def get_max_aiao_of_filed_new15(cid, start, end, filed="temperature"):
async def get_max_aiao_of_filed(cid, start, end, filed="temperature"):
value_max, location_name, occur_time = None, None, None
sql = f"SELECT a.value_max,a.value_max_time,p.name,b.item FROM " \
f"`location_15min_aiao` a LEFT JOIN location b on a.lid=b.lid " \
......@@ -675,46 +530,7 @@ async def get_company_charge_price(company_id, es_time_start, es_time_end):
return unit_price
async def power_charge_price(company_id):
"""
首页获取昨日平均电价, 上月平均电价
:param company_id:
:return:
"""
# 昨日平均电价
now = datetime.now()
yestoday = now - timedelta(1)
yestoday_start = datetime(yestoday.year, yestoday.month, yestoday.day, 0,
0, 0)
yestoday_end = yestoday_start + timedelta(1)
es_yestoday_start = datetime.strftime(yestoday_start,
"%Y-%m-%dT%H:%M:%S+08:00")
es_yestoday_end = datetime.strftime(yestoday_end,
"%Y-%m-%dT%H:%M:%S+08:00")
yestoday_price = await get_company_charge_price(company_id,
es_yestoday_start,
es_yestoday_end)
if now.month == 1:
last_month = 12
year = now.year - 1
last_month_start = datetime(year=year, month=last_month, day=1)
else:
last_month_start = datetime(year=now.year, month=now.month - 1, day=1)
last_month_end = datetime(year=now.year, month=now.month, day=1)
es_last_month_start = datetime.strftime(last_month_start,
"%Y-%m-%dT%H:%M:%S+08:00")
es_last_month_end = datetime.strftime(last_month_end,
"%Y-%m-%dT%H:%M:%S+08:00")
last_month_price = await get_company_charge_price(
company_id, es_last_month_start, es_last_month_end
)
return yestoday_price, last_month_price
async def power_charge_price_new15(cid):
async def power_charge_price(cid):
""" 首页获取昨日平均电价, 上月平均电价"""
# 昨日平均电价
now = datetime.now()
......
......@@ -5,7 +5,6 @@ from aioredis import RedisError
from elasticsearch import ElasticsearchException
from pymysql import MySQLError
from pot_libs.common.components.query import PageRequest, Range, Equal, Filter
from pot_libs.logger import log
from pot_libs.sanic_api import summary
from unify_api.modules.common.components.common_cps import CidReq
......@@ -20,13 +19,13 @@ from unify_api.modules.home_page.components.count_info_proxy_cps import \
CountInfoProxyResp, IycResp, IycmResp, RtrResp, CmResp, ApcResp, AsiResp, \
HsiResp, AiiResp
from unify_api.modules.home_page.procedures.count_info_pds import (
get_max_aiao_of_filed, get_max_aiao_of_filed_new15,
get_max_aiao_of_filed,
normal_rate_of_location, normal_rate_of_location_new15,
other_info, other_info_new15,
power_count_info, power_count_info_new15,
power_count_info,
electric_use_info,
datetime_to_timestamp,
power_charge_price, power_charge_price_new15,
power_charge_price,
cal_power_factor,
optimization_count_info, optimization_count_info_new
)
......@@ -49,17 +48,12 @@ async def post_count_info(request, body: CountInfoReq) -> CountInfoResp:
now_tt = str(datetime.now())
start_tt = str(datetime.now() - timedelta(30))
try:
# max_residual_current = await get_max_aiao_of_filed(
# company_id, start_tt, now_tt, "residual_current"
# )
max_residual_current = await get_max_aiao_of_filed_new15(
company_id, start_tt, now_tt, "residual_current"
)
# max_temperature = await get_max_aiao_of_filed(company_id, start_tt,
# now_tt, "temperature")
max_temperature = await get_max_aiao_of_filed_new15(
company_id, start_tt, now_tt, "temperature"
)
max_residual_current = await get_max_aiao_of_filed(company_id,
start_tt, now_tt,
"residual_current")
max_temperature = await get_max_aiao_of_filed(company_id, start_tt,
now_tt, "temperature")
# 温度和漏电流实时达标率
# temperature_qr, residual_current_qr = await normal_rate_of_location(
......@@ -74,17 +68,14 @@ async def post_count_info(request, body: CountInfoReq) -> CountInfoResp:
company_id)
# 实时负荷和近30日最高负荷
# current_load, max_30d_load = await power_count_info(company_id)
current_load, max_30d_load = await power_count_info_new15(company_id)
current_load, max_30d_load = await power_count_info(company_id)
# 用电安全指数, 报警分, 近30天报警1,2,3级数目
# electric_info = await electric_use_info(company_id)
electric_info = await electric_use_info(company_id)
# 昨日平均电价, 上月平均电价
# yestoday_price, last_month_price = await power_charge_price(
# company_id)
yestoday_price, last_month_price = await power_charge_price_new15(
yestoday_price, last_month_price = await power_charge_price(
company_id)
# 实时功率因数, 上月功率因数
......@@ -211,7 +202,8 @@ async def post_info_yang_chen(req, body: CountInfoReq) -> IycResp:
cid = body.cid
# return await info_yang_chen_service(cid)
return IycResp(total_point=20, air_quality=10, safe_operation_days=11,
total_water=132.80, total_kwh=0) # 扬尘接口,返回空数据
total_water=132.80, total_kwh=0) # 扬尘接口,返回空数据
@summary("工厂版首页地图信息-扬尘")
async def post_info_yang_chen_map(req, body: CountInfoReq) -> IycmResp:
......@@ -221,6 +213,7 @@ async def post_info_yang_chen_map(req, body: CountInfoReq) -> IycmResp:
return IycmResp(safety_index=89, today_alarm=5, total_max_pm25=27.12,
center_address=[], range_address=[])
@summary("获取首页用电经济指数")
async def post_electric_economic_index_new(request,
body: CountInfoReq) -> EconomicPowerCountNewResp:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment