Skip to content

Commit ba0d8ee

Browse files
committed
feat(pipeline) Orchestrator pipeline re-set up
1 parent 92500ce commit ba0d8ee

34 files changed

Lines changed: 1389 additions & 750 deletions

core/consumer/registry.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,22 @@ type TopicKey string
1616
const (
1717
// TopicKeyRequest is the pipeline stage where new requests arrive from the gateway.
1818
TopicKeyRequest TopicKey = "request"
19-
// TopicKeyToBatch is the pipeline stage where validated requests are published for batching.
20-
TopicKeyToBatch TopicKey = "to-batch"
21-
// TopicKeyBatched is the pipeline stage where batched requests are published for speculation.
22-
TopicKeyBatched TopicKey = "batched"
23-
// TopicKeyBuild is the pipeline stage where requests are published for builds.
19+
// TopicKeyValidate is the pipeline stage where requests are published for validation.
20+
TopicKeyValidate TopicKey = "validate"
21+
// TopicKeyBatch is the pipeline stage where validated requests are published for batching.
22+
TopicKeyBatch TopicKey = "batch"
23+
// TopicKeyScore is the pipeline stage where batches are published for scoring.
24+
TopicKeyScore TopicKey = "score"
25+
// TopicKeySpeculate is the pipeline stage where scored batches are published for speculation.
26+
TopicKeySpeculate TopicKey = "speculate"
27+
// TopicKeyBuild is the pipeline stage where speculated batches are published for builds.
2428
TopicKeyBuild TopicKey = "build"
25-
// TopicKeyBuildSignal is the pipeline stage where build signals are published for processing.
26-
TopicKeyBuildSignal TopicKey = "build-signal"
27-
// TopicKeyToMerge is the pipeline stage where requests are published for merging.
28-
TopicKeyToMerge TopicKey = "to-merge"
29-
// TopicKeyMergeSignal is the pipeline stage where merge signals are published for processing.
30-
TopicKeyMergeSignal TopicKey = "merge-signal"
31-
// TopicKeyFinalize is the pipeline stage where requests are published for finalization.
32-
TopicKeyFinalize TopicKey = "finalize"
29+
// TopicKeyPoll is the pipeline stage where builds are published for polling.
30+
TopicKeyPoll TopicKey = "poll"
31+
// TopicKeyMerge is the pipeline stage where polled builds are published for merging.
32+
TopicKeyMerge TopicKey = "merge"
33+
// TopicKeyConclude is the pipeline stage where merged requests are published for conclusion.
34+
TopicKeyConclude TopicKey = "conclude"
3335
)
3436

3537
// String returns the topic key as a string.

