from abc import abstractmethod from comm import * class NbusSlave(): slave_params = {1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {}, 14: {}, 15: {}, 16: {}} def __init__(self, module, comm_port): self.serial_port = comm_port self.module = module @abstractmethod def evaluate_param_value(self, sensor, param, val): pass def cmd_echo(self, msg): resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg) echo = "" if len(resp) == 0: print("No ECHO (0-size resp)") return 0 # print(resp) for r in range(len(msg)): echo = echo + chr(resp[3 + r]) # print("Echo:" + echo) return len(resp) def cmd_set_param(self, sensor, param, value): print("SET param:", PARAM_NAME[param], "[", param, "]=>", value) int_to_four_bytes = struct.Struct(' 2: val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256 return self.evaluate_param_value(sensor, param, val) return val def cmd_get_params(self, sensor): print("GET params:") resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), []) return resp def cmd_sensor_cnt(self): print("SENSOR CNT") resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, []) return resp[3] def cmd_sensor_type(self, index): resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, []) return resp[3] def cmd_sensors_type(self, pocet): resp = self.serial_port.request(self.module, 0, CMD_SENSOR_TYPE, []) typy = []; for i in range(pocet): typy.append([resp[3 + 2 * i], resp[4 + 2 * i]]) return typy def cmd_sensor_get_data(self, index): resp = self.serial_port.request(self.module, index, CMD_DATA, []) if len(resp) > 1: h = resp[5] * 256 + resp[4] # print(hex(h)) return h / 4096 * 3.3 return 0 def get_real_value_imu(self, sensor, value): # value = value/ # if self.slave_params[sensor] == 1: pass def _cmd_sensor_get_data(self, sensor_index): ''' sensor_index: 1 - ACC 2 - GYRO ''' resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, []) # print(resp) x = 0 y = 0 z = 0 if len(resp) > 6: x = resp[5] * 256 + resp[4] y = resp[7] * 256 + resp[6] z = resp[9] * 256 + resp[8] return [x, y, z] def cmd_sensor_get_data_FSR(self, cnt): resp = self.serial_port.request(self.module, 0, CMD_DATA, []) print(resp) if len(resp) > 1: for i in range(cnt): sen = resp[3 + 3 * i] h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i] print("FSR", sen, ":\t", h / 4096.0 * 3.3) def cmd_module_info(self, param): resp = self.serial_port.request(self.module, 0, CMD_INFO, [param]) if param == 0xE3: data = 0 for i in range(4): data = data + resp[i + 3] * pow(256, i) return hex(data) if param == 0xE6: data = 0 for i in range(8): data = data + resp[i + 3] * pow(256, 7 - i) return hex(data) l = len(resp) data = "" for i in range(l - 4): data = data + chr(resp[i + 3]) return data def cmd_reset(self): print("MODULE RESET") resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3) def cmd_store(self, sensor): print("MODULE STORE PARAM") resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1) def cmd_calibrate(self, sensor): print(f"MODULE [{self.module}/{sensor}] RUN CALIBRATION") resp = self.serial_port.request(self.module, sensor, (SET + CMD_CALIBRATE), [], long_answer=3 if sensor == 0 else 1.5) if len(resp) == 1: return 0 return resp[3]