Enable streaming transport in analytics-engine-rest QA by default#21575

Draft
bowenlan-amzn wants to merge 3 commits into opensearch-project:main from bowenlan-amzn:stream-transport-qa

Conversation

@bowenlan-amzn

Description

Make streaming transport the default in the analytics-engine-rest QA suite so every IT runs with the opensearch.experimental.feature.transport.stream.enabled flag enabled, rather than only the dedicated StreamingCoordinatorReduceIT. This guards against regressions in the Arrow Flight stream transport across the full IT surface instead of a single test.

Note: the single-shard / single-node path short-circuits local transport, so flag-enabled does not guarantee Flight RPC is exercised for every test — it just ensures no regression when it is.

Changes

  • Add systemProperty 'opensearch.experimental.feature.transport.stream.enabled', 'true' to the default testClusters.integTest configuration.
  • Remove the exclude '**/StreamingCoordinatorReduceIT.class' line from integTest so the streaming test runs alongside the rest.
  • Delete the now-redundant integTestStreaming task and its testClusters.integTestStreaming configuration (its coverage is subsumed by the default integTest).
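
As a rough sketch, the first two bullets amount to something like the following in the plugin's build.gradle; the block layout and surrounding configuration are assumed, not copied from the PR:

```groovy
// Hypothetical build.gradle sketch; only the lines described above are shown.
testClusters {
    integTest {
        // run every IT with the streaming transport feature flag on
        systemProperty 'opensearch.experimental.feature.transport.stream.enabled', 'true'
    }
}

integTest {
    // previously: exclude '**/StreamingCoordinatorReduceIT.class'
    // the exclusion is removed so StreamingCoordinatorReduceIT runs here too
}
```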

Known issue

Enabling the flag across the full IT suite may cause a cluster OOM mid-suite. This is being triaged in parallel under stream:qa-oom and is not resolved by this PR. A follow-up may be needed once the root cause is identified.

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented May 9, 2026

❌ Gradle check result for aaa0c63: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

github-actions Bot commented May 9, 2026

PR Reviewer Guide 🔍

(Review updated until commit 30f0c0e)

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Potential Deadlock

If drainEnteredPoll.release() at line 189 throws (e.g., due to thread interruption or semaphore overflow), the drain thread exits without releasing the permit. Meanwhile, closeUnderLock() at line 411 waits up to 30 seconds for that permit. If the drain thread fails before releasing, close will always time out, delaying shutdown by 30 seconds per sender. Consider wrapping the release in a try-finally or handling exceptions that prevent the signal.

drainEnteredPoll.release();
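
The try-finally fix suggested above can be sketched in isolation. The names `drainEnteredPoll` and `streamNext` mirror the PR, but the class and surrounding logic are illustrative only:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch: the drain thread releases its permit in a finally block, so a
// failure in the drain body cannot leave close() waiting out the timeout.
public class DrainSignal {
    final Semaphore drainEnteredPoll = new Semaphore(0);

    // Drain-thread side: signal "entered poll" even if the body throws.
    void drainStep(Runnable streamNext) {
        try {
            streamNext.run();
        } finally {
            drainEnteredPoll.release();
        }
    }

    // close() side: bounded wait for the drain thread's signal.
    boolean awaitDrainEnteredPoll() throws InterruptedException {
        return drainEnteredPoll.tryAcquire(30, TimeUnit.SECONDS);
    }
}
```

With this shape, a throwing drain body still leaves the permit available, so `close()` returns immediately instead of timing out.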
Resource Leak

If copyToVarChar or transferVector throws after adding vectors to newVectors but before the try-catch at line 76 catches it, the loop continues and may add more vectors. The catch block at line 76-79 closes all vectors in newVectors, but if an exception occurs during the close loop itself (e.g., v.close() throws), subsequent vectors remain unclosed. Consider using try-with-resources or a suppressed-exception pattern to ensure all vectors are closed even if closing one fails.

try {
    for (FieldVector original : vsr.getFieldVectors()) {
        FieldVector replacement;
        if (original instanceof ViewVarCharVector view) {
            replacement = copyToVarChar(view.getName(), rowCount, allocator, view::isNull, view::get);
        } else if (original instanceof LargeVarCharVector large) {
            replacement = copyToVarChar(large.getName(), rowCount, allocator, large::isNull, large::get);
        } else {
            // Transfer ownership so the original VSR's close() won't free the buffers.
            replacement = transferVector(original, allocator);
        }
        newVectors.add(replacement);
        newFields.add(replacement.getField());
    }
} catch (RuntimeException e) {
    for (FieldVector v : newVectors)
        v.close();
    throw e;
}
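
The suppressed-exception cleanup recommended above can be sketched against plain `AutoCloseable` (which Arrow's vectors also implement); the `closeAll` helper is hypothetical, not code from the PR:

```java
import java.util.List;

// Sketch: close every resource even when one close() throws, attaching
// later failures to the first as suppressed exceptions so none are lost.
public final class CloseAll {
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable r : resources) {
            try {
                r.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;              // remember the first failure
                } else {
                    first.addSuppressed(e); // keep later ones attached
                }
            }
        }
        if (first != null) {
            throw first;                    // surface only after all closes ran
        }
    }
}
```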
Possible OOM

dst.allocateNew(rowCount) at line 106 allocates memory for rowCount entries without checking available memory or capping the allocation size. If rowCount is very large (e.g., millions of rows with long strings), this can exhaust the allocator's budget and throw an OOM exception. The catch block at line 116 closes dst, but the caller's loop in toUtf8Strings does not handle OOM gracefully — it will propagate up, potentially leaving other vectors in newVectors unclosed (see previous issue). Consider validating rowCount or using chunked allocation if the allocator supports it.

dst.allocateNew(rowCount);
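
A minimal sketch of the rowCount validation suggested above; the per-entry byte estimate and budget parameters are assumptions for illustration, not values from the PR:

```java
// Sketch: fail fast with a clear error before allocateNew(rowCount),
// instead of letting a huge or overflowing request exhaust the allocator.
public final class AllocationGuard {
    static void checkRowCount(long rowCount, long bytesPerEntry, long budgetBytes) {
        if (rowCount < 0) {
            throw new IllegalArgumentException("negative rowCount: " + rowCount);
        }
        final long estimated;
        try {
            estimated = Math.multiplyExact(rowCount, bytesPerEntry);
        } catch (ArithmeticException e) {
            throw new IllegalArgumentException("rowCount * bytesPerEntry overflows", e);
        }
        if (estimated > budgetBytes) {
            throw new IllegalArgumentException(
                "estimated " + estimated + " bytes exceeds budget " + budgetBytes);
        }
    }
}
```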

@github-actions

github-actions Bot commented May 9, 2026

PR Code Suggestions ✨

Latest suggestions up to 30f0c0e

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Prevent resource leak on error

The exception handler only closes vectors in newVectors but doesn't close the
original vsr that was passed in. If an exception occurs during conversion, the
original VSR's resources will leak since the caller expects the method to handle
cleanup.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [76-80]

 } catch (RuntimeException e) {
     for (FieldVector v : newVectors)
         v.close();
+    vsr.close();
     throw e;
 }
Suggestion importance[1-10]: 9


Why: This is a critical resource leak. The method's contract (line 50-52) states the caller must close the returned VSR, but if an exception occurs mid-conversion, the original vsr is never closed and its resources leak. The fix correctly adds vsr.close() in the exception handler.

High
Fix permit draining race condition

Draining permits before closing each sender creates a race condition. If the drain
thread releases a permit between drainPermits() and the subsequent tryAcquire(),
that permit will be consumed even though it represents a poll that occurred before
the sender was closed, defeating the synchronization logic.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReduceSink.java [401-402]

+senders.get(i).close();
 drainEnteredPoll.drainPermits();
-senders.get(i).close();
Suggestion importance[1-10]: 8


Why: The suggestion identifies a subtle race condition. Draining permits before closing the sender means a permit released between drainPermits() and tryAcquire() (line 411) could represent a poll from before the close, breaking the synchronization guarantee. Moving drainPermits() after close() ensures only post-close permits are drained.

Medium
Handle drain timeout as error

The timeout warning proceeds silently after logging, which may lead to data
corruption or incorrect ordering in appendpipe scenarios. Consider throwing an
exception or setting a failure flag to prevent silent data corruption when the drain
thread doesn't respond within the timeout period.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DatafusionReduceSink.java [411-413]

 if (!drainEnteredPoll.tryAcquire(30, TimeUnit.SECONDS)) {
-    logger.warn("[ReduceSink] timed out waiting for branch {} drain; proceeding", i);
+    String msg = String.format("[ReduceSink] timed out waiting for branch %d drain", i);
+    logger.error(msg);
+    throw new IllegalStateException(msg);
 }
Suggestion importance[1-10]: 7


Why: The suggestion correctly identifies that a timeout could lead to ordering issues in appendpipe scenarios. However, throwing an exception may be too aggressive since the code already has error accumulation logic. The current approach of logging and proceeding may be intentional for graceful degradation.

Medium

Previous suggestions

Suggestions up to commit 601f5c8
Category | Suggestion | Impact
Possible issue
Prevent resource leak on exception

The exception handler should close the original VSR to prevent resource leaks when
an error occurs during vector conversion. Currently, if an exception is thrown, the
original vsr remains open while the method exits.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [76-80]

 } catch (RuntimeException e) {
     for (FieldVector v : newVectors)
         v.close();
+    vsr.close();
     throw e;
 }
Suggestion importance[1-10]: 9


Why: Critical resource leak fix. When an exception occurs during vector conversion, the original vsr must be closed to prevent memory leaks. The suggestion correctly identifies that vsr.close() should be called in the catch block before re-throwing.

High
General
Validate field type before conversion

The check should also verify the field type to ensure it's actually a string column.
Non-string vectors could theoretically be instances of these classes but represent
different data types, leading to incorrect normalization logic.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [87-94]

 private static boolean hasNonUtf8StringColumn(VectorSchemaRoot vsr) {
     for (FieldVector v : vsr.getFieldVectors()) {
-        if (v instanceof ViewVarCharVector || v instanceof LargeVarCharVector) {
+        ArrowType type = v.getField().getType();
+        boolean isStringType = type instanceof ArrowType.Utf8 || type instanceof ArrowType.Utf8View || type instanceof ArrowType.LargeUtf8;
+        if (isStringType && (v instanceof ViewVarCharVector || v instanceof LargeVarCharVector)) {
             return true;
         }
     }
     return false;
 }
Suggestion importance[1-10]: 3


Why: While theoretically more defensive, this check is likely unnecessary in practice. ViewVarCharVector and LargeVarCharVector are specifically string vector implementations, so the instanceof check already ensures they are string columns. The added complexity may not justify the marginal safety improvement.

Low
Suggestions up to commit a6a4178
Category | Suggestion | Impact
General
Prevent resource leak on constructor failure

If the VectorSchemaRoot constructor throws an exception, the newVectors will leak
since they won't be closed. Wrap the constructor call in a try-catch block to ensure
proper cleanup of allocated resources.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [82-84]

-VectorSchemaRoot normalized = new VectorSchemaRoot(new Schema(newFields), newVectors, rowCount);
+VectorSchemaRoot normalized;
+try {
+    normalized = new VectorSchemaRoot(new Schema(newFields), newVectors, rowCount);
+} catch (RuntimeException e) {
+    for (FieldVector v : newVectors) {
+        v.close();
+    }
+    throw e;
+}
 vsr.close();
 return normalized;
Suggestion importance[1-10]: 7


Why: This addresses a potential resource leak if VectorSchemaRoot constructor throws an exception. The newVectors would not be closed, causing a memory leak. This is a valid concern for resource management in production code.

Medium
Add braces to for-loop

Add braces around the single-statement for-loop in the catch block to prevent
potential bugs if additional statements are added later. This follows Java best
practices and improves code maintainability.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [76-79]

 } catch (RuntimeException e) {
-    for (FieldVector v : newVectors)
+    for (FieldVector v : newVectors) {
         v.close();
+    }
     throw e;
 }
Suggestion importance[1-10]: 4


Why: While adding braces improves code style and maintainability, the existing code is functionally correct. This is a minor style improvement that doesn't address any bugs or critical issues.

Low
Suggestions up to commit 9ea752c
Category | Suggestion | Impact
Possible issue
Prevent resource leak on construction failure

If an exception occurs during VectorSchemaRoot construction, the newVectors will
leak because vsr.close() is never called. Wrap the construction and close in a
try-catch to ensure cleanup on failure.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/VsrNormalizer.java [81-83]

-VectorSchemaRoot normalized = new VectorSchemaRoot(new Schema(newFields), newVectors, rowCount);
-vsr.close();
-return normalized;
+try {
+    VectorSchemaRoot normalized = new VectorSchemaRoot(new Schema(newFields), newVectors, rowCount);
+    vsr.close();
+    return normalized;
+} catch (RuntimeException e) {
+    for (FieldVector v : newVectors) v.close();
+    vsr.close();
+    throw e;
+}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential resource leak if VectorSchemaRoot construction fails. However, the newVectors are already protected by a try-catch block earlier (lines 62-79), so this is a secondary safeguard. The impact is moderate since construction failure is less common than vector creation failure.

Medium
Prevent VSR leak on normalization failure

If VsrNormalizer.toUtf8Strings() throws an exception, the original vsr may leak if
it was replaced but not properly closed. Ensure the VSR is closed in the existing
error handling path or wrap normalization in try-catch.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/stage/ShardFragmentStageExecution.java [125-132]

 VectorSchemaRoot vsr = toVsr.apply(response);
-...
-vsr = VsrNormalizer.toUtf8Strings(vsr, config.bufferAllocator());
-outputSink.feed(vsr);
+try {
+    ...
+    vsr = VsrNormalizer.toUtf8Strings(vsr, config.bufferAllocator());
+    outputSink.feed(vsr);
+} catch (RuntimeException e) {
+    vsr.close();
+    throw e;
+}
Suggestion importance[1-10]: 6

__

Why: The suggestion addresses a potential resource leak if VsrNormalizer.toUtf8Strings() throws an exception. However, examining the normalizer code shows it already handles cleanup internally (line 82 closes vsr on success, and line 77 cleans up newVectors on failure). The suggestion adds defensive programming but may be redundant given the normalizer's internal error handling.

Low

@github-actions

github-actions Bot commented May 9, 2026

❌ Gradle check result for 9ea752c: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@bowenlan-amzn force-pushed the stream-transport-qa branch from 9ea752c to a6a4178 (May 10, 2026 04:31)
@github-actions

Persistent review updated to latest commit a6a4178

@bowenlan-amzn force-pushed the stream-transport-qa branch from a6a4178 to 601f5c8 (May 10, 2026 05:18)
@github-actions

Persistent review updated to latest commit 601f5c8

@github-actions

❌ Gradle check result for 601f5c8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

…gTest

Make streaming transport the default in the analytics-engine-rest QA
suite so every IT runs with the streaming flag enabled, rather than
only the one dedicated test.

- Add `opensearch.experimental.feature.transport.stream.enabled=true`
  as a systemProperty on the default integTest testCluster.
- Remove the `**/StreamingCoordinatorReduceIT.class` exclusion from
  integTest so the streaming test runs alongside the rest.
- Drop the now-redundant `integTestStreaming` task and its dedicated
  testClusters.integTestStreaming config.

Note: the single-shard / single-node path short-circuits local
transport, so enabling the flag does not guarantee Flight RPC is
exercised for every test — it just ensures no regression when it is.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
DataFusion physical plans promote string columns to Utf8View inside
aggregations (group-by keys, hash tables). Under the streaming transport,
the shard ships that VSR straight to the coordinator via Arrow C Data +
Flight; the reducer's partition stream is declared Utf8 (from Calcite),
so a Utf8View batch reinterprets as broken Utf8 and the downstream read
either OOMs the heap (multi-GB alleged length on VarCharVector.get) or
throws NegativeArraySizeException. The non-streaming path doesn't see
this because RowResponseCodec round-trips values through Object[] and
rebuilds a fresh Utf8 VSR on the coordinator.

Fix on the streaming receive path: normalize each inbound VSR so every
variable-width string column is a VarCharVector (Utf8) before handing
it to the reducer sink. Other vector types transfer through unchanged.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
…ch ordering

appendpipe lowers to UnionExec with CoalescePartitionsExec, which polls
both partitions concurrently and emits whichever completes first. Under
streaming transport, tokio scheduling can flip the winner causing branch
1 output to precede branch 0 — breaking appendpipe semantics.

Fix: close senders sequentially with a semaphore-based synchronization
point between each. After closing sender-i, wait for the drain thread to
re-enter streamNext (proving all branch-i output has been consumed)
before closing sender-(i+1). Single-sender cases degenerate to one close
with no wait.

Known limitation: a TOCTOU race exists where tryAcquire can grab the
permit from release() before streamNext is actually entered. This needs
a follow-up fix but significantly reduces the failure window.

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
@bowenlan-amzn force-pushed the stream-transport-qa branch from 601f5c8 to 30f0c0e (May 10, 2026 19:26)
@github-actions

Persistent review updated to latest commit 30f0c0e

@github-actions

❌ Gradle check result for 30f0c0e: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