core/consumer/registry_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {
162162
registry, err := consumer.NewTopicRegistry(
163163
[]consumer.TopicConfig{
164164
{Key: consumer.TopicKeyRequest, Name: "request", Queue: mockQ1},
165-
{Key: consumer.TopicKeyToBatch, Name: "to-batch", Queue: mockQ2},
165+
{Key: consumer.TopicKeyValidate, Name: "validate", Queue: mockQ2},
166166
},
167167
)
168168
require.NoError(t, err)
@@ -171,7 +171,7 @@ func TestTopicRegistry_Queue_PerTopic(t *testing.T) {
171171
require.True(t, ok)
172172
assert.Equal(t, mockQ1, q1)
173173

174-
q2, ok := registry.Queue(consumer.TopicKeyToBatch)
174+
q2, ok := registry.Queue(consumer.TopicKeyValidate)
175175
require.True(t, ok)
176176
assert.Equal(t, mockQ2, q2)
177177

entity/build.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package entity
22

3+
import "encoding/json"
4+
35
// BuildStatus defines the possible states of a build.
46
type BuildStatus string
57

@@ -61,3 +63,15 @@ type Build struct {
6163
// Status represents the state of the build lifecycle this build is in.
6264
Status BuildStatus
6365
}
66+
67+
// ToBytes serializes the Build to JSON bytes for queue message payload.
68+
func (b Build) ToBytes() ([]byte, error) {
69+
return json.Marshal(b)
70+
}
71+
72+
// BuildFromBytes deserializes a Build from JSON bytes.
73+
func BuildFromBytes(data []byte) (Build, error) {
74+
var build Build
75+
err := json.Unmarshal(data, &build)
76+
return build, err
77+
}

entity/build_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7+
"github.com/stretchr/testify/require"
78
)
89

910
func TestBuildStatus_IsTerminal(t *testing.T) {
@@ -55,3 +56,128 @@ func TestBuildStatus_IsTerminal(t *testing.T) {
5556
})
5657
}
5758
}
59+
60+
func TestBuild_ToBytes(t *testing.T) {
61+
build := Build{
62+
ID: "build-1",
63+
BatchID: "batch-1",
64+
SpeculationPath: SpeculationPathInfo{
65+
Base: []string{"batch-0", "batch-prev"},
66+
},
67+
Score: 0.85,
68+
Status: BuildStatusQueued,
69+
}
70+
71+
data, err := build.ToBytes()
72+
require.NoError(t, err)
73+
assert.NotEmpty(t, data)
74+
75+
// Verify JSON contains expected fields
76+
jsonStr := string(data)
77+
assert.Contains(t, jsonStr, "build-1")
78+
assert.Contains(t, jsonStr, "batch-1")
79+
assert.Contains(t, jsonStr, "queued")
80+
}
81+
82+
func TestBuildFromBytes(t *testing.T) {
83+
original := Build{
84+
ID: "build-42",
85+
BatchID: "batch-7",
86+
SpeculationPath: SpeculationPathInfo{
87+
Base: []string{"batch-5", "batch-6"},
88+
},
89+
Score: 0.92,
90+
Status: BuildStatusRunning,
91+
}
92+
93+
// Serialize
94+
data, err := original.ToBytes()
95+
require.NoError(t, err)
96+
97+
// Deserialize
98+
deserialized, err := BuildFromBytes(data)
99+
require.NoError(t, err)
100+
101+
// Verify all fields match
102+
assert.Equal(t, original.ID, deserialized.ID)
103+
assert.Equal(t, original.BatchID, deserialized.BatchID)
104+
assert.Equal(t, original.SpeculationPath.Base, deserialized.SpeculationPath.Base)
105+
assert.Equal(t, original.Score, deserialized.Score)
106+
assert.Equal(t, original.Status, deserialized.Status)
107+
}
108+
109+
func TestBuildFromBytes_InvalidJSON(t *testing.T) {
110+
invalidJSON := []byte(`{"invalid": json"}`)
111+
112+
_, err := BuildFromBytes(invalidJSON)
113+
assert.Error(t, err)
114+
}
115+
116+
func TestBuildFromBytes_EmptyData(t *testing.T) {
117+
emptyJSON := []byte(`{}`)
118+
119+
build, err := BuildFromBytes(emptyJSON)
120+
require.NoError(t, err)
121+
122+
// Empty JSON should deserialize with zero values
123+
assert.Empty(t, build.ID)
124+
assert.Empty(t, build.BatchID)
125+
assert.Equal(t, BuildStatusUnknown, build.Status)
126+
assert.Equal(t, float32(0), build.Score)
127+
}
128+
129+
func TestBuild_SerializationRoundTrip(t *testing.T) {
130+
tests := []struct {
131+
name string
132+
build Build
133+
}{
134+
{
135+
name: "queued build with speculation path",
136+
build: Build{
137+
ID: "build-100",
138+
BatchID: "batch-50",
139+
SpeculationPath: SpeculationPathInfo{
140+
Base: []string{"batch-48", "batch-49"},
141+
},
142+
Score: 0.75,
143+
Status: BuildStatusQueued,
144+
},
145+
},
146+
{
147+
name: "passed build with no speculation base",
148+
build: Build{
149+
ID: "build-200",
150+
BatchID: "batch-60",
151+
Score: 1.0,
152+
Status: BuildStatusPassed,
153+
},
154+
},
155+
{
156+
name: "failed build with zero score",
157+
build: Build{
158+
ID: "build-300",
159+
BatchID: "batch-70",
160+
SpeculationPath: SpeculationPathInfo{
161+
Base: []string{"batch-65"},
162+
},
163+
Score: 0,
164+
Status: BuildStatusFailed,
165+
},
166+
},
167+
}
168+
169+
for _, tt := range tests {
170+
t.Run(tt.name, func(t *testing.T) {
171+
// Serialize
172+
data, err := tt.build.ToBytes()
173+
require.NoError(t, err)
174+
175+
// Deserialize
176+
deserialized, err := BuildFromBytes(data)
177+
require.NoError(t, err)
178+
179+
// Verify complete equality
180+
assert.Equal(t, tt.build, deserialized)
181+
})
182+
}
183+
}

example/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Example gRPC servers and clients for running the submitqueue services locally.
55
## Services
66

77
- **Gateway** (port 8081) — entry point for land requests. Exposes `Ping` and `Land` RPCs.
8-
- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 8 pipeline topics.
8+
- **Orchestrator** (port 8082) — coordinates the pipeline. Exposes `Ping` RPC and consumes queue messages across 9 pipeline topics.
99

1010
Both services require MySQL (app database + queue database). Docker Compose handles this automatically.
1111

example/server/orchestrator/BUILD.bazel

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ go_library(
2323
"//orchestrator/controller",
2424
"//orchestrator/controller/batch",
2525
"//orchestrator/controller/build",
26-
"//orchestrator/controller/buildsignal",
27-
"//orchestrator/controller/finalize",
26+
"//orchestrator/controller/conclude",
2827
"//orchestrator/controller/merge",
29-
"//orchestrator/controller/mergesignal",
28+
"//orchestrator/controller/poll",
3029
"//orchestrator/controller/request",
30+
"//orchestrator/controller/score",
3131
"//orchestrator/controller/speculate",
32+
"//orchestrator/controller/validate",
3233
"//orchestrator/protopb",
3334
"@com_github_go_sql_driver_mysql//:mysql",
3435
"@com_github_uber_go_tally_v4//:tally",

0 commit comments

Comments
 (0)