||
- import struct
- from collections import deque
- from dataclasses import dataclass
- from nbus_api.nbus_common_parser import NbusCommonParser
- from nbus_api.nbus_module_slave import NBusSlaveModule
- from nbus_api.nbus_sensor import NBusSensor
- from nbus_hal.nbus_serial.serial_port import *
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_sensor_count_type import NBusSensorCount
- import time
- from threading import Thread, Lock
- from collections import namedtuple
- import pandas as pd
- from nbus_types.nbus_status_type import NBusStatusType
# --- Wire-format sizes (bytes) -------------------------------------------
NBUS_RX_META = 4  # NOTE(review): not referenced in this module; presumably RX metadata length - confirm
NBUS_FMT_SIZE = 4  # one per-device format descriptor in a CMD_GET_FORMAT response
NBUS_TS_SIZE = 4  # timestamp at the start of a data packet (unpacked as "<I")
NBUS_CRC_SIZE = 1  # CRC byte that closes a data packet
NBUS_MA_SIZE = 1  # module (slave) address byte preceding each slave payload
NBUS_SA_SIZE = 1  # device id byte preceding each device's samples

# Index of the CRC byte inside a data packet (the last byte).
NBUS_CRC_ADDR = -1

# Marker separating consecutive data packets in the raw stream:
# 0x00, eight 0xFF bytes, 0x00.
NBUS_BRIDGE_DATA_HDR = bytearray(b"\x00" + b"\xff" * 8 + b"\x00")
NBUS_BRIDGE_DATA_HDR_SIZE = len(NBUS_BRIDGE_DATA_HDR)
@dataclass
class NBusSlaveMeta:
    """Per-slave bookkeeping record kept by the bridge, keyed by slave address."""

    # Slave module object bound to the bridge's serial port.
    obj: NBusSlaveModule
    # Sensor counts reported for this slave by CMD_GET_SLAVES.
    cnt: NBusSensorCount
    # Byte length of this slave's payload inside a bridge data packet
    # (0 until the data formats have been loaded).
    packet_size: int
