import pandas as pd

from pot_libs.es_util.es_utils import EsUtil
from pot_libs.logger import log
from pot_libs.mysql_util.mysql_util import MysqlUtil
from unify_api import constants


async def get_location_by_ids(location_ids):
    """Fetch location rows and index them by ``lid``.

    :param location_ids: iterable of location ids
    :return: dict mapping each returned ``lid`` to its row (the query selects
        ``lid``, ``item`` and ``ad_type``); an empty dict when no ids are given
    """
    lids = tuple(location_ids)
    if not lids:
        # "IN ()" is not valid SQL, so answer without a round trip.
        return {}
    sql = "select lid,item,ad_type from location where lid in %s"
    async with MysqlUtil() as conn:
        rows = await conn.fetchall(sql, args=(lids,))
    return {row.get('lid'): row for row in rows or ()}


async def get_mtid_by_location_ids(location_ids):
    """Fetch monitor info joined with location for the given location ids.

    Only monitors with ``demolished = 0`` are returned.

    :param location_ids: iterable of location ids
    :return: rows with ``mtid``, ``name``, ``m_type``, ``lid``, ``item`` and
        ``ad_type``; an empty list when no ids are given
    """
    lids = tuple(location_ids)
    if not lids:
        return []
    # The id list is passed as a bound parameter: interpolating tuple(ids)
    # into the text renders "(5,)" for a single id, which is invalid SQL,
    # and would also let unescaped values reach the statement.
    sql = """
        SELECT m.mtid, m.`name`, m.m_type, l.lid, l.item, l.ad_type
        FROM monitor m
        INNER JOIN location l ON m.mtid = l.mtid
        WHERE l.lid in %s AND m.demolished = 0
    """
    async with MysqlUtil() as conn:
        return await conn.fetchall(sql, args=(lids,))


async def get_threshold_by_location(location_ids, type='overResidualCurrent',
                                    default_threshold=30):
    """Look up the configured threshold for the given locations and event type.

    :param location_ids: iterable of location ids
    :param type: value matched against ``soe_config_record.etype``
    :param default_threshold: returned when no configuration row is found
    :return: ``threshold`` of the first matching row, else ``default_threshold``
    """
    lids = tuple(location_ids)
    if not lids:
        return default_threshold
    sql = "select threshold from soe_config_record where lid in %s " \
          "and etype = %s limit 1 "
    async with MysqlUtil() as conn:
        settings = await conn.fetchall(sql, args=(lids, type))
    if settings:
        return settings[0]["threshold"]
    return default_threshold


async def get_es_aiao_15min_data(query_body):
    """Run ``query_body`` against the 15-minute location (aiao) index.

    :return: the ``aggregations`` part of the response, ``{}`` when absent
    """
    async with EsUtil() as es:
        response = await es.search_origin(
            body=query_body, index=constants.LOCATION_15MIN_AIAO)
    return response.get("aggregations", {})


async def get_es_point_15min_data(query_body):
    """Run ``query_body`` against the 15-minute point index.

    :return: the ``aggregations`` part of the response, ``{}`` when absent
    """
    async with EsUtil() as es:
        response = await es.search_origin(
            body=query_body, index=constants.POINT_15MIN_INDEX)
    return response.get("aggregations", {})


async def get_es_point_1min_data(query_body, start):
    """Run ``query_body`` against the monthly 1-minute point index.

    :param query_body: Elasticsearch query body
    :param start: string whose characters 0-3 are the year and 5-6 the month
        (e.g. ``"2021-03-..."``); it selects which monthly index to query
    :return: the ``hits.hits`` part of the response, ``{}`` when absent
    """
    # 1-minute data is sharded per month; the month has no leading zero.
    index_name = f"poweriot_point_1min_index_{start[:4]}_{int(start[5:7])}"
    async with EsUtil() as es:
        response = await es.search_origin(body=query_body, index=index_name)
    return response.get("hits", {}).get("hits", {})


def _slot_values(rows, time_key, value_key, slots):
    """Align ``rows[value_key]`` to ``slots``; slots without a row get ``""``."""
    by_time = {row.get(time_key): row.get(value_key) for row in rows}
    return [by_time.get(slot, "") for slot in slots]


