Background Tasks and Celery: The Complete Guide


Why Background Tasks Matter

Picture this: a user clicks "Sign Up" on your website. Your app needs to send a welcome email, process their profile picture, create database records, and maybe notify your analytics service. If you do all of this synchronously, the user sits there waiting 5-10 seconds staring at a loading spinner. That's a terrible experience.

This is where background tasks come in. They let you respond to the user immediately while handling the heavy lifting in the background. Your app stays fast and responsive, and complex tasks get done without blocking anything.

In this post, we'll build a real document processing service using Flask, Celery, and Docker. You'll learn everything from the ground up: brokers, queues, workers, scaling, monitoring, and error handling. By the end, you'll have a production-ready system running in containers.

What We're Building

A document processing service where users can:

  • Upload PDFs and images
  • Extract text from documents automatically
  • Generate thumbnails for images
  • Analyze content for keywords
  • Process multiple documents in parallel
  • Get reports combining results from batch processing

All running in Docker containers with zero local installation needed (except Docker itself).

Prerequisites:

  • Basic Python knowledge
  • Docker installed on your machine
  • A text editor

That's it. Let's build.

What Are Background Tasks?

A background task is any operation that can happen asynchronously—meaning it doesn't need to complete immediately for the user to continue using your app.

Common examples:

  • Sending emails (welcome emails, password resets, notifications)
  • Processing images (resizing, compression, thumbnails)
  • Generating reports (PDFs, CSV exports)
  • Data aggregation (analytics, dashboard calculations)
  • Third-party API calls (payment processing, social media posts)
  • Scheduled jobs (daily backups, weekly summaries)

The key insight: if the user doesn't need the result immediately, it should probably be a background task.

The Core Components: Broker, Queue, and Workers

To understand background tasks, you need to understand three core concepts that work together:

The Queue

A queue is exactly what it sounds like—a line where tasks wait to be processed. When your application creates a task, it gets added to the queue. Tasks are processed in order (usually first-in, first-out), though you can configure priorities.

Think of it like a line at a coffee shop. Orders come in, they wait in line, and baristas process them one by one.

The Broker

The broker is the middleman that manages the queue. It receives tasks from your application, stores them safely, and delivers them to workers when they're ready to process.

Popular brokers:

  • Redis: Fast, simple, perfect for most use cases. It's an in-memory data store that's easy to set up.
  • RabbitMQ: More robust, with advanced features like message routing and guaranteed delivery. Better for large-scale systems.

The broker ensures tasks don't get lost. If your app crashes after creating a task, the broker still has it queued up safely.
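
Once things are running, you can peek at the queue yourself. With the Redis transport, Celery's default queue is simply a Redis list named "celery", so a quick sketch (assuming the local Redis we set up later in this post) looks like this:

import redis

# Number of tasks currently waiting in Celery's default queue
r = redis.Redis.from_url('redis://localhost:6379/0')
print(r.llen('celery'))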

The Worker

Workers are processes that continuously check the queue for new tasks and execute them. You can run multiple workers to process tasks in parallel. Workers can run on the same machine as your app or on completely separate servers.

When a worker finishes a task, it immediately checks the queue for the next one. If the queue is empty, workers wait until new tasks arrive.

Enter Celery

Celery is a distributed task queue system for Python. It handles all the complexity of managing tasks, workers, and brokers, giving you a simple API to create and execute background tasks.

Why Celery?

  • Battle-tested in production at massive scale
  • Works with Flask, Django, and standalone Python
  • Supports complex workflows (chains, groups, chords)
  • Built-in retry mechanisms and error handling
  • Powerful monitoring and scaling tools
  • Active community and excellent documentation
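
Before we build the full service, here's the smallest possible Celery app, just to show the shape of the API. This is a sketch assuming a local Redis broker; the module name hello.py is arbitrary.

# hello.py - a minimal Celery app (assumes Redis on localhost)
from celery import Celery

app = Celery('hello',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

# Start a worker:  celery -A hello worker --loglevel=info
# Then, in a Python shell:
#   from hello import add
#   result = add.delay(2, 3)   # returns immediately
#   result.get(timeout=10)     # -> 5, once a worker has run it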

Setting Up Celery with Flask Using Docker

Let's build a real document processing service. Users can upload documents, and we'll extract text, generate thumbnails, analyze content, and create reports—all in the background. We'll use Docker to make setup incredibly simple.

Project Structure

Create this folder structure:

document-processor/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── app.py
├── tasks.py
└── uploads/

Docker Setup

First, create docker-compose.yml:

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  web:
    build: .
    command: python app.py
    volumes:
      - .:/app
      - ./uploads:/app/uploads
    ports:
      - "5000:5000"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  worker:
    build: .
    command: celery -A tasks.celery worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
      - ./uploads:/app/uploads
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis

  flower:
    build: .
    command: celery -A tasks.celery flower --port=5555
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
      - worker

volumes:
  redis_data:

Create Dockerfile:

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies for PDF/image processing
RUN apt-get update && apt-get install -y \
    poppler-utils \
    tesseract-ocr \
    libmagic1 \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]

Create requirements.txt:

flask==3.0.0
celery==5.3.4
redis==5.0.1
flower==2.0.1
Pillow==10.1.0
PyPDF2==3.0.1
python-magic==0.4.27

Building the Document Processing Service

Create tasks.py with our Celery tasks:

from celery import Celery
from PIL import Image
import PyPDF2
import os
import time
import magic
import logging

logger = logging.getLogger(__name__)

# Initialize Celery
celery = Celery('document_processor')
celery.conf.update(
    broker_url=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
    result_backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1'),
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    enable_utc=True,
)


@celery.task(bind=True, max_retries=3)
def extract_text_from_pdf(self, file_path):
    """Extract text content from PDF.

    Accepts a plain path, or the dict returned by detect_file_type,
    so it can sit after that task in a chain.
    """
    if isinstance(file_path, dict):
        file_path = file_path['file']
    try:
        logger.info(f"Extracting text from {file_path}")
        
        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)
            page_count = len(pdf_reader.pages)  # read while the file is still open
            text = ""
            for page in pdf_reader.pages:
                text += page.extract_text() or ""
        
        word_count = len(text.split())
        logger.info(f"Extracted {word_count} words from {file_path}")
        
        return {
            'file': file_path,
            'text': text[:500],  # First 500 chars
            'word_count': word_count,
            'page_count': page_count
        }
    except Exception as exc:
        logger.error(f"Failed to extract text: {exc}")
        raise self.retry(exc=exc, countdown=60)


@celery.task
def generate_thumbnail(file_path, size=(300, 300)):
    """Generate thumbnail for image files"""
    logger.info(f"Generating thumbnail for {file_path}")
    
    try:
        img = Image.open(file_path)
        img.thumbnail(size)
        
        # Save thumbnail
        base, ext = os.path.splitext(file_path)
        thumb_path = f"{base}_thumb{ext}"
        img.save(thumb_path)
        
        return {
            'original': file_path,
            'thumbnail': thumb_path,
            'size': img.size
        }
    except Exception as e:
        logger.error(f"Thumbnail generation failed: {e}")
        raise


@celery.task
def analyze_content(text_data):
    """Analyze extracted text for keywords and sentiment"""
    logger.info("Analyzing content")
    time.sleep(2)  # Simulate ML processing
    
    text = text_data.get('text', '')
    words = text.lower().split()
    
    # Simple keyword extraction (in real world, use NLP libraries)
    common_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for'}
    keywords = [w for w in set(words) if w not in common_words and len(w) > 3][:10]
    
    return {
        # Pass file info and word count through so generate_report can aggregate them
        'file': text_data.get('file'),
        'word_count': text_data.get('word_count', 0),
        'keywords': keywords,
        'sentiment': 'positive' if 'good' in words else 'neutral',
        'analyzed_words': len(words)
    }


@celery.task
def generate_report(analysis_results):
    """Generate final report from all analysis"""
    logger.info("Generating final report")
    
    report = {
        'status': 'completed',
        'summary': {
            'total_documents': len(analysis_results),
            'total_words': sum(r.get('word_count', 0) for r in analysis_results),
            'processed_at': time.strftime('%Y-%m-%d %H:%M:%S')
        },
        'details': analysis_results
    }
    
    logger.info(f"Report generated for {len(analysis_results)} documents")
    return report


@celery.task
def detect_file_type(file_path):
    """Detect file type using magic numbers"""
    logger.info(f"Detecting file type for {file_path}")
    
    mime = magic.Magic(mime=True)
    file_type = mime.from_file(file_path)
    
    return {
        'file': file_path,
        'mime_type': file_type,
        'is_pdf': 'pdf' in file_type,
        'is_image': 'image' in file_type
    }

Create app.py with Flask routes:

from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
from celery import chain, group, chord
import os
from tasks import (
    celery, extract_text_from_pdf, generate_thumbnail,
    analyze_content, generate_report, detect_file_type
)

app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16MB max

os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)


@app.route('/')
def index():
    return jsonify({
        'service': 'Document Processing API',
        'status': 'running',
        'endpoints': {
            'upload': '/api/upload',
            'process': '/api/process/<file_id>',
            'batch': '/api/batch-process',
            'status': '/api/task/<task_id>'
        }
    })


@app.route('/api/upload', methods=['POST'])
def upload_file():
    """Upload a document for processing"""
    if 'file' not in request.files:
        return jsonify({'error': 'No file provided'}), 400
    
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No file selected'}), 400
    
    filename = secure_filename(file.filename)
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(file_path)
    
    # Start processing immediately
    task = detect_file_type.delay(file_path)
    
    return jsonify({
        'message': 'File uploaded successfully',
        'file': filename,
        'task_id': task.id
    }), 201


@app.route('/api/process/<filename>', methods=['POST'])
def process_document(filename):
    """Process a single document with full pipeline"""
    file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404
    
    # Create processing chain: detect -> extract -> analyze
    workflow = chain(
        detect_file_type.s(file_path),
        extract_text_from_pdf.s(),
        analyze_content.s()
    )
    
    result = workflow.apply_async()
    
    return jsonify({
        'message': 'Processing started',
        'task_id': result.id,
        'file': filename
    })


@app.route('/api/batch-process', methods=['POST'])
def batch_process():
    """Process multiple documents in parallel and generate combined report"""
    files = request.json.get('files', [])
    
    if not files:
        return jsonify({'error': 'No files specified'}), 400
    
    # Process all files in parallel, then generate a combined report.
    # Calling the chord with its callback dispatches it and returns an AsyncResult.
    result = chord(
        chain(
            detect_file_type.s(os.path.join(app.config['UPLOAD_FOLDER'], f)),
            extract_text_from_pdf.s(),
            analyze_content.s()
        ) for f in files
    )(generate_report.s())
    
    return jsonify({
        'message': f'Batch processing started for {len(files)} files',
        'task_id': result.id,
        'files': files
    })


@app.route('/api/process-images', methods=['POST'])
def process_images():
    """Process multiple images in parallel"""
    images = request.json.get('images', [])
    
    if not images:
        return jsonify({'error': 'No images specified'}), 400
    
    # Generate thumbnails for all images in parallel
    job = group(
        generate_thumbnail.s(os.path.join(app.config['UPLOAD_FOLDER'], img))
        for img in images
    )
    
    result = job.apply_async()
    
    return jsonify({
        'message': f'Processing {len(images)} images',
        'group_id': result.id
    })


@app.route('/api/task/<task_id>')
def get_task_status(task_id):
    """Check status of any task"""
    task = celery.AsyncResult(task_id)
    
    response = {
        'task_id': task_id,
        'state': task.state,
        'ready': task.ready(),
    }
    
    if task.ready():
        if task.successful():
            response['result'] = task.result
        else:
            response['error'] = str(task.info)
    else:
        response['status'] = 'Task is still processing...'
    
    return jsonify(response)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

Running Everything with Docker

Just one command starts everything:

docker-compose up --build

This starts:

  • Redis on port 6379 (message broker)
  • Flask app on port 5000 (API)
  • Celery worker (4 concurrent task processes)
  • Flower on port 5555 (monitoring dashboard)

Testing the Document Processor

Upload a PDF document:

curl -X POST http://localhost:5000/api/upload \
  -F "file=@document.pdf"

Process a single document:

curl -X POST http://localhost:5000/api/process/document.pdf

Batch process multiple documents:

curl -X POST http://localhost:5000/api/batch-process \
  -H "Content-Type: application/json" \
  -d '{"files": ["doc1.pdf", "doc2.pdf", "doc3.pdf"]}'

Check task status:

curl http://localhost:5000/api/task/YOUR_TASK_ID

Open Flower dashboard:

Visit http://localhost:5555 in your browser to see real-time task monitoring.

What's Happening Behind the Scenes

When you upload and process a document:

  1. File Upload: Flask receives the file and saves it
  2. Task Queued: The processing task is sent to Redis
  3. Worker Picks Up: A Celery worker grabs the task from the queue
  4. Chain Execution: Tasks run in sequence:
    • Detect file type
    • Extract text from PDF
    • Analyze content for keywords
  5. Result Stored: Results are saved in Redis
  6. API Response: You can check the status anytime

For batch processing, multiple documents process in parallel, then a final task combines all results into one report. This demonstrates the real power of Celery.

Understanding Task Execution Methods

Celery gives you different ways to execute tasks depending on your needs.

.delay() - Simple Execution

# Process document immediately
extract_text_from_pdf.delay('/uploads/report.pdf')

This is the simplest way. It queues the task immediately and returns.
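
What you get back is an AsyncResult handle; the task itself runs later on a worker. A quick sketch:

# .delay() returns immediately with an AsyncResult
result = extract_text_from_pdf.delay('/uploads/report.pdf')

print(result.id)     # task id - store this to check status later
print(result.state)  # 'PENDING' until a worker picks the task up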

.apply_async() - Advanced Execution

For more control, use .apply_async():

# Process document after 60 seconds (delayed processing)
extract_text_from_pdf.apply_async(
    args=['/uploads/report.pdf'],
    countdown=60
)

# Schedule processing for specific time (e.g., off-peak hours)
from datetime import datetime, timedelta
eta = datetime.utcnow() + timedelta(hours=2)
extract_text_from_pdf.apply_async(
    args=['/uploads/large_document.pdf'],
    eta=eta
)

# Set high priority for urgent documents
extract_text_from_pdf.apply_async(
    args=['/uploads/urgent.pdf'],
    priority=9
)

Getting Task Results

The /api/task/<task_id> endpoint we created shows how to retrieve results. Every call to .delay() or .apply_async() returns an AsyncResult object, and you can rebuild one later from the task id:

task = celery.AsyncResult(task_id)

# Check if task is finished
if task.ready():
    # Get the result
    result = task.result
    print(f"Processing complete: {result}")
else:
    print(f"Task state: {task.state}")

You can also check specific states:

if task.successful():
    print("Task completed successfully")
elif task.failed():
    print("Task failed")
elif task.state == 'PENDING':
    print("Task waiting in queue")
elif task.state == 'STARTED':
    print("Task is running")

Advanced Task Workflows

Real applications often need complex task coordination. Celery provides powerful primitives for this.

Chains: Sequential Execution

Chains execute tasks one after another, passing the result of each task to the next. This is perfect for multi-step document processing.

from celery import chain

# Our document processing tasks (already defined in tasks.py)
# - detect_file_type: Determines if file is PDF or image
# - extract_text_from_pdf: Pulls text content
# - analyze_content: Extracts keywords and sentiment

# Create a processing pipeline
workflow = chain(
    detect_file_type.s('/uploads/report.pdf'),
    extract_text_from_pdf.s(),
    analyze_content.s()
)

# Execute the chain
result = workflow.apply_async()

# Each task's output becomes the next task's input:
# 1. detect_file_type returns: {'file': 'report.pdf', 'is_pdf': True}
# 2. extract_text_from_pdf receives that dict, returns: {'text': '...', 'word_count': 1500}
# 3. analyze_content receives text data, returns: {'keywords': [...], 'sentiment': 'positive'}

Each task's output becomes the input for the next task. If any task fails, the chain stops, and you can check which step failed.
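
As a side note, the same pipeline can be written with the | operator, which reads like a shell pipe:

# Equivalent chain built with the | operator
workflow = (
    detect_file_type.s('/uploads/report.pdf')
    | extract_text_from_pdf.s()
    | analyze_content.s()
)
result = workflow.apply_async()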

Real use cases for chains:

  • Document processing pipelines (detect → extract → analyze → report)
  • Video processing (upload → transcode → add watermark → generate thumbnail)
  • Data ETL (extract → transform → load)
  • Multi-step user onboarding (create account → send email → setup profile)

Groups: Parallel Execution

Groups execute multiple tasks simultaneously. Perfect for processing multiple files at once.

from celery import group

# Process multiple images in parallel
# (generate_thumbnail task is already defined in tasks.py)

image_paths = [
    '/uploads/photo1.jpg',
    '/uploads/photo2.jpg', 
    '/uploads/photo3.jpg',
    '/uploads/photo4.jpg'
]

# All thumbnails generate at the same time
job = group(
    generate_thumbnail.s(path, (300, 300)) 
    for path in image_paths
)

result = job.apply_async()

# With 4 workers, all 4 images process simultaneously
# Total time: ~2 seconds (time for 1 image)
# Without parallel: ~8 seconds (2 seconds × 4 images)

All tasks in the group execute simultaneously across available workers. This is dramatically faster than processing files one by one.

Real use cases for groups:

  • Generate thumbnails for photo albums
  • Process multiple documents uploaded in bulk
  • Send notification emails to thousands of users
  • Fetch data from multiple APIs at once
  • Run independent calculations in parallel

Performance example:

  • Processing 100 images sequentially: 200 seconds (2s per image)
  • With 10 workers using groups: 20 seconds (10 images at a time)
  • 10x faster!
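
To collect the output of a group run like the one above, use the GroupResult it returns. This is a sketch assuming the Redis result backend from our configuration; .get() blocks until every task in the group has finished.

# result is the GroupResult from job.apply_async() above
print(result.completed_count())       # how many thumbnails are done so far
thumbnails = result.get(timeout=120)  # blocks until all finish; list of dicts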

Chords: Parallel + Aggregation

Chords run tasks in parallel and then execute a callback with all results. This is perfect for batch processing where you need a combined report.

from celery import chord

# Process multiple documents in parallel, then generate a combined report
documents = [
    '/uploads/report1.pdf',
    '/uploads/report2.pdf',
    '/uploads/report3.pdf'
]

# Each document goes through the full pipeline in parallel,
# then generate_report combines all results.
# Calling the chord with its callback dispatches it and returns an AsyncResult.
result = chord(
    chain(
        detect_file_type.s(doc),
        extract_text_from_pdf.s(),
        analyze_content.s()
    ) for doc in documents
)(generate_report.s())

# What happens:
# 1. All 3 documents process in parallel through the chain
# 2. Each completes: extract text → analyze → return results
# 3. When ALL are done, generate_report receives a list of all results
# 4. Final report combines data from all documents

The generate_report task receives a list containing results from all parallel processing chains. It only runs after everything completes.

Real use cases for chords:

  • Batch document processing: Process 100 contracts, generate compliance report
  • Financial analysis: Fetch data from multiple markets, calculate portfolio summary
  • Data aggregation: Scrape 50 websites in parallel, create combined dataset
  • Multi-source reports: Get data from 10 APIs, generate unified dashboard

Example output:

{
  "status": "completed",
  "summary": {
    "total_documents": 3,
    "total_words": 4500,
    "processed_at": "2025-11-28 10:30:00"
  },
  "details": [
    {"file": "report1.pdf", "word_count": 1500, "keywords": ["revenue", "growth"]},
    {"file": "report2.pdf", "word_count": 1800, "keywords": ["sales", "market"]},
    {"file": "report3.pdf", "word_count": 1200, "keywords": ["profit", "margin"]}
  ]
}

This is the power of chords: parallel processing with intelligent result aggregation.

Combining Primitives

You can combine these primitives for complex workflows:

from celery import chain, group, chord

# Fetch data, then process multiple items in parallel, then aggregate.
# (fetch_initial_data, process_item, and aggregate_results are placeholder tasks.)
# A group followed by another task inside a chain is automatically upgraded to a chord.
workflow = chain(
    fetch_initial_data.s(),
    group(
        process_item.s(i) for i in range(10)
    ),
    aggregate_results.s()
)

result = workflow.apply_async()

Handling Task Failures and Retries

In production, tasks will fail. Network issues, service outages, bad data—failures are inevitable. Celery has robust mechanisms to handle them.

Automatic Retries

Configure tasks to retry automatically on failure:

import requests
from requests.exceptions import RequestException

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def call_external_api(self, endpoint):
    """Call external API with automatic retries"""
    try:
        response = requests.get(endpoint, timeout=10)
        response.raise_for_status()
        return response.json()
    except RequestException as exc:
        # Retry the task
        raise self.retry(exc=exc)

Key parameters:

  • bind=True: Gives access to the task instance (self)
  • max_retries=3: Retry up to 3 times
  • default_retry_delay=60: Wait 60 seconds between retries

Exponential Backoff

For external services, use exponential backoff to avoid overwhelming them:

@celery.task(bind=True, max_retries=5)
def fetch_with_backoff(self, url):
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except RequestException as exc:
        # Exponential backoff: 2, 4, 8, 16, 32 seconds
        countdown = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=countdown)

Each retry waits longer than the previous one, giving the external service time to recover.
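
Celery can also do this declaratively. The autoretry_for option retries automatically on the listed exceptions, and retry_backoff enables exponential backoff without any hand-written retry code; a sketch:

import requests
from requests.exceptions import RequestException

@celery.task(
    autoretry_for=(RequestException,),   # retry automatically on these exceptions
    retry_backoff=True,                  # exponential backoff between retries
    retry_jitter=True,                   # add randomness to avoid thundering herds
    retry_kwargs={'max_retries': 5},
)
def fetch_declaratively(url):
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()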

Custom Retry Logic

You can retry selectively based on exception type:

@celery.task(bind=True, max_retries=3)
def process_payment(self, payment_id):
    try:
        result = payment_gateway.process(payment_id)
        return result
    except TemporaryError as exc:
        # Retry on temporary errors
        raise self.retry(exc=exc, countdown=30)
    except PermanentError:
        # Don't retry on permanent errors
        log_error(f"Permanent failure for payment {payment_id}")
        return {'status': 'failed'}

Task Time Limits

Prevent tasks from running forever:

from celery.exceptions import SoftTimeLimitExceeded

@celery.task(time_limit=300, soft_time_limit=270)
def process_large_file(file_path):
    """Process file with time limits"""
    # This task will be killed after 300 seconds (hard limit);
    # SoftTimeLimitExceeded is raised inside the task after 270 seconds.
    try:
        # Process file
        pass
    except SoftTimeLimitExceeded:
        # Cleanup and graceful exit (cleanup_resources is a placeholder)
        cleanup_resources()
        raise

Error Callbacks

Execute code when tasks fail:

@celery.task
def handle_error(request, exc, traceback):
    """Called when a linked task fails"""
    print(f"Task {request.id} failed: {exc}")
    # Send alert, log to monitoring service, etc.


@celery.task
def risky_task():
    # Task code that might fail
    pass


# Attach the error callback when dispatching the task
risky_task.apply_async(link_error=handle_error.s())

Monitoring Celery Workers

In production, you need visibility into what's happening with your tasks and workers.

Celery Events

Celery has a built-in event system. Start the event monitor:

celery -A app.celery events

This shows real-time task events: when tasks are sent, started, succeeded, or failed.

Flower: Web-Based Monitoring

Flower is the best tool for monitoring Celery. It provides a beautiful web interface.

Installation:

pip install flower

Start Flower:

celery -A app.celery flower

Open http://localhost:5555 in your browser. You'll see:

  • Active workers and their status
  • Task history and success/failure rates
  • Task runtime statistics
  • Queue lengths
  • Worker resource usage
  • Ability to restart workers and revoke tasks

Inspecting Workers

Check worker status from the command line:

# List active workers
celery -A app.celery inspect active

# See registered tasks
celery -A app.celery inspect registered

# Check worker stats
celery -A app.celery inspect stats

# See scheduled tasks
celery -A app.celery inspect scheduled

Custom Logging

Add detailed logging to your tasks:

import logging

logger = logging.getLogger(__name__)

@celery.task
def important_task(data):
    logger.info(f"Starting task with data: {data}")
    
    try:
        result = process(data)
        logger.info(f"Task completed successfully: {result}")
        return result
    except Exception as e:
        logger.error(f"Task failed: {e}", exc_info=True)
        raise

Scaling Celery Workers

As your application grows, you'll need to scale your workers to handle increased load.

Running Multiple Workers

The simplest way to scale is running multiple worker processes:

# Run one worker with 4 concurrent child processes
celery -A app.celery worker --concurrency=4 --loglevel=info

Each worker can process multiple tasks simultaneously using threads or processes.

Worker Concurrency Models

Celery supports different concurrency models:

Processes (default):

celery -A app.celery worker --pool=prefork --concurrency=4

Good for CPU-intensive tasks. Each task runs in a separate process.

Threads:

celery -A app.celery worker --pool=threads --concurrency=10

Good for I/O-bound tasks (network requests, database queries). Lighter weight than processes.

Gevent (async):

pip install gevent
celery -A app.celery worker --pool=gevent --concurrency=100

Excellent for I/O-bound tasks. Can handle many concurrent tasks with minimal resources.

Autoscaling Workers

Workers can automatically scale based on load:

celery -A app.celery worker --autoscale=10,3 --loglevel=info

This runs between 3 and 10 worker processes depending on queue size. When the queue fills up, more workers spin up. When it's empty, workers scale down.

Multiple Machines

For serious scale, distribute workers across multiple servers:

Server 1:

celery -A app.celery worker --hostname=worker1@%h --loglevel=info

Server 2:

celery -A app.celery worker --hostname=worker2@%h --loglevel=info

All workers connect to the same broker (Redis or RabbitMQ), and tasks are distributed across all available workers.

Task Routing

Send different types of tasks to different workers:

# Configure task routes
celery.conf.update(
    task_routes={
        'app.send_email': {'queue': 'emails'},
        'app.process_image': {'queue': 'images'},
        'app.generate_report': {'queue': 'reports'},
    }
)

Start workers for specific queues:

# Worker only for email tasks
celery -A app.celery worker -Q emails --hostname=email-worker

# Worker only for image tasks  
celery -A app.celery worker -Q images --hostname=image-worker

This lets you scale different task types independently.
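
You can also override the route for a single call, since apply_async accepts a queue argument (send_email here is the hypothetical task from the routing config above):

# Route one specific call, overriding task_routes just for this invocation
send_email.apply_async(args=['user@example.com'], queue='emails')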

Priority Queues

Process urgent tasks first:

# High priority task
urgent_task.apply_async(priority=9)

# Normal priority
normal_task.apply_async(priority=5)

# Low priority
background_task.apply_async(priority=1)

Prefetch Multiplier

Control how many tasks each worker grabs at once:

celery -A app.celery worker --prefetch-multiplier=1

A value of 1 means workers take one task at a time. This improves load balancing but slightly reduces throughput. Higher values (2-4) improve throughput but may cause uneven distribution.

Scheduled Tasks with Celery Beat

Celery Beat is a scheduler for periodic tasks—like cron for Celery.

Setup

Beat ships with Celery, so there's nothing extra to install. Define the schedule:

from celery.schedules import crontab

# Configure periodic tasks
celery.conf.beat_schedule = {
    'send-daily-summary': {
        'task': 'app.send_daily_summary',
        'schedule': crontab(hour=9, minute=0),  # Every day at 9 AM
    },
    'cleanup-old-data': {
        'task': 'app.cleanup_old_data',
        'schedule': crontab(hour=2, minute=0, day_of_week=1),  # Mondays at 2 AM
    },
    'check-server-health': {
        'task': 'app.check_health',
        'schedule': 60.0,  # Every 60 seconds
    },
}

Running Beat

Start the beat scheduler:

celery -A app.celery beat --loglevel=info

You need both workers and beat running:

# Terminal 1: Workers
celery -A app.celery worker --loglevel=info

# Terminal 2: Beat scheduler
celery -A app.celery beat --loglevel=info

Production Best Practices

1. Use a Result Backend

Store task results for later retrieval:

celery.conf.update(
    result_backend='redis://localhost:6379/1',
    result_expires=3600,  # Results expire after 1 hour
)

2. Set Task Time Limits

Always set time limits to prevent runaway tasks:

celery.conf.update(
    task_time_limit=30 * 60,  # 30 minutes hard limit
    task_soft_time_limit=25 * 60,  # 25 minutes soft limit
)

3. Acknowledge Tasks Late

For critical tasks, acknowledge after completion:

celery.conf.update(
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

If a worker crashes, the task gets requeued instead of lost.

4. Monitor Queue Sizes

Watch for growing queues:

@celery.task
def monitor_queues():
    """Alert when the default queue backs up (assumes the Redis broker)"""
    import redis
    from celery import current_app
    
    # With the Redis transport, the default queue is a list named "celery"
    r = redis.Redis.from_url(current_app.conf.broker_url)
    queue_size = r.llen('celery')
    if queue_size > 1000:
        alert_operations(f"Queue size critical: {queue_size}")

5. Use Connection Pools

Reuse database/API connections:

from redis import ConnectionPool

pool = ConnectionPool(host='localhost', port=6379, db=0)

@celery.task
def task_with_redis():
    import redis
    r = redis.Redis(connection_pool=pool)
    # Use connection

6. Graceful Shutdown

Handle worker shutdown cleanly:

from celery.signals import worker_shutdown

@worker_shutdown.connect
def cleanup(**kwargs):
    print("Worker shutting down, cleaning up resources")
    # Close connections, save state, etc.

Real-World Example: Complete Document Processing System

Here's the complete system we built. You can find all files in the setup section above.

Running in Production Mode

For production, update docker-compose.yml to add environment variables:

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped

  web:
    build: .
    command: gunicorn -w 4 -b 0.0.0.0:5000 app:app
    volumes:
      - ./uploads:/app/uploads
    ports:
      - "5000:5000"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
      - FLASK_ENV=production
    depends_on:
      - redis
    restart: unless-stopped

  worker:
    build: .
    command: celery -A tasks.celery worker --loglevel=info --autoscale=10,3
    volumes:
      - ./uploads:/app/uploads
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped
    deploy:
      replicas: 2  # Run 2 worker containers

  flower:
    build: .
    command: celery -A tasks.celery flower --port=5555 --basic_auth=admin:secure_password
    ports:
      - "5555:5555"
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped

volumes:
  redis_data:

Add gunicorn to requirements.txt:

gunicorn==21.2.0

Advanced Features: Adding Scheduled Tasks

Add to tasks.py:

from celery.schedules import crontab

# Configure scheduled tasks
celery.conf.beat_schedule = {
    'cleanup-old-files': {
        'task': 'tasks.cleanup_old_uploads',
        'schedule': crontab(hour=2, minute=0),  # 2 AM daily
    },
    'generate-daily-report': {
        'task': 'tasks.generate_usage_report',
        'schedule': crontab(hour=9, minute=0),  # 9 AM daily
    },
    'health-check': {
        'task': 'tasks.check_system_health',
        'schedule': 300.0,  # Every 5 minutes
    },
}


@celery.task
def cleanup_old_uploads():
    """Remove files older than 7 days"""
    import time
    from pathlib import Path
    
    upload_dir = Path('uploads')
    cutoff = time.time() - (7 * 24 * 60 * 60)
    
    removed = 0
    for file in upload_dir.glob('*'):
        if file.is_file() and file.stat().st_mtime < cutoff:
            file.unlink()
            removed += 1
    
    logger.info(f"Cleaned up {removed} old files")
    return {'removed': removed}


@celery.task
def generate_usage_report():
    """Generate daily usage statistics"""
    # Aggregate task statistics
    inspector = celery.control.inspect()
    stats = inspector.stats()
    
    report = {
        'date': time.strftime('%Y-%m-%d'),
        'active_workers': len(stats) if stats else 0,
        # Each worker's 'total' stat maps task names to processed counts
        'tasks_processed': sum(sum(s.get('total', {}).values())
                               for s in stats.values()) if stats else 0
    }
    
    logger.info(f"Daily report: {report}")
    return report

Add beat service to docker-compose.yml:

  beat:
    build: .
    command: celery -A tasks.celery beat --loglevel=info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    depends_on:
      - redis
    restart: unless-stopped

Testing Advanced Workflows

Create a test script test_workflows.py:

import requests
import time

BASE_URL = "http://localhost:5000"

def test_single_document():
    """Test processing a single document"""
    print("\n=== Testing Single Document Processing ===")
    
    # Upload file
    with open('test_document.pdf', 'rb') as f:
        response = requests.post(
            f"{BASE_URL}/api/upload",
            files={'file': f}
        )
    
    print(f"Upload: {response.json()}")
    task_id = response.json()['task_id']
    
    # Check status
    time.sleep(2)
    response = requests.get(f"{BASE_URL}/api/task/{task_id}")
    print(f"Status: {response.json()}")


def test_batch_processing():
    """Test batch processing multiple documents"""
    print("\n=== Testing Batch Processing ===")
    
    files = ['doc1.pdf', 'doc2.pdf', 'doc3.pdf']
    response = requests.post(
        f"{BASE_URL}/api/batch-process",
        json={'files': files}
    )
    
    print(f"Batch started: {response.json()}")
    task_id = response.json()['task_id']
    
    # Poll for completion
    while True:
        response = requests.get(f"{BASE_URL}/api/task/{task_id}")
        status = response.json()
        
        if status['ready']:
            print(f"Batch complete: {status['result']}")
            break
        
        print("Still processing...")
        time.sleep(3)


if __name__ == '__main__':
    test_single_document()
    test_batch_processing()

Run tests:

python test_workflows.py

Common Pitfalls and How to Avoid Them

1. Passing Large Objects to Tasks

Bad:

@celery.task
def process_document(huge_pdf_bytes):  # Don't pass file contents!
    # This serializes the entire file to JSON/pickle
    process_pdf(huge_pdf_bytes)

Good:

@celery.task
def process_document(file_path):  # Pass file path instead
    # Worker reads file directly from disk
    with open(file_path, 'rb') as f:
        process_pdf(f.read())

Celery serializes task arguments. Passing large objects (images, PDFs, dataframes) makes tasks slow and wastes memory. Always pass file paths or IDs and load data inside the task.

2. Not Setting Time Limits

Always set time limits to prevent tasks from running forever:

@celery.task(time_limit=300, soft_time_limit=270)  # 5 minute max
def extract_text_from_pdf(file_path):
    # If this takes more than 5 minutes, Celery kills it
    pass

Without time limits, a single stuck task can occupy a worker forever.

3. Ignoring Failed Tasks

Monitor and alert on failures:

@celery.task(bind=True)
def critical_task(self):
    try:
        # Process important document
        result = process_document()
        return result
    except Exception as e:
        # Log error, send alert
        logger.error(f"Task {self.request.id} failed: {e}")
        send_slack_alert(f"Critical task failed: {e}")
        raise

Use error tracking services like Sentry for production systems.
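
For example, the Sentry SDK ships with a Celery integration. A minimal sketch (the DSN below is a placeholder; install with pip install sentry-sdk):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    integrations=[CeleryIntegration()],
)
# Unhandled task failures are now reported to Sentry automatically.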

4. Blocking the Worker

Never run a blocking operation without a timeout:

# Bad - could hang forever
response = requests.get(external_api)

# Good - always pass a timeout
response = requests.get(external_api, timeout=30)

# Calls that have no timeout parameter (like PyPDF2.PdfReader on a huge file)
# should run in a task with soft_time_limit set, so Celery can interrupt them.

5. Not Using Connection Pools

Reuse database connections in tasks:

# Bad - creates new connection every time
@celery.task
def save_result(data):
    db = create_database_connection()  # Expensive!
    db.save(data)
    db.close()

# Good - use connection pool
from sqlalchemy import create_engine
engine = create_engine('postgresql://...', pool_size=10)

@celery.task
def save_result(data):
    with engine.connect() as conn:  # Reuses connection
        conn.execute(...)

6. Not Handling File Cleanup

Clean up temporary files:

@celery.task
def process_and_cleanup(file_path):
    try:
        # Process file
        result = process_document(file_path)
        return result
    finally:
        # Always cleanup, even if task fails
        if os.path.exists(file_path):
            os.remove(file_path)

7. Forgetting to Scale Workers

One worker can't handle everything. Monitor queue sizes in Flower and scale workers when queues grow:

# Scale workers to handle load
docker-compose up --scale worker=5

Troubleshooting Common Issues

Workers Not Picking Up Tasks

Check worker status in Flower:

Open http://localhost:5555 and verify workers are connected

Verify Redis is running:

docker-compose ps
# All services should show "Up"

Check worker logs:

docker-compose logs worker

Restart everything:

docker-compose down
docker-compose up --build

Tasks Failing with Import Errors

Make sure your tasks.py is in the correct location and all dependencies are in requirements.txt.

# Rebuild images to install new dependencies
docker-compose build
docker-compose up

Redis Connection Refused

Ensure Redis container is running and healthy:

docker-compose ps redis
docker-compose exec redis redis-cli ping
# Should return: PONG

File Upload Issues

Create the uploads directory and set permissions:

mkdir -p uploads
chmod 777 uploads

Slow Performance

Check worker count:

# See how many workers are running
docker-compose ps worker

Scale up workers:

docker-compose up --scale worker=4

Check system resources:

docker stats

Viewing Logs

# All logs
docker-compose logs -f

# Specific service
docker-compose logs -f worker

# Last 100 lines
docker-compose logs --tail=100 worker

Deployment Tips

Environment Variables

Create .env file for configuration:

CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/1
FLOWER_BASIC_AUTH=admin:your_secure_password
MAX_UPLOAD_SIZE=50MB
WORKER_CONCURRENCY=8

Update docker-compose.yml to use it:

services:
  worker:
    env_file:
      - .env

Monitoring in Production

Add health checks to docker-compose.yml:

services:
  redis:
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 3s
      retries: 3
  
  worker:
    healthcheck:
      test: ["CMD", "celery", "-A", "tasks.celery", "inspect", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

Backing Up Redis Data

Redis stores task results. Backup the volume:

docker run --rm \
  -v document-processor_redis_data:/data \
  -v $(pwd):/backup \
  busybox tar czf /backup/redis-backup.tar.gz /data

Production Checklist

  • [ ] Set secure Flower password
  • [ ] Configure time limits for all tasks
  • [ ] Enable result expiration to save memory
  • [ ] Set up logging to external service
  • [ ] Configure autoscaling for workers
  • [ ] Add health checks
  • [ ] Set up monitoring alerts
  • [ ] Test failure scenarios
  • [ ] Document recovery procedures
  • [ ] Regular backups of Redis data

Conclusion

You just built a production-ready document processing service with Celery. Here's what you learned:

  • Docker setup: Everything runs in containers—no local installation headaches
  • Real-world example: Document processing service that extracts text, generates thumbnails, and creates reports
  • Fundamentals: How brokers, queues, and workers coordinate
  • Task workflows: Chains for sequential processing, groups for parallel execution, chords for aggregation
  • Error handling: Automatic retries, exponential backoff, and time limits
  • Scaling: Running multiple workers with autoscaling
  • Monitoring: Flower dashboard for real-time visibility
  • Scheduling: Periodic tasks with Celery Beat
  • Production patterns: Connection pools, graceful shutdown, queue monitoring

What You Can Build Next

Now that you understand background tasks, here are projects you can build:

Video Processing Service

  • Upload videos, generate multiple quality versions
  • Extract thumbnails at specific timestamps
  • Add watermarks, subtitles, or effects
  • Perfect for learning FFmpeg + Celery

Data Pipeline

  • Fetch data from multiple APIs in parallel
  • Transform and clean data
  • Load into database or generate reports
  • Great for data engineering practice

Web Scraper

  • Scrape multiple websites concurrently
  • Extract and process content
  • Store results in database
  • Handle rate limits with retries

Image Processing API

  • Resize, compress, and optimize images
  • Apply filters and effects
  • Generate multiple formats
  • Batch process thousands of images

Report Generator

  • Collect data from multiple sources
  • Generate PDF/Excel reports
  • Email results to users
  • Schedule daily/weekly reports

Running Your Service

Your complete document processing service is ready. Start it with:

# Clone or create the project
mkdir document-processor && cd document-processor

# Create all the files (app.py, tasks.py, docker-compose.yml, etc.)
# from the setup section above

# Start everything
docker-compose up --build

# In another terminal, test it
curl -X POST http://localhost:5000/api/upload -F "file=@test.pdf"

# Open Flower monitoring
open http://localhost:5555

Background tasks transform how applications work. They keep your app fast and responsive while handling complex work in the background. Celery makes this easy to implement and scale—from handling 10 tasks per day to 10 million.

The document processing service you built demonstrates real-world patterns you'll use in production systems. Adapt these patterns to your needs, whether you're processing videos, generating reports, or building data pipelines.

Start simple. Add complexity as you need it. Scale when your users demand it. The patterns in this guide work at any scale.

Now go build something amazing. Your background workers are ready.


Questions? Hit me up—I'd love to see what you build with this.


Thanks for reading.