diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 19c3a8f76a06..0001a8316015 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -706,17 +706,21 @@ struct ArrowColumnChunkData {
     length: usize,
     store: Box<dyn PageStore>,
     keys: Vec<PageKey>,
-    /// The dictionary page's serialized blobs (header ‖ data), held in memory
-    /// rather than the store.
+    /// Handles to the dictionary page's blobs (header then data) in the store.
     ///
     /// A dictionary page is produced at most once and bounded by
     /// `dict_page_size_limit`, but it must be written *first* in the chunk even
     /// though the data pages reach the writer before it (see
-    /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
-    /// round-trip ~1 page to the backend and straight back, so it is kept here
-    /// and emitted ahead of the data pages at splice. Empty for non-dictionary
-    /// columns.
-    dictionary: Vec<Bytes>,
+    /// [`PageWriter::defers_dictionary_ordering`]). Its header and data are `put`
+    /// into the store like any other page — which keeps the store uniform, and
+    /// lets an oversized dictionary page spill — and their handles are held apart
+    /// so they can be emitted ahead of the data pages at splice.
+    /// Empty for non-dictionary columns.
+    dictionary_keys: Vec<PageKey>,
+    /// Serialized length of the dictionary page (0 if there is none), recorded
+    /// so the data pages can be shifted past it when offsets are rewritten to a
+    /// dictionary-first layout at splice.
+    dictionary_len: usize,
 }
 
 impl ArrowColumnChunkData {
@@ -725,7 +729,8 @@ impl ArrowColumnChunkData {
             length: 0,
             store,
             keys: Vec::new(),
-            dictionary: Vec::new(),
+            dictionary_keys: Vec::new(),
+            dictionary_len: 0,
         }
     }
 
@@ -737,36 +742,35 @@ impl ArrowColumnChunkData {
         Ok(())
     }
 
-    /// Retain a dictionary-page blob in memory (emitted first at splice).
-    fn push_dictionary(&mut self, value: Bytes) {
-        self.dictionary.push(value);
-    }
-
-    /// Total serialized size of the in-memory dictionary page, in bytes.
-    fn dictionary_len(&self) -> usize {
-        self.dictionary.iter().map(Bytes::len).sum()
+    /// Store a dictionary-page blob (header or data) in the page store,
+    /// recording its handle (emitted first at splice) and accumulating its
+    /// serialized length.
+    fn push_dictionary(&mut self, value: Bytes) -> Result<()> {
+        self.dictionary_len += value.len();
+        let key = self.store.put(value)?;
+        self.dictionary_keys.push(key);
+        Ok(())
     }
 
     /// Bytes this chunk currently holds on the heap: whatever the store keeps
-    /// resident (zero for a spilling backend) plus the in-memory dictionary
-    /// page.
+    /// resident (zero for a spilling backend).
     fn memory_size(&self) -> usize {
-        self.store.memory_size() + self.dictionary_len()
+        self.store.memory_size()
     }
 }
 
 /// A streaming [`Read`] over one column chunk's buffered pages, in final file
-/// order: the in-memory dictionary page (if any) first, then the data pages.
+/// order: the dictionary page (if any) first, then the data pages.
 ///
