| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- import struct
- from abc import abstractmethod, ABCMeta
- from typing import Tuple
- from nbus_hal.nbus_generic_port import NBusPort
- from nbus_types.nbus_command_type import NBusCommand
- from nbus_types.nbus_data_fomat import *
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_parameter_type import *
- from nbus_types.nbus_address_type import NBusDeviceAddress, NBusModuleAddress
- from nbus_types.nbus_sensor_type import NBusSensorType
- from nbus_api.nbus_common_parser import NbusCommonParser
@beartype
class NBusSlaveDevice(metaclass=ABCMeta):
    """
    Abstract base class for a slave device that is reached through an NBus port.

    An instance keeps its own device address, the address of the parent module,
    the port used for requests, the last loaded data format and a dictionary of
    the most recently read/written parameter values. Subclasses supply the
    conversions between raw (binary) values and engineering units.
    """

    def __init__(self, address: NBusDeviceAddress):
        """
        Create a device that is not yet attached to a module or a port.

        :param address: the address of this device
        """
        self.__address = address
        self.__module_address = None
        self.__port = None
        self.__data_format = None
        self.__parameters = {}

    @property
    def address(self):
        """Address of this device as passed to the constructor."""
        return self.__address

    @property
    def data_format(self):
        """Currently stored data format, or None when none has been set."""
        return self.__data_format

    @data_format.setter
    def data_format(self, data_format: NBusDataFormat) -> None:
        self.__data_format = data_format

    @property
    def parameters(self):
        """Dictionary of stored parameter values, keyed by parameter ID."""
        return self.__parameters

    @parameters.setter
    def parameters(self, values):
        self.__parameters = values

    def set_parent_module_address(self, address: NBusModuleAddress):
        """
        Remember the address of the module this device belongs to.

        :param address: the address of the parent module
        """
        self.__module_address = address

    def set_device_port(self, port: NBusPort):
        """
        Remember the port that is used for all device requests.

        :param port: the port used to send requests
        """
        self.__port = port

    # ================================================================================================================
    # Abstract Methods
    # ================================================================================================================

    @abstractmethod
    def data_parameters_loaded(self) -> bool:
        """
        Tell whether every parameter required for data get/set conversion
        has already been loaded.

        :return: True if the device is ready for conversion, otherwise False
        """

    @abstractmethod
    def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue:
        """
        Translate a raw parameter obtained by cmd_get_param() into its engineering range.

        :param param_id: the ID of the parameter
        :param param_value: the parameter value in binary format
        :return: the parameter value in engineering units
        """

    @abstractmethod
    def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int:
        """
        Translate a parameter into its binary range for cmd_set_param().

        :param param_id: the ID of the parameter
        :param param_value: the parameter value in engineering units
        :return: the parameter value in binary format
        """

    @abstractmethod
    def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
        """
        Translate data obtained by cmd_get_data() into its engineering range.

        :param values: list of values in binary format
        :return: list of the converted values in engineering units
        """

    @abstractmethod
    def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
        """
        Translate data into its binary range for cmd_set_data().

        :param values: list of values in engineering range
        :return: list of the converted values in binary format
        """

    # ================================================================================================================
    # Device Get Commands
    # ================================================================================================================

    def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
        """
        Request one parameter from the device, convert it and store it.

        :param parameter: the ID of the requested parameter
        :return: tuple of the parameter ID and its converted value
        """
        request = self.__port.request_device
        payload = bytearray([parameter.value])
        length, *body = request(self.__module_address, self.__address,
                                NBusCommand.CMD_GET_PARAM, payload)

        # only the first parsed (id, raw value) pair is used
        parsed = NbusCommonParser.parameters_from_response(length, body)
        found_id, raw = parsed[0]

        converted = self.map_parameter_get(found_id, raw)
        self.__parameters[found_id] = converted
        return found_id, converted

    def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
        """
        Send CMD_GET_PARAM with an empty payload, then convert and store every
        parameter found in the response.

        :return: list of (parameter ID, converted value) tuples in response order
        """
        request = self.__port.request_device
        length, *body = request(self.__module_address, self.__address,
                                NBusCommand.CMD_GET_PARAM, bytearray())

        collected = []
        for found_id, raw in NbusCommonParser.parameters_from_response(length, body):
            converted = self.map_parameter_get(found_id, raw)
            collected.append((found_id, converted))
            self.__parameters[found_id] = converted
        return collected

    def cmd_get_data(self):
        """
        Request data from the device and let the common parser decode it.

        :return: the first element of the pair produced by
                 NbusCommonParser.data_from_response()
        """
        request = self.__port.request_device
        _, *body = request(self.__module_address, self.__address,
                           NBusCommand.CMD_GET_DATA, bytearray())

        # the parser receives this device together with the response body
        decoded, _ = NbusCommonParser.data_from_response(self, body)
        return decoded

    def cmd_get_sensor_type(self):
        """
        Request the sensor type of the device.

        :return: NBusSensorType built from the first element of the response body
        """
        request = self.__port.request_device
        _, *body = request(self.__module_address, self.__address,
                           NBusCommand.CMD_GET_SENSOR_TYPE, bytearray())
        return NBusSensorType(body[0])

    def cmd_get_format(self):
        """
        Request the data format of the device and store it in data_format.

        :return: the newly stored data format
        """
        request = self.__port.request_device
        _, *body = request(self.__module_address, self.__address,
                           NBusCommand.CMD_GET_FORMAT, bytearray())
        self.data_format = NbusCommonParser.format_from_response(body)
        return self.data_format

    # ================================================================================================================
    # Device Set Commands
    # ================================================================================================================

    def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> None:
        """
        Convert a parameter to its raw form, send it to the device and store
        the engineering value once the request has returned.

        :param param: the ID of the parameter to set
        :param value: the new value in engineering units
        """
        raw = self.map_parameter_set(param, value)
        # raw value is masked to 32 bits and packed as little-endian unsigned int
        packed = struct.pack('<I', raw & 0xFFFFFFFF)

        request = self.__port.request_device
        request(self.__module_address, self.__address,
                NBusCommand.CMD_SET_PARAM, bytearray((param.value, *packed)))

        self.__parameters[param] = value

    def cmd_set_data(self, data: list[NBusDataValue]):
        """
        Convert data to its binary form and send it to the device.

        :param data: list of values in engineering range
        :return: the response without its first element
        :raises NBusErrorAPI: FORMAT_NOT_LOADED when no data format is stored,
                              PARAMS_NOT_LOADED when data_parameters_loaded() is false
        """
        # both the format and the parameters must be available before conversion
        if self.__data_format is None:
            raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
        if not self.data_parameters_loaded():
            raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)

        raw_values = self.map_data_set(data)

        request = self.__port.request_device
        _, *body = request(self.__module_address, self.__address,
                           NBusCommand.CMD_SET_DATA, bytearray(raw_values))
        return body
|