from pot_libs.common.components.responses import Success, success_res from pot_libs.logger import log from pot_libs.mysql_util.mysql_util import MysqlUtil from pot_libs.qingstor_util.qs_client import QsClient from pot_libs.sanic_api import summary from pot_libs.settings import SETTING from pot_libs.utils.exc_util import BusinessException, ParamException from unify_api.constants import INSTALL_SHEET from unify_api.modules.zhiwei_u.components.install_sheet_cps import SlReq, \ SuResp, SlResp, SrReq, DrReq from unify_api.modules.zhiwei_u.config import PID_NAME from unify_api.modules.zhiwei_u.dao.install_sheet_dao import \ install_sheet_by_sheet_id, update_install_sheet, delete_install_sheet, \ install_sheet_by_cid, install_sheet_by_sheet_ids from unify_api.modules.zhiwei_u.procedures.common import zhiweiu_required, \ allowed_file from unify_api.modules.zhiwei_u.service.install_sheet_service import \ sheet_list_service from uuid import uuid4 from unify_api.utils.response_code import RET @summary('安装单管理-上传') @zhiweiu_required() async def post_sheet_upload(req) -> SuResp: # 1. 获取files和参数 file = req.files.get("file") file_name = file.name file_name_end = file_name.rsplit(".", 1)[1] file_body = file.body cid = int(req.form.get("cid")) belong = int(req.form.get("belong")) product_id = int(req.form.get("product_id")) p_name = PID_NAME[product_id] # 已存在同名文件不允许上传11 install_list = await install_sheet_by_cid(cid) install_list_f_name = [i["doc_name"] for i in install_list] if file_name in install_list_f_name: raise BusinessException(message=f"有重复文件:{file_name}") # 2. 上传到qingstor if file_body and allowed_file(file_name): qs_url = f"{INSTALL_SHEET}/{p_name}/{cid}/{uuid4()}.{file_name_end}" async with QsClient() as client: resp = await client.put_file(key=qs_url, data=file_body) log.info(f"upload to qingstore the resp: {resp}") # 3. 插入数据到mysql insert_sql = "INSERT into install_sheet " \ "(product_id, cid, belong, doc_name, doc_url) values %s" async with MysqlUtil(db=SETTING.mysql_zhiwei_u_db) as conn: res = await conn.execute(sql=insert_sql, args=( (product_id, cid, belong, file_name, qs_url),)) res_id = await conn.fetchone(sql="select @@IDENTITY as n_id") return SuResp( sheet_id=res_id["n_id"], doc_name=file_name, belong=belong ) else: raise BusinessException(message="文件格式错误") @summary('安装单管理-文件列表') async def post_sheet_list(req, body: SlReq) -> SlResp: cid = body.cid product_id = body.product_id return await sheet_list_service(cid) @summary('安装单管理-下载') async def get_sheet_download(req): args = req.args sheet_id = int(args.get("sheet_id")) install_dic = await install_sheet_by_sheet_id(sheet_id) if not install_dic: raise ParamException(message="文件不存在或已删除") key = install_dic["doc_url"] doc_name = install_dic["doc_name"] async with QsClient() as client: resp_bytes = await client.get_file(key=key) return resp_bytes, doc_name @summary('安装单管理-重命名') @zhiweiu_required() async def post_sheet_rename(req, body: SrReq): sheet_id = body.sheet_id new_name = body.new_name affect_num = await update_install_sheet(new_name, sheet_id) if affect_num == 1: return success_res(msg="操作成功") else: mes = f"操作失败id:{sheet_id}" return success_res(code=RET.op_fail, msg=mes) @summary('安装单管理-删除') @zhiweiu_required() async def post_sheet_delete(req, body: DrReq): sheet_id_list = body.sheet_id_list # 删除qingstore install_list = await install_sheet_by_sheet_ids(sheet_id_list) if len(install_list) != len(sheet_id_list): log.warning(f"删除的文件ids:{install_list},不存在或已删除") return success_res(code=RET.op_fail, msg=f"操作失败ids:{install_list}") for install_dic in install_list: doc_url = install_dic.get("doc_url") sheet_id = install_dic.get("id") async with QsClient() as client: res = await client.del_file(key=doc_url) # mysql逻辑删除 affect_num = await delete_install_sheet(sheet_id) if affect_num == 0: return success_res(code=RET.op_fail, msg="操作失败id:{sheet_id}") return success_res(msg="操作成功")