Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ MODULE.bazel.lock
.idea/
.claude/
.mcp.json
.mcp.json.bak
.ijwb/


Expand Down
25 changes: 18 additions & 7 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,39 @@ func (s BatchState) IsTerminal() bool {
type Batch struct {
// ID is the globally unique identifier for the batch. Format: "<queue>/batch/<counter_value>".
ID string

// Queue is the name of the queue processing the land request. Queue name is defined in the configuration and should be unique within the system.
Queue string

// Contains is a list of land request IDs that are part of this batch.
// Request IDs will always be part of the same queue.
//
// For e.g. - [queueA/1, queueA/2, queueA/3].
//
Contains []string
// Dependencies is a list of batch IDs (and associated metadata) for this batch.
// Dependencies will always be part of the same queue.

// Dependencies is a list of other batch IDs that this batch depends on.
// Dependencies will always be part of the same queue. This way batches form a directed acyclic graph (DAG).
// If a batch A depends on batch B directly, it means that some request in batch A has overlapping changed targets with
// some another request in batch B. The Dependencies list contains all the transitive closure of all the dependencies, both direct and indirect.
// The order is not specified. Only active batches are considered for dependencies, i.e. if the batch is in a terminal state, it does not need to be included.
// Because batch states are eventually consistent, dependent batches identified at the time of batch creation may move to terminal states. The interpretation logic
// should reconcile batch states separately (i.e. ignore them for processing).
//
//This field is ok to be updated whether the state of the dependency graph changes. Update should use Version property for optimistic locking.
//
// For e.g - Consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3
// such that - queueA/batch/2 and queueA/batch/3 depend on queueA/batch/1
// Example: consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3
// such that - queueA/batch/2 and queueA/batch/3 have overlapping targets with requests in queueA/batch/1, but queueA/batch/2 and queueA/batch/3 do not have overlapping targets with each other.
//
// In this case, the Dependencies field for -
// - queueA/batch/1 will be empty
// - queueA/batch/2 will contain queueA/batch/1
// - queueA/batch/3 will contain queueA/batch/1
//
Dependencies []map[string]interface{}
Comment thread
behinddwalls marked this conversation as resolved.
// The state of the batch lifecycle this batch is in.
Dependencies []string

// The state of the batch lifecycle this batch is in. Updateable field with Version for optimistic locking.
State BatchState

// Version is the version of the object. It is used for optimistic locking.
// Versioning starts at 1 and is incremented for each change to the object.
Version int32
Expand Down
30 changes: 17 additions & 13 deletions entity/batch_dependent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@

package entity

// BatchDependent represents the downstream batches that depend on a given batch.
// BatchDependent is the reverse index of Batch.Dependencies. While Batch.Dependencies lists the batches a batch depends on (upstream),
// BatchDependent maps a batch to the batches that depend on it (downstream). This enables efficient fan-out notifications
// when a batch completes or fails — rather than scanning all batches to find which ones reference a given dependency,
// the system can look up dependents directly.
//
// Example: consider batches queueA/batch/1, queueA/batch/2, queueA/batch/3
// where batch/2 and batch/3 both depend on batch/1 (i.e. batch/1 is in their Dependencies list).
//
// The BatchDependent records would be:
// - BatchID=queueA/batch/1 → Dependents=[queueA/batch/2, queueA/batch/3]
// - BatchID=queueA/batch/2 → Dependents=[] (nothing depends on it)
// - BatchID=queueA/batch/3 → Dependents=[] (nothing depends on it)
type BatchDependent struct {
// BatchID is the globally unique identifier representing a batch.
// BatchID is the globally unique identifier of the upstream batch. Format: "<queue>/batch/<counter_value>".
BatchID string
// Dependents is a list of batch IDs that are dependents for this
// batch.
//
// For e.g - Consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3
// such that - queueA/batch/2 and queueA/batch/3 depend on queueA/batch/1
//
// In this case, the Dependents field for -
// - queueA/batch/1 will be [queueA/batch/2, queueA/batch/3]
// - queueA/batch/2 will be empty
// - queueA/batch/3 will be empty
//

// Dependents is a list of batch IDs that depend on this batch (i.e. batches whose Dependencies list contains BatchID).
// Updated as new batches are created that conflict with this batch. Uses Version for optimistic locking on updates.
Dependents []string

// Version is the version of the object. It is used for optimistic locking.
// Versioning starts at 1 and is incremented for each change to the object.
Version int32
Expand Down
6 changes: 3 additions & 3 deletions entity/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ func TestBatch_SerializationRoundTrip(t *testing.T) {
ID: "queueA/batch/3",
Queue: "queueA",
Contains: []string{"queueA/5"},
Dependencies: []map[string]interface{}{
{"id": "queueA/batch/1"},
{"id": "queueA/batch/2"},
Dependencies: []string{
"queueA/batch/1",
"queueA/batch/2",
},
State: BatchStateCreated,
Version: 1,
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_test(
"//entity/queue",
"//extension/counter/mock",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
Expand Down
56 changes: 34 additions & 22 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package batch

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
Expand Down Expand Up @@ -96,6 +95,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"partition_key", msg.PartitionKey,
)

// TODO: if capacity is full, wait here for other requests to accumulate to batch them together, or include a request into an existing batch if it's not too late.

// Generate a globally unique batch ID.
seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
if err != nil {
Expand All @@ -111,6 +112,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
Version: 1,
}

// TODO: run Target Analyzer to understand new batch's dependency graph, run it against other active batches to understand the conflicts.
// So far we'll just assume that the new batch conflicts with all active batches, which results in a serial non-parallelized queue.

// Get active batches for this queue to set as dependencies.
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
entity.BatchStateCreated,
Expand All @@ -123,40 +127,46 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
}

for _, dep := range activeBatches {
batch.Dependencies = append(batch.Dependencies, map[string]interface{}{
"id": dep.ID,
"state": string(dep.State),
})
batch.Dependencies = append(batch.Dependencies, dep.ID)
}

// Create batch dependent entities (reverse relationship of batch.Dependencies).
// For each dependency, record the new batch as a dependent.
// If existing dependents are found in the store, append them.
for _, dep := range activeBatches {
bd := entity.BatchDependent{
BatchID: dep.ID,
Dependents: []string{batch.ID},
}

// Get existing reverse index entry for the dependency.
existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID)
if err != nil && !storage.IsNotFound(err) {
if err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err)
}
if err == nil {
bd.Dependents = append(existing.Dependents, bd.Dependents...)
}

if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
dependents := append(existing.Dependents, batch.ID)

if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, dependents); err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, err)
return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", dep.ID, batch.ID, err)
}
}

