import time import serial from typing import Any, Callable, Optional from beartype import beartype from nbus_types.nbus_command_type import * from nbus_types.nbus_exceptions.nbus_network_exception import * from nbus_types.nbus_exceptions.nbus_node_exception import * from nbus_hal.nbus_serial.serial_config import NBusSerialConfig from nbus_hal.nbus_generic_port import NBusPort, NBusDelay from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress from nbus_hal.crc8 import crc8 from nbus_types.nbus_defines import * def default_logger(*message: Any) -> None: """ Default logger function. :param message: message to log :return: None """ for i in range(len(message)): if isinstance(message[i], list): for m in message[i]: print(hex(m), end="|") continue print(message[i], end=" ") print() @beartype class NBusSerialPort(NBusPort): """ Class representing nBus serial port. """ def __init__(self, config: NBusSerialConfig): """ Constructor. :param config: configuration """ self._flush_delay = config.flush_delay self._port = serial.Serial(timeout=config.timeout) self._port.port = config.port_name self._port.parity = config.parity.value self._port.baudrate = config.baud.value self._logger_cb = default_logger self._enable_log = config.enable_log self._request_attempts = config.request_attempts """ ================================================================================================================ API Methods ================================================================================================================ """ def change_configuration(self, config: NBusSerialConfig): """ Change port configuration. :param config: configuration """ self._port.timeout = config.timeout self._flush_delay = config.flush_delay self._port.port = config.port_name self._port.parity = config.parity.value self._port.baudrate = config.baud.value self._enable_log = config.enable_log self._request_attempts = config.request_attempts def open(self) -> None: """ Open port. """ self._port.open() self._port.flush() self._log("INFO", " 0", "\tOpen communication port") def close(self) -> None: """ Close port. """ self._log("INFO", " 0", "\tClose communication port") self._port.close() def flush(self) -> None: """ Flush port with periodic check. """ while self._port.in_waiting: self._log("INFO", " 0", "\tFlush communication port") self._port.reset_input_buffer() self._port.reset_output_buffer() time.sleep(self._flush_delay) def try_read(self) -> bytes: """ Try reading from port. :return: bytes """ if self._port.in_waiting > 0: data = self._port.read() self._log("DATA", f"{len(data):3}", "\tARS>", list(data)) return data else: return bytes() def is_connected(self) -> bool: """ Return connection status. :return: status (1 = connected, 0 = not connected) """ return self._port.is_open def set_logger(self, callback: Callable[[Any], None]) -> None: """ Set logger function. :param callback: logging callback """ self._logger_cb = callback def request_bridge(self, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0): """ Make bridge request. :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer """ return self._request_response(NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value, data, long_answer) def send_bridge(self, command: NBusCommand, data: bytearray): """ Make bridge request without waiting for response. :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer """ request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value]), data) self._log("DATA", f"{request[0]:3}", "\tARQ>", list(request[1:])) self._port.write(request) # send message def request_broadcast(self, command: NBusCommand, data: bytearray) -> None: """ Make broadcast request to nbus network. :param command: command id :param data: command data to send """ request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value]), data) self._log("DATA", f"{request[0]:3}", "\tARQ>", list(request[1:])) self._port.write(request) # send message def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray: """ Make module request to nbus network. :param module_addr: address of module :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer :return: | payload length | payload | """ return self._request_response(module_addr, 0, command.value, data, long_answer) def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray: """ Make device request to nbus network. :param module_addr: address of module :param sensor_address: address of sensor :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer :return: | payload length | payload | """ return self._request_response(module_addr, sensor_address, command.value, data, long_answer) """ ================================================================================================================ Internal Methods ================================================================================================================ """ def _request_response(self, module: int, sensor: int, command, data: bytearray, long_answer: float = 0) -> bytearray: """ Make request to nbus node and receive response. :param module: module address :param sensor: sensor address :param command: command id :param data: command data to send :param long_answer: timeout for long answer :return: response length | response """ request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request self._log("DATA", f"{request[0]:3}", "\tRQ>", list(request[1:])) # log request counter = 0 # err. trials while True: # try to communicate self._port.write(request) if long_answer > 0: # wait for long answer time.sleep(long_answer) try: return self._receive_payload() except NBusErrorNetwork as Ex: # if network error, try to reconnect counter += 1 if counter > self._request_attempts: raise Ex # if out of trials, propagate exception def _create_packet(self, start: bytearray, data: bytearray) -> bytearray: """ Create packet to send and prepare port. :param start: head of message :param data: body of message :return: request packet """ if len(data) > 0: for c in data: start.append(c) if not self._port.is_open: self.open() self._port.flush() start[0] = len(start) crc_sum = crc8(start[1:]) start.append(crc_sum) return start def _receive_payload(self) -> bytearray: """ Read response from serial. :return: | payload len | payload """ # read data length response_l = self._port.read(1) if (response_l is None) or (len(response_l) == 0): # check if message is empty raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE) response_l = ord(response_l) # convert response length to number # read response body response = self._port.read(response_l) if (response is None) or (len(response) != response_l): # check for message completeness raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE) # check for crc if response[NBUS_CRC_ADDR] != crc8(response[:NBUS_CRC_ADDR]): raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE) self._log("DATA", f"{response_l:3}", "\tRS>", list(response)) # log response # check for node error if response[NBUS_FC_ADDR] & NBUS_ERR_BIT: raise NBusErrorNode(NBusErrorNodeType(response[NBUS_DATA0_ADDR])) # return payload length + payload return bytearray([response_l - NBUS_RX_META_SIZE]) + bytearray(response[NBUS_DATA0_ADDR:NBUS_CRC_ADDR]) def _log(self, *message: Any) -> None: """ Log request/response. :param message: message to log """ if self._enable_log: self._logger_cb(*message)