|
|
@@ -9,6 +9,10 @@ from nbus_api.nbus_common_parser import NbusCommonParser
|
|
|
from nbus_hal.nbus_serial.serial_port import *
|
|
|
from nbus_types.nbus_address_type import NBusModuleAddress
|
|
|
from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
|
|
|
+from nbus_types.nbus_status_type import NBusStatusType
|
|
|
+from nbus_types.nbus_sensor_count_type import NBusSensorCount
|
|
|
+from nbus_types.nbus_info_type import NBusInfo
|
|
|
+from nbus_types.nbus_sensor_type import NBusSensorType
|
|
|
|
|
|
|
|
|
@beartype
|
|
|
@@ -42,7 +46,6 @@ class NBusSlaveModule:
|
|
|
Module Get Commands
|
|
|
================================================================================================================
|
|
|
"""
|
|
|
-
|
|
|
def cmd_get_echo(self, message: bytearray) -> bool:
|
|
|
"""
|
|
|
Send Echo Command.
|
|
|
@@ -64,61 +67,76 @@ class NBusSlaveModule:
|
|
|
|
|
|
return param_id, param_val
|
|
|
|
|
|
- def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
|
|
|
+ def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
|
|
|
resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
|
|
|
params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
|
|
|
|
|
|
- params = []
|
|
|
-
|
|
|
for param_id, param_val_raw in params_raw:
|
|
|
param_val = self._map_param_get(param_id, param_val_raw)
|
|
|
|
|
|
- params.append((param_id, param_val))
|
|
|
self.__params[param_id] = param_val
|
|
|
|
|
|
- return params
|
|
|
+ return self.__params.copy()
|
|
|
|
|
|
- def cmd_get_sensor_cnt(self) -> Annotated[int, Is[lambda value: 0 < value < 64]]:
|
|
|
+ def cmd_get_sensor_cnt(self) -> NBusSensorCount:
|
|
|
_, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
|
|
|
- return response[0]
|
|
|
+ return NBusSensorCount(*response)
|
|
|
|
|
|
def cmd_get_data(self):
|
|
|
resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
|
|
|
begin_idx = 0
|
|
|
- data = []
|
|
|
+ data = {}
|
|
|
|
|
|
while begin_idx < resp_length:
|
|
|
device_id = response[begin_idx]
|
|
|
|
|
|
values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:])
|
|
|
- data.append((device_id, values))
|
|
|
+ data[device_id] = values
|
|
|
begin_idx += offset + 1
|
|
|
|
|
|
return data
|
|
|
|
|
|
- def cmd_get_info(self, parameter: NBusInfoParam):
|
|
|
- resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO,
|
|
|
- bytearray([parameter.value]))
|
|
|
+ def cmd_get_info(self):
|
|
|
+ response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
|
|
|
|
|
|
- if parameter == NBusInfoParam.INFO_FORMAT:
|
|
|
- begin_idx = 0
|
|
|
- formats = []
|
|
|
+ name = str(response[1:9], "ascii")
|
|
|
+ typ = str(response[9:12], "ascii")
|
|
|
+ uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
|
|
|
+ hw = str(response[16:19], "ascii")
|
|
|
+ fw = str(response[19:22], "ascii")
|
|
|
+ mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
|
|
|
|
|
|
- while begin_idx < resp_length:
|
|
|
- device_id = response[begin_idx]
|
|
|
- device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
|
|
|
+ return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id)
|
|
|
|
|
|
- self.__devices[device_id].data_format = device_format
|
|
|
- formats.append((device_id, device_format))
|
|
|
- begin_idx += 4
|
|
|
|
|
|
- return formats
|
|
|
+ def cmd_get_format(self):
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
|
|
|
+ bytearray([]))
|
|
|
+ begin_idx = 0
|
|
|
+ formats = {}
|
|
|
|
|
|
+ while begin_idx < resp_length:
|
|
|
+ device_id = response[begin_idx]
|
|
|
+ device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
|
|
|
+
|
|
|
+ self.__devices[device_id].data_format = device_format
|
|
|
+ formats[device_id] = device_format
|
|
|
+ begin_idx += 4
|
|
|
|
|
|
+ return formats
|
|
|
|
|
|
|
|
|
def cmd_get_sensor_type(self):
|
|
|
- pass
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
|
|
|
+ bytearray([]))
|
|
|
+ types = {}
|
|
|
+ i = 0
|
|
|
+
|
|
|
+ while i < resp_length - 1:
|
|
|
+ types[NBusDeviceAddress(response[i])] = NBusSensorType(response[i+1])
|
|
|
+ i += 2
|
|
|
+
|
|
|
+ return types
|
|
|
|
|
|
|
|
|
"""
|
|
|
@@ -127,12 +145,19 @@ class NBusSlaveModule:
|
|
|
================================================================================================================
|
|
|
"""
|
|
|
|
|
|
+ def cmd_set_find(self, enable: bool) -> NBusStatusType:
|
|
|
+ """
|
|
|
+ Send Find Command.
|
|
|
+ :param enable: to start (True) / stop (False) finding
|
|
|
+ :return: status
|
|
|
+ """
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_FIND, bytearray([enable]))
|
|
|
+ return NBusStatusType(response[0])
|
|
|
+
|
|
|
def cmd_set_module_stop(self):
|
|
|
self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
|
|
|
|
|
|
def cmd_set_module_start(self):
|
|
|
- self.__port.request_module(self.__module_addr,NBusCommand.CMD_SET_START, bytearray([]))
|
|
|
-
|
|
|
-
|
|
|
+ self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
|
|
|
|
|
|
|