From 107da4a80ca523d71889253d72c809b1fc7df5d0 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Thu, 30 Apr 2026 10:09:57 +0530 Subject: [PATCH 1/5] [fix][io] Interrupt Kafka Source instance thread on consumer fatal error to prevent deadlock --- .../pulsar/io/kafka/KafkaAbstractSource.java | 30 ++++- .../kafka/source/KafkaSourceDeadlockTest.java | 125 ++++++++++++++++++ 2 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java diff --git a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 7eba7438b2..07193c4b84 100644 --- a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -65,9 +65,11 @@ public abstract class KafkaAbstractSource extends PushSource { private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; private long maxPollIntervalMs; + private volatile Thread instanceThread; @Override public void open(Map config, SourceContext sourceContext) throws Exception { + this.instanceThread = Thread.currentThread(); kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set"); Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set"); @@ -166,9 +168,16 @@ public void close() throws InterruptedException { @SuppressWarnings("unchecked") public void start() { LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); runnerThread = new Thread(() -> { LOG.info("Kafka source started."); + try { + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); + } catch (Exception e) { + LOG.error("Failed to subscribe to Kafka topic", e); + notifyError(e); + return; + } + while (running) { try { ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); @@ -185,19 +194,36 @@ public void start() { } if (!kafkaSourceConfig.isAutoCommitEnabled()) { // Wait about 2/3 of the time of maxPollIntervalMs. - // so as to avoid waiting for the timeout to be kicked out of the consumer group. CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS); consumer.commitSync(); } } catch (Exception e) { LOG.error("Error while processing records", e); notifyError(e); + + // Safely interrupt the stuck Instance Thread + if (instanceThread != null && instanceThread.isAlive()) { + LOG.warn("Interrupting the Instance Thread to break I/O deadlock."); + instanceThread.interrupt(); + } break; } } }); running = true; runnerThread.setName("Kafka Source Thread"); + + // Update the UncaughtExceptionHandler + runnerThread.setUncaughtExceptionHandler((t, e) -> { + LOG.error("[{}] Uncaught error while consuming records", t.getName(), e); + notifyError(new RuntimeException(e)); + + if (instanceThread != null && instanceThread.isAlive()) { + LOG.warn("Interrupting the Instance Thread due to uncaught consumer thread exception."); + instanceThread.interrupt(); + } + }); + runnerThread.start(); } diff --git a/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java b/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java new file mode 100644 index 0000000000..58c9dbabdc --- /dev/null +++ b/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kafka.source; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.kafka.KafkaBytesSource; +import org.mockito.Mockito; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + + +public class KafkaSourceDeadlockTest { + + public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); + + private KafkaBytesSource source; + private SourceContext mockContext; + private final String TOPIC = "liveness-test-topic"; + + @BeforeClass + public void setup() throws Exception { + kafka.start(); + + // Send a dummy message into Kafka so the Consumer Thread has something to read + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + try (KafkaProducer producer = new KafkaProducer<>(props)) { + producer.send(new ProducerRecord<>(TOPIC, "key", "test-data".getBytes())).get(); + } + } + + @AfterClass + public void teardown() throws Exception { + Thread.interrupted(); + + if (source != null) { + source.close(); + } + kafka.stop(); + } + + @Test(timeOut = 15000) + public void testConnectorBreaksInstanceThreadDeadlock() throws Exception { + + // Configure and OPEN the source inside the test method + Map config = new HashMap<>(); + config.put("topic", TOPIC); + config.put("bootstrapServers", kafka.getBootstrapServers()); + config.put("groupId", "test-group"); + config.put("autoCommitEnabled", false); + + mockContext = Mockito.mock(SourceContext.class); + source = new KafkaBytesSource(); + source.open(config, mockContext); + + // Verify the consumer thread is working normally + Record record = source.read(); + Assert.assertNotNull(record, "Should have read the initial message from Kafka"); + + // THE SABOTAGE: Force the consumer thread to crash instantly. + // Stopping the Kafka container takes too long + // Instead, we directly find the background thread and interrupt it to trigger the fatal error block. + Thread crashSimulator = new Thread(() -> { + try { + // Wait 1sec to let the Instance Thread (below) get trapped first and find the Kafka Source Thread + Thread.sleep(1000); + Thread runner = Thread.getAllStackTraces().keySet().stream() + .filter(t -> "Kafka Source Thread".equals(t.getName())) + .findFirst() + .orElse(null); + + if (runner != null) { + // This instantly forces a java.lang.InterruptedException inside the consumer loop + runner.interrupt(); + } + } catch (Exception e) {} + }); + crashSimulator.start(); + + // THE TRAP: We simulate the Instance Thread getting stuck on Pulsar I/O + boolean wasInterrupted = false; + try { + // Simulate sendOutputMessage() hanging indefinitely due to a bad network + Thread.sleep(10000); + Assert.fail("Test failed: The instance thread was ignored and never interrupted."); + } catch (InterruptedException e) { + // It actively reached out and snapped this thread out of its deadlock + System.out.println("BUG FIXED: The stuck instance thread was successfully interrupted!"); + wasInterrupted = true; + } + Assert.assertTrue(wasInterrupted, "The thread should have been interrupted by the dying consumer."); + } +} \ No newline at end of file From 19a4536d25a527cc4cb87d7ee5cbf84e0bc85b3b Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Wed, 6 May 2026 16:32:29 +0530 Subject: [PATCH 2/5] build: Add Testcontainers dependency to kafka module for liveness test --- kafka/build.gradle.kts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka/build.gradle.kts b/kafka/build.gradle.kts index 6d219d0460..f716847ee6 100644 --- a/kafka/build.gradle.kts +++ b/kafka/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { testImplementation(libs.hamcrest) testImplementation(libs.awaitility) testImplementation(libs.bcpkix.jdk18on) + + testImplementation("org.testcontainers:kafka:1.19.7") } From 57060ff56b96242f7d88a5d66f19a5a0446dffd9 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Wed, 6 May 2026 20:46:37 +0530 Subject: [PATCH 3/5] refactor: revert consumer.subscribe() back to main thread and check inside the catch block --- .../pulsar/io/kafka/KafkaAbstractSource.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 07193c4b84..7c965c0cfd 100644 --- a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -168,15 +168,12 @@ public void close() throws InterruptedException { @SuppressWarnings("unchecked") public void start() { LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + + // 1. REVERT: Keep subscribe on the main thread so initialization errors fail synchronously + consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); + runnerThread = new Thread(() -> { LOG.info("Kafka source started."); - try { - consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); - } catch (Exception e) { - LOG.error("Failed to subscribe to Kafka topic", e); - notifyError(e); - return; - } while (running) { try { @@ -193,15 +190,19 @@ public void start() { index++; } if (!kafkaSourceConfig.isAutoCommitEnabled()) { - // Wait about 2/3 of the time of maxPollIntervalMs. CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS); consumer.commitSync(); } } catch (Exception e) { + if (!running) { + LOG.info("Kafka source is shutting down gracefully. Ignoring interrupt."); + break; + } + LOG.error("Error while processing records", e); notifyError(e); - // Safely interrupt the stuck Instance Thread + // Fire the flare to break the I/O deadlock if (instanceThread != null && instanceThread.isAlive()) { LOG.warn("Interrupting the Instance Thread to break I/O deadlock."); instanceThread.interrupt(); @@ -218,7 +219,7 @@ public void start() { LOG.error("[{}] Uncaught error while consuming records", t.getName(), e); notifyError(new RuntimeException(e)); - if (instanceThread != null && instanceThread.isAlive()) { + if (running && instanceThread != null && instanceThread.isAlive()) { LOG.warn("Interrupting the Instance Thread due to uncaught consumer thread exception."); instanceThread.interrupt(); } From 3ad58fabac6dc4818e5ebbf13928952bc41685e6 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Sat, 23 May 2026 23:01:24 +0530 Subject: [PATCH 4/5] updated Testcontainers to wait up to 5 minutes for it to boot, so it survives the GitHub Actions CI --- .../pulsar/io/kafka/source/KafkaSourceDeadlockTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java b/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java index 58c9dbabdc..67b584cba2 100644 --- a/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java +++ b/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java @@ -31,7 +31,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -39,7 +39,8 @@ public class KafkaSourceDeadlockTest { - public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); + public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")) + .withStartupTimeout(Duration.ofMinutes(5)); private KafkaBytesSource source; private SourceContext mockContext; From 91d00e0101bce90b93ac2daddd955003df0ee763 Mon Sep 17 00:00:00 2001 From: Praveenkumar76 Date: Sun, 24 May 2026 13:23:11 +0530 Subject: [PATCH 5/5] refactor: the VM runs OOM,so completely eliminate the Testcontainers entire Kafka broker and zookeeper --- .../pulsar/io/kafka/KafkaAbstractSource.java | 16 +++ .../io/kafka/KafkaSourceDeadlockTest.java | 101 ++++++++++++++ .../kafka/source/KafkaSourceDeadlockTest.java | 126 ------------------ 3 files changed, 117 insertions(+), 126 deletions(-) create mode 100644 kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java delete mode 100644 kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java diff --git a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 7c965c0cfd..0452cd534b 100644 --- a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.kafka; +import com.google.common.annotations.VisibleForTesting; import io.jsonwebtoken.io.Encoders; import java.time.Duration; import java.util.Collections; @@ -165,6 +166,21 @@ public void close() throws InterruptedException { LOG.info("Kafka source stopped."); } + @VisibleForTesting + void setInstanceThread(Thread instanceThread) { + this.instanceThread = instanceThread; + } + + @VisibleForTesting + void setKafkaSourceConfig(KafkaSourceConfig kafkaSourceConfig) { + this.kafkaSourceConfig = kafkaSourceConfig; + } + + @VisibleForTesting + void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + @SuppressWarnings("unchecked") public void start() { LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); diff --git a/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java b/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java new file mode 100644 index 0000000000..9767a6ce04 --- /dev/null +++ b/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; + +public class KafkaSourceDeadlockTest { + + @Test(timeOut = 15000) + public void testConnectorBreaksInstanceThreadDeadlock() throws Exception { + KafkaBytesSource source = new KafkaBytesSource(); + + // Inject dependencies using package-private @VisibleForTesting methods + source.setInstanceThread(Thread.currentThread()); + + KafkaSourceConfig config = Mockito.mock(KafkaSourceConfig.class); + Mockito.when(config.getTopic()).thenReturn("test-topic"); + Mockito.when(config.isAutoCommitEnabled()).thenReturn(false); + source.setKafkaSourceConfig(config); + + @SuppressWarnings("unchecked") + Consumer mockConsumer = Mockito.mock(Consumer.class); + source.setConsumer(mockConsumer); + + // Make the mocked poll() block safely so the background thread stays alive in its loop + Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))).thenAnswer(invocation -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // Trigger the fatal error handling block when interrupted by the simulator + throw new RuntimeException("Simulated fatal Kafka network error", e); + } + return new ConsumerRecords<>(Collections.emptyMap()); + }); + + // Start the background Kafka polling thread + source.start(); + + // Simulate a fatal exception in the background consumer thread + Thread fatalErrorSimulator = new Thread(() -> { + try { + // Wait to ensure the instance thread enters its blocked state first + Thread.sleep(1000); + + // Find the Kafka Source Thread in the JVM and interrupt it + Thread runner = Thread.getAllStackTraces().keySet().stream() + .filter(t -> "Kafka Source Thread".equals(t.getName())) + .findFirst() + .orElse(null); + + if (runner != null) { + runner.interrupt(); + } + } catch (Exception e) { + // Ignore simulator exceptions + } + }); + fatalErrorSimulator.start(); + + // Simulate the instance thread blocking on Pulsar network I/O + boolean wasInterrupted = false; + try { + Thread.sleep(10000); + Assert.fail("Test failed: The instance thread remained deadlocked and was never interrupted."); + } catch (InterruptedException e) { + // Expected behavior: The consumer thread successfully interrupted this thread to break the deadlock + wasInterrupted = true; + } + + // Cleanup the interrupt flag so close() works cleanly + Thread.interrupted(); + source.close(); + + // Verify that the deadlock resolution logic executed successfully + Assert.assertTrue(wasInterrupted, + "The instance thread should have been interrupted by the failing consumer thread to prevent deadlock."); + } +} \ No newline at end of file diff --git a/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java b/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java deleted file mode 100644 index 67b584cba2..0000000000 --- a/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaSourceDeadlockTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.io.kafka.source; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.SourceContext; -import org.apache.pulsar.io.kafka.KafkaBytesSource; -import org.mockito.Mockito; -import org.testcontainers.containers.KafkaContainer; -import org.testcontainers.utility.DockerImageName; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - - -public class KafkaSourceDeadlockTest { - - public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")) - .withStartupTimeout(Duration.ofMinutes(5)); - - private KafkaBytesSource source; - private SourceContext mockContext; - private final String TOPIC = "liveness-test-topic"; - - @BeforeClass - public void setup() throws Exception { - kafka.start(); - - // Send a dummy message into Kafka so the Consumer Thread has something to read - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.ByteArraySerializer"); - try (KafkaProducer producer = new KafkaProducer<>(props)) { - producer.send(new ProducerRecord<>(TOPIC, "key", "test-data".getBytes())).get(); - } - } - - @AfterClass - public void teardown() throws Exception { - Thread.interrupted(); - - if (source != null) { - source.close(); - } - kafka.stop(); - } - - @Test(timeOut = 15000) - public void testConnectorBreaksInstanceThreadDeadlock() throws Exception { - - // Configure and OPEN the source inside the test method - Map config = new HashMap<>(); - config.put("topic", TOPIC); - config.put("bootstrapServers", kafka.getBootstrapServers()); - config.put("groupId", "test-group"); - config.put("autoCommitEnabled", false); - - mockContext = Mockito.mock(SourceContext.class); - source = new KafkaBytesSource(); - source.open(config, mockContext); - - // Verify the consumer thread is working normally - Record record = source.read(); - Assert.assertNotNull(record, "Should have read the initial message from Kafka"); - - // THE SABOTAGE: Force the consumer thread to crash instantly. - // Stopping the Kafka container takes too long - // Instead, we directly find the background thread and interrupt it to trigger the fatal error block. - Thread crashSimulator = new Thread(() -> { - try { - // Wait 1sec to let the Instance Thread (below) get trapped first and find the Kafka Source Thread - Thread.sleep(1000); - Thread runner = Thread.getAllStackTraces().keySet().stream() - .filter(t -> "Kafka Source Thread".equals(t.getName())) - .findFirst() - .orElse(null); - - if (runner != null) { - // This instantly forces a java.lang.InterruptedException inside the consumer loop - runner.interrupt(); - } - } catch (Exception e) {} - }); - crashSimulator.start(); - - // THE TRAP: We simulate the Instance Thread getting stuck on Pulsar I/O - boolean wasInterrupted = false; - try { - // Simulate sendOutputMessage() hanging indefinitely due to a bad network - Thread.sleep(10000); - Assert.fail("Test failed: The instance thread was ignored and never interrupted."); - } catch (InterruptedException e) { - // It actively reached out and snapped this thread out of its deadlock - System.out.println("BUG FIXED: The stuck instance thread was successfully interrupted!"); - wasInterrupted = true; - } - Assert.assertTrue(wasInterrupted, "The thread should have been interrupted by the dying consumer."); - } -} \ No newline at end of file