diff --git a/kafka/build.gradle.kts b/kafka/build.gradle.kts index 6d219d0460..f716847ee6 100644 --- a/kafka/build.gradle.kts +++ b/kafka/build.gradle.kts @@ -48,4 +48,6 @@ dependencies { testImplementation(libs.hamcrest) testImplementation(libs.awaitility) testImplementation(libs.bcpkix.jdk18on) + + /* NOTE(review): version is hard-coded while the neighbouring entries come from the `libs` catalog, and the test added in this patch imports only Kafka clients, Mockito and TestNG - confirm this dependency is needed. */ testImplementation("org.testcontainers:kafka:1.19.7") } diff --git a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 7eba7438b2..0452cd534b 100644 --- a/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.kafka; +import com.google.common.annotations.VisibleForTesting; import io.jsonwebtoken.io.Encoders; import java.time.Duration; import java.util.Collections; @@ -65,9 +66,11 @@ public abstract class KafkaAbstractSource extends PushSource { private KafkaSourceConfig kafkaSourceConfig; private Thread runnerThread; private long maxPollIntervalMs; + /** Thread that called open(), or the one injected through setInstanceThread(); start() interrupts it when the consumer thread fails. */ private volatile Thread instanceThread; @Override public void open(Map config, SourceContext sourceContext) throws Exception { + this.instanceThread = Thread.currentThread(); kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext); Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set"); Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set"); @@ -163,12 +166,31 @@ public void close() throws InterruptedException { LOG.info("Kafka source stopped."); } + /** Test hook: replaces the thread that is interrupted when the consumer thread fails. */ @VisibleForTesting + void setInstanceThread(Thread instanceThread) { + this.instanceThread = instanceThread; + } + + /** Test hook: injects the source configuration without going through open(). */ @VisibleForTesting + void setKafkaSourceConfig(KafkaSourceConfig kafkaSourceConfig) { + this.kafkaSourceConfig = kafkaSourceConfig; + } + + /** Test hook: injects the Kafka consumer that start() subscribes and polls. */ @VisibleForTesting + void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + 
/**
 * Subscribes the consumer to the configured topic on the calling thread, then starts the
 * "Kafka Source Thread", which polls with a one-second timeout for as long as {@code running}
 * is true.
 *
 * <p>When auto-commit is disabled, each iteration waits at most 2/3 of
 * {@code maxPollIntervalMs} for the collected futures and then calls {@code commitSync()}.
 *
 * <p>Any {@link Exception} raised inside the loop ends the loop. If {@code running} is already
 * false it is only logged; otherwise it is logged, passed to {@code notifyError}, and
 * {@code instanceThread} is interrupted when it is non-null and alive. The uncaught-exception
 * handler reports the error wrapped in a {@link RuntimeException} and interrupts
 * {@code instanceThread} only while {@code running} is still true.
 *
 * <p>NOTE(review): this diff omits the lines between the {@code poll} call and {@code index++},
 * so the per-record handling is not visible here.
 * NOTE(review): the shutdown branch logs "Ignoring interrupt" for every exception type seen
 * after {@code running} became false, not only for interrupts - confirm this is intended.
 */
@SuppressWarnings("unchecked") public void start() { LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic()); + + // Subscribe on the calling thread so initialization errors fail synchronously consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic())); + runnerThread = new Thread(() -> { LOG.info("Kafka source started."); + while (running) { try { ConsumerRecords consumerRecords = consumer.poll(Duration.ofSeconds(1L)); @@ -184,20 +206,41 @@ public void start() { index++; } if (!kafkaSourceConfig.isAutoCommitEnabled()) { - // Wait about 2/3 of the time of maxPollIntervalMs. - // so as to avoid waiting for the timeout to be kicked out of the consumer group. CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS); consumer.commitSync(); } } catch (Exception e) { + if (!running) { + LOG.info("Kafka source is shutting down gracefully. Ignoring interrupt."); + break; + } + LOG.error("Error while processing records", e); notifyError(e); + + // Interrupt the instance thread to break the I/O deadlock + if (instanceThread != null && instanceThread.isAlive()) { + LOG.warn("Interrupting the Instance Thread to break I/O deadlock."); + instanceThread.interrupt(); + } break; } } }); running = true; runnerThread.setName("Kafka Source Thread"); + + // Report uncaught errors and interrupt the instance thread unless the source is already stopping + runnerThread.setUncaughtExceptionHandler((t, e) -> { + LOG.error("[{}] Uncaught error while consuming records", t.getName(), e); + notifyError(new RuntimeException(e)); + + if (running && instanceThread != null && instanceThread.isAlive()) { + LOG.warn("Interrupting the Instance Thread due to uncaught consumer thread exception."); + instanceThread.interrupt(); + } + }); + runnerThread.start(); } diff --git a/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java b/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java new file mode 100644 index 0000000000..9767a6ce04 --- /dev/null +++ 
b/kafka/src/test/java/org/apache/pulsar/io/kafka/KafkaSourceDeadlockTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.Collections; + +public class KafkaSourceDeadlockTest { + + @Test(timeOut = 15000) + public void testConnectorBreaksInstanceThreadDeadlock() throws Exception { + KafkaBytesSource source = new KafkaBytesSource(); + + // Inject dependencies using package-private @VisibleForTesting methods + source.setInstanceThread(Thread.currentThread()); + + KafkaSourceConfig config = Mockito.mock(KafkaSourceConfig.class); + Mockito.when(config.getTopic()).thenReturn("test-topic"); + Mockito.when(config.isAutoCommitEnabled()).thenReturn(false); + source.setKafkaSourceConfig(config); + + @SuppressWarnings("unchecked") + Consumer mockConsumer = Mockito.mock(Consumer.class); + source.setConsumer(mockConsumer); + + // Make the mocked poll() block safely so the background thread stays alive in its loop + 
Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))).thenAnswer(invocation -> { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + // Trigger the fatal error handling block when interrupted by the simulator + throw new RuntimeException("Simulated fatal Kafka network error", e); + } + return new ConsumerRecords<>(Collections.emptyMap()); + }); + + // Start the background Kafka polling thread + source.start(); + + // Simulate a fatal exception in the background consumer thread + Thread fatalErrorSimulator = new Thread(() -> { + try { + // Wait to ensure the instance thread enters its blocked state first + Thread.sleep(1000); + + // Find the Kafka Source Thread in the JVM and interrupt it + Thread runner = Thread.getAllStackTraces().keySet().stream() + .filter(t -> "Kafka Source Thread".equals(t.getName())) + .findFirst() + .orElse(null); + + if (runner != null) { + runner.interrupt(); + } + } catch (Exception e) { + // Ignore simulator exceptions + } + }); + fatalErrorSimulator.start(); + + // Simulate the instance thread blocking on Pulsar network I/O + boolean wasInterrupted = false; + try { + Thread.sleep(10000); + Assert.fail("Test failed: The instance thread remained deadlocked and was never interrupted."); + } catch (InterruptedException e) { + // Expected behavior: The consumer thread successfully interrupted this thread to break the deadlock + wasInterrupted = true; + } + + // Cleanup the interrupt flag so close() works cleanly + Thread.interrupted(); + source.close(); + + // Verify that the deadlock resolution logic executed successfully + Assert.assertTrue(wasInterrupted, + "The instance thread should have been interrupted by the failing consumer thread to prevent deadlock."); + } +} \ No newline at end of file