Fluentd Installation and Plugin Configuration
Fluentd is a unified log collector that centralizes log data from diverse sources into a single processing pipeline, supporting over 1,000 community plugins for inputs, filters, outputs, and parsers. Its plugin architecture, reliable buffering, and native Kubernetes integration make it the backbone of logging pipelines in many cloud-native environments.
Prerequisites
- Ubuntu/Debian or CentOS/Rocky Linux
- Ruby 2.7+ only if installing via gems (td-agent bundles its own Ruby)
- Minimum 1 GB RAM (more for high-volume pipelines)
- Firewall rules allowing Fluentd input ports
Installing Fluentd
# Install td-agent (Fluentd stable distribution) on Ubuntu/Debian
curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-jammy-td-agent4.sh | sh
# For CentOS/Rocky Linux
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh
# Start td-agent
sudo systemctl enable --now td-agent
sudo systemctl status td-agent
# Verify installation
td-agent --version
# Alternative: Install Fluent Bit (lightweight C implementation)
# For Kubernetes DaemonSets, Fluent Bit is preferred over Fluentd
curl https://raw.githubusercontent.com/fluent/fluent-bit/master/install.sh | sh
sudo systemctl enable --now fluent-bit
# Ruby gems approach (for latest plugins)
sudo gem install fluentd
sudo fluent-gem install fluent-plugin-elasticsearch
Core Configuration Concepts
Fluentd uses a tag-based routing system. Every event has a tag that determines which plugins process it:
# /etc/td-agent/td-agent.conf structure
# Event flow: source -> filter -> match
# (Fluentd config comments use '#', not HTML-style markers)

# Sources generate events with tags
<source>
  @type tail
  path /var/log/nginx/access.log
  # The tag determines routing
  tag nginx.access
  # ...
</source>

# Filters modify events matching the tag pattern
<filter nginx.**>
  @type record_transformer
  # ...
</filter>

# Match sections route events to outputs
<match nginx.**>
  @type elasticsearch
  # ...
</match>

# Route to multiple outputs using copy
<match app.**>
  @type copy
  <store>
    @type elasticsearch
    # ...
  </store>
  <store>
    @type s3
    # ...
  </store>
</match>
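Fluentd's documented semantics for these patterns are that `*` matches exactly one tag part and `**` matches zero or more parts (so `nginx.**` matches `nginx` itself as well as `nginx.access`). A rough Ruby sketch of that matching logic, covering only the pattern shapes used in this article, illustrative rather than Fluentd's actual matcher:

```ruby
# Approximate Fluentd tag matching for the common pattern shapes
# ('exact', 'prefix.*', 'prefix.**', bare '**'); not the real matcher.
def tag_matches?(pattern, tag)
  if pattern == '**'
    true
  elsif pattern.end_with?('.**')
    # 'a.**' matches 'a', 'a.b', 'a.b.c', ...
    prefix = pattern.delete_suffix('.**')
    tag == prefix || tag.start_with?("#{prefix}.")
  elsif pattern.end_with?('.*')
    # 'a.*' matches exactly one extra tag part
    prefix = pattern.delete_suffix('*')
    rest = tag.delete_prefix(prefix)
    tag.start_with?(prefix) && !rest.empty? && !rest.include?('.')
  else
    tag == pattern
  end
end

p tag_matches?('nginx.**', 'nginx.access')   # => true
p tag_matches?('nginx.**', 'nginx')          # => true
p tag_matches?('app.*',    'app.web')        # => true
p tag_matches?('app.*',    'app.web.error')  # => false
```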
Input Plugins
# /etc/td-agent/td-agent.conf

# Tail log files (most common input)
<source>
  @type tail
  path /var/log/nginx/access.log
  # pos_file tracks the read position across restarts
  pos_file /var/log/td-agent/nginx-access.pos
  tag nginx.access
  # Check for new files every 5s
  refresh_interval 5
  # Read existing content from the beginning of the file
  read_from_head true
  <parse>
    @type nginx
  </parse>
</source>

# Multiple files with glob
<source>
  @type tail
  path /var/log/app/*.log
  exclude_path ["/var/log/app/debug.log"]
  pos_file /var/log/td-agent/app.pos
  tag app.logs
  <parse>
    @type json
    time_key timestamp
    time_format %Y-%m-%dT%H:%M:%S.%L%z
  </parse>
</source>
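The `time_format` string above can be sanity-checked against a sample timestamp with Ruby's own `strptime`, which understands the same directives (`%L` is milliseconds):

```ruby
require 'time'

# The time_format from the JSON parse block, applied to a sample timestamp
fmt = '%Y-%m-%dT%H:%M:%S.%L%z'
t = Time.strptime('2024-05-01T12:30:45.123+0000', fmt)
puts t.utc.iso8601(3)   # => 2024-05-01T12:30:45.123Z
```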
# Syslog input
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag syslog
  <transport tcp>
  </transport>
  <parse>
    @type syslog
    with_priority true
  </parse>
</source>

# HTTP input (for webhook-style log posting)
<source>
  @type http
  port 9880
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10
  <parse>
    @type json
  </parse>
</source>

# Forward input (receive from other Fluentd instances)
<source>
  @type forward
  port 24224
  bind 0.0.0.0
  <security>
    self_hostname aggregator.example.com
    shared_key your-shared-secret
  </security>
</source>
Filter Plugins
# Record transformer: add, modify, or remove fields
<filter nginx.**>
  @type record_transformer
  enable_ruby true
  <record>
    hostname "#{Socket.gethostname}"
    environment "#{ENV['ENVIRONMENT'] || 'production'}"
    timestamp ${Time.at(time).strftime('%Y-%m-%dT%H:%M:%S.%L%z')}
  </record>
  # Remove fields not needed (a top-level parameter, not part of <record>)
  remove_keys agent,gzip_ratio
</filter>

# GeoIP enrichment (requires fluent-plugin-geoip)
<filter nginx.access>
  @type geoip
  geoip_lookup_keys remote
  geoip2_database /usr/share/GeoIP/GeoLite2-City.mmdb
  <record>
    country_code ${location.country_code["remote"]}
    city ${location.city["remote"]}
    latitude ${location.latitude["remote"]}
    longitude ${location.longitude["remote"]}
  </record>
  skip_adding_null_record true
</filter>

# Grep: filter events by field value
<filter app.**>
  @type grep
  # Keep only error and warning events
  <regexp>
    key level
    pattern /^(error|warn|critical)$/i
  </regexp>
  # Exclude health check noise
  <exclude>
    key http_path
    pattern /\/health|\/metrics/
  </exclude>
</filter>

# Parser: parse a field within an event
<filter app.**>
  @type parser
  key_name message
  # Keep the original fields alongside the parsed ones
  reserve_data true
  remove_key_name_field false
  <parse>
    @type regexp
    expression /^(?<level>\w+) (?<timestamp>[^ ]+) (?<message>.+)$/
  </parse>
</filter>
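The `expression` above is an ordinary Ruby regexp with named captures, so what the parser will extract can be previewed directly (a standalone sketch against a made-up log line, minus the parser's time handling):

```ruby
# Same expression as the <parse> block above
LINE_RE = /^(?<level>\w+) (?<timestamp>[^ ]+) (?<message>.+)$/

line = 'ERROR 2024-05-01T12:30:45Z connection refused to upstream'
m = LINE_RE.match(line)
record = m.names.to_h { |name| [name, m[name]] }
p record
# => {"level"=>"ERROR", "timestamp"=>"2024-05-01T12:30:45Z", "message"=>"connection refused to upstream"}
```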
# Record modifier: efficient field modification (requires fluent-plugin-record-modifier)
<filter **>
  @type record_modifier
  remove_keys _dummy,_internal
  char_encoding utf-8:utf-8
</filter>
Output Plugins
# Install plugins for different outputs
sudo td-agent-gem install fluent-plugin-elasticsearch
sudo td-agent-gem install fluent-plugin-kafka
sudo td-agent-gem install fluent-plugin-s3
sudo td-agent-gem install fluent-plugin-prometheus
# Elasticsearch output
<match app.**>
  @type elasticsearch
  host elasticsearch.example.com
  port 9200
  scheme https
  user elastic
  password your-password
  # Dynamic index name built from the tag and time chunk keys
  index_name fluentd-${tag}-%Y%m%d
  # type_name is deprecated on Elasticsearch 7+
  type_name _doc
  include_timestamp true
  reconnect_on_error true
  reload_on_failure true
  request_timeout 20s
  <buffer tag,time>
    @type file
    path /var/log/td-agent/buffer/elasticsearch
    timekey 1h
    timekey_wait 10m
    chunk_limit_size 8MB
    flush_interval 5s
    retry_type exponential_backoff
    retry_max_times 10
  </buffer>
</match>
# S3 output for archival
<match archive.**>
  @type s3
  aws_key_id YOUR_ACCESS_KEY
  aws_sec_key YOUR_SECRET_KEY
  s3_bucket your-log-archive-bucket
  s3_region us-east-1
  path logs/%Y/%m/%d/${tag}/
  s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
  time_slice_format %H
  # Compress objects with the system gzip command
  store_as gzip_command
  <format>
    @type json
  </format>
  <buffer tag,time>
    @type file
    path /var/log/td-agent/buffer/s3
    timekey 1h
    timekey_wait 10m
    chunk_limit_size 64MB
  </buffer>
</match>
# Kafka output for streaming
<match stream.**>
  @type kafka2
  brokers kafka-1:9092,kafka-2:9092
  default_topic application-logs
  use_event_time true
  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/kafka
    chunk_limit_size 4MB
    flush_interval 3s
  </buffer>
</match>
# Forward to aggregator
<match **>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  <server>
    name aggregator-1
    host 10.0.0.11
    port 24224
    weight 60
  </server>
  <server>
    name aggregator-2
    host 10.0.0.12
    port 24224
    weight 40
  </server>
  # Fallback: write chunks to local files if forwarding keeps failing
  <secondary>
    @type secondary_file
    directory /var/log/td-agent/failed_forward
  </secondary>
  <buffer>
    @type file
    path /var/log/td-agent/buffer/forward
    flush_interval 5s
    retry_wait 20s
    retry_max_interval 300s
    # Bounded retries, so the <secondary> fallback can actually take over
    # (with retry_forever true the secondary would never be used)
    retry_max_times 15
  </buffer>
</match>
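`weight` controls each server's share of the traffic: 60/40 here means roughly 60% of chunks go to aggregator-1. A sketch of weighted selection in Ruby (Fluentd's actual implementation builds a weighted rotation rather than sampling, but the resulting split is the same):

```ruby
# Weights matching the <server> blocks above
SERVERS = { 'aggregator-1' => 60, 'aggregator-2' => 40 }.freeze

# Pick a server at random, proportionally to its weight
def pick_server
  r = rand(SERVERS.values.sum)
  SERVERS.each { |name, weight| return name if (r -= weight) < 0 }
end

counts = Hash.new(0)
10_000.times { counts[pick_server] += 1 }
p counts   # roughly {"aggregator-1"=>6000, "aggregator-2"=>4000}
```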
Buffering Strategies
# Memory buffer: fast but not persistent
<buffer>
  @type memory
  # Max size per chunk
  chunk_limit_size 8MB
  # Max total buffer size before overflow_action triggers
  total_limit_size 512MB
  flush_interval 10s
  flush_thread_count 4
  retry_wait 20s
  retry_max_interval 300s
  retry_type exponential_backoff
</buffer>

# File buffer: persistent, survives restart
<buffer tag,time>
  @type file
  path /var/log/td-agent/buffer/app
  # Group events into hourly chunks
  timekey 1h
  # Wait 10 min past each hour boundary for late events before flushing
  timekey_wait 10m
  chunk_limit_size 64MB
  # Max total buffer size before overflow_action triggers
  total_limit_size 4GB
  flush_interval 60s
  flush_thread_count 4
  flush_thread_burst_interval 1s
  # Block input when the buffer is full (safer than drop_oldest_chunk)
  overflow_action block
  retry_forever true
  retry_wait 20s
  retry_max_interval 300s
</buffer>
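With `retry_type exponential_backoff`, the wait between failed flush attempts doubles from `retry_wait` until it hits the `retry_max_interval` cap (Fluentd also adds randomized jitter, omitted here). The resulting schedule for the values above:

```ruby
# Retry schedule for retry_wait 20s / retry_max_interval 300s (jitter omitted)
def retry_intervals(retry_wait: 20, retry_max_interval: 300, attempts: 8)
  (0...attempts).map { |n| [retry_wait * 2**n, retry_max_interval].min }
end

p retry_intervals
# => [20, 40, 80, 160, 300, 300, 300, 300]
```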
Kubernetes Integration
Deploy Fluentd as a DaemonSet in Kubernetes:
# fluentd-daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    app: fluentd
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
    spec:
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/control-plane
          effect: NoSchedule
      containers:
        - name: fluentd
          image: fluent/fluentd-kubernetes-daemonset:v1-debian-elasticsearch7
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: "elasticsearch.logging.svc"
            - name: FLUENT_ELASTICSEARCH_PORT
              value: "9200"
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: "http"
            - name: K8S_NODE_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
          resources:
            limits:
              memory: 500Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluentd-config
              mountPath: /fluentd/etc/
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: fluentd-config
          configMap:
            name: fluentd-config
# Kubernetes-specific Fluentd configuration
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /var/log/fluentd-containers.log.pos
  tag kubernetes.*
  read_from_head true
  # multi_format requires fluent-plugin-multi-format-parser
  <parse>
    @type multi_format
    <pattern>
      format json
      time_key time
      time_format %Y-%m-%dT%H:%M:%S.%NZ
    </pattern>
    <pattern>
      format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
      time_format %Y-%m-%dT%H:%M:%S.%N%:z
    </pattern>
  </parse>
</source>

# Add Kubernetes metadata (namespace, pod name, labels);
# requires fluent-plugin-kubernetes_metadata_filter
<filter kubernetes.**>
  @type kubernetes_metadata
  @id filter_kube_metadata
</filter>

# Route by namespace: drop kube-system logs
<match kubernetes.var.log.containers.**_kube-system_**>
  @type null
</match>
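That `**_kube-system_**` pattern works because the tail source derives the tag from the file path, and kubelet names container log files `<pod>_<namespace>_<container>-<id>.log`, so the namespace is embedded between underscores in the tag. A quick check against a hypothetical tag:

```ruby
# Hypothetical tag produced by the tail source above (made-up pod name)
tag = 'kubernetes.var.log.containers.coredns-5d78c9869d-x7k2p_kube-system_coredns-0123abcd.log'

# The namespace sits between the first and second underscores
namespace = tag[/[^_]+_([^_]+)_/, 1]
p namespace   # => "kube-system"
```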
Custom Plugins
Create a simple custom filter plugin:
# /etc/td-agent/plugin/filter_sanitize.rb
require 'fluent/plugin/filter'

module Fluent::Plugin
  class SanitizeFilter < Filter
    # Registers the plugin so `@type sanitize` resolves to this class
    Fluent::Plugin.register_filter('sanitize', self)

    config_param :fields_to_mask, :array, default: ['password', 'credit_card', 'ssn']
    config_param :mask_value, :string, default: '[REDACTED]'

    # Called once per event; return the (possibly modified) record
    def filter(tag, time, record)
      @fields_to_mask.each do |field|
        record[field] = @mask_value if record.key?(field)
      end
      record
    end
  end
end
# Use the custom plugin
<filter app.**>
  @type sanitize
  fields_to_mask password,credit_card,api_key,secret
  # Quote the value so the config parser does not try to read it as a JSON array
  mask_value "[REDACTED]"
</filter>
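The masking behavior can be exercised outside Fluentd as a plain-Ruby sketch: the plugin's `filter` body with the config params inlined as constants:

```ruby
# Inlined equivalents of the fields_to_mask / mask_value config params
FIELDS_TO_MASK = %w[password credit_card api_key secret].freeze
MASK_VALUE = '[REDACTED]'

# Same logic as SanitizeFilter#filter, on a copy of the record
def sanitize(record)
  FIELDS_TO_MASK.each_with_object(record.dup) do |field, masked|
    masked[field] = MASK_VALUE if masked.key?(field)
  end
end

p sanitize({ 'user' => 'alice', 'password' => 'hunter2' })
# => {"user"=>"alice", "password"=>"[REDACTED]"}
```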
# Reload Fluentd after adding plugins or changing config
sudo systemctl reload td-agent
# Test configuration without restarting
sudo td-agent --dry-run -c /etc/td-agent/td-agent.conf
Troubleshooting
Events not being routed:
# Enable debug logging
sudo td-agent -c /etc/td-agent/td-agent.conf --log-level debug 2>&1 | head -50
# Check which tags are being processed
# Inspect a plugin's available parameters
/opt/td-agent/bin/fluent-plugin-config-format filter record_transformer
# Use stdout output to debug
# Add temporarily to match:
<match debug.**>
  @type stdout
</match>
# Test by posting a log event
curl -X POST http://localhost:9880/debug.test \
-H "Content-Type: application/json" \
-d '{"message": "test event"}'
Buffer not flushing:
# Check buffer files
ls -la /var/log/td-agent/buffer/
# Check td-agent logs for buffer errors
sudo tail -f /var/log/td-agent/td-agent.log
# Force flush (send SIGUSR1)
sudo kill -USR1 $(cat /var/run/td-agent/td-agent.pid)
Plugin not found:
# List installed plugins
sudo td-agent-gem list | grep fluent
# Install missing plugin
sudo td-agent-gem install fluent-plugin-<name>
sudo systemctl restart td-agent
Conclusion
Fluentd's plugin architecture and tag-based routing make it one of the most flexible log processing solutions available, capable of collecting from dozens of sources and routing to multiple destinations simultaneously. Use file buffers for reliability, implement filters to enrich and sanitize log data before storage, and deploy as a Kubernetes DaemonSet for container log collection. For high-throughput environments, consider Fluent Bit as a lightweight forwarder feeding into a Fluentd aggregator for complex processing.