Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 50 additions & 9 deletions pkg/webhook/handlers/quantum.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ const (
// for, and never dedup tasks meant to be distinct.
CoordinationIndependent = "independent"

// CoordinationBatch (brokered): one broker (index 0) submits ALL N tasks to
// the same vendor queue, recording slot i -> task id_i. Each worker is gated
// and ungated with ITS OWN id_i the moment task_i COMPLETES (per-result, not
// broadcast). The broker holds the only node during the long vendor wait;
// workers materialize just-in-time to post-process. This is the shared
// machinery generalized from one shared result to N per-slot results -- it
// trades a little worker-startup latency for a large node-time saving, since
// N-1 workers no longer squat nodes through the queue wait. Submitting from
// one pod (vs N) also lets the broker throttle to the vendor concurrency cap;
// results returning out of order is a non-issue (each keyed to its slot id).
CoordinationBatch = "batch"

// ProducerGroupSuffix names the producer's own group-of-one: <group>-producer
// (minCount 1) so it schedules alone and never deadlocks against the gated
// consumer gang.
Expand Down Expand Up @@ -125,7 +137,7 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *
// member. The REAL submit happens in THIS pod; the sidecar is added only for
// observe-only telemetry. (independent mode routes every member here -> N
// standalone producers, each owning its task and its own queue wait.)
if mode != CoordinationShared || g == "" || n <= 1 {
if (mode != CoordinationShared && mode != CoordinationBatch) || g == "" || n <= 1 {
ops := interceptorOps(pod)
if observe {
sc := sidecarFor(m)
Expand All @@ -137,30 +149,43 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod *
return ops
}

// shared mode: promote one member to producer; the rest are gated consumers.
// shared / batch: same two roles. Promote one member to producer (index 0),
// the rest are gated consumers. The mode (shared|batch) rides along and
// decides the producer's submit fan-out and the sidecar's ungate strategy --
// it does NOT create new roles.
if isProducer(ctx, m, pod, g) {
return h.mutateProducer(ctx, m, pod, g)
return h.mutateProducer(ctx, m, pod, g, mode, n)
}
return h.mutateConsumer(ctx, m, pod, g, n)
return h.mutateConsumer(ctx, m, pod, g, n, mode)
}

// mutateProducer wires the single producer member (indexed-Job completion index
// 0): its own group-of-one <group>-producer (minCount 1) so it schedules alone
// and runs the REAL submit, the interceptor in tag mode, RBAC, and the sidecar
// told which consumer group to ungate (FLUENCE_GANG_GROUP). The producer is one
// of the N members, so the application is NOT run an extra time. Never gated.
func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) []spec.Op {
func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group, mode string, n int32) []spec.Op {
pg := group + ProducerGroupSuffix
m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1)
ops := linkGroupOps(pod, pg)
ops = append(ops, interceptorOps(pod)...) // tag mode: the producer submits for real
ops = append(ops, roleEnvOps(pod, RoleProducer)...) // FLUENCE_COORDINATION_ROLE=producer
ops = append(ops, modeEnvOps(pod, mode)...) // shared|batch: the producer's submit fan-out
// the producer's workload needs N (in batch it submits N tasks).
ops = append(ops, setContainerEnvOps(pod, corev1.EnvVar{Name: GangSizeEnv, Value: strconv.Itoa(int(n))})...)
sc := sidecarFor(m)
sc.EnsureRBAC(ctx, pod.Namespace)
extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: group}}
// The sidecar needs the gang group (whom to ungate), the mode (broadcast one
// id vs release each consumer by its own per-slot result), and N (in batch,
// the number of tasks/slots to map).
extra := []corev1.EnvVar{
{Name: GangGroupEnv, Value: group},
{Name: CoordinationModeEnv, Value: mode},
{Name: GangSizeEnv, Value: strconv.Itoa(int(n))},
}
ops = append(ops, sc.ContainerOps(pod, false, extra)...)
log.Printf("[fluence-webhook] quantum producer %s/%s — group %s (ungates consumers %q)",
pod.Namespace, pod.Name, pg, group)
log.Printf("[fluence-webhook] quantum producer %s/%s — group %s, mode=%s, N=%d (ungates consumers %q)",
pod.Namespace, pod.Name, pg, mode, n, group)
return ops
}

