Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ const (
BatchStateCreated BatchState = "created"
// BatchStateSpeculating is the state of a batch that is undergoing speculative execution.
BatchStateSpeculating BatchState = "speculating"
// BatchStateFinalizing is the state of a batch that is being finalized after speculative execution.
BatchStateFinalizing BatchState = "finalizing"
// BatchStateSucceeded is the terminal state of a batch that has been successfully landed.
BatchStateSucceeded BatchState = "succeeded"
// BatchStateFailed is the terminal state of a batch that has failed.
Expand Down
1 change: 0 additions & 1 deletion entity/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ func TestBatchState_IsTerminal(t *testing.T) {
{name: "unknown", state: BatchStateUnknown, terminal: false},
{name: "created", state: BatchStateCreated, terminal: false},
{name: "speculating", state: BatchStateSpeculating, terminal: false},
{name: "finalizing", state: BatchStateFinalizing, terminal: false},
{name: "succeeded", state: BatchStateSucceeded, terminal: true},
{name: "failed", state: BatchStateFailed, terminal: true},
{name: "cancelled", state: BatchStateCancelled, terminal: true},
Expand Down
2 changes: 2 additions & 0 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ go_library(
"//core/consumer",
"//extension/counter",
"//extension/counter/mysql",
"//extension/landprovider",
"//extension/landprovider/github",
"//extension/mergechecker",
"//extension/mergechecker/github",
"//extension/queue",
Expand Down
32 changes: 30 additions & 2 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/extension/counter"
mysqlcounter "github.com/uber/submitqueue/extension/counter/mysql"
"github.com/uber/submitqueue/extension/landprovider"
"github.com/uber/submitqueue/extension/mergechecker"
githubchecker "github.com/uber/submitqueue/extension/mergechecker/github"
githublander "github.com/uber/submitqueue/extension/landprovider/github"
extqueue "github.com/uber/submitqueue/extension/queue"
queueMySQL "github.com/uber/submitqueue/extension/queue/mysql"
mysqlstorage "github.com/uber/submitqueue/extension/storage/mysql"
Expand Down Expand Up @@ -160,8 +162,11 @@ func run() error {
// Create merge checker
mc := newMergeChecker(logger, scope)

// Create land provider
lp := newLandProvider(logger, scope)

// Register controllers
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, cnt, store); err != nil {
if err := registerControllers(c, logger.Sugar(), scope, registry, mc, lp, cnt, store); err != nil {
return err
}

Expand Down Expand Up @@ -298,7 +303,7 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
// → merge → merge-signal
// finalize (terminal)

func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, cnt counter.Counter, store storage.Storage) error {
func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope tally.Scope, registry consumer.TopicRegistry, mc mergechecker.MergeChecker, lp landprovider.LandProvider, cnt counter.Counter, store storage.Storage) error {
requestController := request.NewController(
logger,
scope,
Expand Down Expand Up @@ -361,6 +366,8 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
logger,
scope,
registry,
lp,
store,
consumer.TopicKeyToMerge,
"orchestrator-merge",
)
Expand Down Expand Up @@ -418,6 +425,27 @@ func newMergeChecker(logger *zap.Logger, scope tally.Scope) mergechecker.MergeCh
})
}

// newLandProvider builds the GitHub-backed LandProvider used by the orchestrator.
//
// Configuration comes from the environment:
//   - GITHUB_API_URL: base URL of the GitHub API; when empty it falls back to
//     https://api.github.com.
//   - GITHUB_TOKEN: when non-empty, the HTTP client's transport is replaced by a
//     bearerTransport carrying the token; otherwise the client keeps its
//     default transport.
//
// Metrics are emitted under the "landprovider" sub-scope of scope.
//
// NOTE(review): the http.Client is created without a Timeout; consider
// setting one so a stalled API call cannot block indefinitely.
func newLandProvider(logger *zap.Logger, scope tally.Scope) landprovider.LandProvider {
	client := &http.Client{}
	if tok := os.Getenv("GITHUB_TOKEN"); tok != "" {
		client.Transport = &bearerTransport{token: tok}
	}

	baseURL := os.Getenv("GITHUB_API_URL")
	if baseURL == "" {
		baseURL = "https://api.github.com"
	}

	params := githublander.Params{
		HTTPClient:   client,
		APIURL:       baseURL,
		Logger:       logger.Sugar(),
		MetricsScope: scope.SubScope("landprovider"),
	}
	return githublander.NewLandProvider(params)
}

// bearerTransport is an http.RoundTripper that adds a Bearer token to requests.
type bearerTransport struct {
token string
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
activeBatches, err := c.store.GetBatchStore().GetByQueueAndStates(ctx, request.Queue, []entity.BatchState{
entity.BatchStateCreated,
entity.BatchStateSpeculating,
entity.BatchStateFinalizing,
})
if err != nil {
c.logger.Errorw("failed to get active batches",
Expand Down
5 changes: 5 additions & 0 deletions orchestrator/controller/merge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ go_library(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/landprovider",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -24,7 +26,10 @@ go_test(
"//core/errs",
"//entity",
"//entity/queue",
"//extension/landprovider",
"//extension/landprovider/mock",
"//extension/queue/mock",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
169 changes: 129 additions & 40 deletions orchestrator/controller/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@ import (
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/entity"
entityqueue "github.com/uber/submitqueue/entity/queue"
"github.com/uber/submitqueue/extension/landprovider"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)

// Controller handles merge queue messages.
// It consumes merge requests, performs merges, and publishes results to the merge signal stage.
// It consumes batches, fetches the contained requests from storage,
// lands the changes via the LandProvider, and publishes results
// to the merge-signal and speculator stages.
// Implements consumer.Controller interface for integration with the consumer.
type Controller struct {
// logger is the structured logger; NewController names it "merge_controller".
logger *zap.SugaredLogger
// metricsScope is the tally scope for this controller's counters;
// NewController derives it as the "merge_controller" sub-scope.
metricsScope tally.Scope
// registry resolves a consumer.TopicKey to the queue that publish writes to.
registry consumer.TopicRegistry
// landProvider performs the actual land of a batch's entries (see land).
landProvider landprovider.LandProvider
// storage gives access to the batch store and the request store.
storage storage.Storage
// topicKey is stored as passed to NewController.
// NOTE(review): presumably the topic this controller consumes from — its
// use is outside the visible code; confirm against the consumer interface.
topicKey consumer.TopicKey
// consumerGroup is stored as passed to NewController.
// NOTE(review): presumably the consumer group name used when subscribing —
// its use is outside the visible code; confirm.
consumerGroup string
}
Expand All @@ -31,82 +37,165 @@ func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
registry consumer.TopicRegistry,
landProvider landprovider.LandProvider,
storage storage.Storage,
topicKey consumer.TopicKey,
consumerGroup string,
) *Controller {
return &Controller{
logger: logger.Named("merge_controller"),
metricsScope: scope.SubScope("merge_controller"),
registry: registry,
landProvider: landProvider,
storage: storage,
topicKey: topicKey,
consumerGroup: consumerGroup,
}
}

// Process processes a merge delivery from the queue.
// Deserializes the request, performs the merge, and publishes to the merge signal topic.
// Extracts the batch ID from the message, fetches the batch from storage,
// lands the changes, and publishes the batch ID to downstream topics.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)

msg := delivery.Message()
batchID := string(msg.Payload)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to define a serialization format for this. Using direct string-to-byte conversion is Golang-specific.
Probably a separate serializable structure.


// Deserialize request entity
request, err := entity.RequestFromBytes(msg.Payload)
if err != nil {
c.logger.Errorw("failed to deserialize request",
c.logger.Infow("received merge event",
"batch_id", batchID,
"attempt", delivery.Attempt(),
"partition_key", msg.PartitionKey,
)

if batchID == "" {
c.logger.Errorw("empty batch ID in message",
"message_id", msg.ID,
"partition_key", msg.PartitionKey,
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize request: %w", err)
return fmt.Errorf("empty batch ID in message %s", msg.ID)
}

c.logger.Infow("received merge event",
"request_id", request.ID,
"queue", request.Queue,
"state", string(request.State),
"version", request.Version,
"attempt", delivery.Attempt(),
"partition_key", msg.PartitionKey,
)

// TODO: Add merge logic
// - Perform source control merge operation
// - Handle merge conflicts

// Publish to merge signal topic
if err := c.publish(ctx, consumer.TopicKeyMergeSignal, request); err != nil {
c.logger.Errorw("failed to publish output",
"request_id", request.ID,
"topic_key", consumer.TopicKeyMergeSignal,
// Fetch the batch from storage — this is the single source of truth.
// Terminal states mean the land outcome was already persisted on a
// previous attempt; skip straight to publish.
batch, err := c.storage.GetBatchStore().Get(ctx, batchID)
if err != nil {
c.logger.Errorw("failed to fetch batch",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we return the error upstream, we do not have to log it here; otherwise we will most likely log it twice.
Logging should happen at the point where the error chain terminates (the top-most caller).

"batch_id", batchID,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to merge-signal: %w", err))
c.metricsScope.Counter("batch_fetch_errors").Inc(1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rebase and integrate with the new metrics infrastructure, which processes errors automatically.

return errs.NewRetryableError(fmt.Errorf("failed to fetch batch %s: %w", batchID, err))
}

c.logger.Infow("published request to next stage",
"request_id", request.ID,
"topic_key", consumer.TopicKeyMergeSignal,
if !batch.State.IsTerminal() {
newState, err := c.land(ctx, batch)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should also process the land error "already merged" for another idempotency check.
If it is for the future, add TODO comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the IsAlreadyLanded error to the interface in a previous PR in the stack and hooked it up to the land function

if err != nil {
return fmt.Errorf("batch %s: %w", batch.ID, err)
}

if err := c.storage.GetBatchStore().UpdateState(ctx, batch.ID, batch.Version, newState); err != nil {
c.logger.Errorw("failed to update batch state",
"batch_id", batch.ID,
"target_state", string(newState),
"error", err,
)
c.metricsScope.Counter("batch_update_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to update batch state: %w", err))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should only be retryable if the error is of an expected type, such as a version mismatch, but we seem to retry on all errors? If the database is operational but the error is "table does not exist", it is not retryable.

}

batch.State = newState
} else {
c.logger.Infow("batch already in terminal state, skipping to publish",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be logged at warning level, as it should only happen on infra errors.

"batch_id", batch.ID,
"state", string(batch.State),
)
c.metricsScope.Counter("idempotent_skip").Inc(1)
}

// Publish batch ID to merge-signal and speculator topics.
// On retry, a topic that already received this message may get a duplicate;
// this is safe because downstream consumers are idempotent.
for _, key := range []consumer.TopicKey{consumer.TopicKeyMergeSignal, consumer.TopicKeyBatched} {
if err := c.publish(ctx, key, batch.ID, batch.Queue); err != nil {
c.logger.Errorw("failed to publish output",
"batch_id", batch.ID,
"topic_key", key,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return errs.NewRetryableError(fmt.Errorf("failed to publish to %s: %w", key, err))
}
}

c.logger.Infow("published batch to next stages",
"batch_id", batch.ID,
"state", string(batch.State),
"topic_keys", []string{consumer.TopicKeyMergeSignal.String(), consumer.TopicKeyBatched.String()},
)

c.metricsScope.Counter("processed").Inc(1)

return nil // Success - message will be acked
return nil
}

// publish publishes a request to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error {
payload, err := request.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize request: %w", err)
// land fetches the requests in the batch, lands them via the LandProvider,
// and classifies the outcome into a batch state.
func (c *Controller) land(ctx context.Context, batch entity.Batch) (entity.BatchState, error) {
requestStore := c.storage.GetRequestStore()
entries := make([]entity.LandEntry, 0, len(batch.Contains))

for _, requestID := range batch.Contains {
request, err := requestStore.Get(ctx, requestID)
if err != nil {
c.logger.Errorw("failed to fetch request",
"batch_id", batch.ID,
"request_id", requestID,
"error", err,
)
c.metricsScope.Counter("request_fetch_errors").Inc(1)
return "", errs.NewRetryableError(fmt.Errorf("failed to fetch request %s: %w", requestID, err))
}

entries = append(entries, entity.LandEntry{
Strategy: request.LandStrategy,
Change: request.Change,
})
}

msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil)
err := c.landProvider.Land(ctx, batch.Queue, entries)

switch {
case err == nil, landprovider.IsAlreadyLanded(err):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe at least log landprovider.IsAlreadyLanded, or better, emit a tally metric too?
This also should not happen in a normal workflow, so it is good to have some observability.
Also, add a comment here describing the scenarios that could cause this.

c.logger.Infow("land succeeded",
"batch_id", batch.ID,
"request_ids", batch.Contains,
"already_landed", landprovider.IsAlreadyLanded(err),
)
return entity.BatchStateSucceeded, nil
case landprovider.IsLandRejected(err):
c.logger.Errorw("land rejected",
"batch_id", batch.ID,
"error", err,
)
c.metricsScope.Counter("land_rejected").Inc(1)
return entity.BatchStateFailed, nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to propagate the error information about why the land was rejected. This can be done in another diff, but then it needs a TODO comment.

default:
c.logger.Errorw("land failed",
"batch_id", batch.ID,
"error", err,
)
c.metricsScope.Counter("land_errors").Inc(1)
return "", errs.NewRetryableError(fmt.Errorf("land failed: %w", err))
}
}

// publish publishes a batch ID to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, id string, partitionKey string) error {
msg := entityqueue.NewMessage(id, []byte(id), partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
Expand Down
Loading