import struct from typing import Tuple from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue class NbusCommonParser: """ Static class for common parsers for both device and module. These methods should be called from the inside of API modules. """ @staticmethod def format_from_response(response: list[int]) -> NBusDataFormat: """ Parse format from response. Note: byte representation is: | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length | :param response: response from NBus :return: data format """ sign = bool(response[1] & 0x01) unit_mul = response[1] >> 1 if unit_mul >= 64: unit_mul -= 128 value_mul = response[2] if value_mul >= 128: value_mul -= 256 byte_len = response[3] & 0x0F samples = (response[3] & 0xF0) >> 4 return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul, byte_length=byte_len, samples=samples) @staticmethod def parameters_from_response(resp_len: int, response: list[int]) \ -> list[Tuple[NBusParameterID, NBusParameterValue]]: """ Parse multiple parameters from response. :param resp_len: response length in bytes :param response: response from nBus :return: list of (parameter id, parameter value) """ offset = 0 # response offset params = [] while offset < resp_len: # parse all params new_offset = offset + 5 # get param id and type param_id, *param_value = response[offset:new_offset] param_type = NBusParameterID(param_id) param_raw_value = struct.unpack(" Tuple[list[NBusDataValue], int]: """ Parse data from response. :param data_format: format of data :param response: response from NBus :return: parsed data and length of parsed data in bytes """ # variables for 2's complement max_uint = 1 << (data_format.byte_length << 3) half_max_uint = max_uint >> 1 values = [] for s in range(data_format.samples): value = 0 for i in range(data_format.byte_length): value |= response[(i + 1) + (data_format.byte_length * s)] << (i << 3) # compose value little endian if data_format.sign == 1 and value >= half_max_uint: # convert 2's complement value -= max_uint value *= 10 ** data_format.value_multiplier # scale number values.append(value) # append number to list return values, data_format.samples * data_format.byte_length @staticmethod def parameters_to_request(params: dict[NBusParameterID, NBusParameterValue]) -> bytearray: """ Create request from parameters dictionary. :param params: parameter dict :return: parameter bytearray """ param_bytes = bytearray() for p_id in params.keys(): param_id_raw = struct.pack("B", p_id.value) param_val_raw = struct.pack(" list[int]: """ Parse data to request format. :param data_format: format of data :param data: data values :return: request bytes """ # variables for 2's complement max_uint = 1 << (data_format.byte_length << 3) request = [] for value in data: # Reverse scaling value = int(value / (10 ** data_format.value_multiplier)) # Handle 2's complement for signed values if data_format.sign == 1 and value < 0: value += max_uint # Break the value into bytes (little-endian) for i in range(data_format.byte_length): request.append((value >> (i << 3)) & 0xFF) return request