import pendulum
from unify_api.modules.common.dao.common_dao import (
    get_point_monitor_dao, get_location_monitor_dao
)
from uuid import uuid4

# Event fields that are copied into the SOE settings sent to the device.
_SOE_SETTING_KEYS = ('threshold', 'duration', 'enable')
# Monitor-record fields copied into the electric params (None values skipped).
_POINT_PARAM_KEYS = ("ctr", "ptr", "ctnum", "vc", "tc", "imax")


def _pick_event_settings(event):
    """Return the entries of *event* whose key is an SOE setting key."""
    return {k: v for k, v in event.items() if k in _SOE_SETTING_KEYS}


def _build_request(sid, method, data):
    """Wrap *data* in the request envelope with a fresh id and timestamp."""
    return dict(
        request_id=str(uuid4()),
        time=pendulum.now().to_datetime_string(),
        method=method,
        sid=sid,
        data=data,
    )


async def get_point_soe_config(event_data, with_params=False):
    """Build the SOE config of a point (electric channel).

    Args:
        event_data: Mapping that must contain ``event_type``; ``point_id``
            and the optional ``threshold`` / ``duration`` / ``enable``
            entries are read from it as well.
        with_params: When true, also add a ``params`` section holding the
            non-None meter parameters plus the ``point_id``.

    Returns:
        A ``(sid, config)`` tuple, where ``sid`` is taken from the monitor
        record looked up through ``get_point_monitor_dao``.

    Raises:
        KeyError: If ``event_data`` has no ``event_type`` entry.
    """
    point_id = event_data.get('point_id')
    event_settings = _pick_event_settings(event_data)

    # NOTE(review): assumes the DAO returns a mapping; confirm what it
    # returns when no monitor record matches the point.
    datas = await get_point_monitor_dao(point_id)
    sid = datas.get("sid")
    meter_sn = datas.get("meter_no")

    config = {
        'soe': {
            "electric": {meter_sn: {event_data['event_type']: event_settings}}
        }
    }
    if with_params:
        params = {
            k: v for k, v in datas.items()
            if k in _POINT_PARAM_KEYS and v is not None
        }
        params['point_id'] = point_id
        config['params'] = {'electric': {meter_sn: params}}
    return sid, config


async def get_location_soe_config(event, with_params=False):
    """Build the SOE config of a location (adio channel).

    Args:
        event: Mapping that must contain ``event_type``; ``location_id``
            and the optional ``threshold`` / ``duration`` / ``enable``
            entries are read from it as well.
        with_params: When true, also add a ``params`` section carrying the
            ``location_id``. Defaults to ``False`` to match
            ``get_point_soe_config``.

    Returns:
        A ``(sid, config)`` tuple, where ``sid`` is taken from the monitor
        record looked up through ``get_location_monitor_dao``.

    Raises:
        KeyError: If ``event`` has no ``event_type`` entry.
    """
    location_id = event.get('location_id')
    event_settings = _pick_event_settings(event)

    # NOTE(review): assumes the DAO returns a mapping; confirm what it
    # returns when no monitor record matches the location.
    datas = await get_location_monitor_dao(location_id)
    sid = datas.get("sid")
    field = datas.get("ad_field")

    event_config = {event['event_type']: event_settings}
    if field == "residual_current":
        # Residual-current events sit directly under "adio", without a
        # per-field level.
        config = {'soe': {"adio": event_config}}
    else:
        config = {'soe': {"adio": {field: event_config}}}

    if with_params:
        config['params'] = {'adio': {field: {'location_id': location_id}}}
    return sid, config


async def change_param_to_config(req_json, method):
    """Convert web request parameters into the message sent to the device.

    SOE method: config.

    Args:
        req_json: Request mapping; a non-None ``point_id`` and/or
            ``location_id`` selects which config is built. When both are
            present the location config is the one that is sent.
        method: Value stored in the ``method`` field of the request.

    Returns:
        The request dict (``request_id``, ``time``, ``method``, ``sid``,
        ``data``), or an empty dict when no truthy ``sid`` was resolved.
    """
    sid, config = None, None
    if req_json.get("point_id") is not None:
        sid, config = await get_point_soe_config(req_json, with_params=True)
    if req_json.get('location_id') is not None:
        sid, config = await get_location_soe_config(req_json,
                                                    with_params=True)
    if not sid:
        return {}
    return _build_request(sid, method, config)


async def switch_control_param_to_config(sid, field, switch,
                                         command="switch_control",
                                         method="command"):
    """Convert a linkage-control request into the message sent to the device.

    SOE method: command.

    Args:
        sid: Value stored in the ``sid`` field of the request.
        field: Value stored under ``params.field``.
        switch: Value stored under ``params.value``.
        command: Command name placed in the payload.
        method: Value stored in the ``method`` field of the request.

    Returns:
        The request dict (``request_id``, ``time``, ``method``, ``sid``,
        ``data``).
    """
    payload = {'command': command,
               'params': {'field': field, 'value': switch}}
    return _build_request(sid, method, payload)