from unify_api.modules.zhiwei_u import config
from dataclasses import fields
from unify_api.modules.zhiwei_u.dao.data_es_dao import \
    query_point_1min_index, query_location_1min_index
from unify_api.modules.zhiwei_u.dao.data_operations_dao import *
from unify_api.modules.zhiwei_u.dao.order_operations_dao import \
    select_cname_by_cid
from unify_api.modules.zhiwei_u.dao.warning_operations_dao import get_username
from unify_api.modules.electric.procedures.electric_util import get_wiring_type
from unify_api.modules.zhiwei_u.procedures.common import get_time
from unify_api.modules.zhiwei_u.components.data_operations_cps import \
    DataOperationSearchResp, U, CurrentHarmonic, I, PH, UDEV, TEMP, RESI, \
    VoltageHarmonicRate, CurrentHarmonicRate, MtidToInfoResp, ParamsResp
from unify_api.modules.zhiwei_u.service.scope_operations_service import \
    excl_download
from pot_libs.common.components.responses import Success

# Harmonic orders that get their own export column: 3, 5, 7, 9, 11, 13.
_ODD_HARMONICS = range(3, 15, 2)


def _slot_positions(slots):
    """Map every time slot to the index of its first occurrence in *slots*."""
    positions = {}
    for idx, slot in enumerate(slots):
        # setdefault keeps the first index, matching list.index semantics.
        positions.setdefault(slot, idx)
    return positions


def _from_series(cls, all_datas):
    """Instantiate dataclass *cls* from the series whose key is one of its
    field names; every other series is ignored."""
    wanted = {field.name for field in fields(cls)}
    return cls(**{k: v for k, v in all_datas.items() if k in wanted})


def _only_active_series(all_datas, names):
    """Return the one name in *names* whose series holds any truthy value.

    Returns None when a series is missing or empty, or when the number of
    series holding a truthy value is not exactly one.
    """
    series = [all_datas.get(name) for name in names]
    if not all(series):
        return None
    active = [name for name, values in zip(names, series) if any(values)]
    return active[0] if len(active) == 1 else None


def _harmonic_keys(kind, suffixes):
    """List ``thd<kind><suffix>`` followed by its ``hr<n><kind><suffix>``
    keys (n = 3..13, odd) for every suffix, in the given suffix order."""
    keys = []
    for suffix in suffixes:
        keys.append(f"thd{kind}{suffix}")
        keys.extend(f"hr{n}{kind}{suffix}" for n in _ODD_HARMONICS)
    return keys


def _add_harmonic_current_columns(dict_data, phases, all_datas):
    """Add the ``fdie`` export columns for each phase letter in *phases*.

    Each column is the product (via get_nums) of the ``fdi<phase>`` series
    with the matching ``thdi<phase>`` / ``hr<n>i<phase>`` series.
    """
    for phase in phases:
        base = f"fdi{phase}"
        dict_data[config.FILE_KEY[f"fdie{phase}"]] = \
            get_nums(base, f"thdi{phase}", all_datas)
        for n in _ODD_HARMONICS:
            dict_data[config.FILE_KEY[f"fd{n}i{phase}"]] = \
                get_nums(base, f"hr{n}i{phase}", all_datas)


async def mtid_to_info_service(mtid):
    """Return the basic information attached to *mtid*.

    Looks up the record for the mtid, then the company (by cid) and the
    point bound to the mtid. Returns a failure ``Success`` response when
    no record exists for the mtid, otherwise a ``MtidToInfoResp``.
    """
    data = await get_info_by_mtid(mtid)
    if not data:
        return Success(success=0, message="数据错误")
    cid = data[0]["cid"]
    company_data = await select_cname_by_cid(cid)
    point_data = await select_point_by_mtid(mtid)
    return MtidToInfoResp(
        cid=cid,
        sid=data[0]["sid"],
        prod_id=company_data["product"],
        product=config.PRODUCT[company_data["product"]],
        shortname=company_data["shortname"],
        pid=point_data["pid"],
        name=point_data["name"],
    )


