from abc import ABCMeta, abstractmethod
from typing import Annotated

from beartype import beartype
from beartype.vale import Is

from nbus_types.nbus_address_type import NBusModuleAddress, NBusDeviceAddress
from nbus_types.nbus_command_type import NBusCommand

# NBus delay type alias: a float that the validator requires to be >= 0.
NBusDelay = Annotated[float, Is[lambda s: s >= 0]]


@beartype
class NBusPort(metaclass=ABCMeta):
    """
    Abstract interface of a generic NBus communication port.

    Concrete ports must implement every method declared here.
    """

    @abstractmethod
    def open(self) -> None:
        """
        Open the communication port.
        """

    @abstractmethod
    def close(self) -> None:
        """
        Close the communication port.
        """

    @abstractmethod
    def is_connected(self) -> bool:
        """
        Report the connection status.

        :return: True when connected, False when not connected
        """

    @abstractmethod
    def request_broadcast(
        self,
        command: NBusCommand,
        data: bytearray,
    ) -> None:
        """
        Send a broadcast request to the nbus network.

        :param command: command id
        :param data: command data to send
        """

    @abstractmethod
    def request_module(
        self,
        module_addr: NBusModuleAddress,
        command: NBusCommand,
        data: bytearray,
        long_answer: NBusDelay = 0.0,
    ) -> bytearray:
        """
        Send a request to a single module on the nbus network.

        :param module_addr: address of the module
        :param command: command id
        :param data: command data to send
        :param long_answer: delay in seconds for a longer answer (>= 0)
        :return: | payload length | payload |
        """

    @abstractmethod
    def request_device(
        self,
        module_addr: NBusModuleAddress,
        device_address: NBusDeviceAddress,
        command: NBusCommand,
        data: bytearray,
        long_answer: NBusDelay = 0.0,
    ) -> bytearray:
        """
        Send a request to a single device of a module on the nbus network.

        :param module_addr: address of the module
        :param device_address: address of the device
        :param command: command id
        :param data: command data to send
        :param long_answer: delay in seconds for a longer answer (>= 0)
        :return: | payload length | payload |
        """