| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- import struct
- from typing import Tuple
- from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
- from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
class NbusCommonParser:
    """
    Static class for common parsers for both device and module.

    These methods should be called from the inside of API modules.
    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def format_from_response(response: list[int]) -> NBusDataFormat:
        """
        Parse format from response.

        Note: byte representation is:
        | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length |

        Byte 1 carries the sign flag in bit 0 and the unit multiplier in the
        upper 7 bits (7-bit two's complement). Byte 2 is the value multiplier
        (8-bit two's complement). Byte 3 carries the sample count in the high
        nibble and the byte length in the low nibble.

        :param response: response from NBus (at least 4 bytes)
        :return: data format
        """
        sign = bool(response[1] & 0x01)

        # 7-bit two's complement -> signed int
        unit_mul = response[1] >> 1
        if unit_mul >= 64:
            unit_mul -= 128

        # 8-bit two's complement -> signed int
        value_mul = response[2]
        if value_mul >= 128:
            value_mul -= 256

        byte_len = response[3] & 0x0F
        samples = (response[3] & 0xF0) >> 4

        return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul,
                              byte_length=byte_len, samples=samples)

    @staticmethod
    def parameters_from_response(resp_len: int, response: list[int]) \
            -> list[Tuple[NBusParameterID, NBusParameterValue]]:
        """
        Parse multiple parameters from response.

        Each parameter occupies 5 bytes: a 1-byte parameter id followed by a
        4-byte little-endian unsigned value.

        :param resp_len: response length in bytes
        :param response: response from nBus
        :return: list of (parameter id, parameter value)
        :raises struct.error: if a parameter entry holds fewer than 4 value bytes
        :raises ValueError: if an entry is empty or the id is rejected by NBusParameterID
        """
        entry_size = 5  # 1 byte id + 4 bytes value
        params = []

        for offset in range(0, resp_len, entry_size):  # parse all params
            # split entry into id byte and value bytes
            param_id, *param_value = response[offset:offset + entry_size]
            param_type = NBusParameterID(param_id)
            # get single raw value from bytes
            param_raw_value = struct.unpack("<I", bytearray(param_value))[0]
            params.append((param_type, param_raw_value))

        return params

    @staticmethod
    def data_from_response(data_format: NBusDataFormat, response: list[int]) -> Tuple[list[NBusDataValue], int]:
        """
        Parse data from response.

        The first byte of the response is skipped; samples follow back to
        back, each ``byte_length`` bytes long in little-endian order.

        :param data_format: format of data
        :param response: response from NBus
        :return: parsed data and length of parsed data in bytes
        :raises IndexError: if the response is shorter than the format requires
        """
        byte_length = data_format.byte_length

        # variables for 2's complement
        max_uint = 1 << (byte_length << 3)
        half_max_uint = max_uint >> 1
        is_signed = data_format.sign == 1

        # int for non-negative multipliers, float for negative ones
        scale = 10 ** data_format.value_multiplier

        values = []
        for s in range(data_format.samples):
            base = 1 + byte_length * s  # +1 skips the leading byte

            # compose value little endian
            value = 0
            for i in range(byte_length):
                value |= response[base + i] << (i << 3)

            if is_signed and value >= half_max_uint:  # convert 2's complement
                value -= max_uint

            values.append(value * scale)  # scale number and store it

        return values, data_format.samples * byte_length

    @staticmethod
    def parameters_to_request(params: dict[NBusParameterID, NBusParameterValue]) -> bytearray:
        """
        Create request from parameters dictionary.

        Every parameter is serialized as a 1-byte id followed by a 4-byte
        little-endian unsigned value, in dictionary iteration order.

        :param params: parameter dict
        :return: parameter bytearray
        :raises struct.error: if an id or a value does not fit its field
        """
        param_bytes = bytearray()

        for p_id, p_value in params.items():
            # "<" selects standard sizes without padding: exactly 1 + 4 bytes
            param_bytes += struct.pack("<BI", p_id.value, p_value)

        return param_bytes

    @staticmethod
    def data_to_request(data_format: NBusDataFormat, data: list[NBusDataValue]) -> list[int]:
        """
        Parse data to request format.

        Each value is divided by ``10 ** value_multiplier``, rounded to the
        nearest integer and emitted as ``byte_length`` little-endian bytes.
        Values that do not fit into ``byte_length`` bytes are truncated to
        their low-order bytes.

        :param data_format: format of data
        :param data: data values
        :return: request bytes
        """
        byte_length = data_format.byte_length

        # variables for 2's complement
        max_uint = 1 << (byte_length << 3)
        is_signed = data_format.sign == 1
        scale = 10 ** data_format.value_multiplier

        request = []
        for value in data:
            # Reverse scaling. Round instead of truncating: float division
            # such as 0.29 / 0.01 yields 28.999999999999996, which int()
            # would turn into 28 and lose one LSB.
            raw = round(value / scale)

            # Handle 2's complement for signed values
            if is_signed and raw < 0:
                raw += max_uint

            # Break the value into bytes (little-endian)
            for i in range(byte_length):
                request.append((raw >> (i << 3)) & 0xFF)

        return request
|