Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion db/db_log_iter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ class DBTestXactLogIterator : public DBTestBase {
std::unique_ptr<TransactionLogIterator> iter;
Status status = dbfull()->GetUpdatesSince(seq, &iter);
EXPECT_OK(status);
EXPECT_TRUE(iter->Valid());
return iter;
}
};
Expand Down Expand Up @@ -210,6 +209,72 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorStallAtLastRecord) {
} while (ChangeCompactOptions());
}

TEST_F(DBTestXactLogIterator, TransactionLogIteratorPollNoWALFiles) {
do {
// read the WAL on an empty DB: no records
auto iter = OpenTransactionLogIter(0);
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->Valid());

// poll for WAL records when there is no WAL: used to crash now returns
// TryAgain
iter->Next();
ASSERT_EQ(Status::kTryAgain, iter->status().code());
ASSERT_FALSE(iter->Valid());
iter->Next();
ASSERT_EQ(Status::kTryAgain, iter->status().code());
ASSERT_FALSE(iter->Valid());

// put a record: iterator still returns TryAgain
ASSERT_OK(Put("key1", DummyString(1024)));
iter->Next();
ASSERT_EQ(Status::kTryAgain, iter->status().code());
ASSERT_FALSE(iter->Valid());

// open the iterator: positioned at batch with key1
iter = OpenTransactionLogIter(0);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
BatchResult batch = iter->GetBatch();
ASSERT_EQ(1, batch.sequence);
ASSERT_EQ(1, batch.writeBatchPtr->Count());

// flush the memtable and WaitForCompact to wait for the WAL to be removed
Status status = dbfull()->Flush(FlushOptions());
ASSERT_OK(status);
WaitForCompactOptions wait_options;
wait_options.wait_for_purge = true;
status = dbfull()->WaitForCompact(wait_options);
ASSERT_OK(status);

// open a new iterator: no WAL records
auto iter2 = OpenTransactionLogIter(0);
ASSERT_OK(iter2->status());
ASSERT_FALSE(iter2->Valid());

// put another key: creates a new WAL
ASSERT_OK(Put("key2", DummyString(1024)));

// iter2 was created with no WAL files: Next returns TryAgain
iter2->Next();
ASSERT_EQ(Status::kTryAgain, iter2->status().code());
ASSERT_FALSE(iter2->Valid());

// iter: Next also returns TryAgain because there are new wal files
iter->Next();
ASSERT_EQ(Status::kTryAgain, iter->status().code());
ASSERT_FALSE(iter->Valid());

// reopening the iterator returns the second write
iter = OpenTransactionLogIter(0);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
batch = iter->GetBatch();
ASSERT_EQ(2, batch.sequence);
ASSERT_EQ(1, batch.writeBatchPtr->Count());
} while (ChangeCompactOptions());
}

TEST_F(DBTestXactLogIterator, TransactionLogIteratorCheckAfterRestart) {
do {
Options options = OptionsForLogIterTest();
Expand Down
17 changes: 15 additions & 2 deletions db/transaction_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,24 @@ void TransactionLogIteratorImpl::Next() {
return NextImpl(false);
}

// Returns a Status with code TryAgain and a message to re-create the iterator.
static Status TryAgainStatus() {
static const char* const msg = "Create a new iterator to fetch the new tail.";
return Status::TryAgain(msg);
}

void TransactionLogIteratorImpl::NextImpl(bool internal) {
Slice record;
is_valid_ = false;
if (!internal && !started_) {
// if the iterator has no files and we are polling for WAL updates: we can't
// tell if the state has changed. Return TryAgain.
if (files_->empty()) {
current_status_ = TryAgainStatus();
assert(!Valid());
return;
}

// Runs every time until we can seek to the start sequence
SeekToStartSequence();
}
Expand Down Expand Up @@ -217,8 +231,7 @@ void TransactionLogIteratorImpl::NextImpl(bool internal) {
if (current_last_seq_ == versions_->LastSequence()) {
current_status_ = Status::OK();
} else {
const char* msg = "Create a new iterator to fetch the new tail.";
current_status_ = Status::TryAgain(msg);
current_status_ = TryAgainStatus();
}
return;
}
Expand Down
4 changes: 3 additions & 1 deletion include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,9 @@ class DB {
// write-batch exists, then iter is positioned at the next write-batch whose
// start_seq > seq_number.
//
// Returns Status::OK if iterator is valid
// Returns Status::OK if iterator is valid. See the comment on
// TransactionLogIterator for how to use the iterator.
//
// Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to
// use this api, else the WAL files will get
// cleared aggressively and the iterator might keep getting invalid before
Expand Down
28 changes: 20 additions & 8 deletions include/rocksdb/transaction_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,41 @@ struct BatchResult {
}
};

// A TransactionLogIterator is used to iterate over the transactions in a db.
// TransactionLogIterator iterates over WriteBatches in the write-ahead log.
// One run of the iterator is continuous, i.e. the iterator will stop at the
// beginning of any gap in sequences
// beginning of any gap in sequences. It can also be efficiently polled for
// new WAL records. To use it:
//
// 1. Create the iterator.
// 2. While Valid() == true: call GetBatch(), then call Next() to advance.
// 3. If Valid() is false: check status(). If status() is OK: the iterator
// can be polled for new WAL records by calling Next() then Valid().
//
// The iterator will return Valid() false and status() TryAgain in two cases:
// 1. A new WAL file was added after the iterator was created.
// 2. The iterator was created when there were no WAL files and Next() was
// called.
class TransactionLogIterator {
public:
TransactionLogIterator() {}
virtual ~TransactionLogIterator() {}

// An iterator is either positioned at a WriteBatch or not valid.
// This method returns true if the iterator is valid.
// Can read data from a valid iterator.
// This method returns true if the iterator is valid. If it is not valid,
// call status() to check for errors.
virtual bool Valid() = 0;

// Moves the iterator to the next WriteBatch.
// REQUIRES: Valid() to be true.
// Moves the iterator to the next WriteBatch. If the iterator is at the end,
// and status() == OK, this can be used to poll for updates.
virtual void Next() = 0;

// Returns ok if the iterator is valid.
// Returns the Error when something has gone wrong.
virtual Status status() = 0;

// If valid return's the current write_batch and the sequence number of the
// earliest transaction contained in the batch.
// If valid: returns the current write_batch and the sequence number of the
// earliest transaction contained in the batch. This can only be called once
// since it moves the WriteBatch.
// ONLY use if Valid() is true and status() is OK.
virtual BatchResult GetBatch() = 0;

Expand Down
Loading