From ef9b810f67101ec73068a4cca6d4d303ee56b34e Mon Sep 17 00:00:00 2001
From: void-ptr974
Date: Sun, 10 May 2026 19:55:32 +0800
Subject: [PATCH] Fix WriteSet use-after-recycle in sequence reads

Sequence read completion recycled the WriteSet before registering slow
bookies. When a speculative read completed the entry, the subclass could
still access the recycled WriteSet. Late error callbacks could also enter
reattempt logic and call indexOf() after the request had already completed.

Snapshot slow bookie addresses before delegating to the shared complete
path, and ignore late errors after sequence read requests have completed.
Apply the same guard to ReadLAC sequence reads, where orderedEnsemble is
also recycled on completion.

Add regression coverage for normal sequence reads and ReadLAC covering
both completion and late-error paths.
---
 .../bookkeeper/client/PendingReadOp.java      |  34 +-
 .../client/ReadLastConfirmedAndEntryOp.java   |  30 +-
 .../TestSequenceReadWriteSetRecycle.java      | 382 ++++++++++++++++++
 3 files changed, 432 insertions(+), 14 deletions(-)
 create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index 79cfa41bfa4..6771adb8382 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -386,6 +386,12 @@ synchronized BookieId sendNextRead() {
 
         @Override
         synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
+            // A late error response from a bookie whose request was already superseded by
+            // a faster (e.g. speculative) read must not flow through the reattempt logic:
+            // writeSet has been recycled at the moment this entry transitioned to complete.
+            if (isComplete()) {
+                return;
+            }
             super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
 
             int replica = writeSet.indexOf(bookieIndex);
@@ -412,15 +418,29 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin
 
         @Override
         boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) {
+            if (isComplete()) {
+                return false;
+            }
+            // Common case: the very first replica responded successfully; no
+            // speculative retry happened, so there are no slow bookies to mark.
+            // Skip the snapshot allocation entirely.
+            final int numReplicasTried = getNextReplicaIndexToReadFrom();
+            if (numReplicasTried <= 1) {
+                return super.complete(bookieIndex, host, buffer);
+            }
+            // Speculative retry happened: snapshot the addresses of the replicas tried
+            // before this one BEFORE calling super.complete(), which recycles writeSet
+            // (see issue #4680). The WriteSet keeps its normal pooled lifecycle.
+            final BookieId[] slowBookies = new BookieId[numReplicasTried - 1];
+            for (int i = 0; i < slowBookies.length; i++) {
+                slowBookies[i] = ensemble.get(writeSet.get(i));
+            }
+
             boolean completed = super.complete(bookieIndex, host, buffer);
             if (completed) {
-                int numReplicasTried = getNextReplicaIndexToReadFrom();
-                // Check if any speculative reads were issued and mark any slow bookies before
-                // the first successful speculative read as "slow"
-                for (int i = 0; i < numReplicasTried - 1; i++) {
-                    int slowBookieIndex = writeSet.get(i);
-                    BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex);
-                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId);
+                // Mark replicas tried before the first successful response as "slow".
+                for (BookieId slowBookie : slowBookies) {
+                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, eId);
                 }
             }
             return completed;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
