| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- import sys
- from nbus_hal.nbus_serial.serial_port import *
- from nbus_api.nbus_bridge import NBusBridge
- from nbus_hal.nbus_serial.serial_config import *
# Example serial-port settings. The dict is unpacked as keyword arguments
# into NBusSerialConfig when the script runs, so the keys must match that
# constructor's parameter names. Adjust "port_name" for the local machine.
config = {
    "port_name": "COM4",
    "baud": NBusBaudrate.SPEED_921600,
    "parity": NBusParity.NONE,
    "timeout": 1.0,
    "flush_delay": 0.05,
    "request_attempts": 1,
    "enable_log": False,
}

# Second constructor argument handed to NBusBridge.
# NOTE(review): presumably an acquisition wait time in seconds -- confirm
# against NBusBridge.
BRIDGE_ACQUIRE_WAIT = 0.5

# Seconds to sleep between the streaming steps of the test (used with
# time.sleep and echoed in the progress messages).
BRIDGE_STREAM_INTERVAL = 5.0
def main() -> None:
    """Run the NBus bridge smoke test over the configured serial port.

    Builds the serial port from ``config``, wraps it in an ``NBusBridge``,
    then exercises the bridge "get" commands, the reset command and a
    streaming session. The full stream returned by the bridge is written to
    ``data.csv`` through its ``to_csv`` method.

    Any exception raised while talking to the bridge is reported on stderr
    (with its type name) and the script exits with status 1. The port is
    flushed and closed in every case.

    Raises:
        SystemExit: with code 1 when a bridge command fails.
    """
    # Imported explicitly: the script previously only worked if `time`
    # happened to leak in through one of the star imports.
    import time

    # create port
    port = NBusSerialPort(NBusSerialConfig(**config))
    # create module
    bridge = NBusBridge(port, BRIDGE_ACQUIRE_WAIT)

    # run commands
    try:
        # initialize bridge
        bridge.init()

        # test bridge get
        print("<<TEST BRIDGE GET>>")
        print("CMD GET INFO: ", bridge.cmd_get_info())
        print("CMD GET SLAVES: ", bridge.cmd_get_slaves())
        print("CMD GET FORMAT: ", bridge.cmd_get_format())
        print("CMD GET DATA: ", bridge.cmd_get_data())

        # test bridge set
        print("<<TEST BRIDGE SET>>")
        print("CMD SET RESET: ", bridge.cmd_set_reset())

        # test bridge streaming: fetch one chunk mid-stream, one after stop
        print("<<TEST BRIDGE STREAMING>>")
        print(f"START STREAM FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
        bridge.start_streaming()
        time.sleep(BRIDGE_STREAM_INTERVAL)
        print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())
        print(f"WAIT FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
        time.sleep(BRIDGE_STREAM_INTERVAL)
        print(f"STOP STREAM AFTER {2*BRIDGE_STREAM_INTERVAL} SECONDS")
        bridge.stop_streaming()
        print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())

        print("SAVE FULL STREAM:")
        df = bridge.fetch_full_stream()
        df.to_csv("data.csv", index=False)
    except Exception as e:
        # Top-level boundary: include the exception type (a bare print(e)
        # can be empty or ambiguous) and signal failure to the caller.
        print(f"{type(e).__name__}: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        # Close the port even if the flush itself fails, so the serial
        # device is never left open.
        try:
            port.flush()
        finally:
            port.close()


if __name__ == "__main__":
    main()
|