
sandbox: enforce streaming-only fragment dispatch in analytics-engine #21594

Open
mch2 wants to merge 2 commits into opensearch-project:main from mch2:streaming-refactor

Conversation

@mch2
Member

@mch2 mch2 commented May 10, 2026

Description

Collapses analytics-engine fragment dispatch to streaming-only. Removes the parallel row-oriented codec layer (RowResponseCodec, ResponseCodec, FragmentExecutionResponse, RowBatchToArrowConverter) and the executeFragment / collectResponse service methods. AnalyticsSearchTransportService now fails fast if the streaming transport feature flag isn't on; sandbox QA clusters set it for us.
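The fail-fast requirement is the behavioural change worth calling out; below is a minimal sketch of the shape, with the class, interface, and flag names invented for illustration rather than taken from the PR:

// Hypothetical sketch only: names are placeholders, not the actual analytics-engine types.
final class StreamingOnlyDispatch {

    /** Stand-in for the streaming transport dependency injected into the service. */
    interface StreamTransportService {
        void sendFragment(byte[] fragment);
    }

    private final StreamTransportService streamTransport;

    StreamingOnlyDispatch(StreamTransportService streamTransport, boolean streamingFlagEnabled) {
        // Fail fast at injection time: with the row-oriented codec path deleted there is no
        // fallback, so a node without the streaming flag should refuse to construct the
        // service rather than fail mid-query.
        if (streamTransport == null || !streamingFlagEnabled) {
            throw new IllegalStateException(
                "analytics-engine fragment dispatch is streaming-only; enable the streaming transport feature flag");
        }
        this.streamTransport = streamTransport;
    }
}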

Bug fixes uncovered by this change: I have ITs targeting these issues, but they depend on the stubbable transport PR #21581. Without these fixes, however, the current IT suite fails.

  • Sink-feed exceptions hung the query: an outputSink.feed(...) failure in ShardFragmentStageExecution surfaced only on the stream's virtual thread, so inFlight never decremented and the stage hung until QUERY_TIMEOUT. The failure is now caught and routed to stage FAILED.
  • Empty native streams produced schema-less wire output: streaming Flight requires at least one schema-bearing frame, so DatafusionResultStream now synthesises a zero-row schema-carrying batch when the native side yields nothing (see the sketch after this list).
  • DatafusionPartitionSender use-after-free: sender_close could reclaim the heap allocation while sender_send held an immutable borrow across the tokio mpsc await. Send and close now serialise via a read-write lock.
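For the empty-stream fix, here is a hedged sketch of the general idea; the class and method names are illustrative, only VectorSchemaRoot.create and setRowCount are real Arrow APIs:

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;

// Illustrative sketch, not the actual DatafusionResultStream code: when the native
// stream yields nothing, emit a single zero-row batch built from the declared schema
// so the Flight stream still carries at least one schema-bearing frame.
final class EmptyStreamSchemaFrame {

    static VectorSchemaRoot zeroRowBatch(Schema declaredSchema, BufferAllocator allocator) {
        VectorSchemaRoot root = VectorSchemaRoot.create(declaredSchema, allocator);
        root.setRowCount(0); // no rows, but the schema travels with the frame
        return root;
    }
}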

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

- delete RowResponseCodec, ResponseCodec, FragmentExecutionResponse, RowBatchToArrowConverter
- delete AnalyticsSearchService.executeFragment / collectResponse
- require StreamTransportService at injection (fail-fast); force streaming feature flag in sandbox QA clusters
- ShardFragmentStageExecution: catch outputSink.feed() exceptions so a feed failure fails the stage instead of hanging to QUERY_TIMEOUT
- DatafusionResultStream: synthesise one zero-row schema-bearing batch for empty native streams (Flight requires ≥1 schema frame)
- DatafusionPartitionSender: read-write lock around send/close to prevent sender_close UAF while sender_send is mid-await
- @AwaitsFix 7 Append/AppendPipe IT methods — Utf8View FFI schema-lie bug, tracked separately

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@mch2 mch2 requested a review from a team as a code owner May 10, 2026 19:38
@mch2 mch2 requested review from bowenlan-amzn and expani May 10, 2026 19:38
@github-actions
Contributor

github-actions Bot commented May 10, 2026

PR Reviewer Guide 🔍

(Review updated until commit e41c84e)

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Deadlock

The read-write lock serializes send and close, but send acquires a read lock while close acquires a write lock. If multiple threads call send concurrently and one thread calls close, the write lock request will block until all read locks are released. However, if new send calls keep arriving, the write lock may starve indefinitely, preventing close from completing. This can hang the system if send operations are frequent or long-running.

public void send(long arrayAddr, long schemaAddr) {
    lifecycle.readLock().lock();
    try {
        NativeBridge.senderSend(getPointer(), arrayAddr, schemaAddr);
    } finally {
        lifecycle.readLock().unlock();
    }
}

@Override
public void close() {
    lifecycle.writeLock().lock();
    try {
        super.close();
    } finally {
        lifecycle.writeLock().unlock();
    }
}
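If the lock here is a java.util.concurrent ReentrantReadWriteLock (an assumption; the snippet only shows the ReadWriteLock interface), one way to mitigate the starvation scenario is fair mode, which queues waiters so a pending close() eventually blocks new send() readers:

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: in fair mode, a thread requesting the read lock blocks if a writer is
// already waiting, so a steady stream of send() calls cannot starve close() indefinitely.
final class FairLifecycleLock {
    final ReentrantReadWriteLock lifecycle = new ReentrantReadWriteLock(true); // true = fair ordering
}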
Exception Swallowed

The catch block at line 103 captures any exception from outputSink.feed(vsr) and wraps it in a RuntimeException. However, if captureFailure or subsequent operations throw, the original exception context may be lost. More critically, if onShardTerminated at line 108 throws, the exception from line 103 is never propagated, silently swallowing the feed failure. This can mask the root cause of query failures.

    outputSink.feed(vsr);
} catch (Exception e) {
    // Without this guard the exception only surfaces on the stream's virtual
    // thread; inFlight never decrements and the stage hangs to QUERY_TIMEOUT.
    captureFailure(new RuntimeException("Stage " + stage.getStageId() + " sink feed failed", e));
    metrics.incrementTasksFailed();
    onShardTerminated();
    return;
}
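A defensive variant of the handler, assuming the method shape and helper names shown in the snippet above (outputSink, stage, metrics, captureFailure, onShardTerminated are taken from that excerpt, not redefined here), could attach any secondary failure as a suppressed exception so the original feed error is never lost:

try {
    outputSink.feed(vsr);
} catch (Exception e) {
    RuntimeException failure = new RuntimeException("Stage " + stage.getStageId() + " sink feed failed", e);
    try {
        captureFailure(failure);
        metrics.incrementTasksFailed();
        onShardTerminated();
    } catch (Exception secondary) {
        failure.addSuppressed(secondary); // keep the root cause visible if cleanup also throws
        throw failure;
    }
    return;
}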

@github-actions
Contributor

github-actions Bot commented May 10, 2026

PR Code Suggestions ✨

Latest suggestions up to e41c84e
Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Close VectorSchemaRoot on feed exception

When outputSink.feed(vsr) throws an exception, the VectorSchemaRoot is never closed,
causing a memory leak. The exception handler should ensure the root is closed before
returning, especially since the caller transfers ownership of the VSR to this
method.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageExecution.java [91-110]

 public void onStreamResponse(FragmentExecutionArrowResponse response, boolean isLast) {
     if (isDone()) {
         VectorSchemaRoot root = response.getRoot();
         if (root != null) {
             root.close();
         }
         return;
     }
 
     VectorSchemaRoot vsr = response.getRoot();
     try {
         outputSink.feed(vsr);
     } catch (Exception e) {
+        if (vsr != null) {
+            vsr.close();
+        }
         captureFailure(new RuntimeException("Stage " + stage.getStageId() + " sink feed failed", e));
         metrics.incrementTasksFailed();
         onShardTerminated();
         return;
     }
Suggestion importance[1-10]: 9


Why: This identifies a critical memory leak where VectorSchemaRoot is not closed when outputSink.feed(vsr) throws an exception. Since the caller transfers ownership of the VSR to this method, failing to close it on exception paths causes resource leaks. This is a high-impact bug fix.

High
Add closed-state check in send

The send method should verify that the sender hasn't been closed before attempting
to send data. Without this check, a race condition between send and close could
still occur if close acquires the write lock immediately after send checks the
pointer but before acquiring the read lock.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionPartitionSender.java [33-40]

 public void send(long arrayAddr, long schemaAddr) {
     lifecycle.readLock().lock();
     try {
+        if (ptr == 0) {
+            throw new IllegalStateException("Sender already closed");
+        }
         NativeBridge.senderSend(getPointer(), arrayAddr, schemaAddr);
     } finally {
         lifecycle.readLock().unlock();
     }
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a potential race condition where send could operate on a closed pointer. However, the read-write lock already provides synchronization between send and close, making this check defensive rather than critical. The score reflects its value as a safety improvement.

Medium

Previous suggestions

Suggestions up to commit 7ea7b0a
Category | Suggestion | Impact
Possible issue
Close VectorSchemaRoot after feeding sink

The VectorSchemaRoot from response.getRoot() is not closed in the success path after
being fed to the sink. This creates a memory leak as Arrow buffers are not released.
The sink should not take ownership of the VSR lifecycle; the response handler must
close it after feeding.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageExecution.java [91-116]

 public void onStreamResponse(FragmentExecutionArrowResponse response, boolean isLast) {
     if (isDone()) {
         VectorSchemaRoot root = response.getRoot();
         if (root != null) {
             root.close();
         }
         return;
     }
 
     VectorSchemaRoot vsr = response.getRoot();
     try {
         outputSink.feed(vsr);
     } catch (Exception e) {
         ...
+    } finally {
+        if (vsr != null) {
+            vsr.close();
+        }
     }
Suggestion importance[1-10]: 9


Why: This identifies a critical memory leak. The VectorSchemaRoot obtained from response.getRoot() is fed to the sink but never closed in the success path, causing Arrow buffers to accumulate. The suggestion correctly adds a finally block to ensure cleanup, which is essential for preventing resource exhaustion in long-running queries.

High
Check sender state before sending

The send method should verify that the sender has not been closed before attempting
to send data. Without this check, a race condition could occur where close()
acquires the write lock and frees the native pointer while send() is waiting for the
read lock, leading to use-after-free when send() proceeds with an invalid pointer.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionPartitionSender.java [33-40]

 public void send(long arrayAddr, long schemaAddr) {
     lifecycle.readLock().lock();
     try {
+        if (ptr == 0) {
+            throw new IllegalStateException("Sender already closed");
+        }
         NativeBridge.senderSend(getPointer(), arrayAddr, schemaAddr);
     } finally {
         lifecycle.readLock().unlock();
     }
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies a potential race condition where send() could operate on a freed pointer after close(). However, the read-write lock already serializes these operations - send() holds a read lock while close() requires a write lock, preventing overlap. The check adds defensive programming but doesn't fix a critical bug given the existing lock protection.

Medium

@github-actions
Contributor

❌ Gradle check result for 7ea7b0a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@expani
Contributor

expani commented May 10, 2026

Thanks for getting this started @mch2

Always happy to see PRs that have a diff like

+122
-563

- DatafusionReduceSink.feedToSender now calls sender.send() instead of
  NativeBridge.senderSend() directly, so the read-write lock added on
  DatafusionPartitionSender actually protects the hot path (see the
  sketch after this commit message).
- Drop @AwaitsFix on 7 Append/AppendPipe IT methods. The Utf8View
  schema-lie they hit is fixed by coerceToDeclaredSchema in upstream
  opensearch-project#21457 (merged here); all 7 now pass.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
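
The first bullet in the commit message above amounts to routing the per-batch hot path through the locked wrapper. A minimal sketch, with the surrounding DatafusionReduceSink code omitted and only the call-site change shown:

// Illustrative only: sending through the DatafusionPartitionSender wrapper means every
// native send runs under the read lock added in this PR, instead of bypassing it.
void feedToSender(DatafusionPartitionSender sender, long arrayAddr, long schemaAddr) {
    // Before: NativeBridge.senderSend(sender.getPointer(), arrayAddr, schemaAddr);
    sender.send(arrayAddr, schemaAddr);
}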
@github-actions
Contributor

Persistent review updated to latest commit e41c84e
