Intermediate | Updated Dec 11, 2025

Batch Image Processing

Learn to process multiple images efficiently using queues, parallel processing, and optimal batching strategies.

Alex Rivera
Senior Engineer
8 min read

Why Batch Processing?

When you need to generate many images, batch processing provides:

  • Efficiency - Reduced overhead per request
  • Cost savings - Better resource utilization
  • Reliability - Retries and error handling applied in one place
  • Scalability - Handle thousands of images

Basic Batch Generation

python
from abstrakt import AbstraktClient
import asyncio

client = AbstraktClient()

prompts = [
    "A serene mountain landscape at sunrise",
    "A bustling city street at night",
    "A peaceful beach at sunset",
    "A cozy cabin in snowy woods",
    "A futuristic space station"
]

async def generate_batch(prompts):
    # Create one request per prompt
    tasks = []
    
    for prompt in prompts:
        task = client.run_async("fal-ai/flux/schnell", {
            "input": {"prompt": prompt}
        })
        tasks.append(task)
    
    # Await all requests concurrently; results come back in the same order as prompts
    results = await asyncio.gather(*tasks)
    return [r.images[0].url for r in results]

urls = asyncio.run(generate_batch(prompts))

Rate Limit Aware Batching

python
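import asyncio
from asyncio import Semaphore

from abstrakt import AbstraktClient

class BatchProcessor:
    def __init__(self, max_concurrent=10, delay=0.1):
        # The semaphore caps how many requests are in flight at once
        self.semaphore = Semaphore(max_concurrent)
        self.delay = delay
        self.client = AbstraktClient()
    
    async def process_one(self, prompt):
        async with self.semaphore:
            # Short pause before each request to smooth out bursts
            await asyncio.sleep(self.delay)
            return await self.client.run_async(
                "fal-ai/flux/schnell",
                {"input": {"prompt": prompt}}
            )
    
    async def process_batch(self, prompts):
        tasks = [self.process_one(p) for p in prompts]
        # return_exceptions=True: a failed prompt yields its exception object
        # in the result list instead of aborting the whole batch
        return await asyncio.gather(*tasks, return_exceptions=True)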

# Usage
processor = BatchProcessor(max_concurrent=5)
results = asyncio.run(processor.process_batch(prompts))
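
Because `process_batch` passes `return_exceptions=True`, the returned list mixes successful results with exception objects, in the same order as `prompts`. A minimal sketch of separating the two so failed prompts can be logged or retried follows. `split_results` is a hypothetical helper, not part of any SDK, and the demo uses a stand-in coroutine instead of real API calls:

```python
import asyncio

def split_results(prompts, results):
    """Pair each prompt with its outcome and separate successes from failures."""
    succeeded, failed = [], []
    for prompt, result in zip(prompts, results):
        # gather(..., return_exceptions=True) returns exception objects in place
        if isinstance(result, BaseException):
            failed.append((prompt, result))
        else:
            succeeded.append((prompt, result))
    return succeeded, failed

async def _fake_call(prompt):
    # Stand-in for a real generation call; fails for one prompt on purpose
    if "fail" in prompt:
        raise RuntimeError(f"generation failed for {prompt!r}")
    return f"https://example.com/{prompt}.png"

async def demo():
    demo_prompts = ["a", "fail-b", "c"]
    results = await asyncio.gather(
        *(_fake_call(p) for p in demo_prompts), return_exceptions=True
    )
    return split_results(demo_prompts, results)

succeeded, failed = asyncio.run(demo())
print(len(succeeded), "succeeded;", len(failed), "failed")
```

The prompts in `failed` can be fed back into another `process_batch` call or written to a log for later inspection.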

Using Webhooks for Large Batches

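For very large batches, submit each job with a webhook URL instead of waiting on the response. The service calls your endpoint when a job finishes, so no long-lived request can time out: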

python
def submit_batch(prompts):
    job_ids = []
    
    for prompt in prompts:
        result = client.submit("fal-ai/flux/schnell", {
            "input": {"prompt": prompt},
            "webhook_url": "https://your-server.com/webhook"
        })
        job_ids.append(result.request_id)
    
    return job_ids

Webhook handler:

python
from flask import Flask, request

app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
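    # silent=True yields None (not an exception) for a missing or malformed JSON body
    data = request.get_json(silent=True) or {}
    
    if data.get('status') == 'completed':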
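        # Assumes the webhook payload mirrors the SDK result shape
        # (images[0].url, as in the earlier examples)
        image_url = data['result']['images'][0]['url']
        # save_image is a placeholder for your own storage logic
        save_image(data['request_id'], image_url)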
    
    return {'received': True}
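
Webhook deliveries arrive asynchronously and in no particular order, so the handler usually needs a record of which jobs are still outstanding. The sketch below is one way to keep that record in memory. `JobTracker` and its methods are hypothetical names, and a production service would more likely use a database or queue so the state survives restarts:

```python
import threading

class JobTracker:
    """Thread-safe in-memory record of submitted jobs and their results."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = set()
        self._results = {}

    def register(self, request_ids):
        """Mark the given request IDs as submitted and awaiting a webhook."""
        with self._lock:
            self._pending.update(request_ids)

    def complete(self, request_id, image_url):
        """Record a result; returns False for unknown or duplicate deliveries."""
        with self._lock:
            if request_id not in self._pending:
                return False
            self._pending.discard(request_id)
            self._results[request_id] = image_url
            return True

    def is_done(self):
        with self._lock:
            return not self._pending

    def results(self):
        with self._lock:
            return dict(self._results)

tracker = JobTracker()
tracker.register(["job-1", "job-2"])
tracker.complete("job-1", "https://example.com/1.png")
print(tracker.is_done())  # job-2 is still pending
tracker.complete("job-2", "https://example.com/2.png")
print(tracker.is_done())  # every registered job has reported back
```

In this arrangement, `register` would be called with the IDs returned by `submit_batch`, and `complete` from inside `handle_webhook`.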

Progress Tracking

python
from tqdm import tqdm
import asyncio

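async def process_with_progress(prompts):
    with tqdm(total=len(prompts), desc="Generating") as pbar:

        async def run_one(prompt):
            result = await client.run_async(
                "fal-ai/flux/schnell",
                {"input": {"prompt": prompt}}
            )
            pbar.update(1)  # Advance the bar as each request finishes
            return result

        # Run the requests concurrently instead of one at a time;
        # gather keeps results in the same order as prompts
        return await asyncio.gather(*(run_one(p) for p in prompts))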

Error Handling & Retries

python
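import asyncio

# Assumed export location for RateLimitError; check your SDK version
from abstrakt import RateLimitError

async def process_with_retry(prompt, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await client.run_async(
                "fal-ai/flux/schnell",
                {"input": {"prompt": prompt}}
            )
        except RateLimitError as e:
            # On the last attempt, re-raise instead of silently returning None
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(e.retry_after)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, 2s, 4s, ...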
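
When many requests in a batch fail at the same moment, plain exponential backoff makes them all retry at the same moments too. A common refinement is to randomize each delay and cap it. The sketch below shows the "full jitter" variant; `backoff_delay`, `base`, and `cap` are illustrative names, not SDK parameters:

```python
import random

def backoff_delay(attempt, base=1.0, cap=30.0, rng=random):
    """Return a random delay between 0 and min(cap, base * 2**attempt) seconds."""
    upper = min(cap, base * (2 ** attempt))
    return rng.uniform(0, upper)

for attempt in range(5):
    upper = min(30.0, 2 ** attempt)
    print(f"attempt {attempt}: up to {upper:.0f}s, chose {backoff_delay(attempt):.2f}s")
```

In `process_with_retry`, this value would take the place of `2 ** attempt` in the `asyncio.sleep` call.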

Optimal Batch Sizes

Tier      Recommended Batch Size  Concurrency
Free      5-10                    2
Pro       20-50                   10
Business  100-500                 50
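
One way to apply these numbers is to split the prompt list into chunks of the recommended batch size and process one chunk at a time. A minimal sketch, under two assumptions: `TIER_LIMITS` copies the table above using the upper bound of each batch-size range, and `chunked` is a hypothetical helper rather than an SDK function:

```python
# Upper bound of each recommended range, plus the listed concurrency
TIER_LIMITS = {
    "free": {"batch_size": 10, "concurrency": 2},
    "pro": {"batch_size": 50, "concurrency": 10},
    "business": {"batch_size": 500, "concurrency": 50},
}

def chunked(items, size):
    """Yield consecutive slices of `items`, each at most `size` long."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]

demo_prompts = [f"prompt {i}" for i in range(23)]
limits = TIER_LIMITS["free"]
batches = list(chunked(demo_prompts, limits["batch_size"]))
print([len(b) for b in batches])  # [10, 10, 3]
```

Each chunk can then be handed to a `BatchProcessor` created with `max_concurrent=limits["concurrency"]`.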
