
Capstone: Production IoT Monitoring System

```text
Production System Health Monitoring
──────────────────────────────────────────
Normal Operation:
  Node 1: ✓ online   25.3C  62% RH
  Node 2: ✓ online   24.8C  58% RH
  Node 3: ✓ online   26.1C  AQI 42

Failure Detection:
  Node 2: ✗ offline (no msg in 5 min)
    ├──► Grafana:  panel turns RED
    ├──► Node-RED: status alert fires
    ├──► Slack:    "Node 2 offline at 03:12"
    └──► Escalation: SMS after 15 min
```

Seven lessons taught you the individual skills: protocol selection, broker configuration, multi-MCU MQTT clients, time-series dashboards, REST APIs, alert automation, and device security. This capstone pulls all of them together into a single production deployment.

```text
Docker Compose Service Map
──────────────────────────────────────────
docker-compose.yml
┌─────────────────────────────────────┐
│ mosquitto   :1883, :8883 (TLS)      │
│ telegraf    (internal, no port)     │
│ influxdb    :8086                   │
│ grafana     :3000                   │
│ nodered     :1880                   │
│ flask-api   :5000                   │
└─────────────────────────────────────┘
        │
Shared Docker network: iot-net
Services reference each other by name:
  telegraf ──► mosquitto (mqtt://mosquitto)
  telegraf ──► influxdb  (http://influxdb)
  grafana  ──► influxdb  (http://influxdb)
  nodered  ──► mosquitto (mqtt://mosquitto)
```
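The service map above corresponds to a Compose file along these lines. This is a minimal sketch, not the full deployment file: image tags, volume paths, and the `flask-api` build context are placeholder assumptions you will adapt to your own setup.

```yaml
# docker-compose.yml (abridged sketch — image tags and paths are assumptions)
services:
  mosquitto:
    image: eclipse-mosquitto:2
    ports: ["1883:1883", "8883:8883"]
    volumes: ["./mosquitto:/mosquitto"]
    networks: [iot-net]
  telegraf:
    image: telegraf:1.28
    volumes: ["./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro"]
    depends_on: [mosquitto, influxdb]
    networks: [iot-net]
  influxdb:
    image: influxdb:2.7
    ports: ["8086:8086"]
    networks: [iot-net]
  grafana:
    image: grafana/grafana:10.2.0
    ports: ["3000:3000"]
    depends_on: [influxdb]
    networks: [iot-net]
  nodered:
    image: nodered/node-red:latest
    ports: ["1880:1880"]
    networks: [iot-net]
  flask-api:
    build: ./api
    ports: ["5000:5000"]
    depends_on: [mosquitto, influxdb]
    networks: [iot-net]

networks:
  iot-net:
```

Because every service sits on `iot-net`, container names double as hostnames, which is why the configuration files below refer to `mosquitto` and `influxdb` instead of IP addresses.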

You will build a greenhouse monitoring system with three sensor nodes, a TLS-secured broker, a time-series pipeline, live dashboards, automated alerts, a REST API for device management, and cloud forwarding to SiliconWit.io. When you finish, you will have a system that runs unattended, recovers from failures, and gives you visibility from anywhere in the world.
## System Overview
The system monitors a greenhouse environment using three sensor nodes, each running on a different MCU platform. All data flows through a central Mosquitto broker secured with TLS and per-device credentials. Telegraf subscribes to MQTT topics and writes readings into InfluxDB. Grafana queries InfluxDB and renders live dashboards. A Flask REST API provides device listing, data queries, and command dispatch. Node-RED handles automation rules and alert routing. The Mosquitto broker also bridges selected topics to SiliconWit.io, giving you a cloud mirror of your data that remains accessible even when your local infrastructure is down.
### Architecture

```text
 ESP32 Node ─────┐
 (BME280 +       │
  Soil Moisture) │
                 │   ┌───────────┐    ┌──────────┐    ┌──────────┐
 Pico W Node ────┼──►│ Mosquitto │───►│ Telegraf │───►│ InfluxDB │──► Grafana
 (Light +        │   │  Broker   │    └──────────┘    └──────────┘
  PIR Motion)    │   │  (TLS +   │
                 │   │   ACLs)   │───►┌──────────┐
 STM32 Node ─────┘   │           │    │ Node-RED │───► Slack / Webhook
 (MQ-135 Air         │           │    │ (alerts) │
  Quality)           │           │    └──────────┘
                     │           │───►┌──────────┐
                     └─────┬─────┘    │ Flask API│
                           │          └──────────┘
                           │ MQTT Bridge (TLS)
                ┌──────────▼────────────────┐
                │   SiliconWit.io Cloud     │
                │ (dashboard, alerts, API)  │
                └───────────────────────────┘
```

### System Components
<CardGrid>
<Card title="Sensor Node A: ESP32" icon="star">
Reads BME280 (temperature, humidity, pressure) and a capacitive soil moisture sensor. Publishes every 30 seconds with QoS 1. Implements exponential backoff on reconnection and buffers up to 60 readings in RTC memory when offline. **Skills from:** Lesson 3 (MQTT clients), ESP32 Course (peripherals, deep sleep).
</Card>
<Card title="Sensor Node B: Pico W" icon="star">
Reads an LDR light sensor via ADC and a PIR motion sensor via GPIO interrupt. Publishes light level every 30 seconds and motion events on change (edge triggered). **Skills from:** Lesson 3 (MQTT clients), RPi Pico Course (GPIO, ADC).
</Card>
<Card title="Sensor Node C: STM32 + ESP-01" icon="star">
Reads an MQ-135 air quality sensor via ADC. Publishes air quality index every 30 seconds. The STM32 communicates with the ESP-01 over UART using AT commands. **Skills from:** Lesson 3 (MQTT clients), STM32 Course (UART, ADC).
</Card>
<Card title="Mosquitto Broker" icon="setting">
TLS 1.2 with server certificates, password authentication, topic ACLs that restrict each device to its own publish prefix, persistent sessions, and an MQTT bridge that forwards all sensor data to SiliconWit.io. **Skills from:** Lesson 2 (broker setup), Lesson 7 (TLS and security).
</Card>
<Card title="Telegraf + InfluxDB" icon="document">
Telegraf subscribes to `greenhouse/#` and writes every message into an InfluxDB bucket with 30-day retention. InfluxDB provides the query engine for both Grafana and the REST API. **Skills from:** Lesson 4 (dashboards and data).
</Card>
<Card title="Grafana Dashboard" icon="approve-check">
Panels for temperature, humidity, pressure, soil moisture, light, motion events, air quality, device online/offline status, and alert history. Auto-refreshes every 10 seconds. **Skills from:** Lesson 4 (dashboards and data).
</Card>
<Card title="Flask REST API" icon="document">
Endpoints for listing devices, querying historical data, and dispatching commands to nodes (such as changing the publish interval). Secured with API keys. **Skills from:** Lesson 5 (REST APIs and webhooks).
</Card>
<Card title="Node-RED Alerts" icon="warning">
Threshold alerts (temperature above 35C, humidity below 20%), inactivity alerts (no data from a node for 5 minutes), and notifications via Slack webhook. **Skills from:** Lesson 6 (alerts and automation).
</Card>
<Card title="SiliconWit.io Cloud" icon="rocket">
Data forwarded via MQTT bridge appears on SiliconWit.io dashboards automatically. Students can monitor their greenhouse remotely even when the local Grafana instance is unreachable. **Skills from:** Lesson 2 (broker bridging).
</Card>
</CardGrid>
## MQTT Topic Hierarchy
A well-structured topic hierarchy keeps the system organized and makes ACLs straightforward. Every sensor node publishes under `greenhouse/<node-id>/`, and status topics use retained messages with last will.

```text
greenhouse/esp32-node/temperature    "23.5"    (Celsius, QoS 1, retained)
greenhouse/esp32-node/humidity       "61.2"    (percent, QoS 1, retained)
greenhouse/esp32-node/pressure       "1013.2"  (hPa, QoS 1, retained)
greenhouse/esp32-node/soil_moisture  "42"      (percent, QoS 1, retained)
greenhouse/esp32-node/status         "online"  (retained, last will: "offline")

greenhouse/pico-node/light           "780"     (lux, QoS 1, retained)
greenhouse/pico-node/motion          "1"       (1=detected, 0=clear, QoS 1)
greenhouse/pico-node/status          "online"  (retained, last will: "offline")

greenhouse/stm32-node/air_quality    "127"     (AQI 0-500, QoS 1, retained)
greenhouse/stm32-node/status         "online"  (retained, last will: "offline")

greenhouse/cmd/<node-id>             (commands from the REST API)
```

**Design decisions:**
- **Retained messages** on all sensor topics so that any new subscriber (Telegraf restarting, a new dashboard panel) immediately gets the last known value.
- **QoS 1** for all publishes. QoS 0 risks silent data loss; QoS 2 adds round trips without meaningful benefit for sensor data where occasional duplicates are harmless.
- **Per-node prefixes** make ACLs simple: each device credential is allowed to publish only under its own prefix.
- **Command topics** use `greenhouse/cmd/<node-id>` so the REST API can dispatch configuration changes to specific nodes.
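The per-node prefix scheme can be checked offline. Below is a small, self-contained sketch (plain Python, no broker required) of MQTT-style wildcard matching — the same rules Mosquitto applies when it evaluates a `greenhouse/#` subscription or an ACL prefix like `greenhouse/esp32-node/#`:

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Match an MQTT topic against a subscription/ACL pattern.

    '+' matches exactly one level; '#' matches the remainder of the
    topic and must be the last level of the pattern.
    """
    p_levels = pattern.split("/")
    t_levels = topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            return True               # '#' swallows everything from here on
        if i >= len(t_levels):
            return False              # pattern is longer than the topic
        if p != "+" and p != t_levels[i]:
            return False              # literal level mismatch
    return len(p_levels) == len(t_levels)

# Telegraf's subscription sees every node...
assert topic_matches("greenhouse/#", "greenhouse/esp32-node/temperature")
assert topic_matches("greenhouse/+/+", "greenhouse/pico-node/light")
# ...while each device's ACL prefix covers only its own subtree.
assert topic_matches("greenhouse/esp32-node/#", "greenhouse/esp32-node/status")
assert not topic_matches("greenhouse/esp32-node/#", "greenhouse/pico-node/light")
```

This is why a compromised `pico-node` credential cannot publish fake temperatures under `greenhouse/esp32-node/…`: its ACL prefix simply never matches those topics.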
## Broker Configuration
### Mosquitto with TLS and ACLs
The broker runs in a Docker container with TLS certificates, password authentication, and topic ACLs. This configuration was introduced in Lesson 2 and hardened in Lesson 7.
```ini title="mosquitto/mosquitto.conf"
# Listener with TLS
listener 8883
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
cafile /mosquitto/certs/ca.crt
# Authentication
allow_anonymous false
password_file /mosquitto/config/passwd
# Access control
acl_file /mosquitto/config/acl
# Persistence
persistence true
persistence_location /mosquitto/data/
# Logging
log_dest file /mosquitto/log/mosquitto.log
log_type all
# Bridge to SiliconWit.io
connection siliconwit-bridge
address mqtt.siliconwit.io:8883
bridge_cafile /mosquitto/certs/siliconwit-ca.crt
remote_username YOUR_SILICONWIT_USERNAME
remote_password YOUR_SILICONWIT_TOKEN
topic greenhouse/# out 1
bridge_protocol_version mqttv311
bridge_insecure false
start_type automatic
notifications true
notification_topic greenhouse/bridge/status
```

### ACL File

```text title="mosquitto/acl"
# ESP32 node: publish to its own topics, subscribe to commands
user esp32-node
topic write greenhouse/esp32-node/#
topic read greenhouse/cmd/esp32-node
# Pico W node: publish to its own topics, subscribe to commands
user pico-node
topic write greenhouse/pico-node/#
topic read greenhouse/cmd/pico-node
# STM32 node: publish to its own topics, subscribe to commands
user stm32-node
topic write greenhouse/stm32-node/#
topic read greenhouse/cmd/stm32-node
# Telegraf: read all greenhouse data
user telegraf
topic read greenhouse/#
# API server: read all data, write commands
user api-server
topic read greenhouse/#
topic write greenhouse/cmd/#
# Node-RED: read all data
user nodered
topic read greenhouse/#
```

### Password File

Generate the password file with `mosquitto_passwd`:

```bash title="Generate credentials"
# Create the password file (first user, -c creates the file)
mosquitto_passwd -c mosquitto/passwd esp32-node

# Add remaining users (no -c, appends to file)
mosquitto_passwd mosquitto/passwd pico-node
mosquitto_passwd mosquitto/passwd stm32-node
mosquitto_passwd mosquitto/passwd telegraf
mosquitto_passwd mosquitto/passwd api-server
mosquitto_passwd mosquitto/passwd nodered
```

## Node Firmware

### ESP32 Node (BME280 + Soil Moisture)

This node reads temperature, humidity, and pressure from a BME280 over I2C, and soil moisture from a capacitive sensor on ADC. It publishes every 30 seconds with QoS 1. If the MQTT connection drops, it buffers readings in a ring buffer and publishes them when the connection recovers. Reconnection uses exponential backoff starting at 1 second and capping at 60 seconds (Lesson 3 pattern).

```c title="esp32_node/main/main.c"
#include <stdio.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "esp_log.h"
#include "esp_wifi.h"
#include "esp_event.h"
#include "esp_netif.h"
#include "nvs_flash.h"
#include "mqtt_client.h"
#include "driver/i2c.h"
#include "driver/adc.h"

#define TAG "GREENHOUSE_ESP32"

/* Configuration */
#define PUBLISH_INTERVAL_MS 30000
#define BROKER_URI          "mqtts://192.168.1.100:8883"
#define MQTT_USERNAME       "esp32-node"
#define MQTT_PASSWORD       "your_password_here"
#define MAX_BACKOFF_MS      60000
#define OFFLINE_BUFFER_SIZE 60

/* BME280 I2C address */
#define BME280_ADDR    0x76
#define I2C_MASTER_NUM I2C_NUM_0
#define I2C_SDA_PIN    21
#define I2C_SCL_PIN    22

/* Soil moisture ADC */
#define SOIL_ADC_CHANNEL ADC1_CHANNEL_6 /* GPIO34 */
#define SOIL_DRY_VALUE   3200
#define SOIL_WET_VALUE   1400

/* Topic prefixes */
#define TOPIC_TEMP     "greenhouse/esp32-node/temperature"
#define TOPIC_HUMIDITY "greenhouse/esp32-node/humidity"
#define TOPIC_PRESSURE "greenhouse/esp32-node/pressure"
#define TOPIC_SOIL     "greenhouse/esp32-node/soil_moisture"
#define TOPIC_STATUS   "greenhouse/esp32-node/status"
#define TOPIC_CMD      "greenhouse/cmd/esp32-node"

/* Provided by your Wi-Fi helper from Lesson 3 */
void wifi_connect(void);

/* State */
static bool mqtt_connected = false;
static esp_mqtt_client_handle_t mqtt_client = NULL;
static int backoff_ms = 1000;

/* Offline buffer */
typedef struct {
    float temperature;
    float humidity;
    float pressure;
    int soil_pct;
} sensor_reading_t;

static sensor_reading_t offline_buffer[OFFLINE_BUFFER_SIZE];
static int buffer_head = 0;
static int buffer_count = 0;

/* ── BME280 driver (simplified, register-level) ── */
static esp_err_t bme280_init(void)
{
    i2c_config_t conf = {
        .mode = I2C_MODE_MASTER,
        .sda_io_num = I2C_SDA_PIN,
        .scl_io_num = I2C_SCL_PIN,
        .sda_pullup_en = GPIO_PULLUP_ENABLE,
        .scl_pullup_en = GPIO_PULLUP_ENABLE,
        .master.clk_speed = 100000,
    };
    i2c_param_config(I2C_MASTER_NUM, &conf);
    i2c_driver_install(I2C_MASTER_NUM, conf.mode, 0, 0, 0);

    /* Write config registers: normal mode, oversampling x1 */
    uint8_t ctrl_meas = 0x27; /* osrs_t=1, osrs_p=1, mode=normal */
    uint8_t ctrl_hum  = 0x01; /* osrs_h=1 */

    i2c_cmd_handle_t cmd = i2c_cmd_link_create();
    i2c_master_start(cmd);
    i2c_master_write_byte(cmd, (BME280_ADDR << 1) | I2C_MASTER_WRITE, true);
    i2c_master_write_byte(cmd, 0xF2, true); /* ctrl_hum register */
    i2c_master_write_byte(cmd, ctrl_hum, true);
    i2c_master_stop(cmd);
    esp_err_t ret = i2c_master_cmd_begin(I2C_MASTER_NUM, cmd, pdMS_TO_TICKS(100));
    i2c_cmd_link_delete(cmd);
    if (ret != ESP_OK) return ret;

    cmd = i2c_cmd_link_create();
    i2c_master_start(cmd);
    i2c_master_write_byte(cmd, (BME280_ADDR << 1) | I2C_MASTER_WRITE, true);
    i2c_master_write_byte(cmd, 0xF4, true); /* ctrl_meas register */
    i2c_master_write_byte(cmd, ctrl_meas, true);
    i2c_master_stop(cmd);
    ret = i2c_master_cmd_begin(I2C_MASTER_NUM, cmd, pdMS_TO_TICKS(100));
    i2c_cmd_link_delete(cmd);
    return ret;
}

static esp_err_t bme280_read(float *temp, float *hum, float *press)
{
    uint8_t data[8];

    /* Read registers 0xF7..0xFE (pressure, temperature, humidity) */
    i2c_cmd_handle_t cmd = i2c_cmd_link_create();
    i2c_master_start(cmd);
    i2c_master_write_byte(cmd, (BME280_ADDR << 1) | I2C_MASTER_WRITE, true);
    i2c_master_write_byte(cmd, 0xF7, true);
    i2c_master_start(cmd);
    i2c_master_write_byte(cmd, (BME280_ADDR << 1) | I2C_MASTER_READ, true);
    i2c_master_read(cmd, data, 8, I2C_MASTER_LAST_NACK);
    i2c_master_stop(cmd);
    esp_err_t ret = i2c_master_cmd_begin(I2C_MASTER_NUM, cmd, pdMS_TO_TICKS(100));
    i2c_cmd_link_delete(cmd);
    if (ret != ESP_OK) return ret;

    int32_t adc_t = ((int32_t)data[3] << 12) | ((int32_t)data[4] << 4) | (data[5] >> 4);
    int32_t adc_p = ((int32_t)data[0] << 12) | ((int32_t)data[1] << 4) | (data[2] >> 4);
    int32_t adc_h = ((int32_t)data[6] << 8) | data[7];

    /* Placeholder compensation (use the full Bosch algorithm with the
       chip's calibration data in production) */
    *temp  = (float)adc_t / 16384.0f - 20.0f;
    *press = (float)adc_p / 256.0f;
    *hum   = (float)adc_h / 512.0f;
    return ESP_OK;
}

/* ── Soil moisture ── */
static int read_soil_moisture_pct(void)
{
    adc1_config_width(ADC_WIDTH_BIT_12);
    adc1_config_channel_atten(SOIL_ADC_CHANNEL, ADC_ATTEN_DB_11);
    int raw = adc1_get_raw(SOIL_ADC_CHANNEL);

    /* Map raw ADC to 0-100% (dry=0%, wet=100%) */
    if (raw >= SOIL_DRY_VALUE) return 0;
    if (raw <= SOIL_WET_VALUE) return 100;
    return (int)(100.0f * (SOIL_DRY_VALUE - raw) / (SOIL_DRY_VALUE - SOIL_WET_VALUE));
}

/* ── Offline buffer ── */
static void buffer_reading(float temp, float hum, float press, int soil)
{
    offline_buffer[buffer_head].temperature = temp;
    offline_buffer[buffer_head].humidity = hum;
    offline_buffer[buffer_head].pressure = press;
    offline_buffer[buffer_head].soil_pct = soil;
    buffer_head = (buffer_head + 1) % OFFLINE_BUFFER_SIZE;
    if (buffer_count < OFFLINE_BUFFER_SIZE) buffer_count++;
    ESP_LOGW(TAG, "Buffered reading (%d in buffer)", buffer_count);
}

static void flush_buffer(esp_mqtt_client_handle_t client)
{
    if (buffer_count == 0) return;
    ESP_LOGI(TAG, "Flushing %d buffered readings", buffer_count);

    int start = (buffer_head - buffer_count + OFFLINE_BUFFER_SIZE) % OFFLINE_BUFFER_SIZE;
    for (int i = 0; i < buffer_count; i++) {
        int idx = (start + i) % OFFLINE_BUFFER_SIZE;
        char val[16];
        snprintf(val, sizeof(val), "%.1f", offline_buffer[idx].temperature);
        esp_mqtt_client_publish(client, TOPIC_TEMP, val, 0, 1, 1);
        snprintf(val, sizeof(val), "%.1f", offline_buffer[idx].humidity);
        esp_mqtt_client_publish(client, TOPIC_HUMIDITY, val, 0, 1, 1);
        snprintf(val, sizeof(val), "%.1f", offline_buffer[idx].pressure);
        esp_mqtt_client_publish(client, TOPIC_PRESSURE, val, 0, 1, 1);
        snprintf(val, sizeof(val), "%d", offline_buffer[idx].soil_pct);
        esp_mqtt_client_publish(client, TOPIC_SOIL, val, 0, 1, 1);
        vTaskDelay(pdMS_TO_TICKS(50)); /* Brief delay to avoid flooding */
    }
    buffer_count = 0;
    buffer_head = 0;
}

/* ── MQTT event handler ── */
static void mqtt_event_handler(void *args, esp_event_base_t base,
                               int32_t event_id, void *event_data)
{
    esp_mqtt_event_handle_t event = event_data;
    switch (event_id) {
    case MQTT_EVENT_CONNECTED:
        ESP_LOGI(TAG, "MQTT connected");
        mqtt_connected = true;
        backoff_ms = 1000; /* Reset backoff */
        /* Publish online status */
        esp_mqtt_client_publish(event->client, TOPIC_STATUS, "online", 0, 1, 1);
        /* Subscribe to command topic */
        esp_mqtt_client_subscribe(event->client, TOPIC_CMD, 1);
        /* Flush any buffered readings */
        flush_buffer(event->client);
        break;
    case MQTT_EVENT_DISCONNECTED:
        ESP_LOGW(TAG, "MQTT disconnected, backoff %d ms", backoff_ms);
        mqtt_connected = false;
        vTaskDelay(pdMS_TO_TICKS(backoff_ms));
        if (backoff_ms < MAX_BACKOFF_MS) {
            backoff_ms *= 2;
        }
        break;
    case MQTT_EVENT_DATA:
        ESP_LOGI(TAG, "Received command on %.*s: %.*s",
                 event->topic_len, event->topic,
                 event->data_len, event->data);
        /* Parse command JSON here (e.g., change publish interval) */
        break;
    default:
        break;
    }
}

/* ── Main task ── */
static void sensor_publish_task(void *pvParameters)
{
    bme280_init();
    while (1) {
        float temp, hum, press;
        int soil_pct;
        if (bme280_read(&temp, &hum, &press) == ESP_OK) {
            soil_pct = read_soil_moisture_pct();
            if (mqtt_connected) {
                char val[16];
                snprintf(val, sizeof(val), "%.1f", temp);
                esp_mqtt_client_publish(mqtt_client, TOPIC_TEMP, val, 0, 1, 1);
                snprintf(val, sizeof(val), "%.1f", hum);
                esp_mqtt_client_publish(mqtt_client, TOPIC_HUMIDITY, val, 0, 1, 1);
                snprintf(val, sizeof(val), "%.1f", press);
                esp_mqtt_client_publish(mqtt_client, TOPIC_PRESSURE, val, 0, 1, 1);
                snprintf(val, sizeof(val), "%d", soil_pct);
                esp_mqtt_client_publish(mqtt_client, TOPIC_SOIL, val, 0, 1, 1);
                ESP_LOGI(TAG, "Published: T=%.1f H=%.1f P=%.1f Soil=%d%%",
                         temp, hum, press, soil_pct);
            } else {
                buffer_reading(temp, hum, press, soil_pct);
            }
        } else {
            ESP_LOGE(TAG, "BME280 read failed");
        }
        vTaskDelay(pdMS_TO_TICKS(PUBLISH_INTERVAL_MS));
    }
}

void app_main(void)
{
    nvs_flash_init();
    esp_netif_init();
    esp_event_loop_create_default();

    /* Wi-Fi connect (use your wifi_connect() from Lesson 3) */
    wifi_connect();

    /* MQTT client setup */
    extern const uint8_t server_cert_pem_start[] asm("_binary_ca_crt_start");
    esp_mqtt_client_config_t mqtt_cfg = {
        .broker = {
            .address.uri = BROKER_URI,
            .verification.certificate = (const char *)server_cert_pem_start,
        },
        .credentials = {
            .username = MQTT_USERNAME,
            .authentication.password = MQTT_PASSWORD,
            .client_id = "esp32-greenhouse",
        },
        .session = {
            .last_will = {
                .topic = TOPIC_STATUS,
                .msg = "offline",
                .msg_len = 7,
                .qos = 1,
                .retain = 1,
            },
            .keepalive = 30,
        },
    };
    mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
    esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID,
                                   mqtt_event_handler, NULL);
    esp_mqtt_client_start(mqtt_client);

    xTaskCreate(sensor_publish_task, "sensor_pub", 4096, NULL, 5, NULL);
}
```

**Key patterns from Lesson 3:**

- Exponential backoff on disconnect (1 s, 2 s, 4 s, … up to 60 s)
- Offline ring buffer so readings are not lost during network outages
- Buffer flush on reconnection, with a small delay between publishes to avoid flooding
- Last will message set to "offline" so the broker notifies all subscribers if the node vanishes
- Retained messages on all sensor topics so new subscribers get immediate data
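The buffer-and-flush behavior is easiest to reason about in isolation. Here is a minimal Python model of the same ring-buffer pattern (an illustration, not the firmware itself): once the buffer is full, the newest reading overwrites the oldest, and a flush replays the surviving readings oldest-first.

```python
from collections import deque

BUFFER_SIZE = 60  # matches OFFLINE_BUFFER_SIZE in the firmware


class OfflineBuffer:
    """Fixed-size ring buffer: keeps only the most recent readings."""

    def __init__(self, size=BUFFER_SIZE):
        self._buf = deque(maxlen=size)

    def store(self, reading):
        self._buf.append(reading)  # deque drops the oldest entry when full

    def flush(self, publish):
        """Replay buffered readings oldest-first, then clear."""
        while self._buf:
            publish(self._buf.popleft())


buf = OfflineBuffer(size=3)
for t in [20.0, 20.5, 21.0, 21.5]:  # one more reading than capacity
    buf.store(t)

published = []
buf.flush(published.append)
print(published)  # [20.5, 21.0, 21.5] — the oldest reading was overwritten
```

This is exactly the trade-off the firmware makes: during a long outage you lose the *oldest* data, never the most recent, and the dashboard catches up in order when the connection returns.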

### Pico W Node (Light + PIR Motion)

The Pico W reads ambient light from an LDR on ADC0 and detects motion from a PIR sensor on GPIO 15. Light level is published every 30 seconds. Motion events are published on change only (edge triggered), reducing unnecessary traffic.

```python title="pico_node/main.py"
import network
import time
import json
from machine import Pin, ADC
from umqtt.simple import MQTTClient

# Configuration
WIFI_SSID = "your_ssid"
WIFI_PASS = "your_password"
BROKER_IP = "192.168.1.100"
BROKER_PORT = 8883
MQTT_USER = "pico-node"
MQTT_PASS = "your_password_here"
CLIENT_ID = "pico-greenhouse"
PUBLISH_INTERVAL = 30  # seconds

# Topics
TOPIC_LIGHT = b"greenhouse/pico-node/light"
TOPIC_MOTION = b"greenhouse/pico-node/motion"
TOPIC_STATUS = b"greenhouse/pico-node/status"
TOPIC_CMD = b"greenhouse/cmd/pico-node"

# Hardware
ldr = ADC(Pin(26))         # ADC0, GP26
pir = Pin(15, Pin.IN)      # PIR sensor on GP15
led = Pin("LED", Pin.OUT)  # Onboard LED for status

# State
last_motion = 0
backoff_s = 1
MAX_BACKOFF = 60


def connect_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASS)
        timeout = 20
        while not wlan.isconnected() and timeout > 0:
            time.sleep(1)
            timeout -= 1
    if wlan.isconnected():
        print("Wi-Fi connected:", wlan.ifconfig())
        led.on()
        return True
    else:
        print("Wi-Fi connection failed")
        led.off()
        return False


def read_light_lux():
    """Read LDR and convert ADC value to approximate lux."""
    raw = ldr.read_u16()  # 0-65535
    # Simple linear mapping (calibrate for your LDR + resistor divider)
    lux = int(raw * 1000 / 65535)
    return lux


def on_message(topic, msg):
    """Handle incoming commands."""
    print("Command received:", topic, msg)
    try:
        cmd = json.loads(msg)
        if "interval" in cmd:
            global PUBLISH_INTERVAL
            PUBLISH_INTERVAL = int(cmd["interval"])
            print("Publish interval changed to", PUBLISH_INTERVAL)
    except Exception as e:
        print("Command parse error:", e)


def mqtt_connect():
    """Connect to MQTT broker with TLS."""
    global backoff_s
    try:
        client = MQTTClient(
            CLIENT_ID,
            BROKER_IP,
            port=BROKER_PORT,
            user=MQTT_USER,
            password=MQTT_PASS,
            keepalive=30,
            ssl=True,  # newer MicroPython builds expect an SSLContext here
        )
        client.set_last_will(TOPIC_STATUS, b"offline", retain=True, qos=1)
        client.set_callback(on_message)
        client.connect()
        client.subscribe(TOPIC_CMD, 1)
        client.publish(TOPIC_STATUS, b"online", retain=True, qos=1)
        print("MQTT connected")
        backoff_s = 1  # Reset backoff
        return client
    except Exception as e:
        print("MQTT connect failed:", e, "backoff:", backoff_s)
        time.sleep(backoff_s)
        if backoff_s < MAX_BACKOFF:
            backoff_s *= 2
        return None


def main():
    global last_motion
    if not connect_wifi():
        return
    client = None
    last_publish = 0
    while True:
        # Reconnect if needed
        if client is None:
            client = mqtt_connect()
            if client is None:
                continue
        try:
            # Check for incoming messages (non-blocking)
            client.check_msg()

            # Publish light level periodically
            now = time.time()
            if now - last_publish >= PUBLISH_INTERVAL:
                lux = read_light_lux()
                client.publish(TOPIC_LIGHT, str(lux).encode(), retain=True, qos=1)
                print("Published light:", lux, "lux")
                last_publish = now

            # Publish motion on change (edge detection)
            current_motion = pir.value()
            if current_motion != last_motion:
                client.publish(TOPIC_MOTION, str(current_motion).encode(),
                               retain=False, qos=1)
                print("Motion event:", "detected" if current_motion else "clear")
                last_motion = current_motion

            time.sleep(0.1)  # Small poll interval for responsive motion detection
        except Exception as e:
            print("MQTT error:", e)
            client = None  # Force reconnect on next loop


main()
```
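The `read_light_lux()` mapping above is a placeholder. A common improvement is two-point calibration: measure the raw ADC value at two known light levels (for example with a phone lux-meter app) and interpolate between them. A hedged sketch follows — the calibration constants are made up and must be replaced with your own measurements:

```python
# Hypothetical calibration points: (raw ADC reading, measured lux)
CAL_DARK = (58000, 5)      # near-dark room
CAL_BRIGHT = (9000, 800)   # bright daylight through glass


def raw_to_lux(raw: int) -> int:
    """Linear interpolation between two calibration points.

    Note: an LDR's response is roughly logarithmic, so a linear fit is
    only a first approximation; add more points for better accuracy.
    """
    (r0, l0), (r1, l1) = CAL_DARK, CAL_BRIGHT
    lux = l0 + (raw - r0) * (l1 - l0) / (r1 - r0)
    return max(0, round(lux))


print(raw_to_lux(58000))  # 5   (dark calibration point)
print(raw_to_lux(9000))   # 800 (bright calibration point)
```

Swap this in for the `raw * 1000 / 65535` mapping once you have real calibration data; the rest of the node code is unchanged.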

### STM32 Node (Air Quality via ESP-01)

The STM32 reads an MQ-135 air quality sensor on its ADC and sends MQTT publishes through an ESP-01 module connected over UART. This uses AT commands to control the ESP-01 (the same pattern from the STM32 course). The firmware is written for STM32 HAL.

```c title="stm32_node/Core/Src/main.c (key sections)"
/* ── Includes and defines ── */
#include "main.h"
#include <stdio.h>
#include <string.h>

#define MQ135_ADC_CHANNEL   ADC_CHANNEL_0 /* PA0 */
#define PUBLISH_INTERVAL_MS 30000
#define UART_TIMEOUT        5000

/* MQTT topics */
static const char *TOPIC_AQ     = "greenhouse/stm32-node/air_quality";
static const char *TOPIC_STATUS = "greenhouse/stm32-node/status";

extern UART_HandleTypeDef huart2; /* ESP-01 on USART2 */
extern ADC_HandleTypeDef hadc1;

/* ── ESP-01 AT command helpers ── */
static int esp_send_cmd(const char *cmd, const char *expected,
                        uint32_t timeout_ms)
{
    char rx_buf[256] = {0};
    HAL_UART_Transmit(&huart2, (uint8_t *)cmd, strlen(cmd), 1000);
    HAL_UART_Transmit(&huart2, (uint8_t *)"\r\n", 2, 100);
    HAL_UART_Receive(&huart2, (uint8_t *)rx_buf, sizeof(rx_buf) - 1,
                     timeout_ms);
    if (strstr(rx_buf, expected) != NULL) {
        return 1; /* Success */
    }
    return 0; /* Failed */
}

static int esp_connect_wifi(const char *ssid, const char *pass)
{
    char cmd[128];
    esp_send_cmd("AT+CWMODE=1", "OK", 2000);
    snprintf(cmd, sizeof(cmd), "AT+CWJAP=\"%s\",\"%s\"", ssid, pass);
    return esp_send_cmd(cmd, "CONNECTED", 15000);
}

static int esp_mqtt_connect(const char *broker, uint16_t port,
                            const char *client_id,
                            const char *user, const char *pass)
{
    char cmd[256];
    /* Note: scheme 1 below is plain MQTT over TCP. To match the broker's
       TLS listener on 8883, switch to one of the ESP-AT TLS schemes and
       load the CA certificate into the module. */
    snprintf(cmd, sizeof(cmd),
             "AT+MQTTUSERCFG=0,1,\"%s\",\"%s\",\"%s\",0,0,\"\"",
             client_id, user, pass);
    esp_send_cmd(cmd, "OK", 2000);
    /* Set last will */
    snprintf(cmd, sizeof(cmd),
             "AT+MQTTCONNCFG=0,30,0,\"%s\",\"offline\",1,1",
             TOPIC_STATUS);
    esp_send_cmd(cmd, "OK", 2000);
    snprintf(cmd, sizeof(cmd), "AT+MQTTCONN=0,\"%s\",%d,1",
             broker, port);
    return esp_send_cmd(cmd, "OK", 10000);
}

static int esp_mqtt_publish(const char *topic, const char *payload,
                            int qos, int retain)
{
    char cmd[256];
    snprintf(cmd, sizeof(cmd), "AT+MQTTPUB=0,\"%s\",\"%s\",%d,%d",
             topic, payload, qos, retain);
    return esp_send_cmd(cmd, "OK", 3000);
}

/* ── Sensor reading ── */
static int read_air_quality(void)
{
    HAL_ADC_Start(&hadc1);
    HAL_ADC_PollForConversion(&hadc1, 100);
    uint32_t raw = HAL_ADC_GetValue(&hadc1);
    HAL_ADC_Stop(&hadc1);
    /* Map 12-bit ADC (0-4095) to AQI (0-500) */
    int aqi = (int)((float)raw * 500.0f / 4095.0f);
    return aqi;
}

/* ── Main loop ── */
int main(void)
{
    HAL_Init();
    SystemClock_Config();
    MX_GPIO_Init();
    MX_USART2_UART_Init();
    MX_ADC1_Init();

    /* Wait for the ESP-01 to boot */
    HAL_Delay(3000);

    /* Connect Wi-Fi */
    while (!esp_connect_wifi("your_ssid", "your_password")) {
        HAL_Delay(5000);
    }

    /* Connect MQTT with exponential backoff (capped at 60 s) */
    int connected = 0;
    int backoff = 1000;
    while (!connected) {
        connected = esp_mqtt_connect("192.168.1.100", 8883,
                                     "stm32-greenhouse",
                                     "stm32-node", "your_password");
        if (!connected) {
            HAL_Delay(backoff);
            if (backoff < 60000) backoff *= 2;
        }
    }

    /* Publish online status */
    esp_mqtt_publish(TOPIC_STATUS, "online", 1, 1);

    /* Main sensor loop */
    while (1) {
        int aqi = read_air_quality();
        char payload[8];
        snprintf(payload, sizeof(payload), "%d", aqi);
        esp_mqtt_publish(TOPIC_AQ, payload, 1, 1);
        HAL_Delay(PUBLISH_INTERVAL_MS);
    }
}
```

## Data Pipeline: Telegraf to InfluxDB

Telegraf subscribes to all `greenhouse/#` topics and writes each message into InfluxDB. This was covered in Lesson 4. Here is the full configuration for the production system.
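The `topic_parsing` rule in the configuration below is its least obvious part, so here is what it does to a single message, sketched in plain Python (an illustration of the mapping, not Telegraf code): the first topic segment becomes the measurement name, and the second and third segments become the `node` and `metric` tags that the Grafana queries later filter on.

```python
def parse_topic(topic: str, payload: str) -> dict:
    """Mimic the effect of telegraf's topic_parsing for 'greenhouse/+/+'.

    measurement = "measurement/_/_"  -> segment 0 is the measurement name
    tags        = "_/node/metric"    -> segments 1 and 2 become tags
    data_format = "value" + float    -> the payload becomes the value field
    """
    measurement, node, metric = topic.split("/")
    return {
        "measurement": measurement,
        "tags": {"node": node, "metric": metric},
        "fields": {"value": float(payload)},
    }


point = parse_topic("greenhouse/esp32-node/temperature", "23.5")
print(point["measurement"], point["tags"])
# greenhouse {'node': 'esp32-node', 'metric': 'temperature'}
```

One consequence worth noting: with this rule in place, the measurement name stored in InfluxDB is `greenhouse` (the first topic segment), not Telegraf's default `mqtt_consumer`.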

```toml title="telegraf/telegraf.conf"
# Global agent configuration
[agent]
  interval = "10s"
  round_interval = true
  flush_interval = "10s"
  flush_jitter = "2s"

# MQTT consumer input
[[inputs.mqtt_consumer]]
  servers = ["ssl://mosquitto:8883"]
  topics = ["greenhouse/#"]
  username = "telegraf"
  password = "your_telegraf_password"
  client_id = "telegraf-greenhouse"
  qos = 1
  persistent_session = true

  # TLS configuration
  tls_ca = "/etc/telegraf/certs/ca.crt"
  insecure_skip_verify = false

  # Data format: plain string values
  data_format = "value"
  data_type = "float"

  # Parse topic to extract node and metric
  # Topic format: greenhouse/<node>/<metric>
  # (keep this sub-table last: TOML keys after it would belong to it)
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "greenhouse/+/+"
    measurement = "measurement/_/_"
    tags = "_/node/metric"

# InfluxDB v2 output
[[outputs.influxdb_v2]]
  urls = ["http://influxdb:8086"]
  token = "your_influxdb_token"
  organization = "greenhouse"
  bucket = "sensor_data"

  # Tag filtering
  [outputs.influxdb_v2.tagpass]
    node = ["esp32-node", "pico-node", "stm32-node"]
```

### InfluxDB Bucket Configuration

```bash title="Create bucket with 30-day retention"
# Using the InfluxDB CLI inside the container
influx bucket create \
  --name sensor_data \
  --org greenhouse \
  --retention 30d

# Verify the bucket
influx bucket list --org greenhouse
```

The 30-day retention policy means InfluxDB automatically deletes data older than 30 days. For long-term storage, you can create a second bucket with longer retention and a downsampling task that writes hourly averages to it.

```flux title="Downsampling task (optional, for long-term storage)"
option task = {name: "downsample_hourly", every: 1h}

from(bucket: "sensor_data")
  |> range(start: -1h)
  // measurement name comes from topic_parsing (first topic segment)
  |> filter(fn: (r) => r._measurement == "greenhouse")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "sensor_data_longterm", org: "greenhouse")
```

## Grafana Dashboard

The Grafana dashboard has panels organized into rows: one row per sensor node, plus a system health row.

### Dashboard JSON (Key Panels)

Rather than listing the entire JSON, here are the Flux queries for each major panel. Import these into Grafana by creating a new dashboard and adding panels with InfluxDB (Flux) as the data source.

**Panel: Temperature (ESP32 Node)**

```flux
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "esp32-node" and r.metric == "temperature")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "temperature")
```

**Panel: Humidity (ESP32 Node)**

```flux
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "esp32-node" and r.metric == "humidity")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "humidity")
```

**Panel: Soil Moisture (ESP32 Node)**

```flux
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "esp32-node" and r.metric == "soil_moisture")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "soil_moisture")
```

**Panel: Light Level (Pico W Node)**

```flux
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "pico-node" and r.metric == "light")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "light")
```

**Panel: Air Quality (STM32 Node)**

```flux
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "stm32-node" and r.metric == "air_quality")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "air_quality")
```

**Panel: Device Status (all nodes)**

```flux
from(bucket: "sensor_data")
  |> range(start: -5m)
  |> filter(fn: (r) => r.metric == "status")
  |> last()
  |> group(columns: ["node"])
  |> yield(name: "status")
```

Note that the Telegraf input above parses payloads as floats, so the string payloads on `status` topics (`"online"`/`"offline"`) will not pass that parser; to feed this panel, add a second `mqtt_consumer` for the status topics with `data_type = "string"` (or map status to `1`/`0` on the nodes).

Dashboard Layout

RowPanelTypeRefresh
ESP32 NodeTemperatureTime series10s
ESP32 NodeHumidityGauge (0-100%)10s
ESP32 NodePressureStat10s
ESP32 NodeSoil MoistureGauge (0-100%)10s
Pico W NodeLight LevelTime series10s
Pico W NodeMotion EventsState timeline10s
STM32 NodeAir Quality (AQI)Gauge (0-500)10s
System HealthDevice StatusStatus map10s
System HealthAlert HistoryTable30s
System HealthMessages/secTime series10s
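To avoid wiring the data source together by hand on every rebuild, Grafana can also provision it from a file at startup. A hedged sketch of such a provisioning file follows — the file path, token, and organization are assumptions that must match your own InfluxDB setup:

```yaml
# grafana/provisioning/datasources/influxdb.yml (sketch)
apiVersion: 1
datasources:
  - name: InfluxDB-Greenhouse
    type: influxdb
    access: proxy
    url: http://influxdb:8086
    jsonData:
      version: Flux
      organization: greenhouse
      defaultBucket: sensor_data
    secureJsonData:
      token: your_influxdb_token
```

Mount the `provisioning` directory into the Grafana container and the data source appears preconfigured on first login.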

## REST API

The Flask API provides programmatic access to the system (Lesson 5 pattern). It queries InfluxDB for historical data and publishes commands to the broker for device control.

api/app.py
import os
import json
import re
from datetime import datetime, timezone
from functools import wraps

from flask import Flask, request, jsonify, abort
from influxdb_client import InfluxDBClient
import paho.mqtt.client as mqtt

app = Flask(__name__)

# Configuration
INFLUX_URL = os.getenv("INFLUX_URL", "http://influxdb:8086")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN", "your_influxdb_token")
INFLUX_ORG = os.getenv("INFLUX_ORG", "greenhouse")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET", "sensor_data")
API_KEY = os.getenv("API_KEY", "your_api_key_here")
MQTT_BROKER = os.getenv("MQTT_BROKER", "mosquitto")
MQTT_PORT = int(os.getenv("MQTT_PORT", "8883"))
MQTT_USER = os.getenv("MQTT_USER", "api-server")
MQTT_PASS = os.getenv("MQTT_PASS", "your_password")

# Matches Flux duration literals such as "30s", "5m", "1h", "7d"
DURATION_RE = re.compile(r"^\d+(ms|s|m|h|d|w)$")

# InfluxDB client
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
query_api = influx_client.query_api()

# MQTT client for publishing commands
mqtt_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="flask-api")
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASS)
mqtt_client.tls_set(ca_certs="/app/certs/ca.crt")
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
mqtt_client.loop_start()

# Known devices
DEVICES = {
    "esp32-node": {"name": "ESP32 Greenhouse", "sensors": ["temperature", "humidity", "pressure", "soil_moisture"]},
    "pico-node": {"name": "Pico W Greenhouse", "sensors": ["light", "motion"]},
    "stm32-node": {"name": "STM32 Greenhouse", "sensors": ["air_quality"]},
}

def require_api_key(f):
    """Simple API key authentication decorator."""
    @wraps(f)
    def decorated(*args, **kwargs):
        key = request.headers.get("X-API-Key")
        if key != API_KEY:
            abort(401, description="Invalid or missing API key")
        return f(*args, **kwargs)
    return decorated

@app.route("/api/devices", methods=["GET"])
@require_api_key
def list_devices():
    """List all registered devices and their sensors."""
    return jsonify({"devices": DEVICES})

@app.route("/api/devices/<node_id>/data", methods=["GET"])
@require_api_key
def get_device_data(node_id):
    """Query historical data for a device.

    Query params:
    - metric: sensor metric name (required)
    - range: time range, e.g., "1h", "24h", "7d" (default: "1h")
    - aggregate: window period, e.g., "1m", "5m" (default: "1m")
    """
    if node_id not in DEVICES:
        abort(404, description=f"Device {node_id} not found")
    metric = request.args.get("metric")
    if not metric or metric not in DEVICES[node_id]["sensors"]:
        abort(400, description=f"Invalid metric. Available: {DEVICES[node_id]['sensors']}")
    time_range = request.args.get("range", "1h")
    aggregate = request.args.get("aggregate", "1m")
    # Validate duration strings before interpolating them into the Flux query,
    # so a caller cannot inject arbitrary Flux code through these parameters.
    if not DURATION_RE.match(time_range) or not DURATION_RE.match(aggregate):
        abort(400, description="range and aggregate must be durations like 30m, 1h, 7d")
    query = f'''
    from(bucket: "{INFLUX_BUCKET}")
      |> range(start: -{time_range})
      |> filter(fn: (r) => r.node == "{node_id}" and r.metric == "{metric}")
      |> aggregateWindow(every: {aggregate}, fn: mean, createEmpty: false)
    '''
    tables = query_api.query(query)
    records = []
    for table in tables:
        for record in table.records:
            records.append({
                "time": record.get_time().isoformat(),
                "value": record.get_value(),
            })
    return jsonify({
        "device": node_id,
        "metric": metric,
        "range": time_range,
        "aggregate": aggregate,
        "count": len(records),
        "data": records,
    })

@app.route("/api/devices/<node_id>/command", methods=["POST"])
@require_api_key
def send_command(node_id):
    """Send a command to a device via MQTT.

    Body (JSON): any key-value pairs to send as the command payload,
    for example {"interval": 60}.
    """
    if node_id not in DEVICES:
        abort(404, description=f"Device {node_id} not found")
    payload = request.get_json()
    if not payload:
        abort(400, description="Request body must be JSON")
    topic = f"greenhouse/cmd/{node_id}"
    result = mqtt_client.publish(topic, json.dumps(payload), qos=1)
    if result.rc == mqtt.MQTT_ERR_SUCCESS:
        return jsonify({"status": "sent", "topic": topic, "payload": payload})
    abort(500, description="Failed to publish command")

@app.route("/api/health", methods=["GET"])
def health_check():
    """Health check endpoint (no auth required)."""
    return jsonify({"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

API Usage Examples

List devices
curl -H "X-API-Key: your_api_key_here" \
http://localhost:5000/api/devices
Query temperature data (last 6 hours, 5-minute averages)
curl -H "X-API-Key: your_api_key_here" \
"http://localhost:5000/api/devices/esp32-node/data?metric=temperature&range=6h&aggregate=5m"
Change publish interval on ESP32 node to 60 seconds
curl -X POST -H "X-API-Key: your_api_key_here" \
-H "Content-Type: application/json" \
-d '{"interval": 60}' \
http://localhost:5000/api/devices/esp32-node/command
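The same calls are easy to script. Here is a minimal Python client sketch using only the standard library; the host and API key are placeholders, and `build_request`/`get_json` are illustrative helper names, not part of the API:

```python
import json
import urllib.parse
import urllib.request

API_BASE = "http://localhost:5000"  # placeholder host
API_KEY = "your_api_key_here"       # placeholder key

def build_request(path, params=None):
    """Build an authenticated GET request for the greenhouse API."""
    url = API_BASE + path
    if params:
        url += "?" + urllib.parse.urlencode(params)
    return urllib.request.Request(url, headers={"X-API-Key": API_KEY})

def get_json(path, params=None):
    """Perform the request and decode the JSON response body."""
    with urllib.request.urlopen(build_request(path, params)) as resp:
        return json.load(resp)

# Usage (requires the stack to be running):
#   get_json("/api/devices")
#   get_json("/api/devices/esp32-node/data",
#            {"metric": "temperature", "range": "6h", "aggregate": "5m"})
```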

Alerts and Automation



Node-RED handles threshold alerts and inactivity detection (Lesson 6). The alert flow subscribes to all greenhouse topics and evaluates rules on every message.

Threshold Alerts

| Condition | Severity | Action |
| --- | --- | --- |
| Temperature > 35 C | Critical | Slack notification + Grafana annotation |
| Temperature < 5 C | Critical | Slack notification + Grafana annotation |
| Humidity < 20% | Warning | Slack notification |
| Humidity > 90% | Warning | Slack notification |
| Soil moisture < 15% | Warning | Slack notification (“water the plants”) |
| Air quality (AQI) > 150 | Critical | Slack notification (“ventilation needed”) |

Inactivity Alerts

If no data arrives from a node for 5 minutes, Node-RED fires an inactivity alert. This catches scenarios where a node crashes silently without the MQTT last will firing (for example, if the node loses Wi-Fi but the TCP connection has not yet timed out at the broker).

nodered/flows.json (threshold + inactivity alert flow)
[
  {
    "id": "mqtt_in_greenhouse",
    "type": "mqtt in",
    "topic": "greenhouse/#",
    "broker": "mosquitto_broker",
    "qos": "1",
    "name": "Greenhouse MQTT"
  },
  {
    "id": "parse_topic",
    "type": "function",
    "name": "Parse Topic and Value",
    "func": "const parts = msg.topic.split('/');\nmsg.node = parts[1];\nmsg.metric = parts[2];\nmsg.value = parseFloat(msg.payload);\nmsg.timestamp = Date.now();\n\n// Store last-seen timestamp per node\nconst ctx = global.get('lastSeen') || {};\nctx[msg.node] = msg.timestamp;\nglobal.set('lastSeen', ctx);\n\nreturn msg;"
  },
  {
    "id": "threshold_check",
    "type": "switch",
    "name": "Threshold Check",
    "property": "metric",
    "rules": [
      {"t": "eq", "v": "temperature"},
      {"t": "eq", "v": "humidity"},
      {"t": "eq", "v": "soil_moisture"},
      {"t": "eq", "v": "air_quality"}
    ],
    "outputs": 4
  },
  {
    "id": "temp_alert",
    "type": "function",
    "name": "Temperature Alert",
    "func": "if (msg.value > 35) {\n  msg.payload = {\n    text: `CRITICAL: Temperature is ${msg.value}C on ${msg.node} (threshold: 35C). Check greenhouse ventilation.`\n  };\n  return msg;\n}\nif (msg.value < 5) {\n  msg.payload = {\n    text: `CRITICAL: Temperature is ${msg.value}C on ${msg.node} (threshold: 5C). Check greenhouse heating.`\n  };\n  return msg;\n}\nreturn null;"
  },
  {
    "id": "humidity_alert",
    "type": "function",
    "name": "Humidity Alert",
    "func": "if (msg.value < 20) {\n  msg.payload = {\n    text: `WARNING: Humidity is ${msg.value}% on ${msg.node} (threshold: <20%). Consider misting.`\n  };\n  return msg;\n}\nif (msg.value > 90) {\n  msg.payload = {\n    text: `WARNING: Humidity is ${msg.value}% on ${msg.node} (threshold: >90%). Check for condensation.`\n  };\n  return msg;\n}\nreturn null;"
  },
  {
    "id": "soil_alert",
    "type": "function",
    "name": "Soil Moisture Alert",
    "func": "if (msg.value < 15) {\n  msg.payload = {\n    text: `WARNING: Soil moisture is ${msg.value}% on ${msg.node}. Water the plants.`\n  };\n  return msg;\n}\nreturn null;"
  },
  {
    "id": "aqi_alert",
    "type": "function",
    "name": "Air Quality Alert",
    "func": "if (msg.value > 150) {\n  msg.payload = {\n    text: `CRITICAL: Air quality index is ${msg.value} on ${msg.node} (threshold: 150). Open ventilation.`\n  };\n  return msg;\n}\nreturn null;"
  },
  {
    "id": "slack_webhook",
    "type": "http request",
    "method": "POST",
    "url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
    "name": "Send to Slack"
  },
  {
    "id": "inactivity_check",
    "type": "inject",
    "repeat": "60",
    "name": "Check Every 60s"
  },
  {
    "id": "inactivity_eval",
    "type": "function",
    "name": "Inactivity Detection",
    "func": "const ctx = global.get('lastSeen') || {};\nconst now = Date.now();\nconst threshold = 5 * 60 * 1000; // 5 minutes\nconst nodes = ['esp32-node', 'pico-node', 'stm32-node'];\nconst alerts = [];\n\nfor (const node of nodes) {\n  const lastSeen = ctx[node];\n  if (!lastSeen || (now - lastSeen) > threshold) {\n    alerts.push(node);\n  }\n}\n\nif (alerts.length > 0) {\n  msg.payload = {\n    text: `INACTIVITY ALERT: No data received from ${alerts.join(', ')} for over 5 minutes. Check device connectivity.`\n  };\n  return msg;\n}\nreturn null;"
  }
]
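The Parse Topic function's logic is easy to verify outside Node-RED. Here is the same parsing and last-seen bookkeeping sketched in Python; the function and variable names are illustrative, not part of the flow:

```python
import time

# node -> timestamp of most recent message; mirrors the 'lastSeen' global context
last_seen = {}

def parse_message(topic, payload, now=None):
    """Split 'greenhouse/<node>/<metric>' and record when the node was last heard."""
    parts = topic.split("/")
    if len(parts) < 3 or parts[0] != "greenhouse":
        raise ValueError(f"unexpected topic: {topic}")
    node, metric = parts[1], parts[2]
    ts = now if now is not None else time.time()
    last_seen[node] = ts
    return {"node": node, "metric": metric, "value": float(payload), "timestamp": ts}
```

A message on `greenhouse/esp32-node/temperature` with payload `"25.3"` yields node `esp32-node`, metric `temperature`, value `25.3`, and updates `last_seen["esp32-node"]` for the inactivity check.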

Alert Deduplication

To avoid alert storms (the same threshold firing every 30 seconds), add a rate limiter in each alert function. The pattern stores the last alert timestamp and suppresses duplicates within a cooldown window.

Rate limiter pattern (add to each alert function)
// At the top of each alert function, add:
const cooldown = 5 * 60 * 1000; // 5 minutes between repeated alerts
const key = `alert_${msg.node}_${msg.metric}`;
const lastAlert = flow.get(key) || 0;
const now = Date.now();
if (now - lastAlert < cooldown) {
    return null; // Suppress duplicate
}
flow.set(key, now);
// ... rest of alert logic

Docker Compose: Full Stack



All backend services run in Docker containers. This makes deployment reproducible and portable. Podman (fully open source, rootless, no daemon, no paid tiers) works as an alternative, though some inter-container networking and health check features may need minor adjustments with podman-compose.

docker-compose.yml
version: "3.8"

services:
  # MQTT broker with TLS
  mosquitto:
    image: eclipse-mosquitto:2
    container_name: greenhouse-mosquitto
    ports:
      - "8883:8883"
    volumes:
      - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
      - ./mosquitto/passwd:/mosquitto/config/passwd:ro
      - ./mosquitto/acl:/mosquitto/config/acl:ro
      - ./certs:/mosquitto/certs:ro
      - mosquitto_data:/mosquitto/data
      - mosquitto_log:/mosquitto/log
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "mosquitto_sub", "-t", "$$SYS/#", "-C", "1", "-u", "healthcheck", "-P", "healthcheck", "--cafile", "/mosquitto/certs/ca.crt", "-p", "8883"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Telegraf: MQTT to InfluxDB
  telegraf:
    image: telegraf:1.30
    container_name: greenhouse-telegraf
    volumes:
      - ./telegraf/telegraf.conf:/etc/telegraf/telegraf.conf:ro
      - ./certs:/etc/telegraf/certs:ro
    depends_on:
      mosquitto:
        condition: service_healthy
      influxdb:
        condition: service_healthy
    restart: unless-stopped

  # InfluxDB: time-series database
  influxdb:
    image: influxdb:2.7
    container_name: greenhouse-influxdb
    ports:
      - "8086:8086"
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: your_influxdb_password
      DOCKER_INFLUXDB_INIT_ORG: greenhouse
      DOCKER_INFLUXDB_INIT_BUCKET: sensor_data
      DOCKER_INFLUXDB_INIT_RETENTION: 30d
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: your_influxdb_token
    volumes:
      - influxdb_data:/var/lib/influxdb2
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "influx", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Grafana: dashboards
  grafana:
    image: grafana/grafana:10.4.0
    container_name: greenhouse-grafana
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_USER: admin
      GF_SECURITY_ADMIN_PASSWORD: your_grafana_password
      GF_INSTALL_PLUGINS: grafana-clock-panel
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning:ro
    depends_on:
      influxdb:
        condition: service_healthy
    restart: unless-stopped

  # Flask REST API
  flask-api:
    build: ./api
    container_name: greenhouse-api
    ports:
      - "5000:5000"
    environment:
      INFLUX_URL: http://influxdb:8086
      INFLUX_TOKEN: your_influxdb_token
      INFLUX_ORG: greenhouse
      INFLUX_BUCKET: sensor_data
      MQTT_BROKER: mosquitto
      MQTT_PORT: "8883"
      MQTT_USER: api-server
      MQTT_PASS: your_password
      API_KEY: your_api_key_here
    volumes:
      - ./certs:/app/certs:ro
    depends_on:
      - mosquitto
      - influxdb
    restart: unless-stopped

  # Node-RED: alerts and automation
  nodered:
    image: nodered/node-red:3.1
    container_name: greenhouse-nodered
    ports:
      - "1880:1880"
    volumes:
      - nodered_data:/data
      - ./nodered/flows.json:/data/flows.json:ro
      - ./certs:/data/certs:ro
    depends_on:
      - mosquitto
    restart: unless-stopped

volumes:
  mosquitto_data:
  mosquitto_log:
  influxdb_data:
  grafana_data:
  nodered_data:

Flask API Dockerfile

api/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 5000
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "--workers", "2", "app:app"]
api/requirements.txt
flask==3.0.0
gunicorn==21.2.0
influxdb-client==1.38.0
paho-mqtt>=2.0.0

Project File Structure



  • greenhouse-iot/
    • docker-compose.yml
    • certs/
      • ca.crt
      • ca.key
      • server.crt
      • server.key
      • siliconwit-ca.crt
    • mosquitto/
      • mosquitto.conf
      • passwd
      • acl
    • telegraf/
      • telegraf.conf
    • grafana/
      • provisioning/
        • datasources/
          • influxdb.yml
        • dashboards/
          • greenhouse.json
    • nodered/
      • flows.json
    • api/
      • Dockerfile
      • requirements.txt
      • app.py
    • firmware/
      • esp32_node/
        • main/
          • main.c
          • CMakeLists.txt
        • CMakeLists.txt
        • sdkconfig.defaults
      • pico_node/
        • main.py
      • stm32_node/
        • Core/
          • Src/
            • main.c

Deployment



  1. Generate TLS certificates. Use the certificate generation process from Lesson 7. Create a CA, then sign server and client certificates. Place all certificates in the certs/ directory.

    Terminal window
    # Generate CA
    openssl genrsa -out certs/ca.key 4096
    openssl req -new -x509 -days 3650 -key certs/ca.key \
    -out certs/ca.crt -subj "/CN=Greenhouse CA"
    # Generate server certificate
    openssl genrsa -out certs/server.key 2048
    openssl req -new -key certs/server.key \
    -out certs/server.csr -subj "/CN=mosquitto"
    openssl x509 -req -in certs/server.csr -CA certs/ca.crt \
    -CAkey certs/ca.key -CAcreateserial \
    -out certs/server.crt -days 365
  2. Generate Mosquitto passwords. Run the mosquitto_passwd commands shown earlier to create credentials for all six users (three sensor nodes, Telegraf, the API server, and Node-RED).

  3. Configure your SiliconWit.io bridge credentials. Log in to siliconwit.io, create a device, and copy the MQTT username and token into mosquitto.conf under the bridge section.

  4. Start the Docker stack.

    Terminal window
    docker compose up -d

    Wait for all containers to reach a healthy state:

    Terminal window
    docker compose ps
  5. Verify the broker is running.

    Terminal window
    mosquitto_sub -h localhost -p 8883 \
    --cafile certs/ca.crt \
    -u telegraf -P your_telegraf_password \
    -t "greenhouse/#" -v

    Leave this running in a terminal. You should see messages appear once sensor nodes connect.

  6. Flash the ESP32 node. Build and flash using ESP-IDF:

    Terminal window
    cd firmware/esp32_node
    idf.py set-target esp32
    idf.py build
    idf.py flash monitor

    Verify that temperature, humidity, pressure, and soil moisture messages appear on the mosquitto_sub terminal.

  7. Flash the Pico W node. Copy main.py to the Pico W using Thonny or mpremote:

    Terminal window
    mpremote cp firmware/pico_node/main.py :main.py
    mpremote reset

    Verify light and motion messages appear on the broker.

  8. Flash the STM32 node. Build using STM32CubeIDE or the command-line toolchain and flash via ST-Link:

    Terminal window
    st-flash write build/stm32_node.bin 0x08000000

    Verify air quality messages appear on the broker.

  9. Configure Grafana. Open http://localhost:3000 in your browser. Log in with admin credentials. Add InfluxDB as a data source (Flux query language, URL http://influxdb:8086, token and org as configured). Import or create the dashboard panels listed earlier.

  10. Configure Node-RED alerts. Open http://localhost:1880. The flow file is pre-loaded. Update the Slack webhook URL in the HTTP request node. Deploy the flow and verify alerts fire when thresholds are exceeded.

  11. Verify end-to-end data flow. Confirm that sensor data flows from each node through the broker, into InfluxDB via Telegraf, and appears on the Grafana dashboard. Confirm that the same data appears on your SiliconWit.io dashboard via the MQTT bridge.

  12. Run failure tests (see the next section).

Failure Testing



A production system must handle failures gracefully. Run each of these tests and verify the expected behavior.

Test 1: Unplug a Sensor Node

Power off the ESP32 node by disconnecting its USB cable.

Expected behavior:

  • The MQTT broker delivers the last will message (“offline”) on greenhouse/esp32-node/status once the keep-alive grace period expires (brokers allow up to 1.5× the keep-alive interval, so roughly 45 seconds with the 30-second keep-alive).
  • Grafana’s device status panel shows the ESP32 node as offline.
  • After 5 minutes with no data, Node-RED’s inactivity check fires a Slack notification.
  • The Pico W and STM32 nodes continue publishing normally.

Recovery: Reconnect the ESP32 node. It should reconnect to the broker (using exponential backoff if Wi-Fi or the broker is temporarily unreachable), publish “online” to the status topic, flush any buffered readings, and resume normal 30-second publishes.
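The reconnect-with-backoff behavior expected of each node can be sketched generically. This is an illustrative model, not the firmware itself; the base delay, cap, and jitter fraction are arbitrary choices:

```python
import random

def backoff_delays(base=1.0, cap=60.0, factor=2.0):
    """Yield reconnect delays: base, base*2, base*4, ... capped at 'cap'."""
    delay = base
    while True:
        yield delay
        delay = min(delay * factor, cap)

def jittered(delay, fraction=0.2):
    """Add up to +/-20% random jitter so a fleet of nodes does not
    reconnect in lockstep after a broker restart."""
    return delay * (1 + random.uniform(-fraction, fraction))

# First few delays before the cap: 1, 2, 4, 8, 16, 32, 60, 60, ...
```

In firmware, the node sleeps for `jittered(delay)` between connection attempts and resets the generator to `base` after a successful connect.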

Test 2: Restart the Broker

Stop and restart the Mosquitto container:

Broker restart test
docker compose restart mosquitto

Expected behavior:

  • All three sensor nodes detect the TCP connection drop and enter their reconnection loops with exponential backoff.
  • The broker comes back up within a few seconds.
  • Nodes reconnect, re-publish their “online” status, and resume publishing sensor data.
  • Telegraf also reconnects and resumes writing to InfluxDB.
  • The Grafana dashboard shows a brief gap in data (the duration of the broker restart) but otherwise continues normally.
  • Any readings that occurred during the downtime are buffered on the ESP32 node and flushed on reconnection.

Test 3: Kill Power to the Broker Host

This simulates a complete infrastructure failure. Shut down the entire Docker host machine.

Expected behavior:

  • All local services (broker, InfluxDB, Grafana, Node-RED, API) go down.
  • The SiliconWit.io cloud still has all data up to the moment of failure, because the MQTT bridge was forwarding in real time.
  • You can log into siliconwit.io from any device and see the last known readings for all sensor nodes.
  • When the host comes back online, docker compose up -d starts all services. Sensor nodes reconnect, and data flow resumes.

This test demonstrates the value of cloud forwarding: even with a complete local outage, you retain visibility through SiliconWit.io.

Test 4: Network Partition

Disconnect the broker host from the internet (unplug the Ethernet cable or disable Wi-Fi on the host).

Expected behavior:

  • The MQTT bridge to SiliconWit.io drops, but the local broker continues operating normally.
  • Sensor nodes are on the same LAN, so they keep publishing to the local broker.
  • InfluxDB and Grafana continue receiving and displaying data.
  • When the internet connection is restored, the MQTT bridge reconnects to SiliconWit.io automatically.
  • Messages published during the outage are not retroactively forwarded (standard MQTT bridge behavior), but new messages resume immediately.

SiliconWit.io as the Cloud Layer



The MQTT bridge in mosquitto.conf forwards every message under greenhouse/# to SiliconWit.io. This is not a backup. It is a live mirror that provides several production benefits:

Remote monitoring. You can check greenhouse conditions from your phone or any browser by logging into siliconwit.io. No VPN, no port forwarding, no dynamic DNS.

Redundant visibility. If your local Grafana instance is unreachable (power outage, network issue, server crash), SiliconWit.io still has the most recent data. You know the last state of every sensor before the outage.

Alert forwarding. SiliconWit.io has its own alert system. You can configure cloud-side alerts as a second layer: if your local Node-RED fails to send a Slack notification (because the host is down), SiliconWit.io can still detect the anomaly and notify you.

Data sharing. If you need to share greenhouse data with a colleague, an agronomist, or a client, you can give them read access on SiliconWit.io without exposing your local network.

Historical comparison. SiliconWit.io retains data according to your plan. You can compare this week’s greenhouse conditions with last month’s, even if your local InfluxDB has already aged out older data.

The bridge configuration is straightforward: topic greenhouse/# out 1 means “forward all messages matching greenhouse/#, outbound only, with QoS 1.” The bridge uses TLS and authenticates with your SiliconWit.io credentials. If the connection drops, Mosquitto reconnects automatically.
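A bridge section along these lines would appear in mosquitto.conf. This is a sketch: the remote address and credentials shown are placeholders, to be replaced with the values from your SiliconWit.io device page:

```
# mosquitto.conf -- bridge to SiliconWit.io (placeholder address and credentials)
connection siliconwit-bridge
address mqtt.siliconwit.io:8883            # placeholder hostname and port
topic greenhouse/# out 1                   # forward greenhouse/#, outbound only, QoS 1
remote_username your_device_username       # from your SiliconWit.io device page
remote_password your_device_token
bridge_cafile /mosquitto/certs/siliconwit-ca.crt
cleansession false
try_private false
```

`cleansession false` keeps the bridge's subscription state across reconnects, and `try_private false` makes Mosquitto connect as a plain MQTT client, which hosted brokers generally expect.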

Production Hardening



A system that works in the lab is not the same as a system that runs unattended for months. These are the hardening steps for a real deployment.

Log Rotation

Docker containers write logs that can fill a disk if unchecked. Configure log rotation in docker-compose.yml or in the Docker daemon configuration:

Add to each service in docker-compose.yml
logging:
driver: "json-file"
options:
max-size: "10m"
max-file: "3"

For Mosquitto’s own log file (written inside the container), add a logrotate configuration or limit the file size in mosquitto.conf:

mosquitto.conf (add)
log_type warning
log_type error
# Omit "log_type all" in production to reduce log volume

Backup Strategy

| What | How | Frequency |
| --- | --- | --- |
| InfluxDB data | influx backup /backups/ or volume snapshot | Daily |
| Grafana dashboards | Export JSON via API or backup /var/lib/grafana | Weekly |
| Mosquitto config | Version control (git) | On every change |
| Node-RED flows | Export from UI or backup /data/flows.json | On every change |
| TLS certificates | Secure offline copy | On renewal |
Automated InfluxDB backup script
#!/bin/bash
# backup-influxdb.sh
BACKUP_DIR="/backups/influxdb/$(date +%Y-%m-%d)"
mkdir -p "$BACKUP_DIR"
docker exec greenhouse-influxdb influx backup "$BACKUP_DIR" \
--token your_influxdb_token
# Keep only last 7 days of backups
find /backups/influxdb -maxdepth 1 -mtime +7 -type d -exec rm -rf {} \;

Monitoring the Monitor

Your monitoring system itself needs monitoring. If InfluxDB crashes or Grafana becomes unresponsive, who alerts you?

Approach 1: External uptime checks. Use an external service (UptimeRobot, Healthchecks.io, or a simple cron job on another machine) that pings your Flask API’s /api/health endpoint every minute. If it fails to respond, you receive an email or SMS.

Approach 2: SiliconWit.io as the watchdog. Since data is forwarded to SiliconWit.io, configure a SiliconWit.io inactivity alert: if no data arrives from the bridge for 10 minutes, something is wrong with your local infrastructure. This uses the cloud layer as a health check for the local stack.

Simple cron-based health check
# Add to crontab on a separate machine
# Checks every 5 minutes, sends email on failure
*/5 * * * * curl -sf http://greenhouse-host:5000/api/health > /dev/null || \
echo "Greenhouse API is down" | mail -s "ALERT: Greenhouse Down" [email protected]

Certificate Renewal

TLS certificates expire. Set a calendar reminder or a cron job to renew them before expiration. When you renew the broker certificate, restart Mosquitto and verify that all clients reconnect successfully.

Check certificate expiration
openssl x509 -in certs/server.crt -noout -enddate
# Output: notAfter=Mar 10 00:00:00 2027 GMT
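The openssl output can also be checked programmatically. The Python standard library's ssl.cert_time_to_seconds parses exactly this notAfter date format; the 30-day warning threshold below is an arbitrary choice:

```python
import ssl
import time

def days_until_expiry(not_after, now=None):
    """Days remaining given a notAfter date string such as
    'Mar 10 00:00:00 2027 GMT' (as printed by openssl x509 -enddate)."""
    expiry = ssl.cert_time_to_seconds(not_after)
    current = now if now is not None else time.time()
    return (expiry - current) / 86400

# Warn when fewer than 30 days remain (threshold is arbitrary):
remaining = days_until_expiry("Mar 10 00:00:00 2027 GMT")
if remaining < 30:
    print(f"Renew the broker certificate: {remaining:.0f} days left")
```

Run from cron, a script like this can email or Slack you well before the certificate actually expires.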

Scaling Considerations



The system as described handles three sensor nodes comfortably. What changes at larger scales?

100 Nodes

At 100 nodes publishing every 30 seconds, the broker handles roughly 200 messages per minute. Mosquitto on a Raspberry Pi 4 can sustain this without difficulty. The main concern is InfluxDB write throughput and storage.

Changes needed:

  • Tune InfluxDB’s write path (for example, raise the storage cache limit with influxd’s storage-cache-max-memory-size option).
  • Use Telegraf’s metric_batch_size and flush_interval agent settings to batch writes instead of writing every message individually.
  • Add a wildcard ACL pattern rule (pattern write greenhouse/%u/#, where Mosquitto substitutes the authenticated username for %u) so you do not need 100 individual ACL entries.
  • Automate device provisioning (Lesson 7) so you are not manually creating credentials for each node.
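In the mosquitto acl file, one pattern rule covers every device. This sketch assumes each device's username matches its node id (as in this capstone's greenhouse/<node>/<metric> scheme); Mosquitto substitutes %u with the authenticated username and %c with the client id:

```
# mosquitto acl file -- one rule instead of one entry per device
# Each device may publish only under its own subtree...
pattern write greenhouse/%u/#
# ...and may subscribe only to its own command topic.
pattern read greenhouse/cmd/%u
```

Pattern rules apply to all authenticated clients, so adding node 101 requires only a new password entry, not a new ACL line.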

1,000 Nodes

At 1,000 nodes, the single Mosquitto instance may become a bottleneck. MQTT bridging between multiple brokers or switching to a clustered broker (EMQX, HiveMQ) becomes necessary.

Changes needed:

  • Deploy EMQX or HiveMQ as a clustered MQTT broker (3 nodes minimum for high availability).
  • Move InfluxDB to dedicated hardware or use InfluxDB Cloud.
  • Place Telegraf behind a load balancer or run multiple Telegraf instances, each subscribing to a subset of topics.
  • Add a message queue (Kafka, NATS) between the broker and InfluxDB for buffering during write spikes.
  • Implement device fleet management: firmware versioning, staged rollouts, health dashboards per device.
  • Grafana dashboards need variables and template queries instead of hardcoded device names.
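For instance, a panel query parameterized with a Grafana dashboard variable (here `node`, a hypothetical variable name) replaces the per-device hardcoded queries shown earlier; Grafana interpolates `${node}` before sending the Flux to InfluxDB:

```
from(bucket: "sensor_data")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r.node == "${node}" and r.metric == "temperature")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
```

One templated panel then serves the whole fleet via a dropdown instead of a thousand hand-edited panels.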

Cost Considerations at Scale

| Component | 3 Nodes | 100 Nodes | 1,000 Nodes |
| --- | --- | --- | --- |
| Broker | Raspberry Pi (free) | Single VM ($10/mo) | 3-node cluster ($90/mo) |
| InfluxDB | Same Pi | Dedicated VM ($20/mo) | InfluxDB Cloud ($100+/mo) |
| Grafana | Same Pi | Same VM | Grafana Cloud ($50/mo) |
| SiliconWit.io | Free tier | Growth plan | Enterprise plan |
| Total infra | ~$0/mo | ~$30/mo | ~$240+/mo |

These are rough estimates. The point is that the architecture you built in this capstone scales without a fundamental redesign. The same MQTT topic hierarchy, the same Telegraf pipeline, the same Grafana queries all work at higher scale with configuration changes rather than a rewrite.

What You Have Built



This capstone ties together every lesson in the IoT Systems course and draws on skills from the embedded programming series.

| Skill | Where You Learned It | How You Used It Here |
| --- | --- | --- |
| Protocol selection (MQTT vs HTTP vs CoAP) | Lesson 1 | Chose MQTT for all sensor-to-broker communication |
| Broker setup, TLS, ACLs, bridging | Lesson 2 | Configured Mosquitto with TLS, per-device ACLs, SiliconWit.io bridge |
| Multi-MCU MQTT clients | Lesson 3 | Wrote firmware for ESP32, Pico W, and STM32 nodes |
| Time-series storage and dashboards | Lesson 4 | Deployed Telegraf, InfluxDB, and Grafana with production queries |
| REST API and device integration | Lesson 5 | Built a Flask API for data access and command dispatch |
| Alerts and automation | Lesson 6 | Configured Node-RED with threshold and inactivity alerts via Slack |
| Device security and provisioning | Lesson 7 | Applied TLS on all connections, per-device credentials, ACLs |
| ESP32 peripherals (I2C, ADC) | ESP32 Course | Read BME280 and soil moisture sensor |
| Pico W GPIO and ADC | RPi Pico Course | Read LDR and PIR motion sensor |
| STM32 UART and AT commands | STM32 Course | Controlled ESP-01 for MQTT connectivity |
| RTOS task management | RTOS Course | FreeRTOS tasks for sensor reading and MQTT publishing |
| Containerized deployment | Embedded Linux Course | Docker Compose for the full backend stack |

You now have a complete, production-grade IoT monitoring system. It handles node failures, broker restarts, and network outages. It provides local dashboards for real-time visibility and cloud forwarding for remote access. It alerts you when something goes wrong and stores historical data for analysis.

This is not a demo. This is the pattern that real IoT deployments follow, whether the system monitors a greenhouse, a factory floor, a building’s HVAC, or a fleet of agricultural robots. The scale changes, the specific sensors change, but the architecture remains the same: nodes publish, brokers route, databases store, dashboards visualize, alerts notify, and the cloud provides reach.

Exercises



Exercise 1: Add OTA Firmware Updates

Add over-the-air firmware update capability to the ESP32 node. The node should check a firmware HTTP server on each boot (or at a configurable interval) for a newer firmware version. If a new version is available, it should download, verify the SHA-256 hash, and apply the update. Use the ESP-IDF OTA library from the ESP32 course (Lesson 7: OTA Updates and Secure Boot). Test by deploying a firmware update that changes the publish interval from 30 seconds to 15 seconds, and verify the change takes effect after the OTA completes.

Exercise 2: Add an Edge Gateway with Embedded Linux

Deploy a Raspberry Pi running a custom Yocto or Buildroot image (from Embedded Linux Lesson 9) as an edge gateway between the sensor nodes and the cloud. The edge gateway should run a local Mosquitto instance that the sensor nodes publish to over the LAN, perform local data aggregation (compute 5-minute averages), forward aggregated data to SiliconWit.io (reducing cloud bandwidth), cache raw data locally in SQLite for 7 days, and continue operating even when the internet is down. This exercise combines the Embedded Linux course’s custom image building with the IoT Systems course’s broker bridging.

Exercise 3: Multi-Channel Alert Escalation

Extend the alert system with escalation tiers. A warning alert (humidity low, soil dry) should send a Slack message. If the condition persists for 15 minutes without acknowledgment, escalate to an SMS via Twilio. If it persists for 30 minutes, escalate to a phone call. Implement the escalation logic in Node-RED using flow context to track alert state and timers. Add an acknowledgment endpoint to the Flask API so that a user can acknowledge an alert and stop escalation.

Exercise 4: Grafana Provisioning and Infrastructure as Code

Replace the manual Grafana dashboard setup with fully automated provisioning. Create a Grafana provisioning directory with a datasource YAML that configures InfluxDB automatically, and a dashboard JSON that defines all panels programmatically. When docker compose up runs for the first time, Grafana should start with the complete dashboard already configured, no manual steps required. Export your current dashboard as JSON, parameterize it with Grafana variables for the node names, and test by tearing down the stack (docker compose down -v) and bringing it back up.



© 2021-2026 SiliconWit®. All rights reserved.