Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ mod map_last_stream;
mod on_drop_stream;
mod once_lock;
mod task_context_helpers;
mod time;
mod uuid;

pub(crate) use children_helpers::require_one_child;
pub(crate) use map_last_stream::map_last_stream;
pub(crate) use on_drop_stream::on_drop_stream;
pub(crate) use once_lock::OnceLockResult;
pub(crate) use task_context_helpers::task_ctx_with_extension;
pub(crate) use time::now_ns;
pub(crate) use uuid::{deserialize_uuid, serialize_uuid};
9 changes: 9 additions & 0 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use std::time::{SystemTime, UNIX_EPOCH};

/// Nanoseconds elapsed since UNIX epoch.
pub(crate) fn now_ns() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos() as u64)
.unwrap_or(0)
}
12 changes: 6 additions & 6 deletions src/coordinator/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct DistributedExec {
plan: Arc<dyn ExecutionPlan>,
prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
metrics: ExecutionPlanMetricsSet,
pub(crate) task_metrics: Option<Arc<MetricsStore>>,
pub(crate) metrics_store: Option<Arc<MetricsStore>>,
}

pub(super) struct PreparedPlan {
Expand All @@ -44,13 +44,13 @@ impl DistributedExec {
plan,
prepared_plan: Arc::new(Mutex::new(None)),
metrics: ExecutionPlanMetricsSet::new(),
task_metrics: None,
metrics_store: None,
}
}

