From 8bc58e40564e6354ce86482e15065839be1ca718 Mon Sep 17 00:00:00 2001 From: OMGeeky Date: Wed, 7 May 2025 23:36:49 +0200 Subject: [PATCH] make timer consistent, no matter if the button was pressed or not & some cleanup/formatting etc. --- src/esp_sensors/mqtt.py | 31 +++++++++++++--- src/esp_sensors/oled_display.py | 5 +++ src/main.py | 66 +++++++++++++++++++++++---------- tests/test_mqtt.py | 43 +++++++++++---------- 4 files changed, 99 insertions(+), 46 deletions(-) diff --git a/src/esp_sensors/mqtt.py b/src/esp_sensors/mqtt.py index 10eeaf0..f0a8c1e 100644 --- a/src/esp_sensors/mqtt.py +++ b/src/esp_sensors/mqtt.py @@ -12,15 +12,27 @@ from typing import Dict, Any, Optional, Union # Import hardware-specific modules if available (for ESP32/ESP8266) try: from umqtt.simple import MQTTClient + SIMULATION = False except ImportError: # Simulation mode for development on non-ESP hardware SIMULATION = True - print("[MQTT] Running in simulation mode - MQTT messages will be printed to console") + print( + "[MQTT] Running in simulation mode - MQTT messages will be printed to console" + ) # Mock MQTT client for simulation class MQTTClient: - def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False): + def __init__( + self, + client_id, + server, + port=0, + user=None, + password=None, + keepalive=0, + ssl=False, + ): self.client_id = client_id self.server = server self.port = port @@ -79,8 +91,13 @@ def setup_mqtt(mqtt_config: Dict[str, Any]) -> Optional[MQTTClient]: return None -def publish_sensor_data(client: Optional[MQTTClient], mqtt_config: Dict[str, Any], - sensor: Any, temperature: float, humidity: float) -> bool: +def publish_sensor_data( + client: Optional[MQTTClient], + mqtt_config: Dict[str, Any], + sensor: Any, + temperature: float, + humidity: float, +) -> bool: """ Publish sensor data to MQTT topics. @@ -115,11 +132,13 @@ def publish_sensor_data(client: Optional[MQTTClient], mqtt_config: Dict[str, Any "temperature": temperature, "humidity": humidity, "timestamp": time.time(), - "unit": sensor.unit + "unit": sensor.unit, } client.publish(data_topic, json.dumps(data).encode()) - print(f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}") + print( + f"Published sensor data to MQTT: {temp_topic}, {humidity_topic}, {data_topic}" + ) return True except Exception as e: print(f"Failed to publish to MQTT: {e}") diff --git a/src/esp_sensors/oled_display.py b/src/esp_sensors/oled_display.py index a6f26ac..1753729 100644 --- a/src/esp_sensors/oled_display.py +++ b/src/esp_sensors/oled_display.py @@ -29,6 +29,7 @@ class OLEDDisplay(Sensor): height: int = None, address: int | str = None, interval: int = None, + on_time: int = None, display_config: Dict[str, Any] = None, ): """ @@ -42,6 +43,7 @@ class OLEDDisplay(Sensor): height: Display height in pixels (if None, loaded from config) address: I2C address of the display (if None, loaded from config) interval: Refresh interval in seconds (if None, loaded from config) + on_time: The time, the display should stay on (if None, loaded from config) display_config: Configuration dictionary """ @@ -65,6 +67,9 @@ class OLEDDisplay(Sensor): self.sda_pin = sda_pin # Already set above self.width = width if width is not None else display_config.get("width", 128) self.height = height if height is not None else display_config.get("height", 64) + self.on_time = ( + on_time if on_time is not None else display_config.get("on_time", 5) + ) # Handle address (could be string in config) if address is None: diff --git a/src/main.py b/src/main.py index d6816e1..c611269 100644 --- a/src/main.py +++ b/src/main.py @@ -27,6 +27,7 @@ from esp_sensors.config import ( try: from machine import Pin, deepsleep import esp32 + SIMULATION = False except ImportError: # Simulation mode for development on non-ESP hardware @@ -110,7 +111,9 @@ def main(): # Display initialization message display.clear() display.display_text("Ready - Auto & Button", 0, 0) - print(f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press...") + print( + f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press..." + ) # Main loop - sleep until button press, then read and display sensor data try: @@ -118,32 +121,36 @@ def main(): # Calculate time until next scheduled reading current_time = time.time() time_since_last_read = current_time - last_read_time - time_until_next_read = max(0, mqtt_publish_interval - int(time_since_last_read)) + time_until_next_read = max( + 0, mqtt_publish_interval - int(time_since_last_read) + ) # Wait for button press or until next scheduled reading - button_pressed = False - if SIMULATION: # In simulation mode, wait for Enter key with timeout - if not simulate_button_press(timeout=time_until_next_read if last_read_time > 0 else None): + + if not simulate_button_press(timeout=time_until_next_read): break # Exit if 'q' was pressed or Ctrl+C - # If we get here, either button was pressed or timeout occurred - button_pressed = True else: # In hardware mode, check if button is pressed (active low) - if button.value() == 0: # Button is pressed - button_pressed = True + button_pressed_value = 0 if not pull_up else 1 # TODO: check if 1 is correct + if button.value() == button_pressed_value: # Button is pressed + pass elif time_since_last_read >= mqtt_publish_interval: # Time for scheduled reading - button_pressed = True + pass else: # Go to light sleep mode to save power # Wake up on pin change (button press) or timer - print(f"Entering light sleep mode for {time_until_next_read:.1f} seconds or until button press...") + print( + f"Entering light sleep mode for {time_until_next_read:.1f} seconds or until button press..." + ) # Set up wake on button press - esp32.wake_on_ext0(pin=button, level=0) # Wake on button press (low) + esp32.wake_on_ext0( + pin=button, level=button_pressed_value + ) # Wake on button press (low) # Set up wake on timer if last_read_time > 0: # Skip timer on first run @@ -154,13 +161,15 @@ def main(): esp32.light_sleep() # Light sleep preserves RAM but saves power # When we get here, either the button was pressed or the timer expired - button_pressed = True + print(f"Awake from light sleep") # Determine if this was triggered by a button press or scheduled interval if SIMULATION: trigger_source = "user input or scheduled interval" else: - trigger_source = "button press" if button.value() == 0 else "scheduled interval" + trigger_source = ( + "button press" if button.value() == 0 else "scheduled interval" + ) print(f"Reading sensor data (triggered by {trigger_source})...") # Read sensor values @@ -169,6 +178,13 @@ def main(): # Update last read time last_read_time = time.time() + # Adjust for actual sleep duration (so if we sleep longer or shorter (because of + # the button) than expected, we don't miss the next scheduled read) + actual_sleep_duration = last_read_time - current_time + last_read_time -= ( + (actual_sleep_duration - time_until_next_read) + % mqtt_publish_interval + ) # Format values for display temp_str = f"Temp: {temperature:.1f} C" @@ -176,7 +192,9 @@ def main(): time_str = f"Time: {time.time():.0f}" name_str = f"Sensor: {dht_sensor.name}" + # Display values + # TODO: only display values, if the button has been clicked display.display_values( [name_str, temp_str, hum_str, time_str, "Press button again"] ) @@ -184,22 +202,30 @@ def main(): # Print to console print(f"Updated display with: {temp_str}, {hum_str}") - # Publish to MQTT if enabled and interval has elapsed + # Publish to MQTT if enabled current_time = time.time() - if mqtt_client and (current_time - last_publish_time >= mqtt_publish_interval): - publish_sensor_data(mqtt_client, mqtt_config, dht_sensor, temperature, humidity) + publish_sensor_data( + mqtt_client, mqtt_config, dht_sensor, temperature, humidity + ) + if mqtt_client and ( + current_time - last_publish_time >= mqtt_publish_interval + ): last_publish_time = current_time - print(f"Next MQTT publish in {mqtt_publish_interval} seconds") + print(f"Next normal MQTT publish in {mqtt_publish_interval} seconds") # Keep display on for a few seconds before going back to sleep - time.sleep(5) + time.sleep(display.on_time) # Clear display to save power display.clear() display.display_text("Ready - Auto & Button", 0, 0) + print("last_read_time", last_read_time) + if SIMULATION: - print(f"Display cleared. Will run again in {mqtt_publish_interval - (time.time() - last_read_time):.1f} seconds or on button press.") + print( + f"Display cleared. Will run again in {mqtt_publish_interval - (time.time() - last_read_time):.1f} seconds or on button press." + ) except KeyboardInterrupt: # Clean up on exit diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py index b606acf..10a5e59 100644 --- a/tests/test_mqtt.py +++ b/tests/test_mqtt.py @@ -10,8 +10,9 @@ from src.esp_sensors.mqtt import setup_mqtt, publish_sensor_data, MQTTClient class TestSensor: """Mock sensor class for testing.""" + __test__ = False # Prevent pytest from treating this as a test case - + def __init__(self, name="Test Sensor", temperature_unit="C"): self.name = name self.temperature_unit = temperature_unit @@ -57,14 +58,14 @@ def test_setup_mqtt_disabled(disabled_mqtt_config): def test_setup_mqtt_enabled(mqtt_config): """Test that setup_mqtt creates and connects a client when MQTT is enabled.""" - with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client: + with patch("src.esp_sensors.mqtt.MQTTClient") as mock_mqtt_client: # Configure the mock mock_client_instance = MagicMock() mock_mqtt_client.return_value = mock_client_instance - + # Call the function client = setup_mqtt(mqtt_config) - + # Verify MQTTClient was created with correct parameters mock_mqtt_client.assert_called_once_with( mqtt_config["client_id"], @@ -73,30 +74,30 @@ def test_setup_mqtt_enabled(mqtt_config): mqtt_config["username"], mqtt_config["password"], mqtt_config["keepalive"], - mqtt_config["ssl"] + mqtt_config["ssl"], ) - + # Verify connect was called mock_client_instance.connect.assert_called_once() - + # Verify the client was returned assert client == mock_client_instance def test_setup_mqtt_connection_error(mqtt_config): """Test that setup_mqtt handles connection errors gracefully.""" - with patch('src.esp_sensors.mqtt.MQTTClient') as mock_mqtt_client: + with patch("src.esp_sensors.mqtt.MQTTClient") as mock_mqtt_client: # Configure the mock to raise an exception on connect mock_client_instance = MagicMock() mock_client_instance.connect.side_effect = Exception("Connection failed") mock_mqtt_client.return_value = mock_client_instance - + # Call the function client = setup_mqtt(mqtt_config) - + # Verify connect was called mock_client_instance.connect.assert_called_once() - + # Verify None was returned due to the error assert client is None @@ -105,23 +106,25 @@ def test_publish_sensor_data_success(mqtt_config, mock_sensor): """Test that publish_sensor_data publishes to the correct topics.""" # Create a mock client mock_client = MagicMock() - + # Call the function temperature = 25.5 humidity = 60.0 - result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, temperature, humidity) - + result = publish_sensor_data( + mock_client, mqtt_config, mock_sensor, temperature, humidity + ) + # Verify the result assert result is True - + # Verify publish was called for temperature temp_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/temperature" mock_client.publish.assert_any_call(temp_topic, str(temperature).encode()) - + # Verify publish was called for humidity humidity_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/humidity" mock_client.publish.assert_any_call(humidity_topic, str(humidity).encode()) - + # Verify publish was called for combined data data_topic = f"{mqtt_config['topic_prefix']}/{mock_sensor.name.lower().replace(' ', '_')}/data" # Check that the JSON data was published @@ -149,9 +152,9 @@ def test_publish_sensor_data_error(mqtt_config, mock_sensor): # Create a mock client that raises an exception on publish mock_client = MagicMock() mock_client.publish.side_effect = Exception("Publish failed") - + # Call the function result = publish_sensor_data(mock_client, mqtt_config, mock_sensor, 25.5, 60.0) - + # Verify the result - assert result is False \ No newline at end of file + assert result is False