| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- import struct
- from abc import abstractmethod, ABCMeta
- from nbus_hal.nbus_generic_port import NBusPort
- from nbus_types.nbus_command_type import NBusCommand
- from nbus_types.nbus_data_fomat import *
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_parameter_type import *
- from nbus_types.nbus_address_type import NBusSensorAddress, NBusModuleAddress
- from nbus_types.nbus_sensor_type import NBusSensorType
- from nbus_api.nbus_common_parser import NbusCommonParser
- from nbus_types.nbus_status_type import NBusStatusType
- @beartype
- class NBusSensor(metaclass=ABCMeta):
- """
- Class representing nBus sensor type.
- """
- def __init__(self, address: NBusSensorAddress):
- """
- Constructor.
- :param address: device address
- """
- self.__address = address
- self.__module_address = None
- self.__port = None
- self.__data_format = None
- self.__params = {}
- """
- ================================================================================================================
- Public Fields
- ================================================================================================================
- """
- @property
- def address(self):
- """
- Get sensor address.
- :return: sensor address
- """
- return self.__address
- @property
- def data_format(self):
- """
- Get data format.
- :return: data format
- """
- return self.__data_format
- @data_format.setter
- def data_format(self, data_format: NBusDataFormat):
- """
- Set data format.
- :param data_format: format of data
- """
- self.__data_format = data_format
- @property
- def parameters(self) -> dict[NBusParameterID, NBusParameterValue]:
- """
- Get parameters.
- :return: parameters
- """
- return self.__params
- @parameters.setter
- def parameters(self, values: dict[NBusParameterID, NBusParameterValue]):
- """
- Set parameters.
- :param values: values to set
- """
- self.__params = values
- """
- ================================================================================================================
- Module-only Methods
- ================================================================================================================
- """
- def set_parent_module_address(self, address: NBusModuleAddress) -> None:
- """
- Set address of parent module.
- :param address: module address
- """
- self.__module_address = address
- def set_device_port(self, port: NBusPort) -> None:
- """
- Set device communication port.
- :param port: communicaiton port
- """
- self.__port = port
- """
- ================================================================================================================
- Abstract Methods
- ================================================================================================================
- """
- @abstractmethod
- def data_parameters_loaded(self) -> bool:
- """
- Verify that all necessary parameters are loaded
- before performing data get/set conversion.
- :return: true if ready for conversion, otherwise False
- """
- pass
- @abstractmethod
- def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue:
- """
- Convert a parameter from cmd_get_param() to its engineering range.
- :param param_id: the ID of the parameter
- :param param_value: the value of the parameter in binary format
- :return: the converted parameter value in engineering units
- """
- pass
- @abstractmethod
- def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int:
- """
- Convert a parameter to its binary range for cmd_set_data().
- :param param_id: the ID of the parameter
- :param param_value: the value of the parameter in engineering units
- :return: the converted parameter value in binary format
- """
- pass
- @abstractmethod
- def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
- """
- Convert data from cmd_get_data() to its engineering range.
- :param values: a list of values in binary format to be converted
- :return: a list of converted values in engineering units
- """
- pass
- @abstractmethod
- def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
- """
- Convert data to its binary range for cmd_set_data().
- :param values: a list of values in engineering range to be converted
- :return: a list of converted values in binary format
- """
- pass
- """
- ================================================================================================================
- Device Get Commands
- ================================================================================================================
- """
- def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
- """
- Get single sensor parameter.
- :param parameter: parameter id
- :return: parameter value
- """
- # get response
- resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_GET_PARAM, bytearray([parameter.value]))
- # parse parameter
- param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
- param_val = self.map_parameter_get(param_id, param_val_raw)
- # store parameter value
- self.__params[param_id] = param_val
- return param_val
- def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
- """
- Get all sensor parameters.
- :return: dict of parameter id and parameter value
- """
- # get response
- resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_GET_PARAM, bytearray([]))
- # parse parameters
- params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
- for param_id, param_val_raw in params_raw:
- param_val = self.map_parameter_get(param_id, param_val_raw)
- # store parameters
- self.__params[param_id] = param_val
- return self.__params.copy()
- def cmd_get_data(self) -> list[NBusDataValue]:
- """
- Get data from sensor.
- :raises
- :return: dict of device addresses and data values
- """
- if self.__data_format is None: # check for format and params
- raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
- if not self.data_parameters_loaded():
- raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
- _, *resp = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_DATA,
- bytearray([]))
- values, _ = NbusCommonParser.data_from_response(self.data_format, resp)
- return self.map_data_get(values)
- def cmd_get_sensor_type(self) -> NBusSensorType:
- """
- Get sensor type.
- :return: sensor type
- """
- _, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_GET_SENSOR_TYPE, bytearray([]))
- return NBusSensorType(response[0])
- def cmd_get_format(self):
- """
- Get sensor format.
- :return: sensor format
- """
- _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_FORMAT,
- bytearray([]))
- self.data_format = NbusCommonParser.format_from_response(response)
- return self.data_format
- """
- ================================================================================================================
- Device Set Commands
- ================================================================================================================
- """
- def cmd_set_find(self, enable: bool) -> NBusStatusType:
- """
- Turn on/off physical indicator of device.
- :param enable: to start (True) / stop (False) finding
- :return: status
- """
- _, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_SET_FIND, bytearray([enable]))
- return NBusStatusType(response[0])
- def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
- """
- Set sensor parameter.
- :param param: parameter ID
- :param value: parameter value
- :return: status
- """
- # create request packet
- param_id_raw = struct.pack("B", param.value)
- param_val_raw = struct.pack("<I", self.map_parameter_set(param, value))
- param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
- # proceed request
- _, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_SET_PARAM, param_bytes)
- # if response is valid, store parameter
- if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
- self.__params[param] = value
- return NBusStatusType(response[1])
- def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
- -> dict[NBusParameterID, NBusStatusType]:
- """
- Set multiple sensor parameters.
- :param params: parameters
- :return: dict od statuses
- """
- # scale parameters
- for p_id in params.keys():
- params[p_id] = self.map_parameter_set(p_id, params[p_id])
- # create request packet
- param_bytes = NbusCommonParser.parameters_to_request(params)
- # send request
- resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_SET_PARAM, bytearray(param_bytes))
- # parse statuses
- statuses = {}
- for i in range(0, resp_length - 1, 2):
- p_id = NBusParameterID(response[i])
- if response[i + 1] == NBusStatusType.STATUS_SUCCESS:
- self.__params[p_id] = params[p_id] # if success, store param
- statuses[p_id] = NBusStatusType(response[i + 1])
- return statuses
- def cmd_set_calibrate(self) -> NBusStatusType:
- """
- Send calibration command.
- :return: calibration status
- """
- resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
- return NBusStatusType(response[0])
- def cmd_set_data(self, data: list[NBusDataValue]) -> NBusStatusType:
- """
- Set data to read-write sensor.
- :param data: data to set
- :raises: NBusErrorAPIType
- :return: operation status
- """
- if self.__data_format is None: # check for format and params
- raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
- if not self.data_parameters_loaded():
- raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
- # create request packet
- request = []
- # transform data
- raw_data = self.map_data_set(data)
- request.append(self.__address)
- request.extend(NbusCommonParser.data_to_request(self.data_format, raw_data))
- # send request
- resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
- NBusCommand.CMD_SET_DATA, bytearray(request))
- # return response status
- return NBusStatusType(response[1])
|