diff --git a/common/stats/windowed_tdigest.go b/common/stats/windowed_tdigest.go new file mode 100644 index 00000000000..ec09ca8f8af --- /dev/null +++ b/common/stats/windowed_tdigest.go @@ -0,0 +1,285 @@
package stats

import (
	"errors"
	"sync"
	"time"

	"github.com/caio/go-tdigest/v5"
)

type (
	// TimeWindowedStats collects values into fixed-duration time windows,
	// each backed by a t-digest, enabling approximate quantile queries
	// over a sliding time range.
	// Windows are guaranteed to be non-overlapping.
	TimeWindowedStats interface {
		// Record records a value that occurred at the given timestamp.
		// Values whose timestamp is older than every retained window, or that
		// falls into a gap between two windows, are silently dropped.
		Record(value float64, timestamp time.Time)
		// RecordToLatestWindow records a value in the current time window.
		// This will call time.Now(), don't call this in a tight loop.
		RecordToLatestWindow(value float64)
		// RecordMulti records a weighted value that occurred at the given timestamp
		// with the provided count. Imagine you have 3 counts of 10, and you
		// call RecordMulti(20, t, 3). Your new mean will be 15 because you added
		// 3 counts of 20.
		// The same dropping rules as Record apply.
		RecordMulti(value float64, timestamp time.Time, count uint64)
		// RecordMultiToLatestWindow records a weighted value in the current time window.
		// This will call time.Now(), don't call this in a tight loop.
		RecordMultiToLatestWindow(value float64, count uint64)
		// Quantile returns the approximate value at quantile q (0 <= q <= 1).
		// Aggregates across all active windows.
		Quantile(q float64) float64
		// TrimmedMean returns the mean of the values within the given quantile range (0 <= q <= 1).
		// For example, TrimmedMean(0.9, 0.99) returns the average of all the values between the
		// 90th percentile and the 99th percentile.
		// Aggregates across all active windows.
		TrimmedMean(lowerQuantile, upperQuantile float64) float64
		// SubWindowForTime returns a view of the time window containing the given instant.
		// Use this to get detailed information about a specific time window.
		SubWindowForTime(instant time.Time) TimeWindowView
	}

	// TimeWindowView is a read-only view of a single time window.
	TimeWindowView interface {
		// Quantile returns the approximate value at quantile q (0 <= q <= 1)
		Quantile(q float64) float64
		// TrimmedMean returns the mean of the values within the given quantile range (0 <= q <= 1).
		// For example, TrimmedMean(0.9, 0.99) returns the average of all the values between the
		// 90th percentile and the 99th percentile.
		TrimmedMean(lowerQuantile, upperQuantile float64) float64
		// Start returns the beginning of the window (inclusive).
		Start() time.Time
		// End returns the end of the window (exclusive).
		End() time.Time
	}

	// WindowConfig controls the windowing behavior.
	WindowConfig struct {
		// WindowSize is the duration of each window.
		// NewWindowedTDigest rejects sizes it considers too small.
		WindowSize time.Duration
		// WindowCount is the maximum number of windows to retain.
		// It must be greater than zero.
		WindowCount int
		// FillBlankIntervals controls what happens when a value arrives that is newer
		// than the latest window.
		// When true, an empty window is created for every interval between the latest
		// window and the new timestamp, so windows stay contiguous. The empty windows do
		// not affect the statistics, but they do push old windows out of the ring.
		// When false, the new window simply starts at the new timestamp, which can leave
		// a gap; values recorded later with a timestamp inside such a gap are dropped.
		// This matters when WindowSize is shorter than the largest gap in the measured data
		// and individual time-windows are being fetched via TimeWindowedStats.SubWindowForTime.
		FillBlankIntervals bool
	}

	// timeWindowedTDigest is the TimeWindowedStats implementation: a fixed-size
	// ring buffer of per-window t-digests.
	timeWindowedTDigest struct {
		// mu is held by the methods that read or modify windows and head.
		mu      sync.Mutex
		windows []timedWindow // ring buffer, pre-allocated to windowCount
		head    int           // index of the newest window
		cfg     WindowConfig
	}

	// timedWindow is a single ring-buffer slot covering [start, end).
	// It also provides the methods of TimeWindowView.
	// tdigest is nil until the slot is used for the first time.
	timedWindow struct {
		tdigest *tdigest.TDigest
		start   time.Time
		end     time.Time
	}
)

