linkage_control_cps.py 2.88 KB
Newer Older
lcn's avatar
lcn committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
from dataclasses import dataclass

from pot_libs.common.components.fields import Cid
from pot_libs.sanic_api import Model
from pot_libs.sanic_api.column import List, Str, Int, Opt
from pot_libs.sanic_api.http_status import timeout_self, service_control, \
    hardware_timeout_opentoclose, hardware_timeout, hardware_control
from unify_api.utils.response_code import DbErr, JwtErr, UserErr, Success


@dataclass
class LinLocationReq(Model):
    cid: Cid


@dataclass
class LinkageLocation(Model):
    location_id: int = Int("监测点id").eg(281)
    name: str = Str("监测点").eg("1#进线_总开关")
    status: int = Int("开关状态, 0:开断, 1:闭合, 2:通信中断").eg(0)


@dataclass
class LinLocationResp(Model, DbErr, JwtErr, UserErr):
    authority: int = Int("是否有权限, 1:有权限, 0:没有权限").eg(1)
    location_info: list = List('联动控制状态监测').items(LinkageLocation)


@dataclass
class SwitchOpReq(Model):
    cid: Cid
    location_id: int = Int("监测点id").eg(281)
    switch: str = Str("操作开关: 01 开断->闭合").eg("01")


@dataclass
class SwitchOpResp(Model, DbErr, JwtErr, UserErr, Success):
    @classmethod
    @timeout_self
    def timeout_self_err(cls):
        """响应超时"""
        return '响应超时'

    @classmethod
    @service_control
    def service_err(cls):
        """响应超时"""
        return '响应超时'

    @classmethod
    @hardware_timeout_opentoclose
    def hardware_timeout_opentoclose_err(cls):
        """闭合失败"""
        return '闭合失败'

    @classmethod
    @hardware_timeout
    def hardware_timeout_err(cls):
        """响应超时"""
        return '响应超时'

    @classmethod
    @hardware_control
    def hardware_control_err(cls):
        """响应超时"""
        return '响应超时'


@dataclass
class SwitchRecordReq(Model):
    cid: Cid
    location_list: list = List("location_id列表").items(Int()).eg([281, 282])
    is_wechat: int = Int("1:小程序端, 0:web端").eg(1)
    page_num: int = Opt(Int("当前请求页").eg(1))
    page_record_num: int = Opt(Int("每页多少条数据").eg(20))
    start: str = Opt(Str("开始时间").eg("2020-07-29 00:00:00"))
    end: str = Opt(Str("结束时间").eg("2020-07-29 23:59:59"))


@dataclass
class SwitchRecord(Model):
    time: str = Str("时间").eg("2020-07-30 09:54:46")
    location: str = Str("监测点名称").eg("1#进线_总开关")
    type: str = Str("远程操作/本地操作, remote/local").eg("local")
    action_info: str = Str("动作情况, 01:开断→闭合").eg("01")
    user: str = Str("操作人").eg("张三")


@dataclass
class SwitchRecordResp(Model, DbErr, JwtErr, UserErr):
    total: int = Int("总条数").eg(60)
    rows: list = List('状态记录列表').items(SwitchRecord)


@dataclass
class SoesResp(Model):
    code: int = Int('自定义状态码').eg(200)
    message: str = Str("具体错误信息").eg("操作成功")