scope_operations_pds.py 3.61 KB
Newer Older
wang.wenrong's avatar
wang.wenrong committed
1
import asyncio
wang.wenrong's avatar
wang.wenrong committed
2
import datetime
wang.wenrong's avatar
wang.wenrong committed
3
import json
wang.wenrong's avatar
wang.wenrong committed
4
from pot_libs.qingstor_util.qs_client import QsClient
wang.wenrong's avatar
wang.wenrong committed
5 6
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
wang.wenrong's avatar
wang.wenrong committed
7 8 9 10
from unify_api.modules.anshiu.dao.fine_monitor_dao import get_mtid_by_pid_dao
from unify_api.modules.anshiu.dao.scope_operations_dao import \
    get_scope_url_by_pid
from unify_api.utils.log_utils import LOGGER
wang.wenrong's avatar
wang.wenrong committed
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60


async def get_scope_config_by_pid(pid):
    '''
    获取某个设备的录波配置
    '''
    async with MysqlUtil() as conn:
        sql = "select threshold from scope_config_record where pid = %s"
        result = await conn.fetchone(sql=sql, args=(pid,))
    return result.get('threshold', {}) if result else {}


async def set_scope_config_by_pid(pid, config):
    '''
        设置录波配置
    '''
    try:
        async with MysqlUtil() as conn:
            sql = "update scope_config_record set threshold=%s where pid=%s"
            result = await conn.execute(sql=sql, args=(config, pid))
    except Exception as e:
        log.error('set_scope_config_by_pid update error:' + str(e))
        return -1
    return result


async def add_scope_config_by_pid(pid, configs):
    '''
        增加录波配置
    '''
    try:
        async with MysqlUtil() as conn:
            sql = "INSERT INTO `power_iot`.`scope_config_record` (`pid`, " \
                  "`coef`, `threshold`, `vol_cur`) VALUES (%s, %s, %s, %s)"
            result = await conn.execute(sql=sql, args=(pid,
                                                       json.dumps(
                                                           configs["coef"]),
                                                       json.dumps(configs[
                                                                      "threshold"]),
                                                       json.dumps(configs)))
    except Exception as e:
        log.error('add_scope_config_by_pid add error:' + str(e))
        return -1
    return result


async def get_scope_list_by_pid(pids, start_dt, end_dt, scope_g="2s"):
    """
        ES查询2s录波记录数据
    """
wang.wenrong's avatar
wang.wenrong committed
61 62 63 64 65 66 67
    mtid = await get_mtid_by_pid_dao(pids)
    mtid = mtid.get('mtid')
    # 获取录波曲线数据
    scope_url_data = await get_scope_url_by_pid(mtid, start_dt, end_dt)
    scope_list = []
    starttime_wm = datetime.datetime.now()
    print(f'起始时间:{starttime_wm}')
wang.wenrong's avatar
wang.wenrong committed
68
    task_list = []
wang.wenrong's avatar
wang.wenrong committed
69 70
    for scope_data in scope_url_data:
        try:
wang.wenrong's avatar
wang.wenrong committed
71 72
            url = scope_data.get("url")
            task_list.append(get_qs_datas(url))
wang.wenrong's avatar
wang.wenrong committed
73 74 75
        except Exception as e:
            LOGGER.error(f"scope_detail_service error message:{str(e)}")
            return []
wang.wenrong's avatar
wang.wenrong committed
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
    qs_dats = await asyncio.gather(*task_list)
    qs_data_map = {data[0]: data[1] for data in qs_dats}
    for scope_data in scope_url_data:
        url = scope_data.get("url")
        wave_data = qs_data_map[url]
        [scope_list.append({"datetime": scope_data['datetime'],
                            "ua": wave_data['ua'][i],
                            "ub": wave_data['ub'][i],
                            "uc": wave_data['uc'][i],
                            "ia": wave_data['ia'][i],
                            "ib": wave_data['ib'][i],
                            "ic": wave_data['ic'][i],
                            "lc": wave_data['lc'][i],
                            "pttl": wave_data['pttl'][i]}) for i in
         range(len(wave_data.get('ua')))]
wang.wenrong's avatar
wang.wenrong committed
91 92 93 94
    end_time_wm = datetime.datetime.now()
    print(f'结束时间:{end_time_wm}')

    return scope_list
wang.wenrong's avatar
wang.wenrong committed
95 96 97 98 99 100


async def get_qs_datas(url):
    async with QsClient() as qs:
        wave_data = await qs.get_object(url)
    return url, wave_data