REST APIs, Webhooks, and Device Integration


MQTT excels at real-time streaming, but not every interaction fits the publish/subscribe model. Querying historical data, registering a new device, updating firmware configuration, or revoking an API key: these are all request/response operations where REST is the natural fit. In this lesson you will build a Flask REST API that manages devices and serves sensor data, then add a webhook receiver that processes push notifications with HMAC signature verification. By the end, your MQTT pipeline populates the database and your REST API reads from it, giving you both real-time and on-demand access to your IoT data. #REST #Webhooks #API

When to Use REST vs MQTT

Before writing any code, it is worth understanding when each protocol is the right choice. They are not competitors; they solve different problems and work best together.

Use MQTT When

  • Devices push sensor readings continuously (every few seconds or minutes)
  • You need low-latency, event-driven updates
  • Bandwidth and power are constrained (battery devices, cellular links)
  • Multiple consumers need the same data stream (fan-out via topics)
  • You need QoS guarantees for unreliable networks

Use REST When

  • A client requests data on demand (dashboard loads, report generation)
  • You need to query historical data with filters (time range, aggregation)
  • Device management operations: register, configure, decommission
  • Integration with third-party services that expect HTTP endpoints
  • You need standard authentication (API keys, OAuth, Bearer tokens)

REST + MQTT Unified Pipeline
──────────────────────────────────────────
ESP32 ──MQTT──► Mosquitto ──► Telegraf
                Broker           │
Admin ──REST──► Flask API ◄── InfluxDB
(browser)           │            │
                    │            ▼
                    │         Grafana
                    │         (queries DB)
                    └──► Device registry
                         (SQLite/Postgres)

MQTT: real-time sensor data flow
REST: on-demand queries and management

In a typical production IoT system, both protocols coexist:

| Data Flow | Protocol | Example |
| --- | --- | --- |
| Sensor readings, device to broker | MQTT | ESP32 publishes temperature every 30s |
| Broker to database | MQTT (subscriber) | Telegraf subscribes and writes to InfluxDB |
| Dashboard queries database | REST (GET) | Grafana or custom UI fetches last 24h of data |
| Admin registers new device | REST (POST) | Web portal sends device metadata to API |
| Admin updates device config | REST (PUT) | Change sampling interval from 30s to 10s |
| Platform notifies external service | Webhook (POST) | Alert fires, platform POSTs to your endpoint |

REST API Request/Response Flow
──────────────────────────────────────────
Client (browser/curl)          Flask Server
─────────────────────          ────────────
GET /api/v1/devices
    ├──────────────────────►   Query DB
    │                              │
    │  200 OK                      │
    │  [{"id":"esp32-01",   ◄──────┘
    │    "last_seen":"..."}]

POST /api/v1/devices/esp32-01/config
{"interval": 10}
    ├──────────────────────►   Validate
    │                          Publish MQTT cmd
    │  200 OK               ◄──────┘
    │  {"status":"updated"}

The lesson builds each piece of this architecture.

REST Fundamentals for IoT



REST (Representational State Transfer) maps CRUD operations to HTTP methods. For IoT, the “resources” are devices, sensor readings, configurations, and commands.

HTTP Methods

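| Method | Operation | IoT Example | Idempotent |
| --- | --- | --- | --- |
| GET | Read | Fetch latest sensor reading | Yes |
| POST | Create | Register a new device, send a command | No |
| PUT | Update/Replace | Update device configuration | Yes |
| PATCH | Partial update | Change only the sampling interval | Not guaranteed (depends on the patch) |
| DELETE | Remove | Decommission a device | Yes |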

Status Codes That Matter

You do not need to memorize every HTTP status code, but these appear constantly in IoT APIs:

| Code | Meaning | When You See It |
| --- | --- | --- |
| 200 | OK | Successful GET, PUT, PATCH |
| 201 | Created | Successful POST that created a resource |
| 204 | No Content | Successful DELETE |
| 400 | Bad Request | Malformed JSON, missing required fields |
| 401 | Unauthorized | Missing or invalid API key |
| 403 | Forbidden | Valid key but insufficient permissions |
| 404 | Not Found | Device ID does not exist |
| 409 | Conflict | Device already registered |
| 429 | Too Many Requests | Rate limit exceeded |
| 500 | Internal Server Error | Server bug or database down |

JSON Payload Conventions

IoT APIs typically return JSON. A consistent response envelope makes client code simpler:

Successful response
{
  "status": "ok",
  "data": {
    "device_id": "esp32-001",
    "temperature": 23.4,
    "humidity": 61.2,
    "timestamp": "2026-03-10T14:30:00Z"
  }
}

Error response
{
  "status": "error",
  "error": {
    "code": 404,
    "message": "Device esp32-999 not found"
  }
}
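
The envelope above is easy to drift away from when every endpoint builds its own dictionary. A minimal sketch of two helpers that centralize it follows; the names `ok` and `error` are hypothetical and not part of the lesson's code, and the extra-keys behaviour (for fields such as `count`) is an assumption modelled on the list responses used later.

```python
from typing import Any


def ok(data: Any = None, **extra: Any) -> dict:
    """Build a success envelope; extra keys (e.g. count) sit next to 'data'."""
    body = {"status": "ok", "data": data}
    body.update(extra)
    return body


def error(code: int, message: str) -> dict:
    """Build an error envelope with the HTTP status code mirrored in the body."""
    return {"status": "error", "error": {"code": code, "message": message}}


if __name__ == "__main__":
    print(ok({"device_id": "esp32-001", "temperature": 23.4}))
    print(error(404, "Device esp32-999 not found"))
```

In a Flask view these would typically be wrapped as `jsonify(ok(...))` or `jsonify(error(404, "...")), 404`, so the body and the HTTP status line cannot disagree by accident.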

Building a Flask REST API for Device Management



Flask is lightweight and widely used for IoT backends. The API server manages device registration, serves sensor data, accepts commands, and updates configuration. In production you would use a proper database; here we use SQLite to keep the focus on the API design.

Project Structure

  • iot-rest-api/
    • app.py
    • models.py
    • auth.py
    • requirements.txt
    • devices.db

Dependencies

requirements.txt
flask==3.1.0
flask-limiter==3.5.0

Install with:

Terminal window
pip install -r requirements.txt

Database Models

models.py
import sqlite3
import json
from datetime import datetime, timezone

DB_PATH = "devices.db"

def get_db():
    """Get a database connection with row factory enabled."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA foreign_keys=ON")
    return conn

def init_db():
    """Create tables if they do not exist."""
    conn = get_db()
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS devices (
            device_id   TEXT PRIMARY KEY,
            name        TEXT NOT NULL,
            type        TEXT NOT NULL DEFAULT 'sensor',
            location    TEXT,
            config      TEXT DEFAULT '{}',
            status      TEXT DEFAULT 'active',
            registered  TEXT NOT NULL,
            last_seen   TEXT
        );
        CREATE TABLE IF NOT EXISTS readings (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id   TEXT NOT NULL,
            payload     TEXT NOT NULL,
            timestamp   TEXT NOT NULL,
            FOREIGN KEY (device_id) REFERENCES devices(device_id)
        );
        CREATE TABLE IF NOT EXISTS commands (
            id          INTEGER PRIMARY KEY AUTOINCREMENT,
            device_id   TEXT NOT NULL,
            command     TEXT NOT NULL,
            params      TEXT DEFAULT '{}',
            status      TEXT DEFAULT 'pending',
            created     TEXT NOT NULL,
            executed    TEXT,
            FOREIGN KEY (device_id) REFERENCES devices(device_id)
        );
        CREATE INDEX IF NOT EXISTS idx_readings_device
            ON readings(device_id, timestamp);
        CREATE INDEX IF NOT EXISTS idx_commands_device
            ON commands(device_id, created);
    """)
    conn.close()

def device_exists(device_id):
    """Check if a device is registered."""
    conn = get_db()
    row = conn.execute(
        "SELECT 1 FROM devices WHERE device_id = ?", (device_id,)
    ).fetchone()
    conn.close()
    return row is not None

def register_device(device_id, name, device_type="sensor",
                    location=None, config=None):
    """Register a new device. Returns True on success."""
    conn = get_db()
    now = datetime.now(timezone.utc).isoformat()
    config_str = json.dumps(config or {})
    try:
        conn.execute(
            """INSERT INTO devices
               (device_id, name, type, location, config, registered)
               VALUES (?, ?, ?, ?, ?, ?)""",
            (device_id, name, device_type, location, config_str, now)
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False
    finally:
        conn.close()

def get_all_devices():
    """Return all registered devices."""
    conn = get_db()
    rows = conn.execute(
        "SELECT * FROM devices ORDER BY registered DESC"
    ).fetchall()
    conn.close()
    return [dict(r) for r in rows]

def get_device(device_id):
    """Return a single device or None."""
    conn = get_db()
    row = conn.execute(
        "SELECT * FROM devices WHERE device_id = ?", (device_id,)
    ).fetchone()
    conn.close()
    return dict(row) if row else None

def update_device_config(device_id, config):
    """Update a device's configuration. Returns True if found."""
    conn = get_db()
    now = datetime.now(timezone.utc).isoformat()
    cursor = conn.execute(
        "UPDATE devices SET config = ?, last_seen = ? WHERE device_id = ?",
        (json.dumps(config), now, device_id)
    )
    conn.commit()
    updated = cursor.rowcount > 0
    conn.close()
    return updated

def delete_device(device_id):
    """Remove a device and all its data. Returns True if found."""
    conn = get_db()
    # Delete child rows first. With PRAGMA foreign_keys=ON, deleting the
    # device while readings or commands still reference it raises
    # sqlite3.IntegrityError (FOREIGN KEY constraint failed).
    conn.execute(
        "DELETE FROM readings WHERE device_id = ?", (device_id,)
    )
    conn.execute(
        "DELETE FROM commands WHERE device_id = ?", (device_id,)
    )
    cursor = conn.execute(
        "DELETE FROM devices WHERE device_id = ?", (device_id,)
    )
    conn.commit()
    deleted = cursor.rowcount > 0
    conn.close()
    return deleted

def insert_reading(device_id, payload):
    """Store a sensor reading."""
    conn = get_db()
    now = datetime.now(timezone.utc).isoformat()
    conn.execute(
        "INSERT INTO readings (device_id, payload, timestamp) VALUES (?, ?, ?)",
        (device_id, json.dumps(payload), now)
    )
    conn.execute(
        "UPDATE devices SET last_seen = ? WHERE device_id = ?",
        (now, device_id)
    )
    conn.commit()
    conn.close()

def get_latest_reading(device_id):
    """Return the most recent reading for a device."""
    conn = get_db()
    row = conn.execute(
        """SELECT * FROM readings
           WHERE device_id = ?
           ORDER BY timestamp DESC LIMIT 1""",
        (device_id,)
    ).fetchone()
    conn.close()
    if row:
        result = dict(row)
        result["payload"] = json.loads(result["payload"])
        return result
    return None

def get_readings_range(device_id, start, end, limit=1000):
    """Return readings within a time range."""
    conn = get_db()
    rows = conn.execute(
        """SELECT * FROM readings
           WHERE device_id = ? AND timestamp >= ? AND timestamp <= ?
           ORDER BY timestamp ASC LIMIT ?""",
        (device_id, start, end, limit)
    ).fetchall()
    conn.close()
    results = []
    for r in rows:
        d = dict(r)
        d["payload"] = json.loads(d["payload"])
        results.append(d)
    return results

def insert_command(device_id, command, params=None):
    """Queue a command for a device. Returns the command ID."""
    conn = get_db()
    now = datetime.now(timezone.utc).isoformat()
    cursor = conn.execute(
        """INSERT INTO commands (device_id, command, params, created)
           VALUES (?, ?, ?, ?)""",
        (device_id, command, json.dumps(params or {}), now)
    )
    conn.commit()
    cmd_id = cursor.lastrowid
    conn.close()
    return cmd_id

API Key Authentication

auth.py
import secrets
from functools import wraps

from flask import request, jsonify

# In production, store hashed keys in a database.
# For this lesson, we use a simple dictionary.
API_KEYS = {
    # key: {"name": "owner", "permissions": ["read", "write", "admin"]}
}

def generate_api_key(name, permissions=None):
    """Generate a new API key in swk_ format."""
    raw = secrets.token_hex(24)
    key = f"swk_{raw}"
    API_KEYS[key] = {
        "name": name,
        "permissions": permissions or ["read"],
    }
    return key

def require_auth(permission="read"):
    """Decorator that checks for a valid API key with the required permission."""
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kwargs):
            auth_header = request.headers.get("Authorization", "")
            if auth_header.startswith("Bearer "):
                key = auth_header[7:]
            else:
                key = request.headers.get("X-API-Key", "")
            if not key:
                return jsonify({
                    "status": "error",
                    "error": {
                        "code": 401,
                        "message": "Missing API key. Use Authorization: Bearer <key> "
                                   "or X-API-Key header."
                    }
                }), 401
            key_data = API_KEYS.get(key)
            if not key_data:
                return jsonify({
                    "status": "error",
                    "error": {
                        "code": 401,
                        "message": "Invalid API key."
                    }
                }), 401
            if permission not in key_data["permissions"]:
                return jsonify({
                    "status": "error",
                    "error": {
                        "code": 403,
                        "message": f"Key does not have '{permission}' permission."
                    }
                }), 403
            return f(*args, **kwargs)
        return wrapped
    return decorator

# Generate a default admin key for development
DEFAULT_KEY = generate_api_key("admin", ["read", "write", "admin"])
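
The comment in auth.py notes that production systems store hashed keys rather than the keys themselves. One way that could look is sketched below, using only the standard library. The names `KEY_HASHES`, `issue_key`, and `lookup_key` are hypothetical, the dictionary stands in for a database table, and plain SHA-256 is assumed to be acceptable here only because the keys are long random tokens rather than human-chosen passwords.

```python
import hashlib
import secrets

# Maps sha256(key) -> metadata. The plaintext key is never stored.
KEY_HASHES = {}


def hash_key(key: str) -> str:
    """Return the hex SHA-256 digest used as the lookup index."""
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


def issue_key(name, permissions=None):
    """Create a key, store only its hash, and return the plaintext once."""
    key = f"swk_{secrets.token_hex(24)}"
    KEY_HASHES[hash_key(key)] = {
        "name": name,
        "permissions": list(permissions or ["read"]),
    }
    return key


def lookup_key(key):
    """Return the metadata for a presented key, or None if unknown."""
    return KEY_HASHES.get(hash_key(key))


if __name__ == "__main__":
    k = issue_key("dashboard", ["read"])
    print(lookup_key(k))           # metadata for the new key
    print(lookup_key("swk_nope"))  # None
```

With this layout a leaked copy of the key store does not reveal usable keys, at the cost that a lost key cannot be shown again and has to be reissued.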

The Flask Application

app.py
from flask import Flask, request, jsonify
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

from models import (
    init_db, get_all_devices, get_device, register_device,
    update_device_config, delete_device, device_exists,
    insert_reading, get_latest_reading, get_readings_range,
    insert_command
)
from auth import require_auth, DEFAULT_KEY

app = Flask(__name__)

# Rate limiting: 100 requests per minute per IP
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["100 per minute"],
    storage_uri="memory://",
)

# ================================================================
# Device Management Endpoints
# ================================================================
@app.route("/api/devices", methods=["GET"])
@require_auth("read")
def list_devices():
    """GET /api/devices - List all registered devices."""
    devices = get_all_devices()
    return jsonify({
        "status": "ok",
        "count": len(devices),
        "data": devices
    })

@app.route("/api/devices", methods=["POST"])
@require_auth("write")
def create_device():
    """POST /api/devices - Register a new device."""
    body = request.get_json(silent=True)
    if not body:
        return jsonify({
            "status": "error",
            "error": {"code": 400, "message": "Request body must be JSON."}
        }), 400
    device_id = body.get("device_id")
    name = body.get("name")
    if not device_id or not name:
        return jsonify({
            "status": "error",
            "error": {"code": 400,
                      "message": "Fields 'device_id' and 'name' are required."}
        }), 400
    success = register_device(
        device_id=device_id,
        name=name,
        device_type=body.get("type", "sensor"),
        location=body.get("location"),
        config=body.get("config"),
    )
    if not success:
        return jsonify({
            "status": "error",
            "error": {"code": 409,
                      "message": f"Device '{device_id}' already exists."}
        }), 409
    device = get_device(device_id)
    return jsonify({"status": "ok", "data": device}), 201

@app.route("/api/devices/<device_id>", methods=["GET"])
@require_auth("read")
def get_device_info(device_id):
    """GET /api/devices/{id} - Get device details."""
    device = get_device(device_id)
    if not device:
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    return jsonify({"status": "ok", "data": device})

@app.route("/api/devices/<device_id>", methods=["DELETE"])
@require_auth("admin")
def remove_device(device_id):
    """DELETE /api/devices/{id} - Decommission a device."""
    deleted = delete_device(device_id)
    if not deleted:
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    return jsonify({"status": "ok", "message": "Device removed."}), 200

# ================================================================
# Sensor Data Endpoints
# ================================================================
@app.route("/api/devices/<device_id>/data", methods=["GET"])
@require_auth("read")
def get_device_data(device_id):
    """
    GET /api/devices/{id}/data - Get sensor readings.
    Query params: start, end (ISO 8601 timestamps) for historical range.
    Without params, returns the latest reading.
    """
    if not device_exists(device_id):
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    start = request.args.get("start")
    end = request.args.get("end")
    if start and end:
        # Historical range query
        limit = request.args.get("limit", 1000, type=int)
        # Clamp to 1..10000. SQLite treats a negative LIMIT as "no limit",
        # so a lower bound is needed as well as the 10k cap.
        limit = max(1, min(limit, 10000))
        readings = get_readings_range(device_id, start, end, limit)
        return jsonify({
            "status": "ok",
            "count": len(readings),
            "data": readings
        })
    else:
        # Latest reading
        reading = get_latest_reading(device_id)
        if not reading:
            return jsonify({
                "status": "ok",
                "data": None,
                "message": "No readings yet."
            })
        return jsonify({"status": "ok", "data": reading})

@app.route("/api/devices/<device_id>/data", methods=["POST"])
@require_auth("write")
@limiter.limit("60 per minute")
def ingest_data(device_id):
    """POST /api/devices/{id}/data - Ingest a sensor reading via HTTP."""
    if not device_exists(device_id):
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    body = request.get_json(silent=True)
    if not body:
        return jsonify({
            "status": "error",
            "error": {"code": 400, "message": "Request body must be JSON."}
        }), 400
    insert_reading(device_id, body)
    return jsonify({"status": "ok", "message": "Reading stored."}), 201

# ================================================================
# Command Endpoints
# ================================================================
@app.route("/api/devices/<device_id>/command", methods=["POST"])
@require_auth("write")
def send_command(device_id):
    """POST /api/devices/{id}/command - Send a command to a device."""
    if not device_exists(device_id):
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    body = request.get_json(silent=True)
    if not body or "command" not in body:
        return jsonify({
            "status": "error",
            "error": {"code": 400,
                      "message": "Field 'command' is required."}
        }), 400
    cmd_id = insert_command(
        device_id=device_id,
        command=body["command"],
        params=body.get("params"),
    )
    return jsonify({
        "status": "ok",
        "data": {"command_id": cmd_id, "status": "pending"}
    }), 201

# ================================================================
# Configuration Endpoints
# ================================================================
@app.route("/api/devices/<device_id>/config", methods=["GET"])
@require_auth("read")
def get_config(device_id):
    """GET /api/devices/{id}/config - Get device configuration."""
    device = get_device(device_id)
    if not device:
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    import json
    # The config column can be NULL, in which case the key exists with the
    # value None; fall back to an empty object before decoding.
    config = json.loads(device.get("config") or "{}")
    return jsonify({"status": "ok", "data": config})

@app.route("/api/devices/<device_id>/config", methods=["PUT"])
@require_auth("write")
def update_config(device_id):
    """PUT /api/devices/{id}/config - Update device configuration."""
    body = request.get_json(silent=True)
    if not body:
        return jsonify({
            "status": "error",
            "error": {"code": 400, "message": "Request body must be JSON."}
        }), 400
    updated = update_device_config(device_id, body)
    if not updated:
        return jsonify({
            "status": "error",
            "error": {"code": 404,
                      "message": f"Device '{device_id}' not found."}
        }), 404
    return jsonify({"status": "ok", "message": "Configuration updated."})

# ================================================================
# Health Check (no auth required)
# ================================================================
@app.route("/api/health", methods=["GET"])
@limiter.exempt
def health():
    """GET /api/health - Server health check."""
    return jsonify({"status": "ok", "service": "iot-rest-api"})

# ================================================================
# Entry Point
# ================================================================
if __name__ == "__main__":
    init_db()
    print(f"\n  Default API key: {DEFAULT_KEY}")
    print(f"  Use: curl -H 'Authorization: Bearer {DEFAULT_KEY}' ...\n")
    # DEFAULT_KEY is regenerated every time auth.py is imported. The debug
    # reloader re-runs this script in a child process, which would print a
    # second key and leave the first one invalid, so the reloader is disabled.
    # debug=True on 0.0.0.0 is for local development only.
    app.run(host="0.0.0.0", port=5000, debug=True, use_reloader=False)

Testing the API with curl

Start the server and test each endpoint:

  1. Start the server:

    Terminal window
    python app.py

    Note the API key printed to the terminal. You will use it in all requests.

  2. Register a device:

    POST /api/devices
    curl -X POST http://localhost:5000/api/devices \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE" \
    -H "Content-Type: application/json" \
    -d '{
    "device_id": "esp32-001",
    "name": "Greenhouse Sensor",
    "type": "sensor",
    "location": "greenhouse-north",
    "config": {
    "sample_interval_s": 30,
    "mqtt_topic": "greenhouse/north/env"
    }
    }'
  3. List all devices:

    GET /api/devices
    curl http://localhost:5000/api/devices \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE"
  4. Ingest a sensor reading:

    POST /api/devices/{id}/data
    curl -X POST http://localhost:5000/api/devices/esp32-001/data \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE" \
    -H "Content-Type: application/json" \
    -d '{
    "temperature": 24.3,
    "humidity": 58.7,
    "soil_moisture": 42.1
    }'
  5. Get the latest reading:

    GET /api/devices/{id}/data
    curl http://localhost:5000/api/devices/esp32-001/data \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE"
  6. Query a historical range:

    GET /api/devices/{id}/data?start=&end=
    curl "http://localhost:5000/api/devices/esp32-001/data?start=2026-03-10T00:00:00&end=2026-03-10T23:59:59" \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE"
  7. Send a command to a device:

    POST /api/devices/{id}/command
    curl -X POST http://localhost:5000/api/devices/esp32-001/command \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE" \
    -H "Content-Type: application/json" \
    -d '{"command": "reboot", "params": {"delay_s": 5}}'
  8. Update device configuration:

    PUT /api/devices/{id}/config
    curl -X PUT http://localhost:5000/api/devices/esp32-001/config \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE" \
    -H "Content-Type: application/json" \
    -d '{"sample_interval_s": 10, "mqtt_topic": "greenhouse/north/env"}'
  9. Delete a device (requires admin permission):

    DELETE /api/devices/{id}
    curl -X DELETE http://localhost:5000/api/devices/esp32-001 \
    -H "Authorization: Bearer swk_YOUR_KEY_HERE"
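
The same range query can be issued from a script instead of curl. The sketch below builds the request with the standard library only; `BASE_URL`, the function name `build_range_request`, and the 24-hour default window are assumptions for illustration. The `urlencode` step matters: the `+` in a `+00:00` UTC offset would otherwise be decoded as a space on the server side.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:5000"  # assumed local Flask server from this lesson


def build_range_request(device_id, api_key, end, hours=24, limit=1000):
    """Build a GET request for the readings in the `hours` before `end`."""
    start = end - timedelta(hours=hours)
    query = urlencode({
        "start": start.isoformat(),
        "end": end.isoformat(),
        "limit": limit,
    })
    url = f"{BASE_URL}/api/devices/{device_id}/data?{query}"
    return Request(url, headers={"Authorization": f"Bearer {api_key}"})


if __name__ == "__main__":
    req = build_range_request(
        "esp32-001", "swk_YOUR_KEY_HERE", datetime.now(timezone.utc)
    )
    print(req.get_method(), req.full_url)
    # To send it: urllib.request.urlopen(req) and json.load() the response.
```

Building the `Request` separately from sending it keeps the URL logic checkable without a running server.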

SiliconWit.io REST API



If you prefer a managed IoT platform instead of running your own server, SiliconWit.io provides a REST API that handles device management, data storage, and alerting for you.

API Endpoints

SiliconWit.io exposes two API surfaces:

| Base URL | Purpose | Authentication |
| --- | --- | --- |
| siliconwit.io/api/ | Device management, dashboard config | Session or API key |
| api.siliconwit.io/v1/ | Public data API, integrations | API key (swk_... format) |

HTTP Data Ingestion

Devices that cannot use MQTT (or need a fallback) can POST sensor data directly over HTTP:

POST to SiliconWit.io ingest endpoint
curl -X POST https://siliconwit.io/api/devices/ingest \
-H "Authorization: Bearer swk_YOUR_SILICONWIT_KEY" \
-H "Content-Type: application/json" \
-d '{
"device_id": "esp32-001",
"readings": {
"temperature": 24.3,
"humidity": 58.7
}
}'

This is especially useful for devices behind restrictive firewalls that block MQTT ports but allow outbound HTTPS. The platform stores the data in the same time-series database as MQTT-ingested data, so your dashboards and alerts work identically regardless of ingestion method.

Querying Data

GET historical data from SiliconWit.io
curl "https://api.siliconwit.io/v1/devices/esp32-001/data?\
start=2026-03-10T00:00:00Z&end=2026-03-10T12:00:00Z&limit=500" \
-H "Authorization: Bearer swk_YOUR_SILICONWIT_KEY"

You can use either SiliconWit.io or your own Flask API for the rest of this lesson. The concepts are identical; only the base URL changes.

Webhooks: Server Push Notifications



Polling an API every few seconds to check if something changed is wasteful. Webhooks flip the model: the server calls your endpoint when an event occurs. You register a URL, and the platform POSTs to it whenever a condition triggers.

How Webhooks Work

  1. Register your endpoint. Tell the platform: “When device esp32-001 goes offline, POST to https://myserver.example.com/webhooks/iot.”

  2. Event occurs. The device misses three consecutive heartbeats. The platform detects it is offline.

  3. Platform sends POST. The platform constructs a JSON payload describing the event, signs it with a shared secret, and POSTs it to your registered URL.

  4. Your server processes the event. Your endpoint validates the signature, extracts the event data, takes action (send a Slack message, page an engineer, log to a database), and returns HTTP 200.

  5. Platform confirms delivery. If your endpoint does not return 2xx within a timeout (typically 5 to 10 seconds), the platform retries with exponential backoff.
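
Step 5 can be made concrete with a small sketch of the sender's side. The source does not specify any platform's actual retry schedule, so the base delay, factor, cap, and attempt count below are assumptions, and `send` is a hypothetical callable that performs one delivery attempt and returns the HTTP status code.

```python
import time


def backoff_delays(max_attempts=5, base_s=1.0, factor=2.0, cap_s=60.0):
    """Delay after each failed attempt: base, base*factor, ... capped at cap_s."""
    return [min(cap_s, base_s * factor ** n) for n in range(max_attempts)]


def deliver(send, delays, sleep=time.sleep):
    """Call send() until it returns a 2xx status.

    Returns the 1-based attempt number that succeeded, or None if every
    attempt failed. `sleep` is injectable so the loop can be tested quickly.
    """
    for attempt, delay in enumerate(delays, start=1):
        status = send()
        if 200 <= status < 300:
            return attempt
        if attempt < len(delays):
            sleep(delay)  # wait before the next attempt, not after the last
    return None


if __name__ == "__main__":
    statuses = iter([500, 503, 200])
    print(deliver(lambda: next(statuses), backoff_delays(), sleep=lambda s: None))
```

From the receiver's point of view, the practical consequence is that the same event may arrive more than once, so the handler should return 2xx quickly and tolerate duplicates.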

Security: Why Signatures Matter

Anyone who knows your webhook URL could send fake events. HMAC-SHA256 signatures prevent this. The platform and your server share a secret key. The platform computes HMAC-SHA256(secret, request_body) and includes it in a header. Your server recomputes the HMAC and compares. If they match, the request is authentic.

POST /webhooks/iot HTTP/1.1
Host: myserver.example.com
Content-Type: application/json
X-Webhook-Signature: sha256=a1b2c3d4e5f6...
{"event": "device_offline", "device_id": "esp32-001", ...}
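
One detail worth seeing in code: the HMAC is computed over the exact bytes of the request body, so a receiver that parses the JSON and re-serializes it before verifying will usually get a different digest. The sketch below illustrates this with the standard library; the helper names `sign` and `is_authentic` and the sample secret are assumptions for illustration only.

```python
import hashlib
import hmac
import json


def sign(secret: str, body: bytes) -> str:
    """Return the header value 'sha256=<hex digest>' for a raw request body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def is_authentic(secret: str, body: bytes, header: str) -> bool:
    """Recompute the signature over the raw bytes and compare in constant time."""
    expected = sign(secret, body).encode("utf-8")
    return hmac.compare_digest(expected, header.encode("utf-8"))


if __name__ == "__main__":
    secret = "example-secret"  # placeholder, not a real credential
    body = b'{"event":"device_offline","device_id":"esp32-001"}'
    header = sign(secret, body)

    # Same bytes: verification succeeds.
    print(is_authentic(secret, body, header))

    # Parsed and re-serialized: json.dumps adds spaces after ':' and ',',
    # so the bytes differ and verification fails.
    reserialized = json.dumps(json.loads(body)).encode("utf-8")
    print(is_authentic(secret, reserialized, header))
```

This is why a receiver should read the raw body first and only parse the JSON after the signature has been checked.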

Building a Webhook Receiver



This Flask application receives webhook events, validates the HMAC signature, processes the event, and returns an acknowledgment.

webhook_receiver.py
import hmac
import hashlib
import json
import logging

from flask import Flask, request, jsonify

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("webhook")

# Shared secret (in production, load from environment variable)
WEBHOOK_SECRET = "your-webhook-secret-change-this"

def verify_signature(payload_bytes, signature_header):
    """
    Verify the HMAC-SHA256 signature of a webhook payload.

    Args:
        payload_bytes: The raw request body as bytes.
        signature_header: The value of the X-Webhook-Signature header.

    Returns:
        True if the signature is valid, False otherwise.
    """
    if not signature_header:
        logger.warning("Missing X-Webhook-Signature header")
        return False
    # The header format is "sha256=<hex digest>"
    if not signature_header.startswith("sha256="):
        logger.warning("Invalid signature format: %s", signature_header)
        return False
    received_sig = signature_header[7:]  # strip "sha256=" prefix
    # Compute expected signature
    expected_sig = hmac.new(
        key=WEBHOOK_SECRET.encode("utf-8"),
        msg=payload_bytes,
        digestmod=hashlib.sha256,
    ).hexdigest()
    # Use constant-time comparison to prevent timing attacks. Compare bytes:
    # compare_digest raises TypeError for str arguments with non-ASCII
    # characters, which an attacker-controlled header could contain.
    return hmac.compare_digest(
        received_sig.encode("utf-8"), expected_sig.encode("utf-8")
    )

def process_event(event):
    """
    Process a validated webhook event.
    Replace this with your actual business logic.
    """
    event_type = event.get("event", "unknown")
    if event_type == "device_offline":
        logger.info(
            "ALERT: Device %s went offline at %s",
            event.get("device_id"),
            event.get("timestamp"),
        )
        # Example: send a Slack notification, page an engineer, etc.
    elif event_type == "threshold_exceeded":
        logger.info(
            "ALERT: Device %s, %s = %s (threshold: %s, severity: %s)",
            event.get("device_id"),
            event.get("condition"),
            event.get("current_value"),
            event.get("threshold"),
            event.get("severity"),
        )
        # Example: trigger an automation rule
    elif event_type == "device_online":
        logger.info(
            "INFO: Device %s came back online at %s",
            event.get("device_id"),
            event.get("timestamp"),
        )
    else:
        logger.info("Received event type '%s': %s", event_type, event)

@app.route("/webhooks/iot", methods=["POST"])
def webhook_endpoint():
    """Receive and process webhook events."""
    # Step 1: Get the raw body (before JSON parsing, for signature verification)
    payload_bytes = request.get_data()
    # Step 2: Verify the HMAC signature
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(payload_bytes, signature):
        logger.warning(
            "Invalid signature from %s", request.remote_addr
        )
        return jsonify({"status": "error", "message": "Invalid signature."}), 401
    # Step 3: Parse the JSON payload. ValueError covers both malformed JSON
    # (json.JSONDecodeError) and bodies that are not valid UTF-8.
    try:
        event = json.loads(payload_bytes)
    except ValueError:
        return jsonify({"status": "error", "message": "Invalid JSON."}), 400
    # The handlers below call event.get(), so reject arrays, strings, etc.
    if not isinstance(event, dict):
        return jsonify({"status": "error", "message": "Invalid JSON."}), 400
    # Step 4: Log receipt
    logger.info(
        "Received webhook: event=%s, device=%s",
        event.get("event"),
        event.get("device_id"),
    )
    # Step 5: Process the event
    process_event(event)
    # Step 6: Return 200 to acknowledge receipt
    return jsonify({"status": "ok", "received": True}), 200

if __name__ == "__main__":
    logger.info("Webhook receiver starting on port 8080")
    app.run(host="0.0.0.0", port=8080, debug=True)
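
Because a platform retries when it does not see a 2xx response in time, the receiver can be handed the same event twice. A minimal sketch of in-memory de-duplication follows. It assumes that a retried delivery carries the same `alert_id` as the first attempt, which the payload examples suggest but the source does not state; the class name `SeenAlerts` and the one-hour retention are hypothetical.

```python
import time


class SeenAlerts:
    """Remember recently processed alert IDs for ttl_s seconds."""

    def __init__(self, ttl_s=3600.0, clock=time.monotonic):
        self._ttl_s = ttl_s
        self._clock = clock
        self._seen = {}  # alert_id -> time first seen

    def first_time(self, alert_id):
        """Return True the first time an ID is seen within the TTL window."""
        now = self._clock()
        # Drop entries older than the TTL so the dict does not grow forever.
        self._seen = {
            k: t for k, t in self._seen.items() if now - t < self._ttl_s
        }
        if alert_id in self._seen:
            return False
        self._seen[alert_id] = now
        return True


if __name__ == "__main__":
    seen = SeenAlerts()
    print(seen.first_time("alert-001"))  # True: process the event
    print(seen.first_time("alert-001"))  # False: duplicate, just acknowledge
```

In the receiver this check would sit just before `process_event(event)`, still returning 200 for duplicates so the sender stops retrying. State held in process memory is lost on restart and not shared between workers, so a real deployment would more likely keep it in a database or cache.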

Testing the Webhook Receiver

To test locally, write a small script that simulates what the platform does: construct a payload, sign it, and POST it to your receiver.

test_webhook.py
import hmac
import hashlib
import json
from datetime import datetime, timezone

import requests

WEBHOOK_URL = "http://localhost:8080/webhooks/iot"
WEBHOOK_SECRET = "your-webhook-secret-change-this"

def send_test_event(event_data):
    """Sign and send a test webhook event."""
    payload = json.dumps(event_data).encode("utf-8")
    # Compute HMAC-SHA256 signature
    signature = hmac.new(
        key=WEBHOOK_SECRET.encode("utf-8"),
        msg=payload,
        digestmod=hashlib.sha256,
    ).hexdigest()
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Signature": f"sha256={signature}",
    }
    response = requests.post(WEBHOOK_URL, data=payload, headers=headers)
    print(f"Status: {response.status_code}")
    print(f"Response: {response.json()}")
    return response

# Test 1: Device offline event
print("--- Test 1: Device offline ---")
send_test_event({
    "event": "device_offline",
    "alert_id": "alert-001",
    "device_id": "esp32-001",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "message": "Device missed 3 consecutive heartbeats.",
})

# Test 2: Threshold exceeded event
print("\n--- Test 2: Threshold exceeded ---")
send_test_event({
    "event": "threshold_exceeded",
    "alert_id": "alert-002",
    "device_id": "esp32-001",
    "condition": "temperature",
    "current_value": 38.7,
    "threshold": 35.0,
    "severity": "critical",
    "timestamp": datetime.now(timezone.utc).isoformat(),
})

# Test 3: Invalid signature (should be rejected)
print("\n--- Test 3: Invalid signature ---")
payload = json.dumps({"event": "test"}).encode("utf-8")
bad_sig = "sha256=0000000000000000000000000000000000000000000000000000000000000000"
response = requests.post(
    WEBHOOK_URL,
    data=payload,
    headers={
        "Content-Type": "application/json",
        "X-Webhook-Signature": bad_sig,
    },
)
print(f"Status: {response.status_code} (expected 401)")
print(f"Response: {response.json()}")

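Run the test script in a separate terminal while the receiver is running. The script uses the requests package, which is not listed in requirements.txt, so install it first with pip install requests: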

Terminal window
# Terminal 1: start the receiver
python webhook_receiver.py
# Terminal 2: send test events
python test_webhook.py

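Expected output from the receiver (your timestamps will differ, and the startup message and Werkzeug's own per-request log lines are omitted here):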

INFO:webhook:Received webhook: event=device_offline, device=esp32-001
INFO:webhook:ALERT: Device esp32-001 went offline at 2026-03-10T14:30:00+00:00
INFO:webhook:Received webhook: event=threshold_exceeded, device=esp32-001
INFO:webhook:ALERT: Device esp32-001, temperature = 38.7 (threshold: 35.0, severity: critical)
WARNING:webhook:Invalid signature from 127.0.0.1

SiliconWit.io Webhook Payloads



When you configure webhooks on SiliconWit.io, the platform sends structured JSON events. Here is the format you can expect:

Device offline event
{
  "event": "device_offline",
  "alert_id": "swk_alert_8f3a2b1c",
  "device_id": "esp32-001",
  "device_name": "Greenhouse Sensor",
  "timestamp": "2026-03-10T14:30:00Z",
  "message": "Device missed 3 consecutive heartbeats.",
  "severity": "warning",
  "metadata": {
    "last_seen": "2026-03-10T14:25:00Z",
    "expected_interval_s": 30
  }
}

Threshold exceeded event
{
  "event": "threshold_exceeded",
  "alert_id": "swk_alert_4d7e9f0a",
  "device_id": "esp32-001",
  "device_name": "Greenhouse Sensor",
  "condition": "temperature > 35.0",
  "current_value": 38.7,
  "threshold": 35.0,
  "severity": "critical",
  "timestamp": "2026-03-10T15:10:00Z",
  "metadata": {
    "unit": "celsius",
    "duration_above_s": 120
  }
}

The X-Webhook-Signature header uses the same sha256=<hex> format shown in the receiver code. The shared secret is generated when you create the webhook configuration in the SiliconWit.io dashboard.

Integrating MQTT and REST



In Lessons 2 through 4, you set up an MQTT broker, wrote firmware clients, and stored data in InfluxDB via Telegraf. The REST API from this lesson reads from a database. How do these fit together?

The Data Flow

ESP32 --[MQTT]--> Broker --[Telegraf]--> InfluxDB
                                            |
                                            v
                  REST API --[HTTP]--> Dashboard / Mobile App
                     |
                     v
                  Webhook --> External Services

MQTT handles the real-time ingestion path. Telegraf subscribes to the broker topics and writes every message to InfluxDB (or SQLite, or PostgreSQL). The REST API does not touch MQTT at all; it queries the database directly. This separation means:

  • MQTT can be down temporarily without affecting historical queries.
  • The REST API can serve thousands of dashboard users without adding load to the broker.
  • Webhooks fire based on database triggers or rule engines, not directly from MQTT messages.

Connecting the Flask API to InfluxDB

If you followed Lesson 4 and have InfluxDB running with Telegraf, you can swap the SQLite backend for InfluxDB queries. Here is how the data endpoint changes:

SQLite query
def get_readings_range(device_id, start, end, limit=1000):
    conn = get_db()
    rows = conn.execute(
        """SELECT * FROM readings
           WHERE device_id = ? AND timestamp >= ? AND timestamp <= ?
           ORDER BY timestamp ASC LIMIT ?""",
        (device_id, start, end, limit)
    ).fetchall()
    conn.close()
    return [dict(r) for r in rows]

Both backends serve the same REST endpoints. The client code (dashboard, mobile app, CLI tool) does not care where the data is stored. This is the value of putting a REST API in front of your data.
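
For comparison, an InfluxDB counterpart to get_readings_range might look roughly like the sketch below. It is not the lesson's own code. The bucket name, the measurement name, and the assumption that device_id is stored as a tag are all guesses about the Lesson 4 setup, so adjust them to match your Telegraf configuration. The function takes the query_api object from the influxdb-client package as a parameter, and inputs are validated before being placed in the Flux string because they come from the request.

```python
import re
from datetime import datetime, timezone

BUCKET = "iot_data"            # assumption: your InfluxDB bucket name
MEASUREMENT = "mqtt_consumer"  # assumption: measurement written by Telegraf

_DEVICE_ID_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$")


def _rfc3339(value):
    """Validate an ISO 8601 timestamp and return it as an RFC 3339 UTC string."""
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # treat naive timestamps as UTC
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_flux_query(device_id, start, end, limit=1000):
    """Build a Flux query for one device and time range."""
    if not _DEVICE_ID_RE.match(device_id):
        raise ValueError("invalid device_id")
    return (
        f'from(bucket: "{BUCKET}")\n'
        f"  |> range(start: {_rfc3339(start)}, stop: {_rfc3339(end)})\n"
        f'  |> filter(fn: (r) => r._measurement == "{MEASUREMENT}")\n'
        f'  |> filter(fn: (r) => r.device_id == "{device_id}")\n'
        '  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")\n'
        "  |> group()\n"
        '  |> sort(columns: ["_time"])\n'
        f"  |> limit(n: {max(1, int(limit))})\n"
    )


def get_readings_range_influx(query_api, device_id, start, end, limit=1000):
    """Return readings as a list of dicts, similar in shape to the SQLite version."""
    tables = query_api.query(build_flux_query(device_id, start, end, limit))
    readings = []
    for table in tables:
        for record in table.records:
            # Keep tags and pivoted fields; drop Flux bookkeeping columns.
            row = {
                key: value
                for key, value in record.values.items()
                if not key.startswith("_") and key not in ("result", "table")
            }
            row["timestamp"] = record.get_time().isoformat()
            readings.append(row)
    return readings


# Usage with the official client (pip install influxdb-client):
#   from influxdb_client import InfluxDBClient
#   client = InfluxDBClient(url="http://localhost:8086", token="...", org="...")
#   rows = get_readings_range_influx(client.query_api(), "esp32-001",
#                                    "2026-03-10T00:00:00Z", "2026-03-11T00:00:00Z")
```

One difference to keep in mind: the stop bound of Flux's range() is exclusive, while the SQLite query uses an inclusive <= comparison.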

ESP32 as HTTP Client: Fallback Data Ingestion



MQTT is the primary data path, but sometimes the broker is unreachable. A robust device should fall back to HTTP POST. This ESP-IDF code sends sensor data to your REST API (or to SiliconWit.io’s ingest endpoint) when MQTT fails.

http_fallback.c
#include <stdlib.h>
#include <string.h>
#include "esp_http_client.h"
#include "esp_log.h"
#include "cJSON.h"

static const char *TAG = "http_fallback";

/* Your REST API server address */
#define API_URL "http://192.168.1.100:5000/api/devices/esp32-001/data"
#define API_KEY "swk_your_api_key_here"
/* For SiliconWit.io, use:
 *   #define API_URL "https://siliconwit.io/api/devices/ingest"
 * and add the root CA certificate for TLS.
 */

esp_err_t http_post_sensor_data(float temperature, float humidity)
{
    /* Build JSON payload */
    cJSON *root = cJSON_CreateObject();
    cJSON_AddStringToObject(root, "device_id", "esp32-001");
    cJSON_AddNumberToObject(root, "temperature", temperature);
    cJSON_AddNumberToObject(root, "humidity", humidity);
    char *payload = cJSON_PrintUnformatted(root);
    cJSON_Delete(root);
    if (!payload) {
        ESP_LOGE(TAG, "Failed to create JSON payload");
        return ESP_FAIL;
    }

    /* Configure HTTP client */
    esp_http_client_config_t config = {
        .url = API_URL,
        .method = HTTP_METHOD_POST,
        .timeout_ms = 10000,
    };
    esp_http_client_handle_t client = esp_http_client_init(&config);
    if (!client) {
        ESP_LOGE(TAG, "Failed to initialize HTTP client");
        free(payload);
        return ESP_FAIL;
    }

    /* Set headers */
    esp_http_client_set_header(client, "Content-Type", "application/json");
    esp_http_client_set_header(client, "Authorization",
                               "Bearer " API_KEY);

    /* Set POST data */
    esp_http_client_set_post_field(client, payload, strlen(payload));

    /* Perform the request */
    esp_err_t err = esp_http_client_perform(client);
    if (err == ESP_OK) {
        int status = esp_http_client_get_status_code(client);
        ESP_LOGI(TAG, "HTTP POST status=%d", status);
        if (status == 201) {
            ESP_LOGI(TAG, "Data ingested successfully via HTTP");
        } else {
            /* Transport succeeded but the server did not store the reading */
            ESP_LOGW(TAG, "Server returned status %d", status);
            err = ESP_FAIL;
        }
    } else {
        ESP_LOGE(TAG, "HTTP POST failed: %s", esp_err_to_name(err));
    }

    esp_http_client_cleanup(client);
    free(payload);
    return err;
}

Using the Fallback in Your MQTT Client

In your main sensor task, try MQTT first. If the publish fails (broker unreachable, connection lost), fall back to HTTP:

Fallback logic in sensor task
void sensor_publish_task(void *pvParameters)
{
    while (1) {
        float temp = read_temperature();
        float hum = read_humidity();

        /* Try MQTT first */
        int mqtt_rc = mqtt_publish_reading(temp, hum);
        if (mqtt_rc != 0) {
            ESP_LOGW(TAG, "MQTT publish failed (rc=%d), trying HTTP", mqtt_rc);
            if (http_post_sensor_data(temp, hum) != ESP_OK) {
                ESP_LOGE(TAG, "HTTP fallback also failed; reading dropped");
            }
        }

        vTaskDelay(pdMS_TO_TICKS(30000)); /* 30 seconds */
    }
}

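This dual-path approach lets data reach the backend even during broker outages. The REST API ingest endpoint stores the data in the same database, so readings sent over HTTP sit alongside those that arrived via MQTT. A reading is lost only if both paths fail at the same time.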

Rate Limiting and Pagination



Production APIs need to protect against abuse and handle large result sets efficiently.

Rate Limiting

The Flask API already includes flask-limiter with a global limit of 100 requests per minute per IP. For sensitive endpoints (data ingestion, commands), you can set tighter limits:

Per-endpoint rate limits
@app.route("/api/devices/<device_id>/data", methods=["POST"])
@require_auth("write")
@limiter.limit("60 per minute")  # stricter than the global 100/min
def ingest_data(device_id):
    ...

@app.route("/api/devices/<device_id>/command", methods=["POST"])
@require_auth("write")
@limiter.limit("10 per minute")  # commands are expensive operations
def send_command(device_id):
    ...

When a client exceeds the limit, the server returns 429 Too Many Requests with a Retry-After header:

{
  "status": "error",
  "error": {
    "code": 429,
    "message": "Rate limit exceeded. Try again in 42 seconds."
  }
}
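
On the client side, a well-behaved caller waits before retrying a 429. The sketch below shows one way to do that: honor Retry-After when it is a number of seconds, and otherwise fall back to capped exponential backoff. The helper names retry_delay and post_with_retry are hypothetical, and send stands in for whatever function performs the HTTP request and returns a (status, headers) pair.

```python
import time


def retry_delay(headers, attempt, base=1.0, cap=60.0):
    """Return how many seconds to wait before retrying after a 429 response."""
    value = headers.get("Retry-After")
    if value is not None:
        try:
            return min(max(float(value), 0.0), cap)
        except ValueError:
            pass  # Retry-After may also be an HTTP date; use backoff instead
    return min(base * (2 ** attempt), cap)


def post_with_retry(send, max_attempts=5, sleep=time.sleep):
    """Call send() until it returns a non-429 status or attempts run out.

    send is any callable returning (status_code, headers).
    """
    status = 429
    for attempt in range(max_attempts):
        status, headers = send()
        if status != 429 or attempt == max_attempts - 1:
            break
        sleep(retry_delay(headers, attempt))
    return status
```

Passing sleep as a parameter keeps the retry loop easy to test without real delays. Note that a plain dict lookup is case-sensitive, so normalize header names first if your HTTP library does not already do so.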

Pagination

For endpoints that return lists (devices, readings), add offset and limit parameters:

Paginated device listing
@app.route("/api/devices", methods=["GET"])
@require_auth("read")
def list_devices():
    offset = request.args.get("offset", 0, type=int)
    limit = request.args.get("limit", 50, type=int)
    offset = max(offset, 0)          # ignore negative offsets
    limit = max(1, min(limit, 200))  # clamp page size to 1..200; a negative LIMIT means "no limit" in SQLite

    conn = get_db()
    total = conn.execute("SELECT COUNT(*) FROM devices").fetchone()[0]
    rows = conn.execute(
        "SELECT * FROM devices ORDER BY registered DESC LIMIT ? OFFSET ?",
        (limit, offset)
    ).fetchall()
    conn.close()

    devices = [dict(r) for r in rows]
    return jsonify({
        "status": "ok",
        "data": devices,
        "pagination": {
            "offset": offset,
            "limit": limit,
            "total": total,
            "has_more": (offset + limit) < total,
        }
    })

Clients iterate through pages by incrementing offset:

Terminal window
# Page 1
curl "http://localhost:5000/api/devices?offset=0&limit=20" -H "..."
# Page 2
curl "http://localhost:5000/api/devices?offset=20&limit=20" -H "..."
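
The same iteration can be scripted. The following sketch walks every page by following the pagination object in each response. The names iter_devices and make_fetcher are illustrative, and the base URL and API key are whatever your own server uses.

```python
import json
import urllib.request


def make_fetcher(base_url, api_key):
    """Return a function that fetches one page of GET /api/devices as parsed JSON."""
    def fetch_page(offset, limit):
        req = urllib.request.Request(
            f"{base_url}/api/devices?offset={offset}&limit={limit}",
            headers={"Authorization": f"Bearer {api_key}"},
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            return json.load(resp)
    return fetch_page


def iter_devices(fetch_page, limit=20):
    """Yield every device, one page at a time, until has_more is false."""
    offset = 0
    while True:
        page = fetch_page(offset, limit)
        yield from page["data"]
        if not page["pagination"]["has_more"] or not page["data"]:
            break
        # Advance by the limit the server actually applied (it may cap the request).
        offset += page["pagination"]["limit"]


# Usage (assumes the Flask server from this lesson is running locally):
#   for device in iter_devices(make_fetcher("http://localhost:5000", "swk_...")):
#       print(device)
```

Offset pagination is simple, but if devices are added or removed while a client is paging, rows can be skipped or repeated. For a device registry that changes rarely, this is usually acceptable.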

API Authentication Patterns



The auth.py module demonstrates two common authentication methods. Here is a summary of the options you will encounter in IoT APIs.

API Keys

The simplest approach. The client includes a static key in every request.

Terminal window
curl -H "Authorization: Bearer swk_abc123..." http://localhost:5000/api/devices

Sending the key in the Authorization header is the standard approach. The Authorization: Bearer pattern is recognized by proxies, load balancers, and API gateways.

Key Management Best Practices

| Practice | Why |
|---|---|
| Prefix keys with a service identifier (swk_) | Easy to identify in logs and code scans |
| Hash keys before storing in the database | A database leak does not expose raw keys |
| Support key rotation (issue new key, deprecate old) | Limits exposure window if a key is compromised |
| Assign permissions per key (read, write, admin) | Principle of least privilege |
| Set expiration dates on keys | Forces periodic rotation |
| Log key usage (not the key itself) | Audit trail for troubleshooting |
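
The first two practices can be sketched in a few lines. This is an illustration rather than the lesson's auth.py, and the function names are hypothetical. It assumes keys are long random strings, for which a plain SHA-256 hash is generally considered adequate; low-entropy secrets such as passwords need a slow, salted hash instead.

```python
import hashlib
import hmac
import secrets

KEY_PREFIX = "swk_"  # service identifier, as used throughout this lesson


def hash_api_key(raw_key):
    """Return the hex digest that is stored in the database instead of the key."""
    return hashlib.sha256(raw_key.encode("utf-8")).hexdigest()


def generate_api_key():
    """Return (raw_key, key_hash). Show raw_key to the user once; store only key_hash."""
    raw_key = KEY_PREFIX + secrets.token_urlsafe(32)
    return raw_key, hash_api_key(raw_key)


def key_matches(presented_key, stored_hash):
    """Check a key from a request against the stored hash in constant time."""
    return hmac.compare_digest(hash_api_key(presented_key), stored_hash)


raw_key, stored_hash = generate_api_key()
print(raw_key.startswith(KEY_PREFIX), key_matches(raw_key, stored_hash))
```

Because the hash is deterministic, the server can look up a presented key by hashing it and querying for the digest, so no raw key ever needs to be stored.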

Bearer Tokens (JWT)

For more complex systems, JSON Web Tokens (JWT) carry claims (user ID, permissions, expiration) inside the token itself. The server verifies the signature without a database lookup. This is common in platforms like SiliconWit.io where a user logs in, receives a JWT, and uses it for subsequent API calls.

JWT verification (conceptual)
import jwt

SECRET_KEY = "your-jwt-secret"

def verify_jwt(token):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        return payload  # {"user_id": "...", "permissions": [...], "exp": ...}
    except jwt.ExpiredSignatureError:
        return None
    except jwt.InvalidTokenError:
        return None

For this course, API keys are sufficient. JWT adds complexity that is only justified when you have user accounts with different roles.

Putting It All Together



Here is the complete architecture from this lesson in context with the previous lessons:

Full IoT Data Pipeline

  • Lesson 2: Mosquitto broker with TLS and ACLs.
  • Lesson 3: ESP32, Pico W, and STM32 publish sensor data via MQTT.
  • Lesson 4: Telegraf subscribes to MQTT topics and writes to InfluxDB. Grafana dashboards visualize the data.
  • Lesson 5 (this lesson): REST API serves data on demand. Webhooks push alerts to external services. ESP32 falls back to HTTP when MQTT is unavailable.

The separation of concerns is clear:

  • MQTT handles the high-frequency, low-latency ingestion path.
  • REST handles the request/response, query, and management path.
  • Webhooks handle the event-driven notification path.

Each protocol does what it is best at. No single protocol handles everything well, and trying to force one into the wrong role leads to fragile, inefficient systems.

Exercises



  1. Add device registration via the REST API and verify with curl. Start the Flask server, register three devices with different types (sensor, actuator, gateway), ingest at least five readings per device, then query the historical data for one device using the start and end parameters. Export the results as a CSV file using a Python script that calls the API and writes the output.

  2. Implement a webhook relay that forwards events to Discord. Extend the webhook receiver so that when it receives a threshold_exceeded event with severity critical, it sends a message to a Discord channel using a Discord webhook URL. Test the full chain: send a test event to your receiver, verify the message appears in Discord. Add a severity_filter configuration so that only events above a certain severity (info, warning, critical) are forwarded.

  3. Build a combined MQTT subscriber and REST API server. Write a single Python application that subscribes to MQTT topics (using paho-mqtt) and also runs a Flask REST API. MQTT messages are stored in SQLite as they arrive. The REST API serves the stored data. Note that paho-mqtt runs callbacks in a background thread, so you must create a new sqlite3.connect() inside each callback rather than sharing a connection from the main thread (SQLite raises a threading error otherwise). Test by publishing MQTT messages from your ESP32, then querying the REST API from a browser or curl. This eliminates the need for Telegraf and InfluxDB for simple setups.

  4. Implement API key rotation. Extend the auth.py module to support key rotation. Add endpoints to generate a new key (POST /api/keys), list active keys (GET /api/keys), and revoke a key (DELETE /api/keys/{key_id}). Each key should have a creation timestamp and an optional expiration date. Test by generating a new key, using it successfully, revoking it, and confirming that subsequent requests with the revoked key return 401.

Summary



You built a complete REST API for IoT device management with endpoints for registration, data ingestion, historical queries, commands, and configuration updates. The API uses proper HTTP methods (GET, POST, PUT, DELETE), returns consistent JSON responses, authenticates requests with API keys, and enforces rate limits. You then built a webhook receiver that validates HMAC-SHA256 signatures to ensure only authentic events are processed. The ESP32 HTTP client provides a fallback data path when MQTT is unavailable. Together with the MQTT pipeline from previous lessons, you now have a three-path architecture: MQTT for real-time streaming, REST for on-demand queries and management, and webhooks for push notifications.



© 2021-2026 SiliconWit®. All rights reserved.