Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions entity/batch.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package entity

import "encoding/json"

// BatchState defines the possible states of a batch.
type BatchState string

Expand Down Expand Up @@ -61,3 +63,15 @@ type Batch struct {
// Versioning starts at 1 and is incremented for each change to the object.
Version int32
}

// ToBytes serializes the Batch to JSON bytes for queue message payload.
func (b Batch) ToBytes() ([]byte, error) {
return json.Marshal(b)
}

// BatchFromBytes deserializes a Batch from JSON bytes.
func BatchFromBytes(data []byte) (Batch, error) {
var batch Batch
err := json.Unmarshal(data, &batch)
return batch, err
}
92 changes: 92 additions & 0 deletions entity/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBatchState_IsTerminal(t *testing.T) {
Expand All @@ -28,3 +29,94 @@ func TestBatchState_IsTerminal(t *testing.T) {
})
}
}

func TestBatch_SerializationRoundTrip(t *testing.T) {
tests := []struct {
name string
batch Batch
}{
{
name: "batch with single request",
batch: Batch{
ID: "queueA/batch/1",
Queue: "queueA",
Contains: []string{"queueA/1"},
State: BatchStateCreated,
Version: 1,
},
},
{
name: "batch with multiple requests",
batch: Batch{
ID: "queueB/batch/42",
Queue: "queueB",
Contains: []string{"queueB/10", "queueB/11", "queueB/12"},
State: BatchStateSpeculating,
Version: 3,
},
},
{
name: "batch with dependencies",
batch: Batch{
ID: "queueA/batch/3",
Queue: "queueA",
Contains: []string{"queueA/5"},
Dependencies: []map[string]interface{}{
{"id": "queueA/batch/1"},
{"id": "queueA/batch/2"},
},
State: BatchStateCreated,
Version: 1,
},
},
{
name: "batch in terminal state",
batch: Batch{
ID: "queueC/batch/99",
Queue: "queueC",
Contains: []string{"queueC/50"},
State: BatchStateSucceeded,
Version: 5,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
data, err := tt.batch.ToBytes()
require.NoError(t, err)

deserialized, err := BatchFromBytes(data)
require.NoError(t, err)

assert.Equal(t, tt.batch, deserialized)
})
}
}

func TestBatchFromBytes_InvalidJSON(t *testing.T) {
_, err := BatchFromBytes([]byte(`{"invalid": json"}`))
assert.Error(t, err)
}

func TestBatchFromBytes_EmptyJSON(t *testing.T) {
batch, err := BatchFromBytes([]byte(`{}`))
require.NoError(t, err)

assert.Empty(t, batch.ID)
assert.Empty(t, batch.Queue)
assert.Nil(t, batch.Contains)
assert.Nil(t, batch.Dependencies)
assert.Equal(t, BatchStateUnknown, batch.State)
assert.Equal(t, int32(0), batch.Version)
}

func TestBatchFromBytes_EmptyBytes(t *testing.T) {
_, err := BatchFromBytes([]byte{})
assert.Error(t, err)
}

func TestBatchFromBytes_NilBytes(t *testing.T) {
_, err := BatchFromBytes(nil)
assert.Error(t, err)
}
18 changes: 9 additions & 9 deletions orchestrator/controller/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,18 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
// - Add to batch dependent DB

// Publish to speculate topic
if err := c.publish(ctx, consumer.TopicKeyBatched, request); err != nil {
if err := c.publish(ctx, consumer.TopicKeyBatched, batch); err != nil {
c.logger.Errorw("failed to publish output",
"request_id", request.ID,
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyBatched,
"error", err,
)
c.metricsScope.Counter("publish_errors").Inc(1)
return fmt.Errorf("failed to publish to speculate: %w", err)
}

c.logger.Infow("published request to next stage",
"request_id", request.ID,
c.logger.Infow("published batch to next stage",
"batch_id", batch.ID,
"topic_key", consumer.TopicKeyBatched,
)

Expand All @@ -130,14 +130,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er
return nil // Success - message will be acked
}

// publish publishes a request to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error {
payload, err := request.ToBytes()
// publish publishes a batch to the specified topic key.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error {
payload, err := batch.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize request: %w", err)
return fmt.Errorf("failed to serialize batch: %w", err)
}

msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil)
msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil)

q, ok := c.registry.Queue(key)
if !ok {
Expand Down