Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kafka/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ dependencies {
testImplementation(libs.hamcrest)
testImplementation(libs.awaitility)
testImplementation(libs.bcpkix.jdk18on)

testImplementation("org.testcontainers:kafka:1.19.7")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.kafka;

import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.io.Encoders;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -65,9 +66,11 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
private KafkaSourceConfig kafkaSourceConfig;
private Thread runnerThread;
private long maxPollIntervalMs;
private volatile Thread instanceThread;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.instanceThread = Thread.currentThread();
kafkaSourceConfig = KafkaSourceConfig.load(config, sourceContext);
Objects.requireNonNull(kafkaSourceConfig.getTopic(), "Kafka topic is not set");
Objects.requireNonNull(kafkaSourceConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
Expand Down Expand Up @@ -163,12 +166,31 @@ public void close() throws InterruptedException {
LOG.info("Kafka source stopped.");
}

/**
 * Test hook: replaces the thread recorded as the instance thread.
 *
 * <p>The consumer loop interrupts this thread when it hits a fatal error, so tests can point it
 * at the thread they want to observe being interrupted.
 *
 * @param instanceThread the thread to interrupt on consumer failure; may be {@code null}, in which
 *                       case no interrupt is sent
 */
@VisibleForTesting
void setInstanceThread(final Thread instanceThread) {
    this.instanceThread = instanceThread;
}

/**
 * Test hook: injects the source configuration directly, bypassing the loading done in
 * {@code open()}.
 *
 * @param kafkaSourceConfig the configuration that {@code start()} reads the topic and
 *                          auto-commit setting from
 */
@VisibleForTesting
void setKafkaSourceConfig(final KafkaSourceConfig kafkaSourceConfig) {
    this.kafkaSourceConfig = kafkaSourceConfig;
}

/**
 * Test hook: injects the Kafka consumer that {@code start()} subscribes with and polls from.
 *
 * @param consumer the consumer instance (typically a mock) to use in place of a real one
 */
@VisibleForTesting
void setConsumer(final Consumer<Object, Object> consumer) {
    this.consumer = consumer;
}

@SuppressWarnings("unchecked")
public void start() {
LOG.info("Starting subscribe kafka source on {}", kafkaSourceConfig.getTopic());

// Subscribe on the calling thread so that initialization errors fail synchronously
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));

runnerThread = new Thread(() -> {
LOG.info("Kafka source started.");

while (running) {
try {
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(1L));
Expand All @@ -184,20 +206,41 @@ public void start() {
index++;
}
if (!kafkaSourceConfig.isAutoCommitEnabled()) {
// Wait about 2/3 of the time of maxPollIntervalMs.
// so as to avoid waiting for the timeout to be kicked out of the consumer group.
CompletableFuture.allOf(futures).get(maxPollIntervalMs * 2 / 3, TimeUnit.MILLISECONDS);
consumer.commitSync();
}
} catch (Exception e) {
if (!running) {
LOG.info("Kafka source is shutting down gracefully. Ignoring interrupt.");
break;
}

LOG.error("Error while processing records", e);
notifyError(e);

// Interrupt the instance thread so that it stops blocking on I/O after the error was reported
if (instanceThread != null && instanceThread.isAlive()) {
LOG.warn("Interrupting the Instance Thread to break I/O deadlock.");
instanceThread.interrupt();
}
break;
}
}
});
running = true;
runnerThread.setName("Kafka Source Thread");

// Report uncaught errors and, unless the source is already stopping, interrupt the instance thread
runnerThread.setUncaughtExceptionHandler((t, e) -> {
LOG.error("[{}] Uncaught error while consuming records", t.getName(), e);
notifyError(new RuntimeException(e));

if (running && instanceThread != null && instanceThread.isAlive()) {
LOG.warn("Interrupting the Instance Thread due to uncaught consumer thread exception.");
instanceThread.interrupt();
}
});

runnerThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.time.Duration;
import java.util.Collections;

/**
 * Verifies that a fatal error on the Kafka consumer thread interrupts the registered instance
 * thread instead of leaving it blocked.
 *
 * <p>The test registers its own thread as the instance thread, starts the source with a mocked
 * consumer whose {@code poll()} blocks, and then interrupts the consumer thread from a helper
 * thread. The mocked {@code poll()} turns that interrupt into a {@link RuntimeException}, which
 * the source is expected to answer by interrupting the instance thread.
 */
public class KafkaSourceDeadlockTest {

    /** Name the source assigns to its polling thread in {@code start()}. */
    private static final String RUNNER_THREAD_NAME = "Kafka Source Thread";

    @Test(timeOut = 15000)
    public void testConnectorBreaksInstanceThreadDeadlock() throws Exception {
        KafkaBytesSource source = new KafkaBytesSource();

        // Inject dependencies using the package-private @VisibleForTesting hooks.
        source.setInstanceThread(Thread.currentThread());

        KafkaSourceConfig config = Mockito.mock(KafkaSourceConfig.class);
        Mockito.when(config.getTopic()).thenReturn("test-topic");
        Mockito.when(config.isAutoCommitEnabled()).thenReturn(false);
        source.setKafkaSourceConfig(config);

        @SuppressWarnings("unchecked")
        Consumer<Object, Object> mockConsumer = Mockito.mock(Consumer.class);
        source.setConsumer(mockConsumer);

        // Keep the polling thread parked inside poll(); an interrupt is surfaced as a fatal error.
        Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))).thenAnswer(invocation -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                throw new RuntimeException("Simulated fatal Kafka network error", e);
            }
            return new ConsumerRecords<>(Collections.emptyMap());
        });

        // Helper that triggers the simulated failure once the test thread is blocked below.
        Thread fatalErrorSimulator = new Thread(() -> {
            try {
                // Give the test thread time to enter its blocking sleep first.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Asked to stop early: preserve the interrupt status and do nothing.
                Thread.currentThread().interrupt();
                return;
            }
            Thread.getAllStackTraces().keySet().stream()
                    .filter(t -> RUNNER_THREAD_NAME.equals(t.getName()))
                    .findFirst()
                    .ifPresent(Thread::interrupt);
        });
        fatalErrorSimulator.setName("fatal-error-simulator");
        // Daemon so a stuck helper can never keep the test JVM alive.
        fatalErrorSimulator.setDaemon(true);

        boolean wasInterrupted = false;
        try {
            // Start the background Kafka polling thread, then the failure simulator.
            source.start();
            fatalErrorSimulator.start();

            // Simulate the instance thread blocking on Pulsar network I/O.
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // Expected: the failing consumer thread interrupted this thread.
                wasInterrupted = true;
            }
        } finally {
            // Always clean up, even when the expectation is not met, so the polling thread does
            // not leak into other tests. Clear the interrupt flag first so close() is not cut short.
            Thread.interrupted();
            source.close();
            fatalErrorSimulator.join(1000);
        }

        // Assert only after cleanup so a failure still leaves no threads behind.
        Assert.assertTrue(wasInterrupted,
                "The instance thread should have been interrupted by the failing consumer thread to prevent deadlock.");
    }
}