feat(parquet): add ParquetPushDecoder::peek_next_row_group() #10158
Open
zhuqi-lucas wants to merge 2 commits into
Returns the file-level row-group index that the next call to `try_next_reader` will yield a reader for, after applying any internal skipping (row selection emptiness, exhausted offset/limit budget). Returns `None` when no row groups remain, when the decoder sits inside a row group, or when every remaining row group would be skipped.

Closes apache#10148.

# Motivation

Adaptive callers that maintain per-row-group state in lock-step with the decoder (e.g. dynamic row-group pruners that re-evaluate row-group statistics mid-scan, per-RG `RowFilter` toggles that skip per-row evaluation when stats prove every row matches) currently have no way to know which row group the next reader will correspond to. `try_next_reader` can silently advance past row groups whose row selection is empty under the current `with_row_selection`, breaking the assumption that the queue of indices passed to `with_row_groups` maps 1:1 to the readers handed back.

This is the API DataFusion's `#22450` (TopK runtime row-group pruning) needs to enable a per-RG fully-matched `RowFilter` skip optimization that the old `split_runs` design provided.

# Implementation

`RowGroupFrontier::peek_next_row_group` clones the offset/limit budget and the row-selection, then runs the same `split_off` walk that `next_readable_row_group` performs internally, returning the first row-group index whose simulated selection is non-empty (or, with predicates, the first index whose selection is non-empty regardless of budget). The clone keeps the call read-only; the cost is a single extra `RowSelection::clone` per peek.

# Tests

- `test_peek_next_row_group_basic`: peek before / between / after readers on a 2-RG file.
- `test_peek_next_row_group_respects_with_row_groups`: explicit `with_row_groups([1])` reports `Some(1)`.
- `test_peek_next_row_group_skips_empty_selection`: a `RowSelection` that skips all of RG 0 + part of RG 1 makes peek report `Some(1)`, mirroring `next_readable_row_group`'s skip.
- `test_peek_next_row_group_finished`: an empty `with_row_groups` returns `None`.

All 1219 existing parquet lib tests still pass.
Pull request overview
Adds a new public “peek” API to the Parquet push decoder so adaptive callers can determine which file-level row-group index the next try_next_reader() will actually correspond to, after internal skipping (e.g. empty row-selection segments, budget skipping). This supports consumers that maintain per-row-group state in lock-step with the decoder (e.g. DataFusion runtime row-group pruning).
Changes:
- Add `ParquetPushDecoder::peek_next_row_group() -> Option<usize>` and plumb through decoder state.
- Implement a non-mutating simulation of `next_readable_row_group`'s skip logic in `RowGroupFrontier`.
- Add new unit tests covering basic behavior, honoring `with_row_groups`, skipping empty selections, and finished-state behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| parquet/src/arrow/push_decoder/remaining.rs | Adds read-only peek logic on the queued row-group frontier and exposes it via RemainingRowGroups. |
| parquet/src/arrow/push_decoder/mod.rs | Exposes the new public API on ParquetPushDecoder, wires through state, and adds tests for the new behavior. |
Comment on lines +123 to +125

    for &row_group_idx in &self.row_groups {
        let row_count = self.row_group_num_rows(row_group_idx).ok()?;
        let selected_rows = match selection.as_mut() {
Comment on lines +137 to +140

    if self.has_predicates {
        // Predicates → always read, regardless of budget.
        return Some(row_group_idx);
    }
Comment on lines +1832 to +1836

    /// When a row-selection segment leaves the next row group with zero
    /// selected rows, `peek_next_row_group` mirrors
    /// `next_readable_row_group`'s skip: it returns the *following*
    /// row group instead of the empty one.
    #[test]
- Return Result<Option<usize>, ParquetError> instead of swallowing
metadata errors via .ok()?. Row-count overflow on 32-bit targets
(and any other row_group_num_rows failure) now surfaces from peek
too, matching the error surface of try_next_reader.
- Reword the misleading 'always read, regardless of budget' comment.
Predicates disable budget-based RG skipping for that RG; the budget
still gates row emission inside the row group.
- Add two budget-skip tests so peek stays aligned with the budget
logic in next_readable_row_group:
* test_peek_next_row_group_skips_for_budget — with_offset(200) on a
200-row RG returns RG 1, not RG 0.
* test_peek_next_row_group_budget_drains_all — with_offset(500) on
a 400-row scan returns None.
- Document the new error case on the public API.
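The arithmetic behind the two budget-skip tests can be checked with a small standalone model. This is only a sketch: `first_row_group_after_offset` is a hypothetical helper, not part of arrow-rs, and it assumes the 400-row scan consists of two 200-row row groups with no row selection, no limit, and no predicates.

```rust
/// Hypothetical helper (not an arrow-rs API): given the row count of each
/// queued row group and the remaining offset, return the position of the
/// first row group that would still emit at least one row.
pub fn first_row_group_after_offset(row_counts: &[usize], mut offset: usize) -> Option<usize> {
    for (idx, &rows) in row_counts.iter().enumerate() {
        if rows == 0 {
            // An empty row group never yields a reader.
            continue;
        }
        if offset >= rows {
            // The offset swallows the whole row group: skip it and
            // charge its rows against the remaining offset.
            offset -= rows;
            continue;
        }
        // Some rows survive the offset, so this row group is read.
        return Some(idx);
    }
    // The offset drained every queued row group.
    None
}
```

Under these assumptions an offset of 200 consumes all of RG 0 and lands on RG 1, while an offset of 500 exceeds the 400 available rows and leaves nothing to read.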
Which issue does this PR close?
Closes #10148.
Rationale for this change
Adaptive callers that maintain per-row-group state in lock-step with the decoder — e.g. dynamic row-group pruners that re-evaluate row-group statistics mid-scan, or per-RG `RowFilter` toggles that skip per-row evaluation when stats prove every row matches — currently have no way to know which row group the next reader will correspond to. `try_next_reader` can silently advance past row groups whose row selection is empty under the current `with_row_selection`, breaking the assumption that the queue of indices passed to `with_row_groups` maps 1:1 to the readers handed back.
This is the API DataFusion's #22450 (TopK runtime row-group pruning) needs to enable a per-RG fully-matched `RowFilter` skip optimization that the old `split_runs` design previously provided.
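The lock-step problem described above can be illustrated with a toy model. Both functions below are hypothetical and do not call the parquet crate; they assume a row group yields a reader exactly when its selection keeps at least one row.

```rust
/// Hypothetical model: the queued row groups that actually yield a reader,
/// i.e. those whose selection keeps at least one row.
/// `selected_rows[rg]` is the number of selected rows in row group `rg`.
pub fn yielded_row_groups(queued: &[usize], selected_rows: &[usize]) -> Vec<usize> {
    queued
        .iter()
        .copied()
        .filter(|&rg| selected_rows.get(rg).copied().unwrap_or(0) > 0)
        .collect()
}

/// Naive caller: assumes the k-th reader belongs to the k-th queued index.
pub fn naive_labels(queued: &[usize], reader_count: usize) -> Vec<usize> {
    queued.iter().copied().take(reader_count).collect()
}
```

With queued indices `[0, 1, 2]` and a selection that is empty for RG 0, the model yields readers for RGs 1 and 2, but the naive pairing labels them 0 and 1. Asking the decoder which row group comes next removes the need for that assumption.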
What changes are included in this PR?
A new public method on `ParquetPushDecoder`:
```rust
pub fn peek_next_row_group(&self) -> Option<usize>
```
Returns the file-level row-group index that the next call to `try_next_reader` will yield a reader for, after applying any internal skipping (row selection emptiness, exhausted offset/limit budget). Returns `None` when no row groups remain, when the decoder sits inside a row group, or when every remaining row group would be skipped.
Implementation
`RowGroupFrontier::peek_next_row_group` clones the offset/limit budget and the row-selection, then runs the same `split_off` walk that `next_readable_row_group` performs internally — returning the first row-group index whose simulated selection is non-empty (or, with predicates, the first index whose selection is non-empty regardless of budget). The clone keeps the call read-only; the cost is a single extra `RowSelection::clone` per peek.
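A minimal sketch of the clone-and-simulate idea follows. `FrontierModel` and `split_off_selected` are hypothetical stand-ins, not the real `RowGroupFrontier` or `RowSelection`: the selection is modelled as `(selected, run_length)` runs, rows past the end of the runs are assumed unselected, and the method returns a `Result` (with a `String` error as a placeholder) in line with the review follow-up rather than swallowing metadata errors.

```rust
/// Hypothetical, simplified stand-in for the decoder's queued row-group state.
#[derive(Clone, Debug)]
pub struct FrontierModel {
    /// Queued file-level row-group indices, in read order.
    pub row_groups: Vec<usize>,
    /// Row count per row group, indexed by file-level index.
    pub num_rows: Vec<usize>,
    /// Optional selection as (selected, run_length) runs over the queued rows.
    pub selection: Option<Vec<(bool, usize)>>,
    /// Rows still to be skipped by the offset.
    pub offset: usize,
    /// Rows still allowed by the limit, if any.
    pub limit: Option<usize>,
    pub has_predicates: bool,
}

/// Consume the first `n` rows from `runs` and return how many were selected.
fn split_off_selected(runs: &mut Vec<(bool, usize)>, mut n: usize) -> usize {
    let mut selected = 0;
    while n > 0 && !runs.is_empty() {
        let (keep, len) = runs[0];
        let take = len.min(n);
        if keep {
            selected += take;
        }
        n -= take;
        if take == len {
            runs.remove(0);
        } else {
            runs[0].1 = len - take;
        }
    }
    selected
}

impl FrontierModel {
    /// Read-only peek: all mutation happens on local clones.
    pub fn peek_next_row_group(&self) -> Result<Option<usize>, String> {
        let mut selection = self.selection.clone();
        let mut offset = self.offset;
        for &idx in &self.row_groups {
            let row_count = *self
                .num_rows
                .get(idx)
                .ok_or_else(|| format!("no metadata for row group {idx}"))?;
            let selected = match selection.as_mut() {
                Some(runs) => split_off_selected(runs, row_count),
                None => row_count,
            };
            if selected == 0 {
                continue; // empty selection: this row group is skipped
            }
            if self.has_predicates {
                return Ok(Some(idx)); // model: no budget-based skipping here
            }
            if self.limit == Some(0) {
                return Ok(None); // limit exhausted: nothing more to read
            }
            if offset >= selected {
                offset -= selected; // offset swallows this row group
                continue;
            }
            return Ok(Some(idx));
        }
        Ok(None)
    }
}
```

Because the walk only touches the cloned selection and a local copy of the offset, the model's state is identical before and after a peek, which is the property the description relies on.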
Are these changes tested?
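Yes, four new lib tests:
- `test_peek_next_row_group_basic`: peek before / between / after readers on a 2-RG file.
- `test_peek_next_row_group_respects_with_row_groups`: explicit `with_row_groups([1])` reports `Some(1)`.
- `test_peek_next_row_group_skips_empty_selection`: a `RowSelection` that skips all of RG 0 + part of RG 1 makes peek report `Some(1)`, mirroring `next_readable_row_group`'s skip.
- `test_peek_next_row_group_finished`: an empty `with_row_groups` returns `None`.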
All 1219 existing parquet lib tests still pass.
Are there any user-facing changes?
One new public method on `ParquetPushDecoder`. No existing API is changed; nothing breaks.