Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,42 @@ tasks.named("compileJava") {
}

// Register the additional test suites in order; each one passes 'test' as its directory argument.
['latestDepTest', 'legacyNettyTest'].each { suiteName ->
  addTestSuiteForDir(suiteName, 'test')
}

// The legacyNettyTest suite shares the full src/test tree, yet its only purpose is to check the
// non-shaded Netty ScheduledFutureTask / idle-timer propagation on the pre-4.1.44 line (forced to
// 4.1.9 further down). Only those tests are selected so that the ~40-file module suite does not
// run a second time per pipeline.
// The GrpcShadedNetty... test is deliberately left out: grpc-netty-shaded ships its own (modern)
// relocated Netty, which the netty-all 4.1.9 force does not touch, so it would not exercise 4.1.9.
tasks.named('legacyNettyTest', Test) {
  filter {
    [
      'executor.NettyScheduledFutureTaskContextPropagationTest',
      'executor.NettyIdleTimeoutContextPropagationTest'
    ].each { testPattern ->
      includeTestsMatching testPattern
    }
  }
}

dependencies {
  testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
  testImplementation libs.guava

  // Netty 4.1.44 is the first version where a delayed ScheduledFutureTask is run
  // once when it is enqueued and again when the delay expires.
  // netty-all is declared exactly once for testImplementation: a second, lower version on the
  // same configuration would lose conflict resolution to this one and only mislead readers.
  // The pre-4.1.44 line is covered by legacyNettyTestImplementation at the end of this block.
  testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.44.Final'
  // Shaded Netty copy to verify the ScheduledFutureTask fix also applies to relocated Netty.
  testImplementation group: 'io.grpc', name: 'grpc-netty-shaded', version: '1.58.0'
  testImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '7.0.0'

  // Tomcat 10.1.+ seems to require Java 11. Limit to fix build.
  // TODO: Tomcat 10.0.10 has a copy of the JSR166 ThreadPoolExecutor so it needs special instrumentation
  latestDepTestImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '10.0.8'
  latestDepTestImplementation group: 'io.netty', name: 'netty-all', version: '4+'

  // Legacy netty version to test.
  legacyNettyTestImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final'
}

