From 5915f4cddbd90383494102751c40319f4a79d49c Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Wed, 24 Sep 2025 23:18:02 -0500 Subject: [PATCH 01/17] Support static partition columns for MV (#89) * Support static partition columns for MV * runtime checks * unit test for dynamic partition columns * lint --- src/materialized.rs | 48 ++++++++-- src/materialized/dependencies.rs | 155 +++++++++++++++++++++++++------ src/rewrite/exploitation.rs | 2 +- 3 files changed, 167 insertions(+), 38 deletions(-) diff --git a/src/materialized.rs b/src/materialized.rs index e089591..8e093c0 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -41,6 +41,7 @@ use datafusion::{ catalog::TableProvider, datasource::listing::{ListingTable, ListingTableUrl}, }; +use datafusion_common::DataFusionError; use datafusion_expr::LogicalPlan; use itertools::Itertools; @@ -110,6 +111,14 @@ pub trait Materialized: ListingTableLike { fn config(&self) -> MaterializedConfig { MaterializedConfig::default() } + + /// Which partition columns are 'static'. + /// Static partition columns are those that are used in incremental view maintenance. + /// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`]. + /// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh. + fn static_partition_columns(&self) -> Vec { + ::partition_columns(self) + } } /// Register a [`Materialized`] implementation in this registry. @@ -122,13 +131,38 @@ pub fn register_materialized() { } /// Attempt to cast the given TableProvider into a [`Materialized`]. -/// If the table's type has not been registered using [`register_materialized`], will return `None`. 
-pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> { - TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| { - TABLE_TYPE_REGISTRY - .cast_to_decorator(table) - .and_then(|decorator| cast_to_materialized(decorator.base())) - }) +/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`. +/// +/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated. +/// In particular, checks that the static partition columns are a prefix of all partition columns. +pub fn cast_to_materialized( + table: &dyn TableProvider, +) -> Result, DataFusionError> { + let materialized = match TABLE_TYPE_REGISTRY + .cast_to_materialized(table) + .map(Ok) + .or_else(|| { + TABLE_TYPE_REGISTRY + .cast_to_decorator(table) + .and_then(|decorator| cast_to_materialized(decorator.base()).transpose()) + }) + .transpose()? + { + None => return Ok(None), + Some(m) => m, + }; + + let static_partition_cols = materialized.static_partition_columns(); + let all_partition_cols = materialized.partition_columns(); + + if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..] + { + return Err(DataFusionError::Plan(format!( + "Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})" + ))); + } + + Ok(Some(materialized)) } /// A `TableProvider` that decorates other `TableProvider`s. 
diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index e586430..9cf8b8a 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -106,7 +106,7 @@ impl TableFunctionImpl for FileDependenciesUdtf { let table = util::get_table(self.catalog_list.as_ref(), &table_ref) .map_err(|e| DataFusionError::Plan(e.to_string()))?; - let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!( + let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!( "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"), ))?; @@ -166,6 +166,15 @@ impl TableFunctionImpl for StaleFilesUdtf { &self.mv_dependencies.config_options.catalog.default_schema, ); + let table = util::get_table(self.mv_dependencies.catalog_list.as_ref(), &table_ref) + .map_err(|e| DataFusionError::Plan(e.to_string()))?; + let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!( + "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"), + ))?; + + let url = mv.table_paths()[0].to_string(); + let num_static_partition_cols = mv.static_partition_columns().len(); + let logical_plan = LogicalPlanBuilder::scan_with_filters("dependencies", dependencies, None, vec![])? .aggregate( @@ -187,16 +196,21 @@ impl TableFunctionImpl for StaleFilesUdtf { )? .aggregate( vec![ - // We want to omit the file name along with any "special" partitions - // from the path before comparing it to the target partition. Special - // partitions must be leaf most nodes and are designated by a leading - // underscore. These are useful for adding additional information to a - // filename without affecting partitioning or staleness checks. 
- regexp_replace( - col("file_path"), - lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"), - lit("/"), - None, + // Omit the file name along with any "special" partitions. + // This can include dynamic partition columns as well as some internal + // metadata columns that are not part of the schema + // + // We implement this by only taking the first N columns, + // where N is the number of static partition columns. + array_element( + regexp_match( + col("file_path"), + lit(format!( + "{url}(?:[^/=]+=[^/]+/){{{num_static_partition_cols}}}" + )), + None, + ), + lit(1), ) .alias("existing_target"), ], @@ -249,16 +263,16 @@ pub fn mv_dependencies_plan( let plan = materialized_view.query().clone(); - let partition_cols = materialized_view.partition_columns(); - let partition_col_indices = plan + let static_partition_cols = materialized_view.static_partition_columns(); + let static_partition_col_indices = plan .schema() .fields() .iter() .enumerate() - .filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i)) + .filter_map(|(i, f)| static_partition_cols.contains(f.name()).then_some(i)) .collect(); - let pruned_plan_with_source_files = if partition_cols.is_empty() { + let pruned_plan_with_source_files = if static_partition_cols.is_empty() { get_source_files_all_partitions( materialized_view, &config_options.catalog, @@ -266,14 +280,14 @@ pub fn mv_dependencies_plan( ) } else { // Prune non-partition columns from all table scans - let pruned_plan = pushdown_projection_inexact(plan, &partition_col_indices)?; + let pruned_plan = pushdown_projection_inexact(plan, &static_partition_col_indices)?; // Now bubble up file metadata to the top of the plan push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry) }?; // We now have data in the following form: - // (partition_col0, partition_col1, ..., __meta) + // (static_partition_col0, static_partition_col1, ..., __meta) // The last column is a list of structs containing the row metadata // We need to unnest it 
@@ -289,7 +303,7 @@ pub fn mv_dependencies_plan( LogicalPlanBuilder::from(pruned_plan_with_source_files) .unnest_column(files)? .project(vec![ - construct_target_path_from_partition_columns(materialized_view).alias("target"), + construct_target_path_from_static_partition_columns(materialized_view).alias("target"), get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), get_field(files_col.clone(), "table_schema").alias("source_table_schema"), get_field(files_col.clone(), "table_name").alias("source_table_name"), @@ -300,14 +314,16 @@ pub fn mv_dependencies_plan( .build() } -fn construct_target_path_from_partition_columns(materialized_view: &dyn Materialized) -> Expr { +fn construct_target_path_from_static_partition_columns( + materialized_view: &dyn Materialized, +) -> Expr { let table_path = lit(materialized_view.table_paths()[0] .as_str() // Trim the / (we'll add it back later if we need it) .trim_end_matches("/")); // Construct the paths for the build targets let mut hive_column_path_elements = materialized_view - .partition_columns() + .static_partition_columns() .iter() .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec())) .collect::>(); @@ -965,6 +981,7 @@ mod test { struct MockMaterializedView { table_path: ListingTableUrl, partition_columns: Vec, + static_partition_columns: Option>, // default = all partition columns query: LogicalPlan, file_ext: &'static str, } @@ -1012,6 +1029,12 @@ mod test { fn query(&self) -> LogicalPlan { self.query.clone() } + + fn static_partition_columns(&self) -> Vec { + self.static_partition_columns + .clone() + .unwrap_or_else(|| self.partition_columns.clone()) + } } #[derive(Debug)] @@ -1181,12 +1204,14 @@ mod test { #[tokio::test] async fn test_deps() { + #[derive(Debug, Default)] struct TestCase { name: &'static str, query_to_analyze: &'static str, table_name: &'static str, - table_path: ListingTableUrl, + table_path: &'static str, partition_cols: Vec<&'static str>, 
+ static_partition_cols: Option>, file_extension: &'static str, expected_output: Vec<&'static str>, file_metadata: &'static str, @@ -1198,7 +1223,7 @@ mod test { query_to_analyze: "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", table_name: "m1", - table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + table_path: "s3://m1/", partition_cols: vec!["partition_column"], file_extension: ".parquet", expected_output: vec![ @@ -1225,12 +1250,13 @@ mod test { "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, - TestCase { name: "omit 'special' partition columns", + TestCase { name: "omit internal metadata partition columns", query_to_analyze: "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1", table_name: "m1", - table_path: ListingTableUrl::parse("s3://m1/").unwrap(), + table_path: "s3://m1/", partition_cols: vec!["partition_column"], file_extension: ".parquet", expected_output: vec![ @@ -1257,6 +1283,7 @@ mod test { "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "transform year/month/day partition into timestamp partition", @@ -1266,7 +1293,7 @@ mod test { feed FROM t2", table_name: "m2", - table_path: ListingTableUrl::parse("s3://m2/").unwrap(), + table_path: "s3://m2/", partition_cols: vec!["timestamp", "feed"], file_extension: ".parquet", expected_output: vec![ @@ -1301,12 +1328,63 @@ mod test { "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", "+-----------------------------------------------+----------------------+-----------------------+----------+", ], + ..Default::default() + }, + TestCase { + 
name: "omit dynamic partition columns", + query_to_analyze: " + SELECT + year, + month, + day, + column2, + COUNT(*) AS ct + FROM t2 + GROUP BY year, month, day, column2 + ", + table_name: "m_dynamic", + table_path: "s3://m_dynamic/", + partition_cols: vec!["year", "month", "day", "column2"], + static_partition_cols: Some(vec!["year", "month", "day"]), + file_extension: ".parquet", + expected_output: vec![ + "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + "| s3://m_dynamic/year=2023/month=01/day=01/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m_dynamic/year=2023/month=01/day=02/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m_dynamic/year=2023/month=01/day=03/ | datafusion | test | t2 | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44 |", + "| s3://m_dynamic/year=2024/month=12/day=04/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://m_dynamic/year=2024/month=12/day=05/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://m_dynamic/year=2024/month=12/day=06/ | datafusion | test | t2 | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44 |", + 
"+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+", + ], + file_metadata: " + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=01/column2=1/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=02/column2=2/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=03/column2=3/data.01.parquet', '2023-07-10T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=04/column2=4/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=05/column2=5/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=06/column2=6/data.01.parquet', '2023-07-10T16:00:00Z', 0) + ", + expected_stale_files_output: vec![ + "+-------------------------------------------+----------------------+-----------------------+----------+", + "| target | target_last_modified | sources_last_modified | is_stale |", + "+-------------------------------------------+----------------------+-----------------------+----------+", + "| s3://m_dynamic/year=2023/month=01/day=01/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |", + "| s3://m_dynamic/year=2023/month=01/day=02/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:22 | false |", + "| s3://m_dynamic/year=2023/month=01/day=03/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", + "| s3://m_dynamic/year=2024/month=12/day=04/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |", + "| s3://m_dynamic/year=2024/month=12/day=05/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:22 | false |", + "| s3://m_dynamic/year=2024/month=12/day=06/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:44 | true |", + 
"+-------------------------------------------+----------------------+-----------------------+----------+", + ], }, TestCase { name: "materialized view has no partitions", query_to_analyze: "SELECT column1 AS output FROM t3", table_name: "m3", - table_path: ListingTableUrl::parse("s3://m3/").unwrap(), + table_path: "s3://m3/", partition_cols: vec![], file_extension: ".parquet", expected_output: vec![ @@ -1327,12 +1405,13 @@ mod test { "| s3://m3/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+----------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "simple equijoin on year", query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1361,6 +1440,7 @@ mod test { "| s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "triangular join on year", @@ -1373,7 +1453,7 @@ mod test { INNER JOIN t3 ON (t2.year <= t3.year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1403,6 +1483,7 @@ mod test { "| s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, TestCase { name: "triangular left join, strict <", @@ -1415,7 +1496,7 @@ mod test { LEFT JOIN t3 ON (t2.year < t3.year)", table_name: "m4", - table_path: ListingTableUrl::parse("s3://m4/").unwrap(), + table_path: "s3://m4/", partition_cols: vec!["year"], file_extension: ".parquet", expected_output: vec![ @@ -1443,6 +1524,7 @@ mod test { "| 
s3://m4/year=2024/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |", "+--------------------+----------------------+-----------------------+----------+", ], + ..Default::default() }, ]; @@ -1476,12 +1558,16 @@ mod test { // Register table with a decorator to exercise this functionality Arc::new(DecoratorTable { inner: Arc::new(MockMaterializedView { - table_path: case.table_path.clone(), + table_path: ListingTableUrl::parse(case.table_path).unwrap(), partition_columns: case .partition_cols .iter() .map(|s| s.to_string()) .collect(), + static_partition_columns: case + .static_partition_cols + .as_ref() + .map(|list| list.iter().map(|s| s.to_string()).collect()), query: plan, file_ext: case.file_extension, }), @@ -1498,6 +1584,15 @@ mod test { .collect() .await?; + context + .sql(&format!( + "SELECT * FROM file_metadata WHERE table_name = '{}'", + case.table_name + )) + .await? + .show() + .await?; + let df = context .sql(&format!( "SELECT * FROM mv_dependencies('{}', 'v2')", diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 11bcc14..06145ae 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -59,7 +59,7 @@ impl ViewMatcher { for (resolved_table_ref, table) in super::util::list_tables(session_state.catalog_list().as_ref()).await? { - let Some(mv) = cast_to_materialized(table.as_ref()) else { + let Some(mv) = cast_to_materialized(table.as_ref())? 
else { continue; }; From 0c408a73ba41bf11bf5ef8af3ee97d1133e24483 Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Thu, 9 Oct 2025 20:41:36 -0500 Subject: [PATCH 02/17] Improved documentation on IVM algorithm (#90) * inline mermaid diagrams * additional comment * mention duplicates, not cardinality * more documentation --- Cargo.toml | 1 + src/lib.rs | 3 + src/materialized.rs | 1 - src/materialized/dependencies.rs | 101 ++++++++++++++++++++++++++++++ src/materialized/file_metadata.rs | 2 +- src/rewrite.rs | 2 - src/rewrite/exploitation.rs | 28 +++++++++ src/rewrite/normal_form.rs | 2 +- 8 files changed, 135 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c30b62e..cec3755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.85.1" [dependencies] +aquamarine = "0.6.0" arrow = "56.0.0" arrow-schema = "56.0.0" async-trait = "0.1.89" diff --git a/src/lib.rs b/src/lib.rs index 8b26d85..238bbd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,9 @@ pub mod materialized; /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views. +/// +/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), +/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*. pub mod rewrite; /// Configuration options for materialized view related features. diff --git a/src/materialized.rs b/src/materialized.rs index 8e093c0..c3cf30e 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-/// Track dependencies of materialized data in object storage pub mod dependencies; /// Pluggable metadata sources for incremental view maintenance diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 9cf8b8a..42ddd35 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -15,6 +15,29 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait. +Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed +into the [`RowMetadataRegistry`]. However, materialized views themselves must implement `ListingTableLike`, as is +implied by the type bound `Materialized: ListingTableLike`. + +The dependency analysis in a nutshell involves analyzing the fragment of the materialized view's logical plan corresponding to +partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions +of the materialized view and its source tables. This gives rise to two natural phases of the algorithm: +1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to only include partition columns (or row metadata columns more generally) of the materialized view and its sources. + This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order, + or even set equality of the original query. + * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. + * This means that in the final output, we may have dependencies that do not exist in the original query.
However, we will never miss any dependencies. +2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources. + After step 1, every table scan only contains row metadata columns, so we replace the table scan with an equivalent scan to a [`RowMetadataSource`](super::row_metadata::RowMetadataSource). + This operation is also not duplicate- or order-preserving. Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row. + The output rows are then transformed into object storage paths to generate the final graph. + +The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`]. + */ + use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, @@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> { } } +#[cfg_attr(doc, aquamarine::aquamarine)] /// Returns a logical plan that, when executed, lists expected build targets /// for this materialized view, together with the dependencies for each target. +/// +/// See the [module documentation](self) for an overview of the algorithm. +/// +/// # Example +/// +/// We explain in detail how the dependency analysis works in an example.
Consider the following SQL query, which computes daily +/// close prices of a stock from its trades, together with the settlement price from a daily statistics table: +/// +/// ```sql +/// SELECT +/// ticker, +/// LAST_VALUE(trades.price) AS close, +/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, +/// trades.date AS date +/// FROM trades +/// JOIN daily_statistics ON +/// trades.ticker = daily_statistics.ticker AND +/// trades.date = daily_statistics.reference_date AND +/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS +/// GROUP BY ticker, date +/// ``` +/// +/// Assume that both tables are partitioned by `date` only. We want a materialized view partitioned by `date` and stored at `s3://daily_close/`. +/// This query gives us the following logical plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection:
ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, trades.date AS date"] +/// A --> B["Aggregate:
expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)]
groupby=[ticker, trades.date]"] +/// B --> C["Inner Join:
trades.ticker = daily_statistics.ticker AND
trades.date = daily_statistics.reference_date AND
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades
projection=[ticker, price, date]"] +/// C --> E["TableScan: daily_statistics
projection=[ticker, settlement_price, reference_date, date]"] +/// ``` +/// +/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: trades.date AS date"] +/// A --> B["Projection: trades.date"] +/// B --> C["Inner Join:
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades (projection=[date])"] +/// C --> E["TableScan: daily_statistics (projection=[date])"] +/// ``` +/// +/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that +/// we cannot partition the materialized view on aggregate expressions. +/// +/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan, +/// together with the target path constructed from the (static) partition columns. This gives us the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"] +/// A --> B["Projection: __meta, trades.date AS date"] +/// B --> C["Projection:
concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"] +/// C --> D["Inner Join:
daily_statistics_meta.date BETWEEN trades_meta.date AND trades_meta.date + INTERVAL 2 WEEKS"] +/// D --> E["TableScan: trades_meta (projection=[__meta, date])"] +/// D --> F["TableScan: daily_statistics_meta (projection=[__meta, date])"] +/// ``` +/// +/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column +/// unnested into its components. The final output looks roughly like this: +/// +/// ```text +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// ``` pub fn mv_dependencies_plan( materialized_view: &dyn Materialized, row_metadata_registry: &RowMetadataRegistry, diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 
85e5838..bca69ea 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -722,7 +722,7 @@ impl FileMetadataBuilder { } } -/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider. +/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider. #[async_trait] pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync { /// List all files in the store for the given `url` prefix. diff --git a/src/rewrite.rs b/src/rewrite.rs index 170c88f..da24cc5 100644 --- a/src/rewrite.rs +++ b/src/rewrite.rs @@ -17,8 +17,6 @@ use datafusion::{common::extensions_options, config::ConfigExtension}; -/// Implements a query rewriting optimizer, also known as "view exploitation" -/// in some academic sources. pub mod exploitation; pub mod normal_form; diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 06145ae..7a95113 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -15,6 +15,34 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a query rewriting optimizer, also known as "view exploitation" +in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code, +which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views. + +The query rewriting process spans both the logical and physical planning phases and can be described as follows: + +1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views + and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms. + If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression + and one or more candidate rewrites using materialized views. +2. 
During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`] + physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node. +3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions + to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important as these can affect cost + estimations in the next phase. +4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node. + The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node + with its selected best candidate plan. + +In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention +that the database's builtin cost optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization. +While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner +to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes. + +*/ + use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 7efd53d..e09a54b 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -17,7 +17,7 @@ /*! -This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf), +This module contains code primarily used for view matching. 
We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query. The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: From 3910e12ce9c9d6cb3a6c6585cdc1801a35b8e7b3 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sat, 11 Oct 2025 15:03:11 +0800 Subject: [PATCH 03/17] Support limit pushdown for OneOfExec (#94) --- src/rewrite/exploitation.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 7a95113..b353a84 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -450,6 +450,10 @@ impl ExecutionPlan for OneOfExec { )?)) } + fn supports_limit_pushdown(&self) -> bool { + true + } + fn execute( &self, partition: usize, From f3d5eb1c7e3aaedea15e38d7bf86adda0dc83391 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 24 Oct 2025 17:10:37 +0800 Subject: [PATCH 04/17] Improve the doc (#95) --- src/lib.rs | 32 +++++++++++++++++++++++++++++++- src/materialized/dependencies.rs | 3 ++- src/materialized/util.rs | 2 ++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 238bbd7..fd6dd8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,37 @@ #![deny(missing_docs)] -//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion. +//! # datafusion-materialized-views +//! +//! `datafusion-materialized-views` provides robust algorithms and core functionality for working with materialized views in [DataFusion](https://arrow.apache.org/datafusion/). +//! +//! ## Key Features +//! +//! 
- **Incremental View Maintenance**: Efficiently tracks dependencies between Hive-partitioned tables and their materialized views, allowing users to determine which partitions need to be refreshed when source data changes. This is achieved via UDTFs such as `mv_dependencies` and `stale_files`. +//! - **Query Rewriting**: Implements a view matching optimizer that rewrites queries to automatically leverage materialized views when beneficial, based on the techniques described in the [paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf). +//! - **Pluggable Metadata Sources**: Supports custom metadata sources for incremental view maintenance, with default support for object store metadata via the `FileMetadata` and `RowMetadataRegistry` components. +//! - **Extensible Table Abstractions**: Defines traits such as `ListingTableLike` and `Materialized` to abstract over Hive-partitioned tables and materialized views, enabling custom implementations and easy registration for use in the maintenance and rewriting logic. +//! +//! ## Typical Workflow +//! +//! 1. **Define and Register Views**: Implement a custom table type that implements the `Materialized` trait, and register it using `register_materialized`. +//! 2. **Metadata Initialization**: Set up `FileMetadata` and `RowMetadataRegistry` to track file-level and row-level metadata. +//! 3. **Dependency Tracking**: Use the `mv_dependencies` UDTF to generate build graphs for materialized views, and `stale_files` to identify partitions that require recomputation. +//! 4. **Query Optimization**: Enable the query rewriting optimizer to transparently rewrite queries to use materialized views where possible. +//! +//! ## Example +//! +//! See the README and integration tests for a full walkthrough of setting up and maintaining a materialized view, including dependency tracking and query rewriting. +//! +//! ## Limitations +//! +//! 
- Currently supports only Hive-partitioned tables in object storage, with the smallest update unit being a file. +//! - Future work may generalize to other storage backends and partitioning schemes. +//! +//! ## References +//! +//! - [Optimizing Queries Using Materialized Views: A Practical, Scalable Solution](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) +//! - [DataFusion documentation](https://datafusion.apache.org/) /// Code for incremental view maintenance against Hive-partitioned tables. /// diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 42ddd35..060e1e0 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -62,7 +62,8 @@ use crate::materialized::META_COLUMN; use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized}; -/// A table function that shows build targets and dependencies for a materialized view: +/// A table function that, for a given materialized view, lists all the output data objects (build targets) +/// generated during its construction or refresh, as well as all the source data objects (dependencies) it relies on. /// /// ```ignore /// fn mv_dependencies(table_ref: Utf8) -> Table diff --git a/src/materialized/util.rs b/src/materialized/util.rs index 7977f8d..cb4afad 100644 --- a/src/materialized/util.rs +++ b/src/materialized/util.rs @@ -21,6 +21,7 @@ use datafusion::catalog::{CatalogProviderList, TableProvider}; use datafusion_common::{DataFusionError, Result}; use datafusion_sql::ResolvedTableReference; +/// Retrieves a table from the catalog list given a resolved table reference. pub fn get_table( catalog_list: &dyn CatalogProviderList, table_ref: &ResolvedTableReference, @@ -35,6 +36,7 @@ pub fn get_table( // NOTE: this is bad, we are calling async code in a sync context. // We should file an issue about async in UDTFs. 
+ // See: https://github.com/apache/datafusion/issues/17663 futures::executor::block_on(schema.table(table_ref.table.as_ref())) .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))? .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.schema))) From 6162aea1d5d4952b9a821d2cfddb37245390dbbd Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 24 Oct 2025 17:24:25 +0800 Subject: [PATCH 05/17] Chore: remove useless lines in changelog (#97) --- CHANGELOG.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4c0114..6d96f00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,6 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] - ## [0.1.1](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.0...v0.1.1) - 2025-01-07 ### Added From e6205944dd70d5483f18c338a2d89acf4374c278 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 24 Oct 2025 09:33:23 +0000 Subject: [PATCH 06/17] chore: release v0.2.0 (#96) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- CHANGELOG.md | 42 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 +- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d96f00..ef55a1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,48 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [0.2.0](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.1...v0.2.0) - 2025-10-24 + +### Added +- `Decorator` trait ([#26](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/26)) (by @suremarc) - #26 + +### Other +- remove useless lines in changelog ([#97](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/97)) (by @xudong963) - #97 +- Improve the doc ([#95](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/95)) (by @xudong963) - #95 +- Support limit pushdown for OneOfExec ([#94](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/94)) (by @xudong963) - #94 +- Improved documentation on IVM algorithm ([#90](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/90)) (by @suremarc) - #90 +- Support static partition columns for MV ([#89](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/89)) (by @suremarc) - #89 +- upgrade to DF50 ([#87](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/87)) (by @xudong963) - #87 +- Fix empty unnest columns handling when pushdown_projection_inexact ([#88](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/88)) (by @zhuqi-lucas) - #88 +- make cost fn accept candidates ([#83](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/83)) (by @xudong963) - #83 +- Upgrade DF to 49.0.2 ([#86](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/86)) (by @zhuqi-lucas) - #86 +- Upgrade to DF49 ([#75](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/75)) (by @xudong963) - #75 +- Upgrade DataFusion 48.0.0 ([#61](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/61)) (by @xudong963) - #61 +- Allow customization of `list_all_files` function. 
([#69](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/69)) (by @jared-m-combs) - #69 +- Allow for 'special' partitions that are omitted in the staleness check. ([#68](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/68)) (by @jared-m-combs) - #68 +- don't panic if eq class is not found ([#60](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/60)) (by @suremarc) - #60 +- Handle table scan filters that reference dropped columns ([#59](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/59)) (by @suremarc) - #59 +- exclude some materialized views from query rewriting ([#57](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/57)) (by @suremarc) - #57 +- Optimize performance bottleneck if projection is large ([#56](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/56)) (by @xudong963) - #56 +- Upgrade df47 ([#55](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/55)) (by @xudong963) - #55 +- Update itertools requirement from 0.13 to 0.14 ([#32](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/32)) (by @dependabot[bot]) - #32 +- Update ordered-float requirement from 4.6.0 to 5.0.0 ([#49](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/49)) (by @dependabot[bot]) - #49 +- Upgrade DF46 ([#48](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/48)) (by @xudong963) - #48 +- Update extension ([#45](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/45)) (by @matthewmturner) - #45 +- make explain output stable ([#44](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/44)) (by @suremarc) - #44 +- Add alternate analysis for MVs with no partition columns ([#39](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/39)) (by @suremarc) - #39 +- upgrade to datafusion 45 
([#38](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/38)) (by @suremarc) - #38 +- use nanosecond timestamps in file metadata ([#28](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/28)) (by @suremarc) - #28 + +### Contributors + +* @xudong963 +* @suremarc +* @zhuqi-lucas +* @jared-m-combs +* @dependabot[bot] +* @matthewmturner + ## [0.1.1](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.0...v0.1.1) - 2025-01-07 ### Added diff --git a/Cargo.toml b/Cargo.toml index cec3755..b0e6fd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "datafusion-materialized-views" -version = "0.1.1" +version = "0.2.0" edition = "2021" homepage = "https://github.com/datafusion-contrib/datafusion-materialized-views" repository = "https://github.com/datafusion-contrib/datafusion-materialized-views" From ec7e88ab4a2fdc4c039e4302cb1dc6c682585dcf Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 28 Oct 2025 13:19:42 +0800 Subject: [PATCH 07/17] Add benchmark for heavy operation for datafusion-materialized-views (#101) --- Cargo.toml | 6 + benches/materialized_views_benchmark.rs | 182 ++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 benches/materialized_views_benchmark.rs diff --git a/Cargo.toml b/Cargo.toml index b0e6fd2..c839523 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,13 @@ ordered-float = "5.0.0" [dev-dependencies] anyhow = "1.0.95" +criterion = "0.4" env_logger = "0.11.6" tempfile = "3.14.0" tokio = "1.42.0" url = "2.5.4" + +[[bench]] +name = "materialized_views_benchmark" +harness = false +path = "benches/materialized_views_benchmark.rs" diff --git a/benches/materialized_views_benchmark.rs b/benches/materialized_views_benchmark.rs new file mode 100644 index 0000000..7bc2f52 --- /dev/null +++ b/benches/materialized_views_benchmark.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or 
more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use std::sync::Arc; +use std::time::Duration; + +use datafusion::datasource::provider_as_source; +use datafusion::datasource::TableProvider; +use datafusion::prelude::SessionContext; +use datafusion_common::Result as DfResult; +use datafusion_expr::LogicalPlan; +use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm; +use datafusion_sql::TableReference; +use tokio::runtime::Builder; + +// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1} +fn make_create_table_sql(table_name: &str, ncols: usize) -> String { + let cols = (0..ncols) + .map(|i| format!("c{} INT", i)) + .collect::>() + .join(", "); + format!( + "CREATE TABLE {table} ({cols})", + table = table_name, + cols = cols + ) +} + +// Utility: generate a base SELECT that projects all columns and has a couple filters +fn make_base_sql(table_name: &str, ncols: usize) -> String { + let cols = (0..ncols) + .map(|i| format!("c{}", i)) + .collect::>() + .join(", "); + let mut where_clauses = vec![]; + if ncols > 0 { + where_clauses.push("c0 >= 0".to_string()); + } + if ncols > 1 { + where_clauses.push("c0 + c1 >= 0".to_string()); + } + let where_part = if 
where_clauses.is_empty() { + "".to_string() + } else { + format!(" WHERE {}", where_clauses.join(" AND ")) + }; + format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part) +} + +// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance) +fn make_query_sql(table_name: &str, ncols: usize) -> String { + let take = std::cmp::max(1, ncols / 2); + let cols = (0..take) + .map(|i| format!("c{}", i)) + .collect::>() + .join(", "); + + let mut where_clauses = vec![]; + if ncols > 0 { + where_clauses.push("c0 >= 10".to_string()); + } + if ncols > 1 { + where_clauses.push("c0 * c1 > 100".to_string()); + } + if ncols > 10 { + where_clauses.push(format!("c{} >= 0", ncols - 1)); + } + + let where_part = if where_clauses.is_empty() { + "".to_string() + } else { + format!(" WHERE {}", where_clauses.join(" AND ")) + }; + + format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part) +} + +// Build fixture: create SessionContext, the table, then return LogicalPlans for base & query and table provider +fn build_fixture_for_cols( + rt: &tokio::runtime::Runtime, + ncols: usize, +) -> DfResult<(LogicalPlan, LogicalPlan, Arc)> { + rt.block_on(async move { + let ctx = SessionContext::new(); + + // create table + let table_name = "t"; + let create_sql = make_create_table_sql(table_name, ncols); + ctx.sql(&create_sql).await?.collect().await?; // create table in catalog + + // base and query plans (optimize to normalize) + let base_sql = make_base_sql(table_name, ncols); + let query_sql = make_query_sql(table_name, ncols); + + let base_df = ctx.sql(&base_sql).await?; + let base_plan = base_df.into_optimized_plan()?; + + let query_df = ctx.sql(&query_sql).await?; + let query_plan = query_df.into_optimized_plan()?; + + // get table provider (Arc) + let table_ref = TableReference::bare(table_name); + let provider: Arc = ctx.table_provider(table_ref.clone()).await?; + + Ok((base_plan, 
query_plan, provider)) + }) +} + +// Criterion benchmark +fn criterion_benchmark(c: &mut Criterion) { + // columns to test + let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320]; + + // build a tokio runtime that's broadly compatible + let rt = Builder::new_current_thread() + .enable_all() + .build() + .expect("tokio runtime"); + + let mut group = c.benchmark_group("view_matcher_spj"); + group.warm_up_time(Duration::from_secs(1)); + group.measurement_time(Duration::from_secs(5)); + group.sample_size(30); + + for &ncols in &col_cases { + // Build fixture + let (base_plan, query_plan, provider) = + build_fixture_for_cols(&rt, ncols).expect("fixture"); + + // Measure SpjNormalForm::new for base_plan and query_plan separately + let id_base = BenchmarkId::new("spj_normal_form_new", format!("cols={}", ncols)); + group.throughput(Throughput::Elements(1)); + group.bench_with_input(id_base, &base_plan, |b, plan| { + b.iter(|| { + let _nf = SpjNormalForm::new(plan).unwrap(); + }); + }); + + let id_query_nf = BenchmarkId::new("spj_normal_form_new_query", format!("cols={}", ncols)); + group.bench_with_input(id_query_nf, &query_plan, |b, plan| { + b.iter(|| { + let _nf = SpjNormalForm::new(plan).unwrap(); + }); + }); + + // Precompute normal forms once (to measure rewrite_from cost only) + let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf"); + let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf"); + + // qualifier for rewrite_from and a source created from the provider + let qualifier = TableReference::bare("mv"); + let source = provider_as_source(Arc::clone(&provider)); + + // Benchmark rewrite_from (this is the heavy check) + let id_rewrite = BenchmarkId::new("rewrite_from", format!("cols={}", ncols)); + group.bench_with_input(id_rewrite, &ncols, |b, &_n| { + b.iter(|| { + let _res = query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source)); + }); + }); + } + + group.finish(); +} + +criterion_group!(benches, criterion_benchmark); 
+criterion_main!(benches); From f1f7ad8e72d818e65652218aecd0ea72ed731db8 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Wed, 19 Nov 2025 17:57:11 +0800 Subject: [PATCH 08/17] Upgrade DF51.0.0 (#104) * Upgrade DF51.0.0 * udpate --- Cargo.toml | 22 +++++++++++----------- src/rewrite/normal_form.rs | 4 ++-- tests/materialized_listing_table.rs | 8 +++++--- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c839523..b6fc338 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,19 +29,19 @@ rust-version = "1.85.1" [dependencies] aquamarine = "0.6.0" -arrow = "56.0.0" -arrow-schema = "56.0.0" +arrow = "57.0.0" +arrow-schema = "57.0.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "50" -datafusion-common = "50" -datafusion-expr = "50" -datafusion-functions = "50" -datafusion-functions-aggregate = "50" -datafusion-optimizer = "50" -datafusion-physical-expr = "50" -datafusion-physical-plan = "50" -datafusion-sql = "50" +datafusion = "51" +datafusion-common = "51" +datafusion-expr = "51" +datafusion-functions = "51" +datafusion-functions-aggregate = "51" +datafusion-optimizer = "51" +datafusion-physical-expr = "51" +datafusion-physical-plan = "51" +datafusion-sql = "51" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index e09a54b..c09f18c 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -1099,7 +1099,7 @@ mod test { assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref()); let expected = concat_batches( - &query_plan.schema().as_ref().clone().into(), + &query_plan.schema().inner().clone(), &context .execute_logical_plan(query_plan) .await? @@ -1108,7 +1108,7 @@ mod test { )?; let result = concat_batches( - &rewritten.schema().as_ref().clone().into(), + &rewritten.schema().inner().clone(), &context .execute_logical_plan(rewritten) .await? 
diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 5d91a67..7bba358 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -32,7 +32,9 @@ use datafusion::{ }, prelude::{SessionConfig, SessionContext}, }; -use datafusion_common::{Constraints, DataFusionError, ParamValues, ScalarValue, Statistics}; +use datafusion_common::{ + metadata::ScalarAndMetadata, Constraints, DataFusionError, ParamValues, ScalarValue, Statistics, +}; use datafusion_expr::{ col, dml::InsertOp, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, SortExpr, TableProviderFilterPushDown, TableType, @@ -550,7 +552,7 @@ impl TableProvider for MaterializedListingTable { fn parse_partition_values( path: &ObjectPath, partition_columns: &[(String, DataType)], -) -> Result, DataFusionError> { +) -> Result, DataFusionError> { let parts = path.parts().map(|part| part.to_owned()).collect::>(); let pairs = parts @@ -562,7 +564,7 @@ fn parse_partition_values( .iter() .map(|(column, datatype)| { let value = pairs.get(column.as_str()).copied().map(String::from); - ScalarValue::Utf8(value).cast_to(datatype) + ScalarAndMetadata::from(ScalarValue::Utf8(value)).cast_storage_to(datatype) }) .collect::, _>>()?; From 9c915808e6607d3165f1bb24d2a032e7e5d71e94 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 12 Dec 2025 20:27:46 +0800 Subject: [PATCH 09/17] Fix mv dependencies involving unrelated files (#107) --- src/materialized/dependencies.rs | 84 ++++++++++++++++++++++++++++++-- 1 file changed, 79 insertions(+), 5 deletions(-) diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 060e1e0..50f806c 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -639,6 +639,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> R _ => unreachable!(), }; + if new_projection.is_empty() { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: 
true, + schema: Arc::new(DFSchema::empty()), + })); + } + TableScan::try_new( scan.table_name, scan.source, @@ -1798,11 +1805,11 @@ mod test { query_to_analyze: "SELECT column1 AS output FROM t3", projection: &[], expected_plan: vec![ - "+--------------+-----------------------------+", - "| plan_type | plan |", - "+--------------+-----------------------------+", - "| logical_plan | TableScan: t3 projection=[] |", - "+--------------+-----------------------------+", + "+--------------+-----------------------+", + "| plan_type | plan |", + "+--------------+-----------------------+", + "| logical_plan | EmptyRelation: rows=1 |", + "+--------------+-----------------------+", ], expected_output: vec![ "++", @@ -2051,6 +2058,73 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_cross_join_unrelated_files() -> Result<()> { + let context = setup().await?; + + // Test case: Cross join where only columns from left table (t1) are selected + // The cross join with t3 affects cardinality but we don't select any t3 columns + // Expected: Only files from t1 should be in dependencies, not from t3 + // BUG: Currently t3 files are incorrectly included in dependencies + let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3"; + + let plan = context.sql(query).await?.into_optimized_plan()?; + + println!("Original plan:\n{}", plan.display_indent()); + + // We're partitioning on column1 which only comes from t1 + let partition_col_indices: HashSet = [0].into_iter().collect(); // column1 is at index 0 + + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!("After pushdown:\n{}", analyzed.display_indent()); + + // Register materialized view + context.register_table( + "mv_cross_join", + Arc::new(MockMaterializedView { + table_path: ListingTableUrl::parse("s3://mv_cross_join/").unwrap(), + partition_columns: vec!["column1".to_string()], + static_partition_columns: None, + query: plan, + file_ext: ".parquet", + }), + )?; + + // Add 
file metadata for the MV + context.sql( + "INSERT INTO file_metadata VALUES + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)" + ).await?.collect().await?; + + // Get dependencies + let df = context + .sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')") + .await?; + let batches = df.collect().await?; + + // Print the actual dependencies for debugging + println!("Actual dependencies:"); + println!("{}", pretty_format_batches(&batches)?); + + // Expected: Only t1 files should be in dependencies, NOT t3 files + // This test currently FAILS because t3 files are incorrectly included + let expected = [ + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://mv_cross_join/column1=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://mv_cross_join/column1=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://mv_cross_join/column1=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) 
+ } + #[test] fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> { // This test simulates a simplified MV scenario: From 4539acf8fa56c27570f56f559cd969434e50b009 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 15 Dec 2025 11:02:51 +0800 Subject: [PATCH 10/17] prevent rewriting strict inequality to closed interval for non-discrete types (#21) (#108) Co-authored-by: Matt Friede <7852262+Friede80@users.noreply.github.com> --- src/rewrite/normal_form.rs | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index c09f18c..7203cca 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -527,8 +527,8 @@ impl Predicate { // so handling of open intervals is done by adding/subtracting the smallest increment. // However, there is not really a public API to do this, // other than the satisfy_greater method. - Operator::Lt => Ok( - match satisfy_greater( + Operator::Lt => { + let range_val = match satisfy_greater( &Interval::try_new(value.clone(), value.clone())?, &Interval::make_unbounded(&value.data_type())?, true, @@ -538,11 +538,19 @@ impl Predicate { *range = None; return Ok(()); } - }, - ), - // Same thing as above. - Operator::Gt => Ok( - match satisfy_greater( + }; + // If the type is not discrete (e.g. Utf8), satisfy_greater may return an unchanged value. 
+ // This means the interval could not be tightened and it is unsafe to produce a closed interval + if range_val.upper() == &value { + Err(DataFusionError::Plan( + "cannot represent strict inequality as closed interval for non-discrete types".to_string(), + )) + } else { + Ok(range_val) + } + } + Operator::Gt => { + let range_val = match satisfy_greater( &Interval::make_unbounded(&value.data_type())?, &Interval::try_new(value.clone(), value.clone())?, true, @@ -552,8 +560,15 @@ impl Predicate { *range = None; return Ok(()); } - }, - ), + }; + if range_val.lower() == &value { + Err(DataFusionError::Plan( + "cannot represent strict inequality as closed interval for non-discrete types".to_string(), + )) + } else { + Ok(range_val) + } + } _ => Err(DataFusionError::Plan( "unsupported binary expression".to_string(), )), @@ -1154,6 +1169,11 @@ mod test { // we are allowed to substitute column1 for column3 and vice versa. "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", }, + TestCase { + name: "range filter with inequality on non-discrete type", + base: "SELECT * FROM t1", + query: "SELECT column1 FROM t1 WHERE column1 < '2022'", + }, TestCase { name: "duplicate expressions (X-209)", base: "SELECT * FROM t1", From 0d1aefae252ab808088a3ca378694e7d5a9f09da Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Fri, 19 Dec 2025 22:22:46 +0800 Subject: [PATCH 11/17] Expose mv_plans for ViewMatcher (#22) (#109) --- src/rewrite/exploitation.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index b353a84..9a05638 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -114,6 +114,11 @@ impl ViewMatcher { Ok(ViewMatcher { mv_plans }) } + + /// Returns the materialized views and their corresponding normal forms. 
+ pub fn mv_plans(&self) -> &HashMap, SpjNormalForm)> { + &self.mv_plans + } } impl OptimizerRule for ViewMatcher { From 547aa4d70371226503cb08a58b2db29f716e339f Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 12 Jan 2026 23:31:11 +0800 Subject: [PATCH 12/17] Upgrade DF52 (#111) * Upgrade DF52 * update test * use 52 --- Cargo.toml | 24 ++++++++++++------------ src/materialized/dependencies.rs | 29 +++++++++++++++-------------- src/rewrite/normal_form.rs | 4 ++-- 3 files changed, 29 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b6fc338..041add6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,23 +29,23 @@ rust-version = "1.85.1" [dependencies] aquamarine = "0.6.0" -arrow = "57.0.0" -arrow-schema = "57.0.0" +arrow = "57.1.0" +arrow-schema = "57.1.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "51" -datafusion-common = "51" -datafusion-expr = "51" -datafusion-functions = "51" -datafusion-functions-aggregate = "51" -datafusion-optimizer = "51" -datafusion-physical-expr = "51" -datafusion-physical-plan = "51" -datafusion-sql = "51" +datafusion = "52" +datafusion-common = "52" +datafusion-expr = "52" +datafusion-functions = "52" +datafusion-functions-aggregate = "52" +datafusion-optimizer = "52" +datafusion-physical-expr = "52" +datafusion-physical-plan = "52" +datafusion-sql = "52" futures = "0.3" itertools = "0.14" log = "0.4" -object_store = "0.12" +object_store = "0.12.4" ordered-float = "5.0.0" [dev-dependencies] diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 50f806c..7295b36 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -860,7 +860,7 @@ fn project_dfschema(schema: &DFSchema, indices: &HashSet) -> Result range, }; - if other_range.contains(range)? != Interval::CERTAINLY_TRUE { + if other_range.contains(range)? != Interval::TRUE { return Ok(None); } - if range.contains(other_range)? != Interval::CERTAINLY_TRUE { + if range.contains(other_range)? 
!= Interval::TRUE { if !(range.lower().is_null() || range.upper().is_null()) && (range.lower().eq(range.upper())) { From 4071577345a1ba932017609ca2cecaa58adcfc34 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 12 Jan 2026 23:48:08 +0800 Subject: [PATCH 13/17] Expose a `get_mv_candidates_for_table` API for ViewMatcher (#112) * Expose a get_mv_candidates_for_table API for ViewMatcher * refine comments --- src/rewrite/exploitation.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 9a05638..997d28a 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -119,6 +119,43 @@ impl ViewMatcher { pub fn mv_plans(&self) -> &HashMap, SpjNormalForm)> { &self.mv_plans } + + /// Returns materialized views that potentially reference the given table. + /// + /// This is a preliminary filter - it only checks if the MV references the table + /// but does NOT guarantee that the MV can actually be used to rewrite queries + /// involving that table. 
+ /// + /// # Arguments + /// + /// * `table_reference` - The table reference to find candidates for + /// + /// # Returns + /// + /// A vector of tuples containing: + /// - The materialized view's table reference + /// - The materialized view's table provider + /// - The materialized view's SPJ normal form + pub fn get_potential_mv_candidates_for_table( + &self, + table_reference: &TableReference, + ) -> Vec<(TableReference, Arc, &SpjNormalForm)> { + self.mv_plans + .iter() + .filter_map(|(mv_table_ref, (mv_provider, mv_normal_form))| { + // Check if this MV references the target table + if mv_normal_form.referenced_tables().contains(table_reference) { + Some(( + mv_table_ref.clone(), + Arc::clone(mv_provider), + mv_normal_form, + )) + } else { + None + } + }) + .collect() + } } impl OptimizerRule for ViewMatcher { From 02bad143cfc7a9867c1db2b317e91826c2780e22 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 20 Jan 2026 13:45:13 +0800 Subject: [PATCH 14/17] perf: single-pass plan traversal in Predicate::new (#113) * perf: single-pass plan traversal in Predicate::new * address comments * add join error test --- src/rewrite/normal_form.rs | 235 +++++++++++++++++++++++++------------ 1 file changed, 163 insertions(+), 72 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 5590862..195f8da 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -233,22 +233,11 @@ impl SpjNormalForm { .map(|expr| predicate.normalize_expr(expr)) .collect(); - let mut referenced_tables = vec![]; - original_plan - .apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - referenced_tables.push(scan.table_name.clone()); - } - - Ok(TreeNodeRecursion::Continue) - }) - // No chance of error since we never return Err -- this unwrap is safe - .unwrap(); - Ok(Self { output_schema: Arc::clone(original_plan.schema()), output_exprs, - referenced_tables, + // Reuse referenced_tables collected during Predicate::new to 
avoid extra traversal + referenced_tables: predicate.referenced_tables.clone(), predicate, }) } @@ -344,84 +333,95 @@ struct Predicate { ranges_by_equivalence_class: Vec>, /// Filter expressions that aren't column equality predicates or range filters. residuals: HashSet, + /// Tables referenced in this plan (collected during single-pass traversal) + referenced_tables: Vec, } impl Predicate { + /// Create a new Predicate by analyzing the given logical plan. + /// Uses single-pass traversal to collect schema, columns, filters, and referenced tables. fn new(plan: &LogicalPlan) -> Result { let mut schema = DFSchema::empty(); - plan.apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - let new_schema = DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - scan.source.schema().as_ref(), - )?; - schema = if schema.fields().is_empty() { - new_schema - } else { - schema.join(&new_schema)? - } - } + let mut columns_info: Vec<(Column, arrow::datatypes::DataType)> = Vec::new(); + let mut filters: Vec = Vec::new(); + let mut referenced_tables: Vec = Vec::new(); + + // Single traversal to collect everything + plan.apply(|node| { + match node { + LogicalPlan::TableScan(scan) => { + // Collect referenced table + referenced_tables.push(scan.table_name.clone()); - Ok(TreeNodeRecursion::Continue) - })?; + // Build schema + let new_schema = DFSchema::try_from_qualified_schema( + scan.table_name.clone(), + scan.source.schema().as_ref(), + )?; + + // Collect columns with their data types + for (table_ref, field) in new_schema.iter() { + columns_info.push(( + Column::new(table_ref.cloned(), field.name()), + field.data_type().clone(), + )); + } - let mut new = Self { - schema, - eq_classes: vec![], - eq_class_idx_by_column: HashMap::default(), - ranges_by_equivalence_class: vec![], - residuals: HashSet::new(), - }; + // Merge schema + schema = if schema.fields().is_empty() { + new_schema + } else { + schema.join(&new_schema)? 
+ }; - // Collect all referenced columns - plan.apply(|plan| { - if let LogicalPlan::TableScan(scan) = plan { - for (i, (table_ref, field)) in DFSchema::try_from_qualified_schema( - scan.table_name.clone(), - scan.source.schema().as_ref(), - )? - .iter() - .enumerate() - { - let column = Column::new(table_ref.cloned(), field.name()); - let data_type = field.data_type(); - new.eq_classes - .push(ColumnEquivalenceClass::new_singleton(column.clone())); - new.eq_class_idx_by_column.insert(column, i); - new.ranges_by_equivalence_class - .push(Some(Interval::make_unbounded(data_type)?)); + // Collect filters from TableScan + filters.extend(scan.filters.iter().cloned()); + } + LogicalPlan::Filter(filter) => { + filters.push(filter.predicate.clone()); } - } - - Ok(TreeNodeRecursion::Continue) - })?; - - // Collect any filters - plan.apply(|plan| { - let filters = match plan { - LogicalPlan::TableScan(scan) => scan.filters.as_slice(), - LogicalPlan::Filter(filter) => core::slice::from_ref(&filter.predicate), LogicalPlan::Join(_join) => { return Err(DataFusionError::Internal( "joins are not supported yet".to_string(), - )) + )); } - LogicalPlan::Projection(_) => &[], + LogicalPlan::Projection(_) => {} _ => { return Err(DataFusionError::Plan(format!( "unsupported logical plan: {}", - plan.display() - ))) + node.display() + ))); } - }; - - for expr in filters.iter().flat_map(split_conjunction) { - new.insert_conjuct(expr)?; } - Ok(TreeNodeRecursion::Continue) })?; + // Initialize data structures with known capacity + let num_columns = columns_info.len(); + let mut eq_classes = Vec::with_capacity(num_columns); + let mut eq_class_idx_by_column = HashMap::with_capacity(num_columns); + let mut ranges_by_equivalence_class = Vec::with_capacity(num_columns); + + for (i, (column, data_type)) in columns_info.into_iter().enumerate() { + eq_classes.push(ColumnEquivalenceClass::new_singleton(column.clone())); + eq_class_idx_by_column.insert(column, i); + 
ranges_by_equivalence_class.push(Some(Interval::make_unbounded(&data_type)?)); + } + + let mut new = Self { + schema, + eq_classes, + eq_class_idx_by_column, + ranges_by_equivalence_class, + residuals: HashSet::new(), + referenced_tables, + }; + + // Process all collected filters + for expr in filters.iter().flat_map(split_conjunction) { + new.insert_conjuct(expr)?; + } + Ok(new) } @@ -1163,11 +1163,11 @@ mod test { TestCase { name: "range filter + equality predicate", base: - "SELECT column1, column2 FROM t1 WHERE column1 = column3 AND column1 >= '2022'", + "SELECT column1, column2 FROM t1 WHERE column1 = column3 AND column1 >= '2022'", query: // Since column1 = column3 in the original view, // we are allowed to substitute column1 for column3 and vice versa. - "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", + "SELECT column2, column3 FROM t1 WHERE column1 = column3 AND column3 >= '2023'", }, TestCase { name: "range filter with inequality on non-discrete type", @@ -1229,4 +1229,95 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_predicate_new_collects_expected_data() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a table with known schema + ctx.sql( + "CREATE TABLE test_table ( + col1 INT, + col2 VARCHAR, + col3 DOUBLE + )", + ) + .await? + .collect() + .await?; + + // Create a plan with filters + let plan = ctx + .sql("SELECT col1, col2 FROM test_table WHERE col1 >= 10 AND col2 = col3") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify referenced_tables is collected + assert_eq!(normal_form.referenced_tables().len(), 1); + assert_eq!(normal_form.referenced_tables()[0].to_string(), "test_table"); + + // Verify output_exprs matches the projection (2 columns) + assert_eq!(normal_form.output_exprs().len(), 2); + + // Verify schema is preserved + assert_eq!(normal_form.output_schema().fields().len(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_predicate_new_with_join_returns_error() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE t1 (a INT, b INT)") + .await? + .collect() + .await?; + ctx.sql("CREATE TABLE t2 (c INT, d INT)") + .await? + .collect() + .await?; + + // Test that join returns an error as it's not supported yet + let plan = ctx + .sql("SELECT t1.a, t2.d FROM t1 JOIN t2 ON t1.b = t2.c WHERE t1.a >= 0 AND t2.d <= 100") + .await? + .into_optimized_plan()?; + + let result = SpjNormalForm::new(&plan); + + // Verify that join returns an error + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("joins are not supported yet")); + + Ok(()) + } + + #[tokio::test] + async fn test_predicate_new_with_range_filters() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE range_test (x INT, y INT, z VARCHAR)") + .await? + .collect() + .await?; + + let plan = ctx + .sql("SELECT * FROM range_test WHERE x >= 10 AND x <= 100 AND y = 50") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify all columns are in output + assert_eq!(normal_form.output_exprs().len(), 3); + assert_eq!(normal_form.referenced_tables().len(), 1); + + Ok(()) + } } From 429e52ac740ea7d0c9f43580084e7a6960562f27 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 3 Feb 2026 14:29:56 +0800 Subject: [PATCH 15/17] Optimize rewrite performance (#115) * Optimize rewrite performanc * Add test * fmt * Address comments to add details --- src/rewrite/normal_form.rs | 161 ++++++++++++++++++++++++++++++++----- 1 file changed, 142 insertions(+), 19 deletions(-) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 195f8da..70fd129 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -247,32 +247,50 @@ impl SpjNormalForm { /// This is useful for rewriting queries to use materialized views. pub fn rewrite_from( &self, - mut other: &Self, + other: &Self, qualifier: TableReference, source: Arc, ) -> Result> { log::trace!("rewriting from {qualifier}"); + + // Cache columns() result to avoid repeated Vec allocation in the loop. + // DFSchema::columns() creates a new Vec on each call. + let output_columns = self.output_schema.columns(); + let mut new_output_exprs = Vec::with_capacity(self.output_exprs.len()); // check that our output exprs are sub-expressions of the other one's output exprs for (i, output_expr) in self.output_exprs.iter().enumerate() { - let new_output_expr = other - .predicate - .normalize_expr(output_expr.clone()) - .rewrite(&mut other)? - .data; - - // Check that all references to the original tables have been replaced. 
- // All remaining column expressions should be unqualified, which indicates - // that they refer to the output of the sub-plan (in this case the view) - if new_output_expr - .column_refs() - .iter() - .any(|c| c.relation.is_some()) - { - return Ok(None); - } + // Fast path for simple Column expressions (most common case). + // This avoids the expensive normalize_expr transform for columns. + let new_output_expr = if let Expr::Column(col) = output_expr { + let normalized_col = other.predicate.normalize_column(col); + match other.find_output_column(&normalized_col) { + Some(rewritten) => rewritten, + None => return Ok(None), // Column not found, can't rewrite + } + } else { + // Slow path: complex expressions need full transform + let new_output_expr = other + .predicate + .normalize_expr(output_expr.clone()) + .rewrite(&mut &*other)? + .data; + + // Check that all references to the original tables have been replaced. + // All remaining column expressions should be unqualified, which indicates + // that they refer to the output of the sub-plan (in this case the view) + if new_output_expr + .column_refs() + .iter() + .any(|c| c.relation.is_some()) + { + return Ok(None); + } + new_output_expr + }; - let column = &self.output_schema.columns()[i]; + // Use cached columns instead of calling .columns() on each iteration + let column = &output_columns[i]; new_output_exprs.push( new_output_expr.alias_qualified(column.relation.clone(), column.name.clone()), ); @@ -299,7 +317,7 @@ impl SpjNormalForm { .into_iter() .chain(range_filters) .chain(residual_filters) - .map(|expr| expr.rewrite(&mut other).unwrap().data) + .map(|expr| expr.rewrite(&mut &*other).unwrap().data) .reduce(|a, b| a.and(b)); if all_filters @@ -318,6 +336,20 @@ impl SpjNormalForm { builder.project(new_output_exprs)?.build().map(Some) } + + /// Fast path: find a column in output_exprs and return rewritten expression. + /// This avoids full tree traversal for simple column lookups. 
+ #[inline] + fn find_output_column(&self, col: &Column) -> Option { + self.output_exprs + .iter() + .position(|e| matches!(e, Expr::Column(c) if c == col)) + .map(|idx| { + Expr::Column(Column::new_unqualified( + self.output_schema.field(idx).name().clone(), + )) + }) + } } /// Stores information on filters from a Select-Project-Join plan. @@ -431,6 +463,17 @@ impl Predicate { .and_then(|&idx| self.eq_classes.get(idx)) } + /// Fast path: normalize a single Column without full tree traversal. + /// This is O(1) lookup instead of O(n) transform. + #[inline] + fn normalize_column(&self, col: &Column) -> Column { + if let Some(eq_class) = self.class_for_column(col) { + eq_class.columns.first().unwrap().clone() + } else { + col.clone() + } + } + /// Add a new column equivalence fn add_equivalence(&mut self, c1: &Column, c2: &Column) -> Result<()> { match ( @@ -792,6 +835,15 @@ impl Predicate { /// Rewrite all expressions in terms of their normal representatives /// with respect to this predicate's equivalence classes. fn normalize_expr(&self, e: Expr) -> Expr { + // Fast path: if it's a simple Column, avoid full transform traversal. + // Even though transform() handles Column efficiently, the machinery setup + // (closures, iterators, Transformed wrappers) has overhead that adds up + // when called thousands of times (e.g., 41 columns × 5-7 MVs × every query). + // Direct HashMap lookup + clone is significantly faster. + if let Expr::Column(ref c) = e { + return Expr::Column(self.normalize_column(c)); + } + e.transform(&|e| { let c = match e { Expr::Column(c) => c, @@ -1320,4 +1372,75 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_normalize_column_fast_path() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE t (a INT, b INT, c INT)") + .await? + .collect() + .await?; + + // Query with column equivalence: a = b + let plan = ctx + .sql("SELECT a, b, c FROM t WHERE a = b") + .await? 
+ .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify that columns are normalized correctly + // a and b should be in the same equivalence class + assert_eq!(normal_form.output_exprs().len(), 3); + + Ok(()) + } + + #[tokio::test] + async fn test_rewrite_from_with_many_columns() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a wide table to test the columns() caching optimization + ctx.sql( + "CREATE TABLE wide_table ( + c0 INT, c1 INT, c2 INT, c3 INT, c4 INT, + c5 INT, c6 INT, c7 INT, c8 INT, c9 INT + )", + ) + .await? + .collect() + .await?; + + let base_plan = ctx + .sql("SELECT * FROM wide_table WHERE c0 >= 0") + .await? + .into_optimized_plan()?; + + let query_plan = ctx + .sql("SELECT c0, c1, c2 FROM wide_table WHERE c0 >= 10") + .await? + .into_optimized_plan()?; + + let base_nf = SpjNormalForm::new(&base_plan)?; + let query_nf = SpjNormalForm::new(&query_plan)?; + + // Create MV table + ctx.sql("CREATE TABLE mv AS SELECT * FROM wide_table WHERE c0 >= 0") + .await? 
+ .collect() + .await?; + + let table_ref = TableReference::bare("mv"); + let provider = ctx.table_provider(table_ref.clone()).await?; + + // Test that rewrite_from works correctly with cached columns + let result = query_nf.rewrite_from(&base_nf, table_ref, provider_as_source(provider))?; + + assert!(result.is_some()); + let rewritten = result.unwrap(); + assert_eq!(rewritten.schema().fields().len(), 3); + + Ok(()) + } } From 152c88542d90e474d9cd4ae4c63c3c71aa6a3675 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Mon, 9 Mar 2026 15:21:10 +0800 Subject: [PATCH 16/17] Support view matcher with boolean binary operation (#117) --- src/rewrite/normal_form.rs | 208 +++++++++++++++++++++++++++++++++++++ 1 file changed, 208 insertions(+) diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 70fd129..b5f2343 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -656,6 +656,27 @@ impl Predicate { fn insert_binary_expr(&mut self, left: &Expr, op: Operator, right: &Expr) -> Result<()> { match (left, op, right) { (Expr::Column(c), op, Expr::Literal(v, _)) => { + // Normalize boolean expressions to canonical form: + // col = false -> NOT col + // col != true -> NOT col + // col = true -> col + // col != false -> col + // This ensures semantic equivalence matching (e.g., "active = false" matches "NOT active") + if let ScalarValue::Boolean(Some(b)) = v { + match (op, b) { + (Operator::Eq, false) | (Operator::NotEq, true) => { + self.residuals + .insert(Expr::Not(Box::new(Expr::Column(c.clone())))); + return Ok(()); + } + (Operator::Eq, true) | (Operator::NotEq, false) => { + self.residuals.insert(Expr::Column(c.clone())); + return Ok(()); + } + _ => {} + } + } + if let Err(e) = self.add_range(c, &op, v) { // Add a range can fail in some cases, so just fallthrough log::debug!("failed to add range filter: {e}"); @@ -1443,4 +1464,191 @@ mod test { Ok(()) } + + #[tokio::test] + async fn test_boolean_expression_normalization() 
-> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + // Create table with boolean column + ctx.sql( + "CREATE TABLE bool_test ( + id INT, + active BOOLEAN, + name VARCHAR + )", + ) + .await? + .collect() + .await?; + + ctx.sql("INSERT INTO bool_test VALUES (1, true, 'a'), (2, false, 'b')") + .await? + .collect() + .await?; + + // MV: uses "active = false" + let mv_plan = ctx + .sql("SELECT * FROM bool_test WHERE active = false") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv AS SELECT * FROM bool_test WHERE active = false") + .await? + .collect() + .await?; + + // Query: uses "NOT active" (semantically equivalent to "active = false") + let query_plan = ctx + .sql("SELECT id, name FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + assert!( + rewritten.is_some(), + "Expected MV with 'active = false' to match query with 'NOT active'" + ); + + // Also test the reverse: MV with "NOT active", query with "active = false" + let mv_plan2 = ctx + .sql("SELECT * FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let mv_normal_form2 = SpjNormalForm::new(&mv_plan2)?; + + ctx.sql("CREATE TABLE mv2 AS SELECT * FROM bool_test WHERE NOT active") + .await? + .collect() + .await?; + + let query_plan2 = ctx + .sql("SELECT id FROM bool_test WHERE active = false") + .await? 
+ .into_optimized_plan()?; + let query_normal_form2 = SpjNormalForm::new(&query_plan2)?; + + let table_ref2 = TableReference::bare("mv2"); + let rewritten2 = query_normal_form2.rewrite_from( + &mv_normal_form2, + table_ref2.clone(), + provider_as_source(ctx.table_provider(table_ref2).await?), + )?; + + assert!( + rewritten2.is_some(), + "Expected MV with 'NOT active' to match query with 'active = false'" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_boolean_column_normalization() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + ctx.sql( + "CREATE TABLE bool_test ( + id INT, + active BOOLEAN, + name VARCHAR + )", + ) + .await? + .collect() + .await?; + + // Test: MV with "active = false" should match query with "NOT active" + let mv_plan = ctx + .sql("SELECT * FROM bool_test WHERE active = false") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv AS SELECT * FROM bool_test WHERE active = false") + .await? + .collect() + .await?; + + let query_plan = ctx + .sql("SELECT id, name FROM bool_test WHERE NOT active") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + // Should successfully rewrite + assert!( + rewritten.is_some(), + "Expected MV with 'active = false' to match query with 'NOT active'" + ); + + Ok(()) + } + + #[tokio::test] + async fn test_boolean_true_normalization() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let ctx = SessionContext::new(); + + ctx.sql( + "CREATE TABLE bool_test2 ( + id INT, + enabled BOOLEAN + )", + ) + .await? 
+ .collect() + .await?; + + // Test: MV with "enabled = true" should match query with just "enabled" + let mv_plan = ctx + .sql("SELECT * FROM bool_test2 WHERE enabled = true") + .await? + .into_optimized_plan()?; + let mv_normal_form = SpjNormalForm::new(&mv_plan)?; + + ctx.sql("CREATE TABLE mv2 AS SELECT * FROM bool_test2 WHERE enabled = true") + .await? + .collect() + .await?; + + let query_plan = ctx + .sql("SELECT id FROM bool_test2 WHERE enabled") + .await? + .into_optimized_plan()?; + let query_normal_form = SpjNormalForm::new(&query_plan)?; + + let table_ref = TableReference::bare("mv2"); + let rewritten = query_normal_form.rewrite_from( + &mv_normal_form, + table_ref.clone(), + provider_as_source(ctx.table_provider(table_ref).await?), + )?; + + assert!( + rewritten.is_some(), + "Expected MV with 'enabled = true' to match query with 'enabled'" + ); + + Ok(()) + } } From 2acb13a52ec4bb3eeb90fd717f65e6a694de9574 Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Sun, 17 May 2026 21:28:57 +0800 Subject: [PATCH 17/17] Upgrade materialized views to DataFusion 53 Pin DataFusion dependencies to the Massive DataFusion 53 fork commit and update Arrow/ObjectStore versions for compatibility. Preserve Massive fork behavior by carrying rewrite context into OneOf/OneOfExec cost selection and allowing OneOf schema comparison to ignore nullability differences. Adapt to DataFusion/ObjectStore 53 execution plan APIs and unnest metadata structs before final dependency projection to avoid DF53 get_field/list optimizer ambiguity. 
--- Cargo.toml | 26 +-- src/materialized/dependencies.rs | 24 ++- src/materialized/file_metadata.rs | 8 +- src/rewrite/exploitation.rs | 275 +++++++++++++++++++++++++++--- 4 files changed, 287 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 041add6..2323c3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,27 +25,27 @@ authors = ["Matthew Cramerus "] license = "Apache-2.0" description = "Materialized Views & Query Rewriting in DataFusion" keywords = ["arrow", "arrow-rs", "datafusion"] -rust-version = "1.85.1" +rust-version = "1.88.0" [dependencies] aquamarine = "0.6.0" -arrow = "57.1.0" -arrow-schema = "57.1.0" +arrow = "58.0.0" +arrow-schema = "58.0.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "52" -datafusion-common = "52" -datafusion-expr = "52" -datafusion-functions = "52" -datafusion-functions-aggregate = "52" -datafusion-optimizer = "52" -datafusion-physical-expr = "52" -datafusion-physical-plan = "52" -datafusion-sql = "52" +datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", 
rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } futures = "0.3" itertools = "0.14" log = "0.4" -object_store = "0.12.4" +object_store = "0.13.1" ordered-float = "5.0.0" [dev-dependencies] diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 7295b36..f188c0d 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -42,13 +42,13 @@ use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, datasource::{provider_as_source, TableProvider, ViewTable}, - prelude::{flatten, get_field, make_array}, + prelude::{flatten, make_array}, }; use datafusion_common::{ alias::AliasGenerator, internal_err, tree_node::{Transformed, TreeNode}, - DFSchema, DataFusionError, Result, ScalarValue, + Column as DFColumn, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, @@ -400,17 +400,25 @@ pub fn mv_dependencies_plan( .into_iter() .find(|c| c.name.starts_with(META_COLUMN)) .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?; - let files_col = Expr::Column(files.clone()); + let meta_table_catalog = + Expr::Column(DFColumn::from_name(format!("{}.table_catalog", files.name))); + let meta_table_schema = + Expr::Column(DFColumn::from_name(format!("{}.table_schema", files.name))); + let meta_table_name = Expr::Column(DFColumn::from_name(format!("{}.table_name", files.name))); + let meta_source_uri = Expr::Column(DFColumn::from_name(format!("{}.source_uri", files.name))); + let meta_last_modified = + Expr::Column(DFColumn::from_name(format!("{}.last_modified", files.name))); LogicalPlanBuilder::from(pruned_plan_with_source_files) + .unnest_column(files.clone())? .unnest_column(files)? 
.project(vec![ construct_target_path_from_static_partition_columns(materialized_view).alias("target"), - get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), - get_field(files_col.clone(), "table_schema").alias("source_table_schema"), - get_field(files_col.clone(), "table_name").alias("source_table_name"), - get_field(files_col.clone(), "source_uri").alias("source_uri"), - get_field(files_col.clone(), "last_modified").alias("source_last_modified"), + meta_table_catalog.alias("source_table_catalog"), + meta_table_schema.alias("source_table_schema"), + meta_table_name.alias("source_table_name"), + meta_source_uri.alias("source_uri"), + meta_last_modified.alias("source_last_modified"), ])? .distinct()? .build() diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index bca69ea..baf4cab 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -43,7 +43,7 @@ use futures::stream::{self, BoxStream}; use futures::{future, Future, FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use std::any::Any; use std::sync::Arc; @@ -137,7 +137,7 @@ impl TableProvider for FileMetadata { /// An [`ExecutionPlan`] that scans object store metadata. 
pub struct FileMetadataExec { table_schema: SchemaRef, - plan_properties: PlanProperties, + plan_properties: Arc, projection: Option>, filters: Vec>, limit: Option, @@ -170,7 +170,7 @@ impl FileMetadataExec { let exec = Self { table_schema, - plan_properties, + plan_properties: Arc::new(plan_properties), projection, filters, limit, @@ -192,7 +192,7 @@ impl ExecutionPlan for FileMetadataExec { "FileMetadataExec" } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 997d28a..efa2b4b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -69,10 +69,55 @@ use crate::materialized::cast_to_materialized; use super::normal_form::SpjNormalForm; use super::QueryRewriteOptions; +/// Logical rewrite metadata propagated alongside equivalent candidate plans. +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Hash)] +pub struct RewriteContext { + root_table_refs: Vec, +} + +impl RewriteContext { + /// Create a new rewrite context from the root table refs visible during rewrite. + pub fn new(root_table_refs: Vec) -> Self { + Self { root_table_refs } + } + + /// Returns the root table refs that produced this rewrite opportunity. + pub fn root_table_refs(&self) -> &[String] { + &self.root_table_refs + } +} + +/// Inputs provided to a cost function when selecting the best candidate plan. +pub struct CostContext<'a> { + candidate_plans: Box + 'a>, + rewrite_context: &'a RewriteContext, +} + +impl<'a> CostContext<'a> { + /// Create a new cost context. + pub fn new( + candidate_plans: Box + 'a>, + rewrite_context: &'a RewriteContext, + ) -> Self { + Self { + candidate_plans, + rewrite_context, + } + } + + /// Consume the context and return the candidate plans iterator. + pub fn into_candidate_plans(self) -> Box + 'a> { + self.candidate_plans + } + + /// Returns rewrite metadata for the current candidate set. 
+ pub fn rewrite_context(&self) -> &RewriteContext { + self.rewrite_context + } +} + /// A cost function. Used to evaluate the best physical plan among multiple equivalent choices. -pub type CostFn = Arc< - dyn for<'a> Fn(Box + 'a>) -> Vec + Send + Sync, ->; +pub type CostFn = Arc Fn(CostContext<'a>) -> Vec + Send + Sync>; /// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes. #[derive(Debug)] @@ -251,9 +296,10 @@ impl TreeNodeRewriter for ViewMatchingRewriter<'_> { } else { Ok(Transformed::new( LogicalPlan::Extension(Extension { - node: Arc::new(OneOf { - branches: Some(node).into_iter().chain(candidates).collect_vec(), - }), + node: Arc::new(OneOf::with_rewrite_context( + Some(node).into_iter().chain(candidates).collect_vec(), + RewriteContext::new(vec![table_reference.to_string()]), + )), }), true, TreeNodeRecursion::Jump, @@ -306,14 +352,21 @@ impl ExtensionPlanner for ViewExploitationPlanner { physical_inputs: &[Arc], _session_state: &SessionState, ) -> Result>> { - if node.as_any().downcast_ref::().is_none() { + let Some(one_of) = node.as_any().downcast_ref::() else { return Ok(None); - } + }; + // Different table providers may expose equivalent fields with different + // nullability. Names and data types still have to match. 
if logical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != logical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability( + schema.as_arrow(), + logical_inputs[0].schema().as_arrow(), + ) + }) { return Err(DataFusionError::Plan( "candidate logical plans should have the same schema".to_string(), @@ -323,7 +376,9 @@ impl ExtensionPlanner for ViewExploitationPlanner { if physical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != physical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema()) + }) { return Err(DataFusionError::Plan( "candidate physical plans should have the same schema".to_string(), @@ -334,6 +389,7 @@ impl ExtensionPlanner for ViewExploitationPlanner { physical_inputs.to_vec(), None, Arc::clone(&self.cost), + one_of.rewrite_context().clone(), )?))) } } @@ -343,6 +399,30 @@ impl ExtensionPlanner for ViewExploitationPlanner { #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct OneOf { branches: Vec, + rewrite_context: RewriteContext, +} + +impl OneOf { + /// Create a new OneOf node with the given branches. + pub fn new(branches: Vec) -> Self { + Self::with_rewrite_context(branches, RewriteContext::default()) + } + + /// Create a new OneOf node with the given branches and rewrite context. + pub fn with_rewrite_context( + branches: Vec, + rewrite_context: RewriteContext, + ) -> Self { + Self { + branches, + rewrite_context, + } + } + + /// Returns logical rewrite metadata for this candidate set. 
+ pub fn rewrite_context(&self) -> &RewriteContext { + &self.rewrite_context + } } impl UserDefinedLogicalNodeCore for OneOf { @@ -378,7 +458,10 @@ impl UserDefinedLogicalNodeCore for OneOf { _exprs: Vec, inputs: Vec, ) -> Result { - Ok(Self { branches: inputs }) + Ok(Self { + branches: inputs, + rewrite_context: self.rewrite_context.clone(), + }) } } @@ -394,6 +477,7 @@ pub struct OneOfExec { best: usize, // Cost function to use in optimization cost: CostFn, + rewrite_context: RewriteContext, } impl std::fmt::Debug for OneOfExec { @@ -402,6 +486,7 @@ impl std::fmt::Debug for OneOfExec { .field("candidates", &self.candidates) .field("required_input_ordering", &self.required_input_ordering) .field("best", &self.best) + .field("rewrite_context", &self.rewrite_context) .finish_non_exhaustive() } } @@ -412,6 +497,7 @@ impl OneOfExec { candidates: Vec>, required_input_ordering: Option, cost: CostFn, + rewrite_context: RewriteContext, ) -> Result { if candidates.is_empty() { return Err(DataFusionError::Plan( @@ -419,16 +505,20 @@ impl OneOfExec { )); } - let best = cost(Box::new(candidates.iter().map(|c| c.as_ref()))) - .iter() - .position_min_by_key(|&cost| OrderedFloat(*cost)) - .unwrap(); + let best = cost(CostContext::new( + Box::new(candidates.iter().map(|c| c.as_ref())), + &rewrite_context, + )) + .iter() + .position_min_by_key(|&cost| OrderedFloat(*cost)) + .unwrap(); Ok(Self { candidates, required_input_ordering, best, cost, + rewrite_context, }) } @@ -438,6 +528,11 @@ impl OneOfExec { Arc::clone(&self.candidates[self.best]) } + /// Returns rewrite metadata for this candidate set. + pub fn rewrite_context(&self) -> &RewriteContext { + &self.rewrite_context + } + /// Modify this plan's required input ordering. 
/// Used for sort pushdown pub fn with_required_input_ordering(self, requirement: Option) -> Self { @@ -457,7 +552,7 @@ impl ExecutionPlan for OneOfExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.candidates[self.best].properties() } @@ -489,6 +584,7 @@ impl ExecutionPlan for OneOfExec { children, self.required_input_ordering.clone(), Arc::clone(&self.cost), + self.rewrite_context.clone(), )?)) } @@ -504,10 +600,6 @@ impl ExecutionPlan for OneOfExec { self.candidates[self.best].execute(partition, context) } - fn statistics(&self) -> Result { - self.candidates[self.best].partition_statistics(None) - } - fn partition_statistics( &self, partition: Option, @@ -518,7 +610,10 @@ impl ExecutionPlan for OneOfExec { impl DisplayAs for OneOfExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let costs = (self.cost)(Box::new(self.children().iter().map(|arc| arc.as_ref()))); + let costs = (self.cost)(CostContext::new( + Box::new(self.children().iter().map(|arc| arc.as_ref())), + &self.rewrite_context, + )); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( @@ -580,3 +675,141 @@ impl PhysicalOptimizerRule for PruneCandidates { true } } + +/// Compare two Arrow schemas ignoring field nullability. 
+fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool { + a.fields().len() == b.fields().len() + && a.fields() + .iter() + .zip(b.fields().iter()) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + +#[cfg(test)] +mod tests_nullability { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + #[test] + fn schemas_equal_when_only_nullability_differs() { + let a = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, false), + Field::new("date", DataType::Utf8, false), + Field::new("price", DataType::Float64, false), + ]); + let b = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, true), + Field::new("date", DataType::Utf8, true), + Field::new("price", DataType::Float64, true), + ]); + assert!(schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_types_differ() { + let a = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + let b = Schema::new(vec![Field::new("x", DataType::Int64, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_names_differ() { + let a = Schema::new(vec![Field::new("ticker", DataType::Utf8, true)]); + let b = Schema::new(vec![Field::new("symbol", DataType::Utf8, true)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_field_count_differs() { + let a = Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + ]); + let b = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } +} + +#[cfg(test)] +mod tests_rewrite_context { + use super::*; + use arrow_schema::Schema; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion_expr::LogicalPlanBuilder; + use std::sync::Mutex; + + #[test] + fn one_of_preserves_rewrite_context_when_rebuilt() { + let plan = 
LogicalPlanBuilder::empty(false) + .build() + .expect("empty plan"); + let one_of = OneOf::with_rewrite_context( + vec![plan.clone()], + RewriteContext::new(vec!["catalog.schema.root_table".to_string()]), + ); + + let rebuilt = + UserDefinedLogicalNodeCore::with_exprs_and_inputs(&one_of, vec![], vec![plan]) + .expect("rebuild one_of"); + + assert_eq!( + rebuilt.rewrite_context().root_table_refs(), + ["catalog.schema.root_table".to_string()] + ); + } + + #[test] + fn one_of_exec_passes_rewrite_context_to_cost_function() { + let seen = Arc::new(Mutex::new(Vec::::new())); + let seen_clone = Arc::clone(&seen); + let cost: CostFn = Arc::new(move |ctx| { + *seen_clone.lock().expect("lock seen") = + ctx.rewrite_context().root_table_refs().to_vec(); + ctx.into_candidate_plans().map(|_| 1.0).collect() + }); + let context = RewriteContext::new(vec!["catalog.schema.root_table".to_string()]); + let schema = Arc::new(Schema::empty()); + let candidates = vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(schema)) as Arc, + ]; + + let exec = + OneOfExec::try_new(candidates, None, cost, context.clone()).expect("one_of exec"); + + assert_eq!(exec.rewrite_context(), &context); + assert_eq!(*seen.lock().expect("lock seen"), context.root_table_refs()); + } + + #[test] + fn one_of_exec_with_new_children_preserves_rewrite_context() { + let cost: CostFn = Arc::new(|ctx| ctx.into_candidate_plans().map(|_| 1.0).collect()); + let context = RewriteContext::new(vec!["catalog.schema.root_table".to_string()]); + let schema = Arc::new(Schema::empty()); + let exec = Arc::new( + OneOfExec::try_new( + vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + ], + None, + cost, + context.clone(), + ) + .expect("one_of exec"), + ); + + let rebuilt = exec + .with_new_children(vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(schema)) as Arc, + ]) + .expect("rebuild 
exec"); + let rebuilt = rebuilt + .as_any() + .downcast_ref::() + .expect("expected OneOfExec"); + + assert_eq!(rebuilt.rewrite_context(), &context); + } +}