Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions entity/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,21 @@ func BatchFromBytes(data []byte) (Batch, error) {
err := json.Unmarshal(data, &batch)
return batch, err
}

// BatchID is a lightweight entity for publishing and consuming just the batch identifier via the queue.
type BatchID struct {
// ID is the globally unique identifier for the batch.
ID string `json:"id"`
}

// ToBytes serializes the BatchID to JSON bytes for queue message payload.
func (b BatchID) ToBytes() ([]byte, error) {
return json.Marshal(b)
}

// BatchIDFromBytes deserializes a BatchID from JSON bytes.
func BatchIDFromBytes(data []byte) (BatchID, error) {
var bid BatchID
err := json.Unmarshal(data, &bid)
return bid, err
}
18 changes: 18 additions & 0 deletions entity/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,21 @@ func RequestFromBytes(data []byte) (Request, error) {
err := json.Unmarshal(data, &req)
return req, err
}

// RequestID is a lightweight entity for publishing and consuming just the request identifier via the queue.
type RequestID struct {
// ID is the globally unique identifier for the land request.
ID string `json:"id"`
}

// ToBytes serializes the RequestID to JSON bytes for queue message payload.
func (r RequestID) ToBytes() ([]byte, error) {
return json.Marshal(r)
}

// RequestIDFromBytes deserializes a RequestID from JSON bytes.
func RequestIDFromBytes(data []byte) (RequestID, error) {
var rid RequestID
err := json.Unmarshal(data, &rid)
return rid, err
}
6 changes: 6 additions & 0 deletions example/server/orchestrator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
validateController := validate.NewController(
logger,
scope,
store,
registry,
mc,
consumer.TopicKeyValidate,
Expand All @@ -415,6 +416,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
scoreController := score.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyScore,
"orchestrator-score",
Expand All @@ -426,6 +428,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
speculateController := speculate.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeySpeculate,
"orchestrator-speculate",
Expand All @@ -437,6 +440,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildController := build.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyBuild,
"orchestrator-build",
Expand All @@ -448,6 +452,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
buildsignalController := buildsignal.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyBuildSignal,
"orchestrator-buildsignal",
Expand All @@ -459,6 +464,7 @@ func registerControllers(c consumer.Consumer, logger *zap.SugaredLogger, scope t
mergeController := merge.NewController(
logger,
scope,
store,
registry,
consumer.TopicKeyMerge,
"orchestrator-merge",
Expand Down
1 change: 0 additions & 1 deletion orchestrator/controller/batch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ go_test(
embed = [":batch"],
deps = [
"//core/consumer",
"//core/errs",
"//entity",
"//entity/queue",
"//extension/counter/mock",
Expand Down
41 changes: 28 additions & 13 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package batch

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally/v4"
Expand Down Expand Up @@ -72,12 +73,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er

msg := delivery.Message()

// Deserialize request entity
request, err := entity.RequestFromBytes(msg.Payload)
// Deserialize request ID from payload
rid, err := entity.RequestIDFromBytes(msg.Payload)
if err != nil {
c.metricsScope.Counter("deserialize_errors").Inc(1)
// Non-retryable: malformed messages will never succeed regardless of retry count
return fmt.Errorf("failed to deserialize request: %w", err)
return fmt.Errorf("failed to deserialize request ID: %w", err)
}

// Fetch request from storage
request, err := c.store.GetRequestStore().Get(ctx, rid.ID)
if err != nil {
c.metricsScope.Counter("storage_errors").Inc(1)
return fmt.Errorf("failed to get request %s: %w", rid.ID, err)
}

c.logger.Infow("received batch event",
Expand Down Expand Up @@ -139,11 +146,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
if err == nil {
bd.Dependents = append(existing.Dependents, bd.Dependents...)
}

if err := c.store.GetBatchDependentStore().Create(ctx, bd); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
c.metricsScope.Counter("batch_dependent_store_errors").Inc(1)
return fmt.Errorf("failed to create batch dependent for batchID=%s: %w", dep.ID, err)
}
}

// TODO:
// - Add batch to DB
// - Add to batch dependent DB
// Persist batch to storage (idempotent — ErrAlreadyExists means a retry)
if err := c.store.GetBatchStore().Create(ctx, batch); err != nil && !errors.Is(err, storage.ErrAlreadyExists) {
c.metricsScope.Counter("batch_store_errors").Inc(1)
return fmt.Errorf("failed to create batch: %w", err)
}

c.logger.Infow("batch created",
"batch_id", batch.ID,
Expand All @@ -153,7 +167,7 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
)

// Publish to score topic
if err := c.publish(ctx, consumer.TopicKeyScore, batch); err != nil {
if err := c.publish(ctx, consumer.TopicKeyScore, batch.ID, batch.Queue); err != nil {
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to score: %w", err)
}
Expand All @@ -168,14 +182,15 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
return nil // Success - message will be acked
}

// publish publishes a batch to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error {
payload, err := batch.ToBytes()
// publish publishes a batch ID to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batchID string, partitionKey string) error {
bid := entity.BatchID{ID: batchID}
payload, err := bid.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize batch: %w", err)
return fmt.Errorf("failed to serialize batch ID: %w", err)
}

msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil)
msg := entityqueue.NewMessage(batchID, payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
Expand Down
Loading
Loading