diff --git a/src/common/mod.rs b/src/common/mod.rs index de4e60e5..e8cb5b4f 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -4,6 +4,7 @@ mod on_drop_stream; mod once_lock; mod recursion; mod task_context_helpers; +mod time; mod uuid; pub(crate) use children_helpers::require_one_child; @@ -12,4 +13,5 @@ pub(crate) use on_drop_stream::on_drop_stream; pub(crate) use once_lock::OnceLockResult; pub(crate) use recursion::TreeNodeExt; pub(crate) use task_context_helpers::task_ctx_with_extension; +pub(crate) use time::now_ns; pub(crate) use uuid::{deserialize_uuid, serialize_uuid}; diff --git a/src/common/time.rs b/src/common/time.rs new file mode 100644 index 00000000..c055706f --- /dev/null +++ b/src/common/time.rs @@ -0,0 +1,9 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Nanoseconds elapsed since UNIX epoch. +pub(crate) fn now_ns() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos() as u64) + .unwrap_or(0) +} diff --git a/src/coordinator/distributed.rs b/src/coordinator/distributed.rs index 1c472b9a..de8bc1ac 100644 --- a/src/coordinator/distributed.rs +++ b/src/coordinator/distributed.rs @@ -30,7 +30,7 @@ pub struct DistributedExec { plan: Arc, prepared_plan: Arc>>>, metrics: ExecutionPlanMetricsSet, - pub(crate) task_metrics: Option>, + pub(crate) metrics_store: Option>, } pub(super) struct PreparedPlan { @@ -44,13 +44,13 @@ impl DistributedExec { plan, prepared_plan: Arc::new(Mutex::new(None)), metrics: ExecutionPlanMetricsSet::new(), - task_metrics: None, + metrics_store: None, } } /// Enables task metrics collection from remote workers. 
pub fn with_metrics_collection(mut self, enabled: bool) -> Self { - self.task_metrics = match enabled { + self.metrics_store = match enabled { true => Some(Arc::new(MetricsStore::new())), false => None, }; @@ -66,7 +66,7 @@ impl DistributedExec { /// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics pub async fn wait_for_metrics(&self) { let mut expected_keys: Vec = Vec::new(); - let Some(task_metrics) = &self.task_metrics else { + let Some(task_metrics) = &self.metrics_store else { return; }; let _ = self.plan.apply(|plan| { @@ -136,7 +136,7 @@ impl ExecutionPlan for DistributedExec { plan: require_one_child(&children)?, prepared_plan: self.prepared_plan.clone(), metrics: self.metrics.clone(), - task_metrics: self.task_metrics.clone(), + metrics_store: self.metrics_store.clone(), })) } @@ -158,7 +158,7 @@ impl ExecutionPlan for DistributedExec { let PreparedPlan { head_stage, join_set, - } = prepare_static_plan(&self.plan, &self.metrics, &self.task_metrics, &context)?; + } = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?; { let mut guard = self .prepared_plan diff --git a/src/coordinator/metrics_store.rs b/src/coordinator/metrics_store.rs index 4a9eee93..4b9b0ffc 100644 --- a/src/coordinator/metrics_store.rs +++ b/src/coordinator/metrics_store.rs @@ -1,8 +1,9 @@ use crate::TaskKey; +use crate::worker::generated::worker as pb; use datafusion::common::HashMap; use tokio::sync::watch; -type MetricsMap = HashMap>; +type MetricsMap = HashMap; /// Stores the metrics collected from all worker tasks, and notifies waiters when new entries arrive. 
#[derive(Debug, Clone)] @@ -17,26 +18,19 @@ impl MetricsStore { Self { tx, rx } } - pub(crate) fn insert( - &self, - key: TaskKey, - metrics: Vec, - ) { + pub(crate) fn insert(&self, key: TaskKey, metrics: pb::TaskMetrics) { self.tx.send_modify(|map| { map.insert(key, metrics); }); } - pub(crate) fn get( - &self, - key: &TaskKey, - ) -> Option> { + pub(crate) fn get(&self, key: &TaskKey) -> Option { self.rx.borrow().get(key).cloned() } #[cfg(test)] pub(crate) fn from_entries( - entries: impl IntoIterator)>, + entries: impl IntoIterator, ) -> Self { let map: HashMap<_, _> = entries.into_iter().collect(); let (tx, rx) = watch::channel(map); diff --git a/src/coordinator/task_spawner.rs b/src/coordinator/task_spawner.rs index c959a75b..f7a2a8a6 100644 --- a/src/coordinator/task_spawner.rs +++ b/src/coordinator/task_spawner.rs @@ -1,4 +1,4 @@ -use crate::common::{TreeNodeExt, serialize_uuid, task_ctx_with_extension}; +use crate::common::{TreeNodeExt, now_ns, serialize_uuid, task_ctx_with_extension}; use crate::config_extension_ext::get_config_extension_propagation_headers; use crate::coordinator::MetricsStore; use crate::passthrough_headers::get_passthrough_headers; @@ -43,6 +43,7 @@ use uuid::Uuid; pub(super) struct CoordinatorToWorkerMetrics { pub(super) plan_bytes_sent: Count, pub(super) plan_send_latency: Arc, + pub(super) instantiation_time: u64, } impl CoordinatorToWorkerMetrics { @@ -58,6 +59,7 @@ impl CoordinatorToWorkerMetrics { |b| b.with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")), metrics, )), + instantiation_time: now_ns(), } } } @@ -149,6 +151,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { task_key: Some(task_key.clone()), work_unit_feed_declarations, target_worker_url: url.to_string(), + query_start_time_ns: self.metrics.instantiation_time, })), }; let plan_size = self.plan_proto.len(); @@ -221,7 +224,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> { match inner { pb::worker_to_coordinator_msg::Inner::TaskMetrics(pre_order_metrics) 
=> { if let Some(task_metrics) = &task_metrics { - task_metrics.insert(task_key.clone(), pre_order_metrics.metrics); + task_metrics.insert(task_key.clone(), pre_order_metrics); } } } diff --git a/src/metrics/task_metrics_collector.rs b/src/metrics/task_metrics_collector.rs index 1cf6d90a..78ce4897 100644 --- a/src/metrics/task_metrics_collector.rs +++ b/src/metrics/task_metrics_collector.rs @@ -176,7 +176,7 @@ mod tests { // Ensure that there's metrics for each node for each task for each stage. for expected_task_key in expected_task_keys { let actual_metrics = dist_exec - .task_metrics + .metrics_store .as_ref() .unwrap() .get(&expected_task_key) @@ -189,7 +189,7 @@ mod tests { let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap(); let stage_plan = stage.local_plan().unwrap(); assert_eq!( - actual_metrics.len(), + actual_metrics.pre_order_plan_metrics.len(), count_plan_nodes_up_to_network_boundary(stage_plan), "Mismatch between collected metrics and actual nodes for {expected_task_key:?}" ); @@ -295,7 +295,7 @@ mod tests { for expected_task_key in &expected_task_keys { let actual_metrics = dist_exec - .task_metrics + .metrics_store .as_ref() .unwrap() .get(expected_task_key) @@ -309,7 +309,7 @@ mod tests { let stage = stages.get(&(expected_task_key.stage_id as usize)).unwrap(); let stage_plan = stage.local_plan().unwrap(); assert_eq!( - actual_metrics.len(), + actual_metrics.pre_order_plan_metrics.len(), count_plan_nodes_up_to_network_boundary(stage_plan), "Mismatch between collected metrics and actual nodes for {expected_task_key:?}" ); @@ -350,7 +350,7 @@ mod tests { // Verify all nodes (including PartitionIsolatorExec) are preserved in metrics collection for expected_task_key in expected_task_keys { let actual_metrics = dist_exec - .task_metrics + .metrics_store .as_ref() .unwrap() .get(&expected_task_key) @@ -361,7 +361,7 @@ mod tests { // Verify metrics count matches - this ensures all nodes are included in metrics collection // regardless 
of whether they have metrics or not (some nodes may have empty metrics sets) assert_eq!( - actual_metrics.len(), + actual_metrics.pre_order_plan_metrics.len(), count_plan_nodes_up_to_network_boundary(stage_plan), "Metrics count must match plan nodes for stage {expected_task_key:?}" ); diff --git a/src/metrics/task_metrics_rewriter.rs b/src/metrics/task_metrics_rewriter.rs index 4b442f0a..4107c31e 100644 --- a/src/metrics/task_metrics_rewriter.rs +++ b/src/metrics/task_metrics_rewriter.rs @@ -55,7 +55,7 @@ pub async fn rewrite_distributed_plan_with_metrics( distributed_exec.wait_for_metrics().await; - let Some(metrics_collection) = distributed_exec.task_metrics.clone() else { + let Some(metrics_collection) = distributed_exec.metrics_store.clone() else { return Ok(plan); }; @@ -227,13 +227,13 @@ pub fn stage_metrics_rewriter( }; match metrics_collection.get(&task_key) { Some(task_metrics) => { - if node_idx >= task_metrics.len() { + if node_idx >= task_metrics.pre_order_plan_metrics.len() { return internal_err!( "not enough metrics provided to rewrite task: {} metrics provided", - task_metrics.len() + task_metrics.pre_order_plan_metrics.len() ); } - let node_metrics_protos = task_metrics[node_idx].clone(); + let node_metrics_protos = task_metrics.pre_order_plan_metrics[node_idx].clone(); let mut node_metrics = metrics_set_proto_to_df(&node_metrics_protos)?; let rewrite_ctx = format.to_rewrite_ctx(task_id as u64); @@ -457,7 +457,11 @@ mod tests { ) }) .collect::>(); - (task_key, metrics) + let task_metrics = pb::TaskMetrics { + task_metrics: None, + pre_order_plan_metrics: metrics, + }; + (task_key, task_metrics) })); let metrics_collection = Arc::new(metrics_collection); @@ -489,7 +493,8 @@ mod tests { stage_id: stage.num as u64, task_number: task_id as u64, }) - .unwrap()[node_id] + .unwrap() + .pre_order_plan_metrics[node_id] .clone(); let mut actual_metrics_set = MetricsSet::new(); diff --git a/src/stage.rs b/src/stage.rs index 5c584d7d..1d329a83 100644 --- 
a/src/stage.rs +++ b/src/stage.rs @@ -1,4 +1,4 @@ -use crate::coordinator::DistributedExec; +use crate::coordinator::{DistributedExec, MetricsStore}; use crate::execution_plans::NetworkCoalesceExec; use crate::metrics::DISTRIBUTED_DATAFUSION_TASK_ID_LABEL; use crate::{NetworkShuffleExec, PartitionIsolatorExec}; @@ -157,6 +157,9 @@ impl DistributedTaskContext { } } +use crate::common::serialize_uuid; +use crate::metrics::proto::metric_proto_to_df; +use crate::worker::generated::worker as pb; use crate::{DistributedMetricsFormat, rewrite_distributed_plan_with_metrics}; use crate::{NetworkBoundary, NetworkBoundaryExt}; use datafusion::common::DataFusionError; @@ -199,7 +202,7 @@ const HORIZONTAL: &str = "─"; // Horizontal line pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> String { if let Some(plan) = plan.as_any().downcast_ref::() { let mut f = String::new(); - display_ascii(Either::Left(plan), 0, show_metrics, &mut f).unwrap(); + display_ascii(plan, Either::Left(plan), 0, show_metrics, &mut f).unwrap(); f } else { match show_metrics { @@ -212,6 +215,7 @@ pub fn display_plan_ascii(plan: &dyn ExecutionPlan, show_metrics: bool) -> Strin } fn display_ascii( + root: &DistributedExec, stage: Either<&DistributedExec, &Stage>, depth: usize, show_metrics: bool, @@ -228,23 +232,24 @@ fn display_ascii( }; match stage { Either::Left(dist_exec) => { - writeln!( + write!( f, - "{}{}{} DistributedExec {} {}{}", + "{}{}{} DistributedExec {} {}", " ".repeat(depth), LTCORNER, HORIZONTAL.repeat(5), HORIZONTAL.repeat(2), format_tasks_for_stage(1, plan), - if show_metrics { - format_metrics_by_task(&dist_exec.metrics().unwrap_or_default()) - } else { - "".into() - } )?; + if show_metrics && let Some(metrics) = dist_exec.metrics() { + write!(f, " ")?; + writeln!(f, "{}", format_metrics_by_task(&metrics))?; + } else { + writeln!(f)?; + } } Either::Right(stage) => { - writeln!( + write!( f, "{}{}{} Stage {} {} {}", " ".repeat(depth), @@ -254,6 +259,13 @@ fn 
display_ascii( HORIZONTAL.repeat(2), format_tasks_for_stage(stage.task_count(), plan) )?; + if show_metrics && let Some(metrics_store) = &root.metrics_store { + let metrics = gather_stage_header_metrics(stage, metrics_store); + write!(f, " ")?; + writeln!(f, "{}", format_metrics_by_task(&metrics))?; + } else { + writeln!(f)?; + } } } @@ -273,7 +285,7 @@ fn display_ascii( HORIZONTAL.repeat(50) )?; for input_stage in find_input_stages(plan.as_ref()) { - display_ascii(Either::Right(input_stage), depth + 1, show_metrics, f)?; + display_ascii(root, Either::Right(input_stage), depth + 1, show_metrics, f)?; } Ok(()) } @@ -317,6 +329,33 @@ fn display_inner_ascii( Ok(()) } +/// Gathers the metrics global to a stage. These metrics are not specific to any plan node, and +/// are instead global to a whole stage. +fn gather_stage_header_metrics(stage: &Stage, metrics_store: &MetricsStore) -> MetricsSet { + let mut task_key = pb::TaskKey { + query_id: serialize_uuid(&stage.query_id()), + stage_id: stage.num() as u64, + task_number: 0, + }; + let mut all_metrics = MetricsSet::new(); + while let Some(task_metrics) = metrics_store.get(&task_key) { + // A task that reported no task-level metrics contributes nothing, but the loop must + // still advance to the next task number; skipping the increment would spin forever. + let metrics_set = task_metrics.task_metrics.unwrap_or_default(); + for mut metric in metrics_set.metrics { + metric.labels.push(pb::Label { + name: DISTRIBUTED_DATAFUSION_TASK_ID_LABEL.to_string(), + value: task_key.task_number.to_string(), + }); + if let Ok(metric) = metric_proto_to_df(metric) { + all_metrics.push(metric) + }; + } + task_key.task_number += 1; + } + all_metrics +} + /// Aggregates metrics by (name, task_id), preserving the [DISTRIBUTED_DATAFUSION_TASK_ID_LABEL] /// only. Metrics without a task_id label (ie. non distributed metrics) are aggregated together. 
/// diff --git a/src/worker/generated/worker.rs b/src/worker/generated/worker.rs index 57aa698a..728fa2d2 100644 --- a/src/worker/generated/worker.rs +++ b/src/worker/generated/worker.rs @@ -33,16 +33,21 @@ pub mod worker_to_coordinator_msg { /// ensuring metrics are never lost due to early stream termination. /// metrics\[i\] is the set of metrics for plan node i in pre-order traversal order. #[prost(message, tag = "1")] - TaskMetrics(super::PreOrderTaskMetrics), + TaskMetrics(super::TaskMetrics), } } -/// Metrics for a single task's plan nodes in pre-order traversal order. -/// The TaskKey is implicit — it is determined by the SetPlanRequest that -/// opened this coordinator channel connection. #[derive(Clone, PartialEq, ::prost::Message)] -pub struct PreOrderTaskMetrics { +pub struct TaskMetrics { + /// Metrics for a single task's plan nodes in pre-order traversal order. + /// The TaskKey is implicit — it is determined by the SetPlanRequest that + /// opened this coordinator channel connection. #[prost(message, repeated, tag = "1")] - pub metrics: ::prost::alloc::vec::Vec, + pub pre_order_plan_metrics: ::prost::alloc::vec::Vec, + /// Metrics related to the execution of a task within a stage. This metrics, instead of being + /// associated to a specific node, they are global to the task, like the time at which the plan + /// was fed by the coordinator to the worker. + #[prost(message, optional, tag = "2")] + pub task_metrics: ::core::option::Option, } #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct GetWorkerInfoRequest {} @@ -74,6 +79,10 @@ pub struct SetPlanRequest { /// itself, and avoid further gRPC calls in case it needs to call itself for executing remote tasks. #[prost(string, tag = "5")] pub target_worker_url: ::prost::alloc::string::String, + /// Unix nanos when the query started as reported by the coordinator. Used for collecting temporal metrics + /// relative to when the query was fired in the coordinator. 
+ #[prost(uint64, tag = "6")] + pub query_start_time_ns: u64, } /// Nested message and enum types in `SetPlanRequest`. pub mod set_plan_request { diff --git a/src/worker/impl_coordinator_channel.rs b/src/worker/impl_coordinator_channel.rs index dfc5da33..b246085d 100644 --- a/src/worker/impl_coordinator_channel.rs +++ b/src/worker/impl_coordinator_channel.rs @@ -7,6 +7,7 @@ use crate::worker::generated::worker::worker_service_server::WorkerService; use crate::worker::generated::worker::{ CoordinatorToWorkerMsg, WorkerToCoordinatorMsg, worker_to_coordinator_msg, }; +use crate::worker::task_data::TaskDataMetrics; use crate::{ DistributedCodec, DistributedConfig, DistributedExt, DistributedTaskContext, TaskData, Worker, WorkerQueryContext, @@ -108,6 +109,7 @@ impl Worker { true => Arc::new(std::sync::Mutex::new(Some(metrics_tx))), false => Arc::new(std::sync::Mutex::new(None)), }, + task_data_metrics: Arc::new(TaskDataMetrics::new(request.query_start_time_ns)), }) }; diff --git a/src/worker/impl_execute_task.rs b/src/worker/impl_execute_task.rs index 13e3a4a8..802f95af 100644 --- a/src/worker/impl_execute_task.rs +++ b/src/worker/impl_execute_task.rs @@ -1,8 +1,8 @@ use crate::DistributedConfig; -use crate::common::{map_last_stream, on_drop_stream}; +use crate::common::{map_last_stream, now_ns, on_drop_stream}; use crate::metrics::proto::df_metrics_set_to_proto; use crate::protobuf::datafusion_error_to_tonic_status; -use crate::worker::generated::worker::{FlightAppMetadata, PreOrderTaskMetrics}; +use crate::worker::generated::worker::{FlightAppMetadata, TaskMetrics}; use crate::worker::worker_service::{TaskDataEntries, Worker}; use arrow_flight::encode::{DictionaryHandling, FlightDataEncoder, FlightDataEncoderBuilder}; use arrow_flight::error::FlightError; @@ -14,6 +14,7 @@ use datafusion::common::{Result, exec_err, internal_err}; use crate::worker::generated::worker::ExecuteTaskRequest; use crate::worker::generated::worker::worker_service_server::WorkerService; use 
crate::worker::spawn_select_all::spawn_select_all; +use crate::worker::task_data::TaskDataMetrics; use datafusion::arrow::ipc::CompressionType; use datafusion::arrow::ipc::writer::IpcWriteOptions; use datafusion::common::exec_datafusion_err; @@ -26,7 +27,7 @@ use prost::Message; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::{AtomicBool, Ordering}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use tokio::sync::oneshot::Sender; use tokio_stream::StreamExt; use tonic::{Request, Response, Status}; @@ -58,6 +59,7 @@ pub(crate) async fn execute_local_task( .await .map_err(|e| exec_datafusion_err!("Worker::execute_task timed-out while waiting for the plan to be set by the coordinator. ({e})"))? .map_err(DataFusionError::Shared)?; + task_data.task_data_metrics.mark_execution_started_once(); let plan = task_data.plan; let task_ctx = task_data.task_ctx; @@ -87,9 +89,10 @@ pub(crate) async fn execute_local_task( let key_clone = key.clone(); let plan = Arc::clone(&plan); - let plan_for_drop = Arc::clone(&plan); + let plan_cloned = Arc::clone(&plan); let fully_finished = Arc::new(AtomicBool::new(false)); let fully_finished_cloned = Arc::clone(&fully_finished); + let task_data_metrics = Arc::clone(&task_data.task_data_metrics); let stream = map_last_stream(stream, move |msg, last_msg_in_stream| { if !last_msg_in_stream { return msg; @@ -106,10 +109,11 @@ pub(crate) async fn execute_local_task( tokio::spawn(async move { entries.invalidate(&k).await; }); + task_data_metrics.mark_execution_finished(); if send_metrics { // Last message of the last partition. This is the moment to send // the metrics back. 
- send_metrics_via_channel(&metrics_tx, &plan); + send_metrics_via_channel(&metrics_tx, &plan, &task_data_metrics); } } fully_finished.store(true, Ordering::SeqCst); @@ -117,16 +121,17 @@ pub(crate) async fn execute_local_task( }); let num_partitions_remaining = Arc::clone(&task_data.num_partitions_remaining); - let task_data_entries_for_drop = Arc::clone(task_data_entries); + let task_data_entries = Arc::clone(task_data_entries); let metrics_tx = Arc::clone(&task_data.metrics_tx); - let key_for_drop = key.clone(); + let key_clone = key.clone(); + let task_data_metrics = Arc::clone(&task_data.task_data_metrics); let stream = on_drop_stream(stream, move || { if !fully_finished_cloned.load(Ordering::SeqCst) { // Stream was dropped before fully consumed -- see https://github.com/datafusion-contrib/datafusion-distributed/issues/412 // Send metrics via the coordinator channel so they are not lost. if num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1 { - let entries = Arc::clone(&task_data_entries_for_drop); - let k = key_for_drop.clone(); + let entries = Arc::clone(&task_data_entries); + let k = key_clone.clone(); // Fire-and-forget background tokio task to handle async // invalidate() within synchronous on_drop_stream. #[allow(clippy::disallowed_methods)] @@ -134,7 +139,7 @@ pub(crate) async fn execute_local_task( entries.invalidate(&k).await; }); if send_metrics { - send_metrics_via_channel(&metrics_tx, &plan_for_drop); + send_metrics_via_channel(&metrics_tx, &plan_cloned, &task_data_metrics); } } } @@ -180,10 +185,7 @@ pub(crate) async fn execute_remote_task( // the original per-partition streams in later steps. 
let flight_data = FlightAppMetadata { partition, - created_timestamp_unix_nanos: SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|duration| duration.as_nanos() as u64) - .unwrap_or(0), + created_timestamp_unix_nanos: now_ns(), }; msg.map(|v| v.with_app_metadata(flight_data.encode_to_vec())) }); @@ -243,12 +245,13 @@ fn build_flight_data_stream( /// Collects metrics from the plan in pre-order traversal order and sends them via the /// coordinator channel oneshot. fn send_metrics_via_channel( - metrics_tx: &Arc>>>, + metrics_tx: &Arc>>>, plan: &Arc, + task_data_metrics: &Arc, ) { - let mut metrics = vec![]; + let mut pre_order_plan_metrics = vec![]; let _ = plan.apply(|node| { - metrics.push( + pre_order_plan_metrics.push( node.metrics() .and_then(|m| df_metrics_set_to_proto(&m).ok()) .unwrap_or_default(), @@ -265,7 +268,10 @@ fn send_metrics_via_channel( }; let Some(tx) = tx else { return }; // Ignore send errors — the coordinator channel may have been dropped (e.g. query cancelled). - let _ = tx.send(PreOrderTaskMetrics { metrics }); + let _ = tx.send(TaskMetrics { + pre_order_plan_metrics, + task_metrics: Some(task_data_metrics.to_proto_metrics_set()), + }); } /// Garbage collects values sub-arrays. diff --git a/src/worker/task_data.rs b/src/worker/task_data.rs index 1454ecd9..d3e17f02 100644 --- a/src/worker/task_data.rs +++ b/src/worker/task_data.rs @@ -1,8 +1,12 @@ -use crate::worker::generated::worker::PreOrderTaskMetrics; +use crate::MaxLatencyMetric; +use crate::common::now_ns; +use crate::worker::generated::worker as pb; use datafusion::execution::TaskContext; +use datafusion::physical_expr_common::metrics::CustomMetricValue; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; use std::sync::atomic::AtomicUsize; +use std::time::Duration; use tokio::sync::oneshot; #[derive(Clone, Debug)] @@ -22,7 +26,80 @@ pub struct TaskData { /// Sender half of the metrics channel. 
`impl_execute_task` takes this (via `Option::take`) /// once all partitions have finished or been dropped, sending the collected metrics back to /// the coordinator through the `CoordinatorChannel` side channel. - pub(super) metrics_tx: Arc>>>, + pub(super) metrics_tx: Arc>>>, + /// Metrics related to the execution of a task within a stage. This metrics, instead of being + /// associated to a specific node, they are global to the task, like the time at which the plan + /// was fed by the coordinator to the worker. + pub(super) task_data_metrics: Arc, +} + +pub(crate) const PLAN_ADDED_AT_METRIC: &str = "plan_added_at"; +pub(crate) const PLAN_EXECUTED_AT_METRIC: &str = "plan_executed_at"; +pub(crate) const PLAN_FINISHED_AT_METRIC: &str = "plan_finished_at"; + +#[derive(Debug)] +pub(super) struct TaskDataMetrics { + pub(super) query_start_time_ns: u64, + /// When the plan was set by the coordinator. + pub(super) plan_added_at: MaxLatencyMetric, + /// When the plan execution was triggered by the parent worker. + pub(super) plan_executed_at: MaxLatencyMetric, + /// When the execution stream finished. 
+ pub(super) plan_finished_at: MaxLatencyMetric, +} + +impl TaskDataMetrics { + pub(super) fn new(query_start_time_ns: u64) -> Self { + let plan_added_at = MaxLatencyMetric::default(); + plan_added_at.add_duration(Duration::from_nanos(now_ns().saturating_sub(query_start_time_ns))); + Self { + query_start_time_ns, + plan_added_at, + plan_finished_at: MaxLatencyMetric::default(), + plan_executed_at: MaxLatencyMetric::default(), + } + } + + pub(super) fn mark_execution_started_once(&self) { + if self.plan_executed_at.value() == 0 { + let elapsed_ns = now_ns().saturating_sub(self.query_start_time_ns); // tolerate clock skew + self.plan_executed_at.add_duration(Duration::from_nanos(elapsed_ns)) + } + } + + pub(super) fn mark_execution_finished(&self) { + let elapsed_ns = now_ns().saturating_sub(self.query_start_time_ns); // tolerate clock skew + self.plan_finished_at.add_duration(Duration::from_nanos(elapsed_ns)) + } + + pub(super) fn to_proto_metrics_set(&self) -> pb::MetricsSet { + let mut task_metrics_set = pb::MetricsSet { metrics: vec![] }; + + fn new_metric(name: &str, value: usize) -> pb::Metric { + pb::Metric { + partition: None, + labels: vec![], + value: Some(pb::metric::Value::CustomMaxLatency(pb::MaxLatency { + name: name.to_string(), + value: value as u64, + })), + } + } + task_metrics_set.metrics.push(new_metric( + PLAN_ADDED_AT_METRIC, + self.plan_added_at.as_usize(), + )); + task_metrics_set.metrics.push(new_metric( + PLAN_EXECUTED_AT_METRIC, + self.plan_executed_at.as_usize(), + )); + task_metrics_set.metrics.push(new_metric( + PLAN_FINISHED_AT_METRIC, + self.plan_finished_at.as_usize(), + )); + + task_metrics_set + } } impl TaskData { diff --git a/src/worker/test_utils/worker_handles.rs b/src/worker/test_utils/worker_handles.rs index 83a996c0..a214070d 100644 --- a/src/worker/test_utils/worker_handles.rs +++ b/src/worker/test_utils/worker_handles.rs @@ -1,5 +1,6 @@ use crate::config_extension_ext::set_distributed_option_extension; use crate::worker::generated::worker::TaskKey; +use crate::worker::task_data::TaskDataMetrics; use crate::{BoxCloneSyncChannel, DistributedConfig, DistributedExt, 
TaskData, Worker}; use arrow_ipc::CompressionType; use datafusion::arrow::datatypes::SchemaRef; @@ -217,6 +218,7 @@ pub async fn register_plan_on_worker( plan, num_partitions_remaining: Arc::new(AtomicUsize::new(partition_count)), metrics_tx: Arc::new(std::sync::Mutex::new(Some(metrics_tx))), + task_data_metrics: Arc::new(TaskDataMetrics::new(0)), })) .expect("failed to write to task data"); } diff --git a/src/worker/worker.proto b/src/worker/worker.proto index 6372f31b..81217578 100644 --- a/src/worker/worker.proto +++ b/src/worker/worker.proto @@ -30,15 +30,19 @@ message WorkerToCoordinatorMsg { // This is sent after all partitions of a task have finished (or been dropped), // ensuring metrics are never lost due to early stream termination. // metrics[i] is the set of metrics for plan node i in pre-order traversal order. - PreOrderTaskMetrics task_metrics = 1; + TaskMetrics task_metrics = 1; } } -// Metrics for a single task's plan nodes in pre-order traversal order. -// The TaskKey is implicit — it is determined by the SetPlanRequest that -// opened this coordinator channel connection. -message PreOrderTaskMetrics { - repeated MetricsSet metrics = 1; +message TaskMetrics { + // Metrics for a single task's plan nodes in pre-order traversal order. + // The TaskKey is implicit — it is determined by the SetPlanRequest that + // opened this coordinator channel connection. + repeated MetricsSet pre_order_plan_metrics = 1; + // Metrics related to the execution of a task within a stage. This metrics, instead of being + // associated to a specific node, they are global to the task, like the time at which the plan + // was fed by the coordinator to the worker. + MetricsSet task_metrics = 2; } message GetWorkerInfoRequest {} @@ -70,6 +74,9 @@ message SetPlanRequest { // The worker URL to which this message will go. The receiving worker will use this information to identify // itself, and avoid further gRPC calls in case it needs to call itself for executing remote tasks. 
string target_worker_url = 5; + // Unix nanos when the query started as reported by the coordinator. Used for collecting temporal metrics + // relative to when the query was fired in the coordinator. + uint64 query_start_time_ns = 6; } message WorkUnit {