diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs
index 6dc5520bb975..7e9e3a302bc1 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -538,6 +538,31 @@ impl ParquetPushDecoder {
         self.state.row_groups_remaining()
     }
 
+    /// Returns the file-level row-group index that the next call to
+    /// [`Self::try_next_reader`] will yield a reader for, after applying
+    /// any internal skipping (row selection emptiness, exhausted budget,
+    /// finished state). Returns `Ok(None)` when:
+    /// - the decoder has no more row groups to read,
+    /// - the decoder is currently inside a row group (consumers should
+    ///   call [`Self::is_at_row_group_boundary`] first), or
+    /// - every remaining row group would be skipped.
+    ///
+    /// Returns `Err` when reading row-group metadata fails (e.g.
+    /// `usize` overflow on 32-bit targets), matching the error surface
+    /// of `try_next_reader` so peek and read paths report errors
+    /// consistently.
+    ///
+    /// This is a read-only peek: it does not mutate decoder state. It is
+    /// useful for adaptive callers that maintain per-row-group state in
+    /// lock-step with the decoder (e.g. dynamic row-group pruners or
+    /// per-RG `RowFilter` toggles): without this peek the caller has no
+    /// way to know which row group the next reader actually corresponds
+    /// to, because [`Self::try_next_reader`] may silently advance past
+    /// row groups whose row selection is empty.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        self.state.peek_next_row_group()
+    }
+
     /// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
     /// row groups that have *not* yet been decoded.
     ///
