nbus_module_slave.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. import struct
  2. from nbus_api.nbus_sensor import NBusSensor
  3. from nbus_api.nbus_common_parser import NbusCommonParser
  4. from nbus_hal.nbus_serial.serial_port import *
  5. from nbus_types.nbus_address_type import NBusModuleAddress
  6. from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
  7. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  8. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  9. from nbus_types.nbus_status_type import NBusStatusType
  10. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  11. from nbus_types.nbus_info_type import NBusInfo
  12. from nbus_types.nbus_sensor_type import NBusSensorType
  13. @beartype
  14. class NBusSlaveModule:
  15. """
  16. Class representing nBus slave module.
  17. """
  18. def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
  19. """
  20. Constructor.
  21. :param serial_port: serial port
  22. :param module_address: address of module
  23. """
  24. self.__port = serial_port
  25. self.__module_addr = module_address
  26. self.__params = {}
  27. self.__devices = {}
  28. self._map_param_get = lambda t, v: v # dummy implementation
  29. self._map_param_set = lambda t, v: v # dummy implementation
  30. """
  31. ================================================================================================================
  32. Module General Methods
  33. ================================================================================================================
  34. """
  35. def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
  36. map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]) -> None:
  37. """
  38. Set parameter mappers for module.
  39. :param map_param_get_cb: callback for map param get
  40. :param map_param_set_cb: callback for map param set
  41. """
  42. self._map_param_get = map_param_get_cb
  43. self._map_param_set = map_param_set_cb
  44. def add_sensor(self, sensor: NBusSensor) -> None:
  45. """
  46. Add sensor to container.
  47. :param sensor: nbus sensor
  48. """
  49. sensor.set_parent_module_address(self.__module_addr)
  50. sensor.set_device_port(self.__port)
  51. self.__devices[sensor.address] = sensor
  52. def get_sensor(self, sensor_address: NBusSensorAddress) -> NBusSensor:
  53. """
  54. Get module sensor.
  55. :param sensor_address: address of sensor
  56. :return: sensor
  57. """
  58. return self.__devices[sensor_address]
  59. """
  60. ================================================================================================================
  61. Module Get Commands
  62. ================================================================================================================
  63. """
  64. def cmd_get_echo(self, message: bytearray) -> bool:
  65. """
  66. Get echo from module.
  67. :param message: message to send
  68. :return: status (True = echo, False = no echo)
  69. """
  70. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
  71. return response == list(message)
  72. def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
  73. """
  74. Get single module parameter.
  75. :param parameter: parameter id
  76. :return: parameter value
  77. """
  78. # get response
  79. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
  80. bytearray([parameter.value]))
  81. # parse parameter
  82. param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  83. param_val = self._map_param_get(param_id, param_val_raw)
  84. # store parameter value
  85. self.__params[param_id] = param_val
  86. return param_val
  87. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  88. """
  89. Get all module parameters.
  90. :return: dict of parameter id and parameter value
  91. """
  92. # get response
  93. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
  94. # parse parameters
  95. params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
  96. for param_id, param_val_raw in params_raw:
  97. param_val = self._map_param_get(param_id, param_val_raw)
  98. # store parameters
  99. self.__params[param_id] = param_val
  100. return self.__params.copy()
  101. def cmd_get_sensor_cnt(self) -> NBusSensorCount:
  102. """
  103. Get sensor count.
  104. :return: count of read-only and read-write sensors
  105. """
  106. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
  107. return NBusSensorCount(*response)
  108. def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
  109. """
  110. Get data from all module sensors.
  111. :return: dict of device addresses and data values
  112. """
  113. # get data
  114. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
  115. # parse data
  116. begin_idx = 0
  117. data = {}
  118. while begin_idx < resp_length:
  119. device_id = response[begin_idx]
  120. # handle errors
  121. if self.__devices[device_id].data_format is None: # check for format and params
  122. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  123. if not self.__devices[device_id].data_parameters_loaded():
  124. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
  125. values, offset = NbusCommonParser.data_from_response(self.__devices[device_id].data_format,
  126. response[begin_idx:])
  127. data[device_id] = self.__devices[device_id].map_data_get(values)
  128. begin_idx += offset + 1
  129. return data
  130. def cmd_get_info(self) -> NBusInfo:
  131. """
  132. Get module info.
  133. :return: module info
  134. """
  135. response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
  136. name = str(response[1:9], "ascii")
  137. typ = str(response[9:12], "ascii")
  138. uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
  139. hw = str(response[16:19], "ascii")
  140. fw = str(response[19:22], "ascii")
  141. mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
  142. ro_count = int(response[30])
  143. rw_count = int(response[31])
  144. return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id,
  145. read_only_sensors=ro_count, read_write_sensors=rw_count)
  146. def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
  147. """
  148. Get format of all on-board sensors.
  149. :return: dict of sensor addresses and sensor formats
  150. """
  151. # get response
  152. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
  153. bytearray([]))
  154. begin_idx = 0
  155. formats = {}
  156. # parse format
  157. while begin_idx < resp_length:
  158. device_id = response[begin_idx]
  159. device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
  160. self.__devices[device_id].data_format = device_format
  161. formats[device_id] = device_format
  162. begin_idx += 4
  163. return formats
  164. def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
  165. """
  166. Get type of all on-board sensors.
  167. :return: dict of sensor addresses and sensor types
  168. """
  169. # get response
  170. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
  171. bytearray([]))
  172. # parse response
  173. types = {}
  174. i = 0
  175. while i < resp_length - 1:
  176. types[NBusSensorAddress(response[i])] = NBusSensorType(response[i + 1])
  177. i += 2
  178. return types
  179. """
  180. ================================================================================================================
  181. Module Set Commands
  182. ================================================================================================================
  183. """
  184. def cmd_set_module_stop(self) -> NBusStatusType:
  185. """
  186. Stop automatic measuring.
  187. :return: status
  188. """
  189. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
  190. return NBusStatusType(response[0])
  191. def cmd_set_module_start(self) -> NBusStatusType:
  192. """
  193. Start automatic measuring.
  194. :return: status
  195. """
  196. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
  197. return NBusStatusType(response[0])
  198. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
  199. """
  200. Set module parameter.
  201. :param param: parameter ID
  202. :param value: parameter value
  203. :return: status
  204. """
  205. # create request packet
  206. param_id_raw = struct.pack("B", param.value)
  207. param_val_raw = struct.pack("<I", self._map_param_set(param, value))
  208. param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
  209. # proceed request
  210. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM, param_bytes)
  211. # if response is valid, store parameter
  212. if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
  213. self.__params[param] = value
  214. return NBusStatusType(response[1])
  215. def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
  216. -> dict[NBusParameterID, NBusStatusType]:
  217. """
  218. Set multiple module parameters.
  219. :param params: parameters
  220. :return: dict od statuses
  221. """
  222. # scale parameters
  223. for p_id in params.keys():
  224. params[p_id] = self._map_param_set(p_id, params[p_id])
  225. # create request packet
  226. param_bytes = NbusCommonParser.parameters_to_request(params)
  227. # send request
  228. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
  229. bytearray(param_bytes))
  230. # parse statuses
  231. statuses = {}
  232. for i in range(0, resp_length - 1, 2):
  233. p_id = NBusParameterID(response[i])
  234. if response[i+1] == NBusStatusType.STATUS_SUCCESS:
  235. self.__params[p_id] = params[p_id] # if success, store param
  236. statuses[p_id] = NBusStatusType(response[i + 1])
  237. return statuses
  238. def cmd_set_calibrate(self) -> NBusStatusType:
  239. """
  240. Send calibration command.
  241. :return: calibration status
  242. """
  243. resp_length, *response = self.__port.request_module(self.__module_addr,
  244. NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
  245. print(response)
  246. return NBusStatusType(response[0])
  247. def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
  248. -> dict[NBusSensorAddress, NBusStatusType]:
  249. """
  250. Set data to read-write sensors.
  251. :param data:
  252. :return: operation statuses
  253. """
  254. # create request packet
  255. request = []
  256. # transform data
  257. for addr in data.keys():
  258. # handle errors
  259. if self.__devices[addr].data_format is None: # check for format and params
  260. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  261. if not self.__devices[addr].data_parameters_loaded():
  262. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
  263. raw_data = self.__devices[addr].map_data_set(data[addr])
  264. request.append(addr)
  265. request.extend(NbusCommonParser.data_to_request(self.__devices[addr].data_format, raw_data))
  266. # send request
  267. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
  268. bytearray(request))
  269. # return response statuses
  270. statuses = {}
  271. for i in range(0, resp_length - 1, 2):
  272. statuses[NBusSensorAddress(response[i])] = NBusStatusType(response[i + 1])
  273. return statuses