
Parquet: split RecordBatches when binary offsets would overflow #9369

Open

vigneshsiva11 wants to merge 2 commits into apache:main from vigneshsiva11:parquet-split-batch-on-offset-overflow

Conversation

@vigneshsiva11 (Contributor) commented Feb 6, 2026

Which issue does this PR close?

Rationale for this change

The Parquet Arrow reader currently attempts to decode up to batch_size rows into a single BinaryArray / StringArray. These array types use 32-bit offsets, so when the total variable-length data in a batch exceeds the maximum representable offset (i32::MAX, about 2 GiB), the offsets overflow and the reader panics.

The reader should instead emit smaller batches when necessary, without requiring schema changes or reduced batch sizes from users.
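The invariant at stake can be sketched without any Arrow types. Below is a minimal std-only illustration of the overflow condition (the function name is hypothetical, not taken from this PR): the cumulative byte length of all values in one i32-offset array must stay within i32::MAX, so only a prefix of the requested rows may fit.

```rust
// Std-only sketch (not the PR's actual code) of the invariant involved:
// Arrow's BinaryArray / StringArray use i32 offsets, so the cumulative
// byte length of all values in one array must fit in i32::MAX (~2 GiB).
fn rows_before_offset_overflow(value_lens: &[u64]) -> usize {
    let mut total: u64 = 0;
    for (i, &len) in value_lens.iter().enumerate() {
        // Decoding this row would push the final offset past i32::MAX,
        // so only the rows before it fit in a single array.
        if total + len > i32::MAX as u64 {
            return i;
        }
        total += len;
    }
    value_lens.len()
}

fn main() {
    let mib: u64 = 1024 * 1024;
    // Three 900 MiB values: the third would push the total past 2 GiB.
    let lens = [900 * mib, 900 * mib, 900 * mib];
    assert_eq!(rows_before_offset_overflow(&lens), 2);
}
```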

What changes are included in this PR?

This PR updates the Parquet RecordBatch reader to stop decoding early when binary or string offsets would overflow, emit a partial RecordBatch, and continue reading the remaining rows in subsequent batches. Existing behavior is unchanged for normal cases.
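The stop-early/resume strategy described above can be simulated with plain std Rust. The names below are hypothetical (this is not the PR's implementation), and the sketch assumes no single value is itself larger than i32::MAX bytes:

```rust
// Std-only sketch (hypothetical names, not the PR's code) of the strategy:
// try to fill `batch_size` rows, but stop early once adding another row's
// bytes would overflow an i32 offset; unread rows are picked up by the
// next call. Assumes no single value exceeds i32::MAX bytes on its own.
struct SimulatedReader {
    value_lens: Vec<u64>, // byte length of each row's binary value
    pos: usize,           // next row to decode
}

impl SimulatedReader {
    /// Returns the number of rows in the next emitted batch, or None at EOF.
    fn next_batch_rows(&mut self, batch_size: usize) -> Option<usize> {
        if self.pos >= self.value_lens.len() {
            return None;
        }
        let mut total: u64 = 0;
        let mut rows = 0;
        while rows < batch_size && self.pos < self.value_lens.len() {
            let len = self.value_lens[self.pos];
            if total + len > i32::MAX as u64 {
                break; // partial batch: emit what we have, resume later
            }
            total += len;
            rows += 1;
            self.pos += 1;
        }
        Some(rows)
    }
}

fn main() {
    let gib: u64 = 1024 * 1024 * 1024;
    // Two rows of 1.5 GiB each cannot share one array of i32 offsets.
    let mut reader = SimulatedReader {
        value_lens: vec![3 * gib / 2, 3 * gib / 2],
        pos: 0,
    };
    assert_eq!(reader.next_batch_rows(8192), Some(1)); // partial batch
    assert_eq!(reader.next_batch_rows(8192), Some(1)); // remaining row
    assert_eq!(reader.next_batch_rows(8192), None);    // EOF
}
```

This mirrors the user-facing behavior noted below: callers may receive batches smaller than the requested batch_size, but no rows are lost.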

Are these changes tested?

Yes. The behavior is covered by regression tests added earlier that reproduce the overflow scenario. All Parquet and Arrow reader tests pass.

Are there any user-facing changes?

No API changes. In rare cases with very large binary/string values, the reader may return smaller RecordBatches than the requested batch_size to avoid overflow.

Copilot AI review requested due to automatic review settings February 6, 2026 10:16
@github-actions github-actions bot added the parquet Changes to the parquet crate label Feb 6, 2026
Copilot AI left a comment


Pull request overview

This PR addresses binary/string offset overflow in the Parquet-to-Arrow reader by enabling the reader to emit smaller RecordBatches when variable-length data would cause offset overflow. The implementation modifies three code paths (Mask, Selectors, and All row selection cursors) to handle cases where read_records returns fewer rows than requested.

Changes:

  • Modified the Mask cursor path to detect and handle partial reads when offset overflow occurs
  • Updated the Selectors cursor path to break early on partial reads
  • Enhanced the All cursor path to return None immediately when no records are read


return Ok(Some(RecordBatch::from(struct_array)));
}

// Full read , safe to apply mask

Copilot AI Feb 6, 2026


There's a spacing error in this comment: "Full read , safe" should be "Full read, safe" (remove the extra space before the comma).

Suggested change
// Full read , safe to apply mask
// Full read, safe to apply mask

let mask = mask_cursor.mask_values_for(&mask_chunk)?;
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), &mask)?;


Copilot AI Feb 6, 2026


The removed validation that checked filtered_batch.num_rows() == mask_chunk.selected_rows was an important consistency check. While this check needs to be skipped for partial reads, it should still be performed for full reads to catch data inconsistencies. Consider restoring this check after line 1419, within the full read path, to maintain data integrity validation.

Suggested change
// For full reads, ensure the mask and filtered output are consistent
if filtered_batch.num_rows() != mask_chunk.selected_rows {
return Err(general_err!(
"row filter inconsistency: expected {} rows, got {}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}

read_records += rec;

// stop early if we couldn't read everything requested
if rec < to_read {

Copilot AI Feb 6, 2026


When a partial read occurs (rec < to_read), this code breaks and returns a partial batch. However, if line 1456 executed, we've already pushed remaining rows back onto the selectors_cursor via return_selector(). This means those rows are queued for the next read, but we haven't actually advanced the cursor properly for the rows we DID read. The cursor state may become inconsistent because:

  1. If to_read was less than front.row_count, we pushed back remaining rows
  2. But we only read rec rows, which may be less than to_read
  3. The cursor doesn't know about the (to_read - rec) rows that weren't consumed

Consider handling the partial read case by calculating how many rows were actually consumed and adjusting the selector cursor accordingly, possibly by calling return_selector with the unconsumed count.

Suggested change
if rec < to_read {
if rec < to_read {
let unconsumed = to_read - rec;
if unconsumed > 0 {
selectors_cursor.return_selector(RowSelector::select(unconsumed));
}

self.array_reader.read_records(batch_size)?;
let rec = self.array_reader.read_records(batch_size)?;
if rec == 0 {
return Ok(None);

Copilot AI Feb 6, 2026


The three cursor paths handle the case where read_records returns 0 inconsistently:

  • Mask path: Returns an error "reached end of column while expecting X rows" (lines 1397-1402)
  • Selectors path: Breaks the loop and proceeds to consume_batch (line 1462-1463)
  • All path: Returns Ok(None) immediately without calling consume_batch (lines 1476-1478)

This inconsistency could lead to different behavior in edge cases. Consider whether all three paths should handle EOF consistently, either by returning None or by proceeding to consume_batch to check if there's any buffered data. The early return in the All path might skip consuming any previously buffered data, although this may be intentional optimization.

Suggested change
return Ok(None);
break;

Comment on lines +1411 to +1416
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}

// Full read , safe to apply mask

Copilot AI Feb 6, 2026


Useful explanatory comments were removed that described the high-level logic of the Mask cursor path. While the code changes are legitimate, removing documentation that explains "why" the code works a certain way reduces maintainability. Consider restoring or updating the comment that explains "The column reader exposes the projection as a struct array; convert this into a record batch before applying the boolean filter mask" near line 1405 where the conversion happens, as this helps future maintainers understand the data flow.

Suggested change
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read , safe to apply mask
// The column reader exposes the projected columns as a single StructArray.
// Convert this struct array into a RecordBatch before applying the boolean
// filter mask so we can use the standard RecordBatch-level filter utilities.
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read, safe to apply mask

Comment on lines +1411 to +1416
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}

// Full read , safe to apply mask

Copilot AI Feb 6, 2026


When a partial read occurs (read < mask_chunk.chunk_rows), this code returns the unfiltered struct_array without applying the boolean mask. This means rows that should have been filtered out according to the row selection mask will be incorrectly included in the returned RecordBatch.

The mask_cursor expects that mask_chunk.chunk_rows were read, but only 'read' rows were actually decoded. The mask should still be applied, but it needs to be sized/sliced appropriately for the partial read, or the mask_cursor state needs to be adjusted to reflect only the rows that were actually read.

Suggested change
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read , safe to apply mask
// Always apply the selection mask; for partial reads, slice it down to `read`
if read < mask_chunk.chunk_rows {
let full_mask = mask_cursor.mask_values_for(&mask_chunk)?;
let sliced_mask = full_mask.slice(0, read);
let sliced_mask = sliced_mask.as_boolean();
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?;
if filtered_batch.num_rows() == 0 {
continue;
}
return Ok(Some(filtered_batch));
}
// Full read, apply mask directly



Development

Successfully merging this pull request may close these issues.

Error when reading row group larger than 2GB (total string length per 8k row batch exceeds 2GB)
