From 43fb559fe9b3867ad80465d0cb8f3cfc0f2cbc3a Mon Sep 17 00:00:00 2001 From: sergeyb Date: Wed, 29 Apr 2026 19:56:42 +0000 Subject: [PATCH] feat(speculate): naive happy-path speculation in speculate controller MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the unconditional fan-out with a state-machine driver that advances a batch one step per invocation along a single happy-path speculation chain (batch.Dependencies + [batch.ID]): - Created/Scored → publish to build, Speculating - Speculating + deps OK → publish to merge, Merging - Speculating + dep failed → log the failed dependency, fail batch, publish to conclude - Speculating, waiting → no-op (re-evaluated when deps advance) - Merging → no-op (owned by the merge controller) - Terminal → re-publish to conclude (self-heal in case a prior publish was lost) - Unrecognized state → error (nack so the event is retried) When a dependency reaches Failed/Cancelled the batch cannot land on top of it; rather than livelock in Speculating forever the controller now logs a warning naming the failed dependency, transitions the batch to Failed, and publishes to conclude so the request store and log get reconciled. Assumes every in-flight build will pass and ignores score; failure replanning, score-driven prioritization, and the full Java-style SpeculationTree (2^N orderings + scoring) are explicit TODOs. On the build and merge transitions, publishes before CAS-updating state so a publish failure leaves state unchanged for retry; the dependency-failure path CAS-updates to Failed first and relies on the terminal-state re-publish to conclude if that publish is lost. Also renames BatchStateFinalizing → BatchStateMerging across the entity, batch controller, and tests. 
--- entity/batch.go | 4 +- entity/batch_test.go | 2 +- orchestrator/controller/batch/batch.go | 2 +- orchestrator/controller/merge/merge_test.go | 12 +- .../controller/speculate/speculate.go | 184 ++++++++++-- .../controller/speculate/speculate_test.go | 275 ++++++++++++++---- 6 files changed, 391 insertions(+), 88 deletions(-) diff --git a/entity/batch.go b/entity/batch.go index 1e8e2a55..7ab7bc84 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -26,8 +26,8 @@ const ( BatchStateCreated BatchState = "created" // BatchStateSpeculating is the state of a batch that is undergoing speculative execution. BatchStateSpeculating BatchState = "speculating" - // BatchStateFinalizing is the state of a batch that is being finalized after speculative execution. - BatchStateFinalizing BatchState = "finalizing" + // BatchStateMerging is the state of a batch that is being merged after speculative execution. + BatchStateMerging BatchState = "merging" // BatchStateSucceeded is the terminal state of a batch that has been successfully landed. BatchStateSucceeded BatchState = "succeeded" // BatchStateFailed is the terminal state of a batch that has failed. 
diff --git a/entity/batch_test.go b/entity/batch_test.go index 58d7dc4d..ea5fc95b 100644 --- a/entity/batch_test.go +++ b/entity/batch_test.go @@ -30,7 +30,7 @@ func TestBatchState_IsTerminal(t *testing.T) { {name: "unknown", state: BatchStateUnknown, terminal: false}, {name: "created", state: BatchStateCreated, terminal: false}, {name: "speculating", state: BatchStateSpeculating, terminal: false}, - {name: "finalizing", state: BatchStateFinalizing, terminal: false}, + {name: "merging", state: BatchStateMerging, terminal: false}, {name: "succeeded", state: BatchStateSucceeded, terminal: true}, {name: "failed", state: BatchStateFailed, terminal: true}, {name: "cancelled", state: BatchStateCancelled, terminal: true}, diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index ddfb5259..9330c7b4 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -119,7 +119,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, entity.BatchStateSpeculating, - entity.BatchStateFinalizing, + entity.BatchStateMerging, }) if err != nil { c.metricsScope.Counter("batch_store_errors").Inc(1) diff --git a/orchestrator/controller/merge/merge_test.go b/orchestrator/controller/merge/merge_test.go index 351f4b42..51e45b83 100644 --- a/orchestrator/controller/merge/merge_test.go +++ b/orchestrator/controller/merge/merge_test.go @@ -97,7 +97,7 @@ func TestController_Process_SuccessfulMerge(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: []string{reqID}, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 4, } change := entity.Change{URIs: []string{"github://o/r/1/sha"}} @@ -162,7 +162,7 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: 
requestIDs, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 1, } @@ -221,7 +221,7 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: []string{reqID}, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 3, } @@ -268,7 +268,7 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: []string{reqID}, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 1, } @@ -375,7 +375,7 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: []string{reqID}, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 1, } @@ -413,7 +413,7 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) { ID: batchID, Queue: "test-queue", Contains: []string{reqID}, - State: entity.BatchStateFinalizing, + State: entity.BatchStateMerging, Version: 2, } diff --git a/orchestrator/controller/speculate/speculate.go b/orchestrator/controller/speculate/speculate.go index 6d1a7df7..2e8145e8 100644 --- a/orchestrator/controller/speculate/speculate.go +++ b/orchestrator/controller/speculate/speculate.go @@ -27,8 +27,23 @@ import ( ) // Controller handles speculate queue messages. -// It consumes batches, performs speculation, and publishes to both build and merge stages. -// Implements consumer.Controller interface for integration with the consumer. +// +// Naive happy-path algorithm: assume every in-flight build will pass and +// treat batch.Dependencies + [batch.ID] as the single speculation chain. +// Per invocation, the controller advances the batch one step in the +// state machine: +// +// - Created or Scored → publish to build, transition to Speculating. 
+// - Speculating → if all deps are Succeeded, publish to merge and +// transition to Merging; otherwise no-op (or fail-fast if a dep is +// in a non-succeeding terminal state). +// - Merging → no-op (owned by the merge controller). +// - Terminal → re-fan-out to conclude for self-healing in case a +// prior publish was lost. +// +// The controller is re-triggered on every relevant downstream event +// (buildsignal, merge), so each call simply re-evaluates the current +// state and either advances or waits. type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope @@ -60,68 +75,181 @@ func NewController( } } -// Process processes a speculate delivery from the queue. -// Deserializes the batch, performs speculation, and publishes to both build and merge topics. +// Process advances a batch one step along the naive happy-path. // Returns nil to ack (success), or error to nack (retry). func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { c.metricsScope.Counter("received").Inc(1) msg := delivery.Message() - // Deserialize batch ID from payload bid, err := entity.BatchIDFromBytes(msg.Payload) if err != nil { c.metricsScope.Counter("deserialize_errors").Inc(1) return fmt.Errorf("failed to deserialize batch ID: %w", err) } - // Fetch batch from storage batch, err := c.store.GetBatchStore().Get(ctx, bid.ID) if err != nil { c.metricsScope.Counter("storage_errors").Inc(1) return fmt.Errorf("failed to get batch %s: %w", bid.ID, err) } - c.logger.Infow("received speculate event", + // Terminal state: re-fan-out to conclude for self-healing. The batch is + // already done; if a previous publish was lost, downstream stages will + // otherwise never reconcile. Re-publishing is safe because conclude is + // idempotent on the batch ID. 
+ if batch.State.IsTerminal() { + c.metricsScope.Counter("self_heal_terminal").Inc(1) + return c.fanout(ctx, batch.ID, batch.Queue) + } + + // Merging is owned by the merge controller, which has its own self-heal. + if batch.State == entity.BatchStateMerging { + c.metricsScope.Counter("noop_merging").Inc(1) + return nil + } + + switch batch.State { + case entity.BatchStateCreated, entity.BatchStateScored: + return c.startSpeculation(ctx, batch) + case entity.BatchStateSpeculating: + return c.tryFinalize(ctx, batch) + default: + c.metricsScope.Counter("unexpected_state").Inc(1) + return fmt.Errorf("unexpected batch state %q for batch %s", batch.State, batch.ID) + } +} + +// startSpeculation kicks off CI for this batch on top of the speculative head +// (batch.Dependencies assumed to all pass), then transitions to Speculating. +func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) error { + c.logger.Infow("starting speculation", "batch_id", batch.ID, - "queue", batch.Queue, - "state", string(batch.State), - "version", batch.Version, - "attempt", delivery.Attempt(), - "partition_key", msg.PartitionKey, + "speculation_chain", append(append([]string{}, batch.Dependencies...), batch.ID), ) - // TODO: Add speculation logic - // - Speculative merge/rebase - // - Conflict detection - // - Publish to build only if speculation is in progress (needs CI verification) - // - Publish to merge only if speculation is complete and successful (ready to land) - - // Publish to build topic if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to build: %w", err) } - c.logger.Infow("published batch to build", - "batch_id", batch.ID, - "topic_key", consumer.TopicKeyBuild, - ) + // Optimistic CAS: if the version has already advanced (concurrent speculate), + // the next event will see the new state and behave correctly. 
+ newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateSpeculating); err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to update batch %s state to speculating: %w", batch.ID, err) + } + + c.metricsScope.Counter("started_speculation").Inc(1) + return nil +} + +// tryFinalize publishes to merge and transitions to Merging iff every +// dependency batch has reached Succeeded. If any dep is Failed/Cancelled, +// the batch cannot land on top of it; we mark it Failed and hand off to +// conclude so the request state and log are reconciled. Otherwise (some +// deps still in flight) it no-ops and waits for the next event. +// +// TODO: when a dependency fails we currently fail this batch outright. +// We will need to respeculate the failed paths — drop the failed dep +// from the chain and re-issue speculation for the surviving ordering(s) +// — instead of cascading the failure into requests that could still land. 
+func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error { + deps, err := c.fetchDependencies(ctx, batch) + if err != nil { + return err + } + + pending := make([]string, 0, len(deps)) + for _, d := range deps { + switch d.State { + case entity.BatchStateSucceeded: + // ok + case entity.BatchStateFailed, entity.BatchStateCancelled: + return c.failOnDependency(ctx, batch, d) + default: + pending = append(pending, d.ID) + } + } + + if len(pending) > 0 { + c.metricsScope.Counter("waiting_on_deps").Inc(1) + c.logger.Debugw("dependencies still in flight; waiting", + "batch_id", batch.ID, + "pending_dependency_ids", pending, + ) + return nil + } - // Publish to merge topic if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) return fmt.Errorf("failed to publish to merge: %w", err) } - c.logger.Infow("published batch to merge", + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateMerging); err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to update batch %s state to merging: %w", batch.ID, err) + } + + c.metricsScope.Counter("processed").Inc(1) + return nil +} + +// failOnDependency transitions a Speculating batch to Failed when one of its +// dependencies has reached a non-succeeding terminal state, then publishes to +// the conclude queue so the request store and request log get reconciled. +// Without this transition the batch would sit in Speculating forever — no +// downstream event ever fires for it again. 
+func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, dep entity.Batch) error { + c.metricsScope.Counter("dependency_failed").Inc(1) + c.logger.Warnw("dependency in non-succeeding terminal state; failing batch", "batch_id", batch.ID, - "topic_key", consumer.TopicKeyMerge, + "dependency_id", dep.ID, + "dependency_state", string(dep.State), ) - c.metricsScope.Counter("processed").Inc(1) + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil { + c.metricsScope.Counter("storage_errors").Inc(1) + return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err) + } - return nil // Success - message will be acked + if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil { + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to conclude: %w", err) + } + + return nil +} + +// fetchDependencies loads each batch in batch.Dependencies. Any storage error +// is surfaced as a retryable infra failure; missing dependencies should not +// happen in practice, but if one does it is treated the same as a transient +// fetch failure (i.e. the message is retried). +func (c *Controller) fetchDependencies(ctx context.Context, batch entity.Batch) ([]entity.Batch, error) { + deps := make([]entity.Batch, 0, len(batch.Dependencies)) + for _, depID := range batch.Dependencies { + d, err := c.store.GetBatchStore().Get(ctx, depID) + if err != nil { + c.metricsScope.Counter("dependency_fetch_errors").Inc(1) + return nil, fmt.Errorf("failed to get dependency batch %s of %s: %w", depID, batch.ID, err) + } + deps = append(deps, d) + } + return deps, nil +} + +// fanout re-publishes downstream events for a batch that has already reached +// a terminal state. Used for self-healing when a previous publish was lost: +// re-sending to conclude guarantees request-state reconciliation. 
+func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error { + if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil { + c.metricsScope.Counter("publish_errors").Inc(1) + return fmt.Errorf("failed to publish to conclude: %w", err) + } + return nil } // publish publishes a batch ID to the specified topic key. diff --git a/orchestrator/controller/speculate/speculate_test.go b/orchestrator/controller/speculate/speculate_test.go index dd651f67..997149bb 100644 --- a/orchestrator/controller/speculate/speculate_test.go +++ b/orchestrator/controller/speculate/speculate_test.go @@ -39,27 +39,19 @@ func batchIDPayload(t *testing.T, id string) []byte { return payload } -// testBatch returns a standard test batch for speculate tests. -func testBatch() entity.Batch { +// testBatch returns a standard test batch with the given state and dependencies. +func testBatch(state entity.BatchState, deps ...string) entity.Batch { return entity.Batch{ - ID: "test-queue/batch/1", - Queue: "test-queue", - State: entity.BatchStateCreated, - Version: 1, + ID: "test-queue/batch/1", + Queue: "test-queue", + Dependencies: deps, + State: state, + Version: 1, } } -// newMockStorage creates a MockStorage with a MockBatchStore that returns the given batch on Get. -func newMockStorage(ctrl *gomock.Controller, batch entity.Batch) *storagemock.MockStorage { - mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() - - store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() - return store -} - -// newTestController creates a controller with test dependencies. +// newTestController wires a controller with a registry covering all topics the +// speculate controller may publish to. The publisher returns publishErr (or nil). 
func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock.MockStorage, publishErr error) *Controller { logger := zaptest.NewLogger(t).Sugar() scope := tally.NoopScope @@ -78,6 +70,8 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock []consumer.TopicConfig{ {Key: consumer.TopicKeyBuild, Name: "build", Queue: mockQ}, {Key: consumer.TopicKeyMerge, Name: "merge", Queue: mockQ}, + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + {Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}, }, ) require.NoError(t, err) @@ -85,75 +79,256 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, store *storagemock return NewController(logger, scope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") } +// runProcess builds a delivery for batchID and invokes Process once. +func runProcess(t *testing.T, ctrl *gomock.Controller, controller *Controller, batchID string) error { + msg := queue.NewMessage(batchID, batchIDPayload(t, batchID), "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return controller.Process(context.Background(), delivery) +} + func TestNewController(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) + store := storagemock.NewMockStorage(ctrl) controller := newTestController(t, ctrl, store, nil) require.NotNil(t, controller) assert.Equal(t, consumer.TopicKeySpeculate, controller.TopicKey()) assert.Equal(t, "orchestrator-speculate", controller.ConsumerGroup()) assert.Equal(t, "speculate", controller.Name()) + + var _ consumer.Controller = controller } -func TestController_Process_Success(t *testing.T) { +// startSpeculation: Created/Scored should publish to build and CAS to Speculating with newVersion = oldVersion+1. 
+func TestController_Process_StartSpeculation(t *testing.T) { + tests := []struct { + name string + state entity.BatchState + }{ + {name: "from_created", state: entity.BatchStateCreated}, + {name: "from_scored", state: entity.BatchStateScored}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(tt.state) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateSpeculating).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + }) + } +} + +// tryFinalize: Speculating with no deps should publish to merge and CAS to Merging. +func TestController_Process_FinalizeNoDeps(t *testing.T) { ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateSpeculating) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateMerging).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - batch := testBatch() - store := newMockStorage(ctrl, batch) controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() +// tryFinalize: Speculating with all deps Succeeded should publish to merge and CAS to Merging. 
+func TestController_Process_FinalizeAllDepsSucceeded(t *testing.T) { + ctrl := gomock.NewController(t) + depA := entity.Batch{ID: "test-queue/batch/0a", Queue: "test-queue", State: entity.BatchStateSucceeded, Version: 5} + depB := entity.Batch{ID: "test-queue/batch/0b", Queue: "test-queue", State: entity.BatchStateSucceeded, Version: 3} + batch := testBatch(entity.BatchStateSpeculating, depA.ID, depB.ID) - err := controller.Process(context.Background(), delivery) - require.NoError(t, err) + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().Get(gomock.Any(), depA.ID).Return(depA, nil) + batchStore.EXPECT().Get(gomock.Any(), depB.ID).Return(depB, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateMerging).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) } -func TestController_Process_StorageFailure(t *testing.T) { +// tryFinalize: Speculating with a dep still in flight is a no-op (no publish, no state change). +func TestController_Process_WaitingOnDep(t *testing.T) { ctrl := gomock.NewController(t) + dep := entity.Batch{ID: "test-queue/batch/0", Queue: "test-queue", State: entity.BatchStateSpeculating, Version: 1} + batch := testBatch(entity.BatchStateSpeculating, dep.ID) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().Get(gomock.Any(), dep.ID).Return(dep, nil) + // No UpdateState expected — gomock will fail if it is called. 
- mockBatchStore := storagemock.NewMockBatchStore(ctrl) - mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) store := storagemock.NewMockStorage(ctrl) - store.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} - msg := queue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() +// tryFinalize: a dep in a non-succeeding terminal state must fail the batch +// (Speculating → Failed) and publish to conclude. Otherwise the batch livelocks. +func TestController_Process_FailedDepFailsBatch(t *testing.T) { + tests := []struct { + name string + depState entity.BatchState + }{ + {name: "dep_failed", depState: entity.BatchStateFailed}, + {name: "dep_cancelled", depState: entity.BatchStateCancelled}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + dep := entity.Batch{ID: "test-queue/batch/0", Queue: "test-queue", State: tt.depState, Version: 1} + batch := testBatch(entity.BatchStateSpeculating, dep.ID) + batch.Contains = []string{"test-queue/req/1", "test-queue/req/2"} + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + batchStore.EXPECT().Get(gomock.Any(), dep.ID).Return(dep, nil) + batchStore.EXPECT().UpdateState(gomock.Any(), batch.ID, int32(1), int32(2), entity.BatchStateFailed).Return(nil) - err := controller.Process(context.Background(), delivery) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, 
store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + }) + } +} + +// Merging is owned by the merge controller — speculate is a no-op for it. +func TestController_Process_MergingNoOp(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateMerging) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// Terminal states re-fan-out to conclude for self-healing in case a previous +// publish was lost. State must not change (no UpdateState). +func TestController_Process_TerminalSelfHeals(t *testing.T) { + for _, state := range []entity.BatchState{ + entity.BatchStateSucceeded, + entity.BatchStateFailed, + entity.BatchStateCancelled, + } { + t.Run(string(state), func(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(state) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected. + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + // Require exactly one publish to the conclude topic for self-healing. 
+ mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), "conclude", gomock.Any()).Return(nil).Times(1) + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry( + []consumer.TopicConfig{ + {Key: consumer.TopicKeyConclude, Name: "conclude", Queue: mockQ}, + }, + ) + require.NoError(t, err) + + logger := zaptest.NewLogger(t).Sugar() + controller := NewController(logger, tally.NoopScope, store, registry, consumer.TopicKeySpeculate, "orchestrator-speculate") + + require.NoError(t, runProcess(t, ctrl, controller, batch.ID)) + }) + } +} + +// An unrecognized state must surface as an error so the message is nacked +// instead of silently acked — silently acking would drop the event. +func TestController_Process_UnrecognizedState(t *testing.T) { + ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateUnknown) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + require.Error(t, runProcess(t, ctrl, controller, batch.ID)) +} + +// Storage failure on the primary batch fetch surfaces as an error and is not +// retryable per the controller default (plain fmt.Errorf). 
+func TestController_Process_StorageFailure(t *testing.T) { + ctrl := gomock.NewController(t) + + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.Batch{}, fmt.Errorf("db connection lost")) + + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() + + controller := newTestController(t, ctrl, store, nil) + err := runProcess(t, ctrl, controller, "test-queue/batch/1") require.Error(t, err) assert.False(t, errs.IsRetryable(err)) } +// Publish failure must not advance the batch state. func TestController_Process_PublishFailure(t *testing.T) { ctrl := gomock.NewController(t) + batch := testBatch(entity.BatchStateScored) - batch := testBatch() - store := newMockStorage(ctrl, batch) - controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + batchStore := storagemock.NewMockBatchStore(ctrl) + batchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) + // No UpdateState expected — publish fails before we get there. - msg := queue.NewMessage(batch.ID, batchIDPayload(t, batch.ID), batch.Queue, nil) - delivery := queuemock.NewMockDelivery(ctrl) - delivery.EXPECT().Message().Return(msg).AnyTimes() - delivery.EXPECT().Attempt().Return(1).AnyTimes() + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().GetBatchStore().Return(batchStore).AnyTimes() - err := controller.Process(context.Background(), delivery) - assert.Error(t, err) + controller := newTestController(t, ctrl, store, fmt.Errorf("publish failed")) + require.Error(t, runProcess(t, ctrl, controller, batch.ID)) } -func TestController_InterfaceImplementation(t *testing.T) { +// Malformed payload: deserialize error. 
+func TestController_Process_BadPayload(t *testing.T) { ctrl := gomock.NewController(t) - batch := testBatch() - store := newMockStorage(ctrl, batch) + store := storagemock.NewMockStorage(ctrl) controller := newTestController(t, ctrl, store, nil) - var _ consumer.Controller = controller + msg := queue.NewMessage("anything", []byte("not-json"), "test-queue", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + + require.Error(t, controller.Process(context.Background(), delivery)) }