From dbc0505c4325768e4b13d020a4dbd1a94bdce886 Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Tue, 19 May 2026 09:47:58 -0400 Subject: [PATCH] Remove flaky sleeps from PipelineOperatorAppTest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit stopSuppressesFailureHandler had a race: the controller-release thread and stop() ran concurrently, so the controller's run() could return before stopRequested was set, firing the failure handler from the wrapper's finally block. Tie the release to the fake manager's shutdown() instead — stop() now deterministically sets stopRequested before releasing the controller. Also drop two unnecessary Thread.sleep(50)s: - stopShutsDownControllerManagerAndUnblocksAwaitTermination: replaced the post-start sleep with a Thread.State poll so we only assert awaitTermination is blocking once the waiter thread is actually in a waiting state. - multipleControllerExitsFireHandlerOnlyOnce: removed entirely. awaitTermination() already waits for every wrapped controller's finally block (including reportFailure) to complete. 
--- .../operator/PipelineOperatorAppTest.java | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/PipelineOperatorAppTest.java b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/PipelineOperatorAppTest.java index bca5fbbc..3e8b1d1e 100644 --- a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/PipelineOperatorAppTest.java +++ b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/PipelineOperatorAppTest.java @@ -5,8 +5,11 @@ import io.kubernetes.client.extended.controller.ControllerManager; import io.kubernetes.client.informer.SharedInformerFactory; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -136,7 +139,7 @@ void stopShutsDownControllerManagerAndUnblocksAwaitTermination() throws Exceptio } }, "test-await-waiter"); waiter.start(); - Thread.sleep(50); + awaitState(waiter, Thread.State.WAITING, Thread.State.TIMED_WAITING); assertTrue(waiter.isAlive(), "awaitTermination() should still be blocking"); app.stop(Duration.ofSeconds(2)); @@ -279,8 +282,10 @@ ControllerManager newControllerManager(SharedInformerFactory factory, Controller }; app.start(List.of(exit1, exit2)); + // awaitTermination returns only after interactiveFakeManager.run() returns, which itself + // waits for every wrapped controller's run() (including the finally block that may invoke + // reportFailure) to complete — so no extra wait is needed here. 
app.awaitTermination(); - Thread.sleep(50); // let any straggling wrappers finish assertEquals(1, handlerCalls.get(), "failure handler should fire at most once even when multiple controllers exit"); @@ -289,7 +294,10 @@ ControllerManager newControllerManager(SharedInformerFactory factory, Controller @Test void stopSuppressesFailureHandler() throws Exception { // When stop() is called, controller wrappers fire on the way out — but the handler must - // not be invoked because the exit was intentional. + // not be invoked because the exit was intentional. We tie the controller's release to the + // fake manager's shutdown() call so the ordering is deterministic: + // stop() -> stopRequested=true -> manager.shutdown() -> countDown -> controller returns + // -> wrapper finally sees stopRequested=true -> handler suppressed. AtomicInteger handlerCalls = new AtomicInteger(0); Consumer handler = cause -> handlerCalls.incrementAndGet(); @@ -303,18 +311,17 @@ void stopSuppressesFailureHandler() throws Exception { TestablePipelineOperatorApp app = new TestablePipelineOperatorApp(context, null, handler) { @Override ControllerManager newControllerManager(SharedInformerFactory factory, Controller[] controllers) { - return interactiveFakeManager(controllers); + ControllerManager manager = interactiveFakeManager(controllers); + doAnswer(inv -> { + holdLongRunning.countDown(); + return null; + }).when(manager).shutdown(); + return manager; } }; app.start(List.of(longRunning)); - Thread.sleep(50); // let the controller enter run() - - // Release the long-running controller from a separate thread so it can exit cleanly - // after stop() flips stopRequested. 
- new Thread(holdLongRunning::countDown).start(); app.stop(Duration.ofSeconds(2)); - Thread.sleep(50); // let the wrapper's finally block observe stopRequested assertEquals(0, handlerCalls.get(), "failure handler must not fire during intentional stop"); @@ -322,6 +329,25 @@ ControllerManager newControllerManager(SharedInformerFactory factory, Controller // --- Helpers --- + /** + * Polls until {@code thread} reaches one of the supplied states (or dies). Avoids fixed sleeps + * when a test needs to know the thread has actually entered a blocking call. Fails the test if + * the wait exceeds a few seconds. + */ + private static void awaitState(Thread thread, Thread.State... expected) throws InterruptedException { + Set<Thread.State> wanted = new HashSet<>(Arrays.asList(expected)); + long deadline = System.currentTimeMillis() + 2_000; + while (System.currentTimeMillis() < deadline) { + Thread.State s = thread.getState(); + if (s == Thread.State.TERMINATED || wanted.contains(s)) { + return; + } + Thread.sleep(5); + } + throw new AssertionError("Thread " + thread.getName() + " never reached " + wanted + + " (last state " + thread.getState() + ")"); + } + /** * Builds a fake {@link ControllerManager} whose {@code run()} signals it entered, then blocks on * {@code shutdownSignal} until released. {@code shutdown()} releases the signal so {@code run()}