@beartype
class NBusBridge:
    """
    Bridge-level API on top of a serial port.

    Discovers the slaves behind the bridge, loads their data formats, and
    either polls single data packets or collects a continuous stream in a
    background thread and parses it into pandas data frames.

    NOTE: methods are intentionally left without type annotations; the class is
    decorated with ``@beartype``, so new annotations would be enforced at runtime.
    """

    def __init__(self, serial_port: NBusSerialPort, acquire_delay: float, flush_delay: float):
        """
        Constructor.
        :param serial_port: serial port
        :param acquire_delay: intermediate delay between data fetching when data not ready
        :param flush_delay: delay between stream flush
        """
        self.__port = serial_port               # serial port reference
        self.__slaves_meta = {}                 # slave address -> NBusSlaveMeta
        self.__acquire_thread = None            # thread for data acquisition
        self.__data_raw = bytearray()           # raw data buffer filled by the acquisition thread
        self._lock = Lock()                     # guards the raw data buffer
        self.__in_acquisition = False           # flag when in acquisition
        self.__acquire_delay = acquire_delay    # delay between reads when no data is ready
        self.__flush_delay = flush_delay        # delay between stream flush
        self.__df = pd.DataFrame()              # internal data frame
        # Size of a data packet with no slave payloads; recomputed once formats are loaded.
        self.__packet_size = NBUS_TS_SIZE + NBUS_CRC_SIZE

    def init_from_network(self):
        """
        Load the slave list and the data formats from the bridge.

        If any step raises, fall back to :meth:`panic`, which stops streaming,
        resets the bridge and retries the discovery.
        """
        try:
            self.cmd_get_slaves()
            self.cmd_get_format()
        except Exception:
            self.panic()

    def panic(self):
        """
        Recovery sequence: stop streaming, reset the bridge, then reload the
        slave list and the data formats. Errors raised here propagate.
        """
        self.stop_data_streaming()
        self.cmd_set_reset()
        self.cmd_get_slaves()
        self.cmd_get_format()

    def start_data_streaming(self):
        """
        Send the start command to the bridge and launch the acquisition thread.

        A worker left running by an earlier start is stopped and joined first,
        so that only one thread ever reads from the port.
        """
        previous = self.__acquire_thread
        if previous is not None and previous.is_alive():
            # The worker loops on the flag, so it must be cleared before join().
            self.__in_acquisition = False
            previous.join()

        self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
        self.__in_acquisition = True
        self.__acquire_thread = Thread(target=self.__acquire_callback)
        self.__acquire_thread.start()

    def stop_data_streaming(self):
        """
        Send the stop command, wait for the acquisition thread to finish and
        flush the port using the configured flush delay.
        """
        self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
        self.__in_acquisition = False
        if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
            self.__acquire_thread.join()
        self.__port.flush(self.__flush_delay)

    def fetch_chunk_from_stream(self):
        """
        Parse the packets currently held in the raw buffer.

        The buffer is split on the packet header. While acquisition is running
        the last piece may still be incomplete, so it is kept (together with
        room for its header) for the next call; otherwise the buffer is emptied.
        Pieces that fail validation or parsing are skipped.

        :return: data frame with the packets parsed by this call (empty when
                 there were none); the same rows are appended to the internal
                 data frame returned by :meth:`fetch_all_from_stream`
        """
        with self._lock:
            # Read the flag once: it is changed from other threads, and the
            # packet count and the buffer trimming below must agree.
            streaming = self.__in_acquisition

            packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
            complete_packets = packets[:-1] if streaming else packets

            # parse packets
            parsed_packets = []
            for raw_packet in complete_packets:
                data = self._parse_bridge_data(raw_packet, True)
                if data is not None:
                    parsed_packets.append(data)

            # extend internal dataframe
            if parsed_packets:
                data_frame = pd.DataFrame(parsed_packets)
                self.__df = pd.concat([self.__df, data_frame], ignore_index=True)
            else:
                data_frame = pd.DataFrame()

            # erase raw data buffer
            if streaming:
                # Slicing past the start of the buffer simply keeps all of it.
                unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
                self.__data_raw = self.__data_raw[-unparsed_bytes:]
            else:
                self.__data_raw = bytearray()

            return data_frame

    def fetch_all_from_stream(self):
        """
        Hand over everything accumulated by :meth:`fetch_chunk_from_stream`.

        :return: the internal data frame; a fresh empty one replaces it
        """
        df = self.__df
        self.__df = pd.DataFrame()
        return df

    def cmd_get_slaves(self):
        """
        Request the slave list from the bridge and (re)create the meta record
        of every reported slave.

        The response is read as consecutive 3-byte records: the slave address
        followed by the two values passed to ``NBusSensorCount``.

        :return: dict mapping slave address to its ``NBusSensorCount``
        """
        resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray([]))

        slaves = {}
        # Stop at resp_length - 2 so a truncated trailing record is never read.
        for data_offset in range(0, resp_length - 2, 3):
            slave_addr = response[data_offset]
            slave_sensor_cnt = NBusSensorCount(response[data_offset + 1], response[data_offset + 2])

            slaves[slave_addr] = slave_sensor_cnt
            self.__slaves_meta[slave_addr] = NBusSlaveMeta(
                NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0
            )

        return slaves

    def cmd_set_reset(self):
        """
        Send the reset command to the bridge.

        :return: ``NBusStatusType`` built from the first response byte
        """
        resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
        return NBusStatusType(response[0])

    def cmd_get_format(self):
        """
        Request the data formats of all slaves and store them on their devices.

        For every slave the response holds the slave address followed by one
        ``NBUS_FMT_SIZE``-byte descriptor per sensor (read-only plus read-write
        count). Requires the slave list to be loaded (``cmd_get_slaves``);
        an unknown slave address raises ``KeyError``.

        :return: dict mapping "<slave>.<device>" to the parsed format
        """
        resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())

        data_offset = 0
        fmt = {}
        while data_offset < resp_length:
            slave_addr = response[data_offset]
            data_offset += NBUS_MA_SIZE

            slave_sensor_cnt = self.__slaves_meta[slave_addr].cnt
            format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)

            fmt.update(
                self._set_slave_format_from_response(
                    slave_addr, format_len, response[data_offset: data_offset + format_len]
                )
            )
            data_offset += format_len

        # Packet sizes depend on the formats just loaded.
        self._set_data_packet_size()
        return fmt

    def cmd_get_data(self):
        """
        Request a single data packet from the bridge and parse it without
        validating its length or CRC.

        :return: dict with the parsed values, or None when parsing fails
        """
        resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
        return self._parse_bridge_data(bytearray(response), False)

    def _parse_bridge_data(self, data_packet, check_packet):
        """
        Parse one bridge data packet.

        Layout as read here: little-endian 32-bit timestamp, then for every
        slave its address byte followed by that slave's payload, and a final
        CRC byte.

        :param data_packet: raw packet bytes (without the stream header)
        :param check_packet: when True, reject packets that are too short or
                             whose last byte does not match the CRC of the rest
        :return: dict with "TS" and one entry per sample, or None when the
                 packet is rejected or cannot be parsed
        """
        total_size = len(data_packet)

        if check_packet:
            # Validate the length first: stream splitting routinely yields empty
            # or truncated pieces, which must not reach the CRC computation.
            if total_size < NBUS_TS_SIZE + NBUS_CRC_SIZE:
                return None
            if data_packet[NBUS_CRC_ADDR] != crc8(data_packet[:NBUS_CRC_ADDR]):
                return None

        try:
            data_offset = 0
            ts = struct.unpack("<I", data_packet[data_offset: data_offset + NBUS_TS_SIZE])[0]
            data = {"TS": ts}
            data_offset += NBUS_TS_SIZE

            # Keep the end of the payload in its own variable: it must not be
            # confused with the per-slave payload size used inside the loop,
            # otherwise only the first slave of a packet would be parsed.
            payload_end = total_size - NBUS_CRC_SIZE
            while data_offset < payload_end:
                module_addr = data_packet[data_offset]
                data_offset += NBUS_MA_SIZE

                slave_size = self.__slaves_meta[module_addr].packet_size
                slave_payload = data_packet[data_offset: data_offset + slave_size]
                data.update(self._get_data_from_response(module_addr, slave_size, slave_payload))
                data_offset += slave_size

            return data
        except Exception:
            # Best effort by design: a malformed packet (unknown address,
            # truncated payload, missing format, ...) is dropped, not raised.
            return None

    def _set_data_packet_size(self):
        """
        Recompute the payload size of every slave and the total data packet
        size from the data formats currently stored on the devices.

        Each device contributes its id byte plus ``byte_length * samples``;
        each slave additionally contributes its address byte.
        """
        total_size = NBUS_TS_SIZE + NBUS_CRC_SIZE

        for slave_meta in self.__slaves_meta.values():
            slave_size = 0
            for device in slave_meta.obj.get_devices().values():
                fmt = device.data_format
                slave_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE

            slave_meta.packet_size = slave_size
            total_size += slave_size + NBUS_MA_SIZE

        self.__packet_size = total_size

    def _set_slave_format_from_response(self, slave_addr, resp_length, response):
        """
        Parse the format descriptors of one slave and store them on its devices.

        Devices not yet known to the slave module are created as ``NBusSensor``.

        :param slave_addr: address of the slave the descriptors belong to
        :param resp_length: number of descriptor bytes to consume
        :param response: descriptor bytes; the first byte of each
                         ``NBUS_FMT_SIZE``-byte descriptor is the device id
        :return: dict mapping "<slave>.<device>" to the parsed format
        """
        formats = {}
        devices = self.__slaves_meta[slave_addr].obj.get_devices()

        # parse format
        for data_offset in range(0, resp_length, NBUS_FMT_SIZE):
            device_id = response[data_offset]
            device_format = NbusCommonParser.format_from_response(
                response[data_offset: data_offset + NBUS_FMT_SIZE]
            )

            if device_id not in devices:
                devices[device_id] = NBusSensor(device_id)
            devices[device_id].data_format = device_format

            formats[f"{slave_addr}.{device_id}"] = device_format

        return formats

    def _get_data_from_response(self, slave_addr, resp_length, response):
        """
        Decode the payload of one slave into individual sample values.

        :param slave_addr: address of the slave the payload belongs to
        :param resp_length: number of payload bytes to consume
        :param response: payload bytes; each device section starts with its id
        :return: dict mapping "<slave>.<device>.<n>" (n counted from 1) to the value
        :raises NBusErrorAPI: when a device has no data format loaded
        """
        data_offset = 0
        data = {}
        devices = self.__slaves_meta[slave_addr].obj.get_devices()

        while data_offset < resp_length:
            device_id = response[data_offset]
            device = devices[device_id]

            if device.data_format is None:
                raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)

            values, offset = NbusCommonParser.data_from_response(device.data_format, response[data_offset:])

            data_tag = f"{slave_addr}.{device_id}"
            for sample_no, value in enumerate(values, start=1):
                data[f"{data_tag}.{sample_no}"] = value

            # Advance by the consumed length reported by the parser plus the id byte.
            data_offset += offset + NBUS_SA_SIZE

        return data

    def __acquire_callback(self):
        """
        Acquisition thread body: while the acquisition flag is set, read from
        the port and append whatever arrives to the raw buffer; sleep for the
        acquire delay when nothing is available.
        """
        while self.__in_acquisition:
            data = self.__port.try_read()
            if not data:
                time.sleep(self.__acquire_delay)
                continue

            with self._lock:
                self.__data_raw.extend(data)
|