InfluxDB and Telegraf for IoT Data Collection

Telegraf is InfluxData's open-source agent for collecting, processing, and forwarding metrics. Combined with InfluxDB, it forms a complete IoT data pipeline from sensor ingestion to time-series storage and alerting. This guide covers installing Telegraf, configuring the MQTT consumer plugin for IoT data, writing custom input plugins, transforming data, setting retention policies, and alerting on sensor thresholds.

Prerequisites

  • Ubuntu 20.04/22.04 or CentOS 8/Rocky Linux 8+
  • InfluxDB 2.x installed and running (see InfluxDB installation guide)
  • Mosquitto MQTT broker with IoT sensor data
  • At least 512 MB RAM
  • Root or sudo access

Install Telegraf

Ubuntu/Debian:

# Add InfluxData repository (if not already added for InfluxDB)
curl -fsSL https://repos.influxdata.com/influxdata-archive_compat.key \
  | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/influxdata.gpg

echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdata.gpg] https://repos.influxdata.com/debian stable main" \
  | sudo tee /etc/apt/sources.list.d/influxdata.list

sudo apt update
sudo apt install -y telegraf

sudo systemctl enable telegraf

CentOS/Rocky Linux:

# Add InfluxData repository first (check the InfluxData docs if the URL has changed)
sudo tee /etc/yum.repos.d/influxdata.repo << 'EOF'
[influxdata]
name = InfluxData Repository - Stable
baseurl = https://repos.influxdata.com/stable/$basearch/main
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive_compat.key
EOF

sudo dnf install -y telegraf
sudo systemctl enable telegraf

Verify installation:

telegraf --version
telegraf --sample-config | head -30

Configure MQTT Consumer Plugin

Create the main Telegraf configuration for IoT data collection. The heredoc commands in this and later sections write to system paths, so run them in a root shell or wrap them with sudo tee:

# Create a clean configuration file
cat > /etc/telegraf/telegraf.conf << 'EOF'
[global_tags]
  environment = "production"
  datacenter = "us-east"

[agent]
  interval = "10s"
  round_interval = true
  metric_batch_size = 5000
  metric_buffer_limit = 100000
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  hostname = ""
  omit_hostname = false

# Primary output: InfluxDB 2.x
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-influxdb-token"
  organization = "myorg"
  bucket = "iot-sensors"
  timeout = "5s"
EOF

Create the MQTT consumer configuration:

cat > /etc/telegraf/telegraf.d/mqtt.conf << 'EOF'
# MQTT Consumer - collect sensor data from IoT devices
[[inputs.mqtt_consumer]]
  # MQTT broker connection
  servers = ["tcp://localhost:1883"]

  # Topics to subscribe to
  # Use + for single-level wildcard, # for multi-level
  topics = [
    "home/+/temperature",
    "home/+/humidity",
    "home/+/battery",
    "home/+/pressure",
    "factory/+/sensors/#"
  ]

  # MQTT authentication
  username = "telegraf"
  password = "telegrafpassword"

  # QoS (0=at most once, 1=at least once, 2=exactly once)
  qos = 1

  # Client ID (must be unique per broker)
  client_id = "telegraf-iot-collector"

  # Data format of incoming MQTT messages
  data_format = "json"

  # JSON path configuration
  json_time_key = "timestamp"
  json_time_format = "unix"

  # Create a tag from the MQTT topic
  topic_tag = "mqtt_topic"
  
  # Extract tags from topic segments
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "home/+/+"
    measurement = "_/_/measurement"
    tags = "_/device_id/_"
    # This creates a "device_id" tag from the second topic segment
    # and uses the third segment as the measurement name

  # Timeout for initial connection
  connection_timeout = "30s"
EOF

Expected MQTT message format from devices:

{
  "temperature": 22.5,
  "unit": "C",
  "timestamp": 1704067200,
  "battery_pct": 87
}
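For testing without real hardware, a payload in this shape can be built and published with mosquitto_pub (the device ID and values here are hypothetical):

```shell
# Build a sample payload matching the expected format (values are made up)
PAYLOAD=$(printf '{"temperature": %s, "unit": "C", "timestamp": %s, "battery_pct": %s}' \
  22.5 "$(date +%s)" 87)
echo "$PAYLOAD"

# Publish it to a topic Telegraf subscribes to (requires mosquitto-clients):
# mosquitto_pub -h localhost -u telegraf -P telegrafpassword \
#   -t "home/sensor001/temperature" -m "$PAYLOAD"
```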

Create the MQTT credentials, validate the configuration, and start Telegraf:

# Create MQTT user for Telegraf
sudo mosquitto_passwd -b /etc/mosquitto/passwd telegraf telegrafpassword
sudo systemctl restart mosquitto

# Test Telegraf config
telegraf --config /etc/telegraf/telegraf.conf --config-directory /etc/telegraf/telegraf.d --test

# Start Telegraf
sudo systemctl start telegraf
sudo systemctl status telegraf

# Monitor incoming metrics
sudo journalctl -u telegraf -f
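With the topic_parsing rules above, a payload published to home/sensor001/temperature should surface in the --test output roughly as this line protocol (illustrative only; sensor001 and the timestamp are hypothetical):

```shell
# Illustrative parsed metric for the home/sensor001/temperature example:
# measurement from the 3rd topic segment, device_id tag from the 2nd,
# fields from the JSON payload keys
METRIC='temperature,device_id=sensor001,mqtt_topic=home/sensor001/temperature temperature=22.5,battery_pct=87 1704067200000000000'
echo "$METRIC"
```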

Custom Input Plugins

Use the exec plugin to collect data from custom scripts:

cat > /etc/telegraf/telegraf.d/custom-sensors.conf << 'EOF'
# Execute a custom script for sensor data
[[inputs.exec]]
  commands = ["/usr/local/bin/collect-custom-sensors.sh"]
  timeout = "10s"
  data_format = "influx"   # Output in InfluxDB line protocol
  interval = "30s"         # Override global interval for this plugin
EOF

# Create the custom sensor collection script
cat > /usr/local/bin/collect-custom-sensors.sh << 'SCRIPT'
#!/bin/bash
# Read temperature from DS18B20 sensor
TEMP=$(cat /sys/bus/w1/devices/28-*/temperature 2>/dev/null | awk '{printf "%.1f", $1/1000}')
if [ -n "$TEMP" ]; then
  echo "ds18b20_temp,sensor_id=28-00000abc1234 temperature=${TEMP} $(date +%s)000000000"
fi

# Read from DHT22 via Python
/usr/bin/python3 /usr/local/bin/read_dht22.py 2>/dev/null
SCRIPT
chmod +x /usr/local/bin/collect-custom-sensors.sh
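The awk conversion can be sanity-checked without the sensor attached; the kernel reports millidegrees Celsius, so a raw reading of 22500 should become 22.5:

```shell
# Emulate the DS18B20 conversion with a hypothetical raw kernel value
RAW=22500   # what /sys/bus/w1/devices/28-*/temperature would contain
TEMP=$(echo "$RAW" | awk '{printf "%.1f", $1/1000}')
echo "ds18b20_temp,sensor_id=28-00000abc1234 temperature=${TEMP} $(date +%s)000000000"
```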

Use the http plugin to poll REST APIs:

cat > /etc/telegraf/telegraf.d/rest-sensors.conf << 'EOF'
# Poll REST API for sensor data
[[inputs.http]]
  urls = [
    "http://192.168.1.50/api/sensors",
    "http://192.168.1.51/api/sensors"
  ]
  method = "GET"
  timeout = "5s"
  headers = {"Authorization" = "Bearer sensor-api-key"}
  data_format = "json"
  json_query = "data.readings"
  json_time_key = "recorded_at"
  json_time_format = "2006-01-02T15:04:05Z"
  
  name_override = "rest_sensor"
  [inputs.http.tags]
    source = "rest_api"
EOF
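For the json_query = "data.readings" path to match, the endpoints are assumed to return a body shaped like the following (the field names here are illustrative, not part of any real API):

```shell
# Hypothetical response body that json_query = "data.readings" would select from
RESP='{"data": {"readings": [{"sensor_id": "temp1", "temperature": 21.7, "recorded_at": "2024-01-01T00:00:00Z"}]}}'
echo "$RESP"
```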

Data Transformation with Processors

Processors transform, filter, and enrich metrics before they are written to InfluxDB:

cat > /etc/telegraf/telegraf.d/processors.conf << 'EOF'
# Ensure numeric sensor fields are stored as floats (some devices send strings)
[[processors.converter]]
  [processors.converter.fields]
    float = ["temperature", "humidity", "pressure"]

# Add tags based on device location lookup
[[processors.enum]]
  [[processors.enum.mapping]]
    field = "device_id"
    dest = "location"
    [processors.enum.mapping.value_mappings]
      "sensor001" = "living_room"
      "sensor002" = "bedroom"
      "sensor003" = "kitchen"
      "sensor004" = "garage"

# Filter out invalid readings
[[processors.starlark]]
  source = '''
def apply(metric):
    # Drop readings with invalid temperature (sensor error)
    temp = metric.fields.get("temperature", None)
    if temp is not None and (temp < -50 or temp > 100):
        return None  # Drop the metric
    
    # Add derived field: heat index
    temp = metric.fields.get("temperature")
    humidity = metric.fields.get("humidity")
    if temp is not None and humidity is not None:
        # Simple heat index approximation
        metric.fields["heat_index"] = temp + (humidity * 0.1)
    
    return metric
'''

# Rename fields for consistency
[[processors.rename]]
  [[processors.rename.replace]]
    field = "temp"
    dest = "temperature"
  [[processors.rename.replace]]
    field = "hum"
    dest = "humidity"
  [[processors.rename.replace]]
    field = "batt"
    dest = "battery_pct"

# Keep only the top K series by temperature (useful as an anomaly-detection feed)
[[processors.topk]]
  k = 10
  fields = ["temperature"]
EOF
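The Starlark heat-index approximation can be checked by hand; at 22.5 °C and 60% relative humidity it yields 28.5:

```shell
# Mirror the Starlark heat-index approximation: temperature + humidity * 0.1
TEMP=22.5
HUMIDITY=60
HEAT_INDEX=$(awk -v t="$TEMP" -v h="$HUMIDITY" 'BEGIN {printf "%.1f", t + h * 0.1}')
echo "$HEAT_INDEX"   # 28.5
```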

Retention Policies and Downsampling

Configure bucket retention and use Telegraf to write to multiple buckets with different retention:

# Create buckets with different retentions in InfluxDB
influx bucket create --name iot-sensors --org myorg --retention 7d     # Raw data: 7 days
influx bucket create --name iot-hourly --org myorg --retention 90d     # Hourly avg: 90 days
influx bucket create --name iot-daily --org myorg --retention 365d     # Daily avg: 1 year

# Create a Telegraf aggregator to compute hourly averages
cat > /etc/telegraf/telegraf.d/aggregators.conf << 'EOF'
# Compute hourly statistics and write to long-term bucket
[[aggregators.basicstats]]
  period = "1h"              # Aggregate over 1 hour
  delay = "0s"
  grace = "0s"
  drop_original = false      # Keep original (raw) metrics too
  stats = ["mean", "min", "max", "stdev", "count"]
  
  # Only aggregate sensor measurements
  namepass = ["temperature", "humidity", "pressure"]
  
  [aggregators.basicstats.tags]
    aggregation_type = "hourly"
EOF

# Write aggregated metrics to the hourly bucket via separate output
cat >> /etc/telegraf/telegraf.d/aggregators.conf << 'EOF'
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-influxdb-token"
  organization = "myorg"
  bucket = "iot-hourly"
  
  # Only write metrics tagged with aggregation_type=hourly; add a matching
  # tagdrop to the primary output if you don't want the aggregates duplicated
  # in the raw bucket
  [outputs.influxdb_v2.tagpass]
    aggregation_type = ["hourly"]
EOF

InfluxDB tasks for downsampling (run in InfluxDB UI or CLI):

// InfluxDB Task: Downsample hourly averages to daily bucket
option task = {
    name: "Downsample IoT Hourly to Daily",
    every: 1d,
    offset: 5m
}

from(bucket: "iot-hourly")
  |> range(start: -1d, stop: now())
  |> filter(fn: (r) => r._measurement =~ /temperature|humidity|pressure/)
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> set(key: "aggregation_type", value: "daily")
  |> to(bucket: "iot-daily", org: "myorg")
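To register the task from the CLI instead of the UI, save the Flux to a file and point influx task create at it (a sketch; the file path is arbitrary and an authenticated influx CLI is assumed):

```shell
# Save the downsampling task to a file
cat > /tmp/downsample-daily.flux << 'FLUX'
option task = {name: "Downsample IoT Hourly to Daily", every: 1d, offset: 5m}

from(bucket: "iot-hourly")
  |> range(start: -1d, stop: now())
  |> filter(fn: (r) => r._measurement =~ /temperature|humidity|pressure/)
  |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)
  |> set(key: "aggregation_type", value: "daily")
  |> to(bucket: "iot-daily", org: "myorg")
FLUX

# Register it (requires the influx CLI configured with org and token):
# influx task create --file /tmp/downsample-daily.flux
```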

Alerting on Sensor Thresholds

Use InfluxDB Checks and Notifications for threshold alerting:

# Create a threshold check via InfluxDB UI:
# Alerting > Checks > Create Check > Threshold Check

# Or via Flux script (run in InfluxDB Tasks):
// InfluxDB Task: Alert on high temperature
option task = {
    name: "Temperature Threshold Alert",
    every: 1m
}

import "slack"

data = from(bucket: "iot-sensors")
    |> range(start: -2m)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r._field == "temperature")
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
    |> last()

// slack.message cannot receive piped data, so call it per row inside map()
data
    |> filter(fn: (r) => r._value > 35.0)
    |> map(fn: (r) => ({r with sent: slack.message(
        url: "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
        token: "",
        channel: "#iot-alerts",
        text: "High temperature on device ${r.device_id}: ${string(v: r._value)}°C",
        color: "danger"
    )}))

Telegraf-based alerting using the execd output:

cat > /etc/telegraf/telegraf.d/alerting.conf << 'EOF'
# Use execd output to stream alert metrics to a long-running handler script
[[outputs.execd]]
  command = ["/usr/local/bin/iot-alert-handler.sh"]
  data_format = "influx"   # line protocol is written to the script's stdin
  
  # Only forward metrics that exceed threshold (a processor must set this tag)
  [outputs.execd.tagpass]
    threshold_exceeded = ["true"]
EOF

cat > /usr/local/bin/iot-alert-handler.sh << 'SCRIPT'
#!/bin/bash
# Read line protocol from stdin
while IFS= read -r line; do
  # Parse measurement and value from line protocol
  measurement=$(echo "$line" | awk '{print $1}' | cut -d',' -f1)
  value=$(echo "$line" | grep -oP '(?<=value=)[0-9.]+')
  device=$(echo "$line" | grep -oP '(?<=device_id=)[a-zA-Z0-9_]+')
  
  # Send Slack notification
  curl -s -X POST "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
    -H 'Content-Type: application/json' \
    -d "{\"text\": \"Alert: ${measurement} threshold exceeded on ${device}: ${value}\"}"
done
SCRIPT
chmod +x /usr/local/bin/iot-alert-handler.sh
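The handler's parsing can be exercised with a hand-written sample line before wiring it to Telegraf (the device ID and values are made up; the lookbehind patterns require GNU grep):

```shell
# Feed the handler's parsing logic one sample line-protocol line
line='temperature,device_id=sensor001,threshold_exceeded=true value=38.2 1704067200000000000'
measurement=$(echo "$line" | awk '{print $1}' | cut -d',' -f1)
value=$(echo "$line" | grep -oP '(?<=value=)[0-9.]+')
device=$(echo "$line" | grep -oP '(?<=device_id=)[a-zA-Z0-9_]+')
echo "$measurement $device $value"   # temperature sensor001 38.2
```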

Multiple Output Destinations

Write the same IoT data to multiple backends:

cat > /etc/telegraf/telegraf.d/outputs.conf << 'EOF'
# Primary: InfluxDB for dashboards
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-influxdb-token"
  organization = "myorg"
  bucket = "iot-sensors"

# Secondary: MQTT - republish processed data
[[outputs.mqtt]]
  servers = ["tcp://localhost:1883"]
  topic_prefix = "processed"
  username = "telegraf"
  password = "telegrafpassword"
  data_format = "json"

# Tertiary: File output for backup/archiving
[[outputs.file]]
  files = ["/var/log/telegraf/iot-data.log"]
  rotation_interval = "24h"
  rotation_max_size = "100MB"
  rotation_max_archives = 7
  data_format = "csv"
  csv_header = true
  
  # Only log temperature and humidity to file
  namepass = ["temperature", "humidity"]
EOF

Monitoring Telegraf Itself

cat > /etc/telegraf/telegraf.d/internal.conf << 'EOF'
# Monitor Telegraf's own metrics
[[inputs.internal]]
  collect_memstats = true

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "your-influxdb-token"
  organization = "myorg"
  bucket = "telegraf-metrics"
  
  # Route only internal Telegraf metrics here (namepass filters on measurement
  # names; add namedrop = ["internal_*"] to the primary output to keep them
  # out of the sensor bucket)
  namepass = ["internal_*"]
EOF

# Key internal metrics:
# internal_agent.gather_errors   - input plugin errors
# internal_agent.metrics_dropped - metrics dropped (buffer full)
# internal_agent.metrics_written - successfully written metrics
# internal_write.metrics_written - per-output write counts

# Query Telegraf internal metrics
influx query --org myorg '
from(bucket: "telegraf-metrics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "internal_agent")
  |> filter(fn: (r) => r._field == "gather_errors")
  |> last()
'

Troubleshooting

MQTT consumer not receiving messages:

# Test MQTT subscription manually
mosquitto_sub -h localhost -p 1883 \
  -u telegraf -P telegrafpassword \
  -t "home/#" -v

# Check Telegraf logs
sudo journalctl -u telegraf -n 50 | grep -i mqtt

# Run Telegraf in test mode
telegraf --config /etc/telegraf/telegraf.conf \
  --config-directory /etc/telegraf/telegraf.d \
  --input-filter mqtt_consumer \
  --test

Metrics not appearing in InfluxDB:

# Check for write errors in Telegraf logs
sudo journalctl -u telegraf -n 50 | grep -i "error\|E!"

# Test InfluxDB connectivity
curl -X POST "http://localhost:8086/api/v2/write?org=myorg&bucket=iot-sensors" \
  -H "Authorization: Token your-token" \
  --data-raw "test_metric value=1.0 $(date +%s)000000000"

# Verify the token has write permission to the bucket
# (auth list shows bucket IDs, not names; look up the ID with: influx bucket list)
influx auth list

High memory usage:

# Watch for buffer overflow warnings in the Telegraf logs
sudo journalctl -u telegraf | grep -i "buffer"

# Reduce buffer limits in [agent]:
# metric_buffer_limit = 10000  (reduce from 100000)
# metric_batch_size = 1000     (reduce from 5000)

# Check for slow outputs blocking the buffer
sudo journalctl -u telegraf | grep "output wrote batch"
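A rough upper bound on buffer memory is metric_buffer_limit times the average in-memory metric size (the ~500-byte figure is an assumption; the real footprint depends on tag and field counts):

```shell
# Back-of-envelope buffer memory estimate
BUFFER_LIMIT=100000      # metric_buffer_limit from [agent]
AVG_METRIC_BYTES=500     # assumed average per-metric footprint
echo "$(( BUFFER_LIMIT * AVG_METRIC_BYTES / 1024 / 1024 )) MiB"   # ~47 MiB
```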

Starlark processor errors:

# Test Starlark script syntax
telegraf --config /etc/telegraf/telegraf.conf \
  --processor-filter "starlark" \
  --test 2>&1 | grep -i "starlark\|error"

# Common issue: wrong indentation (Starlark is Python-like)

Conclusion

The Telegraf + InfluxDB stack provides a production-ready IoT data pipeline that scales from a single sensor to millions of data points per second. The MQTT consumer plugin requires zero custom code to ingest data from standard IoT devices, processors handle data transformation and validation, and InfluxDB tasks automate downsampling for long-term storage efficiency. Configure separate buckets with appropriate retention policies to balance query performance, storage costs, and data availability.