Skip to content

Alerts, Automation, and Rule Engines

Alerts, Automation, and Rule Engines hero image
Modified:
Published:

A dashboard full of charts is useless if nobody is watching it at 3 AM when a freezer compressor fails. Alerts turn passive data into active notifications that reach the right person on the right channel at the right time. In this lesson you will build a Python alert engine that evaluates rules against live MQTT data, wire up five different notification channels (email, SMS, Discord, Slack, Telegram), create visual automation flows with Node-RED, and compare all of these with the managed alert system on SiliconWit.io. #Alerts #Automation #NodeRED

Why Alerting Matters

Data without alerts is just noise. Consider a greenhouse with temperature and humidity sensors publishing to your MQTT broker every 10 seconds. The Grafana dashboard from Lesson 4 shows beautiful charts, but those charts only help if someone is actively looking at them. The moment you walk away from the screen, your monitoring system is blind.

Node-RED Alert Flow
──────────────────────────────────────────
┌──────────┐ ┌──────────┐ ┌─────────┐
│ MQTT In │──►│ Function │──►│ Switch │
│ (sensor/ │ │ (parse │ │ (temp │
│ temp) │ │ JSON) │ │ > 35?) │
└──────────┘ └──────────┘ └────┬────┘
┌──────────┼──────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌───────┐
│ Slack │ │ Email │ │ MQTT │
│ notify │ │ alert │ │ Out │
└────────┘ └────────┘ │(actua-│
│ tor) │
└───────┘

An alert system closes that gap. It continuously evaluates incoming data against a set of rules and takes action when a condition is met: send a message, trigger an actuator, log an incident, or escalate to a second person if the first does not acknowledge within a time window.

What a good alerting system provides:

CapabilityWhy It Matters
Real-time evaluationCatches problems within seconds, not hours
Multiple notification channelsReaches people wherever they are (phone, email, chat)
Cooldown periodsPrevents alert spam during sustained events
EscalationNotifies a backup person if the primary does not respond
Active hoursSuppresses non-critical alerts during off-hours
Audit trailLogs every alert for post-incident review
Alert Evaluation Flow
──────────────────────────────────────────
MQTT message arrives
Parse JSON payload (temp, humidity, ts)
┌──────────────┐
│ temp > 35C? │──YES──► Fire threshold alert
└──────┬───────┘ │
│ NO ├──► Slack message
▼ ├──► Email
┌──────────────┐ └──► Log to DB
│ In cooldown? │
└──────┬───────┘
│ NO
┌──────────────┐
│ Last msg │
│ > 5 min ago? │──YES──► Fire status alert
└──────────────┘ "Device offline"

Alert Types



Not every alert is a simple threshold comparison. A robust IoT system needs several alert types to cover different failure modes.

Threshold Alerts

The most common type. Fire when a value crosses a boundary. Examples: temperature exceeds 35 C, battery voltage drops below 3.2 V, humidity exceeds 80%. Can be upper, lower, or both (band alert).

Status Alerts

Fire when a device changes state. The most important one: “device offline.” If your broker has not received a message from a device in 5 minutes, something is wrong. Also covers: device reconnected, firmware version mismatch, unexpected reboot detected via uptime counter reset.

Rate-of-Change Alerts

Fire when a value changes too quickly. A pressure sensor dropping 10 kPa in 30 seconds may indicate a pipe burst even though the absolute value is still within range. Rate alerts catch fast-moving failures that threshold alerts miss.

Inactivity Alerts

Fire when expected data stops arriving. Different from “device offline” because the device might still be connected but a specific sensor stopped publishing. Example: a soil moisture sensor that publishes every 60 seconds has not sent data in 10 minutes.

Building a Python Alert Engine



We will build an alert engine that subscribes to MQTT topics, evaluates rules defined in a JSON configuration file, tracks sustained conditions, enforces cooldown periods, and dispatches notifications through multiple channels.

Architecture Overview

┌─────────────┐ ┌──────────────┐ ┌───────────────┐
│ MQTT Broker │────▶│ Alert Engine │────▶│ Notifications │
│ (Mosquitto) │ │ (Python) │ │ Email/SMS/ │
└─────────────┘ │ │ │ Discord/Slack/│
│ Rules JSON │ │ Telegram │
└──────────────┘ └───────────────┘

Rule Configuration Format

Each rule is a JSON object that specifies what to watch, what condition triggers the alert, how long the condition must persist, the cooldown period, and what actions to take.

