nbus_sensor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. import struct
  2. from abc import abstractmethod, ABCMeta
  3. from nbus_hal.nbus_generic_port import NBusPort
  4. from nbus_types.nbus_command_type import NBusCommand
  5. from nbus_types.nbus_data_fomat import *
  6. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  7. from nbus_types.nbus_parameter_type import *
  8. from nbus_types.nbus_address_type import NBusSensorAddress, NBusModuleAddress
  9. from nbus_types.nbus_sensor_type import NBusSensorType
  10. from nbus_api.nbus_common_parser import NbusCommonParser
  11. from nbus_types.nbus_status_type import NBusStatusType
  12. @beartype
  13. class NBusSensor(metaclass=ABCMeta):
  14. """
  15. Class representing nBus sensor type.
  16. """
  17. def __init__(self, address: NBusSensorAddress):
  18. """
  19. Constructor.
  20. :param address: device address
  21. """
  22. self.__address = address
  23. self.__module_address = None
  24. self.__port = None
  25. self.__data_format = None
  26. self.__params = {}
  27. """
  28. ================================================================================================================
  29. Public Fields
  30. ================================================================================================================
  31. """
  32. @property
  33. def address(self):
  34. """
  35. Get sensor address.
  36. :return: sensor address
  37. """
  38. return self.__address
  39. @property
  40. def data_format(self):
  41. """
  42. Get data format.
  43. :return: data format
  44. """
  45. return self.__data_format
  46. @data_format.setter
  47. def data_format(self, data_format: NBusDataFormat):
  48. """
  49. Set data format.
  50. :param data_format: format of data
  51. """
  52. self.__data_format = data_format
  53. @property
  54. def parameters(self) -> dict[NBusParameterID, NBusParameterValue]:
  55. """
  56. Get parameters.
  57. :return: parameters
  58. """
  59. return self.__params
  60. @parameters.setter
  61. def parameters(self, values: dict[NBusParameterID, NBusParameterValue]):
  62. """
  63. Set parameters.
  64. :param values: values to set
  65. """
  66. self.__params = values
  67. """
  68. ================================================================================================================
  69. Module-only Methods
  70. ================================================================================================================
  71. """
  72. def set_parent_module_address(self, address: NBusModuleAddress) -> None:
  73. """
  74. Set address of parent module.
  75. :param address: module address
  76. """
  77. self.__module_address = address
  78. def set_device_port(self, port: NBusPort) -> None:
  79. """
  80. Set device communication port.
  81. :param port: communicaiton port
  82. """
  83. self.__port = port
  84. """
  85. ================================================================================================================
  86. Abstract Methods
  87. ================================================================================================================
  88. """
  89. @abstractmethod
  90. def data_parameters_loaded(self) -> bool:
  91. """
  92. Verify that all necessary parameters are loaded
  93. before performing data get/set conversion.
  94. :return: true if ready for conversion, otherwise False
  95. """
  96. pass
  97. @abstractmethod
  98. def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue:
  99. """
  100. Convert a parameter from cmd_get_param() to its engineering range.
  101. :param param_id: the ID of the parameter
  102. :param param_value: the value of the parameter in binary format
  103. :return: the converted parameter value in engineering units
  104. """
  105. pass
  106. @abstractmethod
  107. def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int:
  108. """
  109. Convert a parameter to its binary range for cmd_set_data().
  110. :param param_id: the ID of the parameter
  111. :param param_value: the value of the parameter in engineering units
  112. :return: the converted parameter value in binary format
  113. """
  114. pass
  115. @abstractmethod
  116. def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
  117. """
  118. Convert data from cmd_get_data() to its engineering range.
  119. :param values: a list of values in binary format to be converted
  120. :return: a list of converted values in engineering units
  121. """
  122. pass
  123. @abstractmethod
  124. def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
  125. """
  126. Convert data to its binary range for cmd_set_data().
  127. :param values: a list of values in engineering range to be converted
  128. :return: a list of converted values in binary format
  129. """
  130. pass
  131. """
  132. ================================================================================================================
  133. Device Get Commands
  134. ================================================================================================================
  135. """
  136. def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
  137. """
  138. Get single sensor parameter.
  139. :param parameter: parameter id
  140. :return: parameter value
  141. """
  142. # get response
  143. resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
  144. NBusCommand.CMD_GET_PARAM, bytearray([parameter.value]))
  145. # parse parameter
  146. param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  147. param_val = self.map_parameter_get(param_id, param_val_raw)
  148. # store parameter value
  149. self.__params[param_id] = param_val
  150. return param_val
  151. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  152. """
  153. Get all sensor parameters.
  154. :return: dict of parameter id and parameter value
  155. """
  156. # get response
  157. resp_len, *response = self.__port.request_sensor(self.__module_address, self.__address,
  158. NBusCommand.CMD_GET_PARAM, bytearray([]))
  159. # parse parameters
  160. params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
  161. for param_id, param_val_raw in params_raw:
  162. param_val = self.map_parameter_get(param_id, param_val_raw)
  163. # store parameters
  164. self.__params[param_id] = param_val
  165. return self.__params.copy()
  166. def cmd_get_data(self) -> list[NBusDataValue]:
  167. """
  168. Get data from sensor.
  169. :raises
  170. :return: dict of device addresses and data values
  171. """
  172. if self.__data_format is None: # check for format and params
  173. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  174. if not self.data_parameters_loaded():
  175. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
  176. _, *resp = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_DATA,
  177. bytearray([]))
  178. values, _ = NbusCommonParser.data_from_response(self.data_format, resp)
  179. return self.map_data_get(values)
  180. def cmd_get_sensor_type(self) -> NBusSensorType:
  181. """
  182. Get sensor type.
  183. :return: sensor type
  184. """
  185. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  186. NBusCommand.CMD_GET_SENSOR_TYPE, bytearray([]))
  187. return NBusSensorType(response[0])
  188. def cmd_get_format(self):
  189. """
  190. Get sensor format.
  191. :return: sensor format
  192. """
  193. _, *response = self.__port.request_sensor(self.__module_address, self.__address, NBusCommand.CMD_GET_FORMAT,
  194. bytearray([]))
  195. self.data_format = NbusCommonParser.format_from_response(response)
  196. return self.data_format
  197. """
  198. ================================================================================================================
  199. Device Set Commands
  200. ================================================================================================================
  201. """
  202. def cmd_set_find(self, enable: bool) -> NBusStatusType:
  203. """
  204. Turn on/off physical indicator of device.
  205. :param enable: to start (True) / stop (False) finding
  206. :return: status
  207. """
  208. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  209. NBusCommand.CMD_SET_FIND, bytearray([enable]))
  210. return NBusStatusType(response[0])
  211. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
  212. """
  213. Set sensor parameter.
  214. :param param: parameter ID
  215. :param value: parameter value
  216. :return: status
  217. """
  218. # create request packet
  219. param_id_raw = struct.pack("B", param.value)
  220. param_val_raw = struct.pack("<I", self.map_parameter_set(param, value))
  221. param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
  222. # proceed request
  223. _, *response = self.__port.request_sensor(self.__module_address, self.__address,
  224. NBusCommand.CMD_SET_PARAM, param_bytes)
  225. # if response is valid, store parameter
  226. if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
  227. self.__params[param] = value
  228. return NBusStatusType(response[1])
  229. def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
  230. -> dict[NBusParameterID, NBusStatusType]:
  231. """
  232. Set multiple sensor parameters.
  233. :param params: parameters
  234. :return: dict od statuses
  235. """
  236. # scale parameters
  237. for p_id in params.keys():
  238. params[p_id] = self.map_parameter_set(p_id, params[p_id])
  239. # create request packet
  240. param_bytes = NbusCommonParser.parameters_to_request(params)
  241. # send request
  242. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  243. NBusCommand.CMD_SET_PARAM, bytearray(param_bytes))
  244. # parse statuses
  245. statuses = {}
  246. for i in range(0, resp_length - 1, 2):
  247. p_id = NBusParameterID(response[i])
  248. if response[i + 1] == NBusStatusType.STATUS_SUCCESS:
  249. self.__params[p_id] = params[p_id] # if success, store param
  250. statuses[p_id] = NBusStatusType(response[i + 1])
  251. return statuses
  252. def cmd_set_calibrate(self) -> NBusStatusType:
  253. """
  254. Send calibration command.
  255. :return: calibration status
  256. """
  257. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  258. NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
  259. return NBusStatusType(response[0])
  260. def cmd_set_data(self, data: list[NBusDataValue]) -> NBusStatusType:
  261. """
  262. Set data to read-write sensor.
  263. :param data: data to set
  264. :raises: NBusErrorAPIType
  265. :return: operation status
  266. """
  267. if self.__data_format is None: # check for format and params
  268. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  269. if not self.data_parameters_loaded():
  270. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
  271. # create request packet
  272. request = []
  273. # transform data
  274. raw_data = self.map_data_set(data)
  275. request.append(self.__address)
  276. request.extend(NbusCommonParser.data_to_request(self.data_format, raw_data))
  277. # send request
  278. resp_length, *response = self.__port.request_sensor(self.__module_address, self.__address,
  279. NBusCommand.CMD_SET_DATA, bytearray(request))
  280. # return response status
  281. return NBusStatusType(response[1])