From f0b6d458f24b37d7478eddd03e7cb3192f2de062 Mon Sep 17 00:00:00 2001 From: lemolatoon <63438515+lemolatoon@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:04:30 -0500 Subject: [PATCH 1/2] Revert "Split rewind implementation, and remove it if there's overhead" This reverts commit 0e2ddf078fa2a098f43bb8d1d7a859efef457ab6. --- ebi/src/compressor.rs | 2 -- ebi/src/compressor/gorilla.rs | 37 ++++++++++++++++++++++++++++++ ebi/src/compressor/run_length.rs | 4 +--- ebi/src/compressor/uncompressed.rs | 4 +--- 4 files changed, 39 insertions(+), 8 deletions(-) diff --git a/ebi/src/compressor.rs b/ebi/src/compressor.rs index 015c61b..7d2dcce 100644 --- a/ebi/src/compressor.rs +++ b/ebi/src/compressor.rs @@ -42,9 +42,7 @@ pub trait AppendableCompressor: Compressor { /// This method is NOT re-compressing the existing data. /// Returns the total size of the compressed data. fn append_compress(&mut self, input: &[f64]); -} -pub trait RewindableCompressor: AppendableCompressor { /// Rewind the n records from the end of the compressed data /// Returns true if the rewind is successful, false otherwise. fn rewind(&mut self, n: usize) -> bool; diff --git a/ebi/src/compressor/gorilla.rs b/ebi/src/compressor/gorilla.rs index 4850a66..d6b5d9d 100644 --- a/ebi/src/compressor/gorilla.rs +++ b/ebi/src/compressor/gorilla.rs @@ -64,6 +64,14 @@ impl AppendableCompressor for GorillaCompressor { self.encoder.encode(*value); } } + + fn rewind(&mut self, n: usize) -> bool { + if n != 1 { + return false; + } + + self.encoder.rewind() + } } pub(crate) mod modified_tsz { @@ -138,6 +146,7 @@ pub(crate) mod modified_tsz { /// StdEncoder is used to encode `DataPoint`s #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct GorillaFloatEncoder { + prev_state: Option, state: GorillaFloatEncoderState, w: BufferedWriterExt, @@ -148,6 +157,7 @@ pub(crate) mod modified_tsz { /// bytes to `w` pub fn new(w: BufferedWriterExt) -> Self { Self { + prev_state: None, state: GorillaFloatEncoderState::new(), w, } @@ -221,6 +231,7 @@ pub(crate) mod modified_tsz { impl GorillaFloatEncoder { pub fn encode(&mut self, value: f64) { + let prev_state = self.state; let value_bits = value.to_bits(); self.state.total_bytes_in += 8; @@ -231,6 +242,19 @@ pub(crate) mod modified_tsz { } else { self.write_next_value(value_bits) } + + self.prev_state = Some(prev_state); + } + + pub fn rewind(&mut self) -> bool { + if let Some(prev_state) = self.prev_state { + self.state = prev_state; + self.prev_state = None; + self.w.set_cursor_at(self.state.total_bits_buffered as u32); + true + } else { + false + } } /// Returns the total number of bytes of the compressed data if the stream were to be closed @@ -258,6 +282,7 @@ pub(crate) mod modified_tsz { /// Reset the encoder pub fn reset(&mut self) { self.state = GorillaFloatEncoderState::new(); + self.prev_state = None; self.w.reset(); } @@ -319,6 +344,18 @@ pub(crate) mod modified_tsz { fn last_index(&self) -> usize { self.buf.len() - 1 } + + fn set_cursor_at(&mut self, n_total_bits: u32) { + let n_bytes = (n_total_bits + 7) / 8; + let n_bits = n_total_bits % 8; + + self.buf.truncate(n_bytes as usize); + if n_bits == 0 { + self.pos = 8; + } else { + self.pos = n_bits; + } + } } impl Write for BufferedWriterExt { diff --git a/ebi/src/compressor/run_length.rs b/ebi/src/compressor/run_length.rs index 3df8991..e37cd9f 100644 --- a/ebi/src/compressor/run_length.rs +++ b/ebi/src/compressor/run_length.rs @@ -8,7 +8,7 @@ use crate::format::{ serialize::{AsBytes, ToLe}, }; -use super::{AppendableCompressor, Compressor, RewindableCompressor}; +use super::{AppendableCompressor, Compressor}; /// Run Length Encoding (RLE) Compression Scheme /// Chunk Layout: @@ -150,9 +150,7 @@ impl AppendableCompressor for RunLengthCompressor { self.total_bytes_in += n_bytes_compressed; } -} -impl RewindableCompressor for RunLengthCompressor { fn rewind(&mut self, n: usize) -> bool { let mut remaining_records = n; if self.previous_run_count == 0 { diff --git a/ebi/src/compressor/uncompressed.rs b/ebi/src/compressor/uncompressed.rs index 567e937..f73d561 100644 --- a/ebi/src/compressor/uncompressed.rs +++ b/ebi/src/compressor/uncompressed.rs @@ -8,7 +8,7 @@ use crate::format::{ uncompressed::UncompressedHeader0, }; -use super::{AppendableCompressor, Compressor, RewindableCompressor, MAX_BUFFERS}; +use super::{AppendableCompressor, Compressor, MAX_BUFFERS}; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct UncompressedCompressor { @@ -93,9 +93,7 @@ impl AppendableCompressor for UncompressedCompressor { let size = size_of_val(input); self.total_bytes_in += size; } -} -impl RewindableCompressor for UncompressedCompressor { fn rewind(&mut self, n: usize) -> bool { if self.buffer.len() < n { return false; From 4e5e617d5fde070082b1a0e9ba3accc3430c86a1 Mon Sep 17 00:00:00 2001 From: lemolatoon <63438515+lemolatoon@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:04:40 -0500 Subject: [PATCH 2/2] Revert "Remove SizeEstimator related codes" This reverts commit 5ac6612905c8ef7049a20b47516b760a8abf866b. --- ebi/src/compressor.rs | 116 +++++- ebi/src/compressor/buff.rs | 170 ++++++++- ebi/src/compressor/gorilla.rs | 16 +- ebi/src/compressor/run_length.rs | 17 +- ebi/src/compressor/size_estimater.rs | 514 +++++++++++++++++++++++++++ ebi/src/compressor/uncompressed.rs | 23 +- ebi/src/encoder.rs | 94 +++-- ebi/src/format/native.rs | 2 +- ebi/src/main.rs | 2 +- ebi/tests/query.rs | 2 +- ebi/tests/round_trip.rs | 19 +- 11 files changed, 932 insertions(+), 43 deletions(-) create mode 100644 ebi/src/compressor/size_estimater.rs diff --git a/ebi/src/compressor.rs b/ebi/src/compressor.rs index 7d2dcce..b72e0bd 100644 --- a/ebi/src/compressor.rs +++ b/ebi/src/compressor.rs @@ -1,8 +1,11 @@ -use buff::BUFFCompressor; +use buff::{BUFFCompressor, BUFFSizeEstimator}; use derive_builder::Builder; use gorilla::GorillaCompressor; use quick_impl::QuickImpl; use run_length::RunLengthCompressor; +use size_estimater::{ + AppendCompressingSizeEstimator, EstimateOption, SizeEstimator, StaticSizeEstimator, +}; // use run_length::RunLengthCompressor; use uncompressed::UncompressedCompressor; @@ -11,13 +14,35 @@ use crate::format::{self, CompressionScheme}; pub mod buff; pub mod gorilla; pub mod run_length; +pub mod size_estimater; pub mod uncompressed; const MAX_BUFFERS: usize = 5; pub trait Compressor { + type SizeEstimatorImpl<'comp, 'buf>: SizeEstimator + Sized + where + Self: 'comp; + /// Perform the compression and return the size of the compressed data. fn compress(&mut self, input: &[f64]); + /// If the size of the compressed data is known by O(1) operation, return it. + fn estimate_size_static( + &self, + _number_of_records: usize, + _estimate_option: EstimateOption, + ) -> Option { + None + } + + fn size_estimator<'comp, 'buf>( + &'comp mut self, + _input: &'buf [f64], + _estimate_option: EstimateOption, + ) -> Option> { + None + } + /// Returns the total number of input bytes which have been processed by this Compressor. fn total_bytes_in(&self) -> usize; @@ -56,9 +81,78 @@ pub enum GenericCompressor { BUFF(BUFFCompressor), } +#[derive(Debug, PartialEq, PartialOrd, QuickImpl)] +pub enum GenericSizeEstimator<'comp, 'buf> { + #[quick_impl(impl From)] + Uncompressed(StaticSizeEstimator<'comp, 'buf, UncompressedCompressor>), + #[quick_impl(impl From)] + RLE(AppendCompressingSizeEstimator<'comp, 'buf, RunLengthCompressor>), + #[quick_impl(impl From)] + Gorilla(AppendCompressingSizeEstimator<'comp, 'buf, GorillaCompressor>), + #[quick_impl(impl From)] + BUFF(BUFFSizeEstimator<'comp, 'buf>), +} + +macro_rules! impl_generic_size_estimator { + ($enum_name:ident, $($variant:ident),*) => { + impl<'comp, 'buf> SizeEstimator for $enum_name<'comp, 'buf> { + fn size(&self) -> usize { + match self { + $( $enum_name::$variant(e) => e.size(), )* + } + } + + fn advance_n(&mut self, n: usize) -> size_estimater::Result<()> { + match self { + $( $enum_name::$variant(e) => e.advance_n(n), )* + } + } + + fn advance(&mut self) -> size_estimater::Result<()> { + match self { + $( $enum_name::$variant(e) => e.advance(), )* + } + } + + fn unload_value(&mut self) -> size_estimater::Result<()> { + match self { + $( $enum_name::$variant(e) => e.unload_value(), )* + } + } + + fn number_of_records_advanced(&self) -> usize { + match self { + $( $enum_name::$variant(e) => e.number_of_records_advanced(), )* + } + } + + fn inner_buffer(&self) -> &[f64] { + match self { + $( $enum_name::$variant(e) => e.inner_buffer(), )* + } + } + + fn estimate_option(&self) -> EstimateOption { + match self { + $( $enum_name::$variant(e) => e.estimate_option(), )* + } + } + + fn compress(self) -> usize { + match self { + $( $enum_name::$variant(e) => e.compress(), )* + } + } + } + }; +} + +impl_generic_size_estimator!(GenericSizeEstimator, Uncompressed, RLE, Gorilla, BUFF); + macro_rules! impl_generic_compressor { ($enum_name:ident, $($variant:ident),*) => { impl Compressor for $enum_name { + type SizeEstimatorImpl<'comp, 'buf> = GenericSizeEstimator<'comp, 'buf>; fn compress(&mut self, input: &[f64]) { match self { $( $enum_name::$variant(c) => c.compress(input), )* @@ -94,6 +188,26 @@ macro_rules! impl_generic_compressor { $( $enum_name::$variant(c) => c.reset(), )* } } + + fn estimate_size_static( + &self, + number_of_records: usize, + estimate_option: EstimateOption, + ) -> Option { + match self { + $( $enum_name::$variant(c) => c.estimate_size_static(number_of_records, estimate_option), )* + } + } + + fn size_estimator<'comp, 'buf>( + &'comp mut self, + input: &'buf [f64], + estimate_option: EstimateOption, + ) -> Option> { + match self { + $( $enum_name::$variant(c) => c.size_estimator(input, estimate_option).map(|se| se.into()), )* + } + } } }; } diff --git a/ebi/src/compressor/buff.rs b/ebi/src/compressor/buff.rs index b54e894..de7a2b4 100644 --- a/ebi/src/compressor/buff.rs +++ b/ebi/src/compressor/buff.rs @@ -1,6 +1,12 @@ -use crate::compression_common::buff::precision_bound::{self, PRECISION_MAP}; +use crate::{ + compression_common::buff::precision_bound::{self, PRECISION_MAP}, + compressor::size_estimater::SizeEstimatorError, +}; -use super::Compressor; +use super::{ + size_estimater::{self, EstimateOption, SizeEstimator}, + Compressor, +}; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct BUFFCompressor { @@ -30,10 +36,20 @@ impl BUFFCompressor { } impl Compressor for BUFFCompressor { + type SizeEstimatorImpl<'comp, 'buf> = BUFFSizeEstimator<'comp, 'buf>; + fn compress(&mut self, data: &[f64]) { self.compress_with_precalculated(Precalculated::precalculate(self.scale, data)); } + fn size_estimator<'comp, 'buf>( + &'comp mut self, + input: &'buf [f64], + estimate_option: EstimateOption, + ) -> Option> { + Some(BUFFSizeEstimator::new(estimate_option, input, self)) + } + fn total_bytes_in(&self) -> usize { self.total_bytes_in } @@ -57,6 +73,94 @@ impl Compressor for BUFFCompressor { } } +#[derive(Debug, PartialEq, PartialOrd)] +pub struct BUFFSizeEstimator<'comp, 'buf> { + comp: &'comp mut BUFFCompressor, + buffer: &'buf [f64], + estimate_option: EstimateOption, + cursor: usize, + prev_min_max: Option<(usize, usize)>, + precalculated: Precalculated, +} + +impl<'comp, 'buf> BUFFSizeEstimator<'comp, 'buf> { + pub fn new( + estimate_option: EstimateOption, + buffer: &'buf [f64], + comp: &'comp mut BUFFCompressor, + ) -> Self { + let scale = comp.scale; + Self { + comp, + buffer, + estimate_option, + cursor: 0, + prev_min_max: None, + precalculated: Precalculated::new(scale), + } + } +} + +impl<'comp, 'buf> SizeEstimator for BUFFSizeEstimator<'comp, 'buf> { + fn size(&self) -> usize { + self.precalculated.compressed_size() + } + + fn advance_n(&mut self, n: usize) -> size_estimater::Result<()> { + if self.buffer.len() < self.cursor + n { + return Err(SizeEstimatorError::EndOfBuffer( + self.buffer.len() - self.cursor, + )); + } + let appending_values = &self.buffer[self.cursor..self.cursor + n]; + + self.precalculated.append_values(&appending_values[..n - 1]); + self.prev_min_max = Some((self.precalculated.min, self.precalculated.max)); + self.precalculated.append_values(&appending_values[n - 1..]); + self.cursor += n; + + Ok(()) + } + + fn unload_value(&mut self) -> size_estimater::Result<()> { + if self.cursor == 0 { + return Err(SizeEstimatorError::EndOfBuffer(0)); + } + + let (min, max) = match self.prev_min_max { + Some((min, max)) => (min, max), + None => { + return Err(SizeEstimatorError::EndOfBuffer(0)); + } + }; + + self.cursor -= 1; + self.precalculated.rewind_values(1, min, max); + + self.prev_min_max = None; + + Ok(()) + } + + fn number_of_records_advanced(&self) -> usize { + self.cursor + } + + fn inner_buffer(&self) -> &[f64] { + self.buffer + } + + fn estimate_option(&self) -> EstimateOption { + self.estimate_option + } + + fn compress(self) -> usize { + self.comp.compress_with_precalculated(self.precalculated); + + self.comp.total_bytes_buffered() + } +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] struct Precalculated { min: usize, @@ -66,6 +170,68 @@ struct Precalculated { } impl Precalculated { + pub fn new(scale: usize) -> Self { + let precision = if scale == 0 { + 0 + } else { + (scale as f32).log10() as usize + }; + Self { + min: 0, + max: 0, + fixed_representation_values: Vec::new(), + precision, + } + } + + pub fn rewind_values(&mut self, n: usize, prev_min: usize, prev_max: usize) { + if n >= self.number_of_records() { + self.min = 0; + self.max = 0; + self.fixed_representation_values.clear(); + return; + } + + self.min = prev_min; + self.max = prev_max; + + self.fixed_representation_values + .truncate(self.number_of_records() - n); + } + + pub fn append_values(&mut self, values: &[f64]) { + let mut min = Precalculated::min_fixed(self).unwrap_or(i64::MAX); + let mut min_index = self.min; + let mut max = Precalculated::max_fixed(self).unwrap_or(i64::MIN); + let mut max_index = self.max; + + let fractional_part_bits_length = self.fractional_part_bits_length() as i32; + let number_of_records = self.number_of_records(); + + for (i, f) in values + .iter() + .enumerate() + .map(|(i, &f)| (i + number_of_records, f)) + { + let fixed = precision_bound::into_fixed_representation_with_fractional_part_bits_length( + f, + fractional_part_bits_length, + ); + if fixed < min { + min = fixed; + min_index = i; + } + if fixed > max { + max = fixed; + max_index = i; + } + self.fixed_representation_values.push(fixed); + } + + self.min = min_index; + self.max = max_index; + } + pub fn precalculate(scale: usize, floats: &[f64]) -> Self { let mut fixed_representation_values = Vec::new(); diff --git a/ebi/src/compressor/gorilla.rs b/ebi/src/compressor/gorilla.rs index d6b5d9d..287ca1d 100644 --- a/ebi/src/compressor/gorilla.rs +++ b/ebi/src/compressor/gorilla.rs @@ -1,4 +1,4 @@ -use super::{AppendableCompressor, Compressor}; +use super::{size_estimater::AppendCompressingSizeEstimator, AppendableCompressor, Compressor}; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct GorillaCompressor { @@ -28,6 +28,8 @@ impl Default for GorillaCompressor { } impl Compressor for GorillaCompressor { + type SizeEstimatorImpl<'comp, 'buf> = AppendCompressingSizeEstimator<'comp, 'buf, Self>; + fn compress(&mut self, input: &[f64]) { self.reset(); for value in input { @@ -43,6 +45,18 @@ impl Compressor for GorillaCompressor { self.encoder.total_bytes_buffered() } + fn size_estimator<'comp, 'buf>( + &'comp mut self, + input: &'buf [f64], + estimate_option: super::size_estimater::EstimateOption, + ) -> Option> { + Some(AppendCompressingSizeEstimator::new( + self, + input, + estimate_option, + )) + } + fn prepare(&mut self) { self.encoder.prepare(); } diff --git a/ebi/src/compressor/run_length.rs b/ebi/src/compressor/run_length.rs index e37cd9f..e93784e 100644 --- a/ebi/src/compressor/run_length.rs +++ b/ebi/src/compressor/run_length.rs @@ -8,7 +8,7 @@ use crate::format::{ serialize::{AsBytes, ToLe}, }; -use super::{AppendableCompressor, Compressor}; +use super::{size_estimater::AppendCompressingSizeEstimator, AppendableCompressor, Compressor}; /// Run Length Encoding (RLE) Compression Scheme /// Chunk Layout: @@ -53,6 +53,9 @@ impl Default for RunLengthCompressor { } impl Compressor for RunLengthCompressor { + // TODO: Implement RLE size estimator + type SizeEstimatorImpl<'comp, 'buf> = AppendCompressingSizeEstimator<'comp, 'buf, Self>; + fn compress(&mut self, input: &[f64]) { self.reset(); let is_starting = self.previous_run_count == 0; @@ -81,6 +84,18 @@ impl Compressor for RunLengthCompressor { self.total_bytes_in += n_bytes_compressed; } + fn size_estimator<'comp, 'buf>( + &'comp mut self, + input: &'buf [f64], + estimate_option: super::size_estimater::EstimateOption, + ) -> Option> { + Some(AppendCompressingSizeEstimator::new( + self, + input, + estimate_option, + )) + } + fn total_bytes_in(&self) -> usize { self.total_bytes_in } diff --git a/ebi/src/compressor/size_estimater.rs b/ebi/src/compressor/size_estimater.rs new file mode 100644 index 0000000..4010a2a --- /dev/null +++ b/ebi/src/compressor/size_estimater.rs @@ -0,0 +1,514 @@ +use thiserror::Error; + +use super::{AppendableCompressor, Compressor}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub enum EstimateOption { + Exact = 0, + Overestimate, + BestEffort, +} + +impl EstimateOption { + pub fn is_stricter_than(&self, other: &Self) -> bool { + let lhs = *self as u8; + let rhs = *other as u8; + + lhs < rhs + } + + pub fn is_stricter_or_equal(&self, other: &Self) -> bool { + let lhs = *self as u8; + let rhs = *other as u8; + + lhs <= rhs + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)] +pub enum SizeEstimatorError { + #[error("End of buffer reached by advance/unload: cursor {{advance, load}}able: {0}")] + EndOfBuffer(usize), +} + +pub type Result = std::result::Result; + +/// Interface for Estimater, +/// This estimater estimates the value over the buffer. +/// With advance, we can load the next value from the buffer, +/// With unload_value, we can rewind the loaded value, if possible usually, unload buffer is size 1 +/// With compress method, leveraging the obtained statistics and internal Compressor object, do the actual compression +pub trait SizeEstimator { + /// Estimate the size of the compressed bytes for the loaded uncompressed values + fn size(&self) -> usize; + /// Load the next n values from the buffer + fn advance_n(&mut self, n: usize) -> Result<()>; + /// Load the next value from the buffer + fn advance(&mut self) -> Result<()> { + self.advance_n(1) + } + /// Unload the last loaded value, typically unloaded buffer is size 1 + /// 2nd call of unload_value usually returns error + fn unload_value(&mut self) -> Result<()>; + /// Number of records advanced + fn number_of_records_advanced(&self) -> usize; + /// Return the inner buffer + fn inner_buffer(&self) -> &[f64]; + /// Return the estimate option + fn estimate_option(&self) -> EstimateOption; + /// Compress the loaded values, and return the size of the compressed bytes + fn compress(self) -> usize; +} + +/// A naive size estimater that estimates the size by do the actual compression +#[derive(Debug, PartialEq, PartialOrd)] +pub struct NaiveSlowSizeEstimator<'comp, 'buf, C: Compressor + Clone> { + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + cursor: usize, + number_of_records_advanced: usize, +} + +impl<'comp, 'buf, C: Compressor + Clone> NaiveSlowSizeEstimator<'comp, 'buf, C> { + pub fn new( + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + ) -> Self { + Self { + compressor, + buffer, + estimate_option, + cursor: 0, + number_of_records_advanced: 0, + } + } + + fn loaded_buffer(&self) -> &[f64] { + &self.buffer[..self.cursor] + } +} + +impl SizeEstimator for NaiveSlowSizeEstimator<'_, '_, C> { + fn size(&self) -> usize { + let mut compressor = self.compressor.clone(); + compressor.compress(self.loaded_buffer()); + compressor.total_bytes_buffered() + } + + fn advance_n(&mut self, n: usize) -> Result<()> { + if self.cursor + n > self.buffer.len() { + return Err(SizeEstimatorError::EndOfBuffer( + self.buffer.len() - self.cursor, + )); + } + + self.cursor += n; + self.number_of_records_advanced += n; + + Ok(()) + } + + fn unload_value(&mut self) -> Result<()> { + if self.cursor == 0 { + return Err(SizeEstimatorError::EndOfBuffer(0)); + } + + self.cursor -= 1; + self.number_of_records_advanced -= 1; + + Ok(()) + } + + fn number_of_records_advanced(&self) -> usize { + self.number_of_records_advanced + } + + fn inner_buffer(&self) -> &[f64] { + self.buffer + } + + fn estimate_option(&self) -> EstimateOption { + self.estimate_option + } + + fn compress(self) -> usize { + let Self { + compressor, + buffer, + cursor, + .. + } = self; + let loaded_buffer = &buffer[..cursor]; + compressor.compress(loaded_buffer); + compressor.total_bytes_buffered() + } +} + +#[derive(Debug, PartialEq, PartialOrd)] +pub struct StaticSizeEstimator<'comp, 'buf, C: Compressor + Clone> { + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + cursor: usize, + number_of_records_advanced: usize, +} + +impl<'comp, 'buf, C: Compressor + Clone> StaticSizeEstimator<'comp, 'buf, C> { + pub fn new( + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + ) -> Self { + Self { + compressor, + buffer, + estimate_option, + cursor: 0, + number_of_records_advanced: 0, + } + } +} + +impl SizeEstimator for StaticSizeEstimator<'_, '_, C> { + fn size(&self) -> usize { + self.compressor + .estimate_size_static(self.number_of_records_advanced, self.estimate_option) + .unwrap() + } + + fn advance_n(&mut self, n: usize) -> Result<()> { + if self.cursor + n > self.buffer.len() { + return Err(SizeEstimatorError::EndOfBuffer( + self.buffer.len() - self.cursor, + )); + } + + self.cursor += n; + self.number_of_records_advanced += n; + + Ok(()) + } + + fn unload_value(&mut self) -> Result<()> { + if self.cursor == 0 { + return Err(SizeEstimatorError::EndOfBuffer(0)); + } + + self.cursor -= 1; + self.number_of_records_advanced -= 1; + + Ok(()) + } + + fn number_of_records_advanced(&self) -> usize { + self.number_of_records_advanced + } + + fn inner_buffer(&self) -> &[f64] { + self.buffer + } + + fn estimate_option(&self) -> EstimateOption { + self.estimate_option + } + + fn compress(self) -> usize { + let Self { + compressor, + buffer, + cursor, + .. + } = self; + let loaded_buffer = &buffer[..cursor]; + compressor.compress(loaded_buffer); + compressor.total_bytes_buffered() + } +} + +#[derive(Debug, PartialEq, PartialOrd)] +pub struct AppendCompressingSizeEstimator<'comp, 'buf, C: AppendableCompressor> { + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + cursor: usize, + number_of_records_advanced: usize, + size: usize, +} + +impl<'comp, 'buf, C: AppendableCompressor> AppendCompressingSizeEstimator<'comp, 'buf, C> { + pub fn new( + compressor: &'comp mut C, + buffer: &'buf [f64], + estimate_option: EstimateOption, + ) -> Self { + let size = compressor.total_bytes_buffered(); + Self { + compressor, + buffer, + estimate_option, + cursor: 0, + number_of_records_advanced: 0, + size, + } + } +} + +impl SizeEstimator for AppendCompressingSizeEstimator<'_, '_, C> { + fn size(&self) -> usize { + self.size + } + + fn advance_n(&mut self, n: usize) -> Result<()> { + if self.cursor + n > self.buffer.len() { + return Err(SizeEstimatorError::EndOfBuffer( + self.buffer.len() - self.cursor, + )); + } + + self.compressor + .append_compress(&self.buffer[self.cursor..self.cursor + n]); + + self.cursor += n; + self.number_of_records_advanced += n; + self.size = self.compressor.total_bytes_buffered(); + + Ok(()) + } + + fn unload_value(&mut self) -> Result<()> { + if self.cursor == 0 { + return Err(SizeEstimatorError::EndOfBuffer(0)); + } + if self.compressor.rewind(1) { + self.cursor -= 1; + self.number_of_records_advanced -= 1; + self.size = self.compressor.total_bytes_buffered(); + Ok(()) + } else { + Err(SizeEstimatorError::EndOfBuffer(0)) + } + } + + fn number_of_records_advanced(&self) -> usize { + self.number_of_records_advanced + } + + fn inner_buffer(&self) -> &[f64] { + self.buffer + } + + fn estimate_option(&self) -> EstimateOption { + self.estimate_option + } + + fn compress(self) -> usize { + // Already compressed by `advance_n` + self.compressor.total_bytes_buffered() + } +} + +#[cfg(test)] +mod tests { + use crate::compressor::{CompressorConfig, GenericCompressor}; + + use super::*; + + #[test] + fn test_is_stricter_than() { + assert!(EstimateOption::Exact.is_stricter_than(&EstimateOption::Overestimate)); + assert!(EstimateOption::Exact.is_stricter_than(&EstimateOption::BestEffort)); + assert!(!EstimateOption::Overestimate.is_stricter_than(&EstimateOption::Exact)); + assert!(EstimateOption::Overestimate.is_stricter_than(&EstimateOption::BestEffort)); + assert!(!EstimateOption::BestEffort.is_stricter_than(&EstimateOption::Exact)); + assert!(!EstimateOption::BestEffort.is_stricter_than(&EstimateOption::Overestimate)); + + assert!(!EstimateOption::Exact.is_stricter_than(&EstimateOption::Exact)); + assert!(!EstimateOption::Overestimate.is_stricter_than(&EstimateOption::Overestimate)); + assert!(!EstimateOption::BestEffort.is_stricter_than(&EstimateOption::BestEffort)); + } + + #[test] + fn test_is_stricter_or_equal() { + assert!(EstimateOption::Exact.is_stricter_or_equal(&EstimateOption::Overestimate)); + assert!(EstimateOption::Exact.is_stricter_or_equal(&EstimateOption::BestEffort)); + assert!(!EstimateOption::Overestimate.is_stricter_or_equal(&EstimateOption::Exact)); + assert!(EstimateOption::Overestimate.is_stricter_or_equal(&EstimateOption::BestEffort)); + assert!(!EstimateOption::BestEffort.is_stricter_or_equal(&EstimateOption::Exact)); + assert!(!EstimateOption::BestEffort.is_stricter_or_equal(&EstimateOption::Overestimate)); + + assert!(EstimateOption::Exact.is_stricter_or_equal(&EstimateOption::Exact)); + assert!(EstimateOption::Overestimate.is_stricter_or_equal(&EstimateOption::Overestimate)); + assert!(EstimateOption::BestEffort.is_stricter_or_equal(&EstimateOption::BestEffort)); + } + + fn test_size(mut comp: GenericCompressor, estimate_option: EstimateOption) { + let mut se = comp + .size_estimator( + &[ + 0.5, 0.5, 1.9, 1000.8, -2323.3, 33.7, 22.5, 907.8, 298.9, 298.9, 28.0, 28.0, + 28.0, 0.9, 2182.2, + ], + estimate_option, + ) + .unwrap(); + let mut size = 0; + for _ in 0..5 { + se.advance().unwrap(); + assert!(se.size() >= size, "size should be monotonically increasing"); + size = se.size(); + } + let estimate_option = se.estimate_option(); + let compressed_size = se.compress(); + let actual_compressed_size = comp.total_bytes_buffered(); + assert_eq!( + compressed_size, actual_compressed_size, + "compressed size returned by SizeEstimator::compress should be equal to the actual compressed size" + ); + + match estimate_option { + EstimateOption::Exact => { + assert_eq!(size, compressed_size, "estimated size should be exact"); + } + EstimateOption::Overestimate => { + assert!( + size <= compressed_size, + "estimated size should be overestimated" + ); + } + EstimateOption::BestEffort => {} + } + } + + fn test_rewind(mut comp: GenericCompressor, estimate_option: EstimateOption) { + let mut se = comp + .size_estimator( + &[ + 0.5, 0.5, 1.9, 1000.8, -2323.3, 33.7, 22.5, 907.8, 298.9, 298.9, 28.0, 28.0, + 28.0, 0.9, 2182.2, + ], + estimate_option, + ) + .unwrap(); + let mut sizes = Vec::new(); + sizes.push(se.size()); + for _ in 0..5 { + se.advance().unwrap(); + sizes.push(se.size()); + } + + for i in 0..5 { + sizes.pop(); + if se.unload_value().is_err() { + assert_ne!( + i, 0, + "unload_value should not return error for the first time" + ); + break; + }; + let size = se.size(); + let expected_size = *sizes.last().unwrap(); + println!("size: {}, expected_size: {}", size, expected_size); + assert_eq!(size, expected_size, "size should be the same after rewind"); + } + + for _ in 0..5 { + se.advance().unwrap(); + sizes.push(se.size()); + } + + for i in 0..5 { + sizes.pop(); + if se.unload_value().is_err() { + assert_ne!( + i, 0, + "unload_value should not return error for the first time" + ); + break; + }; + let size = se.size(); + let expected_size = *sizes.last().unwrap(); + assert_eq!(size, expected_size, "size should be the same after rewind"); + } + + let estimate_option = se.estimate_option(); + let estimated_size = se.size(); + let compressed_size = se.compress(); + + match estimate_option { + EstimateOption::Exact => { + assert_eq!( + estimated_size, compressed_size, + "estimated size should be exact" + ); + } + EstimateOption::Overestimate => { + assert!( + estimated_size <= compressed_size, + "estimated size should be overestimated" + ); + } + EstimateOption::BestEffort => {} + } + } + + #[test] + fn test_uncompressed_estimator() { + let comp: CompressorConfig = CompressorConfig::uncompressed() + .header("hii".as_bytes().to_vec().into_boxed_slice()) + .build() + .into(); + let options = [ + EstimateOption::Exact, + EstimateOption::Overestimate, + EstimateOption::BestEffort, + ]; + for &estimate_option in &options { + test_size(comp.clone().build(), estimate_option); + test_rewind(comp.clone().build(), estimate_option); + } + } + + #[test] + fn test_rle_estimator() { + let comp: CompressorConfig = CompressorConfig::rle().build().into(); + let options = [ + EstimateOption::Exact, + EstimateOption::Overestimate, + EstimateOption::BestEffort, + ]; + for &estimate_option in &options { + test_size(comp.clone().build(), estimate_option); + test_rewind(comp.clone().build(), estimate_option); + } + } + + #[test] + fn test_gorilla_estimator() { + let comp: CompressorConfig = CompressorConfig::gorilla().build().into(); + let options = [ + EstimateOption::Exact, + EstimateOption::Overestimate, + EstimateOption::BestEffort, + ]; + for &estimate_option in &options { + test_size(comp.clone().build(), estimate_option); + test_rewind(comp.clone().build(), estimate_option); + } + } + + #[test] + fn test_buff_estimator() { + let comp: CompressorConfig = CompressorConfig::buff().scale(10).build().into(); + let options = [ + EstimateOption::Exact, + EstimateOption::Overestimate, + EstimateOption::BestEffort, + ]; + for &estimate_option in &options { + test_size(comp.clone().build(), estimate_option); + test_rewind(comp.clone().build(), estimate_option); + } + } +} diff --git a/ebi/src/compressor/uncompressed.rs b/ebi/src/compressor/uncompressed.rs index f73d561..08e667b 100644 --- a/ebi/src/compressor/uncompressed.rs +++ b/ebi/src/compressor/uncompressed.rs @@ -8,7 +8,7 @@ use crate::format::{ uncompressed::UncompressedHeader0, }; -use super::{AppendableCompressor, Compressor, MAX_BUFFERS}; +use super::{size_estimater::StaticSizeEstimator, AppendableCompressor, Compressor, MAX_BUFFERS}; #[derive(Debug, Clone, PartialEq, PartialOrd)] pub struct UncompressedCompressor { @@ -35,6 +35,7 @@ impl UncompressedCompressor { } impl Compressor for UncompressedCompressor { + type SizeEstimatorImpl<'comp, 'buf> = StaticSizeEstimator<'comp, 'buf, Self>; fn compress(&mut self, input: &[f64]) { self.reset(); for &value in input { @@ -45,6 +46,26 @@ impl Compressor for UncompressedCompressor { self.total_bytes_in += size; } + fn estimate_size_static( + &self, + number_of_records: usize, + _estimate_option: super::size_estimater::EstimateOption, + ) -> Option { + Some( + size_of::() + + self.header.as_ref().map_or(0, |h| h.len()) + + size_of::() * number_of_records, + ) + } + + fn size_estimator<'comp, 'buf>( + &'comp mut self, + input: &'buf [f64], + estimate_option: super::size_estimater::EstimateOption, + ) -> Option> { + Some(StaticSizeEstimator::new(self, input, estimate_option)) + } + fn total_bytes_in(&self) -> usize { self.total_bytes_in } diff --git a/ebi/src/encoder.rs b/ebi/src/encoder.rs index 3a54654..a15ef53 100644 --- a/ebi/src/encoder.rs +++ b/ebi/src/encoder.rs @@ -1,7 +1,10 @@ use cfg_if::cfg_if; use crate::{ - compressor::{Compressor, CompressorConfig, GenericCompressor}, + compressor::{ + size_estimater::{EstimateOption, SizeEstimator}, + Compressor, CompressorConfig, GenericCompressor, + }, format::{ self, serialize::{AsBytes, ToLe}, @@ -21,7 +24,7 @@ use std::{ pub enum ChunkOption { Full, RecordCount(usize), - ByteSizeBestEffort(usize), + ByteSize(usize), } impl From<&ChunkOption> for format::ChunkOption { @@ -37,7 +40,7 @@ impl From<&ChunkOption> for format::ChunkOption { reserved: 0, value: *value as u64, }, - ChunkOption::ByteSizeBestEffort(value) => format::ChunkOption { + ChunkOption::ByteSize(value) => format::ChunkOption { kind: format::ChunkOptionKind::ByteSize, reserved: 0, value: *value as u64, @@ -51,7 +54,7 @@ impl ChunkOption { match self { ChunkOption::Full => false, ChunkOption::RecordCount(count) => count * size_of::() <= total_bytes_in, - ChunkOption::ByteSizeBestEffort(n_bytes) => *n_bytes <= total_bytes_out, + ChunkOption::ByteSize(n_bytes) => *n_bytes <= total_bytes_out, } } } @@ -111,7 +114,6 @@ impl<'a, R: AlignedBufRead> BufWrapper<'a, R> { )) } - #[allow(unused)] pub fn set_n_consumed_bytes(&mut self, consumed: usize) { self.n_consumed_bytes = consumed; } @@ -167,7 +169,6 @@ impl FileWriter { + size_of::() } - #[allow(unused)] /// Returns the wrapper of the contents of the internal buffer, filling it with more data from the inner reader. /// If input has already reached EOF, the second element of tuple will be set. fn f64_buf(&mut self) -> Result<(BufWrapper<'_, R>, bool), io::Error> { @@ -388,20 +389,73 @@ impl<'a, R: AlignedBufRead> ChunkWriter<'a, R> { self.compressor.compress(buf.as_mut_slice()); } - ChunkOption::ByteSizeBestEffort(n) => { - // TODO: implement good estimation - // Simple heuristic to estimate the number of records to read. - let record_count = ((n as f64 / size_of::() as f64) * 1.5).ceil() as usize; - let mut buf = read_less_or_equal( - self.file_writer.input_mut(), - record_count * size_of::(), - )?; - if buf.is_empty() { - self.file_writer.set_reaches_eof(); - return Ok(()); - } - - self.compressor.compress(buf.as_mut_slice()); + ChunkOption::ByteSize(n) => { + let estimate_option = EstimateOption::BestEffort; + if self + .compressor + .estimate_size_static(0, estimate_option) + .is_some() + { + let mut number_of_records = 0; + // TODO: replace this with binary search + for i in 1..usize::MAX { + let size = self + .compressor + .estimate_size_static(i, EstimateOption::BestEffort) + .unwrap(); + + if size > n { + number_of_records = i - 1; + break; + } + } + + let buf = read_less_or_equal( + self.file_writer.input_mut(), + number_of_records * size_of::(), + )?; + self.compressor.compress(buf.as_slice()); + } else if self + .compressor + .size_estimator(&[], estimate_option) + .is_some() + { + let (mut buf, reaches_eof) = self.file_writer.f64_buf()?; + if reaches_eof { + drop(buf); + self.file_writer.set_reaches_eof(); + return Ok(()); + } + let mut estimator = self + .compressor + .size_estimator(buf.as_ref(), estimate_option) + .unwrap(); + + loop { + let size = estimator.size(); + if size > n { + if estimator.unload_value().is_err() + && estimate_option + .is_stricter_or_equal(&EstimateOption::Overestimate) + { + panic!("Failed to unload value from the estimator while we are in the overestimation or exact mode: {:?}", self.compressor.compression_scheme()); + } + break; + } + if estimator.advance().is_err() { + break; + }; + } + + estimator.compress(); + + buf.set_n_consumed_bytes(self.compressor.total_bytes_in()); + } else { + panic!( + "No size estimator is available for the compressor: {:?}", + self.compressor.compression_scheme() + ); + }; } } if self.compressor.total_bytes_in() == 0 { diff --git a/ebi/src/format/native.rs b/ebi/src/format/native.rs index bd7a35d..4cddb38 100644 --- a/ebi/src/format/native.rs +++ b/ebi/src/format/native.rs @@ -52,7 +52,7 @@ impl From<&FileConfig> for NativeFileConfig { let chunk_option = match config.chunk_option.kind { super::ChunkOptionKind::Full => ChunkOption::Full, super::ChunkOptionKind::RecordCount => ChunkOption::RecordCount(value), - super::ChunkOptionKind::ByteSize => ChunkOption::ByteSizeBestEffort(value), + super::ChunkOptionKind::ByteSize => ChunkOption::ByteSize(value), }; let field_type = config.field_type; let compression_scheme = config.compression_scheme; diff --git a/ebi/src/main.rs b/ebi/src/main.rs index d6d2c6b..cdfb9fe 100644 --- a/ebi/src/main.rs +++ b/ebi/src/main.rs @@ -58,7 +58,7 @@ fn main() { // let compressor_config = CompressorConfig::gorilla().build(); let compressor_config = CompressorConfig::buff().scale(scale).build(); // let chunk_option = ChunkOption::RecordCount(RECORD_COUNT * 3000 + 3); - let chunk_option = ChunkOption::ByteSizeBestEffort(1024 * 8); + let chunk_option = ChunkOption::ByteSize(1024 * 8); dbg!(chunk_option); // let binding = vec![0.4, 100000000.5, 0.5, 0.6, 0.7, 0.8, 0.9, 0.9, 1.0]; diff --git a/ebi/tests/query.rs b/ebi/tests/query.rs index 814c9f6..ea8dd02 100644 --- a/ebi/tests/query.rs +++ b/ebi/tests/query.rs @@ -776,7 +776,7 @@ mod helper { ChunkOption::RecordCount(n_records / n_chunks) } else { let byte_size = rng.gen_range(100..=1024 * 10); - ChunkOption::ByteSizeBestEffort(byte_size) + ChunkOption::ByteSize(byte_size) }; println!("n_records: {}, chunk_option: {:?}", n_records, chunk_option); let gen_values = || { diff --git a/ebi/tests/round_trip.rs b/ebi/tests/round_trip.rs index edf5c78..597b3fc 100644 --- a/ebi/tests/round_trip.rs +++ b/ebi/tests/round_trip.rs @@ -65,10 +65,7 @@ fn test_api_round_trip_uncompressed_bytesize() { .capacity(8000) .header(header) .build(); - test_round_trip( - compressor_config.into(), - ChunkOption::ByteSizeBestEffort(1024), - ); + test_round_trip(compressor_config.into(), ChunkOption::ByteSize(1024)); } #[test] @@ -80,10 +77,7 @@ fn test_api_round_trip_gorilla() { #[test] fn test_api_round_trip_gorilla_bytesize() { let compressor_config = CompressorConfig::gorilla().capacity(8000).build(); - test_round_trip( - compressor_config.into(), - ChunkOption::ByteSizeBestEffort(1024), - ); + test_round_trip(compressor_config.into(), ChunkOption::ByteSize(1024)); } #[test] @@ -95,10 +89,7 @@ fn test_api_round_trip_rle() { #[test] fn test_api_round_trip_rle_bytesize() { let compressor_config = CompressorConfig::rle().build(); - test_round_trip( - compressor_config.into(), - ChunkOption::ByteSizeBestEffort(1024), - ); + test_round_trip(compressor_config.into(), ChunkOption::ByteSize(1024)); } fn test_round_trip_with_scale( @@ -335,7 +326,7 @@ fn test_api_round_trip_buff_bytesize() { generate_and_write_random_f64_with_precision, compressor_config.into(), scale, - ChunkOption::ByteSizeBestEffort(1024), + ChunkOption::ByteSize(1024), ); } @@ -347,6 +338,6 @@ fn test_api_round_trip_buff_bytesize() { |n, _scale| vec![100.19; n], compressor_config.into(), 100, - ChunkOption::ByteSizeBestEffort(1024), + ChunkOption::ByteSize(1024), ); }