from dataclasses import dataclass from pot_libs.common.components.fields import Sid from pot_libs.sanic_api import Model from pot_libs.sanic_api.column import Opt, Str, Int, Enum, Dict from unify_api.modules.device_cloud.components.common import RequestId, Time @dataclass class Entry(Model): # switch_control field: str = Opt(Str('字段名').eg('temp1')) value: object = Opt(Str('值,类型不确定').eg(23)) # setting-electric electric: int = Opt(Int('电度清零').eg(0)) # setting-max_demand max_demand: int = Opt(Int('最大需量清零').eg(0)) # setting-ctnum ctnum: int = Opt(Int().eg(2)) # setting-meter_interval、harm_interval(设置保存间隔) meter_interval: int = Opt(Int().eg(60)) harm_interval: int = Opt(Int().eg(300)) # upgrade md5: str = Opt(Str('md5')) url: str = Opt(Str('url')) # hardware_module current_level: int = Opt(Int('电流等级').eg(5)) has_board: int = Opt(Int('是否有小板').eg(1)) target: str = Opt(Str('字段名').eg('temp1')) # 4g 模块远程固件升级 ftp_ip: str = Opt(Str('ftp_ip').eg('183.239.240.40')) ftp_port: int = Opt(Int('ftp_port').eg(6001)) ftp_file: str = Opt(Str('ftp_file').eg('6bto6D.pack')) @dataclass class CommandDetails(Model): command: str = Enum('请求').of( 'switch_control', 'restart', 'setting', 'upgrade', 'hardware_module', 'format_flash' ) params: Entry = Opt(Entry) @dataclass class CommandV2Details(Model): # 2.0增加装置拆除命令dismantle command: str = Enum('请求').of( 'switch_control', 'restart', 'setting', 'upgrade', 'hardware_module', 'format_flash', "dismantle", "4G_upgrade" ) params: Entry = Opt(Entry) @dataclass class CommandRequest(Model): sid: Sid request_id: RequestId time: Time method: str = Str().eg('command') data: CommandDetails = CommandDetails @dataclass class CommandV2Request(Model): sid: Sid method: str = Str().eg('command') data: CommandV2Details = CommandV2Details @dataclass class GetConfigRequest(Model): sid: Sid method: str = Str().eg('get') # key: str = Str().eg('soe') @dataclass class GcReq(Model): sid: Sid method: str = Str().eg('get-config') # electric, soe, adio, scope, appliance, mqtt_ip_port, ratio_ids, # data_key, rs485中的一个 key: str = Str().eg('electric') @dataclass class SoeConfig(Model): @classmethod def example(cls): return { 'adio': { 'temp1': { 'overTemp': { 'threshold': 52, 'enable': 1 } }, 'residual_current': { 'overResidualCurrent': { "threshold": 300, "enable": 1 } } }, 'electric': { 'A1911000277': { 'overU': { "threshold": 120, "duration": 60, "enable": 1 } } } } @dataclass class Params(Model): @classmethod def example(cls): return { 'adio': {'temp1': {'location_id': 125}}, 'electric': {'A1903000043': {'point_id': 25}} } @dataclass class ScopeConfig(Model): @classmethod def example(cls): return { 'electric': { 'A1903000043': { "coef": { "Kua": 2.851818, "Bua": 2043.24585, "Kub": 2.849818, "Bub": 2036.039917, "Kuc": 2.854545, "Buc": 2070.549316, "Kia": 230.264999, "Bia": 2067.127197, "Kib": 229.570007, "Bib": 2061.10498, "Kic": 230.429993, "Bic": 2058.934082 }, "threshold": { "umin": 187, "umax": 253, "urate": 0.22, "ugap": 119.25, "irate": 0.3, "igap": 341.880005, "imax": 5, "fmin": 49.5, "fmax": 50.5 } } } } @dataclass class TimeControl(Model): @classmethod def example(cls): return { 'switch_type': 3, "switch1": { "on_time": "06:00:00", "off_time": "18:00:00", "enable": 1, "work_at_night": 1 }, "switch2": { "on_time": "06:00:00", "off_time": "18:00:00", "enable": 1, "work_at_night": 1 } } @dataclass class ApplianceConfig(Model): @classmethod def example(cls): return { 'A': { 'feature_lib': { 'appliance_list': ["television", "ele_car_battery"], "television": [[], [], []], "ele_car_battery": [[], [], []] }, 'trans_threshold': 50, 'delay_time': 180 }, 'B': { 'feature_lib': { 'appliance_list': ["television", "ele_car_battery"], "television": [[], [], []], "ele_car_battery": [[], [], []] }, 'trans_threshold': 50, 'delay_time': 180 }, 'C': { 'feature_lib': { 'appliance_list': ["television", "ele_car_battery"], "television": [[], [], []], "ele_car_battery": [[], [], []] }, 'trans_threshold': 50, 'delay_time': 180 } } @dataclass class ElectricConfig(Model): # upload_freq字段单位为秒,表示多少秒上传一条数据 @classmethod def example(cls): return { "A1911000277": { "upload_freq": 60, "harm_count": 17 }, "A0000000002": { "upload_freq": 60, "harm_count": 17 } } @dataclass class SoeV2Config(Model): @classmethod def example(cls): return { "adio": { "temp1": { "overTemp": { "threshold": 52, "enable": 1 }, "overTempRange1min": { "threshold": 56, "enable": 1 } }, "residual_current": { "overResidualCurrent": { "threshold": 300, "enable": 1, "overRCnumberTHR": 3 } } }, "electric": { "A1911000277": { "overU": { "threshold": 120, "duration": 60, "enable": 1 }, "overI": { "threshold": 10, "duration": 80, "enable": 1 } }, "00000002": { "overU": { "threshold": 120, "duration": 60, "enable": 1 } } } } @dataclass class AdioConfig(Model): # upload_freq字段单位为秒,表示多少秒上传一条数据;switch_type默认为1 @classmethod def example(cls): return { "upload_freq": 60, "time_control": { "switch_type": 1, "switch1": { "on_time": "06:00:00", "off_time": "18:00:00", "enable": 1, "work_at_night": 1, }, "switch2": { "on_time": "06:00:00", "off_time": "18:00:00", "enable": 1, "work_at_night": 1, } } } @dataclass class ScopeV2Config(Model): @classmethod def example(cls): return { "A2004000206": { "threshold": { "scopeEnable": 1, "scoplimit": 150, "scopeEnableTime": 0, "harmNumber": 17, "umin": 67, "umax": 49, "urate": 0.22, "ugap": 50, "irate": 0.3, "igap": 20, "imax": 400, "fmin": 49.5, "fmax": 50.5 } }, "A1900000002": { "threshold": { "scopeEnable": 1, "scoplimit": 130, "scopeEnableTime": 1, "harmNumber": 13, "umin": 67, "umax": 49, "urate": 0.22, "ugap": 50, "irate": 0.3, "igap": 20, "imax": 400, "fmin": 49.5, "fmax": 50.5 } } } @dataclass class ApplianceV2Config(Model): # trans_threshold为突变阈值;delay_time为appliance报文上传延时时间,单位s @classmethod def example(cls): return { "A": { "feature_lib": { "appliance_list": [ "television", "ele_car_battery" ], "television": [ [], [], [] ], "ele_car_battery": [ [], [], [] ] }, "trans_threshold": 50, "delay_time": { "ele_car_battery": 600, "hot_fast": 300, "other": 210 } }, "B": { "feature_lib": { "appliance_list": [ "television", "ele_car_battery" ], "television": [ [], [], [] ], "ele_car_battery": [ [], [], [] ] }, "trans_threshold": 50, "delay_time": { "ele_car_battery": 600, "hot_fast": 300, "other": 210 } }, "C": { "feature_lib": { "appliance_list": [ "television", "ele_car_battery" ], "television": [ [], [], [] ], "ele_car_battery": [ [], [], [] ] }, "trans_threshold": 50, "delay_time": { "ele_car_battery": 600, "hot_fast": 300, "other": 210 } } } @dataclass class MqttIpPortConfig(Model): @classmethod def example(cls): return { "ip": "139.198.16.201", "port": "1883" } @dataclass class RatioIdsConfig(Model): @classmethod def example(cls): return { "point_ids": [ {"sid": "A109090002", "meter_sn": "000000001", "pid": 345, "mtid": 81, "cid": 10}, {"sid": "A109090002", "meter_sn": "000000002", "pid": 346, "mtid": 82, "cid": 10}, {"sid": "A109090002", "meter_sn": "000000003", "pid": 347, "mtid": 83, "cid": 10} ], "location_ids": [ {"sid": "A109090002", "field": "temp1", "lid": 123, "mtid": 84, "cid": 10}, {"sid": "A109090002", "field": "temp2", "lid": 124, "mtid": 85, "cid": 10}, {"sid": "A109090002", "field": "temp3", "lid": 125, "mtid": 86, "cid": 10}, {"sid": "A109090002", "field": "residual_current", "lid": 126, "mtid": 87, "cid": 10}, {"sid": "A109090002", "field": "switch1", "lid": 127, "mtid": 88, "cid": 10} ], "ratio": { "A1903000043": { "ctr": 1000, "ptr": 1, "vc": 400, "tc": 2500, "imax": 2500, "nom_freq": 50 }, "A1903000056": { "ctr": 1000, "ptr": 1, "vc": 400, "tc": 2500, "imax": 2500, "nom_freq": 50 } } } @dataclass class DataKeyConfig(Model): # data_key对所有point都生效 @classmethod def example(cls): return { "key_name": ["data", "data_v2"] } @dataclass class Rs485Config(Model): @classmethod def example(cls): return { "com1": { "work_mode": "poll", "protocol": "customize", "classify": [ { "meter_addr": [ "A2005000024" ], "parameters": [ 9600, 8, "N", 1 ], "timeout": 0.4 } ] } } @dataclass class TimeControlConfig(Model): @classmethod def example(cls): return { "on_time": "06:00:00", "off_time": "18:00:00", "enable": 1, "switch_type": 1 } @dataclass class ChangeSoe(Model): # todo soe: dict = Opt(SoeConfig) # scope: dict = Opt(ScopeConfig) params: dict = Opt(Params) time_control: dict = Opt(TimeControl) appliance: dict = Opt(ApplianceConfig) rs485: dict = Opt(Dict('r485配置')) @dataclass class ChangeSoeV2(Model): electric: dict = Opt(ElectricConfig) soe: dict = Opt(SoeV2Config) adio: dict = Opt(AdioConfig) scope: dict = Opt(ScopeV2Config) appliance: dict = Opt(ApplianceV2Config) mqtt_ip_port: dict = Opt(MqttIpPortConfig) ratio_ids: dict = Opt(RatioIdsConfig) data_key: dict = Opt(DataKeyConfig) rs485: dict = Opt(Rs485Config) # v1.0 time_control下发配置,迁移到v2.0 time_control: dict = Opt(TimeControlConfig) # v1.0 兼容旧版的params params: dict = Opt(Params) @dataclass class ChangeConfigRequest(Model): sid: Sid request_id: RequestId time: Time method: str = Str().eg('config') data: ChangeSoe = ChangeSoe @dataclass class ChangeConfigReqV2(Model): sid: Sid method: str = Str().eg('config') data: ChangeSoeV2 = ChangeSoeV2 @dataclass class GdsReq(Model): sid: Sid method: str = Str().eg('get-status')