Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 84 additions & 173 deletions backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"strconv"
"strings"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -21,24 +20,16 @@ import (
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"github.com/dapr/durabletask-go/backend/loops"
loopexecutor "github.com/dapr/durabletask-go/backend/loops/executor"
"github.com/dapr/kit/concurrency"
"github.com/dapr/kit/events/loop"
)

var emptyCompleteTaskResponse = &protos.CompleteTaskResponse{}

var errShuttingDown error = status.Error(codes.Canceled, "shutting down")

// pendingOrchestrator tracks an orchestrator work item that has been handed
// to ExecuteOrchestrator but has not yet produced a result. Entries are used
// during shutdown and stream disconnect to cancel the in-flight task via
// backend.CancelOrchestratorTask.
type pendingOrchestrator struct {
	instanceID api.InstanceID // orchestration instance this work item belongs to
	streamID   string         // ID of the GetWorkItems stream the item was dispatched on; empty until dispatched
}

// pendingActivity tracks an activity work item that has been handed to
// ExecuteActivity but has not yet produced a result. Entries are used during
// shutdown and stream disconnect to cancel the in-flight task via
// backend.CancelActivityTask.
type pendingActivity struct {
	instanceID api.InstanceID // orchestration instance that scheduled the activity
	taskID     int32          // event ID of the TaskScheduled history event
	streamID   string         // ID of the GetWorkItems stream the item was dispatched on; empty until dispatched
}

