From 59b063cc08f43ad0572cd4aefceab182ab463beb Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 18 Jun 2026 11:34:12 -0700 Subject: [PATCH] [fix][functions] Run worker leader-election off the consumer event-listener thread LeaderService implements ConsumerEventListener for the failover-subscription leader election. Its becameActive/becameInactive callbacks ran the full blocking leader-election routine inline: functionMetaDataManager/functionRuntimeManager getIsInitialized().get(), schedulerManager/functionMetaDataManager acquireExclusiveWrite (a retry loop doing createAsync().get(10s) + Thread.sleep while still leader), functionAssignmentTailer.triggerReadToTheEndAndExit().get(), and, on becameInactive, schedulerManager.close() (which blocks on schedulerLock for an in-flight schedule). These callbacks are dispatched by the Pulsar client on its shared consumer-listener executor (ConsumerImpl.activeConsumerChanged -> externalPinnedExecutor), so this potentially-unbounded leader handshake blocked a thread shared by all consumers/readers on that client, including the worker's own tailers. Run becameActive/becameInactive on a dedicated single-threaded executor and return from the listener callbacks immediately. The single thread preserves the becameActive -> becameInactive ordering. The callback bodies are unchanged; they are only moved into becameActiveInternal/becameInactiveInternal. close() shuts the executor down after closing the consumer. A @VisibleForTesting joinPendingEventTasks() barrier, which submits a no-op task to the executor and waits for it, keeps LeaderServiceTest deterministic. 
--- .../functions/worker/LeaderService.java | 28 ++++++++++++++++++- .../functions/worker/LeaderServiceTest.java | 8 ++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java index cc974d5045c80..b2c5a0a5a48ac 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/LeaderService.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.functions.worker; +import com.google.common.annotations.VisibleForTesting; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Supplier; import lombok.CustomLog; import org.apache.pulsar.client.api.Consumer; @@ -43,6 +48,11 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener { private final WorkerConfig workerConfig; private final PulsarClient pulsarClient; private volatile boolean isLeader = false; + // The consumer event listener callbacks (becameActive/becameInactive) run the blocking + // leader-election routines on this dedicated single-threaded executor so that the Pulsar client's + // shared consumer-listener thread is not blocked. The single thread also preserves event ordering. 
+ private final ExecutorService executor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("function-worker-leader")); static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants"; @@ -90,6 +100,12 @@ public void start() throws PulsarClientException { @Override public void becameActive(Consumer consumer, int partitionId) { + // Run the (blocking) become-leader routine on a dedicated executor so the consumer + // event-listener thread, which is shared with the Pulsar client, is not blocked. + executor.execute(() -> becameActiveInternal(consumer, partitionId)); + } + + private void becameActiveInternal(Consumer consumer, int partitionId) { synchronized (this) { if (isLeader) { return; @@ -151,7 +167,11 @@ public void becameActive(Consumer consumer, int partitionId) { } @Override - public synchronized void becameInactive(Consumer consumer, int partitionId) { + public void becameInactive(Consumer consumer, int partitionId) { + executor.execute(() -> becameInactiveInternal(consumer, partitionId)); + } + + private synchronized void becameInactiveInternal(Consumer consumer, int partitionId) { if (isLeader) { log.info().attr("worker", consumerName).log("Worker lost the leadership."); isLeader = false; @@ -180,10 +200,16 @@ public boolean isLeader() { return isLeader; } + @VisibleForTesting + void joinPendingEventTasks() throws InterruptedException, ExecutionException { + executor.submit(() -> { }).get(); + } + @Override public void close() throws PulsarClientException { if (consumer != null) { consumer.close(); } + executor.shutdown(); } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java index 93e951383f078..f6dea177dc9cd 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java +++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/LeaderServiceTest.java @@ -138,6 +138,7 @@ public void testLeaderService() throws Exception { verify(mockClient, times(1)).newConsumer(); listenerHolder.get().becameActive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); assertTrue(leaderService.isLeader()); verify(functionMetadataManager, times(1)).getIsInitialized(); @@ -153,6 +154,7 @@ public void testLeaderService() throws Exception { verify(schedulerManager, times((1))).initialize(any()); listenerHolder.get().becameInactive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); assertFalse(leaderService.isLeader()); verify(functionAssignmentTailer, times(1)).startFromMessage(messageId); @@ -168,6 +170,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception { verify(mockClient, times(1)).newConsumer(); listenerHolder.get().becameActive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); assertTrue(leaderService.isLeader()); verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any()); @@ -179,6 +182,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception { verify(schedulerManager, times((1))).initialize(any()); listenerHolder.get().becameInactive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); assertFalse(leaderService.isLeader()); verify(functionAssignmentTailer, times(1)).start(); @@ -198,6 +202,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore()); listenerHolder.get().becameActive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); // should have failed to become leader assertFalse(leaderService.isLeader()); @@ -214,6 +219,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws verify(schedulerManager, times((0))).initialize(any()); listenerHolder.get().becameInactive(mockConsumer, 0); + 
leaderService.joinPendingEventTasks(); assertFalse(leaderService.isLeader()); verify(functionAssignmentTailer, times(0)).startFromMessage(messageId); @@ -234,6 +240,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore( when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore()); listenerHolder.get().becameActive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); // should have failed to become leader assertFalse(leaderService.isLeader()); @@ -250,6 +257,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore( verify(schedulerManager, times((0))).initialize(any()); listenerHolder.get().becameInactive(mockConsumer, 0); + leaderService.joinPendingEventTasks(); assertFalse(leaderService.isLeader()); verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);