From f1956deacf7edf9d013948f204b5d0e0ca509900 Mon Sep 17 00:00:00 2001 From: bharath-techie Date: Thu, 7 May 2026 15:56:17 +0530 Subject: [PATCH 1/2] making tests pass and fixing arrow stream alignment issue Signed-off-by: bharath-techie --- .../analytics-backend-datafusion/build.gradle | 45 ----- .../rust/src/api.rs | 7 +- .../be/datafusion/CoordinatorReduceIT.java | 164 ------------------ .../CoordinatorReduceMemtableIT.java | 135 -------------- .../StreamingCoordinatorReduceIT.java | 145 ---------------- sandbox/qa/analytics-engine-rest/build.gradle | 83 +++++++-- .../analytics/qa/CoordinatorReduceIT.java | 128 ++++++++++++++ .../qa/CoordinatorReduceMemtableIT.java | 111 ++++++++++++ .../qa/StreamingCoordinatorReduceIT.java | 118 +++++++++++++ 9 files changed, 429 insertions(+), 507 deletions(-) delete mode 100644 sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java delete mode 100644 sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceMemtableIT.java delete mode 100644 sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/StreamingCoordinatorReduceIT.java create mode 100644 sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceIT.java create mode 100644 sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceMemtableIT.java create mode 100644 sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamingCoordinatorReduceIT.java diff --git a/sandbox/plugins/analytics-backend-datafusion/build.gradle b/sandbox/plugins/analytics-backend-datafusion/build.gradle index e2504d6f49c55..d382603186f40 100644 --- a/sandbox/plugins/analytics-backend-datafusion/build.gradle +++ b/sandbox/plugins/analytics-backend-datafusion/build.gradle @@ -6,12 +6,6 @@ * compatible open source license. */ -apply plugin: 'opensearch.internal-cluster-test' - -// SQL Unified Query API version (aligned with OpenSearch build version) — required by -// the PPL transport plugin used in CoordinatorReduceIT. -def sqlUnifiedQueryVersion = '3.6.0.0-SNAPSHOT' - opensearchplugin { description = 'DataFusion native execution engine plugin for the query engine.' classname = 'org.opensearch.be.datafusion.DataFusionPlugin' @@ -85,25 +79,6 @@ dependencies { testImplementation project(':sandbox:plugins:analytics-backend-lucene') testCompileOnly 'org.immutables:value-annotations:2.8.8' - // ── internalClusterTest: end-to-end coordinator-reduce IT ─────────────────── - // Pulls in every sibling plugin needed to construct a parquet-backed composite - // index, dispatch a multi-shard PPL aggregate, and exercise DatafusionReduceSink. - internalClusterTestImplementation project(':sandbox:plugins:analytics-engine') - internalClusterTestImplementation project(':sandbox:plugins:parquet-data-format') - internalClusterTestImplementation project(':sandbox:plugins:composite-engine') - internalClusterTestImplementation project(':sandbox:plugins:analytics-backend-lucene') - internalClusterTestImplementation project(':plugins:arrow-flight-rpc') - internalClusterTestImplementation("org.opensearch.query:unified-query-api:${sqlUnifiedQueryVersion}") { - exclude group: 'org.opensearch' - } - internalClusterTestImplementation("org.opensearch.query:unified-query-core:${sqlUnifiedQueryVersion}") { - exclude group: 'org.opensearch' - } - internalClusterTestImplementation("org.opensearch.query:unified-query-ppl:${sqlUnifiedQueryVersion}") { - exclude group: 'org.opensearch' - } - // PPL front-end plugin — provides UnifiedPPLExecuteAction transport action used by the IT. - internalClusterTestImplementation project(':sandbox:plugins:test-ppl-frontend') } test { @@ -169,30 +144,10 @@ task cargoTest(type: Exec) { check.dependsOn cargoTest -internalClusterTest { - jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' - jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED' - jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED' - jvmArgs '--enable-native-access=ALL-UNNAMED' - jvmArgs '-Darrow.memory.debug.allocator=false' - jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"] - systemProperty 'io.netty.allocator.numDirectArenas', '1' - systemProperty 'io.netty.noUnsafe', 'false' - systemProperty 'io.netty.tryUnsafe', 'true' - systemProperty 'io.netty.tryReflectionSetAccessible', 'true' - systemProperty 'native.lib.path', project(':sandbox:libs:dataformat-native').ext.nativeLibPath.absolutePath - dependsOn ':sandbox:libs:dataformat-native:buildRustLibrary' -} - configurations.all { - // okhttp-aws-signer is a transitive dep of unified-query-common (via unified-query-core), - // only published on JitPack, not needed for PPL parsing/planning exclude group: 'com.github.babbel', module: 'okhttp-aws-signer' resolutionStrategy { - // Align transitive versions with OpenSearch's managed versions — required because the - // unified-query-* artifacts pull in older versions of common libs that conflict with - // OpenSearch's enforced versions on internalClusterTest classpath. force 'com.google.guava:guava:33.4.0-jre' force 'com.google.guava:failureaccess:1.0.2' force 'com.google.errorprone:error_prone_annotations:2.36.0' diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs index 86856ed0c5f08..87cc0fd4d4181 100644 --- a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs +++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs @@ -632,10 +632,15 @@ pub unsafe fn sender_send( // `from_ffi` takes the array by value (consumes it) and the schema by // reference (it is still dropped when `ffi_schema` goes out of scope). - let array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| { + let mut array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| { DataFusionError::Execution(format!("Failed to import Arrow C Data array: {}", e)) })?; + // Buffers from Java's Flight RPC deserialization may not meet Rust's + // native alignment requirements. align_buffers() is a no-op for + // already-aligned buffers; only misaligned ones are reallocated. + array_data.align_buffers(); + let struct_array = StructArray::from(array_data); let batch = RecordBatch::from(struct_array); diff --git a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java b/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java deleted file mode 100644 index 5cd4e39ca07c5..0000000000000 --- a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceIT.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.be.datafusion; - -import org.opensearch.Version; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.analytics.AnalyticsPlugin; -import org.opensearch.arrow.flight.transport.FlightStreamPlugin; -import org.opensearch.be.lucene.LucenePlugin; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.composite.CompositeDataFormatPlugin; -import org.opensearch.parquet.ParquetDataFormatPlugin; -import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.PluginInfo; -import org.opensearch.ppl.TestPPLPlugin; -import org.opensearch.ppl.action.PPLRequest; -import org.opensearch.ppl.action.PPLResponse; -import org.opensearch.ppl.action.UnifiedPPLExecuteAction; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * End-to-end smoke test for the streaming coordinator-reduce path: - * - *
- *   PPL → planner → multi-shard SHARD_FRAGMENT dispatch → DataFusion shard scan
- *       → ExchangeSink.feed → DatafusionReduceSink (Substrait SUM via convertFinalAggFragment)
- *       → drain → downstream → assembled PPLResponse
- * 
- * - *

