import struct
from nbus_hal.nbus_generic_port import NBusPort
from nbus_types.nbus_command_type import NBusCommand
from nbus_types.nbus_data_fomat import *
from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
from nbus_types.nbus_parameter_type import *
from nbus_types.nbus_address_type import NBusSensorAddress, NBusModuleAddress
from nbus_types.nbus_sensor_type import NBusSensorType
from nbus_api.nbus_common_parser import NbusCommonParser
from nbus_types.nbus_status_type import NBusStatusType


# NOTE(review): `beartype` is not imported by name in this file; presumably it
# is re-exported by one of the wildcard imports above -- confirm.
@beartype
class NBusSensor:
    """
    Handle for a single nBus sensor addressed through a module.

    Every command is sent with ``NBusPort.request_sensor`` using the module
    address and sensor address given at construction. The object also caches
    the last known sensor type, data format and parameter values.
    """

    def __init__(self, port: NBusPort, module_address: NBusModuleAddress, address: NBusSensorAddress):
        """
        Constructor.

        :param port: NBusPort object used for all requests
        :param module_address: address of the NBusModule the sensor belongs to
        :param address: sensor address
        """
        self.__port = port
        self.__module_address = module_address
        self.__address = address

        # Filled lazily by cmd_get_sensor_type / cmd_get_format / parameter commands.
        self.__type = None
        self.__data_format = None
        self.__params = {}

    """
    ================================================================================================================
                                                Public Fields
    ================================================================================================================
    """

    @property
    def address(self):
        """
        Get sensor address.

        :return: sensor address
        """
        return self.__address

    @property
    def data_format(self):
        """
        Get data format.

        :return: data format, or None when no format has been set or loaded yet
        """
        return self.__data_format

    @data_format.setter
    def data_format(self, data_format: NBusDataFormat):
        """
        Set data format.

        :param data_format: format of data
        """
        self.__data_format = data_format

    @property
    def parameters(self) -> dict[NBusParameterID, NBusParameterValue]:
        """
        Get cached parameters.

        :return: the internal parameter dict (not a copy)
        """
        return self.__params

    @parameters.setter
    def parameters(self, values: dict[NBusParameterID, NBusParameterValue]):
        """
        Replace the cached parameters.

        :param values: values to set
        """
        self.__params = values

    """
    ================================================================================================================
                                                Device Get Commands
    ================================================================================================================
    """

    def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
        """
        Get single sensor parameter and cache it.

        :param parameter: parameter id
        :return: parameter value
        """
        # get response
        resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                         NBusCommand.CMD_GET_PARAM,
                                                         bytearray([parameter.value]))

        # parse parameter (only the first parsed pair is used)
        param_id, param_val = NbusCommonParser.parameters_from_response(resp_len, response)[0]

        # store parameter value
        self.__params[param_id] = param_val

        return param_val

    def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
        """
        Get all sensor parameters and cache them.

        :return: copy of the cached dict of parameter id and parameter value
        """
        # get response (an empty payload requests every parameter)
        resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                         NBusCommand.CMD_GET_PARAM, bytearray([]))

        # parse parameters and store them
        self.__params.update(NbusCommonParser.parameters_from_response(resp_len, response))

        return self.__params.copy()

    def cmd_get_data(self) -> list[NBusDataValue]:
        """
        Get data from sensor.

        :raises NBusErrorAPI: with FORMAT_NOT_LOADED when no data format is set
        :return: list of data values parsed with the current data format
        """
        if self.__data_format is None:  # the format is required to decode the payload
            raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)

        _, *resp = self.__port.request_sensor(self.__module_address, self.__address,
                                              NBusCommand.CMD_GET_DATA, bytearray([]))

        values, _ = NbusCommonParser.data_from_response(self.data_format, resp)

        return values

    def cmd_get_sensor_type(self) -> NBusSensorType:
        """
        Get sensor type and cache its raw value.

        :return: sensor type
        """
        _, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                  NBusCommand.CMD_GET_SENSOR_TYPE, bytearray([]))
        self.__type = response[0]

        return NBusSensorType(self.__type)

    def cmd_get_format(self):
        """
        Get sensor format and store it as the current data format.

        :return: sensor format
        """
        _, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                  NBusCommand.CMD_GET_FORMAT, bytearray([]))
        self.data_format = NbusCommonParser.format_from_response(response)

        return self.data_format

    """
    ================================================================================================================
                                                Device Set Commands
    ================================================================================================================
    """

    def cmd_set_find(self, enable: bool) -> NBusStatusType:
        """
        Turn on/off physical indicator of device.

        :param enable: to start (True) / stop (False) finding
        :return: status
        """
        # the flag is sent as a single byte: 1 for True, 0 for False
        _, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                  NBusCommand.CMD_SET_FIND, bytearray([int(enable)]))

        return NBusStatusType(response[0])

    def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
        """
        Set sensor parameter.

        :param param: parameter ID
        :param value: parameter value
        :return: status
        """
        # NOTE(review): the body of this method was cut off in the reviewed text right
        # after `param_val_raw = struct.pack("`, so the original value format and response
        # handling are unknown. This reconstruction sends the single pair through the
        # multi-parameter path, which uses the same CMD_SET_PARAM command and the shared
        # NbusCommonParser encoder -- confirm against version control.
        statuses = self.cmd_set_multi_params({param: value})

        return statuses[param]

    # NOTE(review): the `def` line of this method was lost in the reviewed text; the name
    # and the `params` annotation are reconstructed from the body and the surviving return
    # annotation -- confirm against version control.
    def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
            -> dict[NBusParameterID, NBusStatusType]:
        """
        Set multiple sensor parameters.

        Values that the device reports as successfully written are also stored
        in the local parameter cache.

        :param params: parameters
        :return: dict of statuses keyed by parameter id
        """
        # create request packet
        param_bytes = NbusCommonParser.parameters_to_request(params)

        # send request
        resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                            NBusCommand.CMD_SET_PARAM,
                                                            bytearray(param_bytes))

        # parse statuses: the response is read as consecutive (parameter id, status) byte pairs
        statuses = {}
        for i in range(0, resp_length - 1, 2):
            p_id = NBusParameterID(response[i])
            # convert first so the comparison is enum-to-enum, not raw byte-to-enum
            status = NBusStatusType(response[i + 1])

            if status == NBusStatusType.STATUS_SUCCESS:
                self.__params[p_id] = params[p_id]  # if success, store param

            statuses[p_id] = status

        return statuses

    def cmd_set_calibrate(self) -> NBusStatusType:
        """
        Send calibration command.

        :return: calibration status
        """
        _, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                  NBusCommand.CMD_SET_CALIBRATE, bytearray([]))

        return NBusStatusType(response[0])

    def cmd_set_data(self, data: list[NBusDataValue]) -> NBusStatusType:
        """
        Set data to read-write sensor.

        :param data: data to set
        :raises NBusErrorAPI: with FORMAT_NOT_LOADED when no data format is set
        :return: operation status
        """
        if self.__data_format is None:  # the format is required to encode the payload
            raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)

        # create request packet: sensor address followed by the encoded data
        request = [self.__address]
        request.extend(NbusCommonParser.data_to_request(self.data_format, data))

        # send request
        _, *response = self.__port.request_sensor(self.__module_address, self.__address,
                                                  NBusCommand.CMD_SET_DATA, bytearray(request))

        # return response status; the status is read from the second response byte
        # NOTE(review): presumably the first byte echoes the sensor address -- confirm
        return NBusStatusType(response[1])