Add registry for all senders, receivers and config files. (#188)

* Add registry for all senders, receivers and config files.

Minor tweaks to signatures of SerialReceiver and ConsoleSender to
maintain compability with ather Senders and Receivers.

* fixed failing console test

* fixed import order

---------

Co-authored-by: Aljoscha Sander <aljoscha@flucto.tech>
This commit is contained in:
Felix Weiler
2025-01-23 13:24:37 +00:00
committed by GitHub
parent d2f4233c98
commit cf0f8e73c2
13 changed files with 80 additions and 23 deletions

View File

@@ -1,4 +1,9 @@
from heisskleber.console.receiver import ConsoleReceiver
from heisskleber.console.sender import ConsoleSender
from heisskleber.core import register
from .config import ConsoleConf
from .receiver import ConsoleReceiver
from .sender import ConsoleSender
register("console", ConsoleSender, ConsoleReceiver, ConsoleConf)
__all__ = ["ConsoleReceiver", "ConsoleSender"]

View File

@@ -0,0 +1,11 @@
from dataclasses import dataclass
from heisskleber.core.config import BaseConf
@dataclass
class ConsoleConf(BaseConf):
"""Configuration class for Console operations."""
verbose: bool = False
pretty: bool = False

View File

@@ -4,6 +4,8 @@ from typing import Any, TypeVar
from heisskleber.core import Receiver, Unpacker, json_unpacker
from .config import ConsoleConf
T = TypeVar("T")
@@ -12,16 +14,18 @@ class ConsoleReceiver(Receiver[T]):
def __init__(
self,
config: ConsoleConf,
unpacker: Unpacker[T] = json_unpacker, # type: ignore[assignment]
) -> None:
self.queue: asyncio.Queue[tuple[T, dict[str, Any]]] = asyncio.Queue(maxsize=10)
self.unpack = unpacker
self.unpacker = unpacker
self.config = config
self.task: asyncio.Task[None] | None = None
async def _listener_task(self) -> None:
while True:
payload = sys.stdin.readline().encode() # I know this is stupid, but I adhere to the interface for now
data, extra = self.unpack(payload)
data, extra = self.unpacker(payload)
await self.queue.put((data, extra))
async def receive(self) -> tuple[T, dict[str, Any]]:

View File

@@ -2,6 +2,8 @@ from typing import Any, TypeVar
from heisskleber.core import Packer, Sender, json_packer
from .config import ConsoleConf
T = TypeVar("T")
@@ -10,12 +12,11 @@ class ConsoleSender(Sender[T]):
def __init__(
self,
pretty: bool = False,
verbose: bool = False,
config: ConsoleConf,
packer: Packer[T] = json_packer, # type: ignore[assignment]
) -> None:
self.verbose = verbose
self.pretty = pretty
self.verbose = config.verbose
self.pretty = config.pretty
self.packer = packer
async def send(self, data: T, topic: str | None = None, **kwargs: dict[str, Any]) -> None:

View File

@@ -1,5 +1,7 @@
"""Core classes of the heisskleber library."""
from typing import Any
from .config import BaseConf, ConfigType
from .packer import JSONPacker, Packer, PackerError
from .receiver import Receiver
@@ -9,15 +11,29 @@ from .unpacker import JSONUnpacker, Unpacker, UnpackerError
json_packer = JSONPacker()
json_unpacker = JSONUnpacker()
_sender_registry: dict[str, type[Sender[Any]]] = {}
_receiver_registry: dict[str, type[Receiver[Any]]] = {}
_config_registry: dict[str, type[BaseConf]] = {}
def register(name: str, sender: type[Sender[Any]], receiver: type[Receiver[Any]], config: type[BaseConf]) -> None:
"""Register classes."""
_sender_registry[name] = sender
_receiver_registry[name] = receiver
_config_registry[name] = config
__all__ = [
"Packer",
"Unpacker",
"Sender",
"Receiver",
"json_packer",
"json_unpacker",
"BaseConf",
"ConfigType",
"Packer",
"PackerError",
"Receiver",
"Sender",
"Unpacker",
"UnpackerError",
"json_packer",
"json_unpacker",
"register",
]

View File

@@ -6,8 +6,12 @@ MQTT implementation is achieved via the `aiomqtt`_ package, which is an async wr
.. _paho-mqtt: https://github.com/eclipse/paho.mqtt.python
"""
from heisskleber.core import register
from .config import MqttConf
from .receiver import MqttReceiver
from .sender import MqttSender
register("mqtt", MqttSender, MqttReceiver, MqttConf)
__all__ = ["MqttConf", "MqttReceiver", "MqttSender"]

View File

@@ -1,7 +1,11 @@
"""Asyncronous implementations to read and write to a serial interface."""
from heisskleber.core import register
from .config import SerialConf
from .receiver import SerialReceiver
from .sender import SerialSender
__all__ = ["SerialConf", "SerialSender", "SerialReceiver"]
register("serial", SerialSender, SerialReceiver, SerialConf)
__all__ = ["SerialConf", "SerialReceiver", "SerialSender"]

View File

@@ -25,9 +25,9 @@ class SerialReceiver(Receiver[T]):
"""
def __init__(self, config: SerialConf, unpack: Unpacker[T]) -> None:
def __init__(self, config: SerialConf, unpacker: Unpacker[T]) -> None:
self.config = config
self.unpacker = unpack
self.unpacker = unpacker
self._loop = asyncio.get_running_loop()
self._executor = ThreadPoolExecutor(max_workers=2)
self._lock = asyncio.Lock()

View File

@@ -1,5 +1,9 @@
from heisskleber.core import register
from .config import TcpConf
from .receiver import TcpReceiver
from .sender import TcpSender
__all__ = ["TcpReceiver", "TcpSender", "TcpConf"]
register("tcp", TcpSender, TcpReceiver, TcpConf)
__all__ = ["TcpConf", "TcpReceiver", "TcpSender"]

View File

@@ -1,5 +1,9 @@
from heisskleber.core import register
from .config import UdpConf
from .receiver import UdpReceiver
from .sender import UdpSender
__all__ = ["UdpReceiver", "UdpSender", "UdpConf"]
register("udp", UdpSender, UdpReceiver, UdpConf)
__all__ = ["UdpConf", "UdpReceiver", "UdpSender"]

View File

@@ -1,5 +1,9 @@
from heisskleber.core import register
from .config import ZmqConf
from .receiver import ZmqReceiver
from .sender import ZmqSender
__all__ = ["ZmqConf", "ZmqSender", "ZmqReceiver"]
register("zmq", ZmqSender, ZmqReceiver, ZmqConf)
__all__ = ["ZmqConf", "ZmqReceiver", "ZmqSender"]

View File

@@ -1,11 +1,11 @@
import pytest
from heisskleber.console import ConsoleSender
from heisskleber.console import ConsoleConf, ConsoleSender
@pytest.mark.asyncio
async def test_console_sink(capsys) -> None:
sink = ConsoleSender()
sink = ConsoleSender(ConsoleConf())
await sink.send({"key": 3}, "test")
captured = capsys.readouterr()

View File

@@ -24,7 +24,7 @@ async def test_serial_with_ser() -> None:
port=reader_port,
baudrate=9600,
)
source = SerialReceiver(conf, unpack=serial_unpacker)
source = SerialReceiver(conf, unpacker=serial_unpacker)
await asyncio.sleep(0.1)