rules.json
{
"rules": [
{
"id": "high-temp-greenhouse",
"name": "Greenhouse High Temperature",
"topic": "sensors/greenhouse/temperature",
"field": "temperature",
"operator": ">",
"threshold": 35.0,
"sustained_seconds": 30,
"cooldown_seconds": 300,
"severity": "critical",
"actions": ["email", "slack", "telegram"]
},
{
"id": "low-battery",
"name": "Low Battery Voltage",
"topic": "sensors/+/battery",
"field": "voltage",
"operator": "<",
"threshold": 3.2,
"sustained_seconds": 60,
"cooldown_seconds": 600,
"severity": "warning",
"actions": ["email"]
},
{
"id": "pressure-drop",
"name": "Rapid Pressure Drop",
"topic": "sensors/compressor/pressure",
"field": "pressure",
"operator": "rate_below",
"threshold": -10.0,
"rate_window_seconds": 30,
"sustained_seconds": 0,
"cooldown_seconds": 120,
"severity": "critical",
"actions": ["email", "sms", "discord"]
},
{
"id": "device-inactive",
"name": "Device Inactivity",
"topic": "sensors/greenhouse/#",
"field": null,
"operator": "inactivity",
"threshold": 600,
"sustained_seconds": 0,
"cooldown_seconds": 1800,
"severity": "warning",
"actions": ["email", "slack"]
}
]
}

Rule fields explained:

FieldDescription
idUnique identifier for the rule
topicMQTT topic to subscribe to (supports wildcards)
fieldJSON key to extract from the payload (null for inactivity)
operatorComparison: >, <, >=, <=, ==, !=, rate_below, rate_above, inactivity
thresholdThe value to compare against
sustained_secondsHow long the condition must persist before firing (0 = instant)
cooldown_secondsMinimum time between repeated alerts for the same rule
severityinfo, warning, critical
actionsList of notification channels to use

The Alert Engine

alert_engine.py
import json
import time
import threading
from collections import defaultdict
from datetime import datetime, timezone
import paho.mqtt.client as mqtt
# --- Configuration ---
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_USERNAME = "alert-engine"
MQTT_PASSWORD = "secure-password"
RULES_FILE = "rules.json"
# --- Load rules ---
def load_rules(path):
with open(path, "r") as f:
data = json.load(f)
return data["rules"]
# --- State tracking ---
class AlertState:
"""Tracks the state of a single alert rule."""
def __init__(self, rule):
self.rule = rule
self.condition_start = None # When the condition first became true
self.last_alert_time = 0 # Timestamp of the last alert sent
self.last_value = None
self.last_message_time = 0 # For inactivity tracking
self.value_history = [] # For rate-of-change tracking
def update_value(self, value, timestamp):
"""Record a new value and its timestamp."""
self.last_value = value
self.last_message_time = timestamp
# Keep a rolling window for rate-of-change calculations
window = self.rule.get("rate_window_seconds", 60)
self.value_history.append((timestamp, value))
cutoff = timestamp - window
self.value_history = [
(t, v) for t, v in self.value_history if t >= cutoff
]
def compute_rate(self):
"""Compute the rate of change over the configured window."""
if len(self.value_history) < 2:
return 0.0
oldest_t, oldest_v = self.value_history[0]
newest_t, newest_v = self.value_history[-1]
dt = newest_t - oldest_t
if dt == 0:
return 0.0
return (newest_v - oldest_v) / dt
def check_condition(self, now):
"""
Evaluate the rule condition. Returns True if the alert
should fire, False otherwise.
"""
rule = self.rule
op = rule["operator"]
# --- Inactivity check ---
if op == "inactivity":
if self.last_message_time == 0:
return False # Never received a message yet
silence = now - self.last_message_time
return silence >= rule["threshold"]
# --- Rate-of-change checks ---
if op in ("rate_below", "rate_above"):
rate = self.compute_rate()
if op == "rate_below":
triggered = rate < rule["threshold"]
else:
triggered = rate > rule["threshold"]
else:
# --- Standard threshold checks ---
if self.last_value is None:
return False
v = self.last_value
t = rule["threshold"]
if op == ">":
triggered = v > t
elif op == "<":
triggered = v < t
elif op == ">=":
triggered = v >= t
elif op == "<=":
triggered = v <= t
elif op == "==":
triggered = v == t
elif op == "!=":
triggered = v != t
else:
triggered = False
# --- Sustained condition check ---
sustained = rule.get("sustained_seconds", 0)
if triggered:
if self.condition_start is None:
self.condition_start = now
elapsed = now - self.condition_start
if elapsed >= sustained:
return True
return False
else:
# Condition cleared, reset the timer
self.condition_start = None
return False
def should_fire(self, now):
"""
Returns True if the condition is met AND the cooldown
period has elapsed since the last alert.
"""
if not self.check_condition(now):
return False
cooldown = self.rule.get("cooldown_seconds", 0)
if (now - self.last_alert_time) < cooldown:
return False
return True
def mark_fired(self, now):
"""Record that an alert was sent."""
self.last_alert_time = now
self.condition_start = None # Reset sustained timer

Processing MQTT Messages

