Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -53,21 +52,11 @@ public static class PublisherSubscribeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Publisher self, @Advice.Argument(value = 0) final Subscriber s) {

final Context context =
InstrumentationContext.get(Publisher.class, Context.class).remove(self);
final Context activeContext = Java8BytecodeBridge.getCurrentContext();
if (s == null || (context == null && activeContext == null)) {
return null;
}
final Context current =
InstrumentationContext.get(Subscriber.class, Context.class)
.putIfAbsent(s, context != null ? context : activeContext);
if (current != null && activeContext != current) {
return current.attach();
}

return null;
return ReactiveStreamsContextPropagation.captureOnSubscribe(
self,
s,
InstrumentationContext.get(Publisher.class, Context.class),
InstrumentationContext.get(Subscriber.class, Context.class));
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.trace.instrumentation.reactivestreams;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public final class ReactiveStreamsContextPropagation {

private ReactiveStreamsContextPropagation() {}

public static ContextScope captureOnSubscribe(
final Publisher<?> publisher,
final Subscriber<?> subscriber,
final ContextStore<Publisher, Context> publisherContexts,
final ContextStore<Subscriber, Context> subscriberContexts) {
if (subscriber == null) {
return null;
}

final Context contextFromPublisher = publisherContexts.remove(publisher);
final Context activeContext = Java8BytecodeBridge.getCurrentContext();
final Context context =
contextFromPublisher != null ? contextFromPublisher : nonRootContext(activeContext);
if (context == null) {
return null;
}

final Context subscriberContext = subscriberContexts.putIfAbsent(subscriber, context);
// A context captured on the publisher (cross-thread propagation) must win even when the
// current thread already carries a non-root active context
return attachIfRequired(subscriberContext, activeContext, true);
}

public static ContextScope activateOnSignal(
final Subscriber<?> subscriber, final ContextStore<Subscriber, Context> subscriberContexts) {
final Context activeContext = Java8BytecodeBridge.getCurrentContext();
if (nonRootContext(activeContext) != null) {
return null;
}
return attachIfRequired(subscriberContexts.get(subscriber), activeContext, false);
}

public static ContextScope activateOnComplete(
final Subscriber<?> subscriber, final ContextStore<Subscriber, Context> subscriberContexts) {
return attachIfRequired(
subscriberContexts.get(subscriber), Java8BytecodeBridge.getCurrentContext(), true);
}

private static ContextScope attachIfRequired(
final Context context, final Context activeContext, final boolean allowReplacingActive) {
if (nonRootContext(context) == null || context == activeContext) {
return null;
}
if (!allowReplacingActive && nonRootContext(activeContext) != null) {
return null;
}
return context.attach();
}

private static Context nonRootContext(final Context context) {
return context == null || context == Java8BytecodeBridge.getRootContext() ? null : context;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public ReactiveStreamsModule() {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactiveStreamsContextPropagation",
packageName + ".ReactiveStreamsAsyncResultExtension",
packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher",
packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers;
import datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
Expand Down Expand Up @@ -51,12 +50,8 @@ public ElementMatcher<TypeDescription> hierarchyMatcher() {
public static class SubscriberDownStreamAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope before(@Advice.This final Subscriber self) {
final Context currentContext = Java8BytecodeBridge.getCurrentContext();
if (currentContext != null && currentContext != Java8BytecodeBridge.getRootContext()) {
return null;
}
final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self);
return context == null || context == currentContext ? null : context.attach();
return ReactiveStreamsContextPropagation.activateOnSignal(
self, InstrumentationContext.get(Subscriber.class, Context.class));
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -75,8 +70,8 @@ public static void closeScope(@Advice.Enter final ContextScope scope) {
public static class SubscriberCompleteAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope before(@Advice.This final Subscriber self) {
final Context context = InstrumentationContext.get(Subscriber.class, Context.class).get(self);
return context == null ? null : context.attach();
return ReactiveStreamsContextPropagation.activateOnComplete(
self, InstrumentationContext.get(Subscriber.class, Context.class));
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package datadog.trace.instrumentation.reactivestreams;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;

import datadog.context.Context;
import datadog.context.ContextKey;
import datadog.context.ContextScope;
import datadog.trace.bootstrap.ContextStore;
import java.util.IdentityHashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class ReactiveStreamsContextPropagationTest {

private static final ContextKey<String> KEY = ContextKey.named("reactive-streams-test");

@Test
void publisherCapturedContextOverridesActiveContext() {
final Publisher<Object> publisher = subscriber -> {};
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Publisher, Context> publisherContexts = new MapContextStore<>();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();

// A context was captured on the publisher (e.g. at assembly / cross-thread subscribe).
final Context captured = Context.root().with(KEY, "captured");
publisherContexts.put(publisher, captured);

// The current thread already carries a different, non-root active context.
final Context active = Context.root().with(KEY, "active");
try (ContextScope activeScope = active.attach()) {
assertSame(active, Context.current());

final ContextScope scope =
ReactiveStreamsContextPropagation.captureOnSubscribe(
publisher, subscriber, publisherContexts, subscriberContexts);
try {
// The captured context must win over the ambient active one
assertNotNull(scope, "captured context should be attached over the active context");
assertSame(captured, Context.current());
} finally {
if (scope != null) {
scope.close();
}
}

// Closing the scope restores the previously active context.
assertSame(active, Context.current());
}

// The captured context is remembered for the subscriber, and removed from the publisher store.
assertSame(captured, subscriberContexts.get(subscriber));
assertNull(publisherContexts.get(publisher));
}

@Test
void signalActivationIsSkippedWhenAnotherContextIsActive() {
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();
subscriberContexts.put(subscriber, Context.root().with(KEY, "stored"));

final Context active = Context.root().with(KEY, "active");
try (ContextScope activeScope = active.attach()) {
final ContextScope scope =
ReactiveStreamsContextPropagation.activateOnSignal(subscriber, subscriberContexts);
assertNull(scope, "must not override an already-active non-root context on a signal");
assertSame(active, Context.current());
}
}

@Test
void signalActivationAttachesStoredContextWhenIdle() {
final Subscriber<Object> subscriber = new NoopSubscriber();
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();
final Context stored = Context.root().with(KEY, "stored");
subscriberContexts.put(subscriber, stored);

final ContextScope scope =
ReactiveStreamsContextPropagation.activateOnSignal(subscriber, subscriberContexts);
try {
assertNotNull(scope);
assertSame(stored, Context.current());
} finally {
if (scope != null) {
scope.close();
}
}
}

private static final class NoopSubscriber implements Subscriber<Object> {
@Override
public void onSubscribe(final Subscription subscription) {}

@Override
public void onNext(final Object value) {}

@Override
public void onError(final Throwable throwable) {}

@Override
public void onComplete() {}
}

private static final class MapContextStore<K, C> implements ContextStore<K, C> {
private final Map<K, C> map = new IdentityHashMap<>();

@Override
public C get(final K key) {
return map.get(key);
}

@Override
public void put(final K key, final C context) {
map.put(key, context);
}

@Override
public C putIfAbsent(final K key, final C context) {
final C existing = map.get(key);
if (existing != null) {
return existing;
}
map.put(key, context);
return context;
}

@Override
public C putIfAbsent(final K key, final Factory<C> contextFactory) {
final C existing = map.get(key);
if (existing != null) {
return existing;
}
final C created = contextFactory.create();
map.put(key, created);
return created;
}

@Override
public C computeIfAbsent(final K key, final KeyAwareFactory<? super K, C> contextFactory) {
final C existing = map.get(key);
if (existing != null) {
return existing;
}
final C created = contextFactory.create(key);
map.put(key, created);
return created;
}

@Override
public C remove(final K key) {
return map.remove(key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ public void methodAdvice(MethodTransformer transformer) {
public static class BlockingAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope before(@Advice.This final Publisher self) {
final Context context = InstrumentationContext.get(Publisher.class, Context.class).get(self);
if (context == null) {
return null;
}
return context.attach();
return ReactorContextBridge.activateForBlocking(
self, InstrumentationContext.get(Publisher.class, Context.class));
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
Expand Down

This file was deleted.

Loading
Loading