feat: add max_row_group_bytes option to WriterProperties #9357
yonipeleg33 wants to merge 9 commits into apache:main
Conversation
parquet/src/file/properties.rs
Outdated
/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to parquet-mr's `parquet.block.size` behavior.
parquet-mr is just the official Java implementation of Parquet; you can rewrite the comment to clarify that this matches the official Parquet Java implementation.
Also, parquet-mr is, I think, now officially called "parquet-java": https://github.com/apache/parquet-java
Force-pushed from 618f003 to 0e07315 (Compare)
/// Sets maximum number of rows in a row group, or `None` for unlimited.
///
/// # Panics
/// If the value is `Some(0)`.
Please add a comment on this, on set_max_row_group_bytes, and on set_max_row_group_size describing what the behavior will be when both are set.
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
    /// If the value is set to 0.
    pub fn set_max_row_group_size(mut self, value: usize) -> Self {
Should we deprecate this function?
Wait, not so fast: this is a breaking change, as clippy will fail for users. I was asking; it might be in a different PR, but I'm open for discussion. If you keep it, please update the PR description under changes to users.
I do agree that this API should be deprecated. Thanks for pointing it out!
> this is a breaking change
This is inherently not a breaking change; the purpose of marking APIs as deprecated is to warn users before making a breaking change, without actually making this change.
This PR already calls for a minor bump due to the new APIs introduced; deprecating the old one does not change the version semantics for this PR.
> as clippy will fail for users
It's a rustc warning:
> The deprecated attribute marks an item as deprecated. rustc will issue warnings on usage of #[deprecated] items
So unless users add -D warnings, compilation won't break.
> it might be in a different pr
I don't mind either way: Leaving it here or opening a new PR for deprecating the old API. LMK what's your preference and I'll do it.
> if you keep it please update the pr description under changes to users
Done!
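To illustrate the point about `#[deprecated]` being a rustc warning rather than an error, here is a self-contained toy sketch; the `Builder` type, field, and note text are made up for illustration and are not the PR's actual code:

```rust
// Toy illustration only; not the parquet crate's real builder.
struct Builder {
    max_row_group_row_count: Option<usize>,
}

impl Builder {
    /// Old API, kept for compatibility but marked deprecated.
    #[deprecated(note = "use set_max_row_group_row_count instead")]
    fn set_max_row_group_size(self, value: usize) -> Self {
        self.set_max_row_group_row_count(Some(value))
    }

    /// New API taking an `Option` so `None` can mean "unlimited".
    fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
        self.max_row_group_row_count = value;
        self
    }
}

fn main() {
    // Calling the deprecated method still compiles and runs; rustc only emits a
    // `deprecated` warning, which turns into an error only under `-D warnings`.
    #[allow(deprecated)] // silences the warning this example is about
    let b = Builder { max_row_group_row_count: None }.set_max_row_group_size(1024 * 1024);
    assert_eq!(b.max_row_group_row_count, Some(1024 * 1024));
}
```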
}

/// Helper to create a test batch with the given number of rows.
/// Each row is approximately 4 bytes (one i32).
Suggested change:
- /// Each row is approximately 4 bytes (one i32).
+ /// Each row is 4 bytes (one `i32`).
    ArrowDataType::Int32,
    false,
)]));
let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
Suggested change:
- let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ let array = Int32Array::from_iter(0..num_rows as i32);
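For reference, a hedged sketch of what the full helper might look like with the suggested change applied; the helper name `make_test_batch` and the field name `"v"` are assumptions, not the PR's actual identifiers, and the `arrow_array`/`arrow_schema` crate paths are assumed:

```rust
use std::sync::Arc;

use arrow_array::{Int32Array, RecordBatch};
use arrow_schema::{DataType as ArrowDataType, Field, Schema};

/// Builds a single-column batch of non-nullable `Int32` values, so each row
/// carries 4 bytes of value data.
fn make_test_batch(num_rows: usize) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "v",
        ArrowDataType::Int32,
        false,
    )]));
    let array = Int32Array::from_iter(0..num_rows as i32);
    RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
}
```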
#[test]
fn test_row_group_limit_rows_only() {
    // When only max_row_group_size is set, respect the row limit
the comment is not on the correct line
#[test]
fn test_row_group_limit_none_writes_single_row_group() {
    // When both limits are None, all data should go into a single row group
the comment is not on the correct line
Done (moved to above the test function)
    false,
)]));

// Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
the comment is not on the correct line (it should be on the Some(3500) one)
#[test]
fn test_row_group_limit_both_row_wins() {
    // When both limits are set, the row limit triggers first
the comment is not on the correct line
parquet/src/file/properties.rs
Outdated
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size)
same as my other parquet-mr comment
parquet/src/file/properties.rs
Outdated
if let Some(v) = value {
    assert!(v > 0, "Cannot have a 0 max row group bytes");
}
Suggested change:
- if let Some(v) = value {
-     assert!(v > 0, "Cannot have a 0 max row group bytes");
- }
+ assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
}

#[test]
fn test_row_group_limit_both_row_wins() {
Can you add a similar test where the byte limit wins, with the same structure as this one, i.e. writing a single large batch but only changing the config (the same test with only a config change)?
No can do; the first batch is always written as a whole, because we need some statistics in order to calculate the average row size. This is also noted in the PR description:
> This means that the first batch will always be written as a whole (unless row count limit is also set).
You don't need statistics; you can calculate it from the data types you need to encode.
AFAICT, that defeats the purpose of this configuration: its purpose is to control the IO profile of the writer (i.e. how much and when it writes to disk), and for that, the data needs to at least be already encoded before calculating the row group size.
This is also backed by the Java source code: it calculates `memSize` using `columnStore.getBufferedSize()`, which is documented as follows:
> @return approximate size of the buffered encoded binary data
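To make the trade-off above concrete, here is a minimal sketch of the splitting idea (not the PR's actual implementation; the function and parameter names are made up): derive an average encoded row size from what has already been written, then bound how many rows of the next batch may join the in-progress row group.

```rust
/// Estimate how many more rows fit in the current row group before the byte
/// limit is reached. Returns `None` when there is no history yet, in which
/// case the whole batch is written (the "first batch" caveat discussed above).
fn rows_until_byte_limit(
    encoded_bytes_so_far: u64, // encoded bytes flushed in earlier row groups
    rows_so_far: u64,          // rows those bytes correspond to
    in_progress_bytes: u64,    // estimated bytes buffered in the current row group
    max_row_group_bytes: u64,
) -> Option<u64> {
    if rows_so_far == 0 {
        return None; // no statistics yet: cannot estimate an average row size
    }
    let avg_row_bytes = (encoded_bytes_so_far / rows_so_far).max(1);
    let remaining_bytes = max_row_group_bytes.saturating_sub(in_progress_bytes);
    Some(remaining_bytes / avg_row_bytes)
}

fn main() {
    // 1_000 rows previously encoded to ~100_000 bytes (~100 bytes/row);
    // with a 3_500-byte limit and an empty row group, ~35 rows still fit.
    assert_eq!(rows_until_byte_limit(100_000, 1_000, 0, 3_500), Some(35));
}
```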
}

#[test]
fn test_row_group_limit_both_bytes_wins() {
Can you add a similar test where the row limit wins, with the same structure but only a config change?
Done - see `test_row_group_limit_both_row_wins_multiple_batches` vs. `test_row_group_limit_both_row_wins_single_batch`.
#[test]
fn test_row_group_limit_both_bytes_wins() {
    // When both limits are set, the byte limit triggers first
    // Write in multiple small batches so byte-based splitting can work
According to the comment on the method, the way you write batches should not affect the result, only the config should: if the byte-based limit is hit first, it should flush then; if the row limit is hit first, it should flush then. And it should work regardless of how you feed the data.
Unfortunately, the way data is fed does affect the row group splits, because of the first-batch issue (noted in the PR description):
> This means that the first batch will always be written as a whole (unless row count limit is also set).
And even beyond the first batch, the behaviour is not fully predictable: the byte-based limit is enforced by calculating the average row size based on previous batches, which is more dynamic than the row-based limit.
I'm not sure what's actionable from this comment. If you think there's still a missing test case, please LMK.
parquet/src/file/properties.rs
Outdated
/// Sets maximum size of a row group in bytes, or `None` for unlimited.
///
/// Row groups are flushed when their estimated encoded size exceeds this threshold.
/// This is similar to the official `parquet.block.size` behavior.
Suggested change:
- /// This is similar to the official `parquet.block.size` behavior.
+ /// This is similar to the official Java implementation `parquet.block.size` behavior.
This is not part of the spec, so there is nothing "official" about it.
etseidl left a comment
Thanks @yonipeleg33. Flushing partial review for now, but I think this is looking pretty sound so far.
/// # Panics
/// If the value is `Some(0)`.
pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
    assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
Suggested change:
- assert_ne!(value, Some(0), "Cannot have a 0 max row group bytes");
+ assert_ne!(value, Some(0), "Cannot have a 0 max row group row count");
/// # Panics
/// If the value is set to 0.
#[deprecated(
    since = "57.3.0",
Suggested change:
- since = "57.3.0",
+ since = "58.0.0",
57.3.0 has already been released
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as the official Java
/// implementation for `parquet.block.size`)
pub const DEFAULT_MAX_ROW_GROUP_BYTES: usize = 128 * 1024 * 1024;
This constant appears to be unused. I'd vote for less clutter and getting rid of it.
/// This is similar to the official Java implementation for `parquet.block.size`'s behavior.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Suggested change:
- /// the row group with the smallest limit will be applied.
+ /// the row group with the smaller limit will be produced.
/// Sets maximum number of rows in a row group, or `None` for unlimited.
///
/// If both `max_row_group_row_count` and `max_row_group_bytes` are set,
/// the row group with the smallest limit will be applied.
Suggested change:
- /// the row group with the smallest limit will be applied.
+ /// the row group with the smaller limit will be produced.
@@ -314,8 +320,12 @@ impl<W: Write + Send> ArrowWriter<W> {
    /// Encodes the provided [`RecordBatch`]
    ///
    /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
Suggested change:
- /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_size`]
+ /// If this would cause the current row group to exceed [`WriterProperties::max_row_group_row_count`]

?
Which issue does this PR close?
This PR implements another suggestion introduced in #1213:
So not "Closes" anything new.
Rationale for this change
A best effort to match Spark's (or more specifically, Hadoop's) `parquet.block.size` configuration behaviour, as documented in parquet-hadoop's README. Since arrow's parquet writer writes batches, it's inherently different than Hadoop's per-record writer behaviour, so the behaviour of `max_row_group_bytes` will be different than Hadoop's `parquet.block.size`, but this is the closest I could reasonably get (see details below).

What changes are included in this PR?

Configuration changes

- Added a new `max_row_group_bytes` configuration option in `WriterProperties`.
- Renamed the `max_row_group_size` private property to `max_row_group_row_count`:
  - `set_max_row_group_size()` and `max_row_group_size()` still remain with their existing signatures.
  - Added `set_max_row_group_row_count()` and `max_row_group_row_count()`, which expose the `Option<usize>` type.
  - If `set_max_row_group_row_count(None)` is called, `max_row_group_size()` will return `usize::MAX`.

Writer changes

- `ArrowWriter::write` now supports any combination of these two properties (row count and row bytes), as shown in the sketch after this list.
- Byte limit is calculated once per batch (as opposed to Hadoop's per-record calculation):
  - Before writing each batch, compute the average row size in bytes based on previous writes, and flush or split the batch according to that average before hitting the limit.
  - This means that the first batch will always be written as a whole (unless row count limit is also set).
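For illustration, a hedged usage sketch of the two options together, assuming the setter signatures shown in the review diffs (`Option<usize>` arguments); the exact API is still subject to review:

```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Cap row groups at 64 MiB of estimated encoded data and at most 100k rows;
    // whichever limit is hit first closes the row group.
    let props = WriterProperties::builder()
        .set_max_row_group_bytes(Some(64 * 1024 * 1024))
        .set_max_row_group_row_count(Some(100_000))
        .build();

    // `props` would then be passed to e.g. `ArrowWriter::try_new(writer, schema, Some(props))`.
    let _ = props;
}
```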
Are these changes tested?
Yes - added unit tests to check all different combinations of these two properties being set.
Are there any user-facing changes?
Yes:
- New `set_max_row_group_bytes()` / `set_max_row_group_row_count()` APIs (returning `usize::MAX` from `max_row_group_size()` if it was unset by the user).
- Deprecated the `set_max_row_group_size` API.