Browse Source

+ Redefined core datatypes and typechecking

DLIMIKO 1 year ago
parent
commit
cff4676287

+ 11 - 6
main.py

@@ -1,10 +1,13 @@
-from nbus_hal.serial_port import *
+from nbus_hal.nbus_serial.serial_port import *
+from nbus_api.nbus_slave_module import NBusSlaveModule
+from nbus_types.nbus_address_type import NBusModuleAddress
+from nbus_hal.nbus_serial.serial_types import *
 
 # example config
 config = {
     "port_name": "COM4",
-    "baud": 115200,
-    "parity": "N",
+    "baud": NBusBaudrate.SPEED_921600,
+    "parity": NBusParity.NONE,
     "timeout": 0.4,
     "request_attempts": 1,
     "enable_log": True
@@ -13,10 +16,12 @@ config = {
 if __name__ == "__main__":
 
     try:
+
+
+
         port = NBusSerialPort(NBusSerialConfig(**config))
-        port.set_logger(default_logger)
-        response = port.request(5, 1, NBusCommand.CMD_ECHO, [1, 2, 3])
-        print(response)
+        nbus = NBusSlaveModule(port, NBusModuleAddress(5))
+        print(nbus.cmd_get_echo_module(bytearray([1,2,3])))
 
     except Exception as Ex:
         print(str(Ex))

+ 92 - 0
nbus_api/nbus_slave_device.py

@@ -0,0 +1,92 @@
+from abc import abstractmethod, ABCMeta
+from nbus_types.nbus_data_fomat import *
+from nbus_types.nbus_parameter_type import *
+from nbus_types.nbus_address_type import NBusDeviceAddress
+
+
+@beartype
+class NBusSlaveDevice(metaclass=ABCMeta):
+
+    def __init__(self, address: NBusDeviceAddress):
+        self.__address = address
+        self.__data_format = None
+        self.__parameters = None
+
+
+    @property
+    def address(self):
+        return self.__address
+
+    @property
+    def data_format(self):
+        return self.__data_format
+
+    @data_format.setter
+    def data_format(self, data_format: NBusDataFormat) -> None:
+        self.__data_format = data_format
+
+    @property
+    def parameters(self):
+        return self.__parameters
+
+    @parameters.setter
+    def parameters(self, values):
+        self.__parameters = values
+
+    """
+    ================================================================================================================
+                                                        Device Interface
+    ================================================================================================================
+    """
+
+    @abstractmethod
+    def data_params_loaded(self) -> bool:
+        """
+        Verify that all necessary parameters are loaded
+        before performing data get/set conversion.
+
+        :return: true if ready for conversion, otherwise False
+        """
+        pass
+
+    @abstractmethod
+    def map_param_get(self, param_id: NBusParameterType, param_value: int) -> NBusParameterValue:
+        """
+        Convert a parameter from cmd_get_param() to its engineering range.
+
+        :param param_id: the ID of the parameter
+        :param param_value: the value of the parameter in binary format
+        :return: the converted parameter value in engineering units
+        """
+        pass
+
+    @abstractmethod
+    def map_param_set(self, param_id: NBusParameterType, param_value: NBusParameterValue) -> int:
+        """
+        Convert a parameter to its binary range for cmd_set_data().
+
+        :param param_id: the ID of the parameter
+        :param param_value: the value of the parameter in engineering units
+        :return: the converted parameter value in binary format
+        """
+        pass
+
+    @abstractmethod
+    def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
+        """
+        Convert data from cmd_get_data() to its engineering range.
+
+        :param values: a list of values in binary format to be converted
+        :return: a list of converted values in engineering units
+        """
+        pass
+
+    @abstractmethod
+    def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
+        """
+        Convert data to its binary range for cmd_set_data().
+
+        :param values: a list of values in engineering range to be converted
+        :return: a list of converted values in binary format
+        """
+        pass

+ 22 - 0
nbus_api/nbus_slave_module.py

@@ -0,0 +1,22 @@
+from nbus_hal.nbus_serial.serial_port import *
+from nbus_types.nbus_address_type import NBusModuleAddress
+
+
+@beartype
+class NBusSlaveModule:
+
+    def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
+        self.__port = serial_port
+        self.__module_addr = module_address
+        self.__params = {}
+        self.__devices = {}
+
+    def cmd_get_echo_module(self, message: bytearray) -> bool:
+        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_ECHO, message)
+        return response == list(message)
+
+
+
+
+
+

+ 77 - 0
nbus_hal/nbus_generic_port.py

@@ -0,0 +1,77 @@
+from abc import ABCMeta, abstractmethod
+from typing import Annotated
+from beartype import beartype
+from beartype.vale import Is
+from nbus_types.nbus_address_type import NBusModuleAddress, NBusDeviceAddress
+from nbus_types.nbus_command_type import NBusCommand
+
+
+"""
+NBus delay typedef
+"""
+NBusDelay = Annotated[float, Is[lambda s: s >= 0]]
+
+
+@beartype
+class NBusPort(metaclass=ABCMeta):
+    """
+    Class for generic NBus communication port.
+    """
+
+    @abstractmethod
+    def open(self) -> None:
+        """
+        Open communication port.
+        """
+        pass
+
+    @abstractmethod
+    def close(self) -> None:
+        """
+        Close communication port.
+        """
+        pass
+
+    @abstractmethod
+    def is_connected(self) -> bool:
+        """
+        Return connection status.
+        :return: status (1 = connected, 0 = not connected)
+        """
+        pass
+
+    @abstractmethod
+    def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
+        """
+        Make broadcast request to nbus network.
+        :param command: command id
+        :param data: command data to send
+        """
+        pass
+
+    @abstractmethod
+    def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
+                       long_answer: NBusDelay = 0.0) -> bytearray:
+        """
+        Make module request to nbus network.
+        :param module_addr: address of module
+        :param command: command id
+        :param data: command data to send
+        :param long_answer: delay in s for longer answer
+        :return: | payload length | payload |
+        """
+        pass
+
+    @abstractmethod
+    def request_device(self, module_addr: NBusModuleAddress, device_address: NBusDeviceAddress, command: NBusCommand,
+                       data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
+        """
+        Make device request to nbus network.
+        :param module_addr: address of module
+        :param device_address: address of device
+        :param command: command id
+        :param data: command data to send
+        :param long_answer: delay in s for longer answer
+        :return: | payload length | payload |
+        """
+        pass

+ 50 - 0
nbus_hal/nbus_serial/serial_config.py

@@ -0,0 +1,50 @@
+from dataclasses import dataclass
+from enum import Enum
+from typing import Annotated
+from beartype import beartype
+from beartype.vale import Is
+
+
+class NBusBaudrate(Enum):
+    """
+    Enum class for serial baudrate speed.
+    """
+    SPEED_9600 = 9600
+    SPEED_19200 = 19200
+    SPEED_38400 = 38400
+    SPEED_57600 = 7600
+    SPEED_115200 = 115200
+    SPEED_230400 = 230400
+    SPEED_460800 = 460800
+    SPEED_576000 = 576000
+    SPEED_921600 = 921600
+
+
+class NBusParity(Enum):
+    """
+    Enum class for serial parity.
+    """
+    NONE = "N"
+    ODD = "O"
+    EVEN = "E"
+
+
+@beartype
+@dataclass(frozen=True)
+class NBusSerialConfig:
+    """
+    Configuration of serial port.
+
+    :ivar port_name: The serial port identifier.
+    :ivar baud: The baud rate for the serial communication.
+    :ivar parity: The parity bit setting for the serial communication.
+    :ivar timeout: The timeout value for the serial communication.
+    :ivar request_attempts: The number of attempts for a request.
+    :ivar enable_log: Flag to enable or disable logging.
+    """
+    port_name: str
+    baud: NBusBaudrate
+    parity: NBusParity
+    timeout: Annotated[float, Is[lambda value: value > 0]]
+    request_attempts: Annotated[int, Is[lambda value: value > 0]]
+    enable_log: bool

+ 59 - 38
nbus_hal/serial_port.py → nbus_hal/nbus_serial/serial_port.py

@@ -1,11 +1,15 @@
 import time
 import serial
-from pydantic import validate_call
 from typing import Any, Callable
-from nbus_types.nbus_serial_config import NBusSerialConfig
+
+from beartype import beartype
+
 from nbus_types.nbus_command_type import *
 from nbus_types.nbus_exceptions.nbus_network_exception import *
 from nbus_types.nbus_exceptions.nbus_node_exception import *
+from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
+from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
+from nbus_types.nbus_address_type import NBusModuleAddress, NBusDeviceAddress
 from nbus_hal.crc8 import crc8
 
 
@@ -24,17 +28,16 @@ def default_logger(*message: Any) -> None:
     print()
 
 
-class NBusSerialPort:
+@beartype
+class NBusSerialPort(NBusPort):
     """
     Class representing nBus serial port.
     """
-
-    @validate_call
     def __init__(self, config: NBusSerialConfig):
         self._port = serial.Serial(timeout=config.timeout)
         self._port.port = config.port_name
-        self._port.parity = config.parity
-        self._port.baudrate = config.baud
+        self._port.parity = config.parity.value
+        self._port.baudrate = config.baud.value
         self._logger_cb = default_logger
         self._enable_log = config.enable_log
         self._request_attempts = config.request_attempts
@@ -48,7 +51,6 @@ class NBusSerialPort:
     def open(self) -> None:
         """
         Open port.
-        :return: None
         """
         self._port.open()
         self._port.flush()
@@ -57,7 +59,6 @@ class NBusSerialPort:
     def close(self) -> None:
         """
         Close port.
-        :return: None
         """
         self._log('i', 0, 'Close communication port')
         self._port.close()
@@ -65,69 +66,90 @@ class NBusSerialPort:
     def is_connected(self) -> bool:
         """
         Return connection status.
-        :return: True = connected | False = not connected
+        :return: status (1 = connected, 0 = not connected)
         """
         return self._port.is_open
 
-    @validate_call(validate_return=True)
     def set_logger(self, callback: Callable[[Any], None]) -> None:
         """
         Set logger function.
         :param callback: logging callback
-        :return: None
         """
         self._logger_cb = callback
 
-    @validate_call
-    def request_broadcast(self, command: NBusCommand, data: list[int]) -> None:
+    def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
         """
         Make broadcast request to nbus network.
         :param command: command id
         :param data: command data to send
-        :return: None
         """
         request = self._create_packet([0, 0, 0, command.value], data)
         self._log("\tBRQ>", request)
         self._port.write(request)  # send message
 
-    @validate_call(validate_return=True)
-    def request(self, module: int, sensor: int, command: NBusCommand, data: list[int], long_answer: float = 0) -> list[int]:
+    def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
+                       long_answer: NBusDelay = 0.0) -> bytearray:
         """
-        Make request to nbus node.
+        Make module request to nbus network.
+        :param module_addr: address of module
+        :param command: command id
+        :param data: command data to send
+        :param long_answer: delay in s for longer answer
+        :return: | payload length | payload |
+        """
+        return self._request_response(module_addr.value, 0, command.value, data, long_answer)
+
+    def request_device(self, module_addr: NBusModuleAddress, device_address: NBusDeviceAddress,
+                       command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
+        """
+        Make device request to nbus network.
+        :param module_addr: address of module
+        :param device_address: address of device
+        :param command: command id
+        :param data: command data to send
+        :param long_answer: delay in s for longer answer
+        :return: | payload length | payload |
+        """
+        return self._request_response(module_addr.value, device_address.value, command.value, data, long_answer)
+
+    """
+    ================================================================================================================
+                                                Internal Methods
+    ================================================================================================================
+    """
+
+    def _request_response(self, module: int, sensor: int, command, data: bytearray,
+                          long_answer: float = 0) -> bytearray:
+        """
+        Make request to nbus node and receive response.
         :param module: module address
         :param sensor: sensor address
         :param command: command id
         :param data: command data to send
         :param long_answer: timeout for long answer
-        :return: response
+        :return: response length | response
         """
-        request = self._create_packet([0, module, sensor, command.value], data)   # create request
-        self._log('d', sensor, "\tRQ>", request)                                  # log request
+        request = self._create_packet(bytearray([0, module, sensor, command]), data)  # create request
+        self._log('d', sensor, "\tRQ>", request)  # log request
 
-        counter = 0                                                               # err. trials
+        counter = 0  # err. trials
 
-        while True:                                                               # try to communicate
+        while True:  # try to communicate
             self._port.write(request)
 
-            if long_answer > 0:                                                   # wait for long answer
+            if long_answer > 0:  # wait for long answer
                 time.sleep(long_answer)
 
             try:
                 return self._receive_payload()
 
-            except NBusErrorNetwork as Ex:                                        # if network error, try to reconnect
+            except NBusErrorNetwork as Ex:  # if network error, try to reconnect
                 counter += 1
 
                 if counter > self._request_attempts:
-                    raise Ex                                                     # if out of trials, propagate exception
-
-    """
-    ================================================================================================================
-                                                Internal Methods
-    ================================================================================================================
-    """
+                    raise Ex  # if out of trials, propagate exception
 
-    def _create_packet(self, start: list[int], data: list[int]) -> list[int]:
+    def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
         """
         Create packet to send and prepare port.
         :param start: head of message
@@ -148,10 +170,10 @@ class NBusSerialPort:
         start.append(crc_sum)
         return start
 
-    def _receive_payload(self) -> list[int]:
+    def _receive_payload(self) -> bytearray:
         """
         Read response from serial.
-        :return: payload
+        :return: | payload len | payload
         """
         # read data length
         response_l = self._port.read(1)
@@ -169,19 +191,18 @@ class NBusSerialPort:
         if response[-1] != crc8(response[:-1]):
             raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
 
-        self._log('d', 0, "\tRS>", response_l, response)   # log response
+        self._log('d', 0, "\tRS>", [response_l] + list(response))   # log response
 
         # check for node error
         if response[2] & NBUS_ERR_BIT:
             raise NBusErrorNode(NBusErrorNodeType(response[3]))
 
-        return [response_l - 4] + list(response)[3:-1]   # return payload length + payload
+        return bytearray([response_l - 4]) + bytearray(response[3:-1])   # return payload length + payload
 
     def _log(self, *message: Any) -> None:
         """
         Log request/response.
         :param message: message to log
-        :return: processed log
         """
         if self._enable_log:
             self._logger_cb(*message)

+ 39 - 0
nbus_types/nbus_address_type.py

@@ -0,0 +1,39 @@
+from enum import Enum
+from typing import Annotated
+from beartype import beartype
+from beartype.vale import Is
+
+
+@beartype
+class NBusModuleAddress:
+    """
+    Class for NBus module address type.
+    """
+    def __init__(self, address: Annotated[int, Is[lambda x: 1 <= x <= 127]]):
+        self.__addr = address
+
+    @property
+    def value(self):
+        return self.__addr
+
+
+class NBusDeviceIO(Enum):
+    """
+    Enum class for device data direction.
+    """
+    OUTPUT = 0
+    INPUT = 1
+
+
+@beartype
+class NBusDeviceAddress:
+    """
+    Class for NBus device address.
+    """
+    def __init__(self, io: NBusDeviceIO, address: Annotated[int, Is[lambda value: 1 <= value < 31]]):
+        self.__io = io
+        self.__addr = address
+
+    @property
+    def value(self):
+        return self.__addr + 128 * self.__io.value

+ 15 - 8
nbus_types/nbus_command_type.py

@@ -1,18 +1,11 @@
 from enum import Enum
 
 
-class NBusCommandPrefix(Enum):
-    """
-    Enum class for nBus command prefixes.
-    """
-    SET = 0x20
-    GET = 0x00
-
-
 class NBusCommand(Enum):
     """
     Enum class for valid nBus commands.
     """
+    # get
     CMD_ECHO = 0x01
     CMD_STOP = 0x02
     CMD_START = 0x03
@@ -27,3 +20,17 @@ class NBusCommand(Enum):
     CMD_SYNC = 0x0C
     CMD_SENSOR_TYPE = 0x0D
     CMD_INFO = 0x0E
+
+    # set
+    CMD_PARAM_SET = 0x24
+    CMD_CALIBRATE_SET = 0x28
+    CMD_STORE_SET = 0x2A
+    CMD_DATA_SET = 0x2B
+
+
+class NBusInfoParam(Enum):
+    """
+    Enum class for info command parameters.
+    """
+    INFO_GENERAL = 0xE1
+    INFO_FORMAT = 0xE2

+ 17 - 7
nbus_types/nbus_data_fomat.py

@@ -1,7 +1,17 @@
-from pydantic import BaseModel, Field
+from beartype import beartype
+from beartype.vale import Is
+from dataclasses import dataclass
+from typing import Union, Annotated
 
+"""
+Type for data value.
+"""
+NBusDataValue = Union[int, float]
 
-class NBusDataFormat(BaseModel):
+
+@beartype
+@dataclass(frozen=True)
+class NBusDataFormat:
     """
     Class for data format.
 
@@ -11,8 +21,8 @@ class NBusDataFormat(BaseModel):
     :ivar byte_length: number of bytes per value
     :ivar samples: number of values
     """
-    sign: bool = Field(frozen=True)
-    unit_multiplier: int = Field(frozen=True)
-    value_multiplier: int = Field(frozen=True)
-    byte_length: int = Field(frozen=True)
-    samples: int = Field(frozen=True)
+    sign: bool
+    unit_multiplier: Annotated[int, Is[lambda value: -64 <= value <= 63]]
+    value_multiplier: Annotated[int, Is[lambda value: -128 <= value <= 127]]
+    byte_length: Annotated[int, Is[lambda value: 1 <= value <= 8]]
+    samples: Annotated[int, Is[lambda value: 1 <= value <= 16]]

+ 5 - 6
nbus_types/nbus_parameter_type.py

@@ -1,6 +1,11 @@
 from enum import Enum
 from typing import Union
 
+"""
+Typedef for nBus parameter value.
+"""
+NBusParameterValue = Union[float, int]
+
 
 class NBusParameterType(Enum):
     """
@@ -15,9 +20,3 @@ class NBusParameterType(Enum):
     PARAM_RANGE = 5
     PARAM_RANGE0 = 6
     PARAM_FILTER = 7
-
-
-"""
-Typedef for nBus parameter value.
-"""
-NBusParameterValue = Union[float, int]

+ 0 - 20
nbus_types/nbus_serial_config.py

@@ -1,20 +0,0 @@
-from pydantic import BaseModel, Field
-
-
-class NBusSerialConfig(BaseModel):
-    """
-    Configuration of nBus serial port.
-
-    :ivar port_name: The serial port identifier.
-    :ivar baud: The baud rate for the serial communication.
-    :ivar parity: The parity bit setting for the serial communication.
-    :ivar timeout: The timeout value for the serial communication.
-    :ivar request_attempts: The number of attempts for a request.
-    :ivar enable_log: Flag to enable or disable logging.
-    """
-    port_name: str = Field(frozen=True)
-    baud: int = Field(frozen=True)
-    parity: str = Field(frozen=True)
-    timeout: float = Field(frozen=True)
-    request_attempts: int = Field(frozen=True)
-    enable_log: bool = Field(frozen=True)