nbus_module_slave.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. import struct
  2. from nbus_api.nbus_sensor import NBusSensor
  3. from nbus_api.nbus_common_parser import NbusCommonParser
  4. from nbus_hal.nbus_serial.serial_port import *
  5. from nbus_types.nbus_address_type import NBusModuleAddress
  6. from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
  7. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  8. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  9. from nbus_types.nbus_status_type import NBusStatusType
  10. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  11. from nbus_types.nbus_info_type import NBusInfo
  12. from nbus_types.nbus_sensor_type import NBusSensorType
  13. @beartype
  14. class NBusSlaveModule:
  15. """
  16. Class representing nBus slave module.
  17. """
  18. def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
  19. """
  20. Constructor.
  21. :param serial_port: serial port
  22. :param module_address: address of module
  23. :param device_cnt: number of devices
  24. """
  25. self.__port = serial_port
  26. self.__module_addr = module_address
  27. self.__params = {}
  28. self.__devices = {}
  29. """
  30. ================================================================================================================
  31. Module General Methods
  32. ================================================================================================================
  33. """
  34. def scan(self):
  35. sensors = self.cmd_get_sensor_type()
  36. for sen_address, sen_type in sensors.items():
  37. self.__devices[sen_address] = NBusSensor(sen_address)
  38. self.__devices[sen_address].type = sen_type
  39. return sensors
  40. def get_devices(self):
  41. return self.__devices
  42. def add_sensor(self, sensor: NBusSensor) -> None:
  43. """
  44. Add sensor to container.
  45. :param sensor: nbus sensor
  46. """
  47. sensor.set_parent_module_address(self.__module_addr)
  48. sensor.set_device_port(self.__port)
  49. self.__devices[sensor.address] = sensor
  50. def get_sensor(self, sensor_address: NBusSensorAddress) -> NBusSensor:
  51. """
  52. Get module sensor.
  53. :param sensor_address: address of sensor
  54. :return: sensor
  55. """
  56. return self.__devices[sensor_address]
  57. """
  58. ================================================================================================================
  59. Module Get Commands
  60. ================================================================================================================
  61. """
  62. def cmd_get_echo(self, message: bytearray) -> bool:
  63. """
  64. Get echo from module.
  65. :param message: message to send
  66. :return: status (True = echo, False = no echo)
  67. """
  68. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
  69. return response == list(message)
  70. def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
  71. """
  72. Get single module parameter.
  73. :param parameter: parameter id
  74. :return: parameter value
  75. """
  76. # get response
  77. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
  78. bytearray([parameter.value]))
  79. # parse parameter
  80. param_id, param_val = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  81. # store parameter value
  82. self.__params[param_id] = param_val
  83. return param_val
  84. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  85. """
  86. Get all module parameters.
  87. :return: dict of parameter id and parameter value
  88. """
  89. # get response
  90. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
  91. # parse parameters
  92. params = NbusCommonParser.parameters_from_response(resp_len, response)
  93. for param_id, param_val in params:
  94. # store parameters
  95. self.__params[param_id] = param_val
  96. return self.__params.copy()
  97. def cmd_get_sensor_cnt(self) -> NBusSensorCount:
  98. """
  99. Get sensor count.
  100. :return: count of read-only and read-write sensors
  101. """
  102. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
  103. return NBusSensorCount(*response)
  104. def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
  105. """
  106. Get data from all module sensors.
  107. :return: dict of device addresses and data values
  108. """
  109. # get data
  110. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
  111. # parse data
  112. begin_idx = 0
  113. data = {}
  114. while begin_idx < resp_length:
  115. device_id = response[begin_idx]
  116. # handle errors
  117. if self.__devices[device_id].data_format is None: # check for format and params
  118. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  119. values, offset = NbusCommonParser.data_from_response(self.__devices[device_id].data_format,
  120. response[begin_idx:])
  121. data[device_id] = values
  122. begin_idx += offset + 1
  123. return data
  124. def cmd_get_info(self) -> NBusInfo:
  125. """
  126. Get module info.
  127. :return: module info
  128. """
  129. response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
  130. name = str(response[1:9], "ascii")
  131. typ = str(response[9:12], "ascii")
  132. uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
  133. hw = str(response[16:19], "ascii")
  134. fw = str(response[19:22], "ascii")
  135. mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
  136. ro_count = int(response[30])
  137. rw_count = int(response[31])
  138. return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id,
  139. read_only_sensors=ro_count, read_write_sensors=rw_count)
  140. def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
  141. """
  142. Get format of all on-board sensors.
  143. :return: dict of sensor addresses and sensor formats
  144. """
  145. # get response
  146. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
  147. bytearray([]))
  148. begin_idx = 0
  149. formats = {}
  150. # parse format
  151. while begin_idx < resp_length:
  152. device_id = response[begin_idx]
  153. device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
  154. self.__devices[device_id].data_format = device_format
  155. formats[device_id] = device_format
  156. begin_idx += 4
  157. return formats
  158. def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
  159. """
  160. Get type of all on-board sensors.
  161. :return: dict of sensor addresses and sensor types
  162. """
  163. # get response
  164. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
  165. bytearray([]))
  166. # parse response
  167. types = {}
  168. i = 0
  169. while i < resp_length - 1:
  170. types[NBusSensorAddress(response[i])] = NBusSensorType(response[i + 1])
  171. i += 2
  172. return types
  173. """
  174. ================================================================================================================
  175. Module Set Commands
  176. ================================================================================================================
  177. """
  178. def cmd_set_module_stop(self) -> NBusStatusType:
  179. """
  180. Stop automatic measuring.
  181. :return: status
  182. """
  183. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
  184. return NBusStatusType(response[0])
  185. def cmd_set_module_start(self) -> NBusStatusType:
  186. """
  187. Start automatic measuring.
  188. :return: status
  189. """
  190. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
  191. return NBusStatusType(response[0])
  192. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
  193. """
  194. Set module parameter.
  195. :param param: parameter ID
  196. :param value: parameter value
  197. :return: status
  198. """
  199. # create request packet
  200. param_id_raw = struct.pack("B", param.value)
  201. param_val_raw = struct.pack("<I",value)
  202. param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
  203. # proceed request
  204. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM, param_bytes)
  205. # if response is valid, store parameter
  206. if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
  207. self.__params[param] = value
  208. return NBusStatusType(response[1])
  209. def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
  210. -> dict[NBusParameterID, NBusStatusType]:
  211. """
  212. Set multiple module parameters.
  213. :param params: parameters
  214. :return: dict od statuses
  215. """
  216. # create request packet
  217. param_bytes = NbusCommonParser.parameters_to_request(params)
  218. # send request
  219. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
  220. bytearray(param_bytes))
  221. # parse statuses
  222. statuses = {}
  223. for i in range(0, resp_length - 1, 2):
  224. p_id = NBusParameterID(response[i])
  225. if response[i+1] == NBusStatusType.STATUS_SUCCESS:
  226. self.__params[p_id] = params[p_id] # if success, store param
  227. statuses[p_id] = NBusStatusType(response[i + 1])
  228. return statuses
  229. def cmd_set_calibrate(self) -> NBusStatusType:
  230. """
  231. Send calibration command.
  232. :return: calibration status
  233. """
  234. resp_length, *response = self.__port.request_module(self.__module_addr,
  235. NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
  236. print(response)
  237. return NBusStatusType(response[0])
  238. def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
  239. -> dict[NBusSensorAddress, NBusStatusType]:
  240. """
  241. Set data to read-write sensors.
  242. :param data:
  243. :return: operation statuses
  244. """
  245. # create request packet
  246. request = []
  247. # transform data
  248. for addr in data.keys():
  249. # handle errors
  250. if self.__devices[addr].data_format is None: # check for format and params
  251. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  252. raw_data = data[addr]
  253. request.append(addr)
  254. request.extend(NbusCommonParser.data_to_request(self.__devices[addr].data_format, raw_data))
  255. # send request
  256. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
  257. bytearray(request))
  258. # return response statuses
  259. statuses = {}
  260. for i in range(0, resp_length - 1, 2):
  261. statuses[NBusSensorAddress(response[i])] = NBusStatusType(response[i + 1])
  262. return statuses