Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ const (
BatchStateCreated BatchState = "created"
// BatchStateSpeculating is the state of a batch that is undergoing speculative execution.
BatchStateSpeculating BatchState = "speculating"
// BatchStateFinalizing is the state of a batch that is being finalized after speculative execution.
BatchStateFinalizing BatchState = "finalizing"
// BatchStateMerging is the state of a batch that is being merged after speculative execution.
BatchStateMerging BatchState = "merging"
// BatchStateSucceeded is the terminal state of a batch that has been successfully landed.
BatchStateSucceeded BatchState = "succeeded"
// BatchStateFailed is the terminal state of a batch that has failed.
Expand Down
2 changes: 1 addition & 1 deletion entity/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBatchState_IsTerminal(t *testing.T) {
{name: "unknown", state: BatchStateUnknown, terminal: false},
{name: "created", state: BatchStateCreated, terminal: false},
{name: "speculating", state: BatchStateSpeculating, terminal: false},
{name: "finalizing", state: BatchStateFinalizing, terminal: false},
{name: "merging", state: BatchStateMerging, terminal: false},
{name: "succeeded", state: BatchStateSucceeded, terminal: true},
{name: "failed", state: BatchStateFailed, terminal: true},
{name: "cancelled", state: BatchStateCancelled, terminal: true},
Expand Down
2 changes: 1 addition & 1 deletion orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
entity.BatchStateCreated,
entity.BatchStateSpeculating,
entity.BatchStateFinalizing,
entity.BatchStateMerging,
})
if err != nil {
c.metricsScope.Counter("batch_store_errors").Inc(1)
Expand Down
12 changes: 6 additions & 6 deletions orchestrator/controller/merge/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestController_Process_SuccessfulMerge(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: []string{reqID},
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 4,
}
change := entity.Change{URIs: []string{"github://o/r/1/sha"}}
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestController_Process_PassesAllChangesInBatchOrder(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: requestIDs,
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 1,
}

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestController_Process_PushConflictMarksBatchFailed(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: []string{reqID},
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 3,
}

Expand Down Expand Up @@ -268,7 +268,7 @@ func TestController_Process_PushInfraFailureReturnsError(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: []string{reqID},
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 1,
}

Expand Down Expand Up @@ -375,7 +375,7 @@ func TestController_Process_RequestStoreFailurePropagates(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: []string{reqID},
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 1,
}

Expand Down Expand Up @@ -413,7 +413,7 @@ func TestController_Process_PublishFailureSurfaces(t *testing.T) {
ID: batchID,
Queue: "test-queue",
Contains: []string{reqID},
State: entity.BatchStateFinalizing,
State: entity.BatchStateMerging,
Version: 2,
}

Expand Down
184 changes: 156 additions & 28 deletions orchestrator/controller/speculate/speculate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,23 @@ import (
)

// Controller handles speculate queue messages.
// It consumes batches, performs speculation, and publishes to both build and merge stages.
// Implements consumer.Controller interface for integration with the consumer.
//
// Naive happy-path algorithm: assume every in-flight build will pass and
// treat batch.Dependencies + [batch.ID] as the single speculation chain.
// Per invocation, the controller advances the batch one step in the
// state machine:
//
// - Created or Scored → publish to build, transition to Speculating.
// - Speculating → if all deps are Succeeded, publish to merge and
// transition to Merging; otherwise no-op (or fail-fast if a dep is
// in a non-succeeding terminal state).
// - Merging → no-op (owned by the merge controller).
// - Terminal → re-fan-out to conclude for self-healing in case a
// prior publish was lost.
//
// The controller is re-triggered on every relevant downstream event
// (buildsignal, merge), so each call simply re-evaluates the current
// state and either advances or waits.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
Expand Down Expand Up @@ -60,68 +75,181 @@ func NewController(
}
}

// Process processes a speculate delivery from the queue.
// Deserializes the batch, performs speculation, and publishes to both build and merge topics.
// Process advances a batch one step along the naive happy-path.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)

msg := delivery.Message()

// Deserialize batch ID from payload
bid, err := entity.BatchIDFromBytes(msg.Payload)
if err != nil {
c.metricsScope.Counter("deserialize_errors").Inc(1)
return fmt.Errorf("failed to deserialize batch ID: %w", err)
}

// Fetch batch from storage
batch, err := c.store.GetBatchStore().Get(ctx, bid.ID)
if err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to get batch %s: %w", bid.ID, err)
}

c.logger.Infow("received speculate event",
// Terminal state: re-fan-out to conclude for self-healing. The batch is
// already done; if a previous publish was lost, downstream stages will
// otherwise never reconcile. Re-publishing is safe because conclude is
// idempotent on the batch ID.
if batch.State.IsTerminal() {
c.metricsScope.Counter("self_heal_terminal").Inc(1)
return c.fanout(ctx, batch.ID, batch.Queue)
}

// Merging is owned by the merge controller, which has its own self-heal.
if batch.State == entity.BatchStateMerging {
c.metricsScope.Counter("noop_merging").Inc(1)
return nil
}

switch batch.State {
case entity.BatchStateCreated, entity.BatchStateScored:
return c.startSpeculation(ctx, batch)
case entity.BatchStateSpeculating:
return c.tryFinalize(ctx, batch)
default:
c.metricsScope.Counter("unexpected_state").Inc(1)
return fmt.Errorf("unexpected batch state %q for batch %s", batch.State, batch.ID)
}
}

