nbus_common_parser.py 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import struct
  2. from typing import Tuple, Any
  3. from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
  4. from nbus_types.nbus_exceptions.nbus_api_exception import NBusErrorAPI, NBusErrorAPIType
  5. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  6. class NbusCommonParser:
  7. """
  8. Class for common parsers for both device and module.
  9. These methods should be called from the inside of API modules.
  10. """
  11. @staticmethod
  12. def format_from_response(response: list[int]) -> NBusDataFormat:
  13. """
  14. Parse format from response.
  15. Note: byte representation is:
  16. | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length |
  17. :param response: response from NBus
  18. :return: data format
  19. """
  20. sign = bool(response[1] & 0x01)
  21. unit_mul = response[1] >> 1
  22. if unit_mul >= 64:
  23. unit_mul -= 128
  24. value_mul = response[2]
  25. if value_mul >= 128:
  26. value_mul -= 256
  27. byte_len = response[3] & 0x0F
  28. samples = (response[3] & 0xF0) >> 4
  29. return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul,
  30. byte_length=byte_len, samples=samples)
  31. @staticmethod
  32. def data_from_response(nbus_device: Any, response: list[int]) -> Tuple[list[NBusDataValue], int]:
  33. """
  34. Parse data from response.
  35. :param nbus_device: device to parse for
  36. :param response: response from NBus
  37. :return: parsed data and length of parsed data in bytes
  38. """
  39. if nbus_device.data_format is None: # check for format and params
  40. raise NBusErrorAPI(NBusErrorAPIType.FORMAT_NOT_LOADED, nbus_device)
  41. if not nbus_device.data_parameters_loaded():
  42. raise NBusErrorAPI(NBusErrorAPIType.PARAMS_NOT_LOADED, nbus_device)
  43. # variables for 2's complement
  44. max_uint = 1 << (nbus_device.data_format.byte_length << 3)
  45. half_max_uint = max_uint >> 1
  46. values = []
  47. for s in range(nbus_device.data_format.samples):
  48. value = 0
  49. for i in range(nbus_device.data_format.byte_length):
  50. value |= response[(i + 1) + (nbus_device.data_format.byte_length * s)] << (i << 3)
  51. # compose value little endian
  52. if nbus_device.data_format.sign == 1 and value >= half_max_uint: # convert 2's complement
  53. value -= max_uint
  54. value *= 10 ** nbus_device.data_format.value_multiplier # scale number
  55. values.append(value) # append number to list
  56. return nbus_device.map_data_get(values), nbus_device.data_format.samples * nbus_device.data_format.byte_length
  57. @staticmethod
  58. def parameters_from_response(resp_len: int, response: list[int]) \
  59. -> list[Tuple[NBusParameterID, NBusParameterValue]]:
  60. """
  61. Parse multiple parameters from response.
  62. :param resp_len: response length in bytes
  63. :param response: response from nBus
  64. :return: list of (parameter id, parameter value)
  65. """
  66. offset = 0 # response offset
  67. params = []
  68. while offset < resp_len: # parse all params
  69. new_offset = offset + 5
  70. # get param id and type
  71. param_id, *param_value = response[offset:new_offset]
  72. param_type = NBusParameterID(param_id)
  73. param_raw_value = struct.unpack("<I", bytearray(param_value))[0] # get single raw value from bytes
  74. params.append((param_type, param_raw_value))
  75. offset = new_offset
  76. return params