Celery Task Queue with Redis or RabbitMQ

Celery is a distributed task queue library for Python that enables asynchronous task execution, scheduling, and distributed computing. It works with multiple message brokers (Redis, RabbitMQ) and result backends, making it ideal for background job processing, scheduled tasks, and long-running operations.

Prerequisites

Before using Celery, ensure you have:

  • Python 3.7+ installed
  • Redis 5.0+ or RabbitMQ 3.8+ (message broker)
  • pip package manager
  • Understanding of asynchronous programming
  • Basic familiarity with message queues

Installing Celery

Install Celery and its dependencies:

pip3 install "celery[redis]"   # quote the extras so shells like zsh don't expand the brackets
# Or for RabbitMQ
pip3 install "celery[amqp]"

Install additional optional dependencies. Note that Celery talks to RabbitMQ through kombu/py-amqp, which ships with Celery, so no separate RabbitMQ client library is needed:

pip3 install redis   # Redis client (already pulled in by celery[redis])
pip3 install flower  # For monitoring

Verify installation:

celery --version

Celery Architecture

Celery consists of:

  1. Producer: Application that queues tasks
  2. Broker: Message queue (Redis, RabbitMQ) storing tasks
  3. Worker: Service processing tasks from the queue
  4. Result Backend: Stores task results (Redis, RabbitMQ, database)

Typical flow:

  • Producer sends task to broker
  • Worker picks task from broker
  • Worker executes task
  • Result stored in backend
  • Producer retrieves result

Configuring Celery with Redis

Create a Celery application with Redis broker. Create celery_app.py:

#!/usr/bin/env python3
from celery import Celery

# Create Celery app
app = Celery('myapp')

# Configure Redis broker
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    
    # Task retry settings
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    
    # Worker settings
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    
    # Task time limits: the soft limit (30 min) raises SoftTimeLimitExceeded
    # inside the task; the hard limit then forcefully terminates it
    task_soft_time_limit=1800,
    task_time_limit=1900,
)

# Import tasks at the bottom so the circular import with tasks.py
# (which imports `app` from this module) resolves cleanly
from tasks import process_data, send_email

if __name__ == '__main__':
    app.start()

Create tasks.py with task definitions:

#!/usr/bin/env python3
from celery_app import app
import time
import logging

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_data(self, data_id):
    """Long-running data processing task"""
    try:
        logger.info(f"Processing data {data_id}")
        
        # Simulate processing
        time.sleep(5)
        
        result = f"Processed data {data_id}"
        logger.info(f"Result: {result}")
        return result
        
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        # Retry with exponential backoff: 60s, 120s, 240s, ...
        raise self.retry(exc=exc, countdown=60 * 2 ** self.request.retries)

@app.task(bind=True)
def send_email(self, recipient, subject, body):
    """Send email asynchronously"""
    try:
        logger.info(f"Sending email to {recipient}")
        
        # Simulate sending (real code would use smtplib or an email API)
        
        return f"Email sent to {recipient}"
    except Exception as exc:
        logger.error(f"Email send failed: {exc}")
        return False

@app.task
def add(x, y):
    """Simple task for testing"""
    return x + y

@app.task
def multiply(x, y):
    """Another simple task"""
    return x * y

Start the Celery worker:

celery -A celery_app worker --loglevel=info

Queue tasks from your application:

#!/usr/bin/env python3
from celery_app import app
from tasks import process_data, send_email, add

# Queue task immediately
result = process_data.delay(123)
print(f"Task ID: {result.id}")

# Get task result (with timeout)
try:
    output = result.get(timeout=30)
    print(f"Result: {output}")
except Exception as e:
    print(f"Task failed: {e}")

# Schedule task for later (use timezone-aware UTC datetimes, since enable_utc is set)
import datetime
eta = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5)
result = process_data.apply_async(args=(456,), eta=eta)

# Send email
send_email.delay('[email protected]', 'Welcome', 'Welcome to our service!')

# Chain multiple tasks
from celery import chain
result = chain(add.s(2, 2), multiply.s(4)).apply_async()

# Group parallel tasks
from celery import group
jobs = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = jobs.apply_async()
print(result.get())

Configuring Celery with RabbitMQ

Configure Celery to use RabbitMQ instead of Redis:

#!/usr/bin/env python3
from celery import Celery

app = Celery('myapp')

# Configure RabbitMQ broker
app.conf.update(
    broker_url='amqp://guest:guest@localhost:5672//',
    result_backend='rpc://',  # Or use Redis for results
    
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    
    # RabbitMQ specific settings
    broker_connection_retry_on_startup=True,
    broker_connection_retry=True,
    broker_pool_limit=None,
    
    # Task routing
    task_routes={
        'tasks.send_email': {'queue': 'email'},
        'tasks.process_data': {'queue': 'data-processing'},
    },
)

from tasks import send_email, process_data

if __name__ == '__main__':
    app.start()

With RabbitMQ, configure authentication if needed:

app.conf.broker_url = 'amqp://username:[email protected]:5672//vhost'
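Credentials embedded in the URL must be percent-encoded if they contain characters like `@` or `/`, otherwise the URL parses incorrectly. A small sketch with a hypothetical `amqp_url` helper:

```python
from urllib.parse import quote

# Hypothetical helper: percent-encode the user, password, and vhost
# before embedding them in the broker URL.
def amqp_url(user, password, host='localhost', port=5672, vhost='/'):
    return (
        f"amqp://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(vhost, safe='')}"
    )
```

For example, `amqp_url('guest', 'p@ss/word', host='mq.example.com')` yields `amqp://guest:p%40ss%2Fword@mq.example.com:5672/%2F`.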

Defining and Running Tasks

Create tasks with various patterns:

from celery_app import app
from celery import group, chain, chord
import logging
import time

logger = logging.getLogger(__name__)

# Basic task
@app.task
def basic_task(x, y):
    return x + y

# Task with retries
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def task_with_retries(self, url):
    try:
        import requests
        response = requests.get(url, timeout=10)
        return response.status_code
    except requests.RequestException as exc:
        logger.error(f"Request failed: {exc}")
        raise self.retry(exc=exc, countdown=120)

# Task with progress updates
@app.task(bind=True)
def long_task(self, items):
    total = len(items)
    for i, item in enumerate(items):
        # Update progress
        self.update_state(state='PROGRESS', meta={'current': i+1, 'total': total})
        
        # Process item
        time.sleep(1)
    
    return f"Processed {total} items"

# Call the task function directly (runs synchronously in this process, no broker)
result = basic_task(2, 3)
print(result)

# Queue task
async_result = basic_task.delay(5, 10)
print(f"Task ID: {async_result.id}")
print(f"Result: {async_result.get()}")

# Apply with kwargs
result = basic_task.apply_async(kwargs={'x': 4, 'y': 5})

# Chain tasks (execute sequentially)
workflow = chain(
    basic_task.s(2, 2),      # 2 + 2 = 4
    basic_task.s(3),          # 4 + 3 = 7
)
result = workflow.apply_async()
print(result.get())

# Group tasks (execute in parallel)
job = group([
    basic_task.s(2, 2),
    basic_task.s(4, 4),
    basic_task.s(8, 8),
])
result = job.apply_async()
print(result.get())

# Chord (parallel group whose results feed a callback)
@app.task
def summarize(results):
    return sum(results)

workflow = chord(
    [basic_task.s(2, 2), basic_task.s(4, 4), basic_task.s(8, 8)],
    summarize.s(),  # callback receives the list of results: [4, 8, 16]
)
result = workflow.apply_async()
print(result.get())  # 28
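On the client side, the PROGRESS state reported by `long_task` above can be polled and rendered. The `progress_percent` helper below is hypothetical; the polling loop requires a running worker and broker, so it is shown commented out:

```python
# Hypothetical helper: turn the meta dict from update_state() into a
# completion percentage, tolerating missing or empty metadata.
def progress_percent(meta):
    total = (meta or {}).get('total') or 0
    if total == 0:
        return 0.0
    return 100.0 * (meta or {}).get('current', 0) / total

# Polling sketch (requires a running worker and broker):
# result = long_task.delay(list(range(50)))
# while not result.ready():
#     if result.state == 'PROGRESS':
#         print(f'{progress_percent(result.info):.0f}% done')
#     time.sleep(1)
```

`result.info` carries whatever dict was passed as `meta=` to `update_state()`, which is why the helper works directly on it.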

Celery Beat Scheduler

Schedule periodic tasks with Celery Beat. Create beat_schedule.py:

#!/usr/bin/env python3
from celery_app import app
from celery.schedules import crontab

# Configure Beat scheduler
app.conf.beat_schedule = {
    # Run every 10 seconds
    'add-every-10-seconds': {
        'task': 'tasks.basic_task',
        'schedule': 10.0,
        'args': (2, 3)
    },
    
    # Run every minute
    'process-data-every-minute': {
        'task': 'tasks.process_data',
        'schedule': 60.0,
        'args': (123,)
    },
    
    # Run at specific time (daily)
    'daily-cleanup': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    
    # Run every Monday at 8 AM (in Celery's crontab, day_of_week 0 is Sunday)
    'weekly-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(day_of_week=1, hour=8, minute=0),
    },
    
    # Run every 5 minutes
    'every-5-minutes': {
        'task': 'tasks.health_check',
        'schedule': 300.0,
    }
}

# Configure timezone
app.conf.timezone = 'UTC'

Start Celery Beat:

celery -A celery_app beat --loglevel=info

Start worker to execute scheduled tasks:

celery -A celery_app worker --loglevel=info

Task Routing and Queues

Route tasks to specific workers based on task type. Configure routing:

#!/usr/bin/env python3
from celery import Celery
from kombu import Queue

app = Celery('myapp')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    
    # Define queues as kombu Queue objects
    task_queues=(
        Queue('default', routing_key='default'),
        Queue('email', routing_key='email'),
        Queue('data-processing', routing_key='data'),
        Queue('high-priority', routing_key='priority'),
    ),
    
    # Route tasks to queues
    task_routes={
        'tasks.send_email': {'queue': 'email', 'routing_key': 'email'},
        'tasks.process_data': {'queue': 'data-processing', 'routing_key': 'data'},
        'tasks.urgent_task': {'queue': 'high-priority', 'routing_key': 'priority'},
    }
)

Start workers for specific queues:

# Worker processing only email tasks
celery -A celery_app worker -Q email --loglevel=info

# Worker processing data tasks
celery -A celery_app worker -Q data-processing --loglevel=info

# Worker processing multiple queues
celery -A celery_app worker -Q default,high-priority --loglevel=info

# Worker with concurrency setting
celery -A celery_app worker -c 4 --loglevel=info

Monitoring with Flower

Flower provides a web-based monitoring interface for Celery.

Install Flower:

pip3 install flower

Start Flower:

celery -A celery_app flower --port=5555

Access the dashboard at http://localhost:5555. Monitor:

  • Active workers
  • Running tasks
  • Task history
  • Queue statistics
  • Worker pool management

Configure Flower:

celery -A celery_app flower \
  --port=5555 \
  --address=0.0.0.0 \
  --basic_auth=user:password \
  --persistent=True \
  --db=/var/lib/flower/flower.db

Troubleshooting

Check worker status:

# Using the Flower HTTP API
curl http://localhost:5555/api/workers

# Using celery command
celery -A celery_app inspect active_queues
celery -A celery_app inspect stats

Debug task execution:

from celery_app import app
from tasks import basic_task

# Check task status
result = basic_task.delay(1, 2)
print(result.status)
print(result.result)
print(result.traceback)

# Get all states for task
from celery.result import AsyncResult
task_id = 'task-id-here'
result = AsyncResult(task_id, app=app)
print(result.state)
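The inspect commands also have programmatic equivalents via `app.control.inspect()`, whose `active()` call maps worker names to lists of task dicts (or returns None when no workers reply). The `summarize_active` helper below is hypothetical:

```python
# Hypothetical helper: count in-flight tasks per worker from the
# output of app.control.inspect().active().
def summarize_active(active):
    return {worker: len(tasks or []) for worker, tasks in (active or {}).items()}

# Usage (requires running workers):
# insp = app.control.inspect(timeout=1.0)
# print(summarize_active(insp.active()))
```

Guarding against None in both places matters because a quiet cluster and an unreachable cluster look different only in the raw return value.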

Common issues and solutions:

# Check broker connectivity
celery -A celery_app inspect ping

# Purge all queued tasks
celery -A celery_app purge

# Revoke a specific task (control command)
celery -A celery_app control revoke task-id-here

# Revoke programmatically, optionally terminating a running task:
# app.control.revoke('task-id-here', terminate=True)

# Check registered tasks
celery -A celery_app inspect registered

Conclusion

Celery provides a robust, scalable task queue system for Python applications. This guide covered installation, configuration with Redis and RabbitMQ, task definition, scheduling with Beat, task routing, and monitoring with Flower. For production deployments, implement proper error handling, configure retry strategies, use dedicated workers for critical tasks, monitor queue depths and worker performance, and establish backup strategies for critical jobs. Celery excels in scenarios requiring background job processing, scheduled tasks, and distributed computing across multiple machines.