async def sid_to_params_service(sid, pid):
    """Return the parameters that can be queried for a sid or a pid.

    Starts from a copy of ``config.PHYSICAL``. When exactly one mtid is
    found, temperature and/or residual-current entries are added based on
    the locations returned for that mtid.
    """
    if sid:
        data = await get_mtid_by_sid(sid)
    else:
        data = await get_mtids_by_pid(pid)
    datas = config.PHYSICAL.copy()
    # `data` may be empty/None when nothing is bound; only a single mtid
    # carries location-specific parameters.
    if data and len(data) == 1:
        location_id = await get_params_by_mtid(data[0].get("mtid"))
        if len(location_id) == 2:
            datas["temperature"] = "温度"
            datas["residual_current"] = "漏电流"
        elif len(location_id) == 1:
            location_type = location_id[0]["type"]
            if location_type == "temperature":
                datas["temperature"] = "温度"
            elif location_type == "residual_current":
                datas["residual_current"] = "漏电流"
    return ParamsResp(datas)


async def get_temp_data(pid, start, end, slots):
    """Return the four temperature series of a point aligned to *slots*.

    The result maps ``Atemp``/``Btemp``/``Ctemp``/``Ntemp`` to lists with
    one entry per slot; slots without a record keep ``''``. Records whose
    ``ts`` is not one of *slots* are ignored, as in the main series path
    of ``data_operation_search_service``.
    """
    mt_data = await get_mtid_by_pid(pid)
    size = len(slots)
    all_datas = {
        "Atemp": [''] * size,
        "Btemp": [''] * size,
        "Ctemp": [''] * size,
        "Ntemp": [''] * size,
    }
    datas = await query_location_1min_index(
        start, end, mt_data["mtid"],
        fields=["temp1", "temp2", "temp3", "temp4"])
    positions = _slot_positions(slots)
    for data in datas:
        index = positions.get(data["ts"])
        if index is None:
            continue
        all_datas["Atemp"][index] = data["temp1"]
        all_datas["Btemp"][index] = data["temp2"]
        all_datas["Ctemp"][index] = data["temp3"]
        all_datas["Ntemp"][index] = data["temp4"]
    return all_datas


