serial_port.py 9.1 KB


  1. import time
  2. import serial
  3. from typing import Any, Callable
  4. from nbus_types.nbus_defines import *
  5. from beartype import beartype
  6. from nbus_types.nbus_command_type import *
  7. from nbus_types.nbus_exceptions.nbus_network_exception import *
  8. from nbus_types.nbus_exceptions.nbus_node_exception import *
  9. from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
  10. from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
  11. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  12. from nbus_hal.crc8 import crc8
  13. def default_logger(*message: Any) -> None:
  14. """
  15. Default logger function.
  16. :param message: message to log
  17. :return: None
  18. """
  19. for i in range(len(message)):
  20. if isinstance(message[i], list):
  21. for m in message[i]:
  22. print(hex(m), end="|")
  23. continue
  24. print(message[i], end=" ")
  25. print()
  26. @beartype
  27. class NBusSerialPort(NBusPort):
  28. """
  29. Class representing nBus serial port.
  30. """
  31. def __init__(self, config: NBusSerialConfig):
  32. """
  33. Constructor.
  34. :param config: configuration
  35. """
  36. self._port = serial.Serial(timeout=config.timeout)
  37. self._port.port = config.port_name
  38. self._port.parity = config.parity.value
  39. self._port.baudrate = config.baud.value
  40. self._logger_cb = default_logger
  41. self._enable_log = config.enable_log
  42. self._request_attempts = config.request_attempts
  43. """
  44. ================================================================================================================
  45. API Methods
  46. ================================================================================================================
  47. """
  48. def get_port(self):
  49. return self._port
  50. def change_configuration(self, config: NBusSerialConfig):
  51. """
  52. Change port configuration.
  53. :param config: configuration
  54. """
  55. self._port.timeout = config.timeout
  56. self._port.port = config.port_name
  57. self._port.parity = config.parity.value
  58. self._port.baudrate = config.baud.value
  59. self._enable_log = config.enable_log
  60. self._request_attempts = config.request_attempts
  61. def open(self) -> None:
  62. """
  63. Open port.
  64. """
  65. self._port.open()
  66. self._port.flush()
  67. self._log('i', 0, 'Open communication port')
  68. def flush(self, inter_delay) -> None:
  69. while self._port.in_waiting:
  70. self._port.reset_input_buffer()
  71. time.sleep(inter_delay)
  72. def try_read(self):
  73. if self._port.in_waiting > 0:
  74. return self._port.read()
  75. else:
  76. return bytearray()
  77. def close(self) -> None:
  78. """
  79. Close port.
  80. """
  81. self._log('i', 0, 'Close communication port')
  82. self._port.close()
  83. def is_connected(self) -> bool:
  84. """
  85. Return connection status.
  86. :return: status (1 = connected, 0 = not connected)
  87. """
  88. return self._port.is_open
  89. def set_logger(self, callback: Callable[[Any], None]) -> None:
  90. """
  91. Set logger function.
  92. :param callback: logging callback
  93. """
  94. self._logger_cb = callback
  95. def request_bridge(self, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0):
  96. """
  97. Make broadcast request to the nbus bridge module.
  98. :param command: command id
  99. :param data: command data to send
  100. :param long_answer: delay in s for longer answer
  101. """
  102. return self._request_response(NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value, data, long_answer)
  103. def send_bridge(self, command: NBusCommand, data: bytearray):
  104. """
  105. Make broadcast request to the nbus bridge module.
  106. :param command: command id
  107. :param data: command data to send
  108. :param long_answer: delay in s for longer answer
  109. """
  110. request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value]), data)
  111. self._log("\tBRQ>", list(request))
  112. self._port.write(request) # send message
  113. def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
  114. """
  115. Make broadcast request to nbus network.
  116. :param command: command id
  117. :param data: command data to send
  118. """
  119. request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value]), data)
  120. self._log("\tBRQ>", list(request))
  121. self._port.write(request) # send message
  122. def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
  123. long_answer: NBusDelay = 0.0) -> bytearray:
  124. """
  125. Make module request to nbus network.
  126. :param module_addr: address of module
  127. :param command: command id
  128. :param data: command data to send
  129. :param long_answer: delay in s for longer answer
  130. :return: | payload length | payload |
  131. """
  132. return self._request_response(module_addr, 0, command.value, data, long_answer)
  133. def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress,
  134. command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
  135. """
  136. Make device request to nbus network.
  137. :param module_addr: address of module
  138. :param sensor_address: address of sensor
  139. :param command: command id
  140. :param data: command data to send
  141. :param long_answer: delay in s for longer answer
  142. :return: | payload length | payload |
  143. """
  144. return self._request_response(module_addr, sensor_address, command.value, data, long_answer)
  145. """
  146. ================================================================================================================
  147. Internal Methods
  148. ================================================================================================================
  149. """
  150. def _request_response(self, module: int, sensor: int, command, data: bytearray,
  151. long_answer: float = 0) -> bytearray:
  152. """
  153. Make request to nbus node and receive response.
  154. :param module: module address
  155. :param sensor: sensor address
  156. :param command: command id
  157. :param data: command data to send
  158. :param long_answer: timeout for long answer
  159. :return: response length | response
  160. """
  161. request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
  162. self._log('d', sensor, "\tRQ>", list(request)) # log request
  163. counter = 0 # err. trials
  164. while True: # try to communicate
  165. self._port.write(request)
  166. if long_answer > 0: # wait for long answer
  167. time.sleep(long_answer)
  168. try:
  169. return self._receive_payload()
  170. except NBusErrorNetwork as Ex: # if network error, try to reconnect
  171. counter += 1
  172. if counter > self._request_attempts:
  173. raise Ex # if out of trials, propagate exception
  174. def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
  175. """
  176. Create packet to send and prepare port.
  177. :param start: head of message
  178. :param data: body of message
  179. :return: request packet
  180. """
  181. if len(data) > 0:
  182. for c in data:
  183. start.append(c)
  184. if not self._port.is_open:
  185. self.open()
  186. self._port.flush()
  187. start[0] = len(start)
  188. crc_sum = crc8(start[1:])
  189. start.append(crc_sum)
  190. return start
  191. def _receive_payload(self) -> bytearray:
  192. """
  193. Read response from serial.
  194. :return: | payload len | payload
  195. """
  196. # read data length
  197. response_l = self._port.read(1)
  198. if (response_l is None) or (len(response_l) == 0): # check if message is empty
  199. raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
  200. response_l = ord(response_l) # convert response length to number
  201. # read response body
  202. response = self._port.read(response_l)
  203. if (response is None) or (len(response) != response_l): # check for message completeness
  204. raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
  205. # check for crc
  206. if response[NBUS_CRC_ADDR] != crc8(response[:NBUS_CRC_ADDR]):
  207. raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
  208. self._log('d', 0, "\tRS>", [response_l] + list(response)) # log response
  209. # check for node error
  210. if response[NBUS_FC_ADDR] & NBUS_ERR_BIT:
  211. raise NBusErrorNode(NBusErrorNodeType(response[NBUS_DATA0_ADDR]))
  212. # return payload length + payload
  213. return bytearray([response_l - NBUS_RX_META]) + bytearray(response[NBUS_DATA0_ADDR:NBUS_CRC_ADDR])
  214. def _log(self, *message: Any) -> None:
  215. """
  216. Log request/response.
  217. :param message: message to log
  218. """
  219. if self._enable_log:
  220. self._logger_cb(*message)