Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions core/consumer/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ type TopicKey string
const (
// TopicKeyRequest is the pipeline stage where new requests arrive from the gateway.
TopicKeyRequest TopicKey = "request"
// TopicKeyToBatch is the pipeline stage where validated requests are published for batching.
TopicKeyToBatch TopicKey = "to-batch"
// TopicKeyBatched is the pipeline stage where batched requests are published for speculation.
TopicKeyBatched TopicKey = "batched"
// TopicKeyBuild is the pipeline stage where requests are published for builds.
// TopicKeyValidate is the pipeline stage where requests are published for validation.
TopicKeyValidate TopicKey = "validate"
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.
TopicKeyBatch TopicKey = "batch"
// TopicKeyScore is the pipeline stage where batches are published for scoring.
TopicKeyScore TopicKey = "score"
// TopicKeySpeculate is the pipeline stage where scored batches are published for speculation.
TopicKeySpeculate TopicKey = "speculate"
// TopicKeyBuild is the pipeline stage where speculated batches are published for builds.
TopicKeyBuild TopicKey = "build"
// TopicKeyBuildSignal is the pipeline stage where build signals are published for processing.
TopicKeyBuildSignal TopicKey = "build-signal"
// TopicKeyToMerge is the pipeline stage where requests are published for merging.
TopicKeyToMerge TopicKey = "to-merge"
// TopicKeyMergeSignal is the pipeline stage where merge signals are published for processing.
TopicKeyMergeSignal TopicKey = "merge-signal"
// TopicKeyFinalize is the pipeline stage where requests are published for finalization.
TopicKeyFinalize TopicKey = "finalize"
// TopicKeyBuildSignal is the pipeline stage where builds are published for build signal processing.
TopicKeyBuildSignal TopicKey = "buildsignal"
// TopicKeyMerge is the pipeline stage where speculated batches are published for merging.
TopicKeyMerge TopicKey = "merge"
// TopicKeyConclude is the pipeline stage where merged requests are published for conclusion.
TopicKeyConclude TopicKey = "conclude"
)

// String returns the topic key as a string.
Expand Down
4 changes: 2 additions & 2 deletions core/consumer/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {
registry, err := consumer.NewTopicRegistry(
[]consumer.TopicConfig{
{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1},
{Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ2},
{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2},
},
)
require.NoError(t, err)
Expand All @@ -171,7 +171,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {
require.True(t, ok)
assert.Equal(t, mockQ1, q1)

q2, ok := registry.Queue(consumer.TopicKeyToBatch)
q2, ok := registry.Queue(consumer.TopicKeyValidate)
require.True(t, ok)
assert.Equal(t, mockQ2, q2)

Expand Down
14 changes: 14 additions & 0 deletions entity/build.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package entity

import "encoding/json"

// BuildStatus defines the possible states of a build.
type BuildStatus string

Expand Down Expand Up @@ -61,3 +63,15 @@ type Build struct {
// Status represents the state of the build lifecycle this build is in.
Status BuildStatus
}

// ToBytes serializes the Build to JSON bytes for queue message payload.
func (b Build) ToBytes() ([]byte, error) {
return json.Marshal(b)
}

// BuildFromBytes deserializes a Build from JSON bytes.
func BuildFromBytes(data []byte) (Build, error) {
var build Build
err := json.Unmarshal(data, &build)
return build, err
}
126 changes: 126 additions & 0 deletions entity/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBuildStatus_IsTerminal(t *testing.T) {
Expand Down Expand Up @@ -55,3 +56,128 @@ func TestBuildStatus_IsTerminal(t *testing.T) {
})
}
}

