import struct from abc import abstractmethod, ABCMeta from nbus_hal.nbus_generic_port import NBusPort from nbus_types.nbus_command_type import NBusCommand from nbus_types.nbus_data_fomat import * from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType from nbus_types.nbus_parameter_type import * from nbus_types.nbus_address_type import NBusSensorAddress, NBusModuleAddress from nbus_types.nbus_sensor_type import NBusSensorType from nbus_api.nbus_common_parser import NbusCommonParser from nbus_types.nbus_status_type import NBusStatusType @beartype class NBusSensor(metaclass=ABCMeta): """ Class representing nBus sensor type. """ def __init__(self, address: NBusSensorAddress): """ Constructor. :param address: device address """ self.__address = address self.__module_address = None self.__port = None self.__data_format = None self.__params = {} """ ================================================================================================================ Public Fields ================================================================================================================ """ @property def address(self): """ Get sensor address. :return: sensor address """ return self.__address @property def data_format(self): """ Get data format. :return: data format """ return self.__data_format @data_format.setter def data_format(self, data_format: NBusDataFormat): """ Set data format. :param data_format: format of data """ self.__data_format = data_format @property def parameters(self) -> dict[NBusParameterID, NBusParameterValue]: """ Get parameters. :return: parameters """ return self.__params @parameters.setter def parameters(self, values: dict[NBusParameterID, NBusParameterValue]): """ Set parameters. :param values: values to set """ self.__params = values """ ================================================================================================================ Module-only Methods ================================================================================================================ """ def set_parent_module_address(self, address: NBusModuleAddress) -> None: """ Set address of parent module. :param address: module address """ self.__module_address = address def set_device_port(self, port: NBusPort) -> None: """ Set device communication port. :param port: communicaiton port """ self.__port = port """ ================================================================================================================ Abstract Methods ================================================================================================================ """ @abstractmethod def data_parameters_loaded(self) -> bool: """ Verify that all necessary parameters are loaded before performing data get/set conversion. :return: true if ready for conversion, otherwise False """ pass @abstractmethod def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue: """ Convert a parameter from cmd_get_param() to its engineering range. :param param_id: the ID of the parameter :param param_value: the value of the parameter in binary format :return: the converted parameter value in engineering units """ pass @abstractmethod def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int: """ Convert a parameter to its binary range for cmd_set_data(). :param param_id: the ID of the parameter :param param_value: the value of the parameter in engineering units :return: the converted parameter value in binary format """ pass @abstractmethod def map_data_get(self, values: list[int]) -> list[NBusDataValue]: """ Convert data from cmd_get_data() to its engineering range. :param values: a list of values in binary format to be converted :return: a list of converted values in engineering units """ pass @abstractmethod def map_data_set(self, values: list[NBusDataValue]) -> list[int]: """ Convert data to its binary range for cmd_set_data(). :param values: a list of values in engineering range to be converted :return: a list of converted values in binary format """ pass """ ================================================================================================================ Device Get Commands ================================================================================================================ """ def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue: """ Get single sensor parameter. :param parameter: parameter id :return: parameter value """ # get response resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_PARAM, bytearray([parameter.value])) # parse parameter param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0] param_val = self.map_parameter_get(param_id, param_val_raw) # store parameter value self.__params[param_id] = param_val return param_val def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]: """ Get all sensor parameters. :return: dict of parameter id and parameter value """ # get response resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_PARAM, bytearray([])) # parse parameters params_raw = NbusCommonParser.parameters_from_response(resp_len, response) for param_id, param_val_raw in params_raw: param_val = self.map_parameter_get(param_id, param_val_raw) # store parameters self.__params[param_id] = param_val return self.__params.copy() def cmd_get_data(self) -> list[NBusDataValue]: """ Get data from sensor. :raises :return: dict of device addresses and data values """ if self.__data_format is None: # check for format and params raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED) if not self.data_parameters_loaded(): raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED) _, *resp = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_DATA, bytearray([])) values, _ = NbusCommonParser.data_from_response(self.data_format, resp) return self.map_data_get(values) def cmd_get_sensor_type(self) -> NBusSensorType: """ Get sensor type. :return: sensor type """ _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_SENSOR_TYPE, bytearray([])) return NBusSensorType(response[0]) def cmd_get_format(self): """ Get sensor format. :return: sensor format """ _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_FORMAT, bytearray([])) self.data_format = NbusCommonParser.format_from_response(response) return self.data_format """ ================================================================================================================ Device Set Commands ================================================================================================================ """ def cmd_set_find(self, enable: bool) -> NBusStatusType: """ Turn on/off physical indicator of device. :param enable: to start (True) / stop (False) finding :return: status """ _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_SET_FIND, bytearray([enable])) return NBusStatusType(response[0]) def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType: """ Set sensor parameter. :param param: parameter ID :param value: parameter value :return: status """ # create request packet param_id_raw = struct.pack("B", param.value) param_val_raw = struct.pack(" dict[NBusParameterID, NBusStatusType]: """ Set multiple sensor parameters. :param params: parameters :return: dict od statuses """ # scale parameters for p_id in params.keys(): params[p_id] = self.map_parameter_set(p_id, params[p_id]) # create request packet param_bytes = NbusCommonParser.parameters_to_request(params) # send request resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_SET_PARAM, bytearray(param_bytes)) # parse statuses statuses = {} for i in range(0, resp_length - 1, 2): p_id = NBusParameterID(response[i]) if response[i + 1] == NBusStatusType.STATUS_SUCCESS: self.__params[p_id] = params[p_id] # if success, store param statuses[p_id] = NBusStatusType(response[i + 1]) return statuses def cmd_set_calibrate(self) -> NBusStatusType: """ Send calibration command. :return: calibration status """ resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_SET_CALIBRATE, bytearray([])) return NBusStatusType(response[0]) def cmd_set_data(self, data: list[NBusDataValue]) -> NBusStatusType: """ Set data to read-write sensor. :param data: data to set :raises: NBusErrorAPIType :return: operation status """ if self.__data_format is None: # check for format and params raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED) if not self.data_parameters_loaded(): raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED) # create request packet request = [] # transform data raw_data = self.map_data_set(data) request.append(self.__address) request.extend(NbusCommonParser.data_to_request(self.data_format, raw_data)) # send request resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_SET_DATA, bytearray(request)) # return response status return NBusStatusType(response[1])