Skip to content

Commit bcab903

Browse files
committed
reimplement connect to request propagation
1 parent 2160fd8 commit bcab903

6 files changed

Lines changed: 147 additions & 21 deletions

File tree

dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ public AsyncPropagatingDisableInstrumentation() {
4949
named("io.reactivex.internal.schedulers.AbstractDirectTask");
5050
private static final ElementMatcher<TypeDescription> JAVA_HTTP_CLIENT =
5151
extendsClass(named("java.net.http.HttpClient"));
52-
private static final ElementMatcher<TypeDescription> REACTOR_NETTY_HTTP_CONNECT =
53-
named("reactor.netty.http.client.HttpClientConnect$MonoHttpConnect");
5452

5553
@Override
5654
public boolean onlyMatchKnownTypes() {
@@ -85,8 +83,7 @@ public String[] knownMatchingTypes() {
8583
"org.apache.activemq.broker.TransactionBroker",
8684
"com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager",
8785
"io.reactivex.internal.schedulers.AbstractDirectTask",
88-
"jdk.internal.net.http.HttpClientImpl",
89-
"reactor.netty.http.client.HttpClientConnect$MonoHttpConnect"
86+
"jdk.internal.net.http.HttpClientImpl"
9087
};
9188
}
9289

@@ -101,8 +98,7 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
10198
.or(GRPC_MANAGED_CHANNEL)
10299
.or(REACTOR_DISABLED_TYPE_INITIALIZERS)
103100
.or(RXJAVA2_DISABLED_TYPE_INITIALIZERS)
104-
.or(JAVA_HTTP_CLIENT)
105-
.or(REACTOR_NETTY_HTTP_CONNECT);
101+
.or(JAVA_HTTP_CLIENT);
106102
}
107103

108104
@Override
@@ -189,10 +185,6 @@ public void methodAdvice(MethodTransformer transformer) {
189185
transformer.applyAdvice(
190186
isTypeInitializer().and(isDeclaredBy(RXJAVA2_DISABLED_TYPE_INITIALIZERS)), advice);
191187
transformer.applyAdvice(namedOneOf("sendAsync").and(isDeclaredBy(JAVA_HTTP_CLIENT)), advice);
192-
// Avoid leaking continuations on the netty event loop that initiates the tasks that handle the
193-
// request lifecycle
194-
transformer.applyAdvice(
195-
named("subscribe").and(isDeclaredBy(REACTOR_NETTY_HTTP_CONNECT)), advice);
196188
}
197189

198190
public static class DisableAsyncAdvice {

dd-java-agent/instrumentation/netty/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/client/HttpClientRequestTracingHandler.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import datadog.trace.api.Config;
1919
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2020
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
21+
import io.netty.channel.Channel;
2122
import io.netty.channel.ChannelHandler;
2223
import io.netty.channel.ChannelHandlerContext;
2324
import io.netty.channel.ChannelOutboundHandlerAdapter;
@@ -54,8 +55,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
5455
}
5556

