diff --git a/src/distributed_planner/exchange_assignment.rs b/src/distributed_planner/exchange_assignment.rs new file mode 100644 index 00000000..82425d54 --- /dev/null +++ b/src/distributed_planner/exchange_assignment.rs @@ -0,0 +1,387 @@ +//! Assignment rules for one network exchange boundary. +//! +//! Boundary planning decides where the boundary belongs and how many tasks run on each side. +//! Assignment then decides which producer task/partition each consumer-local output slot should +//! read. This module only owns the read-assignment decision. Static planning can build it in +//! `prepare_network_boundaries`, while adaptive planning can build the same assignment later after +//! it has selected the consumer task count. + +use datafusion::common::{Result, plan_err}; +use std::ops::Range; +use std::sync::Arc; + +/// Upstream read target for one consumer-local output partition. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum SlotReadPlan { + /// Read the same partition from each producer task. + Fanout { + producer_tasks: Range, + producer_partition: usize, + }, + /// Read one partition from one producer task. + Single { + producer_task: usize, + producer_partition: usize, + }, +} + +/// Every consumer reads its assigned logical partition range from every producer task. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ShuffleExchangeAssignment { + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_consumer: usize, +} + +impl ShuffleExchangeAssignment { + fn resolve_slot( + &self, + consumer_task_idx: usize, + local_partition_idx: usize, + ) -> Option { + if consumer_task_idx >= self.consumer_task_count + || local_partition_idx >= self.partitions_per_consumer + { + return None; + } + + Some(SlotReadPlan::Fanout { + producer_tasks: 0..self.producer_task_count, + producer_partition: consumer_task_idx * self.partitions_per_consumer + + local_partition_idx, + }) + } +} + +/// Consumers divide producer tasks into contiguous groups and pad uneven groups with empty slots. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct CoalesceExchangeAssignment { + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_producer_task: usize, + producer_task_ranges: Vec>, +} + +impl CoalesceExchangeAssignment { + /// Maximum number of producer tasks assigned to any one consumer. + fn max_input_task_count_per_consumer(&self) -> usize { + self.producer_task_ranges + .iter() + .map(|range| range.len()) + .max() + .unwrap_or(0) + } + + /// Output partitions needed by every consumer, including padded empty slots. + fn max_partition_count_per_consumer(&self) -> usize { + self.max_input_task_count_per_consumer() * self.partitions_per_producer_task + } + + fn resolve_slot( + &self, + consumer_task_idx: usize, + local_partition_idx: usize, + ) -> Option { + let producer_task_range = self.producer_task_ranges.get(consumer_task_idx)?; + let producer_task_offset = local_partition_idx / self.partitions_per_producer_task; + let producer_partition = local_partition_idx % self.partitions_per_producer_task; + let producer_task = producer_task_range.clone().nth(producer_task_offset)?; + + Some(SlotReadPlan::Single { + producer_task, + producer_partition, + }) + } +} + +/// Broadcast uses the same fanout shape as shuffle, but over broadcast-expanded partitions. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct BroadcastExchangeAssignment { + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_consumer: usize, +} + +impl BroadcastExchangeAssignment { + fn resolve_slot( + &self, + consumer_task_idx: usize, + local_partition_idx: usize, + ) -> Option { + if consumer_task_idx >= self.consumer_task_count + || local_partition_idx >= self.partitions_per_consumer + { + return None; + } + + Some(SlotReadPlan::Fanout { + producer_tasks: 0..self.producer_task_count, + producer_partition: consumer_task_idx * self.partitions_per_consumer + + local_partition_idx, + }) + } +} + +/// Concrete read assignment for one prepared network boundary. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum ExchangeAssignment { + Shuffle(ShuffleExchangeAssignment), + Coalesce(CoalesceExchangeAssignment), + Broadcast(BroadcastExchangeAssignment), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum ExchangeAssignmentKind { + Shuffle, + Coalesce, + Broadcast, +} + +impl ExchangeAssignment { + pub(crate) fn try_shuffle( + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_consumer: usize, + ) -> Result> { + if producer_task_count == 0 { + return plan_err!("shuffle exchange requires producer_task_count > 0"); + } + if consumer_task_count == 0 { + return plan_err!("shuffle exchange requires consumer_task_count > 0"); + } + if partitions_per_consumer == 0 { + return plan_err!("shuffle exchange requires partitions_per_consumer > 0"); + } + + Ok(Arc::new(Self::Shuffle(ShuffleExchangeAssignment { + producer_task_count, + consumer_task_count, + partitions_per_consumer, + }))) + } + + pub(crate) fn try_coalesce( + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_producer_task: usize, + ) -> Result> { + if consumer_task_count == 0 { + return plan_err!("coalesce exchange requires consumer_task_count > 0"); + } + if partitions_per_producer_task == 0 { + return plan_err!("coalesce exchange requires partitions_per_producer_task > 0"); + } + + Ok(Arc::new(Self::Coalesce(CoalesceExchangeAssignment { + producer_task_count, + consumer_task_count, + partitions_per_producer_task, + producer_task_ranges: split_ranges(producer_task_count, consumer_task_count), + }))) + } + + pub(crate) fn try_broadcast( + producer_task_count: usize, + consumer_task_count: usize, + partitions_per_consumer: usize, + ) -> Result> { + if producer_task_count == 0 { + return plan_err!("broadcast exchange requires producer_task_count > 0"); + } + if consumer_task_count == 0 { + return plan_err!("broadcast exchange requires consumer_task_count > 0"); + } + if partitions_per_consumer == 0 { + return plan_err!("broadcast exchange requires partitions_per_consumer > 0"); + } + + Ok(Arc::new(Self::Broadcast(BroadcastExchangeAssignment { + producer_task_count, + consumer_task_count, + partitions_per_consumer, + }))) + } + + pub(crate) fn producer_task_count(&self) -> usize { + match self { + Self::Shuffle(assignment) => assignment.producer_task_count, + Self::Coalesce(assignment) => assignment.producer_task_count, + Self::Broadcast(assignment) => assignment.producer_task_count, + } + } + + pub(crate) fn kind(&self) -> ExchangeAssignmentKind { + match self { + Self::Shuffle(_) => ExchangeAssignmentKind::Shuffle, + Self::Coalesce(_) => ExchangeAssignmentKind::Coalesce, + Self::Broadcast(_) => ExchangeAssignmentKind::Broadcast, + } + } + + pub(crate) fn consumer_task_count(&self) -> usize { + match self { + Self::Shuffle(assignment) => assignment.consumer_task_count, + Self::Coalesce(assignment) => assignment.consumer_task_count, + Self::Broadcast(assignment) => assignment.consumer_task_count, + } + } + + pub(crate) fn max_partition_count_per_consumer(&self) -> usize { + match self { + Self::Shuffle(assignment) => assignment.partitions_per_consumer, + Self::Coalesce(assignment) => assignment.max_partition_count_per_consumer(), + Self::Broadcast(assignment) => assignment.partitions_per_consumer, + } + } + + pub(crate) fn partitions_per_producer_task(&self) -> usize { + match self { + Self::Shuffle(assignment) => { + assignment.partitions_per_consumer * assignment.consumer_task_count + } + Self::Coalesce(assignment) => assignment.partitions_per_producer_task, + Self::Broadcast(assignment) => { + assignment.partitions_per_consumer * assignment.consumer_task_count + } + } + } + + /// Returns the per-kind partition count needed to reconstruct this assignment. + pub(crate) fn assignment_partition_count(&self) -> usize { + match self { + Self::Shuffle(assignment) => assignment.partitions_per_consumer, + Self::Coalesce(assignment) => assignment.partitions_per_producer_task, + Self::Broadcast(assignment) => assignment.partitions_per_consumer, + } + } + + /// Returns the advertised output partition IDs owned by one consumer task. + pub(crate) fn partition_range_for_consumer( + &self, + consumer_task_idx: usize, + ) -> Option> { + if consumer_task_idx >= self.consumer_task_count() { + return None; + } + + match self { + Self::Shuffle(assignment) => { + let start = consumer_task_idx * assignment.partitions_per_consumer; + Some(start..start + assignment.partitions_per_consumer) + } + Self::Coalesce(_) => Some(0..self.partitions_per_producer_task()), + Self::Broadcast(assignment) => { + let start = consumer_task_idx * assignment.partitions_per_consumer; + Some(start..start + assignment.partitions_per_consumer) + } + } + } + + /// Maps a consumer-local output partition to the upstream data it must read. + pub(crate) fn resolve_slot( + &self, + consumer_task_idx: usize, + local_partition_idx: usize, + ) -> Option { + match self { + Self::Shuffle(assignment) => { + assignment.resolve_slot(consumer_task_idx, local_partition_idx) + } + Self::Coalesce(assignment) => { + assignment.resolve_slot(consumer_task_idx, local_partition_idx) + } + Self::Broadcast(assignment) => { + assignment.resolve_slot(consumer_task_idx, local_partition_idx) + } + } + } +} + +/// Splits producer task ids into contiguous consumer groups as evenly as possible. +fn split_ranges(total: usize, groups: usize) -> Vec> { + if groups == 0 { + return Vec::new(); + } + + let base = total / groups; + let extra = total % groups; + let mut ranges = Vec::with_capacity(groups); + let mut start = 0; + + for idx in 0..groups { + let len = base + usize::from(idx < extra); + ranges.push(start..start + len); + start += len; + } + + ranges +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shuffle_assignment_preserves_scaled_fanout() { + let assignment = ExchangeAssignment::try_shuffle(3, 2, 4).unwrap(); + + assert_eq!(assignment.producer_task_count(), 3); + assert_eq!(assignment.consumer_task_count(), 2); + assert_eq!(assignment.max_partition_count_per_consumer(), 4); + assert_eq!(assignment.partitions_per_producer_task(), 8); + assert_eq!(assignment.partition_range_for_consumer(1), Some(4..8)); + assert_eq!( + assignment.resolve_slot(1, 2), + Some(SlotReadPlan::Fanout { + producer_tasks: 0..3, + producer_partition: 6, + }) + ); + assert_eq!(assignment.resolve_slot(1, 4), None); + } + + #[test] + fn coalesce_assignment_assigns_contiguous_task_groups() { + let assignment = ExchangeAssignment::try_coalesce(3, 2, 4).unwrap(); + + assert_eq!(assignment.producer_task_count(), 3); + assert_eq!(assignment.consumer_task_count(), 2); + assert_eq!(assignment.max_partition_count_per_consumer(), 8); + assert_eq!(assignment.partitions_per_producer_task(), 4); + assert_eq!(assignment.partition_range_for_consumer(0), Some(0..4)); + assert_eq!( + assignment.resolve_slot(0, 4), + Some(SlotReadPlan::Single { + producer_task: 1, + producer_partition: 0, + }) + ); + assert_eq!( + assignment.resolve_slot(1, 3), + Some(SlotReadPlan::Single { + producer_task: 2, + producer_partition: 3, + }) + ); + assert_eq!(assignment.resolve_slot(1, 4), None); + } + + #[test] + fn broadcast_assignment_preserves_scaled_fanout() { + let assignment = ExchangeAssignment::try_broadcast(2, 3, 4).unwrap(); + + assert_eq!(assignment.producer_task_count(), 2); + assert_eq!(assignment.consumer_task_count(), 3); + assert_eq!(assignment.max_partition_count_per_consumer(), 4); + assert_eq!(assignment.partitions_per_producer_task(), 12); + assert_eq!(assignment.partition_range_for_consumer(2), Some(8..12)); + assert_eq!( + assignment.resolve_slot(2, 1), + Some(SlotReadPlan::Fanout { + producer_tasks: 0..2, + producer_partition: 9, + }) + ); + assert_eq!(assignment.resolve_slot(3, 0), None); + } +} diff --git a/src/distributed_planner/mod.rs b/src/distributed_planner/mod.rs index 174b9e8a..45c0401e 100644 --- a/src/distributed_planner/mod.rs +++ b/src/distributed_planner/mod.rs @@ -1,5 +1,6 @@ mod distributed_config; mod distributed_query_planner; +mod exchange_assignment; mod inject_network_boundaries; mod insert_broadcast; mod network_boundary; @@ -9,6 +10,7 @@ mod session_state_builder_ext; mod task_estimator; pub use distributed_config::DistributedConfig; +pub(crate) use exchange_assignment::{ExchangeAssignment, ExchangeAssignmentKind, SlotReadPlan}; pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt}; pub use session_state_builder_ext::SessionStateBuilderExt; pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext}; diff --git a/src/distributed_planner/network_boundary.rs b/src/distributed_planner/network_boundary.rs index fec385fc..7cdef3c3 100644 --- a/src/distributed_planner/network_boundary.rs +++ b/src/distributed_planner/network_boundary.rs @@ -1,5 +1,7 @@ -use crate::{NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage}; -use datafusion::common::Result; +use crate::{ + ExchangeAssignment, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, Stage, +}; +use datafusion::common::{Result, internal_err}; use datafusion::physical_plan::ExecutionPlan; use std::sync::Arc; @@ -76,3 +78,50 @@ pub(crate) fn network_boundary_scale_input( Ok(input) } + +/// Builds the routing assignment after input scaling has finalized producer partition counts. +pub(crate) fn network_boundary_exchange_assignment( + nb: &dyn NetworkBoundary, + producer_task_count: usize, + consumer_task_count: usize, + consumer_partition_count: usize, + producer_partition_count: usize, +) -> Result> { + if nb.as_any().is::() { + ExchangeAssignment::try_shuffle( + producer_task_count, + consumer_task_count, + consumer_partition_count, + ) + } else if nb.as_any().is::() { + ExchangeAssignment::try_coalesce( + producer_task_count, + consumer_task_count, + producer_partition_count, + ) + } else if nb.as_any().is::() { + ExchangeAssignment::try_broadcast( + producer_task_count, + consumer_task_count, + consumer_partition_count, + ) + } else { + internal_err!("unsupported network boundary {}", nb.name()) + } +} + +/// Stores the finalized routing assignment on the concrete network boundary node. +pub(crate) fn network_boundary_with_exchange_assignment( + nb: Arc, + assignment: Arc, +) -> Result> { + if let Some(node) = nb.as_any().downcast_ref::() { + Ok(Arc::new(node.with_exchange_assignment(assignment))) + } else if let Some(node) = nb.as_any().downcast_ref::() { + Ok(Arc::new(node.with_exchange_assignment(assignment))) + } else if let Some(node) = nb.as_any().downcast_ref::() { + Ok(Arc::new(node.with_exchange_assignment(assignment))) + } else { + internal_err!("unsupported network boundary {}", nb.name()) + } +} diff --git a/src/distributed_planner/prepare_network_boundaries.rs b/src/distributed_planner/prepare_network_boundaries.rs index f00360ff..89b0bdd3 100644 --- a/src/distributed_planner/prepare_network_boundaries.rs +++ b/src/distributed_planner/prepare_network_boundaries.rs @@ -1,4 +1,7 @@ -use crate::distributed_planner::network_boundary::network_boundary_scale_input; +use crate::distributed_planner::network_boundary::{ + network_boundary_exchange_assignment, network_boundary_scale_input, + network_boundary_with_exchange_assignment, +}; use crate::execution_plans::ChildrenIsolatorUnionExec; use crate::stage::LocalStage; use crate::{NetworkBoundaryExt, Stage}; @@ -67,6 +70,14 @@ fn prepare( // 2) Scale up the head node of the input stage in order to account for the amount of partition // and consumer count above it. let plan = network_boundary_scale_input(new_input, consumer_partitions, consumer_task_count)?; + let producer_partitions = plan.properties().partitioning.partition_count(); + let assignment = network_boundary_exchange_assignment( + nb, + producer_task_count, + consumer_task_count, + consumer_partitions, + producer_partitions, + )?; // 3) Make sure the input stage can be uniquely identified with a stage index and query id. // If there were already some `query_id` and `num` that's fine. @@ -77,5 +88,6 @@ fn prepare( tasks: local_stage.tasks, })); *stage_id += 1; - nb + let nb = nb?; + network_boundary_with_exchange_assignment(nb, assignment) } diff --git a/src/execution_plans/benchmarks/shuffle_bench.rs b/src/execution_plans/benchmarks/shuffle_bench.rs index a91fa022..33d2fb38 100644 --- a/src/execution_plans/benchmarks/shuffle_bench.rs +++ b/src/execution_plans/benchmarks/shuffle_bench.rs @@ -5,7 +5,9 @@ use crate::common::task_ctx_with_extension; use crate::stage::RemoteStage; use crate::worker::WorkerConnectionPool; use crate::worker::test_utils::worker_handles::MemoryWorkerHandle; -use crate::{DistributedExt, DistributedTaskContext, NetworkShuffleExec, Stage}; +use crate::{ + DistributedExt, DistributedTaskContext, ExchangeAssignment, NetworkShuffleExec, Stage, +}; use arrow::datatypes::Schema; use arrow_ipc::CompressionType; use datafusion::common::Result; @@ -228,6 +230,11 @@ impl ShuffleFixture { Boundedness::Bounded, )), input_stage: input_stage.clone(), + assignment: Some(ExchangeAssignment::try_shuffle( + self.bench.producer_tasks, + self.bench.consumer_tasks, + self.bench.partitions, + )?), worker_connections: WorkerConnectionPool::new(self.bench.producer_tasks), }; let task_ctx = Arc::new(task_ctx_with_extension( diff --git a/src/execution_plans/benchmarks/transport_bench.rs b/src/execution_plans/benchmarks/transport_bench.rs index 76259e9d..a105e9f4 100644 --- a/src/execution_plans/benchmarks/transport_bench.rs +++ b/src/execution_plans/benchmarks/transport_bench.rs @@ -5,7 +5,8 @@ use crate::common::task_ctx_with_extension; use crate::stage::RemoteStage; use crate::worker::test_utils::worker_handles::{MemoryWorkerHandle, TcpWorkerHandle}; use crate::{ - DefaultChannelResolver, DistributedExt, DistributedTaskContext, NetworkShuffleExec, Stage, + DefaultChannelResolver, DistributedExt, DistributedTaskContext, ExchangeAssignment, + NetworkShuffleExec, Stage, }; use arrow::datatypes::Schema; use arrow_ipc::CompressionType; @@ -170,7 +171,7 @@ impl TransportBench { } fn producer_local_partition_count(&self) -> usize { - // Transport intentionally keeps the current producer-local partition layout. + // Transport intentionally keeps the current producer-local partition assignment. self.partitions * self.consumer_tasks.max(1) } @@ -278,6 +279,11 @@ impl TransportFixture { Boundedness::Bounded, )), input_stage: input_stage.clone(), + assignment: Some(ExchangeAssignment::try_shuffle( + self.bench.producer_tasks, + self.bench.consumer_tasks, + self.bench.partitions, + )?), worker_connections: crate::worker::WorkerConnectionPool::new( self.bench.producer_tasks, ), diff --git a/src/execution_plans/network_broadcast.rs b/src/execution_plans/network_broadcast.rs index b107776c..01935368 100644 --- a/src/execution_plans/network_broadcast.rs +++ b/src/execution_plans/network_broadcast.rs @@ -2,9 +2,9 @@ use crate::common::require_one_child; use crate::distributed_planner::NetworkBoundary; use crate::stage::{LocalStage, Stage}; use crate::worker::WorkerConnectionPool; -use crate::{BroadcastExec, DistributedTaskContext}; +use crate::{BroadcastExec, DistributedTaskContext, ExchangeAssignment, SlotReadPlan}; use datafusion::common::tree_node::Transformed; -use datafusion::common::{Result, not_impl_err, plan_err}; +use datafusion::common::{Result, exec_err, internal_datafusion_err, not_impl_err, plan_err}; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr_common::metrics::MetricsSet; @@ -120,6 +120,7 @@ use uuid::Uuid; pub struct NetworkBroadcastExec { pub(crate) properties: Arc, pub(crate) input_stage: Stage, + pub(crate) assignment: Option>, pub(crate) worker_connections: WorkerConnectionPool, } @@ -150,9 +151,22 @@ impl NetworkBroadcastExec { properties, worker_connections: WorkerConnectionPool::new(0), input_stage: Stage::Local(input_stage), + assignment: None, } } + pub(crate) fn with_exchange_assignment(&self, assignment: Arc) -> Self { + let mut this = self.clone(); + this.assignment = Some(assignment); + this + } + + fn exchange_assignment(&self) -> Result<&ExchangeAssignment> { + self.assignment.as_deref().ok_or_else(|| { + internal_datafusion_err!("NetworkBroadcastExec is missing exchange assignment") + }) + } + /// Creates a new [NetworkBroadcastExec] fed by the provided [BroadcastExec]. The input plan /// will be executed in a remote worker in `producer_tasks` number of tasks. pub fn try_new(input: Arc, producer_tasks: usize) -> Result { @@ -248,18 +262,37 @@ impl ExecutionPlan for NetworkBroadcastExec { }; let task_context = DistributedTaskContext::from_ctx(&context); - let off = self.properties.partitioning.partition_count() * task_context.task_index; - let mut streams = Vec::with_capacity(self.input_stage.task_count()); + let assignment = self.exchange_assignment()?; + let Some(SlotReadPlan::Fanout { + producer_tasks, + producer_partition, + }) = assignment.resolve_slot(task_context.task_index, partition) + else { + return exec_err!( + "NetworkBroadcastExec cannot resolve task {} partition {}", + task_context.task_index, + partition + ); + }; + let Some(partition_range) = + assignment.partition_range_for_consumer(task_context.task_index) + else { + return exec_err!( + "NetworkBroadcastExec cannot resolve partition range for task {}", + task_context.task_index + ); + }; - for input_task_index in 0..self.input_stage.task_count() { + let mut streams = Vec::with_capacity(producer_tasks.len()); + for input_task_index in producer_tasks { let worker_connection = self.worker_connections.get_or_init_worker_connection( remote_stage, - off..(off + self.properties.partitioning.partition_count()), + partition_range.clone(), input_task_index, &context, )?; - let stream = worker_connection.stream_partition(off + partition, |_meta| {})?; + let stream = worker_connection.stream_partition(producer_partition, |_meta| {})?; streams.push(stream); } diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index 7e22aa87..192817da 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -1,11 +1,11 @@ -use crate::DistributedTaskContext; use crate::common::require_one_child; use crate::distributed_planner::NetworkBoundary; use crate::execution_plans::common::scale_partitioning_props; use crate::stage::{LocalStage, Stage}; use crate::worker::WorkerConnectionPool; +use crate::{DistributedTaskContext, ExchangeAssignment, SlotReadPlan}; use datafusion::common::tree_node::Transformed; -use datafusion::common::{exec_err, not_impl_err, plan_err}; +use datafusion::common::{exec_err, internal_datafusion_err, not_impl_err, plan_err}; use datafusion::error::Result; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr_common::metrics::MetricsSet; @@ -77,6 +77,7 @@ pub struct NetworkCoalesceExec { /// the properties we advertise for this execution plan pub(crate) properties: Arc, pub(crate) input_stage: Stage, + pub(crate) assignment: Option>, pub(crate) worker_connections: WorkerConnectionPool, } @@ -103,9 +104,25 @@ impl NetworkCoalesceExec { properties: props, worker_connections: WorkerConnectionPool::new(0), input_stage: Stage::Local(input_stage), + assignment: None, } } + pub(crate) fn with_exchange_assignment(&self, assignment: Arc) -> Self { + let mut this = self.clone(); + this.properties = scale_partitioning_props(&this.properties, |_| { + assignment.max_partition_count_per_consumer() + }); + this.assignment = Some(assignment); + this + } + + fn exchange_assignment(&self) -> Result<&ExchangeAssignment> { + self.assignment.as_deref().ok_or_else(|| { + internal_datafusion_err!("NetworkCoalesceExec is missing exchange assignment") + }) + } + /// Creates a new [NetworkCoalesceExec] fed by the provided `input` plan. /// /// The `input` plan will be remotely executed in `producer_tasks` tasks, while the @@ -224,59 +241,49 @@ impl ExecutionPlan for NetworkCoalesceExec { ); } - let partitions_per_task = self - .properties() - .partitioning - .partition_count() - .checked_div( - self.input_stage - .task_count() - .div_ceil(task_context.task_count) - .max(1), - ) - .unwrap_or(0); - if partitions_per_task == 0 { - return exec_err!("NetworkCoalesceExec has 0 partitions per input task"); - } - - let input_task_count = self.input_stage.task_count(); - let group = task_group( - input_task_count, - task_context.task_index, - task_context.task_count, - ); - - let input_task_offset = partition / partitions_per_task; - let target_partition = partition % partitions_per_task; - - // Some consumer tasks are assigned fewer upstream tasks when - // `input_task_count % task_count != 0` (uneven grouping). - // We still size partitions based on the maximum group size, so partitions that - // would map to a missing upstream task slot are treated as padding and return - // an empty stream (no network call). - if input_task_offset >= group.len { - return Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))); + let assignment = self.exchange_assignment()?; + if task_context.task_count != assignment.consumer_task_count() { + return exec_err!( + "NetworkCoalesceExec task count {} does not match exchange assignment consumer task count {}", + task_context.task_count, + assignment.consumer_task_count() + ); } - - // This should never happen. - if input_task_offset >= group.max_len { - return internal_err!( - "NetworkCoalesceExec input_task_offset={} >= group.max_len={}", - input_task_offset, - group.max_len + if partition >= assignment.max_partition_count_per_consumer() { + return exec_err!( + "NetworkCoalesceExec partition {} >= assignment partition count {}", + partition, + assignment.max_partition_count_per_consumer() ); } - let target_task = group.start_task + input_task_offset; + let Some(read_plan) = assignment.resolve_slot(task_context.task_index, partition) else { + return Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))); + }; + let SlotReadPlan::Single { + producer_task, + producer_partition, + } = read_plan + else { + return internal_err!("NetworkCoalesceExec expected single-task exchange slot"); + }; + let Some(partition_range) = + assignment.partition_range_for_consumer(task_context.task_index) + else { + return exec_err!( + "NetworkCoalesceExec cannot resolve partition range for task {}", + task_context.task_index + ); + }; let worker_connection = self.worker_connections.get_or_init_worker_connection( remote_stage, - 0..partitions_per_task, - target_task, + partition_range, + producer_task, &context, )?; - let stream = worker_connection.stream_partition(target_partition, |_meta| {})?; + let stream = worker_connection.stream_partition(producer_partition, |_meta| {})?; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -289,47 +296,6 @@ impl ExecutionPlan for NetworkCoalesceExec { } } -#[derive(Debug, Clone, Copy)] -struct TaskGroup { - /// The first input task index in this group. - start_task: usize, - /// The number of input tasks in this group. - len: usize, - /// The maximum possible group size across all groups. - /// - /// When groups are uneven (input_tasks % task_count != 0), some groups are shorter. We still - /// size the output partitioning based on this max and return empty streams for the extra - /// partitions in smaller groups. - max_len: usize, -} - -/// Returns the contiguous group of input tasks assigned to DistributedTaskContext::task_index. -fn task_group(input_task_count: usize, task_index: usize, task_count: usize) -> TaskGroup { - if task_count == 0 { - return TaskGroup { - start_task: 0, - len: 0, - max_len: 0, - }; - } - - // Split `input_task_count` into `task_count` contiguous groups. - // - base_tasks_per_group: floor(input_task_count / task_count) - // - groups_with_extra_task: first N groups that get one extra task (remainder) - let base_tasks_per_group = input_task_count / task_count; - let groups_with_extra_task = input_task_count % task_count; - - let len = base_tasks_per_group + usize::from(task_index < groups_with_extra_task); - let start_task = (task_index * base_tasks_per_group) + task_index.min(groups_with_extra_task); - let max_len = base_tasks_per_group + usize::from(groups_with_extra_task > 0); - - TaskGroup { - start_task, - len, - max_len, - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/execution_plans/network_shuffle.rs b/src/execution_plans/network_shuffle.rs index 834c01a6..2a374a19 100644 --- a/src/execution_plans/network_shuffle.rs +++ b/src/execution_plans/network_shuffle.rs @@ -2,9 +2,9 @@ use crate::common::require_one_child; use crate::execution_plans::common::scale_partitioning; use crate::stage::{LocalStage, Stage}; use crate::worker::WorkerConnectionPool; -use crate::{DistributedTaskContext, NetworkBoundary}; +use crate::{DistributedTaskContext, ExchangeAssignment, NetworkBoundary, SlotReadPlan}; use datafusion::common::tree_node::{Transformed, TreeNodeRecursion}; -use datafusion::common::{Result, not_impl_err, plan_err}; +use datafusion::common::{Result, exec_err, internal_datafusion_err, not_impl_err, plan_err}; use datafusion::error::DataFusionError; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_expr::Partitioning; @@ -102,6 +102,7 @@ pub struct NetworkShuffleExec { /// the properties we advertise for this execution plan pub(crate) properties: Arc, pub(crate) input_stage: Stage, + pub(crate) assignment: Option>, pub(crate) worker_connections: WorkerConnectionPool, } @@ -131,9 +132,22 @@ impl NetworkShuffleExec { properties: input_stage.plan.properties().clone(), worker_connections: WorkerConnectionPool::new(0), input_stage: Stage::Local(input_stage), + assignment: None, } } + pub(crate) fn with_exchange_assignment(&self, assignment: Arc) -> Self { + let mut this = self.clone(); + this.assignment = Some(assignment); + this + } + + fn exchange_assignment(&self) -> Result<&ExchangeAssignment> { + self.assignment.as_deref().ok_or_else(|| { + internal_datafusion_err!("NetworkShuffleExec is missing exchange assignment") + }) + } + /// Creates a new [NetworkShuffleExec] fed by the provided [RepartitionExec]. The input plan /// will be executed in a remote worker in `producer_tasks` number of tasks. pub fn try_new(input: Arc, producer_tasks: usize) -> Result { @@ -226,18 +240,37 @@ impl ExecutionPlan for NetworkShuffleExec { }; let task_context = DistributedTaskContext::from_ctx(&context); - let off = self.properties.partitioning.partition_count() * task_context.task_index; + let assignment = self.exchange_assignment()?; + let Some(SlotReadPlan::Fanout { + producer_tasks, + producer_partition, + }) = assignment.resolve_slot(task_context.task_index, partition) + else { + return exec_err!( + "NetworkShuffleExec cannot resolve task {} partition {}", + task_context.task_index, + partition + ); + }; + let Some(partition_range) = + assignment.partition_range_for_consumer(task_context.task_index) + else { + return exec_err!( + "NetworkShuffleExec cannot resolve partition range for task {}", + task_context.task_index + ); + }; - let mut streams = Vec::with_capacity(remote_stage.workers.len()); - for input_task_index in 0..remote_stage.workers.len() { + let mut streams = Vec::with_capacity(producer_tasks.len()); + for input_task_index in producer_tasks { let worker_connection = self.worker_connections.get_or_init_worker_connection( remote_stage, - off..(off + self.properties.partitioning.partition_count()), + partition_range.clone(), input_task_index, &context, )?; - let stream = worker_connection.stream_partition(off + partition, |_meta| {})?; + let stream = worker_connection.stream_partition(producer_partition, |_meta| {})?; streams.push(stream); } diff --git a/src/lib.rs b/src/lib.rs index e93bb98d..afcb0da7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ pub use distributed_planner::{ DistributedConfig, NetworkBoundary, NetworkBoundaryExt, SessionStateBuilderExt, TaskCountAnnotation, TaskEstimation, TaskEstimator, TaskRoutingContext, }; +pub(crate) use distributed_planner::{ExchangeAssignment, ExchangeAssignmentKind, SlotReadPlan}; pub use execution_plans::{ BroadcastExec, NetworkBroadcastExec, NetworkCoalesceExec, NetworkShuffleExec, PartitionIsolatorExec, diff --git a/src/protobuf/distributed_codec.rs b/src/protobuf/distributed_codec.rs index b529c509..3af0d60b 100644 --- a/src/protobuf/distributed_codec.rs +++ b/src/protobuf/distributed_codec.rs @@ -6,7 +6,9 @@ use crate::execution_plans::{ use crate::stage::{LocalStage, RemoteStage, Stage}; use crate::worker::WorkerConnectionPool; use crate::{DistributedTaskContext, NetworkBoundary}; -use crate::{NetworkShuffleExec, PartitionIsolatorExec}; +use crate::{ + ExchangeAssignment, ExchangeAssignmentKind, NetworkShuffleExec, PartitionIsolatorExec, +}; use bytes::Bytes; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::datatypes::SchemaRef; @@ -97,6 +99,7 @@ impl PhysicalExtensionCodec for DistributedCodec { schema, partitioning, input_stage, + assignment, }) => { let schema: Schema = schema .as_ref() @@ -116,12 +119,14 @@ impl PhysicalExtensionCodec for DistributedCodec { partitioning, Arc::new(schema), parse_stage_proto(input_stage, inputs)?, + parse_exchange_assignment_proto(assignment)?, ))) } DistributedExecNode::NetworkCoalesceTasks(NetworkCoalesceExecProto { schema, partitioning, input_stage, + assignment, }) => { let schema: Schema = schema .as_ref() @@ -141,6 +146,7 @@ impl PhysicalExtensionCodec for DistributedCodec { partitioning, Arc::new(schema), parse_stage_proto(input_stage, inputs)?, + parse_exchange_assignment_proto(assignment)?, ))) } DistributedExecNode::PartitionIsolator(PartitionIsolatorExecProto { n_tasks }) => { @@ -162,6 +168,7 @@ impl PhysicalExtensionCodec for DistributedCodec { schema, partitioning, input_stage, + assignment, }) => { let schema: Schema = schema .as_ref() @@ -181,6 +188,7 @@ impl PhysicalExtensionCodec for DistributedCodec { partitioning, Arc::new(schema), parse_stage_proto(input_stage, inputs)?, + parse_exchange_assignment_proto(assignment)?, ))) } DistributedExecNode::Broadcast(BroadcastExecProto { @@ -274,6 +282,7 @@ impl PhysicalExtensionCodec for DistributedCodec { &DefaultPhysicalProtoConverter, )?), input_stage: Some(encode_stage_proto(node.input_stage())?), + assignment: encode_exchange_assignment_proto(node.assignment.as_deref())?, }; let wrapper = DistributedExecProto { @@ -290,6 +299,7 @@ impl PhysicalExtensionCodec for DistributedCodec { &DefaultPhysicalProtoConverter, )?), input_stage: Some(encode_stage_proto(node.input_stage())?), + assignment: encode_exchange_assignment_proto(node.assignment.as_deref())?, }; let wrapper = DistributedExecProto { @@ -316,6 +326,7 @@ impl PhysicalExtensionCodec for DistributedCodec { &DefaultPhysicalProtoConverter, )?), input_stage: Some(encode_stage_proto(node.input_stage())?), + assignment: encode_exchange_assignment_proto(node.assignment.as_deref())?, }; let wrapper = DistributedExecProto { @@ -413,6 +424,27 @@ pub struct PartitionIsolatorExecProto { pub n_tasks: u64, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ExchangeAssignmentProto { + #[prost(enumeration = "ExchangeAssignmentKindProto", tag = "1")] + pub kind: i32, + #[prost(uint64, tag = "2")] + pub producer_task_count: u64, + #[prost(uint64, tag = "3")] + pub consumer_task_count: u64, + #[prost(uint64, tag = "4")] + pub partition_count: u64, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, ::prost::Enumeration)] +#[repr(i32)] +pub enum ExchangeAssignmentKindProto { + Unspecified = 0, + Shuffle = 1, + Coalesce = 2, + Broadcast = 3, +} + /// Protobuf representation of the [NetworkShuffleExec] physical node. It serves as /// an intermediate format for serializing/deserializing [NetworkShuffleExec] nodes /// to send them over the wire. @@ -424,6 +456,8 @@ pub struct NetworkShuffleExecProto { partitioning: Option, #[prost(message, optional, tag = "3")] input_stage: Option, + #[prost(message, optional, tag = "4")] + assignment: Option, } #[derive(Clone, PartialEq, ::prost::Message)] @@ -454,6 +488,7 @@ fn new_network_hash_shuffle_exec( partitioning: Partitioning, schema: SchemaRef, input_stage: Stage, + assignment: Option>, ) -> NetworkShuffleExec { NetworkShuffleExec { properties: Arc::new(PlanProperties::new( @@ -464,6 +499,7 @@ fn new_network_hash_shuffle_exec( )), worker_connections: WorkerConnectionPool::new(input_stage.task_count()), input_stage, + assignment, } } @@ -478,12 +514,15 @@ pub struct NetworkCoalesceExecProto { partitioning: Option, #[prost(message, optional, tag = "3")] input_stage: Option, + #[prost(message, optional, tag = "4")] + assignment: Option, } fn new_network_coalesce_tasks_exec( partitioning: Partitioning, schema: SchemaRef, input_stage: Stage, + assignment: Option>, ) -> NetworkCoalesceExec { NetworkCoalesceExec { properties: Arc::new(PlanProperties::new( @@ -494,6 +533,7 @@ fn new_network_coalesce_tasks_exec( )), worker_connections: WorkerConnectionPool::new(input_stage.task_count()), input_stage, + assignment, } } @@ -505,6 +545,8 @@ pub struct NetworkBroadcastExecProto { partitioning: Option, #[prost(message, optional, tag = "3")] input_stage: Option, + #[prost(message, optional, tag = "4")] + assignment: Option, } #[derive(Clone, PartialEq, ::prost::Message)] @@ -517,6 +559,7 @@ fn new_network_broadcast_exec( partitioning: Partitioning, schema: SchemaRef, input_stage: Stage, + assignment: Option>, ) -> NetworkBroadcastExec { NetworkBroadcastExec { properties: Arc::new(PlanProperties::new( @@ -527,9 +570,68 @@ fn new_network_broadcast_exec( )), worker_connections: WorkerConnectionPool::new(input_stage.task_count()), input_stage, + assignment, } } +fn parse_exchange_assignment_proto( + proto: Option, +) -> Result>> { + let Some(proto) = proto else { + return Ok(None); + }; + + let producer_task_count = proto.producer_task_count as usize; + let consumer_task_count = proto.consumer_task_count as usize; + let partition_count = proto.partition_count as usize; + let kind = ExchangeAssignmentKindProto::try_from(proto.kind) + .map_err(|_| proto_error(format!("invalid exchange assignment kind {}", proto.kind)))?; + + let assignment = match kind { + ExchangeAssignmentKindProto::Unspecified => { + return Err(proto_error("exchange assignment kind is unspecified")); + } + ExchangeAssignmentKindProto::Shuffle => ExchangeAssignment::try_shuffle( + producer_task_count, + consumer_task_count, + partition_count, + )?, + ExchangeAssignmentKindProto::Coalesce => ExchangeAssignment::try_coalesce( + producer_task_count, + consumer_task_count, + partition_count, + )?, + ExchangeAssignmentKindProto::Broadcast => ExchangeAssignment::try_broadcast( + producer_task_count, + consumer_task_count, + partition_count, + )?, + }; + + Ok(Some(assignment)) +} + +fn encode_exchange_assignment_proto( + assignment: Option<&ExchangeAssignment>, +) -> Result> { + let Some(assignment) = assignment else { + return Ok(None); + }; + + let kind = match assignment.kind() { + ExchangeAssignmentKind::Shuffle => ExchangeAssignmentKindProto::Shuffle, + ExchangeAssignmentKind::Coalesce => ExchangeAssignmentKindProto::Coalesce, + ExchangeAssignmentKind::Broadcast => ExchangeAssignmentKindProto::Broadcast, + }; + + Ok(Some(ExchangeAssignmentProto { + kind: kind as i32, + producer_task_count: assignment.producer_task_count() as u64, + consumer_task_count: assignment.consumer_task_count() as u64, + partition_count: assignment.assignment_partition_count() as u64, + })) +} + #[cfg(test)] mod tests { use super::*; @@ -576,6 +678,25 @@ mod tests { SessionContext::new().task_ctx() } + #[test] + fn test_roundtrip_exchange_assignments() -> datafusion::common::Result<()> { + let assignments = vec![ + ExchangeAssignment::try_shuffle(3, 2, 4)?, + ExchangeAssignment::try_coalesce(3, 2, 4)?, + ExchangeAssignment::try_broadcast(2, 3, 4)?, + ]; + + for assignment in assignments { + let proto = encode_exchange_assignment_proto(Some(assignment.as_ref()))? + .expect("assignment proto should be present"); + let decoded = + parse_exchange_assignment_proto(Some(proto))?.expect("assignment should decode"); + assert_eq!(assignment.as_ref(), decoded.as_ref()); + } + + Ok(()) + } + #[test] fn test_roundtrip_single_flight() -> datafusion::common::Result<()> { let codec = DistributedCodec; @@ -583,8 +704,12 @@ mod tests { let schema = schema_i32("a"); let part = Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 4); - let plan: Arc = - Arc::new(new_network_hash_shuffle_exec(part, schema, dummy_stage())); + let plan: Arc = Arc::new(new_network_hash_shuffle_exec( + part, + schema, + dummy_stage(), + None, + )); let mut buf = Vec::new(); codec.try_encode(plan.clone(), &mut buf)?; @@ -605,6 +730,7 @@ mod tests { Partitioning::UnknownPartitioning(1), schema, dummy_stage(), + None, )); let plan: Arc = Arc::new(PartitionIsolatorExec::new(flight.clone(), 1)); @@ -628,11 +754,13 @@ mod tests { Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )); let right = Arc::new(new_network_hash_shuffle_exec( Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )); let union = UnionExec::try_new(vec![left.clone(), right.clone()])?; @@ -657,6 +785,7 @@ mod tests { Partitioning::UnknownPartitioning(1), schema.clone(), dummy_stage(), + None, )); let sort_expr = PhysicalSortExpr { @@ -689,6 +818,7 @@ mod tests { Partitioning::RoundRobinBatch(3), schema, dummy_stage(), + None, )); let mut buf = Vec::new(); @@ -711,6 +841,7 @@ mod tests { part, schema, dummy_stage_with_plan(), + None, )); let mut buf = Vec::new(); @@ -732,6 +863,7 @@ mod tests { Partitioning::RoundRobinBatch(3), schema, dummy_stage_with_plan(), + None, )); let mut buf = Vec::new(); @@ -753,6 +885,7 @@ mod tests { Partitioning::UnknownPartitioning(1), schema, dummy_stage(), + None, )); let plan: Arc = Arc::new(PartitionIsolatorExec::new(flight.clone(), 1)); @@ -776,11 +909,13 @@ mod tests { Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )); let right = Arc::new(new_network_coalesce_tasks_exec( Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )); let union = UnionExec::try_new(vec![left.clone(), right.clone()])?; @@ -805,11 +940,13 @@ mod tests { Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )) as Arc; let right = Arc::new(new_network_hash_shuffle_exec( Partitioning::RoundRobinBatch(2), schema.clone(), dummy_stage(), + None, )) as Arc; let plan: Arc =