index 2b7ebaaaf6a..87d2544d40e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOp.java
@@ -189,6 +189,12 @@ private synchronized void translateAndSetFirstError(int rc) {
      * read result code
      */
     synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
+        // Late error response after this request was already completed (e.g. by a
+        // faster bookie's success): writeSet/orderedEnsemble have been recycled, so
+        // do not enter logic that would access them.
+        if (isComplete()) {
+            return;
+        }
         translateAndSetFirstError(rc);
 
         if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) {
@@ -398,6 +404,9 @@ synchronized BookieId sendNextRead() {
 
         @Override
         synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
+            if (isComplete()) {
+                return;
+            }
             super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);
 
             int replica = getReplicaIndex(bookieIndex);
@@ -422,15 +431,22 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin
 
         @Override
         boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) {
+            if (isComplete()) {
+                return false;
+            }
+            // Snapshot the slow-bookie addresses BEFORE super.complete() recycles
+            // orderedEnsemble (see issue #4680). The loop runs for every replica tried
+            // (including the successful one), so numReplicasTried is at least 1 here.
+            final int numReplicasTried = getNextReplicaIndexToReadFrom();
+            final BookieId[] slowBookies = new BookieId[numReplicasTried];
+            for (int i = 0; i < slowBookies.length; i++) {
+                slowBookies[i] = ensemble.get(orderedEnsemble.get(i));
+            }
+
             boolean completed = super.complete(bookieIndex, host, buffer, entryId);
             if (completed) {
-                int numReplicasTried = getNextReplicaIndexToReadFrom();
-                // Check if any speculative reads were issued and mark any bookies before the
-                // first speculative read as slow
-                for (int i = 0; i < numReplicasTried; i++) {
-                    int slowBookieIndex = orderedEnsemble.get(i);
-                    BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex);
-                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
+                for (BookieId slowBookie : slowBookies) {
+                    clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, entryId);
                 }
             }
             return completed;
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java
new file mode 100644
index 00000000000..2ff3801529c
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSequenceReadWriteSetRecycle.java
@@ -0,0 +1,382 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.client;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.bookkeeper.client.BookKeeper.DigestType;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieId;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestStatsProvider;
+import org.junit.Test;
+
+/**
+ * Regression test for issue #4680.
+ *
+ * <p>{@code SequenceReadRequest.complete()} delegates to
+ * {@code SingleLedgerEntryRequest.complete()}, which recycles {@code writeSet},
+ * and then the subclass accesses {@code writeSet.get(i)} to register slow
+ * bookies. Because {@code WriteSetImpl} is pooled via Netty's {@code Recycler},
+ * this is a use-after-recycle: the same instance can be handed to another
+ * read and resized/repopulated, causing either {@code IndexOutOfBoundsException}
+ * or, worse, silently registering the wrong bookie as slow.
+ *
+ * <p>We inject a tracking {@link DistributionSchedule.WriteSet} via a custom
+ * placement policy and assert that no method on the {@code WriteSet} is
+ * invoked after {@code recycle()} during a sequence read that retries.
+ */
+public class TestSequenceReadWriteSetRecycle extends BookKeeperClusterTestCase {
+
+    private static final DigestType DIGEST = DigestType.CRC32;
+    private static final int SPEC_TIMEOUT_MS = 500;
+
+    public TestSequenceReadWriteSetRecycle() {
+        super(4); // matches the issue scenario: ensemble = 4
+    }
+
+    @Test
+    public void completeMustNotAccessWriteSetAfterRecycle() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+                .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+                .setReorderReadSequenceEnabled(true)
+                .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS)
+                .setReadTimeout(60);
+
+        try (BookKeeperTestClient bk =
+                new BookKeeperTestClient(conf, new TestStatsProvider())) {
+            // ensemble = 4, writeQuorum = 2, ackQuorum = 2: the scenario from #4680.
+            LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+            for (int i = 0; i < 5; i++) {
+                writer.addEntry(("entry-" + i).getBytes());
+            }
+            long ledgerId = writer.getId();
+            writer.close();
+
+            try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) {
+                List<BookieId> ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L);
+
+                // For entry 1 the round-robin write set is [1, 2]. Sleeping ensemble[1]
+                // forces SequenceReadRequest to speculatively retry to ensemble[2],
+                // which exercises the slow-bookie registration loop in complete().
+                BookieId firstReplica = ensemble.get(1);
+                CountDownLatch sleepLatch = new CountDownLatch(1);
+                sleepBookie(firstReplica, sleepLatch);
+                try {
+                    assertTrue("read must succeed via speculative retry",
+                            reader.readEntries(1, 1).hasMoreElements());
+                    sleepLatch.countDown();
+                    Thread.sleep(1000);
+                } finally {
+                    sleepLatch.countDown();
+                }
+
+                assertNoViolation(bk);
+            }
+        }
+    }
+
+    /**
+     * After {@code entry.complete()} succeeds (via a speculative retry) and
+     * recycles {@code writeSet}, a late error response from the originally
+     * slow bookie still flows through
+     * {@code SequenceReadRequest.logErrorAndReattemptRead}, which calls
+     * {@code writeSet.indexOf(bookieIndex)}, another use-after-recycle.
+     *
+     * <p>Reproduce by giving the slow bookie a read timeout shorter than the
+     * sleep window so the in-flight request fails after the speculative read
+     * has already completed the entry.
+     */
+    @Test
+    public void logErrorAndReattemptReadMustNotAccessWriteSetAfterRecycle() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+                .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+                .setReorderReadSequenceEnabled(true)
+                .setSpeculativeReadTimeout(SPEC_TIMEOUT_MS)
+                .setReadEntryTimeout(1); // 1s, so the slow bookie's request times out as an error
+
+        try (BookKeeperTestClient bk =
+                new BookKeeperTestClient(conf, new TestStatsProvider())) {
+            LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+            for (int i = 0; i < 5; i++) {
+                writer.addEntry(("entry-" + i).getBytes());
+            }
+            long ledgerId = writer.getId();
+            writer.close();
+
+            try (LedgerHandle reader = bk.openLedger(ledgerId, DIGEST, passwd())) {
+                List<BookieId> ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L);
+
+                // Sleep ensemble[1] for longer than readEntryTimeout so its request
+                // returns a timeout error after the speculative retry has completed.
+                BookieId slowBookie = ensemble.get(1);
+                CountDownLatch sleepLatch = new CountDownLatch(1);
+                sleepBookie(slowBookie, sleepLatch);
+                try {
+                    // entry 1: writeSet=[1,2]. Bookie 1 sleeps; spec retries to bookie 2;
+                    // bookie 2 responds OK, then complete() recycles writeSet.
+                    assertTrue("read must succeed via speculative retry",
+                            reader.readEntries(1, 1).hasMoreElements());
+
+                    // Wait long enough for the slow bookie's read to time out and the
+                    // late error response to flow through logErrorAndReattemptRead.
+                    Thread.sleep(2500);
+
+                    assertNoViolation(bk);
+                } finally {
+                    sleepLatch.countDown();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void readLacCompleteMustNotAccessOrderedEnsembleAfterRecycle() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+                .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+                .setReorderReadSequenceEnabled(true)
+                .setFirstSpeculativeReadLACTimeout(SPEC_TIMEOUT_MS)
+                .setReadTimeout(60);
+
+        try (BookKeeperTestClient bk =
+                new BookKeeperTestClient(conf, new TestStatsProvider())) {
+            LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+            writer.addEntry("entry-0".getBytes());
+            long ledgerId = writer.getId();
+
+            try (LedgerHandle reader = bk.openLedgerNoRecovery(ledgerId, DIGEST, passwd())) {
+                writer.addEntry("entry-1".getBytes());
+                writer.addEntry("entry-2".getBytes());
+
+                List<BookieId> ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L);
+                BookieId firstReplica = ensemble.get(1);
+                CountDownLatch sleepLatch = new CountDownLatch(1);
+                sleepBookie(firstReplica, sleepLatch);
+                try {
+                    ReadLacResult result = new ReadLacResult();
+                    reader.asyncReadLastConfirmedAndEntry(1, 60000, false, result, null);
+                    result.await();
+                    assertTrue("readLAC must succeed via speculative retry",
+                            result.rc == BKException.Code.OK && result.entry != null);
+                    sleepLatch.countDown();
+                    Thread.sleep(1000);
+                } finally {
+                    sleepLatch.countDown();
+                }
+
+                assertNoViolation(bk);
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    @Test
+    public void readLacLateErrorMustNotAccessOrderedEnsembleAfterRecycle() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration()
+                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
+                .setEnsemblePlacementPolicy(TrackingPlacementPolicy.class)
+                .setReorderReadSequenceEnabled(true)
+                .setFirstSpeculativeReadLACTimeout(SPEC_TIMEOUT_MS)
+                .setReadEntryTimeout(1);
+
+        try (BookKeeperTestClient bk =
+                new BookKeeperTestClient(conf, new TestStatsProvider())) {
+            LedgerHandle writer = bk.createLedger(4, 2, 2, DIGEST, passwd());
+            writer.addEntry("entry-0".getBytes());
+            long ledgerId = writer.getId();
+
+            try (LedgerHandle reader = bk.openLedgerNoRecovery(ledgerId, DIGEST, passwd())) {
+                writer.addEntry("entry-1".getBytes());
+                writer.addEntry("entry-2".getBytes());
+
+                List<BookieId> ensemble = reader.getLedgerMetadata().getAllEnsembles().get(0L);
+                BookieId slowBookie = ensemble.get(1);
+                CountDownLatch sleepLatch = new CountDownLatch(1);
+                sleepBookie(slowBookie, sleepLatch);
+                try {
+                    ReadLacResult result = new ReadLacResult();
+                    reader.asyncReadLastConfirmedAndEntry(1, 60000, false, result, null);
+                    result.await();
+                    assertTrue("readLAC must succeed via speculative retry",
+                            result.rc == BKException.Code.OK && result.entry != null);
+
+                    Thread.sleep(2500);
+
+                    assertNoViolation(bk);
+                } finally {
+                    sleepLatch.countDown();
+                }
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    private static void assertNoViolation(BookKeeperTestClient bk) {
+        TrackingPlacementPolicy policy = (TrackingPlacementPolicy) bk.getPlacementPolicy();
+        String violation = policy.violation.get();
+        assertNull("WriteSet method '" + violation + "' was invoked after recycle()", violation);
+    }
+
+    private static byte[] passwd() {
+        return "pwd".getBytes(StandardCharsets.UTF_8);
+    }
+
+    static final class ReadLacResult implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
+        private final CountDownLatch doneLatch = new CountDownLatch(1);
+        private int rc = BKException.Code.UnexpectedConditionException;
+        private LedgerEntry entry;
+
+        @Override
+        public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
+            this.rc = rc;
+            this.entry = entry;
+            doneLatch.countDown();
+        }
+
+        void await() throws InterruptedException {
+            assertTrue("readLAC callback did not complete", doneLatch.await(10, TimeUnit.SECONDS));
+        }
+    }
+
+    /**
+     * Placement policy whose {@code reorderReadSequence} wraps the result with a
+     * {@link RecycleTrackingWriteSet}. This is the only injection point for a
+     * custom {@code WriteSet} on the read path that does not require changes to
+     * production code.
+     */
+    public static class TrackingPlacementPolicy extends RackawareEnsemblePlacementPolicy {
+        private final AtomicReference<String> violation = new AtomicReference<>();
+
+        @Override
+        public DistributionSchedule.WriteSet reorderReadSequence(
+                List<BookieId> ensemble,
+                BookiesHealthInfo bookiesHealthInfo,
+                DistributionSchedule.WriteSet writeSet) {
+            DistributionSchedule.WriteSet reordered =
+                    super.reorderReadSequence(ensemble, bookiesHealthInfo, writeSet);
+            return new RecycleTrackingWriteSet(reordered, violation);
+        }
+
+        @Override
+        public DistributionSchedule.WriteSet reorderReadLACSequence(
+                List<BookieId> ensemble,
+                BookiesHealthInfo bookiesHealthInfo,
+                DistributionSchedule.WriteSet writeSet) {
+            DistributionSchedule.WriteSet reordered =
+                    super.reorderReadLACSequence(ensemble, bookiesHealthInfo, writeSet);
+            return new RecycleTrackingWriteSet(reordered, violation);
+        }
+    }
+
+    /**
+     * Wrapper that records the first method invoked after {@code recycle()}.
+     * The wrapper deliberately does not propagate {@code recycle()}
+     * to the delegate: keeping the delegate out of the recycler pool means
+     * any post-recycle calls still observe consistent data, so a caller bug
+     * shows up as a recorded violation rather than a flaky exception.
+     */
+    static final class RecycleTrackingWriteSet implements DistributionSchedule.WriteSet {
+        private final DistributionSchedule.WriteSet delegate;
+        private final AtomicReference<String> violation;
+        private volatile boolean recycled;
+
+        RecycleTrackingWriteSet(DistributionSchedule.WriteSet delegate, AtomicReference<String> violation) {
+            this.delegate = delegate;
+            this.violation = violation;
+        }
+
+        private void check(String op) {
+            if (recycled) {
+                violation.compareAndSet(null, op);
+            }
+        }
+
+        @Override
+        public int size() {
+            check("size");
+            return delegate.size();
+        }
+
+        @Override
+        public boolean contains(int i) {
+            check("contains");
+            return delegate.contains(i);
+        }
+
+        @Override
+        public int get(int i) {
+            check("get(" + i + ")");
+            return delegate.get(i);
+        }
+
+        @Override
+        public int set(int i, int idx) {
+            check("set");
+            return delegate.set(i, idx);
+        }
+
+        @Override
+        public int indexOf(int idx) {
+            check("indexOf");
+            return delegate.indexOf(idx);
+        }
+
+        @Override
+        public void sort() {
+            check("sort");
+            delegate.sort();
+        }
+
+        @Override
+        public void addMissingIndices(int max) {
+            check("addMissingIndices");
+            delegate.addMissingIndices(max);
+        }
+
+        @Override
+        public void moveAndShift(int from, int to) {
+            check("moveAndShift");
+            delegate.moveAndShift(from, to);
+        }
+
+        @Override
+        public DistributionSchedule.WriteSet copy() {
+            check("copy");
+            return delegate.copy();
+        }
+
+        @Override
+        public void recycle() {
+            recycled = true;
+            // intentionally not delegating recycle(); see class javadoc.
+        }
+    }
+}