-/// Each data-page blob is taken back out of the [`PageStore`] *as it is
+/// Each blob is taken back out of the [`PageStore`] *as it is
 /// consumed* and released immediately afterwards, so splicing a chunk into the
 /// output file never materializes more than a single page in memory at a time.
 /// This is what keeps the splice phase within the memory bound for a spilling
 /// backend (an in-memory store already holds the bytes, so it is unaffected).
 struct StreamingColumnChunkReader {
-    /// Dictionary-page blobs, emitted before any data page.
-    dictionary: IntoIter<Bytes>,
     store: Box<dyn PageStore>,
+    /// Page handles in final file order: the dictionary page first (if any),
+    /// then the data pages.
     keys: IntoIter<PageKey>,
     /// The blob currently being drained into the output; emptied as it is read.
     current: Bytes,
@@ -774,10 +778,19 @@ struct StreamingColumnChunkReader {
 
 impl StreamingColumnChunkReader {
     fn new(data: ArrowColumnChunkData) -> Self {
+        // The dictionary page must be emitted first, ahead of the data pages,
+        // even though it was the last page produced.
+        let keys = if data.dictionary_keys.is_empty() {
+            data.keys
+        } else {
+            let mut keys = Vec::with_capacity(data.dictionary_keys.len() + data.keys.len());
+            keys.extend(data.dictionary_keys);
+            keys.extend(data.keys);
+            keys
+        };
         Self {
-            dictionary: data.dictionary.into_iter(),
             store: data.store,
-            keys: data.keys.into_iter(),
+            keys: keys.into_iter(),
             current: Bytes::new(),
         }
     }
@@ -786,11 +799,9 @@ impl StreamingColumnChunkReader {
 impl Read for StreamingColumnChunkReader {
     fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
         // Refill from the next blob whenever the current one is drained: the
-        // dictionary page first, then each data page from the store.
+        // dictionary page first, then each data page, all taken from the store.
         while self.current.is_empty() {
-            if let Some(blob) = self.dictionary.next() {
-                self.current = blob;
-            } else if let Some(key) = self.keys.next() {
+            if let Some(key) = self.keys.next() {
                 self.current = self.store.take(key).map_err(std::io::Error::other)?;
             } else {
                 return Ok(0);
@@ -885,10 +896,10 @@ impl PageWriter for ArrowPageWriter {
         buf.length += compressed_size;
 
         if spec.page_type == PageType::DICTIONARY_PAGE {
-            // Held in memory and emitted first at splice — see
-            // `ArrowColumnChunkData::dictionary`.
-            buf.push_dictionary(header);
-            buf.push_dictionary(data);
+            // Recorded apart from the data pages so it is emitted first at
+            // splice — see `ArrowColumnChunkData::dictionary_keys`.
+            buf.push_dictionary(header)?;
+            buf.push_dictionary(data)?;
         } else {
             buf.push(header)?;
             buf.push(data)?;
@@ -975,7 +986,7 @@ impl ArrowColumnChunk {
         // The dictionary page is produced *after* the data pages on this path (so
         // they can stream straight through) but must be written *first*, so move
         // it ahead of the data pages in the recorded offsets before the splice.
-        let close = close.update_dictionary_location(data.dictionary_len())?;
+        let close = close.update_dictionary_location(data.dictionary_len)?;
 
         let reader = StreamingColumnChunkReader::new(data);
         writer.append_column_from_read(reader, close)
@@ -2094,6 +2105,82 @@ mod tests {
         assert_eq!(read_values, values);
     }
 
+    /// The dictionary page is routed through the [`PageStore`] like any other
+    /// page rather than held resident in memory, so a dictionary column chunk's
+    /// *entire* serialized size — dictionary page included — passes through the
+    /// store.
+    #[test]
+    fn dictionary_page_is_routed_through_the_store() {
+        /// A store that sums the bytes handed to `put`.
+        #[derive(Debug, Default)]
+        struct SizeRecordingPageStore {
+            blobs: Vec<Bytes>,
+            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
+        }
+        impl PageStore for SizeRecordingPageStore {
+            fn put(&mut self, value: Bytes) -> Result<PageKey> {
+                self.bytes_put
+                    .fetch_add(value.len(), std::sync::atomic::Ordering::Relaxed);
+                let key = PageKey::new(self.blobs.len() as u64);
+                self.blobs.push(value);
+                Ok(key)
+            }
+            fn take(&mut self, key: PageKey) -> Result<Bytes> {
+                Ok(std::mem::take(&mut self.blobs[key.get() as usize]))
+            }
+        }
+        #[derive(Debug)]
+        struct Factory {
+            bytes_put: Arc<std::sync::atomic::AtomicUsize>,
+        }
+        impl PageStoreFactory for Factory {
+            fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+                Ok(Box::new(SizeRecordingPageStore {
+                    bytes_put: self.bytes_put.clone(),
+                    ..Default::default()
+                }))
+            }
+        }
+
+        let schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, false)]));
+        // Low cardinality keeps the column dictionary-encoded with a real,
+        // non-empty dictionary page.
+        let values: Vec<&str> = (0..2048)
+            .map(|i| ["alpha", "beta", "gamma", "delta"][i % 4])
+            .collect();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])
+            .unwrap();
+
+        let bytes_put = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let opts = ArrowWriterOptions::new().with_page_store_factory(Arc::new(Factory {
+            bytes_put: bytes_put.clone(),
+        }));
+
+        // A single batch / single column means exactly one row group and one
+        // store instance, so the bytes it saw map to one column chunk.
+        let mut buffer = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
+        let column = reader.metadata().row_group(0).column(0);
+        assert!(
+            column.dictionary_page_offset().is_some(),
+            "expected the column to be dictionary-encoded"
+        );
+
+        // The bytes the store was handed must account for the whole chunk,
+        // dictionary page included. Holding the dictionary page apart from the
+        // store would make this fall short by the dictionary page's size.
+        assert_eq!(
+            bytes_put.load(std::sync::atomic::Ordering::Relaxed) as i64,
+            column.compressed_size(),
+            "the dictionary page must pass through the store like any other page"
+        );
+    }
+
     #[test]
     fn arrow_writer() {
         // define schema
diff --git a/parquet/tests/arrow_writer.rs b/parquet/tests/arrow_writer.rs
index e4cc10100035..ce65ba01fe55 100644
--- a/parquet/tests/arrow_writer.rs
+++ b/parquet/tests/arrow_writer.rs
@@ -26,13 +26,15 @@ use std::sync::Arc;
 use arrow::array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use bytes::Bytes;
-use parquet::arrow::ArrowWriter;
 use parquet::arrow::arrow_writer::{
-    ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
+    ArrowColumnChunk, ArrowRowGroupWriterFactory, ArrowWriterOptions, PageKey, PageStore,
+    PageStoreArgs, PageStoreFactory, compute_leaves,
 };
+use parquet::arrow::{ArrowSchemaConverter, ArrowWriter};
 use parquet::basic::Encoding;
 use parquet::errors::Result;
 use parquet::file::properties::WriterProperties;
+use parquet::file::writer::SerializedFileWriter;
 
 #[test]
 #[should_panic(
@@ -383,3 +385,180 @@ fn page_store_bounds_write_memory() {
          baseline ({dict_in_memory}) — dictionary data pages should spill, not accumulate"
     );
 }
+
+/// Number of dictionary-encoded columns written into a single row group. Each
+/// produces its own dictionary page, and all of those pages are held at once
+/// between every column's `close()` and the row-group splice. Large enough that
+/// K × dict_page dominates the single dictionary page a spilling store keeps.
+const DICT_NUM_COLUMNS: usize = 16;
+/// Distinct values per dictionary column. Moderate cardinality: small enough to
+/// stay dictionary-encoded, large enough (with `DICT_VALUE_LEN`) that each
+/// dictionary page is ~0.75 MiB.
+const DICT_DISTINCT: usize = 2048;
+/// Width of each distinct dictionary value, in bytes (→ dictionary page
+/// ≈ `DICT_DISTINCT × (DICT_VALUE_LEN + 4)` ≈ 0.75 MiB).
+const DICT_VALUE_LEN: usize = 384;
+/// Rows per dictionary column (each distinct value repeated twice).
+const DICT_ROWS: usize = DICT_DISTINCT * 2;
+/// Approximate retained size of one dictionary page (PLAIN: 4-byte length + value).
+const DICT_PAGE_BYTES: usize = DICT_DISTINCT * (DICT_VALUE_LEN + 4);
+
+/// The `DICT_DISTINCT` distinct dictionary values, concatenated. Built once and
+/// shared by every column. Constructed outside the measured closure so its bytes
+/// sit in the baseline and are not charged to the per-run peak.
+fn dict_value_pool() -> Vec<u8> {
+    let mut pool = vec![0u8; DICT_DISTINCT * DICT_VALUE_LEN];
+    for d in 0..DICT_DISTINCT {
+        let slice = &mut pool[d * DICT_VALUE_LEN..(d + 1) * DICT_VALUE_LEN];
+        // A deterministic, incompressible blob per distinct value.
+        let mut state = (d as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
+        for byte in slice.iter_mut() {
+            state ^= state << 13;
+            state ^= state >> 7;
+            state ^= state << 17;
+            *byte = (state >> 24) as u8;
+        }
+    }
+    pool
+}
+
+/// One binary column of `DICT_ROWS` rows drawn from `pool`, each distinct value
+/// repeated twice.
+fn dict_column(pool: &[u8]) -> ArrayRef {
+    let mut data = vec![0u8; DICT_VALUE_LEN * DICT_ROWS];
+    let mut offsets: Vec<i32> = Vec::with_capacity(DICT_ROWS + 1);
+    offsets.push(0);
+    for r in 0..DICT_ROWS {
+        let distinct = r % DICT_DISTINCT;
+        data[r * DICT_VALUE_LEN..(r + 1) * DICT_VALUE_LEN]
+            .copy_from_slice(&pool[distinct * DICT_VALUE_LEN..(distinct + 1) * DICT_VALUE_LEN]);
+        offsets.push(((r + 1) * DICT_VALUE_LEN) as i32);
+    }
+    Arc::new(
+        BinaryArray::try_new(
+            arrow::buffer::OffsetBuffer::new(offsets.into()),
+            arrow::buffer::Buffer::from_vec(data),
+            None,
+        )
+        .unwrap(),
+    )
+}
+
+/// Encode `DICT_NUM_COLUMNS` dictionary columns into a single row group, then
+/// splice them in. Columns are encoded and closed one at a time (via the
+/// column-writer API), so only the current column's encoder dictionary is
+/// resident while it encodes; the pages that accumulate are the completed
+/// dictionary pages held in each closed `ArrowColumnChunk` until the splice.
+///
+/// With the default in-memory store every closed chunk keeps its ~0.75 MiB
+/// dictionary page on the heap, so all `DICT_NUM_COLUMNS` accumulate before the
+/// splice (peak ≈ K × dict_page). With a spilling store each dictionary page is
+/// pushed off the heap as its column closes, so at most one is ever resident.
+///
+/// `page_store_factory` selects the backend (`None` → the default in-memory
+/// store). Output goes to [`io::sink`] so produced file bytes never live on the
+/// heap and the measured peak reflects only the writer's page buffering.
+fn write_dict_columns(page_store_factory: Option<Arc<dyn PageStoreFactory>>, pool: &[u8]) {
+    let fields: Vec<Field> = (0..DICT_NUM_COLUMNS)
+        .map(|i| Field::new(format!("k{i}"), DataType::Binary, false))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    let props = Arc::new(
+        WriterProperties::builder()
+            .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+            // Raise the dictionary-page limit above one dictionary page so every
+            // column stays dictionary-encoded instead of falling back to PLAIN.
+            .set_dictionary_page_size_limit(8 * 1024 * 1024)
+            .build(),
+    );
+
+    let parquet_schema = ArrowSchemaConverter::new()
+        .with_coerce_types(props.coerce_types())
+        .convert(&schema)
+        .unwrap();
+    let mut file =
+        SerializedFileWriter::new(std::io::sink(), parquet_schema.root_schema_ptr(), props)
+            .unwrap();
+
+    let mut factory = ArrowRowGroupWriterFactory::new(&file, schema.clone());
+    if let Some(f) = page_store_factory {
+        factory = factory.with_page_store_factory(f);
+    }
+    let col_writers = factory.create_column_writers(0).unwrap();
+
+    // Encode + close each column before starting the next: its input array and
+    // encoder are dropped at the end of the iteration, leaving only the closed
+    // chunk (which holds the completed dictionary page) to accumulate.
+    let mut chunks: Vec<ArrowColumnChunk> = Vec::with_capacity(DICT_NUM_COLUMNS);
+    for (i, mut writer) in col_writers.into_iter().enumerate() {
+        let arr = dict_column(pool);
+        for leaf in compute_leaves(schema.field(i), &arr).unwrap() {
+            writer.write(&leaf).unwrap();
+        }
+        drop(arr);
+        chunks.push(writer.close().unwrap());
+    }
+
+    // Splice the held chunks into the row group, one page at a time.
+    let mut row_group_writer = file.next_row_group().unwrap();
+    for chunk in chunks {
+        chunk.append_to_row_group(&mut row_group_writer).unwrap();
+    }
+    row_group_writer.close().unwrap();
+    file.close().unwrap();
+}
+
+/// Writes `DICT_NUM_COLUMNS` moderate-cardinality dictionary columns — each
+/// producing a ~0.75 MiB dictionary page — into a single row group via the
+/// column-at-a-time API. Columns are encoded and closed one at a time, so the
+/// pages that accumulate are the completed dictionary pages held in each closed
+/// `ArrowColumnChunk` between every column's `close()` and the row-group splice.
+///
+/// With the default in-memory store all K dictionary pages stay resident at once
+/// (peak ≈ K × dict_page). With a spilling store each dictionary page is pushed
+/// off the heap as its column closes, so at most one is ever resident, keeping
+/// the spilling peak far below the in-memory K × dict_page baseline.
+#[test]
+fn page_store_spills_dictionary_pages() {
+    // Build the distinct-value pool up front so its bytes sit in the baseline
+    // and are not charged to either per-run peak below.
+    let dict_pool = dict_value_pool();
+    let dict_in_memory = peak_heap_bytes(|| write_dict_columns(None, &dict_pool));
+    let dict_spill = peak_heap_bytes(|| {
+        write_dict_columns(Some(Arc::new(TempFilePageStoreFactory)), &dict_pool)
+    });
+    let all_dict_pages = DICT_NUM_COLUMNS * DICT_PAGE_BYTES;
+    eprintln!(
+        "dict columns ({} cols × ~{:.2} MiB dictionary page = ~{:.1} MiB held at once) peak heap \
+         — in-memory: {:.2} MiB, temp-file spill: {:.2} MiB",
+        DICT_NUM_COLUMNS,
+        DICT_PAGE_BYTES as f64 / (1024.0 * 1024.0),
+        all_dict_pages as f64 / (1024.0 * 1024.0),
+        dict_in_memory as f64 / (1024.0 * 1024.0),
+        dict_spill as f64 / (1024.0 * 1024.0),
+    );
+
+    // The in-memory store must hold most of the K simultaneously-resident
+    // dictionary pages — confirming the scenario really does accumulate them.
+    assert!(
+        dict_in_memory >= all_dict_pages / 2,
+        "expected in-memory dict peak >= {} bytes (≈ K × dict_page), got {dict_in_memory}",
+        all_dict_pages / 2
+    );
+
+    // The spilling store keeps at most one dictionary page in flight, so its
+    // peak stays a small multiple of a single dictionary page — far below the
+    // in-memory K × dict_page.
+    assert!(
+        dict_spill * 2 < dict_in_memory,
+        "expected dict-column spilling peak ({dict_spill}) to be far below the in-memory \
+         baseline ({dict_in_memory}) — the K held dictionary pages should spill, not stay resident"
+    );
+    let spill_ceiling = DICT_PAGE_BYTES * 5;
+    assert!(
+        dict_spill < spill_ceiling,
+        "expected dict-column spilling peak ({dict_spill}) below {spill_ceiling} bytes \
+         (a few dictionary pages), not the ~K × dict_page of the in-memory baseline"
+    );
+}