Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2415,6 +2415,238 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) {
ASSERT_OK(DeprecatedAddFile({file_path}));
}

// Regression test: IngestExternalFile() must not remain blocked waiting for a
// flush that was deduped behind an older queued flush request.
//
// Outline:
//   1. Park the only HIGH-priority background thread on a sleeping task so
//      that no scheduled flush can run.
//   2. Start two GetLiveFilesStorageInfo() calls and wait until the
//      "DBImpl::AtomicFlushMemTables:AfterScheduleFlush" sync point has fired
//      twice, i.e. both calls have queued their flush.
//   3. Write "ingest_key" into the memtable and start IngestExternalFile() on
//      a file containing the same key, then wait until the
//      "DBImpl::FlushMemTable:BeforeWaitForBgFlush" sync point fires
//      (presumably because ingestion has to flush the overlapping memtable).
//   4. Wake the sleeping task and require that ingestion finishes on its own
//      and that the ingested value is the one visible to Get().
//
// Every failure path first unblocks and joins all helper threads so that a
// broken setup produces a test failure rather than a hang or a crash.
TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) {
  // These waits only prevent broken test paths from hanging forever. Keep them
  // large enough to avoid treating slow CI scheduling as a regression.
  constexpr uint64_t kHangGuardMicros = 30 * 1000 * 1000;
  constexpr uint64_t kSetupWaitMicros = kHangGuardMicros;
  constexpr uint64_t kIngestWaitMicros = kHangGuardMicros;
  constexpr uint64_t kPollMicros = 1000;

  Options options = CurrentOptions();
  options.atomic_flush = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 1024;
  options.max_write_buffer_number = 8;
  options.min_write_buffer_number_to_merge = 2;
  // A single HIGH-priority thread lets one sleeping task block that pool.
  env_->SetBackgroundThreads(1, Env::HIGH);
  Reopen(options);

  std::string external_file_path;
  std::vector<std::pair<std::string, std::string>> external_file_data = {
      {"ingest_key", "ingested_value"}};
  ASSERT_OK(GenerateOneExternalFile(options, nullptr, external_file_data, -1,
                                    true /* sort_data */, &external_file_path,
                                    nullptr /* true_data */));

  // Occupy the HIGH-priority pool until the test explicitly wakes the task.
  auto sleeping_task = std::make_shared<test::SleepingBackgroundTask>();
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 sleeping_task.get(), Env::Priority::HIGH);
  sleeping_task->WaitUntilSleeping();

  std::atomic<int> atomic_flush_scheduled{0};
  std::atomic<int> get_live_files_done{0};
  std::atomic<bool> ingest_waiting_for_flush{false};
  std::atomic<bool> ingest_done{false};

  // The sync-point callbacks below capture the atomics above by reference.
  // This guard is declared after them, so it is destroyed first and removes
  // the callbacks on every exit path (including early ASSERT_* returns)
  // before the captured variables go out of scope.
  struct SyncPointCleanup {
    ~SyncPointCleanup() {
      SyncPoint::GetInstance()->DisableProcessing();
      SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  } sync_point_cleanup;

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void*) {
        atomic_flush_scheduled.fetch_add(1, std::memory_order_acq_rel);
      });
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::FlushMemTable:BeforeWaitForBgFlush", [&](void*) {
        ingest_waiting_for_flush.store(true, std::memory_order_release);
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Polls `predicate` every kPollMicros for roughly `timeout_micros`. Returns
  // true as soon as the predicate holds, otherwise the result of one final
  // check after the timeout.
  auto wait_until = [&](const std::function<bool()>& predicate,
                        uint64_t timeout_micros) {
    for (uint64_t waited_micros = 0; waited_micros < timeout_micros;
         waited_micros += kPollMicros) {
      if (predicate()) {
        return true;
      }
      env_->SleepForMicroseconds(kPollMicros);
    }
    return predicate();
  };

  // Wakes the sleeping task if that has not happened yet. Returns true when
  // the task finished within kSetupWaitMicros (TimedWaitUntilDone() reports a
  // timeout, hence the negation).
  auto wake_sleeping_task = [&] {
    if (!sleeping_task->WokenUp()) {
      sleeping_task->WakeUp();
    }
    return !sleeping_task->TimedWaitUntilDone(kSetupWaitMicros);
  };

  auto get_live_files_finished = [&] {
    return get_live_files_done.load(std::memory_order_acquire) == 2;
  };
  auto ingest_finished = [&] {
    return ingest_done.load(std::memory_order_acquire);
  };

  // Failure-path helper: waits for `finished`, escalating first to a
  // non-blocking and then to a blocking CancelAllBackgroundWork() when the
  // wait times out, so that helper threads can be joined afterwards.
  auto wait_or_cancel_background_work =
      [&](const std::function<bool()>& finished, uint64_t timeout_micros) {
        if (wait_until(finished, timeout_micros)) {
          return;
        }
        dbfull()->CancelAllBackgroundWork(false /* wait */);
        if (wait_until(finished, timeout_micros)) {
          return;
        }
        dbfull()->CancelAllBackgroundWork(true /* wait */);
        wait_until(finished, timeout_micros);
      };

  const Status setup_put_status = Put("atomic_flush_key", "value");
  if (!setup_put_status.ok()) {
    // Do not leave the scheduled task asleep with a pointer to an object that
    // is about to be destroyed.
    wake_sleeping_task();
  }
  ASSERT_OK(setup_put_status);

  Status get_live_files_status[2];
  std::vector<LiveFileStorageInfo> live_files[2];
  port::Thread get_live_files_thread_1([&] {
    LiveFilesStorageInfoOptions lfsi_opts;
    lfsi_opts.wal_size_for_flush = 0;
    lfsi_opts.atomic_flush = true;
    get_live_files_status[0] =
        db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[0]);
    get_live_files_done.fetch_add(1, std::memory_order_acq_rel);
  });
  port::Thread get_live_files_thread_2([&] {
    LiveFilesStorageInfoOptions lfsi_opts;
    lfsi_opts.wal_size_for_flush = 0;
    lfsi_opts.atomic_flush = true;
    get_live_files_status[1] =
        db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[1]);
    get_live_files_done.fetch_add(1, std::memory_order_acq_rel);
  });

  const bool saw_both_atomic_flushes = wait_until(
      [&] {
        return atomic_flush_scheduled.load(std::memory_order_acquire) >= 2;
      },
      kSetupWaitMicros);

  if (!saw_both_atomic_flushes) {
    // Setup failed: unblock everything, join the threads, then report.
    const bool sleeping_task_done = wake_sleeping_task();
    wait_or_cancel_background_work(get_live_files_finished, kSetupWaitMicros);
    get_live_files_thread_1.join();
    get_live_files_thread_2.join();
    EXPECT_OK(get_live_files_status[0]);
    EXPECT_OK(get_live_files_status[1]);
    ASSERT_TRUE(saw_both_atomic_flushes)
        << "Timed out waiting for both GetLiveFilesStorageInfo calls to "
           "schedule their flushes. scheduled="
        << atomic_flush_scheduled.load(std::memory_order_acquire)
        << " get_live_files_done="
        << get_live_files_done.load(std::memory_order_acquire)
        << " sleeping_task_done=" << sleeping_task_done;
  }

  const Status memtable_put_status = Put("ingest_key", "memtable_value");
  if (!memtable_put_status.ok()) {
    // The helper threads are running; returning with them still joinable
    // would terminate the process, so unblock and join them first.
    wake_sleeping_task();
    wait_or_cancel_background_work(get_live_files_finished, kSetupWaitMicros);
    get_live_files_thread_1.join();
    get_live_files_thread_2.join();
    EXPECT_OK(get_live_files_status[0]);
    EXPECT_OK(get_live_files_status[1]);
  }
  ASSERT_OK(memtable_put_status);

  Status ingest_status;
  port::Thread ingest_thread([&] {
    IngestExternalFileOptions ifo;
    ingest_status = db_->IngestExternalFile({external_file_path}, ifo);
    ingest_done.store(true, std::memory_order_release);
  });

  const bool saw_ingest_wait = wait_until(
      [&] {
        return ingest_waiting_for_flush.load(std::memory_order_acquire) ||
               ingest_done.load(std::memory_order_acquire);
      },
      kSetupWaitMicros);
  const bool ingest_finished_before_wait =
      ingest_done.load(std::memory_order_acquire);

  if (!saw_ingest_wait || ingest_finished_before_wait) {
    // The ingestion never reached the flush wait this test is about: clean up
    // all threads, then fail with the matching message.
    const bool sleeping_task_done = wake_sleeping_task();
    wait_or_cancel_background_work(get_live_files_finished, kSetupWaitMicros);
    get_live_files_thread_1.join();
    get_live_files_thread_2.join();
    wait_or_cancel_background_work(ingest_finished, kSetupWaitMicros);
    ingest_thread.join();
    EXPECT_OK(get_live_files_status[0]);
    EXPECT_OK(get_live_files_status[1]);
    EXPECT_OK(ingest_status);
    ASSERT_TRUE(sleeping_task_done);
    ASSERT_TRUE(saw_ingest_wait)
        << "Timed out waiting for IngestExternalFile to enter its flush wait";
    ASSERT_FALSE(ingest_finished_before_wait)
        << "IngestExternalFile finished before entering the expected flush "
           "wait";
  }

  // Release the background thread so the queued flushes can run.
  const bool sleeping_task_done = wake_sleeping_task();
  get_live_files_thread_1.join();
  get_live_files_thread_2.join();

  const bool ingest_completed_without_unblock =
      wait_until(ingest_finished, kIngestWaitMicros);

  if (!ingest_completed_without_unblock) {
    // The ingestion is stuck. Try to release it with an explicit non-waiting
    // flush (and, failing that, by cancelling background work) so the thread
    // can be joined, then fail the test.
    FlushOptions unblock_flush_options;
    unblock_flush_options.wait = false;
    const Status unblock_flush_status = db_->Flush(unblock_flush_options);
    wait_or_cancel_background_work(ingest_finished, kIngestWaitMicros);
    ingest_thread.join();
    // The ingestion outcome is irrelevant on this path; mark it as inspected
    // for builds that assert on unchecked Status objects.
    ingest_status.PermitUncheckedError();
    ASSERT_TRUE(sleeping_task_done);
    ASSERT_OK(get_live_files_status[0]);
    ASSERT_OK(get_live_files_status[1]);
    ASSERT_OK(unblock_flush_status);
    FAIL() << "IngestExternalFile remained blocked waiting for a flush that "
              "was deduped behind an older queued flush request";
  }

  ingest_thread.join();
  ASSERT_TRUE(sleeping_task_done);
  ASSERT_OK(get_live_files_status[0]);
  ASSERT_OK(get_live_files_status[1]);
  ASSERT_OK(ingest_status);
  ASSERT_EQ("ingested_value", Get("ingest_key"));
}

TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
Expand Down
11 changes: 6 additions & 5 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,6 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_;
bool atomic_flush = false;

// Note: every time MemTableList::Add(mem) is called, it adds the new mem
// at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by
Expand All @@ -509,9 +508,6 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
auto it = memlist.rbegin();
for (; it != memlist.rend(); ++it) {
ReadOnlyMemTable* m = *it;
if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
atomic_flush = true;
}
if (m->GetID() > max_memtable_id) {
break;
}
Expand Down Expand Up @@ -542,7 +538,12 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
// since they map to the same WAL and have the same NextLogNumber().
assert(strcmp((*it)->Name(), "WBWIMemTable") != 0);
}
if (!atomic_flush || num_flush_not_started_ == 0) {
// The flush request is complete only after every unstarted immutable
// memtable has been picked. A request can be older than a newer immutable
// memtable; clearing flush_requested_ after a partial or empty pick would
// make the queued newer request look unnecessary and leave that memtable
// unflushed.
if (num_flush_not_started_ == 0) {
flush_requested_ = false; // start-flush request is complete
}
}
Expand Down
10 changes: 5 additions & 5 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,11 @@ class MemTableList {
// Returns an estimate of the timestamp of the earliest key.
uint64_t ApproximateOldestKeyTime() const;

// Request a flush of all existing memtables to storage. This will
// cause future calls to IsFlushPending() to return true if this list is
// non-empty (regardless of the min_write_buffer_number_to_merge
// parameter). This flush request will persist until the next time
// PickMemtablesToFlush() is called.
// Request a flush of all existing memtables to storage. This will cause
// future calls to IsFlushPending() to return true if this list is non-empty
// (regardless of the min_write_buffer_number_to_merge parameter). This flush
// request will persist until PickMemtablesToFlush() has picked all unstarted
// memtables.
void FlushRequested() {
flush_requested_ = true;
// If there are some memtables stored in imm() that don't trigger
Expand Down
50 changes: 48 additions & 2 deletions db/memtable_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -890,8 +890,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
ASSERT_TRUE(to_flush5.empty());
ASSERT_EQ(1, list.NumNotFlushed());
ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
ASSERT_FALSE(list.IsFlushPending());
ASSERT_FALSE(list.HasFlushRequested());
ASSERT_TRUE(list.IsFlushPending());
ASSERT_TRUE(list.HasFlushRequested());

// Pick tables to flush. The tables to pick must have ID smaller than or
// equal to 5. Therefore, only tables[5] will be selected.
Expand Down Expand Up @@ -920,6 +920,52 @@ TEST_F(MemTableListTest, FlushPendingTest) {
to_delete.clear();
}

// Checks that a flush request stays pending after PickMemtablesToFlush() has
// picked only some of the immutable memtables, and is cleared once the
// remaining memtable has been picked as well.
TEST_F(MemTableListTest, FlushRequestPersistsAfterPartialPick) {
  constexpr uint64_t kNumMemtables = 2;
  SequenceNumber next_seq = 1;

  options.memtable_factory = std::make_shared<SkipListFactory>();
  ImmutableOptions immutable_opts(options);
  InternalKeyComparator key_cmp(BytewiseComparator());
  WriteBufferManager write_buffer_manager(options.db_write_buffer_size);
  MutableCFOptions mutable_opts(options);
  autovector<ReadOnlyMemTable*> obsolete_memtables;

  // The merge threshold (10) is larger than the number of memtables added
  // below (2).
  MemTableList list(10 /* min_write_buffer_number_to_merge */,
                    0 /* max_write_buffer_size_to_maintain */);

  // Add two single-entry memtables with IDs 0 and 1.
  for (uint64_t id = 0; id != kNumMemtables; ++id) {
    MemTable* new_mem =
        new MemTable(key_cmp, immutable_opts, mutable_opts,
                     &write_buffer_manager, kMaxSequenceNumber,
                     0 /* column_family_id */);
    new_mem->SetID(id);
    new_mem->Ref();
    const std::string user_key = "key" + std::to_string(id);
    ASSERT_OK(new_mem->Add(++next_seq, kTypeValue, user_key, "value",
                           nullptr /* kv_prot_info */));
    list.Add(new_mem, &obsolete_memtables);
  }

  list.FlushRequested();
  ASSERT_TRUE(list.IsFlushPending());

  // Partial pick: only the memtable with ID 0 qualifies, so the request must
  // remain outstanding.
  autovector<ReadOnlyMemTable*> first_pick;
  list.PickMemtablesToFlush(0 /* max_memtable_id */, &first_pick);
  ASSERT_EQ(1, first_pick.size());
  ASSERT_TRUE(list.IsFlushPending());
  ASSERT_TRUE(list.HasFlushRequested());

  // Picking the remaining memtable completes the request.
  autovector<ReadOnlyMemTable*> second_pick;
  list.PickMemtablesToFlush(1 /* max_memtable_id */, &second_pick);
  ASSERT_EQ(1, second_pick.size());
  ASSERT_FALSE(list.IsFlushPending());
  ASSERT_FALSE(list.HasFlushRequested());

  // Drop the list's current version and free the memtables it hands back.
  list.current()->Unref(&obsolete_memtables);
  ASSERT_EQ(2, obsolete_memtables.size());
  for (ReadOnlyMemTable* released : obsolete_memtables) {
    delete released;
  }
}

TEST_F(MemTableListTest, EmptyAtomicFlushTest) {
autovector<MemTableList*> lists;
autovector<uint32_t> cf_ids;
Expand Down
Loading
Loading