nbus_bridge.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. import struct
  2. import time
  3. from typing import Optional
  4. from threading import Thread, Lock
  5. from beartype import beartype
  6. import pandas as pd
  7. from nbus_api.nbus_common_parser import NbusCommonParser
  8. from nbus_api.nbus_module_slave import NBusSlaveModule
  9. from nbus_api.nbus_sensor import NBusSensor
  10. from nbus_hal.crc8 import crc8
  11. from nbus_hal.nbus_generic_port import NBusPort
  12. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  13. from nbus_types.nbus_command_type import NBusCommand
  14. from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
  15. from nbus_types.nbus_defines import *
  16. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  17. from nbus_types.nbus_info_type import NBusBridgeInfo
  18. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  19. from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
  20. from nbus_types.nbus_status_type import NBusStatusType
  21. @beartype
  22. class NBusBridge:
  23. def __init__(self, serial_port: NBusPort, acquire_delay: float):
  24. """
  25. Constructor.
  26. :param serial_port: serial port
  27. :param acquire_delay: intermediate delay between data fetching when data not ready
  28. :param flush_delay: delay between stream flush
  29. """
  30. self.__port = serial_port # serial port reference
  31. self.__slaves_meta = {} # list of slaves meta information
  32. self.__acquire_thread = None # thread for data acquisition
  33. self.__data_raw = bytearray() # raw data buffer
  34. self._lock = Lock() # thread lock
  35. self.__in_acquisition = False # flag when in acquisition
  36. self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
  37. self.__df = pd.DataFrame() # internal data frame
  38. self.__ts0 = None # 0-th timestamp
  39. """
  40. ================================================================================================================
  41. Module General Methods
  42. ================================================================================================================
  43. """
  44. def init(self) -> None:
  45. """
  46. Initialize the bridge module from network.
  47. """
  48. try:
  49. self.cmd_get_slaves()
  50. self.cmd_get_format()
  51. except Exception:
  52. self.panic()
  53. def panic(self) -> None:
  54. """
  55. Reset communication when fatal error occurs.
  56. """
  57. self.stop_streaming()
  58. self.cmd_set_reset()
  59. self.__data_raw = bytearray()
  60. self.__df = pd.DataFrame()
  61. self.__slaves_meta = {}
  62. self.cmd_get_slaves()
  63. self.cmd_get_format()
  64. def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
  65. """
  66. Get connected slave modules.
  67. :return: dictionary of connected slaves
  68. """
  69. slaves = {}
  70. for key, value in self.__slaves_meta.items():
  71. slaves[key] = value.module_obj
  72. return slaves
  73. def start_streaming(self) -> None:
  74. """
  75. Start data streaming (e.g. bridge-cast).
  76. """
  77. self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
  78. self.__acquire_thread = Thread(target=self.__acquire_callback)
  79. self.__in_acquisition = True
  80. # end thread if running
  81. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  82. self.__acquire_thread.join()
  83. self.__acquire_thread.start()
  84. def stop_streaming(self):
  85. """
  86. Stop data streaming (e.g. bridge-cast).
  87. """
  88. self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
  89. self.__in_acquisition = False
  90. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  91. self.__acquire_thread.join()
  92. self.__port.flush()
  93. def fetch_stream_chunk(self) -> pd.DataFrame:
  94. """
  95. Fetch data from stream (e.g. bridge-cast).
  96. Can be called anytime.
  97. It not erase internal dataframe.
  98. :return: stream data frame
  99. """
  100. with self._lock:
  101. packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
  102. packet_cnt = len(packets) - self.__in_acquisition
  103. parsed_packets = []
  104. # parse packets
  105. for i in range(packet_cnt):
  106. data = self._parse_bridge_data_from_packet(packets[i], True)
  107. if data is not None:
  108. parsed_packets.append(data)
  109. # extend internal dataframe
  110. if parsed_packets:
  111. data_frame = pd.DataFrame(parsed_packets)
  112. self.__df = pd.concat([self.__df, data_frame], ignore_index=True).copy(deep=True)
  113. else:
  114. data_frame = pd.DataFrame()
  115. # erase raw data buffer
  116. if self.__in_acquisition:
  117. unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
  118. self.__data_raw = self.__data_raw[-unparsed_bytes:]
  119. else:
  120. self.__data_raw = bytearray()
  121. self._transform_timestamp(data_frame)
  122. return data_frame
  123. def fetch_full_stream(self) -> pd.DataFrame:
  124. """
  125. Fetch all data from stream (e.g. bridge-cast).
  126. Must be called after stop_streaming() method.
  127. It will erase internal dataframe.
  128. :return: stream dataframe
  129. """
  130. if self.__in_acquisition:
  131. return pd.DataFrame()
  132. df = self.__df
  133. self._transform_timestamp(df)
  134. self.__df = pd.DataFrame()
  135. self.__ts0 = None
  136. return df
  137. """
  138. ================================================================================================================
  139. Bridge Get Commands
  140. ================================================================================================================
  141. """
  142. def cmd_get_data(self) -> pd.DataFrame:
  143. """
  144. Get data from all slave devices.
  145. :return: dataframe of the network data
  146. """
  147. response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
  148. return pd.DataFrame([self._parse_bridge_data_from_packet(response[1:], False)])
  149. def cmd_get_info(self) -> NBusBridgeInfo:
  150. """
  151. Get data from all slave devices.
  152. :return: bridge info structure
  153. """
  154. response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
  155. fw = str(response[1:4], "ascii")
  156. hw_family = str(response[4:7], "ascii")
  157. hw = str(response[7:10], "ascii")
  158. return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
  159. def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  160. """
  161. Get format of all connected devices.
  162. :return: dict of all device formats
  163. """
  164. if not self.__slaves_meta:
  165. raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
  166. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
  167. data_offset = 0
  168. fmt = {}
  169. while data_offset < resp_length:
  170. slave_addr = response[data_offset]
  171. data_offset += NBUS_MA_SIZE
  172. slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
  173. format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
  174. fmt |= self._set_slave_format_from_response(slave_addr, format_len,
  175. response[data_offset: data_offset + format_len])
  176. data_offset += format_len
  177. self._update_slaves_packet_size()
  178. return fmt
  179. def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
  180. """
  181. Get base information about all connected slaves and update internal fields.
  182. :return: dict of device count of every connected module
  183. """
  184. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
  185. slaves = {}
  186. data_offset = 0
  187. while data_offset < resp_length - 2:
  188. slave_addr = response[data_offset]
  189. slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
  190. slaves[slave_addr] = slave_sensor_cnt
  191. # update internal memory
  192. if slave_addr not in self.__slaves_meta:
  193. self.__slaves_meta[slave_addr] = NBusSlaveMeta(
  194. NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
  195. data_offset += 3
  196. return slaves
  197. """
  198. ================================================================================================================
  199. Bridge Get Commands
  200. ================================================================================================================
  201. """
  202. def cmd_set_reset(self) -> NBusStatusType:
  203. """
  204. Send reset command to bridge module.
  205. :return: status of success
  206. """
  207. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
  208. return NBusStatusType(response[0])
  209. """
  210. ================================================================================================================
  211. Internal Methods
  212. ================================================================================================================
  213. """
  214. def __acquire_callback(self) -> None:
  215. """
  216. Thread worker for data acquisition.
  217. """
  218. while self.__in_acquisition:
  219. data = self.__port.try_read()
  220. if not data:
  221. time.sleep(self.__acquire_delay)
  222. continue
  223. with self._lock:
  224. self.__data_raw.extend(data)
  225. def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
  226. -> Optional[dict[str, NBusDataValue]]:
  227. """
  228. Parse the bridge data packet.
  229. :param data_packet: packet to parse
  230. :param check_packet: if crc check is required (only in bridge-cast)
  231. :return: dictionary of data values or None
  232. """
  233. packet_size = len(data_packet)
  234. packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
  235. # check validity
  236. if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
  237. return None
  238. # parse data
  239. try:
  240. data_offset = 0
  241. ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
  242. data = {"TS": ts}
  243. data_offset += NBUS_TS_SIZE
  244. while data_offset < packet_size - NBUS_CRC_SIZE:
  245. module_addr = data_packet[data_offset]
  246. data_offset += NBUS_MA_SIZE
  247. slave_size = self.__slaves_meta[module_addr].packet_size
  248. packet = data_packet[data_offset : data_offset + slave_size]
  249. data |= self._parse_slave_data_from_response(module_addr, slave_size, packet)
  250. data_offset += slave_size
  251. return data
  252. except Exception:
  253. return None
  254. def _update_slaves_packet_size(self) -> None:
  255. """
  256. Update slaves packet size from loaded format.
  257. """
  258. for slave_meta in self.__slaves_meta.values():
  259. packet_size = 0
  260. for device in slave_meta.module_obj.get_devices().values():
  261. fmt = device.data_format
  262. packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
  263. slave_meta.packet_size = packet_size
  264. def _transform_timestamp(self, data_frame: pd.DataFrame) -> None:
  265. """
  266. Transform timestamp values in dataframe.
  267. :param data_frame: dataframe to transform
  268. """
  269. if not data_frame.empty and "TS" in self.__df.columns:
  270. if self.__ts0 is None:
  271. self.__ts0 = self.__df["TS"].iloc[0]
  272. data_frame["TS"] -= self.__ts0
  273. def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
  274. -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  275. """
  276. Parse and set format for all slaves from response.
  277. :param slave_addr: address of slave module
  278. :param resp_length: length of response
  279. :param response: raw data
  280. :return: parsed formats
  281. """
  282. data_offset = 0
  283. formats = {}
  284. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  285. # parse format
  286. while data_offset < resp_length:
  287. device_id = response[data_offset]
  288. device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
  289. if device_id not in devices:
  290. devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
  291. devices[device_id].data_format = device_format
  292. formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
  293. data_offset += NBUS_FMT_SIZE
  294. return formats
  295. def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
  296. -> dict[str, NBusDataValue]:
  297. """
  298. Parse data of single slave from response.
  299. :param slave_addr: address of selected slave
  300. :param resp_length: length of response
  301. :param response: raw data
  302. :return: dict of data values
  303. """
  304. data_offset = 0
  305. data = {}
  306. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  307. while data_offset < resp_length:
  308. device_id = response[data_offset]
  309. if devices[device_id].data_format is None:
  310. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  311. values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
  312. data_tag = str(slave_addr) + "." + str(device_id)
  313. for i in range(len(values)):
  314. data[data_tag + "." + str(i + 1)] = values[i]
  315. data_offset += offset + NBUS_SA_SIZE
  316. return data