workflow: avoid duplicate fan-in execution from concurrent predecessors #679
base: main
Conversation
geofmureithi left a comment
Left some thoughts
apalis-workflow/src/dag/service.rs (Outdated)

```rust
    .await?;
// TODO(bug): the check of done is not a good one, as it can be called more than once if the jobs are executed too quickly
if results.iter().all(|s| matches!(s.status, Status::Done)) {
    // ===== FIX START =====
```
Remove this line:

```rust
// ===== FIX START =====
```
apalis-workflow/src/dag/service.rs (Outdated)

```rust
        pending_dependencies: dependency_task_ids,
    });
}
// ===== FIX END =====
```
Remove this line too:

```rust
// ===== FIX END =====
```
I accidentally deleted the test conversation; here is the test again:

```rust
#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
    let counter = Arc::new(AtomicUsize::new(0));
    let barrier = Arc::new(Barrier::new(2));
    let dag = DagFlow::new("fan-in-single-exec-workflow");
    let a = {
        let barrier = barrier.clone();
        dag.add_node("a", task_fn(move |task: u32| {
            let barrier = barrier.clone();
            async move {
                barrier.wait().await;
                task
            }
        }))
    };
    let b = {
        let barrier = barrier.clone();
        dag.add_node("b", task_fn(move |task: u32| {
            let barrier = barrier.clone();
            async move {
                barrier.wait().await;
                task
            }
        }))
    };
    let counter_clone = Arc::clone(&counter);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32), worker: WorkerContext| {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    worker.stop().unwrap();
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));
    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();
    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .build(dag);
    worker.run().await.unwrap();
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}
```
Your approach is not ideal, specifically because it's not a practical use-case scenario. You possibly need to use TestWorker: set up the DAG without passing the barrier (just normal nodes) and execute it with that.
“I tried TestWorker, but for this fan-in flow with JsonStorage it doesn't preserve the dependency-completion behavior needed by check_status, so it can loop in a waiting state. I switched to a normal worker test to validate the actual runtime behavior: multi-predecessor fan-in executes once and produces the expected result.” Can you check the following code and give feedback?
Could you elaborate? Your approach is now better, but I think if
If it is an underlying problem with
I investigated this further and it's not specific to JsonStorage. The issue is with TestWorker semantics for fan-in flows. Fan-in scheduling relies on backend.check_status() seeing all predecessors as Done, but under TestWorker results are emitted before that completion state is visible, so fan-in can stay in WaitingForDependencies and the test can loop/hang even though predecessor results were emitted. A real worker run does not have this issue, which is why the normal worker test passes. This looks like either a known limitation of TestWorker for dependency-driven workflows, or a TestWorker bug (it should emit only after backend completion state is observable). Happy to open a separate issue/PR for TestWorker once we agree on expected behavior.
That doesn't sound right, as
The issue is when TestWorker emits task completion. In `TestEmitService::call()`, `let res = fut.await;` emits the result immediately after the service future resolves. Fan-in scheduling, however, relies on `backend.check_status(dependency_task_ids)`, which assumes that dependency completion is already visible in the backend. With TestWorker, there is no synchronization point between the two. This doesn't occur with a normal worker, where backend state and result emission stay in sync.
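To make the ordering hazard described above concrete, here is a minimal, self-contained sketch; it is not apalis code, and the toy backend, channel, and names are all hypothetical stand-ins. The result is emitted on a channel before the toy backend records the dependency as Done, so a check_status-style read made in reaction to the emission still observes Pending.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::mpsc;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Status {
    Pending,
    Done,
}

#[tokio::main]
async fn main() {
    // Toy "backend": task id -> status.
    let backend = Arc::new(Mutex::new(HashMap::from([("dep", Status::Pending)])));
    // Channel standing in for a TestWorker-style result stream.
    let (tx, mut rx) = mpsc::unbounded_channel();

    let b = Arc::clone(&backend);
    tokio::spawn(async move {
        // The service future resolves and its result is emitted immediately...
        tx.send("dep result").unwrap();
        // ...while the backend only records completion later.
        tokio::time::sleep(Duration::from_millis(50)).await;
        b.lock().unwrap().insert("dep", Status::Done);
    });

    // A consumer reacting to the emitted result right away still observes the
    // dependency as Pending, which is the gap a fan-in gate can fall into.
    let _result = rx.recv().await.unwrap();
    let observed = *backend.lock().unwrap().get("dep").unwrap();
    println!("status observed right after emission: {observed:?}"); // typically Pending
}
```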
Can you share the code you are using with TestWorker?
Yeah, you are doing it wrong. Since: It should possibly be something like: Also remove
Ah, I see the problem:

```rust
TestWorker {
    stream,
```

You can still work around this by exiting the while loop on Complete. I will add an API improvement to expose the stream. Let me know if you need help.
I tried implementing the TestWorker-based test as suggested, but it still hangs. This happens because fan-in relies on backend.check_status, and with TestWorker the emitted results are observable before dependency completion is visible to check_status. As a result, the fan-in gate never opens.
The TestWorkerService wraps all other services and should work appropriately (hence it is not the issue you are describing). My suspicion is that there is an extra bug here:

```rust
let req = req.map(|_| encoded_input); // Replace args with fan-in input
let response = executor.call(req).await?;
(response, context)
```

It doesn't look obvious, but the executor will return
Can you confirm this by debugging the first 4 results and posting them here?
```
[0] task_id=TaskId(RandomId("4yzi9nwccm4")) resp=Ok(EntryFanOut { node_task_ids: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
```

I captured the first few results from TestWorker (see the log above). Both predecessor nodes run successfully; each emits EnqueuedNext.
From this trace, dependency execution is not the issue: both predecessors emit EnqueuedNext successfully. The problem is probably that the fan-in path never reaches Complete and keeps returning WaitingForDependencies with the same task IDs. This suggests the executor is not advancing from ExecuteNext → Complete once dependencies are satisfied (likely around the executor.call(req) path you pointed out).
This doesn't look right; why would there be more than 5 results? Anyway, let me look at it and then get back.
I will explore more on that; can you give me some guidance?
I think the problem is that somewhere we are missing a completed state and returning WaitForDependencies.
Now I made a test with TestWorker, and it works because it asserts on the observed DagExecutionResponse::Complete instead of waiting for the stream to end, which can hang under TestWorker when fan-in returns WaitingForDependencies. Below is the code; I'd appreciate a code review.
I am still trying to avoid this:

```rust
// Bounded loop so we never hang tests.
for _ in 0..20 {
```

Why is it 20 and not 15? I will merge #682 so that we can use
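For reference, one possible shape for that loop, sketched with placeholder types (none of these are apalis APIs): rather than an arbitrary iteration count, break as soon as a Complete-like response is observed and bound the whole wait with a timeout so the test still cannot hang.

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

#[derive(Debug)]
enum DagResponse {
    WaitingForDependencies,
    Complete,
}

static POLLS: AtomicU32 = AtomicU32::new(0);

// Placeholder for "get the next emitted result from the test stream": here it
// reports WaitingForDependencies twice and then Complete.
async fn next_response() -> DagResponse {
    if POLLS.fetch_add(1, Ordering::SeqCst) < 2 {
        DagResponse::WaitingForDependencies
    } else {
        DagResponse::Complete
    }
}

// Wait for Complete without a magic iteration count; an overall timeout keeps
// the test from hanging if the fan-in gate never opens.
async fn wait_for_complete() -> Option<DagResponse> {
    tokio::time::timeout(Duration::from_secs(5), async {
        loop {
            match next_response().await {
                DagResponse::Complete => break DagResponse::Complete,
                DagResponse::WaitingForDependencies => continue,
            }
        }
    })
    .await
    .ok()
}

#[tokio::main]
async fn main() {
    assert!(matches!(wait_for_complete().await, Some(DagResponse::Complete)));
    println!("observed Complete after {} polls", POLLS.load(Ordering::SeqCst));
}
```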
After a little debugging, I think there is a bug possibly in the
I think you should:
I will merge and document the issue later.
I pushed the commit with the last test.
Add a changelog entry.
Oh sorry, I missed the last one; I will commit again.
I pushed the code, can you please review it?
geofmureithi left a comment
The manual retry raises some problems as highlighted. I think there is more to this task.
My initial solution would have involved atomics/transactions, enforced at the backend level. Another solution might involve storing the state in a root task and updating and tracking it.
I think the second option has potential.
apalis-workflow/src/dag/service.rs (Outdated)

```rust
    let response = executor.call(req).await?;
    (response, context)
} else {
    if is_designated {
```
Wait, we never discussed this. I don't think it's ideal to do this, because it creates side effects (the retry creates a new task).
How about we remove the “enqueue retry task” and replace the loser behavior with a terminal/no-op outcome, or add a “SkippedDuplicate” concept?
I am not sure what you mean by these:
> replacing the loser behavior with a terminal/no-op outcome or add a “SkippedDuplicate” concept.
```rust
else {
    if is_designated {
        let retry_task = TaskBuilder::new(req.args.clone())
            .with_task_id(TaskId::new(B::IdType::generate()))
            .meta(context.clone())
            .build();
        backend
            .send(retry_task)
            .await
            .map_err(BoxDynError::from)?;
    }
    return Ok(DagExecutionResponse::WaitingForDependencies {
        pending_dependencies: dependency_task_ids,
    });
}
```

I plan to change this block of code to the following:
- no new task is enqueued, ever, just to dedupe
- if this execution is not the one that “owns” the fan-in (didn't claim it), it terminates as a no-op rather than “waiting”

I also need one small ownership/claim check before fan-in execution. Right before executing the fan-in node (i.e., after dependencies are confirmed as done), make a single decision. Identify the fan-in instance uniquely with a stable key:
- the workflow run id (root task id)
- the fan-in node identity (node name or stable node id)

So the key is something like (root_task_id, current_node_id), and only the claimer executes the fan-in; everyone else exits immediately (terminal/no-op) once they see it is claimed or completed.
What do you think about this flow?
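As a rough illustration of the claim idea above, here is a minimal sketch using an in-memory set where the real mechanism would live in the backend; all names here are hypothetical. The first caller to claim (root_task_id, current_node_id) owns the fan-in, and every later caller gets false and should treat it as a terminal no-op.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

struct ClaimStore {
    // Stand-in for backend-side state keyed by (root_task_id, node_id).
    claimed: Arc<Mutex<HashSet<(String, String)>>>,
}

impl ClaimStore {
    fn new() -> Self {
        Self { claimed: Arc::new(Mutex::new(HashSet::new())) }
    }

    /// Returns true only for the single caller that wins the claim for
    /// (root_task_id, node_id); all later callers see false and should no-op.
    fn try_claim(&self, root_task_id: &str, node_id: &str) -> bool {
        self.claimed
            .lock()
            .unwrap()
            .insert((root_task_id.to_string(), node_id.to_string()))
    }
}

fn main() {
    let store = ClaimStore::new();
    assert!(store.try_claim("run-1", "fan_in"));  // designated executor
    assert!(!store.try_claim("run-1", "fan_in")); // duplicate: terminal no-op
    assert!(store.try_claim("run-2", "fan_in"));  // a different workflow run claims independently
}
```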
That would not solve the problem here, which is what happens when the designated node is called but the other nodes have not completed. The claiming mechanism would possibly need an extra API, which I would recommend avoiding. We already have check_status and should try to use it.
I committed the new code. I also made some improvements to TestWorker so it won't hang when testing fan-in. Please review these changes; if you are satisfied, I will add a changelog entry.
I don't understand why this file has so many line changes when there is only one line change in principle.
Yes, this has only one change in principle. Maybe it's because I used Codex to make the change for the hanging issue.
I can revert the changes, keep only the one change, and commit again.
apalis-workflow/src/dag/service.rs (Outdated)

```rust
        pending_dependencies: dependency_task_ids,
    });
match action {
    FanInAction::Retry => {
```
As I highlighted in the previous conversation, the retry approach is suboptimal and not the way to go. It also creates rogue task_ids, which will make debugging harder. There are already APIs like check_status and wait_for which can handle this better.
I removed the retry re-enqueue logic and switched to dependency waiting/checking for the designated predecessor.
As a result, only the designated fan-in path waits and executes the merge; non-designated contenders exit early, and no rogue retry task IDs are created.
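A rough sketch of the control flow this describes, with hypothetical types and a plain function standing in for the backend status check (not the real check_status signature): non-designated contenders return a terminal no-op, the designated path waits until all dependencies are Done, and the merge runs exactly once.

```rust
#[derive(Clone, Copy, PartialEq)]
enum Status {
    Pending,
    Done,
}

#[derive(Debug, PartialEq)]
enum FanInOutcome {
    SkippedNotDesignated,   // non-designated contender exits early; nothing is enqueued
    WaitingForDependencies, // designated path keeps waiting; no retry task is created
    Executed(u32),          // the merge ran, exactly once
}

// Stand-in for asking the backend whether all predecessors are Done.
fn all_dependencies_done(statuses: &[Status]) -> bool {
    statuses.iter().all(|s| *s == Status::Done)
}

fn run_fan_in(is_designated: bool, deps: &[Status], merge: impl FnOnce() -> u32) -> FanInOutcome {
    if !is_designated {
        // Only the designated predecessor may ever trigger the merge.
        return FanInOutcome::SkippedNotDesignated;
    }
    if !all_dependencies_done(deps) {
        // Designated path waits on dependencies instead of re-enqueueing itself.
        return FanInOutcome::WaitingForDependencies;
    }
    FanInOutcome::Executed(merge())
}

fn main() {
    let merge = || 1u32 + 2;
    assert_eq!(run_fan_in(false, &[Status::Done, Status::Done], merge), FanInOutcome::SkippedNotDesignated);
    assert_eq!(run_fan_in(true, &[Status::Done, Status::Pending], merge), FanInOutcome::WaitingForDependencies);
    assert_eq!(run_fan_in(true, &[Status::Done, Status::Done], merge), FanInOutcome::Executed(3));
}
```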
Let me take a look and get back with the specific way forward. I might be able to work something out with wait_for.
Description
Fixes #673
Fix a bug where a fan-in node in a DAG workflow could be executed more than once when multiple predecessor nodes enqueue it concurrently.
This change ensures that only a single designated predecessor is allowed to trigger fan-in execution once all dependencies are complete.
Type of Change
Bug fix (non-breaking change which fixes an issue)
Testing
I have added tests that prove my fix is effective or that my feature works
I have run the existing tests and they pass
I have run cargo fmt and cargo clippy
Checklist
My code follows the code style of this project
I have performed a self-review of my own code
I have commented my code, particularly in hard-to-understand areas
My changes generate no new warnings
I have added tests that prove my fix is effective or that my feature works
New and existing unit tests pass locally with my changes
Additional Notes
This fix addresses the duplicate enqueueing caused specifically by concurrent fan-in scheduling.