
CQRS Implementation
Scaffold CQRS command types, handlers, and a command bus in Python for write-side domain workflows.
Install
npx skills add https://github.com/wshobson/agents --skill cqrs-implementation
What is this skill?
- Command base with generated command_id and UTC timestamp
- Concrete commands: CreateOrder, AddOrderItem, CancelOrder
- Generic CommandHandler ABC and CommandBus register/dispatch
- Async handle and dispatch pattern ready for handler implementations
Adoption & trust: 7.1k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
Common Questions / FAQ
Is CQRS Implementation safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# cqrs-implementation: templates and worked examples

## Templates

### Template 1: Command Infrastructure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Optional, Type
from datetime import datetime, timezone
import uuid


# Command base
# kw_only=True (Python 3.10+) keeps the defaulted base fields keyword-only,
# so subclasses can declare required fields after them.
@dataclass(kw_only=True)
class Command:
    command_id: Optional[str] = None
    timestamp: Optional[datetime] = None

    def __post_init__(self):
        self.command_id = self.command_id or str(uuid.uuid4())
        # Timezone-aware UTC timestamp
        self.timestamp = self.timestamp or datetime.now(timezone.utc)


# Concrete commands
@dataclass
class CreateOrder(Command):
    customer_id: str
    items: list
    shipping_address: dict


@dataclass
class AddOrderItem(Command):
    order_id: str
    product_id: str
    quantity: int
    price: float


@dataclass
class CancelOrder(Command):
    order_id: str
    reason: str


# Command handler base
T = TypeVar('T', bound=Command)


class CommandHandler(ABC, Generic[T]):
    @abstractmethod
    async def handle(self, command: T) -> Any:
        pass


# Command bus: maps each command type to exactly one handler
class CommandBus:
    def __init__(self):
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise ValueError(f"No handler for {type(command).__name__}")
        return await handler.handle(command)


# Command handler implementation
# Note: Order is the domain aggregate and is not defined in this template.
class CreateOrderHandler(CommandHandler[CreateOrder]):
    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        # Validate
        if not command.items:
            raise ValueError("Order must have at least one item")

        # Create aggregate
        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address
        )

        # Persist events
        await self.event_store.append_events(
            stream_id=f"Order-{order.id}",
            stream_type="Order",
            events=order.uncommitted_events
        )

        return order.id
```

### Template 2: Query Infrastructure

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Generic, List, Optional, Type, TypeVar


# Query base
@dataclass
class Query:
    pass


# Type variables, declared before the generic classes that use them
T = TypeVar('T', bound=Query)  # query type
R = TypeVar('R')               # result or item type


# Concrete queries
@dataclass
class GetOrderById(Query):
    order_id: str


@dataclass
class GetCustomerOrders(Query):
    customer_id: str
    status: Optional[str] = None
    page: int = 1
    page_size: int = 20


@dataclass
class SearchOrders(Query):
    query: str
    filters: Optional[dict] = None
    sort_by: str = "created_at"
    sort_order: str = "desc"


# Query result types
@dataclass
class OrderView:
    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    shipped_at: Optional[datetime] = None


@dataclass
class PaginatedResult(Generic[R]):
    items: List[R]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        # Ceiling division without floats
        return (self.total + self.page_size - 1) // self.page_size


# Query handler base
class QueryHandler(ABC, Generic[T, R]):
    @abstractmethod
    async def handle(self, query: T) -> R:
        pass


# Query bus
class QueryBus:
    def __init__(self):
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        handler = self._handlers.get(type(query))
        if handler is None:
            raise ValueError(f"No handler for {type(query).__na