| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- import time
- import serial
- from pydantic import validate_call
- from typing import Any, Callable
- from nbus_types.nbus_serial_config import NBusSerialConfig
- from nbus_types.nbus_command_type import *
- from nbus_types.nbus_exceptions.nbus_network_exception import *
- from nbus_types.nbus_exceptions.nbus_node_exception import *
- from nbus_hal.crc8 import crc8
- def default_logger(*message: Any) -> None:
- """
- Default logger function.
- :param message: message to log
- :return: None
- """
- for i in range(len(message)):
- if isinstance(message[i], list):
- for m in message[i]:
- print(hex(m), end="|")
- continue
- print(message[i], end=" ")
- print()
- class NBusSerialPort:
- """
- Class representing nBus serial port.
- """
- @validate_call
- def __init__(self, config: NBusSerialConfig):
- self._port = serial.Serial(timeout=config.timeout)
- self._port.port = config.port_name
- self._port.parity = config.parity
- self._port.baudrate = config.baud
- self._logger_cb = default_logger
- self._enable_log = config.enable_log
- self._request_attempts = config.request_attempts
- """
- ================================================================================================================
- API Methods
- ================================================================================================================
- """
- def open(self) -> None:
- """
- Open port.
- :return: None
- """
- self._port.open()
- self._port.flush()
- self._log('i', 0, 'Open communication port')
- def close(self) -> None:
- """
- Close port.
- :return: None
- """
- self._log('i', 0, 'Close communication port')
- self._port.close()
- def is_connected(self) -> bool:
- """
- Return connection status.
- :return: True = connected | False = not connected
- """
- return self._port.is_open
- @validate_call(validate_return=True)
- def set_logger(self, callback: Callable[[Any], None]) -> None:
- """
- Set logger function.
- :param callback: logging callback
- :return: None
- """
- self._logger_cb = callback
- @validate_call
- def request_broadcast(self, command: NBusCommand, data: list[int]) -> None:
- """
- Make broadcast request to nbus network.
- :param command: command id
- :param data: command data to send
- :return: None
- """
- request = self._create_packet([0, 0, 0, command.value], data)
- self._log("\tBRQ>", request)
- self._port.write(request) # send message
- @validate_call(validate_return=True)
- def request(self, module: int, sensor: int, command: NBusCommand, data: list[int], long_answer: float = 0) -> list[int]:
- """
- Make request to nbus node.
- :param module: module address
- :param sensor: sensor address
- :param command: command id
- :param data: command data to send
- :param long_answer: timeout for long answer
- :return: response
- """
- request = self._create_packet([0, module, sensor, command.value], data) # create request
- self._log('d', sensor, "\tRQ>", request) # log request
- counter = 0 # err. trials
- while True: # try to communicate
- self._port.write(request)
- if long_answer > 0: # wait for long answer
- time.sleep(long_answer)
- try:
- return self._receive_payload()
- except NBusErrorNetwork as Ex: # if network error, try to reconnect
- counter += 1
- if counter > self._request_attempts:
- raise Ex # if out of trials, propagate exception
- """
- ================================================================================================================
- Internal Methods
- ================================================================================================================
- """
- def _create_packet(self, start: list[int], data: list[int]) -> list[int]:
- """
- Create packet to send and prepare port.
- :param start: head of message
- :param data: body of message
- :return: request packet
- """
- if len(data) > 0:
- for c in data:
- start.append(c)
- if not self._port.is_open:
- self.open()
- self._port.flush()
- start[0] = len(start)
- crc_sum = crc8(start[1:])
- start.append(crc_sum)
- return start
- def _receive_payload(self) -> list[int]:
- """
- Read response from serial.
- :return: payload
- """
- # read data length
- response_l = self._port.read(1)
- if (response_l is None) or (len(response_l) == 0): # check if message is empty
- raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
- response_l = ord(response_l) # convert response length to number
- # read response body
- response = self._port.read(response_l)
- if (response is None) or (len(response) != response_l): # check for message completeness
- raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
- # check for crc
- if response[-1] != crc8(response[:-1]):
- raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
- self._log('d', 0, "\tRS>", response_l, response) # log response
- # check for node error
- if response[2] & NBUS_ERR_BIT:
- raise NBusErrorNode(NBusErrorNodeType(response[3]))
- return [response_l - 4] + list(response)[3:-1] # return payload length + payload
- def _log(self, *message: Any) -> None:
- """
- Log request/response.
- :param message: message to log
- :return: processed log
- """
- if self._enable_log:
- self._logger_cb(*message)
|