5657
AgentScope parentScope = null;
57-
final AgentScope.Continuation continuation =
58-
ctx.channel().attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
58+
final AgentScope.Continuation continuation = takeConnectContinuation(ctx.channel());
5959
if (continuation != null) {
6060
parentScope = continuation.activate();
6161
}
@@ -111,4 +111,16 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
111111
}
112112
}
113113
}
114+
115+
private static AgentScope.Continuation takeConnectContinuation(final Channel channel) {
116+
AgentScope.Continuation continuation =
117+
channel.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
118+
if (continuation == null) {
119+
final Channel parent = channel.parent();
120+
if (parent != null) {
121+
continuation = parent.attr(CONNECT_PARENT_CONTINUATION_ATTRIBUTE_KEY).getAndRemove();
122+
}
123+
}
124+
return continuation;
125+
}
114126
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package datadog.trace.instrumentation.reactor.netty;
2+
3+
import static datadog.trace.instrumentation.reactor.netty.CaptureConnectSpan.CONNECT_SPAN;
4+
5+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
6+
import org.reactivestreams.Subscription;
7+
import reactor.core.CoreSubscriber;
8+
import reactor.netty.Connection;
9+
import reactor.util.context.Context;
10+
11+
public final class ConnectSpanSubscriber implements CoreSubscriber<Connection> {
12+
13+
private final CoreSubscriber<? super Connection> actual;
14+
private final AgentSpan span;
15+
16+
public ConnectSpanSubscriber(
17+
final CoreSubscriber<? super Connection> actual, final AgentSpan span) {
18+
this.actual = actual;
19+
this.span = span;
20+
}
21+
22+
@Override
23+
public void onSubscribe(final Subscription subscription) {
24+
actual.onSubscribe(subscription);
25+
}
26+
27+
@Override
28+
public void onNext(final Connection connection) {
29+
actual.onNext(connection);
30+
}
31+
32+
@Override
33+
public void onError(final Throwable throwable) {
34+
actual.onError(throwable);
35+
}
36+
37+
@Override
38+
public void onComplete() {
39+
actual.onComplete();
40+
}
41+
42+
@Override
43+
public Context currentContext() {
44+
return actual.currentContext().put(CONNECT_SPAN, span);
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package datadog.trace.instrumentation.reactor.netty;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
4+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
5+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled;
6+
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.setAsyncPropagationEnabled;
7+
import static net.bytebuddy.matcher.ElementMatchers.named;
8+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
9+
10+
import com.google.auto.service.AutoService;
11+
import datadog.trace.agent.tooling.Instrumenter;
12+
import datadog.trace.agent.tooling.InstrumenterModule;
13+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
14+
import net.bytebuddy.asm.Advice;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import reactor.core.CoreSubscriber;
17+
import reactor.netty.Connection;
18+
19+
/**
20+
* Suppresses generic async captures created while Reactor Netty subscribes to connection setup.
21+
*
22+
* <p>The subscriber is wrapped first so the active span is still available from Reactor context;
23+
* {@link TransferConnectSpan} later turns that context value into the continuation consumed by
24+
* Netty request tracing.
25+
*/
26+
@AutoService(InstrumenterModule.class)
27+
public class MonoHttpConnectInstrumentation extends InstrumenterModule.Tracing
28+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
29+
30+
public MonoHttpConnectInstrumentation() {
31+
super("reactor-netty", "reactor-netty-1");
32+
}
33+
34+
@Override
35+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
36+
// Avoid matching pre-1.0 releases which are not compatible.
37+
return hasClassNamed("reactor.netty.transport.AddressUtils");
38+
}
39+
40+
@Override
41+
public String[] helperClassNames() {
42+
return new String[] {
43+
packageName + ".ConnectSpanSubscriber",
44+
};
45+
}
46+
47+
@Override
48+
public String instrumentedType() {
49+
return "reactor.netty.http.client.HttpClientConnect$MonoHttpConnect";
50+
}
51+
52+
@Override
53+
public void methodAdvice(MethodTransformer transformer) {
54+
transformer.applyAdvice(
55+
named("subscribe").and(takesArgument(0, named("reactor.core.CoreSubscriber"))),
56+
getClass().getName() + "$SubscribeAdvice");
57+
}
58+
59+
public static class SubscribeAdvice {
60+
@Advice.OnMethodEnter(suppress = Throwable.class)
61+
public static boolean before(
62+
@Advice.Argument(value = 0, readOnly = false)
63+
CoreSubscriber<? super Connection> subscriber) {
64+
final AgentSpan span = activeSpan();
65+
if (span != null) {
66+
subscriber = new ConnectSpanSubscriber(subscriber, span);
67+
}
68+
if (isAsyncPropagationEnabled()) {
69+
setAsyncPropagationEnabled(false);
70+
return true;
71+
}
72+
return false;
73+
}
74+
75+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
76+
public static void after(@Advice.Enter final boolean wasDisabled) {
77+
if (wasDisabled) {
78+
setAsyncPropagationEnabled(true);
79+
}
80+
}
81+
}
82+
}

dd-java-agent/instrumentation/reactor-netty-1.0/src/main/java/datadog/trace/instrumentation/reactor/netty/TransferConnectSpan.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,9 @@ public void accept(HttpClientRequest httpClientRequest, Connection connection) {
2424
current.cancel();
2525
}
2626

27-
// A http2 channel (Http2StreamChannel, H2C prior-knowledge), operates a child stream level by
28-
// design.
29-
// ConnectAdvice stores a continuation on the parent TCP channel at connect time hence this
30-
// will be never canceled
27+
// HTTP/2 requests operate on stream channels. Netty captures the TCP connect continuation on
28+
// the parent channel, but request tracing consumes the request-specific continuation from the
29+
// stream channel, so the parent copy must not keep the trace open.
3130
final Channel parent = channel.parent();
3231
if (parent != null) {
3332
final Continuation parentCurrent =

dd-java-agent/instrumentation/spring/spring-webflux/spring-webflux-5.0/src/bootTest/groovy/SpringWebfluxTest.groovy

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,6 @@ class SpringWebfluxTest extends InstrumentationSpecification {
5353

5454
WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build()
5555

56-
@Override
57-
boolean useStrictTraceWrites() {
58-
false
59-
}
60-
6156
def "Basic GET test #testName"() {
6257
setup:
6358
String url = "http://localhost:$port$urlPath"

0 commit comments

Comments
 (0)