Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 115 additions & 3 deletions ebi/src/compressor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use buff::BUFFCompressor;
use buff::{BUFFCompressor, BUFFSizeEstimator};
use derive_builder::Builder;
use gorilla::GorillaCompressor;
use quick_impl::QuickImpl;
use run_length::RunLengthCompressor;
use size_estimater::{
AppendCompressingSizeEstimator, EstimateOption, SizeEstimator, StaticSizeEstimator,
};
// use run_length::RunLengthCompressor;
use uncompressed::UncompressedCompressor;

Expand All @@ -11,13 +14,35 @@ use crate::format::{self, CompressionScheme};
pub mod buff;
pub mod gorilla;
pub mod run_length;
pub mod size_estimater;
pub mod uncompressed;

const MAX_BUFFERS: usize = 5;
pub trait Compressor {
/// Size estimator type handed out by `size_estimator` for this compressor.
///
/// `'comp` is the lifetime of the mutable borrow of the compressor and `'buf`
/// is the lifetime of the borrowed input slice, matching the parameters of
/// `size_estimator`.
type SizeEstimatorImpl<'comp, 'buf>: SizeEstimator + Sized
where
Self: 'comp;

/// Compresses `input`.
///
/// This method returns no value. NOTE(review): earlier wording said it returns
/// the size of the compressed data; the size is presumably queried through
/// another method of this trait — confirm.
fn compress(&mut self, input: &[f64]);

/// Returns the size of the compressed data when it can be determined by an
/// O(1) operation.
///
/// The default implementation ignores both arguments and returns `None`,
/// i.e. it reports that no such static estimate is available.
fn estimate_size_static(
&self,
_number_of_records: usize,
_estimate_option: EstimateOption,
) -> Option<usize> {
None
}

/// Creates a size estimator over `input` that mutably borrows this compressor
/// for `'comp`.
///
/// The default implementation ignores both arguments and returns `None`.
fn size_estimator<'comp, 'buf>(
&'comp mut self,
_input: &'buf [f64],
_estimate_option: EstimateOption,
) -> Option<Self::SizeEstimatorImpl<'comp, 'buf>> {
None
}

/// Returns the total number of input bytes which have been processed by this Compressor.
fn total_bytes_in(&self) -> usize;

Expand All @@ -42,9 +67,7 @@ pub trait AppendableCompressor: Compressor {
/// This method is NOT re-compressing the existing data.
/// Returns the total size of the compressed data.
fn append_compress(&mut self, input: &[f64]);
}

pub trait RewindableCompressor: AppendableCompressor {
/// Rewinds the last `n` records from the end of the compressed data.
///
/// Returns `true` if the rewind succeeded and `false` otherwise.
fn rewind(&mut self, n: usize) -> bool;
Expand All @@ -58,9 +81,78 @@ pub enum GenericCompressor {
BUFF(BUFFCompressor),
}

/// Size estimator that wraps one concrete estimator per compressor kind.
///
/// Every variant carries a `#[quick_impl(impl From)]` attribute, so the wrapped
/// estimator can be converted into this enum (the generic compressor relies on
/// `.into()` for that conversion).
#[derive(Debug, PartialEq, PartialOrd, QuickImpl)]
pub enum GenericSizeEstimator<'comp, 'buf> {
/// Estimator used with `UncompressedCompressor`.
#[quick_impl(impl From)]
Uncompressed(StaticSizeEstimator<'comp, 'buf, UncompressedCompressor>),
/// Estimator used with `RunLengthCompressor`.
#[quick_impl(impl From)]
RLE(AppendCompressingSizeEstimator<'comp, 'buf, RunLengthCompressor>),
/// Estimator used with `GorillaCompressor`.
#[quick_impl(impl From)]
Gorilla(AppendCompressingSizeEstimator<'comp, 'buf, GorillaCompressor>),
/// Estimator used with the BUFF compressor.
#[quick_impl(impl From)]
BUFF(BUFFSizeEstimator<'comp, 'buf>),
}

/// Implements `SizeEstimator` for an enum whose variants each wrap exactly one
/// value that itself implements `SizeEstimator`.
///
/// Usage: `impl_generic_size_estimator!(EnumName, VariantA, VariantB, ...)`.
/// The enum must be generic over the two lifetimes `<'comp, 'buf>`. Every
/// trait method simply forwards to the payload of the active variant.
macro_rules! impl_generic_size_estimator {
    ($enum_name:ident, $($variant:ident),*) => {
        impl<'comp, 'buf> SizeEstimator for $enum_name<'comp, 'buf> {
            // --- read-only queries -------------------------------------------

            fn size(&self) -> usize {
                match self {
                    $( Self::$variant(inner) => inner.size(), )*
                }
            }

            // --- cursor movement ---------------------------------------------

            fn advance_n(&mut self, n: usize) -> size_estimater::Result<()> {
                match self {
                    $( Self::$variant(inner) => inner.advance_n(n), )*
                }
            }

            fn advance(&mut self) -> size_estimater::Result<()> {
                match self {
                    $( Self::$variant(inner) => inner.advance(), )*
                }
            }

            fn unload_value(&mut self) -> size_estimater::Result<()> {
                match self {
                    $( Self::$variant(inner) => inner.unload_value(), )*
                }
            }

            // --- accessors ---------------------------------------------------

            fn number_of_records_advanced(&self) -> usize {
                match self {
                    $( Self::$variant(inner) => inner.number_of_records_advanced(), )*
                }
            }

            fn inner_buffer(&self) -> &[f64] {
                match self {
                    $( Self::$variant(inner) => inner.inner_buffer(), )*
                }
            }

            fn estimate_option(&self) -> EstimateOption {
                match self {
                    $( Self::$variant(inner) => inner.estimate_option(), )*
                }
            }

            // --- finalisation (consumes the estimator) -----------------------

            fn compress(self) -> usize {
                match self {
                    $( Self::$variant(inner) => inner.compress(), )*
                }
            }
        }
    };
}

impl_generic_size_estimator!(GenericSizeEstimator, Uncompressed, RLE, Gorilla, BUFF);

macro_rules! impl_generic_compressor {
($enum_name:ident, $($variant:ident),*) => {
impl Compressor for $enum_name {
type SizeEstimatorImpl<'comp, 'buf> = GenericSizeEstimator<'comp, 'buf>;
fn compress(&mut self, input: &[f64]) {
match self {
$( $enum_name::$variant(c) => c.compress(input), )*
Expand Down Expand Up @@ -96,6 +188,26 @@ macro_rules! impl_generic_compressor {
$( $enum_name::$variant(c) => c.reset(), )*
}
}

/// Forwards the static size-estimate request, with both arguments unchanged,
/// to the compressor wrapped by the active variant.
fn estimate_size_static(
&self,
number_of_records: usize,
estimate_option: EstimateOption,
) -> Option<usize> {
match self {
$( $enum_name::$variant(c) => c.estimate_size_static(number_of_records, estimate_option), )*
}
}

/// Asks the compressor wrapped by the active variant for its size estimator
/// and, when one is returned, converts it into `Self::SizeEstimatorImpl`
/// via `.into()`.
fn size_estimator<'comp, 'buf>(
&'comp mut self,
input: &'buf [f64],
estimate_option: EstimateOption,
) -> Option<Self::SizeEstimatorImpl<'comp, 'buf>> {
match self {
$( $enum_name::$variant(c) => c.size_estimator(input, estimate_option).map(|se| se.into()), )*
}
}
}
};
}
Expand Down
170 changes: 168 additions & 2 deletions ebi/src/compressor/buff.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use crate::compression_common::buff::precision_bound::{self, PRECISION_MAP};
use crate::{
compression_common::buff::precision_bound::{self, PRECISION_MAP},
compressor::size_estimater::SizeEstimatorError,
};

use super::Compressor;
use super::{
size_estimater::{self, EstimateOption, SizeEstimator},
Compressor,
};

#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub struct BUFFCompressor {
Expand Down Expand Up @@ -30,10 +36,20 @@ impl BUFFCompressor {
}

impl Compressor for BUFFCompressor {
type SizeEstimatorImpl<'comp, 'buf> = BUFFSizeEstimator<'comp, 'buf>;

/// Compresses `data`: the values are first precalculated with this
/// compressor's `scale`, then handed to `compress_with_precalculated`.
fn compress(&mut self, data: &[f64]) {
    let precalculated = Precalculated::precalculate(self.scale, data);
    self.compress_with_precalculated(precalculated);
}

/// Builds a `BUFFSizeEstimator` over `input` that mutably borrows this
/// compressor for `'comp`.
///
/// This implementation always returns `Some`.
fn size_estimator<'comp, 'buf>(
    &'comp mut self,
    input: &'buf [f64],
    estimate_option: EstimateOption,
) -> Option<Self::SizeEstimatorImpl<'comp, 'buf>> {
    let estimator = BUFFSizeEstimator::new(estimate_option, input, self);
    Some(estimator)
}

/// Returns the current value of the `total_bytes_in` field.
fn total_bytes_in(&self) -> usize {
self.total_bytes_in
}
Expand All @@ -57,6 +73,94 @@ impl Compressor for BUFFCompressor {
}
}

/// Size estimator for `BUFFCompressor` that consumes values from a borrowed
/// buffer one step at a time and keeps the precalculated state up to date.
#[derive(Debug, PartialEq, PartialOrd)]
pub struct BUFFSizeEstimator<'comp, 'buf> {
/// Compressor that receives the precalculated state when `compress` is called.
comp: &'comp mut BUFFCompressor,
/// Input values the estimator advances over.
buffer: &'buf [f64],
/// Option supplied at construction and returned by `estimate_option`.
estimate_option: EstimateOption,
/// Number of values of `buffer` consumed so far.
cursor: usize,
/// `precalculated.min` / `precalculated.max` as they were just before the most
/// recently advanced value was appended; consumed (and cleared) by `unload_value`.
prev_min_max: Option<(usize, usize)>,
/// Precalculated state of all values consumed so far.
precalculated: Precalculated,
}

impl<'comp, 'buf> BUFFSizeEstimator<'comp, 'buf> {
    /// Creates an estimator positioned at the start of `buffer`.
    ///
    /// The precalculated state starts empty and is initialised with the
    /// `scale` of `comp`; no value has been consumed yet and there is nothing
    /// to unload.
    pub fn new(
        estimate_option: EstimateOption,
        buffer: &'buf [f64],
        comp: &'comp mut BUFFCompressor,
    ) -> Self {
        // Read the scale before `comp` is moved into the struct.
        let precalculated = Precalculated::new(comp.scale);
        Self {
            comp,
            buffer,
            estimate_option,
            precalculated,
            prev_min_max: None,
            cursor: 0,
        }
    }
}

/// Incremental size estimation for BUFF: values from the borrowed buffer are
/// appended to (or removed from) the precalculated state, and the estimated
/// size is read from that state.
impl<'comp, 'buf> SizeEstimator for BUFFSizeEstimator<'comp, 'buf> {
    /// Returns the compressed size reported by the precalculated state.
    fn size(&self) -> usize {
        self.precalculated.compressed_size()
    }

    /// Consumes the next `n` values of the buffer.
    ///
    /// Advancing by zero is a no-op that leaves the estimator untouched.
    ///
    /// # Errors
    ///
    /// Returns `SizeEstimatorError::EndOfBuffer` carrying the number of values
    /// still available when fewer than `n` values remain; the estimator is not
    /// modified in that case.
    fn advance_n(&mut self, n: usize) -> size_estimater::Result<()> {
        // `cursor` never exceeds `buffer.len()` (it only grows after this
        // check), so the subtraction cannot underflow. Comparing against the
        // remainder also avoids overflowing `cursor + n` for huge `n`.
        let remaining = self.buffer.len() - self.cursor;
        if n > remaining {
            return Err(SizeEstimatorError::EndOfBuffer(remaining));
        }
        if n == 0 {
            // Nothing to append. Without this guard `n - 1` below would
            // underflow and panic.
            return Ok(());
        }

        let appending_values = &self.buffer[self.cursor..self.cursor + n];
        let (leading, last) = appending_values.split_at(n - 1);

        // Snapshot min/max right before the final value is appended so that
        // `unload_value` can undo exactly that one value.
        self.precalculated.append_values(leading);
        self.prev_min_max = Some((self.precalculated.min, self.precalculated.max));
        self.precalculated.append_values(last);
        self.cursor += n;

        Ok(())
    }

    /// Removes the most recently advanced value again.
    ///
    /// # Errors
    ///
    /// Returns `SizeEstimatorError::EndOfBuffer(0)` when nothing has been
    /// advanced yet, or when no snapshot is available (for example because
    /// the previous value was already unloaded).
    fn unload_value(&mut self) -> size_estimater::Result<()> {
        if self.cursor == 0 {
            return Err(SizeEstimatorError::EndOfBuffer(0));
        }

        // Taking the snapshot also clears it, so a second consecutive unload
        // is rejected.
        let Some((min, max)) = self.prev_min_max.take() else {
            return Err(SizeEstimatorError::EndOfBuffer(0));
        };

        self.cursor -= 1;
        self.precalculated.rewind_values(1, min, max);

        Ok(())
    }

    /// Returns how many values of the buffer have been consumed so far.
    fn number_of_records_advanced(&self) -> usize {
        self.cursor
    }

    /// Returns the whole borrowed input buffer, regardless of the cursor.
    fn inner_buffer(&self) -> &[f64] {
        self.buffer
    }

    /// Returns the estimate option supplied at construction.
    fn estimate_option(&self) -> EstimateOption {
        self.estimate_option
    }

    /// Hands the precalculated state to the compressor and returns the
    /// compressor's `total_bytes_buffered()` afterwards.
    fn compress(self) -> usize {
        self.comp.compress_with_precalculated(self.precalculated);

        self.comp.total_bytes_buffered()
    }
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Precalculated {
min: usize,
Expand All @@ -66,6 +170,68 @@ struct Precalculated {
}

impl Precalculated {
/// Creates an empty state for the given `scale`.
///
/// `precision` is the base-10 logarithm of `scale`, computed in `f32` and
/// truncated to `usize`; a `scale` of 0 yields a precision of 0.
pub fn new(scale: usize) -> Self {
    let precision = match scale {
        0 => 0,
        nonzero => (nonzero as f32).log10() as usize,
    };

    Self {
        fixed_representation_values: Vec::new(),
        min: 0,
        max: 0,
        precision,
    }
}

/// Drops the last `n` stored values.
///
/// When at least one value survives, `min` and `max` are restored to
/// `prev_min` / `prev_max`. When `n` covers every stored value, the state is
/// reset instead: both indexes become 0 and the value list is emptied.
pub fn rewind_values(&mut self, n: usize, prev_min: usize, prev_max: usize) {
    let total = self.number_of_records();

    if n < total {
        // Partial rewind: the caller supplies the min/max to restore.
        self.min = prev_min;
        self.max = prev_max;
        self.fixed_representation_values.truncate(total - n);
    } else {
        // Everything is removed: fall back to the empty state.
        self.min = 0;
        self.max = 0;
        self.fixed_representation_values.clear();
    }
}

pub fn append_values(&mut self, values: &[f64]) {
let mut min = Precalculated::min_fixed(self).unwrap_or(i64::MAX);
let mut min_index = self.min;
let mut max = Precalculated::max_fixed(self).unwrap_or(i64::MIN);
let mut max_index = self.max;

let fractional_part_bits_length = self.fractional_part_bits_length() as i32;
let number_of_records = self.number_of_records();

for (i, f) in values
.iter()
.enumerate()
.map(|(i, &f)| (i + number_of_records, f))
{
let fixed = precision_bound::into_fixed_representation_with_fractional_part_bits_length(
f,
fractional_part_bits_length,
);
if fixed < min {
min = fixed;
min_index = i;
}
if fixed > max {
max = fixed;
max_index = i;
}
self.fixed_representation_values.push(fixed);
}

self.min = min_index;
self.max = max_index;
}

pub fn precalculate(scale: usize, floats: &[f64]) -> Self {
let mut fixed_representation_values = Vec::new();

Expand Down
Loading