diff --git a/CLAUDE.md b/CLAUDE.md index bcfac746..b34a6847 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -9,7 +9,7 @@ SubmitQueue is a distributed system for managing code submission workflows. It f 1. **Immutable entities** — once created, don't modify in place. Create new versions with updated fields. 2. **Eventual consistency** — handle stale reads, idempotent operations, and convergence over time. 3. **Event sourcing** — store events (what happened) rather than just current state for critical changes. -4. **Optimistic locking** — use version numbers instead of pessimistic locks. Avoid transactions; prefer optimistic concurrency and retries. +4. **Optimistic locking** — use version numbers instead of pessimistic locks. Avoid transactions; prefer optimistic concurrency and retries. **Version arithmetic lives in the controller, not the storage layer.** Update methods take both `oldVersion` (the where-clause guard) and `newVersion` (the value to write); the store performs a pure conditional write. Controllers compute `newVersion = oldVersion + 1`, call the store, and only assign `entity.Version = newVersion` after the call succeeds. Pre-incrementing in memory before the call is a bug pattern — on error the in-memory version drifts ahead of the database. See [extension/storage/README.md](extension/storage/README.md). 5. **Idempotency keys** — include unique request IDs, check for duplicates before executing. ```go @@ -22,16 +22,12 @@ type Request struct { UpdatedAt int64 } -// Instead of mutating, create new version -func (r Request) WithStatus(status Status) Request { - return Request{ - ID: r.ID, - Version: r.Version + 1, - Status: status, - CreatedAt: r.CreatedAt, - UpdatedAt: time.Now().UnixMilli(), - } +// Controller pattern — version arithmetic outside storage, assigned only on success +newVersion := request.Version + 1 +if err := store.UpdateStatus(ctx, request.ID, request.Version, newVersion, newStatus); err != nil { + return err } +request.Version = newVersion ``` ## Architecture diff --git a/extension/storage/README.md b/extension/storage/README.md new file mode 100644 index 00000000..fe53f2c6 --- /dev/null +++ b/extension/storage/README.md @@ -0,0 +1,27 @@ +# Storage + +Pluggable persistence interfaces for SubmitQueue entities (requests, batches, dependents, logs, etc.). Implementations live under `extension/storage//`. + +## Optimistic locking contract + +Entities that support concurrent mutation carry an `int32 Version` field. Updates are conditional on the version: the write only succeeds if the persisted version matches the caller's expected version. On mismatch, the implementation returns `storage.ErrVersionMismatch`. + +**Version arithmetic is owned by the controller, not the store.** Update methods take both `oldVersion` (the where-clause guard) and `newVersion` (the value to write): + +```go +UpdateState(ctx, id, oldVersion, newVersion int32, newState entity.RequestState) error +``` + +The store performs a pure conditional write — it does not compute `oldVersion + 1` internally. This keeps the in-memory entity and the persisted row in sync without the storage layer mutating values the caller didn't supply. + +### Caller pattern + +```go +newVersion := entity.Version + 1 +if err := store.UpdateState(ctx, entity.ID, entity.Version, newVersion, newState); err != nil { + return err // entity.Version unchanged on failure — safe to retry +} +entity.Version = newVersion // only after the write succeeded +``` + +The post-success assignment matters whenever the entity is read again later in the same flow. Pre-incrementing in memory before the call is a bug pattern: if the call fails and the caller swallows the error, the in-memory version is now ahead of the database and subsequent updates will fail with `ErrVersionMismatch` for non-obvious reasons. diff --git a/extension/storage/batch_dependent_store.go b/extension/storage/batch_dependent_store.go index 4c062f96..0dc2d1ac 100644 --- a/extension/storage/batch_dependent_store.go +++ b/extension/storage/batch_dependent_store.go @@ -32,7 +32,8 @@ type BatchDependentStore interface { // Returns ErrAlreadyExists if the entry already exists. Create(ctx context.Context, batchDependent entity.BatchDependent) error - // UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version. - // If versions do not match, returns ErrVersionMismatch. - UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) error + // UpdateDependents updates the dependents of a batch dependent and the version to newVersion + // if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. + // Version arithmetic is owned by the caller; the store performs a pure conditional write. + UpdateDependents(ctx context.Context, batchID string, oldVersion, newVersion int32, dependents []string) error } diff --git a/extension/storage/batch_store.go b/extension/storage/batch_store.go index 412a90cb..fa432e62 100644 --- a/extension/storage/batch_store.go +++ b/extension/storage/batch_store.go @@ -31,13 +31,15 @@ type BatchStore interface { // Returns ErrAlreadyExists if a batch with the same ID already exists. Create(ctx context.Context, batch entity.Batch) error - // UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. - // The implementation should increment the version by 1 atomically with the state update. - UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error - - // UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version. - // If versions do not match, returns ErrVersionMismatch. The implementation should increment the version by 1 atomically. - UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) error + // UpdateState updates the state of a batch to newState and the version to newVersion + // if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. + // Version arithmetic is owned by the caller; the store performs a pure conditional write. + UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.BatchState) error + + // UpdateScoreAndState atomically updates the score and state of a batch and the version to newVersion + // if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. + // Version arithmetic is owned by the caller; the store performs a pure conditional write. + UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error // GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states. GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error) diff --git a/extension/storage/mock/batch_dependent_store_mock.go b/extension/storage/mock/batch_dependent_store_mock.go index 06269fc9..f29e6a5d 100644 --- a/extension/storage/mock/batch_dependent_store_mock.go +++ b/extension/storage/mock/batch_dependent_store_mock.go @@ -71,15 +71,15 @@ func (mr *MockBatchDependentStoreMockRecorder) Get(ctx, batchID any) *gomock.Cal } // UpdateDependents mocks base method. -func (m *MockBatchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) error { +func (m *MockBatchDependentStore) UpdateDependents(ctx context.Context, batchID string, oldVersion, newVersion int32, dependents []string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateDependents", ctx, batchID, version, dependents) + ret := m.ctrl.Call(m, "UpdateDependents", ctx, batchID, oldVersion, newVersion, dependents) ret0, _ := ret[0].(error) return ret0 } // UpdateDependents indicates an expected call of UpdateDependents. -func (mr *MockBatchDependentStoreMockRecorder) UpdateDependents(ctx, batchID, version, dependents any) *gomock.Call { +func (mr *MockBatchDependentStoreMockRecorder) UpdateDependents(ctx, batchID, oldVersion, newVersion, dependents any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDependents", reflect.TypeOf((*MockBatchDependentStore)(nil).UpdateDependents), ctx, batchID, version, dependents) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateDependents", reflect.TypeOf((*MockBatchDependentStore)(nil).UpdateDependents), ctx, batchID, oldVersion, newVersion, dependents) } diff --git a/extension/storage/mock/batch_store_mock.go b/extension/storage/mock/batch_store_mock.go index 4b79c0d5..87297303 100644 --- a/extension/storage/mock/batch_store_mock.go +++ b/extension/storage/mock/batch_store_mock.go @@ -86,29 +86,29 @@ func (mr *MockBatchStoreMockRecorder) GetByQueueAndStates(ctx, queue, states any } // UpdateScoreAndState mocks base method. -func (m *MockBatchStore) UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) error { +func (m *MockBatchStore) UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateScoreAndState", ctx, id, version, score, newState) + ret := m.ctrl.Call(m, "UpdateScoreAndState", ctx, id, oldVersion, newVersion, score, newState) ret0, _ := ret[0].(error) return ret0 } // UpdateScoreAndState indicates an expected call of UpdateScoreAndState. -func (mr *MockBatchStoreMockRecorder) UpdateScoreAndState(ctx, id, version, score, newState any) *gomock.Call { +func (mr *MockBatchStoreMockRecorder) UpdateScoreAndState(ctx, id, oldVersion, newVersion, score, newState any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScoreAndState", reflect.TypeOf((*MockBatchStore)(nil).UpdateScoreAndState), ctx, id, version, score, newState) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScoreAndState", reflect.TypeOf((*MockBatchStore)(nil).UpdateScoreAndState), ctx, id, oldVersion, newVersion, score, newState) } // UpdateState mocks base method. -func (m *MockBatchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error { +func (m *MockBatchStore) UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.BatchState) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateState", ctx, id, version, newState) + ret := m.ctrl.Call(m, "UpdateState", ctx, id, oldVersion, newVersion, newState) ret0, _ := ret[0].(error) return ret0 } // UpdateState indicates an expected call of UpdateState. -func (mr *MockBatchStoreMockRecorder) UpdateState(ctx, id, version, newState any) *gomock.Call { +func (mr *MockBatchStoreMockRecorder) UpdateState(ctx, id, oldVersion, newVersion, newState any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockBatchStore)(nil).UpdateState), ctx, id, version, newState) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockBatchStore)(nil).UpdateState), ctx, id, oldVersion, newVersion, newState) } diff --git a/extension/storage/mock/request_store_mock.go b/extension/storage/mock/request_store_mock.go index 7ee16efe..2c5bdeeb 100644 --- a/extension/storage/mock/request_store_mock.go +++ b/extension/storage/mock/request_store_mock.go @@ -71,15 +71,15 @@ func (mr *MockRequestStoreMockRecorder) Get(ctx, id any) *gomock.Call { } // UpdateState mocks base method. -func (m *MockRequestStore) UpdateState(ctx context.Context, id string, version int32, newState entity.RequestState) error { +func (m *MockRequestStore) UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.RequestState) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateState", ctx, id, version, newState) + ret := m.ctrl.Call(m, "UpdateState", ctx, id, oldVersion, newVersion, newState) ret0, _ := ret[0].(error) return ret0 } // UpdateState indicates an expected call of UpdateState. -func (mr *MockRequestStoreMockRecorder) UpdateState(ctx, id, version, newState any) *gomock.Call { +func (mr *MockRequestStoreMockRecorder) UpdateState(ctx, id, oldVersion, newVersion, newState any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockRequestStore)(nil).UpdateState), ctx, id, version, newState) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockRequestStore)(nil).UpdateState), ctx, id, oldVersion, newVersion, newState) } diff --git a/extension/storage/mysql/batch_dependent_store.go b/extension/storage/mysql/batch_dependent_store.go index d2fa8677..f7cc54a5 100644 --- a/extension/storage/mysql/batch_dependent_store.go +++ b/extension/storage/mysql/batch_dependent_store.go @@ -91,10 +91,10 @@ func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity. return nil } -// UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version. -// If versions do not match, returns ErrVersionMismatch. -// The implementation increments the version by 1 atomically with the dependents update. -func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) (retErr error) { +// UpdateDependents updates the dependents of a batch dependent and the version to newVersion +// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. +// Version arithmetic is owned by the caller; this is a pure conditional write. +func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, oldVersion, newVersion int32, dependents []string) (retErr error) { op := metrics.Begin(s.scope, "update_dependents") defer func() { op.Complete(retErr) }() @@ -104,28 +104,28 @@ func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID stri } result, err := s.db.ExecContext(ctx, - "UPDATE batch_dependent SET dependents = ?, version = version + 1 WHERE batch_id = ? AND version = ?", - dependentsJSON, batchID, version, + "UPDATE batch_dependent SET dependents = ?, version = ? WHERE batch_id = ? AND version = ?", + dependentsJSON, newVersion, batchID, oldVersion, ) if err != nil { return fmt.Errorf( - "failed to update batch dependent dependents for batchID=%q version=%d: %w", - batchID, version, err, + "failed to update batch dependent dependents for batchID=%q oldVersion=%d newVersion=%d: %w", + batchID, oldVersion, newVersion, err, ) } rowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf( - "failed to get rows affected from update for batchID=%q version=%d: %w", - batchID, version, err, + "failed to get rows affected from update for batchID=%q oldVersion=%d newVersion=%d: %w", + batchID, oldVersion, newVersion, err, ) } if rowsAffected != 1 { return fmt.Errorf( "version mismatch for batch dependent update: batchID=%q expected_version=%d: %w", - batchID, version, storage.ErrVersionMismatch, + batchID, oldVersion, storage.ErrVersionMismatch, ) } diff --git a/extension/storage/mysql/batch_store.go b/extension/storage/mysql/batch_store.go index b14b4fe7..b2eae76a 100644 --- a/extension/storage/mysql/batch_store.go +++ b/extension/storage/mysql/batch_store.go @@ -102,70 +102,72 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err return nil } -// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. -// The implementation increments the version by 1 atomically with the state update. -func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) (retErr error) { +// UpdateState updates the state of a batch to newState and the version to newVersion +// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. +// Version arithmetic is owned by the caller; this is a pure conditional write. +func (s *batchStore) UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.BatchState) (retErr error) { op := metrics.Begin(s.scope, "update_state") defer func() { op.Complete(retErr) }() result, err := s.db.ExecContext(ctx, - "UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?", - newState, id, version, + "UPDATE batch SET state = ?, version = ? WHERE id = ? AND version = ?", + newState, newVersion, id, oldVersion, ) if err != nil { return fmt.Errorf( - "failed to update batch state for id=%q version=%d newState=%v: %w", - id, version, newState, err, + "failed to update batch state for id=%q oldVersion=%d newVersion=%d newState=%v: %w", + id, oldVersion, newVersion, newState, err, ) } rowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf( - "failed to get rows affected from update for id=%q version=%d newState=%v: %w", - id, version, newState, err, + "failed to get rows affected from update for id=%q oldVersion=%d newVersion=%d newState=%v: %w", + id, oldVersion, newVersion, newState, err, ) } if rowsAffected != 1 { return fmt.Errorf( "version mismatch for batch update: id=%q expected_version=%d newState=%v: %w", - id, version, newState, storage.ErrVersionMismatch, + id, oldVersion, newState, storage.ErrVersionMismatch, ) } return nil } -// UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version. -// If versions do not match, returns ErrVersionMismatch. The implementation increments the version by 1 atomically. -func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) (retErr error) { +// UpdateScoreAndState atomically updates the score and state of a batch and the version to newVersion +// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. +// Version arithmetic is owned by the caller; this is a pure conditional write. +func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) (retErr error) { op := metrics.Begin(s.scope, "update_score_and_state") defer func() { op.Complete(retErr) }() result, err := s.db.ExecContext(ctx, - "UPDATE batch SET score = ?, state = ?, version = version + 1 WHERE id = ? AND version = ?", - score, newState, id, version, + "UPDATE batch SET score = ?, state = ?, version = ? WHERE id = ? AND version = ?", + score, newState, newVersion, id, oldVersion, ) if err != nil { return fmt.Errorf( - "failed to update batch score and state for id=%q version=%d score=%f newState=%v: %w", - id, version, score, newState, err, + "failed to update batch score and state for id=%q oldVersion=%d newVersion=%d score=%f newState=%v: %w", + id, oldVersion, newVersion, score, newState, err, ) } rowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf( - "failed to get rows affected from update score and state for id=%q version=%d score=%f newState=%v: %w", - id, version, score, newState, err, + "failed to get rows affected from update score and state for id=%q oldVersion=%d newVersion=%d score=%f newState=%v: %w", + id, oldVersion, newVersion, score, newState, err, ) } if rowsAffected != 1 { return fmt.Errorf( "version mismatch for batch update score and state: id=%q expected_version=%d score=%f newState=%v: %w", - id, version, score, newState, storage.ErrVersionMismatch, + id, oldVersion, score, newState, storage.ErrVersionMismatch, ) } diff --git a/extension/storage/mysql/request_store.go b/extension/storage/mysql/request_store.go index fc4cb3f4..078cf954 100644 --- a/extension/storage/mysql/request_store.go +++ b/extension/storage/mysql/request_store.go @@ -95,35 +95,36 @@ func (r *requestStore) Create(ctx context.Context, request entity.Request) (retE return nil } -// UpdateState updates the state of a land request if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. -// The implementation increments the version by 1 atomically with the state update. -func (r *requestStore) UpdateState(ctx context.Context, id string, version int32, newState entity.RequestState) (retErr error) { +// UpdateState updates the state of a land request to newState and the version to newVersion +// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. +// Version arithmetic is owned by the caller; this is a pure conditional write. +func (r *requestStore) UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.RequestState) (retErr error) { op := metrics.Begin(r.scope, "update_state") defer func() { op.Complete(retErr) }() result, err := r.db.ExecContext(ctx, - "UPDATE request SET state = ?, version = version + 1 WHERE id = ? AND version = ?", - newState, id, version, + "UPDATE request SET state = ?, version = ? WHERE id = ? AND version = ?", + newState, newVersion, id, oldVersion, ) if err != nil { return fmt.Errorf( - "failed to update request state for id=%q version=%d newState=%v: %w", - id, version, newState, err, + "failed to update request state for id=%q oldVersion=%d newVersion=%d newState=%v: %w", + id, oldVersion, newVersion, newState, err, ) } rowsAffected, err := result.RowsAffected() if err != nil { return fmt.Errorf( - "failed to get rows affected from update for id=%q version=%d newState=%v: %w", - id, version, newState, err, + "failed to get rows affected from update for id=%q oldVersion=%d newVersion=%d newState=%v: %w", + id, oldVersion, newVersion, newState, err, ) } if rowsAffected != 1 { return fmt.Errorf( "version mismatch for request update: id=%q expected_version=%d newState=%v: %w", - id, version, newState, storage.ErrVersionMismatch, + id, oldVersion, newState, storage.ErrVersionMismatch, ) } diff --git a/extension/storage/request_store.go b/extension/storage/request_store.go index 49a3a129..1abd87a2 100644 --- a/extension/storage/request_store.go +++ b/extension/storage/request_store.go @@ -31,7 +31,8 @@ type RequestStore interface { // Returns ErrAlreadyExists if a request with the same ID already exists. Create(ctx context.Context, request entity.Request) error - // UpdateState updates the state of a land request if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch. - // The implementation should increment the version by 1 atomically with the state update. - UpdateState(ctx context.Context, id string, version int32, newState entity.RequestState) error + // UpdateState updates the state of a land request to newState and the version to newVersion + // if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch. + // Version arithmetic is owned by the caller; the store performs a pure conditional write. + UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.RequestState) error } diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 1a1cf9c9..ddfb5259 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -143,10 +143,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er dependents := append(existing.Dependents, batch.ID) - if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, dependents); err != nil { + newVersion := existing.Version + 1 + if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, newVersion, dependents); err != nil { c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", dep.ID, batch.ID, err) } + existing.Version = newVersion } // Create new reverse index entry for the new batch. It would be empty for now, but will be updated as new batches are created that conflict with this batch. diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index b480f3c1..91d41331 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -211,14 +211,14 @@ func TestController_Process_WithDependencies(t *testing.T) { BatchID: "test-queue/batch/1", Version: 1, }, nil) - mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/1", int32(1), gomock.Any()).Return(nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/1", int32(1), int32(2), gomock.Any()).Return(nil) // batch/2 already has an existing dependent. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ BatchID: "test-queue/batch/2", Dependents: []string{"test-queue/batch/99"}, Version: 2, }, nil) - mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(2), gomock.Any()).Return(nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(2), int32(3), gomock.Any()).Return(nil) // Create empty reverse index for the new batch. mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) diff --git a/orchestrator/controller/conclude/conclude.go b/orchestrator/controller/conclude/conclude.go index 1c454e27..a955d472 100644 --- a/orchestrator/controller/conclude/conclude.go +++ b/orchestrator/controller/conclude/conclude.go @@ -111,10 +111,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r return fmt.Errorf("failed to get request %s: %w", requestID, err) } - if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, requestState); err != nil { + newVersion := request.Version + 1 + if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, newVersion, requestState); err != nil { metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1) return fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err) } + request.Version = newVersion c.logger.Infow("updated request state", "batch_id", batch.ID, diff --git a/orchestrator/controller/conclude/conclude_test.go b/orchestrator/controller/conclude/conclude_test.go index 990e139d..45596ad7 100644 --- a/orchestrator/controller/conclude/conclude_test.go +++ b/orchestrator/controller/conclude/conclude_test.go @@ -100,11 +100,11 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, }, nil) - mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), int32(3), entity.RequestStateLanded).Return(nil) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/2").Return(entity.Request{ ID: "test-queue/2", Version: 3, State: entity.RequestStateProcessing, }, nil) - mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/2", int32(3), entity.RequestStateLanded).Return(nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/2", int32(3), int32(4), entity.RequestStateLanded).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -135,7 +135,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/5").Return(entity.Request{ ID: "test-queue/5", Version: 1, State: entity.RequestStateProcessing, }, nil) - mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/5", int32(1), entity.RequestStateError).Return(nil) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/5", int32(1), int32(2), entity.RequestStateError).Return(nil) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() @@ -248,7 +248,7 @@ func TestController_Process(t *testing.T) { mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(entity.Request{ ID: "test-queue/1", Version: 2, State: entity.RequestStateProcessing, }, nil) - mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), entity.RequestStateLanded).Return(storage.ErrVersionMismatch) + mockRequestStore.EXPECT().UpdateState(gomock.Any(), "test-queue/1", int32(2), int32(3), entity.RequestStateLanded).Return(storage.ErrVersionMismatch) mockStorage := storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() diff --git a/orchestrator/controller/score/score.go b/orchestrator/controller/score/score.go index f3f3b687..87d5d222 100644 --- a/orchestrator/controller/score/score.go +++ b/orchestrator/controller/score/score.go @@ -107,10 +107,12 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } // Atomically update score and state to "scored" in the database - if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, batchScore, entity.BatchStateScored); err != nil { + newVersion := batch.Version + 1 + if err := c.store.GetBatchStore().UpdateScoreAndState(ctx, batch.ID, batch.Version, newVersion, batchScore, entity.BatchStateScored); err != nil { c.metricsScope.Counter("storage_errors").Inc(1) return fmt.Errorf("failed to update score for batch %s: %w", batch.ID, err) } + batch.Version = newVersion c.logger.Infow("scored batch", "batch_id", batch.ID, diff --git a/orchestrator/controller/score/score_test.go b/orchestrator/controller/score/score_test.go index 6face5d7..1ed89a00 100644 --- a/orchestrator/controller/score/score_test.go +++ b/orchestrator/controller/score/score_test.go @@ -68,7 +68,7 @@ func testRequest() entity.Request { func newMockStorage(ctrl *gomock.Controller, batch entity.Batch, request entity.Request) *storagemock.MockStorage { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil).AnyTimes() - mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, gomock.Any(), entity.BatchStateScored).Return(nil).AnyTimes() + mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, gomock.Any(), entity.BatchStateScored).Return(nil).AnyTimes() mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil).AnyTimes() @@ -169,7 +169,7 @@ func TestController_Process_MultipleRequests_MinScore(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) // Expect the multiplicative score (0.9 * 0.6 = 0.54) to be persisted - mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, 0.54, entity.BatchStateScored).Return(nil) + mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, 0.54, entity.BatchStateScored).Return(nil) mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), "test-queue/1").Return(request1, nil) @@ -253,7 +253,7 @@ func TestController_Process_UpdateScoreFailure(t *testing.T) { mockBatchStore := storagemock.NewMockBatchStore(ctrl) mockBatchStore.EXPECT().Get(gomock.Any(), batch.ID).Return(batch, nil) - mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, gomock.Any(), entity.BatchStateScored).Return(fmt.Errorf("version mismatch")) + mockBatchStore.EXPECT().UpdateScoreAndState(gomock.Any(), batch.ID, batch.Version, batch.Version+1, gomock.Any(), entity.BatchStateScored).Return(fmt.Errorf("version mismatch")) mockRequestStore := storagemock.NewMockRequestStore(ctrl) mockRequestStore.EXPECT().Get(gomock.Any(), request.ID).Return(request, nil) diff --git a/test/integration/extension/storage/suite.go b/test/integration/extension/storage/suite.go index 77825d5f..fe1e65d5 100644 --- a/test/integration/extension/storage/suite.go +++ b/test/integration/extension/storage/suite.go @@ -143,7 +143,7 @@ func (s *StorageContractSuite) TestStorage_UpdateState() { require.NoError(t, err) // Update state - err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, request.Version, entity.RequestStateProcessing) + err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, request.Version, request.Version+1, entity.RequestStateProcessing) require.NoError(t, err, "failed to update request state") // Verify update @@ -173,11 +173,11 @@ func (s *StorageContractSuite) TestStorage_OptimisticLocking() { require.NoError(t, err) // Update with correct version - err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, 1, entity.RequestStateProcessing) + err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, 1, 2, entity.RequestStateProcessing) require.NoError(t, err, "update with correct version should succeed") // Try to update with stale version (should fail) - err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, 1, entity.RequestStateLanded) + err = s.storage.GetRequestStore().UpdateState(ctx, request.ID, 1, 2, entity.RequestStateLanded) assert.Error(t, err, "update with stale version should fail") assert.ErrorIs(t, err, storage.ErrVersionMismatch, "should return ErrVersionMismatch")