Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
BatchStateUnknown BatchState = ""
// BatchStateCreated is the state of a batch that has been created for processing.
BatchStateCreated BatchState = "created"
// BatchStateReady is the state of a batch that has been persisted and is ready for scoring.
BatchStateReady BatchState = "ready"
// BatchStateSpeculating is the state of a batch that is undergoing speculative execution.
BatchStateSpeculating BatchState = "speculating"
// BatchStateFinalizing is the state of a batch that is being finalized after speculative execution.
Expand Down
1 change: 1 addition & 0 deletions orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//core/consumer",
"//core/errs",
"//core/metrics",
"//entity",
"//entity/queue",
"//extension/counter",
Expand Down
103 changes: 86 additions & 17 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package batch

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/counter"
Expand Down Expand Up @@ -54,8 +56,9 @@ func NewController(
// Process processes a batch delivery from the queue.
// Deserializes the request, groups into batch, and publishes to the score topic.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
op := metrics.Begin(c.metricsScope, "process")
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

Expand All @@ -68,7 +71,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize request: %w", err)
}
Expand All @@ -83,15 +85,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
)

// Generate a globally unique batch ID.
counterOp := metrics.Begin(c.metricsScope, "counter_next")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The counter has its own metrics, so this is probably redundant?

seq, err := c.counter.Next(ctx, "batch/"+request.Queue)
counterOp.Complete(err)
if err != nil {
c.logger.Errorw("failed to generate batch ID",
"request_id", request.ID,
"queue", request.Queue,
"error", err,
)
c.metricsScope.Counter("counter_errors").Inc(1)
return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)
return errs.NewRetryableError(fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not default to retryable errors.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see other review for details

}

batch := entity.Batch{
Expand All @@ -102,20 +105,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
Version: 1,
}

// Persist batch to storage.
// ErrAlreadyExists should never happen since batch IDs are generated from a unique counter.
batchCreateOp := metrics.Begin(c.metricsScope, "batch_store_create")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

if err := c.store.GetBatchStore().Create(ctx, batch); err != nil {
batchCreateOp.Complete(err)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ad hoc approach to scope management is fragile.
What if some code path below does not return (a later code change could make it so)? Complete would then be called twice.
Prefer defer, or a function wrapper that scopes the measured block.

c.logger.Errorw("failed to create batch in storage",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we surface the error up, avoid logging it here, as that would most likely result in double logging.
Logging should be done where the error chain terminates, typically the highest function in the call stack.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all QueueControllers, I guess we just log in consumer.go and leave it as is. For API controllers, we just log in the handlers, which would be implemented in go-code?

So I think we should probably never log anywhere unless we are swallowing an error (ideally we never need to).

"batch_id", batch.ID,
"error", err,
)
if errors.Is(err, storage.ErrAlreadyExists) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not expect this to ever happen if the counter is correct, so this error does not need special handling; just propagate it up.

return fmt.Errorf("unexpected duplicate batch ID=%s: %w", batch.ID, err)
}
return errs.NewRetryableError(fmt.Errorf("failed to create batch: %w", err))
}
batchCreateOp.Complete(nil)

// Get active batches for this queue to set as dependencies.
getActiveOp := metrics.Begin(c.metricsScope, "batch_store_get_active")
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
entity.BatchStateCreated,
entity.BatchStateReady,
entity.BatchStateSpeculating,
entity.BatchStateFinalizing,
})
getActiveOp.Complete(err)
if err != nil {
c.logger.Errorw("failed to get active batches",
"request_id", request.ID,
"queue", request.Queue,
"error", err,
)
c.metricsScope.Counter("batch_store_errors").Inc(1)
return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)
return errs.NewRetryableError(fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err))
}

for _, dep := range activeBatches {
Expand All @@ -132,51 +152,100 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
bd := entity.BatchDependent{
BatchID: dep.ID,
Dependents: []string{batch.ID},
Version: 1,
}

// Get existing dependents for this batch (already in store)
getDepOp := metrics.Begin(c.metricsScope, "batch_dependent_get")
existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID)
if err != nil && !storage.IsNotFound(err) {
getDepOp.Complete(err)
c.logger.Errorw("failed to get existing batch dependent",
"batch_id", dep.ID,
"error", err,
)
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err)
return errs.NewRetryableError(fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err))
}
getDepOp.Complete(nil)

if err == nil {
// Existing record found — update with merged dependents list.
// Note: existing.Dependents may have batches that are in "new" state
// indicating errors in previous batch creation pipeline. "new"
// should not be considered an active state for further processing. The
// callers of the batch dependents store should check for this.
bd.Dependents = append(existing.Dependents, bd.Dependents...)
updateOp := metrics.Begin(c.metricsScope, "batch_dependent_update")
if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, bd.Dependents); err != nil {
updateOp.Complete(err)
c.logger.Errorw("failed to update batch dependent",
"batch_id", dep.ID,
"error", err,
)
return errs.NewRetryableError(fmt.Errorf("failed to update batch dependent for batchID=%s: %w", dep.ID, err))
}
updateOp.Complete(nil)
c.logger.Debugw("updated batch dependent",
"batch_id", dep.ID,
"dependent_count", len(bd.Dependents),
)
} else {
// No existing record — create new batch dependent entry.
createDepOp := metrics.Begin(c.metricsScope, "batch_dependent_create")
createErr := c.store.GetBatchDependentStore().Create(ctx, bd)
if createErr != nil && !errors.Is(createErr, storage.ErrAlreadyExists) {
createDepOp.Complete(createErr)
c.logger.Errorw("failed to create batch dependent",
"batch_id", dep.ID,
"error", createErr,
)
return errs.NewRetryableError(fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, createErr))
}
createDepOp.Complete(nil)
c.logger.Debugw("created batch dependent",
"batch_id", dep.ID,
"dependent_batch_id", batch.ID,
)
}
}

// TODO:
// - Add batch to DB
// - Add to batch dependent DB
// Transition batch state from created to ready.
updateStateOp := metrics.Begin(c.metricsScope, "batch_store_update_state")
if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, entity.BatchStateReady); err != nil {
updateStateOp.Complete(err)
c.logger.Errorw("failed to update batch state to ready",
"batch_id", batch.ID,
"error", err,
)
return errs.NewRetryableError(fmt.Errorf("failed to update batch state to ready: %w", err))
}
updateStateOp.Complete(nil)

c.logger.Infow("batch created",
c.logger.Infow("batch ready",
"batch_id", batch.ID,
"request_id", request.ID,
"queue", request.Queue,
"dependency_count", len(batch.Dependencies),
)

// Publish to score topic
publishOp := metrics.Begin(c.metricsScope, "publish")
if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil {
publishOp.Complete(err)
c.logger.Errorw("failed to publish output",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyScore,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err))
}
publishOp.Complete(nil)

c.logger.Infow("published batch to score",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyScore,
)

c.metricsScope.Counter("processed").Inc(1)

return nil // Success - message will be acked
}

Expand Down
Loading