alert_engine.py (continued)
class AlertEngine:
"""Main engine: subscribes to MQTT, evaluates rules, dispatches alerts."""
def __init__(self, rules_file):
self.rules = load_rules(rules_file)
self.states = {}
self.notifiers = {}
self.lock = threading.Lock()
# Create state objects for each rule
for rule in self.rules:
self.states[rule["id"]] = AlertState(rule)
# Set up MQTT client
self.client = mqtt.Client(
mqtt.CallbackAPIVersion.VERSION2,
client_id="alert-engine",
protocol=mqtt.MQTTv5
)
self.client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
def register_notifier(self, name, notifier_func):
"""Register a notification channel by name."""
self.notifiers[name] = notifier_func
def _on_connect(self, client, userdata, flags, reason_code, properties):
print(f"[{datetime.now().isoformat()}] Connected to broker (rc={reason_code})")
# Subscribe to all topics referenced by rules
topics = set(rule["topic"] for rule in self.rules)
for topic in topics:
client.subscribe(topic, qos=1)
print(f" Subscribed to: {topic}")
def _on_message(self, client, userdata, msg):
now = time.time()
topic = msg.topic
# Parse JSON payload
try:
payload = json.loads(msg.payload.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
return
with self.lock:
for rule_id, state in self.states.items():
rule = state.rule
# Check if this message matches the rule topic
if not self._topic_matches(rule["topic"], topic):
continue
# Extract the field value
field = rule.get("field")
if field and field in payload:
value = float(payload[field])
state.update_value(value, now)
elif rule["operator"] == "inactivity":
# Any message on this topic resets inactivity
state.last_message_time = now
else:
continue
# Check if the alert should fire
if state.should_fire(now):
self._dispatch(state, now)
state.mark_fired(now)
def _topic_matches(self, pattern, topic):
"""Simple MQTT topic matching with + and # wildcards."""
pattern_parts = pattern.split("/")
topic_parts = topic.split("/")
for i, p in enumerate(pattern_parts):
if p == "#":
return True
if i >= len(topic_parts):
return False
if p == "+":
continue
if p != topic_parts[i]:
return False
return len(pattern_parts) == len(topic_parts)
def _dispatch(self, state, now):
"""Send notifications through all channels configured for this rule."""
rule = state.rule
alert_data = {
"rule_id": rule["id"],
"rule_name": rule["name"],
"severity": rule["severity"],
"value": state.last_value,
"threshold": rule["threshold"],
"operator": rule["operator"],
"timestamp": datetime.fromtimestamp(
now, tz=timezone.utc
).isoformat(),
}
message = (
f"[{alert_data['severity'].upper()}] "
f"{alert_data['rule_name']}: "
f"value={alert_data['value']}, "
f"threshold={alert_data['operator']}{alert_data['threshold']}"
)
print(f"[{alert_data['timestamp']}] ALERT: {message}")
for action in rule.get("actions", []):
notifier = self.notifiers.get(action)
if notifier:
try:
notifier(alert_data, message)
except Exception as e:
print(f" Notifier '{action}' failed: {e}")
def start(self):
"""Connect to broker and start processing."""
self.client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
# Start inactivity checker in a background thread
inactivity_thread = threading.Thread(
target=self._inactivity_loop, daemon=True
)
inactivity_thread.start()
# Blocking loop
print("Alert engine running. Press Ctrl+C to stop.")
self.client.loop_forever()
def _inactivity_loop(self):
"""Periodically check inactivity rules."""
while True:
time.sleep(30)
now = time.time()
with self.lock:
for rule_id, state in self.states.items():
if state.rule["operator"] != "inactivity":
continue
if state.should_fire(now):
self._dispatch(state, now)
state.mark_fired(now)

Notification Channels



Now we implement the functions that actually deliver notifications. Each function takes the same two arguments: alert_data (a dict with all alert details) and message (a pre-formatted string).

Email via SMTP

notifiers/email_notifier.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# Configuration (use environment variables in production)
SMTP_HOST = "smtp.gmail.com"
SMTP_PORT = 587
SMTP_USER = "[email protected]"
SMTP_PASSWORD = "app-specific-password"
RECIPIENT = "[email protected]"
def send_email(alert_data, message):
"""Send an alert email via SMTP."""
severity = alert_data["severity"].upper()
subject = f"[IoT {severity}] {alert_data['rule_name']}"
# Build HTML body
html = f"""
<html>
<body>
<h2 style="color: {'red' if severity == 'CRITICAL' else 'orange'};">
{severity}: {alert_data['rule_name']}
</h2>
<table>
<tr><td><b>Rule ID:</b></td><td>{alert_data['rule_id']}</td></tr>
<tr><td><b>Value:</b></td><td>{alert_data['value']}</td></tr>
<tr><td><b>Threshold:</b></td>
<td>{alert_data['operator']}{alert_data['threshold']}</td></tr>
<tr><td><b>Time:</b></td><td>{alert_data['timestamp']}</td></tr>
</table>
</body>
</html>
"""
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = SMTP_USER
msg["To"] = RECIPIENT
msg.attach(MIMEText(message, "plain"))
msg.attach(MIMEText(html, "html"))
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(SMTP_USER, RECIPIENT, msg.as_string())
print(f" Email sent to {RECIPIENT}")

SMS via Twilio

notifiers/sms_notifier.py
import requests
# Twilio credentials (use environment variables in production)
TWILIO_ACCOUNT_SID = "ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
TWILIO_AUTH_TOKEN = "your_auth_token"
TWILIO_FROM_NUMBER = "+1234567890"
RECIPIENT_NUMBER = "+0987654321"
def send_sms(alert_data, message):
"""Send an SMS alert via the Twilio API."""
url = (
f"https://api.twilio.com/2010-04-01/"
f"Accounts/{TWILIO_ACCOUNT_SID}/Messages.json"
)
body = (
f"IoT Alert [{alert_data['severity'].upper()}]\n"
f"{alert_data['rule_name']}\n"
f"Value: {alert_data['value']}\n"
f"Threshold: {alert_data['operator']}{alert_data['threshold']}"
)
resp = requests.post(
url,
data={
"From": TWILIO_FROM_NUMBER,
"To": RECIPIENT_NUMBER,
"Body": body,
},
auth=(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN),
)
if resp.status_code == 201:
print(f" SMS sent to {RECIPIENT_NUMBER}")
else:
print(f" SMS failed: {resp.status_code} {resp.text}")

Discord Webhook

notifiers/discord_notifier.py
import requests
# Create a webhook in Discord: Server Settings -> Integrations -> Webhooks
DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/YOUR_WEBHOOK_ID/YOUR_WEBHOOK_TOKEN"
def send_discord(alert_data, message):
"""Send an alert to a Discord channel via webhook."""
severity = alert_data["severity"].upper()
# Color codes: red for critical, orange for warning, blue for info
color_map = {"CRITICAL": 0xFF0000, "WARNING": 0xFF8C00, "INFO": 0x0099FF}
color = color_map.get(severity, 0x808080)
payload = {
"embeds": [
{
"title": f"{severity}: {alert_data['rule_name']}",
"color": color,
"fields": [
{"name": "Value", "value": str(alert_data["value"]), "inline": True},
{"name": "Threshold", "value": f"{alert_data['operator']}{alert_data['threshold']}", "inline": True},
{"name": "Rule ID", "value": alert_data["rule_id"], "inline": True},
{"name": "Time", "value": alert_data["timestamp"], "inline": False},
],
}
]
}
resp = requests.post(DISCORD_WEBHOOK_URL, json=payload)
if resp.status_code == 204:
print(" Discord notification sent")
else:
print(f" Discord failed: {resp.status_code} {resp.text}")

Slack Webhook

notifiers/slack_notifier.py
import os
import requests
# Create an Incoming Webhook in your Slack workspace:
# https://api.slack.com/messaging/webhooks
SLACK_WEBHOOK_URL = os.getenv("SLACK_WEBHOOK_URL", "")
def send_slack(alert_data, message):
"""Send an alert to a Slack channel via Incoming Webhook."""
severity = alert_data["severity"].upper()
# Slack Block Kit message
payload = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"{'🔴' if severity == 'CRITICAL' else '🟡'} IoT Alert: {alert_data['rule_name']}",
},
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": f"*Severity:*\n{severity}"},
{"type": "mrkdwn", "text": f"*Value:*\n{alert_data['value']}"},
{"type": "mrkdwn", "text": f"*Threshold:*\n{alert_data['operator']}{alert_data['threshold']}"},
{"type": "mrkdwn", "text": f"*Time:*\n{alert_data['timestamp']}"},
],
},
{"type": "divider"},
{
"type": "context",
"elements": [
{"type": "mrkdwn", "text": f"Rule ID: `{alert_data['rule_id']}`"},
],
},
]
}
resp = requests.post(SLACK_WEBHOOK_URL, json=payload)
if resp.status_code == 200:
print(" Slack notification sent")
else:
print(f" Slack failed: {resp.status_code} {resp.text}")

