From cf9c8b14c79e934556c9605ce1c062b13922f1a2 Mon Sep 17 00:00:00 2001 From: jake Date: Tue, 28 Apr 2026 20:14:57 -0400 Subject: [PATCH 1/2] Jacob Verdesi Copyright Waiver I dedicate any and all copyright interest in this software to the public domain. I make this dedication for the benefit of the public at large and to the detriment of my heirs and successors. I intend this dedication to be an overt act of relinquishment in perpetuity of all present and future rights to this software under copyright law. From ac1fd7480e488f9d0857093255d9a862bd808991 Mon Sep 17 00:00:00 2001 From: jake Date: Tue, 28 Apr 2026 20:51:14 -0400 Subject: [PATCH 2/2] proxy channel init --- _examples/proxyChannel/go.mod | 15 +++ _examples/proxyChannel/main.go | 132 ++++++++++++++++++++ proxychannel.go | 87 +++++++++++++ proxychannel_test.go | 215 +++++++++++++++++++++++++++++++++ 4 files changed, 449 insertions(+) create mode 100644 _examples/proxyChannel/go.mod create mode 100644 _examples/proxyChannel/main.go create mode 100644 proxychannel.go create mode 100644 proxychannel_test.go diff --git a/_examples/proxyChannel/go.mod b/_examples/proxyChannel/go.mod new file mode 100644 index 00000000..2964db7e --- /dev/null +++ b/_examples/proxyChannel/go.mod @@ -0,0 +1,15 @@ +module github.com/vbauerster/mpb/_examples/proxyChannel + +go 1.25.0 + +require github.com/vbauerster/mpb/v8 v8.12.0 + +require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect + github.com/clipperhouse/uax29/v2 v2.7.0 // indirect + github.com/mattn/go-runewidth v0.0.23 // indirect + golang.org/x/sys v0.43.0 // indirect +) + +replace github.com/vbauerster/mpb/v8 => ../.. diff --git a/_examples/proxyChannel/main.go b/_examples/proxyChannel/main.go new file mode 100644 index 00000000..b68b938a --- /dev/null +++ b/_examples/proxyChannel/main.go @@ -0,0 +1,132 @@ +package main + +import ( + "fmt" + "math/rand" + "sync" + "time" + + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" +) + +// chunk represents a piece of data flowing through a pipeline. +// It implements mpb.Sizer so ProxyChannel tracks bytes rather than items. +type chunk struct { + id int + data []byte +} + +func (c chunk) Size() int64 { return int64(len(c.data)) } + +// job is a plain work item with no size. ProxyChannel counts each as 1. +type job struct{ id int } + +func main() { + const ( + numChunks = 50 + chunkMaxSize = 128 * 1024 // 128 KiB + numJobs = 30 + updateRate = 500 * time.Millisecond + ) + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + + // ── bar 1: chunks with byte-level tracking (Sizer) + EWMA speed/ETA ────── + + // Build chunk sizes upfront so we know the exact total for the bar. + chunkSizes := make([]int, numChunks) + var totalBytes int64 + for i := range chunkSizes { + chunkSizes[i] = rng.Intn(chunkMaxSize) + 1 + totalBytes += int64(chunkSizes[i]) + } + + chunkCh := make(chan chunk, 8) + go func() { + localRng := rand.New(rand.NewSource(time.Now().UnixNano() + 1)) + for i, sz := range chunkSizes { + chunkCh <- chunk{id: i, data: make([]byte, sz)} + time.Sleep(time.Duration(localRng.Intn(40)+10) * time.Millisecond) + } + close(chunkCh) + }() + + // ── bar 2: plain job items (no Sizer, each counts as 1) ────────────────── + + jobCh := make(chan job, 4) + go func() { + localRng := rand.New(rand.NewSource(time.Now().UnixNano() + 2)) + for i := range numJobs { + jobCh <- job{id: i} + time.Sleep(time.Duration(localRng.Intn(80)+20) * time.Millisecond) + } + close(jobCh) + }() + + // ── progress container ──────────────────────────────────────────────────── + + var wg sync.WaitGroup + p := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(72)) + + // Byte-counting bar: uses Sizer path of ProxyChannel + EWMA decorators. + byteBar := p.New(totalBytes, + mpb.BarStyle(), + mpb.PrependDecorators( + decor.Name("chunks ", decor.WC{C: decor.DindentRight}), + ), + mpb.AppendDecorators( + + decor.Counters(decor.SizeB1024(0), "% .1f / % .1f", decor.WCSyncSpaceR), + decor.OnComplete( + decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30, decor.WCSyncSpaceR), "", + ), + decor.OnComplete( + decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncWidth), "done", + ), + ), + ) + + // Item-counting bar: uses plain increment path of ProxyChannel. + jobBar := p.New(int64(numJobs), + mpb.BarStyle(), + mpb.PrependDecorators( + decor.Name("jobs ", decor.WC{C: decor.DindentRight}), + ), + mpb.AppendDecorators( + decor.Counters(0, "%d / %d", decor.WCSyncSpaceR), + decor.Percentage(decor.WCSyncSpaceR), + decor.OnComplete( + decor.AverageETA(decor.ET_STYLE_GO, decor.WCSyncWidth), "done", + ), + ), + ) + + // ── consumers ───────────────────────────────────────────────────────────── + + wg.Add(2) + + // Consume byte chunks through the proxy; bar updates are batched by updateRate. + go func() { + defer wg.Done() + out := mpb.ProxyChannel(byteBar, chunkCh, updateRate) + for v := range out { + c := v.(chunk) + _ = c // real code would process c.data here + } + }() + + // Consume jobs through the proxy; each job counts as 1 (no Sizer). + go func() { + defer wg.Done() + out := mpb.ProxyChannel(jobBar, jobCh, updateRate) + for v := range out { + j := v.(job) + _ = j // real code would handle the job here + } + }() + + p.Wait() + + fmt.Println("all done.") +} diff --git a/proxychannel.go b/proxychannel.go new file mode 100644 index 00000000..4ef1d6ad --- /dev/null +++ b/proxychannel.go @@ -0,0 +1,87 @@ +package mpb + +import "time" + +// Sizer is implemented by values passed through [ProxyChannel] that can +// report their size in bytes. When T implements Sizer, the bar is incremented +// by T.Size() bytes per value; values that do not implement Sizer count as 1. +type Sizer interface { + Size() int64 +} + +// ProxyChannel wraps ch with bar progress tracking. A goroutine is started that +// reads from ch, increments b, and forwards each value to the returned channel. +// +// Bar updates are batched: the accumulated count is flushed to b only after +// updateInterval has elapsed since the last flush. Values are always forwarded +// to the output channel immediately so the consumer is never held back by the +// update throttle. Any remaining accumulated count is flushed when ch is closed. +// +// T values that implement [Sizer] contribute their byte size to the bar; +// all other values count as 1. +// +// The output channel has the same capacity as ch and is closed when ch is +// closed or b's context is cancelled. Returns nil if b is already done. +func ProxyChannel[T any](b *Bar, ch <-chan T, updateInterval time.Duration) <-chan any { + result := make(chan bool, 1) + select { + case b.operateState <- func(s *bState) { result <- len(s.ewmaDecorators) != 0 }: + hasEwma := <-result + out := make(chan any, cap(ch)) + go runProxyChannel(b, ch, out, updateInterval, hasEwma) + return out + case <-b.ctx.Done(): + return nil + } +} + +func runProxyChannel[T any](b *Bar, in <-chan T, out chan<- any, updateInterval time.Duration, hasEwma bool) { + defer close(out) + + var pending int64 + now := time.Now() + lastFlush := now + batchStart := now + + flush := func() { + if pending == 0 { + return + } + if hasEwma { + b.EwmaIncrInt64(pending, time.Since(batchStart)) + } else { + b.IncrInt64(pending) + } + pending = 0 + now := time.Now() + lastFlush = now + batchStart = now + } + + for { + select { + case v, ok := <-in: + if !ok { + // ch closed: flush remainder then close out + flush() + return + } + if s, isSizer := any(v).(Sizer); isSizer { + pending += s.Size() + } else { + pending++ + } + if time.Since(lastFlush) >= updateInterval { + flush() + } + // fast-pass: forward immediately regardless of bar update + select { + case out <- v: + case <-b.ctx.Done(): + return + } + case <-b.ctx.Done(): + return + } + } +} diff --git a/proxychannel_test.go b/proxychannel_test.go new file mode 100644 index 00000000..430d882a --- /dev/null +++ b/proxychannel_test.go @@ -0,0 +1,215 @@ +package mpb_test + +import ( + "io" + "testing" + "time" + + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" +) + +// countItem is a plain value with no size information; each counts as 1. +type countItem struct{ id int } + +// sizedItem implements mpb.Sizer. +type sizedItem struct{ n int64 } + +func (s sizedItem) Size() int64 { return s.n } + +func sendAndClose[T any](items []T) <-chan T { + ch := make(chan T, len(items)) + for _, v := range items { + ch <- v + } + close(ch) + return ch +} + +func drainAll(ch <-chan any) int { + var n int + for range ch { + n++ + } + return n +} + +// TestProxyChannel verifies that non-Sizer values each count as 1. +func TestProxyChannel(t *testing.T) { + const total = 5 + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(int64(total), mpb.NopStyle()) + + items := make([]countItem, total) + for i := range items { + items[i] = countItem{i} + } + + out := mpb.ProxyChannel(bar, sendAndClose(items), 0) + n := drainAll(out) + p.Wait() + + if n != total { + t.Errorf("received %d values, want %d", n, total) + } + if got := bar.Current(); got != int64(total) { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestProxyChannelSizer verifies that Sizer values contribute their byte size. +func TestProxyChannelSizer(t *testing.T) { + items := []sizedItem{{5}, {6}, {11}} + var total int64 + for _, item := range items { + total += item.Size() + } + + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(total, mpb.NopStyle()) + + out := mpb.ProxyChannel(bar, sendAndClose(items), 0) + drainAll(out) + p.Wait() + + if got := bar.Current(); got != total { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestEwmaProxyChannel verifies EWMA decorator path increments correctly. +func TestEwmaProxyChannel(t *testing.T) { + const total = 4 + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(int64(total), + mpb.NopStyle(), + mpb.AppendDecorators(decor.EwmaETA(decor.ET_STYLE_GO, 30)), + ) + + items := make([]countItem, total) + out := mpb.ProxyChannel(bar, sendAndClose(items), 0) + drainAll(out) + p.Wait() + + if got := bar.Current(); got != int64(total) { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestEwmaProxyChannelSizer verifies EWMA + Sizer path increments by byte size. +func TestEwmaProxyChannelSizer(t *testing.T) { + items := []sizedItem{{3}, {7}} + var total int64 + for _, item := range items { + total += item.Size() + } + + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(total, + mpb.NopStyle(), + mpb.AppendDecorators(decor.EwmaETA(decor.ET_STYLE_GO, 30)), + ) + + out := mpb.ProxyChannel(bar, sendAndClose(items), 0) + drainAll(out) + p.Wait() + + if got := bar.Current(); got != total { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestProxyChannelOrdering verifies that values arrive in the same order they +// were sent. +func TestProxyChannelOrdering(t *testing.T) { + const total = 10 + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(int64(total), mpb.NopStyle()) + + items := make([]countItem, total) + for i := range items { + items[i] = countItem{i} + } + + out := mpb.ProxyChannel(bar, sendAndClose(items), 0) + var idx int + for v := range out { + item := v.(countItem) + if item.id != idx { + t.Errorf("index %d: got id %d, want %d", idx, item.id, idx) + } + idx++ + } + p.Wait() + + if idx != total { + t.Errorf("received %d values, want %d", idx, total) + } +} + +// TestProxyChannelUpdateInterval verifies that a large updateInterval still +// produces the correct final count after ch is closed (the end-of-stream flush). +func TestProxyChannelUpdateInterval(t *testing.T) { + const total = 6 + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(int64(total), mpb.NopStyle()) + + items := make([]countItem, total) + // huge interval so every value is a fast-pass — only the close flush counts + out := mpb.ProxyChannel(bar, sendAndClose(items), time.Hour) + drainAll(out) + p.Wait() + + if got := bar.Current(); got != int64(total) { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestProxyChannelUpdateIntervalSizer same as above but with Sizer values. +func TestProxyChannelUpdateIntervalSizer(t *testing.T) { + items := []sizedItem{{4}, {8}, {16}} + var total int64 + for _, item := range items { + total += item.Size() + } + + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(total, mpb.NopStyle()) + + out := mpb.ProxyChannel(bar, sendAndClose(items), time.Hour) + drainAll(out) + p.Wait() + + if got := bar.Current(); got != total { + t.Errorf("bar.Current() = %d, want %d", got, total) + } +} + +// TestProxyChannelOutputClosed verifies that the output channel is closed when +// the input channel is closed. +func TestProxyChannelOutputClosed(t *testing.T) { + p := mpb.New(mpb.WithOutput(io.Discard)) + bar := p.New(3, mpb.NopStyle()) + + in := make(chan countItem, 3) + in <- countItem{0} + in <- countItem{1} + in <- countItem{2} + close(in) + + out := mpb.ProxyChannel(bar, in, 0) + + done := make(chan struct{}) + go func() { + defer close(done) + drainAll(out) + }() + + select { + case <-done: + // output channel was closed as expected + case <-time.After(timeout): + t.Fatal("output channel was not closed within timeout") + } + p.Wait() +}