Browse Source

+ First commit, added nbus types and serial port.

DLIMIKO 1 year ago
commit
a5f1062f5b

+ 8 - 0
.idea/.gitignore

@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml

+ 6 - 0
.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+    <inspection_tool class="PyTypeCheckerInspection" enabled="false" level="WARNING" enabled_by_default="false" />
+  </profile>
+</component>

+ 6 - 0
.idea/inspectionProfiles/profiles_settings.xml

@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+  <settings>
+    <option name="USE_PROJECT_PROFILE" value="false" />
+    <version value="1.0" />
+  </settings>
+</component>

+ 4 - 0
.idea/misc.xml

@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (nbus_api)" project-jdk-type="Python SDK" />
+</project>

+ 8 - 0
.idea/modules.xml

@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/nbus_api.iml" filepath="$PROJECT_DIR$/.idea/nbus_api.iml" />
+    </modules>
+  </component>
+</project>

+ 13 - 0
.idea/nbus_api.iml

@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+  <component name="NewModuleRootManager">
+    <content url="file://$MODULE_DIR$">
+      <excludeFolder url="file://$MODULE_DIR$/im" />
+    </content>
+    <orderEntry type="inheritedJdk" />
+    <orderEntry type="sourceFolder" forTests="false" />
+  </component>
+  <component name="PackageRequirementsSettings">
+    <option name="requirementsPath" value="" />
+  </component>
+</module>

+ 5 - 0
README.md

@@ -0,0 +1,5 @@
+# nBus Client API 
+
+This project provides a Python client API for the nBus protocol over a serial port. 
+It offers a high-level, strictly object-oriented approach, ensuring type safety, clear error handling
+and maintainability.

+ 22 - 0
main.py

@@ -0,0 +1,22 @@
+from nbus_hal.serial_port import *
+
+# example config
+config = {
+    "port_name": "COM4",
+    "baud": 115200,
+    "parity": "N",
+    "timeout": 0.4,
+    "request_attempts": 1,
+    "enable_log": True
+}
+
+if __name__ == "__main__":
+
+    try:
+        port = NBusSerialPort(NBusSerialConfig(**config))
+        port.set_logger(default_logger)
+        response = port.request(5, 1, NBusCommand.CMD_ECHO, [1, 2, 3])
+        print(response)
+
+    except Exception as Ex:
+        print(str(Ex))

+ 37 - 0
nbus_hal/crc8.py

@@ -0,0 +1,37 @@
+crc8_table = [
+    0x00, 0x07, 0x0E, 0x09, 0x1C, 0x1B, 0x12, 0x15, 0x38, 0x3F, 0x36, 0x31, 0x24, 0x23, 0x2A, 0x2D,
+    0x70, 0x77, 0x7E, 0x79, 0x6C, 0x6B, 0x62, 0x65, 0x48, 0x4F, 0x46, 0x41, 0x54, 0x53, 0x5A, 0x5D,
+    0xE0, 0xE7, 0xEE, 0xE9, 0xFC, 0xFB, 0xF2, 0xF5, 0xD8, 0xDF, 0xD6, 0xD1, 0xC4, 0xC3, 0xCA, 0xCD,
+    0x90, 0x97, 0x9E, 0x99, 0x8C, 0x8B, 0x82, 0x85, 0xA8, 0xAF, 0xA6, 0xA1, 0xB4, 0xB3, 0xBA, 0xBD,
+    0xC7, 0xC0, 0xC9, 0xCE, 0xDB, 0xDC, 0xD5, 0xD2, 0xFF, 0xF8, 0xF1, 0xF6, 0xE3, 0xE4, 0xED, 0xEA,
+    0xB7, 0xB0, 0xB9, 0xBE, 0xAB, 0xAC, 0xA5, 0xA2, 0x8F, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9D, 0x9A,
+    0x27, 0x20, 0x29, 0x2E, 0x3B, 0x3C, 0x35, 0x32, 0x1F, 0x18, 0x11, 0x16, 0x03, 0x04, 0x0D, 0x0A,
+    0x57, 0x50, 0x59, 0x5E, 0x4B, 0x4C, 0x45, 0x42, 0x6F, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7D, 0x7A,
+    0x89, 0x8E, 0x87, 0x80, 0x95, 0x92, 0x9B, 0x9C, 0xB1, 0xB6, 0xBF, 0xB8, 0xAD, 0xAA, 0xA3, 0xA4,
+    0xF9, 0xFE, 0xF7, 0xF0, 0xE5, 0xE2, 0xEB, 0xEC, 0xC1, 0xC6, 0xCF, 0xC8, 0xDD, 0xDA, 0xD3, 0xD4,
+    0x69, 0x6E, 0x67, 0x60, 0x75, 0x72, 0x7B, 0x7C, 0x51, 0x56, 0x5F, 0x58, 0x4D, 0x4A, 0x43, 0x44,
+    0x19, 0x1E, 0x17, 0x10, 0x05, 0x02, 0x0B, 0x0C, 0x21, 0x26, 0x2F, 0x28, 0x3D, 0x3A, 0x33, 0x34,
+    0x4E, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5C, 0x5B, 0x76, 0x71, 0x78, 0x7F, 0x6A, 0x6D, 0x64, 0x63,
+    0x3E, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2C, 0x2B, 0x06, 0x01, 0x08, 0x0F, 0x1A, 0x1D, 0x14, 0x13,
+    0xAE, 0xA9, 0xA0, 0xA7, 0xB2, 0xB5, 0xBC, 0xBB, 0x96, 0x91, 0x98, 0x9F, 0x8A, 0x8D, 0x84, 0x83,
+    0xDE, 0xD9, 0xD0, 0xD7, 0xC2, 0xC5, 0xCC, 0xCB, 0xE6, 0xE1, 0xE8, 0xEF, 0xFA, 0xFD, 0xF4, 0xF3
+]
+
+
+def crc8(data: list[int]) -> int:
+    """
+    Create 8-bit cyclic redundancy code.
+    :param data: input data
+    :return: crc
+    """
+    crc = 0
+    length = len(data)
+    if length == 0:
+        return 0xff
+    crc &= 0xff
+    i = 0
+    while length > 0:
+        crc = crc8_table[crc ^ data[i]]
+        length = length - 1
+        i = i + 1
+    return crc

