
feat: add max_row_group_bytes option to WriterProperties#9357

Open
yonipeleg33 wants to merge 9 commits into apache:main from yonipeleg33:feature/max_row_group_bytes

Conversation


yonipeleg33 commented Feb 5, 2026

Which issue does this PR close?

This PR implements another suggestion introduced in #1213:

Add functionality to flush based on a bytes threshold instead of, or in addition to, the current row threshold

So it does not "Close" anything new.

Rationale for this change

A best effort to match Spark's (or more specifically, Hadoop's) parquet.block.size configuration behaviour, as documented in parquet-hadoop's README:

Property: parquet.block.size
Description: The block size in bytes...

Since arrow's parquet writer writes batches, it is inherently different from Hadoop's per-record writer behaviour - so the behaviour of max_row_group_bytes will differ from Hadoop's parquet.block.size, but this is the closest I could reasonably get (see details below).

What changes are included in this PR?

Configuration changes

  • New optional max_row_group_bytes configuration option in WriterProperties
  • Rename existing max_row_group_size private property to max_row_group_row_count
  • Backwards compatible: no public APIs changed (see the configuration sketch after this list):
    • set_max_row_group_size() and max_row_group_size() remain with their existing signatures.
    • Added set_max_row_group_row_count() and max_row_group_row_count(), which expose the Option<usize> type.
    • If set_max_row_group_row_count(None) is called, max_row_group_size() will return usize::MAX.
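
A minimal configuration sketch using the builder methods named above; the numeric limits are arbitrary placeholders, not recommended defaults:

use parquet::file::properties::WriterProperties;

fn main() {
    // Sketch only: the numbers below are illustrative, not recommendations.
    let _props = WriterProperties::builder()
        // New in this PR: cap on a row group's estimated encoded size, in bytes.
        .set_max_row_group_bytes(Some(128 * 1024 * 1024))
        // Row-count cap via the new Option-typed setter; None means unlimited.
        .set_max_row_group_row_count(Some(1024 * 1024))
        .build();
}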

Writer changes

ArrowWriter::write now supports any combination of these two properties (row count and row bytes):

  • Both are unset -> Write everything in one row group.
  • One is set -> Respect only that one (either bytes or row count).
  • Both are set -> Respect the lower of the two: open a new row group when either the row count or the byte size limit is reached.

The byte limit is checked once per batch (as opposed to Hadoop's per-record calculation):
Before writing each batch, compute the average row size in bytes based on previous writes, and flush or split the batch according to that average before hitting the limit.
This means the first batch will always be written as a whole (unless a row count limit is also set).
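
To make this concrete, here is an illustrative sketch of the per-batch estimate - not the PR's actual code, and rows_until_byte_limit is a hypothetical helper name:

// Illustrative sketch only. Estimates how many more rows fit in the current
// row group, using the average encoded row size observed so far.
fn rows_until_byte_limit(
    bytes_written: usize, // encoded bytes already in the current row group
    rows_written: usize,  // rows already in the current row group
    max_row_group_bytes: usize,
) -> Option<usize> {
    if rows_written == 0 {
        // No statistics yet: the first batch is written as a whole.
        return None;
    }
    let avg_row_bytes = (bytes_written / rows_written).max(1);
    let remaining = max_row_group_bytes.saturating_sub(bytes_written);
    Some(remaining / avg_row_bytes)
}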

Are these changes tested?

Yes - added unit tests to check all the different combinations of these two properties being set.
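
A rough sketch of the test shape, assuming the new builder method from this PR plus ArrowWriter's flushed_row_groups() accessor to observe the splits (both the helper structure and the counts here are illustrative, not the PR's exact tests):

use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn row_group_split_example() {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
    let array: ArrayRef = Arc::new(Int32Array::from_iter(0..100_i32));
    let batch = RecordBatch::try_new(schema.clone(), vec![array]).unwrap();

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(25)) // new API from this PR
        .build();

    let mut buf: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.flush().unwrap();
    // 100 rows with a 25-row limit should yield 4 row groups.
    assert_eq!(writer.flushed_row_groups().len(), 4);
    writer.close().unwrap();
}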

Are there any user-facing changes?

Yes:

  1. Introducing new APIs to configure byte limits on row groups, and a slight change to an existing one: max_row_group_size() now returns usize::MAX if it was unset by the user.
  2. Deprecating the old set_max_row_group_size API.

github-actions bot added the parquet (Changes to the parquet crate) label Feb 5, 2026
/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to parquet-mr's `parquet.block.size` behavior.
rluvaton (Member), Feb 5, 2026:

parquet-mr is just the official Java implementation for parquet; you can rewrite the comment to clarify that this matches the official parquet Java implementation.

Author:

Done

Contributor:

Also, parquet-mr is, I think, now officially called "parquet-java": https://github.com/apache/parquet-java

yonipeleg33 force-pushed the feature/max_row_group_bytes branch from 618f003 to 0e07315 on February 5, 2026 14:22
github-actions bot added and then removed the arrow (Changes to the arrow crate) label Feb 5, 2026
Comment on lines 602 to 613
/// Sets maximum number of rows in a row group, or `None` for unlimited.
///
/// # Panics
/// If the value is `Some(0)`.
Member:

Please add a comment on this, set_max_row_group_bytes, and set_max_row_group_size describing what the behavior will be when both are set.

Author:

Done

@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
Member:

Should we deprecate this function?

Author:

Probably. Done

Member:

Wait, not so fast: this is a breaking change, as clippy will fail for users. I was asking; it might be in a different PR, but I'm open for discussion. If you keep it, please update the PR description under changes to users.

yonipeleg33 (Author), Feb 5, 2026:

I do agree that this API should be deprecated. Thanks for pointing it out!

this is a breaking change

This is inherently not a breaking change; the purpose of marking APIs as deprecated is to warn users before making a breaking change, without actually making this change.
This PR already calls for a minor bump due to the new APIs introduced; deprecating the old one does not change the version semantics for this PR.

as clippy will fail for users

It's a rustc warning:

The deprecated attribute marks an item as deprecated. rustc will issue warnings on usage of #[deprecated] items

So unless users add -D warnings, compilation won't break.
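
For example (a standalone illustration, not code from this PR):

// Standalone illustration: calling a #[deprecated] item produces a rustc
// warning, not a hard error, unless warnings are denied (e.g. -D warnings).
#[deprecated(note = "use set_max_row_group_row_count instead")]
fn set_max_row_group_size(_value: usize) {}

fn main() {
    set_max_row_group_size(1024); // warning: use of deprecated function
}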

it might be in a different pr

I don't mind either way: Leaving it here or opening a new PR for deprecating the old API. LMK what's your preference and I'll do it.

if you keep it please update the pr description under changes to users

Done!

}

/// Helper to create a test batch with the given number of rows.
/// Each row is approximately 4 bytes (one i32).
rluvaton (Member), Feb 5, 2026:

Suggested change
/// Each row is approximately 4 bytes (one i32).
/// Each row is 4 bytes (one `i32`).

Author:

Done

ArrowDataType::Int32,
false,
)]));
let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
Member:

Suggested change
let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
let array = Int32Array::from_iter(0..num_rows as i32);

Author:

Nice! Done


#[test]
fn test_row_group_limit_rows_only() {
// When only max_row_group_size is set, respect the row limit
Member:

the comment is not on the correct line

Author:

Done


#[test]
fn test_row_group_limit_none_writes_single_row_group() {
// When both limits are None, all data should go into a single row group
Member:

the comment is not on the correct line

Author:

Done (moved to above the test function)

false,
)]));

// Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
Member:

the comment is not on the correct line (it should be on the Some(3500) one)

Author:

Done


#[test]
fn test_row_group_limit_both_row_wins() {
// When both limits are set, the row limit triggers first
Member:

the comment is not on the correct line

Author:

Done

pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size)
Member:

same as my other parquet-mr comment

Author:

Done

Comment on lines 622 to 624
if let Some(v) = value {
assert!(v > 0, "Cannot have a 0 max row group bytes");
}
Member:

Suggested change
if let Some(v) = value {
assert!(v > 0, "Cannot have a 0 max row group bytes");
}
assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");

Author:

Neat, done

}

#[test]
fn test_row_group_limit_both_row_wins() {
Member:

Can you add a similar test where bytes win, with the same structure as this one, i.e. writing a single large batch but only changing the config (same test with only a config change)?

Author:

No can do; the first batch is always written as a whole, because we need some statistics in order to calculate the average row size. This is also noted in the PR description:

This means that the first batch will always be written as a whole (unless row count limit is also set).

Member:

You don't need statistics; you can calculate it from the data types you need to encode.

Author:

AFAICT, that defeats the purpose of this configuration: its purpose is to control the IO profile of the writer (i.e. how much and when it writes to disk), and for that, the data needs to at least be already encoded before calculating the row group size.

This is also backed by the Java source code:
it calculates memSize using columnStore.getBufferedSize(), which is documented as follows:

@return approximate size of the buffered encoded binary data

}

#[test]
fn test_row_group_limit_both_bytes_wins() {
Member:

Can you add a similar test where rows win, with the same structure but only a config change?

Author:

Done - see test_row_group_limit_both_row_wins_multiple_batches vs. test_row_group_limit_both_row_wins_single_batch

#[test]
fn test_row_group_limit_both_bytes_wins() {
// When both limits are set, the byte limit triggers first
// Write in multiple small batches so byte-based splitting can work
rluvaton (Member), Feb 5, 2026:

According to the comment on the method, the way you write batches should not matter, only the config: if the byte limit is hit first, it should flush then; if the row limit is hit first, it should flush then.

Also, it should work regardless of how you feed the data.

Author:

Unfortunately, the way data is fed does affect the row group splits, because of the first batch issue (noted in the PR description):

This means that the first batch will always be written as a whole (unless row count limit is also set).

And even beyond the first batch, the behaviour is not fully predictable: the byte-based limit is enforced by calculating the average row size based on previous batches. This is more dynamic than the row-based limit.

I'm not sure what's actionable from this comment. If you think there's still a missing test case, please LMK.

/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to the official `parquet.block.size` behavior.
Member:

Suggested change
/// This is similar to the official `parquet.block.size` behavior.
/// This is similar to the official Java implementation `parquet.block.size` behavior.

This is not part of the spec, so there is nothing "official" about it.

Author:

Done

yonipeleg33 (Author) left a comment:

Thanks @rluvaton, PTAL


yonipeleg33 requested a review from rluvaton on February 5, 2026 17:08
etseidl (Contributor) left a comment:

Thanks @yonipeleg33. Flushing partial review for now, but I think this is looking pretty sound so far.

/// # Panics
/// If the value is `Some(0)`.
pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
Contributor:

Suggested change
assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");

/// # Panics
/// If the value is set to 0.
#[deprecated(
since = "57.3.0",
Contributor:

Suggested change
since = "57.3.0",
since = "58.0.0",

57.3.0 has already been released

pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java
/// implementation for `parquet.block.size`)
pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
Contributor:

This constant appears to be unused. I'd vote for less clutter and get rid of it

/// This is similar to the official Java implementation for `parquet.block.size`'s behavior.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Contributor:

Suggested change
/// the row group with the smallest limit will be applied.
/// the row group with the smaller limit will be produced.

/// Sets maximum number of rows in a row group, or `None` for unlimited.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Contributor:

Suggested change
/// the row group with the smallest limit will be applied.
/// the row group with the smaller limit will be produced.

@@ -314,8 +320,12 @@ impl<W: Write + Send> ArrowWriter<W> {
/// Encodes the provided [`RecordBatch`]
///
/// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
Contributor:

Suggested change
/// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
/// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]

?
