nbus_bridge.py 9.0 KB


  1. import struct
  2. from collections import deque
  3. from dataclasses import dataclass
  4. from nbus_api.nbus_common_parser import NbusCommonParser
  5. from nbus_api.nbus_module_slave import NBusSlaveModule
  6. from nbus_api.nbus_sensor import NBusSensor
  7. from nbus_hal.nbus_serial.serial_port import *
  8. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  9. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  10. import time
  11. from threading import Thread, Lock
  12. from collections import namedtuple
  13. import pandas as pd
  14. from nbus_types.nbus_status_type import NBusStatusType
  15. NBUS_RX_META = 4
  16. NBUS_FMT_SIZE = 4
  17. NBUS_TS_SIZE = 4
  18. NBUS_CRC_SIZE = 1
  19. NBUS_MA_SIZE = 1
  20. NBUS_SA_SIZE = 1
  21. NBUS_CRC_ADDR = -1
  22. NBUS_BRIDGE_DATA_HDR = bytearray([0x00] + [0xFF] * 8 + [0x00])
  23. NBUS_BRIDGE_DATA_HDR_SIZE = len(NBUS_BRIDGE_DATA_HDR)
  24. @dataclass
  25. class NBusSlaveMeta:
  26. obj: NBusSlaveModule
  27. cnt: NBusSensorCount
  28. packet_size: int
  29. @beartype
  30. class NBusBridge:
  31. def __init__(self, serial_port: NBusSerialPort, acquire_delay: float, flush_delay: float):
  32. """
  33. Constructor.
  34. :param serial_port: serial port
  35. :param acquire_delay: intermediate delay between data fetching when data not ready
  36. :param flush_delay: delay between stream flush
  37. """
  38. self.__port = serial_port # serial port reference
  39. self.__slaves_meta = {} # list of slaves meta information
  40. self.__acquire_thread = None # thread for data acquisition
  41. self.__data_raw = bytearray() # raw data buffer
  42. self._lock = Lock() # thread lock
  43. self.__in_acquisition = False # flag when in acquisition
  44. self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
  45. self.__flush_delay = flush_delay # delay between stream flush
  46. self.__df = pd.DataFrame() # internal data frame
  47. def init_from_network(self):
  48. try:
  49. self.cmd_get_slaves()
  50. self.cmd_get_format()
  51. except Exception:
  52. self.panic()
  53. def panic(self):
  54. self.stop_data_streaming()
  55. self.cmd_set_reset()
  56. self.cmd_get_slaves()
  57. self.cmd_get_format()
  58. def start_data_streaming(self):
  59. self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
  60. self.__acquire_thread = Thread(target=self.__acquire_callback)
  61. self.__in_acquisition = True
  62. # end thread if running
  63. if self.__acquire_thread.is_alive():
  64. self.__acquire_thread.join()
  65. self.__acquire_thread.start()
  66. def stop_data_streaming(self):
  67. self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
  68. self.__in_acquisition = False
  69. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  70. self.__acquire_thread.join()
  71. self.__port.flush(self.__flush_delay)
  72. def fetch_chunk_from_stream(self):
  73. with self._lock:
  74. packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
  75. packet_cnt = len(packets) - self.__in_acquisition
  76. parsed_packets = []
  77. # parse packets
  78. for i in range(packet_cnt):
  79. data = self._parse_bridge_data(packets[i], True)
  80. if data is not None:
  81. parsed_packets.append(data)
  82. # extend internal dataframe
  83. if parsed_packets:
  84. data_frame = pd.DataFrame(parsed_packets)
  85. self.__df = pd.concat([self.__df, data_frame], ignore_index=True)
  86. else:
  87. data_frame = pd.DataFrame()
  88. # erase raw data buffer
  89. if self.__in_acquisition:
  90. unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
  91. self.__data_raw = self.__data_raw[-unparsed_bytes:]
  92. else:
  93. self.__data_raw = bytearray()
  94. return data_frame
  95. def fetch_all_from_stream(self):
  96. df = self.__df
  97. self.__df = pd.DataFrame()
  98. return df
  99. def cmd_get_slaves(self):
  100. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray([]))
  101. slaves = {}
  102. data_offset = 0
  103. while data_offset < resp_length - 2:
  104. slave_addr = response[data_offset]
  105. slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
  106. slaves[slave_addr] = slave_sensor_cnt
  107. self.__slaves_meta[slave_addr] = NBusSlaveMeta(
  108. NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0
  109. )
  110. data_offset += 3
  111. return slaves
  112. def cmd_set_reset(self):
  113. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
  114. return NBusStatusType(response[0])
  115. def cmd_get_format(self):
  116. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
  117. data_offset = 0
  118. fmt = {}
  119. while data_offset < resp_length:
  120. slave_addr = response[data_offset]
  121. data_offset += NBUS_MA_SIZE
  122. slave_sensor_cnt = self.__slaves_meta[slave_addr].cnt
  123. format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
  124. fmt |= self._set_slave_format_from_response(slave_addr, format_len,
  125. response[data_offset: data_offset + format_len])
  126. data_offset += format_len
  127. self._set_data_packet_size()
  128. return fmt
  129. def cmd_get_data(self):
  130. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
  131. return self._parse_bridge_data(bytearray(response), False)
  132. def _parse_bridge_data(self, data_packet, check_packet):
  133. packet_size = len(data_packet)
  134. packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
  135. if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
  136. return None
  137. try:
  138. data_offset = 0
  139. ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
  140. data = {"TS": ts}
  141. data_offset += NBUS_TS_SIZE
  142. while data_offset < packet_size - NBUS_CRC_SIZE:
  143. module_addr = data_packet[data_offset]
  144. data_offset += NBUS_MA_SIZE
  145. packet_size = self.__slaves_meta[module_addr].packet_size
  146. packet = data_packet[data_offset : data_offset + packet_size]
  147. data |= self._get_data_from_response(module_addr, packet_size, packet)
  148. data_offset += packet_size
  149. return data
  150. except Exception as ex:
  151. return None
  152. def _set_data_packet_size(self):
  153. self.__packet_size = NBUS_TS_SIZE + NBUS_CRC_SIZE
  154. for slave_meta in self.__slaves_meta.values():
  155. packet_size = 0
  156. for device in slave_meta.obj.get_devices().values():
  157. fmt = device.data_format
  158. packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
  159. slave_meta.packet_size = packet_size
  160. self.__packet_size += packet_size + NBUS_MA_SIZE
  161. def _set_slave_format_from_response(self, slave_addr, resp_length, response):
  162. data_offset = 0
  163. formats = {}
  164. devices = self.__slaves_meta[slave_addr].obj.get_devices()
  165. # parse format
  166. while data_offset < resp_length:
  167. device_id = response[data_offset]
  168. device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
  169. if device_id not in devices:
  170. devices[device_id] = NBusSensor(device_id)
  171. devices[device_id].data_format = device_format
  172. tag = str(slave_addr) + "." + str(device_id)
  173. formats[tag] = device_format
  174. data_offset += NBUS_FMT_SIZE
  175. return formats
  176. def _get_data_from_response(self, slave_addr, resp_length, response):
  177. data_offset = 0
  178. data = {}
  179. devices = self.__slaves_meta[slave_addr].obj.get_devices()
  180. while data_offset < resp_length:
  181. device_id = response[data_offset]
  182. if devices[device_id].data_format is None:
  183. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  184. values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
  185. data_tag = str(slave_addr) + "." + str(device_id)
  186. for i in range(len(values)):
  187. data[data_tag + "." + str(i + 1)] = values[i]
  188. data_offset += offset + NBUS_SA_SIZE
  189. return data
  190. def __acquire_callback(self):
  191. while self.__in_acquisition:
  192. data = self.__port.try_read()
  193. if not data:
  194. time.sleep(self.__acquire_delay)
  195. continue
  196. with self._lock:
  197. self.__data_raw.extend(data)