/// Enables task metrics collection from remote workers.
pub fn with_metrics_collection(mut self, enabled: bool) -> Self {
self.task_metrics = match enabled {
self.metrics_store = match enabled {
true => Some(Arc::new(MetricsStore::new())),
false => None,
};
Expand All @@ -66,7 +66,7 @@ impl DistributedExec {
/// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics
pub async fn wait_for_metrics(&self) {
let mut expected_keys: Vec<TaskKey> = Vec::new();
let Some(task_metrics) = &self.task_metrics else {
let Some(task_metrics) = &self.metrics_store else {
return;
};
let _ = self.plan.apply(|plan| {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl ExecutionPlan for DistributedExec {
plan: require_one_child(&children)?,
prepared_plan: self.prepared_plan.clone(),
metrics: self.metrics.clone(),
task_metrics: self.task_metrics.clone(),
metrics_store: self.metrics_store.clone(),
}))
}

Expand All @@ -158,7 +158,7 @@ impl ExecutionPlan for DistributedExec {
let PreparedPlan {
head_stage,
join_set,
} = prepare_static_plan(&self.plan, &self.metrics, &self.task_metrics, &context)?;
} = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
{
let mut guard = self
.prepared_plan
Expand Down
16 changes: 5 additions & 11 deletions src/coordinator/metrics_store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::TaskKey;
use crate::worker::generated::worker as pb;
use datafusion::common::HashMap;
use tokio::sync::watch;

type MetricsMap = HashMap<TaskKey, Vec<crate::worker::generated::worker::MetricsSet>>;
type MetricsMap = HashMap<TaskKey, pb::TaskMetrics>;

/// Stores the metrics collected from all worker tasks, and notifies waiters when new entries arrive.
#[derive(Debug, Clone)]
Expand All @@ -17,26 +18,19 @@ impl MetricsStore {
Self { tx, rx }
}

pub(crate) fn insert(
&self,
key: TaskKey,
metrics: Vec<crate::worker::generated::worker::MetricsSet>,
) {
pub(crate) fn insert(&self, key: TaskKey, metrics: pb::TaskMetrics) {
self.tx.send_modify(|map| {
map.insert(key, metrics);
});
}

pub(crate) fn get(
&self,
key: &TaskKey,
) -> Option<Vec<crate::worker::generated::worker::MetricsSet>> {
pub(crate) fn get(&self, key: &TaskKey) -> Option<pb::TaskMetrics> {
self.rx.borrow().get(key).cloned()
}

#[cfg(test)]
pub(crate) fn from_entries(
entries: impl IntoIterator<Item = (TaskKey, Vec<crate::worker::generated::worker::MetricsSet>)>,
entries: impl IntoIterator<Item = (TaskKey, pb::TaskMetrics)>,
) -> Self {
let map: HashMap<_, _> = entries.into_iter().collect();
let (tx, rx) = watch::channel(map);
Expand Down
7 changes: 5 additions & 2 deletions src/coordinator/task_spawner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::common::{serialize_uuid, task_ctx_with_extension};
use crate::common::{now_ns, serialize_uuid, task_ctx_with_extension};
use crate::config_extension_ext::get_config_extension_propagation_headers;
use crate::coordinator::MetricsStore;
use crate::execution_plans::ChildrenIsolatorUnionExec;
Expand Down Expand Up @@ -44,6 +44,7 @@ use uuid::Uuid;
pub(super) struct CoordinatorToWorkerMetrics {
pub(super) plan_bytes_sent: Count,
pub(super) plan_send_latency: Arc<LatencyMetric>,
pub(super) instantiation_time: u64,
}

impl CoordinatorToWorkerMetrics {
Expand All @@ -59,6 +60,7 @@ impl CoordinatorToWorkerMetrics {
|b| b.with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")),
metrics,
)),
instantiation_time: now_ns(),
}
}
}
Expand Down Expand Up @@ -182,6 +184,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
task_key: Some(task_key.clone()),
work_unit_feed_declarations,
target_worker_url: url.to_string(),
query_start_time_ns: self.metrics.instantiation_time,
})),
};
let plan_size = self.plan_proto.len();
Expand Down Expand Up @@ -254,7 +257,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
match inner {
pb::worker_to_coordinator_msg::Inner::TaskMetrics(pre_order_metrics) => {
if let Some(task_metrics) = &task_metrics {
task_metrics.insert(task_key.clone(), pre_order_metrics.metrics);
task_metrics.insert(task_key.clone(), pre_order_metrics);
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/metrics/task_metrics_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ mod tests {
// Ensure that there's metrics for each node for each task for each stage.
for expected_task_key in expected_task_keys {
let actual_metrics = dist_exec
.task_metrics
.metrics_store
.as_ref()
.unwrap()
.get(&expected_task_key)
Expand All @@ -189,7 +189,7 @@ mod tests {
let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap();
let stage_plan = stage.local_plan().unwrap();
assert_eq!(
actual_metrics.len(),
actual_metrics.pre_order_plan_metrics.len(),
count_plan_nodes_up_to_network_boundary(stage_plan),
"Mismatch between collected metrics and actual nodes for {expected_task_key:?}"
);
Expand Down Expand Up @@ -295,7 +295,7 @@ mod tests {

for expected_task_key in &expected_task_keys {
let actual_metrics = dist_exec
.task_metrics
.metrics_store
.as_ref()
.unwrap()
.get(expected_task_key)
Expand All @@ -309,7 +309,7 @@ mod tests {
let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap();
let stage_plan = stage.local_plan().unwrap();
assert_eq!(
actual_metrics.len(),
actual_metrics.pre_order_plan_metrics.len(),
count_plan_nodes_up_to_network_boundary(stage_plan),
"Mismatch between collected metrics and actual nodes for {expected_task_key:?}"
);
Expand Down Expand Up @@ -350,7 +350,7 @@ mod tests {
// Verify all nodes (including PartitionIsolatorExec) are preserved in metrics collection
for expected_task_key in expected_task_keys {
let actual_metrics = dist_exec
.task_metrics
.metrics_store
.as_ref()
.unwrap()
.get(&expected_task_key)
Expand All @@ -361,7 +361,7 @@ mod tests {
// Verify metrics count matches - this ensures all nodes are included in metrics collection
// regardless of whether they have metrics or not (some nodes may have empty metrics sets)
assert_eq!(
actual_metrics.len(),
actual_metrics.pre_order_plan_metrics.len(),
count_plan_nodes_up_to_network_boundary(stage_plan),
"Metrics count must match plan nodes for stage {expected_task_key:?}"
);
Expand Down
17 changes: 11 additions & 6 deletions src/metrics/task_metrics_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub async fn rewrite_distributed_plan_with_metrics(

distributed_exec.wait_for_metrics().await;

let Some(metrics_collection) = distributed_exec.task_metrics.clone() else {
let Some(metrics_collection) = distributed_exec.metrics_store.clone() else {
return Ok(plan);
};

Expand Down Expand Up @@ -227,13 +227,13 @@ pub fn stage_metrics_rewriter(
};
match metrics_collection.get(&task_key) {
Some(task_metrics) => {
if node_idx >= task_metrics.len() {
if node_idx >= task_metrics.pre_order_plan_metrics.len() {
return internal_err!(
"not enough metrics provided to rewrite task: {} metrics provided",
task_metrics.len()
task_metrics.pre_order_plan_metrics.len()
);
}
let node_metrics_protos = task_metrics[node_idx].clone();
let node_metrics_protos = task_metrics.pre_order_plan_metrics[node_idx].clone();
let mut node_metrics = metrics_set_proto_to_df(&node_metrics_protos)?;

let rewrite_ctx = format.to_rewrite_ctx(task_id as u64);
Expand Down Expand Up @@ -457,7 +457,11 @@ mod tests {
)
})
.collect::<Vec<pb::MetricsSet>>();
(task_key, metrics)
let task_metrics = pb::TaskMetrics {
task_metrics: None,
pre_order_plan_metrics: metrics,
};
(task_key, task_metrics)
}));
let metrics_collection = Arc::new(metrics_collection);

Expand Down Expand Up @@ -489,7 +493,8 @@ mod tests {
stage_id: stage.num as u64,
task_number: task_id as u64,
})
.unwrap()[node_id]
.unwrap()
.pre_order_plan_metrics[node_id]
.clone();

let mut actual_metrics_set = MetricsSet::new();
Expand Down
61 changes: 50 additions & 11 deletions src/stage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::coordinator::DistributedExec;
use crate::coordinator::{DistributedExec, MetricsStore};
use crate::execution_plans::NetworkCoalesceExec;
use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL;
use crate::{NetworkShuffleExec, PartitionIsolatorExec};
Expand Down Expand Up @@ -157,6 +157,9 @@ impl DistributedTaskContext {
}
}

use crate::common::serialize_uuid;
use crate::metrics::proto::metric_proto_to_df;
use crate::worker::generated::worker as pb;
use crate::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics};
use crate::{NetworkBoundary, NetworkBoundaryExt};
use datafusion::common::DataFusionError;
Expand Down Expand Up @@ -199,7 +202,7 @@ const HORIZONTAL: &str = "─"; // Horizontal line
pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String {
if let Some(plan) = plan.as_any().downcast_ref::<DistributedExec>() {
let mut f = String::new();
display_ascii(Either::Left(plan), 0, show_metrics, &mut f).unwrap();
display_ascii(plan, Either::Left(plan), 0, show_metrics, &mut f).unwrap();
f
} else {
match show_metrics {
Expand All @@ -212,6 +215,7 @@ pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> Strin
}

fn display_ascii(
root: &DistributedExec,
stage: Either<&DistributedExec, &Stage>,
depth: usize,
show_metrics: bool,
Expand All @@ -228,23 +232,24 @@ fn display_ascii(
};
match stage {
Either::Left(dist_exec) => {
writeln!(
write!(
f,
"{}{}{} DistributedExec {} {}{}",
"{}{}{} DistributedExec {} {}",
" ".repeat(depth),
LTCORNER,
HORIZONTAL.repeat(5),
HORIZONTAL.repeat(2),
format_tasks_for_stage(1, plan),
if show_metrics {
format_metrics_by_task(&dist_exec.metrics().unwrap_or_default())
} else {
"".into()
}
)?;
if show_metrics && let Some(metrics) = dist_exec.metrics() {
write!(f, " ")?;
writeln!(f, "{}", format_metrics_by_task(&metrics))?;
} else {
writeln!(f)?;
}
}
Either::Right(stage) => {
writeln!(
write!(
f,
"{}{}{} Stage {} {} {}",
" ".repeat(depth),
Expand All @@ -254,6 +259,13 @@ fn display_ascii(
HORIZONTAL.repeat(2),
format_tasks_for_stage(stage.task_count(), plan)
)?;
if show_metrics && let Some(metrics_store) = &root.metrics_store {
let metrics = gather_stage_header_metrics(stage, metrics_store);
write!(f, " ")?;
writeln!(f, "{}", format_metrics_by_task(&metrics))?;
} else {
writeln!(f)?;
}
}
}

Expand All @@ -273,7 +285,7 @@ fn display_ascii(
HORIZONTAL.repeat(50)
)?;
for input_stage in find_input_stages(plan.as_ref()) {
display_ascii(Either::Right(input_stage), depth + 1, show_metrics, f)?;
display_ascii(root, Either::Right(input_stage), depth + 1, show_metrics, f)?;
}
Ok(())
}
Expand Down Expand Up @@ -317,6 +329,33 @@ fn display_inner_ascii(
Ok(())
}

/// Gathers the metrics global to a stage. These metrics are not specific to any plan node, and
/// are instead global to a whole stage.
fn gather_stage_header_metrics(stage: &Stage, metrics_store: &MetricsStore) -> MetricsSet {
let mut task_key = pb::TaskKey {
query_id: serialize_uuid(&stage.query_id()),
stage_id: stage.num() as u64,
task_number: 0,
};
let mut all_metrics = MetricsSet::new();
while let Some(task_metrics) = metrics_store.get(&task_key) {
let Some(metrics_set) = task_metrics.task_metrics else {
continue;
};
for mut metric in metrics_set.metrics {
metric.labels.push(pb::Label {
name: DISTRIBUTED_DATAFUSION_TASK_ID_LABEL.to_string(),
value: task_key.task_number.to_string(),
});
if let Ok(metric) = metric_proto_to_df(metric) {
all_metrics.push(metric)
};
}
task_key.task_number += 1;
}
all_metrics
}

/// Aggregates metrics by (name, task_id), preserving the [DISTRIBUTED_DATAFUSION_TASK_ID_LABEL]
/// only. Metrics without a task_id label (ie. non distributed metrics) are aggregated together.
///
Expand Down
Loading
Loading