Skip to content

Commit 92cef12

Browse files
sbalabanov, sbalabanov-zz, claude
authored
feat(cancel): add Cancel RPC + orchestrator cancel pipeline (#157)
## Summary

- New gateway `Cancel(sqid)` RPC that publishes a `CancelRequest` to `TopicKeyCancel`; processing is asynchronous and cancellation is not guaranteed (a request that has already merged, or that races to completion, may still land or error — callers should check the actual outcome via the separate status / request-log API).
- New orchestrator cancel controller with a two-step transition: `RequestStateCancelling` (non-terminal intent) → either `RequestStateCancelled` directly (un-batched) or via batch cancel + conclude fan-out (batched). The Cancelling intent is non-terminal so concurrent merge / failure may still win and prevail with `Landed` / `Error`.
- Every stage controller (validate, batch, score, build, buildsignal, merge, conclude, speculate) is now cancellation-aware via a single `IsRequestStateHalted` / batch-terminal short-circuit. Speculate treats cancelled deps as out-of-the-way (drop and continue) instead of cascade-failing; conclude maps `BatchStateCancelled` → `RequestStateCancelled`.
- Cancel controller does not classify storage errors itself — `storage.ErrNotFound` and `storage.ErrVersionMismatch` propagate as-is so the base controller layer can map them to retryable, matching the pattern in every other orchestrator controller.

## Test plan

- [ ] `make fmt && make lint && make check-tidy && make check-gazelle && make check-mocks` clean
- [ ] `bazel test //... --test_tag_filters=-integration,-e2e` (32 unit-test targets) all pass
- [ ] Manual e2e via `make local-start`: Land → Cancel before batch → confirm `RequestStateCancelled` and no batch created
- [ ] Manual e2e: Land → wait for batch → Cancel → confirm batch goes `Cancelled`, contained requests reconcile to `Cancelled`, dependent batches respeculate
- [ ] Integration test exercising both paths through real MySQL extension

## Out of scope (deferred)

- Re-enqueuing non-cancelled requests from a cancelled batch (TODO in cancel controller).
- Cancelling a real CI build via external API (no-op today; hook is the existing `BuildStatusCancelled` write).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: sergeyb <sergeyb@uber.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent e54cbaa commit 92cef12

47 files changed

Lines changed: 3025 additions & 134 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/consumer/registry.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type TopicKey string
3030
const (
3131
// TopicKeyStart is the pipeline stage where new requests arrive from the gateway.
3232
TopicKeyStart TopicKey = "start"
33+
// TopicKeyCancel is the pipeline stage where cancellation requests arrive from the gateway.
34+
TopicKeyCancel TopicKey = "cancel"
3335
// TopicKeyValidate is the pipeline stage where requests are published for validation.
3436
TopicKeyValidate TopicKey = "validate"
3537
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.

core/request/log.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@ import (
2525

2626
// PublishLog publishes a single request log entry to the log topic for async persistence.
2727
// The partitionKey ensures ordering of log entries for the same request; typically set to the request ID.
28+
//
29+
// The message ID is scoped to (requestID, status) so that the queue's
30+
// (topic, partition_key, id) unique index dedupes retries of the same logical
31+
// log event (same delivery re-processed) without rejecting distinct statuses
32+
// for the same request (e.g. "started" emitted by the start controller and
33+
// "cancelled" emitted later by the cancel controller).
2834
func PublishLog(ctx context.Context, registry consumer.TopicRegistry, logEntry entity.RequestLog, partitionKey string) error {
2935
payload, err := logEntry.ToBytes()
3036
if err != nil {
3137
return fmt.Errorf("failed to serialize request log: %w", err)
3238
}
3339

34-
msg := entityqueue.NewMessage(logEntry.RequestID, payload, partitionKey, nil)
40+
msgID := fmt.Sprintf("%s/%s", logEntry.RequestID, logEntry.Status)
41+
msg := entityqueue.NewMessage(msgID, payload, partitionKey, nil)
3542

3643
q, ok := registry.Queue(consumer.TopicKeyLog)
3744
if !ok {

core/request/log_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,51 @@ func TestPublishBatchLogs_Empty(t *testing.T) {
113113
err := PublishBatchLogs(context.Background(), registry, nil, entity.RequestStatusScored, nil)
114114
require.NoError(t, err)
115115
}
116+
117+
// TestPublishLog_MessageIDScopedByStatus locks in the queue-id scheme:
118+
// distinct statuses for the same request must produce distinct message IDs so
119+
// the queue's (topic, partition_key, id) uniqueness check does not reject the
120+
// second publish, while the same status emitted twice (retry of the same
121+
// delivery) must produce the same message ID so the queue dedupes it.
122+
//
123+
// Regression test for the duplicate-key crash where the orchestrator cancel
124+
// controller could not emit a `cancelled` log entry because the start
125+
// controller had already published `started` for the same request.
126+
func TestPublishLog_MessageIDScopedByStatus(t *testing.T) {
127+
ctrl := gomock.NewController(t)
128+
129+
var ids []string
130+
mockPub := queuemock.NewMockPublisher(ctrl)
131+
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
132+
func(_ context.Context, _ string, msg queue.Message) error {
133+
ids = append(ids, msg.ID)
134+
return nil
135+
},
136+
).AnyTimes()
137+
mockQ := queuemock.NewMockQueue(ctrl)
138+
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
139+
registry, err := consumer.NewTopicRegistry(
140+
[]consumer.TopicConfig{{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ}},
141+
)
142+
require.NoError(t, err)
143+
144+
// Three distinct statuses for the same request.
145+
for _, st := range []entity.RequestStatus{
146+
entity.RequestStatusStarted,
147+
entity.RequestStatusCancelling,
148+
entity.RequestStatusCancelled,
149+
} {
150+
require.NoError(t, PublishLog(context.Background(), registry,
151+
entity.NewRequestLog("req/1", st, 0, "", nil), "req/1"))
152+
}
153+
// Re-emit "started" to simulate a retry of the same delivery — must reuse the same ID.
154+
require.NoError(t, PublishLog(context.Background(), registry,
155+
entity.NewRequestLog("req/1", entity.RequestStatusStarted, 0, "", nil), "req/1"))
156+
157+
require.Equal(t, []string{
158+
"req/1/started",
159+
"req/1/cancelling",
160+
"req/1/cancelled",
161+
"req/1/started",
162+
}, ids)
163+
}

entity/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ go_library(
66
"batch.go",
77
"batch_dependent.go",
88
"build.go",
9+
"cancel_request.go",
910
"change_provider.go",
1011
"change_record.go",
1112
"land_request.go",
@@ -23,6 +24,7 @@ go_test(
2324
srcs = [
2425
"batch_test.go",
2526
"build_test.go",
27+
"cancel_request_test.go",
2628
"land_request_test.go",
2729
"request_log_test.go",
2830
"request_test.go",

entity/batch.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,24 @@ const (
3434
BatchStateFailed BatchState = "failed"
3535
// BatchStateScored is the state of a batch that has been scored for build success probability.
3636
BatchStateScored BatchState = "scored"
37+
// BatchStateCancelling is the non-terminal intent state set when a cancel has been requested but the
38+
// batch has not yet been transitioned to BatchStateCancelled. A batch in this state may still reach
39+
// BatchStateSucceeded or BatchStateFailed if a concurrent merge wins the race (e.g. the push had
40+
// already completed before the cancel CAS observed the batch); those terminal states prevail.
41+
// Forward-progress controllers must treat this state as halted (no new work). The speculate
42+
// controller owns the transition to the terminal BatchStateCancelled and the downstream fan-out
43+
// (cancelling in-flight builds, respeculating dependents, publishing to conclude).
44+
BatchStateCancelling BatchState = "cancelling"
3745
// BatchStateCancelled is the terminal state of a batch that was cancelled before completion.
3846
BatchStateCancelled BatchState = "cancelled"
3947
)
4048

4149
// IsTerminal returns true if the batch state is a terminal state.
4250
// Terminal states are states from which no further transitions are possible.
51+
// BatchStateCancelling is intentionally excluded: cancellation is best-effort and a Cancelling batch
52+
// may still transition to BatchStateSucceeded or BatchStateFailed before it reaches BatchStateCancelled.
53+
// Callers that want to gate forward progress (and treat Cancelling as halted) should use
54+
// IsBatchStateHalted instead.
4355
func (s BatchState) IsTerminal() bool {
4456
switch s {
4557
case BatchStateSucceeded, BatchStateFailed, BatchStateCancelled:
@@ -49,6 +61,14 @@ func (s BatchState) IsTerminal() bool {
4961
}
5062
}
5163

64+
// IsBatchStateHalted returns true if the batch is either terminal or in the process of being cancelled.
65+
// Forward-progress controllers (score, build, buildsignal, speculate, merge) use this to short-circuit
66+
// work for batches that the user has asked to cancel — even though Cancelling is non-terminal, no
67+
// further pipeline work should start (cancel will write the terminal state and fan out).
68+
func IsBatchStateHalted(s BatchState) bool {
69+
return s.IsTerminal() || s == BatchStateCancelling
70+
}
71+
5272
// Batch represents a group of requests to land (merge into target branch of the source control repository).
5373
type Batch struct {
5474
// ID is the globally unique identifier for the batch. Format: "<queue>/batch/<counter_value>".

entity/cancel_request.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import "encoding/json"
18+
19+
// CancelRequest represents a cancellation request sent over the queue from the gateway to the orchestrator.
// It identifies the request to cancel by its ID and carries an optional human-readable reason for observability.
type CancelRequest struct {
	// ID is the globally unique identifier of the request to cancel. Format: "<queue>/<counter_value>".
	ID string `json:"id"`
	// Reason is an optional free-form explanation of why the cancellation was requested.
	Reason string `json:"reason"`
}

// ToBytes serializes the CancelRequest to JSON bytes for queue message payload.
// It returns whatever error json.Marshal reports.
func (r CancelRequest) ToBytes() ([]byte, error) {
	return json.Marshal(r)
}

// CancelRequestFromBytes deserializes a CancelRequest from JSON bytes.
//
// On any decode error the zero CancelRequest is returned together with the error.
// json.Unmarshal keeps decoding the remaining fields after a type mismatch, so
// without this guard a caller that ignored the error could act on a half-filled
// request (for example a valid ID paired with a malformed reason).
func CancelRequestFromBytes(data []byte) (CancelRequest, error) {
	var req CancelRequest
	if err := json.Unmarshal(data, &req); err != nil {
		return CancelRequest{}, err
	}
	return req, nil
}

entity/cancel_request_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright (c) 2025 Uber Technologies, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package entity
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
)
23+
24+
func TestCancelRequest_SerializationRoundTrip(t *testing.T) {
25+
tests := []struct {
26+
name string
27+
req CancelRequest
28+
}{
29+
{
30+
name: "with reason",
31+
req: CancelRequest{ID: "queue1/100", Reason: "obsolete change"},
32+
},
33+
{
34+
name: "without reason",
35+
req: CancelRequest{ID: "queue2/200"},
36+
},
37+
}
38+
39+
for _, tt := range tests {
40+
t.Run(tt.name, func(t *testing.T) {
41+
data, err := tt.req.ToBytes()
42+
require.NoError(t, err)
43+
44+
deserialized, err := CancelRequestFromBytes(data)
45+
require.NoError(t, err)
46+
47+
assert.Equal(t, tt.req, deserialized)
48+
})
49+
}
50+
}
51+
52+
func TestCancelRequestFromBytes_InvalidJSON(t *testing.T) {
53+
_, err := CancelRequestFromBytes([]byte(`{not json`))
54+
assert.Error(t, err)
55+
}

entity/request.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,42 @@ const (
4040
RequestStateStarted RequestState = "started"
4141
// RequestStateValidated indicates that the request has been validated (duplicate check, merge check etc.) successfully.
4242
RequestStateValidated RequestState = "validated"
43+
// RequestStateBatched indicates that the request has been claimed by the batch controller and enrolled in a
44+
// batch. The CAS-write of this state by the batch controller is the serialization point between batch and
45+
// cancel: the batch controller transitions Validated → Batched immediately before persisting the new batch,
46+
// so if a concurrent cancel has already transitioned the request to Cancelling, the batch controller loses the CAS and
47+
// abandons the batch. From this state forward, the request's terminal outcome is owned by the batch it is
48+
// enrolled in (via conclude), not by the cancel controller's request-only fast path.
49+
RequestStateBatched RequestState = "batched"
4350
// RequestStateProcessing is the state of a land request that is being processed.
4451
RequestStateProcessing RequestState = "processing"
4552
// RequestStateLanded is the state of a land request that has been successfully processed and landed. This is the final state.
4653
RequestStateLanded RequestState = "landed"
4754
// RequestStateError is the state of a land request that has encountered an error. This is the final state.
4855
RequestStateError RequestState = "error"
56+
// RequestStateCancelling is the non-terminal intent state set when the user has requested cancellation but the
57+
// request has not yet been transitioned to RequestStateCancelled. A request in this state may still reach
58+
// RequestStateLanded or RequestStateError if a concurrent merge or failure wins the race; those terminal
59+
// states prevail. Forward-progress controllers must treat this state the same as terminal (i.e. do not start
60+
// any new work on the request).
61+
RequestStateCancelling RequestState = "cancelling"
62+
// RequestStateCancelled is the state of a land request that was cancelled by the user before it could land. This is the final state.
63+
RequestStateCancelled RequestState = "cancelled"
4964
)
5065

51-
// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed or error).
66+
// IsRequestStateTerminal returns true if the state represents a final, irreversible state (landed, error, or cancelled).
67+
// RequestStateCancelling is intentionally excluded: cancellation is best-effort and a Cancelling request may still
68+
// transition to Landed or Error before it reaches Cancelled. Callers that want to gate forward progress (and treat
69+
// Cancelling as halted) should use IsRequestStateHalted instead.
5270
func IsRequestStateTerminal(s RequestState) bool {
53-
return s == RequestStateLanded || s == RequestStateError
71+
return s == RequestStateLanded || s == RequestStateError || s == RequestStateCancelled
72+
}
73+
74+
// IsRequestStateHalted returns true if the request is either terminal or in the process of being cancelled.
75+
// Forward-progress controllers (validate, batch, ...) use this to short-circuit work for requests that the
76+
// user has asked to cancel — even though Cancelling is non-terminal, no further pipeline work should start.
77+
func IsRequestStateHalted(s RequestState) bool {
78+
return IsRequestStateTerminal(s) || s == RequestStateCancelling
5479
}
5580

5681
// Change represents a code change identified by URIs from a code change provider (e.g., GitHub Pull Request, Phabricator Diff).

entity/request_log.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ const (
7878

7979
// RequestStatusError indicates that the request has encountered an error. It corresponds to the RequestStateError state.
8080
RequestStatusError RequestStatus = "error"
81+
82+
// RequestStatusCancelling indicates that the user has requested cancellation but the request has not yet transitioned
83+
// to the RequestStateCancelled state. Cancellation is best-effort: a request that has already been merged or that
84+
// races to completion before the cancel propagates through the pipeline may still land. Observers should treat this
85+
// as intent only and rely on RequestStatusCancelled (or RequestStatusLanded / RequestStatusError) for the terminal outcome. Emitted by
86+
// the gateway when the Cancel RPC is received.
87+
RequestStatusCancelling RequestStatus = "cancelling"
88+
89+
// RequestStatusCancelled indicates that the request was cancelled by the user before it could land. It corresponds to the RequestStateCancelled state.
90+
RequestStatusCancelled RequestStatus = "cancelled"
8191
)
8292

8393
// RequestLog is an append-only record that captures a point-in-time snapshot of a request's status

entity/request_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,48 @@ func TestRequestFromBytes_EmptyData(t *testing.T) {
9494
assert.Equal(t, int32(0), req.Version)
9595
}
9696

97+
func TestIsRequestStateTerminal(t *testing.T) {
98+
tests := []struct {
99+
state RequestState
100+
terminal bool
101+
}{
102+
{RequestStateUnknown, false},
103+
{RequestStateStarted, false},
104+
{RequestStateValidated, false},
105+
{RequestStateProcessing, false},
106+
{RequestStateCancelling, false}, // intent only — not terminal
107+
{RequestStateLanded, true},
108+
{RequestStateError, true},
109+
{RequestStateCancelled, true},
110+
}
111+
for _, tt := range tests {
112+
t.Run(string(tt.state), func(t *testing.T) {
113+
assert.Equal(t, tt.terminal, IsRequestStateTerminal(tt.state))
114+
})
115+
}
116+
}
117+
118+
func TestIsRequestStateHalted(t *testing.T) {
119+
tests := []struct {
120+
state RequestState
121+
halted bool
122+
}{
123+
{RequestStateUnknown, false},
124+
{RequestStateStarted, false},
125+
{RequestStateValidated, false},
126+
{RequestStateProcessing, false},
127+
{RequestStateCancelling, true}, // intent halts forward progress
128+
{RequestStateLanded, true},
129+
{RequestStateError, true},
130+
{RequestStateCancelled, true},
131+
}
132+
for _, tt := range tests {
133+
t.Run(string(tt.state), func(t *testing.T) {
134+
assert.Equal(t, tt.halted, IsRequestStateHalted(tt.state))
135+
})
136+
}
137+
}
138+
97139
func TestRequest_SerializationRoundTrip(t *testing.T) {
98140
tests := []struct {
99141
name string

0 commit comments

Comments
 (0)