add configuration management and MQTT update handling

This commit is contained in:
OMGeeky
2025-06-05 17:15:39 +02:00
parent f9c729c8bd
commit 0d42fc2cb9
3 changed files with 296 additions and 46 deletions

View File

@@ -13,9 +13,14 @@ DEFAULT_CONFIG_PATH = "config.json"
# Default configuration values
DEFAULT_CONFIG = {
"device_id": "livingroom",
"device_name": "Wohnzimmer",
"update_interval": 60,
"version": 1,
"sensors": {
"dht22": {
"name": "DHT22 Sensor",
"id": "wohnzimmer-dht22",
"name": "Wohnzimmer",
"pin": 16,
"interval": 60,
"temperature": {"name": "DHT22 Temperature", "unit": "C"},
@@ -25,12 +30,14 @@ DEFAULT_CONFIG = {
"displays": {
"oled": {
"name": "OLED Display",
"enabled": True,
"always_on": False,
"scl_pin": 22,
"sda_pin": 21,
"width": 128,
"height": 64,
"address": "0x3C",
"interval": 1,
"interval": 5,
}
},
"buttons": {"main_button": {"pin": 0, "pull_up": True}},
@@ -38,11 +45,12 @@ DEFAULT_CONFIG = {
"enabled": False,
"broker": "mqtt.example.com",
"port": 1883,
"client_id": "esp_sensor",
"client_id": "{device_id}",
"username": "",
"password": "",
"topic_prefix": "esp/sensors",
"publish_interval": 60, # seconds
"load_config_from_mqtt": True,
"topic_config": "/homecontrol/{device_id}/config",
"topic_data_prefix": "/homecontrol/{device_id}/data",
"ssl": False,
"keepalive": 60,
},
@@ -50,9 +58,71 @@ DEFAULT_CONFIG = {
"ssid": "<your ssid>",
"password": "<your password>",
"timeout": 10,
},
"network_fallback": {
"ssid": "<your fallback ssid>",
"password": "<your fallback password>",
"timeout": 10,
}
}
class Config:
"""
Configuration class to manage loading and saving configuration settings.
"""
def __init__(self, config_path: str = DEFAULT_CONFIG_PATH):
self.current_version = None
self.update_interval = None
self.device_name = None
self.device_id = None
self.network_fallback_config = None
self.network_config = None
self.display_config = None
self.dht_config = None
self.mqtt_config = None
self.config_path = config_path
self.config = self.load_config()
# Get configuration sections
config = self.config
self.update_configs(config)
def update_configs(self, config):
self.mqtt_config = get_mqtt_config(config)
self.dht_config = get_sensor_config("dht22", config)
self.display_config = get_display_config("oled", config)
self.network_config = config.get("network", {})
self.network_fallback_config = config.get("network_fallback", {})
# Get device information and update interval
self.device_id = config.get("device_id", "esp_sensor")
self.device_name = config.get("device_name", "ESP Sensor")
self.update_interval = config.get("update_interval", 60)
self.current_version = config.get("version", 0)
def load_config(self) -> dict:
"""
Load configuration from the specified file.
Returns:
A dictionary containing the configuration
"""
return load_config(self.config_path)
def save_config(self, config: dict) -> bool:
"""
Save the provided configuration to the file.
Args:
config: Configuration dictionary to save
Returns:
True if saving was successful, False otherwise
"""
self.config = config
self.update_configs(config)
return save_config(config, self.config_path)
def load_config(config_path: str = DEFAULT_CONFIG_PATH) :
"""
@@ -166,4 +236,93 @@ def get_mqtt_config(config: dict | None = None) -> dict:
if mqtt_config is None:
mqtt_config = DEFAULT_CONFIG.get("mqtt", {})
# Replace {device_id} placeholders in MQTT configuration
device_id = config.get("device_id", DEFAULT_CONFIG.get("device_id", "esp_sensor"))
mqtt_config = replace_device_id_placeholders(mqtt_config, device_id)
return mqtt_config
def replace_device_id_placeholders(config_section: dict, device_id: str) -> dict:
"""
Replace {device_id} placeholders in configuration values.
Args:
config_section: Configuration section to process
device_id: Device ID to use for replacement
Returns:
Configuration section with placeholders replaced
"""
result = {}
for key, value in config_section.items():
if isinstance(value, str) and "{device_id}" in value:
result[key] = value.replace("{device_id}", device_id)
else:
result[key] = value
return result
def save_config(config: dict, config_path: str = DEFAULT_CONFIG_PATH) -> bool:
"""
Save configuration to a JSON file.
Args:
config: Configuration dictionary to save
config_path: Path to the configuration file (default: config.json)
Returns:
True if saving was successful, False otherwise
"""
try:
with open(config_path, "w") as f:
json.dump(config, f, indent=4)
print(f"Configuration saved to '{config_path}'")
return True
except Exception as e:
print(f"Error saving configuration: {e}")
return False
def check_and_update_config_from_mqtt(mqtt_client, mqtt_config: dict, current_config: dict) -> dict:
"""
Check for configuration updates from MQTT and update local configuration if needed.
Args:
mqtt_client: MQTT client instance
mqtt_config: MQTT configuration dictionary
current_config: Current configuration dictionary
Returns:
Updated configuration dictionary if an update was found, otherwise the current configuration
"""
if not mqtt_config.get("load_config_from_mqtt", False):
print("Loading config from MQTT is disabled")
return current_config
try:
# Get the configuration topic
topic_config = mqtt_config.get("topic_config")
if not topic_config:
print("No configuration topic specified")
return current_config
# Subscribe to the configuration topic
print(f"Subscribing to configuration topic: {topic_config}")
# This is a simplified implementation - in a real implementation, we would
# set up a callback to handle the message and wait for it to be received
# For now, we'll just return the current configuration
# In a real implementation, we would:
# 1. Subscribe to the topic
# 2. Wait for a message (with timeout)
# 3. Parse the message as JSON
# 4. Check if the version is newer than the current version
# 5. If it is, update the local configuration and save it
print("MQTT configuration update check not implemented yet")
return current_config
except Exception as e:
print(f"Error checking for configuration updates: {e}")
return current_config

View File

@@ -114,19 +114,19 @@ def publish_sensor_data(
return False
try:
topic_prefix = mqtt_config.get("topic_prefix", "esp/sensors")
sensor_name = sensor.name.lower().replace(" ", "_")
topic_data_prefix = mqtt_config.get("topic_data_prefix", "/homecontrol/device/data")
sensor_id = getattr(sensor, "id", sensor.name.lower().replace(" ", "_"))
# Publish temperature
temp_topic = f"{topic_prefix}/{sensor_name}/temperature"
temp_topic = f"{topic_data_prefix}/{sensor_id}/temperature"
client.publish(temp_topic, str(temperature).encode())
# Publish humidity
humidity_topic = f"{topic_prefix}/{sensor_name}/humidity"
humidity_topic = f"{topic_data_prefix}/{sensor_id}/humidity"
client.publish(humidity_topic, str(humidity).encode())
# Publish combined data as JSON
data_topic = f"{topic_prefix}/{sensor_name}/data"
data_topic = f"{topic_data_prefix}/{sensor_id}/data"
data = {
"temperature": temperature,
"humidity": humidity,
@@ -142,3 +142,65 @@ def publish_sensor_data(
except Exception as e:
print(f"Failed to publish to MQTT: {e}")
return False
def subscribe_to_config(client: MQTTClient | None, mqtt_config: dict) -> bool:
"""
Subscribe to the configuration topic.
Args:
client: MQTTClient instance
mqtt_config: MQTT configuration dictionary
Returns:
True if subscription was successful, False otherwise
"""
if client is None:
return False
try:
topic_config = mqtt_config.get("topic_config")
if not topic_config:
print("No configuration topic specified")
return False
print(f"Subscribing to configuration topic: {topic_config}")
client.subscribe(topic_config.encode())
return True
except Exception as e:
print(f"Failed to subscribe to configuration topic: {e}")
return False
def check_config_update(client: MQTTClient | None, mqtt_config: dict, current_config: dict) -> dict:
"""
Check for configuration updates from MQTT.
Args:
client: MQTTClient instance
mqtt_config: MQTT configuration dictionary
current_config: Current configuration dictionary
Returns:
Updated configuration dictionary if an update was found, otherwise the current configuration
"""
if client is None or not mqtt_config.get("load_config_from_mqtt", False):
return current_config
try:
# Subscribe to the configuration topic
if not subscribe_to_config(client, mqtt_config):
return current_config
# In a real implementation, we would:
# 1. Set up a callback to handle the message
# 2. Wait for a message (with timeout)
# 3. Parse the message as JSON
# 4. Check if the version is newer than the current version
# 5. If it is, update the local configuration and save it
print("MQTT configuration update check not implemented yet")
return current_config
except Exception as e:
print(f"Error checking for configuration updates: {e}")
return current_config

View File

@@ -13,14 +13,16 @@ import time
from esp_sensors.oled_display import OLEDDisplay
from esp_sensors.dht22 import DHT22Sensor
from esp_sensors.mqtt import setup_mqtt, publish_sensor_data
from esp_sensors.mqtt import setup_mqtt, publish_sensor_data, check_config_update
from esp_sensors.config import (
load_config,
get_button_config,
get_sensor_config,
get_display_config,
get_mqtt_config,
save_config,
)
from src.esp_sensors.config import Config
# Import hardware-specific modules if available (for ESP32/ESP8266)
try:
@@ -84,37 +86,56 @@ def main():
last_read_time = time.time() # this is to make sure, the sleep time is correct
# Load configuration
config = load_config()
# print('config: ', config)
# button_config = get_button_config("main_button", config)
mqtt_config = get_mqtt_config(config)
dht_config = get_sensor_config("dht22", config)
display_config = get_display_config("oled", config)
network_config = config.get("network", {})
# Initialize a DHT22 sensor using configuration
dht_sensor = DHT22Sensor(sensor_config=dht_config)
config = Config()
# Initialize an OLED display using configuration
display = OLEDDisplay(display_config=display_config)
# display.clear()
name_str = f"N: {dht_sensor.name}"
display.set_header(name_str)
display = OLEDDisplay(display_config=config.display_config)
display.clear()
display.set_header(f"Device: {config.device_name}")
display.set_status("Initializing...")
# mqtt_client = None
mqtt_enabled = mqtt_config.get("enabled", False)
mqtt_publish_interval = mqtt_config.get("publish_interval", 60)
# Check if we need to update configuration from MQTT
mqtt_enabled = config.mqtt_config.get("enabled", False)
load_config_from_mqtt = config.mqtt_config.get("load_config_from_mqtt", False)
# # Set up button using configuration
# button_pin = button_config.get("pin", 0)
# if not SIMULATION:
# pull_up = button_config.get("pull_up", True)
# button = Pin(button_pin, Pin.IN, Pin.PULL_UP if pull_up else None)
#
print(f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press...")
if mqtt_enabled and load_config_from_mqtt:
display.set_status("Checking for config updates...")
# Connect to WiFi
if connect_wifi(config.network_config, config.network_fallback_config):
# Set up MQTT client
mqtt_client = setup_mqtt(config.mqtt_config)
if mqtt_client:
# Check for configuration updates
display.set_status("Checking MQTT config...")
updated_config = check_config_update(mqtt_client, config.mqtt_config, config.config)
# Disconnect MQTT client after checking for updates
try:
mqtt_client.disconnect()
except Exception as e:
print(f"Error disconnecting MQTT client: {e}")
# If we got an updated configuration with a newer version, save it
if updated_config != config.config and updated_config.get("version", 0) > config.current_version:
display.set_status("Updating config...")
print(f"Found newer configuration (version {updated_config.get('version')}), updating...")
config.save_config(updated_config)
# Initialize a DHT22 sensor using configuration
dht_sensor = DHT22Sensor(sensor_config=config.dht_config)
# Update display header with sensor name
name_str = f"N: {dht_sensor.name}"
display.set_header(name_str)
# Set up MQTT
mqtt_enabled = config.mqtt_config.get("enabled", False)
print(f"System initialized. Will run every {config.update_interval} seconds...")
mqtt_client = None
# Main loop - sleep until button press, then read and display sensor data
try:
# while True:
@@ -150,16 +171,16 @@ def main():
if mqtt_enabled:
# Initialize wifi connection
display.set_status("Connecting WiFi...")
connect_wifi(network_config)
connect_wifi(config.network_config, config.network_fallback_config)
# Set up MQTT client if enabled
display.set_status("Setting up MQTT...")
print(f"MQTT enabled: {mqtt_enabled}, broker: {mqtt_config.get('broker')}")
mqtt_client = setup_mqtt(mqtt_config)
print(f"MQTT enabled: {mqtt_enabled}, broker: {config.mqtt_config.get('broker')}")
mqtt_client = setup_mqtt(config.mqtt_config)
display.set_status("Publishing to MQTT...")
print(f"Publishing sensor data to MQTT at {mqtt_config.get('broker')}:{mqtt_config.get('port')}")
print(f"Publishing sensor data to MQTT at {config.mqtt_config.get('broker')}:{config.mqtt_config.get('port')}")
# display.display_values([mqtt_client.server, mqtt_client.port])
publish_sensor_data(mqtt_client, mqtt_config, dht_sensor, temperature, humidity)
publish_sensor_data(mqtt_client, config.mqtt_config, dht_sensor, temperature, humidity)
print("Sensor data published to MQTT")
try:
if mqtt_client:
@@ -173,7 +194,10 @@ def main():
# sleep, to be able to do something, before going into deepsleep
time.sleep(display.on_time)
time_until_next_read = mqtt_publish_interval - (time.time() - last_read_time)
time_until_next_read = config.update_interval - (time.time() - last_read_time)
if time_until_next_read < 0:
time_until_next_read = config.update_interval
display.set_status(f"Sleeping {time_until_next_read}s")
print('sleeping for', time_until_next_read, 'seconds')
if not SIMULATION:
@@ -201,7 +225,7 @@ def main():
print("Program terminated by user")
def connect_wifi(network_config: dict):
def connect_wifi(network_config: dict, fallback_config: dict = None):
import network
ssid = network_config.get("ssid")
password = network_config.get("password")
@@ -220,8 +244,15 @@ def connect_wifi(network_config: dict):
# Check if connection attempt has timed out
if time.time() - connection_start_time > timeout:
print("Connection timed out")
# Try fallback network if available
if fallback_config:
print("Trying fallback network")
return connect_wifi(fallback_config)
return False
pass
time.sleep(1)
print('Connection successful')
print(station.ifconfig())
return True
@@ -234,5 +265,3 @@ if __name__ == "__main__":
print(f"An error occurred: {e}")
time.sleep(5) # give time to read the error message and respond
deepsleep(1) # dummy deepsleep to basically reset the system