Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,12 @@ synchronized BookieId sendNextRead() {

@Override
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
// A late error response from a bookie whose request was already superseded by
// a faster (e.g. speculative) read must not flow through the reattempt logic:
// writeSet was already recycled when this entry transitioned to complete.
if (isComplete()) {
return;
}
super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);

int replica = writeSet.indexOf(bookieIndex);
Expand All @@ -412,15 +418,29 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin

@Override
boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) {
if (isComplete()) {
return false;
}
// Common case: the very first replica responded successfully; no
// speculative retry happened, so there are no slow bookies to mark.
// Skip the snapshot allocation entirely.
final int numReplicasTried = getNextReplicaIndexToReadFrom();
if (numReplicasTried <= 1) {
return super.complete(bookieIndex, host, buffer);
}
// Speculative retry happened: snapshot the addresses of the replicas tried
// before this one BEFORE calling super.complete(), which recycles writeSet
// (see issue #4680). The WriteSet keeps its normal pooled lifecycle.
final BookieId[] slowBookies = new BookieId[numReplicasTried - 1];
for (int i = 0; i < slowBookies.length; i++) {
slowBookies[i] = ensemble.get(writeSet.get(i));
}

boolean completed = super.complete(bookieIndex, host, buffer);
if (completed) {
int numReplicasTried = getNextReplicaIndexToReadFrom();
// Check if any speculative reads were issued and mark any slow bookies before
// the first successful speculative read as "slow"
for (int i = 0; i < numReplicasTried - 1; i++) {
int slowBookieIndex = writeSet.get(i);
BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex);
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId);
// Mark replicas tried before the first successful response as "slow".
for (BookieId slowBookie : slowBookies) {
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, eId);
}
}
return completed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ private synchronized void translateAndSetFirstError(int rc) {
* read result code
*/
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
// Late error response after this request was already completed (e.g. by a
// faster bookie's success): writeSet/orderedEnsemble have been recycled, so
// do not enter logic that would access them.
if (isComplete()) {
return;
}
translateAndSetFirstError(rc);

if (BKException.Code.NoSuchEntryException == rc || BKException.Code.NoSuchLedgerExistsException == rc) {
Expand Down Expand Up @@ -398,6 +404,9 @@ synchronized BookieId sendNextRead() {

@Override
synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) {
if (isComplete()) {
return;
}
super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc);

int replica = getReplicaIndex(bookieIndex);
Expand All @@ -422,15 +431,22 @@ synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, Strin

@Override
boolean complete(int bookieIndex, BookieId host, ByteBuf buffer, long entryId) {
if (isComplete()) {
return false;
}
// Snapshot the slow-bookie addresses BEFORE super.complete() recycles
// orderedEnsemble (see issue #4680). The loop runs for every replica tried
// (including the successful one), so numReplicasTried is at least 1 here.
final int numReplicasTried = getNextReplicaIndexToReadFrom();
final BookieId[] slowBookies = new BookieId[numReplicasTried];
for (int i = 0; i < slowBookies.length; i++) {
slowBookies[i] = ensemble.get(orderedEnsemble.get(i));
}

boolean completed = super.complete(bookieIndex, host, buffer, entryId);
if (completed) {
int numReplicasTried = getNextReplicaIndexToReadFrom();
// Check if any speculative reads were issued and mark any bookies before the
// first speculative read as slow
for (int i = 0; i < numReplicasTried; i++) {
int slowBookieIndex = orderedEnsemble.get(i);
BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex);
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, entryId);
for (BookieId slowBookie : slowBookies) {
clientCtx.getPlacementPolicy().registerSlowBookie(slowBookie, entryId);
}
}
return completed;
Expand Down
Loading