# nbus_bridge.py (14 KB)
import logging
import struct
import time
from threading import Thread, Lock
from typing import Optional

import pandas as pd
from beartype import beartype

from nbus_api.nbus_common_parser import NbusCommonParser
from nbus_api.nbus_module_slave import NBusSlaveModule
from nbus_api.nbus_sensor import NBusSensor
from nbus_hal.crc8 import crc8
from nbus_hal.nbus_generic_port import NBusPort
from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
from nbus_types.nbus_command_type import NBusCommand
from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
from nbus_types.nbus_defines import *
from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
from nbus_types.nbus_info_type import NBusBridgeInfo
from nbus_types.nbus_sensor_count_type import NBusSensorCount
from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
from nbus_types.nbus_status_type import NBusStatusType
  21. @beartype
  22. class NBusBridge:
  23. def __init__(self, serial_port: NBusPort, acquire_delay: float):
  24. """
  25. Constructor.
  26. :param serial_port: serial port
  27. :param acquire_delay: intermediate delay between data fetching when data not ready
  28. :param flush_delay: delay between stream flush
  29. """
  30. self.__port = serial_port # serial port reference
  31. self.__slaves_meta = {} # list of slaves meta information
  32. self.__acquire_thread = None # thread for data acquisition
  33. self.__data_raw = bytearray() # raw data buffer
  34. self._lock = Lock() # thread lock
  35. self.__in_acquisition = False # flag when in acquisition
  36. self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
  37. self.__df = pd.DataFrame() # internal data frame
  38. """
  39. ================================================================================================================
  40. Module General Methods
  41. ================================================================================================================
  42. """
  43. def init(self) -> None:
  44. """
  45. Initialize the bridge module from network.
  46. """
  47. try:
  48. self.cmd_get_slaves()
  49. self.cmd_get_format()
  50. except Exception:
  51. self.panic()
  52. def panic(self) -> None:
  53. """
  54. Reset communication when fatal error occurs.
  55. """
  56. self.stop_streaming()
  57. self.cmd_set_reset()
  58. self.__data_raw = bytearray()
  59. self.__df = pd.DataFrame()
  60. self.__slaves_meta = {}
  61. self.cmd_get_slaves()
  62. self.cmd_get_format()
  63. def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
  64. """
  65. Get connected slave modules.
  66. :return: dictionary of connected slaves
  67. """
  68. slaves = {}
  69. for key, value in self.__slaves_meta.items():
  70. slaves[key] = value.module_obj
  71. return slaves
  72. def start_streaming(self) -> None:
  73. """
  74. Start data streaming (e.g. bridge-cast).
  75. """
  76. self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
  77. self.__acquire_thread = Thread(target=self.__acquire_callback)
  78. self.__in_acquisition = True
  79. # end thread if running
  80. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  81. self.__acquire_thread.join()
  82. self.__acquire_thread.start()
  83. def stop_streaming(self):
  84. """
  85. Stop data streaming (e.g. bridge-cast).
  86. """
  87. self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
  88. self.__in_acquisition = False
  89. if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
  90. self.__acquire_thread.join()
  91. self.__port.flush()
  92. def fetch_stream_chunk(self) -> pd.DataFrame:
  93. """
  94. Fetch data from stream (e.g. bridge-cast).
  95. Can be called anytime.
  96. It not erase internal dataframe.
  97. :return: stream data frame
  98. """
  99. with self._lock:
  100. packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
  101. packet_cnt = len(packets) - self.__in_acquisition
  102. parsed_packets = []
  103. # parse packets
  104. for i in range(packet_cnt):
  105. data = self._parse_bridge_data_from_packet(packets[i], True)
  106. if data is not None:
  107. parsed_packets.append(data)
  108. # extend internal dataframe
  109. if parsed_packets:
  110. data_frame = pd.DataFrame(parsed_packets)
  111. self.__df = pd.concat([self.__df, data_frame], ignore_index=True)
  112. else:
  113. data_frame = pd.DataFrame()
  114. # erase raw data buffer
  115. if self.__in_acquisition:
  116. unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
  117. self.__data_raw = self.__data_raw[-unparsed_bytes:]
  118. else:
  119. self.__data_raw = bytearray()
  120. return data_frame
  121. def fetch_full_stream(self) -> pd.DataFrame:
  122. """
  123. Fetch all data from stream (e.g. bridge-cast).
  124. Must be called after stop_streaming() method.
  125. It will erase internal dataframe.
  126. :return: stream dataframe
  127. """
  128. if self.__in_acquisition:
  129. return pd.DataFrame()
  130. df = self.__df
  131. self.__df = pd.DataFrame()
  132. return df
  133. """
  134. ================================================================================================================
  135. Bridge Get Commands
  136. ================================================================================================================
  137. """
  138. def cmd_get_data(self) -> pd.DataFrame:
  139. """
  140. Get data from all slave devices.
  141. :return: dataframe of the network data
  142. """
  143. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
  144. return pd.DataFrame([self._parse_bridge_data_from_packet(bytearray(response), False)])
  145. def cmd_get_info(self) -> NBusBridgeInfo:
  146. """
  147. Get data from all slave devices.
  148. :return: bridge info structure
  149. """
  150. response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
  151. fw = str(response[1:4], "ascii")
  152. hw_family = str(response[4:7], "ascii")
  153. hw = str(response[7:10], "ascii")
  154. return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
  155. def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  156. """
  157. Get format of all connected devices.
  158. :return: dict of all device formats
  159. """
  160. if not self.__slaves_meta:
  161. raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
  162. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
  163. data_offset = 0
  164. fmt = {}
  165. while data_offset < resp_length:
  166. slave_addr = response[data_offset]
  167. data_offset += NBUS_MA_SIZE
  168. slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
  169. format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
  170. fmt |= self._set_slave_format_from_response(slave_addr, format_len,
  171. response[data_offset: data_offset + format_len])
  172. data_offset += format_len
  173. self._update_slaves_packet_size()
  174. return fmt
  175. def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
  176. """
  177. Get base information about all connected slaves and update internal fields.
  178. :return: dict of device count of every connected module
  179. """
  180. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
  181. slaves = {}
  182. data_offset = 0
  183. while data_offset < resp_length - 2:
  184. slave_addr = response[data_offset]
  185. slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
  186. slaves[slave_addr] = slave_sensor_cnt
  187. # update internal memory
  188. if slave_addr not in self.__slaves_meta:
  189. self.__slaves_meta[slave_addr] = NBusSlaveMeta(
  190. NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
  191. data_offset += 3
  192. return slaves
  193. """
  194. ================================================================================================================
  195. Bridge Get Commands
  196. ================================================================================================================
  197. """
  198. def cmd_set_reset(self) -> NBusStatusType:
  199. """
  200. Send reset command to bridge module.
  201. :return: status of success
  202. """
  203. resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
  204. return NBusStatusType(response[0])
  205. """
  206. ================================================================================================================
  207. Internal Methods
  208. ================================================================================================================
  209. """
  210. def __acquire_callback(self) -> None:
  211. """
  212. Thread worker for data acquisition.
  213. """
  214. while self.__in_acquisition:
  215. data = self.__port.try_read()
  216. if not data:
  217. time.sleep(self.__acquire_delay)
  218. continue
  219. with self._lock:
  220. self.__data_raw.extend(data)
  221. def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
  222. -> Optional[dict[str, NBusDataValue]]:
  223. """
  224. Parse the bridge data packet.
  225. :param data_packet: packet to parse
  226. :param check_packet: if crc check is required (only in bridge-cast)
  227. :return: dictionary of data values or None
  228. """
  229. packet_size = len(data_packet)
  230. packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
  231. # check validity
  232. if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
  233. return None
  234. # parse data
  235. try:
  236. data_offset = 0
  237. ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
  238. data = {"TS": ts}
  239. data_offset += NBUS_TS_SIZE
  240. while data_offset < packet_size - NBUS_CRC_SIZE:
  241. module_addr = data_packet[data_offset]
  242. data_offset += NBUS_MA_SIZE
  243. packet_size = self.__slaves_meta[module_addr].packet_size
  244. packet = data_packet[data_offset : data_offset + packet_size]
  245. data |= self._parse_slave_data_from_response(module_addr, packet_size, packet)
  246. data_offset += packet_size
  247. return data
  248. except Exception:
  249. return None
  250. def _update_slaves_packet_size(self) -> None:
  251. """
  252. Update slaves packet size from loaded format.
  253. """
  254. for slave_meta in self.__slaves_meta.values():
  255. packet_size = 0
  256. for device in slave_meta.module_obj.get_devices().values():
  257. fmt = device.data_format
  258. packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
  259. slave_meta.packet_size = packet_size
  260. def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
  261. -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
  262. """
  263. Parse and set format for all slaves from response.
  264. :param slave_addr: address of slave module
  265. :param resp_length: length of response
  266. :param response: raw data
  267. :return: parsed formats
  268. """
  269. data_offset = 0
  270. formats = {}
  271. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  272. # parse format
  273. while data_offset < resp_length:
  274. device_id = response[data_offset]
  275. device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
  276. if device_id not in devices:
  277. devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
  278. devices[device_id].data_format = device_format
  279. formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
  280. data_offset += NBUS_FMT_SIZE
  281. return formats
  282. def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
  283. -> dict[str, NBusDataValue]:
  284. """
  285. Parse data of single slave from response.
  286. :param slave_addr: address of selected slave
  287. :param resp_length: length of response
  288. :param response: raw data
  289. :return: dict of data values
  290. """
  291. data_offset = 0
  292. data = {}
  293. devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
  294. while data_offset < resp_length:
  295. device_id = response[data_offset]
  296. if devices[device_id].data_format is None:
  297. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  298. values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
  299. data_tag = str(slave_addr) + "." + str(device_id)
  300. for i in range(len(values)):
  301. data[data_tag + "." + str(i + 1)] = values[i]
  302. data_offset += offset + NBUS_SA_SIZE
  303. return data