diff --git a/api/build.gradle b/api/build.gradle index fb4cafe79d8..2f0a378e2a9 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -19,6 +19,7 @@ dependencies { testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: "${hamcrest_version}" testImplementation group: 'org.mockito', name: 'mockito-core', version: "${mockito_version}" testImplementation group: 'org.apache.calcite', name: 'calcite-testkit', version: '1.41.0' + testCompileOnly 'org.immutables:value-annotations:2.8.8' testFixturesApi group: 'junit', name: 'junit', version: '4.13.2' testFixturesApi group: 'org.hamcrest', name: 'hamcrest', version: "${hamcrest_version}" diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java index 91e35335e20..b9cde23b356 100644 --- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java +++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java @@ -5,15 +5,28 @@ package org.opensearch.sql.api; +import java.util.List; import org.antlr.v4.runtime.tree.ParseTree; +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.config.Lex; +import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Planner; +import org.apache.calcite.tools.Program; +import org.apache.calcite.tools.Programs; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.CalciteRelNodeVisitor; import org.opensearch.sql.common.antlr.Parser; import org.opensearch.sql.common.antlr.SyntaxCheckException; @@ -44,7 +57,10 @@ public class UnifiedQueryPlanner { * @param context the unified query context containing CalcitePlanContext */ public UnifiedQueryPlanner(UnifiedQueryContext context) { - this.parser = buildQueryParser(context.getPlanContext().queryType); + this.parser = + context.getPlanContext().queryType == QueryType.SQL + ? null + : buildQueryParser(context.getPlanContext().queryType); this.context = context; } @@ -52,11 +68,14 @@ public UnifiedQueryPlanner(UnifiedQueryContext context) { * Parses and analyzes a query string into a Calcite logical plan (RelNode). TODO: Generate * optimal physical plan to fully unify query execution and leverage Calcite's optimizer. * - * @param query the raw query string in PPL or other supported syntax + * @param query the raw query string in PPL, SQL, or other supported syntax * @return a logical plan representing the query */ public RelNode plan(String query) { try { + if (context.getPlanContext().queryType == QueryType.SQL) { + return planWithCalcite(query); + } return preserveCollation(analyze(parse(query))); } catch (SyntaxCheckException e) { // Re-throw syntax error without wrapping @@ -66,6 +85,42 @@ public RelNode plan(String query) { } } + /** + * Optimizes a logical plan using the VolcanoPlanner with rules registered by the schema's table + * scan nodes. 
Adapter-specific pushdown rules (filter, project, aggregate) are applied here.
+   *
+   * @param logical the logical plan from {@link #plan(String)}
+   * @return an optimized plan with adapter-specific physical operators
+   */
+  public RelNode optimize(RelNode logical) {
+    try {
+      RelTraitSet targetTraits = logical.getCluster().traitSetOf(EnumerableConvention.INSTANCE);
+      Program program = Programs.standard();
+      // Run the standard rule set on the cluster's VolcanoPlanner
+      return program.run(
+          logical.getCluster().getPlanner(), logical, targetTraits, List.of(), List.of());
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to optimize plan", e);
+    }
+  }
+
+  private RelNode planWithCalcite(String query) throws Exception {
+    CalcitePlanContext planContext = context.getPlanContext();
+    FrameworkConfig sqlConfig =
+        Frameworks.newConfigBuilder(planContext.config)
+            .parserConfig(SqlParser.config().withLex(Lex.JAVA))
+            .build();
+    Planner planner = Frameworks.getPlanner(sqlConfig);
+    try {
+      SqlNode parsed = planner.parse(query);
+      SqlNode validated = planner.validate(parsed);
+      RelRoot relRoot = planner.rel(validated);
+      return relRoot.rel;
+    } finally {
+      planner.close();
+    }
+  }
+
   private Parser buildQueryParser(QueryType queryType) {
     if (queryType == QueryType.PPL) {
       return new PPLSyntaxParser();
diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
index a3ad73f700a..e5d4362b42c 100644
--- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
+++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
@@ -63,14 +63,14 @@ public void testMissingQueryType() {
     UnifiedQueryContext.builder().catalog("opensearch", testSchema).build();
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testUnsupportedQueryType() {
+  @Test
+  public void testSqlQueryType() {
     UnifiedQueryContext context =
         UnifiedQueryContext.builder()
-            .language(QueryType.SQL) // only PPL is supported for now
+            .language(QueryType.SQL)
             .catalog("opensearch", testSchema)
             .build();
-    new UnifiedQueryPlanner(context);
+    assertNotNull("SQL planner should be created", new UnifiedQueryPlanner(context));
   }
 
   @Test(expected = IllegalArgumentException.class)
diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerOptimizeTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerOptimizeTest.java
new file mode 100644
index 00000000000..bebc03248f3
--- /dev/null
+++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerOptimizeTest.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.api;
+
+import static org.apache.calcite.sql.type.SqlTypeName.INTEGER;
+import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
+import static org.apache.calcite.test.Matchers.hasTree;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.Blocks;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalFilter; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.executor.QueryType; + +/** + * Demonstrates that {@link UnifiedQueryPlanner#optimize(RelNode)} triggers the VolcanoPlanner with + * adapter-specific pushdown rules. Simulates a real Analytics engine that provides a custom + * TableScan with a filter absorption rule. + */ +public class UnifiedQueryPlannerOptimizeTest { + + private UnifiedQueryContext context; + private UnifiedQueryPlanner planner; + + @Before + public void setUp() { + AbstractSchema testSchema = + new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of("test_table", new EngineTable()); + } + }; + context = + UnifiedQueryContext.builder() + .language(QueryType.SQL) + .catalog("catalog", testSchema) + .build(); + planner = new UnifiedQueryPlanner(context); + } + + @After + public void tearDown() throws Exception { + if (context != null) { + context.close(); + } + } + + @Test + public void optimizePushesFilterIntoEngineScan() { + RelNode logical = planner.plan("SELECT name FROM catalog.test_table WHERE id > 10"); + + // Before: EngineTableScan (from TranslatableTable.toRel) with LogicalFilter on top + assertThat( + logical, + hasTree( + """ + LogicalProject(name=[$1]) + LogicalFilter(condition=[>($0, 10)]) + EngineTableScan(table=[[catalog, test_table]]) + """)); + + // After: filter absorbed into EngineTableScan by the pushdown rule + RelNode optimized = planner.optimize(logical); + assertThat( + optimized, + hasTree( + """ + EnumerableCalc(expr#0..1=[{inputs}], name=[$t1]) + EngineTableScan(table=[[catalog, test_table]], filter=[>($0, 10)]) + """)); + } + + // --- Simulated Analytics engine adapter --- + + /** Table that produces EngineTableScan via toRel(). */ + static class EngineTable extends AbstractTable + implements org.apache.calcite.schema.TranslatableTable { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory.builder().add("id", INTEGER).add("name", VARCHAR).build(); + } + + @Override + public RelNode toRel(RelOptTable.ToRelContext ctx, RelOptTable table) { + return new EngineTableScan(ctx.getCluster(), table, null); + } + } + + /** Engine-specific scan node with filter absorption. 
*/ + static class EngineTableScan extends TableScan implements EnumerableRel { + private final RexNode filter; + + EngineTableScan(RelOptCluster cluster, RelOptTable table, RexNode filter) { + super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), List.of(), table); + this.filter = filter; + } + + @Override + public void register(RelOptPlanner planner) { + planner.addRule(EngineFilterAbsorptionRule.INSTANCE); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new EngineTableScan(getCluster(), getTable(), filter); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw).itemIf("filter", filter, filter != null); + } + + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = + PhysTypeImpl.of(implementor.getTypeFactory(), getRowType(), pref.preferArray()); + return implementor.result( + physType, Blocks.toBlock(Expressions.call(Expressions.constant(List.of()), "iterator"))); + } + } + + /** Absorbs LogicalFilter into EngineTableScan. */ + static class EngineFilterAbsorptionRule extends RelOptRule { + static final EngineFilterAbsorptionRule INSTANCE = new EngineFilterAbsorptionRule(); + + EngineFilterAbsorptionRule() { + super(operand(LogicalFilter.class, operand(EngineTableScan.class, none()))); + } + + @Override + public void onMatch(RelOptRuleCall call) { + LogicalFilter filter = call.rel(0); + EngineTableScan scan = call.rel(1); + call.transformTo( + new EngineTableScan(scan.getCluster(), scan.getTable(), filter.getCondition())); + } + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerPplTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerPplTest.java new file mode 100644 index 00000000000..960b79c54f7 --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerPplTest.java @@ -0,0 +1,174 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static org.apache.calcite.sql.type.SqlTypeName.DATE; +import static org.apache.calcite.sql.type.SqlTypeName.INTEGER; +import static org.apache.calcite.sql.type.SqlTypeName.TIME; +import static org.apache.calcite.sql.type.SqlTypeName.TIMESTAMP; +import static org.apache.calcite.sql.type.SqlTypeName.VARBINARY; +import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.calcite.type.AbstractExprRelDataType; +import org.opensearch.sql.executor.QueryType; + +/** + * Phase 2 PoC: Validates PPL RelNode generation against a schema with standard Calcite types. + * + *

Key Area 1 — UDT interference. PPL V3 defines 5 custom UDTs (EXPR_DATE, EXPR_TIME, + * EXPR_TIMESTAMP, EXPR_BINARY, EXPR_IP) backed by VARCHAR. This test validates that when the schema + * uses standard Calcite types (DATE, TIME, TIMESTAMP, VARBINARY, VARCHAR), the RelNode output + * preserves those standard types without UDT contamination. + */ +public class UnifiedQueryPlannerPplTest extends UnifiedQueryTestBase { + + @Override + @Before + public void setUp() { + testSchema = + new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of("parquet_index", createParquetIndexTable()); + } + }; + + context = + UnifiedQueryContext.builder() + .language(QueryType.PPL) + .catalog(DEFAULT_CATALOG, testSchema) + .build(); + planner = new UnifiedQueryPlanner(context); + } + + /** Schema covers all 5 UDT-corresponding standard Calcite types. */ + private Table createParquetIndexTable() { + return SimpleTable.builder() + .col("ts", TIMESTAMP) + .col("dt", DATE) + .col("tm", TIME) + .col("data", VARBINARY) + .col("ip_addr", VARCHAR) + .col("status", INTEGER) + .col("message", VARCHAR) + .build(); + } + + @Test + public void testFilterAndProject() { + RelNode plan = + planner.plan( + "source = catalog.parquet_index | where status = 200 | fields ts, status, message"); + assertNotNull(plan); + } + + @Test + public void testAggregate() { + RelNode plan = planner.plan("source = catalog.parquet_index | stats count() by status"); + assertNotNull(plan); + } + + @Test + public void testSortAndLimit() { + RelNode plan = planner.plan("source = catalog.parquet_index | sort ts | head 100"); + assertNotNull(plan); + } + + // --- Key Area 1: Verify standard types are preserved, no UDT contamination --- + + @Test + public void testTimestampColumnPreservesStandardType() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields ts"); + assertStandardType(plan, "ts", SqlTypeName.TIMESTAMP); + } + + @Test + public void testDateColumnPreservesStandardType() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields dt"); + assertStandardType(plan, "dt", SqlTypeName.DATE); + } + + @Test + public void testTimeColumnPreservesStandardType() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields tm"); + assertStandardType(plan, "tm", SqlTypeName.TIME); + } + + @Test + public void testVarbinaryColumnPreservesStandardType() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields data"); + assertStandardType(plan, "data", SqlTypeName.VARBINARY); + } + + @Test + public void testVarcharColumnPreservesStandardType() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields ip_addr"); + assertStandardType(plan, "ip_addr", SqlTypeName.VARCHAR); + } + + @Test + public void testHourFunctionReturnsInteger() { + RelNode plan = planner.plan("source = catalog.parquet_index | eval hour = hour(ts)"); + assertStandardType(plan, "hour", SqlTypeName.INTEGER); + } + + @Test + public void testDayFunctionReturnsInteger() { + RelNode plan = planner.plan("source = catalog.parquet_index | eval day = day(dt)"); + assertStandardType(plan, "day", SqlTypeName.INTEGER); + } + + @Test + public void testTimestampFilter() { + RelNode plan = planner.plan("source = catalog.parquet_index | where ts > '2024-01-01'"); + assertStandardType(plan, "ts", SqlTypeName.TIMESTAMP); + } + + /** + * Asserts the named field in the RelNode output has the expected standard SqlTypeName and is NOT + * a UDT. 
+ */ + private void assertStandardType(RelNode plan, String fieldName, SqlTypeName expectedType) { + assertNotNull("Plan should not be null", plan); + RelDataTypeField field = findField(plan.getRowType(), fieldName); + assertNotNull("Field '" + fieldName + "' should exist in output", field); + RelDataType type = field.getType(); + assertFalse( + "Field '" + + fieldName + + "' should NOT be a UDT (got " + + type.getClass().getSimpleName() + + ")", + type instanceof AbstractExprRelDataType); + assertEquals( + "Field '" + fieldName + "' should have standard type " + expectedType, + expectedType, + type.getSqlTypeName()); + } + + private RelDataTypeField findField(RelDataType rowType, String name) { + List fields = rowType.getFieldList(); + for (RelDataTypeField f : fields) { + if (f.getName().equalsIgnoreCase(name)) { + return f; + } + } + return null; + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerSqlTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerSqlTest.java new file mode 100644 index 00000000000..ddaf60019ea --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryPlannerSqlTest.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.api; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; + +import org.apache.calcite.rel.RelNode; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.executor.QueryType; + +public class UnifiedQueryPlannerSqlTest extends UnifiedQueryTestBase { + + @Override + @Before + public void setUp() { + testSchema = + new org.apache.calcite.schema.impl.AbstractSchema() { + @Override + protected java.util.Map getTableMap() { + return java.util.Map.of("employees", createEmployeesTable()); + } + }; + + context = + UnifiedQueryContext.builder() + .language(QueryType.SQL) + .catalog(DEFAULT_CATALOG, testSchema) + .build(); + planner = new UnifiedQueryPlanner(context); + } + + @Test + public void testSelectWithFilter() { + RelNode plan = planner.plan("SELECT id, name FROM catalog.employees WHERE age > 30"); + assertNotNull("Plan should be created for SELECT with filter", plan); + } + + @Test + public void testAggregate() { + RelNode plan = + planner.plan("SELECT department, count(*) FROM catalog.employees GROUP BY department"); + assertNotNull("Plan should be created for aggregate query", plan); + } + + @Test + public void testSortWithLimit() { + RelNode plan = planner.plan("SELECT * FROM catalog.employees ORDER BY age LIMIT 10"); + assertNotNull("Plan should be created for sort with limit", plan); + } + + @Test + public void testCaseToFilterRewrite() { + RelNode plan = + planner.plan( + "SELECT department, SUM(CASE WHEN age > 30 THEN 1 ELSE 0 END)" + + " FROM catalog.employees GROUP BY department"); + assertNotNull("Plan should be created with CASE-to-FILTER rewrite", plan); + } + + @Test + public void testInvalidSqlSyntax() { + assertThrows(IllegalStateException.class, () -> planner.plan("SELEC invalid syntax")); + } +} diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryRelNodeDemoTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryRelNodeDemoTest.java new file mode 100644 index 00000000000..4b882512369 --- /dev/null +++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryRelNodeDemoTest.java @@ -0,0 +1,266 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package 
org.opensearch.sql.api; + +import static org.apache.calcite.sql.type.SqlTypeName.*; +import static org.junit.Assert.*; + +import java.util.Map; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.externalize.RelJsonWriter; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.junit.Before; +import org.junit.Test; +import org.opensearch.sql.executor.QueryType; + +/** + * Demonstrates the RelNode object tree via Calcite's RelJsonWriter JSON dump. Each test asserts the + * full JSON to clearly show what's in the RelNode for PoC demo purposes. + */ +public class UnifiedQueryRelNodeDemoTest extends UnifiedQueryTestBase { + + @Override + @Before + public void setUp() { + testSchema = + new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "parquet_index", + SimpleTable.builder() + .col("ts", TIMESTAMP) + .col("dt", DATE) + .col("tm", TIME) + .col("data", VARBINARY) + .col("ip_addr", VARCHAR) + .col("status", INTEGER) + .col("message", VARCHAR) + .build()); + } + }; + context = + UnifiedQueryContext.builder() + .language(QueryType.PPL) + .catalog(DEFAULT_CATALOG, testSchema) + .build(); + planner = new UnifiedQueryPlanner(context); + } + + /** + * LogicalTableScan is a standard Calcite node referencing ["catalog", "parquet_index"]. No custom + * OpenSearch scan node — the Analytics engine will replace this with its own physical scan. + */ + @Test + public void demoTableScanIsStandardCalcite() { + RelNode plan = planner.plan("source = catalog.parquet_index | fields status"); + assertEquals( + """ + { + "rels": [ + { + "id": "0", + "relOp": "LogicalTableScan", + "table": [ + "catalog", + "parquet_index" + ], + "inputs": [] + }, + { + "id": "1", + "relOp": "LogicalProject", + "fields": [ + "status" + ], + "exprs": [ + { + "input": 5, + "name": "$5" + } + ] + } + ] + }\ + """, + toRelJson(plan)); + } + + /** + * HOUR is registered as a PPL UDF (UserDefinedFunctionBuilder) but produces standard Calcite + * INTEGER type — no UDT. The Analytics engine sees a standard function call it can evaluate. + */ + @Test + public void demoDatetimeFunctionProducesStandardTypes() { + RelNode plan = + planner.plan("source = catalog.parquet_index | eval hour = hour(ts) | fields hour, status"); + assertEquals( + """ + { + "rels": [ + { + "id": "0", + "relOp": "LogicalTableScan", + "table": [ + "catalog", + "parquet_index" + ], + "inputs": [] + }, + { + "id": "1", + "relOp": "LogicalProject", + "fields": [ + "hour", + "status" + ], + "exprs": [ + { + "op": { + "name": "HOUR", + "kind": "OTHER_FUNCTION", + "syntax": "FUNCTION" + }, + "operands": [ + { + "input": 0, + "name": "$0" + } + ], + "class": "org.opensearch.sql.expression.function.UserDefinedFunctionBuilder$1", + "type": { + "type": "INTEGER", + "nullable": true + }, + "deterministic": true, + "dynamic": false + }, + { + "input": 5, + "name": "$5" + } + ] + } + ] + }\ + """, + toRelJson(plan)); + } + + /** + * match() is a PPL full-text search UDF with MAP-typed arguments (field name and query string). + * The Analytics engine must handle or reject this — it's not a standard Calcite/SQL function. 
+ */ + @Test + public void demoMatchFunctionIsPplUdf() { + RelNode plan = + planner.plan( + "source = catalog.parquet_index | where match(message, 'error') | fields message"); + assertEquals( + """ + { + "rels": [ + { + "id": "0", + "relOp": "LogicalTableScan", + "table": [ + "catalog", + "parquet_index" + ], + "inputs": [] + }, + { + "id": "1", + "relOp": "LogicalFilter", + "condition": { + "op": { + "name": "match", + "kind": "OTHER_FUNCTION", + "syntax": "FUNCTION" + }, + "operands": [ + { + "op": { + "name": "MAP", + "kind": "MAP_VALUE_CONSTRUCTOR", + "syntax": "SPECIAL" + }, + "operands": [ + { + "literal": "field", + "type": { + "type": "CHAR", + "nullable": false, + "precision": 5 + } + }, + { + "input": 6, + "name": "$6" + } + ] + }, + { + "op": { + "name": "MAP", + "kind": "MAP_VALUE_CONSTRUCTOR", + "syntax": "SPECIAL" + }, + "operands": [ + { + "literal": "query", + "type": { + "type": "CHAR", + "nullable": false, + "precision": 5 + } + }, + { + "literal": "error", + "type": { + "type": "VARCHAR", + "nullable": false, + "precision": -1 + } + } + ] + } + ], + "class": "org.opensearch.sql.expression.function.UserDefinedFunctionBuilder$1", + "type": { + "type": "BOOLEAN", + "nullable": false + }, + "deterministic": true, + "dynamic": false + } + }, + { + "id": "2", + "relOp": "LogicalProject", + "fields": [ + "message" + ], + "exprs": [ + { + "input": 6, + "name": "$6" + } + ] + } + ] + }\ + """, + toRelJson(plan)); + } + + private String toRelJson(RelNode plan) { + RelJsonWriter writer = new RelJsonWriter(); + plan.explain(writer); + return writer.asString(); + } +} diff --git a/core/build.gradle b/core/build.gradle index 6dcd0b9e1f8..85fdd1e7ee8 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -63,6 +63,8 @@ dependencies { } api 'org.apache.calcite:calcite-linq4j:1.41.0' api project(':common') + compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar") + testImplementation files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar") implementation "com.github.seancfoley:ipaddress:5.4.2" implementation "com.jayway.jsonpath:json-path:2.9.0" diff --git a/core/src/main/java/org/opensearch/sql/executor/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/AnalyticsExecutionEngine.java new file mode 100644 index 00000000000..42de154d57f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/AnalyticsExecutionEngine.java @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.sql.SqlExplainLevel; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.model.ExprValueUtils; +import org.opensearch.sql.executor.pagination.Cursor; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * Execution engine that hands off RelNode plans to the Analytics engine for execution. 
Uses {@link
+ * QueryPlanExecutor} to execute plans directly in-process (same JVM, shared classloader via plugin
+ * extension).
+ */
+public class AnalyticsExecutionEngine implements ExecutionEngine {
+
+  private final QueryPlanExecutor<Iterable<Object[]>> planExecutor;
+
+  public AnalyticsExecutionEngine(QueryPlanExecutor<Iterable<Object[]>> planExecutor) {
+    this.planExecutor = planExecutor;
+  }
+
+  @Override
+  public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
+    listener.onFailure(
+        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+  }
+
+  @Override
+  public void execute(
+      PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
+    listener.onFailure(
+        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+  }
+
+  @Override
+  public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
+    listener.onFailure(
+        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+  }
+
+  @Override
+  public void execute(
+      RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
+    try {
+      Iterable<Object[]> results = planExecutor.execute(plan, context);
+      List<RelDataTypeField> fields = plan.getRowType().getFieldList();
+
+      List<Schema.Column> columns =
+          fields.stream()
+              .map(
+                  f ->
+                      new Schema.Column(
+                          f.getName(),
+                          null,
+                          OpenSearchTypeFactory.convertRelDataTypeToExprType(f.getType())))
+              .toList();
+
+      List<ExprValue> rows = new ArrayList<>();
+      for (Object[] row : results) {
+        LinkedHashMap<String, ExprValue> valueMap = new LinkedHashMap<>();
+        for (int i = 0; i < fields.size(); i++) {
+          valueMap.put(fields.get(i).getName(), ExprValueUtils.fromObjectValue(row[i]));
+        }
+        rows.add(ExprTupleValue.fromExprValueMap(valueMap));
+      }
+
+      listener.onResponse(new QueryResponse(new Schema(columns), rows, Cursor.None));
+    } catch (Exception e) {
+      listener.onFailure(e);
+    }
+  }
+
+  @Override
+  public void explain(
+      RelNode plan,
+      ExplainMode mode,
+      CalcitePlanContext context,
+      ResponseListener<ExplainResponse> listener) {
+    try {
+      SqlExplainLevel level =
+          mode == ExplainMode.SIMPLE
+              ? SqlExplainLevel.NO_ATTRIBUTES
+              : SqlExplainLevel.EXPPLAN_ATTRIBUTES;
+      String logical = RelOptUtil.toString(plan, level);
+      listener.onResponse(new ExplainResponse(new ExplainResponseNodeV2(logical, null, null)));
+    } catch (Exception e) {
+      listener.onFailure(e);
+    }
+  }
+}
diff --git a/core/src/test/java/org/opensearch/sql/executor/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/AnalyticsExecutionEngineTest.java
new file mode 100644
index 00000000000..a175285c2a9
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/executor/AnalyticsExecutionEngineTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.junit.jupiter.api.Test;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.calcite.CalcitePlanContext;
+import org.opensearch.sql.common.response.ResponseListener;
+
+class AnalyticsExecutionEngineTest {
+
+  private final AnalyticsExecutionEngine engine =
+      new AnalyticsExecutionEngine((plan, context) -> java.util.Collections.emptyList());
+
+  @Test
+  void executeRelNodeReturnsEmptyResult() {
+    RelNode plan = mock(RelNode.class);
+    RelDataType rowType = new RelRecordType(java.util.List.of());
+    org.mockito.Mockito.when(plan.getRowType()).thenReturn(rowType);
+    CalcitePlanContext context = mock(CalcitePlanContext.class);
+    AtomicReference<ExecutionEngine.QueryResponse> result = new AtomicReference<>();
+
+    engine.execute(
+        plan,
+        context,
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.QueryResponse response) {
+            result.set(response);
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            throw new AssertionError("Should not fail", e);
+          }
+        });
+
+    assertNotNull(result.get());
+    assertTrue(result.get().getResults().isEmpty());
+    assertTrue(result.get().getSchema().getColumns().isEmpty());
+  }
+
+  @Test
+  void explainRelNodeReturnsLogicalPlan() {
+    RelNode plan = mock(RelNode.class);
+    RelDataType rowType = new RelRecordType(java.util.List.of());
+    org.mockito.Mockito.when(plan.getRowType()).thenReturn(rowType);
+    CalcitePlanContext context = mock(CalcitePlanContext.class);
+    AtomicReference<ExecutionEngine.ExplainResponse> result = new AtomicReference<>();
+
+    engine.explain(
+        plan,
+        ExplainMode.STANDARD,
+        context,
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.ExplainResponse response) {
+            result.set(response);
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            throw new AssertionError("Should not fail", e);
+          }
+        });
+
+    assertNotNull(result.get());
+    assertNotNull(result.get().getCalcite().getLogical());
+  }
+
+  @Test
+  void executePhysicalPlanFails() {
+    AtomicReference<Exception> error = new AtomicReference<>();
+
+    engine.execute(
+        mock(org.opensearch.sql.planner.physical.PhysicalPlan.class),
+        new ResponseListener<>() {
+          @Override
+          public void onResponse(ExecutionEngine.QueryResponse response) {
+            throw new AssertionError("Should not succeed");
+          }
+
+          @Override
+          public void onFailure(Exception e) {
+            error.set(e);
+          }
+        });
+
+    assertNotNull(error.get());
assertEquals(UnsupportedOperationException.class, error.get().getClass()); + } +} diff --git a/docs/plans/poc-option-b-result.md b/docs/plans/poc-option-b-result.md new file mode 100644 index 00000000000..7ed1d50a943 --- /dev/null +++ b/docs/plans/poc-option-b-result.md @@ -0,0 +1,107 @@ +# PoC Result: Unified Query Pipeline (Option B) + +## Summary + +This PoC validates the end-to-end flow of routing SQL and PPL queries against non-Lucene indices (e.g., Parquet-backed) through the SQL/PPL plugin to the Analytics engine for execution. The SQL plugin parses queries using either Calcite's native SQL parser or the existing PPL V3 Calcite path, generates a logical `RelNode` plan against a schema built from cluster state index mappings, and hands the plan off to the Analytics engine's `QueryPlanExecutor` for execution. The integration uses OpenSearch's `extendedPlugins` mechanism — the SQL plugin declares `analytics-engine` as an extended plugin, which shares the Calcite classloader between both plugins, enabling direct in-process `RelNode` passing without serialization. Seven integration tests verify the full round-trip: REST request → query routing → RelNode generation → Analytics engine handoff → JDBC-formatted response, with the real analytics-engine plugin loaded in the test cluster. + +## Key Area 1: UDT/UDF Impact on RelNode Processing + +PPL V3's custom datetime UDTs (`EXPR_TIMESTAMP`, `EXPR_DATE`, `EXPR_TIME`, backed by VARCHAR) do **not** contaminate the RelNode when the schema uses standard Calcite types. The `OpenSearchSchemaBuilder` from the Analytics engine maps index fields to standard `SqlTypeName` (e.g., `date` → `TIMESTAMP`), bypassing `OpenSearchTypeFactory` entirely. Datetime functions like `hour()` and `day()` resolve correctly via `PPLTypeChecker` and return standard `INTEGER` type. The UDT system only activates when schemas are built through `OpenSearchTypeFactory` — which the Analytics engine path does not use. + +## Key Area 2: RelNode Handoff Mechanism + +Serialization is **not needed**. Each OpenSearch plugin gets its own `URLClassLoader`, so `RelNode` from one plugin is a different class than in another — unless one plugin extends the other. By declaring `extendedPlugins = ['analytics-engine']`, the SQL plugin inherits the Analytics engine's classloader as parent, sharing the same Calcite classes. The `QueryPlanExecutor.execute(RelNode, context)` call passes the object directly in-process. Calcite's JSON serializer (`RelJsonWriter`/`RelJsonReader`) remains a viable fallback if cross-node execution is needed in the future. + +## Key Area 3: Optimization Boundary + +The RelNode produced by the SQL plugin is a **pure logical plan** with standard Calcite nodes (e.g., `LogicalProject`, `LogicalFilter`, `LogicalTableScan`). Optimization is split into two layers with a clean boundary: + +**SQL plugin (language-specific logical rewrites)**: The SQL plugin applies engine-agnostic HepPlanner rules before handoff. These simplify PPL/SQL-generated patterns without knowledge of the execution engine: +- `PPLAggregateConvertRule` — rewrite `SUM(field+N)` → `SUM(field)+N*COUNT()` +- `PPLAggGroupMergeRule` — merge redundant group-by fields +- `PPLSimplifyDedupRule` — simplify dedup pattern into `LogicalDedup` +- `FilterMergeRule` — merge adjacent filters + +Currently `UnifiedQueryPlanner.plan()` does NOT apply these rules. The fix is to call `CalciteToolsHelper.optimize()` before returning the RelNode. 
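+
+A minimal sketch of this fix, using only Calcite built-ins (the `LogicalRewrites` helper name is hypothetical, and the PPL-specific rules listed above would be registered in the same program):
+
+```java
+import org.apache.calcite.plan.hep.HepPlanner;
+import org.apache.calcite.plan.hep.HepProgram;
+import org.apache.calcite.plan.hep.HepProgramBuilder;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.rules.CoreRules;
+
+final class LogicalRewrites {
+  /** Applies engine-agnostic logical rewrites to a plan before handoff. */
+  static RelNode apply(RelNode logical) {
+    HepProgram program =
+        new HepProgramBuilder()
+            .addRuleInstance(CoreRules.FILTER_MERGE) // merge adjacent filters
+            .build();
+    HepPlanner hepPlanner = new HepPlanner(program);
+    hepPlanner.setRoot(logical);
+    return hepPlanner.findBestExp();
+  }
+}
+```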
**Analytics engine (engine-specific physical optimization)**: The Analytics engine receives the optimized logical plan and runs its own VolcanoPlanner to produce a physical plan. It replaces `LogicalTableScan` with its own physical scan node (e.g., `DataFusionTableScan`) via a conversion rule — the same pattern as `CalciteLogicalIndexScan` replaces `LogicalTableScan` for the Lucene path. Pushdown rules (filter, project, aggregate, sort) are registered via the scan node's `register(RelOptPlanner)` method.
+
+**Fail-fast for unsupported operations**: The Analytics engine's `QueryPlanExecutor.execute()` throws a clear exception for unsupported operations (e.g., `match()` on Parquet). The SQL plugin catches and returns an error response. This is simpler and more robust than a pre-validation pass, since the engine knows best what it supports.
+
+## Responsibilities
+
+### SQL/PPL Plugin
+
+1. **Query routing**: Detect non-Lucene indices (e.g., by index settings) and route to the unified query pipeline instead of the existing V2/V3 engine
+2. **Parsing**: Parse SQL via Calcite's native `SqlParser` → `SqlValidator` → `SqlToRelConverter`, or PPL via ANTLR → `CalciteRelNodeVisitor`, producing a logical `RelNode`
+3. **Logical optimization**: Apply engine-agnostic HepPlanner rules (FilterMerge, PPL-specific rewrites) to simplify the logical plan before handoff
+4. **Handoff**: Pass the optimized logical `RelNode` to the Analytics engine's `QueryPlanExecutor.execute()`
+5. **Response formatting**: Convert `Iterable<Object[]>` results from the Analytics engine into JDBC/CSV/RAW response using existing `JdbcResponseFormatter`
+6. **Error handling**: Catch exceptions from the Analytics engine (unsupported operations, execution failures) and return appropriate error responses
+7. **Observability**: Log planning and execution time; increment request/failure metrics (`REQ_TOTAL`, `REQ_COUNT_TOTAL`, `FAILED_REQ_COUNT_SYS`)
+8. **Thread management**: Schedule execution on `sql-worker` thread pool to avoid blocking transport threads
+
+### Analytics Engine Plugin
+
+1. **Schema provisioning**: Provide `OpenSearchSchemaBuilder` to build Calcite `SchemaPlus` from cluster state index mappings with standard Calcite types (no UDTs)
+2. **Physical optimization**: Replace `LogicalTableScan` with engine-specific physical scan node, register pushdown rules (filter, project, aggregate, sort, limit) via `TableScan.register()`
+3. **Execution**: Execute the physical plan via back-end engines (DataFusion, etc.) and return `Iterable<Object[]>` result rows
+4. **Fail-fast**: Throw clear exceptions for unsupported operations (e.g., full-text search functions on Parquet)
+5. **Classloader hosting**: Bundle Calcite and all transitive runtime dependencies as the classloader parent for extending plugins
+
+## API Contract
+
+### Query Routing Rule
+
+```
+if index storage type is non-Lucene (e.g., Parquet):
+  → UnifiedQueryPlanner → RelNode → AnalyticsExecutionEngine
+else:
+  → existing V2/V3 engine (Lucene path, unchanged)
+```
+
+Currently detected by naming convention (`parquet_` prefix). In production, this would check index settings or metadata to determine the storage engine.
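+
+A sketch of the routing predicate as implemented in this PoC (the index-settings check is the assumed production replacement and is not part of this change):
+
+```java
+// PoC heuristic: route by index name prefix. Production would read index
+// settings/metadata to identify the storage engine instead.
+static boolean useUnifiedPipeline(String indexName) {
+  return indexName.startsWith("parquet_");
+}
+```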
### SchemaBuilder (Analytics Engine → SQL Plugin)
+
+```java
+// Analytics engine provides schema from cluster state
+SchemaPlus schema = OpenSearchSchemaBuilder.buildSchema(clusterService.state());
+
+// Type mapping: OpenSearch field type → standard Calcite SqlTypeName
+// keyword/text/ip → VARCHAR
+// long → BIGINT, integer → INTEGER, short → SMALLINT, byte → TINYINT
+// double → DOUBLE, float → FLOAT, boolean → BOOLEAN
+// date → TIMESTAMP
+// nested/object → skipped
+```
+
+### QueryPlanExecutor (Analytics Engine → SQL Plugin)
+
+```java
+// SQL plugin generates RelNode, Analytics engine executes it
+QueryPlanExecutor<Iterable<Object[]>> executor;
+Iterable<Object[]> results = executor.execute(relNode, context);
+
+// Each Object[] is one row, column order matches relNode.getRowType().getFieldList()
+```
+
+### Query Result Format (SQL Plugin → Client)
+
+```json
+{
+  "schema": [
+    {"name": "status", "type": "integer"},
+    {"name": "message", "type": "keyword"}
+  ],
+  "datarows": [
+    [200, "OK"],
+    [500, "Error"]
+  ],
+  "total": 2,
+  "size": 2,
+  "status": 200
+}
+```
+
+Schema column types derived from `RelNode.getRowType()` via `OpenSearchTypeFactory.convertRelDataTypeToExprType()`. Result rows converted from `Iterable<Object[]>` to `List<ExprValue>` (TODO in current PoC — stub returns empty results). Formatted by existing `JdbcResponseFormatter`.
diff --git a/docs/plans/poc-option-b.md b/docs/plans/poc-option-b.md
new file mode 100644
index 00000000000..4ec45a99602
--- /dev/null
+++ b/docs/plans/poc-option-b.md
@@ -0,0 +1,190 @@
+# PoC Plan: Option B (Unified Query Pipeline)
+
+**Goal**: Validate the end-to-end flow of PPL and SQL queries against a Parquet-backed index through the SQL/PPL plugin, with RelNode handoff to the analytics-plugin.
+
+## Purpose
+
+This PoC validates the feasibility of integrating the SQL/PPL plugin with the Analytics engine for Parquet-backed index queries. Beyond the end-to-end flow, it focuses on two key investigation areas:
+
+### Key Area 1: UDT/UDF Impact on RelNode Processing in Analytics Engine
+
+PPL V3 introduces custom User-Defined Types (UDTs) — notably `EXPR_TIMESTAMP` from `OpenSearchTypeFactory` — and User-Defined Functions (UDFs) that depend on these UDTs. These override Calcite's standard datetime handling. The PoC must determine whether these UDT/UDF overrides interfere with RelNode processing in the Analytics engine, which expects standard Calcite types. Specifically:
+- Does the Analytics engine's `OpenSearchSchemaBuilder` produce schemas compatible with PPL V3's UDT-based type system?
+- Can RelNode trees containing UDT-typed expressions be correctly processed by the Analytics engine's `QueryPlanExecutor`?
+- Do UDF implementations that depend on UDTs (e.g., datetime functions) produce valid RelNode subtrees that survive optimization and execution in the Analytics engine?
+
+### Key Area 2: RelNode Handoff Mechanism
+
+The handoff between the SQL/PPL plugin and the Analytics engine requires passing a `RelNode` for execution. The PoC must determine the right communication mechanism:
+- Both plugins run in the same JVM — is direct in-process invocation sufficient, or is a transport action needed?
+- If in-process: how does the SQL plugin obtain `QueryPlanExecutor` from the Analytics engine given their separate Guice injectors?
+- If transport action: can Calcite's built-in JSON serializer (`RelJsonWriter` / `RelJsonReader`) round-trip the RelNode trees reliably, including custom PPL operators?
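+
+If the transport-action route becomes necessary, the round-trip under investigation looks roughly like the sketch below. The writer side matches the usage in this PoC's demo test; the reader side is an assumption (it needs a `RelOptCluster` and `RelOptSchema` on the receiving node, plus an operator table covering custom PPL operators):
+
+```java
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.externalize.RelJsonReader;
+import org.apache.calcite.rel.externalize.RelJsonWriter;
+import org.apache.calcite.schema.Schema;
+
+class RelNodeJson {
+  /** Serializes a RelNode tree to Calcite's JSON plan format. */
+  static String write(RelNode rel) {
+    RelJsonWriter writer = new RelJsonWriter();
+    rel.explain(writer);
+    return writer.asString();
+  }
+
+  /** Rebuilds the RelNode on the receiving side from the JSON payload. */
+  static RelNode read(RelOptCluster cluster, RelOptSchema relOptSchema, Schema schema, String json)
+      throws java.io.IOException {
+    return new RelJsonReader(cluster, relOptSchema, schema).read(json);
+  }
+}
+```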
## References
+
+- **Analytics Engine (OpenSearch repo)**: https://github.com/opensearch-project/OpenSearch/tree/main/sandbox/plugins
+  - `analytics-engine/` — Hub plugin implementing `ExtensiblePlugin`, discovers and wires extensions via Guice
+  - `analytics-backend-datafusion/` — DataFusion native execution backend (stub)
+  - `sandbox/libs/analytics-framework/` — Shared SPI library with interfaces: `QueryPlanExecutor`, `SchemaProvider`, `EngineBridge`, `AnalyticsBackEndPlugin`, `AnalyticsFrontEndPlugin`
+- **ANSI SQL on Calcite PoC**: https://github.com/dai-chen/sql-1/tree/poc/unified-sql-support
+  - Adds `ANSI_SQL` query type using Calcite's native `SqlParser` → `SqlValidator` → `SqlToRelConverter` → `RelNode`
+  - Registers `match_phrase` as custom `SqlBasicFunction` in Calcite operator table
+  - Known issue: `EXPR_TIMESTAMP` UDT from `OpenSearchTypeFactory` is incompatible with Calcite's `SqlTypeFactoryImpl` — directly relevant to Key Area 1
+  - Only `match_phrase` registered; other search functions (`match`, `query_string`, etc.) not yet handled
+  - Useful reference for REST handler integration: `RestUnifiedSQLQueryAction` wiring, thread pool, response formatting
+
+## Assumptions
+
+- No real DataFusion execution needed — mock/stub returning hardcoded rows is sufficient.
+- No real Parquet data needed — mock schema with hardcoded results.
+- RelNode serialization format for transport action is TBD — investigate Calcite JSON serializer first, fall back to Java serialization if needed.
+- The PoC targets PPL V3 Calcite path only (not V2 fallback).
+- The Analytics engine code in `sandbox/plugins` of the OpenSearch repo is available. The PoC will attempt to depend on it directly via published artifacts or Gradle composite build; if not feasible, copy the required SPI interfaces from `analytics-framework`.
+
+## Module Placement
+
+| New code | Module | Rationale |
+|---|---|---|
+| Routing logic + REST handler | `plugin/` or `legacy/` (where `RestSqlAction` lives) | Extends existing REST handlers. Copied from `RestUnifiedSQLQueryAction` and refactored to reuse existing default response formatter. |
+| Analytics engine SPI interfaces | Depend on `analytics-framework` directly, or copy into new `analytics-engine-stub/` module | Prefer direct dependency. Copy only if cross-repo dependency is not feasible for PoC. |
+| Mock Schema adapter, EngineCapabilities, absorption rules | New `analytics-engine-stub/` module (if copying) or test fixtures | Clear boundary — these are external dependencies. |
+| Stub analytics-plugin transport action | Test plugin in `integ-test/` | Only needed for IT, not shipped |
+| Integration tests | `integ-test/` | End-to-end REST round-trip tests |
+| Unit tests | `core/`, `api/`, per module | Schema, EngineCapabilities, RelNode generation |
+
+## Test Strategy
+
+- **Unit tests**: In each module (`core/`, `api/`) for Schema, EngineCapabilities, RelNode generation, absorption rules.
+- **Integration tests**: In `integ-test/` for end-to-end REST round-trips. All test queries in the table below should be covered as ITs.
+- **Phase 1 establishes the IT scaffold first** (ANSI SQL RelNode + explain), then subsequent phases add PPL and routing. ITs should pass at every phase.
+
+## Prerequisites: Analytics Engine Dependencies
+
+The Analytics engine is already available in the OpenSearch repo under `sandbox/plugins/`. The SPI interfaces live in `sandbox/libs/analytics-framework/` (Calcite 1.41.0).
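+
+For orientation, the handoff SPI this PoC codes against has roughly the following shape, inferred from its usage in `AnalyticsExecutionEngine` in this change (the real interface in `analytics-framework` may differ):
+
+```java
+// Assumed shape only: a functional interface taking a logical plan plus a
+// planning context and returning result rows in row-type column order.
+public interface QueryPlanExecutor<T> {
+  T execute(
+      org.apache.calcite.rel.RelNode plan,
+      org.opensearch.sql.calcite.CalcitePlanContext context);
+}
+```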
+ +**Preferred approach**: Depend on `analytics-framework` directly as a Gradle dependency (if published as a Maven artifact or via composite build). + +**Fallback approach**: Copy the SPI interfaces into a local `analytics-engine-stub/` module. Required interfaces from `analytics-framework`: +- `EngineCapabilities.java` — operator/function support checker +- `QueryPlanExecutor.java` — execution entry point (takes RelNode, returns rows) +- `SchemaProvider.java` — builds Calcite SchemaPlus from cluster state +- `EngineBridge.java` — JNI boundary interface for native execution +- `AnalyticsBackEndPlugin.java` — backend plugin SPI + +From `analytics-engine` plugin (for reference/mock): +- `OpenSearchSchemaBuilder.java` — ClusterState → Calcite SchemaPlus with type mappings +- `AnalyticsPlugin.java` — hub plugin that discovers extensions via Guice + +## Phases + +### Phase 1: ANSI SQL RelNode Generation (U-2, U-3, U-4) + +**Work**: +- Add ANSI SQL support logic based on the Calcite native SQL path from [`UnifiedQueryPlanner`](https://github.com/dai-chen/sql-1/blob/poc/unified-sql-support/api/src/main/java/org/opensearch/sql/api/UnifiedQueryPlanner.java): `SqlParser` → `SqlValidator` → `SqlToRelConverter` → `RelNode`, plus `AGGREGATE_CASE_TO_FILTER` HepPlanner rule for filter aggregation pushdown. +- Wire in a mock Schema for unit testing (hardcoded Parquet index with representative column types). This mock will be replaced by `SchemaProvider` from the Analytics engine in Phase 4. +- Verify full-text search functions (`match`, `match_phrase`, `query_string`) are handled correctly in absorption rules. +- Verify datetime functions work correctly with standard Calcite types (Key Area 1: UDT concern). + +**Done when**: SQL queries generate correct RelNode. Search functions are absorbed or fail-fast as expected. Unit tests validate RelNode generation (using mock schema). + +### Phase 2: PPL RelNode Generation (B-1, U-1, U-3, U-4) + +**Work**: +- Use the existing PPL V3 Calcite path to generate `RelNode` from PPL queries. +- Wire in the same mock Schema from Phase 1. +- Apply logical optimization with absorption rules. +- Verify that unsupported operations are rejected based on `EngineCapabilities` (fail-fast behavior). +- Verify that PPL V3's custom datetime UDT overrides do not interfere when the schema uses standard Calcite types (Key Area 1). + +**Done when**: PPL queries generate correct RelNode with absorption rules applied. Unsupported operations fail fast. Datetime functions work without UDT interference. Unit tests pass. + +### Phase 3: Query Routing & REST Handler (A-1, A-2, A-4, A-5, M-6) + +**Work**: +- Copy `RestUnifiedSQLQueryAction.java` from the [ANSI SQL PoC](https://github.com/dai-chen/sql-1/blob/poc/unified-sql-support/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedSQLQueryAction.java) into `plugin/` or `legacy/`. +- Refactor to reuse the existing default response formatter instead of the custom `formatAsJdbc` method — use the existing JDBC response format infrastructure in the `protocol/` module. +- Add routing logic to extract the target index name and resolve Lucene vs Parquet (hardcoded flag or mock index setting). +- For Lucene indices, fall through to existing pipeline. For Parquet, enter the new path using `UnifiedQueryPlanner`. +- Wire the Parquet path end-to-end with a stub that returns hardcoded schema + rows. +- Support basic explain API (`_explain` endpoint) for the Parquet path. 
+- Add integration tests in `integ-test/` covering the full REST round-trip (query, explain, error cases).
+
+**Done when**: `POST _plugins/_ppl` and `POST _plugins/_sql` return a valid JDBC-formatted response for a Parquet index query using the existing response formatter. `_explain` returns a plan. Lucene queries are unaffected. ITs pass.
+
+### Phase 4: Analytics Engine Integration (M-3, M-4)
+
+**Reference**: [Analytics engine sandbox plugins](https://github.com/opensearch-project/OpenSearch/tree/main/sandbox/plugins)
+
+**Work**:
+- Determine dependency approach: depend on `analytics-framework` directly from the OpenSearch repo, or copy SPI interfaces into `analytics-engine-stub/`.
+- If direct dependency: add `analytics-framework` as a Gradle dependency and implement `AnalyticsFrontEndPlugin` SPI.
+- If copy: copy SPI interfaces and implement mock `SchemaProvider` using `OpenSearchSchemaBuilder` as reference.
+- Implement `EngineCapabilities` declaring supported operators/functions.
+- Implement absorption rules (filter, project, sort, aggregate pushdown) gated by `EngineCapabilities`.
+- Replace mock Schema from Phase 1/2 with the Analytics engine's schema infrastructure.
+
+**Done when**: Schema can be registered in a Calcite `FrameworkConfig` and used to plan queries. Unit tests validate type mapping and EngineCapabilities. Phase 3 ITs updated to use real schema and still pass.
+
+### Phase 5: Plugin Extension and RelNode Handoff (B-2, B-3)
+
+**Investigation findings** (replaces original Phase 5 and 6):
+- The Analytics engine plugin (`AnalyticsPlugin`) uses `ExtensiblePlugin` for SPI discovery of back-end engines. It registers NO transport actions.
+- `QueryPlanExecutor<Iterable<Object[]>>` and `EngineContext` are bound via `createGuiceModules()` into the **node-level Guice injector**.
+- The SQL plugin creates its **own private Guice injector** in `createComponents()`, isolated from the node-level injector. Direct `@Inject` across plugins does not work without plugin extension.
+- **Classloader isolation**: Each OpenSearch plugin gets its own `URLClassLoader`. Even though both plugins bundle `calcite-core:1.41.0`, the `RelNode` class is different in each classloader → `ClassCastException` if passed directly between unrelated plugins.
+- **Plugin extension solves both problems**: If the SQL plugin declares `extendedPlugins = ['analytics-engine']`, the Analytics engine's classloader becomes a parent of the SQL plugin's classloader. Both then share the same Calcite classes → RelNode can be passed directly, no serialization needed. The Guice bindings also become accessible.
+- **Serialization as fallback**: Calcite's `RelJsonWriter`/`RelJsonReader` (1.41.0) can round-trip standard RelNode trees. Custom PPL operators need operator table configuration on the reader side. This is the fallback if plugin extension is not feasible.
+
+**Recommended production integration**:
+1. SQL plugin declares `extendedPlugins = ['analytics-engine']` in `plugin-descriptor.properties`
+2. SQL plugin's classloader inherits Analytics engine's Calcite classes — no classloader conflict
+3. SQL plugin obtains `QueryPlanExecutor` and `EngineContext` via Guice (both in node-level injector)
+4. RelNode passed directly in-process — no serialization needed
+
+**PoC work** (real Analytics plugin not available as published artifact for test cluster):
+- Verify Calcite JSON serialization round-trips the RelNode trees from Phase 1/2 as fallback validation.
+- Keep the stub `AnalyticsExecutionEngine` with `QueryPlanExecutor` interface for the PoC. +- Document the plugin extension integration pattern for production. + +### Phase 6 (Optional): Cross-Cutting Concerns (TBD-1, TBD-2, TBD-3) + +**Work**: +- **Resource management**: Verify `querySizeLimit` is enforced on the Parquet path. Identify where circuit breaker and timeout should be added (document findings, not full implementation). +- **Monitoring**: Verify existing request/failure metrics capture Parquet queries. Identify gaps. +- **Security**: Verify index-level auth during routing. Document the DLS/FLS gap. + +**Done when**: Findings documented. No implementation required — this phase produces a gap analysis for the LLD. + +## Test Queries + +All test queries should be covered as integration tests in `integ-test/`. + +| Query | What it validates | +|---|---| +| `source = parquet_index \| where status = 200 \| fields timestamp, status, message` | Filter + project pushdown | +| `source = parquet_index \| stats count() by status` | Aggregate pushdown | +| `source = parquet_index \| sort timestamp \| head 100` | Sort + limit pushdown | +| `source = parquet_index \| where match(message, 'error')` | Search function — should fail-fast on Parquet | +| `source = parquet_index \| eval new_field = upper(message)` | Function not in EngineCapabilities — test fail-fast vs post-process | +| `source = parquet_index \| eval hour = hour(timestamp) \| where timestamp > '2024-01-01'` | Datetime functions with standard Calcite TIMESTAMP type (no UDT) | +| `source = parquet_index \| stats count() by span(timestamp, 1h)` | Time span aggregation against native Calcite timestamp | +| `SELECT timestamp, status FROM parquet_index WHERE status = 200` | SQL filter + project | +| `SELECT count(*) FROM parquet_index GROUP BY status` | SQL aggregate | +| `SELECT * FROM parquet_index WHERE match(message, 'error')` | SQL search function handling | + +## Success Criteria + +- PPL and SQL queries against a Parquet index return correct results through `_plugins/_ppl` and `_plugins/_sql` endpoints. +- Lucene queries are unaffected (no regression). +- Response format matches existing Lucene query responses (reuses existing default response formatter). +- Unsupported operations produce clear error messages (fail-fast validated). +- Explain API returns meaningful output for Parquet queries. +- Datetime functions work correctly with standard Calcite types (Key Area 1: no UDT interference confirmed). +- RelNode handoff mechanism determined (Key Area 2: direct in-process invocation via QueryPlanExecutor, no serialization needed). +- All test queries pass as ITs in `integ-test/`. + +## Dependencies on Analytics Engine Team + +- `EngineCapabilities` baseline: which operators/functions are supported in the initial release? +- `analytics-framework` artifact availability: is it published to Maven for cross-repo dependency, or do we need composite build / copy? +- Guice integration pattern: how should the SQL plugin obtain `QueryPlanExecutor` and `EngineContext` from the Analytics engine? Options: (a) refactor SQL plugin to use `createGuiceModules()` so both are in the node-level injector, (b) service locator pattern, (c) explicit component passing via `createComponents()` return values. 
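+
+For illustration, option (a) might look roughly like the following on the SQL plugin side once `extendedPlugins` is in place (the class name and injection point are hypothetical and depend on how the plugin is refactored):
+
+```java
+// With extendedPlugins = ['analytics-engine'], both plugins share the same
+// Calcite classes, so the node-level injector can hand the executor over directly.
+public class UnifiedQueryComponents {
+  private final QueryPlanExecutor<Iterable<Object[]>> planExecutor;
+
+  @org.opensearch.common.inject.Inject // bound by AnalyticsPlugin.createGuiceModules()
+  public UnifiedQueryComponents(QueryPlanExecutor<Iterable<Object[]>> planExecutor) {
+    this.planExecutor = planExecutor;
+  }
+}
+```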
diff --git a/integ-test/build.gradle b/integ-test/build.gradle index b914cb0cbe0..1bf594bae07 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -270,11 +270,16 @@ def getGeoSpatialPlugin() { } } +def getAnalyticsEnginePlugin() { + provider { (RegularFile) (() -> file("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.zip")) } +} + testClusters { integTest { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) plugin(getGeoSpatialPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" setting "plugins.query.datasources.encryption.masterkey", "1234567812345678" } @@ -289,6 +294,7 @@ testClusters { testDistribution = 'archive' plugin(getJobSchedulerPlugin()) plugin(getGeoSpatialPlugin()) + plugin(getAnalyticsEnginePlugin()) plugin ":opensearch-sql-plugin" } integTestWithSecurity { diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/UnifiedQueryPipelineIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/UnifiedQueryPipelineIT.java new file mode 100644 index 00000000000..e3fb7d16455 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/sql/UnifiedQueryPipelineIT.java @@ -0,0 +1,221 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.sql; + +import static org.opensearch.sql.legacy.TestUtils.isIndexExist; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.util.MatcherUtils.schema; + +import java.io.IOException; +import org.hamcrest.Matcher; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** + * Integration tests for the unified query pipeline. Queries targeting indices with "parquet_" + * prefix are routed through UnifiedQueryPlanner → AnalyticsExecutionEngine (stub returning empty + * results with schema derived from RelNode). 
+ */ +public class UnifiedQueryPipelineIT extends PPLIntegTestCase { + + private static final String TEST_INDEX = "parquet_test"; + + @Override + protected void init() throws Exception { + super.init(); + createParquetTestIndex(); + loadIndex(Index.BANK); + } + + private void createParquetTestIndex() throws IOException { + if (isIndexExist(client(), TEST_INDEX)) { + return; + } + String mapping = + """ + { + "mappings": { + "properties": { + "timestamp": {"type": "date"}, + "status": {"type": "integer"}, + "message": {"type": "keyword"}, + "department": {"type": "keyword"} + } + } + }\ + """; + Request request = new Request("PUT", "/" + TEST_INDEX); + request.setJsonEntity(mapping); + client().performRequest(request); + } + + // --- SQL tests --- + + @Test + public void testSqlSelectWithFilter() throws IOException { + withSQL( + """ + SELECT status, message FROM parquet_test WHERE status = 200\ + """) + .verifySchema(schema("status", "integer"), schema("message", "keyword")) + .verifyDataRows() + .verifyExplain( + """ + LogicalProject(status=[$2], message=[$1]) + LogicalFilter(condition=[=($2, 200)]) + LogicalTableScan(table=[[opensearch, parquet_test]]) + """); + } + + @Test + public void testSqlAggregate() throws IOException { + withSQL( + """ + SELECT count(*) FROM parquet_test GROUP BY status\ + """) + .verifyDataRows() + .verifyExplain( + """ + LogicalProject(EXPR$0=[$1]) + LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) + LogicalProject(status=[$2]) + LogicalTableScan(table=[[opensearch, parquet_test]]) + """); + } + + // --- PPL tests --- + + @Test + public void testPplWhereAndProject() throws IOException { + withPPL( + """ + source = parquet_test | where status = 200 | fields status, message\ + """) + .verifySchema(schema("status", "integer"), schema("message", "keyword")) + .verifyDataRows() + .verifyExplain( + """ + LogicalProject(status=[$2], message=[$1]) + LogicalFilter(condition=[=($2, 200)]) + LogicalTableScan(table=[[opensearch, parquet_test]]) + """); + } + + @Test + public void testPplStats() throws IOException { + withPPL( + """ + source = parquet_test | stats count() by status\ + """) + .verifyDataRows() + .verifyExplain( + """ + LogicalProject(count()=[$1], status=[$0]) + LogicalAggregate(group=[{0}], count()=[COUNT()]) + LogicalProject(status=[$2]) + LogicalTableScan(table=[[opensearch, parquet_test]]) + """); + } + + // --- Key Area 1: Datetime UDT --- + + @Test + public void testPplDatetimeFunction() throws IOException { + // hour() is a standard Calcite function — no UDT in the plan. + // Timestamp comparison uses TIMESTAMP('2024-01-01':VARCHAR) cast. + withPPL( + """ + source = parquet_test \ + | where timestamp > '2024-01-01' \ + | eval hour = hour(timestamp) \ + | fields hour, status\ + """) + .verifySchema(schema("hour", "integer"), schema("status", "integer")) + .verifyDataRows() + .verifyExplain( + """ + LogicalProject(hour=[HOUR($3)], status=[$2]) + LogicalFilter(condition=[>($3, TIMESTAMP('2024-01-01':VARCHAR))]) + LogicalTableScan(table=[[opensearch, parquet_test]]) + """); + } + + // --- Full-text search function --- + + @Test + public void testPplSearchFunction() throws IOException { + // match() is a full-text search function — appears in the plan with MAP arguments. + // On a Parquet-backed index, the Analytics engine would need to handle or reject it. 
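+    // The PoC stub returns no rows, so only the schema and logical plan shape are asserted here;
+    // fail-fast based on EngineCapabilities is left to the production implementation.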
+    withPPL(
+            """
+            source = parquet_test | where match(message, 'error') | fields message\
+            """)
+        .verifySchema(schema("message", "keyword"))
+        .verifyDataRows()
+        .verifyExplain(
+            """
+            LogicalProject(message=[$1])
+              LogicalFilter(condition=[match(MAP('field', $1), MAP('query', 'error':VARCHAR))])
+                LogicalTableScan(table=[[opensearch, parquet_test]])
+            """);
+  }
+
+  // --- Regression test ---
+
+  @Test
+  public void testNonParquetQueryUnaffected() {
+    JSONObject result = executeJdbcRequest("SELECT firstname FROM " + TEST_INDEX_BANK);
+    assertTrue("Non-parquet query should return results", result.getInt("total") > 0);
+  }
+
+  // --- Fluent assertion helpers ---
+
+  private QueryAssertion withSQL(String query) {
+    return new QueryAssertion(query, true);
+  }
+
+  private QueryAssertion withPPL(String query) {
+    return new QueryAssertion(query, false);
+  }
+
+  private class QueryAssertion {
+    private final String query;
+    private final boolean isSql;
+    private final JSONObject result;
+
+    QueryAssertion(String query, boolean isSql) {
+      this.query = query;
+      this.isSql = isSql;
+      try {
+        this.result = isSql ? executeJdbcRequest(query) : executeQuery(query);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @SafeVarargs
+    final QueryAssertion verifySchema(Matcher<JSONObject>... matchers) {
+      org.opensearch.sql.util.MatcherUtils.verifySchema(result, matchers);
+      return this;
+    }
+
+    @SafeVarargs
+    final QueryAssertion verifyDataRows(Matcher<JSONArray>... matchers) {
+      org.opensearch.sql.util.MatcherUtils.verifyDataRows(result, matchers);
+      return this;
+    }
+
+    QueryAssertion verifyExplain(String expectedPlan) throws IOException {
+      String explain = isSql ? explainQuery(query).strip() : explainQueryToString(query).strip();
+      String logical = new JSONObject(explain).getJSONObject("calcite").getString("logical");
+      assertEquals(expectedPlan, logical);
+      return this;
+    }
+  }
+}
diff --git a/legacy/build.gradle b/legacy/build.gradle
index 74653d9cb36..b183898b81e 100644
--- a/legacy/build.gradle
+++ b/legacy/build.gradle
@@ -116,6 +116,9 @@ dependencies {
   // add geo module as dependency. https://github.com/opensearch-project/OpenSearch/pull/4180/.
   implementation group: 'org.opensearch.plugin', name: 'geo', version: "${opensearch_version}"
   api project(':sql')
+  api project(':api')
+  compileOnly files("${rootDir}/libs/analytics-engine-3.6.0-SNAPSHOT.jar")
+  compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
   api project(':common')
   api project(':opensearch')
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
index 9be2367dcaa..48d9631cd1a 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
@@ -24,6 +24,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.OpenSearchException;
+import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.inject.Injector;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.rest.RestStatus;
@@ -83,10 +84,18 @@ public class RestSqlAction extends BaseRestHandler {
   /** New SQL query request handler. */
   private final RestSQLQueryAction newSqlQueryHandler;
 
+  /** Unified query handler for Parquet-backed indices (Analytics engine path).
*/ + private final RestUnifiedQueryAction unifiedQueryHandler; + public RestSqlAction(Settings settings, Injector injector) { super(); this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.newSqlQueryHandler = new RestSQLQueryAction(injector); + this.unifiedQueryHandler = + new RestUnifiedQueryAction( + injector.getInstance(ClusterService.class), + injector.getInstance(org.opensearch.transport.client.node.NodeClient.class), + (plan, context) -> java.util.Collections.emptyList()); } @Override @@ -142,6 +151,15 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli request.path(), request.params(), sqlRequest.cursor()); + + // PoC: Route to unified query pipeline for Parquet-backed indices + if (RestUnifiedQueryAction.isUnifiedQueryPath(sqlRequest.getSql())) { + boolean isExplain = isExplainRequest(request); + return channel -> + unifiedQueryHandler.execute( + sqlRequest.getSql(), org.opensearch.sql.executor.QueryType.SQL, channel, isExplain); + } + return newSqlQueryHandler.prepareRequest( newSqlRequest, (restChannel, exception) -> { diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedQueryAction.java new file mode 100644 index 00000000000..89d4fdb0f92 --- /dev/null +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestUnifiedQueryAction.java @@ -0,0 +1,195 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.legacy.plugin; + +import static org.opensearch.core.rest.RestStatus.OK; +import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; +import static org.opensearch.sql.executor.ExecutionEngine.QueryResponse; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.analytics.exec.QueryPlanExecutor; +import org.opensearch.analytics.schema.OpenSearchSchemaBuilder; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.sql.api.UnifiedQueryContext; +import org.opensearch.sql.api.UnifiedQueryPlanner; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.AnalyticsExecutionEngine; +import org.opensearch.sql.executor.QueryType; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.transport.client.node.NodeClient; + +/** + * REST handler for queries routed to the Analytics engine via the unified query pipeline. Parses + * SQL/PPL queries using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then + * hands off to {@link AnalyticsExecutionEngine} for execution. 
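+ * <p>Routing is decided by {@link #isUnifiedQueryPath(String)}, and execution is scheduled on the
+ * sql-worker thread pool.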
+ */
+public class RestUnifiedQueryAction {
+
+  private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class);
+  private static final String SCHEMA_NAME = "opensearch";
+
+  private final AnalyticsExecutionEngine analyticsEngine;
+  private final ClusterService clusterService;
+  private final NodeClient client;
+
+  public RestUnifiedQueryAction(
+      ClusterService clusterService,
+      NodeClient client,
+      QueryPlanExecutor<java.util.List<?>> planExecutor) {
+    this.clusterService = clusterService;
+    this.client = client;
+    this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
+  }
+
+  /**
+   * PoC: Check if the query should be routed to the unified query pipeline. Currently uses a
+   * hardcoded prefix convention ("parquet_" in table name). In production, this would check index
+   * settings to determine the storage engine.
+   */
+  public static boolean isUnifiedQueryPath(String query) {
+    return query != null && query.toLowerCase().contains("parquet_");
+  }
+
+  /**
+   * Execute a query through the unified query pipeline on the sql-worker thread pool.
+   *
+   * @param query the query string
+   * @param queryType SQL or PPL
+   * @param channel the REST channel for sending the response
+   * @param isExplain whether this is an explain request
+   */
+  public void execute(String query, QueryType queryType, RestChannel channel, boolean isExplain) {
+    client
+        .threadPool()
+        .schedule(
+            () -> doExecute(query, queryType, channel, isExplain),
+            new TimeValue(0),
+            SQL_WORKER_THREAD_POOL_NAME);
+  }
+
+  private void doExecute(
+      String query, QueryType queryType, RestChannel channel, boolean isExplain) {
+    try {
+      long startTime = System.nanoTime();
+
+      SchemaPlus schema = OpenSearchSchemaBuilder.buildSchema(clusterService.state());
+
+      try (UnifiedQueryContext context =
+          UnifiedQueryContext.builder()
+              .language(queryType)
+              .catalog(SCHEMA_NAME, schema)
+              .defaultNamespace(SCHEMA_NAME)
+              .build()) {
+
+        UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
+        RelNode plan = planner.plan(query);
+        long planTime = System.nanoTime();
+        LOG.info(
+            "[unified] Planning completed in {}ms for {} query",
+            (planTime - startTime) / 1_000_000,
+            queryType);
+
+        CalcitePlanContext planContext = context.getPlanContext();
+
+        if (isExplain) {
+          analyticsEngine.explain(
+              plan,
+              org.opensearch.sql.ast.statement.ExplainMode.STANDARD,
+              planContext,
+              createExplainListener(channel));
+        } else {
+          analyticsEngine.execute(plan, planContext, createQueryListener(channel, planTime));
+        }
+      }
+    } catch (Exception e) {
+      recordFailureMetric(e);
+      reportError(channel, e);
+    }
+  }
+
+  private ResponseListener<QueryResponse> createQueryListener(
+      RestChannel channel, long planEndTime) {
+    ResponseFormatter<QueryResult> formatter = new JdbcResponseFormatter(PRETTY);
+    return new ResponseListener<>() {
+      @Override
+      public void onResponse(QueryResponse response) {
+        long execTime = System.nanoTime();
+        LOG.info(
+            "[unified] Execution completed in {}ms, {} rows returned",
+            (execTime - planEndTime) / 1_000_000,
+            response.getResults().size());
+        Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment();
+        Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment();
+        String result =
+            formatter.format(
+                new QueryResult(response.getSchema(), response.getResults(), response.getCursor()));
+        channel.sendResponse(new BytesRestResponse(OK, formatter.contentType(), result));
+      }
+
+      @Override
+      public void onFailure(Exception e) {
+        recordFailureMetric(e);
+        reportError(channel, e);
+      }
+    };
+  }
+
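+  /**
+   * Builds a listener that renders the {@link ExplainResponse} from the Analytics engine as
+   * pretty-printed JSON, so the Explain API returns meaningful output for Parquet queries.
+   */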
+  private ResponseListener<ExplainResponse> createExplainListener(RestChannel channel) {
+    return new ResponseListener<>() {
+      @Override
+      public void onResponse(ExplainResponse response) {
+        Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment();
+        Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment();
+        var formatter =
+            new JsonResponseFormatter<ExplainResponse>(PRETTY) {
+              @Override
+              protected Object buildJsonObject(ExplainResponse resp) {
+                return resp;
+              }
+            };
+        channel.sendResponse(
+            new BytesRestResponse(OK, formatter.contentType(), formatter.format(response)));
+      }
+
+      @Override
+      public void onFailure(Exception e) {
+        recordFailureMetric(e);
+        reportError(channel, e);
+      }
+    };
+  }
+
+  private static void recordFailureMetric(Exception e) {
+    LOG.error("[unified] Query execution failed", e);
+    Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
+  }
+
+  private static void reportError(RestChannel channel, Exception e) {
+    channel.sendResponse(
+        new BytesRestResponse(
+            org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR,
+            "application/json; charset=UTF-8",
+            "{\"error\":{\"type\":\""
+                + e.getClass().getSimpleName()
+                + "\",\"reason\":\""
+                + (e.getMessage() != null ? e.getMessage().replace("\"", "\\\"") : "Unknown error")
+                + "\"},\"status\":500}"));
+  }
+}
diff --git a/libs/analytics-engine-3.6.0-SNAPSHOT.jar b/libs/analytics-engine-3.6.0-SNAPSHOT.jar
new file mode 100644
index 00000000000..52f3011068f
Binary files /dev/null and b/libs/analytics-engine-3.6.0-SNAPSHOT.jar differ
diff --git a/libs/analytics-engine-3.6.0-SNAPSHOT.zip b/libs/analytics-engine-3.6.0-SNAPSHOT.zip
new file mode 100644
index 00000000000..d2a748b91ef
Binary files /dev/null and b/libs/analytics-engine-3.6.0-SNAPSHOT.zip differ
diff --git a/libs/analytics-framework-3.6.0-SNAPSHOT.jar b/libs/analytics-framework-3.6.0-SNAPSHOT.jar
new file mode 100644
index 00000000000..4a8161c1e9d
Binary files /dev/null and b/libs/analytics-framework-3.6.0-SNAPSHOT.jar differ
diff --git a/plugin/build.gradle b/plugin/build.gradle
index 340787fa01f..2684fa54ed2 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -55,11 +55,49 @@ opensearchplugin {
     name 'opensearch-sql'
     description 'OpenSearch SQL'
     classname 'org.opensearch.sql.plugin.SQLPlugin'
-    extendedPlugins = ['opensearch-job-scheduler']
+    extendedPlugins = ['opensearch-job-scheduler', 'analytics-engine']
     licenseFile rootProject.file("LICENSE.txt")
     noticeFile rootProject.file("NOTICE")
 }
 
+// Exclude jars provided by analytics-engine plugin (shared via extendedPlugins classloader)
+bundlePlugin {
+  exclude 'calcite-core-*.jar'
+  exclude 'calcite-linq4j-*.jar'
+  exclude 'avatica-core-*.jar'
+  exclude 'guava-*.jar'
+  exclude 'failureaccess-*.jar'
+  exclude 'slf4j-api-*.jar'
+  exclude 'commons-codec-*.jar'
+  exclude 'commons-compiler-*.jar'
+  exclude 'janino-*.jar'
+  exclude 'joou-java-6-*.jar'
+  exclude 'json-path-*.jar'
+  exclude 'json-smart-*.jar'
+  exclude 'accessors-smart-*.jar'
+  exclude 'asm-9*.jar'
+  exclude 'httpcore5-5*.jar'
+  exclude 'httpcore5-h2-*.jar'
+  exclude 'antlr4-runtime-4.13.2.jar'
+  exclude 'avatica-metrics-1.27.0.jar'
+  exclude 'commons-dbcp2-2.11.0.jar'
+  exclude 'commons-io-2.15.0.jar'
+  exclude 'commons-lang3-3.18.0.jar'
+  exclude 'commons-math3-3.6.1.jar'
+  exclude 'commons-pool2-2.12.0.jar'
+  exclude 'httpclient5-5.6.jar'
+  exclude 'httpcore5-reactive-5.4.jar'
+  exclude 'icu4j-72.1.jar'
+  exclude 'jackson-annotations-2.20.jar'
+  exclude 'jackson-databind-2.20.1.jar'
+  exclude 'memory-0.9.0.jar'
+  exclude 'proj4j-1.2.2.jar'
+  exclude 'sketches-core-0.9.0.jar'
+  exclude 'uzaygezen-core-0.2.jar'
+  exclude 'error_prone_annotations-*.jar'
+  exclude 'jts-io-common-*.jar'
+}
+
 publishing {
   publications {
     pluginZip(MavenPublication) { publication ->
@@ -160,6 +198,8 @@ dependencies {
   api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}"
 
   api project(":ppl")
+  api project(':api')
+  compileOnly files("${rootDir}/libs/analytics-framework-3.6.0-SNAPSHOT.jar")
   api project(':legacy')
   api project(':opensearch')
   api project(':prometheus')
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index d817e13c69f..e85d716acf3 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -162,7 +162,7 @@ public List<RestHandler> getRestHandlers(
     Metrics.getInstance().registerDefaultMetrics();
 
     return Arrays.asList(
-        new RestPPLQueryAction(),
+        new RestPPLQueryAction(clusterService, this.client),
         new RestSqlAction(settings, injector),
         new RestSqlStatsAction(settings, restController),
         new RestPPLStatsAction(settings, restController),
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java
index ffdd90504f7..4c44a4c8495 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java
@@ -17,6 +17,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.OpenSearchException;
+import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.rest.RestStatus;
 import org.opensearch.index.IndexNotFoundException;
@@ -29,8 +30,10 @@
 import org.opensearch.sql.exception.ExpressionEvaluationException;
 import org.opensearch.sql.exception.QueryEngineException;
 import org.opensearch.sql.exception.SemanticCheckException;
+import org.opensearch.sql.executor.QueryType;
 import org.opensearch.sql.legacy.metrics.MetricName;
 import org.opensearch.sql.legacy.metrics.Metrics;
+import org.opensearch.sql.legacy.plugin.RestUnifiedQueryAction;
 import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory;
 import org.opensearch.sql.plugin.request.PPLQueryRequestFactory;
 import org.opensearch.sql.plugin.transport.PPLQueryAction;
@@ -45,8 +48,11 @@ public class RestPPLQueryAction extends BaseRestHandler {
   private static final Logger LOG = LogManager.getLogger();
 
   /** Constructor of RestPPLQueryAction. */
-  public RestPPLQueryAction() {
+  public RestPPLQueryAction(ClusterService clusterService, NodeClient client) {
     super();
+    this.unifiedQueryHandler =
+        new RestUnifiedQueryAction(
+            clusterService, client, (plan, context) -> java.util.Collections.emptyList());
   }
 
   private static boolean isClientError(Exception e) {
@@ -81,11 +87,21 @@ protected Set<String> responseParams() {
     return responseParams;
   }
 
+  /** Unified query handler for Parquet-backed indices (Analytics engine path).
*/ + private final RestUnifiedQueryAction unifiedQueryHandler; + @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) { TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest(PPLQueryRequestFactory.getPPLRequest(request)); + // PoC: Route to unified query pipeline for Parquet-backed indices + String pplQuery = transportPPLQueryRequest.toPPLQueryRequest().getRequest(); + if (RestUnifiedQueryAction.isUnifiedQueryPath(pplQuery)) { + boolean isExplain = request.path().endsWith("/_explain"); + return channel -> unifiedQueryHandler.execute(pplQuery, QueryType.PPL, channel, isExplain); + } + return channel -> nodeClient.execute( PPLQueryAction.INSTANCE,