Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class CalcitePlanContext {
private static final ThreadLocal<Boolean> legacyPreferredFlag =
ThreadLocal.withInitial(() -> true);

/** Thread-local QueryType for use by functions that lack access to CalcitePlanContext. */
private static final ThreadLocal<QueryType> currentQueryType =
ThreadLocal.withInitial(() -> QueryType.PPL);

@Getter @Setter private HighlightConfig highlightConfig;
@Getter @Setter private boolean isResolvingJoinCondition = false;
@Getter @Setter private boolean isResolvingSubquery = false;
Expand Down Expand Up @@ -78,6 +82,7 @@ private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType
this.config = config;
this.sysLimit = sysLimit;
this.queryType = queryType;
currentQueryType.set(queryType);
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
this.rexBuilder = new ExtendedRexBuilder(relBuilder.getRexBuilder());
Expand Down Expand Up @@ -158,6 +163,7 @@ public static void run(Runnable action, Settings settings) {
action.run();
} finally {
legacyPreferredFlag.remove();
currentQueryType.remove();
}
}

Expand All @@ -168,6 +174,13 @@ public static boolean isLegacyPreferred() {
return legacyPreferredFlag.get();
}

/**
* @return the current QueryType from thread-local context.
*/
public static QueryType getCurrentQueryType() {
return currentQueryType.get();
}

