diff --git a/entity/batch.go b/entity/batch.go index fbe155c7..8d47c677 100644 --- a/entity/batch.go +++ b/entity/batch.go @@ -1,5 +1,7 @@ package entity +import "encoding/json" + // BatchState defines the possible states of a batch. type BatchState string @@ -61,3 +63,15 @@ type Batch struct { // Versioning starts at 1 and is incremented for each change to the object. Version int32 } + +// ToBytes serializes the Batch to JSON bytes for queue message payload. +func (b Batch) ToBytes() ([]byte, error) { + return json.Marshal(b) +} + +// BatchFromBytes deserializes a Batch from JSON bytes. +func BatchFromBytes(data []byte) (Batch, error) { + var batch Batch + err := json.Unmarshal(data, &batch) + return batch, err +} diff --git a/entity/batch_test.go b/entity/batch_test.go index 084e4b21..e204b24f 100644 --- a/entity/batch_test.go +++ b/entity/batch_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestBatchState_IsTerminal(t *testing.T) { @@ -28,3 +29,94 @@ func TestBatchState_IsTerminal(t *testing.T) { }) } } + +func TestBatch_SerializationRoundTrip(t *testing.T) { + tests := []struct { + name string + batch Batch + }{ + { + name: "batch with single request", + batch: Batch{ + ID: "queueA/batch/1", + Queue: "queueA", + Contains: []string{"queueA/1"}, + State: BatchStateCreated, + Version: 1, + }, + }, + { + name: "batch with multiple requests", + batch: Batch{ + ID: "queueB/batch/42", + Queue: "queueB", + Contains: []string{"queueB/10", "queueB/11", "queueB/12"}, + State: BatchStateSpeculating, + Version: 3, + }, + }, + { + name: "batch with dependencies", + batch: Batch{ + ID: "queueA/batch/3", + Queue: "queueA", + Contains: []string{"queueA/5"}, + Dependencies: []map[string]interface{}{ + {"id": "queueA/batch/1"}, + {"id": "queueA/batch/2"}, + }, + State: BatchStateCreated, + Version: 1, + }, + }, + { + name: "batch in terminal state", + batch: Batch{ + ID: "queueC/batch/99", + Queue: "queueC", + Contains: []string{"queueC/50"}, + State: BatchStateSucceeded, + Version: 5, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + data, err := tt.batch.ToBytes() + require.NoError(t, err) + + deserialized, err := BatchFromBytes(data) + require.NoError(t, err) + + assert.Equal(t, tt.batch, deserialized) + }) + } +} + +func TestBatchFromBytes_InvalidJSON(t *testing.T) { + _, err := BatchFromBytes([]byte(`{"invalid": json"}`)) + assert.Error(t, err) +} + +func TestBatchFromBytes_EmptyJSON(t *testing.T) { + batch, err := BatchFromBytes([]byte(`{}`)) + require.NoError(t, err) + + assert.Empty(t, batch.ID) + assert.Empty(t, batch.Queue) + assert.Nil(t, batch.Contains) + assert.Nil(t, batch.Dependencies) + assert.Equal(t, BatchStateUnknown, batch.State) + assert.Equal(t, int32(0), batch.Version) +} + +func TestBatchFromBytes_EmptyBytes(t *testing.T) { + _, err := BatchFromBytes([]byte{}) + assert.Error(t, err) +} + +func TestBatchFromBytes_NilBytes(t *testing.T) { + _, err := BatchFromBytes(nil) + assert.Error(t, err) +} diff --git a/orchestrator/controller/batch/batch.go b/orchestrator/controller/batch/batch.go index d880d6b5..1f6e1840 100644 --- a/orchestrator/controller/batch/batch.go +++ b/orchestrator/controller/batch/batch.go @@ -110,9 +110,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er // - Add to batch dependent DB // Publish to speculate topic - if err := c.publish(ctx, consumer.TopicKeyBatched, request); err != nil { + if err := c.publish(ctx, consumer.TopicKeyBatched, batch); err != nil { c.logger.Errorw("failed to publish output", - "request_id", request.ID, + "batch_id", batch.ID, "topic_key", consumer.TopicKeyBatched, "error", err, ) @@ -120,8 +120,8 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return fmt.Errorf("failed to publish to speculate: %w", err) } - c.logger.Infow("published request to next stage", - "request_id", request.ID, + c.logger.Infow("published batch to next stage", + "batch_id", batch.ID, "topic_key", consumer.TopicKeyBatched, ) @@ -130,14 +130,14 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) er return nil // Success - message will be acked } -// publish publishes a request to the specified topic key. -func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, request entity.Request) error { - payload, err := request.ToBytes() +// publish publishes a batch to the specified topic key. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, batch entity.Batch) error { + payload, err := batch.ToBytes() if err != nil { - return fmt.Errorf("failed to serialize request: %w", err) + return fmt.Errorf("failed to serialize batch: %w", err) } - msg := entityqueue.NewMessage(request.ID, payload, request.Queue, nil) + msg := entityqueue.NewMessage(batch.ID, payload, batch.Queue, nil) q, ok := c.registry.Queue(key) if !ok {