import asyncio
import json
import logging
import sys
import os
import pendulum

base_path = os.path.abspath('../../../')
sys.path.append(base_path)
sys.path.append(f'{base_path}/pot_libs')
from pot_libs.mysql_util.mysql_util import MysqlUtil
from pot_libs.utils.pendulum_wrapper import my_pendulum

logger = logging.getLogger(__name__)

# !! Confirm PRO_INF is correct before running !!
# "table" is the source table that is read; "product" is both written to
# user_product_auth.product and used as the `product` filter on `company`.
PRO_INF = {
    "table": "user_portrait",
    "product": 1
}
# Value stored in user_product_auth.demo_cid for demo users.
DEMO_CID = "[42]"

# Format of the source `created_at` column once rendered with str().
_CREATED_AT_FORMAT = 'YYYY-MM-DD HH:mm:ss'

_SQL_ALL_CIDS = "SELECT cid from company where product = %s"
_SQL_PROXY_CIDS = (
    "SELECT c.cid from company c left join "
    "company_proxy_map cpm on cpm.cid=c.cid "
    "where cpm.proxy = %s and c.product = %s"
)
_SQL_INSERT = (
    "INSERT INTO user_product_auth "
    "(user_id, product, cid_ext, demo_cid, uassistant_auth, "
    "created_time) VALUES (%s, %s, %s, %s, %s, %s)"
)


def _cid_ext_json(cids):
    """Serialize company ids as the JSON object ``{"<cid>": [], ...}``."""
    return json.dumps({str(cid): [] for cid in cids})


async def _fetch_cids(sql, args):
    """Run a company query and return the ``cid`` value of every row."""
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql=sql, args=args)
    return [row["cid"] for row in rows]


def _created_timestamp(created_at):
    """Return ``created_at`` as a Unix timestamp, or the current time if unset."""
    if created_at:
        parsed = my_pendulum.from_format(str(created_at), _CREATED_AT_FORMAT)
        return parsed.int_timestamp
    return pendulum.now().int_timestamp


async def user_auth_asy():
    """Copy user records from ``PRO_INF["table"]`` into ``user_product_auth``.

    Every source row yields one insert. The company scope (``cid_ext``)
    depends on the row's ``type``:

    * ``0`` - super user: every company of the configured product.
    * ``1`` - companies mapped to the row's ``proxy_id`` through
      ``company_proxy_map``, restricted to the configured product.
    * ``2`` - only the row's own ``cid``.
    * anything else - demo user: ``cid_ext`` is NULL and ``demo_cid`` is
      set to ``DEMO_CID``.

    A failed insert is logged and skipped so one bad row does not abort the
    whole migration; a summary is logged at the end.
    """
    table = PRO_INF["table"]
    product = PRO_INF["product"]

    # 1. Read every row of the source table.
    async with MysqlUtil() as conn:
        users = await conn.fetchall(sql=f"SELECT * from {table}")

    # The super-user scope is identical for every type 0 row, so it is
    # queried at most once and then reused.
    all_cids_json = None
    failed = 0

    # 2. Build the new row for each user.
    for info in users:
        user_id = info.get("user_id")
        user_type = info.get("type")
        uassistant_auth = info.get("super_auth")
        created_time = _created_timestamp(info.get("created_at"))

        cid_ext = None
        demo_cid = None
        if user_type == 0:
            if all_cids_json is None:
                all_cids_json = _cid_ext_json(
                    await _fetch_cids(_SQL_ALL_CIDS, (product,)))
            cid_ext = all_cids_json
        elif user_type == 1:
            proxy_cids = await _fetch_cids(
                _SQL_PROXY_CIDS, (info.get("proxy_id"), product))
            cid_ext = _cid_ext_json(proxy_cids)
        elif user_type == 2:
            # NOTE(review): a row without `cid` produces the key "None",
            # exactly as before - confirm whether such rows can exist.
            cid_ext = _cid_ext_json([info.get("cid")])
        else:
            demo_cid = DEMO_CID

        # 3. Insert into user_product_auth.
        try:
            async with MysqlUtil() as conn:
                await conn.execute(_SQL_INSERT, args=(
                    user_id, product, cid_ext, demo_cid, uassistant_auth,
                    created_time))
        except Exception:
            # Best effort: record the failure and carry on with the next user.
            failed += 1
            logger.exception(
                "insert into user_product_auth failed for user_id=%s", user_id)

    logger.info("user_product_auth sync finished: %d rows read, %d failed",
                len(users), failed)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(user_auth_asy())