Skip to content

Commit 693fb79

Browse files
committed
validate: per-URI loop calls GetByURI with early termination
Adapts to the per-record / per-URI ChangeStore API: - start.claimURIs loops Create per URI; each call is independent and idempotent on PK conflict, so partial-failure retries converge. - validate.checkDuplicate iterates request URIs, calls GetByURI for each, filters out self, dedupes seen owners, and short-circuits on the first live duplicate. - Test cases restructured to map records by URI (since the controller now queries one URI at a time) and to exercise multi-URI requests.
1 parent 7d4c49d commit 693fb79

4 files changed

Lines changed: 100 additions & 82 deletions

File tree

orchestrator/controller/start/start.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -148,26 +148,26 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
148148
return nil // Success - message will be acked
149149
}
150150

151-
// claimURIs persists one ChangeRecord per URI in the request. The change store's
152-
// INSERT IGNORE semantics make this idempotent on queue redelivery (same (URI, RequestID)
153-
// is a no-op). Different requests with overlapping URIs do NOT collide on insert; the
154-
// validate controller queries the change store to detect that overlap.
151+
// claimURIs persists one ChangeRecord per URI in the request. Each Create call is
152+
// independent; the change store's per-PK idempotency makes the loop safe under
153+
// queue redelivery (same (Queue, URI, RequestID) is a no-op on retry). Different
154+
// requests with overlapping URIs do NOT collide on insert; the validate controller
155+
// queries the change store to detect that overlap.
155156
func (c *Controller) claimURIs(ctx context.Context, request entity.Request) error {
156-
if len(request.Change.URIs) == 0 {
157-
return nil
158-
}
159157
now := time.Now().UnixMilli()
160-
records := make([]entity.ChangeRecord, 0, len(request.Change.URIs))
161158
for _, uri := range request.Change.URIs {
162-
records = append(records, entity.ChangeRecord{
159+
record := entity.ChangeRecord{
163160
URI: uri,
164161
RequestID: request.ID,
165162
Queue: request.Queue,
166163
CreatedAt: now,
167164
UpdatedAt: now,
168-
})
165+
}
166+
if err := c.changeStore.Create(ctx, record); err != nil {
167+
return fmt.Errorf("failed to claim uri=%s for request %s: %w", uri, request.ID, err)
168+
}
169169
}
170-
return c.changeStore.Create(ctx, records)
170+
return nil
171171
}
172172

173173
// publish publishes a request ID to the specified topic key.

orchestrator/controller/start/start_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,11 @@ func TestController_Process_MultipleChanges(t *testing.T) {
200200
cs := changemock.NewMockChangeStore(ctrl)
201201
var captured []entity.ChangeRecord
202202
cs.EXPECT().Create(gomock.Any(), gomock.Any()).DoAndReturn(
203-
func(ctx context.Context, records []entity.ChangeRecord) error {
204-
captured = records
203+
func(ctx context.Context, record entity.ChangeRecord) error {
204+
captured = append(captured, record)
205205
return nil
206206
},
207-
)
207+
).Times(3)
208208

209209
controller := newTestController(t, ctrl, newMockStorage(ctrl), cs, nil)
210210

orchestrator/controller/validate/validate.go

Lines changed: 32 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -165,47 +165,42 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
165165
return nil // Success - message will be acked
166166
}
167167

