From ad2ec808911c690991b65489af47a183269042b3 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 3 Jun 2026 10:36:10 +0200 Subject: [PATCH] Avoid leaking continuations on internal rxjava schedulers --- .../AsyncPropagatingDisableInstrumentation.java | 12 ++++++++++-- .../rxjava-2.0/src/test/groovy/RxJava2Test.groovy | 6 ------ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index 1d543a705d2..70587135ac5 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -45,6 +45,8 @@ public AsyncPropagatingDisableInstrumentation() { nameEndsWith("io.grpc.internal.ManagedChannelImpl"); private static final ElementMatcher REACTOR_DISABLED_TYPE_INITIALIZERS = namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask"); + private static final ElementMatcher RXJAVA2_DISABLED_TYPE_INITIALIZERS = + named("io.reactivex.internal.schedulers.AbstractDirectTask"); @Override public boolean onlyMatchKnownTypes() { @@ -77,7 +79,8 @@ public String[] knownMatchingTypes() { "net.sf.ehcache.store.disk.DiskStorageFactory", "org.springframework.jms.listener.DefaultMessageListenerContainer", "org.apache.activemq.broker.TransactionBroker", - "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager" + "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", + "io.reactivex.internal.schedulers.AbstractDirectTask" }; } @@ -88,7 +91,10 @@ public String hierarchyMarkerType() { @Override public ElementMatcher hierarchyMatcher() { - return RX_WORKERS.or(GRPC_MANAGED_CHANNEL).or(REACTOR_DISABLED_TYPE_INITIALIZERS); + return RX_WORKERS + .or(GRPC_MANAGED_CHANNEL) + .or(REACTOR_DISABLED_TYPE_INITIALIZERS) + .or(RXJAVA2_DISABLED_TYPE_INITIALIZERS); } @Override @@ -172,6 +178,8 @@ public void methodAdvice(MethodTransformer transformer) { advice); transformer.applyAdvice( isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); + transformer.applyAdvice( + isTypeInitializer().and(isDeclaredBy(RXJAVA2_DISABLED_TYPE_INITIALIZERS)), advice); } public static class DisableAsyncAdvice { diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-2.0/src/test/groovy/RxJava2Test.groovy b/dd-java-agent/instrumentation/rxjava/rxjava-2.0/src/test/groovy/RxJava2Test.groovy index b7dd68fa1d2..ad858348316 100644 --- a/dd-java-agent/instrumentation/rxjava/rxjava-2.0/src/test/groovy/RxJava2Test.groovy +++ b/dd-java-agent/instrumentation/rxjava/rxjava-2.0/src/test/groovy/RxJava2Test.groovy @@ -20,12 +20,6 @@ class RxJava2Test extends InstrumentationSpecification { public static final String EXCEPTION_MESSAGE = "test exception" - @Override - boolean useStrictTraceWrites() { - // TODO fix this by making sure that spans get closed properly - return false - } - @Shared def addOne = { i -> addOneFunc(i)