浏览代码

py skripty

Juraj Ďuďák 1 年之前
父节点
当前提交
5e9d8ce8d6
共有 6 个文件被更改,包括 287 次插入228 次删除
  1. 1 1
      Modules/icm20948
  2. 6 0
      test/.idea/other.xml
  3. 50 227
      test/app.py
  4. 64 0
      test/imu_slave.py
  5. 136 0
      test/nbus_slave.py
  6. 30 0
      test/nbus_system.py

+ 1 - 1
Modules/icm20948

@@ -1 +1 @@
-Subproject commit 9fa1b82af75c7e11acfae01da92b422799f131a4
+Subproject commit acb82f70b604b3e4adbcf0015d02f7536cb21b9c

+ 6 - 0
test/.idea/other.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="PySciProjectComponent">
+    <option name="PY_INTERACTIVE_PLOTS_SUGGESTED" value="true" />
+  </component>
+</project>

+ 50 - 227
test/app.py

@@ -1,277 +1,100 @@
-from comm import *
 import sys
-import time
-from struct import *
+from nbus_slave import *
+from nbus_system import *
+import matplotlib.pyplot as plt
+from imu_slave import *
+import numpy as np
 
-
-class NbusSystem:
-    slaves = []
-
-    def __init__(self, debug = False):
-        self.serial_port = SerialComm('/dev/ttyUSB0', debug)
-
-    def create_slave(self, slave):
-        slave = NbusSlave(slave, self.serial_port)
-        return slave
-
-    def finish(self):
-        self.serial_port.close()
-
-
-class NbusSlave:
-
-    def __init__(self, module, comm_port):
-        self.serial_port = comm_port
-        self.module = module
-
-    # def cmd_version(self):
-    #     resp = self.serial_port.request(self.module, 0, CMD_VERSION,[])
-    #     version = chr(resp[3])+chr(resp[4])+chr(resp[5])
-    #     print("Version: "+version)
-
-    def cmd_echo(self, msg):
-        resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg)
-        echo = ""
-        if len(resp) == 0:
-            print("No ECHO (0-size resp)")
-            return 0
-        # print(resp)
-        for r in range(len(msg)):
-            echo = echo + chr(resp[3 + r])
-        # print("Echo:" + echo)
-        return len(resp)
-
-    def cmd_set_param(self, sensor, param, value):
-        print("SET param:", PARAM_NAME[param], "[", param, "]=>", value)
-        int_to_four_bytes = struct.Struct('<I').pack
-        y1, y2, y3, y4 = int_to_four_bytes(value & 0xFFFFFFFF)
-        resp = self.serial_port.request(self.module, sensor, (SET + CMD_PARAM), [param, y1, y2, y3, y4])
-
-    def cmd_get_param_module(self, param):
-        print("GET module param:", PARAM_NAME[param], "[", param, "]")
-        resp = self.serial_port.request(self.module, 0, CMD_PARAM, [param])
-
-    def cmd_get_param(self, sensor, param):
-        # print("GET param:", PARAM_NAME[param], "[", param, "]")
-        resp = self.serial_port.request(self.module, sensor, CMD_PARAM, [param])
-        val = None
-
-        if len(resp) > 2:
-            val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256
-        print(val)
-        value = -1
-        if param == PARAM_RANGE:
-            value = (val+1)**2    # +/- 2, 4, 8, 16g
-        if param == PARAM_SAMPLERATE:
-            if val == 1:
-                value = 562.5
-            if val == 3:
-                value = 281.3
-            if val == 5:
-                value = 187.5
-            if val == 7:
-                value = 140.6
-            if val == 10:
-                value = 102.3
-            if val == 15:
-                value = 70.3
-            if val == 22:
-                value = 48.9
-            if val == 31:
-                value = 32.2
-            if val == 63:
-                value = 17.6
-            if val == 127:
-                value = 8.8
-            if val == 255:
-                value = 4.4
-            if val == 513:
-                value = 2.2
-            if val == 1022:
-                value = 1.1
-            if val == 2044:
-                value = 0.55
-            if val == 4095:
-                value = 0.27
-
-        if param == PARAM_FILTER:
-            value = (val-1)
-        return value
-
-    def cmd_get_params(self, sensor):
-        print("GET params:")
-        resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), [])
-        return resp
-
-    def cmd_sensor_cnt(self):
-        print("SENSOR CNT")
-        resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, [])
-        return resp[3]
-
-    def cmd_sensor_type(self, index):
-        resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, [])
-        return resp[3]
-
-    def cmd_sensors_type(self, pocet):
-        resp = self.serial_port.request(self.module, 0, CMD_SENSOR_TYPE, [])
-        typy = [];
-        for i in range(pocet):
-            typy.append([resp[3 + 2 * i], resp[4 + 2 * i]])
-        return typy
-
-    def cmd_sensor_get_data(self, index):
-        resp = self.serial_port.request(self.module, index, CMD_DATA, [])
-        if len(resp) > 1:
-            h = resp[5] * 256 + resp[4]
-            # print(hex(h))
-            return h / 4096 * 3.3
-        return 0
-
-    def cmd_sensor_get_data_IMU(self, sensor_index):
-        '''
-        sensor_index:
-        1 - ACC
-        2 - GYRO
-        '''
-        resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, [])
-        # print(resp)
-        x = 0
-        y = 0
-        z = 0
-        if len(resp) > 6:
-            x = resp[5] * 256 + resp[4]
-            y = resp[7] * 256 + resp[6]
-            z = resp[9] * 256 + resp[8]
-            if x > 32768:
-                x = x - 65535
-            if y > 32768:
-                y = y - 65535
-            if z > 32768:
-                z = z - 65535
-        return [x, y, z]
-
-    def cmd_sensor_get_data_FSR(self, cnt):
-        resp = self.serial_port.request(self.module, 0, CMD_DATA, [])
-        print(resp)
-        if len(resp) > 1:
-            for i in range(cnt):
-                sen = resp[3 + 3 * i]
-                h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i]
-                print("FSR", sen, ":\t", h / 4096.0 * 3.3)
-
-    def cmd_module_info(self, param):
-        resp = self.serial_port.request(self.module, 0, CMD_INFO, [param])
-        if param == 0xE3:
-            data = 0
-            for i in range(4):
-                data = data + resp[i + 3] * pow(256, i)
-            return hex(data)
-        if param == 0xE6:
-            data = 0
-            for i in range(8):
-                data = data + resp[i + 3] * pow(256, 7 - i)
-            return hex(data)
-        l = len(resp)
-        data = ""
-        for i in range(l - 4):
-            data = data + chr(resp[i + 3])
-        return data
-
-    def cmd_module_start(self):
-        print("MODULE START")
-        self.serial_port.requestBroadcast(CMD_START, [])
-
-    def cmd_module_stop(self):
-        print("MODULE STOP")
-        self.serial_port.requestBroadcast(CMD_STOP, [])
-
-    def cmd_reset(self):
-        print("MODULE RESET")
-        resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3)
-
-    def cmd_store(self, sensor):
-        print("MODULE STORE PARAM")
-        resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1)
+def show_data(data):
+    fig, ax = plt.subplots()
+    ax.plot(data_acc)
+    ax.grid()
+    plt.show()
 
 
 if __name__ == "__main__":
 
     nbus = NbusSystem(False)
