mirror of
https://github.com/OMGeeky/homecontrol.esp-sensors.git
synced 2026-02-13 21:18:17 +01:00
timed push to mqtt with manual trigger
This commit is contained in:
109
src/main.py
109
src/main.py
@@ -1,10 +1,10 @@
|
||||
"""
|
||||
Main application for ESP32 sensor display that activates on button press.
|
||||
Main application for ESP32 sensor display that activates on button press or at regular intervals.
|
||||
|
||||
This program:
|
||||
1. Sets up a button input
|
||||
2. Uses low-power sleep mode to conserve energy
|
||||
3. Wakes up and reads sensor data when the button is pressed
|
||||
3. Wakes up and reads sensor data when the button is pressed or at regular intervals
|
||||
4. Displays the data on an OLED screen
|
||||
5. Publishes sensor data to MQTT broker (if enabled)
|
||||
"""
|
||||
@@ -34,12 +34,40 @@ except ImportError:
|
||||
print("Running in simulation mode - hardware functions will be simulated")
|
||||
|
||||
|
||||
def simulate_button_press():
|
||||
"""Simulate a button press in simulation mode."""
|
||||
print(
|
||||
"\nPress Enter to simulate a button press (or 'q' to quit, Ctrl+C to exit)..."
|
||||
)
|
||||
def simulate_button_press(timeout=None):
|
||||
"""
|
||||
Simulate a button press in simulation mode.
|
||||
|
||||
Args:
|
||||
timeout: Time in seconds to wait for input before returning.
|
||||
If None, wait indefinitely.
|
||||
|
||||
Returns:
|
||||
True if button was pressed or timeout occurred, False to exit
|
||||
"""
|
||||
import select
|
||||
import sys
|
||||
|
||||
if timeout is not None:
|
||||
print(
|
||||
f"\nPress Enter to simulate a button press (or 'q' to quit, Ctrl+C to exit)..."
|
||||
f"\nWill automatically continue in {timeout} seconds..."
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"\nPress Enter to simulate a button press (or 'q' to quit, Ctrl+C to exit)..."
|
||||
)
|
||||
|
||||
try:
|
||||
# Set up select to monitor stdin with timeout
|
||||
if timeout is not None:
|
||||
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
|
||||
if not rlist:
|
||||
# Timeout occurred, no input
|
||||
print("Timeout reached, continuing automatically...")
|
||||
return True
|
||||
|
||||
# If we get here, either there was input or timeout was None
|
||||
user_input = input()
|
||||
if user_input.lower() == "q":
|
||||
return False
|
||||
@@ -71,6 +99,7 @@ def main():
|
||||
mqtt_client = setup_mqtt(mqtt_config)
|
||||
mqtt_publish_interval = mqtt_config.get("publish_interval", 60)
|
||||
last_publish_time = 0
|
||||
last_read_time = 0 # Track the last time sensors were read
|
||||
|
||||
# Set up button using configuration
|
||||
button_pin = button_config.get("pin", 0)
|
||||
@@ -80,35 +109,67 @@ def main():
|
||||
|
||||
# Display initialization message
|
||||
display.clear()
|
||||
display.display_text("Ready - Press Button", 0, 0)
|
||||
print("System initialized. Waiting for button press...")
|
||||
display.display_text("Ready - Auto & Button", 0, 0)
|
||||
print(f"System initialized. Will run every {mqtt_publish_interval} seconds or on button press...")
|
||||
|
||||
# Main loop - sleep until button press, then read and display sensor data
|
||||
try:
|
||||
while True:
|
||||
# Wait for button press
|
||||
# Calculate time until next scheduled reading
|
||||
current_time = time.time()
|
||||
time_since_last_read = current_time - last_read_time
|
||||
time_until_next_read = max(0, mqtt_publish_interval - int(time_since_last_read))
|
||||
|
||||
# Wait for button press or until next scheduled reading
|
||||
button_pressed = False
|
||||
|
||||
if SIMULATION:
|
||||
# In simulation mode, wait for Enter key
|
||||
if not simulate_button_press():
|
||||
break # Exit if Ctrl+C was pressed
|
||||
# In simulation mode, wait for Enter key with timeout
|
||||
if not simulate_button_press(timeout=time_until_next_read if last_read_time > 0 else None):
|
||||
break # Exit if 'q' was pressed or Ctrl+C
|
||||
|
||||
# If we get here, either button was pressed or timeout occurred
|
||||
button_pressed = True
|
||||
else:
|
||||
# In hardware mode, check if button is pressed (active low)
|
||||
if button.value() == 1: # Button not pressed
|
||||
if button.value() == 0: # Button is pressed
|
||||
button_pressed = True
|
||||
elif time_since_last_read >= mqtt_publish_interval:
|
||||
# Time for scheduled reading
|
||||
button_pressed = True
|
||||
else:
|
||||
# Go to light sleep mode to save power
|
||||
# Wake up on pin change (button press)
|
||||
print("Entering light sleep mode...")
|
||||
esp32.wake_on_ext0(
|
||||
pin=button, level=0
|
||||
) # Wake on button press (low)
|
||||
esp32.light_sleep() # Light sleep preserves RAM but saves power
|
||||
# When we get here, the button was pressed
|
||||
# Wake up on pin change (button press) or timer
|
||||
print(f"Entering light sleep mode for {time_until_next_read:.1f} seconds or until button press...")
|
||||
|
||||
print("Button pressed! Reading sensor data...")
|
||||
# Set up wake on button press
|
||||
esp32.wake_on_ext0(pin=button, level=0) # Wake on button press (low)
|
||||
|
||||
# Set up wake on timer
|
||||
if last_read_time > 0: # Skip timer on first run
|
||||
# Convert seconds to milliseconds for sleep
|
||||
esp32.wake_on_timer(time_until_next_read * 1000)
|
||||
|
||||
# Enter light sleep
|
||||
esp32.light_sleep() # Light sleep preserves RAM but saves power
|
||||
|
||||
# When we get here, either the button was pressed or the timer expired
|
||||
button_pressed = True
|
||||
|
||||
# Determine if this was triggered by a button press or scheduled interval
|
||||
if SIMULATION:
|
||||
trigger_source = "user input or scheduled interval"
|
||||
else:
|
||||
trigger_source = "button press" if button.value() == 0 else "scheduled interval"
|
||||
print(f"Reading sensor data (triggered by {trigger_source})...")
|
||||
|
||||
# Read sensor values
|
||||
temperature = dht_sensor.read_temperature()
|
||||
humidity = dht_sensor.read_humidity()
|
||||
|
||||
# Update last read time
|
||||
last_read_time = time.time()
|
||||
|
||||
# Format values for display
|
||||
temp_str = f"Temp: {temperature:.1f} C"
|
||||
hum_str = f"Humidity: {humidity:.1f}%"
|
||||
@@ -135,10 +196,10 @@ def main():
|
||||
|
||||
# Clear display to save power
|
||||
display.clear()
|
||||
display.display_text("Ready - Press Button", 0, 0)
|
||||
display.display_text("Ready - Auto & Button", 0, 0)
|
||||
|
||||
if SIMULATION:
|
||||
print("Display cleared. Ready for next button press.")
|
||||
print(f"Display cleared. Will run again in {mqtt_publish_interval - (time.time() - last_read_time):.1f} seconds or on button press.")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
# Clean up on exit
|
||||
|
||||
Reference in New Issue
Block a user