Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions aggregate/docs/feature-overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -520,13 +520,14 @@ data_ttl = "1m"

真实执行语义:

1. pipeline 顺序执行
2. 第一条给出最终决定的 pipeline 生效
3. 后面的 pipeline 不再执行
1. 先遍历整条 trace 或整组数据里的所有 point/span,记录每条 pipeline 是否至少命中过一次
2. 所有 point/span 扫描完成后,再按 pipeline 顺序选择第一条命中的 pipeline
3. 第一条命中的 pipeline 给出最终决定,后面的 pipeline 不再执行

另外两个容易误判的点:

- 如果条件没有成功解析,`DoAction()` 不会做决定
- `=` 对字符串是精确匹配,`resource = "GET /tmall/**"` 不会把 `**` 当通配符;如果要模糊匹配,请使用 `re(...)` 或 `match`
- 很有可能一个都没有匹配到就结束了,也会被删除掉。所以 **要有百分比采样兜底**

所以如果你想做“默认概率采样”,不要留空条件,写一个始终为真的条件更安全。
Expand Down
59 changes: 35 additions & 24 deletions aggregate/tail-sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,39 +84,20 @@ func evaluatePipelines(td *DataPacket, pipelines []*SamplingPipeline) (bool, *Da
}

ptw := &ptWrap{}
matched := false
var keptPacket *DataPacket
matchedPipelines := make([]bool, len(pipelines))