-    slave1 = nbus.create_slave(5)
-    slave1.cmd_module_stop()
-    if slave1.cmd_echo([97, 98, 99, 100]) == 0:
+    s1_adr = 5
+    slave1 = nbus.create_slave(ImuSlave, s1_adr)
+
+    if nbus.get_slave(s1_adr).cmd_echo([97, 98, 99, 100]) == 0:
         sys.exit()
 
     # app.cmd_reset()
     store_param = False
     if store_param:
-        slave1.cmd_set_param(1, PARAM_SAMPLERATE, 1)
-        slave1.cmd_set_param(1, PARAM_RANGE, 1)
-        slave1.cmd_set_param(1, PARAM_FILTER, 2)
+        nbus.get_slave(s1_adr).cmd_set_param(1, PARAM_SAMPLERATE, 1)
+        nbus.get_slave(s1_adr).cmd_set_param(1, PARAM_RANGE, 0)
+        nbus.get_slave(s1_adr).cmd_set_param(1, PARAM_FILTER, 2)
 
-        slave1.cmd_store(1)
+        nbus.get_slave(s1_adr).cmd_store(1)
 
         sys.exit()
 
     # !!!! slave1.cmd_set_param(1, PARAM_SAMPLERATE, 10)
     # !!!! slave1.cmd_set_param(2, PARAM_SAMPLERATE, 10)
     for s in range(1):
