Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
package org.apache.pulsar.functions.worker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import lombok.CustomLog;
import org.apache.pulsar.client.api.Consumer;
Expand All @@ -43,6 +48,11 @@ public class LeaderService implements AutoCloseable, ConsumerEventListener {
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
private volatile boolean isLeader = false;
// The consumer event listener callbacks (becameActive/becameInactive) run the blocking
// leader-election routines on this dedicated single-threaded executor so that the Pulsar client's
// shared consumer-listener thread is not blocked. The single thread also preserves event ordering.
private final ExecutorService executor =
Executors.newSingleThreadExecutor(new DefaultThreadFactory("function-worker-leader"));

static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";

Expand Down Expand Up @@ -90,6 +100,12 @@ public void start() throws PulsarClientException {

@Override
public void becameActive(Consumer<?> consumer, int partitionId) {
// Run the (blocking) become-leader routine on a dedicated executor so the consumer
// event-listener thread, which is shared with the Pulsar client, is not blocked.
executor.execute(() -> becameActiveInternal(consumer, partitionId));
}

private void becameActiveInternal(Consumer<?> consumer, int partitionId) {
synchronized (this) {
if (isLeader) {
return;
Expand Down Expand Up @@ -151,7 +167,11 @@ public void becameActive(Consumer<?> consumer, int partitionId) {
}

@Override
public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
public void becameInactive(Consumer<?> consumer, int partitionId) {
executor.execute(() -> becameInactiveInternal(consumer, partitionId));
}

private synchronized void becameInactiveInternal(Consumer<?> consumer, int partitionId) {
if (isLeader) {
log.info().attr("worker", consumerName).log("Worker lost the leadership.");
isLeader = false;
Expand Down Expand Up @@ -180,10 +200,16 @@ public boolean isLeader() {
return isLeader;
}

@VisibleForTesting
void joinPendingEventTasks() throws InterruptedException, ExecutionException {
executor.submit(() -> { }).get();
}

@Override
public void close() throws PulsarClientException {
if (consumer != null) {
consumer.close();
}
executor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ public void testLeaderService() throws Exception {
verify(mockClient, times(1)).newConsumer();

listenerHolder.get().becameActive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertTrue(leaderService.isLeader());

verify(functionMetadataManager, times(1)).getIsInitialized();
Expand All @@ -153,6 +154,7 @@ public void testLeaderService() throws Exception {
verify(schedulerManager, times((1))).initialize(any());

listenerHolder.get().becameInactive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertFalse(leaderService.isLeader());

verify(functionAssignmentTailer, times(1)).startFromMessage(messageId);
Expand All @@ -168,6 +170,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception {
verify(mockClient, times(1)).newConsumer();

listenerHolder.get().becameActive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertTrue(leaderService.isLeader());

verify(functionMetadataManager, times(1)).acquireExclusiveWrite(any());
Expand All @@ -179,6 +182,7 @@ public void testLeaderServiceNoNewScheduling() throws Exception {
verify(schedulerManager, times((1))).initialize(any());

listenerHolder.get().becameInactive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertFalse(leaderService.isLeader());

verify(functionAssignmentTailer, times(1)).start();
Expand All @@ -198,6 +202,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws
when(schedulerManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());

listenerHolder.get().becameActive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
// should have failed to become leader
assertFalse(leaderService.isLeader());

Expand All @@ -214,6 +219,7 @@ public void testAcquireScheduleManagerExclusiveProducerNotLeaderAnymore() throws
verify(schedulerManager, times((0))).initialize(any());

listenerHolder.get().becameInactive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertFalse(leaderService.isLeader());

verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
Expand All @@ -234,6 +240,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore(
when(functionMetadataManager.acquireExclusiveWrite(any())).thenThrow(new WorkerUtils.NotLeaderAnymore());

listenerHolder.get().becameActive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
// should have failed to become leader
assertFalse(leaderService.isLeader());

Expand All @@ -250,6 +257,7 @@ public void testAcquireFunctionMetadataManagerExclusiveProducerNotLeaderAnymore(
verify(schedulerManager, times((0))).initialize(any());

listenerHolder.get().becameInactive(mockConsumer, 0);
leaderService.joinPendingEventTasks();
assertFalse(leaderService.isLeader());

verify(functionAssignmentTailer, times(0)).startFromMessage(messageId);
Expand Down
Loading