
Event Store Design
Bootstrap event-sourced persistence with copy-ready PostgreSQL schemas and Python event-store patterns instead of inventing streams, versions, and snapshots from scratch.
Install
npx skills add https://github.com/wshobson/agents --skill event-store-design
What is this skill?
- PostgreSQL events table with stream versioning, global position, JSONB payloads, and uniqueness on (stream_id, version)
- Indexes for stream reads, global subscriptions, event-type filters, and time queries
- Companion snapshots and subscription_checkpoints tables for read models and consumers
- Template 2: Python dataclass-oriented event store implementation pattern (in SKILL.md)
- Worked examples section for adapting templates to your domain events
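The uniqueness constraint on (stream_id, version) listed above is what turns a concurrent write into a detectable conflict. The following is a minimal sketch of that behavior, assuming SQLite from the Python standard library as a stand-in for PostgreSQL so it runs without a server; the `append` helper and the order event names are hypothetical and not part of the skill.

```python
import json
import sqlite3

# Assumption: SQLite replaces PostgreSQL here purely so the sketch is runnable.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE events (
        global_position INTEGER PRIMARY KEY AUTOINCREMENT,
        stream_id TEXT NOT NULL,
        event_type TEXT NOT NULL,
        event_data TEXT NOT NULL,
        version INTEGER NOT NULL,
        UNIQUE (stream_id, version)
    )
    """
)


def append(stream_id, event_type, data, version):
    """Hypothetical helper: insert one event, return False on a version conflict."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO events (stream_id, event_type, event_data, version) "
                "VALUES (?, ?, ?, ?)",
                (stream_id, event_type, json.dumps(data), version),
            )
        return True
    except sqlite3.IntegrityError:
        # Another writer already claimed this (stream_id, version) pair.
        return False


first = append("order-1", "OrderCreated", {"total": 10}, 1)
duplicate = append("order-1", "OrderPaid", {"total": 10}, 1)  # same version: rejected
second = append("order-1", "OrderPaid", {"total": 10}, 2)
other = append("order-2", "OrderCreated", {"total": 5}, 1)  # versions are per stream

# Global positions order events across all streams.
positions = [
    row[0]
    for row in conn.execute(
        "SELECT global_position FROM events ORDER BY global_position"
    )
]
```

Versions are scoped to a stream, so `order-2` can start at version 1 again, while the global position gives subscribers a single ordering across every stream.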
Adoption & trust: 7k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
Journey fit
Primary fit
Event store design is core backend architecture work during the product build phase, not a launch or growth task. The backend subphase covers data models, append-only logs, projections, and subscription checkpoints, which is the layer the skill's templates target.
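How a projection and a subscription checkpoint fit together can be sketched in a few lines. This is an illustrative in-memory model, not the skill's implementation: the `StoredEvent` class, the `run_projection` function, the `open-orders` subscription id, and the order events are all hypothetical, and a real consumer would read from the events table and persist its checkpoint in a checkpoints table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StoredEvent:
    """Hypothetical shape of an event as read back from the log."""
    global_position: int
    stream_id: str
    event_type: str
    data: dict


# Append-only log, ordered by global position.
log = [
    StoredEvent(1, "order-1", "OrderCreated", {"total": 10}),
    StoredEvent(2, "order-2", "OrderCreated", {"total": 5}),
    StoredEvent(3, "order-1", "OrderCancelled", {}),
]

checkpoints = {}  # subscription_id -> last processed global position
open_totals = {}  # read model: stream_id -> total of each open order


def run_projection(subscription_id):
    """Apply events past the checkpoint; return how many were handled."""
    last = checkpoints.get(subscription_id, 0)
    handled = 0
    for event in log:
        if event.global_position <= last:
            continue  # already processed in an earlier run
        if event.event_type == "OrderCreated":
            open_totals[event.stream_id] = event.data["total"]
        elif event.event_type == "OrderCancelled":
            open_totals.pop(event.stream_id, None)
        # Advance the checkpoint after each event is applied.
        checkpoints[subscription_id] = event.global_position
        handled += 1
    return handled


first_run = run_projection("open-orders")   # processes positions 1 to 3
second_run = run_projection("open-orders")  # nothing new to process
log.append(StoredEvent(4, "order-3", "OrderCreated", {"total": 7}))
third_run = run_projection("open-orders")   # processes only position 4
```

Because the checkpoint records the last global position handled, restarting the consumer resumes where it stopped instead of replaying the whole log.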
Common Questions / FAQ
Is Event Store Design safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# event-store-design: templates and worked examples

## Templates

### Template 1: PostgreSQL Event Store Schema

```sql
-- Events table
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index for stream queries
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index for global subscription
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index for event type queries
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index for time-based queries
CREATE INDEX idx_events_created_at ON events(created_at);

-- Snapshots table
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Subscriptions checkpoint table
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);
```

### Template 2: Python Event Store Implementation

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg


@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Append events to a stream with optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Check expected version
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Get starting version
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insert events
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                            event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,