|
|
@@ -0,0 +1,401 @@
|
|
|
+import struct
|
|
|
+import time
|
|
|
+from typing import Optional
|
|
|
+from threading import Thread, Lock
|
|
|
+from beartype import beartype
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from nbus_api.nbus_common_parser import NbusCommonParser
|
|
|
+from nbus_api.nbus_module_slave import NBusSlaveModule
|
|
|
+from nbus_api.nbus_sensor import NBusSensor
|
|
|
+from nbus_hal.crc8 import crc8
|
|
|
+from nbus_hal.nbus_generic_port import NBusPort
|
|
|
+from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
|
|
|
+from nbus_types.nbus_command_type import NBusCommand
|
|
|
+from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
|
|
|
+from nbus_types.nbus_defines import *
|
|
|
+from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
|
|
|
+from nbus_types.nbus_info_type import NBusBridgeInfo
|
|
|
+from nbus_types.nbus_sensor_count_type import NBusSensorCount
|
|
|
+from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
|
|
|
+from nbus_types.nbus_status_type import NBusStatusType
|
|
|
+
|
|
|
+@beartype
|
|
|
+class NBusBridge:
|
|
|
+
|
|
|
+ def __init__(self, serial_port: NBusPort, acquire_delay: float):
|
|
|
+ """
|
|
|
+ Constructor.
|
|
|
+
|
|
|
+ :param serial_port: serial port
|
|
|
+ :param acquire_delay: intermediate delay between data fetching when data not ready
|
|
|
+ :param flush_delay: delay between stream flush
|
|
|
+ """
|
|
|
+ self.__port = serial_port # serial port reference
|
|
|
+ self.__slaves_meta = {} # list of slaves meta information
|
|
|
+ self.__acquire_thread = None # thread for data acquisition
|
|
|
+ self.__data_raw = bytearray() # raw data buffer
|
|
|
+ self._lock = Lock() # thread lock
|
|
|
+ self.__in_acquisition = False # flag when in acquisition
|
|
|
+ self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
|
|
|
+ self.__df = pd.DataFrame() # internal data frame
|
|
|
+ self.__ts0 = None # 0-th timestamp
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Module General Methods
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+ def init(self) -> None:
|
|
|
+ """
|
|
|
+ Initialize the bridge module from network.
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ self.cmd_get_slaves()
|
|
|
+ self.cmd_get_format()
|
|
|
+ except Exception:
|
|
|
+ self.panic()
|
|
|
+
|
|
|
+ def panic(self) -> None:
|
|
|
+ """
|
|
|
+ Reset communication when fatal error occurs.
|
|
|
+ """
|
|
|
+ self.stop_streaming()
|
|
|
+ self.cmd_set_reset()
|
|
|
+
|
|
|
+ self.__data_raw = bytearray()
|
|
|
+ self.__df = pd.DataFrame()
|
|
|
+ self.__slaves_meta = {}
|
|
|
+
|
|
|
+ self.cmd_get_slaves()
|
|
|
+ self.cmd_get_format()
|
|
|
+
|
|
|
+ def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
|
|
|
+ """
|
|
|
+ Get connected slave modules.
|
|
|
+
|
|
|
+ :return: dictionary of connected slaves
|
|
|
+ """
|
|
|
+ slaves = {}
|
|
|
+
|
|
|
+ for key, value in self.__slaves_meta.items():
|
|
|
+ slaves[key] = value.module_obj
|
|
|
+
|
|
|
+ return slaves
|
|
|
+
|
|
|
+ def start_streaming(self) -> None:
|
|
|
+ """
|
|
|
+ Start data streaming (e.g. bridge-cast).
|
|
|
+ """
|
|
|
+ self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
|
|
|
+ self.__acquire_thread = Thread(target=self.__acquire_callback)
|
|
|
+ self.__in_acquisition = True
|
|
|
+
|
|
|
+ # end thread if running
|
|
|
+ if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
|
|
|
+ self.__acquire_thread.join()
|
|
|
+
|
|
|
+ self.__acquire_thread.start()
|
|
|
+
|
|
|
+ def stop_streaming(self):
|
|
|
+ """
|
|
|
+ Stop data streaming (e.g. bridge-cast).
|
|
|
+ """
|
|
|
+ self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
|
|
|
+
|
|
|
+ self.__in_acquisition = False
|
|
|
+
|
|
|
+ if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
|
|
|
+ self.__acquire_thread.join()
|
|
|
+
|
|
|
+ self.__port.flush()
|
|
|
+
|
|
|
+ def fetch_stream_chunk(self) -> pd.DataFrame:
|
|
|
+ """
|
|
|
+ Fetch data from stream (e.g. bridge-cast).
|
|
|
+ Can be called anytime.
|
|
|
+ It not erase internal dataframe.
|
|
|
+
|
|
|
+ :return: stream data frame
|
|
|
+ """
|
|
|
+ with self._lock:
|
|
|
+ packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
|
|
|
+ packet_cnt = len(packets) - self.__in_acquisition
|
|
|
+ parsed_packets = []
|
|
|
+
|
|
|
+ # parse packets
|
|
|
+ for i in range(packet_cnt):
|
|
|
+ data = self._parse_bridge_data_from_packet(packets[i], True)
|
|
|
+ if data is not None:
|
|
|
+ parsed_packets.append(data)
|
|
|
+
|
|
|
+ # extend internal dataframe
|
|
|
+ if parsed_packets:
|
|
|
+ data_frame = pd.DataFrame(parsed_packets)
|
|
|
+ self.__df = pd.concat([self.__df, data_frame], ignore_index=True).copy(deep=True)
|
|
|
+ else:
|
|
|
+ data_frame = pd.DataFrame()
|
|
|
+
|
|
|
+ # erase raw data buffer
|
|
|
+ if self.__in_acquisition:
|
|
|
+ unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
|
|
|
+ self.__data_raw = self.__data_raw[-unparsed_bytes:]
|
|
|
+ else:
|
|
|
+ self.__data_raw = bytearray()
|
|
|
+
|
|
|
+ self._transform_timestamp(data_frame)
|
|
|
+
|
|
|
+ return data_frame
|
|
|
+
|
|
|
+ def fetch_full_stream(self) -> pd.DataFrame:
|
|
|
+ """
|
|
|
+ Fetch all data from stream (e.g. bridge-cast).
|
|
|
+ Must be called after stop_streaming() method.
|
|
|
+ It will erase internal dataframe.
|
|
|
+
|
|
|
+ :return: stream dataframe
|
|
|
+ """
|
|
|
+ if self.__in_acquisition:
|
|
|
+ return pd.DataFrame()
|
|
|
+
|
|
|
+ df = self.__df
|
|
|
+ self._transform_timestamp(df)
|
|
|
+
|
|
|
+ self.__df = pd.DataFrame()
|
|
|
+ self.__ts0 = None
|
|
|
+
|
|
|
+ return df
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Bridge Get Commands
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+ def cmd_get_data(self) -> pd.DataFrame:
|
|
|
+ """
|
|
|
+ Get data from all slave devices.
|
|
|
+
|
|
|
+ :return: dataframe of the network data
|
|
|
+ """
|
|
|
+ response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
|
|
|
+ return pd.DataFrame([self._parse_bridge_data_from_packet(response[1:], False)])
|
|
|
+
|
|
|
+ def cmd_get_info(self) -> NBusBridgeInfo:
|
|
|
+ """
|
|
|
+ Get data from all slave devices.
|
|
|
+
|
|
|
+ :return: bridge info structure
|
|
|
+ """
|
|
|
+ response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
|
|
|
+ fw = str(response[1:4], "ascii")
|
|
|
+ hw_family = str(response[4:7], "ascii")
|
|
|
+ hw = str(response[7:10], "ascii")
|
|
|
+
|
|
|
+ return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
|
|
|
+
|
|
|
+ def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
|
|
|
+ """
|
|
|
+ Get format of all connected devices.
|
|
|
+
|
|
|
+ :return: dict of all device formats
|
|
|
+ """
|
|
|
+ if not self.__slaves_meta:
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
|
|
|
+
|
|
|
+ resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
|
|
|
+ data_offset = 0
|
|
|
+ fmt = {}
|
|
|
+
|
|
|
+ while data_offset < resp_length:
|
|
|
+ slave_addr = response[data_offset]
|
|
|
+ data_offset += NBUS_MA_SIZE
|
|
|
+
|
|
|
+ slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
|
|
|
+ format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
|
|
|
+
|
|
|
+ fmt |= self._set_slave_format_from_response(slave_addr, format_len,
|
|
|
+ response[data_offset: data_offset + format_len])
|
|
|
+
|
|
|
+ data_offset += format_len
|
|
|
+
|
|
|
+ self._update_slaves_packet_size()
|
|
|
+ return fmt
|
|
|
+
|
|
|
+ def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
|
|
|
+ """
|
|
|
+ Get base information about all connected slaves and update internal fields.
|
|
|
+
|
|
|
+ :return: dict of device count of every connected module
|
|
|
+ """
|
|
|
+ resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
|
|
|
+ slaves = {}
|
|
|
+ data_offset = 0
|
|
|
+
|
|
|
+ while data_offset < resp_length - 2:
|
|
|
+ slave_addr = response[data_offset]
|
|
|
+ slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
|
|
|
+
|
|
|
+ slaves[slave_addr] = slave_sensor_cnt
|
|
|
+
|
|
|
+ # update internal memory
|
|
|
+ if slave_addr not in self.__slaves_meta:
|
|
|
+ self.__slaves_meta[slave_addr] = NBusSlaveMeta(
|
|
|
+ NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
|
|
|
+
|
|
|
+ data_offset += 3
|
|
|
+
|
|
|
+ return slaves
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Bridge Get Commands
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+ def cmd_set_reset(self) -> NBusStatusType:
|
|
|
+ """
|
|
|
+ Send reset command to bridge module.
|
|
|
+
|
|
|
+ :return: status of success
|
|
|
+ """
|
|
|
+ resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
|
|
|
+ return NBusStatusType(response[0])
|
|
|
+
|
|
|
+ """
|
|
|
+ ================================================================================================================
|
|
|
+ Internal Methods
|
|
|
+ ================================================================================================================
|
|
|
+ """
|
|
|
+ def __acquire_callback(self) -> None:
|
|
|
+ """
|
|
|
+ Thread worker for data acquisition.
|
|
|
+ """
|
|
|
+ while self.__in_acquisition:
|
|
|
+ data = self.__port.try_read()
|
|
|
+
|
|
|
+ if not data:
|
|
|
+ time.sleep(self.__acquire_delay)
|
|
|
+ continue
|
|
|
+
|
|
|
+ with self._lock:
|
|
|
+ self.__data_raw.extend(data)
|
|
|
+
|
|
|
+ def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
|
|
|
+ -> Optional[dict[str, NBusDataValue]]:
|
|
|
+ """
|
|
|
+ Parse the bridge data packet.
|
|
|
+
|
|
|
+ :param data_packet: packet to parse
|
|
|
+ :param check_packet: if crc check is required (only in bridge-cast)
|
|
|
+
|
|
|
+ :return: dictionary of data values or None
|
|
|
+ """
|
|
|
+ packet_size = len(data_packet)
|
|
|
+ packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
|
|
|
+ # check validity
|
|
|
+ if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
|
|
|
+ return None
|
|
|
+
|
|
|
+ # parse data
|
|
|
+ try:
|
|
|
+ data_offset = 0
|
|
|
+ ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
|
|
|
+ data = {"TS": ts}
|
|
|
+ data_offset += NBUS_TS_SIZE
|
|
|
+
|
|
|
+ while data_offset < packet_size - NBUS_CRC_SIZE:
|
|
|
+ module_addr = data_packet[data_offset]
|
|
|
+ data_offset += NBUS_MA_SIZE
|
|
|
+ slave_size = self.__slaves_meta[module_addr].packet_size
|
|
|
+ packet = data_packet[data_offset : data_offset + slave_size]
|
|
|
+
|
|
|
+ data |= self._parse_slave_data_from_response(module_addr, slave_size, packet)
|
|
|
+ data_offset += slave_size
|
|
|
+
|
|
|
+ return data
|
|
|
+
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _update_slaves_packet_size(self) -> None:
|
|
|
+ """
|
|
|
+ Update slaves packet size from loaded format.
|
|
|
+ """
|
|
|
+ for slave_meta in self.__slaves_meta.values():
|
|
|
+ packet_size = 0
|
|
|
+
|
|
|
+ for device in slave_meta.module_obj.get_devices().values():
|
|
|
+ fmt = device.data_format
|
|
|
+ packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
|
|
|
+
|
|
|
+ slave_meta.packet_size = packet_size
|
|
|
+
|
|
|
+ def _transform_timestamp(self, data_frame: pd.DataFrame) -> None:
|
|
|
+ """
|
|
|
+ Transform timestamp values in dataframe.
|
|
|
+ :param data_frame: dataframe to transform
|
|
|
+ """
|
|
|
+ if not data_frame.empty and "TS" in self.__df.columns:
|
|
|
+ if self.__ts0 is None:
|
|
|
+ self.__ts0 = self.__df["TS"].iloc[0]
|
|
|
+
|
|
|
+ data_frame["TS"] -= self.__ts0
|
|
|
+
|
|
|
+ def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
|
|
|
+ -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
|
|
|
+ """
|
|
|
+ Parse and set format for all slaves from response.
|
|
|
+
|
|
|
+ :param slave_addr: address of slave module
|
|
|
+ :param resp_length: length of response
|
|
|
+ :param response: raw data
|
|
|
+
|
|
|
+ :return: parsed formats
|
|
|
+ """
|
|
|
+ data_offset = 0
|
|
|
+ formats = {}
|
|
|
+
|
|
|
+ devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
|
|
|
+
|
|
|
+ # parse format
|
|
|
+ while data_offset < resp_length:
|
|
|
+ device_id = response[data_offset]
|
|
|
+ device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
|
|
|
+
|
|
|
+ if device_id not in devices:
|
|
|
+ devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
|
|
|
+
|
|
|
+ devices[device_id].data_format = device_format
|
|
|
+ formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
|
|
|
+ data_offset += NBUS_FMT_SIZE
|
|
|
+
|
|
|
+ return formats
|
|
|
+
|
|
|
+ def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
|
|
|
+ -> dict[str, NBusDataValue]:
|
|
|
+ """
|
|
|
+ Parse data of single slave from response.
|
|
|
+ :param slave_addr: address of selected slave
|
|
|
+ :param resp_length: length of response
|
|
|
+ :param response: raw data
|
|
|
+
|
|
|
+ :return: dict of data values
|
|
|
+ """
|
|
|
+ data_offset = 0
|
|
|
+ data = {}
|
|
|
+ devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
|
|
|
+
|
|
|
+ while data_offset < resp_length:
|
|
|
+ device_id = response[data_offset]
|
|
|
+
|
|
|
+ if devices[device_id].data_format is None:
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
|
|
|
+
|
|
|
+ values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
|
|
|
+ data_tag = str(slave_addr) + "." + str(device_id)
|
|
|
+
|
|
|
+ for i in range(len(values)):
|
|
|
+ data[data_tag + "." + str(i + 1)] = values[i]
|
|
|
+
|
|
|
+ data_offset += offset + NBUS_SA_SIZE
|
|
|
+
|
|
|
+ return data
|