Instalación y Configuración de Plugins de Fluentd

Fluentd es un recolector de datos unificado de código abierto que permite agregar logs de múltiples fuentes, transformarlos y enviarlos a múltiples destinos mediante su arquitectura basada en plugins. Con más de 500 plugins disponibles y soporte nativo para Kubernetes, Fluentd se ha convertido en uno de los estándares de facto para la gestión de logs en infraestructuras modernas, especialmente como componente del popular stack EFK (Elasticsearch + Fluentd + Kibana).

Requisitos Previos

  • Ubuntu 22.04/20.04, Debian 11, o CentOS/Rocky Linux 8+
  • Ruby 2.7+ (incluido en el paquete td-agent)
  • Al menos 512 MB de RAM (más en instalaciones con buffers grandes)
  • Acceso de red al destino final de los logs

Instalación de Fluentd (td-agent)

td-agent es el paquete de distribución estable de Fluentd con todas las dependencias incluidas.

# Instalar td-agent (Ubuntu/Debian)
curl -fsSL https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-td-agent4.sh | sh

# Para CentOS/Rocky Linux 8
curl -fsSL https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

# Verificar la instalación
td-agent --version
td-agent-gem list  # Ver los plugins instalados

# Habilitar e iniciar el servicio
systemctl enable td-agent && systemctl start td-agent
systemctl status td-agent

# Archivo de configuración principal
# /etc/td-agent/td-agent.conf

# Logs de td-agent
tail -f /var/log/td-agent/td-agent.log

# Alternativa: instalar usando Ruby gems directamente
gem install fluentd
fluentd --version

Arquitectura de Configuración

Fluentd usa un formato de configuración declarativo con directivas:

  • <source>: define de dónde se leen los logs (input plugins)
  • <match>: define a dónde se envían los logs que coinciden con una etiqueta (output plugins)
  • <filter>: transforma o filtra los eventos entre source y match
  • <label>: agrupa directivas para enrutamiento interno

Configuración de Inputs y Outputs

cat > /etc/td-agent/td-agent.conf << 'EOF'
# ===============================
# SOURCES (Inputs)
# ===============================

# Leer archivos de log de Nginx
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/td-agent/nginx.access.pos
  tag nginx.access
  read_from_head true
  
  <parse>
    @type nginx
    # Fluentd incluye el parser de Nginx por defecto
  </parse>
</source>

<source>
  @type tail
  path /var/log/nginx/error.log
  pos_file /var/log/td-agent/nginx.error.pos
  tag nginx.error
  
  <parse>
    @type multiline
    format_firstline /^\d{4}\/\d{2}\/\d{2}/
    format1 /^(?<time>\d{4}\/\d{2}\/\d{2} \d{2}:\d{2}:\d{2}) \[(?<level>[^\]]*)\] (?<message>.*)/
  </parse>
</source>

# Leer logs de systemd journal
<source>
  @type systemd
  path /run/log/journal
  tag systemd
  read_from_head true
  
  <entry>
    fields_strip_underscores true
    fields_lowercase true
  </entry>
  
  <storage>
    @type local
    persistent true
    path /var/log/td-agent/journald.pos
  </storage>
</source>

# Recibir logs via HTTP (para aplicaciones que hacen POST)
<source>
  @type http
  port 9880
  bind 0.0.0.0
  
  <parse>
    @type json
  </parse>
</source>

# Recibir logs en formato Syslog
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag syslog
  
  <transport udp>
  </transport>
  
  <parse>
    @type syslog
    message_format auto
  </parse>
</source>

# ===============================
# OUTPUTS (Matches)
# ===============================

# Enviar logs de Nginx a Elasticsearch
<match nginx.**>
  @type elasticsearch
  host elasticsearch.miempresa.com
  port 9200
  scheme https
  user elastic
  password contraseña-elastic
  
  # Índice con rotación diaria
  index_name fluentd-nginx-%Y%m%d
  
  # Configuración del buffer para alta disponibilidad
  <buffer time>
    @type file
    path /var/log/td-agent/buffer/nginx
    flush_interval 5s
    retry_wait 30s
    retry_max_interval 120s
    retry_forever true
  </buffer>
</match>

