diff --git a/pkg/webhook/handlers/quantum.go b/pkg/webhook/handlers/quantum.go index 26a09b0..b0ce218 100644 --- a/pkg/webhook/handlers/quantum.go +++ b/pkg/webhook/handlers/quantum.go @@ -81,6 +81,18 @@ const ( // for, and never dedup tasks meant to be distinct. CoordinationIndependent = "independent" + // CoordinationBatch (brokered): one broker (index 0) submits ALL N tasks to + // the same vendor queue. Workers are interchangeable: each gated worker is + // ungated with the id of whichever task is READY next (per-result, not + // broadcast). The broker holds the only node during the long vendor wait; + // workers materialize just-in-time to post-process. This is the shared + // machinery generalized from one shared result to N per-task results -- it + // trades a little worker-startup latency for a large node-time saving, since + // N-1 workers no longer squat nodes through the queue wait. Submitting from + // one pod (vs N) also lets the broker throttle to the vendor concurrency cap; + // results returning out of order is a non-issue (no slot or index mapping). + CoordinationBatch = "batch" + // ProducerGroupSuffix names the producer's own group-of-one: -producer // (minCount 1) so it schedules alone and never deadlocks against the gated // consumer gang. @@ -125,7 +137,7 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * // member. The REAL submit happens in THIS pod; the sidecar is added only for // observe-only telemetry. (independent mode routes every member here -> N // standalone producers, each owning its task and its own queue wait.) 
- if mode != CoordinationShared || g == "" || n <= 1 { + if (mode != CoordinationShared && mode != CoordinationBatch) || g == "" || n <= 1 { ops := interceptorOps(pod) if observe { sc := sidecarFor(m) @@ -137,11 +149,14 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * return ops } - // shared mode: promote one member to producer; the rest are gated consumers. + // shared / batch: same two roles. Promote one member to producer (index 0), + // the rest are gated consumers. The mode (shared|batch) rides along and + // decides the producer's submit fan-out and the sidecar's ungate strategy -- + // it does NOT create new roles. if isProducer(ctx, m, pod, g) { - return h.mutateProducer(ctx, m, pod, g) + return h.mutateProducer(ctx, m, pod, g, mode, n) } - return h.mutateConsumer(ctx, m, pod, g, n) + return h.mutateConsumer(ctx, m, pod, g, n, mode) } // mutateProducer wires the single producer member (indexed-Job completion index @@ -149,18 +164,28 @@ func (h *quantumHandler) Mutate(ctx context.Context, m webhook.MutatorAPI, pod * // and runs the REAL submit, the interceptor in tag mode, RBAC, and the sidecar // told which consumer group to ungate (FLUENCE_GANG_GROUP). The producer is one // of the N members, so the application is NOT run an extra time. Never gated. -func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string) []spec.Op { +func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group, mode string, n int32) []spec.Op { pg := group + ProducerGroupSuffix m.EnsurePodGroup(ctx, pod.Namespace, pg, pod.Name, 1) ops := linkGroupOps(pod, pg) ops = append(ops, interceptorOps(pod)...) // tag mode: the producer submits for real ops = append(ops, roleEnvOps(pod, RoleProducer)...) // FLUENCE_COORDINATION_ROLE=producer + ops = append(ops, modeEnvOps(pod, mode)...) 
// shared|batch: the producer's submit fan-out + // the producer's workload needs N (in batch it submits N tasks). + ops = append(ops, setContainerEnvOps(pod, corev1.EnvVar{Name: GangSizeEnv, Value: strconv.Itoa(int(n))})...) sc := sidecarFor(m) sc.EnsureRBAC(ctx, pod.Namespace) - extra := []corev1.EnvVar{{Name: GangGroupEnv, Value: group}} + // The sidecar needs the gang group (whom to ungate), the mode (broadcast one + // id vs release each consumer by its own per-slot result), and N (in batch, + // the number of tasks/slots to map). + extra := []corev1.EnvVar{ + {Name: GangGroupEnv, Value: group}, + {Name: CoordinationModeEnv, Value: mode}, + {Name: GangSizeEnv, Value: strconv.Itoa(int(n))}, + } ops = append(ops, sc.ContainerOps(pod, false, extra)...) - log.Printf("[fluence-webhook] quantum producer %s/%s — group %s (ungates consumers %q)", - pod.Namespace, pod.Name, pg, group) + log.Printf("[fluence-webhook] quantum producer %s/%s — group %s, mode=%s, N=%d (ungates consumers %q)", + pod.Namespace, pod.Name, pg, mode, n, group) return ops } @@ -171,7 +196,7 @@ func (h *quantumHandler) mutateProducer(ctx context.Context, m webhook.MutatorAP // role-aware consumer reads those and fetches the shared result instead of // submitting — so the consumer never calls the vendor submit, and needs neither // the interceptor nor a faux flag. 
-func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32) []spec.Op { +func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAPI, pod *corev1.Pod, group string, n int32, mode string) []spec.Op { m.EnsurePodGroup(ctx, pod.Namespace, group, pod.Name, n-1) ops := linkGroupOps(pod, group) // Express the wait as the GENERAL dependency primitive: this consumer depends @@ -180,6 +205,7 @@ func (h *quantumHandler) mutateConsumer(ctx context.Context, m webhook.MutatorAP dep := Dependency{Kind: DependencyKindQuantumSubmit, Producer: group + ProducerGroupSuffix, Gate: QuantumGate} ops = append(ops, dep.applyOps(pod)...) ops = append(ops, consumerEnvOps(pod)...) + ops = append(ops, modeEnvOps(pod, mode)...) // shared|batch: how the sidecar releases this consumer // A gated consumer never runs the QPU task — it only fetches the producer's // shared result — so it must not hold the Fluxion quantum resource. Leaving it // would make Fluxion allocate a qpu per consumer, capping the gang at the @@ -383,6 +409,16 @@ const ( RoleProducer = "producer" RoleConsumer = "consumer" + // CoordinationModeEnv carries the coordination mode (shared|batch) to the + // workload and the sidecar. Roles are unchanged across modes -- the producer + // (index 0) always submits and runs the sidecar; consumers are always gated. + // Only WHAT the producer submits (one task vs N) and HOW the sidecar ungates + // (one shared id broadcast vs each consumer released by its own per-slot + // result) differ, and that is exactly what this mode selects. + CoordinationModeEnv = "FLUENCE_COORDINATION_MODE" + // GangSizeEnv tells the producer the gang size N (in batch it submits N). + GangSizeEnv = "FLUENCE_GANG_SIZE" + // QuantumJobIDAnnotation is the vendor-neutral task id the ungating sidecar // stamps on each consumer (mirrors python/fluence/ungate.py JOB_ID_ANNOTATION), // BEFORE removing the gate. 
Surfaced into FLUENCE_QUANTUM_JOB_ID via the @@ -408,6 +444,11 @@ func consumerEnvOps(pod *corev1.Pod) []spec.Op { return ops } +// modeEnvOps sets FLUENCE_COORDINATION_MODE to mode on every non-sidecar container. +func modeEnvOps(pod *corev1.Pod, mode string) []spec.Op { + return setContainerEnvOps(pod, corev1.EnvVar{Name: CoordinationModeEnv, Value: mode}) +} + // setContainerEnvOps appends env var e to every non-sidecar container that does // not already define it, returning the patch ops and mutating pod in place. func setContainerEnvOps(pod *corev1.Pod, e corev1.EnvVar) []spec.Op { diff --git a/pkg/webhook/handlers/quantum_batch_test.go b/pkg/webhook/handlers/quantum_batch_test.go new file mode 100644 index 0000000..639de19 --- /dev/null +++ b/pkg/webhook/handlers/quantum_batch_test.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "context" + "testing" + + "github.com/converged-computing/fluence/pkg/webhook" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +// batchQPUPod is a quantum workload pod owned by a Job of parallelism N, with +// coordination=batch and a completion index. Same two roles as shared: index 0 +// is the producer, any other index is a consumer -- batch changes only the +// submit fan-out and ungate strategy (carried by FLUENCE_COORDINATION_MODE). 
+func batchQPUPod(ns, group, name, job, index string) *corev1.Pod { + p := qpuPod("fluence") + p.Name = name + p.Namespace = ns + p.Labels = map[string]string{webhook.GroupLabel: group} + p.Annotations = map[string]string{ + CoordinationAnnotation: CoordinationBatch, + CompletionIndexAnnotation: index, + } + p.OwnerReferences = []metav1.OwnerReference{{Kind: "Job", Name: job}} + return p +} + +func batchJobClientset(ns, job string, n int32) *fake.Clientset { + return fake.NewSimpleClientset(&batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{Name: job, Namespace: ns}, + Spec: batchv1.JobSpec{Parallelism: &n, Completions: &n}, + }) +} + +// The batch PRODUCER (index 0) is wired exactly like the shared producer -- own +// group-of-one (minCount 1), real sidecar, never gated, keeps its qpu, role=producer +// -- and additionally carries mode=batch and the gang size N so its workload +// submits N tasks. No new role, no extra pod. +func TestBatchProducerIsProducerWithMode(t *testing.T) { + ns, group, job := "default", "bg", "bg-job" + cs := batchJobClientset(ns, job, 4) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-0", job, "0")) + + if !hasSidecarOp(ops) { + t.Error("batch producer must get the real sidecar") + } + if hasGateOp(ops) { + t.Error("batch producer must NOT be gated") + } + if hasDropQuantumResourceOp(ops) { + t.Error("batch producer must KEEP its qpu resource") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer { + t.Errorf("must reuse role=%s (no broker role), got %q (ok=%v)", RoleProducer, e.Value, ok) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch { + t.Errorf("producer must carry %s=%s", CoordinationModeEnv, CoordinationBatch) + } + if e, ok := envOp(ops, GangSizeEnv); !ok || e.Value != "4" { + t.Errorf("producer must be told the gang size %s=4, got %q (ok=%v)", GangSizeEnv, e.Value, ok) + } + if mc, ok := mincount(t, cs, ns, 
group+ProducerGroupSuffix); !ok || mc != 1 { + t.Errorf("producer group-of-one minCount=%d (ok=%v), want 1", mc, ok) + } +} + +// A batch CONSUMER (index != 0) is wired exactly like the shared consumer -- +// gated, role=consumer, joins the gang at minCount N-1, qpu stripped -- and +// additionally carries mode=batch so the sidecar releases it by its OWN result. +func TestBatchConsumerIsConsumerWithMode(t *testing.T) { + ns, group, job := "default", "bg", "bg-job" + cs := batchJobClientset(ns, job, 4) + m := &webhook.Mutator{Clientset: cs} + + ops := m.Mutate(context.Background(), batchQPUPod(ns, group, "bg-2", job, "2")) + + if !hasGateOp(ops) { + t.Error("batch consumer must be gated") + } + if hasSidecarOp(ops) { + t.Error("batch consumer (gated) must NOT get a sidecar") + } + if !hasDropQuantumResourceOp(ops) { + t.Error("batch consumer must have its qpu stripped") + } + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleConsumer { + t.Errorf("must reuse role=%s (no worker role), got %q (ok=%v)", RoleConsumer, e.Value, ok) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationBatch { + t.Errorf("consumer must carry %s=%s", CoordinationModeEnv, CoordinationBatch) + } + if mc, ok := mincount(t, cs, ns, group); !ok || mc != 3 { + t.Errorf("consumer gang minCount=%d (ok=%v), want 3 (N-1 split)", mc, ok) + } +} + +// shared mode still stamps role=producer and mode=shared (no regression from the +// mode unification). 
+func TestSharedStillStampsModeShared(t *testing.T) { + ns, group, job := "default", "sg", "sg-job" + cs := batchJobClientset(ns, job, 3) + m := &webhook.Mutator{Clientset: cs} + p := batchQPUPod(ns, group, "sg-0", job, "0") + p.Annotations[CoordinationAnnotation] = CoordinationShared + + ops := m.Mutate(context.Background(), p) + if e, ok := envOp(ops, CoordinationRoleEnv); !ok || e.Value != RoleProducer { + t.Errorf("shared producer role=%s", RoleProducer) + } + if e, ok := envOp(ops, CoordinationModeEnv); !ok || e.Value != CoordinationShared { + t.Errorf("shared producer must carry %s=%s", CoordinationModeEnv, CoordinationShared) + } +} diff --git a/python/fluence/providers/base.py b/python/fluence/providers/base.py index 561bca2..ab53160 100644 --- a/python/fluence/providers/base.py +++ b/python/fluence/providers/base.py @@ -79,6 +79,22 @@ def find_my_task(self, pod_uid: str, backend: str, timeout: int) -> "Task | None found or timeout. Returns an opaque Task or None.""" raise NotImplementedError + def find_my_tasks(self, pod_uid: str, backend: str, timeout: int, + n: int) -> "list[Task]": + """BATCH mode: the producer submitted N tasks, all tagged + TAG_KEY=pod_uid. Return them as a list. Order and identity are + irrelevant -- the gated workers come from one template and are + interchangeable, so each result is assigned to whichever worker is free + when it completes. No per-slot tag and no Job completion index are + required, so this works for ANY gang abstraction (indexed Job, + ReplicaSet/Deployment, StatefulSet, or loose label-grouped pods). + + Vendors override this to list their tagged submissions. 
The default + raises, so a provider without batch support fails loudly rather than + silently stranding the gang.""" + raise NotImplementedError( + f"{self.name} provider does not implement find_my_tasks (batch mode)") + def is_ready_to_ungate(self, task: "Task") -> bool: """True when the gang should be ungated — queue position == 1 or the task is already RUNNING/terminal. Always implementable.""" diff --git a/python/fluence/sidecar.py b/python/fluence/sidecar.py index d0724e5..9cd798b 100644 --- a/python/fluence/sidecar.py +++ b/python/fluence/sidecar.py @@ -28,7 +28,8 @@ from fluence.providers import resolve_from_env from fluence.providers.base import log -from fluence.ungate import ungate_pods, gated_pods_from_env, namespace_from_env, wait_for_gated_pods +from fluence.ungate import (ungate_pods, gated_pods_from_env, namespace_from_env, + wait_for_gated_pods, ungate_per_result) @@ -77,6 +78,24 @@ def main(): sys.exit(1) log(f"resolved provider: {provider.name}") + # BATCH mode: the producer submitted N tasks; release one gated consumer + # each time ANY task is ready (workers are interchangeable). Same gate + # primitive as shared, fired per-result instead of broadcast. + mode = os.environ.get("FLUENCE_COORDINATION_MODE", "shared") + if mode == "batch" and not observe: + gang_size = int(os.environ.get("FLUENCE_GANG_SIZE", "0")) + tasks = provider.find_my_tasks(pod_uid, backend, discovery_timeout, gang_size) + log(f"batch mode: discovered {len(tasks)} task(s) from {gang_size}-pod gang") + # No completion-index assumption: the workers are interchangeable, so just + # find the gated members and hand each a result as it completes. 
+        gated = gated_pods_from_env() or wait_for_gated_pods(
+            namespace, gang_group, exclude=pod_name, timeout=ungate_timeout)
+        log(f"batch mode: {len(gated)} gated worker(s): {gated}")
+        n_ok = ungate_per_result(provider, tasks, gated, namespace,
+                                 poll_interval=poll_interval, timeout=ungate_timeout)
+        log(f"batch done — {n_ok} worker(s) released per-result")
+        return
+
     task = provider.find_my_task(pod_uid, backend, discovery_timeout)
     if task is None:
         log("ERROR: could not discover quantum task")
