from pot_libs.common.components.responses import Success from pot_libs.logger import log from pot_libs.sanic_api import summary, description, examples from pot_libs.utils.exc_util import BusinessException from unify_api.modules.anshiu.components.scope_operations_cps import \ ScopeListReq, ScopeListResp, GetScopeConfigReq, GetScopeConfigResp, \ SetScopeConfigReq, SetScopeConfigResp, \ scope_list_req_example, ScopeDetailsResp, ScopeDetailRep, \ set_scope_config_example, InitScopeConfigReq, FlushScopeEsDataReq, \ ScopeListDownloadReq, ScopeListDownloadResp, ScopeDetailResp from unify_api.modules.anshiu.service.scope_operations_serv import \ search_scope_service, scope_detail_data, get_scope_config_serv, \ set_scope_config_serv, init_scope_config, flush_scope_es_data, \ scope_list_download_data, scope_detail_service @summary("识别记录-列表") @description("列表的时候正常传页码,下载的时候is_download=1") @examples(scope_list_req_example) async def post_scope_list(request, body: ScopeListReq) -> ScopeListResp: ''' 识别记录 ''' cid = body.cid page_size = body.page_size page_num = body.page_num start = body.start end = body.end scope_g = body.scope_g pids = body.pids is_download = body.is_download # 监测点选中全部 if pids and pids[0] == -1: pids = [] # 未选中监测点且未选中工厂提示错误 if len(pids) == 0 and not cid: log.error("post_scope_list_param_error pids:%s cid:%s" % (pids, cid)) return ScopeListResp.error_param() if page_num * page_size > 30000: log.error(f"post_scope_list_param_error page_too_large page_num" f":{page_num},page_size:{page_size}") raise BusinessException( message='只能查询前%s条数据,建议缩小查询范围' % 30000) # 下载(限制最大10000条) if is_download == 1: page_num, page_size = 1, 10000 # 替换scope_g if scope_g: scope_g = ['200ms' if i == '0.2s' else i for i in scope_g] rows, total = await search_scope_service(pids, cid, (page_num - 1) * page_size, page_size, start, end, scope_g) return ScopeListResp(rows=rows, total=total, page_num=page_num) @summary("2s录波数据下载") async def post_scope_list_download(request, body: ScopeListDownloadReq) -> ScopeListDownloadResp: start = body.start end = body.end pids = body.pids if len(pids) > 1: raise BusinessException(message="只允许下载一个监测点的数据") datas = await scope_list_download_data(pids, start, end) return ScopeListDownloadResp(rows=datas) @summary('数据统计-录波查询-录波详情') async def post_scope_detail(req, body: ScopeDetailRep) -> ScopeDetailResp: # 1,获取参数 event_id = body.id # 2,获取信息 data, u_list, i_list, residual_current, power = await scope_detail_service( event_id) # 3,返回信息 return ScopeDetailResp( point=data.get("name"), ctnum=data.get("ctnum"), check_dt=data.get("event_datetime"), contin_time=data.get("duration"), item=data.get("item"), scope_g=data.get("scope_g"), type=data.get("type"), v=u_list, location=data.get("location"), i=i_list, residual_current=residual_current, p=power, ) @summary("识别设置-获取配置信息") async def post_get_scope_config(request, body: GetScopeConfigReq) -> GetScopeConfigResp: ''' 识别设置-配置展示 ''' pid = body.pid try: data = await get_scope_config_serv(pid) except Exception as e: log.error('post_get_scope_config error:' + str(e)) return GetScopeConfigResp.server_error() return GetScopeConfigResp(pid=pid, rows=data) @summary("识别设置-设置配置信息") @examples(set_scope_config_example) async def post_set_scope_config(request, body: SetScopeConfigReq) -> SetScopeConfigResp: ''' 识别设置-配置设置 ''' pid = body.pid type = body.type scope_type = body.scope_type # 每一种类型需要的字段 fields = {'state': ['state'], 'i': ['imax', 'igap'], 'v': ['umax', 'umin', 'ugap'], 'residual_current': ['lcmax', 'lcgap'], 'power': ['pttlmax', 'pttlgap'], 'time': ['one_time', 'two_time', 'three_time']} args = {} for field in fields.get(type): args[field] = getattr(body, field) try: await set_scope_config_serv(pid, type, scope_type, args) except Exception as e: log.error('post_set_scope_config' + str(e)) return SetScopeConfigResp(success=0, message=str(e)) return SetScopeConfigResp(success=1, message='操作成功') @summary("识别设置-初始化设备配置信息(开发专用!)") async def post_init_scope_config(request, body: InitScopeConfigReq) -> Success: pids = body.pids # user_id = request.ctx.user_id # if user_id not in ['100653']: # return Success(success=0, message='无此操作权限') error_list = [] for pid in pids: try: await init_scope_config(pid) except Exception as e: log.error(f'{pid}:post_init_scope_config error {str(e)}') error_list.append(str(e)) continue if error_list: log.error(f"post_init_scope_config error:{','.join(error_list)}") else: log.info( f"post_init_scope_config success total_success_count:" f"{str(len(pids))}") return Success(success=1, message=','.join(error_list)) @summary("刷新es录波数据(开发专用!)") async def post_flush_scope_es_data(request, body: FlushScopeEsDataReq) -> \ Success: scope_type_list = body.scope_type_list start_time = body.start_time end_time = body.end_time try: for scope_g in scope_type_list: if scope_g not in ['200ms', '2s', '0.25ms']: continue await flush_scope_es_data(scope_g, start_time, end_time) except Exception as e: log.error(f'post_flush_scope_es_data error {str(e)}') return Success(success=0, message=str(e)) return Success(success=1, message='操作成功')