-        sr = slave1.cmd_get_param(s + 1, PARAM_SAMPLERATE)
-        r = slave1.cmd_get_param(s + 1, PARAM_RANGE)
-        lpf = slave1.cmd_get_param(s + 1, PARAM_FILTER)
-        ee = slave1.cmd_get_param(s + 1, PARAM_GAIN)
+        sr = nbus.get_slave(s1_adr).cmd_get_param(s + 1, PARAM_SAMPLERATE)
+        r = nbus.get_slave(s1_adr).cmd_get_param(s + 1, PARAM_RANGE)
+        lpf = nbus.get_slave(s1_adr).cmd_get_param(s + 1, PARAM_FILTER)
+        ee = nbus.get_slave(s1_adr).cmd_get_param(s + 1, PARAM_GAIN)
         print(sr, r, lpf, ee)
 
-    sr = slave1.cmd_get_param(1, PARAM_SAMPLERATE)
+    sr = nbus.get_slave(s1_adr).cmd_get_param(1, PARAM_SAMPLERATE)
 
-    # time.sleep(0.5)
-    # app.cmd_module_stop()
+    nbus.get_slave(s1_adr).cmd_set_param(1, PARAM_RANGE, 1)
+    print(nbus.get_slave(s1_adr).slave_params)
 
-    slave1.cmd_module_start()
+    nbus.cmd_start()
     time.sleep(0.1)
 
-    # app.cmd_module_stop()
-    sys.exit()
-    pocet = slave1.cmd_sensor_cnt()
+    # nbus.cmd_stop()
+    # sys.exit()
+    pocet = nbus.get_slave(s1_adr).cmd_sensor_cnt()
     # sys.exit()
     print("pocet senzorov=", pocet)
-    # app.cmd_sensor_get_data_FSR(pocet)
-    for i in range(50):
-        acc = slave1.cmd_sensor_get_data_IMU(1)
-        # gyr = slave1.cmd_sensor_get_data_IMU(2)
-        print(acc)
+
+    data_acc = []
+    for i in range(164):
+        acc = nbus.get_slave(s1_adr).cmd_sensor_get_data_IMU(1)
+        # gyr = nbus.get_slave(s1_adr).cmd_sensor_get_data_IMU(2)
+        data_acc.append(acc)
         # print(gyr)
-    slave1.cmd_module_stop()
+        print(".", end=" ")
+    nbus.cmd_stop()
+    show_data(data_acc)
+
     # sys.exit()
 
     for i in range(pocet):
-        d = slave1.cmd_sensor_get_data(i + 1)
+        d = nbus.get_slave(s1_adr).cmd_sensor_get_data(i + 1)
         print("Data", i + 1, ":\t", d)
 
     print("Sensor type")
     for i in range(pocet):
-        print(slave1.cmd_sensor_type(i + 1))
+        print(nbus.get_slave(s1_adr).cmd_sensor_type(i + 1))
     print("Sensors type")
-    print(slave1.cmd_sensors_type(pocet))
+    print(nbus.get_slave(s1_adr).cmd_sensors_type(pocet))
 
-    par = slave1.cmd_get_params(1)
+    par = nbus.get_slave(s1_adr).cmd_get_params(1)
     print(par)
 
-    r = slave1.cmd_module_info(0xE1)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE1)
     print('NAME:', r)
-    r = slave1.cmd_module_info(0xE2)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE2)
     print('TYPE', r)
 
