From 47cc8215925076838860d96918274a00d4c2a2dc Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Mon, 11 May 2026 09:15:28 -0400 Subject: [PATCH 1/4] Fixed Netty scheduled executor context propagation on newer versions of netty starting at 4.1.44 (2019-12-18). --- .../java-concurrent/java-concurrent-1.8/build.gradle | 2 +- .../runnable/RunnableFutureInstrumentation.java | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle index ee85bef77fa..d08083d80f7 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle @@ -15,7 +15,7 @@ addTestSuiteForDir('latestDepTest', 'test') dependencies { testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') testImplementation libs.guava - testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final' + testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.50.Final' testImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '7.0.0' // Tomcat 10.1.+ seems to require Java 11. Limit to fix build. 
// TODO: Tomcat 10.0.10 has a copy of the JSR166 ThreadPoolExecutor so it needs special instrumentation diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java index a783258c421..074a5dc294e 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -147,6 +149,13 @@ public static void captureScope(@Advice.This RunnableFuture task) { public static final class Run { @Advice.OnMethodEnter public static AgentScope activate(@Advice.This RunnableFuture task) { + // Newer Netty versions may run scheduled tasks once before they expire to enqueue them. 
+ if (task instanceof ScheduledFuture + && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask") + && ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS) > 0) { + return null; + } + return startTaskScope(InstrumentationContext.get(RunnableFuture.class, State.class), task); } From 77b14f21b668f99e649ca733a32d98237dd2c5af Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Mon, 11 May 2026 16:29:57 -0400 Subject: [PATCH 2/4] Fixed failed tests and cover legacy version testing. --- .../java-concurrent-1.8/build.gradle | 16 +- ...syncPropagatingDisableInstrumentation.java | 8 +- .../RunnableFutureInstrumentation.java | 24 ++- ...duledFutureTaskContextPropagationTest.java | 149 ++++++++++++++++++ 4 files changed, 188 insertions(+), 9 deletions(-) create mode 100644 dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle index d08083d80f7..cce3fda5365 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle @@ -11,14 +11,28 @@ tasks.named("compileJava") { } addTestSuiteForDir('latestDepTest', 'test') +addTestSuiteForDir('legacyNettyTest', 'test') dependencies { testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') testImplementation libs.guava - testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.50.Final' + + // Netty 4.1.44 is the first version where a delayed ScheduledFutureTask is run + // once when it is enqueued and again when the delay expires. 
+ testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.44.Final' testImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '7.0.0' + // Tomcat 10.1.+ seems to require Java 11. Limit to fix build. // TODO: Tomcat 10.0.10 has a copy of the JSR166 ThreadPoolExecutor so it needs special instrumentation latestDepTestImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '10.0.8' + latestDepTestImplementation group: 'io.netty', name: 'netty-all', version: '4+' + + // Legacy netty version to test. + legacyNettyTestImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final' } +['legacyNettyTestCompileClasspath', 'legacyNettyTestRuntimeClasspath'].each { name -> + configurations.named(name).configure { + resolutionStrategy.force 'io.netty:netty-all:4.1.9.Final' + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index 1d543a705d2..e614434eff2 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -77,7 +77,8 @@ public String[] knownMatchingTypes() { "net.sf.ehcache.store.disk.DiskStorageFactory", "org.springframework.jms.listener.DefaultMessageListenerContainer", "org.apache.activemq.broker.TransactionBroker", - "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager" + 
"com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", + "io.vertx.core.impl.VertxImpl$InternalTimerHandler" }; } @@ -170,6 +171,11 @@ public void methodAdvice(MethodTransformer transformer) { named( "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager"))), advice); + // Vert.x timer handlers can reschedule framework work while a captured timer scope is active. + transformer.applyAdvice( + namedOneOf("run", "handle") + .and(isDeclaredBy(named("io.vertx.core.impl.VertxImpl$InternalTimerHandler"))), + advice); transformer.applyAdvice( isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); } diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java index 074a5dc294e..617c5b0d1aa 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java @@ -22,6 +22,7 @@ import datadog.trace.agent.tooling.ExcludeFilterProvider; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter; @@ -142,21 +143,30 @@ public static final class Construct { @Advice.OnMethodExit public static void captureScope(@Advice.This RunnableFuture task) { 
- capture(InstrumentationContext.get(RunnableFuture.class, State.class), task); + ContextStore contextStore = + InstrumentationContext.get(RunnableFuture.class, State.class); + capture(contextStore, task); } } public static final class Run { @Advice.OnMethodEnter public static AgentScope activate(@Advice.This RunnableFuture task) { - // Newer Netty versions may run scheduled tasks once before they expire to enqueue them. + ContextStore contextStore = + InstrumentationContext.get(RunnableFuture.class, State.class); + + // Netty 4.1.44+ invokes ScheduledFutureTask.run() twice for tasks scheduled + // from outside the event loop: once to self-enqueue while the delay is + // still positive, then again when the deadline elapses. Skip the first + // call so the captured continuation survives for the actual fire. if (task instanceof ScheduledFuture - && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask") - && ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS) > 0) { - return null; + && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask")) { + long delayNanos = ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS); + if (delayNanos > 0) { + return null; + } } - - return startTaskScope(InstrumentationContext.get(RunnableFuture.class, State.class), task); + return startTaskScope(contextStore, task); } @Advice.OnMethodExit(onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java new file mode 100644 index 00000000000..c7811f5a331 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java @@ -0,0 +1,149 @@ +package executor; + 
+import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.util.Version; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +class NettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest { + @Test + void testNettyVersionCompatible() { + assertFalse(isCompatibleVersion("4.0.0.Final")); + assertFalse(isCompatibleVersion("4.1.9.Final")); + assertFalse(isCompatibleVersion("4.1.43.Final")); + assertTrue(isCompatibleVersion("4.1.44.Final")); + assertTrue(isCompatibleVersion("4.2.13.Final")); + assertTrue(isCompatibleVersion("5.0.0.Alpha2")); + assertTrue(isCompatibleVersion("5.0.0.Final")); + } + + @Test + void testDelayedScheduledFutureTaskActivatesCapturedContinuationWhenDelayExpires() + throws Exception { + assumeTrue(hasCompatibleVersion()); + + try (CloseableDefaultEventExecutorGroup group = new 
CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + BlockingTraceableTask task = new BlockingTraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + // Netty 4.1.44+ calls ScheduledFutureTask.run() once while enqueueing a delayed task and + // again when the delay expires. The continuation captured here must survive the enqueue run. + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + // When the delayed task actually runs, instrumentation should activate the captured + // continuation so traced work in the task remains a child of the scheduling span. + assertTrue(task.started.await(5, SECONDS)); + try { + assertTrue(task.sawActiveSpan.get()); + } finally { + task.proceed.countDown(); + } + assertTrue(task.finished.await(5, SECONDS)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + private static boolean hasCompatibleVersion() { + for (Map.Entry entry : Version.identify().entrySet()) { + if (entry.getKey().startsWith("netty-")) { + return isCompatibleVersion(entry.getValue().artifactVersion()); + } + } + return false; + } + + private static boolean isCompatibleVersion(String version) { + String[] parts = version.split("\\."); + if (parts.length < 3) { + return false; + } + + int major = Integer.parseInt(parts[0]); + int minor = Integer.parseInt(parts[1]); + int patch = Integer.parseInt(parts[2]); + + if (major > 4) { + return true; + } + + if (major != 4) { + return false; + } + + if (minor > 1) { + return true; + } + + // Netty 4.1.44+ compatible with new logic. 
+ return patch >= 44; + } + + private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup + implements AutoCloseable { + private CloseableDefaultEventExecutorGroup() { + super(1); + } + + @Override + public void close() { + try { + shutdownGracefully(0, 1, SECONDS).sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class BlockingTraceableTask implements Runnable { + private final CountDownLatch started = new CountDownLatch(1); + private final CountDownLatch proceed = new CountDownLatch(1); + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + started.countDown(); + try { + proceed.await(5, SECONDS); + asyncChild(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + finished.countDown(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } +} From e8b08b4c1fd93c15bc963ede362d751224e6967b Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Tue, 12 May 2026 09:01:18 -0400 Subject: [PATCH 3/4] Improved fix. 
--- ...syncPropagatingDisableInstrumentation.java | 69 ++++++++++++++-- .../RunnableFutureInstrumentation.java | 13 +-- ...duledFutureTaskContextPropagationTest.java | 2 +- ...ertxTimerContextPropagationForkedTest.java | 82 +++++++++++++++++++ 4 files changed, 154 insertions(+), 12 deletions(-) create mode 100644 dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index e614434eff2..73fa02c9fc6 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -10,6 +10,7 @@ import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; @@ -45,6 +46,7 @@ public AsyncPropagatingDisableInstrumentation() { nameEndsWith("io.grpc.internal.ManagedChannelImpl"); private static final ElementMatcher REACTOR_DISABLED_TYPE_INITIALIZERS = namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask"); + private static final String VERTX_IMPL = 
"io.vertx.core.impl.VertxImpl"; @Override public boolean onlyMatchKnownTypes() { @@ -78,7 +80,7 @@ public String[] knownMatchingTypes() { "org.springframework.jms.listener.DefaultMessageListenerContainer", "org.apache.activemq.broker.TransactionBroker", "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", - "io.vertx.core.impl.VertxImpl$InternalTimerHandler" + VERTX_IMPL }; } @@ -171,11 +173,19 @@ public void methodAdvice(MethodTransformer transformer) { named( "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager"))), advice); - // Vert.x timer handlers can reschedule framework work while a captured timer scope is active. + // Vert.x can schedule long-running internal timers while a request span is active. + // Suppress propagation only for Vert.x-owned timer handlers so user timers still keep context. + String disableVertxInternalTimerAdvice = + getClass().getName() + "$DisableVertxInternalTimerAdvice"; transformer.applyAdvice( - namedOneOf("run", "handle") - .and(isDeclaredBy(named("io.vertx.core.impl.VertxImpl$InternalTimerHandler"))), - advice); + named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(4)), + disableVertxInternalTimerAdvice); + transformer.applyAdvice( + named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(6)), + disableVertxInternalTimerAdvice); + transformer.applyAdvice( + named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(7)), + disableVertxInternalTimerAdvice); transformer.applyAdvice( isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); } @@ -198,4 +208,53 @@ public static void after(@Advice.Enter boolean wasDisabled) { } } } + + public static class DisableVertxInternalTimerAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static boolean before(@Advice.AllArguments Object[] args) { + for (Object arg : args) { + if (isVertxInternalHandler(arg)) { + return 
DisableAsyncAdvice.before(); + } + } + return false; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void after(@Advice.Enter boolean wasDisabled) { + DisableAsyncAdvice.after(wasDisabled); + } + + private static boolean isVertxInternalHandler(Object arg) { + if (arg == null || !arg.getClass().getName().startsWith("io.vertx.")) { + return false; + } + return implementsVertxHandler(arg.getClass()); + } + + private static boolean implementsVertxHandler(Class clazz) { + while (clazz != null) { + for (Class iface : clazz.getInterfaces()) { + if (isVertxHandler(iface)) { + return true; + } + } + clazz = clazz.getSuperclass(); + } + return false; + } + + private static boolean isVertxHandler(Class iface) { + if ("io.vertx.core.Handler".equals(iface.getName())) { + return true; + } + for (Class parent : iface.getInterfaces()) { + if (isVertxHandler(parent)) { + return true; + } + } + return false; + } + } } diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java index 617c5b0d1aa..3a0f9c76a7b 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java @@ -155,18 +155,19 @@ public static AgentScope activate(@Advice.This RunnableFuture task) { ContextStore contextStore = InstrumentationContext.get(RunnableFuture.class, State.class); - // Netty 4.1.44+ invokes ScheduledFutureTask.run() twice for tasks scheduled - 
// from outside the event loop: once to self-enqueue while the delay is - // still positive, then again when the deadline elapses. Skip the first - // call so the captured continuation survives for the actual fire. + // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue + // delayed tasks scheduled from outside the event loop, then again when + // the deadline elapses. Only skip the first call when there is a captured + // continuation to preserve for the actual fire. + State state = contextStore.get(task); if (task instanceof ScheduledFuture && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask")) { long delayNanos = ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS); - if (delayNanos > 0) { + if (delayNanos > 0 && state != null && state.getSpan() != null) { return null; } } - return startTaskScope(contextStore, task); + return startTaskScope(state); } @Advice.OnMethodExit(onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java index c7811f5a331..66992d13e34 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java @@ -103,7 +103,7 @@ private static boolean isCompatibleVersion(String version) { return true; } - // Netty 4.1.44+ compatible with new logic. + // Since 4.1.44+ Netty uses a self-enqueue path for delayed tasks. 
return patch >= 44; } diff --git a/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java new file mode 100644 index 00000000000..9e3037efd7c --- /dev/null +++ b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java @@ -0,0 +1,82 @@ +package server; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.vertx.core.Vertx; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class VertxTimerContextPropagationForkedTest extends AbstractInstrumentationTest { + @Test + void testTimerCallbackCanPropagateContextToNestedTimer() throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch nestedTimerFinished = new CountDownLatch(1); + AtomicBoolean firstTimerSawActiveSpan = new AtomicBoolean(); + AtomicBoolean nestedTimerSawActiveSpan = new AtomicBoolean(); + AtomicReference failure = new AtomicReference<>(); + 
AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + vertx.setTimer( + 10, + firstTimerId -> { + try { + firstTimerSawActiveSpan.set(activeSpan() != null); + + // A timer callback is user code. Async propagation must stay enabled here so work + // scheduled from the callback can keep the captured timer context. + vertx.setTimer( + 10, + nestedTimerId -> { + try { + nestedTimerSawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + nestedTimerFinished.countDown(); + } + }); + } catch (Throwable t) { + failure.set(t); + nestedTimerFinished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(nestedTimerFinished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(firstTimerSawActiveSpan.get()); + assertTrue(nestedTimerSawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + vertx.close().await(5, SECONDS); + } + } + + @Trace(operationName = "asyncChild") + private static void asyncChild() {} +} From fd947b6f34a88b10e9fe956d00225010696591dc Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Wed, 3 Jun 2026 18:55:07 -0400 Subject: [PATCH 4/4] Improved netty support and applied review comments. 
--- .../java-concurrent-1.8/build.gradle | 14 ++ ...syncPropagatingDisableInstrumentation.java | 80 ++------- .../RunnableFutureInstrumentation.java | 32 ++-- ...duledFutureTaskContextPropagationTest.java | 86 ++++++++++ ...ettyIdleTimeoutContextPropagationTest.java | 46 +++++ ...duledFutureTaskContextPropagationTest.java | 91 ++++++++-- ...ertxTimerContextPropagationForkedTest.java | 161 ++++++++++++++++++ 7 files changed, 409 insertions(+), 101 deletions(-) create mode 100644 dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java create mode 100644 dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java create mode 100644 dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle index cce3fda5365..d19f2d5dae8 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle @@ -13,6 +13,18 @@ tasks.named("compileJava") { addTestSuiteForDir('latestDepTest', 'test') addTestSuiteForDir('legacyNettyTest', 'test') +// legacyNettyTest reuses the whole src/test tree but only exists to verify the non-shaded Netty +// ScheduledFutureTask / idle-timer propagation on the pre-4.1.44 line (forced to 4.1.9 below). +// Restrict it to those tests so the ~40-file module suite is not run a second time per pipeline. +// GrpcShadedNetty... is intentionally excluded: grpc-netty-shaded bundles its own (modern) relocated +// Netty, which the netty-all 4.1.9 force does not affect, so running it here would not test 4.1.9. 
+tasks.named('legacyNettyTest', Test) { + filter { + includeTestsMatching 'executor.NettyScheduledFutureTaskContextPropagationTest' + includeTestsMatching 'executor.NettyIdleTimeoutContextPropagationTest' + } +} + dependencies { testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') testImplementation libs.guava @@ -20,6 +32,8 @@ dependencies { // Netty 4.1.44 is the first version where a delayed ScheduledFutureTask is run // once when it is enqueued and again when the delay expires. testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.44.Final' + // Shaded Netty copy to verify the ScheduledFutureTask fix also applies to relocated Netty. + testImplementation group: 'io.grpc', name: 'grpc-netty-shaded', version: '1.58.0' testImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '7.0.0' // Tomcat 10.1.+ seems to require Java 11. Limit to fix build. diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index ca404f3e040..79cd141ba41 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -10,7 +10,6 @@ import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME; import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer; -import 
static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; @@ -48,7 +47,6 @@ public AsyncPropagatingDisableInstrumentation() { namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask"); private static final ElementMatcher RXJAVA2_DISABLED_TYPE_INITIALIZERS = named("io.reactivex.internal.schedulers.AbstractDirectTask"); - private static final String VERTX_IMPL = "io.vertx.core.impl.VertxImpl"; @Override public boolean onlyMatchKnownTypes() { @@ -61,6 +59,10 @@ public String[] knownMatchingTypes() { "rx.internal.operators.OperatorTimeoutBase", "com.amazonaws.http.timers.request.HttpRequestTimer", "io.netty.handler.timeout.WriteTimeoutHandler", + "io.netty.handler.timeout.IdleStateHandler", + "io.grpc.netty.shaded.io.netty.handler.timeout.IdleStateHandler", + "play.shaded.ahc.io.netty.handler.timeout.IdleStateHandler", + "com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler", "java.util.concurrent.ScheduledThreadPoolExecutor", "io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe", "io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe", @@ -82,9 +84,7 @@ public String[] knownMatchingTypes() { "org.springframework.jms.listener.DefaultMessageListenerContainer", "org.apache.activemq.broker.TransactionBroker", "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", - "io.reactivex.internal.schedulers.AbstractDirectTask", - "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", - VERTX_IMPL + "io.reactivex.internal.schedulers.AbstractDirectTask" }; } @@ -121,6 +121,14 @@ public void methodAdvice(MethodTransformer transformer) { named("scheduleTimeout") .and(isDeclaredBy(named("io.netty.handler.timeout.WriteTimeoutHandler"))), advice); + // ReadTimeoutHandler and IdleStateHandler both schedule long-lived framework-owned idle timers + // through IdleStateHandler#schedule 
(ReadTimeoutHandler inherits it). Suppressing propagation + // here stops those timers from capturing an active request span and holding the trace open. The + // nameEndsWith match also covers the shaded Netty copies (grpc-shaded, Play WS, Couchbase). + transformer.applyAdvice( + named("schedule") + .and(isDeclaredBy(nameEndsWith(".netty.handler.timeout.IdleStateHandler"))), + advice); transformer.applyAdvice( named("rescheduleIdleTimer").and(isDeclaredBy(GRPC_MANAGED_CHANNEL)), advice); transformer.applyAdvice( @@ -180,19 +188,6 @@ public void methodAdvice(MethodTransformer transformer) { named( "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager"))), advice); - // Vert.x can schedule long-running internal timers while a request span is active. - // Suppress propagation only for Vert.x-owned timer handlers so user timers still keep context. - String disableVertxInternalTimerAdvice = - getClass().getName() + "$DisableVertxInternalTimerAdvice"; - transformer.applyAdvice( - named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(4)), - disableVertxInternalTimerAdvice); - transformer.applyAdvice( - named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(6)), - disableVertxInternalTimerAdvice); - transformer.applyAdvice( - named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(7)), - disableVertxInternalTimerAdvice); transformer.applyAdvice( isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); transformer.applyAdvice( @@ -217,53 +212,4 @@ public static void after(@Advice.Enter boolean wasDisabled) { } } } - - public static class DisableVertxInternalTimerAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static boolean before(@Advice.AllArguments Object[] args) { - for (Object arg : args) { - if (isVertxInternalHandler(arg)) { - return DisableAsyncAdvice.before(); - } - } - return false; - } - - 
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void after(@Advice.Enter boolean wasDisabled) { - DisableAsyncAdvice.after(wasDisabled); - } - - private static boolean isVertxInternalHandler(Object arg) { - if (arg == null || !arg.getClass().getName().startsWith("io.vertx.")) { - return false; - } - return implementsVertxHandler(arg.getClass()); - } - - private static boolean implementsVertxHandler(Class clazz) { - while (clazz != null) { - for (Class iface : clazz.getInterfaces()) { - if (isVertxHandler(iface)) { - return true; - } - } - clazz = clazz.getSuperclass(); - } - return false; - } - - private static boolean isVertxHandler(Class iface) { - if ("io.vertx.core.Handler".equals(iface.getName())) { - return true; - } - for (Class parent : iface.getInterfaces()) { - if (isVertxHandler(parent)) { - return true; - } - } - return false; - } - } } diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java index 3a0f9c76a7b..cebe0f2ed80 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java @@ -22,7 +22,6 @@ import datadog.trace.agent.tooling.ExcludeFilterProvider; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.InstrumenterModule; -import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.InstrumentationContext; import 
datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter; @@ -143,29 +142,26 @@ public static final class Construct { @Advice.OnMethodExit public static void captureScope(@Advice.This RunnableFuture task) { - ContextStore contextStore = - InstrumentationContext.get(RunnableFuture.class, State.class); - capture(contextStore, task); + capture(InstrumentationContext.get(RunnableFuture.class, State.class), task); } } public static final class Run { @Advice.OnMethodEnter public static AgentScope activate(@Advice.This RunnableFuture task) { - ContextStore contextStore = - InstrumentationContext.get(RunnableFuture.class, State.class); - - // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue - // delayed tasks scheduled from outside the event loop, then again when - // the deadline elapses. Only skip the first call when there is a captured - // continuation to preserve for the actual fire. - State state = contextStore.get(task); - if (task instanceof ScheduledFuture - && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask")) { - long delayNanos = ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS); - if (delayNanos > 0 && state != null && state.getSpan() != null) { - return null; - } + // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue delayed tasks + // scheduled from outside the event loop (while the delay is still positive), then again when + // the deadline elapses (delay <= 0). Skip activation on the early enqueue run when there is a + // captured State to preserve, so the continuation survives for the actual fire — otherwise + // startTaskScope() would consume it here. The endsWith check intentionally also matches the + // shaded Netty copies (grpc-shaded, play-shaded, couchbase-deps), which behave the same way; + // the JDK's own ScheduledThreadPoolExecutor$ScheduledFutureTask does not match. 
+ State state = InstrumentationContext.get(RunnableFuture.class, State.class).get(task); + if (state != null + && task instanceof ScheduledFuture + && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask") + && ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS) > 0) { + return null; } return startTaskScope(state); } diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java new file mode 100644 index 00000000000..01715b894a5 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java @@ -0,0 +1,86 @@ +package executor; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.grpc.netty.shaded.io.netty.util.concurrent.EventExecutor; +import 
java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +/** + * The fix in {@code RunnableFutureInstrumentation} matches Netty's {@code ScheduledFutureTask} by + * the {@code .netty.util.concurrent.ScheduledFutureTask} class-name suffix so it also covers shaded + * Netty copies. This verifies context propagation through a delayed task on grpc-netty-shaded's + * (relocated) {@code DefaultEventExecutorGroup}. + */ +class GrpcShadedNettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest { + @Test + void testDelayedTaskPropagatesContextWithShadedNetty() throws Exception { + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup + implements AutoCloseable { + private CloseableDefaultEventExecutorGroup() { + super(1); + } + + @Override + public void close() { + try { + shutdownGracefully(0, 1, SECONDS).sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class TraceableTask implements Runnable { + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + try { + asyncChild(); + } finally { + finished.countDown(); + } + } 
+ + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java new file mode 100644 index 00000000000..29577c526a1 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java @@ -0,0 +1,46 @@ +package executor; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import org.junit.jupiter.api.Test; + +class NettyIdleTimeoutContextPropagationTest extends AbstractInstrumentationTest { + + @Test + void testIdleStateHandlerDoesNotHoldTraceOpen() { + assertTimeoutHandlerDoesNotHoldTraceOpen(new IdleStateHandler(60, 60, 60)); + } + + @Test + void testReadTimeoutHandlerDoesNotHoldTraceOpen() { + // ReadTimeoutHandler inherits IdleStateHandler#schedule, so the same suppression must apply. 
+ assertTimeoutHandlerDoesNotHoldTraceOpen(new ReadTimeoutHandler(60)); + } + + private void assertTimeoutHandlerDoesNotHoldTraceOpen(io.netty.channel.ChannelHandler handler) { + // An EmbeddedChannel is active and registered, so adding the handler triggers + // IdleStateHandler#schedule for its long (60s) idle timers. + EmbeddedChannel channel = new EmbeddedChannel(); + AgentSpan parent = startSpan("test", "parent"); + try (AgentScope ignored = activateSpan(parent)) { + channel.pipeline().addLast(handler); + } finally { + parent.finish(); + } + + // If the idle timers had captured the active request continuation, the trace would stay open + // until they fire (60s) and this assertion would time out. Suppression lets it report promptly. + assertTraces(trace(span().root().operationName("parent"))); + + channel.finishAndReleaseAll(); + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java index 66992d13e34..78734383d1d 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java @@ -28,6 +28,8 @@ class NettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumenta @Test void testNettyVersionCompatible() { assertFalse(isCompatibleVersion("4.0.0.Final")); + assertFalse(isCompatibleVersion("4.0.44.Final")); + assertFalse(isCompatibleVersion("4.0.99.Final")); assertFalse(isCompatibleVersion("4.1.9.Final")); assertFalse(isCompatibleVersion("4.1.43.Final")); assertTrue(isCompatibleVersion("4.1.44.Final")); @@ -72,6 +74,59 @@ void 
testDelayedScheduledFutureTaskActivatesCapturedContinuationWhenDelayExpires } } + @Test + void testDelayedTaskPropagatesContextOnAllNettyVersions() throws Exception { + // Cross-version invariant: context propagation through a delayed task must work on every + // supported Netty version — pre-4.1.44 (single run() at the deadline) and 4.1.44+ (a delay > 0 + // self-enqueue run followed by the real fire). This guards against the fix regressing the + // versions that were never broken. + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + @Test + void testImmediateScheduledTaskKeepsContext() throws Exception { + // A ScheduledFutureTask scheduled with a non-positive delay only ever runs its body when + // delayNanos <= 0 (Netty self-enqueues only while delay > 0). The fix's "delay > 0" skip must + // therefore NOT apply to immediate tasks, so the captured continuation must still activate. 
+ try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 0, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + private static boolean hasCompatibleVersion() { for (Map.Entry<String, Version> entry : Version.identify().entrySet()) { if (entry.getKey().startsWith("netty-")) { @@ -86,25 +141,11 @@ private static boolean isCompatibleVersion(String version) { if (parts.length < 3) { return false; } - int major = Integer.parseInt(parts[0]); int minor = Integer.parseInt(parts[1]); int patch = Integer.parseInt(parts[2]); - - if (major > 4) { - return true; - } - - if (major != 4) { - return false; - } - - if (minor > 1) { - return true; - } - - // Since 4.1.44+ Netty uses a self-enqueue path for delayed tasks. - return patch >= 44; + // Netty uses a self-enqueue path for delayed tasks since 4.1.44. 
+ return major > 4 || (major == 4 && (minor > 1 || (minor == 1 && patch >= 44))); } private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup @@ -146,4 +187,22 @@ public void run() { @Trace(operationName = "asyncChild") private void asyncChild() {} } + + private static final class TraceableTask implements Runnable { + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + try { + asyncChild(); + } finally { + finished.countDown(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } } diff --git a/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java new file mode 100644 index 00000000000..012723b963a --- /dev/null +++ b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java @@ -0,0 +1,161 @@ +package server; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import 
datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.vertx.core.Handler; +import io.vertx.core.TimeoutStream; +import io.vertx.core.Vertx; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import org.junit.jupiter.api.Test; + +class VertxTimerContextPropagationForkedTest extends AbstractInstrumentationTest { + @Test + void testTimerCallbackCanPropagateContextToNestedTimer() throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch nestedTimerFinished = new CountDownLatch(1); + AtomicBoolean firstTimerSawActiveSpan = new AtomicBoolean(); + AtomicBoolean nestedTimerSawActiveSpan = new AtomicBoolean(); + AtomicReference<Throwable> failure = new AtomicReference<>(); + AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + vertx.setTimer( + 10, + firstTimerId -> { + try { + firstTimerSawActiveSpan.set(activeSpan() != null); + + // A timer callback is user code. Async propagation must stay enabled here so work + // scheduled from the callback can keep the captured timer context. 
+ vertx.setTimer( + 10, + nestedTimerId -> { + try { + nestedTimerSawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + nestedTimerFinished.countDown(); + } + }); + } catch (Throwable t) { + failure.set(t); + nestedTimerFinished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(nestedTimerFinished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(firstTimerSawActiveSpan.get()); + assertTrue(nestedTimerSawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + closeVertx(vertx); + } + } + + @Test + void testSetTimerHandlerKeepsContext() throws Exception { + assertTimerHandlerPropagatesContext((vertx, handler) -> vertx.setTimer(10, handler)); + } + + @Test + void testTimerStreamHandlerKeepsContext() throws Exception { + // timerStream(...).handler(...) wraps the user handler in an io.vertx TimeoutStream. The + // captured timer context must still reach the user callback through that wrapper. + assertTimerHandlerPropagatesContext((vertx, handler) -> vertx.timerStream(10).handler(handler)); + } + + @Test + void testPeriodicStreamHandlerKeepsContext() throws Exception { + // periodicStream(...) uses the same user-facing TimeoutStream wrapper as timerStream(...). It + // fires repeatedly, so cancel after the first delivery to keep a single asyncChild span. 
+ assertTimerHandlerPropagatesContext( + (vertx, handler) -> { + TimeoutStream stream = vertx.periodicStream(10); + stream.handler( + id -> { + stream.cancel(); + handler.handle(id); + }); + }); + } + + private void assertTimerHandlerPropagatesContext(BiConsumer<Vertx, Handler<Long>> scheduler) + throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch finished = new CountDownLatch(1); + AtomicBoolean sawActiveSpan = new AtomicBoolean(); + AtomicReference<Throwable> failure = new AtomicReference<>(); + AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + scheduler.accept( + vertx, + id -> { + try { + sawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + finished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(finished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(sawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + closeVertx(vertx); + } + } + + private static void closeVertx(Vertx vertx) throws InterruptedException { + // Vert.x 4.0 predates Future#await; close via a callback and wait on a latch. + CountDownLatch closed = new CountDownLatch(1); + vertx.close(result -> closed.countDown()); + assertTrue(closed.await(5, SECONDS)); + } + + @Trace(operationName = "asyncChild") + private static void asyncChild() {} +}