From 62ab4a1f7717bcca8dc9099421803fa1e2dea9f0 Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Mon, 13 Apr 2026 18:02:33 -0700 Subject: [PATCH 1/8] Add time-windowed tdigest implementation --- common/stats/windowed_tdigest.go | 256 ++++++++++++++++++++++++++ common/stats/windowed_tdigest_test.go | 140 ++++++++++++++ go.mod | 1 + go.sum | 2 + 4 files changed, 399 insertions(+) create mode 100644 common/stats/windowed_tdigest.go create mode 100644 common/stats/windowed_tdigest_test.go diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go new file mode 100644 index 00000000000..caabe5b7e7f --- /dev/null +++ b/common/stats/windowed_tdigest.go @@ -0,0 +1,256 @@ +package stats + +import ( + "errors" + "sync" + "time" + + "github.com/caio/go-tdigest/v5" +) + +type ( + // TimeWindowedStats collects values into fixed-duration time windows, + // each backed by a t-digest, enabling approximate quantile queries + // over a sliding time range. + // Windows are guaranteed to be non-overlapping. + TimeWindowedStats interface { + // Record records a value that occurred at the given timestamp. + Record(value float64, timestamp time.Time) + // RecordToLatestWindow records a value in the current time window. Faster than Record. + // Warning: This is lower-accuracy than Record, lock-access will + // bias the recording time later than the call-time + RecordToLatestWindow(value float64) + RecordMulti(value float64, timestamp time.Time, count uint64) + RecordMultiToLatestWindow(value float64, count uint64) + // Quantile returns the approximate value at quantile q (0 <= q <= 1) + // across all active windows. + Quantile(q float64) float64 + // TrimmedMean returns the approximate mean value within the given + // quantile range (0 <= q <= 1) across all active windows. + TrimmedMean(lowerQuantile, upperQuantile float64) float64 + SubWindowForTime(instant time.Time) TimeWindowView + } + + TimeWindowView interface { + Quantile(q float64) float64 + TrimmedMean(lowerQuantile, upperQuantile float64) float64 + Start() time.Time + End() time.Time + } + + // WindowConfig controls the windowing behavior. + WindowConfig struct { + // windowSize is the duration of each window. + WindowSize time.Duration + // windowCount is the maximum number of windows to retain. + WindowCount int + // FillBlankIntervals controls whether gaps in the windowing should be preserved. + // This matters when windowSize is shorter than the largest gap in the measured data + // and individual time-windows are being fetched via TimeWindowedStats.SubWindowForTime. + FillBlankIntervals bool + } + + timeWindowedTDigest struct { + mu sync.Mutex + windows []timedWindow // ring buffer, pre-allocated to windowCount + head int // index of the newest window + cfg WindowConfig + } + + timedWindow struct { + tdigest *tdigest.TDigest + start time.Time + end time.Time + } +) + +// NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests. +func NewWindowedTDigest(cfg WindowConfig) TimeWindowedStats { + if cfg.WindowCount <= 0 { + panic("windowCount must be non-negative") + } + if cfg.WindowSize.Milliseconds() <= 50 { + panic("probable misconfiguration detected: windowSize is too small, consider increasing it to at least 50ms") + } + return &timeWindowedTDigest{ + windows: make([]timedWindow, cfg.WindowCount), + cfg: cfg, + // mu and head both empty + } +} + +func (w *timeWindowedTDigest) Record(value float64, timestamp time.Time) { + w.RecordMulti(value, timestamp, 1) +} + +func (w *timeWindowedTDigest) RecordMulti(value float64, timestamp time.Time, count uint64) { + window, err := w.getOrCreateWindow(timestamp) + if err != nil { + // Drop data for invalid timestamps + return + } + _ = window.tdigest.AddWeighted(value, count) +} + +func (w *timeWindowedTDigest) RecordToLatestWindow(value float64) { + w.RecordMultiToLatestWindow(value, 1) +} + +func (w *timeWindowedTDigest) RecordMultiToLatestWindow(value float64, count uint64) { + window, err := w.getOrCreateWindow(time.Time{}) + if err != nil { + return + } + _ = window.tdigest.AddWeighted(value, count) +} + +func (w *timeWindowedTDigest) SubWindowForTime(instant time.Time) TimeWindowView { + w.mu.Lock() + defer w.mu.Unlock() + window, _ := w.searchWindowsBackwards(instant) + return window +} + +func (w *timeWindowedTDigest) Quantile(q float64) float64 { + return w.getMergedWindows().Quantile(q) +} + +func (w *timeWindowedTDigest) TrimmedMean(lowerQuantile, upperQuantile float64) float64 { + return w.getMergedWindows().TrimmedMean(lowerQuantile, upperQuantile) +} + +// TODO: this is expensive, maybe cache everything but the latest window so we can skip N merges? +func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest { + windows := w.cloneWindows() + var merged *tdigest.TDigest + for idx := 0; idx < len(windows); idx++ { + if windows[idx].tdigest != nil { + if merged == nil { + merged = windows[idx].tdigest + } else { + _ = merged.Merge(windows[idx].tdigest) + } + } + } + return merged +} + +// cloneWindows creates a shallow copy of the ring buffer. +// Use this to avoid holding the lock while accessing the windows for aggregated queries. +func (w *timeWindowedTDigest) cloneWindows() []timedWindow { + w.mu.Lock() + defer w.mu.Unlock() + windows := make([]timedWindow, len(w.windows)) + copy(windows, w.windows) + return windows +} + +// getOrCreateWindow returns the window containing the given timestamp, creating a new one if necessary. +// 0 is a valid timestamp and will return the most recent window. +func (w *timeWindowedTDigest) getOrCreateWindow(timestamp time.Time) (*timedWindow, error) { + w.mu.Lock() + defer w.mu.Unlock() + if timestamp.IsZero() { + timestamp = time.Now() + } + candidate, err := w.searchWindowsBackwards(timestamp) + if err == nil { + return candidate, nil + } + if errors.Is(err, errTooNew) { + return w.advanceWindow(timestamp), nil + } + // err is errTooOld or errInGap + return nil, err +} + +var errTooOld = errors.New("time was older than the earliest window") +var errTooNew = errors.New("time was newer than the latest window") +var errInGap = errors.New("time was in a gap between windows") + +// Precondition: w.mu is held. +// Returns the window containing the given timestamp, or error if no window exists. +func (w *timeWindowedTDigest) searchWindowsBackwards(timestamp time.Time) (*timedWindow, error) { + latest := w.windows[w.head] + if !timestamp.Before(latest.start) { + // If the requested timestamp is after the latest window, no point in searching + if !timestamp.Before(latest.end) { + return nil, errTooNew + } + return &latest, nil + } + for idx := w.modDec(w.head); idx != w.head; idx = w.modDec(idx) { + candidate := w.windows[idx] + // Window start is inclusive, end is exclusive. We're iterating + // backwards in time, so the first window that matches is the one we want. + if !timestamp.Before(candidate.start) { + // The first window that matches might be too short to include this timestamp. + // Make sure the timestamp is actually in the window. + if !timestamp.Before(w.windows[idx].end) { + return nil, errInGap + } + return &candidate, nil + } + } + // All the windows are newer than the requested timestamp. + return nil, errTooOld +} + +// Precondition: w.mu is held. +func (w *timeWindowedTDigest) advanceWindow(timestamp time.Time) *timedWindow { + if w.cfg.FillBlankIntervals { + // Fill in all the intervening blank windows. + // The empty digests will not affect the stats, but they will drop old windows. + lastWindow := &w.windows[w.head] + if lastWindow.tdigest == nil { + // Special-case, the latest window is uninitialized. Just take it. + return w.advanceWindowSimple(timestamp) + } + for { + lastWindow = w.advanceWindowSimple(lastWindow.end) + // Need a do-while here to enforce that lastWindow contains timestamp + if timestamp.Before(lastWindow.end) { + break + } + } + return lastWindow + } + return w.advanceWindowSimple(timestamp) +} + +func (w *timeWindowedTDigest) advanceWindowSimple(start time.Time) *timedWindow { + w.head = w.modInc(w.head) + digest, _ := tdigest.New() + window := timedWindow{ + tdigest: digest, + start: start, + end: start.Add(w.cfg.WindowSize), + } + w.windows[w.head] = window + return &window +} + +// modulo-increment, for wrapping around the ring buffer +func (w *timeWindowedTDigest) modInc(idx int) int { + return (idx + 1) % w.cfg.WindowCount +} + +// modulo-decrement, for wrapping around the ring buffer. +func (w *timeWindowedTDigest) modDec(idx int) int { + return (idx - 1 + w.cfg.WindowCount) % w.cfg.WindowCount +} + +func (w *timedWindow) Start() time.Time { + return w.start +} +func (w *timedWindow) End() time.Time { + return w.end +} + +func (w *timedWindow) Quantile(q float64) float64 { + return w.tdigest.Quantile(q) +} + +func (w *timedWindow) TrimmedMean(lowerQuantile, upperQuantile float64) float64 { + return w.tdigest.TrimmedMean(lowerQuantile, upperQuantile) +} diff --git a/common/stats/windowed_tdigest_test.go b/common/stats/windowed_tdigest_test.go new file mode 100644 index 00000000000..53327a5c95a --- /dev/null +++ b/common/stats/windowed_tdigest_test.go @@ -0,0 +1,140 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func n(seconds int) time.Time { + return time.Unix(int64(seconds), 0) +} + +type ( + RecordingValue struct { + Value float64 + Time time.Time + Count uint64 + } + TimeWindowedStatsTestCase struct { + Name string + WindowConfig WindowConfig + Expectations []TestExpectation + RecordingValues []RecordingValue + } + TestExpectation func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) +) + +// TestWindowConfig is the default window configuration used in tests. +// It plays nicely with the "n" function above. +var TestWindowConfig = WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 10, +} +var TestWindowConfigBlanks = WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 10, + FillBlankIntervals: true, +} + +func computeSimpleAverage(values []RecordingValue) map[int]RecordingValue { + averages := make(map[int]RecordingValue) + for _, value := range values { + existing := averages[value.Time.Second()] + averages[value.Time.Second()] = RecordingValue{ + existing.Value + value.Value, + existing.Time, + existing.Count + value.Count, + } + } + return averages +} + +func countNonEmptyWindows(stats *timeWindowedTDigest) int { + count := 0 + for _, bucket := range stats.windows { + if bucket.tdigest != nil { + count++ + } + } + return count +} + +func incrementingData(start time.Time, count int) []RecordingValue { + RecordingValues := make([]RecordingValue, count) + for i := 0; i < count; i++ { + RecordingValues[i] = RecordingValue{ + Value: float64(i), + Time: start.Add(time.Duration(i) * time.Second), + Count: 1, + } + } + return RecordingValues +} + +func TestWindowedDigest(t *testing.T) { + // Use when unique windows are less than the max window count + simpleExpectations := []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + averages := computeSimpleAverage(tc.RecordingValues) + require.Equal(t, len(averages), countNonEmptyWindows(stats)) + for bucket, value := range averages { + window := stats.SubWindowForTime(n(bucket)) + require.InDelta(t, value.Value/float64(value.Count), window.TrimmedMean(0, 1.0), 0.01) + } + }, + } + + testCases := []TimeWindowedStatsTestCase{ + {"empty", TestWindowConfig, simpleExpectations, nil}, + {"single-value", TestWindowConfig, simpleExpectations, []RecordingValue{ + {float64(10), n(1), 1}, + }}, + {"full-buckets", TestWindowConfig, simpleExpectations, + incrementingData(n(1), 10), + }, + {"overflow", TestWindowConfig, []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 10, countNonEmptyWindows(stats)) + // datapoints will be 10-19, giving us max=19, min=10, avg=14.5 + require.InDelta(t, 14.5, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 19, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) + }, + }, incrementingData(n(1), 20)}, + {"blank-drops-old-data", TestWindowConfigBlanks, []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 10, countNonEmptyWindows(stats)) + // avg=max=min because the old datapoint expired + require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(0.0), 0.01) + }}, []RecordingValue{ + {float64(10), n(1), 1}, + {float64(20), n(11), 1}, + }}, + {"fill-blanks-simple", TestWindowConfigBlanks, []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 5, countNonEmptyWindows(stats)) + require.InDelta(t, 15, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) + }}, []RecordingValue{ + {float64(10), n(1), 1}, + {float64(20), n(5), 1}, + }}, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + stats := NewWindowedTDigest(tc.WindowConfig) + for _, value := range tc.RecordingValues { + stats.RecordMulti(value.Value, value.Time, value.Count) + } + for _, expectation := range tc.Expectations { + expectation(t, tc, stats.(*timeWindowedTDigest)) + } + }) + } +} diff --git a/go.mod b/go.mod index a8031c011ee..9d9427390c5 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/aws/aws-sdk-go v1.55.8 github.com/blang/semver/v4 v4.0.0 github.com/cactus/go-statsd-client/v5 v5.1.0 + github.com/caio/go-tdigest/v5 v5.0.0 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/emirpasic/gods v1.18.1 github.com/fatih/color v1.18.0 diff --git a/go.sum b/go.sum index fa7633c649a..b72a150e266 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go. github.com/cactus/go-statsd-client/v4 v4.0.0/go.mod h1:m73kwJp6TN0Ja9P6ycdZhWM1MlfxY/95WZ//IptPQ+Y= github.com/cactus/go-statsd-client/v5 v5.1.0 h1:sbbdfIl9PgisjEoXzvXI1lwUKWElngsjJKaZeC021P4= github.com/cactus/go-statsd-client/v5 v5.1.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY= +github.com/caio/go-tdigest/v5 v5.0.0 h1:XQKgYSazZPbWFDAJ51dKqoZoDrISmTrB8UcWwCmfo6Y= +github.com/caio/go-tdigest/v5 v5.0.0/go.mod h1:wI618wZoAYzIDZlpX2CfyTQdrdGtwEZOJuXdrI3zk/Y= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= From f19bf1c55658c3ac3dcca72c287076bc71335210 Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Mon, 13 Apr 2026 18:07:53 -0700 Subject: [PATCH 2/8] make fmt --- common/stats/windowed_tdigest.go | 2 +- common/stats/windowed_tdigest_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go index caabe5b7e7f..1fc3f6fbe68 100644 --- a/common/stats/windowed_tdigest.go +++ b/common/stats/windowed_tdigest.go @@ -123,7 +123,7 @@ func (w *timeWindowedTDigest) TrimmedMean(lowerQuantile, upperQuantile float64) func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest { windows := w.cloneWindows() var merged *tdigest.TDigest - for idx := 0; idx < len(windows); idx++ { + for idx := range windows { if windows[idx].tdigest != nil { if merged == nil { merged = windows[idx].tdigest diff --git a/common/stats/windowed_tdigest_test.go b/common/stats/windowed_tdigest_test.go index 53327a5c95a..d684248d797 100644 --- a/common/stats/windowed_tdigest_test.go +++ b/common/stats/windowed_tdigest_test.go @@ -63,7 +63,7 @@ func countNonEmptyWindows(stats *timeWindowedTDigest) int { func incrementingData(start time.Time, count int) []RecordingValue { RecordingValues := make([]RecordingValue, count) - for i := 0; i < count; i++ { + for i := range count { RecordingValues[i] = RecordingValue{ Value: float64(i), Time: start.Add(time.Duration(i) * time.Second), From 7cbc4892085b89a64460ed079c4e425940c84f21 Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Tue, 14 Apr 2026 09:44:16 -0700 Subject: [PATCH 3/8] add/fix tests, go mod tidy --- common/stats/windowed_tdigest.go | 16 ++--- common/stats/windowed_tdigest_test.go | 95 +++++++++++++++++++++++++++ go.sum | 2 + 3 files changed, 103 insertions(+), 10 deletions(-) diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go index 1fc3f6fbe68..8d244d89aee 100644 --- a/common/stats/windowed_tdigest.go +++ b/common/stats/windowed_tdigest.go @@ -65,18 +65,18 @@ type ( ) // NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests. -func NewWindowedTDigest(cfg WindowConfig) TimeWindowedStats { +func NewWindowedTDigest(cfg WindowConfig) (TimeWindowedStats, error) { if cfg.WindowCount <= 0 { - panic("windowCount must be non-negative") + return nil, errors.New("windowCount must be non-negative") } if cfg.WindowSize.Milliseconds() <= 50 { - panic("probable misconfiguration detected: windowSize is too small, consider increasing it to at least 50ms") + return nil, errors.New("probable misconfiguration detected: windowSize is too small, consider increasing it to at least 50ms") } return &timeWindowedTDigest{ windows: make([]timedWindow, cfg.WindowCount), cfg: cfg, // mu and head both empty - } + }, nil } func (w *timeWindowedTDigest) Record(value float64, timestamp time.Time) { @@ -122,14 +122,10 @@ func (w *timeWindowedTDigest) TrimmedMean(lowerQuantile, upperQuantile float64) // TODO: this is expensive, maybe cache everything but the latest window so we can skip N merges? func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest { windows := w.cloneWindows() - var merged *tdigest.TDigest + merged, _ := tdigest.New() for idx := range windows { if windows[idx].tdigest != nil { - if merged == nil { - merged = windows[idx].tdigest - } else { - _ = merged.Merge(windows[idx].tdigest) - } + _ = merged.Merge(windows[idx].tdigest) } } return merged diff --git a/common/stats/windowed_tdigest_test.go b/common/stats/windowed_tdigest_test.go index d684248d797..5d8f9db1581 100644 --- a/common/stats/windowed_tdigest_test.go +++ b/common/stats/windowed_tdigest_test.go @@ -138,3 +138,98 @@ func TestWindowedDigest(t *testing.T) { }) } } + +func TestWindowedDigest_OldTimestampDropped(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig) + // Record at t=5, then try to record at t=1 which is before the earliest window. + stats.Record(100, n(5)) + stats.Record(999, n(1)) + td := stats.(*timeWindowedTDigest) + require.Equal(t, 1, countNonEmptyWindows(td)) + require.InDelta(t, 100, stats.Quantile(0.5), 0.01) +} + +func TestWindowedDigest_GapTimestampDropped(t *testing.T) { + // Without FillBlankIntervals, a gap between windows causes the value to be dropped. + stats := NewWindowedTDigest(TestWindowConfig) + // Create window at t=1 (covers [1,2)), then advance to t=5 (covers [5,6)). + // t=3 falls in the gap [2,5) with no window. + stats.Record(10, n(1)) + stats.Record(50, n(5)) + stats.Record(999, n(3)) // should be dropped (in gap) + td := stats.(*timeWindowedTDigest) + require.Equal(t, 2, countNonEmptyWindows(td)) + require.InDelta(t, 30, stats.TrimmedMean(0, 1.0), 0.01) +} + +func TestWindowedDigest_WindowBoundaryInclusiveExclusive(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig) + // Record at t=1, creating window [1s, 2s). + stats.Record(10, n(1)) + td := stats.(*timeWindowedTDigest) + + // t=1 (start) is inclusive — should find the window. + w := td.SubWindowForTime(n(1)) + require.NotNil(t, w) + require.InDelta(t, 10, w.Quantile(0.5), 0.01) + + // t=2 (end) is exclusive — should NOT find the window. + w2 := td.SubWindowForTime(n(2)) + require.Nil(t, w2) +} + +func TestWindowedDigest_SubWindowForTimeMissing(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig).(*timeWindowedTDigest) + // No data recorded yet — all windows are uninitialized. + w := stats.SubWindowForTime(n(5)) + require.Nil(t, w) +} + +func TestWindowedDigest_RecordMultiWeighted(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig) + // RecordMulti with count=5 should weight the value heavily. + stats.RecordMulti(10, n(1), 5) + stats.RecordMulti(20, n(1), 1) + // Weighted mean: (10*5 + 20*1) / 6 = 11.67 + require.InDelta(t, 11.67, stats.TrimmedMean(0, 1.0), 0.5) +} + +func TestWindowedDigest_MultipleValuesInSameWindow(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig) + // All values land in the same 1-second window [1s, 2s). + halfwidth := 500 * time.Millisecond + stats.Record(10, n(1)) + stats.Record(20, n(1).Add(halfwidth)) + stats.Record(30, n(1).Add(halfwidth)) + td := stats.(*timeWindowedTDigest) + require.Equal(t, 1, countNonEmptyWindows(td)) + require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01) // (10+20+30)/3 + require.InDelta(t, 30, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) +} + +func TestWindowedDigest_RingBufferWrapPreservesNewest(t *testing.T) { + cfg := WindowConfig{WindowSize: 1 * time.Second, WindowCount: 3} + stats := NewWindowedTDigest(cfg) + // Insert 5 values into 5 different windows; ring buffer holds 3. + for i := 1; i <= 5; i++ { + stats.Record(float64(i*10), n(i)) + } + td := stats.(*timeWindowedTDigest) + require.Equal(t, 3, countNonEmptyWindows(td)) + // Windows 3, 4, 5 should survive (values 30, 40, 50). + require.InDelta(t, 50, stats.Quantile(1.0), 0.01) + require.InDelta(t, 30, stats.Quantile(0.0), 0.01) + require.InDelta(t, 40, stats.TrimmedMean(0, 1.0), 0.01) +} + +func TestWindowedDigest_RecordToLatestWindow(t *testing.T) { + stats := NewWindowedTDigest(TestWindowConfig) + // RecordToLatestWindow uses time.Now(), so it should create a window. + stats.RecordToLatestWindow(42) + stats.RecordMultiToLatestWindow(100, 3) + td := stats.(*timeWindowedTDigest) + require.Equal(t, 1, countNonEmptyWindows(td)) + // Weighted mean: (42*1 + 100*3) / 4 = 85.5 + require.InDelta(t, 85.5, stats.TrimmedMean(0, 1.0), 0.01) +} diff --git a/go.sum b/go.sum index b72a150e266..41b859e8021 100644 --- a/go.sum +++ b/go.sum @@ -263,6 +263,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4= +github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= From 06b26dc50f6d857a8605cb4b6626c3fae3ae582b Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Tue, 14 Apr 2026 09:54:07 -0700 Subject: [PATCH 4/8] oops, add casts to concrete type back --- common/stats/windowed_tdigest_test.go | 40 +++++++++++++-------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/common/stats/windowed_tdigest_test.go b/common/stats/windowed_tdigest_test.go index 5d8f9db1581..b359d9269bd 100644 --- a/common/stats/windowed_tdigest_test.go +++ b/common/stats/windowed_tdigest_test.go @@ -128,7 +128,7 @@ func TestWindowedDigest(t *testing.T) { for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { - stats := NewWindowedTDigest(tc.WindowConfig) + stats, _ := NewWindowedTDigest(tc.WindowConfig) for _, value := range tc.RecordingValues { stats.RecordMulti(value.Value, value.Time, value.Count) } @@ -140,33 +140,33 @@ func TestWindowedDigest(t *testing.T) { } func TestWindowedDigest_OldTimestampDropped(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // Record at t=5, then try to record at t=1 which is before the earliest window. stats.Record(100, n(5)) stats.Record(999, n(1)) - td := stats.(*timeWindowedTDigest) - require.Equal(t, 1, countNonEmptyWindows(td)) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) require.InDelta(t, 100, stats.Quantile(0.5), 0.01) } func TestWindowedDigest_GapTimestampDropped(t *testing.T) { // Without FillBlankIntervals, a gap between windows causes the value to be dropped. - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // Create window at t=1 (covers [1,2)), then advance to t=5 (covers [5,6)). // t=3 falls in the gap [2,5) with no window. stats.Record(10, n(1)) stats.Record(50, n(5)) stats.Record(999, n(3)) // should be dropped (in gap) - td := stats.(*timeWindowedTDigest) - require.Equal(t, 2, countNonEmptyWindows(td)) + td := stats + require.Equal(t, 2, countNonEmptyWindows(td.(*timeWindowedTDigest))) require.InDelta(t, 30, stats.TrimmedMean(0, 1.0), 0.01) } func TestWindowedDigest_WindowBoundaryInclusiveExclusive(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // Record at t=1, creating window [1s, 2s). stats.Record(10, n(1)) - td := stats.(*timeWindowedTDigest) + td := stats // t=1 (start) is inclusive — should find the window. w := td.SubWindowForTime(n(1)) @@ -179,14 +179,14 @@ func TestWindowedDigest_WindowBoundaryInclusiveExclusive(t *testing.T) { } func TestWindowedDigest_SubWindowForTimeMissing(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig).(*timeWindowedTDigest) + stats, _ := NewWindowedTDigest(TestWindowConfig) // No data recorded yet — all windows are uninitialized. w := stats.SubWindowForTime(n(5)) require.Nil(t, w) } func TestWindowedDigest_RecordMultiWeighted(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // RecordMulti with count=5 should weight the value heavily. stats.RecordMulti(10, n(1), 5) stats.RecordMulti(20, n(1), 1) @@ -195,14 +195,14 @@ func TestWindowedDigest_RecordMultiWeighted(t *testing.T) { } func TestWindowedDigest_MultipleValuesInSameWindow(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // All values land in the same 1-second window [1s, 2s). halfwidth := 500 * time.Millisecond stats.Record(10, n(1)) stats.Record(20, n(1).Add(halfwidth)) stats.Record(30, n(1).Add(halfwidth)) - td := stats.(*timeWindowedTDigest) - require.Equal(t, 1, countNonEmptyWindows(td)) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01) // (10+20+30)/3 require.InDelta(t, 30, stats.Quantile(1.0), 0.01) require.InDelta(t, 10, stats.Quantile(0.0), 0.01) @@ -210,13 +210,13 @@ func TestWindowedDigest_MultipleValuesInSameWindow(t *testing.T) { func TestWindowedDigest_RingBufferWrapPreservesNewest(t *testing.T) { cfg := WindowConfig{WindowSize: 1 * time.Second, WindowCount: 3} - stats := NewWindowedTDigest(cfg) + stats, _ := NewWindowedTDigest(cfg) // Insert 5 values into 5 different windows; ring buffer holds 3. for i := 1; i <= 5; i++ { stats.Record(float64(i*10), n(i)) } - td := stats.(*timeWindowedTDigest) - require.Equal(t, 3, countNonEmptyWindows(td)) + td := stats + require.Equal(t, 3, countNonEmptyWindows(td.(*timeWindowedTDigest))) // Windows 3, 4, 5 should survive (values 30, 40, 50). require.InDelta(t, 50, stats.Quantile(1.0), 0.01) require.InDelta(t, 30, stats.Quantile(0.0), 0.01) @@ -224,12 +224,12 @@ func TestWindowedDigest_RingBufferWrapPreservesNewest(t *testing.T) { } func TestWindowedDigest_RecordToLatestWindow(t *testing.T) { - stats := NewWindowedTDigest(TestWindowConfig) + stats, _ := NewWindowedTDigest(TestWindowConfig) // RecordToLatestWindow uses time.Now(), so it should create a window. stats.RecordToLatestWindow(42) stats.RecordMultiToLatestWindow(100, 3) - td := stats.(*timeWindowedTDigest) - require.Equal(t, 1, countNonEmptyWindows(td)) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) // Weighted mean: (42*1 + 100*3) / 4 = 85.5 require.InDelta(t, 85.5, stats.TrimmedMean(0, 1.0), 0.01) } From 09a3dd0af85d3d8140824658256f014f23a024ea Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Tue, 14 Apr 2026 17:20:31 -0700 Subject: [PATCH 5/8] Add benchmarks and comments --- common/stats/windowed_tdigest.go | 34 ++++- .../windowed_tdigest_performance_test.go | 132 ++++++++++++++++++ 2 files changed, 159 insertions(+), 7 deletions(-) create mode 100644 common/stats/windowed_tdigest_performance_test.go diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go index 8d244d89aee..f2eee95e558 100644 --- a/common/stats/windowed_tdigest.go +++ b/common/stats/windowed_tdigest.go @@ -16,23 +16,37 @@ type ( TimeWindowedStats interface { // Record records a value that occurred at the given timestamp. Record(value float64, timestamp time.Time) - // RecordToLatestWindow records a value in the current time window. Faster than Record. - // Warning: This is lower-accuracy than Record, lock-access will - // bias the recording time later than the call-time + // RecordToLatestWindow records a value in the current time window. + // This will call time.Now(), don't call this in a tight loop. RecordToLatestWindow(value float64) + // RecordMulti records a weighted value that occurred at the given timestamp + // with the provided count. Imagine you have 3 counts of 10, and you + // call RecordMulti(20, t, 3). Your new mean will be 15 because you added + // 3 counts of 20. RecordMulti(value float64, timestamp time.Time, count uint64) + // RecordMultiToLatestWindow records a weighted value in the current time window. + // This will call time.Now(), don't call this in a tight loop. RecordMultiToLatestWindow(value float64, count uint64) - // Quantile returns the approximate value at quantile q (0 <= q <= 1) - // across all active windows. + // Quantile returns the approximate value at quantile q (0 <= q <= 1). + // Aggregates across all active windows. Quantile(q float64) float64 - // TrimmedMean returns the approximate mean value within the given - // quantile range (0 <= q <= 1) across all active windows. + // TrimmedMean returns the mean of the values within the given quantile range (0 <= q <= 1). + // For example, TrimmedMean(0.9, 0.99) returns the average of all the values between the + // 90th percentile and the 99th percentile. + // Aggregates across all active windows. TrimmedMean(lowerQuantile, upperQuantile float64) float64 + // SubWindowForTime returns a view of the time window containing the given instant. + // Use this to get detailed information about a specific time window. SubWindowForTime(instant time.Time) TimeWindowView } + // TimeWindowView is a read-only view of a single time window. TimeWindowView interface { + // Quantile returns the approximate value at quantile q (0 <= q <= 1) Quantile(q float64) float64 + // TrimmedMean returns the mean of the values within the given quantile range (0 <= q <= 1). + // For example, TrimmedMean(0.9, 0.99) returns the average of all the values between the + // 90th percentile and the 99th percentile. TrimmedMean(lowerQuantile, upperQuantile float64) float64 Start() time.Time End() time.Time @@ -64,7 +78,13 @@ type ( } ) +// Compile-time assertion that timeWindowedTDigest implements TimeWindowedStats. +var _ TimeWindowedStats = (*timeWindowedTDigest)(nil) + // NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests. +// Some guidance on sizing the windows/counts: Prefer windows containing 100-1000 datapoints. +// T-Digest is somewhat super-linear with datapoint count, and 1000 is a breakpoint with the +// cost of just creating a new window. See BenchmarkInsert in windowed_tdigest_performance_test.go for details. func NewWindowedTDigest(cfg WindowConfig) (TimeWindowedStats, error) { if cfg.WindowCount <= 0 { return nil, errors.New("windowCount must be non-negative") diff --git a/common/stats/windowed_tdigest_performance_test.go b/common/stats/windowed_tdigest_performance_test.go new file mode 100644 index 00000000000..7a5932560eb --- /dev/null +++ b/common/stats/windowed_tdigest_performance_test.go @@ -0,0 +1,132 @@ +package stats + +import ( + "fmt" + "math/rand/v2" + "testing" + "time" +) + +// Uses declarations from windowed_tdigest_test.go +// func n(seconds int) time.Time +// RecordingValue struct { +// Value float64 +// Time time.Time +// Count uint64 +// } + +type UIntGenerator interface { + Uint64() uint64 +} + +// - ms, us, ns +// BenchmarkNewWindow 8,058,719 ns/op +// 8ms to add 1000 new windows each containing a single datapoint. +// For windows sized to more than a few milliseconds, this means window +// creation will not be a bottleneck. +func BenchmarkNewWindow(b *testing.B) { + var i int + for b.Loop() { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 300, + }) + statsImpl := stats.(*timeWindowedTDigest) + for range 1000 { + now := n(i) + statsImpl.RecordMulti(float64(i), now, 52) + i++ + } + } +} + +// - ms, us, ns +// BenchmarkTenMillionRecordQuery/trimmed-mean 99,039,996 ns/op +// BenchmarkTenMillionRecordQuery/quantile 98,738,240 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile 1,509 ns/op +func BenchmarkTenMillionRecordQuery(b *testing.B) { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 300, + }) + datapoints := generateDatapoints(10_000_000, 10_000, rand.NewPCG(7, 42)) + for _, dp := range datapoints { + stats.RecordMulti(dp.Value, dp.Time, dp.Count) + } + b.Run("trimmed-mean", func(b *testing.B) { + for b.Loop() { + stats.TrimmedMean(0, 1.0) + } + }) + b.Run("quantile", func(b *testing.B) { + for b.Loop() { + stats.Quantile(0.9) + } + }) + b.Run("subwindow-quantile", func(b *testing.B) { + for b.Loop() { + stats.SubWindowForTime(datapoints[9_000_000].Time).Quantile(0.9) + } + }) +} + +// How to read this chart: Lower numbers are better. There are two datasets tested, +// 10k and 10M. Observe that increasing window size for a fixed datapoint count +// has a nonlinear impact on latency. Observe that increasing datapoint count +// for a fixed window size also has a somewhat super-linear impact on latency. +// - s, ms, us, ns +// BenchmarkInsert/datapoints-10,000-window-size-1 81,422,795 ns/op +// BenchmarkInsert/datapoints-10,000-window-size-10 8,321,690 ns/op +// BenchmarkInsert/datapoints-10,000-window-size-100 1,497,160 ns/op +// BenchmarkInsert/datapoints-10,000-window-size-1000 1,697,073 ns/op +// BenchmarkInsert/datapoints-10,000-window-size-10000 4,462,444 ns/op +// BenchmarkInsert/datapoints-10,000-window-size-100000 ns/op +// ------------------------------------------------------------------------------- +// BenchmarkInsert/datapoints-10,000,000-window-size-1 DNF ns/op +// BenchmarkInsert/datapoints-10,000,000-window-size-10 9,668,609,417 ns/op +// BenchmarkInsert/datapoints-10,000,000-window-size-100 1,483,158,625 ns/op +// BenchmarkInsert/datapoints-10,000,000-window-size-1000 1,716,553,958 ns/op +// BenchmarkInsert/datapoints-10,000,000-window-size-10000 4,623,385,375 ns/op +// BenchmarkInsert/datapoints-10,000,000-window-size-100000 5,567,272,666 ns/op +func BenchmarkInsert(b *testing.B) { + slidingBenchmark("datapoints-10,000", b, 10_000, 1, 10_000) + slidingBenchmark("datapoints-10,000,000", b, 10_000_000, 10, 1000) +} + +func generateDatapoints(datapoints int, pointsPerBucket uint64, random UIntGenerator) (output []RecordingValue) { + output = make([]RecordingValue, datapoints) + timeIdx := uint64(1) + for i := range datapoints { + output[i] = RecordingValue{ + Value: float64(i), + Time: n(int(timeIdx)), + Count: random.Uint64() % 100, + } + // Expected-value of pointsPerBucket records per second, with uniform variation + if random.Uint64()%pointsPerBucket == 0 { + timeIdx++ + } + } + return +} + +func slidingBenchmark(baseName string, b *testing.B, datapointCount, startWindowSize, maxWindowSize int) { + if (startWindowSize != 1 && startWindowSize%10 != 0) || maxWindowSize%10 != 0 || + datapointCount%10 != 0 { + b.Skip("startWindowSize, maxWindowSize, datapointCount must be a multiple of 10") + } + datapoints := generateDatapoints(datapointCount, 1, rand.NewPCG(7, 42)) + for windowSize := startWindowSize; windowSize <= maxWindowSize; windowSize *= 10 { + b.Run(fmt.Sprintf("%s-window-size-%d", baseName, windowSize), func(b *testing.B) { + for b.Loop() { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: time.Duration(windowSize) * time.Second, + WindowCount: datapointCount / windowSize, + }) + for _, dp := range datapoints { + stats.RecordMulti(dp.Value, dp.Time, dp.Count) + } + } + }) + } +} From 4bc8a1b915a1402c027a7475fcb4986a35c88c60 Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Wed, 15 Apr 2026 10:24:43 -0700 Subject: [PATCH 6/8] Add allocations to benchmark results --- .../windowed_tdigest_performance_test.go | 98 +++++++++++-------- 1 file changed, 56 insertions(+), 42 deletions(-) diff --git a/common/stats/windowed_tdigest_performance_test.go b/common/stats/windowed_tdigest_performance_test.go index 7a5932560eb..5a774982a09 100644 --- a/common/stats/windowed_tdigest_performance_test.go +++ b/common/stats/windowed_tdigest_performance_test.go @@ -40,57 +40,70 @@ func BenchmarkNewWindow(b *testing.B) { } } -// - ms, us, ns -// BenchmarkTenMillionRecordQuery/trimmed-mean 99,039,996 ns/op -// BenchmarkTenMillionRecordQuery/quantile 98,738,240 ns/op -// BenchmarkTenMillionRecordQuery/subwindow-quantile 1,509 ns/op +// - s, ms, us, ns +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-10 11,655,230,875 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-10 11,743,078,833 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-10 1,795,897 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-100 5,297,755,167 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-100 4,950,984,583 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-100 144,574 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-1000 3,844,660,708 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-1000 3,646,952,708 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-1000 13,883 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-10000 473,836,570 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-10000 473,955,278 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-10000 798,834 ns/op func BenchmarkTenMillionRecordQuery(b *testing.B) { - stats, _ := NewWindowedTDigest(WindowConfig{ - WindowSize: 1 * time.Second, - WindowCount: 300, - }) - datapoints := generateDatapoints(10_000_000, 10_000, rand.NewPCG(7, 42)) - for _, dp := range datapoints { - stats.RecordMulti(dp.Value, dp.Time, dp.Count) - } - b.Run("trimmed-mean", func(b *testing.B) { - for b.Loop() { - stats.TrimmedMean(0, 1.0) - } - }) - b.Run("quantile", func(b *testing.B) { - for b.Loop() { - stats.Quantile(0.9) - } - }) - b.Run("subwindow-quantile", func(b *testing.B) { - for b.Loop() { - stats.SubWindowForTime(datapoints[9_000_000].Time).Quantile(0.9) + for windowSize := 10; windowSize <= 10_000; windowSize *= 10 { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: time.Duration(windowSize) * time.Second, + WindowCount: 10_000_000 / windowSize, + }) + datapoints := generateDatapoints(10_000_000, 1, rand.NewPCG(7, 42)) + for _, dp := range datapoints { + stats.RecordMulti(dp.Value, dp.Time, dp.Count) } - }) + b.Run(fmt.Sprintf("trimmed-mean-size-%d", windowSize), func(b *testing.B) { + for b.Loop() { + stats.TrimmedMean(0, 1.0) + } + }) + b.Run(fmt.Sprintf("quantile-size-%d", windowSize), func(b *testing.B) { + for b.Loop() { + stats.Quantile(0.9) + } + }) + b.Run(fmt.Sprintf("subwindow-quantile-size-%d", windowSize), func(b *testing.B) { + for b.Loop() { + stats.SubWindowForTime(datapoints[9_000_000].Time).Quantile(0.9) + } + }) + } } // How to read this chart: Lower numbers are better. There are two datasets tested, // 10k and 10M. Observe that increasing window size for a fixed datapoint count // has a nonlinear impact on latency. Observe that increasing datapoint count // for a fixed window size also has a somewhat super-linear impact on latency. -// - s, ms, us, ns -// BenchmarkInsert/datapoints-10,000-window-size-1 81,422,795 ns/op -// BenchmarkInsert/datapoints-10,000-window-size-10 8,321,690 ns/op -// BenchmarkInsert/datapoints-10,000-window-size-100 1,497,160 ns/op -// BenchmarkInsert/datapoints-10,000-window-size-1000 1,697,073 ns/op -// BenchmarkInsert/datapoints-10,000-window-size-10000 4,462,444 ns/op -// BenchmarkInsert/datapoints-10,000-window-size-100000 ns/op -// ------------------------------------------------------------------------------- -// BenchmarkInsert/datapoints-10,000,000-window-size-1 DNF ns/op -// BenchmarkInsert/datapoints-10,000,000-window-size-10 9,668,609,417 ns/op -// BenchmarkInsert/datapoints-10,000,000-window-size-100 1,483,158,625 ns/op -// BenchmarkInsert/datapoints-10,000,000-window-size-1000 1,716,553,958 ns/op -// BenchmarkInsert/datapoints-10,000,000-window-size-10000 4,623,385,375 ns/op -// BenchmarkInsert/datapoints-10,000,000-window-size-100000 5,567,272,666 ns/op +// Increasing window rotation by reducing the total window count was tested. +// It did not make a significant (<1%) impact on latency +// - s, ms, us, ns GB, MB, KB, B +// BenchmarkInsert/datapoints-10,000-window-size-1 79,594,851 ns/op 220,978,893 B/op 90,277 allocs/op +// BenchmarkInsert/datapoints-10,000-window-size-10 8,262,293 ns/op 22,684,401 B/op 18,272 allocs/op +// BenchmarkInsert/datapoints-10,000-window-size-100 1,496,739 ns/op 2,852,384 B/op 11,067 allocs/op +// BenchmarkInsert/datapoints-10,000-window-size-1000 1,764,475 ns/op 867,330 B/op 10,344 allocs/op +// BenchmarkInsert/datapoints-10,000-window-size-10000 4,493,649 ns/op 1,161,041 B/op 10,316 allocs/op +// BenchmarkInsert/datapoints-10,000-window-size-100000 +// - s, ms, us, ns GB, MB, KB, B +// BenchmarkInsert/datapoints-10,000,000-window-size-1 +// BenchmarkInsert/datapoints-10,000,000-window-size-10 9,569,462,583 ns/op 22,680,717,288 B/op 18,302,020 allocs/op +// BenchmarkInsert/datapoints-10,000,000-window-size-100 1,888,510,125 ns/op 2,851,910,864 B/op 11,101,984 allocs/op +// BenchmarkInsert/datapoints-10,000,000-window-size-1000 1,782,219,708 ns/op 869,035,776 B/op 10,381,987 allocs/op +// BenchmarkInsert/datapoints-10,000,000-window-size-10000 4,656,263,666 ns/op 1,162,586,672 B/op 10,353,995 allocs/op +// BenchmarkInsert/datapoints-10,000,000-window-size-100000 5,698,841,500 ns/op 1,248,606,488 B/op 10,356,404 allocs/op func BenchmarkInsert(b *testing.B) { slidingBenchmark("datapoints-10,000", b, 10_000, 1, 10_000) - slidingBenchmark("datapoints-10,000,000", b, 10_000_000, 10, 1000) + slidingBenchmark("datapoints-10,000,000", b, 10_000_000, 10, 100_000) } func generateDatapoints(datapoints int, pointsPerBucket uint64, random UIntGenerator) (output []RecordingValue) { @@ -118,10 +131,11 @@ func slidingBenchmark(baseName string, b *testing.B, datapointCount, startWindow datapoints := generateDatapoints(datapointCount, 1, rand.NewPCG(7, 42)) for windowSize := startWindowSize; windowSize <= maxWindowSize; windowSize *= 10 { b.Run(fmt.Sprintf("%s-window-size-%d", baseName, windowSize), func(b *testing.B) { + b.ReportAllocs() for b.Loop() { stats, _ := NewWindowedTDigest(WindowConfig{ WindowSize: time.Duration(windowSize) * time.Second, - WindowCount: datapointCount / windowSize, + WindowCount: datapointCount, }) for _, dp := range datapoints { stats.RecordMulti(dp.Value, dp.Time, dp.Count) From ecd0bb8d4a88ffb42c3de2c73f66e1db0961b41d Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Wed, 15 Apr 2026 15:23:24 -0700 Subject: [PATCH 7/8] Add resident set size estimation to the benchmark --- common/stats/windowed_tdigest.go | 12 ++ .../windowed_tdigest_performance_test.go | 148 +++++++++++++----- 2 files changed, 118 insertions(+), 42 deletions(-) diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go index f2eee95e558..0661745ee42 100644 --- a/common/stats/windowed_tdigest.go +++ b/common/stats/windowed_tdigest.go @@ -151,6 +151,12 @@ func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest { return merged } +func (w *timeWindowedTDigest) pretouchWindowsForTest(start time.Time) { + for range w.cfg.WindowCount { + w.advanceWindowSimple(start) + } +} + // cloneWindows creates a shallow copy of the ring buffer. // Use this to avoid holding the lock while accessing the windows for aggregated queries. func (w *timeWindowedTDigest) cloneWindows() []timedWindow { @@ -236,6 +242,12 @@ func (w *timeWindowedTDigest) advanceWindow(timestamp time.Time) *timedWindow { func (w *timeWindowedTDigest) advanceWindowSimple(start time.Time) *timedWindow { w.head = w.modInc(w.head) + if curr := &w.windows[w.head]; curr.tdigest != nil { + curr.tdigest.Reset() + curr.start = start + curr.end = start.Add(w.cfg.WindowSize) + return curr + } digest, _ := tdigest.New() window := timedWindow{ tdigest: digest, diff --git a/common/stats/windowed_tdigest_performance_test.go b/common/stats/windowed_tdigest_performance_test.go index 5a774982a09..e2ba7062d2a 100644 --- a/common/stats/windowed_tdigest_performance_test.go +++ b/common/stats/windowed_tdigest_performance_test.go @@ -5,6 +5,7 @@ import ( "math/rand/v2" "testing" "time" + "unsafe" ) // Uses declarations from windowed_tdigest_test.go @@ -59,7 +60,7 @@ func BenchmarkTenMillionRecordQuery(b *testing.B) { WindowSize: time.Duration(windowSize) * time.Second, WindowCount: 10_000_000 / windowSize, }) - datapoints := generateDatapoints(10_000_000, 1, rand.NewPCG(7, 42)) + datapoints := generateDatapoints(1, 10_000_000, 1, rand.NewPCG(7, 42)) for _, dp := range datapoints { stats.RecordMulti(dp.Value, dp.Time, dp.Count) } @@ -86,61 +87,124 @@ func BenchmarkTenMillionRecordQuery(b *testing.B) { // has a nonlinear impact on latency. Observe that increasing datapoint count // for a fixed window size also has a somewhat super-linear impact on latency. // Increasing window rotation by reducing the total window count was tested. -// It did not make a significant (<1%) impact on latency -// - s, ms, us, ns GB, MB, KB, B -// BenchmarkInsert/datapoints-10,000-window-size-1 79,594,851 ns/op 220,978,893 B/op 90,277 allocs/op -// BenchmarkInsert/datapoints-10,000-window-size-10 8,262,293 ns/op 22,684,401 B/op 18,272 allocs/op -// BenchmarkInsert/datapoints-10,000-window-size-100 1,496,739 ns/op 2,852,384 B/op 11,067 allocs/op -// BenchmarkInsert/datapoints-10,000-window-size-1000 1,764,475 ns/op 867,330 B/op 10,344 allocs/op -// BenchmarkInsert/datapoints-10,000-window-size-10000 4,493,649 ns/op 1,161,041 B/op 10,316 allocs/op -// BenchmarkInsert/datapoints-10,000-window-size-100000 -// - s, ms, us, ns GB, MB, KB, B -// BenchmarkInsert/datapoints-10,000,000-window-size-1 -// BenchmarkInsert/datapoints-10,000,000-window-size-10 9,569,462,583 ns/op 22,680,717,288 B/op 18,302,020 allocs/op -// BenchmarkInsert/datapoints-10,000,000-window-size-100 1,888,510,125 ns/op 2,851,910,864 B/op 11,101,984 allocs/op -// BenchmarkInsert/datapoints-10,000,000-window-size-1000 1,782,219,708 ns/op 869,035,776 B/op 10,381,987 allocs/op -// BenchmarkInsert/datapoints-10,000,000-window-size-10000 4,656,263,666 ns/op 1,162,586,672 B/op 10,353,995 allocs/op -// BenchmarkInsert/datapoints-10,000,000-window-size-100000 5,698,841,500 ns/op 1,248,606,488 B/op 10,356,404 allocs/op +// It did not make a significant (<1%) impact on latency. +// Tests were run on a 2025 Macbook Pro M4 Max, your CPU features may differ! +// +// - s, ms, us, ns MB, KB, B GB, MB, KB, B +// BenchmarkInsert/10k-datapoints-retain-all-window-size-10 402,160 ns/op 254,672 B/stats-size-estimate 656,036 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-100 680,058 ns/op 168,272 B/stats-size-estimate 649,773 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-1000 1,668,972 ns/op 159,632 B/stats-size-estimate 647,274 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-10000 4,438,201 ns/op 23,232 B/stats-size-estimate 1,139,031 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-10 11,239,292,208 ns/op 254,389,632 B/stats-size-estimate 22,680,700,936 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-100 1,765,953,792 ns/op 167,989,632 B/stats-size-estimate 2,851,912,088 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-1000 1,723,260,708 ns/op 159,349,632 B/stats-size-estimate ,869,028,368 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-10000 4,644,341,542 ns/op 22,848,592 B/stats-size-estimate 1,162,587,904 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-100000 5,661,283,750 ns/op 2,395,696 B/stats-size-estimate 1,248,610,672 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-10 392,717,680 ns/op 254,560 B/stats-size-estimate 656,047,216 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-100 680,680,917 ns/op 168,160 B/stats-size-estimate 649,804,400 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-1000 1,665,408,958 ns/op 159,520 B/stats-size-estimate 648,923,456 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-10000 4,630,973,833 ns/op 23,168 B/stats-size-estimate 1,115,983,792 B/op-alloc func BenchmarkInsert(b *testing.B) { - slidingBenchmark("datapoints-10,000", b, 10_000, 1, 10_000) - slidingBenchmark("datapoints-10,000,000", b, 10_000_000, 10, 100_000) + slidingBenchmark(benchmarkInput{ + name: "10k-datapoints-retain-all", + b: b, + inputDatapointCount: 10_000, + windowRotations: 1, + startWindowSize: 10, + maxWindowSize: 10_000, + }) + slidingBenchmark(benchmarkInput{ + name: "10M-datapoints-retain-all", + b: b, + inputDatapointCount: 10_000_000, + windowRotations: 1, + startWindowSize: 10, + maxWindowSize: 100_000, + }) + slidingBenchmark(benchmarkInput{ + name: "10M-datapoints-retain-10k", + b: b, + inputDatapointCount: 10_000_000, + windowRotations: 1_000, + startWindowSize: 10, + maxWindowSize: 10_000, + }) } -func generateDatapoints(datapoints int, pointsPerBucket uint64, random UIntGenerator) (output []RecordingValue) { - output = make([]RecordingValue, datapoints) - timeIdx := uint64(1) - for i := range datapoints { - output[i] = RecordingValue{ - Value: float64(i), - Time: n(int(timeIdx)), - Count: random.Uint64() % 100, - } - // Expected-value of pointsPerBucket records per second, with uniform variation - if random.Uint64()%pointsPerBucket == 0 { - timeIdx++ - } - } - return +type benchmarkInput struct { + name string + b *testing.B + inputDatapointCount int + windowRotations int + startWindowSize int + maxWindowSize int } -func slidingBenchmark(baseName string, b *testing.B, datapointCount, startWindowSize, maxWindowSize int) { - if (startWindowSize != 1 && startWindowSize%10 != 0) || maxWindowSize%10 != 0 || - datapointCount%10 != 0 { - b.Skip("startWindowSize, maxWindowSize, datapointCount must be a multiple of 10") +func slidingBenchmark(in benchmarkInput) { + if (in.startWindowSize != 1 && in.startWindowSize%10 != 0) || in.maxWindowSize%10 != 0 || + in.inputDatapointCount%10 != 0 { + in.b.Skip("startWindowSize, maxWindowSize, datapointCount must be a multiple of 10") } - datapoints := generateDatapoints(datapointCount, 1, rand.NewPCG(7, 42)) - for windowSize := startWindowSize; windowSize <= maxWindowSize; windowSize *= 10 { - b.Run(fmt.Sprintf("%s-window-size-%d", baseName, windowSize), func(b *testing.B) { + if in.inputDatapointCount/in.maxWindowSize < in.windowRotations { + in.b.Skip("Not enough datapoints to rotate windows that many times.") + } + datapoints := generateDatapoints(uint64(in.maxWindowSize*in.windowRotations), in.inputDatapointCount, + 1, rand.NewPCG(7, 42)) + for windowSize := in.startWindowSize; windowSize <= in.maxWindowSize; windowSize *= 10 { + in.b.Run(fmt.Sprintf("%s-window-size-%d", in.name, windowSize), func(b *testing.B) { b.ReportAllocs() - for b.Loop() { + for range b.N { + // Need to recreate this each loop stats, _ := NewWindowedTDigest(WindowConfig{ WindowSize: time.Duration(windowSize) * time.Second, - WindowCount: datapointCount, + WindowCount: in.inputDatapointCount / (windowSize * in.windowRotations), }) + stats.(*timeWindowedTDigest).pretouchWindowsForTest(n(1)) + b.StartTimer() for _, dp := range datapoints { stats.RecordMulti(dp.Value, dp.Time, dp.Count) } + b.StopTimer() + b.ReportMetric(float64(estimateTDigestSize(stats.(*timeWindowedTDigest))), "B/stats-size-estimate") } }) } } + +func estimateTDigestSize(impl *timeWindowedTDigest) uintptr { + // Count outer struct size + size := unsafe.Sizeof(*impl) + for _, w := range impl.windows { + // Count the window struct size + size += unsafe.Sizeof(timedWindow{}) + if w.tdigest != nil { + // Count tdigest-struct's size + size += unsafe.Sizeof(*w.tdigest) + // Reverse-engineered from the tdigest package implementation: + // Each tdigest contains a "summary", which contains two slices + // of equal size, "mean" and "count". ForEachCentroid iterates those slices. + w.tdigest.ForEachCentroid(func(_ float64, _ uint64) bool { + size += 16 + return true + }) + } + } + return size +} + +func generateDatapoints(timeStartSeconds uint64, datapoints int, pointsPerBucket uint64, random UIntGenerator) (output []RecordingValue) { + output = make([]RecordingValue, datapoints) + timeIdx := timeStartSeconds + for i := range datapoints { + output[i] = RecordingValue{ + Value: float64(i), + Time: n(int(timeIdx)), + Count: random.Uint64() % 100, + } + // Expected-value of pointsPerBucket records per second, with uniform variation + if random.Uint64()%pointsPerBucket == 0 { + timeIdx++ + } + } + return +} From cfe84382b2f3d3cba42918b171d7467be8068767 Mon Sep 17 00:00:00 2001 From: Nick Beaumont Date: Wed, 15 Apr 2026 15:59:04 -0700 Subject: [PATCH 8/8] Update recommendation --- common/stats/windowed_tdigest.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go index 0661745ee42..ec09ca8f8af 100644 --- a/common/stats/windowed_tdigest.go +++ b/common/stats/windowed_tdigest.go @@ -82,9 +82,10 @@ type ( var _ TimeWindowedStats = (*timeWindowedTDigest)(nil) // NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests. -// Some guidance on sizing the windows/counts: Prefer windows containing 100-1000 datapoints. -// T-Digest is somewhat super-linear with datapoint count, and 1000 is a breakpoint with the -// cost of just creating a new window. See BenchmarkInsert in windowed_tdigest_performance_test.go for details. +// Some guidance on sizing the windows/counts: Prefer windows containing about 10,000 datapoints. +// Insert latency increases by a factor of about 3x from a size of 1k-10k, but the stored size in RAM drops by 7x. +// So, if you want 300 seconds of history on an event that records 1k counts/sec, 3 10-second windows +// is fine. func NewWindowedTDigest(cfg WindowConfig) (TimeWindowedStats, error) { if cfg.WindowCount <= 0 { return nil, errors.New("windowCount must be non-negative")