import time
import serial
from typing import Any, Callable
from beartype import beartype
from nbus_types.nbus_command_type import *
from nbus_types.nbus_exceptions.nbus_network_exception import *
from nbus_types.nbus_exceptions.nbus_node_exception import *
from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
from nbus_hal.crc8 import crc8

# Number of non-payload bytes that follow the length byte of a frame:
# module address, sensor address, command and CRC.
_FRAME_OVERHEAD = 4

# The frame length is transmitted in a single byte.
_MAX_FRAME_LENGTH = 0xFF


def default_logger(*message: Any) -> None:
    """
    Default logger function. Prints all parts of the message on one line.

    Byte sequences (list, bytes, bytearray) are printed as hex values separated by "|",
    every other part is printed as is, followed by a space.

    :param message: message parts to log
    :return: None
    """
    for part in message:
        if isinstance(part, (list, bytes, bytearray)):
            # raw packet: dump it byte by byte in hex
            for value in part:
                print(hex(value), end="|")
        else:
            print(part, end=" ")
    print()


@beartype
class NBusSerialPort(NBusPort):
    """
    Class representing nBus serial port.
    """

    def __init__(self, config: NBusSerialConfig) -> None:
        """
        Create the serial port object from configuration. The port is not opened here.

        :param config: serial port configuration
        """
        self._port = serial.Serial(timeout=config.timeout)
        self._port.port = config.port_name
        self._port.parity = config.parity.value
        self._port.baudrate = config.baud.value

        self._logger_cb = default_logger
        self._enable_log = config.enable_log
        self._request_attempts = config.request_attempts

    # ================================================================================================================
    #                                                   API Methods
    # ================================================================================================================

    def open(self) -> None:
        """
        Open port.
        """
        self._port.open()
        self._port.flush()
        self._log('i', 0, 'Open communication port')

    def close(self) -> None:
        """
        Close port.
        """
        self._log('i', 0, 'Close communication port')
        self._port.close()

    def is_connected(self) -> bool:
        """
        Return connection status.

        :return: True if the port is open, False otherwise
        """
        return self._port.is_open

    def set_logger(self, callback: Callable[..., None]) -> None:
        """
        Set logger function.

        :param callback: logging callback; it is called with a variable number of positional arguments
        """
        self._logger_cb = callback

    def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
        """
        Make broadcast request to nbus network. No response is read.

        :param command: command id
        :param data: command data to send
        """
        # The head must be a bytearray: _create_packet is type-checked by beartype
        # and rejects a plain list.
        request = self._create_packet(bytearray([0, 0, 0, command.value]), data)
        self._log('d', 0, "\tBRQ>", request)  # same (level, id, ...) layout as the other log calls
        self._port.write(request)  # send message

    def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
                       long_answer: NBusDelay = 0.0) -> bytearray:
        """
        Make module request to nbus network.

        :param module_addr: address of module
        :param command: command id
        :param data: command data to send
        :param long_answer: delay in s for longer answer
        :return: | payload length | payload |
        """
        # sensor address 0 is used for module requests
        return self._request_response(module_addr, 0, command.value, data, long_answer)

    def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress,
                       command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
        """
        Make device request to nbus network.

        :param module_addr: address of module
        :param sensor_address: address of sensor
        :param command: command id
        :param data: command data to send
        :param long_answer: delay in s for longer answer
        :return: | payload length | payload |
        """
        return self._request_response(module_addr, sensor_address, command.value, data, long_answer)

    # ================================================================================================================
    #                                                   Internal Methods
    # ================================================================================================================

    def _request_response(self, module: int, sensor: int, command, data: bytearray,
                          long_answer: float = 0) -> bytearray:
        """
        Make request to nbus node and receive response.

        The request is re-sent after a network error; once more than ``request_attempts``
        network errors occurred, the last one is propagated.

        :param module: module address
        :param sensor: sensor address
        :param command: command id
        :param data: command data to send
        :param long_answer: time in s to wait before reading a long answer
        :return: | response length | response |
        :raises NBusErrorNetwork: if communication keeps failing
        :raises NBusErrorNode: if the node answers with an error
        """
        request = self._create_packet(bytearray([0, module, sensor, command]), data)  # create request
        self._log('d', sensor, "\tRQ>", request)  # log request

        failures = 0  # number of failed trials
        while True:  # try to communicate
            self._port.write(request)

            if long_answer > 0:  # wait for long answer
                time.sleep(long_answer)

            try:
                return self._receive_payload()
            except NBusErrorNetwork:  # network error: send the request again
                failures += 1
                if failures > self._request_attempts:
                    raise  # out of trials, propagate exception with its original traceback

    def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
        """
        Create packet to send and prepare port.

        The packet is built in place in ``start``: byte 0 is overwritten with the frame length
        and the CRC is appended. The port is opened if necessary.

        :param start: head of message (first byte is a placeholder for the length)
        :param data: body of message
        :return: request packet
        :raises ValueError: if the frame does not fit into the one-byte length field
        """
        start.extend(data)

        if len(start) > _MAX_FRAME_LENGTH:
            raise ValueError(
                f"nBus frame too long: {len(start)} bytes, maximum is {_MAX_FRAME_LENGTH}")

        if not self._port.is_open:
            self.open()

        self._port.flush()

        # Length is taken before the CRC is appended, so it equals the number of bytes
        # that follow the length byte (the CRC replaces the length byte in the count).
        start[0] = len(start)
        crc_sum = crc8(start[1:])  # CRC covers everything except the length byte
        start.append(crc_sum)

        return start

    def _receive_payload(self) -> bytearray:
        """
        Read response from serial.

        :return: | payload len | payload |
        :raises NBusErrorNetwork: if the response is empty, too short, incomplete or has a wrong CRC
        :raises NBusErrorNode: if the node reports an error
        """
        # read data length
        length_byte = self._port.read(1)

        if not length_byte:  # check if message is empty
            raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)

        response_l = length_byte[0]  # convert response length to number

        # A frame shorter than module + sensor + command + CRC cannot be parsed;
        # report it as a network error instead of failing with IndexError below.
        if response_l < _FRAME_OVERHEAD:
            raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)

        # read response body
        response = self._port.read(response_l)

        if (response is None) or (len(response) != response_l):  # check for message completeness
            raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)

        # check for crc
        if response[-1] != crc8(response[:-1]):
            raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)

        self._log('d', 0, "\tRS>", [response_l] + list(response))  # log response

        # check for node error
        if response[2] & NBUS_ERR_BIT:
            if response_l <= _FRAME_OVERHEAD:
                # error flag without an error code: byte 3 would be the CRC, not a code
                raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
            raise NBusErrorNode(NBusErrorNodeType(response[3]))

        # return payload length + payload (address, command and CRC bytes stripped)
        return bytearray([response_l - _FRAME_OVERHEAD]) + bytearray(response[3:-1])

    def _log(self, *message: Any) -> None:
        """
        Log request/response. Does nothing when logging is disabled.

        :param message: message to log
        """
        if self._enable_log:
            self._logger_cb(*message)