Fluentd Installation and Plugin Configuration

Fluentd is a unified log collector that centralizes log data from diverse sources into a single processing pipeline, with hundreds of community plugins covering inputs, filters, outputs, and parsers. Its plugin architecture, reliable buffering, and native Kubernetes integration make it the backbone of logging pipelines in many cloud-native environments.

Prerequisites

  • Ubuntu/Debian or CentOS/Rocky Linux
  • Ruby 2.7+ (bundled with td-agent)
  • Minimum 1 GB RAM (more for high-volume pipelines)
  • Firewall rules allowing Fluentd input ports
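If a host firewall is active, the input ports used later in this guide need to be opened. A sketch for both distribution families (adjust the ports to the inputs you actually enable):

```shell
# Ubuntu/Debian with ufw
sudo ufw allow 24224/tcp   # forward input
sudo ufw allow 9880/tcp    # HTTP input
sudo ufw allow 5140/tcp    # syslog input (TCP transport)

# CentOS/Rocky Linux with firewalld
sudo firewall-cmd --permanent --add-port=24224/tcp
sudo firewall-cmd --permanent --add-port=9880/tcp
sudo firewall-cmd --permanent --add-port=5140/tcp
sudo firewall-cmd --reload
```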

Installing Fluentd

# Install td-agent (Fluentd stable distribution) on Ubuntu/Debian
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-td-agent4.sh | sh

# For CentOS/Rocky Linux
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh

# Start td-agent
sudo systemctl enable --now td-agent
sudo systemctl status td-agent

# Verify installation
td-agent --version
/usr/sbin/td-agent --version

# Alternative: Install Fluent Bit (lightweight C implementation)
# For Kubernetes DaemonSets, Fluent Bit is often preferred for its smaller footprint
curl https://raw.githubusercontent.com/fluent/fluent-bit/master/install.sh | sh
sudo systemctl enable --now fluent-bit

# Ruby gems approach (for latest plugins)
sudo gem install fluentd
sudo fluent-gem install fluent-plugin-elasticsearch

Core Configuration Concepts

Fluentd uses a tag-based routing system. Every event has a tag that determines which plugins process it:

# /etc/td-agent/td-agent.conf structure
# Event flow: source -> filter -> match

# Sources generate events with tags
<source>
  @type tail
  path /var/log/nginx/access.log
  tag nginx.access    # Tag for routing
  # ...
</source>

# Filters modify events matching the tag pattern
<filter nginx.**>
  @type record_transformer
  # ...
</filter>

# Match routes events to outputs
<match nginx.**>
  @type elasticsearch
  # ...
</match>

# Route to multiple outputs using copy
<match app.**>
  @type copy
  <store>
    @type elasticsearch
    # ...
  </store>
  <store>
    @type s3
    # ...
  </store>
</match>
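The glob semantics behind these patterns: `*` matches exactly one tag part, while `**` matches zero or more parts. A small Ruby sketch of that matching (simplified; the real matcher also handles `{a,b}` alternation):

```ruby
# Convert a Fluentd-style match pattern into a Regexp (simplified sketch).
# '*' matches one dot-separated tag part; '**' matches zero or more parts.
def tag_pattern_to_regexp(pattern)
  source = pattern.split('.', -1).map do |part|
    case part
    when '**' then '__DSTAR__'
    when '*'  then '[^.]+'
    else Regexp.escape(part)
    end
  end.join('\.')
  # '**' may match zero parts, so it absorbs the dot before it
  source = source.gsub('\.__DSTAR__') { '(\..+)?' }.gsub('__DSTAR__') { '.*' }
  Regexp.new("\\A#{source}\\z")
end

tag_pattern_to_regexp('nginx.**').match?('nginx.access')       # => true
tag_pattern_to_regexp('nginx.*').match?('nginx.access.error')  # => false
```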

Input Plugins

# /etc/td-agent/td-agent.conf

# Tail log files (most common input)
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/td-agent/nginx-access.pos  # Tracks read position
  tag nginx.access
  refresh_interval 5  # Check for new files every 5s
  read_from_head true  # Read from beginning of file
  <parse>
    @type nginx
  </parse>
</source>

# Multiple files with glob
<source>
  @type tail
  path /var/log/app/*.log
  exclude_path ["/var/log/app/debug.log"]
  pos_file /var/log/td-agent/app.pos
  tag app.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</source>

# Syslog input
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag syslog
  <transport tcp>
  </transport>
  <parse>
    @type syslog
    with_priority true
  </parse>
</source>

# HTTP input (for webhook-style log posting)
<source>
  @type http
  port 9880
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10
  <parse>
    @type json
  </parse>
</source>

# Forward input (receive from other Fluentd instances)
<source>
  @type forward
  port 24224
  bind 0.0.0.0
  <security>
    self_hostname aggregator.example.com
    shared_key your-shared-secret
  </security>
</source>

Filter Plugins

# Record transformer: add, modify, or remove fields
<filter nginx.**>
  @type record_transformer
  enable_ruby true
  remove_keys agent,gzip_ratio  # Drop fields not needed downstream
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT'] || 'production'}"
    timestamp ${time.strftime('%Y-%m-%dT%H:%M:%S.%L%z')}
  </record>
</filter>

# GeoIP enrichment
<filter nginx.access>
  @type geoip
  geoip_lookup_keys remote
  geoip2_database /usr/share/GeoIP/GeoLite2-City.mmdb
  <record>
    country_code ${country.iso_code["remote"]}
    city         ${city.names.en["remote"]}
    latitude     ${location.latitude["remote"]}
    longitude    ${location.longitude["remote"]}
  </record>
  skip_adding_null_record true
</filter>

# Grep: filter events by field value
<filter app.**>
  @type grep
  # Keep only error and warning events
  <regexp>
    key level
    pattern /^(error|warn|critical)$/i
  </regexp>
  # Exclude health check noise
  <exclude>
    key http_path
    pattern /\/health|\/metrics/
  </exclude>
</filter>

# Parser: parse a field within an event
<filter app.**>
  @type parser
  key_name message
  reserve_data true  # Keep original fields
  remove_key_name_field false
  <parse>
    @type regexp
    expression /^(?<level>\w+) (?<timestamp>[^ ]+) (?<message>.+)$/
  </parse>
</filter>
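To see what this parser extracts, the same regexp can be applied by hand with Ruby named captures, which is the mechanism Fluentd's regexp parser uses to build the record (the sample line is illustrative):

```ruby
# Apply the parser filter's expression to a sample log line.
expression = /^(?<level>\w+) (?<timestamp>[^ ]+) (?<message>.+)$/
line = 'ERROR 2024-05-01T12:00:00Z connection refused by upstream'  # sample line

m = expression.match(line)
# Build a record hash from the named captures, as the parser plugin does
record = m.names.to_h { |name| [name, m[name]] }
# record => {"level"=>"ERROR", "timestamp"=>"2024-05-01T12:00:00Z",
#            "message"=>"connection refused by upstream"}
```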

# Record modifier: efficient field modification
<filter **>
  @type record_modifier
  remove_keys _dummy,_internal
  char_encoding utf-8:utf-8
</filter>

Output Plugins

# Install plugins for different outputs
sudo td-agent-gem install fluent-plugin-elasticsearch
sudo td-agent-gem install fluent-plugin-kafka
sudo td-agent-gem install fluent-plugin-s3
sudo td-agent-gem install fluent-plugin-prometheus

# Elasticsearch output
<match app.**>
  @type elasticsearch
  host elasticsearch.example.com
  port 9200
  scheme https
  user elastic
  password your-password
  index_name fluentd-${tag}-%Y%m%d  # Dynamic index name (requires tag,time chunk keys)
  type_name _doc
  include_timestamp true
  reconnect_on_error true
  reload_on_failure true
  request_timeout 20s
  <buffer tag,time>
    @type file
    path /var/log/td-agent/buffer/elasticsearch
    timekey 1h
    timekey_wait 10m
    chunk_limit_size 8MB
    flush_interval 5s
    retry_type exponential_backoff
    retry_max_times 10
  </buffer>
</match>

# S3 output for archival
<match archive.**>
  @type s3
  aws_key_id YOUR_ACCESS_KEY
  aws_sec_key YOUR_SECRET_KEY
  s3_bucket your-log-archive-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/${tag}/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  time_slice_format %H
  store_as gzip_command  # Compress with gzip
  <format>
    @type json
  </format>
  <buffer tag,time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 1h
    timekey_wait 10m
    chunk_limit_size 64MB
  </buffer>
</match>

# Kafka output for streaming
<match stream.**>
  @type kafka2
  brokers kafka-1:9092,kafka-2:9092
  default_topic application-logs
  use_event_time true
  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/kafka
    chunk_limit_size 4MB
    flush_interval 3s
  </buffer>
</match>

# Forward to aggregator
<match **>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  <server>
    name aggregator-1
    host 10.0.0.11
    port 24224
    weight 60
  </server>
  <server>
    name aggregator-2
    host 10.0.0.12
    port 24224
    weight 40
  </server>
  <secondary>
    # Fallback: write chunks to local files if forwarding keeps failing
    @type secondary_file
    directory /var/log/td-agent/failed_forward
    basename forward-failed
  </secondary>
  <buffer>
    @type file
    path /var/log/td-agent/buffer/forward
    flush_interval 5s
    retry_wait 20s
    retry_max_interval 300s
    retry_forever true
  </buffer>
</match>
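To exercise a forward input from the command line, the `fluent-cat` utility bundled with Fluentd can send a single event over the forward protocol (under td-agent 4 it typically lives in `/opt/td-agent/bin/`; the path may differ on your install):

```shell
# Send one JSON event with tag "app.test" to a forward input on localhost:24224
echo '{"message": "hello from fluent-cat"}' | fluent-cat app.test --host 127.0.0.1 --port 24224
```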

Buffering Strategies

# Memory buffer: fast but not persistent
<buffer>
  @type memory
  chunk_limit_size 8MB          # Max size per chunk
  total_limit_size 512MB        # Max total buffer size
  flush_interval 10s
  flush_thread_count 4
  retry_wait 20s
  retry_max_interval 300s
  retry_type exponential_backoff
</buffer>

# File buffer: persistent, survives restart
<buffer tag,time>
  @type file
  path /var/log/td-agent/buffer/app
  timekey 1h                    # Group events by hour
  timekey_wait 10m              # Wait 10min after timekey before flushing
  chunk_limit_size 64MB
  total_limit_size 4GB          # Hard cap on total buffered data
  flush_interval 60s
  flush_thread_count 4
  flush_thread_burst_interval 1s
  overflow_action block         # Block input when buffer full (safer than drop_oldest_chunk)
  retry_forever true
  retry_wait 20s
  retry_max_interval 300s
</buffer>
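With `retry_type exponential_backoff`, the wait between retries roughly doubles from `retry_wait` up to the `retry_max_interval` cap. A quick sketch of the resulting schedule (ignoring the randomization Fluentd applies by default):

```ruby
# Approximate retry schedule for exponential_backoff with the values above.
retry_wait = 20           # seconds, as configured
retry_max_interval = 300  # seconds, as configured

waits = (0...7).map { |attempt| [retry_wait * 2**attempt, retry_max_interval].min }
# waits => [20, 40, 80, 160, 300, 300, 300]
```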

Kubernetes Integration

Deploy Fluentd as a DaemonSet in Kubernetes:

# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    app: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/control-plane
          effect: NoSchedule
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch7
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging.svc"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: "http"
            - name: K8S_NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          resources:
            limits:
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluentd-config
              mountPath: /fluentd/etc/
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: fluentd-config
          configMap:
            name: fluentd-config
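The `fluentd-config` ConfigMap referenced by the DaemonSet holds the Fluentd configuration itself; a minimal skeleton (names matching the manifest above) could look like:

```yaml
# fluentd-configmap.yaml (sketch)
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-config
  namespace: kube-system
data:
  fluent.conf: |
    # Kubernetes-specific Fluentd configuration goes here,
    # e.g. the container tail source and metadata filter below
```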

# Kubernetes-specific Fluentd configuration
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag kubernetes.*
  read_from_head true
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key time
      time_format %Y-%m-%dT%H:%M:%S.%NZ
    </pattern>
    <pattern>
      format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
      time_format %Y-%m-%dT%H:%M:%S.%N%:z
    </pattern>
  </parse>
</source>

# Add Kubernetes metadata (namespace, pod name, labels)
<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
</filter>

# Route by namespace
<match kubernetes.var.log.containers.**_kube-system_**>
  @type null  # Drop kube-system logs
</match>

Custom Plugins

Create a simple custom filter plugin:

# /etc/td-agent/plugin/filter_sanitize.rb
require 'fluent/plugin/filter'

module Fluent::Plugin
  class SanitizeFilter < Filter
    Fluent::Plugin.register_filter('sanitize', self)

    config_param :fields_to_mask, :array, default: ['password', 'credit_card', 'ssn']
    config_param :mask_value, :string, default: '[REDACTED]'

    def filter(tag, time, record)
      @fields_to_mask.each do |field|
        record[field] = @mask_value if record.key?(field)
      end
      record
    end
  end
end
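The masking logic can be checked outside Fluentd with a plain-Ruby sketch of the same `filter` method, handy before deploying the plugin:

```ruby
# Standalone sketch of the sanitize filter's masking logic.
FIELDS_TO_MASK = %w[password credit_card ssn].freeze
MASK_VALUE = '[REDACTED]'

def sanitize(record)
  FIELDS_TO_MASK.each do |field|
    record[field] = MASK_VALUE if record.key?(field)
  end
  record
end

sanitize({ 'user' => 'alice', 'password' => 'hunter2' })
# => {"user"=>"alice", "password"=>"[REDACTED]"}
```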

# Use the custom plugin
<filter app.**>
  @type sanitize
  fields_to_mask password,credit_card,api_key,secret
  mask_value [REDACTED]
</filter>

# Reload Fluentd after adding plugins or changing config
sudo systemctl reload td-agent

# Test configuration without restarting
sudo td-agent --dry-run -c /etc/td-agent/td-agent.conf

Troubleshooting

Events not being routed:

# Enable debug logging
sudo td-agent -c /etc/td-agent/td-agent.conf --log-level debug 2>&1 | head -50

# Show a plugin's available configuration parameters
sudo td-agent --show-plugin-config=filter:record_transformer

# Use stdout output to debug
# Add temporarily to match:
<match debug.**>
  @type stdout
</match>

# Test by posting a log event
curl -X POST http://localhost:9880/debug.test \
  -H "Content-Type: application/json" \
  -d '{"message": "test event"}'

Buffer not flushing:

# Check buffer files
ls -la /var/log/td-agent/buffer/

# Check td-agent logs for buffer errors
sudo tail -f /var/log/td-agent/td-agent.log

# Force flush (send SIGUSR1)
sudo kill -USR1 $(cat /var/run/td-agent/td-agent.pid)

Plugin not found:

# List installed plugins
sudo td-agent-gem list | grep fluent

# Install missing plugin
sudo td-agent-gem install fluent-plugin-<name>
sudo systemctl restart td-agent

Conclusion

Fluentd's plugin architecture and tag-based routing make it one of the most flexible log processing solutions available, capable of collecting from dozens of sources and routing to multiple destinations simultaneously. Use file buffers for reliability, implement filters to enrich and sanitize log data before storage, and deploy as a Kubernetes DaemonSet for container log collection. For high-throughput environments, consider Fluent Bit as a lightweight forwarder feeding into a Fluentd aggregator for complex processing.