Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,19 @@ public double getCloudFetchSpeedThreshold() {
return Double.parseDouble(getParameter(DatabricksJdbcUrlParams.CLOUD_FETCH_SPEED_THRESHOLD));
}

/** Fraction of the JVM max heap used as the default CloudFetch in-memory budget. */
private static final double CLOUD_FETCH_HEAP_FRACTION = 0.2;

@Override
public long getCloudFetchMaxBytesInMemory() {
long configured =
Long.parseLong(getParameter(DatabricksJdbcUrlParams.CLOUD_FETCH_MAX_BYTES_IN_MEMORY));
if (configured > 0) {
return configured;
}
return (long) (Runtime.getRuntime().maxMemory() * CLOUD_FETCH_HEAP_FRACTION);
}

@Override
public String getCatalog() {
return getParameter(DatabricksJdbcUrlParams.CONN_CATALOG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public abstract class AbstractArrowResultChunk {
protected final long numRows;
protected final long rowOffset;
protected final long chunkIndex;
protected final long chunkSizeInBytes;
protected final StatementId statementId;
protected final BufferAllocator rootAllocator;

Expand Down Expand Up @@ -89,6 +90,7 @@ protected AbstractArrowResultChunk(
long numRows,
long rowOffset,
long chunkIndex,
long chunkSizeInBytes,
StatementId statementId,
ChunkStatus initialStatus,
ExternalLink chunkLink,
Expand All @@ -97,6 +99,7 @@ protected AbstractArrowResultChunk(
this.numRows = numRows;
this.rowOffset = rowOffset;
this.chunkIndex = chunkIndex;
this.chunkSizeInBytes = chunkSizeInBytes;
this.statementId = statementId;
this.rootAllocator = ArrowBufferAllocator.getBufferAllocator();
this.chunkReadyFuture = new CompletableFuture<>();
Expand All @@ -115,6 +118,15 @@ public Long getChunkIndex() {
return chunkIndex;
}

/**
* Returns the size of this chunk in bytes, as reported by the result manifest, or 0 if unknown.
*
* @return chunk size in bytes
*/
public long getChunkSizeInBytes() {
return chunkSizeInBytes;
}

/**
* Returns the start row offset of this chunk in the overall result set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public abstract class AbstractRemoteChunkProvider<T extends AbstractArrowResultC
protected long nextChunkToDownload;
protected long totalChunksInMemory;
protected long allowedChunksInMemory;
protected long totalBytesInMemory;
protected final long maxBytesInMemory;
protected boolean isClosed;

/** Maximum number of parallel chunk downloads allowed per query. */
Expand All @@ -70,6 +72,7 @@ protected AbstractRemoteChunkProvider(
throws DatabricksSQLException {
this.chunkReadyTimeoutSeconds = session.getConnectionContext().getChunkReadyTimeoutSeconds();
this.maxParallelChunkDownloadsPerQuery = maxParallelChunkDownloadsPerQuery;
this.maxBytesInMemory = session.getConnectionContext().getCloudFetchMaxBytesInMemory();
this.session = session;
this.httpClient = httpClient;
this.statementId = statementId;
Expand Down Expand Up @@ -98,6 +101,7 @@ protected AbstractRemoteChunkProvider(
throws SQLException {
this.chunkReadyTimeoutSeconds = session.getConnectionContext().getChunkReadyTimeoutSeconds();
this.maxParallelChunkDownloadsPerQuery = maxParallelChunkDownloadsPerQuery;
this.maxBytesInMemory = session.getConnectionContext().getCloudFetchMaxBytesInMemory();
this.session = session;
this.httpClient = httpClient;
this.statementId = parentStatement.getStatementId();
Expand Down Expand Up @@ -240,6 +244,7 @@ private void initializeData() throws DatabricksSQLException {
currentChunkIndex = -1L;
// We don't have any chunk in downloaded yet
totalChunksInMemory = 0L;
totalBytesInMemory = 0L;
// Number of worker threads are directly linked to allowed chunks in memory
allowedChunksInMemory = Math.min(maxParallelChunkDownloadsPerQuery, chunkCount);
// The first link is available
Expand Down Expand Up @@ -304,11 +309,50 @@ private void populateChunkIndexMap(TRowSet resultData, ConcurrentMap<Long, T> ch
}
}

/**
* Conservative per-chunk byte size charged against the in-memory budget when the result manifest
* does not report a chunk's size (SEA {@code getByteCount()} null or Thrift {@code getBytesNum()}
* unset, surfacing as {@link AbstractArrowResultChunk#getChunkSizeInBytes()} == 0). Without this
* fallback the budget accounting would never grow for size-less chunks, silently disabling the
* byte budget and degrading to the count limit alone — exactly on the large-result workloads this
* budget is meant to protect.
*/
static final long UNKNOWN_CHUNK_SIZE_ESTIMATE_BYTES = 16 * 1024 * 1024L;

/**
* Returns the byte cost charged to a chunk for in-memory budgeting: the manifest-reported size
* when known, otherwise {@link #UNKNOWN_CHUNK_SIZE_ESTIMATE_BYTES}. The same value must be used
* when reserving budget (scheduling) and releasing it (consumption) so accounting stays balanced.
*/
protected long effectiveChunkSizeInBytes(long declaredChunkSizeInBytes) {
return declaredChunkSizeInBytes > 0
? declaredChunkSizeInBytes
: UNKNOWN_CHUNK_SIZE_ESTIMATE_BYTES;
}

/** Release the memory for previous chunk since it is already consumed */
private void releaseChunk() throws DatabricksSQLException {
if (chunkIndexToChunksMap.get(currentChunkIndex).releaseChunk()) {
T chunk = chunkIndexToChunksMap.get(currentChunkIndex);
if (chunk.releaseChunk()) {
totalChunksInMemory--;
totalBytesInMemory -= effectiveChunkSizeInBytes(chunk.getChunkSizeInBytes());
downloadNextChunks();
}
}

/**
* Returns whether a chunk of the given (effective) size can be scheduled for download without
* breaching the count or in-memory byte budgets. Callers must pass the value from {@link
* #effectiveChunkSizeInBytes(long)} so that size-less chunks still consume budget. At least one
* chunk is always allowed so a single oversized chunk cannot stall consumption.
*/
protected boolean canScheduleChunkDownload(long chunkSizeInBytes) {
if (totalChunksInMemory >= allowedChunksInMemory) {
return false;
}
if (maxBytesInMemory <= 0 || totalChunksInMemory == 0) {
return true;
}
return totalBytesInMemory + chunkSizeInBytes <= maxBytesInMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ private ArrowResultChunk(Builder builder) throws DatabricksParsingException {
builder.numRows,
builder.rowOffset,
builder.chunkIndex,
builder.chunkSizeInBytes,
builder.statementId,
builder.status,
builder.chunkLink,
Expand Down Expand Up @@ -101,15 +102,16 @@ protected void downloadData(
chunkLink.getExternalLink(),
speedThreshold);

// Decompress (if needed) and parse
// Decompress and parse. The decompression is streamed straight into the Arrow reader so the
// full decompressed payload is never materialized on-heap alongside the compressed bytes.
long decompressStart = System.nanoTime();
try {
String ctx =
String.format(
"Data decompression for chunk index [%d] and statement [%s]",
this.chunkIndex, this.statementId);
InputStream data = DecompressionUtil.decompressToStream(compressed, compressionCodec, ctx);
initializeData(data);
initializeData(
DecompressionUtil.decompressToInputStream(compressed, compressionCodec, ctx));
} catch (Exception e) {
handleFailure(e, ChunkStatus.PROCESSING_FAILED);
}
Expand Down Expand Up @@ -193,6 +195,7 @@ public static class Builder {
private long chunkIndex;
private long numRows;
private long rowOffset;
private long chunkSizeInBytes;
private ExternalLink chunkLink;
private StatementId statementId;
private Instant expiryTime;
Expand All @@ -216,6 +219,8 @@ public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
this.chunkIndex = baseChunkInfo.getChunkIndex();
this.numRows = baseChunkInfo.getRowCount();
this.rowOffset = baseChunkInfo.getRowOffset();
this.chunkSizeInBytes =
baseChunkInfo.getByteCount() != null ? baseChunkInfo.getByteCount() : 0L;
this.status = status == null ? ChunkStatus.PENDING : status;
return this;
}
Expand Down Expand Up @@ -248,6 +253,7 @@ public Builder withThriftChunkInfo(long chunkIndex, TSparkArrowResultLink chunkI
this.chunkIndex = chunkIndex;
this.numRows = chunkInfo.getRowCount();
this.rowOffset = chunkInfo.getStartRowOffset();
this.chunkSizeInBytes = chunkInfo.getBytesNum();
this.expiryTime = Instant.ofEpochMilli(chunkInfo.getExpiryTime());
this.status =
status == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ protected ArrowResultChunk createChunk(
* <ul>
* <li>The provider is not closed
* <li>There are more chunks available to download
* <li>The number of chunks in memory is below the allowed limit
* <li>The next chunk fits within the count and in-memory byte budgets (see {@link
* #canScheduleChunkDownload(long)})
* </ul>
* <li>Tracks the total chunks in memory and the next chunk to download
* </ul>
Expand All @@ -115,13 +116,19 @@ public void downloadNextChunks() {
chunkDownloaderExecutorService = createChunksDownloaderExecutorService();
}

while (!isClosed
&& nextChunkToDownload < chunkCount
&& totalChunksInMemory < allowedChunksInMemory) {
while (!isClosed && nextChunkToDownload < chunkCount) {
ArrowResultChunk chunk = chunkIndexToChunksMap.get(nextChunkToDownload);
long chunkSizeInBytes = effectiveChunkSizeInBytes(chunk.getChunkSizeInBytes());
if (!canScheduleChunkDownload(chunkSizeInBytes)) {
// Budget is full; leave nextChunkToDownload unadvanced so this chunk is retried the next
// time downloadNextChunks() runs (invoked from releaseChunk() once a consumed chunk frees
// budget). The always-allow-one rule in canScheduleChunkDownload guarantees progress.
break;
}
chunkDownloaderExecutorService.submit(
new ChunkDownloadTask(chunk, httpClient, this, linkDownloadService));
totalChunksInMemory++;
totalBytesInMemory += chunkSizeInBytes;
nextChunkToDownload++;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private ArrowResultChunkV2(Builder builder) {
builder.numRows,
builder.rowOffset,
builder.chunkIndex,
builder.chunkSizeInBytes,
builder.statementId,
builder.status,
builder.chunkLink,
Expand Down Expand Up @@ -270,6 +271,7 @@ public static class Builder {
private long chunkIndex;
private long numRows;
private long rowOffset;
private long chunkSizeInBytes;
private ExternalLink chunkLink;
private StatementId statementId;
private Instant expiryTime;
Expand All @@ -286,6 +288,8 @@ public Builder withChunkInfo(BaseChunkInfo baseChunkInfo) {
this.chunkIndex = baseChunkInfo.getChunkIndex();
this.numRows = baseChunkInfo.getRowCount();
this.rowOffset = baseChunkInfo.getRowOffset();
this.chunkSizeInBytes =
baseChunkInfo.getByteCount() != null ? baseChunkInfo.getByteCount() : 0L;
this.status = ChunkStatus.PENDING;
return this;
}
Expand All @@ -294,6 +298,7 @@ public Builder withThriftChunkInfo(long chunkIndex, TSparkArrowResultLink chunkI
this.chunkIndex = chunkIndex;
this.numRows = chunkInfo.getRowCount();
this.rowOffset = chunkInfo.getStartRowOffset();
this.chunkSizeInBytes = chunkInfo.getBytesNum();
this.expiryTime = Instant.ofEpochMilli(chunkInfo.getExpiryTime());
this.status = ChunkStatus.URL_FETCHED; // URL has always been fetched in case of thrift
this.chunkLink = createExternalLink(chunkInfo, chunkIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,17 @@ protected ArrowResultChunkV2 createChunk(
*/
@Override
public void downloadNextChunks() throws DatabricksSQLException {
while (!isClosed
&& nextChunkToDownload < chunkCount
&& totalChunksInMemory < allowedChunksInMemory) {
while (!isClosed && nextChunkToDownload < chunkCount) {
ArrowResultChunkV2 chunk = chunkIndexToChunksMap.get(nextChunkToDownload);
long chunkSizeInBytes = effectiveChunkSizeInBytes(chunk.getChunkSizeInBytes());
if (!canScheduleChunkDownload(chunkSizeInBytes)) {
// Budget is full; leave nextChunkToDownload unadvanced so this chunk is retried the next
// time downloadNextChunks() runs (invoked from releaseChunk() once a consumed chunk frees
// budget). The always-allow-one rule in canScheduleChunkDownload guarantees progress.
break;
}
totalChunksInMemory++;
totalBytesInMemory += chunkSizeInBytes;
if (chunk.isChunkLinkInvalid()) {
try {
ExternalLink link =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ public interface IDatabricksConnectionContext {
/** Returns the minimum expected download speed threshold in MB/s for CloudFetch operations */
double getCloudFetchSpeedThreshold();

/**
* Returns the per-result-set budget, in bytes, for result chunks buffered in memory at once
* during CloudFetch downloads. The budget is compared against the compressed chunk sizes reported
* by the result manifest, so the default (derived from the JVM max heap when the configured value
* is non-positive) is intentionally conservative. Bounds peak memory in addition to the chunk
* download thread-pool limit.
*/
long getCloudFetchMaxBytesInMemory();

Boolean getDirectResultMode();

Boolean shouldRetryTemporarilyUnavailableError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public enum DatabricksJdbcUrlParams {
IDLE_HTTP_CONNECTION_EXPIRY("IdleHttpConnectionExpiry", "Idle HTTP connection expiry", "60"),
SUPPORT_MANY_PARAMETERS("supportManyParameters", "Support many parameters", "0"),
CLOUD_FETCH_THREAD_POOL_SIZE("cloudFetchThreadPoolSize", "Cloud fetch thread pool size", "16"),
CLOUD_FETCH_MAX_BYTES_IN_MEMORY(
"cloudFetchMaxBytesInMemory",
"Maximum bytes of result chunks buffered in memory at once (0 = derive from heap)",
"0"),
OAUTH_ENDPOINT("OAuth2ConnAuthAuthorizeEndpoint", "OAuth2 authorization endpoint"),
AUTH_ENDPOINT(
"OAuth2AuthorizationEndPoint", "OAuth2 authorization endpoint"), // Same as OAUTH_ENDPOINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ public static InputStream decompressToStream(
return new ByteArrayInputStream(uncompressed);
}

/**
* Returns a stream that decompresses {@code compressedInput} lazily as it is read, so the full
* decompressed payload is never materialized alongside the compressed bytes.
*/
public static InputStream decompressToInputStream(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — #1470's decompressLazy and this decompressToInputStream are the same idea: wrap the compressed bytes in LZ4FrameInputStream and let the Arrow reader pull decompressed bytes lazily instead of materializing the full payload. Since #1470 is still open (and stacked on #1468), I didn't want to introduce a merge dependency here. Proposal: whichever lands first, the other consolidates onto the single shared helper — they're behaviourally identical so the merge is trivial. Happy to be the one to converge.

(Side note from testing: LZ4FrameInputStream already reads concatenated LZ4 frames on demand, so this also covers the >2 GB-decompressed case without a custom multi-frame reader.)

byte[] compressedInput, CompressionCodec compressionCodec, String context)
throws DatabricksSQLException {
if (compressionCodec == null
|| compressedInput == null
|| compressionCodec == CompressionCodec.NONE) {
return new ByteArrayInputStream(compressedInput);
}
if (compressionCodec == CompressionCodec.LZ4_FRAME) {
try {
return new LZ4FrameInputStream(new ByteArrayInputStream(compressedInput));
} catch (IOException e) {
String errorMessage =
String.format("Unable to de-compress LZ4 Frame compressed result %s", context);
LOGGER.error(e, errorMessage);
throw new DatabricksParsingException(
errorMessage, e, DatabricksDriverErrorCode.DECOMPRESSION_ERROR);
}
}
String errorMessage =
String.format("Unknown compression type: %s. Context : %s", compressionCodec, context);
LOGGER.error(errorMessage);
throw new DatabricksSQLException(errorMessage, DatabricksDriverErrorCode.DECOMPRESSION_ERROR);
}

public static InputStream decompress(
InputStream compressedStream, CompressionCodec compressionCodec, String context)
throws IOException, DatabricksSQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,21 @@ public void testEnableCloudFetch() throws DatabricksSQLException {
assertTrue(connectionContext.shouldEnableArrow());
}

@Test
public void testCloudFetchMaxBytesInMemory() throws DatabricksSQLException {
// Explicit configured value is honored
IDatabricksConnectionContext ctx =
DatabricksConnectionContext.parse(
TestConstants.VALID_URL_1 + ";cloudFetchMaxBytesInMemory=12345", properties);
assertEquals(12345L, ctx.getCloudFetchMaxBytesInMemory());

// Default (0) derives a positive budget bounded by the JVM max heap
IDatabricksConnectionContext defaultCtx =
DatabricksConnectionContext.parse(TestConstants.VALID_URL_1, properties);
assertTrue(defaultCtx.getCloudFetchMaxBytesInMemory() > 0);
assertTrue(defaultCtx.getCloudFetchMaxBytesInMemory() <= Runtime.getRuntime().maxMemory());
}

@Test
public void testShouldEnableArrow_defaultIsTrue() throws DatabricksSQLException {
// On non-AIX, Arrow is always enabled regardless of EnableArrow setting
Expand Down
Loading
Loading