@@ -840,6 +865,19 @@ impl ParquetDecoderState {
         }
     }
 
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.peek_next_row_group(),
+            // We only expose a meaningful answer at row-group boundaries.
+            // Mid-row-group there is no "next" — the active reader is
+            // tied to the current row group.
+            ParquetDecoderState::DecodingRowGroup { .. } => Ok(None),
+            ParquetDecoderState::Finished => Ok(None),
+        }
+    }
+
     fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
         let remaining_row_groups = match self {
             ParquetDecoderState::ReadingRowGroup {
@@ -1743,6 +1781,126 @@ mod test {
         expect_finished(decoder.try_decode());
     }
 
+    /// `peek_next_row_group` reports the index of the row group the
+    /// next `try_next_reader` call will hand back, matching the
+    /// frontier's internal skip logic.
+    #[test]
+    fn test_peek_next_row_group_basic() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .build()
+            .unwrap();
+
+        // Two row groups (0, 1). At boundary before any read, peek should
+        // see RG 0.
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
+        assert!(decoder.is_at_row_group_boundary());
+
+        let ranges = expect_needs_data(decoder.try_next_reader());
+        push_ranges_to_decoder(&mut decoder, ranges);
+        let reader = expect_data(decoder.try_next_reader());
+        // Once the reader for RG 0 has been handed off, the decoder is
+        // back at a boundary waiting for RG 1 — peek must reflect that
+        // (the active reader lives outside the decoder).
+        assert!(decoder.is_at_row_group_boundary());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+
+        // Drain RG 0's reader and consume RG 1.
+        for batch in reader {
+            let _ = batch.unwrap();
+        }
+        let ranges = expect_needs_data(decoder.try_next_reader());
+        push_ranges_to_decoder(&mut decoder, ranges);
+        let reader = expect_data(decoder.try_next_reader());
+        for batch in reader {
+            let _ = batch.unwrap();
+        }
+
+        // No row groups left.
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
+    /// `peek_next_row_group` honors `with_row_groups` — restricting the
+    /// scan to a single row group means peek reports only that one and
+    /// then `None`.
+    #[test]
+    fn test_peek_next_row_group_respects_with_row_groups() {
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_groups(vec![1])
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// When a row-selection segment leaves the next row group with zero
+    /// selected rows, `peek_next_row_group` mirrors
+    /// `next_readable_row_group`'s skip: it returns the *following*
+    /// row group instead of the empty one.
+    #[test]
+    fn test_peek_next_row_group_skips_empty_selection() {
+        // Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
+        // RG 1; the next reader will be for RG 1, not RG 0.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_selection(RowSelection::from(vec![
+                RowSelector::skip(250),
+                RowSelector::select(100),
+            ]))
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// `peek_next_row_group` returns `None` on a finished decoder.
+    #[test]
+    fn test_peek_next_row_group_finished() {
+        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_row_groups(vec![])
+            .build()
+            .unwrap();
+
+        // No row groups requested ⇒ already finished, no peek.
+        expect_finished(decoder.try_next_reader());
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
+    /// `peek_next_row_group` mirrors `next_readable_row_group`'s
+    /// budget-skipping logic: with an `OFFSET` that consumes the
+    /// entire first row group, peek must return the *following*
+    /// row group, not RG 0 (no predicates, so budget skips apply).
+    #[test]
+    fn test_peek_next_row_group_skips_for_budget() {
+        // Each row group has 200 rows. OFFSET 200 drains RG 0 entirely
+        // and lands the decoder's first emitted reader at RG 1.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_offset(200)
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
+    }
+
+    /// OFFSET larger than every remaining row group's row count
+    /// exhausts the budget, so peek should report `None` — matching
+    /// `next_readable_row_group`'s behavior of producing `Finished`.
+    #[test]
+    fn test_peek_next_row_group_budget_drains_all() {
+        // 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
+        // anywhere — every RG is skipped under the budget.
+        let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_offset(500)
+            .build()
+            .unwrap();
+
+        assert_eq!(decoder.peek_next_row_group().unwrap(), None);
+    }
+
     /// `into_builder` between row groups recovers a builder for the
     /// not-yet-decoded row groups; rebuilding it with a new row filter
     /// applies that filter to the subsequent row groups while leaving the
diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs
index d1070d2aa69f..c3138ef8bf4c 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -93,6 +93,63 @@ impl RowGroupFrontier {
         self.budget = budget;
     }
 
+    /// Peek at the next row-group index `next_readable_row_group` would
+    /// hand out, without mutating any state. Returns `None` if every
+    /// remaining row group would be skipped under the current
+    /// selection/budget, or if the queue is empty.
+    ///
+    /// Mirrors the structure of `next_readable_row_group` but only walks
+    /// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`]
+    /// to let adaptive callers (e.g. dynamic row-group pruners or per-RG
+    /// `RowFilter` toggles) keep their per-RG state in lock-step with
+    /// the reader the decoder is about to emit.
+    fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        // Short-circuit: budget exhausted or selection drained ⇒ same
+        // outcome as `next_readable_row_group`'s early return.
+        if self.budget.is_exhausted()
+            || self
+                .selection
+                .as_ref()
+                .is_some_and(|selection| selection.row_count() == 0)
+        {
+            return Ok(None);
+        }
+
+        // We may have to walk past row groups whose split selection is
+        // empty. Cloning the selection lets us run the same `split_off`
+        // logic without disturbing the real one.
+        let mut selection = self.selection.clone();
+        let mut budget = self.budget;
+        for &row_group_idx in &self.row_groups {
+            let row_count = self.row_group_num_rows(row_group_idx)?;
+            let selected_rows = match selection.as_mut() {
+                Some(remaining) => {
+                    let rg_segment = remaining.split_off(row_count);
+                    rg_segment.row_count()
+                }
+                None => row_count,
+            };
+            if selected_rows == 0 {
+                // Same skip path as `next_readable_row_group`: row
+                // selection drained for this RG, move on.
+                continue;
+            }
+            if self.has_predicates {
+                // Predicates disable budget-based RG skipping for this RG;
+                // budget still gates row emission inside the row group.
+                return Ok(Some(row_group_idx));
+            }
+            let rows_after_budget = budget.rows_after(selected_rows);
+            if rows_after_budget != 0 {
+                return Ok(Some(row_group_idx));
+            }
+            // Budget-skip: advance the simulated budget and keep
+            // walking; the next iteration sees the post-advance budget.
+            budget = budget.advance(selected_rows, rows_after_budget);
+        }
+        Ok(None)
+    }
+
     fn clear_remaining(&mut self) {
         self.selection = None;
         self.row_groups.clear();
@@ -299,6 +356,22 @@ impl RemainingRowGroups {
         self.frontier.row_groups.len()
     }
 
+    /// Peek at the file-level row-group index that the next call to
+    /// [`Self::try_next_reader`] will produce a reader for, after
+    /// simulating the same skip logic [`Self::try_next_reader`] applies
+    /// internally (row-selection emptiness + offset/limit budget). Does
+    /// not mutate state.
+    ///
+    /// Returns `None` when the active row group is still being decoded,
+    /// when no row groups remain, or when every remaining row group
+    /// would be skipped under the current selection/budget.
+    pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
+        if self.row_group_reader_builder.has_active_row_group() {
+            return Ok(None);
+        }
+        self.frontier.peek_next_row_group()
+    }
+
     /// returns [`ParquetRecordBatchReader`] suitable for reading the next
     /// group of rows from the Parquet data, or the list of data ranges still
     /// needed to proceed