From 32b6cf3737bd81075bd33ad16bcf63af76885783 Mon Sep 17 00:00:00 2001 From: sergeyb Date: Wed, 11 Mar 2026 00:16:54 +0000 Subject: [PATCH] feat(batch): Reimplement batch controller for consistency BatchDependent is a secondary index structure to Batch entity, make it fully eventually consistent: - Give it more comments - Always create with batch creation so next batch won't have to guess - Remove metadata, it won't be used, at least not for state tracking as it gets stale - Ensure order of operations - dependents first, empty dependent entry second, batch itself to follow, and then queue - Additional comments added explaining the further stale state reconciliation logic --- .gitignore | 1 + entity/batch.go | 25 ++++++--- entity/batch_dependent.go | 30 ++++++----- entity/batch_test.go | 6 +-- orchestrator/controller/batch/BUILD.bazel | 1 - orchestrator/controller/batch/batch.go | 56 +++++++++++++-------- orchestrator/controller/batch/batch_test.go | 15 ++++-- 7 files changed, 85 insertions(+), 49 deletions(-) diff --git a/.gitignore b/.gitignore index 899dc31e..8f6c622c 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ MODULE.bazel.lock .idea/ .claude/ .mcp.json +.mcp.json.bak .ijwb/ diff --git a/entity/batch.go b/entity/batch.go index 5d80c3a8..6bf958ea 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -51,28 +51,39 @@ func (s BatchState) IsTerminal() bool { type Batch struct { // ID is the globally unique identifier for the batch. Format: "/batch/". ID string + // Queue is the name of the queue processing the land request. Queue name is defined in the configuration and should be unique within the system. Queue string + // Contains is a list of land request IDs that are part of this batch. // Request IDs will always be part of the same queue. // // For e.g. - [queueA/1, queueA/2, queueA/3]. // Contains []string - // Dependencies is a list of batch IDs (and associated metadata) for this batch. - // Dependencies will always be part of the same queue. 
+ + // Dependencies is a list of other batch IDs that this batch depends on. + // Dependencies will always be part of the same queue. This way batches form a directed acyclic graph (DAG). + // If a batch A depends on batch B directly, it means that some request in batch A has overlapping changed targets with + // some other request in batch B. The Dependencies list contains the transitive closure of all the dependencies, both direct and indirect. + // The order is not specified. Only active batches are considered for dependencies, i.e. if the batch is in a terminal state, it does not need to be included. + // Because batch states are eventually consistent, dependencies identified at the time of batch creation may move to terminal states. The interpretation logic + // should reconcile batch states separately (i.e. ignore them for processing). + // + // This field is ok to be updated whenever the state of the dependency graph changes. Update should use Version property for optimistic locking. // - // For e.g - Consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3 - // such that - queueA/batch/2 and queueA/batch/3 depend on queueA/batch/1 + // Example: consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3 + // such that - queueA/batch/2 and queueA/batch/3 have overlapping targets with requests in queueA/batch/1, but queueA/batch/2 and queueA/batch/3 do not have overlapping targets with each other. // // In this case, the Dependencies field for - // - queueA/batch/1 will be empty // - queueA/batch/2 will contain queueA/batch/1 // - queueA/batch/3 will contain queueA/batch/1 - // - Dependencies []map[string]interface{} - // The state of the batch lifecycle this batch is in. + Dependencies []string + + // The state of the batch lifecycle this batch is in. Updateable field with Version for optimistic locking. State BatchState + // Version is the version of the object. It is used for optimistic locking. 
// Versioning starts at 1 and is incremented for each change to the object. Version int32 diff --git a/entity/batch_dependent.go b/entity/batch_dependent.go index db731190..fb237f6d 100644 --- a/entity/batch_dependent.go +++ b/entity/batch_dependent.go @@ -14,22 +14,26 @@ package entity -// BatchDependent represents the downstream batches that depend on a given batch. +// BatchDependent is the reverse index of Batch.Dependencies. While Batch.Dependencies lists the batches a batch depends on (upstream), +// BatchDependent maps a batch to the batches that depend on it (downstream). This enables efficient fan-out notifications +// when a batch completes or fails — rather than scanning all batches to find which ones reference a given dependency, +// the system can look up dependents directly. +// +// Example: consider batches queueA/batch/1, queueA/batch/2, queueA/batch/3 +// where batch/2 and batch/3 both depend on batch/1 (i.e. batch/1 is in their Dependencies list). +// +// The BatchDependent records would be: +// - BatchID=queueA/batch/1 → Dependents=[queueA/batch/2, queueA/batch/3] +// - BatchID=queueA/batch/2 → Dependents=[] (nothing depends on it) +// - BatchID=queueA/batch/3 → Dependents=[] (nothing depends on it) type BatchDependent struct { - // BatchID is the globally unique identifier representing a batch. + // BatchID is the globally unique identifier of the upstream batch. Format: "/batch/". BatchID string - // Dependents is a list of batch IDs that are dependents for this - // batch. - // - // For e.g - Consider batches - queueA/batch/1, queueA/batch/2, queueA/batch/3 - // such that - queueA/batch/2 and queueA/batch/3 depend on queueA/batch/1 - // - // In this case, the Dependents field for - - // - queueA/batch/1 will be [queueA/batch/2, queueA/batch/3] - // - queueA/batch/2 will be empty - // - queueA/batch/3 will be empty - // + + // Dependents is a list of batch IDs that depend on this batch (i.e. batches whose Dependencies list contains BatchID). 
+ // Updated as new batches are created that conflict with this batch. Uses Version for optimistic locking on updates. Dependents []string + // Version is the version of the object. It is used for optimistic locking. // Versioning starts at 1 and is incremented for each change to the object. Version int32 diff --git a/entity/batch_test.go b/entity/batch_test.go index 2d6d0ad1..58d7dc4d 100644 --- a/entity/batch_test.go +++ b/entity/batch_test.go @@ -75,9 +75,9 @@ func TestBatch_SerializationRoundTrip(t *testing.T) { ID: "queueA/batch/3", Queue: "queueA", Contains: []string{"queueA/5"}, - Dependencies: []map[string]interface{}{ - {"id": "queueA/batch/1"}, - {"id": "queueA/batch/2"}, + Dependencies: []string{ + "queueA/batch/1", + "queueA/batch/2", }, State: BatchStateCreated, Version: 1, diff --git a/orchestrator/controller/batch/BUILD.bazel b/orchestrator/controller/batch/BUILD.bazel index 1633aa71..73c66d9b 100644 --- a/orchestrator/controller/batch/BUILD.bazel +++ b/orchestrator/controller/batch/BUILD.bazel @@ -26,7 +26,6 @@ go_test( "//entity/queue", "//extension/counter/mock", "//extension/queue/mock", - "//extension/storage", "//extension/storage/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index 803de2db..1a1cf9c9 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -16,7 +16,6 @@ package batch import ( "context" - "errors" "fmt" "github.com/uber-go/tally/v4" @@ -96,6 +95,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "partition_key", msg.PartitionKey, ) + // TODO: if capacity is full, wait here for other requests to accumulate to batch them together, or include a request into an existing batch if it's not too late. + // Generate a globally unique batch ID. 
seq, err := c.counter.Next(ctx, "batch/"+request.Queue) if err != nil { @@ -111,6 +112,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er Version: 1, } + // TODO: run Target Analyzer to understand new batch's dependency graph, run it against other active batches to understand the conflicts. + // So far we'll just assume that the new batch conflicts with all active batches, which results in a serial non-parallelized queue. + // Get active batches for this queue to set as dependencies. activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ entity.BatchStateCreated, @@ -123,40 +127,46 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er } for _, dep := range activeBatches { - batch.Dependencies = append(batch.Dependencies, map[string]interface{}{ - "id": dep.ID, - "state": string(dep.State), - }) + batch.Dependencies = append(batch.Dependencies, dep.ID) } // Create batch dependent entities (reverse relationship of batch.Dependencies). // For each dependency, record the new batch as a dependent. // If existing dependents are found in the store, append them. for _, dep := range activeBatches { - bd := entity.BatchDependent{ - BatchID: dep.ID, - Dependents: []string{batch.ID}, - } - + // Get existing reverse index entry for the dependency. existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID) - if err != nil && !storage.IsNotFound(err) { + if err != nil { c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err) } - if err == nil { - bd.Dependents = append(existing.Dependents, bd.Dependents...) 
- } - if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + dependents := append(existing.Dependents, batch.ID) + + if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, dependents); err != nil { c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) - return fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, err) + return fmt.Errorf("failed to update batch dependent index for existing batchID=%s and new batchID=%s: %w", dep.ID, batch.ID, err) } } - // Persist batch to storage (idempotent — ErrAlreadyExists means a retry) - if err := c.store.GetBatchStore().Create(ctx, batch); err != nil && !errors.Is(err, storage.ErrAlreadyExists) { + // Create new reverse index entry for the new batch. It would be empty for now, but will be updated as new batches are created that conflict with this batch. + bd := entity.BatchDependent{ + BatchID: batch.ID, + Dependents: []string{}, + Version: 1, + } + + if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil { + c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) + return fmt.Errorf("failed to create batch dependent index for new batchID=%s: %w", batch.ID, err) + } + + // Persist batch to storage. + // This is the final operation that concludes the batch creation process. If it fails, BatchDependents will be pointing to a batch id that does not exist. + // We do not reuse batch ids, a retry of this operation will create a new batch with a new ID. The downstream logic that operates on BatchDependent should be able to handle stale entries. 
+ if err := c.store.GetBatchStore().Create(ctx, batch); err != nil { c.metricsScope.Counter("batch_store_errors").Inc(1) - return fmt.Errorf("failed to create batch: %w", err) + return fmt.Errorf("failed to create batch in batch store: %w", err) } c.logger.Infow("batch created", @@ -166,13 +176,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er "dependency_count", len(batch.Dependencies), ) - // Publish to score topic + // Publish to score topic for further processing. + // If it fails and the controller retries, a new batch will be created with the new batch ID but the same request ID. + // The downstream logic should be able to handle stale entries by looking at the state of the batch. if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil { c.metricsScope.Counter("publish_errors").Inc(1) - return fmt.Errorf("failed to publish to score: %w", err) + return fmt.Errorf("failed to publish batch ID to score topic: %w", err) } - c.logger.Infow("published batch to score", + c.logger.Infow("published batch to score topic", "batch_id", batch.ID, "topic_key", consumer.TopicKeyScore, ) diff --git a/orchestrator/controller/batch/batch_test.go b/orchestrator/controller/batch/batch_test.go index 70c47751..b480f3c1 100644 --- a/orchestrator/controller/batch/batch_test.go +++ b/orchestrator/controller/batch/batch_test.go @@ -28,7 +28,6 @@ import ( "github.com/uber/submitqueue/entity/queue" countermock "github.com/uber/submitqueue/extension/counter/mock" queuemock "github.com/uber/submitqueue/extension/queue/mock" - "github.com/uber/submitqueue/extension/storage" storagemock "github.com/uber/submitqueue/extension/storage/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -80,8 +79,12 @@ func newTestController(t *testing.T, ctrl *gomock.Controller, cnt *countermock.M req := testRequest() mockReqStore.EXPECT().Get(gomock.Any(), req.ID).Return(req, nil).AnyTimes() + mockBatchDependentStore := 
storagemock.NewMockBatchDependentStore(ctrl) + mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + mockStorage = storagemock.NewMockStorage(ctrl) mockStorage.EXPECT().GetBatchStore().Return(mockBatchStore).AnyTimes() + mockStorage.EXPECT().GetBatchDependentStore().Return(mockBatchDependentStore).AnyTimes() mockStorage.EXPECT().GetRequestStore().Return(mockReqStore).AnyTimes() } @@ -204,13 +207,19 @@ func TestController_Process_WithDependencies(t *testing.T) { mockBatchDependentStore := storagemock.NewMockBatchDependentStore(ctrl) // batch/1 has no existing dependents. - mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{}, storage.ErrNotFound) - mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) + mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/1").Return(entity.BatchDependent{ + BatchID: "test-queue/batch/1", + Version: 1, + }, nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/1", int32(1), gomock.Any()).Return(nil) // batch/2 already has an existing dependent. mockBatchDependentStore.EXPECT().Get(gomock.Any(), "test-queue/batch/2").Return(entity.BatchDependent{ BatchID: "test-queue/batch/2", Dependents: []string{"test-queue/batch/99"}, + Version: 2, }, nil) + mockBatchDependentStore.EXPECT().UpdateDependents(gomock.Any(), "test-queue/batch/2", int32(2), gomock.Any()).Return(nil) + // Create empty reverse index for the new batch. mockBatchDependentStore.EXPECT().Create(gomock.Any(), gomock.Any()).Return(nil) mockReqStore := storagemock.NewMockRequestStore(ctrl)