Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ fn encoded_meta(is_nullable: bool, has_lists: bool, write_path_in_schema: bool)
.set_column_metadata(columns)
.set_total_byte_size(rng.random_range(1..2000000000))
.set_num_rows(rng.random_range(1..10000000000))
.set_ordinal(i as i16)
.set_ordinal(i as i32)
.build()
.unwrap()
})
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/array_reader/row_group_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl RowGroupIndexReader {
// general path: many row groups
// builds a mapping from ordinal to row group index
// this is O(n) where n is the total number of row groups in the file
let ordinal_to_index: HashMap<i16, i64> =
let ordinal_to_index: HashMap<i32, i64> =
HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map(
|(row_group_index, rg)| {
rg.ordinal()
Expand Down Expand Up @@ -215,7 +215,7 @@ mod tests {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
fn create_test_parquet_metadata(row_groups: Vec<(i32, i64)>) -> ParquetMetaData {
let schema_descr = create_test_schema();

let mut row_group_metas = vec![];
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/arrow/array_reader/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RowNumberReader {
) -> Result<Self> {
// Pass 1: Build a map from ordinal to first_row_index
// This is O(M) where M is the total number of row groups in the file
let mut ordinal_to_offset: HashMap<i16, i64> = HashMap::new();
let mut ordinal_to_offset: HashMap<i32, i64> = HashMap::new();
let mut first_row_index: i64 = 0;

for rg in parquet_metadata.row_groups() {
Expand Down Expand Up @@ -181,7 +181,7 @@ mod tests {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}

fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
fn create_test_parquet_metadata(row_groups: Vec<(i32, i64)>) -> ParquetMetaData {
let schema_descr = create_test_schema();

let mut row_group_metas = vec![];
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ pub struct RowGroupMetaData {
/// We can't infer from file offset of first column since there may be empty columns in row group.
file_offset: Option<i64>,
/// Ordinal position of this row group in file
ordinal: Option<i16>,
ordinal: Option<i32>,
}

impl RowGroupMetaData {
Expand Down Expand Up @@ -700,7 +700,7 @@ impl RowGroupMetaData {
/// For example if this is the first row group in the file, this will return 0.
/// If this is the second row group in the file, this will return 1.
#[inline(always)]
pub fn ordinal(&self) -> Option<i16> {
pub fn ordinal(&self) -> Option<i32> {
self.ordinal
}

Expand Down Expand Up @@ -773,7 +773,7 @@ impl RowGroupMetaDataBuilder {
}

/// Sets ordinal for this row group.
pub fn set_ordinal(mut self, value: i16) -> Self {
pub fn set_ordinal(mut self, value: i32) -> Self {
self.0.ordinal = Some(value);
self
}
Expand Down
21 changes: 12 additions & 9 deletions parquet/src/file/metadata/thrift/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ fn read_row_group(
}
// 6: we don't expose total_compressed_size
7 => {
row_group.ordinal = Some(i16::read_thrift(&mut *prot)?);
row_group.ordinal = Some(i16::read_thrift(&mut *prot)? as i32);
}
_ => {
prot.skip(field_ident.field_type)?;
Expand Down Expand Up @@ -817,11 +817,6 @@ pub(crate) fn parquet_metadata_from_bytes(
// Read row groups and handle ordinal assignment
let mut assigner = OrdinalAssigner::new();
for ordinal in 0..list_ident.size {
let ordinal: i16 = ordinal.try_into().map_err(|_| {
ParquetError::General(format!(
"Row group ordinal {ordinal} exceeds i16 max value",
))
})?;
let rg = read_row_group(&mut prot, schema_descr, options)?;
rg_vec.push(assigner.ensure(ordinal, rg)?);
}
Expand Down Expand Up @@ -940,7 +935,7 @@ impl OrdinalAssigner {
/// groups must also not have ordinals.
fn ensure(
&mut self,
actual_ordinal: i16,
actual_ordinal: i32,
mut rg: RowGroupMetaData,
) -> Result<RowGroupMetaData> {
let rg_has_ordinal = rg.ordinal.is_some();
Expand Down Expand Up @@ -1438,6 +1433,8 @@ impl<'a> WriteThrift for FileMeta<'a> {
#[allow(unused_assignments)]
fn write_thrift<W: Write>(&self, writer: &mut ThriftCompactOutputProtocol<W>) -> Result<()> {
writer.set_write_path_in_schema(self.write_path_in_schema);
// only write ordinal if all values will fit in an i16

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to write the ordinal for the first 2^15 row groups (those whose ordinal fits in an i16), and then just not write it for later row groups? I think you could avoid changing the thrift writer / adding set_write_row_group_ordinal.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is easier, but I think it's odd to write some but not all. Also, OrdinalAssigner::ensure will error if early row groups have ordinal set but later row groups do not.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it is odd to write a mix, but it would only affect files with more than 2^15 row groups, so maybe not a practical matter.

writer.set_write_row_group_ordinal(self.row_groups.len() <= i16::MAX as usize);

self.file_metadata
.version
Expand Down Expand Up @@ -1598,8 +1595,14 @@ impl WriteThrift for RowGroupMetaData {
last_field_id = self
.compressed_size()
.write_thrift_field(writer, 6, last_field_id)?;
if let Some(ordinal) = self.ordinal() {
ordinal.write_thrift_field(writer, 7, last_field_id)?;

// write ordinal if it will fit in an i16
if writer.write_row_group_ordinal() {
if let Some(ordinal) = self.ordinal() {
if let Ok(ordinal) = i16::try_from(ordinal) {
ordinal.write_thrift_field(writer, 7, last_field_id)?;
}
}
}
writer.write_struct_end()
}
Expand Down
73 changes: 64 additions & 9 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ impl<W: Write + Send> SerializedFileWriter<W> {

/// Creates new row group from this file writer.
///
/// Note: Parquet files are limited to at most 2^15 row groups in a file; and row groups must
/// be written sequentially.
/// Note: Parquet files are limited to at most 2^31 row groups in a file (2^15 if encryption
/// is enabled). Row groups must be written sequentially.
///
/// Every time the next row group is requested, the previous row group must
/// be finalised and closed using the [`SerializedRowGroupWriter::close`]
Expand All @@ -236,14 +236,25 @@ impl<W: Write + Send> SerializedFileWriter<W> {
self.assert_previous_writer_closed()?;
let ordinal = self.row_group_index;

let ordinal: i16 = ordinal.try_into().map_err(|_| {
// Thrift cannot encode lists with more than i32::MAX elements
let ordinal: i32 = ordinal.try_into().map_err(|_| {
ParquetError::General(format!(
"Parquet does not support more than {} row groups per file (currently: {})",
i16::MAX,
i32::MAX,
ordinal
))
})?;

// If encryption is enabled, the max is 32767
#[cfg(feature = "encryption")]
if self.file_encryptor.is_some() && ordinal > i16::MAX as i32 {
return Err(ParquetError::General(format!(
"Parquet with encryption does not support more than {} row groups per file (currently: {})",
i16::MAX,
ordinal
)));
}

self.row_group_index = self
.row_group_index
.checked_add(1)
Expand Down Expand Up @@ -470,7 +481,7 @@ fn write_bloom_filters<W: Write + Send>(
// iter each column
// write bloom filter to the file

let row_group_idx: u16 = row_group
let row_group_idx: u32 = row_group
.ordinal()
.expect("Missing row group ordinal")
.try_into()
Expand Down Expand Up @@ -523,7 +534,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
bloom_filters: Vec<Option<Sbbf>>,
column_indexes: Vec<Option<ColumnIndexMetaData>>,
offset_indexes: Vec<Option<OffsetIndexMetaData>>,
row_group_index: i16,
row_group_index: i32,
file_offset: i64,
on_close: Option<OnCloseRowGroup<'a, W>>,
#[cfg(feature = "encryption")]
Expand All @@ -543,7 +554,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
buf: &'a mut TrackedWrite<W>,
row_group_index: i16,
row_group_index: i32,
on_close: Option<OnCloseRowGroup<'a, W>>,
) -> Self {
let num_columns = schema_descr.num_columns();
Expand Down Expand Up @@ -1730,7 +1741,7 @@ mod tests {
let last_group = row_group_writer.close().unwrap();
let flushed = file_writer.flushed_row_groups();
assert_eq!(flushed.len(), idx + 1);
assert_eq!(Some(idx as i16), last_group.ordinal());
assert_eq!(Some(idx as i32), last_group.ordinal());
assert_eq!(Some(row_group_file_offset as i64), last_group.file_offset());
assert_eq!(&flushed[idx], last_group.as_ref());
}
Expand Down Expand Up @@ -2209,6 +2220,7 @@ mod tests {
assert_eq!(page_sizes[0], unenc_size);
}

#[cfg(feature = "encryption")]
#[test]
fn test_too_many_rowgroups() {
let message_type = "
Expand All @@ -2218,10 +2230,17 @@ mod tests {
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let file: File = tempfile::tempfile().unwrap();

const AES_128_FOOTER_KEY: &[u8; 16] = b"0123456789012345"; // 128bit/16
let footer_key = AES_128_FOOTER_KEY;
let file_encryption_properties = FileEncryptionProperties::builder(footer_key.to_vec())
.build()
.unwrap();
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_max_row_group_row_count(Some(1))
.with_file_encryption_properties(file_encryption_properties)
.build(),
);
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
Expand All @@ -2239,14 +2258,50 @@ mod tests {
assert_eq!(i, 0x8000);
assert_eq!(
e.to_string(),
"Parquet error: Parquet does not support more than 32767 row groups per file (currently: 32768)"
"Parquet error: Parquet with encryption does not support more than 32767 row groups per file (currently: 32768)"
);
}
}
}
writer.close().unwrap();
}

#[test]
fn test_32k_rowgroups() {
let message_type = "
message test_schema {
REQUIRED BYTE_ARRAY a (UTF8);
}
";
let schema = Arc::new(parse_message_type(message_type).unwrap());
let file: File = tempfile::tempfile().unwrap();
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_max_row_group_row_count(Some(1))
.build(),
);
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();

// Create 32k + 1 empty rowgroups. No row group ordinals should be written (but we can't
// test for that).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and arguably it is an implementation detail -- just testing writing 32k row groups is enough

for _ in 0..0x8001 {
let mut row_group_writer = writer.next_row_group().unwrap();
let col_writer = row_group_writer.next_column().unwrap().unwrap();
col_writer.close().unwrap();
row_group_writer.close().unwrap();
}
writer.close().unwrap();

// Parse the written metadata and check that ordinals were replaced.
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();

for (i, rg) in metadata.row_groups().iter().enumerate() {
assert_eq!(i as i32, rg.ordinal().unwrap());
}
}

#[test]
fn test_size_statistics_with_repetition_and_nulls() {
let message_type = "
Expand Down
17 changes: 17 additions & 0 deletions parquet/src/parquet_thrift.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ pub(crate) fn validate_list_type(expected: ElementType, got: &ListIdentifier) ->
pub(crate) struct ThriftCompactOutputProtocol<W: Write> {
writer: W,
write_path_in_schema: bool,
write_rg_ordinal: bool,
}

impl<W: Write> ThriftCompactOutputProtocol<W> {
Expand All @@ -762,6 +763,7 @@ impl<W: Write> ThriftCompactOutputProtocol<W> {
Self {
writer,
write_path_in_schema: true,
write_rg_ordinal: true,
}
}

Expand All @@ -778,6 +780,21 @@ impl<W: Write> ThriftCompactOutputProtocol<W> {
self.write_path_in_schema
}

/// Control the writing of the `ordinal` element of the `RowGroup` struct.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add some context here about why one would disable writing the optional ordinal field? Namely because it can't be set for row groups with ordinals greater than i16::MAX?

///
/// The Thrift `ordinal` field on the `RowGroup` struct is `i16`, but the
/// Thrift compact protocol allows for up to 2^31 elements in a list. If
/// more than 2^15 row groups are to be written, not every ordinal fits in
/// that field; setting this to `false` omits the ordinal from every row
/// group, rather than writing it for some row groups but not others.
///
/// The flag is initialized to `true` when the protocol is constructed.
pub(crate) fn set_write_row_group_ordinal(&mut self, val: bool) {
self.write_rg_ordinal = val;
}

/// Indicate whether or not to emit `ordinal`.
///
/// Returns the flag stored by `set_write_row_group_ordinal`; the constructor
/// initializes it to `true`. The `RowGroupMetaData` writer consults this
/// before writing Thrift field 7 (`ordinal`).
pub(crate) fn write_row_group_ordinal(&self) -> bool {
self.write_rg_ordinal
}

/// Write a single byte to the output stream.
fn write_byte(&mut self, b: u8) -> Result<()> {
self.writer.write_all(&[b])?;
Expand Down
Loading