Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ SubmitQueue is a distributed system for managing code submission workflows. It f
1. **Immutable entities** — once created, don't modify in place. Create new versions with updated fields.
2. **Eventual consistency** — handle stale reads, idempotent operations, and convergence over time.
3. **Event sourcing** — store events (what happened) rather than just current state for critical changes.
4. **Optimistic locking** — use version numbers instead of pessimistic locks. Avoid transactions; prefer optimistic concurrency and retries.
4. **Optimistic locking** — use version numbers instead of pessimistic locks. Avoid transactions; prefer optimistic concurrency and retries. **Version arithmetic lives in the controller, not the storage layer.** Update methods take both `oldVersion` (the where-clause guard) and `newVersion` (the value to write); the store performs a pure conditional write. Controllers compute `newVersion = oldVersion + 1`, call the store, and only assign `entity.Version = newVersion` after the call succeeds. Pre-incrementing in memory before the call is a bug pattern — on error the in-memory version drifts ahead of the database. See [extension/storage/README.md](extension/storage/README.md).
5. **Idempotency keys** — include unique request IDs, check for duplicates before executing.

```go
Expand All @@ -22,16 +22,12 @@ type Request struct {
UpdatedAt int64
}

// Instead of mutating, create new version
func (r Request) WithStatus(status Status) Request {
return Request{
ID: r.ID,
Version: r.Version + 1,
Status: status,
CreatedAt: r.CreatedAt,
UpdatedAt: time.Now().UnixMilli(),
}
// Controller pattern — version arithmetic outside storage, assigned only on success
newVersion := request.Version + 1
if err := store.UpdateStatus(ctx, request.ID, request.Version, newVersion, newStatus); err != nil {
return err
}
request.Version = newVersion
```

## Architecture
Expand Down
27 changes: 27 additions & 0 deletions extension/storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Storage

Pluggable persistence interfaces for SubmitQueue entities (requests, batches, dependents, logs, etc.). Implementations live under `extension/storage/<impl>/`.

## Optimistic locking contract

Entities that support concurrent mutation carry an `int32 Version` field. Updates are conditional on the version: the write only succeeds if the persisted version matches the caller's expected version. On mismatch, the implementation returns `storage.ErrVersionMismatch`.

**Version arithmetic is owned by the controller, not the store.** Update methods take both `oldVersion` (the where-clause guard) and `newVersion` (the value to write):

```go
UpdateState(ctx, id, oldVersion, newVersion int32, newState entity.RequestState) error
```

The store performs a pure conditional write — it does not compute `oldVersion + 1` internally. This keeps the in-memory entity and the persisted row in sync without the storage layer mutating values the caller didn't supply.

### Caller pattern

```go
newVersion := entity.Version + 1
if err := store.UpdateState(ctx, entity.ID, entity.Version, newVersion, newState); err != nil {
return err // entity.Version unchanged on failure — safe to retry
}
entity.Version = newVersion // only after the write succeeded
```

The post-success assignment matters whenever the entity is read again later in the same flow. Pre-incrementing in memory before the call is a bug pattern: if the call fails and the caller swallows the error, the in-memory version is now ahead of the database and subsequent updates will fail with `ErrVersionMismatch` for non-obvious reasons.
7 changes: 4 additions & 3 deletions extension/storage/batch_dependent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type BatchDependentStore interface {
// Returns ErrAlreadyExists if the entry already exists.
Create(ctx context.Context, batchDependent entity.BatchDependent) error

// UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch.
UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) error
// UpdateDependents updates the dependents of a batch dependent and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; the store performs a pure conditional write.
UpdateDependents(ctx context.Context, batchID string, oldVersion, newVersion int32, dependents []string) error
}
16 changes: 9 additions & 7 deletions extension/storage/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ type BatchStore interface {
// Returns ErrAlreadyExists if a batch with the same ID already exists.
Create(ctx context.Context, batch entity.Batch) error

// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
// The implementation should increment the version by 1 atomically with the state update.
UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) error

// UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch. The implementation should increment the version by 1 atomically.
UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) error
// UpdateState updates the state of a batch to newState and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; the store performs a pure conditional write.
UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.BatchState) error

// UpdateScoreAndState atomically updates the score and state of a batch and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; the store performs a pure conditional write.
UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) error

// GetByQueueAndStates retrieves all batches that belong to the given queue and are in the given states.
GetByQueueAndStates(ctx context.Context, queue string, states []entity.BatchState) ([]entity.Batch, error)
Expand Down
8 changes: 4 additions & 4 deletions extension/storage/mock/batch_dependent_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions extension/storage/mock/batch_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions extension/storage/mock/request_store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions extension/storage/mysql/batch_dependent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ func (s *batchDependentStore) Create(ctx context.Context, batchDependent entity.
return nil
}

