Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 99 additions & 9 deletions aggregate/algo_count_distinct.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
package aggregate

import (
	"fmt"
	"hash/fnv"
	"math"
	"math/bits"

	"github.com/GuanceCloud/cliutils"
	"github.com/GuanceCloud/cliutils/point"
	"github.com/cespare/xxhash/v2"
)

const (
countDistinctExactLimit = 4096
countDistinctSketchBits = 1 << 18
)

// algoCountDistinct is the Calculator that tracks how many distinct values
// have been observed.
type algoCountDistinct struct {
MetricBase
// maxTime is the largest maxTime seen across merged instances.
maxTime int64
// Use a map to store the distinct values.
distinctValues map[any]struct{}
// Keep exact hashes until cardinality grows too high, then switch to
// a fixed-size bitmap sketch to bound memory.
distinctValues map[uint64]struct{}
// sketch is nil while counting exactly; once allocated by ensureSketch it
// is the bitmap that replaces distinctValues.
sketch []uint64
}

// type assertions.
var _ Calculator = &algoCountDistinct{}

func (c *algoCountDistinct) Add(x any) {
if inst, ok := x.(*algoCountDistinct); ok {
// 合并不重复的值
for val := range inst.distinctValues {
c.distinctValues[val] = struct{}{}
if inst.sketch != nil {
c.ensureSketch()
for i, word := range inst.sketch {
c.sketch[i] |= word
}
} else {
for val := range inst.distinctValues {
c.addHash(val)
}
}
if inst.maxTime > c.maxTime {
c.maxTime = inst.maxTime
Expand All @@ -32,7 +49,7 @@ func (c *algoCountDistinct) Aggr() ([]*point.Point, error) {
var kvs point.KVs

// 统计不重复值的数量
distinctCount := int64(len(c.distinctValues))
distinctCount := c.count()

kvs = kvs.Add(c.key, distinctCount).
Add(c.key+"_count", distinctCount)
Expand All @@ -50,7 +67,8 @@ func (c *algoCountDistinct) Aggr() ([]*point.Point, error) {
// Reset clears the accumulated state: maxTime goes back to zero, the
// distinct-value set is replaced with an empty map and the sketch is dropped.
func (c *algoCountDistinct) Reset() {
c.maxTime = 0
// Clear the set of distinct values.
c.distinctValues = make(map[any]struct{})
c.distinctValues = make(map[uint64]struct{})
c.sketch = nil
}

func (c *algoCountDistinct) doHash(h1 uint64) {
Expand All @@ -65,9 +83,81 @@ func (c *algoCountDistinct) Base() *MetricBase {

// newAlgoCountDistinct is the constructor; it guarantees the map is created
// and records value as the first observation.
func newAlgoCountDistinct(mb MetricBase, maxTime int64, value any) *algoCountDistinct {
return &algoCountDistinct{
calc := &algoCountDistinct{
MetricBase: mb,
maxTime: maxTime,
distinctValues: map[any]struct{}{value: {}},
distinctValues: make(map[uint64]struct{}, 1),
}
// Route the first value through addValue so it is hashed like every later one.
calc.addValue(value)
return calc
}

// addValue hashes value with hashDistinctValue and records the resulting
// hash via addHash.
func (c *algoCountDistinct) addValue(value any) {
	hashed := hashDistinctValue(value)
	c.addHash(hashed)
}

// addHash records one value hash.
//
// Once a sketch exists every hash goes straight into it. Otherwise the hash
// is kept exactly in distinctValues while that set holds fewer than
// countDistinctExactLimit entries; the first new hash that would exceed the
// limit converts the set into a sketch and is added there.
func (c *algoCountDistinct) addHash(hash uint64) {
	if c.sketch != nil {
		c.addSketchHash(hash)
		return
	}

	// Writing to a nil map panics, so allocate lazily.
	if c.distinctValues == nil {
		c.distinctValues = make(map[uint64]struct{})
	}

	_, seen := c.distinctValues[hash]
	switch {
	case seen:
		// Already counted; the exact set does not grow.
	case len(c.distinctValues) < countDistinctExactLimit:
		c.distinctValues[hash] = struct{}{}
	default:
		c.ensureSketch()
		c.addSketchHash(hash)
	}
}

// ensureSketch switches the calculator to sketch mode if it is not there
// already: it allocates a bitmap of countDistinctSketchBits bits, replays
// every exact hash into it and then releases the exact set. Calling it when
// a sketch already exists does nothing.
func (c *algoCountDistinct) ensureSketch() {
	if c.sketch == nil {
		// The bitmap must be in place before replaying, since addSketchHash
		// writes into c.sketch.
		c.sketch = make([]uint64, countDistinctSketchBits/64)
		for exact := range c.distinctValues {
			c.addSketchHash(exact)
		}
		c.distinctValues = nil
	}
}

// addSketchHash sets the bitmap bit selected by hash modulo
// countDistinctSketchBits. The sketch must already be allocated.
func (c *algoCountDistinct) addSketchHash(hash uint64) {
	bit := hash % countDistinctSketchBits
	// Each word holds 64 bits: the high part picks the word, the low six
	// bits pick the position inside it.
	word, offset := bit>>6, bit&63
	c.sketch[word] |= uint64(1) << offset
}

// count returns the number of distinct values seen so far.
//
// In exact mode (sketch == nil) it is the size of distinctValues. In sketch
// mode it is the linear-counting estimate -m*ln(zeroBits/m), rounded to the
// nearest integer, where m is countDistinctSketchBits. A bitmap with no zero
// bits left cannot be estimated from, so m itself is returned.
func (c *algoCountDistinct) count() int64 {
	if c.sketch == nil {
		return int64(len(c.distinctValues))
	}

	// Count set bits one word at a time with bits.OnesCount64 rather than
	// testing all 64 positions of every word individually.
	setBits := 0
	for _, word := range c.sketch {
		setBits += bits.OnesCount64(word)
	}

	zeroBits := len(c.sketch)*64 - setBits
	if zeroBits == 0 {
		return countDistinctSketchBits
	}

	m := float64(countDistinctSketchBits)
	estimate := -m * math.Log(float64(zeroBits)/m)
	return int64(estimate + 0.5)
}

// hashDistinctValue returns the 64-bit FNV-1a hash of value rendered as
// "<dynamic type>:<default formatting>". Including the type keeps values
// that print identically but have different types (e.g. int 1 and the
// string "1") apart.
func hashDistinctValue(value any) uint64 {
	rendered := fmt.Sprintf("%T:%v", value, value)

	hasher := fnv.New64a()
	// The write result is deliberately ignored, as in the Fprintf form.
	_, _ = hasher.Write([]byte(rendered))
	return hasher.Sum64()
}
12 changes: 7 additions & 5 deletions aggregate/algo_more_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ func TestHistogramAndStdevCalculators(t *testing.T) {
assert.Zero(t, hist.count)

stdev := &algoStdev{MetricBase: base}
stdev.Add(&algoStdev{data: []float64{2}, maxTime: now.UnixNano()})
stdev.Add(&algoStdev{data: []float64{4}, maxTime: now.Add(time.Second).UnixNano()})
stdev.Add(&algoStdev{data: []float64{6}, maxTime: now.Add(2 * time.Second).UnixNano()})
stdev.Add(newAlgoStdev(MetricBase{}, now.UnixNano(), 2))
stdev.Add(newAlgoStdev(MetricBase{}, now.Add(time.Second).UnixNano(), 4))
stdev.Add(newAlgoStdev(MetricBase{}, now.Add(2*time.Second).UnixNano(), 6))
stdev.Add("ignored")
stdev.doHash(1)
require.NotZero(t, stdev.Base().hash)
Expand All @@ -195,15 +195,17 @@ func TestHistogramAndStdevCalculators(t *testing.T) {
assert.Equal(t, int64(3), count)
assert.Equal(t, "test", pts[0].GetTag("env"))

_, err = (&algoStdev{data: []float64{1}}).Aggr()
_, err = newAlgoStdev(MetricBase{}, 0, 1).Aggr()
assert.Error(t, err)
_, err = SampleStdDev([]float64{1})
assert.Error(t, err)
got, err = SampleStdDev([]float64{1, 2, 3})
require.NoError(t, err)
assert.Equal(t, 1.0, got)
stdev.Reset()
assert.Nil(t, stdev.data)
assert.Zero(t, stdev.count)
assert.Zero(t, stdev.mean)
assert.Zero(t, stdev.m2)
assert.Zero(t, stdev.maxTime)
}

Expand Down
53 changes: 45 additions & 8 deletions aggregate/algo_quantiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/cespare/xxhash/v2"
)

const quantileSampleLimit = 8192

type algoQuantiles struct {
maxTime int64
MetricBase
Expand All @@ -19,23 +21,57 @@ type algoQuantiles struct {

// Add merges another *algoQuantiles into a. Arguments of any other type are
// ignored.
func (a *algoQuantiles) Add(x any) {
if inst, ok := x.(*algoQuantiles); ok {
a.count += inst.count
a.all = append(a.all, inst.all...)
before := a.count
// Feed each incoming sample through addValue, which also bumps a.count.
for _, v := range inst.all {
a.addValue(v)
}
// inst.count may be larger than len(inst.all); add whatever part of
// inst.count the loop above did not already account for.
if missing := inst.count - (a.count - before); missing > 0 {
a.count += missing
}

// Keep the latest timestamp of the two.
if inst.maxTime > a.maxTime {
a.maxTime = inst.maxTime
}
}
}

// newAlgoQuantiles builds an algoQuantiles carrying mb and maxTime, with
// value recorded as its first sample through addValue.
func newAlgoQuantiles(mb MetricBase, maxTime int64, value float64) *algoQuantiles {
	q := new(algoQuantiles)
	q.MetricBase = mb
	q.maxTime = maxTime
	q.addValue(value)
	return q
}

// addValue records one sample. count always grows by one. The sample is
// appended while fewer than quantileSampleLimit samples are stored; after
// that a slot index is derived from HashCombine(Seed1, count) modulo count,
// and the sample overwrites that slot only when the index falls inside the
// stored window. Otherwise the sample is dropped.
func (a *algoQuantiles) addValue(v float64) {
	a.count++

	if len(a.all) >= quantileSampleLimit {
		seen := uint64(a.count)
		slot := int(HashCombine(Seed1, seen) % seen)
		if slot < quantileSampleLimit {
			a.all[slot] = v
		}
		return
	}

	a.all = append(a.all, v)
}

// GetPercentile is a general-purpose method that returns the p-th
// percentile (0-100). It returns 0 when no samples are stored; otherwise it
// sorts a.all in place and delegates to percentileFromSorted.
func (a *algoQuantiles) GetPercentile(p float64) float64 {
n := len(a.all)
if n == 0 {
if n := len(a.all); n == 0 {
return 0
}

sort.Float64s(a.all)
return percentileFromSorted(a.all, p)
}

func percentileFromSorted(sorted []float64, p float64) float64 {
n := len(sorted)
if n == 0 {
return 0
}

// 2. 将百分比转换为 0-1 之间的比例
fraction := p / 100.0
Expand All @@ -47,18 +83,19 @@ func (a *algoQuantiles) GetPercentile(p float64) float64 {

// 4. Boundary handling and linear interpolation.
if index >= n-1 {
return a.all[n-1]
return sorted[n-1]
}
return a.all[index] + rest*(a.all[index+1]-a.all[index])
return sorted[index] + rest*(sorted[index+1]-sorted[index])
}

func (a *algoQuantiles) Aggr() ([]*point.Point, error) {
var kvs point.KVs

kvs = kvs.Add(a.key+"_count", a.count)
sort.Float64s(a.all)
for _, quantile := range a.quantiles {
key := fmt.Sprintf("%s_P%.0f", a.key, quantile*100) // %.0f: float to int.
kvs = kvs.Add(key, a.GetPercentile(quantile*100))
kvs = kvs.Add(key, percentileFromSorted(a.all, quantile*100))
}
for _, kv := range a.aggrTags {
// NOTE: if same-name tag key exist, apply the last one.
Expand All @@ -72,7 +109,7 @@ func (a *algoQuantiles) Aggr() ([]*point.Point, error) {

// Reset clears the accumulated state: maxTime and count go back to zero and
// the stored samples are discarded.
func (a *algoQuantiles) Reset() {
a.maxTime = 0
a.all = []float64{}
a.all = nil
a.count = 0
}

Expand Down
56 changes: 49 additions & 7 deletions aggregate/algo_stdev.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

// algoStdev is the Calculator that produces a sample standard deviation.
type algoStdev struct {
MetricBase
// data holds the raw samples.
data []float64
// count is the number of samples folded in so far.
count int64
// mean is the running mean of those samples.
mean float64
// m2 is the running accumulator that SampleStdDev divides by count-1 to
// obtain the variance.
m2 float64
// maxTime is the largest maxTime seen across merged instances.
maxTime int64
}

Expand All @@ -23,8 +25,21 @@ var _ Calculator = &algoStdev{}

func (c *algoStdev) Add(x any) {
if inst, ok := x.(*algoStdev); ok {
// 合并数据点
c.data = append(c.data, inst.data...)
if inst.count == 0 {
return
}

if c.count == 0 {
c.count = inst.count
c.mean = inst.mean
c.m2 = inst.m2
} else {
delta := inst.mean - c.mean
total := c.count + inst.count
c.mean += delta * float64(inst.count) / float64(total)
c.m2 += inst.m2 + delta*delta*float64(c.count)*float64(inst.count)/float64(total)
c.count = total
}

if inst.maxTime > c.maxTime {
c.maxTime = inst.maxTime
Expand All @@ -36,14 +51,13 @@ func (c *algoStdev) Aggr() ([]*point.Point, error) {
var kvs point.KVs

// 计算标准差
stdev, err := SampleStdDev(c.data)
stdev, err := c.SampleStdDev()
if err != nil {
return nil, err
}

count := len(c.data)
kvs = kvs.Add(c.key, stdev).
Add(c.key+"_count", count)
Add(c.key+"_count", c.count)

for _, kv := range c.aggrTags {
// NOTE: if same-name tag key exist, apply the last one.
Expand All @@ -56,7 +70,9 @@ func (c *algoStdev) Aggr() ([]*point.Point, error) {
}

// Reset clears the accumulated state by zeroing every statistic and maxTime.
func (c *algoStdev) Reset() {
c.data = nil
c.count = 0
c.mean = 0
c.m2 = 0
c.maxTime = 0
}

Expand All @@ -70,6 +86,32 @@ func (c *algoStdev) Base() *MetricBase {
return &c.MetricBase
}

// newAlgoStdev builds an algoStdev carrying mb and maxTime, with value
// folded in as its first sample through addValue.
func newAlgoStdev(mb MetricBase, maxTime int64, value float64) *algoStdev {
	calc := new(algoStdev)
	calc.MetricBase = mb
	calc.maxTime = maxTime
	calc.addValue(value)
	return calc
}

// addValue folds one sample into the running statistics: count grows by
// one, mean moves toward value by (value-mean)/count, and m2 grows by the
// product of the sample's distance from the old mean and from the new mean.
func (c *algoStdev) addValue(value float64) {
	c.count++
	n := float64(c.count)

	fromOldMean := value - c.mean
	c.mean = c.mean + fromOldMean/n

	// Must be computed after the mean has been updated.
	fromNewMean := value - c.mean
	c.m2 = c.m2 + fromOldMean*fromNewMean
}

// SampleStdDev returns the sample standard deviation (dividing by count-1)
// of the values folded in so far. It returns an error when fewer than two
// samples have been recorded.
func (c *algoStdev) SampleStdDev() (float64, error) {
	if c.count >= 2 {
		degreesOfFreedom := float64(c.count - 1)
		return math.Sqrt(c.m2 / degreesOfFreedom), nil
	}
	return 0, errors.New("the sample standard deviation requires at least two data points")
}

// SampleStdDev 计算样本标准差(除以 N-1).
func SampleStdDev(data []float64) (float64, error) {
n := len(data)
Expand Down
Loading
Loading