diff --git a/src/heisskleber/console/receiver.py b/src/heisskleber/console/receiver.py index 6548e15..f53e53a 100644 --- a/src/heisskleber/console/receiver.py +++ b/src/heisskleber/console/receiver.py @@ -28,7 +28,7 @@ class ConsoleReceiver(Receiver[T]): data, extra = self.unpacker(payload) await self.queue.put((data, extra)) - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Receive the next message from the console input.""" if not self.task: self.task = asyncio.create_task(self._listener_task()) diff --git a/src/heisskleber/core/receiver.py b/src/heisskleber/core/receiver.py index 3984fdc..fd4d655 100644 --- a/src/heisskleber/core/receiver.py +++ b/src/heisskleber/core/receiver.py @@ -30,7 +30,7 @@ class Receiver(ABC, Generic[T_co]): unpacker: Unpacker[T_co] @abstractmethod - async def receive(self) -> tuple[T_co, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T_co, dict[str, Any]]: """Receive data from the implemented input stream. Returns: diff --git a/src/heisskleber/file/receiver.py b/src/heisskleber/file/receiver.py index 63800aa..b201553 100644 --- a/src/heisskleber/file/receiver.py +++ b/src/heisskleber/file/receiver.py @@ -72,7 +72,7 @@ class FileReader(Receiver[T]): if self._current_file is not None: await self._loop.run_in_executor(self._executor, self._current_file.close) - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Get the next data and extra tuple from the watched file.""" return await anext(self._iter) diff --git a/src/heisskleber/mqtt/receiver.py b/src/heisskleber/mqtt/receiver.py index befbc92..038a3f1 100644 --- a/src/heisskleber/mqtt/receiver.py +++ b/src/heisskleber/mqtt/receiver.py @@ -54,7 +54,7 @@ class MqttReceiver(Receiver[T]): self._message_queue: Queue[Message] = Queue(self.config.max_saved_messages) self._listener_task: Task[None] | None = None - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Receive and process the next message from the queue. Returns: diff --git a/src/heisskleber/serial/config.py b/src/heisskleber/serial/config.py index 15167e5..3076dbf 100644 --- a/src/heisskleber/serial/config.py +++ b/src/heisskleber/serial/config.py @@ -27,3 +27,4 @@ class SerialConf(BaseConf): encoding: str = "ascii" parity: Literal["N", "O", "E"] = "N" # definitions from serial.PARTITY_'N'ONE / 'O'DD / 'E'VEN stopbits: Literal[1, 2] = 1 # 1.5 not yet implemented + termination_char: bytes = b"\n" diff --git a/src/heisskleber/serial/receiver.py b/src/heisskleber/serial/receiver.py index 013b03b..4bac146 100644 --- a/src/heisskleber/serial/receiver.py +++ b/src/heisskleber/serial/receiver.py @@ -34,12 +34,18 @@ class SerialReceiver(Receiver[T]): self._is_connected = False self._cancel_read_timeout = 1 - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive( # noqa: D417 + self, *, termination_char: bytes | None = None, read_bytes: int = -1, **kwargs: Any + ) -> tuple[T, dict[str, Any]]: """Receive data from the serial port. This method reads a line from the serial port, unpacks it, and returns the data. If the serial port is not connected, it will attempt to connect first. + Arguments: + termination_char: Line termination character that signals the message end. + read_bytes: Number of bytes to read. Defaults to -1, i.e. infinite. + Returns: tuple[T, dict[str, Any]]: A tuple containing the unpacked data and any extra information. @@ -50,8 +56,13 @@ class SerialReceiver(Receiver[T]): if not self._is_connected: await self.start() + # Use config termination char by default, overwrite with passed termination char + expected_line_termintation = termination_char or self.config.termination_char + try: - payload = await asyncio.get_running_loop().run_in_executor(self._executor, self._ser.readline, -1) + payload = await asyncio.get_running_loop().run_in_executor( + self._executor, self._ser.read_until, expected_line_termintation, read_bytes + ) except asyncio.CancelledError: await asyncio.shield(self._cancel_read()) raise diff --git a/src/heisskleber/tcp/receiver.py b/src/heisskleber/tcp/receiver.py index 31a913c..c7cf74a 100644 --- a/src/heisskleber/tcp/receiver.py +++ b/src/heisskleber/tcp/receiver.py @@ -24,7 +24,7 @@ class TcpReceiver(Receiver[T]): self.reader: asyncio.StreamReader | None = None self.writer: asyncio.StreamWriter | None = None - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Receive data from a connection. Attempt to read data from the connection and handle the process of re-establishing the connection if necessary. diff --git a/src/heisskleber/udp/receiver.py b/src/heisskleber/udp/receiver.py index f668bba..be4ff76 100644 --- a/src/heisskleber/udp/receiver.py +++ b/src/heisskleber/udp/receiver.py @@ -60,7 +60,7 @@ class UdpReceiver(Receiver[T]): self._transport = None self._is_connected = False - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Get the next message from the udp connection. Returns: diff --git a/src/heisskleber/zmq/receiver.py b/src/heisskleber/zmq/receiver.py index 880d23d..85b724f 100644 --- a/src/heisskleber/zmq/receiver.py +++ b/src/heisskleber/zmq/receiver.py @@ -31,7 +31,7 @@ class ZmqReceiver(Receiver[T]): self.unpack = unpacker self.is_connected = False - async def receive(self) -> tuple[T, dict[str, Any]]: + async def receive(self, **kwargs: Any) -> tuple[T, dict[str, Any]]: """Read a message from the zmq bus and return it. Returns: