InfluxDB y Telegraf para Recolección de Datos IoT

InfluxDB es la base de datos de series temporales más popular para almacenar métricas de sensores IoT, y Telegraf es su agente de recolección de datos con más de 300 plugins de entrada listos para usar. Juntos forman la base del stack de monitoreo IoT: Telegraf recoge datos de sensores MQTT, transforma los valores y los escribe en InfluxDB, donde quedan disponibles para análisis y visualización con Grafana. Esta guía cubre la instalación completa, la configuración del plugin MQTT consumer de Telegraf, las políticas de retención y la configuración de alertas sobre umbrales de sensores.

Requisitos Previos

  • Ubuntu 20.04/22.04 o CentOS/Rocky 8/9
  • Mínimo 4 GB RAM y 2 vCPU (8 GB RAM recomendado para producción)
  • 50 GB de espacio en disco (SSD preferible)
  • Un broker MQTT operativo (Mosquitto u otro) con datos de sensores
  • Puerto 8086 disponible para InfluxDB

Instalación de InfluxDB 2.x

Ubuntu/Debian

# Añadir el repositorio oficial de InfluxData
curl -s https://repos.influxdata.com/influxdata-archive.key | \
  sudo gpg --dearmor -o /usr/share/keyrings/influxdata-archive-keyring.gpg

echo "deb [signed-by=/usr/share/keyrings/influxdata-archive-keyring.gpg] \
  https://repos.influxdata.com/debian stable main" | \
  sudo tee /etc/apt/sources.list.d/influxdata.list

sudo apt-get update
sudo apt-get install -y influxdb2

# Habilitar e iniciar el servicio
sudo systemctl enable influxdb
sudo systemctl start influxdb

# Verificar que InfluxDB está respondiendo
curl http://localhost:8086/health

CentOS/Rocky Linux

# Añadir el repositorio de InfluxData para RPM
sudo tee /etc/yum.repos.d/influxdata.repo << 'EOF'
[influxdata]
name = InfluxData Repository
baseurl = https://repos.influxdata.com/rhel/$releasever/$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive.key
EOF

sudo dnf install -y influxdb2
sudo systemctl enable --now influxdb

Configuración del sistema operativo para producción

# Ajustar los límites del sistema para InfluxDB
sudo tee /etc/security/limits.d/influxdb.conf << 'EOF'
influxdb soft nofile 65536
influxdb hard nofile 65536
EOF

# Configurar el directorio de datos en disco SSD si está disponible
sudo tee /etc/influxdb/config.toml << 'EOF'
[storage]
  # Directorio principal de datos
  engine-path = "/var/lib/influxdb/engine"

[http]
  # Enlazar solo a localhost si Telegraf está en el mismo servidor
  bind-address = ":8086"
  
[log]
  level = "warn"
EOF

sudo systemctl restart influxdb

Configuración Inicial de InfluxDB

# Configurar InfluxDB mediante la CLI (primera vez)
influx setup \
  --username admin \
  --password TuPasswordSeguro \
  --token mi-token-super-secreto \
  --org mi-organizacion \
  --bucket sensores-iot \
  --retention 30d \
  --force

# Verificar que la organización y el bucket se han creado
influx org list
influx bucket list

# Crear buckets adicionales por tipo de dato
influx bucket create \
  --name metricas-sistema \
  --org mi-organizacion \
  --retention 7d

influx bucket create \
  --name datos-historicos \
  --org mi-organizacion \
  --retention 365d

# Generar un token de acceso para Telegraf (solo escritura)
influx auth create \
  --org mi-organizacion \
  --description "Token Telegraf - Solo escritura" \
  --write-bucket sensores-iot \
  --write-bucket metricas-sistema

# Guardar el token generado en una variable de entorno
export TELEGRAF_TOKEN="el-token-generado"

Instalación de Telegraf

# Instalar Telegraf (mismo repositorio que InfluxDB)
sudo apt-get install -y telegraf   # Ubuntu/Debian
sudo dnf install -y telegraf       # CentOS/Rocky

# Habilitar el servicio (no iniciar todavía, primero configurar)
sudo systemctl enable telegraf

