mirror of
https://github.com/OMGeeky/homecontrol.esp-sensors.git
synced 2025-12-26 17:02:29 +01:00
316 lines
11 KiB
Python
316 lines
11 KiB
Python
"""
|
|
Tests for the MQTT Client module.
|
|
|
|
This module contains tests for the MQTTClient class in the mqtt_client.py module.
|
|
"""
|
|
|
|
import struct
|
|
import time
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
import pytest
|
|
|
|
from src.esp_sensors.mqtt_client import (
|
|
MQTTClient,
|
|
MQTTException,
|
|
CONNACK,
|
|
PUBLISH,
|
|
PUBACK,
|
|
SUBACK,
|
|
)
|
|
|
|
|
|
class TestMQTTClient:
|
|
"""Tests for the MQTTClient class."""
|
|
|
|
@pytest.fixture
|
|
def mqtt_client(self):
|
|
"""Fixture providing a basic MQTTClient instance."""
|
|
return MQTTClient(
|
|
client_id="test_client",
|
|
server="test.mosquitto.org",
|
|
port=1883,
|
|
user="test_user",
|
|
password="test_pass",
|
|
keepalive=60,
|
|
ssl=False,
|
|
)
|
|
|
|
def test_init(self, mqtt_client):
|
|
"""Test that the MQTTClient initializes with the correct attributes."""
|
|
assert mqtt_client.client_id == "test_client"
|
|
assert mqtt_client.server == "test.mosquitto.org"
|
|
assert mqtt_client.port == 1883
|
|
assert mqtt_client.user == "test_user"
|
|
assert mqtt_client.password == "test_pass"
|
|
assert mqtt_client.keepalive == 60
|
|
assert mqtt_client.ssl is False
|
|
assert mqtt_client.sock is None
|
|
assert mqtt_client.connected is False
|
|
assert mqtt_client.callback is None
|
|
assert mqtt_client.pid == 0
|
|
assert mqtt_client.subscriptions == {}
|
|
assert mqtt_client.last_ping == 0
|
|
|
|
def test_generate_packet_id(self, mqtt_client):
|
|
"""Test that _generate_packet_id returns sequential IDs and wraps around."""
|
|
# First call should return 1
|
|
assert mqtt_client._generate_packet_id() == 1
|
|
assert mqtt_client.pid == 1
|
|
|
|
# Second call should return 2
|
|
assert mqtt_client._generate_packet_id() == 2
|
|
assert mqtt_client.pid == 2
|
|
|
|
# Set pid to 65535 (max value)
|
|
mqtt_client.pid = 65535
|
|
|
|
# Next call should wrap around to 0
|
|
assert mqtt_client._generate_packet_id() == 0
|
|
assert mqtt_client.pid == 0
|
|
|
|
def test_encode_length(self, mqtt_client):
|
|
"""Test that _encode_length correctly encodes MQTT remaining length."""
|
|
# Test small length (< 128)
|
|
assert list(mqtt_client._encode_length(64)) == [64]
|
|
|
|
# Test medium length (128-16383)
|
|
assert list(mqtt_client._encode_length(128)) == [128 & 0x7F | 0x80, 1]
|
|
assert list(mqtt_client._encode_length(8192)) == [0x80, 0x40]
|
|
|
|
# Test large length (16384-2097151)
|
|
assert list(mqtt_client._encode_length(2097151)) == [0xFF, 0xFF, 0x7F]
|
|
|
|
def test_encode_string(self, mqtt_client):
|
|
"""Test that _encode_string correctly encodes strings for MQTT packets."""
|
|
# Test with string input
|
|
result = mqtt_client._encode_string("test")
|
|
assert len(result) == 6 # 2 bytes length + 4 bytes string
|
|
assert result[0:2] == b"\x00\x04" # Length (4) in network byte order
|
|
assert result[2:] == b"test" # String content
|
|
|
|
# Test with bytes input
|
|
result = mqtt_client._encode_string(b"test")
|
|
assert len(result) == 6
|
|
assert result[0:2] == b"\x00\x04"
|
|
assert result[2:] == b"test"
|
|
|
|
@patch("socket.socket")
|
|
def test_connect_success(self, mock_socket, mqtt_client):
|
|
"""Test successful connection to MQTT broker."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Mock the _recv_packet method to return a successful CONNACK
|
|
with patch.object(
|
|
mqtt_client, "_recv_packet", return_value=(CONNACK, b"\x00\x00")
|
|
):
|
|
# Call connect
|
|
result = mqtt_client.connect()
|
|
|
|
# Verify socket was created and connected
|
|
mock_socket.assert_called_once()
|
|
mock_sock.connect.assert_called_once_with(("test.mosquitto.org", 1883))
|
|
|
|
# Verify CONNECT packet was sent
|
|
mock_sock.send.assert_called_once()
|
|
|
|
# Verify result
|
|
assert result == 0
|
|
assert mqtt_client.connected is True
|
|
assert mqtt_client.sock is mock_sock
|
|
|
|
@patch("socket.socket")
|
|
def test_connect_timeout(self, mock_socket, mqtt_client):
|
|
"""Test connection with timeout."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Mock the _recv_packet method to return None (simulating timeout)
|
|
with patch.object(mqtt_client, "_recv_packet", return_value=(None, None)):
|
|
# Call connect
|
|
result = mqtt_client.connect()
|
|
|
|
# Verify socket was created and connected
|
|
mock_socket.assert_called_once()
|
|
mock_sock.connect.assert_called_once_with(("test.mosquitto.org", 1883))
|
|
|
|
# Verify CONNECT packet was sent
|
|
mock_sock.send.assert_called_once()
|
|
|
|
# Verify result indicates failure but doesn't crash
|
|
assert result == 1
|
|
assert mqtt_client.connected is False
|
|
|
|
@patch("socket.socket")
|
|
def test_connect_failure(self, mock_socket, mqtt_client):
|
|
"""Test connection failure to MQTT broker."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Mock the _recv_packet method to return a failed CONNACK
|
|
with patch.object(
|
|
mqtt_client, "_recv_packet", return_value=(CONNACK, b"\x00\x01")
|
|
):
|
|
# Call connect and verify it raises an exception
|
|
with pytest.raises(MQTTException, match="Connection refused: 1"):
|
|
mqtt_client.connect()
|
|
|
|
@patch("socket.socket")
|
|
def test_disconnect(self, mock_socket, mqtt_client):
|
|
"""Test disconnection from MQTT broker."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Set up the client as connected
|
|
mqtt_client.sock = mock_sock
|
|
mqtt_client.connected = True
|
|
|
|
# Call disconnect
|
|
mqtt_client.disconnect()
|
|
|
|
# Verify DISCONNECT packet was sent
|
|
mock_sock.send.assert_called_once()
|
|
|
|
# Verify socket was closed
|
|
mock_sock.close.assert_called_once()
|
|
|
|
# Verify client state
|
|
assert mqtt_client.connected is False
|
|
assert mqtt_client.sock is None
|
|
|
|
@patch("socket.socket")
|
|
def test_publish(self, mock_socket, mqtt_client):
|
|
"""Test publishing a message to a topic."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Set up the client as connected
|
|
mqtt_client.sock = mock_sock
|
|
mqtt_client.connected = True
|
|
# Set last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Call publish with QoS 0
|
|
mqtt_client.publish("test/topic", "test message")
|
|
|
|
# Verify PUBLISH packet was sent
|
|
mock_sock.send.assert_called_once()
|
|
|
|
# Test with QoS 1
|
|
mock_sock.reset_mock()
|
|
# Reset last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Mock the _recv_packet method instead of directly mocking socket.recv
|
|
with patch.object(
|
|
mqtt_client, "_recv_packet", return_value=(PUBACK, b"\x00\x01")
|
|
):
|
|
mqtt_client.publish("test/topic", "test message", qos=1)
|
|
|
|
# Verify PUBLISH packet was sent
|
|
assert mock_sock.send.call_count == 1
|
|
|
|
# Test with QoS 1 and timeout
|
|
mock_sock.reset_mock()
|
|
# Reset last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Mock _recv_packet to return None (simulating timeout)
|
|
with patch.object(mqtt_client, "_recv_packet", return_value=(None, None)):
|
|
# This should not raise an exception
|
|
mqtt_client.publish("test/topic", "test message", qos=1)
|
|
|
|
# Verify PUBLISH packet was still sent
|
|
assert mock_sock.send.call_count == 1
|
|
|
|
@patch("socket.socket")
|
|
def test_subscribe(self, mock_socket, mqtt_client):
|
|
"""Test subscribing to a topic."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Set up the client as connected
|
|
mqtt_client.sock = mock_sock
|
|
mqtt_client.connected = True
|
|
# Set last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Mock the _recv_packet method to return a successful SUBACK
|
|
with patch.object(
|
|
mqtt_client, "_recv_packet", return_value=(SUBACK, b"\x00\x01\x00")
|
|
):
|
|
# Call subscribe
|
|
mqtt_client.subscribe("test/topic")
|
|
|
|
# Verify SUBSCRIBE packet was sent
|
|
mock_sock.send.assert_called_once()
|
|
|
|
# Verify subscription was stored
|
|
assert "test/topic" in mqtt_client.subscriptions
|
|
assert mqtt_client.subscriptions["test/topic"] == 0
|
|
|
|
# Test with timeout
|
|
mock_sock.reset_mock()
|
|
# Reset last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Mock _recv_packet to return None (simulating timeout)
|
|
with patch.object(mqtt_client, "_recv_packet", return_value=(None, None)):
|
|
# This should not raise an exception
|
|
mqtt_client.subscribe("test/timeout")
|
|
|
|
# Verify SUBSCRIBE packet was still sent
|
|
assert mock_sock.send.call_count == 1
|
|
|
|
# Verify subscription was still stored
|
|
assert "test/timeout" in mqtt_client.subscriptions
|
|
|
|
@patch("socket.socket")
|
|
def test_check_msg(self, mock_socket, mqtt_client):
|
|
"""Test checking for messages."""
|
|
# Configure the mock socket
|
|
mock_sock = MagicMock()
|
|
mock_socket.return_value = mock_sock
|
|
|
|
# Set up the client as connected
|
|
mqtt_client.sock = mock_sock
|
|
mqtt_client.connected = True
|
|
# Set last_ping to current time to prevent ping from being triggered
|
|
mqtt_client.last_ping = time.time()
|
|
|
|
# Set up a mock callback
|
|
mock_callback = MagicMock()
|
|
mqtt_client.set_callback(mock_callback)
|
|
|
|
# Prepare the topic and message
|
|
topic = "test/topic"
|
|
message = "test message"
|
|
topic_encoded = struct.pack("!H", len(topic)) + topic.encode()
|
|
payload = topic_encoded + message.encode()
|
|
|
|
# Mock the _recv_packet method to return a PUBLISH packet
|
|
with patch.object(mqtt_client, "_recv_packet", return_value=(PUBLISH, payload)):
|
|
# Call check_msg
|
|
mqtt_client.check_msg()
|
|
|
|
# Verify callback was called with correct parameters
|
|
mock_callback.assert_called_once_with(topic.encode(), message.encode())
|
|
|
|
def test_set_callback(self, mqtt_client):
|
|
"""Test setting a callback function."""
|
|
# Create a mock callback
|
|
mock_callback = MagicMock()
|
|
|
|
# Set the callback
|
|
mqtt_client.set_callback(mock_callback)
|
|
|
|
# Verify callback was set
|
|
assert mqtt_client.callback is mock_callback
|