Telegram Bot

notifiers/telegram_notifier.py
import requests
# 1. Message @BotFather on Telegram to create a bot and get a token
# 2. Send a message to your bot, then fetch the chat_id from:
# https://api.telegram.org/bot<TOKEN>/getUpdates
TELEGRAM_BOT_TOKEN = "1234567890:ABCdefGHIjklMNOpqrsTUVwxyz"
TELEGRAM_CHAT_ID = "987654321"
def send_telegram(alert_data, message):
"""Send an alert via Telegram Bot API."""
severity = alert_data["severity"].upper()
text = (
f"<b>{'🔴' if severity == 'CRITICAL' else '🟡'} IoT Alert</b>\n\n"
f"<b>Rule:</b> {alert_data['rule_name']}\n"
f"<b>Severity:</b> {severity}\n"
f"<b>Value:</b> {alert_data['value']}\n"
f"<b>Threshold:</b> {alert_data['operator']}{alert_data['threshold']}\n"
f"<b>Time:</b> {alert_data['timestamp']}\n"
f"<b>Rule ID:</b> <code>{alert_data['rule_id']}</code>"
)
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
resp = requests.post(
url,
json={
"chat_id": TELEGRAM_CHAT_ID,
"text": text,
"parse_mode": "HTML",
},
)
if resp.status_code == 200:
print(" Telegram notification sent")
else:
print(f" Telegram failed: {resp.status_code} {resp.text}")

Wiring It All Together

main.py
from alert_engine import AlertEngine
from notifiers.email_notifier import send_email
from notifiers.sms_notifier import send_sms
from notifiers.discord_notifier import send_discord
from notifiers.slack_notifier import send_slack
from notifiers.telegram_notifier import send_telegram
engine = AlertEngine("rules.json")
# Register all notification channels
engine.register_notifier("email", send_email)
engine.register_notifier("sms", send_sms)
engine.register_notifier("discord", send_discord)
engine.register_notifier("slack", send_slack)
engine.register_notifier("telegram", send_telegram)
# Start the engine (blocks forever)
engine.start()

