Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,31 @@ impl ParquetPushDecoder {
self.state.row_groups_remaining()
}

/// Returns the file-level row-group index that the next call to
/// [`Self::try_next_reader`] will yield a reader for, after applying
/// any internal skipping (row selection emptiness, exhausted budget,
/// finished state). Returns `Ok(None)` when:
/// - the decoder has no more row groups to read,
/// - the decoder is currently inside a row group (consumers should
/// call [`Self::is_at_row_group_boundary`] first), or
/// - every remaining row group would be skipped.
///
/// Returns `Err` when reading row-group metadata fails (e.g.
/// `usize` overflow on 32-bit targets), matching the error surface
/// of `try_next_reader` so peek and read paths report errors
/// consistently.
///
/// This is a read-only peek: it does not mutate decoder state. It is
/// useful for adaptive callers that maintain per-row-group state in
/// lock-step with the decoder (e.g. dynamic row-group pruners or
/// per-RG `RowFilter` toggles): without this peek the caller has no
/// way to know which row group the next reader actually corresponds
/// to, because [`Self::try_next_reader`] may silently advance past
/// row groups whose row selection is empty.
pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
self.state.peek_next_row_group()
}

/// Decompose this decoder back into a [`ParquetPushDecoderBuilder`] for the
/// row groups that have *not* yet been decoded.
///
Expand Down Expand Up @@ -840,6 +865,19 @@ impl ParquetDecoderState {
}
}

fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.peek_next_row_group(),
// We only expose a meaningful answer at row-group boundaries.
// Mid-row-group there is no "next" — the active reader is
// tied to the current row group.
ParquetDecoderState::DecodingRowGroup { .. } => Ok(None),
ParquetDecoderState::Finished => Ok(None),
}
}

fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError> {
let remaining_row_groups = match self {
ParquetDecoderState::ReadingRowGroup {
Expand Down Expand Up @@ -1743,6 +1781,126 @@ mod test {
expect_finished(decoder.try_decode());
}

/// `peek_next_row_group` reports the index of the row group the
/// next `try_next_reader` call will hand back, matching the
/// frontier's internal skip logic.
#[test]
fn test_peek_next_row_group_basic() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.build()
.unwrap();

// Two row groups (0, 1). At boundary before any read, peek should
// see RG 0.
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(0));
assert!(decoder.is_at_row_group_boundary());

let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
// Once the reader for RG 0 has been handed off, the decoder is
// back at a boundary waiting for RG 1 — peek must reflect that
// (the active reader lives outside the decoder).
assert!(decoder.is_at_row_group_boundary());
assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));

// Drain RG 0's reader and consume RG 1.
for batch in reader {
let _ = batch.unwrap();
}
let ranges = expect_needs_data(decoder.try_next_reader());
push_ranges_to_decoder(&mut decoder, ranges);
let reader = expect_data(decoder.try_next_reader());
for batch in reader {
let _ = batch.unwrap();
}

