import time import serial from pydantic import validate_call from typing import Any, Callable from nbus_types.nbus_serial_config import NBusSerialConfig from nbus_types.nbus_command_type import * from nbus_types.nbus_exceptions.nbus_network_exception import * from nbus_types.nbus_exceptions.nbus_node_exception import * from nbus_hal.crc8 import crc8 def default_logger(*message: Any) -> None: """ Default logger function. :param message: message to log :return: None """ for i in range(len(message)): if isinstance(message[i], list): for m in message[i]: print(hex(m), end="|") continue print(message[i], end=" ") print() class NBusSerialPort: """ Class representing nBus serial port. """ @validate_call def __init__(self, config: NBusSerialConfig): self._port = serial.Serial(timeout=config.timeout) self._port.port = config.port_name self._port.parity = config.parity self._port.baudrate = config.baud self._logger_cb = default_logger self._enable_log = config.enable_log self._request_attempts = config.request_attempts """ ================================================================================================================ API Methods ================================================================================================================ """ def open(self) -> None: """ Open port. :return: None """ self._port.open() self._port.flush() self._log('i', 0, 'Open communication port') def close(self) -> None: """ Close port. :return: None """ self._log('i', 0, 'Close communication port') self._port.close() def is_connected(self) -> bool: """ Return connection status. :return: True = connected | False = not connected """ return self._port.is_open @validate_call(validate_return=True) def set_logger(self, callback: Callable[[Any], None]) -> None: """ Set logger function. :param callback: logging callback :return: None """ self._logger_cb = callback @validate_call def request_broadcast(self, command: NBusCommand, data: list[int]) -> None: """ Make broadcast request to nbus network. :param command: command id :param data: command data to send :return: None """ request = self._create_packet([0, 0, 0, command.value], data) self._log("\tBRQ>", request) self._port.write(request) # send message @validate_call(validate_return=True) def request(self, module: int, sensor: int, command: NBusCommand, data: list[int], long_answer: float = 0) -> list[int]: """ Make request to nbus node. :param module: module address :param sensor: sensor address :param command: command id :param data: command data to send :param long_answer: timeout for long answer :return: response """ request = self._create_packet([0, module, sensor, command.value], data) # create request self._log('d', sensor, "\tRQ>", request) # log request counter = 0 # err. trials while True: # try to communicate self._port.write(request) if long_answer > 0: # wait for long answer time.sleep(long_answer) try: return self._receive_payload() except NBusErrorNetwork as Ex: # if network error, try to reconnect counter += 1 if counter > self._request_attempts: raise Ex # if out of trials, propagate exception """ ================================================================================================================ Internal Methods ================================================================================================================ """ def _create_packet(self, start: list[int], data: list[int]) -> list[int]: """ Create packet to send and prepare port. :param start: head of message :param data: body of message :return: request packet """ if len(data) > 0: for c in data: start.append(c) if not self._port.is_open: self.open() self._port.flush() start[0] = len(start) crc_sum = crc8(start[1:]) start.append(crc_sum) return start def _receive_payload(self) -> list[int]: """ Read response from serial. :return: payload """ # read data length response_l = self._port.read(1) if (response_l is None) or (len(response_l) == 0): # check if message is empty raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE) response_l = ord(response_l) # convert response length to number # read response body response = self._port.read(response_l) if (response is None) or (len(response) != response_l): # check for message completeness raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE) # check for crc if response[-1] != crc8(response[:-1]): raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE) self._log('d', 0, "\tRS>", response_l, response) # log response # check for node error if response[2] & NBUS_ERR_BIT: raise NBusErrorNode(NBusErrorNodeType(response[3])) return [response_l - 4] + list(response)[3:-1] # return payload length + payload def _log(self, *message: Any) -> None: """ Log request/response. :param message: message to log :return: processed log """ if self._enable_log: self._logger_cb(*message)