public void putRexLambdaRefMap(Map<String, RexLambdaRef> candidateMap) {
this.rexLambdaRefMap.putAll(candidateMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
Expand Down Expand Up @@ -246,7 +247,7 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
if (nameResolver.getSchemaName().equals(INFORMATION_SCHEMA_NAME)) {
throw new CalciteUnsupportedException("information_schema is unsupported in Calcite");
}
context.relBuilder.scan(node.getTableQualifiedName().getParts());
context.relBuilder.scan(List.of(nameResolver.getIdentifierName()));
RelNode scan = context.relBuilder.peek();

// Eagerly push down highlight config to the scan (highlight is a scan hint, not an operator)
Expand Down Expand Up @@ -541,6 +542,16 @@ private List<RexNode> expandProjectFields(
.filter(addedFields::add)
.forEach(field -> expandedFields.add(context.relBuilder.field(field)));
}
case Alias alias -> {
String aliasName =
Strings.isNullOrEmpty(alias.getAlias()) ? alias.getName() : alias.getAlias();
if (alias.getDelegated() instanceof AggregateFunction
&& currentFields.contains(aliasName)) {
expandedFields.add(context.relBuilder.field(aliasName));
} else {
expandedFields.add(rexVisitor.analyze(alias, context));
}
}
default ->
throw new IllegalStateException(
"Unexpected expression type in project list: " + expr.getClass().getSimpleName());
Expand Down Expand Up @@ -763,6 +774,13 @@ public RelNode visitHead(Head node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitLimit(Limit node, CalcitePlanContext context) {
visitChildren(node, context);
context.relBuilder.limit(node.getOffset(), node.getLimit());
return context.relBuilder.peek();
}

/**
* Insert a reversed sort node after finding the original sort in the tree. This rebuilds the tree
* with the reversed sort inserted right after the original sort.
Expand Down Expand Up @@ -1621,7 +1639,9 @@ private Pair<List<RexNode>, List<AggCall>> resolveAttributesForAggregation(
@Override
public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) {
Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList());
Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue();
Literal bucketNullableLit = statsArgs.get(Argument.BUCKET_NULLABLE);
Boolean bucketNullable =
bucketNullableLit != null ? (Boolean) bucketNullableLit.getValue() : true;
int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 1 : 0);
BitSet nonNullGroupMask = new BitSet(nGroup);
if (!bucketNullable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
Expand All @@ -42,6 +43,7 @@
import org.apache.calcite.util.TimestampString;
import org.apache.logging.log4j.util.Strings;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.And;
import org.opensearch.sql.ast.expression.Between;
Expand Down Expand Up @@ -83,13 +85,18 @@
import org.opensearch.sql.exception.CalciteUnsupportedException;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLFuncImpTable;

@RequiredArgsConstructor
public class CalciteRexNodeVisitor extends AbstractNodeVisitor<RexNode, CalcitePlanContext> {
private final CalciteRelNodeVisitor planVisitor;

private static final Set<BuiltinFunctionName> PURE_WINDOW_FUNCTIONS =
Set.of(
BuiltinFunctionName.ROW_NUMBER, BuiltinFunctionName.RANK, BuiltinFunctionName.DENSE_RANK);

public RexNode analyze(UnresolvedExpression unresolved, CalcitePlanContext context) {
return unresolved.accept(this, context);
}
Expand Down Expand Up @@ -338,8 +345,14 @@ public RexNode visitQualifiedName(QualifiedName node, CalcitePlanContext context
public RexNode visitAlias(Alias node, CalcitePlanContext context) {
RexNode expr = analyze(node.getDelegated(), context);
// Only OpenSearch SQL uses node.getAlias, OpenSearch PPL uses node.getName.
return context.relBuilder.alias(
expr, Strings.isEmpty(node.getAlias()) ? node.getName() : node.getAlias());
String aliasName = Strings.isEmpty(node.getAlias()) ? node.getName() : node.getAlias();
// For SQL queries, encode expression name and alias in field name so the response
// builder can reconstruct both (V2 compatibility: name=expr, alias=AS-alias).
if (context.queryType == QueryType.SQL && !Strings.isEmpty(node.getAlias())) {
String exprName = node.getName() != null ? node.getName() : aliasName;
aliasName = exprName + "\u0000" + node.getAlias();
}
return context.relBuilder.alias(expr, aliasName);
}

@Override
Expand Down Expand Up @@ -563,47 +576,93 @@ public RexNode visitFunction(Function node, CalcitePlanContext context) {

@Override
public RexNode visitWindowFunction(WindowFunction node, CalcitePlanContext context) {
Function windowFunction = (Function) node.getFunction();
List<RexNode> arguments =
windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
String funcName;
List<RexNode> arguments;
final boolean isDistinct;
if (node.getFunction() instanceof AggregateFunction aggFunc) {
funcName = aggFunc.getFuncName();
isDistinct = Boolean.TRUE.equals(aggFunc.getDistinct());
List<UnresolvedExpression> argExprs = new java.util.ArrayList<>();
if (aggFunc.getField() != null) {
argExprs.add(aggFunc.getField());
}
argExprs.addAll(aggFunc.getArgList());
arguments = argExprs.stream().map(arg -> analyze(arg, context)).toList();
} else {
Function windowFunction = (Function) node.getFunction();
funcName = windowFunction.getFuncName();
isDistinct = false;
arguments = windowFunction.getFuncArgs().stream().map(arg -> analyze(arg, context)).toList();
}
List<RexNode> partitions =
node.getPartitionByList().stream()
.map(arg -> analyze(arg, context))
.map(this::extractRexNodeFromAlias)
.toList();
return BuiltinFunctionName.ofWindowFunction(windowFunction.getFuncName())
List<RexNode> orderKeys =
node.getSortList().stream()
.map(
pair -> {
RexNode sortField = analyze(pair.getRight(), context);
if (pair.getLeft().getSortOrder()
== org.opensearch.sql.ast.tree.Sort.SortOrder.DESC) {
sortField = context.relBuilder.desc(sortField);
}
if (pair.getLeft().getNullOrder()
== org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST) {
sortField = context.relBuilder.nullsLast(sortField);
} else if (pair.getLeft().getNullOrder()
== org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST) {
sortField = context.relBuilder.nullsFirst(sortField);
}
return sortField;
})
.toList();
return BuiltinFunctionName.ofWindowFunction(funcName)
.map(
functionName -> {
RexNode field = arguments.isEmpty() ? null : arguments.getFirst();
List<RexNode> args =
(arguments.isEmpty() || arguments.size() == 1)
? Collections.emptyList()
: arguments.subList(1, arguments.size());
// Pure window functions (ROW_NUMBER, RANK, DENSE_RANK) are not registered
// in aggFunctionRegistry, so skip validation for them.
if (PURE_WINDOW_FUNCTIONS.contains(functionName)) {
return PlanUtils.makeOver(
context,
functionName,
field,
args,
partitions,
orderKeys,
node.getWindowFrame());
}
List<RexNode> nodes =
PPLFuncImpTable.INSTANCE.validateAggFunctionSignature(
functionName, field, args, context.rexBuilder);
return nodes != null
? PlanUtils.makeOver(
context,
functionName,
isDistinct,
nodes.getFirst(),
nodes.size() <= 1 ? Collections.emptyList() : nodes.subList(1, nodes.size()),
partitions,
List.of(),
orderKeys,
node.getWindowFrame())
: PlanUtils.makeOver(
context,
functionName,
isDistinct,
field,
args,
partitions,
List.of(),
orderKeys,
node.getWindowFrame());
})
.orElseThrow(
() ->
new UnsupportedOperationException(
"Unexpected window function: " + windowFunction.getFuncName()));
() -> new UnsupportedOperationException("Unexpected window function: " + funcName));
}

/** extract the expression of Alias from a node */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,19 @@ static RexNode makeOver(
List<RexNode> partitions,
List<RexNode> orderKeys,
@Nullable WindowFrame windowFrame) {
return makeOver(
context, functionName, false, field, argList, partitions, orderKeys, windowFrame);
}

static RexNode makeOver(
CalcitePlanContext context,
BuiltinFunctionName functionName,
boolean distinct,
RexNode field,
List<RexNode> argList,
List<RexNode> partitions,
List<RexNode> orderKeys,
@Nullable WindowFrame windowFrame) {
if (windowFrame == null) {
windowFrame = WindowFrame.rowsUnbounded();
}
Expand Down Expand Up @@ -216,6 +229,22 @@ static RexNode makeOver(
true,
lowerBound,
upperBound);
case RANK:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.RANK),
partitions,
orderKeys,
true,
lowerBound,
upperBound);
case DENSE_RANK:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.DENSE_RANK),
partitions,
orderKeys,
true,
lowerBound,
upperBound);
case NTH_VALUE:
return withOver(
context.relBuilder.aggregateCall(SqlStdOperatorTable.NTH_VALUE, field, argList.get(0)),
Expand All @@ -226,7 +255,7 @@ static RexNode makeOver(
upperBound);
default:
return withOver(
makeAggCall(context, functionName, false, field, argList),
makeAggCall(context, functionName, distinct, field, argList),
partitions,
orderKeys,
rows,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,10 @@ private boolean isCalciteEnabled(Settings settings) {
}

// TODO https://github.com/opensearch-project/sql/issues/3457
// Calcite is not available for SQL query now. Maybe release in 3.1.0?
// TODO: PoC only — in production, SQL routes through unified query API (RestUnifiedQueryAction),
// not through this flag. Remove this change when migrating to the unified query path.
private boolean shouldUseCalcite(QueryType queryType) {
return isCalciteEnabled(settings) && queryType == QueryType.PPL;
return isCalciteEnabled(settings) && (queryType == QueryType.PPL || queryType == QueryType.SQL);
}

private FrameworkConfig buildFrameworkConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,11 @@ public enum BuiltinFunctionName {
.put("dc", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("distinct_count", BuiltinFunctionName.DISTINCT_COUNT_APPROX)
.put("pattern", BuiltinFunctionName.INTERNAL_PATTERN)
.put("row_number", BuiltinFunctionName.ROW_NUMBER)
.put("rank", BuiltinFunctionName.RANK)
.put("dense_rank", BuiltinFunctionName.DENSE_RANK)
.put("percentile", BuiltinFunctionName.PERCENTILE_APPROX)
.put("percentile_approx", BuiltinFunctionName.PERCENTILE_APPROX)
.build();

public static Optional<BuiltinFunctionName> of(String str) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
RELEVANCE_QUERY_FUNCTION_INSTANCE.toUDF("query_string", false);
public static final SqlOperator MULTI_MATCH =
RELEVANCE_QUERY_FUNCTION_INSTANCE.toUDF("multi_match", false);
public static final SqlOperator WILDCARD_QUERY =
RELEVANCE_QUERY_FUNCTION_INSTANCE.toUDF("wildcard_query");
public static final SqlOperator NUMBER_TO_STRING =
new NumberToStringFunction().toUDF("NUMBER_TO_STRING");
public static final SqlOperator TONUMBER = new ToNumberFunction().toUDF("TONUMBER");
Expand Down
Loading
Loading