-    r = slave1.cmd_module_info(0xE4)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE4)
     print('FW', r)
 
-    r = slave1.cmd_module_info(0xE5)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE5)
     print('HW', r)
 
-    r = slave1.cmd_module_info(0xE3)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE3)
     print("UUID", r)
 
-    r = slave1.cmd_module_info(0xE6)
+    r = nbus.get_slave(s1_adr).cmd_module_info(0xE6)
     print("MEM ID", r)
-    sr = slave1.cmd_get_param(0, PARAM_SAMPLERATE)
+
     nbus.finish()

+ 64 - 0
test/imu_slave.py

@@ -0,0 +1,64 @@
+from nbus_slave import *
+
+
+class ImuSlave(NbusSlave):
+
+    def __init__(self, module, comm_port):
+        NbusSlave.__init__(self, module, comm_port)
+
+    def evaluate_param_value(self, sensor, param, val):
+        value = -1
+        if param == PARAM_RANGE:
+            value = 2 ** (val + 1)  # +/- 2, 4, 8, 16g
+        if param == PARAM_SAMPLERATE:
+            if val == 1:
+                value = 562.5
+            if val == 3:
+                value = 281.3
+            if val == 5:
+                value = 187.5
+            if val == 7:
+                value = 140.6
+            if val == 10:
+                value = 102.3
+            if val == 15:
+                value = 70.3
+            if val == 22:
+                value = 48.9
+            if val == 31:
+                value = 32.2
+            if val == 63:
+                value = 17.6
+            if val == 127:
+                value = 8.8
+            if val == 255:
+                value = 4.4
+            if val == 513:
+                value = 2.2
+            if val == 1022:
+                value = 1.1
+            if val == 2044:
+                value = 0.55
+            if val == 4095:
+                value = 0.27
+
+        if param == PARAM_FILTER:
+            value = (val - 1)
+
+        self.slave_params[sensor][param] = value
+
+        return value
+
+    def cmd_sensor_get_data_IMU(self, sensor_index):
+        [x, y, z] = self._cmd_sensor_get_data(sensor_index)
+        z = z - 16384 / (self.slave_params[sensor_index][PARAM_RANGE]/2)
+        if x > 32768:
+            x = x - 65535
+        if y > 32768:
+            y = y - 65535
+        if z > 32768:
+            z = z - 65535
+        x = x/16384 * (self.slave_params[sensor_index][PARAM_RANGE]/2)
+        y = y/16384 * (self.slave_params[sensor_index][PARAM_RANGE]/2)
+        z = z/16384 * (self.slave_params[sensor_index][PARAM_RANGE]/2)
+        return [x, y, z]

+ 136 - 0
test/nbus_slave.py

