Kafka Cluster Configuration and Management

This guide covers deploying and managing a production-grade Apache Kafka cluster with multiple brokers, replication, partitioning strategies, consumer groups, and monitoring. A properly configured Kafka cluster ensures high availability, fault tolerance, and optimal performance for real-time streaming applications.

Multi-Broker Cluster Architecture

A Kafka cluster consists of multiple broker nodes coordinated through ZooKeeper or, in newer releases, the built-in KRaft protocol; the two modes use disjoint settings and must not be mixed in one configuration. This guide uses ZooKeeper mode. Configure three or more brokers for production deployments to ensure fault tolerance.

Create configuration files for each broker. Broker 1 configuration at /etc/kafka/broker-1.properties:

# Unique broker identification
broker.id=1

# Network configuration (internal listener for broker traffic,
# advertised address for clients)
listeners=PLAINTEXT://broker-1.internal.example.com:9092
advertised.listeners=PLAINTEXT://broker-1.public.example.com:9092
inter.broker.listener.name=PLAINTEXT

# ZooKeeper configuration
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181/kafka

# Log storage (log.dirs supersedes the older singular log.dir)
log.dirs=/var/kafka-logs/broker-1

# Replication defaults
default.replication.factor=3
min.insync.replicas=2

# Performance tuning
num.network.threads=8
num.io.threads=8
num.replica.fetchers=4
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Retention
log.retention.hours=168
log.segment.bytes=1073741824

# Group coordinator
group.initial.rebalance.delay.ms=3000

Broker 2 configuration at /etc/kafka/broker-2.properties:

broker.id=2

listeners=PLAINTEXT://broker-2.internal.example.com:9092
advertised.listeners=PLAINTEXT://broker-2.public.example.com:9092

zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181/kafka

log.dirs=/var/kafka-logs/broker-2

default.replication.factor=3
min.insync.replicas=2

num.network.threads=8
num.io.threads=8
num.replica.fetchers=4

Broker 3 configuration at /etc/kafka/broker-3.properties:

broker.id=3

listeners=PLAINTEXT://broker-3.internal.example.com:9092
advertised.listeners=PLAINTEXT://broker-3.public.example.com:9092

zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181/kafka

log.dirs=/var/kafka-logs/broker-3

default.replication.factor=3
min.insync.replicas=2

num.network.threads=8
num.io.threads=8
num.replica.fetchers=4

Create systemd service files for each broker:

sudo tee /etc/systemd/system/kafka-broker-1.service <<EOF
[Unit]
Description=Apache Kafka Broker 1
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /etc/kafka/broker-1.properties
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF

Repeat for broker-2 and broker-3 services.
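Since the three property files differ only in the broker id and hostnames, they can also be generated from a loop rather than maintained by hand. A minimal sketch (OUT_DIR stands in for /etc/kafka, and the hostnames follow the example convention above; adjust both for your environment):

```shell
#!/usr/bin/env bash
# Generate per-broker property files from one template loop.
# OUT_DIR is a stand-in for /etc/kafka.
OUT_DIR=${OUT_DIR:-/tmp/kafka-config}
mkdir -p "$OUT_DIR"

for i in 1 2 3; do
  cat > "$OUT_DIR/broker-$i.properties" <<EOF
broker.id=$i
listeners=PLAINTEXT://broker-$i.internal.example.com:9092
advertised.listeners=PLAINTEXT://broker-$i.public.example.com:9092
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181/kafka
log.dirs=/var/kafka-logs/broker-$i
default.replication.factor=3
min.insync.replicas=2
EOF
done
```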

Start all brokers:

sudo systemctl daemon-reload
for i in 1 2 3; do
  sudo systemctl start kafka-broker-$i
  sudo systemctl enable kafka-broker-$i
done

Verify cluster formation (each broker should respond with the API versions it supports):

kafka-broker-api-versions.sh \
  --bootstrap-server broker-1:9092,broker-2:9092,broker-3:9092

Network Configuration

Configure proper network settings for inter-broker communication and client connectivity. Use separate addresses for internal (broker-to-broker) and external (client) communication.

Update /etc/hosts on all cluster nodes:

sudo tee -a /etc/hosts <<EOF
192.168.1.10 broker-1.internal.example.com broker-1
192.168.1.11 broker-2.internal.example.com broker-2
192.168.1.12 broker-3.internal.example.com broker-3
203.0.113.10 broker-1.public.example.com
203.0.113.11 broker-2.public.example.com
203.0.113.12 broker-3.public.example.com
EOF

Configure firewall rules to allow inter-broker and client communication:

# Allow broker-to-broker communication
sudo ufw allow from 192.168.1.0/24 to any port 9092
sudo ufw allow from 192.168.1.0/24 to any port 9093

# Allow client connections
sudo ufw allow 9092

# Allow ZooKeeper
sudo ufw allow from 192.168.1.0/24 to any port 2181

For AWS security groups or cloud providers, create rules allowing port 9092 from clients and port 9092/9093 between brokers.

Replication and Durability

Kafka replication ensures data durability across broker failures. Configure replication factor and in-sync replicas appropriately.

Create a topic with specified replication settings:

kafka-topics.sh --create \
  --bootstrap-server broker-1:9092 \
  --topic production-data \
  --partitions 12 \
  --replication-factor 3 \
  --config min.insync.replicas=2 \
  --config unclean.leader.election.enable=false
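The interplay of replication factor and min.insync.replicas determines durability: with acks=all producers, a topic keeps accepting writes as long as no more than (replication factor minus min.insync.replicas) brokers are down. A quick sketch of that arithmetic:

```shell
# Broker failures a topic tolerates while still accepting
# acks=all writes: replication factor minus min.insync.replicas.
write_fault_tolerance() {
  local rf=$1 min_isr=$2
  echo $(( rf - min_isr ))
}

write_fault_tolerance 3 2   # -> 1: one broker can fail
write_fault_tolerance 3 3   # -> 0: any failure blocks acks=all writes
```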

Monitor replica status:

kafka-topics.sh --describe \
  --bootstrap-server broker-1:9092 \
  --topic production-data

The output shows leaders, replicas, and in-sync replicas (ISR). Example output:

Topic: production-data	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
Topic: production-data	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
Topic: production-data	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
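To check that leadership is evenly spread, the describe output can be piped through awk to count leaders per broker. A small sketch, assuming the whitespace-separated layout shown above:

```shell
# Count partition leaders per broker from kafka-topics.sh --describe
# output lines of the form "Topic: ... Leader: B Replicas: ... Isr: ..."
count_leaders() {
  awk '/Leader:/ { for (i = 1; i < NF; i++) if ($i == "Leader:") print $(i+1) }' \
    | sort | uniq -c
}

# Usage against a live cluster:
#   kafka-topics.sh --describe --bootstrap-server broker-1:9092 | count_leaders
```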

Check for under-replicated partitions (warning sign of cluster issues):

kafka-topics.sh --describe \
  --under-replicated-partitions \
  --bootstrap-server broker-1:9092

Trigger preferred leader election to rebalance leadership (the standalone kafka-preferred-replica-election.sh tool is deprecated; recent releases ship kafka-leader-election.sh instead):

kafka-leader-election.sh \
  --bootstrap-server broker-1:9092 \
  --election-type PREFERRED \
  --all-topic-partitions

Partition Assignment Strategies

Partition assignment operates at two levels: replica placement on brokers (set at topic creation or through reassignment) and partition assignment to consumers within a group. The consumer-side strategy is configured on the consumer, not the broker, through the partition.assignment.strategy property:

# Range assignment (default): each consumer receives a contiguous
# block of partitions per topic
partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor

# Round-robin assignment: partitions are dealt out one at a time
# across all consumers
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

# Cooperative sticky assignment: minimizes partition movement
# during rebalances
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
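The range assignor's arithmetic is easy to reason about: with P partitions and C consumers, each consumer gets floor(P/C) partitions and the first P mod C consumers get one extra. A sketch of that calculation (illustrative only, not Kafka code):

```shell
# How the range assignor spreads P partitions over C consumers:
# each gets P/C, and the first P%C consumers get one more.
range_counts() {
  local partitions=$1 consumers=$2
  local base=$(( partitions / consumers ))
  local extra=$(( partitions % consumers ))
  local c
  for ((c = 0; c < consumers; c++)); do
    if (( c < extra )); then
      echo "consumer-$c: $(( base + 1 )) partitions"
    else
      echo "consumer-$c: $base partitions"
    fi
  done
}

range_counts 12 5
```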

Manually assign partitions using a JSON file. Create /tmp/partition-assignment.json:

{
  "version": 1,
  "partitions": [
    {"topic": "production-data", "partition": 0, "replicas": [1, 2, 3]},
    {"topic": "production-data", "partition": 1, "replicas": [2, 3, 1]},
    {"topic": "production-data", "partition": 2, "replicas": [3, 1, 2]},
    {"topic": "production-data", "partition": 3, "replicas": [1, 2, 3]},
    {"topic": "production-data", "partition": 4, "replicas": [2, 3, 1]},
    {"topic": "production-data", "partition": 5, "replicas": [3, 1, 2]}
  ]
}
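Writing this JSON by hand is error-prone for topics with many partitions. The round-robin rotation above can be generated instead; a hypothetical helper (the rotation matches the replica lists shown above):

```shell
# Emit a reassignment JSON that rotates replicas round-robin
# across the given broker ids, matching the pattern above.
generate_assignment() {
  local topic=$1 partitions=$2
  shift 2
  local brokers=("$@") n=$#
  echo '{'
  echo '  "version": 1,'
  echo '  "partitions": ['
  local p r replicas sep
  for ((p = 0; p < partitions; p++)); do
    replicas=""
    for ((r = 0; r < n; r++)); do
      replicas+="${brokers[(p + r) % n]}"
      if (( r < n - 1 )); then replicas+=", "; fi
    done
    sep=","
    if (( p == partitions - 1 )); then sep=""; fi
    printf '    {"topic": "%s", "partition": %d, "replicas": [%s]}%s\n' \
      "$topic" "$p" "$replicas" "$sep"
  done
  echo '  ]'
  echo '}'
}

generate_assignment production-data 6 1 2 3 > /tmp/partition-assignment.json
```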

Execute the assignment:

kafka-reassign-partitions.sh \
  --bootstrap-server broker-1:9092 \
  --reassignment-json-file /tmp/partition-assignment.json \
  --execute

Monitor reassignment progress:

kafka-reassign-partitions.sh \
  --bootstrap-server broker-1:9092 \
  --reassignment-json-file /tmp/partition-assignment.json \
  --verify

Consumer Groups and Offsets

Consumer groups allow multiple consumers to process messages from a topic in parallel. Each partition is consumed by only one consumer in the group.

Create a consumer group and start consuming:

kafka-console-consumer.sh \
  --bootstrap-server broker-1:9092 \
  --topic production-data \
  --group analytics-consumers \
  --from-beginning

List all consumer groups:

kafka-consumer-groups.sh \
  --bootstrap-server broker-1:9092 \
  --list

Describe a consumer group in detail:

kafka-consumer-groups.sh \
  --bootstrap-server broker-1:9092 \
  --describe \
  --group analytics-consumers

Output shows consumption lag per partition:

GROUP                TOPIC             PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
analytics-consumers  production-data   0          1000            1050            50
analytics-consumers  production-data   1          950             1000            50
analytics-consumers  production-data   2          1025            1025            0
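For alerting, the total lag across partitions usually matters more than any single row. The describe output can be summed with awk; a sketch assuming the LAG column is the last field, as in the table above:

```shell
# Sum the LAG column (last field) of kafka-consumer-groups.sh
# --describe output, skipping the header line.
total_lag() {
  awk 'NR > 1 && $NF ~ /^[0-9]+$/ { lag += $NF } END { print lag + 0 }'
}

# Usage against a live cluster:
#   kafka-consumer-groups.sh --bootstrap-server broker-1:9092 \
#     --describe --group analytics-consumers | total_lag
```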

Reset consumer group offset to the earliest available message:

kafka-consumer-groups.sh \
  --bootstrap-server broker-1:9092 \
  --reset-offsets \
  --group analytics-consumers \
  --topic production-data \
  --to-earliest \
  --execute

Reset offset to a specific timestamp:

kafka-consumer-groups.sh \
  --bootstrap-server broker-1:9092 \
  --reset-offsets \
  --group analytics-consumers \
  --topic production-data \
  --to-datetime 2024-01-15T10:30:00.000 \
  --execute
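When replaying a recent time window, GNU date can build the timestamp string in the YYYY-MM-DDTHH:mm:SS.sss form the tool expects. A sketch (assumes GNU coreutils date, as on typical Linux broker hosts):

```shell
# Build a --to-datetime argument for "N hours ago" in the
# format kafka-consumer-groups.sh expects.
reset_point() {
  local hours_ago=$1
  date -u -d "$hours_ago hours ago" +%Y-%m-%dT%H:%M:%S.000
}

# Usage (hypothetical):
#   kafka-consumer-groups.sh --bootstrap-server broker-1:9092 \
#     --reset-offsets --group analytics-consumers --topic production-data \
#     --to-datetime "$(reset_point 2)" --execute
```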

Cluster Monitoring

Monitor cluster health using broker metrics and log analysis. Configure JMX monitoring:

sudo mkdir -p /etc/systemd/system/kafka-broker-1.service.d
sudo tee /etc/systemd/system/kafka-broker-1.service.d/override.conf <<EOF
[Service]
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=broker-1.internal.example.com -Dcom.sun.management.jmxremote.rmi.port=9999"
EOF

sudo systemctl daemon-reload
sudo systemctl restart kafka-broker-1

Use Kafka exporter for Prometheus monitoring:

# Download and install kafka_exporter
wget https://github.com/danielqsj/kafka_exporter/releases/download/v1.6.0/kafka_exporter-1.6.0.linux-amd64.tar.gz
tar -xzf kafka_exporter-1.6.0.linux-amd64.tar.gz
sudo mv kafka_exporter-1.6.0.linux-amd64/kafka_exporter /usr/local/bin/

Create a systemd service for the exporter:

sudo tee /etc/systemd/system/kafka-exporter.service <<EOF
[Unit]
Description=Kafka Exporter
After=network-online.target

[Service]
Type=simple
ExecStart=/usr/local/bin/kafka_exporter --kafka.version=3.6.1 --kafka.server=broker-1:9092 --kafka.server=broker-2:9092 --kafka.server=broker-3:9092
Restart=always

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl start kafka-exporter
sudo systemctl enable kafka-exporter

Monitor key metrics:

# Check broker status
kafka-broker-api-versions.sh --bootstrap-server broker-1:9092

# Monitor ISR status
kafka-topics.sh --describe --bootstrap-server broker-1:9092 | grep -i isr

# Inspect on-disk cluster metadata (KRaft mode only; a ZooKeeper-mode
# cluster has no __cluster_metadata log)
kafka-metadata-shell.sh --snapshot /var/kafka-logs/broker-1/__cluster_metadata-0/00000000000000000000.log

Scaling and Load Balancing

Add new brokers to an existing cluster by assigning them a new, unique broker.id; they register themselves in ZooKeeper on startup. When adding broker 4:

broker.id=4

listeners=PLAINTEXT://broker-4.internal.example.com:9092
advertised.listeners=PLAINTEXT://broker-4.public.example.com:9092

zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181/kafka

log.dirs=/var/kafka-logs/broker-4

default.replication.factor=3
min.insync.replicas=2

Create a systemd unit for the new broker as for the first three, then start it:

sudo systemctl start kafka-broker-4
sudo systemctl enable kafka-broker-4

Rebalance partitions to include the new broker. Create a JSON file listing topics to rebalance:

{
  "topics": [
    {"topic": "production-data"},
    {"topic": "user-events"},
    {"topic": "transactions"}
  ],
  "version": 1
}

Generate a reassignment plan:

kafka-reassign-partitions.sh \
  --bootstrap-server broker-1:9092 \
  --generate \
  --topics-to-move-json-file /tmp/topics.json \
  --broker-list "1,2,3,4"

# The command prints both the current and the proposed assignment.
# Save only the proposed-assignment JSON to /tmp/reassignment-plan.json
# before executing it.

Execute the reassignment:

kafka-reassign-partitions.sh \
  --bootstrap-server broker-1:9092 \
  --execute \
  --reassignment-json-file /tmp/reassignment-plan.json

High Availability Configuration

Implement policies for high availability using Kafka configurations and clustering strategies:

# Set cluster-wide policies for high availability
kafka-configs.sh \
  --bootstrap-server broker-1:9092 \
  --alter \
  --add-config unclean.leader.election.enable=false \
  --entity-type brokers \
  --entity-default

Configure automatic leader election:

# Periodically move leadership back to the preferred replica
auto.leader.rebalance.enable=true

# Prevent unclean leader election (no data loss)
unclean.leader.election.enable=false

# Connection timeouts
controller.socket.timeout.ms=30000

Set up monitoring and alerting for cluster issues:

# Monitor under-replicated partitions
watch -n 5 'kafka-topics.sh --describe --under-replicated-partitions --bootstrap-server broker-1:9092'

# Monitor partitions with no available leader
watch -n 5 'kafka-topics.sh --describe --unavailable-partitions --bootstrap-server broker-1:9092'

Troubleshooting Cluster Issues

Handle common cluster problems systematically. Check broker connectivity:

# Test broker connectivity
for broker in broker-1 broker-2 broker-3; do
  echo "Testing $broker"
  kafka-broker-api-versions.sh --bootstrap-server $broker:9092 && echo "OK" || echo "FAILED"
done

Resolve partition leadership issues:

# Identify partitions without a leader
kafka-topics.sh --describe \
  --bootstrap-server broker-1:9092 | grep "Leader: -1"

# Force leader election. PREFERRED works only when the preferred replica
# is in sync; for partitions whose entire ISR is gone, UNCLEAN election is
# the last resort and can lose data. /tmp/leaders.json lists the affected
# topic/partition pairs.
kafka-leader-election.sh \
  --bootstrap-server broker-1:9092 \
  --election-type UNCLEAN \
  --path-to-json-file /tmp/leaders.json

Handle broker failures by removing and adding brokers:

# Drain a failed broker: generate a plan whose broker list excludes it,
# then save only the proposed-assignment JSON and execute that plan
kafka-reassign-partitions.sh \
  --bootstrap-server broker-1:9092 \
  --generate \
  --topics-to-move-json-file /tmp/topics.json \
  --broker-list "1,2,3"
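Before generating the removal plan it helps to see which partitions actually hold replicas on the failed broker. A sketch that filters the describe output (assumes the Replicas field layout shown earlier):

```shell
# List partitions whose replica set includes a given broker id,
# from kafka-topics.sh --describe output.
partitions_on_broker() {
  local broker=$1
  awk -v b="$broker" '
    /Replicas:/ {
      for (i = 1; i < NF; i++)
        if ($i == "Replicas:") {
          n = split($(i + 1), reps, ",")
          for (j = 1; j <= n; j++)
            if (reps[j] == b) { print; break }
        }
    }'
}

# Usage against a live cluster:
#   kafka-topics.sh --describe --bootstrap-server broker-1:9092 \
#     | partitions_on_broker 2
```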

Check ZooKeeper state:

# Connect to ZooKeeper and check broker registrations
zkCli.sh -server localhost:2181
ls /kafka/brokers/ids
get /kafka/brokers/ids/1

Conclusion

Managing a production Kafka cluster requires proper configuration of multiple brokers, careful replication strategies, and continuous monitoring. This guide covered cluster architecture, networking, replication, partition assignment, consumer group management, scaling, and high availability. Implement regular backups, monitor consumption lag, maintain healthy ISR states, and plan for failure scenarios. Use monitoring tools like Prometheus and Grafana to track metrics and set up alerts for critical conditions to ensure your Kafka cluster operates reliably for real-time data streaming applications.