diff --git a/python/fluence/ungate.py b/python/fluence/ungate.py
index a40e662..58b3af6 100644
--- a/python/fluence/ungate.py
+++ b/python/fluence/ungate.py
@@ -28,48 +28,106 @@ def kubectl(args):
     return result.stdout.strip()
 
 
-def ungate_pods(gated_pods, job_id, namespace):
-    """
-    For each gated worker pod:
-      1. Stamp the vendor-neutral job-id annotation so the worker can locate
-         the quantum result.
-      2. Remove the scheduling gate so the pod can be scheduled.
-
-    NOTE: priorityClassName is NOT set here — it is immutable after pod creation
-    (the API server forbids changing any spec field but image/tolerations/
-    activeDeadlineSeconds/terminationGracePeriodSeconds on an existing pod). If a
-    priority class is wanted on the classical gang, the webhook must set it at
-    admission, when the pod is created. Setting it in the ungate patch made the
-    whole patch fail atomically, so the gate was never removed and workers stayed
-    gated.
+def ungate_one(pod_name, job_id, namespace):
+    """Ungate a SINGLE gated pod: stamp its (own) job-id annotation so it can
+    locate its quantum result, then remove the scheduling gate. Returns True on
+    a successful gate removal. See the priorityClassName note below.
+
+    priorityClassName is NOT set here -- it is immutable after pod creation, so
+    the webhook must set it at admission. Setting it in this patch made the whole
+    patch fail atomically, leaving the pod gated.
     """
-    ok = 0
-    for pod_name in gated_pods:
-        pod_name = pod_name.strip()
-        if not pod_name:
-            continue
-        log(f"ungating pod: {pod_name}")
+    pod_name = (pod_name or "").strip()
+    if not pod_name:
+        return False
+    log(f"ungating pod: {pod_name}")
 
-        if job_id:
-            try:
-                kubectl(["annotate", "pod", pod_name, "-n", namespace,
-                         f"{JOB_ID_ANNOTATION}={job_id}", "--overwrite"])
-                log(f" patched job id onto {pod_name}: {job_id}")
-            except RuntimeError as e:
-                log(f" WARNING: could not annotate {pod_name}: {e}")
-        else:
-            log(f" WARNING: no job id to patch onto {pod_name}")
-
-        patch = json.dumps([
-            {"op": "remove", "path": "/spec/schedulingGates/0"},
-        ])
+    if job_id:
         try:
-            kubectl(["patch", "pod", pod_name, "-n", namespace,
-                     "--type=json", f"-p={patch}"])
-            log(f" removed gate from {pod_name}")
-            ok += 1
+            kubectl(["annotate", "pod", pod_name, "-n", namespace,
+                     f"{JOB_ID_ANNOTATION}={job_id}", "--overwrite"])
+            log(f" patched job id onto {pod_name}: {job_id}")
         except RuntimeError as e:
-            log(f" WARNING: could not patch {pod_name}: {e}")
+            log(f" WARNING: could not annotate {pod_name}: {e}")
+    else:
+        log(f" WARNING: no job id to patch onto {pod_name}")
+
+    patch = json.dumps([{"op": "remove", "path": "/spec/schedulingGates/0"}])
+    try:
+        kubectl(["patch", "pod", pod_name, "-n", namespace, "--type=json", f"-p={patch}"])
+        log(f" removed gate from {pod_name}")
+        return True
+    except RuntimeError as e:
+        log(f" WARNING: could not patch {pod_name}: {e}")
+        return False
+
+
+def ungate_pods(gated_pods, job_id, namespace):
+    """SHARED mode: one quantum result is shared by the whole gang, so broadcast
+    the same job_id to every gated pod and remove all gates together."""
+    return sum(1 for p in gated_pods if ungate_one(p, job_id, namespace))
+
+
+def ungate_per_result(provider, tasks, gated_pods, namespace, poll_interval=10,
+                      timeout=3600, _sleep=time.sleep):
+    """BATCH mode: the producer submitted N tasks; release each gated worker the
+    moment ANY task completes -- assigning that task's result to whichever worker
+    is still free.
+
+      tasks      : list of provider Tasks (the producer's N submissions)
+      gated_pods : list of gated worker pod names (the gang minus the producer)
+
+    Workers come from one template and are interchangeable, so there is no slot
+    or completion-index mapping -- we never assume the gang is an indexed Job.
+    A worker is ungated with the job_id of whatever task finished, as it finishes.
+    Out-of-order completion is fine, and so is heterogeneous task duration: the
+    first result out wakes the first worker, and so on. We assign at most one task
+    per worker; any surplus tasks (e.g. the producer's own share) are left for the
+    producer. Returns the count of workers released.
+
+    If a gate removal fails, the worker is requeued and its task stays pending,
+    so a transient API error costs a retry instead of stranding the worker and
+    discarding the result.
+
+    This is the only K8s-side difference from shared mode: the same gate
+    primitive, fired per-result and matched to a free worker, instead of one
+    shared id broadcast to all.
+    """
+    pending = list(tasks)
+    free = [p for p in (gated_pods or []) if p and p.strip()]
+    ok = 0
+    # Monotonic clock: a wall-clock step (NTP, suspend) must not distort the wait.
+    deadline = time.monotonic() + timeout
+    while pending and free and time.monotonic() < deadline:
+        progressed = False
+        for task in list(pending):
+            if not free:
+                break
+            pod, released = None, False
+            try:
+                if not provider.is_ready_to_ungate(task):
+                    continue
+                jid = provider.job_id(task)
+                pod = free.pop(0)
+                released = ungate_one(pod, jid, namespace)
+            except Exception as e:  # noqa: BLE001 - keep polling the other tasks
+                log(f"poll error (will retry): {e}")
+            if released:
+                ok += 1
+                log(f"task ready (job_id={jid}) -> released worker {pod}")
+                pending.remove(task)
+                progressed = True
+            elif pod is not None:
+                # The gate is still on: requeue the worker and keep the task
+                # pending so neither is silently dropped; both are retried.
+                free.append(pod)
+                log(f"WARNING: could not release {pod}; will retry")
+        if free and pending and not progressed:
+            _sleep(poll_interval)
+    if free:
+        log(f"WARNING: {len(free)} worker(s) never received a result before timeout: {free}")
+    elif pending:
+        log(f"{len(pending)} task(s) not assigned to a worker (left for the producer)")
     return ok
 
 
diff --git a/python/tests/test_batch_ungate.py b/python/tests/test_batch_ungate.py
new file mode 100644
index 0000000..57fdc62
--- /dev/null
+++ b/python/tests/test_batch_ungate.py
@@ -0,0 +1,118 @@
+"""
+Tests for batch-mode (per-result) ungating in fluence.ungate. + +Mocks kubectl (no cluster) and the provider (controllable, out-of-order task +readiness). Key properties, none of which assume an indexed Job: + - each gated worker is released the moment ANY task completes, + - stamped with that task's own job id, + - workers are interchangeable (no slot / completion-index mapping), + - surplus tasks (the producer's own share) and never-completing tasks are + handled without stranding the rest. +""" +import json + +import fluence.ungate as ung + + +class FakeTask: + def __init__(self, name): + self.name = name + + +class FakeProvider: + """Tasks become ready in the given order, one per poll tick, to force a + specific (out-of-order) completion sequence.""" + + def __init__(self, ready_order): + self._order = list(ready_order) + self.ready = set() + self.tick() # first task ready immediately + + def tick(self): + if self._order: + self.ready.add(self._order.pop(0)) + + def is_ready_to_ungate(self, task): + return task.name in self.ready + + def job_id(self, task): + return f"job-{task.name}" + + +def _record_kubectl(monkeypatch): + calls = [] + monkeypatch.setattr(ung, "kubectl", lambda args: calls.append(list(args)) or "") + return calls + + +def _annotations(calls): + return {a[2]: a[5].split("=", 1)[1] for a in calls if a[0] == "annotate"} + + +def _gate_removals(calls): + return [a[2] for a in calls if a[0] == "patch"] + + +def test_per_result_assigns_each_completed_task_to_a_free_worker(monkeypatch): + calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b"), FakeTask("c")] + workers = ["w-x", "w-y", "w-z"] + prov = FakeProvider(ready_order=["c", "a", "b"]) # completes c, a, b + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 3 + # one distinct job id per worker; each worker gate-removed exactly once + assert set(_annotations(calls).values()) == {"job-a", "job-b", "job-c"} 
+ assert sorted(_gate_removals(calls)) == ["w-x", "w-y", "w-z"] + # first result out (c) wakes the first free worker (w-x), etc. -- assignment + # follows completion order, NOT task or worker identity. + assert _annotations(calls)["w-x"] == "job-c" + assert _annotations(calls)["w-y"] == "job-a" + assert _annotations(calls)["w-z"] == "job-b" + + +def test_surplus_tasks_left_for_producer(monkeypatch): + # N tasks, N-1 workers (the producer keeps one result): only N-1 released. + calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b"), FakeTask("c")] + workers = ["w-x", "w-y"] + prov = FakeProvider(ready_order=["a", "b", "c"]) + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 2 + assert sorted(_gate_removals(calls)) == ["w-x", "w-y"] + + +def test_never_completing_task_does_not_strand_workers(monkeypatch): + _record_kubectl(monkeypatch) + tasks = [FakeTask("a"), FakeTask("b")] + workers = ["w-x", "w-y"] + prov = FakeProvider(ready_order=["a"]) # b never completes + + # one worker released; loop exits at timeout without hanging + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, timeout=0.05, _sleep=lambda _: None) + assert n == 1 + + +def test_works_without_any_completion_index(monkeypatch): + # worker names carry no index at all (e.g. a ReplicaSet's random suffixes). 
+ calls = _record_kubectl(monkeypatch) + tasks = [FakeTask("t1"), FakeTask("t2")] + workers = ["batch-rs-7f9-abcde", "batch-rs-7f9-zzzzz"] + prov = FakeProvider(ready_order=["t2", "t1"]) + + n = ung.ungate_per_result(prov, tasks, workers, "ns", + poll_interval=0, _sleep=lambda _: prov.tick()) + assert n == 2 + assert sorted(_gate_removals(calls)) == sorted(workers) + + +def test_shared_broadcast_still_uses_one_id(monkeypatch): + """Regression: shared mode (ungate_pods) broadcasts ONE id to all pods.""" + calls = _record_kubectl(monkeypatch) + n = ung.ungate_pods(["a", "b", "c"], "shared-job", "ns") + assert n == 3 + assert _annotations(calls) == {"a": "shared-job", "b": "shared-job", "c": "shared-job"}