diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 79cfa41bfa4..6771adb8382 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -386,6 +386,12 @@ synchronized BookieId sendNextRead() { @Override synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + // A late error response from a bookie whose request was already superseded by + // a faster (e.g. speculative) read must not flow through the reattempt logic: + // writeSet has been recycled at the moment this entry transitioned to complete. + if (isComplete()) { + return; + } super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); int replica = writeSet.indexOf(bookieIndex); @@ -412,15 +418,29 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin @Override boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { + if (isComplete()) { + return false; + } + // Common case: the very first replica responded successfully; no + // speculative retry happened, so there are no slow bookies to mark. + // Skip the snapshot allocation entirely. + final int numReplicasTried = getNextReplicaIndexToReadFrom(); + if (numReplicasTried <= 1) { + return super.complete(bookieIndex, host, buffer); + } + // Speculative retry happened: snapshot the addresses of the replicas tried + // before this one BEFORE calling super.complete(), which recycles writeSet + // (see issue #4680). The WriteSet keeps its normal pooled lifecycle. + final BookieId[] slowBookies = new BookieId[numReplicasTried - 1]; + for (int i = 0; i < slowBookies.length; i++) { + slowBookies[i] = ensemble.get(writeSet.get(i)); + } + boolean completed = super.complete(bookieIndex, host, buffer); if (completed) { - int numReplicasTried = getNextReplicaIndexToReadFrom(); - // Check if any speculative reads were issued and mark any slow bookies before - // the first successful speculative read as "slow" - for (int i = 0; i < numReplicasTried - 1; i++) { - int slowBookieIndex = writeSet.get(i); - BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); - clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + // Mark replicas tried before the first successful response as "slow". + for (BookieId slowBookie : slowBookies) { + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, eId); } } return completed; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java index 2b7ebaaaf6a..87d2544d40e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java @@ -189,6 +189,12 @@ private synchronized void translateAndSetFirstError(int rc) { * read result code */ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + // Late error response after this request was already completed (e.g. by a + // faster bookie's success): writeSet/orderedEnsemble have been recycled, so + // do not enter logic that would access them. + if (isComplete()) { + return; + } translateAndSetFirstError(rc); if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) { @@ -398,6 +404,9 @@ synchronized BookieId sendNextRead() { @Override synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + if (isComplete()) { + return; + } super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); int replica = getReplicaIndex(bookieIndex); @@ -422,15 +431,22 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin @Override boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) { + if (isComplete()) { + return false; + } + // Snapshot the slow-bookie addresses BEFORE super.complete() recycles + // orderedEnsemble (see issue #4680). The loop runs for every replica tried + // (including the successful one), so numReplicasTried is at least 1 here. + final int numReplicasTried = getNextReplicaIndexToReadFrom(); + final BookieId[] slowBookies = new BookieId[numReplicasTried]; + for (int i = 0; i < slowBookies.length; i++) { + slowBookies[i] = ensemble.get(orderedEnsemble.get(i)); + } + boolean completed = super.complete(bookieIndex, host, buffer, entryId); if (completed) { - int numReplicasTried = getNextReplicaIndexToReadFrom(); - // Check if any speculative reads were issued and mark any bookies before the - // first speculative read as slow - for (int i = 0; i < numReplicasTried; i++) { - int slowBookieIndex = orderedEnsemble.get(i); - BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); - clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId); + for (BookieId slowBookie : slowBookies) { + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, entryId); } } return completed; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java new file mode 100644 index 00000000000..2ff3801529c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java @@ -0,0 +1,382 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; + +/** + * Regression test for issue #4680. + * + *
{@code SequenceReadRequest.complete()} delegates to + * {@code SingleLedgerEntryRequest.complete()}, which recycles {@code writeSet}, + * and then the subclass accesses {@code writeSet.get(i)} to register slow + * bookies. Because {@code WriteSetImpl} is pooled via Netty's {@code Recycler}, + * this is a use-after-recycle: the same instance can be handed to another + * read and resized/repopulated, causing either {@code IndexOutOfBoundsException} + * or, worse, silently registering the wrong bookie as slow. + * + *
We inject a tracking {@link DistributionSchedule.WriteSet} via a custom
+ * placement policy and assert that no method on the {@code WriteSet} is
+ * invoked after {@code recycle()} during a sequence read that retries.
+ */
+public class TestSequenceReadWriteSetRecycle extends BookKeeperClusterTestCase {
+
+ private static final DigestType DIGEST = DigestType.CRC32;
+ private static final int SPEC_TIMEOUT_MS = 500;
+
+ public TestSequenceReadWriteSetRecycle() {
+ super(4); // matches the issue scenario: ensemble = 4
+ }
+
+ @Test
+ public void completeMustNotAccessWriteSetAfterRecycle() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+ .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+ .setReorderReadSequenceEnabled(true)
+ .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS)
+ .setReadTimeout(60);
+
+ try (BookKeeperTestClient bk =
+ new BookKeeperTestClient(conf, new TestStatsProvider())) {
+ // ensemble = 4, writeQuorum = 2, ackQuorum = 2: the scenario from #4680.
+ LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+ for (int i = 0; i < 5; i++) {
+ writer.addEntry(("entry-" + i).getBytes());
+ }
+ long ledgerId = writer.getId();
+ writer.close();
+
+ try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) {
+ List Reproduce by giving the slow bookie a read timeout shorter than the
+ * sleep window so the in-flight request fails after the speculative read
+ * has already completed the entry.
+ */
+ @Test
+ public void logErrorAndReattemptReadMustNotAccessWriteSetAfterRecycle() throws Exception {
+ ClientConfiguration conf = new ClientConfiguration()
+ .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+ .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+ .setReorderReadSequenceEnabled(true)
+ .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS)
+ .setReadEntryTimeout(1); // 1s, so the slow bookie's request times out as an error
+
+ try (BookKeeperTestClient bk =
+ new BookKeeperTestClient(conf, new TestStatsProvider())) {
+ LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+ for (int i = 0; i < 5; i++) {
+ writer.addEntry(("entry-" + i).getBytes());
+ }
+ long ledgerId = writer.getId();
+ writer.close();
+
+ try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) {
+ List