diff --git a/src/bin/db_inspector.rs b/src/bin/db_inspector.rs index bbd18057..79e5b172 100644 --- a/src/bin/db_inspector.rs +++ b/src/bin/db_inspector.rs @@ -34,7 +34,7 @@ struct Opt { fn main() { env_logger::init(); let opts = Opt::from_args(); - let (storage, wal, _) = Storage::new(&opts.db_path, Arc::new(PerfCounter::default()), true); + let (storage, wal, _) = Storage::new(&opts.db_path, Arc::new(PerfCounter::default()), true, 1); { let meta = storage.meta_store().read().unwrap(); @@ -66,9 +66,7 @@ fn main() { for (i, subpartition) in partition.subpartitions.iter().enumerate() { println!( " Subpartition {} has last column {} ({} bytes)", - i, - subpartition.last_column, - subpartition.size_bytes, + i, subpartition.last_column, subpartition.size_bytes, ); if opts.meta > 2 { println!( diff --git a/src/bin/repl/main.rs b/src/bin/repl/main.rs index 6e035166..4863439a 100644 --- a/src/bin/repl/main.rs +++ b/src/bin/repl/main.rs @@ -108,6 +108,10 @@ struct Opt { #[structopt(long, default_value = "1")] wal_flush_compaction_threads: usize, + /// Number of parallel threads used for IO operations + #[structopt(long, default_value = "1")] + io_threads: usize, + /// Internal metrics collection interval in seconds #[structopt(long, default_value = "15")] metrics_interval: u64, @@ -144,6 +148,7 @@ fn main() { wal_flush_compaction_threads, metrics_interval, metrics_table_name, + io_threads, } = Opt::from_args(); let options = locustdb::Options { @@ -159,6 +164,7 @@ fn main() { batch_size, max_partition_length: 1024 * 1024, wal_flush_compaction_threads, + io_threads, metrics_interval, metrics_table_name, }; diff --git a/src/disk_store/meta_store.rs b/src/disk_store/meta_store.rs index 649629a6..fa206586 100644 --- a/src/disk_store/meta_store.rs +++ b/src/disk_store/meta_store.rs @@ -235,8 +235,6 @@ impl MetaStore { let mut buf = Vec::new(); serialize_packed::write_message(&mut buf, &builder).unwrap(); tracer.end_span(span_message_serialization); - tracer.end_span(span_serialize); - tracer.annotate("table_count", self.partitions.len()); tracer.annotate("partition_count", total_partitions); tracer.annotate("unique_column_count", 0); @@ -245,6 +243,7 @@ impl MetaStore { tracer.annotate("column_names_bytes", 0); tracer.annotate("compressed_column_names_bytes", 0); tracer.annotate("column_name_lengths_bytes", 0); + tracer.end_span(span_serialize); buf } diff --git a/src/disk_store/storage.rs b/src/disk_store/storage.rs index 09ba7813..a50803f8 100644 --- a/src/disk_store/storage.rs +++ b/src/disk_store/storage.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use std::ops::Range; use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::sync::{mpsc, Arc, RwLock}; + +use threadpool::ThreadPool; use super::azure_writer::AzureBlobWriter; use super::file_writer::{BlobWriter, FileBlobWriter, VersionedChecksummedBlobWriter}; @@ -50,6 +52,8 @@ pub struct Storage { meta_store: Arc>, writer: Box, perf_counter: Arc, + + io_threadpool: Option, } impl Storage { @@ -57,6 +61,7 @@ impl Storage { path: &Path, perf_counter: Arc, readonly: bool, + io_threads: usize, ) -> (Storage, Vec>, u64) { let (writer, path): (Box, PathBuf) = if path.starts_with("gs://") { @@ -122,6 +127,11 @@ impl Storage { meta_store, writer, perf_counter, + io_threadpool: if io_threads > 1 { + Some(ThreadPool::new(io_threads)) + } else { + None + }, }, wal_segments, wal_size, @@ -238,34 +248,94 @@ impl Storage { } pub fn persist_partitions( - &self, + self: &Arc, partitions: Vec<(PartitionMetadata, Vec>>)>, tracer: &mut SimpleTracer, ) { let span_persist_partitions = tracer.start_span("persist_partitions"); + let mut partition_count = 0; + let mut partition_bytes = 0; + if let Some(io_threadpool) = &self.io_threadpool { + let (tx, rx) = mpsc::channel(); + let span_spawn_tasks = tracer.start_span("spawn_tasks"); + for (partition, subpartitions) in partitions { + partition_count += 1; + partition_bytes += subpartitions + .iter() + .flat_map(|s| s.iter()) + .map(|c| c.heap_size_of_children()) + .sum::(); + let tx = tx.clone(); + let storage = self.clone(); + io_threadpool.execute(move || { + storage.write_subpartitions(&partition, subpartitions, false); + let mut meta_store = storage.meta_store.write().unwrap(); + meta_store.insert_partition(partition); + tx.send(()).unwrap(); + }); + } + tracer.end_span(span_spawn_tasks); - // Write out new partition files - for (partition, subpartition_cols) in partitions { - let span_write_subpartitions = tracer.start_span("write_subpartitions"); - self.write_subpartitions(&partition, subpartition_cols, false); - tracer.end_span(span_write_subpartitions); - - let span_lock_meta_store = tracer.start_span("lock_meta_store"); - let mut meta_store = self.meta_store.write().unwrap(); - meta_store.insert_partition(partition); - tracer.end_span(span_lock_meta_store); + let span_wait_for_tasks = tracer.start_span("wait_for_tasks"); + for _ in rx.iter().take(partition_count) { + // Wait for all partitions to be persisted + } + tracer.end_span(span_wait_for_tasks); + } else { + // Write out new partition files + for (partition, subpartition_cols) in partitions { + let span_write_subpartitions = tracer.start_span("write_subpartitions"); + self.write_subpartitions(&partition, subpartition_cols, false); + tracer.end_span(span_write_subpartitions); + + let span_lock_meta_store = tracer.start_span("lock_meta_store"); + let mut meta_store = self.meta_store.write().unwrap(); + meta_store.insert_partition(partition); + tracer.end_span(span_lock_meta_store); + } } + tracer.annotate("partition_count", partition_count); + tracer.annotate("partition_bytes", partition_bytes); tracer.end_span(span_persist_partitions); } + pub fn persist_partition( + &self, + partition: PartitionMetadata, + subpartition_cols: Vec>>, + ) { + self.write_subpartitions(&partition, subpartition_cols, false); + let mut meta_store = self.meta_store.write().unwrap(); + meta_store.insert_partition(partition); + } + /// Delete WAL segments with ids in the given range. - pub fn delete_wal_segments(&self, ids: Range, tracer: &mut SimpleTracer) { + pub fn delete_wal_segments(self: &Arc, ids: Range, tracer: &mut SimpleTracer) { let span_delete_wal_segments = tracer.start_span("delete_wal_segments"); - for id in ids { - let path = self.wal_dir.join(format!("{}.wal", id)); - self.writer.delete(&path).unwrap(); + + if let Some(io_threadpool) = &self.io_threadpool { + let count = ids.end - ids.start; + let (tx, rx) = mpsc::channel(); + for id in ids { + let tx = tx.clone(); + let storage = self.clone(); + io_threadpool.execute(move || { + let path = storage.wal_dir.join(format!("{}.wal", id)); + storage.writer.delete(&path).unwrap(); + tx.send(()).unwrap(); + }); + } + for _ in rx.iter().take(count as usize) { + // Wait for all WAL segments to be deleted + } + } else { + for id in ids { + let path = self.wal_dir.join(format!("{}.wal", id)); + self.writer.delete(&path).unwrap(); + } } + tracer.end_span(span_delete_wal_segments); } @@ -311,19 +381,53 @@ impl Storage { } pub fn delete_orphaned_partitions( - &self, + self: &Arc, to_delete: Vec<(String, Vec<(u64, String)>)>, tracer: &mut SimpleTracer, ) { // Delete old partition files let span_delete_orphaned_partitions = tracer.start_span("delete_orphaned_partitions"); - for (table, to_delete) in &to_delete { - for (id, key) in to_delete { - let table_dir = self.tables_path.join(sanitize_table_name(table)); - let path = table_dir.join(partition_filename(*id, key)); - self.writer.delete(&path).unwrap(); + let mut table_count = 0; + let mut partition_count = 0; + + if let Some(io_threadpool) = &self.io_threadpool { + let (tx, rx) = mpsc::channel(); + let span_spawn_tasks = tracer.start_span("spawn_tasks"); + for (table, to_delete) in to_delete { + table_count += 1; + for (id, key) in to_delete { + partition_count += 1; + let tx = tx.clone(); + let storage = self.clone(); + let table = table.clone(); + io_threadpool.execute(move || { + let table_dir = storage.tables_path.join(sanitize_table_name(&table)); + let path = table_dir.join(partition_filename(id, &key)); + storage.writer.delete(&path).unwrap(); + tx.send(()).unwrap(); + }); + } + } + tracer.end_span(span_spawn_tasks); + + let span_wait_for_tasks = tracer.start_span("wait_for_tasks"); + for _ in rx.iter().take(partition_count) { + // Wait for all partitions to be deleted + } + tracer.end_span(span_wait_for_tasks); + } else { + for (table, to_delete) in &to_delete { + table_count += 1; + for (id, key) in to_delete { + partition_count += 1; + let table_dir = self.tables_path.join(sanitize_table_name(table)); + let path = table_dir.join(partition_filename(*id, key)); + self.writer.delete(&path).unwrap(); + } } } + tracer.annotate("table_count", table_count); + tracer.annotate("partition_count", partition_count); tracer.end_span(span_delete_orphaned_partitions); } diff --git a/src/locustdb.rs b/src/locustdb.rs index 085ef90b..ec9421a9 100644 --- a/src/locustdb.rs +++ b/src/locustdb.rs @@ -233,6 +233,8 @@ pub struct Options { pub max_partition_length: usize, /// Number of parallel threads used during WAL flush table batching and compacting pub wal_flush_compaction_threads: usize, + /// Number of parallel threads used for IO operations + pub io_threads: usize, /// Internal metrics collection interval in seconds pub metrics_interval: u64, /// Internal metrics table name @@ -254,6 +256,7 @@ impl Default for Options { batch_size: 1024, max_partition_length: 1024 * 1024, wal_flush_compaction_threads: 1, + io_threads: 1, metrics_interval: 15, metrics_table_name: Some("_metrics".to_string()), } diff --git a/src/observability/simple_trace.rs b/src/observability/simple_trace.rs index eae6b0b8..6321ae56 100644 --- a/src/observability/simple_trace.rs +++ b/src/observability/simple_trace.rs @@ -2,60 +2,81 @@ use std::collections::HashMap; use std::fmt::Display; use std::time::{Duration, Instant}; -#[derive(Debug, Default)] +#[derive(Debug)] pub struct SimpleTracer { - completed_spans: Vec, - span_stack: Vec<(&'static str, Instant, usize)>, - annotations: Vec<(String, String)>, + open_spans: Vec, } #[derive(Debug)] pub struct SimpleSpan { - pub name: String, + pub name: &'static str, pub duration: Duration, pub depth: usize, pub annotations: Vec<(String, String)>, - pub child_count: usize, + pub children: Vec, +} + +#[derive(Debug)] +struct OpenSpan { + pub name: &'static str, + pub depth: usize, + pub start_time: Instant, + pub annotations: Vec<(String, String)>, + pub children: Vec, } pub struct SpanToken(&'static str); -impl SimpleTracer { - pub fn new() -> Self { +impl Default for SimpleTracer { + fn default() -> Self { SimpleTracer { - completed_spans: Vec::new(), - span_stack: Vec::new(), - annotations: Vec::new(), + open_spans: vec![OpenSpan { + name: "", + depth: 0, + start_time: Instant::now(), + annotations: Vec::new(), + children: Vec::new(), + }], } } +} +impl SimpleTracer { #[must_use] - pub fn start_span(&mut self, full_name: &'static str) -> SpanToken { - self.span_stack.push((full_name, Instant::now(), 0)); - SpanToken(full_name) + pub fn start_span(&mut self, name: &'static str) -> SpanToken { + self.open_spans.push(OpenSpan { + name, + depth: self.open_spans.len() - 1, + start_time: Instant::now(), + annotations: Vec::new(), + children: Vec::new(), + }); + SpanToken(name) } pub fn end_span(&mut self, span_token: SpanToken) { assert!( - self.span_stack.last().unwrap().0 == span_token.0, + self.open_spans.last().unwrap().name == span_token.0, "Span token mismatch" ); - let (_, start_time, child_count) = self.span_stack.pop().unwrap(); - if let Some((_, _, child_count)) = self.span_stack.last_mut() { - *child_count += 1; - } - let duration = start_time.elapsed(); - self.completed_spans.push(SimpleSpan { - name: span_token.0.to_string(), + let span = self.open_spans.pop().unwrap(); + let duration = span.start_time.elapsed(); + let span = SimpleSpan { + name: span.name, duration, - depth: self.span_stack.len(), - annotations: std::mem::take(&mut self.annotations), - child_count, - }); + depth: span.depth, + annotations: span.annotations, + children: span.children, + }; + self.open_spans.last_mut().unwrap().children.push(span); } pub fn annotate(&mut self, key: &'static str, value: S) { - self.annotations.push((key.to_string(), value.to_string())); + self.open_spans + .last_mut() + .unwrap() + .annotations + .push((key.to_string(), value.to_string())); } pub fn summary(&self) -> String { @@ -71,68 +92,60 @@ impl SimpleTracer { // string_sorting: 0.2s let mut result = String::new(); + for span in &self.open_spans.last().unwrap().children { + result.push_str(&span.summary().to_string()); + } + result + } +} + +impl SimpleSpan { + fn summary(&self) -> String { + let mut result = String::new(); + + let indent = " ".repeat(self.depth); - // Process spans in reverse order - let mut i = self.completed_spans.len(); - while i > 0 { - i -= 1; - let span = &self.completed_spans[i]; - let indent = " ".repeat(span.depth); - - // Render span name, duration, and annotations - result.push_str(&format!( - "{}{}: {}\n", - indent, - span.name, - format_duration(span.duration) - )); - for (key, value) in &span.annotations { - result.push_str(&format!("{} - {}: {}\n", indent, key, value)); + result.push_str(&format!( + "{}{}: {}\n", + indent, + self.name, + format_duration(self.duration) + )); + + for (key, value) in &self.annotations { + result.push_str(&format!("{} - {}: {}\n", indent, key, value)); + } + + if self.children.len() > 10 { + let mut aggregated_spans: HashMap<&'static str, (Duration, usize)> = HashMap::new(); + for child in &self.children { + let entry = aggregated_spans + .entry(child.name) + .or_insert((Duration::ZERO, 0)); + entry.0 += child.duration; + entry.1 += 1; } - // If span has more than 10 children, aggregate similar subspans - if span.child_count > 10 { - // Find and aggregate all children of this span - let mut aggregated_spans: HashMap = HashMap::new(); - let mut path = Vec::new(); - let mut last_depth = span.depth; - - while i > 0 && self.completed_spans[i - 1].depth > span.depth { - i -= 1; - if last_depth == self.completed_spans[i].depth { - path.pop(); - } - path.push(self.completed_spans[i].name.clone()); - let name = path.join("."); - let entry = aggregated_spans.entry(name).or_insert((Duration::ZERO, 0)); - entry.0 += self.completed_spans[i].duration; - entry.1 += 1; - - last_depth = self.completed_spans[i].depth; - } + let mut agg_spans: Vec<_> = aggregated_spans.into_iter().collect(); + agg_spans.sort_by(|a, b| a.0.cmp(b.0)); - // Print aggregated children sorted by name - let mut agg_spans: Vec<_> = aggregated_spans.into_iter().collect(); - agg_spans.sort_by(|a, b| a.0.cmp(&b.0)); - - for (name, (total_duration, count)) in agg_spans { - let child_indent = " ".repeat(span.depth + 1); - let duration_str = format_duration(total_duration); - if count > 1 { - result.push_str(&format!( - "{}{}: {} (× {})\n", - child_indent, name, duration_str, count - )); - } else { - result.push_str(&format!("{}{}: {}\n", child_indent, name, duration_str)); - } + for (name, (total_duration, count)) in agg_spans { + let duration_str = format_duration(total_duration); + if count > 1 { + result.push_str(&format!("{}: {} (× {})\n", name, duration_str, count)); + } else { + result.push_str(&format!("{}: {}\n", name, duration_str)); } } + } else { + for child in &self.children { + result.push_str(&child.summary().to_string()); + } } - result } } + fn format_duration(duration: Duration) -> String { let nanos = duration.as_nanos(); let value: f64; diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 60662826..382837de 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -65,8 +65,10 @@ impl InnerLocustDB { Some(path) => { let perf_counter = perf_counter.clone(); let lru = lru.clone(); + let io_threads = opts.io_threads; std::thread::spawn(move || { - let (storage, wal, wal_size) = Storage::new(&path, perf_counter, false); + let (storage, wal, wal_size) = + Storage::new(&path, perf_counter, false, io_threads); let tables = Table::restore_tables_from_disk(&storage, &lru); (Some(Arc::new(storage)), tables, wal, wal_size) @@ -330,7 +332,7 @@ impl InnerLocustDB { /// this function is never called concurrently. fn wal_flush(self: &Arc) { log::info!("Commencing WAL flush"); - let mut tracer = SimpleTracer::new(); + let mut tracer = SimpleTracer::default(); let span_wal_flush = tracer.start_span("wal_flush"); // Acquire wal_size lock to block creation of new WAL segments and modifications of open buffers, @@ -386,8 +388,8 @@ impl InnerLocustDB { tracer.end_span(span_batching); // Persist new partitions - if let Some(s) = self.storage.as_ref() { - s.persist_partitions(new_partitions, &mut tracer); + if let Some(storage) = self.storage.as_ref() { + storage.persist_partitions(new_partitions, &mut tracer); } // Write new segments from compactions to storage and apply compaction in-memory @@ -533,6 +535,12 @@ impl InnerLocustDB { // - get names of all columns // - run query for each column, construct Column // - create subpartitions + if !table.columns_names_loaded() { + let column_names = self + .query_column_names(table.name()) + .expect("Failed to query column names"); + table.init_column_names(column_names.into_iter().collect()); + } let colnames = table.column_names(); let data = table.snapshot_parts(parts); let mut columns = Vec::with_capacity(colnames.len()); diff --git a/tests/ingestion_test.rs b/tests/ingestion_test.rs index 198bf963..93d6e4fe 100644 --- a/tests/ingestion_test.rs +++ b/tests/ingestion_test.rs @@ -25,6 +25,7 @@ async fn test_ingestion() { log::info!("Creating LocustDB at {:?}", db_path); let opts = locustdb::Options { db_path: Some(db_path), + io_threads: 8, ..locustdb::Options::default() }; let port = 8888;