nbus_slave_module.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import struct
  2. from abc import abstractmethod
  3. from typing import Tuple, Annotated
  4. from beartype.vale import Is
  5. from nbus_api.nbus_slave_device import NBusSlaveDevice
  6. from nbus_api.nbus_common_parser import NbusCommonParser
  7. from nbus_hal.nbus_serial.serial_port import *
  8. from nbus_types.nbus_address_type import NBusModuleAddress
  9. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  10. @beartype
  11. class NBusSlaveModule:
  def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
      """
      Create a slave module bound to a serial port and a module address.

      :param serial_port: port through which the module commands of this class send their requests
      :param module_address: address of this module, passed along with every module request
      """
      # Identity mappers: parameter values pass through unchanged until
      # set_module_parameter_mappers() installs custom callbacks.
      self._map_param_get = lambda _param_id, value: value
      self._map_param_set = lambda _param_id, value: value

      self.__port, self.__module_addr = serial_port, module_address

      # Last mapped parameter values read from the module, keyed by parameter id.
      self.__params = {}
      # Devices registered through add_device(), keyed by device address.
      self.__devices = {}
  def set_module_parameter_mappers(
          self,
          map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
          map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]):
      """
      Install the callbacks used to convert module parameter values.

      :param map_param_get_cb: called as (parameter id, value parsed from the response) by cmd_get_param()
          and cmd_get_all_params(); its result is what those commands cache and return
      :param map_param_set_cb: stored as the set-direction mapper
      """
      self._map_param_get, self._map_param_set = map_param_get_cb, map_param_set_cb
  def add_device(self, device: NBusSlaveDevice) -> None:
      """
      Attach a device to this module and register it under its own address.

      The device is first given this module's address and serial port, then stored so that
      get_device() can find it by device.address.

      :param device: device to attach
      """
      parent_address = self.__module_addr
      shared_port = self.__port

      device.set_parent_module_address(parent_address)
      device.set_device_port(shared_port)

      # A device registered later under the same address replaces the earlier one.
      self.__devices[device.address] = device
  def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice:
      """
      Look up a device previously registered with add_device().

      :param device_address: address the device was registered under
      :return: the registered device
      :raises KeyError: if no device is registered under device_address
      """
      registered = self.__devices
      return registered[device_address]
  29. """
  30. ================================================================================================================
  31. Module Get Commands
  32. ================================================================================================================
  33. """
  def cmd_get_echo(self, message: bytearray) -> bool:
      """
      Send the echo command and check that the reply repeats the payload.

      :param message: payload to send
      :return: True when everything after the first response element equals message, False otherwise
      """
      # The first response element is dropped; the rest is compared with the sent bytes.
      _length, *echoed = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
      expected = list(message)
      return echoed == expected
  def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
      """
      Read a single module parameter and cache its mapped value.

      :param parameter: identifier of the parameter to read; its .value is sent as the request payload
      :return: (parameter id, mapped value) of the first parameter parsed from the response
      """
      request_payload = bytearray([parameter.value])
      length, *payload = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                    request_payload)

      # Only the first parsed entry is used.
      parsed = NbusCommonParser.parameters_from_response(length, payload)
      param_id, raw_value = parsed[0]

      mapped_value = self._map_param_get(param_id, raw_value)
      self.__params[param_id] = mapped_value
      return param_id, mapped_value
  def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
      """
      Read the module parameters with an empty request payload and cache every mapped value.

      :return: list of (parameter id, mapped value) tuples in the order the parser produced them
      """
      length, *payload = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))

      mapped_params = []
      for param_id, raw_value in NbusCommonParser.parameters_from_response(length, payload):
          mapped_value = self._map_param_get(param_id, raw_value)
          # Cache and result are filled entry by entry, in response order.
          self.__params[param_id] = mapped_value
          mapped_params.append((param_id, mapped_value))
      return mapped_params
  def cmd_get_sensor_cnt(self) -> Annotated[int, Is[lambda value: 0 < value < 64]]:
      """
      Query the sensor count from the module.

      :return: the first element after the leading one in the response; the return annotation
          restricts it to an int with 0 < value < 64
      """
      no_payload = bytearray([])
      _length, *payload = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT,
                                                     no_payload)
      sensor_count = payload[0]
      return sensor_count
  def cmd_get_data(self):
      """
      Read data from the module and decode it device by device.

      :return: list of (device id, values) tuples in response order
      :raises KeyError: if the response names a device id that was never registered with add_device()
      """
      total_len, *payload = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))

      readings = []
      cursor = 0
      while cursor < total_len:
          # Each record starts with the id of the device it belongs to.
          device_id = payload[cursor]
          device = self.__devices[device_id]
          values, consumed = NbusCommonParser.data_from_response(device, payload[cursor:])
          readings.append((device_id, values))
          # Step over the record: the offset reported by the parser plus one.
          cursor += consumed + 1
      return readings
  def cmd_get_info(self, parameter: NBusInfoParam):
      """
      Request an information item from the module.

      Only NBusInfoParam.INFO_FORMAT is decoded here: the response is read in 4-element records,
      each starting with a device id, and the parsed format is stored on the matching registered
      device as data_format.

      :param parameter: information item to request; its .value is sent as the request payload
      :return: for INFO_FORMAT, list of (device id, format) tuples in response order
      :raises KeyError: if an INFO_FORMAT record names a device id that was never registered
      """
      resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO,
                                                          bytearray([parameter.value]))
      if parameter == NBusInfoParam.INFO_FORMAT:
          begin_idx = 0
          formats = []
          while begin_idx < resp_length:
              # First element of the record selects the device the format belongs to.
              device_id = response[begin_idx]
              device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
              self.__devices[device_id].data_format = device_format
              formats.append((device_id, device_format))
              # Records are consumed in fixed steps of 4 elements.
              begin_idx += 4
          # NOTE(review): indentation was lost in this view; this return is assumed to belong to the
          # INFO_FORMAT branch, so any other parameter falls through and returns None - confirm.
          return formats
  def cmd_get_sensor_type(self):
      """
      Placeholder for the sensor type query: sends nothing and returns None.
      """
      pass
  87. """
  88. ================================================================================================================
  89. Module Set Commands
  90. ================================================================================================================
  91. """
  def cmd_set_module_stop(self):
      """
      Send the stop command to the module.

      The response is not inspected and nothing is returned.
      """
      # Sent through the serial port like every other module command in this class. The previous
      # call target, self._send_request_module, is not defined in the visible part of the class
      # and would raise AttributeError.
      self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
  def cmd_set_module_start(self):
      """
      Send the start command to the module.

      The response is not inspected and nothing is returned.
      """
      no_payload = bytearray([])
      self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, no_payload)