Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type worker struct {
processor TaskProcessor
waiting bool
stop atomic.Bool

// loopDone is closed when the background polling goroutine exits.
// StopAndDrain waits on this before calling pending.Wait() to prevent
// a race between ProcessNext calling pending.Add(1) and StopAndDrain
// calling pending.Wait().
loopDone chan struct{}
}

type NewTaskWorkerOptions func(*WorkerOptions)
Expand Down Expand Up @@ -92,8 +98,10 @@ func (w *worker) Start(ctx context.Context) {
w.cancel = cancel

w.stop.Store(false)
w.loopDone = make(chan struct{})

go func() {
defer close(w.loopDone)
var b backoff.BackOff = &backoff.ExponentialBackOff{
Comment on lines +101 to 105
InitialInterval: 50 * time.Millisecond,
MaxInterval: 5 * time.Second,
Expand Down Expand Up @@ -204,6 +212,13 @@ func (w *worker) StopAndDrain() {
w.cancel()
}

// Wait for the polling loop to fully exit before calling pending.Wait().
// This ensures no more ProcessNext() calls (and thus no more pending.Add(1))
// can race with pending.Wait().
if w.loopDone != nil {
<-w.loopDone
}

// Wait for outstanding work-items to finish processing.
// TODO: Need to find a way to cancel this if it takes too long for some reason.
w.pending.Wait()
Expand Down
37 changes: 37 additions & 0 deletions tests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,43 @@ func Test_TaskWorker(t *testing.T) {

}

// Test_RapidStartStopNoPanic is a regression test for
// https://github.com/microsoft/durabletask-go/issues/XXX
//
// It repeatedly starts a task worker and immediately asks it to stop and
// drain. Before the fix, calling StopAndDrain while the polling loop was
// still running could cause "sync: WaitGroup is reused before previous Wait
// has returned", because pending.Add(1) in ProcessNext raced with
// pending.Wait() in StopAndDrain.
func Test_RapidStartStopNoPanic(t *testing.T) {
	const (
		// iterations is how many start/stop cycles are attempted.
		iterations = 50
		// drainTimeout bounds how long a single StopAndDrain may take
		// before the iteration is reported as a failure.
		drainTimeout = 3 * time.Second
	)

	for attempt := 0; attempt < iterations; attempt++ {
		ctx, cancel := context.WithCancel(context.Background())

		// Queue one work item on an unblocked processor so the worker has
		// something to process during the cycle.
		processor := mocks.NewTestTaskPocessor("rapid-start-stop")
		processor.UnblockProcessing()
		processor.AddWorkItems(backend.ActivityWorkItem{SequenceNumber: int64(attempt)})

		w := backend.NewTaskWorker(processor, logger)
		w.Start(ctx)

		// Stop right away, with no pause after Start: this is the scenario
		// that triggered the panic. The drain runs on its own goroutine so
		// the test can enforce a timeout; the channel is buffered so that
		// goroutine never blocks on the send if the test has already given up.
		drained := make(chan bool, 1)
		go func() {
			w.StopAndDrain()
			drained <- true
		}()

		select {
		case <-time.After(drainTimeout):
			cancel()
			t.Fatalf("iteration %d: worker stop and drain not finished within timeout", attempt)
		case <-drained:
			// Drain completed in time; nothing further to check.
		}

		cancel()
	}
}

func Test_StartAndStop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
Loading