Type-safe, composable concurrency patterns with backpressure for Go
Quacker provides composable, channel-native concurrency primitives for Go. Backpressure propagates naturally through Go's channel semantics — no custom protocols, no framework lock-in.
go get github.com/SangTran-127/quacker

Requirements: Go 1.25+
pipeline — Stream processing with backpressure
Compose stages into pipelines. Each stage transforms a channel into a channel. Backpressure is free — when a downstream stage is slow, upstream blocks automatically.
ctx, cancel := context.WithCancelCause(ctx)
out := pipeline.Run(ctx, input,
	pipeline.Map(3, func(ctx context.Context, e Event) (Event, bool) {
		enriched, err := enrich(ctx, e)
		if err != nil {
			cancel(err)
			return e, false
		}
		return enriched, true
	}),
	pipeline.Filter(func(e Event) bool { return e.Priority > 0 }),
	pipeline.Sink(5, func(ctx context.Context, e Event) {
		store(ctx, e)
	}),
)
for range out {}
if err := context.Cause(ctx); err != nil {
	log.Fatal(err)
}

Stages: Map, Filter, ForEach, Buffer, Tee, Take, Sink, Batch
fanout — 1-to-N distribution
Distribute a single stream to multiple consumers. RoundRobin for load balancing, Broadcast for pub/sub.
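The difference between the two strategies can be sketched with plain channels. The code below is an illustration under assumptions, not Quacker's implementation: `roundRobin`, `broadcast`, `drain`, and `feed` are hypothetical helpers, and both distributors assume n is greater than zero.

```go
package main

import (
	"fmt"
	"sync"
)

func makeOutputs[T any](n int) []chan T {
	outs := make([]chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
	}
	return outs
}

func closeAll[T any](outs []chan T) {
	for _, out := range outs {
		close(out)
	}
}

// roundRobin (hypothetical) sends each value to exactly one output, in turn.
func roundRobin[T any](in <-chan T, n int) []chan T {
	outs := makeOutputs[T](n)
	go func() {
		defer closeAll(outs)
		i := 0
		for v := range in {
			outs[i%n] <- v // blocks until that consumer is ready
			i++
		}
	}()
	return outs
}

// broadcast (hypothetical) sends every value to every output.
func broadcast[T any](in <-chan T, n int) []chan T {
	outs := makeOutputs[T](n)
	go func() {
		defer closeAll(outs)
		for v := range in {
			for _, out := range outs {
				out <- v // the slowest consumer sets the pace
			}
		}
	}()
	return outs
}

// drain reads every output concurrently and returns what each consumer saw.
func drain[T any](outs []chan T) [][]T {
	seen := make([][]T, len(outs))
	var wg sync.WaitGroup
	for i, out := range outs {
		wg.Add(1)
		go func(i int, out <-chan T) {
			defer wg.Done()
			for v := range out {
				seen[i] = append(seen[i], v)
			}
		}(i, out)
	}
	wg.Wait()
	return seen
}

// feed emits the given values and closes the channel.
func feed(vals ...int) <-chan int {
	in := make(chan int)
	go func() {
		defer close(in)
		for _, v := range vals {
			in <- v
		}
	}()
	return in
}

func main() {
	fmt.Println(drain(roundRobin(feed(1, 2, 3, 4), 2))) // prints [[1 3] [2 4]]
	fmt.Println(drain(broadcast(feed(1, 2, 3, 4), 2)))  // prints [[1 2 3 4] [1 2 3 4]]
}
```

In both sketches the sends are unbuffered, so a stalled consumer eventually stalls the producer. With Quacker, the strategy is selected through options: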
fo, _ := fanout.NewFanOut[int](
	fanout.WithWorkerCount(3),
	fanout.WithStrategy(fanout.Broadcast),
)
fo.Run(ctx, input)
for _, ch := range fo.Outputs() {
	go consume(ch)
}

fanin — N-to-1 merge
Merge multiple channels into a single stream.
fi, _ := fanin.NewFanIn[int]()
fi.Add(stream1)
fi.Add(stream2)
fi.Add(stream3)
merged := fi.Run(ctx)

workerpool — Bounded task processing
Fixed-size worker pool with push-based task queuing, metrics, and graceful shutdown.
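The core idea can be sketched in a few lines of standard-library Go: a bounded task channel, a fixed number of goroutines reading from it, and a shutdown that closes the channel and waits. The `pool` type and `sumSquares` helper below are hypothetical and far simpler than a real pool; they only illustrate why a full queue makes the pushing side wait.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical minimal worker pool, not Quacker's workerpool type.
type pool struct {
	tasks chan func()
	wg    sync.WaitGroup
	done  atomic.Int64 // a single example metric: tasks completed
}

func newPool(workers, queueSize int) *pool {
	p := &pool{tasks: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
				p.done.Add(1)
			}
		}()
	}
	return p
}

// push blocks when the queue is full, which is how backpressure reaches the caller.
func (p *pool) push(task func()) { p.tasks <- task }

// stopAndWait closes the queue and waits until workers have drained it.
func (p *pool) stopAndWait() {
	close(p.tasks)
	p.wg.Wait()
}

// sumSquares pushes n tasks through a pool with a queue of size 2 and
// returns the computed sum together with the completed-task count.
func sumSquares(n, workers int) (sum, completed int64) {
	p := newPool(workers, 2)
	var total atomic.Int64
	for i := 1; i <= n; i++ {
		v := int64(i)
		p.push(func() { total.Add(v * v) })
	}
	p.stopAndWait()
	return total.Load(), p.done.Load()
}

func main() {
	sum, completed := sumSquares(10, 4)
	fmt.Println(sum, completed) // prints 385 10
}
```

Pushing after `stopAndWait` would panic in this sketch, since it sends on a closed channel; a production pool has to guard against that. Quacker's API looks like this: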
pool, _ := workerpool.NewWorkerPool[MyTask](ctx,
	workerpool.WithNumWorkers(4),
	workerpool.WithTaskQueueSize(100),
)
pool.Start()
pool.Push(task)
pool.StopAndWait()

The primitives connect through channels — Go's universal connector:
                    +-- pipeline --+
source --> fanout --+-- pipeline --+-- fanin --> pipeline --> sink
                    +-- pipeline --+
fo.Run(ctx, source)
fast := pipeline.Run(ctx, fo.Outputs()[0], pipeline.Map(10, enrichFast))
slow := pipeline.Run(ctx, fo.Outputs()[1], pipeline.Map(2, enrichSlow))
fi.Add(fast)
fi.Add(slow)
merged := fi.Run(ctx)
out := pipeline.Run(ctx, merged, pipeline.Sink(5, store))

- Backpressure is free. Channels block when full. No custom protocol.
- Errors use stdlib. context.WithCancelCause + context.Cause. No custom error types.
- Shutdown is clear. Close input for graceful drain. Cancel context for immediate stop.
- Small interfaces. Observer interfaces are 2-3 methods, always optional.
- No dependencies. Core logic is stdlib only (golang.org/x/sync for errgroup in workerpool).
