diff --git a/aggregate/algo_count_distinct.go b/aggregate/algo_count_distinct.go index fe54e0e3..cbfe333b 100644 --- a/aggregate/algo_count_distinct.go +++ b/aggregate/algo_count_distinct.go @@ -1,16 +1,27 @@ package aggregate import ( + "fmt" + "hash/fnv" + "math" + "github.com/GuanceCloud/cliutils" "github.com/GuanceCloud/cliutils/point" "github.com/cespare/xxhash/v2" ) +const ( + countDistinctExactLimit = 4096 + countDistinctSketchBits = 1 << 18 +) + type algoCountDistinct struct { MetricBase maxTime int64 - // 使用 map 来存储不重复的值 - distinctValues map[any]struct{} + // Keep exact hashes until cardinality grows too high, then switch to + // a fixed-size bitmap sketch to bound memory. + distinctValues map[uint64]struct{} + sketch []uint64 } // type assertions. @@ -18,9 +29,15 @@ var _ Calculator = &algoCountDistinct{} func (c *algoCountDistinct) Add(x any) { if inst, ok := x.(*algoCountDistinct); ok { - // 合并不重复的值 - for val := range inst.distinctValues { - c.distinctValues[val] = struct{}{} + if inst.sketch != nil { + c.ensureSketch() + for i, word := range inst.sketch { + c.sketch[i] |= word + } + } else { + for val := range inst.distinctValues { + c.addHash(val) + } } if inst.maxTime > c.maxTime { c.maxTime = inst.maxTime @@ -32,7 +49,7 @@ func (c *algoCountDistinct) Aggr() ([]*point.Point, error) { var kvs point.KVs // 统计不重复值的数量 - distinctCount := int64(len(c.distinctValues)) + distinctCount := c.count() kvs = kvs.Add(c.key, distinctCount). Add(c.key+"_count", distinctCount) @@ -50,7 +67,8 @@ func (c *algoCountDistinct) Aggr() ([]*point.Point, error) { func (c *algoCountDistinct) Reset() { c.maxTime = 0 // 清空不重复值集合 - c.distinctValues = make(map[any]struct{}) + c.distinctValues = make(map[uint64]struct{}) + c.sketch = nil } func (c *algoCountDistinct) doHash(h1 uint64) { @@ -65,9 +83,81 @@ func (c *algoCountDistinct) Base() *MetricBase { // 初始化函数,确保 map 被正确创建. func newAlgoCountDistinct(mb MetricBase, maxTime int64, value any) *algoCountDistinct { - return &algoCountDistinct{ + calc := &algoCountDistinct{ MetricBase: mb, maxTime: maxTime, - distinctValues: map[any]struct{}{value: {}}, + distinctValues: make(map[uint64]struct{}, 1), + } + calc.addValue(value) + return calc +} + +func (c *algoCountDistinct) addValue(value any) { + c.addHash(hashDistinctValue(value)) +} + +func (c *algoCountDistinct) addHash(hash uint64) { + if c.sketch != nil { + c.addSketchHash(hash) + return + } + + if c.distinctValues == nil { + c.distinctValues = make(map[uint64]struct{}) + } + if _, ok := c.distinctValues[hash]; ok || len(c.distinctValues) < countDistinctExactLimit { + c.distinctValues[hash] = struct{}{} + return + } + + c.ensureSketch() + c.addSketchHash(hash) +} + +func (c *algoCountDistinct) ensureSketch() { + if c.sketch != nil { + return } + + c.sketch = make([]uint64, countDistinctSketchBits/64) + for hash := range c.distinctValues { + c.addSketchHash(hash) + } + c.distinctValues = nil +} + +func (c *algoCountDistinct) addSketchHash(hash uint64) { + idx := hash % countDistinctSketchBits + c.sketch[idx/64] |= uint64(1) << (idx % 64) +} + +func (c *algoCountDistinct) count() int64 { + if c.sketch == nil { + return int64(len(c.distinctValues)) + } + + zeroBits := 0 + for _, word := range c.sketch { + if word == ^uint64(0) { + continue + } + for bit := 0; bit < 64; bit++ { + if word&(uint64(1)< 0 { + a.count += missing + } if inst.maxTime > a.maxTime { a.maxTime = inst.maxTime @@ -28,14 +35,43 @@ func (a *algoQuantiles) Add(x any) { } } +func newAlgoQuantiles(mb MetricBase, maxTime int64, value float64) *algoQuantiles { + calc := &algoQuantiles{ + MetricBase: mb, + maxTime: maxTime, + } + calc.addValue(value) + return calc +} + +func (a *algoQuantiles) addValue(v float64) { + a.count++ + if len(a.all) < quantileSampleLimit { + a.all = append(a.all, v) + return + } + + j := int(HashCombine(Seed1, uint64(a.count)) % uint64(a.count)) + if j < quantileSampleLimit { + a.all[j] = v + } +} + // GetPercentile 是一个通用方法,用于获取第 p 个百分位数 (0-100). func (a *algoQuantiles) GetPercentile(p float64) float64 { - n := len(a.all) - if n == 0 { + if n := len(a.all); n == 0 { return 0 } sort.Float64s(a.all) + return percentileFromSorted(a.all, p) +} + +func percentileFromSorted(sorted []float64, p float64) float64 { + n := len(sorted) + if n == 0 { + return 0 + } // 2. 将百分比转换为 0-1 之间的比例 fraction := p / 100.0 @@ -47,18 +83,19 @@ func (a *algoQuantiles) GetPercentile(p float64) float64 { // 4. 边界处理与线性插值水电费 if index >= n-1 { - return a.all[n-1] + return sorted[n-1] } - return a.all[index] + rest*(a.all[index+1]-a.all[index]) + return sorted[index] + rest*(sorted[index+1]-sorted[index]) } func (a *algoQuantiles) Aggr() ([]*point.Point, error) { var kvs point.KVs kvs = kvs.Add(a.key+"_count", a.count) + sort.Float64s(a.all) for _, quantile := range a.quantiles { key := fmt.Sprintf("%s_P%.0f", a.key, quantile*100) // %.0f: float to int. - kvs = kvs.Add(key, a.GetPercentile(quantile*100)) + kvs = kvs.Add(key, percentileFromSorted(a.all, quantile*100)) } for _, kv := range a.aggrTags { // NOTE: if same-name tag key exist, apply the last one. @@ -72,7 +109,7 @@ func (a *algoQuantiles) Aggr() ([]*point.Point, error) { func (a *algoQuantiles) Reset() { a.maxTime = 0 - a.all = []float64{} + a.all = nil a.count = 0 } diff --git a/aggregate/algo_stdev.go b/aggregate/algo_stdev.go index 5b425186..4425e271 100644 --- a/aggregate/algo_stdev.go +++ b/aggregate/algo_stdev.go @@ -14,7 +14,9 @@ import ( type algoStdev struct { MetricBase - data []float64 + count int64 + mean float64 + m2 float64 maxTime int64 } @@ -23,8 +25,21 @@ var _ Calculator = &algoStdev{} func (c *algoStdev) Add(x any) { if inst, ok := x.(*algoStdev); ok { - // 合并数据点 - c.data = append(c.data, inst.data...) + if inst.count == 0 { + return + } + + if c.count == 0 { + c.count = inst.count + c.mean = inst.mean + c.m2 = inst.m2 + } else { + delta := inst.mean - c.mean + total := c.count + inst.count + c.mean += delta * float64(inst.count) / float64(total) + c.m2 += inst.m2 + delta*delta*float64(c.count)*float64(inst.count)/float64(total) + c.count = total + } if inst.maxTime > c.maxTime { c.maxTime = inst.maxTime @@ -36,14 +51,13 @@ func (c *algoStdev) Aggr() ([]*point.Point, error) { var kvs point.KVs // 计算标准差 - stdev, err := SampleStdDev(c.data) + stdev, err := c.SampleStdDev() if err != nil { return nil, err } - count := len(c.data) kvs = kvs.Add(c.key, stdev). - Add(c.key+"_count", count) + Add(c.key+"_count", c.count) for _, kv := range c.aggrTags { // NOTE: if same-name tag key exist, apply the last one. @@ -56,7 +70,9 @@ func (c *algoStdev) Aggr() ([]*point.Point, error) { } func (c *algoStdev) Reset() { - c.data = nil + c.count = 0 + c.mean = 0 + c.m2 = 0 c.maxTime = 0 } @@ -70,6 +86,32 @@ func (c *algoStdev) Base() *MetricBase { return &c.MetricBase } +func newAlgoStdev(mb MetricBase, maxTime int64, value float64) *algoStdev { + c := &algoStdev{ + MetricBase: mb, + maxTime: maxTime, + } + c.addValue(value) + return c +} + +func (c *algoStdev) addValue(value float64) { + c.count++ + delta := value - c.mean + c.mean += delta / float64(c.count) + delta2 := value - c.mean + c.m2 += delta * delta2 +} + +func (c *algoStdev) SampleStdDev() (float64, error) { + if c.count < 2 { + return 0, errors.New("the sample standard deviation requires at least two data points") + } + + variance := c.m2 / float64(c.count-1) + return math.Sqrt(variance), nil +} + // SampleStdDev 计算样本标准差(除以 N-1). func SampleStdDev(data []float64) (float64, error) { n := len(data) diff --git a/aggregate/algo_string.go b/aggregate/algo_string.go index 6514bfe6..a2a67e63 100644 --- a/aggregate/algo_string.go +++ b/aggregate/algo_string.go @@ -66,14 +66,14 @@ func formatHistogramBuckets(leBucket map[string]float64) string { return "{" + strings.Join(parts, ", ") + "}" } -func formatDistinctValues(values map[any]struct{}) string { +func formatDistinctValues(values map[uint64]struct{}) string { if len(values) == 0 { return "[]" } parts := make([]string, 0, len(values)) for value := range values { - parts = append(parts, fmt.Sprintf("%T:%v", value, value)) + parts = append(parts, fmt.Sprintf("%016x", value)) } sort.Strings(parts) @@ -142,8 +142,9 @@ func (c *algoHistogram) ToString() string { func (a *algoQuantiles) ToString() string { return fmt.Sprintf( - "algoQuantiles{count=%d max_time=%d quantiles=%s all=%s %s}", + "algoQuantiles{count=%d sample_count=%d max_time=%d quantiles=%s all=%s %s}", a.count, + len(a.all), a.maxTime, formatFloat64Slice(a.quantiles), formatFloat64Slice(a.all), @@ -153,18 +154,20 @@ func (a *algoQuantiles) ToString() string { func (c *algoStdev) ToString() string { return fmt.Sprintf( - "algoStdev{count=%d max_time=%d data=%s %s}", - len(c.data), + "algoStdev{count=%d mean=%g m2=%g max_time=%d %s}", + c.count, + c.mean, + c.m2, c.maxTime, - formatFloat64Slice(c.data), formatMetricBaseForCalc(&c.MetricBase), ) } func (c *algoCountDistinct) ToString() string { return fmt.Sprintf( - "algoCountDistinct{count=%d max_time=%d distinct_values=%s %s}", - len(c.distinctValues), + "algoCountDistinct{count=%d sketch=%t max_time=%d distinct_values=%s %s}", + c.count(), + c.sketch != nil, c.maxTime, formatDistinctValues(c.distinctValues), formatMetricBaseForCalc(&c.MetricBase), diff --git a/aggregate/algo_string_test.go b/aggregate/algo_string_test.go index 2febb17e..332b2820 100644 --- a/aggregate/algo_string_test.go +++ b/aggregate/algo_string_test.go @@ -1,6 +1,7 @@ package aggregate import ( + "fmt" "testing" "time" @@ -44,12 +45,12 @@ func TestCalculatorToString(t *testing.T) { time.Unix(1700000001, 0).UnixNano(), "alice", ) - calc.distinctValues[42] = struct{}{} - calc.distinctValues[true] = struct{}{} + calc.distinctValues[hashDistinctValue(42)] = struct{}{} + calc.distinctValues[hashDistinctValue(true)] = struct{}{} assert.Equal( t, - "algoCountDistinct{count=3 max_time=1700000001000000000 distinct_values=[bool:true, int:42, string:alice] base={name=demo key=user hash=7 window=5s next_wall_time= heap_idx=0 tags=[env=prod]}}", + fmt.Sprintf("algoCountDistinct{count=3 sketch=false max_time=1700000001000000000 distinct_values=%s base={name=demo key=user hash=7 window=5s next_wall_time= heap_idx=0 tags=[env=prod]}}", formatDistinctValues(calc.distinctValues)), calc.ToString(), ) }) diff --git a/aggregate/algo_test.go b/aggregate/algo_test.go index 63f72bbd..258cf701 100644 --- a/aggregate/algo_test.go +++ b/aggregate/algo_test.go @@ -48,13 +48,13 @@ func TestAlgoCountDistinct_Add(t *T.T) { calc1.Add(calc4) // 验证不重复值的数量 - assert.Equal(t, 4, len(calc1.distinctValues)) + assert.Equal(t, int64(4), calc1.count()) // 验证所有值都存在 - _, hasInt := calc1.distinctValues[42] - _, hasFloat := calc1.distinctValues[3.14] - _, hasString := calc1.distinctValues["test_string"] - _, hasBool := calc1.distinctValues[true] + _, hasInt := calc1.distinctValues[hashDistinctValue(42)] + _, hasFloat := calc1.distinctValues[hashDistinctValue(3.14)] + _, hasString := calc1.distinctValues[hashDistinctValue("test_string")] + _, hasBool := calc1.distinctValues[hashDistinctValue(true)] assert.True(t, hasInt, "int value should exist") assert.True(t, hasFloat, "float value should exist") @@ -71,7 +71,7 @@ func TestAlgoCountDistinct_Add(t *T.T) { calc1.Add(calc5) // 不重复值数量应该仍然是4 - assert.Equal(t, 4, len(calc1.distinctValues)) + assert.Equal(t, int64(4), calc1.count()) // 测试Aggr方法 points, err := calc1.Aggr() @@ -115,7 +115,7 @@ func TestAlgoCountDistinct_Add(t *T.T) { calc1.Add(calc2) // 空字符串应该被视为不同的值 - assert.Equal(t, 2, len(calc1.distinctValues)) + assert.Equal(t, int64(2), calc1.count()) }) t.Run("reset-test", func(t *T.T) { @@ -135,16 +135,16 @@ func TestAlgoCountDistinct_Add(t *T.T) { calc2 := newAlgoCountDistinct(mb2, time.Now().UnixNano()+1000, 3.14) calc.Add(calc2) - assert.Equal(t, 2, len(calc.distinctValues)) + assert.Equal(t, int64(2), calc.count()) // 重置 calc.Reset() - assert.Equal(t, 0, len(calc.distinctValues)) + assert.Equal(t, int64(0), calc.count()) assert.Equal(t, int64(0), calc.maxTime) // 重置后应该可以重新添加值 calc.Add(calc2) - assert.Equal(t, 1, len(calc.distinctValues)) + assert.Equal(t, int64(1), calc.count()) }) } diff --git a/aggregate/business_coverage_more_test.go b/aggregate/business_coverage_more_test.go index ff1c3fff..b1f977fa 100644 --- a/aggregate/business_coverage_more_test.go +++ b/aggregate/business_coverage_more_test.go @@ -213,7 +213,7 @@ func TestSmallBusinessHelpersAndStrings(t *testing.T) { &algoMin{MetricBase: base, min: 1, count: 1, maxTime: 1}, &algoHistogram{MetricBase: base, count: 1, val: 1, maxTime: 1, leBucket: map[string]float64{"1": 1}}, &algoQuantiles{MetricBase: base, count: 1, maxTime: 1, quantiles: []float64{0.5}, all: []float64{1}}, - &algoStdev{MetricBase: base, maxTime: 1, data: []float64{1, 2}}, + newAlgoStdev(base, 1, 1), newAlgoCountDistinct(base, 1, "alice"), &algoCountFirst{MetricBase: base, first: 1, firstTime: 1, count: 1}, &algoCountLast{MetricBase: base, last: 1, lastTime: 1, count: 1}, diff --git a/aggregate/calculator.go b/aggregate/calculator.go index c84c7a9f..29be9e0f 100644 --- a/aggregate/calculator.go +++ b/aggregate/calculator.go @@ -141,10 +141,18 @@ func AlignNextWallTime(t time.Time, align time.Duration) int64 { } func newCalculators(batch *AggregationBatch) (res []Calculator) { + if batch == nil || batch.Points == nil || len(batch.AggregationOpts) == 0 { + return nil + } + var ptwrap *point.Point // now = time.Now() for key, algo := range batch.AggregationOpts { + if algo == nil { + continue + } + var extraTags [][2]string // nextWallTime = AlignNextWallTime(now, time.Duration(algo.Window)) @@ -274,12 +282,7 @@ func newCalculators(batch *AggregationBatch) (res []Calculator) { res = append(res, calc) case QUANTILES: - calc := &algoQuantiles{ - count: 1, - all: []float64{f64}, - maxTime: ptwrap.Time().UnixNano(), - MetricBase: mb, - } + calc := newAlgoQuantiles(mb, ptwrap.Time().UnixNano(), f64) if algo.Options != nil { switch opt := algo.Options.(type) { case *AggregationAlgo_QuantileOpts: @@ -292,11 +295,7 @@ func newCalculators(batch *AggregationBatch) (res []Calculator) { res = append(res, calc) case STDEV: - calc := &algoStdev{ - MetricBase: mb, - data: []float64{f64}, - maxTime: ptwrap.Time().UnixNano(), - } + calc := newAlgoStdev(mb, ptwrap.Time().UnixNano(), f64) calc.doHash(batch.RoutingKey) res = append(res, calc) diff --git a/aggregate/calculator_test.go b/aggregate/calculator_test.go index 01892b67..c62e3a43 100644 --- a/aggregate/calculator_test.go +++ b/aggregate/calculator_test.go @@ -94,6 +94,15 @@ func TestNewCalculatorsQuantiles(t *T.T) { assert.Equal(t, []float64{0.5, 0.9}, q.quantiles) } +func TestNewCalculatorsSkipsInvalidBatchShape(t *T.T) { + assert.Empty(t, newCalculators(nil)) + assert.Empty(t, newCalculators(&AggregationBatch{})) + assert.Empty(t, newCalculators(&AggregationBatch{ + Points: &point.PBPoints{}, + AggregationOpts: map[string]*AggregationAlgo{"value": nil}, + })) +} + func TestNewCalculatorsWindowUsesNanoseconds(t *T.T) { pt := point.NewPoint( "demo", diff --git a/aggregate/windows.go b/aggregate/windows.go index f73f1c37..3b3a9d28 100644 --- a/aggregate/windows.go +++ b/aggregate/windows.go @@ -7,6 +7,8 @@ import ( "github.com/GuanceCloud/cliutils/point" ) +const windowCacheReuseMaxEntries = 4096 + type Window struct { lock sync.Mutex // 为每一个window创建一把锁 cache map[uint64]Calculator @@ -16,6 +18,14 @@ type Window struct { // Reset 准备将对象放回池子前调用. func (w *Window) Reset() { w.Token = "" + if w.cache == nil { + w.cache = make(map[uint64]Calculator, 64) + return + } + if len(w.cache) > windowCacheReuseMaxEntries { + w.cache = make(map[uint64]Calculator, 64) + return + } // 清空 Map 但保留底层哈希桶空间 // Go 1.11+ 优化:遍历 delete 所有的 key 是极快的 for k := range w.cache { @@ -36,7 +46,8 @@ func (w *Window) AddCal(cal Calculator) { } type Windows struct { - lock sync.Mutex + lock sync.Mutex + closed bool // 为方便快速定位到用户数据的所在的window需要一个ID表 // token -> Window ID IDs map[string]int @@ -45,20 +56,41 @@ type Windows struct { } func (ws *Windows) AddCal(token string, cal Calculator) { + _ = ws.addCal(token, cal) +} + +func (ws *Windows) addCal(token string, cal Calculator) bool { ws.lock.Lock() + defer ws.lock.Unlock() + if ws.closed { + return false + } + id, ok := ws.IDs[token] if !ok { // 从 Pool 获取对象而非直接 new newW := windowPool.Get().(*Window) + newW.Reset() newW.Token = token id = len(ws.WS) ws.IDs[token] = id ws.WS = append(ws.WS, newW) } - ws.lock.Unlock() ws.WS[id].AddCal(cal) + return true +} + +func (ws *Windows) Close() []*Window { + ws.lock.Lock() + defer ws.lock.Unlock() + + ws.closed = true + windows := append([]*Window(nil), ws.WS...) + ws.IDs = nil + ws.WS = nil + return windows } type Cache struct { @@ -78,14 +110,26 @@ func NewCache(exp time.Duration) *Cache { } func (c *Cache) GetAndSetBucket(exp int64, token string, cal Calculator) { - c.lock.Lock() - ws, ok := c.WindowsBuckets[exp] - if !ok { - ws = &Windows{IDs: make(map[string]int), WS: make([]*Window, 0)} - c.WindowsBuckets[exp] = ws + _ = c.getAndSetBucket(exp, token, cal) +} + +func (c *Cache) getAndSetBucket(exp int64, token string, cal Calculator) bool { + for { + c.lock.Lock() + ws, ok := c.WindowsBuckets[exp] + if !ok { + ws = &Windows{IDs: make(map[string]int), WS: make([]*Window, 0)} + c.WindowsBuckets[exp] = ws + } + c.lock.Unlock() + + if ws.addCal(token, cal) { + return true + } + if time.Now().Unix() >= exp { + return false + } } - c.lock.Unlock() - ws.AddCal(token, cal) } func (c *Cache) AddBatch(token string, batch *AggregationBatch) (n, expN int) { @@ -96,8 +140,11 @@ func (c *Cache) AddBatch(token string, batch *AggregationBatch) (n, expN int) { expN++ continue } - c.GetAndSetBucket(exp, token, cal) - n++ + if c.getAndSetBucket(exp, token, cal) { + n++ + } else { + expN++ + } } return n, expN } @@ -118,7 +165,7 @@ func (c *Cache) GetExpWidows() []*Window { now := time.Now().Unix() for t, ws := range c.WindowsBuckets { if t <= now { - wss = append(wss, ws.WS...) + wss = append(wss, ws.Close()...) delete(c.WindowsBuckets, t) } } @@ -143,11 +190,18 @@ func WindowsToData(ws []*Window) []*PointsData { } pts = append(pts, pbs...) } + if len(pts) == 0 { + window.Reset() + windowPool.Put(window) + continue + } // 每一个用户下的Window 都是一个独立的包 pds = append(pds, &PointsData{ PTS: pts, Token: window.Token, }) + window.Reset() + windowPool.Put(window) } return pds diff --git a/aggregate/windows_more_test.go b/aggregate/windows_more_test.go index 2bb6e197..8a3dfd8e 100644 --- a/aggregate/windows_more_test.go +++ b/aggregate/windows_more_test.go @@ -73,15 +73,10 @@ func TestWindowsToDataSkipsFailedCalculator(t *testing.T) { window := &Window{ Token: "token-a", cache: map[uint64]Calculator{ - 1: &algoStdev{ - MetricBase: MetricBase{key: "latency", name: "request"}, - data: []float64{1}, - }, + 1: newAlgoStdev(MetricBase{key: "latency", name: "request"}, 0, 1), }, } data := WindowsToData([]*Window{window}) - require.Len(t, data, 1) - assert.Equal(t, "token-a", data[0].Token) - assert.Empty(t, data[0].PTS) + assert.Empty(t, data) }