Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Bumped the Databricks SDK for Java dependency from `0.106.0` to `0.118.0`.

### Fixed
- Fixed telemetry misattribution when multiple connections (e.g. Thrift and SEA) are used on the same thread. Per-statement telemetry events could be tagged with another connection's context (e.g. transport mode); each connection's telemetry now uses its own context instead of a shared thread-local value.
- Hardened the OAuth U2M token cache at rest (encryption key derivation and file permissions).
- Fixed `DatabaseMetaData.getURL()` exposing credentials embedded in the connection URL; secret parameters are now masked (the URL is otherwise unchanged).
- Fixed presigned URL credentials not being fully redacted in logs.
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/databricks/jdbc/telemetry/TelemetryHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ public static boolean isTelemetryAllowedForConnection(IDatabricksConnectionConte
.isFeatureEnabled(TELEMETRY_FEATURE_FLAG_NAME);
}

public static void exportTelemetryLog(
IDatabricksConnectionContext connectionContext,
StatementTelemetryDetails telemetryDetails,
TelemetryLogLevel logLevel) {
exportTelemetryEvent(connectionContext, telemetryDetails, null, null, logLevel);
}

/**
* @deprecated Use {@link #exportTelemetryLog(IDatabricksConnectionContext,
* StatementTelemetryDetails, TelemetryLogLevel)} instead to avoid stale ThreadLocal context.
*/
@Deprecated
public static void exportTelemetryLog(
StatementTelemetryDetails telemetryDetails, TelemetryLogLevel logLevel) {
exportTelemetryEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.databricks.jdbc.telemetry.TelemetryHelper.getStatementIdString;

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.api.internal.IDatabricksStatementInternal;
import com.databricks.jdbc.common.TelemetryLogLevel;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
Expand All @@ -25,15 +26,19 @@ public class TelemetryCollector {

private static final JdbcLogger LOGGER = JdbcLoggerFactory.getLogger(TelemetryCollector.class);

private final IDatabricksConnectionContext connectionContext;

// Per-statement latency tracking using StatementLatencyDetails
private final ConcurrentHashMap<String, StatementTelemetryDetails> statementTrackers =
new ConcurrentHashMap<>();

/**
* Package-private constructor - instances should only be created via TelemetryCollectorManager
*
* @param connectionContext the connection context this collector is associated with
*/
TelemetryCollector() {
// Constructor for per-connection instances
TelemetryCollector(IDatabricksConnectionContext connectionContext) {
this.connectionContext = connectionContext;
}

/**
Expand Down Expand Up @@ -77,6 +82,7 @@ public void recordOperationLatency(long latencyMillis, String methodName) {
return;
}
TelemetryHelper.exportTelemetryLog(
connectionContext,
new StatementTelemetryDetails(statementId)
.recordOperationLatency(latencyMillis, operationType),
TelemetryLogLevel.INFO);
Expand Down Expand Up @@ -117,8 +123,8 @@ public void exportAllPendingTelemetryDetails() {
LOGGER.trace(" {} pending telemetry details for telemetry export", statementTrackers.size());
statementTrackers.forEach(
(statementId, statementTelemetryDetails) -> {
// Info log level is used to export the latency telemetry
TelemetryHelper.exportTelemetryLog(statementTelemetryDetails, TelemetryLogLevel.INFO);
TelemetryHelper.exportTelemetryLog(
connectionContext, statementTelemetryDetails, TelemetryLogLevel.INFO);
});
statementTrackers.clear();
}
Expand All @@ -144,6 +150,11 @@ void recordChunkIteration(String statementId, Long totalChunks) {
.recordChunkIteration(totalChunks);
}

@VisibleForTesting
public IDatabricksConnectionContext getConnectionContext() {
return connectionContext;
}

@VisibleForTesting
boolean isCloseOperation(OperationType operationType) {
return (operationType == OperationType.CLOSE_STATEMENT
Expand All @@ -162,8 +173,8 @@ boolean isTelemetryCollected(String statementId) {
*/
private void exportTelemetryDetailsAndClear(String statementId) {
StatementTelemetryDetails statementTelemetryDetails = statementTrackers.remove(statementId);
// Info log level is used to export the latency telemetry
TelemetryHelper.exportTelemetryLog(statementTelemetryDetails, TelemetryLogLevel.INFO);
TelemetryHelper.exportTelemetryLog(
connectionContext, statementTelemetryDetails, TelemetryLogLevel.INFO);
}

public void setResultFormat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ public TelemetryCollector getOrCreateCollector(IDatabricksConnectionContext cont
(context != null && context.getConnectionUuid() != null)
? context.getConnectionUuid()
: DEFAULT_CONNECTION;
return collectors.computeIfAbsent(key, k -> new TelemetryCollector());
// The collector stores the context it is first created with. Real connections always have a
// unique connectionUuid, so each gets its own collector holding its own context. Only
// context-less calls (null context/uuid) share the DEFAULT_CONNECTION collector; those events
// carry a null context and are skipped at export time, so the shared instance cannot
// misattribute telemetry across connections.
return collectors.computeIfAbsent(key, k -> new TelemetryCollector(context));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,45 @@ public void testTelemetryAllowedWithForceTelemetryFlag() {
assertTrue(() -> isTelemetryAllowedForConnection(connectionContext));
}

@Test
void testExportTelemetryLogWithExplicitContext_UsesProvidedContext() {
IDatabricksConnectionContext explicitContext = mock(IDatabricksConnectionContext.class);
when(explicitContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.DEBUG);
when(explicitContext.getConnectionUuid()).thenReturn("explicit-uuid");
when(explicitContext.getClientType()).thenReturn(DatabricksClientType.SEA);

StatementTelemetryDetails details =
new StatementTelemetryDetails("stmt-explicit").setOperationLatencyMillis(10L);

ITelemetryClient clientMock = mock(ITelemetryClient.class);
TelemetryClientFactory factoryMock = mock(TelemetryClientFactory.class);

try (MockedStatic<TelemetryClientFactory> mocked =
Mockito.mockStatic(TelemetryClientFactory.class)) {
mocked.when(TelemetryClientFactory::getInstance).thenReturn(factoryMock);
when(factoryMock.getTelemetryClient(explicitContext)).thenReturn(clientMock);

TelemetryHelper.exportTelemetryLog(explicitContext, details, TelemetryLogLevel.ERROR);

// Verify the explicit context was used, NOT the ThreadLocal one
verify(factoryMock, times(1)).getTelemetryClient(explicitContext);
verify(factoryMock, never()).getTelemetryClient(connectionContext);
verify(clientMock, times(1)).exportEvent(any());
}
}

@Test
void testExportTelemetryLogWithExplicitContext_NullContextSkipsExport() {
StatementTelemetryDetails details =
new StatementTelemetryDetails("stmt-null-ctx").setOperationLatencyMillis(10L);

try (MockedStatic<TelemetryClientFactory> mocked =
Mockito.mockStatic(TelemetryClientFactory.class)) {
TelemetryHelper.exportTelemetryLog(null, details, TelemetryLogLevel.ERROR);
mocked.verify(TelemetryClientFactory::getInstance, never());
}
}

@Test
void testExportTelemetryLog_EmitsWhenEventLevelLowerThanConfigured() {
// Configured level: DEBUG (5); Event level: ERROR (2) -> should export
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.mockito.Mockito.*;

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.TelemetryLogLevel;
import com.databricks.jdbc.dbclient.impl.common.StatementId;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -27,9 +28,11 @@ void setUp() {
// Create mock contexts with different UUIDs
context1 = mock(IDatabricksConnectionContext.class);
when(context1.getConnectionUuid()).thenReturn("connection-uuid-1");
when(context1.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF);

context2 = mock(IDatabricksConnectionContext.class);
when(context2.getConnectionUuid()).thenReturn("connection-uuid-2");
when(context2.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF);
}

@AfterEach
Expand Down Expand Up @@ -227,6 +230,27 @@ void testExportAndRemoveOnConnectionClose() {
assertNotSame(collector, newCollector, "After removal, should get a new collector instance");
}

@Test
void testCollectorStoresCorrectConnectionContext() {
TelemetryCollector collector1 = manager.getOrCreateCollector(context1);
TelemetryCollector collector2 = manager.getOrCreateCollector(context2);

assertSame(context1, collector1.getConnectionContext(), "Collector 1 should store context1");
assertSame(context2, collector2.getConnectionContext(), "Collector 2 should store context2");
}

@Test
void testCollectorContextNotAffectedBySubsequentConnectionCreation() {
TelemetryCollector collector1 = manager.getOrCreateCollector(context1);
// Creating a second collector should not affect the first collector's stored context
manager.getOrCreateCollector(context2);

assertSame(
context1,
collector1.getConnectionContext(),
"Collector 1's context should remain context1 even after context2 collector is created");
}

@Test
void testMultipleConnectionsWithSameHostAreIsolated() {
// Even if connections are to the same host, they should have separate collectors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import com.databricks.jdbc.api.internal.IDatabricksConnectionContext;
import com.databricks.jdbc.common.TelemetryLogLevel;
import com.databricks.jdbc.common.util.DatabricksThreadContextHolder;
import com.databricks.jdbc.model.telemetry.StatementTelemetryDetails;
Expand All @@ -18,12 +19,14 @@
import org.mockito.MockedStatic;

public class TelemetryCollectorTest {
private final TelemetryCollector handler = new TelemetryCollector();
private static final String TEST_STATEMENT_ID = "test-statement-id";
private final IDatabricksConnectionContext mockContext = mock(IDatabricksConnectionContext.class);
private final TelemetryCollector handler = new TelemetryCollector(mockContext);

@BeforeEach
void setUp() {
DatabricksThreadContextHolder.setStatementId(TEST_STATEMENT_ID);
when(mockContext.getTelemetryLogLevel()).thenReturn(TelemetryLogLevel.OFF);
}

@AfterEach
Expand Down Expand Up @@ -81,10 +84,17 @@ void testRecordOperationLatency_WithCloseOperation() {
mockedStatic.verify(
() ->
TelemetryHelper.exportTelemetryLog(
any(StatementTelemetryDetails.class), any(TelemetryLogLevel.class)));
any(IDatabricksConnectionContext.class),
any(StatementTelemetryDetails.class),
any(TelemetryLogLevel.class)));
}
}

@Test
void testCollectorStoresConnectionContext() {
assertSame(mockContext, handler.getConnectionContext());
}

@ParameterizedTest
@EnumSource(OperationType.class)
void testIsCloseOperation(OperationType operationType) {
Expand Down
Loading