+ 187 - 0
nbus_hal/serial_port.py

@@ -0,0 +1,187 @@
+import time
+import serial
+from pydantic import validate_call
+from typing import Any, Callable
+from nbus_types.nbus_serial_config import NBusSerialConfig
+from nbus_types.nbus_command_type import *
+from nbus_types.nbus_exceptions.nbus_network_exception import *
+from nbus_types.nbus_exceptions.nbus_node_exception import *
+from nbus_hal.crc8 import crc8
+
+
+def default_logger(*message: Any) -> None:
+    """
+    Default logger function.
+    :param message: message to log
+    :return: None
+    """
+    for i in range(len(message)):
+        if isinstance(message[i], list):
+            for m in message[i]:
+                print(hex(m), end="|")
+            continue
+        print(message[i], end=" ")
+    print()
+
+
+class NBusSerialPort:
+    """
+    Class representing nBus serial port.
+    """
+
+    @validate_call
+    def __init__(self, config: NBusSerialConfig):
+        self._port = serial.Serial(timeout=config.timeout)
+        self._port.port = config.port_name
+        self._port.parity = config.parity
+        self._port.baudrate = config.baud
+        self._logger_cb = default_logger
+        self._enable_log = config.enable_log
+        self._request_attempts = config.request_attempts
+
+    """
+    ================================================================================================================
+                                                API Methods
+    ================================================================================================================
+    """
+
+    def open(self) -> None:
+        """
+        Open port.
+        :return: None
+        """
+        self._port.open()
+        self._port.flush()
+        self._log('i', 0, 'Open communication port')
+
+    def close(self) -> None:
+        """
+        Close port.
+        :return: None
+        """
+        self._log('i', 0, 'Close communication port')
+        self._port.close()
+
+    def is_connected(self) -> bool:
+        """
+        Return connection status.
+        :return: True = connected | False = not connected
+        """
+        return self._port.is_open
+
+    @validate_call(validate_return=True)
+    def set_logger(self, callback: Callable[[Any], None]) -> None:
+        """
+        Set logger function.
+        :param callback: logging callback
+        :return: None
+        """
+        self._logger_cb = callback
+
+    @validate_call
+    def request_broadcast(self, command: NBusCommand, data: list[int]) -> None:
+        """
+        Make broadcast request to nbus network.
+        :param command: command id
+        :param data: command data to send
+        :return: None
+        """
+        request = self._create_packet([0, 0, 0, command.value], data)
+        self._log("\tBRQ>", request)
+        self._port.write(request)  # send message
+
+    @validate_call(validate_return=True)
+    def request(self, module: int, sensor: int, command: NBusCommand, data: list[int], long_answer: float = 0) -> list[int]:
+        """
+        Make request to nbus node.
+        :param module: module address
+        :param sensor: sensor address
+        :param command: command id
+        :param data: command data to send
+        :param long_answer: timeout for long answer
+        :return: response
+        """
+        request = self._create_packet([0, module, sensor, command.value], data)   # create request
+        self._log('d', sensor, "\tRQ>", request)                                  # log request
+
+        counter = 0                                                               # err. trials
+
+        while True:                                                               # try to communicate
+            self._port.write(request)
+
+            if long_answer > 0:                                                   # wait for long answer
+                time.sleep(long_answer)
+
+            try:
+                return self._receive_payload()
+
+            except NBusErrorNetwork as Ex:                                        # if network error, try to reconnect
+                counter += 1
+
+                if counter > self._request_attempts:
+                    raise Ex                                                     # if out of trials, propagate exception
+
+    """
+    ================================================================================================================
+                                                Internal Methods
+    ================================================================================================================
+    """
+
+    def _create_packet(self, start: list[int], data: list[int]) -> list[int]:
+        """
+        Create packet to send and prepare port.
+        :param start: head of message
+        :param data: body of message
+        :return: request packet
+        """
+        if len(data) > 0:
+            for c in data:
+                start.append(c)
+
+        if not self._port.is_open:
+            self.open()
+        self._port.flush()
+
+        start[0] = len(start)
+        crc_sum = crc8(start[1:])
+
+        start.append(crc_sum)
+        return start
+
+    def _receive_payload(self) -> list[int]:
+        """
+        Read response from serial.
+        :return: payload
+        """
+        # read data length
+        response_l = self._port.read(1)
+        if (response_l is None) or (len(response_l) == 0):          # check if message is empty
+            raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
+
+        response_l = ord(response_l)    # convert response length to number
+
+        # read response body
+        response = self._port.read(response_l)
+        if (response is None) or (len(response) != response_l):     # check for message completeness
+            raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
+
+        # check for crc
+        if response[-1] != crc8(response[:-1]):
+            raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
+
+        self._log('d', 0, "\tRS>", response_l, response)   # log response
+
+        # check for node error
+        if response[2] & NBUS_ERR_BIT:
+            raise NBusErrorNode(NBusErrorNodeType(response[3]))
+
+        return [response_l - 4] + list(response)[3:-1]   # return payload length + payload
+
+    def _log(self, *message: Any) -> None:
+        """
+        Log request/response.
+        :param message: message to log
+        :return: processed log
+        """
+        if self._enable_log:
+            self._logger_cb(*message)

