Skip to content

Commit ed52df8

Browse files
sbalabanovsbalabanov-zzclaude
authored
fix(conclude): publish request log on terminal reconcile + make idempotent (#198)
## Summary The conclude controller reconciles each request in a concluded batch to the batch's terminal state (Landed / Error / Cancelled) but did not emit a corresponding `RequestLog` entry, leaving the request lifecycle log without a terminal record. Peer controllers (`start`, `score`, `cancel`) already publish to the log topic on their transitions; this brings `conclude` in line. The per-request loop is also made idempotent under at-least-once redelivery: - **Already in target terminal state** → skip the CAS but **still publish the log**. The log queue dedupes on `(request_id, status)`, so a prior delivery that completed the state update but failed before publishing the log gets its log emitted on retry, recorded exactly once. - **In a different terminal state** (a racing writer reached terminal first) → skip both the CAS and the log publish; the other writer owns the terminal log entry for the state it wrote. Recorded via a `terminal_state_divergence` metric and a warning log. - **Not terminal** → CAS to target state, then publish log. ## Test plan - [x] `bazel test //submitqueue/orchestrator/controller/conclude:conclude_test` passes - [x] Added test case: idempotent retry — request already `Landed`, no `UpdateState` mocked (gomock would fail if called), publisher still expected - [x] Added test case: divergent terminal state — request already `Cancelled` for a `Succeeded` batch; neither `UpdateState` nor publish allowed - [x] Existing cases updated to expect log publish on the happy path - [x] `make gazelle` clean (picks up new `submitqueue/core/request` dep) - [x] `make fmt` clean 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: sergeyb <sergeyb@uber.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent f17c76f commit ed52df8

3 files changed

Lines changed: 164 additions & 25 deletions

File tree

submitqueue/orchestrator/controller/conclude/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ go_library(
88
deps = [
99
"//core/metrics",
1010
"//submitqueue/core/consumer",
11+
"//submitqueue/core/request",
1112
"//submitqueue/entity",
1213
"//submitqueue/extension/storage",
1314
"@com_github_uber_go_tally_v4//:tally",

submitqueue/orchestrator/controller/conclude/conclude.go

Lines changed: 64 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/uber-go/tally/v4"
2222
"github.com/uber/submitqueue/core/metrics"
2323
"github.com/uber/submitqueue/submitqueue/core/consumer"
24+
corerequest "github.com/uber/submitqueue/submitqueue/core/request"
2425
"github.com/uber/submitqueue/submitqueue/entity"
2526
"github.com/uber/submitqueue/submitqueue/extension/storage"
2627
"go.uber.org/zap"
@@ -101,27 +102,65 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
101102
metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1)
102103
return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err)
103104
}
105+
requestStatus, err := requestStateToStatus(requestState)
106+
if err != nil {
107+
// Unreachable: batchStateToRequestState only returns terminal request states.
108+
return fmt.Errorf("failed to map request state %s to status: %w", requestState, err)
109+
}
104110

105-
// Update each request's state to reflect the batch outcome.
111+
// Reconcile each request to the batch's terminal state and emit a terminal
112+
// log entry. The flow is idempotent under at-least-once delivery: a prior
113+
// attempt may have completed the CAS but failed before publishing the log,
114+
// so the log publish must still run when the request is already in the
115+
// target terminal state.
106116
for _, requestID := range batch.Contains {
107117
request, err := c.store.GetRequestStore().Get(ctx, requestID)
108118
if err != nil {
109119
metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1)
110120
return fmt.Errorf("failed to get request %s: %w", requestID, err)
111121
}
112122

113-
newVersion := request.Version + 1
114-
if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil {
115-
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
116-
return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)
123+
switch {
124+
case request.State == requestState:
125+
// Idempotent retry: a prior delivery already wrote the terminal
126+
// state. Skip the CAS and fall through to the log publish.
127+
metrics.NamedCounter(c.metricsScope, "process", "already_reconciled", 1)
128+
case entity.IsRequestStateTerminal(request.State):
129+
// Divergent terminal state — a concurrent path (e.g. a racing
130+
// cancel-not-yet-batched transition) reached terminal first. Skip
131+
// the reconcile and the log publish; the other writer owns the
132+
// terminal log entry for the state it actually wrote.
133+
c.logger.Warnw("request already in different terminal state, skipping reconcile",
134+
"batch_id", batch.ID,
135+
"request_id", requestID,
136+
"actual_state", string(request.State),
137+
"expected_state", string(requestState),
138+
)
139+
metrics.NamedCounter(c.metricsScope, "process", "terminal_state_divergence", 1)
140+
continue
141+
default:
142+
newVersion := request.Version + 1
143+
if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil {
144+
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
145+
return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err)
146+
}
147+
request.Version = newVersion
148+
request.State = requestState
149+
150+
c.logger.Infow("updated request state",
151+
"batch_id", batch.ID,
152+
"request_id", requestID,
153+
"new_state", string(requestState),
154+
)
117155
}
118-
request.Version = newVersion
119156

