diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 6dc5520bb975..7e9e3a302bc1 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -538,6 +538,31 @@ impl ParquetPushDecoder {
self.state.row_groups_remaining()
}
+    /// Returns the file-level row-group index that the next call to
+    /// [`Self::try_next_reader`] will yield a reader for, after applying
+    /// any internal skipping (row selection emptiness, exhausted budget,
+    /// finished state). Returns `Ok(None)` when:
+    /// - the decoder has no more row groups to read,
+    /// - the decoder is currently inside a row group (consumers should
+    ///   call [`Self::is_at_row_group_boundary`] first), or
+    /// - every remaining row group would be skipped.
+    ///
+    /// Returns `Err` when reading row-group metadata fails (e.g.
+    /// `usize` overflow on 32-bit targets), matching the error surface
+    /// of `try_next_reader` so peek and read paths report errors
+    /// consistently.
+    ///
+    /// This is a read-only peek: it does not mutate decoder state. It is
+    /// useful for adaptive callers that maintain per-row-group state in
+    /// lock-step with the decoder (e.g. dynamic row-group pruners or
+    /// per-RG `RowFilter` toggles): without this peek the caller has no
+    /// way to know which row group the next reader actually corresponds
+    /// to, because [`Self::try_next_reader`] may silently advance past
+    /// row groups whose row selection is empty.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        self.state.peek_next_row_group()
+    }
+
/// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
/// row groups that have *not* yet been decoded.
///
@@ -840,6 +865,19 @@ impl ParquetDecoderState {
}
}
+    /// Peeks the next row-group index; every state but `ReadingRowGroup` yields `Ok(None)`.
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.peek_next_row_group(),
+            // Mid-row-group there is no "next": the active reader is tied to
+            // the current row group, so only boundaries get a real answer.
+            ParquetDecoderState::DecodingRowGroup { .. } => Ok(None),
+            ParquetDecoderState::Finished => Ok(None),
+        }
+    }
+
fn into_builder(self) -> Result {
let remaining_row_groups = match self {
ParquetDecoderState::ReadingRowGroup {
@@ -1743,6 +1781,126 @@ mod test {
expect_finished(decoder.try_decode());
}
+ /// `peek_next_row_group` reports the index of the row group the
+ /// next `try_next_reader` call will hand back, and `None` once every
+ /// row group has been consumed (no selection, offset, or limit set).
+ #[test]
+ fn test_peek_next_row_group_basic() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .build()
+ .unwrap();
+
+ // Two row groups (0, 1). At the boundary before any read, peek
+ // should see RG 0 and the decoder should report a boundary.
+ assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
+ assert!(decoder.is_at_row_group_boundary());
+
+ let ranges = expect_needs_data(decoder.try_next_reader()); // first call asks for byte ranges
+ push_ranges_to_decoder(&mut decoder, ranges);
+ let reader = expect_data(decoder.try_next_reader()); // second call hands back RG 0's reader
+ // Once the reader for RG 0 has been handed off, the decoder is
+ // back at a boundary waiting for RG 1 — peek must reflect that
+ // (the active reader lives outside the decoder).
+ assert!(decoder.is_at_row_group_boundary());
+ assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+
+ // Drain RG 0's reader, then request, feed, and drain RG 1 the same way.
+ for batch in reader {
+ let _ = batch.unwrap();
+ }
+ let ranges = expect_needs_data(decoder.try_next_reader());
+ push_ranges_to_decoder(&mut decoder, ranges);
+ let reader = expect_data(decoder.try_next_reader());
+ for batch in reader {
+ let _ = batch.unwrap();
+ }
+
+ // Both row groups have been handed out, so nothing is left to peek.
+ assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+ }
+
+ /// `peek_next_row_group` honors `with_row_groups` — restricting the
+ /// scan to row group 1 means the first peek reports index 1 rather
+ /// than 0 (only the initial peek is asserted here).
+ #[test]
+ fn test_peek_next_row_group_respects_with_row_groups() {
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_row_groups(vec![1])
+ .build()
+ .unwrap();
+
+ assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+ }
+
+ /// When the row selection leaves the next row group with zero
+ /// selected rows, `peek_next_row_group` mirrors
+ /// `next_readable_row_group`'s skip: it returns the *following*
+ /// row group instead of the empty one.
+ #[test]
+ fn test_peek_next_row_group_skips_empty_selection() {
+ // Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
+ // RG 1; RG 0 has nothing selected, so the next reader is for RG 1.
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_row_selection(RowSelection::from(vec![
+ RowSelector::skip(250),
+ RowSelector::select(100),
+ ]))
+ .build()
+ .unwrap();
+
+ assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+ }
+
+ /// `peek_next_row_group` returns `Ok(None)` on a finished decoder.
+ #[test]
+ fn test_peek_next_row_group_finished() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_row_groups(vec![])
+ .build()
+ .unwrap();
+
+ // No row groups requested ⇒ the first read reports finished; peek then has nothing to return.
+ expect_finished(decoder.try_next_reader());
+ assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+ }
+
+ /// `peek_next_row_group` mirrors `next_readable_row_group`'s
+ /// budget-skipping logic: with an `OFFSET` that consumes the
+ /// entire first row group, peek must return the *following*
+ /// row group, not RG 0 (no predicates are set, so budget skips apply).
+ #[test]
+ fn test_peek_next_row_group_skips_for_budget() {
+ // Each row group has 200 rows. OFFSET 200 consumes RG 0 entirely,
+ // so the decoder's first emitted reader would be for RG 1.
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_offset(200)
+ .build()
+ .unwrap();
+
+ assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+ }
+
+ /// An OFFSET larger than the combined row count of all remaining
+ /// row groups skips every one of them, so peek should report
+ /// `None` — matching `next_readable_row_group` producing `Finished`.
+ #[test]
+ fn test_peek_next_row_group_budget_drains_all() {
+ // 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
+ // anywhere — every RG is skipped under the budget.
+ let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_offset(500)
+ .build()
+ .unwrap();
+
+ assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+ }
+
/// `into_builder` between row groups recovers a builder for the
/// not-yet-decoded row groups; rebuilding it with a new row filter
/// applies that filter to the subsequent row groups while leaving the
diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs
index d1070d2aa69f..c3138ef8bf4c 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -93,6 +93,63 @@ impl RowGroupFrontier {
self.budget = budget;
}
+    /// Peek at the next row-group index `next_readable_row_group` would
+    /// hand out, without mutating any state. Returns `Ok(None)` if every
+    /// remaining row group would be skipped under the current
+    /// selection/budget, or if the queue is empty; propagates any error
+    /// from `row_group_num_rows`.
+    ///
+    /// Mirrors the structure of `next_readable_row_group` but only walks
+    /// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`]
+    /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG
+    /// `RowFilter` toggles) keep per-RG state in lock-step with the decoder.
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        // Short-circuit: budget exhausted or selection drained ⇒ same
+        // outcome as `next_readable_row_group`'s early return.
+        if self.budget.is_exhausted()
+            || self
+                .selection
+                .as_ref()
+                .is_some_and(|selection| selection.row_count() == 0)
+        {
+            return Ok(None);
+        }
+
+        // We may have to walk past row groups whose split selection is
+        // empty. Cloning the selection lets us run the same `split_off`
+        // logic without disturbing the real one.
+        let mut selection = self.selection.clone();
+        let mut budget = self.budget;
+        for &row_group_idx in &self.row_groups {
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selected_rows = match selection.as_mut() {
+                Some(remaining) => {
+                    let rg_segment = remaining.split_off(row_count);
+                    rg_segment.row_count()
+                }
+                None => row_count,
+            };
+            if selected_rows == 0 {
+                // Same skip path as `next_readable_row_group`: row
+                // selection drained for this RG, move on.
+                continue;
+            }
+            if self.has_predicates {
+                // Predicates disable budget-based RG skipping for this RG;
+                // budget still gates row emission inside the row group.
+                return Ok(Some(row_group_idx));
+            }
+            let rows_after_budget = budget.rows_after(selected_rows);
+            if rows_after_budget != 0 {
+                return Ok(Some(row_group_idx));
+            }
+            // Budget-skip: advance the simulated (copied) budget and keep
+            // walking; the next iteration sees the post-advance budget.
+            budget = budget.advance(selected_rows, rows_after_budget);
+        }
+        Ok(None)
+    }
+
fn clear_remaining(&mut self) {
self.selection = None;
self.row_groups.clear();
@@ -299,6 +356,22 @@ impl RemainingRowGroups {
self.frontier.row_groups.len()
}
+    /// Peek at the file-level row-group index that the next call to
+    /// [`Self::try_next_reader`] will produce a reader for, after
+    /// simulating the same skip logic [`Self::try_next_reader`] applies
+    /// internally (row-selection emptiness + offset/limit budget). Does
+    /// not mutate state.
+    ///
+    /// Returns `Ok(None)` when the active row group is still being decoded,
+    /// when no row groups remain, or when every remaining row group
+    /// would be skipped under the current selection/budget.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        if self.row_group_reader_builder.has_active_row_group() {
+            return Ok(None);
+        }
+        self.frontier.peek_next_row_group()
+    }
+
/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed