serial_port.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. import time
  2. import serial
  3. from typing import Any, Callable, Optional
  4. from beartype import beartype
  5. from nbus_types.nbus_command_type import *
  6. from nbus_types.nbus_exceptions.nbus_network_exception import *
  7. from nbus_types.nbus_exceptions.nbus_node_exception import *
  8. from nbus_hal.nbus_serial.serial_config import NBusSerialConfig
  9. from nbus_hal.nbus_generic_port import NBusPort, NBusDelay
  10. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  11. from nbus_hal.crc8 import crc8
  12. from nbus_types.nbus_defines import *
  13. def default_logger(*message: Any) -> None:
  14. """
  15. Default logger function.
  16. :param message: message to log
  17. :return: None
  18. """
  19. for i in range(len(message)):
  20. if isinstance(message[i], list):
  21. for m in message[i]:
  22. print(hex(m), end="|")
  23. continue
  24. print(message[i], end=" ")
  25. print()
  26. @beartype
  27. class NBusSerialPort(NBusPort):
  28. """
  29. Class representing nBus serial port.
  30. """
  31. def __init__(self, config: NBusSerialConfig):
  32. """
  33. Constructor.
  34. :param config: configuration
  35. """
  36. self._flush_delay = config.flush_delay
  37. self._port = serial.Serial(timeout=config.timeout)
  38. self._port.port = config.port_name
  39. self._port.parity = config.parity.value
  40. self._port.baudrate = config.baud.value
  41. self._logger_cb = default_logger
  42. self._enable_log = config.enable_log
  43. self._request_attempts = config.request_attempts
  44. """
  45. ================================================================================================================
  46. API Methods
  47. ================================================================================================================
  48. """
  49. def change_configuration(self, config: NBusSerialConfig):
  50. """
  51. Change port configuration.
  52. :param config: configuration
  53. """
  54. self._port.timeout = config.timeout
  55. self._flush_delay = config.flush_delay
  56. self._port.port = config.port_name
  57. self._port.parity = config.parity.value
  58. self._port.baudrate = config.baud.value
  59. self._enable_log = config.enable_log
  60. self._request_attempts = config.request_attempts
  61. def open(self) -> None:
  62. """
  63. Open port.
  64. """
  65. self._port.open()
  66. self._port.flush()
  67. self._log("INFO", " 0", "\tOpen communication port")
  68. def close(self) -> None:
  69. """
  70. Close port.
  71. """
  72. self._log("INFO", " 0", "\tClose communication port")
  73. self._port.close()
  74. def flush(self) -> None:
  75. """
  76. Flush port with periodic check.
  77. """
  78. while self._port.in_waiting:
  79. self._log("INFO", " 0", "\tFlush communication port")
  80. self._port.reset_input_buffer()
  81. self._port.reset_output_buffer()
  82. time.sleep(self._flush_delay)
  83. def try_read(self) -> bytes:
  84. """
  85. Try reading from port.
  86. :return: bytes
  87. """
  88. if self._port.in_waiting > 0:
  89. data = self._port.read()
  90. self._log("DATA", f"{len(data):3}", "\tARS>", list(data))
  91. return data
  92. else:
  93. return bytes()
  94. def is_connected(self) -> bool:
  95. """
  96. Return connection status.
  97. :return: status (1 = connected, 0 = not connected)
  98. """
  99. return self._port.is_open
  100. def set_logger(self, callback: Callable[[Any], None]) -> None:
  101. """
  102. Set logger function.
  103. :param callback: logging callback
  104. """
  105. self._logger_cb = callback
  106. def request_bridge(self, command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0):
  107. """
  108. Make bridge request.
  109. :param command: command id
  110. :param data: command data to send
  111. :param long_answer: delay in s for longer answer
  112. """
  113. return self._request_response(NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value, data, long_answer)
  114. def send_bridge(self, command: NBusCommand, data: bytearray):
  115. """
  116. Make bridge request without waiting for response.
  117. :param command: command id
  118. :param data: command data to send
  119. :param long_answer: delay in s for longer answer
  120. """
  121. request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BRIDGE_ADDR, command.value]), data)
  122. self._log("DATA", f"{request[0]:3}", "\tARQ>", list(request[1:]))
  123. self._port.write(request) # send message
  124. def request_broadcast(self, command: NBusCommand, data: bytearray) -> None:
  125. """
  126. Make broadcast request to nbus network.
  127. :param command: command id
  128. :param data: command data to send
  129. """
  130. request = self._create_packet(bytearray([0, NBUS_BROADCAST_ADDR, NBUS_BROADCAST_ADDR, command.value]), data)
  131. self._log("DATA", f"{request[0]:3}", "\tARQ>", list(request[1:]))
  132. self._port.write(request) # send message
  133. def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
  134. long_answer: NBusDelay = 0.0) -> bytearray:
  135. """
  136. Make module request to nbus network.
  137. :param module_addr: address of module
  138. :param command: command id
  139. :param data: command data to send
  140. :param long_answer: delay in s for longer answer
  141. :return: | payload length | payload |
  142. """
  143. return self._request_response(module_addr, 0, command.value, data, long_answer)
  144. def request_sensor(self, module_addr: NBusModuleAddress, sensor_address: NBusSensorAddress,
  145. command: NBusCommand, data: bytearray, long_answer: NBusDelay = 0.0) -> bytearray:
  146. """
  147. Make device request to nbus network.
  148. :param module_addr: address of module
  149. :param sensor_address: address of sensor
  150. :param command: command id
  151. :param data: command data to send
  152. :param long_answer: delay in s for longer answer
  153. :return: | payload length | payload |
  154. """
  155. return self._request_response(module_addr, sensor_address, command.value, data, long_answer)
  156. """
  157. ================================================================================================================
  158. Internal Methods
  159. ================================================================================================================
  160. """
  161. def _request_response(self, module: int, sensor: int, command, data: bytearray,
  162. long_answer: float = 0) -> bytearray:
  163. """
  164. Make request to nbus node and receive response.
  165. :param module: module address
  166. :param sensor: sensor address
  167. :param command: command id
  168. :param data: command data to send
  169. :param long_answer: timeout for long answer
  170. :return: response length | response
  171. """
  172. request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
  173. self._log("DATA", f"{request[0]:3}", "\tRQ>", list(request[1:])) # log request
  174. counter = 0 # err. trials
  175. while True: # try to communicate
  176. self._port.write(request)
  177. if long_answer > 0: # wait for long answer
  178. time.sleep(long_answer)
  179. try:
  180. return self._receive_payload()
  181. except NBusErrorNetwork as Ex: # if network error, try to reconnect
  182. counter += 1
  183. if counter > self._request_attempts:
  184. raise Ex # if out of trials, propagate exception
  185. def _create_packet(self, start: bytearray, data: bytearray) -> bytearray:
  186. """
  187. Create packet to send and prepare port.
  188. :param start: head of message
  189. :param data: body of message
  190. :return: request packet
  191. """
  192. if len(data) > 0:
  193. for c in data:
  194. start.append(c)
  195. if not self._port.is_open:
  196. self.open()
  197. self._port.flush()
  198. start[0] = len(start)
  199. crc_sum = crc8(start[1:])
  200. start.append(crc_sum)
  201. return start
  202. def _receive_payload(self) -> bytearray:
  203. """
  204. Read response from serial.
  205. :return: | payload len | payload
  206. """
  207. # read data length
  208. response_l = self._port.read(1)
  209. if (response_l is None) or (len(response_l) == 0): # check if message is empty
  210. raise NBusErrorNetwork(NBusErrorNetworkType.EMPTY_MESSAGE)
  211. response_l = ord(response_l) # convert response length to number
  212. # read response body
  213. response = self._port.read(response_l)
  214. if (response is None) or (len(response) != response_l): # check for message completeness
  215. raise NBusErrorNetwork(NBusErrorNetworkType.MESSAGE_NOT_COMPLETE)
  216. # check for crc
  217. if response[NBUS_CRC_ADDR] != crc8(response[:NBUS_CRC_ADDR]):
  218. raise NBusErrorNetwork(NBusErrorNetworkType.DAMAGED_MESSAGE)
  219. self._log("DATA", f"{response_l:3}", "\tRS>", list(response)) # log response
  220. # check for node error
  221. if response[NBUS_FC_ADDR] & NBUS_ERR_BIT:
  222. raise NBusErrorNode(NBusErrorNodeType(response[NBUS_DATA0_ADDR]))
  223. # return payload length + payload
  224. return bytearray([response_l - NBUS_RX_META_SIZE]) + bytearray(response[NBUS_DATA0_ADDR:NBUS_CRC_ADDR])
  225. def _log(self, *message: Any) -> None:
  226. """
  227. Log request/response.
  228. :param message: message to log
  229. """
  230. if self._enable_log:
  231. self._logger_cb(*message)