diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index f3b13e69a6..9be63dbaf3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -490,16 +490,17 @@ public void run() { OptimizerKeepingTask keepingTask = suspendingQueue.take(); String token = keepingTask.getToken(); boolean isExpired = !keepingTask.tryKeeping(); + if (isExpired) { + LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); + unregisterOptimizer(token); + } Optional.ofNullable(keepingTask.getQueue()) .ifPresent( queue -> queue .collectTasks(buildSuspendingPredication(authOptimizers.keySet())) .forEach(task -> retryTask(task, queue))); - if (isExpired) { - LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer()); - unregisterOptimizer(token); - } else { + if (!isExpired) { LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer()); keepInTouch(keepingTask.getOptimizer()); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index ec207705c7..ea4bb78b36 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -912,6 +912,14 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) { taskMap.put(taskRuntime.getTaskId(), taskRuntime); if (taskRuntime.getStatus() == TaskRuntime.Status.PLANNED) { taskQueue.offer(taskRuntime); + } else if (taskRuntime.getStatus() == TaskRuntime.Status.SCHEDULED + || taskRuntime.getStatus() == TaskRuntime.Status.ACKED) { + LOG.info( + "Reset task {} from {} to PLANNED during recovery", + taskRuntime.getTaskId(), + taskRuntime.getStatus()); + taskRuntime.reset(); + taskQueue.offer(taskRuntime); } else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) { retryTask(taskRuntime); } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index f9fa8ce94d..1fe6dc2a17 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -248,7 +248,9 @@ public void testTouchTimeout() throws InterruptedException { Assertions.assertThrows(PluginRetryAuthException.class, () -> optimizingService().touch(token)); Assertions.assertThrows( PluginRetryAuthException.class, () -> optimizingService().pollTask(token, THREAD_ID)); - assertTaskStatus(TaskRuntime.Status.SCHEDULED); + // After optimizer expires, its tasks are immediately reset to PLANNED + // because unregister happens before task scan in OptimizerKeeper + assertTaskStatus(TaskRuntime.Status.PLANNED); token = optimizingService().authenticate(buildRegisterInfo()); toucher = new Toucher(); Thread.sleep(1000); @@ -333,13 +335,19 @@ public void testReloadScheduledTask() { OptimizingTask task = optimizingService().pollTask(token, THREAD_ID); Assertions.assertNotNull(task); + // After reload, SCHEDULED tasks are reset to PLANNED during recovery reload(); - assertTaskStatus(TaskRuntime.Status.SCHEDULED); - optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + assertTaskStatus(TaskRuntime.Status.PLANNED); + + // Re-poll the task to get it scheduled again + OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task2); + Assertions.assertEquals(task2.getTaskId(), task.getTaskId()); + optimizingService().ackTask(token, THREAD_ID, task2.getTaskId()); TaskRuntime taskRuntime = optimizingService().listTasks(defaultResourceGroup().getName()).get(0); - optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + optimizingService().completeTask(token, buildOptimizingTaskResult(task2.getTaskId())); assertTaskCompleted(taskRuntime); } @@ -350,12 +358,19 @@ public void testReloadAckTask() { Assertions.assertNotNull(task); optimizingService().ackTask(token, THREAD_ID, task.getTaskId()); + // After reload, ACKED tasks are reset to PLANNED during recovery reload(); - assertTaskStatus(TaskRuntime.Status.ACKED); + assertTaskStatus(TaskRuntime.Status.PLANNED); + + // Re-poll and re-ack the task + OptimizingTask task2 = optimizingService().pollTask(token, THREAD_ID); + Assertions.assertNotNull(task2); + Assertions.assertEquals(task2.getTaskId(), task.getTaskId()); + optimizingService().ackTask(token, THREAD_ID, task2.getTaskId()); TaskRuntime taskRuntime = optimizingService().listTasks(defaultResourceGroup().getName()).get(0); - optimizingService().completeTask(token, buildOptimizingTaskResult(task.getTaskId())); + optimizingService().completeTask(token, buildOptimizingTaskResult(task2.getTaskId())); assertTaskCompleted(taskRuntime); }