nbus_module_slave.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import struct
  2. from nbus_api.nbus_sensor import NBusSensor
  3. from nbus_api.nbus_common_parser import NbusCommonParser
  4. from nbus_hal.nbus_generic_port import *
  5. from nbus_types.nbus_address_type import NBusModuleAddress
  6. from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
  7. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  8. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  9. from nbus_types.nbus_status_type import NBusStatusType
  10. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  11. from nbus_types.nbus_info_type import NBusModuleInfo
  12. from nbus_types.nbus_sensor_type import NBusSensorType
  13. @beartype
  14. class NBusSlaveModule:
  15. """
  16. Class representing nBus slave module.
  17. """
  18. def __init__(self, port: NBusPort, module_address: NBusModuleAddress):
  19. """
  20. Constructor.
  21. :param port: serial port
  22. :param module_address: address of module
  23. :param device_cnt: number of devices
  24. """
  25. self.__port = port
  26. self.__module_addr = module_address
  27. self.__params = {}
  28. self.__devices = {}
  29. """
  30. ================================================================================================================
  31. Module General Methods
  32. ================================================================================================================
  33. """
  34. def init(self, load_format: bool) -> None:
  35. """
  36. Initialize the module from hardware.
  37. :param load_format: flag to fetch data format
  38. """
  39. sensors = self.cmd_get_sensor_type()
  40. for sen_address, sen_type in sensors.items():
  41. self.__devices[sen_address] = NBusSensor(self.__port, self.__module_addr, sen_address)
  42. self.__devices[sen_address].type = sen_type
  43. if load_format:
  44. self.cmd_get_format()
  45. def get_devices(self) -> dict[NBusSensorAddress, NBusSensor]:
  46. """
  47. Get module devices.
  48. :return: dictionary of connected devices
  49. """
  50. return self.__devices
  51. """
  52. ================================================================================================================
  53. Module Get Commands
  54. ================================================================================================================
  55. """
  56. def cmd_get_echo(self, message: bytearray) -> bool:
  57. """
  58. Get echo from module.
  59. :param message: message to send
  60. :return: status (True = echo, False = no echo)
  61. """
  62. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
  63. return response == list(message)
  64. def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
  65. """
  66. Get single module parameter.
  67. :param parameter: parameter id
  68. :return: parameter value
  69. """
  70. # get response
  71. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
  72. bytearray([parameter.value]))
  73. # parse parameter
  74. param_id, param_val = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  75. # store parameter value
  76. self.__params[param_id] = param_val
  77. return param_val
  78. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  79. """
  80. Get all module parameters.
  81. :return: dict of parameter id and parameter value
  82. """
  83. # get response
  84. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
  85. # parse parameters
  86. params = NbusCommonParser.parameters_from_response(resp_len, response)
  87. for param_id, param_val in params:
  88. # store parameters
  89. self.__params[param_id] = param_val
  90. return self.__params.copy()
  91. def cmd_get_sensor_cnt(self) -> NBusSensorCount:
  92. """
  93. Get sensor count.
  94. :return: count of read-only and read-write sensors
  95. """
  96. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
  97. return NBusSensorCount(*response)
  98. def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
  99. """
  100. Get data from all module sensors.
  101. :return: dict of device addresses and data values
  102. """
  103. # get data
  104. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
  105. # parse data
  106. begin_idx = 0
  107. data = {}
  108. while begin_idx < resp_length:
  109. device_id = response[begin_idx]
  110. # handle errors
  111. if self.__devices[device_id].data_format is None: # check for format and params
  112. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  113. values, offset = NbusCommonParser.data_from_response(self.__devices[device_id].data_format,
  114. response[begin_idx:])
  115. data[device_id] = values
  116. begin_idx += offset + 1
  117. return data
  118. def cmd_get_info(self) -> NBusModuleInfo:
  119. """
  120. Get module info.
  121. :return: module info
  122. """
  123. response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
  124. name = str(response[1:9], "ascii")
  125. typ = str(response[9:12], "ascii")
  126. uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
  127. hw = str(response[16:19], "ascii")
  128. fw = str(response[19:22], "ascii")
  129. mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
  130. ro_count = int(response[30])
  131. rw_count = int(response[31])
  132. return NBusModuleInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id,
  133. read_only_sensors=ro_count, read_write_sensors=rw_count)
  134. def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
  135. """
  136. Get format of all on-board sensors.
  137. :return: dict of sensor addresses and sensor formats
  138. """
  139. # get response
  140. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
  141. bytearray([]))
  142. begin_idx = 0
  143. formats = {}
  144. # parse format
  145. while begin_idx < resp_length:
  146. device_id = response[begin_idx]
  147. device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
  148. self.__devices[device_id].data_format = device_format
  149. formats[device_id] = device_format
  150. begin_idx += 4
  151. return formats
  152. def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
  153. """
  154. Get type of all on-board sensors.
  155. :return: dict of sensor addresses and sensor types
  156. """
  157. # get response
  158. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
  159. bytearray([]))
  160. # parse response
  161. types = {}
  162. i = 0
  163. while i < resp_length - 1:
  164. types[NBusSensorAddress(response[i])] = NBusSensorType(response[i + 1])
  165. i += 2
  166. return types
  167. """
  168. ================================================================================================================
  169. Module Set Commands
  170. ================================================================================================================
  171. """
  172. def cmd_set_module_stop(self) -> NBusStatusType:
  173. """
  174. Stop automatic measuring.
  175. :return: status
  176. """
  177. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
  178. return NBusStatusType(response[0])
  179. def cmd_set_module_start(self) -> NBusStatusType:
  180. """
  181. Start automatic measuring.
  182. :return: status
  183. """
  184. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
  185. return NBusStatusType(response[0])
  186. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
  187. """
  188. Set module parameter.
  189. :param param: parameter ID
  190. :param value: parameter value
  191. :return: status
  192. """
  193. # create request packet
  194. param_id_raw = struct.pack("B", param.value)
  195. param_val_raw = struct.pack("<I",value)
  196. param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
  197. # proceed request
  198. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM, param_bytes)
  199. # if response is valid, store parameter
  200. if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
  201. self.__params[param] = value
  202. return NBusStatusType(response[1])
  203. def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
  204. -> dict[NBusParameterID, NBusStatusType]:
  205. """
  206. Set multiple module parameters.
  207. :param params: parameters
  208. :return: dict od statuses
  209. """
  210. # create request packet
  211. param_bytes = NbusCommonParser.parameters_to_request(params)
  212. # send request
  213. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
  214. bytearray(param_bytes), long_answer=1.0)
  215. # parse statuses
  216. statuses = {}
  217. for i in range(0, resp_length - 1, 2):
  218. p_id = NBusParameterID(response[i])
  219. if response[i+1] == NBusStatusType.STATUS_SUCCESS:
  220. self.__params[p_id] = params[p_id] # if success, store param
  221. statuses[p_id] = NBusStatusType(response[i + 1])
  222. return statuses
  223. def cmd_set_calibrate(self) -> NBusStatusType:
  224. """
  225. Send calibration command.
  226. :return: calibration status
  227. """
  228. resp_length, *response = self.__port.request_module(self.__module_addr,
  229. NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
  230. print(response)
  231. return NBusStatusType(response[0])
  232. def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
  233. -> dict[NBusSensorAddress, NBusStatusType]:
  234. """
  235. Set data to read-write sensors.
  236. :param data:
  237. :return: operation statuses
  238. """
  239. # create request packet
  240. request = []
  241. # transform data
  242. for addr in data.keys():
  243. # handle errors
  244. if self.__devices[addr].data_format is None: # check for format and params
  245. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  246. raw_data = data[addr]
  247. request.append(addr)
  248. request.extend(NbusCommonParser.data_to_request(self.__devices[addr].data_format, raw_data))
  249. # send request
  250. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
  251. bytearray(request))
  252. # return response statuses
  253. statuses = {}
  254. for i in range(0, resp_length - 1, 2):
  255. statuses[NBusSensorAddress(response[i])] = NBusStatusType(response[i + 1])
  256. return statuses