| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- from abc import abstractmethod
- from comm import *
- class NbusSlave():
- slave_params = {1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {},
- 14: {}, 15: {}, 16: {}}
- def __init__(self, module, comm_port):
- self.serial_port = comm_port
- self.module = module
- @abstractmethod
- def evaluate_param_value(self, sensor, param, val):
- pass
- def cmd_echo(self, msg):
- resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg)
- echo = ""
- if len(resp) == 0:
- print("No ECHO (0-size resp)")
- return 0
- # print(resp)
- len_req = len(msg) + 4
- if len_req != len(resp):
- return 0
- if len(resp) > 3:
- for r in range(len(msg)):
- echo = echo + chr(resp[3 + r])
- # print("Echo:" + echo)
- return len(resp)
- def cmd_set_param(self, sensor, param, value):
- print("SET param:", PARAM_NAME[param], "[", param, "]=>", value)
- int_to_four_bytes = struct.Struct('<I').pack
- y1, y2, y3, y4 = int_to_four_bytes(value & 0xFFFFFFFF)
- resp = self.serial_port.request(self.module, sensor, (SET + CMD_PARAM), [param, y1, y2, y3, y4])
- self.evaluate_param_value(sensor, param, value)
- return resp
- def cmd_get_param_module(self, param):
- print("GET module param:", PARAM_NAME[param], "[", param, "]")
- resp = self.serial_port.request(self.module, 0, CMD_PARAM, [param])
- def cmd_get_param(self, sensor, param):
- # print("GET param:", PARAM_NAME[param], "[", param, "]")
- resp = self.serial_port.request(self.module, sensor, CMD_PARAM, [param])
- val = None
- if len(resp) > 2:
- val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256
- return self.evaluate_param_value(sensor, param, val)
- return val
- def cmd_get_params(self, sensor):
- print("GET params:")
- resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), [])
- return resp
- def cmd_sensor_cnt(self):
- print("SENSOR CNT")
- resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, [])
- if len(resp)>3:
- return resp[3]
- return 0
- def cmd_sensor_type(self, index):
- resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, [])
- type_struct={}
- n = (len(resp) - 4)//2 # pocet typov
- for i in range(n):
- type_struct[resp[i*2+3]] = resp[i*2+4]
- return type_struct
- def cmd_sensors_format(self):
- resp = self.serial_port.request(self.module, 0, CMD_FORMAT, [])
- type_struct={}
- n = (len(resp) - 4)//4 # pocet typov
- for i in range(n):
- fmt={}
- b1 = resp[3+i*4+1]
- b2 = resp[3+i*4+2]
- b3 = resp[3+i*4+3]
- fmt["sign"] = (b1 & 0x80)>>7
- fmt["multiplier"] = b1 & 0x7F
- fmt["value_multiplier"] = b2
- fmt["byte_length"] = b3>>4
- fmt["samples"] = b2 & 0xF
- type_struct[resp[i*4+3]] = fmt
- return type_struct
- def cmd_sensor_info(self):
- resp = self.serial_port.request(self.module, 0, CMD_INFO, [])
- info_struct={}
- n = (len(resp))
- print(n)
- if n<34:
- return {}
- info_struct["name"] = ""
- for i in range(8):
- if resp[i+3]>=32:
- info_struct["name"] += chr(resp[i+3])
- info_struct["type"] = ""
- for i in range(3):
- info_struct["type"] += chr(resp[i+11])
- info_struct["id"] = ""
- idhw=0
- for i in range(4):
- idhw = idhw*256 + resp[i+14]
- info_struct["id"] = hex(idhw)
- info_struct["FW"] = ""
- for i in range(3):
- info_struct["FW"] += chr(resp[i+18])
- info_struct["HW"] = ""
- for i in range(3):
- info_struct["HW"] += chr(resp[i+21])
- info_struct["MEM"] = ""
- idhw=0
- for i in range(8):
- idhw = idhw*256 + resp[i+24]
- info_struct["MEM"] = hex(idhw)
- info_struct["sensors"] = resp[32]
- info_struct["actuators"] = resp[33]
- return info_struct
- def cmd_sensors_type(self):
- return self.cmd_sensor_type(0)
- def cmd_sensor_get_data(self, index):
- resp = self.serial_port.request(self.module, index, CMD_DATA, [])
- if len(resp) > 1:
- h = resp[5] * 256 + resp[4]
- # print(hex(h))
- return h / 4096 * 3.3
- return 0
- def get_real_value_imu(self, sensor, value):
- # value = value/
- # if self.slave_params[sensor] == 1:
- pass
- def _cmd_sensor_get_data(self, sensor_index):
- '''
- sensor_index:
- 1 - ACC
- 2 - GYRO
- '''
- resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, [])
- # print(resp)
- x = 0
- y = 0
- z = 0
- if len(resp) > 6:
- x = resp[5] * 256 + resp[4]
- y = resp[7] * 256 + resp[6]
- z = resp[9] * 256 + resp[8]
- return [x, y, z]
- def cmd_sensor_get_data_FSR(self, cnt):
- resp = self.serial_port.request(self.module, 0, CMD_DATA, [])
- print(resp)
- if len(resp) > 1:
- for i in range(cnt):
- sen = resp[3 + 3 * i]
- h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i]
- print("FSR", sen, ":\t", h / 4096.0 * 3.3)
- def cmd_module_info(self, param):
- resp = self.serial_port.request(self.module, 0, CMD_INFO, [param])
- if param == 0xE3:
- data = 0
- for i in range(4):
- data = data + resp[i + 3] * pow(256, i)
- return hex(data)
- if param == 0xE6:
- data = 0
- for i in range(8):
- data = data + resp[i + 3] * pow(256, 7 - i)
- return hex(data)
- l = len(resp)
- data = ""
- for i in range(l - 4):
- data = data + chr(resp[i + 3])
- return data
- def cmd_reset(self):
- print("MODULE RESET")
- resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3)
- def cmd_store(self, sensor):
- print("MODULE STORE PARAM")
- resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1)
- def cmd_calibrate(self, sensor):
- print(f"MODULE [{self.module}/{sensor}] RUN CALIBRATION")
- resp = self.serial_port.request(self.module, sensor, (SET + CMD_CALIBRATE), [], long_answer=3 if sensor == 0 else 1.5)
- if len(resp) == 1:
- return 0
- return resp[3]
|