| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import struct
- from abc import abstractmethod
- from typing import Tuple, Annotated
- from beartype.vale import Is
- from nbus_api.nbus_slave_device import NBusSlaveDevice
- from nbus_api.nbus_common_parser import NbusCommonParser
- from nbus_hal.nbus_serial.serial_port import *
- from nbus_types.nbus_address_type import NBusModuleAddress
- from nbus_types.nbus_parameter_type import NBusParameterID, NBusParameterValue
- @beartype
- class NBusSlaveModule:
- def __init__(self, serial_port: NBusSerialPort, module_address: NBusModuleAddress):
- self.__port = serial_port
- self.__module_addr = module_address
- self.__params = {}
- self.__devices = {}
- self._map_param_get = lambda t, v: v
- self._map_param_set = lambda t, v: v
- def set_module_parameter_mappers(self, map_param_get_cb: Callable[[NBusParameterID, NBusParameterValue], int],
- map_param_set_cb: Callable[[NBusParameterID, int], NBusParameterValue]):
- self._map_param_get = map_param_get_cb
- self._map_param_set = map_param_set_cb
- def add_device(self, device: NBusSlaveDevice) -> None:
- device.set_parent_module_address(self.__module_addr)
- device.set_device_port(self.__port)
- self.__devices[device.address] = device
- def get_device(self, device_address: NBusDeviceAddress) -> NBusSlaveDevice:
- return self.__devices[device_address]
- """
- ================================================================================================================
- Module Get Commands
- ================================================================================================================
- """
- def cmd_get_echo(self, message: bytearray) -> bool:
- """
- Send Echo Command.
- :param message: message to send
- :return: status (True = echo, False = no echo)
- """
- _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_ECHO, message)
- return response == list(message)
- def cmd_get_param(self, parameter: NBusParameterID) -> Tuple[NBusParameterID, NBusParameterValue]:
- # get response
- resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM,
- bytearray([parameter.value]))
- param_id, param_val_raw = NbusCommonParser.parameters_from_response(resp_len, response)[0]
- param_val = self._map_param_get(param_id, param_val_raw)
- self.__params[param_id] = param_val
- return param_id, param_val
- def cmd_get_all_params(self) -> list[Tuple[NBusParameterID, NBusParameterValue]]:
- resp_len, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_PARAM, bytearray([]))
- params_raw = NbusCommonParser.parameters_from_response(resp_len, response)
- params = []
- for param_id, param_val_raw in params_raw:
- param_val = self._map_param_get(param_id, param_val_raw)
- params.append((param_id, param_val))
- self.__params[param_id] = param_val
- return params
- def cmd_get_sensor_cnt(self) -> Annotated[int, Is[lambda value: 0 < value < 64]]:
- _, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_SENSOR_CNT, bytearray([]))
- return response[0]
- def cmd_get_data(self):
- resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_DATA, bytearray([]))
- begin_idx = 0
- data = []
- while begin_idx < resp_length:
- device_id = response[begin_idx]
- values, offset = NbusCommonParser.data_from_response(self.__devices[device_id], response[begin_idx:])
- data.append((device_id, values))
- begin_idx += offset + 1
- return data
- def cmd_get_info(self, parameter: NBusInfoParam):
- resp_length, *response = self.__port.request_module(self.__module_addr, NBusCommand.CMD_GET_INFO,
- bytearray([parameter.value]))
- if parameter == NBusInfoParam.INFO_FORMAT:
- begin_idx = 0
- formats = []
- while begin_idx < resp_length:
- device_id = response[begin_idx]
- device_format = NbusCommonParser.format_from_response(response[begin_idx:begin_idx + 4])
- self.__devices[device_id].data_format = device_format
- formats.append((device_id, device_format))
- begin_idx += 4
- return formats
- def cmd_get_sensor_type(self):
- pass
- """
- ================================================================================================================
- Module Set Commands
- ================================================================================================================
- """
- def cmd_set_module_stop(self):
- self._send_request_module(self.__module_addr, NBusCommand.CMD_SET_STOP, bytearray([]))
- def cmd_set_module_start(self):
- self.__port.request_module(self.__module_addr,NBusCommand.CMD_SET_START, bytearray([]))
|