Add keyword arguments to Receiver.receive() (#214)

* Add keyword arguments to Receiver.receive()

This change allows for more low level control of communication, e.g.
with serial devices.

* Rename line termination char and add to config.
This commit is contained in:
Felix Weiler
2025-05-06 13:40:11 +02:00
committed by GitHub
parent e95cbda207
commit 2767dceddb
9 changed files with 21 additions and 9 deletions

View File

@@ -28,7 +28,7 @@ class ConsoleReceiver(Receiver[T]):
data, extra = self.unpacker(payload)
await self.queue.put((data, extra))
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Receive the next message from the console input."""
if not self.task:
self.task = asyncio.create_task(self._listener_task())

View File

@@ -30,7 +30,7 @@ class Receiver(ABC, Generic[T_co]):
unpacker: Unpacker[T_co]
@abstractmethod
async def receive(self) -> tuple[T_co, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T_co, dict[str, Any]]:
"""Receive data from the implemented input stream.
Returns:

View File

@@ -72,7 +72,7 @@ class FileReader(Receiver[T]):
if self._current_file is not None:
await self._loop.run_in_executor(self._executor, self._current_file.close)
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Get the next data and extra tuple from the watched file."""
return await anext(self._iter)

View File

@@ -54,7 +54,7 @@ class MqttReceiver(Receiver[T]):
self._message_queue: Queue[Message] = Queue(self.config.max_saved_messages)
self._listener_task: Task[None] | None = None
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Receive and process the next message from the queue.
Returns:

View File

@@ -27,3 +27,4 @@ class SerialConf(BaseConf):
encoding: str = "ascii"
parity: Literal["N", "O", "E"] = "N" # definitions from serial.PARTITY_'N'ONE / 'O'DD / 'E'VEN
stopbits: Literal[1, 2] = 1 # 1.5 not yet implemented
termination_char: bytes = b"\n"

View File

@@ -34,12 +34,18 @@ class SerialReceiver(Receiver[T]):
self._is_connected = False
self._cancel_read_timeout = 1
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive( # noqa: D417
self, *, termination_char: bytes | None = None, read_bytes: int = -1, **kwargs: Any
) -> tuple[T, dict[str, Any]]:
"""Receive data from the serial port.
This method reads a line from the serial port, unpacks it, and returns the data.
If the serial port is not connected, it will attempt to connect first.
Arguments:
termination_char: Line termination character that signals the message end.
read_bytes: Number of bytes to read. Defaults to -1, i.e. infinite.
Returns:
tuple[T, dict[str, Any]]: A tuple containing the unpacked data and any extra information.
@@ -50,8 +56,13 @@ class SerialReceiver(Receiver[T]):
if not self._is_connected:
await self.start()
# Use config termination char by default, overwrite with passed termination char
expected_line_termintation = termination_char or self.config.termination_char
try:
payload = await asyncio.get_running_loop().run_in_executor(self._executor, self._ser.readline, -1)
payload = await asyncio.get_running_loop().run_in_executor(
self._executor, self._ser.read_until, expected_line_termintation, read_bytes
)
except asyncio.CancelledError:
await asyncio.shield(self._cancel_read())
raise

View File

@@ -24,7 +24,7 @@ class TcpReceiver(Receiver[T]):
self.reader: asyncio.StreamReader | None = None
self.writer: asyncio.StreamWriter | None = None
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Receive data from a connection.
Attempt to read data from the connection and handle the process of re-establishing the connection if necessary.

View File

@@ -60,7 +60,7 @@ class UdpReceiver(Receiver[T]):
self._transport = None
self._is_connected = False
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Get the next message from the udp connection.
Returns:

View File

@@ -31,7 +31,7 @@ class ZmqReceiver(Receiver[T]):
self.unpack = unpacker
self.is_connected = False
async def receive(self) -> tuple[T, dict[str, Any]]:
async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]:
"""Read a message from the zmq bus and return it.
Returns: