import struct from abc import abstractmethod from typing import Tuple, Annotated from beartype.vale import Is from nbus_api.nbus_slave_device import NBusSlaveDevice from nbus_api.nbus_common_parser import NbusCommonParser from nbus_hal.nbus_serial.serial_port import * from nbus_types.nbus_address_type import NBusModuleAddress from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue from nbus_types.nbus_status_type import NBusStatusType from nbus_types.nbus_sensor_count_type import NBusSensorCount from nbus_types.nbus_info_type import NBusInfo from nbus_types.nbus_sensor_type import NBusSensorType @beartype class NBusSlaveModule: def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress): self.__port = serial_port self.__module_addr = module_address self.__params = {} self.__devices = {} self._map_param_get = lambda t, v: v self._map_param_set = lambda t, v: v def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int], map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]): self._map_param_get = map_param_get_cb self._map_param_set = map_param_set_cb def add_device(self, device: NBusSlaveDevice) -> None: device.set_parent_module_address(self.__module_addr) device.set_device_port(self.__port) self.__devices[device.address] = device def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice: return self.__devices[device_address] """ ================================================================================================================ Module Get Commands ================================================================================================================ """ def cmd_get_echo(self, message: bytearray) -> bool: """ Send Echo Command. :param message: message to send :return: status (True = echo, False = no echo) """ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message) return response == list(message) def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]: # get response resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([parameter.value])) param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0] param_val = self._map_param_get(param_id, param_val_raw) self.__params[param_id] = param_val return param_id, param_val def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]: resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([])) params_raw = NbusCommonParser.parameters_from_response(resp_len, response) for param_id, param_val_raw in params_raw: param_val = self._map_param_get(param_id, param_val_raw) self.__params[param_id] = param_val return self.__params.copy() def cmd_get_sensor_cnt(self) -> NBusSensorCount: _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([])) return NBusSensorCount(*response) def cmd_get_data(self): resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([])) begin_idx = 0 data = {} while begin_idx < resp_length: device_id = response[begin_idx] values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:]) data[device_id] = values begin_idx += offset + 1 return data def cmd_get_info(self): response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([])) name = str(response[1:9], "ascii") typ = str(response[9:12], "ascii") uuid = struct.unpack(" NBusStatusType: """ Send Find Command. :param enable: to start (True) / stop (False) finding :return: status """ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_FIND, bytearray([enable])) return NBusStatusType(response[0]) def cmd_set_module_stop(self): self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([])) def cmd_set_module_start(self): self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))