Parquet: split RecordBatches when binary offsets would overflow#9369
Parquet: split RecordBatches when binary offsets would overflow#9369vigneshsiva11 wants to merge 2 commits intoapache:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses binary/string offset overflow in the Parquet-to-Arrow reader by enabling the reader to emit smaller RecordBatches when variable-length data would cause offset overflow. The implementation modifies three code paths (Mask, Selectors, and All row selection cursors) to handle cases where read_records returns fewer rows than requested.
Changes:
- Modified the Mask cursor path to detect and handle partial reads when offset overflow occurs
- Updated the Selectors cursor path to break early on partial reads
- Enhanced the All cursor path to return None immediately when no records are read
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return Ok(Some(RecordBatch::from(struct_array))); | ||
| } | ||
|
|
||
| // Full read , safe to apply mask |
There was a problem hiding this comment.
There's a spacing error in this comment: "Full read , safe" should be "Full read, safe" (remove the extra space before the comma).
| // Full read , safe to apply mask | |
| // Full read, safe to apply mask |
| let mask = mask_cursor.mask_values_for(&mask_chunk)?; | ||
| let filtered_batch = | ||
| filter_record_batch(&RecordBatch::from(struct_array), &mask)?; | ||
|
|
There was a problem hiding this comment.
The removed validation that checked filtered_batch.num_rows() == mask_chunk.selected_rows was an important consistency check. While this check needs to be skipped for partial reads, it should still be performed for full reads to catch data inconsistencies. Consider restoring this check after line 1419, within the full read path, to maintain data integrity validation.
| // For full reads, ensure the mask and filtered output are consistent | |
| if filtered_batch.num_rows() != mask_chunk.selected_rows { | |
| return Err(general_err!( | |
| "row filter inconsistency: expected {} rows, got {}", | |
| mask_chunk.selected_rows, | |
| filtered_batch.num_rows() | |
| )); | |
| } |
| read_records += rec; | ||
|
|
||
| // stop early if we couldn't read everything requested | ||
| if rec < to_read { |
There was a problem hiding this comment.
When a partial read occurs (rec < to_read), this code breaks and returns a partial batch. However, if line 1456 executed, we've already pushed remaining rows back onto the selectors_cursor via return_selector(). This means those rows are queued for the next read, but we haven't actually advanced the cursor properly for the rows we DID read. The cursor state may become inconsistent because:
- If to_read was less than front.row_count, we pushed back
remainingrows - But we only read
recrows, which may be less thanto_read - The cursor doesn't know about the (to_read - rec) rows that weren't consumed
Consider handling the partial read case by calculating how many rows were actually consumed and adjusting the selector cursor accordingly, possibly by calling return_selector with the unconsumed count.
| if rec < to_read { | |
| if rec < to_read { | |
| let unconsumed = to_read - rec; | |
| if unconsumed > 0 { | |
| selectors_cursor.return_selector(RowSelector::select(unconsumed)); | |
| } |
| self.array_reader.read_records(batch_size)?; | ||
| let rec = self.array_reader.read_records(batch_size)?; | ||
| if rec == 0 { | ||
| return Ok(None); |
There was a problem hiding this comment.
The three cursor paths handle the case where read_records returns 0 inconsistently:
- Mask path: Returns an error "reached end of column while expecting X rows" (lines 1397-1402)
- Selectors path: Breaks the loop and proceeds to consume_batch (line 1462-1463)
- All path: Returns Ok(None) immediately without calling consume_batch (lines 1476-1478)
This inconsistency could lead to different behavior in edge cases. Consider whether all three paths should handle EOF consistently, either by returning None or by proceeding to consume_batch to check if there's any buffered data. The early return in the All path might skip consuming any previously buffered data, although this may be intentional optimization.
| return Ok(None); | |
| break; |
| // Key Change: partial read → emit immediately, no mask | ||
| if read < mask_chunk.chunk_rows { | ||
| return Ok(Some(RecordBatch::from(struct_array))); | ||
| } | ||
|
|
||
| // Full read , safe to apply mask |
There was a problem hiding this comment.
Useful explanatory comments were removed that described the high-level logic of the Mask cursor path. While the code changes are legitimate, removing documentation that explains "why" the code works a certain way reduces maintainability. Consider restoring or updating the comment that explains "The column reader exposes the projection as a struct array; convert this into a record batch before applying the boolean filter mask" near line 1405 where the conversion happens, as this helps future maintainers understand the data flow.
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read , safe to apply mask | |
| // The column reader exposes the projected columns as a single StructArray. | |
| // Convert this struct array into a RecordBatch before applying the boolean | |
| // filter mask so we can use the standard RecordBatch-level filter utilities. | |
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read, safe to apply mask |
| // Key Change: partial read → emit immediately, no mask | ||
| if read < mask_chunk.chunk_rows { | ||
| return Ok(Some(RecordBatch::from(struct_array))); | ||
| } | ||
|
|
||
| // Full read , safe to apply mask |
There was a problem hiding this comment.
When a partial read occurs (read < mask_chunk.chunk_rows), this code returns the unfiltered struct_array without applying the boolean mask. This means rows that should have been filtered out according to the row selection mask will be incorrectly included in the returned RecordBatch.
The mask_cursor expects that mask_chunk.chunk_rows were read, but only 'read' rows were actually decoded. The mask should still be applied, but it needs to be sized/sliced appropriately for the partial read, or the mask_cursor state needs to be adjusted to reflect only the rows that were actually read.
| // Key Change: partial read → emit immediately, no mask | |
| if read < mask_chunk.chunk_rows { | |
| return Ok(Some(RecordBatch::from(struct_array))); | |
| } | |
| // Full read , safe to apply mask | |
| // Always apply the selection mask; for partial reads, slice it down to `read` | |
| if read < mask_chunk.chunk_rows { | |
| let full_mask = mask_cursor.mask_values_for(&mask_chunk)?; | |
| let sliced_mask = full_mask.slice(0, read); | |
| let sliced_mask = sliced_mask.as_boolean(); | |
| let filtered_batch = | |
| filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?; | |
| if filtered_batch.num_rows() == 0 { | |
| continue; | |
| } | |
| return Ok(Some(filtered_batch)); | |
| } | |
| // Full read, apply mask directly |
Which issue does this PR close?
Rationale for this change
The Parquet Arrow reader currently attempts to decode up to batch_size rows into a single BinaryArray / StringArray. When the total variable-length data exceeds the maximum representable offset size, this causes offset overflow and a panic.
The reader should instead emit smaller batches when necessary, without requiring schema changes or reduced batch sizes from users.
What changes are included in this PR?
This PR updates the Parquet RecordBatch reader to stop decoding early when binary or string offsets would overflow, emit a partial RecordBatch, and continue reading remaining rows in subsequent batches. Existing behavior is unchanged for normal
cases.
Are these changes tested?
Yes. The behavior is covered by regression tests added earlier that reproduce the overflow scenario. All Parquet and Arrow reader tests pass.
Are there any user-facing changes?
No API changes. In rare cases with very large binary/string values, the reader may return smaller RecordBatches than the requested batch_size to avoid overflow.