# Request/payload schema models for device-cloud messages: SOE (event) reports,
# ADIO readings, hardware status reports and scope (waveform) uploads.
from dataclasses import dataclass
import typing

from pot_libs.common.components.fields import Sid, MeterSn
from pot_libs.sanic_api import Model
from pot_libs.sanic_api.column import Str, Int, Opt, List, File, Dict
from pot_libs.sanic_api.openapi3 import Example
from unify_api.modules.device_cloud.components.common import Time


# A single event entry carried in an SOE message. Only ``event_type`` and
# ``threshold`` are mandatory; every other column is wrapped in ``Opt``.
@dataclass
class Event(Model):
    event_type: str = Str('事件类型')
    threshold: int = Int('阈值')
    duration: int = Opt(Int('持续时间'))
    phase: str = Opt(Str('相').eg('A'))
    line: str = Opt(Str('线').eg('cb'))
    max: int = Opt(Int('最大值'))
    min: int = Opt(Int('最小值'))
    cur: int = Opt(Int('当前值'))
    # The field name is a string (e.g. 'temp2', 'residual_current' in the
    # examples below), so it is declared as a Str column rather than Int.
    field: str = Opt(Str('字段名'))


# Payload of an SOE message: a list of Event entries.
@dataclass
class Soe(Model):
    events: typing.List[Event] = List('事件列表').items(Event)


# SOE request envelope identified by ``meter_sn``.
@dataclass
class SoeRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('soe')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')
    meter_sn: MeterSn = MeterSn
    data: Soe = Soe

    @classmethod
    def examples(cls):
        """Return a mapping of event-type name to ``Example`` request bodies.

        Each ``Example`` is first built around a bare event dict; its
        ``value`` is then replaced by a full request envelope whose
        ``data.events`` list contains that single event.
        """
        # NOTE(review): some samples use fractional ``max`` values (0.3, 0.4)
        # although ``Event.max`` is an Int column - confirm whether a float
        # column is intended.
        data = dict(
            overI=Example(
                dict(
                    event_type='overI',
                    threshold=100,
                    duration=60,
                    phase='A',
                    max=110
                ),
                summary='过流'),
            overPR=Example(
                dict(
                    event_type='overPR',
                    threshold=80,
                    cur=81
                ),
                summary='负载率过高'),
            overResidualCurrent=Example(
                dict(
                    event_type='overResidualCurrent',
                    threshold=300,
                    cur=350
                ),
                summary='剩余电流越限'),
            overTemp=Example(
                dict(
                    event_type='overTemp',
                    threshold=55,
                    cur=57,
                    field='temp2'
                ),
                summary='温度越限'),
            overTempRange1min=Example(
                dict(
                    event_type='overTempRange1min',
                    threshold=5,
                    cur=7,
                    field='temp3'
                ),
                summary='温度变化1min越限'),
            overTHDI=Example(
                dict(
                    event_type='overTHDI',
                    threshold=55,
                    # ``cur`` is an Int column; use an int like the siblings.
                    cur=58
                ),
                summary='电流总谐波有效值越限'),
            overTHDU=Example(
                dict(
                    event_type='overTHDU',
                    threshold=200,
                    cur=210
                ),
                summary='电压总谐波畸变率越限'),
            overU=Example(
                dict(
                    event_type='overU',
                    # ``threshold`` is an Int column; use an int, not '120'.
                    threshold=120,
                    duration=60,
                    phase='B',
                    max=123
                ),
                summary='过压'),
            unbalanceI=Example(
                dict(
                    event_type='unbalanceI',
                    threshold=30,
                    duration=60,
                    max=32
                ),
                summary='三相电流不平衡度'),
            unbalanceU=Example(
                dict(
                    event_type='unbalanceU',
                    threshold=30,
                    duration=60,
                    max=33
                ),
                summary='三相电压不平衡度'),
            underPhasePF=Example(
                dict(
                    event_type='underPhasePF',
                    threshold=0,
                    duration=60,
                    phase='C',
                    max=0.3,
                ),
                summary='某相功率因数越下限'),
            underTotalPF=Example(
                dict(
                    event_type='underTotalPF',
                    threshold=0,
                    duration=60,
                    max=0.4
                ),
                summary='总功率因数越下限'),
            underU=Example(
                dict(
                    event_type='underU',
                    threshold=78,
                    duration=60,
                    min=75
                ),
                summary='欠压')
        )
        # Wrap every bare event dict in a complete request envelope.
        # NOTE(review): the envelope uses data_topic "adio" while the column
        # example above is 'soe' - confirm which one is intended.
        for example in data.values():
            example.value = {
                "eq_type": "iotmtA",
                "data_topic": "adio",
                "sid": "A1911000284",
                "time": "2020-04-21 11:39:56",
                "soft_hardware_version": "v1.0.8",
                "meter_sn": "A1911000284",
                "data": {
                    "events": [example.value]
                }
            }
        return data