type Executor interface {
Start(context.Context) error
ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*protos.OrchestratorResponse, error)
ExecuteActivity(context.Context, api.InstanceID, *protos.HistoryEvent) (*protos.HistoryEvent, error)
Shutdown(ctx context.Context) error
Expand All @@ -47,9 +38,7 @@ type Executor interface {
type grpcExecutor struct {
protos.UnimplementedTaskHubSidecarServiceServer

workItemQueue chan *protos.WorkItem
pendingOrchestrators *sync.Map // map[api.InstanceID]*pendingOrchestrator
pendingActivities *sync.Map // map[string]*pendingActivity
executorLoop loop.Interface[loops.EventExecutor]
backend Backend
logger Logger
onWorkItemConnection func(context.Context) error
Expand Down Expand Up @@ -91,12 +80,6 @@ func WithStreamShutdownChannel(c <-chan any) grpcExecutorOptions {
}
}

// WithStreamSendTimeout returns an option that bounds how long the executor
// waits when sending a work item over the worker stream; once the timeout
// elapses, the send attempt is abandoned with an error.
func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions {
	timeout := d
	return func(g *grpcExecutor) {
		g.streamSendTimeout = &timeout
	}
}

func WithSkipWaitForInstanceStart() grpcExecutorOptions {
return func(g *grpcExecutor) {
g.skipWaitForInstanceStart = true
Expand All @@ -105,27 +88,49 @@ func WithSkipWaitForInstanceStart() grpcExecutorOptions {

// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
grpcExecutor := &grpcExecutor{
workItemQueue: make(chan *protos.WorkItem),
backend: be,
logger: logger,
pendingOrchestrators: &sync.Map{},
pendingActivities: &sync.Map{},
grpcExec := &grpcExecutor{
backend: be,
logger: logger,
}

for _, opt := range opts {
opt(grpcExecutor)
opt(grpcExec)
}

return grpcExecutor, func(grpcServer grpc.ServiceRegistrar) {
protos.RegisterTaskHubSidecarServiceServer(grpcServer, grpcExecutor)
grpcExec.executorLoop = loopexecutor.New(loopexecutor.Options{
Backend: be,
Logger: logger,
})

return grpcExec, func(grpcServer grpc.ServiceRegistrar) {
protos.RegisterTaskHubSidecarServiceServer(grpcServer, grpcExec)
}
}

// ExecuteOrchestrator implements Executor
func (executor *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*protos.OrchestratorResponse, error) {
executor.pendingOrchestrators.Store(iid, &pendingOrchestrator{instanceID: iid})
// Start runs the internal executor event loop and blocks until ctx is
// cancelled. It also (optionally) watches the stream-shutdown channel and
// forwards a StreamShutdown event to the loop when it fires.
func (g *grpcExecutor) Start(ctx context.Context) error {
	mngr := concurrency.NewRunnerManager(g.executorLoop.Run)

	if g.streamShutdownChan != nil {
		// Forward an external shutdown signal into the executor loop.
		mngr.Add(func(rctx context.Context) error {
			select {
			case <-rctx.Done():
			case <-g.streamShutdownChan:
				g.executorLoop.Enqueue(new(loops.StreamShutdown))
			}
			return nil
		})
	}

	// Once the context is cancelled, close the executor loop so that Run
	// unblocks and the runner manager can return.
	mngr.Add(func(rctx context.Context) error {
		<-rctx.Done()
		g.executorLoop.Close(new(loops.ShutdownExecutor))
		return nil
	})

	return mngr.Run(ctx)
}
Comment on lines +110 to +130
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gRPC executor now requires Start(ctx) to run the internal executor loop, but nothing in NewGrpcExecutor/GetWorkItems auto-starts it. If callers don't explicitly invoke Start (e.g., main.go currently constructs a gRPC executor but never starts it), ExecuteOrchestrator/ExecuteActivity will enqueue events and then block forever waiting for dispatched. Consider auto-starting the loop (lazy sync.Once on first use) or wiring executor start/stop into the server/worker lifecycle, and documenting this behavioral requirement.

Copilot uses AI. Check for mistakes.

// ExecuteOrchestrator implements Executor
func (g *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.InstanceID, oldEvents []*protos.HistoryEvent, newEvents []*protos.HistoryEvent) (*protos.OrchestratorResponse, error) {
req := &protos.OrchestratorRequest{
InstanceId: string(iid),
ExecutionId: nil,
Expand All @@ -139,37 +144,41 @@ func (executor *grpcExecutor) ExecuteOrchestrator(ctx context.Context, iid api.I
},
}

wait := executor.backend.WaitForOrchestratorCompletion(req)
wait := g.backend.WaitForOrchestratorCompletion(req)

dispatched := make(chan error, 1)
g.executorLoop.Enqueue(&loops.ExecuteOrchestrator{
InstanceID: string(iid),
WorkItem: workItem,
Dispatched: dispatched,
})

// Send the orchestration execution work-item to the connected worker.
// This will block if the worker isn't listening for work items.
// Block until dispatched to stream (preserves back-pressure).
select {
case <-ctx.Done():
executor.logger.Warnf("%s: context canceled before dispatching orchestrator work item", iid)
g.logger.Warnf("%s: context canceled before dispatching orchestrator work item", iid)
return nil, fmt.Errorf("context canceled before dispatching orchestrator work item: %w", ctx.Err())
case executor.workItemQueue <- workItem:
case err := <-dispatched:
if err != nil {
return nil, fmt.Errorf("failed to dispatch orchestrator work item: %w", err)
}
}

resp, err := wait(ctx)

// this orchestrator is either completed or cancelled, but its no longer pending, delete it
executor.pendingOrchestrators.Delete(iid)
if err != nil {
if errors.Is(err, api.ErrTaskCancelled) {
return nil, errors.New("operation aborted")
}
executor.logger.Warnf("%s: failed before receiving orchestration result", iid)
g.logger.Warnf("%s: failed before receiving orchestration result", iid)
return nil, err
}

return resp, nil
}

// ExecuteActivity implements Executor
func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.InstanceID, e *protos.HistoryEvent) (*protos.HistoryEvent, error) {
func (g *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.InstanceID, e *protos.HistoryEvent) (*protos.HistoryEvent, error) {
key := GetActivityExecutionKey(string(iid), e.EventId)
executor.pendingActivities.Store(key, &pendingActivity{instanceID: iid, taskID: e.EventId})

task := e.GetTaskScheduled()

req := &protos.ActivityRequest{
Expand All @@ -187,26 +196,34 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta
},
}

wait := executor.backend.WaitForActivityCompletion(req)
wait := g.backend.WaitForActivityCompletion(req)

dispatched := make(chan error, 1)
g.executorLoop.Enqueue(&loops.ExecuteActivity{
Key: key,
InstanceID: string(iid),
TaskID: e.EventId,
WorkItem: workItem,
Dispatched: dispatched,
})

// Send the activity execution work-item to the connected worker.
// This will block if the worker isn't listening for work items.
// Block until dispatched to stream (preserves back-pressure).
select {
case <-ctx.Done():
executor.logger.Warnf("%s/%s#%d: context canceled before dispatching activity work item", iid, task.Name, e.EventId)
g.logger.Warnf("%s/%s#%d: context canceled before dispatching activity work item", iid, task.Name, e.EventId)
return nil, fmt.Errorf("context canceled before dispatching activity work item: %w", ctx.Err())
case executor.workItemQueue <- workItem:
case err := <-dispatched:
if err != nil {
return nil, fmt.Errorf("failed to dispatch activity work item: %w", err)
}
}

resp, err := wait(ctx)

// this activity is either completed or cancelled, but its no longer pending, delete it
executor.pendingActivities.Delete(key)
if err != nil {
if errors.Is(err, api.ErrTaskCancelled) {
return nil, errors.New("operation aborted")
}
executor.logger.Warnf("%s/%s#%d: failed before receiving activity result", iid, task.Name, e.EventId)
g.logger.Warnf("%s/%s#%d: failed before receiving activity result", iid, task.Name, e.EventId)
return nil, err
}

Expand Down Expand Up @@ -244,31 +261,7 @@ func (executor *grpcExecutor) ExecuteActivity(ctx context.Context, iid api.Insta

// Shutdown implements Executor
func (g *grpcExecutor) Shutdown(ctx context.Context) error {
// closing the work item queue is a signal for shutdown
close(g.workItemQueue)

// Iterate through all pending items and close them to unblock the goroutines waiting on this
g.pendingActivities.Range(func(_, value any) bool {
p, ok := value.(*pendingActivity)
if ok {
err := g.backend.CancelActivityTask(ctx, p.instanceID, p.taskID)
if err != nil {
g.logger.Warnf("failed to cancel activity task: %v", err)
}
}
return true
})
g.pendingOrchestrators.Range(func(_, value any) bool {
p, ok := value.(*pendingOrchestrator)
if ok {
err := g.backend.CancelOrchestratorTask(ctx, p.instanceID)
if err != nil {
g.logger.Warnf("failed to cancel orchestrator task: %v", err)
}
}
return true
})

g.executorLoop.Close(new(loops.ShutdownExecutor))
return nil
}

Expand All @@ -295,109 +288,28 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
g.logger.Warnf("error while disconnecting work item stream: %v", derr)
}

return status.Errorf(codes.Unavailable, message)
return status.Errorf(codes.Unavailable, "%s", message)
}

defer func() {
// If there's any pending activity left, remove them
g.pendingActivities.Range(func(key, value any) bool {
if p, ok := value.(*pendingActivity); ok && p.streamID == streamID {
g.logger.Debugf("cleaning up pending activity: %s", key)
err := g.backend.CancelActivityTask(context.Background(), p.instanceID, p.taskID)
if err != nil {
g.logger.Warnf("failed to cancel activity task: %v", err)
}
g.pendingActivities.Delete(key)
}
return true
})
g.pendingOrchestrators.Range(func(key, value any) bool {
if p, ok := value.(*pendingOrchestrator); ok && p.streamID == streamID {
g.logger.Debugf("cleaning up pending orchestrator: %s", key)
err := g.backend.CancelOrchestratorTask(context.Background(), p.instanceID)
if err != nil {
g.logger.Warnf("failed to cancel orchestrator task: %v", err)
}
}
return true
})
g.executorLoop.Enqueue(&loops.DisconnectStream{StreamID: streamID})
if err := g.executeOnWorkItemDisconnect(stream.Context()); err != nil {
g.logger.Warnf("error while disconnecting work item stream: %v", err)
}
}()

ch := make(chan *protos.WorkItem)
errCh := make(chan error, 1)
go func() {
for {
select {
case <-stream.Context().Done():
return
case wi := <-ch:
errCh <- stream.Send(wi)
}
}
}()

// The worker client invokes this method, which streams back work-items as they arrive.
for {
select {
case <-stream.Context().Done():
g.logger.Info("work item stream closed")
return nil
case wi, ok := <-g.workItemQueue:
if !ok {
continue
}

switch x := wi.Request.(type) {
case *protos.WorkItem_OrchestratorRequest:
key := x.OrchestratorRequest.GetInstanceId()
if value, ok := g.pendingOrchestrators.Load(api.InstanceID(key)); ok {
if p, ok := value.(*pendingOrchestrator); ok {
p.streamID = streamID
}
}
case *protos.WorkItem_ActivityRequest:
key := GetActivityExecutionKey(x.ActivityRequest.GetOrchestrationInstance().GetInstanceId(), x.ActivityRequest.GetTaskId())
if value, ok := g.pendingActivities.Load(key); ok {
if p, ok := value.(*pendingActivity); ok {
p.streamID = streamID
}
}
}

if err := g.sendWorkItem(stream, wi, ch, errCh); err != nil {
g.logger.Errorf("encountered an error while sending work item: %v", err)
return err
}

case <-g.streamShutdownChan:
return errShuttingDown
}
}
}
g.executorLoop.Enqueue(&loops.ConnectStream{
StreamID: streamID,
Stream: stream,
ErrCh: errCh,
})

func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkItemsServer, wi *protos.WorkItem,
ch chan *protos.WorkItem, errCh chan error,
) error {
// Wait for either the stream context to be done or the loop to signal an error.
select {
case <-stream.Context().Done():
return stream.Context().Err()
case ch <- wi:
}

ctx := stream.Context()
if g.streamSendTimeout != nil {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, *g.streamSendTimeout)
defer cancel()
}

select {
case <-ctx.Done():
g.logger.Errorf("timed out while sending work item")
return fmt.Errorf("timed out while sending work item: %w", ctx.Err())
g.logger.Info("work item stream closed")
return nil
case err := <-errCh:
return err
}
Expand Down Expand Up @@ -680,4 +592,3 @@ func createGetInstanceResponse(req *protos.GetInstanceRequest, metadata *Orchest

return &protos.GetInstanceResponse{Exists: true, OrchestrationState: state}
}

Loading