Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 150 additions & 1 deletion src/rewrite/normal_form.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ As one can see, all compensating filters are included, and the query only uses t

use std::{
collections::{BTreeSet, HashMap, HashSet},
fmt,
sync::Arc,
};

Expand Down Expand Up @@ -225,6 +226,14 @@ impl SpjNormalForm {
&self.referenced_tables
}

/// The normalized predicate of this plan (column equivalence classes,
/// per-class range intervals, and residual filter expressions). Useful
/// for surfacing the SPJ normal form via catalog metadata or for
/// debugging why view matching does or does not engage on a given query.
pub fn predicate(&self) -> &Predicate {
&self.predicate
}

/// Analyze an existing `LogicalPlan` and rewrite it in select-project-join normal form.
pub fn new(original_plan: &LogicalPlan) -> Result<Self> {
let predicate = Predicate::new(original_plan)?;
Expand Down Expand Up @@ -353,8 +362,21 @@ impl SpjNormalForm {
}

/// Stores information on filters from a Select-Project-Join plan.
///
/// Built from the original `LogicalPlan` in [`SpjNormalForm::new`] and
/// exposed read-only via [`SpjNormalForm::predicate`]. Internally it
/// groups filters into three buckets that the view-matching subsumption
/// tests consume:
///
/// 1. column equivalence classes (`col_a = col_b` chains),
/// 2. per-class range intervals (e.g. `col_a >= 5`), and
/// 3. residual filter expressions that don't fit the first two buckets.
///
/// The internal fields stay private; consumers can render a human-readable
/// summary via the [`Display`](std::fmt::Display) impl, or reach for the
/// underlying `LogicalPlan` if they need raw access.
#[derive(Debug, Clone)]
struct Predicate {
pub struct Predicate {
/// Full table schema, including all possible columns.
schema: DFSchema,
/// List of column equivalence classes.
Expand All @@ -369,6 +391,81 @@ struct Predicate {
referenced_tables: Vec<TableReference>,
}

/// Renders the predicate as an AND-joined list of filter conditions.
///
/// If any equivalence class has an empty range (stored as `None` after a
/// `Interval::intersect` collapsed to the empty set), the predicate is
/// unsatisfiable and renders as a single `FALSE` -- otherwise downstream
/// consumers would see an unsatisfiable filter as if it had no constraint
/// at all, which is misleading.
///
/// For a satisfiable predicate, three sources are emitted in order:
/// 1. Pairwise equalities derived from each column equivalence class (e.g.
/// `t.a = t.b` for a class `{a, b}`); singleton classes emit nothing.
/// 2. Narrowed range intervals per equivalence class. Classes whose
/// interval is still the default unbounded interval (no filter applied)
/// are skipped, so only meaningful constraints surface.
/// 3. Residual filter expressions, rendered via their `Expr` `Display`
/// and sorted for deterministic output across runs.
///
/// Intended for human inspection / catalog surfacing rather than as a
/// canonical SQL form -- callers that need the original `Expr`s should
/// reach for the underlying logical plan instead.
impl fmt::Display for Predicate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Any None range means a prior intersection produced the empty set,
// i.e. the predicate is unsatisfiable. Bail with FALSE so consumers
// don't see an unsatisfiable predicate as "no constraints".
if self.ranges_by_equivalence_class.iter().any(|r| r.is_none()) {
return write!(f, "FALSE");
}

let mut parts: Vec<String> = Vec::new();

for eq_class in &self.eq_classes {
let mut cols = eq_class.columns.iter();
let Some(first) = cols.next() else {
debug_assert!(false, "empty ColumnEquivalenceClass");
continue;
};
for other in cols {
parts.push(format!("{first} = {other}"));
}
}

for (idx, range) in self.ranges_by_equivalence_class.iter().enumerate() {
// None was handled above; this is the satisfiable path only.
let Some(interval) = range else { continue };
let Some(eq_class) = self.eq_classes.get(idx) else {
debug_assert!(false, "ranges/eq_classes length mismatch at {idx}");
continue;
};
let Some(col) = eq_class.columns.iter().next() else {
debug_assert!(false, "empty ColumnEquivalenceClass at {idx}");
continue;
};
let Ok(field) = self.schema.field_from_column(col) else {
debug_assert!(false, "column {col} missing from predicate schema");
continue;
};
let Ok(unbounded) = Interval::make_unbounded(field.data_type()) else {
debug_assert!(false, "make_unbounded failed for {}", field.data_type());
continue;
};
if interval == &unbounded {
continue;
}
parts.push(format!("{col} in {interval}"));
}

let mut residuals: Vec<String> = self.residuals.iter().map(|e| e.to_string()).collect();
residuals.sort();
parts.extend(residuals);

write!(f, "{}", parts.join(" AND "))
}
}

impl Predicate {
/// Create a new Predicate by analyzing the given logical plan.
/// Uses single-pass traversal to collect schema, columns, filters, and referenced tables.
Expand Down Expand Up @@ -1651,4 +1748,56 @@ mod test {

Ok(())
}

#[tokio::test]
async fn test_predicate_getter_and_display() -> Result<()> {
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE t (a INT, b INT, c VARCHAR)")
.await?
.collect()
.await?;

// Residual: `c LIKE 'foo%'` falls outside eq classes / ranges and
// should land in `residuals`. The eq `a = b` populates an
// equivalence class, and `a >= 5` narrows its range.
let plan = ctx
.sql("SELECT a FROM t WHERE a = b AND a >= 5 AND c LIKE 'foo%'")
.await?
.into_optimized_plan()?;
let normal_form = SpjNormalForm::new(&plan)?;

// Getter returns a reference to the same Predicate that drives
// subsumption tests internally; callers can stringify or inspect
// it for catalog surfacing / debugging.
let rendered = normal_form.predicate().to_string();

// Display output is AND-joined; check each expected fragment
// independently because residuals are hash-set ordered before the
// internal sort, so positional matching would be brittle.
assert!(
rendered.contains("t.a = t.b"),
"expected equality from class {{a, b}}, got: {rendered}"
);
// `a >= 5` should narrow the equivalence class range away from
// the default unbounded interval, so Display emits the column +
// its non-default interval. Match liberally on `t.a` followed by
// a literal `5` so we don't depend on the exact `Interval`
// Display formatting.
let has_range =
rendered.contains("t.a") && rendered.contains("in ") && rendered.contains('5');
assert!(
has_range,
"expected a narrowed range constraint from `a >= 5` to appear, got: {rendered}"
);
assert!(
rendered.contains("t.c LIKE"),
"expected LIKE residual, got: {rendered}"
);
assert!(
!rendered.is_empty(),
"predicate Display should not be empty when filters exist"
);
Comment on lines +1796 to +1799

Ok(())
}
}
Loading