| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- import struct
- import time
- from typing import Optional
- from threading import Thread, Lock
- from beartype import beartype
- import pandas as pd
- from nbus_api.nbus_common_parser import NbusCommonParser
- from nbus_api.nbus_module_slave import NBusSlaveModule
- from nbus_api.nbus_sensor import NBusSensor
- from nbus_hal.crc8 import crc8
- from nbus_hal.nbus_generic_port import NBusPort
- from nbus_types.nbus_address_type import NBusModuleAddress, NBusSensorAddress
- from nbus_types.nbus_command_type import NBusCommand
- from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
- from nbus_types.nbus_defines import *
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_info_type import NBusBridgeInfo
- from nbus_types.nbus_sensor_count_type import NBusSensorCount
- from nbus_types.nbus_slave_meta_type import NBusSlaveMeta
- from nbus_types.nbus_status_type import NBusStatusType
- @beartype
- class NBusBridge:
- def __init__(self, serial_port: NBusPort, acquire_delay: float):
- """
- Constructor.
- :param serial_port: serial port
- :param acquire_delay: intermediate delay between data fetching when data not ready
- :param flush_delay: delay between stream flush
- """
- self.__port = serial_port # serial port reference
- self.__slaves_meta = {} # list of slaves meta information
- self.__acquire_thread = None # thread for data acquisition
- self.__data_raw = bytearray() # raw data buffer
- self._lock = Lock() # thread lock
- self.__in_acquisition = False # flag when in acquisition
- self.__acquire_delay = acquire_delay # intermediate delay between data fetching when data not ready
- self.__df = pd.DataFrame() # internal data frame
- """
- ================================================================================================================
- Module General Methods
- ================================================================================================================
- """
- def init(self) -> None:
- """
- Initialize the bridge module from network.
- """
- try:
- self.cmd_get_slaves()
- self.cmd_get_format()
- except Exception:
- self.panic()
- def panic(self) -> None:
- """
- Reset communication when fatal error occurs.
- """
- self.stop_streaming()
- self.cmd_set_reset()
- self.__data_raw = bytearray()
- self.__df = pd.DataFrame()
- self.__slaves_meta = {}
- self.cmd_get_slaves()
- self.cmd_get_format()
- def get_slaves(self) -> dict[NBusModuleAddress, NBusSlaveModule]:
- """
- Get connected slave modules.
- :return: dictionary of connected slaves
- """
- slaves = {}
- for key, value in self.__slaves_meta.items():
- slaves[key] = value.module_obj
- return slaves
- def start_streaming(self) -> None:
- """
- Start data streaming (e.g. bridge-cast).
- """
- self.__port.send_bridge(NBusCommand.CMD_SET_START, bytearray())
- self.__acquire_thread = Thread(target=self.__acquire_callback)
- self.__in_acquisition = True
- # end thread if running
- if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
- self.__acquire_thread.join()
- self.__acquire_thread.start()
- def stop_streaming(self):
- """
- Stop data streaming (e.g. bridge-cast).
- """
- self.__port.send_bridge(NBusCommand.CMD_SET_STOP, bytearray())
- self.__in_acquisition = False
- if self.__acquire_thread is not None and self.__acquire_thread.is_alive():
- self.__acquire_thread.join()
- self.__port.flush()
- def fetch_stream_chunk(self) -> pd.DataFrame:
- """
- Fetch data from stream (e.g. bridge-cast).
- Can be called anytime.
- It not erase internal dataframe.
- :return: stream data frame
- """
- with self._lock:
- packets = self.__data_raw.split(NBUS_BRIDGE_DATA_HDR)
- packet_cnt = len(packets) - self.__in_acquisition
- parsed_packets = []
- # parse packets
- for i in range(packet_cnt):
- data = self._parse_bridge_data_from_packet(packets[i], True)
- if data is not None:
- parsed_packets.append(data)
- # extend internal dataframe
- if parsed_packets:
- data_frame = pd.DataFrame(parsed_packets)
- self.__df = pd.concat([self.__df, data_frame], ignore_index=True)
- else:
- data_frame = pd.DataFrame()
- # erase raw data buffer
- if self.__in_acquisition:
- unparsed_bytes = len(packets[-1]) + NBUS_BRIDGE_DATA_HDR_SIZE
- self.__data_raw = self.__data_raw[-unparsed_bytes:]
- else:
- self.__data_raw = bytearray()
- return data_frame
- def fetch_full_stream(self) -> pd.DataFrame:
- """
- Fetch all data from stream (e.g. bridge-cast).
- Must be called after stop_streaming() method.
- It will erase internal dataframe.
- :return: stream dataframe
- """
- if self.__in_acquisition:
- return pd.DataFrame()
- df = self.__df
- self.__df = pd.DataFrame()
- return df
- """
- ================================================================================================================
- Bridge Get Commands
- ================================================================================================================
- """
- def cmd_get_data(self) -> pd.DataFrame:
- """
- Get data from all slave devices.
- :return: dataframe of the network data
- """
- response = self.__port.request_bridge(NBusCommand.CMD_GET_DATA, bytearray())
- return pd.DataFrame([self._parse_bridge_data_from_packet(response[1:], False)])
- def cmd_get_info(self) -> NBusBridgeInfo:
- """
- Get data from all slave devices.
- :return: bridge info structure
- """
- response = self.__port.request_bridge(NBusCommand.CMD_GET_INFO, bytearray())
- fw = str(response[1:4], "ascii")
- hw_family = str(response[4:7], "ascii")
- hw = str(response[7:10], "ascii")
- return NBusBridgeInfo(fw=fw, hw_family=hw_family, hw=hw)
- def cmd_get_format(self) -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
- """
- Get format of all connected devices.
- :return: dict of all device formats
- """
- if not self.__slaves_meta:
- raise NBusErrorAPI(NBusErrorAPIType.SLAVES_NOT_LOADED)
- resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_FORMAT, bytearray())
- data_offset = 0
- fmt = {}
- while data_offset < resp_length:
- slave_addr = response[data_offset]
- data_offset += NBUS_MA_SIZE
- slave_sensor_cnt = self.__slaves_meta[slave_addr].device_count
- format_len = NBUS_FMT_SIZE * (slave_sensor_cnt.read_only_count + slave_sensor_cnt.read_write_count)
- fmt |= self._set_slave_format_from_response(slave_addr, format_len,
- response[data_offset: data_offset + format_len])
- data_offset += format_len
- self._update_slaves_packet_size()
- return fmt
- def cmd_get_slaves(self) -> dict[NBusModuleAddress, NBusSensorCount]:
- """
- Get base information about all connected slaves and update internal fields.
- :return: dict of device count of every connected module
- """
- resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_GET_SLAVES, bytearray())
- slaves = {}
- data_offset = 0
- while data_offset < resp_length - 2:
- slave_addr = response[data_offset]
- slave_sensor_cnt = NBusSensorCount(response[data_offset + 1] , response[data_offset + 2])
- slaves[slave_addr] = slave_sensor_cnt
- # update internal memory
- if slave_addr not in self.__slaves_meta:
- self.__slaves_meta[slave_addr] = NBusSlaveMeta(
- NBusSlaveModule(self.__port, slave_addr), slave_sensor_cnt, 0)
- data_offset += 3
- return slaves
- """
- ================================================================================================================
- Bridge Get Commands
- ================================================================================================================
- """
- def cmd_set_reset(self) -> NBusStatusType:
- """
- Send reset command to bridge module.
- :return: status of success
- """
- resp_length, *response = self.__port.request_bridge(NBusCommand.CMD_SET_RESET, bytearray())
- return NBusStatusType(response[0])
- """
- ================================================================================================================
- Internal Methods
- ================================================================================================================
- """
- def __acquire_callback(self) -> None:
- """
- Thread worker for data acquisition.
- """
- while self.__in_acquisition:
- data = self.__port.try_read()
- if not data:
- time.sleep(self.__acquire_delay)
- continue
- with self._lock:
- self.__data_raw.extend(data)
- def _parse_bridge_data_from_packet(self, data_packet: bytearray, check_packet: bool) \
- -> Optional[dict[str, NBusDataValue]]:
- """
- Parse the bridge data packet.
- :param data_packet: packet to parse
- :param check_packet: if crc check is required (only in bridge-cast)
- :return: dictionary of data values or None
- """
- packet_size = len(data_packet)
- packet_crc = crc8(data_packet[:NBUS_CRC_ADDR])
- # check validity
- if check_packet and (packet_size < NBUS_TS_SIZE + NBUS_CRC_SIZE or data_packet[NBUS_CRC_ADDR] != packet_crc):
- return None
- # parse data
- try:
- data_offset = 0
- ts = struct.unpack("<I", data_packet[data_offset : data_offset + NBUS_TS_SIZE])[0]
- data = {"TS": ts}
- data_offset += NBUS_TS_SIZE
- while data_offset < packet_size - NBUS_CRC_SIZE:
- module_addr = data_packet[data_offset]
- data_offset += NBUS_MA_SIZE
- slave_size = self.__slaves_meta[module_addr].packet_size
- packet = data_packet[data_offset : data_offset + slave_size]
- data |= self._parse_slave_data_from_response(module_addr, slave_size, packet)
- data_offset += slave_size
- return data
- except Exception:
- return None
- def _update_slaves_packet_size(self) -> None:
- """
- Update slaves packet size from loaded format.
- """
- for slave_meta in self.__slaves_meta.values():
- packet_size = 0
- for device in slave_meta.module_obj.get_devices().values():
- fmt = device.data_format
- packet_size += fmt.byte_length * fmt.samples + NBUS_SA_SIZE
- slave_meta.packet_size = packet_size
- def _set_slave_format_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: list[int]) \
- -> dict[tuple[NBusModuleAddress, NBusSensorAddress], NBusDataFormat]:
- """
- Parse and set format for all slaves from response.
- :param slave_addr: address of slave module
- :param resp_length: length of response
- :param response: raw data
- :return: parsed formats
- """
- data_offset = 0
- formats = {}
- devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
- # parse format
- while data_offset < resp_length:
- device_id = response[data_offset]
- device_format = NbusCommonParser.format_from_response(response[data_offset : data_offset + NBUS_FMT_SIZE])
- if device_id not in devices:
- devices[device_id] = NBusSensor(self.__port, slave_addr, device_id)
- devices[device_id].data_format = device_format
- formats[(NBusModuleAddress(slave_addr), NBusSensorAddress(device_id))] = device_format
- data_offset += NBUS_FMT_SIZE
- return formats
- def _parse_slave_data_from_response(self, slave_addr: NBusModuleAddress, resp_length: int, response: bytearray) \
- -> dict[str, NBusDataValue]:
- """
- Parse data of single slave from response.
- :param slave_addr: address of selected slave
- :param resp_length: length of response
- :param response: raw data
- :return: dict of data values
- """
- data_offset = 0
- data = {}
- devices = self.__slaves_meta[slave_addr].module_obj.get_devices()
- while data_offset < resp_length:
- device_id = response[data_offset]
- if devices[device_id].data_format is None:
- raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
- values, offset = NbusCommonParser.data_from_response(devices[device_id].data_format, response[data_offset:])
- data_tag = str(slave_addr) + "." + str(device_id)
- for i in range(len(values)):
- data[data_tag + "." + str(i + 1)] = values[i]
- data_offset += offset + NBUS_SA_SIZE
- return data
|