diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/cube_ref_evaluator.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/cube_ref_evaluator.rs index a2134362f62b7..f04a747c44d9d 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/cube_ref_evaluator.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/cube_ref_evaluator.rs @@ -1,4 +1,4 @@ -use super::sql_nodes::SqlNode; +use super::sql_nodes::NodeProcessor; use super::SqlEvaluatorVisitor; use crate::planner::query_tools::QueryTools; use crate::planner::sql_call::CubeRef; @@ -27,7 +27,7 @@ impl CubeRefEvaluator { &self, cube_ref: &CubeRef, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, ) -> Result { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_filter.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_filter.rs index adf806d12e43b..50c67d862b83c 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_filter.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_filter.rs @@ -1,5 +1,5 @@ use super::ToSql; -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::filter::typed_filter::resolve_base_symbol; use crate::planner::filter::BaseFilter; @@ -13,7 +13,7 @@ impl ToSql for BaseFilter { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, filters_ctx: &FiltersContext, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_segment.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_segment.rs index ffde3b8a05d86..5714a0664227a 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_segment.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/base_segment.rs @@ -1,5 +1,5 @@ use super::ToSql; -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::filter::BaseSegment; use crate::planner::query_tools::QueryTools; @@ -12,7 +12,7 @@ impl ToSql for BaseSegment { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, _query_tools: Rc, templates: &PlanSqlTemplates, _filters_ctx: &FiltersContext, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/filter.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/filter.rs index 91f01262eee85..4963949cc09a6 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/filter.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/filter.rs @@ -1,5 +1,5 @@ use super::ToSql; -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::filter::{Filter, FilterItem}; use crate::planner::query_tools::QueryTools; @@ -12,7 +12,7 @@ impl ToSql for FilterItem { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, filters_ctx: &FiltersContext, @@ -62,7 +62,7 @@ impl ToSql for Filter { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, filters_ctx: &FiltersContext, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/operators/measure_filter.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/operators/measure_filter.rs index cdb8eb6187f6c..17ee025c679e1 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/operators/measure_filter.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/operators/measure_filter.rs @@ -1,4 +1,4 @@ -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::filter::operators::measure_filter::MeasureFilterOp; use crate::planner::query_tools::QueryTools; @@ -12,7 +12,7 @@ impl MeasureFilterOp { &self, member_evaluator: &Rc, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, plan_templates: &PlanSqlTemplates, ) -> Result { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/to_sql.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/to_sql.rs index a44f62127d570..28c1a66096238 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/to_sql.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/to_sql.rs @@ -1,4 +1,4 @@ -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; @@ -10,7 +10,7 @@ pub trait ToSql { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, filters_ctx: &FiltersContext, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/typed_filter.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/typed_filter.rs index e1a407c95a7a1..a28806a62b700 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/typed_filter.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/filter/typed_filter.rs @@ -1,7 +1,7 @@ use super::operators::{FilterOperationSql, FilterSqlContext}; use super::ToSql; use crate::cube_bridge::member_sql::FilterParamsColumn; -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::filter::typed_filter::{resolve_base_symbol, FilterOp, TypedFilter}; use crate::planner::query_tools::QueryTools; @@ -14,7 +14,7 @@ impl ToSql for TypedFilter { fn to_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, filters_ctx: &FiltersContext, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/auto_prefix.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/auto_prefix.rs deleted file mode 100644 index 8fd8350fbec90..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/auto_prefix.rs +++ /dev/null @@ -1,119 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::Schema; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use lazy_static::lazy_static; -use regex::Regex; -use std::any::Any; -use std::collections::HashMap; -use std::rc::Rc; - -pub struct AutoPrefixSqlNode { - input: Rc, - cube_references: HashMap, - schema: Rc, -} - -impl AutoPrefixSqlNode { - pub fn new(input: Rc, cube_references: HashMap) -> Rc { - Rc::new(Self { - input, - cube_references, - schema: Rc::new(Schema::empty()), - }) - } - - pub fn new_with_schema( - input: Rc, - cube_references: HashMap, - schema: Rc, - ) -> Rc { - Rc::new(Self { - input, - schema, - cube_references, - }) - } - - pub fn input(&self) -> &Rc { - &self.input - } - - pub fn schema(&self) -> &Rc { - &self.schema - } - - pub fn cube_references(&self) -> &HashMap { - &self.cube_references - } - - fn resolve_cube_alias(&self, name: &String) -> String { - if let Some(alias) = self.cube_references.get(name) { - alias.clone() - } else { - name.clone() - } - } - - pub fn auto_prefix_with_cube_name( - &self, - cube_name: &str, - sql: &str, - templates: &PlanSqlTemplates, - ) -> Result { - lazy_static! { - static ref SINGLE_MEMBER_RE: Regex = Regex::new(r"^[_a-zA-Z][_a-zA-Z0-9]*$").unwrap(); - } - let res = if SINGLE_MEMBER_RE.is_match(sql) { - format!( - "{}.{}", - templates.quote_identifier(&PlanSqlTemplates::alias_name(&cube_name))?, - sql - ) - } else { - sql.to_string() - }; - Ok(res) - } -} - -impl SqlNode for AutoPrefixSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let input = self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let res = match node.as_ref() { - MemberSymbol::Dimension(ev) => { - let cube_alias = self.resolve_cube_alias(&ev.cube_name()); - self.auto_prefix_with_cube_name(&cube_alias, &input, templates)? - } - MemberSymbol::Measure(ev) => { - let cube_alias = self.resolve_cube_alias(&ev.cube_name()); - self.auto_prefix_with_cube_name(&cube_alias, &input, templates)? - } - _ => input, - }; - Ok(res) - } - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/calendar_time_shift.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/calendar_time_shift.rs deleted file mode 100644 index c421ddee3a33f..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/calendar_time_shift.rs +++ /dev/null @@ -1,108 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::CalendarDimensionTimeShift; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::collections::HashMap; -use std::rc::Rc; - -pub struct CalendarTimeShiftSqlNode { - shifts: HashMap, // Key is the full pk name of the calendar cube - input: Rc, -} - -impl CalendarTimeShiftSqlNode { - pub fn new( - shifts: HashMap, - input: Rc, - ) -> Rc { - Rc::new(Self { shifts, input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for CalendarTimeShiftSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Dimension(ev) => { - if !ev.is_reference() { - if let Some(shift) = self.shifts.get(&ev.full_name()) { - if let Some(sql) = &shift.sql { - sql.eval( - visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )? - } else if let Some(interval) = &shift.interval { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let res = templates - .add_timestamp_interval(input, interval.inverse().to_sql())?; - format!("({})", res) - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/cube_calc_groups.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/cube_calc_groups.rs deleted file mode 100644 index 27b5bfe4d55c4..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/cube_calc_groups.rs +++ /dev/null @@ -1,114 +0,0 @@ -use super::SqlNode; -use crate::planner::query_tools::QueryTools; -use crate::planner::MemberSymbol; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::sql_templates::structs::{TemplateCalcGroup, TemplateCalcSingleValue}; -use crate::planner::sql_templates::PlanSqlTemplates; -use cubenativeutils::CubeError; -use std::any::Any; -use std::collections::HashMap; -use std::rc::Rc; - -#[derive(Clone, Debug)] -pub struct CalcGroupItem { - pub name: String, - pub values: Vec, -} - -#[derive(Default, Clone, Debug)] -pub struct CalcGroupsItems { - items: HashMap>, -} - -impl CalcGroupsItems { - pub fn add(&mut self, cube_name: String, dimension_name: String, values: Vec) { - let items = self.items.entry(cube_name).or_default(); - if !items.iter().any(|itm| itm.name == dimension_name) { - items.push(CalcGroupItem { - name: dimension_name, - values, - }) - } - } - - pub fn get(&self, cube_name: &str) -> Option<&Vec> { - self.items.get(cube_name) - } - - pub fn is_empty(&self) -> bool { - self.items.is_empty() - } -} - -pub struct CubeCalcGroupsSqlNode { - input: Rc, - items: CalcGroupsItems, -} - -impl CubeCalcGroupsSqlNode { - pub fn new(input: Rc, items: CalcGroupsItems) -> Rc { - Rc::new(Self { input, items }) - } -} - -impl SqlNode for CubeCalcGroupsSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let input = self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let res = match node.as_ref() { - MemberSymbol::CubeTable(ev) => { - let res = if let Some(calc_groups) = self.items.get(ev.cube_name()) { - let mut single_values = vec![]; - let mut template_groups = vec![]; - for calc_group in calc_groups { - if calc_group.values.len() == 1 { - single_values.push(TemplateCalcSingleValue { - name: calc_group.name.clone(), - value: calc_group.values[0].clone(), - }) - } else { - template_groups.push(TemplateCalcGroup { - name: calc_group.name.clone(), - alias: format!("{}_values", calc_group.name), - values: calc_group.values.clone(), - }) - } - } - let res = templates.calc_groups_join( - &ev.cube_name(), - &input, - single_values, - template_groups, - )?; - format!("({})", res) - } else { - input - }; - - res - } - _ => input, - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/evaluate_sql.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/evaluate_sql.rs deleted file mode 100644 index 8871b58aa1952..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/evaluate_sql.rs +++ /dev/null @@ -1,47 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::symbols::{MemberSqlContext, ToSql}; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct EvaluateSqlNode {} - -impl EvaluateSqlNode { - pub fn new() -> Rc { - Rc::new(Self {}) - } -} - -impl SqlNode for EvaluateSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let path = node.compiled_path(); - let ctx = MemberSqlContext { - visitor, - node_processor: &node_processor, - query_tools: &query_tools, - templates, - name: path.name(), - full_name: path.full_name(), - }; - node.as_ref().to_sql(&ctx) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/factory.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/factory.rs index bfcad5b6fe073..08021385e2864 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/factory.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/factory.rs @@ -1,18 +1,124 @@ -use super::{ - AutoPrefixSqlNode, CaseSqlNode, EvaluateSqlNode, FinalMeasureSqlNode, - FinalPreAggregationMeasureSqlNode, GeoDimensionSqlNode, MaskedSqlNode, MeasureFilterSqlNode, - MultiStageRankNode, MultiStageWindowNode, ParenthesizeSqlNode, RenderReferencesSqlNode, - RenderReferencesType, RollingWindowNode, RootSqlNode, SqlNode, TimeDimensionNode, - TimeShiftSqlNode, UngroupedMeasureSqlNode, UngroupedQueryFinalMeasureSqlNode, -}; +use super::{NodeProcessor, Op, RenderReferencesType}; use crate::physical_plan::cube_ref_evaluator::CubeRefEvaluator; -use crate::physical_plan::sql_nodes::calendar_time_shift::CalendarTimeShiftSqlNode; use crate::physical_plan::sql_nodes::RenderReferences; use crate::planner::planners::multi_stage::TimeShiftState; use crate::planner::symbols::CalendarDimensionTimeShift; use std::collections::{HashMap, HashSet}; use std::rc::Rc; +/// Prepend an outer op in front of `tail` so the resulting `Vec` runs +/// `outer` first and then continues into the previously built pipeline. +fn prepend(outer: Op, tail: Vec) -> Vec { + let mut ops = Vec::with_capacity(tail.len() + 1); + ops.push(outer); + ops.extend(tail); + ops +} + +fn op_paren(tail: Vec) -> Vec { + prepend(Op::parenthesize(), tail) +} + +fn op_auto_prefix(tail: Vec, cube_references: HashMap) -> Vec { + prepend(Op::auto_prefix(cube_references), tail) +} + +fn op_measure_filter(tail: Vec) -> Vec { + prepend(Op::measure_filter(), tail) +} + +fn op_geo_dimension(tail: Vec) -> Vec { + prepend(Op::geo_dimension(), tail) +} + +fn op_render_references(tail: Vec, references: RenderReferences) -> Vec { + prepend(Op::render_references(references), tail) +} + +fn op_masked(tail: Vec, ungrouped: bool) -> Vec { + prepend(Op::masked(ungrouped), tail) +} + +fn op_case(tail: Vec) -> Vec { + prepend(Op::case(), tail) +} + +fn op_dispatch_by_kind( + dimension: Vec, + time_dimension: Vec, + measure: Vec, + default: Vec, +) -> Vec { + vec![Op::dispatch_by_kind( + dimension, + time_dimension, + measure, + default, + )] +} + +fn op_final_measure( + tail: Vec, + rendered_as_multiplied_measures: HashSet, + count_approx_as_state: bool, +) -> Vec { + prepend( + Op::final_measure(rendered_as_multiplied_measures, count_approx_as_state), + tail, + ) +} + +fn op_final_pre_aggregation_measure(tail: Vec, references: RenderReferences) -> Vec { + prepend(Op::final_pre_aggregation_measure(references), tail) +} + +fn op_ungrouped_measure(tail: Vec) -> Vec { + prepend(Op::ungrouped_measure(), tail) +} + +fn op_ungrouped_query_final_measure(tail: Vec) -> Vec { + prepend(Op::ungrouped_query_final_measure(), tail) +} + +fn op_time_dimension(dimensions_with_ignored_timezone: HashSet, tail: Vec) -> Vec { + prepend(Op::time_dimension(dimensions_with_ignored_timezone), tail) +} + +fn op_time_shift(shifts: TimeShiftState, tail: Vec) -> Vec { + prepend(Op::time_shift(shifts), tail) +} + +fn op_calendar_time_shift( + shifts: HashMap, + tail: Vec, +) -> Vec { + prepend(Op::calendar_time_shift(shifts), tail) +} + +fn op_multi_stage_rank(tail: Vec, partition: Vec) -> Vec { + prepend(Op::multi_stage_rank(partition), tail) +} + +fn op_multi_stage_window( + multi_stage_input: Vec, + else_pipeline: Vec, + partition: Vec, +) -> Vec { + vec![Op::multi_stage_window( + multi_stage_input, + else_pipeline, + partition, + )] +} + +fn op_rolling_window(input_pipeline: Vec, default_pipeline: Vec) -> Vec { + vec![Op::rolling_window(input_pipeline, default_pipeline)] +} + +/// Configuration carrier for assembling a query's render pipeline. Options +/// (time shifts, multi-stage settings, pre-aggregation references, masked +/// measures, …) accumulate via `set_*`/`add_*` methods and feed into +/// [`Self::default_node_processor`]. #[derive(Clone, Default)] pub struct SqlNodesFactory { time_shifts: TimeShiftState, @@ -150,64 +256,59 @@ impl SqlNodesFactory { ) } - pub fn default_node_processor(&self) -> Rc { - let evaluate_sql_processor = MaskedSqlNode::new(EvaluateSqlNode::new()); - let auto_prefix_processor = AutoPrefixSqlNode::new( + /// Build the `NodeProcessor` for the current configuration. + pub fn default_node_processor(&self) -> Rc { + let evaluate_sql_processor = op_masked(vec![Op::evaluate_symbol()], false); + let auto_prefix_processor = op_auto_prefix( evaluate_sql_processor.clone(), self.cube_name_references.clone(), ); - let parenthesize_processor: Rc = - ParenthesizeSqlNode::new(auto_prefix_processor.clone()); + let parenthesize_processor = op_paren(auto_prefix_processor); - let measure_filter_processor = MeasureFilterSqlNode::new(parenthesize_processor.clone()); - let measure_processor = CaseSqlNode::new(measure_filter_processor.clone()); + let measure_filter_processor = op_measure_filter(parenthesize_processor); + let measure_processor = op_case(measure_filter_processor.clone()); let measure_processor = self.add_ungrouped_measure_reference_if_needed(measure_processor); let measure_processor = self.final_measure_node_processor(measure_processor); - // Wrap the entire measure chain with MaskedSqlNode so masked measures + // Wrap the entire measure chain with a Masked op so masked measures // are intercepted before aggregation/ungrouped wrapping. - let measure_processor = if self.ungrouped || self.ungrouped_measure { - MaskedSqlNode::new_ungrouped(measure_processor) - } else { - MaskedSqlNode::new(measure_processor) - }; - let measure_processor = self - .add_multi_stage_window_if_needed(measure_processor, measure_filter_processor.clone()); + let measure_processor = + op_masked(measure_processor, self.ungrouped || self.ungrouped_measure); + let measure_processor = + self.add_multi_stage_window_if_needed(measure_processor, measure_filter_processor); let measure_processor = self.add_multi_stage_rank_if_needed(measure_processor); - let default_processor: Rc = - if !self.pre_aggregation_dimensions_references.is_empty() { - RenderReferencesSqlNode::new( - evaluate_sql_processor.clone(), - self.pre_aggregation_dimensions_references.clone(), - ) - } else { - evaluate_sql_processor.clone() - }; - let default_processor: Rc = ParenthesizeSqlNode::new(default_processor); + let default_processor = if !self.pre_aggregation_dimensions_references.is_empty() { + op_render_references( + evaluate_sql_processor.clone(), + self.pre_aggregation_dimensions_references.clone(), + ) + } else { + evaluate_sql_processor.clone() + }; + let default_processor = op_paren(default_processor); - let root_node = RootSqlNode::new( + let root_ops = op_dispatch_by_kind( self.dimension_processor(evaluate_sql_processor.clone()), - self.time_dimension_processor(ParenthesizeSqlNode::new(evaluate_sql_processor.clone())), - measure_processor.clone(), + self.time_dimension_processor(op_paren(evaluate_sql_processor)), + measure_processor, default_processor, ); - RenderReferencesSqlNode::new(root_node, self.render_references.clone()) + let root_ops = op_render_references(root_ops, self.render_references.clone()); + NodeProcessor::new(root_ops) } - fn add_ungrouped_measure_reference_if_needed( - &self, - default: Rc, - ) -> Rc { + + fn add_ungrouped_measure_reference_if_needed(&self, default: Vec) -> Vec { if !self.ungrouped_measure_references.is_empty() { - RenderReferencesSqlNode::new(default, self.ungrouped_measure_references.clone()) + op_render_references(default, self.ungrouped_measure_references.clone()) } else { default } } - fn add_multi_stage_rank_if_needed(&self, default: Rc) -> Rc { + fn add_multi_stage_rank_if_needed(&self, default: Vec) -> Vec { if let Some(partition_by) = &self.multi_stage_rank { - MultiStageRankNode::new(default, partition_by.clone()) + op_multi_stage_rank(default, partition_by.clone()) } else { default } @@ -215,29 +316,29 @@ impl SqlNodesFactory { fn add_multi_stage_window_if_needed( &self, - default: Rc, - multi_stage_input: Rc, - ) -> Rc { + else_pipeline: Vec, + multi_stage_input: Vec, + ) -> Vec { if let Some(partition_by) = &self.multi_stage_window { - MultiStageWindowNode::new(multi_stage_input, default, partition_by.clone()) + op_multi_stage_window(multi_stage_input, else_pipeline, partition_by.clone()) } else { - default + else_pipeline } } - fn final_measure_node_processor(&self, input: Rc) -> Rc { + fn final_measure_node_processor(&self, input: Vec) -> Vec { if self.ungrouped_measure { - UngroupedMeasureSqlNode::new(input) + op_ungrouped_measure(input) } else if self.ungrouped { - UngroupedQueryFinalMeasureSqlNode::new(input) + op_ungrouped_query_final_measure(input) } else { - let final_processor: Rc = FinalMeasureSqlNode::new( + let final_processor = op_final_measure( input.clone(), self.rendered_as_multiplied_measures.clone(), self.count_approx_as_state, ); let final_processor = if !self.pre_aggregation_measures_references.is_empty() { - FinalPreAggregationMeasureSqlNode::new( + op_final_pre_aggregation_measure( final_processor, self.pre_aggregation_measures_references.clone(), ) @@ -245,49 +346,39 @@ impl SqlNodesFactory { final_processor }; if self.rolling_window { - RollingWindowNode::new(input, final_processor) + op_rolling_window(input, final_processor) } else { final_processor } } } - fn dimension_processor(&self, input: Rc) -> Rc { + fn dimension_processor(&self, input: Vec) -> Vec { let input = if !self.pre_aggregation_dimensions_references.is_empty() { - RenderReferencesSqlNode::new(input, self.pre_aggregation_dimensions_references.clone()) + op_render_references(input, self.pre_aggregation_dimensions_references.clone()) } else { - let input: Rc = GeoDimensionSqlNode::new(input); - let input: Rc = CaseSqlNode::new(input); - input + let input = op_geo_dimension(input); + op_case(input) }; - let input: Rc = - AutoPrefixSqlNode::new(input, self.cube_name_references.clone()); - - let input: Rc = ParenthesizeSqlNode::new(input); - - let input: Rc = - TimeDimensionNode::new(self.dimensions_with_ignored_timezone.clone(), input); + let input = op_auto_prefix(input, self.cube_name_references.clone()); + let input = op_paren(input); + let input = op_time_dimension(self.dimensions_with_ignored_timezone.clone(), input); let input = if !self.calendar_time_shifts.is_empty() { - CalendarTimeShiftSqlNode::new(self.calendar_time_shifts.clone(), input) + op_calendar_time_shift(self.calendar_time_shifts.clone(), input) } else { input }; - let input = if !self.time_shifts.is_empty() { - TimeShiftSqlNode::new(self.time_shifts.clone(), input) + if !self.time_shifts.is_empty() { + op_time_shift(self.time_shifts.clone(), input) } else { input - }; - - input + } } - fn time_dimension_processor(&self, input: Rc) -> Rc { - let input: Rc = - TimeDimensionNode::new(self.dimensions_with_ignored_timezone.clone(), input); - - input + fn time_dimension_processor(&self, input: Vec) -> Vec { + op_time_dimension(self.dimensions_with_ignored_timezone.clone(), input) } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_measure.rs deleted file mode 100644 index 71c6579ec41e7..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_measure.rs +++ /dev/null @@ -1,100 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::AggregateWrap; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::collections::HashSet; -use std::rc::Rc; - -pub struct FinalMeasureSqlNode { - input: Rc, - rendered_as_multiplied_measures: HashSet, - count_approx_as_state: bool, -} - -impl FinalMeasureSqlNode { - pub fn new( - input: Rc, - rendered_as_multiplied_measures: HashSet, - count_approx_as_state: bool, - ) -> Rc { - Rc::new(Self { - input, - rendered_as_multiplied_measures, - count_approx_as_state, - }) - } - - pub fn input(&self) -> &Rc { - &self.input - } - - fn apply_wrap( - &self, - wrap: AggregateWrap, - input: String, - templates: &PlanSqlTemplates, - ) -> Result { - match wrap { - AggregateWrap::PassThrough => Ok(input), - AggregateWrap::Function(name) => Ok(format!("{}({})", name, input)), - AggregateWrap::CountDistinct => templates.count_distinct(&input), - AggregateWrap::CountDistinctApprox => { - if self.count_approx_as_state { - templates.hll_init(input) - } else { - templates.count_distinct_approx(input) - } - } - } - } -} - -impl SqlNode for FinalMeasureSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(ev) => { - let is_multiplied = self - .rendered_as_multiplied_measures - .contains(&ev.full_name()); - let wrap = ev.kind().aggregate_wrap(is_multiplied); - let child_visitor = match wrap { - AggregateWrap::PassThrough => visitor.clone(), - _ => visitor.with_arg_needs_paren_safe(false), - }; - let input = self.input.to_sql( - &child_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - self.apply_wrap(wrap, input, templates)? - } - _ => { - return Err(CubeError::internal(format!( - "Measure filter node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_pre_aggregation_measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_pre_aggregation_measure.rs deleted file mode 100644 index e650233c4b46c..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/final_pre_aggregation_measure.rs +++ /dev/null @@ -1,93 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::sql_nodes::RenderReferences; -use crate::physical_plan::sql_nodes::RenderReferencesType; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::AggregateWrap; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct FinalPreAggregationMeasureSqlNode { - input: Rc, - references: RenderReferences, -} - -impl FinalPreAggregationMeasureSqlNode { - pub fn new(input: Rc, references: RenderReferences) -> Rc { - Rc::new(Self { input, references }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for FinalPreAggregationMeasureSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(ev) => { - if let Some(reference) = self.references.get(&node.full_name()) { - match reference { - RenderReferencesType::QualifiedColumnName(column_name) => { - let table_ref = if let Some(table_name) = column_name.source() { - format!("{}.", templates.quote_identifier(table_name)?) - } else { - format!("") - }; - let pre_aggregation_measure = format!( - "{}{}", - table_ref, - templates.quote_identifier(&column_name.name())? - ); - match ev.kind().pre_aggregate_wrap() { - AggregateWrap::CountDistinctApprox => { - templates.count_distinct_approx(pre_aggregation_measure)? - } - AggregateWrap::Function(name) => { - format!("{}({})", name, pre_aggregation_measure) - } - _ => format!("sum({})", pre_aggregation_measure), - } - } - RenderReferencesType::LiteralValue(value) => { - templates.quote_string(value)? - } - RenderReferencesType::RawReferenceValue(value) => value.clone(), - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "final preaggregation measure node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/geo_dimension.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/geo_dimension.rs deleted file mode 100644 index f09bac3323cfd..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/geo_dimension.rs +++ /dev/null @@ -1,77 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::DimensionKind; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct GeoDimensionSqlNode { - input: Rc, -} - -impl GeoDimensionSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for GeoDimensionSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Dimension(ev) => { - if let DimensionKind::Geo(geo) = ev.kind() { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let latitude_str = geo.latitude().eval( - &inner_visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )?; - let longitude_str = geo.longitude().eval( - &inner_visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )?; - templates.concat_strings(&vec![latitude_str, format!("','"), longitude_str])? - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "GeoDimension node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/masked.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/masked.rs deleted file mode 100644 index af28236221bc2..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/masked.rs +++ /dev/null @@ -1,129 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::filter::ToSql; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::FiltersContext; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct MaskedSqlNode { - input: Rc, - ungrouped: bool, -} - -impl MaskedSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { - input, - ungrouped: false, - }) - } - - pub fn new_ungrouped(input: Rc) -> Rc { - Rc::new(Self { - input, - ungrouped: true, - }) - } - - fn resolve_mask( - &self, - node: &Rc, - visitor: &SqlEvaluatorVisitor, - node_processor: Rc, - query_tools: Rc, - templates: &PlanSqlTemplates, - ) -> Result, CubeError> { - let full_name = node.full_name(); - if !query_tools.is_member_masked(&full_name) { - return Ok(None); - } - - let mask_filter = query_tools.member_mask_filter(&full_name); - - let masked_sql = if let Some(mask_call) = node.mask_sql() { - if self.ungrouped { - if let MemberSymbol::Measure(_) = node.as_ref() { - if mask_call.dependencies_count() > 0 { - return Ok(None); - } - } - } - mask_call.eval( - visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )? - } else { - "(NULL)".to_string() - }; - - let Some(filter_item) = mask_filter else { - return Ok(Some(masked_sql)); - }; - - let original_sql = self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor, - templates, - )?; - // TODO: support FILTER_PARAMS in mask filter SQL by passing - // proper FiltersContext with filter_params_columns. - // Use self.input as node_processor so member references inside the filter - // resolve through the unmasked chain — prevents recursion through MaskedSqlNode - // when the filter member is itself masked. - let filter_sql = filter_item.to_sql( - visitor, - self.input.clone(), - query_tools, - templates, - &FiltersContext::default(), - )?; - if filter_sql.is_empty() { - Ok(Some(masked_sql)) - } else { - Ok(Some(templates.case( - None, - vec![(filter_sql, original_sql)], - Some(masked_sql), - )?)) - } - } -} - -impl SqlNode for MaskedSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - if let Some(masked) = self.resolve_mask( - node, - visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )? { - return Ok(masked); - } - self.input - .to_sql(visitor, node, query_tools, node_processor, templates) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/measure_filter.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/measure_filter.rs deleted file mode 100644 index 2488df55e1173..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/measure_filter.rs +++ /dev/null @@ -1,93 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct MeasureFilterSqlNode { - input: Rc, -} - -impl MeasureFilterSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for MeasureFilterSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(ev) => { - let measure_filters = ev.measure_filters(); - if !measure_filters.is_empty() { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let filters = measure_filters - .iter() - .map(|filter| -> Result { - Ok(format!( - "({})", - filter.eval( - &inner_visitor, - node_processor.clone(), - query_tools.clone(), - templates - )? - )) - }) - .collect::, _>>()? - .join(" AND "); - let result = if input.as_str() == "*" { - "1".to_string() - } else { - input - }; - format!("CASE WHEN {} THEN {} END", filters, result) - } else { - // Passthrough — propagate visitor unchanged. - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "Measure filter node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/mod.rs index 35f81ee5d4708..af185ea7e55c7 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/mod.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/mod.rs @@ -1,44 +1,13 @@ -pub mod auto_prefix; -pub mod calendar_time_shift; -pub mod case; -//pub mod cube_calc_groups; -pub mod evaluate_sql; pub mod factory; -pub mod final_measure; -pub mod final_pre_aggregation_measure; -pub mod geo_dimension; -pub mod masked; -pub mod measure_filter; -pub mod multi_stage_rank; -pub mod multi_stage_window; -pub mod parenthesize; +pub mod op; pub mod render_references; -pub mod rolling_window; -pub mod root_processor; -pub mod sql_node; -pub mod time_dimension; -pub mod time_shift; -pub mod ungroupped_measure; -pub mod ungroupped_query_final_measure; -pub use auto_prefix::AutoPrefixSqlNode; -pub use case::CaseSqlNode; -//pub use cube_calc_groups::CubeCalcGroupsSqlNode; -pub use evaluate_sql::EvaluateSqlNode; pub use factory::SqlNodesFactory; -pub use final_measure::FinalMeasureSqlNode; -pub use final_pre_aggregation_measure::FinalPreAggregationMeasureSqlNode; -pub use geo_dimension::GeoDimensionSqlNode; -pub use masked::MaskedSqlNode; -pub use measure_filter::MeasureFilterSqlNode; -pub use multi_stage_rank::MultiStageRankNode; -pub use multi_stage_window::MultiStageWindowNode; -pub use parenthesize::ParenthesizeSqlNode; +pub use op::{ + AutoPrefixOp, CalendarTimeShiftOp, CaseOp, DispatchByKindOp, EvaluateSymbolOp, FinalMeasureOp, + FinalPreAggregationMeasureOp, GeoDimensionOp, MaskedOp, MeasureFilterOp, MultiStageRankOp, + MultiStageWindowOp, NodeProcessor, Op, OpCtx, OpExec, ParenthesizeOp, RenderReferencesOp, + RollingWindowOp, TimeDimensionOp, TimeShiftOp, UngroupedMeasureOp, + UngroupedQueryFinalMeasureOp, +}; pub use render_references::*; -pub use rolling_window::RollingWindowNode; -pub use root_processor::RootSqlNode; -pub use sql_node::SqlNode; -pub use time_dimension::TimeDimensionNode; -pub use time_shift::TimeShiftSqlNode; -pub use ungroupped_measure::UngroupedMeasureSqlNode; -pub use ungroupped_query_final_measure::UngroupedQueryFinalMeasureSqlNode; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_rank.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_rank.rs deleted file mode 100644 index b290402215eac..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_rank.rs +++ /dev/null @@ -1,97 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::MeasureKind; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct MultiStageRankNode { - else_processor: Rc, - partition: Vec, -} - -impl MultiStageRankNode { - pub fn new(else_processor: Rc, partition: Vec) -> Rc { - Rc::new(Self { - else_processor, - partition, - }) - } - - pub fn else_processor(&self) -> &Rc { - &self.else_processor - } - - pub fn partition(&self) -> &Vec { - &self.partition - } -} - -impl SqlNode for MultiStageRankNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(m) => { - if m.is_multi_stage() && matches!(m.kind(), MeasureKind::Rank) { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let order_by = if !m.measure_order_by().is_empty() { - let sql = m - .measure_order_by() - .iter() - .map(|item| -> Result { - let sql = item.sql_call().eval( - &inner_visitor, - node_processor.clone(), - query_tools.clone(), - templates, - )?; - Ok(format!("{} {}", sql, item.direction())) - }) - .collect::, _>>()? - .join(", "); - format!("ORDER BY {sql}") - } else { - "".to_string() - }; - let partition_by = if self.partition.is_empty() { - "".to_string() - } else { - format!("PARTITION BY {} ", self.partition.join(", ")) - }; - format!("rank() OVER ({partition_by}{order_by})") - } else { - self.else_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "Unexpected evaluation node type for MultStageRankNode" - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.else_processor.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_window.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_window.rs deleted file mode 100644 index 2738e1788e8cd..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/multi_stage_window.rs +++ /dev/null @@ -1,96 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct MultiStageWindowNode { - input: Rc, - else_processor: Rc, - partition: Vec, -} - -impl MultiStageWindowNode { - pub fn new( - input: Rc, - else_processor: Rc, - partition: Vec, - ) -> Rc { - Rc::new(Self { - input, - else_processor, - partition, - }) - } - - pub fn input(&self) -> &Rc { - &self.input - } - - pub fn else_processor(&self) -> &Rc { - &self.else_processor - } - - pub fn partition(&self) -> &Vec { - &self.partition - } -} - -impl SqlNode for MultiStageWindowNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(m) => { - if m.is_multi_stage() && !m.is_calculated() { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input_sql = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - - let partition_by = if self.partition.is_empty() { - "".to_string() - } else { - format!("PARTITION BY {} ", self.partition.join(", ")) - }; - let measure_type = m.measure_type(); - format!("{measure_type}({measure_type}({input_sql})) OVER ({partition_by})") - } else { - self.else_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "Unexpected evaluation node type for MultStageWindowNode" - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone(), self.else_processor.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/auto_prefix_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/auto_prefix_op.rs new file mode 100644 index 0000000000000..f7679658407c2 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/auto_prefix_op.rs @@ -0,0 +1,69 @@ +use crate::planner::sql_templates::PlanSqlTemplates; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use lazy_static::lazy_static; +use regex::Regex; +use std::collections::HashMap; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Qualifies a bare identifier with its cube alias so the column resolves +/// unambiguously when multiple cubes appear in the same query. +#[derive(Clone, Debug)] +pub struct AutoPrefixOp { + cube_references: Rc>, +} + +impl AutoPrefixOp { + pub fn new(cube_references: HashMap) -> Self { + Self { + cube_references: Rc::new(cube_references), + } + } + + fn resolve_cube_alias(&self, name: &str) -> String { + self.cube_references + .get(name) + .cloned() + .unwrap_or_else(|| name.to_string()) + } + + fn auto_prefix_with_cube_name( + &self, + cube_name: &str, + sql: &str, + templates: &PlanSqlTemplates, + ) -> Result { + lazy_static! { + static ref SINGLE_MEMBER_RE: Regex = Regex::new(r"^[_a-zA-Z][_a-zA-Z0-9]*$").unwrap(); + } + if SINGLE_MEMBER_RE.is_match(sql) { + Ok(format!( + "{}.{}", + templates.quote_identifier(&PlanSqlTemplates::alias_name(cube_name))?, + sql + )) + } else { + Ok(sql.to_string()) + } + } +} + +impl OpExec for AutoPrefixOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let input = ctx.render_tail()?; + let res = match ctx.sym.as_ref() { + MemberSymbol::Dimension(ev) => { + let cube_alias = self.resolve_cube_alias(&ev.cube_name()); + self.auto_prefix_with_cube_name(&cube_alias, &input, ctx.templates)? + } + MemberSymbol::Measure(ev) => { + let cube_alias = self.resolve_cube_alias(&ev.cube_name()); + self.auto_prefix_with_cube_name(&cube_alias, &input, ctx.templates)? + } + _ => input, + }; + Ok(res) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/calendar_time_shift_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/calendar_time_shift_op.rs new file mode 100644 index 0000000000000..924a6463ec82a --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/calendar_time_shift_op.rs @@ -0,0 +1,53 @@ +use crate::planner::symbols::CalendarDimensionTimeShift; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::collections::HashMap; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Applies a calendar-cube shift to a dimension: either substitutes a +/// preconfigured SQL expression for it, or moves it by an interval in the +/// inverse direction so calendar joins line up with the requested period. +#[derive(Clone, Debug)] +pub struct CalendarTimeShiftOp { + shifts: Rc>, +} + +impl CalendarTimeShiftOp { + pub fn new(shifts: HashMap) -> Self { + Self { + shifts: Rc::new(shifts), + } + } +} + +impl OpExec for CalendarTimeShiftOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Dimension(ev) = ctx.sym.as_ref() else { + return ctx.render_tail(); + }; + if ev.is_reference() { + return ctx.render_tail(); + } + let Some(shift) = self.shifts.get(&ev.full_name()) else { + return ctx.render_tail(); + }; + if let Some(sql) = &shift.sql { + sql.eval( + &ctx.visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + ) + } else if let Some(interval) = &shift.interval { + let interval_sql = interval.inverse().to_sql(); + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input = ctx.with_visitor(inner_visitor).render_tail()?; + let shifted = ctx.templates.add_timestamp_interval(input, interval_sql)?; + Ok(format!("({})", shifted)) + } else { + ctx.render_tail() + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/case.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/case_op.rs similarity index 52% rename from rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/case.rs rename to rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/case_op.rs index fc8ac0d0d1ac3..83f98d969fd8d 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/case.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/case_op.rs @@ -1,32 +1,26 @@ -use super::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; use crate::planner::symbols::{Case, CaseDefinition, CaseLabel, CaseSwitchDefinition}; use crate::planner::{CaseSwitchItem, MemberSymbol}; use cubenativeutils::CubeError; -use std::any::Any; use std::rc::Rc; -pub struct CaseSqlNode { - input: Rc, -} +use super::{OpCtx, OpExec}; -impl CaseSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } +/// Renders a member that is defined via `case:` / `case_switch:` rules as a +/// SQL `CASE … END`. Members without a case definition fall through to the +/// rest of the pipeline so plain dimensions/measures keep their normal path. +#[derive(Clone, Debug)] +pub struct CaseOp; - pub fn input(&self) -> &Rc { - &self.input - } - - pub fn case_to_sql( - &self, +impl CaseOp { + fn case_to_sql( visitor: &SqlEvaluatorVisitor, case: &CaseDefinition, query_tools: Rc, - node_processor: Rc, + node_processor: Rc, templates: &PlanSqlTemplates, ) -> Result { // All sub-SQLs end up inside `CASE … END` — a safe wrap. @@ -40,7 +34,7 @@ impl CaseSqlNode { templates, )?; let then = match &itm.label { - CaseLabel::String(s) => templates.quote_string(&s)?, + CaseLabel::String(s) => templates.quote_string(s)?, CaseLabel::Sql(sql) => sql.eval( &inner_visitor, node_processor.clone(), @@ -51,7 +45,7 @@ impl CaseSqlNode { when_then.push((when, then)); } let else_label = match &case.else_label { - CaseLabel::String(s) => templates.quote_string(&s)?, + CaseLabel::String(s) => templates.quote_string(s)?, CaseLabel::Sql(sql) => sql.eval( &inner_visitor, node_processor.clone(), @@ -61,17 +55,16 @@ impl CaseSqlNode { }; templates.case(None, when_then, Some(else_label)) } - pub fn case_switch_to_sql( - &self, + + fn case_switch_to_sql( visitor: &SqlEvaluatorVisitor, case: &CaseSwitchDefinition, query_tools: Rc, - node_processor: Rc, + node_processor: Rc, templates: &PlanSqlTemplates, ) -> Result { // Degenerate shortcuts return the inner SQL as-is — propagate the outer - // visitor so an enclosing ParenthesizeSqlNode still sees the compound - // flag. + // visitor so an enclosing ParenthesizeOp still sees the compound flag. if case.items.len() == 1 && case.else_sql.is_none() { return case.items[0].sql.eval( visitor, @@ -97,7 +90,7 @@ impl CaseSqlNode { templates, )?, CaseSwitchItem::Member(member_symbol) => { - inner_visitor.apply(&member_symbol, node_processor.clone(), templates)? + inner_visitor.apply(member_symbol, node_processor.clone(), templates)? } }; let mut when_then = Vec::new(); @@ -125,78 +118,35 @@ impl CaseSqlNode { } } -impl SqlNode for CaseSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Dimension(ev) => { - if let Some(case) = ev.case() { - match case { - Case::Case(case) => { - self.case_to_sql(visitor, case, query_tools, node_processor, templates)? - } - Case::CaseSwitch(case) => self.case_switch_to_sql( - visitor, - case, - query_tools, - node_processor, - templates, - )?, - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - MemberSymbol::Measure(ev) => { - if let Some(case) = ev.case() { - match case { - Case::Case(case) => { - self.case_to_sql(visitor, case, query_tools, node_processor, templates)? - } - Case::CaseSwitch(case) => self.case_switch_to_sql( - visitor, - case, - query_tools, - node_processor, - templates, - )?, - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } +impl OpExec for CaseOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let case_opt: Option<&Case> = match ctx.sym.as_ref() { + MemberSymbol::Dimension(ev) => ev.case(), + MemberSymbol::Measure(ev) => ev.case(), _ => { - return Err(CubeError::internal(format!( - "Case node processor called for wrong node", - ))); + return Err(CubeError::internal( + "Case op called for non-dimension/measure symbol".to_string(), + )); } }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] + let Some(case) = case_opt else { + return ctx.render_tail(); + }; + match case { + Case::Case(c) => Self::case_to_sql( + &ctx.visitor, + c, + ctx.query_tools.clone(), + ctx.node_processor.clone(), + ctx.templates, + ), + Case::CaseSwitch(c) => Self::case_switch_to_sql( + &ctx.visitor, + c, + ctx.query_tools.clone(), + ctx.node_processor.clone(), + ctx.templates, + ), + } } } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/dispatch_by_kind_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/dispatch_by_kind_op.rs new file mode 100644 index 0000000000000..5667e9987cd65 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/dispatch_by_kind_op.rs @@ -0,0 +1,56 @@ +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{Op, OpCtx, OpExec}; + +/// Top-level dispatch over the symbol kind: dimensions, time dimensions, +/// measures and everything else each follow their own rendering pipeline. +/// Discards the tail — each branch is a self-contained pipeline. +#[derive(Clone, Debug)] +pub struct DispatchByKindOp { + dimension: Vec, + time_dimension: Vec, + measure: Vec, + default: Vec, +} + +impl DispatchByKindOp { + pub fn new( + dimension: Vec, + time_dimension: Vec, + measure: Vec, + default: Vec, + ) -> Self { + Self { + dimension, + time_dimension, + measure, + default, + } + } +} + +impl OpExec for DispatchByKindOp { + fn is_terminal(&self) -> bool { + true + } + + fn nested_pipelines(&self) -> Vec<&[Op]> { + vec![ + &self.dimension, + &self.time_dimension, + &self.measure, + &self.default, + ] + } + + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let pipeline = match ctx.sym.as_ref() { + MemberSymbol::Dimension(_) => &self.dimension, + MemberSymbol::TimeDimension(_) => &self.time_dimension, + MemberSymbol::Measure(_) => &self.measure, + _ => &self.default, + }; + ctx.render_pipeline(pipeline) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/evaluate_symbol_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/evaluate_symbol_op.rs new file mode 100644 index 0000000000000..cdc405b7f5f94 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/evaluate_symbol_op.rs @@ -0,0 +1,28 @@ +use crate::physical_plan::symbols::{MemberSqlContext, ToSql}; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Produces the base SQL of a member from its own definition (column ref +/// or `sql:` expression). Terminal step of any rendering pipeline. +#[derive(Clone, Debug)] +pub struct EvaluateSymbolOp; + +impl OpExec for EvaluateSymbolOp { + fn is_terminal(&self) -> bool { + true + } + + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let path = ctx.sym.compiled_path(); + let member_ctx = MemberSqlContext { + visitor: &ctx.visitor, + node_processor: &ctx.node_processor, + query_tools: &ctx.query_tools, + templates: ctx.templates, + name: path.name(), + full_name: path.full_name(), + }; + ctx.sym.as_ref().to_sql(&member_ctx) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_measure_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_measure_op.rs new file mode 100644 index 0000000000000..651524ab14153 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_measure_op.rs @@ -0,0 +1,69 @@ +use crate::planner::sql_templates::PlanSqlTemplates; +use crate::planner::symbols::AggregateWrap; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::collections::HashSet; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Wraps the measure expression with its aggregate (`SUM`/`AVG`/`COUNT +/// DISTINCT`/HLL state init/etc.), choosing the right form when the measure +/// is being rendered against a multiplied join branch. +#[derive(Clone, Debug)] +pub struct FinalMeasureOp { + rendered_as_multiplied_measures: Rc>, + count_approx_as_state: bool, +} + +impl FinalMeasureOp { + pub fn new( + rendered_as_multiplied_measures: HashSet, + count_approx_as_state: bool, + ) -> Self { + Self { + rendered_as_multiplied_measures: Rc::new(rendered_as_multiplied_measures), + count_approx_as_state, + } + } + + fn apply_wrap( + &self, + wrap: AggregateWrap, + input: String, + templates: &PlanSqlTemplates, + ) -> Result { + match wrap { + AggregateWrap::PassThrough => Ok(input), + AggregateWrap::Function(name) => Ok(format!("{}({})", name, input)), + AggregateWrap::CountDistinct => templates.count_distinct(&input), + AggregateWrap::CountDistinctApprox => { + if self.count_approx_as_state { + templates.hll_init(input) + } else { + templates.count_distinct_approx(input) + } + } + } + } +} + +impl OpExec for FinalMeasureOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(ev) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "FinalMeasure op called for non-measure symbol".to_string(), + )); + }; + let is_multiplied = self + .rendered_as_multiplied_measures + .contains(&ev.full_name()); + let wrap = ev.kind().aggregate_wrap(is_multiplied); + let child_visitor = match wrap { + AggregateWrap::PassThrough => ctx.visitor.clone(), + _ => ctx.visitor.with_arg_needs_paren_safe(false), + }; + let input = ctx.with_visitor(child_visitor).render_tail()?; + self.apply_wrap(wrap, input, ctx.templates) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_pre_aggregation_measure_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_pre_aggregation_measure_op.rs new file mode 100644 index 0000000000000..91d1007fa1655 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/final_pre_aggregation_measure_op.rs @@ -0,0 +1,62 @@ +use crate::physical_plan::sql_nodes::{RenderReferences, RenderReferencesType}; +use crate::planner::symbols::AggregateWrap; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Replaces a measure's aggregation with the equivalent rollup over a +/// pre-aggregation column when one is available — `sum(state)` for +/// pre-aggregated counts, `merge(state)` for HLL, etc. Falls through when +/// the measure is not covered by the active pre-aggregation. +#[derive(Clone, Debug)] +pub struct FinalPreAggregationMeasureOp { + references: Rc, +} + +impl FinalPreAggregationMeasureOp { + pub fn new(references: RenderReferences) -> Self { + Self { + references: Rc::new(references), + } + } +} + +impl OpExec for FinalPreAggregationMeasureOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(ev) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "FinalPreAggregationMeasure op called for non-measure symbol".to_string(), + )); + }; + let Some(reference) = self.references.get(&ctx.sym.full_name()) else { + return ctx.render_tail(); + }; + match reference { + RenderReferencesType::QualifiedColumnName(column_name) => { + let table_ref = if let Some(table_name) = column_name.source() { + format!("{}.", ctx.templates.quote_identifier(table_name)?) + } else { + String::new() + }; + let pre_aggregation_measure = format!( + "{}{}", + table_ref, + ctx.templates.quote_identifier(&column_name.name())? + ); + match ev.kind().pre_aggregate_wrap() { + AggregateWrap::CountDistinctApprox => { + ctx.templates.count_distinct_approx(pre_aggregation_measure) + } + AggregateWrap::Function(name) => { + Ok(format!("{}({})", name, pre_aggregation_measure)) + } + _ => Ok(format!("sum({})", pre_aggregation_measure)), + } + } + RenderReferencesType::LiteralValue(value) => ctx.templates.quote_string(value), + RenderReferencesType::RawReferenceValue(value) => Ok(value.clone()), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/geo_dimension_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/geo_dimension_op.rs new file mode 100644 index 0000000000000..89ca9c8ad05a3 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/geo_dimension_op.rs @@ -0,0 +1,41 @@ +use crate::planner::symbols::DimensionKind; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Emits a Geo dimension as a single `lat,lng` string column so the pair +/// can travel through downstream queries as one value. +#[derive(Clone, Debug)] +pub struct GeoDimensionOp; + +impl OpExec for GeoDimensionOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + match ctx.sym.as_ref() { + MemberSymbol::Dimension(ev) => { + if let DimensionKind::Geo(geo) = ev.kind() { + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let latitude_str = geo.latitude().eval( + &inner_visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + )?; + let longitude_str = geo.longitude().eval( + &inner_visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + )?; + ctx.templates + .concat_strings(&vec![latitude_str, format!("','"), longitude_str]) + } else { + ctx.render_tail() + } + } + _ => Err(CubeError::internal( + "GeoDimension op called for non-dimension symbol".to_string(), + )), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/masked_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/masked_op.rs new file mode 100644 index 0000000000000..c35627ba1f4df --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/masked_op.rs @@ -0,0 +1,85 @@ +use crate::physical_plan::filter::ToSql; +use crate::planner::FiltersContext; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Replaces a member's value with its `mask` SQL whenever the user lacks +/// access to the underlying data, optionally guarding the substitution with +/// the member's mask filter so authorized rows still see the real value. +#[derive(Clone, Debug)] +pub struct MaskedOp { + ungrouped: bool, +} + +impl MaskedOp { + pub fn new(ungrouped: bool) -> Self { + Self { ungrouped } + } + + fn resolve_mask(&self, ctx: &mut OpCtx<'_>) -> Result, CubeError> { + let full_name = ctx.sym.full_name(); + if !ctx.query_tools.is_member_masked(&full_name) { + return Ok(None); + } + + let mask_filter = ctx.query_tools.member_mask_filter(&full_name); + + let masked_sql = if let Some(mask_call) = ctx.sym.mask_sql() { + if self.ungrouped { + if let MemberSymbol::Measure(_) = ctx.sym.as_ref() { + if mask_call.dependencies_count() > 0 { + return Ok(None); + } + } + } + mask_call.eval( + &ctx.visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + )? + } else { + "(NULL)".to_string() + }; + + let Some(filter_item) = mask_filter else { + return Ok(Some(masked_sql)); + }; + + let original_sql = ctx.render_tail()?; + // Use the unmasked tail as `node_processor` for filter rendering so + // member references inside the filter resolve through it — prevents + // recursing back through this MaskedOp when the filter member is + // itself masked. + let tail_as_node_processor = ctx.tail_as_node_processor(); + // TODO: support FILTER_PARAMS in mask filter SQL by passing a proper + // FiltersContext with filter_params_columns. + let filter_sql = filter_item.to_sql( + &ctx.visitor, + tail_as_node_processor, + ctx.query_tools.clone(), + ctx.templates, + &FiltersContext::default(), + )?; + if filter_sql.is_empty() { + Ok(Some(masked_sql)) + } else { + Ok(Some(ctx.templates.case( + None, + vec![(filter_sql, original_sql)], + Some(masked_sql), + )?)) + } + } +} + +impl OpExec for MaskedOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + if let Some(masked) = self.resolve_mask(ctx)? { + return Ok(masked); + } + ctx.render_tail() + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/measure_filter_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/measure_filter_op.rs new file mode 100644 index 0000000000000..9197dd278bfae --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/measure_filter_op.rs @@ -0,0 +1,48 @@ +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Applies a measure's per-row filter rules so only matching rows +/// contribute to its aggregation. +#[derive(Clone, Debug)] +pub struct MeasureFilterOp; + +impl OpExec for MeasureFilterOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + match ctx.sym.as_ref() { + MemberSymbol::Measure(ev) => { + let measure_filters = ev.measure_filters(); + if measure_filters.is_empty() { + return ctx.render_tail(); + } + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input = ctx.with_visitor(inner_visitor.clone()).render_tail()?; + let filters = measure_filters + .iter() + .map(|filter| -> Result { + Ok(format!( + "({})", + filter.eval( + &inner_visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + )? + )) + }) + .collect::, _>>()? + .join(" AND "); + let result = if input.as_str() == "*" { + "1".to_string() + } else { + input + }; + Ok(format!("CASE WHEN {} THEN {} END", filters, result)) + } + _ => Err(CubeError::internal( + "MeasureFilter op called for non-measure symbol".to_string(), + )), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/mod.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/mod.rs new file mode 100644 index 0000000000000..f09f9ba3c57f0 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/mod.rs @@ -0,0 +1,47 @@ +pub mod auto_prefix_op; +pub mod calendar_time_shift_op; +pub mod case_op; +pub mod dispatch_by_kind_op; +pub mod evaluate_symbol_op; +pub mod final_measure_op; +pub mod final_pre_aggregation_measure_op; +pub mod geo_dimension_op; +pub mod masked_op; +pub mod measure_filter_op; +pub mod multi_stage_rank_op; +pub mod multi_stage_window_op; +pub mod node_processor; +pub mod op_ctx; +pub mod op_enum; +pub mod op_exec; +pub mod parenthesize_op; +pub mod render_references_op; +pub mod rolling_window_op; +pub mod time_dimension_op; +pub mod time_shift_op; +pub mod ungrouped_measure_op; +pub mod ungrouped_query_final_measure_op; + +pub use auto_prefix_op::AutoPrefixOp; +pub use calendar_time_shift_op::CalendarTimeShiftOp; +pub use case_op::CaseOp; +pub use dispatch_by_kind_op::DispatchByKindOp; +pub use evaluate_symbol_op::EvaluateSymbolOp; +pub use final_measure_op::FinalMeasureOp; +pub use final_pre_aggregation_measure_op::FinalPreAggregationMeasureOp; +pub use geo_dimension_op::GeoDimensionOp; +pub use masked_op::MaskedOp; +pub use measure_filter_op::MeasureFilterOp; +pub use multi_stage_rank_op::MultiStageRankOp; +pub use multi_stage_window_op::MultiStageWindowOp; +pub use node_processor::NodeProcessor; +pub use op_ctx::OpCtx; +pub use op_enum::Op; +pub use op_exec::OpExec; +pub use parenthesize_op::ParenthesizeOp; +pub use render_references_op::RenderReferencesOp; +pub use rolling_window_op::RollingWindowOp; +pub use time_dimension_op::TimeDimensionOp; +pub use time_shift_op::TimeShiftOp; +pub use ungrouped_measure_op::UngroupedMeasureOp; +pub use ungrouped_query_final_measure_op::UngroupedQueryFinalMeasureOp; diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_rank_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_rank_op.rs new file mode 100644 index 0000000000000..a9cacc9696d87 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_rank_op.rs @@ -0,0 +1,57 @@ +use crate::planner::symbols::MeasureKind; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Renders a multi-stage rank measure as a SQL `RANK() OVER (PARTITION BY … +/// ORDER BY …)`. Other measures fall through to the rest of the pipeline. +#[derive(Clone, Debug)] +pub struct MultiStageRankOp { + partition: Vec, +} + +impl MultiStageRankOp { + pub fn new(partition: Vec) -> Self { + Self { partition } + } +} + +impl OpExec for MultiStageRankOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(m) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "MultiStageRank op called for non-measure symbol".to_string(), + )); + }; + if !(m.is_multi_stage() && matches!(m.kind(), MeasureKind::Rank)) { + return ctx.render_tail(); + } + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let order_by = if !m.measure_order_by().is_empty() { + let sql = m + .measure_order_by() + .iter() + .map(|item| -> Result { + let sql = item.sql_call().eval( + &inner_visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + )?; + Ok(format!("{} {}", sql, item.direction())) + }) + .collect::, _>>()? + .join(", "); + format!("ORDER BY {sql}") + } else { + String::new() + }; + let partition_by = if self.partition.is_empty() { + String::new() + } else { + format!("PARTITION BY {} ", self.partition.join(", ")) + }; + Ok(format!("rank() OVER ({partition_by}{order_by})")) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_window_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_window_op.rs new file mode 100644 index 0000000000000..9399e27acb90b --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/multi_stage_window_op.rs @@ -0,0 +1,59 @@ +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{Op, OpCtx, OpExec}; + +/// Renders a multi-stage non-calculated measure as a windowed aggregate +/// over its `input_pipeline` (`agg(agg(input)) OVER (PARTITION BY …)`). +/// Other measures take the `else_pipeline` branch. Discards the tail — +/// each branch is a self-contained pipeline. +#[derive(Clone, Debug)] +pub struct MultiStageWindowOp { + input_pipeline: Vec, + else_pipeline: Vec, + partition: Vec, +} + +impl MultiStageWindowOp { + pub fn new(input_pipeline: Vec, else_pipeline: Vec, partition: Vec) -> Self { + Self { + input_pipeline, + else_pipeline, + partition, + } + } +} + +impl OpExec for MultiStageWindowOp { + fn is_terminal(&self) -> bool { + true + } + + fn nested_pipelines(&self) -> Vec<&[Op]> { + vec![&self.input_pipeline, &self.else_pipeline] + } + + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(m) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "MultiStageWindow op called for non-measure symbol".to_string(), + )); + }; + if !m.is_multi_stage() || m.is_calculated() { + return ctx.render_pipeline(&self.else_pipeline); + } + let measure_type = m.measure_type(); + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input_sql = ctx + .with_visitor(inner_visitor) + .render_pipeline(&self.input_pipeline)?; + let partition_by = if self.partition.is_empty() { + String::new() + } else { + format!("PARTITION BY {} ", self.partition.join(", ")) + }; + Ok(format!( + "{measure_type}({measure_type}({input_sql})) OVER ({partition_by})" + )) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/node_processor.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/node_processor.rs new file mode 100644 index 0000000000000..9531bb990cd6c --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/node_processor.rs @@ -0,0 +1,54 @@ +use crate::physical_plan::SqlEvaluatorVisitor; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_templates::PlanSqlTemplates; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::cell::OnceCell; +use std::rc::Rc; + +use super::{Op, OpCtx, OpExec}; + +/// `Rc`-shareable carrier of a validated `Vec`. Renders the SQL of a +/// member symbol by running the pipeline once via [`Self::to_sql`]. +#[derive(Debug)] +pub struct NodeProcessor { + ops: Vec, + /// Caches the structural validation result so a pipeline reused across + /// many renders pays the `validate_pipeline` cost only once. + validated: OnceCell<()>, +} + +impl NodeProcessor { + pub fn new(ops: Vec) -> Rc { + Rc::new(Self { + ops, + validated: OnceCell::new(), + }) + } + + pub fn to_sql( + &self, + visitor: &SqlEvaluatorVisitor, + node: &Rc, + query_tools: Rc, + node_processor: Rc, + templates: &PlanSqlTemplates, + ) -> Result { + if self.validated.get().is_none() { + Op::validate_pipeline(&self.ops)?; + let _ = self.validated.set(()); + } + let (op, tail) = self.ops.split_first().ok_or_else(|| { + CubeError::internal("NodeProcessor invoked with empty pipeline".to_string()) + })?; + let mut ctx = OpCtx { + visitor: visitor.clone(), + query_tools, + templates, + sym: node.clone(), + tail, + node_processor, + }; + op.exec(&mut ctx) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_ctx.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_ctx.rs new file mode 100644 index 0000000000000..9509c1bd46615 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_ctx.rs @@ -0,0 +1,89 @@ +use crate::physical_plan::SqlEvaluatorVisitor; +use crate::planner::query_tools::QueryTools; +use crate::planner::sql_templates::PlanSqlTemplates; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::rc::Rc; + +use super::{NodeProcessor, Op, OpExec}; + +/// State of one render in flight: the symbol being rendered, the visitor +/// and shared resources, the unprocessed `tail` of the current pipeline, +/// and the top-level `node_processor` to re-enter through for sub-arg +/// evaluation. +pub struct OpCtx<'a> { + pub visitor: SqlEvaluatorVisitor, + pub query_tools: Rc, + pub templates: &'a PlanSqlTemplates, + pub sym: Rc, + pub tail: &'a [Op], + pub node_processor: Rc, +} + +impl<'a> OpCtx<'a> { + /// Continue with the next op in the current pipeline. Errors if `tail` is + /// empty (i.e. the pipeline ended without a terminal op). + pub fn render_tail(&self) -> Result { + let (op, rest) = self.tail.split_first().ok_or_else(|| { + CubeError::internal( + "OpCtx::render_tail called on empty tail — pipeline missing terminal op" + .to_string(), + ) + })?; + let mut sub = OpCtx { + visitor: self.visitor.clone(), + query_tools: self.query_tools.clone(), + templates: self.templates, + sym: self.sym.clone(), + tail: rest, + node_processor: self.node_processor.clone(), + }; + op.exec(&mut sub) + } + + /// Run a separate sub-pipeline against the current symbol/visitor. The + /// slice may live for any lifetime shorter than the outer ctx's; the + /// templates reference is reborrowed to match. + pub fn render_pipeline<'b>(&self, ops: &'b [Op]) -> Result + where + 'a: 'b, + { + let (op, rest) = ops.split_first().ok_or_else(|| { + CubeError::internal("OpCtx::render_pipeline called with empty ops slice".to_string()) + })?; + let mut sub = OpCtx::<'b> { + visitor: self.visitor.clone(), + query_tools: self.query_tools.clone(), + templates: self.templates, + sym: self.sym.clone(), + tail: rest, + node_processor: self.node_processor.clone(), + }; + op.exec(&mut sub) + } + + /// Materialize the remaining pipeline as a standalone `NodeProcessor`, + /// suitable as a re-entry point for plumbing that must not recurse + /// through the current op (e.g. a filter rendered without the masking + /// wrapper). + /// + /// Cost: `O(tail_len)` Op clones plus one `Rc` allocation + /// per call. Cheap enough for cold paths; avoid on hot ones. + pub fn tail_as_node_processor(&self) -> Rc { + NodeProcessor::new(self.tail.to_vec()) + } + + /// Fresh ctx pointing at the same tail/symbol but with a swapped visitor + /// (e.g. a child render that needs different `arg_needs_paren_safe` or + /// `ignore_tz_convert` flags). + pub fn with_visitor(&self, visitor: SqlEvaluatorVisitor) -> OpCtx<'a> { + OpCtx { + visitor, + query_tools: self.query_tools.clone(), + templates: self.templates, + sym: self.sym.clone(), + tail: self.tail, + node_processor: self.node_processor.clone(), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_enum.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_enum.rs new file mode 100644 index 0000000000000..f82882b5143c3 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_enum.rs @@ -0,0 +1,264 @@ +use crate::physical_plan::sql_nodes::RenderReferences; +use crate::planner::planners::multi_stage::TimeShiftState; +use crate::planner::symbols::CalendarDimensionTimeShift; +use cubenativeutils::CubeError; +use std::collections::{HashMap, HashSet}; + +use super::{ + AutoPrefixOp, CalendarTimeShiftOp, CaseOp, DispatchByKindOp, EvaluateSymbolOp, FinalMeasureOp, + FinalPreAggregationMeasureOp, GeoDimensionOp, MaskedOp, MeasureFilterOp, MultiStageRankOp, + MultiStageWindowOp, OpCtx, OpExec, ParenthesizeOp, RenderReferencesOp, RollingWindowOp, + TimeDimensionOp, TimeShiftOp, UngroupedMeasureOp, UngroupedQueryFinalMeasureOp, +}; + +/// Declares the `Op` enum and forwards [`OpExec`] (`exec` + `is_terminal`) +/// to each variant via a single match — keeps the two parallel dispatch +/// tables in lockstep so adding a new op is a one-line change here. +macro_rules! define_op_enum { + ($( $variant:ident($ty:ty) ),+ $(,)?) => { + /// All op variants that participate in pipeline rendering. To add + /// one, append a `$variant($ty)` line below; the macro fills in + /// `OpExec for Op` and the `Clone`/`Debug` derives. A constructor + /// on `impl Op` is still added by hand because signatures vary. + #[derive(Clone, Debug)] + pub enum Op { + $( $variant($ty), )+ + } + + impl OpExec for Op { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + match self { + $( Op::$variant(o) => o.exec(ctx), )+ + } + } + + fn is_terminal(&self) -> bool { + match self { + $( Op::$variant(o) => o.is_terminal(), )+ + } + } + + fn nested_pipelines(&self) -> Vec<&[Op]> { + match self { + $( Op::$variant(o) => o.nested_pipelines(), )+ + } + } + } + }; +} + +define_op_enum! { + EvaluateSymbol(EvaluateSymbolOp), + Parenthesize(ParenthesizeOp), + AutoPrefix(AutoPrefixOp), + GeoDimension(GeoDimensionOp), + MeasureFilter(MeasureFilterOp), + RenderReferences(RenderReferencesOp), + Masked(MaskedOp), + Case(CaseOp), + DispatchByKind(DispatchByKindOp), + FinalMeasure(FinalMeasureOp), + FinalPreAggregationMeasure(FinalPreAggregationMeasureOp), + UngroupedMeasure(UngroupedMeasureOp), + UngroupedQueryFinalMeasure(UngroupedQueryFinalMeasureOp), + TimeDimension(TimeDimensionOp), + TimeShift(TimeShiftOp), + CalendarTimeShift(CalendarTimeShiftOp), + MultiStageRank(MultiStageRankOp), + MultiStageWindow(MultiStageWindowOp), + RollingWindow(RollingWindowOp), +} + +impl Op { + pub fn evaluate_symbol() -> Self { + Self::EvaluateSymbol(EvaluateSymbolOp) + } + + pub fn parenthesize() -> Self { + Self::Parenthesize(ParenthesizeOp) + } + + pub fn auto_prefix(cube_references: HashMap) -> Self { + Self::AutoPrefix(AutoPrefixOp::new(cube_references)) + } + + pub fn geo_dimension() -> Self { + Self::GeoDimension(GeoDimensionOp) + } + + pub fn measure_filter() -> Self { + Self::MeasureFilter(MeasureFilterOp) + } + + pub fn render_references(references: RenderReferences) -> Self { + Self::RenderReferences(RenderReferencesOp::new(references)) + } + + pub fn masked(ungrouped: bool) -> Self { + Self::Masked(MaskedOp::new(ungrouped)) + } + + pub fn case() -> Self { + Self::Case(CaseOp) + } + + pub fn dispatch_by_kind( + dimension: Vec, + time_dimension: Vec, + measure: Vec, + default: Vec, + ) -> Self { + Self::DispatchByKind(DispatchByKindOp::new( + dimension, + time_dimension, + measure, + default, + )) + } + + pub fn final_measure( + rendered_as_multiplied_measures: HashSet, + count_approx_as_state: bool, + ) -> Self { + Self::FinalMeasure(FinalMeasureOp::new( + rendered_as_multiplied_measures, + count_approx_as_state, + )) + } + + pub fn final_pre_aggregation_measure(references: RenderReferences) -> Self { + Self::FinalPreAggregationMeasure(FinalPreAggregationMeasureOp::new(references)) + } + + pub fn ungrouped_measure() -> Self { + Self::UngroupedMeasure(UngroupedMeasureOp) + } + + pub fn ungrouped_query_final_measure() -> Self { + Self::UngroupedQueryFinalMeasure(UngroupedQueryFinalMeasureOp) + } + + pub fn time_dimension(dimensions_with_ignored_timezone: HashSet) -> Self { + Self::TimeDimension(TimeDimensionOp::new(dimensions_with_ignored_timezone)) + } + + pub fn time_shift(shifts: TimeShiftState) -> Self { + Self::TimeShift(TimeShiftOp::new(shifts)) + } + + pub fn calendar_time_shift(shifts: HashMap) -> Self { + Self::CalendarTimeShift(CalendarTimeShiftOp::new(shifts)) + } + + pub fn multi_stage_rank(partition: Vec) -> Self { + Self::MultiStageRank(MultiStageRankOp::new(partition)) + } + + pub fn multi_stage_window( + input_pipeline: Vec, + else_pipeline: Vec, + partition: Vec, + ) -> Self { + Self::MultiStageWindow(MultiStageWindowOp::new( + input_pipeline, + else_pipeline, + partition, + )) + } + + pub fn rolling_window(input_pipeline: Vec, default_pipeline: Vec) -> Self { + Self::RollingWindow(RollingWindowOp::new(input_pipeline, default_pipeline)) + } +} + +impl Op { + /// Validate a pipeline: it must be non-empty, end with exactly one + /// terminal op, and contain no terminals before that. Recurses into + /// sub-pipelines carried by branching ops (`DispatchByKind`, + /// `MultiStageWindow`, `RollingWindow`). + pub fn validate_pipeline(ops: &[Op]) -> Result<(), CubeError> { + if ops.is_empty() { + return Err(CubeError::internal( + "Op pipeline is empty — needs at least one terminal op".to_string(), + )); + } + let last_idx = ops.len() - 1; + for (i, op) in ops.iter().enumerate() { + let terminal = op.is_terminal(); + if terminal && i != last_idx { + return Err(CubeError::internal(format!( + "Terminal op at position {} of {}; ops after it would be unreachable", + i, + ops.len() + ))); + } + if !terminal && i == last_idx { + return Err(CubeError::internal( + "Pipeline ends with a non-terminal op — render_tail will hit an empty tail at runtime".to_string(), + )); + } + for sub in op.nested_pipelines() { + Op::validate_pipeline(sub)?; + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn terminal() -> Op { + Op::evaluate_symbol() + } + + fn non_terminal() -> Op { + Op::parenthesize() + } + + #[test] + fn empty_pipeline_is_invalid() { + assert!(Op::validate_pipeline(&[]).is_err()); + } + + #[test] + fn pipeline_must_end_with_terminal() { + assert!(Op::validate_pipeline(&[non_terminal()]).is_err()); + assert!(Op::validate_pipeline(&[non_terminal(), non_terminal()]).is_err()); + } + + #[test] + fn terminal_in_the_middle_is_invalid() { + assert!(Op::validate_pipeline(&[terminal(), terminal()]).is_err()); + assert!(Op::validate_pipeline(&[terminal(), non_terminal(), terminal()]).is_err()); + } + + #[test] + fn valid_linear_pipeline() { + assert!(Op::validate_pipeline(&[terminal()]).is_ok()); + assert!(Op::validate_pipeline(&[non_terminal(), terminal()]).is_ok()); + assert!(Op::validate_pipeline(&[non_terminal(), non_terminal(), terminal()]).is_ok()); + } + + #[test] + fn nested_pipelines_are_validated_recursively() { + // DispatchByKind itself is terminal, so it can stand alone — but its + // four branches are independent pipelines that must each be valid. + let bad_branch = Op::dispatch_by_kind( + vec![non_terminal()], // missing terminal at the end + vec![terminal()], + vec![terminal()], + vec![terminal()], + ); + assert!(Op::validate_pipeline(&[bad_branch]).is_err()); + + let good = Op::dispatch_by_kind( + vec![non_terminal(), terminal()], + vec![terminal()], + vec![terminal()], + vec![terminal()], + ); + assert!(Op::validate_pipeline(&[good]).is_ok()); + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_exec.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_exec.rs new file mode 100644 index 0000000000000..2bb223168d49c --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/op_exec.rs @@ -0,0 +1,24 @@ +use cubenativeutils::CubeError; + +use super::{Op, OpCtx}; + +/// Behavior of a single op at render time. One impl per Op variant; the +/// dispatch on [`Op`] forwards to it. +pub trait OpExec { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result; + + /// Whether this op terminates a pipeline — i.e. it never calls + /// `render_tail`, so any ops after it in the same pipeline would be + /// unreachable. A well-formed pipeline ends with exactly one terminal + /// op; this is enforced by [`Op::validate_pipeline`]. + fn is_terminal(&self) -> bool { + false + } + + /// Sub-pipelines this op carries as data — branches of a kind dispatch, + /// the input/else legs of a multi-stage window, etc. Default is empty: + /// only branching ops override. + fn nested_pipelines(&self) -> Vec<&[Op]> { + Vec::new() + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/parenthesize_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/parenthesize_op.rs new file mode 100644 index 0000000000000..f15c7c4cbd958 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/parenthesize_op.rs @@ -0,0 +1,21 @@ +use crate::utils::sql_expression_scanner::is_top_level_compound; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Protects a compound expression from operator-precedence breakage when it +/// is being substituted into a position that expects an atomic argument. +#[derive(Clone, Debug)] +pub struct ParenthesizeOp; + +impl OpExec for ParenthesizeOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let needs_paren = ctx.visitor.arg_needs_paren_safe(); + let input_sql = ctx.render_tail()?; + if needs_paren && is_top_level_compound(&input_sql) { + Ok(format!("({})", input_sql)) + } else { + Ok(input_sql) + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/render_references_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/render_references_op.rs new file mode 100644 index 0000000000000..bcf08e18944e9 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/render_references_op.rs @@ -0,0 +1,43 @@ +use crate::physical_plan::sql_nodes::{RenderReferences, RenderReferencesType}; +use cubenativeutils::CubeError; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Reuses already-materialized columns (CTE outputs, pre-aggregation +/// fields) for known members instead of recomputing them from raw cube data. +#[derive(Clone, Debug)] +pub struct RenderReferencesOp { + references: Rc, +} + +impl RenderReferencesOp { + pub fn new(references: RenderReferences) -> Self { + Self { + references: Rc::new(references), + } + } +} + +impl OpExec for RenderReferencesOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let full_name = ctx.sym.full_name(); + match self.references.get(&full_name) { + Some(RenderReferencesType::QualifiedColumnName(column_name)) => { + let table_ref = if let Some(table_name) = column_name.source() { + format!("{}.", ctx.templates.quote_identifier(table_name)?) + } else { + String::new() + }; + Ok(format!( + "{}{}", + table_ref, + ctx.templates.quote_identifier(&column_name.name())? + )) + } + Some(RenderReferencesType::LiteralValue(value)) => ctx.templates.quote_string(value), + Some(RenderReferencesType::RawReferenceValue(value)) => Ok(value.clone()), + None => ctx.render_tail(), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/rolling_window_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/rolling_window_op.rs new file mode 100644 index 0000000000000..49e76492f4537 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/rolling_window_op.rs @@ -0,0 +1,70 @@ +use crate::planner::symbols::{AggregationType, MeasureKind}; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{Op, OpCtx, OpExec}; + +/// Aggregates a cumulative measure over its rolling window: SUM-able kinds +/// (`count`, `sum`, `running_total`, `min`, `max`, HLL approx) are wrapped +/// directly over `input_pipeline`; non-cumulative or unsupported kinds fall +/// back to `default_pipeline` for the regular aggregation path. Discards +/// the tail — each branch is a self-contained pipeline. +#[derive(Clone, Debug)] +pub struct RollingWindowOp { + input_pipeline: Vec, + default_pipeline: Vec, +} + +impl RollingWindowOp { + pub fn new(input_pipeline: Vec, default_pipeline: Vec) -> Self { + Self { + input_pipeline, + default_pipeline, + } + } +} + +impl OpExec for RollingWindowOp { + fn is_terminal(&self) -> bool { + true + } + + fn nested_pipelines(&self) -> Vec<&[Op]> { + vec![&self.input_pipeline, &self.default_pipeline] + } + + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(m) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "RollingWindow op called for non-measure symbol".to_string(), + )); + }; + if !m.is_cumulative() { + return ctx.render_pipeline(&self.default_pipeline); + } + let kind = m.kind().clone(); + let render_input = |c: &OpCtx<'_>| -> Result { + let inner_visitor = c.visitor.with_arg_needs_paren_safe(false); + c.with_visitor(inner_visitor) + .render_pipeline(&self.input_pipeline) + }; + match kind { + MeasureKind::Count(_) => Ok(format!("sum({})", render_input(ctx)?)), + MeasureKind::Aggregated(a) => match a.agg_type() { + AggregationType::CountDistinctApprox => { + ctx.templates.hll_cardinality_merge(render_input(ctx)?) + } + AggregationType::Sum | AggregationType::RunningTotal => { + Ok(format!("sum({})", render_input(ctx)?)) + } + AggregationType::Min | AggregationType::Max => { + Ok(format!("{}({})", a.agg_type().as_str(), render_input(ctx)?)) + } + AggregationType::Avg + | AggregationType::CountDistinct + | AggregationType::NumberAgg => ctx.render_pipeline(&self.default_pipeline), + }, + _ => ctx.render_pipeline(&self.default_pipeline), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_dimension_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_dimension_op.rs new file mode 100644 index 0000000000000..ad71f71aebb1d --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_dimension_op.rs @@ -0,0 +1,72 @@ +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; +use std::collections::HashSet; +use std::rc::Rc; + +use super::{OpCtx, OpExec}; + +/// Lifts time values into the user's timezone and applies any requested +/// granularity (`week`/`month`/calendar-cube SQL/...). Covers both +/// `TimeDimension` symbols and raw time-typed `Dimension` symbols (the +/// latter only when raw timezone conversion is enabled). +#[derive(Clone, Debug)] +pub struct TimeDimensionOp { + dimensions_with_ignored_timezone: Rc>, +} + +impl TimeDimensionOp { + pub fn new(dimensions_with_ignored_timezone: HashSet) -> Self { + Self { + dimensions_with_ignored_timezone: Rc::new(dimensions_with_ignored_timezone), + } + } +} + +impl OpExec for TimeDimensionOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + match ctx.sym.as_ref() { + MemberSymbol::TimeDimension(ev) => { + let Some(granularity_obj) = ev.granularity_obj() else { + return ctx.render_tail(); + }; + // Short-circuit to calendar SQL — the rest of the pipeline is + // not used. Outer visitor stays as-is: the calendar SQL is the + // expression itself, no further wrapping here. + if let Some(calendar_sql) = granularity_obj.calendar_sql() { + return calendar_sql.eval( + &ctx.visitor, + ctx.node_processor.clone(), + ctx.query_tools.clone(), + ctx.templates, + ); + } + let granularity_obj = granularity_obj.clone(); + let skip_convert_tz = self + .dimensions_with_ignored_timezone + .contains(&ev.full_name()); + // Wraps in `convert_tz(…)` and a granularity function — safe, + // reset paren-safe for the child render. + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input_sql = ctx.with_visitor(inner_visitor).render_tail()?; + let converted_tz = if skip_convert_tz { + input_sql + } else { + ctx.templates.convert_tz(input_sql)? + }; + granularity_obj.apply_to_input_sql(ctx.templates, converted_tz) + } + MemberSymbol::Dimension(ev) => { + let wraps_convert_tz = !ctx.visitor.ignore_tz_convert() + && ctx.query_tools.convert_tz_for_raw_time_dimension() + && ev.dimension_type() == "time"; + if !wraps_convert_tz { + return ctx.render_tail(); + } + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input_sql = ctx.with_visitor(inner_visitor).render_tail()?; + ctx.templates.convert_tz(input_sql) + } + _ => ctx.render_tail(), + } + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_shift_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_shift_op.rs new file mode 100644 index 0000000000000..2a66b8cc0103f --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/time_shift_op.rs @@ -0,0 +1,44 @@ +use crate::planner::planners::multi_stage::TimeShiftState; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Shifts a time-typed dimension by a configured interval (e.g. +/// "previous month") so multi-stage queries can compare aligned periods. +/// Dimensions without a configured shift fall through unchanged. +#[derive(Clone, Debug)] +pub struct TimeShiftOp { + shifts: TimeShiftState, +} + +impl TimeShiftOp { + pub fn new(shifts: TimeShiftState) -> Self { + Self { shifts } + } +} + +impl OpExec for TimeShiftOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Dimension(ev) = ctx.sym.as_ref() else { + return ctx.render_tail(); + }; + if ev.is_reference() || !ev.is_time() { + return ctx.render_tail(); + } + let Some(shift) = self.shifts.dimensions_shifts.get(&ev.full_name()) else { + return ctx.render_tail(); + }; + let Some(interval) = &shift.interval else { + return Err(CubeError::internal(format!( + "TimeShift op: dimension '{}' has a shift entry but no interval", + ev.full_name() + ))); + }; + let interval_sql = interval.to_sql(); + let inner_visitor = ctx.visitor.with_arg_needs_paren_safe(false); + let input = ctx.with_visitor(inner_visitor).render_tail()?; + let shifted = ctx.templates.add_timestamp_interval(input, interval_sql)?; + Ok(format!("({})", shifted)) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_measure_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_measure_op.rs new file mode 100644 index 0000000000000..1f5a6973ca722 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_measure_op.rs @@ -0,0 +1,22 @@ +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Renders a measure for an ungrouped query at the measure level — the +/// expression is left unaggregated, with the lone exception of `count(*)` +/// which becomes `1` per row. +#[derive(Clone, Debug)] +pub struct UngroupedMeasureOp; + +impl OpExec for UngroupedMeasureOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(_) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "UngroupedMeasure op called for non-measure symbol".to_string(), + )); + }; + let input = ctx.render_tail()?; + Ok(if input == "*" { "1".to_string() } else { input }) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_query_final_measure_op.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_query_final_measure_op.rs new file mode 100644 index 0000000000000..d4d080d2b1b24 --- /dev/null +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/op/ungrouped_query_final_measure_op.rs @@ -0,0 +1,45 @@ +use crate::planner::symbols::{AggregationType, MeasureKind}; +use crate::planner::MemberSymbol; +use cubenativeutils::CubeError; + +use super::{OpCtx, OpExec}; + +/// Renders a measure for an ungrouped query at the row level: count-likes +/// turn into `CASE WHEN IS NOT NULL THEN 1 END` so a downstream +/// aggregator can sum them, other measures pass through unchanged. +#[derive(Clone, Debug)] +pub struct UngroupedQueryFinalMeasureOp; + +impl OpExec for UngroupedQueryFinalMeasureOp { + fn exec(&self, ctx: &mut OpCtx<'_>) -> Result { + let MemberSymbol::Measure(ev) = ctx.sym.as_ref() else { + return Err(CubeError::internal( + "UngroupedQueryFinalMeasure op called for non-measure symbol".to_string(), + )); + }; + let is_count_like = match ev.kind() { + MeasureKind::Count(_) => true, + MeasureKind::Aggregated(a) => matches!( + a.agg_type(), + AggregationType::CountDistinct | AggregationType::CountDistinctApprox + ), + _ => false, + }; + // Count-likes wrap the child in `CASE WHEN … IS NOT NULL THEN 1 END` + // (safe), other kinds pass through and must propagate the flag. + let child_visitor = if is_count_like { + ctx.visitor.with_arg_needs_paren_safe(false) + } else { + ctx.visitor.clone() + }; + let input = ctx.with_visitor(child_visitor).render_tail()?; + Ok(if input == "*" { + "1".to_string() + } else if is_count_like { + // TODO: route through templates. + format!("CASE WHEN ({}) IS NOT NULL THEN 1 END", input) + } else { + input + }) + } +} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/original_sql_pre_aggregation.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/original_sql_pre_aggregation.rs deleted file mode 100644 index ca472e524e691..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/original_sql_pre_aggregation.rs +++ /dev/null @@ -1,73 +0,0 @@ -use super::SqlNode; -use crate::planner::query_tools::QueryTools; -use crate::planner::MemberSymbol; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::sql_templates::PlanSqlTemplates; -use cubenativeutils::CubeError; -use std::any::Any; -use std::collections::HashMap; -use std::rc::Rc; - -pub struct OriginalSqlPreAggregationSqlNode { - input: Rc, - original_sql_pre_aggregations: HashMap, -} - -impl OriginalSqlPreAggregationSqlNode { - pub fn new( - input: Rc, - original_pre_aggregations: HashMap, - ) -> Rc { - Rc::new(Self { - input, - original_sql_pre_aggregations: original_pre_aggregations, - }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for OriginalSqlPreAggregationSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::CubeTable(ev) => { - if let Some(original_sql_table_name) = - self.original_sql_pre_aggregations.get(ev.cube_name()) - { - format!("{}", original_sql_table_name) - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => { - return Err(CubeError::internal(format!( - "OriginalSqlPreAggregationSqlNode node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/parenthesize.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/parenthesize.rs deleted file mode 100644 index b7ebdb6b45173..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/parenthesize.rs +++ /dev/null @@ -1,62 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use crate::utils::sql_expression_scanner::is_top_level_compound; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -/// Wraps the child's rendered SQL in parentheses when the visitor signals that -/// the surrounding context expects a parentheses-safe argument (for example, a -/// `SqlCall` substitution into an arithmetic or logical position) and the -/// rendered expression is compound at the top level. -/// -/// Sits immediately above [`AutoPrefixSqlNode`] in the processor chain — the -/// lowest point where renaming is complete. Higher-layer nodes that wrap the -/// child's output in a syntactically safe construct (aggregate, window -/// function, CASE/DATE_TRUNC/CONVERT_TZ, etc.) should reset -/// `arg_needs_paren_safe` on the visitor before recursing, so this node avoids -/// scanning output that will be discarded. -pub struct ParenthesizeSqlNode { - input: Rc, -} - -impl ParenthesizeSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for ParenthesizeSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let input_sql = self - .input - .to_sql(visitor, node, query_tools, node_processor, templates)?; - if visitor.arg_needs_paren_safe() && is_top_level_compound(&input_sql) { - Ok(format!("({})", input_sql)) - } else { - Ok(input_sql) - } - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/render_references.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/render_references.rs index 8657a573dd8c8..6a24e34eda003 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/render_references.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/render_references.rs @@ -1,18 +1,10 @@ -use super::SqlNode; use crate::physical_plan::QualifiedColumnName; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; use std::collections::HashMap; -use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RawReferenceValue(pub String); -#[derive(Clone)] +#[derive(Clone, Debug)] pub enum RenderReferencesType { QualifiedColumnName(QualifiedColumnName), LiteralValue(String), @@ -37,7 +29,7 @@ impl From for RenderReferencesType { } } -#[derive(Default, Clone)] +#[derive(Default, Clone, Debug)] pub struct RenderReferences { references: HashMap, } @@ -59,65 +51,3 @@ impl RenderReferences { self.references.contains_key(name) } } - -pub struct RenderReferencesSqlNode { - input: Rc, - references: RenderReferences, -} - -impl RenderReferencesSqlNode { - pub fn new(input: Rc, references: RenderReferences) -> Rc { - Rc::new(Self { input, references }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for RenderReferencesSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let full_name = node.full_name(); - if let Some(reference) = self.references.get(&full_name) { - match reference { - RenderReferencesType::QualifiedColumnName(column_name) => { - let table_ref = if let Some(table_name) = column_name.source() { - format!("{}.", templates.quote_identifier(table_name)?) - } else { - format!("") - }; - Ok(format!( - "{}{}", - table_ref, - templates.quote_identifier(&column_name.name())? - )) - } - RenderReferencesType::LiteralValue(value) => templates.quote_string(value), - RenderReferencesType::RawReferenceValue(value) => Ok(value.clone()), - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ) - } - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/rolling_window.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/rolling_window.rs deleted file mode 100644 index 0acc52dab3b07..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/rolling_window.rs +++ /dev/null @@ -1,101 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::{AggregationType, MeasureKind}; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct RollingWindowNode { - input: Rc, - default_processor: Rc, -} - -impl RollingWindowNode { - pub fn new(input: Rc, default_processor: Rc) -> Rc { - Rc::new(Self { - input, - default_processor, - }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for RollingWindowNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(m) if m.is_cumulative() => { - let delegate = || { - self.default_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ) - }; - let render_input = || -> Result { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ) - }; - match m.kind() { - MeasureKind::Count(_) => format!("sum({})", render_input()?), - MeasureKind::Aggregated(a) => match a.agg_type() { - AggregationType::CountDistinctApprox => { - templates.hll_cardinality_merge(render_input()?)? - } - AggregationType::Sum | AggregationType::RunningTotal => { - format!("sum({})", render_input()?) - } - AggregationType::Min | AggregationType::Max => { - format!("{}({})", a.agg_type().as_str(), render_input()?) - } - AggregationType::Avg - | AggregationType::CountDistinct - | AggregationType::NumberAgg => delegate()?, - }, - _ => delegate()?, - } - } - MemberSymbol::Measure(_) => self.default_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor, - templates, - )?, - _ => { - return Err(CubeError::internal(format!( - "Unexpected evaluation node type for RollingWindowNode" - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/root_processor.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/root_processor.rs deleted file mode 100644 index 8d6abdc040a3d..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/root_processor.rs +++ /dev/null @@ -1,98 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct RootSqlNode { - dimension_processor: Rc, - time_dimesions_processor: Rc, - measure_processor: Rc, - default_processor: Rc, -} - -impl RootSqlNode { - pub fn new( - dimension_processor: Rc, - time_dimesions_processor: Rc, - measure_processor: Rc, - default_processor: Rc, - ) -> Rc { - Rc::new(Self { - dimension_processor, - time_dimesions_processor, - measure_processor, - default_processor, - }) - } - - pub fn dimension_processor(&self) -> &Rc { - &self.dimension_processor - } - - pub fn measure_processor(&self) -> &Rc { - &self.measure_processor - } - - pub fn default_processor(&self) -> &Rc { - &self.default_processor - } -} - -impl SqlNode for RootSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Dimension(_) => self.dimension_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - MemberSymbol::TimeDimension(_) => self.time_dimesions_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - MemberSymbol::Measure(_) => self.measure_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - _ => self.default_processor.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![ - self.dimension_processor.clone(), - self.measure_processor.clone(), - self.default_processor.clone(), - ] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/sql_node.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/sql_node.rs deleted file mode 100644 index bbff4d082e20d..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/sql_node.rs +++ /dev/null @@ -1,27 +0,0 @@ -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; - -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub trait SqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result; - - fn as_any(self: Rc) -> Rc; - - fn childs(&self) -> Vec>; -} - -pub trait CubeNameNode { - fn to_sql(&self, cube_name: &String) -> Result; -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_dimension.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_dimension.rs deleted file mode 100644 index 9095b6b90528e..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_dimension.rs +++ /dev/null @@ -1,123 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::collections::HashSet; -use std::rc::Rc; - -pub struct TimeDimensionNode { - dimensions_with_ignored_timezone: HashSet, - input: Rc, -} - -impl TimeDimensionNode { - pub fn new( - dimensions_with_ignored_timezone: HashSet, - input: Rc, - ) -> Rc { - Rc::new(Self { - dimensions_with_ignored_timezone, - input, - }) - } -} - -impl SqlNode for TimeDimensionNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - match node.as_ref() { - MemberSymbol::TimeDimension(ev) => { - if let Some(granularity_obj) = ev.granularity_obj() { - // Short-circuits to calendar SQL — `self.input` is not used. - // Propagate the outer visitor: the calendar SQL is the - // expression itself, not wrapped further here. - if let Some(calendar_sql) = granularity_obj.calendar_sql() { - return calendar_sql.eval( - visitor, - node_processor.clone(), - query_tools.clone(), - templates, - ); - } - // Wraps in `convert_tz(…)` and a granularity function — - // safe, reset for child render. - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input_sql = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let skip_convert_tz = self - .dimensions_with_ignored_timezone - .contains(&ev.full_name()); - - let converted_tz = if skip_convert_tz { - input_sql - } else { - templates.convert_tz(input_sql)? - }; - - Ok(granularity_obj.apply_to_input_sql(templates, converted_tz)?) - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ) - } - } - MemberSymbol::Dimension(ev) => { - let wraps_convert_tz = !visitor.ignore_tz_convert() - && query_tools.convert_tz_for_raw_time_dimension() - && ev.dimension_type() == "time"; - if wraps_convert_tz { - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input_sql = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - Ok(templates.convert_tz(input_sql)?) - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ) - } - } - _ => self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - ), - } - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_shift.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_shift.rs deleted file mode 100644 index da4e2d38da858..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/time_shift.rs +++ /dev/null @@ -1,87 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::planners::multi_stage::TimeShiftState; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct TimeShiftSqlNode { - shifts: TimeShiftState, - input: Rc, -} - -impl TimeShiftSqlNode { - pub fn new(shifts: TimeShiftState, input: Rc) -> Rc { - Rc::new(Self { shifts, input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for TimeShiftSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Dimension(ev) => { - if !ev.is_reference() && ev.is_time() { - if let Some(shift) = self.shifts.dimensions_shifts.get(&ev.full_name()) { - let shift = shift.interval.clone().unwrap().to_sql(); - let inner_visitor = visitor.with_arg_needs_paren_safe(false); - let input = self.input.to_sql( - &inner_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - let res = templates.add_timestamp_interval(input, shift)?; - format!("({})", res) - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } else { - self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )? - } - } - _ => self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?, - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_measure.rs deleted file mode 100644 index c3654e7f5de4f..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_measure.rs +++ /dev/null @@ -1,65 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct UngroupedMeasureSqlNode { - input: Rc, -} - -impl UngroupedMeasureSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for UngroupedMeasureSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(_) => { - let input = self.input.to_sql( - visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - - if input == "*" { - "1".to_string() - } else { - input - } - } - _ => { - return Err(CubeError::internal(format!( - "Measure filter node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_query_final_measure.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_query_final_measure.rs deleted file mode 100644 index d3e828a407ba5..0000000000000 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_nodes/ungroupped_query_final_measure.rs +++ /dev/null @@ -1,83 +0,0 @@ -use super::SqlNode; -use crate::physical_plan::SqlEvaluatorVisitor; -use crate::planner::query_tools::QueryTools; -use crate::planner::sql_templates::PlanSqlTemplates; -use crate::planner::symbols::{AggregationType, MeasureKind}; -use crate::planner::MemberSymbol; -use cubenativeutils::CubeError; -use std::any::Any; -use std::rc::Rc; - -pub struct UngroupedQueryFinalMeasureSqlNode { - input: Rc, -} - -impl UngroupedQueryFinalMeasureSqlNode { - pub fn new(input: Rc) -> Rc { - Rc::new(Self { input }) - } - - pub fn input(&self) -> &Rc { - &self.input - } -} - -impl SqlNode for UngroupedQueryFinalMeasureSqlNode { - fn to_sql( - &self, - visitor: &SqlEvaluatorVisitor, - node: &Rc, - query_tools: Rc, - node_processor: Rc, - templates: &PlanSqlTemplates, - ) -> Result { - let res = match node.as_ref() { - MemberSymbol::Measure(ev) => { - let is_count_like = match ev.kind() { - MeasureKind::Count(_) => true, - MeasureKind::Aggregated(a) => matches!( - a.agg_type(), - AggregationType::CountDistinct | AggregationType::CountDistinctApprox - ), - _ => false, - }; - // Count-likes wrap the child in `CASE WHEN … IS NOT NULL THEN 1 END` - // (safe), other kinds pass through and must propagate the flag. - let child_visitor = if is_count_like { - visitor.with_arg_needs_paren_safe(false) - } else { - visitor.clone() - }; - let input = self.input.to_sql( - &child_visitor, - node, - query_tools.clone(), - node_processor.clone(), - templates, - )?; - - if input == "*" { - "1".to_string() - } else if is_count_like { - format!("CASE WHEN ({}) IS NOT NULL THEN 1 END", input) //TODO templates!! - } else { - input - } - } - _ => { - return Err(CubeError::internal(format!( - "Measure filter node processor called for wrong node", - ))); - } - }; - Ok(res) - } - - fn as_any(self: Rc) -> Rc { - self.clone() - } - - fn childs(&self) -> Vec> { - vec![self.input.clone()] - } -} diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_visitor.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_visitor.rs index 7b0f6876b9aa2..cec4ff113d1ad 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_visitor.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/sql_visitor.rs @@ -1,4 +1,4 @@ -use super::sql_nodes::SqlNode; +use super::sql_nodes::NodeProcessor; use super::CubeRefEvaluator; use crate::planner::filter::Filter; use crate::planner::query_tools::QueryTools; @@ -58,7 +58,7 @@ impl SqlEvaluatorVisitor { pub fn apply( &self, node: &Rc, - node_processor: Rc, + node_processor: Rc, templates: &PlanSqlTemplates, ) -> Result { let result = node_processor.to_sql( @@ -76,7 +76,7 @@ impl SqlEvaluatorVisitor { pub fn apply_for_filter( &self, node: &Rc, - node_processor: Rc, + node_processor: Rc, templates: &PlanSqlTemplates, ) -> Result { self.with_ignore_tz_convert() @@ -90,7 +90,7 @@ impl SqlEvaluatorVisitor { pub fn evaluate_cube_ref( &self, cube_ref: &CubeRef, - node_processor: Rc, + node_processor: Rc, templates: &PlanSqlTemplates, ) -> Result { self.cube_ref_evaluator.evaluate( diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/symbols/to_sql.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/symbols/to_sql.rs index 2baec43b099d4..1b7af0abb52c2 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/symbols/to_sql.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/symbols/to_sql.rs @@ -1,4 +1,4 @@ -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; @@ -8,7 +8,7 @@ use std::rc::Rc; pub struct MemberSqlContext<'a> { pub visitor: &'a SqlEvaluatorVisitor, - pub node_processor: &'a Rc, + pub node_processor: &'a Rc, pub query_tools: &'a Rc, pub templates: &'a PlanSqlTemplates, pub name: &'a str, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/visitor_context.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/visitor_context.rs index 0b8f51b445947..7ad411d9a66b7 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/visitor_context.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/physical_plan/visitor_context.rs @@ -1,5 +1,5 @@ use crate::physical_plan::cube_ref_evaluator::CubeRefEvaluator; -use crate::physical_plan::sql_nodes::{SqlNode, SqlNodesFactory}; +use crate::physical_plan::sql_nodes::{NodeProcessor, SqlNodesFactory}; use crate::physical_plan::sql_visitor::SqlEvaluatorVisitor; use crate::planner::filter::Filter; use crate::planner::query_tools::QueryTools; @@ -12,7 +12,7 @@ use std::rc::Rc; pub struct VisitorContext { query_tools: Rc, - node_processor: Rc, + node_processor: Rc, cube_ref_evaluator: Rc, all_filters: Option, //To pass to FILTER_PARAMS and FILTER_GROUP filters_context: FiltersContext, @@ -63,7 +63,7 @@ impl VisitorContext { ) } - pub fn node_processor(&self) -> Rc { + pub fn node_processor(&self) -> Rc { self.node_processor.clone() } diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/sql_call.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/sql_call.rs index 85063a0740733..d6aa60bd30e99 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/sql_call.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/sql_call.rs @@ -1,6 +1,6 @@ use super::symbols::MemberSymbol; use crate::cube_bridge::member_sql::{FilterParamsColumn, SecutityContextProps, SqlTemplate}; -use crate::physical_plan::sql_nodes::{SqlNode, SqlNodesFactory}; +use crate::physical_plan::sql_nodes::{NodeProcessor, SqlNodesFactory}; use crate::physical_plan::{SqlEvaluatorVisitor, VisitorContext}; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; @@ -157,7 +157,7 @@ impl SqlCall { pub fn eval( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, ) -> Result { @@ -182,7 +182,7 @@ impl SqlCall { pub fn eval_vec( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, ) -> Result, CubeError> { @@ -245,7 +245,7 @@ impl SqlCall { fn prepare_template_params( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: &Rc, templates: &PlanSqlTemplates, ) -> Result<(Vec, Vec, Vec, Vec), CubeError> { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/cube_symbol.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/cube_symbol.rs index 33cb7c129c236..ddce92cea3ab3 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/cube_symbol.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/cube_symbol.rs @@ -1,7 +1,7 @@ use crate::cube_bridge::cube_definition::CubeDefinition; use crate::cube_bridge::evaluator::CubeEvaluator; use crate::cube_bridge::member_sql::MemberSql; -use crate::physical_plan::sql_nodes::SqlNode; +use crate::physical_plan::sql_nodes::NodeProcessor; use crate::physical_plan::SqlEvaluatorVisitor; use crate::planner::query_tools::QueryTools; use crate::planner::sql_templates::PlanSqlTemplates; @@ -103,7 +103,7 @@ impl CubeTableSymbol { pub fn evaluate_sql( &self, visitor: &SqlEvaluatorVisitor, - node_processor: Rc, + node_processor: Rc, query_tools: Rc, templates: &PlanSqlTemplates, ) -> Result { diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs index f715a086ce876..e6d90c1885229 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/planner/symbols/dimension_symbol.rs @@ -16,7 +16,7 @@ use crate::planner::{Compiler, CubeRef, SqlCall}; use cubenativeutils::CubeError; use std::rc::Rc; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct CalendarDimensionTimeShift { pub interval: Option, pub name: Option, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/auto_parentheses.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/auto_parentheses.rs index 703869adb4899..78dedaffe4314 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/auto_parentheses.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/auto_parentheses.rs @@ -150,8 +150,8 @@ fn calculated_boolean_combined_wraps_compound_dep() { #[test] fn measure_with_case_definition_renders_safely() { let ctx = ctx(); - // `CaseSqlNode` wraps the whole result in `CASE … END`. No substituted - // deps here, but we verify the node's reset path doesn't break anything. + // The full result is wrapped in `CASE … END`, so any substituted args + // are inside a syntactically safe scope and don't need extra parens. let sql = measure_sql(&ctx, "expr_cube.measure_case"); assert_eq!( sql, diff --git a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/cube_evaluator/symbol_evaluator.rs b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/cube_evaluator/symbol_evaluator.rs index 17d40c6aae6cb..0e5571632165d 100644 --- a/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/cube_evaluator/symbol_evaluator.rs +++ b/rust/cube/cubesqlplanner/cubesqlplanner/src/tests/cube_evaluator/symbol_evaluator.rs @@ -252,7 +252,7 @@ fn masked_measure_returns_mask_literal() { let symbol = context.create_measure("masking_cube.count").unwrap(); let sql = context.evaluate_symbol(&symbol).unwrap(); - // FinalMeasureSqlNode skips aggregation for masked measures + // Masked measures skip aggregation and surface the mask literal as-is. assert_eq!(sql, "(12345)"); }