import struct from abc import abstractmethod from typing import Tuple, Annotated from beartype.vale import Is from nbus_api.nbus_slave_device import NBusSlaveDevice from nbus_api.nbus_common_parser import NbusCommonParser from nbus_hal.nbus_serial.serial_port import * from nbus_types.nbus_address_type import NBusModuleAddress from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue @beartype class NBusSlaveModule: def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress): self.__port = serial_port self.__module_addr = module_address self.__params = {} self.__devices = {} self._map_param_get = lambda t, v: v self._map_param_set = lambda t, v: v def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int], map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]): self._map_param_get = map_param_get_cb self._map_param_set = map_param_set_cb def add_device(self, device: NBusSlaveDevice) -> None: device.set_parent_module_address(self.__module_addr) device.set_device_port(self.__port) self.__devices[device.address] = device def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice: return self.__devices[device_address] """ ================================================================================================================ Module Get Commands ================================================================================================================ """ def cmd_get_echo(self, message: bytearray) -> bool: """ Send Echo Command. :param message: message to send :return: status (True = echo, False = no echo) """ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message) return response == list(message) def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]: # get response resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([parameter.value])) param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0] param_val = self._map_param_get(param_id, param_val_raw) self.__params[param_id] = param_val return param_id, param_val def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]: resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([])) params_raw = NbusCommonParser.parameters_from_response(resp_len, response) params = [] for param_id, param_val_raw in params_raw: param_val = self._map_param_get(param_id, param_val_raw) params.append((param_id, param_val)) self.__params[param_id] = param_val return params def cmd_get_sensor_cnt(self) -> Annotated[int, Is[lambda value: 0 < value < 64]]: _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([])) return response[0] def cmd_get_data(self): resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([])) begin_idx = 0 data = [] while begin_idx < resp_length: device_id = response[begin_idx] values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:]) data.append((device_id, values)) begin_idx += offset + 1 return data def cmd_get_info(self, parameter: NBusInfoParam): resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([parameter.value])) if parameter == NBusInfoParam.INFO_FORMAT: begin_idx = 0 formats = [] while begin_idx < resp_length: device_id = response[begin_idx] device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4]) self.__devices[device_id].data_format = device_format formats.append((device_id, device_format)) begin_idx += 4 return formats def cmd_get_sensor_type(self): pass """ ================================================================================================================ Module Set Commands ================================================================================================================ """ def cmd_set_module_stop(self): self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([])) def cmd_set_module_start(self): self.__port.request_module(self.__module_addr,NBusCommand.CMD_SET_START, bytearray([]))