Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type TopicKey string
const (
// TopicKeyStart is the pipeline stage where new requests arrive from the gateway.
TopicKeyStart TopicKey = "start"
// TopicKeyCancel is the pipeline stage where cancellation requests arrive from the gateway.
TopicKeyCancel TopicKey = "cancel"
// TopicKeyValidate is the pipeline stage where requests are published for validation.
TopicKeyValidate TopicKey = "validate"
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.
Expand Down
9 changes: 8 additions & 1 deletion core/request/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ import (

// PublishLog publishes a single request log entry to the log topic for async persistence.
// The partitionKey ensures ordering of log entries for the same request; typically set to the request ID.
//
// The message ID is scoped to (requestID, status) so that the queue's
// (topic, partition_key, id) unique index dedupes retries of the same logical
// log event (same delivery re-processed) without rejecting distinct statuses
// for the same request (e.g. "started" emitted by the start controller and
// "cancelled" emitted later by the cancel controller).
func PublishLog(ctx context.Context, registry consumer.TopicRegistry, logEntry entity.RequestLog, partitionKey string) error {
payload, err := logEntry.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize request log: %w", err)
}

msg := entityqueue.NewMessage(logEntry.RequestID, payload, partitionKey, nil)
msgID := fmt.Sprintf("%s/%s", logEntry.RequestID, logEntry.Status)
msg := entityqueue.NewMessage(msgID, payload, partitionKey, nil)

q, ok := registry.Queue(consumer.TopicKeyLog)
if !ok {
Expand Down
48 changes: 48 additions & 0 deletions core/request/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,51 @@ func TestPublishBatchLogs_Empty(t *testing.T) {
err := PublishBatchLogs(context.Background(), registry, nil, entity.RequestStatusScored, nil)
require.NoError(t, err)
}

// TestPublishLog_MessageIDScopedByStatus locks in the queue-id scheme:
// distinct statuses for the same request must produce distinct message IDs so
// the queue's (topic, partition_key, id) uniqueness check does not reject the
// second publish, while the same status emitted twice (retry of the same
// delivery) must produce the same message ID so the queue dedupes it.
//
// Regression test for the duplicate-key crash where the orchestrator cancel
// controller could not emit a `cancelled` log entry because the start
// controller had already published `started` for the same request.
func TestPublishLog_MessageIDScopedByStatus(t *testing.T) {
ctrl := gomock.NewController(t)

var ids []string
mockPub := queuemock.NewMockPublisher(ctrl)
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, msg queue.Message) error {
ids = append(ids, msg.ID)
return nil
},
).AnyTimes()
mockQ := queuemock.NewMockQueue(ctrl)
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
)
require.NoError(t, err)

// Three distinct statuses for the same request.
for _, st := range []entity.RequestStatus{
entity.RequestStatusStarted,
entity.RequestStatusCancelling,
entity.RequestStatusCancelled,
} {
require.NoError(t, PublishLog(context.Background(), registry,
entity.NewRequestLog("req/1", st, 0, "", nil), "req/1"))
}
// Re-emit "started" to simulate a retry of the same delivery — must reuse the same ID.
require.NoError(t, PublishLog(context.Background(), registry,
entity.NewRequestLog("req/1", entity.RequestStatusStarted, 0, "", nil), "req/1"))

require.Equal(t, []string{
"req/1/started",
"req/1/cancelling",
"req/1/cancelled",
"req/1/started",
}, ids)
}
2 changes: 2 additions & 0 deletions entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"batch.go",
"batch_dependent.go",
"build.go",
"cancel_request.go",
"change_provider.go",
"change_record.go",
"land_request.go",
Expand All @@ -23,6 +24,7 @@ go_test(
srcs = [
"batch_test.go",
"build_test.go",
"cancel_request_test.go",
"land_request_test.go",
"request_log_test.go",
"request_test.go",
Expand Down
20 changes: 20 additions & 0 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,24 @@ const (
BatchStateFailed BatchState = "failed"
// BatchStateScored is the state of a batch that has been scored for build success probability.
BatchStateScored BatchState = "scored"
// BatchStateCancelling is the non-terminal intent state set when a cancel has been requested but the
// batch has not yet been transitioned to BatchStateCancelled. A batch in this state may still reach
// BatchStateSucceeded or BatchStateFailed if a concurrent merge wins the race (e.g. the push had
// already completed before the cancel CAS observed the batch); those terminal states prevail.
// Forward-progress controllers must treat this state as halted (no new work). The speculate
// controller owns the transition to the terminal BatchStateCancelled and the downstream fan-out
// (cancelling in-flight builds, respeculating dependents, publishing to conclude).
BatchStateCancelling BatchState = "cancelling"
// BatchStateCancelled is the terminal state of a batch that was cancelled before completion.
BatchStateCancelled BatchState = "cancelled"
)

// IsTerminal returns true if the batch state is a terminal state.
// Terminal states are states from which no further transitions are possible.
// BatchStateCancelling is intentionally excluded: cancellation is best-effort and a Cancelling batch
// may still transition to BatchStateSucceeded or BatchStateFailed before it reaches BatchStateCancelled.
// Callers that want to gate forward progress (and treat Cancelling as halted) should use
// IsBatchStateHalted instead.
func (s BatchState) IsTerminal() bool {
switch s {
case BatchStateSucceeded, BatchStateFailed, BatchStateCancelled:
Expand All @@ -49,6 +61,14 @@ func (s BatchState) IsTerminal() bool {
}
}

// IsBatchStateHalted returns true if the batch is either terminal or in the process of being cancelled.
// Forward-progress controllers (score, build, buildsignal, speculate, merge) use this to short-circuit
// work for batches that the user has asked to cancel — even though Cancelling is non-terminal, no
// further pipeline work should start (cancel will write the terminal state and fan out).
func IsBatchStateHalted(s BatchState) bool {
return s.IsTerminal() || s == BatchStateCancelling
}

// Batch represents a group of requests to land (merge into target branch of the source control repository).
type Batch struct {
// ID is the globally unique identifier for the batch. Format: "<queue>/batch/<counter_value>".
Expand Down
38 changes: 38 additions & 0 deletions entity/cancel_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import "encoding/json"

// CancelRequest represents a cancellation request sent over the queue from the gateway to the orchestrator.
// It identifies the request to cancel by its ID and carries an optional human-readable reason for observability.
type CancelRequest struct {
// ID is the globally unique identifier of the request to cancel. Format: "<queue>/<counter_value>".
ID string `json:"id"`
// Reason is an optional free-form explanation of why the cancellation was requested.
Reason string `json:"reason"`
}

// ToBytes serializes the CancelRequest to JSON bytes for queue message payload.
func (r CancelRequest) ToBytes() ([]byte, error) {
return json.Marshal(r)
}

// CancelRequestFromBytes deserializes a CancelRequest from JSON bytes.
func CancelRequestFromBytes(data []byte) (CancelRequest, error) {
var req CancelRequest
err := json.Unmarshal(data, &req)
return req, err
}
55 changes: 55 additions & 0 deletions entity/cancel_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package entity

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCancelRequest_SerializationRoundTrip(t *testing.T) {
tests := []struct {
name string
req CancelRequest
}{
{
name: "with reason",
req: CancelRequest{ID: "queue1/100", Reason: "obsolete change"},
},
{
name: "without reason",
req: CancelRequest{ID: "queue2/200"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := tt.req.ToBytes()
require.NoError(t, err)

deserialized, err := CancelRequestFromBytes(data)
require.NoError(t, err)

assert.Equal(t, tt.req, deserialized)
})
}
}

func TestCancelRequestFromBytes_InvalidJSON(t *testing.T) {
_, err := CancelRequestFromBytes([]byte(`{not json`))
assert.Error(t, err)
}
29 changes: 27 additions & 2 deletions entity/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,42 @@ const (
RequestStateStarted RequestState = "started"
// RequestStateValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully.
RequestStateValidated RequestState = "validated"
// RequestStateBatched indicates that the request has been claimed by the batch controller and enrolled in a
// batch. The CAS-write of this state by the batch controller is the serialization point between batch and
// cancel: the batch controller transitions Validated → Batched immediately before persisting the new batch,
// so any concurrent cancel that has already transitioned the request to Cancelling will lose the CAS and
// abandon the batch. From this state forward, the request's terminal outcome is owned by the batch it is
// enrolled in (via conclude), not by the cancel controller's request-only fast path.
RequestStateBatched RequestState = "batched"
// RequestStateProcessing is the state of a land request that is being processed.
RequestStateProcessing RequestState = "processing"
// RequestStateLanded is the state of a land request that has been successfully processed and landed. This is the final state.
RequestStateLanded RequestState = "landed"
// RequestStateError is the state of a land request that has encountered an error. This is the final state.
RequestStateError RequestState = "error"
// RequestStateCancelling is the non-terminal intent state set when the user has requested cancellation but the
// request has not yet been transitioned to RequestStateCancelled. A request in this state may still reach
// RequestStateLanded or RequestStateError if a concurrent merge or failure wins the race; those terminal
// states prevail. Forward-progress controllers must treat this state the same as terminal (i.e. do not start
// any new work on the request).
RequestStateCancelling RequestState = "cancelling"
// RequestStateCancelled is the state of a land request that was cancelled by the user before it could land. This is the final state.
RequestStateCancelled RequestState = "cancelled"
)

// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed or error).
// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed, error, or cancelled).
// RequestStateCancelling is intentionally excluded: cancellation is best-effort and a Cancelling request may still
// transition to Landed or Error before it reaches Cancelled. Callers that want to gate forward progress (and treat
// Cancelling as halted) should use IsRequestStateHalted instead.
func IsRequestStateTerminal(s RequestState) bool {
return s == RequestStateLanded || s == RequestStateError
return s == RequestStateLanded || s == RequestStateError || s == RequestStateCancelled
}

// IsRequestStateHalted returns true if the request is either terminal or in the process of being cancelled.
// Forward-progress controllers (validate, batch, ...) use this to short-circuit work for requests that the
// user has asked to cancel — even though Cancelling is non-terminal, no further pipeline work should start.
func IsRequestStateHalted(s RequestState) bool {
return IsRequestStateTerminal(s) || s == RequestStateCancelling
}

// Change represents a code change identified by URIs from a code change provider (e.g., GitHub Pull Request, Phabricator Diff).
Expand Down
10 changes: 10 additions & 0 deletions entity/request_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ const (

// RequestStatusError indicates that the request has encountered an error. It corresponds to the RequestStateError state.
RequestStatusError RequestStatus = "error"

// RequestStatusCancelling indicates that the user has requested cancellation but the request has not yet transitioned
// to the RequestStateCancelled state. Cancellation is best-effort: a request that has already been merged or that
// races to completion before the cancel propagates through the pipeline may still land. Observers should treat this
// as intent only and rely on RequestStatusCancelled (or RequestStatusLanded) for the terminal outcome. Emitted by
// the gateway when the Cancel RPC is received.
RequestStatusCancelling RequestStatus = "cancelling"

// RequestStatusCancelled indicates that the request was cancelled by the user before it could land. It corresponds to the RequestStateCancelled state.
RequestStatusCancelled RequestStatus = "cancelled"
)

// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status
Expand Down
42 changes: 42 additions & 0 deletions entity/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,48 @@ func TestRequestFromBytes_EmptyData(t *testing.T) {
assert.Equal(t, int32(0), req.Version)
}

func TestIsRequestStateTerminal(t *testing.T) {
tests := []struct {
state RequestState
terminal bool
}{
{RequestStateUnknown, false},
{RequestStateStarted, false},
{RequestStateValidated, false},
{RequestStateProcessing, false},
{RequestStateCancelling, false}, // intent only — not terminal
{RequestStateLanded, true},
{RequestStateError, true},
{RequestStateCancelled, true},
}
for _, tt := range tests {
t.Run(string(tt.state), func(t *testing.T) {
assert.Equal(t, tt.terminal, IsRequestStateTerminal(tt.state))
})
}
}

func TestIsRequestStateHalted(t *testing.T) {
tests := []struct {
state RequestState
halted bool
}{
{RequestStateUnknown, false},
{RequestStateStarted, false},
{RequestStateValidated, false},
{RequestStateProcessing, false},
{RequestStateCancelling, true}, // intent halts forward progress
{RequestStateLanded, true},
{RequestStateError, true},
{RequestStateCancelled, true},
}
for _, tt := range tests {
t.Run(string(tt.state), func(t *testing.T) {
assert.Equal(t, tt.halted, IsRequestStateHalted(tt.state))
})
}
}

func TestRequest_SerializationRoundTrip(t *testing.T) {
tests := []struct {
name string
Expand Down
2 changes: 2 additions & 0 deletions example/server/gateway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ go_library(
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//reflection",
"@org_golang_google_grpc//status",
"@org_uber_go_zap//:zap",
],
)
Expand Down
Loading
Loading