# Generar una configuración base para IoT
# Esto crea un archivo de configuración con todos los plugins comentados
telegraf config > /tmp/telegraf-completo.conf

# Crear la configuración personalizada para IoT
sudo mkdir -p /etc/telegraf/telegraf.d

Plugin MQTT Consumer

El plugin mqtt_consumer de Telegraf suscribe a topics MQTT y escribe los mensajes en InfluxDB:

# Crear la configuración del agente Telegraf
sudo tee /etc/telegraf/telegraf.conf << 'EOF'
# Configuración global del agente
[agent]
  # Intervalo de recolección por defecto
  interval = "10s"
  # Redondear el intervalo al múltiplo más cercano
  round_interval = true
  # Tamaño del lote para escritura en InfluxDB
  metric_batch_size = 1000
  # Tiempo máximo de espera antes de flushing
  metric_buffer_limit = 10000
  flush_interval = "10s"
  flush_jitter = "2s"
  # Nivel de logs
  logfile = "/var/log/telegraf/telegraf.log"
  logfile_rotation_max_size = "10MB"
  logfile_rotation_max_archives = 5
EOF

# Crear la configuración del plugin de salida (InfluxDB)
sudo tee /etc/telegraf/telegraf.d/output-influxdb.conf << 'EOF'
# Salida: InfluxDB 2.x
[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "TU_TOKEN_DE_TELEGRAF"
  organization = "mi-organizacion"
  bucket = "sensores-iot"
  # Timeout para las escrituras
  timeout = "5s"
  # Reintentar escrituras fallidas
  retry_max_time = "300s"
EOF

# Crear la configuración del plugin MQTT Consumer
sudo tee /etc/telegraf/telegraf.d/input-mqtt.conf << 'EOF'
# Entrada: Broker MQTT - Datos de sensores IoT
[[inputs.mqtt_consumer]]
  # Dirección del broker MQTT
  servers = ["tcp://localhost:1883"]
  
  # Topics a los que suscribirse (soporte de comodines MQTT)
  topics = [
    "sensores/+/temperatura",
    "sensores/+/humedad",
    "sensores/+/presion",
    "sensores/+/estado"
  ]
  
  # Credenciales del broker MQTT
  username = "telegraf"
  password = "TuPasswordMQTT"
  
  # Identificador único del cliente MQTT
  client_id = "telegraf-iot-collector"
  
  # QoS para las suscripciones (0=at most once, 1=at least once, 2=exactly once)
  qos = 1
  
  # Mantener las sesiones entre reinicios de Telegraf
  persistent_session = true
  
  # Formato del mensaje MQTT
  # Usar "json" si los sensores envían JSON, "value" para valores simples
  data_format = "json"
  
  # Extraer el nombre del sensor del topic MQTT
  # El topic "sensores/sala-principal/temperatura" genera:
  #   - tag sensor_id = "sala-principal"
  #   - tag tipo = "temperatura"
  topic_tag = "topic_completo"
  
  # Parsear el topic para extraer tags
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "sensores/+/+"
    tags = "_/sensor_id/tipo_medida"
EOF

Manejo de diferentes formatos de mensaje MQTT

# Si los sensores envían JSON como: {"valor": 23.5, "bateria": 87, "ts": 1704067200}
sudo tee /etc/telegraf/telegraf.d/input-mqtt-json.conf << 'EOF'
[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]
  topics = ["sensores/#"]
  username = "telegraf"
  password = "TuPasswordMQTT"
  client_id = "telegraf-json-parser"
  qos = 1
  data_format = "json"
  
  # Mapear campos del JSON a fields de InfluxDB
  json_string_fields = ["estado", "unidad"]
  
  # Nombre de la medición en InfluxDB
  name_override = "lectura_sensor"
  
  # Parsear el topic para extraer tags automáticamente
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "sensores/+/+"
    tags = "_/ubicacion/tipo"
    
  # Filtrar valores fuera de rango (temperatura entre -50 y 150 grados)
  [inputs.mqtt_consumer.tagpass]
    tipo = ["temperatura", "humedad", "presion"]
EOF

Plugins de Entrada Personalizados

Telegraf puede recolectar datos de múltiples fuentes además de MQTT:

# Plugin para leer datos de una API HTTP REST (ej: sensores con API propia)
sudo tee /etc/telegraf/telegraf.d/input-http-sensors.conf << 'EOF'
[[inputs.http]]
  # URLs de los sensores con API REST
  urls = [
    "http://sensor-cocina.local/api/datos",
    "http://sensor-garage.local/api/datos"
  ]
  method = "GET"
  timeout = "5s"
  data_format = "json"
  
  # Añadir tags estáticos para identificar la fuente
  [inputs.http.tags]
    fuente = "api-rest"
    red = "iot-local"

# Plugin para leer datos de un archivo CSV (exportaciones de dispositivos)
[[inputs.file]]
  files = ["/var/iot/exportaciones/*.csv"]
  data_format = "csv"
  csv_header_row_count = 1
  csv_timestamp_column = "timestamp"
  csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
  csv_tag_columns = ["sensor_id", "ubicacion"]
  
# Plugin para métricas del sistema del servidor Telegraf
[[inputs.cpu]]
  percpu = false
  totalcpu = true
  
[[inputs.mem]]

[[inputs.disk]]
  ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
EOF

sudo systemctl start telegraf
sudo systemctl status telegraf

Transformación de Datos

Telegraf incluye procesadores para transformar los datos antes de enviarlos a InfluxDB:

# Crear configuración de procesadores de datos
sudo tee /etc/telegraf/telegraf.d/processors.conf << 'EOF'
# Procesador: Renombrar campos para consistencia
[[processors.rename]]
  [[processors.rename.replace]]
    field = "temp"
    dest = "temperatura"
  [[processors.rename.replace]]
    field = "hum"
    dest = "humedad"

# Procesador: Añadir tags derivados del hostname
[[processors.add_local_origin]]

# Procesador: Filtrar métricas con valores nulos
[[processors.filter]]
  fielddrop = ["*_null"]

# Procesador: Calcular la media móvil de temperatura
[[processors.starlark]]
  namepass = ["lectura_sensor"]
  source = '''
def apply(metric):
    # Convertir Fahrenheit a Celsius si la unidad es "F"
    if metric.fields.get("unidad", "") == "F":
        temp_f = metric.fields.get("temperatura", 0)
        metric.fields["temperatura"] = (temp_f - 32) * 5 / 9
        metric.fields["unidad"] = "C"
    return metric
'''

# Agregador: Calcular estadísticas por ventana de tiempo
[[aggregators.basicstats]]
  period = "1m"
  drop_original = false
  stats = ["mean", "min", "max", "stdev"]
  namepass = ["lectura_sensor"]
  fieldsinclude = ["temperatura", "humedad"]
EOF

sudo systemctl restart telegraf

Políticas de Retención

InfluxDB 2.x gestiona la retención a nivel de bucket, con tareas de downsampling para datos históricos:

# Ver los buckets y sus políticas de retención actuales
influx bucket list --org mi-organizacion

# Crear una tarea de downsampling: resumir datos horarios en datos diarios
influx task create --org mi-organizacion --file - << 'EOF'
option task = {
  name: "Downsampling diario IoT",
  every: 1d,
  offset: 1h
}

// Leer datos de las últimas 25 horas del bucket de alta resolución
data = from(bucket: "sensores-iot")
  |> range(start: -25h, stop: -1h)
  |> filter(fn: (r) => r._measurement == "lectura_sensor")

// Calcular estadísticas por hora
data
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> set(key: "_measurement", value: "lectura_sensor_horaria")
  |> to(bucket: "datos-historicos", org: "mi-organizacion")
EOF

# Verificar las tareas creadas
influx task list --org mi-organizacion

Alertas sobre Umbrales de Sensores

InfluxDB 2.x tiene un sistema de alertas nativo con checks y notificaciones:

# Crear un check de umbral para temperatura alta
influx check create threshold \
  --name "Temperatura Sala Alta" \
  --org mi-organizacion \
  --every 1m \
  --query-all \
  --status-message-template "Temperatura en sala: {{ index .Values \"_value\" }}°C" \
  --warn-above 30 \
  --crit-above 35

# Crear un endpoint de notificaciones (webhook)
influx notification-endpoint create http \
  --name "Webhook Alertas" \
  --org mi-organizacion \
  --url "https://hooks.tudominio.com/alertas" \
  --method POST \
  --content-template '{"alerta": "{{ .Name }}", "estado": "{{ .Level }}", "mensaje": "{{ .Message }}"}'

# Crear una regla de notificación: enviar al webhook cuando el estado sea CRIT
influx notification-rule create \
  --name "Alerta Temperatura Crítica" \
  --org mi-organizacion \
  --check-name "Temperatura Sala Alta" \
  --endpoint-name "Webhook Alertas" \
  --crit

Crear una tarea de alerta con Flux para mayor flexibilidad:

// Tarea Flux: Detectar y alertar anomalías en sensores
option task = {
  name: "Detector de anomalías IoT",
  every: 5m
}

// Calcular la media y desviación estándar de las últimas 2 horas
estadisticas = from(bucket: "sensores-iot")
  |> range(start: -2h)
  |> filter(fn: (r) => r._measurement == "lectura_sensor")
  |> filter(fn: (r) => r._field == "temperatura")
  |> group(columns: ["sensor_id"])
  |> reduce(
      identity: {suma: 0.0, suma_cuad: 0.0, count: 0},
      fn: (r, accumulator) => ({
          suma: accumulator.suma + r._value,
          suma_cuad: accumulator.suma_cuad + r._value * r._value,
          count: accumulator.count + 1
      })
  )

// Escribir anomalías detectadas en un bucket separado
from(bucket: "sensores-iot")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "lectura_sensor")
  |> filter(fn: (r) => r._field == "temperatura")
  |> filter(fn: (r) => r._value > 35.0 or r._value < -5.0)
  |> map(fn: (r) => ({r with _measurement: "alertas_sensor"}))
  |> to(bucket: "sensores-iot", org: "mi-organizacion")

