
Batch Processing
Run large parallel AI batches over files or API items with concurrency limits, retries, checkpoints, and aggregated success/failure reports.
Install
npx skills add https://github.com/itallstartedwithaidea/agent-skills --skill batch-processingWhat is this skill?
- Parallel AI execution with configurable concurrency and throughput control
- Exponential backoff on rate limits plus automatic retry for transient failures
- Checkpoint-based resumability so crashes do not lose completed items
- Structured progress reporting with per-item status, timing, and errors
- Homogeneous and heterogeneous batches with dead-letter queue for permanent failures
Adoption & trust: 1 installs on skills.sh; 18 GitHub stars; 3/3 security scanners passed (skills.sh audits); trending (+100% hot-view momentum).
Recommended Skills
Journey fit
Production-scale agent pipelines are built when you turn one-off prompts into repeatable tooling—primary shelf is agent-tooling in Build. Batch orchestration—rate limits, dead-letter queues, resumability—is agent infrastructure, not a single integration endpoint.
Common Questions / FAQ
Is Batch Processing safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
READMESKILL.md - Batch Processing
# Batch Processing Part of [Agent Skills™](https://github.com/itallstartedwithaidea/agent-skills) by [googleadsagent.ai™](https://googleadsagent.ai) ## Description Batch Processing enables parallel AI task execution with progress tracking, error handling, rate limiting, and result aggregation. The agent processes large collections of items—documents, images, code files, API requests—through AI pipelines concurrently, managing throughput, failures, and partial results without losing work. Single-item AI processing is straightforward; batch processing at scale introduces failures, rate limits, memory constraints, and the need for resumability. This skill handles these production realities: configurable concurrency limits, exponential backoff on rate limit errors, checkpoint-based resumability after crashes, and structured progress reporting that shows exactly which items succeeded, failed, or are pending. The skill supports both homogeneous batches (same operation on every item) and heterogeneous batches (different operations routed by item type). Results are aggregated into structured reports with per-item status, timing, and error details. Failed items are automatically retried with backoff, and permanently failed items are collected into a dead-letter queue for manual inspection. ## Use When - Processing hundreds or thousands of items through an AI pipeline - Translating, summarizing, or classifying large document collections - Generating embeddings for a corpus of documents - Running code analysis across an entire repository - Batch-generating images, descriptions, or metadata - Any task that processes items sequentially but could benefit from parallelism ## How It Works ```mermaid graph TD A[Input Batch: N Items] --> B[Load Checkpoint: Resume if Exists] B --> C[Partition into Work Chunks] C --> D[Parallel Workers: Concurrency Limit] D --> E[Worker 1: Process Item] D --> F[Worker 2: Process Item] D --> G[Worker K: Process Item] E --> H{Success?} F --> H G --> H H -->|Yes| I[Record Result] H -->|Rate Limited| J[Backoff + Retry] H -->|Failed| K[Retry Queue] J --> D K --> L{Retries Exhausted?} L -->|No| D L -->|Yes| M[Dead Letter Queue] I --> N[Save Checkpoint] N --> O[Progress Report] O --> P{All Done?} P -->|No| D P -->|Yes| Q[Final Aggregation Report] ``` The engine partitions work across concurrent workers, respecting rate limits and retry budgets. Checkpoints persist after each chunk, enabling crash recovery. The dead-letter queue captures permanently failed items for human review. ## Implementation ```python import asyncio from dataclasses import dataclass, field from time import time import json @dataclass class BatchItem: id: str input: dict status: str = "pending" result: dict | None = None error: str | None = None attempts: int = 0 duration_ms: float = 0 @dataclass class BatchResult: total: int succeeded: int failed: int dead_letter: int duration_s: float items: list[BatchItem] = field(default_factory=list) class BatchProcessor: def __init__(self, concurrency: int = 5, max_retries: int = 3, checkpoint_file: str = "batch_checkpoint.json"): self.concurrency = concurrency self.max_retries = max_retries self.checkpoint_file = checkpoint_file self.semaphore = asyncio.Semaphore(concurrency) async def process(self, items: list[BatchItem], processor_fn) -> BatchResult: items = self._load_checkpoint(items) start = time() pending = [i for i in items if i.status == "pending"] tasks = [self._process_item(item, processor_fn) for item in pending] await asyncio.gather(*tasks, return_exceptions=True) return BatchResult