diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java index 8d0d5873eea..77c59d8799e 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java @@ -13,7 +13,6 @@ import datadog.context.ContextScope; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -53,21 +52,11 @@ public static class PublisherSubscribeAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope onSubscribe( @Advice.This final Publisher self, @Advice.Argument(value = 0) final Subscriber s) { - - final Context context = - InstrumentationContext.get(Publisher.class, Context.class).remove(self); - final Context activeContext = Java8BytecodeBridge.getCurrentContext(); - if (s == null || (context == null && activeContext == null)) { - return null; - } - final Context current = - InstrumentationContext.get(Subscriber.class, Context.class) - .putIfAbsent(s, context != null ? context : activeContext); - if (current != null && activeContext != current) { - return current.attach(); - } - - return null; + return ReactiveStreamsContextPropagation.captureOnSubscribe( + self, + s, + InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Subscriber.class, Context.class)); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java new file mode 100644 index 00000000000..40bcd7d55b3 --- /dev/null +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java @@ -0,0 +1,66 @@ +package datadog.trace.instrumentation.reactivestreams; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +public final class ReactiveStreamsContextPropagation { + + private ReactiveStreamsContextPropagation() {} + + public static ContextScope captureOnSubscribe( + final Publisher publisher, + final Subscriber subscriber, + final ContextStore publisherContexts, + final ContextStore subscriberContexts) { + if (subscriber == null) { + return null; + } + + final Context contextFromPublisher = publisherContexts.remove(publisher); + final Context activeContext = Java8BytecodeBridge.getCurrentContext(); + final Context context = + contextFromPublisher != null ? contextFromPublisher : nonRootContext(activeContext); + if (context == null) { + return null; + } + + final Context subscriberContext = subscriberContexts.putIfAbsent(subscriber, context); + // A context captured on the publisher (cross-thread propagation) must win even when the + // current thread already carries a non-root active context + return attachIfRequired(subscriberContext, activeContext, true); + } + + public static ContextScope activateOnSignal( + final Subscriber subscriber, final ContextStore subscriberContexts) { + final Context activeContext = Java8BytecodeBridge.getCurrentContext(); + if (nonRootContext(activeContext) != null) { + return null; + } + return attachIfRequired(subscriberContexts.get(subscriber), activeContext, false); + } + + public static ContextScope activateOnComplete( + final Subscriber subscriber, final ContextStore subscriberContexts) { + return attachIfRequired( + subscriberContexts.get(subscriber), Java8BytecodeBridge.getCurrentContext(), true); + } + + private static ContextScope attachIfRequired( + final Context context, final Context activeContext, final boolean allowReplacingActive) { + if (nonRootContext(context) == null || context == activeContext) { + return null; + } + if (!allowReplacingActive && nonRootContext(activeContext) != null) { + return null; + } + return context.attach(); + } + + private static Context nonRootContext(final Context context) { + return context == null || context == Java8BytecodeBridge.getRootContext() ? null : context; + } +} diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java index 164eec431c1..80b92265419 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java @@ -19,6 +19,7 @@ public ReactiveStreamsModule() { @Override public String[] helperClassNames() { return new String[] { + packageName + ".ReactiveStreamsContextPropagation", packageName + ".ReactiveStreamsAsyncResultExtension", packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher", packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber", diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java index b9874674df5..6d0d27fe440 100644 --- a/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/SubscriberInstrumentation.java @@ -10,7 +10,6 @@ import datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers; import datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers; import datadog.trace.bootstrap.InstrumentationContext; -import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; @@ -51,12 +50,8 @@ public ElementMatcher hierarchyMatcher() { public static class SubscriberDownStreamAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope before(@Advice.This final Subscriber self) { - final Context currentContext = Java8BytecodeBridge.getCurrentContext(); - if (currentContext != null && currentContext != Java8BytecodeBridge.getRootContext()) { - return null; - } - final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self); - return context == null || context == currentContext ? null : context.attach(); + return ReactiveStreamsContextPropagation.activateOnSignal( + self, InstrumentationContext.get(Subscriber.class, Context.class)); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -75,8 +70,8 @@ public static void closeScope(@Advice.Enter final ContextScope scope) { public static class SubscriberCompleteAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope before(@Advice.This final Subscriber self) { - final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self); - return context == null ? null : context.attach(); + return ReactiveStreamsContextPropagation.activateOnComplete( + self, InstrumentationContext.get(Subscriber.class, Context.class)); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java new file mode 100644 index 00000000000..177adf279e7 --- /dev/null +++ b/dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java @@ -0,0 +1,158 @@ +package datadog.trace.instrumentation.reactivestreams; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; + +import datadog.context.Context; +import datadog.context.ContextKey; +import datadog.context.ContextScope; +import datadog.trace.bootstrap.ContextStore; +import java.util.IdentityHashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class ReactiveStreamsContextPropagationTest { + + private static final ContextKey KEY = ContextKey.named("reactive-streams-test"); + + @Test + void publisherCapturedContextOverridesActiveContext() { + final Publisher publisher = subscriber -> {}; + final Subscriber subscriber = new NoopSubscriber(); + final ContextStore publisherContexts = new MapContextStore<>(); + final ContextStore subscriberContexts = new MapContextStore<>(); + + // A context was captured on the publisher (e.g. at assembly / cross-thread subscribe). + final Context captured = Context.root().with(KEY, "captured"); + publisherContexts.put(publisher, captured); + + // The current thread already carries a different, non-root active context. + final Context active = Context.root().with(KEY, "active"); + try (ContextScope activeScope = active.attach()) { + assertSame(active, Context.current()); + + final ContextScope scope = + ReactiveStreamsContextPropagation.captureOnSubscribe( + publisher, subscriber, publisherContexts, subscriberContexts); + try { + // The captured context must win over the ambient active one + assertNotNull(scope, "captured context should be attached over the active context"); + assertSame(captured, Context.current()); + } finally { + if (scope != null) { + scope.close(); + } + } + + // Closing the scope restores the previously active context. + assertSame(active, Context.current()); + } + + // The captured context is remembered for the subscriber, and removed from the publisher store. + assertSame(captured, subscriberContexts.get(subscriber)); + assertNull(publisherContexts.get(publisher)); + } + + @Test + void signalActivationIsSkippedWhenAnotherContextIsActive() { + final Subscriber subscriber = new NoopSubscriber(); + final ContextStore subscriberContexts = new MapContextStore<>(); + subscriberContexts.put(subscriber, Context.root().with(KEY, "stored")); + + final Context active = Context.root().with(KEY, "active"); + try (ContextScope activeScope = active.attach()) { + final ContextScope scope = + ReactiveStreamsContextPropagation.activateOnSignal(subscriber, subscriberContexts); + assertNull(scope, "must not override an already-active non-root context on a signal"); + assertSame(active, Context.current()); + } + } + + @Test + void signalActivationAttachesStoredContextWhenIdle() { + final Subscriber subscriber = new NoopSubscriber(); + final ContextStore subscriberContexts = new MapContextStore<>(); + final Context stored = Context.root().with(KEY, "stored"); + subscriberContexts.put(subscriber, stored); + + final ContextScope scope = + ReactiveStreamsContextPropagation.activateOnSignal(subscriber, subscriberContexts); + try { + assertNotNull(scope); + assertSame(stored, Context.current()); + } finally { + if (scope != null) { + scope.close(); + } + } + } + + private static final class NoopSubscriber implements Subscriber { + @Override + public void onSubscribe(final Subscription subscription) {} + + @Override + public void onNext(final Object value) {} + + @Override + public void onError(final Throwable throwable) {} + + @Override + public void onComplete() {} + } + + private static final class MapContextStore implements ContextStore { + private final Map map = new IdentityHashMap<>(); + + @Override + public C get(final K key) { + return map.get(key); + } + + @Override + public void put(final K key, final C context) { + map.put(key, context); + } + + @Override + public C putIfAbsent(final K key, final C context) { + final C existing = map.get(key); + if (existing != null) { + return existing; + } + map.put(key, context); + return context; + } + + @Override + public C putIfAbsent(final K key, final Factory contextFactory) { + final C existing = map.get(key); + if (existing != null) { + return existing; + } + final C created = contextFactory.create(); + map.put(key, created); + return created; + } + + @Override + public C computeIfAbsent(final K key, final KeyAwareFactory contextFactory) { + final C existing = map.get(key); + if (existing != null) { + return existing; + } + final C created = contextFactory.create(key); + map.put(key, created); + return created; + } + + @Override + public C remove(final K key) { + return map.remove(key); + } + } +} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java index 9dd5257e902..e68665cc4bd 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java @@ -41,11 +41,8 @@ public void methodAdvice(MethodTransformer transformer) { public static class BlockingAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope before(@Advice.This final Publisher self) { - final Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self); - if (context == null) { - return null; - } - return context.attach(); + return ReactorContextBridge.activateForBlocking( + self, InstrumentationContext.get(Publisher.class, Context.class)); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java deleted file mode 100644 index bf11f50b640..00000000000 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextSpanHelper.java +++ /dev/null @@ -1,55 +0,0 @@ -package datadog.trace.instrumentation.reactor.core; - -import datadog.context.Context; -import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; -import javax.annotation.Nullable; -import reactor.core.CoreSubscriber; -import reactor.core.publisher.Mono; - -public class ContextSpanHelper { - - private static final Class MONO_WITH_CONTEXT_CLASS = findMonoWithContextClass(); - - private static final String DD_SPAN_KEY = "dd.span"; - - private static Class findMonoWithContextClass() { - final ClassLoader classLoader = Mono.class.getClassLoader(); - // 3.4+ - try { - return Class.forName( - "reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber", false, classLoader); - } catch (Throwable ignored) { - } - // < 3.4 - try { - return Class.forName( - "reactor.core.publisher.FluxContextStart$ContextStartSubscriber", false, classLoader); - } catch (Throwable ignored) { - } - return null; - } - - private ContextSpanHelper() {} - - @Nullable - public static Context extractContextFromSubscriberContext(final CoreSubscriber subscriber) { - if (MONO_WITH_CONTEXT_CLASS == null || !MONO_WITH_CONTEXT_CLASS.isInstance(subscriber)) { - return null; - } - reactor.util.context.Context reactorContext = null; - try { - reactorContext = subscriber.currentContext(); - } catch (Throwable ignored) { - } - if (reactorContext == null) { - return null; - } - if (reactorContext.hasKey(DD_SPAN_KEY)) { - Object maybeSpan = reactorContext.get(DD_SPAN_KEY); - if (maybeSpan instanceof WithAgentSpan) { - return ((WithAgentSpan) maybeSpan).asAgentSpan(); - } - } - return null; - } -} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextWritingSubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextWritingSubscriberInstrumentation.java new file mode 100644 index 00000000000..87f407fae94 --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ContextWritingSubscriberInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.reactor.core; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; + +/** + * Tailored instrumentation for Reactor's context-writing subscribers (the inner subscribers created + * by {@code contextWrite}/{@code subscriberContext} operators). Matching them by exact type removes + * the per-signal {@code instanceof} chain that the generic {@code CoreSubscriberInstrumentation} + * used to run on every {@link CoreSubscriber}. + * + *

The explicit Datadog {@link Context} carried by these subscribers is fixed at construction, so + * it is read once (constructor advice) and stored; signal advice then only does a context-store + * lookup. + */ +public class ContextWritingSubscriberInstrumentation + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber", + "reactor.core.publisher.FluxContextStart$ContextStartSubscriber", + "reactor.core.publisher.FluxContextWriteRestoringThreadLocals" + + "$ContextWriteRestoringThreadLocalsSubscriber", + "reactor.core.publisher.FluxContextWriteRestoringThreadLocalsFuseable" + + "$FuseableContextWriteRestoringThreadLocalsSubscriber", + "reactor.core.publisher.MonoContextWriteRestoringThreadLocals" + + "$ContextWriteRestoringThreadLocalsSubscriber", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureContextAdvice"); + transformer.applyAdvice( + namedOneOf("onNext", "onComplete", "onError"), + getClass().getName() + "$ActivateContextAdvice"); + } + + public static class CaptureContextAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstructed(@Advice.This final CoreSubscriber self) { + ReactorContextBridge.captureSubscriberContext( + self, InstrumentationContext.get(Subscriber.class, Context.class)); + } + } + + public static class ActivateContextAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope before(@Advice.This final CoreSubscriber self) { + return ReactorContextBridge.activateStoredContext( + self, InstrumentationContext.get(Subscriber.class, Context.class)); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void after(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java index 38c0061bea6..f27fc0ebd35 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java @@ -4,7 +4,6 @@ import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractContextFromSubscriberContext; import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.not; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; @@ -51,20 +50,16 @@ public void methodAdvice(MethodTransformer transformer) { public static class PropagateContextSpanOnSubscribe { @Advice.OnMethodEnter(suppress = Throwable.class) public static ContextScope before( - @Advice.This Publisher self, @Advice.Argument(0) final CoreSubscriber subscriber) { - final Context context = extractContextFromSubscriberContext(subscriber); - - if (context != null) { - /* - we force storing the context state linked to publisher and subscriber to the one - explicitly present in the reactor context so that, if PublisherInstrumentation is kicking in - after this advice, it won't override that active context. - */ - InstrumentationContext.get(Publisher.class, Context.class).put(self, context); - InstrumentationContext.get(Subscriber.class, Context.class).put(subscriber, context); - return context.attach(); - } - return null; + @Advice.This final Publisher self, + @Advice.Argument(0) final CoreSubscriber subscriber) { + // Hands the explicit context recorded for a context-writing subscriber to the publisher store + // (for the reactive-streams hand-off) and attaches it. The subscriber wrapping for + // context-reading operators lives in ContextReadingPublisherInstrumentation. + return ReactorContextBridge.captureOnSubscribe( + self, + subscriber, + InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Subscriber.class, Context.class)); } @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java deleted file mode 100644 index 75152611f80..00000000000 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CoreSubscriberInstrumentation.java +++ /dev/null @@ -1,53 +0,0 @@ -package datadog.trace.instrumentation.reactor.core; - -import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.implementsInterface; -import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; -import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; -import static datadog.trace.instrumentation.reactor.core.ContextSpanHelper.extractContextFromSubscriberContext; - -import datadog.context.Context; -import datadog.context.ContextScope; -import datadog.trace.agent.tooling.Instrumenter; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import reactor.core.CoreSubscriber; - -public class CoreSubscriberInstrumentation - implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { - - @Override - public String hierarchyMarkerType() { - return "reactor.core.CoreSubscriber"; - } - - @Override - public ElementMatcher hierarchyMatcher() { - return implementsInterface(named(hierarchyMarkerType())); - } - - @Override - public void methodAdvice(MethodTransformer transformer) { - transformer.applyAdvice( - namedOneOf("onNext", "onComplete", "onError"), - getClass().getName() + "$PropagateSpanInScopeAdvice"); - } - - public static class PropagateSpanInScopeAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static ContextScope before(@Advice.This final CoreSubscriber self) { - final Context context = extractContextFromSubscriberContext(self); - if (context != null) { - return context.attach(); - } - return null; - } - - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void after(@Advice.Enter final ContextScope scope) { - if (scope != null) { - scope.close(); - } - } - } -} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java index 747216f2402..af7743c6fff 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java @@ -50,16 +50,12 @@ public static void onSubscribe( @Advice.This final Publisher self, @Advice.Argument(0) final Subscriber arg, @Advice.Return final Subscriber s) { - if (s == null || arg == null) { - return; - } - Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self); - if (context == null) { - context = InstrumentationContext.get(Subscriber.class, Context.class).get(arg); - } - if (context != null) { - InstrumentationContext.get(Subscriber.class, Context.class).putIfAbsent(s, context); - } + ReactorContextBridge.transferToOptimizedSubscriber( + self, + arg, + s, + InstrumentationContext.get(Publisher.class, Context.class), + InstrumentationContext.get(Subscriber.class, Context.class)); } } } diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java new file mode 100644 index 00000000000..06145cdea7f --- /dev/null +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java @@ -0,0 +1,140 @@ +package datadog.trace.instrumentation.reactor.core; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import reactor.core.CoreSubscriber; + +/** + * Helper shared by the reactor-core instrumentations. It reads the span a user placed in the + * Reactor context under the {@code dd.span} key (adapting it to a {@link Context}) and drives the + * context-store-based propagation: capture on subscribe, restore on signal/blocking, and transfer + * to optimized subscribers. + */ +public final class ReactorContextBridge { + + private static final String DD_SPAN_KEY = "dd.span"; + + private ReactorContextBridge() {} + + /** + * Records the {@link Context} derived from the {@code dd.span} span a context-writing subscriber + * carries, into the subscriber store, once, when the subscriber is constructed. The caller + * ({@code ContextWritingSubscriberInstrumentation}) only matches context-writing subscribers, so + * no runtime type check is needed. + */ + public static void captureSubscriberContext( + final CoreSubscriber subscriber, + final ContextStore subscriberContexts) { + final Context context = explicitContextFromSubscriber(subscriber); + if (context != null) { + subscriberContexts.put(subscriber, context); + } + } + + /** + * Attaches the context recorded for {@code subscriber} by {@link #captureSubscriberContext}. A + * plain store lookup — no {@code instanceof}, no {@code currentContext()} call — on the signal + * hot path. + */ + public static ContextScope activateStoredContext( + final Subscriber subscriber, final ContextStore subscriberContexts) { + return attachIfRequired( + subscriberContexts.get(subscriber), Java8BytecodeBridge.getCurrentContext()); + } + + /** + * On subscribe, hands the explicit context recorded for {@code subscriber} (a context-writing + * subscriber) to the publisher store so the reactive-streams layer can propagate it, and attaches + * it. + */ + public static ContextScope captureOnSubscribe( + final Publisher publisher, + final Subscriber subscriber, + final ContextStore publisherContexts, + final ContextStore subscriberContexts) { + final Context context = subscriberContexts.get(subscriber); + if (context == null) { + return null; + } + + publisherContexts.put(publisher, context); + return attachIfRequired(context, Java8BytecodeBridge.getCurrentContext()); + } + + public static ContextScope activateForBlocking( + final Publisher publisher, final ContextStore publisherContexts) { + return attachIfRequired( + publisherContexts.get(publisher), Java8BytecodeBridge.getCurrentContext()); + } + + public static void transferToOptimizedSubscriber( + final Publisher publisher, + final Subscriber source, + final Subscriber target, + final ContextStore publisherContexts, + final ContextStore subscriberContexts) { + if (source == null || target == null) { + return; + } + + Context context = publisherContexts.get(publisher); + if (context == null) { + context = subscriberContexts.get(source); + } + if (context != null) { + subscriberContexts.putIfAbsent(target, context); + } + } + + private static Context explicitContextFromSubscriber(final CoreSubscriber subscriber) { + final reactor.util.context.Context reactorContext = currentContext(subscriber); + if (reactorContext == null || !hasKey(reactorContext, DD_SPAN_KEY)) { + return null; + } + final Object maybeSpan = get(reactorContext, DD_SPAN_KEY); + return maybeSpan instanceof WithAgentSpan ? ((WithAgentSpan) maybeSpan).asAgentSpan() : null; + } + + private static reactor.util.context.Context currentContext(final CoreSubscriber subscriber) { + if (subscriber == null) { + return null; + } + try { + return subscriber.currentContext(); + } catch (Throwable ignored) { + return null; + } + } + + private static ContextScope attachIfRequired(final Context context, final Context activeContext) { + if (nonRootContext(context) == null || context == activeContext) { + return null; + } + return context.attach(); + } + + private static Context nonRootContext(final Context context) { + return context == null || context == Java8BytecodeBridge.getRootContext() ? null : context; + } + + private static boolean hasKey(final reactor.util.context.Context context, final Object key) { + try { + return context.hasKey(key); + } catch (Throwable ignored) { + return false; + } + } + + private static Object get(final reactor.util.context.Context context, final Object key) { + try { + return context.get(key); + } catch (Throwable ignored) { + return null; + } + } +} diff --git a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java index 9073903a8c4..d729c5a7947 100644 --- a/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java +++ b/dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorCoreModule.java @@ -25,7 +25,7 @@ public ReactorCoreModule() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".ReactorAsyncResultExtension", packageName + ".ContextSpanHelper", + packageName + ".ReactorAsyncResultExtension", packageName + ".ReactorContextBridge", }; } @@ -55,7 +55,7 @@ public List typeInstrumentations() { return asList( new BlockingPublisherInstrumentation(), new CorePublisherInstrumentation(), - new CoreSubscriberInstrumentation(), + new ContextWritingSubscriberInstrumentation(), new OptimizableOperatorInstrumentation()); } }