+ 29 - 0
nbus_types/nbus_command_type.py

@@ -0,0 +1,29 @@
+from enum import Enum
+
+
+class NBusCommandPrefix(Enum):
+    """
+    Enum class for nBus command prefixes.
+    """
+    SET = 0x20
+    GET = 0x00
+
+
+class NBusCommand(Enum):
+    """
+    Enum class for valid nBus commands.
+    """
+    CMD_ECHO = 0x01
+    CMD_STOP = 0x02
+    CMD_START = 0x03
+    CMD_PARAM = 0x04
+    CMD_SENSOR_CNT = 0x05
+    CMD_SLEEP = 0x06
+    CMD_WAKEUP = 0x07
+    CMD_CALIBRATE = 0x08
+    CMD_RESET = 0x09
+    CMD_STORE = 0x0A
+    CMD_DATA = 0x0B
+    CMD_SYNC = 0x0C
+    CMD_SENSOR_TYPE = 0x0D
+    CMD_INFO = 0x0E

+ 18 - 0
nbus_types/nbus_data_fomat.py

@@ -0,0 +1,18 @@
+from pydantic import BaseModel, Field
+
+
+class NBusDataFormat(BaseModel):
+    """
+    Class for data format.
+
+    :ivar sign: signed = 1 | unsigned = 0
+    :ivar unit_multiplier: unit multiplier (in log10 form)
+    :ivar value_multiplier: value multiplier (in log10 form)
+    :ivar byte_length: number of bytes per value
+    :ivar samples: number of values
+    """
+    sign: bool = Field(frozen=True)
+    unit_multiplier: int = Field(frozen=True)
+    value_multiplier: int = Field(frozen=True)
+    byte_length: int = Field(frozen=True)
+    samples: int = Field(frozen=True)

+ 16 - 0
nbus_types/nbus_exceptions/nbus_api_exception.py

@@ -0,0 +1,16 @@
+from enum import Enum
+
+
+class NBusErrorAPI(Exception):
+    """
+    Exception for api errors.
+    """
+    pass
+
+
+class NBusErrorAPIType(Enum):
+    """
+    Enum class for NBusErrorAPI identification.
+    """
+    FORMAT_NOT_LOADED = 0x201
+    PARAMS_NOT_LOADED = 0x202

+ 17 - 0
nbus_types/nbus_exceptions/nbus_network_exception.py

