"""Manual smoke test for an nBus bridge reached over a serial port.

Running this file as a script opens the serial port described by ``config``,
issues a handful of bridge commands, records streamed data for a short while
and writes everything that was collected to ``data.csv``.
"""

import time
import sys

# NOTE(review): the wildcard imports are kept as-is. NBusSerialPort,
# NBusSerialConfig, NBusBaudrate, NBusParity and NBusCommand are not imported
# by name anywhere in this file, so they are presumably provided by these.
from nbus_hal.nbus_serial.serial_port import *
from nbus_api.nbus_module_slave import NBusSlaveModule
from nbus_api.nbus_bridge import NBusBridge
from nbus_hal.nbus_serial.serial_config import *
from nbus_api.nbus_sensor import NBusSensor
from nbus_types.nbus_parameter_type import NBusParameterID

import pandas as pd

# Example configuration; unpacked as keyword arguments into NBusSerialConfig.
config = {
    "port_name": "COM4",
    "baud": NBusBaudrate.SPEED_921600,
    "parity": NBusParity.NONE,
    "timeout": 1.0,
    "request_attempts": 1,
    "enable_log": True,
}


def _run_module_diagnostics(port, module):
    """Exercise a single slave module and dump raw bytes from the port.

    This sequence used to sit after an unconditional ``sys.exit()`` and relied
    on a ``module1`` variable whose assignment was commented out, so it could
    never run. It is preserved here as an opt-in helper; nothing calls it by
    default. The commented-out experiment built the module with
    ``NBusSlaveModule(port, 9)``.

    Args:
        port: The object created by ``NBusSerialPort``; must offer
            ``request_broadcast`` and ``get_port``.
        module: The slave module to query (the former ``module1``).
    """
    module.scan()
    print(module.cmd_get_echo(bytearray("ahoj", "ascii")))
    print(module.cmd_get_format())
    print(module.cmd_get_sensor_type())
    print(module.cmd_get_data())

    port.request_broadcast(NBusCommand.CMD_SET_START, bytearray([]))
    for _ in range(200):
        chunk = port.get_port().read()
        # NOTE(review): presumably a pyserial port, whose read() returns b""
        # on timeout; skip empty reads instead of raising IndexError.
        if chunk:
            print(hex(chunk[0]))
    port.request_broadcast(NBusCommand.CMD_SET_STOP, bytearray([]))

    # Keep clearing the input buffer until no bytes are reported as pending.
    while port.get_port().in_waiting:
        time.sleep(0.05)
        port.get_port().reset_input_buffer()
        print("flushed baby")

    print(module.cmd_get_echo(bytearray("ahoj", "ascii")))


def main() -> int:
    """Run the bridge smoke test and save the collected stream to ``data.csv``.

    The steps are: open the port from ``config``, create the bridge and call
    ``init_from_network()``, print the results of the data/reset/slaves/format
    commands, stream for two 5-second intervals (printing a chunk after the
    first), stop streaming, print two more chunks and finally write the result
    of ``fetch_all_from_stream()`` to ``data.csv``.

    To test one slave module instead, call
    ``_run_module_diagnostics(port, NBusSlaveModule(port, 9))``.

    Returns:
        0, used as the process exit status.
    """
    port = NBusSerialPort(NBusSerialConfig(**config))

    bridge = NBusBridge(port, 0.05, 0.05)
    bridge.init_from_network()

    print(bridge.cmd_get_data())
    print(bridge.cmd_set_reset())
    print(bridge.cmd_get_slaves())
    print(bridge.cmd_get_format())

    bridge.start_data_streaming()
    try:
        time.sleep(5)
        print(bridge.fetch_chunk_from_stream())
        time.sleep(5)
    finally:
        # Always pair start with stop, even if the wait is interrupted
        # (e.g. Ctrl-C) or fetching a chunk raises.
        bridge.stop_data_streaming()

    # Chunks are still fetched after streaming has been stopped, as before.
    print(bridge.fetch_chunk_from_stream())
    print(bridge.fetch_chunk_from_stream())

    # NOTE(review): the result is assumed to be a pandas DataFrame because
    # to_csv(..., index=False) is called on it — confirm against NBusBridge.
    df = bridge.fetch_all_from_stream()
    df.to_csv("data.csv", index=False)
    return 0


if __name__ == "__main__":
    sys.exit(main())