120-
c.logger.Infow("updated request state",
121-
"batch_id", batch.ID,
122-
"request_id", requestID,
123-
"new_state", string(requestState),
124-
)
157+
logEntry := entity.NewRequestLog(requestID, requestStatus, request.Version, "", map[string]string{
158+
"batch_id": batch.ID,
159+
})
160+
if err := corerequest.PublishLog(ctx, c.registry, logEntry, requestID); err != nil {
161+
metrics.NamedCounter(c.metricsScope, "process", "log_publish_errors", 1)
162+
return fmt.Errorf("failed to publish request log for %s: %w", requestID, err)
163+
}
125164
}
126165

127166
return nil // Success - message will be acked
@@ -155,3 +194,17 @@ func batchStateToRequestState(state entity.BatchState) (entity.RequestState, err
155194
return entity.RequestStateUnknown, fmt.Errorf("non-terminal batch state: %s", state)
156195
}
157196
}
197+
198+
// requestStateToStatus maps a terminal request state to the corresponding log status.
199+
func requestStateToStatus(state entity.RequestState) (entity.RequestStatus, error) {
200+
switch state {
201+
case entity.RequestStateLanded:
202+
return entity.RequestStatusLanded, nil
203+
case entity.RequestStateError:
204+
return entity.RequestStatusError, nil
205+
case entity.RequestStateCancelled:
206+
return entity.RequestStatusCancelled, nil
207+
default:
208+
return entity.RequestStatusUnknown, fmt.Errorf("non-terminal request state: %s", state)
209+
}
210+
}

submitqueue/orchestrator/controller/conclude/conclude_test.go

Lines changed: 99 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,10 @@ func batchIDPayload(t *testing.T, id string) []byte {
4141
}
4242

4343
// newTestController creates a controller with test dependencies.
44-
func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage) *Controller {
44+
// expectLogPublish controls whether the log topic publisher is wired with an
45+
// expectation; tests that don't reach the log publish step pass false so an
46+
// unexpected publish would fail the test.
47+
func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *storagemock.MockStorage, expectLogPublish bool) (*Controller, *queuemock.MockPublisher) {
4548
logger := zaptest.NewLogger(t).Sugar()
4649
scope := tally.NoopScope
4750

@@ -53,15 +56,26 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, mockStorage *stora
5356
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
5457
}
5558

56-
registry, err := consumer.NewTopicRegistry(nil)
59+
mockPub := queuemock.NewMockPublisher(ctrl)
60+
if expectLogPublish {
61+
mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
62+
}
63+
mockQ := queuemock.NewMockQueue(ctrl)
64+
mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes()
65+
66+
registry, err := consumer.NewTopicRegistry(
67+
[]consumer.TopicConfig{
68+
{Key: consumer.TopicKeyLog, Name: "log", Queue: mockQ},
69+
},
70+
)
5771
require.NoError(t, err)
5872

59-
return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude")
73+
return NewController(logger, scope, mockStorage, registry, consumer.TopicKeyConclude, "orchestrator-conclude"), mockPub
6074
}
6175

