Conversation

@next-n

@next-n next-n commented Jan 30, 2026

Description
Fixes #673
Fix a bug where a fan-in node in a DAG workflow could be executed more than once when multiple predecessor nodes enqueue it concurrently.
This change ensures that only a single designated predecessor is allowed to trigger fan-in execution once all dependencies are complete.

Type of Change

Bug fix (non-breaking change which fixes an issue)

Testing

I have added tests that prove my fix is effective or that my feature works

I have run the existing tests and they pass

I have run cargo fmt and cargo clippy

Checklist

My code follows the code style of this project

I have performed a self-review of my own code

I have commented my code, particularly in hard-to-understand areas

My changes generate no new warnings

I have added tests that prove my fix is effective or that my feature works

New and existing unit tests pass locally with my changes

Additional Notes

This fix addresses the duplicate enqueue caused specifically by concurrent fan-in scheduling.
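For readers skimming the diff, the gate looks roughly like this (a simplified sketch of the approach, not the exact code in the PR; is_designated_predecessor and enqueue_fan_in are illustrative names):

// Simplified sketch: every predecessor that finishes checks the dependency
// statuses, but only the designated predecessor may enqueue the fan-in node.
let results = backend.check_status(&dependency_task_ids).await?;
let all_done = results.iter().all(|s| matches!(s.status, Status::Done));

if all_done && is_designated_predecessor {
    // Exactly one predecessor can reach this branch, so the fan-in node
    // is enqueued (and therefore executed) at most once.
    enqueue_fan_in(&backend, fan_in_input).await?;
} else {
    // Dependencies are still pending, or another predecessor owns the trigger.
    return Ok(DagExecutionResponse::WaitingForDependencies {
        pending_dependencies: dependency_task_ids,
    });
}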

@next-n next-n requested a review from geofmureithi as a code owner January 30, 2026 13:31
Member

@geofmureithi geofmureithi left a comment


Left some thoughts

.await?;
// TODO(bug): The check of done is not a good one as it can be called more than once if the jobs a too quickly executed
if results.iter().all(|s| matches!(s.status, Status::Done)) {
// ===== FIX START =====
Member

Remove this line

Suggested change
// ===== FIX START =====

pending_dependencies: dependency_task_ids,
});
}
// ===== FIX END =====
Member

Remove this line too

Suggested change
// ===== FIX END =====

@next-n
Author

next-n commented Feb 2, 2026

I accidentally deleted the test conversation; here is the test I added:

    #[tokio::test]
    async fn test_fan_in_runs_once_with_multiple_predecessors() {
        let counter = Arc::new(AtomicUsize::new(0));
        let barrier = Arc::new(Barrier::new(2));

        let dag = DagFlow::new("fan-in-single-exec-workflow");

        let a = {
            let barrier = barrier.clone();
            dag.add_node("a", task_fn(move |task: u32| {
                let barrier = barrier.clone();
                async move {
                    barrier.wait().await;
                    task
                }
            }))
        };

        let b = {
            let barrier = barrier.clone();
            dag.add_node("b", task_fn(move |task: u32| {
                let barrier = barrier.clone();
                async move {
                    barrier.wait().await;
                    task
                }
            }))
        };

        let counter_clone = Arc::clone(&counter);
        let _fan_in = dag
            .add_node(
                "fan_in",
                task_fn(move |task: (u32, u32), worker: WorkerContext| {
                    let counter = Arc::clone(&counter_clone);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        worker.stop().unwrap();
                        task.0 + task.1
                    }
                }),
            )
            .depends_on((&a, &b));

        let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
        backend.start_fan_out((1u32, 2u32)).await.unwrap();

        let worker = WorkerBuilder::new("rango-tango")
            .backend(backend)
            .build(dag);

        worker.run().await.unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
I have not pushed the code yet. Does this coverage look sufficient to you, or would you prefer a different structure?
And should I also delete the TODO comment?

@geofmureithi
Member

Your approach is not ideal, specifically because it's not a practical use-case scenario.

You possibly need to use TestWorker.

Set up the DAG without passing the barrier (just normal nodes), execute it with TestWorker, then collect the results and assert that they match what we expect.

@next-n
Author

next-n commented Feb 2, 2026

I tried TestWorker, but for this fan-in flow with JsonStorage it doesn't preserve the dependency-completion behavior needed by check_status, so it can loop in the waiting state. I switched to a normal worker test to validate the actual runtime behavior: a multi-predecessor fan-in executes once and produces the expected result. Can you check the following code and give feedback?

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    let counter = Arc::new(AtomicUsize::new(0));
    let sum = Arc::new(AtomicUsize::new(0));
    let dag = DagFlow::new("fan-in-single-exec-workflow");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let counter_clone = Arc::clone(&counter);
    let sum_clone = Arc::clone(&sum);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32), worker: WorkerContext| {
                let counter = Arc::clone(&counter_clone);
                let sum = Arc::clone(&sum_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    sum.store((task.0 + task.1) as usize, Ordering::SeqCst);
                    worker.stop().unwrap();
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let worker = WorkerBuilder::new("rango-tango")
        .backend(backend)
        .on_event(|ctx, ev| {
            if matches!(ev, Event::Error(_)) {
                ctx.stop().unwrap();
            }
        })
        .build(dag);
    worker.run().await.unwrap();

    assert_eq!(counter.load(Ordering::SeqCst), 1);
    assert_eq!(sum.load(Ordering::SeqCst), 3);
}

@geofmureithi
Member

I tried TestWorker, but for this fan-in flow with JsonStorage it doesn’t preserve the dependency-completion behavior needed by check_status, so it can loop in waiting state

Could you elaborate? Your approach is now better, but if TestWorker is not working then I would want to fix that, as it would signify an underlying problem.

@geofmureithi
Member

If it is an underlying problem with JsonStorage let me know so that we can separate it from this issue.

@next-n
Author

next-n commented Feb 2, 2026

I investigated this further and it’s not specific to JsonStorage.

The issue is with TestWorker semantics for fan-in flows.

Fan-in scheduling relies on backend.check_status() seeing all predecessors as Done.
TestWorker emits task results immediately when the service future completes, before the backend state used by check_status() is guaranteed to be visible.

Because of that, fan-in can stay in WaitingForDependencies and the test can loop/hang even though predecessor results were emitted.

A real worker run does not have this issue, which is why the normal worker test passes.

This looks like either:

a known limitation of TestWorker for dependency-driven workflows, or

a TestWorker bug (it should emit only after backend completion state is observable).

Happy to open a separate issue/PR for TestWorker once we agree on expected behavior.

@geofmureithi
Member

That doesn't sound right, as TestWorker just wraps a regular worker and pipes results into a channel.
Can you provide the code you are using for TestWorker?

@next-n
Author

next-n commented Feb 2, 2026

The issue is the point at which TestWorker emits task completion.

In TestEmitService::call():

let res = fut.await;
tx.send((task_id, Ok(res))).await.unwrap();

The result is emitted immediately after the service future resolves.

Fan-in scheduling, however, relies on:

backend.check_status(dependency_task_ids)

which assumes that dependency completion is already visible in the backend.

With TestWorker, there is no synchronization point between “service finished” and “backend status observable by check_status”. So execute_next() can see predecessor results while the backend still reports them as incomplete, leaving the fan-in node in WaitingForDependencies.

This doesn't occur with a normal worker, where backend state and scheduling advance together.

@geofmureithi
Member

geofmureithi commented Feb 2, 2026

Can you share the code you are using with TestWorker?

@next-n
Author

next-n commented Feb 2, 2026

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors_testworker() {
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    use apalis_file_storage::JsonStorage;
    use serde_json::Value;
    use apalis_core::worker::test_worker::{TestWorker, ExecuteNext};
    use apalis_core::task_fn::task_fn;

    let counter = Arc::new(AtomicUsize::new(0));
    let dag = DagFlow::new("fan-in-testworker");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let counter_clone = Arc::clone(&counter);
    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(move |task: (u32, u32)| {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    task.0 + task.1
                }
            }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let mut worker = TestWorker::new(backend, dag);

    // This loop never reaches fan_in completion
    while let Some(item) = worker.execute_next().await {
        let (_task_id, resp) = item.unwrap();
        let _ = resp.unwrap();
    }

    // This assertion is never reached
    assert_eq!(counter.load(Ordering::SeqCst), 1);
}

@geofmureithi
Member

Yeah, you are doing it wrong.

Since worker.execute_next() returns a Stream, use .take(4) and assert that the resp returned is what we expect.

It should possibly be something like vec![EnqueuedNext { .. }, EnqueuedNext { .. }, WaitingForDependencies { .. }, Complete { .. }].

Also remove the line let counter = Arc::clone(&counter_clone); since TestWorker already handles this.

@geofmureithi
Member

geofmureithi commented Feb 2, 2026

Ah, I see the problem: TestWorker does not expose the stream:

 TestWorker {
            stream,

You can still work around this by exiting the while loop on Complete, or by using a loop and breaking at the 4th iteration. The key goal is to use resp rather than the counter.

I will add an API improvement to expose the stream.

Let me know if you need help.

@next-n
Author

next-n commented Feb 2, 2026

I tried implementing the TestWorker-based test as suggested.
However, the fan-in node never reaches Complete under TestWorker — it remains in WaitingForDependencies, so complete == 0.

This happens because fan-in relies on backend.check_status, and with TestWorker the emitted results are observable before dependency completion is visible to check_status. As a result, the fan-in gate never opens.
This is the code I used:

#[tokio::test]
async fn test_fan_in_runs_once_with_multiple_predecessors() {
    use apalis_core::{
        task_fn::task_fn,
        worker::test_worker::{ExecuteNext, TestWorker},
    };
    use apalis_file_storage::JsonStorage;
    use serde_json::Value;

    let dag = DagFlow::new("fan-in-testworker");

    let a = dag.add_node("a", task_fn(|task: u32| async move { task }));
    let b = dag.add_node("b", task_fn(|task: u32| async move { task }));

    let _fan_in = dag
        .add_node(
            "fan_in",
            task_fn(|task: (u32, u32)| async move { task.0 + task.1 }),
        )
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let mut worker = TestWorker::new(backend, dag);

    use super::response::DagExecutionResponse::*;

    let mut enqueued_next = 0;
    let mut waiting = 0;
    let mut complete = 0;
    let mut complete_value = None;

    // bounded loop to avoid hangs
    for _ in 0..50 {
        let Some(item) = worker.execute_next().await else { break };
        let (_task_id, resp) = item.unwrap();
        let resp = resp.unwrap();

        match resp {
            EnqueuedNext { .. } => enqueued_next += 1,
            WaitingForDependencies { .. } => waiting += 1,
            Complete { result } => {
                complete += 1;
                complete_value = Some(result);
                break; // stop after first completion
            }
            _ => {}
        }
    }

    // Assertions focus on semantics, not order
    assert_eq!(enqueued_next, 2, "both predecessors must enqueue once");
    assert!(waiting >= 1, "fan-in must wait at least once");
    assert_eq!(complete, 1, "fan-in must execute exactly once");
    assert_eq!(complete_value, Some(Value::from(3u32)));
}

@geofmureithi
Member

The TestWorkerService wraps all the other services and should work appropriately (hence it is not the issue you are describing).

My suspicion is that there is an extra bug here:

let req = req.map(|_| encoded_input); // Replace args with fan-in input
let response = executor.call(req).await?;
(response, context)

It doesn't look obvious, but the executor will return ExecuteNext, and since there is no Next and we don't check for that, it doesn't return the correct result, which should be Complete.

Can you confirm this by debugging the first 4 results and posting them here?

@next-n
Author

next-n commented Feb 2, 2026

[0] task_id=TaskId(RandomId("4yzi9nwccm4")) resp=Ok(EntryFanOut { node_task_ids: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[1] task_id=TaskId(RandomId("4yzi9pmhn5p")) resp=Ok(EnqueuedNext { result: Number(2) })
[2] task_id=TaskId(RandomId("4yzi9pwccm4")) resp=Ok(EnqueuedNext { result: Number(1) })
[3] task_id=TaskId(RandomId("4yzi9qyqw4k")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[4] task_id=TaskId(RandomId("4yzi9re6m5-")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[5] task_id=TaskId(RandomId("4yzi9te6m5-")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[6] task_id=TaskId(RandomId("4yzi9trzjjw")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })
[7] task_id=TaskId(RandomId("4yzi9ukxq95")) resp=Ok(WaitingForDependencies { pending_dependencies: {NodeIndex(0): TaskId(RandomId("4yzi9pwccm4")), NodeIndex(1): TaskId(RandomId("4yzi9pmhn5p"))} })

I captured the first few results from TestWorker (see logs above).

Both predecessor nodes run successfully — each emits EnqueuedNext.
However, the fan-in node never transitions to Complete.

@next-n
Author

next-n commented Feb 2, 2026

From this trace, dependency execution is not the issue — both predecessors emit EnqueuedNext successfully.

The problem is probably that the fan-in path never reaches Complete and keeps returning WaitingForDependencies with the same task IDs. This suggests the executor is not advancing from ExecuteNext → Complete once dependencies are satisfied (likely around the executor.call(req) path you pointed out).

@geofmureithi
Member

This doesn't look right; why would there be more than 5 results?

Anyway, let me look at it and then get back to you.

@next-n
Author

next-n commented Feb 6, 2026

I will explore that further; can you give me some guidance?

@geofmureithi
Member

geofmureithi commented Feb 6, 2026 via email

@next-n
Author

next-n commented Feb 7, 2026

I have now made a test with TestWorker. It works because it asserts on the observed DagExecutionResponse::Complete instead of waiting for the stream to end, which can hang under TestWorker when fan-in returns WaitingForDependencies. Below is the code; I'd appreciate a code review.

#[tokio::test]
async fn fan_in_completes_once_with_testworker() {
    use apalis_core::task_fn::task_fn;
    use apalis_core::worker::test_worker::{ExecuteNext, TestWorker};
    use apalis_file_storage::JsonStorage;
    use crate::dag::response::DagExecutionResponse;

    use serde_json::Value;

    let dag = DagFlow::new("fan-in-testworker");

    let a = dag.add_node("a", task_fn(|t: u32| async move { t }));
    let b = dag.add_node("b", task_fn(|t: u32| async move { t }));

    let _fan_in = dag
        .add_node("fan_in", task_fn(|t: (u32, u32)| async move { t.0 + t.1 }))
        .depends_on((&a, &b));

    let mut backend: JsonStorage<Value> = JsonStorage::new_temp().unwrap();
    backend.start_fan_out((1u32, 2u32)).await.unwrap();

    let mut worker = TestWorker::new(backend, dag);

    let mut responses = Vec::<DagExecutionResponse<Value, _>>::new();

    // Bounded loop so we never hang tests.
    for _ in 0..20 {
        let Some(item) = worker.execute_next().await else {
            break;
        };
        let (_task_id, resp) = item.expect("worker error");
        let resp = resp.expect("task error");
        responses.push(resp);

        // Optional: stop early once we see completion.
        if matches!(responses.last(), Some(DagExecutionResponse::Complete { .. })) {
            // pull a couple more if you want to ensure no duplicates,
            // but usually backend drains to None after this.
        }
    }

    let complete: Vec<_> = responses
        .iter()
        .filter_map(|r| match r {
            DagExecutionResponse::Complete { result } => Some(result.clone()),
            _ => None,
        })
        .collect();

    assert_eq!(
        complete,
        vec![Value::from(3)],
        "expected exactly one Complete(3). full trace: {responses:?}"
    );

    // This is the “no duplicate fan-in execution” signal:
    // if fan_in ran twice, you'd typically get >1 Complete in the trace.
    assert_eq!(
        complete.len(),
        1,
        "fan-in should complete once. full trace: {responses:?}"
    );
}

@geofmureithi
Copy link
Member

I am still trying to avoid this:

    // Bounded loop so we never hang tests.
    for _ in 0..20 {

Why is it 20 and not 15?

I will merge #682 so that we can use Stream::take
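Once that lands, the bounded loop could become roughly the following (a sketch only; the stream() accessor and the item shape are assumptions about what #682 will expose):

use futures::StreamExt;

// Rough sketch: take exactly the expected number of results instead of a
// hand-tuned upper bound, then assert on the collected responses.
let responses: Vec<_> = worker
    .stream()   // hypothetical accessor, assumed to be added by #682
    .take(4)    // e.g. EnqueuedNext, EnqueuedNext, WaitingForDependencies, Complete
    .collect()
    .await;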

@geofmureithi
Member

After a little debugging, I think there is possibly a bug in JsonStorage.

I think you should:

  1. Clean up any unnecessary code and comments
  2. Update with the base branch
  3. Include the last test
  4. Add a changelog

I will merge and document the issue later

@next-n
Author

next-n commented Feb 9, 2026

I have pushed the commit with the last test.

@geofmureithi
Member

Add a Changelog Entry

@next-n
Author

next-n commented Feb 9, 2026

Oh, sorry, I missed the last one; I will commit again.

@next-n
Author

next-n commented Feb 9, 2026

I pushed the code; can you please review it?

Member

@geofmureithi geofmureithi left a comment


The manual retry raises some problems as highlighted. I think there is more to this task.

My initial solution would have involved atomics/transactions, enforced at the backend level. Another solution might involve storing the state in a root task and updating and tracking it there.

I think the second option has potential.
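Very roughly, the second option could look something like this (purely illustrative; the struct and field names are made up and nothing like this exists in the codebase yet):

// Illustrative only: track fan-in progress on the root task so that the
// "enqueue fan-in" decision has a single writer instead of racing predecessors.
#[derive(Default, serde::Serialize, serde::Deserialize)]
struct FanInState {
    completed_dependencies: Vec<TaskId>, // predecessors observed as Done
    fan_in_enqueued: bool,               // flipped exactly once
}

Each predecessor would update this state on the root task (ideally via an atomic or transactional backend update), and only the update that flips fan_in_enqueued from false to true would enqueue the fan-in node.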

let response = executor.call(req).await?;
(response, context)
} else {
if is_designated {
Member

Wait, we never discussed this. I don't think it's ideal to do this, because it creates side effects (the retry creates a new task).

Author

How about we remove the “enqueue retry task” and replace the loser behavior with a terminal/no-op outcome, or add a “SkippedDuplicate” concept?

Member

I am not sure what you mean by these:

replacing the loser behavior with a terminal/no-op outcome or add a “SkippedDuplicate” concept.

Author

else {
    if is_designated {
        let retry_task = TaskBuilder::new(req.args.clone())
            .with_task_id(TaskId::new(B::IdType::generate()))
            .meta(context.clone())
            .build();
        backend
            .send(retry_task)
            .await
            .map_err(BoxDynError::from)?;
    }
    return Ok(DagExecutionResponse::WaitingForDependencies {
        pending_dependencies: dependency_task_ids,
    });
}

I plan to change this block of code so that:

- no new task is ever enqueued just to dedupe
- if this execution is not the one that “owns” the fan-in (i.e. it didn't claim it), it terminates as a no-op instead of “waiting”

I also need one small ownership/claim check before fan-in execution.

Right before executing the fan-in node (i.e., after dependencies are confirmed as done), make a single decision.

Identify the fan-in instance uniquely with a stable key:

– workflow run id (root task id)
– fan-in node identity (node name or stable node id)

So the key is something like:

(root_task_id, current_node_id)

So only the claimer executes the fan-in; everyone else exits immediately (terminal/no-op) once they see it is claimed or completed.
What do you think about this flow?
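To make that concrete, the shape I have in mind is roughly this (illustrative pseudocode; try_claim and SkippedDuplicate are hypothetical and do not exist in the codebase, and try_claim would be the new backend capability under discussion):

// Illustrative sketch of the claim idea.
// The claim key uniquely identifies one fan-in instance per workflow run.
let claim_key = (root_task_id.clone(), current_node_id.clone());

if backend.try_claim(&claim_key).await? {
    // We own the fan-in: execute the merge exactly once.
    let response = executor.call(req).await?;
    return Ok(response);
}
// Someone else claimed (or already completed) the fan-in: terminal no-op.
return Ok(DagExecutionResponse::SkippedDuplicate);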

Member

That would not solve the problem here, which is what happens when the designated node is called but the other nodes have not completed. The claiming mechanism would possibly need an extra API, which I would recommend we avoid adding. We already have check_status and should try to use it.

@next-n
Author

next-n commented Feb 10, 2026

I committed the new code; it does the following:

  1. Fan-in now checks all_done
  2. If the designated parent arrives early, it re-enqueues a retry
  3. Non-designated contenders exit immediately
  4. The retry-on-missing-result path also re-enqueues

I also made an improvement to TestWorker so it won't hang when testing fan-in: it now acks and persists the service's return value, storing the real DagExecutionResponse instead of (). The fan-in dependency check/merge can then decode the results and finish instead of retrying forever.
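In terms of ordering, the change is roughly this (simplified; the ack call here is schematic, not the exact signature):

// Sketch: persist/ack the real response first so that check_status can observe
// completion, and only then emit it on the test channel.
let res = fut.await;
backend.ack(&task_id, &res).await?;           // completion now visible to check_status
tx.send((task_id, Ok(res))).await.unwrap();   // emit to the test stream afterwards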

Please review these changes; if you're satisfied, I will add the changelog entry.

Member

I don't understand why this file has so many line changes when there is only one line change in principle.

Author

@next-n next-n Feb 10, 2026


Yes, this has only one change in principle; the extra lines are probably because I used Codex to make the change for the hanging issue.

Author


I can revert the extra changes, keep only the one change, and commit again.

pending_dependencies: dependency_task_ids,
});
match action {
FanInAction::Retry => {
Member

As I highlighted in the previous conversation, the retry approach is suboptimal and not the way to go. It also creates rogue task_ids, which will make debugging harder. There are already APIs like check_status and wait_for which can handle this better.

Author

I removed the retry re-enqueue logic and switched to dependency waiting/checking for the designated predecessor.
As a result, only the designated fan-in path waits and executes the merge; non-designated contenders exit early, and there are no rogue retry task IDs.
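For reference, the shape of it is roughly this (a simplified sketch, not the exact diff; the wait_for signature is approximated):

// Simplified sketch of the final approach.
if is_designated {
    // The designated predecessor waits for every dependency to be Done using
    // the existing backend APIs, instead of enqueuing a retry task.
    backend.wait_for(&dependency_task_ids).await?;   // signature approximated
    let req = req.map(|_| encoded_input);            // replace args with the fan-in input
    let response = executor.call(req).await?;
    return Ok(response);
}
// Non-designated predecessors exit immediately; they neither wait nor retry.
return Ok(DagExecutionResponse::WaitingForDependencies {
    pending_dependencies: dependency_task_ids,
});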

@geofmureithi
Member

geofmureithi commented Feb 10, 2026 via email

Successfully merging this pull request may close these issues:

DAG fan-in node can execute more than once when dependencies finish close together