|
|
8 時間 前 | |
|---|---|---|
| .idea | 1 週間 前 | |
| examples | 8 時間 前 | |
| nbus_api | 8 時間 前 | |
| nbus_hal | 1 日 前 | |
| nbus_types | 1 日 前 | |
| .gitignore | 1 日 前 | |
| BEYOND_README.md | 8 時間 前 | |
| LICENSE | 8 時間 前 | |
| README.md | 8 時間 前 |
The nBus Client API is a Python library providing a high-level, strictly object-oriented interface for communicating with devices over the nBus protocol.
It focuses on:
The API exposes three levels of control:
NBusSensor – individual sensorsNBusSlaveModule – modules containing multiple sensorsNBusBridge – top-level interface for managing modules and data streamsAll communication begins by creating an NBusSerialPort with a configuration object:
from nbus_hal.nbus_serial.serial_config import *
config = {
"port_name": "COM4",
"baud": NBusBaudrate.SPEED_921600,
"parity": NBusParity.NONE,
"timeout": 1.0,
"flush_delay": 0.05,
"request_attempts": 1,
"enable_log": False
}
port = NBusSerialPort(NBusSerialConfig(**config))
The Bridge represents the top-level interface for the entire nBus network.
It discovers modules, retrieves global sensor frames, and enables continuous streaming.
import time
from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_bridge import NBusBridge
BRIDGE_ACQUIRE_WAIT = 0.5
BRIDGE_STREAM_INTERVAL = 5.0
port = NBusSerialPort(NBusSerialConfig(**config))
bridge = NBusBridge(port, BRIDGE_ACQUIRE_WAIT)
try:
bridge.init()
# --- Bridge GET commands ---
print("INFO:", bridge.cmd_get_info())
print("SLAVES:", bridge.cmd_get_slaves())
print("FORMAT:", bridge.cmd_get_format())
print("DATA:", bridge.cmd_get_data())
# --- Bridge SET commands ---
print("RESET:", bridge.cmd_set_reset())
# --- Streaming example ---
print(f"START STREAM FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
bridge.start_streaming()
time.sleep(BRIDGE_STREAM_INTERVAL)
print("STREAM CHUNK:", bridge.fetch_stream_chunk())
print(f"STOP STREAM")
bridge.stop_streaming()
print("SAVE FULL STREAM")
df = bridge.fetch_full_stream()
df.to_csv("data.csv", index=False)
finally:
port.flush()
port.close()
A Module manages multiple sensors.
from nbus_api.nbus_module_slave import NBusSlaveModule
from nbus_types.nbus_parameter_type import NBusParameterID
port = NBusSerialPort(NBusSerialConfig(**config))
module = NBusSlaveModule(port, 5)
try:
module.init(False)
devices = module.get_devices()
accelerometer = devices[1]
led = devices[129]
print(module.cmd_get_echo(b"Hello world!"))
print(module.cmd_get_param(NBusParameterID.PARAM_SAMPLERATE))
print(module.cmd_get_all_params())
print(module.cmd_get_sensor_cnt())
print(module.cmd_get_sensor_type())
print(module.cmd_get_info())
print(module.cmd_get_format())
print(module.cmd_get_data())
module.cmd_set_module_stop()
module.cmd_set_module_start()
module.cmd_set_param(NBusParameterID.PARAM_SAMPLERATE, 12345)
params = {
NBusParameterID.PARAM_RANGE: 12,
NBusParameterID.PARAM_RANGE0: 4234
}
module.cmd_set_multi_params(params)
module.cmd_set_calibrate()
module.cmd_set_data({129: [1], 130: [10, -32]})
finally:
port.flush()
port.close()
accelerometer = devices[1]
print(accelerometer.cmd_get_param(NBusParameterID.PARAM_SAMPLERATE))
print(accelerometer.cmd_get_all_params())
print(accelerometer.cmd_get_sensor_type())
print(accelerometer.cmd_get_format())
print(accelerometer.cmd_get_data())
led = devices[129]
led.cmd_set_find(True)
led.cmd_set_param(NBusParameterID.PARAM_SAMPLERATE, 23456)
led.cmd_set_multi_params({
NBusParameterID.PARAM_RANGE: 12,
NBusParameterID.PARAM_RANGE0: 4234
})
led.cmd_set_calibrate()
led.cmd_set_data([1])
print(led.cmd_get_data())
┌──────────────────────────┐
│ NBusBridge │
└──────────────┬───────────┘
│ 2..*
┌──────────────▼───────────┐
│ NBusSlaveModule │
└──────────────┬───────────┘
│ 1..*
┌──────────────▼───────────┐
│ NBusSensor │
└──────────────────────────┘
MIT License
This code is human-written by Matúš Nečas, 2024-2025