6276
func TestNewController(t *testing.T) {
6377
ctrl := gomock.NewController(t)
64-
controller := newTestController(t, ctrl, nil)
78+
controller, _ := newTestController(t, ctrl, nil, false)
6579

6680
require.NotNil(t, controller)
6781
assert.Equal(t, consumer.TopicKeyConclude, controller.TopicKey())
@@ -71,11 +85,12 @@ func TestNewController(t *testing.T) {
7185

7286
func TestController_Process(t *testing.T) {
7387
tests := []struct {
74-
name string
75-
batch entity.Batch
76-
setupStore func(*gomock.Controller) *storagemock.MockStorage
77-
wantErr bool
78-
retryable bool
88+
name string
89+
batch entity.Batch
90+
setupStore func(*gomock.Controller) *storagemock.MockStorage
91+
expectLogPublish bool
92+
wantErr bool
93+
retryable bool
7994
}{
8095
{
8196
name: "succeeded batch lands requests",
@@ -111,6 +126,7 @@ func TestController_Process(t *testing.T) {
111126
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
112127
return mockStorage
113128
},
129+
expectLogPublish: true,
114130
},
115131
{
116132
name: "failed batch errors requests",
@@ -142,6 +158,7 @@ func TestController_Process(t *testing.T) {
142158
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
143159
return mockStorage
144160
},
161+
expectLogPublish: true,
145162
},
146163
{
147164
name: "cancelled batch cancels requests",
@@ -173,6 +190,74 @@ func TestController_Process(t *testing.T) {
173190
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
174191
return mockStorage
175192
},
193+
expectLogPublish: true,
194+
},
195+
{
196+
name: "idempotent retry: request already in target terminal state still publishes log",
197+
batch: entity.Batch{
198+
ID: "test-queue/batch/8",
199+
Queue: "test-queue",
200+
Contains: []string{"test-queue/20"},
201+
State: entity.BatchStateSucceeded,
202+
Version: 2,
203+
},
204+
setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage {
205+
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
206+
mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/8").Return(entity.Batch{
207+
ID: "test-queue/batch/8",
208+
Queue: "test-queue",
209+
Contains: []string{"test-queue/20"},
210+
State: entity.BatchStateSucceeded,
211+
Version: 2,
212+
}, nil)
213+
214+
// Request is already Landed (prior delivery wrote it). UpdateState
215+
// must NOT be called — gomock will fail the test if it is.
216+
mockRequestStore := storagemock.NewMockRequestStore(ctrl)
217+
mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/20").Return(entity.Request{
218+
ID: "test-queue/20", Version: 7, State: entity.RequestStateLanded,
219+
}, nil)
220+
221+
mockStorage := storagemock.NewMockStorage(ctrl)
222+
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
223+
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
224+
return mockStorage
225+
},
226+
expectLogPublish: true,
227+
},
228+
{
229+
name: "divergent terminal state skips reconcile and log publish",
230+
batch: entity.Batch{
231+
ID: "test-queue/batch/9",
232+
Queue: "test-queue",
233+
Contains: []string{"test-queue/30"},
234+
State: entity.BatchStateSucceeded,
235+
Version: 2,
236+
},
237+
setupStore: func(ctrl *gomock.Controller) *storagemock.MockStorage {
238+
mockBatchStore := storagemock.NewMockBatchStore(ctrl)
239+
mockBatchStore.EXPECT().Get(gomock.Any(), "test-queue/batch/9").Return(entity.Batch{
240+
ID: "test-queue/batch/9",
241+
Queue: "test-queue",
242+
Contains: []string{"test-queue/30"},
243+
State: entity.BatchStateSucceeded,
244+
Version: 2,
245+
}, nil)
246+
247+
// Request is already in a *different* terminal state (Cancelled).
248+
// Conclude must not write the log entry (the other writer owns it),
249+
// and must not attempt UpdateState.
250+
mockRequestStore := storagemock.NewMockRequestStore(ctrl)
251+
mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/30").Return(entity.Request{
252+
ID: "test-queue/30", Version: 5, State: entity.RequestStateCancelled,
253+
}, nil)
254+
255+
mockStorage := storagemock.NewMockStorage(ctrl)
256+
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
257+
mockStorage.EXPECT().GetRequestStore().Return(mockRequestStore).AnyTimes()
258+
return mockStorage
259+
},
260+
expectLogPublish: false,
176261
},
177262
{
178263
name: "non-terminal batch state returns error",
@@ -201,7 +286,7 @@ func TestController_Process(t *testing.T) {
201286
retryable: false,
202287
},
203288
{
204-
name: "request store get failure is retryable",
289+
name: "request store get failure returns error",
205290
batch: entity.Batch{
206291
ID: "test-queue/batch/5",
207292
Queue: "test-queue",
@@ -231,7 +316,7 @@ func TestController_Process(t *testing.T) {
231316
retryable: false,
232317
},
233318
{
234-
name: "request store update failure is retryable",
319+
name: "request store update failure returns error",
235320
batch: entity.Batch{
236321
ID: "test-queue/batch/6",
237322
Queue: "test-queue",
@@ -296,7 +381,7 @@ func TestController_Process(t *testing.T) {
296381
mockStorage = tt.setupStore(ctrl)
297382
}
298383

299-
controller := newTestController(t, ctrl, mockStorage)
384+
controller, _ := newTestController(t, ctrl, mockStorage, tt.expectLogPublish)
300385

301386
msg := entityqueue.NewMessage(tt.batch.ID, batchIDPayload(t, tt.batch.ID), tt.batch.Queue, nil)
302387
delivery := queuemock.NewMockDelivery(ctrl)
@@ -324,7 +409,7 @@ func TestController_Process_StorageFailure(t *testing.T) {
324409
mockStorage := storagemock.NewMockStorage(ctrl)
325410
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
326411

327-
controller := newTestController(t, ctrl, mockStorage)
412+
controller, _ := newTestController(t, ctrl, mockStorage, false)
328413

329414
msg := entityqueue.NewMessage("test-queue/batch/1", batchIDPayload(t, "test-queue/batch/1"), "test-queue", nil)
330415
delivery := queuemock.NewMockDelivery(ctrl)
@@ -338,7 +423,7 @@ func TestController_Process_StorageFailure(t *testing.T) {
338423

339424
func TestController_InterfaceImplementation(t *testing.T) {
340425
ctrl := gomock.NewController(t)
341-
controller := newTestController(t, ctrl, nil)
426+
controller, _ := newTestController(t, ctrl, nil, false)
342427

343428
var _ consumer.Controller = controller
344429
}

0 commit comments

Comments
 (0)