async def data_operation_search_service(mtid, point, params, start, end,
                                        download=None, user_id=None):
    """Query the per-slot series for a point and return or export them.

    Args:
        mtid: meter id; used to resolve *point* when it is not given and
            passed to the point series query.
        point: point id (pid).
        params: list of requested parameter names.
        start, end: time range forwarded to the queries; ``start[:10]`` is
            used in the export file name.
        download: when truthy, return the export produced by
            ``excl_download`` instead of the JSON response.
        user_id: user requesting the download; the user's ``role`` must
            equal 2 for the export to be allowed.

    Returns:
        ``DataOperationSearchResp`` for a normal query, the result of
        ``excl_download`` for an authorised download, or a failure
        ``Success`` response when the download is not authorised.
    """
    slots = get_time()
    if not point and mtid:
        data = await get_pid_by_mtid(mtid)
        point = data["pid"]

    # Optional temperature / residual-current series.
    temp = {}
    if "temperature" in params:
        temperature = await get_temp_data(point, start, end, slots)
        temp = TEMP(Atemp=temperature["Atemp"], Btemp=temperature["Btemp"],
                    Ctemp=temperature["Ctemp"], Ntemp=temperature["Ntemp"])
    resi = {}
    if "residual_current" in params:
        mt_data = await get_mtid_by_pid(point)
        resi_data = await get_locationid_by_mtid(mt_data["mtid"],
                                                 "residual_current")
        # Configured residual-current threshold; fall back to 30 when no
        # location or no threshold record exists.
        threhold = None
        if resi_data:
            threhold = await get_residual_current_threhold(
                resi_data[0]["id"])
        residual_current_threhold = threhold["threshold"] if threhold else 30
        resi_values = [''] * len(slots)
        resi_datas = await query_location_1min_index(
            start, end, mt_data["mtid"], fields=["residual_current"])
        positions = _slot_positions(slots)
        for data in resi_datas:
            index = positions.get(data["ts"])
            if index is None:
                continue
            resi_values[index] = data["residual_current"]
        resi = RESI(threhold=residual_current_threhold,
                    value_slots=resi_values)

    ctnum, _ = await get_wiring_type(point)
    if ctnum not in [2, 3]:
        ctnum = 3

    # Voltage is always queried, whatever was requested, because the
    # single-phase detection below relies on it.
    words = []
    for p in list(params) + ["u"]:
        # Expand a parameter into its concrete fields for this wiring type.
        if p in config.PHYSICAL_MORE:
            words.extend(config.PHYSICAL_MORE[p][ctnum])
        else:
            words.append(p)
    # Drop repeated fields while keeping first-seen order.
    words = list(dict.fromkeys(words))
    all_datas = {word: [] for word in words}

    origin_datas = await query_point_1min_index(start, end, mtid, words)
    new_data = {d["ts"]: d for d in origin_datas}
    for slot in slots:
        record = new_data.get(slot)
        for name, series in all_datas.items():
            series.append(record.get(name, "") if record is not None else '')

    ph = {}
    voltage_harmonic = _from_series(VoltageHarmonicRate, all_datas)
    if "fdie" in params:
        current_harmonic = _from_series(CurrentHarmonic, all_datas)
    else:
        current_harmonic = _from_series(CurrentHarmonicRate, all_datas)

    # When only one voltage series carries data, report that specific
    # phase through sdu_u / sdu_i.
    sdu_i = None
    sdu_u = None
    if ctnum == 2:
        u = U(uab=all_datas.get("uab"), ucb=all_datas.get("ucb"))
        i = I(ia=all_datas.get("ia"), ic=all_datas.get("ic"))
        u_dev = UDEV(uab_dev=all_datas.get("uab_dev"),
                     ucb_dev=all_datas.get("ucb_dev"))
        active = _only_active_series(all_datas, ("uab", "ucb"))
    else:
        u = U(ua=all_datas.get("ua"), ub=all_datas.get("ub"),
              uc=all_datas.get("uc"))
        i = I(ia=all_datas.get("ia"), ib=all_datas.get("ib"),
              ic=all_datas.get("ic"))
        u_dev = UDEV(ua_dev=all_datas.get("ua_dev"),
                     ub_dev=all_datas.get("ub_dev"),
                     uc_dev=all_datas.get("uc_dev"))
        active = _only_active_series(all_datas, ("ua", "ub", "uc"))
        if active is None:
            ph = _from_series(PH, all_datas)
    if active is not None:
        sdu_u = active
        # "uab" -> "ia", "ucb" -> "ic", "ua" -> "ia", "ub" -> "ib", ...
        sdu_i = "i" + active[1]
    if "u" not in params:
        u = {}

    if download:
        # Only users whose role is 2 may export the data.
        download_auth = None
        if user_id:
            user_info = await get_username(user_id)
            download_auth = user_info.get("role", None) if user_info \
                else None
        if download_auth != 2:
            return Success(success=0, message="您的权限不足")

        point_info = await get_name_by_pid(point)
        table_name_list = [config.PHYSICAL.get(p) for p in params
                           if p not in ("temperature", "residual_current")]
        content_name = "/".join(table_name_list)
        dict_data = {"时间": slots}
        key = get_download_data(sdu_u, params, ctnum)
        if "temperature" in params:
            content_name += "/温度"
            dict_data.update({
                "A线温度(℃)": temp.Atemp,
                "B线温度(℃)": temp.Btemp,
                "C线温度(℃)": temp.Ctemp,
                "N线温度(℃)": temp.Ntemp,
            })
        if "residual_current" in params:
            content_name += "/漏电流"
            dict_data.update({"漏电流(mA)": resi.value_slots})
        dict_data.update(
            {config.FILE_KEY[k]: all_datas.get(k) for k in key})
        if "fdie" in params:
            if sdu_i and len(sdu_u) == 2:
                # Single active phase of a three-phase point.
                phases = sdu_i[-1]
            elif ctnum == 2:
                # Two-phase wiring only ever has phases a and c.
                phases = "ac"
            else:
                # Column order a, c, b is the established export layout.
                phases = "acb"
            _add_harmonic_current_columns(dict_data, phases, all_datas)
        table_name = "_".join([point_info["shortname"], point_info["name"],
                               start[:10], content_name])
        return await excl_download(dict_data, table_name, "sheet1")

    return DataOperationSearchResp(
        ctnum=ctnum,
        sdu_i=sdu_i,
        sdu_u=sdu_u,
        u=u,
        i=i,
        freq=all_datas.get("freq"),
        pttl=all_datas.get("pttl"),
        qttl=all_datas.get("qttl"),
        sttl=all_datas.get("sttl"),
        costtl=all_datas.get("costtl"),
        freq_dev=all_datas.get("freq_dev"),
        ubl=all_datas.get("ubl"),
        ibl=all_datas.get("ibl"),
        ph=ph,
        u_dev=u_dev,
        voltage_harmonic=voltage_harmonic,
        current_harmonic=current_harmonic,
        temperature=temp,
        residual_current=resi,
        time_slots=slots
    )