// startSpeculation kicks off CI for this batch on top of the speculative head
// (batch.Dependencies assumed to all pass), then transitions to Speculating.
func (c *Controller) startSpeculation(ctx context.Context, batch entity.Batch) error {
c.logger.Infow("starting speculation",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we probably want to use the context aware logger for tracing/ maybe we deal it with separately to track request journey in the logs end to end

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not today, I just want to have something working first

"batch_id", batch.ID,
"queue", batch.Queue,
"state", string(batch.State),
"version", batch.Version,
"attempt", delivery.Attempt(),
"partition_key", msg.PartitionKey,
"speculation_chain", append(append([]string{}, batch.Dependencies...), batch.ID),
)

// TODO: Add speculation logic
// - Speculative merge/rebase
// - Conflict detection
// - Publish to build only if speculation is in progress (needs CI verification)
// - Publish to merge only if speculation is complete and successful (ready to land)

// Publish to build topic
if err := c.publish(ctx, consumer.TopicKeyBuild, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to build: %w", err)
}

c.logger.Infow("published batch to build",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyBuild,
)
// Optimistic CAS: if the version has already advanced (concurrent speculate),
// the next event will see the new state and behave correctly.
newVersion := batch.Version + 1
if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateSpeculating); err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to update batch %s state to speculating: %w", batch.ID, err)
}

c.metricsScope.Counter("started_speculation").Inc(1)
return nil
}

// tryFinalize publishes to merge and transitions to Merging iff every
// dependency batch has reached Succeeded. If any dep is Failed/Cancelled,
// the batch cannot land on top of it; we mark it Failed and hand off to
// conclude so the request state and log are reconciled. Otherwise (some
// deps still in flight) it no-ops and waits for the next event.
//
// TODO: when a dependency fails we currently fail this batch outright.
// We will need to respeculate the failed paths — drop the failed dep
// from the chain and re-issue speculation for the surviving ordering(s)
// — instead of cascading the failure into requests that could still land.
func (c *Controller) tryFinalize(ctx context.Context, batch entity.Batch) error {
deps, err := c.fetchDependencies(ctx, batch)
if err != nil {
return err
}

pending := make([]string, 0, len(deps))
for _, d := range deps {
switch d.State {
case entity.BatchStateSucceeded:
// ok
case entity.BatchStateFailed, entity.BatchStateCancelled:
return c.failOnDependency(ctx, batch, d)
default:
pending = append(pending, d.ID)
}
}

if len(pending) > 0 {
c.metricsScope.Counter("waiting_on_deps").Inc(1)
c.logger.Debugw("dependencies still in flight; waiting",
"batch_id", batch.ID,
"pending_dependency_ids", pending,
)
return nil
}

// Publish to merge topic
if err := c.publish(ctx, consumer.TopicKeyMerge, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to merge: %w", err)
}

c.logger.Infow("published batch to merge",
newVersion := batch.Version + 1
if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateMerging); err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to update batch %s state to merging: %w", batch.ID, err)
}

c.metricsScope.Counter("processed").Inc(1)
return nil
}

// failOnDependency transitions a Speculating batch to Failed when one of its
// dependencies has reached a non-succeeding terminal state, then publishes to
// the conclude queue so the request store and request log get reconciled.
// Without this transition the batch would sit in Speculating forever — no
// downstream event ever fires for it again.
func (c *Controller) failOnDependency(ctx context.Context, batch entity.Batch, dep entity.Batch) error {
c.metricsScope.Counter("dependency_failed").Inc(1)
c.logger.Warnw("dependency in non-succeeding terminal state; failing batch",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyMerge,
"dependency_id", dep.ID,
"dependency_state", string(dep.State),
)

c.metricsScope.Counter("processed").Inc(1)
newVersion := batch.Version + 1
if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newVersion, entity.BatchStateFailed); err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to update batch %s state to failed: %w", batch.ID, err)
}

return nil // Success - message will be acked
if err := c.publish(ctx, consumer.TopicKeyConclude, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to conclude: %w", err)
}

return nil
}

// fetchDependencies loads each batch in batch.Dependencies. Any storage error
// is surfaced as a retryable infra failure; missing dependencies should not
// happen in practice, but if one does it is treated the same as a transient
// fetch failure (i.e. the message is retried).
func (c *Controller) fetchDependencies(ctx context.Context, batch entity.Batch) ([]entity.Batch, error) {
deps := make([]entity.Batch, 0, len(batch.Dependencies))
for _, depID := range batch.Dependencies {
d, err := c.store.GetBatchStore().Get(ctx, depID)
if err != nil {
c.metricsScope.Counter("dependency_fetch_errors").Inc(1)
return nil, fmt.Errorf("failed to get dependency batch %s of %s: %w", depID, batch.ID, err)
}
deps = append(deps, d)
}
return deps, nil
}

// fanout re-publishes downstream events for a batch that has already reached
// a terminal state. Used for self-healing when a previous publish was lost:
// re-sending to conclude guarantees request-state reconciliation.
func (c *Controller) fanout(ctx context.Context, batchID, partitionKey string) error {
if err := c.publish(ctx, consumer.TopicKeyConclude, batchID, partitionKey); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to conclude: %w", err)
}
return nil
}

// publish publishes a batch ID to the specified topic key.
Expand Down
Loading
Loading