import struct

from nbus_api.nbus_sensor import NBusSensor
from nbus_api.nbus_common_parser import NbusCommonParser
from nbus_hal.nbus_generic_port import *
from nbus_types.nbus_address_type import NBusModuleAddress
from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
from nbus_types.nbus_status_type import NBusStatusType
from nbus_types.nbus_sensor_count_type import NBusSensorCount
from nbus_types.nbus_info_type import NBusModuleInfo
from nbus_types.nbus_sensor_type import NBusSensorType


@beartype
class NBusSlaveModule:
    """
    Class representing nBus slave module.

    The module keeps two caches: the parameters read from / written to the
    module, and the sensors (devices) discovered by :meth:`init`.
    """

    def __init__(self, port: NBusPort, module_address: NBusModuleAddress):
        """
        Constructor.

        :param port: port used to send requests to the module
        :param module_address: address of module
        """
        self.__port = port
        self.__module_addr = module_address
        self.__params = {}    # parameter id -> last known parameter value
        self.__devices = {}   # sensor address -> NBusSensor

    # ==================================================================================================================
    # Module General Methods
    # ==================================================================================================================

    def init(self, load_format: bool) -> None:
        """
        Initialize the module from hardware.

        Queries the sensor types and creates one NBusSensor per reported
        sensor address.

        :param load_format: flag to fetch data format
        """
        for sen_address, sen_type in self.cmd_get_sensor_type().items():
            self.__devices[sen_address] = NBusSensor(self.__port, self.__module_addr, sen_address)
            self.__devices[sen_address].type = sen_type

        if load_format:
            self.cmd_get_format()

    def get_devices(self) -> dict[NBusSensorAddress, NBusSensor]:
        """
        Get module devices.

        :return: dictionary of connected devices (the internal dictionary, not a copy)
        """
        return self.__devices

    # ==================================================================================================================
    # Module Get Commands
    # ==================================================================================================================

    def cmd_get_echo(self, message: bytearray) -> bool:
        """
        Get echo from module.

        :param message: message to send
        :return: status (True = echo, False = no echo)
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
        return response == list(message)

    def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
        """
        Get single module parameter.

        The value is also stored in the local parameter cache.

        :param parameter: parameter id
        :return: parameter value
        """
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                         bytearray([parameter.value]))

        # only one parameter was requested, so only the first parsed pair is used
        param_id, param_val = NbusCommonParser.parameters_from_response(resp_len, response)[0]
        self.__params[param_id] = param_val

        return param_val

    def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
        """
        Get all module parameters.

        :return: dict of parameter id and parameter value (a copy of the local cache)
        """
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                         bytearray([]))

        # store every parsed (id, value) pair in the cache
        for param_id, param_val in NbusCommonParser.parameters_from_response(resp_len, response):
            self.__params[param_id] = param_val

        return self.__params.copy()

    def cmd_get_sensor_cnt(self) -> NBusSensorCount:
        """
        Get sensor count.

        :return: count of read-only and read-write sensors
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT,
                                                  bytearray([]))
        return NBusSensorCount(*response)

    def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
        """
        Get data from all module sensors.

        :return: dict of device addresses and data values
        :raises NBusErrorAPI: FORMAT_NOT_LOADED when a reported sensor has no data format loaded
        """
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA,
                                                            bytearray([]))

        data = {}
        begin_idx = 0
        while begin_idx < resp_length:
            device_id = response[begin_idx]
            data_format = self.__devices[device_id].data_format

            # values cannot be decoded without the sensor's data format
            if data_format is None:
                raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)

            values, offset = NbusCommonParser.data_from_response(data_format, response[begin_idx:])
            data[device_id] = values

            # skip the consumed bytes reported by the parser plus one more byte
            begin_idx += offset + 1

        return data

    def cmd_get_info(self) -> NBusModuleInfo:
        """
        Get module info.

        NOTE(review): the source of this method was truncated in the file as
        received: everything after the start of the UUID ``struct.unpack``
        call is missing, including the construction of the returned
        ``NBusModuleInfo``. The missing part is not guessed here; restore it
        from version control.

        :return: module info
        :raises NotImplementedError: until the truncated decoding code is restored
        """
        response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))

        name = str(response[1:9], "ascii")
        typ = str(response[9:12], "ascii")

        # Truncation point: the original continued with `uuid = struct.unpack(...)`.
        raise NotImplementedError(
            "NBusSlaveModule.cmd_get_info: decoding code was truncated in the source; restore it from version control"
        )

    def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
        """
        Get format of all on-board sensors.

        The parsed format is also stored on the matching device.

        :return: dict of sensor addresses and sensor formats
        """
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
                                                            bytearray([]))

        formats = {}
        # the response is consumed in 4-byte records; the first byte of a record is the device id
        for begin_idx in range(0, resp_length, 4):
            device_id = response[begin_idx]
            device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])

            self.__devices[device_id].data_format = device_format
            formats[device_id] = device_format

        return formats

    def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
        """
        Get type of all on-board sensors.

        :return: dict of sensor addresses and sensor types
        """
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
                                                            bytearray([]))

        # the response is a sequence of (address, type) byte pairs
        return {
            NBusSensorAddress(response[i]): NBusSensorType(response[i + 1])
            for i in range(0, resp_length - 1, 2)
        }

    # ==================================================================================================================
    # Module Set Commands
    # ==================================================================================================================

    def cmd_set_module_stop(self) -> NBusStatusType:
        """
        Stop automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_module_start(self) -> NBusStatusType:
        """
        Start automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
        """
        Set module parameter.

        NOTE(review): the original body was truncated in the file as received.
        It packed the parameter id with ``struct.pack("B", ...)`` and the value
        with a format string that is lost. This version delegates to
        :meth:`cmd_set_multi_params`, which issues the same CMD_SET_PARAM
        command; it presumably produces the same request for one parameter,
        but confirm against version control / the device protocol.

        :param param: parameter ID
        :param value: parameter value
        :return: status
        """
        return self.cmd_set_multi_params({param: value})[param]

    # NOTE(review): the `def` line of the next method was lost in the file as
    # received. The parameter name `params` is taken from its docstring and
    # body; the method name `cmd_set_multi_params` is a reconstruction and must
    # be checked against callers / version control.
    def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
            -> dict[NBusParameterID, NBusStatusType]:
        """
        Set multiple module parameters.

        Parameters acknowledged with STATUS_SUCCESS are stored in the local
        parameter cache.

        :param params: parameters
        :return: dict of statuses
        """
        param_bytes = NbusCommonParser.parameters_to_request(params)

        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
                                                            bytearray(param_bytes), long_answer=1.0)

        statuses = {}
        # the response is a sequence of (parameter id, status) byte pairs
        for i in range(0, resp_length - 1, 2):
            p_id = NBusParameterID(response[i])
            # convert before comparing so the check works for a plain Enum as well as an IntEnum
            status = NBusStatusType(response[i + 1])

            if status == NBusStatusType.STATUS_SUCCESS:
                self.__params[p_id] = params[p_id]  # if success, store param

            statuses[p_id] = status

        return statuses

    def cmd_set_calibrate(self) -> NBusStatusType:
        """
        Send calibration command.

        :return: calibration status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_CALIBRATE,
                                                  bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
            -> dict[NBusSensorAddress, NBusStatusType]:
        """
        Set data to read-write sensors.

        :param data: dict of sensor addresses and the values to write to each sensor
        :return: operation statuses
        :raises NBusErrorAPI: FORMAT_NOT_LOADED when an addressed sensor has no data format loaded
        """
        request = []
        for addr, raw_data in data.items():
            data_format = self.__devices[addr].data_format

            # values cannot be encoded without the sensor's data format
            if data_format is None:
                raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)

            # each entry is the sensor address followed by its encoded values
            request.append(addr)
            request.extend(NbusCommonParser.data_to_request(data_format, raw_data))

        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
                                                            bytearray(request))

        # the response is a sequence of (address, status) byte pairs
        return {
            NBusSensorAddress(response[i]): NBusStatusType(response[i + 1])
            for i in range(0, resp_length - 1, 2)
        }