From 73da38ab09fd569acc29c28cda619cb1bb09c107 Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 28 Jun 2026 16:38:34 -0700 Subject: [PATCH] wip: feature for batch mode. If we DO have batch (or embarrassingly parallel) we would not ideally have to submit N pods and then have N pods each wait in the queue. I want to test the idea of having the broker do all the submit, but then have the pods ungated as each result comes in, and each pod given the result. Signed-off-by: vsoch --- pkg/webhook/handlers/quantum.go | 59 ++++++++-- pkg/webhook/handlers/quantum_batch_test.go | 119 ++++++++++++++++++++ python/fluence/providers/base.py | 16 +++ python/fluence/sidecar.py | 21 +++- python/fluence/ungate.py | 121 ++++++++++++++------- python/tests/test_batch_ungate.py | 118 ++++++++++++++++++++ 6 files changed, 406 insertions(+), 48 deletions(-) create mode 100644 pkg/webhook/handlers/quantum_batch_test.go create mode 100644 python/tests/test_batch_ungate.py diff --git a/pkg/webhook/handlers/quantum.go b/pkg/webhook/handlers/quantum.go index 26a09b0..b0ce218 100644 --- a/pkg/webhook/handlers/quantum.go +++ b/pkg/webhook/handlers/quantum.go @@ -81,6 +81,18 @@ const ( // for, and never dedup tasks meant to be distinct. CoordinationIndependent = "independent" + // CoordinationBatch (brokered): one broker (index 0) submits ALL N tasks to + // the same vendor queue, recording slot i -> task id_i. Each worker is gated + // and ungated with ITS OWN id_i the moment task_i COMPLETES (per-result, not + // broadcast). The broker holds the only node during the long vendor wait; + // workers materialize just-in-time to post-process. This is the shared + // machinery generalized from one shared result to N per-slot results -- it + // trades a little worker-startup latency for a large node-time saving, since + // N-1 workers no longer squat nodes through the queue wait. Submitting from + // one pod (vs N) also lets the broker throttle to the vendor concurrency cap; + // results returning out of order is a non-issue (each keyed to its slot id). + CoordinationBatch = "batch" + // ProducerGroupSuffix names the producer's own group-of-one: -producer // (minCount 1) so it schedules alone and never deadlocks against the gated // consumer gang. @@ -125,7 +137,7 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * // member. The REAL submit happens in THIS pod; the sidecar is added only for // observe-only telemetry. (independent mode routes every member here -> N // standalone producers, each owning its task and its own queue wait.) - if mode != CoordinationShared || g == "" || n <= 1 { + if (mode != CoordinationShared && mode != CoordinationBatch) || g == "" || n <= 1 { ops := interceptorOps(pod) if observe { sc := sidecarFor(m) @@ -137,11 +149,14 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * return ops } - // shared mode: promote one member to producer; the rest are gated consumers. + // shared / batch: same two roles. Promote one member to producer (index 0), + // the rest are gated consumers. The mode (shared|batch) rides along and + // decides the producer's submit fan-out and the sidecar's ungate strategy -- + // it does NOT create new roles. if isProducer(ctx, m, pod, g) { - return h.mutateProducer(ctx, m, pod, g) + return h.mutateProducer(ctx, m, pod, g, mode, n) } - return h.mutateConsumer(ctx, m, pod, g, n) + return h.mutateConsumer(ctx, m, pod, g, n, mode) } // mutateProducer wires the single producer member (indexed-Job completion index @@ -149,18 +164,28 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * // and runs the REAL submit, the interceptor in tag mode, RBAC, and the sidecar // told which consumer group to ungate (FLUENCE_GANG_GROUP). The producer is one // of the N members, so the application is NOT run an extra time. Never gated. -func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) []spec.Op { +func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group, mode string, n int32) []spec.Op { pg := group + ProducerGroupSuffix m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) ops := linkGroupOps(pod, pg) ops = append(ops, interceptorOps(pod)...) // tag mode: the producer submits for real ops = append(ops, roleEnvOps(pod, RoleProducer)...) // FLUENCE_COORDINATION_ROLE=producer + ops = append(ops, modeEnvOps(pod, mode)...) // shared|batch: the producer's submit fan-out + // the producer's workload needs N (in batch it submits N tasks). + ops = append(ops, setContainerEnvOps(pod, corev1.EnvVar{Name: GangSizeEnv, Value: strconv.Itoa(int(n))})...) sc := sidecarFor(m) sc.EnsureRBAC(ctx, pod.Namespace) - extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: group}} + // The sidecar needs the gang group (whom to ungate), the mode (broadcast one + // id vs release each consumer by its own per-slot result), and N (in batch, + // the number of tasks/slots to map). + extra := []corev1.EnvVar{ + {Name: GangGroupEnv, Value: group}, + {Name: CoordinationModeEnv, Value: mode}, + {Name: GangSizeEnv, Value: strconv.Itoa(int(n))}, + } ops = append(ops, sc.ContainerOps(pod, false, extra)...) - log.Printf("[fluence-webhook] quantum producer %s/%s — group %s (ungates consumers %q)", - pod.Namespace, pod.Name, pg, group) + log.Printf("[fluence-webhook] quantum producer %s/%s — group %s, mode=%s, N=%d (ungates consumers %q)", + pod.Namespace, pod.Name, pg, mode, n, group) return ops } @@ -171,7 +196,7 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP // role-aware consumer reads those and fetches the shared result instead of // submitting — so the consumer never calls the vendor submit, and needs neither // the interceptor nor a faux flag. -func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { +func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32, mode string) []spec.Op { m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) ops := linkGroupOps(pod, group) // Express the wait as the GENERAL dependency primitive: this consumer depends @@ -180,6 +205,7 @@ func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAP dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} ops = append(ops, dep.applyOps(pod)...) ops = append(ops, consumerEnvOps(pod)...) + ops = append(ops, modeEnvOps(pod, mode)...) // shared|batch: how the sidecar releases this consumer // A gated consumer never runs the QPU task — it only fetches the producer's // shared result — so it must not hold the Fluxion quantum resource. Leaving it // would make Fluxion allocate a qpu per consumer, capping the gang at the @@ -383,6 +409,16 @@ const ( RoleProducer = "producer" RoleConsumer = "consumer" + // CoordinationModeEnv carries the coordination mode (shared|batch) to the + // workload and the sidecar. Roles are unchanged across modes -- the producer + // (index 0) always submits and runs the sidecar; consumers are always gated. + // Only WHAT the producer submits (one task vs N) and HOW the sidecar ungates + // (one shared id broadcast vs each consumer released by its own per-slot + // result) differ, and that is exactly what this mode selects. + CoordinationModeEnv = "FLUENCE_COORDINATION_MODE" + // GangSizeEnv tells the producer the gang size N (in batch it submits N). + GangSizeEnv = "FLUENCE_GANG_SIZE" + // QuantumJobIDAnnotation is the vendor-neutral task id the ungating sidecar // stamps on each consumer (mirrors python/fluence/ungate.py JOB_ID_ANNOTATION), // BEFORE removing the gate. Surfaced into FLUENCE_QUANTUM_JOB_ID via the @@ -408,6 +444,11 @@ func consumerEnvOps(pod *corev1.Pod) []spec.Op { return ops } +// modeEnvOps sets FLUENCE_COORDINATION_MODE= on each non-sidecar container. +func modeEnvOps(pod *corev1.Pod, mode string) []spec.Op { + return setContainerEnvOps(pod, corev1.EnvVar{Name: CoordinationModeEnv, Value: mode}) +} + // setContainerEnvOps appends env var e to every non-sidecar container that does // not already define it, returning the patch ops and mutating pod in place. func setContainerEnvOps(pod *corev1.Pod, e corev1.EnvVar) []spec.Op { diff --git a/pkg/webhook/handlers/quantum_batch_test.go b/pkg/webhook/handlers/quantum_batch_test.go new file mode 100644 index 0000000..639de19 --- /dev/null +++ b/pkg/webhook/handlers/quantum_batch_test.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "context" + "testing" + + "github.com/converged-computing/fluence/pkg/webhook" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +// batchQPUPod is a quantum workload pod owned by a Job of parallelism N, with +// coordination=batch and a completion index. Same two roles as shared: index 0 +// is the producer, any other index is a consumer -- batch changes only the +// submit fan-out and ungate strategy (carried by FLUENCE_COORDINATION_MODE). +func batchQPUPod(ns, group, name, job, index string) *corev1.Pod { + p := qpuPod("fluence") + p.Name = name + p.Namespace = ns + p.Labels = map[string]string{webhook.GroupLabel: group} + p.Annotations = map[string]string{ + CoordinationAnnotation: CoordinationBatch, + CompletionIndexAnnotation: index, + } + p.OwnerReferences = []metav1.OwnerReference{{Kind: "Job", Name: job}} + return p +} + +func batchJobClientset(ns, job string, n int32) *fake.Clientset { + return fake.NewSimpleClientset(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &n, Completions: &n}, + }) +} + +// The batch PRODUCER (index 0) is wired exactly like the shared producer -- own +// group-of-one (minCount 1), real sidecar, never gated, keeps its qpu, role=producer +// -- and additionally carries mode=batch and the gang size N so its workload +// submits N tasks. No new role, no extra pod. +func TestBatchProducerIsProducerWithMode(t *testing.T) { + ns, group, job := "default", "bg", "bg-job" + cs := batchJobClientset(ns, job, 4) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-0", job, "0")) + + if !hasSidecarOp(ops) { + t.Error("batch producer must get the real sidecar") + } + if hasGateOp(ops) { + t.Error("batch producer must NOT be gated") + } + if hasDropQuantumResourceOp(ops) { + t.Error("batch producer must KEEP its qpu resource") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer { + t.Errorf("must reuse role=%s (no broker role), got %q (ok=%v)", RoleProducer, e.Value, ok) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch { + t.Errorf("producer must carry %s=%s", CoordinationModeEnv, CoordinationBatch) + } + if e, ok := envOp(ops, GangSizeEnv); !ok || e.Value != "4" { + t.Errorf("producer must be told the gang size %s=4, got %q (ok=%v)", GangSizeEnv, e.Value, ok) + } + if mc, ok := mincount(t, cs, ns, group+ProducerGroupSuffix); !ok || mc != 1 { + t.Errorf("producer group-of-one minCount=%d (ok=%v), want 1", mc, ok) + } +} + +// A batch CONSUMER (index != 0) is wired exactly like the shared consumer -- +// gated, role=consumer, joins the gang at minCount N-1, qpu stripped -- and +// additionally carries mode=batch so the sidecar releases it by its OWN result. +func TestBatchConsumerIsConsumerWithMode(t *testing.T) { + ns, group, job := "default", "bg", "bg-job" + cs := batchJobClientset(ns, job, 4) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-2", job, "2")) + + if !hasGateOp(ops) { + t.Error("batch consumer must be gated") + } + if hasSidecarOp(ops) { + t.Error("batch consumer (gated) must NOT get a sidecar") + } + if !hasDropQuantumResourceOp(ops) { + t.Error("batch consumer must have its qpu stripped") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleConsumer { + t.Errorf("must reuse role=%s (no worker role), got %q (ok=%v)", RoleConsumer, e.Value, ok) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch { + t.Errorf("consumer must carry %s=%s", CoordinationModeEnv, CoordinationBatch) + } + if mc, ok := mincount(t, cs, ns, group); !ok || mc != 3 { + t.Errorf("consumer gang minCount=%d (ok=%v), want 3 (N-1 split)", mc, ok) + } +} + +// shared mode still stamps role=producer and mode=shared (no regression from the +// mode unification). +func TestSharedStillStampsModeShared(t *testing.T) { + ns, group, job := "default", "sg", "sg-job" + cs := batchJobClientset(ns, job, 3) + m := &webhook.Mutator{Clientset: cs} + p := batchQPUPod(ns, group, "sg-0", job, "0") + p.Annotations[CoordinationAnnotation] = CoordinationShared + + ops := m.Mutate(context.Background(), p) + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer { + t.Errorf("shared producer role=%s", RoleProducer) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationShared { + t.Errorf("shared producer must carry %s=%s", CoordinationModeEnv, CoordinationShared) + } +} diff --git a/python/fluence/providers/base.py b/python/fluence/providers/base.py index 561bca2..ab53160 100644 --- a/python/fluence/providers/base.py +++ b/python/fluence/providers/base.py @@ -79,6 +79,22 @@ def find_my_task(self, pod_uid: str, backend: str, timeout: int) -> "Task | None found or timeout. Returns an opaque Task or None.""" raise NotImplementedError + def find_my_tasks(self, pod_uid: str, backend: str, timeout: int, + n: int) -> "list[Task]": + """BATCH mode: the producer submitted N tasks, all tagged + TAG_KEY=. Return them as a list. Order and identity are + irrelevant -- the gated workers come from one template and are + interchangeable, so each result is assigned to whichever worker is free + when it completes. No per-slot tag and no Job completion index are + required, so this works for ANY gang abstraction (indexed Job, + ReplicaSet/Deployment, StatefulSet, or loose label-grouped pods). + + Vendors override this to list their tagged submissions. The default + raises, so a provider without batch support fails loudly rather than + silently stranding the gang.""" + raise NotImplementedError( + f"{self.name} provider does not implement find_my_tasks (batch mode)") + def is_ready_to_ungate(self, task: "Task") -> bool: """True when the gang should be ungated — queue position == 1 or the task is already RUNNING/terminal. Always implementable.""" diff --git a/python/fluence/sidecar.py b/python/fluence/sidecar.py index d0724e5..9cd798b 100644 --- a/python/fluence/sidecar.py +++ b/python/fluence/sidecar.py @@ -28,7 +28,8 @@ from fluence.providers import resolve_from_env from fluence.providers.base import log -from fluence.ungate import ungate_pods, gated_pods_from_env, namespace_from_env, wait_for_gated_pods +from fluence.ungate import (ungate_pods, gated_pods_from_env, namespace_from_env, + wait_for_gated_pods, ungate_per_result) @@ -77,6 +78,24 @@ def main(): sys.exit(1) log(f"resolved provider: {provider.name}") + # BATCH mode: the producer submitted N tasks (one per slot); release each + # gated consumer the moment ITS task completes, keyed by slot. Same gate + # primitive as shared, fired per-result instead of broadcast. + mode = os.environ.get("FLUENCE_COORDINATION_MODE", "shared") + if mode == "batch" and not observe: + gang_size = int(os.environ.get("FLUENCE_GANG_SIZE", "0")) + tasks = provider.find_my_tasks(pod_uid, backend, discovery_timeout, gang_size) + log(f"batch mode: discovered {len(tasks)} task(s) from {gang_size}-pod gang") + # No completion-index assumption: the workers are interchangeable, so just + # find the gated members and hand each a result as it completes. + gated = gated_pods_from_env() or wait_for_gated_pods( + namespace, gang_group, exclude=pod_name, timeout=ungate_timeout) + log(f"batch mode: {len(gated)} gated worker(s): {gated}") + n_ok = ungate_per_result(provider, tasks, gated, namespace, + poll_interval=poll_interval, timeout=ungate_timeout) + log(f"batch done — {n_ok} worker(s) released per-result") + return + task = provider.find_my_task(pod_uid, backend, discovery_timeout) if task is None: log("ERROR: could not discover quantum task") diff --git a/python/fluence/ungate.py b/python/fluence/ungate.py index a40e662..58b3af6 100644 --- a/python/fluence/ungate.py +++ b/python/fluence/ungate.py @@ -28,48 +28,93 @@ def kubectl(args): return result.stdout.strip() -def ungate_pods(gated_pods, job_id, namespace): - """ - For each gated worker pod: - 1. Stamp the vendor-neutral job-id annotation so the worker can locate - the quantum result. - 2. Remove the scheduling gate so the pod can be scheduled. - - NOTE: priorityClassName is NOT set here — it is immutable after pod creation - (the API server forbids changing any spec field but image/tolerations/ - activeDeadlineSeconds/terminationGracePeriodSeconds on an existing pod). If a - priority class is wanted on the classical gang, the webhook must set it at - admission, when the pod is created. Setting it in the ungate patch made the - whole patch fail atomically, so the gate was never removed and workers stayed - gated. +def ungate_one(pod_name, job_id, namespace): + """Ungate a SINGLE gated pod: stamp its (own) job-id annotation so it can + locate its quantum result, then remove the scheduling gate. Returns True on + a successful gate removal. See the priorityClassName note below. + + priorityClassName is NOT set here -- it is immutable after pod creation, so + the webhook must set it at admission. Setting it in this patch made the whole + patch fail atomically, leaving the pod gated. """ - ok = 0 - for pod_name in gated_pods: - pod_name = pod_name.strip() - if not pod_name: - continue - log(f"ungating pod: {pod_name}") + pod_name = (pod_name or "").strip() + if not pod_name: + return False + log(f"ungating pod: {pod_name}") - if job_id: - try: - kubectl(["annotate", "pod", pod_name, "-n", namespace, - f"{JOB_ID_ANNOTATION}={job_id}", "--overwrite"]) - log(f" patched job id onto {pod_name}: {job_id}") - except RuntimeError as e: - log(f" WARNING: could not annotate {pod_name}: {e}") - else: - log(f" WARNING: no job id to patch onto {pod_name}") - - patch = json.dumps([ - {"op": "remove", "path": "/spec/schedulingGates/0"}, - ]) + if job_id: try: - kubectl(["patch", "pod", pod_name, "-n", namespace, - "--type=json", f"-p={patch}"]) - log(f" removed gate from {pod_name}") - ok += 1 + kubectl(["annotate", "pod", pod_name, "-n", namespace, + f"{JOB_ID_ANNOTATION}={job_id}", "--overwrite"]) + log(f" patched job id onto {pod_name}: {job_id}") except RuntimeError as e: - log(f" WARNING: could not patch {pod_name}: {e}") + log(f" WARNING: could not annotate {pod_name}: {e}") + else: + log(f" WARNING: no job id to patch onto {pod_name}") + + patch = json.dumps([{"op": "remove", "path": "/spec/schedulingGates/0"}]) + try: + kubectl(["patch", "pod", pod_name, "-n", namespace, "--type=json", f"-p={patch}"]) + log(f" removed gate from {pod_name}") + return True + except RuntimeError as e: + log(f" WARNING: could not patch {pod_name}: {e}") + return False + + +def ungate_pods(gated_pods, job_id, namespace): + """SHARED mode: one quantum result is shared by the whole gang, so broadcast + the same job_id to every gated pod and remove all gates together.""" + return sum(1 for p in gated_pods if ungate_one(p, job_id, namespace)) + + +def ungate_per_result(provider, tasks, gated_pods, namespace, poll_interval=10, + timeout=3600, _sleep=time.sleep): + """BATCH mode: the producer submitted N tasks; release each gated worker the + moment ANY task completes -- assigning that task's result to whichever worker + is still free. + + tasks : list of provider Tasks (the producer's N submissions) + gated_pods : list of gated worker pod names (the gang minus the producer) + + Workers come from one template and are interchangeable, so there is no slot + or completion-index mapping -- we never assume the gang is an indexed Job. + A worker is ungated with the job_id of whatever task finished, as it finishes. + Out-of-order completion is fine, and so is heterogeneous task duration: the + first result out wakes the first worker, and so on. We assign at most one task + per worker; any surplus tasks (e.g. the producer's own share) are left for the + producer. Returns the count of workers released. + + This is the only K8s-side difference from shared mode: the same gate + primitive, fired per-result and matched to a free worker, instead of one + shared id broadcast to all. + """ + pending = list(tasks) + free = [p for p in (gated_pods or []) if p] + ok = 0 + deadline = time.time() + timeout + while pending and free and time.time() < deadline: + progressed = False + for task in list(pending): + if not free: + break + try: + if provider.is_ready_to_ungate(task): + jid = provider.job_id(task) + pod = free.pop(0) + if ungate_one(pod, jid, namespace): + ok += 1 + log(f"task ready (job_id={jid}) -> released worker {pod}") + pending.remove(task) + progressed = True + except Exception as e: # noqa: BLE001 - keep polling the other tasks + log(f"poll error (will retry): {e}") + if free and pending and not progressed: + _sleep(poll_interval) + if free: + log(f"WARNING: {len(free)} worker(s) never received a result before timeout: {free}") + elif pending: + log(f"{len(pending)} task(s) not assigned to a worker (left for the producer)") return ok diff --git a/python/tests/test_batch_ungate.py b/python/tests/test_batch_ungate.py new file mode 100644 index 0000000..57fdc62 --- /dev/null +++ b/python/tests/test_batch_ungate.py @@ -0,0 +1,118 @@ +""" +Tests for batch-mode (per-result) ungating in fluence.ungate. + +Mocks kubectl (no cluster) and the provider (controllable, out-of-order task +readiness). Key properties, none of which assume an indexed Job: + - each gated worker is released the moment ANY task completes, + - stamped with that task's own job id, + - workers are interchangeable (no slot / completion-index mapping), + - surplus tasks (the producer's own share) and never-completing tasks are + handled without stranding the rest. +""" +import json + +import fluence.ungate as ung + + +class FakeTask: + def __init__(self, name): + self.name = name + + +class FakeProvider: + """Tasks become ready in the given order, one per poll tick, to force a + specific (out-of-order) completion sequence.""" + + def __init__(self, ready_order): + self._order = list(ready_order) + self.ready = set() + self.tick() # first task ready immediately + + def tick(self): + if self._order: + self.ready.add(self._order.pop(0)) + + def is_ready_to_ungate(self, task): + return task.name in self.ready + + def job_id(self, task): + return f"job-{task.name}" + + +def _record_kubectl(monkeypatch): + calls = [] + monkeypatch.setattr(ung, "kubectl", lambda args: calls.append(list(args)) or "") + return calls + + +def _annotations(calls): + return {a[2]: a[5].split("=", 1)[1] for a in calls if a[0] == "annotate"} + + +def _gate_removals(calls): + return [a[2] for a in calls if a[0] == "patch"] + + +def test_per_result_assigns_each_completed_task_to_a_free_worker(monkeypatch): + calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b"), FakeTask("c")] + workers = ["w-x", "w-y", "w-z"] + prov = FakeProvider(ready_order=["c", "a", "b"]) # completes c, a, b + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 3 + # one distinct job id per worker; each worker gate-removed exactly once + assert set(_annotations(calls).values()) == {"job-a", "job-b", "job-c"} + assert sorted(_gate_removals(calls)) == ["w-x", "w-y", "w-z"] + # first result out (c) wakes the first free worker (w-x), etc. -- assignment + # follows completion order, NOT task or worker identity. + assert _annotations(calls)["w-x"] == "job-c" + assert _annotations(calls)["w-y"] == "job-a" + assert _annotations(calls)["w-z"] == "job-b" + + +def test_surplus_tasks_left_for_producer(monkeypatch): + # N tasks, N-1 workers (the producer keeps one result): only N-1 released. + calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b"), FakeTask("c")] + workers = ["w-x", "w-y"] + prov = FakeProvider(ready_order=["a", "b", "c"]) + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 2 + assert sorted(_gate_removals(calls)) == ["w-x", "w-y"] + + +def test_never_completing_task_does_not_strand_workers(monkeypatch): + _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b")] + workers = ["w-x", "w-y"] + prov = FakeProvider(ready_order=["a"]) # b never completes + + # one worker released; loop exits at timeout without hanging + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, timeout=0.05, _sleep=lambda _: None) + assert n == 1 + + +def test_works_without_any_completion_index(monkeypatch): + # worker names carry no index at all (e.g. a ReplicaSet's random suffixes). + calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("t1"), FakeTask("t2")] + workers = ["batch-rs-7f9-abcde", "batch-rs-7f9-zzzzz"] + prov = FakeProvider(ready_order=["t2", "t1"]) + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 2 + assert sorted(_gate_removals(calls)) == sorted(workers) + + +def test_shared_broadcast_still_uses_one_id(monkeypatch): + """Regression: shared mode (ungate_pods) broadcasts ONE id to all pods.""" + calls = _record_kubectl(monkeypatch) + n = ung.ungate_pods(["a", "b", "c"], "shared-job", "ns") + assert n == 3 + assert _annotations(calls) == {"a": "shared-job", "b": "shared-job", "c": "shared-job"}