# -*- coding: utf-8 -*- import json import os import base64 import struct import re import numpy as np class LeakCurrentParser(object): """ 故障录波数据解析 path: json文件路径 return: self.ileak, 漏电流序列 self.ileak_rms, 漏电流序列有效值 """ def __init__(self, path, offset=2048, n_sample=80): self._path = path self.fault_type = None self.scope_count = 0 self._n_sample = n_sample self._T = 0.02 self._sample_f = 0.00025 self.time_range = None self.ileak = 0 self.ileak_rms = 0 self.offset = offset self.max_value = 4095 self.max_volt = 2.5 self.r = 10 self.max_i = 1000 / 200 self.k_A2mA = 1000 self.max_v_eq_max_i = self.max_volt / self.r * self.max_i self.k = self.max_v_eq_max_i * self.k_A2mA @staticmethod def _json_parser(file_name): '''just read json file''' data = None if os.path.exists(file_name): with open(file_name, 'r') as f: data = json.load(f) return data def _data_gen(self, filepath): filenames = sorted(os.listdir(filepath)) data_mix = [] time_range = [] for file in filenames: if re.search(r'.json$', file.lower()): fullpath = os.path.join(filepath, file) json_data = self._json_parser(fullpath) time_range.append(json_data['time']) self.scope_count = json_data['scope_count'] data_mix.extend(self._data_decode(json_data['data'])) self.fault_type = json_data["faultType"] self.time_range = time_range return data_mix @staticmethod def _data_decode(data): decoded_data = base64.b64decode(data) list_data = list(struct.unpack("<400H", decoded_data)) return list_data def _rms(self, data): len_data = len(data) data_pow2 = np.array(data) * np.array(data) rms_var = [] for i in range(len_data): if i >= self._n_sample: sum_T = np.sum(data_pow2[i - self._n_sample:i]) rms_var.append(np.sqrt(sum_T / self._n_sample)) else: sum_T = np.sum(data_pow2[0:self._n_sample]) rms_var.append(np.sqrt(sum_T / self._n_sample)) return rms_var def data_parser(self): data = self._data_gen(self._path) data_b = (np.array(data) - self.offset) / self.max_value * self.k self.ileak = list(data_b) self.ileak_rms = self._rms(self.ileak) def output(self): self.data_parser() def leakage_reg(ileak_rms=None,leak_hold=30,dula_dots=160): count=0 flag=0 for i in range(len(ileak_rms)): if ileak_rms[i]>leak_hold and flag==0: count=count+1 flag=1 elif ileak_rms[i]>leak_hold and flag==1: count = count + 1 else: flag=0 count=0 if count>=dula_dots: return "漏电流越限" return "漏电流未越限" if __name__ == '__main__': import matplotlib.pyplot as plt # import matplotlib;matplotlib.use("TkAgg") os.chdir(os.path.split(__file__)[0]) filename1 = r'/home/ubuntu/data/code/unify_api_1.5/unify_api/modules/zhiwei_u/fault_foreast' # sid = "A1911000284" # files=os.listdir(filename1) # for file in files: FaultRecordClass = LeakCurrentParser(filename1, offset=2048, n_sample=80) FaultRecordClass.output() # # plt.plot(FaultRecordClass.ileak, label='ileak') # plt.plot(FaultRecordClass.ileak_rms, label='ileak_rms') # plt.legend() # plt.show() # print(np.round(FaultRecordClass.ileak, 2)) # print(np.round(FaultRecordClass.ileak_rms, 2)) ileak= np.round(FaultRecordClass.ileak, 2) ileak_rms=np.round(FaultRecordClass.ileak_rms, 2) leakres=leakage_reg(ileak_rms=ileak_rms, leak_hold=30, dula_dots=80) # ileak_rms:漏电流有效值 类型: [] # ileak:漏电流实际值 type:[] # leak_hold:漏电流告警阈值 int # dula_dots:漏电流越限持续点数 int print(leakres)