diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
index cc974d5045c80..b2c5a0a5a48ac 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Supplier;
 import lombok.CustomLog;
 import org.apache.pulsar.client.api.Consumer;
@@ -43,6 +48,11 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
     private final WorkerConfig workerConfig;
     private final PulsarClient pulsarClient;
     private volatile boolean isLeader = false;
+    // The consumer event listener callbacks (becameActive/becameInactive) run the blocking
+    // leader-election routines on this dedicated single-threaded executor so that the Pulsar client's
+    // shared consumer-listener thread is not blocked. The single thread also preserves event ordering.
+    private final ExecutorService executor =
+            Executors.newSingleThreadExecutor(new DefaultThreadFactory("function-worker-leader"));
 
     static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
 
@@ -90,6 +100,12 @@ public void start() throws PulsarClientException {
 
     @Override
     public void becameActive(Consumer consumer, int partitionId) {
+        // Run the (blocking) become-leader routine on a dedicated executor so the consumer
+        // event-listener thread, which is shared with the Pulsar client, is not blocked.
+        executor.execute(() -> becameActiveInternal(consumer, partitionId));
+    }
+
+    private void becameActiveInternal(Consumer consumer, int partitionId) {
         synchronized (this) {
             if (isLeader) {
                 return;
@@ -151,7 +167,11 @@ public void becameActive(Consumer consumer, int partitionId) {
     }
 
     @Override
-    public synchronized void becameInactive(Consumer consumer, int partitionId) {
+    public void becameInactive(Consumer consumer, int partitionId) {
+        executor.execute(() -> becameInactiveInternal(consumer, partitionId));
+    }
+
+    private synchronized void becameInactiveInternal(Consumer consumer, int partitionId) {
         if (isLeader) {
             log.info().attr("worker", consumerName).log("Worker lost the leadership.");
             isLeader = false;
@@ -180,10 +200,21 @@ public boolean isLeader() {
         return isLeader;
     }
 
+    @VisibleForTesting
+    void joinPendingEventTasks() throws InterruptedException, ExecutionException {
+        executor.submit(() -> { }).get();
+    }
+
     @Override
     public void close() throws PulsarClientException {
-        if (consumer != null) {
-            consumer.close();
+        try {
+            if (consumer != null) {
+                consumer.close();
+            }
+        } finally {
+            // Shut the leader-event executor down even when consumer.close() throws;
+            // otherwise its worker thread would be left running.
+            executor.shutdown();
         }
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
index 93e951383f078..f6dea177dc9cd 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java
@@ -138,6 +138,7 @@ public void testLeaderService() throws Exception {
         verify(mockClient, times(1)).newConsumer();
 
         listenerHolder.get().becameActive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertTrue(leaderService.isLeader());
 
         verify(functionMetadataManager, times(1)).getIsInitialized();
@@ -153,6 +154,7 @@ public void testLeaderService() throws Exception {
         verify(schedulerManager, times((1))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
@@ -168,6 +170,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception {
         verify(mockClient, times(1)).newConsumer();
 
         listenerHolder.get().becameActive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertTrue(leaderService.isLeader());
 
         verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
@@ -179,6 +182,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception {
         verify(schedulerManager, times((1))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(1)).start();
@@ -198,6 +202,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws
         when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());
 
         listenerHolder.get().becameActive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         // should have failed to become leader
         assertFalse(leaderService.isLeader());
 
@@ -214,6 +219,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws
         verify(schedulerManager, times((0))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
@@ -234,6 +240,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore(
         when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());
 
         listenerHolder.get().becameActive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         // should have failed to become leader
         assertFalse(leaderService.isLeader());
 
@@ -250,6 +257,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore(
         verify(schedulerManager, times((0))).initialize(any());
 
         listenerHolder.get().becameInactive(mockConsumer, 0);
+        leaderService.joinPendingEventTasks();
         assertFalse(leaderService.isLeader());
 
         verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);