// Pin netty-all to 4.1.9.Final on both legacyNettyTest classpaths (compile and runtime).
for (String classpathName : ['legacyNettyTestCompileClasspath', 'legacyNettyTestRuntimeClasspath']) {
  configurations.named(classpathName).configure { classpath ->
    classpath.resolutionStrategy.force('io.netty:netty-all:4.1.9.Final')
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ public String[] knownMatchingTypes() {
"rx.internal.operators.OperatorTimeoutBase",
"com.amazonaws.http.timers.request.HttpRequestTimer",
"io.netty.handler.timeout.WriteTimeoutHandler",
"io.netty.handler.timeout.IdleStateHandler",
"io.grpc.netty.shaded.io.netty.handler.timeout.IdleStateHandler",
"play.shaded.ahc.io.netty.handler.timeout.IdleStateHandler",
"com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler",
"java.util.concurrent.ScheduledThreadPoolExecutor",
"io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe",
"io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe",
Expand Down Expand Up @@ -121,6 +125,14 @@ public void methodAdvice(MethodTransformer transformer) {
named("scheduleTimeout")
.and(isDeclaredBy(named("io.netty.handler.timeout.WriteTimeoutHandler"))),
advice);
// ReadTimeoutHandler and IdleStateHandler both schedule long-lived framework-owned idle timers
// through IdleStateHandler#schedule (ReadTimeoutHandler inherits it). Suppressing propagation
// here stops those timers from capturing an active request span and holding the trace open. The
// nameEndsWith match also covers the shaded Netty copies (grpc-shaded, Play WS, Couchbase).
transformer.applyAdvice(
named("schedule")
.and(isDeclaredBy(nameEndsWith(".netty.handler.timeout.IdleStateHandler"))),
advice);
transformer.applyAdvice(
named("rescheduleIdleTimer").and(isDeclaredBy(GRPC_MANAGED_CHANNEL)), advice);
transformer.applyAdvice(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -147,7 +149,21 @@ public static <T> void captureScope(@Advice.This RunnableFuture<T> task) {
public static final class Run {
@Advice.OnMethodEnter
public static <T> AgentScope activate(@Advice.This RunnableFuture<T> task) {
  // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue delayed tasks
  // scheduled from outside the event loop (while the delay is still positive), then again when
  // the deadline elapses (delay <= 0). Skip activation on the early enqueue run when there is a
  // captured State to preserve, so the continuation survives for the actual fire — otherwise
  // startTaskScope() would consume it here. The endsWith check intentionally also matches the
  // shaded Netty copies (grpc-shaded, play-shaded, couchbase-deps), which behave the same way;
  // the JDK's own ScheduledThreadPoolExecutor$ScheduledFutureTask does not match.
  //
  // There must be no unconditional return ahead of this guard: it would make the guard
  // unreachable and the early enqueue run would always consume the captured State.
  State state = InstrumentationContext.get(RunnableFuture.class, State.class).get(task);
  if (state != null
      && task instanceof ScheduledFuture
      && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask")
      && ((ScheduledFuture<?>) task).getDelay(TimeUnit.NANOSECONDS) > 0) {
    // Early self-enqueue run: leave the State untouched and activate nothing.
    return null;
  }
  return startTaskScope(state);
}

@Advice.OnMethodExit(onThrowable = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package executor;

import static datadog.trace.agent.test.assertions.SpanMatcher.span;
import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME;
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertTrue;

import datadog.trace.agent.test.AbstractInstrumentationTest;
import datadog.trace.api.Trace;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.grpc.netty.shaded.io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;

/**
 * Checks context propagation through a delayed task scheduled on grpc-netty-shaded's (relocated)
 * {@code DefaultEventExecutorGroup}.
 *
 * <p>The fix in {@code RunnableFutureInstrumentation} identifies Netty's {@code
 * ScheduledFutureTask} by the {@code .netty.util.concurrent.ScheduledFutureTask} class-name
 * suffix, so that shaded Netty copies are covered as well.
 */
class GrpcShadedNettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest {

  @Test
  void testDelayedTaskPropagatesContextWithShadedNetty() throws Exception {
    try (CloseableDefaultEventExecutorGroup executors = new CloseableDefaultEventExecutorGroup()) {
      EventExecutor eventExecutor = executors.next();
      TraceableTask delayedTask = new TraceableTask();

      scheduleWhileParentIsActive(eventExecutor, delayedTask);

      // The task must complete, must have observed an active span, and its child span must be
      // reported under the parent that was active when it was scheduled.
      boolean completedInTime = delayedTask.finished.await(5, SECONDS);
      assertTrue(completedInTime);
      boolean spanWasActive = delayedTask.sawActiveSpan.get();
      assertTrue(spanWasActive);
      assertTraces(
          trace(
              SORT_BY_START_TIME,
              span().root().operationName("parent"),
              span().childOfPrevious().operationName("asyncChild")));
    }
  }

  /**
   * Schedules {@code task} with a 50ms delay while a fresh "parent" span is active, then finishes
   * that span whether or not scheduling succeeded.
   */
  private static void scheduleWhileParentIsActive(EventExecutor eventExecutor, Runnable task) {
    AgentSpan parentSpan = startSpan("test", "parent");
    try (AgentScope ignored = activateSpan(parentSpan)) {
      eventExecutor.schedule(task, 50, MILLISECONDS);
    } finally {
      parentSpan.finish();
    }
  }

  /** Single-threaded shaded executor group that shuts itself down in try-with-resources. */
  private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup
      implements AutoCloseable {

    private CloseableDefaultEventExecutorGroup() {
      super(1);
    }

    @Override
    public void close() {
      try {
        shutdownGracefully(0, 1, SECONDS).sync();
      } catch (InterruptedException interrupted) {
        // Restore the interrupt flag for whoever is running the test.
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Records whether a span was active when run, then emits a traced child call. */
  private static final class TraceableTask implements Runnable {
    private final CountDownLatch finished = new CountDownLatch(1);
    private final AtomicBoolean sawActiveSpan = new AtomicBoolean();

    @Override
    public void run() {
      boolean spanIsActive = activeSpan() != null;
      sawActiveSpan.set(spanIsActive);
      try {
        asyncChild();
      } finally {
        // Always release the waiting test thread, even if the traced call throws.
        finished.countDown();
      }
    }

    @Trace(operationName = "asyncChild")
    private void asyncChild() {}
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package executor;

import static datadog.trace.agent.test.assertions.SpanMatcher.span;
import static datadog.trace.agent.test.assertions.TraceMatcher.trace;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;

import datadog.trace.agent.test.AbstractInstrumentationTest;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.junit.jupiter.api.Test;

/**
 * Verifies that Netty's idle-timeout handlers do not keep a trace open: adding one to a pipeline
 * while a span is active must still let that span's trace be reported promptly.
 */
class NettyIdleTimeoutContextPropagationTest extends AbstractInstrumentationTest {

  @Test
  void testIdleStateHandlerDoesNotHoldTraceOpen() {
    assertTimeoutHandlerDoesNotHoldTraceOpen(new IdleStateHandler(60, 60, 60));
  }

  @Test
  void testReadTimeoutHandlerDoesNotHoldTraceOpen() {
    // ReadTimeoutHandler inherits IdleStateHandler#schedule, so the same suppression must apply.
    assertTimeoutHandlerDoesNotHoldTraceOpen(new ReadTimeoutHandler(60));
  }

  /**
   * Adds {@code handler} to a fresh {@link EmbeddedChannel} while a "parent" span is active and
   * asserts that the trace containing only that span is reported.
   *
   * @param handler the timeout handler under test
   */
  private void assertTimeoutHandlerDoesNotHoldTraceOpen(io.netty.channel.ChannelHandler handler) {
    // An EmbeddedChannel is active and registered, so adding the handler triggers
    // IdleStateHandler#schedule for its long (60s) idle timers.
    EmbeddedChannel channel = new EmbeddedChannel();
    try {
      AgentSpan parent = startSpan("test", "parent");
      try (AgentScope ignored = activateSpan(parent)) {
        channel.pipeline().addLast(handler);
      } finally {
        parent.finish();
      }

      // If the idle timers had captured the active request continuation, the trace would stay open
      // until they fire (60s) and this assertion would time out. Suppression lets it report
      // promptly.
      assertTraces(trace(span().root().operationName("parent")));
    } finally {
      // Release the channel even when the assertion above fails, so a failing run does not leave
      // the channel and its pending idle timers behind.
      channel.finishAndReleaseAll();
    }
  }
}
Loading
Loading