diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 79cfa41bfa4..6771adb8382 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -386,6 +386,12 @@ synchronized BookieId sendNextRead() { @Override synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + // A late error response from a bookie whose request was already superseded by + // a faster (e.g. speculative) read must not flow through the reattempt logic: + // writeSet has been recycled at the moment this entry transitioned to complete. + if (isComplete()) { + return; + } super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); int replica = writeSet.indexOf(bookieIndex); @@ -412,15 +418,29 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin @Override boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { + if (isComplete()) { + return false; + } + // Common case: the very first replica responded successfully; no + // speculative retry happened, so there are no slow bookies to mark. + // Skip the snapshot allocation entirely. + final int numReplicasTried = getNextReplicaIndexToReadFrom(); + if (numReplicasTried <= 1) { + return super.complete(bookieIndex, host, buffer); + } + // Speculative retry happened: snapshot the addresses of the replicas tried + // before this one BEFORE calling super.complete(), which recycles writeSet + // (see issue #4680). The WriteSet keeps its normal pooled lifecycle. + final BookieId[] slowBookies = new BookieId[numReplicasTried - 1]; + for (int i = 0; i < slowBookies.length; i++) { + slowBookies[i] = ensemble.get(writeSet.get(i)); + } + boolean completed = super.complete(bookieIndex, host, buffer); if (completed) { - int numReplicasTried = getNextReplicaIndexToReadFrom(); - // Check if any speculative reads were issued and mark any slow bookies before - // the first successful speculative read as "slow" - for (int i = 0; i < numReplicasTried - 1; i++) { - int slowBookieIndex = writeSet.get(i); - BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); - clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + // Mark replicas tried before the first successful response as "slow". + for (BookieId slowBookie : slowBookies) { + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, eId); } } return completed; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java index 2b7ebaaaf6a..87d2544d40e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java @@ -189,6 +189,12 @@ private synchronized void translateAndSetFirstError(int rc) { * read result code */ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + // Late error response after this request was already completed (e.g. by a + // faster bookie's success): writeSet/orderedEnsemble have been recycled, so + // do not enter logic that would access them. + if (isComplete()) { + return; + } translateAndSetFirstError(rc); if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) { @@ -398,6 +404,9 @@ synchronized BookieId sendNextRead() { @Override synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + if (isComplete()) { + return; + } super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); int replica = getReplicaIndex(bookieIndex); @@ -422,15 +431,22 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin @Override boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) { + if (isComplete()) { + return false; + } + // Snapshot the slow-bookie addresses BEFORE super.complete() recycles + // orderedEnsemble (see issue #4680). The loop runs for every replica tried + // (including the successful one), so numReplicasTried is at least 1 here. + final int numReplicasTried = getNextReplicaIndexToReadFrom(); + final BookieId[] slowBookies = new BookieId[numReplicasTried]; + for (int i = 0; i < slowBookies.length; i++) { + slowBookies[i] = ensemble.get(orderedEnsemble.get(i)); + } + boolean completed = super.complete(bookieIndex, host, buffer, entryId); if (completed) { - int numReplicasTried = getNextReplicaIndexToReadFrom(); - // Check if any speculative reads were issued and mark any bookies before the - // first speculative read as slow - for (int i = 0; i < numReplicasTried; i++) { - int slowBookieIndex = orderedEnsemble.get(i); - BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); - clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId); + for (BookieId slowBookie : slowBookies) { + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, entryId); } } return completed; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java new file mode 100644 index 00000000000..2ff3801529c --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java @@ -0,0 +1,382 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; + +/** + * Regression test for issue #4680. + * + *

{@code SequenceReadRequest.complete()} delegates to + * {@code SingleLedgerEntryRequest.complete()}, which recycles {@code writeSet}, + * and then the subclass accesses {@code writeSet.get(i)} to register slow + * bookies. Because {@code WriteSetImpl} is pooled via Netty's {@code Recycler}, + * this is a use-after-recycle: the same instance can be handed to another + * read and resized/repopulated, causing either {@code IndexOutOfBoundsException} + * or, worse, silently registering the wrong bookie as slow. + * + *

We inject a tracking {@link DistributionSchedule.WriteSet} via a custom + * placement policy and assert that no method on the {@code WriteSet} is + * invoked after {@code recycle()} during a sequence read that retries. + */ +public class TestSequenceReadWriteSetRecycle extends BookKeeperClusterTestCase { + + private static final DigestType DIGEST = DigestType.CRC32; + private static final int SPEC_TIMEOUT_MS = 500; + + public TestSequenceReadWriteSetRecycle() { + super(4); // matches the issue scenario: ensemble = 4 + } + + @Test + public void completeMustNotAccessWriteSetAfterRecycle() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()) + .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS) + .setReadTimeout(60); + + try (BookKeeperTestClient bk = + new BookKeeperTestClient(conf, new TestStatsProvider())) { + // ensemble = 4, writeQuorum = 2, ackQuorum = 2: the scenario from #4680. + LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd()); + for (int i = 0; i < 5; i++) { + writer.addEntry(("entry-" + i).getBytes()); + } + long ledgerId = writer.getId(); + writer.close(); + + try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) { + List ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L); + + // For entry 1 the round-robin write set is [1, 2]. Sleeping ensemble[1] + // forces SequenceReadRequest to speculatively retry to ensemble[2], + // which exercises the slow-bookie registration loop in complete(). + BookieId firstReplica = ensemble.get(1); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(firstReplica, sleepLatch); + try { + assertTrue("read must succeed via speculative retry", + reader.readEntries(1, 1).hasMoreElements()); + sleepLatch.countDown(); + Thread.sleep(1000); + } finally { + sleepLatch.countDown(); + } + + assertNoViolation(bk); + } + } + } + + /** + * After {@code entry.complete()} succeeds (via a speculative retry) and + * recycles {@code writeSet}, a late error response from the originally + * slow bookie still flows through + * {@code SequenceReadRequest.logErrorAndReattemptRead}, which calls + * {@code writeSet.indexOf(bookieIndex)}, another use-after-recycle. + * + *

Reproduce by giving the slow bookie a read timeout shorter than the + * sleep window so the in-flight request fails after the speculative read + * has already completed the entry. + */ + @Test + public void logErrorAndReattemptReadMustNotAccessWriteSetAfterRecycle() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()) + .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS) + .setReadEntryTimeout(1); // 1s, so the slow bookie's request times out as an error + + try (BookKeeperTestClient bk = + new BookKeeperTestClient(conf, new TestStatsProvider())) { + LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd()); + for (int i = 0; i < 5; i++) { + writer.addEntry(("entry-" + i).getBytes()); + } + long ledgerId = writer.getId(); + writer.close(); + + try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) { + List ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L); + + // Sleep ensemble[1] for longer than readEntryTimeout so its request + // returns a timeout error after the speculative retry has completed. + BookieId slowBookie = ensemble.get(1); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(slowBookie, sleepLatch); + try { + // entry 1: writeSet=[1,2]. Bookie 1 sleeps; spec retries to bookie 2; + // bookie 2 responds OK, then complete() recycles writeSet. + assertTrue("read must succeed via speculative retry", + reader.readEntries(1, 1).hasMoreElements()); + + // Wait long enough for the slow bookie's read to time out and the + // late error response to flow through logErrorAndReattemptRead. + Thread.sleep(2500); + + assertNoViolation(bk); + } finally { + sleepLatch.countDown(); + } + } + } + } + + @Test + public void readLacCompleteMustNotAccessOrderedEnsembleAfterRecycle() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()) + .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setFirstSpeculativeReadLACTimeout(SPEC_TIMEOUT_MS) + .setReadTimeout(60); + + try (BookKeeperTestClient bk = + new BookKeeperTestClient(conf, new TestStatsProvider())) { + LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd()); + writer.addEntry("entry-0".getBytes()); + long ledgerId = writer.getId(); + + try (LedgerHandle reader = bk.openLedgerNoRecovery(ledgerId, DIGEST, passwd())) { + writer.addEntry("entry-1".getBytes()); + writer.addEntry("entry-2".getBytes()); + + List ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L); + BookieId firstReplica = ensemble.get(1); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(firstReplica, sleepLatch); + try { + ReadLacResult result = new ReadLacResult(); + reader.asyncReadLastConfirmedAndEntry(1, 60000, false, result, null); + result.await(); + assertTrue("readLAC must succeed via speculative retry", + result.rc == BKException.Code.OK && result.entry != null); + sleepLatch.countDown(); + Thread.sleep(1000); + } finally { + sleepLatch.countDown(); + } + + assertNoViolation(bk); + } finally { + writer.close(); + } + } + } + + @Test + public void readLacLateErrorMustNotAccessOrderedEnsembleAfterRecycle() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()) + .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setFirstSpeculativeReadLACTimeout(SPEC_TIMEOUT_MS) + .setReadEntryTimeout(1); + + try (BookKeeperTestClient bk = + new BookKeeperTestClient(conf, new TestStatsProvider())) { + LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd()); + writer.addEntry("entry-0".getBytes()); + long ledgerId = writer.getId(); + + try (LedgerHandle reader = bk.openLedgerNoRecovery(ledgerId, DIGEST, passwd())) { + writer.addEntry("entry-1".getBytes()); + writer.addEntry("entry-2".getBytes()); + + List ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L); + BookieId slowBookie = ensemble.get(1); + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(slowBookie, sleepLatch); + try { + ReadLacResult result = new ReadLacResult(); + reader.asyncReadLastConfirmedAndEntry(1, 60000, false, result, null); + result.await(); + assertTrue("readLAC must succeed via speculative retry", + result.rc == BKException.Code.OK && result.entry != null); + + Thread.sleep(2500); + + assertNoViolation(bk); + } finally { + sleepLatch.countDown(); + } + } finally { + writer.close(); + } + } + } + + private static void assertNoViolation(BookKeeperTestClient bk) { + TrackingPlacementPolicy policy = (TrackingPlacementPolicy) bk.getPlacementPolicy(); + String violation = policy.violation.get(); + assertNull("WriteSet method '" + violation + "' was invoked after recycle()", violation); + } + + private static byte[] passwd() { + return "pwd".getBytes(StandardCharsets.UTF_8); + } + + static final class ReadLacResult implements AsyncCallback.ReadLastConfirmedAndEntryCallback { + private final CountDownLatch doneLatch = new CountDownLatch(1); + private int rc = BKException.Code.UnexpectedConditionException; + private LedgerEntry entry; + + @Override + public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) { + this.rc = rc; + this.entry = entry; + doneLatch.countDown(); + } + + void await() throws InterruptedException { + assertTrue("readLAC callback did not complete", doneLatch.await(10, TimeUnit.SECONDS)); + } + } + + /** + * Placement policy whose {@code reorderReadSequence} wraps the result with a + * {@link RecycleTrackingWriteSet}. This is the only injection point for a + * custom {@code WriteSet} on the read path that does not require changes to + * production code. + */ + public static class TrackingPlacementPolicy extends RackawareEnsemblePlacementPolicy { + private final AtomicReference violation = new AtomicReference<>(); + + @Override + public DistributionSchedule.WriteSet reorderReadSequence( + List ensemble, + BookiesHealthInfo bookiesHealthInfo, + DistributionSchedule.WriteSet writeSet) { + DistributionSchedule.WriteSet reordered = + super.reorderReadSequence(ensemble, bookiesHealthInfo, writeSet); + return new RecycleTrackingWriteSet(reordered, violation); + } + + @Override + public DistributionSchedule.WriteSet reorderReadLACSequence( + List ensemble, + BookiesHealthInfo bookiesHealthInfo, + DistributionSchedule.WriteSet writeSet) { + DistributionSchedule.WriteSet reordered = + super.reorderReadLACSequence(ensemble, bookiesHealthInfo, writeSet); + return new RecycleTrackingWriteSet(reordered, violation); + } + } + + /** + * Wrapper that records the first method invoked after {@code recycle()}. + * The wrapper deliberately does not propagate {@code recycle()} + * to the delegate: keeping the delegate out of the recycler pool means + * any post-recycle calls still observe consistent data, so a caller bug + * shows up as a recorded violation rather than a flaky exception. + */ + static final class RecycleTrackingWriteSet implements DistributionSchedule.WriteSet { + private final DistributionSchedule.WriteSet delegate; + private final AtomicReference violation; + private volatile boolean recycled; + + RecycleTrackingWriteSet(DistributionSchedule.WriteSet delegate, AtomicReference violation) { + this.delegate = delegate; + this.violation = violation; + } + + private void check(String op) { + if (recycled) { + violation.compareAndSet(null, op); + } + } + + @Override + public int size() { + check("size"); + return delegate.size(); + } + + @Override + public boolean contains(int i) { + check("contains"); + return delegate.contains(i); + } + + @Override + public int get(int i) { + check("get(" + i + ")"); + return delegate.get(i); + } + + @Override + public int set(int i, int idx) { + check("set"); + return delegate.set(i, idx); + } + + @Override + public int indexOf(int idx) { + check("indexOf"); + return delegate.indexOf(idx); + } + + @Override + public void sort() { + check("sort"); + delegate.sort(); + } + + @Override + public void addMissingIndices(int max) { + check("addMissingIndices"); + delegate.addMissingIndices(max); + } + + @Override + public void moveAndShift(int from, int to) { + check("moveAndShift"); + delegate.moveAndShift(from, to); + } + + @Override + public DistributionSchedule.WriteSet copy() { + check("copy"); + return delegate.copy(); + } + + @Override public void recycle() { + recycled = true; + // intentionally not delegating recycle(); see class javadoc. + } + } +}