title: "Batch Processing: Scaling OCR to Thousands of Documents" slug: "/articles/batch-processing-scaling-ocr" description: "Scale OCR to process thousands of documents efficiently with batch processing, parallel execution, resource optimization, and distributed computing strategies." excerpt: "Master batch OCR processing at scale. Learn strategies for parallel execution, memory management, cost optimization, and distributed processing that handle millions of documents." category: "Technical Guides" tags: ["Batch Processing", "Scalability", "Performance", "Distributed Systems", "Optimization"] publishedAt: "2025-11-12" updatedAt: "2026-02-17" readTime: 15 featured: false author: "Dr. Ryder Stevenson" keywords: ["batch OCR", "parallel processing", "distributed OCR", "scale OCR", "high volume document processing"]
Batch Processing: Scaling OCR to Thousands of Documents
Processing a single document with OCR is straightforward. Processing thousands—or millions—of documents efficiently requires sophisticated batch processing strategies. This guide covers the architecture, implementation patterns, and optimization techniques for large-scale OCR batch processing.
Batch Processing Architecture
Effective batch OCR systems separate concerns into distinct stages (a minimal end-to-end sketch follows this list):
- Job Scheduling - Queue management and prioritization
- Resource Allocation - CPU, memory, and GPU distribution
- Parallel Processing - Concurrent document processing
- Result Aggregation - Collecting and storing results
- Error Handling - Retry logic and failure recovery
- Monitoring - Progress tracking and accuracy validation
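Before building out each stage, the sketch below shows how they fit together in miniature: a plain asyncio queue pipeline covering scheduling, workers, error handling, and aggregation. It is an illustration only; ocr_document is a placeholder for a real OCR call, and the full engine developed in the rest of this guide replaces it.
# Minimal sketch: the stages wired as an asyncio producer/worker/aggregator pipeline.
# `ocr_document` is a placeholder for a real OCR call.
import asyncio
from typing import Dict, List

async def ocr_document(path: str) -> Dict:
    await asyncio.sleep(0)  # stand-in for real OCR work
    return {"path": path, "text": "..."}

async def worker(queue: "asyncio.Queue[str]", results: List[Dict]) -> None:
    while True:
        path = await queue.get()                          # job scheduling hands out work
        try:
            results.append(await ocr_document(path))      # parallel processing
        except Exception as exc:                          # error handling
            results.append({"path": path, "error": str(exc)})
        finally:
            queue.task_done()

async def run_pipeline(paths: List[str], num_workers: int = 4) -> List[Dict]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for path in paths:
        queue.put_nowait(path)
    results: List[Dict] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    await queue.join()                                    # monitoring point: queue drained
    for w in workers:
        w.cancel()
    return results                                        # result aggregation

# asyncio.run(run_pipeline(["page_001.png", "page_002.png"]))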
Batch Job Models
Define your batch job structure:
# app/models/batch_job.py
from sqlalchemy import Column, String, Integer, DateTime, JSON, Float, Enum
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
import enum
Base = declarative_base()
class BatchStatus(str, enum.Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
PAUSED = "paused"
class ProcessingMode(str, enum.Enum):
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
DISTRIBUTED = "distributed"
class BatchJob(Base):
__tablename__ = 'batch_jobs'
id = Column(String(36), primary_key=True)
name = Column(String(255), nullable=False)
description = Column(String)
# Configuration
mode = Column(Enum(ProcessingMode), default=ProcessingMode.PARALLEL)
max_workers = Column(Integer, default=4)
batch_size = Column(Integer, default=100)
timeout_per_document = Column(Integer, default=300) # seconds
# Progress tracking
total_documents = Column(Integer, nullable=False)
processed_documents = Column(Integer, default=0)
successful_documents = Column(Integer, default=0)
failed_documents = Column(Integer, default=0)
# Status
status = Column(Enum(BatchStatus), default=BatchStatus.PENDING)
progress_percentage = Column(Float, default=0.0)
# Timing
created_at = Column(DateTime, default=datetime.utcnow)
started_at = Column(DateTime)
completed_at = Column(DateTime)
estimated_completion = Column(DateTime)
# Resource usage
cpu_usage_avg = Column(Float)
memory_usage_avg = Column(Float)
total_cost = Column(Float)
# Configuration
ocr_config = Column(JSON) # Language, PSM, etc.
preprocessing_config = Column(JSON)
# Results
output_path = Column(String)
error_log_path = Column(String)
summary = Column(JSON)
class DocumentTask(Base):
__tablename__ = 'document_tasks'
id = Column(String(36), primary_key=True)
batch_job_id = Column(String(36), nullable=False)
file_path = Column(String(512), nullable=False)
file_size = Column(Integer)
status = Column(String(20))
retry_count = Column(Integer, default=0)
max_retries = Column(Integer, default=3)
started_at = Column(DateTime)
completed_at = Column(DateTime)
processing_time = Column(Float)
result_text = Column(String)
confidence = Column(Float)
error_message = Column(String)
worker_id = Column(String(50))
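The retry fields on DocumentTask (retry_count, max_retries) support a simple retry policy. Below is a minimal sketch of how a worker loop might drive them; run_ocr and session are placeholders for your OCR callable and SQLAlchemy session, not part of the model above.
# Hedged sketch: retrying a DocumentTask using its retry_count/max_retries fields.
# `run_ocr` and `session` are placeholders; adapt to your OCR function and DB setup.
from datetime import datetime

def process_task_with_retries(task: DocumentTask, run_ocr, session) -> None:
    task.started_at = datetime.utcnow()
    while True:
        try:
            text, confidence = run_ocr(task.file_path)
            task.result_text = text
            task.confidence = confidence
            task.status = "completed"
            break
        except Exception as exc:
            task.retry_count += 1
            task.error_message = str(exc)
            if task.retry_count >= task.max_retries:
                task.status = "failed"
                break
    task.completed_at = datetime.utcnow()
    session.commit()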
Batch Processing Engine
Implement the core batch processing engine:
# app/services/batch_processor.py
import asyncio
from typing import List, Dict, Optional, Callable
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import multiprocessing as mp
from datetime import datetime, timedelta
import structlog
from tqdm import tqdm
logger = structlog.get_logger()
class BatchProcessor:
"""High-performance batch OCR processor."""
def __init__(self,
max_workers: Optional[int] = None,
mode: str = "parallel",
use_gpu: bool = False):
"""
Initialize batch processor.
Args:
max_workers: Maximum parallel workers (default: CPU count)
mode: Processing mode (sequential, parallel, distributed)
use_gpu: Whether to use GPU acceleration
"""
self.max_workers = max_workers or mp.cpu_count()
self.mode = mode
self.use_gpu = use_gpu
# Initialize executor based on mode
if mode == "parallel":
self.executor = ProcessPoolExecutor(max_workers=self.max_workers)
else:
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
self.stats = {
'total_processed': 0,
'total_failed': 0,
'total_time': 0.0,
'avg_time_per_doc': 0.0
}
async def process_batch(self,
batch_job_id: str,
document_paths: List[str],
process_func: Callable,
batch_size: int = 100,
progress_callback: Optional[Callable] = None) -> Dict:
"""
Process batch of documents.
Args:
batch_job_id: Batch job identifier
document_paths: List of document file paths
process_func: Function to process each document (a plain, module-level callable so it can be pickled in parallel mode)
batch_size: Documents per batch
progress_callback: Optional callback for progress updates
Returns:
Batch processing results
"""
start_time = datetime.utcnow()
total_docs = len(document_paths)
logger.info("batch_started",
batch_id=batch_job_id,
total_docs=total_docs,
workers=self.max_workers)
results = []
errors = []
# Process in batches
for batch_num, i in enumerate(range(0, total_docs, batch_size)):
batch = document_paths[i:i + batch_size]
batch_start = datetime.utcnow()
logger.info("processing_batch",
batch_num=batch_num,
batch_size=len(batch))
# Process batch based on mode
if self.mode == "sequential":
batch_results = await self._process_sequential(batch, process_func)
elif self.mode == "parallel":
batch_results = await self._process_parallel(batch, process_func)
else: # distributed
batch_results = await self._process_distributed(batch, process_func)
# Separate successes and failures
for result in batch_results:
if result.get('error'):
errors.append(result)
else:
results.append(result)
# Update progress
processed_count = i + len(batch)
progress = (processed_count / total_docs) * 100
if progress_callback:
await progress_callback(batch_job_id, progress, processed_count)
# Calculate ETA
elapsed = (datetime.utcnow() - start_time).total_seconds()
avg_time = elapsed / processed_count
remaining_docs = total_docs - processed_count
eta = timedelta(seconds=avg_time * remaining_docs)
logger.info("batch_progress",
progress=f"{progress:.1f}%",
processed=processed_count,
total=total_docs,
eta=str(eta))
# Calculate final stats
total_time = (datetime.utcnow() - start_time).total_seconds()
avg_time_per_doc = total_time / total_docs if total_docs > 0 else 0
summary = {
'batch_job_id': batch_job_id,
'total_documents': total_docs,
'successful': len(results),
'failed': len(errors),
'success_rate': (len(results) / total_docs * 100) if total_docs > 0 else 0,
'total_time_seconds': total_time,
'avg_time_per_document': avg_time_per_doc,
'throughput_docs_per_second': total_docs / total_time if total_time > 0 else 0,
'results': results,
'errors': errors
}
logger.info("batch_completed",
batch_id=batch_job_id,
successful=len(results),
failed=len(errors),
total_time=total_time)
return summary
async def _process_sequential(self,
documents: List[str],
process_func: Callable) -> List[Dict]:
"""Process documents sequentially."""
results = []
for doc_path in tqdm(documents, desc="Processing"):
try:
result = process_func(doc_path)
if asyncio.iscoroutine(result):
    result = await result
results.append(result)
except Exception as e:
logger.error("document_failed",
path=doc_path,
error=str(e))
results.append({
'path': doc_path,
'error': str(e)
})
return results
async def _process_parallel(self,
documents: List[str],
process_func: Callable) -> List[Dict]:
"""Process documents in parallel."""
loop = asyncio.get_running_loop()
# Create tasks
tasks = []
for doc_path in documents:
task = loop.run_in_executor(
self.executor,
self._process_document_safe,
doc_path,
process_func
)
tasks.append(task)
# Wait for all tasks
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append({
'path': documents[i],
'error': str(result)
})
else:
processed_results.append(result)
return processed_results
@staticmethod
def _process_document_safe(doc_path: str,
process_func: Callable) -> Dict:
"""Safely process document with error handling (staticmethod so it stays picklable for ProcessPoolExecutor workers)."""
try:
return process_func(doc_path)
except Exception as e:
logger.error("document_processing_error",
path=doc_path,
error=str(e))
return {
'path': doc_path,
'error': str(e)
}
async def _process_distributed(self,
documents: List[str],
process_func: Callable) -> List[Dict]:
"""Process documents using distributed workers (Celery/Ray)."""
# Placeholder for distributed processing
# In production, integrate with Celery, Ray, or Dask
logger.info("distributed_processing_requested")
return await self._process_parallel(documents, process_func)
def shutdown(self):
"""Shutdown executor."""
self.executor.shutdown(wait=True)
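A minimal usage sketch for the engine above, assuming Tesseract via pytesseract and Pillow; the file paths and job id are illustrative. In parallel mode the processing function must be a module-level (picklable) callable.
# Usage sketch (assumes pytesseract + Pillow; paths and job id are illustrative).
import asyncio
import pytesseract
from PIL import Image

def ocr_document(path: str) -> dict:
    # Module-level function so ProcessPoolExecutor can pickle it.
    text = pytesseract.image_to_string(Image.open(path), lang="eng")
    return {"path": path, "text": text}

async def main():
    processor = BatchProcessor(max_workers=4, mode="parallel")
    try:
        summary = await processor.process_batch(
            batch_job_id="demo-batch-001",
            document_paths=["scans/page_001.png", "scans/page_002.png"],
            process_func=ocr_document,
            batch_size=50,
        )
        print(summary["success_rate"], summary["throughput_docs_per_second"])
    finally:
        processor.shutdown()

# asyncio.run(main())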
Memory-Efficient Processing
Handle large batches without running out of memory:
# app/services/memory_efficient_processor.py
import gc
import psutil
import structlog
from typing import Dict, Iterator, List
import os
logger = structlog.get_logger()
class MemoryEfficientProcessor:
"""Process documents with memory constraints."""
def __init__(self, memory_limit_gb: float = 4.0):
"""
Initialize with memory limit.
Args:
memory_limit_gb: Maximum memory usage in GB
"""
self.memory_limit_bytes = memory_limit_gb * 1024 * 1024 * 1024
self.process = psutil.Process(os.getpid())
def chunk_documents(self,
document_paths: List[str],
chunk_size: int = 100) -> Iterator[List[str]]:
"""
Yield document chunks based on memory availability.
Args:
document_paths: List of document paths
chunk_size: Initial chunk size
Yields:
Chunks of document paths
"""
current_chunk = []
for doc_path in document_paths:
current_chunk.append(doc_path)
# Check memory usage
memory_info = self.process.memory_info()
current_memory = memory_info.rss
if len(current_chunk) >= chunk_size or \
current_memory > (self.memory_limit_bytes * 0.8):
logger.info("yielding_chunk",
chunk_size=len(current_chunk),
memory_mb=current_memory / 1024 / 1024)
yield current_chunk
# Clear chunk and force garbage collection
current_chunk = []
gc.collect()
# Yield remaining documents
if current_chunk:
yield current_chunk
def get_memory_usage(self) -> Dict:
"""Get current memory usage statistics."""
memory_info = self.process.memory_info()
return {
'rss_mb': memory_info.rss / 1024 / 1024,
'vms_mb': memory_info.vms / 1024 / 1024,
'percent': self.process.memory_percent(),
'available_mb': psutil.virtual_memory().available / 1024 / 1024
}
def check_memory_available(self, required_mb: float = 500) -> bool:
"""Check if sufficient memory is available."""
available = psutil.virtual_memory().available / 1024 / 1024
return available >= required_mb
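A short sketch of how the chunker can feed the BatchProcessor from the previous section, backing off when memory runs low; the memory limit, chunk size, and job id are illustrative.
# Sketch: feed memory-bounded chunks into the batch processor defined earlier.
import asyncio

async def process_with_memory_limit(all_paths, process_func):
    chunker = MemoryEfficientProcessor(memory_limit_gb=4.0)
    processor = BatchProcessor(max_workers=4, mode="parallel")
    summaries = []
    try:
        for chunk in chunker.chunk_documents(all_paths, chunk_size=100):
            if not chunker.check_memory_available(required_mb=500):
                await asyncio.sleep(5)  # back off until memory frees up
            summaries.append(await processor.process_batch(
                batch_job_id="memory-bounded-run",
                document_paths=chunk,
                process_func=process_func,
            ))
    finally:
        processor.shutdown()
    return summaries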
Dynamic Resource Allocation
Adjust worker count based on system resources:
# app/services/resource_allocator.py
import psutil
import structlog
from typing import Dict
logger = structlog.get_logger()
class ResourceAllocator:
"""Dynamically allocate processing resources."""
def __init__(self):
self.cpu_count = psutil.cpu_count()
self.total_memory = psutil.virtual_memory().total
def calculate_optimal_workers(self,
avg_memory_per_task_mb: float = 500,
target_cpu_percent: float = 80.0) -> int:
"""
Calculate optimal number of workers.
Args:
avg_memory_per_task_mb: Average memory per task in MB
target_cpu_percent: Target CPU utilization percentage
Returns:
Optimal worker count
"""
# Calculate based on CPU
cpu_workers = int(self.cpu_count * (target_cpu_percent / 100))
# Calculate based on memory
available_memory_mb = psutil.virtual_memory().available / 1024 / 1024
# Reserve 20% for system
usable_memory_mb = available_memory_mb * 0.8
memory_workers = int(usable_memory_mb / avg_memory_per_task_mb)
# Take minimum to avoid oversubscription
optimal = min(cpu_workers, memory_workers)
# Ensure at least 1 worker
optimal = max(1, optimal)
logger.info("calculated_optimal_workers",
cpu_workers=cpu_workers,
memory_workers=memory_workers,
optimal=optimal)
return optimal
def get_system_stats(self) -> Dict:
"""Get current system resource statistics."""
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
return {
'cpu': {
'count': self.cpu_count,
'percent': cpu_percent,
'per_cpu': psutil.cpu_percent(interval=1, percpu=True)
},
'memory': {
'total_gb': memory.total / 1024 / 1024 / 1024,
'available_gb': memory.available / 1024 / 1024 / 1024,
'used_percent': memory.percent
},
'disk': {
'total_gb': disk.total / 1024 / 1024 / 1024,
'free_gb': disk.free / 1024 / 1024 / 1024,
'used_percent': disk.percent
}
}
def should_throttle(self,
cpu_threshold: float = 90.0,
memory_threshold: float = 85.0) -> bool:
"""Determine if processing should be throttled."""
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
if cpu_percent > cpu_threshold:
logger.warning("cpu_threshold_exceeded",
current=cpu_percent,
threshold=cpu_threshold)
return True
if memory_percent > memory_threshold:
logger.warning("memory_threshold_exceeded",
current=memory_percent,
threshold=memory_threshold)
return True
return False
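A sketch of how the allocator might size and gate the BatchProcessor defined earlier; the per-task memory estimate, wait interval, and job id are illustrative.
# Sketch: size the worker pool from current resources and wait out host pressure.
import asyncio

async def run_sized_batch(paths, process_func):
    allocator = ResourceAllocator()
    workers = allocator.calculate_optimal_workers(avg_memory_per_task_mb=500)
    processor = BatchProcessor(max_workers=workers, mode="parallel")
    try:
        while allocator.should_throttle():
            await asyncio.sleep(10)  # wait for CPU/memory pressure to drop
        return await processor.process_batch(
            batch_job_id="resource-aware-run",
            document_paths=paths,
            process_func=process_func,
        )
    finally:
        processor.shutdown()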
Progress Tracking and Monitoring
Implement real-time progress tracking:
# app/services/progress_tracker.py
from typing import Dict, Optional
import asyncio
from datetime import datetime, timedelta
import structlog
logger = structlog.get_logger()
class ProgressTracker:
"""Track and report batch processing progress."""
def __init__(self, total_documents: int):
self.total_documents = total_documents
self.processed = 0
self.successful = 0
self.failed = 0
self.start_time = datetime.utcnow()
self.last_update = self.start_time
async def update(self, success: bool = True):
"""Update progress counters."""
self.processed += 1
if success:
self.successful += 1
else:
self.failed += 1
self.last_update = datetime.utcnow()
def get_progress(self) -> Dict:
"""Get current progress statistics."""
elapsed = (datetime.utcnow() - self.start_time).total_seconds()
progress_percent = (self.processed / self.total_documents * 100) \
if self.total_documents > 0 else 0
# Calculate ETA
if self.processed > 0:
avg_time_per_doc = elapsed / self.processed
remaining = self.total_documents - self.processed
eta_seconds = avg_time_per_doc * remaining
eta = datetime.utcnow() + timedelta(seconds=eta_seconds)
else:
eta = None
# Calculate throughput
throughput = self.processed / elapsed if elapsed > 0 else 0
return {
'total': self.total_documents,
'processed': self.processed,
'successful': self.successful,
'failed': self.failed,
'progress_percent': progress_percent,
'elapsed_seconds': elapsed,
'eta': eta.isoformat() if eta else None,
'throughput_per_second': throughput,
'success_rate': (self.successful / self.processed * 100)
if self.processed > 0 else 0
}
async def monitor(self, update_interval: int = 10):
"""Continuously monitor and log progress."""
while self.processed < self.total_documents:
progress = self.get_progress()
logger.info("progress_update",
processed=progress['processed'],
total=progress['total'],
percent=f"{progress['progress_percent']:.1f}%",
throughput=f"{progress['throughput_per_second']:.2f} docs/sec",
eta=progress['eta'])
await asyncio.sleep(update_interval)
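A sketch of wiring the tracker into a processing loop, with the monitor running as a background task; process_one stands in for any per-document OCR callable.
# Sketch: run the monitor alongside the work and update the tracker per document.
import asyncio

async def tracked_run(paths, process_one):
    tracker = ProgressTracker(total_documents=len(paths))
    monitor_task = asyncio.create_task(tracker.monitor(update_interval=10))
    for path in paths:
        try:
            process_one(path)
            await tracker.update(success=True)
        except Exception:
            await tracker.update(success=False)
    monitor_task.cancel()  # monitor exits on its own once all documents are counted
    return tracker.get_progress()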
Result Aggregation
Efficiently aggregate and store results:
# app/services/result_aggregator.py
import json
import csv
from typing import List, Dict
import structlog
from pathlib import Path
logger = structlog.get_logger()
class ResultAggregator:
"""Aggregate and export batch processing results."""
def __init__(self, output_dir: str):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def save_results(self,
batch_job_id: str,
results: List[Dict],
format: str = "jsonl") -> str:
"""
Save results to file.
Args:
batch_job_id: Batch job identifier
results: List of result dictionaries
format: Output format (jsonl, csv, json)
Returns:
Path to output file
"""
if format == "jsonl":
return self._save_jsonl(batch_job_id, results)
elif format == "csv":
return self._save_csv(batch_job_id, results)
elif format == "json":
return self._save_json(batch_job_id, results)
else:
raise ValueError(f"Unsupported format: {format}")
def _save_jsonl(self, batch_job_id: str, results: List[Dict]) -> str:
"""Save as JSON Lines format."""
output_path = self.output_dir / f"{batch_job_id}_results.jsonl"
with open(output_path, 'w', encoding='utf-8') as f:
for result in results:
f.write(json.dumps(result, ensure_ascii=False) + '\n')
logger.info("results_saved_jsonl",
path=str(output_path),
count=len(results))
return str(output_path)
def _save_csv(self, batch_job_id: str, results: List[Dict]) -> str:
"""Save as CSV format."""
output_path = self.output_dir / f"{batch_job_id}_results.csv"
if not results:
return str(output_path)
# Get all unique keys
fieldnames = set()
for result in results:
fieldnames.update(result.keys())
fieldnames = sorted(fieldnames)
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(results)
logger.info("results_saved_csv",
path=str(output_path),
count=len(results))
return str(output_path)
def _save_json(self, batch_job_id: str, results: List[Dict]) -> str:
"""Save as single JSON file."""
output_path = self.output_dir / f"{batch_job_id}_results.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
logger.info("results_saved_json",
path=str(output_path),
count=len(results))
return str(output_path)
def generate_summary(self, batch_job_id: str,
summary_data: Dict) -> str:
"""Generate and save batch summary report."""
summary_path = self.output_dir / f"{batch_job_id}_summary.json"
with open(summary_path, 'w', encoding='utf-8') as f:
json.dump(summary_data, f, indent=2, default=str)
logger.info("summary_generated", path=str(summary_path))
return str(summary_path)
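A short sketch of exporting the summary returned by BatchProcessor.process_batch with the aggregator; the output directory is illustrative.
# Sketch: persist the output of process_batch with the aggregator.
def export_batch(summary: dict, output_dir: str = "batch_output") -> None:
    aggregator = ResultAggregator(output_dir)
    aggregator.save_results(summary["batch_job_id"], summary["results"], format="jsonl")
    aggregator.save_results(summary["batch_job_id"], summary["errors"], format="csv")
    aggregator.generate_summary(
        summary["batch_job_id"],
        {k: v for k, v in summary.items() if k not in ("results", "errors")},
    )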
Performance Optimization
Key strategies for maximum throughput:
# Performance tips and configuration
# 1. Batch size optimization
# Smaller batches: More frequent progress updates, better error isolation
# Larger batches: Better throughput, less overhead
OPTIMAL_BATCH_SIZE = 100 # Sweet spot for most workloads
# 2. Worker count optimization
# Too few: Underutilized CPU/GPU
# Too many: Context switching overhead, memory pressure
import multiprocessing
WORKERS = min(multiprocessing.cpu_count(), 8)
# 3. Memory management
# Process documents in chunks
# Force garbage collection between batches
# Use memory-mapped files for large images
# 4. I/O optimization
# Use async I/O for network operations
# Batch S3 uploads/downloads
# Use local SSD cache for frequently accessed files
# 5. Preprocessing optimization
# Cache preprocessed images
# Skip preprocessing for high-quality scans
# Parallelize preprocessing separately from OCR
# 6. GPU utilization
# Batch multiple images per GPU call
# Use mixed precision for faster inference
# Pipeline CPU preprocessing with GPU OCR
Conclusion
Scaling OCR to process thousands of documents requires careful attention to parallelization, resource management, and error handling. The architecture and patterns presented here provide a production-ready foundation for high-volume document processing.
Key takeaways:
- Separate concerns: scheduling, processing, aggregation
- Use appropriate parallelism for your workload
- Monitor and adjust resource allocation dynamically
- Implement comprehensive error handling and retry logic
- Track progress and provide ETA estimates
- Optimize batch size based on document characteristics
With these patterns, you can build OCR systems that efficiently process millions of documents while maintaining reliability and cost-effectiveness.