Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,9 @@ protected void handleCommandScalableTopicLookup(

final long sessionId = commandScalableTopicLookup.getSessionId();
final String topicStr = commandScalableTopicLookup.getTopic();
// Capture now: the command object is recycled once this handler returns, before the
// async authorization continuation that builds the session runs.
final boolean createIfMissing = commandScalableTopicLookup.isCreateIfMissing();

log.debug().attr("topic", topicStr).attr("sessionId", sessionId)
.log("Received ScalableTopicLookup");
Expand Down Expand Up @@ -836,7 +839,7 @@ protected void handleCommandScalableTopicLookup(
}
// Create a DagWatchSession that will send the initial layout and watch for changes
var session = new DagWatchSession(
sessionId, topicName, this, resources, service);
sessionId, topicName, this, resources, service, createIfMissing);
dagWatchSessions.put(sessionId, session);

session.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,23 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe
* real DAG (those require the {@code topic://} domain). */
private final TopicName scalableTopicName;
private final String resolvedTopicName;
/** When false, a {@code topic://} lookup of a non-existent scalable topic must not auto-create
* it (set by namespace consumers so a deleted topic isn't resurrected on a per-topic reconnect). */
private final boolean createIfMissing;
private volatile boolean closed = false;

public DagWatchSession(long sessionId,
TopicName topicName,
ServerCnx cnx,
ScalableTopicResources resources,
BrokerService brokerService) {
BrokerService brokerService,
boolean createIfMissing) {
this.sessionId = sessionId;
this.topicName = topicName;
this.cnx = cnx;
this.resources = resources;
this.brokerService = brokerService;
this.createIfMissing = createIfMissing;
this.metadataPath = resources.topicPath(topicName);
this.scalableTopicName = topicName.toScalableTopic();
this.resolvedTopicName = scalableTopicName.toString();
Expand Down Expand Up @@ -139,6 +144,12 @@ public CompletableFuture<ScalableTopicLayoutResponse> start() {
return buildSyntheticResponse();
}
if (topicName.getDomain() == TopicDomain.topic) {
if (!createIfMissing) {
// Caller (e.g. a namespace consumer) opted out of auto-creation: fail
// not-found rather than resurrect a topic that doesn't currently exist.
return CompletableFuture.failedFuture(
new IllegalStateException("Scalable topic not found: " + topicName));
}
return maybeAutoCreateAndBuildResponse();
}
return CompletableFuture.failedFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void setup() {
var store = mock(org.apache.pulsar.metadata.api.MetadataStore.class);
when(resources.getStore()).thenReturn(store);

session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService);
session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService, true);
}

// --- identity / lifecycle ---
Expand Down Expand Up @@ -131,6 +131,34 @@ public void testStartFailsWhenTopicMetadataMissingAndAutoCreateDisallowed() {
}
}

@Test
public void testStartDoesNotAutoCreateWhenCallerOptsOut() {
// A namespace consumer opens its per-topic watch with createIfMissing=false. A topic://
// lookup with no metadata must then fail not-found WITHOUT consulting auto-create policy or
// creating the topic — so a deleted topic can't be resurrected by a reconnecting watch, even
// when broker/namespace policy would otherwise allow auto-creation.
when(resources.getScalableTopicMetadataAsync(TOPIC, true))
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));

DagWatchSession s = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService, false);
CompletableFuture<ScalableTopicLayoutResponse> future = s.start();

assertTrue(future.isCompletedExceptionally());
try {
future.get();
fail("expected failure");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail("interrupted");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof IllegalStateException, "got: " + e.getCause());
assertTrue(e.getCause().getMessage().contains("not found"), e.getCause().getMessage());
}
// The opt-out short-circuits before policy is consulted and nothing is created.
verify(brokerService, never()).isAllowAutoTopicCreationAsync(any(TopicName.class));
verify(brokerService, never()).getScalableTopicService();
}

// --- synthetic layout for not-yet-migrated regular topics ---

@Test
Expand All @@ -146,7 +174,7 @@ public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() thr
when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0)));

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true);
ScalableTopicLayoutResponse response = s.start().get();

assertEquals(response.epoch(), 0L);
Expand Down Expand Up @@ -174,7 +202,7 @@ public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws
when(brokerService.fetchPartitionedTopicMetadataAsync(regular))
.thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4)));

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true);
ScalableTopicLayoutResponse response = s.start().get();

assertEquals(response.epoch(), 0L);
Expand Down Expand Up @@ -207,7 +235,7 @@ public void testStartRejectsIndividualPartitionInput() throws Exception {
// produce nonsensical -partition-K-partition-J underlying names.
TopicName partition = TopicName.get("persistent://tenant/ns/my-partitioned-partition-3");

DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService);
DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService, true);
CompletableFuture<ScalableTopicLayoutResponse> future = s.start();

