diff --git a/src/observability/simple_trace.rs b/src/observability/simple_trace.rs index 6321ae5..9615ccb 100644 --- a/src/observability/simple_trace.rs +++ b/src/observability/simple_trace.rs @@ -97,6 +97,27 @@ impl SimpleTracer { } result } + + pub fn push_tracer(&mut self, mut tracer: SimpleTracer) { + assert_eq!(tracer.open_spans.len(), 1); + let mut children = tracer.open_spans.pop().unwrap().children; + // Set depth of children to the depth of the current span + 1 + for child in children.iter_mut() { + set_depth(child, self.open_spans.len()); + } + self.open_spans.last_mut().unwrap().children.extend(children); + } + + pub fn elapsed(&self) -> Duration { + self.open_spans.last().unwrap().start_time.elapsed() + } +} + +fn set_depth(child: &mut SimpleSpan, depth: usize) { + child.depth = depth; + for child in child.children.iter_mut() { + set_depth(child, depth + 1); + } } impl SimpleSpan { @@ -132,9 +153,9 @@ impl SimpleSpan { for (name, (total_duration, count)) in agg_spans { let duration_str = format_duration(total_duration); if count > 1 { - result.push_str(&format!("{}: {} (× {})\n", name, duration_str, count)); + result.push_str(&format!("{} {}: {} (×{})\n", indent, name, duration_str, count)); } else { - result.push_str(&format!("{}: {}\n", name, duration_str)); + result.push_str(&format!("{} {}: {}\n", indent, name, duration_str)); } } } else { diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 382837d..112a986 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -36,6 +36,9 @@ use self::meta_store::SubpartitionMetadata; use self::raw_col::MixedCol; use self::wal_segment::WalSegment; +// Table name + list of partitions +pub type PartitionList = (String, Vec<(u64, String)>); + pub struct InnerLocustDB { tables: RwLock>>, lru: Lru, @@ -400,11 +403,32 @@ impl InnerLocustDB { let tx = tx.clone(); let this = self.clone(); self.walflush_threadpool.execute(move || { - let to_delete = this.compact(table, id, range, &parts); - tx.send(to_delete).unwrap(); + let (to_delete, tracer) = this.compact(table, id, range, &parts); + tx.send((to_delete, tracer)).unwrap(); }); } - let partitions_to_delete = rx.iter().take(num_compactions).flatten().collect(); + let mut partitions_to_delete = vec![]; + let mut longest_span: Option<(SimpleTracer, Duration)> = None; + for (to_delete, tracer) in rx.iter().take(num_compactions) { + if let Some(to_delete) = to_delete { + partitions_to_delete.push(to_delete); + } + let elapsed = tracer.elapsed(); + if longest_span.is_none() || elapsed > longest_span.as_ref().unwrap().1 { + longest_span = Some((tracer, elapsed)); + } + } + tracer.annotate("table_count", num_compactions); + tracer.annotate( + "partition_count", + partitions_to_delete + .iter() + .map(|(_, p)| p.len()) + .sum::(), + ); + if let Some((compaction_tracer, _)) = longest_span { + tracer.push_tracer(compaction_tracer); + } tracer.end_span(span_compaction); // Update metastore and clean up orphaned partitions and WAL segments @@ -530,11 +554,14 @@ impl InnerLocustDB { id: PartitionID, range: Range, parts: &[u64], - ) -> Option<(String, Vec<(u64, String)>)> { + ) -> (Option, SimpleTracer) { // get table, create new merged partition/sub-partitions (not registered with table) // - get names of all columns // - run query for each column, construct Column // - create subpartitions + let mut tracer = SimpleTracer::default(); + + let span_load_column_names = tracer.start_span("load_column_names"); if !table.columns_names_loaded() { let column_names = self .query_column_names(table.name()) @@ -542,10 +569,20 @@ impl InnerLocustDB { table.init_column_names(column_names.into_iter().collect()); } let colnames = table.column_names(); + tracer.end_span(span_load_column_names); + + let span_snapshot_partitions = tracer.start_span("snapshot_partitions"); let data = table.snapshot_parts(parts); + tracer.end_span(span_snapshot_partitions); + + let span_load_columns = tracer.start_span("load_columns"); let mut columns = Vec::with_capacity(colnames.len()); for column in &colnames { + let span_create_query = tracer.start_span("create_query"); let query = Query::read_column(table.name(), column); + tracer.end_span(span_create_query); + + let span_schedule_query = tracer.start_span("schedule_query"); let (sender, receiver) = oneshot::channel(); let query_task = QueryTask::new( query, @@ -560,7 +597,13 @@ impl InnerLocustDB { ) .unwrap(); self.schedule(query_task); + tracer.end_span(span_schedule_query); + + let span_block_on = tracer.start_span("await_query"); let result = block_on(receiver).unwrap().unwrap(); + tracer.end_span(span_block_on); + + let span_column_builder = tracer.start_span("build_column"); let mut column_builder = MixedCol::default(); let column_data = result.columns.into_iter().next().unwrap().1; match column_data { @@ -572,6 +615,8 @@ impl InnerLocustDB { raws.into_iter().for_each(|r| column_builder.push(r)) } } + tracer.end_span(span_column_builder); + assert_eq!( range.len(), column_builder.len(), @@ -579,15 +624,25 @@ impl InnerLocustDB { column_builder.len(), table.name(), ); + + let span_finalize_column = tracer.start_span("finalize_column"); columns.push(column_builder.finalize(column)); + tracer.end_span(span_finalize_column); } + tracer.end_span(span_load_columns); + + let span_subpartition = tracer.start_span("subpartition"); let (metadata, subpartitions) = subpartition(&self.opts, columns.clone()); + tracer.end_span(span_subpartition); // replace old partitions with new partition + let span_compact_partitions = tracer.start_span("compact_partitions"); table.compact(id, range.start, columns, parts); + tracer.end_span(span_compact_partitions); // write new subpartitions to disk and update in-memory metastore - self.storage.as_ref().map(|s| { + let span_prepare_compact = tracer.start_span("prepare_compact"); + let to_delete = self.storage.as_ref().map(|s| { let to_delete = s.prepare_compact( table.name(), id, @@ -597,7 +652,10 @@ impl InnerLocustDB { range.start, ); (table.name().to_string(), to_delete) - }) + }); + tracer.end_span(span_prepare_compact); + + (to_delete, tracer) } pub fn restore(&self, id: PartitionID, column: Column) {