Builds a parquet-backed composite index with two shards, indexes a small - * deterministic dataset, then runs a {@code stats sum(value) as total} aggregate. - * The total is a function of the indexed values × shard count; any drift in - * shard fan-out, sink wiring, or final-agg merge will show up as a mismatch. - */ -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) -public class CoordinatorReduceIT extends OpenSearchIntegTestCase { - - private static final String INDEX = "coord_reduce_e2e"; - private static final int NUM_SHARDS = 2; - private static final int DOCS_PER_SHARD = 10; - /** Constant `value` for every doc — picks a deterministic SUM independent of shard routing. */ - private static final int VALUE = 7; - - @Override - protected Collection> nodePlugins() { - // Plugins with no extendedPlugins requirement go here. Plugins that need - // explicit extendedPlugins (so SPI ExtensionLoader walks the right parent - // classloader) are declared in additionalNodePlugins() below. - return List.of(TestPPLPlugin.class, FlightStreamPlugin.class, CompositeDataFormatPlugin.class); - } - - @Override - protected Collection additionalNodePlugins() { - // OpenSearchIntegTestCase's nodePlugins() builds PluginInfo with empty - // extendedPlugins, which breaks ExtensiblePlugin.loadExtensions(...) for - // plugins like DataFusionPlugin that ride on AnalyticsPlugin's SPI. Use - // additionalNodePlugins() to declare the parent relationships explicitly. - return List.of( - classpathPlugin(AnalyticsPlugin.class, Collections.emptyList()), - classpathPlugin(ParquetDataFormatPlugin.class, Collections.emptyList()), - classpathPlugin(DataFusionPlugin.class, List.of(AnalyticsPlugin.class.getName())), - classpathPlugin(LucenePlugin.class, List.of(AnalyticsPlugin.class.getName())) - ); - } - - private static PluginInfo classpathPlugin(Class pluginClass, List extendedPlugins) { - return new PluginInfo( - pluginClass.getName(), - "classpath plugin", - "NA", - Version.CURRENT, - "1.8", - pluginClass.getName(), - null, - extendedPlugins, - false - ); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true) - // STREAM_TRANSPORT (Arrow Flight RPC for shard→coordinator response streaming) - // is intentionally NOT enabled here. With it on, AnalyticsSearchTransportService - // routes all sendChildRequest calls through StreamTransportService whose connection - // profile only carries stream channels, breaking the non-stream fragment dispatch - // request. The non-stream path is enough for this IT's small-result SUM aggregate. - .build(); - } - - /** - * {@code source = T | stats sum(value) as total} on a 2-shard parquet-backed index - * → coordinator-reduce path runs the final SUM via {@link DatafusionReduceSink} - * and returns the deterministic total. - */ - public void testScalarSumAcrossShards() throws Exception { - createParquetBackedIndex(); - indexDeterministicDocs(); - - PPLResponse response = executePPL("source = " + INDEX + " | stats sum(value) as total"); - - assertNotNull("PPLResponse must not be null", response); - assertTrue("columns must contain 'total', got " + response.getColumns(), response.getColumns().contains("total")); - assertEquals("scalar agg must return exactly 1 row", 1, response.getRows().size()); - - int idx = response.getColumns().indexOf("total"); - Object cell = response.getRows().get(0)[idx]; - assertNotNull("SUM(value) cell must not be null — coordinator-reduce returned no value", cell); - long actual = ((Number) cell).longValue(); - long expected = (long) VALUE * NUM_SHARDS * DOCS_PER_SHARD; - assertEquals( - "SUM(value) across " + NUM_SHARDS + " shards × " + DOCS_PER_SHARD + " docs × value=" + VALUE + " = " + expected, - expected, - actual - ); - } - - private void createParquetBackedIndex() { - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.pluggable.dataformat.enabled", true) - .put("index.pluggable.dataformat", "composite") - .put("index.composite.primary_data_format", "parquet") - .putList("index.composite.secondary_data_formats") - .build(); - - CreateIndexResponse response = client().admin() - .indices() - .prepareCreate(INDEX) - .setSettings(indexSettings) - .setMapping("value", "type=integer") - .get(); - assertTrue("index creation must be acknowledged", response.isAcknowledged()); - ensureGreen(INDEX); - } - - private void indexDeterministicDocs() { - int total = NUM_SHARDS * DOCS_PER_SHARD; - for (int i = 0; i < total; i++) { - client().prepareIndex(INDEX).setId(String.valueOf(i)).setSource("value", VALUE).get(); - } - client().admin().indices().prepareRefresh(INDEX).get(); - client().admin().indices().prepareFlush(INDEX).get(); - } - - private PPLResponse executePPL(String ppl) { - return client().execute(UnifiedPPLExecuteAction.INSTANCE, new PPLRequest(ppl)).actionGet(); - } -} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceMemtableIT.java b/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceMemtableIT.java deleted file mode 100644 index 438742158d990..0000000000000 --- a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/CoordinatorReduceMemtableIT.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.be.datafusion; - -import org.opensearch.Version; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.analytics.AnalyticsPlugin; -import org.opensearch.arrow.flight.transport.FlightStreamPlugin; -import org.opensearch.be.lucene.LucenePlugin; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.composite.CompositeDataFormatPlugin; -import org.opensearch.parquet.ParquetDataFormatPlugin; -import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.PluginInfo; -import org.opensearch.ppl.TestPPLPlugin; -import org.opensearch.ppl.action.PPLRequest; -import org.opensearch.ppl.action.PPLResponse; -import org.opensearch.ppl.action.UnifiedPPLExecuteAction; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -/** - * Memtable variant of {@link CoordinatorReduceIT}. Identical query and assertion, but the cluster - * starts with {@code datafusion.reduce.input_mode=memtable} so the coordinator-reduce path uses - * {@link DatafusionMemtableReduceSink} instead of the streaming sink. Verifies the sink dispatch - * wiring in {@link DataFusionAnalyticsBackendPlugin#getExchangeSinkProvider} and the buffered - * memtable handoff against a real multi-shard scan. - */ -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) -public class CoordinatorReduceMemtableIT extends OpenSearchIntegTestCase { - - private static final String INDEX = "coord_reduce_memtable_e2e"; - private static final int NUM_SHARDS = 2; - private static final int DOCS_PER_SHARD = 10; - private static final int VALUE = 7; - - @Override - protected Collection> nodePlugins() { - return List.of(TestPPLPlugin.class, FlightStreamPlugin.class, CompositeDataFormatPlugin.class, LucenePlugin.class); - } - - @Override - protected Collection additionalNodePlugins() { - return List.of( - classpathPlugin(AnalyticsPlugin.class, Collections.emptyList()), - classpathPlugin(ParquetDataFormatPlugin.class, Collections.emptyList()), - classpathPlugin(DataFusionPlugin.class, List.of(AnalyticsPlugin.class.getName())) - ); - } - - private static PluginInfo classpathPlugin(Class pluginClass, List extendedPlugins) { - return new PluginInfo( - pluginClass.getName(), - "classpath plugin", - "NA", - Version.CURRENT, - "1.8", - pluginClass.getName(), - null, - extendedPlugins, - false - ); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true) - .put(DataFusionPlugin.DATAFUSION_REDUCE_INPUT_MODE.getKey(), "memtable") - .build(); - } - - public void testScalarSumAcrossShardsViaMemtable() throws Exception { - createParquetBackedIndex(); - indexDeterministicDocs(); - - PPLResponse response = executePPL("source = " + INDEX + " | stats sum(value) as total"); - - assertNotNull("PPLResponse must not be null", response); - assertTrue("columns must contain 'total', got " + response.getColumns(), response.getColumns().contains("total")); - assertEquals("scalar agg must return exactly 1 row", 1, response.getRows().size()); - - int idx = response.getColumns().indexOf("total"); - Object cell = response.getRows().get(0)[idx]; - assertNotNull("SUM(value) cell must not be null — memtable coordinator-reduce returned no value", cell); - long actual = ((Number) cell).longValue(); - long expected = (long) VALUE * NUM_SHARDS * DOCS_PER_SHARD; - assertEquals("SUM(value) memtable path must match streaming path", expected, actual); - } - - private void createParquetBackedIndex() { - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.pluggable.dataformat.enabled", true) - .put("index.pluggable.dataformat", "composite") - .put("index.composite.primary_data_format", "parquet") - .putList("index.composite.secondary_data_formats") - .build(); - - CreateIndexResponse response = client().admin() - .indices() - .prepareCreate(INDEX) - .setSettings(indexSettings) - .setMapping("value", "type=integer") - .get(); - assertTrue("index creation must be acknowledged", response.isAcknowledged()); - ensureGreen(INDEX); - } - - private void indexDeterministicDocs() { - int total = NUM_SHARDS * DOCS_PER_SHARD; - for (int i = 0; i < total; i++) { - client().prepareIndex(INDEX).setId(String.valueOf(i)).setSource("value", VALUE).get(); - } - client().admin().indices().prepareRefresh(INDEX).get(); - client().admin().indices().prepareFlush(INDEX).get(); - } - - private PPLResponse executePPL(String ppl) { - return client().execute(UnifiedPPLExecuteAction.INSTANCE, new PPLRequest(ppl)).actionGet(); - } -} diff --git a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/StreamingCoordinatorReduceIT.java b/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/StreamingCoordinatorReduceIT.java deleted file mode 100644 index 6ac06cc18536b..0000000000000 --- a/sandbox/plugins/analytics-backend-datafusion/src/internalClusterTest/java/org/opensearch/be/datafusion/StreamingCoordinatorReduceIT.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.be.datafusion; - -import org.opensearch.Version; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.analytics.AnalyticsPlugin; -import org.opensearch.arrow.flight.transport.FlightStreamPlugin; -import org.opensearch.be.lucene.LucenePlugin; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.FeatureFlags; -import org.opensearch.composite.CompositeDataFormatPlugin; -import org.opensearch.parquet.ParquetDataFormatPlugin; -import org.opensearch.plugins.Plugin; -import org.opensearch.plugins.PluginInfo; -import org.opensearch.ppl.TestPPLPlugin; -import org.opensearch.ppl.action.PPLRequest; -import org.opensearch.ppl.action.PPLResponse; -import org.opensearch.ppl.action.UnifiedPPLExecuteAction; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT; - -/** - * Streaming variant of {@link CoordinatorReduceIT}: same 2-shard parquet-backed index and deterministic - * dataset, but with Arrow Flight RPC enabled via {@link FeatureFlags#STREAM_TRANSPORT}. Exercises the - * shard-fragment → Flight → {@code DatafusionReduceSink.feed} handoff that previously failed with - * {@code "A buffer can only be associated between two allocators that share the same root"} on - * multi-shard queries. - */ -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) -public class StreamingCoordinatorReduceIT extends OpenSearchIntegTestCase { - - private static final String INDEX = "coord_reduce_streaming_e2e"; - private static final int NUM_SHARDS = 2; - private static final int DOCS_PER_SHARD = 10; - /** Constant `value` for every doc — deterministic assertion independent of shard routing. */ - private static final int VALUE = 7; - - @Override - protected Collection> nodePlugins() { - return List.of(TestPPLPlugin.class, FlightStreamPlugin.class, CompositeDataFormatPlugin.class, LucenePlugin.class); - } - - @Override - protected Collection additionalNodePlugins() { - return List.of( - classpathPlugin(AnalyticsPlugin.class, Collections.emptyList()), - classpathPlugin(ParquetDataFormatPlugin.class, Collections.emptyList()), - classpathPlugin(DataFusionPlugin.class, List.of(AnalyticsPlugin.class.getName())) - ); - } - - private static PluginInfo classpathPlugin(Class pluginClass, List extendedPlugins) { - return new PluginInfo( - pluginClass.getName(), - "classpath plugin", - "NA", - Version.CURRENT, - "1.8", - pluginClass.getName(), - null, - extendedPlugins, - false - ); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(FeatureFlags.PLUGGABLE_DATAFORMAT_EXPERIMENTAL_FLAG, true) - .build(); - } - - /** - * {@code source = T} on a 2-shard parquet-backed index with streaming enabled exercises the - * coordinator reduce sink's cross-plugin VectorSchemaRoot handoff. Before the allocator-root - * unification fix, this failed with an Arrow {@code associate} mismatch. - */ - @LockFeatureFlag(STREAM_TRANSPORT) - public void testBaselineScanAcrossShards() throws Exception { - createParquetBackedIndex(); - indexDeterministicDocs(); - - PPLResponse response = executePPL("source = " + INDEX); - - assertNotNull("PPLResponse must not be null", response); - assertTrue("columns must contain 'value', got " + response.getColumns(), response.getColumns().contains("value")); - - int expectedRows = NUM_SHARDS * DOCS_PER_SHARD; - assertEquals("all docs across shards must be returned", expectedRows, response.getRows().size()); - - int idx = response.getColumns().indexOf("value"); - for (Object[] row : response.getRows()) { - Object cell = row[idx]; - assertNotNull("value cell must not be null", cell); - assertEquals("every doc has value=" + VALUE, (long) VALUE, ((Number) cell).longValue()); - } - } - - private void createParquetBackedIndex() { - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.pluggable.dataformat.enabled", true) - .put("index.pluggable.dataformat", "composite") - .put("index.composite.primary_data_format", "parquet") - .putList("index.composite.secondary_data_formats") - .build(); - - CreateIndexResponse response = client().admin() - .indices() - .prepareCreate(INDEX) - .setSettings(indexSettings) - .setMapping("value", "type=integer") - .get(); - assertTrue("index creation must be acknowledged", response.isAcknowledged()); - ensureGreen(INDEX); - } - - private void indexDeterministicDocs() { - int total = NUM_SHARDS * DOCS_PER_SHARD; - for (int i = 0; i < total; i++) { - client().prepareIndex(INDEX).setId(String.valueOf(i)).setSource("value", VALUE).get(); - } - client().admin().indices().prepareRefresh(INDEX).get(); - client().admin().indices().prepareFlush(INDEX).get(); - } - - private PPLResponse executePPL(String ppl) { - return client().execute(UnifiedPPLExecuteAction.INSTANCE, new PPLRequest(ppl)).actionGet(); - } -} diff --git a/sandbox/qa/analytics-engine-rest/build.gradle b/sandbox/qa/analytics-engine-rest/build.gradle index d636582c7b911..534851ec62d43 100644 --- a/sandbox/qa/analytics-engine-rest/build.gradle +++ b/sandbox/qa/analytics-engine-rest/build.gradle @@ -6,6 +6,8 @@ * compatible open source license. */ +import org.opensearch.gradle.test.RestIntegTestTask + apply plugin: 'opensearch.testclusters' apply plugin: 'opensearch.standalone-rest-test' apply plugin: 'opensearch.rest-test' @@ -30,37 +32,84 @@ dependencies { testImplementation project(':sandbox:plugins:test-ppl-frontend') } -testClusters.integTest { - plugin ':plugins:arrow-flight-rpc' - plugin ':sandbox:plugins:analytics-engine' - plugin ':sandbox:plugins:analytics-backend-datafusion' - plugin ':sandbox:plugins:analytics-backend-lucene' - plugin ':sandbox:plugins:dsl-query-executor' - plugin ':sandbox:plugins:composite-engine' - plugin ':sandbox:plugins:parquet-data-format' - plugin ':sandbox:plugins:test-ppl-frontend' +// ── Shared cluster configuration closure ───────────────────────────────────── +// All test clusters share the same plugin set and JVM flags; only node count +// and feature-specific settings differ per task. +def configureAnalyticsCluster = { cluster -> + cluster.plugin ':plugins:arrow-flight-rpc' + cluster.plugin ':sandbox:plugins:analytics-engine' + cluster.plugin ':sandbox:plugins:analytics-backend-datafusion' + cluster.plugin ':sandbox:plugins:analytics-backend-lucene' + cluster.plugin ':sandbox:plugins:dsl-query-executor' + cluster.plugin ':sandbox:plugins:composite-engine' + cluster.plugin ':sandbox:plugins:parquet-data-format' + cluster.plugin ':sandbox:plugins:test-ppl-frontend' // Arrow/Flight JVM flags for DataFusion native library - jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' - jvmArgs '--enable-native-access=ALL-UNNAMED' + cluster.jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' + cluster.jvmArgs '--enable-native-access=ALL-UNNAMED' // Arrow memory allocator needs Netty unsafe access on JDK 25; mirrors // gradle/run.gradle's arrow-flight-rpc overrides so AnalyticsSearchService // can construct its RootAllocator at node start. - systemProperty 'io.netty.allocator.numDirectArenas', '1' - systemProperty 'io.netty.noUnsafe', 'false' - systemProperty 'io.netty.tryUnsafe', 'true' - systemProperty 'io.netty.tryReflectionSetAccessible', 'true' + cluster.systemProperty 'io.netty.allocator.numDirectArenas', '1' + cluster.systemProperty 'io.netty.noUnsafe', 'false' + cluster.systemProperty 'io.netty.tryUnsafe', 'true' + cluster.systemProperty 'io.netty.tryReflectionSetAccessible', 'true' // Native library path for DataFusion - systemProperty 'java.library.path', "${project(':sandbox:libs:dataformat-native').ext.nativeLibPath.parent}" + cluster.systemProperty 'java.library.path', "${project(':sandbox:libs:dataformat-native').ext.nativeLibPath.parent}" // Enable pluggable dataformat feature flag - systemProperty 'opensearch.experimental.feature.pluggable.dataformat.enabled', 'true' + cluster.systemProperty 'opensearch.experimental.feature.pluggable.dataformat.enabled', 'true' +} + +// ── Default integTest cluster: 2 nodes, no streaming ───────────────────────── +testClusters.integTest { + numberOfNodes = 2 + configureAnalyticsCluster(delegate) } integTest { systemProperty 'tests.security.manager', 'false' + exclude '**/CoordinatorReduceMemtableIT.class' + exclude '**/StreamingCoordinatorReduceIT.class' +} + +// ── Memtable variant: 2 nodes, datafusion.reduce.input_mode=memtable ───────── +task integTestMemtable(type: RestIntegTestTask) { + description = 'Runs coordinator-reduce tests with memtable sink mode' + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + filter { + includeTestsMatching 'org.opensearch.analytics.qa.CoordinatorReduceMemtableIT' + } + systemProperty 'tests.security.manager', 'false' +} +check.dependsOn(integTestMemtable) + +testClusters.integTestMemtable { + numberOfNodes = 2 + configureAnalyticsCluster(delegate) + setting 'datafusion.reduce.input_mode', 'memtable' +} + +// ── Streaming variant: 2 nodes, Arrow Flight stream transport enabled ──────── +task integTestStreaming(type: RestIntegTestTask) { + description = 'Runs coordinator-reduce tests with Arrow Flight streaming' + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + filter { + includeTestsMatching 'org.opensearch.analytics.qa.StreamingCoordinatorReduceIT' + } + systemProperty 'tests.security.manager', 'false' +} +check.dependsOn(integTestStreaming) + +testClusters.integTestStreaming { + numberOfNodes = 2 + configureAnalyticsCluster(delegate) + systemProperty 'opensearch.experimental.feature.transport.stream.enabled', 'true' } // Run against an external cluster (no testClusters lifecycle): diff --git a/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceIT.java b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceIT.java new file mode 100644 index 0000000000000..3f472a15ae826 --- /dev/null +++ b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceIT.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.qa; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; + +import java.util.List; +import java.util.Map; + +/** + * End-to-end smoke test for the streaming coordinator-reduce path: + * + *

+ *   PPL → planner → multi-shard SHARD_FRAGMENT dispatch → DataFusion shard scan
+ *       → ExchangeSink.feed → DatafusionReduceSink (Substrait SUM via convertFinalAggFragment)
+ *       → drain → downstream → assembled PPLResponse
+ * 
+ * + *

Builds a parquet-backed composite index with two shards, indexes a small + * deterministic dataset, then runs a {@code stats sum(value) as total} aggregate. + * The total is a function of the indexed values × shard count; any drift in + * shard fan-out, sink wiring, or final-agg merge will show up as a mismatch. + * + *

Requires a 2-node cluster (configured in build.gradle) so that shards + * are distributed across nodes, exercising the coordinator-reduce path. + */ +public class CoordinatorReduceIT extends AnalyticsRestTestCase { + + private static final String INDEX = "coord_reduce_e2e"; + private static final int NUM_SHARDS = 2; + private static final int DOCS_PER_SHARD = 10; + private static final int VALUE = 7; + + /** + * {@code source = T | stats sum(value) as total} on a 2-shard parquet-backed index + * → coordinator-reduce path runs the final SUM via DatafusionReduceSink + * and returns the deterministic total. + */ + public void testScalarSumAcrossShards() throws Exception { + createParquetBackedIndex(); + indexDeterministicDocs(); + + Map result = executePPL("source = " + INDEX + " | stats sum(value) as total"); + + @SuppressWarnings("unchecked") + List columns = (List) result.get("columns"); + assertNotNull("columns must not be null", columns); + assertTrue("columns must contain 'total', got " + columns, columns.contains("total")); + + @SuppressWarnings("unchecked") + List> rows = (List>) result.get("rows"); + assertNotNull("rows must not be null", rows); + assertEquals("scalar agg must return exactly 1 row", 1, rows.size()); + + int idx = columns.indexOf("total"); + Object cell = rows.get(0).get(idx); + assertNotNull("SUM(value) cell must not be null — coordinator-reduce returned no value", cell); + long actual = ((Number) cell).longValue(); + long expected = (long) VALUE * NUM_SHARDS * DOCS_PER_SHARD; + assertEquals( + "SUM(value) across " + NUM_SHARDS + " shards × " + DOCS_PER_SHARD + " docs × value=" + VALUE + " = " + expected, + expected, + actual + ); + } + + private void createParquetBackedIndex() throws Exception { + try { + client().performRequest(new Request("DELETE", "/" + INDEX)); + } catch (Exception ignored) {} + + String body = "{" + + "\"settings\": {" + + " \"number_of_shards\": " + NUM_SHARDS + "," + + " \"number_of_replicas\": 0," + + " \"index.pluggable.dataformat.enabled\": true," + + " \"index.pluggable.dataformat\": \"composite\"," + + " \"index.composite.primary_data_format\": \"parquet\"," + + " \"index.composite.secondary_data_formats\": \"\"" + + "}," + + "\"mappings\": {" + + " \"properties\": {" + + " \"value\": { \"type\": \"integer\" }" + + " }" + + "}" + + "}"; + + Request createIndex = new Request("PUT", "/" + INDEX); + createIndex.setJsonEntity(body); + Map response = assertOkAndParse(client().performRequest(createIndex), "Create index"); + assertEquals("index creation must be acknowledged", true, response.get("acknowledged")); + + Request health = new Request("GET", "/_cluster/health/" + INDEX); + health.addParameter("wait_for_status", "green"); + health.addParameter("timeout", "30s"); + client().performRequest(health); + } + + private void indexDeterministicDocs() throws Exception { + int total = NUM_SHARDS * DOCS_PER_SHARD; + StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < total; i++) { + bulk.append("{\"index\": {\"_id\": \"").append(i).append("\"}}\n"); + bulk.append("{\"value\": ").append(VALUE).append("}\n"); + } + + Request bulkRequest = new Request("POST", "/" + INDEX + "/_bulk"); + bulkRequest.setJsonEntity(bulk.toString()); + bulkRequest.addParameter("refresh", "true"); + client().performRequest(bulkRequest); + + client().performRequest(new Request("POST", "/" + INDEX + "/_flush?force=true")); + } + + private Map executePPL(String ppl) throws Exception { + Request request = new Request("POST", "/_analytics/ppl"); + request.setJsonEntity("{\"query\": \"" + ppl + "\"}"); + Response response = client().performRequest(request); + return entityAsMap(response); + } +} diff --git a/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceMemtableIT.java b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceMemtableIT.java new file mode 100644 index 0000000000000..d0d4d31d70128 --- /dev/null +++ b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/CoordinatorReduceMemtableIT.java @@ -0,0 +1,111 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.qa; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; + +import java.util.List; +import java.util.Map; + +/** + * Memtable variant of {@link CoordinatorReduceIT}. Identical query and assertion, but the cluster + * starts with {@code datafusion.reduce.input_mode=memtable} so the coordinator-reduce path uses + * DatafusionMemtableReduceSink instead of the streaming sink. Verifies the sink dispatch + * wiring and the buffered memtable handoff against a real multi-shard scan. + * + *

Requires a dedicated cluster configuration with {@code datafusion.reduce.input_mode=memtable} + * (configured via the {@code integTestMemtable} task in build.gradle). + */ +public class CoordinatorReduceMemtableIT extends AnalyticsRestTestCase { + + private static final String INDEX = "coord_reduce_memtable_e2e"; + private static final int NUM_SHARDS = 2; + private static final int DOCS_PER_SHARD = 10; + private static final int VALUE = 7; + + public void testScalarSumAcrossShardsViaMemtable() throws Exception { + createParquetBackedIndex(); + indexDeterministicDocs(); + + Map result = executePPL("source = " + INDEX + " | stats sum(value) as total"); + + @SuppressWarnings("unchecked") + List columns = (List) result.get("columns"); + assertNotNull("columns must not be null", columns); + assertTrue("columns must contain 'total', got " + columns, columns.contains("total")); + + @SuppressWarnings("unchecked") + List> rows = (List>) result.get("rows"); + assertNotNull("rows must not be null", rows); + assertEquals("scalar agg must return exactly 1 row", 1, rows.size()); + + int idx = columns.indexOf("total"); + Object cell = rows.get(0).get(idx); + assertNotNull("SUM(value) cell must not be null — memtable coordinator-reduce returned no value", cell); + long actual = ((Number) cell).longValue(); + long expected = (long) VALUE * NUM_SHARDS * DOCS_PER_SHARD; + assertEquals("SUM(value) memtable path must match streaming path", expected, actual); + } + + private void createParquetBackedIndex() throws Exception { + try { + client().performRequest(new Request("DELETE", "/" + INDEX)); + } catch (Exception ignored) {} + + String body = "{" + + "\"settings\": {" + + " \"number_of_shards\": " + NUM_SHARDS + "," + + " \"number_of_replicas\": 0," + + " \"index.pluggable.dataformat.enabled\": true," + + " \"index.pluggable.dataformat\": \"composite\"," + + " \"index.composite.primary_data_format\": \"parquet\"," + + " \"index.composite.secondary_data_formats\": \"\"" + + "}," + + "\"mappings\": {" + + " \"properties\": {" + + " \"value\": { \"type\": \"integer\" }" + + " }" + + "}" + + "}"; + + Request createIndex = new Request("PUT", "/" + INDEX); + createIndex.setJsonEntity(body); + Map response = assertOkAndParse(client().performRequest(createIndex), "Create index"); + assertEquals("index creation must be acknowledged", true, response.get("acknowledged")); + + Request health = new Request("GET", "/_cluster/health/" + INDEX); + health.addParameter("wait_for_status", "green"); + health.addParameter("timeout", "30s"); + client().performRequest(health); + } + + private void indexDeterministicDocs() throws Exception { + int total = NUM_SHARDS * DOCS_PER_SHARD; + StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < total; i++) { + bulk.append("{\"index\": {\"_id\": \"").append(i).append("\"}}\n"); + bulk.append("{\"value\": ").append(VALUE).append("}\n"); + } + + Request bulkRequest = new Request("POST", "/" + INDEX + "/_bulk"); + bulkRequest.setJsonEntity(bulk.toString()); + bulkRequest.addParameter("refresh", "true"); + client().performRequest(bulkRequest); + + client().performRequest(new Request("POST", "/" + INDEX + "/_flush?force=true")); + } + + private Map executePPL(String ppl) throws Exception { + Request request = new Request("POST", "/_analytics/ppl"); + request.setJsonEntity("{\"query\": \"" + ppl + "\"}"); + Response response = client().performRequest(request); + return entityAsMap(response); + } +} diff --git a/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamingCoordinatorReduceIT.java b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamingCoordinatorReduceIT.java new file mode 100644 index 0000000000000..f3133e3c7e2e4 --- /dev/null +++ b/sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/StreamingCoordinatorReduceIT.java @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.analytics.qa; + +import org.opensearch.client.Request; +import org.opensearch.client.Response; + +import java.util.List; +import java.util.Map; + +/** + * Streaming variant of {@link CoordinatorReduceIT}: same 2-shard parquet-backed index and + * deterministic dataset, but with Arrow Flight RPC streaming enabled. Exercises the + * shard-fragment → Flight → DatafusionReduceSink.feed handoff that previously failed with + * "A buffer can only be associated between two allocators that share the same root" on + * multi-shard queries. + * + *

Requires a dedicated cluster configuration with the stream transport feature flag enabled + * (configured via the {@code integTestStreaming} task in build.gradle). + */ +public class StreamingCoordinatorReduceIT extends AnalyticsRestTestCase { + + private static final String INDEX = "coord_reduce_streaming_e2e"; + private static final int NUM_SHARDS = 2; + private static final int DOCS_PER_SHARD = 10; + private static final int VALUE = 7; + + /** + * {@code source = T} on a 2-shard parquet-backed index with streaming enabled exercises the + * coordinator reduce sink's cross-plugin VectorSchemaRoot handoff. + */ + public void testBaselineScanAcrossShards() throws Exception { + createParquetBackedIndex(); + indexDeterministicDocs(); + + Map result = executePPL("source = " + INDEX); + + @SuppressWarnings("unchecked") + List columns = (List) result.get("columns"); + assertNotNull("columns must not be null", columns); + assertTrue("columns must contain 'value', got " + columns, columns.contains("value")); + + @SuppressWarnings("unchecked") + List> rows = (List>) result.get("rows"); + assertNotNull("rows must not be null", rows); + + int expectedRows = NUM_SHARDS * DOCS_PER_SHARD; + assertEquals("all docs across shards must be returned", expectedRows, rows.size()); + + int idx = columns.indexOf("value"); + for (List row : rows) { + Object cell = row.get(idx); + assertNotNull("value cell must not be null", cell); + assertEquals("every doc has value=" + VALUE, (long) VALUE, ((Number) cell).longValue()); + } + } + + private void createParquetBackedIndex() throws Exception { + try { + client().performRequest(new Request("DELETE", "/" + INDEX)); + } catch (Exception ignored) {} + + String body = "{" + + "\"settings\": {" + + " \"number_of_shards\": " + NUM_SHARDS + "," + + " \"number_of_replicas\": 0," + + " \"index.pluggable.dataformat.enabled\": true," + + " \"index.pluggable.dataformat\": \"composite\"," + + " \"index.composite.primary_data_format\": \"parquet\"," + + " \"index.composite.secondary_data_formats\": \"\"" + + "}," + + "\"mappings\": {" + + " \"properties\": {" + + " \"value\": { \"type\": \"integer\" }" + + " }" + + "}" + + "}"; + + Request createIndex = new Request("PUT", "/" + INDEX); + createIndex.setJsonEntity(body); + Map response = assertOkAndParse(client().performRequest(createIndex), "Create index"); + assertEquals("index creation must be acknowledged", true, response.get("acknowledged")); + + Request health = new Request("GET", "/_cluster/health/" + INDEX); + health.addParameter("wait_for_status", "green"); + health.addParameter("timeout", "30s"); + client().performRequest(health); + } + + private void indexDeterministicDocs() throws Exception { + int total = NUM_SHARDS * DOCS_PER_SHARD; + StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < total; i++) { + bulk.append("{\"index\": {\"_id\": \"").append(i).append("\"}}\n"); + bulk.append("{\"value\": ").append(VALUE).append("}\n"); + } + + Request bulkRequest = new Request("POST", "/" + INDEX + "/_bulk"); + bulkRequest.setJsonEntity(bulk.toString()); + bulkRequest.addParameter("refresh", "true"); + client().performRequest(bulkRequest); + + client().performRequest(new Request("POST", "/" + INDEX + "/_flush?force=true")); + } + + private Map executePPL(String ppl) throws Exception { + Request request = new Request("POST", "/_analytics/ppl"); + request.setJsonEntity("{\"query\": \"" + ppl + "\"}"); + Response response = client().performRequest(request); + return entityAsMap(response); + } +} From a2f209c2e5eafe1f999399fb82550646644d5067 Mon Sep 17 00:00:00 2001 From: bharath-techie Date: Thu, 7 May 2026 22:14:42 +0530 Subject: [PATCH 2/2] fixes for handling session context Signed-off-by: bharath-techie --- .../rust/src/api.rs | 17 +++++++++++++++++ .../rust/src/ffm.rs | 17 +++-------------- .../rust/src/indexed_executor.rs | 2 +- .../rust/src/query_executor.rs | 2 +- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs index 87cc0fd4d4181..329bd810d3bea 100644 --- a/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs +++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs @@ -69,6 +69,10 @@ pub struct QueryStreamHandle { /// Held for its `Drop` impl — marks the query completed when the /// stream is closed. _query_tracking_context: QueryTrackingContext, + /// Keeps the SessionContext alive while the stream is being consumed. + /// The physical plan may reference state (e.g. RuntimeEnv, caches) owned + /// by the session; dropping it prematurely causes use-after-free. + _session_ctx: Option, } impl QueryStreamHandle { @@ -79,6 +83,19 @@ impl QueryStreamHandle { Self { stream, _query_tracking_context: query_context, + _session_ctx: None, + } + } + + pub fn with_session_context( + stream: RecordBatchStreamAdapter, + query_context: QueryTrackingContext, + ctx: datafusion::prelude::SessionContext, + ) -> Self { + Self { + stream, + _query_tracking_context: query_context, + _session_ctx: Some(ctx), } } } diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs index 2c3b1af6fb3ee..bde6624c17576 100644 --- a/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs +++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs @@ -624,28 +624,17 @@ pub unsafe extern "C" fn df_execute_with_context( plan_ptr: *const u8, plan_len: i64, ) -> i64 { - // Consume the session context handle on entry. Ownership transfers here - // regardless of whether the remainder of this function succeeds, returns an - // error via `?`, or panics — RAII (or `catch_unwind` drop-during-unwind) - // drops `session_handle` and frees the underlying SessionContext resources. - // - // This matches the Java-side contract: SessionContextHandle.markConsumed() is - // invoked in a `finally` after the FFM downcall, so every observable path from - // Java's perspective ("call.invoke ran") maps to "Rust consumed the handle". - // If we were to run fallible or panic-prone code (e.g. `get_rt_manager()?`) - // before Box::from_raw, the handle would leak on those paths. let session_handle = *Box::from_raw(session_ctx_ptr as *mut crate::session_context::SessionContextHandle); let mgr = get_rt_manager()?; let plan_bytes = slice::from_raw_parts(plan_ptr, plan_len as usize); let cpu_executor = mgr.cpu_executor(); - // Route based on whether the session was configured for indexed execution - let handle_ref = &*(session_ctx_ptr as *const crate::session_context::SessionContextHandle); - if handle_ref.indexed_config.is_some() { + if session_handle.indexed_config.is_some() { + let ptr = Box::into_raw(Box::new(session_handle)) as i64; mgr.io_runtime .block_on(crate::indexed_executor::execute_indexed_with_context( - session_ctx_ptr, + ptr, plan_bytes.to_vec(), cpu_executor, )) diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs index a88e56aa8c606..7eb5c71bd28f7 100644 --- a/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs +++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_executor.rs @@ -692,6 +692,6 @@ pub async unsafe fn execute_indexed_with_context( let cross_rt_stream = CrossRtStream::new_with_df_error_stream(df_stream, cpu_executor); let schema = cross_rt_stream.schema(); let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream); - let stream_handle = crate::api::QueryStreamHandle::new(wrapped, query_context); + let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, query_context, ctx); Ok(Box::into_raw(Box::new(stream_handle)) as i64) } diff --git a/sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs b/sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs index 33d47b9bb556a..be804e1eb777e 100644 --- a/sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs +++ b/sandbox/plugins/analytics-backend-datafusion/rust/src/query_executor.rs @@ -185,6 +185,6 @@ pub async fn execute_with_context( cross_rt_stream, ); - let stream_handle = crate::api::QueryStreamHandle::new(wrapped, handle.query_context); + let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, handle.query_context, handle.ctx); Ok(Box::into_raw(Box::new(stream_handle)) as i64) }