from pot_libs.sanic_api import summary, description, examples from pot_libs.logger import log from unify_api.modules.anshiu.service.equip_management_serv import \ equip_run_list_serv, equip_run_statistics_serv from unify_api.modules.anshiu.components.equip_management_cps import ( EquipManagementTotalReq, EquipManagementListReq, EquipManagementTotalResp, EquipManagementListResp, EquipRunReq, EquipRunListResp, EquipRunStatusReq, EquipRunStatusResp, EquipRunStatisticsReq, EquipRunStatisticsResp ) from unify_api.modules.anshiu.procedures.equip_management_pds import ( check_company_exist, equip_management_list, equip_management_total, equip_run_list, get_equip_run_status ) @summary("设备管理-获取设备统计信息") async def post_equip_management_total(request, body: EquipManagementTotalReq) -> EquipManagementTotalResp: company_id = body.cid total_info = await equip_management_total(company_id) return EquipManagementTotalResp( installed_number=total_info["installed_number"], start_time=total_info["start_time"]) @summary("设备管理-获取设备列表/下载") @description("列表的时候正常传页码,下载的时候is_download=1") async def post_equip_management_list(request, body: EquipManagementListReq) -> EquipManagementListResp: company_id = body.cid is_download = body.is_download page_size, page_num = body.page_size, body.page_num log.info( f"post_equip_management_list company_id={company_id},page_size={page_size},page_num={page_num}" ) # 下载(限制最大10000条) if is_download == 1: page_num, page_size = 1, 10000 company_exist = await check_company_exist(company_id) if not company_exist: return EquipManagementListResp.user_error() page_map = await equip_management_list(company_id, page_num, page_size) return EquipManagementListResp(rows=page_map["rows"], total=page_map["total"], page_num=page_num) @summary("运行统计-列表") @description("列表的时候正常传页码,下载的时候is_download=1") async def post_equip_run_list(request, body: EquipRunReq) -> EquipRunListResp: try: company_id = body.cid point_ids = body.point_ids page_size = body.page_size page_num = body.page_num start_time = body.start end_time = body.end is_download = body.is_download sort_field = body.sort_field if body.sort_field and is_download == 0 \ else 'start_time' sort_type = body.sort_type if body.sort_type and is_download == 0 \ else 'desc' # 未选中监测点直接返回 if len(point_ids) == 0: return EquipRunListResp(rows=[], total=0, page_num=page_num) # 监测点选中全部 if point_ids[0] == -1: point_ids = [] # 未选中监测点且未选中工厂提示错误 if len(point_ids) == 0 and not company_id: log.error( "post_scope_list_param_error pids:%s cid:%s" % ( point_ids, company_id)) return EquipRunListResp.error_param() # 下载(限制最大10000条) if is_download == 1: page_num, page_size = 1, 10000 except Exception as e: log.error("post_equip_run_list_error :" + e) return EquipRunListResp.param_error() # 从数据库中获取数据 rows, total = await equip_run_list_serv(company_id, point_ids, start_time, end_time, page_num, page_size, sort_field, sort_type) return EquipRunListResp(rows=rows, total=total, page_num=page_num) @summary("运行统计-获取统计数据") async def post_equip_run_statistics(request, body: EquipRunStatisticsReq) -> EquipRunStatisticsResp: ''' 获取运行统计数据 ''' cid = body.cid start_time = body.start end_time = body.end point_ids = body.point_ids # 未选中监测点直接返回 if len(point_ids) == 0: return EquipRunStatisticsResp(count=0,run_all_time='',run_avg_time='',run_max_time='') # 监测点选中全部 if point_ids[0] == -1: point_ids = [] # 未选中监测点且未选中工厂提示错误 if len(point_ids) == 0 and not cid: log.error("post_scope_list_param_error pids:%s cid:%s" % (point_ids, cid)) return EquipRunStatisticsResp.error_param() data = await equip_run_statistics_serv(cid, point_ids, start_time, end_time) return EquipRunStatisticsResp(count=data['total_count'], run_all_time=data['all_time'], run_avg_time=data['avg_time'], run_max_time=data['max_time']) @summary('获取检测点运行状态') async def post_equit_run_status(request, body: EquipRunStatusReq) -> EquipRunStatusResp: point_id = body.point_id if not point_id or point_id <= 0: log.error(f'get_equit_run_status_param_error point:{point_id}') return EquipRunStatusResp.param_error() status = await get_equip_run_status(point_id) return EquipRunStatusResp(is_run=status)