Installing Apache Pulsar

Apache Pulsar is a distributed publish/subscribe messaging platform designed for high availability, geo-replication, and multi-tenancy. It separates compute (brokers) from storage (Apache BookKeeper), so the serving and storage tiers can be scaled independently and a failed broker can be replaced without moving data. This guide covers standalone and cluster deployment, tenant/namespace management, topic and subscription operations, Pulsar Functions, and monitoring.

Prerequisites

Before installing Pulsar, ensure you have:

  • Linux system (Ubuntu 20.04+, CentOS 8+)
  • Java 17 installed and JAVA_HOME configured (Pulsar 3.x server components require Java 17)
  • 4GB+ RAM available
  • At least 50GB disk space
  • Internet connectivity to download the release
  • Basic familiarity with ZooKeeper

Understanding Pulsar Architecture

Pulsar separates compute (brokers) from storage (BookKeeper):

  • Brokers: Handle publish/subscribe, stateless and horizontally scalable
  • BookKeeper: Distributed ledger storage, provides data durability
  • ZooKeeper: Cluster coordination and metadata management

This separation enables:

  • Independent scaling of brokers and storage
  • Fast broker failover without data loss, because brokers hold no durable state
  • Geographic replication across regions

Installing Standalone Pulsar

Standalone mode runs all components in a single process, suitable for development and testing.

Create a pulsar system user with a home directory (it must exist before the files can be assigned to it):

sudo useradd -r -m -d /home/pulsar -s /bin/bash pulsar

Download and unpack Pulsar:

cd /opt
sudo wget https://archive.apache.org/dist/pulsar/pulsar-3.0.1/apache-pulsar-3.0.1-bin.tar.gz
sudo tar -xzf apache-pulsar-3.0.1-bin.tar.gz
sudo mv apache-pulsar-3.0.1 pulsar
sudo chown -R pulsar:pulsar /opt/pulsar

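Configure the environment. Quoting 'EOF' keeps $PULSAR_HOME and $PATH from being expanded while the file is written, and tee -a appends instead of overwriting the existing .bashrc:

sudo tee -a /home/pulsar/.bashrc <<'EOF'
export PULSAR_HOME=/opt/pulsar
export PATH=$PULSAR_HOME/bin:$PATH
EOF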

Start Pulsar in standalone mode as the pulsar user (for example after sudo -iu pulsar):

# Foreground (for testing)
pulsar standalone

# Background (still development/testing only; logs go to $PULSAR_HOME/logs)
pulsar-daemon start standalone

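Verify Pulsar is running (the standalone cluster is named "standalone"):

pulsar-admin brokers list standalone

The output should list the local broker. The broker's HTTP admin API listens on http://localhost:8080; that port does not serve a graphical dashboard. A web UI such as Pulsar Manager is deployed separately.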

Configuring a Pulsar Cluster

Deploy a multi-node Pulsar cluster with separate ZooKeeper, BookKeeper, and Broker tiers.

Set up the ZooKeeper ensemble first. Each of the three ZooKeeper nodes gets its own /etc/pulsar/zk.conf; start by creating the config and data directories on every node:

sudo mkdir -p /etc/pulsar /var/lib/pulsar-zk
sudo chown pulsar:pulsar /var/lib/pulsar-zk

On ZK1 (192.168.1.10):

sudo tee /etc/pulsar/zk.conf <<EOF
server.1=zk1.internal:2888:3888
server.2=zk2.internal:2888:3888
server.3=zk3.internal:2888:3888

# Timing settings required for a multi-node ensemble
tickTime=2000
initLimit=10
syncLimit=5

dataDir=/var/lib/pulsar-zk
clientPort=2181

autopurge.snapRetainCount=3
autopurge.purgeInterval=1
EOF

Create ZooKeeper ID file:

echo "1" | sudo tee /var/lib/pulsar-zk/myid
sudo chown pulsar:pulsar /var/lib/pulsar-zk/myid

Repeat on ZK2 and ZK3, changing the myid file to "2" and "3" respectively.
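ZooKeeper keeps serving requests only while a strict majority of the ensemble is reachable, which is why an odd size such as three is used. A small Python sketch of that arithmetic (the function names are illustrative, not part of any Pulsar tool):

```python
def zk_quorum(ensemble_size: int) -> int:
    """Smallest number of servers that forms a strict majority."""
    return ensemble_size // 2 + 1


def zk_tolerated_failures(ensemble_size: int) -> int:
    """How many servers can fail while a majority is still up."""
    return ensemble_size - zk_quorum(ensemble_size)


for size in (3, 4, 5):
    print(f"{size} nodes: quorum {zk_quorum(size)}, "
          f"tolerates {zk_tolerated_failures(size)} failure(s)")
```

A fourth node raises the quorum to three without tolerating more failures than three nodes do, so ensembles are usually sized at 3 or 5.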

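Start ZooKeeper on each node (the pulsar script reads the config path from PULSAR_ZK_CONF):

PULSAR_ZK_CONF=/etc/pulsar/zk.conf pulsar zookeeper

Once the ensemble is up, initialize the cluster metadata once, from any single node, before starting bookies and brokers:

pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster \
  --metadata-store zk:zk1.internal:2181,zk2.internal:2181,zk3.internal:2181 \
  --configuration-metadata-store zk:zk1.internal:2181,zk2.internal:2181,zk3.internal:2181 \
  --web-service-url http://broker1.internal:8080 \
  --broker-service-url pulsar://broker1.internal:6650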

Configure BookKeeper for storage. Create /etc/pulsar/bk.conf on each BookKeeper node (bookie):

sudo tee /etc/pulsar/bk.conf <<EOF
# ZooKeeper (metadata store) connection
zkServers=zk1.internal:2181,zk2.internal:2181,zk3.internal:2181

# Bookie port and storage locations
bookiePort=3181
journalDirectories=/var/lib/pulsar-bk/journal
ledgerDirectories=/var/lib/pulsar-bk/ledgers

# Performance tuning
numAddWorkerThreads=8
numReadWorkerThreads=8
flushInterval=10000

# Replication (ensemble size, write quorum, ack quorum) is not a bookie
# setting: brokers choose it via managedLedgerDefault* and namespace
# persistence policies. Run at least three bookies for an ensemble of 3.
EOF

Create storage directories:

sudo mkdir -p /var/lib/pulsar-bk/journal /var/lib/pulsar-bk/ledgers
sudo chown -R pulsar:pulsar /var/lib/pulsar-bk

Start BookKeeper on each bookie (the config path is read from PULSAR_BOOKKEEPER_CONF):

PULSAR_BOOKKEEPER_CONF=/etc/pulsar/bk.conf pulsar bookie

Configure Pulsar Brokers. Create /etc/pulsar/broker.conf on each broker, setting advertisedAddress to that broker's own hostname. TLS listeners (brokerServicePortTls=6651, webServicePortTls=8443) also need certificate settings and are omitted here:

sudo tee /etc/pulsar/broker.conf <<EOF
# Cluster configuration
clusterName=pulsar-cluster
metadataStoreUrl=zk:zk1.internal:2181,zk2.internal:2181,zk3.internal:2181
configurationMetadataStoreUrl=zk:zk1.internal:2181,zk2.internal:2181,zk3.internal:2181
advertisedAddress=broker1.internal
brokerServicePort=6650
webServicePort=8080

# Storage configuration (ensemble, write quorum, ack quorum)
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2

# Performance tuning
numHttpServerThreads=64

# Load balancing
loadBalancerEnabled=true

# Geo-replication is enabled per namespace with
# pulsar-admin namespaces set-clusters, not in broker.conf.
EOF
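The managedLedgerDefault* values must satisfy ensemble >= write quorum >= ack quorum, a constraint BookKeeper enforces. A small validator sketch (plain Python; LedgerQuorum is a hypothetical helper, not a Pulsar or BookKeeper class) that also reports how many copies of a confirmed entry are guaranteed:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LedgerQuorum:
    ensemble: int      # E: bookies a ledger is spread across
    write_quorum: int  # Qw: bookies each entry is written to
    ack_quorum: int    # Qa: acks needed before a write is confirmed

    def validate(self) -> None:
        # BookKeeper requires E >= Qw >= Qa >= 1
        if not (self.ensemble >= self.write_quorum >= self.ack_quorum >= 1):
            raise ValueError(f"invalid quorum settings: {self}")

    def guaranteed_copies(self) -> int:
        # A confirmed entry is known to be stored on at least Qa bookies
        return self.ack_quorum

    def striped(self) -> bool:
        # With E > Qw, successive entries land on different bookie subsets
        return self.ensemble > self.write_quorum


q = LedgerQuorum(ensemble=3, write_quorum=2, ack_quorum=2)
q.validate()
print(q.guaranteed_copies(), q.striped())  # 2 True
```

With 3/2/2, every confirmed entry sits on two bookies, so losing one bookie does not lose acknowledged data; because E > Qw, the ledger is striped across three bookies.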

Start Pulsar Brokers (the config path is read from PULSAR_BROKER_CONF):

PULSAR_BROKER_CONF=/etc/pulsar/broker.conf pulsar broker

Verify cluster formation:

pulsar-admin clusters list
pulsar-admin brokers list pulsar-cluster

Managing Tenants and Namespaces

Pulsar uses a multi-tenant architecture with hierarchical organization:

  • Tenant: Top-level administrative unit
  • Namespace: Collection of topics within a tenant
  • Topic: Actual message channel
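Topic names encode this hierarchy as {persistent|non-persistent}://tenant/namespace/topic. The sketch below splits a name into its parts. The fallbacks for short names (tenant/namespace/topic is treated as persistent, a bare name goes to public/default) follow Pulsar's documented defaults, while TopicName and parse_topic are hypothetical helpers, not the client library's API:

```python
from typing import NamedTuple

DOMAINS = ("persistent", "non-persistent")


class TopicName(NamedTuple):
    domain: str      # persistence domain
    tenant: str
    namespace: str
    local_name: str  # topic name inside the namespace


def parse_topic(name: str) -> TopicName:
    """Split a topic name into domain, tenant, namespace and local name."""
    if "://" in name:
        domain, rest = name.split("://", 1)
    else:
        # Without a domain prefix, a persistent topic is assumed
        domain, rest = "persistent", name
        if "/" not in rest:
            # A bare name such as "orders" lives in public/default
            rest = f"public/default/{rest}"
    if domain not in DOMAINS:
        raise ValueError(f"unknown topic domain: {domain}")
    parts = rest.split("/")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got: {rest}")
    return TopicName(domain, parts[0], parts[1], parts[2])


print(parse_topic("persistent://company-a/production/orders"))
print(parse_topic("orders"))
```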

Create a tenant:

pulsar-admin tenants create company-a \
  --admin-roles admin \
  --allowed-clusters pulsar-cluster
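pulsar-admin is a client for the broker's REST admin API, so the same tenant can be created over HTTP. A minimal sketch with Python's standard library, assuming the admin API is at http://localhost:8080 and no authentication is configured; it only builds the request, and sending it requires a running broker:

```python
import json
import urllib.request

ADMIN_URL = "http://localhost:8080"  # assumption: broker web service address


def create_tenant_request(tenant, admin_roles, allowed_clusters):
    """Build (but do not send) the REST call behind `pulsar-admin tenants create`."""
    body = json.dumps({
        "adminRoles": admin_roles,
        "allowedClusters": allowed_clusters,
    }).encode("utf-8")
    return urllib.request.Request(
        f"{ADMIN_URL}/admin/v2/tenants/{tenant}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = create_tenant_request("company-a", ["admin"], ["pulsar-cluster"])
print(req.get_method(), req.full_url)  # PUT http://localhost:8080/admin/v2/tenants/company-a
```

Passing req to urllib.request.urlopen() sends it; the adminRoles and allowedClusters fields correspond to the --admin-roles and --allowed-clusters flags.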

List tenants:

pulsar-admin tenants list

Create a namespace:

pulsar-admin namespaces create company-a/production \
  --clusters pulsar-cluster

List namespaces:

pulsar-admin namespaces list company-a

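Set namespace policies (retention, TTL, persistence, replication):

# Keep acknowledged messages for 7 days (capped at 10 GB per topic)
pulsar-admin namespaces set-retention company-a/production --time 7d --size 10G

# Expire messages left unacknowledged for more than 7 days (604800 seconds)
pulsar-admin namespaces set-message-ttl company-a/production --messageTTL 604800

# Set persistence policies (ensemble, write quorum, ack quorum, mark-delete rate)
pulsar-admin namespaces set-persistence company-a/production \
  --bookkeeper-ensemble 3 \
  --bookkeeper-write-quorum 2 \
  --bookkeeper-ack-quorum 2 \
  --ml-mark-delete-max-rate 0

# Enable geo-replication (us-west and eu-west must already be registered
# with pulsar-admin clusters create and allowed for the tenant)
pulsar-admin namespaces set-clusters \
  company-a/production \
  --clusters pulsar-cluster,us-west,eu-west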

Delete a namespace (it must not contain any topics):

pulsar-admin namespaces delete company-a/production

Topic Management

Topics are the fundamental unit of messaging in Pulsar.

Create a topic:

pulsar-admin topics create \
  persistent://company-a/production/orders

# Or for non-persistent topics
pulsar-admin topics create \
  non-persistent://company-a/production/notifications

List topics in a namespace:

pulsar-admin topics list company-a/production

Create a partitioned topic (for parallel consumption):

pulsar-admin topics create-partitioned-topic \
  persistent://company-a/production/events \
  --partitions 4
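Each partition is an ordinary topic named <topic>-partition-<i>, and the producer picks a partition for every message. The toy router below illustrates the common strategy: keyed messages always map to the same partition (preserving per-key order) and unkeyed ones rotate round-robin. CRC32 is a stand-in; real clients use their own hash functions and routing modes, so actual partition numbers will differ. ToyRouter is hypothetical:

```python
import itertools
import zlib
from typing import List, Optional


def partition_names(topic: str, partitions: int) -> List[str]:
    # Pulsar names each partition "<topic>-partition-<i>"
    return [f"{topic}-partition-{i}" for i in range(partitions)]


class ToyRouter:
    """Keyed messages hash to a fixed partition; unkeyed ones rotate."""

    def __init__(self, partitions: int):
        self.partitions = partitions
        self._round_robin = itertools.cycle(range(partitions))

    def route(self, key: Optional[str] = None) -> int:
        if key is not None:
            # Same key -> same partition, so per-key ordering is preserved
            return zlib.crc32(key.encode("utf-8")) % self.partitions
        return next(self._round_robin)


print(partition_names("persistent://company-a/production/events", 4)[0])
router = ToyRouter(4)
print([router.route() for _ in range(5)])  # [0, 1, 2, 3, 0]
```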

Get topic details (statistics, and the TTL currently applied):

pulsar-admin topics stats \
  persistent://company-a/production/orders

pulsar-admin topics get-message-ttl \
  persistent://company-a/production/orders

Set a per-topic message TTL:

pulsar-admin topics set-message-ttl \
  persistent://company-a/production/orders \
  --ttl 2592000  # 30 days in seconds
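TTL values are given in seconds, so 604800 and 2592000 correspond to 7 and 30 days. The sketch below checks that arithmetic with datetime.timedelta and adds a toy expiry rule; it is illustrative only, since the broker applies TTL on its own periodic schedule, and the helper names are hypothetical:

```python
from datetime import timedelta


def ttl_seconds(days: int) -> int:
    """Convert a TTL in days to the seconds pulsar-admin expects."""
    return int(timedelta(days=days).total_seconds())


def is_expired(publish_time_s: float, now_s: float, ttl_s: int) -> bool:
    """Toy rule: an unacknowledged message becomes eligible for expiry
    once it is older than the TTL."""
    return now_s - publish_time_s > ttl_s


print(ttl_seconds(7), ttl_seconds(30))  # 604800 2592000
print(is_expired(publish_time_s=0, now_s=ttl_seconds(31), ttl_s=ttl_seconds(30)))  # True
```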

Clear a subscription's backlog (acknowledge every pending message for that subscription):

pulsar-admin topics clear-backlog \
  persistent://company-a/production/orders \
  -s order-processor

Delete a topic:

pulsar-admin topics delete \
  persistent://company-a/production/orders

Subscriptions and Consumers

Subscriptions define how consumer groups consume messages from topics.

Create a subscription:

pulsar-admin topics create-subscription \
  persistent://company-a/production/orders \
  --subscription order-processor

List subscriptions:

pulsar-admin topics subscriptions \
  persistent://company-a/production/orders

Get subscription details (backlog, rates, and connected consumers appear per subscription under "subscriptions" in the topic stats):

pulsar-admin topics stats \
  persistent://company-a/production/orders

The subscription type is chosen by the client when it subscribes. Install the Python client:

pip3 install pulsar-client

Create a consumer:

#!/usr/bin/env python3
import json

import pulsar

# Connect to the cluster (any broker's service URL works)
client = pulsar.Client('pulsar://localhost:6650')

# The subscription type is set with consumer_type; Exclusive is also the default
consumer = client.subscribe(
    topic='persistent://company-a/production/orders',
    subscription_name='order-processor',
    consumer_type=pulsar.ConsumerType.Exclusive,
)

try:
    while True:
        msg = consumer.receive()
        data = json.loads(msg.data())
        print(f"Received: {data}")
        # Acknowledge so the broker does not redeliver this message
        consumer.acknowledge(msg)
except KeyboardInterrupt:
    pass
finally:
    # Release the subscription and the connection on exit
    consumer.close()
    client.close()

Shared subscription (several consumers attach to one subscription and messages are distributed among them round-robin):

consumer = client.subscribe(
    topic='persistent://company-a/production/orders',
    subscription_name='order-processors',
    consumer_type=pulsar.ConsumerType.Shared
)

Failover subscription (one active consumer; the others take over if it disconnects):

consumer = client.subscribe(
    topic='persistent://company-a/production/orders',
    subscription_name='order-backup',
    consumer_type=pulsar.ConsumerType.Failover
)
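The subscription types above differ in how the broker spreads messages over attached consumers. The toy model below is a simplification for illustration, not broker code: Exclusive rejects a second consumer, Failover sends everything to one active consumer (here simply the first in the list), and Shared rotates messages round-robin. Key_Shared, acknowledgements, and redelivery are not modeled, and dispatch is a hypothetical helper:

```python
from collections import defaultdict
from itertools import cycle
from typing import Dict, List


def dispatch(sub_type: str, consumers: List[str],
             messages: List[str]) -> Dict[str, List[str]]:
    """Toy model: which consumer receives which message."""
    if not consumers:
        return {}  # nothing attached; messages stay in the backlog
    if sub_type == "Exclusive":
        if len(consumers) > 1:
            raise RuntimeError("an Exclusive subscription allows one consumer")
        active = consumers
    elif sub_type == "Failover":
        active = consumers[:1]  # one active consumer, the rest stand by
    elif sub_type == "Shared":
        active = consumers      # every consumer takes a turn
    else:
        raise ValueError(f"unsupported type: {sub_type}")
    received: Dict[str, List[str]] = defaultdict(list)
    targets = cycle(active)
    for msg in messages:
        received[next(targets)].append(msg)
    return dict(received)


print(dispatch("Shared", ["c1", "c2"], ["m1", "m2", "m3"]))
print(dispatch("Failover", ["c1", "c2"], ["m1", "m2", "m3"]))
```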

Reset a subscription's cursor (here, rewind it by one hour):

pulsar-admin topics reset-cursor \
  persistent://company-a/production/orders \
  --subscription order-processor \
  --time 1h

Pulsar Functions

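Pulsar Functions enable lightweight stream processing within Pulsar. They run on a functions worker: standalone mode starts one by default, while cluster brokers need functionsWorkerEnabled=true in broker.conf.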

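Create a simple native Python function (no Pulsar SDK import needed; each input message is passed to process() and the return value is published to the output topic):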

# simple_function.py
def process(input):
    return input.upper()
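Because a native Python function is ordinary Python with no Pulsar imports, its logic can be checked locally before it is submitted. A minimal sketch (the test strings are arbitrary):

```python
# simple_function.py
def process(input):
    # Pulsar calls process() once per message and publishes the return value
    return input.upper()


if __name__ == "__main__":
    # Quick local check, no cluster required
    assert process("order-42") == "ORDER-42"
    print(process("hello pulsar"))  # HELLO PULSAR
```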

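Submit a function. The command below deploys the Java ExclamationFunction bundled with the Pulsar distribution; a Python file such as simple_function.py is submitted the same way, using --py (with a matching --classname) instead of --jar:

pulsar-admin functions create \
  --jar /opt/pulsar/examples/api-examples.jar \
  --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
  --tenant company-a \
  --namespace production \
  --name exclamation \
  --inputs persistent://company-a/production/orders \
  --output persistent://company-a/production/processed-orders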

Or via YAML configuration (com.example.OrderProcessor and the jar path stand in for your own function code; resources are in bytes):

sudo tee /etc/pulsar/function-config.yaml <<EOF
name: ProcessOrders
tenant: company-a
namespace: production
inputs:
  - persistent://company-a/production/orders
output: persistent://company-a/production/processed-orders
className: com.example.OrderProcessor
jar: /opt/pulsar/functions/order-processor.jar
runtime: JAVA
parallelism: 4
resources:
  cpu: 0.5
  ram: 536870912    # 512 MB
  disk: 1073741824  # 1 GB
EOF

pulsar-admin functions create --function-config-file /etc/pulsar/function-config.yaml

List functions:

pulsar-admin functions list --tenant company-a --namespace production

Get function status:

pulsar-admin functions status \
  --tenant company-a \
  --namespace production \
  --name ProcessOrders

Monitoring and Management

Monitor Pulsar cluster health with pulsar-admin and the metrics each broker exposes.

Port 8080 serves the admin REST API, not a dashboard. A separately deployed UI such as Pulsar Manager, or Grafana on top of Prometheus, can visualize:

  • Cluster topology
  • Broker status
  • Topic metrics
  • Consumer lag
  • Function execution

Monitor broker metrics:

pulsar-admin broker-stats monitoring-metrics

Check broker connectivity:

pulsar-admin brokers list pulsar-cluster

Monitor namespace usage (topic statistics grouped by namespace and bundle, for the broker you query):

pulsar-admin broker-stats topics

Track consumer lag (check msgBacklog for each subscription in the output):

pulsar-admin topics stats \
  persistent://company-a/production/orders
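The JSON printed by pulsar-admin topics stats reports a msgBacklog count for each entry under "subscriptions", the usual measure of consumer lag. The sketch below flags subscriptions above a threshold; SAMPLE_STATS is a trimmed excerpt with made-up values rather than real output, and lagging_subscriptions is a hypothetical helper:

```python
import json

# Made-up, trimmed excerpt; real `pulsar-admin topics stats` output has many more fields
SAMPLE_STATS = """
{
  "msgRateIn": 120.0,
  "subscriptions": {
    "order-processor": {"msgBacklog": 15, "consumers": [{}]},
    "order-backup": {"msgBacklog": 4200, "consumers": []}
  }
}
"""


def lagging_subscriptions(stats_json: str, max_backlog: int) -> dict:
    """Return {subscription: backlog} for subscriptions above the threshold."""
    stats = json.loads(stats_json)
    return {
        name: sub["msgBacklog"]
        for name, sub in stats.get("subscriptions", {}).items()
        if sub.get("msgBacklog", 0) > max_backlog
    }


print(lagging_subscriptions(SAMPLE_STATS, max_backlog=1000))  # {'order-backup': 4200}
```

To check a live topic, the same function can read the real JSON, for example from sys.stdin when the stats command is piped into the script.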

Set up Prometheus monitoring. Each broker exposes metrics in the Prometheus format on its web service port, which Prometheus can scrape:

curl http://localhost:8080/metrics/
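Broker metrics use the Prometheus text exposition format (name{labels} value). Prometheus itself should do the scraping, but the format is easy to parse for a quick script. The sketch below covers the common cases only (escaped characters are kept as-is, timestamps are ignored); the sample text is invented, and the metric name pulsar_msg_backlog and its labels are assumed from Pulsar's metric naming:

```python
import re
from typing import Dict, List, Tuple

# Invented sample in Prometheus text format
SAMPLE = '''
# TYPE pulsar_msg_backlog gauge
pulsar_msg_backlog{cluster="pulsar-cluster",namespace="company-a/production",topic="persistent://company-a/production/orders"} 4215
pulsar_msg_backlog{cluster="pulsar-cluster",namespace="company-a/production",topic="persistent://company-a/production/events"} 0
'''

# metric name, optional {labels}, then the sample value
LINE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
LABEL = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')


def parse_metrics(text: str) -> List[Tuple[str, Dict[str, str], float]]:
    """Return (name, labels, value) for every sample line."""
    samples = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = LINE.match(line)
        if match:
            name, labels, value = match.groups()
            samples.append((name, dict(LABEL.findall(labels or "")), float(value)))
    return samples


for name, labels, value in parse_metrics(SAMPLE):
    print(name, labels["topic"], value)
```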

Troubleshooting

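Check broker logs (when the broker is started with pulsar-daemon, logs are written under $PULSAR_HOME/logs; in the foreground they go to the console):

tail -f /opt/pulsar/logs/pulsar-broker-$(hostname).log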

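Verify ZooKeeper connectivity (a healthy server replies imok; on ZooKeeper 3.5 and later, ruok must be permitted by the 4lw.commands.whitelist setting):

echo ruok | nc localhost 2181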

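Check BookKeeper status with the bookkeeper shell shipped in $PULSAR_HOME/bin:

BOOKIE_CONF=/etc/pulsar/bk.conf bookkeeper shell listbookies -rw
BOOKIE_CONF=/etc/pulsar/bk.conf bookkeeper shell listledgers
BOOKIE_CONF=/etc/pulsar/bk.conf bookkeeper shell ledgermetadata -ledgerid <ledger-id>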

Verify cluster metadata:

pulsar-admin clusters get pulsar-cluster

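Test producer/consumer. Start the consumer first, because a new subscription begins at the latest message by default, then produce from a second terminal:

# Consumer (exits after receiving one message)
pulsar-client consume -s test-sub -n 1 \
  persistent://company-a/production/test

# Producer
pulsar-client produce -m "test message" \
  persistent://company-a/production/test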

Conclusion

Apache Pulsar provides a scalable messaging platform with separated compute and storage, multi-tenancy, and geo-replication. This guide covered standalone and cluster deployment, tenant/namespace organization, topic and subscription management, Pulsar Functions for stream processing, and monitoring. For production deployments, choose ensemble and quorum sizes that match your durability needs, enable TLS encryption, set retention and TTL policies, monitor cluster health, and establish disaster recovery procedures. These properties make Pulsar well suited to resilient, globally distributed messaging systems.