Intermediate · Updated Dec 11, 2025
Batch Image Processing
Learn to process multiple images efficiently using queues, parallel processing, and optimal batching strategies.
AR
Alex Rivera
Senior Engineer
8 min read
Why Batch Processing?
When you need to generate many images, batch processing provides:
- Efficiency - Reduced overhead per request
- Cost savings - Better resource utilization
- Reliability - Built-in retry and error handling
- Scalability - Handle thousands of images
Basic Batch Generation
python
from abstrakt import AbstraktClient
import asyncio
# Shared client instance used by all examples below; credentials are
# presumably picked up from the environment — TODO confirm against the SDK docs.
client = AbstraktClient()
# Sample prompts reused throughout the batching examples.
prompts = [
"A serene mountain landscape at sunrise",
"A bustling city street at night",
"A peaceful beach at sunset",
"A cozy cabin in snowy woods",
"A futuristic space station"
]
async def generate_batch(prompts):
    """Kick off one generation per prompt concurrently and return image URLs.

    All requests are started up front and awaited together, so total wall
    time is roughly that of the slowest single generation.
    """
    pending = [
        client.run_async("fal-ai/flux/schnell", {"input": {"prompt": text}})
        for text in prompts
    ]
    completed = await asyncio.gather(*pending)
    # Each result exposes its generated images; take the first one's URL.
    return [item.images[0].url for item in completed]
urls = asyncio.run(generate_batch(prompts))
Rate Limit Aware Batching
python
import asyncio
from asyncio import Semaphore
class BatchProcessor:
    """Runs prompt generations with a cap on in-flight requests plus pacing.

    The semaphore bounds how many generations run concurrently; the small
    sleep before each request spaces out starts so bursts don't trip
    server-side rate limits.
    """

    def __init__(self, max_concurrent=10, delay=0.1):
        self.semaphore = Semaphore(max_concurrent)
        self.delay = delay
        self.client = AbstraktClient()

    async def process_one(self, prompt):
        """Generate a single image while honoring the concurrency limit."""
        async with self.semaphore:
            await asyncio.sleep(self.delay)
            payload = {"input": {"prompt": prompt}}
            return await self.client.run_async("fal-ai/flux/schnell", payload)

    async def process_batch(self, prompts):
        """Run every prompt; failures are returned as exception objects rather
        than raised, so one bad prompt cannot sink the whole batch."""
        pending = [self.process_one(p) for p in prompts]
        return await asyncio.gather(*pending, return_exceptions=True)
# Usage
# Cap in-flight generations at 5 for this demo batch.
processor = BatchProcessor(max_concurrent=5)
results = asyncio.run(processor.process_batch(prompts))
Using Webhooks for Large Batches
For very large batches, use webhooks to avoid timeouts:
python
def submit_batch(prompts):
    """Queue one generation per prompt for webhook delivery.

    Returns the request ids so results can be matched up when the
    webhook fires; nothing is awaited here.
    """
    job_ids = []
    for text in prompts:
        submission = client.submit("fal-ai/flux/schnell", {
            "input": {"prompt": text},
            "webhook_url": "https://your-server.com/webhook"
        })
        job_ids.append(submission.request_id)
    return job_ids
Webhook handler:
python
from flask import Flask, request
app = Flask(__name__)

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    """Receive a completion callback from the generation service.

    Only 'completed' notifications are acted on; the first item in the
    result payload is taken as the generated image.
    """
    payload = request.json
    if payload['status'] == 'completed':
        url = payload['result']['items'][0]['url']
        # Save or process the image
        save_image(payload['request_id'], url)
    return {'received': True}
Progress Tracking
python
from tqdm import tqdm
import asyncio
async def process_with_progress(prompts):
    """Generate images one at a time, advancing a tqdm bar after each.

    NOTE: this awaits each request serially — it trades batch throughput
    for accurate per-item progress reporting.
    """
    results = []
    with tqdm(total=len(prompts), desc="Generating") as bar:
        for text in prompts:
            outcome = await client.run_async(
                "fal-ai/flux/schnell",
                {"input": {"prompt": text}}
            )
            results.append(outcome)
            bar.update(1)
    return results
Error Handling & Retries
python
async def process_with_retry(prompt, max_retries=3):
    """Generate with retries: honor server rate-limit hints, back off otherwise.

    NOTE(review): if every attempt raises RateLimitError, the loop falls
    through and the function implicitly returns None — confirm intended.
    """
    for attempt in range(max_retries):
        try:
            return await client.run_async(
                "fal-ai/flux/schnell",
                {"input": {"prompt": prompt}}
            )
        except RateLimitError as e:
            # The server says exactly how long to wait; do that.
            await asyncio.sleep(e.retry_after)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
Optimal Batch Sizes
| Tier | Recommended Batch Size | Concurrency |
|---|---|---|
| Free | 5-10 | 2 |
| Pro | 20-50 | 10 |
| Business | 100-500 | 50 |
Next Steps
- Set up webhooks for async processing
- Learn LoRA training for custom models
- Explore video generation
#batch #performance #scaling