nbus_slave_device.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import struct
  2. from abc import abstractmethod, ABCMeta
  3. from typing import Tuple
  4. from nbus_hal.nbus_generic_port import NBusPort
  5. from nbus_types.nbus_command_type import NBusCommand
  6. from nbus_types.nbus_data_fomat import *
  7. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  8. from nbus_types.nbus_parameter_type import *
  9. from nbus_types.nbus_address_type import NBusDeviceAddress, NBusModuleAddress
  10. from nbus_types.nbus_sensor_type import NBusSensorType
  11. from nbus_api.nbus_common_parser import NbusCommonParser
  12. @beartype
  13. class NBusSlaveDevice(metaclass=ABCMeta):
  14. def __init__(self, address: NBusDeviceAddress):
  15. self.__address = address
  16. self.__module_address = None
  17. self.__port = None
  18. self.__data_format = None
  19. self.__parameters = {}
  20. @property
  21. def address(self):
  22. return self.__address
  23. @property
  24. def data_format(self):
  25. return self.__data_format
  26. @data_format.setter
  27. def data_format(self, data_format: NBusDataFormat) -> None:
  28. self.__data_format = data_format
  29. @property
  30. def parameters(self):
  31. return self.__parameters
  32. @parameters.setter
  33. def parameters(self, values):
  34. self.__parameters = values
  35. def set_parent_module_address(self, address: NBusModuleAddress):
  36. self.__module_address = address
  37. def set_device_port(self, port: NBusPort):
  38. self.__port = port
  39. """
  40. ================================================================================================================
  41. Abstract Methods
  42. ================================================================================================================
  43. """
  44. @abstractmethod
  45. def data_parameters_loaded(self) -> bool:
  46. """
  47. Verify that all necessary parameters are loaded
  48. before performing data get/set conversion.
  49. :return: true if ready for conversion, otherwise False
  50. """
  51. pass
  52. @abstractmethod
  53. def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue:
  54. """
  55. Convert a parameter from cmd_get_param() to its engineering range.
  56. :param param_id: the ID of the parameter
  57. :param param_value: the value of the parameter in binary format
  58. :return: the converted parameter value in engineering units
  59. """
  60. pass
  61. @abstractmethod
  62. def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int:
  63. """
  64. Convert a parameter to its binary range for cmd_set_data().
  65. :param param_id: the ID of the parameter
  66. :param param_value: the value of the parameter in engineering units
  67. :return: the converted parameter value in binary format
  68. """
  69. pass
  70. @abstractmethod
  71. def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
  72. """
  73. Convert data from cmd_get_data() to its engineering range.
  74. :param values: a list of values in binary format to be converted
  75. :return: a list of converted values in engineering units
  76. """
  77. pass
  78. @abstractmethod
  79. def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
  80. """
  81. Convert data to its binary range for cmd_set_data().
  82. :param values: a list of values in engineering range to be converted
  83. :return: a list of converted values in binary format
  84. """
  85. pass
  86. """
  87. ================================================================================================================
  88. Device Get Commands
  89. ================================================================================================================
  90. """
  91. def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
  92. # get response
  93. resp_len, *response = self.__port.request_device(self.__module_address, self.__address,
  94. NBusCommand.CMD_GET_PARAM, bytearray([parameter.value]))
  95. param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  96. param_val = self.map_parameter_get(param_id, param_val_raw)
  97. self.__parameters[param_id] = param_val
  98. return param_id, param_val
  99. def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
  100. resp_len, *response = self.__port.request_device(self.__module_address, self.__address,
  101. NBusCommand.CMD_GET_PARAM, bytearray([]))
  102. params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
  103. params = []
  104. for param_id, param_val_raw in params_raw:
  105. param_val = self.map_parameter_get(param_id, param_val_raw)
  106. params.append((param_id, param_val))
  107. self.__parameters[param_id] = param_val
  108. return params
  109. def cmd_get_data(self):
  110. _, *resp = self.__port.request_device(self.__module_address, self.__address, NBusCommand.CMD_GET_DATA, bytearray([]))
  111. values, _ = NbusCommonParser.data_from_response(self, resp)
  112. return values
  113. def cmd_get_sensor_type(self):
  114. _, *response = self.__port.request_device(self.__module_address, self.__address, NBusCommand.CMD_GET_SENSOR_TYPE,
  115. bytearray([]))
  116. return NBusSensorType(response[0])
  117. def cmd_get_format(self):
  118. _, *response = self.__port.request_device(self.__module_address, self.__address, NBusCommand.CMD_GET_FORMAT,
  119. bytearray([]))
  120. self.data_format = NbusCommonParser.format_from_response(response)
  121. return self.data_format
  122. """
  123. ================================================================================================================
  124. Device Set Commands
  125. ================================================================================================================
  126. """
  127. def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> None:
  128. raw_value = self.map_parameter_set(param, value) # get raw value
  129. value_bytes = struct.pack('<I', raw_value & 0xFFFFFFFF) # create bytes
  130. self.__port.request_device(self.__module_address, self.__address, # proceed request
  131. NBusCommand.CMD_SET_PARAM, bytearray([param.value, *value_bytes]))
  132. self.__parameters[param] = value # store new value of parameter
  133. def cmd_set_data(self, data: list[NBusDataValue]):
  134. if self.__data_format is None: # check for format and params
  135. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
  136. if not self.data_parameters_loaded():
  137. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
  138. binary_data = self.map_data_set(data)
  139. _, *resp = self.__port.request_device(self.__module_address, self.__address, NBusCommand.CMD_SET_DATA, bytearray(binary_data))
  140. return resp