nbus_sensor.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. import struct
  2. from nbus_hal.nbus_generic_port import NBusPort
  3. from nbus_types.nbus_command_type import NBusCommand
  4. from nbus_types.nbus_data_fomat import *
  5. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  6. from nbus_types.nbus_parameter_type import *
  7. from nbus_types.nbus_address_type import NBusSensorAddress, NBusModuleAddress
  8. from nbus_types.nbus_sensor_type import NBusSensorType
  9. from nbus_api.nbus_common_parser import NbusCommonParser
  10. from nbus_types.nbus_status_type import NBusStatusType
  11. @beartype
  12. class NBusSensor:
  13. """
  14. Class representing nBus sensor type.
  15. """
  16. def __init__(self, port: NBusPort, module_address: NBusModuleAddress, address: NBusSensorAddress):
  17. """
  18. Constructor.
  19. :param port: NBusPort object
  20. :param module_address: address of NBusModule
  21. :param address: device address
  22. """
  23. self.__address = address
  24. self.__type = None
  25. self.__module_address = module_address
  26. self.__port = port
  27. self.__data_format = None
  28. self.__params = {}
  29. """
  30. ================================================================================================================
  31. Public Fields
  32. ================================================================================================================
  33. """
  34. @property
  35. def address(self):
  36. """
  37. Get sensor address.
  38. :return: sensor address
  39. """
  40. return self.__address
  41. @property
  42. def data_format(self):
  43. """
  44. Get data format.
  45. :return: data format
  46. """
  47. return self.__data_format
  48. @data_format.setter
  49. def data_format(self, data_format: NBusDataFormat):
  50. """
  51. Set data format.
  52. :param data_format: format of data
  53. """
  54. self.__data_format = data_format
  55. @property
  56. def parameters(self) -> dict[NBusParameterID, NBusParameterValue]:
  57. """
  58. Get parameters.
  59. :return: parameters
  60. """
  61. return self.__params
  62. @parameters.setter
  63. def parameters(self, values: dict[NBusParameterID, NBusParameterValue]):
  64. """
  65. Set parameters.
  66. :param values: values to set
  67. """
  68. self.__params = values
  69. """
  70. ================================================================================================================
  71. Device Get Commands
  72. ================================================================================================================
  73. """
  74. def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
  75. """
  76. Get single sensor parameter.
  77. :param parameter: parameter id
  78. :return: parameter value
  79. """
  80. # get response
  81. resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
  82. NBusCommand.CMD_GET_PARAM, bytearray([parameter.value]))
  83. # parse parameter
  84. param_id, param_val = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  85. # store parameter value
  86. self.__params[param_id] = param_val
  87. return param_val
  88. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  89. """
  90. Get all sensor parameters.
  91. :return: dict of parameter id and parameter value
  92. """
  93. # get response
  94. resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
  95. NBusCommand.CMD_GET_PARAM, bytearray([]))
  96. # parse parameters
  97. params = NbusCommonParser.parameters_from_response(resp_len, response)
  98. for param_id, param_val in params:
  99. # store parameters
  100. self.__params[param_id] = param_val
  101. return self.__params.copy()
  102. def cmd_get_data(self) -> list[NBusDataValue]:
  103. """
  104. Get data from sensor.
  105. :raises
  106. :return: dict of device addresses and data values
  107. """
  108. if self.__data_format is None: # check for format and params
  109. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  110. _, *resp = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_DATA,
  111. bytearray([]))
  112. values, _ = NbusCommonParser.data_from_response(self.data_format, resp)
  113. return values
  114. def cmd_get_sensor_type(self) -> NBusSensorType:
  115. """
  116. Get sensor type.
  117. :return: sensor type
  118. """
  119. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  120. NBusCommand.CMD_GET_SENSOR_TYPE, bytearray([]))
  121. self.__type = response[0]
  122. return NBusSensorType(self.__type)
  123. def cmd_get_format(self):
  124. """
  125. Get sensor format.
  126. :return: sensor format
  127. """
  128. _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_FORMAT,
  129. bytearray([]))
  130. self.data_format = NbusCommonParser.format_from_response(response)
  131. return self.data_format
  132. """
  133. ================================================================================================================
  134. Device Set Commands
  135. ================================================================================================================
  136. """
  137. def cmd_set_find(self, enable: bool) -> NBusStatusType:
  138. """
  139. Turn on/off physical indicator of device.
  140. :param enable: to start (True) / stop (False) finding
  141. :return: status
  142. """
  143. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  144. NBusCommand.CMD_SET_FIND, bytearray([enable]))
  145. return NBusStatusType(response[0])
  146. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
  147. """
  148. Set sensor parameter.
  149. :param param: parameter ID
  150. :param value: parameter value
  151. :return: status
  152. """
  153. # create request packet
  154. param_id_raw = struct.pack("B", param.value)
  155. param_val_raw = struct.pack("<I", value)
  156. param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
  157. # proceed request
  158. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  159. NBusCommand.CMD_SET_PARAM, param_bytes)
  160. # if response is valid, store parameter
  161. if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
  162. self.__params[param] = value
  163. return NBusStatusType(response[1])
  164. def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
  165. -> dict[NBusParameterID, NBusStatusType]:
  166. """
  167. Set multiple sensor parameters.
  168. :param params: parameters
  169. :return: dict od statuses
  170. """
  171. # create request packet
  172. param_bytes = NbusCommonParser.parameters_to_request(params)
  173. # send request
  174. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  175. NBusCommand.CMD_SET_PARAM, bytearray(param_bytes))
  176. # parse statuses
  177. statuses = {}
  178. for i in range(0, resp_length - 1, 2):
  179. p_id = NBusParameterID(response[i])
  180. if response[i + 1] == NBusStatusType.STATUS_SUCCESS:
  181. self.__params[p_id] = params[p_id] # if success, store param
  182. statuses[p_id] = NBusStatusType(response[i + 1])
  183. return statuses
  184. def cmd_set_calibrate(self) -> NBusStatusType:
  185. """
  186. Send calibration command.
  187. :return: calibration status
  188. """
  189. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  190. NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
  191. return NBusStatusType(response[0])
  192. def cmd_set_data(self, data: list[NBusDataValue]) -> NBusStatusType:
  193. """
  194. Set data to read-write sensor.
  195. :param data: data to set
  196. :raises: NBusErrorAPIType
  197. :return: operation status
  198. """
  199. if self.__data_format is None: # check for format and params
  200. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  201. # create request packet
  202. request = [self.__address]
  203. # transform data
  204. request.extend(NbusCommonParser.data_to_request(self.data_format, data))
  205. # send request
  206. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  207. NBusCommand.CMD_SET_DATA, bytearray(request))
  208. # return response status
  209. return NBusStatusType(response[1])