import struct import time from typing import Optional from threading import Thread, Lock from beartype import beartype import pandas as pd from nbus_api.nbus_common_parser import NbusCommonParser from nbus_api.nbus_module_slave import NBusSlaveModule from nbus_api.nbus_sensor import NBusSensor from nbus_hal.crc8 import crc8 from nbus_hal.nbus_generic_port import NBusPort from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress from nbus_types.nbus_command_type import NBusCommand from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue from nbus_types.nbus_defines import * from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType from nbus_types.nbus_info_type import NBusBridgeInfo from nbus_types.nbus_sensor_count_type import NBusSensorCount from nbus_types.nbus_slave_meta_type import NBusSlaveMeta from nbus_types.nbus_status_type import NBusStatusType @beartype class NBusBridge: def __init__(self, serial_port: NBusPort, acquire_delay: float): """ Constructor. :param serial_port: serial port :param acquire_delay: intermediate delay between data fetching when data not ready :param flush_delay: delay between stream flush """ self.__port = serial_port # serial port reference self.__slaves_meta = {} # list of slaves meta information self.__acquire_thread = None # thread for data acquisition self.__data_raw = bytearray() # raw data buffer self._lock = Lock() # thread lock self.__in_acquisition = False # flag when in acquisition self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready self.__df = pd.DataFrame() # internal data frame """ ================================================================================================================ Module General Methods ================================================================================================================ """ def init(self) -> None: """ Initialize the bridge module from network. """ try: self.cmd_get_slaves() self.cmd_get_format() except Exception: self.panic() def panic(self) -> None: """ Reset communication when fatal error occurs. """ self.stop_streaming() self.cmd_set_reset() self.__data_raw = bytearray() self.__df = pd.DataFrame() self.__slaves_meta = {} self.cmd_get_slaves() self.cmd_get_format() def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]: """ Get connected slave modules. :return: dictionary of connected slaves """ slaves = {} for key, value in self.__slaves_meta.items(): slaves[key] = value.module_obj return slaves def start_streaming(self) -> None: """ Start data streaming (e.g. bridge-cast). """ self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray()) self.__acquire_thread = Thread(target=self.__acquire_callback) self.__in_acquisition = True # end thread if running if self.__acquire_thread is not None and self.__acquire_thread.is_alive(): self.__acquire_thread.join() self.__acquire_thread.start() def stop_streaming(self): """ Stop data streaming (e.g. bridge-cast). """ self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray()) self.__in_acquisition = False if self.__acquire_thread is not None and self.__acquire_thread.is_alive(): self.__acquire_thread.join() self.__port.flush() def fetch_stream_chunk(self) -> pd.DataFrame: """ Fetch data from stream (e.g. bridge-cast). Can be called anytime. It not erase internal dataframe. :return: stream data frame """ with self._lock: packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR) packet_cnt = len(packets) - self.__in_acquisition parsed_packets = [] # parse packets for i in range(packet_cnt): data = self._parse_bridge_data_from_packet(packets[i], True) if data is not None: parsed_packets.append(data) # extend internal dataframe if parsed_packets: data_frame = pd.DataFrame(parsed_packets) self.__df = pd.concat([self.__df, data_frame], ignore_index=True) else: data_frame = pd.DataFrame() # erase raw data buffer if self.__in_acquisition: unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE self.__data_raw = self.__data_raw[-unparsed_bytes:] else: self.__data_raw = bytearray() return data_frame def fetch_full_stream(self) -> pd.DataFrame: """ Fetch all data from stream (e.g. bridge-cast). Must be called after stop_streaming() method. It will erase internal dataframe. :return: stream dataframe """ if self.__in_acquisition: return pd.DataFrame() df = self.__df self.__df = pd.DataFrame() return df """ ================================================================================================================ Bridge Get Commands ================================================================================================================ """ def cmd_get_data(self) -> pd.DataFrame: """ Get data from all slave devices. :return: dataframe of the network data """ response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray()) return pd.DataFrame([self._parse_bridge_data_from_packet(response[1:], False)]) def cmd_get_info(self) -> NBusBridgeInfo: """ Get data from all slave devices. :return: bridge info structure """ response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray()) fw = str(response[1:4], "ascii") hw_family = str(response[4:7], "ascii") hw = str(response[7:10], "ascii") return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw) def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]: """ Get format of all connected devices. :return: dict of all device formats """ if not self.__slaves_meta: raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED) resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray()) data_offset = 0 fmt = {} while data_offset < resp_length: slave_addr = response[data_offset] data_offset += NBUS_MA_SIZE slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count) fmt |= self._set_slave_format_from_response(slave_addr, format_len, response[data_offset: data_offset + format_len]) data_offset += format_len self._update_slaves_packet_size() return fmt def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]: """ Get base information about all connected slaves and update internal fields. :return: dict of device count of every connected module """ resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray()) slaves = {} data_offset = 0 while data_offset < resp_length - 2: slave_addr = response[data_offset] slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2]) slaves[slave_addr] = slave_sensor_cnt # update internal memory if slave_addr not in self.__slaves_meta: self.__slaves_meta[slave_addr] = NBusSlaveMeta( NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0) data_offset += 3 return slaves """ ================================================================================================================ Bridge Get Commands ================================================================================================================ """ def cmd_set_reset(self) -> NBusStatusType: """ Send reset command to bridge module. :return: status of success """ resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray()) return NBusStatusType(response[0]) """ ================================================================================================================ Internal Methods ================================================================================================================ """ def __acquire_callback(self) -> None: """ Thread worker for data acquisition. """ while self.__in_acquisition: data = self.__port.try_read() if not data: time.sleep(self.__acquire_delay) continue with self._lock: self.__data_raw.extend(data) def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \ -> Optional[dict[str, NBusDataValue]]: """ Parse the bridge data packet. :param data_packet: packet to parse :param check_packet: if crc check is required (only in bridge-cast) :return: dictionary of data values or None """ packet_size = len(data_packet) packet_crc = crc8(data_packet[:NBUS_CRC_ADDR]) # check validity if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc): return None # parse data try: data_offset = 0 ts = struct.unpack(" None: """ Update slaves packet size from loaded format. """ for slave_meta in self.__slaves_meta.values(): packet_size = 0 for device in slave_meta.module_obj.get_devices().values(): fmt = device.data_format packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE slave_meta.packet_size = packet_size def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \ -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]: """ Parse and set format for all slaves from response. :param slave_addr: address of slave module :param resp_length: length of response :param response: raw data :return: parsed formats """ data_offset = 0 formats = {} devices = self.__slaves_meta[slave_addr].module_obj.get_devices() # parse format while data_offset < resp_length: device_id = response[data_offset] device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE]) if device_id not in devices: devices[device_id] = NBusSensor(self.__port, slave_addr, device_id) devices[device_id].data_format = device_format formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format data_offset += NBUS_FMT_SIZE return formats def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \ -> dict[str, NBusDataValue]: """ Parse data of single slave from response. :param slave_addr: address of selected slave :param resp_length: length of response :param response: raw data :return: dict of data values """ data_offset = 0 data = {} devices = self.__slaves_meta[slave_addr].module_obj.get_devices() while data_offset < resp_length: device_id = response[data_offset] if devices[device_id].data_format is None: raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED) values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:]) data_tag = str(slave_addr) + "." + str(device_id) for i in range(len(values)): data[data_tag + "." + str(i + 1)] = values[i] data_offset += offset + NBUS_SA_SIZE return data