diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 817aa1f0d84..4822c39ef07 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -87,7 +87,13 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { public static volatile boolean finishTraceOnApplicationEnd = true; public static volatile boolean isPysparkShell = false; - private final int MAX_COLLECTION_SIZE = 5000; + private static final int MAX_COLLECTION_SIZE = 5000; + + /** Overridable in tests to exercise collection-cap behaviour without filling 5000 entries. 
*/ + protected int maxCollectionSize() { + return MAX_COLLECTION_SIZE; + } + private final int MAX_ACCUMULATOR_SIZE = 50000; private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags."; private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage"; @@ -115,6 +121,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap stageToJob = new HashMap<>(); private final HashMap stageProperties = new HashMap<>(); + private final HashMap jobToSessionId = new HashMap<>(); private final SparkAggregatedTaskMetrics applicationMetrics = new SparkAggregatedTaskMetrics(); private final HashMap streamingBatchMetrics = new HashMap<>(); @@ -127,6 +134,12 @@ public abstract class AbstractDatadogSparkListener extends SparkListener { private final HashMap sqlQueries = new HashMap<>(); protected final HashMap sqlPlans = new HashMap<>(); private final HashMap liveExecutors = new HashMap<>(); + private final HashMap perSessionApplicationSpans = new HashMap<>(); + private final HashMap perSessionApplicationMetrics = + new HashMap<>(); + private final HashMap perSessionLastJobFailed = new HashMap<>(); + private final HashMap perSessionLastJobFailedMessage = new HashMap<>(); + private final HashMap perSessionLastJobFailedStackTrace = new HashMap<>(); private final Map accumulatorToStageID = new HashMap<>(); @@ -361,6 +374,39 @@ public synchronized void finishApplication( } applicationEnded = true; + // TODO: per-session app spans are closed here (server shutdown) rather than when the session + // actually ends — so their duration is "first job → server shutdown". Spark Connect does emit + // a server-side session-close event, but it is not surfaced through SparkListener today. + // When that hook becomes available, finish the span there and remove it from the map so that + // long-lived servers don't accumulate unbounded open spans. 
+ + // Finish per-session application spans before the guard below, because a pure Connect server + // has applicationSpan == null with jobCount > 0, which would cause the guard to return early + // and skip finishing the per-session spans entirely. + for (Map.Entry entry : perSessionApplicationSpans.entrySet()) { + String sessionId = entry.getKey(); + AgentSpan sessionAppSpan = entry.getValue(); + + if (Boolean.TRUE.equals(perSessionLastJobFailed.get(sessionId))) { + sessionAppSpan.setError(true); + sessionAppSpan.setTag(DDTags.ERROR_TYPE, "Spark Application Failed"); + sessionAppSpan.setTag(DDTags.ERROR_MSG, perSessionLastJobFailedMessage.get(sessionId)); + sessionAppSpan.setTag(DDTags.ERROR_STACK, perSessionLastJobFailedStackTrace.get(sessionId)); + } + + SparkAggregatedTaskMetrics sessionMetrics = perSessionApplicationMetrics.get(sessionId); + if (sessionMetrics != null) { + sessionMetrics.setSpanMetrics(sessionAppSpan); + } + + sessionAppSpan.finish(time * 1000); + } + perSessionApplicationSpans.clear(); + perSessionApplicationMetrics.clear(); + perSessionLastJobFailed.clear(); + perSessionLastJobFailedMessage.clear(); + perSessionLastJobFailedStackTrace.clear(); + if ((applicationSpan == null && jobCount > 0) || isRunningOnDatabricks) { // If the application span is not initialized, but spark jobs have been executed, all those // spark jobs were databricks or streaming. 
In this case we don't send the application span @@ -466,6 +512,8 @@ private AgentSpan getOrCreateSqlSpan( return null; } + String connectSessionId = getSparkConnectSessionId(jobProperties); + AgentTracer.SpanBuilder spanBuilder = buildSparkSpan("spark.sql", jobProperties) .withStartTimestamp(queryStart.time() * 1000) @@ -479,6 +527,10 @@ private AgentSpan getOrCreateSqlSpan( AgentSpan batchSpan = getOrCreateStreamingBatchSpan(batchKey, queryStart.time(), jobProperties); spanBuilder.asChildOf(batchSpan.context()); + } else if (connectSessionId != null) { + AgentSpan sessionAppSpan = + getOrCreatePerSessionApplicationSpan(connectSessionId, queryStart.time(), jobProperties); + spanBuilder.asChildOf(sessionAppSpan.context()); } else if (isRunningOnDatabricks) { addDatabricksSpecificTags(spanBuilder, jobProperties, true); } else { @@ -492,10 +544,99 @@ private AgentSpan getOrCreateSqlSpan( return sqlSpan; } + private AgentSpan getOrCreatePerSessionApplicationSpan( + String sessionId, long timeMs, Properties jobProperties) { + AgentSpan span = perSessionApplicationSpans.get(sessionId); + if (span != null) { + return span; + } + + if (perSessionApplicationSpans.size() >= maxCollectionSize()) { + // Cap exceeded: fall back to the global application span so this session's children + // are still parented and the started span is never orphaned. + initApplicationSpanIfNotInitialized(); + return applicationSpan; + } + + AgentTracer.SpanBuilder builder = + buildSparkSpan("spark.application", jobProperties) + // 1µs before first child so this span sorts strictly before its children. 
+ .withStartTimestamp(timeMs * 1000 - 1) + .withTag("session_id", sessionId) + .withTag("spark.connect.server", true); + + if (applicationStart != null) { + String ddTags = + Config.get().getGlobalTags().entrySet().stream() + .sorted(Map.Entry.comparingByKey()) + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",")); + + builder + .withTag("application_name", applicationStart.appName()) + .withTag("djm.tags", ddTags) + .withTag("spark_user", applicationStart.sparkUser()); + + applicationStart.appAttemptId().foreach(id -> builder.withTag("app_attempt_id", id)); + } + + captureApplicationParameters(builder); + captureEmrStepId(builder); + captureOpenlineageJobInfo(builder); + + // captureOpenlineageContextIfPresent and predeterminedTraceIdContext are intentionally NOT + // applied — per-session spans must be independent trace roots. + + AgentSpan sessionAppSpan = builder.start(); + sessionAppSpan.setMeasured(true); + setDataJobsSamplingPriority(sessionAppSpan); + + perSessionApplicationSpans.put(sessionId, sessionAppSpan); + perSessionApplicationMetrics.put(sessionId, new SparkAggregatedTaskMetrics()); + return sessionAppSpan; + } + + // Spark Connect adds + // "SparkConnect_OperationTag_User_{userId}_Session_{sessionId}_Operation_{opId}" + // to every job's spark.jobTags via SparkContext.addJobTag in ExecuteThreadRunner.scala. 
+ private static final String CONNECT_OP_TAG_PREFIX = "SparkConnect_OperationTag_"; + private static final String SESSION_MARKER = "_Session_"; + private static final String OPERATION_MARKER = "_Operation_"; + + private static String getSparkConnectSessionId(Properties properties) { + if (properties == null) { + return null; + } + String jobTags = properties.getProperty("spark.jobTags"); + if (jobTags == null) { + return null; + } + for (String tag : jobTags.split(",")) { + tag = tag.trim(); + if (!tag.startsWith(CONNECT_OP_TAG_PREFIX)) { + continue; + } + int sessionIdx = tag.indexOf(SESSION_MARKER); + if (sessionIdx < 0) { + continue; + } + int sessionStart = sessionIdx + SESSION_MARKER.length(); + int operationIdx = tag.indexOf(OPERATION_MARKER, sessionStart); + if (operationIdx <= sessionStart) { + continue; + } + String sessionId = tag.substring(sessionStart, operationIdx); + if (!sessionId.isEmpty()) { + return sessionId; + } + } + return null; + } + @Override public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobCount++; - if (jobSpans.size() > MAX_COLLECTION_SIZE) { + if (jobSpans.size() > maxCollectionSize()) { return; } @@ -507,6 +648,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { String batchKey = getStreamingBatchKey(jobStart.properties()); Long sqlExecutionId = getSqlExecutionId(jobStart.properties()); + String connectSessionId = getSparkConnectSessionId(jobStart.properties()); AgentSpan sqlSpan = null; if (sqlExecutionId != null) { @@ -531,6 +673,11 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { jobSpanBuilder.asChildOf(batchSpan.context()); } else if (isRunningOnDatabricks) { addDatabricksSpecificTags(jobSpanBuilder, jobStart.properties(), true); + } else if (connectSessionId != null) { + AgentSpan sessionAppSpan = + getOrCreatePerSessionApplicationSpan( + connectSessionId, jobStart.time(), jobStart.properties()); + jobSpanBuilder.asChildOf(sessionAppSpan.context()); } else { // 
In non-databricks, non-streaming env, the spark application is the local root span initApplicationSpanIfNotInitialized(); @@ -546,6 +693,12 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) { for (int stageId : getSparkJobStageIds(jobStart)) { stageToJob.put(stageId, jobStart.jobId()); } + // If the cap is reached the put is dropped; onJobEnd then recovers connectSessionId as null, + // so a failure on that job is attributed to the global lastJobFailed instead of the session. + // This requires >maxCollectionSize() in-flight Connect jobs concurrently, which is unlikely. + if (connectSessionId != null && jobToSessionId.size() < maxCollectionSize()) { + jobToSessionId.put(jobStart.jobId(), connectSessionId); + } jobSpans.put(jobStart.jobId(), jobSpan); notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart); } @@ -557,6 +710,8 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { return; } + String connectSessionId = jobToSessionId.remove(jobEnd.jobId()); + if (jobEnd.jobResult() instanceof JobFailed) { JobFailed jobFailed = (JobFailed) jobEnd.jobResult(); Exception exception = jobFailed.exception(); @@ -571,13 +726,23 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { // Only propagate the error to the application if it is not a cancellation if (errorMessage != null && !errorMessage.toLowerCase().contains("cancelled")) { - lastJobFailed = true; - lastJobFailedMessage = errorMessage; - lastJobFailedStackTrace = errorStackTrace; + if (connectSessionId != null && perSessionApplicationSpans.containsKey(connectSessionId)) { + perSessionLastJobFailed.put(connectSessionId, true); + perSessionLastJobFailedMessage.put(connectSessionId, errorMessage); + perSessionLastJobFailedStackTrace.put(connectSessionId, errorStackTrace); + } else { + lastJobFailed = true; + lastJobFailedMessage = errorMessage; + lastJobFailedStackTrace = errorStackTrace; + } } } else { - lastJobFailed = false; - lastSqlFailed = false; + if 
(connectSessionId != null && perSessionApplicationSpans.containsKey(connectSessionId)) { + perSessionLastJobFailed.put(connectSessionId, false); + } else { + lastJobFailed = false; + lastSqlFailed = false; + } } SparkAggregatedTaskMetrics metrics = jobMetrics.remove(jobEnd.jobId()); @@ -591,7 +756,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) { @Override public synchronized void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - if (stageSpans.size() > MAX_COLLECTION_SIZE) { + if (stageSpans.size() > maxCollectionSize()) { return; } @@ -683,12 +848,21 @@ public synchronized void onStageCompleted(SparkListenerStageCompleted stageCompl Properties prop = stageProperties.remove(stageSpanKey); Long sqlExecutionId = getSqlExecutionId(prop); + String connectSessionId = getSparkConnectSessionId(prop); SparkAggregatedTaskMetrics stageMetric = stageMetrics.remove(stageSpanKey); if (stageMetric != null) { stageMetric.computeSkew(); stageMetric.setSpanMetrics(span); - applicationMetrics.accumulateStageMetrics(stageMetric); + if (connectSessionId != null) { + SparkAggregatedTaskMetrics sessionMetrics = + perSessionApplicationMetrics.get(connectSessionId); + if (sessionMetrics != null) { + sessionMetrics.accumulateStageMetrics(stageMetric); + } + } else { + applicationMetrics.accumulateStageMetrics(stageMetric); + } jobMetrics .computeIfAbsent(jobId, k -> new SparkAggregatedTaskMetrics()) @@ -820,7 +994,7 @@ public synchronized void onExecutorAdded(SparkListenerExecutorAdded executorAdde currentExecutorCount += 1; maxExecutorCount = Math.max(maxExecutorCount, currentExecutorCount); - if (liveExecutors.size() <= MAX_COLLECTION_SIZE) { + if (liveExecutors.size() <= maxCollectionSize()) { liveExecutors.put(executorAdded.executorId(), executorAdded); } } @@ -941,7 +1115,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) private synchronized void onStreamingQueryStartedEvent( 
StreamingQueryListener.QueryStartedEvent event) { - if (streamingQueries.size() > MAX_COLLECTION_SIZE) { + if (streamingQueries.size() > maxCollectionSize()) { return; } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java new file mode 100644 index 00000000000..ec3a46b8129 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/test/java/SparkConnectListenerTest.java @@ -0,0 +1,486 @@ +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.core.DDSpan; +import datadog.trace.instrumentation.spark.DatadogSpark213Listener; +import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; +import org.apache.spark.SparkConf; +import org.apache.spark.scheduler.JobFailed; +import org.apache.spark.scheduler.JobSucceeded$; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.StageInfo; +import org.apache.spark.sql.execution.SparkPlanInfo; +import org.apache.spark.sql.execution.metric.SQLMetricInfo; +import 
org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart; +import org.junit.jupiter.api.Test; +import scala.Option; +import scala.collection.immutable.Map$; +import scala.collection.immutable.Nil$; +import scala.collection.immutable.Seq; +import scala.collection.immutable.Set$; + +/** + * Tests that Spark Connect jobs emit one spark.application root span per session (independent + * traces), with SQL/job spans nested directly under the per-session app span. + */ +class SparkConnectListenerTest extends AbstractInstrumentationTest { + + @Test + void sqlSpanIsNestedUnderSessionSpanNotApplicationSpan() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1500L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1500L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + List trace = writer.get(0); + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-abc", appSpan.getTag("session_id")); + assertEquals(true, appSpan.getTag("spark.connect.server")); + } + + @Test + void separateSessionsGetDistinctSessionSpans() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, 
"session-1", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "session-2", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark")), + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + DDSpan appSpan0 = + writer.get(0).stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .findFirst() + .orElseThrow(() -> new AssertionError("No spark.application span in trace 0")); + DDSpan appSpan1 = + writer.get(1).stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .findFirst() + .orElseThrow(() -> new AssertionError("No spark.application span in trace 1")); + + List sessionIds = + Arrays.asList( + (String) appSpan0.getTag("session_id"), (String) appSpan1.getTag("session_id")); + assertTrue(sessionIds.contains("session-1")); + assertTrue(sessionIds.contains("session-2")); + } + + @Test + void sameSessionReusesExistingSessionSpan() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + // Two SQL queries from the same session + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, 
JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "session-abc", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + List trace = writer.get(0); + long appSpanCount = + trace.stream() + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .count(); + assertEquals(1, appSpanCount); + + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-abc", appSpan.getTag("session_id")); + } + + @Test + void nonConnectJobOnConnectServerIsTracedSeparately() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + // Connect job for session-abc + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-abc", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + // Non-Connect job (no spark.jobTags) + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(plainJobStartEvent(2, 1600L, 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // Expect 2 traces: one per-session app span, one 
global app span + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark")), + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark"), + span().operationName("spark.sql").type("spark"), + span().operationName("spark.job").type("spark"))); + + // One trace has session_id (Connect), one does not (global) + DDSpan sessionAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") != null) + .findFirst() + .orElse(null); + DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElse(null); + + assertNotNull(sessionAppSpan, "Expected a per-session application span"); + assertNotNull(globalAppSpan, "Expected a global application span"); + assertEquals("session-abc", sessionAppSpan.getTag("session_id")); + assertEquals(true, sessionAppSpan.getTag("spark.connect.server")); + } + + @Test + void connectSessionJobFailureIsAttributedToSession() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + + listener.onApplicationStart(appStartEvent(1000L)); + + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-fail", 1L)); + listener.onJobEnd( + new SparkListenerJobEnd(1, 1500L, new JobFailed(new RuntimeException("job error")))); + listener.onOtherEvent(sqlEndEvent(1L, 1500L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().operationName("spark.application").type("spark").error(true), + span().operationName("spark.sql").type("spark"), + 
span().operationName("spark.job").type("spark").error(true))); + + List trace = writer.get(0); + DDSpan appSpan = findSpan(trace, "spark.application"); + assertNotNull(appSpan); + assertEquals("session-fail", appSpan.getTag("session_id")); + assertTrue(appSpan.isError(), "Per-session app span should have error set"); + } + + @Test + void sessionCapOverflowFallsBackToGlobalSpan() throws Exception { + // Override maxCollectionSize() to 1 so a second session overflows without creating 5000 + // entries. + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0") { + @Override + protected int maxCollectionSize() { + return 1; + } + }; + listener.onApplicationStart(appStartEvent(1000L)); + + // First session fills the cap. + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(connectJobStartEvent(1, 1200L, "session-at-cap", 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + + // Second session overflows — must fall back to the global span, not create an orphan. + listener.onOtherEvent(sqlStartEvent(2L, 1500L)); + listener.onJobStart(connectJobStartEvent(2, 1600L, "overflow-session", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1800L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1800L)); + + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // No per-session trace for the overflow session. + boolean hasOverflowSession = + writer.stream() + .flatMap(List::stream) + .anyMatch(s -> "overflow-session".equals(s.getTag("session_id"))); + assertFalse( + hasOverflowSession, "Overflow session must not create an independent per-session trace"); + + // The overflow session's spans are parented under the global spark.application (no session_id). 
+ DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElse(null); + assertNotNull(globalAppSpan, "Overflow session must fall back to the global application span"); + } + + @Test + void emptySessionIdIsRejected() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + listener.onApplicationStart(appStartEvent(1000L)); + + // A malformed OperationTag where the Session segment is empty must not produce a session span. + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", "1"); + props.setProperty( + "spark.jobTags", "SparkConnect_OperationTag_User_testUser_Session__Operation_op-1"); + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + listener.onOtherEvent(sqlStartEvent(1L, 1100L)); + listener.onJobStart(new SparkListenerJobStart(1, 1200L, emptyStages, props)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1400L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1400L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // Should produce the global spark.application trace (no per-session span). + assertEquals(1, writer.size(), "Expected exactly one trace (global app span)"); + DDSpan appSpan = findSpan(writer.get(0), "spark.application"); + assertNotNull(appSpan); + assertNull(appSpan.getTag("session_id"), "Empty session id must not be stored"); + } + + @Test + void connectJobSuccessDoesNotClearGlobalSqlFailure() throws Exception { + DatadogSpark213Listener listener = + new DatadogSpark213Listener(new SparkConf(), "test_app_id", "3.5.0"); + listener.onApplicationStart(appStartEvent(1000L)); + + // A non-Connect job succeeds first — this initializes the global application span. 
+ listener.onOtherEvent(sqlStartEvent(1L, 1050L)); + listener.onJobStart(plainJobStartEvent(1, 1100L, 1L)); + listener.onJobEnd(new SparkListenerJobEnd(1, 1200L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(1L, 1200L)); + + // A SQL analysis failure fires after the non-Connect job completes. + listener.onSqlFailure(new RuntimeException("analysis error")); + + // A Connect job then succeeds in its own session — it must not clear the global SQL failure. + listener.onOtherEvent(sqlStartEvent(2L, 1300L)); + listener.onJobStart(connectJobStartEvent(2, 1400L, "session-ok", 2L)); + listener.onJobEnd(new SparkListenerJobEnd(2, 1500L, JobSucceeded$.MODULE$)); + listener.onOtherEvent(sqlEndEvent(2L, 1500L)); + listener.onApplicationEnd(new SparkListenerApplicationEnd(2000L)); + + // The per-session span must not be errored (the Connect job succeeded). + DDSpan sessionAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> "session-ok".equals(s.getTag("session_id"))) + .findFirst() + .orElseThrow(() -> new AssertionError("No per-session app span found")); + assertFalse( + sessionAppSpan.isError(), "Connect session span must not carry the global SQL error"); + + // The global span must still reflect the SQL failure. 
+ DDSpan globalAppSpan = + writer.stream() + .flatMap(List::stream) + .filter(s -> "spark.application".equals(s.getOperationName().toString())) + .filter(s -> s.getTag("session_id") == null) + .findFirst() + .orElseThrow(() -> new AssertionError("No global app span found")); + assertTrue(globalAppSpan.isError(), "Global app span must still reflect the SQL failure"); + } + + // region Helpers + + private static DDSpan findSpan(List trace, String operationName) { + return trace.stream() + .filter(s -> operationName.equals(s.getOperationName().toString())) + .findFirst() + .orElse(null); + } + + private static SparkListenerApplicationStart appStartEvent(long time) { + return new SparkListenerApplicationStart( + "test_app", + Option.apply("test_app_id"), + time, + "test_user", + Option.apply("1"), + Option.empty(), + Option.empty()); + } + + private static SparkListenerJobStart connectJobStartEvent( + int jobId, long time, String sessionId, long sqlExecutionId) { + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", String.valueOf(sqlExecutionId)); + // Real Spark Connect tag format: ExecuteHolder.scala ExecuteJobTag + props.setProperty( + "spark.jobTags", + "SparkConnect_OperationTag_User_testUser_Session_" + sessionId + "_Operation_op-1"); + + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + return new SparkListenerJobStart(jobId, time, emptyStages, props); + } + + private static SparkListenerJobStart plainJobStartEvent( + int jobId, long time, long sqlExecutionId) { + Properties props = new Properties(); + props.setProperty("spark.sql.execution.id", String.valueOf(sqlExecutionId)); + + @SuppressWarnings("unchecked") + Seq emptyStages = (Seq) (Object) Nil$.MODULE$; + return new SparkListenerJobStart(jobId, time, emptyStages, props); + } + + /** + * Constructs {@link SparkListenerSQLExecutionStart} using reflection to handle constructor + * differences across Spark versions (3.2 has 6 args, 3.3 has 7 
args, 3.4 adds rootExecutionId, + * 3.5 adds jobTags). + */ + @SuppressWarnings("unchecked") + private static SparkListenerSQLExecutionStart sqlStartEvent(long executionId, long time) + throws ReflectiveOperationException { + SparkPlanInfo planInfo = + new SparkPlanInfo( + "scan", + "scan", + (Seq) (Object) Nil$.MODULE$, + Map$.MODULE$.empty(), + (Seq) (Object) Nil$.MODULE$); + + Constructor[] ctors = SparkListenerSQLExecutionStart.class.getConstructors(); + Arrays.sort(ctors, Comparator.comparingInt(Constructor::getParameterCount)); + + for (Constructor ctor : ctors) { + int n = ctor.getParameterCount(); + if (n == 6) { + // Spark 3.2: (Long, String, String, String, SparkPlanInfo, Long) + return (SparkListenerSQLExecutionStart) + ctor.newInstance(executionId, "SELECT 1", "", "", planInfo, time); + } else if (n == 7) { + // Spark 3.3: (Long, String, String, String, SparkPlanInfo, Long, Map) + return (SparkListenerSQLExecutionStart) + ctor.newInstance(executionId, "SELECT 1", "", "", planInfo, time, Map$.MODULE$.empty()); + } else if (n == 8) { + // Spark 3.4: (Long, Option, String, String, String, SparkPlanInfo, Long, Map) + return (SparkListenerSQLExecutionStart) + ctor.newInstance( + executionId, + Option.empty(), + "SELECT 1", + "", + "", + planInfo, + time, + Map$.MODULE$.empty()); + } else if (n == 9) { + // Spark 3.5+: (Long, Option, String, String, String, SparkPlanInfo, Long, Map, Set) + return (SparkListenerSQLExecutionStart) + ctor.newInstance( + executionId, + Option.empty(), + "SELECT 1", + "", + "", + planInfo, + time, + Map$.MODULE$.empty(), + Set$.MODULE$.empty()); + } + } + throw new IllegalStateException( + "No compatible SparkListenerSQLExecutionStart constructor found in " + + Arrays.toString(ctors)); + } + + private static SparkListenerSQLExecutionEnd sqlEndEvent(long executionId, long time) + throws ReflectiveOperationException { + Constructor[] ctors = SparkListenerSQLExecutionEnd.class.getConstructors(); + Arrays.sort(ctors, 
Comparator.comparingInt(Constructor::getParameterCount)); + + for (Constructor ctor : ctors) { + int n = ctor.getParameterCount(); + if (n == 2) { + // Spark 3.2: (Long, Long) + return (SparkListenerSQLExecutionEnd) ctor.newInstance(executionId, time); + } else if (n == 3) { + // Spark 3.4+: (Long, Long, Option) + return (SparkListenerSQLExecutionEnd) ctor.newInstance(executionId, time, Option.empty()); + } + } + throw new IllegalStateException( + "No compatible SparkListenerSQLExecutionEnd constructor found in " + + Arrays.toString(ctors)); + } + + // endregion +}