-
Notifications
You must be signed in to change notification settings - Fork 515
feat: trace context propagation behavior #4635
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1679462
721ee34
ec9d1fd
7166cb4
a31dfe1
e5d5cf4
d502c97
e74e765
6344be5
dad691e
502e5fc
c1876e6
69414fc
e86bc01
b3d13f8
b856236
abc8508
3b072e0
13b2517
f240849
9857535
0e1e163
7aef04e
564bc64
acbd99a
7a49acc
c0d292e
adf7691
cb9a60b
72b092f
5c35603
49391b2
463dda0
cb0dc18
5a09f0a
3f504cb
9ce00be
920565f
ddd05ad
b16c4d8
5bfb26b
50b4212
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,6 +69,22 @@ func (c TextMapCarrier) ForeachKey(handler func(key, val string) error) error { | |
| } | ||
|
|
||
| const ( | ||
| // headerPropagationBehaviorExtract specifies how to handle incoming trace | ||
| // context. Allowed values: | ||
| // - "continue" (default): Continue the trace from incoming headers. | ||
| // Baggage is propagated. | ||
| // - "restart": Start a new trace with a new trace ID and sampling | ||
| // decision. The incoming context is referenced via a span link. | ||
| // Baggage is propagated. | ||
| // - "ignore": Start a new trace with a new trace ID and sampling | ||
| // decision. No span links are created. Baggage is dropped. | ||
| headerPropagationBehaviorExtract = "DD_TRACE_PROPAGATION_BEHAVIOR_EXTRACT" | ||
|
|
||
| propagationBehaviorExtractContinue = "continue" | ||
| propagationBehaviorExtractRestart = "restart" | ||
| propagationBehaviorExtractIgnore = "ignore" | ||
|
|
||
| headerPropagationExtractFirst = "DD_TRACE_PROPAGATION_EXTRACT_FIRST" | ||
| headerPropagationStyleInject = "DD_TRACE_PROPAGATION_STYLE_INJECT" | ||
| headerPropagationStyleExtract = "DD_TRACE_PROPAGATION_STYLE_EXTRACT" | ||
| headerPropagationStyle = "DD_TRACE_PROPAGATION_STYLE" | ||
|
|
@@ -166,7 +182,17 @@ func NewPropagator(cfg *PropagatorConfig, propagators ...Propagator) Propagator | |
| cfg.BaggageHeader = DefaultBaggageHeader | ||
| } | ||
| cp := new(chainedPropagator) | ||
| cp.onlyExtractFirst = internal.BoolEnv("DD_TRACE_PROPAGATION_EXTRACT_FIRST", false) | ||
| cp.onlyExtractFirst = internal.BoolEnv(headerPropagationExtractFirst, false) | ||
| cp.propagationBehaviorExtract = env.Get(headerPropagationBehaviorExtract) | ||
| switch cp.propagationBehaviorExtract { | ||
| case propagationBehaviorExtractContinue, propagationBehaviorExtractRestart, propagationBehaviorExtractIgnore: | ||
| // valid | ||
| default: | ||
| if cp.propagationBehaviorExtract != "" { | ||
| log.Warn("unrecognized propagation behavior: %s. Defaulting to continue", cp.propagationBehaviorExtract) | ||
| } | ||
|
Comment on lines
+191
to
+193
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| cp.propagationBehaviorExtract = propagationBehaviorExtractContinue | ||
| } | ||
| if len(propagators) > 0 { | ||
| cp.injectors = propagators | ||
| cp.extractors = propagators | ||
|
|
@@ -183,11 +209,12 @@ func NewPropagator(cfg *PropagatorConfig, propagators ...Propagator) Propagator | |
| // When injecting, all injectors are called to propagate the span context. | ||
| // When extracting, it tries each extractor, selecting the first successful one. | ||
| type chainedPropagator struct { | ||
| injectors []Propagator | ||
| extractors []Propagator | ||
| injectorNames string | ||
| extractorsNames string | ||
| onlyExtractFirst bool // value of DD_TRACE_PROPAGATION_EXTRACT_FIRST | ||
| injectors []Propagator | ||
| extractors []Propagator | ||
| injectorNames string | ||
| extractorsNames string | ||
| onlyExtractFirst bool // value of DD_TRACE_PROPAGATION_EXTRACT_FIRST | ||
| propagationBehaviorExtract string // value of DD_TRACE_PROPAGATION_BEHAVIOR_EXTRACT | ||
| } | ||
|
|
||
| // getPropagators returns a list of propagators based on ps, which is a comma seperated | ||
|
|
@@ -277,12 +304,86 @@ func (p *chainedPropagator) Inject(spanCtx *SpanContext, carrier any) error { | |
| // subsequent trace context has conflicting trace information, such information will | ||
| // be relayed in the returned SpanContext with a SpanLink. | ||
| func (p *chainedPropagator) Extract(carrier any) (*SpanContext, error) { | ||
| if p.propagationBehaviorExtract == propagationBehaviorExtractIgnore { | ||
| return nil, nil | ||
| } | ||
|
Comment on lines
+307
to
+309
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm still uncertain of this as we might break the implicit contract of not returning Internally we're fine, all callers guard against We should at least document this change in the doc of the function. |
||
|
|
||
| incomingCtx, err := p.extractIncomingSpanContext(carrier) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| // "restart" propagation behavior starts a new trace with a new trace ID | ||
| // and sampling decision. The incoming context is referenced via a span | ||
| // link. Baggage is propagated. | ||
| if p.propagationBehaviorExtract == propagationBehaviorExtractRestart { | ||
| ctx := &SpanContext{ | ||
| baggageOnly: true, // signals spanStart to generate new traceID/spanID | ||
| } | ||
|
|
||
| link := SpanLink{ | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should guard against an If we don't we're going to create a spanlink with |
||
| TraceID: incomingCtx.TraceIDLower(), | ||
| TraceIDHigh: incomingCtx.TraceIDUpper(), | ||
| SpanID: incomingCtx.SpanID(), | ||
| Attributes: map[string]string{ | ||
| "reason": "propagation_behavior_extract", | ||
| "context_headers": getPropagatorName(p.extractors[0]), | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The RFC states
Is that enough here or should we return the propagators that actually extracted the context ? If the carrier has only W3C headers (no DD headers), the DD propagator fails, tracecontext succeeds, but the span link says "context_headers": "datadog". |
||
| }, | ||
| } | ||
| if trace := incomingCtx.trace; trace != nil { | ||
| if prio := trace.priority.Load(); prio != nil && uint32(*prio) > 0 { // +checklocksignore - Initialization time, freshly extracted trace not yet shared. | ||
| link.Flags = 1 | ||
| } else { | ||
| link.Flags = 0 | ||
| } | ||
| link.Tracestate = trace.propagatingTag(tracestateHeader) | ||
| } | ||
| ctx.spanLinks = []SpanLink{link} | ||
|
|
||
| // When onlyExtractFirst is set, extractIncomingSpanContext returns after the | ||
| // first successful non-baggage extractor, so incomingCtx carries no baggage. | ||
| // Extract baggage explicitly so it is propagated regardless. | ||
| baggage := incomingCtx.baggage // +checklocksignore | ||
| if p.onlyExtractFirst { | ||
| baggage = p.extractBaggage(carrier) | ||
| } | ||
| if len(baggage) > 0 { | ||
| ctx.baggage = maps.Clone(baggage) // +checklocksignore | ||
| atomic.StoreUint32(&ctx.hasBaggage, 1) | ||
| } | ||
|
|
||
| return ctx, nil | ||
| } | ||
|
|
||
| // "continue" continues the trace from the incoming context. Baggage is | ||
| // propagated. | ||
| return incomingCtx, nil | ||
| } | ||
|
|
||
| // extractBaggage runs only the baggage propagator against the carrier and | ||
| // returns the extracted items. Used when onlyExtractFirst has prevented the | ||
| // baggage propagator from running inside extractIncomingSpanContext. | ||
| func (p *chainedPropagator) extractBaggage(carrier any) map[string]string { | ||
| for _, v := range p.extractors { | ||
| if _, isBaggage := v.(*propagatorBaggage); !isBaggage { | ||
| continue | ||
| } | ||
| if baggageCtx, err := v.Extract(carrier); err == nil && baggageCtx != nil { | ||
| return baggageCtx.baggage // +checklocksignore - Initialization time, freshly extracted ctx not yet shared. | ||
| } | ||
| break // there is only one baggage propagator | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (p *chainedPropagator) extractIncomingSpanContext(carrier any) (*SpanContext, error) { | ||
| var ctx *SpanContext | ||
| var links []SpanLink | ||
| pendingBaggage := make(map[string]string) // used to store baggage items temporarily | ||
|
|
||
| for _, v := range p.extractors { | ||
| firstExtract := (ctx == nil) // ctx stores the most recently extracted ctx across iterations; if it's nil, no extractor has run yet | ||
| // If incomingCtx is nil, no extraction has run yet | ||
| firstExtraction := (ctx == nil) | ||
| extractedCtx, err := v.Extract(carrier) | ||
|
|
||
| // If this is the baggage propagator, just stash its items into pendingBaggage | ||
|
|
@@ -293,7 +394,7 @@ func (p *chainedPropagator) Extract(carrier any) (*SpanContext, error) { | |
| continue | ||
| } | ||
|
|
||
| if firstExtract { | ||
| if firstExtraction { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renaming to read more naturally. |
||
| if err != nil { | ||
| if p.onlyExtractFirst { // Every error is relevant when we are relying on the first extractor | ||
| return nil, err | ||
|
|
@@ -306,35 +407,34 @@ func (p *chainedPropagator) Extract(carrier any) (*SpanContext, error) { | |
| return extractedCtx, nil | ||
| } | ||
| ctx = extractedCtx | ||
| } else { // A local trace context has already been extracted | ||
| extractedCtx2 := extractedCtx | ||
| ctx2 := ctx | ||
|
Comment on lines
-310
to
-311
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed unnecessary alias. |
||
|
|
||
| // If we can't cast to spanContext, we can't propgate tracestate or create span links | ||
| if extractedCtx2.TraceID() == ctx2.TraceID() { | ||
| } else { // A trace context was already extracted by a previous propagator | ||
| // When trace IDs match, merge W3C tracestate and resolve parent ID conflicts. | ||
| // When trace IDs differ, create span links to preserve the terminated context. | ||
| if extractedCtx.TraceID() == ctx.TraceID() { | ||
| if pW3C, ok := v.(*propagatorW3c); ok { | ||
| pW3C.propagateTracestate(ctx2, extractedCtx2) | ||
| // If trace IDs match but span IDs do not, use spanID from `*propagatorW3c` extractedCtx for parenting | ||
| if extractedCtx2.SpanID() != ctx2.SpanID() { | ||
| pW3C.propagateTracestate(ctx, extractedCtx) | ||
| // W3C and Datadog headers may specify different parent span IDs. | ||
| // Prefer W3C's span ID for parenting, and record the Datadog span ID as reparentID. | ||
| if extractedCtx.SpanID() != ctx.SpanID() { | ||
| var ddCtx *SpanContext | ||
| // Grab the datadog-propagated spancontext again | ||
| if ddp := getDatadogPropagator(p); ddp != nil { | ||
| if ddSpanCtx, err := ddp.Extract(carrier); err == nil { | ||
| ddCtx = ddSpanCtx | ||
| } | ||
| } | ||
| overrideDatadogParentID(ctx2, extractedCtx2, ddCtx) | ||
| overrideDatadogParentID(ctx, extractedCtx, ddCtx) | ||
| } | ||
| } | ||
| } else if extractedCtx2 != nil { // Trace IDs do not match - create span links | ||
| link := SpanLink{TraceID: extractedCtx2.TraceIDLower(), SpanID: extractedCtx2.SpanID(), TraceIDHigh: extractedCtx2.TraceIDUpper(), Attributes: map[string]string{"reason": "terminated_context", "context_headers": getPropagatorName(v)}} | ||
| if trace := extractedCtx2.trace; trace != nil { | ||
| } else if extractedCtx != nil { // Trace IDs do not match - create span links | ||
| link := SpanLink{TraceID: extractedCtx.TraceIDLower(), SpanID: extractedCtx.SpanID(), TraceIDHigh: extractedCtx.TraceIDUpper(), Attributes: map[string]string{"reason": "terminated_context", "context_headers": getPropagatorName(v)}} | ||
| if trace := extractedCtx.trace; trace != nil { | ||
| if p := trace.priority.Load(); p != nil && uint32(*p) > 0 { // +checklocksignore - Initialization time, freshly extracted trace not yet shared. | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: We don't need this empty line 😄 |
||
| link.Flags = 1 | ||
| } else { | ||
| link.Flags = 0 | ||
| } | ||
| link.Tracestate = extractedCtx2.trace.propagatingTag(tracestateHeader) | ||
| link.Tracestate = extractedCtx.trace.propagatingTag(tracestateHeader) | ||
| } | ||
| links = append(links, link) | ||
| } | ||
|
|
@@ -587,9 +687,18 @@ func getDatadogPropagator(cp *chainedPropagator) *propagator { | |
| return nil | ||
| } | ||
|
|
||
| // overrideDatadogParentID overrides the span ID of a context with the ID extracted from tracecontext headers. | ||
| // If the reparenting ID is not set on the context, the span ID from datadog headers is used. | ||
| // spanContexts are passed by reference to avoid copying lock value in spanContext type | ||
| // overrideDatadogParentID overrides a context's: | ||
| // 1. span ID with the span ID extracted from W3C tracecontext headers; and | ||
| // 2. reparent ID with either: | ||
| // - the reparent ID from W3C tracecontext headers (if set), or | ||
| // - the span ID from Datadog headers (as fallback). | ||
| // | ||
| // reparent ID is the last known Datadog parent span ID, used by Datadog's | ||
| // backend to fix broken parent-child relationships when non-Datadog tracers | ||
| // in the path don't report spans to Datadog. | ||
| // | ||
| // SpanContexts are passed by reference to avoid copying lock information in | ||
| // the SpanContext type. | ||
| func overrideDatadogParentID(ctx, w3cCtx, ddCtx *SpanContext) { | ||
| if ctx == nil || w3cCtx == nil || ddCtx == nil { | ||
| return | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we compare the entire 128bit IDs ?