basic working mqtt setup

This commit is contained in:
OMGeeky
2025-05-07 22:54:26 +02:00
parent 13d7e4a220
commit 855510d474
4 changed files with 312 additions and 105 deletions

View File

@@ -67,6 +67,8 @@ The JSON payload published to the `data` topic has the following format:
## Usage
### Automatic Usage
Once configured, the ESP device will automatically publish sensor data to the MQTT broker at the specified interval. No additional code is required.
To enable MQTT publishing:
@@ -75,10 +77,35 @@ To enable MQTT publishing:
2. Configure the MQTT broker address and credentials.
3. Restart the ESP device.
### Programmatic Usage
The MQTT functionality is available as a module that can be imported and used in your own code:
```python
from esp_sensors.mqtt import setup_mqtt, publish_sensor_data
from esp_sensors.config import get_mqtt_config, load_config
# Load MQTT configuration
config = load_config()
mqtt_config = get_mqtt_config(config)
# Set up MQTT client
mqtt_client = setup_mqtt(mqtt_config)
# Publish sensor data
if mqtt_client:
publish_sensor_data(mqtt_client, mqtt_config, sensor, temperature, humidity)
```
The module provides the following functions:
- `setup_mqtt(mqtt_config)`: Sets up and connects to the MQTT broker using the provided configuration. Returns an MQTT client if successful, None otherwise.
- `publish_sensor_data(client, mqtt_config, sensor, temperature, humidity)`: Publishes sensor data to the MQTT broker. Returns True if successful, False otherwise.
## Simulation Mode
In simulation mode, MQTT messages are not actually sent to a broker but are printed to the console. This allows you to test your code without an actual MQTT broker.
## Dependencies
This feature requires the `micropython-umqtt.simple` package, which is included in the project's dependencies.
This feature requires the `micropython-umqtt.simple` package, which is included in the project's dependencies.

126
src/esp_sensors/mqtt.py Normal file
View File

@@ -0,0 +1,126 @@
"""
MQTT module for ESP sensors.
This module provides functionality to connect to an MQTT broker and publish sensor data.
It supports both real hardware and simulation mode.
"""
import time
import json
from typing import Dict, Any, Optional, Union
# Import hardware-specific modules if available (for ESP32/ESP8266)
try:
from umqtt.simple import MQTTClient
SIMULATION = False
except ImportError:
# Simulation mode for development on non-ESP hardware
SIMULATION = True
print("[MQTT] Running in simulation mode - MQTT messages will be printed to console")
# Mock MQTT client for simulation
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False):
self.client_id = client_id
self.server = server
self.port = port
self.user = user
self.password = password
self.keepalive = keepalive
self.ssl = ssl
self.connected = False
def connect(self):
print(f"[MQTT] Connecting to {self.server}:{self.port} as {self.client_id}")
self.connected = True
return 0
def disconnect(self):
print("[MQTT] Disconnected")
self.connected = False
def publish(self, topic, msg):
print(f"[MQTT] Publishing to {topic}: {msg}")
return
def setup_mqtt(mqtt_config: Dict[str, Any]) -> Optional[MQTTClient]:
"""
Set up and connect to the MQTT broker.
Args:
mqtt_config: MQTT configuration dictionary
Returns:
MQTTClient instance if enabled and connected, None otherwise
"""
if not mqtt_config.get("enabled", False):
print("MQTT is disabled in configuration")
return None
try:
client_id = mqtt_config.get("client_id", "esp_sensor")
broker = mqtt_config.get("broker", "mqtt.example.com")
port = mqtt_config.get("port", 1883)
username = mqtt_config.get("username", "")
password = mqtt_config.get("password", "")
keepalive = mqtt_config.get("keepalive", 60)
ssl = mqtt_config.get("ssl", False)
print(f"Setting up MQTT client: {client_id} -> {broker}:{port}")
client = MQTTClient(client_id, broker, port, username, password, keepalive, ssl)
# Try to connect
client.connect()
print("MQTT connected successfully")
return client
except Exception as e:
print(f"Failed to connect to MQTT broker: {e}")
return None
def publish_sensor_data(client: Optional[MQTTClient], mqtt_config: Dict[str, Any],
sensor: Any, temperature: float, humidity: float) -> bool:
"""
Publish sensor data to MQTT topics.
Args:
client: MQTTClient instance
mqtt_config: MQTT configuration dictionary
sensor: Sensor instance
temperature: Temperature reading
humidity: Humidity reading
Returns:
True if publishing was successful, False otherwise
"""
if client is None:
return False
try:
topic_prefix = mqtt_config.get("topic_prefix", "esp/sensors")
sensor_name = sensor.name.lower().replace(" ", "_")
# Publish temperature
temp_topic = f"{topic_prefix}/{sensor_name}/temperature"
client.publish(temp_topic, str(temperature).encode())
# Publish humidity
humidity_topic = f"{topic_prefix}/{sensor_name}/humidity"
client.publish(humidity_topic, str(humidity).encode())
# Publish combined data as JSON
data_topic = f"{topic_prefix}/{sensor_name}/data"
data = {
"temperature": temperature,
"humidity": humidity,
"timestamp": time.time(),
"unit": sensor.unit
}
client.publish(data_topic, json.dumps(data).encode())
print(f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}")
return True
except Exception as e:
print(f"Failed to publish to MQTT: {e}")
return False

