Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions _examples/proxyChannel/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/vbauerster/mpb/_examples/proxyChannel

go 1.25.0

require github.com/vbauerster/mpb/v8 v8.12.0

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/clipperhouse/uax29/v2 v2.7.0 // indirect
github.com/mattn/go-runewidth v0.0.23 // indirect
golang.org/x/sys v0.43.0 // indirect
)

replace github.com/vbauerster/mpb/v8 => ../..
132 changes: 132 additions & 0 deletions _examples/proxyChannel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package main

import (
"fmt"
"math/rand"
"sync"
"time"

"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"
)

// chunk represents a piece of data flowing through a pipeline.
// It implements mpb.Sizer so ProxyChannel tracks bytes rather than items.
type chunk struct {
id int
data []byte
}

func (c chunk) Size() int64 { return int64(len(c.data)) }

// job is a plain work item with no size. ProxyChannel counts each as 1.
type job struct{ id int }

func main() {
const (
numChunks = 50
chunkMaxSize = 128 * 1024 // 128 KiB
numJobs = 30
updateRate = 500 * time.Millisecond
)

rng := rand.New(rand.NewSource(time.Now().UnixNano()))

// ── bar 1: chunks with byte-level tracking (Sizer) + EWMA speed/ETA ──────

// Build chunk sizes upfront so we know the exact total for the bar.
chunkSizes := make([]int, numChunks)
var totalBytes int64
for i := range chunkSizes {
chunkSizes[i] = rng.Intn(chunkMaxSize) + 1
totalBytes += int64(chunkSizes[i])
}

chunkCh := make(chan chunk, 8)
go func() {
localRng := rand.New(rand.NewSource(time.Now().UnixNano() + 1))
for i, sz := range chunkSizes {
chunkCh <- chunk{id: i, data: make([]byte, sz)}
time.Sleep(time.Duration(localRng.Intn(40)+10) * time.Millisecond)
}
close(chunkCh)
}()

// ── bar 2: plain job items (no Sizer, each counts as 1) ──────────────────

jobCh := make(chan job, 4)
go func() {
localRng := rand.New(rand.NewSource(time.Now().UnixNano() + 2))
for i := range numJobs {
jobCh <- job{id: i}
time.Sleep(time.Duration(localRng.Intn(80)+20) * time.Millisecond)
}
close(jobCh)
}()

// ── progress container ────────────────────────────────────────────────────

var wg sync.WaitGroup
p := mpb.New(mpb.WithWaitGroup(&wg), mpb.WithWidth(72))

// Byte-counting bar: uses Sizer path of ProxyChannel + EWMA decorators.
byteBar := p.New(totalBytes,
mpb.BarStyle(),
mpb.PrependDecorators(
decor.Name("chunks ", decor.WC{C: decor.DindentRight}),
),
mpb.AppendDecorators(

decor.Counters(decor.SizeB1024(0), "% .1f / % .1f", decor.WCSyncSpaceR),
decor.OnComplete(
decor.EwmaSpeed(decor.SizeB1024(0), "% .1f", 30, decor.WCSyncSpaceR), "",
),
decor.OnComplete(
decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncWidth), "done",
),
),
)

// Item-counting bar: uses plain increment path of ProxyChannel.
jobBar := p.New(int64(numJobs),
mpb.BarStyle(),
mpb.PrependDecorators(
decor.Name("jobs ", decor.WC{C: decor.DindentRight}),
),
mpb.AppendDecorators(
decor.Counters(0, "%d / %d", decor.WCSyncSpaceR),
decor.Percentage(decor.WCSyncSpaceR),
decor.OnComplete(
decor.AverageETA(decor.ET_STYLE_GO, decor.WCSyncWidth), "done",
),
),
)

// ── consumers ─────────────────────────────────────────────────────────────

wg.Add(2)

// Consume byte chunks through the proxy; bar updates are batched by updateRate.
go func() {
defer wg.Done()
out := mpb.ProxyChannel(byteBar, chunkCh, updateRate)
for v := range out {
c := v.(chunk)
_ = c // real code would process c.data here
}
}()

// Consume jobs through the proxy; each job counts as 1 (no Sizer).
go func() {
defer wg.Done()
out := mpb.ProxyChannel(jobBar, jobCh, updateRate)
for v := range out {
j := v.(job)
_ = j // real code would handle the job here
}
}()

p.Wait()

fmt.Println("all done.")
}
87 changes: 87 additions & 0 deletions proxychannel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mpb

import "time"

// Sizer is implemented by values passed through [ProxyChannel] that can
// report their size in bytes. When T implements Sizer, the bar is incremented
// by T.Size() bytes per value; values that do not implement Sizer count as 1.
type Sizer interface {
Size() int64
}

// ProxyChannel wraps ch with bar progress tracking. A goroutine is started that
// reads from ch, increments b, and forwards each value to the returned channel.
//
// Bar updates are batched: the accumulated count is flushed to b only after
// updateInterval has elapsed since the last flush. Values are always forwarded
// to the output channel immediately so the consumer is never held back by the
// update throttle. Any remaining accumulated count is flushed when ch is closed.
//
// T values that implement [Sizer] contribute their byte size to the bar;
// all other values count as 1.
//
// The output channel has the same capacity as ch and is closed when ch is
// closed or b's context is cancelled. Returns nil if b is already done.
func ProxyChannel[T any](b *Bar, ch <-chan T, updateInterval time.Duration) <-chan any {
result := make(chan bool, 1)
select {
case b.operateState <- func(s *bState) { result <- len(s.ewmaDecorators) != 0 }:
hasEwma := <-result
out := make(chan any, cap(ch))
go runProxyChannel(b, ch, out, updateInterval, hasEwma)
return out
case <-b.ctx.Done():
return nil
}
}

func runProxyChannel[T any](b *Bar, in <-chan T, out chan<- any, updateInterval time.Duration, hasEwma bool) {
defer close(out)

var pending int64
now := time.Now()
lastFlush := now
batchStart := now

flush := func() {
if pending == 0 {
return
}
if hasEwma {
b.EwmaIncrInt64(pending, time.Since(batchStart))
} else {
b.IncrInt64(pending)
}
pending = 0
now := time.Now()
lastFlush = now
batchStart = now
}

for {
select {
case v, ok := <-in:
if !ok {
// ch closed: flush remainder then close out
flush()
return
}
if s, isSizer := any(v).(Sizer); isSizer {
pending += s.Size()
} else {
pending++
}
if time.Since(lastFlush) >= updateInterval {
flush()
}
// fast-pass: forward immediately regardless of bar update
select {
case out <- v:
case <-b.ctx.Done():
return
}
case <-b.ctx.Done():
return
}
}
}
Loading