Solución de Problemas

Telegraf no se conecta al broker MQTT:

# Verificar los logs de Telegraf
sudo journalctl -u telegraf --no-pager -n 50

# Probar la conexión MQTT manualmente
mosquitto_sub -h localhost -u telegraf -P TuPassword -t "sensores/#" -v

# Verificar la configuración de Telegraf sin iniciar
telegraf --config /etc/telegraf/telegraf.conf --test

Los datos no aparecen en InfluxDB:

# Comprobar que el token es correcto
influx auth list --org mi-organizacion

# Probar la escritura directa en InfluxDB
influx write \
  --org mi-organizacion \
  --bucket sensores-iot \
  --token TU_TOKEN \
  "prueba,sensor_id=test temperatura=23.5"

# Verificar los datos escritos
influx query \
  --org mi-organizacion \
  --token TU_TOKEN \
  'from(bucket:"sensores-iot") |> range(start:-5m) |> limit(n:10)'

El uso de memoria de InfluxDB es excesivo:

# Limitar el uso de caché en la configuración
sudo tee -a /etc/influxdb/config.toml << 'EOF'
[storage]
  cache-max-memory-size = "1gb"
  cache-snapshot-memory-size = "25mb"
  cache-snapshot-write-cold-duration = "10m"
  compact-full-write-cold-duration = "4h"
EOF

sudo systemctl restart influxdb

Las tareas de downsampling no se ejecutan:

# Listar y verificar las tareas
influx task list --org mi-organizacion

# Ver los logs de ejecución de una tarea
influx task log list --task-id ID_DE_LA_TAREA --org mi-organizacion

# Ejecutar una tarea manualmente para probarla
influx task run start --task-id ID_DE_LA_TAREA

Conclusión

La combinación de Telegraf e InfluxDB forma la columna vertebral de cualquier stack de monitoreo IoT: Telegraf gestiona la complejidad de recoger datos de múltiples protocolos (MQTT, HTTP, archivos, APIs) y los transforma antes de entregarlos a InfluxDB, donde se almacenan de forma eficiente con políticas de retención y downsampling automático. Con el sistema de alertas nativo de InfluxDB puedes detectar anomalías y condiciones críticas en tiempo real sin depender de herramientas externas. Este stack se integra directamente con Grafana para la visualización, completando una plataforma IoT completa de código abierto.