Background Tasks and Celery: The Complete Guide
Why Background Tasks Matter
Picture this: a user clicks "Sign Up" on your website. Your app needs to send a welcome email, process their profile picture, create database records, and maybe notify your analytics service. If you do all of this synchronously, the user sits there waiting 5-10 seconds staring at a loading spinner. That's a terrible experience.
This is where background tasks come in. They let you respond to the user immediately while handling the heavy lifting in the background. Your app stays fast and responsive, and complex tasks get done without blocking anything.
In this post, we'll build a real document processing service using Flask, Celery, and Docker. You'll learn everything from the ground up: brokers, queues, workers, scaling, monitoring, and error handling. By the end, you'll have a production-ready system running in containers.
What We're Building
A document processing service where users can:
- Upload PDFs and images
- Extract text from documents automatically
- Generate thumbnails for images
- Analyze content for keywords
- Process multiple documents in parallel
- Get reports combining results from batch processing
All running in Docker containers with zero local installation needed (except Docker itself).
Prerequisites:
- Basic Python knowledge
- Docker installed on your machine
- A text editor
That's it. Let's build.
What Are Background Tasks?
A background task is any operation that can happen asynchronously—meaning it doesn't need to complete immediately for the user to continue using your app.
Common examples:
- Sending emails (welcome emails, password resets, notifications)
- Processing images (resizing, compression, thumbnails)
- Generating reports (PDFs, CSV exports)
- Data aggregation (analytics, dashboard calculations)
- Third-party API calls (payment processing, social media posts)
- Scheduled jobs (daily backups, weekly summaries)
The key insight: if the user doesn't need the result immediately, it should probably be a background task.
The Core Components: Broker, Queue, and Workers
To understand background tasks, you need to understand three core concepts that work together:
The Queue
A queue is exactly what it sounds like—a line where tasks wait to be processed. When your application creates a task, it gets added to the queue. Tasks are processed in order (usually first-in, first-out), though you can configure priorities.
Think of it like a line at a coffee shop. Orders come in, they wait in line, and baristas process them one by one.
The Broker
The broker is the middleman that manages the queue. It receives tasks from your application, stores them safely, and delivers them to workers when they're ready to process.
Popular brokers:
- Redis: Fast, simple, perfect for most use cases. It's an in-memory data store that's easy to set up.
- RabbitMQ: More robust, with advanced features like message routing and guaranteed delivery. Better for large-scale systems.
The broker ensures tasks don't get lost. If your app crashes after creating a task, the broker still has it queued up safely.
The Worker
Workers are processes that continuously check the queue for new tasks and execute them. You can run multiple workers to process tasks in parallel. Workers can run on the same machine as your app or on completely separate servers.
When a worker finishes a task, it immediately checks the queue for the next one. If the queue is empty, workers wait until new tasks arrive.
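Under the hood, that loop is simple enough to sketch by hand. Here is a minimal, illustrative version that uses a Redis list as the queue (the names task_queue, send_welcome_email, and user_id are made up for the example); Celery automates all of this for you:
import json
import redis

r = redis.Redis(host='localhost', port=6379, db=0)  # Redis plays the role of the broker

# Producer (your web app): push a task description onto the queue
r.lpush('task_queue', json.dumps({'task': 'send_welcome_email', 'user_id': 42}))

# Worker (a separate process): block until a task arrives, then execute it
def run_worker():
    while True:
        _queue, raw = r.brpop('task_queue')  # lpush + brpop gives first-in, first-out
        task = json.loads(raw)
        print(f"Processing {task['task']} for user {task['user_id']}")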
Enter Celery
Celery is a distributed task queue system for Python. It handles all the complexity of managing tasks, workers, and brokers, giving you a simple API to create and execute background tasks.
Why Celery?
- Battle-tested in production at massive scale
- Works with Flask, Django, and standalone Python
- Supports complex workflows (chains, groups, chords)
- Built-in retry mechanisms and error handling
- Powerful monitoring and scaling tools
- Active community and excellent documentation
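To give you a feel for how small that API is, here is a minimal, self-contained sketch (assuming Redis is running locally; the module name minimal_celery and the add task are just for illustration):
# minimal_celery.py
from celery import Celery

app = Celery('hello', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

# From your application code:
#   result = add.delay(2, 3)    # queues the task and returns immediately
#   result.get(timeout=10)      # -> 5, once a worker has processed it
# Run a worker with: celery -A minimal_celery worker --loglevel=info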
Setting Up Celery with Flask Using Docker
Let's build a real document processing service. Users can upload documents, and we'll extract text, generate thumbnails, analyze content, and create reports—all in the background. We'll use Docker to make setup incredibly simple.
Project Structure
Create this folder structure:
document-processor/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app.py
├── tasks.py
└── uploads/
Docker Setup
First, create docker-compose.yml:
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  web:
    build: .
    command: python app.py
    volumes:
      - .:/app
      - ./uploads:/app/uploads
    ports:
      - "5000:5000"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  worker:
    build: .
    command: celery -A tasks.celery worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
      - ./uploads:/app/uploads
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  flower:
    build: .
    command: celery -A tasks.celery flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
      - worker

volumes:
  redis_data:
Create Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies for PDF/image processing
RUN apt-get update && apt-get install -y \
poppler-utils \
tesseract-ocr \
libmagic1 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "app.py"]
Create requirements.txt:
flask==3.0.0
celery==5.3.4
redis==5.0.1
flower==2.0.1
Pillow==10.1.0
PyPDF2==3.0.1
python-magic==0.4.27
Building the Document Processing Service
Create tasks.py with our Celery tasks:
from celery import Celery
from PIL import Image
import PyPDF2
import os
import time
import magic
import logging
logger = logging.getLogger(__name__)
# Initialize Celery
celery = Celery('document_processor')
celery.conf.update(
broker_url=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
result_backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='UTC',
enable_utc=True,
)
@celery.task(bind=True, max_retries=3)
def extract_text_from_pdf(self, file_path):
    """Extract text content from PDF"""
    # In chained workflows the previous step (detect_file_type) passes a dict;
    # accept either that dict or a plain path string.
    if isinstance(file_path, dict):
        file_path = file_path['file']
    try:
        logger.info(f"Extracting text from {file_path}")
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text()
            word_count = len(text.split())
            logger.info(f"Extracted {word_count} words from {file_path}")
            return {
                'file': file_path,
                'text': text[:500],  # First 500 chars
                'word_count': word_count,
                'page_count': len(pdf_reader.pages)
            }
    except Exception as exc:
        logger.error(f"Failed to extract text: {exc}")
        raise self.retry(exc=exc, countdown=60)
@celery.task
def generate_thumbnail(file_path, size=(300, 300)):
    """Generate thumbnail for image files"""
    logger.info(f"Generating thumbnail for {file_path}")
    try:
        img = Image.open(file_path)
        img.thumbnail(size)
        # Save thumbnail
        base, ext = os.path.splitext(file_path)
        thumb_path = f"{base}_thumb{ext}"
        img.save(thumb_path)
        return {
            'original': file_path,
            'thumbnail': thumb_path,
            'size': img.size
        }
    except Exception as e:
        logger.error(f"Thumbnail generation failed: {e}")
        raise
@celery.task
def analyze_content(text_data):
    """Analyze extracted text for keywords and sentiment"""
    logger.info("Analyzing content")
    time.sleep(2)  # Simulate ML processing
    text = text_data.get('text', '')
    words = text.lower().split()
    # Simple keyword extraction (in real world, use NLP libraries)
    common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
    keywords = [w for w in set(words) if w not in common_words and len(w) > 3][:10]
    return {
        # Carry the file name and word count through so generate_report can use them
        'file': text_data.get('file'),
        'word_count': text_data.get('word_count', len(words)),
        'keywords': keywords,
        'sentiment': 'positive' if 'good' in words else 'neutral',
        'analyzed_words': len(words)
    }
@celery.task
def generate_report(analysis_results):
    """Generate final report from all analysis"""
    logger.info("Generating final report")
    report = {
        'status': 'completed',
        'summary': {
            'total_documents': len(analysis_results),
            'total_words': sum(r.get('word_count', 0) for r in analysis_results),
            'processed_at': time.strftime('%Y-%m-%d %H:%M:%S')
        },
        'details': analysis_results
    }
    logger.info(f"Report generated for {len(analysis_results)} documents")
    return report
@celery.task
def detect_file_type(file_path):
    """Detect file type using magic numbers"""
    logger.info(f"Detecting file type for {file_path}")
    mime = magic.Magic(mime=True)
    file_type = mime.from_file(file_path)
    return {
        'file': file_path,
        'mime_type': file_type,
        'is_pdf': 'pdf' in file_type,
        'is_image': 'image' in file_type
    }
Create app.py with Flask routes:
from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
from celery import chain, group, chord
import os
from tasks import (
celery, extract_text_from_pdf, generate_thumbnail,
analyze_content, generate_report, detect_file_type
)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
@app.route('/')
def index():
    return jsonify({
        'service': 'Document Processing API',
        'status': 'running',
        'endpoints': {
            'upload': '/api/upload',
            'process': '/api/process/<filename>',
            'batch': '/api/batch-process',
            'status': '/api/task/<task_id>'
        }
    })
@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Upload a document for processing"""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    filename = secure_filename(file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)
    # Start processing immediately
    task = detect_file_type.delay(file_path)
    return jsonify({
        'message': 'File uploaded successfully',
        'file': filename,
        'task_id': task.id
    }), 201
@app.route('/api/process/<filename>', methods=['POST'])
def process_document(filename):
    """Process a single document with full pipeline"""
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    # Create processing chain: detect -> extract -> analyze
    workflow = chain(
        detect_file_type.s(file_path),
        extract_text_from_pdf.s(),
        analyze_content.s()
    )
    result = workflow.apply_async()
    return jsonify({
        'message': 'Processing started',
        'task_id': result.id,
        'file': filename
    })
@app.route('/api/batch-process', methods=['POST'])
def batch_process():
    """Process multiple documents in parallel and generate combined report"""
    files = request.json.get('files', [])
    if not files:
        return jsonify({'error': 'No files specified'}), 400
    # Process all files in parallel, then generate a report.
    # chord(header)(callback) dispatches the workflow and returns an AsyncResult.
    result = chord(
        chain(
            detect_file_type.s(os.path.join(app.config['UPLOAD_FOLDER'], f)),
            extract_text_from_pdf.s(),
            analyze_content.s()
        ) for f in files
    )(generate_report.s())
    return jsonify({
        'message': f'Batch processing started for {len(files)} files',
        'task_id': result.id,
        'files': files
    })
@app.route('/api/process-images', methods=['POST'])
def process_images():
    """Process multiple images in parallel"""
    images = request.json.get('images', [])
    if not images:
        return jsonify({'error': 'No images specified'}), 400
    # Generate thumbnails for all images in parallel
    job = group(
        generate_thumbnail.s(os.path.join(app.config['UPLOAD_FOLDER'], img))
        for img in images
    )
    result = job.apply_async()
    return jsonify({
        'message': f'Processing {len(images)} images',
        'group_id': result.id
    })

@app.route('/api/task/<task_id>')
def get_task_status(task_id):
    """Check status of any task"""
    task = celery.AsyncResult(task_id)
    response = {
        'task_id': task_id,
        'state': task.state,
        'ready': task.ready(),
    }
    if task.ready():
        if task.successful():
            response['result'] = task.result
        else:
            response['error'] = str(task.info)
    else:
        response['status'] = 'Task is still processing...'
    return jsonify(response)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)
Running Everything with Docker
Just one command starts everything:
docker-compose up --build
This starts:
- Redis on port 6379 (message broker)
- Flask app on port 5000 (API)
- Celery workers (4 concurrent workers)
- Flower on port 5555 (monitoring dashboard)
Testing the Document Processor
Upload a PDF document:
curl -X POST http://localhost:5000/api/upload \
-F "file=@document.pdf"
Process a single document:
curl -X POST http://localhost:5000/api/process/document.pdf
Batch process multiple documents:
curl -X POST http://localhost:5000/api/batch-process \
-H "Content-Type: application/json" \
-d '{"files": ["doc1.pdf", "doc2.pdf", "doc3.pdf"]}'
Check task status:
curl http://localhost:5000/api/task/YOUR_TASK_ID
Open Flower dashboard:
Visit http://localhost:5555 in your browser to see real-time task monitoring.
What's Happening Behind the Scenes
When you upload and process a document:
- File Upload: Flask receives the file and saves it
- Task Queued: The processing task is sent to Redis
- Worker Picks Up: A Celery worker grabs the task from the queue
- Chain Execution: Tasks run in sequence:
- Detect file type
- Extract text from PDF
- Analyze content for keywords
- Result Stored: Results are saved in Redis
- API Response: You can check the status anytime
For batch processing, multiple documents process in parallel, then a final task combines all results into one report. This demonstrates the real power of Celery.
Understanding Task Execution Methods
Celery gives you different ways to execute tasks depending on your needs.
.delay() - Simple Execution
# Process document immediately
extract_text_from_pdf.delay('/uploads/report.pdf')
This is the simplest way. It queues the task immediately and returns.
.apply_async() - Advanced Execution
For more control, use .apply_async():
# Process document after 60 seconds (delayed processing)
extract_text_from_pdf.apply_async(
args=['/uploads/report.pdf'],
countdown=60
)
# Schedule processing for specific time (e.g., off-peak hours)
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(hours=2)
extract_text_from_pdf.apply_async(
args=['/uploads/large_document.pdf'],
eta=eta
)
# Set high priority for urgent documents
extract_text_from_pdf.apply_async(
args=['/uploads/urgent.pdf'],
priority=9
)
Getting Task Results
The /api/task/<task_id> endpoint we created shows how to retrieve results. Calling a task with .delay() or .apply_async() returns an AsyncResult object that you can query:
task = celery.AsyncResult(task_id)

# Check if task is finished
if task.ready():
    # Get the result
    result = task.result
    print(f"Processing complete: {result}")
else:
    print(f"Task state: {task.state}")
You can also check specific states:
if task.successful():
    print("Task completed successfully")
elif task.failed():
    print("Task failed")
elif task.state == 'PENDING':
    print("Task waiting in queue")
elif task.state == 'STARTED':
    print("Task is running")
Advanced Task Workflows
Real applications often need complex task coordination. Celery provides powerful primitives for this.
Chains: Sequential Execution
Chains execute tasks one after another, passing the result of each task to the next. This is perfect for multi-step document processing.
from celery import chain
# Our document processing tasks (already defined in tasks.py)
# - detect_file_type: Determines if file is PDF or image
# - extract_text_from_pdf: Pulls text content
# - analyze_content: Extracts keywords and sentiment
# Create a processing pipeline
workflow = chain(
detect_file_type.s('/uploads/report.pdf'),
extract_text_from_pdf.s(),
analyze_content.s()
)
# Execute the chain
result = workflow.apply_async()
# Each task's output becomes the next task's input:
# 1. detect_file_type returns: {'file': 'report.pdf', 'is_pdf': True}
# 2. extract_text_from_pdf receives that dict, returns: {'text': '...', 'word_count': 1500}
# 3. analyze_content receives text data, returns: {'keywords': [...], 'sentiment': 'positive'}
Each task's output becomes the input for the next task. If any task fails, the chain stops, and you can check which step failed.
Real use cases for chains:
- Document processing pipelines (detect → extract → analyze → report)
- Video processing (upload → transcode → add watermark → generate thumbnail)
- Data ETL (extract → transform → load)
- Multi-step user onboarding (create account → send email → setup profile)
Groups: Parallel Execution
Groups execute multiple tasks simultaneously. Perfect for processing multiple files at once.
from celery import group
# Process multiple images in parallel
# (generate_thumbnail task is already defined in tasks.py)
image_paths = [
'/uploads/photo1.jpg',
'/uploads/photo2.jpg',
'/uploads/photo3.jpg',
'/uploads/photo4.jpg'
]
# All thumbnails generate at the same time
job = group(
generate_thumbnail.s(path, (300, 300))
for path in image_paths
)
result = job.apply_async()
# With 4 workers, all 4 images process simultaneously
# Total time: ~2 seconds (time for 1 image)
# Without parallel: ~8 seconds (2 seconds × 4 images)
All tasks in the group execute simultaneously across available workers. This is dramatically faster than processing files one by one.
Real use cases for groups:
- Generate thumbnails for photo albums
- Process multiple documents uploaded in bulk
- Send notification emails to thousands of users
- Fetch data from multiple APIs at once
- Run independent calculations in parallel
Performance example:
- Processing 100 images sequentially: 200 seconds (2s per image)
- With 10 workers using groups: 20 seconds (10 images at a time)
- 10x faster!
Chords: Parallel + Aggregation
Chords run tasks in parallel and then execute a callback with all results. This is perfect for batch processing where you need a combined report.
from celery import chord
# Process multiple documents in parallel, then generate a combined report
documents = [
'/uploads/report1.pdf',
'/uploads/report2.pdf',
'/uploads/report3.pdf'
]
# Each document goes through the full pipeline in parallel
# Then generate_report combines all results
# chord(header)(callback) dispatches the workflow and returns an AsyncResult
result = chord(
    chain(
        detect_file_type.s(doc),
        extract_text_from_pdf.s(),
        analyze_content.s()
    ) for doc in documents
)(generate_report.s())
# What happens:
# 1. All 3 documents process in parallel through the chain
# 2. Each completes: extract text → analyze → return results
# 3. When ALL are done, generate_report receives a list of all results
# 4. Final report combines data from all documents
The generate_report task receives a list containing results from all parallel processing chains. It only runs after everything completes.
Real use cases for chords:
- Batch document processing: Process 100 contracts, generate compliance report
- Financial analysis: Fetch data from multiple markets, calculate portfolio summary
- Data aggregation: Scrape 50 websites in parallel, create combined dataset
- Multi-source reports: Get data from 10 APIs, generate unified dashboard
Example output:
{
"status": "completed",
"summary": {
"total_documents": 3,
"total_words": 4500,
"processed_at": "2025-11-28 10:30:00"
},
"details": [
{"file": "report1.pdf", "word_count": 1500, "keywords": ["revenue", "growth"]},
{"file": "report2.pdf", "word_count": 1800, "keywords": ["sales", "market"]},
{"file": "report3.pdf", "word_count": 1200, "keywords": ["profit", "margin"]}
]
}
This is the power of chords: parallel processing with intelligent result aggregation.
Combining Primitives
You can combine these primitives for complex workflows:
from celery import chain, group, chord
# Fetch data, then process multiple items in parallel, then aggregate
workflow = chain(
fetch_initial_data.s(),
group(
process_item.s(i) for i in range(10)
),
aggregate_results.s()
)
Handling Task Failures and Retries
In production, tasks will fail. Network issues, service outages, bad data—failures are inevitable. Celery has robust mechanisms to handle them.
Automatic Retries
Configure tasks to retry automatically on failure:
import requests
from requests.exceptions import RequestException
@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def call_external_api(self, endpoint):
    """Call external API with automatic retries"""
    try:
        response = requests.get(endpoint, timeout=10)
        response.raise_for_status()
        return response.json()
    except RequestException as exc:
        # Retry the task
        raise self.retry(exc=exc)
Key parameters:
- bind=True: gives access to the task instance (self)
- max_retries=3: retry up to 3 times
- default_retry_delay=60: wait 60 seconds between retries
Exponential Backoff
For external services, use exponential backoff to avoid overwhelming them:
@celery.task(bind=True, max_retries=5)
def fetch_with_backoff(self, url):
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except RequestException as exc:
        # Exponential backoff: 2, 4, 8, 16, 32 seconds
        countdown = 2 ** (self.request.retries + 1)
        raise self.retry(exc=exc, countdown=countdown)
Each retry waits longer than the previous one, giving the external service time to recover.
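Recent Celery versions can also do this declaratively with autoretry_for and retry_backoff, so you don't have to write the retry call yourself. A sketch (fetch_with_auto_backoff is a made-up task name):
import requests
from requests.exceptions import RequestException

# Celery retries automatically on RequestException, doubling the delay each time
@celery.task(
    autoretry_for=(RequestException,),
    retry_backoff=True,                 # 1s, 2s, 4s, 8s, ... between retries
    retry_backoff_max=300,              # cap the delay at 5 minutes
    retry_jitter=True,                  # add randomness to avoid thundering herds
    retry_kwargs={'max_retries': 5},
)
def fetch_with_auto_backoff(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()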
Custom Retry Logic
You can retry selectively based on exception type:
@celery.task(bind=True, max_retries=3)
def process_payment(self, payment_id):
    try:
        result = payment_gateway.process(payment_id)
        return result
    except TemporaryError as exc:
        # Retry on temporary errors
        raise self.retry(exc=exc, countdown=30)
    except PermanentError:
        # Don't retry on permanent errors
        log_error(f"Permanent failure for payment {payment_id}")
        return {'status': 'failed'}
Task Time Limits
Prevent tasks from running forever:
from celery.exceptions import SoftTimeLimitExceeded

@celery.task(time_limit=300, soft_time_limit=270)
def process_large_file(file_path):
    """Process file with time limits"""
    # This task will be killed after 300 seconds (hard limit)
    # SoftTimeLimitExceeded exception raised after 270 seconds
    try:
        # Process file
        pass
    except SoftTimeLimitExceeded:
        # Cleanup and graceful exit
        cleanup_resources()
        raise
Error Callbacks
Execute code when tasks fail:
@celery.task
def handle_error(request, exc, traceback):
    """Error callback: runs when the linked task fails"""
    print(f"Task {request.id} failed: {exc}")
    # Send alert, log to monitoring service, etc.

@celery.task
def risky_task():
    # Task code that might fail
    pass

# Attach the error callback with link_error when dispatching the task
risky_task.apply_async(link_error=handle_error.s())
Monitoring Celery Workers
In production, you need visibility into what's happening with your tasks and workers.
Celery Events
Celery has a built-in event system. Start the event monitor:
celery -A app.celery events
This shows real-time task events: when tasks are sent, started, succeeded, or failed.
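You can also consume these events from Python, for example to alert on failures. A sketch based on Celery's real-time event API, using the app object from tasks.py:
# Small event consumer that logs failed tasks
from tasks import celery

def on_task_failed(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print(f"FAILED: {task.name}[{task.uuid}]")

state = celery.events.State()
with celery.connection() as connection:
    recv = celery.events.Receiver(connection, handlers={
        'task-failed': on_task_failed,
        '*': state.event,  # keep the in-memory state up to date for all events
    })
    recv.capture(limit=None, timeout=None, wakeup=True)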
Flower: Web-Based Monitoring
Flower is the best tool for monitoring Celery. It provides a beautiful web interface.
Installation:
pip install flower
Start Flower:
celery -A app.celery flower
Open http://localhost:5555 in your browser. You'll see:
- Active workers and their status
- Task history and success/failure rates
- Task runtime statistics
- Queue lengths
- Worker resource usage
- Ability to restart workers and revoke tasks
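Flower also exposes an HTTP API, so you can script against the same data the dashboard shows. A small sketch (assuming the Flower container from our docker-compose setup on port 5555 with no basic auth):
import requests

# Query Flower's HTTP API for worker and task information
workers = requests.get('http://localhost:5555/api/workers', timeout=10).json()
tasks = requests.get('http://localhost:5555/api/tasks?limit=50', timeout=10).json()
print(f"{len(workers)} workers online, {len(tasks)} recent tasks")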
Inspecting Workers
Check worker status from the command line:
# List active workers
celery -A app.celery inspect active
# See registered tasks
celery -A app.celery inspect registered
# Check worker stats
celery -A app.celery inspect stats
# See scheduled tasks
celery -A app.celery inspect scheduled
Custom Logging
Add detailed logging to your tasks:
import logging
logger = logging.getLogger(__name__)
@celery.task
def important_task(data):
    logger.info(f"Starting task with data: {data}")
    try:
        result = process(data)
        logger.info(f"Task completed successfully: {result}")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}", exc_info=True)
        raise
Scaling Celery Workers
As your application grows, you'll need to scale your workers to handle increased load.
Running Multiple Workers
The simplest way to scale is running multiple worker processes:
# Run 4 concurrent workers
celery -A app.celery worker --concurrency=4 --loglevel=info
Each worker can process multiple tasks simultaneously using threads or processes.
Worker Concurrency Models
Celery supports different concurrency models:
Processes (default):
celery -A app.celery worker --pool=prefork --concurrency=4
Good for CPU-intensive tasks. Each task runs in a separate process.
Threads:
celery -A app.celery worker --pool=threads --concurrency=10
Good for I/O-bound tasks (network requests, database queries). Lighter weight than processes.
Gevent (async):
pip install gevent
celery -A app.celery worker --pool=gevent --concurrency=100
Excellent for I/O-bound tasks. Can handle many concurrent tasks with minimal resources.
Autoscaling Workers
Workers can automatically scale based on load:
celery -A app.celery worker --autoscale=10,3 --loglevel=info
This runs between 3 and 10 worker processes depending on queue size. When the queue fills up, more workers spin up. When it's empty, workers scale down.
Multiple Machines
For serious scale, distribute workers across multiple servers:
Server 1:
celery -A app.celery worker --hostname=worker1@%h --loglevel=info
Server 2:
celery -A app.celery worker --hostname=worker2@%h --loglevel=info
All workers connect to the same broker (Redis or RabbitMQ), and tasks are distributed across all available workers.
Task Routing
Send different types of tasks to different workers:
# Configure task routes
celery.conf.update(
task_routes={
'app.send_email': {'queue': 'emails'},
'app.process_image': {'queue': 'images'},
'app.generate_report': {'queue': 'reports'},
}
)
Start workers for specific queues:
# Worker only for email tasks
celery -A app.celery worker -Q emails --hostname=email-worker
# Worker only for image tasks
celery -A app.celery worker -Q images --hostname=image-worker
This lets you scale different task types independently.
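You can also pick a queue for a single call with the queue argument, without touching task_routes (send_email here is the illustrative task from the routing config above):
# Route one call to a specific queue at dispatch time
send_email.apply_async(args=['user@example.com'], queue='emails')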
Priority Queues
Process urgent tasks first. Priority behavior depends on the broker: RabbitMQ supports priorities natively once the queue declares a maximum priority, while Redis only emulates them (see the note after this snippet):
# High priority task
urgent_task.apply_async(priority=9)
# Normal priority
normal_task.apply_async(priority=5)
# Low priority
background_task.apply_async(priority=1)
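A caveat for the Redis broker used in this guide: priorities only take effect once the transport is configured for them, because Redis emulates priorities with a handful of separate lists rather than supporting them natively. A sketch based on Celery's documented Redis transport options (note that with this transport lower numbers are typically consumed first, the opposite of the RabbitMQ convention):
# Enable emulated priorities on the Redis broker
celery.conf.broker_transport_options = {
    'queue_order_strategy': 'priority',
    'priority_steps': list(range(10)),  # allow priority levels 0-9
}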
Prefetch Multiplier
Control how many tasks each worker grabs at once:
celery -A app.celery worker --prefetch-multiplier=1
A value of 1 means workers take one task at a time. This improves load balancing but slightly reduces throughput. Higher values (2-4) improve throughput but may cause uneven distribution.
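The same setting can live in your Celery configuration instead of the CLI flag, using the celery app object from tasks.py:
# Equivalent to passing --prefetch-multiplier=1 on the command line
celery.conf.worker_prefetch_multiplier = 1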
Scheduled Tasks with Celery Beat
Celery Beat is a scheduler for periodic tasks—like cron for Celery.
Setup
Celery Beat ships with Celery itself, so there is nothing extra to install. Define your periodic tasks in the Celery configuration:
from celery.schedules import crontab
# Configure periodic tasks
celery.conf.beat_schedule = {
'send-daily-summary': {
'task': 'app.send_daily_summary',
'schedule': crontab(hour=9, minute=0), # Every day at 9 AM
},
'cleanup-old-data': {
'task': 'app.cleanup_old_data',
'schedule': crontab(hour=2, minute=0, day_of_week=1), # Mondays at 2 AM
},
'check-server-health': {
'task': 'app.check_health',
'schedule': 60.0, # Every 60 seconds
},
}
Running Beat
Start the beat scheduler:
celery -A app.celery beat --loglevel=info
You need both workers and beat running:
# Terminal 1: Workers
celery -A app.celery worker --loglevel=info
# Terminal 2: Beat scheduler
celery -A app.celery beat --loglevel=info
Production Best Practices
1. Use a Result Backend
Store task results for later retrieval:
celery.conf.update(
result_backend='redis://localhost:6379/1',
result_expires=3600, # Results expire after 1 hour
)
2. Set Task Time Limits
Always set time limits to prevent runaway tasks:
celery.conf.update(
task_time_limit=30 * 60, # 30 minutes hard limit
task_soft_time_limit=25 * 60, # 25 minutes soft limit
)
3. Acknowledge Tasks Late
For critical tasks, acknowledge after completion:
celery.conf.update(
task_acks_late=True,
worker_prefetch_multiplier=1,
)
If a worker crashes, the task gets requeued instead of lost.
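You can also opt in per task instead of globally. A sketch (archive_document and move_to_archive are made-up names; only do this for idempotent tasks, since they may run more than once):
# Requeued if the worker dies mid-task, so the body must be safe to repeat
@celery.task(acks_late=True, reject_on_worker_lost=True)
def archive_document(file_path):
    move_to_archive(file_path)  # move_to_archive: your own storage helper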
4. Monitor Queue Sizes
Watch for growing queues:
@celery.task
def monitor_queues():
    """Alert when the broker queue backs up"""
    import os
    import redis

    # Check the queue length directly on the Redis broker
    # (pending tasks sit in a list named after the queue; 'celery' is the default)
    r = redis.Redis.from_url(os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
    queue_size = r.llen('celery')
    if queue_size > 1000:
        alert_operations(f"Queue size critical: {queue_size}")
5. Use Connection Pools
Reuse database/API connections:
from redis import ConnectionPool
pool = ConnectionPool(host='localhost', port=6379, db=0)
@celery.task
def task_with_redis():
    import redis
    r = redis.Redis(connection_pool=pool)
    # Use connection
6. Graceful Shutdown
Handle worker shutdown cleanly:
from celery.signals import worker_shutdown
@worker_shutdown.connect
def cleanup(**kwargs):
    print("Worker shutting down, cleaning up resources")
    # Close connections, save state, etc.
Real-World Example: Complete Document Processing System
Here's the complete system we built. You can find all files in the setup section above.
Running in Production Mode
For production, harden docker-compose.yml: serve Flask with Gunicorn, add restart policies, scale the workers, and put a password on Flower:
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  web:
    build: .
    command: gunicorn -w 4 -b 0.0.0.0:5000 app:app
    volumes:
      - ./uploads:/app/uploads
    ports:
      - "5000:5000"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
      - FLASK_ENV=production
    depends_on:
      - redis
    restart: unless-stopped

  worker:
    build: .
    command: celery -A tasks.celery worker --loglevel=info --concurrency=8 --autoscale=10,3
    volumes:
      - ./uploads:/app/uploads
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped
    deploy:
      replicas: 2  # Run 2 worker containers

  flower:
    build: .
    command: celery -A tasks.celery flower --port=5555 --basic_auth=admin:secure_password
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped

volumes:
  redis_data:
Add gunicorn to requirements.txt:
gunicorn==21.2.0
Advanced Features: Adding Scheduled Tasks
Add to tasks.py:
from celery.schedules import crontab
# Configure scheduled tasks
celery.conf.beat_schedule = {
'cleanup-old-files': {
'task': 'tasks.cleanup_old_uploads',
'schedule': crontab(hour=2, minute=0), # 2 AM daily
},
'generate-daily-report': {
'task': 'tasks.generate_usage_report',
'schedule': crontab(hour=9, minute=0), # 9 AM daily
},
'health-check': {
'task': 'tasks.check_system_health',
'schedule': 300.0, # Every 5 minutes
},
}
@celery.task
def cleanup_old_uploads():
    """Remove files older than 7 days"""
    import time
    from pathlib import Path

    upload_dir = Path('uploads')
    cutoff = time.time() - (7 * 24 * 60 * 60)
    removed = 0
    for file in upload_dir.glob('*'):
        if file.stat().st_mtime < cutoff:
            file.unlink()
            removed += 1
    logger.info(f"Cleaned up {removed} old files")
    return {'removed': removed}
@celery.task
def generate_usage_report():
    """Generate daily usage statistics"""
    # Aggregate task statistics from the running workers
    inspector = celery.control.inspect()
    stats = inspector.stats() or {}
    report = {
        'date': time.strftime('%Y-%m-%d'),
        'active_workers': len(stats),
        # stats()[worker]['total'] maps task names to processed counts
        'tasks_processed': sum(sum(s.get('total', {}).values()) for s in stats.values())
    }
    logger.info(f"Daily report: {report}")
    return report
Add beat service to docker-compose.yml:
  beat:
    build: .
    command: celery -A tasks.celery beat --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped
Testing Advanced Workflows
Create a test script test_workflows.py:
import requests
import time
BASE_URL = "http://localhost:5000"
def test_single_document():
    """Test processing a single document"""
    print("\n=== Testing Single Document Processing ===")
    # Upload file
    with open('test_document.pdf', 'rb') as f:
        response = requests.post(
            f"{BASE_URL}/api/upload",
            files={'file': f}
        )
    print(f"Upload: {response.json()}")
    task_id = response.json()['task_id']
    # Check status
    time.sleep(2)
    response = requests.get(f"{BASE_URL}/api/task/{task_id}")
    print(f"Status: {response.json()}")

def test_batch_processing():
    """Test batch processing multiple documents"""
    print("\n=== Testing Batch Processing ===")
    files = ['doc1.pdf', 'doc2.pdf', 'doc3.pdf']
    response = requests.post(
        f"{BASE_URL}/api/batch-process",
        json={'files': files}
    )
    print(f"Batch started: {response.json()}")
    task_id = response.json()['task_id']
    # Poll for completion
    while True:
        response = requests.get(f"{BASE_URL}/api/task/{task_id}")
        status = response.json()
        if status['ready']:
            print(f"Batch complete: {status['result']}")
            break
        print("Still processing...")
        time.sleep(3)

if __name__ == '__main__':
    test_single_document()
    test_batch_processing()
Run tests:
python test_workflows.py
Common Pitfalls and How to Avoid Them
1. Passing Large Objects to Tasks
Bad:
@celery.task
def process_document(huge_pdf_bytes):  # Don't pass file contents!
    # This serializes the entire file to JSON/pickle
    process_pdf(huge_pdf_bytes)
Good:
@celery.task
def process_document(file_path):  # Pass file path instead
    # Worker reads file directly from disk
    with open(file_path, 'rb') as f:
        process_pdf(f.read())
Celery serializes task arguments. Passing large objects (images, PDFs, dataframes) makes tasks slow and wastes memory. Always pass file paths or IDs and load data inside the task.
2. Not Setting Time Limits
Always set time limits to prevent tasks from running forever:
@celery.task(time_limit=300, soft_time_limit=270)  # 5 minute max
def extract_text_from_pdf(file_path):
    # If this takes more than 5 minutes, Celery kills it
    pass
Without time limits, a single stuck task can occupy a worker forever.
3. Ignoring Failed Tasks
Monitor and alert on failures:
@celery.task(bind=True)
def critical_task(self):
    try:
        # Process important document
        result = process_document()
        return result
    except Exception as e:
        # Log error, send alert
        logger.error(f"Task {self.request.id} failed: {e}")
        send_slack_alert(f"Critical task failed: {e}")
        raise
Use error tracking services like Sentry for production systems.
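If you use Sentry, its SDK ships with a Celery integration that captures task failures automatically. A sketch (requires the sentry-sdk package; the DSN comes from your Sentry project settings):
# Initialize Sentry with its Celery integration, e.g. at the top of tasks.py
import os
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.getenv('SENTRY_DSN', ''),
    integrations=[CeleryIntegration()],
)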
4. Blocking the Worker
Never use operations without timeouts:
# Bad - could hang forever
response = requests.get(external_api)
pdf_reader = PyPDF2.PdfReader(file)

# Good - always use timeouts
response = requests.get(external_api, timeout=30)
with timeout_context(60):  # timeout_context: your own timeout wrapper (signal- or thread-based)
    pdf_reader = PyPDF2.PdfReader(file)
5. Not Using Connection Pools
Reuse database connections in tasks:
# Bad - creates new connection every time
@celery.task
def save_result(data):
db = create_database_connection() # Expensive!
db.save(data)
db.close()
# Good - use connection pool
from sqlalchemy import create_engine
engine = create_engine('postgresql://...', pool_size=10)
@celery.task
def save_result(data):
with engine.connect() as conn: # Reuses connection
conn.execute(...)
6. Not Handling File Cleanup
Clean up temporary files:
@celery.task
def process_and_cleanup(file_path):
    try:
        # Process file
        result = process_document(file_path)
        return result
    finally:
        # Always cleanup, even if task fails
        if os.path.exists(file_path):
            os.remove(file_path)
7. Forgetting to Scale Workers
One worker can't handle everything. Monitor queue sizes in Flower and scale workers when queues grow:
# Scale workers to handle load
docker-compose up --scale worker=5
Troubleshooting Common Issues
Workers Not Picking Up Tasks
Check worker status in Flower:
Open http://localhost:5555 and verify workers are connected
Verify Redis is running:
docker-compose ps
# All services should show "Up"
Check worker logs:
docker-compose logs worker
Restart everything:
docker-compose down
docker-compose up --build
Tasks Failing with Import Errors
Make sure your tasks.py is in the correct location and all dependencies are in requirements.txt.
# Rebuild images to install new dependencies
docker-compose build
docker-compose up
Redis Connection Refused
Ensure Redis container is running and healthy:
docker-compose ps redis
docker exec -it document-processor_redis_1 redis-cli ping
# Should return: PONG
File Upload Issues
Create the uploads directory and make it writable by the containers (777 is a quick fix for local development; tighten permissions in production):
mkdir -p uploads
chmod 777 uploads
Slow Performance
Check worker count:
# See how many workers are running
docker-compose ps worker
Scale up workers:
docker-compose up --scale worker=4
Check system resources:
docker stats
Viewing Logs
# All logs
docker-compose logs -f
# Specific service
docker-compose logs -f worker
# Last 100 lines
docker-compose logs --tail=100 worker
Deployment Tips
Environment Variables
Create .env file for configuration:
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/1
FLOWER_BASIC_AUTH=admin:your_secure_password
MAX_UPLOAD_SIZE=50MB
WORKER_CONCURRENCY=8
Update docker-compose.yml to use it:
services:
  worker:
    env_file:
      - .env
Monitoring in Production
Add health checks to docker-compose.yml:
services:
  redis:
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3
  worker:
    healthcheck:
      test: ["CMD", "celery", "-A", "tasks.celery", "inspect", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3
Backing Up Redis Data
Redis stores task results. Backup the volume:
docker run --rm \
-v document-processor_redis_data:/data \
-v $(pwd):/backup \
busybox tar czf /backup/redis-backup.tar.gz /data
Production Checklist
- [ ] Set secure Flower password
- [ ] Configure time limits for all tasks
- [ ] Enable result expiration to save memory
- [ ] Set up logging to external service
- [ ] Configure autoscaling for workers
- [ ] Add health checks
- [ ] Set up monitoring alerts
- [ ] Test failure scenarios
- [ ] Document recovery procedures
- [ ] Regular backups of Redis data
Conclusion
You just built a production-ready document processing service with Celery. Here's what you learned:
- Docker setup: Everything runs in containers—no local installation headaches
- Real-world example: Document processing service that extracts text, generates thumbnails, and creates reports
- Fundamentals: How brokers, queues, and workers coordinate
- Task workflows: Chains for sequential processing, groups for parallel execution, chords for aggregation
- Error handling: Automatic retries, exponential backoff, and time limits
- Scaling: Running multiple workers with autoscaling
- Monitoring: Flower dashboard for real-time visibility
- Scheduling: Periodic tasks with Celery Beat
- Production patterns: Connection pools, graceful shutdown, queue monitoring
What You Can Build Next
Now that you understand background tasks, here are projects you can build:
Video Processing Service
- Upload videos, generate multiple quality versions
- Extract thumbnails at specific timestamps
- Add watermarks, subtitles, or effects
- Perfect for learning FFmpeg + Celery
Data Pipeline
- Fetch data from multiple APIs in parallel
- Transform and clean data
- Load into database or generate reports
- Great for data engineering practice
Web Scraper
- Scrape multiple websites concurrently
- Extract and process content
- Store results in database
- Handle rate limits with retries
Image Processing API
- Resize, compress, and optimize images
- Apply filters and effects
- Generate multiple formats
- Batch process thousands of images
Report Generator
- Collect data from multiple sources
- Generate PDF/Excel reports
- Email results to users
- Schedule daily/weekly reports
Running Your Service
Your complete document processing service is ready. Start it with:
# Clone or create the project
mkdir document-processor && cd document-processor
# Create all the files (app.py, tasks.py, docker-compose.yml, etc.)
# from the setup section above
# Start everything
docker-compose up --build
# In another terminal, test it
curl -X POST http://localhost:5000/api/upload -F "file=@test.pdf"
# Open Flower monitoring
open http://localhost:5555
Background tasks transform how applications work. They keep your app fast and responsive while handling complex work in the background. Celery makes this easy to implement and scale—from handling 10 tasks per day to 10 million.
The document processing service you built demonstrates real-world patterns you'll use in production systems. Adapt these patterns to your needs, whether you're processing videos, generating reports, or building data pipelines.
Start simple. Add complexity as you need it. Scale when your users demand it. The patterns in this guide work at any scale.
Now go build something amazing. Your background workers are ready.
Questions? Hit me up—I'd love to see what you build with this.