From 00ab7e6cc028ca6b5b4f3f273ec7df525fbc6689 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 1 Jun 2026 15:31:41 +0200 Subject: [PATCH 1/5] Emit one spark.application trace per Spark Connect session --- .../spark/AbstractDatadogSparkListener.java | 143 ++++++- .../test/java/SparkConnectListenerTest.java | 365 ++++++++++++++++++ 2 files changed, 503 insertions(+), 5 deletions(-) create mode 100644 dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 817aa1f0d84..3550be8c7ef 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -115,6 +115,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap stageToJob = new HashMap<>(); private final HashMap stageProperties = new HashMap<>(); + private final HashMap jobToSessionId = new HashMap<>(); private final SparkAggregatedTaskMetrics applicationMetrics = new SparkAggregatedTaskMetrics(); private final HashMap streamingBatchMetrics = new HashMap<>(); @@ -127,6 +128,12 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap sqlQueries = new HashMap<>(); protected final HashMap sqlPlans = new HashMap<>(); private final HashMap liveExecutors = new HashMap<>(); + private final HashMap perSessionApplicationSpans = new HashMap<>(); + private final HashMap perSessionApplicationMetrics = + new HashMap<>(); + private final HashMap perSessionLastJobFailed = new HashMap<>(); + private final HashMap perSessionLastJobFailedMessage = new HashMap<>(); + private final HashMap perSessionLastJobFailedStackTrace = new HashMap<>(); private final Map accumulatorToStageID = new HashMap<>(); @@ -361,6 +368,33 @@ public synchronized void finishApplication( } applicationEnded = true; + // Finish per-session application spans before the guard below, because a pure Connect server + // has applicationSpan == null with jobCount > 0, which would cause the guard to return early + // and skip finishing the per-session spans entirely. + for (Map.Entry entry : perSessionApplicationSpans.entrySet()) { + String sessionId = entry.getKey(); + AgentSpan sessionAppSpan = entry.getValue(); + + if (Boolean.TRUE.equals(perSessionLastJobFailed.get(sessionId))) { + sessionAppSpan.setError(true); + sessionAppSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed"); + sessionAppSpan.setTag(DDTags.ERROR_MSG, perSessionLastJobFailedMessage.get(sessionId)); + sessionAppSpan.setTag(DDTags.ERROR_STACK, perSessionLastJobFailedStackTrace.get(sessionId)); + } + + SparkAggregatedTaskMetrics sessionMetrics = perSessionApplicationMetrics.get(sessionId); + if (sessionMetrics != null) { + sessionMetrics.setSpanMetrics(sessionAppSpan); + } + + sessionAppSpan.finish(time * 1000); + } + perSessionApplicationSpans.clear(); + perSessionApplicationMetrics.clear(); + perSessionLastJobFailed.clear(); + perSessionLastJobFailedMessage.clear(); + perSessionLastJobFailedStackTrace.clear(); + if ((applicationSpan == null && jobCount > 0) || isRunningOnDatabricks) { // If the application span is not initialized, but spark jobs have been executed, all those // spark jobs were databricks or streaming. In this case we don't send the application span @@ -466,6 +500,8 @@ private AgentSpan getOrCreateSqlSpan( return null; } + String connectSessionId = getSparkConnectSessionId(jobProperties); + AgentTracer.SpanBuilder spanBuilder = buildSparkSpan("spark.sql", jobProperties) .withStartTimestamp(queryStart.time() * 1000) @@ -479,6 +515,10 @@ private AgentSpan getOrCreateSqlSpan( AgentSpan batchSpan = getOrCreateStreamingBatchSpan(batchKey, queryStart.time(), jobProperties); spanBuilder.asChildOf(batchSpan.context()); + } else if (connectSessionId != null) { + AgentSpan sessionAppSpan = + getOrCreatePerSessionApplicationSpan(connectSessionId, queryStart.time(), jobProperties); + spanBuilder.asChildOf(sessionAppSpan.context()); } else if (isRunningOnDatabricks) { addDatabricksSpecificTags(spanBuilder, jobProperties, true); } else { @@ -492,6 +532,69 @@ private AgentSpan getOrCreateSqlSpan( return sqlSpan; } + private AgentSpan getOrCreatePerSessionApplicationSpan( + String sessionId, long timeMs, Properties jobProperties) { + AgentSpan span = perSessionApplicationSpans.get(sessionId); + if (span != null) { + return span; + } + + AgentTracer.SpanBuilder builder = + buildSparkSpan("spark.application", jobProperties) + .withStartTimestamp(timeMs * 1000 - 1) + .withTag("session_id", sessionId) + .withTag("spark.connect.server", true); + + if (applicationStart != null) { + String ddTags = + Config.get().getGlobalTags().entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",")); + + builder + .withTag("application_name", applicationStart.appName()) + .withTag("djm.tags", ddTags) + .withTag("spark_user", applicationStart.sparkUser()); + + applicationStart.appAttemptId().foreach(id -> builder.withTag("app_attempt_id", id)); + } + + captureApplicationParameters(builder); + captureEmrStepId(builder); + captureOpenlineageJobInfo(builder); + + // captureOpenlineageContextIfPresent and predeterminedTraceIdContext are intentionally NOT + // applied — per-session spans must be independent trace roots. + + AgentSpan sessionAppSpan = builder.start(); + sessionAppSpan.setMeasured(true); + setDataJobsSamplingPriority(sessionAppSpan); + + if (perSessionApplicationSpans.size() < MAX_COLLECTION_SIZE) { + perSessionApplicationSpans.put(sessionId, sessionAppSpan); + perSessionApplicationMetrics.put(sessionId, new SparkAggregatedTaskMetrics()); + } + return sessionAppSpan; + } + + private static String getSparkConnectSessionId(Properties properties) { + if (properties == null) { + return null; + } + String jobTags = properties.getProperty("spark.jobTags"); + if (jobTags == null) { + return null; + } + for (String tag : jobTags.split(",")) { + tag = tag.trim(); + if (tag.startsWith("spark-connect-session-")) { + return tag.substring("spark-connect-session-".length()); + } + } + return null; + } + @Override public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobCount++; @@ -507,6 +610,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { String batchKey = getStreamingBatchKey(jobStart.properties()); Long sqlExecutionId = getSqlExecutionId(jobStart.properties()); + String connectSessionId = getSparkConnectSessionId(jobStart.properties()); AgentSpan sqlSpan = null; if (sqlExecutionId != null) { @@ -531,6 +635,11 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobSpanBuilder.asChildOf(batchSpan.context()); } else if (isRunningOnDatabricks) { addDatabricksSpecificTags(jobSpanBuilder, jobStart.properties(), true); + } else if (connectSessionId != null) { + AgentSpan sessionAppSpan = + getOrCreatePerSessionApplicationSpan( + connectSessionId, jobStart.time(), jobStart.properties()); + jobSpanBuilder.asChildOf(sessionAppSpan.context()); } else { // In non-databricks, non-streaming env, the spark application is the local root span initApplicationSpanIfNotInitialized(); @@ -546,6 +655,9 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { for (int stageId : getSparkJobStageIds(jobStart)) { stageToJob.put(stageId, jobStart.jobId()); } + if (connectSessionId != null) { + jobToSessionId.put(jobStart.jobId(), connectSessionId); + } jobSpans.put(jobStart.jobId(), jobSpan); notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart); } @@ -557,6 +669,8 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { return; } + String connectSessionId = jobToSessionId.remove(jobEnd.jobId()); + if (jobEnd.jobResult() instanceof JobFailed) { JobFailed jobFailed = (JobFailed) jobEnd.jobResult(); Exception exception = jobFailed.exception(); @@ -571,12 +685,22 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { // Only propagate the error to the application if it is not a cancellation if (errorMessage != null && !errorMessage.toLowerCase().contains("cancelled")) { - lastJobFailed = true; - lastJobFailedMessage = errorMessage; - lastJobFailedStackTrace = errorStackTrace; + if (connectSessionId != null) { + perSessionLastJobFailed.put(connectSessionId, true); + perSessionLastJobFailedMessage.put(connectSessionId, errorMessage); + perSessionLastJobFailedStackTrace.put(connectSessionId, errorStackTrace); + } else { + lastJobFailed = true; + lastJobFailedMessage = errorMessage; + lastJobFailedStackTrace = errorStackTrace; + } } } else { - lastJobFailed = false; + if (connectSessionId != null) { + perSessionLastJobFailed.put(connectSessionId, false); + } else { + lastJobFailed = false; + } lastSqlFailed = false; } @@ -683,12 +807,21 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl Properties prop = stageProperties.remove(stageSpanKey); Long sqlExecutionId = getSqlExecutionId(prop); + String connectSessionId = getSparkConnectSessionId(prop); SparkAggregatedTaskMetrics stageMetric = stageMetrics.remove(stageSpanKey); if (stageMetric != null) { stageMetric.computeSkew(); stageMetric.setSpanMetrics(span); - applicationMetrics.accumulateStageMetrics(stageMetric); + if (connectSessionId != null) { + SparkAggregatedTaskMetrics sessionMetrics = + perSessionApplicationMetrics.get(connectSessionId); + if (sessionMetrics != null) { + sessionMetrics.accumulateStageMetrics(stageMetric); + } + } else { + applicationMetrics.accumulateStageMetrics(stageMetric); + } jobMetrics .computeIfAbsent(jobId, k -> new SparkAggregatedTaskMetrics()) diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java new file mode 100644 index 00000000000..af5dfa81faa --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java @@ -0,0 +1,365 @@ +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.core.DDSpan; +import datadog.trace.instrumentation.spark.DatadogSpark213Listener; +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; +import org.apache.spark.SparkConf; +import org.apache.spark.scheduler.JobFailed; +import org.apache.spark.scheduler.JobSucceeded$; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.StageInfo; +import org.apache.spark.sql.execution.SparkPlanInfo; +import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart; +import org.junit.jupiter.api.Test; +import scala.Option; +import scala.collection.immutable.Map$; +import scala.collection.immutable.Nil$; +import scala.collection.immutable.Seq; +import scala.collection.immutable.Set$; + +/** + * Tests that Spark Connect jobs emit one spark.application root span per session (independent + * traces), with SQL/job spans nested directly under the per-session app span. + */ +class SparkConnectListenerTest extends AbstractInstrumentationTest { + + @Test + void sqlSpanIsNestedUnderSessionSpanNotApplicationSpan() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1500L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1500L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + List trace = writer.get(0); + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-abc", appSpan.getTag("session_id")); + assertEquals(true, appSpan.getTag("spark.connect.server")); + } + + @Test + void separateSessionsGetDistinctSessionSpans() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-1", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "session-2", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark")), + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + DDSpan appSpan0 = + writer.get(0).stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .findFirst() + .orElseThrow(() -> new AssertionError("No spark.application span in trace 0")); + DDSpan appSpan1 = + writer.get(1).stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .findFirst() + .orElseThrow(() -> new AssertionError("No spark.application span in trace 1")); + + List sessionIds = + Arrays.asList( + (String) appSpan0.getTag("session_id"), (String) appSpan1.getTag("session_id")); + assertTrue(sessionIds.contains("session-1")); + assertTrue(sessionIds.contains("session-2")); + } + + @Test + void sameSessionReusesExistingSessionSpan() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + // Two SQL queries from the same session + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "session-abc", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + List trace = writer.get(0); + long appSpanCount = + trace.stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .count(); + assertEquals(1, appSpanCount); + + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-abc", appSpan.getTag("session_id")); + } + + @Test + void nonConnectJobOnConnectServerIsTracedSeparately() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + // Connect job for session-abc + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + // Non-Connect job (no spark.jobTags) + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(plainJobStartEvent(2, 1600L, 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // Expect 2 traces: one per-session app span, one global app span + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark")), + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + // One trace has session_id (Connect), one does not (global) + DDSpan sessionAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") != null) + .findFirst() + .orElse(null); + DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElse(null); + + assertNotNull(sessionAppSpan, "Expected a per-session application span"); + assertNotNull(globalAppSpan, "Expected a global application span"); + assertEquals("session-abc", sessionAppSpan.getTag("session_id")); + assertEquals(true, sessionAppSpan.getTag("spark.connect.server")); + } + + @Test + void connectSessionJobFailureIsAttributedToSession() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-fail", 1L)); + listener.onJobEnd( + new SparkListenerJobEnd(1, 1500L, new JobFailed(new RuntimeException("job error")))); + listener.onOtherEvent(sqlEndEvent(1L, 1500L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark").error(true), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark").error(true))); + + List trace = writer.get(0); + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-fail", appSpan.getTag("session_id")); + assertTrue(appSpan.isError(), "Per-session app span should have error set"); + } + + // region Helpers + + private static DDSpan findSpan(List trace, String operationName) { + return trace.stream() + .filter(s -> operationName.equals(s.getOperationName().toString())) + .findFirst() + .orElse(null); + } + + private static SparkListenerApplicationStart appStartEvent(long time) { + return new SparkListenerApplicationStart( + "test_app", + Option.apply("test_app_id"), + time, + "test_user", + Option.apply("1"), + Option.empty(), + Option.empty()); + } + + private static SparkListenerJobStart connectJobStartEvent( + int jobId, long time, String sessionId, long sqlExecutionId) { + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", String.valueOf(sqlExecutionId)); + props.setProperty("spark.jobTags", "spark-connect-session-" + sessionId); + + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + return new SparkListenerJobStart(jobId, time, emptyStages, props); + } + + private static SparkListenerJobStart plainJobStartEvent( + int jobId, long time, long sqlExecutionId) { + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", String.valueOf(sqlExecutionId)); + + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + return new SparkListenerJobStart(jobId, time, emptyStages, props); + } + + /** + * Constructs {@link SparkListenerSQLExecutionStart} using reflection to handle constructor + * differences across Spark versions (3.2 has 6 args, 3.3 has 7 args, 3.4 adds rootExecutionId, + * 3.5 adds jobTags). + */ + @SuppressWarnings("unchecked") + private static SparkListenerSQLExecutionStart sqlStartEvent(long executionId, long time) + throws ReflectiveOperationException { + SparkPlanInfo planInfo = + new SparkPlanInfo( + "scan", + "scan", + (Seq) (Object) Nil$.MODULE$, + Map$.MODULE$.empty(), + (Seq) (Object) Nil$.MODULE$); + + Constructor[] ctors = SparkListenerSQLExecutionStart.class.getConstructors(); + Arrays.sort(ctors, Comparator.comparingInt(Constructor::getParameterCount)); + + for (Constructor ctor : ctors) { + int n = ctor.getParameterCount(); + if (n == 6) { + // Spark 3.2: (Long, String, String, String, SparkPlanInfo, Long) + return (SparkListenerSQLExecutionStart) + ctor.newInstance(executionId, "SELECT 1", "", "", planInfo, time); + } else if (n == 7) { + // Spark 3.3: (Long, String, String, String, SparkPlanInfo, Long, Map) + return (SparkListenerSQLExecutionStart) + ctor.newInstance(executionId, "SELECT 1", "", "", planInfo, time, Map$.MODULE$.empty()); + } else if (n == 8) { + // Spark 3.4: (Long, Option, String, String, String, SparkPlanInfo, Long, Map) + return (SparkListenerSQLExecutionStart) + ctor.newInstance( + executionId, + Option.empty(), + "SELECT 1", + "", + "", + planInfo, + time, + Map$.MODULE$.empty()); + } else if (n == 9) { + // Spark 3.5+: (Long, Option, String, String, String, SparkPlanInfo, Long, Map, Set) + return (SparkListenerSQLExecutionStart) + ctor.newInstance( + executionId, + Option.empty(), + "SELECT 1", + "", + "", + planInfo, + time, + Map$.MODULE$.empty(), + Set$.MODULE$.empty()); + } + } + throw new IllegalStateException( + "No compatible SparkListenerSQLExecutionStart constructor found in " + + Arrays.toString(ctors)); + } + + private static SparkListenerSQLExecutionEnd sqlEndEvent(long executionId, long time) + throws ReflectiveOperationException { + Constructor[] ctors = SparkListenerSQLExecutionEnd.class.getConstructors(); + Arrays.sort(ctors, Comparator.comparingInt(Constructor::getParameterCount)); + + for (Constructor ctor : ctors) { + int n = ctor.getParameterCount(); + if (n == 2) { + // Spark 3.2: (Long, Long) + return (SparkListenerSQLExecutionEnd) ctor.newInstance(executionId, time); + } else if (n == 3) { + // Spark 3.4+: (Long, Long, Option) + return (SparkListenerSQLExecutionEnd) ctor.newInstance(executionId, time, Option.empty()); + } + } + throw new IllegalStateException( + "No compatible SparkListenerSQLExecutionEnd constructor found in " + + Arrays.toString(ctors)); + } + + // endregion +} From 5f862e9f6967e016b19faae672eb2fb72edef80b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 1 Jun 2026 15:43:59 +0200 Subject: [PATCH 2/5] Fix cap overflow, lastSqlFailed cross-session clearing, and empty session id --- .../spark/AbstractDatadogSparkListener.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 3550be8c7ef..15979e30f35 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -539,8 +539,16 @@ private AgentSpan getOrCreatePerSessionApplicationSpan( return span; } + if (perSessionApplicationSpans.size() >= MAX_COLLECTION_SIZE) { + // Cap exceeded: fall back to the global application span so this session's children + // are still parented and the started span is never orphaned. + initApplicationSpanIfNotInitialized(); + return applicationSpan; + } + AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", jobProperties) + // 1µs before first child so this span sorts strictly before its children. .withStartTimestamp(timeMs * 1000 - 1) .withTag("session_id", sessionId) .withTag("spark.connect.server", true); @@ -571,10 +579,8 @@ private AgentSpan getOrCreatePerSessionApplicationSpan( sessionAppSpan.setMeasured(true); setDataJobsSamplingPriority(sessionAppSpan); - if (perSessionApplicationSpans.size() < MAX_COLLECTION_SIZE) { - perSessionApplicationSpans.put(sessionId, sessionAppSpan); - perSessionApplicationMetrics.put(sessionId, new SparkAggregatedTaskMetrics()); - } + perSessionApplicationSpans.put(sessionId, sessionAppSpan); + perSessionApplicationMetrics.put(sessionId, new SparkAggregatedTaskMetrics()); return sessionAppSpan; } @@ -589,7 +595,10 @@ private static String getSparkConnectSessionId(Properties properties) { for (String tag : jobTags.split(",")) { tag = tag.trim(); if (tag.startsWith("spark-connect-session-")) { - return tag.substring("spark-connect-session-".length()); + String id = tag.substring("spark-connect-session-".length()); + if (!id.isEmpty()) { + return id; + } } } return null; @@ -685,7 +694,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { // Only propagate the error to the application if it is not a cancellation if (errorMessage != null && !errorMessage.toLowerCase().contains("cancelled")) { - if (connectSessionId != null) { + if (connectSessionId != null && perSessionApplicationSpans.containsKey(connectSessionId)) { perSessionLastJobFailed.put(connectSessionId, true); perSessionLastJobFailedMessage.put(connectSessionId, errorMessage); perSessionLastJobFailedStackTrace.put(connectSessionId, errorStackTrace); @@ -696,12 +705,12 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { } } } else { - if (connectSessionId != null) { + if (connectSessionId != null && perSessionApplicationSpans.containsKey(connectSessionId)) { perSessionLastJobFailed.put(connectSessionId, false); } else { lastJobFailed = false; + lastSqlFailed = false; } - lastSqlFailed = false; } SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId()); From c3237799d838cdcc672d441e9406b4a9793d062b Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 1 Jun 2026 16:25:10 +0200 Subject: [PATCH 3/5] Add cap guard, TODO comment, and missing tests for per-session spark.application spans --- .../spark/AbstractDatadogSparkListener.java | 10 +- .../test/java/SparkConnectListenerTest.java | 105 ++++++++++++++++++ 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 15979e30f35..4562a40ba67 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -87,7 +87,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { public static volatile boolean finishTraceOnApplicationEnd = true; public static volatile boolean isPysparkShell = false; - private final int MAX_COLLECTION_SIZE = 5000; + private int MAX_COLLECTION_SIZE = 5000; private final int MAX_ACCUMULATOR_SIZE = 50000; private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags."; private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage"; @@ -368,6 +368,12 @@ public synchronized void finishApplication( } applicationEnded = true; + // TODO: per-session app spans are closed here (server shutdown) rather than when the session + // actually ends — so their duration is "first job → server shutdown". Spark Connect does emit + // a server-side session-close event, but it is not surfaced through SparkListener today. + // When that hook becomes available, finish the span there and remove it from the map so that + // long-lived servers don't accumulate unbounded open spans. + // Finish per-session application spans before the guard below, because a pure Connect server // has applicationSpan == null with jobCount > 0, which would cause the guard to return early // and skip finishing the per-session spans entirely. @@ -664,7 +670,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { for (int stageId : getSparkJobStageIds(jobStart)) { stageToJob.put(stageId, jobStart.jobId()); } - if (connectSessionId != null) { + if (connectSessionId != null && jobToSessionId.size() < MAX_COLLECTION_SIZE) { jobToSessionId.put(jobStart.jobId(), connectSessionId); } jobSpans.put(jobStart.jobId(), jobSpan); diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java index af5dfa81faa..db225bd4f18 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java @@ -2,7 +2,9 @@ import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; import static datadog.trace.agent.test.assertions.TraceMatcher.trace; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import datadog.trace.agent.test.AbstractInstrumentationTest; @@ -239,6 +241,109 @@ void connectSessionJobFailureIsAttributedToSession() throws Exception { assertTrue(appSpan.isError(), "Per-session app span should have error set"); } + @Test + void sessionCapOverflowFallsBackToGlobalSpan() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + listener.onApplicationStart(appStartEvent(1000L)); + + // Shrink the cap to 1 via reflection so we can overflow with a single extra session. + java.lang.reflect.Field capField = + listener.getClass().getSuperclass().getDeclaredField("MAX_COLLECTION_SIZE"); + capField.setAccessible(true); + capField.setInt(listener, 1); + + // First session fills the cap. + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-at-cap", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + // Second session overflows — must fall back to the global span, not create an orphan. + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "overflow-session", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + boolean hasOverflowSession = + writer.stream() + .flatMap(List::stream) + .anyMatch(s -> "overflow-session".equals(s.getTag("session_id"))); + assertFalse( + hasOverflowSession, "Overflow session must not create an independent per-session trace"); + } + + @Test + void emptySessionIdIsRejected() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + listener.onApplicationStart(appStartEvent(1000L)); + + // spark.jobTags with empty suffix after the prefix should not produce a session span. + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", "1"); + props.setProperty("spark.jobTags", "spark-connect-session-"); + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(new SparkListenerJobStart(1, 1200L, emptyStages, props)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // Should produce the global spark.application trace (no per-session span). + assertEquals(1, writer.size(), "Expected exactly one trace (global app span)"); + DDSpan appSpan = findSpan(writer.get(0), "spark.application"); + assertNotNull(appSpan); + assertNull(appSpan.getTag("session_id"), "Empty session id must not be stored"); + } + + @Test + void connectJobSuccessDoesNotClearGlobalSqlFailure() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + listener.onApplicationStart(appStartEvent(1000L)); + + // A non-Connect job succeeds first — this initializes the global application span. + listener.onOtherEvent(sqlStartEvent(1L, 1050L)); + listener.onJobStart(plainJobStartEvent(1, 1100L, 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1200L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1200L)); + + // A SQL analysis failure fires after the non-Connect job completes. + listener.onSqlFailure(new RuntimeException("analysis error")); + + // A Connect job from a different session then succeeds — must not clear the global SQL failure. + listener.onOtherEvent(sqlStartEvent(2L, 1300L)); + listener.onJobStart(connectJobStartEvent(2, 1400L, "session-ok", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1500L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1500L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // The per-session span must not be errored (the Connect job succeeded). + DDSpan sessionAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> "session-ok".equals(s.getTag("session_id"))) + .findFirst() + .orElseThrow(() -> new AssertionError("No per-session app span found")); + assertFalse( + sessionAppSpan.isError(), "Connect session span must not carry the global SQL error"); + + // The global span must still reflect the SQL failure. + DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElseThrow(() -> new AssertionError("No global app span found")); + assertTrue(globalAppSpan.isError(), "Global app span must still reflect the SQL failure"); + } + // region Helpers private static DDSpan findSpan(List trace, String operationName) { From e62cd6dbc126cd27d4707e7a7f8bbabce5450a40 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 1 Jun 2026 17:23:37 +0200 Subject: [PATCH 4/5] Replace mutable MAX_COLLECTION_SIZE with protected getter hook and strengthen overflow test --- .../spark/AbstractDatadogSparkListener.java | 23 +++++++++++----- .../test/java/SparkConnectListenerTest.java | 26 ++++++++++++++----- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 4562a40ba67..4b6ced96b75 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -87,7 +87,13 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { public static volatile boolean finishTraceOnApplicationEnd = true; public static volatile boolean isPysparkShell = false; - private int MAX_COLLECTION_SIZE = 5000; + private static final int MAX_COLLECTION_SIZE = 5000; + + /** Overridable in tests to exercise collection-cap behaviour without filling 5000 entries. */ + protected int maxCollectionSize() { + return MAX_COLLECTION_SIZE; + } + private final int MAX_ACCUMULATOR_SIZE = 50000; private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags."; private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage"; @@ -545,7 +551,7 @@ private AgentSpan getOrCreatePerSessionApplicationSpan( return span; } - if (perSessionApplicationSpans.size() >= MAX_COLLECTION_SIZE) { + if (perSessionApplicationSpans.size() >= maxCollectionSize()) { // Cap exceeded: fall back to the global application span so this session's children // are still parented and the started span is never orphaned. initApplicationSpanIfNotInitialized(); @@ -613,7 +619,7 @@ private static String getSparkConnectSessionId(Properties properties) { @Override public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobCount++; - if (jobSpans.size() > MAX_COLLECTION_SIZE) { + if (jobSpans.size() > maxCollectionSize()) { return; } @@ -670,7 +676,10 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { for (int stageId : getSparkJobStageIds(jobStart)) { stageToJob.put(stageId, jobStart.jobId()); } - if (connectSessionId != null && jobToSessionId.size() < MAX_COLLECTION_SIZE) { + // If the cap is reached the put is dropped; onJobEnd then recovers connectSessionId as null, + // so a failure on that job is attributed to the global lastJobFailed instead of the session. + // This requires >maxCollectionSize() in-flight Connect jobs concurrently, which is unlikely. + if (connectSessionId != null && jobToSessionId.size() < maxCollectionSize()) { jobToSessionId.put(jobStart.jobId(), connectSessionId); } jobSpans.put(jobStart.jobId(), jobSpan); @@ -730,7 +739,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { @Override public synchronized void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - if (stageSpans.size() > MAX_COLLECTION_SIZE) { + if (stageSpans.size() > maxCollectionSize()) { return; } @@ -968,7 +977,7 @@ public synchronized void onExecutorAdded(SparkListenerExecutorAdded executorAdde currentExecutorCount += 1; maxExecutorCount = Math.max(maxExecutorCount, currentExecutorCount); - if (liveExecutors.size() <= MAX_COLLECTION_SIZE) { + if (liveExecutors.size() <= maxCollectionSize()) { liveExecutors.put(executorAdded.executorId(), executorAdded); } } @@ -1089,7 +1098,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) private synchronized void onStreamingQueryStartedEvent( StreamingQueryListener.QueryStartedEvent event) { - if (streamingQueries.size() > MAX_COLLECTION_SIZE) { + if (streamingQueries.size() > maxCollectionSize()) { return; } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java index db225bd4f18..8ede00b9320 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java @@ -243,16 +243,17 @@ void connectSessionJobFailureIsAttributedToSession() throws Exception { @Test void sessionCapOverflowFallsBackToGlobalSpan() throws Exception { + // Override maxCollectionSize() to 1 so a second session overflows without creating 5000 + // entries. DatadogSpark213Listener listener = - new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0") { + @Override + protected int maxCollectionSize() { + return 1; + } + }; listener.onApplicationStart(appStartEvent(1000L)); - // Shrink the cap to 1 via reflection so we can overflow with a single extra session. - java.lang.reflect.Field capField = - listener.getClass().getSuperclass().getDeclaredField("MAX_COLLECTION_SIZE"); - capField.setAccessible(true); - capField.setInt(listener, 1); - // First session fills the cap. listener.onOtherEvent(sqlStartEvent(1L, 1100L)); listener.onJobStart(connectJobStartEvent(1, 1200L, "session-at-cap", 1L)); @@ -267,12 +268,23 @@ void sessionCapOverflowFallsBackToGlobalSpan() throws Exception { listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + // No per-session trace for the overflow session. boolean hasOverflowSession = writer.stream() .flatMap(List::stream) .anyMatch(s -> "overflow-session".equals(s.getTag("session_id"))); assertFalse( hasOverflowSession, "Overflow session must not create an independent per-session trace"); + + // The overflow session's spans are parented under the global spark.application (no session_id). + DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElse(null); + assertNotNull(globalAppSpan, "Overflow session must fall back to the global application span"); } @Test From 2e460d84d343c81490e86e641777bcde729fa891 Mon Sep 17 00:00:00 2001 From: Adrien Boitreaud Date: Mon, 1 Jun 2026 18:27:23 +0200 Subject: [PATCH 5/5] Fix Connect session tag prefix: use SparkConnect_OperationTag_ instead of non-existent spark-connect-session- --- .../spark/AbstractDatadogSparkListener.java | 27 +++++++++++++++---- .../test/java/SparkConnectListenerTest.java | 10 ++++--- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 4b6ced96b75..4822c39ef07 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -596,6 +596,13 @@ private AgentSpan getOrCreatePerSessionApplicationSpan( return sessionAppSpan; } + // Spark Connect adds + // "SparkConnect_OperationTag_User_{userId}_Session_{sessionId}_Operation_{opId}" + // to every job's spark.jobTags via SparkContext.addJobTag in ExecuteThreadRunner.scala. + private static final String CONNECT_OP_TAG_PREFIX = "SparkConnect_OperationTag_"; + private static final String SESSION_MARKER = "_Session_"; + private static final String OPERATION_MARKER = "_Operation_"; + private static String getSparkConnectSessionId(Properties properties) { if (properties == null) { return null; @@ -606,11 +613,21 @@ private static String getSparkConnectSessionId(Properties properties) { } for (String tag : jobTags.split(",")) { tag = tag.trim(); - if (tag.startsWith("spark-connect-session-")) { - String id = tag.substring("spark-connect-session-".length()); - if (!id.isEmpty()) { - return id; - } + if (!tag.startsWith(CONNECT_OP_TAG_PREFIX)) { + continue; + } + int sessionIdx = tag.indexOf(SESSION_MARKER); + if (sessionIdx < 0) { + continue; + } + int sessionStart = sessionIdx + SESSION_MARKER.length(); + int operationIdx = tag.indexOf(OPERATION_MARKER, sessionStart); + if (operationIdx <= sessionStart) { + continue; + } + String sessionId = tag.substring(sessionStart, operationIdx); + if (!sessionId.isEmpty()) { + return sessionId; } } return null; diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java index 8ede00b9320..ec3a46b8129 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java @@ -293,10 +293,11 @@ void emptySessionIdIsRejected() throws Exception { new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); listener.onApplicationStart(appStartEvent(1000L)); - // spark.jobTags with empty suffix after the prefix should not produce a session span. + // A malformed OperationTag where the Session segment is empty must not produce a session span. Properties props = new Properties(); props.setProperty("spark.sql.execution.id", "1"); - props.setProperty("spark.jobTags", "spark-connect-session-"); + props.setProperty( + "spark.jobTags", "SparkConnect_OperationTag_User_testUser_Session__Operation_op-1"); @SuppressWarnings("unchecked") Seq emptyStages = (Seq) (Object) Nil$.MODULE$; listener.onOtherEvent(sqlStartEvent(1L, 1100L)); @@ -380,7 +381,10 @@ private static SparkListenerJobStart connectJobStartEvent( int jobId, long time, String sessionId, long sqlExecutionId) { Properties props = new Properties(); props.setProperty("spark.sql.execution.id", String.valueOf(sqlExecutionId)); - props.setProperty("spark.jobTags", "spark-connect-session-" + sessionId); + // Real Spark Connect tag format: ExecuteHolder.scala ExecuteJobTag + props.setProperty( + "spark.jobTags", + "SparkConnect_OperationTag_User_testUser_Session_" + sessionId + "_Operation_op-1"); @SuppressWarnings("unchecked") Seq emptyStages = (Seq) (Object) Nil$.MODULE$;