from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import PRODUCT
from unify_api.modules.users.components.product_auth_cps import (
    ProductAuthResponse,
    ProductAuth,
    ProxyNameReq,
    ProxyNameResponse,
    ProxyItem,
)
from sanic import response
from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth

# Logo name that means "no custom logo": no logo_url is attached for it.
_DEFAULT_LOGO = "qkyn"

# Proxy id used when a user has no explicit proxy configured.
_DEFAULT_PROXY_ID = 1

_COMPANY_SQL = (
    "select cid, shortname, fullname, industry, province "
    "from company where cid = %s"
)

_LOGO_SQL = (
    "SELECT p.logo from proxy p left join company_proxy_map c "
    "on p.proxy_id = c.proxy where c.cid = %s"
)


def _no_permission_response(user_id, product_id):
    """Log and build the "user has no permission for this product" reply.

    Returns the same ``[payload, status]`` list the handler has always
    returned for this case (business code 5002, HTTP status 200).
    """
    log.error(f"用户 user_id={user_id} 没有产品product_id={product_id}的权限")
    return [{"code": 5002, "data": {}, "message": "用户没有该产品权限",
             "srv_time": my_pendulum.now().to_datetime_string()}, 200]


@summary("用户权限信息")
async def get_product_auth_info(request):
    """Return the companies (and their enabled modules) the user may access.

    The user id is read from the JWT carried by ``request``.  The product is
    taken from the ``product`` query argument when present, otherwise it is
    looked up in ``PRODUCT`` by ``request.host``.

    Returns:
        ``ProductAuthResponse().missing_jwt()`` when no user id is found,
        the 5002 "no permission" reply when the user has no entry for the
        product, otherwise a ``ProductAuthResponse`` whose ``cid_info`` holds
        one ``ProductAuth`` per company.
    """
    args = request.args
    # 1. Resolve user_id from the JWT, then the product id.
    user_id = jwt_user(request)
    if not user_id:
        return ProductAuthResponse().missing_jwt()
    # Two products share one domain, so an explicit query argument wins
    # over the host-based lookup.
    if args and args.get("product"):
        product_id = args.get("product")
    else:
        product_id = PRODUCT.get(request.host)
    log.info(f"product_auth用户权限信息 {user_id}, product_id:{product_id}")

    # 2. Fetch the user's authorisation data for that product.
    res = await get_product_auth(user_id, product_id)
    if not res:
        return _no_permission_response(user_id, product_id)

    res_pro = res.get("product")
    if not res_pro:
        return _no_permission_response(user_id, product_id)

    # The query-string value is a str while the PRODUCT mapping presumably
    # yields an int, so compare in a type-tolerant way; a plain ``== 1``
    # never matched when the product came from the query string.
    custom_logo_allowed = str(product_id) == "1"

    # 3. Build one ProductAuth per authorised company.
    infos = []
    for cid, ext_modules in res_pro.items():
        async with MysqlUtil() as conn:
            company_info = await conn.fetchone(sql=_COMPANY_SQL,
                                               args=(int(cid),))
        if not company_info:
            # The auth data references a company with no row; skip it
            # instead of crashing on ``**None``.
            log.error(f"company cid={cid} not found, skipped "
                      f"(user_id={user_id}, product_id={product_id})")
            continue

        # Look up the logo of the proxy the company is mapped to.
        async with MysqlUtil() as conn:
            logo_info = await conn.fetchone(sql=_LOGO_SQL, args=(int(cid),))
        # A missing row or an empty/NULL logo both mean "default logo";
        # otherwise the URL would end in "None.png".
        logo_name = (logo_info.get("logo") if logo_info else None) \
            or _DEFAULT_LOGO

        if custom_logo_allowed and logo_name != _DEFAULT_LOGO:
            logo_url = f"{SETTING.download_img_url}/image/{logo_name}.png"
            pa = ProductAuth(ext_modules=ext_modules, logo_url=logo_url,
                             **company_info)
        else:
            pa = ProductAuth(ext_modules=ext_modules, **company_info)
        infos.append(pa)

    return ProductAuthResponse(cid_info=infos)


@summary("获取代理名称")
async def post_proxy_name(request, body: ProxyNameReq) -> ProxyNameResponse:
    """Return the proxies configured for the current user and product.

    The proxy ids come from the comma-separated ``proxy`` column of
    ``user_product_auth``.  When the user has no row, or the column is empty
    or ``'0'``, the default proxy (id 1) is used.

    Returns:
        ``ProxyNameResponse().missing_jwt()`` when no user id is found,
        otherwise a ``ProxyNameResponse`` listing id, full name and short
        name of each matching proxy.
    """
    product_id = body.product
    user_id = jwt_user(request)
    if not user_id:
        return ProxyNameResponse().missing_jwt()

    user_sql = "SELECT proxy from user_product_auth where " \
               "user_id = %s and product = %s"
    async with MysqlUtil() as conn:
        proxy_res = await conn.fetchone(sql=user_sql,
                                        args=(user_id, product_id))

    # proxy_res may be empty when the user has no row for this product.
    raw_proxy = proxy_res["proxy"] if proxy_res else None
    proxy_ids = []
    if raw_proxy and raw_proxy != '0':
        # Drop blank fragments left by stray commas or spaces.
        proxy_ids = [pid for pid in raw_proxy.replace(" ", "").split(",")
                     if pid]
    if not proxy_ids:
        # Default proxy; the same id is used in test and production.
        proxy_ids = [_DEFAULT_PROXY_ID]

    name_sql = "SELECT * FROM proxy where proxy_id in %s"
    async with MysqlUtil() as conn:
        name_res = await conn.fetchall(sql=name_sql, args=(proxy_ids,))

    proxy_list = [
        ProxyItem(
            proxy_id=row["proxy_id"],
            proxy_full_name=row["fullname"],
            proxy_short_name=row["shortname"],
        )
        for row in name_res
    ]
    return ProxyNameResponse(proxy_list=proxy_list)