@@ -0,0 +1,17 @@
+from enum import Enum
+
+
+class NBusErrorNetwork(Exception):
+    """
+    Exception for node errors in nBus network.
+    """
+    pass
+
+
+class NBusErrorNetworkType(Enum):
+    """
+    Enum class for NBusErrorNode identification.
+    """
+    EMPTY_MESSAGE = 0x101
+    DAMAGED_MESSAGE = 0x102
+    MESSAGE_NOT_COMPLETE = 0x103

+ 41 - 0
nbus_types/nbus_exceptions/nbus_node_exception.py

@@ -0,0 +1,41 @@
+from enum import Enum
+
+"""
+Message error bit.
+"""
+NBUS_ERR_BIT = 0x80
+
+
+class NBusErrorNode(Exception):
+    """
+    Exception for node errors in nBus node.
+    """
+    pass
+
+
+class NBusErrorNodeType(Enum):
+    """
+    Enum class for NBusErrorNode identification.
+    """
+    EMPTY_MESSAGE = 0xFF
+    DAMAGED_MESSAGE = 0xFE
+    MESSAGE_NOT_COMPLETE = 0xFD
+    OK_CODE = 0x00
+    ILLEGAL_FUNCTION = 0x01
+    ILLEGAL_DEVICE_ADDRESS = 0x02
+    ILLEGAL_DATA_VALUE = 0x03
+    SLAVE_DEVICE_FAILURE = 0x04
+    ACKNOWLEDGE = 0x05
+    DEVICE_BUSY = 0x06
+    DEVICE_NOT_READY = 0x07
+    PARAM_NOT_IMPLEMENTED = 0x10
+    GENERIC_SUBSLAVE_ERROR = 0x13
+    ERR_SUBSLAVE_INIT_FAIL = 0x14
+    SUBSLAVE_CUSTOM_ERR_1 = 0x1A
+    SUBSLAVE_CUSTOM_ERR_2 = 0x1B
+    SUBSLAVE_CUSTOM_ERR_3 = 0x1C
+    SUBSLAVE_CUSTOM_ERR_4 = 0x1D
+    SUBSLAVE_CUSTOM_ERR_5 = 0x1E
+    SUBSLAVE_CUSTOM_ERR_6 = 0x1F
+    ERR_OUTPUT_ONLY = 0x21
+    ERR_INPUT_ONLY = 0x22

+ 23 - 0
nbus_types/nbus_parameter_type.py

@@ -0,0 +1,23 @@
+from enum import Enum
+from typing import Union
+
+
+class NBusParameterType(Enum):
+    """
+    Enum class for nBus parameter types.
+    """
+    PARAM_NONE = 0xFF
+    PARAM_TIMEBASE = 0
+    PARAM_RESOLUTION = 1
+    PARAM_GAIN = 2
+    PARAM_OFFSET = 3
+    PARAM_SAMPLERATE = 4
+    PARAM_RANGE = 5
+    PARAM_RANGE0 = 6
+    PARAM_FILTER = 7
+
+
+"""
+Typedef for nBus parameter value.
+"""
+NBusParameterValue = Union[float, int]

+ 17 - 0
nbus_types/nbus_sensor_type.py

@@ -0,0 +1,17 @@
+from enum import Enum
+
+
+class NBusSensorType(Enum):
+    """
+    Enum class for sensor type.
+    """
+    TYPE_UNKNOWN = 0xFF
+    TYPE_ACCELEROMETER = 0
+    TYPE_GYROSCOPE = 1
+    TYPE_MAGNETOMETER = 2
+    TYPE_TEMPERATURE = 3
+    TYPE_HUMIDITY = 4
+    TYPE_PRESSURE = 5
+    TYPE_HEART_RATE = 6
+    TYPE_DEVIATION_DISTANCE = 7
+    

+ 20 - 0
nbus_types/nbus_serial_config.py

@@ -0,0 +1,20 @@
+from pydantic import BaseModel, Field
+
+
+class NBusSerialConfig(BaseModel):
+    """
+    Configuration of nBus serial port.
+
+    :ivar port_name: The serial port identifier.
+    :ivar baud: The baud rate for the serial communication.
+    :ivar parity: The parity bit setting for the serial communication.
+    :ivar timeout: The timeout value for the serial communication.
+    :ivar request_attempts: The number of attempts for a request.
+    :ivar enable_log: Flag to enable or disable logging.
+    """
+    port_name: str = Field(frozen=True)
+    baud: int = Field(frozen=True)
+    parity: str = Field(frozen=True)
+    timeout: float = Field(frozen=True)
+    request_attempts: int = Field(frozen=True)
+    enable_log: bool = Field(frozen=True)