-
Notifications
You must be signed in to change notification settings - Fork 0
feat(controller): hooking up merge controller #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3090b30
fb603fc
10669da
93bcc1f
f9a2c00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,16 +9,22 @@ import ( | |
| "github.com/uber/submitqueue/core/errs" | ||
| "github.com/uber/submitqueue/entity" | ||
| entityqueue "github.com/uber/submitqueue/entity/queue" | ||
| "github.com/uber/submitqueue/extension/landprovider" | ||
| "github.com/uber/submitqueue/extension/storage" | ||
| "go.uber.org/zap" | ||
| ) | ||
|
|
||
| // Controller handles merge queue messages. | ||
| // It consumes merge requests, performs merges, and publishes results to the merge signal stage. | ||
| // It consumes batches, fetches the contained requests from storage, | ||
| // lands the changes via the LandProvider, and publishes results | ||
| // to the merge-signal and speculator stages. | ||
| // Implements consumer.Controller interface for integration with the consumer. | ||
| type Controller struct { | ||
| logger *zap.SugaredLogger | ||
| metricsScope tally.Scope | ||
| registry consumer.TopicRegistry | ||
| landProvider landprovider.LandProvider | ||
| storage storage.Storage | ||
| topicKey consumer.TopicKey | ||
| consumerGroup string | ||
| } | ||
|
|
@@ -31,82 +37,165 @@ func NewController( | |
| logger *zap.SugaredLogger, | ||
| scope tally.Scope, | ||
| registry consumer.TopicRegistry, | ||
| landProvider landprovider.LandProvider, | ||
| storage storage.Storage, | ||
| topicKey consumer.TopicKey, | ||
| consumerGroup string, | ||
| ) *Controller { | ||
| return &Controller{ | ||
| logger: logger.Named("merge_controller"), | ||
| metricsScope: scope.SubScope("merge_controller"), | ||
| registry: registry, | ||
| landProvider: landProvider, | ||
| storage: storage, | ||
| topicKey: topicKey, | ||
| consumerGroup: consumerGroup, | ||
| } | ||
| } | ||
|
|
||
| // Process processes a merge delivery from the queue. | ||
| // Deserializes the request, performs the merge, and publishes to the merge signal topic. | ||
| // Extracts the batch ID from the message, fetches the batch from storage, | ||
| // lands the changes, and publishes the batch ID to downstream topics. | ||
| // Returns nil to ack (success), or error to nack (retry). | ||
| func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error { | ||
| c.metricsScope.Counter("received").Inc(1) | ||
|
|
||
| msg := delivery.Message() | ||
| batchID := string(msg.Payload) | ||
|
|
||
| // Deserialize request entity | ||
| request, err := entity.RequestFromBytes(msg.Payload) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to deserialize request", | ||
| c.logger.Infow("received merge event", | ||
| "batch_id", batchID, | ||
| "attempt", delivery.Attempt(), | ||
| "partition_key", msg.PartitionKey, | ||
| ) | ||
|
|
||
| if batchID == "" { | ||
| c.logger.Errorw("empty batch ID in message", | ||
| "message_id", msg.ID, | ||
| "partition_key", msg.PartitionKey, | ||
| "attempt", delivery.Attempt(), | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("deserialize_errors").Inc(1) | ||
| // Non-retryable: malformed messages will never succeed regardless of retry count | ||
| return fmt.Errorf("failed to deserialize request: %w", err) | ||
| return fmt.Errorf("empty batch ID in message %s", msg.ID) | ||
| } | ||
|
|
||
| c.logger.Infow("received merge event", | ||
| "request_id", request.ID, | ||
| "queue", request.Queue, | ||
| "state", string(request.State), | ||
| "version", request.Version, | ||
| "attempt", delivery.Attempt(), | ||
| "partition_key", msg.PartitionKey, | ||
| ) | ||
|
|
||
| // TODO: Add merge logic | ||
| // - Perform source control merge operation | ||
| // - Handle merge conflicts | ||
|
|
||
| // Publish to merge signal topic | ||
| if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil { | ||
| c.logger.Errorw("failed to publish output", | ||
| "request_id", request.ID, | ||
| "topic_key", consumer.TopicKeyMergeSignal, | ||
| // Fetch the batch from storage — this is the single source of truth. | ||
| // Terminal states mean the land outcome was already persisted on a | ||
| // previous attempt; skip straight to publish. | ||
| batch, err := c.storage.GetBatchStore().Get(ctx, batchID) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to fetch batch", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. As long as we return the error upstream, we do not have to log it here; otherwise we will most likely log it twice. |
||
| "batch_id", batchID, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("publish_errors").Inc(1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to publish to merge-signal: %w", err)) | ||
| c.metricsScope.Counter("batch_fetch_errors").Inc(1) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Please rebase and integrate with the new metrics infrastructure, which processes errors automatically. |
||
| return errs.NewRetryableError(fmt.Errorf("failed to fetch batch %s: %w", batchID, err)) | ||
| } | ||
|
|
||
| c.logger.Infow("published request to next stage", | ||
| "request_id", request.ID, | ||
| "topic_key", consumer.TopicKeyMergeSignal, | ||
| if !batch.State.IsTerminal() { | ||
| newState, err := c.land(ctx, batch) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should also process the land error "already merged" for another idempotency check.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added the |
||
| if err != nil { | ||
| return fmt.Errorf("batch %s: %w", batch.ID, err) | ||
| } | ||
|
|
||
| if err := c.storage.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newState); err != nil { | ||
| c.logger.Errorw("failed to update batch state", | ||
| "batch_id", batch.ID, | ||
| "target_state", string(newState), | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("batch_update_errors").Inc(1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to update batch state: %w", err)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should only be retryable if the error is of an expected type, such as a version mismatch, but we seem to retry on every error. If the database is operational but the error is "table does not exist", it is not retryable. |
||
| } | ||
|
|
||
| batch.State = newState | ||
| } else { | ||
| c.logger.Infow("batch already in terminal state, skipping to publish", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe this should be logged at warning level, as it should only happen on infra errors. |
||
| "batch_id", batch.ID, | ||
| "state", string(batch.State), | ||
| ) | ||
| c.metricsScope.Counter("idempotent_skip").Inc(1) | ||
| } | ||
|
|
||
| // Publish batch ID to merge-signal and speculator topics. | ||
| // On retry, a topic that already received this message may get a duplicate; | ||
| // this is safe because downstream consumers are idempotent. | ||
| for _, key := range []consumer.TopicKey{consumer.TopicKeyMergeSignal, consumer.TopicKeyBatched} { | ||
| if err := c.publish(ctx, key, batch.ID, batch.Queue); err != nil { | ||
| c.logger.Errorw("failed to publish output", | ||
| "batch_id", batch.ID, | ||
| "topic_key", key, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("publish_errors").Inc(1) | ||
| return errs.NewRetryableError(fmt.Errorf("failed to publish to %s: %w", key, err)) | ||
| } | ||
| } | ||
|
|
||
| c.logger.Infow("published batch to next stages", | ||
| "batch_id", batch.ID, | ||
| "state", string(batch.State), | ||
| "topic_keys", []string{consumer.TopicKeyMergeSignal.String(), consumer.TopicKeyBatched.String()}, | ||
| ) | ||
|
|
||
| c.metricsScope.Counter("processed").Inc(1) | ||
|
|
||
| return nil // Success - message will be acked | ||
| return nil | ||
| } | ||
|
|
||
| // publish publishes a request to the specified topic key. | ||
| func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { | ||
| payload, err := request.ToBytes() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to serialize request: %w", err) | ||
| // land fetches the requests in the batch, lands them via the LandProvider, | ||
| // and classifies the outcome into a batch state. | ||
| func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.BatchState, error) { | ||
| requestStore := c.storage.GetRequestStore() | ||
| entries := make([]entity.LandEntry, 0, len(batch.Contains)) | ||
|
|
||
| for _, requestID := range batch.Contains { | ||
| request, err := requestStore.Get(ctx, requestID) | ||
| if err != nil { | ||
| c.logger.Errorw("failed to fetch request", | ||
| "batch_id", batch.ID, | ||
| "request_id", requestID, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("request_fetch_errors").Inc(1) | ||
| return "", errs.NewRetryableError(fmt.Errorf("failed to fetch request %s: %w", requestID, err)) | ||
| } | ||
|
|
||
| entries = append(entries, entity.LandEntry{ | ||
| Strategy: request.LandStrategy, | ||
| Change: request.Change, | ||
| }) | ||
| } | ||
|
|
||
| msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) | ||
| err := c.landProvider.Land(ctx, batch.Queue, entries) | ||
|
|
||
| switch { | ||
| case err == nil, landprovider.IsAlreadyLanded(err): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe at least log the landprovider.IsAlreadyLanded case, but better to tally it too? |
||
| c.logger.Infow("land succeeded", | ||
| "batch_id", batch.ID, | ||
| "request_ids", batch.Contains, | ||
| "already_landed", landprovider.IsAlreadyLanded(err), | ||
| ) | ||
| return entity.BatchStateSucceeded, nil | ||
| case landprovider.IsLandRejected(err): | ||
| c.logger.Errorw("land rejected", | ||
| "batch_id", batch.ID, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("land_rejected").Inc(1) | ||
| return entity.BatchStateFailed, nil | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We'll need to propagate the error information about why the land was rejected. This can be done in another diff, but then it needs a TODO comment here. |
||
| default: | ||
| c.logger.Errorw("land failed", | ||
| "batch_id", batch.ID, | ||
| "error", err, | ||
| ) | ||
| c.metricsScope.Counter("land_errors").Inc(1) | ||
| return "", errs.NewRetryableError(fmt.Errorf("land failed: %w", err)) | ||
| } | ||
| } | ||
|
|
||
| // publish publishes a batch ID to the specified topic key. | ||
| func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, id string, partitionKey string) error { | ||
| msg := entityqueue.NewMessage(id, []byte(id), partitionKey, nil) | ||
|
|
||
| q, ok := c.registry.Queue(key) | ||
| if !ok { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to define a serialization format for this. Using direct string-to-byte conversion is Golang-specific.
We should probably use a separate serializable structure.