Project File Structure

  • Directoryalert-system/
    • main.py
    • alert_engine.py
    • rules.json
    • Directorynotifiers/
      • __init__.py
      • email_notifier.py
      • sms_notifier.py
      • discord_notifier.py
      • slack_notifier.py
      • telegram_notifier.py
    • requirements.txt
requirements.txt
paho-mqtt>=2.0.0
requests>=2.31.0

Testing the Alert Engine

You do not need real sensors to test. Open a second terminal and publish test messages with mosquitto_pub:

Test: trigger a high temperature alert
# Publish a temperature above the 35 C threshold
mosquitto_pub -h localhost -u test -P testpass \
-t "sensors/greenhouse/temperature" \
-m '{"temperature": 37.5, "humidity": 65.2}'
Test: sustained condition (run in a loop)
# Publish every 5 seconds for 60 seconds to trigger sustained alerts
for i in $(seq 1 12); do
mosquitto_pub -h localhost -u test -P testpass \
-t "sensors/greenhouse/temperature" \
-m "{\"temperature\": 38.$(($RANDOM % 10)), \"humidity\": 70.1}"
sleep 5
done

You should see the alert engine print the alert and dispatch notifications after the sustained period elapses.

SiliconWit.io Alert System



If you do not want to run your own alert engine, SiliconWit.io provides a fully managed alert system. It supports all the alert types we discussed (threshold, status, geofence, inactivity) and delivers notifications via email, SMS, and webhooks. It also includes escalation chains, cooldown periods, and active-hours scheduling out of the box.

Setting Up a Threshold Alert on SiliconWit.io

  1. Log in to your SiliconWit.io dashboard and navigate to the device that is publishing sensor data.

  2. Go to Alerts. Click the Alerts tab in the left sidebar, then click Create Alert.

  3. Configure the trigger. Select the data stream (e.g., temperature), set the operator to “greater than,” and set the threshold to 35. Set the sustained duration to 30 seconds.

  4. Set notification channels. Check the boxes for email and/or webhook. For webhooks, paste the URL of your Slack, Discord, or custom endpoint. SiliconWit.io will POST a JSON payload whenever the alert fires.

  5. Configure cooldown and active hours. Set cooldown to 5 minutes so you do not get repeated notifications. Optionally restrict the alert to business hours if it is non-critical.

  6. Save and test. Click Save. Publish a test value above the threshold from your device or with mosquitto_pub. You should receive a notification within seconds.

When to use SiliconWit.io alerts vs. a custom engine:

ScenarioRecommended Approach
Quick prototype, few devicesSiliconWit.io (zero setup)
Complex custom logic, many rulesCustom Python engine
Visual automation with hardware actionsNode-RED
Alerts tied to dashboard panelsGrafana alerting

Node-RED: Visual Flow-Based Automation



Node-RED is an open-source, browser-based flow editor built on Node.js. You build automation logic by wiring together “nodes” on a canvas. Each node performs one task: receive an MQTT message, evaluate a condition, format a payload, send an HTTP request. The visual approach makes it fast to prototype and easy for non-programmers to understand.

Installing Node-RED

Install Node-RED via npm
# Requires Node.js 18 or later
sudo npm install -g --unsafe-perm node-red
# Start Node-RED
node-red

Node-RED will be available at http://localhost:1880.

Core Nodes for IoT Automation

Before building a flow, here are the nodes you will use most often.

NodeCategoryPurpose
mqtt inNetworkSubscribe to an MQTT topic and receive messages
mqtt outNetworkPublish a message to an MQTT topic
functionFunctionWrite JavaScript to transform or evaluate data
switchFunctionRoute messages based on conditions (like an if/else)
changeFunctionSet, change, or delete message properties
http requestNetworkMake HTTP GET/POST requests (webhooks, APIs)
emailSocialSend email via SMTP
debugOutputPrint messages to the debug sidebar
injectInputManually trigger a flow for testing
delayFunctionThrottle, rate-limit, or delay messages

Building a Complete Automation Flow

We will build a flow that does the following:

  1. Subscribes to sensors/greenhouse/temperature
  2. Parses the JSON payload
  3. Checks if temperature exceeds 35 C
  4. If exceeded: sends a Slack notification AND publishes an MQTT command to turn on a ventilation fan
  5. If normal: publishes an MQTT command to turn the fan off

Step 1: MQTT Input Node

Drag an mqtt in node onto the canvas and configure it:

PropertyValue
Serverlocalhost:1883 (add credentials in the server config)
Topicsensors/greenhouse/temperature
QoS1
Outputparsed JSON object

Step 2: Function Node (Parse and Evaluate)

Connect a function node after the mqtt-in node. This node extracts the temperature and adds a flag indicating whether the threshold is exceeded.

