from aioredis import RedisError from elasticsearch import ElasticsearchException from pymysql import MySQLError from pot_libs.logger import log from pot_libs.sanic_api import summary from pot_libs.utils.exc_util import BusinessException from unify_api.constants import Product from unify_api.modules.alarm_manager.procedures.alarm_static_pds import \ alarm_content_time_distribution_pds from unify_api.modules.common.procedures.cids import get_cids, get_proxy_cids from unify_api.modules.home_page.components.security_info_cps import ( SecurityCountReq, SecurityCountResp, AlarmContentDistributionResp, SecurityCommonReq, ElectricParam, LevelCount, ContentCount, AlarmSummaryResp, ) from unify_api.modules.home_page.procedures.security_info_pds import ( alarm_summary, alarm_count_info, ) from unify_api.modules.users.procedures.jwt_user import jwt_user @summary("获取首页今日或者近30天安全报警统计信息") async def post_security_index(request, body: SecurityCountReq) -> SecurityCountResp: try: product = body.product cid = body.cid start = body.start end = body.end date_type = body.date_type cids = [cid] if product == Product.AndianUManage.value: user_id = request.ctx.user_id # cids = await get_cids(user_id, product) proxy_id = body.proxy_id cids = await get_proxy_cids(user_id, product, proxy_id) elif product == Product.RecognitionElectric.value: user_id = request.ctx.user_id cids = await get_cids(user_id, product) alarm_info_map = await alarm_count_info(cids, start, end, date_type) first_alarm, second_alarm, third_alarm = ( alarm_info_map["first_alarm"], alarm_info_map["second_alarm"], alarm_info_map["third_alarm"], ) except (ElasticsearchException, MySQLError, RedisError) as e: log.exception(e) return SecurityCountResp().db_error() except Exception as e: log.exception(e) return SecurityCountResp().server_error() return SecurityCountResp( first_alarm=first_alarm, second_alarm=second_alarm, third_alarm=third_alarm, ) @summary("获取工厂(可多个)日, 月份安全报警不同等级时间分布信息") async def post_alarm_level_distribution(request, body: SecurityCommonReq) -> SecurityCountResp: """ 目前用于 1.安电U管理版本->报警记录 :param request: :param body: :return: """ product = body.product req_cids = body.cids start = body.start end = body.end date_type = body.date_type if product == Product.AndianUManage.value: user_id = request.ctx.user_id # cids = await get_cids(user_id, product) proxy_id = body.proxy_id cids = await get_proxy_cids(user_id, product, proxy_id) if any(i not in cids for i in req_cids): raise BusinessException(message=f"你没有工厂{set(req_cids) - set(cids)}没有权限") else: raise BusinessException(message=f"暂时不支持其他产品") alarm_info_map = await alarm_count_info(req_cids, start, end, date_type) first_alarm, second_alarm, third_alarm = ( alarm_info_map["first_alarm"], alarm_info_map["second_alarm"], alarm_info_map["third_alarm"], ) return SecurityCountResp( first_alarm=first_alarm, second_alarm=second_alarm, third_alarm=third_alarm, level_detail=LevelCount( first_alarm_cnt=sum(first_alarm["value"]), second_alarm_cnt=sum(second_alarm["value"]), third_alarm_cnt=sum(third_alarm["value"]), ), ) @summary("获取工厂(可多个)日, 月份安全报警内容时间分布信息") async def post_alarm_content_distribution( request, body: SecurityCommonReq ) -> AlarmContentDistributionResp: """ 目前用于 1.安电U管理版本->报警记录 :param request: :param body: :return: """ product = body.product req_cids = body.cids start = body.start end = body.end date_type = body.date_type if product == Product.AndianUManage.value: user_id = jwt_user(request) # cids = await get_cids(user_id, product) proxy_id = body.proxy_id cids = await get_proxy_cids(user_id, product, proxy_id) if any(i not in cids for i in req_cids): raise BusinessException(message=f"你没有工厂{set(req_cids) - set(cids)}没有权限") else: raise BusinessException(message=f"暂时不支持其他产品") alarm_info_map = await alarm_content_time_distribution_pds(req_cids, start, end) temperature, residual_current, electric_param, electric_param_detail = ( alarm_info_map["temperature"], alarm_info_map["residual_current"], alarm_info_map["electric_param"], alarm_info_map["electric_param_detail"], ) return AlarmContentDistributionResp( temperature=temperature, residual_current=residual_current, electric_param=electric_param, content_detail=ContentCount( temperature_cnt=sum(temperature["value"]), residual_current_cnt=sum(residual_current["value"]), electric_param_cnt=sum(electric_param["value"]), ), electric_param_detail=ElectricParam(**electric_param_detail), ) @summary("获取工厂(可多个)报警统计概况信息") async def post_alarm_summary(request, body: SecurityCommonReq) -> AlarmSummaryResp: """ 目前用于 1.安电U管理版本->报警记录 :param request: :param body: :return: """ product = body.product req_cids = body.cids start = body.start end = body.end date_type = body.date_type if not req_cids: raise BusinessException(message=f"暂无工厂") if product == Product.AndianUManage.value: user_id = jwt_user(request) # cids = await get_cids(user_id, product) proxy_id = body.proxy_id cids = await get_proxy_cids(user_id, product, proxy_id) if any(i not in cids for i in req_cids): raise BusinessException(message=f"你没有工厂{set(req_cids) - set(cids)}没有权限") else: raise BusinessException(message=f"暂时不支持其他产品") summary_map = await alarm_summary(req_cids, start, end, date_type) return AlarmSummaryResp(**summary_map)