mirror of
https://github.com/OMGeeky/homecontrol.esp-sensors.git
synced 2025-12-26 17:02:29 +01:00
feat: implement simulation mode for DHT22 sensor and add dummy sensor for testing
This commit is contained in:
@@ -7,14 +7,11 @@ try:
|
||||
from machine import Pin
|
||||
|
||||
SIMULATION = False
|
||||
except ImportError:
|
||||
import random
|
||||
|
||||
SIMULATION = True
|
||||
except ModuleNotFoundError:
|
||||
SIMULATION = True # We're in a test environment
|
||||
|
||||
from .temperature import TemperatureSensor
|
||||
from .humidity import HumiditySensor
|
||||
from .config import get_sensor_config
|
||||
|
||||
|
||||
class DHT22Sensor(TemperatureSensor, HumiditySensor):
|
||||
@@ -68,6 +65,9 @@ class DHT22Sensor(TemperatureSensor, HumiditySensor):
|
||||
pin1 = Pin(self.pin)
|
||||
self._sensor = dht.DHT22(pin1)
|
||||
print(f"DHT22 sensor initialized on pin {pin1}")
|
||||
else:
|
||||
print("Initializing DHT22 sensor in simulation mode...")
|
||||
self._sensor = None
|
||||
|
||||
def apply_parameters(self, interval, name, pin, sensor_config):
|
||||
# Get main parameters from config if not provided
|
||||
@@ -107,7 +107,7 @@ class DHT22Sensor(TemperatureSensor, HumiditySensor):
|
||||
|
||||
self._last_reading = round(temp, 1)
|
||||
# Also read humidity while we're at it
|
||||
self._last_humidity = round(self._sensor.humidity(), 1)
|
||||
self._last_humidity = self._sensor.humidity()
|
||||
except Exception as e:
|
||||
print(f"Error reading DHT22 sensor: {e}")
|
||||
# Return last reading if available, otherwise default value
|
||||
@@ -116,7 +116,7 @@ class DHT22Sensor(TemperatureSensor, HumiditySensor):
|
||||
if self._last_humidity is None:
|
||||
self._last_humidity = 0.0
|
||||
|
||||
return self._last_reading
|
||||
return self._last_reading
|
||||
|
||||
def read(self) -> float:
|
||||
"""
|
||||
@@ -134,20 +134,20 @@ class DHT22Sensor(TemperatureSensor, HumiditySensor):
|
||||
Returns:
|
||||
The humidity reading as a float (percentage)
|
||||
"""
|
||||
# If we haven't read yet, read only humidity
|
||||
if self._last_humidity is None:
|
||||
if SIMULATION:
|
||||
# Use parent class simulation
|
||||
return super().read_humidity()
|
||||
else:
|
||||
# Actual hardware reading
|
||||
try:
|
||||
self._sensor.measure()
|
||||
self._last_humidity = round(self._sensor.humidity(), 1)
|
||||
except Exception as e:
|
||||
print(f"Error reading DHT22 humidity: {e}")
|
||||
# Return default value if no previous reading
|
||||
self._last_humidity = 0.0
|
||||
|
||||
if SIMULATION:
|
||||
# Use parent class simulation
|
||||
return super().read_humidity()
|
||||
else:
|
||||
# Actual hardware reading
|
||||
try:
|
||||
self._sensor.measure()
|
||||
self._last_humidity = self._sensor.humidity()
|
||||
except Exception as e:
|
||||
print(f"Error reading DHT22 humidity: {e}")
|
||||
# Return default value if no previous reading
|
||||
self._last_humidity = 0.0
|
||||
|
||||
return self._last_humidity
|
||||
|
||||
def get_metadata(self):
|
||||
|
||||
37
src/esp_sensors/dummy_sensor.py
Normal file
37
src/esp_sensors/dummy_sensor.py
Normal file
@@ -0,0 +1,37 @@
|
||||
try:
|
||||
import machine
|
||||
|
||||
SIMULATION = False
|
||||
except ModuleNotFoundError:
|
||||
SIMULATION = True # We're in a test environment
|
||||
import random # For generating random values in tests
|
||||
|
||||
|
||||
def read_dummy(name: str, unit: str) -> float:
|
||||
"""
|
||||
Dummy function to simulate reading a sensor value.
|
||||
|
||||
Args:
|
||||
name: The name of the sensor (e.g., "temperature", "humidity")
|
||||
unit: The unit of the data to generate (one of ['F', 'C', '%'])
|
||||
|
||||
Returns:
|
||||
A simulated sensor reading as a float
|
||||
"""
|
||||
|
||||
if SIMULATION:
|
||||
# Simulation mode - generate random values for testing
|
||||
if unit == "F":
|
||||
# Simulate a temperature reading in Fahrenheit
|
||||
return round(random.uniform(59.0, 86.0), 1)
|
||||
elif unit == "%":
|
||||
# Simulate a humidity reading in percentage
|
||||
return round(random.uniform(30.0, 90.0), 1)
|
||||
elif unit == "C":
|
||||
return round(random.uniform(15.0, 30.0), 1)
|
||||
else:
|
||||
raise ValueError(f"Unsupported unit for dummy sensor: {unit}")
|
||||
else:
|
||||
# This method should be overridden by subclasses to implement
|
||||
# actual temperature reading from hardware
|
||||
raise NotImplementedError(f"Subclasses must implement read_{name}()")
|
||||
@@ -2,7 +2,7 @@
|
||||
Humidity sensor module for ESP-based sensors.
|
||||
"""
|
||||
|
||||
import random
|
||||
from .dummy_sensor import read_dummy # Dummy sensor for simulation purposes
|
||||
from .sensor import Sensor
|
||||
from .config import get_sensor_config
|
||||
|
||||
@@ -41,9 +41,7 @@ class HumiditySensor(Sensor):
|
||||
Returns:
|
||||
The humidity reading as a float (percentage)
|
||||
"""
|
||||
# This is a simulation for testing purposes
|
||||
# In a real implementation, this would read from the actual sensor
|
||||
self._last_humidity = round(random.uniform(30.0, 90.0), 1)
|
||||
self._last_humidity = read_dummy("humidity", unit="%")
|
||||
return self._last_humidity
|
||||
|
||||
def get_metadata(self):
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
MQTT module for ESP sensors.
|
||||
|
||||
This module provides functionality to connect to an MQTT broker and publish sensor data.
|
||||
It supports both real hardware and simulation mode.
|
||||
|
||||
This module uses the MQTTClient class from mqtt_client.py for the core MQTT implementation.
|
||||
"""
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
Temperature sensor module for ESP-based sensors.
|
||||
"""
|
||||
|
||||
import random
|
||||
from .dummy_sensor import read_dummy # Dummy sensor for simulation purposes
|
||||
from .sensor import Sensor
|
||||
from .config import get_sensor_config
|
||||
|
||||
@@ -50,12 +50,7 @@ class TemperatureSensor(Sensor):
|
||||
Returns:
|
||||
The temperature reading as a float
|
||||
"""
|
||||
# This is a simulation for testing purposes
|
||||
# In a real implementation, this would read from the actual sensor
|
||||
if self.unit == "C":
|
||||
self._last_reading = round(random.uniform(15.0, 30.0), 1)
|
||||
else:
|
||||
self._last_reading = round(random.uniform(59.0, 86.0), 1)
|
||||
self._last_reading = read_dummy("temperature", unit=self.unit)
|
||||
return self._last_reading
|
||||
|
||||
def read(self) -> float:
|
||||
|
||||
42
src/main.py
42
src/main.py
@@ -26,48 +26,6 @@ from machine import Pin, deepsleep
|
||||
import esp32
|
||||
|
||||
|
||||
def simulate_button_press(timeout=None):
|
||||
"""
|
||||
Simulate a button press in simulation mode.
|
||||
|
||||
Args:
|
||||
timeout: Time in seconds to wait for input before returning.
|
||||
If None, wait indefinitely.
|
||||
|
||||
Returns:
|
||||
True if button was pressed or timeout occurred, False to exit
|
||||
"""
|
||||
import select
|
||||
import sys
|
||||
|
||||
if timeout is not None:
|
||||
print(
|
||||
f"\nPress Enter to simulate a button press (or 'q' to quit, Ctrl+C to exit)..."
|
||||
f"\nWill automatically continue in {timeout} seconds..."
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"\nPress Enter to simulate a button press (or 'q' to quit, Ctrl+C to exit)..."
|
||||
)
|
||||
|
||||
try:
|
||||
# Set up select to monitor stdin with timeout
|
||||
if timeout is not None:
|
||||
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
|
||||
if not rlist:
|
||||
# Timeout occurred, no input
|
||||
print("Timeout reached, continuing automatically...")
|
||||
return True
|
||||
|
||||
# If we get here, either there was input or timeout was None
|
||||
user_input = input()
|
||||
if user_input.lower() == "q":
|
||||
return False
|
||||
return True
|
||||
except KeyboardInterrupt:
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main function to demonstrate button-triggered sensor display with MQTT publishing.
|
||||
|
||||
@@ -27,6 +27,7 @@ def mqtt_config():
|
||||
"publish_interval": 30,
|
||||
"ssl": False,
|
||||
"keepalive": 60,
|
||||
"config_wait_time": 5.0,
|
||||
"use_esp32_client": True,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user