From b02c5804b091b4d503202e62ae4fc11259dc5daa Mon Sep 17 00:00:00 2001 From: Preetam Dwivedi Date: Wed, 25 Feb 2026 15:52:21 -0800 Subject: [PATCH] feat(speculation): add speculation tree builder with power set path generation Why: The speculate controller had a TODO stub with no speculation logic. We need the core algorithm that generates all possible futures for a batch based on its predecessors to enable speculative builds. What: - Add SpeculationPath entity with explicit Base/Head fields, replacing the flat []string path and removing SpeculationPathInfo from Build - Add SpeculationPathAction enum values: schedule, cancel, land - Implement GenerateTree in core/speculation that produces 2^N paths via bitmask power set enumeration, sorted most-optimistic first - Wire GenerateTree into the speculate controller (empty deps for now) --- core/speculation/BUILD.bazel | 20 +++ core/speculation/speculation.go | 107 ++++++++++++ core/speculation/speculation_test.go | 161 ++++++++++++++++++ entity/build.go | 8 +- entity/speculation_tree.go | 27 ++- orchestrator/controller/speculate/BUILD.bazel | 1 + .../controller/speculate/speculate.go | 21 ++- 7 files changed, 329 insertions(+), 16 deletions(-) create mode 100644 core/speculation/BUILD.bazel create mode 100644 core/speculation/speculation.go create mode 100644 core/speculation/speculation_test.go diff --git a/core/speculation/BUILD.bazel b/core/speculation/BUILD.bazel new file mode 100644 index 00000000..840394c8 --- /dev/null +++ b/core/speculation/BUILD.bazel @@ -0,0 +1,20 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "speculation", + srcs = ["speculation.go"], + importpath = "github.com/uber/submitqueue/core/speculation", + visibility = ["//visibility:public"], + deps = ["//entity"], +) + +go_test( + name = "speculation_test", + srcs = ["speculation_test.go"], + embed = [":speculation"], + deps = [ + "//entity", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + ], +) diff --git a/core/speculation/speculation.go b/core/speculation/speculation.go new file mode 100644 index 00000000..b3e8cffc --- /dev/null +++ b/core/speculation/speculation.go @@ -0,0 +1,107 @@ +package speculation + +import ( + "fmt" + "math/bits" + "sort" + + "github.com/uber/submitqueue/entity" +) + +// MaxDependencies is the maximum number of predecessor dependencies allowed +// when generating a speculation tree. +// +// Speculation produces 2^N paths (the power set of predecessors). Growth is +// exponential, so a cap is required to prevent memory exhaustion: +// +// N=10 → 1,024 paths (~100 KB) +// N=15 → 32,768 paths (~3 MB) +// N=20 → 1,048,576 paths (~100 MB) +// N=25 → 33,554,432 paths (~3 GB) — OOM on most machines +// +// The current implementation uses bitmask iteration (one int per subset), +// which limits N to 62 on 64-bit systems before the bit shift overflows. +// An iterative or recursive approach would remove the bitmask ceiling, but +// the exponential memory growth remains the binding constraint regardless +// of algorithm — 2^N paths must all be held in memory. +// +// In practice, a submit queue batch rarely has more than a handful of +// predecessors. A limit of 10 (1,024 paths) is generous for real workloads. +const MaxDependencies = 10 + +// GenerateTree takes the current batch ID and its ordered dependencies (sorted by +// arrival time), and generates a SpeculationTree for the current batch containing +// all possible speculation paths (power set of predecessors). +// +// Each path represents a possible future: a subset of predecessors that succeed +// (forming the base) with the current batch as the head being tested. +// +// For N dependencies, this produces 2^N speculation paths. +// Paths are sorted most-optimistic first (most predecessors included) to +// least-optimistic (fewest predecessors included). +// All paths are initialized with Action = SpeculationPathActionSchedule. +// +// Returns an error if len(dependencyIDs) exceeds MaxDependencies. +func GenerateTree(currentID string, dependencyIDs []string) (entity.SpeculationTree, error) { + if len(dependencyIDs) > MaxDependencies { + return entity.SpeculationTree{}, fmt.Errorf( + "dependency count %d exceeds maximum %d", len(dependencyIDs), MaxDependencies, + ) + } + + // Defensive copy to avoid mutation of caller's slice. + deps := make([]string, len(dependencyIDs)) + copy(deps, dependencyIDs) + + n := len(deps) + + // We enumerate every subset of dependencies using bitmask iteration. + // An N-bit integer has 2^N possible values (0 to 2^N-1), and each value + // represents a unique subset: bit i being set means dependency i is + // included in the base (assumed to have succeeded). + // + // Example with deps = [B1, B2, B3]: + // mask=0 (000) → base=[] — all predecessors failed + // mask=1 (001) → base=[B1] — only B1 succeeded + // mask=2 (010) → base=[B2] — only B2 succeeded + // mask=3 (011) → base=[B1, B2] — B1 and B2 succeeded + // mask=4 (100) → base=[B3] — only B3 succeeded + // mask=5 (101) → base=[B1, B3] — B1 and B3 succeeded + // mask=6 (110) → base=[B2, B3] — B2 and B3 succeeded + // mask=7 (111) → base=[B1, B2, B3] — all predecessors succeeded + // + // Because we iterate bit positions low-to-high (i=0,1,...,N-1), included + // dependencies are appended in their original arrival order. + totalPaths := 1 << n // 2^N + speculations := make([]entity.SpeculationInfo, 0, totalPaths) + + for mask := range totalPaths { + // bits.OnesCount gives the number of set bits, which equals the + // number of dependencies included in this subset. + base := make([]string, 0, bits.OnesCount(uint(mask))) + for i := range n { + if mask&(1< len(speculations[j].Path.Base) + }) + + return entity.SpeculationTree{ + BatchID: currentID, + Speculations: speculations, + }, nil +} diff --git a/core/speculation/speculation_test.go b/core/speculation/speculation_test.go new file mode 100644 index 00000000..5eb6ac48 --- /dev/null +++ b/core/speculation/speculation_test.go @@ -0,0 +1,161 @@ +package speculation + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/entity" +) + +func TestGenerateTree(t *testing.T) { + tests := []struct { + name string + currentID string + dependencyIDs []string + wantPaths []entity.SpeculationPath + }{ + { + name: "no dependencies produces single path", + currentID: "B1", + dependencyIDs: nil, + wantPaths: []entity.SpeculationPath{ + {Base: []string{}, Head: "B1"}, + }, + }, + { + name: "one dependency produces two paths", + currentID: "B2", + dependencyIDs: []string{"B1"}, + wantPaths: []entity.SpeculationPath{ + {Base: []string{"B1"}, Head: "B2"}, // optimistic: B1 succeeded + {Base: []string{}, Head: "B2"}, // pessimistic: B1 failed + }, + }, + { + name: "two dependencies produces four paths", + currentID: "B3", + dependencyIDs: []string{"B1", "B2"}, + wantPaths: []entity.SpeculationPath{ + {Base: []string{"B1", "B2"}, Head: "B3"}, // both succeeded + {Base: []string{"B1"}, Head: "B3"}, // only B1 succeeded + {Base: []string{"B2"}, Head: "B3"}, // only B2 succeeded + {Base: []string{}, Head: "B3"}, // all failed + }, + }, + { + name: "three dependencies produces eight paths matching ERD example", + currentID: "B4", + dependencyIDs: []string{"B1", "B2", "B3"}, + wantPaths: []entity.SpeculationPath{ + {Base: []string{"B1", "B2", "B3"}, Head: "B4"}, // all succeeded + {Base: []string{"B1", "B2"}, Head: "B4"}, // B1, B2 succeeded + {Base: []string{"B1", "B3"}, Head: "B4"}, // B1, B3 succeeded + {Base: []string{"B2", "B3"}, Head: "B4"}, // B2, B3 succeeded + {Base: []string{"B1"}, Head: "B4"}, // only B1 succeeded + {Base: []string{"B2"}, Head: "B4"}, // only B2 succeeded + {Base: []string{"B3"}, Head: "B4"}, // only B3 succeeded + {Base: []string{}, Head: "B4"}, // all failed + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tree, err := GenerateTree(tt.currentID, tt.dependencyIDs) + require.NoError(t, err) + + assert.Equal(t, tt.currentID, tree.BatchID) + require.Len(t, tree.Speculations, len(tt.wantPaths)) + + for i, spec := range tree.Speculations { + assert.Equal(t, tt.wantPaths[i], spec.Path, "path at index %d", i) + } + }) + } +} + +func TestGenerateTree_Ordering(t *testing.T) { + tree, err := GenerateTree("B5", []string{"B1", "B2", "B3", "B4"}) + require.NoError(t, err) + + require.Len(t, tree.Speculations, 16) + + // Verify ordering: base length must be non-increasing (most optimistic first). + for i := 1; i < len(tree.Speculations); i++ { + assert.GreaterOrEqual(t, len(tree.Speculations[i-1].Path.Base), len(tree.Speculations[i].Path.Base), + "path at index %d should have >= base length than path at index %d", i-1, i) + } +} + +func TestGenerateTree_AllActionsSchedule(t *testing.T) { + tree, err := GenerateTree("B3", []string{"B1", "B2"}) + require.NoError(t, err) + + for i, spec := range tree.Speculations { + assert.Equal(t, entity.SpeculationPathActionSchedule, spec.Action, + "action at index %d", i) + } +} + +func TestGenerateTree_AllScoresZero(t *testing.T) { + tree, err := GenerateTree("B3", []string{"B1", "B2"}) + require.NoError(t, err) + + for i, spec := range tree.Speculations { + assert.Equal(t, float32(0), spec.Score, "score at index %d", i) + } +} + +func TestGenerateTree_InputImmutability(t *testing.T) { + deps := []string{"B1", "B2", "B3"} + original := make([]string, len(deps)) + copy(original, deps) + + _, err := GenerateTree("B4", deps) + require.NoError(t, err) + + assert.Equal(t, original, deps, "input dependency slice should not be mutated") +} + +func TestGenerateTree_EmptyDependencySlice(t *testing.T) { + tree, err := GenerateTree("B1", []string{}) + require.NoError(t, err) + + require.Len(t, tree.Speculations, 1) + assert.Equal(t, entity.SpeculationPath{Base: []string{}, Head: "B1"}, tree.Speculations[0].Path) +} + +func TestGenerateTree_HeadAlwaysCurrentID(t *testing.T) { + tree, err := GenerateTree("B3", []string{"B1", "B2"}) + require.NoError(t, err) + + for i, spec := range tree.Speculations { + assert.Equal(t, "B3", spec.Path.Head, "head at index %d", i) + } +} + +func TestGenerateTree_ExceedsMaxDependencies(t *testing.T) { + deps := make([]string, MaxDependencies+1) + for i := range deps { + deps[i] = fmt.Sprintf("B%d", i+1) + } + + _, err := GenerateTree("current", deps) + + require.Error(t, err) + assert.Contains(t, err.Error(), "exceeds maximum") +} + +func TestGenerateTree_AtMaxDependencies(t *testing.T) { + deps := make([]string, MaxDependencies) + for i := range deps { + deps[i] = fmt.Sprintf("B%d", i+1) + } + + tree, err := GenerateTree("current", deps) + + require.NoError(t, err) + assert.Len(t, tree.Speculations, 1<