Fixed signature on zmq, added udp to factories.

This commit is contained in:
Felix Weiler
2023-11-07 13:10:16 +01:00
parent f0a7f9086a
commit 7021eb1ab4
4 changed files with 19 additions and 13 deletions

View File

@@ -4,18 +4,21 @@ from heisskleber.config import BaseConf, load_config
from heisskleber.core.types import Publisher, Subscriber
from heisskleber.mqtt import MqttConf, MqttPublisher, MqttSubscriber
from heisskleber.serial import SerialConf, SerialPublisher, SerialSubscriber
from heisskleber.udp import UdpConf, UdpPublisher, UdpSubscriber
from heisskleber.zmq import ZmqConf, ZmqPublisher, ZmqSubscriber
_registered_publishers: dict[str, tuple[type[Publisher], type[BaseConf]]] = {
"zmq": (ZmqPublisher, ZmqConf),
"mqtt": (MqttPublisher, MqttConf),
"serial": (SerialPublisher, SerialConf),
"udp": (UdpPublisher, UdpConf),
}
_registered_subscribers: dict[str, tuple[type[Subscriber], type[BaseConf]]] = {
"zmq": (ZmqSubscriber, ZmqConf),
"mqtt": (MqttSubscriber, MqttConf),
"serial": (SerialSubscriber, SerialConf),
"udp": (UdpSubscriber, UdpConf),
}

View File

@@ -0,0 +1,5 @@
from .config import UdpConf
from .publisher import UdpPublisher
from .subscriber import UdpSubscriber
__all__ = ["UdpSubscriber", "UdpPublisher", "UdpConf"]

View File

@@ -1,10 +1,9 @@
import sys
from typing import Any
import zmq
from heisskleber.core.packer import get_packer
from heisskleber.core.types import Publisher
from heisskleber.core.types import Publisher, Serializable
from .config import ZmqConf
@@ -26,9 +25,9 @@ class ZmqPublisher(Publisher):
print(f"failed to bind to zeromq socket: {e}")
sys.exit(-1)
def send(self, topic: str, data: dict[str, Any]) -> None:
def send(self, data: dict[str, Serializable], topic: str) -> None:
payload = self.pack(data)
self.socket.send_multipart([topic, payload.encode("utf-8")])
self.socket.send_multipart([topic.encode(), payload.encode()])
def __del__(self):
self.socket.close()

View File

@@ -29,12 +29,10 @@ class ZmqSubscriber(Subscriber):
print(f"failed to bind to zeromq socket: {e}")
sys.exit(-1)
def _subscribe_single_topic(self, topic: bytes | str):
if isinstance(topic, str):
topic = topic.encode()
self.socket.setsockopt(zmq.SUBSCRIBE, topic)
def _subscribe_single_topic(self, topic: str):
self.socket.setsockopt(zmq.SUBSCRIBE, topic.encode())
def subscribe(self, topic: bytes | str | list[bytes] | list[str]):
def subscribe(self, topic: str | list[str] | tuple[str]):
# Accepts single topic or list of topics
if isinstance(topic, (list, tuple)):
for t in topic:
@@ -42,15 +40,16 @@ class ZmqSubscriber(Subscriber):
else:
self._subscribe_single_topic(topic)
def receive(self) -> tuple[bytes, dict]:
def receive(self) -> tuple[str, dict]:
"""
reads a message from the zmq bus and returns it
Returns:
tuple(topic: bytes, message: dict): the message received
tuple(topic: str, message: dict): the message received
"""
(topic, message) = self.socket.recv_multipart()
message = self.unpack(message.decode())
(topic, payload) = self.socket.recv_multipart()
message = self.unpack(payload.decode())
topic = topic.decode()
return (topic, message)
def __del__(self):