From 390ddc8b2fb32226ea2cf2775e1985d51f0b5e00 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 18 Jun 2026 20:27:58 +0800 Subject: [PATCH 1/2] feat(parquet): add ParquetPushDecoder::peek_next_row_group() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Returns the file-level row-group index that the next call to `try_next_reader` will yield a reader for, after applying any internal skipping (row selection emptiness, exhausted offset/limit budget). Returns `None` when no row groups remain, when the decoder sits inside a row group, or when every remaining row group would be skipped. Closes #10148. # Motivation Adaptive callers that maintain per-row-group state in lock-step with the decoder (e.g. dynamic row-group pruners that re-evaluate row-group statistics mid-scan, per-RG `RowFilter` toggles that skip per-row evaluation when stats prove every row matches) currently have no way to know which row group the next reader will correspond to. `try_next_reader` can silently advance past row groups whose row selection is empty under the current `with_row_selection`, breaking the assumption that the queue of indices passed to `with_row_groups` maps 1:1 to the readers handed back. This is the API DataFusion's `#22450` (TopK runtime row-group pruning) needs to enable a per-RG fully-matched `RowFilter` skip optimization that the old `split_runs` design provided. # Implementation `RowGroupFrontier::peek_next_row_group` clones the offset/limit budget and the row-selection, then runs the same `split_off` walk that `next_readable_row_group` performs internally — returning the first row-group index whose simulated selection is non-empty (or, with predicates, the first index whose selection is non-empty regardless of budget). The clone keeps the call read-only; the cost is a single extra `RowSelection::clone` per peek. # Tests `test_peek_next_row_group_basic`: peek before / between / after readers on a 2-RG file. 
`test_peek_next_row_group_respects_with_row_groups`: explicit `with_row_groups([1])` reports `Some(1)`. `test_peek_next_row_group_skips_empty_selection`: a `RowSelection` that skips all of RG 0 + part of RG 1 makes peek report `Some(1)`, mirroring `next_readable_row_group`'s skip. `test_peek_next_row_group_finished`: an empty `with_row_groups` returns `None`. All 1219 existing parquet lib tests still pass. --- parquet/src/arrow/push_decoder/mod.rs | 120 ++++++++++++++++++++ parquet/src/arrow/push_decoder/remaining.rs | 72 ++++++++++++ 2 files changed, 192 insertions(+) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 6dc5520bb975..5b21b88a8258 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -538,6 +538,26 @@ impl ParquetPushDecoder { self.state.row_groups_remaining() } + /// Returns the file-level row-group index that the next call to + /// [`Self::try_next_reader`] will yield a reader for, after applying + /// any internal skipping (row selection emptiness, exhausted budget, + /// finished state). Returns `None` when: + /// - the decoder has no more row groups to read, + /// - the decoder is currently inside a row group (consumers should + /// call [`Self::is_at_row_group_boundary`] first), or + /// - every remaining row group would be skipped. + /// + /// This is a read-only peek: it does not mutate decoder state. It is + /// useful for adaptive callers that maintain per-row-group state in + /// lock-step with the decoder (e.g. dynamic row-group pruners or + /// per-RG `RowFilter` toggles): without this peek the caller has no + /// way to know which row group the next reader actually corresponds + /// to, because [`Self::try_next_reader`] may silently advance past + /// row groups whose row selection is empty. 
+ pub fn peek_next_row_group(&self) -> Option { + self.state.peek_next_row_group() + } + /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the /// row groups that have *not* yet been decoded. /// @@ -840,6 +860,19 @@ impl ParquetDecoderState { } } + fn peek_next_row_group(&self) -> Option { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.peek_next_row_group(), + // We only expose a meaningful answer at row-group boundaries. + // Mid-row-group there is no "next" — the active reader is + // tied to the current row group. + ParquetDecoderState::DecodingRowGroup { .. } => None, + ParquetDecoderState::Finished => None, + } + } + fn into_builder(self) -> Result { let remaining_row_groups = match self { ParquetDecoderState::ReadingRowGroup { @@ -1743,6 +1776,93 @@ mod test { expect_finished(decoder.try_decode()); } + /// `peek_next_row_group` reports the index of the row group the + /// next `try_next_reader` call will hand back, matching the + /// frontier's internal skip logic. + #[test] + fn test_peek_next_row_group_basic() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .build() + .unwrap(); + + // Two row groups (0, 1). At boundary before any read, peek should + // see RG 0. + assert_eq!(decoder.peek_next_row_group(), Some(0)); + assert!(decoder.is_at_row_group_boundary()); + + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + let reader = expect_data(decoder.try_next_reader()); + // Once the reader for RG 0 has been handed off, the decoder is + // back at a boundary waiting for RG 1 — peek must reflect that + // (the active reader lives outside the decoder). + assert!(decoder.is_at_row_group_boundary()); + assert_eq!(decoder.peek_next_row_group(), Some(1)); + + // Drain RG 0's reader and consume RG 1. 
+ for batch in reader { + let _ = batch.unwrap(); + } + let ranges = expect_needs_data(decoder.try_next_reader()); + push_ranges_to_decoder(&mut decoder, ranges); + let reader = expect_data(decoder.try_next_reader()); + for batch in reader { + let _ = batch.unwrap(); + } + + // No row groups left. + assert_eq!(decoder.peek_next_row_group(), None); + } + + /// `peek_next_row_group` honors `with_row_groups` — restricting the + /// scan to a single row group means peek reports only that one and + /// then `None`. + #[test] + fn test_peek_next_row_group_respects_with_row_groups() { + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(vec![1]) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group(), Some(1)); + } + + /// When a row-selection segment leaves the next row group with zero + /// selected rows, `peek_next_row_group` mirrors + /// `next_readable_row_group`'s skip: it returns the *following* + /// row group instead of the empty one. + #[test] + fn test_peek_next_row_group_skips_empty_selection() { + // Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of + // RG 1; the next reader will be for RG 1, not RG 0. + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(250), + RowSelector::select(100), + ])) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group(), Some(1)); + } + + /// `peek_next_row_group` returns `None` on a finished decoder. + #[test] + fn test_peek_next_row_group_finished() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_row_groups(vec![]) + .build() + .unwrap(); + + // No row groups requested ⇒ already finished, no peek. 
+ expect_finished(decoder.try_next_reader()); + assert_eq!(decoder.peek_next_row_group(), None); + } + /// `into_builder` between row groups recovers a builder for the /// not-yet-decoded row groups; rebuilding it with a new row filter /// applies that filter to the subsequent row groups while leaving the diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index d1070d2aa69f..b805e5d5530b 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -93,6 +93,62 @@ impl RowGroupFrontier { self.budget = budget; } + /// Peek at the next row-group index `next_readable_row_group` would + /// hand out, without mutating any state. Returns `None` if every + /// remaining row group would be skipped under the current + /// selection/budget, or if the queue is empty. + /// + /// Mirrors the structure of `next_readable_row_group` but only walks + /// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`] + /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG + /// `RowFilter` toggles) keep their per-RG state in lock-step with + /// the reader the decoder is about to emit. + fn peek_next_row_group(&self) -> Option { + // Short-circuit: budget exhausted or selection drained ⇒ same + // outcome as `next_readable_row_group`'s early return. + if self.budget.is_exhausted() + || self + .selection + .as_ref() + .is_some_and(|selection| selection.row_count() == 0) + { + return None; + } + + // We may have to walk past row groups whose split selection is + // empty. Cloning the selection lets us run the same `split_off` + // logic without disturbing the real one. 
+ let mut selection = self.selection.clone(); + let mut budget = self.budget; + for &row_group_idx in &self.row_groups { + let row_count = self.row_group_num_rows(row_group_idx).ok()?; + let selected_rows = match selection.as_mut() { + Some(remaining) => { + let rg_segment = remaining.split_off(row_count); + rg_segment.row_count() + } + None => row_count, + }; + if selected_rows == 0 { + // Same skip path as `next_readable_row_group`: row + // selection drained for this RG, move on. + continue; + } + if self.has_predicates { + // Predicates → always read, regardless of budget. + return Some(row_group_idx); + } + let rows_after_budget = budget.rows_after(selected_rows); + if rows_after_budget != 0 { + return Some(row_group_idx); + } + // Budget-skip: advance the simulated budget and keep + // walking; the next iteration sees the post-advance budget. + budget = budget.advance(selected_rows, rows_after_budget); + } + None + } + fn clear_remaining(&mut self) { self.selection = None; self.row_groups.clear(); @@ -299,6 +355,22 @@ impl RemainingRowGroups { self.frontier.row_groups.len() } + /// Peek at the file-level row-group index that the next call to + /// [`Self::try_next_reader`] will produce a reader for, after + /// simulating the same skip logic [`Self::try_next_reader`] applies + /// internally (row-selection emptiness + offset/limit budget). Does + /// not mutate state. + /// + /// Returns `None` when the active row group is still being decoded, + /// when no row groups remain, or when every remaining row group + /// would be skipped under the current selection/budget. 
+ pub fn peek_next_row_group(&self) -> Option { + if self.row_group_reader_builder.has_active_row_group() { + return None; + } + self.frontier.peek_next_row_group() + } + /// returns [`ParquetRecordBatchReader`] suitable for reading the next /// group of rows from the Parquet data, or the list of data ranges still /// needed to proceed From 739d94e76850f67215cf52957ab1379942808f9d Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Sun, 21 Jun 2026 21:15:39 +0800 Subject: [PATCH 2/2] address Copilot review comments on #10158 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Return Result<Option<usize>, ParquetError> instead of swallowing metadata errors via .ok()?. Row-count overflow on 32-bit targets (and any other row_group_num_rows failure) now surfaces from peek too, matching the error surface of try_next_reader. - Reword the misleading 'always read, regardless of budget' comment. Predicates disable budget-based RG skipping for that RG; the budget still gates row emission inside the row group. - Add two budget-skip tests so peek stays aligned with the budget logic in next_readable_row_group: * test_peek_next_row_group_skips_for_budget — with_offset(200) on a 200-row RG returns RG 1, not RG 0. * test_peek_next_row_group_budget_drains_all — with_offset(500) on a 400-row scan returns None. Document the new error case on the public API. 
--- parquet/src/arrow/push_decoder/mod.rs | 60 +++++++++++++++++---- parquet/src/arrow/push_decoder/remaining.rs | 19 +++---- 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 5b21b88a8258..7e9e3a302bc1 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -541,12 +541,17 @@ impl ParquetPushDecoder { /// Returns the file-level row-group index that the next call to /// [`Self::try_next_reader`] will yield a reader for, after applying /// any internal skipping (row selection emptiness, exhausted budget, - /// finished state). Returns `None` when: + /// finished state). Returns `Ok(None)` when: /// - the decoder has no more row groups to read, /// - the decoder is currently inside a row group (consumers should /// call [`Self::is_at_row_group_boundary`] first), or /// - every remaining row group would be skipped. /// + /// Returns `Err` when reading row-group metadata fails (e.g. + /// `usize` overflow on 32-bit targets), matching the error surface + /// of `try_next_reader` so peek and read paths report errors + /// consistently. + /// /// This is a read-only peek: it does not mutate decoder state. It is /// useful for adaptive callers that maintain per-row-group state in /// lock-step with the decoder (e.g. dynamic row-group pruners or @@ -554,7 +559,7 @@ impl ParquetPushDecoder { /// way to know which row group the next reader actually corresponds /// to, because [`Self::try_next_reader`] may silently advance past /// row groups whose row selection is empty. 
- pub fn peek_next_row_group(&self) -> Option { + pub fn peek_next_row_group(&self) -> Result, ParquetError> { self.state.peek_next_row_group() } @@ -860,7 +865,7 @@ impl ParquetDecoderState { } } - fn peek_next_row_group(&self) -> Option { + fn peek_next_row_group(&self) -> Result, ParquetError> { match self { ParquetDecoderState::ReadingRowGroup { remaining_row_groups, @@ -868,8 +873,8 @@ impl ParquetDecoderState { // We only expose a meaningful answer at row-group boundaries. // Mid-row-group there is no "next" — the active reader is // tied to the current row group. - ParquetDecoderState::DecodingRowGroup { .. } => None, - ParquetDecoderState::Finished => None, + ParquetDecoderState::DecodingRowGroup { .. } => Ok(None), + ParquetDecoderState::Finished => Ok(None), } } @@ -1788,7 +1793,7 @@ mod test { // Two row groups (0, 1). At boundary before any read, peek should // see RG 0. - assert_eq!(decoder.peek_next_row_group(), Some(0)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0)); assert!(decoder.is_at_row_group_boundary()); let ranges = expect_needs_data(decoder.try_next_reader()); @@ -1798,7 +1803,7 @@ mod test { // back at a boundary waiting for RG 1 — peek must reflect that // (the active reader lives outside the decoder). assert!(decoder.is_at_row_group_boundary()); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); // Drain RG 0's reader and consume RG 1. for batch in reader { @@ -1812,7 +1817,7 @@ mod test { } // No row groups left. 
- assert_eq!(decoder.peek_next_row_group(), None); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); } /// `peek_next_row_group` honors `with_row_groups` — restricting the @@ -1826,7 +1831,7 @@ mod test { .build() .unwrap(); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); } /// When a row-selection segment leaves the next row group with zero @@ -1846,7 +1851,7 @@ mod test { .build() .unwrap(); - assert_eq!(decoder.peek_next_row_group(), Some(1)); + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); } /// `peek_next_row_group` returns `None` on a finished decoder. @@ -1860,7 +1865,40 @@ mod test { // No row groups requested ⇒ already finished, no peek. expect_finished(decoder.try_next_reader()); - assert_eq!(decoder.peek_next_row_group(), None); + assert_eq!(decoder.peek_next_row_group().unwrap(), None); + } + + /// `peek_next_row_group` mirrors `next_readable_row_group`'s + /// budget-skipping logic: with an `OFFSET` that consumes the + /// entire first row group, peek must return the *following* + /// row group, not RG 0 (no predicates, so budget skips apply). + #[test] + fn test_peek_next_row_group_skips_for_budget() { + // Each row group has 200 rows. OFFSET 200 drains RG 0 entirely + // and lands the decoder's first emitted reader at RG 1. + let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(200) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1)); + } + + /// OFFSET larger than every remaining row group's row count + /// exhausts the budget, so peek should report `None` — matching + /// `next_readable_row_group`'s behavior of producing `Finished`. + #[test] + fn test_peek_next_row_group_budget_drains_all() { + // 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land + // anywhere — every RG is skipped under the budget. 
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_offset(500) + .build() + .unwrap(); + + assert_eq!(decoder.peek_next_row_group().unwrap(), None); } /// `into_builder` between row groups recovers a builder for the diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index b805e5d5530b..c3138ef8bf4c 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -103,7 +103,7 @@ impl RowGroupFrontier { /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG /// `RowFilter` toggles) keep their per-RG state in lock-step with /// the reader the decoder is about to emit. - fn peek_next_row_group(&self) -> Option { + fn peek_next_row_group(&self) -> Result, ParquetError> { // Short-circuit: budget exhausted or selection drained ⇒ same // outcome as `next_readable_row_group`'s early return. if self.budget.is_exhausted() @@ -112,7 +112,7 @@ impl RowGroupFrontier { .as_ref() .is_some_and(|selection| selection.row_count() == 0) { - return None; + return Ok(None); } // We may have to walk past row groups whose split selection is @@ -121,7 +121,7 @@ impl RowGroupFrontier { let mut selection = self.selection.clone(); let mut budget = self.budget; for &row_group_idx in &self.row_groups { - let row_count = self.row_group_num_rows(row_group_idx).ok()?; + let row_count = self.row_group_num_rows(row_group_idx)?; let selected_rows = match selection.as_mut() { Some(remaining) => { let rg_segment = remaining.split_off(row_count); @@ -135,18 +135,19 @@ impl RowGroupFrontier { continue; } if self.has_predicates { - // Predicates → always read, regardless of budget. - return Some(row_group_idx); + // Predicates disable budget-based RG skipping for this RG; + // budget still gates row emission inside the row group. 
+ return Ok(Some(row_group_idx)); } let rows_after_budget = budget.rows_after(selected_rows); if rows_after_budget != 0 { - return Some(row_group_idx); + return Ok(Some(row_group_idx)); } // Budget-skip: advance the simulated budget and keep // walking; the next iteration sees the post-advance budget. budget = budget.advance(selected_rows, rows_after_budget); } - None + Ok(None) } fn clear_remaining(&mut self) { @@ -364,9 +365,9 @@ impl RemainingRowGroups { /// Returns `None` when the active row group is still being decoded, /// when no row groups remain, or when every remaining row group /// would be skipped under the current selection/budget. - pub fn peek_next_row_group(&self) -> Option { + pub fn peek_next_row_group(&self) -> Result, ParquetError> { if self.row_group_reader_builder.has_active_row_group() { - return None; + return Ok(None); } self.frontier.peek_next_row_group() }