import struct from typing import Tuple, Any from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue class NbusCommonParser: """ Class for common parsers for both device and module. These methods should be called from the inside of API modules. """ @staticmethod def format_from_response(response: list[int]) -> NBusDataFormat: """ Parse format from response. Note: byte representation is: | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length | :param response: response from NBus :return: data format """ sign = bool(response[1] & 0x01) unit_mul = response[1] >> 1 if unit_mul >= 64: unit_mul -= 128 value_mul = response[2] if value_mul >= 128: value_mul -= 256 byte_len = response[3] & 0x0F samples = (response[3] & 0xF0) >> 4 return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul, byte_length=byte_len, samples=samples) @staticmethod def data_from_response(nbus_device: Any, response: list[int]) -> Tuple[list[NBusDataValue], int]: """ Parse data from response. :param nbus_device: device to parse for :param response: response from NBus :return: parsed data and length of parsed data in bytes """ if nbus_device.data_format is None: # check for format and params raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED, nbus_device) if not nbus_device.data_parameters_loaded(): raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED, nbus_device) # variables for 2's complement max_uint = 1 << (nbus_device.data_format.byte_length << 3) half_max_uint = max_uint >> 1 values = [] for s in range(nbus_device.data_format.samples): value = 0 for i in range(nbus_device.data_format.byte_length): value |= response[(i + 1) + (nbus_device.data_format.byte_length * s)] << (i << 3) # compose value little endian if nbus_device.data_format.sign == 1 and value >= half_max_uint: # convert 2's complement value -= max_uint value *= 10 ** nbus_device.data_format.value_multiplier # scale number values.append(value) # append number to list return nbus_device.map_data_get(values), nbus_device.data_format.samples * nbus_device.data_format.byte_length @staticmethod def parameters_from_response(resp_len: int, response: list[int]) \ -> list[Tuple[NBusParameterID, NBusParameterValue]]: """ Parse multiple parameters from response. :param resp_len: response length in bytes :param response: response from nBus :return: list of (parameter id, parameter value) """ offset = 0 # response offset params = [] while offset < resp_len: # parse all params new_offset = offset + 5 # get param id and type param_id, *param_value = response[offset:new_offset] param_type = NBusParameterID(param_id) param_raw_value = struct.unpack("