From ea21b9bd9098dd0826cc2f17107e0ac0d1884d99 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 8 Apr 2026 17:19:26 -0400 Subject: [PATCH 1/8] Add chasm test engine for engine-backed tests --- chasm/chasmtest/test_engine.go | 351 ++++++++++++++++++++++++++++ chasm/lib/callback/tasks_test.go | 109 ++++----- chasm/lib/scheduler/handler_test.go | 64 +++-- 3 files changed, 434 insertions(+), 90 deletions(-) create mode 100644 chasm/chasmtest/test_engine.go diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go new file mode 100644 index 00000000000..3e056dc9115 --- /dev/null +++ b/chasm/chasmtest/test_engine.go @@ -0,0 +1,351 @@ +package chasmtest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.temporal.io/api/serviceerror" + persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/chasm" + "go.temporal.io/server/common/clock" + "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/metrics" + "go.temporal.io/server/common/testing/testlogger" +) + +type ( + Option[T chasm.Component] func(*Engine[T]) + + Engine[T chasm.Component] struct { + t *testing.T + registry *chasm.Registry + logger log.Logger + metrics metrics.Handler + + rootExecutionKey chasm.ExecutionKey + root T + rootRuntime *runtime + executions map[executionLookupKey]*runtime + } + + runtime struct { + key chasm.ExecutionKey + node *chasm.Node + backend *chasm.MockNodeBackend + timeSource *clock.EventTimeSource + root chasm.Component + } + + executionLookupKey struct { + namespaceID string + businessID string + } +) + +func NewEngine[T chasm.Component]( + t *testing.T, + registry *chasm.Registry, + opts ...Option[T], +) *Engine[T] { + t.Helper() + + e := &Engine[T]{ + t: t, + registry: registry, + logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), + metrics: metrics.NoopMetricsHandler, + rootExecutionKey: chasm.ExecutionKey{ + NamespaceID: "test-namespace-id", + BusinessID: "test-workflow-id", + RunID: "test-run-id", + }, + executions: make(map[executionLookupKey]*runtime), + } + + for _, opt := range opts { + opt(e) + } + + return e +} + +func WithRoot[T chasm.Component]( + factory func(chasm.MutableContext) T, +) Option[T] { + return func(e *Engine[T]) { + runtime := e.newRuntime(e.rootExecutionKey) + ctx := chasm.NewMutableContext(context.Background(), runtime.node) + root := factory(ctx) + require.NoError(e.t, runtime.node.SetRootComponent(root)) + _, err := runtime.node.CloseTransaction() + require.NoError(e.t, err) + e.root = root + runtime.root = root + e.rootRuntime = runtime + e.executions[newExecutionLookupKey(runtime.key)] = runtime + } +} + +func WithExecutionKey[T chasm.Component](key chasm.ExecutionKey) Option[T] { + return func(e *Engine[T]) { + e.rootExecutionKey = key + } +} + +func (e *Engine[T]) Root() T { + return e.root +} + +func (e *Engine[T]) EngineContext() context.Context { + return chasm.NewEngineContext(context.Background(), e) +} + +func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef { + for _, runtime := range e.executions { + ref, err := runtime.node.Ref(component) + if err != nil { + continue + } + structuredRef, err := chasm.DeserializeComponentRef(ref) + require.NoError(e.t, err) + return structuredRef + } + + e.t.Fatalf("component %T is not attached to the chasmtest engine", component) + return chasm.ComponentRef{} +} + +func (e *Engine[T]) StartExecution( + ctx context.Context, + ref chasm.ComponentRef, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + _ ...chasm.TransitionOption, +) (chasm.StartExecutionResult, error) { + if _, ok := e.executionForKey(ref.ExecutionKey); ok { + return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID) + } + + runtime := e.newRuntime(ref.ExecutionKey) + mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + root, err := startFn(mutableCtx) + if err != nil { + return chasm.StartExecutionResult{}, err + } + if err := runtime.node.SetRootComponent(root); err != nil { + return chasm.StartExecutionResult{}, err + } + _, err = runtime.node.CloseTransaction() + if err != nil { + return chasm.StartExecutionResult{}, err + } + + runtime.root = root + e.executions[newExecutionLookupKey(runtime.key)] = runtime + + serializedRef, err := runtime.node.Ref(root) + if err != nil { + return chasm.StartExecutionResult{}, err + } + + return chasm.StartExecutionResult{ + ExecutionKey: runtime.key, + ExecutionRef: serializedRef, + Created: true, + }, nil +} + +func (e *Engine[T]) UpdateWithStartExecution( + ctx context.Context, + ref chasm.ComponentRef, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + updateFn func(chasm.MutableContext, chasm.Component) error, + _ ...chasm.TransitionOption, +) (chasm.EngineUpdateWithStartExecutionResult, error) { + if runtime, ok := e.executionForKey(ref.ExecutionKey); ok { + serializedRef, err := e.updateComponentInRuntime(ctx, runtime, ref, updateFn) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + return chasm.EngineUpdateWithStartExecutionResult{ + ExecutionKey: runtime.key, + ExecutionRef: serializedRef, + Created: false, + }, nil + } + + runtime := e.newRuntime(ref.ExecutionKey) + mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + root, err := startFn(mutableCtx) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + if err := runtime.node.SetRootComponent(root); err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + if err := updateFn(mutableCtx, root); err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + _, err = runtime.node.CloseTransaction() + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + + runtime.root = root + e.executions[newExecutionLookupKey(runtime.key)] = runtime + + serializedRef, err := runtime.node.Ref(root) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + + return chasm.EngineUpdateWithStartExecutionResult{ + ExecutionKey: runtime.key, + ExecutionRef: serializedRef, + Created: true, + }, nil +} + +func (e *Engine[T]) UpdateComponent( + ctx context.Context, + ref chasm.ComponentRef, + updateFn func(chasm.MutableContext, chasm.Component) error, + _ ...chasm.TransitionOption, +) ([]byte, error) { + runtime, err := e.mustExecutionForRef(ref) + if err != nil { + return nil, err + } + return e.updateComponentInRuntime(ctx, runtime, ref, updateFn) +} + +func (e *Engine[T]) ReadComponent( + ctx context.Context, + ref chasm.ComponentRef, + readFn func(chasm.Context, chasm.Component) error, + _ ...chasm.TransitionOption, +) error { + runtime, err := e.mustExecutionForRef(ref) + if err != nil { + return err + } + + component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref) + if err != nil { + return err + } + + readCtx := chasm.NewContext(ctx, runtime.node) + return readFn(readCtx, component) +} + +func (e *Engine[T]) PollComponent( + context.Context, + chasm.ComponentRef, + func(chasm.Context, chasm.Component) (bool, error), + ...chasm.TransitionOption, +) ([]byte, error) { + return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent") +} + +func (e *Engine[T]) DeleteExecution( + context.Context, + chasm.ComponentRef, + chasm.DeleteExecutionRequest, +) error { + return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution") +} + +func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {} + +func (e *Engine[T]) newRuntime(key chasm.ExecutionKey) *runtime { + key = normalizeExecutionKey(key) + timeSource := clock.NewEventTimeSource() + timeSource.Update(time.Now()) + backend := &chasm.MockNodeBackend{ + HandleNextTransitionCount: func() int64 { return 2 }, + HandleGetCurrentVersion: func() int64 { return 1 }, + HandleGetWorkflowKey: func() definition.WorkflowKey { + return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID) + }, + HandleIsWorkflow: func() bool { return false }, + HandleCurrentVersionedTransition: func() *persistencespb.VersionedTransition { + return &persistencespb.VersionedTransition{ + NamespaceFailoverVersion: 1, + TransitionCount: 1, + } + }, + } + return &runtime{ + key: key, + backend: backend, + timeSource: timeSource, + node: chasm.NewEmptyTree( + e.registry, + timeSource, + backend, + chasm.DefaultPathEncoder, + e.logger, + e.metrics, + ), + } +} + +func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*runtime, bool) { + runtime, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] + return runtime, ok +} + +func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*runtime, error) { + runtime, ok := e.executionForKey(ref.ExecutionKey) + if !ok { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), + ) + } + return runtime, nil +} + +func (e *Engine[T]) updateComponentInRuntime( + ctx context.Context, + runtime *runtime, + ref chasm.ComponentRef, + updateFn func(chasm.MutableContext, chasm.Component) error, +) ([]byte, error) { + component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref) + if err != nil { + return nil, err + } + + mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + if err := updateFn(mutableCtx, component); err != nil { + return nil, err + } + + _, err = runtime.node.CloseTransaction() + if err != nil { + return nil, err + } + + return mutableCtx.Ref(component) +} + +func newExecutionLookupKey(key chasm.ExecutionKey) executionLookupKey { + return executionLookupKey{ + namespaceID: key.NamespaceID, + businessID: key.BusinessID, + } +} + +func normalizeExecutionKey(key chasm.ExecutionKey) chasm.ExecutionKey { + if key.NamespaceID == "" { + key.NamespaceID = "test-namespace-id" + } + if key.BusinessID == "" { + key.BusinessID = "test-workflow-id" + } + return key +} diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index c701536ba0b..f63767aae6e 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -15,6 +15,7 @@ import ( "go.temporal.io/server/api/historyservicemock/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" + "go.temporal.io/server/chasm/chasmtest" callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/clock" @@ -68,6 +69,26 @@ func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComp } } +func readCallbackFromEngine( + t *testing.T, + testEngine *chasmtest.Engine[*mockNexusCompletionGetterComponent], +) *Callback { + t.Helper() + + var callback *Callback + _, err := chasm.ReadComponent[*mockNexusCompletionGetterComponent, chasm.ComponentRef, any, chasm.NoValue]( + testEngine.EngineContext(), + testEngine.Ref(testEngine.Root()), + func(root *mockNexusCompletionGetterComponent, ctx chasm.Context, _ any) (chasm.NoValue, error) { + callback = root.Callback.Get(ctx) + return nil, nil + }, + nil, + ) + require.NoError(t, err) + return callback +} + // Test the full executeInvocationTask flow with direct handler calls func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { cases := []struct { @@ -157,17 +178,13 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { metrics.DestinationTag("http://localhost"), metrics.OutcomeTag(tc.expectedMetricOutcome)) - // Setup logger and time source + // Setup logger logger := log.NewTestLogger() - timeSource := clock.NewEventTimeSource() - timeSource.Update(time.Now()) // Create task handler with mock namespace registry nsRegistry := namespace.NewMockRegistry(ctrl) nsRegistry.EXPECT().GetNamespaceByID(gomock.Any()).Return(ns, nil) - // Create mock engine - mockEngine := chasm.NewMockEngine(ctrl) handler := &invocationTaskHandler{ config: &Config{ RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second), @@ -191,13 +208,10 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { err = chasmRegistry.Register(&mockNexusCompletionGetterLibrary{}) require.NoError(t, err) - nodeBackend := &chasm.MockNodeBackend{} - root := chasm.NewEmptyTree(chasmRegistry, timeSource, nodeBackend, chasm.DefaultPathEncoder, logger, metricsHandler) - callback := &Callback{ CallbackState: &callbackspb.CallbackState{ RequestId: "request-id", - RegistrationTime: timestamppb.New(timeSource.Now()), + RegistrationTime: timestamppb.New(time.Now()), Callback: &callbackspb.Callback{ Variant: &callbackspb.Callback_Nexus_{ Nexus: &callbackspb.Callback_Nexus{ @@ -213,72 +227,31 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { // Create completion completion := nexusrpc.CompleteOperationOptions{} - // Set up the CompletionSource field to return our mock completion - require.NoError(t, root.SetRootComponent(&mockNexusCompletionGetterComponent{ - completion: completion, - // Create callback in SCHEDULED state - Callback: chasm.NewComponentField( - chasm.NewMutableContext(context.Background(), root), - callback, - ), - })) - _, err = root.CloseTransaction() - require.NoError(t, err) - - // Setup engine expectations to directly call handler logic with MockMutableContext - mockEngine.EXPECT().ReadComponent( - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component) error, opts ...chasm.TransitionOption) error { - mockCtx := &chasm.MockContext{ - HandleNow: func(component chasm.Component) time.Time { - return timeSource.Now() - }, - HandleRef: func(component chasm.Component) ([]byte, error) { - return []byte{}, nil - }, - } - return readFn(mockCtx, callback) - }) - - mockEngine.EXPECT().UpdateComponent( - gomock.Any(), - gomock.Any(), - gomock.Any(), - ).DoAndReturn(func(ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, opts ...chasm.TransitionOption) ([]any, error) { - mockCtx := &chasm.MockMutableContext{ - MockContext: chasm.MockContext{ - HandleNow: func(component chasm.Component) time.Time { - return timeSource.Now() - }, - HandleRef: func(component chasm.Component) ([]byte, error) { - return []byte{}, nil - }, - }, - } - err := updateFn(mockCtx, callback) - return nil, err - }) - - // Create ComponentRef - ref := chasm.NewComponentRef[*Callback](chasm.ExecutionKey{ - NamespaceID: "namespace-id", - BusinessID: "workflow-id", - RunID: "run-id", - }) + testEngine := chasmtest.NewEngine( + t, + chasmRegistry, + chasmtest.WithExecutionKey[*mockNexusCompletionGetterComponent](chasm.ExecutionKey{ + NamespaceID: "namespace-id", + BusinessID: "workflow-id", + RunID: "run-id", + }), + chasmtest.WithRoot(func(ctx chasm.MutableContext) *mockNexusCompletionGetterComponent { + return &mockNexusCompletionGetterComponent{ + completion: completion, + Callback: chasm.NewComponentField(ctx, callback), + } + }), + ) - // Execute with engine context - engineCtx := chasm.NewEngineContext(context.Background(), mockEngine) err = handler.Execute( - engineCtx, - ref, + testEngine.EngineContext(), + testEngine.Ref(callback), chasm.TaskAttributes{Destination: "http://localhost"}, &callbackspb.InvocationTask{Attempt: 0}, ) // Verify the outcome and tasks - tc.assertOutcome(t, callback, err) + tc.assertOutcome(t, readCallbackFromEngine(t, testEngine), err) }) } } diff --git a/chasm/lib/scheduler/handler_test.go b/chasm/lib/scheduler/handler_test.go index 03ca797714c..e04f3ae7e06 100644 --- a/chasm/lib/scheduler/handler_test.go +++ b/chasm/lib/scheduler/handler_test.go @@ -7,8 +7,10 @@ import ( "go.temporal.io/api/serviceerror" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/chasm" + "go.temporal.io/server/chasm/chasmtest" "go.temporal.io/server/chasm/lib/scheduler" schedulerpb "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" + "go.temporal.io/server/common/log" legacyscheduler "go.temporal.io/server/service/worker/scheduler" "go.uber.org/mock/gomock" "google.golang.org/protobuf/types/known/timestamppb" @@ -108,19 +110,26 @@ func TestSentinelHandler_MigrateToWorkflow(t *testing.T) { } func TestHandler_CreateFromMigrationState_Sentinel(t *testing.T) { - env := newTestEnv(t, withMockEngine()) - sentinel, ctx, _ := setupSentinelForTest(t) - - h := scheduler.NewTestHandler(env.Logger) - - // StartExecution returns already-started because the sentinel occupies the key. - env.MockEngine.EXPECT().StartExecution(gomock.Any(), gomock.Any(), gomock.Any()). - Return(chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", "")) - - // ReadComponent invokes the read function with the sentinel. - env.ExpectReadComponent(ctx, sentinel) - - engineCtx := env.EngineContext() + ctrl := gomock.NewController(t) + logger := log.NewTestLogger() + registry := chasm.NewRegistry(logger) + require.NoError(t, registry.Register(&chasm.CoreLibrary{})) + require.NoError(t, registry.Register(newTestLibrary(logger, newRealSpecProcessor(ctrl, logger)))) + + h := scheduler.NewTestHandler(logger) + testEngine := chasmtest.NewEngine( + t, + registry, + chasmtest.WithExecutionKey[*scheduler.Scheduler](chasm.ExecutionKey{ + NamespaceID: namespaceID, + BusinessID: scheduleID, + }), + chasmtest.WithRoot(func(ctx chasm.MutableContext) *scheduler.Scheduler { + return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID) + }), + ) + + engineCtx := testEngine.EngineContext() _, err := h.TestCreateFromMigrationState(engineCtx, &schedulerpb.CreateFromMigrationStateRequest{ NamespaceId: namespaceID, State: &schedulerpb.SchedulerMigrationState{ @@ -137,15 +146,26 @@ func TestHandler_CreateFromMigrationState_Sentinel(t *testing.T) { } func TestHandler_MigrateToWorkflow_Sentinel(t *testing.T) { - env := newTestEnv(t, withMockEngine()) - sentinel, ctx, _ := setupSentinelForTest(t) - - h := scheduler.NewTestHandler(env.Logger) - - // UpdateComponent invokes the update function with the sentinel. - env.ExpectUpdateComponent(ctx, sentinel) - - engineCtx := env.EngineContext() + ctrl := gomock.NewController(t) + logger := log.NewTestLogger() + registry := chasm.NewRegistry(logger) + require.NoError(t, registry.Register(&chasm.CoreLibrary{})) + require.NoError(t, registry.Register(newTestLibrary(logger, newRealSpecProcessor(ctrl, logger)))) + + h := scheduler.NewTestHandler(logger) + testEngine := chasmtest.NewEngine( + t, + registry, + chasmtest.WithExecutionKey[*scheduler.Scheduler](chasm.ExecutionKey{ + NamespaceID: namespaceID, + BusinessID: scheduleID, + }), + chasmtest.WithRoot(func(ctx chasm.MutableContext) *scheduler.Scheduler { + return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID) + }), + ) + + engineCtx := testEngine.EngineContext() _, err := h.TestMigrateToWorkflow(engineCtx, &schedulerpb.MigrateToWorkflowRequest{ NamespaceId: namespaceID, ScheduleId: scheduleID, From 171cdf001d78fa52452d1dff13c5088913a4fdef Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Thu, 9 Apr 2026 11:39:43 -0400 Subject: [PATCH 2/8] Fix test engine root component constraint --- chasm/chasmtest/test_engine.go | 112 +++++++++++++++---------------- chasm/lib/callback/tasks_test.go | 11 +++ 2 files changed, 67 insertions(+), 56 deletions(-) diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 3e056dc9115..730541b6445 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -18,9 +18,9 @@ import ( ) type ( - Option[T chasm.Component] func(*Engine[T]) + Option[T chasm.RootComponent] func(*Engine[T]) - Engine[T chasm.Component] struct { + Engine[T chasm.RootComponent] struct { t *testing.T registry *chasm.Registry logger log.Logger @@ -28,16 +28,16 @@ type ( rootExecutionKey chasm.ExecutionKey root T - rootRuntime *runtime - executions map[executionLookupKey]*runtime + rootExecution *execution + executions map[executionLookupKey]*execution } - runtime struct { + execution struct { key chasm.ExecutionKey node *chasm.Node backend *chasm.MockNodeBackend timeSource *clock.EventTimeSource - root chasm.Component + root chasm.RootComponent } executionLookupKey struct { @@ -46,7 +46,7 @@ type ( } ) -func NewEngine[T chasm.Component]( +func NewEngine[T chasm.RootComponent]( t *testing.T, registry *chasm.Registry, opts ...Option[T], @@ -63,7 +63,7 @@ func NewEngine[T chasm.Component]( BusinessID: "test-workflow-id", RunID: "test-run-id", }, - executions: make(map[executionLookupKey]*runtime), + executions: make(map[executionLookupKey]*execution), } for _, opt := range opts { @@ -73,24 +73,24 @@ func NewEngine[T chasm.Component]( return e } -func WithRoot[T chasm.Component]( +func WithRoot[T chasm.RootComponent]( factory func(chasm.MutableContext) T, ) Option[T] { return func(e *Engine[T]) { - runtime := e.newRuntime(e.rootExecutionKey) - ctx := chasm.NewMutableContext(context.Background(), runtime.node) + execution := e.newExecution(e.rootExecutionKey) + ctx := chasm.NewMutableContext(context.Background(), execution.node) root := factory(ctx) - require.NoError(e.t, runtime.node.SetRootComponent(root)) - _, err := runtime.node.CloseTransaction() + require.NoError(e.t, execution.node.SetRootComponent(root)) + _, err := execution.node.CloseTransaction() require.NoError(e.t, err) e.root = root - runtime.root = root - e.rootRuntime = runtime - e.executions[newExecutionLookupKey(runtime.key)] = runtime + execution.root = root + e.rootExecution = execution + e.executions[newExecutionLookupKey(execution.key)] = execution } } -func WithExecutionKey[T chasm.Component](key chasm.ExecutionKey) Option[T] { +func WithExecutionKey[T chasm.RootComponent](key chasm.ExecutionKey) Option[T] { return func(e *Engine[T]) { e.rootExecutionKey = key } @@ -105,8 +105,8 @@ func (e *Engine[T]) EngineContext() context.Context { } func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef { - for _, runtime := range e.executions { - ref, err := runtime.node.Ref(component) + for _, execution := range e.executions { + ref, err := execution.node.Ref(component) if err != nil { continue } @@ -129,30 +129,30 @@ func (e *Engine[T]) StartExecution( return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID) } - runtime := e.newRuntime(ref.ExecutionKey) - mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + execution := e.newExecution(ref.ExecutionKey) + mutableCtx := chasm.NewMutableContext(ctx, execution.node) root, err := startFn(mutableCtx) if err != nil { return chasm.StartExecutionResult{}, err } - if err := runtime.node.SetRootComponent(root); err != nil { + if err := execution.node.SetRootComponent(root); err != nil { return chasm.StartExecutionResult{}, err } - _, err = runtime.node.CloseTransaction() + _, err = execution.node.CloseTransaction() if err != nil { return chasm.StartExecutionResult{}, err } - runtime.root = root - e.executions[newExecutionLookupKey(runtime.key)] = runtime + execution.root = root + e.executions[newExecutionLookupKey(execution.key)] = execution - serializedRef, err := runtime.node.Ref(root) + serializedRef, err := execution.node.Ref(root) if err != nil { return chasm.StartExecutionResult{}, err } return chasm.StartExecutionResult{ - ExecutionKey: runtime.key, + ExecutionKey: execution.key, ExecutionRef: serializedRef, Created: true, }, nil @@ -165,45 +165,45 @@ func (e *Engine[T]) UpdateWithStartExecution( updateFn func(chasm.MutableContext, chasm.Component) error, _ ...chasm.TransitionOption, ) (chasm.EngineUpdateWithStartExecutionResult, error) { - if runtime, ok := e.executionForKey(ref.ExecutionKey); ok { - serializedRef, err := e.updateComponentInRuntime(ctx, runtime, ref, updateFn) + if execution, ok := e.executionForKey(ref.ExecutionKey); ok { + serializedRef, err := e.updateComponentInExecution(ctx, execution, ref, updateFn) if err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } return chasm.EngineUpdateWithStartExecutionResult{ - ExecutionKey: runtime.key, + ExecutionKey: execution.key, ExecutionRef: serializedRef, Created: false, }, nil } - runtime := e.newRuntime(ref.ExecutionKey) - mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + execution := e.newExecution(ref.ExecutionKey) + mutableCtx := chasm.NewMutableContext(ctx, execution.node) root, err := startFn(mutableCtx) if err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } - if err := runtime.node.SetRootComponent(root); err != nil { + if err := execution.node.SetRootComponent(root); err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } if err := updateFn(mutableCtx, root); err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } - _, err = runtime.node.CloseTransaction() + _, err = execution.node.CloseTransaction() if err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } - runtime.root = root - e.executions[newExecutionLookupKey(runtime.key)] = runtime + execution.root = root + e.executions[newExecutionLookupKey(execution.key)] = execution - serializedRef, err := runtime.node.Ref(root) + serializedRef, err := execution.node.Ref(root) if err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err } return chasm.EngineUpdateWithStartExecutionResult{ - ExecutionKey: runtime.key, + ExecutionKey: execution.key, ExecutionRef: serializedRef, Created: true, }, nil @@ -215,11 +215,11 @@ func (e *Engine[T]) UpdateComponent( updateFn func(chasm.MutableContext, chasm.Component) error, _ ...chasm.TransitionOption, ) ([]byte, error) { - runtime, err := e.mustExecutionForRef(ref) + execution, err := e.mustExecutionForRef(ref) if err != nil { return nil, err } - return e.updateComponentInRuntime(ctx, runtime, ref, updateFn) + return e.updateComponentInExecution(ctx, execution, ref, updateFn) } func (e *Engine[T]) ReadComponent( @@ -228,17 +228,17 @@ func (e *Engine[T]) ReadComponent( readFn func(chasm.Context, chasm.Component) error, _ ...chasm.TransitionOption, ) error { - runtime, err := e.mustExecutionForRef(ref) + execution, err := e.mustExecutionForRef(ref) if err != nil { return err } - component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref) + component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) if err != nil { return err } - readCtx := chasm.NewContext(ctx, runtime.node) + readCtx := chasm.NewContext(ctx, execution.node) return readFn(readCtx, component) } @@ -261,7 +261,7 @@ func (e *Engine[T]) DeleteExecution( func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {} -func (e *Engine[T]) newRuntime(key chasm.ExecutionKey) *runtime { +func (e *Engine[T]) newExecution(key chasm.ExecutionKey) *execution { key = normalizeExecutionKey(key) timeSource := clock.NewEventTimeSource() timeSource.Update(time.Now()) @@ -279,7 +279,7 @@ func (e *Engine[T]) newRuntime(key chasm.ExecutionKey) *runtime { } }, } - return &runtime{ + return &execution{ key: key, backend: backend, timeSource: timeSource, @@ -294,38 +294,38 @@ func (e *Engine[T]) newRuntime(key chasm.ExecutionKey) *runtime { } } -func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*runtime, bool) { - runtime, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] - return runtime, ok +func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*execution, bool) { + execution, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] + return execution, ok } -func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*runtime, error) { - runtime, ok := e.executionForKey(ref.ExecutionKey) +func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { + execution, ok := e.executionForKey(ref.ExecutionKey) if !ok { return nil, serviceerror.NewNotFound( fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), ) } - return runtime, nil + return execution, nil } -func (e *Engine[T]) updateComponentInRuntime( +func (e *Engine[T]) updateComponentInExecution( ctx context.Context, - runtime *runtime, + execution *execution, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, ) ([]byte, error) { - component, err := runtime.node.Component(chasm.NewContext(ctx, runtime.node), ref) + component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) if err != nil { return nil, err } - mutableCtx := chasm.NewMutableContext(ctx, runtime.node) + mutableCtx := chasm.NewMutableContext(ctx, execution.node) if err := updateFn(mutableCtx, component); err != nil { return nil, err } - _, err = runtime.node.CloseTransaction() + _, err = execution.node.CloseTransaction() if err != nil { return nil, err } diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index f63767aae6e..bb2ac4d05f1 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -55,6 +55,17 @@ func (m *mockNexusCompletionGetterComponent) LifecycleState(_ chasm.Context) cha return chasm.LifecycleStateRunning } +func (m *mockNexusCompletionGetterComponent) ContextMetadata(chasm.Context) map[string]string { + return nil +} + +func (m *mockNexusCompletionGetterComponent) Terminate( + chasm.MutableContext, + chasm.TerminateComponentRequest, +) (chasm.TerminateComponentResponse, error) { + return chasm.TerminateComponentResponse{}, nil +} + type mockNexusCompletionGetterLibrary struct { chasm.UnimplementedLibrary } From 267e89644056f6fa659eb12d2d7b75736797652b Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 10 Apr 2026 11:13:08 -0400 Subject: [PATCH 3/8] Remove unnecessary helper to create Root --- chasm/chasmtest/test_engine.go | 96 ++++++++++------------------- chasm/lib/callback/tasks_test.go | 35 ++++++----- chasm/lib/scheduler/handler_test.go | 40 ++++++------ 3 files changed, 72 insertions(+), 99 deletions(-) diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 730541b6445..a10a0e29381 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -18,18 +18,14 @@ import ( ) type ( - Option[T chasm.RootComponent] func(*Engine[T]) - - Engine[T chasm.RootComponent] struct { - t *testing.T - registry *chasm.Registry - logger log.Logger - metrics metrics.Handler - - rootExecutionKey chasm.ExecutionKey - root T - rootExecution *execution - executions map[executionLookupKey]*execution + EngineOption func(*Engine) + + Engine struct { + t *testing.T + registry *chasm.Registry + logger log.Logger + metrics metrics.Handler + executions map[executionLookupKey]*execution } execution struct { @@ -46,23 +42,20 @@ type ( } ) -func NewEngine[T chasm.RootComponent]( +var _ chasm.Engine = (*Engine)(nil) + +func NewEngine( t *testing.T, registry *chasm.Registry, - opts ...Option[T], -) *Engine[T] { + opts ...EngineOption, +) *Engine { t.Helper() - e := &Engine[T]{ - t: t, - registry: registry, - logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), - metrics: metrics.NoopMetricsHandler, - rootExecutionKey: chasm.ExecutionKey{ - NamespaceID: "test-namespace-id", - BusinessID: "test-workflow-id", - RunID: "test-run-id", - }, + e := &Engine{ + t: t, + registry: registry, + logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), + metrics: metrics.NoopMetricsHandler, executions: make(map[executionLookupKey]*execution), } @@ -73,38 +66,11 @@ func NewEngine[T chasm.RootComponent]( return e } -func WithRoot[T chasm.RootComponent]( - factory func(chasm.MutableContext) T, -) Option[T] { - return func(e *Engine[T]) { - execution := e.newExecution(e.rootExecutionKey) - ctx := chasm.NewMutableContext(context.Background(), execution.node) - root := factory(ctx) - require.NoError(e.t, execution.node.SetRootComponent(root)) - _, err := execution.node.CloseTransaction() - require.NoError(e.t, err) - e.root = root - execution.root = root - e.rootExecution = execution - e.executions[newExecutionLookupKey(execution.key)] = execution - } -} - -func WithExecutionKey[T chasm.RootComponent](key chasm.ExecutionKey) Option[T] { - return func(e *Engine[T]) { - e.rootExecutionKey = key - } -} - -func (e *Engine[T]) Root() T { - return e.root -} - -func (e *Engine[T]) EngineContext() context.Context { +func (e *Engine) EngineContext() context.Context { return chasm.NewEngineContext(context.Background(), e) } -func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef { +func (e *Engine) Ref(component chasm.Component) chasm.ComponentRef { for _, execution := range e.executions { ref, err := execution.node.Ref(component) if err != nil { @@ -119,7 +85,7 @@ func (e *Engine[T]) Ref(component chasm.Component) chasm.ComponentRef { return chasm.ComponentRef{} } -func (e *Engine[T]) StartExecution( +func (e *Engine) StartExecution( ctx context.Context, ref chasm.ComponentRef, startFn func(chasm.MutableContext) (chasm.RootComponent, error), @@ -158,7 +124,7 @@ func (e *Engine[T]) StartExecution( }, nil } -func (e *Engine[T]) UpdateWithStartExecution( +func (e *Engine) UpdateWithStartExecution( ctx context.Context, ref chasm.ComponentRef, startFn func(chasm.MutableContext) (chasm.RootComponent, error), @@ -209,7 +175,7 @@ func (e *Engine[T]) UpdateWithStartExecution( }, nil } -func (e *Engine[T]) UpdateComponent( +func (e *Engine) UpdateComponent( ctx context.Context, ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, @@ -222,7 +188,7 @@ func (e *Engine[T]) UpdateComponent( return e.updateComponentInExecution(ctx, execution, ref, updateFn) } -func (e *Engine[T]) ReadComponent( +func (e *Engine) ReadComponent( ctx context.Context, ref chasm.ComponentRef, readFn func(chasm.Context, chasm.Component) error, @@ -242,7 +208,7 @@ func (e *Engine[T]) ReadComponent( return readFn(readCtx, component) } -func (e *Engine[T]) PollComponent( +func (e *Engine) PollComponent( context.Context, chasm.ComponentRef, func(chasm.Context, chasm.Component) (bool, error), @@ -251,7 +217,7 @@ func (e *Engine[T]) PollComponent( return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent") } -func (e *Engine[T]) DeleteExecution( +func (e *Engine) DeleteExecution( context.Context, chasm.ComponentRef, chasm.DeleteExecutionRequest, @@ -259,9 +225,9 @@ func (e *Engine[T]) DeleteExecution( return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution") } -func (e *Engine[T]) NotifyExecution(chasm.ExecutionKey) {} +func (e *Engine) NotifyExecution(chasm.ExecutionKey) {} -func (e *Engine[T]) newExecution(key chasm.ExecutionKey) *execution { +func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { key = normalizeExecutionKey(key) timeSource := clock.NewEventTimeSource() timeSource.Update(time.Now()) @@ -294,12 +260,12 @@ func (e *Engine[T]) newExecution(key chasm.ExecutionKey) *execution { } } -func (e *Engine[T]) executionForKey(key chasm.ExecutionKey) (*execution, bool) { +func (e *Engine) executionForKey(key chasm.ExecutionKey) (*execution, bool) { execution, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] return execution, ok } -func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { +func (e *Engine) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { execution, ok := e.executionForKey(ref.ExecutionKey) if !ok { return nil, serviceerror.NewNotFound( @@ -309,7 +275,7 @@ func (e *Engine[T]) mustExecutionForRef(ref chasm.ComponentRef) (*execution, err return execution, nil } -func (e *Engine[T]) updateComponentInExecution( +func (e *Engine) updateComponentInExecution( ctx context.Context, execution *execution, ref chasm.ComponentRef, diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index bb2ac4d05f1..ad914c0b299 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -82,22 +82,23 @@ func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComp func readCallbackFromEngine( t *testing.T, - testEngine *chasmtest.Engine[*mockNexusCompletionGetterComponent], + testEngine *chasmtest.Engine, + callback *Callback, ) *Callback { t.Helper() - var callback *Callback - _, err := chasm.ReadComponent[*mockNexusCompletionGetterComponent, chasm.ComponentRef, any, chasm.NoValue]( + var readCallback *Callback + _, err := chasm.ReadComponent[*Callback, chasm.ComponentRef, any, chasm.NoValue]( testEngine.EngineContext(), - testEngine.Ref(testEngine.Root()), - func(root *mockNexusCompletionGetterComponent, ctx chasm.Context, _ any) (chasm.NoValue, error) { - callback = root.Callback.Get(ctx) + testEngine.Ref(callback), + func(cb *Callback, _ chasm.Context, _ any) (chasm.NoValue, error) { + readCallback = cb return nil, nil }, nil, ) require.NoError(t, err) - return callback + return readCallback } // Test the full executeInvocationTask flow with direct handler calls @@ -238,21 +239,23 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { // Create completion completion := nexusrpc.CompleteOperationOptions{} - testEngine := chasmtest.NewEngine( - t, - chasmRegistry, - chasmtest.WithExecutionKey[*mockNexusCompletionGetterComponent](chasm.ExecutionKey{ + testEngine := chasmtest.NewEngine(t, chasmRegistry) + _, err = chasm.StartExecution[*mockNexusCompletionGetterComponent, struct{}]( + testEngine.EngineContext(), + chasm.ExecutionKey{ NamespaceID: "namespace-id", BusinessID: "workflow-id", RunID: "run-id", - }), - chasmtest.WithRoot(func(ctx chasm.MutableContext) *mockNexusCompletionGetterComponent { + }, + func(ctx chasm.MutableContext, _ struct{}) (*mockNexusCompletionGetterComponent, error) { return &mockNexusCompletionGetterComponent{ completion: completion, Callback: chasm.NewComponentField(ctx, callback), - } - }), + }, nil + }, + struct{}{}, ) + require.NoError(t, err) err = handler.Execute( testEngine.EngineContext(), @@ -262,7 +265,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { ) // Verify the outcome and tasks - tc.assertOutcome(t, readCallbackFromEngine(t, testEngine), err) + tc.assertOutcome(t, readCallbackFromEngine(t, testEngine, callback), err) }) } } diff --git a/chasm/lib/scheduler/handler_test.go b/chasm/lib/scheduler/handler_test.go index e04f3ae7e06..31b494d5371 100644 --- a/chasm/lib/scheduler/handler_test.go +++ b/chasm/lib/scheduler/handler_test.go @@ -117,20 +117,22 @@ func TestHandler_CreateFromMigrationState_Sentinel(t *testing.T) { require.NoError(t, registry.Register(newTestLibrary(logger, newRealSpecProcessor(ctrl, logger)))) h := scheduler.NewTestHandler(logger) - testEngine := chasmtest.NewEngine( - t, - registry, - chasmtest.WithExecutionKey[*scheduler.Scheduler](chasm.ExecutionKey{ + testEngine := chasmtest.NewEngine(t, registry) + _, err := chasm.StartExecution[*scheduler.Scheduler, struct{}]( + testEngine.EngineContext(), + chasm.ExecutionKey{ NamespaceID: namespaceID, BusinessID: scheduleID, - }), - chasmtest.WithRoot(func(ctx chasm.MutableContext) *scheduler.Scheduler { - return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID) - }), + }, + func(ctx chasm.MutableContext, _ struct{}) (*scheduler.Scheduler, error) { + return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID), nil + }, + struct{}{}, ) + require.NoError(t, err) engineCtx := testEngine.EngineContext() - _, err := h.TestCreateFromMigrationState(engineCtx, &schedulerpb.CreateFromMigrationStateRequest{ + _, err = h.TestCreateFromMigrationState(engineCtx, &schedulerpb.CreateFromMigrationStateRequest{ NamespaceId: namespaceID, State: &schedulerpb.SchedulerMigrationState{ SchedulerState: &schedulerpb.SchedulerState{ @@ -153,20 +155,22 @@ func TestHandler_MigrateToWorkflow_Sentinel(t *testing.T) { require.NoError(t, registry.Register(newTestLibrary(logger, newRealSpecProcessor(ctrl, logger)))) h := scheduler.NewTestHandler(logger) - testEngine := chasmtest.NewEngine( - t, - registry, - chasmtest.WithExecutionKey[*scheduler.Scheduler](chasm.ExecutionKey{ + testEngine := chasmtest.NewEngine(t, registry) + _, err := chasm.StartExecution[*scheduler.Scheduler, struct{}]( + testEngine.EngineContext(), + chasm.ExecutionKey{ NamespaceID: namespaceID, BusinessID: scheduleID, - }), - chasmtest.WithRoot(func(ctx chasm.MutableContext) *scheduler.Scheduler { - return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID) - }), + }, + func(ctx chasm.MutableContext, _ struct{}) (*scheduler.Scheduler, error) { + return scheduler.NewSentinel(ctx, namespace, namespaceID, scheduleID), nil + }, + struct{}{}, ) + require.NoError(t, err) engineCtx := testEngine.EngineContext() - _, err := h.TestMigrateToWorkflow(engineCtx, &schedulerpb.MigrateToWorkflowRequest{ + _, err = h.TestMigrateToWorkflow(engineCtx, &schedulerpb.MigrateToWorkflowRequest{ NamespaceId: namespaceID, ScheduleId: scheduleID, }) From 54ecfd9b4ab870412d145cc0321142496f7c7a5c Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 10 Apr 2026 17:42:41 -0400 Subject: [PATCH 4/8] Address comments --- chasm/chasmtest/task_helpers.go | 86 +++++++++++++++++++++++++++++++++ chasm/chasmtest/test_engine.go | 46 ++++++++++-------- 2 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 chasm/chasmtest/task_helpers.go diff --git a/chasm/chasmtest/task_helpers.go b/chasm/chasmtest/task_helpers.go new file mode 100644 index 00000000000..0e1ad5167b2 --- /dev/null +++ b/chasm/chasmtest/task_helpers.go @@ -0,0 +1,86 @@ +package chasmtest + +import ( + "fmt" + + "go.temporal.io/server/chasm" +) + +// ExecutePureTask validates and executes a pure task atomically via [Engine.UpdateComponent]. +// It returns taskDropped=true if [chasm.PureTaskHandler.Validate] returns (false, nil), +// indicating the task is no longer relevant and was not executed. +// +// This helper ensures that Validate is always exercised alongside Execute, matching the real +// engine's behavior. Use [MockMutableContext] directly when you need to inspect the typed +// task payloads added to the context during execution. +func ExecutePureTask[C chasm.Component, T any]( + e *Engine, + component C, + handler chasm.PureTaskHandler[C, T], + attrs chasm.TaskAttributes, + task T, +) (taskDropped bool, err error) { + ref := e.Ref(component) + _, err = e.UpdateComponent( + e.EngineContext(), + ref, + func(mutableCtx chasm.MutableContext, c chasm.Component) error { + typedC, ok := c.(C) + if !ok { + return fmt.Errorf("component type mismatch: got %T", c) + } + var valid bool + valid, err = handler.Validate(mutableCtx, typedC, attrs, task) + if err != nil { + return err + } + if !valid { + taskDropped = true + return nil + } + return handler.Execute(mutableCtx, typedC, attrs, task) + }, + ) + return taskDropped, err +} + +// ExecuteSideEffectTask validates and executes a side-effect task. +// Validation runs via [Engine.ReadComponent] (read-only), and if valid, [chasm.SideEffectTaskHandler.Execute] +// is called with the engine context so that [chasm.UpdateComponent] and [chasm.ReadComponent] +// inside the handler route through the test engine. +// +// It returns taskDropped=true if [chasm.SideEffectTaskHandler.Validate] returns (false, nil), +// indicating the task is no longer relevant and was not executed. +// +// Use [MockMutableContext] directly when you need to inspect typed task payloads added +// during execution, since the real engine serializes them into history-layer tasks. +func ExecuteSideEffectTask[C chasm.Component, T any]( + e *Engine, + component C, + handler chasm.SideEffectTaskHandler[C, T], + attrs chasm.TaskAttributes, + task T, +) (taskDropped bool, err error) { + ref := e.Ref(component) + + var valid bool + if err = e.ReadComponent( + e.EngineContext(), + ref, + func(ctx chasm.Context, c chasm.Component) error { + typedC, ok := c.(C) + if !ok { + return fmt.Errorf("component type mismatch: got %T", c) + } + valid, err = handler.Validate(ctx, typedC, attrs, task) + return err + }, + ); err != nil { + return false, err + } + if !valid { + return true, nil + } + + return false, handler.Execute(e.EngineContext(), ref, attrs, task) +} diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index a10a0e29381..d98a45c3bf3 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/stretchr/testify/require" "go.temporal.io/api/serviceerror" @@ -17,6 +16,15 @@ import ( "go.temporal.io/server/common/testing/testlogger" ) +// WithTimeSource overrides the engine's default real-time clock with the provided time source. +// Pass a *clock.EventTimeSource when tests need to control what ctx.Now() returns inside handlers. +// The caller holds the reference and calls ts.Update(...) directly to advance time. +func WithTimeSource(ts clock.TimeSource) EngineOption { + return func(e *Engine) { + e.timeSource = ts + } +} + type ( EngineOption func(*Engine) @@ -25,18 +33,18 @@ type ( registry *chasm.Registry logger log.Logger metrics metrics.Handler - executions map[executionLookupKey]*execution + timeSource clock.TimeSource + executions map[executionKey]*execution } execution struct { - key chasm.ExecutionKey - node *chasm.Node - backend *chasm.MockNodeBackend - timeSource *clock.EventTimeSource - root chasm.RootComponent + key chasm.ExecutionKey + node *chasm.Node + backend *chasm.MockNodeBackend + root chasm.RootComponent } - executionLookupKey struct { + executionKey struct { namespaceID string businessID string } @@ -56,7 +64,8 @@ func NewEngine( registry: registry, logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), metrics: metrics.NoopMetricsHandler, - executions: make(map[executionLookupKey]*execution), + timeSource: clock.NewRealTimeSource(), + executions: make(map[executionKey]*execution), } for _, opt := range opts { @@ -110,7 +119,7 @@ func (e *Engine) StartExecution( } execution.root = root - e.executions[newExecutionLookupKey(execution.key)] = execution + e.executions[newExecutionKey(execution.key)] = execution serializedRef, err := execution.node.Ref(root) if err != nil { @@ -161,7 +170,7 @@ func (e *Engine) UpdateWithStartExecution( } execution.root = root - e.executions[newExecutionLookupKey(execution.key)] = execution + e.executions[newExecutionKey(execution.key)] = execution serializedRef, err := execution.node.Ref(root) if err != nil { @@ -229,8 +238,6 @@ func (e *Engine) NotifyExecution(chasm.ExecutionKey) {} func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { key = normalizeExecutionKey(key) - timeSource := clock.NewEventTimeSource() - timeSource.Update(time.Now()) backend := &chasm.MockNodeBackend{ HandleNextTransitionCount: func() int64 { return 2 }, HandleGetCurrentVersion: func() int64 { return 1 }, @@ -246,12 +253,11 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { }, } return &execution{ - key: key, - backend: backend, - timeSource: timeSource, + key: key, + backend: backend, node: chasm.NewEmptyTree( e.registry, - timeSource, + e.timeSource, backend, chasm.DefaultPathEncoder, e.logger, @@ -261,7 +267,7 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { } func (e *Engine) executionForKey(key chasm.ExecutionKey) (*execution, bool) { - execution, ok := e.executions[newExecutionLookupKey(normalizeExecutionKey(key))] + execution, ok := e.executions[newExecutionKey(normalizeExecutionKey(key))] return execution, ok } @@ -299,8 +305,8 @@ func (e *Engine) updateComponentInExecution( return mutableCtx.Ref(component) } -func newExecutionLookupKey(key chasm.ExecutionKey) executionLookupKey { - return executionLookupKey{ +func newExecutionKey(key chasm.ExecutionKey) executionKey { + return executionKey{ namespaceID: key.NamespaceID, businessID: key.BusinessID, } From ab0e979bb7d562aad741abeae8e7aaf2163eebbc Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Fri, 10 Apr 2026 18:54:03 -0400 Subject: [PATCH 5/8] Address comments --- chasm/chasmtest/task_helpers.go | 40 +++++++++++++--------- chasm/chasmtest/test_engine.go | 45 ++++++++++--------------- chasm/lib/callback/tasks_test.go | 52 ++++++++++++----------------- chasm/lib/scheduler/handler_test.go | 9 ++--- 4 files changed, 70 insertions(+), 76 deletions(-) diff --git a/chasm/chasmtest/task_helpers.go b/chasm/chasmtest/task_helpers.go index 0e1ad5167b2..d32d63f3efc 100644 --- a/chasm/chasmtest/task_helpers.go +++ b/chasm/chasmtest/task_helpers.go @@ -1,6 +1,7 @@ package chasmtest import ( + "context" "fmt" "go.temporal.io/server/chasm" @@ -10,19 +11,23 @@ import ( // It returns taskDropped=true if [chasm.PureTaskHandler.Validate] returns (false, nil), // indicating the task is no longer relevant and was not executed. // +// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, use +// [Engine.Ref] to look up the ref from the component instance. +// // This helper ensures that Validate is always exercised alongside Execute, matching the real -// engine's behavior. Use [MockMutableContext] directly when you need to inspect the typed -// task payloads added to the context during execution. +// engine's behavior. Use [chasm.MockMutableContext] directly when you need to inspect the +// typed task payloads added to the context during execution. func ExecutePureTask[C chasm.Component, T any]( + ctx context.Context, e *Engine, - component C, + ref chasm.ComponentRef, handler chasm.PureTaskHandler[C, T], attrs chasm.TaskAttributes, task T, ) (taskDropped bool, err error) { - ref := e.Ref(component) + engineCtx := chasm.NewEngineContext(ctx, e) _, err = e.UpdateComponent( - e.EngineContext(), + engineCtx, ref, func(mutableCtx chasm.MutableContext, c chasm.Component) error { typedC, ok := c.(C) @@ -45,34 +50,39 @@ func ExecutePureTask[C chasm.Component, T any]( } // ExecuteSideEffectTask validates and executes a side-effect task. -// Validation runs via [Engine.ReadComponent] (read-only), and if valid, [chasm.SideEffectTaskHandler.Execute] -// is called with the engine context so that [chasm.UpdateComponent] and [chasm.ReadComponent] -// inside the handler route through the test engine. +// Validation runs via [Engine.ReadComponent] (read-only), and if valid, +// [chasm.SideEffectTaskHandler.Execute] is called with an engine context so that +// [chasm.UpdateComponent] and [chasm.ReadComponent] inside the handler route through +// the test engine. // // It returns taskDropped=true if [chasm.SideEffectTaskHandler.Validate] returns (false, nil), // indicating the task is no longer relevant and was not executed. // -// Use [MockMutableContext] directly when you need to inspect typed task payloads added +// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, use +// [Engine.Ref] to look up the ref from the component instance. +// +// Use [chasm.MockMutableContext] directly when you need to inspect typed task payloads added // during execution, since the real engine serializes them into history-layer tasks. func ExecuteSideEffectTask[C chasm.Component, T any]( + ctx context.Context, e *Engine, - component C, + ref chasm.ComponentRef, handler chasm.SideEffectTaskHandler[C, T], attrs chasm.TaskAttributes, task T, ) (taskDropped bool, err error) { - ref := e.Ref(component) + engineCtx := chasm.NewEngineContext(ctx, e) var valid bool if err = e.ReadComponent( - e.EngineContext(), + engineCtx, ref, - func(ctx chasm.Context, c chasm.Component) error { + func(chasmCtx chasm.Context, c chasm.Component) error { typedC, ok := c.(C) if !ok { return fmt.Errorf("component type mismatch: got %T", c) } - valid, err = handler.Validate(ctx, typedC, attrs, task) + valid, err = handler.Validate(chasmCtx, typedC, attrs, task) return err }, ); err != nil { @@ -82,5 +92,5 @@ func ExecuteSideEffectTask[C chasm.Component, T any]( return true, nil } - return false, handler.Execute(e.EngineContext(), ref, attrs, task) + return false, handler.Execute(engineCtx, ref, attrs, task) } diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index d98a45c3bf3..27eb91cad30 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -75,10 +75,10 @@ func NewEngine( return e } -func (e *Engine) EngineContext() context.Context { - return chasm.NewEngineContext(context.Background(), e) -} - +// Ref returns a ComponentRef for a subcomponent attached to this engine. For root components, +// prefer constructing a ref directly with [chasm.NewComponentRef] using the execution key. +// Subcomponent refs cannot be constructed externally (the component path is unexported), so +// this method is needed when testing task handlers that operate on subcomponents. func (e *Engine) Ref(component chasm.Component) chasm.ComponentRef { for _, execution := range e.executions { ref, err := execution.node.Ref(component) @@ -100,7 +100,7 @@ func (e *Engine) StartExecution( startFn func(chasm.MutableContext) (chasm.RootComponent, error), _ ...chasm.TransitionOption, ) (chasm.StartExecutionResult, error) { - if _, ok := e.executionForKey(ref.ExecutionKey); ok { + if _, ok := e.executions[newExecutionKey(ref.ExecutionKey)]; ok { return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID) } @@ -140,7 +140,7 @@ func (e *Engine) UpdateWithStartExecution( updateFn func(chasm.MutableContext, chasm.Component) error, _ ...chasm.TransitionOption, ) (chasm.EngineUpdateWithStartExecutionResult, error) { - if execution, ok := e.executionForKey(ref.ExecutionKey); ok { + if execution, ok := e.executions[newExecutionKey(ref.ExecutionKey)]; ok { serializedRef, err := e.updateComponentInExecution(ctx, execution, ref, updateFn) if err != nil { return chasm.EngineUpdateWithStartExecutionResult{}, err @@ -227,17 +227,23 @@ func (e *Engine) PollComponent( } func (e *Engine) DeleteExecution( - context.Context, - chasm.ComponentRef, - chasm.DeleteExecutionRequest, + _ context.Context, + ref chasm.ComponentRef, + _ chasm.DeleteExecutionRequest, ) error { - return serviceerror.NewUnimplemented("chasmtest.Engine.DeleteExecution") + key := newExecutionKey(ref.ExecutionKey) + if _, ok := e.executions[key]; !ok { + return serviceerror.NewNotFound( + fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), + ) + } + delete(e.executions, key) + return nil } func (e *Engine) NotifyExecution(chasm.ExecutionKey) {} func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { - key = normalizeExecutionKey(key) backend := &chasm.MockNodeBackend{ HandleNextTransitionCount: func() int64 { return 2 }, HandleGetCurrentVersion: func() int64 { return 1 }, @@ -266,13 +272,8 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { } } -func (e *Engine) executionForKey(key chasm.ExecutionKey) (*execution, bool) { - execution, ok := e.executions[newExecutionKey(normalizeExecutionKey(key))] - return execution, ok -} - func (e *Engine) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { - execution, ok := e.executionForKey(ref.ExecutionKey) + execution, ok := e.executions[newExecutionKey(ref.ExecutionKey)] if !ok { return nil, serviceerror.NewNotFound( fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), @@ -311,13 +312,3 @@ func newExecutionKey(key chasm.ExecutionKey) executionKey { businessID: key.BusinessID, } } - -func normalizeExecutionKey(key chasm.ExecutionKey) chasm.ExecutionKey { - if key.NamespaceID == "" { - key.NamespaceID = "test-namespace-id" - } - if key.BusinessID == "" { - key.BusinessID = "test-workflow-id" - } - return key -} diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index bda70033017..01a35fa1f71 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -80,26 +80,6 @@ func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComp } } -func readCallbackFromEngine( - t *testing.T, - testEngine *chasmtest.Engine, - callback *Callback, -) *Callback { - t.Helper() - - var readCallback *Callback - _, err := chasm.ReadComponent[*Callback, chasm.ComponentRef, any, chasm.NoValue]( - testEngine.EngineContext(), - testEngine.Ref(callback), - func(cb *Callback, _ chasm.Context, _ any) (chasm.NoValue, error) { - readCallback = cb - return nil, nil - }, - nil, - ) - require.NoError(t, err) - return readCallback -} // Test the full executeInvocationTask flow with direct handler calls func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { @@ -239,14 +219,16 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { // Create completion completion := nexusrpc.CompleteOperationOptions{} + executionKey := chasm.ExecutionKey{ + NamespaceID: "namespace-id", + BusinessID: "workflow-id", + RunID: "run-id", + } testEngine := chasmtest.NewEngine(t, chasmRegistry) + engineCtx := chasm.NewEngineContext(context.Background(), testEngine) _, err = chasm.StartExecution[*mockNexusCompletionGetterComponent, struct{}]( - testEngine.EngineContext(), - chasm.ExecutionKey{ - NamespaceID: "namespace-id", - BusinessID: "workflow-id", - RunID: "run-id", - }, + engineCtx, + executionKey, func(ctx chasm.MutableContext, _ struct{}) (*mockNexusCompletionGetterComponent, error) { return &mockNexusCompletionGetterComponent{ completion: completion, @@ -257,15 +239,25 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { ) require.NoError(t, err) + callbackRef := testEngine.Ref(callback) err = handler.Execute( - testEngine.EngineContext(), - testEngine.Ref(callback), + engineCtx, + callbackRef, chasm.TaskAttributes{Destination: "http://localhost"}, &callbackspb.InvocationTask{Attempt: 0}, ) - // Verify the outcome and tasks - tc.assertOutcome(t, readCallbackFromEngine(t, testEngine, callback), err) + // Verify outcome by reading component state directly + var resultCallback *Callback + require.NoError(t, testEngine.ReadComponent( + engineCtx, + callbackRef, + func(_ chasm.Context, c chasm.Component) error { + resultCallback = c.(*Callback) + return nil + }, + )) + tc.assertOutcome(t, resultCallback, err) }) } } diff --git a/chasm/lib/scheduler/handler_test.go b/chasm/lib/scheduler/handler_test.go index 31b494d5371..209209ddeca 100644 --- a/chasm/lib/scheduler/handler_test.go +++ b/chasm/lib/scheduler/handler_test.go @@ -1,6 +1,7 @@ package scheduler_test import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -118,8 +119,9 @@ func TestHandler_CreateFromMigrationState_Sentinel(t *testing.T) { h := scheduler.NewTestHandler(logger) testEngine := chasmtest.NewEngine(t, registry) + engineCtx := chasm.NewEngineContext(context.Background(), testEngine) _, err := chasm.StartExecution[*scheduler.Scheduler, struct{}]( - testEngine.EngineContext(), + engineCtx, chasm.ExecutionKey{ NamespaceID: namespaceID, BusinessID: scheduleID, @@ -131,7 +133,6 @@ func TestHandler_CreateFromMigrationState_Sentinel(t *testing.T) { ) require.NoError(t, err) - engineCtx := testEngine.EngineContext() _, err = h.TestCreateFromMigrationState(engineCtx, &schedulerpb.CreateFromMigrationStateRequest{ NamespaceId: namespaceID, State: &schedulerpb.SchedulerMigrationState{ @@ -156,8 +157,9 @@ func TestHandler_MigrateToWorkflow_Sentinel(t *testing.T) { h := scheduler.NewTestHandler(logger) testEngine := chasmtest.NewEngine(t, registry) + engineCtx := chasm.NewEngineContext(context.Background(), testEngine) _, err := chasm.StartExecution[*scheduler.Scheduler, struct{}]( - testEngine.EngineContext(), + engineCtx, chasm.ExecutionKey{ NamespaceID: namespaceID, BusinessID: scheduleID, @@ -169,7 +171,6 @@ func TestHandler_MigrateToWorkflow_Sentinel(t *testing.T) { ) require.NoError(t, err) - engineCtx := testEngine.EngineContext() _, err = h.TestMigrateToWorkflow(engineCtx, &schedulerpb.MigrateToWorkflowRequest{ NamespaceId: namespaceID, ScheduleId: scheduleID, From 2e57fc94e4e0d0c6504563e6978c465137220cda Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Mon, 13 Apr 2026 10:07:15 -0400 Subject: [PATCH 6/8] fix formatting in tasks_test.go --- chasm/chasmtest/test_engine.go | 6 +++--- chasm/lib/callback/tasks_test.go | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 27eb91cad30..8a9689996f7 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -190,7 +190,7 @@ func (e *Engine) UpdateComponent( updateFn func(chasm.MutableContext, chasm.Component) error, _ ...chasm.TransitionOption, ) ([]byte, error) { - execution, err := e.mustExecutionForRef(ref) + execution, err := e.executionForRef(ref) if err != nil { return nil, err } @@ -203,7 +203,7 @@ func (e *Engine) ReadComponent( readFn func(chasm.Context, chasm.Component) error, _ ...chasm.TransitionOption, ) error { - execution, err := e.mustExecutionForRef(ref) + execution, err := e.executionForRef(ref) if err != nil { return err } @@ -272,7 +272,7 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { } } -func (e *Engine) mustExecutionForRef(ref chasm.ComponentRef) (*execution, error) { +func (e *Engine) executionForRef(ref chasm.ComponentRef) (*execution, error) { execution, ok := e.executions[newExecutionKey(ref.ExecutionKey)] if !ok { return nil, serviceerror.NewNotFound( diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index 01a35fa1f71..2e87a2997bb 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -80,7 +80,6 @@ func (l *mockNexusCompletionGetterLibrary) Components() []*chasm.RegistrableComp } } - // Test the full executeInvocationTask flow with direct handler calls func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { cases := []struct { From 66b376f6389669677727f38d09ed1aa8d19be4a3 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 15 Apr 2026 12:24:00 -0400 Subject: [PATCH 7/8] Remove Ref() helper from chasmtest.Engine Use ctx.Ref(component) + DeserializeComponentRef inside a ReadComponent callback instead, which is the idiomatic approach available on the public chasm.Context interface. Co-Authored-By: Claude Sonnet 4.6 --- chasm/chasmtest/test_engine.go | 20 -------------------- chasm/lib/callback/tasks_test.go | 12 +++++++++++- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 8a9689996f7..55a19b84d0d 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -5,7 +5,6 @@ import ( "fmt" "testing" - "github.com/stretchr/testify/require" "go.temporal.io/api/serviceerror" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" @@ -75,25 +74,6 @@ func NewEngine( return e } -// Ref returns a ComponentRef for a subcomponent attached to this engine. For root components, -// prefer constructing a ref directly with [chasm.NewComponentRef] using the execution key. -// Subcomponent refs cannot be constructed externally (the component path is unexported), so -// this method is needed when testing task handlers that operate on subcomponents. -func (e *Engine) Ref(component chasm.Component) chasm.ComponentRef { - for _, execution := range e.executions { - ref, err := execution.node.Ref(component) - if err != nil { - continue - } - structuredRef, err := chasm.DeserializeComponentRef(ref) - require.NoError(e.t, err) - return structuredRef - } - - e.t.Fatalf("component %T is not attached to the chasmtest engine", component) - return chasm.ComponentRef{} -} - func (e *Engine) StartExecution( ctx context.Context, ref chasm.ComponentRef, diff --git a/chasm/lib/callback/tasks_test.go b/chasm/lib/callback/tasks_test.go index 2e87a2997bb..b455d529253 100644 --- a/chasm/lib/callback/tasks_test.go +++ b/chasm/lib/callback/tasks_test.go @@ -238,7 +238,17 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) { ) require.NoError(t, err) - callbackRef := testEngine.Ref(callback) + rootRef := chasm.NewComponentRef[*mockNexusCompletionGetterComponent](executionKey) + var callbackRef chasm.ComponentRef + require.NoError(t, testEngine.ReadComponent(engineCtx, rootRef, func(chasmCtx chasm.Context, _ chasm.Component) error { + serialized, err := chasmCtx.Ref(callback) + if err != nil { + return err + } + callbackRef, err = chasm.DeserializeComponentRef(serialized) + return err + })) + err = handler.Execute( engineCtx, callbackRef, From d42ca8124ae59cee5bfbe16cfbcf5d4ef02600f6 Mon Sep 17 00:00:00 2001 From: Alan Wu Date: Wed, 15 Apr 2026 13:48:52 -0400 Subject: [PATCH 8/8] Add conflict policy handling and fuller impl of MockNodeBackend --- chasm/chasmtest/task_helpers.go | 12 +- chasm/chasmtest/test_engine.go | 629 ++++++++++++++++++++++++++------ 2 files changed, 531 insertions(+), 110 deletions(-) diff --git a/chasm/chasmtest/task_helpers.go b/chasm/chasmtest/task_helpers.go index d32d63f3efc..c92836887a8 100644 --- a/chasm/chasmtest/task_helpers.go +++ b/chasm/chasmtest/task_helpers.go @@ -11,8 +11,9 @@ import ( // It returns taskDropped=true if [chasm.PureTaskHandler.Validate] returns (false, nil), // indicating the task is no longer relevant and was not executed. // -// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, use -// [Engine.Ref] to look up the ref from the component instance. +// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, obtain +// the ref via [chasm.Context.Ref] inside a [Engine.ReadComponent] or [Engine.UpdateComponent] +// callback and deserialize it with [chasm.DeserializeComponentRef]. // // This helper ensures that Validate is always exercised alongside Execute, matching the real // engine's behavior. Use [chasm.MockMutableContext] directly when you need to inspect the @@ -58,11 +59,12 @@ func ExecutePureTask[C chasm.Component, T any]( // It returns taskDropped=true if [chasm.SideEffectTaskHandler.Validate] returns (false, nil), // indicating the task is no longer relevant and was not executed. // -// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, use -// [Engine.Ref] to look up the ref from the component instance. +// For root components, construct ref with [chasm.NewComponentRef]. For subcomponents, obtain +// the ref via [chasm.Context.Ref] inside a [Engine.ReadComponent] or [Engine.UpdateComponent] +// callback and deserialize it with [chasm.DeserializeComponentRef]. // // Use [chasm.MockMutableContext] directly when you need to inspect typed task payloads added -// during execution, since the real engine serializes them into history-layer tasks. +// during execution, since the real engine serializes them into history layer tasks. func ExecuteSideEffectTask[C chasm.Component, T any]( ctx context.Context, e *Engine, diff --git a/chasm/chasmtest/test_engine.go b/chasm/chasmtest/test_engine.go index 55a19b84d0d..addb7b67067 100644 --- a/chasm/chasmtest/test_engine.go +++ b/chasm/chasmtest/test_engine.go @@ -3,9 +3,12 @@ package chasmtest import ( "context" "fmt" + "sync" "testing" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/common/clock" @@ -13,6 +16,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/testing/testlogger" + "go.temporal.io/server/service/history/tasks" ) // WithTimeSource overrides the engine's default real-time clock with the provided time source. @@ -27,28 +31,59 @@ func WithTimeSource(ts clock.TimeSource) EngineOption { type ( EngineOption func(*Engine) + // Engine is a lightweight in-memory CHASM engine for unit tests. It implements + // [chasm.Engine] and supports the full set of conflict and reuse policies, as + // well as blocking [PollComponent] with [NotifyExecution] — matching the behavior + // of the production engine as closely as possible without persistence or shard logic. Engine struct { t *testing.T registry *chasm.Registry logger log.Logger metrics metrics.Handler timeSource clock.TimeSource - executions map[executionKey]*execution + // current maps (namespaceID, businessID) -> the latest run (running or closed). + current map[businessKey]*execution + // all maps (namespaceID, businessID, runID) -> any run, for lookups by specific RunID. + all map[runKey]*execution + notifier *executionNotifier } + executionStatus int + execution struct { - key chasm.ExecutionKey - node *chasm.Node - backend *chasm.MockNodeBackend - root chasm.RootComponent + key chasm.ExecutionKey + node *chasm.Node + backend *chasm.MockNodeBackend + root chasm.RootComponent + status executionStatus + requestID string + // failed is only meaningful when status == executionStatusClosed. + // It controls whether AllowDuplicateFailedOnly reuse policy permits a new run. + failed bool } - executionKey struct { + businessKey struct { namespaceID string businessID string } + + runKey struct { + namespaceID string + businessID string + runID string + } +) + +const ( + executionStatusRunning executionStatus = iota + executionStatusClosed ) +var defaultTransitionOptions = chasm.TransitionOptions{ + ReusePolicy: chasm.BusinessIDReusePolicyAllowDuplicate, + ConflictPolicy: chasm.BusinessIDConflictPolicyFail, +} + var _ chasm.Engine = (*Engine)(nil) func NewEngine( @@ -64,7 +99,9 @@ func NewEngine( logger: testlogger.NewTestLogger(t, testlogger.FailOnExpectedErrorOnly), metrics: metrics.NoopMetricsHandler, timeSource: clock.NewRealTimeSource(), - executions: make(map[executionKey]*execution), + current: make(map[businessKey]*execution), + all: make(map[runKey]*execution), + notifier: newExecutionNotifier(), } for _, opt := range opts { @@ -74,43 +111,78 @@ func NewEngine( return e } -func (e *Engine) StartExecution( - ctx context.Context, - ref chasm.ComponentRef, - startFn func(chasm.MutableContext) (chasm.RootComponent, error), - _ ...chasm.TransitionOption, -) (chasm.StartExecutionResult, error) { - if _, ok := e.executions[newExecutionKey(ref.ExecutionKey)]; ok { - return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr("already exists", "", ref.RunID) - } - - execution := e.newExecution(ref.ExecutionKey) - mutableCtx := chasm.NewMutableContext(ctx, execution.node) - root, err := startFn(mutableCtx) +// CloseExecution marks the execution identified by ref as closed, removing it from the +// set of running executions. Set failed=true to simulate a failed or terminated execution, +// which affects the [chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly] reuse policy. +// A closed execution can still be read via [Engine.ReadComponent] using its specific RunID, +// but it will no longer be returned as the current run for the businessID. +func (e *Engine) CloseExecution(_ context.Context, ref chasm.ComponentRef, failed bool) error { + exec, err := e.executionForRef(ref) if err != nil { - return chasm.StartExecutionResult{}, err + return err } - if err := execution.node.SetRootComponent(root); err != nil { - return chasm.StartExecutionResult{}, err + exec.status = executionStatusClosed + exec.failed = failed + status := enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED + if failed { + status = enumspb.WORKFLOW_EXECUTION_STATUS_FAILED } - _, err = execution.node.CloseTransaction() + // Keep the backend execution state consistent so that CloseTransaction correctly + // skips the lifecycle-change logic on any subsequent UpdateComponent calls. + _, _ = exec.backend.UpdateWorkflowStateStatus(enumsspb.WORKFLOW_EXECUTION_STATE_COMPLETED, status) + return nil +} + +// Tasks returns all tasks scheduled for the execution identified by ref, grouped by category. +// Tasks accumulate across every [Engine.UpdateComponent], [Engine.StartExecution], and +// [Engine.UpdateWithStartExecution] call on the execution, matching what the real engine +// would deliver to task processors. +func (e *Engine) Tasks(ref chasm.ComponentRef) (map[tasks.Category][]tasks.Task, error) { + exec, err := e.executionForRef(ref) if err != nil { - return chasm.StartExecutionResult{}, err + return nil, err + } + // Return a shallow copy so callers cannot mutate the internal task lists. + result := make(map[tasks.Category][]tasks.Task, len(exec.backend.TasksByCategory)) + for cat, ts := range exec.backend.TasksByCategory { + result[cat] = ts } + return result, nil +} - execution.root = root - e.executions[newExecutionKey(execution.key)] = execution +func (e *Engine) StartExecution( + ctx context.Context, + ref chasm.ComponentRef, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + opts ...chasm.TransitionOption, +) (chasm.StartExecutionResult, error) { + options := constructTransitionOptions(opts...) + bKey := newBusinessKey(ref.ExecutionKey) + + current, hasCurrent := e.current[bKey] + if hasCurrent { + // Idempotency: if the requestID matches the original create request, return the existing run. + if options.RequestID != "" && options.RequestID == current.requestID { + serializedRef, err := current.node.Ref(current.root) + if err != nil { + return chasm.StartExecutionResult{}, err + } + return chasm.StartExecutionResult{ + ExecutionKey: current.key, + ExecutionRef: serializedRef, + Created: false, + }, nil + } - serializedRef, err := execution.node.Ref(root) - if err != nil { - return chasm.StartExecutionResult{}, err + switch current.status { + case executionStatusRunning: + return e.handleConflictPolicy(ctx, ref, current, startFn, options) + case executionStatusClosed: + return e.handleReusePolicy(ctx, ref, current, startFn, options) + } } - return chasm.StartExecutionResult{ - ExecutionKey: execution.key, - ExecutionRef: serializedRef, - Created: true, - }, nil + return e.startNew(ctx, ref.ExecutionKey, startFn, options.RequestID) } func (e *Engine) UpdateWithStartExecution( @@ -118,50 +190,58 @@ func (e *Engine) UpdateWithStartExecution( ref chasm.ComponentRef, startFn func(chasm.MutableContext) (chasm.RootComponent, error), updateFn func(chasm.MutableContext, chasm.Component) error, - _ ...chasm.TransitionOption, + opts ...chasm.TransitionOption, ) (chasm.EngineUpdateWithStartExecutionResult, error) { - if execution, ok := e.executions[newExecutionKey(ref.ExecutionKey)]; ok { - serializedRef, err := e.updateComponentInExecution(ctx, execution, ref, updateFn) - if err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err + options := constructTransitionOptions(opts...) + bKey := newBusinessKey(ref.ExecutionKey) + + current, hasCurrent := e.current[bKey] + if hasCurrent { + if current.status == executionStatusRunning { + // Execution is running — just apply the update, no start. + serializedRef, err := e.updateComponentInExecution(ctx, current, ref, updateFn) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + return chasm.EngineUpdateWithStartExecutionResult{ + ExecutionKey: current.key, + ExecutionRef: serializedRef, + Created: false, + }, nil } - return chasm.EngineUpdateWithStartExecutionResult{ - ExecutionKey: execution.key, - ExecutionRef: serializedRef, - Created: false, - }, nil - } - - execution := e.newExecution(ref.ExecutionKey) - mutableCtx := chasm.NewMutableContext(ctx, execution.node) - root, err := startFn(mutableCtx) - if err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err - } - if err := execution.node.SetRootComponent(root); err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err - } - if err := updateFn(mutableCtx, root); err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err - } - _, err = execution.node.CloseTransaction() - if err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err - } - - execution.root = root - e.executions[newExecutionKey(execution.key)] = execution - serializedRef, err := execution.node.Ref(root) - if err != nil { - return chasm.EngineUpdateWithStartExecutionResult{}, err + // Execution is closed — check reuse policy before starting a new one. + switch options.ReusePolicy { + case chasm.BusinessIDReusePolicyAllowDuplicate: + // No restriction; fall through to start+update. + case chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly: + if !current.failed { + return chasm.EngineUpdateWithStartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr( + fmt.Sprintf( + "CHASM execution already completed successfully. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", + ref.BusinessID, current.key.RunID, options.ReusePolicy, + ), + current.requestID, + current.key.RunID, + ) + } + case chasm.BusinessIDReusePolicyRejectDuplicate: + return chasm.EngineUpdateWithStartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr( + fmt.Sprintf( + "CHASM execution already finished. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", + ref.BusinessID, current.key.RunID, options.ReusePolicy, + ), + current.requestID, + current.key.RunID, + ) + default: + return chasm.EngineUpdateWithStartExecutionResult{}, serviceerror.NewInternal( + fmt.Sprintf("unknown business ID reuse policy: %v", options.ReusePolicy), + ) + } } - return chasm.EngineUpdateWithStartExecutionResult{ - ExecutionKey: execution.key, - ExecutionRef: serializedRef, - Created: true, - }, nil + return e.startAndUpdateNew(ctx, ref.ExecutionKey, startFn, updateFn, options.RequestID) } func (e *Engine) UpdateComponent( @@ -188,22 +268,79 @@ func (e *Engine) ReadComponent( return err } - component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) + chasmCtx := chasm.NewContext(ctx, execution.node) + component, err := execution.node.Component(chasmCtx, ref) if err != nil { return err } - readCtx := chasm.NewContext(ctx, execution.node) - return readFn(readCtx, component) + return readFn(chasmCtx, component) } +// PollComponent waits until the supplied predicate is satisfied when evaluated against the +// component identified by ref. If the predicate is true immediately it returns without blocking. +// Otherwise it subscribes to [NotifyExecution] signals and re-evaluates after each one, just +// like the production engine. Returns (nil, nil) if ctx is cancelled (long-poll timeout +// semantics — the caller should re-poll). func (e *Engine) PollComponent( - context.Context, - chasm.ComponentRef, - func(chasm.Context, chasm.Component) (bool, error), - ...chasm.TransitionOption, + ctx context.Context, + ref chasm.ComponentRef, + predicate func(chasm.Context, chasm.Component) (bool, error), + _ ...chasm.TransitionOption, ) ([]byte, error) { - return nil, serviceerror.NewUnimplemented("chasmtest.Engine.PollComponent") + executionKey := ref.ExecutionKey + + checkPredicate := func() ([]byte, bool, error) { + exec, err := e.executionForRef(ref) + if err != nil { + return nil, false, err + } + chasmCtx := chasm.NewContext(ctx, exec.node) + component, err := exec.node.Component(chasmCtx, ref) + if err != nil { + return nil, false, err + } + satisfied, err := predicate(chasmCtx, component) + if err != nil || !satisfied { + return nil, satisfied, err + } + serializedRef, err := exec.node.Ref(component) + return serializedRef, true, err + } + + // Evaluate once before subscribing. + serializedRef, satisfied, err := checkPredicate() + if err != nil || satisfied { + return serializedRef, err + } + + for { + ch, unsubscribe := e.notifier.subscribe(executionKey) + // Re-evaluate while holding the subscription to avoid missing a notification + // that arrives between the failed check above and this subscribe call. + serializedRef, satisfied, err = checkPredicate() + if err != nil || satisfied { + unsubscribe() + return serializedRef, err + } + + select { + case <-ch: + unsubscribe() + serializedRef, satisfied, err = checkPredicate() + if err != nil || satisfied { + return serializedRef, err + } + case <-ctx.Done(): + unsubscribe() + return nil, nil //nolint:nilerr // nil, nil = long-poll timeout; caller should re-poll + } + } +} + +// NotifyExecution wakes up any [PollComponent] callers waiting on the execution. +func (e *Engine) NotifyExecution(key chasm.ExecutionKey) { + e.notifier.notify(key) } func (e *Engine) DeleteExecution( @@ -211,32 +348,236 @@ func (e *Engine) DeleteExecution( ref chasm.ComponentRef, _ chasm.DeleteExecutionRequest, ) error { - key := newExecutionKey(ref.ExecutionKey) - if _, ok := e.executions[key]; !ok { - return serviceerror.NewNotFound( - fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), - ) + exec, err := e.executionForRef(ref) + if err != nil { + return err + } + rKey := newRunKey(exec.key) + bKey := newBusinessKey(exec.key) + delete(e.all, rKey) + // Only evict from current if this is still the current run for the businessID. + if cur, ok := e.current[bKey]; ok && cur == exec { + delete(e.current, bKey) } - delete(e.executions, key) return nil } -func (e *Engine) NotifyExecution(chasm.ExecutionKey) {} +// handleConflictPolicy is called when a StartExecution arrives for a businessID whose +// current run is still running. +func (e *Engine) handleConflictPolicy( + ctx context.Context, + ref chasm.ComponentRef, + current *execution, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + options chasm.TransitionOptions, +) (chasm.StartExecutionResult, error) { + switch options.ConflictPolicy { + case chasm.BusinessIDConflictPolicyFail: + return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr( + fmt.Sprintf( + "CHASM execution still running. BusinessID: %s, RunID: %s, ID Conflict Policy: %v", + ref.BusinessID, current.key.RunID, options.ConflictPolicy, + ), + current.requestID, + current.key.RunID, + ) + case chasm.BusinessIDConflictPolicyTerminateExisting: + current.status = executionStatusClosed + current.failed = true + return e.startNew(ctx, ref.ExecutionKey, startFn, options.RequestID) + case chasm.BusinessIDConflictPolicyUseExisting: + serializedRef, err := current.node.Ref(current.root) + if err != nil { + return chasm.StartExecutionResult{}, err + } + return chasm.StartExecutionResult{ + ExecutionKey: current.key, + ExecutionRef: serializedRef, + Created: false, + }, nil + default: + return chasm.StartExecutionResult{}, serviceerror.NewInternal( + fmt.Sprintf("unknown business ID conflict policy: %v", options.ConflictPolicy), + ) + } +} + +// handleReusePolicy is called when a StartExecution arrives for a businessID whose +// current run is closed/completed. +func (e *Engine) handleReusePolicy( + ctx context.Context, + ref chasm.ComponentRef, + current *execution, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + options chasm.TransitionOptions, +) (chasm.StartExecutionResult, error) { + switch options.ReusePolicy { + case chasm.BusinessIDReusePolicyAllowDuplicate: + // No restriction. + case chasm.BusinessIDReusePolicyAllowDuplicateFailedOnly: + if !current.failed { + return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr( + fmt.Sprintf( + "CHASM execution already completed successfully. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", + ref.BusinessID, current.key.RunID, options.ReusePolicy, + ), + current.requestID, + current.key.RunID, + ) + } + case chasm.BusinessIDReusePolicyRejectDuplicate: + return chasm.StartExecutionResult{}, chasm.NewExecutionAlreadyStartedErr( + fmt.Sprintf( + "CHASM execution already finished. BusinessID: %s, RunID: %s, ID Reuse Policy: %v", + ref.BusinessID, current.key.RunID, options.ReusePolicy, + ), + current.requestID, + current.key.RunID, + ) + default: + return chasm.StartExecutionResult{}, serviceerror.NewInternal( + fmt.Sprintf("unknown business ID reuse policy: %v", options.ReusePolicy), + ) + } + return e.startNew(ctx, ref.ExecutionKey, startFn, options.RequestID) +} + +// startNew creates a new execution and registers it as the current run for the businessID. +func (e *Engine) startNew( + ctx context.Context, + key chasm.ExecutionKey, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + requestID string, +) (chasm.StartExecutionResult, error) { + exec := e.newExecution(key) + exec.requestID = requestID + + mutableCtx := chasm.NewMutableContext(ctx, exec.node) + root, err := startFn(mutableCtx) + if err != nil { + return chasm.StartExecutionResult{}, err + } + if err := exec.node.SetRootComponent(root); err != nil { + return chasm.StartExecutionResult{}, err + } + if _, err = exec.node.CloseTransaction(); err != nil { + return chasm.StartExecutionResult{}, err + } + + exec.root = root + e.current[newBusinessKey(exec.key)] = exec + e.all[newRunKey(exec.key)] = exec + + serializedRef, err := exec.node.Ref(root) + if err != nil { + return chasm.StartExecutionResult{}, err + } + + return chasm.StartExecutionResult{ + ExecutionKey: exec.key, + ExecutionRef: serializedRef, + Created: true, + }, nil +} + +// startAndUpdateNew creates a new execution, applies startFn and updateFn in the same +// transaction, and registers it as the current run for the businessID. +func (e *Engine) startAndUpdateNew( + ctx context.Context, + key chasm.ExecutionKey, + startFn func(chasm.MutableContext) (chasm.RootComponent, error), + updateFn func(chasm.MutableContext, chasm.Component) error, + requestID string, +) (chasm.EngineUpdateWithStartExecutionResult, error) { + exec := e.newExecution(key) + exec.requestID = requestID + + mutableCtx := chasm.NewMutableContext(ctx, exec.node) + root, err := startFn(mutableCtx) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + if err := exec.node.SetRootComponent(root); err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + if err := updateFn(mutableCtx, root); err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + if _, err = exec.node.CloseTransaction(); err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + + exec.root = root + e.current[newBusinessKey(exec.key)] = exec + e.all[newRunKey(exec.key)] = exec + + serializedRef, err := exec.node.Ref(root) + if err != nil { + return chasm.EngineUpdateWithStartExecutionResult{}, err + } + + return chasm.EngineUpdateWithStartExecutionResult{ + ExecutionKey: exec.key, + ExecutionRef: serializedRef, + Created: true, + }, nil +} func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { + // bsMu guards transitionCount and execState, which are shared across handler closures. + // This is a separate mutex from MockNodeBackend's internal mu to avoid deadlocks. + var ( + bsMu sync.Mutex + transitionCount int64 = 1 + execState = persistencespb.WorkflowExecutionState{ + State: enumsspb.WORKFLOW_EXECUTION_STATE_CREATED, + Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, + } + ) + backend := &chasm.MockNodeBackend{ - HandleNextTransitionCount: func() int64 { return 2 }, - HandleGetCurrentVersion: func() int64 { return 1 }, - HandleGetWorkflowKey: func() definition.WorkflowKey { - return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID) + // NextTransitionCount increments on every CloseTransaction call, matching + // the real engine's per-transition monotonic counter. + HandleNextTransitionCount: func() int64 { + bsMu.Lock() + defer bsMu.Unlock() + transitionCount++ + return transitionCount }, - HandleIsWorkflow: func() bool { return false }, + // CurrentVersionedTransition reflects the latest committed transition count. HandleCurrentVersionedTransition: func() *persistencespb.VersionedTransition { + bsMu.Lock() + defer bsMu.Unlock() return &persistencespb.VersionedTransition{ NamespaceFailoverVersion: 1, - TransitionCount: 1, + TransitionCount: transitionCount, } }, + HandleGetCurrentVersion: func() int64 { return 1 }, + HandleGetWorkflowKey: func() definition.WorkflowKey { + return definition.NewWorkflowKey(key.NamespaceID, key.BusinessID, key.RunID) + }, + HandleIsWorkflow: func() bool { return false }, + // GetExecutionState returns the current lifecycle state, which CloseTransaction + // uses to decide whether to call UpdateWorkflowStateStatus. + HandleGetExecutionState: func() *persistencespb.WorkflowExecutionState { + bsMu.Lock() + defer bsMu.Unlock() + return &persistencespb.WorkflowExecutionState{ + State: execState.State, + Status: execState.Status, + } + }, + // UpdateWorkflowStateStatus is called by CloseTransaction when the root + // component's LifecycleState changes (Running → Completed/Failed/Terminated). + HandleUpdateWorkflowStateStatus: func(state enumsspb.WorkflowExecutionState, status enumspb.WorkflowExecutionStatus) (bool, error) { + bsMu.Lock() + defer bsMu.Unlock() + changed := execState.State != state || execState.Status != status + execState.State = state + execState.Status = status + return changed, nil + }, } return &execution{ key: key, @@ -252,14 +593,25 @@ func (e *Engine) newExecution(key chasm.ExecutionKey) *execution { } } +// executionForRef looks up an execution by the ref's RunID when present, or falls back +// to the current run for the businessID when RunID is empty. func (e *Engine) executionForRef(ref chasm.ComponentRef) (*execution, error) { - execution, ok := e.executions[newExecutionKey(ref.ExecutionKey)] + if ref.RunID != "" { + exec, ok := e.all[newRunKey(ref.ExecutionKey)] + if !ok { + return nil, serviceerror.NewNotFound( + fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), + ) + } + return exec, nil + } + exec, ok := e.current[newBusinessKey(ref.ExecutionKey)] if !ok { return nil, serviceerror.NewNotFound( - fmt.Sprintf("execution not found: namespace=%q business_id=%q run_id=%q", ref.NamespaceID, ref.BusinessID, ref.RunID), + fmt.Sprintf("execution not found: namespace=%q business_id=%q", ref.NamespaceID, ref.BusinessID), ) } - return execution, nil + return exec, nil } func (e *Engine) updateComponentInExecution( @@ -268,7 +620,8 @@ func (e *Engine) updateComponentInExecution( ref chasm.ComponentRef, updateFn func(chasm.MutableContext, chasm.Component) error, ) ([]byte, error) { - component, err := execution.node.Component(chasm.NewContext(ctx, execution.node), ref) + chasmCtx := chasm.NewContext(ctx, execution.node) + component, err := execution.node.Component(chasmCtx, ref) if err != nil { return nil, err } @@ -278,17 +631,83 @@ func (e *Engine) updateComponentInExecution( return nil, err } - _, err = execution.node.CloseTransaction() - if err != nil { + if _, err = execution.node.CloseTransaction(); err != nil { return nil, err } return mutableCtx.Ref(component) } -func newExecutionKey(key chasm.ExecutionKey) executionKey { - return executionKey{ - namespaceID: key.NamespaceID, - businessID: key.BusinessID, +func constructTransitionOptions(opts ...chasm.TransitionOption) chasm.TransitionOptions { + options := defaultTransitionOptions + for _, opt := range opts { + opt(&options) + } + // NOTE: TransitionOptions.Speculative is intentionally not implemented here — it is also + // unimplemented in the production engine (see the TODO in service/history/chasm_engine.go). + return options +} + +func newBusinessKey(key chasm.ExecutionKey) businessKey { + return businessKey{namespaceID: key.NamespaceID, businessID: key.BusinessID} +} + +func newRunKey(key chasm.ExecutionKey) runKey { + return runKey{namespaceID: key.NamespaceID, businessID: key.BusinessID, runID: key.RunID} +} + +// executionNotifier allows [PollComponent] callers to subscribe to state-change +// signals for a given execution. [notify] closes the channel for all current +// subscribers; each subscriber must resubscribe after being woken. +type executionNotifier struct { + mu sync.Mutex + subscribers map[chasm.ExecutionKey][]chan struct{} +} + +func newExecutionNotifier() *executionNotifier { + return &executionNotifier{ + subscribers: make(map[chasm.ExecutionKey][]chan struct{}), + } +} + +// subscribe returns a channel that will be closed on the next [notify] for key, +// and an unsubscribe function that must be called when the caller is done waiting. +func (n *executionNotifier) subscribe(key chasm.ExecutionKey) (<-chan struct{}, func()) { + ch := make(chan struct{}) + n.mu.Lock() + n.subscribers[key] = append(n.subscribers[key], ch) + n.mu.Unlock() + + unsubscribed := false + unsubscribe := func() { + n.mu.Lock() + defer n.mu.Unlock() + if unsubscribed { + return + } + unsubscribed = true + subs := n.subscribers[key] + for i, s := range subs { + if s == ch { + n.subscribers[key] = append(subs[:i], subs[i+1:]...) + if len(n.subscribers[key]) == 0 { + delete(n.subscribers, key) + } + break + } + } + } + return ch, unsubscribe +} + +// notify closes all subscriber channels for key, waking any blocked [PollComponent] callers. +func (n *executionNotifier) notify(key chasm.ExecutionKey) { + n.mu.Lock() + subs := n.subscribers[key] + delete(n.subscribers, key) + n.mu.Unlock() + + for _, ch := range subs { + close(ch) } }