
Introduce Instructions for the Backend to use during execution #21479

Merged
mch2 merged 7 commits into opensearch-project:main from expani:data_node_exec_with_instructions on May 6, 2026

Conversation

@expani
Contributor

@expani expani commented May 5, 2026

Backend Execution Metadata Framework

Replaces the opaque data-node execution flow with a chained instruction-based system. The planner now produces typed, ordered instructions per stage that tell the backend how to configure its execution environment before running the query.

Why:

The old flow bundled SessionContext creation, table registration, and query execution into one indivisible call. This made it hard to inject work between steps, such as registering UDFs for filter delegation or configuring optimizer settings for partial aggregates, without adding complexity to the backends.

What it does:

The planner analyzes the query shape and produces instructions like "set up a shard scan", "configure filter delegation", or "prepare for final aggregation". These travel with the plan to the execution site. At execution time, each instruction's handler runs in order, composably building up the native execution environment. Only after all handlers have run does the query execute against the fully-configured environment.
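The chained dispatch described above can be sketched in a few lines. All names here (InstructionChainSketch, Handler) are illustrative stand-ins, not the PR's actual SPI:

```java
// A minimal sketch of ordered instruction dispatch: each handler receives the
// environment built so far and returns an augmented one. Names are hypothetical.
import java.util.List;

public class InstructionChainSketch {
    interface Handler {
        StringBuilder apply(String instruction, StringBuilder env);
    }

    static StringBuilder applyAll(List<String> instructions, Handler handler) {
        StringBuilder env = new StringBuilder();
        for (String instruction : instructions) {
            env = handler.apply(instruction, env); // runs in planner-defined order
        }
        return env; // the query executes only against the fully-configured environment
    }

    public static void main(String[] args) {
        Handler h = (instr, env) -> env.append('[').append(instr).append(']');
        System.out.println(applyAll(List.of("shard-scan", "filter-delegation", "final-aggregate"), h));
        // prints "[shard-scan][filter-delegation][final-aggregate]"
    }
}
```

The key property is that each handler sees its predecessor's output, so new concerns compose without changing existing handlers.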

For the Rust/DataFusion side, the previous execute_query is decomposed into create_session_context → register_table_provider → execute_with_context, each callable independently via FFM. This lets Java instruction handlers configure the SessionContext incrementally.
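The decomposed lifecycle can be sketched as follows. Only the three step names (create_session_context, register_table_provider, execute_with_context) come from the PR; the class, signatures, and stub bodies are assumptions for illustration:

```java
// Hedged sketch of the three-step native call sequence. The stubs stand in for
// FFM downcalls; the point is that handlers can inject work between the steps.
public class SessionLifecycleSketch {
    static long createSessionContext() { return 1L; }                     // step 1 (stub)
    static void registerTableProvider(long ctx, String table) {}          // step 2 (stub)
    static void configureIncrementally(long ctx) {}                       // e.g. a handler registering UDFs
    static long executeWithContext(long ctx, byte[] plan) { return ctx; } // step 3 (stub); ctx is consumed here

    static long run(byte[] plan) {
        long ctx = createSessionContext();
        registerTableProvider(ctx, "shard_table"); // table name is hypothetical
        configureIncrementally(ctx);               // work injected between steps
        return executeWithContext(ctx, plan);
    }
}
```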

What this enables next: Filter delegation (Core bridges Lucene and DataFusion without either knowing about the other), partial aggregate optimizer configuration, and any future execution concern — all as additional instruction handlers without touching existing code.

@expani expani requested a review from a team as a code owner May 5, 2026 05:25
@github-actions
Contributor

github-actions Bot commented May 5, 2026

PR Reviewer Guide 🔍

(Review updated until commit 25543f8)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 Multiple PR themes

Sub-PR theme: SPI and wire protocol for typed instruction nodes

Relevant files:

  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/DelegatedExpression.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionNode.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/InstructionType.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterDelegationInstructionNode.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ShardScanInstructionNode.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/PartialAggregateInstructionNode.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FinalAggregateInstructionNode.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FilterTreeShape.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandler.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/FragmentInstructionHandlerFactory.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionContext.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/CommonExecutionContext.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionState.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/backend/ShardScanExecutionContext.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/SearchExecEngineProvider.java
  • sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/ExchangeSinkContext.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/StagePlan.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/action/FragmentExecutionRequest.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/exec/action/PlanAlternativeSerializationTests.java

Sub-PR theme: Planner instruction assembly and execution dispatch

Relevant files:

  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FilterTreeShapeDeriver.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/Stage.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/RelNodeUtils.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/FilterTreeShapeDeriverTests.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/dag/FragmentConversionDriverTests.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/LocalStageScheduler.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageScheduler.java
  • sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/MockBackend.java
  • sandbox/plugins/analytics-engine/src/test/java/org/opensearch/analytics/planner/FilterRuleTests.java

Sub-PR theme: DataFusion backend SessionContext decomposition and FFM bindings

Relevant files:

  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionInstructionHandlerFactory.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/ShardScanInstructionHandler.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/FinalAggregateInstructionHandler.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionSessionState.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionContext.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearcher.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionSearchExecEngine.java
  • sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DataFusionNativeBridgeTests.java
  • sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/DatafusionSearchExecEngineTests.java

⚡ Recommended focus areas for review

Memory Leak

The doClose() method is a no-op with a TODO comment acknowledging that if executeWithContextAsync is never called or fails before consuming the handle, the native Rust-side SessionContext will leak. This is a known resource management gap that needs resolution before production use.

protected void doClose() {
    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
    // execute_with_context (moves QueryTrackingContext into the stream). If execute
    // fails or is never called, this handle leaks on the Rust side.
    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
    // See df_close_session_context FFM entry which exists but is not yet wired here.
}
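A minimal sketch of option (c) from the TODO above: mark the handle consumed on the happy path so doClose() frees the pointer only when the Rust side has not taken ownership. NativeBridgeSketch and all signatures here are illustrative stand-ins for the real FFM binding:

```java
// Hypothetical consumed-flag guard; the compareAndSet also prevents double-free.
import java.util.concurrent.atomic.AtomicBoolean;

class NativeBridgeSketch {
    static long lastClosed = 0L;
    static void closeSessionContext(long ptr) { lastClosed = ptr; } // stand-in for df_close_session_context
}

class SessionContextHandleSketch {
    private final long pointer;
    private final AtomicBoolean consumed = new AtomicBoolean(false);

    SessionContextHandleSketch(long pointer) { this.pointer = pointer; }

    // Call once execute_with_context successfully takes ownership of the handle.
    void markConsumed() { consumed.set(true); }

    void doClose() {
        if (consumed.compareAndSet(false, true)) {
            NativeBridgeSketch.closeSessionContext(pointer); // only free if Rust doesn't own it
        }
    }
}
```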
Null Engine

The engine variable is initialized to null and only assigned inside the try block after instruction handlers run. If createSearchExecEngine succeeds but engine.execute(ctx) throws, the engine is included in FragmentResources and will be closed. However, if createSearchExecEngine itself throws, engine remains null and FragmentResources constructor is never reached — but the backendContext (which may hold a SessionContextHandle) is not closed. This could leak native resources on the error path.

SearchExecEngine<ShardScanExecutionContext, EngineResultStream> engine = null;
EngineResultStream stream = null;
try {
    ShardScanExecutionContext ctx = buildContext(request, gatedReader.get(), resolved.plan, task);
    AnalyticsSearchBackendPlugin backend = backends.get(resolved.plan.getBackendId());

    // Apply instruction handlers in order — each builds upon the previous handler's backend context
    BackendExecutionContext backendContext = null;
    List<InstructionNode> instructions = resolved.plan.getInstructions();
    if (!instructions.isEmpty()) {
        FragmentInstructionHandlerFactory factory = backend.getInstructionHandlerFactory();
        for (InstructionNode node : instructions) {
            FragmentInstructionHandler handler = factory.createHandler(node);
            backendContext = handler.apply(node, ctx, backendContext);
        }
    }

    engine = backend.getSearchExecEngineProvider().createSearchExecEngine(ctx, backendContext);
    stream = engine.execute(ctx);
    return new FragmentResources(gatedReader, engine, stream);
} catch (Exception e) {
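One way to address the error-path leak described above is to close the partially-built backend context when engine creation fails. Treating the context as Closeable is an assumption for illustration; the PR's BackendExecutionContext may expose cleanup differently:

```java
// Hedged sketch: release handler-acquired native resources if engine creation throws.
import java.io.Closeable;
import java.io.IOException;

class ErrorPathCleanupSketch {
    static String createEngineSafely(Closeable backendContext, Runnable createEngine) throws IOException {
        try {
            createEngine.run(); // stand-in for createSearchExecEngine(ctx, backendContext)
            return "engine-created";
        } catch (RuntimeException e) {
            if (backendContext != null) {
                backendContext.close(); // free native resources the handlers acquired
            }
            throw e;
        }
    }
}
```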
Duplicate Factory Store

setInstructionHandlerFactory is called twice for the same stage: once inside convertStage (lines 112-115) and once in convertAll (lines 80-83). The convertAll call operates on dag.rootStage() which is the same stage that convertStage processes, resulting in redundant work and potential confusion about which call is authoritative.

public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
    convertStage(dag.rootStage(), registry);
    // Root stage executes locally at coordinator — store factory for instruction dispatch.
    Stage root = dag.rootStage();
    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
    }
}

private static void convertStage(Stage stage, CapabilityRegistry registry) {
    for (Stage child : stage.getChildStages()) {
        convertStage(child, registry);
    }
    List<StagePlan> converted = new ArrayList<>(stage.getPlanAlternatives().size());
    for (StagePlan plan : stage.getPlanAlternatives()) {
        AnalyticsSearchBackendPlugin backend = registry.getBackend(plan.backendId());
        FragmentConvertor convertor = backend.getFragmentConvertor();

        // Derive filter tree shape BEFORE stripping (annotations must be intact)
        OpenSearchFilter filter = RelNodeUtils.findNode(plan.resolvedFragment(), OpenSearchFilter.class);
        FilterTreeShape treeShape = filter != null
            ? FilterTreeShapeDeriver.derive(filter, plan.backendId())
            : FilterTreeShape.NO_DELEGATION;

        IntraOperatorDelegationBytes delegationBytes = new IntraOperatorDelegationBytes(registry);
        byte[] bytes = convert(plan.resolvedFragment(), convertor, delegationBytes);

        // Assemble instruction list
        List<InstructionNode> instructions = assembleInstructions(backend, plan, treeShape, delegationBytes);

        converted.add(plan.withConvertedBytes(bytes, delegationBytes.getResult()).withInstructions(instructions));
    }
    stage.setPlanAlternatives(converted);
    // Store factory on coordinator-reduce stages (local execution, no serialization needed).
    // Shard stages get the factory from the local backend plugin at the data node.
    if (stage.getExchangeSinkProvider() != null && !converted.isEmpty()) {
        AnalyticsSearchBackendPlugin backend = registry.getBackend(converted.getFirst().backendId());
        stage.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
    }
Unchecked Cast

The raw-type FragmentInstructionHandler is used without a type parameter when calling handler.apply(node, context, backendContext). This suppresses compile-time type safety. The context here is QueryContext (an ExchangeSinkContext), but ShardScanInstructionHandler casts its commonContext argument to ShardScanExecutionContext, which would throw a ClassCastException at runtime if a shard-scan instruction is mistakenly included in a reduce stage's instruction list.

for (InstructionNode node : stage.getPlanAlternatives().getFirst().instructions()) {
    FragmentInstructionHandler handler = factory.createHandler(node);
    backendContext = handler.apply(node, context, backendContext);
}
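The raw-type hazard above could be narrowed by parameterizing handlers on their expected context type, so a mismatched instruction fails with a clear error at dispatch time rather than a ClassCastException deep inside apply(). All names here are illustrative, not the PR's SPI:

```java
// Hypothetical typed dispatch: the expected context class is checked up front.
public class TypedHandlerSketch {
    public interface CommonContext {}
    public static class ShardScanContext implements CommonContext {}
    public static class ReduceContext implements CommonContext {}

    public interface Handler<C extends CommonContext> {
        void apply(C ctx);
    }

    public static <C extends CommonContext> void dispatch(Handler<C> handler, CommonContext ctx, Class<C> expected) {
        if (!expected.isInstance(ctx)) {
            throw new IllegalStateException("Handler expects " + expected.getSimpleName()
                + " but got " + ctx.getClass().getSimpleName()); // clear error, no CCE
        }
        handler.apply(expected.cast(ctx)); // checked cast, no raw types
    }
}
```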
Blocking Call

searchWithSessionContext uses future.join() which blocks the calling thread indefinitely. If the native executeWithContextAsync never calls back (e.g., due to a Rust panic or timeout), this will hang the thread with no timeout or cancellation mechanism.

long streamPtr;
try {
    streamPtr = future.join();
} catch (Exception exception) {
    throw new IOException("Query execution with session context failed", exception);
}
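A bounded wait would avoid the indefinite hang described above. The timeout value, cancellation behavior, and exception mapping here are assumptions for illustration:

```java
// Hedged alternative to the unbounded join(): wait with a timeout and cancel.
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {
    static long awaitStream(CompletableFuture<Long> future, long timeoutSeconds) throws IOException {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS); // bounded, unlike join()
        } catch (TimeoutException e) {
            future.cancel(true); // best effort; the native side may still need its own cleanup
            throw new IOException("Query execution timed out after " + timeoutSeconds + "s", e);
        } catch (Exception e) {
            throw new IOException("Query execution with session context failed", e);
        }
    }
}
```

Note that cancelling the CompletableFuture does not stop the native Rust work; a real fix would likely also need a cancellation path through the FFM boundary.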

@github-actions
Contributor

github-actions Bot commented May 5, 2026

PR Code Suggestions ✨

Latest suggestions up to 25543f8

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Fix native memory leak in session context cleanup

The doClose() method is a no-op, meaning if execute_with_context is never called
(e.g., due to an exception before execution), the native SessionContextHandle will
leak on the Rust side. The NativeBridge.closeSessionContext method already exists
for this purpose and should be called here to prevent the leak.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-37]

 @Override
 protected void doClose() {
-    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
-    // execute_with_context (moves QueryTrackingContext into the stream). If execute
-    // fails or is never called, this handle leaks on the Rust side.
-    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
-    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
-    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
-    // See df_close_session_context FFM entry which exists but is not yet wired here.
+    NativeBridge.closeSessionContext(getPointer());
 }
Suggestion importance[1-10]: 8


Why: The no-op doClose() causes a native memory leak when execute_with_context is never called (e.g., on exception). NativeBridge.closeSessionContext already exists for this purpose and should be called here to prevent the leak.

Medium
General
Fix incorrect log level for success message

A success message is being logged at the error! level, which is misleading and will
pollute error logs. This should use info! or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [130]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 7


Why: A success message logged at error! level will pollute error logs and cause confusion during debugging. This is a clear misuse of the logging API that should be corrected.

Medium
Remove duplicate marker interface to avoid confusion

BackendExecutionState and BackendExecutionContext are two separate interfaces with
identical Javadoc and purpose. Having two nearly identical marker interfaces for the
same concept will cause confusion. One of them should be removed and all usages
consolidated to the other.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionState.java [22]

-public interface BackendExecutionState {}
+// Remove this file entirely; use BackendExecutionContext everywhere.
Suggestion importance[1-10]: 6

__

Why: BackendExecutionState and BackendExecutionContext are two separate files with identical Javadoc and purpose, which will cause confusion. Having two marker interfaces for the same concept is a design issue that should be resolved by removing one.

Low
Remove duplicate instruction handler factory assignment

The convertAll method calls convertStage which already sets the
InstructionHandlerFactory on the root stage (via the if
(stage.getExchangeSinkProvider() != null ...) block at the end of convertStage). The
duplicate logic in convertAll after the convertStage call will overwrite the factory
set by convertStage with the same value, which is redundant and could cause
confusion. The duplicate block in convertAll should be removed.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [76-84]

 public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
     convertStage(dag.rootStage(), registry);
-    // Root stage executes locally at coordinator — store factory for instruction dispatch.
-    Stage root = dag.rootStage();
-    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
-        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
-        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
-    }
 }
Suggestion importance[1-10]: 5


Why: The convertStage method already sets the InstructionHandlerFactory on stages with an ExchangeSinkProvider, so the duplicate block in convertAll is redundant. However, the logic in convertAll targets the root stage specifically, while convertStage processes all stages recursively, so there may be a subtle difference in intent that warrants careful review before removal.

Low

Previous suggestions

Suggestions up to commit 67652b2
Category | Suggestion | Impact
General
Fix incorrect log level for success message

A successful operation is being logged at the error! level, which will pollute error
logs and cause confusion. This should use info! or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [130]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 7


Why: Using error! for a success message is a real bug that will pollute error logs and cause confusion during debugging. This should clearly be info! or debug!.

Medium
Remove duplicate factory assignment logic

The convertAll method calls convertStage which already sets the
InstructionHandlerFactory on the root stage (via the if
(stage.getExchangeSinkProvider() != null ...) block at the end of convertStage). The
duplicate logic in convertAll after the convertStage call is redundant and could
cause confusion or subtle bugs if the two blocks diverge.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [76-84]

 public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
     convertStage(dag.rootStage(), registry);
-    // Root stage executes locally at coordinator — store factory for instruction dispatch.
-    Stage root = dag.rootStage();
-    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
-        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
-        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
-    }
 }
Suggestion importance[1-10]: 5


Why: The convertAll method duplicates the setInstructionHandlerFactory logic that convertStage already performs at the end of its execution. This redundancy could lead to subtle divergence bugs if one block is updated without the other.

Low
Guard against empty viable backends list

Calling getFirst() on getViableBackends() will throw a NoSuchElementException if the
list is empty. After resolution, each annotation should have exactly one viable
backend, but a defensive check or a clearer error message would prevent a cryptic
failure if the plan is in an unexpected state.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FilterTreeShapeDeriver.java [47-50]

 if (node instanceof AnnotatedPredicate predicate) {
-    boolean isDelegated = !predicate.getViableBackends().getFirst().equals(drivingBackendId);
+    List<String> viableBackends = predicate.getViableBackends();
+    if (viableBackends.isEmpty()) {
+        throw new IllegalStateException(
+            "AnnotatedPredicate has no viable backends — was the plan fully resolved before deriving tree shape?"
+        );
+    }
+    boolean isDelegated = !viableBackends.getFirst().equals(drivingBackendId);
     return new Result(isDelegated, false, !isDelegated);
 }
Suggestion importance[1-10]: 4


Why: Calling getFirst() on an empty list throws NoSuchElementException with a cryptic message. A defensive check with a clear error message would make debugging easier if the plan is in an unexpected state, though this should only occur if resolution didn't complete properly.

Low
Possible issue
Fix native handle leak on error path

The doClose() method is a no-op, meaning if execute_with_context is never called
(e.g., due to an exception before execution), the native SessionContextHandle will
leak on the Rust side. Since NativeBridge.closeSessionContext already exists and is
bound, it should be called here to free the handle on the error path.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-37]

 @Override
 protected void doClose() {
-    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
-    // execute_with_context (moves QueryTrackingContext into the stream). If execute
-    // fails or is never called, this handle leaks on the Rust side.
-    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
-    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
-    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
-    // See df_close_session_context FFM entry which exists but is not yet wired here.
+    NativeBridge.closeSessionContext(getPointer());
 }
Suggestion importance[1-10]: 7


Why: The no-op doClose() causes a native memory leak if execute_with_context is never called. Since NativeBridge.closeSessionContext is already bound, wiring it here would fix the error-path leak. The TODO comment in the code itself acknowledges this issue.

Medium
Suggestions up to commit 59dedaf
Category | Suggestion | Impact
General
Fix incorrect log level for success message

A success message is being logged at the error! level, which is misleading and will
pollute error logs. This should use info! or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [130]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 7


Why: A success message logged at error! level will pollute error logs and cause confusion during debugging. This is a clear misuse of the logging API that should be corrected.

Medium
Remove duplicate marker interface

BackendExecutionState and BackendExecutionContext are two separate files with
identical Javadoc and purpose. This duplication is likely unintentional and will
cause confusion. One of them should be removed, and all usages should be
consolidated to a single interface.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionState.java [22]

-public interface BackendExecutionState {}
+// Remove this file entirely; use BackendExecutionContext everywhere.
Suggestion importance[1-10]: 5


Why: BackendExecutionState and BackendExecutionContext have identical Javadoc and purpose, creating confusion. This duplication appears unintentional and one should be removed to keep the codebase clean.

Low
Possible issue
Fix native handle leak on error path

The doClose() method is a no-op, meaning if execute is never called or fails before
consuming the handle, the native SessionContextHandle will leak on the Rust side.
Since NativeBridge.closeSessionContext already exists and is bound, it should be
called here to free the pointer on the error path.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-37]

 @Override
 protected void doClose() {
-    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
-    // execute_with_context (moves QueryTrackingContext into the stream). If execute
-    // fails or is never called, this handle leaks on the Rust side.
-    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
-    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
-    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
-    // See df_close_session_context FFM entry which exists but is not yet wired here.
+    NativeBridge.closeSessionContext(getPointer());
 }
Suggestion importance[1-10]: 7


Why: The no-op doClose() causes a native memory leak when execute is never called or fails. Since NativeBridge.closeSessionContext is already bound via FFM, wiring it here would fix the leak on the error path.

Medium
Guard factory call when instructions list is empty

If selectedPlan.getInstructions() is empty (e.g., for plans produced before the
instruction pipeline was introduced or for backends that don't implement it),
backend.getInstructionHandlerFactory() will still be called and may throw
UnsupportedOperationException. The factory call should be guarded by a check that
instructions are non-empty.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [125-130]

-FragmentInstructionHandlerFactory factory = backend.getInstructionHandlerFactory();
 BackendExecutionContext backendContext = null;
-for (InstructionNode node : selectedPlan.getInstructions()) {
-    FragmentInstructionHandler handler = factory.createHandler(node);
-    backendContext = handler.apply(node, ctx, backendContext);
+List<InstructionNode> instructions = selectedPlan.getInstructions();
+if (!instructions.isEmpty()) {
+    FragmentInstructionHandlerFactory factory = backend.getInstructionHandlerFactory();
+    for (InstructionNode node : instructions) {
+        FragmentInstructionHandler handler = factory.createHandler(node);
+        backendContext = handler.apply(node, ctx, backendContext);
+    }
 }
Suggestion importance[1-10]: 5


Why: If getInstructions() is empty, calling getInstructionHandlerFactory() is unnecessary and may throw UnsupportedOperationException for backends that don't implement it. Guarding the factory call prevents this potential runtime failure.

Low
Suggestions up to commit a9d018c
Category | Suggestion | Impact
Possible issue
Fix native handle leak on error paths

The doClose() method is a no-op, meaning if execute_with_context is never called
(e.g., due to an exception before execution), the native SessionContextHandle will
leak on the Rust side. The df_close_session_context FFM binding already exists but
is not wired here. This is a resource leak that should be addressed by calling
NativeBridge.closeSessionContext(getPointer()) in doClose() with a guard to avoid
double-free.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-37]

 @Override
 protected void doClose() {
-    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
-    // execute_with_context (moves QueryTrackingContext into the stream). If execute
-    // fails or is never called, this handle leaks on the Rust side.
-    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
-    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
-    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
-    // See df_close_session_context FFM entry which exists but is not yet wired here.
+    // df_close_session_context is a no-op on the Rust side if the handle was already
+    // consumed by execute_with_context, so it is safe to call unconditionally.
+    NativeBridge.closeSessionContext(getPointer());
 }
Suggestion importance[1-10]: 8


Why: The doClose() is a no-op, causing a native memory leak if execute_with_context is never called (e.g., on exception). The df_close_session_context FFM binding already exists and is safe to call unconditionally since Rust can guard against double-free, so wiring it here fixes the leak on error paths.

Medium
Remove duplicate marker interface

BackendExecutionState and BackendExecutionContext are two separate marker interfaces
with identical Javadoc and purpose. This duplication is likely a mistake — one of
them should be removed or they should be consolidated. Having two identical
interfaces will cause confusion about which one to implement.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionState.java [22]

-public interface BackendExecutionState {}
+// This file should be deleted; BackendExecutionContext already serves this purpose.
+// All usages should reference BackendExecutionContext instead.
Suggestion importance[1-10]: 7


Why: BackendExecutionState and BackendExecutionContext are two separate files with identical Javadoc and purpose. The codebase uses BackendExecutionContext everywhere (in FragmentInstructionHandler, AnalyticsSearchService, ShardScanInstructionHandler, etc.), making BackendExecutionState a dead, duplicate interface that should be removed to avoid confusion.

Medium
General
Fix incorrect log level for success message

A success message is being logged at the error! level, which is misleading and will
pollute error logs. This should use info! or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [130]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+log::info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 6


Why: A success message is logged at the error! level, which will pollute error logs and mislead operators. It should use info! or debug! instead.

Low
Remove duplicate factory assignment for root stage

The convertAll method calls convertStage(dag.rootStage(), registry) and then inside
convertStage, the factory is already set on the stage via
stage.setInstructionHandlerFactory(...). The subsequent block in convertAll
duplicates this logic for the root stage, potentially overwriting the factory set by
convertStage. This redundancy could lead to subtle bugs if the two code paths
diverge.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [76-84]

 public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
     convertStage(dag.rootStage(), registry);
-    // Root stage executes locally at coordinator — store factory for instruction dispatch.
-    Stage root = dag.rootStage();
-    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
-        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
-        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
-    }
+    // Factory is set inside convertStage for all stages including root.
 }
Suggestion importance[1-10]: 5


Why: convertStage already sets the instructionHandlerFactory on the stage (lines 110-113), so the additional block in convertAll that does the same for the root stage is redundant and could cause subtle divergence bugs if the two paths are ever updated independently.

Low
Suggestions up to commit d160c87
Category | Suggestion | Impact
General
Fix incorrect log level for success message

A success message is being logged at the error! level, which is misleading and will
pollute error logs. This should use info! or debug! instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [130]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 7


Why: A success message logged at error! level will pollute error logs and cause confusion. This is a clear bug in the logging level that should be info! or debug!.

Medium
Remove duplicate factory assignment after stage conversion

The convertAll method calls convertStage(dag.rootStage(), registry) which already
sets the instructionHandlerFactory on the root stage inside convertStage (the block
at the end of convertStage checks stage.getExchangeSinkProvider() != null). The
duplicate logic in convertAll after the convertStage call is redundant and could
cause confusion or inconsistency if the logic diverges.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [76-84]

 public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
     convertStage(dag.rootStage(), registry);
-    // Root stage executes locally at coordinator — store factory for instruction dispatch.
-    Stage root = dag.rootStage();
-    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
-        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
-        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
-    }
 }
Suggestion importance[1-10]: 5


Why: The convertAll method duplicates the setInstructionHandlerFactory logic already present at the end of convertStage, which could lead to inconsistency. Removing the duplicate block simplifies the code and avoids potential divergence.

Low
Remove duplicate marker interface to avoid confusion

BackendExecutionState and BackendExecutionContext are two separate files with nearly
identical Javadoc and purpose (both are marker interfaces for backend-specific
execution state flowing between instruction handlers). Having both creates confusion
about which one to use. One of them should be removed, and all usages should be
consolidated to the other.

sandbox/libs/analytics-framework/src/main/java/org/opensearch/analytics/spi/BackendExecutionState.java [22]

-public interface BackendExecutionState {}
+// Remove this file entirely — use BackendExecutionContext throughout.
Suggestion importance[1-10]: 5


Why: BackendExecutionState and BackendExecutionContext are nearly identical marker interfaces with the same Javadoc, creating confusion. The codebase uses BackendExecutionContext throughout, making BackendExecutionState an unused duplicate that should be removed.

Low
Possible issue
Fix native handle leak on error path

The doClose() method is a no-op, meaning if execute_with_context is never called
(e.g., due to an exception before execution), the native SessionContextHandle will
leak on the Rust side. Since df_close_session_context is already bound in
NativeBridge, it should be called here to free the pointer on the error path. A
consumed flag can guard against double-free.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-37]

 @Override
 protected void doClose() {
-    // TODO: Handle error-path cleanup. Currently Rust consumes the handle in
-    // execute_with_context (moves QueryTrackingContext into the stream). If execute
-    // fails or is never called, this handle leaks on the Rust side.
-    // Options: (a) AtomicBool 'consumed' flag on Rust handle — close_session_context
-    // checks flag before freeing, (b) don't consume in Rust and use no-op tracking
-    // on the stream, (c) markConsumed() on NativeHandle to skip doClose on happy path.
-    // See df_close_session_context FFM entry which exists but is not yet wired here.
+    // df_close_session_context is safe to call if Rust hasn't consumed the handle yet.
+    // If Rust already consumed it (happy path), this is a no-op on the Rust side
+    // (guarded by an AtomicBool or similar mechanism there).
+    NativeBridge.closeSessionContext(getPointer());
 }
Suggestion importance[1-10]: 6


Why: The doClose() no-op creates a potential native memory leak if execute_with_context is never called. However, the suggestion's improved_code assumes Rust guards against double-free, which is not yet implemented per the TODO comments, making this a partial fix that could introduce a double-free bug.

Low
Suggestions up to commit 2974120
Category | Suggestion | Impact
General
Fix incorrect log level for success message

A success message is being logged at the error! level, which is incorrect and will
pollute error logs with non-error information. This should use the info! or debug!
macro instead.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [129]

-error!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
+info!("create_session_context: successfully registered table '{}', table_name_len={}", table_name, table_name.len());
Suggestion importance[1-10]: 7


Why: Using error! macro for a success message is clearly wrong and will pollute error logs. This should use info! or debug! instead.

Medium
Remove duplicate instruction handler factory assignment

The convertAll method calls convertStage(dag.rootStage(), registry) and then
separately sets the instruction handler factory on the root stage. However,
convertStage already sets the factory at the end for stages with an
ExchangeSinkProvider. This results in the factory being set twice on the root stage,
which is redundant and could cause confusion. The duplicate block in convertAll
should be removed.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FragmentConversionDriver.java [76-84]

 public static void convertAll(QueryDAG dag, CapabilityRegistry registry) {
     convertStage(dag.rootStage(), registry);
-    // Root stage executes locally at coordinator — store factory for instruction dispatch.
-    Stage root = dag.rootStage();
-    if (root.getExchangeSinkProvider() != null && !root.getPlanAlternatives().isEmpty()) {
-        AnalyticsSearchBackendPlugin backend = registry.getBackend(root.getPlanAlternatives().getFirst().backendId());
-        root.setInstructionHandlerFactory(backend.getInstructionHandlerFactory());
-    }
 }
Suggestion importance[1-10]: 6


Why: The convertAll method duplicates the factory-setting logic already present at the end of convertStage, which could lead to redundant operations. However, the conditions differ slightly (root stage vs stages with ExchangeSinkProvider), so this needs careful verification.

Low
Possible issue
Prevent native memory leak on error path

If execute is never called (e.g., due to an exception before execution), the native
SessionContextHandle pointer will leak. The doClose() method should call a native
free function to release the Rust-side memory on the error path, rather than
accepting the leak.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/SessionContextHandle.java [29-33]

 @Override
 protected void doClose() {
-    // No-op: Rust consumes the SessionContext inside execute_with_context.
-    // If execute is never called (error path), the Rust memory leaks — acceptable
-    // for now since the process would be failing anyway.
+    // If execute was never called, free the native SessionContext to avoid memory leak.
+    if (getPointer() != 0) {
+        NativeBridge.freeSessionContext(getPointer());
+    }
 }
Suggestion importance[1-10]: 5


Why: The suggestion identifies a real memory leak risk when execute is never called, but the improved code references NativeBridge.freeSessionContext which doesn't exist in the PR diff, making the fix not directly applicable without additional changes.

Low
Guard against empty viable backends list

Calling getViableBackends().getFirst() will throw a NoSuchElementException if the
list is empty. There should be a guard to handle the case where getViableBackends()
returns an empty list, to avoid unexpected runtime failures.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/FilterTreeShapeDeriver.java [47-50]

 if (node instanceof AnnotatedPredicate predicate) {
-    boolean isDelegated = !predicate.getViableBackends().getFirst().equals(drivingBackendId);
+    List<String> viableBackends = predicate.getViableBackends();
+    if (viableBackends.isEmpty()) {
+        return new Result(false, false, false);
+    }
+    boolean isDelegated = !viableBackends.getFirst().equals(drivingBackendId);
     return new Result(isDelegated, false, !isDelegated);
 }
Suggestion importance[1-10]: 5


Why: Calling getFirst() on a potentially empty list could throw NoSuchElementException. Adding a guard for empty viableBackends improves robustness, though in practice after resolution each predicate should have exactly one backend.

Low
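The suggested guard can be exercised standalone. This is a simplified sketch: the Result record and the derive() signature are stand-ins for the PR's FilterTreeShapeDeriver and AnnotatedPredicate types, and only the empty-list handling is the point.

```java
import java.util.List;

// Stand-alone sketch of the empty-list guard before calling getFirst().
public class ViableBackendsGuard {
    record Result(boolean delegated, boolean toCoordinator, boolean pushedDown) {}

    static Result derive(List<String> viableBackends, String drivingBackendId) {
        if (viableBackends.isEmpty()) {
            // Without this branch, getFirst() below throws NoSuchElementException.
            return new Result(false, false, false);
        }
        boolean isDelegated = !viableBackends.getFirst().equals(drivingBackendId);
        return new Result(isDelegated, false, !isDelegated);
    }

    public static void main(String[] args) {
        System.out.println(derive(List.of(), "datafusion"));             // guarded, no throw
        System.out.println(derive(List.of("lucene"), "datafusion"));     // delegated
        System.out.println(derive(List.of("datafusion"), "datafusion")); // pushed down
    }
}
```

Note that List.getFirst() is the Java 21 SequencedCollection method; on an empty list it throws NoSuchElementException, which is exactly the failure mode the guard prevents.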

@expani changed the title from "Introduce Instructions for the Backend for use during execution" to "Introduce Instructions for the Backend to use during execution" on May 5, 2026
@github-actions
Contributor

github-actions Bot commented May 5, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 25543f8.

Path | Line | Severity | Description
sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs | 143 | low | A success path uses the `error!` macro ('create_session_context: successfully registered table') instead of `info!` or `debug!`. Logging success events at ERROR level is inconsistent and could pollute error monitoring/alerting with false positives, or mask genuine errors. Likely a debugging artifact left in, but the mismatch between log level and message content is anomalous.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 0 | Medium: 0 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit cd356ae

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for cd356ae: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 9c52685

@expani force-pushed the data_node_exec_with_instructions branch from 9c52685 to a2b744a on May 5, 2026 07:26
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit a2b744a

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for a2b744a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@expani force-pushed the data_node_exec_with_instructions branch from a2b744a to 2974120 on May 5, 2026 16:32
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 2974120

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for 2974120: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment thread on sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs (outdated)
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit d160c87

Contributor

@bharath-techie left a comment


LGTM

@expani force-pushed the data_node_exec_with_instructions branch from d160c87 to e9f0237 on May 5, 2026 19:22
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit a9d018c

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for a9d018c:

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 59dedaf

Member

@sandeshkr419 left a comment


Thanks for working on this. The abstraction LGTM with a few nit-picks: happy to pick them up myself while implementing in my follow-up PR #21457 as well if they make more sense.

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for 59dedaf: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 67652b2

expani added 6 commits May 5, 2026 16:24
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
Signed-off-by: expani <anijainc@amazon.com>
@expani force-pushed the data_node_exec_with_instructions branch from 67652b2 to 25543f8 on May 5, 2026 23:30
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 25543f8

@github-actions
Contributor

github-actions Bot commented May 6, 2026

✅ Gradle check result for 25543f8: SUCCESS

@codecov

codecov Bot commented May 6, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.38%. Comparing base (c6527fa) to head (25543f8).

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21479      +/-   ##
============================================
- Coverage     73.41%   73.38%   -0.03%     
+ Complexity    74424    74401      -23     
============================================
  Files          5970     5970              
  Lines        338261   338261              
  Branches      48752    48752              
============================================
- Hits         248323   248246      -77     
- Misses        70205    70227      +22     
- Partials      19733    19788      +55     

☔ View full report in Codecov by Sentry.

@mch2 merged commit cbad6c6 into opensearch-project:main on May 6, 2026
15 of 16 checks passed
vishwasgarg18 pushed a commit to vishwasgarg18/OpenSearch that referenced this pull request May 8, 2026