168-
// checkDuplicate queries the change store for any other in-flight request whose URIs
169-
// overlap with this request's. The change rows themselves are written upstream by the
170-
// start controller; validate is read-only here. For each unique candidate request_id
171-
// returned, this consults the request store to skip terminal owners and orphans
172-
// (ErrNotFound). Returns the first live-duplicate request_id found, or "" if none.
168+
// checkDuplicate looks for any other in-flight request whose URIs overlap with this
169+
// request's. The change rows themselves are written upstream by the start controller;
170+
// validate is read-only here. For each URI it queries the change store, walks the
171+
// returned candidates skipping self/duplicates/orphans/terminals, and short-circuits
172+
// on the first live duplicate. Returns that request_id, or "" if none.
173173
//
174-
// Two single-table queries by design — no cross-store SQL joins.
174+
// Per-URI / per-record reads keep the contract backend-agnostic; the typical request
175+
// has 1-5 URIs, so the loop is cheap.
175176
func (c *Controller) checkDuplicate(ctx context.Context, request entity.Request) (string, error) {
176-
if len(request.Change.URIs) == 0 {
177-
return "", nil
178-
}
179-
180-
overlaps, err := c.changeStore.FindOverlapping(ctx, request.Queue, request.Change.URIs)
181-
if err != nil {
182-
coremetrics.NamedCounter(c.metricsScope, "process", "change_store_query_errors", 1)
183-
return "", fmt.Errorf("failed to query overlapping changes for request %s: %w", request.ID, err)
184-
}
185-
186-
// Liveness check: an overlap is only a duplicate if the owning request is non-terminal.
187-
// The store does not exclude self, so skip our own request_id here. Walk unique candidate
188-
// request_ids; orphans (ErrNotFound) and terminal owners are skipped.
189-
seen := make(map[string]struct{}, len(overlaps))
190-
for _, rec := range overlaps {
191-
if rec.RequestID == request.ID {
192-
continue // skip rows belonging to this request itself
193-
}
194-
if _, ok := seen[rec.RequestID]; ok {
195-
continue
196-
}
197-
seen[rec.RequestID] = struct{}{}
198-
199-
owner, err := c.store.GetRequestStore().Get(ctx, rec.RequestID)
200-
if errors.Is(err, storage.ErrNotFound) {
201-
continue
202-
}
177+
seenOwners := make(map[string]struct{})
178+
for _, uri := range request.Change.URIs {
179+
records, err := c.changeStore.GetByURI(ctx, request.Queue, uri)
203180
if err != nil {
204-
coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1)
205-
return "", fmt.Errorf("failed to look up overlapping request %s: %w", rec.RequestID, err)
181+
coremetrics.NamedCounter(c.metricsScope, "process", "change_store_query_errors", 1)
182+
return "", fmt.Errorf("failed to query change store for request %s uri=%s: %w", request.ID, uri, err)
206183
}
207-
if !entity.IsRequestStateTerminal(owner.State) {
208-
return rec.RequestID, nil
184+
for _, rec := range records {
185+
if rec.RequestID == request.ID {
186+
continue // skip rows belonging to this request itself
187+
}
188+
if _, ok := seenOwners[rec.RequestID]; ok {
189+
continue
190+
}
191+
seenOwners[rec.RequestID] = struct{}{}
192+
193+
owner, err := c.store.GetRequestStore().Get(ctx, rec.RequestID)
194+
if errors.Is(err, storage.ErrNotFound) {
195+
continue
196+
}
197+
if err != nil {
198+
coremetrics.NamedCounter(c.metricsScope, "process", "storage_errors", 1)
199+
return "", fmt.Errorf("failed to look up overlapping request %s: %w", rec.RequestID, err)
200+
}
201+
if !entity.IsRequestStateTerminal(owner.State) {
202+
return rec.RequestID, nil
203+
}
209204
}
210205
}
211206
return "", nil

orchestrator/controller/validate/validate_test.go

Lines changed: 54 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ func newMockStorage(ctrl *gomock.Controller, request entity.Request) (*storagemo
8181
}
8282

8383
// newMockChangeStore creates a MockChangeStore with default no-overlap behavior.
84-
// Tests that need to simulate overlap can override FindOverlapping with their own EXPECT.
84+
// Tests that need to simulate overlap can override GetByURI with their own EXPECT.
8585
// Validate is read-only against the change store — it never calls Create.
8686
func newMockChangeStore(ctrl *gomock.Controller) *changemock.MockChangeStore {
8787
cs := changemock.NewMockChangeStore(ctrl)
88-
cs.EXPECT().FindOverlapping(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
88+
cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
8989
return cs
9090
}
9191

@@ -278,61 +278,71 @@ func TestController_Process_DuplicateDetection(t *testing.T) {
278278
queueName = "test-queue"
279279
newRequestID = queueName + "/123"
280280
dupRequestID = queueName + "/100"
281-
uri = "github://uber/service/pull/1/abc"
281+
uriA = "github://uber/service/pull/1/abc"
282+
uriB = "github://uber/service/pull/2/def"
282283
anotherReqID = queueName + "/050"
283284
orphanReqID = queueName + "/999"
284285
terminalReqID = queueName + "/200"
285286
)
286287

287288
tests := []struct {
288289
name string
289-
overlap []entity.ChangeRecord
290+
requestURIs []string // URIs on the new request; defaults to [uriA]
291+
byURI map[string][]entity.ChangeRecord // GetByURI mock returns
290292
ownerLookup map[string]entity.Request
291293
ownerNotFound map[string]bool
292294
ownerErr map[string]error
293295
wantUserErr bool
294296
wantUnexpected bool
295297
}{
296298
{
297-
name: "no overlap proceeds to merge check",
298-
overlap: nil,
299+
name: "no overlap proceeds to merge check",
300+
byURI: map[string][]entity.ChangeRecord{uriA: nil},
299301
},
300302
{
301-
name: "overlap with live in-flight request returns user error",
302-
overlap: []entity.ChangeRecord{{URI: uri, RequestID: dupRequestID, Queue: queueName}},
303+
name: "overlap with live in-flight request returns user error",
304+
byURI: map[string][]entity.ChangeRecord{
305+
uriA: {{URI: uriA, RequestID: dupRequestID, Queue: queueName}},
306+
},
303307
ownerLookup: map[string]entity.Request{
304308
dupRequestID: {ID: dupRequestID, Queue: queueName, State: entity.RequestStateStarted, Version: 1},
305309
},
306310
wantUserErr: true,
307311
},
308312
{
309-
name: "overlap with terminal owner is skipped",
310-
overlap: []entity.ChangeRecord{{URI: uri, RequestID: terminalReqID, Queue: queueName}},
313+
name: "overlap with terminal owner is skipped",
314+
byURI: map[string][]entity.ChangeRecord{
315+
uriA: {{URI: uriA, RequestID: terminalReqID, Queue: queueName}},
316+
},
311317
ownerLookup: map[string]entity.Request{
312318
terminalReqID: {ID: terminalReqID, Queue: queueName, State: entity.RequestStateLanded, Version: 5},
313319
},
314320
},
315321
{
316-
name: "overlap with orphan owner (ErrNotFound) is skipped",
317-
overlap: []entity.ChangeRecord{{URI: uri, RequestID: orphanReqID, Queue: queueName}},
322+
name: "overlap with orphan owner (ErrNotFound) is skipped",
323+
byURI: map[string][]entity.ChangeRecord{
324+
uriA: {{URI: uriA, RequestID: orphanReqID, Queue: queueName}},
325+
},
318326
ownerNotFound: map[string]bool{orphanReqID: true},
319327
},
320328
{
321-
name: "multiple URIs same owner deduped to single Get call",
322-
overlap: []entity.ChangeRecord{
323-
{URI: uri, RequestID: dupRequestID, Queue: queueName},
324-
{URI: "github://uber/service/pull/2/def", RequestID: dupRequestID, Queue: queueName},
329+
name: "multi-URI same owner deduped to single Get call",
330+
requestURIs: []string{uriA, uriB},
331+
byURI: map[string][]entity.ChangeRecord{
332+
uriA: {{URI: uriA, RequestID: dupRequestID, Queue: queueName}},
333+
uriB: {{URI: uriB, RequestID: dupRequestID, Queue: queueName}},
325334
},
326335
ownerLookup: map[string]entity.Request{
327336
dupRequestID: {ID: dupRequestID, Queue: queueName, State: entity.RequestStateValidated, Version: 2},
328337
},
329338
wantUserErr: true,
330339
},
331340
{
332-
name: "first owner terminal then second live picks the live one",
333-
overlap: []entity.ChangeRecord{
334-
{URI: uri, RequestID: terminalReqID, Queue: queueName},
335-
{URI: "github://uber/service/pull/2/def", RequestID: anotherReqID, Queue: queueName},
341+
name: "first URI's owner is terminal, second URI's owner is live",
342+
requestURIs: []string{uriA, uriB},
343+
byURI: map[string][]entity.ChangeRecord{
344+
uriA: {{URI: uriA, RequestID: terminalReqID, Queue: queueName}},
345+
uriB: {{URI: uriB, RequestID: anotherReqID, Queue: queueName}},
336346
},
337347
ownerLookup: map[string]entity.Request{
338348
terminalReqID: {ID: terminalReqID, State: entity.RequestStateError, Version: 3},
@@ -342,25 +352,29 @@ func TestController_Process_DuplicateDetection(t *testing.T) {
342352
},
343353
{
344354
// Store doesn't exclude self; controller filters by RequestID and must not look up its own row.
345-
name: "self row in overlap is filtered (no Get call)",
346-
overlap: []entity.ChangeRecord{
347-
{URI: uri, RequestID: newRequestID, Queue: queueName},
355+
name: "self row in result is filtered (no Get call)",
356+
byURI: map[string][]entity.ChangeRecord{
357+
uriA: {{URI: uriA, RequestID: newRequestID, Queue: queueName}},
348358
},
349359
},
350360
{
351361
name: "self row mixed with live other returns the other",
352-
overlap: []entity.ChangeRecord{
353-
{URI: uri, RequestID: newRequestID, Queue: queueName},
354-
{URI: uri, RequestID: dupRequestID, Queue: queueName},
362+
byURI: map[string][]entity.ChangeRecord{
363+
uriA: {
364+
{URI: uriA, RequestID: newRequestID, Queue: queueName},
365+
{URI: uriA, RequestID: dupRequestID, Queue: queueName},
366+
},
355367
},
356368
ownerLookup: map[string]entity.Request{
357369
dupRequestID: {ID: dupRequestID, Queue: queueName, State: entity.RequestStateStarted, Version: 1},
358370
},
359371
wantUserErr: true,
360372
},
361373
{
362-
name: "owner lookup unexpected error propagates",
363-
overlap: []entity.ChangeRecord{{URI: uri, RequestID: dupRequestID, Queue: queueName}},
374+
name: "owner lookup unexpected error propagates",
375+
byURI: map[string][]entity.ChangeRecord{
376+
uriA: {{URI: uriA, RequestID: dupRequestID, Queue: queueName}},
377+
},
364378
ownerErr: map[string]error{
365379
dupRequestID: fmt.Errorf("db down"),
366380
},
@@ -373,10 +387,15 @@ func TestController_Process_DuplicateDetection(t *testing.T) {
373387
ctrl := gomock.NewController(t)
374388
mc := newMergeableMock(ctrl)
375389

390+
uris := tt.requestURIs
391+
if uris == nil {
392+
uris = []string{uriA}
393+
}
394+
376395
request := entity.Request{
377396
ID: newRequestID,
378397
Queue: queueName,
379-
Change: entity.Change{URIs: []string{uri}},
398+
Change: entity.Change{URIs: uris},
380399
LandStrategy: entity.RequestLandStrategyRebase,
381400
State: entity.RequestStateStarted,
382401
Version: 1,
@@ -397,7 +416,11 @@ func TestController_Process_DuplicateDetection(t *testing.T) {
397416
store.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
398417

399418
cs := changemock.NewMockChangeStore(ctrl)
400-
cs.EXPECT().FindOverlapping(gomock.Any(), queueName, []string{uri}).Return(tt.overlap, nil)
419+
// One GetByURI per URI on the request, in order. Controller short-circuits on first
420+
// live duplicate, so .AnyTimes() lets unmatched URIs go un-queried.
421+
for _, u := range uris {
422+
cs.EXPECT().GetByURI(gomock.Any(), queueName, u).Return(tt.byURI[u], nil).MaxTimes(1)
423+
}
401424

402425
controller := newTestController(t, ctrl, store, cs, mc, nil)
403426

@@ -437,7 +460,7 @@ func TestController_Process_ChangeStoreQueryFailure(t *testing.T) {
437460
store, _ := newMockStorage(ctrl, request)
438461

439462
cs := changemock.NewMockChangeStore(ctrl)
440-
cs.EXPECT().FindOverlapping(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down"))
463+
cs.EXPECT().GetByURI(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("change store down"))
441464

442465
controller := newTestController(t, ctrl, store, cs, mc, nil)
443466

0 commit comments

Comments
 (0)