From 8279463070bad0d97c33de11a41adcb5a435732b Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Thu, 22 May 2025 19:51:57 -0700 Subject: [PATCH 1/7] More efficient compaction --- locustdb-serialization/src/event_buffer.rs | 79 ++- src/bin/db_inspector.rs | 4 +- src/bitvec.rs | 2 +- src/disk_store/storage.rs | 2 +- src/engine/data_types/mod.rs | 14 +- src/engine/execution/executor.rs | 2 +- src/engine/operators/aggregate.rs | 4 +- src/engine/operators/binary_operator.rs | 6 +- .../operators/compact_nullable_nullable.rs | 2 +- src/engine/operators/compact_with_nullable.rs | 2 +- src/engine/operators/filter.rs | 2 +- src/engine/operators/filter_nullable.rs | 6 +- src/engine/operators/fuse_nulls.rs | 8 +- src/engine/operators/is_null.rs | 4 +- src/engine/operators/nonzero_indices.rs | 2 +- src/engine/operators/null_vec_like.rs | 2 +- src/engine/operators/select.rs | 2 +- src/engine/operators/to_val.rs | 2 +- src/ingest/buffer.rs | 50 +- src/ingest/colgen.rs | 2 +- src/ingest/input_column.rs | 13 + src/ingest/raw_val.rs | 12 + src/logging_client/mod.rs | 35 +- src/mem_store/column.rs | 250 ++++++++- src/mem_store/column_buffer.rs | 495 ++++++++++++++++++ src/mem_store/column_builder.rs | 164 ------ src/mem_store/lru.rs | 2 +- src/mem_store/mod.rs | 5 +- src/mem_store/partition.rs | 42 +- src/mem_store/table.rs | 5 +- src/scheduler/inner_locustdb.rs | 124 +++-- src/server/mod.rs | 2 +- src/stringpack.rs | 6 +- tests/ingestion_test.rs | 91 +++- 34 files changed, 1102 insertions(+), 341 deletions(-) create mode 100644 src/mem_store/column_buffer.rs delete mode 100644 src/mem_store/column_builder.rs diff --git a/locustdb-serialization/src/event_buffer.rs b/locustdb-serialization/src/event_buffer.rs index b9e249bd..ccfa910f 100644 --- a/locustdb-serialization/src/event_buffer.rs +++ b/locustdb-serialization/src/event_buffer.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; use crate::api::AnyVal; use crate::default_reader_options; @@ -11,10 +12,77 @@ pub struct EventBuffer { #[derive(Default, Clone, Debug)] pub struct TableBuffer { - pub len: u64, - pub columns: HashMap, + len: u64, + columns: HashMap, } +impl TableBuffer { + pub fn new(columns: HashMap) -> Self { + let len = columns.values().map(|c| c.data.len()).max().unwrap_or(0) as u64; + assert!( + columns + .values() + .all(|c| c.data.len() == len as usize || matches!(c.data, ColumnData::Empty)), + "All columns must have the same length" + ); + Self { len, columns } + } + + pub fn len(&self) -> usize { + self.len as usize + } + + pub fn insert(&mut self, column_name: String, column: ColumnBuffer) { + if self.len == 0 { + self.len = column.data.len() as u64; + } else if self.len != column.data.len() as u64 { + panic!( + "Column {} has length {} but table has length {}", + column_name, + column.data.len(), + self.len + ); + } + self.columns.insert(column_name, column); + } + + pub fn push_row_and_timestamp>( + &mut self, + row: Row, + ) -> usize { + let time_millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as f64 + / 1000.0; + let mut cols = 0; + let mut timestamp_provided = false; + for (column_name, value) in row { + self.columns + .entry(column_name.to_string()) + .or_default() + .push(value, self.len); + timestamp_provided |= column_name == "timestamp"; + cols += 1; + } + if !timestamp_provided { + self.columns + .entry("timestamp".to_string()) + .or_default() + .push(AnyVal::Float(time_millis), self.len); + } + self.len += 1; + cols + } + + pub fn columns(&self) -> impl Iterator { + self.columns.iter() + } + + pub fn into_columns(self) -> HashMap { + self.columns + } +} #[derive(Default, Clone, Debug)] pub struct ColumnBuffer { pub data: ColumnData, @@ -164,6 +232,13 @@ impl EventBuffer { .reborrow() .init_columns(table.columns.len() as u32); for (j, (colname, column)) in table.columns.iter().enumerate() { + assert!( + column.data.len() == table.len as usize || column.data.len() == 0, + "Column {} has length {} but table has length {}", + colname, + column.data.len(), + table.len, + ); let mut column_builder = columns.reborrow().get(j as u32); column_builder.set_name(colname); match &column.data { diff --git a/src/bin/db_inspector.rs b/src/bin/db_inspector.rs index 79e5b172..04234582 100644 --- a/src/bin/db_inspector.rs +++ b/src/bin/db_inspector.rs @@ -97,9 +97,9 @@ fn main() { ); if opts.wal > 2 { for (name, table) in &segment.data.as_ref().tables { - println!(" Table {} has {} columns", name, table.columns.len()); + println!(" Table {} has {} columns", name, table.columns().count()); if opts.wal > 3 { - for col in &table.columns { + for (col, _) in table.columns() { println!(" {:?}", col); } } diff --git a/src/bitvec.rs b/src/bitvec.rs index 02c7ad95..8e9bae9d 100644 --- a/src/bitvec.rs +++ b/src/bitvec.rs @@ -31,7 +31,7 @@ impl BitVec for Vec { } } -impl BitVec for &'_ [u8] { +impl BitVec for [u8] { fn is_set(&self, index: usize) -> bool { let slot = index >> 3; slot < self.len() && self[slot] & (1 << (index as u8 & 7)) > 0 diff --git a/src/disk_store/storage.rs b/src/disk_store/storage.rs index a50803f8..c9bd905e 100644 --- a/src/disk_store/storage.rs +++ b/src/disk_store/storage.rs @@ -170,7 +170,7 @@ impl Storage { "Found wal segment {} with id {} and {} rows in {} tables", wal_file.display(), wal_segment.id, - wal_segment.data.tables.values().map(|t| t.len).sum::(), + wal_segment.data.tables.values().map(|t| t.len()).sum::(), wal_segment.data.tables.len(), ); if wal_segment.id < earliest_uncommited_wal_id { diff --git a/src/engine/data_types/mod.rs b/src/engine/data_types/mod.rs index 8a79c228..15d0bf4f 100644 --- a/src/engine/data_types/mod.rs +++ b/src/engine/data_types/mod.rs @@ -8,13 +8,17 @@ mod vec_data; use ordered_float::OrderedFloat; -pub use self::types::*; -pub use self::data::*; -pub use self::vec_data::*; pub use self::byte_slices::*; +pub use self::data::*; +pub use self::nullable_vec_data::*; pub use self::scalar_data::*; +pub use self::types::*; pub use self::val_rows::*; -pub use self::nullable_vec_data::*; +pub use self::vec_data::*; #[allow(non_camel_case_types)] -pub type of64 = OrderedFloat; \ No newline at end of file +pub type of64 = OrderedFloat; + +pub fn vec_f64_to_vec_of64(vec: Vec) -> Vec { + unsafe { std::mem::transmute::, Vec>(vec) } +} diff --git a/src/engine/execution/executor.rs b/src/engine/execution/executor.rs index 82fa110e..304d06d3 100644 --- a/src/engine/execution/executor.rs +++ b/src/engine/execution/executor.rs @@ -707,7 +707,7 @@ impl<'a> QueryExecutor<'a> { if let Some(present) = scratchpad.try_get_null_map(output) { print!(" null map: "); for i in 0..cmp::min(present.len() * 8, 100) { - if (&*present).is_set(i) { + if (*present).is_set(i) { print!("1") } else { print!("0") diff --git a/src/engine/operators/aggregate.rs b/src/engine/operators/aggregate.rs index af2e4cb6..58f23d04 100644 --- a/src/engine/operators/aggregate.rs +++ b/src/engine/operators/aggregate.rs @@ -266,7 +266,7 @@ where } for i in 0..nums.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { let g = grouping[i].cast_usize(); accumulators[g] = A::accumulate(accumulators[g], nums[i]); accumulators_present.set(g); @@ -411,7 +411,7 @@ where let mut any_overflow = false; for i in 0..nums.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { let g = grouping[i].cast_usize(); let (result, overflow) = A::accumulate_checked(accumulators[g], nums[i]); any_overflow |= overflow; diff --git a/src/engine/operators/binary_operator.rs b/src/engine/operators/binary_operator.rs index 3b7fbd55..c43c0191 100644 --- a/src/engine/operators/binary_operator.rs +++ b/src/engine/operators/binary_operator.rs @@ -275,7 +275,7 @@ impl<'a, LHS, RHS, Out, Op> VecOperator<'a> for NullableCheckedBinaryOperator VecOperator<'a> for NullableCheckedBinaryVSOperator< let mut any_overflow = false; for (i, &l) in lhs.iter().enumerate() { let (result, overflow) = Op::perform_checked(l, rhs); - any_overflow |= overflow && (&*present).is_set(i); + any_overflow |= overflow && (*present).is_set(i); output.push(result); } if any_overflow { Err(QueryError::Overflow) } else { Ok(()) } @@ -367,7 +367,7 @@ impl<'a, LHS, RHS, Out, Op> VecOperator<'a> for NullableCheckedBinarySVOperator< let mut any_overflow = false; for (i, &r) in rhs.iter().enumerate() { let (result, overflow) = Op::perform_checked(lhs, r); - any_overflow |= overflow && (&*present).is_set(i); + any_overflow |= overflow && (*present).is_set(i); output.push(result); } if any_overflow { Err(QueryError::Overflow) } else { Ok(()) } diff --git a/src/engine/operators/compact_nullable_nullable.rs b/src/engine/operators/compact_nullable_nullable.rs index 0451e576..b7c4aac1 100644 --- a/src/engine/operators/compact_nullable_nullable.rs +++ b/src/engine/operators/compact_nullable_nullable.rs @@ -15,7 +15,7 @@ impl<'a, T: VecData + 'a, U: GenericIntVec> VecOperator<'a> for CompactNul // Remove all unmodified entries let mut j = 0; for (i, &s) in select.iter().take(data.len()).enumerate() { - if s > U::zero() && (&*select_present).is_set(i) { + if s > U::zero() && (*select_present).is_set(i) { data[j] = data[i]; if present.is_set(i) { present.set(j); diff --git a/src/engine/operators/compact_with_nullable.rs b/src/engine/operators/compact_with_nullable.rs index bbc8ca08..cfa7bf92 100644 --- a/src/engine/operators/compact_with_nullable.rs +++ b/src/engine/operators/compact_with_nullable.rs @@ -15,7 +15,7 @@ impl<'a, T: VecData + 'a, U: GenericIntVec> VecOperator<'a> for CompactWit // Remove all unmodified entries let mut j = 0; for (i, &s) in select.iter().take(data.len()).enumerate() { - if s > U::zero() && (&*select_present).is_set(i) { + if s > U::zero() && (*select_present).is_set(i) { data[j] = data[i]; j += 1; } diff --git a/src/engine/operators/filter.rs b/src/engine/operators/filter.rs index dc772fa6..3f38a61f 100644 --- a/src/engine/operators/filter.rs +++ b/src/engine/operators/filter.rs @@ -73,7 +73,7 @@ where filtered.clear(); } for i in 0..data.len() { - if filter[i] > 0 && (&*present).is_set(i) { + if filter[i] > 0 && (*present).is_set(i) { filtered.push(data[i]); } } diff --git a/src/engine/operators/filter_nullable.rs b/src/engine/operators/filter_nullable.rs index f4d9001c..6203466b 100644 --- a/src/engine/operators/filter_nullable.rs +++ b/src/engine/operators/filter_nullable.rs @@ -24,7 +24,7 @@ where } for (i, (d, &select)) in data.iter().zip(filter.iter()).enumerate() { if select > 0 { - if BitVec::is_set(&&*present, i) { + if BitVec::is_set(&*present, i) { filtered_present.set(filtered.len()); } filtered.push(*d); @@ -82,8 +82,8 @@ where } } for i in 0..data.len() { - if filter[i] > 0 && (&*filter_present).is_set(i) { - if BitVec::is_set(&&*input_present, i) { + if filter[i] > 0 && (*filter_present).is_set(i) { + if BitVec::is_set(&*input_present, i) { filtered_present.set(filtered.len()); } filtered.push(data[i]); diff --git a/src/engine/operators/fuse_nulls.rs b/src/engine/operators/fuse_nulls.rs index 03fc260a..a7e067b7 100644 --- a/src/engine/operators/fuse_nulls.rs +++ b/src/engine/operators/fuse_nulls.rs @@ -15,7 +15,7 @@ impl<'a> VecOperator<'a> for FuseNullsI64 { let mut fused = scratchpad.get_mut(self.fused); if stream { fused.clear(); } for i in 0..input.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { fused.push(input[i]); } else { fused.push(I64_NULL); @@ -91,7 +91,7 @@ impl<'a> VecOperator<'a> for FuseNullsStr<'a> { let mut fused = scratchpad.get_mut(self.fused); if stream { fused.clear(); } for i in 0..input.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { fused.push(Some(input[i])); } else { fused.push(None); @@ -176,7 +176,7 @@ impl<'a, T: GenericIntVec> VecOperator<'a> for FuseIntNulls { let mut fused = scratchpad.get_mut(self.fused); if stream { fused.clear(); } for i in 0..input.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { fused.push(input[i] + self.offset); } else { fused.push(T::zero()); @@ -266,7 +266,7 @@ impl<'a> VecOperator<'a> for FuseNullsF64 { let mut fused = scratchpad.get_mut(self.fused); if stream { fused.clear(); } for i in 0..input.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { fused.push(input[i]); } else { fused.push(F64_NULL); diff --git a/src/engine/operators/is_null.rs b/src/engine/operators/is_null.rs index 3459c90f..51784130 100644 --- a/src/engine/operators/is_null.rs +++ b/src/engine/operators/is_null.rs @@ -14,7 +14,7 @@ impl<'a> VecOperator<'a> for IsNull { let mut is_null = scratchpad.get_mut(self.is_null); if stream { is_null.clear(); } for i in 0..len { - if (&*present).is_set(i) { + if (*present).is_set(i) { is_null.push(false as u8); } else { is_null.push(true as u8); @@ -52,7 +52,7 @@ impl<'a> VecOperator<'a> for IsNotNull { let mut is_not_null = scratchpad.get_mut(self.is_not_null); if stream { is_not_null.clear(); } for i in 0..len { - if (&*present).is_set(i) { + if (*present).is_set(i) { is_not_null.push(true as u8); } else { is_not_null.push(false as u8); diff --git a/src/engine/operators/nonzero_indices.rs b/src/engine/operators/nonzero_indices.rs index 5177300e..203b924a 100644 --- a/src/engine/operators/nonzero_indices.rs +++ b/src/engine/operators/nonzero_indices.rs @@ -54,7 +54,7 @@ impl<'a, T: GenericIntVec + CastUsize, U: GenericIntVec> VecOperator<'a> f let mut unique = scratchpad.get_mut(self.output); if stream { unique.clear(); } for (index, &n) in input.iter().enumerate() { - if n > T::zero() && (&*input_present).is_set(index) { + if n > T::zero() && (*input_present).is_set(index) { unique.push(U::from(index).unwrap() + U::from(self.offset).unwrap()); } } diff --git a/src/engine/operators/null_vec_like.rs b/src/engine/operators/null_vec_like.rs index d6937615..b665633f 100644 --- a/src/engine/operators/null_vec_like.rs +++ b/src/engine/operators/null_vec_like.rs @@ -26,7 +26,7 @@ impl<'a> VecOperator<'a> for NullVecLike { let mut count = 0; let (data, present) = scratchpad.get_nullable(self.input.nullable_u8()); for (i, d) in data.iter().enumerate() { - if *d != 0 && BitVec::is_set(&&*present, i) { + if *d != 0 && BitVec::is_set(&*present, i) { count += 1; } } diff --git a/src/engine/operators/select.rs b/src/engine/operators/select.rs index 91c8d5d7..2b87b8ae 100644 --- a/src/engine/operators/select.rs +++ b/src/engine/operators/select.rs @@ -56,7 +56,7 @@ impl<'a, T: 'a> VecOperator<'a> for SelectNullable where T: VecData { } for (i, &index) in indices.iter().enumerate() { data_out.push(data[index]); - if (&*present).is_set(index) { present_out.set(i) } + if (*present).is_set(index) { present_out.set(i) } } Ok(()) } diff --git a/src/engine/operators/to_val.rs b/src/engine/operators/to_val.rs index 34507d3c..4bb29bae 100644 --- a/src/engine/operators/to_val.rs +++ b/src/engine/operators/to_val.rs @@ -60,7 +60,7 @@ impl<'a, T: VecData + Cast> + 'a> VecOperator<'a> for NullableToVal<' let mut vals = scratchpad.get_mut(self.vals); if stream { vals.clear(); } for i in 0..input.len() { - if (&*present).is_set(i) { + if (*present).is_set(i) { vals.push(input[i].cast()); } else { vals.push(Val::Null); diff --git a/src/ingest/buffer.rs b/src/ingest/buffer.rs index f978fb78..c3455ec6 100644 --- a/src/ingest/buffer.rs +++ b/src/ingest/buffer.rs @@ -1,14 +1,15 @@ +use datasize::DataSize; use ordered_float::OrderedFloat; use crate::ingest::input_column::InputColumn; use crate::ingest::raw_val::RawVal; -use crate::mem_store::raw_col::MixedCol; +use crate::mem_store::column_buffer::ColumnBuffer; use std::cmp; use std::collections::HashMap; -#[derive(PartialEq, Debug, Clone, Default)] +#[derive(Debug, Clone, Default, DataSize)] pub struct Buffer { - pub buffer: HashMap, + pub buffer: HashMap, pub length: usize, } @@ -19,8 +20,8 @@ impl Buffer { let buffered_col = self .buffer .entry(name) - .or_insert_with(|| MixedCol::with_nulls(len)); - buffered_col.push(input_val); + .or_insert_with(|| ColumnBuffer::null(len)); + buffered_col.push_val(input_val); } self.length += 1; self.extend_to_largest(); @@ -33,22 +34,26 @@ impl Buffer { let buffered_col = self .buffer .entry(name) - .or_insert_with(|| MixedCol::with_nulls(len)); + .or_insert_with(|| ColumnBuffer::null(len)); match input_col { - InputColumn::Int(vec) => buffered_col.push_ints(vec), - InputColumn::Str(vec) => buffered_col.push_strings(vec), - InputColumn::Float(vec) => buffered_col.push_floats(vec), + InputColumn::Int(vec) => buffered_col.push_ints(vec, None), + InputColumn::Str(vec) => { + buffered_col.push_strings(vec.iter().map(|s| s.as_str()), None) + } + InputColumn::Float(vec) => { + buffered_col.push_floats(vec.into_iter().map(OrderedFloat), None) + } InputColumn::Null(c) => buffered_col.push_nulls(c), InputColumn::Mixed(vec) => { for val in vec { - buffered_col.push(val); + buffered_col.push_val(val); } } InputColumn::NullableFloat(c, data) => { let mut next_i = 0; for (i, f) in data { buffered_col.push_nulls((i - next_i) as usize); - buffered_col.push(RawVal::Float(OrderedFloat(f))); + buffered_col.push_val(RawVal::Float(OrderedFloat(f))); next_i = i + 1; } buffered_col.push_nulls((c - next_i) as usize); @@ -57,28 +62,27 @@ impl Buffer { let mut next_i = 0; for (i, f) in data { buffered_col.push_nulls((i - next_i) as usize); - buffered_col.push(RawVal::Int(f)); + buffered_col.push_val(RawVal::Int(f)); next_i = i + 1; } buffered_col.push_nulls((c - next_i) as usize); } } + assert!(buffered_col.len() > self.length); + assert!(new_length == 0 || new_length == buffered_col.len()); new_length = cmp::max(new_length, buffered_col.len()) } + assert!(new_length > self.length); self.length = new_length; self.extend_to_largest(); } pub fn push_untyped_cols(&mut self, columns: HashMap>) { - let len = self.len(); let mut new_length = 0; for (name, input_vals) in columns { - let buffered_col = self - .buffer - .entry(name) - .or_insert_with(|| MixedCol::with_nulls(len)); + let buffered_col = self.buffer.entry(name).or_default(); for input_val in input_vals { - buffered_col.push(input_val); + buffered_col.push_val(input_val); } new_length = cmp::max(new_length, buffered_col.len()) } @@ -100,16 +104,6 @@ impl Buffer { self.length } - pub fn heap_size_of_children(&self) -> usize { - self.buffer - .values() - .map(|v| { - // Currently does not take into account the memory of String. - v.heap_size_of_children() - }) - .sum() - } - pub fn filter(&self, columns: &[String]) -> Buffer { let mut columns: HashMap<_, _> = columns .iter() diff --git a/src/ingest/colgen.rs b/src/ingest/colgen.rs index 0aa178c8..b2924659 100644 --- a/src/ingest/colgen.rs +++ b/src/ingest/colgen.rs @@ -279,7 +279,7 @@ pub fn event_buffer_from_raw_vals( let mut event_buffer = EventBuffer::default(); let mut table_buffer = TableBuffer::default(); for (colname, values) in columns { - table_buffer.columns.insert( + table_buffer.insert( colname, ColumnBuffer { data: ColumnData::Mixed(values.into_iter().map(RawVal::into).collect()), diff --git a/src/ingest/input_column.rs b/src/ingest/input_column.rs index 74b06402..b67ac00f 100644 --- a/src/ingest/input_column.rs +++ b/src/ingest/input_column.rs @@ -5,6 +5,7 @@ use crate::Value; pub enum InputColumn { Int(Vec), Float(Vec), + // (Length, [(Index, Value)]) NullableFloat(u64, Vec<(u64, f64)>), NullableInt(u64, Vec<(u64, i64)>), Str(Vec), @@ -58,4 +59,16 @@ impl InputColumn { } } } + + pub fn len(&self) -> usize { + match self { + InputColumn::Int(data) => data.len(), + InputColumn::Float(data) => data.len(), + InputColumn::Str(data) => data.len(), + InputColumn::NullableFloat(rows, _) => *rows as usize, + InputColumn::NullableInt(rows, _) => *rows as usize, + InputColumn::Mixed(data) => data.len(), + InputColumn::Null(rows) => *rows, + } + } } diff --git a/src/ingest/raw_val.rs b/src/ingest/raw_val.rs index 6b0b5d84..4f0c8279 100644 --- a/src/ingest/raw_val.rs +++ b/src/ingest/raw_val.rs @@ -1,6 +1,7 @@ use std::fmt; use std::mem; +use datasize::DataSize; use locustdb_serialization::api::AnyVal; use ordered_float::OrderedFloat; use serde::{Deserialize, Serialize}; @@ -120,3 +121,14 @@ impl From for AnyVal { } } } + +impl DataSize for RawVal { + const IS_DYNAMIC: bool = true; + const STATIC_HEAP_SIZE: usize = 0; + fn estimate_heap_size(&self) -> usize { + match *self { + RawVal::Str(ref s) => s.capacity() * mem::size_of::(), + RawVal::Int(_) | RawVal::Float(_) | RawVal::Null => 0, + } + } +} diff --git a/src/logging_client/mod.rs b/src/logging_client/mod.rs index 1d6b6085..7e825753 100644 --- a/src/logging_client/mod.rs +++ b/src/logging_client/mod.rs @@ -1,7 +1,7 @@ use std::fmt; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Condvar, Mutex}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime}; use locustdb_serialization::api::{ AnyVal, Column, ColumnNameRequest, ColumnNameResponse, EncodingOpts, MultiQueryRequest, @@ -128,11 +128,6 @@ impl LoggingClient { } pub fn log>(&mut self, table: &str, row: Row) { - let time_millis = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as f64 - / 1000.0; let mut warncount = 0; loop { if self.buffer_size.load(std::sync::atomic::Ordering::SeqCst) as usize @@ -157,25 +152,9 @@ impl LoggingClient { } let mut events = self.events.lock().unwrap(); let table = events.tables.entry(table.to_string()).or_default(); - let mut timestamp_provided = false; - for (column_name, value) in row { - self.buffer_size - .fetch_add(8, std::sync::atomic::Ordering::SeqCst); - table - .columns - .entry(column_name.to_string()) - .or_default() - .push(value, table.len); - timestamp_provided |= column_name == "timestamp"; - } - if !timestamp_provided { - table - .columns - .entry("timestamp".to_string()) - .or_default() - .push(AnyVal::Float(time_millis), table.len); - } - table.len += 1; + let cols = table.push_row_and_timestamp(row); + self.buffer_size + .fetch_add(8 * cols as u64, std::sync::atomic::Ordering::SeqCst); self.total_events += 1; } @@ -260,11 +239,7 @@ impl BackgroundWorker { } log::info!( "Creating request data for {} events", - buffer - .tables - .values() - .map(|t| t.columns.values().next().map(|c| c.data.len()).unwrap_or(0)) - .sum::() + buffer.tables.values().map(|t| t.len()).sum::() ); self.buffer_size .store(0, std::sync::atomic::Ordering::SeqCst); diff --git a/src/mem_store/column.rs b/src/mem_store/column.rs index b83d51e5..3d28c15a 100644 --- a/src/mem_store/column.rs +++ b/src/mem_store/column.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use crate::engine::data_types::*; use crate::mem_store::*; +use crate::stringpack::StringPackerIterator; #[derive(Serialize, Deserialize)] pub struct Column { @@ -24,8 +25,12 @@ pub trait DataSource: fmt::Debug + Sync + Send { fn range(&self) -> Option<(i64, i64)>; fn codec(&self) -> Codec; fn len(&self) -> usize; - fn data_sections(&self) -> Vec<&dyn Data>; + fn data_sections<'a>(&'a self) -> Vec<&'a dyn Data<'a>>; fn full_type(&self) -> Type; + + fn decode<'a>(&'a self) -> BoxedData<'a> { + decode(&self.codec(), &self.data_sections()) + } } impl DataSource for Arc { @@ -569,3 +574,246 @@ impl From>> for DataSection { DataSection::F64(vec) } } + +fn decode<'a>(codec: &Codec, sections: &[&'a dyn Data<'a>]) -> BoxedData<'a> { + let mut section_stack: Vec> = vec![sections[0].slice_box(0, sections[0].len())]; + for codec_op in codec.ops() { + let arg0 = section_stack.first().unwrap(); + let decoded = match codec_op { + CodecOp::Nullable => { + let present = section_stack.pop().unwrap(); + let mut data = section_stack.pop().unwrap(); + data.make_nullable(present.cast_ref_u8()) + } + CodecOp::Add(encoding_type, value) => match encoding_type { + EncodingType::U8 => Box::new( + arg0.cast_ref_u8() + .iter() + .map(|&v| v as i64 + value) + .collect::>(), + ), + EncodingType::U16 => Box::new( + arg0.cast_ref_u16() + .iter() + .map(|&v| v as i64 + value) + .collect::>(), + ), + EncodingType::U32 => Box::new( + arg0.cast_ref_u32() + .iter() + .map(|&v| v as i64 + value) + .collect::>(), + ), + _ => panic!( + "Unsupported encoding type for CodecOp::Add: {:?}", + encoding_type + ), + }, + CodecOp::Delta(encoding_type) => match encoding_type { + EncodingType::U8 => { + let mut decoded = Vec::with_capacity(arg0.len()); + let deltas = arg0.cast_ref_u8(); + let mut current = 0; + for delta in deltas { + current += *delta as i64; + decoded.push(current); + } + Box::new(decoded) + } + EncodingType::U16 => { + let mut decoded = Vec::with_capacity(arg0.len()); + let deltas = arg0.cast_ref_u16(); + let mut current = 0; + for delta in deltas { + current += *delta as i64; + decoded.push(current); + } + Box::new(decoded) + } + EncodingType::U32 => { + let mut decoded = Vec::with_capacity(arg0.len()); + let deltas = arg0.cast_ref_u32(); + let mut current = 0; + for delta in deltas { + current += *delta as i64; + decoded.push(current); + } + Box::new(decoded) + } + EncodingType::I64 => { + let mut decoded = Vec::with_capacity(arg0.len()); + let deltas = arg0.cast_ref_i64(); + let mut current = 0; + for delta in deltas { + current += *delta; + decoded.push(current); + } + Box::new(decoded) + } + _ => panic!( + "Unsupported encoding type for CodecOp::Delta: {:?}", + encoding_type + ), + }, + CodecOp::ToI64(encoding_type) => match encoding_type { + EncodingType::U8 => Box::new( + arg0.cast_ref_u8() + .iter() + .map(|&v| v as i64) + .collect::>(), + ), + EncodingType::U16 => Box::new( + arg0.cast_ref_u16() + .iter() + .map(|&v| v as i64) + .collect::>(), + ), + EncodingType::U32 => Box::new( + arg0.cast_ref_u32() + .iter() + .map(|&v| v as i64) + .collect::>(), + ), + _ => panic!( + "Unsupported encoding type for CodecOp::ToI64: {:?}", + encoding_type + ), + }, + CodecOp::PushDataSection(index) => { + let data_section = sections[*index].slice_box(0, sections[*index].len()); + section_stack.push(data_section); + continue; + } + CodecOp::DictLookup(encoding_type) => { + let dict_data = section_stack.pop().unwrap(); + // TODO: make lifetimes work out + let dict_data = + unsafe { std::mem::transmute::<&[u8], &[u8]>(dict_data.cast_ref_u8()) }; + let string_ranges = section_stack.pop().unwrap(); + let string_ranges = string_ranges.cast_ref_u64(); + let indices: Vec = match encoding_type { + EncodingType::U8 => section_stack + .pop() + .unwrap() + .cast_ref_u8() + .iter() + .map(|i| *i as usize) + .collect(), + EncodingType::U16 => section_stack + .pop() + .unwrap() + .cast_ref_u16() + .iter() + .map(|i| *i as usize) + .collect(), + EncodingType::U32 => section_stack + .pop() + .unwrap() + .cast_ref_u32() + .iter() + .map(|i| *i as usize) + .collect(), + EncodingType::I64 => section_stack + .pop() + .unwrap() + .cast_ref_i64() + .iter() + .map(|i| *i as usize) + .collect(), + _ => panic!( + "Unexpected encoding type for CodecOp::DictLookup: {:?}", + encoding_type + ), + }; + let mut output = Vec::new(); + for i in indices.iter() { + let offset_len = string_ranges[*i]; + let offset = (offset_len >> 24) as usize; + let len = (offset_len & 0x00ff_ffff) as usize; + let string = + unsafe { str::from_utf8_unchecked(&dict_data[offset..(offset + len)]) }; + output.push(string); + } + Box::new(output) as BoxedData + } + CodecOp::LZ4(encoding_type, count) => { + match encoding_type { + EncodingType::U8 => { + let mut decoded = vec![0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(decoded) as BoxedData + } + EncodingType::I64 => { + let mut decoded = vec![0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(decoded) + } + EncodingType::F64 => { + let mut decoded = vec![0.0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(vec_f64_to_vec_of64(decoded)) + } + other => panic!("Unsupported encoding type for CodecOp::LZ4: {:?}", other), + } + } + CodecOp::Pco(encoding_type, _, is_fp32) => { + let encoded_data = arg0.cast_ref_u8(); + match encoding_type { + EncodingType::U8 => { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded.iter().map(|&x| x as u8).collect::>()) as BoxedData + } + EncodingType::U16 => { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded.iter().map(|&x| x as u16).collect::>()) + } + EncodingType::U32 => { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded) + } + EncodingType::U64 => { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded) + } + EncodingType::I64 => { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded) + } + EncodingType::F64 => { + if *is_fp32 { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new( + decoded + .iter() + .map(|&v| OrderedFloat(v as f64)) + .collect::>(), + ) + } else { + let decoded = simple_decompress::(encoded_data).unwrap(); + Box::new(decoded.iter().map(|&v| OrderedFloat(v)).collect::>()) + } + } + encoding_type => panic!( + "Unsupported encoding type for CodecOp::Pco: {:?}", + encoding_type + ), + } + } + CodecOp::UnpackStrings => { + let mut output = Vec::new(); + let packed: &'a [u8] = sections[0].cast_ref_u8(); + let iterator = unsafe { StringPackerIterator::from_slice(packed) }; + for str in iterator { + output.push(str); + } + Box::new(output) as BoxedData + } + CodecOp::UnhexpackStrings(_, _) => todo!(), + CodecOp::Unknown => todo!(), + }; + section_stack.pop(); + section_stack.push(decoded); + } + + section_stack.pop().unwrap() +} diff --git a/src/mem_store/column_buffer.rs b/src/mem_store/column_buffer.rs new file mode 100644 index 00000000..96f4fe9c --- /dev/null +++ b/src/mem_store/column_buffer.rs @@ -0,0 +1,495 @@ +use std::cmp; +use std::sync::Arc; + +use datasize::DataSize; +use ordered_float::OrderedFloat; + +use crate::bitvec::*; +use crate::ingest::raw_val::RawVal; +use crate::mem_store::column::*; +use crate::mem_store::integers::*; +use crate::mem_store::strings::*; +use crate::stringpack::*; + +use super::floats::FloatColumn; + +#[derive(Default, Clone, Debug, DataSize)] +pub struct ColumnBuffer { + buffer: TypedBuffer, + length: usize, + present: Option>, +} + +impl ColumnBuffer { + pub fn null(length: usize) -> Self { + ColumnBuffer { + buffer: TypedBuffer::Empty, + length, + present: None, + } + } + + pub fn push_val(&mut self, elem: RawVal) { + match elem { + RawVal::Int(elem) => self.push_ints([elem], None), + RawVal::Float(ordered_float) => self.push_floats([ordered_float], None), + RawVal::Str(s) => self.push_strings([s.as_str()], None), + RawVal::Null => self.push_nulls(1), + } + } + + pub fn push_ints>(&mut self, elems: I, present: Option<&[u8]>) { + let mut count = 0; + match &mut self.buffer { + TypedBuffer::Empty => { + let mut buffer = IntColBuffer::default(); + if self.len() > 0 { + self.init_present(); + } + for _ in 0..self.len() { + buffer.push(0); + } + for i in elems { + buffer.push(i); + count += 1; + } + self.buffer = TypedBuffer::Int(buffer); + } + TypedBuffer::Int(buffer) => { + for i in elems { + buffer.push(i); + count += 1; + } + } + TypedBuffer::Mixed(buffer) => { + for i in elems { + buffer.push(RawVal::Int(i)); + count += 1; + } + } + TypedBuffer::Float(buffer) => { + // TODO: conversion is potentially lossy, convert into mixed column if necessary + for i in elems { + buffer.push(i as f64); + count += 1; + } + } + TypedBuffer::String(buffer) => { + let mut buffer = MixedColBuffer { + data: buffer + .values + .iter() + .map(|s| RawVal::Str(s.to_string())) + .collect(), + }; + for i in elems { + buffer.push(RawVal::Int(i)); + count += 1; + } + self.buffer = TypedBuffer::Mixed(buffer); + } + } + self.push_present(present, count); + self.length += count; + } + + pub fn push_floats>>( + &mut self, + elems: I, + present: Option<&[u8]>, + ) { + let mut count = 0; + match &mut self.buffer { + TypedBuffer::Empty => { + if self.len() > 0 { + self.init_present(); + } + let mut buffer = FloatColBuffer::default(); + for _ in 0..self.len() { + buffer.push(0.0); + } + for f in elems { + buffer.push(f.0); + count += 1; + } + self.buffer = TypedBuffer::Float(buffer); + } + TypedBuffer::Float(buffer) => { + for f in elems { + buffer.push(f.0); + count += 1; + } + } + TypedBuffer::Int(buffer) => { + // TODO: conversion is potentially lossy, convert into mixed column if necessary + let mut float_buffer = FloatColBuffer::default(); + for i in buffer.data.iter() { + float_buffer.push(*i as f64); + } + for f in elems { + float_buffer.push(f.0); + count += 1; + } + self.buffer = TypedBuffer::Float(float_buffer); + } + TypedBuffer::String(buffer) => { + let mut buffer = MixedColBuffer { + data: buffer + .values + .iter() + .map(|s| RawVal::Str(s.to_string())) + .collect(), + }; + for f in elems { + buffer.push(RawVal::Float(f)); + count += 1; + } + self.buffer = TypedBuffer::Mixed(buffer); + } + TypedBuffer::Mixed(buffer) => { + for f in elems { + buffer.push(RawVal::Float(f)); + count += 1; + } + } + } + self.push_present(present, count); + self.length += count; + } + + pub fn push_strings<'a, I: IntoIterator>( + &mut self, + elems: I, + present: Option<&[u8]>, + ) { + let mut count = 0; + match &mut self.buffer { + TypedBuffer::Empty => { + if self.len() > 0 { + self.init_present(); + } + let mut buffer = StringColBuffer::default(); + for _ in 0..self.len() { + buffer.push(""); + } + for s in elems { + buffer.push(s); + count += 1; + } + self.buffer = TypedBuffer::String(buffer); + } + TypedBuffer::String(buffer) => { + for s in elems { + buffer.push(s); + count += 1; + } + } + TypedBuffer::Int(buffer) => { + let mut buffer = MixedColBuffer { + data: buffer + .data + .iter() + .map(|s| RawVal::Str(s.to_string())) + .collect(), + }; + for s in elems { + buffer.push(RawVal::Str(s.to_string())); + count += 1; + } + self.buffer = TypedBuffer::Mixed(buffer); + } + TypedBuffer::Float(buffer) => { + let mut buffer = MixedColBuffer { + data: buffer + .data + .iter() + .map(|s| RawVal::Str(s.to_string())) + .collect(), + }; + for s in elems { + buffer.push(RawVal::Str(s.to_string())); + count += 1; + } + self.buffer = TypedBuffer::Mixed(buffer); + } + TypedBuffer::Mixed(buffer) => { + for s in elems { + buffer.push(RawVal::Str(s.to_string())); + count += 1; + } + } + } + self.push_present(present, count); + self.length += count; + } + + fn push_present(&mut self, new_present: Option<&[u8]>, count: usize) { + if let Some(all_present) = self.present.as_mut() { + if let Some(new_present) = new_present { + for i in 0..count { + if BitVec::is_set(new_present, i) { + BitVecMut::set(all_present, self.length + i); + } + } + } else { + for i in 0..count { + BitVecMut::set(all_present, self.length + i); + } + } + } + } + + fn init_present(&mut self) { + assert!(self.present.is_none()); + match self.buffer { + TypedBuffer::Empty => { + let present = vec![0; self.length / 8]; + self.present = Some(present); + } + _ => { + let mut present = vec![0; self.length / 8]; + for i in 0..self.length { + BitVecMut::set(&mut present, i); + } + self.present = Some(present); + } + } + } + + pub fn push_nulls(&mut self, count: usize) { + match &mut self.buffer { + TypedBuffer::Empty => {} + buffer => { + if self.present.is_none() { + let mut present = vec![0xff; self.length / 8]; + for i in ((self.length / 8) * 8)..self.length { + BitVecMut::set(&mut present, i); + } + self.present = Some(present); + } + match buffer { + TypedBuffer::Int(buffer) => { + for _ in 0..count { + buffer.push(0); + } + } + TypedBuffer::Float(buffer) => { + for _ in 0..count { + buffer.push(0.0); + } + } + TypedBuffer::Mixed(buffer) => { + for _ in 0..count { + buffer.push(RawVal::Null); + } + } + TypedBuffer::String(buffer) => { + for _ in 0..count { + buffer.push(""); + } + } + TypedBuffer::Empty => {} + } + } + } + self.length += count; + } + + pub fn finalize(self, name: &str) -> Arc { + match self.buffer { + TypedBuffer::Empty => Arc::new(Column::null(name, self.length)), + TypedBuffer::Int(buffer) => buffer.finalize(name, self.present), + TypedBuffer::Float(buffer) => buffer.finalize(name, self.present), + TypedBuffer::String(buffer) => buffer.finalize(name, self.present), + TypedBuffer::Mixed(buffer) => buffer.finalize(name, self.present), + } + } + + pub fn len(&self) -> usize { + self.length + } +} + +#[derive(Default, Clone, Debug, DataSize)] +enum TypedBuffer { + #[default] + Empty, + String(StringColBuffer), + Int(IntColBuffer), + Float(FloatColBuffer), + Mixed(MixedColBuffer), +} + +#[derive(Clone, Debug, DataSize)] +struct StringColBuffer { + values: IndexedPackedStrings, + lhex: bool, + uhex: bool, + string_bytes: usize, +} + +impl Default for StringColBuffer { + fn default() -> StringColBuffer { + StringColBuffer { + values: IndexedPackedStrings::default(), + lhex: true, + uhex: true, + string_bytes: 0, + } + } +} + +impl StringColBuffer { + fn push(&mut self, elem: &str) { + self.lhex = self.lhex && is_lowercase_hex(elem); + self.uhex = self.uhex && is_uppercase_hex(elem); + self.string_bytes += elem.len(); + self.values.push(elem); + } + + fn finalize(self, name: &str, present: Option>) -> Arc { + fast_build_string_column( + name, + self.values.iter(), + self.values.len(), + self.lhex, + self.uhex, + self.string_bytes, + present, + ) + } +} + +#[derive(Clone, Debug, DataSize)] +struct IntColBuffer { + data: Vec, + min: i64, + max: i64, + increasing: u64, + allow_delta_encode: bool, + last: i64, +} + +impl Default for IntColBuffer { + fn default() -> IntColBuffer { + IntColBuffer { + data: Vec::new(), + min: i64::MAX, + max: i64::MIN, + increasing: 0, + allow_delta_encode: true, + last: i64::MIN, + } + } +} + +impl IntColBuffer { + fn push(&mut self, elem: i64) { + // PERF: can set arbitrary values for null to help compression (extend from last/previous value) + self.min = cmp::min(elem, self.min); + self.max = cmp::max(elem, self.max); + if elem > self.last { + self.increasing += 1; + } else if elem.checked_sub(self.last).is_none() { + self.allow_delta_encode = false; + }; + self.last = elem; + self.data.push(elem); + } + + fn finalize(self, name: &str, present: Option>) -> Arc { + // PERF: heuristic for deciding delta encoding could probably be improved + let delta_encode = + self.allow_delta_encode && (self.increasing * 10 > self.data.len() as u64 * 9); + IntegerColumn::new_boxed(name, self.data, self.min, self.max, delta_encode, present) + } +} + +#[derive(Default, Clone, Debug, DataSize)] +struct FloatColBuffer { + data: Vec, +} + +impl FloatColBuffer { + #[inline] + fn push(&mut self, elem: f64) { + self.data.push(elem); + } + + fn finalize(self, name: &str, present: Option>) -> Arc { + FloatColumn::new_boxed( + name, + unsafe { std::mem::transmute::, Vec>>(self.data) }, + present, + ) + } +} + +#[derive(Default, Clone, Debug, DataSize)] +struct MixedColBuffer { + data: Vec, +} + +impl MixedColBuffer { + fn push(&mut self, elem: RawVal) { + self.data.push(elem); + } + + fn finalize(self, name: &str, present: Option>) -> Arc { + // TODO: allow for mixed columns + let mut string_col = StringColBuffer::default(); + for elem in self.data { + match elem { + RawVal::Str(s) => string_col.push(&s), + RawVal::Int(i) => string_col.push(&i.to_string()), + RawVal::Float(f) => string_col.push(&f.to_string()), + RawVal::Null => {} + } + } + string_col.finalize(name, present) + } +} + +fn is_lowercase_hex(string: &str) -> bool { + string.len() & 1 == 0 + && string.chars().all(|c| { + c == '0' + || c == '1' + || c == '2' + || c == '3' + || c == '4' + || c == '5' + || c == '6' + || c == '7' + || c == '8' + || c == '9' + || c == 'a' + || c == 'b' + || c == 'c' + || c == 'd' + || c == 'e' + || c == 'f' + }) +} + +fn is_uppercase_hex(string: &str) -> bool { + string.len() & 1 == 0 + && string.chars().all(|c| { + c == '0' + || c == '1' + || c == '2' + || c == '3' + || c == '4' + || c == '5' + || c == '6' + || c == '7' + || c == '8' + || c == '9' + || c == 'A' + || c == 'B' + || c == 'C' + || c == 'D' + || c == 'E' + || c == 'F' + }) +} diff --git a/src/mem_store/column_builder.rs b/src/mem_store/column_builder.rs deleted file mode 100644 index 941fd1a5..00000000 --- a/src/mem_store/column_builder.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::cmp; -use std::sync::Arc; - -use ordered_float::OrderedFloat; - -use crate::mem_store::column::*; -use crate::mem_store::integers::*; -use crate::mem_store::strings::*; -use crate::stringpack::*; - -use super::floats::FloatColumn; - -pub trait ColumnBuilder: Default { - fn push(&mut self, elem: &T); - fn finalize(self, name: &str, present: Option>) -> Arc; -} - -pub struct StringColBuilder { - values: IndexedPackedStrings, - lhex: bool, - uhex: bool, - string_bytes: usize, -} - -impl Default for StringColBuilder { - fn default() -> StringColBuilder { - StringColBuilder { - values: IndexedPackedStrings::default(), - lhex: true, - uhex: true, - string_bytes: 0, - } - } -} - -impl> ColumnBuilder for StringColBuilder { - fn push(&mut self, elem: &T) { - let elem = elem.as_ref(); - self.lhex = self.lhex && is_lowercase_hex(elem); - self.uhex = self.uhex && is_uppercase_hex(elem); - self.string_bytes += elem.len(); - self.values.push(elem); - } - - fn finalize(self, name: &str, present: Option>) -> Arc { - fast_build_string_column( - name, - self.values.iter(), - self.values.len(), - self.lhex, - self.uhex, - self.string_bytes, - present, - ) - } -} - -pub struct IntColBuilder { - data: Vec, - min: i64, - max: i64, - increasing: u64, - allow_delta_encode: bool, - last: i64, -} - -impl Default for IntColBuilder { - fn default() -> IntColBuilder { - IntColBuilder { - data: Vec::new(), - min: i64::MAX, - max: i64::MIN, - increasing: 0, - allow_delta_encode: true, - last: i64::MIN, - } - } -} - -impl ColumnBuilder> for IntColBuilder { - #[inline] - fn push(&mut self, elem: &Option) { - // PERF: can set arbitrary values for null to help compression (extend from last/previous value) - let elem = elem.unwrap_or(0); - self.min = cmp::min(elem, self.min); - self.max = cmp::max(elem, self.max); - if elem > self.last { - self.increasing += 1; - } else if elem.checked_sub(self.last).is_none() { - self.allow_delta_encode = false; - }; - self.last = elem; - self.data.push(elem); - } - - fn finalize(self, name: &str, present: Option>) -> Arc { - // PERF: heuristic for deciding delta encoding could probably be improved - let delta_encode = - self.allow_delta_encode && (self.increasing * 10 > self.data.len() as u64 * 9); - IntegerColumn::new_boxed(name, self.data, self.min, self.max, delta_encode, present) - } -} - -#[derive(Default)] -pub struct FloatColBuilder { - data: Vec>, -} - -impl ColumnBuilder> for FloatColBuilder { - #[inline] - fn push(&mut self, elem: &Option) { - // PERF: can set arbitrary values for null to help compression (extend from last/previous value) - let elem = elem.unwrap_or(0.0); - self.data.push(OrderedFloat(elem)); - } - - fn finalize(self, name: &str, present: Option>) -> Arc { - FloatColumn::new_boxed(name, self.data, present) - } -} - -fn is_lowercase_hex(string: &str) -> bool { - string.len() & 1 == 0 - && string.chars().all(|c| { - c == '0' - || c == '1' - || c == '2' - || c == '3' - || c == '4' - || c == '5' - || c == '6' - || c == '7' - || c == '8' - || c == '9' - || c == 'a' - || c == 'b' - || c == 'c' - || c == 'd' - || c == 'e' - || c == 'f' - }) -} - -fn is_uppercase_hex(string: &str) -> bool { - string.len() & 1 == 0 - && string.chars().all(|c| { - c == '0' - || c == '1' - || c == '2' - || c == '3' - || c == '4' - || c == '5' - || c == '6' - || c == '7' - || c == '8' - || c == '9' - || c == 'A' - || c == 'B' - || c == 'C' - || c == 'D' - || c == 'E' - || c == 'F' - }) -} diff --git a/src/mem_store/lru.rs b/src/mem_store/lru.rs index 6c1ce958..8fdd4d09 100644 --- a/src/mem_store/lru.rs +++ b/src/mem_store/lru.rs @@ -2,7 +2,7 @@ use crate::mem_store::partition::ColumnLocator; use lru::LruCache; use std::sync::{Arc, Mutex}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Lru { cache: Arc>>, } diff --git a/src/mem_store/mod.rs b/src/mem_store/mod.rs index ef2ff9b9..f9bf28b6 100644 --- a/src/mem_store/mod.rs +++ b/src/mem_store/mod.rs @@ -1,13 +1,12 @@ pub mod codec; pub mod column; -pub mod column_builder; +pub mod column_buffer; pub mod floats; pub mod integers; pub(crate) mod lru; pub mod lz4; mod mixed_column; pub mod partition; -pub mod raw_col; pub mod strings; pub mod table; pub mod tree; @@ -18,4 +17,4 @@ pub use self::column::{Column, DataSection, DataSource}; pub use self::lru::Lru; pub use self::table::TableStats; pub use self::tree::*; -pub use self::value::Val; \ No newline at end of file +pub use self::value::Val; diff --git a/src/mem_store/partition.rs b/src/mem_store/partition.rs index a0d1da4d..b6432d87 100644 --- a/src/mem_store/partition.rs +++ b/src/mem_store/partition.rs @@ -20,6 +20,7 @@ pub struct ColumnLocator { pub column: String, } +#[derive(Debug)] pub struct Partition { pub id: PartitionID, table_name: String, @@ -47,23 +48,28 @@ impl Partition { let mut keys = Vec::with_capacity(cols.len()); let len = cols[0].len(); let total_size_bytes = cols.iter().map(|c| c.heap_size_of_children()).sum(); - let cols = cols - .into_iter() - .map(|c| { - keys.push((id, c.name().to_string())); - ( - c.name().to_string(), - Arc::new(ColumnHandle::resident(table, id, c)), - ) - }) - .collect(); + let mut columns: HashMap> = HashMap::default(); + for c in cols { + let name = c.name().to_string(); + keys.push((id, name.clone())); + let column = Arc::new(ColumnHandle::resident(table, id, c)); + assert!( + column.try_get().as_ref().unwrap().len() == len, + "Expected column \"{}\" to have length {} but got {} (column: {:?})", + name, + len, + column.try_get().as_ref().unwrap().len(), + column, + ); + columns.insert(name, column); + } ( Partition { id, table_name: table.to_string(), range: offset..(offset + len), total_size_bytes, - cols: RwLock::new(cols), + cols: RwLock::new(columns), lru, ephemeral, }, @@ -101,7 +107,18 @@ impl Partition { buffer .buffer .into_iter() - .map(|(name, raw_col)| raw_col.finalize(&name)) + .map(|(name, raw_col)| { + let orig_len = raw_col.len(); + let finalized = raw_col.finalize(&name); + assert!( + orig_len == finalized.len(), + "Column {} has length {} but {} after finalization", + name, + orig_len, + finalized.len() + ); + finalized + }) .collect(), lru, true, @@ -254,6 +271,7 @@ impl Partition { } } +#[derive(Debug)] pub struct ColumnHandle { // Table, Partition, Subpartition key: ColumnLocator, diff --git a/src/mem_store/table.rs b/src/mem_store/table.rs index f969b564..acea4d95 100644 --- a/src/mem_store/table.rs +++ b/src/mem_store/table.rs @@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use std::sync::Arc; use std::sync::{Mutex, RwLock}; +use datasize::DataSize; use itertools::Itertools; use crate::disk_store::storage::Storage; @@ -330,7 +331,7 @@ impl Table { .map(|partition| partition.heap_size_of_children()) .sum(), buffer_length: buffer.len(), - buffer_bytes: buffer.heap_size_of_children(), + buffer_bytes: buffer.estimate_heap_size(), size_per_column, } } @@ -345,7 +346,7 @@ impl Table { }; let buffer_size = { let buffer = self.buffer.lock().unwrap(); - buffer.heap_size_of_children() + buffer.estimate_heap_size() }; (batches_size, buffer_size) } diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 112a9864..1cd054b8 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -26,14 +26,13 @@ use crate::ingest::raw_val::RawVal; use crate::locustdb::Options; use crate::mem_store::partition::Partition; use crate::mem_store::table::*; -use crate::observability::{metrics, PerfCounter, SimpleTracer}; +use crate::observability::{metrics, PerfCounter, QueryPerfCounter, SimpleTracer}; use crate::scheduler::disk_read_scheduler::DiskReadScheduler; use crate::scheduler::*; use crate::{disk_store::*, QueryError, QueryOutput}; use crate::{mem_store::*, NoopStorage}; use self::meta_store::SubpartitionMetadata; -use self::raw_col::MixedCol; use self::wal_segment::WalSegment; // Table name + list of partitions @@ -124,7 +123,7 @@ impl InnerLocustDB { let _ = locustdb.create_if_empty_no_ingest(&table_name); let tables = locustdb.tables.read().unwrap(); let table = tables.get(&table_name).unwrap(); - let rows = data.len; + let rows = data.len() as u64; // TODO: eliminate conversion if !table.columns_names_loaded() { let column_names = locustdb @@ -132,11 +131,12 @@ impl InnerLocustDB { .expect("Failed to query column names"); table.init_column_names(column_names.into_iter().collect()); } - let columns = data - .columns + let columns: HashMap = data + .into_columns() .into_iter() .map(|(k, v)| (k, InputColumn::from_column_data(v.data, rows))) .collect(); + assert!(columns.iter().all(|(_, c)| c.len() == rows as usize)); table.ingest_homogeneous(columns); } } @@ -262,14 +262,13 @@ impl InnerLocustDB { table.init_column_names(column_names.into_iter().collect()); } let new_column_names = - table.new_column_names(table_buffer.columns.keys().map(|s| s.as_str())); + table.new_column_names(table_buffer.columns().map(|(s, _)| s.as_str())); if !new_column_names.is_empty() { _new_column_rows.push((meta_columns_table.clone(), new_column_names)); } } if !_meta_tables_rows.is_empty() { let (timestamps, names): (Vec<_>, Vec<_>) = _meta_tables_rows.into_iter().unzip(); - let len = timestamps.len() as u64; let mut columns = HashMap::new(); columns.insert( "timestamp".to_string(), @@ -283,20 +282,19 @@ impl InnerLocustDB { data: ColumnData::String(names), }, ); - let meta_tables_buffer = TableBuffer { len, columns }; + let meta_tables_buffer = TableBuffer::new(columns); events .tables .insert("_meta_tables".to_string(), meta_tables_buffer); } for (table, names) in _new_column_rows { - let len = names.len() as u64; let columns = HashMap::from([( "column_name".to_string(), ColumnBuffer { data: ColumnData::String(names), }, )]); - let meta_columns_buffer = TableBuffer { len, columns }; + let meta_columns_buffer = TableBuffer::new(columns); events.tables.insert(table, meta_columns_buffer); } @@ -313,10 +311,10 @@ impl InnerLocustDB { for (table, data) in events.tables { let tables = self.tables.read().unwrap(); let table = tables.get(&table).unwrap(); - let rows = data.len; + let rows = data.len() as u64; // TODO: eliminate conversion let columns = data - .columns + .into_columns() .into_iter() .map(|(k, v)| (k, InputColumn::from_column_data(v.data, rows))) .collect(); @@ -399,9 +397,14 @@ impl InnerLocustDB { let span_compaction = tracer.start_span("compaction"); let (tx, rx) = mpsc::channel(); let num_compactions = compactions.len(); + let mut skipped = 0; for (table, id, range, parts) in compactions { let tx = tx.clone(); let this = self.clone(); + if table.name() == "sdfasdfTestTable" { + skipped += 1; + continue; + } self.walflush_threadpool.execute(move || { let (to_delete, tracer) = this.compact(table, id, range, &parts); tx.send((to_delete, tracer)).unwrap(); @@ -409,7 +412,7 @@ impl InnerLocustDB { } let mut partitions_to_delete = vec![]; let mut longest_span: Option<(SimpleTracer, Duration)> = None; - for (to_delete, tracer) in rx.iter().take(num_compactions) { + for (to_delete, tracer) in rx.iter().take(num_compactions - skipped) { if let Some(to_delete) = to_delete { partitions_to_delete.push(to_delete); } @@ -572,61 +575,74 @@ impl InnerLocustDB { tracer.end_span(span_load_column_names); let span_snapshot_partitions = tracer.start_span("snapshot_partitions"); + // TODO: ensure parts is sorted correctly let data = table.snapshot_parts(parts); tracer.end_span(span_snapshot_partitions); let span_load_columns = tracer.start_span("load_columns"); let mut columns = Vec::with_capacity(colnames.len()); + let query_perf_counter = QueryPerfCounter::new(); for column in &colnames { - let span_create_query = tracer.start_span("create_query"); - let query = Query::read_column(table.name(), column); - tracer.end_span(span_create_query); - - let span_schedule_query = tracer.start_span("schedule_query"); - let (sender, receiver) = oneshot::channel(); - let query_task = QueryTask::new( - query, - false, - false, - vec![], - data.clone(), - self.disk_read_scheduler().clone(), - SharedSender::new(sender), - self.opts.batch_size, - None, - ) - .unwrap(); - self.schedule(query_task); - tracer.end_span(span_schedule_query); - - let span_block_on = tracer.start_span("await_query"); - let result = block_on(receiver).unwrap().unwrap(); - tracer.end_span(span_block_on); - - let span_column_builder = tracer.start_span("build_column"); - let mut column_builder = MixedCol::default(); - let column_data = result.columns.into_iter().next().unwrap().1; - match column_data { - BasicTypeColumn::Int(ints) => column_builder.push_ints(ints), - BasicTypeColumn::Float(floats) => column_builder.push_floats(floats), - BasicTypeColumn::String(strings) => column_builder.push_strings(strings), - BasicTypeColumn::Null(count) => column_builder.push_nulls(count), - BasicTypeColumn::Mixed(raws) => { - raws.into_iter().for_each(|r| column_builder.push(r)) + let mut builder = crate::mem_store::column_buffer::ColumnBuffer::default(); + for part in &data { + let cols = part.get_cols( + &[column.clone()].into(), + self.disk_read_scheduler(), + &query_perf_counter, + ); + let col = if cols.is_empty() { + let len = part.range().len(); + Arc::new(Column::null(column, len)) as Arc + } else { + assert_eq!( + cols.len(), + 1, + "Expected 1 column (column = {column}), got {:?}", + cols.len() + ); + cols.into_values().next().unwrap() + }; + + let decoded = col.decode(); + match decoded.get_type() { + crate::engine::data_types::EncodingType::F64 => { + builder.push_floats(decoded.cast_ref_f64().iter().cloned(), None) + } + crate::engine::data_types::EncodingType::I64 => { + builder.push_ints(decoded.cast_ref_i64().iter().cloned(), None) + } + crate::engine::data_types::EncodingType::Str => { + builder.push_strings(decoded.cast_ref_str().iter().copied(), None) + } + crate::engine::data_types::EncodingType::NullableF64 => builder.push_floats( + decoded.cast_ref_f64().iter().cloned(), + Some(decoded.cast_ref_null_map()), + ), + crate::engine::data_types::EncodingType::Null => { + builder.push_nulls(decoded.len()) + } + crate::engine::data_types::EncodingType::NullableStr => builder.push_strings( + decoded.cast_ref_str().iter().copied(), + Some(decoded.cast_ref_null_map()), + ), + _ => panic!( + "Unsupported encoding type for add: {:?}", + decoded.get_type() + ), } } - tracer.end_span(span_column_builder); assert_eq!( range.len(), - column_builder.len(), - "range={range:?}, column_builder.len() = {}, table = {}, column = {column}", - column_builder.len(), + builder.len(), + "range={range:?}, column_builder.len() = {}, table = {}, column = {column}, column_data = {:?}", + builder.len(), table.name(), + builder, ); let span_finalize_column = tracer.start_span("finalize_column"); - columns.push(column_builder.finalize(column)); + columns.push(builder.finalize(column)); tracer.end_span(span_finalize_column); } tracer.end_span(span_load_columns); @@ -908,7 +924,7 @@ impl InnerLocustDB { } } - let table_buffer = TableBuffer { len: 1, columns }; + let table_buffer = TableBuffer::new(columns); let event_buffer = EventBuffer { tables: HashMap::from([(metrics_table_name.clone(), table_buffer)]), }; diff --git a/src/server/mod.rs b/src/server/mod.rs index a0b91b4b..5180b4d3 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -358,7 +358,7 @@ async fn insert_bin(data: web::Data, req_body: Bytes) -> impl Responde }; log::info!( "Received request data for {} events", - events.tables.values().map(|t| t.len).sum::() + events.tables.values().map(|t| t.len()).sum::() ); data.db.ingest_efficient(events).await; HttpResponse::Ok().json(r#"{"status": "ok"}"#) diff --git a/src/stringpack.rs b/src/stringpack.rs index 174c9224..f3c0e7d3 100644 --- a/src/stringpack.rs +++ b/src/stringpack.rs @@ -1,7 +1,11 @@ use std::str; -#[derive(Default)] +use datasize::DataSize; + +#[derive(Default, Clone, Debug, DataSize)] pub struct IndexedPackedStrings { + // each element stores a pointer and length into the `backing_store` + // the pointer is in the upper 40 bits, and the length is in the lower 24 bits data: Vec, backing_store: Vec, } diff --git a/tests/ingestion_test.rs b/tests/ingestion_test.rs index 16aebbe4..8ec454c2 100644 --- a/tests/ingestion_test.rs +++ b/tests/ingestion_test.rs @@ -1,7 +1,7 @@ use actix_web::dev::ServerHandle; use locustdb::logging_client::BufferFullPolicy; use locustdb_serialization::api::any_val_syntax::vf64; -use locustdb_serialization::api::Column; +use locustdb_serialization::api::{AnyVal, Column}; use pretty_assertions::assert_eq; use tempfile::tempdir; @@ -28,7 +28,7 @@ async fn test_ingestion() { io_threads: 8, ..locustdb::Options::default() }; - let port = 8888; + let port = 8895; let (db, handle) = create_locustdb(&opts, port); let tables = (0..20) @@ -36,7 +36,7 @@ async fn test_ingestion() { .collect::>(); let mut total_rows = 0; - ingest(total_rows, 127, 10, &tables); + ingest(total_rows, 127, 10, &tables, port); total_rows += 127; log::info!("completed ingestion"); @@ -50,7 +50,7 @@ async fn test_ingestion() { assert_eq!( id_sum.rows.unwrap(), vec![[ - Float(i as f64), + Int(i as i64), Float((total_rows * (total_rows - 1) / 2) as f64), Int(total_rows as i64) ],] @@ -98,7 +98,7 @@ async fn test_ingestion() { let start_time = Instant::now(); let new_rows = 21 + 11 * i; - ingest(total_rows, new_rows, i, &tables); + ingest(total_rows, new_rows, i, &tables, port); log::info!("Ingested {} rows in {:?}", new_rows, start_time.elapsed()); let start_time = Instant::now(); @@ -138,6 +138,41 @@ async fn test_ingestion() { } } +// Need multiple threads since dropping logging client blocks main thread and prevents logging worker from flushing buffers +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_simple_ingestion() { + let _ = env_logger::try_init(); + + let db_path: PathBuf = tempdir().unwrap().path().into(); + log::info!("Creating LocustDB at {:?}", db_path); + let opts = locustdb::Options { + db_path: Some(db_path), + io_threads: 8, + ..locustdb::Options::default() + }; + let port = 8888; + let (mut db, mut handle) = create_locustdb(&opts, port); + + let tables = ["TestTable".to_string()]; + let mut total_rows = 0; + ingest_simple(total_rows, 1, &tables[0]); + total_rows += 1; + + for i in 0..10 { + handle.stop(true).await; + drop(db); + (db, handle) = create_locustdb(&opts, port); + ingest_simple(total_rows, 3, &tables[0]); + total_rows += 3; + test_simple_db(&db, total_rows, &tables[0]).await; + if i % 3 == 0 { + db.force_flush(); + } + } + + test_simple_db(&db, total_rows, &tables[0]).await; +} + async fn test_db(db: &LocustDB, nrow: usize, tables: &[String]) { for (i, table) in tables.iter().enumerate() { let id_sum = query( @@ -149,7 +184,7 @@ async fn test_db(db: &LocustDB, nrow: usize, tables: &[String]) { assert_eq!( id_sum.rows.unwrap(), vec![[ - Float(i as f64), + Int(i as i64), Float((nrow * (nrow - 1) / 2) as f64), Int(nrow as i64) ],] @@ -157,9 +192,24 @@ async fn test_db(db: &LocustDB, nrow: usize, tables: &[String]) { } } -fn ingest(offset: usize, rows: usize, random_cols: usize, tables: &[String]) { - let start_time = Instant::now(); - log::info!("Ingesting {rows} rows into {} tables", tables.len()); +async fn test_simple_db(db: &LocustDB, nrow: usize, table: &String) { + let id_sum = query( + db, + &format!("SELECT table_id, SUM(row), COUNT(1) FROM {}", table), + ) + .await; + + assert_eq!( + id_sum.rows.unwrap(), + vec![[ + Int(1), + Float((nrow * (nrow - 1) / 2) as f64), + Int(nrow as i64) + ],] + ); +} + +fn ingest_simple(offset: usize, rows: usize, table: &str) { let addr = "http://localhost:8888"; let mut log = locustdb::logging_client::LoggingClient::new( Duration::from_secs(1), @@ -168,12 +218,33 @@ fn ingest(offset: usize, rows: usize, random_cols: usize, tables: &[String]) { BufferFullPolicy::Block, None, ); + for row in 0..rows { + let row = vec![ + ("table_id".to_string(), AnyVal::Int(1)), + ("row".to_string(), vf64((row + offset) as f64)), + ("name".to_string(), AnyVal::Str(table.to_string())), + ]; + log.log(table, row); + } +} + +fn ingest(offset: usize, rows: usize, random_cols: usize, tables: &[String], port: u16) { + let start_time = Instant::now(); + log::info!("Ingesting {rows} rows into {} tables", tables.len()); + let addr = format!("http://localhost:{port}"); + let mut log = locustdb::logging_client::LoggingClient::new( + Duration::from_secs(1), + &addr, + 64 * (1 << 20), + BufferFullPolicy::Block, + None, + ); let mut rng = rand::rngs::SmallRng::seed_from_u64(0); for row in 0..rows { for (i, table) in tables.iter().enumerate() { let mut row = vec![ ("row".to_string(), vf64((row + offset) as f64)), - ("table_id".to_string(), vf64(i as f64)), + ("table_id".to_string(), AnyVal::Int(i as i64)), ]; for c in 0..random_cols { row.push((format!("col_{c}"), vf64(rng.random::()))); From f64994d48a8a2e49eb18c9d7a86d378eee6e654a Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Mon, 26 May 2025 20:28:09 -0700 Subject: [PATCH 2/7] nullable i64 --- src/scheduler/inner_locustdb.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 1cd054b8..e6f3fca3 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -625,6 +625,10 @@ impl InnerLocustDB { decoded.cast_ref_str().iter().copied(), Some(decoded.cast_ref_null_map()), ), + crate::engine::data_types::EncodingType::NullableI64 => builder.push_ints( + decoded.cast_ref_i64().iter().cloned(), + Some(decoded.cast_ref_null_map()), + ), _ => panic!( "Unsupported encoding type for add: {:?}", decoded.get_type() From a8d50d76e9cb0598f57204820625984cca58cc81 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 31 May 2025 15:46:28 -0700 Subject: [PATCH 3/7] remove assert --- locustdb-serialization/src/event_buffer.rs | 7 ------- wandb_data_import.py | 3 ++- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/locustdb-serialization/src/event_buffer.rs b/locustdb-serialization/src/event_buffer.rs index ccfa910f..29098b58 100644 --- a/locustdb-serialization/src/event_buffer.rs +++ b/locustdb-serialization/src/event_buffer.rs @@ -232,13 +232,6 @@ impl EventBuffer { .reborrow() .init_columns(table.columns.len() as u32); for (j, (colname, column)) in table.columns.iter().enumerate() { - assert!( - column.data.len() == table.len as usize || column.data.len() == 0, - "Column {} has length {} but table has length {}", - colname, - column.data.len(), - table.len, - ); let mut column_builder = columns.reborrow().get(j as u32); column_builder.set_name(colname); match &column.data { diff --git a/wandb_data_import.py b/wandb_data_import.py index 5c3ea88f..1b6459b4 100644 --- a/wandb_data_import.py +++ b/wandb_data_import.py @@ -17,6 +17,7 @@ print("Starting...") i = 0 +logger = locustdb.Client(url="http://localhost:8080") while True: try: run = next(runs) @@ -33,7 +34,7 @@ for row in run.history(pandas=False): clean_row = {k: v or 0.0 for k, v in row.items() if not isinstance(v, dict) and not isinstance(v, str)} # print(clean_row) - locustdb.log(table=run.name, metrics=clean_row) + logger.log(table="gb_9a43be3e-"+run.name, metrics=clean_row) rows += 1 print(f"Logged {rows} rows") i += 1 From 7acb5fc665378d59dd698cdf25470afbdbcbbf7b Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 31 May 2025 16:21:30 -0700 Subject: [PATCH 4/7] faster wal recovery --- src/bin/repl/main.rs | 6 +++ src/disk_store/storage.rs | 77 ++++++++++++++++++++------------- src/locustdb.rs | 3 ++ src/scheduler/inner_locustdb.rs | 14 +++++- 4 files changed, 70 insertions(+), 30 deletions(-) diff --git a/src/bin/repl/main.rs b/src/bin/repl/main.rs index 02605d0f..d90968cc 100644 --- a/src/bin/repl/main.rs +++ b/src/bin/repl/main.rs @@ -41,6 +41,10 @@ struct Opt { #[structopt(long, name = "WAL_SIZE", default_value = "16777216")] max_wal_size_bytes: u64, + /// Maximum number of WAL files before triggering compaction + #[structopt(long, name = "MAX_WAL_FILES", default_value = "1000")] + max_wal_files: usize, + /// Maximum size of partition files in bytes #[structopt(long, name = "PART_SIZE", default_value = "8388608")] max_partition_size_bytes: u64, @@ -150,6 +154,7 @@ fn main() { metrics_interval, metrics_table_name, io_threads, + max_wal_files, } = Opt::from_args(); let options = locustdb::Options { @@ -160,6 +165,7 @@ fn main() { mem_lz4, readahead: readahead * 1024 * 1024, max_wal_size_bytes, + max_wal_files, max_partition_size_bytes, partition_combine_factor, batch_size, diff --git a/src/disk_store/storage.rs b/src/disk_store/storage.rs index c9bd905e..77b090e1 100644 --- a/src/disk_store/storage.rs +++ b/src/disk_store/storage.rs @@ -50,7 +50,7 @@ pub struct Storage { meta_db_path: PathBuf, tables_path: PathBuf, meta_store: Arc>, - writer: Box, + writer: Arc, perf_counter: Arc, io_threadpool: Option, @@ -107,16 +107,17 @@ impl Storage { } else { (Box::new(FileBlobWriter::new()), path.to_owned()) }; - let writer = Box::new(VersionedChecksummedBlobWriter::new(writer)); + let writer = Arc::new(VersionedChecksummedBlobWriter::new(writer)); let meta_db_path = path.join("meta"); let wal_dir = path.join("wal"); let tables_path = path.join("tables"); let (meta_store, wal_segments, wal_size) = Storage::recover( - writer.as_ref(), + writer.clone(), &meta_db_path, &wal_dir, readonly, - perf_counter.as_ref(), + perf_counter.clone(), + io_threads, ); let meta_store = Arc::new(RwLock::new(meta_store)); ( @@ -139,11 +140,12 @@ impl Storage { } fn recover( - writer: &dyn BlobWriter, + writer: Arc, meta_db_path: &Path, wal_dir: &Path, readonly: bool, - perf_counter: &PerfCounter, + perf_counter: Arc, + io_threads: usize, ) -> (MetaStore, Vec>, u64) { let mut meta_store: MetaStore = if writer.exists(meta_db_path).unwrap() { let data = writer.load(meta_db_path).unwrap(); @@ -153,40 +155,57 @@ impl Storage { MetaStore::default() }; - let mut wal_segments = Vec::new(); let earliest_uncommited_wal_id = meta_store.earliest_uncommited_wal_id(); log::info!( "Recovering from wal checkpoint {}", earliest_uncommited_wal_id ); let wal_files = writer.list(wal_dir).unwrap(); + let num_wal_files = wal_files.len(); log::info!("Found {} wal segments", wal_files.len()); - let mut wal_size = 0; + + let threadpool = ThreadPool::new(io_threads.min(wal_files.len())); + let (tx, rx) = mpsc::channel(); for wal_file in wal_files { - let wal_data = writer.load(&wal_file).unwrap(); - perf_counter.disk_read_wal(wal_data.len() as u64); - let wal_segment = WalSegment::deserialize(&wal_data).unwrap(); - log::info!( - "Found wal segment {} with id {} and {} rows in {} tables", - wal_file.display(), - wal_segment.id, - wal_segment.data.tables.values().map(|t| t.len()).sum::(), - wal_segment.data.tables.len(), - ); - if wal_segment.id < earliest_uncommited_wal_id { - if readonly { - log::info!("Skipping wal segment {}", wal_file.display()); + let tx = tx.clone(); + let writer = writer.clone(); + let perf_counter = perf_counter.clone(); + threadpool.execute(move || { + let wal_data = writer.load(&wal_file).unwrap(); + perf_counter.disk_read_wal(wal_data.len() as u64); + let wal_segment = WalSegment::deserialize(&wal_data).unwrap(); + log::info!( + "Found wal segment {} with id {} and {} rows in {} tables", + wal_file.display(), + wal_segment.id, + wal_segment + .data + .tables + .values() + .map(|t| t.len()) + .sum::(), + wal_segment.data.tables.len(), + ); + tx.send((wal_file, wal_segment, wal_data.len() as u64)).unwrap(); + }); + } + + let mut wal_size = 0; + let mut wal_segments = Vec::new(); + for (path, wal_segment, size) in rx.iter().take(num_wal_files) { + if wal_segment.id < earliest_uncommited_wal_id { + if readonly { + log::info!("Skipping wal segment {}", path.display()); + } else { + writer.delete(&path).unwrap(); + log::info!("Deleting wal segment {}", path.display()); + } } else { - writer.delete(&wal_file).unwrap(); - log::info!("Deleting wal segment {}", wal_file.display()); + meta_store.register_wal_segment(wal_segment.id); + wal_size += size; + wal_segments.push(wal_segment); } - } else { - meta_store.register_wal_segment(wal_segment.id); - wal_segments.push(wal_segment); - wal_size += wal_data.len() as u64; - } } - wal_segments.sort_by_key(|s| s.id); (meta_store, wal_segments, wal_size) diff --git a/src/locustdb.rs b/src/locustdb.rs index ec9421a9..196f07fe 100644 --- a/src/locustdb.rs +++ b/src/locustdb.rs @@ -223,6 +223,8 @@ pub struct Options { pub readahead: usize, /// Maximum size of WAL in bytes before triggering compaction pub max_wal_size_bytes: u64, + /// Maximum number of WAL files before triggering compaction + pub max_wal_files: usize, /// Maximum size of partition pub max_partition_size_bytes: u64, /// Combine partitions when the size of every original partition is less than this factor of the combined partition size @@ -251,6 +253,7 @@ impl Default for Options { mem_lz4: true, readahead: 256 * 1024 * 1024, // 256 MiB max_wal_size_bytes: 64 * 1024 * 1024, // 64 MiB + max_wal_files: 1000, max_partition_size_bytes: 8 * 1024 * 1024, // 8 MiB partition_combine_factor: 4, batch_size: 1024, diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index e6f3fca3..2625e684 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -839,7 +839,19 @@ impl InnerLocustDB { while self.running.load(Ordering::SeqCst) { let wal_size = { *wal_size.lock().unwrap() }; let pending_wal_flushes = mem::take(&mut *pending_wal_flushes_mutex.lock().unwrap()); - if wal_size > self.opts.max_wal_size_bytes || !pending_wal_flushes.is_empty() { + let too_many_wal_files = match self.storage.as_ref() { + Some(storage) => { + let unflushed_wal_ids = + storage.meta_store().read().unwrap().unflushed_wal_ids(); + let wal_file_count = (unflushed_wal_ids.end - unflushed_wal_ids.start) as usize; + wal_file_count > self.opts.max_wal_files + } + None => false, + }; + if wal_size > self.opts.max_wal_size_bytes + || !pending_wal_flushes.is_empty() + || too_many_wal_files + { self.wal_flush(); for sender in pending_wal_flushes { let _ = sender.send(()); From 3efdf2933fbe3b36d0b845140627dc6f2084adef Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 31 May 2025 16:22:55 -0700 Subject: [PATCH 5/7] prevent empty threadpool --- src/disk_store/storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/disk_store/storage.rs b/src/disk_store/storage.rs index 77b090e1..80b5113c 100644 --- a/src/disk_store/storage.rs +++ b/src/disk_store/storage.rs @@ -164,7 +164,7 @@ impl Storage { let num_wal_files = wal_files.len(); log::info!("Found {} wal segments", wal_files.len()); - let threadpool = ThreadPool::new(io_threads.min(wal_files.len())); + let threadpool = ThreadPool::new(io_threads.min(wal_files.len()).max(1)); let (tx, rx) = mpsc::channel(); for wal_file in wal_files { let tx = tx.clone(); From 7b93de17e2dbbe82293acfdcb2a8d6936e48a94a Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sat, 31 May 2025 16:47:03 -0700 Subject: [PATCH 6/7] more fine grained profile --- src/scheduler/inner_locustdb.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 2625e684..84479d86 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -397,14 +397,9 @@ impl InnerLocustDB { let span_compaction = tracer.start_span("compaction"); let (tx, rx) = mpsc::channel(); let num_compactions = compactions.len(); - let mut skipped = 0; for (table, id, range, parts) in compactions { let tx = tx.clone(); let this = self.clone(); - if table.name() == "sdfasdfTestTable" { - skipped += 1; - continue; - } self.walflush_threadpool.execute(move || { let (to_delete, tracer) = this.compact(table, id, range, &parts); tx.send((to_delete, tracer)).unwrap(); @@ -412,7 +407,7 @@ impl InnerLocustDB { } let mut partitions_to_delete = vec![]; let mut longest_span: Option<(SimpleTracer, Duration)> = None; - for (to_delete, tracer) in rx.iter().take(num_compactions - skipped) { + for (to_delete, tracer) in rx.iter().take(num_compactions) { if let Some(to_delete) = to_delete { partitions_to_delete.push(to_delete); } @@ -579,17 +574,21 @@ impl InnerLocustDB { let data = table.snapshot_parts(parts); tracer.end_span(span_snapshot_partitions); - let span_load_columns = tracer.start_span("load_columns"); + let span_build_columns = tracer.start_span("build_columns"); let mut columns = Vec::with_capacity(colnames.len()); let query_perf_counter = QueryPerfCounter::new(); for column in &colnames { let mut builder = crate::mem_store::column_buffer::ColumnBuffer::default(); for part in &data { + let span_load_columns = tracer.start_span("load_column"); let cols = part.get_cols( &[column.clone()].into(), self.disk_read_scheduler(), &query_perf_counter, ); + tracer.end_span(span_load_columns); + + let col = if cols.is_empty() { let len = part.range().len(); Arc::new(Column::null(column, len)) as Arc @@ -603,7 +602,11 @@ impl InnerLocustDB { cols.into_values().next().unwrap() }; + let span_decode = tracer.start_span("decode"); let decoded = col.decode(); + tracer.end_span(span_decode); + + let span_push = tracer.start_span("push"); match decoded.get_type() { crate::engine::data_types::EncodingType::F64 => { builder.push_floats(decoded.cast_ref_f64().iter().cloned(), None) @@ -634,6 +637,7 @@ impl InnerLocustDB { decoded.get_type() ), } + tracer.end_span(span_push); } assert_eq!( @@ -649,7 +653,7 @@ impl InnerLocustDB { columns.push(builder.finalize(column)); tracer.end_span(span_finalize_column); } - tracer.end_span(span_load_columns); + tracer.end_span(span_build_columns); let span_subpartition = tracer.start_span("subpartition"); let (metadata, subpartitions) = subpartition(&self.opts, columns.clone()); From 8ded9485065059399d697d5232f6051e0a62c61f Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Tue, 3 Jun 2025 19:52:04 -0700 Subject: [PATCH 7/7] PCO decode error workaround --- src/mem_store/column.rs | 62 ++++++++++++++++++++++------------------- 1 file changed, 33 insertions(+), 29 deletions(-) diff --git a/src/mem_store/column.rs b/src/mem_store/column.rs index 3d28c15a..ef26d0ea 100644 --- a/src/mem_store/column.rs +++ b/src/mem_store/column.rs @@ -132,10 +132,10 @@ impl Column { self.data[0] = self.data[0].lz4_decode(decoded_type, self.len); trace!("lz4_decode after: {:?}", self); } - if let Some(CodecOp::Pco(decoded_type, ..)) = self.codec.ops().first().copied() { + if let Some(CodecOp::Pco(decoded_type, length, ..)) = self.codec.ops().first().copied() { trace!("lz4_decode before: {:?}", self); self.codec = self.codec.without_pco(); - self.data[0] = self.data[0].pco_decode(decoded_type); + self.data[0] = self.data[0].pco_decode(decoded_type, length); trace!("lz4_decode after: {:?}", self); } } @@ -462,7 +462,7 @@ impl DataSection { } } - pub fn pco_decode(&self, decoded_type: EncodingType) -> DataSection { + pub fn pco_decode(&self, decoded_type: EncodingType, length: usize) -> DataSection { match self { DataSection::Pco { data, is_fp32, .. } => match decoded_type { EncodingType::U8 => DataSection::U8( @@ -482,13 +482,19 @@ impl DataSection { EncodingType::U32 => DataSection::U32(simple_decompress(data).unwrap()), EncodingType::U64 => DataSection::U64(simple_decompress(data).unwrap()), EncodingType::I64 => DataSection::I64(simple_decompress(data).unwrap()), - EncodingType::F64 if *is_fp32 => DataSection::F64( - simple_decompress::(data) - .unwrap() - .into_iter() - .map(|v| OrderedFloat(v as f64)) - .collect(), - ), + EncodingType::F64 if *is_fp32 => match simple_decompress::(data) { + Ok(decompressed) => DataSection::F64( + decompressed + .into_iter() + .map(|v| OrderedFloat(v as f64)) + .collect(), + ), + Err(e) => { + log::error!("Error decompressing PCO f32 data section: {:?}", e); + log::error!("PCO data section (hex): {:02x?}", data); + DataSection::F64(vec![OrderedFloat(0.0); length]) + } + }, EncodingType::F64 if !is_fp32 => DataSection::F64(unsafe { std::mem::transmute::, Vec>( simple_decompress::(data).unwrap(), @@ -736,26 +742,24 @@ fn decode<'a>(codec: &Codec, sections: &[&'a dyn Data<'a>]) -> BoxedData<'a> { } Box::new(output) as BoxedData } - CodecOp::LZ4(encoding_type, count) => { - match encoding_type { - EncodingType::U8 => { - let mut decoded = vec![0; *count]; - lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); - Box::new(decoded) as BoxedData - } - EncodingType::I64 => { - let mut decoded = vec![0; *count]; - lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); - Box::new(decoded) - } - EncodingType::F64 => { - let mut decoded = vec![0.0; *count]; - lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); - Box::new(vec_f64_to_vec_of64(decoded)) - } - other => panic!("Unsupported encoding type for CodecOp::LZ4: {:?}", other), + CodecOp::LZ4(encoding_type, count) => match encoding_type { + EncodingType::U8 => { + let mut decoded = vec![0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(decoded) as BoxedData } - } + EncodingType::I64 => { + let mut decoded = vec![0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(decoded) + } + EncodingType::F64 => { + let mut decoded = vec![0.0; *count]; + lz4::decode::(&mut lz4::decoder(arg0.cast_ref_u8()), &mut decoded); + Box::new(vec_f64_to_vec_of64(decoded)) + } + other => panic!("Unsupported encoding type for CodecOp::LZ4: {:?}", other), + }, CodecOp::Pco(encoding_type, _, is_fp32) => { let encoded_data = arg0.cast_ref_u8(); match encoding_type {