serial_port.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. import time
  2. import serial
  3. from typing import Any, Callable
  4. from beartype import beartype
  5. from nbus_types.nbus_command_type import *
  6. from nbus_types.nbus_exceptions.nbus_network_exception import *
  7. from nbus_types.nbus_exceptions.nbus_node_exception import *
  8. from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
  9. from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
  10. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  11. from nbus_hal.crc8 import crc8
  12. def default_logger(*message: Any) -> None:
  13. """
  14. Default logger function.
  15. :param message: message to log
  16. :return: None
  17. """
  18. for i in range(len(message)):
  19. if isinstance(message[i], list):
  20. for m in message[i]:
  21. print(hex(m), end="|")
  22. continue
  23. print(message[i], end=" ")
  24. print()
  25. @beartype
  26. class NBusSerialPort(NBusPort):
  27. """
  28. Class representing nBus serial port.
  29. """
  30. def __init__(self, config: NBusSerialConfig):
  31. """
  32. Constructor.
  33. :param config: configuration
  34. """
  35. self._port = serial.Serial(timeout=config.timeout)
  36. self._port.port = config.port_name
  37. self._port.parity = config.parity.value
  38. self._port.baudrate = config.baud.value
  39. self._logger_cb = default_logger
  40. self._enable_log = config.enable_log
  41. self._request_attempts = config.request_attempts
  42. """
  43. ================================================================================================================
  44. API Methods
  45. ================================================================================================================
  46. """
  47. def change_configuration(self, config: NBusSerialConfig):
  48. """
  49. Change port configuration.
  50. :param config: configuration
  51. """
  52. self._port.timeout = config.timeout
  53. self._port.port = config.port_name
  54. self._port.parity = config.parity.value
  55. self._port.baudrate = config.baud.value
  56. self._enable_log = config.enable_log
  57. self._request_attempts = config.request_attempts
  58. def open(self) -> None:
  59. """
  60. Open port.
  61. """
  62. self._port.open()
  63. self._port.flush()
  64. self._log('i', 0, 'Open communication port')
  65. def close(self) -> None:
  66. """
  67. Close port.
  68. """
  69. self._log('i', 0, 'Close communication port')
  70. self._port.close()
  71. def is_connected(self) -> bool:
  72. """
  73. Return connection status.
  74. :return: status (1 = connected, 0 = not connected)
  75. """
  76. return self._port.is_open
  77. def set_logger(self, callback: Callable[[Any], None]) -> None:
  78. """
  79. Set logger function.
  80. :param callback: logging callback
  81. """
  82. self._logger_cb = callback
  83. def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
  84. """
  85. Make broadcast request to nbus network.
  86. :param command: command id
  87. :param data: command data to send
  88. """
  89. request = self._create_packet(bytearray([0, 0, 0, command.value]), data)
  90. self._log("\tBRQ>", list(request))
  91. self._port.write(request) # send message
  92. def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
  93. long_answer: NBusDelay = 0.0) -> bytearray:
  94. """
  95. Make module request to nbus network.
  96. :param module_addr: address of module
  97. :param command: command id
  98. :param data: command data to send
  99. :param long_answer: delay in s for longer answer
  100. :return: | payload length | payload |
  101. """
  102. return self._request_response(module_addr, 0, command.value, data, long_answer)
  103. def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress,
  104. command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
  105. """
  106. Make device request to nbus network.
  107. :param module_addr: address of module
  108. :param sensor_address: address of sensor
  109. :param command: command id
  110. :param data: command data to send
  111. :param long_answer: delay in s for longer answer
  112. :return: | payload length | payload |
  113. """
  114. return self._request_response(module_addr, sensor_address, command.value, data, long_answer)
  115. """
  116. ================================================================================================================
  117. Internal Methods
  118. ================================================================================================================
  119. """
  120. def _request_response(self, module: int, sensor: int, command, data: bytearray,
  121. long_answer: float = 0) -> bytearray:
  122. """
  123. Make request to nbus node and receive response.
  124. :param module: module address
  125. :param sensor: sensor address
  126. :param command: command id
  127. :param data: command data to send
  128. :param long_answer: timeout for long answer
  129. :return: response length | response
  130. """
  131. request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
  132. self._log('d', sensor, "\tRQ>", list(request)) # log request
  133. counter = 0 # err. trials
  134. while True: # try to communicate
  135. self._port.write(request)
  136. if long_answer > 0: # wait for long answer
  137. time.sleep(long_answer)
  138. try:
  139. return self._receive_payload()
  140. except NBusErrorNetwork as Ex: # if network error, try to reconnect
  141. counter += 1
  142. if counter > self._request_attempts:
  143. raise Ex # if out of trials, propagate exception
  144. def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
  145. """
  146. Create packet to send and prepare port.
  147. :param start: head of message
  148. :param data: body of message
  149. :return: request packet
  150. """
  151. if len(data) > 0:
  152. for c in data:
  153. start.append(c)
  154. if not self._port.is_open:
  155. self.open()
  156. self._port.flush()
  157. start[0] = len(start)
  158. crc_sum = crc8(start[1:])
  159. start.append(crc_sum)
  160. return start
  161. def _receive_payload(self) -> bytearray:
  162. """
  163. Read response from serial.
  164. :return: | payload len | payload
  165. """
  166. # read data length
  167. response_l = self._port.read(1)
  168. if (response_l is None) or (len(response_l) == 0): # check if message is empty
  169. raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
  170. response_l = ord(response_l) # convert response length to number
  171. # read response body
  172. response = self._port.read(response_l)
  173. if (response is None) or (len(response) != response_l): # check for message completeness
  174. raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
  175. # check for crc
  176. if response[-1] != crc8(response[:-1]):
  177. raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
  178. self._log('d', 0, "\tRS>", [response_l] + list(response)) # log response
  179. # check for node error
  180. if response[2] & NBUS_ERR_BIT:
  181. raise NBusErrorNode(NBusErrorNodeType(response[3]))
  182. return bytearray([response_l - 4]) + bytearray(response[3:-1]) # return payload length + payload
  183. def _log(self, *message: Any) -> None:
  184. """
  185. Log request/response.
  186. :param message: message to log
  187. """
  188. if self._enable_log:
  189. self._logger_cb(*message)