@@ -0,0 +1,136 @@
+from abc import abstractmethod
+
+from comm import *
+
+
+class NbusSlave():
+    slave_params = {1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {},
+                    14: {}, 15: {}, 16: {}}
+
+    def __init__(self, module, comm_port):
+        self.serial_port = comm_port
+        self.module = module
+
+    @abstractmethod
+    def evaluate_param_value(self, sensor, param, val):
+        pass
+
+    def cmd_echo(self, msg):
+        resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg)
+        echo = ""
+        if len(resp) == 0:
+            print("No ECHO (0-size resp)")
+            return 0
+        # print(resp)
+        for r in range(len(msg)):
+            echo = echo + chr(resp[3 + r])
+        # print("Echo:" + echo)
+        return len(resp)
+
+    def cmd_set_param(self, sensor, param, value):
+        print("SET param:", PARAM_NAME[param], "[", param, "]=>", value)
+        int_to_four_bytes = struct.Struct('<I').pack
+        y1, y2, y3, y4 = int_to_four_bytes(value & 0xFFFFFFFF)
+        resp = self.serial_port.request(self.module, sensor, (SET + CMD_PARAM), [param, y1, y2, y3, y4])
+        self.evaluate_param_value(sensor, param, value)
+        return resp
+
+    def cmd_get_param_module(self, param):
+        print("GET module param:", PARAM_NAME[param], "[", param, "]")
+        resp = self.serial_port.request(self.module, 0, CMD_PARAM, [param])
+
+    def cmd_get_param(self, sensor, param):
+        # print("GET param:", PARAM_NAME[param], "[", param, "]")
+        resp = self.serial_port.request(self.module, sensor, CMD_PARAM, [param])
+        val = None
+
+        if len(resp) > 2:
+            val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256
+            return self.evaluate_param_value(sensor, param, val)
+        return val
+
+    def cmd_get_params(self, sensor):
+        print("GET params:")
+        resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), [])
+        return resp
+
+    def cmd_sensor_cnt(self):
+        print("SENSOR CNT")
+        resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, [])
+        return resp[3]
+
+    def cmd_sensor_type(self, index):
+        resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, [])
+        return resp[3]
+
+    def cmd_sensors_type(self, pocet):
+        resp = self.serial_port.request(self.module, 0, CMD_SENSOR_TYPE, [])
+        typy = [];
+        for i in range(pocet):
+            typy.append([resp[3 + 2 * i], resp[4 + 2 * i]])
+        return typy
+
+    def cmd_sensor_get_data(self, index):
+        resp = self.serial_port.request(self.module, index, CMD_DATA, [])
+        if len(resp) > 1:
+            h = resp[5] * 256 + resp[4]
+            # print(hex(h))
+            return h / 4096 * 3.3
+        return 0
+
+    def get_real_value_imu(self, sensor, value):
+        # value = value/
+        # if self.slave_params[sensor] == 1:
+        pass
+
+    def _cmd_sensor_get_data(self, sensor_index):
+        '''
+        sensor_index:
+        1 - ACC
+        2 - GYRO
+        '''
+        resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, [])
+        # print(resp)
+        x = 0
+        y = 0
+        z = 0
+        if len(resp) > 6:
+            x = resp[5] * 256 + resp[4]
+            y = resp[7] * 256 + resp[6]
+            z = resp[9] * 256 + resp[8]
+        return [x, y, z]
+
+    def cmd_sensor_get_data_FSR(self, cnt):
+        resp = self.serial_port.request(self.module, 0, CMD_DATA, [])
+        print(resp)
+        if len(resp) > 1:
+            for i in range(cnt):
+                sen = resp[3 + 3 * i]
+                h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i]
+                print("FSR", sen, ":\t", h / 4096.0 * 3.3)
+
+    def cmd_module_info(self, param):
+        resp = self.serial_port.request(self.module, 0, CMD_INFO, [param])
+        if param == 0xE3:
+            data = 0
+            for i in range(4):
+                data = data + resp[i + 3] * pow(256, i)
+            return hex(data)
+        if param == 0xE6:
+            data = 0
+            for i in range(8):
+                data = data + resp[i + 3] * pow(256, 7 - i)
+            return hex(data)
+        l = len(resp)
+        data = ""
+        for i in range(l - 4):
+            data = data + chr(resp[i + 3])
+        return data
+
+    def cmd_reset(self):
+        print("MODULE RESET")
+        resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3)
+
+    def cmd_store(self, sensor):
+        print("MODULE STORE PARAM")
+        resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1)

+ 30 - 0
test/nbus_system.py

@@ -0,0 +1,30 @@
+from nbus_slave import *
+from imu_slave import *
+
+
+class NbusSystem:
+    slaves = {}
+
+    def __init__(self, debug=False):
+        self.serial_port = SerialComm('/dev/ttyUSB0', debug)
+
+    def create_slave(self, clazz, slave_address):
+        slave = None
+        if type(clazz) is type(ImuSlave):
+            slave = ImuSlave(slave_address, self.serial_port)
+        self.slaves[slave_address] = slave
+        return slave
+
+    def finish(self):
+        self.serial_port.close()
+
+    def cmd_start(self):
+        print("MODULE START")
+        self.serial_port.requestBroadcast(CMD_START, [])
+
+    def cmd_stop(self):
+        print("MODULE STOP")
+        self.serial_port.requestBroadcast(CMD_STOP, [])
+
+    def get_slave(self, address):
+        return self.slaves[address]