Skip to content

Stream CloudFetch result chunks to bound memory and prevent OOM on large results#1509

Open
msrathore-db wants to merge 5 commits into
mainfrom
fix/issue-1508-fetch-size
Open

Stream CloudFetch result chunks to bound memory and prevent OOM on large results#1509
msrathore-db wants to merge 5 commits into
mainfrom
fix/issue-1508-fetch-size

Conversation

@msrathore-db

@msrathore-db msrathore-db commented Jun 24, 2026

Copy link
Copy Markdown
Collaborator

Summary

Fixes #1508.

Downloading large query results via CloudFetch could exhaust the JVM heap and throw java.lang.OutOfMemoryError: Java heap space (surfaced as Failed to ready chunk / Download failed for chunk index 0), especially on smaller heaps.

Root cause

The OutOfMemoryError is on-heap. Two factors combined to make on-heap usage scale with the CloudFetch download concurrency (cloudFetchThreadPoolSize, default 16) rather than with available heap:

  1. Each chunk was decompressed by materializing the entire decompressed Arrow payload into an on-heap byte[] (several times the compressed size) before parsing.
  2. Chunk downloads were scheduled with no regard to size, so up to cloudFetchThreadPoolSize of those transient decompressed copies existed at once.

(The parsed Arrow vectors themselves are off-heap, so they were not the cause — the transient on-heap decompression buffers were.)

Fix

  1. Streamed decompression. Decompression is now streamed directly into the Arrow reader (DecompressionUtil.decompressToInputStream), so the decompressed payload is never materialized on-heap alongside the compressed bytes. Chunks are still downloaded, decompressed, and parsed in parallel ahead of consumption — throughput is unaffected.
  2. Bounded concurrent downloads. Chunks carry their byte size from the result manifest, and scheduling is gated on an in-memory byte budget (default: a fraction of the JVM max heap; overridable via the new cloudFetchMaxBytesInMemory connection property) in addition to the existing thread-pool limit. At least one chunk is always allowed so an oversized chunk cannot stall consumption; the budget is released as chunks are consumed.

Applies to both the SQL Execution (SEA) and Thrift result paths, and to RemoteChunkProvider and StreamingChunkProvider.

Testing

Verified against a SQL warehouse on a 26-column, 169,769-row table (results split into 8 CloudFetch chunks):

  • Memory: the unpatched driver OOMs at -Xmx128m; the patched driver streams the full result at -Xmx40m (peak ~38 MB) — roughly a 3x reduction in required heap.
  • No throughput regression: at -Xmx2g, the patched driver reads the same result in ~14-16 s vs ~22-28 s unpatched, with lower peak heap — streaming decompression removes the per-chunk full-copy allocation and its GC churn, and the byte budget does not throttle when heap is ample.
  • Correctness: a checksum over all 4.4M cells, ordered by a column, is byte-identical between the unpatched and patched driver across both the Thrift and SEA paths and across heap sizes from 64 MB to 1 GB — no truncation or corruption.
  • Unit tests: added coverage for the byte-budget gating (count vs. byte limit, always-allow-one, no-limit), per-chunk byte-size plumbing, streaming decompression, and the config default/override. The full jdbc-core unit suite passes; the only failing tests locally are pre-existing integration/e2e tests unrelated to this change (confirmed failing identically on main).

This pull request and its description were written by Isaac.

NO_CHANGELOG=true

Large query results downloaded via CloudFetch could exhaust the JVM heap
because up to cloudFetchThreadPoolSize chunks were downloaded and held in
memory concurrently, regardless of their size. On small heaps this caused
an OutOfMemoryError while readying the first chunk.

Track each chunk's byte size from the result manifest and gate chunk
scheduling on a configurable in-memory byte budget (default: a fraction of
the JVM max heap) in addition to the existing thread-pool limit. At least
one chunk is always allowed so an oversized chunk cannot stall consumption.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
@msrathore-db msrathore-db changed the title Bound CloudFetch chunk downloads by an in-memory byte budget to prevent OOM Stream CloudFetch result chunks to bound memory and prevent OOM on large results Jun 25, 2026
@msrathore-db msrathore-db force-pushed the fix/issue-1508-fetch-size branch from 8e41b07 to 7c78368 Compare June 25, 2026 20:52
@msrathore-db msrathore-db reopened this Jun 25, 2026
…heap

