# example_bridge.py - example script exercising NBusBridge over a serial port.

import sys
import time

from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_bridge import NBusBridge
from nbus_hal.nbus_serial.serial_config import *
  5. # example config
  6. config = {
  7. "port_name": "COM4",
  8. "baud": NBusBaudrate.SPEED_921600,
  9. "parity": NBusParity.NONE,
  10. "timeout": 1.0,
  11. "flush_delay": 0.05,
  12. "request_attempts": 1,
  13. "enable_log": False
  14. }
  15. BRIDGE_ACQUIRE_WAIT = 0.5
  16. BRIDGE_STREAM_INTERVAL = 5.0
  17. if __name__ == "__main__":
  18. # create port
  19. port = NBusSerialPort(NBusSerialConfig(**config))
  20. # create module
  21. bridge = NBusBridge(port, BRIDGE_ACQUIRE_WAIT)
  22. # run commands
  23. try:
  24. # initialize bridge
  25. bridge.init()
  26. # test bridge get
  27. print("<<TEST BRIDGE GET>>")
  28. print("CMD GET INFO: ", bridge.cmd_get_info())
  29. print("CMD GET SLAVES: ", bridge.cmd_get_slaves())
  30. print("CMD GET FORMAT: ", bridge.cmd_get_format())
  31. print("CMD GET DATA: ",bridge.cmd_get_data())
  32. # test bridge set
  33. print("<<TEST BRIDGE SET>>")
  34. print("CMD SET RESET: ",bridge.cmd_set_reset())
  35. print("<<TEST BRIDGE STREAMING>>")
  36. print(f"START STREAM FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
  37. bridge.start_streaming()
  38. time.sleep(BRIDGE_STREAM_INTERVAL)
  39. print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())
  40. print(f"WAIT FOR {BRIDGE_STREAM_INTERVAL} SECONDS")
  41. time.sleep(BRIDGE_STREAM_INTERVAL)
  42. print(f"STOP STREAM AFTER {2*BRIDGE_STREAM_INTERVAL} SECONDS")
  43. bridge.stop_streaming()
  44. print("FETCH STREAM CHUNK:", bridge.fetch_stream_chunk())
  45. print("SAVE FULL STREAM:")
  46. df = bridge.fetch_full_stream()
  47. df.to_csv("data.csv", index=False)
  48. except Exception as e:
  49. print(str(e))
  50. finally:
  51. port.flush()
  52. port.close()