Redis Pub/Sub for Real-Time Messaging

Redis Pub/Sub provides a lightweight, real-time messaging system built into Redis. It enables publish-subscribe messaging patterns with multiple subscribers listening to channels. While simpler than dedicated message brokers, Redis Pub/Sub excels for real-time notifications, live updates, and low-latency communication in applications already using Redis.

Prerequisites

Before implementing Redis Pub/Sub, ensure you have:

  • Linux system with Redis 6.0+ installed
  • Python with redis-py library or other Redis client
  • Understanding of Pub/Sub messaging concepts
  • Basic Redis knowledge

Redis Installation and Setup

Install Redis on Ubuntu/Debian:

sudo apt-get update
sudo apt-get install -y redis-server redis-tools

Start and enable Redis:

sudo systemctl start redis-server
sudo systemctl enable redis-server

Verify Redis is running:

redis-cli ping

Should output: PONG

For production deployments, configure Redis security. Edit /etc/redis/redis.conf:

sudo nano /etc/redis/redis.conf

Essential configuration:

# Network binding
bind 127.0.0.1 ::1

# Port
port 6379

# Require authentication
requirepass redis_secure_password_123

# Database count
databases 16

# Memory limit
maxmemory 256mb
maxmemory-policy allkeys-lru

# Logging
loglevel notice
logfile /var/log/redis/redis-server.log

# Persistence
save 900 1
save 300 10
save 60 10000
appendonly yes

Restart Redis:

sudo systemctl restart redis-server

Test authenticated connection:

redis-cli -a redis_secure_password_123 ping

Pub/Sub Fundamentals

Redis Pub/Sub operates on the publish-subscribe model. Publishers send messages to channels, and subscribers listen to those channels without prior knowledge of each other.

Basic publish operation:

redis-cli
> PUBLISH notifications "Hello subscribers"
(integer) 2  # Number of subscribers that received the message (0 if none are connected)

Subscribe to a channel:

redis-cli
> SUBSCRIBE notifications
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notifications"
3) (integer) 1

From another terminal, publish a message:

redis-cli
> PUBLISH notifications "System maintenance at 2 AM"
(integer) 1

The subscriber receives the message in real time. Multiple subscribers can listen to the same channel:

# Terminal 1
redis-cli -a password
> SUBSCRIBE user-events
Reading messages...

# Terminal 2
redis-cli -a password
> SUBSCRIBE user-events
Reading messages...

# Terminal 3
redis-cli -a password
> PUBLISH user-events "User login event"
(integer) 2  # Both terminals receive the message

Channel Patterns and Wildcards

Redis Pub/Sub supports pattern matching with PSUBSCRIBE for subscribing to multiple channels with wildcards.

Subscribe to pattern:

redis-cli
> PSUBSCRIBE "user:*:notifications"
Reading messages...

Publish to channels matching the pattern:

redis-cli
> PUBLISH "user:123:notifications" "Profile updated"
(integer) 1
> PUBLISH "user:456:notifications" "Password changed"
(integer) 1
> PUBLISH "user:789:notifications" "Subscription activated"
(integer) 1

All messages to channels matching the pattern are delivered to the subscriber.

Common pattern examples:

# Subscribe to all channels
PSUBSCRIBE *

# Subscribe to user notifications
PSUBSCRIBE "user:*:*"

# Subscribe to event categories
PSUBSCRIBE "events:*"

# Subscribe to system alerts
PSUBSCRIBE "alerts:*:critical"

# Subscribe to multiple patterns
PSUBSCRIBE "user:*" "system:*" "events:*"
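To see which channel names a pattern would cover before subscribing, the glob rules can be approximated locally. The sketch below uses Python's fnmatch.fnmatchcase as a stand-in for Redis's own matcher. That is an assumption that holds for simple `*` and `?` patterns like the ones above, but the two can differ on edge cases such as escaping and negated character classes, so treat it as an illustration rather than an exact replica. The function name matching_channels and the sample channel list are hypothetical.

```python
from fnmatch import fnmatchcase

def matching_channels(pattern, channels):
    """Return the channel names that a glob-style pattern would match.

    Approximation only: fnmatchcase stands in for Redis's own matcher.
    """
    return [name for name in channels if fnmatchcase(name, pattern)]

# Hypothetical channel names for illustration
channels = [
    "user:123:notifications",
    "user:456:settings",
    "alerts:db:critical",
    "events:login",
]

print(matching_channels("user:*:notifications", channels))
print(matching_channels("alerts:*:critical", channels))
print(matching_channels("user:*", channels))
```

Note that `*` is not limited to one colon-separated segment: `user:*` also matches `user:123:notifications`.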

Unsubscribe from channels:

UNSUBSCRIBE notifications
PUNSUBSCRIBE "user:*:notifications"

List active subscriptions:

# From another connection
PUBSUB CHANNELS
PUBSUB NUMSUB notifications user-events
PUBSUB NUMPAT  # Number of pattern subscriptions

Practical Pub/Sub Examples

Create a Python publisher for real-time notifications:

pip3 install redis

Create publisher.py:

#!/usr/bin/env python3
import redis
import json
from datetime import datetime
import time

# Connect to Redis
r = redis.Redis(
    host='localhost',
    port=6379,
    password='redis_secure_password_123',
    decode_responses=True
)

def publish_notification(user_id, event_type, message):
    """Publish a notification to user channel"""
    channel = f"user:{user_id}:notifications"
    
    payload = {
        "event_type": event_type,
        "message": message,
        "timestamp": datetime.now().isoformat()
    }
    
    subscribers = r.publish(channel, json.dumps(payload))
    print(f"Published to {channel}: {subscribers} subscribers received message")
    return subscribers

def publish_system_alert(severity, message):
    """Publish a system-wide alert"""
    channel = f"alerts:{severity}"
    
    payload = {
        "severity": severity,
        "message": message,
        "timestamp": datetime.now().isoformat()
    }
    
    subscribers = r.publish(channel, json.dumps(payload))
    print(f"Published alert: {subscribers} subscribers received message")

# Example usage
if __name__ == "__main__":
    # Publish user-specific notification
    publish_notification("user_123", "login", "New login from 192.168.1.100")
    
    # Publish system alert
    publish_system_alert("critical", "Database connection pool exhausted")
    
    # Multiple notifications
    for i in range(1, 6):
        publish_notification(f"user_{i:03d}", "message", f"You have a new message {i}")
        time.sleep(1)

Create subscriber.py:

#!/usr/bin/env python3
import redis
import json
from threading import Thread

# Connect to Redis
r = redis.Redis(
    host='localhost',
    port=6379,
    password='redis_secure_password_123',
    decode_responses=True
)

def subscribe_user_notifications(user_id):
    """Subscribe to user-specific notifications"""
    pubsub = r.pubsub()
    channel = f"user:{user_id}:notifications"
    pubsub.subscribe(channel)
    
    print(f"Subscribed to {channel}")
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            try:
                data = json.loads(message['data'])
                print(f"[{channel}] {data}")
            except json.JSONDecodeError:
                print(f"[{channel}] {message['data']}")

def subscribe_alerts(severity):
    """Subscribe to severity-based alerts"""
    pubsub = r.pubsub()
    channel = f"alerts:{severity}"
    pubsub.subscribe(channel)
    
    print(f"Subscribed to {channel}")
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            try:
                data = json.loads(message['data'])
                print(f"[ALERT {severity.upper()}] {data}")
            except json.JSONDecodeError:
                print(f"[ALERT {severity.upper()}] {message['data']}")

def subscribe_pattern(pattern):
    """Subscribe to channel pattern"""
    pubsub = r.pubsub()
    pubsub.psubscribe(pattern)
    
    print(f"Subscribed to pattern: {pattern}")
    
    for message in pubsub.listen():
        if message['type'] == 'pmessage':
            print(f"[{message['channel']}] {message['data']}")

# Run subscribers in separate threads
if __name__ == "__main__":
    # Subscribe to user notifications
    user_thread = Thread(target=subscribe_user_notifications, args=("user_123",))
    user_thread.daemon = True
    user_thread.start()
    
    # Subscribe to critical alerts
    alert_thread = Thread(target=subscribe_alerts, args=("critical",))
    alert_thread.daemon = True
    alert_thread.start()
    
    # Subscribe to all events
    pattern_thread = Thread(target=subscribe_pattern, args=("events:*",))
    pattern_thread.daemon = True
    pattern_thread.start()
    
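    # Keep the main thread alive by waiting on the subscriber threads
    # (an empty "while True: pass" loop would pin a CPU core at 100%)
    try:
        for thread in (user_thread, alert_thread, pattern_thread):
            thread.join()
    except KeyboardInterrupt:
        print("Shutting down")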

Run subscriber in one terminal:

python3 subscriber.py

Run publisher in another:

python3 publisher.py
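The three subscriber functions in subscriber.py repeat the same decode-and-print logic. One way to factor it out is a single helper that accepts the message dictionaries redis-py yields from listen(). The sketch below relies only on the `type`, `channel`, and `data` keys already used in subscriber.py; the name decode_message and the sample dictionaries are hypothetical.

```python
import json

def decode_message(message):
    """Turn a redis-py Pub/Sub message dict into (channel, payload), or None.

    Subscription confirmations ('subscribe', 'psubscribe', ...) are skipped.
    Payloads that are not valid JSON are returned as the raw string.
    """
    if message.get("type") not in ("message", "pmessage"):
        return None
    raw = message["data"]
    try:
        payload = json.loads(raw)
    except (TypeError, json.JSONDecodeError):
        payload = raw
    return message["channel"], payload

# Sample dicts shaped like the ones subscriber.py receives
confirmation = {"type": "subscribe", "pattern": None,
                "channel": "alerts:critical", "data": 1}
alert = {"type": "message", "pattern": None, "channel": "alerts:critical",
         "data": '{"severity": "critical", "message": "disk full"}'}
plain = {"type": "pmessage", "pattern": "events:*",
         "channel": "events:login", "data": "user 123"}

for msg in (confirmation, alert, plain):
    print(decode_message(msg))
```

Each subscriber loop could then call decode_message(message) and skip None results instead of checking the message type inline.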

Streams vs Pub/Sub

Redis Streams provide persistence and message history, unlike Pub/Sub which loses messages when no subscribers are connected. Compare the two:

redis-cli

Pub/Sub example (messages lost if no subscribers):

# Terminal 1 - Publish first
> PUBLISH events "Event 1"
(integer) 0  # No subscribers

# Terminal 2 - Subscribe later
> SUBSCRIBE events
# Never receives "Event 1"

Streams example (messages are stored):

# Add message to stream
> XADD events * type "login" user_id "123"
"1234567890-0"

# Consume from stream (gets historical messages)
> XREAD STREAMS events 0
1) 1) "events"
   2) 1) 1) "1234567890-0"
         2) 1) "type"
            2) "login"
            3) "user_id"
            4) "123"

Use Pub/Sub for: real-time notifications, live updates, fast transient messages
Use Streams for: audit logs, persistent event history, message replay capability

Create a hybrid solution combining both:

#!/usr/bin/env python3
import redis
import json
from datetime import datetime

r = redis.Redis(
    host='localhost',
    port=6379,
    password='redis_secure_password_123',
    decode_responses=True
)

def publish_event(event_type, data):
    """Publish event to both Pub/Sub (real-time) and Stream (persistent)"""
    
    payload = {
        "event_type": event_type,
        "data": data,
        "timestamp": datetime.now().isoformat()
    }
    
    # Pub/Sub for real-time delivery
    r.publish(f"events:{event_type}", json.dumps(payload))
    
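    # Stream for persistence (stream field values must be flat strings or
    # numbers, so the nested payload is stored as a single JSON field)
    r.xadd("events:stream", {"payload": json.dumps(payload)})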
    
    print(f"Published event: {event_type}")

# Use cases
publish_event("user_login", {"user_id": "123", "ip": "192.168.1.1"})
publish_event("order_created", {"order_id": "ORD-001", "amount": 99.99})
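A subscriber that was offline can catch up from the stream before it resumes live delivery over Pub/Sub. The sketch below assumes a redis-py client created with decode_responses=True and the default reply shape for XREAD (a list of [stream, entries] pairs). The helper name read_missed_events and the FakeClient used to exercise it without a server are hypothetical.

```python
def read_missed_events(client, stream, last_id="0"):
    """Return (entry_id, fields) pairs added to `stream` after `last_id`.

    `client` is expected to behave like redis.Redis(decode_responses=True).
    """
    reply = client.xread({stream: last_id}) or []
    events = []
    for _stream_name, entries in reply:
        for entry_id, fields in entries:
            events.append((entry_id, fields))
    return events

class FakeClient:
    """Stand-in for a Redis client so the sketch runs without a server."""

    def __init__(self, entries):
        self.entries = entries  # list of (id, fields), oldest first

    def xread(self, streams):
        reply = []
        for name, last_id in streams.items():
            newer = [e for e in self.entries
                     if self._key(e[0]) > self._key(last_id)]
            if newer:
                reply.append([name, newer])
        return reply

    @staticmethod
    def _key(entry_id):
        # Stream IDs are "<milliseconds>-<sequence>"; a bare "0" means "0-0"
        ms, _, seq = entry_id.partition("-")
        return int(ms), int(seq or 0)

client = FakeClient([
    ("1700000000000-0", {"event_type": "user_login"}),
    ("1700000000500-0", {"event_type": "order_created"}),
])
print(read_missed_events(client, "events:stream"))
print(read_missed_events(client, "events:stream", last_id="1700000000000-0"))
```

A consumer would persist the ID of the last entry it handled and pass it as last_id after a restart.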

Performance Considerations

Redis Pub/Sub is optimized for speed but has limitations. Test performance:

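# Measure PUBLISH throughput (redis-benchmark has no built-in publish test,
# so the command is passed explicitly)
redis-benchmark -p 6379 -a redis_secure_password_123 -n 100000 -q PUBLISH benchmark-channel "test message"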

Configuration optimizations:

# Edit redis.conf
sudo nano /etc/redis/redis.conf

Add for Pub/Sub optimization:

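# Pub/Sub messages are never written to RDB or AOF. Disabling persistence
# only raises throughput if the instance also serves key writes, and it
# discards that key data on restart
save ""
appendonly no

# Client output buffer limits for subscribers: hard limit, soft limit, soft seconds.
# These values are the Redis defaults; raise them if slow subscribers are being disconnected
client-output-buffer-limit pubsub 32mb 8mb 60

# TCP listen backlog (511 is the default; the kernel's somaxconn caps the effective value)
tcp-backlog 511

# Maximum size of a single bulk string in a request (512 MB is the default)
proto-max-bulk-len 536870912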
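The three numbers in the client-output-buffer-limit directive are a hard limit, a soft limit, and a soft-limit duration in seconds: a subscriber is disconnected as soon as its pending output reaches the hard limit, or when it stays above the soft limit for that many seconds. The sketch below decodes such a directive into bytes, using the unit convention documented in redis.conf (1k is 1000 bytes, 1kb is 1024 bytes, and so on). The helper names to_bytes and parse_buffer_limit are hypothetical.

```python
# Size suffixes as documented at the top of redis.conf
UNITS = {
    "k": 1000, "kb": 1024,
    "m": 1000 ** 2, "mb": 1024 ** 2,
    "g": 1000 ** 3, "gb": 1024 ** 3,
}

def to_bytes(value):
    """Convert a redis.conf size such as '32mb' or '8388608' to bytes."""
    value = value.lower()
    # Try two-letter suffixes first so '32mb' is not read as '32m' plus 'b'
    for suffix in sorted(UNITS, key=len, reverse=True):
        if value.endswith(suffix):
            return int(value[: -len(suffix)]) * UNITS[suffix]
    return int(value)

def parse_buffer_limit(directive):
    """Split a client-output-buffer-limit line into its named parts."""
    name, client_class, hard, soft, seconds = directive.split()
    if name != "client-output-buffer-limit":
        raise ValueError(f"unexpected directive: {name}")
    return {
        "class": client_class,
        "hard_limit_bytes": to_bytes(hard),
        "soft_limit_bytes": to_bytes(soft),
        "soft_seconds": int(seconds),
    }

limit = parse_buffer_limit("client-output-buffer-limit pubsub 32mb 8mb 60")
print(limit)
```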

Monitor Pub/Sub performance:

redis-cli
> INFO stats
> PUBSUB CHANNELS
> PUBSUB NUMPAT

Limitations and Workarounds

Key limitations of Redis Pub/Sub:

  1. No message persistence: Messages sent to channels with no subscribers are lost
  2. No message acknowledgment: No way to confirm message delivery
  3. Fire-and-forget: No guaranteed delivery mechanism
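
Because PUBLISH returns the number of clients the message was delivered to, a publisher can at least detect the zero-subscriber case. A minimal sketch, assuming a redis-py style client whose publish() returns that count; publish_or_fallback, the on_undelivered callback, and FakeClient are hypothetical names used for illustration.

```python
import json

def publish_or_fallback(client, channel, payload, on_undelivered):
    """Publish a JSON payload; call on_undelivered if nobody was listening."""
    body = json.dumps(payload)
    receivers = client.publish(channel, body)
    if receivers == 0:
        on_undelivered(channel, body)
    return receivers

class FakeClient:
    """Stand-in that mimics publish() returning the subscriber count."""

    def __init__(self, subscriber_counts):
        self.subscriber_counts = subscriber_counts

    def publish(self, channel, message):
        return self.subscriber_counts.get(channel, 0)

undelivered = []

def remember(channel, body):
    # Hypothetical fallback: keep the message for later handling
    undelivered.append((channel, body))

client = FakeClient({"alerts:critical": 2})
publish_or_fallback(client, "alerts:critical", {"msg": "disk full"}, remember)
publish_or_fallback(client, "alerts:info", {"msg": "backup done"}, remember)
print(undelivered)
```

A nonzero count does not confirm that any subscriber processed the message, so this only narrows the gap rather than closing it.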

Workarounds:

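  1. Use Redis Streams for persistence:
r.xadd("messages:backup", {"channel": "events", "message": "data"})
  2. Store each message in a list as well as publishing it, so a separate worker can process and acknowledge it later:
def reliable_publish(channel, message):
    # Store in reliable queue
    r.lpush(f"queue:{channel}", json.dumps(message))
    # Publish to real-time subscribers
    r.publish(channel, json.dumps(message))
  3. Implement at-least-once delivery with worker processing:
# Publisher (LPUSH adds at the head, so the worker's pop from the tail is FIFO)
r.lpush("pending_messages", json.dumps({"channel": "events", "msg": "data"}))

# Worker
while True:
    # Block for up to 5 seconds instead of spinning on an empty list
    msg = r.brpoplpush("pending_messages", "processing_messages", timeout=5)
    if msg:
        item = json.loads(msg)
        r.publish(item["channel"], item["msg"])
        # Remove from the processing list only after the publish succeeded
        r.lrem("processing_messages", 0, msg)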
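
If a worker crashes between moving a message to the processing list and removing it, the message stays in that list. A recovery pass at startup can move such entries back. The sketch below assumes the list names used above and a redis-py style client exposing rpoplpush(); requeue_unfinished and FakeClient are hypothetical.

```python
def requeue_unfinished(client, processing="processing_messages",
                       pending="pending_messages"):
    """Move every message left in the processing list back to the pending list."""
    moved = 0
    # rpoplpush returns None once the source list is empty
    while client.rpoplpush(processing, pending) is not None:
        moved += 1
    return moved

class FakeClient:
    """Stand-in that mimics RPOPLPUSH on in-memory lists."""

    def __init__(self):
        self.lists = {}

    def rpoplpush(self, src, dst):
        source = self.lists.get(src, [])
        if not source:
            return None
        item = source.pop()                              # pop from the tail
        self.lists.setdefault(dst, []).insert(0, item)   # push at the head
        return item

client = FakeClient()
client.lists["processing_messages"] = ["a", "b"]
client.lists["pending_messages"] = ["c"]

moved = requeue_unfinished(client)
print(moved, client.lists)
```

Run a pass like this only while no worker is active, since it cannot tell an in-flight message from an abandoned one.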

Monitoring Pub/Sub

Monitor Pub/Sub activity:

redis-cli
> PUBSUB CHANNELS
> PUBSUB NUMSUB notifications alerts:critical
> PUBSUB NUMPAT

Create a monitoring script:

#!/usr/bin/env python3
import redis
import json
from datetime import datetime

r = redis.Redis(
    host='localhost',
    port=6379,
    password='redis_secure_password_123',
    decode_responses=True
)

def monitor_pubsub():
    """Monitor active Pub/Sub subscriptions"""
    
    # Get active channels
    channels = r.pubsub_channels()
    print(f"Active channels: {channels}")
    
    # Get subscriber count per channel
    if channels:
        numsub = r.pubsub_numsub(*channels)
        for channel, count in numsub:
            print(f"  {channel}: {count} subscribers")
    
    # Get pattern subscriptions
    numpat = r.pubsub_numpat()
    print(f"Pattern subscriptions: {numpat}")
    
    # Get stats
    info = r.info('stats')
    print(f"Total commands processed: {info.get('total_commands_processed')}")

if __name__ == "__main__":
    monitor_pubsub()
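
PUBSUB CHANNELS only lists channels that currently have at least one subscriber, so a channel whose consumers have all disconnected simply disappears from the output. Querying NUMSUB for a fixed list of expected channels makes that case visible. The sketch below works on (channel, count) pairs, the shape iterated over in monitor_pubsub(); the function name channels_below_minimum and the sample data are hypothetical.

```python
def channels_below_minimum(numsub, minimum=1):
    """Return channels whose subscriber count is below `minimum`.

    `numsub` is a list of (channel, count) pairs.
    """
    return [channel for channel, count in numsub if count < minimum]

# Sample data standing in for the result of
# r.pubsub_numsub("notifications", "alerts:critical", "user-events")
sample = [("notifications", 2), ("alerts:critical", 0), ("user-events", 1)]

print(channels_below_minimum(sample))
print(channels_below_minimum(sample, minimum=2))
```

NUMSUB counts only direct channel subscriptions, so clients that receive a channel through a pattern subscription are not included in these numbers.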

Advanced Patterns

Implement request-response pattern with Pub/Sub:

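import json
import time
import uuid

import redis

r = redis.Redis(decode_responses=True)

def request_reply(request_channel, response_channel, request_data, timeout=5):
    """Implement request-reply with Pub/Sub"""
    
    # Subscribe to the response channel before sending the request
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe(response_channel)
    
    try:
        # Send request
        r.publish(request_channel, json.dumps(request_data))
        
        # Poll for the response until the deadline. listen() would block
        # indefinitely, so a timeout check inside its loop would never run
        deadline = time.time() + timeout
        while True:
            remaining = deadline - time.time()
            if remaining <= 0:
                raise TimeoutError("No response received")
            message = pubsub.get_message(timeout=remaining)
            if message and message['type'] == 'message':
                return json.loads(message['data'])
    finally:
        # Release the subscription and its connection
        pubsub.close()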

# Usage
request_id = str(uuid.uuid4())
response = request_reply(
    "service:requests",
    f"service:responses:{request_id}",
    {"request_id": request_id, "action": "get_user", "user_id": "123"}
)
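
The service side of this exchange listens on the request channel and publishes each reply to the per-request response channel. A minimal sketch of that step, assuming the channel naming from the usage above and a redis-py style client; handle_request, lookup_user, and FakeClient are hypothetical.

```python
import json

def lookup_user(user_id):
    """Hypothetical business logic; a real service would query a data store."""
    return {"user_id": user_id, "name": f"user-{user_id}"}

def handle_request(client, raw_request):
    """Process one request message and publish the reply to its own channel."""
    request = json.loads(raw_request)
    if request.get("action") == "get_user":
        result = {"status": "ok", "user": lookup_user(request["user_id"])}
    else:
        result = {"status": "error", "reason": "unknown action"}
    result["request_id"] = request["request_id"]

    response_channel = f"service:responses:{request['request_id']}"
    client.publish(response_channel, json.dumps(result))
    return response_channel

class FakeClient:
    """Stand-in that records what would have been published."""

    def __init__(self):
        self.published = []

    def publish(self, channel, message):
        self.published.append((channel, message))
        return 1

client = FakeClient()
channel = handle_request(client, json.dumps(
    {"request_id": "abc-1", "action": "get_user", "user_id": "123"}))
print(channel)
print(client.published[0][1])
```

In a running service, handle_request would be called for each 'message' item received from a subscription to service:requests.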

Conclusion

Redis Pub/Sub provides a lightweight, real-time messaging solution ideal for applications already using Redis. It excels for live notifications, dashboard updates, and real-time data distribution with minimal latency overhead. However, for guaranteed message delivery and persistence requirements, combine Pub/Sub with Streams or use dedicated message brokers like RabbitMQ or Kafka. For production deployments using Pub/Sub, implement proper error handling, consider persistence requirements, set up monitoring, and plan for message delivery guarantees if needed. Redis Pub/Sub remains an excellent choice for latency-critical, real-time applications where some message loss is acceptable.