Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
concludeController := conclude.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyConclude,
"orchestrator-conclude",
Expand Down
5 changes: 5 additions & 0 deletions orchestrator/controller/conclude/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//core/consumer",
"//core/errs",
"//core/metrics",
"//entity",
"//extension/storage",
"@com_github_uber_go_tally_v4//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -23,6 +26,8 @@ go_test(
"//entity",
"//entity/queue",
"//extension/queue/mock",
"//extension/storage",
"//extension/storage/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
75 changes: 67 additions & 8 deletions orchestrator/controller/conclude/conclude.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (

"github.com/uber-go/tally/v4"
"github.com/uber/submitqueue/core/consumer"
"github.com/uber/submitqueue/core/errs"
"github.com/uber/submitqueue/core/metrics"
"github.com/uber/submitqueue/entity"
"github.com/uber/submitqueue/extension/storage"
"go.uber.org/zap"
)

Expand All @@ -16,6 +19,7 @@ import (
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
store storage.Storage
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
Expand All @@ -28,13 +32,15 @@ var _ consumer.Controller = (*Controller)(nil)
func NewController(
logger *zap.SugaredLogger,
scope tally.Scope,
store storage.Storage,
registry consumer.TopicRegistry,
topicKey consumer.TopicKey,
consumerGroup string,
) *Controller {
return &Controller{
logger: logger.Named("conclude_controller"),
metricsScope: scope.SubScope("conclude_controller"),
store: store,
registry: registry,
topicKey: topicKey,
consumerGroup: consumerGroup,
Expand All @@ -44,8 +50,9 @@ func NewController(
// Process processes a conclude delivery from the queue.
// Deserializes the batch and completes the pipeline processing.
// Returns nil to ack (success), or error to nack (retry).
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) error {
c.metricsScope.Counter("received").Inc(1)
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
op := metrics.Begin(c.metricsScope, "process")
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

Expand All @@ -58,8 +65,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"attempt", delivery.Attempt(),
"error", err,
)
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
metrics.NamedCounter(c.metricsScope, "process", "deserialize_errors", 1)
return fmt.Errorf("failed to deserialize batch: %w", err)
}

Expand All @@ -72,12 +79,52 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
"partition_key", msg.PartitionKey,
)

// TODO: Add conclusion logic
// - Mark batch as succeeded or failed
// - Send notifications
// - Clean up resources
// TODO: Handle cancellation

c.metricsScope.Counter("processed").Inc(1)
// Map batch terminal state to request state.
// We expect the batch to be in a terminal state
// as updated by the merge controller.
requestState, err := batchStateToRequestState(batch.State)
if err != nil {
c.logger.Errorw("unexpected batch state",
"batch_id", batch.ID,
"state", string(batch.State),
)
metrics.NamedCounter(c.metricsScope, "process", "unexpected_state_errors", 1)
return fmt.Errorf("unexpected batch state %q for batch %s: %w", batch.State, batch.ID, err)
}

// Update each request's state to reflect the batch outcome.
for _, requestID := range batch.Contains {
request, err := c.store.GetRequestStore().Get(ctx, requestID)
if err != nil {
c.logger.Errorw("failed to get request from storage",
"batch_id", batch.ID,
"request_id", requestID,
"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "request_store_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to get request %s: %w", requestID, err))
}

if err := c.store.GetRequestStore().UpdateState(ctx, requestID, request.Version, requestState); err != nil {
c.logger.Errorw("failed to update request state",
"batch_id", batch.ID,
"request_id", requestID,
"from_version", request.Version,
"to_state", string(requestState),
"error", err,
)
metrics.NamedCounter(c.metricsScope, "process", "request_update_errors", 1)
return errs.NewRetryableError(fmt.Errorf("failed to update request %s state to %s: %w", requestID, requestState, err))
}

c.logger.Infow("updated request state",
"batch_id", batch.ID,
"request_id", requestID,
"new_state", string(requestState),
)
}

return nil // Success - message will be acked
}
Expand All @@ -96,3 +143,15 @@ func (c *Controller) TopicKey() consumer.TopicKey {
func (c *Controller) ConsumerGroup() string {
return c.consumerGroup
}

// batchStateToRequestState maps a terminal batch state to the corresponding request state.
func batchStateToRequestState(state entity.BatchState) (entity.RequestState, error) {
switch state {
case entity.BatchStateSucceeded:
return entity.RequestStateLanded, nil
case entity.BatchStateFailed:
return entity.RequestStateError, nil
Comment thread
behinddwalls marked this conversation as resolved.
default:
return entity.RequestStateUnknown, fmt.Errorf("non-terminal batch state: %s", state)
}
}
Loading