From bc467804d970b2d54e557c1686bf306cc354eb92 Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Thu, 26 Feb 2026 17:19:02 -0800 Subject: [PATCH 1/6] feat: add Durable Task Scheduler (DTS) backend Add a new backend/scheduler package that implements the Backend interface for connecting to the Durable Task Scheduler cloud service over gRPC. Key components: - SchedulerOptions: configuration with endpoint, task hub, auth type - Connection string parsing matching the .NET SDK format (Endpoint=...;TaskHub=...;Authentication=...) - gRPC channel with TLS auto-detection and metadata interceptors (taskhub, workerid headers injected on every request) - Stub Backend interface implementation (only used for connection) - Worker identity generation (hostname,pid,uuid) matching existing backends The DTS backend is a remote service, so work item dispatch is handled through the gRPC streaming protocol rather than local polling. Includes unit tests for connection string parsing and backend construction. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- backend/durabletaskscheduler/options.go | 101 +++++ backend/durabletaskscheduler/scheduler.go | 418 ++++++++++++++++++ .../durabletaskscheduler/scheduler_test.go | 149 +++++++ backend/executor.go | 2 +- client/client_grpc.go | 5 + client/worker_grpc.go | 52 ++- go.mod | 29 +- go.sum | 72 +-- internal/protos/completion_token.go | 87 ++++ samples/sequence_dts/sequence_dts.go | 120 +++++ 10 files changed, 988 insertions(+), 47 deletions(-) create mode 100644 backend/durabletaskscheduler/options.go create mode 100644 backend/durabletaskscheduler/scheduler.go create mode 100644 backend/durabletaskscheduler/scheduler_test.go create mode 100644 internal/protos/completion_token.go create mode 100644 samples/sequence_dts/sequence_dts.go diff --git a/backend/durabletaskscheduler/options.go b/backend/durabletaskscheduler/options.go new file mode 100644 index 00000000..9781ff44 --- /dev/null +++ b/backend/durabletaskscheduler/options.go @@ -0,0 +1,101 @@ +package durabletaskscheduler + +import ( + "fmt" + "strings" +) + +const ( + // DefaultResourceID is the default Azure resource ID used for token authentication with DTS. + DefaultResourceID = "https://durabletask.io" + + // DefaultAuthType is the default authentication type. + DefaultAuthType = AuthTypeDefaultAzure +) + +// AuthType represents the authentication type for connecting to DTS. +type AuthType string + +const ( + // AuthTypeNone disables authentication (for local development). + AuthTypeNone AuthType = "none" + + // AuthTypeDefaultAzure uses DefaultAzureCredential from the Azure Identity SDK. + AuthTypeDefaultAzure AuthType = "defaultazure" +) + +// Options configures the connection to a Durable Task Scheduler (DTS) endpoint. +type Options struct { + // EndpointAddress is the DTS endpoint URL (e.g., "https://myscheduler.westus2.durabletask.io"). + EndpointAddress string + + // TaskHubName is the name of the task hub resource. + TaskHubName string + + // AuthType specifies how to authenticate with the DTS service. + AuthType AuthType + + // ResourceID is the Azure resource ID used for token scoping. Defaults to "https://durabletask.io". + ResourceID string +} + +// NewOptions creates a new Options with the given endpoint and task hub name. +func NewOptions(endpointAddress, taskHubName string) *Options { + return &Options{ + EndpointAddress: endpointAddress, + TaskHubName: taskHubName, + AuthType: DefaultAuthType, + ResourceID: DefaultResourceID, + } +} + +// NewOptionsFromConnectionString parses a connection string into Options. +// +// The connection string format is: +// +// Endpoint=https://{scheduler-name}.{region}.durabletask.io;TaskHub={taskhub-name};Authentication={auth-type} +// +// Required keys: Endpoint, TaskHub. +// Optional keys: Authentication (defaults to "defaultazure"). +func NewOptionsFromConnectionString(connectionString string) (*Options, error) { + opts := &Options{ + AuthType: DefaultAuthType, + ResourceID: DefaultResourceID, + } + + parts := strings.Split(connectionString, ";") + for _, part := range parts { + part = strings.TrimSpace(part) + if part == "" { + continue + } + + kv := strings.SplitN(part, "=", 2) + if len(kv) != 2 { + return nil, fmt.Errorf("invalid connection string segment: %q", part) + } + + key := strings.TrimSpace(kv[0]) + value := strings.TrimSpace(kv[1]) + + switch strings.ToLower(key) { + case "endpoint": + opts.EndpointAddress = value + case "taskhub": + opts.TaskHubName = value + case "authentication": + opts.AuthType = AuthType(strings.ToLower(value)) + default: + // Ignore unknown keys for forward compatibility + } + } + + if opts.EndpointAddress == "" { + return nil, fmt.Errorf("connection string is missing required 'Endpoint' key") + } + if opts.TaskHubName == "" { + return nil, fmt.Errorf("connection string is missing required 'TaskHub' key") + } + + return opts, nil +} diff --git a/backend/durabletaskscheduler/scheduler.go b/backend/durabletaskscheduler/scheduler.go new file mode 100644 index 00000000..8acea642 --- /dev/null +++ b/backend/durabletaskscheduler/scheduler.go @@ -0,0 +1,418 @@ +package durabletaskscheduler + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "sync" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + + "github.com/microsoft/durabletask-go/api" + "github.com/microsoft/durabletask-go/backend" + "github.com/microsoft/durabletask-go/internal/protos" +) + +type Backend struct { + options *Options + logger backend.Logger + workerName string + client protos.TaskHubSidecarServiceClient + conn *grpc.ClientConn + credential azcore.TokenCredential + mu sync.Mutex + started bool +} + +var ( + errNilHistoryEvent = errors.New("HistoryEvent must be non-nil") + errNotExecutionStarted = errors.New("HistoryEvent must be an ExecutionStartedEvent") +) + +// NewBackend creates a new Backend backed by a Durable Task Scheduler (DTS) service. +func NewBackend(opts *Options, logger backend.Logger) *Backend { + hostname, _ := os.Hostname() + pid := os.Getpid() + workerName := fmt.Sprintf("%s,%d,%s", hostname, pid, uuid.NewString()) + + return &Backend{ + options: opts, + logger: logger, + workerName: workerName, + } +} + +// CreateTaskHub implements backend.Backend. +// For DTS, task hubs are managed externally, so this is a no-op. +func (be *Backend) CreateTaskHub(ctx context.Context) error { + return nil +} + +// DeleteTaskHub implements backend.Backend. +// For DTS, task hubs are managed externally, so this is a no-op. +func (be *Backend) DeleteTaskHub(ctx context.Context) error { + return nil +} + +// Start implements backend.Backend. +func (be *Backend) Start(ctx context.Context) error { + be.mu.Lock() + defer be.mu.Unlock() + + if be.started { + return backend.ErrBackendAlreadyStarted + } + + conn, err := be.createConnection() + if err != nil { + return fmt.Errorf("failed to connect to DTS endpoint: %w", err) + } + + be.conn = conn + be.client = protos.NewTaskHubSidecarServiceClient(conn) + be.started = true + be.logger.Infof("durabletaskscheduler backend started: %s (task hub: %s)", be.options.EndpointAddress, be.options.TaskHubName) + return nil +} + +// Stop implements backend.Backend. +func (be *Backend) Stop(ctx context.Context) error { + be.mu.Lock() + defer be.mu.Unlock() + + if !be.started { + return nil + } + + be.started = false + if be.conn != nil { + if err := be.conn.Close(); err != nil { + return fmt.Errorf("failed to close gRPC connection: %w", err) + } + be.conn = nil + } + be.logger.Info("durabletaskscheduler backend stopped") + return nil +} + +// CreateOrchestrationInstance implements backend.Backend. +func (be *Backend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error { + if err := be.ensureStarted(); err != nil { + return err + } + + startEvent := e.GetExecutionStarted() + if startEvent == nil { + return errNotExecutionStarted + } + + req := &protos.CreateInstanceRequest{ + Name: startEvent.Name, + InstanceId: startEvent.OrchestrationInstance.GetInstanceId(), + Input: startEvent.Input, + } + + if startEvent.GetScheduledStartTimestamp() != nil { + req.ScheduledStartTimestamp = startEvent.GetScheduledStartTimestamp() + } + + policy := &protos.OrchestrationIdReusePolicy{} + for _, opt := range opts { + if err := opt(policy); err != nil { + return fmt.Errorf("failed to configure orchestration ID reuse policy: %w", err) + } + } + if len(policy.OperationStatus) > 0 || policy.Action != 0 { + req.OrchestrationIdReusePolicy = policy + } + + _, err := be.client.StartInstance(ctx, req) + if err != nil { + return fmt.Errorf("failed to create orchestration instance: %w", err) + } + return nil +} + +// AddNewOrchestrationEvent implements backend.Backend. +func (be *Backend) AddNewOrchestrationEvent(ctx context.Context, iid api.InstanceID, e *backend.HistoryEvent) error { + if err := be.ensureStarted(); err != nil { + return err + } + if e == nil { + return errNilHistoryEvent + } + + // Route to the appropriate gRPC call based on event type + if et := e.GetExecutionTerminated(); et != nil { + _, err := be.client.TerminateInstance(ctx, &protos.TerminateRequest{ + InstanceId: string(iid), + Output: et.Input, + Recursive: et.Recurse, + }) + if err != nil { + return fmt.Errorf("failed to terminate instance: %w", err) + } + return nil + } + + if er := e.GetEventRaised(); er != nil { + _, err := be.client.RaiseEvent(ctx, &protos.RaiseEventRequest{ + InstanceId: string(iid), + Name: er.Name, + Input: er.Input, + }) + if err != nil { + return fmt.Errorf("failed to raise event: %w", err) + } + return nil + } + + if e.GetExecutionSuspended() != nil { + _, err := be.client.SuspendInstance(ctx, &protos.SuspendRequest{ + InstanceId: string(iid), + }) + if err != nil { + return fmt.Errorf("failed to suspend instance: %w", err) + } + return nil + } + + if e.GetExecutionResumed() != nil { + _, err := be.client.ResumeInstance(ctx, &protos.ResumeRequest{ + InstanceId: string(iid), + }) + if err != nil { + return fmt.Errorf("failed to resume instance: %w", err) + } + return nil + } + + return fmt.Errorf("unsupported orchestration event type: %v", e) +} + +// GetOrchestrationWorkItem implements backend.Backend. +func (be *Backend) GetOrchestrationWorkItem(ctx context.Context) (*backend.OrchestrationWorkItem, error) { + // DTS uses a streaming model via GetWorkItems, which is handled by the executor layer. + // This method is not used when the DTS scheduler is the backend since work items + // are dispatched through the gRPC streaming protocol. + return nil, backend.ErrNoWorkItems +} + +// GetOrchestrationRuntimeState implements backend.Backend. +func (be *Backend) GetOrchestrationRuntimeState(ctx context.Context, wi *backend.OrchestrationWorkItem) (*backend.OrchestrationRuntimeState, error) { + if err := be.ensureStarted(); err != nil { + return nil, err + } + + // The runtime state is populated from the work item's events, which are provided + // by the DTS service via the GetWorkItems stream. + return backend.NewOrchestrationRuntimeState(wi.InstanceID, wi.NewEvents), nil +} + +// GetOrchestrationMetadata implements backend.Backend. +func (be *Backend) GetOrchestrationMetadata(ctx context.Context, iid api.InstanceID) (*api.OrchestrationMetadata, error) { + if err := be.ensureStarted(); err != nil { + return nil, err + } + + resp, err := be.client.GetInstance(ctx, &protos.GetInstanceRequest{ + InstanceId: string(iid), + GetInputsAndOutputs: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to get orchestration metadata: %w", err) + } + + if !resp.Exists || resp.OrchestrationState == nil { + return nil, api.ErrInstanceNotFound + } + + state := resp.OrchestrationState + metadata := api.NewOrchestrationMetadata( + iid, + state.Name, + state.OrchestrationStatus, + state.CreatedTimestamp.AsTime(), + state.LastUpdatedTimestamp.AsTime(), + state.Input.GetValue(), + state.Output.GetValue(), + state.CustomStatus.GetValue(), + state.FailureDetails, + ) + + return metadata, nil +} + +// CompleteOrchestrationWorkItem implements backend.Backend. +func (be *Backend) CompleteOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error { + // Work item completion is handled by the executor layer via CompleteOrchestratorTask gRPC call. + // The scheduler backend doesn't need to do additional processing here. + return nil +} + +// AbandonOrchestrationWorkItem implements backend.Backend. +func (be *Backend) AbandonOrchestrationWorkItem(ctx context.Context, wi *backend.OrchestrationWorkItem) error { + // Work item abandonment is handled by the DTS service automatically when the + // streaming connection is dropped or the work item is not completed within the lock timeout. + return nil +} + +// GetActivityWorkItem implements backend.Backend. +func (be *Backend) GetActivityWorkItem(ctx context.Context) (*backend.ActivityWorkItem, error) { + // Similar to GetOrchestrationWorkItem, activity work items are dispatched through + // the gRPC streaming protocol by the DTS service. + return nil, backend.ErrNoWorkItems +} + +// CompleteActivityWorkItem implements backend.Backend. +func (be *Backend) CompleteActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error { + // Activity completion is handled by the executor layer via CompleteActivityTask gRPC call. + return nil +} + +// AbandonActivityWorkItem implements backend.Backend. +func (be *Backend) AbandonActivityWorkItem(ctx context.Context, wi *backend.ActivityWorkItem) error { + // Activity abandonment is handled automatically by the DTS service. + return nil +} + +// PurgeOrchestrationState implements backend.Backend. +func (be *Backend) PurgeOrchestrationState(ctx context.Context, id api.InstanceID) error { + if err := be.ensureStarted(); err != nil { + return err + } + + resp, err := be.client.PurgeInstances(ctx, &protos.PurgeInstancesRequest{ + Request: &protos.PurgeInstancesRequest_InstanceId{InstanceId: string(id)}, + }) + if err != nil { + return fmt.Errorf("failed to purge orchestration state: %w", err) + } + + if resp.GetDeletedInstanceCount() == 0 { + return api.ErrInstanceNotFound + } + return nil +} + +// String implements fmt.Stringer for logging. +func (be *Backend) String() string { + return fmt.Sprintf("durabletaskscheduler::%s/%s", be.options.EndpointAddress, be.options.TaskHubName) +} + +func (be *Backend) ensureStarted() error { + if !be.started { + return backend.ErrNotInitialized + } + return nil +} + +// Connection returns the underlying gRPC client connection. This can be used to create +// a [client.TaskHubGrpcClient] for streaming work item processing. +// Must be called after Start(). +func (be *Backend) Connection() grpc.ClientConnInterface { + return be.conn +} + +func (be *Backend) createConnection() (*grpc.ClientConn, error) { + target := be.options.EndpointAddress + isTLS := strings.HasPrefix(target, "https://") + target = strings.TrimPrefix(target, "https://") + target = strings.TrimPrefix(target, "http://") + + // Add default port if not specified + if !strings.Contains(target, ":") { + if isTLS { + target += ":443" + } else { + target += ":80" + } + } + + var transportCreds credentials.TransportCredentials + if isTLS { + transportCreds = credentials.NewTLS(nil) + } else { + transportCreds = insecure.NewCredentials() + } + + // Set up Azure credential if using DefaultAzure auth + if be.options.AuthType == AuthTypeDefaultAzure { + cred, err := azidentity.NewDefaultAzureCredential(nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure credential: %w", err) + } + be.credential = cred + } + + dialOpts := []grpc.DialOption{ + grpc.WithTransportCredentials(transportCreds), + grpc.WithUnaryInterceptor(be.unaryMetadataInterceptor()), + grpc.WithStreamInterceptor(be.streamMetadataInterceptor()), + } + + conn, err := grpc.Dial(target, dialOpts...) + if err != nil { + return nil, fmt.Errorf("failed to create gRPC connection: %w", err) + } + + return conn, nil +} + +func (be *Backend) unaryMetadataInterceptor() grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply any, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + ctx = be.injectMetadata(ctx) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func (be *Backend) streamMetadataInterceptor() grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + ctx = be.injectMetadata(ctx) + return streamer(ctx, desc, cc, method, opts...) + } +} + +func (be *Backend) injectMetadata(ctx context.Context) context.Context { + pairs := []string{ + "taskhub", be.options.TaskHubName, + "workerid", be.workerName, + } + + if be.credential != nil { + token, err := be.credential.GetToken(ctx, policy.TokenRequestOptions{ + Scopes: []string{be.options.ResourceID + "/.default"}, + }) + if err != nil { + be.logger.Warnf("failed to get access token: %v", err) + } else { + pairs = append(pairs, "authorization", "Bearer "+token.Token) + } + } + + md := metadata.Pairs(pairs...) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/backend/durabletaskscheduler/scheduler_test.go b/backend/durabletaskscheduler/scheduler_test.go new file mode 100644 index 00000000..a3bec975 --- /dev/null +++ b/backend/durabletaskscheduler/scheduler_test.go @@ -0,0 +1,149 @@ +package durabletaskscheduler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewOptions(t *testing.T) { + opts := NewOptions("https://myscheduler.westus2.durabletask.io", "my-task-hub") + + assert.Equal(t, "https://myscheduler.westus2.durabletask.io", opts.EndpointAddress) + assert.Equal(t, "my-task-hub", opts.TaskHubName) + assert.Equal(t, DefaultAuthType, opts.AuthType) + assert.Equal(t, DefaultResourceID, opts.ResourceID) +} + +func TestNewOptionsFromConnectionString(t *testing.T) { + tests := []struct { + name string + connStr string + wantOpts *Options + wantErr bool + errContains string + }{ + { + name: "full connection string", + connStr: "Endpoint=https://myscheduler.westus2.durabletask.io;TaskHub=my-task-hub;Authentication=defaultazure", + wantOpts: &Options{ + EndpointAddress: "https://myscheduler.westus2.durabletask.io", + TaskHubName: "my-task-hub", + AuthType: AuthTypeDefaultAzure, + ResourceID: DefaultResourceID, + }, + }, + { + name: "minimal connection string (no auth)", + connStr: "Endpoint=https://myscheduler.westus2.durabletask.io;TaskHub=my-task-hub", + wantOpts: &Options{ + EndpointAddress: "https://myscheduler.westus2.durabletask.io", + TaskHubName: "my-task-hub", + AuthType: DefaultAuthType, + ResourceID: DefaultResourceID, + }, + }, + { + name: "auth type none", + connStr: "Endpoint=http://localhost:4001;TaskHub=test;Authentication=none", + wantOpts: &Options{ + EndpointAddress: "http://localhost:4001", + TaskHubName: "test", + AuthType: AuthTypeNone, + ResourceID: DefaultResourceID, + }, + }, + { + name: "case insensitive keys", + connStr: "endpoint=https://test.durabletask.io;taskhub=hub1;authentication=DefaultAzure", + wantOpts: &Options{ + EndpointAddress: "https://test.durabletask.io", + TaskHubName: "hub1", + AuthType: AuthTypeDefaultAzure, + ResourceID: DefaultResourceID, + }, + }, + { + name: "extra whitespace", + connStr: " Endpoint = https://test.durabletask.io ; TaskHub = hub1 ", + wantOpts: &Options{ + EndpointAddress: "https://test.durabletask.io", + TaskHubName: "hub1", + AuthType: DefaultAuthType, + ResourceID: DefaultResourceID, + }, + }, + { + name: "unknown keys are ignored", + connStr: "Endpoint=https://test.durabletask.io;TaskHub=hub1;ClientID=abc-123", + wantOpts: &Options{ + EndpointAddress: "https://test.durabletask.io", + TaskHubName: "hub1", + AuthType: DefaultAuthType, + ResourceID: DefaultResourceID, + }, + }, + { + name: "missing endpoint", + connStr: "TaskHub=my-task-hub", + wantErr: true, + errContains: "Endpoint", + }, + { + name: "missing task hub", + connStr: "Endpoint=https://test.durabletask.io", + wantErr: true, + errContains: "TaskHub", + }, + { + name: "empty string", + connStr: "", + wantErr: true, + errContains: "Endpoint", + }, + { + name: "invalid segment", + connStr: "Endpoint=https://test.durabletask.io;TaskHub=hub1;badformat", + wantErr: true, + errContains: "invalid connection string segment", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts, err := NewOptionsFromConnectionString(tt.connStr) + + if tt.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errContains) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.wantOpts.EndpointAddress, opts.EndpointAddress) + assert.Equal(t, tt.wantOpts.TaskHubName, opts.TaskHubName) + assert.Equal(t, tt.wantOpts.AuthType, opts.AuthType) + assert.Equal(t, tt.wantOpts.ResourceID, opts.ResourceID) + }) + } +} + +func TestNewBackend(t *testing.T) { + opts := NewOptions("https://test.durabletask.io", "test-hub") + be := NewBackend(opts, nil) + + require.NotNil(t, be) + assert.Contains(t, be.String(), "test.durabletask.io") + assert.Contains(t, be.String(), "test-hub") + assert.NotEmpty(t, be.workerName) +} + +func TestBackend_EnsureStarted(t *testing.T) { + opts := NewOptions("https://test.durabletask.io", "test-hub") + be := NewBackend(opts, nil) + + // Should fail before Start is called + err := be.ensureStarted() + require.Error(t, err) +} diff --git a/backend/executor.go b/backend/executor.go index 570988cd..c00aae04 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -230,7 +230,7 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot if err := callback(stream.Context()); err != nil { message := "unable to establish work item stream at this time: " + err.Error() g.logger.Warn(message) - return status.Errorf(codes.Unavailable, message) + return status.Errorf(codes.Unavailable, "%s", message) } } diff --git a/client/client_grpc.go b/client/client_grpc.go index 996fd985..fcdce375 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -19,6 +19,9 @@ import ( type TaskHubGrpcClient struct { client protos.TaskHubSidecarServiceClient logger backend.Logger + cancel context.CancelFunc + stop chan struct{} + done chan struct{} } // NewTaskHubGrpcClient creates a client that can be used to manage orchestrations over a gRPC connection. @@ -27,6 +30,8 @@ func NewTaskHubGrpcClient(cc grpc.ClientConnInterface, logger backend.Logger) *T return &TaskHubGrpcClient{ client: protos.NewTaskHubSidecarServiceClient(cc), logger: logger, + stop: make(chan struct{}), + done: make(chan struct{}), } } diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 69c1bea0..54851161 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -26,6 +26,9 @@ type workItemsStream interface { func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error { executor := task.NewTaskExecutor(r) + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + var stream workItemsStream initStream := func() error { @@ -45,11 +48,13 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T c.logger.Infof("connecting work item listener stream") err := initStream() if err != nil { + cancel() return err } go func() { c.logger.Info("starting background processor") + defer close(c.done) defer func() { c.logger.Info("stopping background processor") // We must use a background context here as the stream's context is likely canceled @@ -63,10 +68,13 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T workItem, err := stream.Recv() if err != nil { - // user wants to stop the listener - if ctx.Err() != nil { - c.logger.Infof("stopping background processor: %v", err) + // Check if this is a graceful shutdown before logging any errors. + // StopWorkItemListener closes the stop channel before cancelling + // the context, so a closed stop channel means this error is expected. + select { + case <-c.stop: return + default: } retriable := false @@ -92,9 +100,10 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T err = backoff.Retry( func() error { - // user wants to stop the listener - if ctx.Err() != nil { - return backoff.Permanent(ctx.Err()) + select { + case <-c.stop: + return backoff.Permanent(fmt.Errorf("work item listener stopped")) + default: } c.logger.Infof("reconnecting work item listener stream") @@ -109,6 +118,11 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T newInfiniteRetries(), ) if err != nil { + select { + case <-c.stop: + return + default: + } c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err) return } @@ -118,9 +132,11 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T } if orchReq := workItem.GetOrchestratorRequest(); orchReq != nil { - go c.processOrchestrationWorkItem(ctx, executor, orchReq) + completionToken := protos.GetWorkItemCompletionToken(workItem) + go c.processOrchestrationWorkItem(ctx, executor, orchReq, completionToken) } else if actReq := workItem.GetActivityRequest(); actReq != nil { - go c.processActivityWorkItem(ctx, executor, actReq) + completionToken := protos.GetWorkItemCompletionToken(workItem) + go c.processActivityWorkItem(ctx, executor, actReq, completionToken) } else { c.logger.Warnf("received unknown work item type: %v", workItem) } @@ -133,10 +149,12 @@ func (c *TaskHubGrpcClient) processOrchestrationWorkItem( ctx context.Context, executor backend.Executor, workItem *protos.OrchestratorRequest, + completionToken string, ) { results, err := executor.ExecuteOrchestrator(ctx, api.InstanceID(workItem.InstanceId), workItem.PastEvents, workItem.NewEvents) resp := protos.OrchestratorResponse{InstanceId: workItem.InstanceId} + protos.SetOrchestratorResponseCompletionToken(&resp, completionToken) if err != nil { // NOTE: At the time of writing, there's no known case where this error is returned. // We add error handling here anyways, just in case. @@ -168,12 +186,14 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( ctx context.Context, executor backend.Executor, req *protos.ActivityRequest, + completionToken string, ) { var tc *protos.TraceContext = nil // TODO: How to populate trace context? event := helpers.NewTaskScheduledEvent(req.TaskId, req.Name, req.Version, req.Input, tc) result, err := executor.ExecuteActivity(ctx, api.InstanceID(req.OrchestrationInstance.InstanceId), event) resp := protos.ActivityResponse{InstanceId: req.OrchestrationInstance.InstanceId, TaskId: req.TaskId} + protos.SetActivityResponseCompletionToken(&resp, completionToken) if err != nil { // NOTE: At the time of writing, there's no known case where this error is returned. // We add error handling here anyways, just in case. @@ -201,6 +221,22 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( } } +// StopWorkItemListener gracefully stops the work item listener that was started by StartWorkItemListener. +// The provided context controls how long to wait for the listener to shut down. +// This must be called before closing the underlying gRPC connection to ensure a clean shutdown. +func (c *TaskHubGrpcClient) StopWorkItemListener(ctx context.Context) error { + close(c.stop) + if c.cancel != nil { + c.cancel() + } + select { + case <-c.done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + func newInfiniteRetries() *backoff.ExponentialBackOff { b := backoff.NewExponentialBackOff() // max wait of 15 seconds between retries diff --git a/go.mod b/go.mod index ed007103..cb556fd5 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,15 @@ module github.com/microsoft/durabletask-go -go 1.23.0 +go 1.24.0 require ( + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 github.com/cenkalti/backoff/v4 v4.1.3 - github.com/google/uuid v1.3.0 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 go.opentelemetry.io/otel v1.18.0 go.opentelemetry.io/otel/exporters/zipkin v1.11.1 @@ -19,29 +21,34 @@ require ( ) require ( + github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang-jwt/jwt/v5 v5.3.0 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/openzipkin/zipkin-go v0.4.1 // indirect + github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect - github.com/stretchr/objx v0.5.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.18.0 // indirect - golang.org/x/crypto v0.36.0 // indirect - golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.38.0 // indirect - golang.org/x/sync v0.12.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect - golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect + golang.org/x/crypto v0.47.0 // indirect + golang.org/x/mod v0.31.0 // indirect + golang.org/x/net v0.49.0 // indirect + golang.org/x/sync v0.19.0 // indirect + golang.org/x/sys v0.40.0 // indirect + golang.org/x/text v0.33.0 // indirect + golang.org/x/tools v0.40.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect diff --git a/go.sum b/go.sum index 050e23d5..20b30c70 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,15 @@ +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 h1:fou+2+WFTib47nS+nz/ozhEBnvU96bKHy6LjRsY4E28= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0/go.mod h1:t76Ruy8AHvUAC8GfMWJMa0ElSbuIcO03NLpynfbgsPA= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpzme37xbCDdNTxU7O9eb5+LB4= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2 h1:yz1bePFlP5Vws5+8ez6T3HWXPmwOK7Yvq8QxDBD3SKY= +github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2/go.mod h1:Pa9ZNPuoNu/GztvBSKk9J1cDJW6vk/n0zLtV4mgd8N8= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= +github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= +github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mod h1:tCcJZ0uHAmvjsVYzEFivsRTN00oz5BEsRgQHu5JZ9WE= +github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs= +github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -12,6 +24,8 @@ github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= +github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -20,8 +34,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -32,10 +46,14 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU= +github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= @@ -44,23 +62,22 @@ github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwp github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ= +github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 h1:KfYpVmrjI7JuToy5k8XV3nkapjWx48k4E4JOtVstzQI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0/go.mod h1:SeQhzAEccGVZVEy7aH87Nh0km+utSpo1pTv6eMMop48= go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= @@ -73,21 +90,22 @@ go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZp go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= -golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= -golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= -golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= +golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= +golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= +golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= +golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= +golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= +golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= +golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= +golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= +golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= +golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= +golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= diff --git a/internal/protos/completion_token.go b/internal/protos/completion_token.go new file mode 100644 index 00000000..2402d982 --- /dev/null +++ b/internal/protos/completion_token.go @@ -0,0 +1,87 @@ +package protos + +import "google.golang.org/protobuf/encoding/protowire" + +const ( + workItemCompletionTokenField = 10 // WorkItem.completionToken proto field number + orchestratorResponseCompletionTokenField = 4 // OrchestratorResponse.completionToken proto field number + activityResponseCompletionTokenField = 5 // ActivityResponse.completionToken proto field number +) + +// GetWorkItemCompletionToken extracts the completionToken (field 10) from a WorkItem's +// unknown fields. This field exists in the upstream proto but is not yet in the generated code. +func GetWorkItemCompletionToken(wi *WorkItem) string { + return extractStringField(wi.ProtoReflect().GetUnknown(), workItemCompletionTokenField) +} + +// SetOrchestratorResponseCompletionToken sets the completionToken (field 4) on an +// OrchestratorResponse as an unknown field. This field exists in the upstream proto +// but is not yet in the generated code. +func SetOrchestratorResponseCompletionToken(resp *OrchestratorResponse, token string) { + if token == "" { + return + } + b := protowire.AppendTag(nil, orchestratorResponseCompletionTokenField, protowire.BytesType) + b = protowire.AppendString(b, token) + raw := resp.ProtoReflect().GetUnknown() + raw = append(raw, b...) + resp.ProtoReflect().SetUnknown(raw) +} + +// SetActivityResponseCompletionToken sets the completionToken (field 5) on an +// ActivityResponse as an unknown field. This field exists in the upstream proto +// but is not yet in the generated code. +func SetActivityResponseCompletionToken(resp *ActivityResponse, token string) { + if token == "" { + return + } + b := protowire.AppendTag(nil, activityResponseCompletionTokenField, protowire.BytesType) + b = protowire.AppendString(b, token) + raw := resp.ProtoReflect().GetUnknown() + raw = append(raw, b...) + resp.ProtoReflect().SetUnknown(raw) +} + +// extractStringField extracts a string field with the given field number from raw protobuf bytes. +func extractStringField(raw []byte, fieldNum protowire.Number) string { + for len(raw) > 0 { + num, typ, n := protowire.ConsumeTag(raw) + if n < 0 { + break + } + raw = raw[n:] + + switch typ { + case protowire.BytesType: + val, vn := protowire.ConsumeBytes(raw) + if vn < 0 { + return "" + } + if num == fieldNum { + return string(val) + } + raw = raw[vn:] + case protowire.VarintType: + _, vn := protowire.ConsumeVarint(raw) + if vn < 0 { + return "" + } + raw = raw[vn:] + case protowire.Fixed32Type: + _, vn := protowire.ConsumeFixed32(raw) + if vn < 0 { + return "" + } + raw = raw[vn:] + case protowire.Fixed64Type: + _, vn := protowire.ConsumeFixed64(raw) + if vn < 0 { + return "" + } + raw = raw[vn:] + default: + return "" + } + } + return "" +} diff --git a/samples/sequence_dts/sequence_dts.go b/samples/sequence_dts/sequence_dts.go new file mode 100644 index 00000000..630bf3eb --- /dev/null +++ b/samples/sequence_dts/sequence_dts.go @@ -0,0 +1,120 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + + "github.com/microsoft/durabletask-go/backend" + "github.com/microsoft/durabletask-go/backend/durabletaskscheduler" + "github.com/microsoft/durabletask-go/client" + "github.com/microsoft/durabletask-go/task" +) + +func main() { + if err := run(); err != nil { + log.Fatal(err) + } +} + +func run() error { + // Create a new task registry and add the orchestrator and activities + r := task.NewTaskRegistry() + if err := r.AddOrchestrator(ActivitySequenceOrchestrator); err != nil { + return fmt.Errorf("failed to add orchestrator: %w", err) + } + if err := r.AddActivity(SayHelloActivity); err != nil { + return fmt.Errorf("failed to add activity: %w", err) + } + + // Init the gRPC client + ctx := context.Background() + grpcClient, be, err := Init(ctx, r) + if err != nil { + return fmt.Errorf("failed to initialize the client: %w", err) + } + defer func() { + if err := grpcClient.StopWorkItemListener(ctx); err != nil { + log.Printf("Failed to stop work item listener: %v", err) + } + if err := be.Stop(ctx); err != nil { + log.Printf("Failed to stop backend: %v", err) + } + }() + + // Start a new orchestration + id, err := grpcClient.ScheduleNewOrchestration(ctx, "ActivitySequenceOrchestrator") + if err != nil { + return fmt.Errorf("failed to schedule new orchestration: %w", err) + } + + // Wait for the orchestration to complete + metadata, err := grpcClient.WaitForOrchestrationCompletion(ctx, id) + if err != nil { + return fmt.Errorf("failed to wait for orchestration to complete: %w", err) + } + + // Print the results + metadataEnc, err := json.MarshalIndent(metadata, "", " ") + if err != nil { + return fmt.Errorf("failed to encode result to JSON: %w", err) + } + log.Printf("Orchestration completed: %v", string(metadataEnc)) + return nil +} + +// Init creates and initializes a DTS-backed client. +// TODO: Update the endpoint address, task hub name, and scheduler configuration +// to match your DTS environment. +func Init(ctx context.Context, r *task.TaskRegistry) (*client.TaskHubGrpcClient, *durabletaskscheduler.Backend, error) { + logger := backend.DefaultLogger() + + // Create a new DTS scheduler backend and establish the gRPC connection + opts, err := durabletaskscheduler.NewOptionsFromConnectionString("Endpoint=https://halspang-priv-d-f7e3cjgz.centraluseuap.durabletask.io;TaskHub=default;Authentication=DefaultAzure") + if err != nil { + return nil, nil, fmt.Errorf("failed to parse connection string: %w", err) + } + be := durabletaskscheduler.NewBackend(opts, logger) + if err := be.Start(ctx); err != nil { + return nil, nil, err + } + + // Create the gRPC client and start the streaming work item listener. + // DTS dispatches work items via a gRPC stream rather than backend polling. + grpcClient := client.NewTaskHubGrpcClient(be.Connection(), logger) + if err := grpcClient.StartWorkItemListener(ctx, r); err != nil { + return nil, nil, fmt.Errorf("failed to start work item listener: %w", err) + } + + return grpcClient, be, nil +} + +// ActivitySequenceOrchestrator makes three activity calls in sequence and returns the results +// as an array. +func ActivitySequenceOrchestrator(ctx *task.OrchestrationContext) (any, error) { + var helloTokyo string + if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Tokyo")).Await(&helloTokyo); err != nil { + return nil, err + } + var helloLondon string + if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("London")).Await(&helloLondon); err != nil { + return nil, err + } + var helloSeattle string + if err := ctx.CallActivity(SayHelloActivity, task.WithActivityInput("Seattle")).Await(&helloSeattle); err != nil { + return nil, err + } + return []string{helloTokyo, helloLondon, helloSeattle}, nil +} + +// SayHelloActivity can be called by an orchestrator function and will return a friendly greeting. +func SayHelloActivity(ctx task.ActivityContext) (any, error) { + var input string + if err := ctx.GetInput(&input); err != nil { + return "", err + } + logger := backend.DefaultLogger() + logger.Infof("Saying hello to %s", input) + return fmt.Sprintf("Hello, %s!", input), nil +} From ee79576df0e0dcb0121210b0fcedc239b358d02b Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Fri, 13 Mar 2026 15:03:47 -0700 Subject: [PATCH 2/6] Update gRPC, fix start/stop race Signed-off-by: Hal Spang --- backend/durabletaskscheduler/scheduler.go | 33 +++++++------- client/client_grpc.go | 12 ++--- client/worker_grpc.go | 9 +++- go.mod | 17 +++---- go.sum | 55 ++++++++++++----------- samples/sequence_dts/sequence_dts.go | 2 +- 6 files changed, 71 insertions(+), 57 deletions(-) diff --git a/backend/durabletaskscheduler/scheduler.go b/backend/durabletaskscheduler/scheduler.go index 8acea642..89a2ff08 100644 --- a/backend/durabletaskscheduler/scheduler.go +++ b/backend/durabletaskscheduler/scheduler.go @@ -2,7 +2,6 @@ package durabletaskscheduler import ( "context" - "errors" "fmt" "os" "strings" @@ -33,11 +32,6 @@ type Backend struct { started bool } -var ( - errNilHistoryEvent = errors.New("HistoryEvent must be non-nil") - errNotExecutionStarted = errors.New("HistoryEvent must be an ExecutionStartedEvent") -) - // NewBackend creates a new Backend backed by a Durable Task Scheduler (DTS) service. func NewBackend(opts *Options, logger backend.Logger) *Backend { hostname, _ := os.Hostname() @@ -112,7 +106,7 @@ func (be *Backend) CreateOrchestrationInstance(ctx context.Context, e *backend.H startEvent := e.GetExecutionStarted() if startEvent == nil { - return errNotExecutionStarted + return backend.ErrNotExecutionStarted } req := &protos.CreateInstanceRequest{ @@ -148,7 +142,7 @@ func (be *Backend) AddNewOrchestrationEvent(ctx context.Context, iid api.Instanc return err } if e == nil { - return errNilHistoryEvent + return backend.ErrNilHistoryEvent } // Route to the appropriate gRPC call based on event type @@ -310,6 +304,8 @@ func (be *Backend) String() string { } func (be *Backend) ensureStarted() error { + be.mu.Lock() + defer be.mu.Unlock() if !be.started { return backend.ErrNotInitialized } @@ -360,7 +356,7 @@ func (be *Backend) createConnection() (*grpc.ClientConn, error) { grpc.WithStreamInterceptor(be.streamMetadataInterceptor()), } - conn, err := grpc.Dial(target, dialOpts...) + conn, err := grpc.NewClient(target, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to create gRPC connection: %w", err) } @@ -377,7 +373,10 @@ func (be *Backend) unaryMetadataInterceptor() grpc.UnaryClientInterceptor { invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) error { - ctx = be.injectMetadata(ctx) + ctx, err := be.injectMetadata(ctx) + if err != nil { + return fmt.Errorf("failed to inject metadata: %w", err) + } return invoker(ctx, method, req, reply, cc, opts...) } } @@ -391,12 +390,15 @@ func (be *Backend) streamMetadataInterceptor() grpc.StreamClientInterceptor { streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { - ctx = be.injectMetadata(ctx) + ctx, err := be.injectMetadata(ctx) + if err != nil { + return nil, fmt.Errorf("failed to inject metadata: %w", err) + } return streamer(ctx, desc, cc, method, opts...) } } -func (be *Backend) injectMetadata(ctx context.Context) context.Context { +func (be *Backend) injectMetadata(ctx context.Context) (context.Context, error) { pairs := []string{ "taskhub", be.options.TaskHubName, "workerid", be.workerName, @@ -407,12 +409,11 @@ func (be *Backend) injectMetadata(ctx context.Context) context.Context { Scopes: []string{be.options.ResourceID + "/.default"}, }) if err != nil { - be.logger.Warnf("failed to get access token: %v", err) - } else { - pairs = append(pairs, "authorization", "Bearer "+token.Token) + return ctx, fmt.Errorf("failed to get access token: %w", err) } + pairs = append(pairs, "authorization", "Bearer "+token.Token) } md := metadata.Pairs(pairs...) - return metadata.NewOutgoingContext(ctx, md) + return metadata.NewOutgoingContext(ctx, md), nil } diff --git a/client/client_grpc.go b/client/client_grpc.go index fcdce375..594d9644 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "sync" "github.com/cenkalti/backoff/v4" "github.com/google/uuid" @@ -17,11 +18,12 @@ import ( // REVIEW: Can this be merged with backend/client.go somehow? type TaskHubGrpcClient struct { - client protos.TaskHubSidecarServiceClient - logger backend.Logger - cancel context.CancelFunc - stop chan struct{} - done chan struct{} + client protos.TaskHubSidecarServiceClient + logger backend.Logger + cancel context.CancelFunc + stop chan struct{} + stopOnce sync.Once + done chan struct{} } // NewTaskHubGrpcClient creates a client that can be used to manage orchestrations over a gRPC connection. diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 54851161..5dcc091f 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -161,7 +161,7 @@ func (c *TaskHubGrpcClient) processOrchestrationWorkItem( failureAction := helpers.NewCompleteOrchestrationAction( -1, protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED, - wrapperspb.String("An internal error occured while executing the orchestration."), + wrapperspb.String("An internal error occurred while executing the orchestration."), nil, &protos.TaskFailureDetails{ ErrorType: fmt.Sprintf("%T", err), @@ -225,9 +225,14 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( // The provided context controls how long to wait for the listener to shut down. // This must be called before closing the underlying gRPC connection to ensure a clean shutdown. func (c *TaskHubGrpcClient) StopWorkItemListener(ctx context.Context) error { - close(c.stop) + c.stopOnce.Do(func() { + close(c.stop) + }) if c.cancel != nil { c.cancel() + } else { + // StartWorkItemListener was never called; nothing to wait for. + return nil } select { case <-c.done: diff --git a/go.mod b/go.mod index cb556fd5..3bdfc433 100644 --- a/go.mod +++ b/go.mod @@ -11,25 +11,25 @@ require ( github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 - go.opentelemetry.io/otel v1.18.0 + go.opentelemetry.io/otel v1.39.0 go.opentelemetry.io/otel/exporters/zipkin v1.11.1 - go.opentelemetry.io/otel/sdk v1.11.1 - go.opentelemetry.io/otel/trace v1.18.0 - google.golang.org/grpc v1.56.3 - google.golang.org/protobuf v1.33.0 + go.opentelemetry.io/otel/sdk v1.39.0 + go.opentelemetry.io/otel/trace v1.39.0 + google.golang.org/grpc v1.79.2 + google.golang.org/protobuf v1.36.10 modernc.org/sqlite v1.22.1 ) require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt/v5 v5.3.0 // indirect - github.com/golang/protobuf v1.5.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -41,7 +41,8 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/stretchr/objx v0.5.2 // indirect - go.opentelemetry.io/otel/metric v1.18.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect golang.org/x/crypto v0.47.0 // indirect golang.org/x/mod v0.31.0 // indirect golang.org/x/net v0.49.0 // indirect diff --git a/go.sum b/go.sum index 20b30c70..8b836b28 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgv github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk= github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -20,18 +22,16 @@ github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+m github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= @@ -69,8 +69,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= -github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= @@ -78,18 +78,24 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 h1:KfYpVmrjI7JuToy5k8XV3nkapjWx48k4E4JOtVstzQI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0/go.mod h1:SeQhzAEccGVZVEy7aH87Nh0km+utSpo1pTv6eMMop48= -go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= -go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= go.opentelemetry.io/otel/exporters/zipkin v1.11.1 h1:JlJ3/oQoyqlrPDCfsSVFcHgGeHvZq+hr1VPWtiYCXTo= go.opentelemetry.io/otel/exporters/zipkin v1.11.1/go.mod h1:T4S6aVwIS1+MHA+dJHCcPROtZe6ORwnv5vMKPRapsFw= -go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= -go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= -go.opentelemetry.io/otel/sdk v1.11.1 h1:F7KmQgoHljhUuJyA+9BiU+EkJfyX5nVVF4wyzWZpKxs= -go.opentelemetry.io/otel/sdk v1.11.1/go.mod h1:/l3FE4SupHJ12TduVjUkZtlfFqDCQJlOlithYrdktys= -go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= -go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= @@ -106,15 +112,14 @@ golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= -google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= +google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/samples/sequence_dts/sequence_dts.go b/samples/sequence_dts/sequence_dts.go index 630bf3eb..5e8a8110 100644 --- a/samples/sequence_dts/sequence_dts.go +++ b/samples/sequence_dts/sequence_dts.go @@ -71,7 +71,7 @@ func Init(ctx context.Context, r *task.TaskRegistry) (*client.TaskHubGrpcClient, logger := backend.DefaultLogger() // Create a new DTS scheduler backend and establish the gRPC connection - opts, err := durabletaskscheduler.NewOptionsFromConnectionString("Endpoint=https://halspang-priv-d-f7e3cjgz.centraluseuap.durabletask.io;TaskHub=default;Authentication=DefaultAzure") + opts, err := durabletaskscheduler.NewOptionsFromConnectionString("Endpoint=https://my-scheduler.westus2.durabletask.io;TaskHub=default;Authentication=DefaultAzure") if err != nil { return nil, nil, fmt.Errorf("failed to parse connection string: %w", err) } From 419f3a031fec75d1991086d02f7dce2cdde041d7 Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Fri, 13 Mar 2026 15:13:06 -0700 Subject: [PATCH 3/6] Fix linter issues Signed-off-by: Hal Spang --- samples/heterogeneous/heterogeneous.go | 7 ++----- tests/grpc/grpc_test.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/samples/heterogeneous/heterogeneous.go b/samples/heterogeneous/heterogeneous.go index 01a0129f..67fb7804 100644 --- a/samples/heterogeneous/heterogeneous.go +++ b/samples/heterogeneous/heterogeneous.go @@ -236,12 +236,9 @@ func setupGrpcExecutor(ctx context.Context, be backend.Backend, logger backend.L }() // Create a worker that connects to the gRPC server. - // establish a gRPC connection, blocking until the server is ready or the timeout expires - conn, err := grpc.DialContext( - ctx, - lis.Addr().String(), + conn, err := grpc.NewClient( + "dns:///"+lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), ) if err != nil { return nil, err diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index e5bf6d35..a44a3035 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -57,7 +57,7 @@ func TestMain(m *testing.M) { time.Sleep(1 * time.Second) - conn, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient("dns:///"+lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("failed to connect to gRPC server: %v", err) } From 515bdbf006ae2524786d68b3efd27d04d8f91b89 Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Fri, 13 Mar 2026 18:39:31 -0700 Subject: [PATCH 4/6] Fix connection/start issues Signed-off-by: Hal Spang --- backend/durabletaskscheduler/scheduler.go | 16 ++++++++++++---- client/client_grpc.go | 14 ++++++++------ client/worker_grpc.go | 8 ++++++++ samples/sequence_dts/sequence_dts.go | 6 +++++- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/backend/durabletaskscheduler/scheduler.go b/backend/durabletaskscheduler/scheduler.go index 89a2ff08..c9622e0d 100644 --- a/backend/durabletaskscheduler/scheduler.go +++ b/backend/durabletaskscheduler/scheduler.go @@ -315,8 +315,11 @@ func (be *Backend) ensureStarted() error { // Connection returns the underlying gRPC client connection. This can be used to create // a [client.TaskHubGrpcClient] for streaming work item processing. // Must be called after Start(). -func (be *Backend) Connection() grpc.ClientConnInterface { - return be.conn +func (be *Backend) Connection() (grpc.ClientConnInterface, error) { + if err := be.ensureStarted(); err != nil { + return nil, err + } + return be.conn, nil } func (be *Backend) createConnection() (*grpc.ClientConn, error) { @@ -342,12 +345,17 @@ func (be *Backend) createConnection() (*grpc.ClientConn, error) { } // Set up Azure credential if using DefaultAzure auth - if be.options.AuthType == AuthTypeDefaultAzure { + switch be.options.AuthType { + case AuthTypeDefaultAzure: cred, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { return nil, fmt.Errorf("failed to create Azure credential: %w", err) } be.credential = cred + case AuthTypeNone: + // No authentication needed + default: + return nil, fmt.Errorf("unsupported authentication type: %q", be.options.AuthType) } dialOpts := []grpc.DialOption{ @@ -356,7 +364,7 @@ func (be *Backend) createConnection() (*grpc.ClientConn, error) { grpc.WithStreamInterceptor(be.streamMetadataInterceptor()), } - conn, err := grpc.NewClient(target, dialOpts...) + conn, err := grpc.NewClient("dns:///"+target, dialOpts...) if err != nil { return nil, fmt.Errorf("failed to create gRPC connection: %w", err) } diff --git a/client/client_grpc.go b/client/client_grpc.go index 594d9644..af24e54c 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -18,12 +18,14 @@ import ( // REVIEW: Can this be merged with backend/client.go somehow? type TaskHubGrpcClient struct { - client protos.TaskHubSidecarServiceClient - logger backend.Logger - cancel context.CancelFunc - stop chan struct{} - stopOnce sync.Once - done chan struct{} + client protos.TaskHubSidecarServiceClient + logger backend.Logger + cancel context.CancelFunc + stop chan struct{} + stopOnce sync.Once + done chan struct{} + listenerStarted bool + mu sync.Mutex } // NewTaskHubGrpcClient creates a client that can be used to manage orchestrations over a gRPC connection. diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 5dcc091f..76d0722d 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -24,6 +24,14 @@ type workItemsStream interface { } func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.TaskRegistry) error { + c.mu.Lock() + if c.listenerStarted { + c.mu.Unlock() + return errors.New("work item listener already started") + } + c.listenerStarted = true + c.mu.Unlock() + executor := task.NewTaskExecutor(r) ctx, cancel := context.WithCancel(ctx) diff --git a/samples/sequence_dts/sequence_dts.go b/samples/sequence_dts/sequence_dts.go index 5e8a8110..535da10d 100644 --- a/samples/sequence_dts/sequence_dts.go +++ b/samples/sequence_dts/sequence_dts.go @@ -82,7 +82,11 @@ func Init(ctx context.Context, r *task.TaskRegistry) (*client.TaskHubGrpcClient, // Create the gRPC client and start the streaming work item listener. // DTS dispatches work items via a gRPC stream rather than backend polling. - grpcClient := client.NewTaskHubGrpcClient(be.Connection(), logger) + conn, err := be.Connection() + if err != nil { + return nil, nil, fmt.Errorf("failed to get backend connection: %w", err) + } + grpcClient := client.NewTaskHubGrpcClient(conn, logger) if err := grpcClient.StartWorkItemListener(ctx, r); err != nil { return nil, nil, fmt.Errorf("failed to start work item listener: %w", err) } From 202bdff2ce4b505d15431c87aff8c19b9aa97c5a Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Fri, 13 Mar 2026 18:59:46 -0700 Subject: [PATCH 5/6] Fix listener reuse in tests Signed-off-by: Hal Spang --- client/worker_grpc.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 76d0722d..d81c2987 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "sync" "time" "github.com/cenkalti/backoff/v4" @@ -232,6 +233,7 @@ func (c *TaskHubGrpcClient) processActivityWorkItem( // StopWorkItemListener gracefully stops the work item listener that was started by StartWorkItemListener. // The provided context controls how long to wait for the listener to shut down. // This must be called before closing the underlying gRPC connection to ensure a clean shutdown. +// After stopping, the listener can be started again with StartWorkItemListener. func (c *TaskHubGrpcClient) StopWorkItemListener(ctx context.Context) error { c.stopOnce.Do(func() { close(c.stop) @@ -244,12 +246,24 @@ func (c *TaskHubGrpcClient) StopWorkItemListener(ctx context.Context) error { } select { case <-c.done: + c.resetListenerState() return nil case <-ctx.Done(): return ctx.Err() } } +// resetListenerState resets the listener state so that StartWorkItemListener can be called again. +func (c *TaskHubGrpcClient) resetListenerState() { + c.mu.Lock() + defer c.mu.Unlock() + c.listenerStarted = false + c.cancel = nil + c.stop = make(chan struct{}) + c.stopOnce = sync.Once{} + c.done = make(chan struct{}) +} + func newInfiniteRetries() *backoff.ExponentialBackOff { b := backoff.NewExponentialBackOff() // max wait of 15 seconds between retries From 9fa0fcd1a58ca379c0257c0b21ec9ce04df11795 Mon Sep 17 00:00:00 2001 From: Hal Spang Date: Fri, 13 Mar 2026 19:44:39 -0700 Subject: [PATCH 6/6] Fix go directive, update worker/test Signed-off-by: Hal Spang --- client/worker_grpc.go | 9 ++++-- go.mod | 35 ++++++++++---------- go.sum | 72 ++++++++++++++++++++++------------------- tests/grpc/grpc_test.go | 12 +++++-- 4 files changed, 71 insertions(+), 57 deletions(-) diff --git a/client/worker_grpc.go b/client/worker_grpc.go index d81c2987..2f9e3e23 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -58,6 +58,7 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T err := initStream() if err != nil { cancel() + c.resetListenerState() return err } @@ -238,8 +239,12 @@ func (c *TaskHubGrpcClient) StopWorkItemListener(ctx context.Context) error { c.stopOnce.Do(func() { close(c.stop) }) - if c.cancel != nil { - c.cancel() + var cancel context.CancelFunc + c.mu.Lock() + cancel = c.cancel + c.mu.Unlock() + if cancel != nil { + cancel() } else { // StartWorkItemListener was never called; nothing to wait for. return nil diff --git a/go.mod b/go.mod index 3bdfc433..864a28f3 100644 --- a/go.mod +++ b/go.mod @@ -1,29 +1,28 @@ module github.com/microsoft/durabletask-go -go 1.24.0 +go 1.23.0 require ( - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 github.com/cenkalti/backoff/v4 v4.1.3 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.1 github.com/marusama/semaphore/v2 v2.5.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 - go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/exporters/zipkin v1.11.1 - go.opentelemetry.io/otel/sdk v1.39.0 - go.opentelemetry.io/otel/trace v1.39.0 - google.golang.org/grpc v1.79.2 + go.opentelemetry.io/otel/sdk v1.37.0 + go.opentelemetry.io/otel/trace v1.38.0 + google.golang.org/grpc v1.75.1 google.golang.org/protobuf v1.36.10 modernc.org/sqlite v1.22.1 ) require ( - github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect + github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.1 // indirect github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.3 // indirect @@ -41,15 +40,15 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/stretchr/objx v0.5.2 // indirect - go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel/metric v1.39.0 // indirect - golang.org/x/crypto v0.47.0 // indirect - golang.org/x/mod v0.31.0 // indirect - golang.org/x/net v0.49.0 // indirect - golang.org/x/sync v0.19.0 // indirect - golang.org/x/sys v0.40.0 // indirect - golang.org/x/text v0.33.0 // indirect - golang.org/x/tools v0.40.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/mod v0.26.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sync v0.15.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + golang.org/x/tools v0.34.0 // indirect google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/uint128 v1.2.0 // indirect diff --git a/go.sum b/go.sum index 8b836b28..3a83944e 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0 h1:fou+2+WFTib47nS+nz/ozhEBnvU96bKHy6LjRsY4E28= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.21.0/go.mod h1:t76Ruy8AHvUAC8GfMWJMa0ElSbuIcO03NLpynfbgsPA= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1 h1:Hk5QBxZQC1jb2Fwj6mpzme37xbCDdNTxU7O9eb5+LB4= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.13.1/go.mod h1:IYus9qsFobWIc2YVwe/WPjcnyCkPKtnHAqUYeebc8z0= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1 h1:Wc1ml6QlJs2BHQ/9Bqu1jiyggbsSjramq2oUmp5WeIo= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.18.1/go.mod h1:Ot/6aikWnKWi4l9QB7qVSwa8iMphQNqkWALMoNT3rzM= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1 h1:B+blDbyVIG3WaikNxPnhPiJ1MThR03b3vKGtER95TP4= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.10.1/go.mod h1:JdM5psgjfBf5fo2uWOZhflPWyDBZ/O/CNAH9CtsuZE4= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2 h1:yz1bePFlP5Vws5+8ez6T3HWXPmwOK7Yvq8QxDBD3SKY= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2/go.mod h1:Pa9ZNPuoNu/GztvBSKk9J1cDJW6vk/n0zLtV4mgd8N8= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 h1:9iefClla7iYpfYWdzPCRDozdmndjTm8DXdpCzPajMgA= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2/go.mod h1:XtLgD3ZD34DAaVIIAyG3objl5DynM3CQ/vMcbBNJZGI= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.1 h1:FPKJS1T+clwv+OLGt13a8UjqeRuh0O4SJ3lUriThc+4= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.1/go.mod h1:j2chePtV91HrC22tGoRX3sGY42uF13WzmmV80/OdVAA= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mod h1:tCcJZ0uHAmvjsVYzEFivsRTN00oz5BEsRgQHu5JZ9WE= github.com/AzureAD/microsoft-authentication-library-for-go v1.6.0 h1:XRzhVemXdgvJqCH0sFfrBUTnUJSBrBf7++ypk+twtRs= @@ -17,6 +17,8 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= @@ -66,6 +68,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmd github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= +github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -78,46 +82,46 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= -go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= -go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/auto/sdk v1.2.0 h1:YpRtUFjvhSymycLS2T81lT6IGhcUP+LUPtv0iv1N8bM= +go.opentelemetry.io/auto/sdk v1.2.0/go.mod h1:1deq2zL7rwjwC8mR7XgY2N+tlIl6pjmEUoLDENMEzwk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 h1:KfYpVmrjI7JuToy5k8XV3nkapjWx48k4E4JOtVstzQI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0/go.mod h1:SeQhzAEccGVZVEy7aH87Nh0km+utSpo1pTv6eMMop48= -go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= -go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= go.opentelemetry.io/otel/exporters/zipkin v1.11.1 h1:JlJ3/oQoyqlrPDCfsSVFcHgGeHvZq+hr1VPWtiYCXTo= go.opentelemetry.io/otel/exporters/zipkin v1.11.1/go.mod h1:T4S6aVwIS1+MHA+dJHCcPROtZe6ORwnv5vMKPRapsFw= -go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= -go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= -go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= -go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= -go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= -go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= -go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= -go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= -golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= -golang.org/x/mod v0.31.0 h1:HaW9xtz0+kOcWKwli0ZXy79Ix+UW/vOfmWI5QVd2tgI= -golang.org/x/mod v0.31.0/go.mod h1:43JraMp9cGx1Rx3AqioxrbrhNsLl2l/iNAvuBkrezpg= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= -golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= -golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/mod v0.26.0 h1:EGMPT//Ezu+ylkCijjPc+f4Aih7sZvaAr+O3EHBxvZg= +golang.org/x/mod v0.26.0/go.mod h1:/j6NAhSk8iQ723BGAUyoAcn7SlD7s15Dp9Nd/SfeaFQ= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= -golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= -golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= -golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= -golang.org/x/tools v0.40.0 h1:yLkxfA+Qnul4cs9QA3KnlFu0lVmd8JJfoq+E41uSutA= -golang.org/x/tools v0.40.0/go.mod h1:Ik/tzLRlbscWpqqMRjyWYDisX8bG13FrdXp3o4Sr9lc= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/tools v0.34.0 h1:qIpSLOxeCYGg9TrcJokLBG4KFA6d795g0xkBkiESGlo= +golang.org/x/tools v0.34.0/go.mod h1:pAP9OwEaY1CAW3HOmg3hLZC5Z0CCmzjAF2UQMSqNARg= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= -google.golang.org/grpc v1.79.2 h1:fRMD94s2tITpyJGtBBn7MkMseNpOZU8ZxgC3MMBaXRU= -google.golang.org/grpc v1.79.2/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/tests/grpc/grpc_test.go b/tests/grpc/grpc_test.go index a44a3035..25c08b8b 100644 --- a/tests/grpc/grpc_test.go +++ b/tests/grpc/grpc_test.go @@ -87,10 +87,17 @@ func TestMain(m *testing.M) { os.Exit(exitCode) //nolint:gocritic // os.Exit in TestMain is required by Go testing } -func startGrpcListener(t *testing.T, r *task.TaskRegistry) context.CancelFunc { +func startGrpcListener(t *testing.T, r *task.TaskRegistry) func() { cancelCtx, cancel := context.WithCancel(ctx) require.NoError(t, grpcClient.StartWorkItemListener(cancelCtx, r)) - return cancel + return func() { + stopCtx, stopCancel := context.WithTimeout(ctx, 5*time.Second) + defer stopCancel() + if err := grpcClient.StopWorkItemListener(stopCtx); err != nil { + t.Logf("failed to stop work item listener: %v", err) + cancel() + } + } } func Test_Grpc_WaitForInstanceStart_Timeout(t *testing.T) { @@ -134,7 +141,6 @@ func Test_Grpc_WaitForInstanceStart_ConnectionResume(t *testing.T) { assert.Contains(t, err.Error(), "context deadline exceeded") } cancelListener() - time.Sleep(2 * time.Second) // reconnect cancelListener = startGrpcListener(t, r)