From 11423dcdeff8c84aa1ed3cf2dc5f14f9d006a917 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Tue, 16 Jun 2026 19:25:24 -0500 Subject: [PATCH 1/3] feat: [POC] Add FSST encoder/decoder This commit is a proof of concept to add FSST encoding to parquet within arrow-rs. This PR is heavily inspired and pulls in a bunch of methods used in: https://github.com/apache/arrow-rs/pull/9372 --- parquet/src/basic.rs | 8 + parquet/src/encodings/decoding.rs | 3 + .../src/encodings/decoding/fsst_decoder.rs | 181 +++++++++ .../src/encodings/encoding/fsst_encoder.rs | 121 ++++++ parquet/src/encodings/encoding/mod.rs | 3 + parquet/src/encodings/fsst.rs | 363 ++++++++++++++++++ parquet/src/encodings/mod.rs | 1 + 7 files changed, 680 insertions(+) create mode 100644 parquet/src/encodings/decoding/fsst_decoder.rs create mode 100644 parquet/src/encodings/encoding/fsst_encoder.rs create mode 100644 parquet/src/encodings/fsst.rs diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index b4f18f311723..5b8555138dc7 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -449,6 +449,12 @@ enum Encoding { /// afterwards. Note that the use of this encoding with FIXED_LEN_BYTE_ARRAY(N) data may /// perform poorly for large values of N. BYTE_STREAM_SPLIT = 9; + /// Compressed string encoding using a Fast Static Symbol Table (FSST). + /// + /// Frequently occurring substrings (up to 8 bytes) are replaced with + /// single-byte codes drawn from a per-page symbol table, enabling random + /// access to individual compressed values. Applies to BYTE_ARRAY data. + FSST = 10; } ); @@ -469,6 +475,7 @@ impl FromStr for Encoding { "DELTA_BYTE_ARRAY" | "delta_byte_array" => Ok(Encoding::DELTA_BYTE_ARRAY), "RLE_DICTIONARY" | "rle_dictionary" => Ok(Encoding::RLE_DICTIONARY), "BYTE_STREAM_SPLIT" | "byte_stream_split" => Ok(Encoding::BYTE_STREAM_SPLIT), + "FSST" | "fsst" => Ok(Encoding::FSST), _ => Err(general_err!("unknown encoding: {}", s)), } } @@ -608,6 +615,7 @@ fn i32_to_encoding(val: i32) -> Encoding { 7 => Encoding::DELTA_BYTE_ARRAY, 8 => Encoding::RLE_DICTIONARY, 9 => Encoding::BYTE_STREAM_SPLIT, + 10 => Encoding::FSST, _ => panic!("Impossible encoding {val}"), } } diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs index 0fb960412295..3bdfa5b7b9df 100644 --- a/parquet/src/encodings/decoding.rs +++ b/parquet/src/encodings/decoding.rs @@ -29,11 +29,13 @@ use crate::data_type::*; use crate::encodings::decoding::byte_stream_split_decoder::{ ByteStreamSplitDecoder, VariableWidthByteStreamSplitDecoder, }; +use crate::encodings::decoding::fsst_decoder::FsstDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{self, BitReader, FromBitpacked}; mod byte_stream_split_decoder; +pub(crate) mod fsst_decoder; pub(crate) mod private { use super::*; @@ -142,6 +144,7 @@ pub(crate) mod private { Encoding::DELTA_LENGTH_BYTE_ARRAY => { Ok(Box::new(DeltaLengthByteArrayDecoder::new())) } + Encoding::FSST => Ok(Box::new(FsstDecoder::new())), _ => get_decoder_default(descr, encoding), } } diff --git a/parquet/src/encodings/decoding/fsst_decoder.rs b/parquet/src/encodings/decoding/fsst_decoder.rs new file mode 100644 index 000000000000..2e8c707077c3 --- /dev/null +++ b/parquet/src/encodings/decoding/fsst_decoder.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; + +use bytes::Bytes; + +use crate::basic::{Encoding, Type}; +use crate::data_type::DataType; +use crate::data_type::private::ParquetValueType; +use crate::encodings::decoding::Decoder; +use crate::encodings::fsst::SymbolTable; +use crate::errors::{ParquetError, Result}; + +/// Decoder for the [`FSST`](Encoding::FSST) encoding. +/// +/// See [`FsstEncoder`](crate::encodings::encoding::fsst_encoder::FsstEncoder) +/// for the page layout. Only [`Type::BYTE_ARRAY`] is supported. +pub struct FsstDecoder { + /// Symbol table parsed from the page header. + table: SymbolTable, + /// Compressed payload following the symbol-table header. + data: Bytes, + /// Offset into `data` pointing at the next value's length prefix. + offset: usize, + /// Number of values still to be produced. + num_values: usize, + _phantom: PhantomData, +} + +impl Default for FsstDecoder { + fn default() -> Self { + Self::new() + } +} + +impl FsstDecoder { + /// Creates a new FSST decoder. + pub fn new() -> Self { + Self { + table: SymbolTable::default(), + data: Bytes::new(), + offset: 0, + num_values: 0, + _phantom: PhantomData, + } + } + + /// Reads the little-endian `u32` length prefix at the current offset and + /// advances past it, returning the length of the next compressed value. + fn read_len(&mut self) -> Result { + let end = self.offset + 4; + let bytes = self + .data + .get(self.offset..end) + .ok_or_else(|| general_err!("FSST: truncated length prefix"))?; + let len = u32::from_le_bytes(bytes.try_into().unwrap()) as usize; + self.offset = end; + Ok(len) + } +} + +impl Decoder for FsstDecoder { + fn set_data(&mut self, data: Bytes, num_values: usize) -> Result<()> { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstDecoder only supports ByteArrayType")); + } + let (table, consumed) = SymbolTable::deserialize(&data)?; + self.table = table; + self.data = data.slice(consumed..); + self.offset = 0; + self.num_values = num_values; + Ok(()) + } + + fn get(&mut self, buffer: &mut [T::T]) -> Result { + let to_read = buffer.len().min(self.num_values); + let mut decompressed = Vec::new(); + for item in buffer.iter_mut().take(to_read) { + let len = self.read_len()?; + let end = self.offset + len; + let compressed = self + .data + .get(self.offset..end) + .ok_or_else(|| general_err!("FSST: truncated compressed value"))?; + decompressed.clear(); + self.table.decompress(compressed, &mut decompressed)?; + self.offset = end; + item.set_from_bytes(Bytes::copy_from_slice(&decompressed)); + } + self.num_values -= to_read; + Ok(to_read) + } + + fn values_left(&self) -> usize { + self.num_values + } + + #[cold] + fn encoding(&self) -> Encoding { + Encoding::FSST + } + + fn skip(&mut self, num_values: usize) -> Result { + let to_skip = num_values.min(self.num_values); + for _ in 0..to_skip { + let len = self.read_len()?; + self.offset += len; + } + self.num_values -= to_skip; + Ok(to_skip) + } +} + +#[cfg(test)] +mod tests { + use crate::basic::Encoding; + use crate::data_type::{ByteArray, ByteArrayType}; + use crate::encodings::decoding::Decoder; + use crate::encodings::encoding::Encoder; + use crate::encodings::encoding::fsst_encoder::FsstEncoder; + + use super::FsstDecoder; + + #[test] + fn encode_decode_roundtrip() { + let values: Vec = vec![ + ByteArray::from("hello"), + ByteArray::from("parquet"), + ByteArray::from(""), + ByteArray::from("fsst"), + ]; + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + assert_eq!(encoder.encoding(), Encoding::FSST); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + assert_eq!(decoder.values_left(), values.len()); + + let mut out = vec![ByteArray::default(); values.len()]; + let read = decoder.get(&mut out).unwrap(); + assert_eq!(read, values.len()); + assert_eq!(out, values); + assert_eq!(decoder.values_left(), 0); + } + + #[test] + fn skip_values() { + let values: Vec = + vec![ByteArray::from("a"), ByteArray::from("bb"), ByteArray::from("ccc")]; + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + + assert_eq!(decoder.skip(2).unwrap(), 2); + let mut out = vec![ByteArray::default(); 1]; + assert_eq!(decoder.get(&mut out).unwrap(), 1); + assert_eq!(out[0], values[2]); + } +} diff --git a/parquet/src/encodings/encoding/fsst_encoder.rs b/parquet/src/encodings/encoding/fsst_encoder.rs new file mode 100644 index 000000000000..b51aebca01c1 --- /dev/null +++ b/parquet/src/encodings/encoding/fsst_encoder.rs @@ -0,0 +1,121 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::marker::PhantomData; + +use bytes::Bytes; + +use crate::basic::{Encoding, Type}; +use crate::data_type::private::ParquetValueType; +use crate::data_type::{ByteArray, DataType}; +use crate::encodings::encoding::Encoder; +use crate::encodings::fsst::SymbolTable; +use crate::errors::{ParquetError, Result}; + +/// Encoder for the [`FSST`](Encoding::FSST) encoding. +/// +/// Values are buffered until [`flush_buffer`](Encoder::flush_buffer), at which +/// point a [`SymbolTable`] is trained over them and each value is compressed. +/// +/// The flushed page layout is: +/// 1. the serialized [`SymbolTable`] header, followed by +/// 2. for each value: a little-endian `u32` compressed length, then the +/// compressed bytes. +/// +/// Only [`Type::BYTE_ARRAY`] is supported. +pub struct FsstEncoder { + /// Raw values buffered until the symbol table can be trained at flush time. + values: Vec, + /// Running total of buffered raw bytes, for O(1) size estimates. + buffered_bytes: usize, + _phantom: PhantomData, +} + +impl Default for FsstEncoder { + fn default() -> Self { + Self::new() + } +} + +impl FsstEncoder { + /// Creates a new FSST encoder. + pub fn new() -> Self { + Self { + values: Vec::new(), + buffered_bytes: 0, + _phantom: PhantomData, + } + } +} + +impl Encoder for FsstEncoder { + fn put(&mut self, values: &[T::T]) -> Result<()> { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstEncoder only supports ByteArrayType")); + } + self.values.reserve(values.len()); + for value in values { + let byte_array = value + .as_any() + .downcast_ref::() + .ok_or_else(|| general_err!("FsstEncoder only supports ByteArrayType"))?; + self.buffered_bytes += byte_array.len(); + self.values.push(byte_array.clone()); + } + Ok(()) + } + + #[cold] + fn encoding(&self) -> Encoding { + Encoding::FSST + } + + fn estimated_data_encoded_size(&self) -> usize { + // Loose estimate: the raw buffered bytes. The trained codec typically + // compresses below this, but the symbol table is only built at flush. + self.buffered_bytes + } + + fn estimated_memory_size(&self) -> usize { + self.buffered_bytes + + self.values.len() * std::mem::size_of::() + + std::mem::size_of::() + } + + fn flush_buffer(&mut self) -> Result { + if T::get_physical_type() != Type::BYTE_ARRAY { + return Err(general_err!("FsstEncoder only supports ByteArrayType")); + } + + let table = SymbolTable::train(self.values.iter().map(|v| v.data())); + + let mut out = Vec::with_capacity(self.buffered_bytes + self.values.len() * 4 + 8); + table.serialize(&mut out); + + let mut compressed = Vec::new(); + for value in &self.values { + compressed.clear(); + table.compress(value.data(), &mut compressed); + out.extend_from_slice(&(compressed.len() as u32).to_le_bytes()); + out.extend_from_slice(&compressed); + } + + self.values.clear(); + self.buffered_bytes = 0; + Ok(out.into()) + } +} diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index eeabcf4ba5ce..c46c67d18af8 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -30,9 +30,11 @@ use crate::util::bit_util::{BitWriter, num_required_bits}; use byte_stream_split_encoder::{ByteStreamSplitEncoder, VariableWidthByteStreamSplitEncoder}; use bytes::Bytes; pub use dict_encoder::DictEncoder; +use fsst_encoder::FsstEncoder; mod byte_stream_split_encoder; mod dict_encoder; +pub(crate) mod fsst_encoder; // ---------------------------------------------------------------------- // Encoders @@ -101,6 +103,7 @@ pub fn get_encoder( )), _ => Box::new(ByteStreamSplitEncoder::new()), }, + Encoding::FSST => Box::new(FsstEncoder::new()), e => return Err(nyi_err!("Encoding {} is not supported", e)), }; Ok(encoder) diff --git a/parquet/src/encodings/fsst.rs b/parquet/src/encodings/fsst.rs new file mode 100644 index 000000000000..c5f7b656d727 --- /dev/null +++ b/parquet/src/encodings/fsst.rs @@ -0,0 +1,363 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! FSST (Fast Static Symbol Table) string compression. +//! +//! FSST compresses short strings by replacing frequently occurring substrings +//! (up to [`FSST_MAX_SYMBOL_LEN`] bytes) with single-byte codes drawn from a +//! small static symbol table. Because the table is static for a block of +//! values, individual compressed strings can be decompressed independently, +//! which supports random access into the encoded data. +//! +//! A compressed stream is a sequence of 1-byte codes. A code in `0..n_symbols` +//! expands to the symbol with that index; the reserved code [`FSST_ESCAPE`] +//! (`255`) indicates that the following byte is an uncompressed literal. The +//! symbol table is therefore limited to [`FSST_MAX_SYMBOLS`] entries. +//! +//! The symbol table is built by [`SymbolTable::train`], which implements the +//! generational construction from the [FSST paper] §4.3: starting from an empty +//! table, it repeatedly compresses a sample, counts how often each symbol and +//! each pair of adjacent symbols occurs, and rebuilds the table from the +//! highest-gain symbols (and concatenations of adjacent pairs). +//! +//! [FSST paper]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf + +use std::collections::HashMap; + +use crate::errors::{ParquetError, Result}; + +/// Escape code: the byte immediately following it is an uncompressed literal. +pub(crate) const FSST_ESCAPE: u8 = 255; + +/// Maximum number of symbols in a table. Codes `0..=254` address symbols; +/// code `255` ([`FSST_ESCAPE`]) is reserved for escaped literals. +pub(crate) const FSST_MAX_SYMBOLS: usize = 255; + +/// Maximum length, in bytes, of a single symbol. +pub(crate) const FSST_MAX_SYMBOL_LEN: usize = 8; + +/// Number of passes used to grow the symbol table during training. +const TRAINING_GENERATIONS: usize = 5; + +/// Cap on the number of sample bytes inspected per training generation, to keep +/// training time bounded on large pages. +const TRAINING_SAMPLE_BUDGET: usize = 1 << 20; + +/// Pack up to 8 bytes into a `u64` key (little-endian, zero-padded). +fn pack(bytes: &[u8]) -> u64 { + debug_assert!(bytes.len() <= 8); + let mut buf = [0u8; 8]; + buf[..bytes.len()].copy_from_slice(bytes); + u64::from_le_bytes(buf) +} + +/// A static symbol table mapping 1-byte codes to byte strings. +#[derive(Debug, Clone)] +pub(crate) struct SymbolTable { + /// `symbols[code]` is the byte string that `code` expands to on decode. + symbols: Vec>, + /// Lookup acceleration for compression: `lookup[len]` maps the packed bytes + /// of every symbol of that length to its code. Index `0` is unused. + lookup: Vec>, +} + +impl Default for SymbolTable { + fn default() -> Self { + Self::with_symbols(Vec::new()) + } +} + +impl SymbolTable { + /// Build a table from an explicit symbol list, computing the compression + /// lookup index. The list must contain at most [`FSST_MAX_SYMBOLS`] symbols, + /// each at most [`FSST_MAX_SYMBOL_LEN`] bytes long. + fn with_symbols(symbols: Vec>) -> Self { + debug_assert!(symbols.len() <= FSST_MAX_SYMBOLS); + let mut lookup: Vec> = vec![HashMap::new(); FSST_MAX_SYMBOL_LEN + 1]; + for (idx, symbol) in symbols.iter().enumerate() { + debug_assert!((1..=FSST_MAX_SYMBOL_LEN).contains(&symbol.len())); + lookup[symbol.len()].insert(pack(symbol), idx as u8); + } + Self { symbols, lookup } + } + + /// Train a symbol table over a set of sample values. + /// + /// Implements the generational algorithm from the [FSST paper] §4.3. + /// + /// [FSST paper]: https://www.vldb.org/pvldb/vol13/p2649-boncz.pdf + pub(crate) fn train<'a>(samples: impl IntoIterator) -> Self { + let lines: Vec<&[u8]> = samples + .into_iter() + .filter(|line| !line.is_empty()) + .collect(); + if lines.is_empty() { + return Self::default(); + } + + let mut symbols: Vec> = Vec::new(); + for _ in 0..TRAINING_GENERATIONS { + let table = Self::with_symbols(symbols.clone()); + + // Count single-symbol and adjacent-symbol-pair occurrences over the + // sample, compressed with the current table. + let mut count1: HashMap = HashMap::new(); + let mut count2: HashMap<(u16, u16), u64> = HashMap::new(); + let mut budget = TRAINING_SAMPLE_BUDGET; + for line in &lines { + if budget == 0 { + break; + } + budget = budget.saturating_sub(line.len()); + + let mut prev: Option = None; + let mut i = 0; + while i < line.len() { + let (code, len) = table.longest_code(&line[i..]); + *count1.entry(code).or_default() += 1; + if let Some(prev) = prev { + *count2.entry((prev, code)).or_default() += 1; + } + prev = Some(code); + i += len; + } + } + + symbols = Self::select_symbols(&symbols, &count1, &count2); + } + + Self::with_symbols(symbols) + } + + /// Build the next generation of symbols by ranking candidates by gain. + /// + /// Candidates are every symbol seen during counting (gain = frequency × + /// length, with an 8× boost for single bytes, per the paper's heuristic) and + /// every concatenation of an adjacent symbol pair that fits in + /// [`FSST_MAX_SYMBOL_LEN`] bytes (gain = pair frequency × concatenated + /// length). The [`FSST_MAX_SYMBOLS`] highest-gain candidates are kept. + fn select_symbols( + current: &[Vec], + count1: &HashMap, + count2: &HashMap<(u16, u16), u64>, + ) -> Vec> { + // A `code` is the byte value `0..256` (a literal) or `256 + symbol_index`. + let code_bytes = |code: u16| -> Vec { + if (code as usize) < 256 { + vec![code as u8] + } else { + current[code as usize - 256].clone() + } + }; + + let mut candidates: HashMap, u64> = HashMap::new(); + for (&code, &freq) in count1 { + let bytes = code_bytes(code); + let mut gain = freq * bytes.len() as u64; + if bytes.len() == 1 { + gain *= 8; + } + *candidates.entry(bytes).or_default() += gain; + } + for (&(prev, code), &freq) in count2 { + let mut bytes = code_bytes(prev); + bytes.extend_from_slice(&code_bytes(code)); + if bytes.len() <= FSST_MAX_SYMBOL_LEN { + let gain = freq * bytes.len() as u64; + *candidates.entry(bytes).or_default() += gain; + } + } + + let mut ranked: Vec<(Vec, u64)> = candidates.into_iter().collect(); + // Highest gain first; break ties toward longer symbols for determinism. + ranked.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| b.0.len().cmp(&a.0.len()))); + ranked.truncate(FSST_MAX_SYMBOLS); + ranked.into_iter().map(|(bytes, _)| bytes).collect() + } + + /// Find the longest symbol that is a prefix of `input`, returning its code + /// (`< 256` for a single-byte literal, `256 + index` for a table symbol) and + /// the number of input bytes it consumes. Used during training. + fn longest_code(&self, input: &[u8]) -> (u16, usize) { + match self.longest_symbol(input) { + Some((idx, len)) => (256 + idx as u16, len), + None => (input[0] as u16, 1), + } + } + + /// Find the longest table symbol that is a prefix of `input`, returning its + /// index and length, or `None` if no symbol matches. + fn longest_symbol(&self, input: &[u8]) -> Option<(u8, usize)> { + let max = input.len().min(FSST_MAX_SYMBOL_LEN); + for len in (1..=max).rev() { + if let Some(&idx) = self.lookup[len].get(&pack(&input[..len])) { + return Some((idx, len)); + } + } + None + } + + /// Compress `input`, appending the encoded bytes to `out`. + pub(crate) fn compress(&self, input: &[u8], out: &mut Vec) { + let mut i = 0; + while i < input.len() { + match self.longest_symbol(&input[i..]) { + Some((idx, len)) => { + out.push(idx); + i += len; + } + None => { + out.push(FSST_ESCAPE); + out.push(input[i]); + i += 1; + } + } + } + } + + /// Decompress `input`, appending the decoded bytes to `out`. + pub(crate) fn decompress(&self, input: &[u8], out: &mut Vec) -> Result<()> { + let mut i = 0; + while i < input.len() { + let code = input[i]; + i += 1; + if code == FSST_ESCAPE { + let lit = *input + .get(i) + .ok_or_else(|| general_err!("FSST: dangling escape at end of stream"))?; + out.push(lit); + i += 1; + } else { + let symbol = self + .symbols + .get(code as usize) + .ok_or_else(|| general_err!("FSST: code {} not in symbol table", code))?; + out.extend_from_slice(symbol); + } + } + Ok(()) + } + + /// Serialize the table as a header that precedes the compressed data. + /// + /// Layout: `num_symbols: u8`, then for each symbol `len: u8` followed by + /// `len` bytes. + pub(crate) fn serialize(&self, out: &mut Vec) { + debug_assert!(self.symbols.len() <= FSST_MAX_SYMBOLS); + out.push(self.symbols.len() as u8); + for symbol in &self.symbols { + debug_assert!(symbol.len() <= FSST_MAX_SYMBOL_LEN); + out.push(symbol.len() as u8); + out.extend_from_slice(symbol); + } + } + + /// Deserialize a table from the front of `data`, returning the table and the + /// number of bytes consumed. + pub(crate) fn deserialize(data: &[u8]) -> Result<(Self, usize)> { + let mut i = 0; + let count = *data + .get(i) + .ok_or_else(|| general_err!("FSST: missing symbol-table header"))? as usize; + i += 1; + let mut symbols = Vec::with_capacity(count); + for _ in 0..count { + let len = *data + .get(i) + .ok_or_else(|| general_err!("FSST: truncated symbol length"))? as usize; + i += 1; + let end = i + len; + let symbol = data + .get(i..end) + .ok_or_else(|| general_err!("FSST: truncated symbol data"))? + .to_vec(); + symbols.push(symbol); + i = end; + } + Ok((Self::with_symbols(symbols), i)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn roundtrip(table: &SymbolTable, input: &[u8]) -> Vec { + let mut compressed = Vec::new(); + table.compress(input, &mut compressed); + let mut decompressed = Vec::new(); + table.decompress(&compressed, &mut decompressed).unwrap(); + assert_eq!(decompressed, input, "roundtrip mismatch"); + compressed + } + + #[test] + fn empty_table_roundtrips_everything() { + let table = SymbolTable::train(std::iter::empty()); + let inputs: &[&[u8]] = &[b"", b"hello", b"parquet", &[0u8, 255, 1, 254]]; + for input in inputs { + roundtrip(&table, input); + } + } + + #[test] + fn trained_table_compresses_repetitive_data() { + let corpus: Vec<&[u8]> = (0..256).map(|_| b"http://example.com/".as_slice()).collect(); + let table = SymbolTable::train(corpus.iter().copied()); + + let input = b"http://example.com/path"; + let compressed = roundtrip(&table, input); + assert!( + compressed.len() < input.len(), + "expected compression, got {} >= {}", + compressed.len(), + input.len() + ); + } + + #[test] + fn trained_table_roundtrips_unseen_bytes() { + // Bytes never seen in training must still round-trip via escapes. + let table = SymbolTable::train([b"aaaaaaaa".as_slice(), b"bbbbbbbb".as_slice()]); + roundtrip(&table, &[0u8, 1, 2, 250, 255]); + roundtrip(&table, b"aaaabbbb\x00\xff"); + } + + #[test] + fn serialize_roundtrip_preserves_decoding() { + let table = SymbolTable::train([b"the quick brown fox".as_slice()]); + let input = b"the fox"; + let mut compressed = Vec::new(); + table.compress(input, &mut compressed); + + let mut header = Vec::new(); + table.serialize(&mut header); + let (restored, consumed) = SymbolTable::deserialize(&header).unwrap(); + assert_eq!(consumed, header.len()); + + let mut decompressed = Vec::new(); + restored.decompress(&compressed, &mut decompressed).unwrap(); + assert_eq!(decompressed, input); + } + + #[test] + fn decompress_rejects_dangling_escape() { + let table = SymbolTable::default(); + let mut out = Vec::new(); + assert!(table.decompress(&[FSST_ESCAPE], &mut out).is_err()); + } +} diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs index 894c4fb961ee..247d221e6afa 100644 --- a/parquet/src/encodings/mod.rs +++ b/parquet/src/encodings/mod.rs @@ -17,5 +17,6 @@ pub mod decoding; pub mod encoding; +pub(crate) mod fsst; pub mod levels; experimental!(pub(crate) mod rle); From 009fa4ef31c150614090e0269323d6385dbbbe0a Mon Sep 17 00:00:00 2001 From: devanbenz Date: Tue, 16 Jun 2026 19:33:26 -0500 Subject: [PATCH 2/3] feat: harden encoder work per ALP review Apply patterns from the ALP encoder work (apache/arrow-rs#9372 follow-up) - Rejects pages with more than i32::MAX values in flush_buffer - Add pre-size for flush_buffer - Replace magic length prefix with a constant --- .../src/encodings/decoding/fsst_decoder.rs | 4 ++-- .../src/encodings/encoding/fsst_encoder.rs | 17 ++++++++++++++-- parquet/src/encodings/fsst.rs | 20 ++++++++++++++++++- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/parquet/src/encodings/decoding/fsst_decoder.rs b/parquet/src/encodings/decoding/fsst_decoder.rs index 2e8c707077c3..1783a2d1741a 100644 --- a/parquet/src/encodings/decoding/fsst_decoder.rs +++ b/parquet/src/encodings/decoding/fsst_decoder.rs @@ -23,7 +23,7 @@ use crate::basic::{Encoding, Type}; use crate::data_type::DataType; use crate::data_type::private::ParquetValueType; use crate::encodings::decoding::Decoder; -use crate::encodings::fsst::SymbolTable; +use crate::encodings::fsst::{FSST_LENGTH_PREFIX_BYTES, SymbolTable}; use crate::errors::{ParquetError, Result}; /// Decoder for the [`FSST`](Encoding::FSST) encoding. @@ -63,7 +63,7 @@ impl FsstDecoder { /// Reads the little-endian `u32` length prefix at the current offset and /// advances past it, returning the length of the next compressed value. fn read_len(&mut self) -> Result { - let end = self.offset + 4; + let end = self.offset + FSST_LENGTH_PREFIX_BYTES; let bytes = self .data .get(self.offset..end) diff --git a/parquet/src/encodings/encoding/fsst_encoder.rs b/parquet/src/encodings/encoding/fsst_encoder.rs index b51aebca01c1..dac8ff7eb3fb 100644 --- a/parquet/src/encodings/encoding/fsst_encoder.rs +++ b/parquet/src/encodings/encoding/fsst_encoder.rs @@ -23,7 +23,7 @@ use crate::basic::{Encoding, Type}; use crate::data_type::private::ParquetValueType; use crate::data_type::{ByteArray, DataType}; use crate::encodings::encoding::Encoder; -use crate::encodings::fsst::SymbolTable; +use crate::encodings::fsst::{FSST_LENGTH_PREFIX_BYTES, SymbolTable}; use crate::errors::{ParquetError, Result}; /// Encoder for the [`FSST`](Encoding::FSST) encoding. @@ -100,10 +100,23 @@ impl Encoder for FsstEncoder { if T::get_physical_type() != Type::BYTE_ARRAY { return Err(general_err!("FsstEncoder only supports ByteArrayType")); } + // Parquet counts values per page with an i32, so the encoder cannot + // produce more than that in a single flush. + if self.values.len() > i32::MAX as usize { + return Err(general_err!( + "FSST can encode at most i32::MAX values, got {}", + self.values.len() + )); + } let table = SymbolTable::train(self.values.iter().map(|v| v.data())); - let mut out = Vec::with_capacity(self.buffered_bytes + self.values.len() * 4 + 8); + // Pre-size to the known header layout plus a per-value length prefix; the + // compressed payload usually fits within the buffered raw size. + let capacity = table.serialized_size() + + self.values.len() * FSST_LENGTH_PREFIX_BYTES + + self.buffered_bytes; + let mut out = Vec::with_capacity(capacity); table.serialize(&mut out); let mut compressed = Vec::new(); diff --git a/parquet/src/encodings/fsst.rs b/parquet/src/encodings/fsst.rs index c5f7b656d727..3d48e9e2d4de 100644 --- a/parquet/src/encodings/fsst.rs +++ b/parquet/src/encodings/fsst.rs @@ -50,6 +50,10 @@ pub(crate) const FSST_MAX_SYMBOLS: usize = 255; /// Maximum length, in bytes, of a single symbol. pub(crate) const FSST_MAX_SYMBOL_LEN: usize = 8; +/// Width, in bytes, of the little-endian length prefix written before each +/// compressed value in a flushed page. +pub(crate) const FSST_LENGTH_PREFIX_BYTES: usize = std::mem::size_of::(); + /// Number of passes used to grow the symbol table during training. const TRAINING_GENERATIONS: usize = 5; @@ -252,10 +256,16 @@ impl SymbolTable { Ok(()) } + /// Number of bytes [`serialize`](Self::serialize) will append, so callers + /// can pre-size their output buffer to the exact header layout. + pub(crate) fn serialized_size(&self) -> usize { + 1 + self.symbols.iter().map(|s| 1 + s.len()).sum::() + } + /// Serialize the table as a header that precedes the compressed data. /// /// Layout: `num_symbols: u8`, then for each symbol `len: u8` followed by - /// `len` bytes. + /// `len` bytes. See [`serialized_size`](Self::serialized_size). pub(crate) fn serialize(&self, out: &mut Vec) { debug_assert!(self.symbols.len() <= FSST_MAX_SYMBOLS); out.push(self.symbols.len() as u8); @@ -354,6 +364,14 @@ mod tests { assert_eq!(decompressed, input); } + #[test] + fn serialized_size_matches_serialized_len() { + let table = SymbolTable::train([b"the quick brown fox".as_slice()]); + let mut buf = Vec::new(); + table.serialize(&mut buf); + assert_eq!(buf.len(), table.serialized_size()); + } + #[test] fn decompress_rejects_dangling_escape() { let table = SymbolTable::default(); From 41f6ca44b0b02aa65ec462589cc26408110bfb2f Mon Sep 17 00:00:00 2001 From: devanbenz Date: Tue, 16 Jun 2026 20:13:29 -0500 Subject: [PATCH 3/3] feat: Align FSST encoder/decoder to parquet spec proposal --- .../src/encodings/decoding/fsst_decoder.rs | 118 ++++++++++++----- .../src/encodings/encoding/fsst_encoder.rs | 51 ++++--- parquet/src/encodings/fsst.rs | 124 +++++++++++++----- 3 files changed, 211 insertions(+), 82 deletions(-) diff --git a/parquet/src/encodings/decoding/fsst_decoder.rs b/parquet/src/encodings/decoding/fsst_decoder.rs index 1783a2d1741a..7c71c15d8908 100644 --- a/parquet/src/encodings/decoding/fsst_decoder.rs +++ b/parquet/src/encodings/decoding/fsst_decoder.rs @@ -20,10 +20,10 @@ use std::marker::PhantomData; use bytes::Bytes; use crate::basic::{Encoding, Type}; -use crate::data_type::DataType; use crate::data_type::private::ParquetValueType; -use crate::encodings::decoding::Decoder; -use crate::encodings::fsst::{FSST_LENGTH_PREFIX_BYTES, SymbolTable}; +use crate::data_type::{DataType, Int32Type}; +use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder}; +use crate::encodings::fsst::{FSST_LENGTH_ENCODING_DELTA, SymbolTable, read_uleb128}; use crate::errors::{ParquetError, Result}; /// Decoder for the [`FSST`](Encoding::FSST) encoding. @@ -31,14 +31,16 @@ use crate::errors::{ParquetError, Result}; /// See [`FsstEncoder`](crate::encodings::encoding::fsst_encoder::FsstEncoder) /// for the page layout. Only [`Type::BYTE_ARRAY`] is supported. pub struct FsstDecoder { - /// Symbol table parsed from the page header. + /// Symbol table parsed from the page. table: SymbolTable, - /// Compressed payload following the symbol-table header. + /// Concatenated compressed payload (section 4). data: Bytes, - /// Offset into `data` pointing at the next value's length prefix. + /// Per-value compressed lengths (section 2), decoded up front. + lengths: Vec, + /// Index of the next value to produce. + cursor: usize, + /// Byte offset into `data` of the value at `cursor`. offset: usize, - /// Number of values still to be produced. - num_values: usize, _phantom: PhantomData, } @@ -54,24 +56,12 @@ impl FsstDecoder { Self { table: SymbolTable::default(), data: Bytes::new(), + lengths: Vec::new(), + cursor: 0, offset: 0, - num_values: 0, _phantom: PhantomData, } } - - /// Reads the little-endian `u32` length prefix at the current offset and - /// advances past it, returning the length of the next compressed value. - fn read_len(&mut self) -> Result { - let end = self.offset + FSST_LENGTH_PREFIX_BYTES; - let bytes = self - .data - .get(self.offset..end) - .ok_or_else(|| general_err!("FSST: truncated length prefix"))?; - let len = u32::from_le_bytes(bytes.try_into().unwrap()) as usize; - self.offset = end; - Ok(len) - } } impl Decoder for FsstDecoder { @@ -79,19 +69,51 @@ impl Decoder for FsstDecoder { if T::get_physical_type() != Type::BYTE_ARRAY { return Err(general_err!("FsstDecoder only supports ByteArrayType")); } - let (table, consumed) = SymbolTable::deserialize(&data)?; + + // Section 1: header. + let mut pos = 0; + let length_encoding = read_uleb128(&data, &mut pos)?; + if length_encoding != FSST_LENGTH_ENCODING_DELTA as u64 { + return Err(general_err!( + "FSST: unsupported length-array encoding {length_encoding}" + )); + } + let length_size = read_uleb128(&data, &mut pos)? as usize; + let symbol_size = read_uleb128(&data, &mut pos)? as usize; + + let length_end = pos + length_size; + let symbol_end = length_end + symbol_size; + let symbol_bytes = data + .get(length_end..symbol_end) + .ok_or_else(|| general_err!("FSST: truncated symbol table"))?; + + // Section 2: length array, Delta-Binary-Packed. + let mut lengths = vec![0i32; num_values]; + if num_values > 0 { + let length_bytes = data + .slice(pos..length_end); + let mut length_decoder = DeltaBitPackDecoder::::new(); + length_decoder.set_data(length_bytes, num_values)?; + length_decoder.get(&mut lengths)?; + } + + // Section 3: symbol table. + let (table, _) = SymbolTable::deserialize(symbol_bytes)?; + + // Section 4: compressed payload. self.table = table; - self.data = data.slice(consumed..); + self.data = data.slice(symbol_end..); + self.lengths = lengths; + self.cursor = 0; self.offset = 0; - self.num_values = num_values; Ok(()) } fn get(&mut self, buffer: &mut [T::T]) -> Result { - let to_read = buffer.len().min(self.num_values); + let to_read = buffer.len().min(self.lengths.len() - self.cursor); let mut decompressed = Vec::new(); for item in buffer.iter_mut().take(to_read) { - let len = self.read_len()?; + let len = self.lengths[self.cursor] as usize; let end = self.offset + len; let compressed = self .data @@ -99,15 +121,15 @@ impl Decoder for FsstDecoder { .ok_or_else(|| general_err!("FSST: truncated compressed value"))?; decompressed.clear(); self.table.decompress(compressed, &mut decompressed)?; - self.offset = end; item.set_from_bytes(Bytes::copy_from_slice(&decompressed)); + self.offset = end; + self.cursor += 1; } - self.num_values -= to_read; Ok(to_read) } fn values_left(&self) -> usize { - self.num_values + self.lengths.len() - self.cursor } #[cold] @@ -116,18 +138,19 @@ impl Decoder for FsstDecoder { } fn skip(&mut self, num_values: usize) -> Result { - let to_skip = num_values.min(self.num_values); + let to_skip = num_values.min(self.lengths.len() - self.cursor); for _ in 0..to_skip { - let len = self.read_len()?; - self.offset += len; + self.offset += self.lengths[self.cursor] as usize; + self.cursor += 1; } - self.num_values -= to_skip; Ok(to_skip) } } #[cfg(test)] mod tests { + use bytes::Bytes; + use crate::basic::Encoding; use crate::data_type::{ByteArray, ByteArrayType}; use crate::encodings::decoding::Decoder; @@ -161,6 +184,33 @@ mod tests { assert_eq!(decoder.values_left(), 0); } + #[test] + fn roundtrip_many_values_exercises_length_array() { + let values: Vec = (0..1000) + .map(|i| ByteArray::from(format!("https://example.com/item/{i}").as_str())) + .collect(); + + let mut encoder = FsstEncoder::::new(); + encoder.put(&values).unwrap(); + let buffer = encoder.flush_buffer().unwrap(); + + let mut decoder = FsstDecoder::::new(); + decoder.set_data(buffer, values.len()).unwrap(); + + let mut out = vec![ByteArray::default(); values.len()]; + assert_eq!(decoder.get(&mut out).unwrap(), values.len()); + assert_eq!(out, values); + } + + #[test] + fn rejects_unknown_length_encoding() { + // A header whose first ULEB128 (length-array encoding id) is not the + // expected Delta-Binary-Packed id must be rejected. + let bogus = Bytes::from_static(&[99, 0, 0]); + let mut decoder = FsstDecoder::::new(); + assert!(decoder.set_data(bogus, 0).is_err()); + } + #[test] fn skip_values() { let values: Vec = diff --git a/parquet/src/encodings/encoding/fsst_encoder.rs b/parquet/src/encodings/encoding/fsst_encoder.rs index dac8ff7eb3fb..18de96451cdd 100644 --- a/parquet/src/encodings/encoding/fsst_encoder.rs +++ b/parquet/src/encodings/encoding/fsst_encoder.rs @@ -21,9 +21,9 @@ use bytes::Bytes; use crate::basic::{Encoding, Type}; use crate::data_type::private::ParquetValueType; -use crate::data_type::{ByteArray, DataType}; -use crate::encodings::encoding::Encoder; -use crate::encodings::fsst::{FSST_LENGTH_PREFIX_BYTES, SymbolTable}; +use crate::data_type::{ByteArray, DataType, Int32Type}; +use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder}; +use crate::encodings::fsst::{FSST_LENGTH_ENCODING_DELTA, SymbolTable, write_uleb128}; use crate::errors::{ParquetError, Result}; /// Encoder for the [`FSST`](Encoding::FSST) encoding. @@ -31,10 +31,12 @@ use crate::errors::{ParquetError, Result}; /// Values are buffered until [`flush_buffer`](Encoder::flush_buffer), at which /// point a [`SymbolTable`] is trained over them and each value is compressed. /// -/// The flushed page layout is: -/// 1. the serialized [`SymbolTable`] header, followed by -/// 2. for each value: a little-endian `u32` compressed length, then the -/// compressed bytes. +/// The flushed page has four contiguous sections: +/// 1. a header of three ULEB128 values — the length-array encoding id, the +/// length-array byte size, and the symbol-table byte size; +/// 2. the per-value compressed lengths, Delta-Binary-Packed; +/// 3. the serialized [`SymbolTable`]; and +/// 4. the concatenated compressed values (boundaries come from the lengths). /// /// Only [`Type::BYTE_ARRAY`] is supported. pub struct FsstEncoder { @@ -111,22 +113,37 @@ impl Encoder for FsstEncoder { let table = SymbolTable::train(self.values.iter().map(|v| v.data())); - // Pre-size to the known header layout plus a per-value length prefix; the - // compressed payload usually fits within the buffered raw size. - let capacity = table.serialized_size() - + self.values.len() * FSST_LENGTH_PREFIX_BYTES - + self.buffered_bytes; - let mut out = Vec::with_capacity(capacity); - table.serialize(&mut out); - + // Compress each value, concatenating the payload and collecting the + // per-value compressed lengths for the length array. + let mut payload = Vec::with_capacity(self.buffered_bytes); + let mut lengths: Vec = Vec::with_capacity(self.values.len()); let mut compressed = Vec::new(); for value in &self.values { compressed.clear(); table.compress(value.data(), &mut compressed); - out.extend_from_slice(&(compressed.len() as u32).to_le_bytes()); - out.extend_from_slice(&compressed); + lengths.push(compressed.len() as i32); + payload.extend_from_slice(&compressed); } + // Section 2: length array, Delta-Binary-Packed. + let mut length_encoder = DeltaBitPackEncoder::::new(); + length_encoder.put(&lengths)?; + let length_bytes = length_encoder.flush_buffer()?; + + // Section 3: symbol table. + let mut symbol_bytes = Vec::with_capacity(table.serialized_size()); + table.serialize(&mut symbol_bytes); + + // Section 1 (header) + sections 2-4, contiguous. + let mut out = + Vec::with_capacity(8 + length_bytes.len() + symbol_bytes.len() + payload.len()); + write_uleb128(&mut out, FSST_LENGTH_ENCODING_DELTA as u64); + write_uleb128(&mut out, length_bytes.len() as u64); + write_uleb128(&mut out, symbol_bytes.len() as u64); + out.extend_from_slice(&length_bytes); + out.extend_from_slice(&symbol_bytes); + out.extend_from_slice(&payload); + self.values.clear(); self.buffered_bytes = 0; Ok(out.into()) diff --git a/parquet/src/encodings/fsst.rs b/parquet/src/encodings/fsst.rs index 3d48e9e2d4de..240923c87d8f 100644 --- a/parquet/src/encodings/fsst.rs +++ b/parquet/src/encodings/fsst.rs @@ -50,9 +50,17 @@ pub(crate) const FSST_MAX_SYMBOLS: usize = 255; /// Maximum length, in bytes, of a single symbol. pub(crate) const FSST_MAX_SYMBOL_LEN: usize = 8; -/// Width, in bytes, of the little-endian length prefix written before each -/// compressed value in a flushed page. -pub(crate) const FSST_LENGTH_PREFIX_BYTES: usize = std::mem::size_of::(); +/// FSST symbol-table format version, stored in the upper 32 bits of the leading +/// `u64` of a serialized symbol table. +const FSST_VERSION: u64 = 20190218; + +/// Fixed-size prelude of a serialized symbol table: version (`u64`) + +/// zero-terminated flag (`u8`) + length histogram (`[u8; 8]`). +const FSST_SYMBOL_TABLE_PRELUDE_LEN: usize = 8 + 1 + 8; + +/// Identifier written in the page header for the encoding used by the length +/// array. Matches `Encoding::DELTA_BINARY_PACKED`'s discriminant. +pub(crate) const FSST_LENGTH_ENCODING_DELTA: u8 = 5; /// Number of passes used to grow the symbol table during training. const TRAINING_GENERATIONS: usize = 5; @@ -69,6 +77,41 @@ fn pack(bytes: &[u8]) -> u64 { u64::from_le_bytes(buf) } +/// Append `value` to `out` as unsigned LEB128. +pub(crate) fn write_uleb128(out: &mut Vec, mut value: u64) { + loop { + let byte = (value & 0x7f) as u8; + value >>= 7; + if value != 0 { + out.push(byte | 0x80); + } else { + out.push(byte); + return; + } + } +} + +/// Read an unsigned LEB128 value from `data` starting at `*pos`, advancing +/// `*pos` past the consumed bytes. +pub(crate) fn read_uleb128(data: &[u8], pos: &mut usize) -> Result { + let mut result: u64 = 0; + let mut shift = 0; + loop { + let byte = *data + .get(*pos) + .ok_or_else(|| general_err!("FSST: truncated ULEB128 value"))?; + *pos += 1; + result |= ((byte & 0x7f) as u64) << shift; + if byte & 0x80 == 0 { + return Ok(result); + } + shift += 7; + if shift >= 64 { + return Err(general_err!("FSST: ULEB128 value overflows u64")); + } + } +} + /// A static symbol table mapping 1-byte codes to byte strings. #[derive(Debug, Clone)] pub(crate) struct SymbolTable { @@ -144,6 +187,10 @@ impl SymbolTable { symbols = Self::select_symbols(&symbols, &count1, &count2); } + // The serialized symbol table groups symbols by length, so a code is + // implicitly its position in length order. Sort here (stably) so the + // codes used for compression match what `deserialize` reconstructs. + symbols.sort_by_key(|s| s.len()); Self::with_symbols(symbols) } @@ -257,46 +304,61 @@ impl SymbolTable { } /// Number of bytes [`serialize`](Self::serialize) will append, so callers - /// can pre-size their output buffer to the exact header layout. + /// can pre-size their output buffer and record the symbol-table size. pub(crate) fn serialized_size(&self) -> usize { - 1 + self.symbols.iter().map(|s| 1 + s.len()).sum::() + FSST_SYMBOL_TABLE_PRELUDE_LEN + self.symbols.iter().map(|s| s.len()).sum::() } - /// Serialize the table as a header that precedes the compressed data. + /// Serialize the table in the FSST symbol-table format: /// - /// Layout: `num_symbols: u8`, then for each symbol `len: u8` followed by - /// `len` bytes. See [`serialized_size`](Self::serialized_size). + /// 1. version (`u64` little-endian; the version number occupies the upper 32 bits), + /// 2. zero-terminated flag (`u8`; always `0` here), + /// 3. length histogram (`[u8; 8]`; `histogram[len - 1]` is the number of + /// symbols of that length), then + /// 4. the symbol bytes, grouped by ascending length. + /// + /// Relies on `self.symbols` being sorted by length (see [`train`](Self::train)). pub(crate) fn serialize(&self, out: &mut Vec) { debug_assert!(self.symbols.len() <= FSST_MAX_SYMBOLS); - out.push(self.symbols.len() as u8); + + out.extend_from_slice(&(FSST_VERSION << 32).to_le_bytes()); + out.push(0); // not zero-terminated + + let mut histogram = [0u8; 8]; + for symbol in &self.symbols { + debug_assert!((1..=FSST_MAX_SYMBOL_LEN).contains(&symbol.len())); + histogram[symbol.len() - 1] += 1; + } + out.extend_from_slice(&histogram); + for symbol in &self.symbols { - debug_assert!(symbol.len() <= FSST_MAX_SYMBOL_LEN); - out.push(symbol.len() as u8); out.extend_from_slice(symbol); } } - /// Deserialize a table from the front of `data`, returning the table and the - /// number of bytes consumed. + /// Deserialize a table written by [`serialize`](Self::serialize), returning + /// the table and the number of bytes consumed. pub(crate) fn deserialize(data: &[u8]) -> Result<(Self, usize)> { - let mut i = 0; - let count = *data - .get(i) - .ok_or_else(|| general_err!("FSST: missing symbol-table header"))? as usize; - i += 1; - let mut symbols = Vec::with_capacity(count); - for _ in 0..count { - let len = *data - .get(i) - .ok_or_else(|| general_err!("FSST: truncated symbol length"))? as usize; - i += 1; - let end = i + len; - let symbol = data - .get(i..end) - .ok_or_else(|| general_err!("FSST: truncated symbol data"))? - .to_vec(); - symbols.push(symbol); - i = end; + let prelude = data + .get(..FSST_SYMBOL_TABLE_PRELUDE_LEN) + .ok_or_else(|| general_err!("FSST: truncated symbol-table prelude"))?; + // Bytes 0..8 are the version and 8 the zero-terminated flag; neither + // affects decoding of the symbols below. + let histogram = &prelude[9..17]; + + let mut symbols = Vec::new(); + let mut i = FSST_SYMBOL_TABLE_PRELUDE_LEN; + for (idx, &count) in histogram.iter().enumerate() { + let len = idx + 1; + for _ in 0..count { + let end = i + len; + let symbol = data + .get(i..end) + .ok_or_else(|| general_err!("FSST: truncated symbol data"))? + .to_vec(); + symbols.push(symbol); + i = end; + } } Ok((Self::with_symbols(symbols), i)) }