diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/Is.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/Is.java index f142ba1b742..a196403e4e1 100644 --- a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/Is.java +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/Is.java @@ -27,6 +27,9 @@ public String failureReason() { @Override public boolean test(T t) { + if (this.expected instanceof CharSequence && t instanceof CharSequence) { + return this.expected.toString().equals(t.toString()); + } return this.expected.equals(t); } } diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/SpanMatcher.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/SpanMatcher.java index d913cd78752..acbee107710 100644 --- a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/SpanMatcher.java +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/SpanMatcher.java @@ -64,6 +64,11 @@ public final class SpanMatcher { private static final Matcher CHILD_OF_PREVIOUS_MATCHER = is(0L); + /** Sentinel index value meaning "use the previous span in the array". */ + private static final int INDEX_NOT_SET = -1; + + private int parentSpanIndex = INDEX_NOT_SET; + private SpanMatcher() { this.serviceNameMatcher = validates(s -> s != null && !s.isEmpty()); this.typeMatcher = isNull(); @@ -133,6 +138,18 @@ public SpanMatcher childOfPrevious() { return this; } + /** + * Checks the span is a direct child of the span at the given index in the trace. The index is + * resolved at assertion time by looking up the span ID from the trace list. + * + * @param index The zero-based index of the parent span in the trace. + * @return The current {@link SpanMatcher} instance with the child-of constraint applied. + */ + public SpanMatcher childOfSpan(int index) { + this.parentSpanIndex = index; + return this; + } + /** * Checks the span has service name defined. * @@ -302,6 +319,14 @@ public SpanMatcher links(SpanLinkMatcher... matchers) { } void assertSpan(DDSpan span, DDSpan previousSpan) { + assertSpan(span, previousSpan, null); + } + + void assertSpan(DDSpan span, DDSpan previousSpan, List trace) { + // Apply parent id matcher from a specific span index + if (this.parentSpanIndex != INDEX_NOT_SET && trace != null) { + this.parentIdMatcher = is(trace.get(this.parentSpanIndex).getSpanId()); + } // Apply parent id matcher from the previous span if (this.parentIdMatcher == CHILD_OF_PREVIOUS_MATCHER) { this.parentIdMatcher = is(previousSpan.getSpanId()); diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/TraceMatcher.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/TraceMatcher.java index 4187bcd3823..a0c232d875c 100644 --- a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/TraceMatcher.java +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/assertions/TraceMatcher.java @@ -71,7 +71,7 @@ void assertTrace(List trace, int traceIndex) { DDSpan previousSpan = null; for (int i = 0; i < spanCount; i++) { DDSpan span = trace.get(i); - this.matchers[i].assertSpan(span, previousSpan); + this.matchers[i].assertSpan(span, previousSpan, trace); previousSpan = span; } } diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle new file mode 100644 index 00000000000..1fd876777cc --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle @@ -0,0 +1,24 @@ + +muzzle { + pass { + group = "io.reactivex.rxjava3" + module = "rxjava" + versions = "[3.0.0,)" + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') + +dependencies { + compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.0' + compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0' + + testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') + testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20') + + testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0' + testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0' + latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '+' +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java new file mode 100644 index 00000000000..3af41d89fa8 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java @@ -0,0 +1,73 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import net.bytebuddy.asm.Advice; + +public final class CompletableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Completable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Completable completable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Completable.class, Context.class) + .put(completable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Completable completable, + @Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) { + if (observer != null) { + Context parentContext = + InstrumentationContext.get(Completable.class, Context.class).get(completable); + if (parentContext != null) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingCompletableObserver(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java new file mode 100644 index 00000000000..16104c4b877 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java @@ -0,0 +1,70 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Flowable; +import net.bytebuddy.asm.Advice; +import org.reactivestreams.Subscriber; + +public final class FlowableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Flowable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("org.reactivestreams.Subscriber"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Flowable flowable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Flowable flowable, + @Advice.Argument(value = 0, readOnly = false) Subscriber subscriber) { + if (subscriber != null) { + Context parentContext = + InstrumentationContext.get(Flowable.class, Context.class).get(flowable); + if (parentContext != null) { + subscriber = new TracingSubscriber<>(subscriber, parentContext); + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java new file mode 100644 index 00000000000..49bf3e35acf --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java @@ -0,0 +1,70 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import net.bytebuddy.asm.Advice; + +public final class MaybeInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Maybe"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Maybe maybe) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Maybe maybe, + @Advice.Argument(value = 0, readOnly = false) MaybeObserver observer) { + if (observer != null) { + Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe); + if (parentContext != null) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingMaybeObserver<>(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java new file mode 100644 index 00000000000..5805f890b24 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java @@ -0,0 +1,69 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import net.bytebuddy.asm.Advice; + +public final class ObservableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Observable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Observable observable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null) { + InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Observable observable, + @Advice.Argument(value = 0, readOnly = false) Observer observer) { + if (observer != null) { + Context parentContext = + InstrumentationContext.get(Observable.class, Context.class).get(observable); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + observer = new TracingObserver<>(observer, parentContext); + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java new file mode 100644 index 00000000000..26ad58cfcf3 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java @@ -0,0 +1,68 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.EagerHelper; +import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension; +import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; + +public class RxJavaAsyncResultExtension implements AsyncResultExtension, EagerHelper { + static { + AsyncResultExtensions.register(new RxJavaAsyncResultExtension()); + } + + /** + * Register the extension as an {@link AsyncResultExtension} using static class initialization. + *
+ * It uses an empty static method call to ensure the class loading and the one-time-only static + * class initialization. This will ensure this extension will only be registered once under {@link + * AsyncResultExtensions}. + */ + public static void init() {} + + @Override + public boolean supports(Class result) { + return Completable.class.isAssignableFrom(result) + || Maybe.class.isAssignableFrom(result) + || Single.class.isAssignableFrom(result) + || Observable.class.isAssignableFrom(result) + || Flowable.class.isAssignableFrom(result); + } + + @Override + public Object apply(Object result, AgentSpan span) { + if (result instanceof Completable) { + return ((Completable) result) + .doOnEvent(throwable -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Maybe) { + return ((Maybe) result) + .doOnEvent((o, throwable) -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Single) { + return ((Single) result) + .doOnEvent((o, throwable) -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Observable) { + return ((Observable) result) + .doOnComplete(span::finish) + .doOnError(throwable -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Flowable) { + return ((Flowable) result) + .doOnComplete(span::finish) + .doOnError(throwable -> onError(span, throwable)) + .doOnCancel(span::finish); + } + return null; + } + + private static void onError(AgentSpan span, Throwable throwable) { + span.addThrowable(throwable); + span.finish(); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java new file mode 100644 index 00000000000..5931d2873ed --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.rxjava3; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class RxJavaModule extends InstrumenterModule.ContextTracking { + public RxJavaModule() { + super("rxjava", "rxjava-3"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TracingObserver", + packageName + ".TracingSubscriber", + packageName + ".TracingSingleObserver", + packageName + ".TracingMaybeObserver", + packageName + ".TracingCompletableObserver", + packageName + ".RxJavaAsyncResultExtension", + }; + } + + @Override + public Map contextStore() { + final Map store = new HashMap<>(); + store.put("io.reactivex.rxjava3.core.Observable", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Flowable", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Single", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Maybe", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Completable", Context.class.getName()); + return store; + } + + @Override + public List typeInstrumentations() { + return asList( + new ObservableInstrumentation(), + new FlowableInstrumentation(), + new SingleInstrumentation(), + new MaybeInstrumentation(), + new CompletableInstrumentation()); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java new file mode 100644 index 00000000000..6ed709e0b50 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import net.bytebuddy.asm.Advice; + +public final class SingleInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Single"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.SingleObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Single single) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null) { + InstrumentationContext.get(Single.class, Context.class).put(single, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Single single, + @Advice.Argument(value = 0, readOnly = false) SingleObserver observer) { + if (observer != null) { + Context parentContext = InstrumentationContext.get(Single.class, Context.class).get(single); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingSingleObserver<>(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java new file mode 100644 index 00000000000..8a0dd7254e1 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingCompletableObserver implements CompletableObserver { + private final CompletableObserver observer; + private final Context parentContext; + + public TracingCompletableObserver( + @Nonnull final CompletableObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java new file mode 100644 index 00000000000..0cbf34c61e4 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java @@ -0,0 +1,45 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingMaybeObserver implements MaybeObserver { + private final MaybeObserver observer; + private final Context parentContext; + + public TracingMaybeObserver( + @Nonnull final MaybeObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onSuccess(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onSuccess(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java new file mode 100644 index 00000000000..32018611cd1 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java @@ -0,0 +1,43 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingObserver implements Observer { + private final Observer observer; + private final Context parentContext; + + public TracingObserver(final Observer observer, final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onNext(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onNext(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java new file mode 100644 index 00000000000..3e05d1124bc --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingSingleObserver implements SingleObserver { + private final SingleObserver observer; + private final Context parentContext; + + public TracingSingleObserver( + @Nonnull final SingleObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onSuccess(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onSuccess(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java new file mode 100644 index 00000000000..c25317ecb8a --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java @@ -0,0 +1,45 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import javax.annotation.Nonnull; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** Wrapper that makes sure spans from subscriber events treat the captured span as their parent. */ +public final class TracingSubscriber implements Subscriber { + private final Subscriber subscriber; + private final Context parentContext; + + public TracingSubscriber( + @Nonnull final Subscriber subscriber, @Nonnull final Context parentContext) { + this.subscriber = subscriber; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(final T value) { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onNext(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java new file mode 100644 index 00000000000..73ae483b411 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java @@ -0,0 +1,113 @@ +package annotatedsample; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.opentelemetry.instrumentation.annotations.WithSpan; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.util.concurrent.CountDownLatch; + +/** Helper class for {@link WithSpan}-annotated async methods returning RxJava 3 types. */ +public class RxJava3TracedMethods { + @WithSpan + public static Completable traceAsyncCompletable(CountDownLatch latch) { + return Completable.fromRunnable(() -> await(latch)); + } + + @WithSpan + public static Completable traceAsyncFailingCompletable( + CountDownLatch latch, Exception exception) { + return Completable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Maybe traceAsyncMaybe(CountDownLatch latch) { + return Maybe.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Maybe traceAsyncFailingMaybe(CountDownLatch latch, Exception exception) { + return Maybe.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Single traceAsyncSingle(CountDownLatch latch) { + return Single.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Single traceAsyncFailingSingle(CountDownLatch latch, Exception exception) { + return Single.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Observable traceAsyncObservable(CountDownLatch latch) { + return Observable.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Observable traceAsyncFailingObservable( + CountDownLatch latch, Exception exception) { + return Observable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Flowable traceAsyncFlowable(CountDownLatch latch) { + return Flowable.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Flowable traceAsyncFailingFlowable( + CountDownLatch latch, Exception exception) { + return Flowable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + private static void await(CountDownLatch latch) { + try { + if (!latch.await(5, SECONDS)) { + throw new IllegalStateException("Latch still locked"); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java new file mode 100644 index 00000000000..21d1d318489 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java @@ -0,0 +1,188 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.Matchers.is; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.error; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import annotatedsample.RxJava3TracedMethods; +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.junit.utils.config.WithConfig; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +/** + * Tests that the RxJava 3 async result extension correctly completes spans for {@link + * io.opentelemetry.instrumentation.annotations.WithSpan}-annotated methods that return RxJava 3 + * reactive types. + * + *

These tests verify that spans are not finished when the method returns, but only after the + * reactive type signals completion (success, error, or cancellation). + */ +@WithConfig(key = "trace.otel.enabled", value = "true") +@WithConfig(key = "integration.opentelemetry-annotations-1.20.enabled", value = "true") +public class RxJava3ResultExtensionTest extends AbstractInstrumentationTest { + + // ---- Success path tests ---- + + static Stream asyncSuccessProvider() { + return Stream.of( + Arguments.of("Completable", "traceAsyncCompletable", "blockingAwait"), + Arguments.of("Maybe", "traceAsyncMaybe", "blockingGet"), + Arguments.of("Single", "traceAsyncSingle", "blockingGet"), + Arguments.of("Observable", "traceAsyncObservable", "blockingLast"), + Arguments.of("Flowable", "traceAsyncFlowable", "blockingLast")); + } + + @ParameterizedTest(name = "WithSpan annotated async method {0}") + @MethodSource("asyncSuccessProvider") + void withSpanAnnotatedAsyncMethodSuccess(String type, String method, String operation) + throws Exception { + CountDownLatch latch = new CountDownLatch(1); + Object asyncType = invokeTracedMethod(method, latch); + + // Span should not be finished yet + assertEquals(0, writer.size()); + + latch.countDown(); + invokeBlockingOperation(asyncType, operation); + + assertTraces( + trace( + span() + .root() + .operationName("RxJava3TracedMethods.traceAsync" + type) + .resourceName("RxJava3TracedMethods." + method) + .tags( + defaultTags(), + tag(Tags.COMPONENT, is("opentelemetry")), + tag(Tags.SPAN_KIND, is("internal"))))); + } + + // ---- Failure path tests ---- + + static Stream asyncFailureProvider() { + return Stream.of( + Arguments.of("Completable", "traceAsyncFailingCompletable", "blockingAwait"), + Arguments.of("Maybe", "traceAsyncFailingMaybe", "blockingGet"), + Arguments.of("Single", "traceAsyncFailingSingle", "blockingGet"), + Arguments.of("Observable", "traceAsyncFailingObservable", "blockingLast"), + Arguments.of("Flowable", "traceAsyncFailingFlowable", "blockingLast")); + } + + @ParameterizedTest(name = "WithSpan annotated async method failing {0}") + @MethodSource("asyncFailureProvider") + void withSpanAnnotatedAsyncMethodFailure(String type, String method, String operation) + throws Exception { + CountDownLatch latch = new CountDownLatch(1); + IllegalStateException expectedException = new IllegalStateException("Test exception"); + Object asyncType = invokeFailingTracedMethod(method, latch, expectedException); + + // Span should not be finished yet + assertEquals(0, writer.size()); + + latch.countDown(); + assertThrows(IllegalStateException.class, () -> invokeBlockingOperation(asyncType, operation)); + + assertTraces( + trace( + span() + .root() + .operationName("RxJava3TracedMethods." + method) + .resourceName("RxJava3TracedMethods." + method) + .error() + .tags( + defaultTags(), + tag(Tags.COMPONENT, is("opentelemetry")), + tag(Tags.SPAN_KIND, is("internal")), + error(expectedException)))); + } + + // ---- Cancellation path tests ---- + + static Stream asyncCancelProvider() { + return Stream.of( + Arguments.of("Completable", "traceAsyncCompletable"), + Arguments.of("Maybe", "traceAsyncMaybe"), + Arguments.of("Single", "traceAsyncSingle"), + Arguments.of("Observable", "traceAsyncObservable"), + Arguments.of("Flowable", "traceAsyncFlowable")); + } + + @ParameterizedTest(name = "WithSpan annotated async method cancelled {0}") + @MethodSource("asyncCancelProvider") + void withSpanAnnotatedAsyncMethodCancelled(String type, String method) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + Object asyncType = invokeTracedMethod(method, latch); + + // Span should not be finished yet + assertEquals(0, writer.size()); + + latch.countDown(); + disposeAsyncType(asyncType); + + assertTraces( + trace( + span() + .root() + .operationName("RxJava3TracedMethods.traceAsync" + type) + .resourceName("RxJava3TracedMethods." + method) + .tags( + defaultTags(), + tag(Tags.COMPONENT, is("opentelemetry")), + tag(Tags.SPAN_KIND, is("internal"))))); + } + + // ---- Helper methods ---- + + private static Object invokeTracedMethod(String method, CountDownLatch latch) throws Exception { + return RxJava3TracedMethods.class.getMethod(method, CountDownLatch.class).invoke(null, latch); + } + + private static Object invokeFailingTracedMethod( + String method, CountDownLatch latch, Exception exception) throws Exception { + return RxJava3TracedMethods.class + .getMethod(method, CountDownLatch.class, Exception.class) + .invoke(null, latch, exception); + } + + private static void invokeBlockingOperation(Object asyncType, String operation) throws Exception { + try { + asyncType.getClass().getMethod(operation).invoke(asyncType); + } catch (InvocationTargetException e) { + Throwable cause = e.getCause(); + if (cause instanceof Exception) { + throw (Exception) cause; + } + throw e; + } + } + + private static void disposeAsyncType(Object asyncType) throws Exception { + if (asyncType instanceof Completable) { + ((Completable) asyncType).subscribe().dispose(); + } else if (asyncType instanceof Maybe) { + ((Maybe) asyncType).subscribe().dispose(); + } else if (asyncType instanceof Single) { + ((Single) asyncType).subscribe().dispose(); + } else if (asyncType instanceof Observable) { + ((Observable) asyncType).subscribe().dispose(); + } else if (asyncType instanceof Flowable) { + ((Flowable) asyncType).subscribe().dispose(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3SubscriptionTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3SubscriptionTest.java new file mode 100644 index 00000000000..c36042e2483 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3SubscriptionTest.java @@ -0,0 +1,315 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.Schedulers; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; + +/** + * Tests that context propagation works for all five RxJava 3 reactive types (Observable, Flowable, + * Single, Maybe, Completable) by verifying that a child span created inside a subscribe callback is + * correctly parented to the span that was active when the subscription was created. + */ +public class RxJava3SubscriptionTest extends AbstractInstrumentationTest { + + /** Simulates a traced operation that could happen inside a subscriber callback. */ + static int tracedQuery() { + AgentSpan span = startSpan("test", "Connection.query"); + span.finish(); + return new Random().nextInt(); + } + + /** Simulates a traced void operation for Completable callbacks. */ + static void tracedAction() { + AgentSpan span = startSpan("test", "Connection.query"); + span.finish(); + } + + @Test + void maybeSubscribeLambdaRunsUnderParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Maybe.create( + emitter -> { + emitter.onSuccess(new Object()); + }) + .subscribe( + value -> { + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void observableSubscribeLambdaRunsUnderParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Observable.just(1) + .subscribe( + value -> { + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void flowableSubscribeLambdaRunsUnderParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Flowable.just(1) + .subscribe( + value -> { + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void singleSubscribeLambdaRunsUnderParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Single.just(1) + .subscribe( + value -> { + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void completableSubscribeRunsUnderParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Completable.fromRunnable(() -> {}) + .subscribe( + () -> { + tracedAction(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void contextPropagatesAcrossThreadBoundary() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Maybe.create( + emitter -> { + emitter.onSuccess(42); + }) + .subscribeOn(Schedulers.io()) + .subscribe( + value -> { + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } + + @Test + void multipleSubscribersEachCaptureOwnParent() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(2); + + // First subscriber under parent-1 + AgentSpan parent1 = startSpan("test", "parent-1"); + AgentScope scope1 = activateSpan(parent1); + Maybe maybe1 = Maybe.just(1); + maybe1.subscribe( + value -> { + AgentSpan child = startSpan("test", "child-1"); + child.finish(); + latch.countDown(); + }); + scope1.close(); + parent1.finish(); + + // Second subscriber under parent-2 + AgentSpan parent2 = startSpan("test", "parent-2"); + AgentScope scope2 = activateSpan(parent2); + Maybe maybe2 = Maybe.just(2); + maybe2.subscribe( + value -> { + AgentSpan child = startSpan("test", "child-2"); + child.finish(); + latch.countDown(); + }); + scope2.close(); + parent2.finish(); + + latch.await(5, TimeUnit.SECONDS); + + // Two independent traces, each with a parent-child pair + assertTraces( + trace( + span().root().operationName("parent-1").resourceName("parent-1").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("child-1") + .resourceName("child-1") + .tags(defaultTags())), + trace( + span().root().operationName("parent-2").resourceName("parent-2").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("child-2") + .resourceName("child-2") + .tags(defaultTags()))); + } + + @Test + void errorInCallbackDoesNotBreakContextPropagation() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + AgentScope scope = activateSpan(parent); + try { + Observable.just(1, 2) + .subscribe( + value -> { + if (value == 1) { + throw new RuntimeException("callback error"); + } + // This should not be reached since error terminates the stream, + // but the span created before the error should still have the correct parent + tracedQuery(); + latch.countDown(); + }, + error -> { + // Error handler — context should still be properly propagated + tracedQuery(); + latch.countDown(); + }); + } finally { + scope.close(); + parent.finish(); + } + latch.await(5, TimeUnit.SECONDS); + + // The error handler creates a child span under the parent + assertTraces( + trace( + span().root().operationName("parent").resourceName("parent").tags(defaultTags()), + span() + .childOfPrevious() + .operationName("Connection.query") + .resourceName("Connection.query") + .tags(defaultTags()))); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3Test.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3Test.java new file mode 100644 index 00000000000..5544caaaacb --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/datadog/trace/instrumentation/rxjava3/RxJava3Test.java @@ -0,0 +1,676 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.Matchers.is; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.error; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.agent.test.assertions.SpanMatcher; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.schedulers.Schedulers; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import testdog.trace.instrumentation.rxjava3.TracedMethods; + +/** + * Context-propagation tests for RxJava 3. + * + *

RxJava 3 instrumentation creates NO spans of its own — it only bridges trace context across + * async boundaries. These tests verify that a span started inside a wrapped callback becomes a + * child of the parent span that was active when the boundary was created. + */ +public class RxJava3Test extends AbstractInstrumentationTest { + + static { + // TODO fix this by making sure that spans get closed properly + // Delayed reactive operators may finish child spans after the root span completes, same as + // RxJava2Test.useStrictTraceWrites() returning false + testConfig.strictTraceWrites(false); + } + + @org.junit.jupiter.api.BeforeAll + static void warmUpSchedulers() { + // Warm up RxJava's computation scheduler so the first test using delay() doesn't timeout + // due to scheduler thread pool initialization interfering with trace context propagation. + Maybe.just(0).delay(1, MILLISECONDS).blockingGet(); + } + + private static final String EXCEPTION_MESSAGE = "test exception"; + + static Function addOne() { + return i -> TracedMethods.addOneFunc(i); + } + + static Function addTwo() { + return i -> TracedMethods.addTwoFunc(i); + } + + /** + * Wraps a publisher supplier call under a trace-parent / publisher-parent span pair and + * subscribes synchronously (blocking). + * + * @param publisherSupplier a supplier that creates the reactive publisher + * @return the result of blocking on the publisher + */ + Object assemblePublisherUnderTrace(PublisherSupplier publisherSupplier) { + AgentSpan traceParent = startSpan("trace", "trace-parent"); + traceParent.setResourceName("trace-parent"); + traceParent.setTag(Tags.COMPONENT, "trace"); + AgentScope traceParentScope = activateSpan(traceParent); + try { + AgentSpan span = startSpan("test", "publisher-parent"); + AgentScope scope = activateSpan(span); + try { + Object publisher = publisherSupplier.get(); + if (publisher instanceof Maybe) { + return ((Maybe) publisher).blockingGet(); + } else if (publisher instanceof Flowable) { + return ((Flowable) publisher).toList().blockingGet(); + } else if (publisher instanceof Single) { + return ((Single) publisher).blockingGet(); + } else if (publisher instanceof Observable) { + return ((Observable) publisher).toList().blockingGet(); + } else if (publisher instanceof Completable) { + ((Completable) publisher).blockingAwait(); + return null; + } + throw new RuntimeException("Unknown publisher: " + publisher); + } finally { + span.finish(); + scope.close(); + } + } catch (RuntimeException e) { + traceParent.setError(true); + traceParent.addThrowable(e); + throw e; + } finally { + traceParent.finish(); + traceParentScope.close(); + } + } + + /** + * Creates a publisher and immediately cancels it under a trace-parent / publisher-parent pair. + * + * @param publisherSupplier a supplier that creates the reactive publisher + */ + void cancelUnderTrace(PublisherSupplier publisherSupplier) { + AgentSpan traceParent = startSpan("trace", "trace-parent"); + traceParent.setResourceName("trace-parent"); + traceParent.setTag(Tags.COMPONENT, "trace"); + AgentScope traceParentScope = activateSpan(traceParent); + try { + AgentSpan span = startSpan("test", "publisher-parent"); + AgentScope scope = activateSpan(span); + + Object publisher = publisherSupplier.get(); + Flowable flowable; + if (publisher instanceof Maybe) { + flowable = ((Maybe) publisher).toFlowable(); + } else if (publisher instanceof Flowable) { + flowable = (Flowable) publisher; + } else if (publisher instanceof Single) { + flowable = ((Single) publisher).toFlowable(); + } else if (publisher instanceof Observable) { + flowable = ((Observable) publisher).toFlowable(BackpressureStrategy.BUFFER); + } else if (publisher instanceof Completable) { + flowable = ((Completable) publisher).toFlowable(); + } else { + throw new RuntimeException("Unknown publisher: " + publisher); + } + + flowable.subscribe( + new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.cancel(); + } + + @Override + public void onNext(Object t) {} + + @Override + public void onError(Throwable error) {} + + @Override + public void onComplete() {} + }); + + scope.close(); + span.finish(); + } finally { + traceParent.finish(); + traceParentScope.close(); + } + } + + // ---- Publisher context propagation tests ---- + + static Stream publisherTestProvider() { + return Stream.of( + Arguments.of("basic maybe", 2, 1, (PublisherSupplier) () -> Maybe.just(1).map(addOne())), + Arguments.of( + "two operations maybe", + 4, + 2, + (PublisherSupplier) () -> Maybe.just(2).map(addOne()).map(addOne())), + Arguments.of( + "delayed maybe", + 4, + 1, + (PublisherSupplier) () -> Maybe.just(3).delay(100, MILLISECONDS).map(addOne())), + Arguments.of( + "delayed twice maybe", + 6, + 2, + (PublisherSupplier) + () -> + Maybe.just(4) + .delay(100, MILLISECONDS) + .map(addOne()) + .delay(100, MILLISECONDS) + .map(addOne())), + Arguments.of( + "basic flowable", + Arrays.asList(6, 7), + 2, + (PublisherSupplier) () -> Flowable.fromIterable(Arrays.asList(5, 6)).map(addOne())), + Arguments.of( + "two operations flowable", + Arrays.asList(8, 9), + 4, + (PublisherSupplier) + () -> Flowable.fromIterable(Arrays.asList(6, 7)).map(addOne()).map(addOne())), + Arguments.of( + "delayed flowable", + Arrays.asList(8, 9), + 2, + (PublisherSupplier) + () -> + Flowable.fromIterable(Arrays.asList(7, 8)) + .delay(100, MILLISECONDS) + .map(addOne())), + Arguments.of( + "delayed twice flowable", + Arrays.asList(10, 11), + 4, + (PublisherSupplier) + () -> + Flowable.fromIterable(Arrays.asList(8, 9)) + .delay(100, MILLISECONDS) + .map(addOne()) + .delay(100, MILLISECONDS) + .map(addOne())), + Arguments.of( + "maybe from callable", + 12, + 2, + (PublisherSupplier) + () -> Maybe.fromCallable(() -> TracedMethods.addOneFunc(10)).map(addOne())), + Arguments.of("basic single", 2, 1, (PublisherSupplier) () -> Single.just(1).map(addOne())), + Arguments.of( + "two operations single", + 4, + 2, + (PublisherSupplier) () -> Single.just(2).map(addOne()).map(addOne())), + Arguments.of( + "basic observable", + Arrays.asList(6, 7), + 2, + (PublisherSupplier) () -> Observable.fromIterable(Arrays.asList(5, 6)).map(addOne())), + Arguments.of( + "two operations observable", + Arrays.asList(8, 9), + 4, + (PublisherSupplier) + () -> Observable.fromIterable(Arrays.asList(6, 7)).map(addOne()).map(addOne())), + Arguments.of( + "completable", + null, + 1, + (PublisherSupplier) + () -> Completable.fromCallable(() -> TracedMethods.addOneFunc(10)))); + } + + @ParameterizedTest(name = "Publisher {0} test") + @MethodSource("publisherTestProvider") + void publisherContextPropagation( + String name, Object expected, int workSpans, PublisherSupplier publisherSupplier) { + Object result = assemblePublisherUnderTrace(publisherSupplier); + assertEquals(expected, result); + + SpanMatcher[] allSpans = new SpanMatcher[workSpans + 2]; + allSpans[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + allSpans[1] = + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + allSpans[i + 2] = + span() + .operationName("addOne") + .resourceName("addOne") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + } + + assertTraces(trace(SORT_BY_START_TIME, allSpans)); + } + + // ---- Publisher error tests ---- + + static Stream publisherErrorTestProvider() { + return Stream.of( + Arguments.of( + "maybe", + (PublisherSupplier) () -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE))), + Arguments.of( + "flowable", + (PublisherSupplier) () -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE))), + Arguments.of( + "single", + (PublisherSupplier) () -> Single.error(new RuntimeException(EXCEPTION_MESSAGE))), + Arguments.of( + "observable", + (PublisherSupplier) () -> Observable.error(new RuntimeException(EXCEPTION_MESSAGE))), + Arguments.of( + "completable", + (PublisherSupplier) () -> Completable.error(new RuntimeException(EXCEPTION_MESSAGE)))); + } + + @ParameterizedTest(name = "Publisher error {0} test") + @MethodSource("publisherErrorTestProvider") + void publisherErrorContextPropagation(String name, PublisherSupplier publisherSupplier) { + try { + assemblePublisherUnderTrace(publisherSupplier); + } catch (RuntimeException expected) { + // expected — errors propagate through the reactive chain + } + + // Context-propagation instrumentation does NOT attach errors at the reactive level, + // so that other integrations (netty, lettuce) are not impacted. + // Only the trace-parent span is errored because the exception bubbles up to it. + assertTraces( + trace( + SORT_BY_START_TIME, + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .error() + .tags( + defaultTags(), + tag(Tags.COMPONENT, is("trace")), + error(RuntimeException.class, EXCEPTION_MESSAGE)), + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()))); + } + + // ---- Publisher step error tests ---- + + static Stream publisherStepErrorTestProvider() { + return Stream.of( + Arguments.of( + "basic maybe failure", + 1, + (PublisherSupplier) + () -> + Maybe.just(1) + .map(addOne()) + .map( + i -> { + throw new RuntimeException(EXCEPTION_MESSAGE); + })), + Arguments.of( + "basic flowable failure", + 1, + (PublisherSupplier) + () -> + Flowable.fromIterable(Arrays.asList(5, 6)) + .map(addOne()) + .map( + i -> { + throw new RuntimeException(EXCEPTION_MESSAGE); + })), + Arguments.of( + "basic single failure", + 1, + (PublisherSupplier) + () -> + Single.just(1) + .map(addOne()) + .map( + i -> { + throw new RuntimeException(EXCEPTION_MESSAGE); + })), + Arguments.of( + "basic observable failure", + 1, + (PublisherSupplier) + () -> + Observable.fromIterable(Arrays.asList(5, 6)) + .map(addOne()) + .map( + i -> { + throw new RuntimeException(EXCEPTION_MESSAGE); + }))); + } + + @ParameterizedTest(name = "Publisher step {0} test") + @MethodSource("publisherStepErrorTestProvider") + void publisherStepErrorContextPropagation( + String name, int workSpans, PublisherSupplier publisherSupplier) { + try { + assemblePublisherUnderTrace(publisherSupplier); + } catch (RuntimeException expected) { + // expected — mid-chain error + } + + SpanMatcher[] allSpans = new SpanMatcher[workSpans + 2]; + allSpans[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .error() + .tags( + defaultTags(), + tag(Tags.COMPONENT, is("trace")), + error(RuntimeException.class, EXCEPTION_MESSAGE)); + allSpans[1] = + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + allSpans[i + 2] = + span() + .operationName("addOne") + .resourceName("addOne") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + } + + assertTraces(trace(SORT_BY_START_TIME, allSpans)); + } + + // ---- Cancel tests ---- + + static Stream publisherCancelTestProvider() { + return Stream.of( + Arguments.of("basic maybe", (PublisherSupplier) () -> Maybe.just(1)), + Arguments.of( + "basic flowable", (PublisherSupplier) () -> Flowable.fromIterable(Arrays.asList(5, 6))), + Arguments.of("basic single", (PublisherSupplier) () -> Single.just(1)), + Arguments.of( + "basic observable", + (PublisherSupplier) () -> Observable.fromIterable(Arrays.asList(5, 6))), + Arguments.of("basic completable", (PublisherSupplier) () -> Completable.complete())); + } + + @ParameterizedTest(name = "Publisher {0} cancel") + @MethodSource("publisherCancelTestProvider") + void publisherCancelContextPropagation(String name, PublisherSupplier publisherSupplier) { + cancelUnderTrace(publisherSupplier); + + assertTraces( + trace( + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))), + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()))); + } + + // ---- Chain parent tests ---- + + static Stream publisherChainParentTestProvider() { + return Stream.of( + Arguments.of( + "basic maybe", + 3, + (PublisherSupplier) + () -> + Maybe.just(1) + .map(addOne()) + .map(addOne()) + .concatWith(Maybe.just(1).map(addOne()))), + Arguments.of( + "basic flowable", + 5, + (PublisherSupplier) + () -> + Flowable.fromIterable(Arrays.asList(5, 6)) + .map(addOne()) + .map(addOne()) + .concatWith(Maybe.just(1).map(addOne()).toFlowable()))); + } + + @ParameterizedTest(name = "Publisher chain spans have the correct parent for {0}") + @MethodSource("publisherChainParentTestProvider") + void publisherChainParentContextPropagation( + String name, int workSpans, PublisherSupplier publisherSupplier) { + assemblePublisherUnderTrace(publisherSupplier); + + SpanMatcher[] allSpans = new SpanMatcher[workSpans + 2]; + allSpans[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + allSpans[1] = + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + allSpans[i + 2] = + span() + .operationName("addOne") + .resourceName("addOne") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + } + + assertTraces(trace(SORT_BY_START_TIME, allSpans)); + } + + // ---- Subscription time parent tests ---- + + @Test + void publisherChainSpansHaveCorrectParentsFromSubscriptionTime() { + Maybe maybe = Maybe.just(42).map(addOne()).map(addTwo()); + + AgentSpan parent = startSpan("test", "trace-parent"); + AgentScope scope = activateSpan(parent); + try { + maybe.blockingGet(); + } finally { + scope.close(); + parent.finish(); + } + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("trace-parent"), + span() + .childOfSpan(0) + .operationName("addOne") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))), + span() + .childOfSpan(0) + .operationName("addTwo") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))))); + } + + // ---- Subscription time parent with intermediate span tests ---- + + static Stream publisherSubscriptionTimeParentTestProvider() { + return Stream.of( + Arguments.of("basic maybe", 1, (PublisherSupplier) () -> Maybe.just(1).map(addOne())), + Arguments.of( + "basic flowable", + 2, + (PublisherSupplier) () -> Flowable.fromIterable(Arrays.asList(1, 2)).map(addOne()))); + } + + @ParameterizedTest(name = "Publisher chain spans from subscription time {0}") + @MethodSource("publisherSubscriptionTimeParentTestProvider") + void publisherSubscriptionTimeContextPropagation( + String name, int workItems, PublisherSupplier publisherSupplier) { + assemblePublisherUnderTrace( + () -> { + Object publisher = publisherSupplier.get(); + + AgentSpan intermediate = startSpan("test", "intermediate"); + AgentScope intermediateScope = activateSpan(intermediate); + try { + if (publisher instanceof Maybe) { + @SuppressWarnings("unchecked") + Maybe maybe = (Maybe) publisher; + return maybe.map(addTwo()); + } else if (publisher instanceof Flowable) { + @SuppressWarnings("unchecked") + Flowable flowable = (Flowable) publisher; + return flowable.map(addTwo()); + } + throw new IllegalStateException("Unknown publisher type"); + } finally { + intermediate.finish(); + intermediateScope.close(); + } + }); + + int totalSpans = 3 + 2 * workItems; + SpanMatcher[] allSpans = new SpanMatcher[totalSpans]; + allSpans[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + allSpans[1] = + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + allSpans[2] = span().childOfPrevious().operationName("intermediate").tags(defaultTags()); + + for (int i = 0; i < 2 * workItems; i += 2) { + allSpans[3 + i] = + span() + .operationName("addOne") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + allSpans[4 + i] = + span() + .operationName("addTwo") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + } + + assertTraces(trace(SORT_BY_START_TIME, allSpans)); + } + + // ---- Flowable parallel scheduler test ---- + + static Stream schedulerTestProvider() { + return Stream.of( + Arguments.of("new-thread", Schedulers.newThread()), + Arguments.of("computation", Schedulers.computation()), + Arguments.of("single", Schedulers.single()), + Arguments.of("trampoline", Schedulers.trampoline())); + } + + @ParameterizedTest(name = "Flowables propagate context on {0} scheduler") + @MethodSource("schedulerTestProvider") + void flowableParallelOnScheduler( + String schedulerName, io.reactivex.rxjava3.core.Scheduler scheduler) { + Object result = + assemblePublisherUnderTrace( + () -> + Flowable.fromIterable(Arrays.asList(1, 2, 3, 4)) + .parallel() + .runOn(scheduler) + .flatMap(num -> Maybe.just(num).map(addOne()).toFlowable()) + .sequential()); + + List values = (List) result; + assertEquals(4, values.size()); + + // 2 parent spans + 4 addOne work spans + SpanMatcher[] allSpans = new SpanMatcher[6]; + allSpans[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + allSpans[1] = + span() + .childOfPrevious() + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < 4; i++) { + allSpans[i + 2] = + span() + .operationName("addOne") + .resourceName("addOne") + .childOfSpan(1) + .tags(defaultTags(), tag(Tags.COMPONENT, is("trace"))); + } + + assertTraces(trace(SORT_BY_START_TIME, allSpans)); + } + + /** Functional interface for publisher suppliers (avoids Groovy closures). */ + @FunctionalInterface + interface PublisherSupplier { + Object get(); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/TracedMethods.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/TracedMethods.java new file mode 100644 index 00000000000..2b427149d41 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/TracedMethods.java @@ -0,0 +1,23 @@ +package testdog.trace.instrumentation.rxjava3; + +import datadog.trace.api.Trace; + +/** + * Helper class with {@link Trace}-annotated methods, kept in the {@code testdog} package to avoid + * the global ignore rule for {@code datadog.trace.*} classes, allowing the agent to instrument them + * at class-load time. + */ +public final class TracedMethods { + + @Trace(operationName = "addOne", resourceName = "addOne") + public static int addOneFunc(int i) { + return i + 1; + } + + @Trace(operationName = "addTwo", resourceName = "addTwo") + public static int addTwoFunc(int i) { + return i + 2; + } + + private TracedMethods() {} +} diff --git a/settings.gradle.kts b/settings.gradle.kts index bd5aaceffaa..57410ef074d 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -546,6 +546,7 @@ include( ":dd-java-agent:instrumentation:rs:jax-rs:jax-rs-client:jax-rs-client-2.0", ":dd-java-agent:instrumentation:rxjava:rxjava-1.0", ":dd-java-agent:instrumentation:rxjava:rxjava-2.0", + ":dd-java-agent:instrumentation:rxjava:rxjava-3.0", ":dd-java-agent:instrumentation:scala:scala-concurrent-2.8", ":dd-java-agent:instrumentation:scala:scala-promise:scala-promise-2.10", ":dd-java-agent:instrumentation:scala:scala-promise:scala-promise-2.13",