assertTrue(future.isCompletedExceptionally());
Expand All @@ -232,7 +260,7 @@ public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() {
"persistent://tenant/ns/my-regular", 0L, 12345L)),
null, null, null, null);

DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService);
DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true);
s.pushUpdate(response);

ArgumentCaptor<ByteBuf> captor = ArgumentCaptor.forClass(ByteBuf.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.v5;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.v5.QueueConsumer;
import org.apache.pulsar.client.api.v5.V5ClientBaseTest;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

/**
* Tests for V5 namespace-level (multi-topic) consumers and how they react when scalable topics in the
* namespace are deleted. Expected behavior:
* <ul>
* <li>property filters select the matching topics — only those get a per-topic consumer;</li>
* <li>when a matched topic is deleted, the watcher notifies the consumer, which stops the per-topic
* consumer for it;</li>
* <li>the deleted topic is <b>not</b> auto-recreated by the namespace consumer, even though the
* broker has auto-topic-creation enabled (a reconnecting per-topic consumer must not resurrect a
* topic the operator just deleted).</li>
* </ul>
*
* <p>Lives in {@code org.apache.pulsar.client.impl.v5} to reach the package-private
* {@link MultiTopicQueueConsumer#attachedTopicsForTesting()} accessor.
*/
public class V5NamespaceConsumerTopicDeletionTest extends V5ClientBaseTest {

private String topicName(String suffix) {
return "topic://" + getNamespace() + "/" + suffix + "-"
+ UUID.randomUUID().toString().substring(0, 8);
}

private MultiTopicQueueConsumer<String> subscribeNamespace(String subscription,
Map<String, String> propertyFilters)
throws Exception {
QueueConsumer<String> consumer;
if (propertyFilters == null) {
consumer = v5Client.newQueueConsumer(Schema.string())
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.namespace(getNamespace())
.subscribe();
} else {
consumer = v5Client.newQueueConsumer(Schema.string())
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
.namespace(getNamespace(), propertyFilters)
.subscribe();
}
return (MultiTopicQueueConsumer<String>) consumer;
}

/**
* Property filters act as the selector: only topics whose properties match get a per-topic
* consumer; non-matching topics in the same namespace never attach.
*/
@Test
public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception {
String aliceTopic = topicName("alice");
String bobTopic = topicName("bob");
admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice"));
admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob"));

@Cleanup
MultiTopicQueueConsumer<String> consumer = subscribeNamespace("ns-filter", Map.of("owner", "alice"));

Awaitility.await().untilAsserted(() ->
assertEquals(consumer.attachedTopicsForTesting(), Set.of(aliceTopic)));
assertFalse(consumer.attachedTopicsForTesting().contains(bobTopic));
}

/**
* Deleting a matched topic notifies the namespace consumer, which stops the per-topic consumer
* for it (the attached set shrinks).
*/
@Test
public void internalConsumerStopsWhenTopicDeleted() throws Exception {
String keep = topicName("keep");
String drop = topicName("drop");
admin.scalableTopics().createScalableTopic(keep, 1);
admin.scalableTopics().createScalableTopic(drop, 1);

@Cleanup
MultiTopicQueueConsumer<String> consumer = subscribeNamespace("ns-stop", null);
Awaitility.await().untilAsserted(() ->
assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep, drop)));

admin.scalableTopics().deleteScalableTopic(drop, true);

Awaitility.await().untilAsserted(() ->
assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep)));
}

/**
* A deleted topic must not be auto-recreated by the namespace consumer, even though the broker has
* auto-topic-creation enabled. Asserted continuously across reconnect/recheck cycles, so a
* reconnecting per-topic consumer cannot quietly resurrect the topic.
*/
@Test
public void deletedTopicIsNotRecreatedByNamespaceConsumer() throws Exception {
String keep = topicName("keep");
String drop = topicName("drop");
admin.scalableTopics().createScalableTopic(keep, 1);
admin.scalableTopics().createScalableTopic(drop, 1);

@Cleanup
MultiTopicQueueConsumer<String> consumer = subscribeNamespace("ns-norecreate", null);
Awaitility.await().untilAsserted(() ->
assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep, drop)));

admin.scalableTopics().deleteScalableTopic(drop, true);
Awaitility.await().untilAsserted(() ->
assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep)));

