Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,8 @@ pretty_assertions = "1.4"
reqwest = "0.12"
zip = "6.0"
test-case = "3.3.1"

# @NetworkBoundaryStrategy: custom execution plan flag
[[example]]
name = "custom_execution_plan"
required-features = ["integration"]
55 changes: 55 additions & 0 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use delegate::delegate;
use http::HeaderMap;
use std::sync::Arc;

// @NetworkBoundaryStrategy: separte import for set_distributed_network_boundary_strategy
use crate::distributed_planner::set_distributed_network_boundary_strategy;

/// Extends DataFusion with distributed capabilities.
pub trait DistributedExt: Sized {
/// Adds the provided [ConfigExtension] to the distributed context. The [ConfigExtension] will
Expand Down Expand Up @@ -325,6 +328,25 @@ pub trait DistributedExt: Sized {
estimator: T,
);

// @NetworkBoundaryStrategy: trait method to register a custom network boundary strategy.
/// Adds a distributed network boundary strategy. [NetworkBoundaryStrategy]s are executed on each node
/// sequentially until one returns an annotation for a network boundary.

fn with_distributed_network_boundary_strategy<
T: crate::distributed_planner::NetworkBoundaryStrategy + 'static,
>(
self,
strategy: T,
) -> Self;

/// Same as [DistributedExt::with_distributed_network_boundary_strategy] but with an in-place mutation.
fn set_distributed_network_boundary_strategy<
T: crate::distributed_planner::NetworkBoundaryStrategy + 'static,
>(
&mut self,
strategy: T,
);

/// Sets the maximum number of files each task in a stage with a FileScanConfig node will
/// handle. Reducing this number will increment the amount of tasks. By default, this
/// is close to the number of cores in the machine.
Expand Down Expand Up @@ -564,6 +586,16 @@ impl DistributedExt for SessionConfig {
set_distributed_task_estimator(self, estimator)
}

// @NetworkBoundaryStrategy: SessionConfig impl of set_distributed_network_boundary_strategy.
fn set_distributed_network_boundary_strategy<
T: crate::distributed_planner::NetworkBoundaryStrategy + 'static,
>(
&mut self,
strategy: T,
) {
set_distributed_network_boundary_strategy(self, strategy)
}

fn set_distributed_files_per_task(
&mut self,
files_per_task: usize,
Expand Down Expand Up @@ -662,6 +694,11 @@ impl DistributedExt for SessionConfig {
#[expr($;self)]
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;

// @NetworkBoundaryStrategy: SessionStateBuilder delegate for with_distributed_network_boundary_strategy.
#[call(set_distributed_network_boundary_strategy)]
#[expr($;self)]
fn with_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(mut self, strategy: T) -> Self;

#[call(set_distributed_files_per_task)]
#[expr($?;Ok(self))]
fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
Expand Down Expand Up @@ -735,6 +772,12 @@ impl DistributedExt for SessionStateBuilder {
#[expr($;self)]
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;

// @NetworkBoundaryStrategy: SessionStateBuilder delegate for with_distributed_network_boundary_strategy.
fn set_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(&mut self, strategy: T);
#[call(set_distributed_network_boundary_strategy)]
#[expr($;self)]
fn with_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(mut self, strategy: T) -> Self;

fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
#[call(set_distributed_files_per_task)]
#[expr($?;Ok(self))]
Expand Down Expand Up @@ -816,6 +859,12 @@ impl DistributedExt for SessionState {
#[expr($;self)]
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;

// @NetworkBoundaryStrategy: SessionState delegate for with_distributed_network_boundary_strategy.
fn set_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(&mut self, strategy: T);
#[call(set_distributed_network_boundary_strategy)]
#[expr($;self)]
fn with_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(mut self, strategy: T) -> Self;

fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
#[call(set_distributed_files_per_task)]
#[expr($?;Ok(self))]
Expand Down Expand Up @@ -897,6 +946,12 @@ impl DistributedExt for SessionContext {
#[expr($;self)]
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;

// @NetworkBoundaryStrategy: SessionContext delegate for with_distributed_network_boundary_strategy.
fn set_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(&mut self, strategy: T);
#[call(set_distributed_network_boundary_strategy)]
#[expr($;self)]
fn with_distributed_network_boundary_strategy<T: crate::distributed_planner::NetworkBoundaryStrategy + 'static>(self, strategy: T) -> Self;

fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
#[call(set_distributed_files_per_task)]
#[expr($?;Ok(self))]
Expand Down
16 changes: 16 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ extensions_options! {
/// [WorkerResolver] implementation that tells the distributed planner information about
/// the available workers ready to execute distributed tasks.
pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
/// @NetworkBoundaryStrategy: Collection of [NetworkBoundaryStrategy]s that will be applied to plan nodes to
/// determine if a network boundary is needed and what type it should be.
pub(crate) __private_network_boundary_strategy: crate::distributed_planner::network_boundary_strategy::CombinedNetworkBoundaryStrategy, default = crate::distributed_planner::network_boundary_strategy::CombinedNetworkBoundaryStrategy::default()
}
}

Expand Down Expand Up @@ -166,3 +169,16 @@ impl Debug for CombinedTaskEstimator {
write!(f, "TaskEstimators")
}
}

// @NetworkBoundaryStrategy: ConfigField impl required so CombinedNetworkBoundaryStrategy can be stored in ConfigOptions extensions.
impl ConfigField
for crate::distributed_planner::network_boundary_strategy::CombinedNetworkBoundaryStrategy
{
fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
// nothing to do.
}

fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
not_impl_err!("Not implemented")
}
}
13 changes: 13 additions & 0 deletions src/distributed_planner/distributed_physical_optimizer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ fn distribute_plan(
stage_id.add_assign(1);
Ok(node)
}
// @NetworkBoundaryStrategy: Extension boundaries are handled by custom NetworkBoundaryStrategy implementations.
PlanOrNetworkBoundary::Extension(_) => {
crate::distributed_planner::network_boundary_strategy::apply_extension_boundary(
d_cfg,
&annotated_plan.plan_or_nb,
new_children,
query_id,
stage_id,
task_count,
max_child_task_count,
cfg,
)
}
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the match on the PlanOrNetworkBoundary::Extension to invoke the NetworkBoundaryStrategy::apply_boundary method.

}
}

Expand Down
12 changes: 12 additions & 0 deletions src/distributed_planner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,15 @@ pub use distributed_physical_optimizer_rule::DistributedPhysicalOptimizerRule;
pub use network_boundary::{NetworkBoundary, NetworkBoundaryExt};
pub(crate) use task_estimator::set_distributed_task_estimator;
pub use task_estimator::{TaskCountAnnotation, TaskEstimation, TaskEstimator};

// @NetworkBoundaryStrategy: new module and re-exports for pluggable network boundary strategies.
mod network_boundary_strategy;

pub(crate) use network_boundary_strategy::set_distributed_network_boundary_strategy;

#[rustfmt::skip]
pub use network_boundary_strategy::{
CombinedNetworkBoundaryStrategy, NetworkBoundaryAnnotation, NetworkBoundaryContext,
NetworkBoundaryStrategy,
};
pub use plan_annotator::PlanOrNetworkBoundary;
Loading