InfluxDB y Telegraf para Recolección de Datos IoT
InfluxDB es la base de datos de series temporales más popular para almacenar métricas de sensores IoT, y Telegraf es su agente de recolección de datos con más de 300 plugins de entrada listos para usar. Juntos forman la base del stack de monitoreo IoT: Telegraf recoge datos de sensores MQTT, transforma los valores y los escribe en InfluxDB, donde quedan disponibles para análisis y visualización con Grafana. Esta guía cubre la instalación completa, la configuración del plugin MQTT consumer de Telegraf, las políticas de retención y la configuración de alertas sobre umbrales de sensores.
Requisitos Previos
- Ubuntu 20.04/22.04 o CentOS/Rocky 8/9
- Mínimo 4 GB RAM y 2 vCPU (8 GB RAM recomendado para producción)
- 50 GB de espacio en disco (SSD preferible)
- Un broker MQTT operativo (Mosquitto u otro) con datos de sensores
- Puerto 8086 disponible para InfluxDB
Instalación de InfluxDB 2.x
Ubuntu/Debian
# Añadir el repositorio oficial de InfluxData
curl -s https://repos.influxdata.com/influxdata-archive.key | \
sudo gpg --dearmor -o /usr/share/keyrings/influxdata-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/influxdata-archive-keyring.gpg] \
https://repos.influxdata.com/debian stable main" | \
sudo tee /etc/apt/sources.list.d/influxdata.list
sudo apt-get update
sudo apt-get install -y influxdb2
# Habilitar e iniciar el servicio
sudo systemctl enable influxdb
sudo systemctl start influxdb
# Verificar que InfluxDB está respondiendo
curl http://localhost:8086/health
CentOS/Rocky Linux
# Añadir el repositorio de InfluxData para RPM
sudo tee /etc/yum.repos.d/influxdata.repo << 'EOF'
[influxdata]
name = InfluxData Repository
baseurl = https://repos.influxdata.com/rhel/$releasever/$basearch/stable
enabled = 1
gpgcheck = 1
gpgkey = https://repos.influxdata.com/influxdata-archive.key
EOF
sudo dnf install -y influxdb2
sudo systemctl enable --now influxdb
Configuración del sistema operativo para producción
# Ajustar los límites del sistema para InfluxDB
sudo tee /etc/security/limits.d/influxdb.conf << 'EOF'
influxdb soft nofile 65536
influxdb hard nofile 65536
EOF
# Configurar el directorio de datos en disco SSD si está disponible
sudo tee /etc/influxdb/config.toml << 'EOF'
[storage]
# Directorio principal de datos
engine-path = "/var/lib/influxdb/engine"
[http]
# Enlazar solo a localhost si Telegraf está en el mismo servidor
bind-address = ":8086"
[log]
level = "warn"
EOF
sudo systemctl restart influxdb
Configuración Inicial de InfluxDB
# Configurar InfluxDB mediante la CLI (primera vez)
influx setup \
--username admin \
--password TuPasswordSeguro \
--token mi-token-super-secreto \
--org mi-organizacion \
--bucket sensores-iot \
--retention 30d \
--force
# Verificar que la organización y el bucket se han creado
influx org list
influx bucket list
# Crear buckets adicionales por tipo de dato
influx bucket create \
--name metricas-sistema \
--org mi-organizacion \
--retention 7d
influx bucket create \
--name datos-historicos \
--org mi-organizacion \
--retention 365d
# Generar un token de acceso para Telegraf (solo escritura)
influx auth create \
--org mi-organizacion \
--description "Token Telegraf - Solo escritura" \
--write-bucket sensores-iot \
--write-bucket metricas-sistema
# Guardar el token generado en una variable de entorno
export TELEGRAF_TOKEN="el-token-generado"
Instalación de Telegraf
# Instalar Telegraf (mismo repositorio que InfluxDB)
sudo apt-get install -y telegraf # Ubuntu/Debian
sudo dnf install -y telegraf # CentOS/Rocky
# Habilitar el servicio (no iniciar todavía, primero configurar)
sudo systemctl enable telegraf
# Generar una configuración base para IoT
# Esto crea un archivo de configuración con todos los plugins comentados
telegraf config > /tmp/telegraf-completo.conf
# Crear la configuración personalizada para IoT
sudo mkdir -p /etc/telegraf/telegraf.d
Plugin MQTT Consumer
El plugin mqtt_consumer de Telegraf suscribe a topics MQTT y escribe los mensajes en InfluxDB:
# Crear la configuración del agente Telegraf
sudo tee /etc/telegraf/telegraf.conf << 'EOF'
# Configuración global del agente
[agent]
# Intervalo de recolección por defecto
interval = "10s"
# Redondear el intervalo al múltiplo más cercano
round_interval = true
# Tamaño del lote para escritura en InfluxDB
metric_batch_size = 1000
# Tiempo máximo de espera antes de flushing
metric_buffer_limit = 10000
flush_interval = "10s"
flush_jitter = "2s"
# Nivel de logs
logfile = "/var/log/telegraf/telegraf.log"
logfile_rotation_max_size = "10MB"
logfile_rotation_max_archives = 5
EOF
# Crear la configuración del plugin de salida (InfluxDB)
sudo tee /etc/telegraf/telegraf.d/output-influxdb.conf << 'EOF'
# Salida: InfluxDB 2.x
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "TU_TOKEN_DE_TELEGRAF"
organization = "mi-organizacion"
bucket = "sensores-iot"
# Timeout para las escrituras
timeout = "5s"
# Reintentar escrituras fallidas
retry_max_time = "300s"
EOF
# Crear la configuración del plugin MQTT Consumer
sudo tee /etc/telegraf/telegraf.d/input-mqtt.conf << 'EOF'
# Entrada: Broker MQTT - Datos de sensores IoT
[[inputs.mqtt_consumer]]
# Dirección del broker MQTT
servers = ["tcp://localhost:1883"]
# Topics a los que suscribirse (soporte de comodines MQTT)
topics = [
"sensores/+/temperatura",
"sensores/+/humedad",
"sensores/+/presion",
"sensores/+/estado"
]
# Credenciales del broker MQTT
username = "telegraf"
password = "TuPasswordMQTT"
# Identificador único del cliente MQTT
client_id = "telegraf-iot-collector"
# QoS para las suscripciones (0=at most once, 1=at least once, 2=exactly once)
qos = 1
# Mantener las sesiones entre reinicios de Telegraf
persistent_session = true
# Formato del mensaje MQTT
# Usar "json" si los sensores envían JSON, "value" para valores simples
data_format = "json"
# Extraer el nombre del sensor del topic MQTT
# El topic "sensores/sala-principal/temperatura" genera:
# - tag sensor_id = "sala-principal"
# - tag tipo = "temperatura"
topic_tag = "topic_completo"
# Parsear el topic para extraer tags
[[inputs.mqtt_consumer.topic_parsing]]
topic = "sensores/+/+"
tags = "_/sensor_id/tipo_medida"
EOF
Manejo de diferentes formatos de mensaje MQTT
# Si los sensores envían JSON como: {"valor": 23.5, "bateria": 87, "ts": 1704067200}
sudo tee /etc/telegraf/telegraf.d/input-mqtt-json.conf << 'EOF'
[[inputs.mqtt_consumer]]
servers = ["tcp://localhost:1883"]
topics = ["sensores/#"]
username = "telegraf"
password = "TuPasswordMQTT"
client_id = "telegraf-json-parser"
qos = 1
data_format = "json"
# Mapear campos del JSON a fields de InfluxDB
json_string_fields = ["estado", "unidad"]
# Nombre de la medición en InfluxDB
name_override = "lectura_sensor"
# Parsear el topic para extraer tags automáticamente
[[inputs.mqtt_consumer.topic_parsing]]
topic = "sensores/+/+"
tags = "_/ubicacion/tipo"
# Filtrar valores fuera de rango (temperatura entre -50 y 150 grados)
[inputs.mqtt_consumer.tagpass]
tipo = ["temperatura", "humedad", "presion"]
EOF
Plugins de Entrada Personalizados
Telegraf puede recolectar datos de múltiples fuentes además de MQTT:
# Plugin para leer datos de una API HTTP REST (ej: sensores con API propia)
sudo tee /etc/telegraf/telegraf.d/input-http-sensors.conf << 'EOF'
[[inputs.http]]
# URLs de los sensores con API REST
urls = [
"http://sensor-cocina.local/api/datos",
"http://sensor-garage.local/api/datos"
]
method = "GET"
timeout = "5s"
data_format = "json"
# Añadir tags estáticos para identificar la fuente
[inputs.http.tags]
fuente = "api-rest"
red = "iot-local"
# Plugin para leer datos de un archivo CSV (exportaciones de dispositivos)
[[inputs.file]]
files = ["/var/iot/exportaciones/*.csv"]
data_format = "csv"
csv_header_row_count = 1
csv_timestamp_column = "timestamp"
csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
csv_tag_columns = ["sensor_id", "ubicacion"]
# Plugin para métricas del sistema del servidor Telegraf
[[inputs.cpu]]
percpu = false
totalcpu = true
[[inputs.mem]]
[[inputs.disk]]
ignore_fs = ["tmpfs", "devtmpfs", "devfs", "iso9660", "overlay", "aufs", "squashfs"]
EOF
sudo systemctl start telegraf
sudo systemctl status telegraf
Transformación de Datos
Telegraf incluye procesadores para transformar los datos antes de enviarlos a InfluxDB:
# Crear configuración de procesadores de datos
sudo tee /etc/telegraf/telegraf.d/processors.conf << 'EOF'
# Procesador: Renombrar campos para consistencia
[[processors.rename]]
[[processors.rename.replace]]
field = "temp"
dest = "temperatura"
[[processors.rename.replace]]
field = "hum"
dest = "humedad"
# Procesador: Añadir tags derivados del hostname
[[processors.add_local_origin]]
# Procesador: Filtrar métricas con valores nulos
[[processors.filter]]
fielddrop = ["*_null"]
# Procesador: Calcular la media móvil de temperatura
[[processors.starlark]]
namepass = ["lectura_sensor"]
source = '''
def apply(metric):
# Convertir Fahrenheit a Celsius si la unidad es "F"
if metric.fields.get("unidad", "") == "F":
temp_f = metric.fields.get("temperatura", 0)
metric.fields["temperatura"] = (temp_f - 32) * 5 / 9
metric.fields["unidad"] = "C"
return metric
'''
# Agregador: Calcular estadísticas por ventana de tiempo
[[aggregators.basicstats]]
period = "1m"
drop_original = false
stats = ["mean", "min", "max", "stdev"]
namepass = ["lectura_sensor"]
fieldsinclude = ["temperatura", "humedad"]
EOF
sudo systemctl restart telegraf
Políticas de Retención
InfluxDB 2.x gestiona la retención a nivel de bucket, con tareas de downsampling para datos históricos:
# Ver los buckets y sus políticas de retención actuales
influx bucket list --org mi-organizacion
# Crear una tarea de downsampling: resumir datos horarios en datos diarios
influx task create --org mi-organizacion --file - << 'EOF'
option task = {
name: "Downsampling diario IoT",
every: 1d,
offset: 1h
}
// Leer datos de las últimas 25 horas del bucket de alta resolución
data = from(bucket: "sensores-iot")
|> range(start: -25h, stop: -1h)
|> filter(fn: (r) => r._measurement == "lectura_sensor")
// Calcular estadísticas por hora
data
|> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
|> set(key: "_measurement", value: "lectura_sensor_horaria")
|> to(bucket: "datos-historicos", org: "mi-organizacion")
EOF
# Verificar las tareas creadas
influx task list --org mi-organizacion
Alertas sobre Umbrales de Sensores
InfluxDB 2.x tiene un sistema de alertas nativo con checks y notificaciones:
# Crear un check de umbral para temperatura alta
influx check create threshold \
--name "Temperatura Sala Alta" \
--org mi-organizacion \
--every 1m \
--query-all \
--status-message-template "Temperatura en sala: {{ index .Values \"_value\" }}°C" \
--warn-above 30 \
--crit-above 35
# Crear un endpoint de notificaciones (webhook)
influx notification-endpoint create http \
--name "Webhook Alertas" \
--org mi-organizacion \
--url "https://hooks.tudominio.com/alertas" \
--method POST \
--content-template '{"alerta": "{{ .Name }}", "estado": "{{ .Level }}", "mensaje": "{{ .Message }}"}'
# Crear una regla de notificación: enviar al webhook cuando el estado sea CRIT
influx notification-rule create \
--name "Alerta Temperatura Crítica" \
--org mi-organizacion \
--check-name "Temperatura Sala Alta" \
--endpoint-name "Webhook Alertas" \
--crit
Crear una tarea de alerta con Flux para mayor flexibilidad:
// Tarea Flux: Detectar y alertar anomalías en sensores
option task = {
name: "Detector de anomalías IoT",
every: 5m
}
// Calcular la media y desviación estándar de las últimas 2 horas
estadisticas = from(bucket: "sensores-iot")
|> range(start: -2h)
|> filter(fn: (r) => r._measurement == "lectura_sensor")
|> filter(fn: (r) => r._field == "temperatura")
|> group(columns: ["sensor_id"])
|> reduce(
identity: {suma: 0.0, suma_cuad: 0.0, count: 0},
fn: (r, accumulator) => ({
suma: accumulator.suma + r._value,
suma_cuad: accumulator.suma_cuad + r._value * r._value,
count: accumulator.count + 1
})
)
// Escribir anomalías detectadas en un bucket separado
from(bucket: "sensores-iot")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "lectura_sensor")
|> filter(fn: (r) => r._field == "temperatura")
|> filter(fn: (r) => r._value > 35.0 or r._value < -5.0)
|> map(fn: (r) => ({r with _measurement: "alertas_sensor"}))
|> to(bucket: "sensores-iot", org: "mi-organizacion")
Solución de Problemas
Telegraf no se conecta al broker MQTT:
# Verificar los logs de Telegraf
sudo journalctl -u telegraf --no-pager -n 50
# Probar la conexión MQTT manualmente
mosquitto_sub -h localhost -u telegraf -P TuPassword -t "sensores/#" -v
# Verificar la configuración de Telegraf sin iniciar
telegraf --config /etc/telegraf/telegraf.conf --test
Los datos no aparecen en InfluxDB:
# Comprobar que el token es correcto
influx auth list --org mi-organizacion
# Probar la escritura directa en InfluxDB
influx write \
--org mi-organizacion \
--bucket sensores-iot \
--token TU_TOKEN \
"prueba,sensor_id=test temperatura=23.5"
# Verificar los datos escritos
influx query \
--org mi-organizacion \
--token TU_TOKEN \
'from(bucket:"sensores-iot") |> range(start:-5m) |> limit(n:10)'
El uso de memoria de InfluxDB es excesivo:
# Limitar el uso de caché en la configuración
sudo tee -a /etc/influxdb/config.toml << 'EOF'
[storage]
cache-max-memory-size = "1gb"
cache-snapshot-memory-size = "25mb"
cache-snapshot-write-cold-duration = "10m"
compact-full-write-cold-duration = "4h"
EOF
sudo systemctl restart influxdb
Las tareas de downsampling no se ejecutan:
# Listar y verificar las tareas
influx task list --org mi-organizacion
# Ver los logs de ejecución de una tarea
influx task log list --task-id ID_DE_LA_TAREA --org mi-organizacion
# Ejecutar una tarea manualmente para probarla
influx task run start --task-id ID_DE_LA_TAREA
Conclusión
La combinación de Telegraf e InfluxDB forma la columna vertebral de cualquier stack de monitoreo IoT: Telegraf gestiona la complejidad de recoger datos de múltiples protocolos (MQTT, HTTP, archivos, APIs) y los transforma antes de entregarlos a InfluxDB, donde se almacenan de forma eficiente con políticas de retención y downsampling automático. Con el sistema de alertas nativo de InfluxDB puedes detectar anomalías y condiciones críticas en tiempo real sin depender de herramientas externas. Este stack se integra directamente con Grafana para la visualización, completando una plataforma IoT completa de código abierto.


