mirror of
https://github.com/OMGeeky/homecontrol.esp-sensors.git
synced 2025-12-26 17:02:29 +01:00
refactor: update MQTT client tests to use mock methods for packet handling
This commit is contained in:
@@ -53,14 +53,14 @@ class TestMQTTClient:
|
||||
# First call should return 1
|
||||
assert mqtt_client._generate_packet_id() == 1
|
||||
assert mqtt_client.pid == 1
|
||||
|
||||
|
||||
# Second call should return 2
|
||||
assert mqtt_client._generate_packet_id() == 2
|
||||
assert mqtt_client.pid == 2
|
||||
|
||||
|
||||
# Set pid to 65535 (max value)
|
||||
mqtt_client.pid = 65535
|
||||
|
||||
|
||||
# Next call should wrap around to 0
|
||||
assert mqtt_client._generate_packet_id() == 0
|
||||
assert mqtt_client.pid == 0
|
||||
@@ -69,11 +69,11 @@ class TestMQTTClient:
|
||||
"""Test that _encode_length correctly encodes MQTT remaining length."""
|
||||
# Test small length (< 128)
|
||||
assert list(mqtt_client._encode_length(64)) == [64]
|
||||
|
||||
|
||||
# Test medium length (128-16383)
|
||||
assert list(mqtt_client._encode_length(128)) == [128 & 0x7F | 0x80, 1]
|
||||
assert list(mqtt_client._encode_length(8192)) == [0x80, 0x40]
|
||||
|
||||
|
||||
# Test large length (16384-2097151)
|
||||
assert list(mqtt_client._encode_length(2097151)) == [0xFF, 0xFF, 0x7F]
|
||||
|
||||
@@ -84,7 +84,7 @@ class TestMQTTClient:
|
||||
assert len(result) == 6 # 2 bytes length + 4 bytes string
|
||||
assert result[0:2] == b'\x00\x04' # Length (4) in network byte order
|
||||
assert result[2:] == b'test' # String content
|
||||
|
||||
|
||||
# Test with bytes input
|
||||
result = mqtt_client._encode_string(b"test")
|
||||
assert len(result) == 6
|
||||
@@ -97,28 +97,23 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
# Configure the mock socket to return a successful CONNACK
|
||||
mock_sock.recv.side_effect = [
|
||||
b'\x20', # CONNACK packet type
|
||||
b'\x02', # Remaining length
|
||||
b'\x00\x00' # Session present flag (0) + return code (0 = accepted)
|
||||
]
|
||||
|
||||
# Call connect
|
||||
result = mqtt_client.connect()
|
||||
|
||||
# Verify socket was created and connected
|
||||
mock_socket.assert_called_once()
|
||||
mock_sock.connect.assert_called_once_with(("test.mosquitto.org", 1883))
|
||||
|
||||
# Verify CONNECT packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
# Verify result
|
||||
assert result == 0
|
||||
assert mqtt_client.connected is True
|
||||
assert mqtt_client.sock is mock_sock
|
||||
|
||||
# Mock the _recv_packet method to return a successful CONNACK
|
||||
with patch.object(mqtt_client, '_recv_packet', return_value=(CONNACK, b'\x00\x00')):
|
||||
# Call connect
|
||||
result = mqtt_client.connect()
|
||||
|
||||
# Verify socket was created and connected
|
||||
mock_socket.assert_called_once()
|
||||
mock_sock.connect.assert_called_once_with(("test.mosquitto.org", 1883))
|
||||
|
||||
# Verify CONNECT packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
# Verify result
|
||||
assert result == 0
|
||||
assert mqtt_client.connected is True
|
||||
assert mqtt_client.sock is mock_sock
|
||||
|
||||
@patch('socket.socket')
|
||||
def test_connect_failure(self, mock_socket, mqtt_client):
|
||||
@@ -126,17 +121,12 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
# Configure the mock socket to return a failed CONNACK
|
||||
mock_sock.recv.side_effect = [
|
||||
b'\x20', # CONNACK packet type
|
||||
b'\x02', # Remaining length
|
||||
b'\x00\x01' # Session present flag (0) + return code (1 = refused, protocol version)
|
||||
]
|
||||
|
||||
# Call connect and verify it raises an exception
|
||||
with pytest.raises(MQTTException, match="Connection refused: 1"):
|
||||
mqtt_client.connect()
|
||||
|
||||
# Mock the _recv_packet method to return a failed CONNACK
|
||||
with patch.object(mqtt_client, '_recv_packet', return_value=(CONNACK, b'\x00\x01')):
|
||||
# Call connect and verify it raises an exception
|
||||
with pytest.raises(MQTTException, match="Connection refused: 1"):
|
||||
mqtt_client.connect()
|
||||
|
||||
@patch('socket.socket')
|
||||
def test_disconnect(self, mock_socket, mqtt_client):
|
||||
@@ -144,20 +134,20 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
|
||||
# Set up the client as connected
|
||||
mqtt_client.sock = mock_sock
|
||||
mqtt_client.connected = True
|
||||
|
||||
|
||||
# Call disconnect
|
||||
mqtt_client.disconnect()
|
||||
|
||||
|
||||
# Verify DISCONNECT packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
|
||||
# Verify socket was closed
|
||||
mock_sock.close.assert_called_once()
|
||||
|
||||
|
||||
# Verify client state
|
||||
assert mqtt_client.connected is False
|
||||
assert mqtt_client.sock is None
|
||||
@@ -168,30 +158,27 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
|
||||
# Set up the client as connected
|
||||
mqtt_client.sock = mock_sock
|
||||
mqtt_client.connected = True
|
||||
mqtt_client.last_ping = 0
|
||||
|
||||
# Call publish
|
||||
|
||||
# Call publish with QoS 0
|
||||
mqtt_client.publish("test/topic", "test message")
|
||||
|
||||
|
||||
# Verify PUBLISH packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
|
||||
# Test with QoS 1
|
||||
mock_sock.reset_mock()
|
||||
mock_sock.recv.side_effect = [
|
||||
b'\x40', # PUBACK packet type
|
||||
b'\x02', # Remaining length
|
||||
b'\x00\x01' # Packet ID
|
||||
]
|
||||
|
||||
mqtt_client.publish("test/topic", "test message", qos=1)
|
||||
|
||||
# Verify PUBLISH packet was sent
|
||||
assert mock_sock.send.call_count == 1
|
||||
|
||||
# Mock the _recv_packet method instead of directly mocking socket.recv
|
||||
with patch.object(mqtt_client, '_recv_packet', return_value=(PUBACK, b'\x00\x01')):
|
||||
mqtt_client.publish("test/topic", "test message", qos=1)
|
||||
|
||||
# Verify PUBLISH packet was sent
|
||||
assert mock_sock.send.call_count == 1
|
||||
|
||||
@patch('socket.socket')
|
||||
def test_subscribe(self, mock_socket, mqtt_client):
|
||||
@@ -199,28 +186,23 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
|
||||
# Set up the client as connected
|
||||
mqtt_client.sock = mock_sock
|
||||
mqtt_client.connected = True
|
||||
mqtt_client.last_ping = 0
|
||||
|
||||
# Configure the mock socket to return a successful SUBACK
|
||||
mock_sock.recv.side_effect = [
|
||||
b'\x90', # SUBACK packet type
|
||||
b'\x03', # Remaining length
|
||||
b'\x00\x01\x00' # Packet ID + return code (0 = success)
|
||||
]
|
||||
|
||||
# Call subscribe
|
||||
mqtt_client.subscribe("test/topic")
|
||||
|
||||
# Verify SUBSCRIBE packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
# Verify subscription was stored
|
||||
assert "test/topic" in mqtt_client.subscriptions
|
||||
assert mqtt_client.subscriptions["test/topic"] == 0
|
||||
|
||||
# Mock the _recv_packet method to return a successful SUBACK
|
||||
with patch.object(mqtt_client, '_recv_packet', return_value=(SUBACK, b'\x00\x01\x00')):
|
||||
# Call subscribe
|
||||
mqtt_client.subscribe("test/topic")
|
||||
|
||||
# Verify SUBSCRIBE packet was sent
|
||||
mock_sock.send.assert_called_once()
|
||||
|
||||
# Verify subscription was stored
|
||||
assert "test/topic" in mqtt_client.subscriptions
|
||||
assert mqtt_client.subscriptions["test/topic"] == 0
|
||||
|
||||
@patch('socket.socket')
|
||||
def test_check_msg(self, mock_socket, mqtt_client):
|
||||
@@ -228,39 +210,37 @@ class TestMQTTClient:
|
||||
# Configure the mock socket
|
||||
mock_sock = MagicMock()
|
||||
mock_socket.return_value = mock_sock
|
||||
|
||||
|
||||
# Set up the client as connected
|
||||
mqtt_client.sock = mock_sock
|
||||
mqtt_client.connected = True
|
||||
mqtt_client.last_ping = 0
|
||||
|
||||
|
||||
# Set up a mock callback
|
||||
mock_callback = MagicMock()
|
||||
mqtt_client.set_callback(mock_callback)
|
||||
|
||||
# Configure the mock socket to return a PUBLISH packet
|
||||
|
||||
# Prepare the topic and message
|
||||
topic = "test/topic"
|
||||
message = "test message"
|
||||
topic_encoded = struct.pack("!H", len(topic)) + topic.encode()
|
||||
mock_sock.recv.side_effect = [
|
||||
bytes([PUBLISH]), # PUBLISH packet type
|
||||
bytes([len(topic_encoded) + len(message)]), # Remaining length
|
||||
topic_encoded + message.encode() # Topic + message
|
||||
]
|
||||
|
||||
# Call check_msg
|
||||
mqtt_client.check_msg()
|
||||
|
||||
# Verify callback was called with correct parameters
|
||||
mock_callback.assert_called_once_with(topic.encode(), message.encode())
|
||||
payload = topic_encoded + message.encode()
|
||||
|
||||
# Mock the _recv_packet method to return a PUBLISH packet
|
||||
with patch.object(mqtt_client, '_recv_packet', return_value=(PUBLISH, payload)):
|
||||
# Call check_msg
|
||||
mqtt_client.check_msg()
|
||||
|
||||
# Verify callback was called with correct parameters
|
||||
mock_callback.assert_called_once_with(topic.encode(), message.encode())
|
||||
|
||||
def test_set_callback(self, mqtt_client):
|
||||
"""Test setting a callback function."""
|
||||
# Create a mock callback
|
||||
mock_callback = MagicMock()
|
||||
|
||||
|
||||
# Set the callback
|
||||
mqtt_client.set_callback(mock_callback)
|
||||
|
||||
|
||||
# Verify callback was set
|
||||
assert mqtt_client.callback is mock_callback
|
||||
assert mqtt_client.callback is mock_callback
|
||||
|
||||
Reference in New Issue
Block a user