Each chunk was decompressed by fully materializing the decompressed Arrow
payload into an on-heap byte[] (several times the compressed size) before
parsing. With multiple chunks downloading and decompressing in parallel,
these transient on-heap copies are what exhausted the Java heap on small
heaps.

Decompression is now streamed directly into the Arrow reader, so the
decompressed payload is never held on-heap alongside the compressed bytes
(the parsed vectors are off-heap). Chunks are still downloaded and parsed
in parallel ahead of consumption, so throughput is unaffected; combined
with the in-memory byte budget for concurrent downloads, peak heap for
large CloudFetch results drops substantially.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
@msrathore-db msrathore-db force-pushed the fix/issue-1508-fetch-size branch from 7c78368 to f8063a4 Compare June 25, 2026 21:30
…size

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>

# Conflicts:
#	NEXT_CHANGELOG.md
@msrathore-db msrathore-db force-pushed the fix/issue-1508-fetch-size branch from 8fca624 to 7c29f02 Compare June 25, 2026 23:02
&& totalChunksInMemory < allowedChunksInMemory) {
while (!isClosed && nextChunkToDownload < chunkCount) {
ArrowResultChunkV2 chunk = chunkIndexToChunksMap.get(nextChunkToDownload);
if (!canScheduleChunkDownload(chunk.getChunkSizeInBytes())) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it get attempted again?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. When canScheduleChunkDownload(...) returns false we break without advancing nextChunkToDownload, so the chunk is deferred, not dropped. downloadNextChunks() is re-invoked from releaseChunk() every time a consumed chunk frees budget, and it retries the same nextChunkToDownload. The always-allow-one rule (totalChunksInMemory == 0) guarantees forward progress even if a single chunk is larger than the whole budget. I've added a comment at the break documenting this.

* Returns a stream that decompresses {@code compressedInput} lazily as it is read, so the full
* decompressed payload is never materialized alongside the compressed bytes.
*/
public static InputStream decompressToInputStream(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — #1470's decompressLazy and this decompressToInputStream are the same idea: wrap the compressed bytes in LZ4FrameInputStream and let the Arrow reader pull decompressed bytes lazily instead of materializing the full payload. Since #1470 is still open (and stacked on #1468), I didn't want to introduce a merge dependency here. Proposal: whichever lands first, the other consolidates onto the single shared helper — they're behaviourally identical so the merge is trivial. Happy to be the one to converge.

(Side note from testing: LZ4FrameInputStream already reads concatenated LZ4 frames on demand, so this also covers the >2 GB-decompressed case without a custom multi-frame reader.)

@gopalldb

Copy link
Copy Markdown
Collaborator

⚠️ [MAJOR] Byte budget silently inert when chunk sizes are unknown (0), disabling OOM protection
Line 331 | bug | logical_claude_agent | [1/3 Claude flagged]

In-memory byte budget is silently disabled whenever chunk sizes are unknown (0). chunkSizeInBytes defaults to 0 when the SEA manifest reports a null getByteCount() (ArrowResultChunk.java:222-223) or when the Thrift getBytesNum() is unset (line 256). With 0-sized chunks, totalBytesInMemory never grows, so canScheduleChunkDownload (AbstractRemoteChunkProvider.java:331-334) only ever enforces the count limit and the byte budget this PR adds is inactive. On exactly the large-result workloads where byte sizes may be missing, the heap bound silently does not apply, reintroducing the OOM the PR targets.

@gopalldb

Copy link
Copy Markdown
Collaborator

Mostly LG, some minor comments

Addresses PR #1509 review: the in-memory byte budget was silently inert
when the result manifest did not report chunk sizes. When SEA's
getByteCount() is null or Thrift's getBytesNum() is unset,
getChunkSizeInBytes() returns 0, so totalBytesInMemory never grew and
canScheduleChunkDownload only enforced the parallel-count limit --
disabling the OOM protection on exactly the large-result workloads it
targets.

Charge size-less chunks a conservative per-chunk estimate
(UNKNOWN_CHUNK_SIZE_ESTIMATE_BYTES) via effectiveChunkSizeInBytes(),
applied consistently when reserving budget (both RemoteChunkProvider and
RemoteChunkProviderV2) and releasing it (AbstractRemoteChunkProvider).
Also document that a chunk deferred for lack of budget is retried from
releaseChunk() once a consumed chunk frees space.

Co-authored-by: Isaac
Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
…size

Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
@msrathore-db

Copy link
Copy Markdown
Collaborator Author

Fixed. You're right: with chunkSizeInBytes == 0 (SEA getByteCount() null / Thrift getBytesNum() unset) totalBytesInMemory never grew, so canScheduleChunkDownload enforced only the count limit and the byte budget was inert.

I added effectiveChunkSizeInBytes() in AbstractRemoteChunkProvider, which charges a size-less chunk a conservative fallback estimate (UNKNOWN_CHUNK_SIZE_ESTIMATE_BYTES), applied symmetrically on reserve (both RemoteChunkProvider and RemoteChunkProviderV2) and release, so the budget stays active. Added testUnknownChunkSizeStillConsumesBudget, which asserts a size-less chunk still consumes budget (previously inert).

One honest caveat worth flagging: the budget charges the manifest's (compressed) byteCount, which under-counts the real per-chunk on-heap footprint (compressed buffer + Arrow parse transients). On narrow results this fix + streaming decompression comfortably resolves #1508 (verified: streams the repro table at -Xmx48m, where unpatched 3.x and even 2.7.5/2.8.1 OOM). On very wide/many-chunk results the default budget can still be too loose and rely on cloudFetchMaxBytesInMemory being tuned down. I think tightening the default (or charging a decompressed-size estimate) is worth a follow-up rather than expanding this PR — open to your preference.

@gopalldb

gopalldb commented Jul 2, 2026

Copy link
Copy Markdown
Collaborator

📄 src/main/java/com/databricks/jdbc/common/util/DecompressionUtil.java

💡 [MINOR] Possibly unused decompressToStream after switch to decompressToInputStream
Line 60 | style | deadcode_claude_agent

Potentially dead method — This PR replaces the ArrowResultChunk call site from DecompressionUtil.decompressToStream(...) (removed line 111 in ArrowResultChunk.java) to the new DecompressionUtil.decompressToInputStream(...). If decompressToStream had no other callers, it is now unused.


📄 src/main/java/com/databricks/jdbc/api/impl/arrow/StreamingChunkProvider.java

⚠️ [MAJOR] Byte budget not applied to StreamingChunkProvider, leaving its OOM path open
Line 48 | bug | logical_claude_agent

PR's byte-budget bound is missing from StreamingChunkProvider. The PR description states the in-memory byte budget applies to both RemoteChunkProvider and StreamingChunkProvider, but StreamingChunkProvider (StreamingChunkProvider.java:48) implements ChunkProvider directly and does not extend AbstractRemoteChunkProvider, so it inherits none of the new gating (maxBytesInMemory, canScheduleChunkDownload, totalBytesInMemory). Its triggerDownloads() (lines 504-530) schedules downloads bounded only by the chunk count maxChunksInMemory (line 509). As a result, when this provider is selected (constructed in ArrowStreamResult.java:121/202), up to maxChunksInMemory large chunks can be downloaded and decompressed concurrently with no byte cap — the exact heap-exhaustion (OOM) scenario the PR set out to fix remains open for this path, and the PR description overstates coverage.


📄 src/main/java/com/databricks/jdbc/api/impl/arrow/incubator/RemoteChunkProviderV2.java

💡 [MINOR] In-memory budget charged before throwing link fetch in RemoteChunkProviderV2.downloadNextChunks leaks on failure
Line 123 | bug | logical_claude_agent

Byte/count budget charged before a throwing link fetch, leaked on error. In RemoteChunkProviderV2.downloadNextChunks(), totalChunksInMemory++ and the PR-added totalBytesInMemory += chunkSizeInBytes (lines 123-124) run before the linkDownloadService.getLinkForChunk(...).get() block (lines 125-142). That block re-throws DatabricksSQLException on ExecutionException/InterruptedException, exiting downloadNextChunks() before nextChunkToDownload++ (line 144). The reserved count and bytes are never reconciled on this path — a link-fetch failure leaves the budget over-charged for a chunk that was never scheduled. Unlike RemoteChunkProvider (lines 128-132), which increments only after a successful submit(), V2 reserves budget before the fallible operation. If scheduling continues after such a failure, the inflated totals can throttle or block further downloads.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG]In 3.x versions fetch size is ignored resulting in out-of-memory when downloading query results

2 participants