diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 9dbcc2203f..c32b55b3f0 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -8,6 +8,7 @@ - Bumped the Databricks SDK for Java dependency from `0.106.0` to `0.118.0`. ### Fixed +- Fixed telemetry misattribution when multiple connections (e.g. Thrift and SEA) are used on the same thread. Per-statement telemetry events could be tagged with another connection's context (e.g. transport mode); each connection's telemetry now uses its own context instead of a shared thread-local value. - Hardened the OAuth U2M token cache at rest (encryption key derivation and file permissions). - Fixed `DatabaseMetaData.getURL()` exposing credentials embedded in the connection URL; secret parameters are now masked (the URL is otherwise unchanged). - Fixed presigned URL credentials not being fully redacted in logs. diff --git a/src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java b/src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java index 72002357ab..67a9ee4438 100644 --- a/src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java +++ b/src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java @@ -77,6 +77,18 @@ public static boolean isTelemetryAllowedForConnection(IDatabricksConnectionConte .isFeatureEnabled(TELEMETRY_FEATURE_FLAG_NAME); } + public static void exportTelemetryLog( + IDatabricksConnectionContext connectionContext, + StatementTelemetryDetails telemetryDetails, + TelemetryLogLevel logLevel) { + exportTelemetryEvent(connectionContext, telemetryDetails, null, null, logLevel); + } + + /** + * @deprecated Use {@link #exportTelemetryLog(IDatabricksConnectionContext, + * StatementTelemetryDetails, TelemetryLogLevel)} instead to avoid stale ThreadLocal context. + */ + @Deprecated public static void exportTelemetryLog( StatementTelemetryDetails telemetryDetails, TelemetryLogLevel logLevel) { exportTelemetryEvent( diff --git a/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollector.java b/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollector.java index 3ea6365509..2b825a410b 100644 --- a/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollector.java +++ b/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollector.java @@ -2,6 +2,7 @@ import static com.databricks.jdbc.telemetry.TelemetryHelper.getStatementIdString; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.api.internal.IDatabricksStatementInternal; import com.databricks.jdbc.common.TelemetryLogLevel; import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; @@ -25,15 +26,19 @@ public class TelemetryCollector { private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(TelemetryCollector.class); + private final IDatabricksConnectionContext connectionContext; + // Per-statement latency tracking using StatementLatencyDetails private final ConcurrentHashMap statementTrackers = new ConcurrentHashMap<>(); /** * Package-private constructor - instances should only be created via TelemetryCollectorManager + * + * @param connectionContext the connection context this collector is associated with */ - TelemetryCollector() { - // Constructor for per-connection instances + TelemetryCollector(IDatabricksConnectionContext connectionContext) { + this.connectionContext = connectionContext; } /** @@ -77,6 +82,7 @@ public void recordOperationLatency(long latencyMillis, String methodName) { return; } TelemetryHelper.exportTelemetryLog( + connectionContext, new StatementTelemetryDetails(statementId) .recordOperationLatency(latencyMillis, operationType), TelemetryLogLevel.INFO); @@ -117,8 +123,8 @@ public void exportAllPendingTelemetryDetails() { LOGGER.trace(" {} pending telemetry details for telemetry export", statementTrackers.size()); statementTrackers.forEach( (statementId, statementTelemetryDetails) -> { - // Info log level is used to export the latency telemetry - TelemetryHelper.exportTelemetryLog(statementTelemetryDetails, TelemetryLogLevel.INFO); + TelemetryHelper.exportTelemetryLog( + connectionContext, statementTelemetryDetails, TelemetryLogLevel.INFO); }); statementTrackers.clear(); } @@ -144,6 +150,11 @@ void recordChunkIteration(String statementId, Long totalChunks) { .recordChunkIteration(totalChunks); } + @VisibleForTesting + public IDatabricksConnectionContext getConnectionContext() { + return connectionContext; + } + @VisibleForTesting boolean isCloseOperation(OperationType operationType) { return (operationType == OperationType.CLOSE_STATEMENT @@ -162,8 +173,8 @@ boolean isTelemetryCollected(String statementId) { */ private void exportTelemetryDetailsAndClear(String statementId) { StatementTelemetryDetails statementTelemetryDetails = statementTrackers.remove(statementId); - // Info log level is used to export the latency telemetry - TelemetryHelper.exportTelemetryLog(statementTelemetryDetails, TelemetryLogLevel.INFO); + TelemetryHelper.exportTelemetryLog( + connectionContext, statementTelemetryDetails, TelemetryLogLevel.INFO); } public void setResultFormat( diff --git a/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManager.java b/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManager.java index b0cf2946ec..179577ec75 100644 --- a/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManager.java +++ b/src/main/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManager.java @@ -35,7 +35,12 @@ public TelemetryCollector getOrCreateCollector(IDatabricksConnectionContext cont (context != null && context.getConnectionUuid() != null) ? context.getConnectionUuid() : DEFAULT_CONNECTION; - return collectors.computeIfAbsent(key, k -> new TelemetryCollector()); + // The collector stores the context it is first created with. Real connections always have a + // unique connectionUuid, so each gets its own collector holding its own context. Only + // context-less calls (null context/uuid) share the DEFAULT_CONNECTION collector; those events + // carry a null context and are skipped at export time, so the shared instance cannot + // misattribute telemetry across connections. + return collectors.computeIfAbsent(key, k -> new TelemetryCollector(context)); } /** diff --git a/src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java b/src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java index e9a3c73a30..d3c6b54c0a 100644 --- a/src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java +++ b/src/test/java/com/databricks/jdbc/telemetry/TelemetryHelperTest.java @@ -247,6 +247,45 @@ public void testTelemetryAllowedWithForceTelemetryFlag() { assertTrue(() -> isTelemetryAllowedForConnection(connectionContext)); } + @Test + void testExportTelemetryLogWithExplicitContext_UsesProvidedContext() { + IDatabricksConnectionContext explicitContext = mock(IDatabricksConnectionContext.class); + when(explicitContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.DEBUG); + when(explicitContext.getConnectionUuid()).thenReturn("explicit-uuid"); + when(explicitContext.getClientType()).thenReturn(DatabricksClientType.SEA); + + StatementTelemetryDetails details = + new StatementTelemetryDetails("stmt-explicit").setOperationLatencyMillis(10L); + + ITelemetryClient clientMock = mock(ITelemetryClient.class); + TelemetryClientFactory factoryMock = mock(TelemetryClientFactory.class); + + try (MockedStatic mocked = + Mockito.mockStatic(TelemetryClientFactory.class)) { + mocked.when(TelemetryClientFactory::getInstance).thenReturn(factoryMock); + when(factoryMock.getTelemetryClient(explicitContext)).thenReturn(clientMock); + + TelemetryHelper.exportTelemetryLog(explicitContext, details, TelemetryLogLevel.ERROR); + + // Verify the explicit context was used, NOT the ThreadLocal one + verify(factoryMock, times(1)).getTelemetryClient(explicitContext); + verify(factoryMock, never()).getTelemetryClient(connectionContext); + verify(clientMock, times(1)).exportEvent(any()); + } + } + + @Test + void testExportTelemetryLogWithExplicitContext_NullContextSkipsExport() { + StatementTelemetryDetails details = + new StatementTelemetryDetails("stmt-null-ctx").setOperationLatencyMillis(10L); + + try (MockedStatic mocked = + Mockito.mockStatic(TelemetryClientFactory.class)) { + TelemetryHelper.exportTelemetryLog(null, details, TelemetryLogLevel.ERROR); + mocked.verify(TelemetryClientFactory::getInstance, never()); + } + } + @Test void testExportTelemetryLog_EmitsWhenEventLevelLowerThanConfigured() { // Configured level: DEBUG (5); Event level: ERROR (2) -> should export diff --git a/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManagerTest.java b/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManagerTest.java index c05a25396c..9366a0fa42 100644 --- a/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManagerTest.java +++ b/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorManagerTest.java @@ -4,6 +4,7 @@ import static org.mockito.Mockito.*; import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; +import com.databricks.jdbc.common.TelemetryLogLevel; import com.databricks.jdbc.dbclient.impl.common.StatementId; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -27,9 +28,11 @@ void setUp() { // Create mock contexts with different UUIDs context1 = mock(IDatabricksConnectionContext.class); when(context1.getConnectionUuid()).thenReturn("connection-uuid-1"); + when(context1.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF); context2 = mock(IDatabricksConnectionContext.class); when(context2.getConnectionUuid()).thenReturn("connection-uuid-2"); + when(context2.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF); } @AfterEach @@ -227,6 +230,27 @@ void testExportAndRemoveOnConnectionClose() { assertNotSame(collector, newCollector, "After removal, should get a new collector instance"); } + @Test + void testCollectorStoresCorrectConnectionContext() { + TelemetryCollector collector1 = manager.getOrCreateCollector(context1); + TelemetryCollector collector2 = manager.getOrCreateCollector(context2); + + assertSame(context1, collector1.getConnectionContext(), "Collector 1 should store context1"); + assertSame(context2, collector2.getConnectionContext(), "Collector 2 should store context2"); + } + + @Test + void testCollectorContextNotAffectedBySubsequentConnectionCreation() { + TelemetryCollector collector1 = manager.getOrCreateCollector(context1); + // Creating a second collector should not affect the first collector's stored context + manager.getOrCreateCollector(context2); + + assertSame( + context1, + collector1.getConnectionContext(), + "Collector 1's context should remain context1 even after context2 collector is created"); + } + @Test void testMultipleConnectionsWithSameHostAreIsolated() { // Even if connections are to the same host, they should have separate collectors diff --git a/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorTest.java b/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorTest.java index 31a79efbcf..8de7185b53 100644 --- a/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorTest.java +++ b/src/test/java/com/databricks/jdbc/telemetry/latency/TelemetryCollectorTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; +import com.databricks.jdbc.api.internal.IDatabricksConnectionContext; import com.databricks.jdbc.common.TelemetryLogLevel; import com.databricks.jdbc.common.util.DatabricksThreadContextHolder; import com.databricks.jdbc.model.telemetry.StatementTelemetryDetails; @@ -18,12 +19,14 @@ import org.mockito.MockedStatic; public class TelemetryCollectorTest { - private final TelemetryCollector handler = new TelemetryCollector(); private static final String TEST_STATEMENT_ID = "test-statement-id"; + private final IDatabricksConnectionContext mockContext = mock(IDatabricksConnectionContext.class); + private final TelemetryCollector handler = new TelemetryCollector(mockContext); @BeforeEach void setUp() { DatabricksThreadContextHolder.setStatementId(TEST_STATEMENT_ID); + when(mockContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF); } @AfterEach @@ -81,10 +84,17 @@ void testRecordOperationLatency_WithCloseOperation() { mockedStatic.verify( () -> TelemetryHelper.exportTelemetryLog( - any(StatementTelemetryDetails.class), any(TelemetryLogLevel.class))); + any(IDatabricksConnectionContext.class), + any(StatementTelemetryDetails.class), + any(TelemetryLogLevel.class))); } } + @Test + void testCollectorStoresConnectionContext() { + assertSame(mockContext, handler.getConnectionContext()); + } + @ParameterizedTest @EnumSource(OperationType.class) void testIsCloseOperation(OperationType operationType) {