func TestBuild_ToBytes(t *testing.T) {
build := Build{
ID: "build-1",
BatchID: "batch-1",
SpeculationPath: SpeculationPathInfo{
Base: []string{"batch-0", "batch-prev"},
},
Score: 0.85,
Status: BuildStatusQueued,
}

data, err := build.ToBytes()
require.NoError(t, err)
assert.NotEmpty(t, data)

// Verify JSON contains expected fields
jsonStr := string(data)
assert.Contains(t, jsonStr, "build-1")
assert.Contains(t, jsonStr, "batch-1")
assert.Contains(t, jsonStr, "queued")
}

func TestBuildFromBytes(t *testing.T) {
original := Build{
ID: "build-42",
BatchID: "batch-7",
SpeculationPath: SpeculationPathInfo{
Base: []string{"batch-5", "batch-6"},
},
Score: 0.92,
Status: BuildStatusRunning,
}

// Serialize
data, err := original.ToBytes()
require.NoError(t, err)

// Deserialize
deserialized, err := BuildFromBytes(data)
require.NoError(t, err)

// Verify all fields match
assert.Equal(t, original.ID, deserialized.ID)
assert.Equal(t, original.BatchID, deserialized.BatchID)
assert.Equal(t, original.SpeculationPath.Base, deserialized.SpeculationPath.Base)
assert.Equal(t, original.Score, deserialized.Score)
assert.Equal(t, original.Status, deserialized.Status)
}

func TestBuildFromBytes_InvalidJSON(t *testing.T) {
invalidJSON := []byte(`{"invalid": json"}`)

_, err := BuildFromBytes(invalidJSON)
assert.Error(t, err)
}

func TestBuildFromBytes_EmptyData(t *testing.T) {
emptyJSON := []byte(`{}`)

build, err := BuildFromBytes(emptyJSON)
require.NoError(t, err)

// Empty JSON should deserialize with zero values
assert.Empty(t, build.ID)
assert.Empty(t, build.BatchID)
assert.Equal(t, BuildStatusUnknown, build.Status)
assert.Equal(t, float32(0), build.Score)
}

func TestBuild_SerializationRoundTrip(t *testing.T) {
tests := []struct {
name string
build Build
}{
{
name: "queued build with speculation path",
build: Build{
ID: "build-100",
BatchID: "batch-50",
SpeculationPath: SpeculationPathInfo{
Base: []string{"batch-48", "batch-49"},
},
Score: 0.75,
Status: BuildStatusQueued,
},
},
{
name: "passed build with no speculation base",
build: Build{
ID: "build-200",
BatchID: "batch-60",
Score: 1.0,
Status: BuildStatusPassed,
},
},
{
name: "failed build with zero score",
build: Build{
ID: "build-300",
BatchID: "batch-70",
SpeculationPath: SpeculationPathInfo{
Base: []string{"batch-65"},
},
Score: 0,
Status: BuildStatusFailed,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Serialize
data, err := tt.build.ToBytes()
require.NoError(t, err)

// Deserialize
deserialized, err := BuildFromBytes(data)
require.NoError(t, err)

// Verify complete equality
assert.Equal(t, tt.build, deserialized)
})
}
}
2 changes: 1 addition & 1 deletion example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Example gRPC servers and clients for running the submitqueue services locally.
## Services

- **Gateway** (port 8081) — entry point for land requests. Exposes `Ping` and `Land` RPCs.
- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 8 pipeline topics.
- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 9 pipeline topics.

Both services require MySQL (app database + queue database). Docker Compose handles this automatically.

Expand Down
5 changes: 3 additions & 2 deletions example/server/orchestrator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ go_library(
"//orchestrator/controller/batch",
"//orchestrator/controller/build",
"//orchestrator/controller/buildsignal",
"//orchestrator/controller/finalize",
"//orchestrator/controller/conclude",
"//orchestrator/controller/merge",
"//orchestrator/controller/mergesignal",
"//orchestrator/controller/request",
"//orchestrator/controller/score",
"//orchestrator/controller/speculate",
"//orchestrator/controller/validate",
"//orchestrator/protopb",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally_v4//:tally",
Expand Down
Loading