diff --git a/parquet/benches/metadata.rs b/parquet/benches/metadata.rs index b6e04580c141..24ac68513188 100644 --- a/parquet/benches/metadata.rs +++ b/parquet/benches/metadata.rs @@ -124,7 +124,7 @@ fn encoded_meta(is_nullable: bool, has_lists: bool, write_path_in_schema: bool) .set_column_metadata(columns) .set_total_byte_size(rng.random_range(1..2000000000)) .set_num_rows(rng.random_range(1..10000000000)) - .set_ordinal(i as i16) + .set_ordinal(i as i32) .build() .unwrap() }) diff --git a/parquet/src/arrow/array_reader/row_group_index.rs b/parquet/src/arrow/array_reader/row_group_index.rs index b4c296c14590..032f1931dc02 100644 --- a/parquet/src/arrow/array_reader/row_group_index.rs +++ b/parquet/src/arrow/array_reader/row_group_index.rs @@ -90,7 +90,7 @@ impl RowGroupIndexReader { // general path: many row groups // builds a mapping from ordinal to row group index // this is O(n) where n is the total number of row groups in the file - let ordinal_to_index: HashMap = + let ordinal_to_index: HashMap = HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map( |(row_group_index, rg)| { rg.ordinal() @@ -215,7 +215,7 @@ mod tests { Arc::new(SchemaDescriptor::new(Arc::new(schema))) } - fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData { + fn create_test_parquet_metadata(row_groups: Vec<(i32, i64)>) -> ParquetMetaData { let schema_descr = create_test_schema(); let mut row_group_metas = vec![]; diff --git a/parquet/src/arrow/array_reader/row_number.rs b/parquet/src/arrow/array_reader/row_number.rs index def88641a15a..3ac55c0b0e69 100644 --- a/parquet/src/arrow/array_reader/row_number.rs +++ b/parquet/src/arrow/array_reader/row_number.rs @@ -45,7 +45,7 @@ impl RowNumberReader { ) -> Result { // Pass 1: Build a map from ordinal to first_row_index // This is O(M) where M is the total number of row groups in the file - let mut ordinal_to_offset: HashMap = HashMap::new(); + let mut ordinal_to_offset: HashMap = HashMap::new(); let mut first_row_index: i64 = 0; for rg in parquet_metadata.row_groups() { @@ -181,7 +181,7 @@ mod tests { Arc::new(SchemaDescriptor::new(Arc::new(schema))) } - fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData { + fn create_test_parquet_metadata(row_groups: Vec<(i32, i64)>) -> ParquetMetaData { let schema_descr = create_test_schema(); let mut row_group_metas = vec![]; diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs index 646438d2e979..73fa98a0b5fb 100644 --- a/parquet/src/file/metadata/mod.rs +++ b/parquet/src/file/metadata/mod.rs @@ -636,7 +636,7 @@ pub struct RowGroupMetaData { /// We can't infer from file offset of first column since there may empty columns in row group. file_offset: Option, /// Ordinal position of this row group in file - ordinal: Option, + ordinal: Option, } impl RowGroupMetaData { @@ -700,7 +700,7 @@ impl RowGroupMetaData { /// For example if this is the first row group in the file, this will return 0. /// If this is the second row group in the file, this will return 1. #[inline(always)] - pub fn ordinal(&self) -> Option { + pub fn ordinal(&self) -> Option { self.ordinal } @@ -773,7 +773,7 @@ impl RowGroupMetaDataBuilder { } /// Sets ordinal for this row group. - pub fn set_ordinal(mut self, value: i16) -> Self { + pub fn set_ordinal(mut self, value: i32) -> Self { self.0.ordinal = Some(value); self } diff --git a/parquet/src/file/metadata/thrift/mod.rs b/parquet/src/file/metadata/thrift/mod.rs index d5a0112a5e1a..fd352984d5e8 100644 --- a/parquet/src/file/metadata/thrift/mod.rs +++ b/parquet/src/file/metadata/thrift/mod.rs @@ -691,7 +691,7 @@ fn read_row_group( } // 6: we don't expose total_compressed_size 7 => { - row_group.ordinal = Some(i16::read_thrift(&mut *prot)?); + row_group.ordinal = Some(i16::read_thrift(&mut *prot)? as i32); } _ => { prot.skip(field_ident.field_type)?; @@ -817,11 +817,6 @@ pub(crate) fn parquet_metadata_from_bytes( // Read row groups and handle ordinal assignment let mut assigner = OrdinalAssigner::new(); for ordinal in 0..list_ident.size { - let ordinal: i16 = ordinal.try_into().map_err(|_| { - ParquetError::General(format!( - "Row group ordinal {ordinal} exceeds i16 max value", - )) - })?; let rg = read_row_group(&mut prot, schema_descr, options)?; rg_vec.push(assigner.ensure(ordinal, rg)?); } @@ -940,7 +935,7 @@ impl OrdinalAssigner { /// groups must also not have ordinals. fn ensure( &mut self, - actual_ordinal: i16, + actual_ordinal: i32, mut rg: RowGroupMetaData, ) -> Result { let rg_has_ordinal = rg.ordinal.is_some(); @@ -1438,6 +1433,8 @@ impl<'a> WriteThrift for FileMeta<'a> { #[allow(unused_assignments)] fn write_thrift(&self, writer: &mut ThriftCompactOutputProtocol) -> Result<()> { writer.set_write_path_in_schema(self.write_path_in_schema); + // only write ordinal if all values will fit in an i16 + writer.set_write_row_group_ordinal(self.row_groups.len() <= i16::MAX as usize); self.file_metadata .version @@ -1598,8 +1595,14 @@ impl WriteThrift for RowGroupMetaData { last_field_id = self .compressed_size() .write_thrift_field(writer, 6, last_field_id)?; - if let Some(ordinal) = self.ordinal() { - ordinal.write_thrift_field(writer, 7, last_field_id)?; + + // write ordinal if it will fit in an i16 + if writer.write_row_group_ordinal() { + if let Some(ordinal) = self.ordinal() { + if let Ok(ordinal) = i16::try_from(ordinal) { + ordinal.write_thrift_field(writer, 7, last_field_id)?; + } + } } writer.write_struct_end() } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 8ec16ba36739..b86d1fc8ab82 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -226,8 +226,8 @@ impl SerializedFileWriter { /// Creates new row group from this file writer. /// - /// Note: Parquet files are limited to at most 2^15 row groups in a file; and row groups must - /// be written sequentially. + /// Note: Parquet files are limited to at most 2^31 row groups in a file. If encryption is + /// enabled, this is reduced to 2^15, and row groups must be written sequentially. /// /// Every time the next row group is requested, the previous row group must /// be finalised and closed using the [`SerializedRowGroupWriter::close`] @@ -236,14 +236,25 @@ impl SerializedFileWriter { self.assert_previous_writer_closed()?; let ordinal = self.row_group_index; - let ordinal: i16 = ordinal.try_into().map_err(|_| { + // Thrift cannot encode lists with more than i32::MAX elements + let ordinal: i32 = ordinal.try_into().map_err(|_| { ParquetError::General(format!( "Parquet does not support more than {} row groups per file (currently: {})", - i16::MAX, + i32::MAX, ordinal )) })?; + // If encryption is enabled, the max is 32767 + #[cfg(feature = "encryption")] + if self.file_encryptor.is_some() && ordinal > i16::MAX as i32 { + return Err(ParquetError::General(format!( + "Parquet with encryption does not support more than {} row groups per file (currently: {})", + i16::MAX, + ordinal + ))); + } + self.row_group_index = self .row_group_index .checked_add(1) @@ -470,7 +481,7 @@ fn write_bloom_filters( // iter each column // write bloom filter to the file - let row_group_idx: u16 = row_group + let row_group_idx: u32 = row_group .ordinal() .expect("Missing row group ordinal") .try_into() @@ -523,7 +534,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> { bloom_filters: Vec>, column_indexes: Vec>, offset_indexes: Vec>, - row_group_index: i16, + row_group_index: i32, file_offset: i64, on_close: Option>, #[cfg(feature = "encryption")] @@ -543,7 +554,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> { schema_descr: SchemaDescPtr, properties: WriterPropertiesPtr, buf: &'a mut TrackedWrite, - row_group_index: i16, + row_group_index: i32, on_close: Option>, ) -> Self { let num_columns = schema_descr.num_columns(); @@ -1730,7 +1741,7 @@ mod tests { let last_group = row_group_writer.close().unwrap(); let flushed = file_writer.flushed_row_groups(); assert_eq!(flushed.len(), idx + 1); - assert_eq!(Some(idx as i16), last_group.ordinal()); + assert_eq!(Some(idx as i32), last_group.ordinal()); assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset()); assert_eq!(&flushed[idx], last_group.as_ref()); } @@ -2209,6 +2220,7 @@ mod tests { assert_eq!(page_sizes[0], unenc_size); } + #[cfg(feature = "encryption")] #[test] fn test_too_many_rowgroups() { let message_type = " @@ -2218,10 +2230,17 @@ mod tests { "; let schema = Arc::new(parse_message_type(message_type).unwrap()); let file: File = tempfile::tempfile().unwrap(); + + const AES_128_FOOTER_KEY: &[u8; 16] = b"0123456789012345"; // 128bit/16 + let footer_key = AES_128_FOOTER_KEY; + let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec()) + .build() + .unwrap(); let props = Arc::new( WriterProperties::builder() .set_statistics_enabled(EnabledStatistics::None) .set_max_row_group_row_count(Some(1)) + .with_file_encryption_properties(file_encryption_properties) .build(), ); let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); @@ -2239,7 +2258,7 @@ mod tests { assert_eq!(i, 0x8000); assert_eq!( e.to_string(), - "Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)" + "Parquet error: Parquet with encryption does not support more than 32767 row groups per file (currently: 32768)" ); } } @@ -2247,6 +2266,42 @@ mod tests { writer.close().unwrap(); } + #[test] + fn test_32k_rowgroups() { + let message_type = " + message test_schema { + REQUIRED BYTE_ARRAY a (UTF8); + } + "; + let schema = Arc::new(parse_message_type(message_type).unwrap()); + let file: File = tempfile::tempfile().unwrap(); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_max_row_group_row_count(Some(1)) + .build(), + ); + let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap(); + + // Create 32k + 1 empty rowgroups. No row group ordinals should be written (but we can't + // test for that). + for _ in 0..0x8001 { + let mut row_group_writer = writer.next_row_group().unwrap(); + let col_writer = row_group_writer.next_column().unwrap().unwrap(); + col_writer.close().unwrap(); + row_group_writer.close().unwrap(); + } + writer.close().unwrap(); + + // Parse the written metadata and check that ordinals were replaced. + let reader = SerializedFileReader::new(file).unwrap(); + let metadata = reader.metadata(); + + for (i, rg) in metadata.row_groups().iter().enumerate() { + assert_eq!(i as i32, rg.ordinal().unwrap()); + } + } + #[test] fn test_size_statistics_with_repetition_and_nulls() { let message_type = " diff --git a/parquet/src/parquet_thrift.rs b/parquet/src/parquet_thrift.rs index cc6390b392f4..218194e0da23 100644 --- a/parquet/src/parquet_thrift.rs +++ b/parquet/src/parquet_thrift.rs @@ -754,6 +754,7 @@ pub(crate) fn validate_list_type(expected: ElementType, got: &ListIdentifier) -> pub(crate) struct ThriftCompactOutputProtocol { writer: W, write_path_in_schema: bool, + write_rg_ordinal: bool, } impl ThriftCompactOutputProtocol { @@ -762,6 +763,7 @@ impl ThriftCompactOutputProtocol { Self { writer, write_path_in_schema: true, + write_rg_ordinal: true, } } @@ -778,6 +780,21 @@ impl ThriftCompactOutputProtocol { self.write_path_in_schema } + /// Control the writing of the `ordinal` element of the `RowGroup` struct. + /// + /// The Thrift `ordinal` field on the `RowGroup` struct is `i16`, but the + /// Thrift compact protocol allows for up to 2^31 elements in a list. If + /// more than 2^15 row groups are to be written, this can be set to `false` + /// to prevent writing the ordinal for some row groups but not others. + pub(crate) fn set_write_row_group_ordinal(&mut self, val: bool) { + self.write_rg_ordinal = val; + } + + /// Indicate whether or not to emit `ordinal`. + pub(crate) fn write_row_group_ordinal(&self) -> bool { + self.write_rg_ordinal + } + /// Write a single byte to the output stream. fn write_byte(&mut self, b: u8) -> Result<()> { self.writer.write_all(&[b])?;