
Go Concurrency Patterns
Implement bounded parallelism in Go services with worker pools, channels, and context-aware shutdown instead of spawning uncontrolled goroutines.
Install
npx skills add https://github.com/wshobson/agents --skill go-concurrency-patterns
What is this skill?
- Worker pool pattern with a fixed number of goroutines consuming from a jobs channel
- Context cancellation via select on ctx.Done() inside worker loops
- Buffered results channel, closed by a separate goroutine once the WaitGroup reports that every worker has exited
- sync.WaitGroup with wg.Add before each worker starts and a deferred wg.Done for orderly worker exit
- Documented Pattern 1 walkthrough from job enqueue through result collection
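The cancellation bullet above is easiest to see when the consumer stops reading early. The following is a minimal sketch, not the skill's own code: `squareWorkers` and `firstN` are hypothetical names introduced for illustration. It assumes that workers should also watch `ctx.Done()` while sending, so that none of them stays blocked on a results channel nobody is reading.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// squareWorkers (hypothetical helper) starts n workers that square each job.
// Every send is paired with ctx.Done() so a worker never blocks forever
// once the consumer has stopped reading.
func squareWorkers(ctx context.Context, n int, jobs <-chan int) <-chan int {
	results := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range jobs {
				select {
				case results <- job * job:
				case <-ctx.Done():
					return // consumer went away; give up on the send
				}
			}
		}()
	}
	// Closer goroutine: close results only after every worker has exited.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

// firstN (hypothetical helper) reads at most `want` results, then cancels
// the context so producer and workers shut down. It returns the number read.
func firstN(total, workers, want int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for i := 0; i < total; i++ {
			select {
			case jobs <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	count := 0
	for range squareWorkers(ctx, workers, jobs) {
		count++
		if count == want {
			cancel()
			break
		}
	}
	return count
}

func main() {
	fmt.Println(firstN(100, 4, 3)) // prints 3
}
```

Pairing each channel send with `ctx.Done()` is one common way to avoid leaked goroutines after cancellation; whether it is worth the extra `select` depends on whether callers can abandon the results channel before it is closed.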
Adoption & trust: 7.3k installs on skills.sh; 36.5k GitHub stars; 3/3 security scanners passed (skills.sh audits).
Journey fit
Concurrency patterns are applied while writing backend Go code during the build phase, before shipping load-sensitive workers or APIs. Worker pools, job channels, and sync primitives belong on the backend shelf where APIs, workers, and CLI batch jobs are implemented.
Common Questions / FAQ
Is Go Concurrency Patterns safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md - Go Concurrency Patterns
# go-concurrency-patterns: detailed patterns and worked examples

## Patterns

### Pattern 1: Worker Pool

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type Job struct {
	ID   int
	Data string
}

type Result struct {
	JobID  int
	Output string
	Err    error
}

// WorkerPool starts numWorkers goroutines that consume jobs until the jobs
// channel is closed or ctx is cancelled. It returns the results channel,
// which is closed once every worker has exited.
func WorkerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
	// Size the buffer from the jobs channel's capacity. len(jobs) would only
	// count the jobs queued at this instant, which is often zero.
	results := make(chan Result, cap(jobs))

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for job := range jobs {
				select {
				case <-ctx.Done():
					return
				default:
					result := processJob(job)
					results <- result
				}
			}
		}(i)
	}

	// Close results after all workers are done.
	go func() {
		wg.Wait()
		close(results)
	}()

	return results
}

func processJob(job Job) Result {
	// Simulate work
	return Result{
		JobID:  job.ID,
		Output: fmt.Sprintf("Processed: %s", job.Data),
	}
}

// Usage
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	jobs := make(chan Job, 100)

	// Send jobs
	go func() {
		for i := 0; i < 50; i++ {
			jobs <- Job{ID: i, Data: fmt.Sprintf("job-%d", i)}
		}
		close(jobs)
	}()

	// Process with 5 workers
	results := WorkerPool(ctx, 5, jobs)

	for result := range results {
		fmt.Printf("Result: %+v\n", result)
	}
}
```

### Pattern 2: Fan-Out/Fan-In Pipeline

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Stage 1: Generate numbers
func generate(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case <-ctx.Done():
				return
			case out <- n:
			}
		}
	}()
	return out
}

// Stage 2: Square numbers (can run multiple instances)
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				return
			case out <- n * n:
			}
		}
	}()
	return out
}

// Fan-in: Merge multiple channels into one
func merge(ctx context.Context, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Start output goroutine for each input channel
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case <-ctx.Done():
				return
			case out <- n:
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out after all inputs are done
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Generate input
	in := generate(ctx, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

	// Fan out to multiple squarers
	c1 := square(ctx, in)
	c2 := square(ctx, in)
	c3 := square(ctx, in)

	// Fan in results (output order is not guaranteed)
	for result := range merge(ctx, c1, c2, c3) {
		fmt.Println(result)
	}
}
```

### Pattern 3: Bounded Concurrency with Semaphore

```go
package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

type RateLimitedWorker struct {
	sem *semaphore.Weighted
}

func NewRateLimitedWorker(maxConcurrent int64) *RateLimitedWorker {
	return &RateLimitedWorker{
		sem: semaphore.NewWeighted(maxConcurrent),
	}
}

func (w *RateLimitedWorker) Do(ctx context.Context, tasks []func() error) []error {
	var (
		wg     sync.WaitGroup
		mu     sync.Mutex
		errors []error
	)

	for _, task := range tasks {