async def get_aiao_1min_pds(slots, temp_res_data, res_curr_th):
    """Build 1-minute temperature and residual-current chart series.

    :param slots: ordered time labels, matched against each row's ``ts``
    :param temp_res_data: rows carrying ``ts``, ``temp1``..``temp4`` and
        ``residual_current``
    :param res_curr_th: residual-current threshold echoed in the result
    :return: ``(temperature_series, residual_series)``; every series is a dict
        with ``item`` and ``value_slots``
    """
    phases = (("A相", "temp1"), ("B相", "temp2"),
              ("C相", "temp3"), ("N相", "temp4"))
    temp = [
        {"item": label,
         "value_slots": _slot_values(temp_res_data, "ts", field, slots)}
        for label, field in phases
    ]
    res = [{
        "item": "漏电流",
        "threhold": res_curr_th,  # key spelling kept: consumers read it as-is
        "value_slots": _slot_values(temp_res_data, "ts", "residual_current",
                                    slots),
    }]
    return temp, res


async def get_aiao_data_pds(slots, temp_res_data, res_curr_th):
    """Build aggregated temperature and residual-current chart series.

    Rows are routed by ``ad_field``: ``temp1``..``temp4`` feed the four
    temperature series, every other row feeds the residual-current series.

    :param slots: ordered time labels, matched against ``create_time``
    :param temp_res_data: rows carrying ``ad_field``, ``create_time`` and
        ``value_avg``
    :param res_curr_th: residual-current threshold echoed in the result
    :return: ``(temperature_series, residual_series)``
    """
    temp_fields = ("temp1", "temp2", "temp3", "temp4")
    by_field = {field: {} for field in temp_fields}
    residual_by_time = {}
    for row in temp_res_data:
        field = row.get('ad_field')
        target = by_field[field] if field in temp_fields else residual_by_time
        target[row.get("create_time")] = row.get("value_avg")

    labels = ("A相", "B相", "C相", "N线")
    temp = [
        {"item": label,
         "value_slots": [by_field[field].get(slot, "") for slot in slots]}
        for label, field in zip(labels, temp_fields)
    ]
    res = [{
        "item": "漏电流",
        "threhold": res_curr_th,  # key spelling kept: consumers read it as-is
        "value_slots": [residual_by_time.get(slot, "") for slot in slots],
    }]
    return temp, res


def _point_chart(ctnum, slots, data, time_key, suffix):
    """Build the (current, voltage, power) series lists for a point chart.

    Each pair below is ``(item label, source field)``; ``suffix`` is appended
    to the source field (``""`` for raw data, ``"_mean"`` for aggregates).
    """
    if ctnum == 3:
        currents = (("ia", "ia"), ("ib", "ib"), ("ic", "ic"))
        voltages = (("ua", "ua"), ("ub", "ub"), ("uc", "uc"))
    else:
        currents = (("ia", "ia"), ("ic", "ic"))
        # NOTE(review): the line voltages are read from the "ua"/"ub" fields,
        # as the original code did; confirm the 2-CT records do not store
        # them under "uab"/"ucb" instead.
        voltages = (("uab", "ua"), ("ucb", "ub"))
    powers = (("pttl", "pttl"), ("qttl", "qttl"))

    def build(pairs):
        return [
            {"item": label,
             "value_slots": _slot_values(data, time_key, field + suffix,
                                         slots)}
            for label, field in pairs
        ]

    return build(currents), build(voltages), build(powers)


async def get_point_1min_chart_pds(ctnum, slots, data):
    """Build 1-minute current / voltage / power chart series.

    Every series is filled from its own field (``ia``, ``ib``, ``pttl``, ...)
    keyed by the row's ``ts``.

    :param ctnum: ``3`` selects the ia/ib/ic + ua/ub/uc layout, any other
        value the ia/ic + uab/ucb layout
    :param slots: ordered time labels, matched against ``ts``
    :param data: rows of raw point data
    :return: ``(i, v, power)`` lists of ``{"item", "value_slots"}`` dicts
    """
    return _point_chart(ctnum, slots, data, "ts", "")


