diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 222d444285286..c20d2ef157088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -781,6 +781,9 @@ protected void handleCommandScalableTopicLookup( final long sessionId = commandScalableTopicLookup.getSessionId(); final String topicStr = commandScalableTopicLookup.getTopic(); + // Capture now: the command object is recycled once this handler returns, before the + // async authorization continuation that builds the session runs. + final boolean createIfMissing = commandScalableTopicLookup.isCreateIfMissing(); log.debug().attr("topic", topicStr).attr("sessionId", sessionId) .log("Received ScalableTopicLookup"); @@ -836,7 +839,7 @@ protected void handleCommandScalableTopicLookup( } // Create a DagWatchSession that will send the initial layout and watch for changes var session = new DagWatchSession( - sessionId, topicName, this, resources, service); + sessionId, topicName, this, resources, service, createIfMissing); dagWatchSessions.put(sessionId, session); session.start() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java index e36b8fd7005a4..a7ae18c74bf44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java @@ -75,18 +75,23 @@ public class DagWatchSession implements ScalableTopicResources.MetadataPathListe * real DAG (those require the {@code topic://} domain). */ private final TopicName scalableTopicName; private final String resolvedTopicName; + /** When false, a {@code topic://} lookup of a non-existent scalable topic must not auto-create + * it (set by namespace consumers so a deleted topic isn't resurrected on a per-topic reconnect). */ + private final boolean createIfMissing; private volatile boolean closed = false; public DagWatchSession(long sessionId, TopicName topicName, ServerCnx cnx, ScalableTopicResources resources, - BrokerService brokerService) { + BrokerService brokerService, + boolean createIfMissing) { this.sessionId = sessionId; this.topicName = topicName; this.cnx = cnx; this.resources = resources; this.brokerService = brokerService; + this.createIfMissing = createIfMissing; this.metadataPath = resources.topicPath(topicName); this.scalableTopicName = topicName.toScalableTopic(); this.resolvedTopicName = scalableTopicName.toString(); @@ -139,6 +144,12 @@ public CompletableFuture start() { return buildSyntheticResponse(); } if (topicName.getDomain() == TopicDomain.topic) { + if (!createIfMissing) { + // Caller (e.g. a namespace consumer) opted out of auto-creation: fail + // not-found rather than resurrect a topic that doesn't currently exist. + return CompletableFuture.failedFuture( + new IllegalStateException("Scalable topic not found: " + topicName)); + } return maybeAutoCreateAndBuildResponse(); } return CompletableFuture.failedFuture( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java index 4945650178ed6..94f84267a2c7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java @@ -85,7 +85,7 @@ public void setup() { var store = mock(org.apache.pulsar.metadata.api.MetadataStore.class); when(resources.getStore()).thenReturn(store); - session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService); + session = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService, true); } // --- identity / lifecycle --- @@ -131,6 +131,34 @@ public void testStartFailsWhenTopicMetadataMissingAndAutoCreateDisallowed() { } } + @Test + public void testStartDoesNotAutoCreateWhenCallerOptsOut() { + // A namespace consumer opens its per-topic watch with createIfMissing=false. A topic:// + // lookup with no metadata must then fail not-found WITHOUT consulting auto-create policy or + // creating the topic — so a deleted topic can't be resurrected by a reconnecting watch, even + // when broker/namespace policy would otherwise allow auto-creation. + when(resources.getScalableTopicMetadataAsync(TOPIC, true)) + .thenReturn(CompletableFuture.completedFuture(Optional.empty())); + + DagWatchSession s = new DagWatchSession(SESSION_ID, TOPIC, cnx, resources, brokerService, false); + CompletableFuture future = s.start(); + + assertTrue(future.isCompletedExceptionally()); + try { + future.get(); + fail("expected failure"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + fail("interrupted"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof IllegalStateException, "got: " + e.getCause()); + assertTrue(e.getCause().getMessage().contains("not found"), e.getCause().getMessage()); + } + // The opt-out short-circuits before policy is consulted and nothing is created. + verify(brokerService, never()).isAllowAutoTopicCreationAsync(any(TopicName.class)); + verify(brokerService, never()).getScalableTopicService(); + } + // --- synthetic layout for not-yet-migrated regular topics --- @Test @@ -146,7 +174,7 @@ public void testStartBuildsSyntheticLayoutForNonPartitionedPersistentTopic() thr when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(0))); - DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true); ScalableTopicLayoutResponse response = s.start().get(); assertEquals(response.epoch(), 0L); @@ -174,7 +202,7 @@ public void testStartBuildsSyntheticLayoutForPartitionedPersistentTopic() throws when(brokerService.fetchPartitionedTopicMetadataAsync(regular)) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata(4))); - DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true); ScalableTopicLayoutResponse response = s.start().get(); assertEquals(response.epoch(), 0L); @@ -207,7 +235,7 @@ public void testStartRejectsIndividualPartitionInput() throws Exception { // produce nonsensical -partition-K-partition-J underlying names. TopicName partition = TopicName.get("persistent://tenant/ns/my-partitioned-partition-3"); - DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService); + DagWatchSession s = new DagWatchSession(SESSION_ID, partition, cnx, resources, brokerService, true); CompletableFuture future = s.start(); assertTrue(future.isCompletedExceptionally()); @@ -232,7 +260,7 @@ public void testSyntheticLayoutPushedToClientCarriesResolvedTopicName() { "persistent://tenant/ns/my-regular", 0L, 12345L)), null, null, null, null); - DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService); + DagWatchSession s = new DagWatchSession(SESSION_ID, regular, cnx, resources, brokerService, true); s.pushUpdate(response); ArgumentCaptor captor = ArgumentCaptor.forClass(ByteBuf.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/v5/V5NamespaceConsumerTopicDeletionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/v5/V5NamespaceConsumerTopicDeletionTest.java new file mode 100644 index 0000000000000..6dff19336dde3 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/v5/V5NamespaceConsumerTopicDeletionTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertThrows; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.v5.QueueConsumer; +import org.apache.pulsar.client.api.v5.V5ClientBaseTest; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/** + * Tests for V5 namespace-level (multi-topic) consumers and how they react when scalable topics in the + * namespace are deleted. Expected behavior: + *
    + *
  • property filters select the matching topics — only those get a per-topic consumer;
  • + *
  • when a matched topic is deleted, the watcher notifies the consumer, which stops the per-topic + * consumer for it;
  • + *
  • the deleted topic is not auto-recreated by the namespace consumer, even though the + * broker has auto-topic-creation enabled (a reconnecting per-topic consumer must not resurrect a + * topic the operator just deleted).
  • + *
+ * + *

Lives in {@code org.apache.pulsar.client.impl.v5} to reach the package-private + * {@link MultiTopicQueueConsumer#attachedTopicsForTesting()} accessor. + */ +public class V5NamespaceConsumerTopicDeletionTest extends V5ClientBaseTest { + + private String topicName(String suffix) { + return "topic://" + getNamespace() + "/" + suffix + "-" + + UUID.randomUUID().toString().substring(0, 8); + } + + private MultiTopicQueueConsumer subscribeNamespace(String subscription, + Map propertyFilters) + throws Exception { + QueueConsumer consumer; + if (propertyFilters == null) { + consumer = v5Client.newQueueConsumer(Schema.string()) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .namespace(getNamespace()) + .subscribe(); + } else { + consumer = v5Client.newQueueConsumer(Schema.string()) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .namespace(getNamespace(), propertyFilters) + .subscribe(); + } + return (MultiTopicQueueConsumer) consumer; + } + + /** + * Property filters act as the selector: only topics whose properties match get a per-topic + * consumer; non-matching topics in the same namespace never attach. + */ + @Test + public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception { + String aliceTopic = topicName("alice"); + String bobTopic = topicName("bob"); + admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob")); + + @Cleanup + MultiTopicQueueConsumer consumer = subscribeNamespace("ns-filter", Map.of("owner", "alice")); + + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.attachedTopicsForTesting(), Set.of(aliceTopic))); + assertFalse(consumer.attachedTopicsForTesting().contains(bobTopic)); + } + + /** + * Deleting a matched topic notifies the namespace consumer, which stops the per-topic consumer + * for it (the attached set shrinks). + */ + @Test + public void internalConsumerStopsWhenTopicDeleted() throws Exception { + String keep = topicName("keep"); + String drop = topicName("drop"); + admin.scalableTopics().createScalableTopic(keep, 1); + admin.scalableTopics().createScalableTopic(drop, 1); + + @Cleanup + MultiTopicQueueConsumer consumer = subscribeNamespace("ns-stop", null); + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep, drop))); + + admin.scalableTopics().deleteScalableTopic(drop, true); + + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep))); + } + + /** + * A deleted topic must not be auto-recreated by the namespace consumer, even though the broker has + * auto-topic-creation enabled. Asserted continuously across reconnect/recheck cycles, so a + * reconnecting per-topic consumer cannot quietly resurrect the topic. + */ + @Test + public void deletedTopicIsNotRecreatedByNamespaceConsumer() throws Exception { + String keep = topicName("keep"); + String drop = topicName("drop"); + admin.scalableTopics().createScalableTopic(keep, 1); + admin.scalableTopics().createScalableTopic(drop, 1); + + @Cleanup + MultiTopicQueueConsumer consumer = subscribeNamespace("ns-norecreate", null); + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep, drop))); + + admin.scalableTopics().deleteScalableTopic(drop, true); + Awaitility.await().untilAsserted(() -> + assertEquals(consumer.attachedTopicsForTesting(), Set.of(keep))); + + // The deleted topic stays gone — absent from the broker and never re-attached — held + // continuously for several seconds so a per-topic reconnect or recheck cannot resurrect it. + Awaitility.await() + .during(5, TimeUnit.SECONDS) + .atMost(15, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThrows(PulsarAdminException.NotFoundException.class, + () -> admin.scalableTopics().getMetadata(drop)); + assertFalse(consumer.attachedTopicsForTesting().contains(drop), + "namespace consumer must not re-attach the deleted topic"); + }); + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java index 807adab10b8c3..cdffec8e55443 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/DagWatchClient.java @@ -59,6 +59,10 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable { private final PulsarClientImpl v4Client; private final TopicName topicName; private final long sessionId; + /** When false, the broker must not auto-create the scalable topic if it's missing on lookup. + * Namespace (multi-topic) consumers set this false so a deleted topic isn't resurrected by + * a reconnecting per-topic watch. */ + private final boolean createIfMissing; private final AtomicReference currentLayout = new AtomicReference<>(); private final CompletableFuture initialLayoutFuture = new CompletableFuture<>(); private final Backoff reconnectBackoff; @@ -71,8 +75,13 @@ final class DagWatchClient implements DagWatchSession, AutoCloseable { private volatile TopicName resolvedTopicName; DagWatchClient(PulsarClientImpl v4Client, TopicName topicName) { + this(v4Client, topicName, true); + } + + DagWatchClient(PulsarClientImpl v4Client, TopicName topicName, boolean createIfMissing) { this.v4Client = v4Client; this.topicName = topicName; + this.createIfMissing = createIfMissing; this.sessionId = SESSION_ID_GENERATOR.incrementAndGet(); this.reconnectBackoff = Backoff.builder() .initialDelay(Duration.ofMillis(100)) @@ -122,7 +131,7 @@ private void attach(ClientCnx newCnx) { this.cnx = newCnx; newCnx.registerDagWatchSession(sessionId, this); newCnx.ctx().writeAndFlush( - Commands.newScalableTopicLookup(sessionId, topicName.toString())) + Commands.newScalableTopicLookup(sessionId, topicName.toString(), createIfMissing)) .addListener(writeFuture -> { if (!writeFuture.isSuccess()) { newCnx.removeDagWatchSession(sessionId); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java index e407e912f2ed7..806d46f93d847 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl.v5; +import com.google.common.annotations.VisibleForTesting; import io.github.merlimat.slog.Logger; import java.time.Duration; import java.util.ArrayList; @@ -160,7 +161,9 @@ private CompletableFuture openTopic(String topicName, boolean retry) { return CompletableFuture.completedFuture(null); } TopicName topic = V5Utils.parseScalableTopicInput(topicName); - DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic); + // A namespace consumer only attaches to topics the watcher reports as existing; it must + // never auto-create one (so a deleted topic can't be resurrected by a reconnecting watch). + DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic, /* createIfMissing= */ false); // Per-topic message sink: tag each delivered message with the parent scalable // topic for ack routing + display, and forward into the shared mux. No pump // thread; per-segment v4 receive loops fire this sink directly. @@ -254,6 +257,16 @@ private CompletableFuture closeTopic(String topicName) { .log("Per-topic consumer detached")); } + /** + * The scalable topics this namespace consumer currently has an attached per-topic consumer for. + * Lets tests assert that topics are attached/detached as the namespace's matching set changes + * (e.g. that a deleted topic's per-topic consumer is stopped). + */ + @VisibleForTesting + Set attachedTopicsForTesting() { + return new HashSet<>(perTopic.keySet()); + } + // --- QueueConsumer --- @Override diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 2403e85e31a5d..a9626a3b4cb66 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1727,10 +1727,15 @@ public static BaseCommand newWatchTopicListClose(long watcherId, long requestId) // --- Scalable topic commands --- public static ByteBuf newScalableTopicLookup(long sessionId, String topic) { + return newScalableTopicLookup(sessionId, topic, true); + } + + public static ByteBuf newScalableTopicLookup(long sessionId, String topic, boolean createIfMissing) { BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_LOOKUP); cmd.setScalableTopicLookup() .setSessionId(sessionId) - .setTopic(topic); + .setTopic(topic) + .setCreateIfMissing(createIfMissing); return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 729359894d51b..a4c64f83843a1 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -888,6 +888,12 @@ message CommandScalableTopicLookup { // The broker resolves the input to the canonical topic://... identity and // returns it in CommandScalableTopicUpdate.resolved_topic_name. required string topic = 2; + // What to do when the scalable topic is missing on lookup: if true, create it (subject to + // broker/namespace auto-creation policy); if false, fail not-found instead of creating it. + // Namespace (multi-topic) consumers set this false so a deleted topic is never resurrected by + // a reconnecting per-topic watch. Defaults to true to preserve the create-on-lookup behavior + // for explicit single-topic producers/consumers (and for older clients that don't set it). + optional bool create_if_missing = 3 [default = true]; } // Broker -> Client: Used for BOTH initial response and subsequent pushed updates