|
|
@@ -1,5 +1,14 @@
|
|
|
+import struct
|
|
|
+from abc import abstractmethod
|
|
|
+from typing import Tuple, Annotated
|
|
|
+
|
|
|
+from beartype.vale import Is
|
|
|
+
|
|
|
+from nbus_api.nbus_slave_device import NBusSlaveDevice
|
|
|
+from nbus_api.nbus_common_parser import NbusCommonParser
|
|
|
from nbus_hal.nbus_serial.serial_port import *
|
|
|
from nbus_types.nbus_address_type import NBusModuleAddress
|
|
|
+from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
|
|
|
|
|
|
|
|
|
@beartype
|
|
|
@@ -10,12 +19,119 @@ class NBusSlaveModule:
|
|
|
self.__module_addr = module_address
|
|
|
self.__params = {}
|
|
|
self.__devices = {}
|
|
|
+ self._map_param_get = lambda t, v: v
|
|
|
+ self._map_param_set = lambda t, v: v
|
|
|
+
|
|
|
+ def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
|
|
|
+ map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]):
|
|
|
+ self._map_param_get = map_param_get_cb
|
|
|
+ self._map_param_set = map_param_set_cb
|
|
|
+
|
|
|
+ def add_device(self, device: NBusSlaveDevice) -> None:
|
|
|
+ device.set_parent_module_address(self.__module_addr)
|
|
|
+ device.set_device_port(self.__port)
|
|
|
+ self.__devices[device.address] = device
|
|
|
+
|
|
|
+ def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice:
|
|
|
+ return self.__devices[device_address]
|
|
|
|
|
|
- def cmd_get_echo_module(self, message: bytearray) -> bool:
|
|
|
- _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_ECHO, message)
|
|
|
+
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Module Get Commands
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+
|
|
|
+ def cmd_get_echo(self, message: bytearray) -> bool:
|
|
|
+ """
|
|
|
+ Send Echo Command.
|
|
|
+ :param message: message to send
|
|
|
+ :return: status (True = echo, False = no echo)
|
|
|
+ """
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
|
|
|
return response == list(message)
|
|
|
|
|
|
+ def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
|
|
|
+ # get response
|
|
|
+ resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
|
|
|
+ bytearray([parameter.value]))
|
|
|
+
|
|
|
+ param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
|
|
|
+ param_val = self._map_param_get(param_id, param_val_raw)
|
|
|
+
|
|
|
+ self.__params[param_id] = param_val
|
|
|
+
|
|
|
+ return param_id, param_val
|
|
|
+
|
|
|
+ def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
|
|
|
+ resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
|
|
|
+ params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
|
|
|
+
|
|
|
+ params = []
|
|
|
+
|
|
|
+ for param_id, param_val_raw in params_raw:
|
|
|
+ param_val = self._map_param_get(param_id, param_val_raw)
|
|
|
+
|
|
|
+ params.append((param_id, param_val))
|
|
|
+ self.__params[param_id] = param_val
|
|
|
+
|
|
|
+ return params
|
|
|
+
|
|
|
+ def cmd_get_sensor_cnt(self) -> Annotated[int, Is[lambda value: 0 < value < 64]]:
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
|
|
|
+ return response[0]
|
|
|
+
|
|
|
+ def cmd_get_data(self):
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
|
|
|
+ begin_idx = 0
|
|
|
+ data = []
|
|
|
+
|
|
|
+ while begin_idx < resp_length:
|
|
|
+ device_id = response[begin_idx]
|
|
|
+
|
|
|
+ values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:])
|
|
|
+ data.append((device_id, values))
|
|
|
+ begin_idx += offset + 1
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+ def cmd_get_info(self, parameter: NBusInfoParam):
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO,
|
|
|
+ bytearray([parameter.value]))
|
|
|
+
|
|
|
+ if parameter == NBusInfoParam.INFO_FORMAT:
|
|
|
+ begin_idx = 0
|
|
|
+ formats = []
|
|
|
+
|
|
|
+ while begin_idx < resp_length:
|
|
|
+ device_id = response[begin_idx]
|
|
|
+ device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
|
|
|
+
|
|
|
+ self.__devices[device_id].data_format = device_format
|
|
|
+ formats.append((device_id, device_format))
|
|
|
+ begin_idx += 4
|
|
|
+
|
|
|
+ return formats
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ def cmd_get_sensor_type(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Module Set Commands
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+
|
|
|
+ def cmd_set_module_stop(self):
|
|
|
+ self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
|
|
|
|
|
|
+ def cmd_set_module_start(self):
|
|
|
+ self.__port.request_module(self.__module_addr,NBusCommand.CMD_SET_START, bytearray([]))
|
|
|
|
|
|
|
|
|
|