From 802077b77188f7c5b3aeb85f9f27891dc0998e14 Mon Sep 17 00:00:00 2001 From: Kaibo Cai Date: Fri, 13 Mar 2026 17:54:58 -0500 Subject: [PATCH] fix: prevent WaitGroup race between ProcessNext and StopAndDrain Add a loopDone channel to ensure the background polling goroutine has fully exited before calling pending.Wait() in StopAndDrain. This prevents the "sync: WaitGroup is reused before previous Wait has returned" panic that occurred when pending.Add(1) in ProcessNext raced with pending.Wait() in StopAndDrain. Co-Authored-By: Claude Opus 4.6 --- backend/worker.go | 15 +++++++++++++++ tests/worker_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/backend/worker.go b/backend/worker.go index 640e7bc3..8668760f 100644 --- a/backend/worker.go +++ b/backend/worker.go @@ -47,6 +47,12 @@ type worker struct { processor TaskProcessor waiting bool stop atomic.Bool + + // loopDone is closed when the background polling goroutine exits. + // StopAndDrain waits on this before calling pending.Wait() to prevent + // a race between ProcessNext calling pending.Add(1) and StopAndDrain + // calling pending.Wait(). + loopDone chan struct{} } type NewTaskWorkerOptions func(*WorkerOptions) @@ -92,8 +98,10 @@ func (w *worker) Start(ctx context.Context) { w.cancel = cancel w.stop.Store(false) + w.loopDone = make(chan struct{}) go func() { + defer close(w.loopDone) var b backoff.BackOff = &backoff.ExponentialBackOff{ InitialInterval: 50 * time.Millisecond, MaxInterval: 5 * time.Second, @@ -204,6 +212,13 @@ func (w *worker) StopAndDrain() { w.cancel() } + // Wait for the polling loop to fully exit before calling pending.Wait(). + // This ensures no more ProcessNext() calls (and thus no more pending.Add(1)) + // can race with pending.Wait(). + if w.loopDone != nil { + <-w.loopDone + } + // Wait for outstanding work-items to finish processing. // TODO: Need to find a way to cancel this if it takes too long for some reason. 
w.pending.Wait() diff --git a/tests/worker_test.go b/tests/worker_test.go index c4a5a74c..2e1aea35 100644 --- a/tests/worker_test.go +++ b/tests/worker_test.go @@ -178,6 +178,43 @@ func Test_TaskWorker(t *testing.T) { } +func Test_RapidStartStopNoPanic(t *testing.T) { + // Regression test for https://github.com/microsoft/durabletask-go/issues/XXX (TODO: replace the XXX placeholder with the real issue number) + // Before the fix, calling StopAndDrain while the polling loop was still + // running could cause "sync: WaitGroup is reused before previous Wait has + // returned" because pending.Add(1) in ProcessNext raced with + // pending.Wait() in StopAndDrain. + for i := 0; i < 50; i++ { + ctx, cancel := context.WithCancel(context.Background()) + + tp := mocks.NewTestTaskPocessor("rapid-start-stop") + tp.UnblockProcessing() + + // Queue one activity work item so the worker has something to process + tp.AddWorkItems(backend.ActivityWorkItem{SequenceNumber: int64(i)}) + + worker := backend.NewTaskWorker(tp, logger) + worker.Start(ctx) + + // Stop immediately after Start, from a separate goroutine so the select below can enforce a timeout; this is the scenario that triggered the panic + drainFinished := make(chan bool, 1) + go func() { + worker.StopAndDrain() + drainFinished <- true + }() + + select { + case <-drainFinished: + // success: StopAndDrain returned before the timeout + case <-time.After(3 * time.Second): + cancel() + t.Fatalf("iteration %d: worker stop and drain not finished within timeout", i) + } + + cancel() + } +} + func Test_StartAndStop(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()