# SOE request envelope carrying ``message_type``, ``cid`` and ``mtid``
# instead of ``meter_sn``.
@dataclass
class SoeReq(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('soe')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v2.0.1')
    message_type: str = Str('adio').eg('adio')
    cid: int = Int()
    mtid: int = Int()
    data: Soe = Soe

    @classmethod
    def examples(cls):
        """Return a sample ``data`` payload holding a list of alarm events.

        NOTE(review): unlike ``SoeRequest.examples`` this returns a plain
        payload dict rather than a mapping of ``Example`` objects, and the
        events carry a ``lid`` key that ``Event`` does not declare - confirm
        both are intended.
        """
        # Sample SOE temperature-alarm message
        data = {
            "events": [
                {
                    "event_type": "overTemp",
                    "cur": 58.013336,
                    "threshold": 55,
                    "field": "temp1",
                    "lid": 410
                },
                {
                    "event_type": "overTemp",
                    "cur": 60.550724,
                    "threshold": 55,
                    "field": "temp2",
                    "lid": 410
                },
                {
                    "event_type": "overTemp",
                    "cur": 67.842102,
                    "threshold": 55,
                    "field": "temp3",
                    "lid": 410
                },
                {
                    "event_type": "overTemp",
                    "cur": 61.613964,
                    "threshold": 55,
                    "field": "temp4",
                    "lid": 410
                },
                {
                    "event_type": "overResidualCurrent",
                    "cur": 352.308167,
                    "threshold": 100,
                    "field": "residual_current",
                    "lid": 410
                }
            ]
        }
        # Sample SOE leakage-current alarm message
        # data = {
        #     "events": [{
        #         "event_type": "overResidualCurrent",
        #         "cur": 30.576611,
        #         "threshold": 30,
        #         "field": "residual_current",
        #         "lid": 410
        #     }]
        # }
        return data


# ADIO payload with two untyped members, ``aiao`` and ``dido``.
@dataclass
class Adio(Model):
    aiao: object
    dido: object

    @classmethod
    def example(cls):
        """Return a minimal sample payload: empty ``aiao``, one switch in ``dido``."""
        return {
            'aiao': {},
            'dido': {'switch1': '00'}
        }


# ADIO payload variant whose sample pairs each reading with a ``lid_*`` key.
@dataclass
class AdioNew(Model):
    aiao: object
    dido: object

    @classmethod
    def example(cls):
        """Return a sample payload with readings and their ``lid_*`` companions."""
        return {
            "dido": {
                "switch1": "11",
                "lid_switch1": 409
            },
            "aiao": {
                "temp1": 18.244444,
                "lid_temp1": 410,
                "temp2": 18.355556,
                "lid_temp2": 411,
                "temp3": 18.444445,
                "lid_temp3": 412,
                "temp4": 18.066666,
                "lid_temp4": 413,
                "residual_current": 0.205019,
                "lid_rc": 414,
                "humidity": 0,
                "lid_humidity": 415
            }
        }


# ADIO request envelope whose ``data`` is an ``Adio`` payload.
@dataclass
class AdioRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('adio')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')
    meter_sn: MeterSn = MeterSn
    data: Adio = Adio


# ADIO request envelope with ``cid``/``mtid`` whose ``data`` is ``AdioNew``.
@dataclass
class AdioReq(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('adio')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v2.0.3')
    meter_sn: MeterSn = MeterSn
    cid: int = Int()
    mtid: int = Int()
    data: AdioNew = AdioNew


# A list of uploaded files.
@dataclass
class ScopeFiles(Model):
    scope_files: list = List().items(File())


# Per-meter specification values used inside ``HardWare.meter_spec``.
@dataclass
class MeterSpec(Model):
    current_level: int = Int('current_level').eg(300)
    has_board: int = Int('has_board').eg(1)


# Hardware status payload; ``meter_spec`` maps keys to ``MeterSpec`` values.
@dataclass
class HardWare(Model):
    iccid: str = Str('SIM卡id').eg('')
    rssi: int = Int('信号强度').eg(-80)
    datanum: int = Int('挤压文件数').eg(3)
    nandfree: int = Int('剩余空间').eg(254238720)
    meter_spec: dict = Dict('sid').values(MeterSpec)


# Hardware report request envelope whose ``data`` is a ``HardWare`` payload.
@dataclass
class HardWareRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('hardware')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')
    meter_sn: MeterSn = MeterSn
    data: HardWare = HardWare


# Scope upload request: envelope fields plus fault list, slice bookkeeping
# (``scope_count``, ``scope_id``, ``slice_id``) and the string ``data``.
@dataclass
class ScopeRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')
    data_topic: str = Str('topic').eg('scope')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')
    meter_sn: MeterSn = MeterSn
    faults: list = List('录波事件类型').items(Int())
    scope_count: int = Int('录波文件总数')
    scope_id: int = Int('录波id')
    slice_id: int = Int('录波文件索引')
    data: str = Str('录波字符')