Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public String failureReason() {

@Override
public boolean test(T t) {
if (this.expected instanceof CharSequence && t instanceof CharSequence) {
return this.expected.toString().equals(t.toString());
}
return this.expected.equals(t);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public final class SpanMatcher {

private static final Matcher<Long> CHILD_OF_PREVIOUS_MATCHER = is(0L);

/** Sentinel index value meaning "use the previous span in the array". */
private static final int INDEX_NOT_SET = -1;

private int parentSpanIndex = INDEX_NOT_SET;

private SpanMatcher() {
this.serviceNameMatcher = validates(s -> s != null && !s.isEmpty());
this.typeMatcher = isNull();
Expand Down Expand Up @@ -133,6 +138,18 @@ public SpanMatcher childOfPrevious() {
return this;
}

/**
* Checks the span is a direct child of the span at the given index in the trace. The index is
* resolved at assertion time by looking up the span ID from the trace list.
*
* @param index The zero-based index of the parent span in the trace.
* @return The current {@link SpanMatcher} instance with the child-of constraint applied.
*/
public SpanMatcher childOfSpan(int index) {
this.parentSpanIndex = index;
return this;
}

/**
* Checks the span has service name defined.
*
Expand Down Expand Up @@ -302,6 +319,14 @@ public SpanMatcher links(SpanLinkMatcher... matchers) {
}

void assertSpan(DDSpan span, DDSpan previousSpan) {
assertSpan(span, previousSpan, null);
}

void assertSpan(DDSpan span, DDSpan previousSpan, List<DDSpan> trace) {
// Apply parent id matcher from a specific span index
if (this.parentSpanIndex != INDEX_NOT_SET && trace != null) {
this.parentIdMatcher = is(trace.get(this.parentSpanIndex).getSpanId());
}
// Apply parent id matcher from the previous span
if (this.parentIdMatcher == CHILD_OF_PREVIOUS_MATCHER) {
this.parentIdMatcher = is(previousSpan.getSpanId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void assertTrace(List<DDSpan> trace, int traceIndex) {
DDSpan previousSpan = null;
for (int i = 0; i < spanCount; i++) {
DDSpan span = trace.get(i);
this.matchers[i].assertSpan(span, previousSpan);
this.matchers[i].assertSpan(span, previousSpan, trace);
previousSpan = span;
}
}
Expand Down
24 changes: 24 additions & 0 deletions dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@

muzzle {
pass {
group = "io.reactivex.rxjava3"
module = "rxjava"
versions = "[3.0.0,)"
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')

dependencies {
compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.0'
compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0'

testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation')
testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20')

testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0'
testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0'
latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '+'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import net.bytebuddy.asm.Advice;

public final class CompletableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Completable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Completable completable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Completable.class, Context.class)
.put(completable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Completable completable,
@Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Completable.class, Context.class).get(completable);
if (parentContext != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingCompletableObserver(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Flowable;
import net.bytebuddy.asm.Advice;
import org.reactivestreams.Subscriber;

public final class FlowableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Flowable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("org.reactivestreams.Subscriber"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Flowable<?> flowable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Flowable<?> flowable,
@Advice.Argument(value = 0, readOnly = false) Subscriber<?> subscriber) {
if (subscriber != null) {
Context parentContext =
InstrumentationContext.get(Flowable.class, Context.class).get(flowable);
if (parentContext != null) {
subscriber = new TracingSubscriber<>(subscriber, parentContext);
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.MaybeObserver;
import net.bytebuddy.asm.Advice;

public final class MaybeInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Maybe";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Maybe<?> maybe) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Maybe<?> maybe,
@Advice.Argument(value = 0, readOnly = false) MaybeObserver<?> observer) {
if (observer != null) {
Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe);
if (parentContext != null) {
// wrap the observer so spans from its events treat the captured span as their parent
observer = new TracingMaybeObserver<>(observer, parentContext);
// attach the context here in case additional observers are created during subscribe
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package datadog.trace.instrumentation.rxjava3;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import datadog.context.Context;
import datadog.context.ContextScope;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import net.bytebuddy.asm.Advice;

public final class ObservableInstrumentation
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
@Override
public String instrumentedType() {
return "io.reactivex.rxjava3.core.Observable";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice");
transformer.applyAdvice(
isMethod()
.and(named("subscribe"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))),
getClass().getName() + "$PropagateParentSpanAdvice");
}

public static class CaptureParentSpanAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onConstruct(@Advice.This final Observable<?> observable) {
Context parentContext = Java8BytecodeBridge.getCurrentContext();
if (parentContext != null) {
InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext);
}
}
}

public static class PropagateParentSpanAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static ContextScope onSubscribe(
@Advice.This final Observable<?> observable,
@Advice.Argument(value = 0, readOnly = false) Observer<?> observer) {
if (observer != null) {
Context parentContext =
InstrumentationContext.get(Observable.class, Context.class).get(observable);
if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) {
observer = new TracingObserver<>(observer, parentContext);
return parentContext.attach();
}
}
return null;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void closeScope(@Advice.Enter final ContextScope scope) {
if (scope != null) {
scope.close();
}
}
}
}
Loading
Loading