| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358 |
- import struct
- from nbus_api.nbus_sensor import NBusSensor
- from nbus_api.nbus_common_parser import NbusCommonParser
- from nbus_hal.nbus_serial.serial_port import *
- from nbus_types.nbus_address_type import NBusModuleAddress
- from nbus_types.nbus_data_fomat import NBusDataValue, NBusDataFormat
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
- from nbus_types.nbus_status_type import NBusStatusType
- from nbus_types.nbus_sensor_count_type import NBusSensorCount
- from nbus_types.nbus_info_type import NBusInfo
- from nbus_types.nbus_sensor_type import NBusSensorType
@beartype
class NBusSlaveModule:
    """
    Class representing nBus slave module.

    The module talks to the hardware through the serial port handed to the constructor and keeps two
    private containers: the sensors registered via :meth:`add_sensor` and a cache of parameter values
    that were read from, or successfully written to, the module. Parameter values pass through two
    configurable mapper callbacks (identity by default) on their way to and from the wire.
    """

    def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
        """
        Constructor.

        :param serial_port: serial port
        :param module_address: address of module
        """
        self.__port = serial_port
        self.__module_addr = module_address
        self.__params = {}   # parameter cache: parameter id -> last known (mapped) value
        self.__devices = {}  # registered sensors: sensor address -> sensor

        # identity mappers until set_module_parameter_mappers() installs real ones
        self._map_param_get = lambda t, v: v
        self._map_param_set = lambda t, v: v

    # ==================================================================================================================
    # Module General Methods
    # ==================================================================================================================

    def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
                                     map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]) -> None:
        """
        Set parameter mappers for module.

        :param map_param_get_cb: callback applied to every parameter value parsed from a get response
        :param map_param_set_cb: callback applied to every parameter value before it is sent in a set request
        """
        self._map_param_get = map_param_get_cb
        self._map_param_set = map_param_set_cb

    def add_sensor(self, sensor: NBusSensor) -> None:
        """
        Add sensor to container.

        The sensor is told this module's address and serial port and is then stored under its own address.

        :param sensor: nbus sensor
        """
        sensor.set_parent_module_address(self.__module_addr)
        sensor.set_device_port(self.__port)
        self.__devices[sensor.address] = sensor

    def get_sensor(self, sensor_address: NBusSensorAddress) -> NBusSensor:
        """
        Get module sensor.

        :param sensor_address: address of sensor
        :return: sensor
        :raises KeyError: if no sensor was added under this address
        """
        return self.__devices[sensor_address]

    # ==================================================================================================================
    # Module Get Commands
    # ==================================================================================================================

    def cmd_get_echo(self, message: bytearray) -> bool:
        """
        Get echo from module.

        :param message: message to send
        :return: status (True = echo, False = no echo)
        """
        # first element of the response is dropped; the rest must match the message byte for byte
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
        return response == list(message)

    def cmd_get_param(self, parameter: NBusParameterID) -> NBusParameterValue:
        """
        Get single module parameter.

        The mapped value is also stored in the parameter cache.

        :param parameter: parameter id
        :return: parameter value
        """
        # get response
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
                                                         bytearray([parameter.value]))

        # parse parameter (only the first parsed entry is used)
        param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
        param_val = self._map_param_get(param_id, param_val_raw)

        # store parameter value
        self.__params[param_id] = param_val
        return param_val

    def cmd_get_all_params(self) -> dict[NBusParameterID, NBusParameterValue]:
        """
        Get all module parameters.

        Every parameter found in the response is mapped and written to the parameter cache.

        :return: dict of parameter id and parameter value (a copy of the whole cache)
        """
        # get response (an empty payload requests all parameters)
        resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))

        # parse and store parameters
        for param_id, param_val_raw in NbusCommonParser.parameters_from_response(resp_len, response):
            self.__params[param_id] = self._map_param_get(param_id, param_val_raw)

        return self.__params.copy()

    def cmd_get_sensor_cnt(self) -> NBusSensorCount:
        """
        Get sensor count.

        :return: count of read-only and read-write sensors
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
        return NBusSensorCount(*response)

    def cmd_get_data(self) -> dict[NBusSensorAddress, list[NBusDataValue]]:
        """
        Get data from all module sensors.

        :return: dict of device addresses and data values
        :raises NBusErrorAPI: FORMAT_NOT_LOADED if a reported sensor has no data format,
            PARAMS_NOT_LOADED if its data parameters are not loaded
        :raises KeyError: if the response names a sensor that was not added to this module
        """
        # get data
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))

        # parse data, one sensor record after another
        begin_idx = 0
        data = {}
        while begin_idx < resp_length:
            device_id = response[begin_idx]
            device = self.__devices[device_id]

            # handle errors: both format and parameters must be available to decode values
            if device.data_format is None:
                raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
            if not device.data_parameters_loaded():
                raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)

            values, offset = NbusCommonParser.data_from_response(device.data_format, response[begin_idx:])
            data[device_id] = device.map_data_get(values)

            # NOTE(review): presumably the extra 1 skips the sensor address byte - confirm against
            # NbusCommonParser.data_from_response
            begin_idx += offset + 1

        return data

    def cmd_get_info(self) -> NBusInfo:
        """
        Get module info.

        Fields are read from fixed positions of the response (index 0 is skipped; the other commands
        treat the first element as the payload length): name [1:9], type [9:12], uuid [12:16] as
        little-endian unsigned 32-bit, hw [16:19], fw [19:22], memory id [22:30] as little-endian
        unsigned 64-bit, read-only sensor count [30], read-write sensor count [31].

        :return: module info
        """
        response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO, bytearray([]))

        def ascii_field(begin: int, end: int) -> str:
            # bytes() accepts both a bytes-like slice and a slice of a list of ints,
            # whereas str(slice, "ascii") only accepts bytes-like objects
            return bytes(response[begin:end]).decode("ascii")

        name = ascii_field(1, 9)
        typ = ascii_field(9, 12)
        uuid = struct.unpack("<I", bytes(response[12:16]))[0]
        hw = ascii_field(16, 19)
        fw = ascii_field(19, 22)
        mem_id = struct.unpack("<Q", bytes(response[22:30]))[0]
        ro_count = int(response[30])
        rw_count = int(response[31])

        return NBusInfo(module_name=name, module_type=typ, uuid=uuid, hw=hw, fw=fw, memory_id=mem_id,
                        read_only_sensors=ro_count, read_write_sensors=rw_count)

    def cmd_get_format(self) -> dict[NBusSensorAddress, NBusDataFormat]:
        """
        Get format of all on-board sensors.

        Each parsed format is also assigned to the matching registered sensor.

        :return: dict of sensor addresses and sensor formats
        :raises KeyError: if the response names a sensor that was not added to this module
        """
        # get response
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_FORMAT,
                                                            bytearray([]))

        # parse format; records are consumed in fixed steps of 4 elements, the first being the sensor address
        record_len = 4
        formats = {}
        for begin_idx in range(0, resp_length, record_len):
            device_id = response[begin_idx]
            device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + record_len])
            self.__devices[device_id].data_format = device_format
            formats[device_id] = device_format

        return formats

    def cmd_get_sensor_type(self) -> dict[NBusSensorAddress, NBusSensorType]:
        """
        Get type of all on-board sensors.

        :return: dict of sensor addresses and sensor types
        """
        # get response
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_TYPE,
                                                            bytearray([]))

        # parse response: consecutive (address, type) pairs
        types = {}
        for i in range(0, resp_length - 1, 2):
            types[NBusSensorAddress(response[i])] = NBusSensorType(response[i + 1])

        return types

    # ==================================================================================================================
    # Module Set Commands
    # ==================================================================================================================

    def cmd_set_module_stop(self) -> NBusStatusType:
        """
        Stop automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_module_start(self) -> NBusStatusType:
        """
        Start automatic measuring.

        :return: status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_START, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_param(self, param: NBusParameterID, value: NBusParameterValue) -> NBusStatusType:
        """
        Set module parameter.

        The value is cached only when the module echoes the parameter id together with a success status.

        :param param: parameter ID
        :param value: parameter value
        :return: status
        """
        # create request packet: 1 byte parameter id + mapped value as little-endian unsigned 32-bit
        param_id_raw = struct.pack("B", param.value)
        param_val_raw = struct.pack("<I", self._map_param_set(param, value))
        param_bytes = bytearray(param_id_raw) + bytearray(param_val_raw)

        # proceed request
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM, param_bytes)

        # if response is valid, store parameter
        if response[0] == param.value and response[1] == NBusStatusType.STATUS_SUCCESS:
            self.__params[param] = value

        return NBusStatusType(response[1])

    def cmd_set_multi_params(self, params: dict[NBusParameterID, NBusParameterValue]) \
            -> dict[NBusParameterID, NBusStatusType]:
        """
        Set multiple module parameters.

        The caller's dict is left untouched; mapped values are built in a separate dict. For every
        parameter acknowledged with a success status the caller-supplied value is cached, the same
        way :meth:`cmd_set_param` does it.

        :param params: parameters
        :return: dict of statuses
        """
        # scale parameters into a fresh dict so the argument is not mutated
        raw_params = {p_id: self._map_param_set(p_id, value) for p_id, value in params.items()}

        # create request packet
        param_bytes = NbusCommonParser.parameters_to_request(raw_params)

        # send request
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_PARAM,
                                                            bytearray(param_bytes))

        # parse statuses: consecutive (parameter id, status) pairs
        statuses = {}
        for i in range(0, resp_length - 1, 2):
            p_id = NBusParameterID(response[i])
            if response[i + 1] == NBusStatusType.STATUS_SUCCESS:
                self.__params[p_id] = params[p_id]  # if success, store param
            statuses[p_id] = NBusStatusType(response[i + 1])

        return statuses

    def cmd_set_calibrate(self) -> NBusStatusType:
        """
        Send calibration command.

        :return: calibration status
        """
        _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_CALIBRATE, bytearray([]))
        return NBusStatusType(response[0])

    def cmd_set_data(self, data: dict[NBusSensorAddress, list[NBusDataValue]]) \
            -> dict[NBusSensorAddress, NBusStatusType]:
        """
        Set data to read-write sensors.

        :param data: dict of sensor addresses and the values to write to each sensor
        :return: operation statuses
        :raises NBusErrorAPI: FORMAT_NOT_LOADED if an addressed sensor has no data format,
            PARAMS_NOT_LOADED if its data parameters are not loaded
        :raises KeyError: if an address does not belong to a sensor added to this module
        """
        # create request packet: for every sensor its address followed by the encoded values
        request = []
        for addr, values in data.items():
            device = self.__devices[addr]

            # handle errors: both format and parameters must be available to encode values
            if device.data_format is None:
                raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED)
            if not device.data_parameters_loaded():
                raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED)

            # transform data
            raw_data = device.map_data_set(values)
            request.append(addr)
            request.extend(NbusCommonParser.data_to_request(device.data_format, raw_data))

        # send request
        resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_SET_DATA,
                                                            bytearray(request))

        # return response statuses: consecutive (sensor address, status) pairs
        statuses = {}
        for i in range(0, resp_length - 1, 2):
            statuses[NBusSensorAddress(response[i])] = NBusStatusType(response[i + 1])

        return statuses
|