product_auth.py 4.44 KB
Newer Older
lcn's avatar
lcn committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.sanic_api import summary
from pot_libs.settings import SETTING
from pot_libs.utils.pendulum_wrapper import my_pendulum
from unify_api.constants import PRODUCT
from unify_api.modules.users.components.product_auth_cps import (
    ProductAuthResponse,
    ProductAuth,
    ProxyNameReq,
    ProxyNameResponse,
    ProxyItem,
)
from sanic import response

from unify_api.modules.users.procedures.jwt_user import jwt_user
from unify_api.modules.users.procedures.user_product_auth import \
    get_product_auth


@summary("用户权限信息")
async def get_product_auth_info(request):
    """获取用户的工厂/权限信息"""
    args = request.args
    # 1.从jwt中获取user_id, 并且得到product_id
    user_id = jwt_user(request)
    if not user_id:
        return ProductAuthResponse().missing_jwt()
    # 安电U和U助手是同一个域名
    if args and args.get("product"):
        product_id = args.get("product")
    else:
        product_id = PRODUCT.get(request.host)
    log.info(f"product_auth用户权限信息 {user_id}, product_id:{product_id}")
    # 2.调用函数获取到用户信息
    res = await get_product_auth(user_id, product_id)
    if not res:
        log.error(f"用户 user_id={user_id} 没有产品product_id={product_id}的权限")
        return [{"code": 5002, "data": {}, "message": "用户没有该产品权限",
                 "srv_time": my_pendulum.now().to_datetime_string()}, 200]
    # 用户
    res_pro = res.get("product")
    if res_pro:
        # 3. 返回
        infos = []
        for key, value in res_pro.items():
            sql = (
                "select cid, shortname, fullname, industry, province "
                "from company where cid = %s"
            )
            async with MysqlUtil() as conn:
                company_info = await conn.fetchone(sql=sql, args=(int(key),))
            # 增加获取logo url逻辑
            logo_sql = (
                "SELECT p.logo from proxy p left join company_proxy_map c "
                "on p.proxy_id = c.proxy where c.cid = %s"
            )
            async with MysqlUtil() as conn:
                logo_info = await conn.fetchone(sql=logo_sql, args=(int(key),))
            logo_name = logo_info.get("logo") if logo_info else 'qkyn'
            if product_id == 2:
                pa = ProductAuth(ext_modules=value, **company_info)
            elif logo_name != "qkyn" and product_id == 1:
                logo_url = f"{SETTING.download_img_url}/image/{logo_name}.png"
                pa = ProductAuth(ext_modules=value, logo_url=logo_url,
                                 **company_info)
            else:
                pa = ProductAuth(ext_modules=value, **company_info)
            infos.append(pa)
        resp = ProductAuthResponse(cid_info=infos)
        return resp
    # 后续的代理用户
    else:
        log.error(f"用户 user_id={user_id} 没有产品product_id={product_id}的权限")
        return [{"code": 5002, "data": {}, "message": "用户没有该产品权限",
                 "srv_time": my_pendulum.now().to_datetime_string()}, 200]


@summary("获取代理名称")
async def post_proxy_name(request, body: ProxyNameReq) -> ProxyNameResponse:
    """获取用户的代理名称"""
    product_id = body.product
    user_id = jwt_user(request)
    if not user_id:
        return ProxyNameResponse().missing_jwt()
    # 管理版根据user_product_auth表proxy字段判断代理,如果proxy为空则为清科默认代理1
    user_sql = "SELECT proxy from user_product_auth where " \
               "user_id = %s and product = %s"
    async with MysqlUtil() as conn:
        proxy_res = await conn.fetchone(sql=user_sql,
                                        args=(user_id, product_id))
    # 默认代理,测试和生产都是1
    proxy_id = proxy_res["proxy"].replace(" ", "").split(",") \
        if proxy_res["proxy"] and proxy_res["proxy"] != '0' else [1]
    # if not proxy_id:
    #     # 默认代理,测试和生产都是1
    #     proxy_id = 1
    name_sql = "SELECT * FROM proxy where proxy_id in %s"
    async with MysqlUtil() as conn:
        name_res = await conn.fetchall(sql=name_sql, args=(proxy_id,))
    proxy_list = [ProxyItem(proxy_id=res["proxy_id"],
                            proxy_full_name=res["fullname"],
                            proxy_short_name=res["shortname"])
                  for res in name_res]
    return ProxyNameResponse(proxy_list=proxy_list)