From 29b4921d614a6766b5dbec57648f025fb5319a58 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Sat, 6 Jun 2026 05:59:15 -0700 Subject: [PATCH 1/5] Add db_stress liveness watchdog --- db_stress_tool/db_stress_common.cc | 254 ++++++++++++++++++++ db_stress_tool/db_stress_common.h | 2 + db_stress_tool/db_stress_driver.cc | 14 +- db_stress_tool/db_stress_gflags.cc | 10 + db_stress_tool/db_stress_listener.h | 4 + db_stress_tool/db_stress_shared_state.cc | 24 +- db_stress_tool/db_stress_shared_state.h | 294 ++++++++++++++++++++++- db_stress_tool/db_stress_test_base.cc | 166 ++++++++++++- db_stress_tool/db_stress_test_base.h | 2 + tools/db_crashtest.py | 136 ++++++++++- tools/db_crashtest_test.py | 159 ++++++++++++ 11 files changed, 1039 insertions(+), 26 deletions(-) diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index dfe533f48e3e..4c3818da6bee 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -11,7 +11,10 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" +#include +#include #include +#include #include "db_stress_tool/db_stress_test_base.h" #include "file/file_util.h" @@ -90,6 +93,257 @@ int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) { return rand_local.Next() % max_key; } +namespace { + +constexpr uint64_t kLongActiveOperationTimeoutMultiplier = 4; + +using UIntMicros = std::chrono::duration; +using UIntSeconds = std::chrono::duration; + +uint64_t SecondsToMicrosSaturated(uint64_t seconds) { + const UIntSeconds max_seconds = + std::chrono::duration_cast(UIntMicros::max()); + if (seconds > max_seconds.count()) { + return UIntMicros::max().count(); + } + return std::chrono::duration_cast(UIntSeconds(seconds)).count(); +} + +int LivenessSleepMicros() { + const std::chrono::microseconds max_sleep_micros( + std::numeric_limits::max()); + const uint64_t max_sleep_seconds = + std::chrono::duration_cast(max_sleep_micros) + .count(); + const std::chrono::seconds sleep_seconds( + std::min(FLAGS_liveness_check_interval_sec, max_sleep_seconds)); + return static_cast( + std::chrono::duration_cast(sleep_seconds) + .count()); +} + +bool StopBgThreadIfNeeded(SharedState* shared) { + port::Mutex* mu = shared->GetMutex(); + // The liveness watchdog must not block on the shared harness mutex; a stuck + // holder is one of the failure modes it needs to report. + if (!mu->TryLock()) { + return false; + } + + if (!shared->ShouldStopBgThread()) { + mu->Unlock(); + return false; + } + + shared->IncBgThreadsFinished(); + if (shared->BgThreadsFinished()) { + shared->GetCondVar()->SignalAll(); + } + mu->Unlock(); + return true; +} + +uint64_t ElapsedMicros(uint64_t now, uint64_t start) { + return now >= start ? now - start : 0; +} + +uint64_t SaturatingMultiply(uint64_t value, uint64_t multiplier) { + if (multiplier != 0 && + value > std::numeric_limits::max() / multiplier) { + return std::numeric_limits::max(); + } + return value * multiplier; +} + +uint64_t ActiveOperationTimeoutMicros(StressOperationType type, + uint64_t no_progress_timeout_micros) { + switch (type) { + case StressOperationType::kNone: + case StressOperationType::kCount: + return 0; + case StressOperationType::kCompactFiles: + case StressOperationType::kCompactRange: + case StressOperationType::kFlush: + case StressOperationType::kVerifyDb: + case StressOperationType::kVerifyChecksum: + case StressOperationType::kVerifyFileChecksums: + case StressOperationType::kIngestExternalFile: + case StressOperationType::kBackup: + case StressOperationType::kCheckpoint: + case StressOperationType::kApproximateSize: + case StressOperationType::kPauseBackground: + case StressOperationType::kGetLiveFiles: + return SaturatingMultiply(no_progress_timeout_micros, + kLongActiveOperationTimeoutMultiplier); + case StressOperationType::kPrepare: + case StressOperationType::kReopen: + case StressOperationType::kSetOptions: + case StressOperationType::kManualWalFlush: + case StressOperationType::kLockWal: + case StressOperationType::kSyncWal: + case StressOperationType::kMetadata: + case StressOperationType::kDisableFileDeletions: + case StressOperationType::kDisableManualCompaction: + case StressOperationType::kAbortResumeCompactions: + case StressOperationType::kGetProperty: + case StressOperationType::kTableProperties: + case StressOperationType::kSnapshot: + case StressOperationType::kKeyMayExist: + case StressOperationType::kRead: + case StressOperationType::kPrefixScan: + case StressOperationType::kWrite: + case StressOperationType::kDelete: + case StressOperationType::kDeleteRange: + case StressOperationType::kIterate: + case StressOperationType::kCustom: + return no_progress_timeout_micros; + } + return no_progress_timeout_micros; +} + +void PrintLivenessState(SharedState* shared, uint64_t now) { + fprintf(stderr, "Finished db_stress operations=%" PRIu64 "\n", + shared->GetFinishedOps()); + fprintf(stderr, "Successful compactions=%" PRIu64 "\n", + shared->GetSuccessfulCompactions()); + fprintf(stderr, "Completed db_stress operation scopes by type:"); + bool printed_completed_op_count = false; + for (size_t i = 1; i < kStressOperationTypeCount; ++i) { + const auto type = static_cast(i); + const uint64_t count = shared->GetCompletedOpsForDiagnostics(type); + if (count > 0) { + fprintf(stderr, " %s=%" PRIu64, StressOperationTypeName(type), count); + printed_completed_op_count = true; + } + } + if (!printed_completed_op_count) { + fprintf(stderr, " none"); + } + fprintf(stderr, "\n"); + + for (uint32_t tid = 0; tid < shared->GetNumThreads(); ++tid) { + const ThreadOperationSnapshot snapshot = + shared->GetThreadOperationSnapshot(tid); + if (snapshot.type == StressOperationType::kNone || + snapshot.started_micros == 0) { + fprintf(stderr, "thread %" PRIu32 " active_op=none\n", tid); + continue; + } + + fprintf(stderr, + "thread %" PRIu32 " active_op=%s active_op_started_micros=%" PRIu64 + " active_op_elapsed_micros=%" PRIu64 "\n", + tid, StressOperationTypeName(snapshot.type), + snapshot.started_micros, + ElapsedMicros(now, snapshot.started_micros)); + } +} + +bool HasStuckWriteOperation(SharedState* shared, uint64_t now, + uint64_t timeout_micros) { + for (uint32_t tid = 0; tid < shared->GetNumThreads(); ++tid) { + const ThreadOperationSnapshot snapshot = + shared->GetThreadOperationSnapshot(tid); + if (snapshot.type == StressOperationType::kWrite && + snapshot.started_micros != 0 && + ElapsedMicros(now, snapshot.started_micros) >= timeout_micros) { + fprintf(stderr, + "Liveness watchdog detected a stuck write operation on thread " + "%" PRIu32 " for %" PRIu64 " seconds.\n", + tid, FLAGS_liveness_no_progress_timeout_sec); + return true; + } + } + return false; +} + +bool HasTimedOutNonWriteOperation(SharedState* shared, uint64_t now, + uint64_t no_progress_timeout_micros) { + for (uint32_t tid = 0; tid < shared->GetNumThreads(); ++tid) { + const ThreadOperationSnapshot snapshot = + shared->GetThreadOperationSnapshot(tid); + if (snapshot.type == StressOperationType::kWrite || + snapshot.started_micros == 0) { + continue; + } + + const uint64_t timeout_micros = + ActiveOperationTimeoutMicros(snapshot.type, no_progress_timeout_micros); + if (timeout_micros != 0 && + ElapsedMicros(now, snapshot.started_micros) >= timeout_micros) { + fprintf(stderr, + "Liveness watchdog detected a timed out active operation on " + "thread %" PRIu32 + ". active_op=%s active_op_elapsed_micros=%" PRIu64 + " active_op_timeout_micros=%" PRIu64 "\n", + tid, StressOperationTypeName(snapshot.type), + ElapsedMicros(now, snapshot.started_micros), timeout_micros); + return true; + } + } + return false; +} + +} // namespace + +void LivenessWatchdogThread(void* v) { + assert(FLAGS_liveness_check_interval_sec > 0); + assert(FLAGS_liveness_no_progress_timeout_sec > 0); + + auto* thread = static_cast(v); + SharedState* shared = thread->shared; + + uint64_t last_finished_ops = shared->GetFinishedOps(); + uint64_t last_progress_time_micros = raw_env->NowMicros(); + const uint64_t no_progress_timeout_micros = + SecondsToMicrosSaturated(FLAGS_liveness_no_progress_timeout_sec); + + while (true) { + if (StopBgThreadIfNeeded(shared)) { + return; + } + + const bool started = shared->OperationStarted(); + const bool all_operated = shared->OperationFinished(); + + const uint64_t now = raw_env->NowMicros(); + if (!started || all_operated) { + last_finished_ops = shared->GetFinishedOps(); + last_progress_time_micros = now; + raw_env->SleepForMicroseconds(LivenessSleepMicros()); + continue; + } + + const uint64_t finished_ops = shared->GetFinishedOps(); + if (HasStuckWriteOperation(shared, now, no_progress_timeout_micros)) { + PrintLivenessState(shared, now); + fflush(stderr); + shared->TerminateWithoutMutex(); + } else if (HasTimedOutNonWriteOperation(shared, now, + no_progress_timeout_micros)) { + PrintLivenessState(shared, now); + fflush(stderr); + shared->TerminateWithoutMutex(); + } else if (finished_ops != last_finished_ops) { + last_finished_ops = finished_ops; + last_progress_time_micros = now; + } else if (now >= last_progress_time_micros && + now - last_progress_time_micros >= no_progress_timeout_micros) { + fprintf(stderr, + "Liveness watchdog detected no completed db_stress operations " + "for %" PRIu64 " seconds. finished_ops=%" PRIu64 "\n", + FLAGS_liveness_no_progress_timeout_sec, finished_ops); + PrintLivenessState(shared, now); + fflush(stderr); + shared->TerminateWithoutMutex(); + } else if (now < last_progress_time_micros) { + last_progress_time_micros = now; + } + + raw_env->SleepForMicroseconds(LivenessSleepMicros()); + } +} + void PoolSizeChangeThread(void* v) { assert(FLAGS_compaction_thread_pool_adjust_interval > 0); ThreadState* thread = static_cast(v); diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index b9115113170c..0cacd256b67a 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -809,6 +809,8 @@ void PoolSizeChangeThread(void* v); void DbVerificationThread(void* v); +void LivenessWatchdogThread(void* v); + void RemoteCompactionWorkerThread(void* v); void CompressedCacheSetCapacityThread(void* v); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 29d922dd5000..79d5cf5ce4d3 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -95,6 +95,13 @@ bool RunStressTestImpl(SharedState* shared) { shared->SetThreads(n); + const bool liveness_watchdog_enabled = + FLAGS_liveness_check_interval_sec > 0 && + FLAGS_liveness_no_progress_timeout_sec > 0; + if (liveness_watchdog_enabled) { + shared->IncBgThreads(); + } + if (FLAGS_continuous_verification_interval > 0) { shared->IncBgThreads(); } @@ -131,6 +138,11 @@ bool RunStressTestImpl(SharedState* shared) { raw_env->StartThread(DbVerificationThread, &continuous_verification_thread); } + ThreadState liveness_watchdog_thread(0, shared); + if (liveness_watchdog_enabled) { + raw_env->StartThread(LivenessWatchdogThread, &liveness_watchdog_thread); + } + // Spawn at most one CompressedCacheSetCapacityThread globally. The cache // is shared across all DBs, and the thread's SetCapacity(0)/assert(==0) // sequence would race if multiple DBs each spawned their own copy. @@ -268,7 +280,7 @@ bool RunStressTestImpl(SharedState* shared) { } if (FLAGS_compaction_thread_pool_adjust_interval > 0 || - FLAGS_continuous_verification_interval > 0 || + FLAGS_continuous_verification_interval > 0 || liveness_watchdog_enabled || FLAGS_compressed_secondary_cache_size > 0 || FLAGS_compressed_secondary_cache_ratio > 0.0 || remote_compaction_worker_thread_count > 0) { diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 3cc8dbc30c4b..fe33e92f3e44 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -148,6 +148,16 @@ DEFINE_bool(verbose, false, "Verbose"); DEFINE_bool(progress_reports, true, "If true, db_stress will report number of finished operations"); +DEFINE_uint64( + liveness_check_interval_sec, 0, + "If non-zero, check periodically whether db_stress operations are making " + "progress. Requires --liveness_no_progress_timeout_sec to be non-zero."); + +DEFINE_uint64( + liveness_no_progress_timeout_sec, 0, + "If non-zero with --liveness_check_interval_sec, terminate db_stress when " + "no operation completes for this many seconds during the operation phase."); + DEFINE_uint64(db_write_buffer_size, ROCKSDB_NAMESPACE::Options().db_write_buffer_size, "Number of bytes to buffer in all memtables before compacting"); diff --git a/db_stress_tool/db_stress_listener.h b/db_stress_tool/db_stress_listener.h index 9d42b4c0536a..b33eed08bfa0 100644 --- a/db_stress_tool/db_stress_listener.h +++ b/db_stress_tool/db_stress_listener.h @@ -231,6 +231,10 @@ class DbStressListener : public EventListener { precommitted_jobs_.erase(it); } + if (ci.status.ok() && !ci.aborted) { + shared_->IncSuccessfulCompactions(); + } + // pretending doing some work here RandomSleep(); } diff --git a/db_stress_tool/db_stress_shared_state.cc b/db_stress_tool/db_stress_shared_state.cc index 7eaa4740a739..12b6b312f742 100644 --- a/db_stress_tool/db_stress_shared_state.cc +++ b/db_stress_tool/db_stress_shared_state.cc @@ -12,12 +12,14 @@ #include "db_stress_tool/db_stress_shared_state.h" #include "db_stress_tool/db_stress_test_base.h" +#include "rocksdb/env.h" namespace ROCKSDB_NAMESPACE { thread_local bool SharedState::ignore_read_error; -SharedState::SharedState(Env* /*env*/, StressTest* stress_test) +SharedState::SharedState(Env* env, StressTest* stress_test) : cv_(&mu_), + env_(env != nullptr ? env : Env::Default()), seed_(static_cast(FLAGS_seed)), max_key_(FLAGS_max_key), log2_keys_per_lock_(static_cast(FLAGS_log2_keys_per_lock)), @@ -28,16 +30,26 @@ SharedState::SharedState(Env* /*env*/, StressTest* stress_test) num_done_(0), start_(false), start_verify_(false), + operation_started_(false), + operation_finished_(false), num_bg_threads_(0), should_stop_bg_thread_(false), bg_thread_finished_(0), stress_test_(stress_test), + finished_ops_(0), + successful_compactions_(0), + successful_compactions_at_last_compaction_abort_(0), + abort_resume_compactions_running_(false), verification_failure_(false), should_stop_test_(false), no_overwrite_ids_(GenerateNoOverwriteIds()), expected_state_manager_(nullptr), printing_verification_results_(false), - start_timestamp_(Env::Default()->NowNanos()) { + start_timestamp_(env_->NowNanos()) { + for (auto& completed_ops : completed_ops_by_type_) { + completed_ops.store(0, std::memory_order_relaxed); + } + Status status; // TODO: We should introduce a way to explicitly disable verification // during shutdown. When that is disabled and FLAGS_expected_values_dir @@ -111,6 +123,14 @@ SharedState::SharedState(Env* /*env*/, StressTest* stress_test) } } +void SharedState::StartOperation(uint32_t tid, StressOperationType type) { + assert(tid < static_cast(num_threads_)); + ThreadOperationState& state = thread_operation_states_[tid]; + state.started_micros.store(env_->NowMicros(), std::memory_order_relaxed); + state.active_type.store(static_cast(type), + std::memory_order_release); +} + bool SharedState::ShouldVerifyAtBeginning() const { return !stress_test_->GetExpectedValuesDir().empty(); } diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 3b293f94dca0..5abb93997fe6 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -10,6 +10,11 @@ #ifdef GFLAGS #pragma once +#include +#include +#include +#include + #include "db_stress_tool/db_stress_stat.h" #include "db_stress_tool/expected_state.h" // SyncPoint is not supported in Released Windows Mode. @@ -30,6 +35,8 @@ DECLARE_int32(compaction_thread_pool_adjust_interval); DECLARE_int32(continuous_verification_interval); DECLARE_bool(error_recovery_with_no_fault_injection); DECLARE_bool(sync_fault_injection); +DECLARE_uint64(liveness_check_interval_sec); +DECLARE_uint64(liveness_no_progress_timeout_sec); DECLARE_int32(range_deletion_width); DECLARE_bool(disable_wal); DECLARE_int32(manual_wal_flush_one_in); @@ -50,6 +57,137 @@ DECLARE_bool(enable_compaction_filter); namespace ROCKSDB_NAMESPACE { class StressTest; +enum class StressOperationType : uint32_t { + kNone = 0, + kPrepare, + kReopen, + kSetOptions, + kVerifyDb, + kManualWalFlush, + kLockWal, + kSyncWal, + kCompactFiles, + kCompactRange, + kFlush, + kGetLiveFiles, + kMetadata, + kPauseBackground, + kDisableFileDeletions, + kDisableManualCompaction, + kAbortResumeCompactions, + kVerifyChecksum, + kVerifyFileChecksums, + kGetProperty, + kTableProperties, + kIngestExternalFile, + kBackup, + kCheckpoint, + kApproximateSize, + kSnapshot, + kKeyMayExist, + kRead, + kPrefixScan, + kWrite, + kDelete, + kDeleteRange, + kIterate, + kCustom, + kCount, +}; + +static constexpr size_t kStressOperationTypeCount = + static_cast(StressOperationType::kCount); + +inline const char* StressOperationTypeName(StressOperationType type) { + switch (type) { + case StressOperationType::kNone: + return "none"; + case StressOperationType::kPrepare: + return "prepare"; + case StressOperationType::kReopen: + return "reopen"; + case StressOperationType::kSetOptions: + return "set_options"; + case StressOperationType::kVerifyDb: + return "verify_db"; + case StressOperationType::kManualWalFlush: + return "manual_wal_flush"; + case StressOperationType::kLockWal: + return "lock_wal"; + case StressOperationType::kSyncWal: + return "sync_wal"; + case StressOperationType::kCompactFiles: + return "compact_files"; + case StressOperationType::kCompactRange: + return "compact_range"; + case StressOperationType::kFlush: + return "flush"; + case StressOperationType::kGetLiveFiles: + return "get_live_files"; + case StressOperationType::kMetadata: + return "metadata"; + case StressOperationType::kPauseBackground: + return "pause_background"; + case StressOperationType::kDisableFileDeletions: + return "disable_file_deletions"; + case StressOperationType::kDisableManualCompaction: + return "disable_manual_compaction"; + case StressOperationType::kAbortResumeCompactions: + return "abort_resume_compactions"; + case StressOperationType::kVerifyChecksum: + return "verify_checksum"; + case StressOperationType::kVerifyFileChecksums: + return "verify_file_checksums"; + case StressOperationType::kGetProperty: + return "get_property"; + case StressOperationType::kTableProperties: + return "table_properties"; + case StressOperationType::kIngestExternalFile: + return "ingest_external_file"; + case StressOperationType::kBackup: + return "backup"; + case StressOperationType::kCheckpoint: + return "checkpoint"; + case StressOperationType::kApproximateSize: + return "approximate_size"; + case StressOperationType::kSnapshot: + return "snapshot"; + case StressOperationType::kKeyMayExist: + return "key_may_exist"; + case StressOperationType::kRead: + return "read"; + case StressOperationType::kPrefixScan: + return "prefix_scan"; + case StressOperationType::kWrite: + return "write"; + case StressOperationType::kDelete: + return "delete"; + case StressOperationType::kDeleteRange: + return "delete_range"; + case StressOperationType::kIterate: + return "iterate"; + case StressOperationType::kCustom: + return "custom"; + case StressOperationType::kCount: + break; + } + return "unknown"; +} + +struct ThreadOperationSnapshot { + StressOperationType type; + uint64_t started_micros; +}; + +struct ThreadOperationState { + ThreadOperationState() + : active_type(static_cast(StressOperationType::kNone)), + started_micros(0) {} + + std::atomic active_type; + std::atomic started_micros; +}; + struct RemoteCompactionQueueItem { std::string job_id; CompactionServiceJobInfo job_info; @@ -99,11 +237,19 @@ class SharedState { uint32_t GetNumThreads() const { return num_threads_; } - void SetThreads(int num_threads) { num_threads_ = num_threads; } + void SetThreads(int num_threads) { + num_threads_ = num_threads; + thread_operation_states_.reset(new ThreadOperationState[num_threads]); + } void IncInitialized() { num_initialized_++; } - void IncOperated() { num_populated_++; } + void IncOperated() { + num_populated_++; + if (num_populated_ >= num_threads_) { + operation_finished_.store(true, std::memory_order_release); + } + } void IncDone() { num_done_++; } @@ -117,7 +263,10 @@ class SharedState { bool AllVotedReopen() { return (vote_reopen_ == 0); } - void SetStart() { start_ = true; } + void SetStart() { + start_ = true; + operation_started_.store(true, std::memory_order_release); + } void SetStartVerify() { start_verify_ = true; } @@ -125,10 +274,100 @@ class SharedState { bool VerifyStarted() const { return start_verify_; } + bool OperationStarted() const { + return operation_started_.load(std::memory_order_acquire); + } + + bool OperationFinished() const { + return operation_finished_.load(std::memory_order_acquire); + } + void SetVerificationFailure() { verification_failure_.store(true); } bool HasVerificationFailedYet() const { return verification_failure_.load(); } + void IncFinishedOps() { + finished_ops_.fetch_add(1, std::memory_order_relaxed); + } + + uint64_t GetFinishedOps() const { + return finished_ops_.load(std::memory_order_relaxed); + } + + void IncCompletedOpForDiagnostics(StressOperationType type) { + const size_t index = static_cast(type); + if (index == static_cast(StressOperationType::kNone) || + index == static_cast(StressOperationType::kPrepare) || + index >= kStressOperationTypeCount) { + return; + } + completed_ops_by_type_[index].fetch_add(1, std::memory_order_relaxed); + } + + uint64_t GetCompletedOpsForDiagnostics(StressOperationType type) const { + const size_t index = static_cast(type); + if (index >= kStressOperationTypeCount) { + return 0; + } + return completed_ops_by_type_[index].load(std::memory_order_relaxed); + } + + void IncSuccessfulCompactions() { + successful_compactions_.fetch_add(1, std::memory_order_relaxed); + } + + uint64_t GetSuccessfulCompactions() const { + return successful_compactions_.load(std::memory_order_relaxed); + } + + bool TryBeginAbortAndResumeCompactions(uint64_t* successful_compactions) { + bool expected = false; + if (!abort_resume_compactions_running_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel, + std::memory_order_relaxed)) { + return false; + } + + *successful_compactions = GetSuccessfulCompactions(); + if (*successful_compactions <= + successful_compactions_at_last_compaction_abort_.load( + std::memory_order_relaxed)) { + EndAbortAndResumeCompactions(); + return false; + } + return true; + } + + void MarkAbortAndResumeCompactions(uint64_t successful_compactions) { + successful_compactions_at_last_compaction_abort_.store( + successful_compactions, std::memory_order_relaxed); + } + + void EndAbortAndResumeCompactions() { + abort_resume_compactions_running_.store(false, std::memory_order_release); + } + + void StartOperation(uint32_t tid, StressOperationType type); + + void ClearOperation(uint32_t tid) { + assert(tid < static_cast(num_threads_)); + ThreadOperationState& state = thread_operation_states_[tid]; + state.active_type.store(static_cast(StressOperationType::kNone), + std::memory_order_release); + state.started_micros.store(0, std::memory_order_relaxed); + } + + ThreadOperationSnapshot GetThreadOperationSnapshot(uint32_t tid) const { + assert(tid < static_cast(num_threads_)); + const ThreadOperationState& state = thread_operation_states_[tid]; + // This is a diagnostic snapshot, not one atomic value. A concurrent + // start/clear can produce a torn pair; timeout checks must treat `kNone` + // and `started_micros == 0` as no active operation. + const auto type = static_cast( + state.active_type.load(std::memory_order_acquire)); + return {type, state.started_micros.load(std::memory_order_relaxed)}; + } + void SetShouldStopTest() { should_stop_test_.store(true); } bool ShouldStopTest() const { return should_stop_test_.load(); } @@ -353,11 +592,17 @@ class SharedState { void SafeTerminate() { // Grab mutex so that we don't call terminate while another thread is - // attempting to print a stack trace due to the first one + // attempting to print a stack trace due to the first one. MutexLock l(&mu_); std::terminate(); } + void TerminateWithoutMutex() { + // The liveness watchdog must still terminate when the no-progress failure + // mode itself is holding the shared harness mutex. + std::terminate(); + } + private: static void IgnoreReadErrorCallback(void*) { ignore_read_error = true; } @@ -393,6 +638,7 @@ class SharedState { port::Mutex mu_; port::CondVar cv_; + Env* env_; port::Mutex persist_seqno_mu_; const uint32_t seed_; const int64_t max_key_; @@ -404,12 +650,21 @@ class SharedState { long num_done_; bool start_; bool start_verify_; + std::atomic operation_started_; + std::atomic operation_finished_; int num_bg_threads_; bool should_stop_bg_thread_; int bg_thread_finished_; StressTest* stress_test_; + std::atomic finished_ops_; + std::array, kStressOperationTypeCount> + completed_ops_by_type_; + std::atomic successful_compactions_; + std::atomic successful_compactions_at_last_compaction_abort_; + std::atomic abort_resume_compactions_running_; std::atomic verification_failure_; std::atomic should_stop_test_; + std::unique_ptr thread_operation_states_; // Queue for the remote compaction. port::Mutex remote_compaction_queue_mu_; @@ -458,6 +713,37 @@ struct ThreadState { ThreadState(uint32_t index, SharedState* _shared) : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} + + bool LivenessTrackingEnabled() const { + return FLAGS_liveness_check_interval_sec > 0 && + FLAGS_liveness_no_progress_timeout_sec > 0; + } + + void StartOperation(StressOperationType type) { + if (LivenessTrackingEnabled()) { + shared->StartOperation(tid, type); + } + } + + void ClearOperation() { + if (LivenessTrackingEnabled()) { + shared->ClearOperation(tid); + } + } + + void CompletedOpForDiagnostics(StressOperationType type) { + if (LivenessTrackingEnabled()) { + shared->IncCompletedOpForDiagnostics(type); + } + } + + void FinishedSingleOp() { + stats.FinishedSingleOp(); + if (LivenessTrackingEnabled()) { + ClearOperation(); + shared->IncFinishedOps(); + } + } }; } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 5c7857cabc03..5682b883b71a 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. // +#include #include #include @@ -48,6 +49,39 @@ namespace ROCKSDB_NAMESPACE { namespace { +constexpr int kMaxAbortResumeCompactionsSleepMicros = 3 * 1000 * 1000; + +class ScopedThreadOperation { + public: + enum class FinishAction { kRestorePrepare, kKeepActive, kClear }; + + ScopedThreadOperation( + ThreadState* thread, StressOperationType type, + FinishAction finish_action = FinishAction::kRestorePrepare) + : thread_(thread), type_(type), finish_action_(finish_action) { + thread_->StartOperation(type); + } + + ~ScopedThreadOperation() { + thread_->CompletedOpForDiagnostics(type_); + switch (finish_action_) { + case FinishAction::kRestorePrepare: + thread_->StartOperation(StressOperationType::kPrepare); + break; + case FinishAction::kClear: + thread_->ClearOperation(); + break; + case FinishAction::kKeepActive: + break; + } + } + + private: + ThreadState* thread_; + StressOperationType type_; + FinishAction finish_action_; +}; + std::shared_ptr CreateFilterPolicy() { if (FLAGS_bloom_bits < 0) { return BlockBasedTableOptions().filter_policy; @@ -1239,7 +1273,9 @@ void StressTest::OperateDb(ThreadState* thread) { break; } if (open_cnt != 0) { - thread->stats.FinishedSingleOp(); + ScopedThreadOperation op( + thread, StressOperationType::kReopen, + ScopedThreadOperation::FinishAction::kKeepActive); MutexLock l(thread->shared->GetMutex()); while (!thread->snapshot_queue.empty()) { db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); @@ -1258,6 +1294,7 @@ void StressTest::OperateDb(ThreadState* thread) { if (FLAGS_use_trie_index && udi_factory_) { read_opts.table_index_factory = udi_factory_.get(); } + thread->FinishedSingleOp(); } #ifndef NDEBUG @@ -1300,9 +1337,13 @@ void StressTest::OperateDb(ThreadState* thread) { if (thread->shared->HasVerificationFailedYet()) { break; } + ScopedThreadOperation iteration_op( + thread, StressOperationType::kPrepare, + ScopedThreadOperation::FinishAction::kClear); // Change Options if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kSetOptions); Status s = SetOptions(thread); ProcessStatus(shared, "SetOptions", s); } @@ -1313,6 +1354,7 @@ void StressTest::OperateDb(ThreadState* thread) { if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 && thread->rand.OneIn(FLAGS_verify_db_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kVerifyDb); // Temporarily disable error injection for verification if (db_fault_injection_fs_) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); @@ -1330,6 +1372,7 @@ void StressTest::OperateDb(ThreadState* thread) { MaybeClearOneColumnFamily(thread); if (thread->rand.OneInOpt(FLAGS_manual_wal_flush_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kManualWalFlush); bool sync = thread->rand.OneIn(2) ? true : false; Status s = db_->FlushWAL(sync); if (!s.ok() && !IsErrorInjectedAndRetryable(s) && @@ -1340,6 +1383,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_lock_wal_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kLockWal); Status s = db_->LockWAL(); if (!s.ok() && !IsErrorInjectedAndRetryable(s)) { fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str()); @@ -1407,6 +1451,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kSyncWal); Status s = db_->SyncWAL(); if (!s.ok() && !s.IsNotSupported() && !IsErrorInjectedAndRetryable(s)) { fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str()); @@ -1417,6 +1462,7 @@ void StressTest::OperateDb(ThreadState* thread) { ColumnFamilyHandle* column_family = column_families_[rand_column_family]; if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kCompactFiles); TestCompactFiles(thread, column_family); } @@ -1425,6 +1471,7 @@ void StressTest::OperateDb(ThreadState* thread) { Slice key = keystr; if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kCompactRange); TestCompactRange(thread, rand_key, key, column_family); if (thread->shared->HasVerificationFailedYet()) { break; @@ -1439,11 +1486,13 @@ void StressTest::OperateDb(ThreadState* thread) { GenerateColumnFamilies(FLAGS_column_families, rand_column_family); if (thread->rand.OneInOpt(FLAGS_flush_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kFlush); Status status = TestFlush(rand_column_families); ProcessStatus(shared, "Flush", status); } if (thread->rand.OneInOpt(FLAGS_get_live_files_apis_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kGetLiveFiles); Status s_1 = TestGetLiveFiles(); ProcessStatus(shared, "GetLiveFiles", s_1); Status s_2 = TestGetLiveFilesMetaData(); @@ -1457,16 +1506,19 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_get_all_column_family_metadata_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kMetadata); Status status = TestGetAllColumnFamilyMetaData(); ProcessStatus(shared, "GetAllColumnFamilyMetaData", status); } if (thread->rand.OneInOpt(FLAGS_get_sorted_wal_files_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kMetadata); Status status = TestGetSortedWalFiles(); ProcessStatus(shared, "GetSortedWalFiles", status); } if (thread->rand.OneInOpt(FLAGS_get_current_wal_file_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kMetadata); Status status = TestGetCurrentWalFile(); ProcessStatus(shared, "GetCurrentWalFile", status); } @@ -1477,16 +1529,21 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kPauseBackground); Status status = TestPauseBackground(thread); ProcessStatus(shared, "Pause/ContinueBackgroundWork", status); } if (thread->rand.OneInOpt(FLAGS_disable_file_deletions_one_in)) { + ScopedThreadOperation op(thread, + StressOperationType::kDisableFileDeletions); Status status = TestDisableFileDeletions(thread); ProcessStatus(shared, "TestDisableFileDeletions", status); } if (thread->rand.OneInOpt(FLAGS_disable_manual_compaction_one_in)) { + ScopedThreadOperation op(thread, + StressOperationType::kDisableManualCompaction); Status status = TestDisableManualCompaction(thread); ProcessStatus(shared, "TestDisableManualCompaction", status); } @@ -1497,6 +1554,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_verify_checksum_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kVerifyChecksum); ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); ThreadStatusUtil::SetThreadOperation( ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM); @@ -1506,6 +1564,8 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_verify_file_checksums_one_in)) { + ScopedThreadOperation op(thread, + StressOperationType::kVerifyFileChecksums); ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); ThreadStatusUtil::SetThreadOperation( ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS); @@ -1515,6 +1575,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_get_property_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kGetProperty); // TestGetProperty doesn't return status for us to tell whether it has // failed due to injected error. So we disable fault injection to avoid // false positive @@ -1530,6 +1591,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_get_properties_of_all_tables_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kTableProperties); Status status = TestGetPropertiesOfAllTables(); ProcessStatus(shared, "TestGetPropertiesOfAllTables", status); } @@ -1537,10 +1599,13 @@ void StressTest::OperateDb(ThreadState* thread) { std::vector rand_keys = GenerateKeys(rand_key); if (thread->rand.OneInOpt(FLAGS_ingest_external_file_one_in)) { + ScopedThreadOperation op(thread, + StressOperationType::kIngestExternalFile); TestIngestExternalFile(thread, rand_column_families, rand_keys); } if (thread->rand.OneInOpt(FLAGS_backup_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kBackup); // Beyond a certain DB size threshold, this test becomes heavier than // it's worth. uint64_t total_size = 0; @@ -1567,20 +1632,24 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kCheckpoint); Status s = TestCheckpoint(thread, rand_column_families, rand_keys); ProcessStatus(shared, "Checkpoint", s); } if (thread->rand.OneInOpt(FLAGS_approximate_size_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kApproximateSize); Status s = TestApproximateSize(thread, i, rand_column_families, rand_keys); ProcessStatus(shared, "ApproximateSize", s); } if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kSnapshot); TestAcquireSnapshot(thread, rand_column_family, keystr, i); } /*always*/ { + ScopedThreadOperation op(thread, StressOperationType::kSnapshot); Status s = MaybeReleaseSnapshots(thread, i); ProcessStatus(shared, "Snapshot", s); } @@ -1595,6 +1664,7 @@ void StressTest::OperateDb(ThreadState* thread) { } if (thread->rand.OneInOpt(FLAGS_key_may_exist_one_in)) { + ScopedThreadOperation op(thread, StressOperationType::kKeyMayExist); TestKeyMayExist(thread, read_opts, rand_column_families, rand_keys); } // Historical expected-state restore replays exactly @@ -1605,12 +1675,12 @@ void StressTest::OperateDb(ThreadState* thread) { bool disable_fault_injection_during_user_write = db_fault_injection_fs_ && MightHaveUnsyncedDataLoss(); int prob_op = thread->rand.Uniform(100); - // Reset this in case we pick something other than a read op. We don't - // want to use a stale value when deciding at the beginning of the loop - // whether to vote to reopen if (prob_op >= 0 && prob_op < static_cast(FLAGS_readpercent)) { assert(0 <= prob_op); // OPERATION read + ScopedThreadOperation op( + thread, StressOperationType::kRead, + ScopedThreadOperation::FinishAction::kKeepActive); ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); if (FLAGS_use_multi_get_entity) { constexpr uint64_t max_batch_size = 64; @@ -1654,6 +1724,9 @@ void StressTest::OperateDb(ThreadState* thread) { } else if (prob_op < prefix_bound) { assert(static_cast(FLAGS_readpercent) <= prob_op); // OPERATION prefix scan + ScopedThreadOperation op( + thread, StressOperationType::kPrefixScan, + ScopedThreadOperation::FinishAction::kKeepActive); // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same @@ -1662,6 +1735,9 @@ void StressTest::OperateDb(ThreadState* thread) { } else if (prob_op < write_bound) { assert(prefix_bound <= prob_op); // OPERATION write + ScopedThreadOperation op( + thread, StressOperationType::kWrite, + ScopedThreadOperation::FinishAction::kKeepActive); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1673,6 +1749,9 @@ void StressTest::OperateDb(ThreadState* thread) { } else if (prob_op < del_bound) { assert(write_bound <= prob_op); // OPERATION delete + ScopedThreadOperation op( + thread, StressOperationType::kDelete, + ScopedThreadOperation::FinishAction::kKeepActive); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1683,6 +1762,9 @@ void StressTest::OperateDb(ThreadState* thread) { } else if (prob_op < delrange_bound) { assert(del_bound <= prob_op); // OPERATION delete range + ScopedThreadOperation op( + thread, StressOperationType::kDeleteRange, + ScopedThreadOperation::FinishAction::kKeepActive); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1693,6 +1775,9 @@ void StressTest::OperateDb(ThreadState* thread) { } else if (prob_op < iterate_bound) { assert(delrange_bound <= prob_op); // OPERATION iterate + ScopedThreadOperation op( + thread, StressOperationType::kIterate, + ScopedThreadOperation::FinishAction::kKeepActive); if (FLAGS_use_multiscan) { int num_seeks = static_cast( std::min(static_cast(thread->rand.Uniform(64)), @@ -1740,9 +1825,12 @@ void StressTest::OperateDb(ThreadState* thread) { } } else { assert(iterate_bound <= prob_op); + ScopedThreadOperation op( + thread, StressOperationType::kCustom, + ScopedThreadOperation::FinishAction::kKeepActive); TestCustomOperations(thread, rand_column_families); } - thread->stats.FinishedSingleOp(); + thread->FinishedSingleOp(); } #ifndef NDEBUG @@ -3501,17 +3589,79 @@ Status StressTest::TestDisableManualCompaction(ThreadState* thread) { return Status::OK(); } +bool StressTest::ShouldAbortAndResumeCompactions() const { + uint64_t is_write_stopped = 0; + if (db_->GetIntProperty(DB::Properties::kIsWriteStopped, &is_write_stopped) && + is_write_stopped != 0) { + return false; + } + + uint64_t delayed_write_rate = 0; + if (db_->GetIntProperty(DB::Properties::kActualDelayedWriteRate, + &delayed_write_rate) && + delayed_write_rate != 0) { + return false; + } + + uint64_t running_compactions = 0; + const bool has_running_compactions = + db_->GetIntProperty(DB::Properties::kNumRunningCompactions, + &running_compactions) && + running_compactions != 0; + + uint64_t pending_compactions = 0; + uint64_t pending_compaction_bytes = 0; + for (auto* column_family : column_families_) { + uint64_t cf_pending_compactions = 0; + if (db_->GetIntProperty(column_family, DB::Properties::kCompactionPending, + &cf_pending_compactions)) { + pending_compactions += cf_pending_compactions; + } + + uint64_t cf_pending_compaction_bytes = 0; + if (db_->GetIntProperty(column_family, + DB::Properties::kEstimatePendingCompactionBytes, + &cf_pending_compaction_bytes)) { + pending_compaction_bytes += cf_pending_compaction_bytes; + } + } + + return has_running_compactions || pending_compactions != 0 || + pending_compaction_bytes != 0; +} + Status StressTest::TestAbortAndResumeCompactions(ThreadState* thread) { + uint64_t successful_compactions = 0; + if (!thread->shared->TryBeginAbortAndResumeCompactions( + &successful_compactions)) { + return Status::OK(); + } + + if (!ShouldAbortAndResumeCompactions()) { + thread->shared->EndAbortAndResumeCompactions(); + return Status::OK(); + } + + // Each abort must be paid for by at least one completed non-aborted + // compaction, and concurrent abort/resume sections are serialized. This keeps + // aggressive random sampling from starving compactions and causing unrelated + // write stalls. + thread->shared->MarkAbortAndResumeCompactions(successful_compactions); + ScopedThreadOperation op(thread, + StressOperationType::kAbortResumeCompactions); + // Abort all running compactions and prevent new ones from starting db_->AbortAllCompactions(); // Sleep to allow other threads to attempt operations while aborted - // Uses same sleep pattern as TestPauseBackground and - // TestDisableManualCompaction + // Uses the same short-skewed pattern as TestPauseBackground, capped to avoid + // creating long compaction starvation windows. int pwr2_micros = std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); - clock_->SleepForMicroseconds(1 << pwr2_micros); + clock_->SleepForMicroseconds( + std::min(1 << pwr2_micros, kMaxAbortResumeCompactionsSleepMicros)); // Resume compactions db_->ResumeAllCompactions(); + thread->shared->EndAbortAndResumeCompactions(); return Status::OK(); } diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index a26687531edd..e7a1037e5ce6 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -390,6 +390,8 @@ class StressTest { Status TestDisableManualCompaction(ThreadState* thread); + bool ShouldAbortAndResumeCompactions() const; + Status TestAbortAndResumeCompactions(ThreadState* thread); void TestAcquireSnapshot(ThreadState* thread, int rand_column_family, diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 8707946166ba..6972a6e489bb 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -37,6 +37,7 @@ _TSAN_SUPPRESSIONS_FILE = os.path.abspath( os.path.join(os.path.dirname(__file__), "tsan_suppressions.txt") ) +DEFAULT_LIVENESS_TIMEOUT_SEC = 3600 def get_random_seed(override): @@ -116,20 +117,25 @@ def apply_random_seed_per_iteration(): # params overwrite priority: # for default: -# default_params < {blackbox,whitebox}_default_params < args +# default_params < {blackbox,whitebox,liveness}_default_params < args # for simple: -# default_params < {blackbox,whitebox}_default_params < +# default_params < {blackbox,whitebox,liveness}_default_params < # simple_default_params < # {blackbox,whitebox}_simple_default_params < args # for cf_consistency: -# default_params < {blackbox,whitebox}_default_params < +# default_params < {blackbox,whitebox,liveness}_default_params < # cf_consistency_params < args # for txn: -# default_params < {blackbox,whitebox}_default_params < txn_params < args +# default_params < {blackbox,whitebox,liveness}_default_params < +# txn_params < args # for ts: -# default_params < {blackbox,whitebox}_default_params < ts_params < args +# default_params < {blackbox,whitebox,liveness}_default_params < +# ts_params < args # for multiops_txn: -# default_params < {blackbox,whitebox}_default_params < multiops_txn_params < args +# default_params < {blackbox,whitebox,liveness}_default_params < +# multiops_txn_params < args +# Liveness mode reapplies liveness_fault_injection_params after args so the +# base liveness profile cannot accidentally turn fault injection back on. default_params = { @@ -242,7 +248,7 @@ def apply_random_seed_per_iteration(): "pause_background_one_in": lambda: random.choice([10000, 1000000]), "disable_file_deletions_one_in": lambda: random.choice([10000, 1000000]), "disable_manual_compaction_one_in": lambda: random.choice([10000, 1000000]), - "abort_and_resume_compactions_one_in": lambda: random.choice([10000, 1000000]), + "abort_and_resume_compactions_one_in": lambda: random.choice([0, 1000, 10000]), "prefix_size": lambda: random.choice([-1, 1, 5, 7, 8]), "prefixpercent": 5, "progress_reports": 0, @@ -642,6 +648,43 @@ def is_direct_io_supported(dbname): "reopen": 20, } +# Liveness mode inherits the normal randomized stress-test matrix and mixed +# workload by default. The C++ watchdog tracks the active operation and +# per-operation completion counts, so liveness does not need to force an +# all-write workload to keep the failure signal understandable. Keep fault +# injection disabled, though: injected read/open/write/sync/cache faults can +# legitimately put the DB into background-error recovery or a no-progress write +# stall for longer than the liveness timeout. Those cases are covered by other +# test modes. +# The Python wrapper owns duration, so keep db_stress's operation budget high +# enough for long liveness runs. The C++ abort/resume path is additionally +# gated on completed compaction progress and backs off when write stall pressure +# is already visible. +liveness_fault_injection_params = { + "error_recovery_with_no_fault_injection": 0, + "exclude_wal_from_write_fault_injection": 0, + "metadata_read_fault_one_in": 0, + "metadata_write_fault_one_in": 0, + "open_metadata_read_fault_one_in": 0, + "open_metadata_write_fault_one_in": 0, + "open_read_fault_one_in": 0, + "open_write_fault_one_in": 0, + "read_fault_one_in": 0, + "secondary_cache_fault_one_in": 0, + "sync_fault_injection": 0, + "write_fault_one_in": 0, +} + +liveness_default_params = { + "duration": DEFAULT_LIVENESS_TIMEOUT_SEC, + "enable_thread_tracking": 1, + "liveness_check_interval_sec": 1, + "liveness_no_progress_timeout_sec": 300, + "ops_per_thread": 100000000, + "progress_reports": 1, +} +liveness_default_params.update(liveness_fault_injection_params) + simple_default_params = { "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "column_families": 1, @@ -1579,6 +1622,8 @@ def gen_cmd_params(args): params.update(blackbox_default_params) if args.test_type == "whitebox": params.update(whitebox_default_params) + if args.test_type == "liveness": + params.update(liveness_default_params) if args.simple: params.update(simple_default_params) if args.test_type == "blackbox": @@ -1607,6 +1652,7 @@ def gen_cmd_params(args): if ( not args.test_best_efforts_recovery and not args.test_tiered_storage + and args.test_type != "liveness" and params.get("test_secondary", 0) == 0 and random.choice([0] * 9 + [1]) == 1 ): @@ -1632,6 +1678,8 @@ def gen_cmd_params(args): for k, v in vars(args).items(): if v is not None: params[k] = v + if args.test_type == "liveness": + params.update(liveness_fault_injection_params) return params @@ -1931,7 +1979,7 @@ def diagnostic_paths(finalized_params): ] -def execute_cmd(cmd, timeout=None, timeout_pstack=False): +def execute_cmd(cmd, timeout=None, timeout_pstack=False, expected_to_timeout=True): child = subprocess.Popen( cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=stress_cmd_env() ) @@ -1944,11 +1992,20 @@ def execute_cmd(cmd, timeout=None, timeout_pstack=False): try: outs, errs = child.communicate(timeout=timeout) hit_timeout = False - print("WARNING: db_stress ended before kill: exitcode=%d\n" % child.returncode) + if expected_to_timeout: + print( + "WARNING: db_stress ended before kill: exitcode=%d\n" + % child.returncode + ) + else: + print("db_stress ended: exitcode=%d\n" % child.returncode) except subprocess.TimeoutExpired: hit_timeout = True if timeout_pstack: - os.system("pstack %d" % pid) + try: + subprocess.call(["pstack", str(pid)], env=stress_cmd_env()) + except OSError as exc: + print("WARNING: failed to run pstack for pid %d: %s\n" % (pid, exc)) child.terminate() # SIGTERM -- triggers TerminationHandler try: outs, errs = child.communicate(timeout=3) @@ -2096,6 +2153,60 @@ def print_and_cleanup_fault_injection_log(pid): pass +def liveness_timeout(cmd_params): + duration = cmd_params.get("duration", 0) + if duration is None or duration <= 0: + return DEFAULT_LIVENESS_TIMEOUT_SEC + return duration + + +def liveness_main(args, unknown_args): + cmd_params = gen_cmd_params(args) + db_parent_dir = get_db_parent_dir("liveness") + num_dbs = cmd_params.get("num_dbs", 1) + + if not cmd_params.get("db"): + cmd_params["db"] = db_parent_dir + + apply_random_seed_per_iteration() + wrapper_timeout = liveness_timeout(cmd_params) + + print( + "Running liveness-test with \n" + + "liveness_check_interval_sec=" + + str(cmd_params["liveness_check_interval_sec"]) + + "\n" + + "liveness_no_progress_timeout_sec=" + + str(cmd_params["liveness_no_progress_timeout_sec"]) + + "\n" + + "wrapper-duration=" + + str(wrapper_timeout) + + "\n" + ) + + cmd, finalized_params = gen_cmd(dict(list(cmd_params.items())), unknown_args) + hit_timeout, retcode, outs, errs, pid = execute_cmd( + cmd, wrapper_timeout, False, False + ) + + print_and_cleanup_fault_injection_log(pid) + outs, errs = strip_expected_sigterm_stderr(outs, errs, hit_timeout) + print_run_output_and_exit_on_error(args, finalized_params, outs, errs) + + if hit_timeout: + if retcode == -9: + print("TEST FAILED. db_stress ignored SIGTERM on wrapper timeout\n") + sys.exit(1) + print("db_stress reached liveness wrapper duration\n") + cleanup_after_success(cmd_params["db"], num_dbs) + return + if retcode != 0: + print("TEST FAILED. db_stress exited with code %d\n" % retcode) + sys.exit(1) + + cleanup_after_success(cmd_params["db"], num_dbs) + + # This script runs and kills db_stress multiple times. It checks consistency # in case of unsafe crashes in RocksDB. def blackbox_crash_main(args, unknown_args): @@ -2364,7 +2475,7 @@ def main(): description="This script runs and kills \ db_stress multiple times" ) - parser.add_argument("test_type", choices=["blackbox", "whitebox"]) + parser.add_argument("test_type", choices=["blackbox", "whitebox", "liveness"]) parser.add_argument("--simple", action="store_true") parser.add_argument("--cf_consistency", action="store_true") parser.add_argument("--txn", action="store_true") @@ -2381,6 +2492,7 @@ def main(): list(default_params.items()) + list(blackbox_default_params.items()) + list(whitebox_default_params.items()) + + list(liveness_default_params.items()) + list(simple_default_params.items()) + list(blackbox_simple_default_params.items()) + list(whitebox_simple_default_params.items()) @@ -2422,6 +2534,8 @@ def main(): blackbox_crash_main(args, unknown_args) if args.test_type == "whitebox": whitebox_crash_main(args, unknown_args) + if args.test_type == "liveness": + liveness_main(args, unknown_args) # Delete the expected values base dir if test passes. # Per-DB subdirectories (db_0, db_1, ...) live under the base, # so rmtree of the parent cleans everything. diff --git a/tools/db_crashtest_test.py b/tools/db_crashtest_test.py index f72e5814380f..d2dd133805f6 100644 --- a/tools/db_crashtest_test.py +++ b/tools/db_crashtest_test.py @@ -11,6 +11,7 @@ import sys import tempfile import unittest +from types import SimpleNamespace _DB_CRASHTEST_PATH = os.path.join(os.path.dirname(__file__), "db_crashtest.py") @@ -73,6 +74,22 @@ def build_params(self, base_params, overrides=None): params.update(overrides) return params + def build_mode_args(self, test_type="liveness", **overrides): + args = { + "test_type": test_type, + "simple": False, + "cf_consistency": False, + "txn": False, + "optimistic_txn": False, + "test_best_efforts_recovery": False, + "enable_ts": False, + "test_multiops_txn": False, + "test_tiered_storage": False, + "print_stderr_separately": False, + } + args.update(overrides) + return SimpleNamespace(**args) + def test_stress_cmd_env_defaults_tsan_suppressions(self): os.environ.pop(_TSAN_OPTIONS_ENV_VAR, None) db_crashtest = self.load_db_crashtest() @@ -352,6 +369,148 @@ def test_build_out_of_space_diagnostics_summarizes_directory_suffixes(self): diagnostics, ) + def test_liveness_params_keep_mixed_workload_and_enable_watchdog(self): + db_crashtest = self.load_db_crashtest() + + # Liveness mode should inherit the normal mixed workload. The C++ + # watchdog tracks per-thread active operation type, so Python does not + # need to force an all-write workload to keep the signal readable. + params = db_crashtest.gen_cmd_params(self.build_mode_args()) + params["db"] = self.test_tmpdir + finalized_params = db_crashtest.finalize_and_sanitize(params) + + self.assertEqual(db_crashtest.DEFAULT_LIVENESS_TIMEOUT_SEC, params["duration"]) + self.assertEqual(1, params["liveness_check_interval_sec"]) + self.assertEqual(300, params["liveness_no_progress_timeout_sec"]) + self.assertEqual(1, params["enable_thread_tracking"]) + self.assertEqual(1, params["progress_reports"]) + self.assertEqual(100000000, params["ops_per_thread"]) + for fault_param in [ + "error_recovery_with_no_fault_injection", + "exclude_wal_from_write_fault_injection", + "metadata_read_fault_one_in", + "metadata_write_fault_one_in", + "open_metadata_read_fault_one_in", + "open_metadata_write_fault_one_in", + "open_read_fault_one_in", + "open_write_fault_one_in", + "read_fault_one_in", + "secondary_cache_fault_one_in", + "sync_fault_injection", + "write_fault_one_in", + ]: + self.assertEqual(0, finalized_params[fault_param]) + self.assertIn( + finalized_params["abort_and_resume_compactions_one_in"], + [0, 1000, 10000], + ) + self.assertEqual(db_crashtest.default_params["readpercent"], params["readpercent"]) + self.assertEqual( + db_crashtest.default_params["prefixpercent"], params["prefixpercent"] + ) + self.assertEqual( + db_crashtest.default_params["writepercent"], params["writepercent"] + ) + self.assertGreaterEqual(params["writepercent"], 20) + self.assertEqual(db_crashtest.default_params["delpercent"], params["delpercent"]) + self.assertEqual( + db_crashtest.default_params["delrangepercent"], params["delrangepercent"] + ) + self.assertEqual(db_crashtest.default_params["iterpercent"], params["iterpercent"]) + + params = db_crashtest.gen_cmd_params( + self.build_mode_args(read_fault_one_in=32, sync_fault_injection=1) + ) + self.assertEqual(0, params["read_fault_one_in"]) + self.assertEqual(0, params["sync_fault_injection"]) + + def test_liveness_command_passes_watchdog_flags_not_wrapper_duration(self): + db_crashtest = self.load_db_crashtest() + check_interval_sec = 2 + no_progress_timeout_sec = 7 + wrapper_duration_sec = 17 + args = self.build_mode_args( + duration=wrapper_duration_sec, + liveness_check_interval_sec=check_interval_sec, + liveness_no_progress_timeout_sec=no_progress_timeout_sec, + ) + params = db_crashtest.gen_cmd_params(args) + params["db"] = self.test_tmpdir + + # The Python wrapper owns duration; db_stress only needs the watchdog + # interval and no-progress timeout flags. + cmd, _ = db_crashtest.gen_cmd(params, []) + cmd_flags = {} + for arg in cmd: + if arg.startswith("--") and "=" in arg: + key, value = arg[2:].split("=", 1) + cmd_flags[key] = value + + self.assertEqual( + str(check_interval_sec), cmd_flags["liveness_check_interval_sec"] + ) + self.assertEqual( + str(no_progress_timeout_sec), + cmd_flags["liveness_no_progress_timeout_sec"], + ) + self.assertNotIn("duration", cmd_flags) + + def test_liveness_timeout_zero_uses_default_duration(self): + db_crashtest = self.load_db_crashtest() + + # A zero/None duration still needs a finite wrapper runtime. + self.assertEqual( + db_crashtest.DEFAULT_LIVENESS_TIMEOUT_SEC, + db_crashtest.liveness_timeout({"duration": 0}), + ) + self.assertEqual( + db_crashtest.DEFAULT_LIVENESS_TIMEOUT_SEC, + db_crashtest.liveness_timeout({"duration": None}), + ) + self.assertEqual(30, db_crashtest.liveness_timeout({"duration": 30})) + + def test_liveness_wrapper_timeout_is_successful_end_of_run(self): + db_crashtest = self.load_db_crashtest() + execute_calls = [] + cleanups = [] + + def fake_execute_cmd(cmd, timeout=None, timeout_pstack=False, expected_to_timeout=True): + execute_calls.append((cmd, timeout, timeout_pstack, expected_to_timeout)) + return ( + True, + -15, + "Received signal 15 (Terminated)\n", + "", + 123, + ) + + db_crashtest.execute_cmd = fake_execute_cmd + db_crashtest.print_and_cleanup_fault_injection_log = lambda pid: None + db_crashtest.cleanup_after_success = lambda db_arg, num_dbs=1: cleanups.append( + (db_arg, num_dbs) + ) + + db_crashtest.liveness_main(self.build_mode_args(duration=17), []) + + self.assertEqual(1, len(execute_calls)) + self.assertEqual(17, execute_calls[0][1]) + self.assertFalse(execute_calls[0][2]) + self.assertFalse(execute_calls[0][3]) + self.assertEqual(1, len(cleanups)) + + def test_liveness_can_target_transaction_lock_manager(self): + db_crashtest = self.load_db_crashtest() + + # Transaction mode remains composable with liveness mode, which lets + # the same watchdog cover point-lock-manager deadlocks/livelocks. + params = db_crashtest.gen_cmd_params(self.build_mode_args(txn=True)) + + self.assertEqual(1, params["use_txn"]) + self.assertEqual(0, params["use_optimistic_txn"]) + self.assertIn("use_per_key_point_lock_mgr", params) + self.assertEqual(0, params.get("kill_random_test", 0)) + self.assertGreater(params["liveness_no_progress_timeout_sec"], 0) + if __name__ == "__main__": unittest.main() From 00526b60e70e06424de84c725106ec40b92af7ed Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Sun, 7 Jun 2026 15:06:52 -0700 Subject: [PATCH 2/5] Fix lost flush request after partial memtable pick A flush request can be capped at an older memtable while newer immutable memtables are added. PickMemtablesToFlush() could pick only part of the pending list and clear flush_requested_, causing the newer queued flush request to be skipped and leaving an immutable memtable unflushed. Keep flush_requested_ until all unstarted immutable memtables have been picked, and add unit and DB-level regressions for the lost-request case. --- db/external_sst_file_test.cc | 105 +++++++++++++++++++++++++++++++++++ db/memtable_list.cc | 11 ++-- db/memtable_list.h | 10 ++-- db/memtable_list_test.cc | 50 ++++++++++++++++- 4 files changed, 164 insertions(+), 12 deletions(-) diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index c4cc09797af2..cf73c6758774 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -5,6 +5,7 @@ #include +#include #include #include #include @@ -1769,6 +1770,110 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { ASSERT_OK(DeprecatedAddFile({file_path})); } +TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { + Options options = CurrentOptions(); + options.atomic_flush = false; + options.disable_auto_compactions = true; + options.write_buffer_size = 1024; + options.max_write_buffer_number = 8; + options.min_write_buffer_number_to_merge = 2; + env_->SetBackgroundThreads(1, Env::HIGH); + Reopen(options); + + std::string external_file_path; + std::vector> external_file_data = { + {"ingest_key", "ingested_value"}}; + ASSERT_OK(GenerateOneExternalFile(options, nullptr, external_file_data, -1, + true /* sort_data */, &external_file_path, + nullptr /* true_data */)); + + auto sleeping_task = std::make_shared(); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + sleeping_task.get(), Env::Priority::HIGH); + sleeping_task->WaitUntilSleeping(); + + std::atomic atomic_flush_scheduled{0}; + std::atomic ingest_waiting_for_flush{false}; + std::atomic ingest_done{false}; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::AtomicFlushMemTables:AfterScheduleFlush", [&](void*) { + atomic_flush_scheduled.fetch_add(1, std::memory_order_acq_rel); + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTable:BeforeWaitForBgFlush", [&](void*) { + ingest_waiting_for_flush.store(true, std::memory_order_release); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("atomic_flush_key", "value")); + + Status get_live_files_status[2]; + std::vector live_files[2]; + port::Thread get_live_files_thread_1([&] { + LiveFilesStorageInfoOptions lfsi_opts; + lfsi_opts.wal_size_for_flush = 0; + lfsi_opts.atomic_flush = true; + get_live_files_status[0] = + db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[0]); + }); + port::Thread get_live_files_thread_2([&] { + LiveFilesStorageInfoOptions lfsi_opts; + lfsi_opts.wal_size_for_flush = 0; + lfsi_opts.atomic_flush = true; + get_live_files_status[1] = + db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[1]); + }); + + while (atomic_flush_scheduled.load(std::memory_order_acquire) < 2) { + env_->SleepForMicroseconds(1000); + } + + ASSERT_OK(Put("ingest_key", "memtable_value")); + + Status ingest_status; + port::Thread ingest_thread([&] { + IngestExternalFileOptions ifo; + ingest_status = db_->IngestExternalFile({external_file_path}, ifo); + ingest_done.store(true, std::memory_order_release); + }); + + while (!ingest_waiting_for_flush.load(std::memory_order_acquire) && + !ingest_done.load(std::memory_order_acquire)) { + env_->SleepForMicroseconds(1000); + } + ASSERT_FALSE(ingest_done.load(std::memory_order_acquire)); + + sleeping_task->WakeUp(); + sleeping_task->WaitUntilDone(); + get_live_files_thread_1.join(); + get_live_files_thread_2.join(); + ASSERT_OK(get_live_files_status[0]); + ASSERT_OK(get_live_files_status[1]); + + for (int i = 0; i < 500 && !ingest_done.load(std::memory_order_acquire); + ++i) { + env_->SleepForMicroseconds(10000); + } + + if (!ingest_done.load(std::memory_order_acquire)) { + dbfull()->CancelAllBackgroundWork(false /* wait */); + ingest_thread.join(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + FAIL() << "IngestExternalFile remained blocked waiting for a flush that " + "was deduped behind an older queued flush request"; + } + + ingest_thread.join(); + ASSERT_OK(ingest_status); + ASSERT_EQ("ingested_value", Get("ingest_key")); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->LoadDependency( diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 9e99c74db6e8..42c9b3b47215 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -416,7 +416,6 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); const auto& memlist = current_->memlist_; - bool atomic_flush = false; // Note: every time MemTableList::Add(mem) is called, it adds the new mem // at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by @@ -427,9 +426,6 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, auto it = memlist.rbegin(); for (; it != memlist.rend(); ++it) { ReadOnlyMemTable* m = *it; - if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { - atomic_flush = true; - } if (m->GetID() > max_memtable_id) { break; } @@ -460,7 +456,12 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, // since they map to the same WAL and have the same NextLogNumber(). assert(strcmp((*it)->Name(), "WBWIMemTable") != 0); } - if (!atomic_flush || num_flush_not_started_ == 0) { + // The flush request is complete only after every unstarted immutable + // memtable has been picked. A request can be older than a newer immutable + // memtable; clearing flush_requested_ after a partial or empty pick would + // make the queued newer request look unnecessary and leave that memtable + // unflushed. + if (num_flush_not_started_ == 0) { flush_requested_ = false; // start-flush request is complete } } diff --git a/db/memtable_list.h b/db/memtable_list.h index a8e36550a531..ddea8a53f32b 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -362,11 +362,11 @@ class MemTableList { // Returns an estimate of the timestamp of the earliest key. uint64_t ApproximateOldestKeyTime() const; - // Request a flush of all existing memtables to storage. This will - // cause future calls to IsFlushPending() to return true if this list is - // non-empty (regardless of the min_write_buffer_number_to_merge - // parameter). This flush request will persist until the next time - // PickMemtablesToFlush() is called. + // Request a flush of all existing memtables to storage. This will cause + // future calls to IsFlushPending() to return true if this list is non-empty + // (regardless of the min_write_buffer_number_to_merge parameter). This flush + // request will persist until PickMemtablesToFlush() has picked all unstarted + // memtables. void FlushRequested() { flush_requested_ = true; // If there are some memtables stored in imm() that don't trigger diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index e5e38984d060..a2a3d1763478 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -890,8 +890,8 @@ TEST_F(MemTableListTest, FlushPendingTest) { ASSERT_TRUE(to_flush5.empty()); ASSERT_EQ(1, list.NumNotFlushed()); ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire)); - ASSERT_FALSE(list.IsFlushPending()); - ASSERT_FALSE(list.HasFlushRequested()); + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.HasFlushRequested()); // Pick tables to flush. The tables to pick must have ID smaller than or // equal to 5. Therefore, only tables[5] will be selected. @@ -920,6 +920,52 @@ TEST_F(MemTableListTest, FlushPendingTest) { to_delete.clear(); } +TEST_F(MemTableListTest, FlushRequestPersistsAfterPartialPick) { + SequenceNumber seq = 1; + + auto factory = std::make_shared(); + options.memtable_factory = factory; + ImmutableOptions ioptions(options); + InternalKeyComparator cmp(BytewiseComparator()); + WriteBufferManager wb(options.db_write_buffer_size); + MutableCFOptions mutable_cf_options(options); + autovector to_delete; + + MemTableList list(10 /* min_write_buffer_number_to_merge */, + 0 /* max_write_buffer_size_to_maintain */); + + for (uint64_t memtable_id = 0; memtable_id < 2; ++memtable_id) { + MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb, + kMaxSequenceNumber, 0 /* column_family_id */); + mem->SetID(memtable_id); + mem->Ref(); + ASSERT_OK(mem->Add(++seq, kTypeValue, "key" + std::to_string(memtable_id), + "value", nullptr /* kv_prot_info */)); + list.Add(mem, &to_delete); + } + + list.FlushRequested(); + ASSERT_TRUE(list.IsFlushPending()); + + autovector to_flush; + list.PickMemtablesToFlush(0 /* max_memtable_id */, &to_flush); + ASSERT_EQ(1, to_flush.size()); + ASSERT_TRUE(list.IsFlushPending()); + ASSERT_TRUE(list.HasFlushRequested()); + + autovector to_flush2; + list.PickMemtablesToFlush(1 /* max_memtable_id */, &to_flush2); + ASSERT_EQ(1, to_flush2.size()); + ASSERT_FALSE(list.IsFlushPending()); + ASSERT_FALSE(list.HasFlushRequested()); + + list.current()->Unref(&to_delete); + ASSERT_EQ(2, to_delete.size()); + for (ReadOnlyMemTable* m : to_delete) { + delete m; + } +} + TEST_F(MemTableListTest, EmptyAtomicFlushTest) { autovector lists; autovector cf_ids; From e4d90c486f42f866fd3914419ab74c2a0881ae1f Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Mon, 8 Jun 2026 09:10:53 -0700 Subject: [PATCH 3/5] refine scope tracking --- db_stress_tool/db_stress_shared_state.cc | 14 ++++- db_stress_tool/db_stress_shared_state.h | 73 +++++++++++++++++++----- db_stress_tool/db_stress_test_base.cc | 49 ++++++++-------- 3 files changed, 95 insertions(+), 41 deletions(-) diff --git a/db_stress_tool/db_stress_shared_state.cc b/db_stress_tool/db_stress_shared_state.cc index 12b6b312f742..1f5e3a301ea4 100644 --- a/db_stress_tool/db_stress_shared_state.cc +++ b/db_stress_tool/db_stress_shared_state.cc @@ -123,12 +123,20 @@ SharedState::SharedState(Env* env, StressTest* stress_test) } } -void SharedState::StartOperation(uint32_t tid, StressOperationType type) { +bool SharedState::PushOperation(uint32_t tid, StressOperationType type) { assert(tid < static_cast(num_threads_)); ThreadOperationState& state = thread_operation_states_[tid]; - state.started_micros.store(env_->NowMicros(), std::memory_order_relaxed); - state.active_type.store(static_cast(type), + const uint32_t depth = state.depth.load(std::memory_order_relaxed); + assert(depth < kMaxThreadOperationStackDepth); + if (depth >= kMaxThreadOperationStackDepth) { + return false; + } + ThreadOperationFrame& frame = state.frames[depth]; + frame.started_micros.store(env_->NowMicros(), std::memory_order_relaxed); + frame.active_type.store(static_cast(type), std::memory_order_release); + state.depth.store(depth + 1, std::memory_order_release); + return true; } bool SharedState::ShouldVerifyAtBeginning() const { diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index 5abb93997fe6..b2203fa73fd9 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -179,8 +179,8 @@ struct ThreadOperationSnapshot { uint64_t started_micros; }; -struct ThreadOperationState { - ThreadOperationState() +struct ThreadOperationFrame { + ThreadOperationFrame() : active_type(static_cast(StressOperationType::kNone)), started_micros(0) {} @@ -188,6 +188,15 @@ struct ThreadOperationState { std::atomic started_micros; }; +static constexpr size_t kMaxThreadOperationStackDepth = 16; + +struct ThreadOperationState { + ThreadOperationState() : depth(0) {} + + std::array frames; + std::atomic depth; +}; + struct RemoteCompactionQueueItem { std::string job_id; CompactionServiceJobInfo job_info; @@ -347,25 +356,51 @@ class SharedState { abort_resume_compactions_running_.store(false, std::memory_order_release); } - void StartOperation(uint32_t tid, StressOperationType type); + bool PushOperation(uint32_t tid, StressOperationType type); + + bool PopOperation(uint32_t tid, StressOperationType type) { + assert(tid < static_cast(num_threads_)); + ThreadOperationState& state = thread_operation_states_[tid]; + const uint32_t depth = state.depth.load(std::memory_order_relaxed); + assert(depth > 0); + if (depth == 0 || depth > kMaxThreadOperationStackDepth) { + return false; + } + const uint32_t top_index = depth - 1; + const uint32_t active_type = + state.frames[top_index].active_type.load(std::memory_order_acquire); + assert(active_type == static_cast(type)); + if (active_type != static_cast(type)) { + return false; + } + state.depth.store(top_index, std::memory_order_release); + state.frames[top_index].active_type.store( + static_cast(StressOperationType::kNone), + std::memory_order_relaxed); + state.frames[top_index].started_micros.store(0, std::memory_order_relaxed); + return true; + } void ClearOperation(uint32_t tid) { assert(tid < static_cast(num_threads_)); ThreadOperationState& state = thread_operation_states_[tid]; - state.active_type.store(static_cast(StressOperationType::kNone), - std::memory_order_release); - state.started_micros.store(0, std::memory_order_relaxed); + state.depth.store(0, std::memory_order_release); } ThreadOperationSnapshot GetThreadOperationSnapshot(uint32_t tid) const { assert(tid < static_cast(num_threads_)); const ThreadOperationState& state = thread_operation_states_[tid]; // This is a diagnostic snapshot, not one atomic value. A concurrent - // start/clear can produce a torn pair; timeout checks must treat `kNone` + // push/pop/clear can produce a torn pair; timeout checks must treat `kNone` // and `started_micros == 0` as no active operation. + const uint32_t depth = state.depth.load(std::memory_order_acquire); + if (depth == 0 || depth > kMaxThreadOperationStackDepth) { + return {StressOperationType::kNone, 0}; + } + const ThreadOperationFrame& frame = state.frames[depth - 1]; const auto type = static_cast( - state.active_type.load(std::memory_order_acquire)); - return {type, state.started_micros.load(std::memory_order_relaxed)}; + frame.active_type.load(std::memory_order_acquire)); + return {type, frame.started_micros.load(std::memory_order_relaxed)}; } void SetShouldStopTest() { should_stop_test_.store(true); } @@ -719,10 +754,18 @@ struct ThreadState { FLAGS_liveness_no_progress_timeout_sec > 0; } - void StartOperation(StressOperationType type) { + bool PushOperation(StressOperationType type) { if (LivenessTrackingEnabled()) { - shared->StartOperation(tid, type); + return shared->PushOperation(tid, type); } + return false; + } + + bool PopOperation(StressOperationType type) { + if (LivenessTrackingEnabled()) { + return shared->PopOperation(tid, type); + } + return false; } void ClearOperation() { @@ -737,13 +780,17 @@ struct ThreadState { } } - void FinishedSingleOp() { + void FinishSingleOpWhileOperationActive() { stats.FinishedSingleOp(); if (LivenessTrackingEnabled()) { - ClearOperation(); shared->IncFinishedOps(); } } + + void FinishedSingleOp() { + FinishSingleOpWhileOperationActive(); + ClearOperation(); + } }; } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 5682b883b71a..eca657275747 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -53,33 +53,36 @@ constexpr int kMaxAbortResumeCompactionsSleepMicros = 3 * 1000 * 1000; class ScopedThreadOperation { public: - enum class FinishAction { kRestorePrepare, kKeepActive, kClear }; + enum class FinishAction { kPop, kFinishSingleOp }; - ScopedThreadOperation( - ThreadState* thread, StressOperationType type, - FinishAction finish_action = FinishAction::kRestorePrepare) - : thread_(thread), type_(type), finish_action_(finish_action) { - thread_->StartOperation(type); + ScopedThreadOperation(ThreadState* thread, StressOperationType type, + FinishAction finish_action = FinishAction::kPop) + : thread_(thread), + type_(type), + finish_action_(finish_action), + pushed_(false) { + pushed_ = thread_->PushOperation(type); } ~ScopedThreadOperation() { thread_->CompletedOpForDiagnostics(type_); switch (finish_action_) { - case FinishAction::kRestorePrepare: - thread_->StartOperation(StressOperationType::kPrepare); + case FinishAction::kPop: break; - case FinishAction::kClear: - thread_->ClearOperation(); - break; - case FinishAction::kKeepActive: + case FinishAction::kFinishSingleOp: + thread_->FinishSingleOpWhileOperationActive(); break; } + if (pushed_) { + thread_->PopOperation(type_); + } } private: ThreadState* thread_; StressOperationType type_; FinishAction finish_action_; + bool pushed_; }; std::shared_ptr CreateFilterPolicy() { @@ -1275,7 +1278,7 @@ void StressTest::OperateDb(ThreadState* thread) { if (open_cnt != 0) { ScopedThreadOperation op( thread, StressOperationType::kReopen, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); MutexLock l(thread->shared->GetMutex()); while (!thread->snapshot_queue.empty()) { db_->ReleaseSnapshot(thread->snapshot_queue.front().second.snapshot); @@ -1294,7 +1297,6 @@ void StressTest::OperateDb(ThreadState* thread) { if (FLAGS_use_trie_index && udi_factory_) { read_opts.table_index_factory = udi_factory_.get(); } - thread->FinishedSingleOp(); } #ifndef NDEBUG @@ -1337,9 +1339,7 @@ void StressTest::OperateDb(ThreadState* thread) { if (thread->shared->HasVerificationFailedYet()) { break; } - ScopedThreadOperation iteration_op( - thread, StressOperationType::kPrepare, - ScopedThreadOperation::FinishAction::kClear); + ScopedThreadOperation iteration_op(thread, StressOperationType::kPrepare); // Change Options if (thread->rand.OneInOpt(FLAGS_set_options_one_in)) { @@ -1680,7 +1680,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION read ScopedThreadOperation op( thread, StressOperationType::kRead, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking); if (FLAGS_use_multi_get_entity) { constexpr uint64_t max_batch_size = 64; @@ -1726,7 +1726,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION prefix scan ScopedThreadOperation op( thread, StressOperationType::kPrefixScan, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same @@ -1737,7 +1737,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION write ScopedThreadOperation op( thread, StressOperationType::kWrite, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1751,7 +1751,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION delete ScopedThreadOperation op( thread, StressOperationType::kDelete, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1764,7 +1764,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION delete range ScopedThreadOperation op( thread, StressOperationType::kDeleteRange, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); if (disable_fault_injection_during_user_write) { db_fault_injection_fs_->DisableAllThreadLocalErrorInjection(); } @@ -1777,7 +1777,7 @@ void StressTest::OperateDb(ThreadState* thread) { // OPERATION iterate ScopedThreadOperation op( thread, StressOperationType::kIterate, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); if (FLAGS_use_multiscan) { int num_seeks = static_cast( std::min(static_cast(thread->rand.Uniform(64)), @@ -1827,10 +1827,9 @@ void StressTest::OperateDb(ThreadState* thread) { assert(iterate_bound <= prob_op); ScopedThreadOperation op( thread, StressOperationType::kCustom, - ScopedThreadOperation::FinishAction::kKeepActive); + ScopedThreadOperation::FinishAction::kFinishSingleOp); TestCustomOperations(thread, rand_column_families); } - thread->FinishedSingleOp(); } #ifndef NDEBUG From 1937126b5fdfde4f61a6d56fb2b8a70f92c522a8 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Fri, 26 Jun 2026 05:35:38 -0700 Subject: [PATCH 4/5] address feedback --- db/external_sst_file_test.cc | 166 ++++++++++++++++++++---- db_stress_tool/db_stress_common.cc | 16 ++- db_stress_tool/db_stress_shared_state.h | 4 +- db_stress_tool/db_stress_test_base.cc | 16 +-- 4 files changed, 166 insertions(+), 36 deletions(-) diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 8aa8d6aa4116..52ee34462941 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -2416,6 +2416,13 @@ TEST_F(ExternalSSTFileTest, SstFileWriterNonSharedKeys) { } TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { + // These waits only prevent broken test paths from hanging forever. Keep them + // large enough to avoid treating slow CI scheduling as a regression. + constexpr uint64_t kHangGuardMicros = 30 * 1000 * 1000; + constexpr uint64_t kSetupWaitMicros = kHangGuardMicros; + constexpr uint64_t kIngestWaitMicros = kHangGuardMicros; + constexpr uint64_t kPollMicros = 1000; + Options options = CurrentOptions(); options.atomic_flush = false; options.disable_auto_compactions = true; @@ -2438,6 +2445,7 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { sleeping_task->WaitUntilSleeping(); std::atomic atomic_flush_scheduled{0}; + std::atomic get_live_files_done{0}; std::atomic ingest_waiting_for_flush{false}; std::atomic ingest_done{false}; SyncPoint::GetInstance()->DisableProcessing(); @@ -2452,6 +2460,25 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { }); SyncPoint::GetInstance()->EnableProcessing(); + auto wait_until = [&](const std::function& predicate, + uint64_t timeout_micros) { + for (uint64_t waited_micros = 0; waited_micros < timeout_micros; + waited_micros += kPollMicros) { + if (predicate()) { + return true; + } + env_->SleepForMicroseconds(kPollMicros); + } + return predicate(); + }; + + auto wake_sleeping_task = [&] { + if (!sleeping_task->WokenUp()) { + sleeping_task->WakeUp(); + } + return !sleeping_task->TimedWaitUntilDone(kSetupWaitMicros); + }; + ASSERT_OK(Put("atomic_flush_key", "value")); Status get_live_files_status[2]; @@ -2462,6 +2489,7 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { lfsi_opts.atomic_flush = true; get_live_files_status[0] = db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[0]); + get_live_files_done.fetch_add(1, std::memory_order_acq_rel); }); port::Thread get_live_files_thread_2([&] { LiveFilesStorageInfoOptions lfsi_opts; @@ -2469,10 +2497,46 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { lfsi_opts.atomic_flush = true; get_live_files_status[1] = db_->GetLiveFilesStorageInfo(lfsi_opts, &live_files[1]); + get_live_files_done.fetch_add(1, std::memory_order_acq_rel); }); - while (atomic_flush_scheduled.load(std::memory_order_acquire) < 2) { - env_->SleepForMicroseconds(1000); + const bool saw_both_atomic_flushes = wait_until( + [&] { + return atomic_flush_scheduled.load(std::memory_order_acquire) >= 2; + }, + kSetupWaitMicros); + + if (!saw_both_atomic_flushes) { + const bool sleeping_task_done = wake_sleeping_task(); + if (!wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros)) { + dbfull()->CancelAllBackgroundWork(false /* wait */); + wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros); + if (get_live_files_done.load(std::memory_order_acquire) != 2) { + dbfull()->CancelAllBackgroundWork(true /* wait */); + wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros); + } + } + get_live_files_thread_1.join(); + get_live_files_thread_2.join(); + ASSERT_TRUE(saw_both_atomic_flushes) + << "Timed out waiting for both GetLiveFilesStorageInfo calls to " + "schedule their flushes. scheduled=" + << atomic_flush_scheduled.load(std::memory_order_acquire) + << " get_live_files_done=" + << get_live_files_done.load(std::memory_order_acquire) + << " sleeping_task_done=" << sleeping_task_done; } ASSERT_OK(Put("ingest_key", "memtable_value")); @@ -2484,39 +2548,97 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { ingest_done.store(true, std::memory_order_release); }); - while (!ingest_waiting_for_flush.load(std::memory_order_acquire) && - !ingest_done.load(std::memory_order_acquire)) { - env_->SleepForMicroseconds(1000); + const bool saw_ingest_wait = wait_until( + [&] { + return ingest_waiting_for_flush.load(std::memory_order_acquire) || + ingest_done.load(std::memory_order_acquire); + }, + kSetupWaitMicros); + const bool ingest_finished_before_wait = + ingest_done.load(std::memory_order_acquire); + + if (!saw_ingest_wait || ingest_finished_before_wait) { + const bool sleeping_task_done = wake_sleeping_task(); + if (!wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros)) { + dbfull()->CancelAllBackgroundWork(false /* wait */); + wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros); + if (get_live_files_done.load(std::memory_order_acquire) != 2) { + dbfull()->CancelAllBackgroundWork(true /* wait */); + wait_until( + [&] { + return get_live_files_done.load(std::memory_order_acquire) == 2; + }, + kSetupWaitMicros); + } + } + get_live_files_thread_1.join(); + get_live_files_thread_2.join(); + if (!wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kSetupWaitMicros)) { + dbfull()->CancelAllBackgroundWork(false /* wait */); + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kSetupWaitMicros); + if (!ingest_done.load(std::memory_order_acquire)) { + dbfull()->CancelAllBackgroundWork(true /* wait */); + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kSetupWaitMicros); + } + } + ingest_thread.join(); + ASSERT_TRUE(sleeping_task_done); + ASSERT_TRUE(saw_ingest_wait) + << "Timed out waiting for IngestExternalFile to enter its flush wait"; + ASSERT_FALSE(ingest_finished_before_wait) + << "IngestExternalFile finished before entering the expected flush " + "wait"; } - ASSERT_FALSE(ingest_done.load(std::memory_order_acquire)); - sleeping_task->WakeUp(); - sleeping_task->WaitUntilDone(); + const bool sleeping_task_done = wake_sleeping_task(); get_live_files_thread_1.join(); get_live_files_thread_2.join(); - ASSERT_OK(get_live_files_status[0]); - ASSERT_OK(get_live_files_status[1]); - for (int i = 0; i < 500 && !ingest_done.load(std::memory_order_acquire); - ++i) { - env_->SleepForMicroseconds(10000); + const bool ingest_completed_without_unblock = + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kIngestWaitMicros); + Status unblock_flush_status; + + if (!ingest_completed_without_unblock) { + FlushOptions unblock_flush_options; + unblock_flush_options.wait = false; + unblock_flush_status = db_->Flush(unblock_flush_options); + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kIngestWaitMicros); } - if (!ingest_done.load(std::memory_order_acquire)) { dbfull()->CancelAllBackgroundWork(false /* wait */); - ingest_thread.join(); - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); - FAIL() << "IngestExternalFile remained blocked waiting for a flush that " - "was deduped behind an older queued flush request"; + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kIngestWaitMicros); + if (!ingest_done.load(std::memory_order_acquire)) { + dbfull()->CancelAllBackgroundWork(true /* wait */); + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kIngestWaitMicros); + } } ingest_thread.join(); + ASSERT_TRUE(sleeping_task_done); + ASSERT_OK(get_live_files_status[0]); + ASSERT_OK(get_live_files_status[1]); + if (!ingest_completed_without_unblock) { + ASSERT_OK(unblock_flush_status); + FAIL() << "IngestExternalFile remained blocked waiting for a flush that " + "was deduped behind an older queued flush request"; + } ASSERT_OK(ingest_status); ASSERT_EQ("ingested_value", Get("ingest_key")); - - SyncPoint::GetInstance()->DisableProcessing(); - SyncPoint::GetInstance()->ClearAllCallBacks(); } TEST_F(ExternalSSTFileTest, WithUnorderedWrite) { diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 4c3818da6bee..fa8a770a3eba 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -244,13 +244,21 @@ bool HasStuckWriteOperation(SharedState* shared, uint64_t now, for (uint32_t tid = 0; tid < shared->GetNumThreads(); ++tid) { const ThreadOperationSnapshot snapshot = shared->GetThreadOperationSnapshot(tid); + const uint64_t elapsed_micros = ElapsedMicros(now, snapshot.started_micros); if (snapshot.type == StressOperationType::kWrite && - snapshot.started_micros != 0 && - ElapsedMicros(now, snapshot.started_micros) >= timeout_micros) { + snapshot.started_micros != 0 && elapsed_micros >= timeout_micros) { + const uint64_t elapsed_seconds = + std::chrono::duration_cast(UIntMicros(elapsed_micros)) + .count(); + const uint64_t timeout_seconds = + std::chrono::duration_cast(UIntMicros(timeout_micros)) + .count(); fprintf(stderr, "Liveness watchdog detected a stuck write operation on thread " - "%" PRIu32 " for %" PRIu64 " seconds.\n", - tid, FLAGS_liveness_no_progress_timeout_sec); + "%" PRIu32 " for %" PRIu64 + " seconds. active_op_elapsed_micros=%" PRIu64 + " active_op_timeout_seconds=%" PRIu64 "\n", + tid, elapsed_seconds, elapsed_micros, timeout_seconds); return true; } } diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index b2203fa73fd9..c9d8e014093f 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -625,14 +625,14 @@ class SharedState { uint64_t GetStartTimestamp() const { return start_timestamp_; } - void SafeTerminate() { + [[noreturn]] void SafeTerminate() { // Grab mutex so that we don't call terminate while another thread is // attempting to print a stack trace due to the first one. MutexLock l(&mu_); std::terminate(); } - void TerminateWithoutMutex() { + [[noreturn]] void TerminateWithoutMutex() { // The liveness watchdog must still terminate when the no-progress failure // mode itself is holding the shared harness mutex. std::terminate(); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 740b416de9c4..44e091a5c1dc 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -72,15 +72,15 @@ class ScopedThreadOperation { } ~ScopedThreadOperation() { - thread_->CompletedOpForDiagnostics(type_); - switch (finish_action_) { - case FinishAction::kPop: - break; - case FinishAction::kFinishSingleOp: - thread_->FinishSingleOpWhileOperationActive(); - break; - } if (pushed_) { + thread_->CompletedOpForDiagnostics(type_); + switch (finish_action_) { + case FinishAction::kPop: + break; + case FinishAction::kFinishSingleOp: + thread_->FinishSingleOpWhileOperationActive(); + break; + } thread_->PopOperation(type_); } } From 5b270796587ec79f18c31a50c66cd0e6f4346315 Mon Sep 17 00:00:00 2001 From: Xingbo Wang Date: Fri, 26 Jun 2026 08:55:39 -0700 Subject: [PATCH 5/5] fix unit test status check --- db/external_sst_file_test.cc | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 52ee34462941..e6308729cce4 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -2530,6 +2530,8 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { } get_live_files_thread_1.join(); get_live_files_thread_2.join(); + EXPECT_OK(get_live_files_status[0]); + EXPECT_OK(get_live_files_status[1]); ASSERT_TRUE(saw_both_atomic_flushes) << "Timed out waiting for both GetLiveFilesStorageInfo calls to " "schedule their flushes. scheduled=" @@ -2593,6 +2595,9 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { } } ingest_thread.join(); + EXPECT_OK(get_live_files_status[0]); + EXPECT_OK(get_live_files_status[1]); + EXPECT_OK(ingest_status); ASSERT_TRUE(sleeping_task_done); ASSERT_TRUE(saw_ingest_wait) << "Timed out waiting for IngestExternalFile to enter its flush wait"; @@ -2608,35 +2613,36 @@ TEST_F(ExternalSSTFileTest, IngestFlushRequestNotLostBehindQueuedFlush) { const bool ingest_completed_without_unblock = wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, kIngestWaitMicros); - Status unblock_flush_status; if (!ingest_completed_without_unblock) { FlushOptions unblock_flush_options; unblock_flush_options.wait = false; - unblock_flush_status = db_->Flush(unblock_flush_options); - wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, - kIngestWaitMicros); - } - if (!ingest_done.load(std::memory_order_acquire)) { - dbfull()->CancelAllBackgroundWork(false /* wait */); + const Status unblock_flush_status = db_->Flush(unblock_flush_options); wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, kIngestWaitMicros); if (!ingest_done.load(std::memory_order_acquire)) { - dbfull()->CancelAllBackgroundWork(true /* wait */); + dbfull()->CancelAllBackgroundWork(false /* wait */); wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, kIngestWaitMicros); + if (!ingest_done.load(std::memory_order_acquire)) { + dbfull()->CancelAllBackgroundWork(true /* wait */); + wait_until([&] { return ingest_done.load(std::memory_order_acquire); }, + kIngestWaitMicros); + } } + ingest_thread.join(); + ASSERT_TRUE(sleeping_task_done); + ASSERT_OK(get_live_files_status[0]); + ASSERT_OK(get_live_files_status[1]); + ASSERT_OK(unblock_flush_status); + FAIL() << "IngestExternalFile remained blocked waiting for a flush that " + "was deduped behind an older queued flush request"; } ingest_thread.join(); ASSERT_TRUE(sleeping_task_done); ASSERT_OK(get_live_files_status[0]); ASSERT_OK(get_live_files_status[1]); - if (!ingest_completed_without_unblock) { - ASSERT_OK(unblock_flush_status); - FAIL() << "IngestExternalFile remained blocked waiting for a flush that " - "was deduped behind an older queued flush request"; - } ASSERT_OK(ingest_status); ASSERT_EQ("ingested_value", Get("ingest_key")); }