// Compile-time assertion that timeWindowedTDigest implements TimeWindowedStats.
var _ TimeWindowedStats = (*timeWindowedTDigest)(nil)

// NewWindowedTDigest creates a new TimeWindowedStats backed by per-window t-digests.
// Some guidance on sizing the windows/counts: Prefer windows containing about 10,000 datapoints.
// Insert latency increases by a factor of about 3x from a size of 1k-10k, but the stored size in RAM drops by 7x.
+// So, if you want 300 seconds of history on an event that records 1k counts/sec, 3 10-second windows +// is fine. +func NewWindowedTDigest(cfg WindowConfig) (TimeWindowedStats, error) { + if cfg.WindowCount <= 0 { + return nil, errors.New("windowCount must be non-negative") + } + if cfg.WindowSize.Milliseconds() <= 50 { + return nil, errors.New("probable misconfiguration detected: windowSize is too small, consider increasing it to at least 50ms") + } + return &timeWindowedTDigest{ + windows: make([]timedWindow, cfg.WindowCount), + cfg: cfg, + // mu and head both empty + }, nil +} + +func (w *timeWindowedTDigest) Record(value float64, timestamp time.Time) { + w.RecordMulti(value, timestamp, 1) +} + +func (w *timeWindowedTDigest) RecordMulti(value float64, timestamp time.Time, count uint64) { + window, err := w.getOrCreateWindow(timestamp) + if err != nil { + // Drop data for invalid timestamps + return + } + _ = window.tdigest.AddWeighted(value, count) +} + +func (w *timeWindowedTDigest) RecordToLatestWindow(value float64) { + w.RecordMultiToLatestWindow(value, 1) +} + +func (w *timeWindowedTDigest) RecordMultiToLatestWindow(value float64, count uint64) { + window, err := w.getOrCreateWindow(time.Time{}) + if err != nil { + return + } + _ = window.tdigest.AddWeighted(value, count) +} + +func (w *timeWindowedTDigest) SubWindowForTime(instant time.Time) TimeWindowView { + w.mu.Lock() + defer w.mu.Unlock() + window, _ := w.searchWindowsBackwards(instant) + return window +} + +func (w *timeWindowedTDigest) Quantile(q float64) float64 { + return w.getMergedWindows().Quantile(q) +} + +func (w *timeWindowedTDigest) TrimmedMean(lowerQuantile, upperQuantile float64) float64 { + return w.getMergedWindows().TrimmedMean(lowerQuantile, upperQuantile) +} + +// TODO: this is expensive, maybe cache everything but the latest window so we can skip N merges? 
+func (w *timeWindowedTDigest) getMergedWindows() *tdigest.TDigest { + windows := w.cloneWindows() + merged, _ := tdigest.New() + for idx := range windows { + if windows[idx].tdigest != nil { + _ = merged.Merge(windows[idx].tdigest) + } + } + return merged +} + +func (w *timeWindowedTDigest) pretouchWindowsForTest(start time.Time) { + for range w.cfg.WindowCount { + w.advanceWindowSimple(start) + } +} + +// cloneWindows creates a shallow copy of the ring buffer. +// Use this to avoid holding the lock while accessing the windows for aggregated queries. +func (w *timeWindowedTDigest) cloneWindows() []timedWindow { + w.mu.Lock() + defer w.mu.Unlock() + windows := make([]timedWindow, len(w.windows)) + copy(windows, w.windows) + return windows +} + +// getOrCreateWindow returns the window containing the given timestamp, creating a new one if necessary. +// 0 is a valid timestamp and will return the most recent window. +func (w *timeWindowedTDigest) getOrCreateWindow(timestamp time.Time) (*timedWindow, error) { + w.mu.Lock() + defer w.mu.Unlock() + if timestamp.IsZero() { + timestamp = time.Now() + } + candidate, err := w.searchWindowsBackwards(timestamp) + if err == nil { + return candidate, nil + } + if errors.Is(err, errTooNew) { + return w.advanceWindow(timestamp), nil + } + // err is errTooOld or errInGap + return nil, err +} + +var errTooOld = errors.New("time was older than the earliest window") +var errTooNew = errors.New("time was newer than the latest window") +var errInGap = errors.New("time was in a gap between windows") + +// Precondition: w.mu is held. +// Returns the window containing the given timestamp, or error if no window exists. 
+func (w *timeWindowedTDigest) searchWindowsBackwards(timestamp time.Time) (*timedWindow, error) { + latest := w.windows[w.head] + if !timestamp.Before(latest.start) { + // If the requested timestamp is after the latest window, no point in searching + if !timestamp.Before(latest.end) { + return nil, errTooNew + } + return &latest, nil + } + for idx := w.modDec(w.head); idx != w.head; idx = w.modDec(idx) { + candidate := w.windows[idx] + // Window start is inclusive, end is exclusive. We're iterating + // backwards in time, so the first window that matches is the one we want. + if !timestamp.Before(candidate.start) { + // The first window that matches might be too short to include this timestamp. + // Make sure the timestamp is actually in the window. + if !timestamp.Before(w.windows[idx].end) { + return nil, errInGap + } + return &candidate, nil + } + } + // All the windows are newer than the requested timestamp. + return nil, errTooOld +} + +// Precondition: w.mu is held. +func (w *timeWindowedTDigest) advanceWindow(timestamp time.Time) *timedWindow { + if w.cfg.FillBlankIntervals { + // Fill in all the intervening blank windows. + // The empty digests will not affect the stats, but they will drop old windows. + lastWindow := &w.windows[w.head] + if lastWindow.tdigest == nil { + // Special-case, the latest window is uninitialized. Just take it. 
+ return w.advanceWindowSimple(timestamp) + } + for { + lastWindow = w.advanceWindowSimple(lastWindow.end) + // Need a do-while here to enforce that lastWindow contains timestamp + if timestamp.Before(lastWindow.end) { + break + } + } + return lastWindow + } + return w.advanceWindowSimple(timestamp) +} + +func (w *timeWindowedTDigest) advanceWindowSimple(start time.Time) *timedWindow { + w.head = w.modInc(w.head) + if curr := &w.windows[w.head]; curr.tdigest != nil { + curr.tdigest.Reset() + curr.start = start + curr.end = start.Add(w.cfg.WindowSize) + return curr + } + digest, _ := tdigest.New() + window := timedWindow{ + tdigest: digest, + start: start, + end: start.Add(w.cfg.WindowSize), + } + w.windows[w.head] = window + return &window +} + +// modulo-increment, for wrapping around the ring buffer +func (w *timeWindowedTDigest) modInc(idx int) int { + return (idx + 1) % w.cfg.WindowCount +} + +// modulo-decrement, for wrapping around the ring buffer. +func (w *timeWindowedTDigest) modDec(idx int) int { + return (idx - 1 + w.cfg.WindowCount) % w.cfg.WindowCount +} + +func (w *timedWindow) Start() time.Time { + return w.start +} +func (w *timedWindow) End() time.Time { + return w.end +} + +func (w *timedWindow) Quantile(q float64) float64 { + return w.tdigest.Quantile(q) +} + +func (w *timedWindow) TrimmedMean(lowerQuantile, upperQuantile float64) float64 { + return w.tdigest.TrimmedMean(lowerQuantile, upperQuantile) +} diff --git a/common/stats/windowed_tdigest_performance_test.go b/common/stats/windowed_tdigest_performance_test.go new file mode 100644 index 00000000000..e2ba7062d2a --- /dev/null +++ b/common/stats/windowed_tdigest_performance_test.go @@ -0,0 +1,210 @@ +package stats + +import ( + "fmt" + "math/rand/v2" + "testing" + "time" + "unsafe" +) + +// Uses declarations from windowed_tdigest_test.go +// func n(seconds int) time.Time +// RecordingValue struct { +// Value float64 +// Time time.Time +// Count uint64 +// } + +type UIntGenerator interface { + 
Uint64() uint64 +} + +// - ms, us, ns +// BenchmarkNewWindow 8,058,719 ns/op +// 8ms to add 1000 new windows each containing a single datapoint. +// For windows sized to more than a few milliseconds, this means window +// creation will not be a bottleneck. +func BenchmarkNewWindow(b *testing.B) { + var i int + for b.Loop() { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 300, + }) + statsImpl := stats.(*timeWindowedTDigest) + for range 1000 { + now := n(i) + statsImpl.RecordMulti(float64(i), now, 52) + i++ + } + } +} + +// - s, ms, us, ns +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-10 11,655,230,875 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-10 11,743,078,833 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-10 1,795,897 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-100 5,297,755,167 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-100 4,950,984,583 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-100 144,574 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-1000 3,844,660,708 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-1000 3,646,952,708 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-1000 13,883 ns/op +// BenchmarkTenMillionRecordQuery/trimmed-mean-size-10000 473,836,570 ns/op +// BenchmarkTenMillionRecordQuery/quantile-size-10000 473,955,278 ns/op +// BenchmarkTenMillionRecordQuery/subwindow-quantile-size-10000 798,834 ns/op +func BenchmarkTenMillionRecordQuery(b *testing.B) { + for windowSize := 10; windowSize <= 10_000; windowSize *= 10 { + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: time.Duration(windowSize) * time.Second, + WindowCount: 10_000_000 / windowSize, + }) + datapoints := generateDatapoints(1, 10_000_000, 1, rand.NewPCG(7, 42)) + for _, dp := range datapoints { + stats.RecordMulti(dp.Value, dp.Time, dp.Count) + } + b.Run(fmt.Sprintf("trimmed-mean-size-%d", windowSize), func(b 
*testing.B) { + for b.Loop() { + stats.TrimmedMean(0, 1.0) + } + }) + b.Run(fmt.Sprintf("quantile-size-%d", windowSize), func(b *testing.B) { + for b.Loop() { + stats.Quantile(0.9) + } + }) + b.Run(fmt.Sprintf("subwindow-quantile-size-%d", windowSize), func(b *testing.B) { + for b.Loop() { + stats.SubWindowForTime(datapoints[9_000_000].Time).Quantile(0.9) + } + }) + } +} + +// How to read this chart: Lower numbers are better. There are two datasets tested, +// 10k and 10M. Observe that increasing window size for a fixed datapoint count +// has a nonlinear impact on latency. Observe that increasing datapoint count +// for a fixed window size also has a somewhat super-linear impact on latency. +// Increasing window rotation by reducing the total window count was tested. +// It did not make a significant (<1%) impact on latency. +// Tests were run on a 2025 Macbook Pro M4 Max, your CPU features may differ! +// +// - s, ms, us, ns MB, KB, B GB, MB, KB, B +// BenchmarkInsert/10k-datapoints-retain-all-window-size-10 402,160 ns/op 254,672 B/stats-size-estimate 656,036 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-100 680,058 ns/op 168,272 B/stats-size-estimate 649,773 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-1000 1,668,972 ns/op 159,632 B/stats-size-estimate 647,274 B/op-alloc +// BenchmarkInsert/10k-datapoints-retain-all-window-size-10000 4,438,201 ns/op 23,232 B/stats-size-estimate 1,139,031 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-10 11,239,292,208 ns/op 254,389,632 B/stats-size-estimate 22,680,700,936 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-100 1,765,953,792 ns/op 167,989,632 B/stats-size-estimate 2,851,912,088 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-1000 1,723,260,708 ns/op 159,349,632 B/stats-size-estimate ,869,028,368 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-10000 4,644,341,542 ns/op 22,848,592 
B/stats-size-estimate 1,162,587,904 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-all-window-size-100000 5,661,283,750 ns/op 2,395,696 B/stats-size-estimate 1,248,610,672 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-10 392,717,680 ns/op 254,560 B/stats-size-estimate 656,047,216 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-100 680,680,917 ns/op 168,160 B/stats-size-estimate 649,804,400 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-1000 1,665,408,958 ns/op 159,520 B/stats-size-estimate 648,923,456 B/op-alloc +// BenchmarkInsert/10M-datapoints-retain-10k-window-size-10000 4,630,973,833 ns/op 23,168 B/stats-size-estimate 1,115,983,792 B/op-alloc +func BenchmarkInsert(b *testing.B) { + slidingBenchmark(benchmarkInput{ + name: "10k-datapoints-retain-all", + b: b, + inputDatapointCount: 10_000, + windowRotations: 1, + startWindowSize: 10, + maxWindowSize: 10_000, + }) + slidingBenchmark(benchmarkInput{ + name: "10M-datapoints-retain-all", + b: b, + inputDatapointCount: 10_000_000, + windowRotations: 1, + startWindowSize: 10, + maxWindowSize: 100_000, + }) + slidingBenchmark(benchmarkInput{ + name: "10M-datapoints-retain-10k", + b: b, + inputDatapointCount: 10_000_000, + windowRotations: 1_000, + startWindowSize: 10, + maxWindowSize: 10_000, + }) +} + +type benchmarkInput struct { + name string + b *testing.B + inputDatapointCount int + windowRotations int + startWindowSize int + maxWindowSize int +} + +func slidingBenchmark(in benchmarkInput) { + if (in.startWindowSize != 1 && in.startWindowSize%10 != 0) || in.maxWindowSize%10 != 0 || + in.inputDatapointCount%10 != 0 { + in.b.Skip("startWindowSize, maxWindowSize, datapointCount must be a multiple of 10") + } + if in.inputDatapointCount/in.maxWindowSize < in.windowRotations { + in.b.Skip("Not enough datapoints to rotate windows that many times.") + } + datapoints := generateDatapoints(uint64(in.maxWindowSize*in.windowRotations), 
in.inputDatapointCount, + 1, rand.NewPCG(7, 42)) + for windowSize := in.startWindowSize; windowSize <= in.maxWindowSize; windowSize *= 10 { + in.b.Run(fmt.Sprintf("%s-window-size-%d", in.name, windowSize), func(b *testing.B) { + b.ReportAllocs() + for range b.N { + // Need to recreate this each loop + stats, _ := NewWindowedTDigest(WindowConfig{ + WindowSize: time.Duration(windowSize) * time.Second, + WindowCount: in.inputDatapointCount / (windowSize * in.windowRotations), + }) + stats.(*timeWindowedTDigest).pretouchWindowsForTest(n(1)) + b.StartTimer() + for _, dp := range datapoints { + stats.RecordMulti(dp.Value, dp.Time, dp.Count) + } + b.StopTimer() + b.ReportMetric(float64(estimateTDigestSize(stats.(*timeWindowedTDigest))), "B/stats-size-estimate") + } + }) + } +} + +func estimateTDigestSize(impl *timeWindowedTDigest) uintptr { + // Count outer struct size + size := unsafe.Sizeof(*impl) + for _, w := range impl.windows { + // Count the window struct size + size += unsafe.Sizeof(timedWindow{}) + if w.tdigest != nil { + // Count tdigest-struct's size + size += unsafe.Sizeof(*w.tdigest) + // Reverse-engineered from the tdigest package implementation: + // Each tdigest contains a "summary", which contains two slices + // of equal size, "mean" and "count". ForEachCentroid iterates those slices. 
+ w.tdigest.ForEachCentroid(func(_ float64, _ uint64) bool { + size += 16 + return true + }) + } + } + return size +} + +func generateDatapoints(timeStartSeconds uint64, datapoints int, pointsPerBucket uint64, random UIntGenerator) (output []RecordingValue) { + output = make([]RecordingValue, datapoints) + timeIdx := timeStartSeconds + for i := range datapoints { + output[i] = RecordingValue{ + Value: float64(i), + Time: n(int(timeIdx)), + Count: random.Uint64() % 100, + } + // Expected-value of pointsPerBucket records per second, with uniform variation + if random.Uint64()%pointsPerBucket == 0 { + timeIdx++ + } + } + return +} diff --git a/common/stats/windowed_tdigest_test.go b/common/stats/windowed_tdigest_test.go new file mode 100644 index 00000000000..b359d9269bd --- /dev/null +++ b/common/stats/windowed_tdigest_test.go @@ -0,0 +1,235 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func n(seconds int) time.Time { + return time.Unix(int64(seconds), 0) +} + +type ( + RecordingValue struct { + Value float64 + Time time.Time + Count uint64 + } + TimeWindowedStatsTestCase struct { + Name string + WindowConfig WindowConfig + Expectations []TestExpectation + RecordingValues []RecordingValue + } + TestExpectation func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) +) + +// TestWindowConfig is the default window configuration used in tests. +// It plays nicely with the "n" function above. 
+var TestWindowConfig = WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 10, +} +var TestWindowConfigBlanks = WindowConfig{ + WindowSize: 1 * time.Second, + WindowCount: 10, + FillBlankIntervals: true, +} + +func computeSimpleAverage(values []RecordingValue) map[int]RecordingValue { + averages := make(map[int]RecordingValue) + for _, value := range values { + existing := averages[value.Time.Second()] + averages[value.Time.Second()] = RecordingValue{ + existing.Value + value.Value, + existing.Time, + existing.Count + value.Count, + } + } + return averages +} + +func countNonEmptyWindows(stats *timeWindowedTDigest) int { + count := 0 + for _, bucket := range stats.windows { + if bucket.tdigest != nil { + count++ + } + } + return count +} + +func incrementingData(start time.Time, count int) []RecordingValue { + RecordingValues := make([]RecordingValue, count) + for i := range count { + RecordingValues[i] = RecordingValue{ + Value: float64(i), + Time: start.Add(time.Duration(i) * time.Second), + Count: 1, + } + } + return RecordingValues +} + +func TestWindowedDigest(t *testing.T) { + // Use when unique windows are less than the max window count + simpleExpectations := []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + averages := computeSimpleAverage(tc.RecordingValues) + require.Equal(t, len(averages), countNonEmptyWindows(stats)) + for bucket, value := range averages { + window := stats.SubWindowForTime(n(bucket)) + require.InDelta(t, value.Value/float64(value.Count), window.TrimmedMean(0, 1.0), 0.01) + } + }, + } + + testCases := []TimeWindowedStatsTestCase{ + {"empty", TestWindowConfig, simpleExpectations, nil}, + {"single-value", TestWindowConfig, simpleExpectations, []RecordingValue{ + {float64(10), n(1), 1}, + }}, + {"full-buckets", TestWindowConfig, simpleExpectations, + incrementingData(n(1), 10), + }, + {"overflow", TestWindowConfig, []TestExpectation{ + func(t *testing.T, tc 
TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 10, countNonEmptyWindows(stats)) + // datapoints will be 10-19, giving us max=19, min=10, avg=14.5 + require.InDelta(t, 14.5, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 19, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) + }, + }, incrementingData(n(1), 20)}, + {"blank-drops-old-data", TestWindowConfigBlanks, []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 10, countNonEmptyWindows(stats)) + // avg=max=min because the old datapoint expired + require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(0.0), 0.01) + }}, []RecordingValue{ + {float64(10), n(1), 1}, + {float64(20), n(11), 1}, + }}, + {"fill-blanks-simple", TestWindowConfigBlanks, []TestExpectation{ + func(t *testing.T, tc TimeWindowedStatsTestCase, stats *timeWindowedTDigest) { + require.Equal(t, 5, countNonEmptyWindows(stats)) + require.InDelta(t, 15, stats.TrimmedMean(0, 1.0), 0.01) + require.InDelta(t, 20, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) + }}, []RecordingValue{ + {float64(10), n(1), 1}, + {float64(20), n(5), 1}, + }}, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + stats, _ := NewWindowedTDigest(tc.WindowConfig) + for _, value := range tc.RecordingValues { + stats.RecordMulti(value.Value, value.Time, value.Count) + } + for _, expectation := range tc.Expectations { + expectation(t, tc, stats.(*timeWindowedTDigest)) + } + }) + } +} + +func TestWindowedDigest_OldTimestampDropped(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // Record at t=5, then try to record at t=1 which is before the earliest window. 
+ stats.Record(100, n(5)) + stats.Record(999, n(1)) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) + require.InDelta(t, 100, stats.Quantile(0.5), 0.01) +} + +func TestWindowedDigest_GapTimestampDropped(t *testing.T) { + // Without FillBlankIntervals, a gap between windows causes the value to be dropped. + stats, _ := NewWindowedTDigest(TestWindowConfig) + // Create window at t=1 (covers [1,2)), then advance to t=5 (covers [5,6)). + // t=3 falls in the gap [2,5) with no window. + stats.Record(10, n(1)) + stats.Record(50, n(5)) + stats.Record(999, n(3)) // should be dropped (in gap) + td := stats + require.Equal(t, 2, countNonEmptyWindows(td.(*timeWindowedTDigest))) + require.InDelta(t, 30, stats.TrimmedMean(0, 1.0), 0.01) +} + +func TestWindowedDigest_WindowBoundaryInclusiveExclusive(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // Record at t=1, creating window [1s, 2s). + stats.Record(10, n(1)) + td := stats + + // t=1 (start) is inclusive — should find the window. + w := td.SubWindowForTime(n(1)) + require.NotNil(t, w) + require.InDelta(t, 10, w.Quantile(0.5), 0.01) + + // t=2 (end) is exclusive — should NOT find the window. + w2 := td.SubWindowForTime(n(2)) + require.Nil(t, w2) +} + +func TestWindowedDigest_SubWindowForTimeMissing(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // No data recorded yet — all windows are uninitialized. + w := stats.SubWindowForTime(n(5)) + require.Nil(t, w) +} + +func TestWindowedDigest_RecordMultiWeighted(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // RecordMulti with count=5 should weight the value heavily. 
+ stats.RecordMulti(10, n(1), 5) + stats.RecordMulti(20, n(1), 1) + // Weighted mean: (10*5 + 20*1) / 6 = 11.67 + require.InDelta(t, 11.67, stats.TrimmedMean(0, 1.0), 0.5) +} + +func TestWindowedDigest_MultipleValuesInSameWindow(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // All values land in the same 1-second window [1s, 2s). + halfwidth := 500 * time.Millisecond + stats.Record(10, n(1)) + stats.Record(20, n(1).Add(halfwidth)) + stats.Record(30, n(1).Add(halfwidth)) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) + require.InDelta(t, 20, stats.TrimmedMean(0, 1.0), 0.01) // (10+20+30)/3 + require.InDelta(t, 30, stats.Quantile(1.0), 0.01) + require.InDelta(t, 10, stats.Quantile(0.0), 0.01) +} + +func TestWindowedDigest_RingBufferWrapPreservesNewest(t *testing.T) { + cfg := WindowConfig{WindowSize: 1 * time.Second, WindowCount: 3} + stats, _ := NewWindowedTDigest(cfg) + // Insert 5 values into 5 different windows; ring buffer holds 3. + for i := 1; i <= 5; i++ { + stats.Record(float64(i*10), n(i)) + } + td := stats + require.Equal(t, 3, countNonEmptyWindows(td.(*timeWindowedTDigest))) + // Windows 3, 4, 5 should survive (values 30, 40, 50). + require.InDelta(t, 50, stats.Quantile(1.0), 0.01) + require.InDelta(t, 30, stats.Quantile(0.0), 0.01) + require.InDelta(t, 40, stats.TrimmedMean(0, 1.0), 0.01) +} + +func TestWindowedDigest_RecordToLatestWindow(t *testing.T) { + stats, _ := NewWindowedTDigest(TestWindowConfig) + // RecordToLatestWindow uses time.Now(), so it should create a window. 
+ stats.RecordToLatestWindow(42) + stats.RecordMultiToLatestWindow(100, 3) + td := stats + require.Equal(t, 1, countNonEmptyWindows(td.(*timeWindowedTDigest))) + // Weighted mean: (42*1 + 100*3) / 4 = 85.5 + require.InDelta(t, 85.5, stats.TrimmedMean(0, 1.0), 0.01) +} diff --git a/go.mod b/go.mod index a8031c011ee..9d9427390c5 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/aws/aws-sdk-go v1.55.8 github.com/blang/semver/v4 v4.0.0 github.com/cactus/go-statsd-client/v5 v5.1.0 + github.com/caio/go-tdigest/v5 v5.0.0 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/emirpasic/gods v1.18.1 github.com/fatih/color v1.18.0 diff --git a/go.sum b/go.sum index fa7633c649a..41b859e8021 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,8 @@ github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go. github.com/cactus/go-statsd-client/v4 v4.0.0/go.mod h1:m73kwJp6TN0Ja9P6ycdZhWM1MlfxY/95WZ//IptPQ+Y= github.com/cactus/go-statsd-client/v5 v5.1.0 h1:sbbdfIl9PgisjEoXzvXI1lwUKWElngsjJKaZeC021P4= github.com/cactus/go-statsd-client/v5 v5.1.0/go.mod h1:COEvJ1E+/E2L4q6QE5CkjWPi4eeDw9maJBMIuMPBZbY= +github.com/caio/go-tdigest/v5 v5.0.0 h1:XQKgYSazZPbWFDAJ51dKqoZoDrISmTrB8UcWwCmfo6Y= +github.com/caio/go-tdigest/v5 v5.0.0/go.mod h1:wI618wZoAYzIDZlpX2CfyTQdrdGtwEZOJuXdrI3zk/Y= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -261,6 +263,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/leesper/go_rng 
v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4= +github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=