async def get_point_data_chart_pds(ctnum, slots, data):
    """Build aggregated current / voltage / power chart series.

    Same layout as :func:`get_point_1min_chart_pds`, but rows are keyed by
    ``create_time`` and values come from the ``*_mean`` fields.

    :param ctnum: ``3`` selects the ia/ib/ic + ua/ub/uc layout, any other
        value the ia/ic + uab/ucb layout
    :param slots: ordered time labels, matched against ``create_time``
    :param data: rows of aggregated point data
    :return: ``(i, v, power)`` lists of ``{"item", "value_slots"}`` dicts
    """
    return _point_chart(ctnum, slots, data, "create_time", "_mean")


GENERAL_PARAM_FIELD_2 = [
    "lf_mean", "lf_min", "lf_max",
    "pttl_mean", "pttl_min", "pttl_max",
    "qttl_mean", "qttl_min", "qttl_max",
    "costtl_mean", "costtl_min", "costtl_max",
    "uab_mean", "uab_min", "uab_max",
    "ucb_mean", "ucb_min", "ucb_max",
    "ia_mean", "ia_min", "ia_max",
    "ic_mean", "ic_min", "ic_max",
    "freq_mean", "freq_min", "freq_max"
]

# NOTE(review): the lf_* and freq_* entries are commented out in the original.
# The original line breaks were lost, so it could not be determined whether
# the costtl_* entries were part of that comment as well; they are kept
# active here - please confirm.
GENERAL_PARAM_FIELD_3 = [
    # "lf_mean", "lf_min", "lf_max", "freq_mean", "freq_min",
    # "freq_max",
    "costtl_mean", "costtl_min", "costtl_max",
    "pttl_mean", "pttl_min", "pttl_max",
    "qttl_mean", "qttl_min", "qttl_max",
    "ua_mean", "ua_min", "ua_max",
    "ub_mean", "ub_min", "ub_max",
    "uc_mean", "uc_min", "uc_max",
    "ia_mean", "ia_min", "ia_max",
    "ib_mean", "ib_min", "ib_max",
    "ic_mean", "ic_min", "ic_max",
]

ELECTRIC_QUALITY_FIELD_2 = [
    "ubl_mean", "ubl_min", "ubl_max",
    "ibl_mean", "ibl_min", "ibl_max",
    "thduab_mean", "thduab_min", "thduab_max",
    "thducb_mean", "thducb_min", "thducb_max",
    "thdia_mean", "thdia_min", "thdia_max",
    "thdic_mean", "thdic_min", "thdic_max",
    "uab_dev_mean", "uab_dev_min", "uab_dev_max",
    "freq_dev_mean", "freq_dev_min", "freq_dev_max",
]

ELECTRIC_QUALITY_FIELD_3 = [
    "ubl_mean", "ubl_min", "ubl_max",
    "ibl_mean", "ibl_min", "ibl_max",
    "thdua_mean", "thdua_min", "thdua_max",
    "thdub_mean", "thdub_min", "thdub_max",
    "thduc_mean", "thduc_min", "thduc_max",
    "thdia_mean", "thdia_min", "thdia_max",
    "thdib_mean", "thdib_min", "thdib_max",
    "thdic_mean", "thdic_min", "thdic_max",
    "ua_dev_mean", "ua_dev_min", "ua_dev_max",
    "freq_dev_mean", "freq_dev_min", "freq_dev_max"
]


def _extreme_with_time(frame, column, pick_max, ndigits=None):
    """Return ``(value, time)`` for the max (or min) of ``frame[column]``.

    The time is read from ``<column>_time`` on the row holding the extreme and
    rendered with ``str``; a missing/null time becomes ``""``. When the column
    holds no usable value both results are ``""``. ``ndigits`` optionally
    rounds the value.
    """
    series = frame[column]
    value = series.max() if pick_max else series.min()
    if pd.isna(value):
        return "", ""
    label = series.idxmax() if pick_max else series.idxmin()
    stamp = frame.loc[label].to_dict().get(f"{column}_time")
    stamp = "" if pd.isnull(stamp) else str(stamp)
    if ndigits is not None:
        value = round(value, ndigits)
    return value, stamp


def _rounded_mean(series):
    """Mean of ``series`` rounded to 2 decimals, or ``""`` when undefined."""
    mean = series.mean()
    return "" if pd.isna(mean) else round(mean, 2)


def _column_stats(frame, max_col, min_col, mean_col, ndigits=None):
    """Return ``(max, max_time, min, min_time, avg)`` for the given columns."""
    max_value, max_time = _extreme_with_time(frame, max_col, True, ndigits)
    min_value, min_time = _extreme_with_time(frame, min_col, False, ndigits)
    return max_value, max_time, min_value, min_time, _rounded_mean(
        frame[mean_col])


