nbus_slave_device.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import struct
  2. from abc import abstractmethod, ABCMeta
  3. from typing import Tuple
  4. from nbus_hal.nbus_generic_port import NBusPort
  5. from nbus_types.nbus_command_type import NBusCommand, NBusInfoParam
  6. from nbus_types.nbus_data_fomat import *
  7. from nbus_types.nbus_parameter_type import *
  8. from nbus_types.nbus_address_type import NBusDeviceAddress, NBusModuleAddress
  9. from nbus_types.nbus_sensor_type import NBusSensorType
  10. from nbus_api.nbus_common_parser import NbusCommonParser
  11. @beartype
  12. class NBusSlaveDevice(metaclass=ABCMeta):
  def __init__(self, address: NBusDeviceAddress):
      """
      Create a slave device identified by the given device address.

      The parent module address, the communication port and the data format
      all start out as None; the parameter cache starts out empty.

      :param address: the address of this device
      """
      self.__address = address
      self.__parameters = {}
      # Not known at construction time; filled in later via
      # set_parent_module_address(), set_device_port() and the data_format setter.
      self.__module_address = self.__port = self.__data_format = None
  @property
  def address(self):
      """
      Address of this device, exactly as it was passed to the constructor.

      No setter is defined for this property in the visible part of the class.

      :return: the device address
      """
      return self.__address
  @property
  def data_format(self):
      """
      Data format currently stored for this device.

      The constructor initialises it to None; it changes only when the
      setter is used.

      :return: the stored data format, or None if none has been stored yet
      """
      return self.__data_format

  @data_format.setter
  def data_format(self, data_format: NBusDataFormat) -> None:
      """
      Store the data format for this device, replacing any previous one.

      :param data_format: the data format to store
      """
      self.__data_format = data_format
  @property
  def parameters(self):
      """
      Cache of parameter values held by this device object.

      The constructor creates it as an empty dict; cmd_get_param() and
      cmd_get_all_params() store converted values in it keyed by parameter ID.
      The internal object itself is returned, not a copy.

      :return: the parameter cache
      """
      return self.__parameters

  @parameters.setter
  def parameters(self, values):
      """
      Replace the whole parameter cache with the given object.

      No validation or copying is performed.

      :param values: the new parameter cache
      """
      self.__parameters = values
  def set_parent_module_address(self, address: NBusModuleAddress):
      """
      Remember the address of the module this device belongs to.

      The stored address is passed as the first argument to every
      request_device() call made by the cmd_* methods.

      :param address: the address of the parent module
      """
      self.__module_address = address
  def set_device_port(self, port: NBusPort):
      """
      Remember the port used to communicate with the device.

      The cmd_* methods call request_device() on this port, so it has to be
      set before any of them is used (it is None after construction).

      :param port: the port to send requests through
      """
      self.__port = port
  38. """
  39. ================================================================================================================
  40. Abstract Methods
  41. ================================================================================================================
  42. """
  @abstractmethod
  def data_parameters_loaded(self) -> bool:
      """
      Report whether every parameter required for data get/set conversion
      has already been loaded.

      Must be implemented by concrete device classes.

      :return: True if the device is ready for conversion, otherwise False
      """
  @abstractmethod
  def map_parameter_get(self, param_id: NBusParameterID, param_value: int) -> NBusParameterValue:
      """
      Translate a raw parameter value into its engineering range.

      cmd_get_param() and cmd_get_all_params() call this for every parameter
      parsed from a response. Must be implemented by concrete device classes.

      :param param_id: the ID of the parameter
      :param param_value: the value of the parameter in binary format
      :return: the parameter value converted to engineering units
      """
  @abstractmethod
  def map_parameter_set(self, param_id: NBusParameterID, param_value: NBusParameterValue) -> int:
      """
      Translate a parameter value from engineering units into its binary range.

      This is the opposite direction of map_parameter_get(). Must be
      implemented by concrete device classes.

      NOTE(review): the previous docstring named cmd_set_data() as the
      consumer; for a parameter conversion that looks like a copy-paste slip
      and the parameter-writing command is presumably meant - confirm.

      :param param_id: the ID of the parameter
      :param param_value: the value of the parameter in engineering units
      :return: the parameter value converted to binary format
      """
  @abstractmethod
  def map_data_get(self, values: list[int]) -> list[NBusDataValue]:
      """
      Translate raw data values, as obtained for cmd_get_data(), into their
      engineering range.

      Must be implemented by concrete device classes.

      :param values: the binary-format values to convert
      :return: the converted values in engineering units
      """
  @abstractmethod
  def map_data_set(self, values: list[NBusDataValue]) -> list[int]:
      """
      Translate data values from their engineering range into the binary
      range expected by cmd_set_data().

      Must be implemented by concrete device classes.

      :param values: the engineering-range values to convert
      :return: the converted values in binary format
      """
  85. """
  86. ================================================================================================================
  87. Device Get Commands
  88. ================================================================================================================
  89. """
  def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
      """
      Read one parameter from the device and cache its converted value.

      Sends CMD_GET_PARAM with the parameter's value as a one-byte payload,
      parses the response, converts the raw value with map_parameter_get()
      and stores the result in the parameter cache.

      :param parameter: the ID of the parameter to read
      :return: the parameter ID taken from the response and its converted value
      """
      # First element of the reply is the response length, the rest is the body.
      resp_len, *response = self.__port.request_device(
          self.__module_address,
          self.__address,
          NBusCommand.CMD_GET_PARAM,
          bytearray([parameter.value]),
      )
      parsed = NbusCommonParser.parameters_from_response(resp_len, response)
      # Only the first parsed (id, raw value) pair is used.
      param_id, raw_value = parsed[0]
      converted = self.map_parameter_get(param_id, raw_value)
      self.__parameters[param_id] = converted
      return param_id, converted
  def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
      """
      Read all parameters from the device and cache their converted values.

      Sends CMD_GET_PARAM with an empty payload, then converts every parsed
      (id, raw value) pair with map_parameter_get() and stores the result in
      the parameter cache.

      :return: a list of (parameter ID, converted value) tuples, in the order
               the parser returned them
      """
      resp_len, *response = self.__port.request_device(
          self.__module_address,
          self.__address,
          NBusCommand.CMD_GET_PARAM,
          bytearray(),
      )
      converted = []
      for param_id, raw_value in NbusCommonParser.parameters_from_response(resp_len, response):
          value = self.map_parameter_get(param_id, raw_value)
          # Cache entry by entry, so earlier entries stay cached if a later conversion fails.
          self.__parameters[param_id] = value
          converted.append((param_id, value))
      return converted
  def cmd_get_data(self):
      """
      Request the current data from the device.

      Sends CMD_GET_DATA with an empty payload and hands the response body,
      together with this device object, to NbusCommonParser.data_from_response().

      :return: the first element of the pair returned by the parser
      """
      # The leading element of the reply is discarded; only the body is parsed.
      _, *body = self.__port.request_device(
          self.__module_address,
          self.__address,
          NBusCommand.CMD_GET_DATA,
          bytearray(),
      )
      parsed = NbusCommonParser.data_from_response(self, body)
      # The parser yields exactly two items; the second one is not used here.
      values, _ = parsed
      return values
  def cmd_get_sensor_type(self):
      """
      Ask the device for its sensor type.

      Sends CMD_GET_SENSOR_TYPE with an empty payload and builds an
      NBusSensorType from the first element of the response body.

      :return: the sensor type reported by the device
      """
      # The leading element of the reply is discarded; only the body is used.
      _, *body = self.__port.request_device(
          self.__module_address,
          self.__address,
          NBusCommand.CMD_GET_SENSOR_TYPE,
          bytearray(),
      )
      type_code = body[0]
      return NBusSensorType(type_code)
  116. def cmd_get_info(self, parameter: NBusInfoParam):
  117. _, *response = self.__port.request_device(self.__module_address, self.__address, NBusCommand.CMD_GET_INFO,
  118. bytearray([parameter.value]))
  119. if parameter == NBusInfoParam.INFO_FORMAT:
  120. self.data_format = NbusCommonParser.format_from_response(response)
  121. return self.data_format