serial_port.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. import time
  2. import serial
  3. from pydantic import validate_call
  4. from typing import Any, Callable
  5. from nbus_types.nbus_serial_config import NBusSerialConfig
  6. from nbus_types.nbus_command_type import *
  7. from nbus_types.nbus_exceptions.nbus_network_exception import *
  8. from nbus_types.nbus_exceptions.nbus_node_exception import *
  9. from nbus_hal.crc8 import crc8
  def default_logger(*message: Any) -> None:
      """
      Default logger function.

      Prints all items on a single line. Byte sequences (``list``, ``bytes`` or
      ``bytearray``) are printed element by element in hexadecimal, each value
      followed by ``|``; any other item is printed as-is followed by a space.
      The line is always terminated with a newline.

      :param message: items to log
      :return: None
      """
      for item in message:
          # Frames may arrive as a list (requests) or as bytes (responses read
          # from the port); print both the same way so the logs are comparable.
          if isinstance(item, (list, bytes, bytearray)):
              for value in item:
                  print(hex(value), end="|")
          else:
              print(item, end=" ")
      print()
  class NBusSerialPort:
      """
      Class representing nBus serial port.

      Frames are laid out as ``[length, module, sensor, command, *data, crc]``.
      ``length`` counts every byte that follows it (CRC included) and the CRC-8
      is computed over everything between the length byte and the CRC itself.
      """

      # Shortest well-formed response body: module + sensor + command + crc.
      _MIN_RESPONSE_LENGTH = 4

      @validate_call
      def __init__(self, config: NBusSerialConfig):
          """
          Configure the serial port from ``config``; the port is opened later,
          either explicitly via :meth:`open` or lazily on the first request.

          :param config: serial settings (port name, parity, baud rate, read
              timeout), logging switch and number of request attempts
          """
          self._port = serial.Serial(timeout=config.timeout)
          self._port.port = config.port_name
          self._port.parity = config.parity
          self._port.baudrate = config.baud

          self._logger_cb: Callable[..., None] = default_logger
          self._enable_log = config.enable_log
          self._request_attempts = config.request_attempts

      # ======================================================================
      # API Methods
      # ======================================================================

      def open(self) -> None:
          """
          Open port.

          :return: None
          """
          self._port.open()
          # NOTE(review): pySerial's flush() waits for pending *output*; if the
          # intent is to drop stale input, reset_input_buffer() is the call to
          # use - confirm before changing.
          self._port.flush()
          self._log('i', 0, 'Open communication port')

      def close(self) -> None:
          """
          Close port.

          :return: None
          """
          self._log('i', 0, 'Close communication port')
          self._port.close()

      def is_connected(self) -> bool:
          """
          Return connection status.

          :return: True = connected | False = not connected
          """
          return self._port.is_open

      @validate_call(validate_return=True)
      def set_logger(self, callback: Callable[..., None]) -> None:
          """
          Set logger function.

          The callback is invoked with a variable number of positional
          arguments (the same ones passed to the internal ``_log``).

          :param callback: logging callback
          :return: None
          """
          self._logger_cb = callback

      @validate_call
      def request_broadcast(self, command: NBusCommand, data: list[int]) -> None:
          """
          Make broadcast request to nbus network.

          The frame is addressed to module 0 / sensor 0 and no response is read.

          :param command: command id
          :param data: command data to send
          :return: None
          """
          request = self._create_packet([0, 0, 0, command.value], data)
          # Same (level, address, ...) argument layout as every other log call.
          self._log('d', 0, "\tBRQ>", request)
          self._port.write(request)

      @validate_call(validate_return=True)
      def request(self, module: int, sensor: int, command: NBusCommand, data: list[int], long_answer: float = 0) -> list[int]:
          """
          Make request to nbus node.

          The request is re-sent whenever reading the response fails with a
          network error. The error is propagated once the number of failures
          exceeds the configured ``request_attempts`` (i.e. the frame is
          transmitted at most ``request_attempts + 1`` times).

          :param module: module address
          :param sensor: sensor address
          :param command: command id
          :param data: command data to send
          :param long_answer: seconds to wait after sending before the response
              is read; 0 disables the wait
          :return: response (payload length followed by the payload bytes)
          :raises NBusErrorNetwork: when all attempts failed
          :raises NBusErrorNode: when the node reports an error (presumably not
              retried - depends on the exception hierarchy, confirm)
          """
          request = self._create_packet([0, module, sensor, command.value], data)
          self._log('d', sensor, "\tRQ>", request)

          failures = 0
          while True:
              self._port.write(request)
              if long_answer > 0:
                  time.sleep(long_answer)
              try:
                  return self._receive_payload()
              except NBusErrorNetwork:
                  failures += 1
                  if failures > self._request_attempts:
                      # Bare raise keeps the original traceback intact.
                      raise

      # ======================================================================
      # Internal Methods
      # ======================================================================

      def _create_packet(self, start: list[int], data: list[int]) -> list[int]:
          """
          Create packet to send and prepare port.

          Opens the port if it is not open yet and flushes it. The input lists
          are not modified; a new list is returned.

          :param start: head of message; element 0 is a placeholder that is
              replaced by the frame length
          :param data: body of message
          :return: request packet
          """
          if not self._port.is_open:
              self.open()
          self._port.flush()

          packet = [*start, *data]
          # Length is taken before the CRC is appended, so it equals the number
          # of bytes following the length byte once the CRC is in place.
          packet[0] = len(packet)
          packet.append(crc8(packet[1:]))
          return packet

      def _receive_payload(self) -> list[int]:
          """
          Read response from serial.

          :return: payload length followed by the payload bytes
          :raises NBusErrorNetwork: on timeout / empty read, truncated frame,
              implausibly short frame or CRC mismatch
          :raises NBusErrorNode: when the response has the error bit set
          """
          # read data length
          length_byte = self._port.read(1)
          if not length_byte:
              raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
          response_l = length_byte[0]

          # read response body
          response = self._port.read(response_l)
          if (response is None) or (len(response) != response_l):
              raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)

          # A corrupted length byte must surface as a network error so that
          # request() can retry, instead of an IndexError on the lookups below.
          if response_l < self._MIN_RESPONSE_LENGTH:
              raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)

          # check for crc
          if response[-1] != crc8(response[:-1]):
              raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)

          self._log('d', 0, "\tRS>", response_l, response)

          # check for node error (flag carried in the command byte)
          if response[2] & NBUS_ERR_BIT:
              raise NBusErrorNode(NBusErrorNodeType(response[3]))

          # strip module, sensor, command and crc; prepend payload length
          return [response_l - 4] + list(response)[3:-1]

      def _log(self, *message: Any) -> None:
          """
          Log request/response.

          Forwards the arguments to the configured logger callback when logging
          is enabled; does nothing otherwise.

          :param message: message to log
          :return: None
          """
          if self._enable_log:
              self._logger_cb(*message)