import struct

from nbus_api.nbus_sensor import NBusSensor
from nbus_api.nbus_common_parser import NbusCommonParser
# NOTE(review): this wildcard import is the only visible source of the names
# beartype, Callable, NBusSerialPort, NBusCommand and NBusSensorAddress that
# are used below; it is kept unchanged so those names stay in scope.
from nbus_hal.nbus_serial.serial_port import *
from nbus_types.nbus_address_type import NBusModuleAddress
from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
from nbus_types.nbus_status_type import NBusStatusType
from nbus_types.nbus_sensor_count_type import NBusSensorCount
from nbus_types.nbus_info_type import NBusInfo
from nbus_types.nbus_sensor_type import NBusSensorType


@beartype
class NBusSlaveModule:
    """
    Class representing nBus slave module.

    The module talks to the hardware through the serial port given at
    construction time, keeps the sensors registered with ``add_sensor`` and
    remembers the parameter values seen by the get/set parameter commands.
    """

    def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
        """
        Constructor.

        :param serial_port: serial port
        :param module_address: address of module
        """
        self.__port = serial_port
        self.__module_addr = module_address
        self.__params = {}   # parameter id -> value stored by the parameter commands
        self.__devices = {}  # sensor address -> sensor registered via add_sensor()

        # identity mappers until set_module_parameter_mappers() installs real ones
        self._map_param_get = lambda t, v: v
        self._map_param_set = lambda t, v: v

    # ==================================================================================================================
    # Module General Methods
    # ==================================================================================================================

    def set_module_parameter_mappers(self,
                                     map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
                                     map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]) -> None:
        """
        Set parameter mappers for module.

        The "get" mapper is applied to every parameter value parsed from a
        response, the "set" mapper to every value before it is sent.

        :param map_param_get_cb: callback for map param get
        :param map_param_set_cb: callback for map param set
        """
        self._map_param_get = map_param_get_cb
        self._map_param_set = map_param_set_cb

    def add_sensor(self, sensor: NBusSensor) -> None:
        """
        Add sensor to container.

        The sensor is bound to this module's address and serial port and is
        stored under its own address.

        :param sensor: nbus sensor
        """
        sensor.set_parent_module_address(self.__module_addr)
        sensor.set_device_port(self.__port)
        self.__devices[sensor.address] = sensor

    def get_sensor(self, sensor_address: NBusSensorAddress) -> NBusSensor:
        """
        Get module sensor.

        :param sensor_address: address of sensor
        :return: sensor
        :raises KeyError: if no sensor was added under that address
        """
        return self.__devices[sensor_address]

    def _get_ready_sensor(self, sensor_address) -> NBusSensor:
        """
        Look up a registered sensor and make sure it can encode/decode data.

        ``sensor_address`` is deliberately left unannotated: callers pass both
        keys of user supplied dicts and raw values taken from a response.

        :param sensor_address: key of the sensor in the container
        :return: sensor with data format and data parameters loaded
        :raises KeyError: if no sensor was added under that address
        :raises NBusErrorAPI: FORMAT_NOT_LOADED if the sensor has no data format,
            PARAMS_NOT_LOADED if its data parameters are not loaded
        """
        sensor = self.__devices[sensor_address]
        if sensor.data_format is None:
            raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
        if not sensor.data_parameters_loaded():
            raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
        return sensor

    # ==================================================================================================================
    # Module Get Commands
    # ==================================================================================================================

    def cmd_get_echo(self, message: bytearray) -> bool:
        """
        Get echo from module.

        :param message: message to send
        :return: status (True = echo, False = no echo)
        """
        # the first element of the response is skipped, the rest must equal the message
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
        return response == list(message)

    def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
        """
        Get single module parameter.

        The mapped value is also stored in the module's parameter container.

        :param parameter: parameter id
        :return: parameter value
        """
        # get response
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                         bytearray([parameter.value]))

        # parse parameter (only the first parsed entry is used)
        param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
        param_val = self._map_param_get(param_id, param_val_raw)

        # store parameter value
        self.__params[param_id] = param_val

        return param_val

    def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
        """
        Get all module parameters.

        :return: dict of parameter id and parameter value (a copy of the module's parameter container)
        """
        # get response (an empty payload is sent instead of a parameter id)
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                         bytearray([]))

        # parse and store parameters
        for param_id, param_val_raw in NbusCommonParser.parameters_from_response(resp_len, response):
            self.__params[param_id] = self._map_param_get(param_id, param_val_raw)

        return self.__params.copy()

    def cmd_get_sensor_cnt(self) -> NBusSensorCount:
        """
        Get sensor count.

        :return: count of read-only and read-write sensors
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT,
                                                  bytearray([]))
        return NBusSensorCount(*response)

    def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
        """
        Get data from all module sensors.

        :return: dict of device addresses and data values
        :raises KeyError: if the response names a sensor that was not added to this module
        :raises NBusErrorAPI: if a sensor's data format or data parameters are not loaded
        """
        # get data
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA,
                                                            bytearray([]))

        # parse data: each record starts with the device id; the parser reports how far to advance
        data = {}
        begin_idx = 0
        while begin_idx < resp_length:
            device_id = response[begin_idx]
            sensor = self._get_ready_sensor(device_id)

            values, offset = NbusCommonParser.data_from_response(sensor.data_format, response[begin_idx:])
            data[device_id] = sensor.map_data_get(values)
            begin_idx += offset + 1

        return data

    def cmd_get_info(self) -> NBusInfo:
        """
        Get module info.

        :return: module info
        :raises NotImplementedError: always, until the missing parsing code is restored (see FIXME)
        """
        response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))

        name = str(response[1:9], "ascii")
        typ = str(response[9:12], "ascii")

        # FIXME(review): the source text is truncated here. It breaks off in the
        # statement starting with `uuid = struct.unpack("` and everything up to
        # the return annotation of the following method is missing. The lost
        # statements are not guessed; restore them from version control.
        raise NotImplementedError("NBusSlaveModule.cmd_get_info: response parsing is missing from the source text")

    # NOTE(review): the `def` line of the following method is missing from the
    # source text (only its return annotation survived). The name is inferred
    # from NBusCommand.CMD_GET_FORMAT and the sibling method names - confirm.
    def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
        """
        Get format of all on-board sensors.

        Every parsed format is also assigned to the matching registered sensor.

        :return: dict of sensor addresses and sensor formats
        :raises KeyError: if the response names a sensor that was not added to this module
        """
        # get response
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
                                                            bytearray([]))

        # parse format: the response is consumed in 4-element records starting with the device id
        formats = {}
        for begin_idx in range(0, resp_length, 4):
            device_id = response[begin_idx]
            device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
            self.__devices[device_id].data_format = device_format
            formats[device_id] = device_format

        return formats

    def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
        """
        Get type of all on-board sensors.

        :return: dict of sensor addresses and sensor types
        """
        # get response
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
                                                            bytearray([]))

        # parse response: (address, type) pairs
        return {NBusSensorAddress(response[i]): NBusSensorType(response[i + 1])
                for i in range(0, resp_length - 1, 2)}

    # ==================================================================================================================
    # Module Set Commands
    # ==================================================================================================================

    def cmd_set_module_stop(self) -> NBusStatusType:
        """
        Stop automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_module_start(self) -> NBusStatusType:
        """
        Start automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
        """
        Set module parameter.

        :param param: parameter ID
        :param value: parameter value
        :return: status
        :raises NotImplementedError: always, until the missing request code is restored (see FIXME)
        """
        # create request packet
        param_id_raw = struct.pack("B", param.value)

        # FIXME(review): the source text is truncated here. It breaks off in the
        # statement starting with `param_val_raw = struct.pack("` and everything
        # up to the return annotation of the following method is missing. The
        # lost statements are not guessed; restore them from version control.
        raise NotImplementedError("NBusSlaveModule.cmd_set_param: request code is missing from the source text")

    # NOTE(review): the `def` line of the following method is missing from the
    # source text (only its return annotation, docstring and body survived).
    # The method name and the annotation of `params` are reconstructed - confirm
    # both against version control before relying on them.
    def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
            -> dict[NBusParameterID, NBusStatusType]:
        """
        Set multiple module parameters.

        The caller's dict is left untouched; the mapped values are built in a
        separate dict.

        :param params: parameters
        :return: dict of statuses
        """
        # scale parameters
        mapped = {p_id: self._map_param_set(p_id, value) for p_id, value in params.items()}

        # create request packet
        param_bytes = NbusCommonParser.parameters_to_request(mapped)

        # send request
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
                                                            bytearray(param_bytes))

        # parse statuses: (parameter id, status) pairs
        statuses = {}
        for i in range(0, resp_length - 1, 2):
            p_id = NBusParameterID(response[i])
            # convert before comparing so the check does not depend on the enum being int-based
            status = NBusStatusType(response[i + 1])

            if status == NBusStatusType.STATUS_SUCCESS:
                # if success, store param
                # NOTE(review): this stores the value produced by the "set" mapper,
                # while the get commands store values produced by the "get" mapper.
                self.__params[p_id] = mapped[p_id]

            statuses[p_id] = status

        return statuses

    def cmd_set_calibrate(self) -> NBusStatusType:
        """
        Send calibration command.

        :return: calibration status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_CALIBRATE,
                                                  bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
            -> dict[NBusSensorAddress, NBusStatusType]:
        """
        Set data to read-write sensors.

        :param data: dict of sensor addresses and the values to write
        :return: operation statuses
        :raises KeyError: if an address in ``data`` was not added to this module
        :raises NBusErrorAPI: if a sensor's data format or data parameters are not loaded
        """
        # create request packet: for every sensor its address followed by the encoded values
        request = []
        for addr, values in data.items():
            sensor = self._get_ready_sensor(addr)

            raw_data = sensor.map_data_set(values)
            request.append(addr)
            request.extend(NbusCommonParser.data_to_request(sensor.data_format, raw_data))

        # send request
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
                                                            bytearray(request))

        # return response statuses: (address, status) pairs
        return {NBusSensorAddress(response[i]): NBusStatusType(response[i + 1])
                for i in range(0, resp_length - 1, 2)}