import struct from collections import deque from dataclasses import dataclass from nbus_api.nbus_common_parser import NbusCommonParser from nbus_api.nbus_module_slave import NBusSlaveModule from nbus_api.nbus_sensor import NBusSensor from nbus_hal.nbus_serial.serial_port import * from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType from nbus_types.nbus_sensor_count_type import NBusSensorCount import time from threading import Thread, Lock from collections import namedtuple import pandas as pd from nbus_types.nbus_status_type import NBusStatusType NBUS_RX_META = 4 NBUS_FMT_SIZE = 4 NBUS_TS_SIZE = 4 NBUS_CRC_SIZE = 1 NBUS_MA_SIZE = 1 NBUS_SA_SIZE = 1 NBUS_CRC_ADDR = -1 NBUS_BRIDGE_DATA_HDR = bytearray([0x00] + [0xFF] * 8 + [0x00]) NBUS_BRIDGE_DATA_HDR_SIZE = len(NBUS_BRIDGE_DATA_HDR) @dataclass class NBusSlaveMeta: obj: NBusSlaveModule cnt: NBusSensorCount packet_size: int @beartype class NBusBridge: def __init__(self, serial_port: NBusSerialPort, acquire_delay: float, flush_delay: float): """ Constructor. :param serial_port: serial port :param acquire_delay: intermediate delay between data fetching when data not ready :param flush_delay: delay between stream flush """ self.__port = serial_port # serial port reference self.__slaves_meta = {} # list of slaves meta information self.__acquire_thread = None # thread for data acquisition self.__data_raw = bytearray() # raw data buffer self._lock = Lock() # thread lock self.__in_acquisition = False # flag when in acquisition self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready self.__flush_delay = flush_delay # delay between stream flush self.__df = pd.DataFrame() # internal data frame def init_from_network(self): try: self.cmd_get_slaves() self.cmd_get_format() except Exception: self.panic() def panic(self): self.stop_data_streaming() self.cmd_set_reset() self.cmd_get_slaves() self.cmd_get_format() def start_data_streaming(self): self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray()) self.__acquire_thread = Thread(target=self.__acquire_callback) self.__in_acquisition = True # end thread if running if self.__acquire_thread.is_alive(): self.__acquire_thread.join() self.__acquire_thread.start() def stop_data_streaming(self): self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray()) self.__in_acquisition = False if self.__acquire_thread is not None and self.__acquire_thread.is_alive(): self.__acquire_thread.join() self.__port.flush(self.__flush_delay) def fetch_chunk_from_stream(self): with self._lock: packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR) packet_cnt = len(packets) - self.__in_acquisition parsed_packets = [] # parse packets for i in range(packet_cnt): data = self._parse_bridge_data(packets[i], True) if data is not None: parsed_packets.append(data) # extend internal dataframe if parsed_packets: data_frame = pd.DataFrame(parsed_packets) self.__df = pd.concat([self.__df, data_frame], ignore_index=True) else: data_frame = pd.DataFrame() # erase raw data buffer if self.__in_acquisition: unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE self.__data_raw = self.__data_raw[-unparsed_bytes:] else: self.__data_raw = bytearray() return data_frame def fetch_all_from_stream(self): df = self.__df self.__df = pd.DataFrame() return df def cmd_get_slaves(self): resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray([])) slaves = {} data_offset = 0 while data_offset < resp_length - 2: slave_addr = response[data_offset] slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2]) slaves[slave_addr] = slave_sensor_cnt self.__slaves_meta[slave_addr] = NBusSlaveMeta( NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0 ) data_offset += 3 return slaves def cmd_set_reset(self): resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray()) return NBusStatusType(response[0]) def cmd_get_format(self): resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray()) data_offset = 0 fmt = {} while data_offset < resp_length: slave_addr = response[data_offset] data_offset += NBUS_MA_SIZE slave_sensor_cnt = self.__slaves_meta[slave_addr].cnt format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count) fmt |= self._set_slave_format_from_response(slave_addr, format_len, response[data_offset: data_offset + format_len]) data_offset += format_len self._set_data_packet_size() return fmt def cmd_get_data(self): resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray()) return self._parse_bridge_data(bytearray(response), False) def _parse_bridge_data(self, data_packet, check_packet): packet_size = len(data_packet) packet_crc = crc8(data_packet[:NBUS_CRC_ADDR]) if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc): return None try: data_offset = 0 ts = struct.unpack("