Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 0 additions & 45 deletions sandbox/plugins/analytics-backend-datafusion/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@
* compatible open source license.
*/

apply plugin: 'opensearch.internal-cluster-test'

// SQL Unified Query API version (aligned with OpenSearch build version) — required by
// the PPL transport plugin used in CoordinatorReduceIT.
def sqlUnifiedQueryVersion = '3.6.0.0-SNAPSHOT'

opensearchplugin {
description = 'DataFusion native execution engine plugin for the query engine.'
classname = 'org.opensearch.be.datafusion.DataFusionPlugin'
Expand Down Expand Up @@ -85,25 +79,6 @@ dependencies {
testImplementation project(':sandbox:plugins:analytics-backend-lucene')
testCompileOnly 'org.immutables:value-annotations:2.8.8'

// ── internalClusterTest: end-to-end coordinator-reduce IT ───────────────────
// Pulls in every sibling plugin needed to construct a parquet-backed composite
// index, dispatch a multi-shard PPL aggregate, and exercise DatafusionReduceSink.
internalClusterTestImplementation project(':sandbox:plugins:analytics-engine')
internalClusterTestImplementation project(':sandbox:plugins:parquet-data-format')
internalClusterTestImplementation project(':sandbox:plugins:composite-engine')
internalClusterTestImplementation project(':sandbox:plugins:analytics-backend-lucene')
internalClusterTestImplementation project(':plugins:arrow-flight-rpc')
internalClusterTestImplementation("org.opensearch.query:unified-query-api:${sqlUnifiedQueryVersion}") {
exclude group: 'org.opensearch'
}
internalClusterTestImplementation("org.opensearch.query:unified-query-core:${sqlUnifiedQueryVersion}") {
exclude group: 'org.opensearch'
}
internalClusterTestImplementation("org.opensearch.query:unified-query-ppl:${sqlUnifiedQueryVersion}") {
exclude group: 'org.opensearch'
}
// PPL front-end plugin — provides UnifiedPPLExecuteAction transport action used by the IT.
internalClusterTestImplementation project(':sandbox:plugins:test-ppl-frontend')
}

test {
Expand Down Expand Up @@ -169,30 +144,10 @@ task cargoTest(type: Exec) {

check.dependsOn cargoTest

internalClusterTest {
jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
jvmArgs '--add-opens=java.base/java.lang=ALL-UNNAMED'
jvmArgs '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED'
jvmArgs '--enable-native-access=ALL-UNNAMED'
jvmArgs '-Darrow.memory.debug.allocator=false'
jvmArgs += ["--add-opens", "java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"]
systemProperty 'io.netty.allocator.numDirectArenas', '1'
systemProperty 'io.netty.noUnsafe', 'false'
systemProperty 'io.netty.tryUnsafe', 'true'
systemProperty 'io.netty.tryReflectionSetAccessible', 'true'
systemProperty 'native.lib.path', project(':sandbox:libs:dataformat-native').ext.nativeLibPath.absolutePath
dependsOn ':sandbox:libs:dataformat-native:buildRustLibrary'
}

configurations.all {
// okhttp-aws-signer is a transitive dep of unified-query-common (via unified-query-core),
// only published on JitPack, not needed for PPL parsing/planning
exclude group: 'com.github.babbel', module: 'okhttp-aws-signer'

resolutionStrategy {
// Align transitive versions with OpenSearch's managed versions — required because the
// unified-query-* artifacts pull in older versions of common libs that conflict with
// OpenSearch's enforced versions on internalClusterTest classpath.
force 'com.google.guava:guava:33.4.0-jre'
force 'com.google.guava:failureaccess:1.0.2'
force 'com.google.errorprone:error_prone_annotations:2.36.0'
Expand Down
24 changes: 23 additions & 1 deletion sandbox/plugins/analytics-backend-datafusion/rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ pub struct QueryStreamHandle {
/// Held for its `Drop` impl — marks the query completed when the
/// stream is closed.
_query_tracking_context: QueryTrackingContext,
/// Keeps the SessionContext alive while the stream is being consumed.
/// The physical plan may reference state (e.g. RuntimeEnv, caches) owned
/// by the session; dropping it prematurely causes use-after-free.
_session_ctx: Option<datafusion::prelude::SessionContext>,
}

impl QueryStreamHandle {
Expand All @@ -79,6 +83,19 @@ impl QueryStreamHandle {
Self {
stream,
_query_tracking_context: query_context,
_session_ctx: None,
}
}

pub fn with_session_context(
stream: RecordBatchStreamAdapter<CrossRtStream>,
query_context: QueryTrackingContext,
ctx: datafusion::prelude::SessionContext,
) -> Self {
Self {
stream,
_query_tracking_context: query_context,
_session_ctx: Some(ctx),
}
}
}
Expand Down Expand Up @@ -632,10 +649,15 @@ pub unsafe fn sender_send(

// `from_ffi` takes the array by value (consumes it) and the schema by
// reference (it is still dropped when `ffi_schema` goes out of scope).
let array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
let mut array_data = arrow_array::ffi::from_ffi(ffi_array, &ffi_schema).map_err(|e| {
DataFusionError::Execution(format!("Failed to import Arrow C Data array: {}", e))
})?;

// Buffers from Java's Flight RPC deserialization may not meet Rust's
// native alignment requirements. align_buffers() is a no-op for
// already-aligned buffers; only misaligned ones are reallocated.
array_data.align_buffers();

let struct_array = StructArray::from(array_data);
let batch = RecordBatch::from(struct_array);

Expand Down
17 changes: 3 additions & 14 deletions sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,28 +624,17 @@ pub unsafe extern "C" fn df_execute_with_context(
plan_ptr: *const u8,
plan_len: i64,
) -> i64 {
// Consume the session context handle on entry. Ownership transfers here
// regardless of whether the remainder of this function succeeds, returns an
// error via `?`, or panics — RAII (or `catch_unwind` drop-during-unwind)
// drops `session_handle` and frees the underlying SessionContext resources.
//
// This matches the Java-side contract: SessionContextHandle.markConsumed() is
// invoked in a `finally` after the FFM downcall, so every observable path from
// Java's perspective ("call.invoke ran") maps to "Rust consumed the handle".
// If we were to run fallible or panic-prone code (e.g. `get_rt_manager()?`)
// before Box::from_raw, the handle would leak on those paths.
let session_handle = *Box::from_raw(session_ctx_ptr as *mut crate::session_context::SessionContextHandle);

let mgr = get_rt_manager()?;
let plan_bytes = slice::from_raw_parts(plan_ptr, plan_len as usize);
let cpu_executor = mgr.cpu_executor();

// Route based on whether the session was configured for indexed execution
let handle_ref = &*(session_ctx_ptr as *const crate::session_context::SessionContextHandle);
if handle_ref.indexed_config.is_some() {
if session_handle.indexed_config.is_some() {
let ptr = Box::into_raw(Box::new(session_handle)) as i64;
mgr.io_runtime
.block_on(crate::indexed_executor::execute_indexed_with_context(
session_ctx_ptr,
ptr,
plan_bytes.to_vec(),
cpu_executor,
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,6 @@ pub async unsafe fn execute_indexed_with_context(
let cross_rt_stream = CrossRtStream::new_with_df_error_stream(df_stream, cpu_executor);
let schema = cross_rt_stream.schema();
let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream);
let stream_handle = crate::api::QueryStreamHandle::new(wrapped, query_context);
let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, query_context, ctx);
Ok(Box::into_raw(Box::new(stream_handle)) as i64)
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,6 @@ pub async fn execute_with_context(
cross_rt_stream,
);

let stream_handle = crate::api::QueryStreamHandle::new(wrapped, handle.query_context);
let stream_handle = crate::api::QueryStreamHandle::with_session_context(wrapped, handle.query_context, handle.ctx);
Ok(Box::into_raw(Box::new(stream_handle)) as i64)
}

This file was deleted.

Loading
Loading