make timer consistent, no matter if the button was pressed or not & some cleanup/formatting etc.

This commit is contained in:
OMGeeky
2025-05-07 23:36:49 +02:00
parent 065252513b
commit 8bc58e4056
4 changed files with 99 additions and 46 deletions

View File

@@ -12,15 +12,27 @@ from typing import Dict, Any, Optional, Union
# Import hardware-specific modules if available (for ESP32/ESP8266)
try:
from umqtt.simple import MQTTClient
SIMULATION = False
except ImportError:
# Simulation mode for development on non-ESP hardware
SIMULATION = True
print("[MQTT] Running in simulation mode - MQTT messages will be printed to console")
print(
"[MQTT] Running in simulation mode - MQTT messages will be printed to console"
)
# Mock MQTT client for simulation
class MQTTClient:
def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False):
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
):
self.client_id = client_id
self.server = server
self.port = port
@@ -79,8 +91,13 @@ def setup_mqtt(mqtt_config: Dict[str, Any]) -> Optional[MQTTClient]:
return None
def publish_sensor_data(client: Optional[MQTTClient], mqtt_config: Dict[str, Any],
sensor: Any, temperature: float, humidity: float) -> bool:
def publish_sensor_data(
client: Optional[MQTTClient],
mqtt_config: Dict[str, Any],
sensor: Any,
temperature: float,
humidity: float,
) -> bool:
"""
Publish sensor data to MQTT topics.
@@ -115,11 +132,13 @@ def publish_sensor_data(client: Optional[MQTTClient], mqtt_config: Dict[str, Any
"temperature": temperature,
"humidity": humidity,
"timestamp": time.time(),
"unit": sensor.unit
"unit": sensor.unit,
}
client.publish(data_topic, json.dumps(data).encode())
print(f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}")
print(
f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}"
)
return True
except Exception as e:
print(f"Failed to publish to MQTT: {e}")

View File

@@ -29,6 +29,7 @@ class OLEDDisplay(Sensor):
height: int = None,
address: int | str = None,
interval: int = None,
on_time: int = None,
display_config: Dict[str, Any] = None,
):
"""
@@ -42,6 +43,7 @@ class OLEDDisplay(Sensor):
height: Display height in pixels (if None, loaded from config)
address: I2C address of the display (if None, loaded from config)
interval: Refresh interval in seconds (if None, loaded from config)
on_time: The time, the display should stay on (if None, loaded from config)
display_config: Configuration dictionary
"""
@@ -65,6 +67,9 @@ class OLEDDisplay(Sensor):
self.sda_pin = sda_pin # Already set above
self.width = width if width is not None else display_config.get("width", 128)
self.height = height if height is not None else display_config.get("height", 64)
self.on_time = (
on_time if on_time is not None else display_config.get("on_time", 5)
)
# Handle address (could be string in config)
if address is None:

View File

@@ -27,6 +27,7 @@ from esp_sensors.config import (
try:
from machine import Pin, deepsleep
import esp32
SIMULATION = False
except ImportError:
# Simulation mode for development on non-ESP hardware
@@ -110,7 +111,9 @@ def main():
# Display initialization message
display.clear()
display.display_text("Ready - Auto & Button", 0, 0)
print(f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press...")
print(
f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press..."
)
# Main loop - sleep until button press, then read and display sensor data
try:
@@ -118,32 +121,36 @@ def main():
# Calculate time until next scheduled reading
current_time = time.time()
time_since_last_read = current_time - last_read_time
time_until_next_read = max(0, mqtt_publish_interval - int(time_since_last_read))
time_until_next_read = max(
0, mqtt_publish_interval - int(time_since_last_read)
)
# Wait for button press or until next scheduled reading
button_pressed = False
if SIMULATION:
# In simulation mode, wait for Enter key with timeout
if not simulate_button_press(timeout=time_until_next_read if last_read_time > 0 else None):
if not simulate_button_press(timeout=time_until_next_read):
break # Exit if 'q' was pressed or Ctrl+C
# If we get here, either button was pressed or timeout occurred
button_pressed = True
else:
# In hardware mode, check if button is pressed (active low)
if button.value() == 0: # Button is pressed
button_pressed = True
button_pressed_value = 0 if not pull_up else 1 # TODO: check if 1 is correct
if button.value() == button_pressed_value: # Button is pressed
pass
elif time_since_last_read >= mqtt_publish_interval:
# Time for scheduled reading
button_pressed = True
pass
else:
# Go to light sleep mode to save power
# Wake up on pin change (button press) or timer
print(f"Entering light sleep mode for {time_until_next_read:.1f} seconds or until button press...")
print(
f"Entering light sleep mode for {time_until_next_read:.1f} seconds or until button press..."
)
# Set up wake on button press
esp32.wake_on_ext0(pin=button, level=0) # Wake on button press (low)
esp32.wake_on_ext0(
pin=button, level=button_pressed_value
) # Wake on button press (low)
# Set up wake on timer
if last_read_time > 0: # Skip timer on first run
@@ -154,13 +161,15 @@ def main():
esp32.light_sleep() # Light sleep preserves RAM but saves power
# When we get here, either the button was pressed or the timer expired
button_pressed = True
print(f"Awake from light sleep")
# Determine if this was triggered by a button press or scheduled interval
if SIMULATION:
trigger_source = "user input or scheduled interval"
else:
trigger_source = "button press" if button.value() == 0 else "scheduled interval"
trigger_source = (
"button press" if button.value() == 0 else "scheduled interval"
)
print(f"Reading sensor data (triggered by {trigger_source})...")
# Read sensor values
@@ -169,6 +178,13 @@ def main():
# Update last read time
last_read_time = time.time()
# Adjust for actual sleep duration (so if we sleep longer or shorter (because of
# the button) than expected, we don't miss the next scheduled read)
actual_sleep_duration = last_read_time - current_time
last_read_time -= (
(actual_sleep_duration - time_until_next_read)
% mqtt_publish_interval
)
# Format values for display
temp_str = f"Temp: {temperature:.1f} C"
@@ -176,7 +192,9 @@ def main():
time_str = f"Time: {time.time():.0f}"
name_str = f"Sensor: {dht_sensor.name}"
# Display values
# TODO: only display values, if the button has been clicked
display.display_values(
[name_str, temp_str, hum_str, time_str, "Press button again"]
)
@@ -184,22 +202,30 @@ def main():
# Print to console
print(f"Updated display with: {temp_str}, {hum_str}")
# Publish to MQTT if enabled and interval has elapsed
# Publish to MQTT if enabled
current_time = time.time()
if mqtt_client and (current_time - last_publish_time >= mqtt_publish_interval):
publish_sensor_data(mqtt_client, mqtt_config, dht_sensor, temperature, humidity)
publish_sensor_data(
mqtt_client, mqtt_config, dht_sensor, temperature, humidity
)
if mqtt_client and (
current_time - last_publish_time >= mqtt_publish_interval
):
last_publish_time = current_time
print(f"Next MQTT publish in {mqtt_publish_interval} seconds")
print(f"Next normal MQTT publish in {mqtt_publish_interval} seconds")
# Keep display on for a few seconds before going back to sleep
time.sleep(5)
time.sleep(display.on_time)
# Clear display to save power
display.clear()
display.display_text("Ready - Auto & Button", 0, 0)
print("last_read_time", last_read_time)
if SIMULATION:
print(f"Display cleared. Will run again in {mqtt_publish_interval - (time.time() - last_read_time):.1f} seconds or on button press.")
print(
f"Display cleared. Will run again in {mqtt_publish_interval - (time.time() - last_read_time):.1f} seconds or on button press."
)
except KeyboardInterrupt:
# Clean up on exit

View File

@@ -10,8 +10,9 @@ from src.esp_sensors.mqtt import setup_mqtt, publish_sensor_data, MQTTClient
class TestSensor:
"""Mock sensor class for testing."""
__test__ = False # Prevent pytest from treating this as a test case
def __init__(self, name="Test Sensor", temperature_unit="C"):
self.name = name
self.temperature_unit = temperature_unit
@@ -57,14 +58,14 @@ def test_setup_mqtt_disabled(disabled_mqtt_config):
def test_setup_mqtt_enabled(mqtt_config):
"""Test that setup_mqtt creates and connects a client when MQTT is enabled."""
with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client:
with patch("src.esp_sensors.mqtt.MQTTClient") as mock_mqtt_client:
# Configure the mock
mock_client_instance = MagicMock()
mock_mqtt_client.return_value = mock_client_instance
# Call the function
client = setup_mqtt(mqtt_config)
# Verify MQTTClient was created with correct parameters
mock_mqtt_client.assert_called_once_with(
mqtt_config["client_id"],
@@ -73,30 +74,30 @@ def test_setup_mqtt_enabled(mqtt_config):
mqtt_config["username"],
mqtt_config["password"],
mqtt_config["keepalive"],
mqtt_config["ssl"]
mqtt_config["ssl"],
)
# Verify connect was called
mock_client_instance.connect.assert_called_once()
# Verify the client was returned
assert client == mock_client_instance
def test_setup_mqtt_connection_error(mqtt_config):
"""Test that setup_mqtt handles connection errors gracefully."""
with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client:
with patch("src.esp_sensors.mqtt.MQTTClient") as mock_mqtt_client:
# Configure the mock to raise an exception on connect
mock_client_instance = MagicMock()
mock_client_instance.connect.side_effect = Exception("Connection failed")
mock_mqtt_client.return_value = mock_client_instance
# Call the function
client = setup_mqtt(mqtt_config)
# Verify connect was called
mock_client_instance.connect.assert_called_once()
# Verify None was returned due to the error
assert client is None
@@ -105,23 +106,25 @@ def test_publish_sensor_data_success(mqtt_config, mock_sensor):
"""Test that publish_sensor_data publishes to the correct topics."""
# Create a mock client
mock_client = MagicMock()
# Call the function
temperature = 25.5
humidity = 60.0
result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, temperature, humidity)
result = publish_sensor_data(
mock_client, mqtt_config, mock_sensor, temperature, humidity
)
# Verify the result
assert result is True
# Verify publish was called for temperature
temp_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/temperature"
mock_client.publish.assert_any_call(temp_topic, str(temperature).encode())
# Verify publish was called for humidity
humidity_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/humidity"
mock_client.publish.assert_any_call(humidity_topic, str(humidity).encode())
# Verify publish was called for combined data
data_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/data"
# Check that the JSON data was published
@@ -149,9 +152,9 @@ def test_publish_sensor_data_error(mqtt_config, mock_sensor):
# Create a mock client that raises an exception on publish
mock_client = MagicMock()
mock_client.publish.side_effect = Exception("Publish failed")
# Call the function
result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, 25.5, 60.0)
# Verify the result
assert result is False
assert result is False