Batching for persistent task cluster service which will help C… #21245

Open
mohit10011999 wants to merge 8 commits into opensearch-project:main from mohit10011999:PersistentTasksClusterService

Conversation

@mohit10011999
Contributor

mohit10011999 commented Apr 16, 2026

…CR plugin from overwhelming cluster manager during high volume of incoming replication tasks

Description

  1. Added PersistentTaskUpdateEntry class with OperationType enum covering all five operations: CREATE, UPDATE_STATE, COMPLETE, REMOVE, UNASSIGN
  2. Added PersistentTaskUpdateExecutor - a singleton ClusterStateTaskExecutor that batches all persistent task operations into a single cluster state publish cycle (see the sketch after this list)
  3. Converted all five public methods (createPersistentTask, completePersistentTask, removePersistentTask, updatePersistentTaskState, unassignPersistentTask) from individual ClusterStateUpdateTask submissions to batched submitPersistentTaskUpdate calls
  4. Removed separate throttling key registrations for CREATE_PERSISTENT_TASK, FINISH_PERSISTENT_TASK, and REMOVE_PERSISTENT_TASK - all operations now share the single UPDATE_TASK_STATE throttling key
  5. Preserved periodicRechecker scheduling logic for CREATE operations when a task ends up unassigned
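For orientation, here is a minimal sketch of the entry type from items 1 and 2. It is illustrative only: the field names, constructor shape, and import paths are assumptions inferred from the description above, not the exact code in this PR.

import org.opensearch.core.action.ActionListener; // import path varies across OpenSearch versions
import org.opensearch.persistent.PersistentTasksCustomMetadata.PersistentTask;

// Hypothetical sketch of PersistentTaskUpdateEntry: one immutable record per
// requested operation, carrying everything the batched executor needs.
final class PersistentTaskUpdateEntry {
    enum OperationType { CREATE, UPDATE_STATE, COMPLETE, REMOVE, UNASSIGN }

    final OperationType operationType;
    final String taskId;                               // persistent task id
    final long allocationId;                           // lets the executor reject stale updates
    final Object payload;                              // task params or new state, depending on the operation
    final ActionListener<PersistentTask<?>> listener;  // per-entry completion callback

    PersistentTaskUpdateEntry(
        OperationType operationType,
        String taskId,
        long allocationId,
        Object payload,
        ActionListener<PersistentTask<?>> listener
    ) {
        this.operationType = operationType;
        this.taskId = taskId;
        this.allocationId = allocationId;
        this.payload = payload;
        this.listener = listener;
    }
}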

Code flow:

Step 1: Data node triggers a persistent task state change
Step 2: Transport action on cluster manager receives the request
Step 3: PersistentTasksClusterService creates an entry and submits it (the change in this PR; sketched below)
Step 4: ClusterManagerService delegates to TaskBatcher
Step 5: ClusterManagerTaskThrottler.onBeginSubmit checks the threshold
Step 6: TaskBatcher accumulates tasks
Step 7: TaskBatcher.runIfNotProcessed drains all pending tasks
Step 8: ClusterManagerService.Batcher.run calls the executor
Step 9: PersistentTaskUpdateExecutor.execute processes the batch (sketched just after this list)
Step 10: ClusterManagerService publishes the new cluster state
Step 11: Listeners are called back
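Step 9 is where batching pays off. Below is a minimal sketch of it, assuming the standard ClusterStateTaskExecutor contract: TaskBatcher hands every drained entry to a single execute() call, the executor folds them into one new ClusterState, and that state is published once. The applyEntry helper is hypothetical; the actual PR applies each OperationType through a PersistentTasksCustomMetadata builder.

import java.util.List;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateTaskExecutor;
import org.opensearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;

// Illustrative only: a batching executor that processes all pending entries
// in one pass and yields a single resulting cluster state.
class PersistentTaskUpdateExecutor implements ClusterStateTaskExecutor<PersistentTaskUpdateEntry> {
    @Override
    public ClusterTasksResult<PersistentTaskUpdateEntry> execute(
        ClusterState currentState,
        List<PersistentTaskUpdateEntry> entries
    ) {
        ClusterTasksResult.Builder<PersistentTaskUpdateEntry> results = ClusterTasksResult.builder();
        ClusterState state = currentState;
        for (PersistentTaskUpdateEntry entry : entries) {
            try {
                // Fold each operation into the evolving state so that later
                // entries observe the effects of earlier ones (compare the
                // "Prevent lost updates in batch" suggestion below).
                state = applyEntry(state, entry);
                results.success(entry);
            } catch (Exception e) {
                results.failure(entry, e);
            }
        }
        return results.build(state); // one resulting state -> one publish cycle for the whole batch
    }

    private ClusterState applyEntry(ClusterState state, PersistentTaskUpdateEntry entry) {
        // Hypothetical helper: rebuild PersistentTasksCustomMetadata from `state`,
        // apply entry.operationType (CREATE, UPDATE_STATE, COMPLETE, REMOVE, or
        // UNASSIGN), and return the updated state. Elided in this sketch.
        return state;
    }
}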

Before vs After example:

Before: 460 requests → 460 separate ClusterStateUpdateTask instances → 460 distinct batching keys → 460 separate execute() calls → 460 cluster state publish cycles.

After: 460 requests → 460 PersistentTaskUpdateEntry objects → 1 batching key (singleton executor) → 1 execute() call processing all 460 → 1 cluster state publish cycle.
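To make the "1 batching key" property concrete, here is a hedged sketch of the shared submission path from Step 3: every public method funnels its entry through the same singleton executor, and because the cluster manager's TaskBatcher groups tasks by executor instance, all pending entries land in one batch. The names clusterService, updateTaskStateExecutor, and submitPersistentTaskUpdate follow the PR description; everything else is an assumption.

import org.opensearch.cluster.ClusterStateTaskConfig;
import org.opensearch.cluster.ClusterStateTaskListener;
import org.opensearch.common.Priority;

// Illustrative fragment, assumed to live inside PersistentTasksClusterService:
// N submissions between two publishes collapse into one execute() call
// because they share updateTaskStateExecutor as their batching key.
private void submitPersistentTaskUpdate(String source, PersistentTaskUpdateEntry entry) {
    clusterService.submitStateUpdateTask(
        source,
        entry,
        ClusterStateTaskConfig.build(Priority.NORMAL),
        updateTaskStateExecutor, // shared singleton executor
        new ClusterStateTaskListener() {
            @Override
            public void onFailure(String source, Exception e) {
                entry.listener.onFailure(e);
            }
            // A clusterStateProcessed override would notify entry.listener on
            // success; elided here (the review comments below discuss which
            // state it should use for the lookup).
        }
    );
}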

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

mohit10011999 requested a review from a team as a code owner on April 16, 2026 04:56
@github-actions
Contributor

github-actions Bot commented Apr 16, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 8ccb182.

Path | Line | Severity | Description
server/src/main/java/org/opensearch/cluster/service/ClusterManagerTask.java | 40 | low | UPDATE_TASK_STATE throttling threshold increased 6x, from 50 to 300. While consistent with the batching refactor, this allows significantly more cluster-state update operations before throttling engages. Should be verified against intended capacity planning.
server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java | 110 | low | Three previously independent throttling keys (createPersistentTaskKey, finishPersistentTaskKey, removePersistentTaskKey, at 50 ops each) are removed, and all operation types (CREATE, COMPLETE, REMOVE, UPDATE, UNASSIGN) now share a single throttling key (updatePersistentTaskKey at 300). This collapses per-operation-type throttling into a shared pool, which could allow bursts of task creation or removal that were previously individually rate-limited.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2


Pull Request Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Contributor

github-actions Bot commented Apr 16, 2026

PR Reviewer Guide 🔍

(Review updated until commit 8ccb182)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible NPE

In the COMPLETE case at line 228, when a task exists but has a different allocation ID, the code logs a warning that accesses existingTask.getTaskName(). If getTaskWithId returns null, this will throw an NPE. The null check at line 227 only guards the else branch, not the warning log at line 229.

if (tasksBuilder.hasTask(entry.taskId)) {
    PersistentTask<?> existingTask = PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId);
    logger.warn(
        "The task [{}] with id [{}] has a different allocation id [{}], status is not updated",
        existingTask != null ? existingTask.getTaskName() : "unknown",
        entry.taskId,
        entry.allocationId
    );

Incorrect State Lookup

At lines 400-401, the logic uses oldState for COMPLETE and REMOVE operations to retrieve the task for the listener callback. However, for REMOVE operations that succeed, the task still exists in newState until the cluster state is published. Using oldState here means the listener receives a stale task object that doesn't reflect any state changes made by other entries in the same batch. For COMPLETE, this is correct since the task is removed. For REMOVE, it should use newState.

ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
    || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? oldState : newState;
PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId);

@github-actions
Contributor

github-actions Bot commented Apr 16, 2026

PR Code Suggestions ✨

Latest suggestions up to 8ccb182

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Prevent lost updates in batch

The executor rebuilds the metadata builder from scratch for each entry in the batch,
which can lead to lost updates when multiple entries modify the same task. If entry
A updates task X and entry B also updates task X, entry B's builder won't see entry
A's changes because it's built from the original state, not the intermediate state
after A.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [187-267]

+PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state);
 for (PersistentTaskUpdateEntry entry : tasks) {
     try {
-        PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state);
         switch (entry.operationType) {
             case CREATE:
                 ...
             case UPDATE_STATE:
                 ...
             case COMPLETE:
                 ...
             case REMOVE:
                 ...
             case UNASSIGN:
                 ...
         }
+        state = update(state, tasksBuilder);
         resultBuilder.success(entry);
     } catch (Exception e) {
         resultBuilder.failure(entry, e);
     }
 }
Suggestion importance[1-10]: 10


Why: This is a critical bug. Creating a new tasksBuilder from state inside the loop means each entry operates on the original state, not the accumulated changes. This causes lost updates when multiple entries modify the same or different tasks in a batch. The fix correctly moves the builder outside the loop and updates state after each successful operation.

High

Previous suggestions

Suggestions up to commit 03654bd
Suggestions up to commit 3721d13
Category | Suggestion | Impact
Possible issue
Removed throttling keys break operation throttling

All operation types (CREATE, COMPLETE, REMOVE, UNASSIGN, UPDATE_STATE) now share the
single updatePersistentTaskKey throttling key, which was previously registered only
for UPDATE_TASK_STATE with a threshold of 300. The CREATE, COMPLETE, and REMOVE
operations previously had their own throttling keys registered with
CREATE_PERSISTENT_TASK, FINISH_PERSISTENT_TASK, and REMOVE_PERSISTENT_TASK. By
routing all operations through updatePersistentTaskKey, the throttling semantics for
those operations have changed silently, and the other three throttling keys are no
longer registered, which may cause issues if other parts of the system still
reference them.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [272-275]

-@Override
-public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() {
-    return updatePersistentTaskKey;
-}
+// Consider registering a dedicated throttling key for the unified executor,
+// or re-registering the removed keys to avoid breaking throttling for CREATE/FINISH/REMOVE operations.
+// Example: use a new ClusterManagerTask enum value for the batched executor.
Suggestion importance[1-10]: 5


Why: The PR intentionally consolidates all persistent task operations under a single updatePersistentTaskKey throttling key (with threshold 300), removing the separate createPersistentTaskKey, finishPersistentTaskKey, and removePersistentTaskKey. This is a valid design concern about changed throttling semantics, but the improved_code is just a comment rather than actual code, limiting its actionability.

Low
Batched entries' listeners not all invoked

The clusterStateProcessed callback in submitPersistentTaskUpdate is called once per
submitted entry, but since batching merges multiple entries into one cluster state
update, this single listener only handles the entry that was passed to
submitStateUpdateTask. The per-entry success callbacks should instead be invoked
from within PersistentTaskUpdateExecutor (e.g., via a ClusterTasksResult success
handler), not from a single outer ClusterStateTaskListener, to ensure each batched
entry's listener is called correctly. Otherwise, only the listener for the
last-submitted entry will be invoked via clusterStateProcessed.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [398-403]

-@Override
-public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-    // For COMPLETE/REMOVE use oldState since the task is gone in newState
-    ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
-        || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? oldState : newState;
-    PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId);
-    entry.listener.onResponse(task);
+// Per-entry callbacks should be handled inside PersistentTaskUpdateExecutor.execute()
+// by calling entry.listener directly after each successful state update,
+// rather than relying on a single ClusterStateTaskListener for all batched entries.
Suggestion importance[1-10]: 2


Why: This suggestion misunderstands how ClusterStateTaskListener works in OpenSearch's batching framework. The submitStateUpdateTask API with a ClusterStateTaskExecutor calls clusterStateProcessed once per submitted task entry (not once for all batched entries), so each entry's listener is correctly invoked. The improved_code doesn't actually show valid code changes, making this suggestion inaccurate.

Low
General
Unnecessary listener wrapping adds indirection

The original completePersistentTask used oldState to look up the task for the
response (since the task is removed in newState), but the new clusterStateProcessed
in submitPersistentTaskUpdate already handles this via stateForLookup. However,
wrapping the listener with ActionListener.wrap(listener::onResponse,
listener::onFailure) is redundant and adds an unnecessary indirection — just pass
listener directly, as is done for other operations like removePersistentTask.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [334-343]

 submitPersistentTaskUpdate(
     failure != null ? "finish persistent task (failed)" : "finish persistent task (success)",
     new PersistentTaskUpdateEntry(
         PersistentTaskUpdateEntry.OperationType.COMPLETE,
         id,
         allocationId,
         null,
-        ActionListener.wrap(listener::onResponse, listener::onFailure)
+        listener
     )
 );
Suggestion importance[1-10]: 4


Why: The ActionListener.wrap(listener::onResponse, listener::onFailure) is indeed redundant since it just delegates to the same methods on listener. Passing listener directly is cleaner and consistent with other operations like removePersistentTask. The improved_code accurately reflects the suggested change.

Low
Suggestions up to commit 816c60a
Category | Suggestion | Impact
Possible issue
Batched entries miss listener callbacks

The clusterStateProcessed callback in submitPersistentTaskUpdate is called once per
batch submission, but when tasks are batched, entry refers only to the single entry
passed to submitPersistentTaskUpdate. However, the ClusterStateTaskListener is
shared across all batched entries — only the listener for the specific entry
captured in the closure is invoked. This means that when multiple entries are
batched into one execute() call, the clusterStateProcessed callback only fires for
the last submitted entry's listener, not for all entries in the batch. The per-entry
success/failure callbacks should be invoked inside the executor's execute() method
or via a dedicated per-task listener mechanism rather than relying on the single
ClusterStateTaskListener.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [398-404]

-@Override
-public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-    // For COMPLETE/REMOVE use oldState since the task is gone in newState
-    ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
-        || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? oldState : newState;
-    PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId);
-    entry.listener.onResponse(task);
+// Per-entry success notification should be handled inside PersistentTaskUpdateExecutor.execute()
+// by calling entry.listener.onResponse(...) directly after resultBuilder.success(entry),
+// rather than relying on the shared ClusterStateTaskListener.clusterStateProcessed callback.
+resultBuilder.success(entry);
+// Notify listener inline:
+ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
+    || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? currentState : state;
+entry.listener.onResponse(PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId));
Suggestion importance[1-10]: 7


Why: This raises a valid architectural concern: when multiple entries are batched into one execute() call, the clusterStateProcessed callback in submitPersistentTaskUpdate only fires for the single entry captured in the closure, potentially missing callbacks for other batched entries. However, the improved_code doesn't accurately reflect a complete fix and mixes executor logic with listener logic incorrectly.

Medium
Missing throttling keys for non-update operations

The PR removes throttling key registrations for CREATE_PERSISTENT_TASK,
FINISH_PERSISTENT_TASK, and REMOVE_PERSISTENT_TASK, but all operations (CREATE,
COMPLETE, REMOVE, UNASSIGN, UPDATE_STATE) now share the single
updatePersistentTaskKey which maps to UPDATE_TASK_STATE. This means CREATE,
COMPLETE, REMOVE, and UNASSIGN operations are no longer throttled under their own
keys and are instead counted against the update-task-state throttle bucket, which
may cause incorrect throttling behavior or bypass throttling for those operation
types entirely.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [113]

+createPersistentTaskKey = clusterService.registerClusterManagerTask(CREATE_PERSISTENT_TASK, true);
+finishPersistentTaskKey = clusterService.registerClusterManagerTask(FINISH_PERSISTENT_TASK, true);
+removePersistentTaskKey = clusterService.registerClusterManagerTask(REMOVE_PERSISTENT_TASK, true);
 updatePersistentTaskKey = clusterService.registerClusterManagerTask(UPDATE_TASK_STATE, true);
Suggestion importance[1-10]: 5


Why: The suggestion raises a valid concern that CREATE, COMPLETE, REMOVE, and UNASSIGN operations now share the update-task-state throttle bucket. However, the PR intentionally consolidates all operations under a single batching executor with one throttling key, which is the design goal. Reverting to separate keys would undermine the batching approach.

Low
General
Unnecessary listener wrapping obscures callback issues

For completePersistentTask, the original code used oldState to look up the task for
the listener response (since the task is removed in newState). The new batched path
wraps the listener with ActionListener.wrap(listener::onResponse,
listener::onFailure) and relies on clusterStateProcessed to call
entry.listener.onResponse(task) using oldState. However, the clusterStateProcessed
callback only fires for the single entry captured in the submitPersistentTaskUpdate
closure — if this entry is batched with others, the callback may not fire at all for
this entry. Pass the original listener directly instead of wrapping it, since the
wrapping adds no value and may obscure the issue.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [334-343]

 submitPersistentTaskUpdate(
     failure != null ? "finish persistent task (failed)" : "finish persistent task (success)",
     new PersistentTaskUpdateEntry(
         PersistentTaskUpdateEntry.OperationType.COMPLETE,
         id,
         allocationId,
         null,
-        ActionListener.wrap(listener::onResponse, listener::onFailure)
+        listener
     )
 );
Suggestion importance[1-10]: 3


Why: Removing the ActionListener.wrap is a minor cleanup since wrapping listener::onResponse and listener::onFailure adds no functional value. However, this is a cosmetic change and the core concern about batched callback firing is better addressed in suggestion 1.

Low
Verify builder uses latest batch state

In the COMPLETE case, PersistentTasksCustomMetadata.getTaskWithId(state,
entry.taskId) looks up the task in state (the evolving batch state), but
tasksBuilder was constructed from state at the start of this iteration. If a prior
batch entry already removed or modified the task, state reflects those changes, so
getTaskWithId correctly uses the current state. However, tasksBuilder is re-created
from state at the top of each loop iteration (builder(state)), so this is
consistent. The real issue is that tasksBuilder is rebuilt each iteration but the
state variable used for getTaskWithId is the pre-update state for this iteration —
this is correct. No change needed here, but verify that builder(state) is called at
the top of each loop iteration to ensure tasksBuilder always reflects the latest
state.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [222-235]

-case COMPLETE:
-    if (tasksBuilder.hasTask(entry.taskId, entry.allocationId)) {
-        tasksBuilder.removeTask(entry.taskId);
-        state = update(state, tasksBuilder);
-    } else {
-        if (tasksBuilder.hasTask(entry.taskId)) {
-            PersistentTask<?> existingTask = PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId);
-            logger.warn(
-                "The task [{}] with id [{}] has a different allocation id [{}], status is not updated",
-                existingTask != null ? existingTask.getTaskName() : "unknown",
-                entry.taskId,
-                entry.allocationId
-            );
+for (PersistentTaskUpdateEntry entry : tasks) {
+    try {
+        PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state); // already correct
+        switch (entry.operationType) {
+            ...
Suggestion importance[1-10]: 2


Why: This suggestion asks to verify existing behavior rather than proposing a concrete fix, and the improved_code is essentially the same as the existing code. The concern about builder(state) being called at the top of each iteration is already correctly implemented in the PR.

Low
Suggestions up to commit 5dc70a0
Category | Suggestion | Impact
Possible issue
Avoid duplicate listener callbacks in batched execution

The clusterStateProcessed callback in submitPersistentTaskUpdate is called once per
batch submission, but when tasks are batched, entry refers only to the single entry
submitted by this call. However, the ClusterStateTaskListener is shared across all
entries in the batch via the executor's execute() method — each entry's
success/failure is handled by resultBuilder.success(entry) /
resultBuilder.failure(entry, e), which invokes the per-entry listener. The outer
ClusterStateTaskListener passed to submitStateUpdateTask is only called for the
overall batch result, meaning clusterStateProcessed here may be called for every
submitted entry individually, potentially causing duplicate onResponse calls. Verify
that the ClusterStateTaskListener callbacks are not invoked redundantly alongside
the per-entry success callbacks in the executor.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [398-403]

-@Override
-public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-    // For COMPLETE/REMOVE use oldState since the task is gone in newState
-    ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
-        || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? oldState : newState;
-    PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId);
-    entry.listener.onResponse(task);
+// Consider removing the clusterStateProcessed callback from the outer ClusterStateTaskListener
+// and handling all per-entry responses exclusively within the executor's execute() method
+// via resultBuilder.success(entry) to avoid potential duplicate listener invocations.
Suggestion importance[1-10]: 6


Why: This raises a valid concern about potential duplicate onResponse calls when both the per-entry resultBuilder.success(entry) and the outer clusterStateProcessed callback invoke the listener. However, the suggestion's improved_code is just a comment rather than actual code, making it hard to evaluate the fix concretely.

Low
General
Consolidating throttling keys may bypass per-operation limits

The PR removes throttling keys for CREATE_PERSISTENT_TASK, FINISH_PERSISTENT_TASK,
and REMOVE_PERSISTENT_TASK, but all operations (CREATE, COMPLETE, REMOVE, UNASSIGN,
UPDATE_STATE) are now routed through a single executor that uses only
updatePersistentTaskKey (mapped to UPDATE_TASK_STATE with threshold 300). This means
CREATE, COMPLETE, REMOVE, and UNASSIGN operations no longer have their own
throttling keys and will all share the UPDATE_TASK_STATE throttling bucket. This
could allow significantly more CREATE/REMOVE/COMPLETE operations than previously
permitted (50 each), potentially overwhelming the cluster manager. Ensure this
consolidation is intentional and the threshold of 300 accounts for all operation
types combined.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [112-114]

-// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
+// If separate throttling per operation type is needed, register individual keys:
+// createPersistentTaskKey = clusterService.registerClusterManagerTask(CREATE_PERSISTENT_TASK, true);
+// finishPersistentTaskKey = clusterService.registerClusterManagerTask(FINISH_PERSISTENT_TASK, true);
+// removePersistentTaskKey = clusterService.registerClusterManagerTask(REMOVE_PERSISTENT_TASK, true);
 updatePersistentTaskKey = clusterService.registerClusterManagerTask(UPDATE_TASK_STATE, true);
 this.updateTaskStateExecutor = new PersistentTaskUpdateExecutor();
Suggestion importance[1-10]: 5


Why: This is a legitimate design concern — consolidating CREATE, COMPLETE, REMOVE, and UNASSIGN operations under a single UPDATE_TASK_STATE throttling key with threshold 300 changes the throttling semantics significantly. However, the improved_code just adds comments suggesting reverting to separate keys, which contradicts the PR's intent of batching all operations together.

Low
Remove unnecessary listener wrapping

The original completePersistentTask used oldState for the listener response (since
the task is removed in newState), but the new batched path's clusterStateProcessed
callback handles this via stateForLookup. However, the
ActionListener.wrap(listener::onResponse, listener::onFailure) wrapping is
unnecessary — it just delegates to the same listener. More importantly, the original
code returned PersistentTasksCustomMetadata.getTaskWithId(oldState, id) which could
return the task that was just removed, but the new clusterStateProcessed uses
oldState for COMPLETE which is correct. Verify the oldState used in
clusterStateProcessed is the state before the entire batch executes, not just before
this specific entry, as other entries in the same batch may have already modified
the state.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [330-343]

-public void completePersistentTask(String id, long allocationId, Exception failure, ActionListener<PersistentTask<?>> listener) {
-    if (failure != null) {
-        logger.warn("persistent task " + id + " failed", failure);
-    }
-    submitPersistentTaskUpdate(
-        failure != null ? "finish persistent task (failed)" : "finish persistent task (success)",
-        new PersistentTaskUpdateEntry(
-            PersistentTaskUpdateEntry.OperationType.COMPLETE,
-            id,
-            allocationId,
-            null,
-            ActionListener.wrap(listener::onResponse, listener::onFailure)
-        )
-    );
-}
+submitPersistentTaskUpdate(
+    failure != null ? "finish persistent task (failed)" : "finish persistent task (success)",
+    new PersistentTaskUpdateEntry(
+        PersistentTaskUpdateEntry.OperationType.COMPLETE,
+        id,
+        allocationId,
+        null,
+        listener
+    )
+);
Suggestion importance[1-10]: 3


Why: The suggestion to remove ActionListener.wrap(listener::onResponse, listener::onFailure) and pass listener directly is a minor cleanup. The wrapping is functionally equivalent but adds unnecessary indirection. The concern about oldState in batched context is valid but not addressed by the proposed code change.

Low
Verify builder reflects accumulated batch state

In the COMPLETE case, PersistentTasksCustomMetadata.getTaskWithId(state,
entry.taskId) uses the accumulated state variable (not currentState), which reflects
mutations from prior entries in the same batch. If a prior entry already removed
this task, tasksBuilder.hasTask(entry.taskId) would be false and execution falls to
the outer else branch — but if a prior entry modified the task in a way that
hasTask(entry.taskId) is still true but getTaskWithId(state, entry.taskId) returns
null due to state inconsistency, the null-check guard existingTask != null ?
existingTask.getTaskName() : "unknown" is correct. However, tasksBuilder is rebuilt
fresh each iteration via builder(state), so this should be safe. Confirm that
builder(state) always reflects the latest accumulated state to avoid stale builder
references.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [189-192]

-case COMPLETE:
-    if (tasksBuilder.hasTask(entry.taskId, entry.allocationId)) {
-        tasksBuilder.removeTask(entry.taskId);
-        state = update(state, tasksBuilder);
-    } else {
-        if (tasksBuilder.hasTask(entry.taskId)) {
-            PersistentTask<?> existingTask = PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId);
-            logger.warn(
-                "The task [{}] with id [{}] has a different allocation id [{}], status is not updated",
-                existingTask != null ? existingTask.getTaskName() : "unknown",
-                entry.taskId,
-                entry.allocationId
-            );
+PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state);
+switch (entry.operationType) {
+    // ... (no change needed if builder(state) correctly reflects accumulated state)
Suggestion importance[1-10]: 2


Why: This is more of a verification request than an actionable fix. The improved_code is essentially the same as the existing code with a comment, and the concern about stale builder references is already handled correctly since builder(state) is called at the start of each loop iteration.

Low
Suggestions up to commit b0baad9
Category | Suggestion | Impact
Possible issue
Use correct allocation ID in completion test

The test uses tasks.getLastAllocationId() - 1 as the allocation ID for taskId1, but
tasks.getLastAllocationId() returns the last allocation ID assigned (which belongs
to taskId2 since it was added second). The allocation ID for taskId1 should be
tasks.getLastAllocationId() - 1, which may or may not be correct depending on how
allocation IDs are assigned. This could cause the COMPLETE operation to fail with a
ResourceNotFoundException if the allocation IDs don't match, making the test
unreliable. Use the actual allocation ID from the built tasks metadata instead.

server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java [1035-1039]

-service.completePersistentTask(taskId1, tasks.getLastAllocationId() - 1, null, ActionListener.wrap(task -> {
+PersistentTasksCustomMetadata builtTasks = tasks.build();
+service.completePersistentTask(taskId1, builtTasks.getTask(taskId1).getAllocationId(), null, ActionListener.wrap(task -> {
     assertThat(task, is(notNullValue()));
     assertThat(task.getId(), equalTo(taskId1));
     completed1.set(true);
 }, e -> fail()));
Suggestion importance[1-10]: 7


Why: Using tasks.getLastAllocationId() - 1 as the allocation ID for taskId1 is fragile and assumes sequential allocation IDs. The suggestion to use builtTasks.getTask(taskId1).getAllocationId() is more robust and directly retrieves the correct allocation ID, preventing potential test failures.

Medium
Prevent potential NullPointerException in task lookup

In the COMPLETE case, PersistentTasksCustomMetadata.getTaskWithId(state,
entry.taskId) is called to get the task name for the warning log, but state here is
the evolving local variable (not currentState). If a prior entry in the same batch
already removed or modified this task, getTaskWithId could return null, causing a
NullPointerException when calling .getTaskName(). Add a null check before accessing
getTaskName().

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [222-233]

 case COMPLETE:
     if (tasksBuilder.hasTask(entry.taskId, entry.allocationId)) {
         tasksBuilder.removeTask(entry.taskId);
         state = update(state, tasksBuilder);
     } else {
         if (tasksBuilder.hasTask(entry.taskId)) {
+            PersistentTask<?> existingTask = PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId);
             logger.warn(
                 "The task [{}] with id [{}] has a different allocation id [{}], status is not updated",
-                PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId).getTaskName(),
+                existingTask != null ? existingTask.getTaskName() : "unknown",
                 entry.taskId,
                 entry.allocationId
             );
Suggestion importance[1-10]: 6


Why: In the COMPLETE case, PersistentTasksCustomMetadata.getTaskWithId(state, entry.taskId) could return null if a prior batch entry already removed the task, causing a NullPointerException on .getTaskName(). The fix adds a null check which is a valid defensive improvement.

Low
Verify per-entry callback invocation in batched execution

The clusterStateProcessed callback in submitPersistentTaskUpdate is called once per
batch submission, but when tasks are batched, the single entry captured in the
closure only represents one task. All other tasks in the batch will have their
clusterStateProcessed called via the executor's result, not this listener. This
means the listener here is only correct for the single submitted entry, but the
entry.taskId used for lookup may not reflect all batched tasks' results. Since each
call to submitPersistentTaskUpdate submits one entry, this is functionally correct
per-submission, but the clusterStateProcessed is only invoked for the batch as a
whole — verify that the framework calls this listener per-entry or only once per
batch, as this could cause missed callbacks for some entries.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [397-402]

-@Override
-public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-    // For COMPLETE/REMOVE use oldState since the task is gone in newState
-    ClusterState stateForLookup = (entry.operationType == PersistentTaskUpdateEntry.OperationType.COMPLETE
-        || entry.operationType == PersistentTaskUpdateEntry.OperationType.REMOVE) ? oldState : newState;
-    PersistentTask<?> task = PersistentTasksCustomMetadata.getTaskWithId(stateForLookup, entry.taskId);
-    entry.listener.onResponse(task);
+// No code change needed if the framework calls clusterStateProcessed per-entry.
+// If called once per batch, consider moving response notification into the executor's execute() method
+// or using a per-entry success callback pattern instead.
Suggestion importance[1-10]: 3


Why: The suggestion raises a valid concern about whether clusterStateProcessed is called per-entry or once per batch, but since each call to submitPersistentTaskUpdate submits exactly one entry with its own listener, the callback is correctly scoped to that single entry. The 'improved_code' is not actual code but a comment, making this more of a verification note than an actionable fix.

Low
General
Ensure fresh builder per entry in batch loop

A new PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state) is created
at the top of each loop iteration from the current state. However, builder(state)
likely creates a builder from the current metadata snapshot. If the builder is
mutable and shared across operations within the same iteration (e.g.,
tasksBuilder.removeTask is called and then state = update(state, tasksBuilder)),
subsequent iterations correctly get a fresh builder from the updated state. This
looks correct, but verify that builder(state) always creates a fresh independent
builder from the current state to avoid cross-entry contamination within a batch.

server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java [189-200]

-PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state);
-switch (entry.operationType) {
-    case CREATE:
-        ...
-        state = update(state, tasksBuilder.addTask(...));
-        break;
-    case UPDATE_STATE:
-        ...
-        state = update(state, tasksBuilder.updateTaskState(...));
-        ...
+for (PersistentTaskUpdateEntry entry : tasks) {
+    try {
+        // Ensure a fresh builder is created from the latest state for each entry
+        PersistentTasksCustomMetadata.Builder tasksBuilder = builder(state);
+        // ... rest of switch
Suggestion importance[1-10]: 2


Why: The existing code already creates a fresh tasksBuilder = builder(state) at the top of each loop iteration using the updated state, so this is already correct. The suggestion's 'existing_code' doesn't match the actual PR diff and the 'improved_code' is essentially identical to what's already there.

Low

mohit10011999 changed the title from "Adding batching for persistent task cluster service which will help C…" to "Fixing batching for persistent task cluster service which will help C…" on Apr 16, 2026
@github-actions
Contributor

❌ Gradle check result for 1b72b46: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

mohit10011999 force-pushed the PersistentTasksClusterService branch from 1b72b46 to 327cb4a on April 16, 2026 06:18
@github-actions
Contributor

Persistent review updated to latest commit 327cb4a

@github-actions
Contributor

❌ Gradle check result for 327cb4a: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch from 327cb4a to 8607b88 on April 16, 2026 09:27
@github-actions
Contributor

Persistent review updated to latest commit 8607b88

@github-actions
Contributor

✅ Gradle check result for 8607b88: SUCCESS

@codecov

codecov Bot commented Apr 16, 2026

Codecov Report

❌ Patch coverage is 91.01124% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.53%. Comparing base (c6cda3d) to head (b0baad9).

Files with missing lines | Patch % | Lines
...arch/persistent/PersistentTasksClusterService.java | 90.90% | 6 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21245      +/-   ##
============================================
+ Coverage     73.50%   73.53%   +0.03%     
+ Complexity    74473    74464       -9     
============================================
  Files          5967     5967              
  Lines        338227   338240      +13     
  Branches      48754    48759       +5     
============================================
+ Hits         248612   248725     +113     
+ Misses        69820    69678     -142     
- Partials      19795    19837      +42     

☔ View full report in Codecov by Sentry.

@github-actions
Contributor

Persistent review updated to latest commit 25b5959

@github-actions
Contributor

Persistent review updated to latest commit 1734210

@github-actions
Contributor

❌ Gradle check result for 1734210: null


mohit10011999 force-pushed the PersistentTasksClusterService branch from 1734210 to af431f7 on April 16, 2026 19:13
@github-actions
Contributor

Persistent review updated to latest commit 380e3f0

@github-actions
Contributor

❌ Gradle check result for 380e3f0: null


mohit10011999 force-pushed the PersistentTasksClusterService branch from 380e3f0 to bdf55d3 on April 16, 2026 21:58
@github-actions
Contributor

Persistent review updated to latest commit bdf55d3

@github-actions
Contributor

❌ Gradle check result for bdf55d3: FAILURE


@github-actions
Contributor

Persistent review updated to latest commit 1367ad4

@github-actions
Contributor

❌ Gradle check result for 1367ad4: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch from 1367ad4 to c60ae39 on April 17, 2026 05:58
@github-actions
Contributor

Persistent review updated to latest commit c60ae39

@github-actions
Contributor

Persistent review updated to latest commit f3f6710

@github-actions
Contributor

❌ Gradle check result for f3f6710: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch from f3f6710 to e215394 on April 17, 2026 07:37
@github-actions
Contributor

Persistent review updated to latest commit e215394

@github-actions
Contributor

❌ Gradle check result for ded318d: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch from ded318d to b0baad9 on May 4, 2026 05:23
@github-actions
Contributor

github-actions Bot commented May 4, 2026

Persistent review updated to latest commit b0baad9

@github-actions
Contributor

github-actions Bot commented May 4, 2026

✅ Gradle check result for b0baad9: SUCCESS

@github-actions
Contributor

github-actions Bot commented May 4, 2026

Persistent review updated to latest commit 5dc70a0

@github-actions
Contributor

github-actions Bot commented May 4, 2026

❌ Gradle check result for 5dc70a0: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch from 5dc70a0 to 816c60a on May 5, 2026 06:06
@github-actions
Contributor

github-actions Bot commented May 5, 2026

Persistent review updated to latest commit 816c60a

@github-actions
Contributor

github-actions Bot commented May 5, 2026

❌ Gradle check result for 816c60a: FAILURE


mohit10011999 force-pushed the PersistentTasksClusterService branch 2 times, most recently from 3397e08 to 3721d13, on May 7, 2026 06:42
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 3721d13

@github-actions
Contributor

github-actions Bot commented May 7, 2026

❌ Gradle check result for 3721d13: FAILURE


mohitamg added 5 commits May 7, 2026 15:59
…CR plugin from overwhelming cluster manager during high volume of incoming replication tasks

Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
…CR plugin from overwhelming cluster manager during high volume of incoming replication tasks

Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
…ice batching

Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
Signed-off-by: Mohit Kumar <mohitamg@amazon.com>
mohit10011999 force-pushed the PersistentTasksClusterService branch from 3721d13 to 03654bd on May 7, 2026 10:29
@github-actions
Contributor

github-actions Bot commented May 7, 2026

Persistent review updated to latest commit 03654bd

@github-actions
Contributor

github-actions Bot commented May 7, 2026

❌ Gradle check result for 03654bd: FAILURE


@soosinha
Member

soosinha commented May 7, 2026

Please provide the testing details with CCR.

@github-actions
Contributor

github-actions Bot commented May 8, 2026

Persistent review updated to latest commit 776b6e2

@github-actions
Contributor

github-actions Bot commented May 8, 2026

❌ Gradle check result for 776b6e2: FAILURE


@github-actions
Contributor

github-actions Bot commented May 9, 2026

Persistent review updated to latest commit 8ccb182

@github-actions
Contributor

github-actions Bot commented May 9, 2026

❌ Gradle check result for 8ccb182: FAILURE



3 participants