from pot_libs.settings import SETTING from unify_api.utils.common_utils import make_tdengine_data_as_list from unify_api.utils.taos_new import get_td_table_name, get_td_engine_data from unify_api.utils.exc_util import BusinessException from pot_libs.mysql_util.mysql_util import MysqlUtil async def get_aiao_1min_dao(mtid, start_time, end_time): # 查1min温度漏电流 sid_data = await get_sid_by_mtid_dao(mtid) sid = sid_data['sid'].lower() su_table = "new_adio_stb" td_mt_table = get_td_table_name(su_table, mtid) url = f"{SETTING.stb_url}db_adio" # td的精度过高,采用 >= start and < end的形式查询 sql = f"select last_row( ts, temp1, temp2, temp3, temp4, residual_current) " \ f"from adio_stb where TBNAME = '{td_mt_table}' and " \ f"ts >= '{start_time}' AND ts <'{end_time}' " \ f"Interval(60s) order by ts asc" is_succ, results = await get_td_engine_data(url, sql) if not results: su_table = "old_adio_stb" td_mt_table = get_td_table_name(su_table, sid) sql = f"select last_row( ts, temp1, temp2, temp3, temp4, residual_current) " \ f"from adio_stb where TBNAME = '{td_mt_table}' and " \ f"ts >= '{start_time}' AND ts <'{end_time}' " \ f"Interval(60s) order by ts asc" is_succ, results = await get_td_engine_data(url, sql) if not is_succ: raise BusinessException() td_datas = make_tdengine_data_as_list(results) if not td_datas: return "" return td_datas async def get_sid_by_mtid_dao(mtid): # 查15min温度漏电流 sql = f""" SELECT sid FROM monitor WHERE mtid = {mtid} """ async with MysqlUtil() as conn: data = await conn.fetchone(sql, ) return data async def get_aiao_15min_dao(mtid, start_time, end_time): # 查15min温度漏电流 sql = f""" SELECT DATE_FORMAT(create_time,'%H:%i') create_time, value_avg, ad_field FROM location_15min_aiao WHERE create_time > "{start_time}" AND create_time < "{end_time}" AND mtid = {mtid} """ async with MysqlUtil() as conn: data = await conn.fetchall(sql, ) return data async def get_aiao_1day_dao(mtid, start_time, end_time): sql = f""" SELECT DATE_FORMAT(create_time,'%m-%d') create_time, value_avg, ad_field FROM location_1day_aiao WHERE create_time > "{start_time}" AND create_time < "{end_time}" AND mtid = {mtid} """ async with MysqlUtil() as conn: data = await conn.fetchall(sql, ) return data async def get_point_1min_chart_dao(mtid, ctnum, start_time, end_time): if ctnum == 2: stats_items = [ "pttl", "qttl", "uab", "ucb", "ia", "ic", ] else: stats_items = [ "pttl", "qttl", "ua", "ub", "uc", "ia", "ib", "ic", ] # 查1min温度漏电流 sid_data = await get_sid_by_mtid_dao(mtid) sid = sid_data['sid'].lower() su_table = "new_electric_stb" td_mt_table = get_td_table_name(su_table, mtid) url = f"{SETTING.stb_url}db_electric" # td的精度过高,采用 >= start and < end的形式查询 stats_items.insert(0, 'ts') sql = f"select last_row({','.join(stats_items)}) " \ f"from electric_stb where TBNAME = '{td_mt_table}' and " \ f"ts >= '{start_time}' AND ts <'{end_time}' " \ f"Interval(60s) order by ts asc" is_succ, results = await get_td_engine_data(url, sql) if not results: su_table = "old_electric_stb" td_mt_table = get_td_table_name(su_table, sid) sql = f"select last_row( {','.join(stats_items)}) " \ f"from electric_stb where TBNAME = '{td_mt_table}' and " \ f"ts >= '{start_time}' AND ts <'{end_time}' " \ f"Interval(60s) order by ts asc" is_succ, results = await get_td_engine_data(url, sql) if not is_succ: raise BusinessException() td_datas = make_tdengine_data_as_list(results) if not td_datas: return "" return td_datas async def get_point_15min_chart_dao(mtid, stats_items, date_start, date_end): # 查15min温度漏电流 sql = f""" SELECT DATE_FORMAT(create_time,'%H:%i') create_time, {','.join(stats_items)} FROM point_15min_electric WHERE create_time > "{date_start}" AND create_time < "{date_end}" AND mtid = {mtid} """ async with MysqlUtil() as conn: data = await conn.fetchall(sql, ) return data async def get_point_1day_chart_dao(mtid, stats_items, date_start, date_end): sql = f""" SELECT DATE_FORMAT(create_time,'%m-%d') create_time, {','.join(stats_items)} FROM point_1day_electric WHERE create_time > "{date_start}" AND create_time < "{date_end}" AND mtid = {mtid} """ async with MysqlUtil() as conn: data = await conn.fetchall(sql, ) return data async def get_mtid_by_pid_dao(pid): sql = f""" SELECT mtid FROM point WHERE pid = %s """ async with MysqlUtil() as conn: data = await conn.fetchone(sql, args=(pid,)) return data async def get_mtids_by_pids_dao(pid): sql = f""" SELECT mtid FROM point WHERE pid in %s """ async with MysqlUtil() as conn: data = await conn.fetchall(sql, args=(pid,)) data = [i['mtid'] for i in data if i['mtid']] return data async def get_point_monitor_dao(id_value, field="m.mtid"): sql = f"SELECT p.pid,m.meter_no,m.sid,p.ctr,p.ptr,p.ctnum,p.vc,p.tc," \ f"p.imax FROM `point` p INNER JOIN monitor m on m.mtid=p.mtid " \ f"where m.demolished = 0 and {field}=%s;" async with MysqlUtil() as conn: data = await conn.fetchone(sql, args=(id_value,)) return data async def electric_index_list_dao(table_name, point_id, start, end): """ 获取用电数据 :param table_name: :param point_id: :param start: :param end: :return: """ sql = f"SELECT * FROM {table_name} where pid=%s and create_time " \ f"BETWEEN %s and %s ORDER BY create_time desc" async with MysqlUtil() as conn: result = await conn.fetchall(sql, args=(point_id, start, end)) return result or [] async def electric_index_location_dao(table_name, mtid, start, end): """ 获取安全数据 :param table_name: :param mtid: :param start: :param end: :return: """ location_sql = "SELECT mtid,lid,item,ad_type FROM location WHERE mtid=%s" datas_sql = f"SELECT * from {table_name} where mtid = %s and create_time" \ f" BETWEEN '{start}' and '{end}' order by create_time desc" async with MysqlUtil() as conn: location_datas = await conn.fetchall(location_sql, args=(mtid,)) result = await conn.fetchall(datas_sql, args=(mtid,)) return location_datas or [], result or []