diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index b4f18f311723..5b8555138dc7 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -449,6 +449,12 @@ enum Encoding { /// afterwards. Note that the use of this encoding with FIXED_LEN_BYTE_ARRAY(N) data may /// perform poorly for large values of N. BYTE_STREAM_SPLIT = 9; + /// Compressed string encoding using a Fast Static Symbol Table (FSST). + /// + /// Frequently occurring substrings (up to 8 bytes) are replaced with + /// single-byte codes drawn from a per-page symbol table, enabling random + /// access to individual compressed values. Applies to BYTE_ARRAY data. + FSST = 10; } ); @@ -469,6 +475,7 @@ impl FromStr for Encoding { "DELTA_BYTE_ARRAY" | "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY), "RLE_DICTIONARY" | "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY), "BYTE_STREAM_SPLIT" | "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT), + "FSST" | "fsst" => Ok(Encoding::FSST), _ => Err(general_err!("unknown encoding: {}", s)), } } @@ -608,6 +615,7 @@ fn i32_to_encoding(val: i32) -> Encoding { 7 => Encoding::DELTA_BYTE_ARRAY, 8 => Encoding::RLE_DICTIONARY, 9 => Encoding::BYTE_STREAM_SPLIT, + 10 => Encoding::FSST, _ => panic!("Impossible encoding {val}"), } } diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 0fb960412295..3bdfa5b7b9df 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -29,11 +29,13 @@ use crate::data_type::*; use crate::encodings::decoding::byte_stream_split_decoder::{ ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder, }; +use crate::encodings::decoding::fsst_decoder::FsstDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{self, BitReader, FromBitpacked}; mod byte_stream_split_decoder; +pub(crate) mod fsst_decoder; pub(crate) mod private { use super::*; @@ -142,6 +144,7 @@ pub(crate) mod private { Encoding::DELTA_LENGTH_BYTE_ARRAY => { Ok(Box::new(DeltaLengthByteArrayDecoder::new())) } + Encoding::FSST => Ok(Box::new(FsstDecoder::new())), _ => get_decoder_default(descr, encoding), } } diff --git a/parquet/src/encodings/decoding/fsst_decoder.rs b/parquet/src/encodings/decoding/fsst_decoder.rs new file mode 100644 index 000000000000..7c71c15d8908 --- /dev/null +++ b/parquet/src/encodings/decoding/fsst_decoder.rs @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; + +use bytes::Bytes; + +use crate::basic::{Encoding, Type}; +use crate::data_type::private::ParquetValueType; +use crate::data_type::{DataType, Int32Type}; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::encodings::fsst::{FSST_LENGTH_ENCODING_DELTA, SymbolTable, read_uleb128}; +use crate::errors::{ParquetError, Result}; + +/// Decoder for the [`FSST`](Encoding::FSST) encoding. +/// +/// See [`FsstEncoder`](crate::encodings::encoding::fsst_encoder::FsstEncoder) +/// for the page layout. Only [`Type::BYTE_ARRAY`] is supported. +pub struct FsstDecoder { + /// Symbol table parsed from the page. + table: SymbolTable, + /// Concatenated compressed payload (section 4). + data: Bytes, + /// Per-value compressed lengths (section 2), decoded up front. + lengths: Vec, + /// Index of the next value to produce. + cursor: usize, + /// Byte offset into `data` of the value at `cursor`. + offset: usize, + _phantom: PhantomData, +} + +impl Default for FsstDecoder { + fn default() -> Self { + Self::new() + } +} + +impl FsstDecoder { + /// Creates a new FSST decoder. + pub fn new() -> Self { + Self { + table: SymbolTable::default(), + data: Bytes::new(), + lengths: Vec::new(), + cursor: 0, + offset: 0, + _phantom: PhantomData, + } + } +} + +impl Decoder for FsstDecoder { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstDecoder only supports ByteArrayType")); + } + + // Section 1: header. + let mut pos = 0; + let length_encoding = read_uleb128(&data, &mut pos)?; + if length_encoding != FSST_LENGTH_ENCODING_DELTA as u64 { + return Err(general_err!( + "FSST: unsupported length-array encoding {length_encoding}" + )); + } + let length_size = read_uleb128(&data, &mut pos)? as usize; + let symbol_size = read_uleb128(&data, &mut pos)? as usize; + + let length_end = pos + length_size; + let symbol_end = length_end + symbol_size; + let symbol_bytes = data + .get(length_end..symbol_end) + .ok_or_else(|| general_err!("FSST: truncated symbol table"))?; + + // Section 2: length array, Delta-Binary-Packed. + let mut lengths = vec![0i32; num_values]; + if num_values > 0 { + let length_bytes = data + .slice(pos..length_end); + let mut length_decoder = DeltaBitPackDecoder::::new(); + length_decoder.set_data(length_bytes, num_values)?; + length_decoder.get(&mut lengths)?; + } + + // Section 3: symbol table. + let (table, _) = SymbolTable::deserialize(symbol_bytes)?; + + // Section 4: compressed payload. + self.table = table; + self.data = data.slice(symbol_end..); + self.lengths = lengths; + self.cursor = 0; + self.offset = 0; + Ok(()) + } + + fn get(&mut self, buffer: &mut [T::T]) -> Result { + let to_read = buffer.len().min(self.lengths.len() - self.cursor); + let mut decompressed = Vec::new(); + for item in buffer.iter_mut().take(to_read) { + let len = self.lengths[self.cursor] as usize; + let end = self.offset + len; + let compressed = self + .data + .get(self.offset..end) + .ok_or_else(|| general_err!("FSST: truncated compressed value"))?; + decompressed.clear(); + self.table.decompress(compressed, &mut decompressed)?; + item.set_from_bytes(Bytes::copy_from_slice(&decompressed)); + self.offset = end; + self.cursor += 1; + } + Ok(to_read) + } + + fn values_left(&self) -> usize { + self.lengths.len() - self.cursor + } + + #[cold] + fn encoding(&self) -> Encoding { + Encoding::FSST + } + + fn skip(&mut self, num_values: usize) -> Result { + let to_skip = num_values.min(self.lengths.len() - self.cursor); + for _ in 0..to_skip { + self.offset += self.lengths[self.cursor] as usize; + self.cursor += 1; + } + Ok(to_skip) + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + + use crate::basic::Encoding; + use crate::data_type::{ByteArray, ByteArrayType}; + use crate::encodings::decoding::Decoder; + use crate::encodings::encoding::Encoder; + use crate::encodings::encoding::fsst_encoder::FsstEncoder; + + use super::FsstDecoder; + + #[test] + fn encode_decode_roundtrip() { + let values: Vec = vec![ + ByteArray::from("hello"), + ByteArray::from("parquet"), + ByteArray::from(""), + ByteArray::from("fsst"), + ]; + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + assert_eq!(encoder.encoding(), Encoding::FSST); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + assert_eq!(decoder.values_left(), values.len()); + + let mut out = vec![ByteArray::default(); values.len()]; + let read = decoder.get(&mut out).unwrap(); + assert_eq!(read, values.len()); + assert_eq!(out, values); + assert_eq!(decoder.values_left(), 0); + } + + #[test] + fn roundtrip_many_values_exercises_length_array() { + let values: Vec = (0..1000) + .map(|i| ByteArray::from(format!("https://example.com/item/{i}").as_str())) + .collect(); + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + + let mut out = vec![ByteArray::default(); values.len()]; + assert_eq!(decoder.get(&mut out).unwrap(), values.len()); + assert_eq!(out, values); + } + + #[test] + fn rejects_unknown_length_encoding() { + // A header whose first ULEB128 (length-array encoding id) is not the + // expected Delta-Binary-Packed id must be rejected. + let bogus = Bytes::from_static(&[99, 0, 0]); + let mut decoder = FsstDecoder::::new(); + assert!(decoder.set_data(bogus, 0).is_err()); + } + + #[test] + fn skip_values() { + let values: Vec = + vec![ByteArray::from("a"), ByteArray::from("bb"), ByteArray::from("ccc")]; + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + + assert_eq!(decoder.skip(2).unwrap(), 2); + let mut out = vec![ByteArray::default(); 1]; + assert_eq!(decoder.get(&mut out).unwrap(), 1); + assert_eq!(out[0], values[2]); + } +} diff --git a/parquet/src/encodings/encoding/fsst_encoder.rs b/parquet/src/encodings/encoding/fsst_encoder.rs new file mode 100644 index 000000000000..18de96451cdd --- /dev/null +++ b/parquet/src/encodings/encoding/fsst_encoder.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; + +use bytes::Bytes; + +use crate::basic::{Encoding, Type}; +use crate::data_type::private::ParquetValueType; +use crate::data_type::{ByteArray, DataType, Int32Type}; +use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; +use crate::encodings::fsst::{FSST_LENGTH_ENCODING_DELTA, SymbolTable, write_uleb128}; +use crate::errors::{ParquetError, Result}; + +/// Encoder for the [`FSST`](Encoding::FSST) encoding. +/// +/// Values are buffered until [`flush_buffer`](Encoder::flush_buffer), at which +/// point a [`SymbolTable`] is trained over them and each value is compressed. +/// +/// The flushed page has four contiguous sections: +/// 1. a header of three ULEB128 values — the length-array encoding id, the +/// length-array byte size, and the symbol-table byte size; +/// 2. the per-value compressed lengths, Delta-Binary-Packed; +/// 3. the serialized [`SymbolTable`]; and +/// 4. the concatenated compressed values (boundaries come from the lengths). +/// +/// Only [`Type::BYTE_ARRAY`] is supported. +pub struct FsstEncoder { + /// Raw values buffered until the symbol table can be trained at flush time. + values: Vec, + /// Running total of buffered raw bytes, for O(1) size estimates. + buffered_bytes: usize, + _phantom: PhantomData, +} + +impl Default for FsstEncoder { + fn default() -> Self { + Self::new() + } +} + +impl FsstEncoder { + /// Creates a new FSST encoder. + pub fn new() -> Self { + Self { + values: Vec::new(), + buffered_bytes: 0, + _phantom: PhantomData, + } + } +} + +impl Encoder for FsstEncoder { + fn put(&mut self, values: &[T::T]) -> Result<()> { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstEncoder only supports ByteArrayType")); + } + self.values.reserve(values.len()); + for value in values { + let byte_array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| general_err!("FsstEncoder only supports ByteArrayType"))?; + self.buffered_bytes += byte_array.len(); + self.values.push(byte_array.clone()); + } + Ok(()) + } + + #[cold] + fn encoding(&self) -> Encoding { + Encoding::FSST + } + + fn estimated_data_encoded_size(&self) -> usize { + // Loose estimate: the raw buffered bytes. The trained codec typically + // compresses below this, but the symbol table is only built at flush. + self.buffered_bytes + } + + fn estimated_memory_size(&self) -> usize { + self.buffered_bytes + + self.values.len() * std::mem::size_of::() + + std::mem::size_of::() + } + + fn flush_buffer(&mut self) -> Result { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstEncoder only supports ByteArrayType")); + } + // Parquet counts values per page with an i32, so the encoder cannot + // produce more than that in a single flush. + if self.values.len() > i32::MAX as usize { + return Err(general_err!( + "FSST can encode at most i32::MAX values, got {}", + self.values.len() + )); + } + + let table = SymbolTable::train(self.values.iter().map(|v| v.data())); + + // Compress each value, concatenating the payload and collecting the + // per-value compressed lengths for the length array. + let mut payload = Vec::with_capacity(self.buffered_bytes); + let mut lengths: Vec = Vec::with_capacity(self.values.len()); + let mut compressed = Vec::new(); + for value in &self.values { + compressed.clear(); + table.compress(value.data(), &mut compressed); + lengths.push(compressed.len() as i32); + payload.extend_from_slice(&compressed); + } + + // Section 2: length array, Delta-Binary-Packed. + let mut length_encoder = DeltaBitPackEncoder::::new(); + length_encoder.put(&lengths)?; + let length_bytes = length_encoder.flush_buffer()?; + + // Section 3: symbol table. + let mut symbol_bytes = Vec::with_capacity(table.serialized_size()); + table.serialize(&mut symbol_bytes); + + // Section 1 (header) + sections 2-4, contiguous. + let mut out = + Vec::with_capacity(8 + length_bytes.len() + symbol_bytes.len() + payload.len()); + write_uleb128(&mut out, FSST_LENGTH_ENCODING_DELTA as u64); + write_uleb128(&mut out, length_bytes.len() as u64); + write_uleb128(&mut out, symbol_bytes.len() as u64); + out.extend_from_slice(&length_bytes); + out.extend_from_slice(&symbol_bytes); + out.extend_from_slice(&payload); + + self.values.clear(); + self.buffered_bytes = 0; + Ok(out.into()) + } +} diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index eeabcf4ba5ce..c46c67d18af8 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -30,9 +30,11 @@ use crate::util::bit_util::{BitWriter, num_required_bits}; use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder}; use bytes::Bytes; pub use dict_encoder::DictEncoder; +use fsst_encoder::FsstEncoder; mod byte_stream_split_encoder; mod dict_encoder; +pub(crate) mod fsst_encoder; // ---------------------------------------------------------------------- // Encoders @@ -101,6 +103,7 @@ pub fn get_encoder( )), _ => Box::new(ByteStreamSplitEncoder::new()), }, + Encoding::FSST => Box::new(FsstEncoder::new()), e => return Err(nyi_err!("Encoding {} is not supported", e)), }; Ok(encoder) diff --git a/parquet/src/encodings/fsst.rs b/parquet/src/encodings/fsst.rs new file mode 100644 index 000000000000..240923c87d8f --- /dev/null +++ b/parquet/src/encodings/fsst.rs @@ -0,0 +1,443 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! FSST (Fast Static Symbol Table) string compression. +//! +//! FSST compresses short strings by replacing frequently occurring substrings +//! (up to [`FSST_MAX_SYMBOL_LEN`] bytes) with single-byte codes drawn from a +//! small static symbol table. Because the table is static for a block of +//! values, individual compressed strings can be decompressed independently, +//! which supports random access into the encoded data. +//! +//! A compressed stream is a sequence of 1-byte codes. A code in `0..n_symbols` +//! expands to the symbol with that index; the reserved code [`FSST_ESCAPE`] +//! (`255`) indicates that the following byte is an uncompressed literal. The +//! symbol table is therefore limited to [`FSST_MAX_SYMBOLS`] entries. +//! +//! The symbol table is built by [`SymbolTable::train`], which implements the +//! generational construction from the [FSST paper] §4.3: starting from an empty +//! table, it repeatedly compresses a sample, counts how often each symbol and +//! each pair of adjacent symbols occurs, and rebuilds the table from the +//! highest-gain symbols (and concatenations of adjacent pairs). +//! +//! [FSST paper]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf + +use std::collections::HashMap; + +use crate::errors::{ParquetError, Result}; + +/// Escape code: the byte immediately following it is an uncompressed literal. +pub(crate) const FSST_ESCAPE: u8 = 255; + +/// Maximum number of symbols in a table. Codes `0..=254` address symbols; +/// code `255` ([`FSST_ESCAPE`]) is reserved for escaped literals. +pub(crate) const FSST_MAX_SYMBOLS: usize = 255; + +/// Maximum length, in bytes, of a single symbol. +pub(crate) const FSST_MAX_SYMBOL_LEN: usize = 8; + +/// FSST symbol-table format version, stored in the upper 32 bits of the leading +/// `u64` of a serialized symbol table. +const FSST_VERSION: u64 = 20190218; + +/// Fixed-size prelude of a serialized symbol table: version (`u64`) + +/// zero-terminated flag (`u8`) + length histogram (`[u8; 8]`). +const FSST_SYMBOL_TABLE_PRELUDE_LEN: usize = 8 + 1 + 8; + +/// Identifier written in the page header for the encoding used by the length +/// array. Matches `Encoding::DELTA_BINARY_PACKED`'s discriminant. +pub(crate) const FSST_LENGTH_ENCODING_DELTA: u8 = 5; + +/// Number of passes used to grow the symbol table during training. +const TRAINING_GENERATIONS: usize = 5; + +/// Cap on the number of sample bytes inspected per training generation, to keep +/// training time bounded on large pages. +const TRAINING_SAMPLE_BUDGET: usize = 1 << 20; + +/// Pack up to 8 bytes into a `u64` key (little-endian, zero-padded). +fn pack(bytes: &[u8]) -> u64 { + debug_assert!(bytes.len() <= 8); + let mut buf = [0u8; 8]; + buf[..bytes.len()].copy_from_slice(bytes); + u64::from_le_bytes(buf) +} + +/// Append `value` to `out` as unsigned LEB128. +pub(crate) fn write_uleb128(out: &mut Vec, mut value: u64) { + loop { + let byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + out.push(byte | 0x80); + } else { + out.push(byte); + return; + } + } +} + +/// Read an unsigned LEB128 value from `data` starting at `*pos`, advancing +/// `*pos` past the consumed bytes. +pub(crate) fn read_uleb128(data: &[u8], pos: &mut usize) -> Result { + let mut result: u64 = 0; + let mut shift = 0; + loop { + let byte = *data + .get(*pos) + .ok_or_else(|| general_err!("FSST: truncated ULEB128 value"))?; + *pos += 1; + result |= ((byte & 0x7f) as u64) << shift; + if byte & 0x80 == 0 { + return Ok(result); + } + shift += 7; + if shift >= 64 { + return Err(general_err!("FSST: ULEB128 value overflows u64")); + } + } +} + +/// A static symbol table mapping 1-byte codes to byte strings. +#[derive(Debug, Clone)] +pub(crate) struct SymbolTable { + /// `symbols[code]` is the byte string that `code` expands to on decode. + symbols: Vec>, + /// Lookup acceleration for compression: `lookup[len]` maps the packed bytes + /// of every symbol of that length to its code. Index `0` is unused. + lookup: Vec>, +} + +impl Default for SymbolTable { + fn default() -> Self { + Self::with_symbols(Vec::new()) + } +} + +impl SymbolTable { + /// Build a table from an explicit symbol list, computing the compression + /// lookup index. The list must contain at most [`FSST_MAX_SYMBOLS`] symbols, + /// each at most [`FSST_MAX_SYMBOL_LEN`] bytes long. + fn with_symbols(symbols: Vec>) -> Self { + debug_assert!(symbols.len() <= FSST_MAX_SYMBOLS); + let mut lookup: Vec> = vec![HashMap::new(); FSST_MAX_SYMBOL_LEN + 1]; + for (idx, symbol) in symbols.iter().enumerate() { + debug_assert!((1..=FSST_MAX_SYMBOL_LEN).contains(&symbol.len())); + lookup[symbol.len()].insert(pack(symbol), idx as u8); + } + Self { symbols, lookup } + } + + /// Train a symbol table over a set of sample values. + /// + /// Implements the generational algorithm from the [FSST paper] §4.3. + /// + /// [FSST paper]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf + pub(crate) fn train<'a>(samples: impl IntoIterator) -> Self { + let lines: Vec<&[u8]> = samples + .into_iter() + .filter(|line| !line.is_empty()) + .collect(); + if lines.is_empty() { + return Self::default(); + } + + let mut symbols: Vec> = Vec::new(); + for _ in 0..TRAINING_GENERATIONS { + let table = Self::with_symbols(symbols.clone()); + + // Count single-symbol and adjacent-symbol-pair occurrences over the + // sample, compressed with the current table. + let mut count1: HashMap = HashMap::new(); + let mut count2: HashMap<(u16, u16), u64> = HashMap::new(); + let mut budget = TRAINING_SAMPLE_BUDGET; + for line in &lines { + if budget == 0 { + break; + } + budget = budget.saturating_sub(line.len()); + + let mut prev: Option = None; + let mut i = 0; + while i < line.len() { + let (code, len) = table.longest_code(&line[i..]); + *count1.entry(code).or_default() += 1; + if let Some(prev) = prev { + *count2.entry((prev, code)).or_default() += 1; + } + prev = Some(code); + i += len; + } + } + + symbols = Self::select_symbols(&symbols, &count1, &count2); + } + + // The serialized symbol table groups symbols by length, so a code is + // implicitly its position in length order. Sort here (stably) so the + // codes used for compression match what `deserialize` reconstructs. + symbols.sort_by_key(|s| s.len()); + Self::with_symbols(symbols) + } + + /// Build the next generation of symbols by ranking candidates by gain. + /// + /// Candidates are every symbol seen during counting (gain = frequency × + /// length, with an 8× boost for single bytes, per the paper's heuristic) and + /// every concatenation of an adjacent symbol pair that fits in + /// [`FSST_MAX_SYMBOL_LEN`] bytes (gain = pair frequency × concatenated + /// length). The [`FSST_MAX_SYMBOLS`] highest-gain candidates are kept. + fn select_symbols( + current: &[Vec], + count1: &HashMap, + count2: &HashMap<(u16, u16), u64>, + ) -> Vec> { + // A `code` is the byte value `0..256` (a literal) or `256 + symbol_index`. + let code_bytes = |code: u16| -> Vec { + if (code as usize) < 256 { + vec![code as u8] + } else { + current[code as usize - 256].clone() + } + }; + + let mut candidates: HashMap, u64> = HashMap::new(); + for (&code, &freq) in count1 { + let bytes = code_bytes(code); + let mut gain = freq * bytes.len() as u64; + if bytes.len() == 1 { + gain *= 8; + } + *candidates.entry(bytes).or_default() += gain; + } + for (&(prev, code), &freq) in count2 { + let mut bytes = code_bytes(prev); + bytes.extend_from_slice(&code_bytes(code)); + if bytes.len() <= FSST_MAX_SYMBOL_LEN { + let gain = freq * bytes.len() as u64; + *candidates.entry(bytes).or_default() += gain; + } + } + + let mut ranked: Vec<(Vec, u64)> = candidates.into_iter().collect(); + // Highest gain first; break ties toward longer symbols for determinism. + ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.0.len().cmp(&a.0.len()))); + ranked.truncate(FSST_MAX_SYMBOLS); + ranked.into_iter().map(|(bytes, _)| bytes).collect() + } + + /// Find the longest symbol that is a prefix of `input`, returning its code + /// (`< 256` for a single-byte literal, `256 + index` for a table symbol) and + /// the number of input bytes it consumes. Used during training. + fn longest_code(&self, input: &[u8]) -> (u16, usize) { + match self.longest_symbol(input) { + Some((idx, len)) => (256 + idx as u16, len), + None => (input[0] as u16, 1), + } + } + + /// Find the longest table symbol that is a prefix of `input`, returning its + /// index and length, or `None` if no symbol matches. + fn longest_symbol(&self, input: &[u8]) -> Option<(u8, usize)> { + let max = input.len().min(FSST_MAX_SYMBOL_LEN); + for len in (1..=max).rev() { + if let Some(&idx) = self.lookup[len].get(&pack(&input[..len])) { + return Some((idx, len)); + } + } + None + } + + /// Compress `input`, appending the encoded bytes to `out`. + pub(crate) fn compress(&self, input: &[u8], out: &mut Vec) { + let mut i = 0; + while i < input.len() { + match self.longest_symbol(&input[i..]) { + Some((idx, len)) => { + out.push(idx); + i += len; + } + None => { + out.push(FSST_ESCAPE); + out.push(input[i]); + i += 1; + } + } + } + } + + /// Decompress `input`, appending the decoded bytes to `out`. + pub(crate) fn decompress(&self, input: &[u8], out: &mut Vec) -> Result<()> { + let mut i = 0; + while i < input.len() { + let code = input[i]; + i += 1; + if code == FSST_ESCAPE { + let lit = *input + .get(i) + .ok_or_else(|| general_err!("FSST: dangling escape at end of stream"))?; + out.push(lit); + i += 1; + } else { + let symbol = self + .symbols + .get(code as usize) + .ok_or_else(|| general_err!("FSST: code {} not in symbol table", code))?; + out.extend_from_slice(symbol); + } + } + Ok(()) + } + + /// Number of bytes [`serialize`](Self::serialize) will append, so callers + /// can pre-size their output buffer and record the symbol-table size. + pub(crate) fn serialized_size(&self) -> usize { + FSST_SYMBOL_TABLE_PRELUDE_LEN + self.symbols.iter().map(|s| s.len()).sum::() + } + + /// Serialize the table in the FSST symbol-table format: + /// + /// 1. version (`u64` little-endian; the version number occupies the upper 32 bits), + /// 2. zero-terminated flag (`u8`; always `0` here), + /// 3. length histogram (`[u8; 8]`; `histogram[len - 1]` is the number of + /// symbols of that length), then + /// 4. the symbol bytes, grouped by ascending length. + /// + /// Relies on `self.symbols` being sorted by length (see [`train`](Self::train)). + pub(crate) fn serialize(&self, out: &mut Vec) { + debug_assert!(self.symbols.len() <= FSST_MAX_SYMBOLS); + + out.extend_from_slice(&(FSST_VERSION << 32).to_le_bytes()); + out.push(0); // not zero-terminated + + let mut histogram = [0u8; 8]; + for symbol in &self.symbols { + debug_assert!((1..=FSST_MAX_SYMBOL_LEN).contains(&symbol.len())); + histogram[symbol.len() - 1] += 1; + } + out.extend_from_slice(&histogram); + + for symbol in &self.symbols { + out.extend_from_slice(symbol); + } + } + + /// Deserialize a table written by [`serialize`](Self::serialize), returning + /// the table and the number of bytes consumed. + pub(crate) fn deserialize(data: &[u8]) -> Result<(Self, usize)> { + let prelude = data + .get(..FSST_SYMBOL_TABLE_PRELUDE_LEN) + .ok_or_else(|| general_err!("FSST: truncated symbol-table prelude"))?; + // Bytes 0..8 are the version and 8 the zero-terminated flag; neither + // affects decoding of the symbols below. + let histogram = &prelude[9..17]; + + let mut symbols = Vec::new(); + let mut i = FSST_SYMBOL_TABLE_PRELUDE_LEN; + for (idx, &count) in histogram.iter().enumerate() { + let len = idx + 1; + for _ in 0..count { + let end = i + len; + let symbol = data + .get(i..end) + .ok_or_else(|| general_err!("FSST: truncated symbol data"))? + .to_vec(); + symbols.push(symbol); + i = end; + } + } + Ok((Self::with_symbols(symbols), i)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn roundtrip(table: &SymbolTable, input: &[u8]) -> Vec { + let mut compressed = Vec::new(); + table.compress(input, &mut compressed); + let mut decompressed = Vec::new(); + table.decompress(&compressed, &mut decompressed).unwrap(); + assert_eq!(decompressed, input, "roundtrip mismatch"); + compressed + } + + #[test] + fn empty_table_roundtrips_everything() { + let table = SymbolTable::train(std::iter::empty()); + let inputs: &[&[u8]] = &[b"", b"hello", b"parquet", &[0u8, 255, 1, 254]]; + for input in inputs { + roundtrip(&table, input); + } + } + + #[test] + fn trained_table_compresses_repetitive_data() { + let corpus: Vec<&[u8]> = (0..256).map(|_| b"http://example.com/".as_slice()).collect(); + let table = SymbolTable::train(corpus.iter().copied()); + + let input = b"http://example.com/path"; + let compressed = roundtrip(&table, input); + assert!( + compressed.len() < input.len(), + "expected compression, got {} >= {}", + compressed.len(), + input.len() + ); + } + + #[test] + fn trained_table_roundtrips_unseen_bytes() { + // Bytes never seen in training must still round-trip via escapes. + let table = SymbolTable::train([b"aaaaaaaa".as_slice(), b"bbbbbbbb".as_slice()]); + roundtrip(&table, &[0u8, 1, 2, 250, 255]); + roundtrip(&table, b"aaaabbbb\x00\xff"); + } + + #[test] + fn serialize_roundtrip_preserves_decoding() { + let table = SymbolTable::train([b"the quick brown fox".as_slice()]); + let input = b"the fox"; + let mut compressed = Vec::new(); + table.compress(input, &mut compressed); + + let mut header = Vec::new(); + table.serialize(&mut header); + let (restored, consumed) = SymbolTable::deserialize(&header).unwrap(); + assert_eq!(consumed, header.len()); + + let mut decompressed = Vec::new(); + restored.decompress(&compressed, &mut decompressed).unwrap(); + assert_eq!(decompressed, input); + } + + #[test] + fn serialized_size_matches_serialized_len() { + let table = SymbolTable::train([b"the quick brown fox".as_slice()]); + let mut buf = Vec::new(); + table.serialize(&mut buf); + assert_eq!(buf.len(), table.serialized_size()); + } + + #[test] + fn decompress_rejects_dangling_escape() { + let table = SymbolTable::default(); + let mut out = Vec::new(); + assert!(table.decompress(&[FSST_ESCAPE], &mut out).is_err()); + } +} diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs index 894c4fb961ee..247d221e6afa 100644 --- a/parquet/src/encodings/mod.rs +++ b/parquet/src/encodings/mod.rs @@ -17,5 +17,6 @@ pub mod decoding; pub mod encoding; +pub(crate) mod fsst; pub mod levels; experimental!(pub(crate) mod rle);