-
-
Notifications
You must be signed in to change notification settings - Fork 2
bugfix: Ensure deferred checkpoint includes successor tasks in execution queue #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| """Test for deferred checkpoint resume bug. | ||
|
|
||
| Bug: When a task requests a deferred checkpoint via task_ctx.checkpoint(), | ||
| the checkpoint is created BEFORE successor tasks are added to the queue. | ||
| This means resuming from the checkpoint results in an empty queue, | ||
| and successor tasks are never executed. | ||
|
|
||
| Expected: After resuming from a deferred checkpoint, the engine should | ||
| execute remaining tasks (successors of the checkpointed task). | ||
| """ | ||
|
|
||
| import os | ||
| import tempfile | ||
|
|
||
| from graflow.core.checkpoint import CheckpointManager | ||
| from graflow.core.context import TaskExecutionContext | ||
| from graflow.core.decorators import task | ||
| from graflow.core.engine import WorkflowEngine | ||
| from graflow.core.workflow import workflow | ||
|
|
||
|
|
||
| class TestDeferredCheckpointResume: | ||
| """Test that deferred checkpoint includes successor tasks.""" | ||
|
|
||
| def test_deferred_checkpoint_resumes_with_successors(self): | ||
| """Resuming from a deferred checkpoint should execute remaining tasks. | ||
|
|
||
| Scenario: step_1 >> step_2 >> step_3 | ||
| - step_2 requests a deferred checkpoint | ||
| - Checkpoint should include step_3 in the queue | ||
| - Resuming should execute step_3 and return its result | ||
| """ | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| checkpoint_path = os.path.join(tmpdir, "test_checkpoint") | ||
|
|
||
| # === Part 1: Execute workflow with checkpoint === | ||
| with workflow("checkpoint_test") as wf: | ||
|
|
||
| @task | ||
| def step_1() -> str: | ||
| return "data_prepared" | ||
|
|
||
| @task(inject_context=True) | ||
| def step_2(task_ctx: TaskExecutionContext) -> str: | ||
| task_ctx.checkpoint(path=checkpoint_path, metadata={"stage": "after_step_2"}) | ||
| return "data_processed" | ||
|
|
||
| @task | ||
| def step_3() -> str: | ||
| return "completed" | ||
|
|
||
| _ = step_1 >> step_2 >> step_3 | ||
| _result, context = wf.execute("step_1", ret_context=True) | ||
|
|
||
| # Verify initial execution completed all steps | ||
| assert context.get_result("step_1") == "data_prepared" | ||
| assert context.get_result("step_2") == "data_processed" | ||
| assert context.get_result("step_3") == "completed" | ||
| assert context.last_checkpoint_path is not None | ||
|
|
||
| # === Part 2: Resume from checkpoint === | ||
| restored_ctx, _metadata = CheckpointManager.resume_from_checkpoint(context.last_checkpoint_path) | ||
|
|
||
| # Verify checkpoint state: step_1 and step_2 completed | ||
| assert "step_1" in restored_ctx.completed_tasks | ||
| assert "step_2" in restored_ctx.completed_tasks | ||
| assert "step_3" not in restored_ctx.completed_tasks | ||
| assert restored_ctx.get_result("step_1") == "data_prepared" | ||
| assert restored_ctx.get_result("step_2") == "data_processed" | ||
|
|
||
| # Resume execution - step_3 should be executed | ||
| engine = WorkflowEngine() | ||
| final_result = engine.execute(restored_ctx) | ||
|
|
||
| # BUG: step_3 result is None because it was never executed | ||
| # EXPECTED: step_3 should execute and return "completed" | ||
| assert restored_ctx.get_result("step_3") == "completed", ( | ||
| "step_3 should have been executed after resuming from checkpoint. " | ||
| f"Got: {restored_ctx.get_result('step_3')}" | ||
| ) | ||
| assert final_result == "completed" | ||
|
|
||
| def test_deferred_checkpoint_queue_contains_successor(self): | ||
| """Verify the checkpoint's queue contains successor tasks. | ||
|
|
||
| This is the root cause test: check that the saved checkpoint | ||
| has the successor task in its pending queue. | ||
| """ | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| checkpoint_path = os.path.join(tmpdir, "test_checkpoint") | ||
|
|
||
| with workflow("queue_test") as wf: | ||
|
|
||
| @task | ||
| def task_a() -> str: | ||
| return "a_done" | ||
|
|
||
| @task(inject_context=True) | ||
| def task_b(task_ctx: TaskExecutionContext) -> str: | ||
| task_ctx.checkpoint(path=checkpoint_path) | ||
| return "b_done" | ||
|
|
||
| @task | ||
| def task_c() -> str: | ||
| return "c_done" | ||
|
|
||
| _ = task_a >> task_b >> task_c | ||
| _result, context = wf.execute("task_a", ret_context=True) | ||
|
|
||
| # Resume and check queue state | ||
| assert context.last_checkpoint_path is not None, ( | ||
| "Expected deferred checkpoint to be created before resuming" | ||
| ) | ||
| restored_ctx, _ = CheckpointManager.resume_from_checkpoint(context.last_checkpoint_path) | ||
|
|
||
| # The queue should contain task_c as a pending task | ||
| pending = list(restored_ctx.task_queue.get_pending_task_specs()) | ||
| pending_ids = [spec.task_id for spec in pending] | ||
|
|
||
| assert "task_c" in pending_ids, ( | ||
| f"task_c should be in pending queue after checkpoint resume. Pending tasks: {pending_ids}" | ||
| ) | ||
|
|
||
| # === Resume execution from restored context === | ||
| engine = WorkflowEngine() | ||
| final_result = engine.execute(restored_ctx) | ||
|
|
||
| # Verify prior results are still intact | ||
| assert restored_ctx.get_result("task_a") == "a_done" | ||
| assert restored_ctx.get_result("task_b") == "b_done" | ||
|
|
||
| # Verify task_c was executed from the restored context | ||
| assert restored_ctx.get_result("task_c") == "c_done", ( | ||
| f"task_c should have been executed after resume. Got: {restored_ctx.get_result('task_c')}" | ||
| ) | ||
| assert final_result == "c_done" | ||
| assert "task_c" in restored_ctx.completed_tasks | ||
|
|
||
| def test_deferred_checkpoint_with_terminate_workflow(self): | ||
| """A task that requests both checkpoint and terminate should still produce a checkpoint. | ||
|
|
||
| Scenario: step_1 >> step_2 >> step_3 | ||
| - step_2 requests a deferred checkpoint AND terminates the workflow | ||
| - Checkpoint should still be created (before break) | ||
| - step_3 should NOT execute (workflow terminated) | ||
| """ | ||
| with tempfile.TemporaryDirectory() as tmpdir: | ||
| checkpoint_path = os.path.join(tmpdir, "terminate_checkpoint") | ||
|
|
||
| with workflow("terminate_test") as wf: | ||
|
|
||
| @task | ||
| def step_1() -> str: | ||
| return "done" | ||
|
|
||
| @task(inject_context=True) | ||
| def step_2(task_ctx: TaskExecutionContext) -> str: | ||
| task_ctx.checkpoint(path=checkpoint_path, metadata={"stage": "before_terminate"}) | ||
| task_ctx.terminate_workflow("early exit") | ||
| return "terminated" | ||
|
|
||
| @task | ||
| def step_3() -> str: | ||
| return "should_not_run" | ||
|
|
||
| _ = step_1 >> step_2 >> step_3 | ||
| _result, context = wf.execute("step_1", ret_context=True) | ||
|
|
||
| # step_3 should not have been executed (workflow terminated at step_2) | ||
| assert context.get_result("step_3") is None | ||
|
|
||
| # Checkpoint should still have been created despite termination | ||
| assert context.last_checkpoint_path is not None | ||
| assert os.path.exists(context.last_checkpoint_path) | ||
|
|
||
| # Verify checkpoint is valid and restorable | ||
| restored_ctx, metadata = CheckpointManager.resume_from_checkpoint(context.last_checkpoint_path) | ||
| assert "step_1" in restored_ctx.completed_tasks | ||
| assert "step_2" in restored_ctx.completed_tasks | ||
| assert restored_ctx.get_result("step_2") == "terminated" | ||
| assert metadata.user_metadata["stage"] == "before_terminate" |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.