Function node: Evaluate Threshold
// Extract temperature from the incoming MQTT payload
var temp = msg.payload.temperature;
if (temp === undefined || temp === null) {
node.warn("No temperature field in payload");
return null; // Drop the message
}
// Attach the raw value and evaluation result
msg.temperature = temp;
msg.threshold = 35;
msg.exceeded = (temp > 35);
return msg;

Step 3: Switch Node (Route by Condition)

Connect a switch node after the function node. Configure it to route on the property msg.exceeded:

OutputCondition
Output 1msg.exceeded is true
Output 2msg.exceeded is false

This gives you two output wires: one for the “alert” path and one for the “normal” path.

Step 4: Alert Path (Slack + Fan ON)

From Output 1 of the switch node, wire two parallel paths:

Path A: Slack Notification

Add a function node to format the Slack message:

Function node: Format Slack Payload
msg.payload = {
"text": "🔴 *Greenhouse Alert*: Temperature is " +
msg.temperature + "°C (threshold: 35°C). " +
"Fan has been activated."
};
msg.headers = { "Content-Type": "application/json" };
msg.url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL";
return msg;

Connect this to an http request node set to POST.

Path B: Fan ON Command

Add a change node that sets msg.payload to {"command": "fan_on", "speed": 100}, then connect it to an mqtt out node publishing to actuators/greenhouse/fan.

Step 5: Normal Path (Fan OFF)

From Output 2 of the switch node, add a change node that sets msg.payload to {"command": "fan_off"}, then connect it to another mqtt out node publishing to actuators/greenhouse/fan.

Step 6: Debug Nodes

Add debug nodes to the output of each path so you can see messages flowing through the system in the Node-RED debug sidebar.

Exporting the Flow

Once your flow is working, click the menu (three horizontal lines in the top right), then Export to get the JSON representation. Here is the complete flow:

node-red-flow.json
[
{
"id": "mqtt_in_1",
"type": "mqtt in",
"name": "Greenhouse Temp",
"topic": "sensors/greenhouse/temperature",
"qos": "1",
"datatype": "json",
"broker": "broker_config",
"wires": [["func_eval"]]
},
{
"id": "func_eval",
"type": "function",
"name": "Evaluate Threshold",
"func": "var temp = msg.payload.temperature;\nif (temp === undefined) return null;\nmsg.temperature = temp;\nmsg.threshold = 35;\nmsg.exceeded = (temp > 35);\nreturn msg;",
"wires": [["switch_1"]]
},
{
"id": "switch_1",
"type": "switch",
"name": "Temp > 35?",
"property": "exceeded",
"rules": [
{"t": "true"},
{"t": "false"}
],
"wires": [
["func_slack", "change_fan_on"],
["change_fan_off"]
]
},
{
"id": "func_slack",
"type": "function",
"name": "Format Slack",
"func": "msg.payload = {text: 'Greenhouse Alert: Temp=' + msg.temperature + 'C'};\nmsg.headers = {'Content-Type':'application/json'};\nreturn msg;",
"wires": [["http_slack"]]
},
{
"id": "http_slack",
"type": "http request",
"name": "Send Slack",
"method": "POST",
"url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
"wires": [["debug_slack"]]
},
{
"id": "change_fan_on",
"type": "change",
"name": "Fan ON",
"rules": [
{"t": "set", "p": "payload", "to": "{\"command\":\"fan_on\",\"speed\":100}", "tot": "json"}
],
"wires": [["mqtt_out_fan"]]
},
{
"id": "change_fan_off",
"type": "change",
"name": "Fan OFF",
"rules": [
{"t": "set", "p": "payload", "to": "{\"command\":\"fan_off\"}", "tot": "json"}
],
"wires": [["mqtt_out_fan"]]
},
{
"id": "mqtt_out_fan",
"type": "mqtt out",
"name": "Fan Command",
"topic": "actuators/greenhouse/fan",
"qos": "1",
"broker": "broker_config",
"wires": []
},
{
"id": "debug_slack",
"type": "debug",
"name": "Slack Response",
"wires": []
},
{
"id": "broker_config",
"type": "mqtt-broker",
"name": "Local Mosquitto",
"broker": "localhost",
"port": "1883",
"credentials": {
"user": "nodered",
"password": "nodered-pass"
}
}
]

To import this flow: open Node-RED, click the menu, Import, paste the JSON, and click Import. Update the broker credentials and Slack webhook URL to match your setup.

Automation Patterns



Simple threshold alerts cover the basics, but production systems need more sophisticated patterns to avoid false positives and handle edge cases.

Hysteresis (Preventing Flapping)

A sensor reading that hovers around the threshold will cause the alert to fire and clear repeatedly, sending a flood of notifications. Hysteresis solves this by using two thresholds: a high threshold to trigger the alert and a lower threshold to clear it.

