remove simulation mode support from MQTT handling and related files

This commit is contained in:
OMGeeky
2025-06-07 10:52:40 +02:00
parent 2a710d63c9
commit d0f243d261
3 changed files with 4 additions and 86 deletions

View File

@@ -64,14 +64,6 @@ def main():
retain=True
)
# In simulation mode, simulate receiving a message
if SIMULATION:
print("Simulation mode: Simulating a control message")
client.client.simulate_message(
MQTT_CONFIG["topic_control"],
json.dumps({"command": "set_led", "value": "on"}).encode()
)
# Read from the control topic with a timeout
print(f"Waiting for messages on {MQTT_CONFIG['topic_control']} (timeout: 10s)")
message = client.read_topic(MQTT_CONFIG["topic_control"], 10)

View File

@@ -12,16 +12,6 @@ import json
import socket
import struct
# Determine if we're running on ESP hardware or in simulation mode
try:
# import network
SIMULATION = False
except ImportError:
SIMULATION = True
print(
"[MQTT] Running in simulation mode - MQTT messages will be printed to console"
)
# MQTT Protocol Constants
MQTT_PROTOCOL_LEVEL = 4 # MQTT 3.1.1
MQTT_CLEAN_SESSION = 1
@@ -65,24 +55,6 @@ class MQTTClient:
keepalive=60,
ssl=False,
):
if SIMULATION:
# In simulation mode, we don't actually connect to a broker
self.client_id = client_id
self.server = server
self.port = port
self.user = user
self.password = password
self.keepalive = keepalive
self.ssl = ssl
self.connected = False
self.callback = None
self.last_message = None
self.last_topic = None
self.subscriptions = {} # Track subscribed topics
self.sock = None
self.pid = 0 # Packet ID
return
# Real implementation for ESP hardware
self.client_id = client_id
self.server = server
@@ -147,9 +119,6 @@ class MQTTClient:
def _recv_packet(self, timeout=1.0):
"""Receive an MQTT packet from the broker"""
if SIMULATION:
# In simulation mode, we don't actually receive packets
return None, None
if self.sock is None:
raise MQTTException("Not connected to broker (_recv_packet)")
@@ -185,10 +154,6 @@ class MQTTClient:
def connect(self):
"""Connect to the MQTT broker"""
if SIMULATION:
print(f"[MQTT SIM] Connecting to {self.server}:{self.port} as {self.client_id}")
self.connected = True
return 0
# Create socket
try:
@@ -249,10 +214,6 @@ class MQTTClient:
def disconnect(self):
"""Disconnect from the MQTT broker"""
if SIMULATION:
print("[MQTT SIM] Disconnected")
self.connected = False
return
if self.connected:
try:
@@ -277,10 +238,6 @@ class MQTTClient:
def publish(self, topic, msg, retain=False, qos=0):
"""Publish a message to a topic"""
if SIMULATION:
retain_str = " (retained)" if retain else ""
print(f"[MQTT SIM] Publishing to {topic}{retain_str}: {msg}")
return
if not self.connected:
raise MQTTException("Not connected to broker (publish)")
@@ -325,11 +282,6 @@ class MQTTClient:
def subscribe(self, topic, qos=0):
"""Subscribe to a topic"""
if SIMULATION:
topic_str = topic.decode('utf-8') if isinstance(topic, bytes) else topic
print(f"[MQTT SIM] Subscribed to topic: {topic_str}")
self.subscriptions[topic_str] = qos
return
if not self.connected:
raise MQTTException("Not connected to broker (subscribe)")
@@ -367,8 +319,6 @@ class MQTTClient:
def set_callback(self, callback):
"""Set callback for received messages"""
self.callback = callback
if SIMULATION:
print(f"[MQTT SIM] Callback set")
def check_msg(self):
"""Check for pending messages from the broker"""
@@ -413,18 +363,6 @@ class MQTTClient:
return
# For simulation only - allows us to simulate receiving a message
def simulate_message(self, topic, msg):
"""Simulate receiving a message (simulation mode only)"""
if not SIMULATION:
return
self.last_topic = topic
self.last_message = msg
print(f"[MQTT SIM] Simulated message received on {topic}: {msg}")
if self.callback:
self.callback(topic, msg)
class ESP32MQTTClient:
"""

View File

@@ -17,15 +17,8 @@ from esp_sensors.oled_display import OLEDDisplay
from esp_sensors.config import Config
# Import hardware-specific modules if available (for ESP32/ESP8266)
try:
from machine import Pin, deepsleep
import esp32
SIMULATION = False
except ImportError:
# Simulation mode for development on non-ESP hardware
SIMULATION = True
print("Running in simulation mode - hardware functions will be simulated")
from machine import Pin, deepsleep
import esp32
def simulate_button_press(timeout=None):
@@ -195,12 +188,7 @@ def main():
display.set_status(f"Sleeping {time_until_next_read}s")
print('sleeping for', time_until_next_read, 'seconds')
if not SIMULATION:
deepsleep(time_until_next_read * 1000)
else:
# Simulate sleep
print(f"Simulated deep sleep for {time_until_next_read:.1f} seconds")
time.sleep(time_until_next_read)
except KeyboardInterrupt:
# Clean up on exit