| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- import time
- import serial
- from typing import Any, Callable
- from beartype import beartype
- from nbus_types.nbus_command_type import *
- from nbus_types.nbus_exceptions.nbus_network_exception import *
- from nbus_types.nbus_exceptions.nbus_node_exception import *
- from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
- from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
- from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
- from nbus_hal.crc8 import crc8
- def default_logger(*message: Any) -> None:
- """
- Default logger function.
- :param message: message to log
- :return: None
- """
- for i in range(len(message)):
- if isinstance(message[i], list):
- for m in message[i]:
- print(hex(m), end="|")
- continue
- print(message[i], end=" ")
- print()
- @beartype
- class NBusSerialPort(NBusPort):
- """
- Class representing nBus serial port.
- """
- def __init__(self, config: NBusSerialConfig):
- """
- Constructor.
- :param config: configuration
- """
- self._port = serial.Serial(timeout=config.timeout)
- self._port.port = config.port_name
- self._port.parity = config.parity.value
- self._port.baudrate = config.baud.value
- self._logger_cb = default_logger
- self._enable_log = config.enable_log
- self._request_attempts = config.request_attempts
- """
- ================================================================================================================
- API Methods
- ================================================================================================================
- """
- def change_configuration(self, config: NBusSerialConfig):
- """
- Change port configuration.
- :param config: configuration
- """
- self._port.timeout = config.timeout
- self._port.port = config.port_name
- self._port.parity = config.parity.value
- self._port.baudrate = config.baud.value
- self._enable_log = config.enable_log
- self._request_attempts = config.request_attempts
- def open(self) -> None:
- """
- Open port.
- """
- self._port.open()
- self._port.flush()
- self._log('i', 0, 'Open communication port')
- def close(self) -> None:
- """
- Close port.
- """
- self._log('i', 0, 'Close communication port')
- self._port.close()
- def is_connected(self) -> bool:
- """
- Return connection status.
- :return: status (1 = connected, 0 = not connected)
- """
- return self._port.is_open
- def set_logger(self, callback: Callable[[Any], None]) -> None:
- """
- Set logger function.
- :param callback: logging callback
- """
- self._logger_cb = callback
- def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
- """
- Make broadcast request to nbus network.
- :param command: command id
- :param data: command data to send
- """
- request = self._create_packet(bytearray([0, 0, 0, command.value]), data)
- self._log("\tBRQ>", list(request))
- self._port.write(request) # send message
- def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
- long_answer: NBusDelay = 0.0) -> bytearray:
- """
- Make module request to nbus network.
- :param module_addr: address of module
- :param command: command id
- :param data: command data to send
- :param long_answer: delay in s for longer answer
- :return: | payload length | payload |
- """
- return self._request_response(module_addr, 0, command.value, data, long_answer)
- def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress,
- command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
- """
- Make device request to nbus network.
- :param module_addr: address of module
- :param sensor_address: address of sensor
- :param command: command id
- :param data: command data to send
- :param long_answer: delay in s for longer answer
- :return: | payload length | payload |
- """
- return self._request_response(module_addr, sensor_address, command.value, data, long_answer)
- """
- ================================================================================================================
- Internal Methods
- ================================================================================================================
- """
- def _request_response(self, module: int, sensor: int, command, data: bytearray,
- long_answer: float = 0) -> bytearray:
- """
- Make request to nbus node and receive response.
- :param module: module address
- :param sensor: sensor address
- :param command: command id
- :param data: command data to send
- :param long_answer: timeout for long answer
- :return: response length | response
- """
- request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
- self._log('d', sensor, "\tRQ>", list(request)) # log request
- counter = 0 # err. trials
- while True: # try to communicate
- self._port.write(request)
- if long_answer > 0: # wait for long answer
- time.sleep(long_answer)
- try:
- return self._receive_payload()
- except NBusErrorNetwork as Ex: # if network error, try to reconnect
- counter += 1
- if counter > self._request_attempts:
- raise Ex # if out of trials, propagate exception
- def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
- """
- Create packet to send and prepare port.
- :param start: head of message
- :param data: body of message
- :return: request packet
- """
- if len(data) > 0:
- for c in data:
- start.append(c)
- if not self._port.is_open:
- self.open()
- self._port.flush()
- start[0] = len(start)
- crc_sum = crc8(start[1:])
- start.append(crc_sum)
- return start
- def _receive_payload(self) -> bytearray:
- """
- Read response from serial.
- :return: | payload len | payload
- """
- # read data length
- response_l = self._port.read(1)
- if (response_l is None) or (len(response_l) == 0): # check if message is empty
- raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
- response_l = ord(response_l) # convert response length to number
- # read response body
- response = self._port.read(response_l)
- if (response is None) or (len(response) != response_l): # check for message completeness
- raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
- # check for crc
- if response[-1] != crc8(response[:-1]):
- raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
- self._log('d', 0, "\tRS>", [response_l] + list(response)) # log response
- # check for node error
- if response[2] & NBUS_ERR_BIT:
- raise NBusErrorNode(NBusErrorNodeType(response[3]))
- return bytearray([response_l - 4]) + bytearray(response[3:-1]) # return payload length + payload
- def _log(self, *message: Any) -> None:
- """
- Log request/response.
- :param message: message to log
- """
- if self._enable_log:
- self._logger_cb(*message)
|