serial_port.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. import time
  2. import serial
  3. from typing import Any, Callable
  4. from beartype import beartype
  5. from nbus_types.nbus_command_type import *
  6. from nbus_types.nbus_exceptions.nbus_network_exception import *
  7. from nbus_types.nbus_exceptions.nbus_node_exception import *
  8. from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
  9. from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
  10. from nbus_types.nbus_address_type import NBusModuleAddress, NBusDeviceAddress
  11. from nbus_hal.crc8 import crc8
  12. def default_logger(*message: Any) -> None:
  13. """
  14. Default logger function.
  15. :param message: message to log
  16. :return: None
  17. """
  18. for i in range(len(message)):
  19. if isinstance(message[i], list):
  20. for m in message[i]:
  21. print(hex(m), end="|")
  22. continue
  23. print(message[i], end=" ")
  24. print()
  25. @beartype
  26. class NBusSerialPort(NBusPort):
  27. """
  28. Class representing nBus serial port.
  29. """
  30. def __init__(self, config: NBusSerialConfig):
  31. self._port = serial.Serial(timeout=config.timeout)
  32. self._port.port = config.port_name
  33. self._port.parity = config.parity.value
  34. self._port.baudrate = config.baud.value
  35. self._logger_cb = default_logger
  36. self._enable_log = config.enable_log
  37. self._request_attempts = config.request_attempts
  38. """
  39. ================================================================================================================
  40. API Methods
  41. ================================================================================================================
  42. """
  43. def open(self) -> None:
  44. """
  45. Open port.
  46. """
  47. self._port.open()
  48. self._port.flush()
  49. self._log('i', 0, 'Open communication port')
  50. def close(self) -> None:
  51. """
  52. Close port.
  53. """
  54. self._log('i', 0, 'Close communication port')
  55. self._port.close()
  56. def is_connected(self) -> bool:
  57. """
  58. Return connection status.
  59. :return: status (1 = connected, 0 = not connected)
  60. """
  61. return self._port.is_open
  62. def set_logger(self, callback: Callable[[Any], None]) -> None:
  63. """
  64. Set logger function.
  65. :param callback: logging callback
  66. """
  67. self._logger_cb = callback
  68. def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
  69. """
  70. Make broadcast request to nbus network.
  71. :param command: command id
  72. :param data: command data to send
  73. """
  74. request = self._create_packet([0, 0, 0, command.value], data)
  75. self._log("\tBRQ>", request)
  76. self._port.write(request) # send message
  77. def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
  78. long_answer: NBusDelay = 0.0) -> bytearray:
  79. """
  80. Make module request to nbus network.
  81. :param module_addr: address of module
  82. :param command: command id
  83. :param data: command data to send
  84. :param long_answer: delay in s for longer answer
  85. :return: | payload length | payload |
  86. """
  87. return self._request_response(module_addr, 0, command.value, data, long_answer)
  88. def request_device(self, module_addr: NBusModuleAddress, device_address: NBusDeviceAddress,
  89. command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
  90. """
  91. Make device request to nbus network.
  92. :param module_addr: address of module
  93. :param device_address: address of device
  94. :param command: command id
  95. :param data: command data to send
  96. :param long_answer: delay in s for longer answer
  97. :return: | payload length | payload |
  98. """
  99. return self._request_response(module_addr, device_address, command.value, data, long_answer)
  100. """
  101. ================================================================================================================
  102. Internal Methods
  103. ================================================================================================================
  104. """
  105. def _request_response(self, module: int, sensor: int, command, data: bytearray,
  106. long_answer: float = 0) -> bytearray:
  107. """
  108. Make request to nbus node and receive response.
  109. :param module: module address
  110. :param sensor: sensor address
  111. :param command: command id
  112. :param data: command data to send
  113. :param long_answer: timeout for long answer
  114. :return: response length | response
  115. """
  116. request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
  117. self._log('d', sensor, "\tRQ>", request) # log request
  118. counter = 0 # err. trials
  119. while True: # try to communicate
  120. self._port.write(request)
  121. if long_answer > 0: # wait for long answer
  122. time.sleep(long_answer)
  123. try:
  124. return self._receive_payload()
  125. except NBusErrorNetwork as Ex: # if network error, try to reconnect
  126. counter += 1
  127. if counter > self._request_attempts:
  128. raise Ex # if out of trials, propagate exception
  129. def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
  130. """
  131. Create packet to send and prepare port.
  132. :param start: head of message
  133. :param data: body of message
  134. :return: request packet
  135. """
  136. if len(data) > 0:
  137. for c in data:
  138. start.append(c)
  139. if not self._port.is_open:
  140. self.open()
  141. self._port.flush()
  142. start[0] = len(start)
  143. crc_sum = crc8(start[1:])
  144. start.append(crc_sum)
  145. return start
  146. def _receive_payload(self) -> bytearray:
  147. """
  148. Read response from serial.
  149. :return: | payload len | payload
  150. """
  151. # read data length
  152. response_l = self._port.read(1)
  153. if (response_l is None) or (len(response_l) == 0): # check if message is empty
  154. raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
  155. response_l = ord(response_l) # convert response length to number
  156. # read response body
  157. response = self._port.read(response_l)
  158. if (response is None) or (len(response) != response_l): # check for message completeness
  159. raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
  160. # check for crc
  161. if response[-1] != crc8(response[:-1]):
  162. raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
  163. self._log('d', 0, "\tRS>", [response_l] + list(response)) # log response
  164. # check for node error
  165. if response[2] & NBUS_ERR_BIT:
  166. raise NBusErrorNode(NBusErrorNodeType(response[3]))
  167. return bytearray([response_l - 4]) + bytearray(response[3:-1]) # return payload length + payload
  168. def _log(self, *message: Any) -> None:
  169. """
  170. Log request/response.
  171. :param message: message to log
  172. """
  173. if self._enable_log:
  174. self._logger_cb(*message)