Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,25 @@ default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, R
*/
long getNumberOfEntriesInBacklog(boolean isPrecise);

/**
* Return whether this cursor has non-deleted messages in backlog.
*
* @return true if there is at least one entry in backlog
*/
default boolean hasBacklog() {
return hasBacklog(true);
}

/**
* Return whether this cursor has non-deleted messages in backlog.
*
* @param isPrecise set to true to get a precise backlog check
* @return true if there is at least one entry in backlog
*/
default boolean hasBacklog(boolean isPrecise) {
return getNumberOfEntriesInBacklog(isPrecise) > 0;
}

/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
* trigger a ledger deletion, if all the other cursors are done too with the underlying ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1391,6 +1391,50 @@ public long getNumberOfEntriesInBacklog(boolean isPrecise) {
return backlog;
}

@Override
public boolean hasBacklog() {
Position markDeletePosition = this.markDeletePosition;
Position lastPosition = ledger.getLastPosition();
if (markDeletePosition == null || markDeletePosition.compareTo(lastPosition) >= 0) {
return false;
}

Position nextPosition = ledger.getNextValidPosition(markDeletePosition);
if (nextPosition.compareTo(lastPosition) > 0) {
return false;
}

lock.readLock().lock();
try {
while (nextPosition.compareTo(lastPosition) <= 0) {
Range<Position> deletedRange = individualDeletedMessages.rangeContaining(
nextPosition.getLedgerId(), nextPosition.getEntryId());
if (deletedRange == null) {
return true;
}

Position upperEndpoint = deletedRange.upperEndpoint();
if (upperEndpoint.compareTo(lastPosition) >= 0) {
return false;
}
nextPosition = ledger.getNextValidPosition(upperEndpoint);
}
return false;
} finally {
lock.readLock().unlock();
}
}

@Override
public boolean hasBacklog(boolean isPrecise) {
if (isPrecise) {
return hasBacklog();
}

long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
return backlog >= 0 ? backlog > 0 : hasBacklog();
}

public long getNumberOfEntriesInStorage() {
return ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -2339,6 +2340,282 @@ void deleteSingleMessageTwice(boolean useOpenRangeSet) throws Exception {
assertEquals(c1.getReadPosition(), p4.getNext());
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogAccountsForIndividuallyDeletedMessages(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(2));

ManagedCursor c1 = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));

assertTrue(c1.hasBacklog());
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p1);
c1.delete(p3);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p2);
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogReturnsFalseForEmptyAndCaughtUpCursor(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_empty_caught_up_" + useOpenRangeSet,
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursor c1 = ledger.openCursor("c1");

assertHasBacklogMatchesBacklogCounts(c1);

Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
assertHasBacklogMatchesBacklogCounts(c1);

c1.markDelete(p1);
assertHasBacklogMatchesBacklogCounts(c1);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogTracksEntriesAcrossLedgerRollovers(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_across_ledgers_" + useOpenRangeSet,
new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(1));

ManagedCursor c1 = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));

c1.markDelete(p1);
c1.delete(p3);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p2);
assertHasBacklogMatchesBacklogCounts(c1);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogAccountsForPartiallyDeletedBatchEntries(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_partial_batch_" + useOpenRangeSet,
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursor c1 = ledger.openCursor("c1");
Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));

deleteBatchIndex(c1, p1, 10, Lists.newArrayList(new IntRange().setStart(0).setEnd(8)));
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 1);

deleteBatchIndex(c1, p1, 10, Lists.newArrayList(new IntRange().setStart(9).setEnd(9)));
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogSkipsDeletedRangesThroughLastPosition(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_deleted_range_through_last_" + useOpenRangeSet,
new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(10));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));

c1.markDelete(p1);
// Exercise the hasBacklog() branch that skips a deleted range through the last position.
c1.getIndividuallyDeletedMessagesSet().addOpenClosed(
p1.getLedgerId(), p1.getEntryId(), p3.getLedgerId(), p3.getEntryId());

assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
assertFalse(c1.hasBacklog());
assertFalse(c1.hasBacklog(true));
assertTrue(c1.getIndividuallyDeletedMessagesSet().contains(p2.getLedgerId(), p2.getEntryId()));
assertTrue(c1.getIndividuallyDeletedMessagesSet().contains(p3.getLedgerId(), p3.getEntryId()));
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogFalseFallsBackToPreciseBacklogWhenCounterIsNegative(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_negative_counter_" + useOpenRangeSet,
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));

c1.messagesConsumedCounter = ((ManagedLedgerImpl) ledger).getEntriesAddedCounter() + 1;
assertEquals(c1.getNumberOfEntriesInBacklog(false), c1.getNumberOfEntriesInBacklog(true));
assertTrue(c1.hasBacklog(false));
assertHasBacklogMatchesPreciseBacklogCount(c1);

c1.markDelete(p2);
c1.messagesConsumedCounter = ((ManagedLedgerImpl) ledger).getEntriesAddedCounter() + 1;
assertEquals(c1.getNumberOfEntriesInBacklog(false), c1.getNumberOfEntriesInBacklog(true));
assertFalse(c1.hasBacklog(false));
assertHasBacklogMatchesPreciseBacklogCount(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogReturnsFalseWhenMarkDeletePositionIsNotInitialized(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_uninitialized_mark_delete_" + useOpenRangeSet,
new ManagedLedgerConfig().setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet));

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor("c1");
ledger.addEntry("entry-1".getBytes(Encoding));

