Celery Task Queue with Redis or RabbitMQ
Celery is a distributed task queue library for Python that enables asynchronous task execution, scheduling, and distributed computing. It works with multiple message brokers (Redis, RabbitMQ) and result backends, making it ideal for background job processing, scheduled tasks, and long-running operations.
Table of Contents
- Prerequisites
- Installing Celery
- Celery Architecture
- Configuring Celery with Redis
- Configuring Celery with RabbitMQ
- Defining and Running Tasks
- Celery Beat Scheduler
- Task Routing and Queues
- Monitoring with Flower
- Troubleshooting
- Conclusion
Prerequisites
Before using Celery, ensure you have:
- Python 3.7+ installed
- Redis 5.0+ or RabbitMQ 3.8+ (message broker)
- pip package manager
- Understanding of asynchronous programming
- Basic familiarity with message queues
Installing Celery
Install Celery and its dependencies:
pip3 install "celery[redis]"
# RabbitMQ needs no extra package: Celery ships with the py-amqp transport
pip3 install celery
Install additional optional dependencies:
pip3 install redis    # Redis client (already pulled in by celery[redis])
pip3 install flower   # For monitoring
Verify installation:
celery --version
Celery Architecture
Celery consists of:
- Producer: Application that queues tasks
- Broker: Message queue (Redis, RabbitMQ) storing tasks
- Worker: Service processing tasks from the queue
- Result Backend: Stores task results (Redis, RabbitMQ, database)
Typical flow:
- Producer sends task to broker
- Worker picks task from broker
- Worker executes task
- Result stored in backend
- Producer retrieves result
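The flow above can be sketched in-process with stdlib pieces standing in for the broker, worker, and result backend (illustrative only, not Celery code):

```python
import queue
import threading

broker = queue.Queue()   # stands in for Redis/RabbitMQ
results = {}             # stands in for the result backend

def worker():
    """Pull tasks from the broker, execute them, store results."""
    while True:
        task_id, func, args = broker.get()
        if func is None:  # shutdown sentinel
            break
        results[task_id] = func(*args)
        broker.task_done()

t = threading.Thread(target=worker, daemon=True)
t.start()

# Producer sends a task to the broker
broker.put(('task-1', lambda x, y: x + y, (2, 3)))
broker.put(('stop', None, ()))
t.join()

# Producer retrieves the result from the backend
print(results['task-1'])  # 5
```

Celery replaces the in-memory queue with a networked broker, so producers and workers can live on different machines.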
Configuring Celery with Redis
Create a Celery application with Redis broker. Create celery_app.py:
#!/usr/bin/env python3
from celery import Celery

# Create Celery app; include= tells workers which task modules to load
app = Celery('myapp', include=['tasks'])

# Configure Redis broker
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # Task retry settings
    task_acks_late=True,
    task_reject_on_worker_lost=True,
    # Worker settings
    worker_prefetch_multiplier=4,
    worker_max_tasks_per_child=1000,
    # Task timeouts: soft limit raises SoftTimeLimitExceeded at 30 minutes,
    # hard limit kills the task shortly after
    task_soft_time_limit=1800,
    task_time_limit=1900,
)

if __name__ == '__main__':
    app.start()
Create tasks.py with task definitions:
#!/usr/bin/env python3
from celery_app import app
import time
import logging

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_data(self, data_id):
    """Long-running data processing task"""
    try:
        logger.info(f"Processing data {data_id}")
        # Simulate processing
        time.sleep(5)
        result = f"Processed data {data_id}"
        logger.info(f"Result: {result}")
        return result
    except Exception as exc:
        logger.error(f"Task failed: {exc}")
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@app.task(bind=True)
def send_email(self, recipient, subject, body):
    """Send email asynchronously"""
    try:
        logger.info(f"Sending email to {recipient}")
        # Email sending code here (e.g. via smtplib)
        return f"Email sent to {recipient}"
    except Exception as exc:
        logger.error(f"Email send failed: {exc}")
        return False

@app.task
def add(x, y):
    """Simple task for testing"""
    return x + y

@app.task
def multiply(x, y):
    """Another simple task"""
    return x * y
Start the Celery worker:
celery -A celery_app worker --loglevel=info
Queue tasks from your application:
#!/usr/bin/env python3
import datetime

from celery import chain, group
from tasks import process_data, send_email, add, multiply

# Queue task immediately
result = process_data.delay(123)
print(f"Task ID: {result.id}")

# Get task result (with timeout)
try:
    output = result.get(timeout=30)
    print(f"Result: {output}")
except Exception as e:
    print(f"Task failed: {e}")

# Schedule task for later (use an aware datetime since enable_utc=True)
eta = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(minutes=5)
result = process_data.apply_async(args=(456,), eta=eta)

# Send email
send_email.delay('[email protected]', 'Welcome', 'Welcome to our service!')

# Chain multiple tasks: add(2, 2) = 4, then multiply(4, 4) = 16
result = chain(add.s(2, 2), multiply.s(4)).apply_async()

# Group parallel tasks
jobs = group(add.s(2, 2), add.s(4, 4), add.s(8, 8))
result = jobs.apply_async()
print(result.get())
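A chain simply feeds each task's return value in as the first argument of the next signature. That semantics can be sketched with plain functions (illustrative only, not Celery code):

```python
def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

def run_chain(steps):
    """steps: list of (func, args). The first runs as func(*args);
    each later step runs as func(previous_result, *args)."""
    func, args = steps[0]
    value = func(*args)
    for func, args in steps[1:]:
        value = func(value, *args)
    return value

# Equivalent of chain(add.s(2, 2), multiply.s(4)): add(2, 2) = 4, multiply(4, 4) = 16
print(run_chain([(add, (2, 2)), (multiply, (4,))]))  # 16
```

This is why `multiply.s(4)` only supplies one argument in the chain: the other arrives from the previous task.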
Configuring Celery with RabbitMQ
Configure Celery to use RabbitMQ instead of Redis:
#!/usr/bin/env python3
from celery import Celery

app = Celery('myapp', include=['tasks'])

# Configure RabbitMQ broker
app.conf.update(
    broker_url='amqp://guest:guest@localhost:5672//',
    result_backend='rpc://',  # Or use Redis for results
    # Task execution settings
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    # RabbitMQ-specific settings
    broker_connection_retry_on_startup=True,
    broker_connection_retry=True,
    broker_pool_limit=None,
    # Task routing
    task_routes={
        'tasks.send_email': {'queue': 'email'},
        'tasks.process_data': {'queue': 'data-processing'},
    },
)

if __name__ == '__main__':
    app.start()
With RabbitMQ, configure authentication if needed:
app.conf.broker_url = 'amqp://username:[email protected]:5672//vhost'
Defining and Running Tasks
Create tasks with various patterns:
import time
import logging

import requests  # third-party: pip3 install requests

from celery import group, chain, chord
from celery_app import app

logger = logging.getLogger(__name__)

# Basic task
@app.task
def basic_task(x, y):
    return x + y

# Task with retries
@app.task(bind=True, max_retries=5, default_retry_delay=60)
def task_with_retries(self, url):
    try:
        response = requests.get(url, timeout=10)
        return response.status_code
    except requests.RequestException as exc:
        logger.error(f"Request failed: {exc}")
        raise self.retry(exc=exc, countdown=120)

# Task with progress updates
@app.task(bind=True)
def long_task(self, items):
    total = len(items)
    for i, item in enumerate(items):
        # Update progress
        self.update_state(state='PROGRESS', meta={'current': i + 1, 'total': total})
        # Process item
        time.sleep(1)
    return f"Processed {total} items"

# Callback for the chord below: receives the list of group results
@app.task
def tsum(numbers):
    return sum(numbers)

# Call the task function directly (runs synchronously in this process, no worker)
result = basic_task(2, 3)
print(result)

# Queue task
async_result = basic_task.delay(5, 10)
print(f"Task ID: {async_result.id}")
print(f"Result: {async_result.get()}")

# Apply with kwargs
result = basic_task.apply_async(kwargs={'x': 4, 'y': 5})

# Chain tasks (execute sequentially)
workflow = chain(
    basic_task.s(2, 2),  # 2 + 2 = 4
    basic_task.s(3),     # 4 + 3 = 7
)
result = workflow.apply_async()
print(result.get())

# Group tasks (execute in parallel)
job = group([
    basic_task.s(2, 2),
    basic_task.s(4, 4),
    basic_task.s(8, 8),
])
result = job.apply_async()
print(result.get())

# Chord (parallel with callback); calling the chord queues it immediately
result = chord([
    basic_task.s(2, 2),
    basic_task.s(4, 4),
    basic_task.s(8, 8),
])(tsum.s())  # tsum receives [4, 8, 16]
print(result.get())  # 28
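A chord is "run the header in parallel, collect the results into a list, call the callback once with that list". In stdlib terms (illustrative only, not Celery code):

```python
from concurrent.futures import ThreadPoolExecutor

def basic_task(x, y):
    return x + y

def run_chord(header, callback):
    """header: list of (func, args); callback receives the list of results."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        results = [f.result() for f in futures]
    return callback(results)

print(run_chord([(basic_task, (2, 2)), (basic_task, (4, 4)), (basic_task, (8, 8))], sum))
# 28
```

This also shows why a chord callback must accept a single list argument rather than the same signature as the header tasks.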
Celery Beat Scheduler
Schedule periodic tasks with Celery Beat. Create beat_schedule.py:
#!/usr/bin/env python3
from celery_app import app
from celery.schedules import crontab

# Configure Beat scheduler
app.conf.beat_schedule = {
    # Run every 10 seconds
    'add-every-10-seconds': {
        'task': 'tasks.basic_task',
        'schedule': 10.0,
        'args': (2, 3),
    },
    # Run every minute
    'process-data-every-minute': {
        'task': 'tasks.process_data',
        'schedule': 60.0,
        'args': (123,),
    },
    # Run at specific time (daily)
    'daily-cleanup': {
        'task': 'tasks.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    # Run every Monday at 8 AM
    'weekly-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(day_of_week='monday', hour=8, minute=0),
    },
    # Run every 5 minutes
    'every-5-minutes': {
        'task': 'tasks.health_check',
        'schedule': 300.0,
    },
}

# Configure timezone
app.conf.timezone = 'UTC'
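Conceptually, Beat is a loop that wakes up, finds due entries, and sends them to the broker. A toy version of that loop with the stdlib `sched` module (illustrative only; real Beat also persists its schedule state):

```python
import sched
import time

runs = []

def send_task(scheduler, remaining):
    """Stand-in for Beat sending a due task to the broker."""
    runs.append(time.monotonic())
    if remaining > 1:
        # Re-schedule itself, like a recurring beat_schedule entry
        scheduler.enter(0.01, 1, send_task, (scheduler, remaining - 1))

s = sched.scheduler(time.monotonic, time.sleep)
s.enter(0.01, 1, send_task, (s, 3))  # "every 0.01s", three times
s.run()
print(len(runs))  # 3
```

Because Beat only enqueues tasks, a worker must also be running for anything to actually execute.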
Start Celery Beat:
celery -A celery_app beat --loglevel=info
Start worker to execute scheduled tasks:
celery -A celery_app worker --loglevel=info
Task Routing and Queues
Route tasks to specific workers based on task type. Configure routing:
#!/usr/bin/env python3
from celery import Celery
from kombu import Queue

app = Celery('myapp')

app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    # Define queues (task_queues expects kombu Queue instances)
    task_queues=(
        Queue('default', routing_key='default'),
        Queue('email', routing_key='email'),
        Queue('data-processing', routing_key='data'),
        Queue('high-priority', routing_key='priority'),
    ),
    task_default_queue='default',
    # Route tasks to queues
    task_routes={
        'tasks.send_email': {'queue': 'email'},
        'tasks.process_data': {'queue': 'data-processing'},
        'tasks.urgent_task': {'queue': 'high-priority'},
    },
)
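Route resolution is essentially a first-match lookup of the task name against the routing table (Celery also accepts glob patterns as keys). A stdlib sketch with hypothetical route entries:

```python
from fnmatch import fnmatch

TASK_ROUTES = {
    'tasks.send_email': {'queue': 'email'},
    'tasks.process_*': {'queue': 'data-processing'},  # glob pattern
}

def resolve_queue(task_name, routes=TASK_ROUTES, default='default'):
    """Return the queue of the first matching route, else the default queue."""
    for pattern, options in routes.items():
        if fnmatch(task_name, pattern):
            return options['queue']
    return default

print(resolve_queue('tasks.send_email'))    # email
print(resolve_queue('tasks.process_data'))  # data-processing
print(resolve_queue('tasks.add'))           # default
```

Tasks that match no route fall through to `task_default_queue`, which is why workers should always consume the default queue unless you route every task explicitly.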
Start workers for specific queues:
# Worker processing only email tasks
celery -A celery_app worker -Q email --loglevel=info
# Worker processing data tasks
celery -A celery_app worker -Q data-processing --loglevel=info
# Worker processing multiple queues
celery -A celery_app worker -Q default,high-priority --loglevel=info
# Worker with concurrency setting
celery -A celery_app worker -c 4 --loglevel=info
Monitoring with Flower
Flower provides a web-based monitoring interface for Celery.
Install Flower:
pip3 install flower
Start Flower:
celery -A celery_app flower --port=5555
Access the dashboard at http://localhost:5555. Monitor:
- Active workers
- Running tasks
- Task history
- Queue statistics
- Worker pool management
Configure Flower:
celery -A celery_app flower \
--port=5555 \
--address=0.0.0.0 \
--basic_auth=user:password \
--persistent=True \
--db=/var/lib/flower/flower.db
Troubleshooting
Check worker status:
# Using the Flower HTTP API (requires Flower to be running)
curl http://localhost:5555/api/workers
# Using celery command
celery -A celery_app inspect active_queues
celery -A celery_app inspect stats
Debug task execution:
from celery_app import app
from tasks import basic_task

# Check task status
result = basic_task.delay(1, 2)
print(result.status)
print(result.result)
print(result.traceback)

# Look up a result later by its task ID
from celery.result import AsyncResult

task_id = 'task-id-here'
result = AsyncResult(task_id, app=app)
print(result.state)
Common issues and solutions:
# Check broker connectivity
celery -A celery_app inspect ping
# Purge all queued tasks
celery -A celery_app purge
# Revoke a specific task
celery -A celery_app control revoke task-id-here
# List tasks the workers have revoked
celery -A celery_app inspect revoked
# Check registered tasks
celery -A celery_app inspect registered
Conclusion
Celery provides a robust, scalable task queue system for Python applications. This guide covered installation, configuration with Redis and RabbitMQ, task definition, scheduling with Beat, task routing, and monitoring with Flower. For production deployments, implement proper error handling and retry strategies, run dedicated workers for critical queues, monitor queue depths and worker performance, and establish backup strategies for critical jobs. Celery excels wherever you need background job processing, scheduled tasks, or distributed computing across multiple machines.


