# device2cloud_cps.py 8.68 KB
from dataclasses import dataclass

import typing

from pot_libs.common.components.fields import Sid, MeterSn
from pot_libs.sanic_api import Model
from pot_libs.sanic_api.column import Str, Int, Opt, List, File, Dict
from pot_libs.sanic_api.openapi3 import Example
from unify_api.modules.device_cloud.components.common import Time


# One SOE event entry, as carried in `Soe.events`.
# `event_type` and `threshold` are declared as bare columns; the remaining
# columns are wrapped in `Opt(...)` (presumably marking them optional --
# confirm against pot_libs.sanic_api.column).
@dataclass
class Event(Model):
    event_type: str = Str('事件类型')  # event type
    threshold: int = Int('阈值')  # threshold
    duration: int = Opt(Int('持续时间'))  # duration
    phase: str = Opt(Str('相').eg('A'))  # phase
    line: str = Opt(Str('线').eg('cb'))  # line
    max: int = Opt(Int('最大值'))  # maximum value
    min: int = Opt(Int('最小值'))  # minimum value
    cur: int = Opt(Int('当前值'))  # current value
    # The field name is textual (the request examples in this module use
    # values such as 'temp2'), so it must be a Str column to agree with the
    # `str` annotation; it was previously declared as Int.
    field: str = Opt(Str('字段名'))  # field name


# SOE payload: a list column whose items are `Event` models.
@dataclass
class Soe(Model):
    events: typing.List[Event] = List('事件列表').items(Event)  # event list


# SOE message model: device header fields (type, topic, sid, time, version,
# meter serial number) plus a `Soe` payload in `data`.
@dataclass
class SoeRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('soe')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')  # version number
    meter_sn: MeterSn = MeterSn
    data: Soe = Soe

    @classmethod
    def examples(cls):
        """Return one named example message per event type.

        Each entry is first created as an ``Example`` holding a single event
        dict. The loop at the end then replaces ``Example.value`` with a full
        message envelope whose ``data.events`` list contains that event.

        Returns:
            dict: example name -> ``Example``.
        """
        # NOTE(review): power-factor examples use float `max` values (0.3,
        # 0.4) while `Event.max` is an Int column -- confirm which is right.
        examples = dict(
            overI=Example(dict(
                event_type='overI',
                threshold=100,
                duration=60,
                phase='A',
                max=110
            ), summary='过流'),
            overPR=Example(dict(
                event_type='overPR',
                threshold=80,
                cur=81
            ), summary='负载率过高'),
            overResidualCurrent=Example(dict(
                event_type='overResidualCurrent',
                threshold=300,
                cur=350
            ), summary='剩余电流越限'),
            overTemp=Example(dict(
                event_type='overTemp',
                threshold=55,
                cur=57,
                field='temp2'
            ), summary='温度越限'),
            overTempRange1min=Example(dict(
                event_type='overTempRange1min',
                threshold=5,
                cur=7,
                field='temp3'
            ), summary='温度变化1min越限'),
            overTHDI=Example(dict(
                event_type='overTHDI',
                threshold=55,
                # Numeric, like every other `cur` example (was the string
                # '58', which does not match the Int column).
                cur=58
            ), summary='电流总谐波有效值越限'),
            overTHDU=Example(dict(
                event_type='overTHDU',
                threshold=200,
                cur=210
            ), summary='电压总谐波畸变率越限'),
            overU=Example(dict(
                event_type='overU',
                # Numeric, like every other `threshold` example (was the
                # string '120', which does not match the Int column).
                threshold=120,
                duration=60,
                phase='B',
                max=123
            ), summary='过压'),
            unbalanceI=Example(dict(
                event_type='unbalanceI',
                threshold=30,
                duration=60,
                max=32
            ), summary='三相电流不平衡度'),
            unbalanceU=Example(dict(
                event_type='unbalanceU',
                threshold=30,
                duration=60,
                max=33
            ), summary='三相电压不平衡度'),
            underPhasePF=Example(dict(
                event_type='underPhasePF',
                threshold=0,
                duration=60,
                phase='C',
                max=0.3,
            ), summary='某相功率因数越下限'),
            underTotalPF=Example(dict(
                event_type='underTotalPF',
                threshold=0,
                duration=60,
                max=0.4
            ), summary='总功率因数越下限'),
            underU=Example(dict(
                event_type='underU',
                threshold=78,
                duration=60,
                min=75
            ), summary='欠压')
        )
        for example in examples.values():
            # Keep the bare event, then swap in the complete envelope.
            event = example.value
            example.value = {
                "eq_type": "iotmtA",
                # NOTE(review): the field example above is 'soe' but the
                # sample message uses "adio" -- confirm which is intended.
                "data_topic": "adio",
                "sid": "A1911000284",
                "time": "2020-04-21 11:39:56",
                "soft_hardware_version": "v1.0.8",
                "meter_sn": "A1911000284",
                "data": {
                    "events": [event]
                }
            }
        return examples


