# nbus_generic_port.py
  1. from abc import ABCMeta, abstractmethod
  2. from typing import Annotated
  3. from beartype import beartype
  4. from beartype.vale import Is
  5. from nbus_types.nbus_command_type import *
  6. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  7. """
  8. NBus delay typedef
  9. """
  10. NBusDelay = Annotated[float, Is[lambda s: s >= 0]]
  11. @beartype
  12. class NBusPort(metaclass=ABCMeta):
  13. """
  14. Class for generic NBus communication port.
  15. """
  16. @abstractmethod
  17. def open(self) -> None:
  18. """
  19. Open communication port.
  20. """
  21. pass
  22. @abstractmethod
  23. def close(self) -> None:
  24. """
  25. Close communication port.
  26. """
  27. pass
  28. @abstractmethod
  29. def flush(self) -> None:
  30. """
  31. Flush port with periodic check.
  32. """
  33. pass
  34. @abstractmethod
  35. def try_read(self) -> bytes:
  36. """
  37. Try reading from port.
  38. :return: bytes
  39. """
  40. pass
  41. @abstractmethod
  42. def is_connected(self) -> bool:
  43. """
  44. Return connection status.
  45. :return: status (1 = connected, 0 = not connected)
  46. """
  47. pass
  48. @abstractmethod
  49. def request_bridge(self, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0):
  50. """
  51. Make bridge request.
  52. :param command: command id
  53. :param data: command data to send
  54. :param long_answer: delay in s for longer answer
  55. """
  56. pass
  57. @abstractmethod
  58. def send_bridge(self, command: NBusCommand, data: bytearray):
  59. """
  60. Make bridge request without waiting for response.
  61. :param command: command id
  62. :param data: command data to send
  63. :param long_answer: delay in s for longer answer
  64. """
  65. pass
  66. @abstractmethod
  67. def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
  68. """
  69. Make broadcast request to nbus network.
  70. :param command: command id
  71. :param data: command data to send
  72. """
  73. pass
  74. @abstractmethod
  75. def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
  76. long_answer: NBusDelay = 0.0) -> bytearray:
  77. """
  78. Make module request to nbus network.
  79. :param module_addr: address of module
  80. :param command: command id
  81. :param data: command data to send
  82. :param long_answer: delay in s for longer answer
  83. :return: | payload length | payload |
  84. """
  85. pass
  86. @abstractmethod
  87. def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress, command: NBusCommand,
  88. data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
  89. """
  90. Make sensor request to nbus network.
  91. :param module_addr: address of module
  92. :param sensor_address: address of sensor
  93. :param command: command id
  94. :param data: command data to send
  95. :param long_answer: delay in s for longer answer
  96. :return: | payload length | payload |
  97. """
  98. pass