nbus_slave.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from abc import abstractmethod
  2. from comm import *
  3. class NbusSlave():
  4. slave_params = {1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, 11: {}, 12: {}, 13: {},
  5. 14: {}, 15: {}, 16: {}}
  6. def __init__(self, module, comm_port):
  7. self.serial_port = comm_port
  8. self.module = module
  9. @abstractmethod
  10. def evaluate_param_value(self, sensor, param, val):
  11. pass
  12. def cmd_echo(self, msg):
  13. resp = self.serial_port.request(self.module, 0, CMD_ECHO, msg)
  14. echo = ""
  15. if len(resp) == 0:
  16. print("No ECHO (0-size resp)")
  17. return 0
  18. # print(resp)
  19. len_req = len(msg) + 4
  20. if len_req != len(resp):
  21. return 0
  22. if len(resp) > 3:
  23. for r in range(len(msg)):
  24. echo = echo + chr(resp[3 + r])
  25. # print("Echo:" + echo)
  26. return len(resp)
  27. def cmd_set_param(self, sensor, param, value):
  28. print("SET param:", PARAM_NAME[param], "[", param, "]=>", value)
  29. int_to_four_bytes = struct.Struct('<I').pack
  30. y1, y2, y3, y4 = int_to_four_bytes(value & 0xFFFFFFFF)
  31. resp = self.serial_port.request(self.module, sensor, (SET + CMD_PARAM), [param, y1, y2, y3, y4])
  32. self.evaluate_param_value(sensor, param, value)
  33. return resp
  34. def cmd_get_param_module(self, param):
  35. print("GET module param:", PARAM_NAME[param], "[", param, "]")
  36. resp = self.serial_port.request(self.module, 0, CMD_PARAM, [param])
  37. def cmd_get_param(self, sensor, param):
  38. # print("GET param:", PARAM_NAME[param], "[", param, "]")
  39. resp = self.serial_port.request(self.module, sensor, CMD_PARAM, [param])
  40. val = None
  41. if len(resp) > 2:
  42. val = resp[4] + resp[5] * 256 + resp[6] * 256 * 256 + resp[7] * 256 * 256 * 256
  43. return self.evaluate_param_value(sensor, param, val)
  44. return val
  45. def cmd_get_params(self, sensor):
  46. print("GET params:")
  47. resp = self.serial_port.request(self.module, sensor, (CMD_PARAM), [])
  48. return resp
  49. def cmd_sensor_cnt(self):
  50. print("SENSOR CNT")
  51. resp = self.serial_port.request(self.module, 0, CMD_SENSOR_CNT, [])
  52. if len(resp)>3:
  53. return resp[3]
  54. return 0
  55. def cmd_sensor_type(self, index):
  56. resp = self.serial_port.request(self.module, index, CMD_SENSOR_TYPE, [])
  57. type_struct={}
  58. n = (len(resp) - 4)//2 # pocet typov
  59. for i in range(n):
  60. type_struct[resp[i*2+3]] = resp[i*2+4]
  61. return type_struct
  62. def cmd_sensors_format(self):
  63. resp = self.serial_port.request(self.module, 0, CMD_FORMAT, [])
  64. type_struct={}
  65. n = (len(resp) - 4)//4 # pocet typov
  66. for i in range(n):
  67. fmt={}
  68. b1 = resp[3+i*4+1]
  69. b2 = resp[3+i*4+2]
  70. b3 = resp[3+i*4+3]
  71. fmt["sign"] = (b1 & 0x80)>>7
  72. fmt["multiplier"] = b1 & 0x7F
  73. fmt["value_multiplier"] = b2
  74. fmt["byte_length"] = b3>>4
  75. fmt["samples"] = b2 & 0xF
  76. type_struct[resp[i*4+3]] = fmt
  77. return type_struct
  78. def cmd_sensor_info(self):
  79. resp = self.serial_port.request(self.module, 0, CMD_INFO, [])
  80. info_struct={}
  81. n = (len(resp))
  82. print(n)
  83. if n<34:
  84. return {}
  85. info_struct["name"] = ""
  86. for i in range(8):
  87. if resp[i+3]>=32:
  88. info_struct["name"] += chr(resp[i+3])
  89. info_struct["type"] = ""
  90. for i in range(3):
  91. info_struct["type"] += chr(resp[i+11])
  92. info_struct["id"] = ""
  93. idhw=0
  94. for i in range(4):
  95. idhw = idhw*256 + resp[i+14]
  96. info_struct["id"] = hex(idhw)
  97. info_struct["FW"] = ""
  98. for i in range(3):
  99. info_struct["FW"] += chr(resp[i+18])
  100. info_struct["HW"] = ""
  101. for i in range(3):
  102. info_struct["HW"] += chr(resp[i+21])
  103. info_struct["MEM"] = ""
  104. idhw=0
  105. for i in range(8):
  106. idhw = idhw*256 + resp[i+24]
  107. info_struct["MEM"] = hex(idhw)
  108. info_struct["sensors"] = resp[32]
  109. info_struct["actuators"] = resp[33]
  110. return info_struct
  111. def cmd_sensors_type(self):
  112. return self.cmd_sensor_type(0)
  113. def cmd_sensor_get_data(self, index):
  114. resp = self.serial_port.request(self.module, index, CMD_DATA, [])
  115. if len(resp) > 1:
  116. h = resp[5] * 256 + resp[4]
  117. # print(hex(h))
  118. return h / 4096 * 3.3
  119. return 0
  120. def get_real_value_imu(self, sensor, value):
  121. # value = value/
  122. # if self.slave_params[sensor] == 1:
  123. pass
  124. def _cmd_sensor_get_data(self, sensor_index):
  125. '''
  126. sensor_index:
  127. 1 - ACC
  128. 2 - GYRO
  129. '''
  130. resp = self.serial_port.request(self.module, sensor_index, CMD_DATA, [])
  131. # print(resp)
  132. x = 0
  133. y = 0
  134. z = 0
  135. if len(resp) > 6:
  136. x = resp[5] * 256 + resp[4]
  137. y = resp[7] * 256 + resp[6]
  138. z = resp[9] * 256 + resp[8]
  139. return [x, y, z]
  140. def cmd_sensor_get_data_FSR(self, cnt):
  141. resp = self.serial_port.request(self.module, 0, CMD_DATA, [])
  142. print(resp)
  143. if len(resp) > 1:
  144. for i in range(cnt):
  145. sen = resp[3 + 3 * i]
  146. h = resp[5 + 3 * i] * 256 + resp[4 + 3 * i]
  147. print("FSR", sen, ":\t", h / 4096.0 * 3.3)
  148. def cmd_module_info(self, param):
  149. resp = self.serial_port.request(self.module, 0, CMD_INFO, [param])
  150. if param == 0xE3:
  151. data = 0
  152. for i in range(4):
  153. data = data + resp[i + 3] * pow(256, i)
  154. return hex(data)
  155. if param == 0xE6:
  156. data = 0
  157. for i in range(8):
  158. data = data + resp[i + 3] * pow(256, 7 - i)
  159. return hex(data)
  160. l = len(resp)
  161. data = ""
  162. for i in range(l - 4):
  163. data = data + chr(resp[i + 3])
  164. return data
  165. def cmd_reset(self):
  166. print("MODULE RESET")
  167. resp = self.serial_port.request(self.module, 0, (SET + CMD_RESET), [], long_answer=0.3)
  168. def cmd_store(self, sensor):
  169. print("MODULE STORE PARAM")
  170. resp = self.serial_port.request(self.module, sensor, (SET + CMD_STORE), [], long_answer=0.1)
  171. def cmd_calibrate(self, sensor):
  172. print(f"MODULE [{self.module}/{sensor}] RUN CALIBRATION")
  173. resp = self.serial_port.request(self.module, sensor, (SET + CMD_CALIBRATE), [], long_answer=3 if sensor == 0 else 1.5)
  174. if len(resp) == 1:
  175. return 0
  176. return resp[3]