
Projection Patterns
Implement event-sourced read models with registrable projections, batch replay, and checkpointing in Python.
Install
npx skills add https://github.com/wshobson/agents --skill projection-patterns
What is this skill?
- Projection base class with name, handles(), and async apply(Event) contract
- Projector registers multiple projections and runs continuous batch replay (default batch_size 100)
- Checkpoint store integration for resumable consumers from the event store
- Typed Event dataclass: stream_id, event_type, data, version, global_position
- Template 1: Basic Projector pattern with asyncpg-oriented store wiring
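The checkpointing behaviour listed above can be sketched with in-memory stand-ins. This is a minimal illustration, not the skill's own store implementation: `InMemoryCheckpointStore`, `InMemoryEventStore`, and `consume_batch` are hypothetical names, and the sketch assumes `read_all(position, batch_size)` returns events strictly after `position`, which the template does not state explicitly.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class InMemoryCheckpointStore:
    """Hypothetical stand-in: projection name -> last processed position."""

    def __init__(self) -> None:
        self._positions: Dict[str, int] = {}

    async def get(self, name: str) -> Optional[int]:
        return self._positions.get(name)

    async def save(self, name: str, position: int) -> None:
        self._positions[name] = position

    async def delete(self, name: str) -> None:
        self._positions.pop(name, None)


class InMemoryEventStore:
    """Hypothetical stand-in for an event store with a global ordering."""

    def __init__(self, events: List[Event]) -> None:
        self._events = sorted(events, key=lambda e: e.global_position)

    async def read_all(self, position: int, batch_size: int) -> List[Event]:
        # Assumption: only events strictly AFTER `position` are returned.
        newer = [e for e in self._events if e.global_position > position]
        return newer[:batch_size]


async def consume_batch(event_store, checkpoint_store, name: str,
                        batch_size: int) -> List[int]:
    """Read one batch after the saved checkpoint and advance the checkpoint."""
    position = await checkpoint_store.get(name) or 0
    seen = []
    for event in await event_store.read_all(position, batch_size):
        seen.append(event.global_position)
        await checkpoint_store.save(name, event.global_position)
    return seen


async def demo() -> List[List[int]]:
    events = [Event("order-1", "OrderCreated", {}, i, i) for i in range(1, 6)]
    store = InMemoryEventStore(events)
    checkpoints = InMemoryCheckpointStore()
    first = await consume_batch(store, checkpoints, "order_summary", 2)
    # A restarted consumer reuses the checkpoint store, so it resumes at 3.
    second = await consume_batch(store, checkpoints, "order_summary", 10)
    return [first, second]


batches = asyncio.run(demo())
print(batches)  # [[1, 2], [3, 4, 5]]
```

Because the checkpoint is keyed by projection name, each registered projection can resume independently after a restart.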
Adoption & trust: 6.9k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
Journey fit
Primary fit
Projection patterns come into play when designing backend architecture, specifically the path from an event store to queryable views, and ahead of ship-time performance tuning. The templates center on the async Projector loop, the Projection ABC, and apply() handlers: core backend event-sourcing implementation rather than frontend or documentation work.
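As a rough illustration of the "event store to queryable view" path, the sketch below applies a short event log to a dict-backed read model through the handles()/apply() contract. `OrderStatusProjection`, the `project` helper, and the `PaymentReceived` event type are hypothetical and used only for illustration. The skill's own templates target asyncpg tables rather than an in-memory dict.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    @property
    @abstractmethod
    def name(self) -> str: ...

    @abstractmethod
    def handles(self) -> List[str]: ...

    @abstractmethod
    async def apply(self, event: Event) -> None: ...


class OrderStatusProjection(Projection):
    """Hypothetical read model: order_id -> latest status, held in a dict."""

    def __init__(self) -> None:
        self.view: Dict[str, str] = {}

    @property
    def name(self) -> str:
        return "order_status"

    def handles(self) -> List[str]:
        return ["OrderCreated", "OrderShipped"]

    async def apply(self, event: Event) -> None:
        status = "pending" if event.event_type == "OrderCreated" else "shipped"
        self.view[event.data["order_id"]] = status


async def project(events: List[Event], projection: Projection) -> None:
    """Apply only the event types the projection declares in handles()."""
    for event in events:
        if event.event_type in projection.handles():
            await projection.apply(event)


log = [
    Event("order-1", "OrderCreated", {"order_id": "o1"}, 1, 1),
    Event("order-1", "PaymentReceived", {"order_id": "o1"}, 2, 2),  # skipped
    Event("order-1", "OrderShipped", {"order_id": "o1"}, 3, 3),
    Event("order-2", "OrderCreated", {"order_id": "o2"}, 1, 4),
]
projection = OrderStatusProjection()
asyncio.run(project(log, projection))
print(projection.view)  # {'o1': 'shipped', 'o2': 'pending'}
```

The view is derived entirely from the log, so it can be discarded and rebuilt by replaying the same events.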
Common Questions / FAQ
Is Projection Patterns safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# projection-patterns: templates and worked examples

## Templates

### Template 1: Basic Projector

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Any, Callable, List

import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    version: int
    global_position: int


class Projection(ABC):
    """Base class for projections."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique projection name for checkpointing."""
        pass

    @abstractmethod
    def handles(self) -> List[str]:
        """List of event types this projection handles."""
        pass

    @abstractmethod
    async def apply(self, event: Event) -> None:
        """Apply event to the read model."""
        pass


class Projector:
    """Runs projections from event store."""

    def __init__(self, event_store, checkpoint_store):
        self.event_store = event_store
        self.checkpoint_store = checkpoint_store
        self.projections: List[Projection] = []

    def register(self, projection: Projection):
        self.projections.append(projection)

    async def run(self, batch_size: int = 100):
        """Run all projections continuously."""
        while True:
            for projection in self.projections:
                await self._run_projection(projection, batch_size)
            # Short pause between polling passes.
            await asyncio.sleep(0.1)

    async def _run_projection(self, projection: Projection, batch_size: int) -> int:
        """Process one batch after the checkpoint; return the number of events read."""
        checkpoint = await self.checkpoint_store.get(projection.name)
        position = checkpoint or 0

        # Assumes read_all returns events after `position`, in global order.
        events = await self.event_store.read_all(position, batch_size)

        processed = 0
        for event in events:
            if event.event_type in projection.handles():
                await projection.apply(event)

            # Advance the checkpoint for skipped events too, so they are not re-read.
            await self.checkpoint_store.save(
                projection.name,
                event.global_position
            )
            processed += 1
        return processed

    async def rebuild(self, projection: Projection, batch_size: int = 1000):
        """Rebuild a projection from scratch."""
        await self.checkpoint_store.delete(projection.name)
        # Optionally clear read model tables
        # Replay batch by batch until a short batch signals the end of the log;
        # a single call would stop after the first `batch_size` events.
        while await self._run_projection(projection, batch_size) == batch_size:
            pass
```

### Template 2: Order Summary Projection

```python
class OrderSummaryProjection(Projection):
    """Projects order events to a summary read model."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.pool = db_pool

    @property
    def name(self) -> str:
        return "order_summary"

    def handles(self) -> List[str]:
        return [
            "OrderCreated",
            "OrderItemAdded",
            "OrderItemRemoved",
            "OrderShipped",
            "OrderCompleted",
            "OrderCancelled"
        ]

    async def apply(self, event: Event) -> None:
        # Dispatch to the handler registered for this event type.
        handlers = {
            "OrderCreated": self._handle_created,
            "OrderItemAdded": self._handle_item_added,
            "OrderItemRemoved": self._handle_item_removed,
            "OrderShipped": self._handle_shipped,
            "OrderCompleted": self._handle_completed,
            "OrderCancelled": self._handle_cancelled,
        }

        handler = handlers.get(event.event_type)
        if handler:
            await handler(event)

    async def _handle_created(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO order_summaries
                (order_id, customer_id, status, total_amount, item_count, created_at)
                VALUES ($1, $2, $3, $4, $5, $6)
                """,
                event.data['order_id'],
                event.data['customer_id'],
                'pending',
                0,
                0,
                event.data['created_at']
            )

    async def _handle_item_added(self, event: Event):
        async with self.pool.acquire() as conn:
            await conn.execute(
                """