| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import struct
- from typing import Tuple, Any
- from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
- from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
- from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
class NbusCommonParser:
    """
    Common response parsers shared by the device and module APIs.

    All methods are static and are meant to be called from inside the API modules.
    """

    @staticmethod
    def format_from_response(response: list[int]) -> NBusDataFormat:
        """
        Parse the data format from a response.

        Byte layout of the response:
        | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length |

        :param response: response from NBus (bytes 1 to 3 are read)
        :return: data format
        """
        flags = response[1]
        sign = bool(flags & 0x01)  # bit 0 is the sign flag
        unit_mul = flags >> 1  # remaining bits: 7-bit two's complement
        if unit_mul >= 64:
            unit_mul -= 128

        value_mul = response[2]  # 8-bit two's complement
        if value_mul >= 128:
            value_mul -= 256

        layout = response[3]
        byte_len = layout & 0x0F  # low nibble
        samples = (layout & 0xF0) >> 4  # high nibble

        return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul,
                              byte_length=byte_len, samples=samples)

    @staticmethod
    def data_from_response(nbus_device: Any, response: list[int]) -> Tuple[list[NBusDataValue], int]:
        """
        Parse data from a response.

        The first byte of the response is skipped; it is followed by ``samples`` values of
        ``byte_length`` bytes each, stored little endian. Every value is scaled by
        ``10 ** value_multiplier`` and the resulting list is passed through
        ``nbus_device.map_data_get``.

        :param nbus_device: device to parse for
        :param response: response from NBus
        :return: parsed data and length of parsed data in bytes
        :raises NBusErrorAPI: if the device has no data format or its data parameters are not loaded
        :raises IndexError: if the response is too short for the format
        :raises ValueError: if a payload element is not a byte (0..255)
        """
        data_format = nbus_device.data_format
        if data_format is None:  # check for format and params
            raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED, nbus_device)
        if not nbus_device.data_parameters_loaded():
            raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED, nbus_device)

        byte_length = data_format.byte_length
        samples = data_format.samples
        payload_length = samples * byte_length

        # Slicing would silently truncate a short response, so reject it explicitly.
        if payload_length and len(response) < payload_length + 1:
            raise IndexError(
                f"response too short: expected at least {payload_length + 1} bytes, got {len(response)}"
            )

        signed = data_format.sign == 1
        exponent = data_format.value_multiplier
        # Dividing by an exact integer power of ten is correctly rounded; multiplying by the
        # float 10 ** -n is not (e.g. 3 * 10 ** -1 == 0.30000000000000004).
        factor = 10 ** abs(exponent)

        values = []
        for sample in range(samples):
            start = 1 + sample * byte_length  # +1 skips the leading byte
            chunk = bytes(response[start:start + byte_length])
            value = int.from_bytes(chunk, "little", signed=signed)
            value = value / factor if exponent < 0 else value * factor  # scale number
            values.append(value)

        return nbus_device.map_data_get(values), payload_length

    @staticmethod
    def parameters_from_response(resp_len: int, response: list[int]) \
            -> list[Tuple[NBusParameterID, NBusParameterValue]]:
        """
        Parse multiple parameters from a response.

        Each parameter occupies 5 bytes: one byte of parameter id followed by a
        4-byte little-endian unsigned raw value.

        :param resp_len: response length in bytes
        :param response: response from nBus
        :return: list of (parameter id, parameter raw value)
        """
        record_length = 5
        params = []
        for offset in range(0, resp_len, record_length):  # parse all params
            param_id, *value_bytes = response[offset:offset + record_length]
            param_type = NBusParameterID(param_id)
            raw_value = struct.unpack("<I", bytearray(value_bytes))[0]  # single raw value from bytes
            params.append((param_type, raw_value))
        return params
|