# Enviar todos los demás logs al mismo Elasticsearch
<match **>
  @type elasticsearch
  host elasticsearch.miempresa.com
  port 9200
  scheme https
  user elastic
  password contraseña-elastic
  index_name fluentd-logs-%Y%m%d
  
  <buffer time>
    @type file
    path /var/log/td-agent/buffer/general
    flush_interval 10s
  </buffer>
</match>
EOF

Filtros y Transformaciones

# Agregar filtros entre los inputs y outputs
cat >> /etc/td-agent/td-agent.conf << 'EOF'

# ===============================
# FILTERS (entre source y match)
# ===============================

# Enriquecer logs de Nginx con el nombre del host
<filter nginx.access>
  @type record_transformer
  
  # Agregar campos a cada evento
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT'] || 'produccion'}"
    servicio nginx
  </record>
  
  # Eliminar campos que no queremos
  remove_keys agent, gzip_ratio
</filter>

# Filtrar health checks de Nginx que no queremos indexar
<filter nginx.access>
  @type grep
  
  # Excluir peticiones a /health y /metrics
  <exclude>
    key path
    pattern /\/(health|metrics|ping)/
  </exclude>
  
  # Excluir peticiones del propio balanceador de carga
  <exclude>
    key remote
    pattern 127\.0\.0\.1
  </exclude>
</filter>

# Parsear logs JSON de la aplicación y aplanar los campos
<filter app.**>
  @type parser
  key_name message
  reserve_data true
  remove_key_name_field true
  
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%N%z
  </parse>
</filter>

# Agregar GeoIP basado en la IP del cliente
<filter nginx.access>
  @type geoip
  
  # Instalar primero: td-agent-gem install fluent-plugin-geoip
  geoip2_database /usr/share/GeoIP/GeoLite2-City.mmdb
  geoip_lookup_keys remote
  
  <record>
    city        ${city.names.es["remote"]}
    country     ${country.iso_code["remote"]}
    latitude    ${location.latitude["remote"]}
    longitude   ${location.longitude["remote"]}
  </record>
  
  skip_adding_null_record true
</filter>
EOF

Gestión de Buffers

Los buffers son críticos para la fiabilidad de Fluentd cuando el destino no está disponible.

cat >> /etc/td-agent/td-agent.conf << 'EOF'

# Configuración avanzada de buffer de archivo para alta disponibilidad
<match important_logs.**>
  @type elasticsearch
  host elasticsearch.miempresa.com
  port 9200
  
  # Buffer persistente en disco para no perder logs si Fluentd se reinicia
  <buffer tag, time>
    @type file
    path /var/log/td-agent/buffer/important
    
    # Acumular hasta 256 MB o durante 5 segundos antes de enviar
    chunk_limit_size 64m
    total_limit_size 1g
    flush_interval 5s
    
    # Estrategia de reintentos con backoff exponencial
    retry_type exponential_backoff
    retry_wait 5s
    retry_max_interval 120s
    retry_timeout 72h
    retry_forever true
    
    # Número de threads para el flush
    flush_thread_count 4
    flush_thread_interval 5s
  </buffer>
  
  # Manejar chunks que no se pueden enviar
  <secondary>
    @type file
    path /var/log/td-agent/failed_chunks/important
  </secondary>
</match>
EOF

# Crear directorios de buffer con los permisos correctos
mkdir -p /var/log/td-agent/buffer/{nginx,general,important}
mkdir -p /var/log/td-agent/failed_chunks
chown -R td-agent:td-agent /var/log/td-agent/buffer /var/log/td-agent/failed_chunks

Integración con Kubernetes

# En Kubernetes, Fluentd se despliega como DaemonSet para recoger los logs de todos los nodos
# Instalar Fluentd en Kubernetes con Helm
helm repo add fluent https://fluent.github.io/helm-charts
helm repo update

# Instalar fluentd como DaemonSet
helm install fluentd fluent/fluentd \
  --namespace kube-logging \
  --create-namespace \
  --set "elasticsearch.host=elasticsearch.monitoring.svc.cluster.local" \
  --set "elasticsearch.port=9200"

