from abc import ABCMeta, abstractmethod from typing import Annotated from beartype import beartype from beartype.vale import Is from nbus_types.nbus_command_type import * from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress """ NBus delay typedef """ NBusDelay = Annotated[float, Is[lambda s: s >= 0]] @beartype class NBusPort(metaclass=ABCMeta): """ Class for generic NBus communication port. """ @abstractmethod def open(self) -> None: """ Open communication port. """ pass @abstractmethod def close(self) -> None: """ Close communication port. """ pass @abstractmethod def flush(self) -> None: """ Flush port with periodic check. """ pass @abstractmethod def try_read(self) -> bytes: """ Try reading from port. :return: bytes """ pass @abstractmethod def is_connected(self) -> bool: """ Return connection status. :return: status (1 = connected, 0 = not connected) """ pass @abstractmethod def request_bridge(self, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0): """ Make bridge request. :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer """ pass @abstractmethod def send_bridge(self, command: NBusCommand, data: bytearray): """ Make bridge request without waiting for response. :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer """ pass @abstractmethod def request_broadcast(self, command: NBusCommand, data: bytearray) -> None: """ Make broadcast request to nbus network. :param command: command id :param data: command data to send """ pass @abstractmethod def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray: """ Make module request to nbus network. :param module_addr: address of module :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer :return: | payload length | payload | """ pass @abstractmethod def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray: """ Make sensor request to nbus network. :param module_addr: address of module :param sensor_address: address of sensor :param command: command id :param data: command data to send :param long_answer: delay in s for longer answer :return: | payload length | payload | """ pass