|
@@ -0,0 +1,358 @@
|
|
|
|
|
+import struct
|
|
|
|
|
+from nbus_api.nbus_sensor import NBusSensor
|
|
|
|
|
+from nbus_api.nbus_common_parser import NbusCommonParser
|
|
|
|
|
+from nbus_hal.nbus_serial.serial_port import *
|
|
|
|
|
+from nbus_types.nbus_address_type import NBusModuleAddress
|
|
|
|
|
+from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
|
|
|
|
|
+from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
|
|
|
|
|
+from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
|
|
|
|
|
+from nbus_types.nbus_status_type import NBusStatusType
|
|
|
|
|
+from nbus_types.nbus_sensor_count_type import NBusSensorCount
|
|
|
|
|
+from nbus_types.nbus_info_type import NBusInfo
|
|
|
|
|
+from nbus_types.nbus_sensor_type import NBusSensorType
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@beartype
|
|
|
|
|
+class NBusSlaveModule:
|
|
|
|
|
+ """
|
|
|
|
|
+ Class representing nBus slave module.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
|
|
|
|
|
+ """
|
|
|
|
|
+ Constructor.
|
|
|
|
|
+
|
|
|
|
|
+ :param serial_port: serial port
|
|
|
|
|
+ :param module_address: address of module
|
|
|
|
|
+ """
|
|
|
|
|
+ self.__port = serial_port
|
|
|
|
|
+ self.__module_addr = module_address
|
|
|
|
|
+ self.__params = {}
|
|
|
|
|
+ self.__devices = {}
|
|
|
|
|
+ self._map_param_get = lambda t, v: v # dummy implementation
|
|
|
|
|
+ self._map_param_set = lambda t, v: v # dummy implementation
|
|
|
|
|
+
|
|
|
|
|
+ """
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ Module General Methods
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
|
|
|
|
|
+ map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Set parameter mappers for module.
|
|
|
|
|
+
|
|
|
|
|
+ :param map_param_get_cb: callback for map param get
|
|
|
|
|
+ :param map_param_set_cb: callback for map param set
|
|
|
|
|
+ """
|
|
|
|
|
+ self._map_param_get = map_param_get_cb
|
|
|
|
|
+ self._map_param_set = map_param_set_cb
|
|
|
|
|
+
|
|
|
|
|
+ def add_sensor(self, sensor: NBusSensor) -> None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Add sensor to container.
|
|
|
|
|
+
|
|
|
|
|
+ :param sensor: nbus sensor
|
|
|
|
|
+ """
|
|
|
|
|
+ sensor.set_parent_module_address(self.__module_addr)
|
|
|
|
|
+ sensor.set_device_port(self.__port)
|
|
|
|
|
+ self.__devices[sensor.address] = sensor
|
|
|
|
|
+
|
|
|
|
|
+ def get_sensor(self, sensor_address: NBusSensorAddress) -> NBusSensor:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get module sensor.
|
|
|
|
|
+
|
|
|
|
|
+ :param sensor_address: address of sensor
|
|
|
|
|
+
|
|
|
|
|
+ :return: sensor
|
|
|
|
|
+ """
|
|
|
|
|
+ return self.__devices[sensor_address]
|
|
|
|
|
+
|
|
|
|
|
+ """
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ Module Get Commands
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ """
|
|
|
|
|
+ def cmd_get_echo(self, message: bytearray) -> bool:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get echo from module.
|
|
|
|
|
+
|
|
|
|
|
+ :param message: message to send
|
|
|
|
|
+ :return: status (True = echo, False = no echo)
|
|
|
|
|
+ """
|
|
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
|
|
|
|
|
+ return response == list(message)
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get single module parameter.
|
|
|
|
|
+
|
|
|
|
|
+ :param parameter: parameter id
|
|
|
|
|
+ :return: parameter value
|
|
|
|
|
+ """
|
|
|
|
|
+ # get response
|
|
|
|
|
+ resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
|
|
|
|
|
+ bytearray([parameter.value]))
|
|
|
|
|
+
|
|
|
|
|
+ # parse parameter
|
|
|
|
|
+ param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
|
|
|
|
|
+ param_val = self._map_param_get(param_id, param_val_raw)
|
|
|
|
|
+
|
|
|
|
|
+ # store parameter value
|
|
|
|
|
+ self.__params[param_id] = param_val
|
|
|
|
|
+
|
|
|
|
|
+ return param_val
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get all module parameters.
|
|
|
|
|
+
|
|
|
|
|
+ :return: dict of parameter id and parameter value
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # get response
|
|
|
|
|
+ resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
|
|
|
|
|
+
|
|
|
|
|
+ # parse parameters
|
|
|
|
|
+ params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
|
|
|
|
|
+
|
|
|
|
|
+ for param_id, param_val_raw in params_raw:
|
|
|
|
|
+ param_val = self._map_param_get(param_id, param_val_raw)
|
|
|
|
|
+
|
|
|
|
|
+ # store parameters
|
|
|
|
|
+ self.__params[param_id] = param_val
|
|
|
|
|
+
|
|
|
|
|
+ return self.__params.copy()
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_sensor_cnt(self) -> NBusSensorCount:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get sensor count.
|
|
|
|
|
+
|
|
|
|
|
+ :return: count of read-only and read-write sensors
|
|
|
|
|
+ """
|
|
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
|
|
|
|
|
+ return NBusSensorCount(*response)
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get data from all module sensors.
|
|
|
|
|
+
|
|
|
|
|
+ :return: dict of device addresses and data values
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # get data
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
|
|
|
|
|
+
|
|
|
|
|
+ # parse data
|
|
|
|
|
+ begin_idx = 0
|
|
|
|
|
+ data = {}
|
|
|
|
|
+
|
|
|
|
|
+ while begin_idx < resp_length:
|
|
|
|
|
+ device_id = response[begin_idx]
|
|
|
|
|
+
|
|
|
|
|
+ # handle errors
|
|
|
|
|
+ if self.__devices[device_id].data_format is None: # check for format and params
|
|
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
|
|
|
|
|
+ if not self.__devices[device_id].data_parameters_loaded():
|
|
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
|
|
|
|
|
+
|
|
|
|
|
+ values, offset = NbusCommonParser.data_from_response(self.__devices[device_id].data_format,
|
|
|
|
|
+ response[begin_idx:])
|
|
|
|
|
+ data[device_id] = self.__devices[device_id].map_data_get(values)
|
|
|
|
|
+ begin_idx += offset + 1
|
|
|
|
|
+
|
|
|
|
|
+ return data
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_info(self) -> NBusInfo:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get module info.
|
|
|
|
|
+
|
|
|
|
|
+ :return: module info
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))
|
|
|
|
|
+
|
|
|
|
|
+ name = str(response[1:9], "ascii")
|
|
|
|
|
+ typ = str(response[9:12], "ascii")
|
|
|
|
|
+ uuid = struct.unpack("<I", bytearray(response[12:16]))[0]
|
|
|
|
|
+ hw = str(response[16:19], "ascii")
|
|
|
|
|
+ fw = str(response[19:22], "ascii")
|
|
|
|
|
+ mem_id = struct.unpack("<Q", bytearray(response[22:30]))[0]
|
|
|
|
|
+ ro_count = int(response[30])
|
|
|
|
|
+ rw_count = int(response[31])
|
|
|
|
|
+
|
|
|
|
|
+ return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id,
|
|
|
|
|
+ read_only_sensors=ro_count, read_write_sensors=rw_count)
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get format of all on-board sensors.
|
|
|
|
|
+
|
|
|
|
|
+ :return: dict of sensor addresses and sensor formats
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # get response
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
|
|
|
|
|
+ bytearray([]))
|
|
|
|
|
+ begin_idx = 0
|
|
|
|
|
+ formats = {}
|
|
|
|
|
+
|
|
|
|
|
+ # parse format
|
|
|
|
|
+ while begin_idx < resp_length:
|
|
|
|
|
+ device_id = response[begin_idx]
|
|
|
|
|
+ device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
|
|
|
|
|
+
|
|
|
|
|
+ self.__devices[device_id].data_format = device_format
|
|
|
|
|
+ formats[device_id] = device_format
|
|
|
|
|
+ begin_idx += 4
|
|
|
|
|
+
|
|
|
|
|
+ return formats
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Get type of all on-board sensors.
|
|
|
|
|
+
|
|
|
|
|
+ :return: dict of sensor addresses and sensor types
|
|
|
|
|
+ """
|
|
|
|
|
+ # get response
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
|
|
|
|
|
+ bytearray([]))
|
|
|
|
|
+ # parse response
|
|
|
|
|
+ types = {}
|
|
|
|
|
+ i = 0
|
|
|
|
|
+
|
|
|
|
|
+ while i < resp_length - 1:
|
|
|
|
|
+ types[NBusSensorAddress(response[i])] = NBusSensorType(response[i + 1])
|
|
|
|
|
+ i += 2
|
|
|
|
|
+
|
|
|
|
|
+ return types
|
|
|
|
|
+
|
|
|
|
|
+ """
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ Module Set Commands
|
|
|
|
|
+ ================================================================================================================
|
|
|
|
|
+ """
|
|
|
|
|
+ def cmd_set_module_stop(self) -> NBusStatusType:
|
|
|
|
|
+ """
|
|
|
|
|
+ Stop automatic measuring.
|
|
|
|
|
+
|
|
|
|
|
+ :return: status
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
|
|
|
|
|
+ return NBusStatusType(response[0])
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_set_module_start(self) -> NBusStatusType:
|
|
|
|
|
+ """
|
|
|
|
|
+ Start automatic measuring.
|
|
|
|
|
+
|
|
|
|
|
+ :return: status
|
|
|
|
|
+ """
|
|
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
|
|
|
|
|
+ return NBusStatusType(response[0])
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
|
|
|
|
|
+ """
|
|
|
|
|
+ Set module parameter.
|
|
|
|
|
+
|
|
|
|
|
+ :param param: parameter ID
|
|
|
|
|
+ :param value: parameter value
|
|
|
|
|
+ :return: status
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # create request packet
|
|
|
|
|
+ param_id_raw = struct.pack("B", param.value)
|
|
|
|
|
+ param_val_raw = struct.pack("<I", self._map_param_set(param, value))
|
|
|
|
|
+ param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)
|
|
|
|
|
+
|
|
|
|
|
+ # proceed request
|
|
|
|
|
+ _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM, param_bytes)
|
|
|
|
|
+
|
|
|
|
|
+ # if response is valid, store parameter
|
|
|
|
|
+ if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
|
|
|
|
|
+ self.__params[param] = value
|
|
|
|
|
+
|
|
|
|
|
+ return NBusStatusType(response[1])
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
|
|
|
|
|
+ -> dict[NBusParameterID, NBusStatusType]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Set multiple module parameters.
|
|
|
|
|
+
|
|
|
|
|
+ :param params: parameters
|
|
|
|
|
+ :return: dict od statuses
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # scale parameters
|
|
|
|
|
+ for p_id in params.keys():
|
|
|
|
|
+ params[p_id] = self._map_param_set(p_id, params[p_id])
|
|
|
|
|
+
|
|
|
|
|
+ # create request packet
|
|
|
|
|
+ param_bytes = NbusCommonParser.parameters_to_request(params)
|
|
|
|
|
+
|
|
|
|
|
+ # send request
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
|
|
|
|
|
+ bytearray(param_bytes))
|
|
|
|
|
+ # parse statuses
|
|
|
|
|
+ statuses = {}
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(0, resp_length - 1, 2):
|
|
|
|
|
+ p_id = NBusParameterID(response[i])
|
|
|
|
|
+ if response[i+1] == NBusStatusType.STATUS_SUCCESS:
|
|
|
|
|
+ self.__params[p_id] = params[p_id] # if success, store param
|
|
|
|
|
+
|
|
|
|
|
+ statuses[p_id] = NBusStatusType(response[i + 1])
|
|
|
|
|
+
|
|
|
|
|
+ return statuses
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_set_calibrate(self) -> NBusStatusType:
|
|
|
|
|
+ """
|
|
|
|
|
+ Send calibration command.
|
|
|
|
|
+
|
|
|
|
|
+ :return: calibration status
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr,
|
|
|
|
|
+ NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
|
|
|
|
|
+ print(response)
|
|
|
|
|
+ return NBusStatusType(response[0])
|
|
|
|
|
+
|
|
|
|
|
+ def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
|
|
|
|
|
+ -> dict[NBusSensorAddress, NBusStatusType]:
|
|
|
|
|
+ """
|
|
|
|
|
+ Set data to read-write sensors.
|
|
|
|
|
+
|
|
|
|
|
+ :param data:
|
|
|
|
|
+ :return: operation statuses
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ # create request packet
|
|
|
|
|
+ request = []
|
|
|
|
|
+
|
|
|
|
|
+ # transform data
|
|
|
|
|
+ for addr in data.keys():
|
|
|
|
|
+ # handle errors
|
|
|
|
|
+ if self.__devices[addr].data_format is None: # check for format and params
|
|
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
|
|
|
|
|
+ if not self.__devices[addr].data_parameters_loaded():
|
|
|
|
|
+ raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)
|
|
|
|
|
+
|
|
|
|
|
+ raw_data = self.__devices[addr].map_data_set(data[addr])
|
|
|
|
|
+ request.append(addr)
|
|
|
|
|
+ request.extend(NbusCommonParser.data_to_request(self.__devices[addr].data_format, raw_data))
|
|
|
|
|
+
|
|
|
|
|
+ # send request
|
|
|
|
|
+ resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
|
|
|
|
|
+ bytearray(request))
|
|
|
|
|
+
|
|
|
|
|
+ # return response statuses
|
|
|
|
|
+ statuses = {}
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(0, resp_length - 1, 2):
|
|
|
|
|
+ statuses[NBusSensorAddress(response[i])] = NBusStatusType(response[i + 1])
|
|
|
|
|
+
|
|
|
|
|
+ return statuses
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+
|