-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Allow reading and writing more than 32k Parquet row groups #10149
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -226,8 +226,8 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
|
|
||
| /// Creates new row group from this file writer. | ||
| /// | ||
| /// Note: Parquet files are limited to at most 2^15 row groups in a file; and row groups must | ||
| /// be written sequentially. | ||
| /// Note: Parquet files are limited to at most 2^31 row groups in a file. If encryption is | ||
| /// enabled, this is reduced to 2^15, and row groups must be written sequentially. | ||
| /// | ||
| /// Every time the next row group is requested, the previous row group must | ||
| /// be finalised and closed using the [`SerializedRowGroupWriter::close`] | ||
|
|
@@ -236,14 +236,25 @@ impl<W: Write + Send> SerializedFileWriter<W> { | |
| self.assert_previous_writer_closed()?; | ||
| let ordinal = self.row_group_index; | ||
|
|
||
| let ordinal: i16 = ordinal.try_into().map_err(|_| { | ||
| // Thrift cannot encode lists with more than i32::MAX elements | ||
| let ordinal: i32 = ordinal.try_into().map_err(|_| { | ||
| ParquetError::General(format!( | ||
| "Parquet does not support more than {} row groups per file (currently: {})", | ||
| i16::MAX, | ||
| i32::MAX, | ||
| ordinal | ||
| )) | ||
| })?; | ||
|
|
||
| // If encryption is enabled, the max is 32767 | ||
| #[cfg(feature = "encryption")] | ||
| if self.file_encryptor.is_some() && ordinal > i16::MAX as i32 { | ||
| return Err(ParquetError::General(format!( | ||
| "Parquet with encryption does not support more than {} row groups per file (currently: {})", | ||
| i16::MAX, | ||
| ordinal | ||
| ))); | ||
| } | ||
|
|
||
| self.row_group_index = self | ||
| .row_group_index | ||
| .checked_add(1) | ||
|
|
@@ -470,7 +481,7 @@ fn write_bloom_filters<W: Write + Send>( | |
| // iter each column | ||
| // write bloom filter to the file | ||
|
|
||
| let row_group_idx: u16 = row_group | ||
| let row_group_idx: u32 = row_group | ||
| .ordinal() | ||
| .expect("Missing row group ordinal") | ||
| .try_into() | ||
|
|
@@ -523,7 +534,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { | |
| bloom_filters: Vec<Option<Sbbf>>, | ||
| column_indexes: Vec<Option<ColumnIndexMetaData>>, | ||
| offset_indexes: Vec<Option<OffsetIndexMetaData>>, | ||
| row_group_index: i16, | ||
| row_group_index: i32, | ||
| file_offset: i64, | ||
| on_close: Option<OnCloseRowGroup<'a, W>>, | ||
| #[cfg(feature = "encryption")] | ||
|
|
@@ -543,7 +554,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { | |
| schema_descr: SchemaDescPtr, | ||
| properties: WriterPropertiesPtr, | ||
| buf: &'a mut TrackedWrite<W>, | ||
| row_group_index: i16, | ||
| row_group_index: i32, | ||
| on_close: Option<OnCloseRowGroup<'a, W>>, | ||
| ) -> Self { | ||
| let num_columns = schema_descr.num_columns(); | ||
|
|
@@ -1730,7 +1741,7 @@ mod tests { | |
| let last_group = row_group_writer.close().unwrap(); | ||
| let flushed = file_writer.flushed_row_groups(); | ||
| assert_eq!(flushed.len(), idx + 1); | ||
| assert_eq!(Some(idx as i16), last_group.ordinal()); | ||
| assert_eq!(Some(idx as i32), last_group.ordinal()); | ||
| assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); | ||
| assert_eq!(&flushed[idx], last_group.as_ref()); | ||
| } | ||
|
|
@@ -2209,6 +2220,7 @@ mod tests { | |
| assert_eq!(page_sizes[0], unenc_size); | ||
| } | ||
|
|
||
| #[cfg(feature = "encryption")] | ||
| #[test] | ||
| fn test_too_many_rowgroups() { | ||
| let message_type = " | ||
|
|
@@ -2218,10 +2230,17 @@ mod tests { | |
| "; | ||
| let schema = Arc::new(parse_message_type(message_type).unwrap()); | ||
| let file: File = tempfile::tempfile().unwrap(); | ||
|
|
||
| const AES_128_FOOTER_KEY: &[u8; 16] = b"0123456789012345"; // 128bit/16 | ||
| let footer_key = AES_128_FOOTER_KEY; | ||
| let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec()) | ||
| .build() | ||
| .unwrap(); | ||
| let props = Arc::new( | ||
| WriterProperties::builder() | ||
| .set_statistics_enabled(EnabledStatistics::None) | ||
| .set_max_row_group_row_count(Some(1)) | ||
| .with_file_encryption_properties(file_encryption_properties) | ||
| .build(), | ||
| ); | ||
| let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); | ||
|
|
@@ -2239,14 +2258,50 @@ mod tests { | |
| assert_eq!(i, 0x8000); | ||
| assert_eq!( | ||
| e.to_string(), | ||
| "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)" | ||
| "Parquet error: Parquet with encryption does not support more than 32767 row groups per file (currently: 32768)" | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| writer.close().unwrap(); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_32k_rowgroups() { | ||
| let message_type = " | ||
| message test_schema { | ||
| REQUIRED BYTE_ARRAY a (UTF8); | ||
| } | ||
| "; | ||
| let schema = Arc::new(parse_message_type(message_type).unwrap()); | ||
| let file: File = tempfile::tempfile().unwrap(); | ||
| let props = Arc::new( | ||
| WriterProperties::builder() | ||
| .set_statistics_enabled(EnabledStatistics::None) | ||
| .set_max_row_group_row_count(Some(1)) | ||
| .build(), | ||
| ); | ||
| let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); | ||
|
|
||
| // Create 32k + 1 empty rowgroups. No row group ordinals should be written (but we can't | ||
| // test for that). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and arguably it is an implementation detail -- just testing writing 32k row groups is enough |
||
| for _ in 0..0x8001 { | ||
| let mut row_group_writer = writer.next_row_group().unwrap(); | ||
| let col_writer = row_group_writer.next_column().unwrap().unwrap(); | ||
| col_writer.close().unwrap(); | ||
| row_group_writer.close().unwrap(); | ||
| } | ||
| writer.close().unwrap(); | ||
|
|
||
| // Parse the written metadata and check that ordinals were replaced. | ||
| let reader = SerializedFileReader::new(file).unwrap(); | ||
| let metadata = reader.metadata(); | ||
|
|
||
| for (i, rg) in metadata.row_groups().iter().enumerate() { | ||
| assert_eq!(i as i32, rg.ordinal().unwrap()); | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_size_statistics_with_repetition_and_nulls() { | ||
| let message_type = " | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -754,6 +754,7 @@ pub(crate) fn validate_list_type(expected: ElementType, got: &ListIdentifier) -> | |
| pub(crate) struct ThriftCompactOutputProtocol<W: Write> { | ||
| writer: W, | ||
| write_path_in_schema: bool, | ||
| write_rg_ordinal: bool, | ||
| } | ||
|
|
||
| impl<W: Write> ThriftCompactOutputProtocol<W> { | ||
|
|
@@ -762,6 +763,7 @@ impl<W: Write> ThriftCompactOutputProtocol<W> { | |
| Self { | ||
| writer, | ||
| write_path_in_schema: true, | ||
| write_rg_ordinal: true, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -778,6 +780,21 @@ impl<W: Write> ThriftCompactOutputProtocol<W> { | |
| self.write_path_in_schema | ||
| } | ||
|
|
||
| /// Control the writing of the `ordinal` element of the `RowGroup` struct. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add some context here about why one would disable writing the optional |
||
| /// | ||
| /// The Thrift `ordinal` field on the `RowGroup` struct is `i16`, but the | ||
| /// Thrift compact protocol allows for up to 2^31 elements in a list. If | ||
| /// more than 2^15 row groups are to be written, this can be set to `false` | ||
| /// to prevent writing the ordinal for some row groups but not others. | ||
| pub(crate) fn set_write_row_group_ordinal(&mut self, val: bool) { | ||
| self.write_rg_ordinal = val; | ||
| } | ||
|
|
||
| /// Indicate whether or not to emit `ordinal`. | ||
| pub(crate) fn write_row_group_ordinal(&self) -> bool { | ||
| self.write_rg_ordinal | ||
| } | ||
|
|
||
| /// Write a single byte to the output stream. | ||
| fn write_byte(&mut self, b: u8) -> Result<()> { | ||
| self.writer.write_all(&[b])?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible to write ordinal for the first
2^16-1row groups, and then just not write it for larger row groups. I think you could avoid changing the thrift writer / addingset_write_row_group_ordinalThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is easier, but I think it's odd to write some but not all. Also,
OrdinalAssigner::ensurewill error if early row groups have ordinal set but later row groups do not.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree it is odd to write a mix, but it would only affect files with more than 2^16 row groups, so maybe not a practical matter