nbus_bridge.py 14 KB


  1. import struct
  2. import time
  3. from typing import Optional
  4. from threading import Thread, Lock
  5. from beartype import beartype
  6. import pandas as pd
  7. from nbus_api.nbus_common_parser import NbusCommonParser
  8. from nbus_api.nbus_module_slave import NBusSlaveModule
  9. from nbus_api.nbus_sensor import NBusSensor
  10. from nbus_hal.crc8 import crc8
  11. from nbus_hal.nbus_generic_port import NBusPort
  12. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  13. from nbus_types.nbus_command_type import NBusCommand
  14. from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
  15. from nbus_types.nbus_defines import *
  16. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  17. from nbus_types.nbus_info_type import NBusBridgeInfo
  18. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  19. from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
  20. from nbus_types.nbus_status_type import NBusStatusType
  21. @beartype
  22. class NBusBridge:
  23. def __init__(self, serial_port: NBusPort, acquire_delay: float):
  24. """
  25. Constructor.
  26. :param serial_port: serial port
  27. :param acquire_delay: intermediate delay between data fetching when data not ready
  28. :param flush_delay: delay between stream flush
  29. """
  30. self.__port = serial_port # serial port reference
  31. self.__slaves_meta = {} # list of slaves meta information
  32. self.__acquire_thread = None # thread for data acquisition
  33. self.__data_raw = bytearray() # raw data buffer
  34. self._lock = Lock() # thread lock
  35. self.__in_acquisition = False # flag when in acquisition
  36. self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
  37. self.__df = pd.DataFrame() # internal data frame
  38. self.__ts0 = None # 0-th timestamp
  39. """
  40. ================================================================================================================
  41. Module General Methods
  42. ================================================================================================================
  43. """
  44. def init(self) -> None:
  45. """
  46. Initialize the bridge module from network.
  47. """
  48. try:
  49. self.cmd_get_slaves()
  50. self.cmd_get_format()
  51. except Exception:
  52. self.panic()
  53. def panic(self) -> None:
  54. """
  55. Reset communication when fatal error occurs.
  56. """
  57. self.stop_streaming()
  58. self.cmd_set_reset()
  59. self.__data_raw = bytearray()
  60. self.__df = pd.DataFrame()
  61. self.__slaves_meta = {}
  62. self.cmd_get_slaves()
  63. self.cmd_get_format()
  64. def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
  65. """
  66. Get connected slave modules.
  67. :return: dictionary of connected slaves
  68. """
  69. slaves = {}
  70. for key, value in self.__slaves_meta.items():
  71. slaves[key] = value.module_obj
  72. return slaves
  73. def start_streaming(self) -> None:
  74. """
  75. Start data streaming (e.g. bridge-cast).
  76. """
  77. self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
  78. self.__acquire_thread = Thread(target=self.__acquire_callback)
  79. self.__in_acquisition = True
  80. # end thread if running
  81. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  82. self.__acquire_thread.join()
  83. self.__acquire_thread.start()
  84. def stop_streaming(self):
  85. """
  86. Stop data streaming (e.g. bridge-cast).
  87. """
  88. self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
  89. self.__in_acquisition = False
  90. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  91. self.__acquire_thread.join()
  92. self.__port.flush()
  93. def fetch_stream_chunk(self) -> pd.DataFrame:
  94. """
  95. Fetch data from stream (e.g. bridge-cast).
  96. Can be called anytime.
  97. It not erase internal dataframe.
  98. :return: stream data frame
  99. """
  100. with self._lock:
  101. packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
  102. packet_cnt = len(packets) - self.__in_acquisition
  103. parsed_packets = []
  104. # parse packets
  105. for i in range(packet_cnt):
  106. data = self._parse_bridge_data_from_packet(packets[i], True)
  107. if data is not None:
  108. parsed_packets.append(data)
  109. # extend internal dataframe
  110. if parsed_packets:
  111. data_frame = pd.DataFrame(parsed_packets)
  112. self.__df = pd.concat([self.__df, data_frame], ignore_index=True).copy(deep=True)
  113. else:
  114. data_frame = pd.DataFrame()
  115. # erase raw data buffer
  116. if self.__in_acquisition:
  117. unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
  118. self.__data_raw = self.__data_raw[-unparsed_bytes:]
  119. else:
  120. self.__data_raw = bytearray()
  121. self._transform_timestamp(data_frame)
  122. return data_frame
  123. def fetch_full_stream(self) -> pd.DataFrame:
  124. """
  125. Fetch all data from stream (e.g. bridge-cast).
  126. Must be called after stop_streaming() method.
  127. It will erase internal dataframe.
  128. :return: stream dataframe
  129. """
  130. if self.__in_acquisition:
  131. return pd.DataFrame()
  132. df = self.__df
  133. self._transform_timestamp(df)
  134. self.__df = pd.DataFrame()
  135. self.__ts0 = None
  136. return df
  137. """
  138. ================================================================================================================
  139. Bridge Get Commands
  140. ================================================================================================================
  141. """
  142. def cmd_get_data(self) -> pd.DataFrame:
  143. """
  144. Get data from all slave devices.
  145. :return: dataframe of the network data
  146. """
  147. response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
  148. return pd.DataFrame([self._parse_bridge_data_from_packet(response[1:], False)])
  149. def cmd_get_info(self) -> NBusBridgeInfo:
  150. """
  151. Get data from all slave devices.
  152. :return: bridge info structure
  153. """
  154. response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
  155. fw = str(response[1:4], "ascii")
  156. hw_family = str(response[4:7], "ascii")
  157. hw = str(response[7:10], "ascii")
  158. return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
  159. def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  160. """
  161. Get format of all connected devices.
  162. :return: dict of all device formats
  163. """
  164. if not self.__slaves_meta:
  165. raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
  166. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
  167. data_offset = 0
  168. fmt = {}
  169. while data_offset < resp_length:
  170. slave_addr = response[data_offset]
  171. data_offset += NBUS_MA_SIZE
  172. slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
  173. format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
  174. fmt |= self._set_slave_format_from_response(slave_addr, format_len,
  175. response[data_offset: data_offset + format_len])
  176. data_offset += format_len
  177. self._update_slaves_packet_size()
  178. return fmt
  179. def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
  180. """
  181. Get base information about all connected slaves and update internal fields.
  182. :return: dict of device count of every connected module
  183. """
  184. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
  185. slaves = {}
  186. data_offset = 0
  187. while data_offset < resp_length - 2:
  188. slave_addr = response[data_offset]
  189. slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
  190. slaves[slave_addr] = slave_sensor_cnt
  191. # update internal memory
  192. if slave_addr not in self.__slaves_meta:
  193. self.__slaves_meta[slave_addr] = NBusSlaveMeta(
  194. NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
  195. data_offset += 3
  196. return slaves
  197. """
  198. ================================================================================================================
  199. Bridge Get Commands
  200. ================================================================================================================
  201. """
  202. def cmd_set_reset(self) -> NBusStatusType:
  203. """
  204. Send reset command to bridge module.
  205. :return: status of success
  206. """
  207. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
  208. return NBusStatusType(response[0])
  209. """
  210. ================================================================================================================
  211. Internal Methods
  212. ================================================================================================================
  213. """
  214. def __acquire_callback(self) -> None:
  215. """
  216. Thread worker for data acquisition.
  217. """
  218. while self.__in_acquisition:
  219. data = self.__port.try_read()
  220. if not data:
  221. time.sleep(self.__acquire_delay)
  222. continue
  223. with self._lock:
  224. self.__data_raw.extend(data)
  225. def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
  226. -> Optional[dict[str, NBusDataValue]]:
  227. """
  228. Parse the bridge data packet.
  229. :param data_packet: packet to parse
  230. :param check_packet: if crc check is required (only in bridge-cast)
  231. :return: dictionary of data values or None
  232. """
  233. packet_size = len(data_packet)
  234. packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
  235. # check validity
  236. if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
  237. return None
  238. # parse data
  239. try:
  240. data_offset = 0
  241. ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
  242. data = {"TS": ts}
  243. data_offset += NBUS_TS_SIZE
  244. while data_offset < packet_size - NBUS_CRC_SIZE:
  245. module_addr = data_packet[data_offset]
  246. data_offset += NBUS_MA_SIZE
  247. slave_size = self.__slaves_meta[module_addr].packet_size
  248. packet = data_packet[data_offset : data_offset + slave_size]
  249. data |= self._parse_slave_data_from_response(module_addr, slave_size, packet)
  250. data_offset += slave_size
  251. return data
  252. except Exception:
  253. return None
  254. def _update_slaves_packet_size(self) -> None:
  255. """
  256. Update slaves packet size from loaded format.
  257. """
  258. for slave_meta in self.__slaves_meta.values():
  259. packet_size = 0
  260. for device in slave_meta.module_obj.get_devices().values():
  261. fmt = device.data_format
  262. packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
  263. slave_meta.packet_size = packet_size
  264. def _transform_timestamp(self, data_frame: pd.DataFrame) -> None:
  265. """
  266. Transform timestamp values in dataframe.
  267. :param data_frame: dataframe to transform
  268. """
  269. if not data_frame.empty and "TS" in self.__df.columns:
  270. if self.__ts0 is None:
  271. self.__ts0 = self.__df["TS"].iloc[0]
  272. data_frame["TS"] -= self.__ts0
  273. def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
  274. -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  275. """
  276. Parse and set format for all slaves from response.
  277. :param slave_addr: address of slave module
  278. :param resp_length: length of response
  279. :param response: raw data
  280. :return: parsed formats
  281. """
  282. data_offset = 0
  283. formats = {}
  284. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  285. # parse format
  286. while data_offset < resp_length:
  287. device_id = response[data_offset]
  288. device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
  289. if device_id not in devices:
  290. devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
  291. devices[device_id].data_format = device_format
  292. formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
  293. data_offset += NBUS_FMT_SIZE
  294. return formats
  295. def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
  296. -> dict[str, NBusDataValue]:
  297. """
  298. Parse data of single slave from response.
  299. :param slave_addr: address of selected slave
  300. :param resp_length: length of response
  301. :param response: raw data
  302. :return: dict of data values
  303. """
  304. data_offset = 0
  305. data = {}
  306. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  307. while data_offset < resp_length:
  308. device_id = response[data_offset]
  309. if devices[device_id].data_format is None:
  310. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  311. values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
  312. data_tag = str(slave_addr) + "." + str(device_id)
  313. for i in range(len(values)):
  314. data[data_tag + "." + str(i + 1)] = values[i]
  315. data_offset += offset + NBUS_SA_SIZE
  316. return data