nbus_slave_module.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import struct
  2. from abc import abstractmethod
  3. from typing import Tuple, Annotated
  4. from beartype.vale import Is
  5. from nbus_api.nbus_slave_device import NBusSlaveDevice
  6. from nbus_api.nbus_common_parser import NbusCommonParser
  7. from nbus_hal.nbus_serial.serial_port import *
  8. from nbus_types.nbus_address_type import NBusModuleAddress
  9. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  10. from nbus_types.nbus_status_type import NBusStatusType
  11. from nbus_types.nbus_sensor_count_type import NBusSensorCount
  12. from nbus_types.nbus_info_type import NBusInfo
  13. from nbus_types.nbus_sensor_type import NBusSensorType
  14. @beartype
  15. class NBusSlaveModule:
  16. def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
  17. self.__port = serial_port
  18. self.__module_addr = module_address
  19. self.__params = {}
  20. self.__devices = {}
  21. self._map_param_get = lambda t, v: v
  22. self._map_param_set = lambda t, v: v
  23. def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
  24. map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]):
  25. self._map_param_get = map_param_get_cb
  26. self._map_param_set = map_param_set_cb
  27. def add_device(self, device: NBusSlaveDevice) -> None:
  28. device.set_parent_module_address(self.__module_addr)
  29. device.set_device_port(self.__port)
  30. self.__devices[device.address] = device
  31. def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice:
  32. return self.__devices[device_address]
  33. """
  34. ================================================================================================================
  35. Module Get Commands
  36. ================================================================================================================
  37. """
  38. def cmd_get_echo(self, message: bytearray) -> bool:
  39. """
  40. Send Echo Command.
  41. :param message: message to send
  42. :return: status (True = echo, False = no echo)
  43. """
  44. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
  45. return response == list(message)
  46. def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
  47. # get response
  48. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
  49. bytearray([parameter.value]))
  50. param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
  51. param_val = self._map_param_get(param_id, param_val_raw)
  52. self.__params[param_id] = param_val
  53. return param_id, param_val
  54. def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
  55. resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
  56. params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
  57. for param_id, param_val_raw in params_raw:
  58. param_val = self._map_param_get(param_id, param_val_raw)
  59. self.__params[param_id] = param_val
  60. return self.__params.copy()
  61. def cmd_get_sensor_cnt(self) -> NBusSensorCount:
  62. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
  63. return NBusSensorCount(*response)
  64. def cmd_get_data(self):
  65. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
  66. begin_idx = 0
  67. data = {}
  68. while begin_idx < resp_length:
  69. device_id = response[begin_idx]
  70. values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:])
  71. data[device_id] = values
  72. begin_idx += offset + 1
  73. return data
  74. def cmd_get_info(self):
  75. response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
  76. name = str(response[1:9], "ascii")
  77. typ = str(response[9:12], "ascii")
  78. uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
  79. hw = str(response[16:19], "ascii")
  80. fw = str(response[19:22], "ascii")
  81. mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
  82. return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id)
  83. def cmd_get_format(self):
  84. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
  85. bytearray([]))
  86. begin_idx = 0
  87. formats = {}
  88. while begin_idx < resp_length:
  89. device_id = response[begin_idx]
  90. device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
  91. self.__devices[device_id].data_format = device_format
  92. formats[device_id] = device_format
  93. begin_idx += 4
  94. return formats
  95. def cmd_get_sensor_type(self):
  96. resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
  97. bytearray([]))
  98. types = {}
  99. i = 0
  100. while i < resp_length - 1:
  101. types[NBusDeviceAddress(response[i])] = NBusSensorType(response[i+1])
  102. i += 2
  103. return types
  104. """
  105. ================================================================================================================
  106. Module Set Commands
  107. ================================================================================================================
  108. """
  109. def cmd_set_find(self, enable: bool) -> NBusStatusType:
  110. """
  111. Send Find Command.
  112. :param enable: to start (True) / stop (False) finding
  113. :return: status
  114. """
  115. _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_FIND, bytearray([enable]))
  116. return NBusStatusType(response[0])
  117. def cmd_set_module_stop(self):
  118. self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
  119. def cmd_set_module_start(self):
  120. self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))