# nbus_slave.py
import struct
from abc import abstractmethod

from comm import *
  3. class NbusSlave():
  4. slave_params = {1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {},
  5. 14: {}, 15: {}, 16: {}}
  6. def __init__(self, module, comm_port):
  7. self.serial_port = comm_port
  8. self.module = module
  9. @abstractmethod
  10. def evaluate_param_value(self, sensor, param, val):
  11. pass
  12. def cmd_echo(self, msg):
  13. resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg)
  14. echo = ""
  15. if len(resp) == 0:
  16. print("No ECHO (0-size resp)")
  17. return 0
  18. # print(resp)
  19. for r in range(len(msg)):
  20. echo = echo + chr(resp[3 + r])
  21. # print("Echo:" + echo)
  22. return len(resp)
  23. def cmd_set_param(self, sensor, param, value):
  24. print("SET param:", PARAM_NAME[param], "[", param, "]=>", value)
  25. int_to_four_bytes = struct.Struct('<I').pack
  26. y1, y2, y3, y4 = int_to_four_bytes(value & 0xFFFFFFFF)
  27. resp = self.serial_port.request(self.module, sensor, (SET + CMD_PARAM), [param, y1, y2, y3, y4])
  28. self.evaluate_param_value(sensor, param, value)
  29. return resp
  30. def cmd_get_param_module(self, param):
  31. print("GET module param:", PARAM_NAME[param], "[", param, "]")
  32. resp = self.serial_port.request(self.module, 0, CMD_PARAM, [param])
  33. def cmd_get_param(self, sensor, param):
  34. # print("GET param:", PARAM_NAME[param], "[", param, "]")
  35. resp = self.serial_port.request(self.module, sensor, CMD_PARAM, [param])
  36. val = None
  37. if len(resp) > 2:
  38. val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256
  39. return self.evaluate_param_value(sensor, param, val)
  40. return val
  41. def cmd_get_params(self, sensor):
  42. print("GET params:")
  43. resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), [])
  44. return resp
  45. def cmd_sensor_cnt(self):
  46. print("SENSOR CNT")
  47. resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, [])
  48. return resp[3]
  49. def cmd_sensor_type(self, index):
  50. resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, [])
  51. type_struct={}
  52. n = (len(resp) - 4)//2 # pocet typov
  53. for i in range(n):
  54. type_struct[resp[i*2+3]] = resp[i*2+4]
  55. return type_struct
  56. def cmd_sensors_format(self):
  57. resp = self.serial_port.request(self.module, 0, CMD_FORMAT, [])
  58. type_struct={}
  59. n = (len(resp) - 4)//4 # pocet typov
  60. for i in range(n):
  61. fmt={}
  62. b1 = resp[3+i*4+1]
  63. b2 = resp[3+i*4+2]
  64. b3 = resp[3+i*4+3]
  65. fmt["sign"] = (b1 & 0x80)>>7
  66. fmt["multiplier"] = b1 & 0x7F
  67. fmt["value_multiplier"] = b2
  68. fmt["byte_length"] = b3>>4
  69. fmt["samples"] = b2 & 0xF
  70. type_struct[resp[i*4+3]] = fmt
  71. return type_struct
  72. def cmd_sensor_info(self):
  73. resp = self.serial_port.request(self.module, 0, CMD_INFO, [])
  74. info_struct={}
  75. n = (len(resp))
  76. print(n)
  77. if n<34:
  78. return {}
  79. info_struct["name"] = ""
  80. for i in range(8):
  81. if resp[i+3]>=32:
  82. info_struct["name"] += chr(resp[i+3])
  83. info_struct["type"] = ""
  84. for i in range(3):
  85. info_struct["type"] += chr(resp[i+11])
  86. info_struct["id"] = ""
  87. idhw=0
  88. for i in range(4):
  89. idhw = idhw*256 + resp[i+14]
  90. info_struct["id"] = hex(idhw)
  91. info_struct["FW"] = ""
  92. for i in range(3):
  93. info_struct["FW"] += chr(resp[i+18])
  94. info_struct["HW"] = ""
  95. for i in range(3):
  96. info_struct["HW"] += chr(resp[i+21])
  97. info_struct["MEM"] = ""
  98. idhw=0
  99. for i in range(8):
  100. idhw = idhw*256 + resp[i+24]
  101. info_struct["MEM"] = hex(idhw)
  102. info_struct["sensors"] = resp[32]
  103. info_struct["actuators"] = resp[33]
  104. return info_struct
  105. def cmd_sensors_type(self):
  106. return self.cmd_sensor_type(0)
  107. def cmd_sensor_get_data(self, index):
  108. resp = self.serial_port.request(self.module, index, CMD_DATA, [])
  109. if len(resp) > 1:
  110. h = resp[5] * 256 + resp[4]
  111. # print(hex(h))
  112. return h / 4096 * 3.3
  113. return 0
  114. def get_real_value_imu(self, sensor, value):
  115. # value = value/
  116. # if self.slave_params[sensor] == 1:
  117. pass
  118. def _cmd_sensor_get_data(self, sensor_index):
  119. '''
  120. sensor_index:
  121. 1 - ACC
  122. 2 - GYRO
  123. '''
  124. resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, [])
  125. # print(resp)
  126. x = 0
  127. y = 0
  128. z = 0
  129. if len(resp) > 6:
  130. x = resp[5] * 256 + resp[4]
  131. y = resp[7] * 256 + resp[6]
  132. z = resp[9] * 256 + resp[8]
  133. return [x, y, z]
  134. def cmd_sensor_get_data_FSR(self, cnt):
  135. resp = self.serial_port.request(self.module, 0, CMD_DATA, [])
  136. print(resp)
  137. if len(resp) > 1:
  138. for i in range(cnt):
  139. sen = resp[3 + 3 * i]
  140. h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i]
  141. print("FSR", sen, ":\t", h / 4096.0 * 3.3)
  142. def cmd_module_info(self, param):
  143. resp = self.serial_port.request(self.module, 0, CMD_INFO, [param])
  144. if param == 0xE3:
  145. data = 0
  146. for i in range(4):
  147. data = data + resp[i + 3] * pow(256, i)
  148. return hex(data)
  149. if param == 0xE6:
  150. data = 0
  151. for i in range(8):
  152. data = data + resp[i + 3] * pow(256, 7 - i)
  153. return hex(data)
  154. l = len(resp)
  155. data = ""
  156. for i in range(l - 4):
  157. data = data + chr(resp[i + 3])
  158. return data
  159. def cmd_reset(self):
  160. print("MODULE RESET")
  161. resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3)
  162. def cmd_store(self, sensor):
  163. print("MODULE STORE PARAM")
  164. resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1)
  165. def cmd_calibrate(self, sensor):
  166. print(f"MODULE [{self.module}/{sensor}] RUN CALIBRATION")
  167. resp = self.serial_port.request(self.module, sensor, (SET + CMD_CALIBRATE), [], long_answer=3 if sensor == 0 else 1.5)
  168. if len(resp) == 1:
  169. return 0
  170. return resp[3]