walkErr := td.WalkRawPBPoints(func(raw []byte) bool {
if err := ptw.Reset(raw); err != nil {
l.Errorf("decode datapacket point failed: %v", err)
keptPacket = nil
matched = false
return false
}

for _, pipeline := range pipelines {
if pipeline == nil {
for idx, pipeline := range pipelines {
if matchedPipelines[idx] {
continue
}

if pipeline.conds == nil && !pipeline.isMatchAllSampling() {
continue
}

if pipeline.conds != nil {
if x := pipeline.conds.Eval(ptw); x < 0 {
continue
}
}

if pipeline.Type == PipelineTypeSampling && pipeline.Rate <= 0 {
continue
}

matched = true
keptPacket = pipelineMatchedPacket(td, pipeline)
return false
matchedPipelines[idx] = pipelineMatchesPoint(ptw, pipeline)
}

return true
Expand All @@ -126,13 +107,43 @@ func evaluatePipelines(td *DataPacket, pipelines []*SamplingPipeline) (bool, *Da
return false, nil
}

return matched, keptPacket
for idx, matched := range matchedPipelines {
if !matched {
continue
}

return true, pipelineMatchedPacket(td, pipelines[idx])
}

return false, nil
}

func (sp *SamplingPipeline) isMatchAllSampling() bool {
return sp != nil && sp.Type == PipelineTypeSampling && sp.Condition == "" && sp.Rate > 0
}

func pipelineMatchesPoint(ptw *ptWrap, pipeline *SamplingPipeline) bool {
if ptw == nil || pipeline == nil {
return false
}

if pipeline.conds == nil && !pipeline.isMatchAllSampling() {
return false
}

if pipeline.conds != nil {
if x := pipeline.conds.Eval(ptw); x < 0 {
return false
}
}

if pipeline.Type == PipelineTypeSampling && pipeline.Rate <= 0 {
return false
}

return true
}

func pipelineMatchedPacket(td *DataPacket, pipeline *SamplingPipeline) *DataPacket {
if td == nil || pipeline == nil {
return nil
Expand Down
89 changes: 86 additions & 3 deletions aggregate/tail-sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/GuanceCloud/cliutils/logger"
"github.com/GuanceCloud/cliutils/point"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPickTrace(t *testing.T) {
Expand Down Expand Up @@ -203,11 +204,11 @@ func TestSamplingPipeline_DoAction(t *testing.T) {
tdIsNil: false,
},
{
name: "test_drop_resource",
name: "test_drop_resource_regex",
fields: fields{
Name: "drop resource",
Type: PipelineTypeCondition,
Condition: "{ resource = \"GET /tmall/**\" }",
Condition: `{ resource = re("^GET /tmall/.+$") }`,
Action: PipelineActionDrop,
},
args: args{
Expand All @@ -224,6 +225,28 @@ func TestSamplingPipeline_DoAction(t *testing.T) {
want: true,
tdIsNil: true,
},
{
name: "test_drop_resource_literal_is_exact_match",
fields: fields{
Name: "drop resource literal",
Type: PipelineTypeCondition,
Condition: `{ resource = "GET /tmall/**" }`,
Action: PipelineActionDrop,
},
args: args{
td: &DataPacket{
GroupIdHash: 123123123123123,
RawGroupId: "123456789",
Source: "ddtrace",
ConfigVersion: 1,
HasError: false,
PointCount: 5,
PointsPayload: MockTrace(),
},
},
want: false,
tdIsNil: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -295,7 +318,7 @@ func MockTrace() []byte {

pt5 := point.NewPoint("ddtrace", point.NewKVs(map[string]interface{}{
"http.server.requests_bucket": float64(10),
"resource": "GET /tmall/**",
"resource": "GET /tmall/123",
"trace_id": "1000000000",
"span_id": "12345678912",
"status": "ok",
Expand All @@ -311,6 +334,66 @@ func MockTrace() []byte {
return payload
}

func TestEvaluatePipelinesTraceWideOrder(t *testing.T) {
t.Run("earlier pipeline wins even when a later span matches it", func(t *testing.T) {
packet := makeTracePacket(t,
map[string]interface{}{"resource": "/keep-first", "trace_id": "trace-wide", "span_id": "span-1", "start_time": time.Now().Unix(), "duration": int64(1)},
map[string]interface{}{"resource": "/drop-later", "trace_id": "trace-wide", "span_id": "span-2", "start_time": time.Now().Unix(), "duration": int64(1)},
)

pipelines := []*SamplingPipeline{
{Name: "drop-later-span", Type: PipelineTypeCondition, Condition: `{ resource = "/drop-later" }`, Action: PipelineActionDrop},
{Name: "sample-rest", Type: PipelineTypeSampling, Condition: `{ 1 = 1 }`, Rate: 1},
}
for _, pipeline := range pipelines {
require.NoError(t, pipeline.Apply())
}

matched, keptPacket := evaluatePipelines(packet, pipelines)
assert.True(t, matched)
assert.Nil(t, keptPacket)
})

t.Run("keep and drop conflicts follow pipeline order", func(t *testing.T) {
packet := makeTracePacket(t,
map[string]interface{}{"resource": "/normal", "trace_id": "trace-conflict", "span_id": "span-1", "status": "error", "start_time": time.Now().Unix(), "duration": int64(1)},
map[string]interface{}{"resource": "/drop-me", "trace_id": "trace-conflict", "span_id": "span-2", "start_time": time.Now().Unix(), "duration": int64(1)},
)

pipelines := []*SamplingPipeline{
{Name: "keep-errors", Type: PipelineTypeCondition, Condition: `{ status = "error" }`, Action: PipelineActionKeep},
{Name: "drop-resource", Type: PipelineTypeCondition, Condition: `{ resource = "/drop-me" }`, Action: PipelineActionDrop},
}
for _, pipeline := range pipelines {
require.NoError(t, pipeline.Apply())
}

matched, keptPacket := evaluatePipelines(packet, pipelines)
assert.True(t, matched)
assert.Same(t, packet, keptPacket)
})
}

func makeTracePacket(t *testing.T, spanFields ...map[string]interface{}) *DataPacket {
t.Helper()

var payload []byte
for _, fields := range spanFields {
pt := point.NewPoint("ddtrace", point.NewKVs(fields), point.CommonLoggingOptions()...)
pt.SetTime(time.Now())
payload = point.AppendPointToPBPointsPayload(payload, pt)
}

return &DataPacket{
GroupIdHash: 123123123123123,
RawGroupId: "trace",
Source: "ddtrace",
ConfigVersion: 1,
PointCount: int32(len(spanFields)),
PointsPayload: payload,
}
}

// TestTailSamplingConfigs_Init 测试配置初始化
func TestTailSamplingConfigs_Init(t *testing.T) {
tests := []struct {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.19
require (
github.com/BurntSushi/toml v1.2.1
github.com/GuanceCloud/pipeline-go v1.0.9-0.20250804083758-0b4dd0f48771
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260408104033-ca6a8390ea4f
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260414025403-ef70fe2798f7
github.com/VictoriaMetrics/easyproto v0.1.4
github.com/aliyun/aliyun-oss-go-sdk v2.1.2+incompatible
github.com/brianvoe/gofakeit/v6 v6.28.0
Expand Down Expand Up @@ -34,7 +34,7 @@ require (
golang.org/x/net v0.16.0
golang.org/x/sys v0.13.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.56.2
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.31.0
gopkg.in/CodapeWild/dd-trace-go.v1 v1.35.17
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down Expand Up @@ -105,7 +105,7 @@ require (
golang.org/x/text v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/uint128 v1.2.0 // indirect
modernc.org/cc/v3 v3.40.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/GuanceCloud/pipeline-go v1.0.9-0.20250804083758-0b4dd0f48771 h1:70uR0
github.com/GuanceCloud/pipeline-go v1.0.9-0.20250804083758-0b4dd0f48771/go.mod h1:ImLVtod1YFBrmFIdhsyf/OWwnwBbXcPBldCjqB93020=
github.com/GuanceCloud/platypus v0.3.3-0.20250528074826-e3130ff5a05c h1:DE7qQ8Vw3+/sbIiRZ+43m9X0cNtfS+NrmVU0htylAao=
github.com/GuanceCloud/platypus v0.3.3-0.20250528074826-e3130ff5a05c/go.mod h1:H9Sol/SI+A9ppJUohdn9m/UA0aiNvh+G0/GnY6IVDnI=
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260408104033-ca6a8390ea4f h1:B1oOM8tp1QdNt021fREg3CjUNMcgkH8O7PevP5Y239E=
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260408104033-ca6a8390ea4f/go.mod h1:qY0WeOypAHikxW4oOXeUsDUGWq1vqe6X6tA5YoXmBmc=
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260414025403-ef70fe2798f7 h1:DvGJeg7ZAHFzzDq8qVWfJkQVxZRgFHGtkMAPZJFRJjk=
github.com/GuanceCloud/tracing-protos/opentelemetry-gen-go v0.0.0-20260414025403-ef70fe2798f7/go.mod h1:RxBKC0A9sEgpp3nppMvxb6BrwQcb/GgFHDsYRODvV98=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/Microsoft/go-winio v0.4.15-0.20190919025122-fc70bd9a86b5/go.mod h1:tTuCMEN+UleMWgg9dVx4Hu52b1bJo+59jBh3ajtinzw=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
Expand Down Expand Up @@ -1317,8 +1317,8 @@ google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEc
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006 h1:mmbq5q8M1t7dhkLw320YK4PsOXm6jdnUAkErImaIqOg=
google.golang.org/genproto v0.0.0-20220920201722-2b89144ce006/go.mod h1:ht8XFiar2npT/g4vkk7O0WYS1sHOHbdujxbEp7CJWbw=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
Expand Down Expand Up @@ -1347,8 +1347,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.56.2 h1:fVRFRnXvU+x6C4IlHZewvJOVHoOv1TUuQyoRsYnB4bI=
google.golang.org/grpc v1.56.2/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
google.golang.org/grpc v1.51.0 h1:E1eGv1FTqoLIdnBCZufiSHgKjlqG6fKFf6pPWtMTh8U=
google.golang.org/grpc v1.51.0/go.mod h1:wgNDFcnuBGmxLKI/qn4T+m5BtEBYXJPvibbUPsAIPww=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 6 additions & 19 deletions vendor/google.golang.org/grpc/CONTRIBUTING.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading