
Refactor to use control loops #73

Draft

JoshVanL wants to merge 3 commits into dapr:main from JoshVanL:joshvanl/executor-loop

Conversation

@JoshVanL

Introduces the event-driven loop infrastructure that will be used by
the worker and executor refactors:

  • backend/loops: EventWorker and EventExecutor marker interfaces with
    concrete event types (DispatchWorkItem, Shutdown, ExecuteOrchestrator,
    ExecuteActivity, ConnectStream, DisconnectStream, etc.)
  • backend/loops/worker: Handler that processes dispatched work items
    inline within a loop, calling Process/Complete/Abandon on the
    processor.
  • backend/loops/executor: Handler that manages gRPC streams and
    dispatches orchestrator/activity work items to connected clients.
  • backend/local/loops: EventTask types and handler for the local
    tasks backend, replacing sync.Map-based pending task tracking with
    serialized loop processing.
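The marker-interface pattern described above can be sketched roughly as follows. The event names mirror the bullet list (DispatchWorkItem, Shutdown), but the fields, the channel-based loop, and the handler body are illustrative assumptions, not the actual backend/loops code:

```go
package main

import "fmt"

// EventWorker is a marker interface: only types implementing it can be
// enqueued on the worker loop, and a type switch dispatches them serially.
type EventWorker interface{ isEventWorker() }

// DispatchWorkItem asks the loop to process one work item and report the
// result on Callback (nil = completed, non-nil = abandoned).
type DispatchWorkItem struct {
	WorkItem string     // payload, simplified to a string for this sketch
	Callback chan error
}

// Shutdown asks the loop to stop.
type Shutdown struct{}

func (DispatchWorkItem) isEventWorker() {}
func (Shutdown) isEventWorker()         {}

// run handles events one at a time, so state touched inside the handler
// needs no locking.
func run(events <-chan EventWorker) {
	for e := range events {
		switch ev := e.(type) {
		case DispatchWorkItem:
			// process inline, then signal the outcome on the callback
			ev.Callback <- nil
		case Shutdown:
			return
		}
	}
}

func main() {
	events := make(chan EventWorker)
	done := make(chan struct{})
	go func() { run(events); close(done) }()

	cb := make(chan error, 1)
	events <- DispatchWorkItem{WorkItem: "orchestration-1", Callback: cb}
	if err := <-cb; err == nil {
		fmt.Println("work item completed")
	}
	events <- Shutdown{}
	<-done
}
```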

Refactor executor and local tasks backend to loop-based dispatch

Replaces the channel-based work item queue and sync.Map-based pending
task tracking in the gRPC executor with a single-threaded event loop.
All stream management, work item dispatch, and cleanup now happen
serially within the loop handler, eliminating data races.

  • grpcExecutor: replaces workItemQueue channel and pendingOrchestrators/
    pendingActivities sync.Maps with an executor event loop. Adds Start()
    to the Executor interface. GetWorkItems now connects a stream via
    the loop and blocks until disconnected.
  • local.TasksBackend: replaces sync.Map-based pending task tracking with
    a task event loop. All complete/cancel/register operations are now
    serialized through the loop.
  • sqlite/postgres: Start now runs the TasksBackend loop in a goroutine,
    Stop closes it.
  • task.taskExecutor: adds no-op Start() that blocks until context done.
  • Executor mock: adds Start() mock method.
  • gRPC tests: start the executor loop and cancel on cleanup.
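A minimal sketch of the idea behind replacing sync.Map-based pending-task tracking: the pending map is owned by a single loop goroutine, and register/complete requests arrive as events, so map access is serialized without locks. All names below are hypothetical, not the local.TasksBackend types:

```go
package main

import "fmt"

type event interface{ isEvent() }

// register adds a waiter for a task; complete resolves it; shutdown stops
// the loop. These stand in for the EventTask types described above.
type register struct {
	key  string
	done chan string // receives the result when the task completes
}
type complete struct{ key, result string }
type shutdown struct{}

func (register) isEvent() {}
func (complete) isEvent() {}
func (shutdown) isEvent() {}

func loop(events <-chan event) {
	// A plain map suffices: only this goroutine ever touches it.
	pending := map[string]chan string{}
	for e := range events {
		switch ev := e.(type) {
		case register:
			pending[ev.key] = ev.done
		case complete:
			if done, ok := pending[ev.key]; ok {
				done <- ev.result
				delete(pending, ev.key)
			}
		case shutdown:
			return
		}
	}
}

func main() {
	events := make(chan event)
	go loop(events)

	done := make(chan string, 1)
	events <- register{key: "act-1", done: done}
	events <- complete{key: "act-1", result: "ok"}
	fmt.Println(<-done) // ok
	events <- shutdown{}
}
```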

Branched from #72

The NewOrchestrationWorker signature was changed to take an
OrchestratorOptions struct but the samples were not updated.

Signed-off-by: joshvanl <me@joshvanl.dev>
@JoshVanL JoshVanL requested a review from a team as a code owner March 10, 2026 15:59
Copilot AI review requested due to automatic review settings March 10, 2026 15:59
@JoshVanL JoshVanL marked this pull request as draft March 10, 2026 15:59
JoshVanL added a commit to JoshVanL/durabletask-go that referenced this pull request Mar 10, 2026
Replaces the poll-based worker model with push-based dispatch and makes
all Start methods blocking (return when context is cancelled).

Worker changes:
- TaskWorker interface: Start(ctx) returns error (blocking), adds
  Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain.
- TaskProcessor interface: removes NextWorkItem (no longer polls).
- taskWorker uses N event loops (one per parallelism slot) with
  round-robin dispatch via atomic counter.
- Removes processWorkItem from worker.go (moved to loops/worker handler).
- activity.go/orchestration.go: removes NextWorkItem methods.

TaskHub changes:
- TaskHubWorker interface: removes Shutdown (callers cancel context).
- taskHubWorker.Start uses RunnerManager to run backend, workers, and
  pollAndDispatch bridges concurrently. Blocks until context cancelled.
- pollAndDispatch bridges blocking Next*WorkItem calls into fire-and-
  forget Dispatch calls to worker loops.

Backend changes:
- sqlite/postgres Start: now blocking (delegates to TasksBackend.Run).
- sqlite CreateTaskHub: idempotent (returns ErrTaskHubExists if already
  initialized).

Test changes:
- All tests use context cancellation instead of StopAndDrain/Shutdown.
- orchestrations_test.go: initTaskHubWorker returns CancelFunc, uses
  ready channel for synchronization.
- worker_test.go: rewritten for push-based Dispatch API.
- taskhub_test.go: simplified for blocking Start.
- grpc_test.go: uses goroutines for blocking Start, context cancel for
  cleanup.

Sample changes:
- All samples updated: Init returns CancelFunc, Start runs in goroutine.

Branched from dapr#73

Signed-off-by: joshvanl <me@joshvanl.dev>
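The round-robin dispatch via an atomic counter mentioned in the worker changes can be sketched like this. The dispatcher type and channel-per-slot layout are assumptions for illustration, not the actual taskWorker:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// dispatcher fans work items out across N loops (one per parallelism
// slot) using a lock-free atomic counter for round-robin selection.
type dispatcher struct {
	loops   []chan string
	counter atomic.Uint64
}

func (d *dispatcher) Dispatch(wi string) {
	// Atomic increment picks the next slot without any mutex.
	idx := d.counter.Add(1) % uint64(len(d.loops))
	d.loops[idx] <- wi
}

func main() {
	const slots = 3
	d := &dispatcher{loops: make([]chan string, slots)}
	counts := make([]int, slots)

	var wg sync.WaitGroup
	for i := range d.loops {
		d.loops[i] = make(chan string)
		wg.Add(1)
		go func(i int) { // one event loop per slot
			defer wg.Done()
			for range d.loops[i] {
				counts[i]++
			}
		}(i)
	}

	for j := 0; j < 9; j++ {
		d.Dispatch(fmt.Sprintf("wi-%d", j))
	}
	for _, ch := range d.loops {
		close(ch)
	}
	wg.Wait()
	fmt.Println(counts) // [3 3 3]
}
```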

Copilot AI left a comment


Pull request overview

This PR refactors task dispatching to use dapr/kit/events/loop-based executor/task loops, introduces an explicit Start(ctx) lifecycle for the gRPC executor, and wires task backend loops into the sqlite/postgres backends. It also updates samples to a new OrchestratorOptions constructor and bumps several Go dependencies.

Changes:

  • Add event-loop handlers for executor/worker/task-backend dispatch and refactor backend/grpcExecutor to use them.
  • Add Executor.Start(ctx) and update grpc tests to start the executor loop.
  • Start/stop the local TasksBackend loop from sqlite/postgres backend lifecycle; update samples to backend.OrchestratorOptions{...}.

Reviewed changes

Copilot reviewed 19 out of 20 changed files in this pull request and generated 6 comments.

Summary per file:

- tests/mocks/Executor.go: Extends the mock executor with Start(ctx) support.
- tests/grpc/grpc_test.go: Starts the gRPC executor loop in TestMain and adjusts the shutdown flow.
- task/executor.go: Adds a no-op/blocking Start(ctx) for the in-process executor implementation.
- samples/taskexecutionid/taskexecutionid.go: Switches orchestration worker construction to OrchestratorOptions.
- samples/sequence/sequence.go: Switches orchestration worker construction to OrchestratorOptions.
- samples/retries/retries.go: Switches orchestration worker construction to OrchestratorOptions.
- samples/parallel/parallel.go: Switches orchestration worker construction to OrchestratorOptions.
- samples/externalevents/externalevents.go: Switches orchestration worker construction to OrchestratorOptions.
- samples/distributedtracing/distributedtracing.go: Switches orchestration worker construction to OrchestratorOptions.
- backend/sqlite/sqlite.go: Prevents re-creating an already-created task hub; starts/stops the local task loop on backend lifecycle.
- backend/postgres/postgres.go: Starts/stops the local task loop on backend lifecycle.
- backend/loops/worker/worker.go: New worker loop handler for inline work-item processing.
- backend/loops/loops.go: New shared event types for executor/worker loops.
- backend/loops/executor/executor.go: New executor loop handler managing stream connection, buffering, and dispatch.
- backend/local/task.go: Refactors the local TasksBackend to use an event loop for pending completion coordination.
- backend/local/loops/task/task.go: Implements the local task backend loop logic.
- backend/local/loops/loops.go: Defines event types for the local task backend loop.
- backend/executor.go: Refactors the gRPC executor to use the executor loop; adds Start(ctx) and simplifies stream handling.
- go.mod: Dependency bumps (grpc/otel/dapr kit/etc.) and new indirect deps.
- go.sum: Corresponding checksum updates for bumped/added deps.


Comment on lines +58 to +82
```go
	wi := e.WorkItem.(T)
	w.processWorkItem(ctx, wi)
	e.Callback <- nil
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
```

Copilot AI Mar 10, 2026


DispatchWorkItem.Callback is always signaled with nil, even if ProcessWorkItem/CompleteWorkItem fails and the item gets abandoned. This breaks the documented callback contract in loops.DispatchWorkItem (nil=completed, non-nil=abandoned) and can cause callers to think work succeeded when it didn't. Also, e.WorkItem.(T) will panic if the backend accidentally enqueues the wrong type; consider using an ok type assertion and reporting an error via the callback instead of panicking.

Suggested change — before:

```go
	wi := e.WorkItem.(T)
	w.processWorkItem(ctx, wi)
	e.Callback <- nil
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
```

After:

```go
	wi, ok := e.WorkItem.(T)
	if !ok {
		err := fmt.Errorf("%v: received work item of unexpected type %T", w.processor.Name(), e.WorkItem)
		w.logger.Errorf("%v", err)
		e.Callback <- err
		return
	}
	err := w.processWorkItem(ctx, wi)
	e.Callback <- err
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) error {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if abandonErr := w.processor.AbandonWorkItem(context.Background(), wi); abandonErr != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), abandonErr)
		}
		return fmt.Errorf("processing work item failed: %w", err)
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if abandonErr := w.processor.AbandonWorkItem(context.Background(), wi); abandonErr != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), abandonErr)
		}
		return fmt.Errorf("completing work item failed: %w", err)
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
	return nil
```
Comment on lines +78 to +82
```go
func (t *task) completeActivity(key string, res *protos.ActivityResponse) error {
	pending, ok := t.pendingActivities[key]
	if !ok {
		return api.NewUnknownTaskIDError(key, 0)
	}
```

Copilot AI Mar 10, 2026


completeActivity returns api.NewUnknownTaskIDError(key, 0) when a waiter isn't found. Passing the composite key string and a hard-coded taskID of 0 produces an incorrect/low-signal error. Since CompleteActivity/CancelActivity events already have InstanceID + TaskID, consider changing completeActivity to accept those separately and return NewUnknownTaskIDError(instanceID, taskID).

Comment on lines +110 to +130
```go
// Start starts the executor loop and blocks until the context is cancelled.
func (g *grpcExecutor) Start(ctx context.Context) error {
	manager := concurrency.NewRunnerManager(g.executorLoop.Run)
	if g.streamShutdownChan != nil {
		manager.Add(func(ctx context.Context) error {
			select {
			case <-g.streamShutdownChan:
				g.executorLoop.Enqueue(new(loops.StreamShutdown))
			case <-ctx.Done():
			}
			return nil
		})
	}
	// When context is cancelled, close the executor loop so Run unblocks.
	manager.Add(func(ctx context.Context) error {
		<-ctx.Done()
		g.executorLoop.Close(new(loops.ShutdownExecutor))
		return nil
	})
	return manager.Run(ctx)
}
```

Copilot AI Mar 10, 2026


The gRPC executor now requires Start(ctx) to run the internal executor loop, but nothing in NewGrpcExecutor/GetWorkItems auto-starts it. If callers don't explicitly invoke Start (e.g., main.go currently constructs a gRPC executor but never starts it), ExecuteOrchestrator/ExecuteActivity will enqueue events and then block forever waiting for dispatched. Consider auto-starting the loop (lazy sync.Once on first use) or wiring executor start/stop into the server/worker lifecycle, and documenting this behavioral requirement.

Comment on lines +120 to +126
```go
	e.pendingOrch[iid].streamID = e.stream.streamID

	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
		return
	}
	ev.Dispatched <- nil
```

Copilot AI Mar 10, 2026


When sendWorkItem fails while a stream is connected, the code only reports the error to ev.Dispatched and returns, but it leaves e.stream set and keeps the orchestrator in pendingOrch. This can strand the executor in a state where a broken stream is still considered connected (blocking new connections) and pending items won't be cleaned up until an explicit disconnect/shutdown. Consider treating Send failures as a stream failure: notify stream.errCh, clear e.stream, and clean up/cancel the affected pending item(s).

Comment on lines +152 to +153
```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
```

Copilot AI Mar 10, 2026


Same issue for activities: if sendWorkItem fails, the activity remains in pendingAct and e.stream stays non-nil, potentially preventing reconnection and leaving pending state until a later disconnect/shutdown event. Consider failing the stream (signal errCh, clear e.stream) and cleaning up/canceling the pending activity on Send errors.

Suggested change — before:

```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
```

After:

```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		// Report the error to the dispatcher.
		ev.Dispatched <- err
		// Clean up the pending activity since the send failed.
		delete(e.pendingAct, ev.Key)
		// Fail the current stream so a new one can be connected.
		if e.stream != nil {
			if e.stream.errCh != nil {
				select {
				case e.stream.errCh <- err:
				default:
				}
			}
			e.stream = nil
		}
```

```go
		log.Fatalf("failed to start worker: %v", err)
	}
	executorCtx, executorCancel := context.WithCancel(ctx)
	go grpcExecutor.Start(executorCtx)
```

Copilot AI Mar 10, 2026


grpcExecutor.Start is launched in a goroutine and its returned error is ignored. If Start exits early (e.g., loop run failure), the test suite will continue and may hang or produce misleading failures. Consider capturing the error in a channel and failing fast in TestMain if Start returns unexpectedly before executorCancel() is called.

Suggested change — before:

```go
	go grpcExecutor.Start(executorCtx)
```

After:

```go
	go func() {
		if err := grpcExecutor.Start(executorCtx); err != nil && executorCtx.Err() == nil {
			log.Fatalf("grpcExecutor.Start exited unexpectedly: %v", err)
		}
	}()
```