def _stat_fields(max_value, max_time, min_value, min_time, avg_value,
                 mtid=None):
    """Pack the five statistics into a dict, suffixing keys with ``_<mtid>``."""
    tail = f"_{mtid}" if mtid else ""
    return {
        f"max{tail}": max_value,
        f"max_time{tail}": max_time,
        f"min{tail}": min_value,
        f"min_time{tail}": min_time,
        f"avg{tail}": avg_value,
    }


def cal_electic_value(datas, index_fields, mtid=None):
    """Summarise electric indicators as max / min / average rows.

    :param datas: list of record dicts holding ``<name>_max``, ``<name>_min``,
        ``<name>_mean`` and the matching ``<name>_max_time`` /
        ``<name>_min_time`` values
    :param index_fields: field names such as ``"ia_mean"``; the part before
        the last underscore is the indicator name
    :param mtid: when truthy, result keys are suffixed with ``_<mtid>`` and
        the label key is ``index`` instead of ``item``
    :return: one dict per indicator, in first-seen order of ``index_fields``
    """
    rows = list(datas)
    df = pd.DataFrame(rows)
    # dict.fromkeys de-duplicates while keeping a stable order; a set made the
    # order of the returned rows vary between interpreter runs.
    names = list(dict.fromkeys(
        field.rsplit("_", 1)[0] for field in index_fields))

    indexes_list = []
    for item in names:
        if not rows:
            indexes_list.append(
                {"item": item, **_stat_fields("", "", "", "", "")})
            continue
        stats = _column_stats(
            df, f"{item}_max", f"{item}_min", f"{item}_mean")
        label_key = "index" if mtid else "item"
        indexes_list.append({label_key: item, **_stat_fields(*stats, mtid)})
    return indexes_list


def cal_aiao_value(location_datas, datas, mtid=None):
    """Summarise per-location safety readings as max / min / average rows.

    Locations without any record in ``datas`` are skipped, unless ``datas``
    is empty, in which case every location gets a blank row.

    :param location_datas: location settings, each with ``lid``, ``ad_type``
        and ``item``
    :param datas: list of record dicts holding ``lid``, ``value_max``,
        ``value_min``, ``value_avg``, ``value_max_time``, ``value_min_time``
    :param mtid: when truthy, result keys are suffixed with ``_<mtid>`` and
        the ``type`` key is omitted
    :return: list of summary dicts; numeric values are rounded to 2 decimals
    """
    location_map = {loca["lid"]: loca for loca in location_datas}
    rows = list(datas)
    df = pd.DataFrame(rows)

    indexes_list = []
    for lid, item in location_map.items():
        if item["ad_type"] == "residual_current":
            index = "漏电流"
        else:
            index = f"{item.get('item')}"

        if not rows:
            indexes_list.append({"type": item["ad_type"], "item": index,
                                 **_stat_fields("", "", "", "", "")})
            continue

        current_df = df.loc[df["lid"] == lid]
        if current_df.empty:
            continue
        stats = _column_stats(
            current_df, "value_max", "value_min", "value_avg", ndigits=2)
        if mtid:
            entry = {"item": index, **_stat_fields(*stats, mtid)}
        else:
            entry = {"type": item["ad_type"], "item": index,
                     **_stat_fields(*stats)}
        indexes_list.append(entry)
    return indexes_list


def cal_pt_value(datas, mtid=None):
    """Summarise temperature records as a single max / min / average row.

    :param datas: list of record dicts holding ``temp_max``, ``temp_min``,
        ``temp_mean``, ``temp_max_time`` and ``temp_min_time``
    :param mtid: when truthy, result keys are suffixed with ``_<mtid>``
    :return: one-element list with the summary dict; all values are ``""``
        when ``datas`` is empty
    """
    rows = list(datas)
    if not rows:
        return [{"index": "温度(℃)", **_stat_fields("", "", "", "", "")}]
    df = pd.DataFrame(rows)
    stats = _column_stats(df, "temp_max", "temp_min", "temp_mean")
    return [{"index": "温度(℃)", **_stat_fields(*stats, mtid)}]