// No row groups left.
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` honors `with_row_groups` — restricting the
/// scan to a single row group means peek reports only that one and
/// then `None`.
#[test]
fn test_peek_next_row_group_respects_with_row_groups() {
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![1])
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// When a row-selection segment leaves the next row group with zero
/// selected rows, `peek_next_row_group` mirrors
/// `next_readable_row_group`'s skip: it returns the *following*
/// row group instead of the empty one.
#[test]
fn test_peek_next_row_group_skips_empty_selection() {
// Each row group has 200 rows. Skip all 200 of RG 0 plus 50 of
// RG 1; the next reader will be for RG 1, not RG 0.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_selection(RowSelection::from(vec![
RowSelector::skip(250),
RowSelector::select(100),
]))
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// `peek_next_row_group` returns `None` on a finished decoder.
#[test]
fn test_peek_next_row_group_finished() {
let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_row_groups(vec![])
.build()
.unwrap();

// No row groups requested ⇒ already finished, no peek.
expect_finished(decoder.try_next_reader());
assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `peek_next_row_group` mirrors `next_readable_row_group`'s
/// budget-skipping logic: with an `OFFSET` that consumes the
/// entire first row group, peek must return the *following*
/// row group, not RG 0 (no predicates, so budget skips apply).
#[test]
fn test_peek_next_row_group_skips_for_budget() {
// Each row group has 200 rows. OFFSET 200 drains RG 0 entirely
// and lands the decoder's first emitted reader at RG 1.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(200)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), Some(1));
}

/// OFFSET larger than every remaining row group's row count
/// exhausts the budget, so peek should report `None` — matching
/// `next_readable_row_group`'s behavior of producing `Finished`.
#[test]
fn test_peek_next_row_group_budget_drains_all() {
// 2 row groups × 200 rows = 400 total. OFFSET 500 cannot land
// anywhere — every RG is skipped under the budget.
let decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
.unwrap()
.with_offset(500)
.build()
.unwrap();

assert_eq!(decoder.peek_next_row_group().unwrap(), None);
}

/// `into_builder` between row groups recovers a builder for the
/// not-yet-decoded row groups; rebuilding it with a new row filter
/// applies that filter to the subsequent row groups while leaving the
Expand Down
73 changes: 73 additions & 0 deletions parquet/src/arrow/push_decoder/remaining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,63 @@ impl RowGroupFrontier {
self.budget = budget;
}

/// Peek at the next row-group index `next_readable_row_group` would
/// hand out, without mutating any state. Returns `None` if every
/// remaining row group would be skipped under the current
/// selection/budget, or if the queue is empty.
///
/// Mirrors the structure of `next_readable_row_group` but only walks
/// borrowed state — used by [`super::ParquetPushDecoder::peek_next_row_group`]
/// to let adaptive callers (e.g. dynamic row-group pruners or per-RG
/// `RowFilter` toggles) keep their per-RG state in lock-step with
/// the reader the decoder is about to emit.
fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
// Short-circuit: budget exhausted or selection drained ⇒ same
// outcome as `next_readable_row_group`'s early return.
if self.budget.is_exhausted()
|| self
.selection
.as_ref()
.is_some_and(|selection| selection.row_count() == 0)
{
return Ok(None);
}

// We may have to walk past row groups whose split selection is
// empty. Cloning the selection lets us run the same `split_off`
// logic without disturbing the real one.
let mut selection = self.selection.clone();
let mut budget = self.budget;
for &row_group_idx in &self.row_groups {
let row_count = self.row_group_num_rows(row_group_idx)?;
let selected_rows = match selection.as_mut() {
Comment thread
zhuqi-lucas marked this conversation as resolved.
Some(remaining) => {
let rg_segment = remaining.split_off(row_count);
rg_segment.row_count()
}
None => row_count,
};
if selected_rows == 0 {
// Same skip path as `next_readable_row_group`: row
// selection drained for this RG, move on.
continue;
}
if self.has_predicates {
// Predicates disable budget-based RG skipping for this RG;
// budget still gates row emission inside the row group.
return Ok(Some(row_group_idx));
}
let rows_after_budget = budget.rows_after(selected_rows);
if rows_after_budget != 0 {
return Ok(Some(row_group_idx));
}
// Budget-skip: advance the simulated budget and keep
// walking; the next iteration sees the post-advance budget.
budget = budget.advance(selected_rows, rows_after_budget);
}
Ok(None)
}

fn clear_remaining(&mut self) {
self.selection = None;
self.row_groups.clear();
Expand Down Expand Up @@ -299,6 +356,22 @@ impl RemainingRowGroups {
self.frontier.row_groups.len()
}

/// Peek at the file-level row-group index that the next call to
/// [`Self::try_next_reader`] will produce a reader for, after
/// simulating the same skip logic [`Self::try_next_reader`] applies
/// internally (row-selection emptiness + offset/limit budget). Does
/// not mutate state.
///
/// Returns `None` when the active row group is still being decoded,
/// when no row groups remain, or when every remaining row group
/// would be skipped under the current selection/budget.
pub fn peek_next_row_group(&self) -> Result<Option<usize>, ParquetError> {
if self.row_group_reader_builder.has_active_row_group() {
return Ok(None);
}
self.frontier.peek_next_row_group()
}

/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed
Expand Down
Loading