nbus_bridge.py 14 KB


  1. import struct
  2. import time
  3. from typing import Optional
  4. from threading import Thread, Lock
  5. from beartype import beartype
  6. import pandas as pd
  7. from nbus_api.nbus_common_parser import NbusCommonParser
  8. from nbus_api.nbus_module_slave import NBusSlaveModule
  9. from nbus_api.nbus_sensor import NBusSensor
  10. from nbus_hal.crc8 import crc8
  11. from nbus_hal.nbus_generic_port import NBusPort
  12. from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
  13. from nbus_types.nbus_command_type import NBusCommand
  14. from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
  15. from nbus_types.nbus_defines import *
  16. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  17. from nbus_types.nbus_info_type import NBusBridgeInfo
  18. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  19. from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
  20. from nbus_types.nbus_status_type import NBusStatusType
  21. @beartype
  22. class NBusBridge:
  23. def __init__(self, serial_port: NBusPort, acquire_delay: float):
  24. """
  25. Constructor.
  26. :param serial_port: serial port
  27. :param acquire_delay: intermediate delay between data fetching when data not ready
  28. :param flush_delay: delay between stream flush
  29. """
  30. self.__port = serial_port # serial port reference
  31. self.__slaves_meta = {} # list of slaves meta information
  32. self.__acquire_thread = None # thread for data acquisition
  33. self.__data_raw = bytearray() # raw data buffer
  34. self._lock = Lock() # thread lock
  35. self.__in_acquisition = False # flag when in acquisition
  36. self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
  37. self.__df = pd.DataFrame() # internal data frame
  38. """
  39. ================================================================================================================
  40. Module General Methods
  41. ================================================================================================================
  42. """
  43. def init(self) -> None:
  44. """
  45. Initialize the bridge module from network.
  46. """
  47. try:
  48. self.cmd_get_slaves()
  49. self.cmd_get_format()
  50. except Exception:
  51. self.panic()
  52. def panic(self) -> None:
  53. """
  54. Reset communication when fatal error occurs.
  55. """
  56. self.stop_streaming()
  57. self.cmd_set_reset()
  58. self.__data_raw = bytearray()
  59. self.__df = pd.DataFrame()
  60. self.__slaves_meta = {}
  61. self.cmd_get_slaves()
  62. self.cmd_get_format()
  63. def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
  64. """
  65. Get connected slave modules.
  66. :return: dictionary of connected slaves
  67. """
  68. slaves = {}
  69. for key, value in self.__slaves_meta.items():
  70. slaves[key] = value.module_obj
  71. return slaves
  72. def start_streaming(self) -> None:
  73. """
  74. Start data streaming (e.g. bridge-cast).
  75. """
  76. self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
  77. self.__acquire_thread = Thread(target=self.__acquire_callback)
  78. self.__in_acquisition = True
  79. # end thread if running
  80. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  81. self.__acquire_thread.join()
  82. self.__acquire_thread.start()
  83. def stop_streaming(self):
  84. """
  85. Stop data streaming (e.g. bridge-cast).
  86. """
  87. self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
  88. self.__in_acquisition = False
  89. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  90. self.__acquire_thread.join()
  91. self.__port.flush()
  92. def fetch_stream_chunk(self) -> pd.DataFrame:
  93. """
  94. Fetch data from stream (e.g. bridge-cast).
  95. Can be called anytime.
  96. It not erase internal dataframe.
  97. :return: stream data frame
  98. """
  99. with self._lock:
  100. packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
  101. packet_cnt = len(packets) - self.__in_acquisition
  102. parsed_packets = []
  103. # parse packets
  104. for i in range(packet_cnt):
  105. data = self._parse_bridge_data_from_packet(packets[i], True)
  106. if data is not None:
  107. parsed_packets.append(data)
  108. # extend internal dataframe
  109. if parsed_packets:
  110. data_frame = pd.DataFrame(parsed_packets)
  111. self.__df = pd.concat([self.__df, data_frame], ignore_index=True)
  112. else:
  113. data_frame = pd.DataFrame()
  114. # erase raw data buffer
  115. if self.__in_acquisition:
  116. unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
  117. self.__data_raw = self.__data_raw[-unparsed_bytes:]
  118. else:
  119. self.__data_raw = bytearray()
  120. return data_frame
  121. def fetch_full_stream(self) -> pd.DataFrame:
  122. """
  123. Fetch all data from stream (e.g. bridge-cast).
  124. Must be called after stop_streaming() method.
  125. It will erase internal dataframe.
  126. :return: stream dataframe
  127. """
  128. if self.__in_acquisition:
  129. return pd.DataFrame()
  130. df = self.__df
  131. self.__df = pd.DataFrame()
  132. return df
  133. """
  134. ================================================================================================================
  135. Bridge Get Commands
  136. ================================================================================================================
  137. """
  138. def cmd_get_data(self) -> pd.DataFrame:
  139. """
  140. Get data from all slave devices.
  141. :return: dataframe of the network data
  142. """
  143. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
  144. return pd.DataFrame([self._parse_bridge_data_from_packet(bytearray(response), False)])
  145. def cmd_get_info(self) -> NBusBridgeInfo:
  146. """
  147. Get data from all slave devices.
  148. :return: bridge info structure
  149. """
  150. response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
  151. fw = str(response[1:4], "ascii")
  152. hw_family = str(response[4:7], "ascii")
  153. hw = str(response[7:10], "ascii")
  154. return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
  155. def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  156. """
  157. Get format of all connected devices.
  158. :return: dict of all device formats
  159. """
  160. if not self.__slaves_meta:
  161. raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
  162. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
  163. data_offset = 0
  164. fmt = {}
  165. while data_offset < resp_length:
  166. slave_addr = response[data_offset]
  167. data_offset += NBUS_MA_SIZE
  168. slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
  169. format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
  170. fmt |= self._set_slave_format_from_response(slave_addr, format_len,
  171. response[data_offset: data_offset + format_len])
  172. data_offset += format_len
  173. self._update_slaves_packet_size()
  174. return fmt
  175. def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
  176. """
  177. Get base information about all connected slaves and update internal fields.
  178. :return: dict of device count of every connected module
  179. """
  180. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
  181. slaves = {}
  182. data_offset = 0
  183. while data_offset < resp_length - 2:
  184. slave_addr = response[data_offset]
  185. slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
  186. slaves[slave_addr] = slave_sensor_cnt
  187. # update internal memory
  188. if slave_addr not in self.__slaves_meta:
  189. self.__slaves_meta[slave_addr] = NBusSlaveMeta(
  190. NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
  191. data_offset += 3
  192. return slaves
  193. """
  194. ================================================================================================================
  195. Bridge Get Commands
  196. ================================================================================================================
  197. """
  198. def cmd_set_reset(self) -> NBusStatusType:
  199. """
  200. Send reset command to bridge module.
  201. :return: status of success
  202. """
  203. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
  204. return NBusStatusType(response[0])
  205. """
  206. ================================================================================================================
  207. Internal Methods
  208. ================================================================================================================
  209. """
  210. def __acquire_callback(self) -> None:
  211. """
  212. Thread worker for data acquisition.
  213. """
  214. while self.__in_acquisition:
  215. data = self.__port.try_read()
  216. if not data:
  217. time.sleep(self.__acquire_delay)
  218. continue
  219. with self._lock:
  220. self.__data_raw.extend(data)
  221. def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
  222. -> Optional[dict[str, NBusDataValue]]:
  223. """
  224. Parse the bridge data packet.
  225. :param data_packet: packet to parse
  226. :param check_packet: if crc check is required (only in bridge-cast)
  227. :return: dictionary of data values or None
  228. """
  229. packet_size = len(data_packet)
  230. packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
  231. # check validity
  232. if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
  233. return None
  234. # parse data
  235. try:
  236. data_offset = 0
  237. ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
  238. data = {"TS": ts}
  239. data_offset += NBUS_TS_SIZE
  240. while data_offset < packet_size - NBUS_CRC_SIZE:
  241. module_addr = data_packet[data_offset]
  242. data_offset += NBUS_MA_SIZE
  243. packet_size = self.__slaves_meta[module_addr].packet_size
  244. packet = data_packet[data_offset : data_offset + packet_size]
  245. data |= self._parse_slave_data_from_response(module_addr, packet_size, packet)
  246. data_offset += packet_size
  247. return data
  248. except Exception:
  249. return None
  250. def _update_slaves_packet_size(self) -> None:
  251. """
  252. Update slaves packet size from loaded format.
  253. """
  254. for slave_meta in self.__slaves_meta.values():
  255. packet_size = 0
  256. for device in slave_meta.module_obj.get_devices().values():
  257. fmt = device.data_format
  258. packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
  259. slave_meta.packet_size = packet_size
  260. def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
  261. -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  262. """
  263. Parse and set format for all slaves from response.
  264. :param slave_addr: address of slave module
  265. :param resp_length: length of response
  266. :param response: raw data
  267. :return: parsed formats
  268. """
  269. data_offset = 0
  270. formats = {}
  271. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  272. # parse format
  273. while data_offset < resp_length:
  274. device_id = response[data_offset]
  275. device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
  276. if device_id not in devices:
  277. devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
  278. devices[device_id].data_format = device_format
  279. formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
  280. data_offset += NBUS_FMT_SIZE
  281. return formats
  282. def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
  283. -> dict[str, NBusDataValue]:
  284. """
  285. Parse data of single slave from response.
  286. :param slave_addr: address of selected slave
  287. :param resp_length: length of response
  288. :param response: raw data
  289. :return: dict of data values
  290. """
  291. data_offset = 0
  292. data = {}
  293. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  294. while data_offset < resp_length:
  295. device_id = response[data_offset]
  296. if devices[device_id].data_format is None:
  297. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  298. values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
  299. data_tag = str(slave_addr) + "." + str(device_id)
  300. for i in range(len(values)):
  301. data[data_tag + "." + str(i + 1)] = values[i]
  302. data_offset += offset + NBUS_SA_SIZE
  303. return data