def get_download_data(sdu_u, params, ctnum=None):
    """Return the series keys to export for the requested *params*.

    Starts from the params present in ``config.PHYSICAL`` and replaces
    each aggregate parameter (``u``, ``i``, ``ph``, ``u_dev``, ``thdu``,
    ``thdi``) with its concrete per-phase keys, appended at the end.
    ``fdie`` is removed because its columns are computed separately.

    Args:
        sdu_u: name of the single active voltage series, or None.
        params: requested parameter names.
        ctnum: wiring type. When it is 2 the two-phase keys are used even
            if *sdu_u* is None. When omitted, the wiring is inferred from
            the length of *sdu_u* as before.
    """
    key = [i for i in params if i in config.PHYSICAL.keys()]
    if sdu_u and len(sdu_u) == 2:
        # Single phase: sdu_u is "ua" / "ub" / "uc".
        phase = sdu_u[-1]
        expansions = [
            ("u", [f"u{phase}"]),
            ("i", [f"i{phase}"]),
            ("u_dev", [f"u{phase}_dev"]),
            ("thdu", _harmonic_keys("u", [phase])),
            ("thdi", _harmonic_keys("i", [phase])),
            ("fdie", []),
        ]
    elif ctnum == 2 or (sdu_u and len(sdu_u) == 3):
        # Two-phase wiring.
        expansions = [
            ("u", ["uab", "ucb"]),
            ("i", ["ia", "ic"]),
            ("u_dev", ["uab_dev", "ucb_dev"]),
            ("thdu", _harmonic_keys("u", ["ab", "cb"])),
            ("thdi", _harmonic_keys("i", ["a", "c"])),
            ("fdie", []),
        ]
    else:
        # Three-phase wiring.
        expansions = [
            ("u", ["ua", "ub", "uc"]),
            ("i", ["ia", "ib", "ic"]),
            ("ph", ["phA", "phB", "phC"]),
            ("u_dev", ["ua_dev", "ub_dev", "uc_dev"]),
            ("thdu", _harmonic_keys("u", ["a", "b", "c"])),
            # Current harmonics are exported in a, c, b order.
            ("thdi", _harmonic_keys("i", ["a", "c", "b"])),
            ("fdie", []),
        ]
    for name, replacements in expansions:
        if name in params:
            key.remove(name)
            key.extend(replacements)
    return key


def get_nums(word1, word2, all_datas):
    """Multiply two series element-wise, rounding each product to 2 places.

    The result has one entry per element of ``all_datas[word1]``. An entry
    is ``""`` when either operand is ``""``, ``None`` or missing. A missing
    ``word1`` series yields an empty list.
    """
    base = all_datas.get(word1) or []
    factors = all_datas.get(word2) or []
    blank = ("", None)
    fd = []
    for index, num in enumerate(base):
        factor = factors[index] if index < len(factors) else ""
        if num in blank or factor in blank:
            fd.append("")
        else:
            fd.append(round(num * factor, 2))
    return fd