c1.markDeletePosition = null;
assertFalse(c1.hasBacklog());
assertFalse(c1.hasBacklog(true));
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogMatchesBacklogCountsThroughMixedAcknowledgements(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_mixed_acks_" + useOpenRangeSet,
new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(2));

ManagedCursor c1 = ledger.openCursor("c1");
assertHasBacklogMatchesBacklogCounts(c1);

Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
Position p4 = ledger.addEntry("entry-4".getBytes(Encoding));
Position p5 = ledger.addEntry("entry-5".getBytes(Encoding));
Position p6 = ledger.addEntry("entry-6".getBytes(Encoding));
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p2);
c1.delete(p4);
c1.delete(p6);
assertHasBacklogMatchesBacklogCounts(c1);

c1.markDelete(p1);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p5);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p3);
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogMatchesBacklogCountsAfterResetCursor(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("has_backlog_reset_" + useOpenRangeSet,
new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(2));

ManagedCursor c1 = ledger.openCursor("c1");
Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
Position p4 = ledger.addEntry("entry-4".getBytes(Encoding));

c1.markDelete(p4);
assertHasBacklogMatchesBacklogCounts(c1);

c1.resetCursor(p2);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p3);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p2);
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p4);
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);

c1.resetCursor(p1);
assertHasBacklogMatchesBacklogCounts(c1);
}

@Test(timeOut = 20000, dataProvider = "useOpenRangeSet")
void hasBacklogMatchesBacklogCountsAfterCursorRecovery(boolean useOpenRangeSet) throws Exception {
String ledgerName = "has_backlog_recovery_" + useOpenRangeSet;
ManagedLedgerConfig config = new ManagedLedgerConfig()
.setUnackedRangesOpenCacheSetEnabled(useOpenRangeSet)
.setMaxEntriesPerLedger(2);
ManagedLedger ledger = factory.open(ledgerName, config);
ManagedCursor c1 = ledger.openCursor("c1");

Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
Position p4 = ledger.addEntry("entry-4".getBytes(Encoding));

c1.delete(p1);
c1.delete(p3);
assertHasBacklogMatchesBacklogCounts(c1);

ledger.close();
ledger = factory.open(ledgerName, config);
c1 = ledger.openCursor("c1");
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p2);
assertHasBacklogMatchesBacklogCounts(c1);

ledger.close();
ledger = factory.open(ledgerName, config);
c1 = ledger.openCursor("c1");
assertHasBacklogMatchesBacklogCounts(c1);

c1.delete(p4);
assertHasBacklogMatchesBacklogCounts(c1);
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}

private static void assertHasBacklogMatchesBacklogCounts(ManagedCursor cursor) {
long preciseBacklog = cursor.getNumberOfEntriesInBacklog(true);
boolean hasPreciseBacklog = preciseBacklog > 0;
assertEquals(cursor.hasBacklog(), hasPreciseBacklog,
"hasBacklog() must match getNumberOfEntriesInBacklog(true) > 0");
assertEquals(cursor.hasBacklog(true), hasPreciseBacklog,
"hasBacklog(true) must match getNumberOfEntriesInBacklog(true) > 0");

long impreciseBacklog = cursor.getNumberOfEntriesInBacklog(false);
assertEquals(cursor.hasBacklog(false), impreciseBacklog > 0,
"hasBacklog(false) must match getNumberOfEntriesInBacklog(false) > 0");
}

private static void assertHasBacklogMatchesPreciseBacklogCount(ManagedCursor cursor) {
boolean hasPreciseBacklog = cursor.getNumberOfEntriesInBacklog(true) > 0;
assertEquals(cursor.hasBacklog(), hasPreciseBacklog,
"hasBacklog() must match getNumberOfEntriesInBacklog(true) > 0");
assertEquals(cursor.hasBacklog(true), hasPreciseBacklog,
"hasBacklog(true) must match getNumberOfEntriesInBacklog(true) > 0");
}

@Test
void hasBacklogDefaultMethodsDelegateToBacklogCount() {
ManagedCursor cursor = mock(ManagedCursor.class);
doCallRealMethod().when(cursor).hasBacklog();
doCallRealMethod().when(cursor).hasBacklog(true);
doCallRealMethod().when(cursor).hasBacklog(false);

when(cursor.getNumberOfEntriesInBacklog(true)).thenReturn(1L);
when(cursor.getNumberOfEntriesInBacklog(false)).thenReturn(0L);

assertTrue(cursor.hasBacklog());
assertFalse(cursor.hasBacklog(false));
}

@Test(timeOut = 10000, dataProvider = "useOpenRangeSet")
void testReadEntriesOrWait(boolean useOpenRangeSet) throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ default Optional<DispatchRateLimiter> getRateLimiter() {

long getNumberOfEntriesInBacklog();

default boolean hasBacklog() {
return getNumberOfEntriesInBacklog() > 0;
}

boolean isTerminated();

ReplicatorStatsImpl getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions, AckTyp

long getNumberOfEntriesInBacklog(boolean getPreciseBacklog);

default boolean hasBacklog(boolean getPreciseBacklog) {
return getNumberOfEntriesInBacklog(getPreciseBacklog) > 0;
}

default long getNumberOfEntriesDelayed() {
return 0;
}
Expand Down
Loading