Expand All @@ -171,7 +196,7 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP
// role-aware consumer reads those and fetches the shared result instead of
// submitting — so the consumer never calls the vendor submit, and needs neither
// the interceptor nor a faux flag.
func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op {
func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32, mode string) []spec.Op {
m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1)
ops := linkGroupOps(pod, group)
// Express the wait as the GENERAL dependency primitive: this consumer depends
Expand All @@ -180,6 +205,7 @@ func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAP
dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate}
ops = append(ops, dep.applyOps(pod)...)
ops = append(ops, consumerEnvOps(pod)...)
ops = append(ops, modeEnvOps(pod, mode)...) // shared|batch: how the sidecar releases this consumer
// A gated consumer never runs the QPU task — it only fetches the producer's
// shared result — so it must not hold the Fluxion quantum resource. Leaving it
// would make Fluxion allocate a qpu per consumer, capping the gang at the
Expand Down Expand Up @@ -383,6 +409,16 @@ const (
RoleProducer = "producer"
RoleConsumer = "consumer"

// CoordinationModeEnv carries the coordination mode (shared|batch) to the
// workload and the sidecar. Roles are unchanged across modes -- the producer
// (index 0) always submits and runs the sidecar; consumers are always gated.
// Only WHAT the producer submits (one task vs N) and HOW the sidecar ungates
// (one shared id broadcast vs each consumer released by its own per-slot
// result) differ, and that is exactly what this mode selects.
CoordinationModeEnv = "FLUENCE_COORDINATION_MODE"
// GangSizeEnv carries the gang size N to the producer's workload and to its
// sidecar (in batch, N is the number of tasks the producer submits).
GangSizeEnv = "FLUENCE_GANG_SIZE"

// QuantumJobIDAnnotation is the vendor-neutral task id the ungating sidecar
// stamps on each consumer (mirrors python/fluence/ungate.py JOB_ID_ANNOTATION),
// BEFORE removing the gate. Surfaced into FLUENCE_QUANTUM_JOB_ID via the
Expand All @@ -408,6 +444,11 @@ func consumerEnvOps(pod *corev1.Pod) []spec.Op {
return ops
}

// modeEnvOps stamps the coordination mode on the pod's workload containers: it
// builds the FLUENCE_COORDINATION_MODE=<mode> env var and hands it to
// setContainerEnvOps, returning whatever patch ops that helper produces.
func modeEnvOps(pod *corev1.Pod, mode string) []spec.Op {
	modeVar := corev1.EnvVar{
		Name:  CoordinationModeEnv,
		Value: mode,
	}
	return setContainerEnvOps(pod, modeVar)
}

// setContainerEnvOps appends env var e to every non-sidecar container that does
// not already define it, returning the patch ops and mutating pod in place.
func setContainerEnvOps(pod *corev1.Pod, e corev1.EnvVar) []spec.Op {
Expand Down
119 changes: 119 additions & 0 deletions pkg/webhook/handlers/quantum_batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package handlers

import (
"context"
"testing"

"github.com/converged-computing/fluence/pkg/webhook"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)

// batchQPUPod builds the test fixture pod for batch coordination. It starts
// from qpuPod("fluence") and then sets the name and namespace, a single group
// label, an owner reference to the Job named job, and two annotations:
// coordination=batch and the given completion index.
//
// The roles are the same two as in shared mode -- index 0 is the producer, any
// other index is a consumer; batch changes only the submit fan-out and ungate
// strategy (carried by FLUENCE_COORDINATION_MODE).
func batchQPUPod(ns, group, name, job, index string) *corev1.Pod {
	pod := qpuPod("fluence")
	pod.Namespace, pod.Name = ns, name

	owner := metav1.OwnerReference{Kind: "Job", Name: job}
	pod.OwnerReferences = []metav1.OwnerReference{owner}

	pod.Labels = map[string]string{webhook.GroupLabel: group}

	annotations := make(map[string]string, 2)
	annotations[CoordinationAnnotation] = CoordinationBatch
	annotations[CompletionIndexAnnotation] = index
	pod.Annotations = annotations

	return pod
}

// batchJobClientset returns a fake clientset pre-loaded with one Job named job
// in namespace ns whose Parallelism and Completions both point at n.
func batchJobClientset(ns, job string, n int32) *fake.Clientset {
	seed := &batchv1.Job{}
	seed.ObjectMeta = metav1.ObjectMeta{Name: job, Namespace: ns}
	seed.Spec = batchv1.JobSpec{Parallelism: &n, Completions: &n}
	return fake.NewSimpleClientset(seed)
}

// The batch PRODUCER (index 0) is wired exactly like the shared producer -- own
// group-of-one (minCount 1), real sidecar, never gated, keeps its qpu, role=producer
// -- and additionally carries mode=batch and the gang size N so its workload
// submits N tasks. No new role, no extra pod.
//
// Every env-var failure message reports the observed value and whether the var
// was found at all, so a failing run shows what the webhook actually stamped.
func TestBatchProducerIsProducerWithMode(t *testing.T) {
	ns, group, job := "default", "bg", "bg-job"
	cs := batchJobClientset(ns, job, 4)
	m := &webhook.Mutator{Clientset: cs}

	ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-0", job, "0"))

	if !hasSidecarOp(ops) {
		t.Error("batch producer must get the real sidecar")
	}
	if hasGateOp(ops) {
		t.Error("batch producer must NOT be gated")
	}
	if hasDropQuantumResourceOp(ops) {
		t.Error("batch producer must KEEP its qpu resource")
	}
	if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer {
		t.Errorf("must reuse role=%s (no broker role), got %q (ok=%v)", RoleProducer, e.Value, ok)
	}
	if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch {
		t.Errorf("producer must carry %s=%s, got %q (ok=%v)", CoordinationModeEnv, CoordinationBatch, e.Value, ok)
	}
	if e, ok := envOp(ops, GangSizeEnv); !ok || e.Value != "4" {
		t.Errorf("producer must be told the gang size %s=4, got %q (ok=%v)", GangSizeEnv, e.Value, ok)
	}
	if mc, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); !ok || mc != 1 {
		t.Errorf("producer group-of-one minCount=%d (ok=%v), want 1", mc, ok)
	}
}

// A batch CONSUMER (index != 0) is wired exactly like the shared consumer --
// gated, role=consumer, joins the gang at minCount N-1, qpu stripped -- and
// additionally carries mode=batch so the sidecar releases it by its OWN result.
//
// Every env-var failure message reports the observed value and whether the var
// was found at all, so a failing run shows what the webhook actually stamped.
func TestBatchConsumerIsConsumerWithMode(t *testing.T) {
	ns, group, job := "default", "bg", "bg-job"
	cs := batchJobClientset(ns, job, 4)
	m := &webhook.Mutator{Clientset: cs}

	ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-2", job, "2"))

	if !hasGateOp(ops) {
		t.Error("batch consumer must be gated")
	}
	if hasSidecarOp(ops) {
		t.Error("batch consumer (gated) must NOT get a sidecar")
	}
	if !hasDropQuantumResourceOp(ops) {
		t.Error("batch consumer must have its qpu stripped")
	}
	if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleConsumer {
		t.Errorf("must reuse role=%s (no worker role), got %q (ok=%v)", RoleConsumer, e.Value, ok)
	}
	if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch {
		t.Errorf("consumer must carry %s=%s, got %q (ok=%v)", CoordinationModeEnv, CoordinationBatch, e.Value, ok)
	}
	if mc, ok := mincount(t, cs, ns, group); !ok || mc != 3 {
		t.Errorf("consumer gang minCount=%d (ok=%v), want 3 (N-1 split)", mc, ok)
	}
}

// shared mode still stamps role=producer and mode=shared (no regression from the
// mode unification). The fixture is the batch pod with its coordination
// annotation switched to shared, at completion index 0.
//
// Failure messages report the observed value and whether the var was found at
// all, so a regression is diagnosable from the test output alone.
func TestSharedStillStampsModeShared(t *testing.T) {
	ns, group, job := "default", "sg", "sg-job"
	cs := batchJobClientset(ns, job, 3)
	m := &webhook.Mutator{Clientset: cs}
	p := batchQPUPod(ns, group, "sg-0", job, "0")
	p.Annotations[CoordinationAnnotation] = CoordinationShared

	ops := m.Mutate(context.Background(), p)
	if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer {
		t.Errorf("shared producer must carry %s=%s, got %q (ok=%v)", CoordinationRoleEnv, RoleProducer, e.Value, ok)
	}
	if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationShared {
		t.Errorf("shared producer must carry %s=%s, got %q (ok=%v)", CoordinationModeEnv, CoordinationShared, e.Value, ok)
	}
}
16 changes: 16 additions & 0 deletions python/fluence/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ def find_my_task(self, pod_uid: str, backend: str, timeout: int) -> "Task | None
found or timeout. Returns an opaque Task or None."""
raise NotImplementedError

def find_my_tasks(self, pod_uid: str, backend: str, timeout: int,
                  n: int) -> "list[Task]":
    """Batch-mode discovery hook: list the tasks the producer submitted.

    In batch mode the producer submits N tasks, every one tagged
    TAG_KEY=<pod_uid>; an implementation returns them as a list. Neither
    order nor identity matters -- the gated workers all come from one
    template and are interchangeable, so a result goes to whichever worker
    is free when it completes. Because no per-slot tag and no Job completion
    index are needed, this works for ANY gang abstraction (indexed Job,
    ReplicaSet/Deployment, StatefulSet, or loose label-grouped pods).

    The sidecar calls this with its discovery timeout as `timeout` and the
    gang size as `n`.

    This base implementation always raises NotImplementedError: vendors
    override it to list their tagged submissions, and a provider without
    batch support fails loudly rather than silently stranding the gang."""
    message = f"{self.name} provider does not implement find_my_tasks (batch mode)"
    raise NotImplementedError(message)

def is_ready_to_ungate(self, task: "Task") -> bool:
"""True when the gang should be ungated — queue position == 1 or the task
is already RUNNING/terminal. Always implementable."""
Expand Down
21 changes: 20 additions & 1 deletion python/fluence/sidecar.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@

from fluence.providers import resolve_from_env
from fluence.providers.base import log
from fluence.ungate import ungate_pods, gated_pods_from_env, namespace_from_env, wait_for_gated_pods
from fluence.ungate import (ungate_pods, gated_pods_from_env, namespace_from_env,
wait_for_gated_pods, ungate_per_result)



Expand Down Expand Up @@ -77,6 +78,24 @@ def main():
sys.exit(1)
log(f"resolved provider: {provider.name}")

# BATCH mode: the producer submitted N tasks; release one gated consumer each
# time a task completes. Same gate primitive as shared, fired per-result
# instead of broadcast.
mode = os.environ.get("FLUENCE_COORDINATION_MODE", "shared")
if mode == "batch" and not observe:
gang_size = int(os.environ.get("FLUENCE_GANG_SIZE", "0"))
tasks = provider.find_my_tasks(pod_uid, backend, discovery_timeout, gang_size)
log(f"batch mode: discovered {len(tasks)} task(s) from {gang_size}-pod gang")
# No completion-index assumption: the workers are interchangeable, so just
# find the gated members and hand each a result as it completes.
gated = gated_pods_from_env() or wait_for_gated_pods(
namespace, gang_group, exclude=pod_name, timeout=ungate_timeout)
log(f"batch mode: {len(gated)} gated worker(s): {gated}")
n_ok = ungate_per_result(provider, tasks, gated, namespace,
poll_interval=poll_interval, timeout=ungate_timeout)
log(f"batch done — {n_ok} worker(s) released per-result")
return

task = provider.find_my_task(pod_uid, backend, discovery_timeout)
if task is None:
log("ERROR: could not discover quantum task")
Expand Down
Loading
Loading