# SOE message model variant carrying `message_type`, `cid` and `mtid`
# alongside the header fields; `data` is a `Soe` payload.
@dataclass
class SoeReq(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('soe')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v2.0.1')  # version number
    message_type: str = Str('adio').eg('adio')
    cid: int = Int()
    mtid: int = Int()
    data: Soe = Soe

    @classmethod
    def examples(cls):
        """Return a sample ``data`` payload for SOE temperature alarms.

        The payload holds four ``overTemp`` events (temp1..temp4) followed by
        one ``overResidualCurrent`` event, all with ``lid`` 410.

        NOTE(review): unlike ``SoeRequest.examples`` this returns a bare
        ``{"events": [...]}`` dict rather than a mapping of ``Example``
        objects -- confirm that callers expect this shape.
        """
        def make_event(event_type, cur, threshold, field):
            # Key order mirrors the device message layout.
            return {
                "event_type": event_type,
                "cur": cur,
                "threshold": threshold,
                "field": field,
                "lid": 410,
            }

        temperature_readings = (
            ("temp1", 58.013336),
            ("temp2", 60.550724),
            ("temp3", 67.842102),
            ("temp4", 61.613964),
        )
        events = [
            make_event("overTemp", reading, 55, name)
            for name, reading in temperature_readings
        ]
        events.append(make_event(
            "overResidualCurrent", 352.308167, 100, "residual_current"))

        # Alternative sample: SOE leakage-current alarm message
        # data = {
        #     "events": [{
        #         "event_type": "overResidualCurrent",
        #         "cur": 30.576611,
        #         "threshold": 30,
        #         "field": "residual_current",
        #         "lid": 410
        #     }]
        # }
        return {"events": events}


# Adio payload with two free-form members, `aiao` and `dido`.
@dataclass
class Adio(Model):
    aiao: object
    dido: object

    @classmethod
    def example(cls):
        """Return a sample payload: empty ``aiao`` and one ``dido`` switch."""
        return dict(aiao={}, dido=dict(switch1='00'))


# Adio payload variant whose sample pairs each reading with a `lid_*` entry.
@dataclass
class AdioNew(Model):
    aiao: object
    dido: object

    @classmethod
    def example(cls):
        """Return a sample payload with ``dido`` first, then ``aiao``.

        Every value in the sample is accompanied by a ``lid_*`` key holding
        an integer id.
        """
        dido_sample = {
            "switch1": "11",
            "lid_switch1": 409,
        }
        aiao_sample = {
            "temp1": 18.244444,
            "lid_temp1": 410,
            "temp2": 18.355556,
            "lid_temp2": 411,
            "temp3": 18.444445,
            "lid_temp3": 412,
            "temp4": 18.066666,
            "lid_temp4": 413,
            "residual_current": 0.205019,
            "lid_rc": 414,
            "humidity": 0,
            "lid_humidity": 415,
        }
        return {"dido": dido_sample, "aiao": aiao_sample}


# Adio message model: device header fields plus an `Adio` payload in `data`.
@dataclass
class AdioRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('adio')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')  # version number
    meter_sn: MeterSn = MeterSn
    data: Adio = Adio


# Adio message model variant: same header fields as `AdioRequest` plus the
# integer columns `cid` and `mtid`; `data` is an `AdioNew` payload.
@dataclass
class AdioReq(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('adio')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v2.0.3')  # version number
    meter_sn: MeterSn = MeterSn
    cid: int = Int()
    mtid: int = Int()
    data: AdioNew = AdioNew


# Model with a single list column whose items are `File` columns.
@dataclass
class ScopeFiles(Model):
    scope_files: list = List().items(File())


# Per-meter specification; used as the value type of `HardWare.meter_spec`.
@dataclass
class MeterSpec(Model):
    current_level: int = Int('current_level').eg(300)
    has_board: int = Int('has_board').eg(1)


# Hardware status payload carried in `HardWareRequest.data`.
@dataclass
class HardWare(Model):
    iccid: str = Str('SIM卡id').eg('')  # SIM card id
    rssi: int = Int('信号强度').eg(-80)  # signal strength
    # NOTE(review): '挤压' literally means "squeezed"; the description is
    # probably a typo for '积压文件数' (backlog file count) -- confirm.
    datanum: int = Int('挤压文件数').eg(3)
    nandfree: int = Int('剩余空间').eg(254238720)  # remaining space
    # Dict column with `MeterSpec` values; the 'sid' description suggests the
    # keys are sids -- TODO confirm.
    meter_spec: dict = Dict('sid').values(MeterSpec)


# Hardware message model: device header fields plus a `HardWare` payload in
# `data`.
@dataclass
class HardWareRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('hardware')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')  # version number
    meter_sn: MeterSn = MeterSn
    data: HardWare = HardWare


# Scope (waveform recording) message model: device header fields plus the
# recording metadata and a string `data` column.
@dataclass
class ScopeRequest(Model):
    eq_type: str = Str('装置类型').eg('iotmtA')  # device type
    data_topic: str = Str('topic').eg('scope')
    sid: Sid = Sid
    time: Time = Time
    soft_hardware_version: str = Str('版本号').eg('v1.0.8')  # version number
    meter_sn: MeterSn = MeterSn
    faults: list = List('录波事件类型').items(Int())  # recording event types
    scope_count: int = Int('录波文件总数')  # total number of recording files
    scope_id: int = Int('录波id')  # recording id
    slice_id: int = Int('录波文件索引')  # recording file index
    data: str = Str('录波字符')  # recording characters