hysteresis_example.py
class HysteresisAlert:
"""
Alert with separate trigger and reset thresholds.
Trigger at 35 C, clear at 33 C. This prevents flapping
when the temperature oscillates around 35 C.
"""
def __init__(self, trigger_threshold, reset_threshold):
self.trigger_threshold = trigger_threshold
self.reset_threshold = reset_threshold
self.is_active = False
def evaluate(self, value):
if not self.is_active and value > self.trigger_threshold:
self.is_active = True
return "TRIGGERED"
elif self.is_active and value < self.reset_threshold:
self.is_active = False
return "CLEARED"
return "NO_CHANGE"
# Example usage
alert = HysteresisAlert(trigger_threshold=35.0, reset_threshold=33.0)
readings = [33, 34, 35.1, 35.5, 34.8, 35.2, 33.5, 32.9, 33.1]
for r in readings:
result = alert.evaluate(r)
if result != "NO_CHANGE":
print(f" Temp={r}C -> {result}")

Output:

Temp=35.1C -> TRIGGERED
Temp=32.9C -> CLEARED

Without hysteresis, the values 35.1, 34.8, 35.2 would cause trigger, clear, trigger in rapid succession. With hysteresis, the alert triggers once at 35.1 and does not clear until the temperature drops below the reset threshold of 33 C.

Dead-Band

A dead-band is similar to hysteresis but symmetrical. It defines a “no action” zone around a setpoint. This is common in HVAC systems where you want the heater to turn on below 20 C and the cooler to turn on above 25 C, with a dead band between 20 and 25 where neither runs.

dead_band_example.py
class DeadBandController:
"""
Dead-band controller for HVAC-style automation.
Heater ON below low_threshold, cooler ON above high_threshold,
both OFF in the dead band between them.
"""
def __init__(self, low_threshold, high_threshold):
self.low = low_threshold
self.high = high_threshold
self.state = "idle" # "heating", "cooling", or "idle"
def evaluate(self, value):
if value < self.low:
new_state = "heating"
elif value > self.high:
new_state = "cooling"
else:
new_state = self.state # Stay in current state (dead band)
changed = (new_state != self.state)
self.state = new_state
return self.state, changed

Scheduled Actions

Some automations are time-based, not data-based. Node-RED’s inject node can be configured with a cron expression to trigger flows on a schedule. Common examples:

ScheduleUse Case
Every day at 06:00Turn on greenhouse irrigation
Every 15 minutesRequest a status report from all devices
Monday 09:00Generate and email a weekly sensor summary
Sunset (calculated)Turn on outdoor lights

In Python, use the schedule library or system cron jobs:

scheduled_action.py
import schedule
import time
import paho.mqtt.client as mqtt
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.connect("localhost", 1883)
def morning_irrigation():
"""Turn on the irrigation system every morning."""
client.publish(
"actuators/greenhouse/irrigation",
'{"command": "start", "duration_minutes": 30}',
qos=1
)
print("Irrigation started (30 min)")
def request_status():
"""Ask all devices to report their status."""
client.publish(
"commands/all/status",
'{"request": "report"}',
qos=1
)
# Schedule tasks
schedule.every().day.at("06:00").do(morning_irrigation)
schedule.every(15).minutes.do(request_status)
while True:
schedule.run_pending()
time.sleep(1)

If-Then Rules in Node-RED

For more complex if-then logic, Node-RED’s switch node supports multiple conditions chained together. You can also nest function nodes for compound conditions that are difficult to express with the visual switch node.

Function node: compound condition
// Only alert if temperature is high AND humidity is low
// (high temp + low humidity = fire risk in a warehouse)
var temp = msg.payload.temperature;
var humidity = msg.payload.humidity;
if (temp > 40 && humidity < 20) {
msg.alert = "FIRE_RISK";
msg.payload = {
text: "Fire risk: temp=" + temp + "C, humidity=" + humidity + "%"
};
return msg;
}
return null; // Drop message if condition not met

Grafana Alerting



If you already have Grafana dashboards from Lesson 4, you can add alert rules directly to your dashboard panels. This is convenient because the alert definition lives alongside the visualization.

Creating a Grafana Alert Rule

  1. Open the panel you want to alert on (e.g., a temperature time-series chart).

  2. Click the panel title and select Edit.

  3. Go to the Alert tab and click Create alert rule from this panel.

  4. Define the condition. Select the query (your InfluxDB query for temperature), set the reducer to “last” (or “avg” for smoothing), and set the threshold. For example: “WHEN last() OF query(A) IS ABOVE 35.”

  5. Set the evaluation interval. How often Grafana checks the condition. Every 10 seconds is reasonable for real-time monitoring.

  6. Set the “for” duration. This is equivalent to our “sustained_seconds” in the Python engine. A “for” duration of 30 seconds means the condition must be true for 30 consecutive evaluations before the alert fires.

  7. Configure contact points. Go to Alerting > Contact Points in the Grafana sidebar. Add a contact point for each notification channel (email, Slack webhook, Discord webhook, etc.). Then assign the contact point to your alert rule.

  8. Save the dashboard.

Grafana Alert Limitations

Grafana alerting is powerful but has constraints you should know about.

LimitationImpact
Tied to dashboard queriesCannot alert on raw MQTT; data must be in the database first
Evaluation interval minimum10 seconds (not truly real-time)
Limited logicSimple threshold and range conditions; complex compound rules require recording rules in InfluxDB or a separate engine
Notification templatesLess flexible than custom Python formatters
Stateful only within GrafanaIf Grafana restarts, pending alert states may reset

