Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import io.kubernetes.client.extended.controller.ControllerManager;
import io.kubernetes.client.informer.SharedInformerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -136,7 +139,7 @@ void stopShutsDownControllerManagerAndUnblocksAwaitTermination() throws Exceptio
}
}, "test-await-waiter");
waiter.start();
Thread.sleep(50);
awaitState(waiter, Thread.State.WAITING, Thread.State.TIMED_WAITING);
assertTrue(waiter.isAlive(), "awaitTermination() should still be blocking");

app.stop(Duration.ofSeconds(2));
Expand Down Expand Up @@ -279,8 +282,10 @@ ControllerManager newControllerManager(SharedInformerFactory factory, Controller
};

app.start(List.of(exit1, exit2));
// awaitTermination returns only after interactiveFakeManager.run() returns, which itself
// waits for every wrapped controller's run() (including the finally block that may invoke
// reportFailure) to complete — so no extra wait is needed here.
app.awaitTermination();
Thread.sleep(50); // let any straggling wrappers finish

assertEquals(1, handlerCalls.get(),
"failure handler should fire at most once even when multiple controllers exit");
Expand All @@ -289,7 +294,10 @@ ControllerManager newControllerManager(SharedInformerFactory factory, Controller
@Test
void stopSuppressesFailureHandler() throws Exception {
// When stop() is called, controller wrappers fire on the way out — but the handler must
// not be invoked because the exit was intentional.
// not be invoked because the exit was intentional. We tie the controller's release to the
// fake manager's shutdown() call so the ordering is deterministic:
// stop() -> stopRequested=true -> manager.shutdown() -> countDown -> controller returns
// -> wrapper finally sees stopRequested=true -> handler suppressed.
AtomicInteger handlerCalls = new AtomicInteger(0);
Consumer<Throwable> handler = cause -> handlerCalls.incrementAndGet();

Expand All @@ -303,25 +311,43 @@ void stopSuppressesFailureHandler() throws Exception {
TestablePipelineOperatorApp app = new TestablePipelineOperatorApp(context, null, handler) {
@Override
ControllerManager newControllerManager(SharedInformerFactory factory, Controller[] controllers) {
return interactiveFakeManager(controllers);
ControllerManager manager = interactiveFakeManager(controllers);
doAnswer(inv -> {
holdLongRunning.countDown();
return null;
}).when(manager).shutdown();
return manager;
}
};

app.start(List.of(longRunning));
Thread.sleep(50); // let the controller enter run()

// Release the long-running controller from a separate thread so it can exit cleanly
// after stop() flips stopRequested.
new Thread(holdLongRunning::countDown).start();
app.stop(Duration.ofSeconds(2));
Thread.sleep(50); // let the wrapper's finally block observe stopRequested

assertEquals(0, handlerCalls.get(),
"failure handler must not fire during intentional stop");
}

// --- Helpers ---

/**
* Polls until {@code thread} reaches one of the supplied states (or dies). Avoids fixed sleeps
* when a test needs to know the thread has actually entered a blocking call. Fails the test if
* the wait exceeds a few seconds.
*/
private static void awaitState(Thread thread, Thread.State... expected) throws InterruptedException {
Set<Thread.State> wanted = new HashSet<>(Arrays.asList(expected));
long deadline = System.currentTimeMillis() + 2_000;
while (System.currentTimeMillis() < deadline) {
Thread.State s = thread.getState();
if (s == Thread.State.TERMINATED || wanted.contains(s)) {
return;
}
Thread.sleep(5);
}
throw new AssertionError("Thread " + thread.getName() + " never reached " + wanted
+ " (last state " + thread.getState() + ")");
}

/**
* Builds a fake {@link ControllerManager} whose {@code run()} signals it entered, then blocks on
* {@code shutdownSignal} until released. {@code shutdown()} releases the signal so {@code run()}
Expand Down
Loading