From 807224d8c23cd6a9eb515729d9d1277f38b6d6cc Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Sun, 15 Feb 2026 09:01:14 +0900 Subject: [PATCH] [AMORO-4089] Fix optimizing process stuck permanently after AMS restart When AMS restarts, SCHEDULED/ACKED tasks were loaded into taskMap but never re-queued, causing the optimizing process to hang forever. This fix addresses two issues: 1. Reset SCHEDULED/ACKED tasks to PLANNED during recovery in loadTaskRuntimes(), so they are re-queued for execution. 2. Fix race condition in OptimizerKeeper where expired optimizer tokens were still in authOptimizers during task scanning. Move unregisterOptimizer() before collectTasks() so orphaned tasks are correctly detected. Signed-off-by: Jiwon Park --- .../server/DefaultOptimizingService.java | 9 ++++--- .../server/optimizing/OptimizingQueue.java | 8 ++++++ .../server/TestDefaultOptimizingService.java | 27 ++++++++++++++----- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index f3b13e69a6..9be63dbaf3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -490,16 +490,17 @@ public void run() { OptimizerKeepingTask keepingTask = suspendingQueue.take(); String token = keepingTask.getToken(); boolean isExpired = !keepingTask.tryKeeping(); + if (isExpired) { + LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); + unregisterOptimizer(token); + } Optional.ofNullable(keepingTask.getQueue()) .ifPresent( queue -> queue .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) .forEach(task -> retryTask(task, queue))); - if (isExpired) { - LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); - unregisterOptimizer(token); - } else { + if (!isExpired) { LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); keepInTouch(keepingTask.getOptimizer()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index ec207705c7..ea4bb78b36 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -912,6 +912,14 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { taskMap.put(taskRuntime.getTaskId(), taskRuntime); if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) { taskQueue.offer(taskRuntime); + } else if (taskRuntime.getStatus() == TaskRuntime.Status.SCHEDULED + || taskRuntime.getStatus() == TaskRuntime.Status.ACKED) { + LOG.info( + "Reset task {} from {} to PLANNED during recovery", + taskRuntime.getTaskId(), + taskRuntime.getStatus()); + taskRuntime.reset(); + taskQueue.offer(taskRuntime); } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) { retryTask(taskRuntime); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index f9fa8ce94d..1fe6dc2a17 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -248,7 +248,9 @@ public void testTouchTimeout() throws InterruptedException { Assertions.assertThrows(PluginRetryAuthException.class, () -> optimizingService().touch(token)); Assertions.assertThrows( PluginRetryAuthException.class, () -> optimizingService().pollTask(token, THREAD_ID)); - assertTaskStatus(TaskRuntime.Status.SCHEDULED); + // After optimizer expires, its tasks are immediately reset to PLANNED + // because unregister happens before task scan in OptimizerKeeper + assertTaskStatus(TaskRuntime.Status.PLANNED); token = optimizingService().authenticate(buildRegisterInfo()); toucher = new Toucher(); Thread.sleep(1000); @@ -333,13 +335,19 @@ public void testReloadScheduledTask() { OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); Assertions.assertNotNull(task); + // After reload, SCHEDULED tasks are reset to PLANNED during recovery reload(); - assertTaskStatus(TaskRuntime.Status.SCHEDULED); - optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + assertTaskStatus(TaskRuntime.Status.PLANNED); + + // Re-poll the task to get it scheduled again + OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task2); + Assertions.assertEquals(task2.getTaskId(), task.getTaskId()); + optimizingService().ackTask(token, THREAD_ID, task2.getTaskId()); TaskRuntime taskRuntime = optimizingService().listTasks(defaultResourceGroup().getName()).get(0); - optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + optimizingService().completeTask(token, buildOptimizingTaskResult(task2.getTaskId())); assertTaskCompleted(taskRuntime); } @@ -350,12 +358,19 @@ public void testReloadAckTask() { Assertions.assertNotNull(task); optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + // After reload, ACKED tasks are reset to PLANNED during recovery reload(); - assertTaskStatus(TaskRuntime.Status.ACKED); + assertTaskStatus(TaskRuntime.Status.PLANNED); + + // Re-poll and re-ack the task + OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task2); + Assertions.assertEquals(task2.getTaskId(), task.getTaskId()); + optimizingService().ackTask(token, THREAD_ID, task2.getTaskId()); TaskRuntime taskRuntime = optimizingService().listTasks(defaultResourceGroup().getName()).get(0); - optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + optimizingService().completeTask(token, buildOptimizingTaskResult(task2.getTaskId())); assertTaskCompleted(taskRuntime); }