// UpdateDependents updates the dependents of a batch dependent if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch.
// The implementation increments the version by 1 atomically with the dependents update.
func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, version int32, dependents []string) (retErr error) {
// UpdateDependents updates the dependents of a batch dependent and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; this is a pure conditional write.
func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID string, oldVersion, newVersion int32, dependents []string) (retErr error) {
op := metrics.Begin(s.scope, "update_dependents")
defer func() { op.Complete(retErr) }()

Expand All @@ -104,28 +104,28 @@ func (s *batchDependentStore) UpdateDependents(ctx context.Context, batchID stri
}

result, err := s.db.ExecContext(ctx,
"UPDATE batch_dependent SET dependents = ?, version = version + 1 WHERE batch_id = ? AND version = ?",
dependentsJSON, batchID, version,
"UPDATE batch_dependent SET dependents = ?, version = ? WHERE batch_id = ? AND version = ?",
dependentsJSON, newVersion, batchID, oldVersion,
)
if err != nil {
return fmt.Errorf(
"failed to update batch dependent dependents for batchID=%q version=%d: %w",
batchID, version, err,
"failed to update batch dependent dependents for batchID=%q oldVersion=%d newVersion=%d: %w",
batchID, oldVersion, newVersion, err,
)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf(
"failed to get rows affected from update for batchID=%q version=%d: %w",
batchID, version, err,
"failed to get rows affected from update for batchID=%q oldVersion=%d newVersion=%d: %w",
batchID, oldVersion, newVersion, err,
)
}

if rowsAffected != 1 {
return fmt.Errorf(
"version mismatch for batch dependent update: batchID=%q expected_version=%d: %w",
batchID, version, storage.ErrVersionMismatch,
batchID, oldVersion, storage.ErrVersionMismatch,
)
}

Expand Down
42 changes: 22 additions & 20 deletions extension/storage/mysql/batch_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,70 +102,72 @@ func (s *batchStore) Create(ctx context.Context, batch entity.Batch) (retErr err
return nil
}

// UpdateState updates the state of a batch if the current version matches the expected version. If versions do not match, returns ErrVersionMismatch.
// The implementation increments the version by 1 atomically with the state update.
func (s *batchStore) UpdateState(ctx context.Context, id string, version int32, newState entity.BatchState) (retErr error) {
// UpdateState updates the state of a batch to newState and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; this is a pure conditional write.
func (s *batchStore) UpdateState(ctx context.Context, id string, oldVersion, newVersion int32, newState entity.BatchState) (retErr error) {
op := metrics.Begin(s.scope, "update_state")
defer func() { op.Complete(retErr) }()

result, err := s.db.ExecContext(ctx,
"UPDATE batch SET state = ?, version = version + 1 WHERE id = ? AND version = ?",
newState, id, version,
"UPDATE batch SET state = ?, version = ? WHERE id = ? AND version = ?",
newState, newVersion, id, oldVersion,
)
if err != nil {
return fmt.Errorf(
"failed to update batch state for id=%q version=%d newState=%v: %w",
id, version, newState, err,
"failed to update batch state for id=%q oldVersion=%d newVersion=%d newState=%v: %w",
id, oldVersion, newVersion, newState, err,
)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf(
"failed to get rows affected from update for id=%q version=%d newState=%v: %w",
id, version, newState, err,
"failed to get rows affected from update for id=%q oldVersion=%d newVersion=%d newState=%v: %w",
id, oldVersion, newVersion, newState, err,
)
}

if rowsAffected != 1 {
return fmt.Errorf(
"version mismatch for batch update: id=%q expected_version=%d newState=%v: %w",
id, version, newState, storage.ErrVersionMismatch,
id, oldVersion, newState, storage.ErrVersionMismatch,
)
}

return nil
}

// UpdateScoreAndState atomically updates the score and state of a batch if the current version matches the expected version.
// If versions do not match, returns ErrVersionMismatch. The implementation increments the version by 1 atomically.
func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, version int32, score float64, newState entity.BatchState) (retErr error) {
// UpdateScoreAndState atomically updates the score and state of a batch and the version to newVersion
// if the current persisted version matches oldVersion. If versions do not match, returns ErrVersionMismatch.
// Version arithmetic is owned by the caller; this is a pure conditional write.
func (s *batchStore) UpdateScoreAndState(ctx context.Context, id string, oldVersion, newVersion int32, score float64, newState entity.BatchState) (retErr error) {
op := metrics.Begin(s.scope, "update_score_and_state")
defer func() { op.Complete(retErr) }()

result, err := s.db.ExecContext(ctx,
"UPDATE batch SET score = ?, state = ?, version = version + 1 WHERE id = ? AND version = ?",
score, newState, id, version,
"UPDATE batch SET score = ?, state = ?, version = ? WHERE id = ? AND version = ?",
score, newState, newVersion, id, oldVersion,
)
if err != nil {
return fmt.Errorf(
"failed to update batch score and state for id=%q version=%d score=%f newState=%v: %w",
id, version, score, newState, err,
"failed to update batch score and state for id=%q oldVersion=%d newVersion=%d score=%f newState=%v: %w",
id, oldVersion, newVersion, score, newState, err,
)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf(
"failed to get rows affected from update score and state for id=%q version=%d score=%f newState=%v: %w",
id, version, score, newState, err,
"failed to get rows affected from update score and state for id=%q oldVersion=%d newVersion=%d score=%f newState=%v: %w",
id, oldVersion, newVersion, score, newState, err,
)
}

if rowsAffected != 1 {
return fmt.Errorf(
"version mismatch for batch update score and state: id=%q expected_version=%d score=%f newState=%v: %w",
id, version, score, newState, storage.ErrVersionMismatch,
id, oldVersion, score, newState, storage.ErrVersionMismatch,
)
}

Expand Down
Loading
Loading