For simple “value above X” alerts, Grafana works well. For anything more complex, use the Python engine or Node-RED.

Comparing Approaches



Custom Python Engine

Best for: Complex rules, custom logic, large-scale deployments. Full control over every aspect: rule evaluation, notification formatting, state management, database logging. Requires you to write and maintain the code, handle restarts, and manage dependencies. Most flexible but most effort.

Node-RED

Best for: Visual prototyping, hardware automation, non-programmer teams. Drag-and-drop flow editor with hundreds of community nodes. Great for wiring MQTT inputs to HTTP outputs, toggling relays, and building dashboards. Less suited for complex stateful logic or very large rule sets.

SiliconWit.io Alerts

Best for: Quick setup, managed infrastructure, small to medium deployments. Zero infrastructure to maintain. Alerts, dashboards, and device management in one platform. Supports threshold, status, geofence, and inactivity alerts with email, SMS, and webhook notifications. Limited customization compared to a self-hosted engine.

Grafana Alerting

Best for: Teams already using Grafana dashboards, alert-on-chart workflows. Alerts are defined alongside visualizations. Good for threshold and range conditions on time-series data. Requires data to already be in a supported database (InfluxDB, Prometheus, etc.). Not designed for real-time MQTT-driven automation.

Decision matrix:

CriterionPython EngineNode-REDSiliconWit.ioGrafana
Setup timeSlowMediumFastMedium
FlexibilityHighMediumLowLow
Real-time MQTTYesYesYesNo
Hardware actuationManualBuilt-inVia webhooksNo
Visual editorNoYesYesPartial
Maintenance burdenHighMediumNoneMedium
ScalabilityHighMediumHighMedium
CostFreeFreeFree tier / paidFree

For this course, we recommend starting with SiliconWit.io for quick wins, then building the Python engine when you need custom logic, and using Node-RED when your automation involves physical actuators like fans, valves, or relays that respond to MQTT commands.

Exercises



Exercise 1: Multi-Channel Alert System

Build a complete alert system using the Python engine from this lesson.

Requirements:

  • Define at least 3 rules in rules.json: one threshold alert, one rate-of-change alert, and one inactivity alert
  • Implement at least 2 notification channels (pick from email, Discord, Slack, Telegram)
  • Test with simulated sensor data using mosquitto_pub
  • Add hysteresis to your threshold alert so it does not flap when the value hovers near the boundary

Deliverables:

  • Your rules.json file
  • Screenshot of notifications arriving on your chosen channels
  • Terminal output showing the alert engine detecting sustained conditions and cooldown enforcement

Exercise 2: Node-RED Automation with Actuator Control

Build a Node-RED flow that monitors two sensors and controls two actuators.

Requirements:

  • Subscribe to sensors/greenhouse/temperature and sensors/greenhouse/soil_moisture
  • If temperature exceeds 35 C, publish a command to actuators/greenhouse/fan to turn on the fan
  • If soil moisture drops below 30%, publish a command to actuators/greenhouse/irrigation to start watering for 10 minutes
  • Add a delay node to rate-limit irrigation commands (no more than once per hour)
  • Send a summary notification to Slack or Discord whenever any actuator state changes

Deliverables:

  • Export your Node-RED flow JSON
  • Screenshot of the flow in the Node-RED editor
  • Debug sidebar output showing messages flowing through both paths

Exercise 3: SiliconWit.io Alert Configuration

Set up alerts on the SiliconWit.io platform for your sensor data.

Requirements:

  • Create a threshold alert for temperature (above 35 C, sustained 30 seconds, cooldown 5 minutes)
  • Create an inactivity alert (no data for 10 minutes)
  • Configure at least one webhook notification that posts to a Discord or Slack channel
  • Test by publishing data above the threshold and by stopping your device to trigger inactivity

Deliverables:

  • Screenshots of the alert configuration on SiliconWit.io
  • Screenshot of the notification arriving on your webhook channel
  • Brief comparison: what was easier or harder compared to the Python engine?

Exercise 4: Grafana Alert with Escalation

Create a Grafana alert rule and simulate an escalation workflow.

Requirements:

  • Add an alert rule to a Grafana dashboard panel from Lesson 4
  • Configure two contact points: a Slack/Discord webhook for the initial alert and an email for escalation
  • Set the alert to fire after 30 seconds of sustained condition
  • Set a second alert rule on the same panel with a higher threshold (e.g., 40 C) that triggers the email escalation contact point
  • Document the difference between Grafana’s “for” duration and the Python engine’s sustained_seconds

Deliverables:

  • Screenshot of the Grafana alert rule configuration
  • Screenshot of both notifications (webhook and email) firing during the test
  • Written explanation of how Grafana’s alert state machine works (Pending, Firing, Resolved) compared to the Python engine’s state tracking

Next lesson: Device Security, TLS, and Provisioning. You will generate X.509 certificates, configure mutual TLS on your MQTT connections, and build a device provisioning workflow that assigns unique identities to each node in your IoT network.

Comments

Loading comments...


© 2021-2026 SiliconWit®. All rights reserved.