# Configuración personalizada para Kubernetes
cat > fluentd-k8s-config.yaml << 'EOF'
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-logging
data:
  fluent.conf: |
    # Recoger logs de todos los contenedores
    <source>
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/fluentd/containers.pos
      tag kubernetes.*
      read_from_head true
      
      <parse>
        @type cri
      </parse>
    </source>
    
    # Enriquecer con metadatos de Kubernetes
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url https://#{ENV['KUBERNETES_SERVICE_HOST']}:#{ENV['KUBERNETES_SERVICE_PORT_HTTPS']}
      skip_labels false
      skip_container_metadata false
      skip_namespace_metadata false
    </filter>
    
    # Separar logs por namespace
    <match kubernetes.var.log.containers.**kube-system**.log>
      @type null  # Descartar logs del sistema (opcional)
    </match>
    
    <match kubernetes.**>
      @type elasticsearch
      host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
      port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
      index_name fluentd-kube-%Y%m%d
      
      <buffer time>
        @type file
        path /var/log/fluentd/buffer
        timekey 1d
        timekey_wait 10m
        flush_thread_count 8
      </buffer>
    </match>
EOF

kubectl apply -f fluentd-k8s-config.yaml

Plugins Personalizados

# Instalar plugins adicionales con td-agent-gem
td-agent-gem install fluent-plugin-elasticsearch
td-agent-gem install fluent-plugin-kafka
td-agent-gem install fluent-plugin-s3
td-agent-gem install fluent-plugin-geoip
td-agent-gem install fluent-plugin-systemd
td-agent-gem install fluent-plugin-kubernetes_metadata_filter

# Verificar los plugins instalados
td-agent-gem list | grep fluent-plugin

# Configurar el output a Kafka como alternativa a Elasticsearch
cat > /etc/td-agent/kafka-output.conf << 'EOF'
<match app.**>
  @type kafka2
  brokers kafka-1.miempresa.local:9092,kafka-2.miempresa.local:9092
  default_topic logs
  
  <format>
    @type json
  </format>
  
  <buffer tag>
    @type file
    path /var/log/td-agent/buffer/kafka
    flush_interval 5s
  </buffer>
</match>
EOF

# Configurar el output a S3 para archivado de logs
cat > /etc/td-agent/s3-output.conf << 'EOF'
<match archive.**>
  @type s3
  
  aws_key_id  "#{ENV['AWS_ACCESS_KEY_ID']}"
  aws_sec_key "#{ENV['AWS_SECRET_ACCESS_KEY']}"
  s3_bucket   mi-bucket-logs-archivo
  s3_region   us-east-1
  
  # Comprimir los logs antes de subir a S3
  store_as gzip
  
  # Organizar por fecha en S3
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  path logs/
  time_slice_format %Y%m%d_%H
  
  <buffer time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 1h
    timekey_wait 10m
  </buffer>
</match>
EOF

Solución de Problemas

td-agent no inicia:

# Verificar la configuración
td-agent --dry-run -c /etc/td-agent/td-agent.conf

# Ver los errores de sintaxis
td-agent --test -c /etc/td-agent/td-agent.conf

# Ver los logs de inicio
journalctl -u td-agent -f
tail -f /var/log/td-agent/td-agent.log

Logs no llegan a Elasticsearch:

# Verificar la conectividad
curl -s https://elasticsearch.miempresa.com:9200/_cluster/health

# Activar el debug en la configuración
# Cambiar en /etc/td-agent/td-agent.conf:
# <system>
#   log_level debug
# </system>

# Ver los chunks de buffer pendientes
ls -la /var/log/td-agent/buffer/

# Forzar el flush inmediato del buffer
kill -SIGUSR1 $(cat /var/run/td-agent/td-agent.pid)

Alto uso de memoria:

# Reducir el tamaño del buffer en memoria
# En la sección <buffer>:
# @type file  # Usar buffer en disco en lugar de memoria
# total_limit_size 256m

# Verificar el uso de memoria
ps aux | grep td-agent
cat /proc/$(pgrep -f td-agent)/status | grep VmRSS

Conclusión

Fluentd destaca por su arquitectura de plugins que permite construir pipelines de logging altamente personalizados: desde la recolección de logs de archivos hasta el enriquecimiento con metadatos de Kubernetes, pasando por la entrega fiable a múltiples destinos simultáneamente. Su sistema de buffers persistentes garantiza que ningún log se pierda incluso cuando los sistemas de destino no están disponibles temporalmente, lo que lo convierte en la opción predilecta para entornos críticos donde la fiabilidad del pipeline de observabilidad es fundamental.