From d4cc10f0fba91b3a99da6e40894fae969a9241a4 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Thu, 26 Jun 2025 09:27:47 +0800 Subject: [PATCH 01/23] Upgrade DataFusion 48.0.0 (#61) * Upgrade to DF48 * fix bug * update * update more --- Cargo.toml | 22 +++++++++++----------- src/materialized/dependencies.rs | 2 +- src/rewrite/exploitation.rs | 9 ++++++++- src/rewrite/normal_form.rs | 23 ++++++++++++++--------- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2ee4fd..07c88d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.80" [dependencies] -arrow = "55" -arrow-schema = "55" +arrow = "55.1.0" +arrow-schema = "55.1.0" async-trait = "0.1" dashmap = "6" -datafusion = "47" -datafusion-common = "47" -datafusion-expr = "47" -datafusion-functions = "47" -datafusion-functions-aggregate = "47" -datafusion-optimizer = "47" -datafusion-physical-expr = "47" -datafusion-physical-plan = "47" -datafusion-sql = "47" +datafusion = "48" +datafusion-common = "48" +datafusion-expr = "48" +datafusion-functions = "48" +datafusion-functions-aggregate = "48" +datafusion-optimizer = "48" +datafusion-physical-expr = "48" +datafusion-physical-plan = "48" +datafusion-sql = "48" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 5b42692..de50181 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -231,7 +231,7 @@ impl TableFunctionImpl for StaleFilesUdtf { /// Extract table name from args passed to TableFunctionImpl::call() fn get_table_name(args: &[Expr]) -> Result<&String> { match &args[0] { - Expr::Literal(ScalarValue::Utf8(Some(table_name))) => Ok(table_name), + Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) => Ok(table_name), _ => Err(DataFusionError::Plan( "expected a single string literal argument to mv_dependencies".to_string(), )), diff 
--git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index b6fb761..e8f3003 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -427,7 +427,14 @@ impl ExecutionPlan for OneOfExec { } fn statistics(&self) -> Result { - self.candidates[self.best].statistics() + self.candidates[self.best].partition_statistics(None) + } + + fn partition_statistics( + &self, + partition: Option, + ) -> Result { + self.candidates[self.best].partition_statistics(partition) } } diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 0d6f7d0..8d4bc62 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -455,6 +455,10 @@ impl Predicate { self.eq_classes[idx].columns.insert(c2.clone()); } (Some(&i), Some(&j)) => { + if i == j { + // The two columns are already in the same equivalence class. + return Ok(()); + } // We need to merge two existing column eq classes. // Delete the eq class with a larger index, @@ -593,7 +597,7 @@ impl Predicate { /// Add a binary expression to our collection of filters. 
fn insert_binary_expr(&mut self, left: &Expr, op: Operator, right: &Expr) -> Result<()> { match (left, op, right) { - (Expr::Column(c), op, Expr::Literal(v)) => { + (Expr::Column(c), op, Expr::Literal(v, _)) => { if let Err(e) = self.add_range(c, &op, v) { // Add a range can fail in some cases, so just fallthrough log::debug!("failed to add range filter: {e}"); @@ -601,7 +605,7 @@ impl Predicate { return Ok(()); } } - (Expr::Literal(_), op, Expr::Column(_)) => { + (Expr::Literal(_, _), op, Expr::Column(_)) => { if let Some(swapped) = op.swap() { return self.insert_binary_expr(right, swapped, left); } @@ -714,14 +718,14 @@ impl Predicate { extra_range_filters.push(Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(other_column.clone())), op: Operator::Eq, - right: Box::new(Expr::Literal(range.lower().clone())), + right: Box::new(Expr::Literal(range.lower().clone(), None)), })) } else { if !range.lower().is_null() { extra_range_filters.push(Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(other_column.clone())), op: Operator::GtEq, - right: Box::new(Expr::Literal(range.lower().clone())), + right: Box::new(Expr::Literal(range.lower().clone(), None)), })) } @@ -729,7 +733,7 @@ impl Predicate { extra_range_filters.push(Expr::BinaryExpr(BinaryExpr { left: Box::new(Expr::Column(other_column.clone())), op: Operator::LtEq, - right: Box::new(Expr::Literal(range.upper().clone())), + right: Box::new(Expr::Literal(range.upper().clone(), None)), })) } } @@ -984,7 +988,8 @@ mod test { let ctx = SessionContext::new_with_config( SessionConfig::new() .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.explain.logical_plan_only", true), + .set_bool("datafusion.explain.logical_plan_only", true) + .set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false), ); let t1_path = tempdir()?; @@ -996,11 +1001,11 @@ mod test { ctx.sql(&format!( " CREATE EXTERNAL TABLE t1 ( - column1 VARCHAR, - column2 BIGINT, + column1 VARCHAR, + 
column2 BIGINT, column3 CHAR ) - STORED AS PARQUET + STORED AS PARQUET LOCATION '{}'", t1_path.path().to_string_lossy() )) From 25e5ccc06aca8b03892dc2bc7062517d992fb16f Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 1 Aug 2025 08:51:20 +0800 Subject: [PATCH 02/23] Upgrade to DF49 (#75) * Upgrade to DF49 * fix licenses * use 49 * resolve comments --- Cargo.toml | 22 +++++++++++----------- deny.toml | 4 +++- src/materialized/dependencies.rs | 28 ++++++++++++++-------------- src/materialized/file_metadata.rs | 12 ++++++------ src/materialized/row_metadata.rs | 2 +- src/rewrite/exploitation.rs | 27 ++++++++++++++++----------- src/rewrite/normal_form.rs | 3 +-- tests/materialized_listing_table.rs | 15 ++++++++------- 8 files changed, 60 insertions(+), 53 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 07c88d7..30a06ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.80" [dependencies] -arrow = "55.1.0" -arrow-schema = "55.1.0" +arrow = "55.2.0" +arrow-schema = "55.2.0" async-trait = "0.1" dashmap = "6" -datafusion = "48" -datafusion-common = "48" -datafusion-expr = "48" -datafusion-functions = "48" -datafusion-functions-aggregate = "48" -datafusion-optimizer = "48" -datafusion-physical-expr = "48" -datafusion-physical-plan = "48" -datafusion-sql = "48" +datafusion = "49" +datafusion-common = "49" +datafusion-expr = "49" +datafusion-functions = "49" +datafusion-functions-aggregate = "49" +datafusion-optimizer = "49" +datafusion-physical-expr = "49" +datafusion-physical-plan = "49" +datafusion-sql = "49" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/deny.toml b/deny.toml index a24420a..b516c5d 100644 --- a/deny.toml +++ b/deny.toml @@ -24,6 +24,8 @@ allow = [ "BSD-3-Clause", "CC0-1.0", "Unicode-3.0", - "Zlib" + "Zlib", + "ISC", + "bzip2-1.0.6" ] version = 2 diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index de50181..9150e93 100644 
--- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -1447,7 +1447,7 @@ mod test { .enumerate() .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i)) .collect(); - println!("indices: {:?}", partition_col_indices); + println!("indices: {partition_col_indices:?}"); let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; println!( "inexact projection pushdown:\n{}", @@ -1720,19 +1720,19 @@ mod test { ", projection: &["year"], expected_plan: vec![ - "+--------------+--------------------------------------------------+", - "| plan_type | plan |", - "+--------------+--------------------------------------------------+", - "| logical_plan | Union |", - "| | Projection: coalesce(t1.year, t2.year) AS year |", - "| | Full Join: Using t1.year = t2.year |", - "| | SubqueryAlias: t1 |", - "| | Projection: t1.column1 AS year |", - "| | TableScan: t1 projection=[column1] |", - "| | SubqueryAlias: t2 |", - "| | TableScan: t2 projection=[year] |", - "| | TableScan: t3 projection=[year] |", - "+--------------+--------------------------------------------------+", + "+--------------+--------------------------------------------------------------------+", + "| plan_type | plan |", + "+--------------+--------------------------------------------------------------------+", + "| logical_plan | Union |", + "| | Projection: coalesce(CAST(t1.year AS Utf8View), t2.year) AS year |", + "| | Full Join: Using CAST(t1.year AS Utf8View) = t2.year |", + "| | SubqueryAlias: t1 |", + "| | Projection: t1.column1 AS year |", + "| | TableScan: t1 projection=[column1] |", + "| | SubqueryAlias: t2 |", + "| | TableScan: t2 projection=[year] |", + "| | TableScan: t3 projection=[year] |", + "+--------------+--------------------------------------------------------------------+", ], expected_output: vec![ "+------+", diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index d5a5c8e..2c68405 100644 --- 
a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -226,7 +226,7 @@ impl ExecutionPlan for FileMetadataExec { .map(|record_batch| { record_batch .project(&projection) - .map_err(|e| DataFusionError::ArrowError(e, None)) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) }) .collect::>(); } @@ -858,7 +858,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (1, '2021'), (2, '2022'), (3, '2023'), @@ -882,7 +882,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO private.t1 VALUES + "INSERT INTO private.t1 VALUES (1, '2021', '01'), (2, '2022', '02'), (3, '2023', '03'), @@ -906,7 +906,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO datafusion_mv.public.t3 VALUES + "INSERT INTO datafusion_mv.public.t3 VALUES (1, '2021-01-01'), (2, '2022-02-02'), (3, '2023-03-03'), @@ -929,8 +929,8 @@ mod test { ctx.sql( // Remove timestamps and trim (randomly generated) file names since they're not stable in tests "CREATE VIEW file_metadata_test_view AS SELECT - * EXCLUDE(file_path, last_modified), - regexp_replace(file_path, '/[^/]*$', '/') AS file_path + * EXCLUDE(file_path, last_modified), + regexp_replace(file_path, '/[^/]*$', '/') AS file_path FROM file_metadata", ) .await diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index 6476f39..fa12cdf 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -98,7 +98,7 @@ impl RowMetadataRegistry { .get(&table.to_string()) .map(|o| Arc::clone(o.value())) .or_else(|| self.default_source.clone()) - .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table))) + .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}"))) } } diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index e8f3003..eef0708 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -23,7 +23,7 @@ use datafusion::catalog::TableProvider; use 
datafusion::datasource::provider_as_source; use datafusion::execution::context::SessionState; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr::{LexRequirement, PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use datafusion::physical_expr_common::sort_expr::format_physical_sort_requirement_list; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; @@ -32,6 +32,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tre use datafusion_common::{DataFusionError, Result, TableReference}; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; use datafusion_optimizer::OptimizerRule; +use datafusion_physical_expr::OrderingRequirements; use itertools::Itertools; use ordered_float::OrderedFloat; @@ -316,7 +317,7 @@ pub struct OneOfExec { // Optionally declare a required input ordering // This will inform DataFusion to add sorts to children, // which will improve cost estimation of candidates - required_input_ordering: Option, + required_input_ordering: Option, // Index of the candidate with the best cost best: usize, // Cost function to use in optimization @@ -337,7 +338,7 @@ impl OneOfExec { /// Create a new `OneOfExec` pub fn try_new( candidates: Vec>, - required_input_ordering: Option, + required_input_ordering: Option, cost: CostFn, ) -> Result { if candidates.is_empty() { @@ -366,7 +367,7 @@ impl OneOfExec { /// Modify this plan's required input ordering. 
/// Used for sort pushdown - pub fn with_required_input_ordering(self, requirement: Option) -> Self { + pub fn with_required_input_ordering(self, requirement: Option) -> Self { Self { required_input_ordering: requirement, ..self @@ -387,7 +388,7 @@ impl ExecutionPlan for OneOfExec { self.candidates[self.best].properties() } - fn required_input_ordering(&self) -> Vec> { + fn required_input_ordering(&self) -> Vec> { vec![self.required_input_ordering.clone(); self.children().len()] } @@ -455,12 +456,16 @@ impl DisplayAs for OneOfExec { format_physical_sort_requirement_list( &self .required_input_ordering - .clone() - .unwrap_or_default() - .into_iter() - .map(PhysicalSortExpr::from) - .map(PhysicalSortRequirement::from) - .collect_vec() + .as_ref() + .map(|req| { + req.clone() + .into_single() + .into_iter() + .map(PhysicalSortExpr::from) + .map(PhysicalSortRequirement::from) + .collect_vec() + }) + .unwrap_or_default(), ) ) } diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 8d4bc62..7efd53d 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -988,8 +988,7 @@ mod test { let ctx = SessionContext::new_with_config( SessionConfig::new() .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.explain.logical_plan_only", true) - .set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false), + .set_bool("datafusion.explain.logical_plan_only", true), ); let t1_path = tempdir()?; diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 5ad9d25..7798f9a 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -185,7 +185,7 @@ async fn setup() -> Result { .await?; ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (1, '2023-01-01', 'A'), (2, '2023-01-02', 'B'), (3, '2023-01-03', 'C'), @@ -251,7 +251,7 @@ async fn test_materialized_listing_table_incremental_maintenance() -> Result<()> // Insert another row into the 
source table ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (7, '2024-12-07', 'W')", ) .await? @@ -352,12 +352,13 @@ impl MaterializedListingTable { file_sort_order: opts.file_sort_order, }); + let mut listing_table_config = ListingTableConfig::new(config.table_path); + if let Some(options) = options { + listing_table_config = listing_table_config.with_listing_options(options); + } + listing_table_config = listing_table_config.with_schema(Arc::new(file_schema)); Ok(MaterializedListingTable { - inner: ListingTable::try_new(ListingTableConfig { - table_paths: vec![config.table_path], - file_schema: Some(Arc::new(file_schema)), - options, - })?, + inner: ListingTable::try_new(listing_table_config)?, query: normalized_query, schema: normalized_schema, }) From 3026895c6d8bc76388d628043016f90954aaa635 Mon Sep 17 00:00:00 2001 From: Qi Zhu Date: Wed, 3 Sep 2025 13:55:04 +0800 Subject: [PATCH 03/23] Upgrade DF to 49.0.2 (#86) * Upgrade DF to 49.0.2 * fix clippy and upgrade rust * upgrade version for Cargo Deny --- .github/workflows/ci.yml | 2 +- Cargo.toml | 20 ++++++++++---------- tests/materialized_listing_table.rs | 2 +- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 14bf19f..43ec6b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -132,7 +132,7 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - - uses: EmbarkStudios/cargo-deny-action@v1 + - uses: EmbarkStudios/cargo-deny-action@v2 with: command: check license diff --git a/Cargo.toml b/Cargo.toml index 30a06ee..e84a189 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,22 +25,22 @@ authors = ["Matthew Cramerus "] license = "Apache-2.0" description = "Materialized Views & Query Rewriting in DataFusion" keywords = ["arrow", "arrow-rs", "datafusion"] -rust-version = "1.80" +rust-version = "1.85.1" [dependencies] arrow = "55.2.0" arrow-schema = "55.2.0" async-trait = "0.1" dashmap = "6" -datafusion = 
"49" -datafusion-common = "49" -datafusion-expr = "49" -datafusion-functions = "49" -datafusion-functions-aggregate = "49" -datafusion-optimizer = "49" -datafusion-physical-expr = "49" -datafusion-physical-plan = "49" -datafusion-sql = "49" +datafusion = "49.0.2" +datafusion-common = "49.0.2" +datafusion-expr = "49.0.2" +datafusion-functions = "49.0.2" +datafusion-functions-aggregate = "49.0.2" +datafusion-optimizer = "49.0.2" +datafusion-physical-expr = "49.0.2" +datafusion-physical-plan = "49.0.2" +datafusion-sql = "49.0.2" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 7798f9a..5d91a67 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -504,7 +504,7 @@ impl TableProvider for MaterializedListingTable { self.inner.get_table_definition() } - fn get_logical_plan(&self) -> Option> { + fn get_logical_plan(&self) -> Option> { // We _could_ return the LogicalPlan here, // but it will cause this table to be treated like a regular view // and the materialized results will not be used. From d8364fbf6d9f4831c4bd900ddd17729a876a999f Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sat, 13 Sep 2025 16:45:50 +0800 Subject: [PATCH 04/23] make cost fn accept candidates (#83) --- src/rewrite/exploitation.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index eef0708..11bcc14 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -42,7 +42,9 @@ use super::normal_form::SpjNormalForm; use super::QueryRewriteOptions; /// A cost function. Used to evaluate the best physical plan among multiple equivalent choices. -pub type CostFn = Arc f64 + Send + Sync>; +pub type CostFn = Arc< + dyn for<'a> Fn(Box + 'a>) -> Vec + Send + Sync, +>; /// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes. 
#[derive(Debug)] @@ -346,9 +348,10 @@ impl OneOfExec { "can't create OneOfExec with empty children".to_string(), )); } - let best = candidates + + let best = cost(Box::new(candidates.iter().map(|c| c.as_ref()))) .iter() - .position_min_by_key(|candidate| OrderedFloat(cost(candidate.as_ref()))) + .position_min_by_key(|&cost| OrderedFloat(*cost)) .unwrap(); Ok(Self { @@ -441,11 +444,7 @@ impl ExecutionPlan for OneOfExec { impl DisplayAs for OneOfExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let costs = self - .children() - .iter() - .map(|c| (self.cost)(c.as_ref())) - .collect_vec(); + let costs = (self.cost)(Box::new(self.children().iter().map(|arc| arc.as_ref()))); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( From 540f29ee5534e64f8bdd4ebea1efc4ef9566417e Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 15 Sep 2025 19:16:41 +0800 Subject: [PATCH 05/23] Fix empty unnest columns handling when pushdown_projection_inexact (#88) --- src/materialized/dependencies.rs | 104 ++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 3 deletions(-) diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 9150e93..e586430 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -603,6 +603,21 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> R .map(Expr::Column) .collect_vec(); + // GUARD: if after pushdown the set of relevant unnest columns is empty, + // avoid constructing an Unnest node with zero exec columns (which will + // later error in Unnest::try_new). Instead, simply project the + // desired output columns from the child plan (after pushing down the child projection). + // Related PR: https://github.com/apache/datafusion/pull/16632, after that we must + // also check for empty exec columns here. 
+ if columns_to_unnest.is_empty() { + return LogicalPlanBuilder::from(pushdown_projection_inexact( + Arc::unwrap_or_clone(unnest.input), + &child_indices, + )?) + .project(columns_to_project)? + .build(); + } + LogicalPlanBuilder::from(pushdown_projection_inexact( Arc::unwrap_or_clone(unnest.input), &child_indices, @@ -922,7 +937,7 @@ mod test { use std::{any::Any, collections::HashSet, sync::Arc}; use arrow::util::pretty::pretty_format_batches; - use arrow_schema::SchemaRef; + use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef}; use datafusion::{ assert_batches_eq, assert_batches_sorted_eq, catalog::{Session, TableProvider}, @@ -930,8 +945,9 @@ mod test { execution::session_state::SessionStateBuilder, prelude::{DataFrame, SessionConfig, SessionContext}, }; - use datafusion_common::{Column, Result, ScalarValue}; - use datafusion_expr::{Expr, JoinType, LogicalPlan, TableType}; + use datafusion_common::{Column, DFSchema, Result, ScalarValue}; + use datafusion_expr::builder::unnest; + use datafusion_expr::{EmptyRelation, Expr, JoinType, LogicalPlan, TableType}; use datafusion_physical_plan::ExecutionPlan; use itertools::Itertools; @@ -1837,4 +1853,86 @@ mod test { Ok(()) } + + #[test] + fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> { + // This test simulates a simplified MV scenario: + // + // WITH events_structs AS ( + // SELECT id, date, unnest(events) AS evs + // FROM base_table + // ), + // flattened_events AS ( + // SELECT id, date, evs.event_type, evs.event_time + // FROM events_structs + // ), + // SELECT id, date, MAX(...) ... + // GROUP BY id, date + // + // The partition column is "date". During dependency plan + // building we only request "date" from this subtree, + // so pushdown_projection_inexact receives indices for + // the `date` column only. The guard must kick in: + // unnest(events) becomes unused, and the plan should + // collapse to just projecting `date` from the child. + + // 1. 
Build schema for base table + let id = Field::new("id", DataType::Utf8, true); + let date = Field::new("date", DataType::Utf8, true); + + // events: list> + let event_type = Field::new("event_type", DataType::Utf8, true); + let event_time = Field::new("event_time", DataType::Utf8, true); + let events_struct = Field::new( + "item", + DataType::Struct(Fields::from(vec![event_type, event_time])), + true, + ); + let events = Field::new( + "events", + DataType::List(FieldRef::from(Box::new(events_struct))), + true, + ); + + // Build DFSchema: (id, date, events) + let qualified_fields = vec![ + (None, Arc::new(id.clone())), + (None, Arc::new(date.clone())), + (None, Arc::new(events.clone())), + ]; + let df_schema = + DFSchema::new_with_metadata(qualified_fields, std::collections::HashMap::new())?; + + // 2. Build a dummy child plan (EmptyRelation with the schema) + let empty = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(df_schema), + }); + + // 3. Wrap it with an Unnest node on the "events" column + let events_col = Column::from_name("events"); + let unnest_plan = unnest(empty.clone(), vec![events_col.clone()])?; + + // 4. Partition column is "date". Look up its actual index dynamically. + let date_idx = unnest_plan + .schema() + .index_of_column(&Column::from_name("date"))?; + let mut indices: HashSet = HashSet::new(); + indices.insert(date_idx); + + // 5. Call pushdown_projection_inexact with {date} + let res = pushdown_projection_inexact(unnest_plan, &indices)?; + + // 6. 
Assert the result schema only contains `date` + let cols: Vec = res + .schema() + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect(); + + assert_eq!(cols, vec!["date"]); + + Ok(()) + } } From 169eb66628e943406b145f2c738e8def7e945458 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Tue, 16 Sep 2025 19:07:50 +0800 Subject: [PATCH 06/23] upgrade to DF50 (#87) --- Cargo.toml | 24 ++++++++++++------------ src/materialized/file_metadata.rs | 7 ++++--- src/materialized/hive_partition.rs | 4 ++-- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e84a189..c30b62e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.85.1" [dependencies] -arrow = "55.2.0" -arrow-schema = "55.2.0" -async-trait = "0.1" +arrow = "56.0.0" +arrow-schema = "56.0.0" +async-trait = "0.1.89" dashmap = "6" -datafusion = "49.0.2" -datafusion-common = "49.0.2" -datafusion-expr = "49.0.2" -datafusion-functions = "49.0.2" -datafusion-functions-aggregate = "49.0.2" -datafusion-optimizer = "49.0.2" -datafusion-physical-expr = "49.0.2" -datafusion-physical-plan = "49.0.2" -datafusion-sql = "49.0.2" +datafusion = "50" +datafusion-common = "50" +datafusion-expr = "50" +datafusion-functions = "50" +datafusion-functions-aggregate = "50" +datafusion-optimizer = "50" +datafusion-physical-expr = "50" +datafusion-physical-plan = "50" +datafusion-sql = "50" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 2c68405..85e5838 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -17,8 +17,9 @@ use arrow::array::{StringBuilder, TimestampNanosecondBuilder, UInt64Builder}; use arrow::record_batch::RecordBatch; -use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit}; +use arrow_schema::{DataType, Field, TimeUnit}; use async_trait::async_trait; +use 
datafusion::arrow::datatypes::{Schema, SchemaRef}; use datafusion::catalog::SchemaProvider; use datafusion::catalog::{CatalogProvider, Session}; use datafusion::datasource::listing::ListingTableUrl; @@ -35,7 +36,7 @@ use datafusion::physical_plan::{ use datafusion::{ catalog::CatalogProviderList, execution::TaskContext, physical_plan::SendableRecordBatchStream, }; -use datafusion_common::{DataFusionError, Result, ScalarValue, ToDFSchema}; +use datafusion_common::{DFSchema, DataFusionError, Result, ScalarValue}; use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use futures::stream::{self, BoxStream}; @@ -103,7 +104,7 @@ impl TableProvider for FileMetadata { filters: &[Expr], limit: Option, ) -> Result> { - let dfschema = self.table_schema.clone().to_dfschema()?; + let dfschema = DFSchema::try_from(self.table_schema.as_ref().clone())?; let filters = filters .iter() diff --git a/src/materialized/hive_partition.rs b/src/materialized/hive_partition.rs index 43ebfde..ad381cb 100644 --- a/src/materialized/hive_partition.rs +++ b/src/materialized/hive_partition.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use arrow::array::{Array, StringArray, StringBuilder}; -use arrow_schema::DataType; +use datafusion::arrow::datatypes::DataType; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::{ @@ -79,7 +79,7 @@ pub fn hive_partition_udf() -> ScalarUDF { ScalarUDF::new_from_impl(udf_impl) } -#[derive(Debug)] +#[derive(Debug, Hash, PartialEq, Eq)] struct HivePartitionUdf { pub signature: Signature, } From 5915f4cddbd90383494102751c40319f4a79d49c Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Wed, 24 Sep 2025 23:18:02 -0500 Subject: [PATCH 07/23] Support static partition columns for MV (#89) * Support static partition columns for MV * runtime checks * unit test for dynamic partition columns * lint --- 
src/materialized.rs | 48 ++++++++-- src/materialized/dependencies.rs | 155 +++++++++++++++++++++++++------ src/rewrite/exploitation.rs | 2 +- 3 files changed, 167 insertions(+), 38 deletions(-) diff --git a/src/materialized.rs b/src/materialized.rs index e089591..8e093c0 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -41,6 +41,7 @@ use datafusion::{ catalog::TableProvider, datasource::listing::{ListingTable, ListingTableUrl}, }; +use datafusion_common::DataFusionError; use datafusion_expr::LogicalPlan; use itertools::Itertools; @@ -110,6 +111,14 @@ pub trait Materialized: ListingTableLike { fn config(&self) -> MaterializedConfig { MaterializedConfig::default() } + + /// Which partition columns are 'static'. + /// Static partition columns are those that are used in incremental view maintenance. + /// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`]. + /// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh. + fn static_partition_columns(&self) -> Vec { + ::partition_columns(self) + } } /// Register a [`Materialized`] implementation in this registry. @@ -122,13 +131,38 @@ pub fn register_materialized() { } /// Attempt to cast the given TableProvider into a [`Materialized`]. -/// If the table's type has not been registered using [`register_materialized`], will return `None`. -pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> { - TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| { - TABLE_TYPE_REGISTRY - .cast_to_decorator(table) - .and_then(|decorator| cast_to_materialized(decorator.base())) - }) +/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`. +/// +/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated. 
+/// In particular, checks that the static partition columns are a prefix of all partition columns. +pub fn cast_to_materialized( + table: &dyn TableProvider, +) -> Result, DataFusionError> { + let materialized = match TABLE_TYPE_REGISTRY + .cast_to_materialized(table) + .map(Ok) + .or_else(|| { + TABLE_TYPE_REGISTRY + .cast_to_decorator(table) + .and_then(|decorator| cast_to_materialized(decorator.base()).transpose()) + }) + .transpose()? + { + None => return Ok(None), + Some(m) => m, + }; + + let static_partition_cols = materialized.static_partition_columns(); + let all_partition_cols = materialized.partition_columns(); + + if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..] + { + return Err(DataFusionError::Plan(format!( + "Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})" + ))); + } + + Ok(Some(materialized)) } /// A `TableProvider` that decorates other `TableProvider`s. diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index e586430..9cf8b8a 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -106,7 +106,7 @@ impl TableFunctionImpl for FileDependenciesUdtf { let table = util::get_table(self.catalog_list.as_ref(), &table_ref) .map_err(|e| DataFusionError::Plan(e.to_string()))?; - let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!( + let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!( "mv_dependencies: table '{table_name} is not a materialized view. 
(Materialized TableProviders must be registered using register_materialized"), ))?; @@ -166,6 +166,15 @@ impl TableFunctionImpl for StaleFilesUdtf { &self.mv_dependencies.config_options.catalog.default_schema, ); + let table = util::get_table(self.mv_dependencies.catalog_list.as_ref(), &table_ref) + .map_err(|e| DataFusionError::Plan(e.to_string()))?; + let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!( + "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"), + ))?; + + let url = mv.table_paths()[0].to_string(); + let num_static_partition_cols = mv.static_partition_columns().len(); + let logical_plan = LogicalPlanBuilder::scan_with_filters("dependencies", dependencies, None, vec![])? .aggregate( @@ -187,16 +196,21 @@ impl TableFunctionImpl for StaleFilesUdtf { )? .aggregate( vec![ - // We want to omit the file name along with any "special" partitions - // from the path before comparing it to the target partition. Special - // partitions must be leaf most nodes and are designated by a leading - // underscore. These are useful for adding additional information to a - // filename without affecting partitioning or staleness checks. - regexp_replace( - col("file_path"), - lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"), - lit("/"), - None, + // Omit the file name along with any "special" partitions. + // This can include dynamic partition columns as well as some internal + // metadata columns that are not part of the schema + // + // We implement this by only taking the first N columns, + // where N is the number of static partition columns. 
+ array_element( + regexp_match( + col("file_path"), + lit(format!( + "{url}(?:[^/=]+=[^/]+/){{{num_static_partition_cols}}}" + )), + None, + ), + lit(1), ) .alias("existing_target"), ], @@ -249,16 +263,16 @@ pub fn mv_dependencies_plan( let plan = materialized_view.query().clone(); - let partition_cols = materialized_view.partition_columns(); - let partition_col_indices = plan + let static_partition_cols = materialized_view.static_partition_columns(); + let static_partition_col_indices = plan .schema() .fields() .iter() .enumerate() - .filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i)) + .filter_map(|(i, f)| static_partition_cols.contains(f.name()).then_some(i)) .collect(); - let pruned_plan_with_source_files = if partition_cols.is_empty() { + let pruned_plan_with_source_files = if static_partition_cols.is_empty() { get_source_files_all_partitions( materialized_view, &config_options.catalog, @@ -266,14 +280,14 @@ pub fn mv_dependencies_plan( ) } else { // Prune non-partition columns from all table scans - let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?; + let pruned_plan = pushdown_projection_inexact(plan, &static_partition_col_indices)?; // Now bubble up file metadata to the top of the plan push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry) }?; // We now have data in the following form: - // (partition_col0, partition_col1, ..., __meta) + // (static_partition_col0, static_partition_col1, ..., __meta) // The last column is a list of structs containing the row metadata // We need to unnest it @@ -289,7 +303,7 @@ pub fn mv_dependencies_plan( LogicalPlanBuilder::from(pruned_plan_with_source_files) .unnest_column(files)? 
.project(vec![ - construct_target_path_from_partition_columns(materialized_view).alias("target"), + construct_target_path_from_static_partition_columns(materialized_view).alias("target"), get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), get_field(files_col.clone(), "table_schema").alias("source_table_schema"), get_field(files_col.clone(), "table_name").alias("source_table_name"), @@ -300,14 +314,16 @@ pub fn mv_dependencies_plan( .build() } -fn construct_target_path_from_partition_columns(materialized_view: &dyn Materialized) -> Expr { +fn construct_target_path_from_static_partition_columns( + materialized_view: &dyn Materialized, +) -> Expr { let table_path = lit(materialized_view.table_paths()[0] .as_str() // Trim the / (we'll add it back later if we need it) .trim_end_matches("/")); // Construct the paths for the build targets let mut hive_column_path_elements = materialized_view - .partition_columns() + .static_partition_columns() .iter() .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec())) .collect::>(); @@ -965,6 +981,7 @@ mod test { struct MockMaterializedView { table_path: ListingTableUrl, partition_columns: Vec, + static_partition_columns: Option>, // default = all partition columns query: LogicalPlan, file_ext: &'static str, } @@ -1012,6 +1029,12 @@ mod test { fn query(&self) -> LogicalPlan { self.query.clone() } + + fn static_partition_columns(&self) -> Vec { + self.static_partition_columns + .clone() + .unwrap_or_else(|| self.partition_columns.clone()) + } } #[derive(Debug)] @@ -1181,12 +1204,14 @@ mod test { #[tokio::test] async fn test_deps() { + #[derive(Debug, Default)] struct TestCase { name: &'static str, query_to_analyze: &'static str, table_name: &'static str, - table_path: ListingTableUrl, + table_path: &'static str, partition_cols: Vec<&'static str>, + static_partition_cols: Option>, file_extension: &'static str, expected_output: Vec<&'static str>, file_metadata: &'static str, 
@@ -1198,7 +1223,7 @@ mod test { query_to_analyze: "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", table_name: "m1", - table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + table_path: "s3://m1/", partition_cols: vec!["partition_column"], file_extension: ".parquet", expected_output: vec![ @@ -1225,12 +1250,13 @@ mod test { "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, - TestCase { name: "omit 'special' partition columns", + TestCase { name: "omit internal metadata partition columns", query_to_analyze: "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", table_name: "m1", - table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + table_path: "s3://m1/", partition_cols: vec!["partition_column"], file_extension: ".parquet", expected_output: vec![ @@ -1257,6 +1283,7 @@ mod test { "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "transform year/month/day partition into timestamp partition", @@ -1266,7 +1293,7 @@ mod test { feed FROM t2", table_name: "m2", - table_path: ListingTableUrl::parse("s3://m2/").unwrap(), + table_path: "s3://m2/", partition_cols: vec!["timestamp", "feed"], file_extension: ".parquet", expected_output: vec![ @@ -1301,12 +1328,63 @@ mod test { "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", "+-----------------------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() + }, + TestCase { + name: "omit dynamic partition columns", + query_to_analyze: " + SELECT + year, + month, + day, + column2, + COUNT(*) AS ct + FROM 
t2 + GROUP BY year, month, day, column2 + ", + table_name: "m_dynamic", + table_path: "s3://m_dynamic/", + partition_cols: vec!["year", "month", "day", "column2"], + static_partition_cols: Some(vec!["year", "month", "day"]), + file_extension: ".parquet", + expected_output: vec![ + "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m_dynamic/year=2023/month=01/day=01/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m_dynamic/year=2023/month=01/day=02/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m_dynamic/year=2023/month=01/day=03/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m_dynamic/year=2024/month=12/day=04/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m_dynamic/year=2024/month=12/day=05/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m_dynamic/year=2024/month=12/day=06/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm_dynamic', 
's3://m_dynamic/year=2023/month=01/day=01/column2=1/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=02/column2=2/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=03/column2=3/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=04/column2=4/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=05/column2=5/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=06/column2=6/data.01.parquet', '2023-07-10T16:00:00Z', 0) + ", + expected_stale_files_output: vec![ + "+-------------------------------------------+----------------------+-----------------------+----------+", + "| target | target_last_modified | sources_last_modified | is_stale |", + "+-------------------------------------------+----------------------+-----------------------+----------+", + "| s3://m_dynamic/year=2023/month=01/day=01/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |", + "| s3://m_dynamic/year=2023/month=01/day=02/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:22 | false |", + "| s3://m_dynamic/year=2023/month=01/day=03/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", + "| s3://m_dynamic/year=2024/month=12/day=04/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |", + "| s3://m_dynamic/year=2024/month=12/day=05/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:22 | false |", + "| s3://m_dynamic/year=2024/month=12/day=06/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", + "+-------------------------------------------+----------------------+-----------------------+----------+", + ], }, TestCase { name: "materialized view has no partitions", query_to_analyze: "SELECT column1 AS output FROM t3", table_name: "m3", - table_path: 
ListingTableUrl::parse("s3://m3/").unwrap(), + table_path: "s3://m3/", partition_cols: vec![], file_extension: ".parquet", expected_output: vec![ @@ -1327,12 +1405,13 @@ mod test { "| s3://m3/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+----------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "simple equijoin on year", query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1361,6 +1440,7 @@ mod test { "| s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "triangular join on year", @@ -1373,7 +1453,7 @@ mod test { INNER JOIN t3 ON (t2.year <= t3.year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1403,6 +1483,7 @@ mod test { "| s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "triangular left join, strict <", @@ -1415,7 +1496,7 @@ mod test { LEFT JOIN t3 ON (t2.year < t3.year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1443,6 +1524,7 @@ mod test { "| s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, ]; @@ -1476,12 +1558,16 @@ mod test { // Register table with a decorator to 
exercise this functionality Arc::new(DecoratorTable { inner: Arc::new(MockMaterializedView { - table_path: case.table_path.clone(), + table_path: ListingTableUrl::parse(case.table_path).unwrap(), partition_columns: case .partition_cols .iter() .map(|s| s.to_string()) .collect(), + static_partition_columns: case + .static_partition_cols + .as_ref() + .map(|list| list.iter().map(|s| s.to_string()).collect()), query: plan, file_ext: case.file_extension, }), @@ -1498,6 +1584,15 @@ mod test { .collect() .await?; + context + .sql(&format!( + "SELECT * FROM file_metadata WHERE table_name = '{}'", + case.table_name + )) + .await? + .show() + .await?; + let df = context .sql(&format!( "SELECT * FROM mv_dependencies('{}', 'v2')", diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 11bcc14..06145ae 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -59,7 +59,7 @@ impl ViewMatcher { for (resolved_table_ref, table) in super::util::list_tables(session_state.catalog_list().as_ref()).await? { - let Some(mv) = cast_to_materialized(table.as_ref()) else { + let Some(mv) = cast_to_materialized(table.as_ref())? 
else { continue; }; From 0c408a73ba41bf11bf5ef8af3ee97d1133e24483 Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Thu, 9 Oct 2025 20:41:36 -0500 Subject: [PATCH 08/23] Improved documentation on IVM algorithm (#90) * inline mermaid diagrams * additional comment * mention duplicates, not cardinality * more documentation --- Cargo.toml | 1 + src/lib.rs | 3 + src/materialized.rs | 1 - src/materialized/dependencies.rs | 101 ++++++++++++++++++++++++++++++ src/materialized/file_metadata.rs | 2 +- src/rewrite.rs | 2 - src/rewrite/exploitation.rs | 28 +++++++++ src/rewrite/normal_form.rs | 2 +- 8 files changed, 135 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c30b62e..cec3755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.85.1" [dependencies] +aquamarine = "0.6.0" arrow = "56.0.0" arrow-schema = "56.0.0" async-trait = "0.1.89" diff --git a/src/lib.rs b/src/lib.rs index 8b26d85..238bbd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,9 @@ pub mod materialized; /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views. +/// +/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), +/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*. pub mod rewrite; /// Configuration options for materialized view related features. diff --git a/src/materialized.rs b/src/materialized.rs index 8e093c0..c3cf30e 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-/// Track dependencies of materialized data in object storage pub mod dependencies; /// Pluggable metadata sources for incremental view maintenance diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 9cf8b8a..42ddd35 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -15,6 +15,29 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait. +Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed +into the [`RowMetadataRegistry`]. However, materialized views themselves must implement `ListingTableLike`, as is +implied by the type bound `Materialized: ListingTableLike`. + +The dependency analysis in a nutshell involves analyzing the fragment of the materialized view's logical plan corresponding to +partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions +of the materialized view and its source tables. This gives rise to two natural phases of the algorithm: +1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to only include partition columns (or row metadata columns more generally) of the materialized view and its sources. + This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order, + or even set equality of the original query. + * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. + * This means that in the final output, we may have dependencies that do not exist in the original query.
However, we will never miss any dependencies. +2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources. + After step 1, every table scan only contains row metadata columns, so we replace the table scan with an equivalent scan to a [`RowMetadataSource`](super::row_metadata::RowMetadataSource). + This operation also is not duplicate or order preserving. Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row. + The output rows are then transformed into object storage paths to generate the final graph. + +The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`]. + */ + use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, @@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> { } } +#[cfg_attr(doc, aquamarine::aquamarine)] /// Returns a logical plan that, when executed, lists expected build targets /// for this materialized view, together with the dependencies for each target. +/// +/// See the [module documentation](self) for an overview of the algorithm. +/// +/// # Example +/// +/// We explain in detail how the dependency analysis works in an example.
Consider the following SQL query, which computes daily +/// close prices of a stock from its trades, together with the settlement price from a daily statistics table: +/// +/// ```sql +/// SELECT +/// ticker, +/// LAST_VALUE(trades.price) AS close, +/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, +/// trades.date AS date +/// FROM trades +/// JOIN daily_statistics ON +/// trades.ticker = daily_statistics.ticker AND +/// trades.date = daily_statistics.reference_date AND +/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS +/// GROUP BY ticker, date +/// ``` +/// +/// Assume that both tables are partitioned by `date` only. We desire a materialized view partitioned by `date` and stored at `s3://daily_close/`. +/// This query gives us the following logical plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection:
ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, trades.date AS date"] +/// A --> B["Aggregate:
expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)]
groupby=[ticker, trades.date]"] +/// B --> C["Inner Join:
trades.ticker = daily_statistics.ticker AND
trades.date = daily_statistics.reference_date AND
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades
projection=[ticker, price, date]"] +/// C --> E["TableScan: daily_statistics
projection=[ticker, settlement_price, reference_date, date]"] +/// ``` +/// +/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: trades.date AS date"] +/// A --> B["Projection: trades.date"] +/// B --> C["Inner Join:
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades (projection=[date])"] +/// C --> E["TableScan: daily_statistics (projection=[date])"] +/// ``` +/// +/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that +/// we cannot partition the materialized view on aggregate expressions. +/// +/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan, +/// together with the target path constructed from the (static) partition columns. This gives us the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"] +/// A --> B["Projection: __meta, trades.date AS date"] +/// B --> C["Projection:
concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"] +/// C --> D["Inner Join:
daily_statistics_meta.date BETWEEN trades_meta.date AND trades_meta.date + INTERVAL 2 WEEKS"] +/// D --> E["TableScan: trades_meta (projection=[__meta, date])"] +/// D --> F["TableScan: daily_statistics_meta (projection=[__meta, date])"] +/// ``` +/// +/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column +/// unnested into its components. The final output looks roughly like this: +/// +/// ```text +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// ``` pub fn mv_dependencies_plan( materialized_view: &dyn Materialized, row_metadata_registry: &RowMetadataRegistry, diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 
85e5838..bca69ea 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -722,7 +722,7 @@ impl FileMetadataBuilder { } } -/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider. +/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider. #[async_trait] pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync { /// List all files in the store for the given `url` prefix. diff --git a/src/rewrite.rs b/src/rewrite.rs index 170c88f..da24cc5 100644 --- a/src/rewrite.rs +++ b/src/rewrite.rs @@ -17,8 +17,6 @@ use datafusion::{common::extensions_options, config::ConfigExtension}; -/// Implements a query rewriting optimizer, also known as "view exploitation" -/// in some academic sources. pub mod exploitation; pub mod normal_form; diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 06145ae..7a95113 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -15,6 +15,34 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a query rewriting optimizer, also known as "view exploitation" +in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code, +which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views. + +The query rewriting process spans both the logical and physical planning phases and can be described as follows: + +1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views + and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms. + If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression + and one or more candidate rewrites using materialized views. +2. 
During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`] + physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node. +3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions + to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important as these can affect cost + estimations in the next phase. +4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node. + The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node + with its selected best candidate plan. + +In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention +that the database's builtin cost optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization. +While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner +to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes. + +*/ + use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 7efd53d..e09a54b 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -17,7 +17,7 @@ /*! -This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf), +This module contains code primarily used for view matching. 
We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query. The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: From 3910e12ce9c9d6cb3a6c6585cdc1801a35b8e7b3 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sat, 11 Oct 2025 15:03:11 +0800 Subject: [PATCH 09/23] Support limit pushdown for OneOfExec (#94) --- src/rewrite/exploitation.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 7a95113..b353a84 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -450,6 +450,10 @@ impl ExecutionPlan for OneOfExec { )?)) } + fn supports_limit_pushdown(&self) -> bool { + true + } + fn execute( &self, partition: usize, From f3d5eb1c7e3aaedea15e38d7bf86adda0dc83391 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 24 Oct 2025 17:10:37 +0800 Subject: [PATCH 10/23] Improve the doc (#95) --- src/lib.rs | 32 +++++++++++++++++++++++++++++++- src/materialized/dependencies.rs | 3 ++- src/materialized/util.rs | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 238bbd7..fd6dd8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,37 @@ #![deny(missing_docs)] -//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion. +//! # datafusion-materialized-views +//! +//! `datafusion-materialized-views` provides robust algorithms and core functionality for working with materialized views in [DataFusion](https://arrow.apache.org/datafusion/). +//! +//! ## Key Features +//! +//! 
- **Incremental View Maintenance**: Efficiently tracks dependencies between Hive-partitioned tables and their materialized views, allowing users to determine which partitions need to be refreshed when source data changes. This is achieved via UDTFs such as `mv_dependencies` and `stale_files`. +//! - **Query Rewriting**: Implements a view matching optimizer that rewrites queries to automatically leverage materialized views when beneficial, based on the techniques described in the [paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf). +//! - **Pluggable Metadata Sources**: Supports custom metadata sources for incremental view maintenance, with default support for object store metadata via the `FileMetadata` and `RowMetadataRegistry` components. +//! - **Extensible Table Abstractions**: Defines traits such as `ListingTableLike` and `Materialized` to abstract over Hive-partitioned tables and materialized views, enabling custom implementations and easy registration for use in the maintenance and rewriting logic. +//! +//! ## Typical Workflow +//! +//! 1. **Define and Register Views**: Implement a custom table type that implements the `Materialized` trait, and register it using `register_materialized`. +//! 2. **Metadata Initialization**: Set up `FileMetadata` and `RowMetadataRegistry` to track file-level and row-level metadata. +//! 3. **Dependency Tracking**: Use the `mv_dependencies` UDTF to generate build graphs for materialized views, and `stale_files` to identify partitions that require recomputation. +//! 4. **Query Optimization**: Enable the query rewriting optimizer to transparently rewrite queries to use materialized views where possible. +//! +//! ## Example +//! +//! See the README and integration tests for a full walkthrough of setting up and maintaining a materialized view, including dependency tracking and query rewriting. +//! +//! ## Limitations +//! +//! 
- Currently supports only Hive-partitioned tables in object storage, with the smallest update unit being a file. +//! - Future work may generalize to other storage backends and partitioning schemes. +//! +//! ## References +//! +//! - [Optimizing Queries Using Materialized Views: A Practical, Scalable Solution](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) +//! - [DataFusion documentation](https://datafusion.apache.org/) /// Code for incremental view maintenance against Hive-partitioned tables. /// diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 42ddd35..060e1e0 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -62,7 +62,8 @@ use crate::materialized::META_COLUMN; use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized}; -/// A table function that shows build targets and dependencies for a materialized view: +/// A table function that, for a given materialized view, lists all the output data objects (build targets) +/// generated during its construction or refresh, as well as all the source data objects (dependencies) it relies on. /// /// ```ignore /// fn mv_dependencies(table_ref: Utf8) -> Table diff --git a/src/materialized/util.rs b/src/materialized/util.rs index 7977f8d..cb4afad 100644 --- a/src/materialized/util.rs +++ b/src/materialized/util.rs @@ -21,6 +21,7 @@ use datafusion::catalog::{CatalogProviderList, TableProvider}; use datafusion_common::{DataFusionError, Result}; use datafusion_sql::ResolvedTableReference; +/// Retrieves a table from the catalog list given a resolved table reference. pub fn get_table( catalog_list: &dyn CatalogProviderList, table_ref: &ResolvedTableReference, @@ -35,6 +36,7 @@ pub fn get_table( // NOTE: this is bad, we are calling async code in a sync context. // We should file an issue about async in UDTFs. 
+ // See: https://github.com/apache/datafusion/issues/17663 futures::executor::block_on(schema.table(table_ref.table.as_ref())) .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))? .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.schema))) From 6162aea1d5d4952b9a821d2cfddb37245390dbbd Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 24 Oct 2025 17:24:25 +0800 Subject: [PATCH 11/23] Chore: remove useless lines in changelog (#97) --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4c0114..6d96f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,6 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] - ## [0.1.1](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.0...v0.1.1) - 2025-01-07 ### Added From e6205944dd70d5483f18c338a2d89acf4374c278 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 24 Oct 2025 09:33:23 +0000 Subject: [PATCH 12/23] chore: release v0.2.0 (#96) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- CHANGELOG.md | 42 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 +- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d96f00..ef55a1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,48 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.2.0](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.1...v0.2.0) - 2025-10-24 + +### Added +- `Decorator` trait ([#26](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/26)) (by @suremarc) - #26 + +### Other +- remove useless lines in changelog ([#97](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/97)) (by @xudong963) - #97 +- Improve the doc ([#95](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/95)) (by @xudong963) - #95 +- Support limit pushdown for OneOfExec ([#94](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/94)) (by @xudong963) - #94 +- Improved documentation on IVM algorithm ([#90](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/90)) (by @suremarc) - #90 +- Support static partition columns for MV ([#89](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/89)) (by @suremarc) - #89 +- upgrade to DF50 ([#87](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/87)) (by @xudong963) - #87 +- Fix empty unnest columns handling when pushdown_projection_inexact ([#88](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/88)) (by @zhuqi-lucas) - #88 +- make cost fn accept candidates ([#83](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/83)) (by @xudong963) - #83 +- Upgrade DF to 49.0.2 ([#86](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/86)) (by @zhuqi-lucas) - #86 +- Upgrade to DF49 ([#75](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/75)) (by @xudong963) - #75 +- Upgrade DataFusion 48.0.0 ([#61](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/61)) (by @xudong963) - #61 +- Allow customization of `list_all_files` function. 
([#69](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/69)) (by @jared-m-combs) - #69 +- Allow for 'special' partitions that are omitted in the staleness check. ([#68](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/68)) (by @jared-m-combs) - #68 +- don't panic if eq class is not found ([#60](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/60)) (by @suremarc) - #60 +- Handle table scan filters that reference dropped columns ([#59](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/59)) (by @suremarc) - #59 +- exclude some materialized views from query rewriting ([#57](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/57)) (by @suremarc) - #57 +- Optimize performance bottleneck if projection is large ([#56](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/56)) (by @xudong963) - #56 +- Upgrade df47 ([#55](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/55)) (by @xudong963) - #55 +- Update itertools requirement from 0.13 to 0.14 ([#32](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/32)) (by @dependabot[bot]) - #32 +- Update ordered-float requirement from 4.6.0 to 5.0.0 ([#49](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/49)) (by @dependabot[bot]) - #49 +- Upgrade DF46 ([#48](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/48)) (by @xudong963) - #48 +- Update extension ([#45](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/45)) (by @matthewmturner) - #45 +- make explain output stable ([#44](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/44)) (by @suremarc) - #44 +- Add alternate analysis for MVs with no partition columns ([#39](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/39)) (by @suremarc) - #39 +- upgrade to datafusion 45 
([#38](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/38)) (by @suremarc) - #38 +- use nanosecond timestamps in file metadata ([#28](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/28)) (by @suremarc) - #28 + +### Contributors + +* @xudong963 +* @suremarc +* @zhuqi-lucas +* @jared-m-combs +* @dependabot[bot] +* @matthewmturner + ## [0.1.1](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.0...v0.1.1) - 2025-01-07 ### Added diff --git a/Cargo.toml b/Cargo.toml index cec3755..b0e6fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "datafusion-materialized-views" -version = "0.1.1" +version = "0.2.0" edition = "2021" homepage = "https://github.com/datafusion-contrib/datafusion-materialized-views" repository = "https://github.com/datafusion-contrib/datafusion-materialized-views" From ec7e88ab4a2fdc4c039e4302cb1dc6c682585dcf Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 28 Oct 2025 13:19:42 +0800 Subject: [PATCH 13/23] Add benchmark for heavy operation for datafusion-materialized-views (#101) --- Cargo.toml | 6 + benches/materialized_views_benchmark.rs | 182 ++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 benches/materialized_views_benchmark.rs diff --git a/Cargo.toml b/Cargo.toml index b0e6fd2..c839523 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,13 @@ ordered-float = "5.0.0" [dev-dependencies] anyhow = "1.0.95" +criterion = "0.4" env_logger = "0.11.6" tempfile = "3.14.0" tokio = "1.42.0" url = "2.5.4" + +[[bench]] +name = "materialized_views_benchmark" +harness = false +path = "benches/materialized_views_benchmark.rs" diff --git a/benches/materialized_views_benchmark.rs b/benches/materialized_views_benchmark.rs new file mode 100644 index 0000000..7bc2f52 --- /dev/null +++ b/benches/materialized_views_benchmark.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use std::sync::Arc; +use std::time::Duration; + +use datafusion::datasource::provider_as_source; +use datafusion::datasource::TableProvider; +use datafusion::prelude::SessionContext; +use datafusion_common::Result as DfResult; +use datafusion_expr::LogicalPlan; +use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm; +use datafusion_sql::TableReference; +use tokio::runtime::Builder; + +// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1} +fn make_create_table_sql(table_name: &str, ncols: usize) -> String { + let cols = (0..ncols) + .map(|i| format!("c{} INT", i)) + .collect::>() + .join(", "); + format!( + "CREATE TABLE {table} ({cols})", + table = table_name, + cols = cols + ) +} + +// Utility: generate a base SELECT that projects all columns and has a couple filters +fn make_base_sql(table_name: &str, ncols: usize) -> String { + let cols = (0..ncols) + .map(|i| format!("c{}", i)) + .collect::>() + .join(", "); + let mut where_clauses = vec![]; + if ncols > 0 { + where_clauses.push("c0 >= 0".to_string()); + } + if ncols > 1 { + where_clauses.push("c0 + c1 >= 0".to_string()); + } + let where_part = if 
where_clauses.is_empty() { + "".to_string() + } else { + format!(" WHERE {}", where_clauses.join(" AND ")) + }; + format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part) +} + +// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance) +fn make_query_sql(table_name: &str, ncols: usize) -> String { + let take = std::cmp::max(1, ncols / 2); + let cols = (0..take) + .map(|i| format!("c{}", i)) + .collect::>() + .join(", "); + + let mut where_clauses = vec![]; + if ncols > 0 { + where_clauses.push("c0 >= 10".to_string()); + } + if ncols > 1 { + where_clauses.push("c0 * c1 > 100".to_string()); + } + if ncols > 10 { + where_clauses.push(format!("c{} >= 0", ncols - 1)); + } + + let where_part = if where_clauses.is_empty() { + "".to_string() + } else { + format!(" WHERE {}", where_clauses.join(" AND ")) + }; + + format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part) +} + +// Build fixture: create SessionContext, the table, then return LogicalPlans for base & query and table provider +fn build_fixture_for_cols( + rt: &tokio::runtime::Runtime, + ncols: usize, +) -> DfResult<(LogicalPlan, LogicalPlan, Arc)> { + rt.block_on(async move { + let ctx = SessionContext::new(); + + // create table + let table_name = "t"; + let create_sql = make_create_table_sql(table_name, ncols); + ctx.sql(&create_sql).await?.collect().await?; // create table in catalog + + // base and query plans (optimize to normalize) + let base_sql = make_base_sql(table_name, ncols); + let query_sql = make_query_sql(table_name, ncols); + + let base_df = ctx.sql(&base_sql).await?; + let base_plan = base_df.into_optimized_plan()?; + + let query_df = ctx.sql(&query_sql).await?; + let query_plan = query_df.into_optimized_plan()?; + + // get table provider (Arc) + let table_ref = TableReference::bare(table_name); + let provider: Arc = ctx.table_provider(table_ref.clone()).await?; + + Ok((base_plan, 
query_plan, provider)) + }) +} + +// Criterion benchmark +fn criterion_benchmark(c: &mut Criterion) { + // columns to test + let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320]; + + // build a tokio runtime that's broadly compatible + let rt = Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + let mut group = c.benchmark_group("view_matcher_spj"); + group.warm_up_time(Duration::from_secs(1)); + group.measurement_time(Duration::from_secs(5)); + group.sample_size(30); + + for &ncols in &col_cases { + // Build fixture + let (base_plan, query_plan, provider) = + build_fixture_for_cols(&rt, ncols).expect("fixture"); + + // Measure SpjNormalForm::new for base_plan and query_plan separately + let id_base = BenchmarkId::new("spj_normal_form_new", format!("cols={}", ncols)); + group.throughput(Throughput::Elements(1)); + group.bench_with_input(id_base, &base_plan, |b, plan| { + b.iter(|| { + let _nf = SpjNormalForm::new(plan).unwrap(); + }); + }); + + let id_query_nf = BenchmarkId::new("spj_normal_form_new_query", format!("cols={}", ncols)); + group.bench_with_input(id_query_nf, &query_plan, |b, plan| { + b.iter(|| { + let _nf = SpjNormalForm::new(plan).unwrap(); + }); + }); + + // Precompute normal forms once (to measure rewrite_from cost only) + let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf"); + let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf"); + + // qualifier for rewrite_from and a source created from the provider + let qualifier = TableReference::bare("mv"); + let source = provider_as_source(Arc::clone(&provider)); + + // Benchmark rewrite_from (this is the heavy check) + let id_rewrite = BenchmarkId::new("rewrite_from", format!("cols={}", ncols)); + group.bench_with_input(id_rewrite, &ncols, |b, &_n| { + b.iter(|| { + let _res = query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source)); + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); 
+criterion_main!(benches); From f1f7ad8e72d818e65652218aecd0ea72ed731db8 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 19 Nov 2025 17:57:11 +0800 Subject: [PATCH 14/23] Upgrade DF51.0.0 (#104) * Upgrade DF51.0.0 * update --- Cargo.toml | 22 +++++++++++----------- src/rewrite/normal_form.rs | 4 ++-- tests/materialized_listing_table.rs | 8 +++++--- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c839523..b6fc338 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,19 +29,19 @@ rust-version = "1.85.1" [dependencies] aquamarine = "0.6.0" -arrow = "56.0.0" -arrow-schema = "56.0.0" +arrow = "57.0.0" +arrow-schema = "57.0.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "50" -datafusion-common = "50" -datafusion-expr = "50" -datafusion-functions = "50" -datafusion-functions-aggregate = "50" -datafusion-optimizer = "50" -datafusion-physical-expr = "50" -datafusion-physical-plan = "50" -datafusion-sql = "50" +datafusion = "51" +datafusion-common = "51" +datafusion-expr = "51" +datafusion-functions = "51" +datafusion-functions-aggregate = "51" +datafusion-optimizer = "51" +datafusion-physical-expr = "51" +datafusion-physical-plan = "51" +datafusion-sql = "51" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index e09a54b..c09f18c 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -1099,7 +1099,7 @@ mod test { assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref()); let expected = concat_batches( - &query_plan.schema().as_ref().clone().into(), + &query_plan.schema().inner().clone(), &context .execute_logical_plan(query_plan) .await? @@ -1108,7 +1108,7 @@ mod test { )?; let result = concat_batches( - &rewritten.schema().as_ref().clone().into(), + &rewritten.schema().inner().clone(), &context .execute_logical_plan(rewritten) .await?
diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 5d91a67..7bba358 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -32,7 +32,9 @@ use datafusion::{ }, prelude::{SessionConfig, SessionContext}, }; -use datafusion_common::{Constraints, DataFusionError, ParamValues, ScalarValue, Statistics}; +use datafusion_common::{ + metadata::ScalarAndMetadata, Constraints, DataFusionError, ParamValues, ScalarValue, Statistics, +}; use datafusion_expr::{ col, dml::InsertOp, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, SortExpr, TableProviderFilterPushDown, TableType, @@ -550,7 +552,7 @@ impl TableProvider for MaterializedListingTable { fn parse_partition_values( path: &ObjectPath, partition_columns: &[(String, DataType)], -) -> Result, DataFusionError> { +) -> Result, DataFusionError> { let parts = path.parts().map(|part| part.to_owned()).collect::>(); let pairs = parts @@ -562,7 +564,7 @@ fn parse_partition_values( .iter() .map(|(column, datatype)| { let value = pairs.get(column.as_str()).copied().map(String::from); - ScalarValue::Utf8(value).cast_to(datatype) + ScalarAndMetadata::from(ScalarValue::Utf8(value)).cast_storage_to(datatype) }) .collect::, _>>()?; From 9c915808e6607d3165f1bb24d2a032e7e5d71e94 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 12 Dec 2025 20:27:46 +0800 Subject: [PATCH 15/23] Fix mv dependencies involving unrelated files (#107) --- src/materialized/dependencies.rs | 84 ++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 060e1e0..50f806c 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -639,6 +639,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> R _ => unreachable!(), }; + if new_projection.is_empty() { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: 
true, + schema: Arc::new(DFSchema::empty()), + })); + } + TableScan::try_new( scan.table_name, scan.source, @@ -1798,11 +1805,11 @@ mod test { query_to_analyze: "SELECT column1 AS output FROM t3", projection: &[], expected_plan: vec![ - "+--------------+-----------------------------+", - "| plan_type | plan |", - "+--------------+-----------------------------+", - "| logical_plan | TableScan: t3 projection=[] |", - "+--------------+-----------------------------+", + "+--------------+-----------------------+", + "| plan_type | plan |", + "+--------------+-----------------------+", + "| logical_plan | EmptyRelation: rows=1 |", + "+--------------+-----------------------+", ], expected_output: vec![ "++", @@ -2051,6 +2058,73 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_cross_join_unrelated_files() -> Result<()> { + let context = setup().await?; + + // Test case: Cross join where only columns from left table (t1) are selected + // The cross join with t3 affects cardinality but we don't select any t3 columns + // Expected: Only files from t1 should be in dependencies, not from t3 + // BUG: Currently t3 files are incorrectly included in dependencies + let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3"; + + let plan = context.sql(query).await?.into_optimized_plan()?; + + println!("Original plan:\n{}", plan.display_indent()); + + // We're partitioning on column1 which only comes from t1 + let partition_col_indices: HashSet = [0].into_iter().collect(); // column1 is at index 0 + + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!("After pushdown:\n{}", analyzed.display_indent()); + + // Register materialized view + context.register_table( + "mv_cross_join", + Arc::new(MockMaterializedView { + table_path: ListingTableUrl::parse("s3://mv_cross_join/").unwrap(), + partition_columns: vec!["column1".to_string()], + static_partition_columns: None, + query: plan, + file_ext: ".parquet", + }), + )?; + + // Add 
file metadata for the MV + context.sql( + "INSERT INTO file_metadata VALUES + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)" + ).await?.collect().await?; + + // Get dependencies + let df = context + .sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')") + .await?; + let batches = df.collect().await?; + + // Print the actual dependencies for debugging + println!("Actual dependencies:"); + println!("{}", pretty_format_batches(&batches)?); + + // Expected: Only t1 files should be in dependencies, NOT t3 files + // This test currently FAILS because t3 files are incorrectly included + let expected = [ + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://mv_cross_join/column1=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://mv_cross_join/column1=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://mv_cross_join/column1=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) 
+ } + #[test] fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> { // This test simulates a simplified MV scenario: From 4539acf8fa56c27570f56f559cd969434e50b009 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 15 Dec 2025 11:02:51 +0800 Subject: [PATCH 16/23] prevent rewriting strict inequality to closed interval for non-discrete types (#21) (#108) Co-authored-by: Matt Friede <7852262+Friede80@users.noreply.github.com> --- src/rewrite/normal_form.rs | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index c09f18c..7203cca 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -527,8 +527,8 @@ impl Predicate { // so handling of open intervals is done by adding/subtracting the smallest increment. // However, there is not really a public API to do this, // other than the satisfy_greater method. - Operator::Lt => Ok( - match satisfy_greater( + Operator::Lt => { + let range_val = match satisfy_greater( &Interval::try_new(value.clone(), value.clone())?, &Interval::make_unbounded(&value.data_type())?, true, @@ -538,11 +538,19 @@ impl Predicate { *range = None; return Ok(()); } - }, - ), - // Same thing as above. - Operator::Gt => Ok( - match satisfy_greater( + }; + // If the type is not discrete (e.g. Utf8), satisfy_greater may return an unchanged value. 
+ // This means the interval could not be tightened and it is unsafe to produce a closed interval + if range_val.upper() == &value { + Err(DataFusionError::Plan( + "cannot represent strict inequality as closed interval for non-discrete types".to_string(), + )) + } else { + Ok(range_val) + } + } + Operator::Gt => { + let range_val = match satisfy_greater( &Interval::make_unbounded(&value.data_type())?, &Interval::try_new(value.clone(), value.clone())?, true, @@ -552,8 +560,15 @@ impl Predicate { *range = None; return Ok(()); } - }, - ), + }; + if range_val.lower() == &value { + Err(DataFusionError::Plan( + "cannot represent strict inequality as closed interval for non-discrete types".to_string(), + )) + } else { + Ok(range_val) + } + } _ => Err(DataFusionError::Plan( "unsupported binary expression".to_string(), )), @@ -1154,6 +1169,11 @@ mod test { // we are allowed to substitute column1 for column3 and vice versa. "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", }, + TestCase { + name: "range filter with inequality on non-discrete type", + base: "SELECT * FROM t1", + query: "SELECT column1 FROM t1 WHERE column1 < '2022'", + }, TestCase { name: "duplicate expressions (X-209)", base: "SELECT * FROM t1", From 0d1aefae252ab808088a3ca378694e7d5a9f09da Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 19 Dec 2025 22:22:46 +0800 Subject: [PATCH 17/23] Expose mv_plans for ViewMatcher (#22) (#109) --- src/rewrite/exploitation.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index b353a84..9a05638 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -114,6 +114,11 @@ impl ViewMatcher { Ok(ViewMatcher { mv_plans }) } + + /// Returns the materialized views and their corresponding normal forms. 
+ pub fn mv_plans(&self) -> &HashMap, SpjNormalForm)> { + &self.mv_plans + } } impl OptimizerRule for ViewMatcher { From 547aa4d70371226503cb08a58b2db29f716e339f Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 12 Jan 2026 23:31:11 +0800 Subject: [PATCH 18/23] Upgrade DF52 (#111) * Upgrade DF52 * update test * use 52 --- Cargo.toml | 24 ++++++++++++------------ src/materialized/dependencies.rs | 29 +++++++++++++++-------------- src/rewrite/normal_form.rs | 4 ++-- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b6fc338..041add6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,23 +29,23 @@ rust-version = "1.85.1" [dependencies] aquamarine = "0.6.0" -arrow = "57.0.0" -arrow-schema = "57.0.0" +arrow = "57.1.0" +arrow-schema = "57.1.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "51" -datafusion-common = "51" -datafusion-expr = "51" -datafusion-functions = "51" -datafusion-functions-aggregate = "51" -datafusion-optimizer = "51" -datafusion-physical-expr = "51" -datafusion-physical-plan = "51" -datafusion-sql = "51" +datafusion = "52" +datafusion-common = "52" +datafusion-expr = "52" +datafusion-functions = "52" +datafusion-functions-aggregate = "52" +datafusion-optimizer = "52" +datafusion-physical-expr = "52" +datafusion-physical-plan = "52" +datafusion-sql = "52" futures = "0.3" itertools = "0.14" log = "0.4" -object_store = "0.12" +object_store = "0.12.4" ordered-float = "5.0.0" [dev-dependencies] diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 50f806c..7295b36 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -860,7 +860,7 @@ fn project_dfschema(schema: &DFSchema, indices: &HashSet) -> Result range, }; - if other_range.contains(range)? != Interval::CERTAINLY_TRUE { + if other_range.contains(range)? != Interval::TRUE { return Ok(None); } - if range.contains(other_range)? != Interval::CERTAINLY_TRUE { + if range.contains(other_range)? 
!= Interval::TRUE { if !(range.lower().is_null() || range.upper().is_null()) && (range.lower().eq(range.upper())) { From 4071577345a1ba932017609ca2cecaa58adcfc34 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 12 Jan 2026 23:48:08 +0800 Subject: [PATCH 19/23] Expose a `get_mv_candidates_for_table` API for ViewMatcher (#112) * Expose a get_mv_candidates_for_table API for ViewMatcher * refine comments --- src/rewrite/exploitation.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 9a05638..997d28a 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -119,6 +119,43 @@ impl ViewMatcher { pub fn mv_plans(&self) -> &HashMap, SpjNormalForm)> { &self.mv_plans } + + /// Returns materialized views that potentially reference the given table. + /// + /// This is a preliminary filter - it only checks if the MV references the table + /// but does NOT guarantee that the MV can actually be used to rewrite queries + /// involving that table. 
+ /// + /// # Arguments + /// + /// * `table_reference` - The table reference to find candidates for + /// + /// # Returns + /// + /// A vector of tuples containing: + /// - The materialized view's table reference + /// - The materialized view's table provider + /// - The materialized view's SPJ normal form + pub fn get_potential_mv_candidates_for_table( + &self, + table_reference: &TableReference, + ) -> Vec<(TableReference, Arc, &SpjNormalForm)> { + self.mv_plans + .iter() + .filter_map(|(mv_table_ref, (mv_provider, mv_normal_form))| { + // Check if this MV references the target table + if mv_normal_form.referenced_tables().contains(table_reference) { + Some(( + mv_table_ref.clone(), + Arc::clone(mv_provider), + mv_normal_form, + )) + } else { + None + } + }) + .collect() + } } impl OptimizerRule for ViewMatcher { From 02bad143cfc7a9867c1db2b317e91826c2780e22 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 20 Jan 2026 13:45:13 +0800 Subject: [PATCH 20/23] perf: single-pass plan traversal in Predicate::new (#113) * perf: single-pass plan traversal in Predicate::new * address comments * add join error test --- src/rewrite/normal_form.rs | 235 +++++++++++++++++++++++++------------ 1 file changed, 163 insertions(+), 72 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 5590862..195f8da 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -233,22 +233,11 @@ impl SpjNormalForm { .map(|expr| predicate.normalize_expr(expr)) .collect(); - let mut referenced_tables = vec![]; - original_plan - .apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - referenced_tables.push(scan.table_name.clone()); - } - - Ok(TreeNodeRecursion::Continue) - }) - // No chance of error since we never return Err -- this unwrap is safe - .unwrap(); - Ok(Self { output_schema: Arc::clone(original_plan.schema()), output_exprs, - referenced_tables, + // Reuse referenced_tables collected during Predicate::new to 
avoid extra traversal + referenced_tables: predicate.referenced_tables.clone(), predicate, }) } @@ -344,84 +333,95 @@ struct Predicate { ranges_by_equivalence_class: Vec>, /// Filter expressions that aren't column equality predicates or range filters. residuals: HashSet, + /// Tables referenced in this plan (collected during single-pass traversal) + referenced_tables: Vec, } impl Predicate { + /// Create a new Predicate by analyzing the given logical plan. + /// Uses single-pass traversal to collect schema, columns, filters, and referenced tables. fn new(plan: &LogicalPlan) -> Result { let mut schema = DFSchema::empty(); - plan.apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - let new_schema = DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - scan.source.schema().as_ref(), - )?; - schema = if schema.fields().is_empty() { - new_schema - } else { - schema.join(&new_schema)? - } - } + let mut columns_info: Vec<(Column, arrow::datatypes::DataType)> = Vec::new(); + let mut filters: Vec = Vec::new(); + let mut referenced_tables: Vec = Vec::new(); + + // Single traversal to collect everything + plan.apply(|node| { + match node { + LogicalPlan::TableScan(scan) => { + // Collect referenced table + referenced_tables.push(scan.table_name.clone()); - Ok(TreeNodeRecursion::Continue) - })?; + // Build schema + let new_schema = DFSchema::try_from_qualified_schema( + scan.table_name.clone(), + scan.source.schema().as_ref(), + )?; + + // Collect columns with their data types + for (table_ref, field) in new_schema.iter() { + columns_info.push(( + Column::new(table_ref.cloned(), field.name()), + field.data_type().clone(), + )); + } - let mut new = Self { - schema, - eq_classes: vec![], - eq_class_idx_by_column: HashMap::default(), - ranges_by_equivalence_class: vec![], - residuals: HashSet::new(), - }; + // Merge schema + schema = if schema.fields().is_empty() { + new_schema + } else { + schema.join(&new_schema)? 
+ }; - // Collect all referenced columns - plan.apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - for (i, (table_ref, field)) in DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - scan.source.schema().as_ref(), - )? - .iter() - .enumerate() - { - let column = Column::new(table_ref.cloned(), field.name()); - let data_type = field.data_type(); - new.eq_classes - .push(ColumnEquivalenceClass::new_singleton(column.clone())); - new.eq_class_idx_by_column.insert(column, i); - new.ranges_by_equivalence_class - .push(Some(Interval::make_unbounded(data_type)?)); + // Collect filters from TableScan + filters.extend(scan.filters.iter().cloned()); + } + LogicalPlan::Filter(filter) => { + filters.push(filter.predicate.clone()); } - } - - Ok(TreeNodeRecursion::Continue) - })?; - - // Collect any filters - plan.apply(|plan| { - let filters = match plan { - LogicalPlan::TableScan(scan) => scan.filters.as_slice(), - LogicalPlan::Filter(filter) => core::slice::from_ref(&filter.predicate), LogicalPlan::Join(_join) => { return Err(DataFusionError::Internal( "joins are not supported yet".to_string(), - )) + )); } - LogicalPlan::Projection(_) => &[], + LogicalPlan::Projection(_) => {} _ => { return Err(DataFusionError::Plan(format!( "unsupported logical plan: {}", - plan.display() - ))) + node.display() + ))); } - }; - - for expr in filters.iter().flat_map(split_conjunction) { - new.insert_conjuct(expr)?; } - Ok(TreeNodeRecursion::Continue) })?; + // Initialize data structures with known capacity + let num_columns = columns_info.len(); + let mut eq_classes = Vec::with_capacity(num_columns); + let mut eq_class_idx_by_column = HashMap::with_capacity(num_columns); + let mut ranges_by_equivalence_class = Vec::with_capacity(num_columns); + + for (i, (column, data_type)) in columns_info.into_iter().enumerate() { + eq_classes.push(ColumnEquivalenceClass::new_singleton(column.clone())); + eq_class_idx_by_column.insert(column, i); + 
ranges_by_equivalence_class.push(Some(Interval::make_unbounded(&data_type)?)); + } + + let mut new = Self { + schema, + eq_classes, + eq_class_idx_by_column, + ranges_by_equivalence_class, + residuals: HashSet::new(), + referenced_tables, + }; + + // Process all collected filters + for expr in filters.iter().flat_map(split_conjunction) { + new.insert_conjuct(expr)?; + } + Ok(new) } @@ -1163,11 +1163,11 @@ mod test { TestCase { name: "range filter + equality predicate", base: - "SELECT column1, column2 FROM t1 WHERE column1 = column3 AND column1 >= '2022'", + "SELECT column1, column2 FROM t1 WHERE column1 = column3 AND column1 >= '2022'", query: // Since column1 = column3 in the original view, // we are allowed to substitute column1 for column3 and vice versa. - "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", + "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", }, TestCase { name: "range filter with inequality on non-discrete type", @@ -1229,4 +1229,95 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_predicate_new_collects_expected_data() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a table with known schema + ctx.sql( + "CREATE TABLE test_table ( + col1 INT, + col2 VARCHAR, + col3 DOUBLE + )", + ) + .await? + .collect() + .await?; + + // Create a plan with filters + let plan = ctx + .sql("SELECT col1, col2 FROM test_table WHERE col1 >= 10 AND col2 = col3") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify referenced_tables is collected + assert_eq!(normal_form.referenced_tables().len(), 1); + assert_eq!(normal_form.referenced_tables()[0].to_string(), "test_table"); + + // Verify output_exprs matches the projection (2 columns) + assert_eq!(normal_form.output_exprs().len(), 2); + + // Verify schema is preserved + assert_eq!(normal_form.output_schema().fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_predicate_new_with_join_returns_error() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE t1 (a INT, b INT)") + .await? + .collect() + .await?; + ctx.sql("CREATE TABLE t2 (c INT, d INT)") + .await? + .collect() + .await?; + + // Test that join returns an error as it's not supported yet + let plan = ctx + .sql("SELECT t1.a, t2.d FROM t1 JOIN t2 ON t1.b = t2.c WHERE t1.a >= 0 AND t2.d <= 100") + .await? + .into_optimized_plan()?; + + let result = SpjNormalForm::new(&plan); + + // Verify that join returns an error + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("joins are not supported yet")); + + Ok(()) + } + + #[tokio::test] + async fn test_predicate_new_with_range_filters() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE range_test (x INT, y INT, z VARCHAR)") + .await? + .collect() + .await?; + + let plan = ctx + .sql("SELECT * FROM range_test WHERE x >= 10 AND x <= 100 AND y = 50") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify all columns are in output + assert_eq!(normal_form.output_exprs().len(), 3); + assert_eq!(normal_form.referenced_tables().len(), 1); + + Ok(()) + } } From 429e52ac740ea7d0c9f43580084e7a6960562f27 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 3 Feb 2026 14:29:56 +0800 Subject: [PATCH 21/23] Optimize rewrite performance (#115) * Optimize rewrite performanc * Add test * fmt * Address comments to add details --- src/rewrite/normal_form.rs | 161 ++++++++++++++++++++++++++++++++----- 1 file changed, 142 insertions(+), 19 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 195f8da..70fd129 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -247,32 +247,50 @@ impl SpjNormalForm { /// This is useful for rewriting queries to use materialized views. pub fn rewrite_from( &self, - mut other: &Self, + other: &Self, qualifier: TableReference, source: Arc, ) -> Result> { log::trace!("rewriting from {qualifier}"); + + // Cache columns() result to avoid repeated Vec allocation in the loop. + // DFSchema::columns() creates a new Vec on each call. + let output_columns = self.output_schema.columns(); + let mut new_output_exprs = Vec::with_capacity(self.output_exprs.len()); // check that our output exprs are sub-expressions of the other one's output exprs for (i, output_expr) in self.output_exprs.iter().enumerate() { - let new_output_expr = other - .predicate - .normalize_expr(output_expr.clone()) - .rewrite(&mut other)? - .data; - - // Check that all references to the original tables have been replaced. 
- // All remaining column expressions should be unqualified, which indicates - // that they refer to the output of the sub-plan (in this case the view) - if new_output_expr - .column_refs() - .iter() - .any(|c| c.relation.is_some()) - { - return Ok(None); - } + // Fast path for simple Column expressions (most common case). + // This avoids the expensive normalize_expr transform for columns. + let new_output_expr = if let Expr::Column(col) = output_expr { + let normalized_col = other.predicate.normalize_column(col); + match other.find_output_column(&normalized_col) { + Some(rewritten) => rewritten, + None => return Ok(None), // Column not found, can't rewrite + } + } else { + // Slow path: complex expressions need full transform + let new_output_expr = other + .predicate + .normalize_expr(output_expr.clone()) + .rewrite(&mut &*other)? + .data; + + // Check that all references to the original tables have been replaced. + // All remaining column expressions should be unqualified, which indicates + // that they refer to the output of the sub-plan (in this case the view) + if new_output_expr + .column_refs() + .iter() + .any(|c| c.relation.is_some()) + { + return Ok(None); + } + new_output_expr + }; - let column = &self.output_schema.columns()[i]; + // Use cached columns instead of calling .columns() on each iteration + let column = &output_columns[i]; new_output_exprs.push( new_output_expr.alias_qualified(column.relation.clone(), column.name.clone()), ); @@ -299,7 +317,7 @@ impl SpjNormalForm { .into_iter() .chain(range_filters) .chain(residual_filters) - .map(|expr| expr.rewrite(&mut other).unwrap().data) + .map(|expr| expr.rewrite(&mut &*other).unwrap().data) .reduce(|a, b| a.and(b)); if all_filters @@ -318,6 +336,20 @@ impl SpjNormalForm { builder.project(new_output_exprs)?.build().map(Some) } + + /// Fast path: find a column in output_exprs and return rewritten expression. + /// This avoids full tree traversal for simple column lookups. 
+ #[inline] + fn find_output_column(&self, col: &Column) -> Option { + self.output_exprs + .iter() + .position(|e| matches!(e, Expr::Column(c) if c == col)) + .map(|idx| { + Expr::Column(Column::new_unqualified( + self.output_schema.field(idx).name().clone(), + )) + }) + } } /// Stores information on filters from a Select-Project-Join plan. @@ -431,6 +463,17 @@ impl Predicate { .and_then(|&idx| self.eq_classes.get(idx)) } + /// Fast path: normalize a single Column without full tree traversal. + /// This is O(1) lookup instead of O(n) transform. + #[inline] + fn normalize_column(&self, col: &Column) -> Column { + if let Some(eq_class) = self.class_for_column(col) { + eq_class.columns.first().unwrap().clone() + } else { + col.clone() + } + } + /// Add a new column equivalence fn add_equivalence(&mut self, c1: &Column, c2: &Column) -> Result<()> { match ( @@ -792,6 +835,15 @@ impl Predicate { /// Rewrite all expressions in terms of their normal representatives /// with respect to this predicate's equivalence classes. fn normalize_expr(&self, e: Expr) -> Expr { + // Fast path: if it's a simple Column, avoid full transform traversal. + // Even though transform() handles Column efficiently, the machinery setup + // (closures, iterators, Transformed wrappers) has overhead that adds up + // when called thousands of times (e.g., 41 columns × 5-7 MVs × every query). + // Direct HashMap lookup + clone is significantly faster. + if let Expr::Column(ref c) = e { + return Expr::Column(self.normalize_column(c)); + } + e.transform(&|e| { let c = match e { Expr::Column(c) => c, @@ -1320,4 +1372,75 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_normalize_column_fast_path() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE t (a INT, b INT, c INT)") + .await? + .collect() + .await?; + + // Query with column equivalence: a = b + let plan = ctx + .sql("SELECT a, b, c FROM t WHERE a = b") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify that columns are normalized correctly + // a and b should be in the same equivalence class + assert_eq!(normal_form.output_exprs().len(), 3); + + Ok(()) + } + + #[tokio::test] + async fn test_rewrite_from_with_many_columns() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a wide table to test the columns() caching optimization + ctx.sql( + "CREATE TABLE wide_table ( + c0 INT, c1 INT, c2 INT, c3 INT, c4 INT, + c5 INT, c6 INT, c7 INT, c8 INT, c9 INT + )", + ) + .await? + .collect() + .await?; + + let base_plan = ctx + .sql("SELECT * FROM wide_table WHERE c0 >= 0") + .await? + .into_optimized_plan()?; + + let query_plan = ctx + .sql("SELECT c0, c1, c2 FROM wide_table WHERE c0 >= 10") + .await? + .into_optimized_plan()?; + + let base_nf = SpjNormalForm::new(&base_plan)?; + let query_nf = SpjNormalForm::new(&query_plan)?; + + // Create MV table + ctx.sql("CREATE TABLE mv AS SELECT * FROM wide_table WHERE c0 >= 0") + .await? 
+ .collect() + .await?; + + let table_ref = TableReference::bare("mv"); + let provider = ctx.table_provider(table_ref.clone()).await?; + + // Test that rewrite_from works correctly with cached columns + let result = query_nf.rewrite_from(&base_nf, table_ref, provider_as_source(provider))?; + + assert!(result.is_some()); + let rewritten = result.unwrap(); + assert_eq!(rewritten.schema().fields().len(), 3); + + Ok(()) + } } From 152c88542d90e474d9cd4ae4c63c3c71aa6a3675 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 9 Mar 2026 15:21:10 +0800 Subject: [PATCH 22/23] Support view matcher with boolean binary operation (#117) --- src/rewrite/normal_form.rs | 208 +++++++++++++++++++++++++++++++++++++ 1 file changed, 208 insertions(+) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 70fd129..b5f2343 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -656,6 +656,27 @@ impl Predicate { fn insert_binary_expr(&mut self, left: &Expr, op: Operator, right: &Expr) -> Result<()> { match (left, op, right) { (Expr::Column(c), op, Expr::Literal(v, _)) => { + // Normalize boolean expressions to canonical form: + // col = false -> NOT col + // col != true -> NOT col + // col = true -> col + // col != false -> col + // This ensures semantic equivalence matching (e.g., "active = false" matches "NOT active") + if let ScalarValue::Boolean(Some(b)) = v { + match (op, b) { + (Operator::Eq, false) | (Operator::NotEq, true) => { + self.residuals + .insert(Expr::Not(Box::new(Expr::Column(c.clone())))); + return Ok(()); + } + (Operator::Eq, true) | (Operator::NotEq, false) => { + self.residuals.insert(Expr::Column(c.clone())); + return Ok(()); + } + _ => {} + } + } + if let Err(e) = self.add_range(c, &op, v) { // Add a range can fail in some cases, so just fallthrough log::debug!("failed to add range filter: {e}"); @@ -1443,4 +1464,191 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_boolean_expression_normalization() 
-> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + // Create table with boolean column + ctx.sql( + "CREATE TABLE bool_test ( + id INT, + active BOOLEAN, + name VARCHAR + )", + ) + .await? + .collect() + .await?; + + ctx.sql("INSERT INTO bool_test VALUES (1, true, 'a'), (2, false, 'b')") + .await? + .collect() + .await?; + + // MV: uses "active = false" + let mv_plan = ctx + .sql("SELECT * FROM bool_test WHERE active = false") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv AS SELECT * FROM bool_test WHERE active = false") + .await? + .collect() + .await?; + + // Query: uses "NOT active" (semantically equivalent to "active = false") + let query_plan = ctx + .sql("SELECT id, name FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + assert!( + rewritten.is_some(), + "Expected MV with 'active = false' to match query with 'NOT active'" + ); + + // Also test the reverse: MV with "NOT active", query with "active = false" + let mv_plan2 = ctx + .sql("SELECT * FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let mv_normal_form2 = SpjNormalForm::new(&mv_plan2)?; + + ctx.sql("CREATE TABLE mv2 AS SELECT * FROM bool_test WHERE NOT active") + .await? + .collect() + .await?; + + let query_plan2 = ctx + .sql("SELECT id FROM bool_test WHERE active = false") + .await? 
+ .into_optimized_plan()?; + let query_normal_form2 = SpjNormalForm::new(&query_plan2)?; + + let table_ref2 = TableReference::bare("mv2"); + let rewritten2 = query_normal_form2.rewrite_from( + &mv_normal_form2, + table_ref2.clone(), + provider_as_source(ctx.table_provider(table_ref2).await?), + )?; + + assert!( + rewritten2.is_some(), + "Expected MV with 'NOT active' to match query with 'active = false'" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_boolean_column_normalization() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + ctx.sql( + "CREATE TABLE bool_test ( + id INT, + active BOOLEAN, + name VARCHAR + )", + ) + .await? + .collect() + .await?; + + // Test: MV with "active = false" should match query with "NOT active" + let mv_plan = ctx + .sql("SELECT * FROM bool_test WHERE active = false") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv AS SELECT * FROM bool_test WHERE active = false") + .await? + .collect() + .await?; + + let query_plan = ctx + .sql("SELECT id, name FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + // Should successfully rewrite + assert!( + rewritten.is_some(), + "Expected MV with 'active = false' to match query with 'NOT active'" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_boolean_true_normalization() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + ctx.sql( + "CREATE TABLE bool_test2 ( + id INT, + enabled BOOLEAN + )", + ) + .await? 
+ .collect() + .await?; + + // Test: MV with "enabled = true" should match query with just "enabled" + let mv_plan = ctx + .sql("SELECT * FROM bool_test2 WHERE enabled = true") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv2 AS SELECT * FROM bool_test2 WHERE enabled = true") + .await? + .collect() + .await?; + + let query_plan = ctx + .sql("SELECT id FROM bool_test2 WHERE enabled") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv2"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + assert!( + rewritten.is_some(), + "Expected MV with 'enabled = true' to match query with 'enabled'" + ); + + Ok(()) + } } From 99ff72dd64695e69e26af054696ef1133ce2f2b7 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 23 Jun 2026 14:03:29 +0800 Subject: [PATCH 23/23] expose SpjNormalForm::predicate() + Predicate Display impl `SpjNormalForm` already exposes public getters for `output_schema`, `output_exprs`, and `referenced_tables`, but the `predicate` field is private. Downstream consumers that want to surface the SPJ normal form (e.g. atlas catalog tables exposing `mv_normal_form` for debugging why view matching does or does not engage) currently have to walk the original `LogicalPlan` and re-collect Filter / TableScan filters as a string, which loses the normalization the analyzer applied. This change: * Promotes `Predicate` and `ColumnEquivalenceClass` from private to `pub` so they can be referenced from public APIs. * Adds `pub fn predicate(&self) -> &Predicate` on `SpjNormalForm`. 
* Adds a `Display` impl on `Predicate` that AND-joins: - pairwise equalities derived from each multi-column equivalence class (singletons emit nothing), - narrowed range intervals per equivalence class (classes whose interval is still the default unbounded interval are skipped, so only meaningful constraints surface), - residual filter expressions, sorted for deterministic output. The internal fields stay private, so this does not lock in any particular representation; only the read-only view is published. A unit test exercises the new getter + Display on a predicate that populates all three buckets (eq class, range, residual). --- src/rewrite/normal_form.rs | 114 ++++++++++++++++++++++++++++++++++++- 1 file changed, 112 insertions(+), 2 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index b5f2343..1b8190d 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -167,6 +167,7 @@ As one can see, all compensating filters are included, and the query only uses t use std::{ collections::{BTreeSet, HashMap, HashSet}, + fmt, sync::Arc, }; @@ -225,6 +226,14 @@ impl SpjNormalForm { &self.referenced_tables } + /// The normalized predicate of this plan (column equivalence classes, + /// per-class range intervals, and residual filter expressions). Useful + /// for surfacing the SPJ normal form via catalog metadata or for + /// debugging why view matching does or does not engage on a given query. + pub fn predicate(&self) -> &Predicate { + &self.predicate + } + /// Analyze an existing `LogicalPlan` and rewrite it in select-project-join normal form. pub fn new(original_plan: &LogicalPlan) -> Result { let predicate = Predicate::new(original_plan)?; @@ -353,8 +362,14 @@ impl SpjNormalForm { } /// Stores information on filters from a Select-Project-Join plan. +/// +/// Built from the original `LogicalPlan` in [`SpjNormalForm::new`] and +/// exposed read-only via [`SpjNormalForm::predicate`]. 
The private fields below +/// hold the three filter buckets that view matching uses (equality +/// classes, per-class range intervals, and residuals); see the module +/// docs for how they participate in the subsumption tests. #[derive(Debug, Clone)] -struct Predicate { +pub struct Predicate { /// Full table schema, including all possible columns. schema: DFSchema, /// List of column equivalence classes. @@ -369,6 +384,60 @@ struct Predicate { referenced_tables: Vec, } +/// Renders the predicate as an AND-joined list of filter conditions. +/// +/// Three sources are emitted in order: +/// 1. Pairwise equalities derived from each column equivalence class (e.g. +/// `t.a = t.b` for a class `{a, b}`); singleton classes emit nothing. +/// 2. Narrowed range intervals per equivalence class. Classes whose +/// interval is still the default unbounded interval (no filter applied) +/// are skipped, so only meaningful constraints surface. +/// 3. Residual filter expressions, rendered via their `Expr` `Display` +/// and sorted for deterministic output across runs. +/// +/// Intended for human inspection / catalog surfacing rather than as a +/// canonical SQL form -- callers that need the original `Expr`s should +/// reach for the underlying logical plan instead. 
+impl fmt::Display for Predicate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut parts: Vec = Vec::new(); + + for eq_class in &self.eq_classes { + let mut cols = eq_class.columns.iter(); + let Some(first) = cols.next() else { continue }; + for other in cols { + parts.push(format!("{first} = {other}")); + } + } + + for (idx, range) in self.ranges_by_equivalence_class.iter().enumerate() { + let Some(interval) = range else { continue }; + let Some(eq_class) = self.eq_classes.get(idx) else { + continue; + }; + let Some(col) = eq_class.columns.iter().next() else { + continue; + }; + let Ok(field) = self.schema.field_from_column(col) else { + continue; + }; + let Ok(unbounded) = Interval::make_unbounded(field.data_type()) else { + continue; + }; + if interval == &unbounded { + continue; + } + parts.push(format!("{col} in {interval}")); + } + + let mut residuals: Vec = self.residuals.iter().map(|e| e.to_string()).collect(); + residuals.sort(); + parts.extend(residuals); + + write!(f, "{}", parts.join(" AND ")) + } +} + impl Predicate { /// Create a new Predicate by analyzing the given logical plan. /// Uses single-pass traversal to collect schema, columns, filters, and referenced tables. @@ -891,7 +960,7 @@ impl Predicate { /// in place of any other columns in the class. /// This normal representative is chosen arbitrarily. #[derive(Debug, Clone, Default)] -struct ColumnEquivalenceClass { +pub struct ColumnEquivalenceClass { // first element is the normal representative of the equivalence class columns: BTreeSet, } @@ -1651,4 +1720,45 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_predicate_getter_and_display() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE t (a INT, b INT, c VARCHAR)") + .await? + .collect() + .await?; + + // Residual: `c LIKE 'foo%'` falls outside eq classes / ranges and + // should land in `residuals`. The eq `a = b` populates an + // equivalence class, and `a >= 5` narrows its range. 
+ let plan = ctx + .sql("SELECT a FROM t WHERE a = b AND a >= 5 AND c LIKE 'foo%'") + .await? + .into_optimized_plan()?; + let normal_form = SpjNormalForm::new(&plan)?; + + // Getter returns a reference to the same Predicate that drives + // subsumption tests internally; callers can stringify or inspect + // it for catalog surfacing / debugging. + let rendered = normal_form.predicate().to_string(); + + // Display output is AND-joined; check each expected fragment + // independently because residuals are hash-set ordered before the + // internal sort, so positional matching would be brittle. + assert!( + rendered.contains("t.a = t.b"), + "expected equality from class {{a, b}}, got: {rendered}" + ); + assert!( + rendered.contains("t.c LIKE"), + "expected LIKE residual, got: {rendered}" + ); + assert!( + !rendered.is_empty(), + "predicate Display should not be empty when filters exist" + ); + + Ok(()) + } }