# nBus Client API

The **nBus Client API** is a Python library providing a high-level, strictly object-oriented interface for communicating with devices over the **nBus protocol**.

It focuses on:

- Clean architecture
- Strong runtime type safety (via *beartype*)
- Predictable error handling
- Maintainable abstractions for multi-sensor systems

The API exposes **three levels of control**:

1. **`NBusSensor`** – individual sensors
2. **`NBusSlaveModule`** – modules containing multiple sensors
3. **`NBusBridge`** – top-level interface for managing modules and data streams

## Serial Configuration

All communication begins by creating an `NBusSerialPort` with a configuration object:

```python
from nbus_hal.nbus_serial.serial_config import *
from nbus_hal.nbus_serial.serial_port import *

# Serial settings passed to NBusSerialConfig as keyword arguments
config = {
    "port_name": "COM4",
    "baud": NBusBaudrate.SPEED_921600,
    "parity": NBusParity.NONE,
    "timeout": 1.0,
    "flush_delay": 0.05,
    "request_attempts": 1,
    "enable_log": False
}

port = NBusSerialPort(NBusSerialConfig(**config))
```

---

## 1. Using the NBusBridge

The **Bridge** represents the top-level interface for the entire nBus network. It discovers modules, retrieves global sensor frames, and enables continuous streaming.
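One way continuous streaming could be consumed incrementally is to poll for chunks at a fixed interval. The sketch below is illustrative only: `poll_stream` and `FakeBridge` are hypothetical names that are not part of the library. It assumes that `fetch_stream_chunk()` may be called repeatedly while streaming is active, which this README does not confirm. The stand-in class lets the sketch run without hardware.

```python
import time
from typing import Any, Callable, List


def poll_stream(bridge: Any, duration: float, interval: float,
                sleep: Callable[[float], None] = time.sleep) -> List[Any]:
    """Collect stream chunks from `bridge` for roughly `duration` seconds.

    Hypothetical helper: it only relies on the three streaming methods
    used in this README (start_streaming, fetch_stream_chunk,
    stop_streaming). Elapsed time is counted in sleep intervals, not
    measured against the wall clock.
    """
    if interval <= 0:
        raise ValueError("interval must be positive")
    chunks: List[Any] = []
    bridge.start_streaming()
    try:
        elapsed = 0.0
        while elapsed < duration:
            sleep(interval)
            elapsed += interval
            chunks.append(bridge.fetch_stream_chunk())
    finally:
        # Stop streaming even if a fetch raises.
        bridge.stop_streaming()
    return chunks


class FakeBridge:
    """Hypothetical stand-in for NBusBridge; returns a running counter."""

    def __init__(self) -> None:
        self.streaming = False
        self.counter = 0

    def start_streaming(self) -> None:
        self.streaming = True

    def stop_streaming(self) -> None:
        self.streaming = False

    def fetch_stream_chunk(self) -> int:
        self.counter += 1
        return self.counter
```

With a real bridge, the call might look like `poll_stream(bridge, duration=5.0, interval=0.5)`. The `try`/`finally` mirrors the cleanup pattern used in the examples in this README, so streaming is stopped even when a fetch fails.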
### Example

```python
import time
from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_bridge import NBusBridge

BRIDGE_ACQUIRE_WAIT = 0.5
BRIDGE_STREAM_INTERVAL = 5.0

# `config` is the dictionary from the Serial Configuration section
port = NBusSerialPort(NBusSerialConfig(**config))
bridge = NBusBridge(port, BRIDGE_ACQUIRE_WAIT)

try:
    bridge.init()

    # --- Bridge GET commands ---
    print("INFO:", bridge.cmd_get_info())
    print("SLAVES:", bridge.cmd_get_slaves())
    print("FORMAT:", bridge.cmd_get_format())
    print("DATA:", bridge.cmd_get_data())

    # --- Bridge SET commands ---
    print("RESET:", bridge.cmd_set_reset())

    # --- Streaming example ---
    print(f"START STREAM FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
    bridge.start_streaming()
    time.sleep(BRIDGE_STREAM_INTERVAL)

    print("STREAM CHUNK:", bridge.fetch_stream_chunk())

    print("STOP STREAM")
    bridge.stop_streaming()

    print("SAVE FULL STREAM")
    df = bridge.fetch_full_stream()
    df.to_csv("data.csv", index=False)

finally:
    # Always release the serial port, even if a command fails
    port.flush()
    port.close()
```

---

## 2. Using the NBusSlaveModule

A **Module** manages multiple sensors.

### Example

```python
from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_module_slave import NBusSlaveModule
from nbus_types.nbus_parameter_type import NBusParameterID

# `config` is the dictionary from the Serial Configuration section
port = NBusSerialPort(NBusSerialConfig(**config))
module = NBusSlaveModule(port, 5)

try:
    module.init(False)

    # Sensors of this module, looked up by their address
    devices = module.get_devices()
    accelerometer = devices[1]
    led = devices[129]

    # --- Module GET commands ---
    print(module.cmd_get_echo(b"Hello world!"))
    print(module.cmd_get_param(NBusParameterID.PARAM_SAMPLERATE))
    print(module.cmd_get_all_params())
    print(module.cmd_get_sensor_cnt())
    print(module.cmd_get_sensor_type())
    print(module.cmd_get_info())
    print(module.cmd_get_format())
    print(module.cmd_get_data())

    # --- Module SET commands ---
    module.cmd_set_module_stop()
    module.cmd_set_module_start()
    module.cmd_set_param(NBusParameterID.PARAM_SAMPLERATE, 12345)

    params = {
        NBusParameterID.PARAM_RANGE: 12,
        NBusParameterID.PARAM_RANGE0: 4234
    }
    module.cmd_set_multi_params(params)

    module.cmd_set_calibrate()
    module.cmd_set_data({129: [1], 130: [10, -32]})

finally:
    # Always release the serial port, even if a command fails
    port.flush()
    port.close()
```

---

## 3. Using the NBusSensor

### Example

```python
# `devices` comes from module.get_devices() in the module example above
accelerometer = devices[1]

# --- Sensor GET commands ---
print(accelerometer.cmd_get_param(NBusParameterID.PARAM_SAMPLERATE))
print(accelerometer.cmd_get_all_params())
print(accelerometer.cmd_get_sensor_type())
print(accelerometer.cmd_get_format())
print(accelerometer.cmd_get_data())

led = devices[129]

# --- Sensor SET commands ---
led.cmd_set_find(True)
led.cmd_set_param(NBusParameterID.PARAM_SAMPLERATE, 23456)
led.cmd_set_multi_params({
    NBusParameterID.PARAM_RANGE: 12,
    NBusParameterID.PARAM_RANGE0: 4234
})
led.cmd_set_calibrate()
led.cmd_set_data([1])

print(led.cmd_get_data())
```

---

## API Architecture Overview

```
┌──────────────────────────┐
│        NBusBridge        │
└──────────────┬───────────┘
               │ 2..*
┌──────────────▼───────────┐
│     NBusSlaveModule      │
└──────────────┬───────────┘
               │ 1..*
┌──────────────▼───────────┐
│        NBusSensor        │
└──────────────────────────┘
```

## License

MIT License

## Author

[*This code is human-written*](BEYOND_README.md) by Matúš Nečas, 2024-2025