# product_auth.py
# (Web blame-view residue removed: file size, "Newer/Older" links,
# committer "lcn", line-number gutter.)
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import PRODUCT
from unify_api.modules.users.components.product_auth_cps import (
    ProductAuthResponse,
    ProductAuth,
    ProxyNameReq,
    ProxyNameResponse,
    ProxyItem,
)

from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth
# (Web blame-view residue removed: committer "ZZH", line-number gutter.)
from unify_api.modules.users.dao.current_user_info_dao import (
    load_compy_info, load_compy_logo
)
# (Web blame-view residue removed: committer "lcn", line-number gutter.)


def _no_product_auth(user_id, product_id):
    """Log and build the "user lacks this product" reply (code 5002)."""
    log.error(f"用户 user_id={user_id} 没有产品product_id={product_id}的权限")
    return [{"code": 5002, "data": {}, "message": "用户没有该产品权限",
             "srv_time": my_pendulum.now().to_datetime_string()}, 200]


@summary("用户权限信息")
async def get_product_auth_info(request):
    """Return the companies and extension modules the current user may use.

    The user is taken from the request's JWT.  The product is taken from
    the ``product`` query argument when present, otherwise it is looked up
    in ``PRODUCT`` by ``request.host``.

    Args:
        request: incoming request; ``args`` and ``host`` are read from it.

    Returns:
        * ``ProductAuthResponse().missing_jwt()`` when no user id can be
          obtained from the JWT;
        * a ``[payload, 200]`` list whose payload carries code 5002 when
          ``get_product_auth`` yields nothing, or no ``"product"`` entry;
        * otherwise a ``ProductAuthResponse`` whose ``cid_info`` holds one
          ``ProductAuth`` per entry of the ``"product"`` mapping.
    """
    # 1. Identify the caller from the JWT.
    user_id = jwt_user(request)
    if not user_id:
        return ProductAuthResponse().missing_jwt()

    # Two products share one domain, so an explicit ``product`` query
    # argument takes precedence over the host-based lookup.
    args = request.args
    product_id = args.get("product") if args else None
    if product_id:
        # Query-string values arrive as text, while the checks below compare
        # against integers; normalise so both lookup paths behave the same.
        # NOTE(review): assumes PRODUCT maps hosts to int ids - confirm.
        try:
            product_id = int(product_id)
        except (TypeError, ValueError):
            pass  # keep the raw value; the permission lookup decides
    else:
        product_id = PRODUCT.get(request.host)
    log.info(f"product_auth用户权限信息 {user_id}, product_id:{product_id}")

    # 2. Load the user's permissions for this product.
    res = await get_product_auth(user_id, product_id)
    res_pro = res.get("product") if res else None
    if not res_pro:
        # No record at all, or a record without a "product" entry.
        return _no_product_auth(user_id, product_id)

    # 3. Build one ProductAuth per entry of the "product" mapping.
    infos = []
    for cid, ext_modules in res_pro.items():
        company_info = await load_compy_info(cid)
        logo_info = await load_compy_logo(cid)
        if not company_info:
            # Without company data ``**company_info`` would raise; skip this
            # entry instead of failing the whole response.
            log.error(f"product_auth: no company info for key={cid}, "
                      f"user_id={user_id}, skipped")
            continue

        # "qkyn" is the fallback logo name; it is also used when the logo
        # row exists but its "logo" value is empty.
        logo_name = (logo_info.get("logo") if logo_info else None) or "qkyn"
        extra = {}
        # Only product 1 with a non-fallback logo gets an explicit logo URL.
        if product_id == 1 and logo_name != "qkyn":
            extra["logo_url"] = \
                f"{SETTING.download_img_url}/image/{logo_name}.png"
        infos.append(ProductAuth(ext_modules=ext_modules, **extra,
                                 **company_info))
    return ProductAuthResponse(cid_info=infos)


@summary("获取代理名称")
async def post_proxy_name(request, body: ProxyNameReq) -> ProxyNameResponse:
    """Return the proxies configured for the current user and product.

    The ``proxy`` column of ``user_product_auth`` is read for the JWT user
    and ``body.product``.  It is treated as a comma-separated list of proxy
    ids.  When the row is missing, or the value is empty or ``'0'``, the
    default proxy id ``1`` is used.

    Args:
        request: incoming request carrying the user's JWT.
        body: request body; only ``body.product`` is read.

    Returns:
        ``ProxyNameResponse().missing_jwt()`` when no user id can be
        obtained from the JWT, otherwise a ``ProxyNameResponse`` whose
        ``proxy_list`` holds one ``ProxyItem`` per matching ``proxy`` row.
    """
    product_id = body.product
    user_id = jwt_user(request)
    if not user_id:
        return ProxyNameResponse().missing_jwt()

    # The proxy is decided by the ``proxy`` column of user_product_auth.
    user_sql = "SELECT proxy from user_product_auth where " \
               "user_id = %s and product = %s"
    async with MysqlUtil() as conn:
        proxy_res = await conn.fetchone(sql=user_sql,
                                        args=(user_id, product_id))

    # No matching row yields an empty result; treat it like an empty value
    # instead of failing on the subscript.
    raw_proxy = proxy_res["proxy"] if proxy_res else None
    compact = raw_proxy.replace(" ", "") if raw_proxy else ""
    # Drop empty items so stray or trailing commas cannot reach the query.
    proxy_id = [item for item in compact.split(",") if item]
    if not proxy_id or compact == "0":
        # Default proxy; the id is 1 in both test and production.
        proxy_id = [1]

    name_sql = "SELECT * FROM proxy where proxy_id in %s"
    async with MysqlUtil() as conn:
        name_res = await conn.fetchall(sql=name_sql, args=(proxy_id,))
    proxy_list = [ProxyItem(proxy_id=row["proxy_id"],
                            proxy_full_name=row["fullname"],
                            proxy_short_name=row["shortname"])
                  for row in name_res]
    return ProxyNameResponse(proxy_list=proxy_list)