SangTran-127/quacker

Quacker

Type-safe, composable concurrency patterns with backpressure for Go

Quacker provides composable, channel-native concurrency primitives for Go. Backpressure propagates naturally through Go's channel semantics — no custom protocols, no framework lock-in.

Installation

go get github.com/SangTran-127/quacker

Requirements: Go 1.25+

Packages

pipeline — Stream processing with backpressure

Compose stages into pipelines. Each stage takes an input channel and returns an output channel. Backpressure comes for free: when a downstream stage is slow, its input channel stops accepting values and upstream sends block until it catches up.

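// Derive a context whose cancellation records the first error as its cause.
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil) // release the context's resources on the success path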

out := pipeline.Run(ctx, input,
    pipeline.Map(3, func(ctx context.Context, e Event) (Event, bool) {
        enriched, err := enrich(ctx, e)
        if err != nil {
            cancel(err)
            return e, false
        }
        return enriched, true
    }),
    pipeline.Filter(func(e Event) bool { return e.Priority > 0 }),
    pipeline.Sink(5, func(ctx context.Context, e Event) {
        store(ctx, e)
    }),
)

// Drain the output so every stage runs to completion, then check for a recorded failure.
for range out {}
if err := context.Cause(ctx); err != nil {
    log.Fatal(err)
}

Stages: Map, Filter, ForEach, Buffer, Tee, Take, Sink, Batch
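To make the channel-in, channel-out idea concrete, here is a minimal standard-library sketch of what a Map-like stage could look like. It is not Quacker's implementation: `mapStage` and `sumSquares` are hypothetical names, and the stage signature is an assumption chosen for illustration. The output channel is unbuffered, so a slow consumer blocks the workers, which in turn stop reading from the input.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// mapStage is a hypothetical stand-in for a pipeline stage (not Quacker's code).
// It starts `workers` goroutines that read from in, apply fn, and send the
// result on an unbuffered channel, so a slow reader blocks the workers.
func mapStage[T, U any](ctx context.Context, workers int, in <-chan T, fn func(T) U) <-chan U {
	out := make(chan U) // unbuffered: each send waits for a downstream receive
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				select {
				case out <- fn(v):
				case <-ctx.Done():
					return // stop immediately on cancellation
				}
			}
		}()
	}
	// Close the output once every worker has finished draining the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// sumSquares pushes 1..n through the stage and adds up the squared values.
func sumSquares(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan int)
	go func() {
		defer close(in) // closing the input triggers a graceful drain
		for i := 1; i <= n; i++ {
			in <- i
		}
	}()

	total := 0
	for v := range mapStage(ctx, 3, in, func(x int) int { return x * x }) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(4)) // 1 + 4 + 9 + 16 = 30
}
```

With several workers the output order is not guaranteed, which is why the example aggregates with a sum rather than comparing a sequence.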

fanout — 1-to-N distribution

Distribute a single stream to multiple consumers. RoundRobin for load balancing, Broadcast for pub/sub.

fo, _ := fanout.NewFanOut[int](
    fanout.WithWorkerCount(3),
    fanout.WithStrategy(fanout.Broadcast),
)
fo.Run(ctx, input)

for _, ch := range fo.Outputs() {
    go consume(ch)
}
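The difference between the two strategies can be sketched with plain channels. The code below is an illustration under assumptions, not the fanout package's internals: `distribute`, `strategy`, and `countPerOutput` are hypothetical names. Round-robin hands each value to exactly one output, while broadcast sends every value to every output.

```go
package main

import (
	"fmt"
	"sync"
)

// strategy is a hypothetical enum for this sketch only.
type strategy int

const (
	roundRobin strategy = iota
	broadcast
)

// distribute copies values from in to n outputs according to the strategy.
// All outputs are closed once in is closed and fully forwarded.
func distribute[T any](in <-chan T, n int, s strategy) []<-chan T {
	outs := make([]chan T, n)
	readOnly := make([]<-chan T, n)
	for i := range outs {
		outs[i] = make(chan T)
		readOnly[i] = outs[i]
	}
	go func() {
		defer func() {
			for _, ch := range outs {
				close(ch)
			}
		}()
		next := 0
		for v := range in {
			if s == broadcast {
				for _, ch := range outs {
					ch <- v // every consumer sees every value
				}
				continue
			}
			outs[next] <- v // exactly one consumer sees this value
			next = (next + 1) % n
		}
	}()
	return readOnly
}

// countPerOutput sends `items` values and reports how many each consumer got.
func countPerOutput(items, n int, s strategy) []int {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 0; i < items; i++ {
			in <- i
		}
	}()

	counts := make([]int, n)
	var wg sync.WaitGroup
	for i, ch := range distribute(in, n, s) {
		wg.Add(1)
		go func(i int, ch <-chan int) {
			defer wg.Done()
			for range ch {
				counts[i]++ // each goroutine writes only its own slot
			}
		}(i, ch)
	}
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(countPerOutput(6, 3, roundRobin)) // [2 2 2]
	fmt.Println(countPerOutput(6, 3, broadcast))  // [6 6 6]
}
```

Note that in this sketch a single slow consumer stalls a broadcast for everyone, because the sends are unbuffered and sequential. That is backpressure doing its job, but it is worth keeping in mind when mixing fast and slow subscribers.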

fanin — N-to-1 merge

Merge multiple channels into a single stream.

fi, _ := fanin.NewFanIn[int]()
fi.Add(stream1)
fi.Add(stream2)
fi.Add(stream3)
merged := fi.Run(ctx)
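For readers who want to see the pattern behind an N-to-1 merge, here is a standard-library sketch. It is an assumption about the general technique, not the fanin package's code: `merge`, `emit`, and `mergedSum` are hypothetical helpers. One goroutine forwards each input, and the merged channel closes only after all inputs are exhausted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// merge forwards every value from every input onto one output channel.
// The output closes when all inputs are closed or the context is canceled.
func merge[T any](ctx context.Context, inputs ...<-chan T) <-chan T {
	out := make(chan T)
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for v := range in {
				select {
				case out <- v:
				case <-ctx.Done():
					return
				}
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// emit returns a channel that yields vals in order and then closes.
func emit(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

// mergedSum merges three small streams and adds up everything received.
func mergedSum() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	total := 0
	for v := range merge(ctx, emit(1, 2), emit(10, 20), emit(100)) {
		total += v
	}
	return total
}

func main() {
	fmt.Println(mergedSum()) // 1 + 2 + 10 + 20 + 100 = 133
}
```

The interleaving of values across inputs is not deterministic; only the order within each individual input is preserved.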

workerpool — Bounded task processing

Fixed-size worker pool with push-based task queuing, metrics, and graceful shutdown.

pool, _ := workerpool.NewWorkerPool[MyTask](ctx,
    workerpool.WithNumWorkers(4),
    workerpool.WithTaskQueueSize(100),
)
pool.Start()
pool.Push(task)
pool.StopAndWait()
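The following sketch shows the general shape of a bounded pool using only the standard library. It is not the workerpool package's implementation, and it leaves out metrics and context handling: `pool`, `newPool`, `push`, `stopAndWait`, and `runTasks` are hypothetical names. The bounded queue is what provides backpressure, since `push` blocks once the queue is full and all workers are busy.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical fixed-size worker pool with a bounded task queue.
type pool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

// newPool starts `workers` goroutines that pull tasks from a queue
// holding at most queueSize pending tasks.
func newPool(workers, queueSize int) *pool {
	p := &pool{tasks: make(chan func(), queueSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
	return p
}

// push enqueues a task and blocks while the queue is full.
func (p *pool) push(task func()) { p.tasks <- task }

// stopAndWait stops accepting tasks and waits for queued tasks to finish.
func (p *pool) stopAndWait() {
	close(p.tasks)
	p.wg.Wait()
}

// runTasks pushes n trivial tasks and returns how many actually ran.
func runTasks(n int) int64 {
	p := newPool(4, 2)
	var done atomic.Int64
	for i := 0; i < n; i++ {
		p.push(func() { done.Add(1) })
	}
	p.stopAndWait()
	return done.Load()
}

func main() {
	fmt.Println(runTasks(10)) // 10
}
```

Because `stopAndWait` closes the queue before waiting, every task that was accepted by `push` runs before the call returns, which is the graceful-shutdown behaviour the description refers to.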

How they compose

The primitives connect through channels — Go's universal connector:

                    +-- pipeline --+
source --> fanout --+-- pipeline --+-- fanin --> pipeline --> sink
                    +-- pipeline --+

// fo and fi are constructed as in the fanout and fanin examples above.
fo.Run(ctx, source)

fast := pipeline.Run(ctx, fo.Outputs()[0], pipeline.Map(10, enrichFast))
slow := pipeline.Run(ctx, fo.Outputs()[1], pipeline.Map(2, enrichSlow))

fi.Add(fast)
fi.Add(slow)
merged := fi.Run(ctx)

out := pipeline.Run(ctx, merged, pipeline.Sink(5, store))

Design

  • Backpressure is free. Channels block when full. No custom protocol.
  • Errors use stdlib. context.WithCancelCause + context.Cause. No custom error types.
  • Shutdown is clear. Close input for graceful drain. Cancel context for immediate stop.
  • Small interfaces. Observer interfaces are 2-3 methods, always optional.
  • Minimal dependencies. Core logic uses only the standard library; the single exception is golang.org/x/sync, which workerpool uses for errgroup.
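The error-handling and shutdown points above rest on standard-library behaviour that can be shown in isolation. This sketch does not use Quacker: `consume`, `drainCount`, `cancelCause`, and `errEnrich` are hypothetical names. It contrasts a graceful drain (the input is closed and all remaining values are consumed) with an immediate stop (the context is canceled and the cause is read back with `context.Cause`).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errEnrich is a hypothetical failure reported by some stage.
var errEnrich = errors.New("enrich failed")

// consume counts values until in is closed (graceful drain)
// or ctx is canceled (immediate stop, returning the recorded cause).
func consume(ctx context.Context, in <-chan int) (int, error) {
	n := 0
	for {
		select {
		case <-ctx.Done():
			return n, context.Cause(ctx)
		case _, ok := <-in:
			if !ok {
				return n, nil // input closed: everything was drained
			}
			n++
		}
	}
}

// drainCount closes the input after three values and never cancels.
func drainCount() int {
	in := make(chan int, 3)
	for i := 0; i < 3; i++ {
		in <- i
	}
	close(in)
	n, _ := consume(context.Background(), in)
	return n
}

// cancelCause cancels with an error before any value is available.
func cancelCause() string {
	ctx, cancel := context.WithCancelCause(context.Background())
	cancel(errEnrich)
	_, err := consume(ctx, nil) // a nil channel never becomes ready
	return err.Error()
}

func main() {
	fmt.Println(drainCount())  // 3
	fmt.Println(cancelCause()) // enrich failed
}
```

Only the first call to the cancel function sets the cause, so when several stages fail concurrently the error reported by `context.Cause` is the first one recorded.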

About

A collection of type-safe, composable, and observable concurrency models for Go.
