-
Notifications
You must be signed in to change notification settings - Fork 0
feat(controller) Persist batch and batch dependent entities in store #119
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,11 +2,13 @@ package batch | |
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
|
|
||
| "github.com/uber-go/tally/v4" | ||
| "github.com/uber/submitqueue/core/consumer" | ||
| "github.com/uber/submitqueue/core/errs" | ||
| "github.com/uber/submitqueue/core/metrics" | ||
| "github.com/uber/submitqueue/entity" | ||
| entityqueue "github.com/uber/submitqueue/entity/queue" | ||
| "github.com/uber/submitqueue/extension/counter" | ||
|
|
@@ -54,8 +56,9 @@ func NewController( | |
| // Process processes a batch delivery from the queue. | ||
| // Deserializes the request, groups into batch, and publishes to the score topic. | ||
| // Returns nil to ack (success), or error to nack (retry). | ||
| func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { | ||
| c.metricsScope.Counter("received").Inc(1) | ||
| func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { | ||
| op := metrics.Begin(c.metricsScope, "process") | ||
| defer func() { op.Complete(retErr) }() | ||
|
|
||
| msg := delivery.Message() | ||
|
|
||
|
|
@@ -68,7 +71,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er | |
| "attempt", delivery.Attempt(), | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("deserialize_errors").Inc(1) | ||
| // Non-retryable: malformed messages will never succeed regardless of retry count | ||
| return fmt.Errorf("failed to deserialize request: %w", err) | ||
| } | ||
|
|
@@ -83,15 +85,16 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er | |
| ) | ||
|
|
||
| // Generate a globally unique batch ID. | ||
| counterOp := metrics.Begin(c.metricsScope, "counter_next") | ||
| seq, err := c.counter.Next(ctx, "batch/"+request.Queue) | ||
| counterOp.Complete(err) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to generate batch ID", | ||
| "request_id", request.ID, | ||
| "queue", request.Queue, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("counter_errors").Inc(1) | ||
| return fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to generate batch ID for queue=%s: %w", request.Queue, err)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz do not default to retryables
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. see other review for details |
||
| } | ||
|
|
||
| batch := entity.Batch{ | ||
|
|
@@ -102,20 +105,37 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er | |
| Version: 1, | ||
| } | ||
|
|
||
| // Persist batch to storage. | ||
| // ErrAlreadyExists should never happen since batch IDs are generated from a unique counter. | ||
| batchCreateOp := metrics.Begin(c.metricsScope, "batch_store_create") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
| if err := c.store.GetBatchStore().Create(ctx, batch); err != nil { | ||
| batchCreateOp.Complete(err) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. adhoc approach to scope management is fragile |
||
| c.logger.Errorw("failed to create batch in storage", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as long as we surface error up, avoid logging as it would most likely result in double logging.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For all QueueControllers, i guess we just log at the consumer.go and leave it as is. For API controllers, we just log at the handlers which would be implemented in go-code? So i think we should probably never log anywhere unless we are swallowing some errors (ideally we never need to) |
||
| "batch_id", batch.ID, | ||
| "error", err, | ||
| ) | ||
| if errors.Is(err, storage.ErrAlreadyExists) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we do not expect it to ever happen if counter is correct, do not need special processing to this error, just fire it up |
||
| return fmt.Errorf("unexpected duplicate batch ID=%s: %w", batch.ID, err) | ||
| } | ||
| return errs.NewRetryableError(fmt.Errorf("failed to create batch: %w", err)) | ||
| } | ||
| batchCreateOp.Complete(nil) | ||
|
|
||
| // Get active batches for this queue to set as dependencies. | ||
| getActiveOp := metrics.Begin(c.metricsScope, "batch_store_get_active") | ||
| activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{ | ||
| entity.BatchStateCreated, | ||
| entity.BatchStateReady, | ||
| entity.BatchStateSpeculating, | ||
| entity.BatchStateFinalizing, | ||
| }) | ||
| getActiveOp.Complete(err) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to get active batches", | ||
| "request_id", request.ID, | ||
| "queue", request.Queue, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("batch_store_errors").Inc(1) | ||
| return fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to get active batches for queue=%s: %w", request.Queue, err)) | ||
| } | ||
|
|
||
| for _, dep := range activeBatches { | ||
|
|
@@ -132,51 +152,100 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er | |
| bd := entity.BatchDependent{ | ||
| BatchID: dep.ID, | ||
| Dependents: []string{batch.ID}, | ||
| Version: 1, | ||
| } | ||
|
|
||
| // Get existing dependents for this batch (already in store) | ||
| getDepOp := metrics.Begin(c.metricsScope, "batch_dependent_get") | ||
| existing, err := c.store.GetBatchDependentStore().Get(ctx, dep.ID) | ||
| if err != nil && !storage.IsNotFound(err) { | ||
| getDepOp.Complete(err) | ||
| c.logger.Errorw("failed to get existing batch dependent", | ||
| "batch_id", dep.ID, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("batch_dependent_store_errors").Inc(1) | ||
| return fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to get batch dependent for batchID=%s: %w", dep.ID, err)) | ||
| } | ||
| getDepOp.Complete(nil) | ||
|
|
||
| if err == nil { | ||
| // Existing record found — update with merged dependents list. | ||
| // Note: existing.Dependents may have batches that are in "new" state | ||
| // indicating errors in previous batch creation pipeline. "new" | ||
| // should not be considered an active state for further processing. The | ||
| // callers of the batch dependents store should check for this. | ||
| bd.Dependents = append(existing.Dependents, bd.Dependents...) | ||
| updateOp := metrics.Begin(c.metricsScope, "batch_dependent_update") | ||
| if err := c.store.GetBatchDependentStore().UpdateDependents(ctx, dep.ID, existing.Version, bd.Dependents); err != nil { | ||
| updateOp.Complete(err) | ||
| c.logger.Errorw("failed to update batch dependent", | ||
| "batch_id", dep.ID, | ||
| "error", err, | ||
| ) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to update batch dependent for batchID=%s: %w", dep.ID, err)) | ||
| } | ||
| updateOp.Complete(nil) | ||
| c.logger.Debugw("updated batch dependent", | ||
| "batch_id", dep.ID, | ||
| "dependent_count", len(bd.Dependents), | ||
| ) | ||
| } else { | ||
| // No existing record — create new batch dependent entry. | ||
| createDepOp := metrics.Begin(c.metricsScope, "batch_dependent_create") | ||
| createErr := c.store.GetBatchDependentStore().Create(ctx, bd) | ||
| if createErr != nil && !errors.Is(createErr, storage.ErrAlreadyExists) { | ||
| createDepOp.Complete(createErr) | ||
| c.logger.Errorw("failed to create batch dependent", | ||
| "batch_id", dep.ID, | ||
| "error", createErr, | ||
| ) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, createErr)) | ||
| } | ||
| createDepOp.Complete(nil) | ||
| c.logger.Debugw("created batch dependent", | ||
| "batch_id", dep.ID, | ||
| "dependent_batch_id", batch.ID, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| // TODO: | ||
| // - Add batch to DB | ||
| // - Add to batch dependent DB | ||
| // Transition batch state from created to ready. | ||
| updateStateOp := metrics.Begin(c.metricsScope, "batch_store_update_state") | ||
| if err := c.store.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, entity.BatchStateReady); err != nil { | ||
| updateStateOp.Complete(err) | ||
| c.logger.Errorw("failed to update batch state to ready", | ||
| "batch_id", batch.ID, | ||
| "error", err, | ||
| ) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to update batch state to ready: %w", err)) | ||
| } | ||
| updateStateOp.Complete(nil) | ||
|
|
||
| c.logger.Infow("batch created", | ||
| c.logger.Infow("batch ready", | ||
| "batch_id", batch.ID, | ||
| "request_id", request.ID, | ||
| "queue", request.Queue, | ||
| "dependency_count", len(batch.Dependencies), | ||
| ) | ||
|
|
||
| // Publish to score topic | ||
| publishOp := metrics.Begin(c.metricsScope, "publish") | ||
| if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil { | ||
| publishOp.Complete(err) | ||
| c.logger.Errorw("failed to publish output", | ||
| "batch_id", batch.ID, | ||
| "topic_key", consumer.TopicKeyScore, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("publish_errors").Inc(1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to publish to score: %w", err)) | ||
| } | ||
| publishOp.Complete(nil) | ||
|
|
||
| c.logger.Infow("published batch to score", | ||
| "batch_id", batch.ID, | ||
| "topic_key", consumer.TopicKeyScore, | ||
| ) | ||
|
|
||
| c.metricsScope.Counter("processed").Inc(1) | ||
|
|
||
| return nil // Success - message will be acked | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
counter has its one metrics, so this is probably redundant?