diff --git a/.kiro/settings/lsp.json b/.kiro/settings/lsp.json new file mode 100644 index 00000000000..6ccf4c68e33 --- /dev/null +++ b/.kiro/settings/lsp.json @@ -0,0 +1,198 @@ +{ + "languages": { + "cpp": { + "name": "clangd", + "command": "clangd", + "args": [ + "--background-index" + ], + "file_extensions": [ + "cpp", + "cc", + "cxx", + "c", + "h", + "hpp", + "hxx" + ], + "project_patterns": [ + "CMakeLists.txt", + "compile_commands.json", + "Makefile" + ], + "exclude_patterns": [ + "**/build/**", + "**/cmake-build-**/**" + ], + "multi_workspace": false, + "initialization_options": {}, + "request_timeout_secs": 60 + }, + "rust": { + "name": "rust-analyzer", + "command": "rust-analyzer", + "args": [], + "file_extensions": [ + "rs" + ], + "project_patterns": [ + "Cargo.toml" + ], + "exclude_patterns": [ + "**/target/**" + ], + "multi_workspace": false, + "initialization_options": { + "cargo": { + "buildScripts": { + "enable": true + } + }, + "diagnostics": { + "enable": true, + "enableExperimental": true + }, + "workspace": { + "symbol": { + "search": { + "scope": "workspace" + } + } + } + }, + "request_timeout_secs": 60 + }, + "typescript": { + "name": "typescript-language-server", + "command": "typescript-language-server", + "args": [ + "--stdio" + ], + "file_extensions": [ + "ts", + "js", + "tsx", + "jsx" + ], + "project_patterns": [ + "package.json", + "tsconfig.json" + ], + "exclude_patterns": [ + "**/node_modules/**", + "**/dist/**" + ], + "multi_workspace": false, + "initialization_options": { + "preferences": { + "disableSuggestions": false + } + }, + "request_timeout_secs": 60 + }, + "java": { + "name": "jdtls", + "command": "jdtls", + "args": [], + "file_extensions": [ + "java" + ], + "project_patterns": [ + "pom.xml", + "build.gradle", + "build.gradle.kts", + ".project" + ], + "exclude_patterns": [ + "**/target/**", + "**/build/**", + "**/.gradle/**" + ], + "multi_workspace": false, + "initialization_options": { + "settings": { + "java": { + "compile": 
{ + "nullAnalysis": { + "mode": "automatic" + } + }, + "configuration": { + "annotationProcessing": { + "enabled": true + } + } + } + } + }, + "request_timeout_secs": 60 + }, + "go": { + "name": "gopls", + "command": "gopls", + "args": [], + "file_extensions": [ + "go" + ], + "project_patterns": [ + "go.mod", + "go.sum" + ], + "exclude_patterns": [ + "**/vendor/**" + ], + "multi_workspace": false, + "initialization_options": { + "usePlaceholders": true, + "completeUnimported": true + }, + "request_timeout_secs": 60 + }, + "python": { + "name": "pyright", + "command": "pyright-langserver", + "args": [ + "--stdio" + ], + "file_extensions": [ + "py" + ], + "project_patterns": [ + "pyproject.toml", + "setup.py", + "requirements.txt", + "pyrightconfig.json" + ], + "exclude_patterns": [ + "**/__pycache__/**", + "**/venv/**", + "**/.venv/**", + "**/.pytest_cache/**" + ], + "multi_workspace": false, + "initialization_options": {}, + "request_timeout_secs": 60 + }, + "ruby": { + "name": "solargraph", + "command": "solargraph", + "args": [ + "stdio" + ], + "file_extensions": [ + "rb" + ], + "project_patterns": [ + "Gemfile", + "Rakefile" + ], + "exclude_patterns": [ + "**/vendor/**", + "**/tmp/**" + ], + "multi_workspace": false, + "initialization_options": {}, + "request_timeout_secs": 60 + } + } +} \ No newline at end of file diff --git a/api/README.md b/api/README.md index 91651aa3153..e9fa594ecb7 100644 --- a/api/README.md +++ b/api/README.md @@ -8,7 +8,7 @@ This module provides components organized into two main areas aligned with the [ ### Unified Language Specification -- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language) queries and returns Calcite `RelNode` logical plans as intermediate representation. +- **`UnifiedQueryPlanner`**: Accepts PPL (Piped Processing Language), OpenSearch SQL, and ANSI SQL queries and returns Calcite `RelNode` logical plans as intermediate representation. 
- **`UnifiedQueryTranspiler`**: Converts Calcite logical plans (`RelNode`) into SQL strings for various target databases using different SQL dialects. ### Unified Execution Runtime @@ -17,7 +17,7 @@ This module provides components organized into two main areas aligned with the [ - **`UnifiedFunction`**: Engine-agnostic function interface that enables functions to be evaluated across different execution engines without engine-specific code duplication. - **`UnifiedFunctionRepository`**: Repository for discovering and loading functions as `UnifiedFunction` instances, providing a bridge between function definitions and external execution engines. -Together, these components enable complete workflows: parse PPL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines. +Together, these components enable complete workflows: parse PPL, OpenSearch SQL, or ANSI SQL queries into logical plans, transpile those plans into target database SQL, compile and execute queries directly, or export PPL functions for use in external execution engines. 
### Experimental API Design @@ -32,27 +32,53 @@ Together, these components enable complete workflows: parse PPL queries into log Create a context with catalog configuration, query type, and optional settings: ```java -UnifiedQueryContext context = UnifiedQueryContext.builder() +// PPL +UnifiedQueryContext pplContext = UnifiedQueryContext.builder() .language(QueryType.PPL) .catalog("opensearch", opensearchSchema) - .catalog("spark_catalog", sparkSchema) .defaultNamespace("opensearch") - .cacheMetadata(true) - .setting("plugins.query.size_limit", 200) + .build(); + +// OpenSearch SQL (default — uses ANTLR parser with OpenSearch UDFs like match()) +UnifiedQueryContext sqlContext = UnifiedQueryContext.builder() + .language(QueryType.SQL) + .catalog("opensearch", opensearchSchema) + .defaultNamespace("opensearch") + .build(); + +// Standard SQL via Calcite's native parser (set conformance to opt in) +UnifiedQueryContext calciteSqlContext = UnifiedQueryContext.builder() + .language(QueryType.SQL) + .conformance(SqlConformanceEnum.DEFAULT) + .catalog("opensearch", opensearchSchema) + .defaultNamespace("opensearch") + .build(); + +// MySQL-compatible SQL +UnifiedQueryContext mysqlContext = UnifiedQueryContext.builder() + .language(QueryType.SQL) + .conformance(SqlConformanceEnum.MYSQL_5) + .catalog("opensearch", opensearchSchema) + .defaultNamespace("opensearch") .build(); ``` ### UnifiedQueryPlanner -Use `UnifiedQueryPlanner` to parse and analyze PPL queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries. +Use `UnifiedQueryPlanner` to parse and analyze queries into Calcite logical plans. The planner accepts a `UnifiedQueryContext` and can be reused for multiple queries within the same language. 
```java -// Create planner with context -UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); +// PPL +UnifiedQueryPlanner planner = new UnifiedQueryPlanner(pplContext); +RelNode plan = planner.plan("source = logs | where status = 200"); + +// OpenSearch SQL (supports OpenSearch UDFs like match, match_phrase, etc.) +UnifiedQueryPlanner sqlPlanner = new UnifiedQueryPlanner(sqlContext); +RelNode sqlPlan = sqlPlanner.plan("SELECT * FROM logs WHERE match(message, 'error')"); -// Plan multiple queries (context is reused) -RelNode plan1 = planner.plan("source = logs | where status = 200"); -RelNode plan2 = planner.plan("source = metrics | stats avg(cpu)"); +// Standard SQL via Calcite's native parser (conformance set on context) +UnifiedQueryPlanner calcitePlanner = new UnifiedQueryPlanner(calciteSqlContext); +RelNode ansiPlan = calcitePlanner.plan("SELECT \"name\", COUNT(*) FROM \"logs\" GROUP BY \"name\""); ``` ### UnifiedQueryTranspiler @@ -226,5 +252,4 @@ public class MySchema extends AbstractSchema { ## Future Work -- Expand support to SQL language. - Extend planner to generate optimized physical plans using Calcite's optimization frameworks. 
diff --git a/api/build.gradle b/api/build.gradle index fb4cafe79d8..df82de62b11 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -13,6 +13,7 @@ plugins { dependencies { api project(':ppl') + api project(':sql') testImplementation testFixtures(project(':api')) testImplementation group: 'junit', name: 'junit', version: '4.13.2' diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java index 3e0a1f972bd..ad0754b7c23 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java @@ -13,13 +13,24 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import lombok.Getter; import lombok.Value; import org.apache.calcite.jdbc.CalciteSchema; import org.apache.calcite.plan.RelTraitDef; import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider; +import org.apache.calcite.rel.rules.CoreRules; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.avatica.util.Casing; +import org.apache.calcite.config.Lex; +import org.apache.calcite.sql.SqlBasicFunction; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Programs; @@ -42,6 +53,13 @@ public class UnifiedQueryContext implements AutoCloseable { /** Settings containing execution limits and feature flags used by parsers and planners. */ Settings settings; + /** + * SQL conformance level for Calcite's native SQL parser. 
When null and language is SQL, the + * OpenSearch ANTLR-based SQL parser is used. When set, Calcite's native SqlParser is used with + * this conformance level, enabling dialect-specific SQL parsing (e.g., MySQL, Oracle, BigQuery). + */ + @Getter SqlConformance conformance; + /** * Closes the underlying resource managed by this context. * @@ -62,6 +80,7 @@ public static Builder builder() { /** Builder that constructs UnifiedQueryContext. */ public static class Builder { private QueryType queryType; + private SqlConformance conformance; private final Map catalogs = new HashMap<>(); private String defaultNamespace; private boolean cacheMetadata = false; @@ -80,7 +99,7 @@ public static class Builder { /** * Sets the query language frontend to be used. * - * @param queryType the {@link QueryType}, such as PPL + * @param queryType the {@link QueryType}, such as PPL or SQL * @return this builder instance */ public Builder language(QueryType queryType) { @@ -88,6 +107,20 @@ public Builder language(QueryType queryType) { return this; } + /** + * Sets the SQL conformance level for Calcite's native SQL parser. Only applicable when language + * is {@link QueryType#SQL}. When set, bypasses the OpenSearch ANTLR parser and uses Calcite's + * built-in SqlParser with the specified conformance (e.g., {@code + * SqlConformanceEnum.MYSQL_5}). + * + * @param conformance the Calcite {@link SqlConformance} level + * @return this builder instance + */ + public Builder conformance(SqlConformance conformance) { + this.conformance = conformance; + return this; + } + /** * Registers a catalog with the specified name and its associated schema. 
The schema can be a * flat or nested structure (e.g., catalog → schema → table), depending on how data is @@ -146,12 +179,15 @@ public Builder setting(String name, Object value) { */ public UnifiedQueryContext build() { Objects.requireNonNull(queryType, "Must specify language before build"); + if (conformance != null && queryType != QueryType.SQL) { + throw new IllegalArgumentException("conformance is only applicable for SQL language"); + } Settings settings = buildSettings(); CalcitePlanContext planContext = CalcitePlanContext.create( buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType); - return new UnifiedQueryContext(planContext, settings); + return new UnifiedQueryContext(planContext, settings, conformance); } private Settings buildSettings() { @@ -174,12 +210,35 @@ private FrameworkConfig buildFrameworkConfig() { SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, cacheMetadata).plus(); catalogs.forEach(rootSchema::add); + SqlParser.Config parserConfig = + conformance != null + ? 
SqlParser.Config.DEFAULT + .withUnquotedCasing(Casing.UNCHANGED) + .withConformance(conformance) + : SqlParser.Config.DEFAULT; + SchemaPlus defaultSchema = findSchemaByPath(rootSchema, defaultNamespace); + + SqlBasicFunction matchPhraseUpper = + SqlBasicFunction.create( + "MATCH_PHRASE", + ReturnTypes.BOOLEAN, + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)); + SqlBasicFunction matchPhraseLower = + SqlBasicFunction.create( + "match_phrase", + ReturnTypes.BOOLEAN, + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)); + return Frameworks.newConfigBuilder() - .parserConfig(SqlParser.Config.DEFAULT) + .parserConfig(parserConfig) .defaultSchema(defaultSchema) + .operatorTable( + SqlOperatorTables.chain( + SqlStdOperatorTable.instance(), + SqlOperatorTables.of(matchPhraseUpper, matchPhraseLower))) .traitDefs((List) null) - .programs(Programs.calc(DefaultRelMetadataProvider.INSTANCE)) + .programs(Programs.standard(DefaultRelMetadataProvider.INSTANCE)) .build(); } @@ -197,5 +256,6 @@ private SchemaPlus findSchemaByPath(SchemaPlus rootSchema, String defaultPath) { } return current; } + } } diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index 91e35335e20..96a2602fc0c 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -9,8 +9,12 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; import 
org.opensearch.sql.ast.tree.UnresolvedPlan; @@ -19,8 +23,7 @@ import org.opensearch.sql.common.antlr.SyntaxCheckException; import org.opensearch.sql.executor.QueryType; import org.opensearch.sql.ppl.antlr.PPLSyntaxParser; -import org.opensearch.sql.ppl.parser.AstBuilder; -import org.opensearch.sql.ppl.parser.AstStatementBuilder; +import org.opensearch.sql.sql.antlr.SQLSyntaxParser; /** * {@code UnifiedQueryPlanner} provides a high-level API for parsing and analyzing queries using the @@ -34,6 +37,9 @@ public class UnifiedQueryPlanner { /** Unified query context containing CalcitePlanContext with all configuration. */ private final UnifiedQueryContext context; + /** Whether to use Calcite's native SQL parser instead of the ANTLR-based parser. */ + private final boolean useCalciteParser; + /** AST-to-RelNode visitor that builds logical plans from the parsed AST. */ private final CalciteRelNodeVisitor relNodeVisitor = new CalciteRelNodeVisitor(new EmptyDataSourceService()); @@ -44,22 +50,25 @@ public class UnifiedQueryPlanner { * @param context the unified query context containing CalcitePlanContext */ public UnifiedQueryPlanner(UnifiedQueryContext context) { - this.parser = buildQueryParser(context.getPlanContext().queryType); this.context = context; + this.useCalciteParser = + context.getPlanContext().queryType == QueryType.SQL && context.getConformance() != null; + this.parser = useCalciteParser ? null : buildQueryParser(context.getPlanContext().queryType); } /** - * Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate - * optimal physical plan to fully unify query execution and leverage Calcite's optimizer. + * Parses and analyzes a query string into a Calcite logical plan (RelNode). 
* - * @param query the raw query string in PPL or other supported syntax + * @param query the raw query string in PPL or SQL syntax * @return a logical plan representing the query */ public RelNode plan(String query) { try { + if (useCalciteParser) { + return planWithCalcite(query); + } return preserveCollation(analyze(parse(query))); } catch (SyntaxCheckException e) { - // Re-throw syntax error without wrapping throw e; } catch (Exception e) { throw new IllegalStateException("Failed to plan query", e); @@ -67,19 +76,38 @@ public RelNode plan(String query) { } private Parser buildQueryParser(QueryType queryType) { - if (queryType == QueryType.PPL) { - return new PPLSyntaxParser(); - } - throw new IllegalArgumentException("Unsupported query type: " + queryType); + return switch (queryType) { + case PPL -> new PPLSyntaxParser(); + case SQL -> new SQLSyntaxParser(); + }; } private UnresolvedPlan parse(String query) { ParseTree cst = parser.parse(query); - AstStatementBuilder astStmtBuilder = - new AstStatementBuilder( - new AstBuilder(query, context.getSettings()), - AstStatementBuilder.StatementBuilderContext.builder().build()); - Statement statement = cst.accept(astStmtBuilder); + Statement statement = + switch (context.getPlanContext().queryType) { + case PPL -> { + var astBuilder = + new org.opensearch.sql.ppl.parser.AstBuilder(query, context.getSettings()); + var stmtBuilder = + new org.opensearch.sql.ppl.parser.AstStatementBuilder( + astBuilder, + org.opensearch.sql.ppl.parser.AstStatementBuilder.StatementBuilderContext + .builder() + .build()); + yield cst.accept(stmtBuilder); + } + case SQL -> { + var astBuilder = new org.opensearch.sql.sql.parser.AstBuilder(query); + var stmtBuilder = + new org.opensearch.sql.sql.parser.AstStatementBuilder( + astBuilder, + org.opensearch.sql.sql.parser.AstStatementBuilder.StatementBuilderContext + .builder() + .build()); + yield cst.accept(stmtBuilder); + } + }; if (statement instanceof Query) { return ((Query) 
statement).getPlan(); @@ -88,6 +116,28 @@ "Only query statements are supported but got " + statement.getClass().getSimpleName()); } + private RelNode planWithCalcite(String query) { + // try-with-resources ensures the Planner is closed even if parse/validate/rel fails + try (Planner planner = Frameworks.getPlanner(context.getPlanContext().config)) { + SqlNode parsed = planner.parse(query); + SqlNode validated = planner.validate(parsed); + RelRoot relRoot = planner.rel(validated); + + // Run AggregateCaseToFilterRule to rewrite SUM(CASE WHEN cond THEN NULL ELSE expr END) + // into SUM(expr) FILTER(WHERE NOT cond) for native OpenSearch filter aggregation pushdown + RelNode rel = relRoot.rel; + org.apache.calcite.plan.hep.HepPlanner hepPlanner = + new org.apache.calcite.plan.hep.HepPlanner( + new org.apache.calcite.plan.hep.HepProgramBuilder() + .addRuleInstance(org.apache.calcite.rel.rules.CoreRules.AGGREGATE_CASE_TO_FILTER) + .build()); + hepPlanner.setRoot(rel); + return hepPlanner.findBestExp(); + } catch (Exception e) { + throw new IllegalStateException("Failed to plan SQL query", e); + } + } + private RelNode analyze(UnresolvedPlan ast) { return relNodeVisitor.analyze(ast, context.getPlanContext()); } diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java index a3ad73f700a..e9b023c8eb4 100644 --- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java @@ -11,6 +11,7 @@ import static org.junit.Assert.assertTrue; import static org.opensearch.sql.common.setting.Settings.Key.*; +import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.junit.Test; import org.opensearch.sql.calcite.SysLimit; import org.opensearch.sql.executor.QueryType; @@ -63,14 +64,34 @@ public void testMissingQueryType() { UnifiedQueryContext.builder().catalog("opensearch", testSchema).build(); } - @Test(expected = 
IllegalArgumentException.class) - public void testUnsupportedQueryType() { + @Test + public void testSqlQueryType() { + UnifiedQueryContext context = + UnifiedQueryContext.builder() + .language(QueryType.SQL) + .catalog("opensearch", testSchema) + .build(); + assertNotNull(new UnifiedQueryPlanner(context)); + } + + @Test + public void testSqlWithConformance() { UnifiedQueryContext context = UnifiedQueryContext.builder() - .language(QueryType.SQL) // only PPL is supported for now + .language(QueryType.SQL) + .conformance(SqlConformanceEnum.DEFAULT) .catalog("opensearch", testSchema) .build(); - new UnifiedQueryPlanner(context); + assertNotNull(new UnifiedQueryPlanner(context)); + } + + @Test(expected = IllegalArgumentException.class) + public void testConformanceOnlyForSql() { + UnifiedQueryContext.builder() + .language(QueryType.PPL) + .conformance(SqlConformanceEnum.DEFAULT) + .catalog("opensearch", testSchema) + .build(); } @Test(expected = IllegalArgumentException.class) diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 79bf924d36a..0a28cb5b4c9 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -121,6 +121,7 @@ import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; +import org.opensearch.sql.ast.tree.Limit; import org.opensearch.sql.ast.tree.Lookup; @@ -475,6 +476,13 @@ private List expandProjectFields( .filter(addedFields::add) .forEach(field -> expandedFields.add(context.relBuilder.field(field))); } + case Alias alias -> { + RexNode resolved = rexVisitor.analyze(alias.getDelegated(), context); + String displayName = alias.getAlias() != null ? alias.getAlias() : alias.getName(); + if (addedFields.add(displayName)) { + expandedFields.add(context.relBuilder.alias(resolved, displayName)); + } + } default -> throw new IllegalStateException( "Unexpected expression type in project list: " + expr.getClass().getSimpleName()); @@ -693,6 +701,13 @@ public RelNode visitHead(Head node, CalcitePlanContext context) { return context.relBuilder.peek(); } + @Override + public RelNode visitLimit(Limit node, CalcitePlanContext context) { + visitChildren(node, context); + context.relBuilder.limit(node.getOffset(), node.getLimit()); + return context.relBuilder.peek(); + } + private static final String REVERSE_ROW_NUM = "__reverse_row_num__"; @Override @@ -1238,7 +1253,9 @@ private Pair, List> resolveAttributesForAggregation( @Override public RelNode visitAggregation(Aggregation node, CalcitePlanContext context) { Argument.ArgumentMap statsArgs = Argument.ArgumentMap.of(node.getArgExprList()); - Boolean bucketNullable = (Boolean) statsArgs.get(Argument.BUCKET_NULLABLE).getValue(); + Literal bucketNullableLiteral = statsArgs.get(Argument.BUCKET_NULLABLE); + Boolean bucketNullable = + bucketNullableLiteral != null ? (Boolean) bucketNullableLiteral.getValue() : true; int nGroup = node.getGroupExprList().size() + (Objects.nonNull(node.getSpan()) ? 
1 : 0); BitSet nonNullGroupMask = new BitSet(nGroup); if (!bucketNullable) { diff --git a/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java index 7f7d31790f1..1c3e4d67df1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedQueryOpenSearchIT.java @@ -16,6 +16,8 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.validate.SqlConformance; +import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.junit.After; import org.junit.Test; import org.opensearch.common.unit.TimeValue; @@ -41,11 +43,25 @@ public class UnifiedQueryOpenSearchIT extends PPLIntegTestCase implements Result public void init() throws Exception { super.init(); loadIndex(Index.ACCOUNT); + initContext(QueryType.PPL); + } + + @After + public void cleanUp() throws Exception { + if (context != null) { + context.close(); + } + } + private void initContext(QueryType queryType) { + initContext(queryType, null); + } + + private void initContext(QueryType queryType, SqlConformance conformance) { String catalogName = "opensearch"; - context = + var builder = UnifiedQueryContext.builder() - .language(QueryType.PPL) + .language(queryType) .catalog(catalogName, createOpenSearchSchema()) .defaultNamespace(catalogName) .setting("plugins.query.size_limit", 200) @@ -55,19 +71,15 @@ public void init() throws Exception { .setting("plugins.query.field_type_tolerance", true) .setting("plugins.calcite.enabled", true) .setting("plugins.calcite.pushdown.enabled", true) - .setting("plugins.calcite.pushdown.rowcount.estimation.factor", 0.9) - .build(); + .setting("plugins.calcite.pushdown.rowcount.estimation.factor", 0.9); + if (conformance != null) { + builder.conformance(conformance); + } + context 
= builder.build(); planner = new UnifiedQueryPlanner(context); compiler = new UnifiedQueryCompiler(context); } - @After - public void cleanUp() throws Exception { - if (context != null) { - context.close(); - } - } - @Test public void testSimplePPLQueryExecution() throws Exception { String pplQuery = @@ -104,6 +116,84 @@ public void testMultiplePPLQueryExecutionWithSameContext() throws Exception { } } + @Test + public void testSimpleSQLQueryExecution() throws Exception { + initContext(QueryType.SQL); + String sqlQuery = + String.format( + "SELECT firstname, age FROM `%s` WHERE lastname = 'Duke' LIMIT 3", + TEST_INDEX_ACCOUNT); + + RelNode logicalPlan = planner.plan(sqlQuery); + try (PreparedStatement statement = compiler.compile(logicalPlan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("firstname", VARCHAR), col("age", BIGINT)) + .expectData(row("Amber", 32L)); + } + } + + @Test + public void testSimpleAnsiSQLQueryExecution() throws Exception { + initContext(QueryType.SQL, SqlConformanceEnum.DEFAULT); + String ansiSqlQuery = + String.format( + "SELECT \"firstname\", \"age\" FROM \"%s\" WHERE \"lastname\" = 'Duke'", + TEST_INDEX_ACCOUNT); + + RelNode logicalPlan = planner.plan(ansiSqlQuery); + try (PreparedStatement statement = compiler.compile(logicalPlan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("firstname", VARCHAR), col("age", BIGINT)) + .expectData(row("Amber", 32L)); + } + } + + @Test + public void testSQLWithMatchUDF() throws Exception { + initContext(QueryType.SQL); + String sqlQuery = + String.format( + "SELECT firstname, lastname FROM `%s` WHERE match(lastname, 'Bates') LIMIT 5", + TEST_INDEX_ACCOUNT); + + RelNode logicalPlan = planner.plan(sqlQuery); + try (PreparedStatement statement = compiler.compile(logicalPlan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("firstname", VARCHAR), col("lastname", 
VARCHAR)) + .expectData(row("Nanette", "Bates")); + } + } + + @Test + public void testAnsiSQLSelfJoinWithAggregation() throws Exception { + initContext(QueryType.SQL, SqlConformanceEnum.DEFAULT); + // Self-join on state: count pairs where a.age > 30 and b.age < 30 in state IL + // IL has 10 people with age>30 and 11 with age<30, so cross count = 110 + String ansiSqlQuery = + String.format( + "SELECT a.\"state\", COUNT(*) AS \"cnt\"" + + " FROM \"%1$s\" a" + + " INNER JOIN \"%1$s\" b ON a.\"state\" = b.\"state\"" + + " WHERE a.\"age\" > 30 AND b.\"age\" < 30 AND a.\"state\" = 'IL'" + + " GROUP BY a.\"state\"", + TEST_INDEX_ACCOUNT); + + RelNode logicalPlan = planner.plan(ansiSqlQuery); + try (PreparedStatement statement = compiler.compile(logicalPlan)) { + ResultSet resultSet = statement.executeQuery(); + + verify(resultSet) + .expectSchema(col("state", VARCHAR), col("cnt", BIGINT)) + .expectData(row("IL", 110L)); + } + } + /** * Creates a dynamic schema that creates OpenSearchIndex on-demand for any table name. This allows * querying any index without pre-registering it. 
diff --git a/integ-test/src/test/java/org/opensearch/sql/api/UnifiedSQLRestIT.java b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedSQLRestIT.java new file mode 100644 index 00000000000..ef04a20a9bf --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/api/UnifiedSQLRestIT.java @@ -0,0 +1,282 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static org.junit.Assert.assertTrue; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.sql.legacy.SQLIntegTestCase; + +/** + * Integration test for the unified SQL REST endpoint. Tests that /_plugins/_sql routes through the + * unified query API for both OpenSearch SQL (default) and ANSI SQL (mode=ansi). 
+ */ +public class UnifiedSQLRestIT extends SQLIntegTestCase { + + @Override + protected void init() throws Exception { + super.init(); + loadIndex(Index.ACCOUNT); + } + + @Test + public void testOpenSearchSQL() throws IOException { + JSONObject response = + executeAnsiQuery( + String.format( + "SELECT \"firstname\", \"age\" FROM \"%s\" WHERE \"lastname\" = 'Duke' LIMIT 3", + TEST_INDEX_ACCOUNT)); + verifySchema(response, schema("firstname", null, "keyword"), schema("age", null, "long")); + verifyDataRows(response, rows("Amber", 32)); + } + + @Test + public void testAnsiSQL() throws IOException { + JSONObject response = + executeAnsiQuery( + String.format( + "SELECT \"firstname\", \"age\" FROM \"%s\" WHERE \"lastname\" = 'Duke'", + TEST_INDEX_ACCOUNT)); + verifySchema(response, schema("firstname", null, "keyword"), schema("age", null, "long")); + verifyDataRows(response, rows("Amber", 32)); + } + + @Test + public void testAnsiSQLWithAggregation() throws IOException { + JSONObject response = + executeAnsiQuery( + String.format( + "SELECT \"state\", COUNT(*) AS \"cnt\" FROM \"%s\"" + + " WHERE \"state\" = 'IL' GROUP BY \"state\"", + TEST_INDEX_ACCOUNT)); + verifySchema(response, schema("state", null, "keyword"), schema("cnt", null, "long")); + verifyDataRows(response, rows("IL", 22)); + } + + @Test + public void testAnsiSQLWithMatchPhraseAndJoinAndGroupBy() throws IOException { + // Combines: match_phrase() push-down + self-JOIN in Calcite + GROUP BY aggregation + JSONObject response = + executeAnsiQuery( + String.format( + "SELECT a.\"state\", COUNT(*) AS \"cnt\"" + + " FROM \"%1$s\" a" + + " JOIN \"%1$s\" b ON a.\"state\" = b.\"state\"" + + " WHERE match_phrase(a.\"address\", 'Holmes Lane')" + + " AND a.\"account_number\" <> b.\"account_number\"" + + " GROUP BY a.\"state\"" + + " ORDER BY \"cnt\" DESC", + TEST_INDEX_ACCOUNT)); + verifySchema(response, schema("state", null, "keyword"), schema("cnt", null, "long")); + 
assertTrue(response.getJSONArray("datarows").length() > 0); + } + + @Test + public void testAnsiSQLWithCaseWhenAggregation() throws IOException { + String index = "oee_events_test"; + // Create index with OEE mapping + Request createIndex = new Request("PUT", "/" + index); + createIndex.setJsonEntity( + "{\"mappings\":{\"properties\":{" + + "\"OEEClass\":{\"type\":\"integer\"}," + + "\"mhe_id\":{\"type\":\"long\"}," + + "\"mp_type\":{\"type\":\"integer\"}," + + "\"counter_increment\":{\"type\":\"integer\"}," + + "\"whid\":{\"type\":\"keyword\"}," + + "\"t_start\":{\"type\":\"date\"," + + "\"format\":\"yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis\"}" + + "}}}"); + client().performRequest(createIndex); + + // Bulk index test data + Request bulk = new Request("POST", "/" + index + "/_bulk?refresh=true"); + bulk.setJsonEntity( + "{\"index\":{}}\n" + + "{\"mhe_id\":1,\"mp_type\":1,\"OEEClass\":1,\"counter_increment\":10," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 08:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":1,\"mp_type\":4,\"OEEClass\":1,\"counter_increment\":5," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 09:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":2,\"mp_type\":1,\"OEEClass\":8,\"counter_increment\":20," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 10:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":2,\"mp_type\":1,\"OEEClass\":3,\"counter_increment\":15," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 11:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":null,\"mp_type\":1,\"OEEClass\":1,\"counter_increment\":7," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 12:00:00\"}\n"); + client().performRequest(bulk); + + // The query: CASE WHEN + SUM + GROUP BY with ifnull + JSONObject response = + executeAnsiQuery( + "SELECT COALESCE(\"mhe_id\", -99) AS \"ID\"," + + " SUM(CASE WHEN \"mp_type\" = 4 THEN 0" + + " WHEN \"OEEClass\" = 8 THEN 0" + + " WHEN \"OEEClass\" = 0 THEN 0" + + " WHEN \"OEEClass\" IS NULL THEN 0" + + 
" ELSE \"counter_increment\" END) AS \"Q_G\"" + + " FROM \"" + index + "\"" + + " WHERE \"whid\" = 'BOS3'" + + " GROUP BY 1"); + assertTrue(response.getJSONArray("datarows").length() > 0); + + // Cleanup + client().performRequest(new Request("DELETE", "/" + index)); + } + + @Test + public void testOpenSearchSQLWithCaseWhenAggregation() throws IOException { + String index = "oee-events-2026-02-11"; + // Create index with OEE mapping (exact mapping from user) + Request createIndex = new Request("PUT", "/" + index); + createIndex.setJsonEntity( + "{\"mappings\":{\"properties\":{" + + "\"OEEClass\":{\"type\":\"integer\"}," + + "\"aat\":{\"type\":\"integer\"}," + + "\"alarm_list\":{\"type\":\"keyword\"}," + + "\"area\":{\"type\":\"keyword\"}," + + "\"capacity\":{\"type\":\"integer\"}," + + "\"capacity_design_rate\":{\"type\":\"long\"}," + + "\"capacity_machine_rate\":{\"type\":\"long\"}," + + "\"capacity_target\":{\"type\":\"integer\"}," + + "\"counter_increment\":{\"type\":\"integer\"}," + + "\"description\":{\"type\":\"keyword\"}," + + "\"duration\":{\"type\":\"integer\"}," + + "\"eam_id\":{\"type\":\"keyword\"}," + + "\"em_capacity\":{\"type\":\"float\"}," + + "\"mhe_id\":{\"type\":\"long\"}," + + "\"mhe_type\":{\"type\":\"keyword\"}," + + "\"mp_type\":{\"type\":\"integer\"}," + + "\"status\":{\"type\":\"integer\"}," + + "\"subarea\":{\"type\":\"keyword\"}," + + "\"t_start\":{\"type\":\"date\"," + + "\"format\":\"yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd HH:mm:ss" + + "||strict_date_optional_time||epoch_millis\"}," + + "\"t_stop\":{\"type\":\"date\"," + + "\"format\":\"yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd HH:mm:ss" + + "||strict_date_optional_time||epoch_millis\"}," + + "\"team\":{\"type\":\"integer\"}," + + "\"text_code\":{\"type\":\"keyword\"}," + + "\"whid\":{\"type\":\"keyword\"}" + + "}}}"); + client().performRequest(createIndex); + + // Bulk index test data + Request bulk = new Request("POST", "/" + index + "/_bulk?refresh=true"); + bulk.setJsonEntity( + 
"{\"index\":{}}\n" + + "{\"mhe_id\":1,\"mp_type\":1,\"OEEClass\":1,\"counter_increment\":10," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 08:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":1,\"mp_type\":4,\"OEEClass\":1,\"counter_increment\":5," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 09:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":2,\"mp_type\":1,\"OEEClass\":8,\"counter_increment\":20," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 10:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":2,\"mp_type\":1,\"OEEClass\":3,\"counter_increment\":15," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 11:00:00\"}\n" + + "{\"index\":{}}\n" + + "{\"mhe_id\":null,\"mp_type\":1,\"OEEClass\":1,\"counter_increment\":7," + + "\"whid\":\"BOS3\",\"t_start\":\"2026-02-11 12:00:00\"}\n"); + client().performRequest(bulk); + + // User's exact query pattern via ANSI SQL (Calcite) engine + // Uses COALESCE (ANSI standard) instead of ifnull (MySQL-specific) + // Timestamp BETWEEN omitted: EXPR_TIMESTAMP UDT from OpenSearchTypeFactory is incompatible + // with PlannerImpl's SqlTypeFactoryImpl. Fixing requires making the ANSI SQL planner use + // OpenSearchTypeFactory, or making table metadata return standard Calcite timestamp types. 
+ JSONObject response = + executeAnsiQuery( + "SELECT COALESCE(\"mhe_id\", -99) AS \"ID\"," + + " SUM(CASE WHEN \"mp_type\" = 4 THEN 0" + + " WHEN \"OEEClass\" = 8 THEN 0" + + " WHEN \"OEEClass\" = 0 THEN 0" + + " WHEN \"OEEClass\" IS NULL THEN 0" + + " ELSE \"counter_increment\" END) AS \"Q_G\"" + + " FROM \"" + index + "\"" + + " WHERE \"whid\" = 'BOS3'" + + " GROUP BY 1"); + assertTrue(response.getJSONArray("datarows").length() > 0); + + // Cleanup + client().performRequest(new Request("DELETE", "/" + index)); + } + + @Test + public void testExplainShowsFilterAggregation() throws IOException { + String index = "oee_explain_test"; + Request createIndex = new Request("PUT", "/" + index); + createIndex.setJsonEntity( + "{\"mappings\":{\"properties\":{" + + "\"mhe_id\":{\"type\":\"long\"}," + + "\"mp_type\":{\"type\":\"integer\"}," + + "\"OEEClass\":{\"type\":\"integer\"}," + + "\"counter_increment\":{\"type\":\"integer\"}," + + "\"whid\":{\"type\":\"keyword\"}" + + "}}}"); + client().performRequest(createIndex); + + Request bulk = new Request("POST", "/" + index + "/_bulk?refresh=true"); + bulk.setJsonEntity( + "{\"index\":{}}\n{\"mhe_id\":1,\"mp_type\":1,\"OEEClass\":1," + + "\"counter_increment\":10,\"whid\":\"BOS3\"}\n" + + "{\"index\":{}}\n{\"mhe_id\":1,\"mp_type\":4,\"OEEClass\":1," + + "\"counter_increment\":5,\"whid\":\"BOS3\"}\n"); + client().performRequest(bulk); + + // Full multi-condition query with THEN NULL (matches AggregateCaseToFilterRule A1) + JSONObject response = + executeAnsiQuery( + "EXPLAIN SELECT COALESCE(\"mhe_id\", -99) AS \"ID\"," + + " SUM(CASE WHEN \"mp_type\" = 4 THEN NULL" + + " WHEN \"OEEClass\" = 8 THEN NULL" + + " WHEN \"OEEClass\" = 0 THEN NULL" + + " WHEN \"OEEClass\" IS NULL THEN NULL" + + " ELSE \"counter_increment\" END) AS \"Q_G\"" + + " FROM \"" + index + "\"" + + " WHERE \"whid\" = 'BOS3'" + + " GROUP BY 1"); + + String logical = response.getString("logical"); + assertTrue( + "Expected FILTER in logical plan but got: " + 
logical, + logical.contains("FILTER")); + + String physical = response.getString("physical"); + // Verify native field-based sum with native bool/term filter (not script) + assertTrue( + "Expected native sum on field", + physical.contains("\"sum\":{\"field\":\"counter_increment\"}")); + assertTrue( + "Expected native term query for mp_type in filter", + physical.contains("\"term\":{\"mp_type\"")); + + client().performRequest(new Request("DELETE", "/" + index)); + } + + private JSONObject executeAnsiQuery(String query) throws IOException { + Request request = new Request("POST", "/_plugins/_sql?format=jdbc&mode=ansi"); + request.setJsonEntity("{\"query\": \"" + query.replace("\"", "\\\"") + "\"}"); + Response response = client().performRequest(request); + return new JSONObject(getResponseBody(response)); + } + + private static String getResponseBody(Response response) throws IOException { + return org.opensearch.sql.util.TestUtils.getResponseBody(response, true); + } +} diff --git a/legacy/build.gradle b/legacy/build.gradle index 74653d9cb36..3a490864fa2 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -118,6 +118,7 @@ dependencies { api project(':sql') api project(':common') api project(':opensearch') + api project(':api') // ANTLR gradle plugin and runtime dependency antlr "org.antlr:antlr4:4.13.2" diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java index 9be2367dcaa..86a139aa755 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java @@ -83,10 +83,14 @@ public class RestSqlAction extends BaseRestHandler { /** New SQL query request handler. */ private final RestSQLQueryAction newSqlQueryHandler; + /** Unified query API handler for SQL queries. 
*/ + private final RestUnifiedSQLQueryAction unifiedSqlQueryHandler; + public RestSqlAction(Settings settings, Injector injector) { super(); this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.newSqlQueryHandler = new RestSQLQueryAction(injector); + this.unifiedSqlQueryHandler = new RestUnifiedSQLQueryAction(injector); } @Override @@ -134,7 +138,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli Format format = SqlRequestParam.getFormat(request.params()); - // Route request to new query engine if it's supported already + // Route request to unified query API SQLQueryRequest newSqlRequest = new SQLQueryRequest( sqlRequest.getJsonContent(), @@ -142,26 +146,19 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli request.path(), request.params(), sqlRequest.cursor()); - return newSqlQueryHandler.prepareRequest( - newSqlRequest, - (restChannel, exception) -> { - try { - if (newSqlRequest.isExplainRequest()) { - LOG.info( - "Request is falling back to old SQL engine due to: " + exception.getMessage()); - } - LOG.info( - "[{}] Request {} is not supported and falling back to old SQL engine", - QueryContext.getRequestId(), - newSqlRequest); - LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql())); - QueryAction queryAction = explainRequest(client, sqlRequest, format); - executeSqlRequest(request, queryAction, client, restChannel); - } catch (Exception e) { - handleException(restChannel, e); - } - }, - this::handleException); + + // Route all SQL queries through unified query API (clean cutover) + LOG.info( + "[{}] Routing to unified query API (mode={})", + QueryContext.getRequestId(), + newSqlRequest.isAnsiMode() ? 
"ansi" : "opensearch"); + return channel -> { + try { + unifiedSqlQueryHandler.execute(newSqlRequest, channel, client); + } catch (Exception e) { + handleException(channel, e); + } + }; } catch (Exception e) { return channel -> handleException(channel, e); } @@ -192,7 +189,8 @@ protected Set responseParams() { Set responseParams = new HashSet<>(super.responseParams()); responseParams.addAll( Arrays.asList( - "sql", "flat", "separator", "_score", "_type", "_id", "newLine", "format", "sanitize")); + "sql", "flat", "separator", "_score", "_type", "_id", "newLine", "format", "sanitize", + "mode")); return responseParams; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedSQLQueryAction.java new file mode 100644 index 00000000000..da162bd64b4 --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedSQLQueryAction.java @@ -0,0 +1,205 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.plugin; + +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.sql.validate.SqlConformanceEnum; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.inject.Injector; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.api.UnifiedQueryPlanner; +import org.opensearch.sql.api.compiler.UnifiedQueryCompiler; +import 
org.opensearch.sql.calcite.OpenSearchSchema; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.sql.domain.SQLQueryRequest; +import org.opensearch.transport.client.node.NodeClient; + +/** + * REST handler that routes SQL queries through the unified query API. Without mode=ansi: OpenSearch + * SQL (ANTLR parser, supports match() etc.) With mode=ansi: Calcite-native ANSI SQL (supports + * JOINs, standard SQL) + */ +public class RestUnifiedSQLQueryAction { + + private static final Logger LOG = LogManager.getLogger(RestUnifiedSQLQueryAction.class); + + private final DataSourceService dataSourceService; + + public RestUnifiedSQLQueryAction(Injector injector) { + this.dataSourceService = injector.getInstance(DataSourceService.class); + } + + /** + * Schedule SQL query execution on the sql-worker thread pool to avoid blocking transport threads. + */ + public void execute(SQLQueryRequest request, RestChannel channel, NodeClient client) { + client + .threadPool() + .schedule( + () -> { + try { + String result = executeWithUnifiedAPI(request); + channel.sendResponse( + new BytesRestResponse(OK, "application/json; charset=UTF-8", result)); + } catch (Exception e) { + LOG.error("Failed to execute unified SQL query", e); + channel.sendResponse( + new BytesRestResponse( + org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR, + "application/json; charset=UTF-8", + formatError(e))); + } + }, + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + private String executeWithUnifiedAPI(SQLQueryRequest request) throws Exception { + String catalogName = OpenSearchSchema.OPEN_SEARCH_SCHEMA_NAME; + var builder = + UnifiedQueryContext.builder() + .language(QueryType.SQL) + .catalog(catalogName, new OpenSearchSchema(dataSourceService)) + .defaultNamespace(catalogName); + + // PoC: ANSI SQL (Calcite) is default; mode=opensearch falls back to OpenSearch SQL (ANTLR) + if (!request.isOpenSearchMode()) { + 
builder.conformance(SqlConformanceEnum.LENIENT); + } + + try (UnifiedQueryContext context = builder.build()) { + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + UnifiedQueryCompiler compiler = new UnifiedQueryCompiler(context); + + String query = request.getQuery().trim(); + // Handle EXPLAIN: strip prefix, plan, show logical + optimized physical plan + if (query.regionMatches(true, 0, "EXPLAIN ", 0, 8)) { + String innerQuery = query.substring(query.toUpperCase().indexOf("SELECT")); + RelNode plan = planner.plan(innerQuery); + String logical = org.apache.calcite.plan.RelOptUtil.toString( + plan, org.apache.calcite.sql.SqlExplainLevel.EXPPLAN_ATTRIBUTES); + + // Capture optimized physical plan via Hook during compilation + java.util.concurrent.atomic.AtomicReference physical = + new java.util.concurrent.atomic.AtomicReference<>(); + try (org.apache.calcite.runtime.Hook.Closeable ignored = + org.apache.calcite.runtime.Hook.PLAN_BEFORE_IMPLEMENTATION.addThread(obj -> { + org.apache.calcite.rel.RelRoot relRoot = (org.apache.calcite.rel.RelRoot) obj; + physical.set(org.apache.calcite.plan.RelOptUtil.toString( + relRoot.rel, org.apache.calcite.sql.SqlExplainLevel.EXPPLAN_ATTRIBUTES)); + })) { + try (PreparedStatement stmt = compiler.compile(plan)) { + // triggers optimization pipeline and the hook + } + } + + org.json.JSONObject result = new org.json.JSONObject(); + result.put("logical", logical); + if (physical.get() != null) { + result.put("physical", physical.get()); + } + return result.toString(2); + } + + RelNode plan = planner.plan(query); + try (PreparedStatement statement = compiler.compile(plan)) { + ResultSet rs = statement.executeQuery(); + return formatAsJdbc(rs); + } + } + } + + private String formatAsJdbc(ResultSet rs) throws Exception { + ResultSetMetaData meta = rs.getMetaData(); + int columnCount = meta.getColumnCount(); + + StringBuilder json = new StringBuilder(); + json.append("{\"schema\":["); + for (int i = 1; i <= columnCount; 
i++) { + if (i > 1) json.append(","); + json.append("{\"name\":\"") + .append(escape(meta.getColumnLabel(i))) + .append("\",\"type\":\"") + .append(jdbcTypeToString(meta.getColumnType(i))) + .append("\"}"); + } + json.append("],"); + + json.append("\"datarows\":["); + List rows = new ArrayList<>(); + while (rs.next()) { + StringBuilder row = new StringBuilder("["); + for (int i = 1; i <= columnCount; i++) { + if (i > 1) row.append(","); + Object val = rs.getObject(i); + if (val == null) { + row.append("null"); + } else if (val instanceof Number) { + row.append(val); + } else if (val instanceof Boolean) { + row.append(val); + } else { + row.append("\"").append(escape(val.toString())).append("\""); + } + } + row.append("]"); + rows.add(row.toString()); + } + json.append(String.join(",", rows)); + json.append("],"); + + json.append("\"total\":") + .append(rows.size()) + .append(",\"size\":") + .append(rows.size()) + .append(",\"status\":200}"); + + return json.toString(); + } + + private static String formatError(Exception e) { + String reason = e.getMessage() != null ? 
escape(e.getMessage()) : "Unknown error"; + return "{\"error\":{\"type\":\"" + + e.getClass().getSimpleName() + + "\",\"reason\":\"" + + reason + + "\"},\"status\":500}"; + } + + private static String escape(String s) { + return s.replace("\\", "\\\\").replace("\"", "\\\""); + } + + private static String jdbcTypeToString(int type) { + return switch (type) { + case java.sql.Types.VARCHAR, java.sql.Types.CHAR, java.sql.Types.LONGVARCHAR -> "keyword"; + case java.sql.Types.INTEGER -> "integer"; + case java.sql.Types.BIGINT -> "long"; + case java.sql.Types.SMALLINT -> "short"; + case java.sql.Types.TINYINT -> "byte"; + case java.sql.Types.FLOAT, java.sql.Types.REAL -> "float"; + case java.sql.Types.DOUBLE -> "double"; + case java.sql.Types.BOOLEAN -> "boolean"; + case java.sql.Types.DATE -> "date"; + case java.sql.Types.TIME -> "time"; + case java.sql.Types.TIMESTAMP -> "timestamp"; + default -> "keyword"; + }; + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 7d8cb8826cd..c82ee218914 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -476,7 +476,7 @@ private static Pair createRegularAggregation( new SingleValueParser(aggName)); // 1. Only case SUM, skip SUM0 / COUNT since calling avg() in DSL should be faster. // 2. To align with databases, SUM0 is not preferred now. 
- case SUM -> + case SUM, SUM0 -> Pair.of( helper.build(args.getFirst().getKey(), AggregationBuilders.sum(aggName)), new SingleValueParser(aggName)); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 53571f0a7a2..e7841109451 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -419,16 +419,11 @@ private QueryExpression visitRelevanceFunc(RexCall call) { } if (SINGLE_FIELD_RELEVANCE_FUNCTION_SET.contains(funcName)) { - List fieldQueryOperands = - visitList( - List.of( - AliasPair.from(ops.get(0), funcName).value, - AliasPair.from(ops.get(1), funcName).value)); + List fieldQueryOperands = visitList(List.of(ops.get(0), ops.get(1))); NamedFieldExpression namedFieldExpression = (NamedFieldExpression) fieldQueryOperands.get(0); String queryLiteralOperand = ((LiteralExpression) fieldQueryOperands.get(1)).stringValue(); - Map optionalArguments = - parseRelevanceFunctionOptionalArguments(ops, funcName); + Map optionalArguments = Map.of(); return SINGLE_FIELD_RELEVANCE_FUNCTION_HANDLERS .get(funcName) @@ -1233,6 +1228,11 @@ public void updateAnalyzedNodes(RexNode rexNode) { public QueryExpression not() { return new CompoundQueryExpression(partial, boolQuery().mustNot(builder())); } + + @Override + QueryExpression isNotTrue() { + return not(); + } } /** Usually basic expression of type {@code a = 'val'} or {@code b > 42}. 
*/ diff --git a/plugin/build.gradle b/plugin/build.gradle index 340787fa01f..0ee91af463e 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -166,6 +166,7 @@ dependencies { api project(':datasources') api project(':async-query') api project(':direct-query') + api project(':api') testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.15.11' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}" diff --git a/poc-unified-sql-support.md b/poc-unified-sql-support.md new file mode 100644 index 00000000000..d13e67aebc9 --- /dev/null +++ b/poc-unified-sql-support.md @@ -0,0 +1,210 @@ +# PoC: SQL and ANSI SQL Support in Unified Query API + +## Objective + +Add two new language types to the unified query API alongside the existing PPL support: + +- **`SQL`** — OpenSearch SQL dialect. Uses our ANTLR-based SQL parser to produce the shared AST (`UnresolvedPlan`), then follows the same CalciteRelNodeVisitor path as PPL to produce a Calcite `RelNode`. +- **`ANSI_SQL`** — Standard ANSI SQL. Bypasses our AST layer entirely and uses Calcite's built-in `SqlParser` → `SqlNode` → `SqlToRelConverter` → `RelNode` pipeline. + +All three language paths converge at `RelNode`, so `UnifiedQueryCompiler` and `UnifiedQueryTranspiler` work unchanged downstream. + +--- + +## Architecture: Three Pipelines + +``` +PPL path: + query → PPLSyntaxParser → ANTLR ParseTree → PPL AstBuilder → UnresolvedPlan AST + → CalciteRelNodeVisitor → RelNode + +SQL path: + query → SQLSyntaxParser → ANTLR ParseTree → SQL AstBuilder → UnresolvedPlan AST + → CalciteRelNodeVisitor → RelNode + +ANSI_SQL path: + query → Calcite SqlParser → SqlNode → SqlValidator → SqlToRelConverter → RelNode +``` + +The SQL path reuses the same visitor-based analysis as PPL because both produce the same shared AST node types (`Project`, `Filter`, `Relation`, `Sort`, `Aggregation`, etc.). 
The ANSI_SQL path is completely independent — it delegates everything to Calcite's own SQL infrastructure. + +--- + +## Current State (PPL Only) + +The unified query API currently lives in the `api/` module. The flow is: + +1. `UnifiedQueryContext` — Central config: catalog schemas, query type, settings, Calcite `FrameworkConfig`. +2. `UnifiedQueryPlanner` — Parses query string → AST → `CalciteRelNodeVisitor` → `RelNode`. +3. `UnifiedQueryCompiler` — Compiles `RelNode` → JDBC `PreparedStatement`. +4. `UnifiedQueryTranspiler` — Converts `RelNode` back to SQL string via `RelToSqlConverter`. + +The bottleneck is in `UnifiedQueryPlanner`, which has two PPL-specific hardcodings: +- `buildQueryParser()` only returns `PPLSyntaxParser` +- `parse()` uses PPL-specific `AstBuilder` and `AstStatementBuilder` + +Everything downstream is already language-agnostic. + +### Key Enablers + +- The SQL module (`sql/`) already has the same parser chain as PPL: + - `SQLSyntaxParser` implements the same `org.opensearch.sql.common.antlr.Parser` interface + - SQL's `AstBuilder` produces the same `UnresolvedPlan` AST nodes + - SQL's `AstStatementBuilder` wraps plans in `Query` statements +- `QueryType.SQL` already exists in the enum (just needs `ANSI_SQL` added) +- `CalciteRelNodeVisitor` already handles most AST nodes that SQL produces +- `UnifiedQueryContext.buildFrameworkConfig()` already sets `SqlParser.Config.DEFAULT` and registers catalog schemas — this is exactly what Calcite's `Planner` API needs +- `CalciteToolsHelper` already has `OpenSearchSqlToRelConverter` wired up for Calcite's internal SQL→Rel conversion + +--- + +## File-by-File Changes + +### 1. `core/src/main/java/org/opensearch/sql/executor/QueryType.java` + +Add `ANSI_SQL`: + +```java +public enum QueryType { + PPL, + SQL, + ANSI_SQL +} +``` + +### 2. `api/build.gradle` + +Add `sql` module dependency. Currently only depends on `ppl`: + +```groovy +dependencies { + api project(':ppl') + api project(':sql') // NEW + // ... 
rest unchanged +} +``` + +The `ppl` module transitively brings in `core` and `common`. The `sql` module depends on `core` and `common` as well, so no diamond dependency issues. + +### 3. `api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java` + +This is the main change. Three things to modify: + +#### 3a. Refactor `plan()` to branch on query type + +```java +public RelNode plan(String query) { + try { + return switch (context.getPlanContext().queryType) { + case PPL, SQL -> preserveCollation(analyze(parse(query))); + case ANSI_SQL -> planWithCalcite(query); + }; + } catch (SyntaxCheckException e) { + throw e; + } catch (Exception e) { + throw new IllegalStateException("Failed to plan query", e); + } +} +``` + +#### 3b. Refactor `buildQueryParser()` for SQL + +```java +private Parser buildQueryParser(QueryType queryType) { + return switch (queryType) { + case PPL -> new PPLSyntaxParser(); + case SQL -> new SQLSyntaxParser(); + case ANSI_SQL -> null; // not used — Calcite handles parsing directly + }; +} +``` + +#### 3c. Refactor `parse()` to dispatch to correct AST builders + +The current `parse()` hardcodes PPL's `AstBuilder` and `AstStatementBuilder`. 
It needs to branch because: +- PPL's `AstBuilder` constructor: `AstBuilder(String query, Settings settings)` +- SQL's `AstBuilder` constructor: `AstBuilder(String query)` (no Settings param) +- They extend different ANTLR base visitors (`OpenSearchPPLParserBaseVisitor` vs `OpenSearchSQLParserBaseVisitor`) +- Their `AstStatementBuilder` classes have different `StatementBuilderContext` fields + +```java +private UnresolvedPlan parse(String query) { + ParseTree cst = parser.parse(query); + Statement statement = switch (context.getPlanContext().queryType) { + case PPL -> { + var astBuilder = new org.opensearch.sql.ppl.parser.AstBuilder( + query, context.getSettings()); + var stmtBuilder = new org.opensearch.sql.ppl.parser.AstStatementBuilder( + astBuilder, + org.opensearch.sql.ppl.parser.AstStatementBuilder + .StatementBuilderContext.builder().build()); + yield cst.accept(stmtBuilder); + } + case SQL -> { + var astBuilder = new org.opensearch.sql.sql.parser.AstBuilder(query); + var stmtBuilder = new org.opensearch.sql.sql.parser.AstStatementBuilder( + astBuilder, + org.opensearch.sql.sql.parser.AstStatementBuilder + .StatementBuilderContext.builder().build()); + yield cst.accept(stmtBuilder); + } + default -> throw new IllegalArgumentException( + "Unsupported query type for AST parsing: " + + context.getPlanContext().queryType); + }; + + if (statement instanceof Query) { + return ((Query) statement).getPlan(); + } + throw new UnsupportedOperationException( + "Only query statements are supported but got " + + statement.getClass().getSimpleName()); +} +``` + +#### 3d. 
New method for ANSI_SQL path + +Uses Calcite's `Frameworks.getPlanner()` API with the same `FrameworkConfig` that the AST-based paths use (same schemas, same catalog registrations): + +```java +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelRoot; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; + +private RelNode planWithCalcite(String query) { + try { + Planner planner = Frameworks.getPlanner(context.getPlanContext().config); + SqlNode parsed = planner.parse(query); + SqlNode validated = planner.validate(parsed); + RelRoot relRoot = planner.rel(validated); + planner.close(); + return relRoot.rel; + } catch (Exception e) { + throw new IllegalStateException("Failed to plan ANSI SQL query", e); + } +} +``` + +**Why this works**: `UnifiedQueryContext.buildFrameworkConfig()` already constructs a `FrameworkConfig` with: +- All registered catalog schemas via `rootSchema.add(name, schema)` +- Default schema resolution via `defaultSchema` +- `SqlParser.Config.DEFAULT` as the parser config +- Calcite programs and trait definitions + +The `Planner` created from this config will resolve table names against the same schema hierarchy. + +#### 3e. Constructor adjustment + +The constructor currently eagerly creates a `Parser`. For `ANSI_SQL`, the parser is null since Calcite handles parsing. Guard accordingly: + +```java +public UnifiedQueryPlanner(UnifiedQueryContext context) { + this.parser = buildQueryParser(context.getPlanContext().queryType); + this.context = context; +} +``` + +This is fine as-is — `parser` will be null for `ANSI_SQL`, and `parse()` is never called for that path since `plan()` dispatches to `planWithCalcite()` instead. 
+ diff --git a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java index df456d4d780..2697f14247f 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java +++ b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java @@ -31,6 +31,7 @@ public class SQLQueryRequest { private static final String QUERY_PARAMS_FORMAT = "format"; private static final String QUERY_PARAMS_SANITIZE = "sanitize"; private static final String QUERY_PARAMS_PRETTY = "pretty"; + private static final String QUERY_PARAMS_MODE = "mode"; /** JSON payload in REST request. */ private final JSONObject jsonContent; @@ -55,6 +56,10 @@ public class SQLQueryRequest { @Accessors(fluent = true) private boolean pretty = false; + @Getter + @Accessors(fluent = true) + private String mode; + private String cursor; /** Constructor of SQLQueryRequest that passes request params. */ @@ -71,6 +76,7 @@ public SQLQueryRequest( this.format = getFormat(params); this.sanitize = shouldSanitize(params); this.pretty = shouldPretty(params); + this.mode = params.getOrDefault(QUERY_PARAMS_MODE, null); this.cursor = cursor; } @@ -90,7 +96,7 @@ public boolean isSupported() { boolean hasQuery = query != null; boolean hasContent = jsonContent != null && !jsonContent.isEmpty(); - Predicate supportedParams = Set.of(QUERY_PARAMS_FORMAT, QUERY_PARAMS_PRETTY)::contains; + Predicate supportedParams = Set.of(QUERY_PARAMS_FORMAT, QUERY_PARAMS_PRETTY, QUERY_PARAMS_MODE)::contains; boolean hasUnsupportedParams = (!params.isEmpty()) && params.keySet().stream().dropWhile(supportedParams).findAny().isPresent(); @@ -170,4 +176,16 @@ private boolean shouldPretty(Map params) { } return false; } + + public boolean isAnsiMode() { + return "ansi".equalsIgnoreCase(mode); + } + + public boolean isOpenSearchMode() { + return "opensearch".equalsIgnoreCase(mode); + } + + public String getMode() { + return mode; + } }