title: "Building a Document Processing Pipeline" slug: "/articles/document-processing-pipeline" description: "Design and implement scalable document processing pipelines with queue management, batch processing, and failure recovery using Celery and RabbitMQ." excerpt: "Learn how to build enterprise-grade document processing pipelines that handle thousands of documents reliably. Covers queue management, distributed task execution, and failure recovery patterns." category: "Technical Guides" tags: ["Pipeline", "Celery", "RabbitMQ", "Architecture", "Scalability"] publishedAt: "2025-11-12" updatedAt: "2026-02-17" readTime: 14 featured: false author: "Dr. Ryder Stevenson" keywords: ["document processing pipeline", "Celery tasks", "distributed processing", "batch OCR", "queue management"]
Building a Document Processing Pipeline
Processing documents at scale requires more than just a good OCR engine. You need a robust pipeline that handles document ingestion, queue management, distributed processing, failure recovery, and result aggregation. This guide walks through building a production-grade document processing pipeline using Python, Celery, and RabbitMQ.
Pipeline Architecture
A well-designed document processing pipeline consists of several stages:
- Ingestion - Accept documents via API or file upload
- Validation - Check file types, sizes, and formats
- Preprocessing - Optimize images for OCR
- Processing - Extract text using OCR engines
- Post-processing - Clean and structure extracted text
- Storage - Save results to database
- Notification - Alert users of completion
Each stage runs independently, connected by message queues that provide buffering, retry logic, and failure recovery.
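Concretely, each stage in this guide is a Celery task bound to its own queue that does its work and then enqueues the next stage. The sketch below illustrates that hand-off pattern; the stage names and bodies are placeholders, not the real tasks defined later.
# Sketch: the hand-off pattern between two stages (names and bodies are placeholders)
from app.celery_app import celery_app
@celery_app.task(bind=True, max_retries=3)
def stage_a(self, doc_id: str):
    try:
        # ... this stage's actual work goes here ...
        stage_b.apply_async(args=[doc_id], queue='stage_b')
    except Exception as exc:
        # Exponential backoff: wait 1s, 2s, 4s ... before the message is retried
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
@celery_app.task(bind=True)
def stage_b(self, doc_id: str):
    ...  # the next stage's work, which in turn enqueues the stage after it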
Technology Stack
Our pipeline uses the following components (a sketch of a matching requirements.txt follows the list):
- Celery - Distributed task queue
- RabbitMQ - Message broker
- Redis - Result backend and caching
- PostgreSQL - Document metadata and results
- S3/MinIO - Document storage
- FastAPI - REST API for job submission
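A requirements.txt covering this stack might look like the sketch below. The version pins are indicative rather than prescriptive, and pytesseract, boto3, and requests reflect assumptions made later in this guide (Tesseract for OCR, S3-compatible storage, webhook notifications).
# requirements.txt (indicative versions)
celery[redis]>=5.3
kombu>=5.3
fastapi>=0.110
uvicorn[standard]>=0.29
python-multipart>=0.0.9
sqlalchemy>=2.0
psycopg2-binary>=2.9
boto3>=1.34
opencv-python-headless>=4.9
numpy>=1.26
pytesseract>=0.3.10
requests>=2.31
structlog>=24.1
flower>=2.0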
Project Structure
document-pipeline/
├── app/
│ ├── celery_app.py # Celery configuration
│ ├── tasks/
│ │ ├── ingestion.py # Document ingestion tasks
│ │ ├── preprocessing.py # Image preprocessing
│ │ ├── ocr.py # OCR processing
│ │ ├── postprocessing.py # Text cleanup
│ │ └── storage.py # Result storage
│ ├── models/
│ │ ├── document.py # Document models
│ │ └── job.py # Job models
│ ├── services/
│ │ ├── storage.py # S3/MinIO client
│ │ └── ocr_engine.py # OCR engine wrapper
│ └── api/
│ └── routes.py # API endpoints
├── docker-compose.yml
├── requirements.txt
└── config.py
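config.py is not covered in detail in this guide; a minimal environment-driven sketch (the S3 variable names are assumptions, the others match the docker-compose file shown later) could be:
# config.py -- environment-driven settings (illustrative)
import os
CELERY_BROKER_URL = os.getenv('CELERY_BROKER_URL', 'amqp://guest:guest@rabbitmq:5672//')
CELERY_RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://redis:6379/0')
DATABASE_URL = os.getenv('DATABASE_URL', 'postgresql://postgres:postgres@db:5432/documents')
# Object storage (S3 or MinIO) -- names below are assumed, not an established convention
S3_ENDPOINT_URL = os.getenv('S3_ENDPOINT_URL', 'http://minio:9000')
S3_BUCKET = os.getenv('S3_BUCKET', 'documents')
S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY', 'minioadmin')
S3_SECRET_KEY = os.getenv('S3_SECRET_KEY', 'minioadmin')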
Celery Configuration
Set up Celery with proper routing and error handling:
# app/celery_app.py
from celery import Celery
from kombu import Queue, Exchange
import structlog
logger = structlog.get_logger()
# Celery configuration
celery_app = Celery(
'document_pipeline',
broker='amqp://guest:guest@rabbitmq:5672//',
backend='redis://redis:6379/0',
include=[
'app.tasks.ingestion',
'app.tasks.preprocessing',
'app.tasks.ocr',
'app.tasks.postprocessing',
'app.tasks.storage'
]
)
# Task routing
celery_app.conf.task_routes = {
'app.tasks.ingestion.*': {'queue': 'ingestion'},
'app.tasks.preprocessing.*': {'queue': 'preprocessing'},
'app.tasks.ocr.*': {'queue': 'ocr'},
'app.tasks.postprocessing.*': {'queue': 'postprocessing'},
'app.tasks.storage.*': {'queue': 'storage'},
}
# Queue configuration
celery_app.conf.task_queues = (
Queue('ingestion', Exchange('ingestion'), routing_key='ingestion'),
Queue('preprocessing', Exchange('preprocessing'), routing_key='preprocessing'),
Queue('ocr', Exchange('ocr'), routing_key='ocr'),
Queue('postprocessing', Exchange('postprocessing'), routing_key='postprocessing'),
Queue('storage', Exchange('storage'), routing_key='storage'),
)
# General configuration
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=3600, # 1 hour hard limit
task_soft_time_limit=3300, # 55 minute soft limit
task_acks_late=True, # Acknowledge after task completion
worker_prefetch_multiplier=1, # One task at a time per worker
task_reject_on_worker_lost=True,
worker_max_tasks_per_child=1000, # Restart workers after 1000 tasks
)
@celery_app.task(bind=True)
def debug_task(self):
"""Debug task for testing."""
logger.info('Request: {0!r}'.format(self.request))
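One caveat about the queue declarations above: the ingestion task later submits follow-up work with a per-message priority, and RabbitMQ only honours message priorities on queues declared with an x-max-priority argument. If you rely on priorities, declare the relevant queues along these lines (a sketch; the rest of the guide works without it):
# Priority-aware queue declaration (only needed when using per-message priorities)
from kombu import Queue, Exchange
celery_app.conf.task_queues = (
    Queue('preprocessing', Exchange('preprocessing'), routing_key='preprocessing',
          queue_arguments={'x-max-priority': 10}),
    # ... repeat for any other queue that should honour priorities ...
)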
Document Models
Define your data models with proper state management:
# app/models/document.py
from sqlalchemy import Column, String, Integer, DateTime, JSON, Enum as SQLEnum
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import enum
Base = declarative_base()
class DocumentStatus(str, enum.Enum):
PENDING = "pending"
VALIDATING = "validating"
PREPROCESSING = "preprocessing"
PROCESSING = "processing"
POSTPROCESSING = "postprocessing"
COMPLETED = "completed"
FAILED = "failed"
class ProcessingPriority(str, enum.Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
URGENT = "urgent"
class Document(Base):
__tablename__ = 'documents'
id = Column(String(36), primary_key=True)
filename = Column(String(255), nullable=False)
file_path = Column(String(512), nullable=False)
file_size = Column(Integer, nullable=False)
mime_type = Column(String(100))
status = Column(SQLEnum(DocumentStatus), default=DocumentStatus.PENDING)
priority = Column(SQLEnum(ProcessingPriority), default=ProcessingPriority.NORMAL)
# Processing metadata
page_count = Column(Integer)
language = Column(String(10), default='eng')
ocr_confidence = Column(Integer)
# Extracted data
extracted_text = Column(String)
doc_metadata = Column('metadata', JSON)  # 'metadata' is a reserved attribute on SQLAlchemy declarative models
# Task tracking
ingestion_task_id = Column(String(36))
preprocessing_task_id = Column(String(36))
ocr_task_id = Column(String(36))
postprocessing_task_id = Column(String(36))
# Timing
created_at = Column(DateTime, default=datetime.utcnow)
started_at = Column(DateTime)
completed_at = Column(DateTime)
# Error handling
error_message = Column(String)
retry_count = Column(Integer, default=0)
max_retries = Column(Integer, default=3)
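The task modules below call a get_db_session() helper that this guide never defines. A minimal sketch, assuming it lives in app/db.py and reads the same DATABASE_URL used in docker-compose, might look like:
# app/db.py -- assumed location of the session helper used by the task modules
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine(
    os.getenv('DATABASE_URL', 'postgresql://postgres:postgres@db:5432/documents'),
    pool_pre_ping=True,  # recycle dead connections between long-running tasks
)
SessionLocal = sessionmaker(bind=engine, autoflush=False)
def get_db_session():
    """Return a new SQLAlchemy session; callers are responsible for closing it."""
    return SessionLocal()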
Ingestion Task
Handle document upload and validation:
# app/tasks/ingestion.py
from celery import Task
from app.celery_app import celery_app
from app.models.document import Document, DocumentStatus, ProcessingPriority
from app.services.storage import StorageService
from app.db import get_db_session  # assumed session helper (see the sketch above)
import structlog
import uuid
from typing import Dict
logger = structlog.get_logger()
class CallbackTask(Task):
"""Base task with callbacks on success/failure."""
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Handle task failure."""
logger.error("task_failed", task_id=task_id, error=str(exc))
def on_success(self, retval, task_id, args, kwargs):
"""Handle task success."""
logger.info("task_completed", task_id=task_id)
@celery_app.task(base=CallbackTask, bind=True, max_retries=3)
def ingest_document(self, file_path: str, metadata: Dict) -> str:
"""
Ingest document and trigger preprocessing.
Args:
file_path: Path to uploaded document
metadata: Document metadata
Returns:
Document ID
"""
doc_id = str(uuid.uuid4())
logger.info("ingesting_document", doc_id=doc_id, file_path=file_path)
try:
storage = StorageService()
# Validate file
if not storage.validate_file(file_path):
raise ValueError(f"Invalid file: {file_path}")
# Get file info
file_info = storage.get_file_info(file_path)
# Create document record
document = Document(
id=doc_id,
filename=metadata.get('filename'),
file_path=file_path,
file_size=file_info['size'],
mime_type=file_info['mime_type'],
status=DocumentStatus.VALIDATING,
language=metadata.get('language', 'eng'),
priority=ProcessingPriority(metadata.get('priority', 'normal')),
ingestion_task_id=self.request.id
)
# Save to database
session = get_db_session()
session.add(document)
session.commit()
logger.info("document_ingested", doc_id=doc_id)
# Trigger preprocessing
from app.tasks.preprocessing import preprocess_document
preprocess_document.apply_async(
args=[doc_id],
queue='preprocessing',
priority=get_priority_value(document.priority)
)
return doc_id
except Exception as e:
logger.error("ingestion_failed", doc_id=doc_id, error=str(e))
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries)
def get_priority_value(priority: str) -> int:
"""Convert priority string to numeric value."""
priority_map = {
'low': 0,
'normal': 5,
'high': 8,
'urgent': 10
}
return priority_map.get(priority, 5)
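StorageService (app/services/storage.py) is used throughout but never shown. A minimal S3/MinIO sketch built on boto3, supporting the validate_file, get_file_info, download, and upload calls used in this guide, could look like the following; the bucket name, endpoint, size limit, and allowed MIME types are assumptions:
# app/services/storage.py -- illustrative S3/MinIO wrapper (assumed implementation)
import mimetypes
import boto3
ALLOWED_TYPES = {'application/pdf', 'image/png', 'image/jpeg', 'image/tiff'}
MAX_SIZE = 50 * 1024 * 1024  # 50 MB example limit
class StorageService:
    def __init__(self, bucket: str = 'documents', endpoint_url: str = 'http://minio:9000'):
        self.bucket = bucket
        self.client = boto3.client('s3', endpoint_url=endpoint_url)
    def get_file_info(self, key: str) -> dict:
        """Return size and guessed MIME type for a stored object."""
        head = self.client.head_object(Bucket=self.bucket, Key=key)
        mime_type, _ = mimetypes.guess_type(key)
        return {'size': head['ContentLength'],
                'mime_type': mime_type or 'application/octet-stream'}
    def validate_file(self, key: str) -> bool:
        """Reject missing objects, unsupported types, and oversized files."""
        try:
            info = self.get_file_info(key)
        except self.client.exceptions.ClientError:
            return False
        return info['mime_type'] in ALLOWED_TYPES and info['size'] <= MAX_SIZE
    def download(self, key: str) -> bytes:
        return self.client.get_object(Bucket=self.bucket, Key=key)['Body'].read()
    def upload(self, data: bytes, key: str) -> None:
        self.client.put_object(Bucket=self.bucket, Key=key, Body=data)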
Preprocessing Task
Prepare documents for OCR:
# app/tasks/preprocessing.py
from app.db import get_db_session  # assumed session helper (see the sketch above)
from app.celery_app import celery_app
from app.models.document import Document, DocumentStatus
from app.services.storage import StorageService
import cv2
import numpy as np
import structlog
logger = structlog.get_logger()
@celery_app.task(bind=True, max_retries=3)
def preprocess_document(self, doc_id: str) -> str:
"""
Preprocess document for optimal OCR.
Args:
doc_id: Document ID
Returns:
Path to preprocessed document
"""
logger.info("preprocessing_document", doc_id=doc_id)
try:
session = get_db_session()
document = session.query(Document).filter_by(id=doc_id).first()
if not document:
raise ValueError(f"Document not found: {doc_id}")
# Update status
document.status = DocumentStatus.PREPROCESSING
document.preprocessing_task_id = self.request.id
session.commit()
storage = StorageService()
# Load image
image_data = storage.download(document.file_path)
nparr = np.frombuffer(image_data, np.uint8)
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# [Preprocessing pipeline](/articles/preprocessing-techniques)
processed = preprocess_pipeline(image)
# Save preprocessed image
_, buffer = cv2.imencode('.png', processed)
preprocessed_path = f"preprocessed/{doc_id}.png"
storage.upload(buffer.tobytes(), preprocessed_path)
logger.info("preprocessing_completed", doc_id=doc_id)
# Trigger OCR
from app.tasks.ocr import process_ocr
process_ocr.apply_async(
args=[doc_id, preprocessed_path],
queue='ocr'
)
return preprocessed_path
except Exception as e:
logger.error("preprocessing_failed", doc_id=doc_id, error=str(e))
session = get_db_session()
document = session.query(Document).filter_by(id=doc_id).first()
if document:
document.status = DocumentStatus.FAILED
document.error_message = str(e)
session.commit()
raise self.retry(exc=e, countdown=2 ** self.request.retries)
def preprocess_pipeline(image: np.ndarray) -> np.ndarray:
"""Apply preprocessing pipeline to image."""
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Deskew: estimate the skew angle from the dark (text) pixels rather than the white background
_, text_mask = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
coords = np.column_stack(np.where(text_mask > 0))
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
angle = -(90 + angle)
else:
angle = -angle
if abs(angle) > 0.5:
h, w = gray.shape
center = (w // 2, h // 2)
M = cv2.getRotationMatrix2D(center, angle, 1.0)
gray = cv2.warpAffine(gray, M, (w, h),
flags=cv2.INTER_CUBIC,
borderMode=cv2.BORDER_REPLICATE)
# Denoise
denoised = cv2.fastNlMeansDenoising(gray, None, 10, 7, 21)
# Enhance contrast
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
enhanced = clahe.apply(denoised)
# [Binarize](/articles/image-binarization-methods)
_, binary = cv2.threshold(enhanced, 0, 255,
cv2.THRESH_BINARY + cv2.THRESH_OTSU)
return binary
OCR Processing Task
Execute OCR with error handling:
# app/tasks/ocr.py
from app.celery_app import celery_app
from app.models.document import Document, DocumentStatus
from app.services.ocr_engine import OCREngine
from app.services.storage import StorageService
from app.db import get_db_session  # assumed session helper (see the sketch above)
from typing import Dict
import structlog
logger = structlog.get_logger()
@celery_app.task(bind=True, max_retries=2)
def process_ocr(self, doc_id: str, image_path: str) -> Dict:
"""
Process document with OCR engine.
Args:
doc_id: Document ID
image_path: Path to preprocessed image
Returns:
OCR results dictionary
"""
logger.info("processing_ocr", doc_id=doc_id)
try:
session = get_db_session()
document = session.query(Document).filter_by(id=doc_id).first()
if not document:
raise ValueError(f"Document not found: {doc_id}")
# Update status
document.status = DocumentStatus.PROCESSING
document.ocr_task_id = self.request.id
session.commit()
# Initialize OCR engine
ocr_engine = OCREngine()
# Load image
storage = StorageService()
image_data = storage.download(image_path)
# Process with OCR
result = ocr_engine.process(
image_data,
language=document.language,
psm=3 # Auto page segmentation
)
# Update document
document.extracted_text = result['text']
document.ocr_confidence = int(result['confidence'])
document.doc_metadata = result.get('metadata', {})
session.commit()
logger.info("ocr_completed",
doc_id=doc_id,
confidence=result['confidence'],
word_count=len(result['text'].split()))
# Trigger post-processing
from app.tasks.postprocessing import postprocess_text
postprocess_text.apply_async(
args=[doc_id],
queue='postprocessing'
)
return result
except Exception as e:
logger.error("ocr_failed", doc_id=doc_id, error=str(e))
session = get_db_session()
document = session.query(Document).filter_by(id=doc_id).first()
if document:
document.retry_count += 1
if document.retry_count >= document.max_retries:
document.status = DocumentStatus.FAILED
document.error_message = str(e)
session.commit()
raise self.retry(exc=e, countdown=5 * 60) # 5 minute retry delay
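The OCREngine wrapper (app/services/ocr_engine.py) is likewise left to the reader. Assuming Tesseract as the engine, a minimal pytesseract-based sketch that matches the process() call above might be:
# app/services/ocr_engine.py -- illustrative Tesseract wrapper (assumed implementation)
import cv2
import numpy as np
import pytesseract
from pytesseract import Output
class OCREngine:
    def process(self, image_data: bytes, language: str = 'eng', psm: int = 3) -> dict:
        """Run Tesseract and return text plus a mean word confidence."""
        nparr = np.frombuffer(image_data, np.uint8)
        image = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)
        config = f'--psm {psm}'
        text = pytesseract.image_to_string(image, lang=language, config=config)
        data = pytesseract.image_to_data(image, lang=language, config=config,
                                         output_type=Output.DICT)
        # image_to_data reports one confidence per word; -1 marks non-word boxes
        confidences = [float(c) for c in data['conf'] if float(c) >= 0]
        confidence = sum(confidences) / len(confidences) if confidences else 0.0
        return {'text': text,
                'confidence': confidence,
                'metadata': {'word_count': len(confidences)}}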
Post-processing Task
Clean and structure extracted text:
# app/tasks/postprocessing.py
from app.celery_app import celery_app
from app.models.document import Document, DocumentStatus
import re
from typing import Dict
from app.db import get_db_session  # assumed session helper (see the sketch above)
import structlog
from datetime import datetime
logger = structlog.get_logger()
@celery_app.task(bind=True)
def postprocess_text(self, doc_id: str) -> Dict:
"""
Clean and structure extracted text.
Args:
doc_id: Document ID
Returns:
Processed text and metadata
"""
logger.info("postprocessing_text", doc_id=doc_id)
try:
session = get_db_session()
document = session.query(Document).filter_by(id=doc_id).first()
if not document:
raise ValueError(f"Document not found: {doc_id}")
# Update status
document.status = DocumentStatus.POSTPROCESSING
document.postprocessing_task_id = self.request.id
session.commit()
# Clean text
cleaned_text = clean_ocr_text(document.extracted_text)
# Extract entities
entities = extract_entities(cleaned_text)
# Update document
document.extracted_text = cleaned_text
doc_meta = dict(document.doc_metadata or {})
doc_meta['entities'] = entities
document.doc_metadata = doc_meta  # reassign so SQLAlchemy tracks the JSON change
document.status = DocumentStatus.COMPLETED
document.completed_at = datetime.utcnow()
session.commit()
logger.info("postprocessing_completed", doc_id=doc_id)
# Trigger storage/notification
from app.tasks.storage import store_results
store_results.apply_async(args=[doc_id], queue='storage')
return {
'text': cleaned_text,
'entities': entities
}
except Exception as e:
logger.error("postprocessing_failed", doc_id=doc_id, error=str(e))
raise
def clean_ocr_text(text: str) -> str:
"""Clean OCR artifacts from text."""
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
# Fix common OCR errors
text = text.replace('|', 'I')
# Remove non-printable characters
text = ''.join(char for char in text if char.isprintable() or char in '\n\t')
# Normalize line breaks
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
def extract_entities(text: str) -> Dict:
"""Extract common entities from text."""
entities = {}
# Extract dates
date_pattern = r'\b\d{1,2}[-/]\d{1,2}[-/]\d{2,4}\b'
entities['dates'] = re.findall(date_pattern, text)
# Extract emails
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
entities['emails'] = re.findall(email_pattern, text)
# Extract phone numbers
phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
entities['phones'] = re.findall(phone_pattern, text)
return entities
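The final storage/notification stage (app/tasks/storage.py) is triggered above but never shown. A minimal sketch, assuming the extracted results already live in PostgreSQL and that completion is announced to a webhook configured through a COMPLETION_WEBHOOK_URL environment variable (an assumption, not part of the original design), could be:
# app/tasks/storage.py -- illustrative final stage (assumed implementation)
import os
import requests
import structlog
from app.celery_app import celery_app
from app.db import get_db_session  # assumed session helper (see the sketch above)
from app.models.document import Document
logger = structlog.get_logger()
@celery_app.task(bind=True, max_retries=3)
def store_results(self, doc_id: str) -> None:
    """Confirm final state and notify an external system that processing finished."""
    session = get_db_session()
    try:
        document = session.query(Document).filter_by(id=doc_id).first()
        if not document:
            raise ValueError(f"Document not found: {doc_id}")
        # Earlier stages already wrote the text and metadata; this stage only notifies
        webhook_url = os.getenv('COMPLETION_WEBHOOK_URL')
        if webhook_url:
            requests.post(webhook_url,
                          json={'document_id': doc_id, 'status': document.status.value},
                          timeout=10)
        logger.info("results_stored", doc_id=doc_id)
    except Exception as exc:
        logger.error("storage_failed", doc_id=doc_id, error=str(exc))
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    finally:
        session.close()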
Docker Compose Configuration
Run the entire pipeline stack:
# docker-compose.yml
version: '3.8'
services:
api:
build: .
command: uvicorn app.api.main:app --host 0.0.0.0 --port 8000
ports:
- "8000:8000"
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
- DATABASE_URL=postgresql://postgres:postgres@db:5432/documents
depends_on:
- rabbitmq
- redis
- db
worker-ingestion:
build: .
command: celery -A app.celery_app worker -Q ingestion -c 4 --loglevel=info
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
- redis
deploy:
replicas: 2
worker-preprocessing:
build: .
command: celery -A app.celery_app worker -Q preprocessing -c 2 --loglevel=info
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
- redis
deploy:
replicas: 3
worker-ocr:
build: .
command: celery -A app.celery_app worker -Q ocr -c 1 --loglevel=info
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
- redis
deploy:
replicas: 4
worker-postprocessing:
build: .
command: celery -A app.celery_app worker -Q postprocessing -c 4 --loglevel=info
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
- redis
deploy:
replicas: 2
flower:
build: .
command: celery -A app.celery_app flower
ports:
- "5555:5555"
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
depends_on:
- rabbitmq
- redis
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
redis:
image: redis:7-alpine
ports:
- "6379:6379"
db:
image: postgres:16-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=documents
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
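The api service above serves app.api.main:app, whose routes this guide does not show. A minimal submission endpoint is sketched below; it assumes a main.py that creates the FastAPI app and includes this router, plus the StorageService and ingestion task defined earlier.
# app/api/routes.py -- minimal job-submission endpoint (illustrative)
import uuid
from fastapi import APIRouter, UploadFile, File
from app.services.storage import StorageService
from app.tasks.ingestion import ingest_document
router = APIRouter()
@router.post("/documents")
async def submit_document(file: UploadFile = File(...),
                          language: str = "eng",
                          priority: str = "normal"):
    """Upload a document and enqueue it for processing."""
    storage = StorageService()
    key = f"uploads/{uuid.uuid4()}-{file.filename}"
    storage.upload(await file.read(), key)
    task = ingest_document.apply_async(
        args=[key, {'filename': file.filename, 'language': language, 'priority': priority}],
        queue='ingestion',
    )
    return {'task_id': task.id, 'file_path': key}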
Monitoring and Observability
Use Flower for real-time monitoring:
# Access Flower dashboard
http://localhost:5555
Monitor key metrics (a quick programmatic check follows this list):
- Task success/failure rates
- Queue lengths
- Worker utilization
- Average processing time
- Memory usage per worker
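Beyond the dashboard, the same numbers can be pulled programmatically with Celery's inspect API, which is convenient for health checks and alerting scripts:
# Quick worker/queue health check using Celery's inspect API
from app.celery_app import celery_app
inspector = celery_app.control.inspect()
print(inspector.active())    # tasks currently executing, per worker
print(inspector.reserved())  # tasks prefetched by workers but not yet started
print(inspector.stats())     # per-worker counters, pool sizes, and broker info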
Conclusion
This document processing pipeline provides the reliability and scalability that production workloads demand. The combination of Celery's task management, RabbitMQ's message queuing, and stage-level retry handling creates a system that keeps processing documents dependably as volumes grow from thousands toward millions.
Key architectural decisions:
- Separate queues for each processing stage
- Task-level retry logic with exponential backoff
- Worker specialization for optimal resource usage
- Comprehensive logging and monitoring
- Horizontal scalability through worker replication
The pipeline is production-ready and can handle varying loads by adjusting worker counts per queue.
References
- Solem, A. (2024). "Celery: Distributed Task Queue." Celery Project Documentation.
- RabbitMQ Team. (2024). "RabbitMQ Messaging Patterns." RabbitMQ Documentation.
- Hohpe, G., & Woolf, B. (2003). "Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions." Addison-Wesley.