diff --git a/.github/workflows/analytics-engine-compat.yml b/.github/workflows/analytics-engine-compat.yml
new file mode 100644
index 00000000000..9c3bd9c9f99
--- /dev/null
+++ b/.github/workflows/analytics-engine-compat.yml
@@ -0,0 +1,44 @@
+name: Analytics Engine Compatibility
+
+on:
+ pull_request:
+ push:
+ branches-ignore:
+ - 'backport/**'
+ - 'dependabot/**'
+ paths:
+ - '**/*.java'
+ - '**gradle*'
+ - 'integ-test/**'
+ - '.github/workflows/analytics-engine-compat.yml'
+ merge_group:
+
+jobs:
+ Get-CI-Image-Tag:
+ uses: opensearch-project/opensearch-build/.github/workflows/get-ci-image-tag.yml@main
+ with:
+ product: opensearch
+
+ analytics-engine-compat:
+ needs: Get-CI-Image-Tag
+ runs-on: ubuntu-latest
+ container:
+ image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
+ options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}
+
+ steps:
+ - name: Run start commands
+ run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
+
+ - uses: actions/checkout@v4
+
+ - name: Set up JDK 25
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'temurin'
+ java-version: 25
+
+ - name: Run analytics-engine compatibility smoke test
+ run: |
+ chown -R 1000:1000 `pwd`
+ su `id -un 1000` -c "./gradlew :integ-test:analyticsEngineCompatIT"
diff --git a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
index 4169963da54..4372d5e05ba 100644
--- a/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
+++ b/api/src/main/java/org/opensearch/sql/api/UnifiedQueryContext.java
@@ -5,7 +5,9 @@
package org.opensearch.sql.api;
+import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT;
+import static org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT;
import static org.opensearch.sql.common.setting.Settings.Key.PPL_SUBSEARCH_MAXOUT;
import static org.opensearch.sql.common.setting.Settings.Key.QUERY_SIZE_LIMIT;
@@ -119,13 +121,31 @@ public static class Builder {
/**
* Setting values with defaults from SysLimit.DEFAULT. Only includes planning-required settings
* to avoid coupling with OpenSearchSettings.
+ *
+ * <p>{@link Settings.Key#CALCITE_ENGINE_ENABLED} defaults to {@code true} here because the
+ * unified query path is by definition Calcite-based — every query reaching this context flows
+ * through Calcite's planner, never the v2 engine. The PPL {@link
+ * org.opensearch.sql.api.parser.PPLQueryParser} reuses the v2 {@code AstBuilder}, which gates
+ * Calcite-only commands (e.g. {@code visitTableCommand}) on this setting; without the default,
+ * those commands fail at parse time even when the cluster setting is true.
+ *
+ * <p>{@link Settings.Key#PPL_REX_MAX_MATCH_LIMIT} defaults to {@code 10} here because {@code
+ * AstBuilder.visitRexCommand} reads it unconditionally and unboxes to {@code int} — a {@code
+ * null} return from {@code getSettingValue} NPEs the planner before any operator-level
+ * capability check runs. The value mirrors the cluster-side default of {@code 10} registered by
+ * {@code OpenSearchSettings.PPL_REX_MAX_MATCH_LIMIT_SETTING}. Cluster-side overrides reach this
+ * map via {@link #setting(String, Object)} — the REST handler reads the live value from {@code
+ * OpenSearchSettings} and routes it through that existing API, keeping {@link
+ * UnifiedQueryContext} decoupled from any specific {@link Settings} implementation.
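+ *
+ * <p>A minimal sketch of that override flow ({@code contextBuilder} stands in for a fully
+ * configured Builder, constructed as in UnifiedQueryContextTest):
+ *
+ * <pre>{@code
+ * UnifiedQueryContext context =
+ *     contextBuilder
+ *         .setting("plugins.ppl.rex.max_match.limit", 5) // live cluster value beats the default
+ *         .build();
+ * }</pre>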
*/
private final Map<Settings.Key, Object> settings =
new HashMap<>(
Map.of(
QUERY_SIZE_LIMIT, SysLimit.DEFAULT.querySizeLimit(),
PPL_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.subsearchLimit(),
- PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit()));
+ PPL_JOIN_SUBSEARCH_MAXOUT, SysLimit.DEFAULT.joinSubsearchLimit(),
+ CALCITE_ENGINE_ENABLED, true,
+ PPL_REX_MAX_MATCH_LIMIT, 10));
/**
* Sets the query language frontend to be used.
diff --git a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
index ad2eba0fea5..f0111d06363 100644
--- a/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
+++ b/api/src/test/java/org/opensearch/sql/api/UnifiedQueryContextTest.java
@@ -33,6 +33,10 @@ public void testContextCreationWithDefaults() {
"Settings should have default system limits",
SysLimit.DEFAULT,
SysLimit.fromSettings(context.getSettings()));
+ assertEquals(
+ "PPL_REX_MAX_MATCH_LIMIT default should be 10",
+ Integer.valueOf(10),
+ context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}
@Test
@@ -43,10 +47,15 @@ public void testContextCreationWithCustomConfig() {
.catalog("opensearch", testSchema)
.cacheMetadata(true)
.setting("plugins.query.size_limit", 200)
+ .setting("plugins.ppl.rex.max_match.limit", 5)
.build();
Integer querySizeLimit = context.getSettings().getSettingValue(QUERY_SIZE_LIMIT);
assertEquals("Custom setting should be applied", Integer.valueOf(200), querySizeLimit);
+ assertEquals(
+ "Cluster-side override for PPL_REX_MAX_MATCH_LIMIT should reach the unified path",
+ Integer.valueOf(5),
+ context.getSettings().getSettingValue(PPL_REX_MAX_MATCH_LIMIT));
}
@Test(expected = IllegalArgumentException.class)
diff --git a/core/build.gradle b/core/build.gradle
index 23f9b37e317..f567fb85653 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -33,6 +33,7 @@ plugins {
}
repositories {
+ mavenLocal()
mavenCentral()
}
@@ -63,6 +64,12 @@ dependencies {
}
api 'org.apache.calcite:calcite-linq4j:1.41.0'
api project(':common')
+ compileOnly 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
+ // Needed because analytics-api's QueryPlanExecutor signature uses
+ // org.opensearch.core.action.ActionListener; AnalyticsExecutionEngine references that type.
+ compileOnly group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
+ testImplementation 'org.opensearch.sandbox:analytics-api:3.7.0-SNAPSHOT'
+ testImplementation group: 'org.opensearch', name: 'opensearch-core', version: "${opensearch_version}"
implementation "com.github.seancfoley:ipaddress:5.4.2"
implementation "com.jayway.jsonpath:json-path:2.9.0"
diff --git a/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java b/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java
index 9043f05929b..b52d64f4867 100644
--- a/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java
+++ b/core/src/main/java/org/opensearch/sql/ast/statement/ExplainMode.java
@@ -8,6 +8,7 @@
import java.util.Locale;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.calcite.sql.SqlExplainLevel;
@RequiredArgsConstructor
public enum ExplainMode {
@@ -26,4 +27,13 @@ public static ExplainMode of(String mode) {
return ExplainMode.STANDARD;
}
}
+
+ /** Convert to Calcite SqlExplainLevel for RelOptUtil.toString(). */
+ public SqlExplainLevel toExplainLevel() {
+ return switch (this) {
+ case SIMPLE -> SqlExplainLevel.NO_ATTRIBUTES;
+ case COST -> SqlExplainLevel.ALL_ATTRIBUTES;
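+ // EXPPLAN_ATTRIBUTES is Calcite's actual constant name (sic), not a typo here.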
+ default -> SqlExplainLevel.EXPPLAN_ATTRIBUTES;
+ };
+ }
}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
index 15bfece5f46..1251f51b131 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
@@ -1538,10 +1538,25 @@ private Pair, List> aggregateWithTrimming(
* count(a.b)] returns true.
*/
private boolean containsNestedAggregator(RelBuilder relBuilder, List<RexInputRef> aggCallRefs) {
+ // For each aggregator argument, take the part of its column name before the first dot
+ // (e.g. "city" from "city.location.latitude") and check whether that's a top-level
+ // ARRAY column — the marker for an OpenSearch `nested` field.
+ //
+ // The classic path always exposes a top-level column for object/nested parents. The
+ // analytics-engine path emits only the flat leaves ("city.name", "city.location.latitude")
+ // because parent placeholder types (MAP) can't round-trip through Substrait.
+ // RelDataType.getField returns null when the column doesn't exist — for analytics-engine,
+ // that null just means "not nested," which is the right answer.
+ RelDataType rowType = relBuilder.peek().getRowType();
return aggCallRefs.stream()
- .map(r -> relBuilder.peek().getRowType().getFieldNames().get(r.getIndex()))
+ .map(r -> rowType.getFieldNames().get(r.getIndex()))
.map(name -> org.apache.commons.lang3.StringUtils.substringBefore(name, "."))
- .anyMatch(root -> relBuilder.field(root).getType().getSqlTypeName() == SqlTypeName.ARRAY);
+ .anyMatch(
+ root -> {
+ RelDataTypeField field =
+ rowType.getField(root, /* caseSensitive= */ true, /* elideRecord= */ false);
+ return field != null && field.getType().getSqlTypeName() == SqlTypeName.ARRAY;
+ });
}
/**
diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java
new file mode 100644
index 00000000000..b2367f653c3
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/calcite/utils/CalciteClassLoaderHelper.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Helper for setting the thread context classloader before Calcite operations. This is needed for
+ * patched Calcite (CALCITE-3745): when analytics-engine is the parent classloader, Janino uses the
+ * parent's classloader which can't see SQL plugin classes. The patched Calcite checks {@code
+ * Thread.currentThread().getContextClassLoader()} first. This helper sets it to the SQL plugin's
+ * classloader (child) which can see both parent and child classes.
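+ *
+ * <p>Call shape, mirroring the call sites in {@code QueryService} (a sketch; any Calcite work
+ * can go in the lambda):
+ *
+ * <pre>{@code
+ * CalciteClassLoaderHelper.withCalciteClassLoader(
+ *     () -> {
+ *       // plan and execute with Calcite here; Janino now resolves plugin classes
+ *     },
+ *     QueryService.class);
+ * }</pre>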
+ *
+ * @see CALCITE-3745
+ * @see sql#5306
+ */
+public final class CalciteClassLoaderHelper {
+
+ private CalciteClassLoaderHelper() {}
+
+ /**
+ * Run an action with the thread context classloader set to the caller's classloader.
+ *
+ * @param action the action to run
+ * @param callerClass the class whose classloader should be used (pass {@code MyClass.class})
+ * @param <T> the return type
+ * @return the result of the action
+ */
+ public static <T> T withCalciteClassLoader(Callable<T> action, Class<?> callerClass) {
+ Thread currentThread = Thread.currentThread();
+ ClassLoader originalCl = currentThread.getContextClassLoader();
+ currentThread.setContextClassLoader(callerClass.getClassLoader());
+ try {
+ return action.call();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ currentThread.setContextClassLoader(originalCl);
+ }
+ }
+
+ /**
+ * Run a void action with the thread context classloader set to the caller's classloader.
+ *
+ * @see #withCalciteClassLoader(Callable, Class)
+ */
+ public static void withCalciteClassLoader(Runnable action, Class<?> callerClass) {
+ withCalciteClassLoader(
+ () -> {
+ action.run();
+ return null;
+ },
+ callerClass);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/executor/QueryService.java b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
index b294a168cd8..fe9d3e55dc1 100644
--- a/core/src/main/java/org/opensearch/sql/executor/QueryService.java
+++ b/core/src/main/java/org/opensearch/sql/executor/QueryService.java
@@ -35,6 +35,7 @@
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
+import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
import org.opensearch.sql.common.error.ErrorReport;
import org.opensearch.sql.common.error.QueryProcessingStage;
import org.opensearch.sql.common.error.StageErrorHandler;
@@ -142,33 +143,37 @@ public void executeWithCalcite(
QueryProfiling.activate(QueryContext.isProfileEnabled());
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
long analyzeStart = System.nanoTime();
- CalcitePlanContext context =
- CalcitePlanContext.create(
- buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+ CalciteClassLoaderHelper.withCalciteClassLoader(
+ () -> {
+ CalcitePlanContext context =
+ CalcitePlanContext.create(
+ buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
- context.setHighlightConfig(highlightConfig);
+ context.setHighlightConfig(highlightConfig);
- // Wrap analyze with ANALYZING stage tracking
- RelNode relNode =
- StageErrorHandler.executeStage(
- QueryProcessingStage.ANALYZING,
- () -> analyze(plan, context),
- "while preparing and validating the query plan");
+ // Wrap analyze with ANALYZING stage tracking
+ RelNode relNode =
+ StageErrorHandler.executeStage(
+ QueryProcessingStage.ANALYZING,
+ () -> analyze(plan, context),
+ "while preparing and validating the query plan");
- // Wrap plan conversion with PLAN_CONVERSION stage tracking
- RelNode calcitePlan =
- StageErrorHandler.executeStage(
- QueryProcessingStage.PLAN_CONVERSION,
- () -> convertToCalcitePlan(relNode, context),
- "while converting the query to an executable plan");
+ // Wrap plan conversion with PLAN_CONVERSION stage tracking
+ RelNode calcitePlan =
+ StageErrorHandler.executeStage(
+ QueryProcessingStage.PLAN_CONVERSION,
+ () -> convertToCalcitePlan(relNode, context),
+ "while converting the query to an executable plan");
- analyzeMetric.set(System.nanoTime() - analyzeStart);
+ analyzeMetric.set(System.nanoTime() - analyzeStart);
- // Wrap execution with EXECUTING stage tracking
- StageErrorHandler.executeStageVoid(
- QueryProcessingStage.EXECUTING,
- () -> executionEngine.execute(calcitePlan, context, listener),
- "while running the query");
+ // Wrap execution with EXECUTING stage tracking
+ StageErrorHandler.executeStageVoid(
+ QueryProcessingStage.EXECUTING,
+ () -> executionEngine.execute(calcitePlan, context, listener),
+ "while running the query");
+ },
+ QueryService.class);
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
log.warn("Fallback to V2 query engine since got exception", t);
@@ -191,17 +196,21 @@ public void explainWithCalcite(
() -> {
try {
QueryProfiling.noop();
- CalcitePlanContext context =
- CalcitePlanContext.create(
- buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
- context.setHighlightConfig(highlightConfig);
- context.run(
+ CalciteClassLoaderHelper.withCalciteClassLoader(
() -> {
- RelNode relNode = analyze(plan, context);
- RelNode calcitePlan = convertToCalcitePlan(relNode, context);
- executionEngine.explain(calcitePlan, mode, context, listener);
+ CalcitePlanContext context =
+ CalcitePlanContext.create(
+ buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+ context.setHighlightConfig(highlightConfig);
+ context.run(
+ () -> {
+ RelNode relNode = analyze(plan, context);
+ RelNode calcitePlan = convertToCalcitePlan(relNode, context);
+ executionEngine.explain(calcitePlan, mode, context, listener);
+ },
+ settings);
},
- settings);
+ QueryService.class);
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t)) {
log.warn("Fallback to V2 query engine since got exception", t);
diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
new file mode 100644
index 00000000000..ddfe5fd3556
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.analytics;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.calcite.CalcitePlanContext;
+import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.data.model.ExprTupleValue;
+import org.opensearch.sql.data.model.ExprValue;
+import org.opensearch.sql.data.model.ExprValueUtils;
+import org.opensearch.sql.data.type.ExprType;
+import org.opensearch.sql.executor.ExecutionContext;
+import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.pagination.Cursor;
+import org.opensearch.sql.monitor.profile.MetricName;
+import org.opensearch.sql.monitor.profile.ProfileMetric;
+import org.opensearch.sql.monitor.profile.QueryProfiling;
+import org.opensearch.sql.planner.physical.PhysicalPlan;
+
+/**
+ * Execution engine adapter for the analytics engine (Project Mustang).
+ *
+ * <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
+ * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
+ * analytics engine, and converts the raw results into {@link QueryResponse}.
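+ *
+ * <p>Wiring sketch (names follow AnalyticsExecutionEngineTest; the listener is the standard
+ * {@link ResponseListener} handed down from the REST layer):
+ *
+ * <pre>{@code
+ * ExecutionEngine engine = new AnalyticsExecutionEngine(planExecutor);
+ * engine.execute(calcitePlan, planContext, queryResponseListener);
+ * }</pre>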
+ */
+public class AnalyticsExecutionEngine implements ExecutionEngine {
+
+ private final QueryPlanExecutor<Iterable<Object[]>> planExecutor;
+
+ public AnalyticsExecutionEngine(QueryPlanExecutor<Iterable<Object[]>> planExecutor) {
+ this.planExecutor = planExecutor;
+ }
+
+ /** Not supported. Analytics queries use the RelNode path exclusively. */
+ @Override
+ public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
+ listener.onFailure(
+ new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+ }
+
+ /** Not supported. Analytics queries use the RelNode path exclusively. */
+ @Override
+ public void execute(
+ PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
+ listener.onFailure(
+ new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+ }
+
+ /** Not supported. Analytics queries use the RelNode path exclusively. */
+ @Override
+ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
+ listener.onFailure(
+ new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
+ }
+
+ @Override
+ public void execute(
+ RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
+ // QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched
+ // to a worker pool and results arrive on the listener. Record the execute metric in the
+ // listener callback, before delegating to the user-supplied listener, so the metric snapshot
+ // taken by SimpleJsonResponseFormatter sees the correct value.
+ ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
+ long execStart = System.nanoTime();
+
+ planExecutor.execute(
+ plan,
+ null,
+ new ActionListener<>() {
+ @Override
+ public void onResponse(Iterable<Object[]> rows) {
+ try {
+ List<RelDataTypeField> fields = plan.getRowType().getFieldList();
+ List<ExprValue> results = convertRows(rows, fields);
+ Schema schema = buildSchema(fields);
+ execMetric.set(System.nanoTime() - execStart);
+ listener.onResponse(new QueryResponse(schema, results, Cursor.None));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ }
+ });
+ }
+
+ @Override
+ public void explain(
+ RelNode plan,
+ ExplainMode mode,
+ CalcitePlanContext context,
+ ResponseListener<ExplainResponse> listener) {
+ try {
+ String logical = RelOptUtil.toString(plan, mode.toExplainLevel());
+ ExplainResponse response =
+ new ExplainResponse(new ExplainResponseNodeV2(logical, null, null));
+ listener.onResponse(ExplainResponse.normalizeLf(response));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }
+
+ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
+ List results = new ArrayList<>();
+ for (Object[] row : rows) {
+ Map<String, ExprValue> valueMap = new LinkedHashMap<>();
+ for (int i = 0; i < fields.size(); i++) {
+ String columnName = fields.get(i).getName();
+ Object value = (i < row.length) ? row[i] : null;
+ valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
+ }
+ results.add(ExprTupleValue.fromExprValueMap(valueMap));
+ }
+ return results;
+ }
+
+ private Schema buildSchema(List<RelDataTypeField> fields) {
+ List<Schema.Column> columns = new ArrayList<>();
+ for (RelDataTypeField field : fields) {
+ ExprType exprType = convertType(field.getType());
+ columns.add(new Schema.Column(field.getName(), null, exprType));
+ }
+ return new Schema(columns);
+ }
+
+ private ExprType convertType(RelDataType type) {
+ try {
+ return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
+ } catch (IllegalArgumentException e) {
+ return org.opensearch.sql.data.type.ExprCoreType.UNKNOWN;
+ }
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java
index 9a77a0d5a7c..318f32a41be 100644
--- a/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java
+++ b/core/src/main/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImpl.java
@@ -50,6 +50,10 @@ public SqlReturnTypeInference getReturnTypeInference() {
RelDataType originalType =
SqlLibraryOperators.ARRAY.getReturnTypeInference().inferReturnType(sqlOperatorBinding);
RelDataType innerType = originalType.getComponentType();
+ // Default empty/unknown element type to VARCHAR — see isUnknownLikeType below for why.
+ if (innerType == null || isUnknownLikeType(innerType.getSqlTypeName())) {
+ innerType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+ }
return createArrayType(
typeFactory, typeFactory.createTypeWithNullability(innerType, true), true);
} catch (Exception e) {
@@ -63,6 +67,17 @@ public UDFOperandMetadata getOperandMetadata() {
return null;
}
+ /**
+ * Calcite's {@link SqlLibraryOperators#ARRAY} infers a {@code NULL}-element array for an empty
+ * call list and an {@code UNKNOWN}-element array when type inference can't pick one (e.g. all
+ * operands are typeless nulls). Either of those bubbles up to the analytics-engine route's
+ * substrait converter as "Unable to convert the type UNKNOWN" — substrait has no encoding for
+ * either marker. Treat both as needing a concrete fallback.
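+ *
+ * <p>Concretely, per ArrayFunctionImplTest: {@code array()} and {@code array(NULL)} both type
+ * as {@code ARRAY<VARCHAR>}, while {@code array(1)} keeps {@code ARRAY<INTEGER>}.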
+ */
+ private static boolean isUnknownLikeType(SqlTypeName sqlTypeName) {
+ return sqlTypeName == SqlTypeName.NULL || sqlTypeName == SqlTypeName.UNKNOWN;
+ }
+
public static class ArrayImplementor implements NotNullImplementor {
@Override
public Expression implement(
diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java
new file mode 100644
index 00000000000..4de596fb375
--- /dev/null
+++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java
@@ -0,0 +1,396 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.executor.analytics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.sql.calcite.CalcitePlanContext;
+import org.opensearch.sql.calcite.SysLimit;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.data.type.ExprCoreType;
+import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
+import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
+import org.opensearch.sql.planner.physical.PhysicalPlan;
+
+class AnalyticsExecutionEngineTest {
+
+ private AnalyticsExecutionEngine engine;
+
+ @SuppressWarnings("unchecked")
+ private QueryPlanExecutor<Iterable<Object[]>> mockExecutor;
+
+ private CalcitePlanContext mockContext;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ mockExecutor = (QueryPlanExecutor<Iterable<Object[]>>) mock(QueryPlanExecutor.class);
+ engine = new AnalyticsExecutionEngine(mockExecutor);
+ mockContext = mock(CalcitePlanContext.class);
+ setSysLimit(mockContext, SysLimit.DEFAULT);
+ }
+
+ /** Sets the public final sysLimit field on a mocked CalcitePlanContext. */
+ private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) throws Exception {
+ Field field = CalcitePlanContext.class.getDeclaredField("sysLimit");
+ field.setAccessible(true);
+ field.set(context, sysLimit);
+ }
+
+ /** QueryPlanExecutor became async in analytics-framework 3.7 — stub the listener callback. */
+ @SuppressWarnings("unchecked")
+ private void stubExecutorWith(RelNode relNode, Iterable<Object[]> rows) {
+ doAnswer(
+ inv -> {
+ ((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onResponse(rows);
+ return null;
+ })
+ .when(mockExecutor)
+ .execute(eq(relNode), any(), any(ActionListener.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ private void stubExecutorWithError(RelNode relNode, Exception error) {
+ doAnswer(
+ inv -> {
+ ((ActionListener<Iterable<Object[]>>) inv.getArgument(2)).onFailure(error);
+ return null;
+ })
+ .when(mockExecutor)
+ .execute(eq(relNode), any(), any(ActionListener.class));
+ }
+
+ @Test
+ void executeRelNode_basicTypesAndRows() {
+ RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
+ Iterable rows = Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25});
+ stubExecutorWith(relNode, rows);
+
+ QueryResponse response = executeAndCapture(relNode);
+ String dump = dumpResponse(response);
+
+ // Schema: 2 columns [name:STRING, age:INTEGER]
+ assertEquals(2, response.getSchema().getColumns().size(), "Column count. " + dump);
+ assertEquals("name", response.getSchema().getColumns().get(0).getName(), dump);
+ assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
+ assertEquals("age", response.getSchema().getColumns().get(1).getName(), dump);
+ assertEquals(
+ ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump);
+
+ // Rows: [{name=Alice, age=30}, {name=Bob, age=25}]
+ assertEquals(2, response.getResults().size(), "Row count. " + dump);
+ assertEquals(
+ "Alice", response.getResults().get(0).tupleValue().get("name").value(), "Row 0. " + dump);
+ assertEquals(
+ 30, response.getResults().get(0).tupleValue().get("age").value(), "Row 0. " + dump);
+ assertEquals(
+ "Bob", response.getResults().get(1).tupleValue().get("name").value(), "Row 1. " + dump);
+ assertEquals(
+ 25, response.getResults().get(1).tupleValue().get("age").value(), "Row 1. " + dump);
+
+ // Cursor: None
+ assertEquals(org.opensearch.sql.executor.pagination.Cursor.None, response.getCursor(), dump);
+ }
+
+ @Test
+ void executeRelNode_numericTypes() {
+ RelNode relNode =
+ mockRelNode(
+ "b", SqlTypeName.TINYINT,
+ "s", SqlTypeName.SMALLINT,
+ "i", SqlTypeName.INTEGER,
+ "l", SqlTypeName.BIGINT,
+ "f", SqlTypeName.FLOAT,
+ "d", SqlTypeName.DOUBLE);
+ Iterable<Object[]> rows =
+ Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0});
+ stubExecutorWith(relNode, rows);
+
+ QueryResponse response = executeAndCapture(relNode);
+ String dump = dumpResponse(response);
+
+ assertEquals(ExprCoreType.BYTE, response.getSchema().getColumns().get(0).getExprType(), dump);
+ assertEquals(ExprCoreType.SHORT, response.getSchema().getColumns().get(1).getExprType(), dump);
+ assertEquals(
+ ExprCoreType.INTEGER, response.getSchema().getColumns().get(2).getExprType(), dump);
+ assertEquals(ExprCoreType.LONG, response.getSchema().getColumns().get(3).getExprType(), dump);
+ assertEquals(ExprCoreType.FLOAT, response.getSchema().getColumns().get(4).getExprType(), dump);
+ assertEquals(ExprCoreType.DOUBLE, response.getSchema().getColumns().get(5).getExprType(), dump);
+
+ // Verify actual values
+ assertEquals(
+ (byte) 1,
+ response.getResults().get(0).tupleValue().get("b").value(),
+ "byte value. " + dump);
+ assertEquals(
+ (short) 2,
+ response.getResults().get(0).tupleValue().get("s").value(),
+ "short value. " + dump);
+ assertEquals(
+ 3, response.getResults().get(0).tupleValue().get("i").value(), "int value. " + dump);
+ assertEquals(
+ 4L, response.getResults().get(0).tupleValue().get("l").value(), "long value. " + dump);
+ assertEquals(
+ 5.0f, response.getResults().get(0).tupleValue().get("f").value(), "float value. " + dump);
+ assertEquals(
+ 6.0, response.getResults().get(0).tupleValue().get("d").value(), "double value. " + dump);
+ }
+
+ @Test
+ void executeRelNode_temporalTypes() {
+ RelNode relNode =
+ mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP);
+ Iterable<Object[]> emptyRows = Collections.emptyList();
+ stubExecutorWith(relNode, emptyRows);
+
+ QueryResponse response = executeAndCapture(relNode);
+ String dump = dumpResponse(response);
+
+ assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(0).getExprType(), dump);
+ assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(1).getExprType(), dump);
+ assertEquals(
+ ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(2).getExprType(), dump);
+ assertEquals(0, response.getResults().size(), "Should have 0 rows. " + dump);
+ }
+
+ // Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches
+ // AnalyticsExecutionEngine. The engine trusts the executor to honor the limit.
+
+ @Test
+ void executeRelNode_emptyResults() {
+ RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR);
+ Iterable<Object[]> emptyRows = Collections.emptyList();
+ stubExecutorWith(relNode, emptyRows);
+
+ QueryResponse response = executeAndCapture(relNode);
+ String dump = dumpResponse(response);
+
+ assertEquals(1, response.getSchema().getColumns().size(), "Schema column count. " + dump);
+ assertEquals(0, response.getResults().size(), "Row count should be 0. " + dump);
+ }
+
+ @Test
+ void executeRelNode_nullValues() {
+ RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
+ Iterable<Object[]> rows = Collections.singletonList(new Object[] {null, null});
+ stubExecutorWith(relNode, rows);
+
+ QueryResponse response = executeAndCapture(relNode);
+ String dump = dumpResponse(response);
+
+ assertEquals(1, response.getResults().size(), "Row count. " + dump);
+ assertTrue(
+ response.getResults().get(0).tupleValue().get("name").isNull(),
+ "name should be null. " + dump);
+ assertTrue(
+ response.getResults().get(0).tupleValue().get("age").isNull(),
+ "age should be null. " + dump);
+ }
+
+ @Test
+ void executeRelNode_errorPropagation() {
+ RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER);
+ stubExecutorWithError(relNode, new RuntimeException("Engine failure"));
+
+ Exception error = executeAndCaptureError(relNode);
+ System.out.println(dumpError("executeRelNode_errorPropagation", error));
+
+ assertEquals(
+ "Engine failure",
+ error.getMessage(),
+ "Exception type: " + error.getClass().getSimpleName() + ", message: " + error.getMessage());
+ }
+
+ @Test
+ void physicalPlanExecute_callsOnFailure() {
+ PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
+ AtomicReference<Exception> errorRef = new AtomicReference<>();
+ engine.execute(physicalPlan, failureListener(errorRef));
+
+ assertNotNull(errorRef.get(), "onFailure should have been called");
+ System.out.println(dumpError("physicalPlanExecute_callsOnFailure", errorRef.get()));
+ assertTrue(
+ errorRef.get() instanceof UnsupportedOperationException,
+ "Expected UnsupportedOperationException, got: "
+ + errorRef.get().getClass().getSimpleName()
+ + " - "
+ + errorRef.get().getMessage());
+ }
+
+ @Test
+ void physicalPlanExecuteWithContext_callsOnFailure() {
+ PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
+ AtomicReference<Exception> errorRef = new AtomicReference<>();
+ engine.execute(
+ physicalPlan,
+ org.opensearch.sql.executor.ExecutionContext.emptyExecutionContext(),
+ failureListener(errorRef));
+
+ assertNotNull(errorRef.get(), "onFailure should have been called");
+ System.out.println(dumpError("physicalPlanExecuteWithContext_callsOnFailure", errorRef.get()));
+ assertTrue(
+ errorRef.get() instanceof UnsupportedOperationException,
+ "Expected UnsupportedOperationException, got: "
+ + errorRef.get().getClass().getSimpleName()
+ + " - "
+ + errorRef.get().getMessage());
+ }
+
+ @Test
+ void physicalPlanExplain_callsOnFailure() {
+ PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
+ AtomicReference<Exception> errorRef = new AtomicReference<>();
+ engine.explain(physicalPlan, explainFailureListener(errorRef));
+
+ assertNotNull(errorRef.get(), "onFailure should have been called");
+ System.out.println(dumpError("physicalPlanExplain_callsOnFailure", errorRef.get()));
+ assertTrue(
+ errorRef.get() instanceof UnsupportedOperationException,
+ "Expected UnsupportedOperationException, got: "
+ + errorRef.get().getClass().getSimpleName()
+ + " - "
+ + errorRef.get().getMessage());
+ }
+
+ // --- helpers ---
+
+ private QueryResponse executeAndCapture(RelNode relNode) {
+ AtomicReference<QueryResponse> ref = new AtomicReference<>();
+ engine.execute(relNode, mockContext, captureListener(ref));
+ assertNotNull(ref.get(), "QueryResponse should not be null");
+ // Always print the full response so test output shows exact results
+ System.out.println(dumpResponse(ref.get()));
+ return ref.get();
+ }
+
+ private Exception executeAndCaptureError(RelNode relNode) {
+ AtomicReference<Exception> ref = new AtomicReference<>();
+ engine.execute(
+ relNode,
+ mockContext,
+ new ResponseListener<QueryResponse>() {
+ @Override
+ public void onResponse(QueryResponse response) {}
+
+ @Override
+ public void onFailure(Exception e) {
+ ref.set(e);
+ }
+ });
+ assertNotNull(ref.get(), "onFailure should have been called");
+ return ref.get();
+ }
+
+ private ResponseListener<QueryResponse> failureListener(AtomicReference<Exception> ref) {
+ return new ResponseListener<QueryResponse>() {
+ @Override
+ public void onResponse(QueryResponse response) {}
+
+ @Override
+ public void onFailure(Exception e) {
+ ref.set(e);
+ }
+ };
+ }
+
+ private ResponseListener<ExplainResponse> explainFailureListener(AtomicReference<Exception> ref) {
+ return new ResponseListener<ExplainResponse>() {
+ @Override
+ public void onResponse(ExplainResponse response) {}
+
+ @Override
+ public void onFailure(Exception e) {
+ ref.set(e);
+ }
+ };
+ }
+
+ private String dumpError(String testName, Exception e) {
+ return "\n--- "
+ + testName
+ + " ---\n"
+ + "Exception: "
+ + e.getClass().getSimpleName()
+ + "\n"
+ + "Message: "
+ + e.getMessage()
+ + "\n--- End ---";
+ }
+
+ /** Dumps the full QueryResponse into a readable string for test output and assertion messages. */
+ private String dumpResponse(QueryResponse response) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n--- QueryResponse ---\n");
+
+ sb.append("Schema: [");
+ sb.append(
+ response.getSchema().getColumns().stream()
+ .map(c -> c.getName() + ":" + c.getExprType().typeName())
+ .collect(Collectors.joining(", ")));
+ sb.append("]\n");
+
+ sb.append("Rows (").append(response.getResults().size()).append("):\n");
+ for (int i = 0; i < response.getResults().size(); i++) {
+ sb.append(" [").append(i).append("] ");
+ sb.append(response.getResults().get(i).tupleValue());
+ sb.append("\n");
+ }
+
+ sb.append("Cursor: ").append(response.getCursor()).append("\n");
+ sb.append("--- End ---");
+ return sb.toString();
+ }
+
+ private RelNode mockRelNode(Object... nameTypePairs) {
+ SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ RelDataTypeFactory.Builder builder = typeFactory.builder();
+ for (int i = 0; i < nameTypePairs.length; i += 2) {
+ String name = (String) nameTypePairs[i];
+ SqlTypeName typeName = (SqlTypeName) nameTypePairs[i + 1];
+ builder.add(name, typeName);
+ }
+ RelDataType rowType = builder.build();
+
+ RelNode relNode = mock(RelNode.class);
+ when(relNode.getRowType()).thenReturn(rowType);
+ return relNode;
+ }
+
+ private ResponseListener<QueryResponse> captureListener(AtomicReference<QueryResponse> ref) {
+ return new ResponseListener<QueryResponse>() {
+ @Override
+ public void onResponse(QueryResponse response) {
+ ref.set(response);
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ throw new AssertionError("Unexpected failure", e);
+ }
+ };
+ }
+}
diff --git a/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java b/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java
index 6dbc1901fa7..600a802615a 100644
--- a/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java
+++ b/core/src/test/java/org/opensearch/sql/expression/function/CollectionUDF/ArrayFunctionImplTest.java
@@ -14,6 +14,12 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.sql.ExplicitOperatorBinding;
+import org.apache.calcite.sql.fun.SqlLibraryOperators;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.Test;
@@ -302,4 +308,79 @@ public void testArrayWithCharTypePreservesNulls() {
assertNull(list.get(1), "Null should be preserved during CHAR type conversion");
assertEquals("y", list.get(2));
}
+
+ // ==================== RETURN-TYPE INFERENCE TESTS ====================
+ // These tests cover the return-type fallback the analytics-engine route depends on:
+ // when Calcite can't infer a concrete element type (no operands, or all-null operands),
+ // we substitute VARCHAR so the call's return type is substrait-serializable. Without the
+ // fallback Calcite emits ARRAY<NULL> / ARRAY<UNKNOWN>, which fails substrait conversion
+ // with "Unable to convert the type UNKNOWN" downstream.
+
+ /** array() — empty operand list — returns ARRAY<VARCHAR>. */
+ @Test
+ public void testReturnTypeForEmptyCallIsVarcharArray() {
+ RelDataType returnType = inferReturnType();
+ assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName());
+ RelDataType element = returnType.getComponentType();
+ assertNotNull(element);
+ assertEquals(SqlTypeName.VARCHAR, element.getSqlTypeName());
+ assertTrue(element.isNullable(), "Element type should be nullable per existing semantics");
+ }
+
+ /** array(NULL) — single typeless-null operand — also falls back to ARRAY<VARCHAR>. */
+ @Test
+ public void testReturnTypeForAllNullOperandsIsVarcharArray() {
+ RelDataTypeFactory typeFactory = newTypeFactory();
+ RelDataType nullType = typeFactory.createSqlType(SqlTypeName.NULL);
+ RelDataType returnType = inferReturnType(nullType);
+ assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName());
+ RelDataType element = returnType.getComponentType();
+ assertNotNull(element);
+ assertEquals(SqlTypeName.VARCHAR, element.getSqlTypeName());
+ }
+
+ /** array(1) — INTEGER operand — preserves the inferred element type (no fallback). */
+ @Test
+ public void testReturnTypeForIntegerOperandPreservesType() {
+ RelDataTypeFactory typeFactory = newTypeFactory();
+ RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
+ RelDataType returnType = inferReturnType(intType);
+ assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName());
+ RelDataType element = returnType.getComponentType();
+ assertNotNull(element);
+ assertEquals(
+ SqlTypeName.INTEGER,
+ element.getSqlTypeName(),
+ "Concrete element types must not be affected by the VARCHAR fallback");
+ }
+
+ /** array('a', 'b') — VARCHAR operands — already VARCHAR, fallback path doesn't fire. */
+ @Test
+ public void testReturnTypeForVarcharOperandPreservesType() {
+ RelDataTypeFactory typeFactory = newTypeFactory();
+ RelDataType varcharType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+ RelDataType returnType = inferReturnType(varcharType, varcharType);
+ assertEquals(SqlTypeName.ARRAY, returnType.getSqlTypeName());
+ assertEquals(SqlTypeName.VARCHAR, returnType.getComponentType().getSqlTypeName());
+ }
+
+ /**
+ * Helper — invokes {@code new ArrayFunctionImpl().getReturnTypeInference().inferReturnType(...)}
+ * via Calcite's {@link ExplicitOperatorBinding}, which is the public test harness for exercising
+ * a return-type inference against a specific operand-type list. We bind it to {@link
+ * SqlLibraryOperators#ARRAY} so the inference's internal call to {@code
+ * SqlLibraryOperators.ARRAY.getReturnTypeInference().inferReturnType(...)} resolves the same
+ * operator the lambda delegates to.
+ */
+ private static RelDataType inferReturnType(RelDataType... operandTypes) {
+ RelDataTypeFactory typeFactory = newTypeFactory();
+ ExplicitOperatorBinding binding =
+ new ExplicitOperatorBinding(
+ typeFactory, SqlLibraryOperators.ARRAY, Arrays.asList(operandTypes));
+ return new ArrayFunctionImpl().getReturnTypeInference().inferReturnType(binding);
+ }
+
+ private static RelDataTypeFactory newTypeFactory() {
+ return new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+ }
}
diff --git a/integ-test/build.gradle b/integ-test/build.gradle
index 4bd9a9847ca..06e6478f5fd 100644
--- a/integ-test/build.gradle
+++ b/integ-test/build.gradle
@@ -270,6 +270,36 @@ def getGeoSpatialPlugin() {
}
}
+// Fetch the analytics-engine and arrow-flight-rpc plugin zips from the feature-build artifact for
+// now (linux/x64 only; for local dev pass -PanalyticsEngineZip=/path or -ParrowFlightRpcZip=/path instead).
+ext.pluginVersion = opensearch_version.tokenize('-')[0]
+ext.featureBuildBase = "https://ci.opensearch.org/ci/dbc/feature-build-opensearch/feature-datafusion/latest/linux/x64/tar/builds/opensearch/plugins"
+ext.analyticsEngineZipDest = "${buildDir}/distributions/analytics-engine-${pluginVersion}-SNAPSHOT.zip"
+ext.arrowFlightRpcZipDest = "${buildDir}/distributions/arrow-flight-rpc-${pluginVersion}-SNAPSHOT.zip"
+
+task downloadAnalyticsEngineZip(type: Download) {
+ src "${featureBuildBase}/1-analytics-engine-${pluginVersion}.zip"
+ dest analyticsEngineZipDest
+ overwrite false
+ onlyIfModified true
+ onlyIf { !project.findProperty('analyticsEngineZip') }
+}
+
+task downloadArrowFlightRpcZip(type: Download) {
+ src "${featureBuildBase}/0-arrow-flight-rpc-${pluginVersion}.zip"
+ dest arrowFlightRpcZipDest
+ overwrite false
+ onlyIfModified true
+ onlyIf { !project.findProperty('arrowFlightRpcZip') }
+}
+
+def getAnalyticsEnginePlugin() {
+ provider { (RegularFile) (() -> file(project.findProperty('analyticsEngineZip') ?: analyticsEngineZipDest)) }
+}
+
+def getArrowFlightRpcPlugin() {
+ provider { (RegularFile) (() -> file(project.findProperty('arrowFlightRpcZip') ?: arrowFlightRpcZipDest)) }
+}
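+
+// Local-dev invocation sketch (paths are illustrative, not shipped defaults):
+//   ./gradlew :integ-test:analyticsEngineCompatIT \
+//     -PanalyticsEngineZip=/path/to/analytics-engine.zip \
+//     -ParrowFlightRpcZip=/path/to/arrow-flight-rpc.zip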
+
testClusters {
integTest {
testDistribution = 'archive'
@@ -301,6 +331,14 @@ testClusters {
plugin(getJobSchedulerPlugin())
plugin ":opensearch-sql-plugin"
}
+ // Smoke test: verify the SQL plugin loads cleanly alongside analytics-engine and arrow-flight-rpc.
+ analyticsEngineCompat {
+ testDistribution = 'archive'
+ plugin(getJobSchedulerPlugin())
+ plugin(getArrowFlightRpcPlugin())
+ plugin(getAnalyticsEnginePlugin())
+ plugin ":opensearch-sql-plugin"
+ }
}
def isPrometheusRunning() {
@@ -351,9 +389,19 @@ task stopPrometheus(type: KillProcessTask) {
stopPrometheus.mustRunAfter startPrometheus
+task analyticsEngineCompatIT(type: RestIntegTestTask) {
+ useCluster testClusters.analyticsEngineCompat
+ dependsOn downloadAnalyticsEngineZip, downloadArrowFlightRpcZip
+ systemProperty 'tests.security.manager', 'false'
+ filter {
+ includeTestsMatching 'org.opensearch.sql.plugin.AnalyticsEngineCompatIT'
+ }
+}
+
task integJdbcTest(type: RestIntegTestTask) {
- testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().
+ testClusters.findAll {c -> c.clusterName == "integJdbcTest"}.first().with {
plugin ":opensearch-sql-plugin"
+ }
useJUnitPlatform()
dependsOn ':opensearch-sql-plugin:bundlePlugin'
@@ -546,10 +594,15 @@ integTest {
// Exclude this IT, because they executed in another task (:integTestWithSecurity)
exclude 'org/opensearch/sql/security/**'
- // Workaround for Gradle 9.4.1 bug: TestEventReporterAsListener crashes with ClassCastException
- // when encountering class-level @Ignore annotations. These classes were already skipped by JUnit;
- // this moves the skip to the Gradle layer to avoid the buggy bridge.
- // Remove once Gradle ships a fix (not fixed as of 9.5.0).
+ // Workaround for Gradle 9.4.1 ClassCastException in TestEventReporterAsListener.started
+ // (line 58) — the bridge casts a parent test descriptor's reporter to
+ // GroupTestEventReporterInternal but a class-level @Ignore produces a non-composite parent
+ // descriptor with a leaf reporter, so the cast fails and aborts the entire integTest task
+ // even though the tests would have been skipped anyway. The bridge is registered by Gradle's
+ // own AbstractTestTask (we can't bypass it from user code), so the only available remedy is
+ // to keep these classes off the test runner's input set. Net behaviour for CI: still
+ // skipped, just at the build layer instead of inside JUnit. Remove once Gradle ships a fix
+ // (not fixed as of 9.5.0). OrderIT is already excluded above.
exclude 'org/opensearch/sql/calcite/remote/CalciteInformationSchemaCommandIT.class'
exclude 'org/opensearch/sql/calcite/remote/CalciteJsonFunctionsIT.class'
exclude 'org/opensearch/sql/calcite/remote/CalcitePrometheusDataSourceCommandsIT.class'
@@ -559,8 +612,8 @@ integTest {
exclude 'org/opensearch/sql/legacy/DateFunctionsIT.class'
exclude 'org/opensearch/sql/legacy/HashJoinIT.class'
exclude 'org/opensearch/sql/legacy/HavingIT.class'
- exclude 'org/opensearch/sql/legacy/JoinIT.class'
exclude 'org/opensearch/sql/legacy/JSONRequestIT.class'
+ exclude 'org/opensearch/sql/legacy/JoinIT.class'
exclude 'org/opensearch/sql/legacy/MathFunctionsIT.class'
exclude 'org/opensearch/sql/legacy/MetricsIT.class'
exclude 'org/opensearch/sql/legacy/MultiQueryIT.class'
@@ -568,9 +621,9 @@ integTest {
exclude 'org/opensearch/sql/legacy/PreparedStatementIT.class'
exclude 'org/opensearch/sql/legacy/QueryFunctionsIT.class'
exclude 'org/opensearch/sql/legacy/QueryIT.class'
+ exclude 'org/opensearch/sql/legacy/SQLFunctionsIT.class'
exclude 'org/opensearch/sql/legacy/ShowIT.class'
exclude 'org/opensearch/sql/legacy/SourceFieldIT.class'
- exclude 'org/opensearch/sql/legacy/SQLFunctionsIT.class'
exclude 'org/opensearch/sql/legacy/SubqueryIT.class'
exclude 'org/opensearch/sql/ppl/JsonFunctionsIT.class'
exclude 'org/opensearch/sql/sql/ExpressionIT.class'
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java
index 219020b1650..87bd412907d 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteEvalCommandIT.java
@@ -17,6 +17,7 @@
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
+import org.opensearch.sql.legacy.TestUtils;
import org.opensearch.sql.ppl.PPLIntegTestCase;
public class CalciteEvalCommandIT extends PPLIntegTestCase {
@@ -29,38 +30,63 @@ public void init() throws Exception {
loadIndex(Index.BANK);
loadIndex(Index.TELEMETRY);
- // Create test data for string concatenation
- Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true");
- request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}");
- client().performRequest(request1);
-
- Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true");
- request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}");
- client().performRequest(request2);
-
- Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true");
- request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}");
- client().performRequest(request3);
+ // Pre-create test_eval through the helper so the analytics-engine compatibility run
+ // (tests.analytics.parquet_indices=true) provisions it as a parquet-backed composite
+ // index. Plain auto-mapping via the doc PUTs would create a Lucene-backed index, which
+ // the analytics-engine planner cannot scan ("No backend can scan all requested fields").
+ // Explicit mapping pins types so both v2 (verifySchema "string"/"bigint") and analytics
+ // paths see the same shape regardless of dynamic-mapping behavior on the parquet engine.
+ // Guarded by isIndexExist for idempotency — init() runs before each @Test method.
+ if (!TestUtils.isIndexExist(client(), "test_eval")) {
+ String testEvalMapping =
+ "{\"mappings\":{\"properties\":{"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"age\":{\"type\":\"long\"},"
+ + "\"title\":{\"type\":\"keyword\"}}}}";
+ TestUtils.createIndexByRestClient(client(), "test_eval", testEvalMapping);
+
+ // Create test data for string concatenation
+ Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true");
+ request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}");
+ client().performRequest(request1);
+
+ Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true");
+ request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}");
+ client().performRequest(request2);
+
+ Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true");
+ request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}");
+ client().performRequest(request3);
+ }
// Index with a struct field `agent` to reproduce the reviewer's case from PR #5351:
// source= | fields agent | eval agent.name = "test"
// Rely on dynamic mapping — OpenSearch infers `agent` as an object with string children
// from the document contents. Using dynamic mapping keeps the init idempotent across
// repeated `@Before` invocations in the preserved cluster.
- Request agentDoc1 = new Request("PUT", "/test_eval_agent/_doc/1?refresh=true");
- agentDoc1.setJsonEntity(
- "{\"agent\": {\"name\": \"winlogbeat\", \"version\": \"7.0\"}, \"message\": \"hello\"}");
- client().performRequest(agentDoc1);
-
- Request agentDoc2 = new Request("PUT", "/test_eval_agent/_doc/2?refresh=true");
- agentDoc2.setJsonEntity(
- "{\"agent\": {\"name\": \"filebeat\", \"version\": \"8.1\"}, \"message\": \"world\"}");
- client().performRequest(agentDoc2);
+ if (!TestUtils.isIndexExist(client(), "test_eval_agent")) {
+ Request agentDoc1 = new Request("PUT", "/test_eval_agent/_doc/1?refresh=true");
+ agentDoc1.setJsonEntity(
+ "{\"agent\": {\"name\": \"winlogbeat\", \"version\": \"7.0\"}, \"message\": \"hello\"}");
+ client().performRequest(agentDoc1);
+
+ Request agentDoc2 = new Request("PUT", "/test_eval_agent/_doc/2?refresh=true");
+ agentDoc2.setJsonEntity(
+ "{\"agent\": {\"name\": \"filebeat\", \"version\": \"8.1\"}, \"message\": \"world\"}");
+ client().performRequest(agentDoc2);
+ }
}
@Test
public void testEvalStringConcatenation() throws IOException {
- JSONObject result = executeQuery("source=test_eval | eval greeting = 'Hello ' + name");
+ // Pin the projection so column order is deterministic across execution paths — the
+ // analytics-engine route reads parquet schema in storage order, which can differ from the
+ // v2 / Lucene path's _source-iteration order. Adding an explicit | fields makes the test
+ // a strict assertion on the eval expression rather than a coincidence of projection order.
+ JSONObject result =
+ executeQuery(
+ "source=test_eval | eval greeting = 'Hello ' + name | fields name, title, age,"
+ + " greeting");
verifySchema(
result,
schema("name", "string"),
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java
index 86f87c90c81..24c7e504224 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteFieldFormatCommandIT.java
@@ -12,6 +12,7 @@
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
+import org.opensearch.sql.legacy.TestUtils;
import org.opensearch.sql.ppl.PPLIntegTestCase;
public class CalciteFieldFormatCommandIT extends PPLIntegTestCase {
@@ -23,26 +24,48 @@ public void init() throws Exception {
loadIndex(Index.BANK);
- // Create test data for string concatenation
- Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true");
- request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}");
- client().performRequest(request1);
+ // Pre-create test_eval through the helper so the analytics-engine compatibility run
+ // (tests.analytics.parquet_indices=true) provisions it as a parquet-backed composite
+ // index. Plain auto-mapping via the doc PUTs would create a Lucene-backed index, which
+ // the analytics-engine planner cannot scan ("No backend can scan all requested fields").
+ // Explicit mapping pins types so both v2 (verifySchema "string"/"bigint") and analytics
+ // paths see the same shape regardless of dynamic-mapping behavior on the parquet engine.
+ // Guarded by isIndexExist for idempotency — init() runs before each @Test method.
+ if (!TestUtils.isIndexExist(client(), "test_eval")) {
+ String testEvalMapping =
+ "{\"mappings\":{\"properties\":{"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"age\":{\"type\":\"long\"},"
+ + "\"title\":{\"type\":\"keyword\"}}}}";
+ TestUtils.createIndexByRestClient(client(), "test_eval", testEvalMapping);
- Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true");
- request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}");
- client().performRequest(request2);
+ // Create test data for string concatenation
+ Request request1 = new Request("PUT", "/test_eval/_doc/1?refresh=true");
+ request1.setJsonEntity("{\"name\": \"Alice\", \"age\": 25, \"title\": \"Engineer\"}");
+ client().performRequest(request1);
- Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true");
- request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}");
- client().performRequest(request3);
+ Request request2 = new Request("PUT", "/test_eval/_doc/2?refresh=true");
+ request2.setJsonEntity("{\"name\": \"Bob\", \"age\": 30, \"title\": \"Manager\"}");
+ client().performRequest(request2);
+
+ Request request3 = new Request("PUT", "/test_eval/_doc/3?refresh=true");
+ request3.setJsonEntity("{\"name\": \"Charlie\", \"age\": null, \"title\": \"Analyst\"}");
+ client().performRequest(request3);
+ }
}
@Test
public void testFieldFormatStringConcatenation() throws IOException {
+ // Pin the projection so column order is deterministic across execution paths — the
+ // analytics-engine route reads parquet schema in storage order, which can differ from the
+ // v2 / Lucene path's _source-iteration order. Adding an explicit | fields makes the test
+ // a strict assertion on the fieldformat expression rather than a coincidence of projection
+ // order.
JSONObject result =
executeQuery(
StringEscapeUtils.escapeJson(
- "source=test_eval | fieldformat greeting = 'Hello ' + name"));
+ "source=test_eval | fieldformat greeting = 'Hello ' + name | fields name, title,"
+ + " age, greeting"));
verifySchema(
result,
schema("name", "string"),
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java
index 24401444457..3503d7c533c 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLRenameIT.java
@@ -14,6 +14,10 @@
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;
import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.hamcrest.Matcher;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.sql.ppl.PPLIntegTestCase;
@@ -40,7 +44,7 @@ public void testRename() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "name", "country", "state", "month", "year", "renamed_age");
}
@Test
@@ -77,7 +81,7 @@ public void testRenameToMetaField() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "name", "country", "state", "month", "year", "_ID");
}
@Test
@@ -156,7 +160,7 @@ public void testRenameWildcardFields() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "nAME", "country", "state", "month", "year", "age");
}
@Test
@@ -171,7 +175,7 @@ public void testRenameMultipleWildcardFields() throws IOException {
schema("couNTry", "string"),
schema("year", "int"),
schema("moNTh", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "name", "couNTry", "state", "moNTh", "year", "age");
}
@Test
@@ -186,7 +190,7 @@ public void testRenameWildcardPrefix() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "new_na", "country", "state", "month", "year", "age");
}
@Test
@@ -247,7 +251,7 @@ public void testRenameMultipleWildcards() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("MoNtH", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "name", "country", "state", "MoNtH", "year", "age");
}
@Test
@@ -296,12 +300,14 @@ public void testRenamingToExistingField() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyDataRows(
+ // After `rename name as age`, the original name column overwrites the original age column;
+ // the numeric age values are gone and only the string name values remain under "age".
+ verifyDataRowsByColumn(
result,
- rows("Jake", "USA", "California", 4, 2023),
- rows("Hello", "USA", "New York", 4, 2023),
- rows("John", "Canada", "Ontario", 4, 2023),
- rows("Jane", "Canada", "Quebec", 4, 2023));
+ rowOf("age", "Jake", "country", "USA", "state", "California", "month", 4, "year", 2023),
+ rowOf("age", "Hello", "country", "USA", "state", "New York", "month", 4, "year", 2023),
+ rowOf("age", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023),
+ rowOf("age", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023));
}
@Test
@@ -331,12 +337,12 @@ public void testRenamingNonExistentFieldToExistingField() throws IOException {
schema("country", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyDataRows(
+ verifyDataRowsByColumn(
result,
- rows("Jake", "USA", "California", 4, 2023),
- rows("Hello", "USA", "New York", 4, 2023),
- rows("John", "Canada", "Ontario", 4, 2023),
- rows("Jane", "Canada", "Quebec", 4, 2023));
+ rowOf("name", "Jake", "country", "USA", "state", "California", "month", 4, "year", 2023),
+ rowOf("name", "Hello", "country", "USA", "state", "New York", "month", 4, "year", 2023),
+ rowOf("name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023),
+ rowOf("name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023));
}
@Test
@@ -380,7 +386,7 @@ public void testMultipleRenameWithoutComma() throws IOException {
schema("location", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "user_name", "location", "state", "month", "year", "user_age");
}
@Test
@@ -398,15 +404,103 @@ public void testRenameMixedCommaAndSpace() throws IOException {
schema("location", "string"),
schema("year", "int"),
schema("month", "int"));
- verifyStandardDataRows(result);
+ verifyStandardDataRows(result, "user_name", "location", "state", "month", "year", "user_age");
+ }
+
+ /**
+ * Build a {@code column -> value} map from interleaved varargs ({@code key1, val1, key2, val2,
+ * ...}). Preserves insertion order so the expected-row mapping reads naturally at the call site.
+ */
+ private static Map<String, Object> rowOf(Object... pairs) {
+ if (pairs.length % 2 != 0) {
+ throw new IllegalArgumentException("rowOf expects an even number of args (key, value, ...)");
+ }
+ Map<String, Object> row = new LinkedHashMap<>();
+ for (int i = 0; i < pairs.length; i += 2) {
+ row.put((String) pairs[i], pairs[i + 1]);
+ }
+ return row;
}
private void verifyStandardDataRows(JSONObject result) {
- verifyDataRows(
- result,
- rows("Jake", "USA", "California", 4, 2023, 70),
- rows("Hello", "USA", "New York", 4, 2023, 30),
- rows("John", "Canada", "Ontario", 4, 2023, 25),
- rows("Jane", "Canada", "Quebec", 4, 2023, 20));
+ verifyStandardDataRows(result, "name", "country", "state", "month", "year", "age");
+ }
+
+ /**
+ * Verify the four canonical state_country rows independently of column order.
+ *
+ * <p>The schema check above ({@code verifySchema}) is set equality on column names; the data-row
+ * check {@code verifyDataRows} is positional. The two paths the analytics-engine route can take
+ * return columns in different orders (parquet preserves storage order, the v2 / Lucene path
+ * preserves {@code _source} iteration order), and either is valid given the contract {@code
+ * verifySchema} declares. To avoid baking either order into the test, this helper takes the
+ * canonical-position column names as varargs and reorders the canonical row values to match
+ * whatever column order the response actually returned.
+ *
+ * @param result the response JSON
+ * @param canonicalColumns the column names of the four canonical rows in {@code (name-or-renamed,
+ * country-or-renamed, state, month, year, age-or-renamed)} order. Pass the rename target
+ * where applicable.
+ */
+ private void verifyStandardDataRows(JSONObject result, String... canonicalColumns) {
+ if (canonicalColumns.length != 6) {
+ throw new IllegalArgumentException(
+ "verifyStandardDataRows expects 6 canonical column names; got "
+ + canonicalColumns.length);
+ }
+ Object[][] canonicalValues =
+ new Object[][] {
+ {"Jake", "USA", "California", 4, 2023, 70},
+ {"Hello", "USA", "New York", 4, 2023, 30},
+ {"John", "Canada", "Ontario", 4, 2023, 25},
+ {"Jane", "Canada", "Quebec", 4, 2023, 20}
+ };
+ Map<String, Object>[] expectedRows = new LinkedHashMap[canonicalValues.length];
+ for (int i = 0; i < canonicalValues.length; i++) {
+ Map<String, Object> row = new LinkedHashMap<>();
+ for (int c = 0; c < canonicalColumns.length; c++) {
+ row.put(canonicalColumns[c], canonicalValues[i][c]);
+ }
+ expectedRows[i] = row;
+ }
+ verifyDataRowsByColumn(result, expectedRows);
+ }
+
+ /**
+ * Match expected rows against the response by column name, ignoring the response's column
+ * emission order. For each expected row (a {@code column-name -> value} map), the value at each
+ * schema position is looked up by name. Tests using this helper become engine-order agnostic: a
+ * parquet-backed response and a Lucene-backed response yield the same assertion outcome as long
+ * as the column-name-to-value mapping agrees.
+ */
+ @SafeVarargs
+ @SuppressWarnings("varargs")
+ private final void verifyDataRowsByColumn(
+ JSONObject result, Map<String, Object>... expectedRows) {
+ JSONArray schema = result.getJSONArray("schema");
+ int n = schema.length();
+ String[] columnOrder = new String[n];
+ for (int i = 0; i < n; i++) {
+ columnOrder[i] = schema.getJSONObject(i).getString("name");
+ }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Matcher[] rowMatchers = new Matcher[expectedRows.length];
+ for (int r = 0; r < expectedRows.length; r++) {
+ Object[] reordered = new Object[n];
+ for (int c = 0; c < n; c++) {
+ if (!expectedRows[r].containsKey(columnOrder[c])) {
+ throw new IllegalArgumentException(
+ "Expected row at index "
+ + r
+ + " is missing canonical value for response column ["
+ + columnOrder[c]
+ + "]; provided keys: "
+ + expectedRows[r].keySet());
+ }
+ reordered[c] = expectedRows[r].get(columnOrder[c]);
+ }
+ rowMatchers[r] = rows(reordered);
+ }
+ verifyDataRows(result, rowMatchers);
}
}
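A standalone illustration of the order-agnostic matching the two helpers above implement, runnable with only org.json on the classpath: the expected row is keyed by column name, and its values are reordered into whatever schema order the response declares.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;

public class ColumnOrderDemo {
  public static void main(String[] args) {
    // A response whose schema emits columns in storage order rather than source order.
    JSONObject result =
        new JSONObject(
            "{\"schema\":[{\"name\":\"age\"},{\"name\":\"name\"}],"
                + "\"datarows\":[[70,\"Jake\"]]}");

    // Expected row keyed by column name; insertion order is irrelevant to the match.
    Map<String, Object> expected = new LinkedHashMap<>();
    expected.put("name", "Jake");
    expected.put("age", 70);

    // Reorder expected values into the order the response actually declared.
    JSONArray schema = result.getJSONArray("schema");
    Object[] reordered = new Object[schema.length()];
    for (int c = 0; c < schema.length(); c++) {
      reordered[c] = expected.get(schema.getJSONObject(c).getString("name"));
    }
    System.out.println(Arrays.toString(reordered)); // [70, Jake]
  }
}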
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java
index 44cc4a3aaf0..5943a3c5d30 100644
--- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteReplaceCommandIT.java
@@ -9,6 +9,10 @@
import static org.opensearch.sql.util.MatcherUtils.*;
import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.hamcrest.Matcher;
+import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
@@ -61,12 +65,41 @@ public void testMultipleReplace() throws IOException {
schema("year", "int"),
schema("age", "int"));
- verifyDataRows(
+ // Match by column name — analytics-engine and v2 paths return columns in different orders.
+ verifyDataRowsByColumn(
result,
- rows("Jake", "United States", "California", 4, 2023, 70),
- rows("Hello", "United States", "New York", 4, 2023, 30),
- rows("John", "Canada", "Ontario", 4, 2023, 25),
- rows("Joseph", "Canada", "Quebec", 4, 2023, 20));
+ rowOf(
+ "name",
+ "Jake",
+ "country",
+ "United States",
+ "state",
+ "California",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 70),
+ rowOf(
+ "name",
+ "Hello",
+ "country",
+ "United States",
+ "state",
+ "New York",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 30),
+ rowOf(
+ "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023,
+ "age", 25),
+ rowOf(
+ "name", "Joseph", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023,
+ "age", 20));
}
@Test
@@ -121,12 +154,40 @@ public void testEmptyStringReplacement() throws IOException {
schema("year", "int"),
schema("age", "int"));
- verifyDataRows(
+ verifyDataRowsByColumn(
result,
- rows("Jake", "", "California", 4, 2023, 70),
- rows("Hello", "", "New York", 4, 2023, 30),
- rows("John", "Canada", "Ontario", 4, 2023, 25),
- rows("Jane", "Canada", "Quebec", 4, 2023, 20));
+ rowOf(
+ "name",
+ "Jake",
+ "country",
+ "",
+ "state",
+ "California",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 70),
+ rowOf(
+ "name",
+ "Hello",
+ "country",
+ "",
+ "state",
+ "New York",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 30),
+ rowOf(
+ "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023,
+ "age", 25),
+ rowOf(
+ "name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023, "age",
+ 20));
}
@Test
@@ -146,12 +207,40 @@ public void testMultipleFieldsInClause() throws IOException {
schema("year", "int"),
schema("age", "int"));
- verifyDataRows(
+ verifyDataRowsByColumn(
result,
- rows("Jake", "United States", "California", 4, 2023, 70),
- rows("Hello", "United States", "New York", 4, 2023, 30),
- rows("John", "Canada", "Ontario", 4, 2023, 25),
- rows("Jane", "Canada", "Quebec", 4, 2023, 20));
+ rowOf(
+ "name",
+ "Jake",
+ "country",
+ "United States",
+ "state",
+ "California",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 70),
+ rowOf(
+ "name",
+ "Hello",
+ "country",
+ "United States",
+ "state",
+ "New York",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 30),
+ rowOf(
+ "name", "John", "country", "Canada", "state", "Ontario", "month", 4, "year", 2023,
+ "age", 25),
+ rowOf(
+ "name", "Jane", "country", "Canada", "state", "Quebec", "month", 4, "year", 2023, "age",
+ 20));
}
@Test
@@ -164,10 +253,16 @@ public void testReplaceNonExistentField() {
String.format(
"source = %s | replace 'USA' WITH 'United States' IN non_existent_field",
TEST_INDEX_STATE_COUNTRY)));
- verifyErrorMessageContains(
- e,
- "field [non_existent_field] not found; input fields are: [name, country, state, month,"
- + " year, age, _id, _index, _score, _maxscore, _sort, _routing]");
+ // Order-agnostic — analytics-engine and v2 paths emit the input-field list in different
+ // orders (parquet preserves storage order, Lucene preserves _source iteration order).
+ // Assert that the prefix and every expected field name appear somewhere in the message.
+ verifyErrorMessageContains(e, "field [non_existent_field] not found; input fields are:");
+ verifyErrorMessageContains(e, "name");
+ verifyErrorMessageContains(e, "country");
+ verifyErrorMessageContains(e, "state");
+ verifyErrorMessageContains(e, "month");
+ verifyErrorMessageContains(e, "year");
+ verifyErrorMessageContains(e, "age");
}
@Test
@@ -259,12 +354,40 @@ public void testMultiplePairsInSingleCommand() throws IOException {
schema("year", "int"),
schema("age", "int"));
- verifyDataRows(
+ verifyDataRowsByColumn(
result,
- rows("Jake", "United States", "California", 4, 2023, 70),
- rows("Hello", "United States", "New York", 4, 2023, 30),
- rows("John", "CA", "Ontario", 4, 2023, 25),
- rows("Jane", "CA", "Quebec", 4, 2023, 20));
+ rowOf(
+ "name",
+ "Jake",
+ "country",
+ "United States",
+ "state",
+ "California",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 70),
+ rowOf(
+ "name",
+ "Hello",
+ "country",
+ "United States",
+ "state",
+ "New York",
+ "month",
+ 4,
+ "year",
+ 2023,
+ "age",
+ 30),
+ rowOf(
+ "name", "John", "country", "CA", "state", "Ontario", "month", 4, "year", 2023, "age",
+ 25),
+ rowOf(
+ "name", "Jane", "country", "CA", "state", "Quebec", "month", 4, "year", 2023, "age",
+ 20));
}
@Test
@@ -402,4 +525,61 @@ public void testEscapeSequence_noMatchLiteral() throws IOException {
// Pattern "foo\*bar" matches literal "foo*bar", not "fooXbar", so original value returned
verifyDataRows(result, rows("fooXbar"));
}
+
+ /**
+ * Build a {@code column -> value} map from interleaved varargs ({@code key1, val1, key2, val2,
+ * ...}). Preserves insertion order so the expected-row mapping reads naturally at the call site.
+ */
+ private static Map<String, Object> rowOf(Object... pairs) {
+ if (pairs.length % 2 != 0) {
+ throw new IllegalArgumentException("rowOf expects an even number of args (key, value, ...)");
+ }
+ Map<String, Object> row = new LinkedHashMap<>();
+ for (int i = 0; i < pairs.length; i += 2) {
+ row.put((String) pairs[i], pairs[i + 1]);
+ }
+ return row;
+ }
+
+ /**
+ * Match expected rows against the response by column name, ignoring the response's column
+ * emission order. The two paths the analytics-engine route can take return columns in different
+ * orders (parquet preserves storage order, the v2 / Lucene path preserves {@code _source}
+ * iteration order), and either is valid given the contract {@code verifySchema} declares (set
+ * equality on column names). To avoid baking either order into the test, this helper reorders
+ * each expected row to match whatever column order the response actually returned.
+ *
+ * <p>Mirrors the helper in {@code CalcitePPLRenameIT} (commit 59c728b) — same pattern applied to
+ * PPL {@code replace} command tests.
+ */
+ @SafeVarargs
+ @SuppressWarnings("varargs")
+ private final void verifyDataRowsByColumn(
+ JSONObject result, Map<String, Object>... expectedRows) {
+ JSONArray schema = result.getJSONArray("schema");
+ int n = schema.length();
+ String[] columnOrder = new String[n];
+ for (int i = 0; i < n; i++) {
+ columnOrder[i] = schema.getJSONObject(i).getString("name");
+ }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ Matcher[] rowMatchers = new Matcher[expectedRows.length];
+ for (int r = 0; r < expectedRows.length; r++) {
+ Object[] reordered = new Object[n];
+ for (int c = 0; c < n; c++) {
+ if (!expectedRows[r].containsKey(columnOrder[c])) {
+ throw new IllegalArgumentException(
+ "Expected row at index "
+ + r
+ + " is missing canonical value for response column ["
+ + columnOrder[c]
+ + "]; provided keys: "
+ + expectedRows[r].keySet());
+ }
+ reordered[c] = expectedRows[r].get(columnOrder[c]);
+ }
+ rowMatchers[r] = rows(reordered);
+ }
+ verifyDataRows(result, rowMatchers);
+ }
}
diff --git a/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java b/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java
new file mode 100644
index 00000000000..5cd89fa7cd9
--- /dev/null
+++ b/integ-test/src/test/java/org/opensearch/sql/plugin/AnalyticsEngineCompatIT.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.plugin;
+
+import org.junit.Test;
+import org.opensearch.test.rest.OpenSearchRestTestCase;
+
+/**
+ * Smoke test: verifies that opensearch-sql loads cleanly alongside arrow-flight-rpc and
+ * analytics-engine. A successful cluster start is the only assertion — no SQL-specific logic runs.
+ */
+public class AnalyticsEngineCompatIT extends OpenSearchRestTestCase {
+
+ @Test
+ public void testClusterStarted() {
+ // If the cluster booted, all three plugins loaded without classloader errors.
+ }
+}
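If the smoke test should ever fail with a clearer signal than a cluster-start timeout, one option is to assert the plugin list explicitly. The sketch below is an assumption, not part of the diff: the plugin-name strings must match whatever _cat/plugins reports in this setup, and the entity handling assumes the HttpEntity returned by the low-level client.

import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.test.rest.OpenSearchRestTestCase;

public class AnalyticsEngineCompatExplicitIT extends OpenSearchRestTestCase {

  @Test
  public void testPluginsInstalled() throws Exception {
    Response response = client().performRequest(new Request("GET", "/_cat/plugins"));
    String body = new String(response.getEntity().getContent().readAllBytes());
    // Plugin names here are assumptions — adjust to what _cat/plugins actually prints.
    assertTrue(body.contains("opensearch-sql"));
    assertTrue(body.contains("analytics-engine"));
    assertTrue(body.contains("arrow-flight-rpc"));
  }
}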
diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
index 676dee1751a..2950f6c9b85 100644
--- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
+++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlAction.java
@@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
@@ -82,10 +83,21 @@ public class RestSqlAction extends BaseRestHandler {
/** New SQL query request handler. */
private final RestSQLQueryAction newSqlQueryHandler;
- public RestSqlAction(Settings settings, Injector injector) {
+ /**
+ * Analytics router, consulted before the normal SQL engine. Accepts the request and channel, and
+ * returns {@code true} if it handled the request (analytics index) or {@code false} to fall
+ * through to the normal SQL engine.
+ */
+ private final BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter;
+
+ public RestSqlAction(
+ Settings settings,
+ Injector injector,
+ BiFunction<SQLQueryRequest, RestChannel, Boolean> analyticsRouter) {
super();
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
this.newSqlQueryHandler = new RestSQLQueryAction(injector);
+ this.analyticsRouter = analyticsRouter;
}
@Override
@@ -133,7 +145,6 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
Format format = SqlRequestParam.getFormat(request.params());
- // Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest =
new SQLQueryRequest(
sqlRequest.getJsonContent(),
@@ -141,31 +152,58 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
request.path(),
request.params(),
sqlRequest.cursor());
- return newSqlQueryHandler.prepareRequest(
- newSqlRequest,
- (restChannel, exception) -> {
- try {
- if (newSqlRequest.isExplainRequest()) {
- LOG.info(
- "Request is falling back to old SQL engine due to: " + exception.getMessage());
- }
- LOG.info(
- "[{}] Request {} is not supported and falling back to old SQL engine",
- QueryContext.getRequestId(),
- newSqlRequest);
- LOG.info("Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
- QueryAction queryAction = explainRequest(client, sqlRequest, format);
- executeSqlRequest(request, queryAction, client, restChannel);
- } catch (Exception e) {
- handleException(restChannel, e);
- }
- },
- this::handleException);
+
+ // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
+ // The router returns true and sends the response directly if it handled the request.
+ final SQLQueryRequest finalRequest = newSqlRequest;
+ return channel -> {
+ if (!analyticsRouter.apply(finalRequest, channel)) {
+ delegateToV2Engine(request, client, sqlRequest, finalRequest, format, channel);
+ }
+ };
} catch (Exception e) {
return channel -> handleException(channel, e);
}
}
+ /** Delegate a SQL query to the V2 engine with legacy fallback. */
+ private void delegateToV2Engine(
+ RestRequest request,
+ NodeClient client,
+ SqlRequest sqlRequest,
+ SQLQueryRequest sqlQueryRequest,
+ Format format,
+ RestChannel channel) {
+ try {
+ newSqlQueryHandler
+ .prepareRequest(
+ sqlQueryRequest,
+ (restChannel, exception) -> {
+ try {
+ if (sqlQueryRequest.isExplainRequest()) {
+ LOG.info(
+ "Request is falling back to old SQL engine due to: "
+ + exception.getMessage());
+ }
+ LOG.info(
+ "[{}] Request {} is not supported and falling back to old SQL engine",
+ QueryContext.getRequestId(),
+ sqlQueryRequest);
+ LOG.info(
+ "Request Query: {}", QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));
+ QueryAction queryAction = explainRequest(client, sqlRequest, format);
+ executeSqlRequest(request, queryAction, client, restChannel);
+ } catch (Exception e) {
+ handleException(restChannel, e);
+ }
+ },
+ this::handleException)
+ .accept(channel);
+ } catch (Exception e) {
+ handleException(channel, e);
+ }
+ }
+
private void handleException(RestChannel restChannel, Exception exception) {
RestStatus status = getRestStatus(exception);
logAndPublishMetrics(status, exception);
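A minimal sketch of the contract the new analyticsRouter parameter imposes (names illustrative, not part of the diff): a router that never claims a request, which is the behavior callers would want when the analytics engine is absent, e.g. in unit tests of RestSqlAction.

import java.util.function.BiFunction;
import org.opensearch.rest.RestChannel;
import org.opensearch.sql.sql.domain.SQLQueryRequest;

final class NoOpAnalyticsRouter {
  static BiFunction<SQLQueryRequest, RestChannel, Boolean> create() {
    // Returning false means "not handled" — RestSqlAction then calls delegateToV2Engine.
    return (request, channel) -> false;
  }
}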
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java
index 224d7019ec2..6e2240909b0 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/CalciteScriptEngine.java
@@ -77,6 +77,7 @@
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.StringSortScript;
import org.opensearch.search.lookup.SourceLookup;
+import org.opensearch.sql.calcite.utils.CalciteClassLoaderHelper;
import org.opensearch.sql.data.model.ExprTimestampValue;
import org.opensearch.sql.opensearch.storage.script.aggregation.CalciteAggregationScriptFactory;
import org.opensearch.sql.opensearch.storage.script.field.CalciteFieldScriptFactory;
@@ -138,7 +139,9 @@ public T compile(
new RelRecordType(List.of()));
Function1<DataContext, Object[]> function =
- new RexExecutable(code, "generated Rex code").getFunction();
+ CalciteClassLoaderHelper.withCalciteClassLoader(
+ () -> new RexExecutable(code, "generated Rex code").getFunction(),
+ CalciteScriptEngine.class);
if (CONTEXTS.containsKey(context)) {
return context.factoryClazz.cast(CONTEXTS.get(context).apply(function, rexNode.getType()));
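CalciteClassLoaderHelper.withCalciteClassLoader is referenced here but its body is not part of this diff. A plausible shape, sketched purely as an assumption: swap the thread-context classloader to one that can see Calcite's generated-code dependencies for the duration of the supplier, restoring the previous loader afterwards.

import java.util.function.Supplier;

// Assumption-level sketch of a TCCL-swap helper; the real helper may differ.
final class ClassLoaderScope {
  static <T> T withClassLoaderOf(Supplier<T> task, Class<?> anchor) {
    Thread current = Thread.currentThread();
    ClassLoader previous = current.getContextClassLoader();
    current.setContextClassLoader(anchor.getClassLoader());
    try {
      return task.get();
    } finally {
      current.setContextClassLoader(previous); // always restore, even on failure
    }
  }
}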
diff --git a/plugin/build.gradle b/plugin/build.gradle
index 340787fa01f..708c4b18b35 100644
--- a/plugin/build.gradle
+++ b/plugin/build.gradle
@@ -1,5 +1,4 @@
import java.util.concurrent.Callable
-import org.opensearch.gradle.dependencies.CompileOnlyResolvePlugin
/*
* Copyright OpenSearch Contributors
@@ -55,7 +54,7 @@ opensearchplugin {
name 'opensearch-sql'
description 'OpenSearch SQL'
classname 'org.opensearch.sql.plugin.SQLPlugin'
- extendedPlugins = ['opensearch-job-scheduler']
+ extendedPlugins = ['opensearch-job-scheduler', 'analytics-engine;optional=true']
licenseFile rootProject.file("LICENSE.txt")
noticeFile rootProject.file("NOTICE")
}
@@ -160,6 +159,8 @@ dependencies {
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson_annotations}"
api project(":ppl")
+ api project(':api')
+ implementation("org.opensearch.sandbox:analytics-api:${opensearch_version}")
api project(':legacy')
api project(':opensearch')
api project(':prometheus')
@@ -320,4 +321,3 @@ testClusters.integTest {
run {
useCluster testClusters.integTest
}
-
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
index d92788ac43b..3692f49688d 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@@ -36,8 +37,10 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
+import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
@@ -51,11 +54,15 @@
import org.opensearch.plugins.ScriptPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.repositories.RepositoriesService;
+import org.opensearch.rest.BytesRestResponse;
+import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptContext;
import org.opensearch.script.ScriptEngine;
import org.opensearch.script.ScriptService;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelper;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
@@ -87,6 +94,8 @@
import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
+import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.plugin.RestSqlAction;
@@ -97,14 +106,18 @@
import org.opensearch.sql.opensearch.storage.script.CompoundedScriptEngine;
import org.opensearch.sql.plugin.config.EngineExtensionsHolder;
import org.opensearch.sql.plugin.config.OpenSearchPluginModule;
+import org.opensearch.sql.plugin.rest.AnalyticsExecutorHolder;
import org.opensearch.sql.plugin.rest.RestPPLGrammarAction;
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
+import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction;
import org.opensearch.sql.plugin.transport.PPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
+import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
+import org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
@@ -120,6 +133,7 @@
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionResponse;
import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
+import org.opensearch.sql.sql.domain.SQLQueryRequest;
import org.opensearch.sql.storage.DataSourceFactory;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -185,7 +199,7 @@ public List<RestHandler> getRestHandlers(
return Arrays.asList(
new RestPPLQueryAction(),
new RestPPLGrammarAction(),
- new RestSqlAction(settings, injector),
+ new RestSqlAction(settings, injector, createSqlAnalyticsRouter()),
new RestSqlStatsAction(settings, restController),
new RestPPLStatsAction(settings, restController),
new RestQuerySettingsAction(settings, restController),
@@ -195,6 +209,88 @@ public List getRestHandlers(
new RestDirectQueryResourcesManagementAction((OpenSearchSettings) pluginSettings));
}
+ /**
+ * Creates a routing function for SQL queries targeting analytics-engine indices. Returns {@code
+ * true} if the query was handled (analytics index), or {@code false} to fall through to the
+ * normal SQL path.
+ *
+ * <p>The {@link RestUnifiedQueryAction} is built lazily on the first request because the
+ * analytics-engine {@code QueryPlanExecutor} is published into {@link AnalyticsExecutorHolder} by
+ * {@code TransportPPLQueryAction}'s {@code @Inject} constructor — which fires after the Node
+ * Guice injector is built, i.e. after {@code getRestHandlers}. If the executor is still
+ * unavailable when a SQL request arrives, the router falls through to the legacy SQL path.
+ */
+ private BiFunction<SQLQueryRequest, RestChannel, Boolean> createSqlAnalyticsRouter() {
+ final RestUnifiedQueryAction[] cached = new RestUnifiedQueryAction[1];
+ java.util.function.Supplier<RestUnifiedQueryAction> handlerSupplier =
+ () -> {
+ if (cached[0] == null) {
+ var executor = AnalyticsExecutorHolder.get();
+ if (executor == null) {
+ return null;
+ }
+ cached[0] =
+ new RestUnifiedQueryAction(client, clusterService, executor, pluginSettings);
+ }
+ return cached[0];
+ };
+ return (sqlRequest, channel) -> {
+ RestUnifiedQueryAction unifiedQueryHandler = handlerSupplier.get();
+ if (unifiedQueryHandler == null
+ || !unifiedQueryHandler.isAnalyticsIndex(sqlRequest.getQuery(), QueryType.SQL)) {
+ return false;
+ }
+ if (sqlRequest.isExplainRequest()) {
+ unifiedQueryHandler.explain(
+ sqlRequest.getQuery(),
+ QueryType.SQL,
+ ExplainMode.STANDARD,
+ new ResponseListener<>() {
+ @Override
+ public void onResponse(ExplainResponse response) {
+ JsonResponseFormatter<ExplainResponse> formatter =
+ new JsonResponseFormatter<>(Style.PRETTY) {
+ @Override
+ protected Object buildJsonObject(ExplainResponse resp) {
+ return resp;
+ }
+ };
+ channel.sendResponse(
+ new BytesRestResponse(
+ RestStatus.OK,
+ "application/json; charset=UTF-8",
+ formatter.format(response)));
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ channel.sendResponse(
+ new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ });
+ } else {
+ unifiedQueryHandler.execute(
+ sqlRequest.getQuery(),
+ QueryType.SQL,
+ false,
+ new ActionListener<>() {
+ @Override
+ public void onResponse(TransportPPLQueryResponse response) {
+ channel.sendResponse(
+ new BytesRestResponse(
+ RestStatus.OK, "application/json; charset=UTF-8", response.getResult()));
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ channel.sendResponse(
+ new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
+ }
+ });
+ }
+ return true;
+ };
+ }
+
/** Register action and handler so that transportClient can find proxy for action. */
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
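The one-element RestUnifiedQueryAction[] in createSqlAnalyticsRouter above is a mutable cell for lambda capture. An equivalent sketch with AtomicReference (illustrative only) makes the memoization and its benign race explicit: a lost race just builds the handler twice, which is harmless because both instances are equivalent.

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class Memoize {
  static <T> Supplier<T> nullableOnce(Supplier<T> delegate) {
    AtomicReference<T> cache = new AtomicReference<>();
    return () -> {
      T value = cache.get();
      if (value == null) {
        value = delegate.get(); // may legitimately still be null (executor not yet published)
        if (value != null) {
          cache.compareAndSet(null, value); // first non-null result wins
          value = cache.get();
        }
      }
      return value;
    };
  }
}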
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java
new file mode 100644
index 00000000000..fa3e7d1d1fa
--- /dev/null
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/AnalyticsExecutorHolder.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.plugin.rest;
+
+import org.apache.calcite.rel.RelNode;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
+
+/**
+ * Bridge for sharing the analytics-engine {@link QueryPlanExecutor} between the PPL transport
+ * action (where Guice resolves the binding via {@code @Inject}) and the REST-only SQL router (where
+ * Guice cannot, because {@code SQLPlugin#getRestHandlers} runs before the Node-level injector
+ * satisfies {@code @Inject} parameters).
+ *
+ * <p>Why a static holder: cross-plugin Guice injection needs a class registered in the Node
+ * injector, and {@link org.opensearch.sql.plugin.SQLPlugin}'s SQL routing path is built in {@code
+ * getRestHandlers} — outside any Guice-managed lifecycle. Persisting the executor in this holder
+ * once {@link org.opensearch.sql.plugin.transport.TransportPPLQueryAction} is constructed lets the
+ * SQL router read the same instance without going back through the injector.
+ */
+public final class AnalyticsExecutorHolder {
+
+ private static volatile QueryPlanExecutor<RelNode, ?> executor;
+
+ private AnalyticsExecutorHolder() {}
+
+ public static void set(QueryPlanExecutor<RelNode, ?> instance) {
+ executor = instance;
+ }
+
+ public static QueryPlanExecutor<RelNode, ?> get() {
+ return executor;
+ }
+}
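Consumer-side usage, restated as a sketch (assumes the same package as the holder; the surrounding method is hypothetical): read the volatile field once into a local so the null check and the subsequent use observe the same instance.

import org.apache.calcite.rel.RelNode;
import org.opensearch.analytics.exec.QueryPlanExecutor;

final class HolderConsumer {

  /** Returns true when the analytics executor has been published and is usable. */
  static boolean analyticsAvailable() {
    QueryPlanExecutor<RelNode, ?> executor = AnalyticsExecutorHolder.get();
    if (executor == null) {
      return false; // analytics-engine absent or not yet injected: take the legacy path
    }
    // ... build RestUnifiedQueryAction around executor, as SQLPlugin's router does ...
    return true;
  }
}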
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
new file mode 100644
index 00000000000..e1bb84778dd
--- /dev/null
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryAction.java
@@ -0,0 +1,290 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.plugin.rest;
+
+import static org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
+import static org.opensearch.sql.lang.PPLLangSpec.PPL_SPEC;
+import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME;
+import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.ThreadContext;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
+import org.opensearch.analytics.schema.OpenSearchSchemaBuilder;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.sql.api.UnifiedQueryContext;
+import org.opensearch.sql.api.UnifiedQueryPlanner;
+import org.opensearch.sql.ast.AbstractNodeVisitor;
+import org.opensearch.sql.ast.statement.ExplainMode;
+import org.opensearch.sql.ast.tree.Relation;
+import org.opensearch.sql.ast.tree.UnresolvedPlan;
+import org.opensearch.sql.calcite.CalcitePlanContext;
+import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
+import org.opensearch.sql.common.response.ResponseListener;
+import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
+import org.opensearch.sql.executor.QueryType;
+import org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine;
+import org.opensearch.sql.lang.LangSpec;
+import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
+import org.opensearch.sql.protocol.response.QueryResult;
+import org.opensearch.sql.protocol.response.format.ResponseFormatter;
+import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter;
+import org.opensearch.transport.client.node.NodeClient;
+
+/**
+ * Handles queries routed to the Analytics engine via the unified query pipeline. Parses PPL/SQL
+ * queries using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates
+ * to {@link AnalyticsExecutionEngine} for execution.
+ */
+public class RestUnifiedQueryAction {
+
+ private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class);
+ private static final String SCHEMA_NAME = "opensearch";
+
+ private final AnalyticsExecutionEngine analyticsEngine;
+ private final NodeClient client;
+ private final ClusterService clusterService;
+ private final org.opensearch.sql.common.setting.Settings pluginSettings;
+
+ public RestUnifiedQueryAction(
+ NodeClient client,
+ ClusterService clusterService,
+ QueryPlanExecutor<RelNode, ?> planExecutor,
+ org.opensearch.sql.common.setting.Settings pluginSettings) {
+ this.client = client;
+ this.clusterService = clusterService;
+ this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor);
+ this.pluginSettings = pluginSettings;
+ }
+
+ /**
+ * Returns true iff the target index has {@link
+ * IndexSettings#PLUGGABLE_DATAFORMAT_ENABLED_SETTING} set and {@link
+ * IndexSettings#PLUGGABLE_DATAFORMAT_VALUE_SETTING} is {@code "composite"}, routing it to
+ * DataFusion instead of the Calcite→DSL path.
+ *
+ * <p>Note: This creates a separate UnifiedQueryContext for parsing. The context cannot be shared
+ * with doExecute/doExplain because UnifiedQueryContext holds a Calcite JDBC connection that fails
+ * when used across threads (transport thread -> sql-worker thread). When real catalog metadata
+ * makes this expensive, consider moving the routing check to the sql-worker thread.
+ */
+ public boolean isAnalyticsIndex(String query, QueryType queryType) {
+ if (query == null || query.isEmpty()) {
+ return false;
+ }
+ try (UnifiedQueryContext context = buildParsingContext(queryType)) {
+ return extractIndexName(query, queryType, context)
+ .map(this::stripSchemaPrefix)
+ .map(this::isPluggableDataformatIndex)
+ .orElse(false);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ private String stripSchemaPrefix(String indexName) {
+ int lastDot = indexName.lastIndexOf('.');
+ return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
+ }
+
+ private boolean isPluggableDataformatIndex(String indexName) {
+ var indexMetadata = clusterService.state().metadata().index(indexName);
+ if (indexMetadata == null) {
+ return false;
+ }
+ var settings = indexMetadata.getSettings();
+ return IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.get(settings)
+ && "composite".equals(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.get(settings));
+ }
+
+ /** Execute a query through the unified query pipeline on the sql-worker thread pool. */
+ public void execute(
+ String query,
+ QueryType queryType,
+ boolean profiling,
+ ActionListener<TransportPPLQueryResponse> listener) {
+ client
+ .threadPool()
+ .schedule(
+ withCurrentContext(
+ () -> {
+ try (UnifiedQueryContext context = buildContext(queryType, profiling)) {
+ UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
+ RelNode plan = planner.plan(query);
+ CalcitePlanContext planContext = context.getPlanContext();
+ plan = addQuerySizeLimit(plan, planContext);
+ analyticsEngine.execute(
+ plan, planContext, createQueryListener(queryType, listener));
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }),
+ new TimeValue(0),
+ SQL_WORKER_THREAD_POOL_NAME);
+ }
+
+ /**
+ * Explain a query through the unified query pipeline on the sql-worker thread pool. Returns
+ * ExplainResponse via ResponseListener so the caller can format it.
+ */
+ public void explain(
+ String query,
+ QueryType queryType,
+ ExplainMode mode,
+ ResponseListener<ExplainResponse> listener) {
+ client
+ .threadPool()
+ .schedule(
+ withCurrentContext(
+ () -> {
+ try (UnifiedQueryContext context = buildContext(queryType, false)) {
+ UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context);
+ RelNode plan = planner.plan(query);
+ CalcitePlanContext planContext = context.getPlanContext();
+ plan = addQuerySizeLimit(plan, planContext);
+ analyticsEngine.explain(plan, mode, planContext, listener);
+ } catch (Exception e) {
+ listener.onFailure(e);
+ }
+ }),
+ new TimeValue(0),
+ SQL_WORKER_THREAD_POOL_NAME);
+ }
+
+ /**
+ * Build a lightweight context for parsing only (index name extraction). Does not require cluster
+ * state or catalog schema.
+ */
+ private UnifiedQueryContext buildParsingContext(QueryType queryType) {
+ return applyClusterOverrides(UnifiedQueryContext.builder().language(queryType)).build();
+ }
+
+ private UnifiedQueryContext buildContext(QueryType queryType, boolean profiling) {
+ return applyClusterOverrides(
+ UnifiedQueryContext.builder()
+ .language(queryType)
+ .catalog(SCHEMA_NAME, OpenSearchSchemaBuilder.buildSchema(clusterService.state()))
+ .defaultNamespace(SCHEMA_NAME)
+ .profiling(profiling))
+ .build();
+ }
+
+ /**
+ * Routes operator-configured cluster overrides into the builder via the existing {@code
+ * setting(String, Object)} API, keeping {@link UnifiedQueryContext} decoupled from any specific
+ * {@link org.opensearch.sql.common.setting.Settings} implementation.
+ *
+ * <p>Currently scoped to {@code plugins.ppl.rex.max_match.limit} — required so the unified path
+ * honors {@code _cluster/settings} updates for {@code rex max_match} (CalciteRexCommandIT's
+ * testRexMaxMatchConfigurableLimit). Add keys here if a future PR / IT depends on cluster-side
+ * fidelity for one of the other planning settings.
+ */
+ private UnifiedQueryContext.Builder applyClusterOverrides(UnifiedQueryContext.Builder builder) {
+ Object rexLimit =
+ pluginSettings.getSettingValue(
+ org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT);
+ if (rexLimit != null) {
+ builder.setting(
+ org.opensearch.sql.common.setting.Settings.Key.PPL_REX_MAX_MATCH_LIMIT.getKeyValue(),
+ rexLimit);
+ }
+ return builder;
+ }
+
+ /**
+ * Extract the source index name by parsing the query and visiting the AST to find the Relation
+ * node. Uses the context's parser which supports both PPL and SQL.
+ */
+ private static Optional<String> extractIndexName(
+ String query, QueryType queryType, UnifiedQueryContext context) {
+ if (queryType == QueryType.PPL) {
+ UnresolvedPlan unresolvedPlan = (UnresolvedPlan) context.getParser().parse(query);
+ return Optional.ofNullable(unresolvedPlan.accept(new IndexNameExtractor(), null));
+ }
+ SqlNode sqlNode = (SqlNode) context.getParser().parse(query);
+ return Optional.ofNullable(extractTableNameFromSqlNode(sqlNode));
+ }
+
+ /** AST visitor that extracts the source index name from a Relation node (PPL path). */
+ private static class IndexNameExtractor extends AbstractNodeVisitor<String, Void> {
+ @Override
+ public String visitRelation(Relation node, Void context) {
+ return node.getTableQualifiedName().toString();
+ }
+ }
+
+ /** SqlNode visitor that extracts the source table name from a SQL parse tree. */
+ private static class SqlTableNameExtractor extends SqlBasicVisitor<String> {
+ @Override
+ public String visit(SqlCall call) {
+ if (call instanceof SqlSelect select) {
+ return select.getFrom().accept(this);
+ }
+ if (call instanceof SqlJoin join) {
+ return join.getLeft().accept(this);
+ }
+ return null;
+ }
+
+ @Override
+ public String visit(SqlIdentifier id) {
+ return id.toString();
+ }
+ }
+
+ private static String extractTableNameFromSqlNode(SqlNode sqlNode) {
+ return sqlNode.accept(new SqlTableNameExtractor());
+ }
+
+ private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {
+ return LogicalSystemLimit.create(
+ LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT,
+ plan,
+ context.relBuilder.literal(context.sysLimit.querySizeLimit()));
+ }
+
+ private ResponseListener<QueryResponse> createQueryListener(
+ QueryType queryType, ActionListener<TransportPPLQueryResponse> transportListener) {
+ ResponseFormatter<QueryResult> formatter = new SimpleJsonResponseFormatter(PRETTY);
+ return new ResponseListener<QueryResponse>() {
+ @Override
+ public void onResponse(QueryResponse response) {
+ LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC;
+ String result =
+ formatter.format(
+ new QueryResult(
+ response.getSchema(), response.getResults(), response.getCursor(), langSpec));
+ transportListener.onResponse(new TransportPPLQueryResponse(result));
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ transportListener.onFailure(e);
+ }
+ };
+ }
+
+ private static Runnable withCurrentContext(final Runnable task) {
+ final Map<String, String> currentContext = ThreadContext.getImmutableContext();
+ return () -> {
+ ThreadContext.putAll(currentContext);
+ task.run();
+ };
+ }
+}
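A pure-Java illustration (not in the diff) of stripSchemaPrefix above: routing compares bare index names against cluster metadata, so a catalog-qualified name like "opensearch.parquet_logs" must be reduced to its last dotted segment first.

public class StripSchemaPrefixDemo {
  static String stripSchemaPrefix(String indexName) {
    int lastDot = indexName.lastIndexOf('.');
    return lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName;
  }

  public static void main(String[] args) {
    System.out.println(stripSchemaPrefix("opensearch.parquet_logs")); // parquet_logs
    System.out.println(stripSchemaPrefix("parquet_logs"));            // parquet_logs
  }
}

Note that the last-dot rule assumes bare index names contain no dots themselves; a literal dotted index name would be reduced to its final segment before the metadata lookup.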
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
index ef10aaca451..95f5b014ef0 100644
--- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java
@@ -13,9 +13,11 @@
import java.util.Locale;
import java.util.Optional;
import java.util.function.Supplier;
+import org.apache.calcite.rel.RelNode;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Guice;
import org.opensearch.common.inject.Inject;
@@ -28,6 +30,7 @@
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.executor.ExecutionEngine;
+import org.opensearch.sql.executor.QueryType;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.monitor.profile.QueryProfiling;
@@ -35,6 +38,8 @@
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.plugin.config.EngineExtensionsHolder;
import org.opensearch.sql.plugin.config.OpenSearchPluginModule;
+import org.opensearch.sql.plugin.rest.AnalyticsExecutorHolder;
+import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction;
import org.opensearch.sql.ppl.PPLService;
import org.opensearch.sql.ppl.domain.PPLQueryRequest;
import org.opensearch.sql.protocol.response.QueryResult;
@@ -58,7 +63,13 @@ public class TransportPPLQueryAction
private final Supplier<Boolean> pplEnabled;
- /** Constructor of TransportPPLQueryAction. */
+ /** Null when analytics-engine plugin is absent; set via {@link #setQueryPlanExecutor}. */
+ private volatile RestUnifiedQueryAction unifiedQueryHandler;
+
+ private final NodeClient clientRef;
+ private final ClusterService clusterServiceRef;
+ private final org.opensearch.sql.common.setting.Settings pluginSettingsRef;
+
@Inject
public TransportPPLQueryAction(
TransportService transportService,
@@ -69,14 +80,18 @@ public TransportPPLQueryAction(
org.opensearch.common.settings.Settings clusterSettings,
EngineExtensionsHolder extensionsHolder) {
super(PPLQueryAction.NAME, transportService, actionFilters, TransportPPLQueryRequest::new);
+ this.clientRef = client;
+ this.clusterServiceRef = clusterService;
ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule(extensionsHolder.engines()));
+ org.opensearch.sql.common.setting.Settings pluginSettings =
+ new OpenSearchSettings(clusterService.getClusterSettings());
+ this.pluginSettingsRef = pluginSettings;
modules.add(
b -> {
b.bind(NodeClient.class).toInstance(client);
- b.bind(org.opensearch.sql.common.setting.Settings.class)
- .toInstance(new OpenSearchSettings(clusterService.getClusterSettings()));
+ b.bind(org.opensearch.sql.common.setting.Settings.class).toInstance(pluginSettings);
b.bind(DataSourceService.class).toInstance(dataSourceService);
});
this.injector = Guice.createInjector(modules);
@@ -89,6 +104,16 @@ public TransportPPLQueryAction(
.getSettingValue(Settings.Key.PPL_ENABLED);
}
+ /** Invoked by Guice iff the analytics-engine plugin bound a {@code QueryPlanExecutor}. */
+ @Inject(optional = true)
+ public void setQueryPlanExecutor(
+ QueryPlanExecutor<RelNode, ?> queryPlanExecutor) {
+ AnalyticsExecutorHolder.set(queryPlanExecutor);
+ this.unifiedQueryHandler =
+ new RestUnifiedQueryAction(
+ clientRef, clusterServiceRef, queryPlanExecutor, pluginSettingsRef);
+ }
+
/**
* {@inheritDoc} Transform the request and call super.doExecute() to support call from other
* plugins.
@@ -120,12 +145,32 @@ protected void doExecute(
QueryContext.addRequestId();
- PPLService pplService = injector.getInstance(PPLService.class);
// in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest
PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest();
QueryContext.setProfile(transformedRequest.profile());
ActionListener<TransportPPLQueryResponse> clearingListener = wrapWithProfilingClear(listener);
+ // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices.
+ if (unifiedQueryHandler != null
+ && unifiedQueryHandler.isAnalyticsIndex(transformedRequest.getRequest(), QueryType.PPL)) {
+ if (transformedRequest.isExplainRequest()) {
+ unifiedQueryHandler.explain(
+ transformedRequest.getRequest(),
+ QueryType.PPL,
+ transformedRequest.mode(),
+ createExplainResponseListener(transformedRequest, clearingListener));
+ } else {
+ unifiedQueryHandler.execute(
+ transformedRequest.getRequest(),
+ QueryType.PPL,
+ transformedRequest.profile(),
+ clearingListener);
+ }
+ return;
+ }
+
+ PPLService pplService = injector.getInstance(PPLService.class);
+
if (transformedRequest.isExplainRequest()) {
pplService.explain(
transformedRequest, createExplainResponseListener(transformedRequest, clearingListener));
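The @Inject(optional = true) setter above relies on the optional-injection feature of the org.opensearch.common.inject Guice fork: when no binding exists for the parameter type, the setter is simply never invoked. A minimal sketch of the pattern (names illustrative, not part of the diff):

import org.opensearch.common.inject.Inject;

class OptionalDependencyConsumer {
  private volatile Runnable hook; // stays null when nothing bound Runnable

  @Inject(optional = true)
  public void setHook(Runnable hook) { // skipped entirely if no binding exists
    this.hook = hook;
  }

  void runIfBound() {
    Runnable local = hook; // single volatile read: check and use see the same instance
    if (local != null) {
      local.run();
    }
  }
}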
diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java
new file mode 100644
index 00000000000..c8012660369
--- /dev/null
+++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.plugin.rest;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.calcite.rel.RelNode;
+import org.junit.Before;
+import org.junit.Test;
+import org.opensearch.analytics.exec.QueryPlanExecutor;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.sql.executor.QueryType;
+import org.opensearch.transport.client.node.NodeClient;
+
+/**
+ * Tests for analytics index routing in RestUnifiedQueryAction. Routing requires both {@code
+ * index.pluggable.dataformat.enabled=true} and {@code index.pluggable.dataformat=composite}.
+ */
+public class RestUnifiedQueryActionTest {
+
+ private ClusterService clusterService;
+ private Metadata metadata;
+ private RestUnifiedQueryAction action;
+
+ @Before
+ public void setUp() {
+ clusterService = mock(ClusterService.class);
+ ClusterState clusterState = mock(ClusterState.class);
+ metadata = mock(Metadata.class);
+ when(clusterService.state()).thenReturn(clusterState);
+ when(clusterState.metadata()).thenReturn(metadata);
+
+ @SuppressWarnings("unchecked")
+ QueryPlanExecutor<RelNode, ?> executor = mock(QueryPlanExecutor.class);
+ action =
+ new RestUnifiedQueryAction(
+ mock(NodeClient.class),
+ clusterService,
+ executor,
+ mock(org.opensearch.sql.common.setting.Settings.class));
+ }
+
+ @Test
+ public void pluggableDataformatIndexRoutesToAnalytics() {
+ registerIndex(
+ "parquet_logs",
+ Settings.builder()
+ .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true)
+ .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "composite")
+ .build());
+
+ assertTrue(action.isAnalyticsIndex("source = parquet_logs | fields ts", QueryType.PPL));
+ assertTrue(
+ action.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts", QueryType.PPL));
+ }
+
+ @Test
+ public void pluggableEnabledButLuceneFormatRoutesToLucene() {
+ registerIndex(
+ "lucene_logs",
+ Settings.builder()
+ .put(IndexSettings.PLUGGABLE_DATAFORMAT_ENABLED_SETTING.getKey(), true)
+ .put(IndexSettings.PLUGGABLE_DATAFORMAT_VALUE_SETTING.getKey(), "lucene")
+ .build());
+
+ assertFalse(action.isAnalyticsIndex("source = lucene_logs | fields ts", QueryType.PPL));
+ }
+
+ @Test
+ public void indexWithoutSettingRoutesToLucene() {
+ registerIndex("plain_logs", Settings.EMPTY);
+
+ assertFalse(action.isAnalyticsIndex("source = plain_logs | fields ts", QueryType.PPL));
+ }
+
+ @Test
+ public void missingIndexRoutesToLucene() {
+ assertFalse(action.isAnalyticsIndex("source = does_not_exist | fields ts", QueryType.PPL));
+ }
+
+ @Test
+ public void nullAndEmptyQueriesRouteToLucene() {
+ assertFalse(action.isAnalyticsIndex(null, QueryType.PPL));
+ assertFalse(action.isAnalyticsIndex("", QueryType.PPL));
+ }
+
+ private void registerIndex(String name, Settings settings) {
+ IndexMetadata indexMetadata = mock(IndexMetadata.class);
+ when(indexMetadata.getSettings()).thenReturn(settings);
+ when(metadata.index(name)).thenReturn(indexMetadata);
+ }
+}