// Persist batch to storage (idempotent — ErrAlreadyExists means a retry)
if err := c.store.GetBatchStore().Create(ctx, batch); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
// Create new reverse index entry for the new batch. It would be empty for now, but will be updated as new batches are created that conflict with this batch.
bd := entity.BatchDependent{
BatchID: batch.ID,
Dependents: []string{},
Version: 1,
}

if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to create batch dependent index for new batchID=%s: %w", batch.ID, err)
}

// Persist batch to storage.
// This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist.
// We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries.
if err := c.store.GetBatchStore().Create(ctx, batch); err != nil {
c.metricsScope.Counter("batch_store_errors").Inc(1)
return fmt.Errorf("failed to create batch: %w", err)
return fmt.Errorf("failed to create batch in batch store: %w", err)
}

c.logger.Infow("batch created",
Expand All @@ -166,13 +176,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"dependency_count", len(batch.Dependencies),
)

// Publish to score topic
// Publish to score topic for further processing.
// If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID.
// The downstream logic should be able to handle stale entries by looking at the state of the batch.
if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to score: %w", err)
return fmt.Errorf("failed to publish batch ID to score topic: %w", err)
}

c.logger.Infow("published batch to score",
c.logger.Infow("published batch to score topic",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyScore,
)
Expand Down
15 changes: 12 additions & 3 deletions orchestrator/controller/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/uber/submitqueue/entity/queue"
countermock "github.com/uber/submitqueue/extension/counter/mock"
queuemock "github.com/uber/submitqueue/extension/queue/mock"
"github.com/uber/submitqueue/extension/storage"
storagemock "github.com/uber/submitqueue/extension/storage/mock"
"go.uber.org/mock/gomock"
"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -80,8 +79,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M
req := testRequest()
mockReqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil).AnyTimes()

mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

mockStorage = storagemock.NewMockStorage(ctrl)
mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes()
mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes()
mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes()
}

Expand Down Expand Up @@ -204,13 +207,19 @@ func TestController_Process_WithDependencies(t *testing.T) {

mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl)
// batch/1 has no existing dependents.
mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound)
mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)
mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{
BatchID: "test-queue/batch/1",
Version: 1,
}, nil)
mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/1", int32(1), gomock.Any()).Return(nil)
// batch/2 already has an existing dependent.
mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{
BatchID: "test-queue/batch/2",
Dependents: []string{"test-queue/batch/99"},
Version: 2,
}, nil)
mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(2), gomock.Any()).Return(nil)
// Create empty reverse index for the new batch.
mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil)

mockReqStore := storagemock.NewMockRequestStore(ctrl)
Expand Down
Loading