Files
flucto-heisskleber/tests/test_console_sink.py
Felix Weiler 8c985bdf3c Refactor/background tasks (#75)
* Add start, stop and __repr__ to sink and source types.

* Merge conflicts on mqtt async pub and resampler.

* Add start() and stop() functions to udp and zmq.

Change tests accordingly.

* Rename broker, ip, interface to common config name "host".

* Updated "host" entry in config files.

* Add lazyload to mqtt-source.
2024-02-22 18:50:13 +08:00

60 lines
1.4 KiB
Python

import pytest
from heisskleber.console.sink import AsyncConsoleSink, ConsoleSink
def test_console_sink(capsys) -> None:
sink = ConsoleSink()
sink.send({"key": 3}, "test")
captured = capsys.readouterr()
assert captured.out == "{'key': 3}\n"
def test_console_sink_verbose(capsys) -> None:
sink = ConsoleSink(verbose=True)
sink.send({"key": 3}, "test")
captured = capsys.readouterr()
assert captured.out == "test:\t{'key': 3}\n"
def test_console_sink_pretty(capsys) -> None:
sink = ConsoleSink(pretty=True)
sink.send({"key": 3}, "test")
captured = capsys.readouterr()
assert captured.out == '{\n "key": 3\n}\n'
def test_console_sink_pretty_verbose(capsys) -> None:
sink = ConsoleSink(pretty=True, verbose=True)
sink.send({"key": 3}, "test")
captured = capsys.readouterr()
assert captured.out == 'test:\t{\n "key": 3\n}\n'
def test_console_repr() -> None:
sink = ConsoleSink()
assert repr(sink) == "ConsoleSink(pretty=False, verbose=False)"
def test_async_console_repr() -> None:
sink = AsyncConsoleSink()
assert repr(sink) == "AsyncConsoleSink(pretty=False, verbose=False)"
@pytest.mark.asyncio
async def test_async_console_sink(capsys) -> None:
sink = AsyncConsoleSink()
await sink.send({"key": 3}, "test")
captured = capsys.readouterr()
assert captured.out == "{'key': 3}\n"