|
@@ -34,6 +34,10 @@ class NBusSerialPort(NBusPort):
|
|
|
Class representing nBus serial port.
|
|
Class representing nBus serial port.
|
|
|
"""
|
|
"""
|
|
|
def __init__(self, config: NBusSerialConfig):
|
|
def __init__(self, config: NBusSerialConfig):
|
|
|
|
|
+ """
|
|
|
|
|
+ Constructor.
|
|
|
|
|
+ :param config: configuration
|
|
|
|
|
+ """
|
|
|
self._port = serial.Serial(timeout=config.timeout)
|
|
self._port = serial.Serial(timeout=config.timeout)
|
|
|
self._port.port = config.port_name
|
|
self._port.port = config.port_name
|
|
|
self._port.parity = config.parity.value
|
|
self._port.parity = config.parity.value
|
|
@@ -48,6 +52,18 @@ class NBusSerialPort(NBusPort):
|
|
|
================================================================================================================
|
|
================================================================================================================
|
|
|
"""
|
|
"""
|
|
|
|
|
|
|
|
|
|
+ def change_configuration(self, config: NBusSerialConfig):
|
|
|
|
|
+ """
|
|
|
|
|
+ Change port configuration.
|
|
|
|
|
+ :param config: configuration
|
|
|
|
|
+ """
|
|
|
|
|
+ self._port.timeout = config.timeout
|
|
|
|
|
+ self._port.port = config.port_name
|
|
|
|
|
+ self._port.parity = config.parity.value
|
|
|
|
|
+ self._port.baudrate = config.baud.value
|
|
|
|
|
+ self._enable_log = config.enable_log
|
|
|
|
|
+ self._request_attempts = config.request_attempts
|
|
|
|
|
+
|
|
|
def open(self) -> None:
|
|
def open(self) -> None:
|
|
|
"""
|
|
"""
|
|
|
Open port.
|
|
Open port.
|
|
@@ -83,8 +99,8 @@ class NBusSerialPort(NBusPort):
|
|
|
:param command: command id
|
|
:param command: command id
|
|
|
:param data: command data to send
|
|
:param data: command data to send
|
|
|
"""
|
|
"""
|
|
|
- request = self._create_packet([0, 0, 0, command.value], data)
|
|
|
|
|
- self._log("\tBRQ>", request)
|
|
|
|
|
|
|
+ request = self._create_packet(bytearray([0, 0, 0, command.value]), data)
|
|
|
|
|
+ self._log("\tBRQ>", list(request))
|
|
|
self._port.write(request) # send message
|
|
self._port.write(request) # send message
|
|
|
|
|
|
|
|
def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
|
|
def request_module(self, module_addr: NBusModuleAddress, command: NBusCommand, data: bytearray,
|
|
@@ -130,7 +146,7 @@ class NBusSerialPort(NBusPort):
|
|
|
:return: response length | response
|
|
:return: response length | response
|
|
|
"""
|
|
"""
|
|
|
request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
|
|
request = self._create_packet(bytearray([0, module, sensor, command]), data) # create request
|
|
|
- self._log('d', sensor, "\tRQ>", request) # log request
|
|
|
|
|
|
|
+ self._log('d', sensor, "\tRQ>", list(request)) # log request
|
|
|
|
|
|
|
|
counter = 0 # err. trials
|
|
counter = 0 # err. trials
|
|
|
|
|
|