diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle index ee85bef77fa..d19f2d5dae8 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/build.gradle @@ -11,14 +11,42 @@ tasks.named("compileJava") { } addTestSuiteForDir('latestDepTest', 'test') +addTestSuiteForDir('legacyNettyTest', 'test') + +// legacyNettyTest reuses the whole src/test tree but only exists to verify the non-shaded Netty +// ScheduledFutureTask / idle-timer propagation on the pre-4.1.44 line (forced to 4.1.9 below). +// Restrict it to those tests so the ~40-file module suite is not run a second time per pipeline. +// GrpcShadedNetty... is intentionally excluded: grpc-netty-shaded bundles its own (modern) relocated +// Netty, which the netty-all 4.1.9 force does not affect, so running it here would not test 4.1.9. +tasks.named('legacyNettyTest', Test) { + filter { + includeTestsMatching 'executor.NettyScheduledFutureTaskContextPropagationTest' + includeTestsMatching 'executor.NettyIdleTimeoutContextPropagationTest' + } +} dependencies { testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') testImplementation libs.guava - testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final' + + // Netty 4.1.44 is the first version where a delayed ScheduledFutureTask is run + // once when it is enqueued and again when the delay expires. + testImplementation group: 'io.netty', name: 'netty-all', version: '4.1.44.Final' + // Shaded Netty copy to verify the ScheduledFutureTask fix also applies to relocated Netty. + testImplementation group: 'io.grpc', name: 'grpc-netty-shaded', version: '1.58.0' testImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '7.0.0' + // Tomcat 10.1.+ seems to require Java 11. Limit to fix build. // TODO: Tomcat 10.0.10 has a copy of the JSR166 ThreadPoolExecutor so it needs special instrumentation latestDepTestImplementation group: 'org.apache.tomcat.embed', name: 'tomcat-embed-core', version: '10.0.8' + latestDepTestImplementation group: 'io.netty', name: 'netty-all', version: '4+' + + // Legacy netty version to test. + legacyNettyTestImplementation group: 'io.netty', name: 'netty-all', version: '4.1.9.Final' } +['legacyNettyTestCompileClasspath', 'legacyNettyTestRuntimeClasspath'].each { name -> + configurations.named(name).configure { + resolutionStrategy.force 'io.netty:netty-all:4.1.9.Final' + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index 85598459f26..b2076f871bd 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -61,6 +61,10 @@ public String[] knownMatchingTypes() { "rx.internal.operators.OperatorTimeoutBase", "com.amazonaws.http.timers.request.HttpRequestTimer", "io.netty.handler.timeout.WriteTimeoutHandler", + "io.netty.handler.timeout.IdleStateHandler", + "io.grpc.netty.shaded.io.netty.handler.timeout.IdleStateHandler", + "play.shaded.ahc.io.netty.handler.timeout.IdleStateHandler", + "com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler", "java.util.concurrent.ScheduledThreadPoolExecutor", "io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe", "io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe", @@ -121,6 +125,14 @@ public void methodAdvice(MethodTransformer transformer) { named("scheduleTimeout") .and(isDeclaredBy(named("io.netty.handler.timeout.WriteTimeoutHandler"))), advice); + // ReadTimeoutHandler and IdleStateHandler both schedule long-lived framework-owned idle timers + // through IdleStateHandler#schedule (ReadTimeoutHandler inherits it). Suppressing propagation + // here stops those timers from capturing an active request span and holding the trace open. The + // nameEndsWith match also covers the shaded Netty copies (grpc-shaded, Play WS, Couchbase). + transformer.applyAdvice( + named("schedule") + .and(isDeclaredBy(nameEndsWith(".netty.handler.timeout.IdleStateHandler"))), + advice); transformer.applyAdvice( named("rescheduleIdleTimer").and(isDeclaredBy(GRPC_MANAGED_CHANNEL)), advice); transformer.applyAdvice( diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java index a783258c421..cebe0f2ed80 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/runnable/RunnableFutureInstrumentation.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -147,7 +149,21 @@ public static void captureScope(@Advice.This RunnableFuture task) { public static final class Run { @Advice.OnMethodEnter public static AgentScope activate(@Advice.This RunnableFuture task) { - return startTaskScope(InstrumentationContext.get(RunnableFuture.class, State.class), task); + // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue delayed tasks + // scheduled from outside the event loop (while the delay is still positive), then again when + // the deadline elapses (delay <= 0). Skip activation on the early enqueue run when there is a + // captured State to preserve, so the continuation survives for the actual fire — otherwise + // startTaskScope() would consume it here. The endsWith check intentionally also matches the + // shaded Netty copies (grpc-shaded, play-shaded, couchbase-deps), which behave the same way; + // the JDK's own ScheduledThreadPoolExecutor$ScheduledFutureTask does not match. + State state = InstrumentationContext.get(RunnableFuture.class, State.class).get(task); + if (state != null + && task instanceof ScheduledFuture + && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask") + && ((ScheduledFuture) task).getDelay(TimeUnit.NANOSECONDS) > 0) { + return null; + } + return startTaskScope(state); } @Advice.OnMethodExit(onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java new file mode 100644 index 00000000000..01715b894a5 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/GrpcShadedNettyScheduledFutureTaskContextPropagationTest.java @@ -0,0 +1,86 @@ +package executor; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.grpc.netty.shaded.io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.grpc.netty.shaded.io.netty.util.concurrent.EventExecutor; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +/** + * The fix in {@code RunnableFutureInstrumentation} matches Netty's {@code ScheduledFutureTask} by + * the {@code .netty.util.concurrent.ScheduledFutureTask} class-name suffix so it also covers shaded + * Netty copies. This verifies context propagation through a delayed task on grpc-netty-shaded's + * (relocated) {@code DefaultEventExecutorGroup}. + */ +class GrpcShadedNettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest { + @Test + void testDelayedTaskPropagatesContextWithShadedNetty() throws Exception { + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup + implements AutoCloseable { + private CloseableDefaultEventExecutorGroup() { + super(1); + } + + @Override + public void close() { + try { + shutdownGracefully(0, 1, SECONDS).sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class TraceableTask implements Runnable { + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + try { + asyncChild(); + } finally { + finished.countDown(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java new file mode 100644 index 00000000000..29577c526a1 --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyIdleTimeoutContextPropagationTest.java @@ -0,0 +1,46 @@ +package executor; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import org.junit.jupiter.api.Test; + +class NettyIdleTimeoutContextPropagationTest extends AbstractInstrumentationTest { + + @Test + void testIdleStateHandlerDoesNotHoldTraceOpen() { + assertTimeoutHandlerDoesNotHoldTraceOpen(new IdleStateHandler(60, 60, 60)); + } + + @Test + void testReadTimeoutHandlerDoesNotHoldTraceOpen() { + // ReadTimeoutHandler inherits IdleStateHandler#schedule, so the same suppression must apply. + assertTimeoutHandlerDoesNotHoldTraceOpen(new ReadTimeoutHandler(60)); + } + + private void assertTimeoutHandlerDoesNotHoldTraceOpen(io.netty.channel.ChannelHandler handler) { + // An EmbeddedChannel is active and registered, so adding the handler triggers + // IdleStateHandler#schedule for its long (60s) idle timers. + EmbeddedChannel channel = new EmbeddedChannel(); + AgentSpan parent = startSpan("test", "parent"); + try (AgentScope ignored = activateSpan(parent)) { + channel.pipeline().addLast(handler); + } finally { + parent.finish(); + } + + // If the idle timers had captured the active request continuation, the trace would stay open + // until they fire (60s) and this assertion would time out. Suppression lets it report promptly. + assertTraces(trace(span().root().operationName("parent"))); + + channel.finishAndReleaseAll(); + } +} diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java new file mode 100644 index 00000000000..78734383d1d --- /dev/null +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/test/java/executor/NettyScheduledFutureTaskContextPropagationTest.java @@ -0,0 +1,208 @@ +package executor; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.netty.util.Version; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutor; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Test; + +class NettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest { + @Test + void testNettyVersionCompatible() { + assertFalse(isCompatibleVersion("4.0.0.Final")); + assertFalse(isCompatibleVersion("4.0.44.Final")); + assertFalse(isCompatibleVersion("4.0.99.Final")); + assertFalse(isCompatibleVersion("4.1.9.Final")); + assertFalse(isCompatibleVersion("4.1.43.Final")); + assertTrue(isCompatibleVersion("4.1.44.Final")); + assertTrue(isCompatibleVersion("4.2.13.Final")); + assertTrue(isCompatibleVersion("5.0.0.Alpha2")); + assertTrue(isCompatibleVersion("5.0.0.Final")); + } + + @Test + void testDelayedScheduledFutureTaskActivatesCapturedContinuationWhenDelayExpires() + throws Exception { + assumeTrue(hasCompatibleVersion()); + + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + BlockingTraceableTask task = new BlockingTraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + // Netty 4.1.44+ calls ScheduledFutureTask.run() once while enqueueing a delayed task and + // again when the delay expires. The continuation captured here must survive the enqueue run. + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + // When the delayed task actually runs, instrumentation should activate the captured + // continuation so traced work in the task remains a child of the scheduling span. + assertTrue(task.started.await(5, SECONDS)); + try { + assertTrue(task.sawActiveSpan.get()); + } finally { + task.proceed.countDown(); + } + assertTrue(task.finished.await(5, SECONDS)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + @Test + void testDelayedTaskPropagatesContextOnAllNettyVersions() throws Exception { + // Cross-version invariant: context propagation through a delayed task must work on every + // supported Netty version — pre-4.1.44 (single run() at the deadline) and 4.1.44+ (a delay > 0 + // self-enqueue run followed by the real fire). This guards against the fix regressing the + // versions that were never broken. + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 50, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + @Test + void testImmediateScheduledTaskKeepsContext() throws Exception { + // A ScheduledFutureTask scheduled with a non-positive delay only ever runs its body when + // delayNanos <= 0 (Netty self-enqueues only while delay > 0). The fix's "delay > 0" skip must + // therefore NOT apply to immediate tasks, so the captured continuation must still activate. + try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { + EventExecutor executor = group.next(); + TraceableTask task = new TraceableTask(); + AgentSpan parent = startSpan("test", "parent"); + + try (AgentScope ignored = activateSpan(parent)) { + executor.schedule(task, 0, MILLISECONDS); + } finally { + parent.finish(); + } + + assertTrue(task.finished.await(5, SECONDS)); + assertTrue(task.sawActiveSpan.get()); + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } + } + + private static boolean hasCompatibleVersion() { + for (Map.Entry entry : Version.identify().entrySet()) { + if (entry.getKey().startsWith("netty-")) { + return isCompatibleVersion(entry.getValue().artifactVersion()); + } + } + return false; + } + + private static boolean isCompatibleVersion(String version) { + String[] parts = version.split("\\."); + if (parts.length < 3) { + return false; + } + int major = Integer.parseInt(parts[0]); + int minor = Integer.parseInt(parts[1]); + int patch = Integer.parseInt(parts[2]); + // Netty uses a self-enqueue path for delayed tasks since 4.1.44. + return major > 4 || (major == 4 && (minor > 1 || (minor == 1 && patch >= 44))); + } + + private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup + implements AutoCloseable { + private CloseableDefaultEventExecutorGroup() { + super(1); + } + + @Override + public void close() { + try { + shutdownGracefully(0, 1, SECONDS).sync(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + private static final class BlockingTraceableTask implements Runnable { + private final CountDownLatch started = new CountDownLatch(1); + private final CountDownLatch proceed = new CountDownLatch(1); + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + started.countDown(); + try { + proceed.await(5, SECONDS); + asyncChild(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + finished.countDown(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } + + private static final class TraceableTask implements Runnable { + private final CountDownLatch finished = new CountDownLatch(1); + private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); + + @Override + public void run() { + sawActiveSpan.set(activeSpan() != null); + try { + asyncChild(); + } finally { + finished.countDown(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} + } +} diff --git a/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java new file mode 100644 index 00000000000..012723b963a --- /dev/null +++ b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-4.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java @@ -0,0 +1,161 @@ +package server; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.vertx.core.Handler; +import io.vertx.core.TimeoutStream; +import io.vertx.core.Vertx; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import org.junit.jupiter.api.Test; + +class VertxTimerContextPropagationForkedTest extends AbstractInstrumentationTest { + @Test + void testTimerCallbackCanPropagateContextToNestedTimer() throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch nestedTimerFinished = new CountDownLatch(1); + AtomicBoolean firstTimerSawActiveSpan = new AtomicBoolean(); + AtomicBoolean nestedTimerSawActiveSpan = new AtomicBoolean(); + AtomicReference failure = new AtomicReference<>(); + AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + vertx.setTimer( + 10, + firstTimerId -> { + try { + firstTimerSawActiveSpan.set(activeSpan() != null); + + // A timer callback is user code. Async propagation must stay enabled here so work + // scheduled from the callback can keep the captured timer context. + vertx.setTimer( + 10, + nestedTimerId -> { + try { + nestedTimerSawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + nestedTimerFinished.countDown(); + } + }); + } catch (Throwable t) { + failure.set(t); + nestedTimerFinished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(nestedTimerFinished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(firstTimerSawActiveSpan.get()); + assertTrue(nestedTimerSawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + closeVertx(vertx); + } + } + + @Test + void testSetTimerHandlerKeepsContext() throws Exception { + assertTimerHandlerPropagatesContext((vertx, handler) -> vertx.setTimer(10, handler)); + } + + @Test + void testTimerStreamHandlerKeepsContext() throws Exception { + // timerStream(...).handler(...) wraps the user handler in an io.vertx TimeoutStream. The + // captured timer context must still reach the user callback through that wrapper. + assertTimerHandlerPropagatesContext((vertx, handler) -> vertx.timerStream(10).handler(handler)); + } + + @Test + void testPeriodicStreamHandlerKeepsContext() throws Exception { + // periodicStream(...) uses the same user-facing TimeoutStream wrapper as timerStream(...). It + // fires repeatedly, so cancel after the first delivery to keep a single asyncChild span. + assertTimerHandlerPropagatesContext( + (vertx, handler) -> { + TimeoutStream stream = vertx.periodicStream(10); + stream.handler( + id -> { + stream.cancel(); + handler.handle(id); + }); + }); + } + + private void assertTimerHandlerPropagatesContext(BiConsumer> scheduler) + throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch finished = new CountDownLatch(1); + AtomicBoolean sawActiveSpan = new AtomicBoolean(); + AtomicReference failure = new AtomicReference<>(); + AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + scheduler.accept( + vertx, + id -> { + try { + sawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + finished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(finished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(sawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + closeVertx(vertx); + } + } + + private static void closeVertx(Vertx vertx) throws InterruptedException { + // Vert.x 4.0 predates Future#await; close via a callback and wait on a latch. + CountDownLatch closed = new CountDownLatch(1); + vertx.close(result -> closed.countDown()); + assertTrue(closed.await(5, SECONDS)); + } + + @Trace(operationName = "asyncChild") + private static void asyncChild() {} +} diff --git a/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java new file mode 100644 index 00000000000..9e3037efd7c --- /dev/null +++ b/dd-java-agent/instrumentation/vertx/vertx-web/vertx-web-5.0/src/test/java/server/VertxTimerContextPropagationForkedTest.java @@ -0,0 +1,82 @@ +package server; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.vertx.core.Vertx; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class VertxTimerContextPropagationForkedTest extends AbstractInstrumentationTest { + @Test + void testTimerCallbackCanPropagateContextToNestedTimer() throws Exception { + Vertx vertx = Vertx.vertx(); + CountDownLatch nestedTimerFinished = new CountDownLatch(1); + AtomicBoolean firstTimerSawActiveSpan = new AtomicBoolean(); + AtomicBoolean nestedTimerSawActiveSpan = new AtomicBoolean(); + AtomicReference failure = new AtomicReference<>(); + AgentSpan parent = startSpan("test", "parent"); + + try { + try (AgentScope ignored = activateSpan(parent)) { + vertx.setTimer( + 10, + firstTimerId -> { + try { + firstTimerSawActiveSpan.set(activeSpan() != null); + + // A timer callback is user code. Async propagation must stay enabled here so work + // scheduled from the callback can keep the captured timer context. + vertx.setTimer( + 10, + nestedTimerId -> { + try { + nestedTimerSawActiveSpan.set(activeSpan() != null); + asyncChild(); + } catch (Throwable t) { + failure.set(t); + } finally { + nestedTimerFinished.countDown(); + } + }); + } catch (Throwable t) { + failure.set(t); + nestedTimerFinished.countDown(); + } + }); + } finally { + parent.finish(); + } + + assertTrue(nestedTimerFinished.await(5, SECONDS)); + if (failure.get() != null) { + throw new AssertionError(failure.get()); + } + assertTrue(firstTimerSawActiveSpan.get()); + assertTrue(nestedTimerSawActiveSpan.get()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("asyncChild"))); + } finally { + vertx.close().await(5, SECONDS); + } + } + + @Trace(operationName = "asyncChild") + private static void asyncChild() {} +}