Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,385 changes: 703 additions & 682 deletions Cargo.lock

Large diffs are not rendered by default.

44 changes: 42 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
members = ["benchmarks", "cli", "console"]

[workspace.dependencies]
datafusion = { version = "53.0.0", default-features = false }
datafusion-proto = { version = "53.0.0" }
# DataFusion 54 is not yet on crates.io — pull workspace deps directly from git.
datafusion = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569", default-features = false }
datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }

[package]
name = "datafusion-distributed"
Expand Down Expand Up @@ -83,3 +84,42 @@ test-case = "3.3.1"
[workspace.lints.clippy]
disallowed_types = "deny"
disallowed-methods = "deny"

# DataFusion 54 is not yet on crates.io. Patch every DF sub-crate that may appear
# in the transitive graph to the same pinned git revision used in
# [workspace.dependencies] (the branch-54 tip at the time of pinning) so
# resolution succeeds.
[patch.crates-io]
datafusion = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-catalog = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-catalog-listing = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-common-runtime = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource-arrow = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource-avro = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource-csv = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource-json = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-datasource-parquet = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-doc = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-expr-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-aggregate-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-nested = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-table = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-window = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-functions-window-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-macros = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-optimizer = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-physical-optimizer = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-proto = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-proto-common = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-pruning = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-session = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-sql = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
datafusion-cli = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569" }
4 changes: 0 additions & 4 deletions benchmarks/benches/broadcast_cache_scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl ExecutionPlan for SyntheticExec {
"SyntheticExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
Expand Down
5 changes: 2 additions & 3 deletions benchmarks/src/datasets/tpcds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arrow::datatypes::{DataType, Field};
use datafusion::common::internal_err;
use datafusion::error::DataFusionError;
use datafusion::physical_expr::Partitioning;
use datafusion::physical_expr::expressions::{CastColumnExpr, Column};
use datafusion::physical_expr::expressions::{CastExpr, Column};
use datafusion::physical_expr::projection::ProjectionExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -148,9 +148,8 @@ fn project_cols_as_dict(
.enumerate()
.map(|(i, f)| ProjectionExpr {
expr: if cols.contains(&f.name().as_str()) {
Arc::new(CastColumnExpr::new(
Arc::new(CastExpr::new_with_target_field(
Arc::new(Column::new(f.name(), i)),
f.clone(),
Arc::new(Field::new(
f.name(),
DataType::Dictionary(
Expand Down
2 changes: 1 addition & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
datafusion = { workspace = true }
datafusion-distributed = { path = "..", features = ["avro", "integration"] }
datafusion-cli = { version = "53", default-features = false }
datafusion-cli = { git = "https://github.com/apache/datafusion", rev = "a4a080482406cc8b702ae6f5f03c75ca36c20569", default-features = false }
tokio = { version = "1.48", features = ["full"] }
clap = { version = "4", features = ["derive"] }
env_logger = "0.11"
Expand Down
15 changes: 3 additions & 12 deletions examples/custom_execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use datafusion_proto::protobuf;
use datafusion_proto::protobuf::proto_error;
use futures::{TryStreamExt, stream};
use prost::Message;
use std::any::Any;
use std::fmt::{self, Formatter};
use std::ops::Range;
use std::sync::Arc;
Expand Down Expand Up @@ -97,10 +96,6 @@ struct NumbersTableProvider {

#[async_trait]
impl TableProvider for NumbersTableProvider {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
numbers_schema()
}
Expand Down Expand Up @@ -167,10 +162,6 @@ impl ExecutionPlan for NumbersExec {
"NumbersExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
&self.plan_properties
}
Expand Down Expand Up @@ -270,7 +261,7 @@ impl PhysicalExtensionCodec for NumbersExecCodec {
}

fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> Result<()> {
let Some(exec) = node.as_any().downcast_ref::<NumbersExec>() else {
let Some(exec) = node.downcast_ref::<NumbersExec>() else {
return internal_err!("Expected plan to be NumbersExec, but was {}", node.name());
};

Expand Down Expand Up @@ -315,7 +306,7 @@ impl TaskEstimator for NumbersTaskEstimator {
plan: &Arc<dyn ExecutionPlan>,
cfg: &datafusion::config::ConfigOptions,
) -> Option<TaskEstimation> {
let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
let plan = plan.downcast_ref::<NumbersExec>()?;
let cfg: &NumbersConfig = cfg.extensions.get()?;
let task_count = (plan.ranges_per_task[0].end - plan.ranges_per_task[0].start) as f64
/ cfg.numbers_per_task as f64;
Expand All @@ -329,7 +320,7 @@ impl TaskEstimator for NumbersTaskEstimator {
task_count: usize,
_cfg: &datafusion::config::ConfigOptions,
) -> Option<Arc<dyn ExecutionPlan>> {
let plan = plan.as_any().downcast_ref::<NumbersExec>()?;
let plan = plan.downcast_ref::<NumbersExec>()?;
let range = &plan.ranges_per_task[0];
let chunk_size = ((range.end - range.start) as f64 / task_count as f64).ceil() as i64;

Expand Down
1 change: 1 addition & 0 deletions src/common/task_context_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) fn task_ctx_with_extension<T: Send + Sync + 'static>(
ctx.session_id(),
ctx.session_config().clone().with_extension(Arc::new(ext)),
ctx.scalar_functions().clone(),
ctx.higher_order_functions().clone(),
ctx.aggregate_functions().clone(),
ctx.window_functions().clone(),
ctx.runtime_env(),
Expand Down
5 changes: 0 additions & 5 deletions src/coordinator/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::StreamExt;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::Mutex;
Expand Down Expand Up @@ -116,10 +115,6 @@ impl ExecutionPlan for DistributedExec {
"DistributedExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn properties(&self) -> &Arc<PlanProperties> {
self.plan.properties()
}
Expand Down
4 changes: 2 additions & 2 deletions src/coordinator/task_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
.get_work_unit_feed(plan)
{
wuf
} else if let Some(ciu) = plan.as_any().downcast_ref::<ChildrenIsolatorUnionExec>() {
} else if let Some(ciu) = plan.downcast_ref::<ChildrenIsolatorUnionExec>() {
for (child_i, ctx) in &ciu.task_idx_map[ctx.task_index] {
let child = &ciu.children[*child_i];
// Just recurse to children that will actually get executed by this
Expand Down Expand Up @@ -293,7 +293,7 @@ impl<'a> CoordinatorToWorkerTaskSpawner<'a> {
.get_work_unit_feed(plan)
{
wuf
} else if let Some(ciu) = plan.as_any().downcast_ref::<ChildrenIsolatorUnionExec>() {
} else if let Some(ciu) = plan.downcast_ref::<ChildrenIsolatorUnionExec>() {
for (child_i, dt_ctx) in &ciu.task_idx_map[dt_ctx.task_index] {
// Just recurse to children that will actually get executed by this
// ChildrenIsolatorUnionExec.
Expand Down
2 changes: 1 addition & 1 deletion src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ impl DistributedExt for SessionConfig {
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
{
set_distributed_work_unit_feed(self, move |plan: &Arc<dyn ExecutionPlan>| {
plan.as_any().downcast_ref::<T>().and_then(&getter)
plan.downcast_ref::<T>().and_then(&getter)
})
}

Expand Down
Loading
Loading