Conversation
The NewOrchestrationWorker signature was changed to take an OrchestratorOptions struct, but the samples were not updated.

Signed-off-by: joshvanl <me@joshvanl.dev>
Introduces the event-driven loop infrastructure that will be used by the worker and executor refactors:

- backend/loops: EventWorker and EventExecutor marker interfaces with concrete event types (DispatchWorkItem, Shutdown, ExecuteOrchestrator, ExecuteActivity, ConnectStream, DisconnectStream, etc.)
- backend/loops/worker: Handler that processes dispatched work items inline within a loop, calling Process/Complete/Abandon on the processor.
- backend/loops/executor: Handler that manages gRPC streams and dispatches orchestrator/activity work items to connected clients.
- backend/local/loops: EventTask types and handler for the local tasks backend, replacing sync.Map-based pending task tracking with serialized loop processing.

Branched from dapr#72

Signed-off-by: joshvanl <me@joshvanl.dev>
Replaces the channel-based work item queue and sync.Map-based pending task tracking in the gRPC executor with a single-threaded event loop. All stream management, work item dispatch, and cleanup now happen serially within the loop handler, eliminating data races.

- grpcExecutor: replaces the workItemQueue channel and pendingOrchestrators/pendingActivities sync.Maps with an executor event loop. Adds Start() to the Executor interface. GetWorkItems now connects a stream via the loop and blocks until disconnected.
- local.TasksBackend: replaces sync.Map-based pending task tracking with a task event loop. All complete/cancel/register operations are now serialized through the loop.
- sqlite/postgres: Start now runs the TasksBackend loop in a goroutine, Stop closes it.
- task.taskExecutor: adds a no-op Start() that blocks until the context is done.
- Executor mock: adds a Start() mock method.
- gRPC tests: start the executor loop and cancel on cleanup.

Branched from dapr#72

Signed-off-by: joshvanl <me@joshvanl.dev>
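The single-threaded loop described above can be sketched as follows. This is a minimal illustration of the serialization idea, not the actual backend types; the event names and key scheme here are invented for the example:

```go
package main

import "fmt"

// event is a minimal stand-in for the loop's event union.
type event interface{ isEvent() }

// register adds a pending task; complete resolves one and reports
// via the done channel whether the task was known.
type register struct{ key string }
type complete struct {
	key  string
	done chan bool
}

func (register) isEvent() {}
func (complete) isEvent() {}

// runLoop owns the pending map exclusively: every mutation happens on
// this one goroutine, so no sync.Map or mutex is needed and there is
// nothing to race on.
func runLoop(events <-chan event) {
	pending := map[string]bool{}
	for ev := range events {
		switch e := ev.(type) {
		case register:
			pending[e.key] = true
		case complete:
			_, ok := pending[e.key]
			delete(pending, e.key)
			e.done <- ok
		}
	}
}

func main() {
	events := make(chan event, 8)
	go runLoop(events)

	events <- register{key: "abc/1"}
	done := make(chan bool)
	events <- complete{key: "abc/1", done: done}
	fmt.Println(<-done) // true: the task was pending
	events <- complete{key: "abc/1", done: done}
	fmt.Println(<-done) // false: already completed
	close(events)
}
```

Because callers only enqueue events and wait on reply channels, concurrent completions of the same task resolve deterministically: exactly one sees true.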
Replaces the poll-based worker model with push-based dispatch and makes all Start methods blocking (return when context is cancelled).

Worker changes:
- TaskWorker interface: Start(ctx) returns error (blocking), adds Dispatch(wi, callback) for push-based dispatch, removes StopAndDrain.
- TaskProcessor interface: removes NextWorkItem (no longer polls).
- taskWorker uses N event loops (one per parallelism slot) with round-robin dispatch via atomic counter.
- Removes processWorkItem from worker.go (moved to loops/worker handler).
- activity.go/orchestration.go: removes NextWorkItem methods.

TaskHub changes:
- TaskHubWorker interface: removes Shutdown (callers cancel context).
- taskHubWorker.Start uses RunnerManager to run backend, workers, and pollAndDispatch bridges concurrently. Blocks until context cancelled.
- pollAndDispatch bridges blocking Next*WorkItem calls into fire-and-forget Dispatch calls to worker loops.

Backend changes:
- sqlite/postgres Start: now blocking (delegates to TasksBackend.Run).
- sqlite CreateTaskHub: idempotent (returns ErrTaskHubExists if already initialized).

Test changes:
- All tests use context cancellation instead of StopAndDrain/Shutdown.
- orchestrations_test.go: initTaskHubWorker returns CancelFunc, uses ready channel for synchronization.
- worker_test.go: rewritten for push-based Dispatch API.
- taskhub_test.go: simplified for blocking Start.
- grpc_test.go: uses goroutines for blocking Start, context cancel for cleanup.

Sample changes:
- All samples updated: Init returns CancelFunc, Start runs in goroutine.

Branched from dapr#73

Signed-off-by: joshvanl <me@joshvanl.dev>
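The "N event loops with round-robin dispatch via atomic counter" scheme above can be sketched like this; the dispatcher type and channel-per-loop representation are illustrative, not the real taskWorker internals:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dispatcher fans work items out to N loops, one per parallelism slot,
// using an atomic counter for lock-free round-robin selection.
type dispatcher struct {
	next  atomic.Uint64
	loops []chan string // each channel stands in for one event loop
}

func newDispatcher(n int) *dispatcher {
	d := &dispatcher{loops: make([]chan string, n)}
	for i := range d.loops {
		d.loops[i] = make(chan string, 16)
	}
	return d
}

// Dispatch is fire-and-forget: concurrent callers pick a loop via an
// atomic increment, so they never contend on a mutex.
func (d *dispatcher) Dispatch(wi string) {
	i := d.next.Add(1) % uint64(len(d.loops))
	d.loops[i] <- wi
}

func main() {
	d := newDispatcher(3)
	for i := 0; i < 6; i++ {
		d.Dispatch(fmt.Sprintf("wi-%d", i))
	}
	for i, l := range d.loops {
		fmt.Printf("loop %d queued %d items\n", i, len(l)) // 2 each
	}
}
```

Since each loop drains its own channel serially, per-item ordering within a slot is preserved while the slots themselves run concurrently.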
Pull request overview
This PR refactors task dispatching to use dapr/kit/events/loop-based executor/task loops, introduces an explicit Start(ctx) lifecycle for the gRPC executor, and wires task backend loops into the sqlite/postgres backends. It also updates samples to a new OrchestratorOptions constructor and bumps several Go dependencies.
Changes:
- Add event-loop handlers for executor/worker/task-backend dispatch and refactor the backend `grpcExecutor` to use them.
- Add `Executor.Start(ctx)` and update grpc tests to start the executor loop.
- Start/stop the local `TasksBackend` loop from sqlite/postgres backend lifecycle; update samples to `backend.OrchestratorOptions{...}`.
Reviewed changes
Copilot reviewed 19 out of 20 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/mocks/Executor.go | Extends mock executor with Start(ctx) support. |
| tests/grpc/grpc_test.go | Starts the gRPC executor loop in TestMain and adjusts shutdown flow. |
| task/executor.go | Adds a no-op/blocking Start(ctx) for the in-process executor implementation. |
| samples/taskexecutionid/taskexecutionid.go | Switches orchestration worker construction to OrchestratorOptions. |
| samples/sequence/sequence.go | Switches orchestration worker construction to OrchestratorOptions. |
| samples/retries/retries.go | Switches orchestration worker construction to OrchestratorOptions. |
| samples/parallel/parallel.go | Switches orchestration worker construction to OrchestratorOptions. |
| samples/externalevents/externalevents.go | Switches orchestration worker construction to OrchestratorOptions. |
| samples/distributedtracing/distributedtracing.go | Switches orchestration worker construction to OrchestratorOptions. |
| backend/sqlite/sqlite.go | Prevents re-creating an already-created task hub; starts/stops local task loop on backend lifecycle. |
| backend/postgres/postgres.go | Starts/stops local task loop on backend lifecycle. |
| backend/loops/worker/worker.go | New worker loop handler for inline work-item processing. |
| backend/loops/loops.go | New shared event types for executor/worker loops. |
| backend/loops/executor/executor.go | New executor loop handler managing stream connection, buffering, and dispatch. |
| backend/local/task.go | Refactors local TasksBackend to use an event loop for pending completion coordination. |
| backend/local/loops/task/task.go | Implements the local task backend loop logic. |
| backend/local/loops/loops.go | Defines event types for the local task backend loop. |
| backend/executor.go | Refactors gRPC executor to use the executor loop; adds Start(ctx) and simplifies stream handling. |
| go.mod | Dependency bumps (grpc/otel/dapr kit/etc) and new indirect deps. |
| go.sum | Corresponding checksum updates for bumped/added deps. |
```go
	wi := e.WorkItem.(T)
	w.processWorkItem(ctx, wi)
	e.Callback <- nil
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
```
DispatchWorkItem.Callback is always signaled with nil, even if ProcessWorkItem/CompleteWorkItem fails and the item gets abandoned. This breaks the documented callback contract in loops.DispatchWorkItem (nil=completed, non-nil=abandoned) and can cause callers to think work succeeded when it didn't. Also, e.WorkItem.(T) will panic if the backend accidentally enqueues the wrong type; consider using an ok type assertion and reporting an error via the callback instead of panicking.
Suggested change — current:

```go
	wi := e.WorkItem.(T)
	w.processWorkItem(ctx, wi)
	e.Callback <- nil
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if err = w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), err)
		}
		return
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
```

Suggested:

```go
	wi, ok := e.WorkItem.(T)
	if !ok {
		err := fmt.Errorf("%v: received work item of unexpected type %T", w.processor.Name(), e.WorkItem)
		w.logger.Errorf("%v", err)
		e.Callback <- err
		return
	}

	err := w.processWorkItem(ctx, wi)
	e.Callback <- err
}

func (w *worker[T]) processWorkItem(ctx context.Context, wi T) error {
	w.logger.Debugf("%v: processing work item: %s", w.processor.Name(), wi)

	if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to process work item: %v", w.processor.Name(), err)
		if abandonErr := w.processor.AbandonWorkItem(context.Background(), wi); abandonErr != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), abandonErr)
		}
		return fmt.Errorf("processing work item failed: %w", err)
	}

	if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
		w.logger.Errorf("%v: failed to complete work item: %v", w.processor.Name(), err)
		if abandonErr := w.processor.AbandonWorkItem(context.Background(), wi); abandonErr != nil {
			w.logger.Errorf("%v: failed to abandon work item: %v", w.processor.Name(), abandonErr)
		}
		return fmt.Errorf("completing work item failed: %w", err)
	}

	w.logger.Debugf("%v: work item processed successfully", w.processor.Name())
	return nil
```
```go
func (t *task) completeActivity(key string, res *protos.ActivityResponse) error {
	pending, ok := t.pendingActivities[key]
	if !ok {
		return api.NewUnknownTaskIDError(key, 0)
	}
```
completeActivity returns api.NewUnknownTaskIDError(key, 0) when a waiter isn't found. Passing the composite key string and a hard-coded taskID of 0 produces an incorrect/low-signal error. Since CompleteActivity/CancelActivity events already have InstanceID + TaskID, consider changing completeActivity to accept those separately and return NewUnknownTaskIDError(instanceID, taskID).
```go
// Start starts the executor loop and blocks until the context is cancelled.
func (g *grpcExecutor) Start(ctx context.Context) error {
	manager := concurrency.NewRunnerManager(g.executorLoop.Run)
	if g.streamShutdownChan != nil {
		manager.Add(func(ctx context.Context) error {
			select {
			case <-g.streamShutdownChan:
				g.executorLoop.Enqueue(new(loops.StreamShutdown))
			case <-ctx.Done():
			}
			return nil
		})
	}
	// When context is cancelled, close the executor loop so Run unblocks.
	manager.Add(func(ctx context.Context) error {
		<-ctx.Done()
		g.executorLoop.Close(new(loops.ShutdownExecutor))
		return nil
	})
	return manager.Run(ctx)
}
```
The gRPC executor now requires Start(ctx) to run the internal executor loop, but nothing in NewGrpcExecutor/GetWorkItems auto-starts it. If callers don't explicitly invoke Start (e.g., main.go currently constructs a gRPC executor but never starts it), ExecuteOrchestrator/ExecuteActivity will enqueue events and then block forever waiting for dispatched. Consider auto-starting the loop (lazy sync.Once on first use) or wiring executor start/stop into the server/worker lifecycle, and documenting this behavioral requirement.
```go
	e.pendingOrch[iid].streamID = e.stream.streamID

	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
		return
	}
	ev.Dispatched <- nil
```
When sendWorkItem fails while a stream is connected, the code only reports the error to ev.Dispatched and returns, but it leaves e.stream set and keeps the orchestrator in pendingOrch. This can strand the executor in a state where a broken stream is still considered connected (blocking new connections) and pending items won't be cleaned up until an explicit disconnect/shutdown. Consider treating Send failures as a stream failure: notify stream.errCh, clear e.stream, and clean up/cancel the affected pending item(s).
```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
```
Same issue for activities: if sendWorkItem fails, the activity remains in pendingAct and e.stream stays non-nil, potentially preventing reconnection and leaving pending state until a later disconnect/shutdown event. Consider failing the stream (signal errCh, clear e.stream) and cleaning up/canceling the pending activity on Send errors.
Suggested change — current:

```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		ev.Dispatched <- err
```

Suggested:

```go
	if err := e.sendWorkItem(ev.WorkItem); err != nil {
		// Report the error to the dispatcher.
		ev.Dispatched <- err
		// Clean up the pending activity since the send failed.
		delete(e.pendingAct, ev.Key)
		// Fail the current stream so a new one can be connected.
		if e.stream != nil {
			if e.stream.errCh != nil {
				select {
				case e.stream.errCh <- err:
				default:
				}
			}
			e.stream = nil
		}
		return
	}
```
```go
		log.Fatalf("failed to start worker: %v", err)
	}

	executorCtx, executorCancel := context.WithCancel(ctx)
	go grpcExecutor.Start(executorCtx)
```
grpcExecutor.Start is launched in a goroutine and its returned error is ignored. If Start exits early (e.g., loop run failure), the test suite will continue and may hang or produce misleading failures. Consider capturing the error in a channel and failing fast in TestMain if Start returns unexpectedly before executorCancel() is called.
Suggested change — current:

```go
	go grpcExecutor.Start(executorCtx)
```

Suggested:

```go
	go func() {
		if err := grpcExecutor.Start(executorCtx); err != nil && executorCtx.Err() == nil {
			log.Fatalf("grpcExecutor.Start exited unexpectedly: %v", err)
		}
	}()
```