// The deleted topic stays gone — absent from the broker and never re-attached — held
// continuously for several seconds so a per-topic reconnect or recheck cannot resurrect it.
Awaitility.await()
.during(5, TimeUnit.SECONDS)
.atMost(15, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThrows(PulsarAdminException.NotFoundException.class,
() -> admin.scalableTopics().getMetadata(drop));
assertFalse(consumer.attachedTopicsForTesting().contains(drop),
"namespace consumer must not re-attach the deleted topic");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
private final PulsarClientImpl v4Client;
private final TopicName topicName;
private final long sessionId;
/** When false, the broker must not auto-create the scalable topic if it's missing on lookup.
* Namespace (multi-topic) consumers set this false so a deleted topic isn't resurrected by
* a reconnecting per-topic watch. */
private final boolean createIfMissing;
private final AtomicReference<ClientSegmentLayout> currentLayout = new AtomicReference<>();
private final CompletableFuture<ClientSegmentLayout> initialLayoutFuture = new CompletableFuture<>();
private final Backoff reconnectBackoff;
Expand All @@ -71,8 +75,13 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable {
private volatile TopicName resolvedTopicName;

DagWatchClient(PulsarClientImpl v4Client, TopicName topicName) {
this(v4Client, topicName, true);
}

DagWatchClient(PulsarClientImpl v4Client, TopicName topicName, boolean createIfMissing) {
this.v4Client = v4Client;
this.topicName = topicName;
this.createIfMissing = createIfMissing;
this.sessionId = SESSION_ID_GENERATOR.incrementAndGet();
this.reconnectBackoff = Backoff.builder()
.initialDelay(Duration.ofMillis(100))
Expand Down Expand Up @@ -122,7 +131,7 @@ private void attach(ClientCnx newCnx) {
this.cnx = newCnx;
newCnx.registerDagWatchSession(sessionId, this);
newCnx.ctx().writeAndFlush(
Commands.newScalableTopicLookup(sessionId, topicName.toString()))
Commands.newScalableTopicLookup(sessionId, topicName.toString(), createIfMissing))
.addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
newCnx.removeDagWatchSession(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.v5;

import com.google.common.annotations.VisibleForTesting;
import io.github.merlimat.slog.Logger;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -160,7 +161,9 @@ private CompletableFuture<Void> openTopic(String topicName, boolean retry) {
return CompletableFuture.completedFuture(null);
}
TopicName topic = V5Utils.parseScalableTopicInput(topicName);
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
// A namespace consumer only attaches to topics the watcher reports as existing; it must
// never auto-create one (so a deleted topic can't be resurrected by a reconnecting watch).
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic, /* createIfMissing= */ false);
// Per-topic message sink: tag each delivered message with the parent scalable
// topic for ack routing + display, and forward into the shared mux. No pump
// thread; per-segment v4 receive loops fire this sink directly.
Expand Down Expand Up @@ -254,6 +257,16 @@ private CompletableFuture<Void> closeTopic(String topicName) {
.log("Per-topic consumer detached"));
}

/**
* The scalable topics this namespace consumer currently has an attached per-topic consumer for.
* Lets tests assert that topics are attached/detached as the namespace's matching set changes
* (e.g. that a deleted topic's per-topic consumer is stopped).
*/
@VisibleForTesting
Set<String> attachedTopicsForTesting() {
return new HashSet<>(perTopic.keySet());
}

// --- QueueConsumer ---

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,10 +1727,15 @@ public static BaseCommand newWatchTopicListClose(long watcherId, long requestId)
// --- Scalable topic commands ---

public static ByteBuf newScalableTopicLookup(long sessionId, String topic) {
return newScalableTopicLookup(sessionId, topic, true);
}

public static ByteBuf newScalableTopicLookup(long sessionId, String topic, boolean createIfMissing) {
BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_LOOKUP);
cmd.setScalableTopicLookup()
.setSessionId(sessionId)
.setTopic(topic);
.setTopic(topic)
.setCreateIfMissing(createIfMissing);
return serializeWithSize(cmd);
}

Expand Down
6 changes: 6 additions & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,12 @@ message CommandScalableTopicLookup {
// The broker resolves the input to the canonical topic://... identity and
// returns it in CommandScalableTopicUpdate.resolved_topic_name.
required string topic = 2;
// What to do when the scalable topic is missing on lookup: if true, create it (subject to
// broker/namespace auto-creation policy); if false, fail not-found instead of creating it.
// Namespace (multi-topic) consumers set this false so a deleted topic is never resurrected by
// a reconnecting per-topic watch. Defaults to true to preserve the create-on-lookup behavior
// for explicit single-topic producers/consumers (and for older clients that don't set it).
optional bool create_if_missing = 3 [default = true];
}

// Broker -> Client: Used for BOTH initial response and subsequent pushed updates
Expand Down
Loading