"""Example script that exercises an NBus bridge over a serial port.

The script builds a serial port from ``config``, wraps it in an
``NBusBridge``, runs the bridge "get" commands and a reset, streams for
two intervals of ``BRIDGE_STREAM_INTERVAL`` seconds, and finally writes
the full stream to ``data.csv``.
"""
import sys
import time

# NOTE: the relative order of the project imports is kept as in the original,
# because the star imports may shadow one another.
from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_bridge import NBusBridge
from nbus_hal.nbus_serial.serial_config import *

# example config
config = {
    "port_name": "COM4",
    "baud": NBusBaudrate.SPEED_921600,
    "parity": NBusParity.NONE,
    "timeout": 1.0,
    "flush_delay": 0.05,
    "request_attempts": 1,
    "enable_log": False,
}

# Second argument passed to NBusBridge (presumably a wait time in seconds
# -- confirm against the NBusBridge constructor).
BRIDGE_ACQUIRE_WAIT = 0.5
# Seconds to sleep between the streaming steps below.
BRIDGE_STREAM_INTERVAL = 5.0


def main():
    """Run the bridge demo: get commands, reset, then a streaming session.

    Any exception raised by the bridge commands is printed rather than
    propagated, and the port is flushed and closed in all cases.
    """
    # create port
    port = NBusSerialPort(NBusSerialConfig(**config))

    # create module
    bridge = NBusBridge(port, BRIDGE_ACQUIRE_WAIT)

    # run commands
    try:
        # initialize bridge
        bridge.init()

        # test bridge get
        print("<>")
        print("CMD GET INFO: ", bridge.cmd_get_info())
        print("CMD GET SLAVES: ", bridge.cmd_get_slaves())
        print("CMD GET FORMAT: ", bridge.cmd_get_format())
        print("CMD GET DATA: ", bridge.cmd_get_data())

        # test bridge set
        print("<>")
        print("CMD SET RESET: ", bridge.cmd_set_reset())

        # test streaming: start, fetch a chunk mid-way, stop, fetch the rest
        print("<>")
        print(f"START STREAM FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
        bridge.start_streaming()
        time.sleep(BRIDGE_STREAM_INTERVAL)

        print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())
        print(f"WAIT FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
        time.sleep(BRIDGE_STREAM_INTERVAL)

        print(f"STOP STREAM AFTER {2*BRIDGE_STREAM_INTERVAL} SECONDS")
        bridge.stop_streaming()
        print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())

        print("SAVE FULL STREAM:")
        # The returned object exposes ``to_csv`` (presumably a pandas
        # DataFrame -- confirm against NBusBridge.fetch_full_stream).
        df = bridge.fetch_full_stream()
        df.to_csv("data.csv", index=False)

    except Exception as e:
        print(str(e))

    finally:
        # Always attempt to close the port, even if the flush itself fails.
        try:
            port.flush()
        finally:
            port.close()


if __name__ == "__main__":
    main()