nbus_common_parser.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import struct
  2. from typing import Tuple
  3. from nbus_types.nbus_data_fomat import NBusDataFormat, NBusDataValue
  4. from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
  5. class NbusCommonParser:
  6. """
  7. Static class for common parsers for both device and module.
  8. These methods should be called from the inside of API modules.
  9. """
  10. @staticmethod
  11. def format_from_response(response: list[int]) -> NBusDataFormat:
  12. """
  13. Parse format from response.
  14. Note: byte representation is:
  15. | device_address | unit_multiplier, sign | value_multiplier | samples, byte_length |
  16. :param response: response from NBus
  17. :return: data format
  18. """
  19. sign = bool(response[1] & 0x01)
  20. unit_mul = response[1] >> 1
  21. if unit_mul >= 64:
  22. unit_mul -= 128
  23. value_mul = response[2]
  24. if value_mul >= 128:
  25. value_mul -= 256
  26. byte_len = response[3] & 0x0F
  27. samples = (response[3] & 0xF0) >> 4
  28. return NBusDataFormat(sign=sign, unit_multiplier=unit_mul, value_multiplier=value_mul,
  29. byte_length=byte_len, samples=samples)
  30. @staticmethod
  31. def parameters_from_response(resp_len: int, response: list[int]) \
  32. -> list[Tuple[NBusParameterID, NBusParameterValue]]:
  33. """
  34. Parse multiple parameters from response.
  35. :param resp_len: response length in bytes
  36. :param response: response from nBus
  37. :return: list of (parameter id, parameter value)
  38. """
  39. offset = 0 # response offset
  40. params = []
  41. while offset < resp_len: # parse all params
  42. new_offset = offset + 5
  43. # get param id and type
  44. param_id, *param_value = response[offset:new_offset]
  45. param_type = NBusParameterID(param_id)
  46. param_raw_value = struct.unpack("<I", bytearray(param_value))[0] # get single raw value from bytes
  47. params.append((param_type, param_raw_value))
  48. offset = new_offset
  49. return params
  50. @staticmethod
  51. def data_from_response(data_format: NBusDataFormat, response: list[int]) -> Tuple[list[NBusDataValue], int]:
  52. """
  53. Parse data from response.
  54. :param data_format: format of data
  55. :param response: response from NBus
  56. :return: parsed data and length of parsed data in bytes
  57. """
  58. # variables for 2's complement
  59. max_uint = 1 << (data_format.byte_length << 3)
  60. half_max_uint = max_uint >> 1
  61. values = []
  62. for s in range(data_format.samples):
  63. value = 0
  64. for i in range(data_format.byte_length):
  65. value |= response[(i + 1) + (data_format.byte_length * s)] << (i << 3)
  66. # compose value little endian
  67. if data_format.sign == 1 and value >= half_max_uint: # convert 2's complement
  68. value -= max_uint
  69. value *= 10 ** data_format.value_multiplier # scale number
  70. values.append(value) # append number to list
  71. return values, data_format.samples * data_format.byte_length
  72. @staticmethod
  73. def parameters_to_request(params: dict[NBusParameterID, NBusParameterValue]) -> bytearray:
  74. """
  75. Create request from parameters dictionary.
  76. :param params: parameter dict
  77. :return: parameter bytearray
  78. """
  79. param_bytes = bytearray()
  80. for p_id in params.keys():
  81. param_id_raw = struct.pack("B", p_id.value)
  82. param_val_raw = struct.pack("<I", params[p_id])
  83. param_bytes += bytearray(param_id_raw) + bytearray(param_val_raw)
  84. return param_bytes
  85. @staticmethod
  86. def data_to_request(data_format: NBusDataFormat, data: list[NBusDataValue]) -> list[int]:
  87. """
  88. Parse data to request format.
  89. :param data_format: format of data
  90. :param data: data values
  91. :return: request bytes
  92. """
  93. # variables for 2's complement
  94. max_uint = 1 << (data_format.byte_length << 3)
  95. request = []
  96. for value in data:
  97. # Reverse scaling
  98. value = int(value / (10 ** data_format.value_multiplier))
  99. # Handle 2's complement for signed values
  100. if data_format.sign == 1 and value < 0:
  101. value += max_uint
  102. # Break the value into bytes (little-endian)
  103. for i in range(data_format.byte_length):
  104. request.append((value >> (i << 3)) & 0xFF)
  105. return request