View File

@@ -14,6 +14,7 @@ import sys
from esp_sensors.oled_display import OLEDDisplay
from esp_sensors.dht22 import DHT22Sensor
from esp_sensors.mqtt import setup_mqtt, publish_sensor_data
from esp_sensors.config import (
load_config,
get_button_config,
@@ -26,116 +27,12 @@ from esp_sensors.config import (
try:
from machine import Pin, deepsleep
import esp32
from umqtt.simple import MQTTClient
SIMULATION = False
except ImportError:
# Simulation mode for development on non-ESP hardware
SIMULATION = True
print("Running in simulation mode - hardware functions will be simulated")
# Mock MQTT client for simulation
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False):
self.client_id = client_id
self.server = server
self.port = port
self.user = user
self.password = password
self.keepalive = keepalive
self.ssl = ssl
self.connected = False
def connect(self):
print(f"[MQTT] Connecting to {self.server}:{self.port} as {self.client_id}")
self.connected = True
return 0
def disconnect(self):
print("[MQTT] Disconnected")
self.connected = False
def publish(self, topic, msg):
print(f"[MQTT] Publishing to {topic}: {msg}")
return
def setup_mqtt(mqtt_config):
"""
Set up and connect to the MQTT broker.
Args:
mqtt_config: MQTT configuration dictionary
Returns:
MQTTClient instance if enabled and connected, None otherwise
"""
if not mqtt_config.get("enabled", False):
print("MQTT is disabled in configuration")
return None
try:
client_id = mqtt_config.get("client_id", "esp_sensor")
broker = mqtt_config.get("broker", "mqtt.example.com")
port = mqtt_config.get("port", 1883)
username = mqtt_config.get("username", "")
password = mqtt_config.get("password", "")
keepalive = mqtt_config.get("keepalive", 60)
ssl = mqtt_config.get("ssl", False)
print(f"Setting up MQTT client: {client_id} -> {broker}:{port}")
client = MQTTClient(client_id, broker, port, username, password, keepalive, ssl)
# Try to connect
client.connect()
print("MQTT connected successfully")
return client
except Exception as e:
print(f"Failed to connect to MQTT broker: {e}")
return None
def publish_sensor_data(client, mqtt_config, sensor, temperature, humidity):
"""
Publish sensor data to MQTT topics.
Args:
client: MQTTClient instance
mqtt_config: MQTT configuration dictionary
sensor: Sensor instance
temperature: Temperature reading
humidity: Humidity reading
"""
if client is None:
return
try:
topic_prefix = mqtt_config.get("topic_prefix", "esp/sensors")
sensor_name = sensor.name.lower().replace(" ", "_")
# Publish temperature
temp_topic = f"{topic_prefix}/{sensor_name}/temperature"
client.publish(temp_topic, str(temperature).encode())
# Publish humidity
humidity_topic = f"{topic_prefix}/{sensor_name}/humidity"
client.publish(humidity_topic, str(humidity).encode())
# Publish combined data as JSON
import json
data_topic = f"{topic_prefix}/{sensor_name}/data"
data = {
"temperature": temperature,
"humidity": humidity,
"timestamp": time.time(),
"unit": sensor.temperature_unit
}
client.publish(data_topic, json.dumps(data).encode())
print(f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}")
except Exception as e:
print(f"Failed to publish to MQTT: {e}")
def simulate_button_press():
"""Simulate a button press in simulation mode."""

157
tests/test_mqtt.py Normal file
View File

@@ -0,0 +1,157 @@
"""
Tests for the MQTT module.
"""
import pytest
import json
from unittest.mock import patch, MagicMock
from src.esp_sensors.mqtt import setup_mqtt, publish_sensor_data, MQTTClient
class TestSensor:
"""Mock sensor class for testing."""
__test__ = False # Prevent pytest from treating this as a test case
def __init__(self, name="Test Sensor", temperature_unit="C"):
self.name = name
self.temperature_unit = temperature_unit
@pytest.fixture
def mqtt_config():
"""Fixture providing a sample MQTT configuration."""
return {
"enabled": True,
"broker": "test.mosquitto.org",
"port": 1883,
"client_id": "test_client",
"username": "test_user",
"password": "test_pass",
"topic_prefix": "test/sensors",
"publish_interval": 30,
"ssl": False,
"keepalive": 60,
}
@pytest.fixture
def disabled_mqtt_config():
"""Fixture providing a disabled MQTT configuration."""
return {
"enabled": False,
"broker": "test.mosquitto.org",
}
@pytest.fixture
def mock_sensor():
"""Fixture providing a mock sensor."""
return TestSensor(name="DHT22 Sensor", temperature_unit="C")
def test_setup_mqtt_disabled(disabled_mqtt_config):
"""Test that setup_mqtt returns None when MQTT is disabled."""
client = setup_mqtt(disabled_mqtt_config)
assert client is None
def test_setup_mqtt_enabled(mqtt_config):
"""Test that setup_mqtt creates and connects a client when MQTT is enabled."""
with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client:
# Configure the mock
mock_client_instance = MagicMock()
mock_mqtt_client.return_value = mock_client_instance
# Call the function
client = setup_mqtt(mqtt_config)
# Verify MQTTClient was created with correct parameters
mock_mqtt_client.assert_called_once_with(
mqtt_config["client_id"],
mqtt_config["broker"],
mqtt_config["port"],
mqtt_config["username"],
mqtt_config["password"],
mqtt_config["keepalive"],
mqtt_config["ssl"]
)
# Verify connect was called
mock_client_instance.connect.assert_called_once()
# Verify the client was returned
assert client == mock_client_instance
def test_setup_mqtt_connection_error(mqtt_config):
"""Test that setup_mqtt handles connection errors gracefully."""
with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client:
# Configure the mock to raise an exception on connect
mock_client_instance = MagicMock()
mock_client_instance.connect.side_effect = Exception("Connection failed")
mock_mqtt_client.return_value = mock_client_instance
# Call the function
client = setup_mqtt(mqtt_config)
# Verify connect was called
mock_client_instance.connect.assert_called_once()
# Verify None was returned due to the error
assert client is None
def test_publish_sensor_data_success(mqtt_config, mock_sensor):
"""Test that publish_sensor_data publishes to the correct topics."""
# Create a mock client
mock_client = MagicMock()
# Call the function
temperature = 25.5
humidity = 60.0
result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, temperature, humidity)
# Verify the result
assert result is True
# Verify publish was called for temperature
temp_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/temperature"
mock_client.publish.assert_any_call(temp_topic, str(temperature).encode())
# Verify publish was called for humidity
humidity_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/humidity"
mock_client.publish.assert_any_call(humidity_topic, str(humidity).encode())
# Verify publish was called for combined data
data_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/data"
# Check that the JSON data was published
for call_args in mock_client.publish.call_args_list:
if call_args[0][0] == data_topic:
# Parse the JSON data
data = json.loads(call_args[0][1].decode())
assert data["temperature"] == temperature
assert data["humidity"] == humidity
assert data["unit"] == mock_sensor.temperature_unit
assert "timestamp" in data
break
else:
pytest.fail("Data topic was not published")
def test_publish_sensor_data_no_client(mqtt_config, mock_sensor):
"""Test that publish_sensor_data returns False when client is None."""
result = publish_sensor_data(None, mqtt_config, mock_sensor, 25.5, 60.0)
assert result is False
def test_publish_sensor_data_error(mqtt_config, mock_sensor):
"""Test that publish_sensor_data handles errors gracefully."""
# Create a mock client that raises an exception on publish
mock_client = MagicMock()
mock_client.publish.side_effect = Exception("Publish failed")
# Call the function
result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, 25.5, 60.0)
# Verify the result
assert result is False