diff --git a/docs/metrics.md b/docs/metrics.md index 37c0dadd1d..b410b7f06b 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -47,6 +47,7 @@ bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare bucketlistDB-live.bulk.prefetch | timer | time to prefetch bucketlistDB-live.bulk.eviction | timer | time to load for eviction scan bucketlistDB-live.bulk.query | timer | time to load for query server +bucketlistDB-hotArchive.bulk.hot-query | timer | time to load hot archive entries for query server bucketlistDB-..sum | counter | sum of time (microseconds) to load single entry of type on BucketList (live/hotArchive) bucketlistDB-..count | counter | number of times single entry of type on BucketList (live/hotArchive) is loaded bucketlistDB-..max | counter | max (since last metrics call) of time (microseconds) to load single entry of type on BucketList (live/hotArchive) diff --git a/src/bucket/BucketListSnapshot.cpp b/src/bucket/BucketListSnapshot.cpp index 99a44a8dfc..17f8a3104f 100644 --- a/src/bucket/BucketListSnapshot.cpp +++ b/src/bucket/BucketListSnapshot.cpp @@ -60,13 +60,8 @@ BucketListSnapshotData::BucketListSnapshotData( template SearchableBucketListSnapshot::SearchableBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq) + std::shared_ptr const> data) : mData(std::move(data)) - , mHistoricalSnapshots(std::move(historicalSnapshots)) - , mLedgerSeq(ledgerSeq) , mMetrics(metrics) , mBulkLoadMeter( metrics.NewMeter({BucketT::METRIC_STRING, "query", "loads"}, "query")) @@ -85,8 +80,6 @@ template SearchableBucketListSnapshot::SearchableBucketListSnapshot( SearchableBucketListSnapshot const& other) : mData(other.mData) - , mHistoricalSnapshots(other.mHistoricalSnapshots) - , mLedgerSeq(other.mLedgerSeq) // mStreams intentionally left empty — each copy gets its own stream cache , mMetrics(other.mMetrics) , mPointTimers(other.mPointTimers) @@ -103,8 +96,6 @@ SearchableBucketListSnapshot::operator=( if (this != &other) { mData = other.mData; - mHistoricalSnapshots = other.mHistoricalSnapshots; - mLedgerSeq = other.mLedgerSeq; mStreams.clear(); mMetrics = other.mMetrics; mPointTimers = other.mPointTimers; @@ -345,51 +336,6 @@ SearchableBucketListSnapshot::load(LedgerKey const& k) const return result; } -template -std::optional> -SearchableBucketListSnapshot::loadKeysInternal( - std::set const& inKeys, - std::optional ledgerSeq) const -{ - ZoneScoped; - releaseAssert(mData); - - // Make a copy of the key set, this loop is destructive - auto keys = inKeys; - std::vector entries; - - auto loadKeysLoop = [&](std::shared_ptr const& bucket) { - loadKeysFromBucket(bucket, keys, entries); - return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE; - }; - - if (!ledgerSeq || *ledgerSeq == mLedgerSeq) - { - loopAllBuckets(loadKeysLoop, *mData); - } - else - { - auto iter = mHistoricalSnapshots.find(*ledgerSeq); - if (iter == mHistoricalSnapshots.end()) - { - return std::nullopt; - } - releaseAssert(iter->second); - loopAllBuckets(loadKeysLoop, *iter->second); - } - - return entries; -} - -template -std::optional> -SearchableBucketListSnapshot::loadKeysFromLedger( - std::set const& inKeys, - uint32_t ledgerSeq) const -{ - return loadKeysInternal(inKeys, ledgerSeq); -} - template medida::Timer& SearchableBucketListSnapshot::getBulkLoadTimer( @@ -418,39 +364,35 @@ SearchableBucketListSnapshot::getSnapshotData() const return mData; } -template -std::map const>> const& -SearchableBucketListSnapshot::getHistoricalSnapshots() const -{ - return mHistoricalSnapshots; -} - // // SearchableLiveBucketListSnapshot // SearchableLiveBucketListSnapshot::SearchableLiveBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq) - : SearchableBucketListSnapshot( - metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq) + std::shared_ptr const> data) + : SearchableBucketListSnapshot(metrics, std::move(data)) { } -std::vector -SearchableLiveBucketListSnapshot::loadKeys( +template +std::vector +SearchableBucketListSnapshot::loadKeys( std::set const& inKeys, std::string const& label) const { + ZoneScoped; + releaseAssert(mData); auto timer = getBulkLoadTimer(label, inKeys.size()).TimeScope(); - auto op = loadKeysInternal(inKeys, std::nullopt); - releaseAssertOrThrow(op); - return std::move(*op); + + auto keys = inKeys; + std::vector entries; + auto loadKeysLoop = [&](std::shared_ptr const& bucket) { + loadKeysFromBucket(bucket, keys, entries); + return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE; + }; + loopAllBuckets(loadKeysLoop, *mData); + return entries; } // This query has two steps: @@ -864,23 +806,9 @@ SearchableLiveBucketListSnapshot::scanForEvictionInBucket( SearchableHotArchiveBucketListSnapshot::SearchableHotArchiveBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq) - : SearchableBucketListSnapshot( - metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq) -{ -} - -std::vector -SearchableHotArchiveBucketListSnapshot::loadKeys( - std::set const& inKeys) const + std::shared_ptr const> data) + : SearchableBucketListSnapshot(metrics, std::move(data)) { - auto op = loadKeysInternal(inKeys, std::nullopt); - releaseAssertOrThrow(op); - return std::move(*op); } void diff --git a/src/bucket/BucketListSnapshot.h b/src/bucket/BucketListSnapshot.h index 0670c622dd..558cf012ca 100644 --- a/src/bucket/BucketListSnapshot.h +++ b/src/bucket/BucketListSnapshot.h @@ -16,9 +16,7 @@ #include #include -#include #include -#include #include #include #include @@ -81,13 +79,6 @@ template class SearchableBucketListSnapshot protected: // Shared immutable snapshot data std::shared_ptr const> mData; - std::map const>> - mHistoricalSnapshots; - - // Ledger sequence number for this snapshot, used internally to route - // queries between current and historical data. Not exposed publicly; - // callers should get ledger metadata from ImmutableLedgerData. - uint32_t mLedgerSeq; // Per-snapshot mutable stream cache mutable UnorderedMap> @@ -133,17 +124,11 @@ template class SearchableBucketListSnapshot std::set& keys, std::vector& result) const; - std::optional> - loadKeysInternal(std::set const& inKeys, - std::optional ledgerSeq) const; - medida::Timer& getBulkLoadTimer(std::string const& label, size_t numEntries) const; // Iterate over all buckets in a snapshot in order, calling f on each // non-empty bucket. Exits early if function returns Loop::COMPLETE. - // The first overload operates on an explicit snapshot (used for historical - // queries). template void loopAllBuckets(Func&& f, BucketListSnapshotData const& snapshot) const; @@ -151,11 +136,7 @@ template class SearchableBucketListSnapshot SearchableBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq); + std::shared_ptr const> data); public: // Copy: copies all state except mStreams, which is reset to empty. @@ -171,23 +152,16 @@ template class SearchableBucketListSnapshot std::shared_ptr load(LedgerKey const& k) const; - // Loads inKeys from the specified historical snapshot. Returns - // load_result_vec if the snapshot for the given ledger is - // available, std::nullopt otherwise. Note that ledgerSeq is defined - // as the state of the BucketList at the beginning of the ledger. This means - // that for ledger N, the maximum lastModifiedLedgerSeq of any LedgerEntry - // in the BucketList is N - 1. - std::optional> - loadKeysFromLedger(std::set const& inKeys, - uint32_t ledgerSeq) const; + // Bulk load. Walks all buckets, accumulating found entries until all + // keys have been resolved. label identifies the call site for the bulk + // load timer metric. + std::vector + loadKeys(std::set const& inKeys, + std::string const& label) const; // Access to underlying data (for copying/refreshing) std::shared_ptr const> const& getSnapshotData() const; - - std::map const>> const& - getHistoricalSnapshots() const; }; // Live bucket list snapshot with additional query methods @@ -196,11 +170,7 @@ class SearchableLiveBucketListSnapshot { SearchableLiveBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq); + std::shared_ptr const> data); Loop scanForEvictionInBucket( std::shared_ptr const& bucket, EvictionIterator& iter, @@ -212,10 +182,6 @@ class SearchableLiveBucketListSnapshot SearchableLiveBucketListSnapshot(SearchableLiveBucketListSnapshot const&) = default; - std::vector - loadKeys(std::set const& inKeys, - std::string const& label) const; - std::vector loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, Asset const& asset) const; @@ -244,19 +210,12 @@ class SearchableHotArchiveBucketListSnapshot { SearchableHotArchiveBucketListSnapshot( MetricsRegistry& metrics, - std::shared_ptr const> data, - std::map const>> - historicalSnapshots, - uint32_t ledgerSeq); + std::shared_ptr const> data); public: SearchableHotArchiveBucketListSnapshot( SearchableHotArchiveBucketListSnapshot const&) = default; - std::vector - loadKeys(std::set const& inKeys) const; - // Iterate over all entries in all buckets. Note this iterates over all // HotArchiveBucketEntry, so some may be shadowed and outdated. void scanAllEntries( diff --git a/src/bucket/test/BucketIndexTests.cpp b/src/bucket/test/BucketIndexTests.cpp index 59f1381b51..1714f54976 100644 --- a/src/bucket/test/BucketIndexTests.cpp +++ b/src/bucket/test/BucketIndexTests.cpp @@ -193,58 +193,6 @@ class BucketIndexTest buildBucketList(f, isCacheTest); } - void - runHistoricalSnapshotTest() - { - uint32_t ledger = 0; - - // Exclude soroban types so we don't have to insert TTLs - auto canonicalEntry = - LedgerTestUtils::generateValidLedgerEntryWithExclusions( - {CONFIG_SETTING, TTL, CONTRACT_CODE, CONTRACT_DATA}); - canonicalEntry.lastModifiedLedgerSeq = 0; - - do - { - ++ledger; - auto entryCopy = canonicalEntry; - entryCopy.lastModifiedLedgerSeq = ledger; - mApp->getLedgerManager().setNextLedgerEntryBatchForBucketTesting( - {}, {entryCopy}, {}); - closeLedger(*mApp); - } while (ledger < mApp->getConfig().QUERY_SNAPSHOT_LEDGERS + 2); - ++ledger; - - auto ledgerView = getApp().getLedgerManager().copyImmutableLedgerView(); - auto lk = LedgerEntryKey(canonicalEntry); - - auto currentLoadedEntry = ledgerView.loadLiveEntry(lk); - REQUIRE(currentLoadedEntry); - - // Note: The definition of "historical snapshot" ledger is that the - // BucketList snapshot for ledger N is the BucketList as it exists at - // the beginning of ledger N. This means that the lastModifiedLedgerSeq - // is at most N - 1. - REQUIRE(currentLoadedEntry->lastModifiedLedgerSeq == ledger - 1); - - for (uint32_t currLedger = ledger; currLedger > 0; --currLedger) - { - auto loadRes = ledgerView.loadLiveKeysFromLedger({lk}, currLedger); - - // If we query an older snapshot, should return - if (currLedger < ledger - mApp->getConfig().QUERY_SNAPSHOT_LEDGERS) - { - REQUIRE(!loadRes); - } - else - { - REQUIRE(loadRes); - REQUIRE(loadRes->size() == 1); - REQUIRE(loadRes->at(0).lastModifiedLedgerSeq == currLedger - 1); - } - } - } - virtual void buildMultiVersionTest(bool sorobanOnly = false) { @@ -1166,17 +1114,6 @@ TEST_CASE("soroban cache population", "[soroban][bucketindex]") testAllIndexTypes(f); } -TEST_CASE("load from historical snapshots", "[bucket][bucketindex]") -{ - auto f = [&](Config& cfg) { - cfg.QUERY_SNAPSHOT_LEDGERS = 5; - auto test = BucketIndexTest(cfg); - test.runHistoricalSnapshotTest(); - }; - - testAllIndexTypes(f); -} - TEST_CASE("loadPoolShareTrustLinesByAccountAndAsset", "[bucket][bucketindex]") { auto f = [&](Config& cfg) { @@ -1347,7 +1284,8 @@ TEST_CASE("hot archive bucket lookups", "[bucket][bucketindex][archive]") bulkLoadKeys.emplace(k); } - auto bulkLoadResult = ledgerView.loadArchiveKeys(bulkLoadKeys); + auto bulkLoadResult = + ledgerView.loadArchiveKeys(bulkLoadKeys, "test"); for (auto entry : bulkLoadResult) { REQUIRE(entry.type() == HOT_ARCHIVE_ARCHIVED); @@ -1417,7 +1355,7 @@ TEST_CASE("hot archive bucket lookups", "[bucket][bucketindex][archive]") // Bulk load auto bulkLoadResult = - ledgerView.loadArchiveKeys({liveShadow1, liveShadow2}); + ledgerView.loadArchiveKeys({liveShadow1, liveShadow2}, "test"); REQUIRE(bulkLoadResult.size() == 0); // Shadow via archivedEntries @@ -1437,8 +1375,8 @@ TEST_CASE("hot archive bucket lookups", "[bucket][bucketindex][archive]") REQUIRE(entryPtr->archivedEntry() == archivedShadow); // Bulk load - auto bulkLoadResult2 = - ledgerView.loadArchiveKeys({LedgerEntryKey(archivedShadow)}); + auto bulkLoadResult2 = ledgerView.loadArchiveKeys( + {LedgerEntryKey(archivedShadow)}, "test"); REQUIRE(bulkLoadResult2.size() == 1); REQUIRE(bulkLoadResult2[0].type() == HOT_ARCHIVE_ARCHIVED); REQUIRE(bulkLoadResult2[0].archivedEntry() == archivedShadow); diff --git a/src/bucket/test/BucketTestUtils.cpp b/src/bucket/test/BucketTestUtils.cpp index 9e1d83dd2b..1e8da564ea 100644 --- a/src/bucket/test/BucketTestUtils.cpp +++ b/src/bucket/test/BucketTestUtils.cpp @@ -320,7 +320,7 @@ LedgerManagerForBucketTests::finalizeLedgerTxnChanges( auto& bm = mApp.getBucketManager(); auto tempState = ImmutableLedgerData::createAndMaybeLoadConfig( bm.getLiveBucketList(), bm.getHotArchiveBucketList(), tempLcl, - tempHas, mApp.getMetrics(), nullptr, 0); + tempHas, mApp.getMetrics()); finalSorobanConfig = tempState->getSorobanConfig(); } diff --git a/src/invariant/ArchivedStateConsistency.cpp b/src/invariant/ArchivedStateConsistency.cpp index ee150a1573..5ef39e3650 100644 --- a/src/invariant/ArchivedStateConsistency.cpp +++ b/src/invariant/ArchivedStateConsistency.cpp @@ -99,7 +99,8 @@ ArchivedStateConsistency::checkOnLedgerCommit( preloadedLiveEntries[LedgerEntryKey(entry)] = entry; } - auto preloadedArchivedVector = lclApplyView.loadArchiveKeys(allKeys); + auto preloadedArchivedVector = + lclApplyView.loadArchiveKeys(allKeys, "ArchivedStateConsistency"); UnorderedMap preloadedArchivedEntries; for (auto const& entry : preloadedArchivedVector) { diff --git a/src/ledger/ImmutableLedgerView.cpp b/src/ledger/ImmutableLedgerView.cpp index c4a7a3d8ea..3c9969383b 100644 --- a/src/ledger/ImmutableLedgerView.cpp +++ b/src/ledger/ImmutableLedgerView.cpp @@ -222,62 +222,15 @@ ImmutableLedgerData::checkInvariant() const releaseAssert(mHotArchiveBucketData); } -namespace -{ -// Build the next historical snapshot map by copying the previous map, -// evicting the oldest entry if at capacity, and inserting the previous -// state's current snapshot keyed by its ledger sequence number. -template -auto -rotateHistorical( - std::shared_ptr const> const& prevData, - std::map const>> const& - prevHistorical, - uint32_t prevLedgerSeq, uint32_t numHistorical) -{ - std::map const>> - result; - if (numHistorical == 0 || !prevData) - { - return result; - } - result = prevHistorical; - if (result.size() == numHistorical) - { - result.erase(result.begin()); - } - result.emplace(prevLedgerSeq, prevData); - return result; -} -} // anonymous namespace - ImmutableLedgerData::ImmutableLedgerData( LiveBucketList const& liveBL, HotArchiveBucketList const& hotArchiveBL, LedgerHeaderHistoryEntry const& lcl, HistoryArchiveState const& has, - std::optional sorobanConfig, - ImmutableLedgerDataPtr prevState, uint32_t numHistorical) + std::optional sorobanConfig) : mLiveBucketData( std::make_shared>(liveBL)) - , mLiveHistoricalSnapshots( - prevState ? rotateHistorical( - prevState->mLiveBucketData, - prevState->mLiveHistoricalSnapshots, - prevState->mLastClosedLedgerHeader.header.ledgerSeq, - numHistorical) - : std::map const>>{}) , mHotArchiveBucketData( std::make_shared>( hotArchiveBL)) - , mHotArchiveHistoricalSnapshots( - prevState ? rotateHistorical( - prevState->mHotArchiveBucketData, - prevState->mHotArchiveHistoricalSnapshots, - prevState->mLastClosedLedgerHeader.header.ledgerSeq, - numHistorical) - : std::map const>>{}) , mSorobanConfig(std::move(sorobanConfig)) , mLastClosedLedgerHeader(lcl) , mLastClosedHistoryArchiveState(has) @@ -313,35 +266,28 @@ ImmutableLedgerDataPtr ImmutableLedgerData::createAndMaybeLoadConfig( LiveBucketList const& liveBL, HotArchiveBucketList const& hotArchiveBL, LedgerHeaderHistoryEntry const& lcl, HistoryArchiveState const& has, - MetricsRegistry& metrics, ImmutableLedgerDataPtr prevState, - uint32_t numHistoricalSnapshots) + MetricsRegistry& metrics) { std::optional sorobanConfig; if (protocolVersionStartsFrom(lcl.header.ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { - // Bootstrap: build a lightweight temporary state (no historical - // snapshots) just to load config from the current live bucket list. + // Bootstrap: build a lightweight temporary state just to load config + // from the current live bucket list. auto tempState = std::make_shared( - liveBL, hotArchiveBL, lcl, has, /*sorobanConfig*/ std::nullopt, - /*prevState*/ nullptr, /*numHistoricalSnapshots*/ 0); + liveBL, hotArchiveBL, lcl, has, /*sorobanConfig*/ std::nullopt); ImmutableLedgerView tempView(tempState, metrics); sorobanConfig = SorobanNetworkConfig::loadFromLedger(tempView); } - return std::make_shared( - liveBL, hotArchiveBL, lcl, has, std::move(sorobanConfig), - std::move(prevState), numHistoricalSnapshots); + return std::make_shared(liveBL, hotArchiveBL, lcl, has, + std::move(sorobanConfig)); } ImmutableLedgerView::ImmutableLedgerView(ImmutableLedgerDataPtr state, MetricsRegistry& metrics) : mState(state) - , mLiveSnapshot(metrics, state->mLiveBucketData, - state->mLiveHistoricalSnapshots, - state->mLastClosedLedgerHeader.header.ledgerSeq) - , mHotArchiveSnapshot(metrics, state->mHotArchiveBucketData, - state->mHotArchiveHistoricalSnapshots, - state->mLastClosedLedgerHeader.header.ledgerSeq) + , mLiveSnapshot(metrics, state->mLiveBucketData) + , mHotArchiveSnapshot(metrics, state->mHotArchiveBucketData) , mMetrics(metrics) { } @@ -419,14 +365,6 @@ ImmutableLedgerView::loadLiveKeys( return mLiveSnapshot.loadKeys(inKeys, label); } -std::optional> -ImmutableLedgerView::loadLiveKeysFromLedger( - std::set const& inKeys, - uint32_t ledgerSeq) const -{ - return mLiveSnapshot.loadKeysFromLedger(inKeys, ledgerSeq); -} - std::vector ImmutableLedgerView::loadPoolShareTrustLinesByAccountAndAsset( AccountID const& accountID, Asset const& asset) const @@ -472,17 +410,10 @@ ImmutableLedgerView::loadArchiveEntry(LedgerKey const& k) const std::vector ImmutableLedgerView::loadArchiveKeys( - std::set const& inKeys) const -{ - return mHotArchiveSnapshot.loadKeys(inKeys); -} - -std::optional> -ImmutableLedgerView::loadArchiveKeysFromLedger( std::set const& inKeys, - uint32_t ledgerSeq) const + std::string const& label) const { - return mHotArchiveSnapshot.loadKeysFromLedger(inKeys, ledgerSeq); + return mHotArchiveSnapshot.loadKeys(inKeys, label); } void diff --git a/src/ledger/ImmutableLedgerView.h b/src/ledger/ImmutableLedgerView.h index 98ccf4dfe1..6e42144609 100644 --- a/src/ledger/ImmutableLedgerView.h +++ b/src/ledger/ImmutableLedgerView.h @@ -157,9 +157,6 @@ class ImmutableLedgerView : public virtual AbstractLedgerView std::vector loadLiveKeys(std::set const& inKeys, std::string const& label) const; - std::optional> - loadLiveKeysFromLedger(std::set const& inKeys, - uint32_t ledgerSeq) const; std::vector loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID, Asset const& asset) const; @@ -177,10 +174,8 @@ class ImmutableLedgerView : public virtual AbstractLedgerView std::shared_ptr loadArchiveEntry(LedgerKey const& k) const; std::vector - loadArchiveKeys(std::set const& inKeys) const; - std::optional> loadArchiveKeysFromLedger( - std::set const& inKeys, - uint32_t ledgerSeq) const; + loadArchiveKeys(std::set const& inKeys, + std::string const& label) const; void scanAllArchiveEntries( std::function callback) const; }; @@ -204,11 +199,9 @@ class ApplyLedgerView : private ImmutableLedgerView, using ImmutableLedgerView::load; using ImmutableLedgerView::loadArchiveEntry; using ImmutableLedgerView::loadArchiveKeys; - using ImmutableLedgerView::loadArchiveKeysFromLedger; using ImmutableLedgerView::loadInflationWinners; using ImmutableLedgerView::loadLiveEntry; using ImmutableLedgerView::loadLiveKeys; - using ImmutableLedgerView::loadLiveKeysFromLedger; using ImmutableLedgerView::loadPoolShareTrustLinesByAccountAndAsset; using ImmutableLedgerView::scanAllArchiveEntries; using ImmutableLedgerView::scanForEviction; @@ -283,15 +276,8 @@ class ImmutableLedgerData : public NonMovableOrCopyable // Raw immutable bucket data for the live and hot archive bucket lists std::shared_ptr const> const mLiveBucketData; - std::map const>> const - mLiveHistoricalSnapshots; std::shared_ptr const> const mHotArchiveBucketData; - std::map< - uint32_t, - std::shared_ptr const>> const - mHotArchiveHistoricalSnapshots; std::optional const mSorobanConfig; LedgerHeaderHistoryEntry const mLastClosedLedgerHeader; @@ -302,25 +288,21 @@ class ImmutableLedgerData : public NonMovableOrCopyable friend class ImmutableLedgerView; public: - // Construct a new ledger state, rotating historical snapshots from - // prevState. If prevState is null, history maps will be empty. + // Construct a new immutable ledger state snapshot. // sorobanConfig is nullopt for pre-Soroban protocol versions, or when // building the empty initial state at startup. ImmutableLedgerData(LiveBucketList const& liveBL, HotArchiveBucketList const& hotArchiveBL, LedgerHeaderHistoryEntry const& lcl, HistoryArchiveState const& has, - std::optional sorobanConfig, - ImmutableLedgerDataPtr prevState, - uint32_t numHistoricalSnapshots); + std::optional sorobanConfig); // Factory: constructs a ImmutableLedgerData, auto-loading the // SorobanNetworkConfig from the bucket list when the protocol requires it. static ImmutableLedgerDataPtr createAndMaybeLoadConfig( LiveBucketList const& liveBL, HotArchiveBucketList const& hotArchiveBL, LedgerHeaderHistoryEntry const& lcl, HistoryArchiveState const& has, - MetricsRegistry& metrics, ImmutableLedgerDataPtr prevState, - uint32_t numHistoricalSnapshots); + MetricsRegistry& metrics); SorobanNetworkConfig const& getSorobanConfig() const; bool hasSorobanConfig() const; diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index 53bbea4e27..963ef40edd 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -30,6 +30,7 @@ #include "ledger/P23HotArchiveBug.h" #include "ledger/SharedModuleCacheCompiler.h" #include "main/Application.h" +#include "main/CommandHandler.h" #include "main/Config.h" #include "main/ErrorMessages.h" #include "rust/RustBridge.h" @@ -344,7 +345,6 @@ LedgerManagerImpl::ApplyState::manuallyAdvanceLedgerHeader( LedgerManagerImpl::LedgerManagerImpl(Application& app) : mApp(app) , mApplyState(app) - , mNumHistoricalSnapshots(app.getConfig().QUERY_SNAPSHOT_LEDGERS) , mLastClose(mApp.getClock().now()) , mCatchupDuration( app.getMetrics().NewTimer({"ledger", "catchup", "duration"})) @@ -360,8 +360,7 @@ LedgerManagerImpl::LedgerManagerImpl(Application& app) auto initialState = std::make_shared( bm.getLiveBucketList(), bm.getHotArchiveBucketList(), emptyLcl, - emptyHas, /*sorobanConfig*/ std::nullopt, /*prevState*/ nullptr, - mNumHistoricalSnapshots); + emptyHas, /*sorobanConfig*/ std::nullopt); mApplyState.setLedgerState(initialState); { @@ -2101,24 +2100,30 @@ LedgerManagerImpl::advanceLastClosedLedgerState( releaseAssert(threadIsMain()); releaseAssert(newLedgerState); - JITTER_INJECT_DELAY(); - SharedLockExclusive lock(mLastClosedLedgerStateMutex); - JITTER_INJECT_DELAY(); - if (mLastClosedLedgerState) { - CLOG_DEBUG( - Ledger, "Advancing LCL: {} -> {}", - ledgerAbbrev( - mLastClosedLedgerState->getLastClosedLedgerHeader().header), - ledgerAbbrev(newLedgerState->getLastClosedLedgerHeader().header)); + JITTER_INJECT_DELAY(); + SharedLockExclusive lock(mLastClosedLedgerStateMutex); + JITTER_INJECT_DELAY(); + if (mLastClosedLedgerState) + { + CLOG_DEBUG( + Ledger, "Advancing LCL: {} -> {}", + ledgerAbbrev( + mLastClosedLedgerState->getLastClosedLedgerHeader().header), + ledgerAbbrev( + newLedgerState->getLastClosedLedgerHeader().header)); + } + mLastClosedLedgerState = newLedgerState; } - mLastClosedLedgerState = newLedgerState; + + // Push new state to QueryServer (after releasing + // mLastClosedLedgerStateMutex to avoid nested lock acquisition). + mApp.getCommandHandler().addSnapshot(newLedgerState); } ImmutableLedgerDataPtr LedgerManagerImpl::buildLedgerState( LedgerHeader const& header, HistoryArchiveState const& has, - ImmutableLedgerDataPtr prevState, std::optional sorobanConfig) { mApplyState.threadInvariant(); @@ -2133,14 +2138,13 @@ LedgerManagerImpl::buildLedgerState( // Caller already loaded config (e.g. from LTX during ledger close) return std::make_shared( bm.getLiveBucketList(), bm.getHotArchiveBucketList(), lcl, has, - std::move(sorobanConfig), std::move(prevState), - mNumHistoricalSnapshots); + std::move(sorobanConfig)); } // Auto-load SorobanNetworkConfig from the BucketList return ImmutableLedgerData::createAndMaybeLoadConfig( bm.getLiveBucketList(), bm.getHotArchiveBucketList(), lcl, has, - mApp.getMetrics(), std::move(prevState), mNumHistoricalSnapshots); + mApp.getMetrics()); } ImmutableLedgerDataPtr @@ -2148,8 +2152,7 @@ LedgerManagerImpl::advanceApplySnapshotAndMakeLedgerState( LedgerHeader const& header, HistoryArchiveState const& has, std::optional sorobanConfig) { - auto state = buildLedgerState(header, has, mApplyState.getLedgerState(), - std::move(sorobanConfig)); + auto state = buildLedgerState(header, has, std::move(sorobanConfig)); mApplyState.setLedgerState(state); return state; } @@ -2223,16 +2226,20 @@ LedgerManagerImpl::updateCanonicalStateForTesting(LedgerHeader const& header) HistoryArchiveState has; has.currentLedger = header.ledgerSeq; - JITTER_INJECT_DELAY(); - SharedLockExclusive lock(mLastClosedLedgerStateMutex); - JITTER_INJECT_DELAY(); + ImmutableLedgerDataPtr state; + { + JITTER_INJECT_DELAY(); + SharedLockExclusive lock(mLastClosedLedgerStateMutex); + JITTER_INJECT_DELAY(); - auto state = - buildLedgerState(header, has, mLastClosedLedgerState, std::nullopt); + state = buildLedgerState(header, has, /*sorobanConfig=*/std::nullopt); - mApplyState.setLedgerStateForTesting(state); + mApplyState.setLedgerStateForTesting(state); + + mLastClosedLedgerState = state; + } - mLastClosedLedgerState = state; + mApp.getCommandHandler().addSnapshot(state); } #endif } diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h index 47cd8b9d3a..454e831ac2 100644 --- a/src/ledger/LedgerManagerImpl.h +++ b/src/ledger/LedgerManagerImpl.h @@ -311,9 +311,6 @@ class LedgerManagerImpl : public LedgerManager ImmutableLedgerDataPtr mLastClosedLedgerState GUARDED_BY(mLastClosedLedgerStateMutex); - // Max number of historical snapshots to maintain. - uint32_t const mNumHistoricalSnapshots; - VirtualClock::time_point mLastClose; // Use mutex to guard ledger state during apply @@ -462,13 +459,11 @@ class LedgerManagerImpl : public LedgerManager std::unique_ptr const& ledgerCloseMeta, LedgerHeader lh, uint32_t initialLedgerVers); - // Build a new ImmutableLedgerData from the current BucketLists, - // copying then updating historical snapshots from prevState. If + // Build a new ImmutableLedgerData from the current BucketLists. If // sorobanConfig is not provided, it is loaded from a temporary bucket // snapshot when the protocol requires it. ImmutableLedgerDataPtr buildLedgerState(LedgerHeader const& header, HistoryArchiveState const& has, - ImmutableLedgerDataPtr prevState, std::optional sorobanConfig); // Build a new ledger state and advance ApplyState snapshot to it. This does diff --git a/src/ledger/test/ImmutableLedgerViewTests.cpp b/src/ledger/test/ImmutableLedgerViewTests.cpp index 027634137f..31626db22f 100644 --- a/src/ledger/test/ImmutableLedgerViewTests.cpp +++ b/src/ledger/test/ImmutableLedgerViewTests.cpp @@ -14,6 +14,8 @@ #include "ledger/ImmutableLedgerView.h" #include "ledger/test/LedgerTestUtils.h" #include "main/Application.h" +#include "main/CommandHandler.h" +#include "main/QueryServer.h" #include "test/TestUtils.h" #include "test/test.h" #include "util/Logging.h" @@ -75,9 +77,11 @@ makeHeader(uint32_t seq, uint32_t protocolVersion) // --------------------------------------------------------------------------- struct PregenData { - // Each element is the set of entries to write to the live BucketList + // Each element is the set of new entries to write to the live BucketList // for a given ledger, in order. Index 0 = first ledger closed. std::vector> liveEntriesToWrite; + // Updates to existing live entries for each ledger. + std::vector> liveUpdatesToWrite; // Same for the hot archive BucketList. std::vector> archiveEntriesToWrite; @@ -107,6 +111,7 @@ pregenEntries(uint32_t startSeq, int numLedgers, int entriesPerLedger) auto seq = startSeq + 1 + i; // --- Live entries --- + // Generate new unique entries for this ledger. auto entries = LedgerTestUtils::generateValidUniqueLedgerEntriesWithExclusions( SOROBAN_TYPES, entriesPerLedger, seenKeys); @@ -115,8 +120,31 @@ pregenEntries(uint32_t startSeq, int numLedgers, int entriesPerLedger) e.lastModifiedLedgerSeq = seq; runningLiveState[LedgerEntryKey(e)] = e; } + + // Modify some existing entries so that adjacent ledgers have + // distinguishable data for the same keys. This ensures that loading + // from the wrong snapshot is detected by the data comparison. + std::vector updates; + if (i > 0) + { + int updated = 0; + for (auto& [key, entry] : runningLiveState) + { + if (entry.lastModifiedLedgerSeq < seq) + { + entry.lastModifiedLedgerSeq = seq; + updates.push_back(entry); + if (++updated >= entriesPerLedger / 2) + { + break; + } + } + } + } + data.stateAtLedger[seq] = runningLiveState; data.liveEntriesToWrite.push_back(std::move(entries)); + data.liveUpdatesToWrite.push_back(std::move(updates)); auto archiveEntries = LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes( @@ -295,7 +323,7 @@ class SnapshotStressTest { public: SnapshotStressTest(int numThreads, unsigned seed, Application& app, - PregenData const& pregen); + PregenData const& pregen, QueryServer& queryServer); ~SnapshotStressTest() = default; void run(); @@ -322,6 +350,7 @@ class SnapshotStressTest int const mNumThreads; unsigned const mSeed; Application& mApp; + QueryServer& mQueryServer; uint32_t const mProtocolVersion; uint32_t const mNumHistorical; PregenData const& mPregen; @@ -329,6 +358,7 @@ class SnapshotStressTest // --- Shared state --- std::atomic mDone{false}; std::atomic mError{false}; + std::atomic mHistoricalVerifications{0}; std::vector> mThreads; bool @@ -369,10 +399,12 @@ class SnapshotStressTest SnapshotStressTest::SnapshotStressTest(int numThreads, unsigned seed, Application& app, - PregenData const& pregen) + PregenData const& pregen, + QueryServer& queryServer) : mNumThreads(numThreads) , mSeed(seed) , mApp(app) + , mQueryServer(queryServer) , mProtocolVersion(getAppLedgerVersion(app)) , mNumHistorical(app.getConfig().QUERY_SNAPSHOT_LEDGERS) , mPregen(pregen) @@ -391,21 +423,39 @@ SnapshotStressTest::SnapshotStressTest(int numThreads, unsigned seed, void SnapshotStressTest::run() { + std::atomic numRegistered{0}; + ThreadGroup tg; for (int t = 0; t < mNumThreads; ++t) { - tg.launch(1, [this, t]() { workerLoop(t); }); + tg.launch(1, [this, t, &numRegistered]() { + mQueryServer.registerThread(); + ++numRegistered; + + // Wait until all threads are registered before proceeding. + while (numRegistered.load(std::memory_order_acquire) < mNumThreads) + { + std::this_thread::yield(); + } + workerLoop(t); + }); } tg.start(); closeLedgers(); // Give workers a brief window to exercise the final state. - std::this_thread::sleep_for(std::chrono::milliseconds{10}); + std::this_thread::sleep_for(std::chrono::milliseconds{100}); mDone.store(true, std::memory_order_release); tg.join(); REQUIRE(!mError.load()); + // Ensure historical queries were actually exercised and verified + if (mNumHistorical > 0) + { + REQUIRE(mHistoricalVerifications.load() > 0); + } + // Liveness check: after all ledgers are closed, a fresh snapshot must // reflect the final ledger sequence. auto finalLedgerView = mApp.getLedgerManager().copyImmutableLedgerView(); @@ -631,7 +681,7 @@ SnapshotStressTest::readHistoricalQuery(SnapshotThread& sthread, bool archive, auto const& histState = histStateIt->second; // Build query: positive keys (exist at histSeq) + negative keys - // (exist at currentSeq but not at histSeq). We only need to track + // (exist at a later ledger but not at histSeq). We only need to track // negativeKeys separately; any queried key not in negativeKeys is // positive. std::set queryKeys; @@ -642,15 +692,17 @@ SnapshotStressTest::readHistoricalQuery(SnapshotThread& sthread, bool archive, } std::set negativeKeys; - if (histSeq < currentSeq) + + // Add negative keys from nearby ledgers (histSeq+1, histSeq+2, etc.) + // to catch off-by-one bugs in snapshot selection. + for (uint32_t futureSeq = histSeq + 1; + futureSeq <= std::min(histSeq + 3, currentSeq); ++futureSeq) { - auto curStateIt = stateMap.find(currentSeq); - if (curStateIt != stateMap.end()) + auto futureStateIt = stateMap.find(futureSeq); + if (futureStateIt != stateMap.end()) { - auto const& curState = curStateIt->second; - for (int c = 0; c < 5; c++) + for (auto const& [key, _] : futureStateIt->second) { - auto const& [key, _] = randMapEntry(curState, rng); if (histState.find(key) == histState.end()) { queryKeys.insert(key); @@ -660,33 +712,40 @@ SnapshotStressTest::readHistoricalQuery(SnapshotThread& sthread, bool archive, } } - // Call the appropriate bulk historical load and extract LedgerEntries - // from the result into a uniform map for verification. - bool retained = shouldHistoricalExist(currentSeq, histSeq); + // Look up the historical snapshot from the QueryServer. Use the QS's + // latest seq to determine the expected window: there is a brief window + // where the LedgerManager has advanced to seq N but addSnapshot(N) hasn't + // been called yet, so the worker's currentSeq may be ahead of the QS. + auto* latestSnapshot = + mQueryServer.getSnapshotForLedgerForTesting(std::nullopt); + releaseAssert(latestSnapshot); + auto qsCurrentSeq = latestSnapshot->getLedgerSeq(); + + bool retained = shouldHistoricalExist(qsCurrentSeq, histSeq); + auto* histSnapshot = mQueryServer.getSnapshotForLedgerForTesting(histSeq); + + // We use lazy GC for the per-thread cache, so it's possible we retain + // something outside the window. + if (!retained && !histSnapshot) + { + return; + } + if (!histSnapshot) + { + fail(fmt::format("{} unexpected nullptr histSeq={} " + "currentSeq={} seed={}", + opName, histSeq, currentSeq, mSeed)); + return; + } + + // Load from the historical snapshot and extract LedgerEntries + // into a uniform map for verification. UnorderedMap resultMap; if (archive) { - auto result = - sthread.ledgerView().loadArchiveKeysFromLedger(queryKeys, histSeq); - if (!retained) - { - if (result.has_value()) - { - fail(fmt::format("{} expected nullopt histSeq={} " - "currentSeq={} seed={}", - opName, histSeq, currentSeq, mSeed)); - } - return; - } - if (!result.has_value()) - { - fail(fmt::format("{} unexpected nullopt histSeq={} " - "currentSeq={} seed={}", - opName, histSeq, currentSeq, mSeed)); - return; - } - for (auto const& habe : *result) + auto result = histSnapshot->loadArchiveKeys(queryKeys, "test"); + for (auto const& habe : result) { if (habe.type() != HOT_ARCHIVE_ARCHIVED) { @@ -700,26 +759,8 @@ SnapshotStressTest::readHistoricalQuery(SnapshotThread& sthread, bool archive, } else { - auto result = - sthread.ledgerView().loadLiveKeysFromLedger(queryKeys, histSeq); - if (!retained) - { - if (result.has_value()) - { - fail(fmt::format("{} expected nullopt histSeq={} " - "currentSeq={} seed={}", - opName, histSeq, currentSeq, mSeed)); - } - return; - } - if (!result.has_value()) - { - fail(fmt::format("{} unexpected nullopt histSeq={} " - "currentSeq={} seed={}", - opName, histSeq, currentSeq, mSeed)); - return; - } - for (auto const& entry : *result) + auto result = histSnapshot->loadLiveKeys(queryKeys, "hist-query"); + for (auto const& entry : result) { resultMap[LedgerEntryKey(entry)] = entry; } @@ -759,6 +800,8 @@ SnapshotStressTest::readHistoricalQuery(SnapshotThread& sthread, bool archive, } } } + + ++mHistoricalVerifications; } // Copy a snapshot from a random peer thread. The peer's copySnapshot() @@ -820,9 +863,9 @@ SnapshotStressTest::closeLedgers() // Add both live and archive batches to their bucket lists, then // update the canonical state once so the snapshot atomically // includes both. - bm.getLiveBucketList().addBatch(mApp, header.ledgerSeq, - header.ledgerVersion, - mPregen.liveEntriesToWrite[i], {}, {}); + bm.getLiveBucketList().addBatch( + mApp, header.ledgerSeq, header.ledgerVersion, + mPregen.liveEntriesToWrite[i], mPregen.liveUpdatesToWrite[i], {}); if (i < mPregen.archiveEntriesToWrite.size()) { bm.getHotArchiveBucketList().addBatch( @@ -966,11 +1009,14 @@ TEST_CASE("snapshot concurrent stress test", "[snapshot][acceptance]") VirtualClock clock; auto cfg = getTestConfig(); cfg.QUERY_SNAPSHOT_LEDGERS = numHistorical; + cfg.QUERY_SERVER_FOR_TESTING = true; auto app = createTestApplication(clock, cfg); auto startSeq = app->getLedgerManager().getLastClosedLedgerNum(); auto pregen = pregenEntries(startSeq, NUM_LEDGERS, ENTRIES_PER_LEDGER); - SnapshotStressTest test(NUM_THREADS, seed, *app, pregen); + auto& qServer = app->getCommandHandler().getQueryServer(); + + SnapshotStressTest test(NUM_THREADS, seed, *app, pregen, qServer); test.run(); } diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index eb3d610fce..d323d15550 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -322,6 +322,12 @@ ApplicationImpl::initialize(bool createNewDB, bool forceRebuild) enableInvariantsFromConfig(); + // Create CommandHandler before newDB/ledger loading so that + // advanceLastClosedLedgerState can push snapshots to the QueryServer. + // This is safe because endpoints are blocked until we call setReady() after + // ledger loading is complete. + mCommandHandler = std::make_unique(*this); + if (initNewDB) { newDB(); @@ -340,9 +346,6 @@ ApplicationImpl::initialize(bool createNewDB, bool forceRebuild) // FILTERED_G_ADDRESSES config entries into the persistent table. mBannedAccountsPersistor = std::make_unique(*this); - // After everything is initialized, start accepting HTTP commands - mCommandHandler = std::make_unique(*this); - LOG_DEBUG(DEFAULT_LOG, "Application constructed"); } @@ -806,6 +809,17 @@ ApplicationImpl::start() CLOG_INFO(Ledger, "Starting up application"); mStarted = true; +#ifdef BUILD_TESTS + // In tests, newDB() and loadLastKnownLedger() both run in the same + // process, causing advanceLastClosedLedgerState to push the same LCL + // seq to the QueryServer twice. Clear the QS state so + // loadLastKnownLedger starts fresh. + if (getConfig().QUERY_SERVER_FOR_TESTING) + { + mCommandHandler->getQueryServer().resetForTesting(); + } +#endif + mLedgerManager->loadLastKnownLedger(); // LCL is now loaded; unblock HTTP endpoints that were gated during boot. diff --git a/src/main/CommandHandler.cpp b/src/main/CommandHandler.cpp index 94c649d704..7af8bf1595 100644 --- a/src/main/CommandHandler.cpp +++ b/src/main/CommandHandler.cpp @@ -80,6 +80,14 @@ CommandHandler::CommandHandler(Application& app) : mApp(app) mApp.getConfig().QUERY_THREAD_POOL_SIZE, mApp.getAppConnector()); } +#ifdef BUILD_TESTS + else if (mApp.getConfig().QUERY_SERVER_FOR_TESTING) + { + mQueryServer = std::make_unique( + "127.0.0.1", 0, 1, 1, mApp.getAppConnector(), true); + mQueryServer->setReady(); + } +#endif } if (!mApp.getConfig().HTTP_PORT) @@ -160,6 +168,15 @@ CommandHandler::setReady() } } +void +CommandHandler::addSnapshot(ImmutableLedgerDataPtr state) +{ + if (mQueryServer) + { + mQueryServer->addSnapshot(std::move(state)); + } +} + void CommandHandler::addRoute(std::string const& name, HandlerRoute route) { diff --git a/src/main/CommandHandler.h b/src/main/CommandHandler.h index 712a7e9ddb..b94b28b1fe 100644 --- a/src/main/CommandHandler.h +++ b/src/main/CommandHandler.h @@ -4,6 +4,7 @@ #pragma once +#include "ledger/ImmutableLedgerView.h" #include "lib/http/server.hpp" #include "main/QueryServer.h" #include "util/ProtocolVersion.h" @@ -52,6 +53,10 @@ class CommandHandler // "core is booting" response for every request. void setReady(); + // Forward new ledger state to QueryServer (no-op if query server is + // not enabled). + void addSnapshot(ImmutableLedgerDataPtr state); + std::string manualCmd(std::string const& cmd); void fileNotFound(std::string const& params, std::string& retStr); @@ -97,6 +102,13 @@ class CommandHandler void testAcc(std::string const& params, std::string& retStr); void testTx(std::string const& params, std::string& retStr); void toggleOverlayOnlyMode(std::string const& params, std::string& retStr); + + QueryServer& + getQueryServer() + { + releaseAssert(mQueryServer); + return *mQueryServer; + } #endif }; } diff --git a/src/main/Config.h b/src/main/Config.h index a718a8b843..7392d16efa 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -758,6 +758,14 @@ class Config : public std::enable_shared_from_this // Number of ledger snapshots to maintain for querying uint32_t QUERY_SNAPSHOT_LEDGERS; +#ifdef BUILD_TESTS + // When true, CommandHandler creates a QueryServer using the main thread + // for snapshot lookups (no network I/O). This allows tests to call + // QueryServer functions directly and ensures it has + // all snapshots from startup. + bool QUERY_SERVER_FOR_TESTING{false}; +#endif + // process-management config size_t MAX_CONCURRENT_SUBPROCESSES; diff --git a/src/main/QueryServer.cpp b/src/main/QueryServer.cpp index 06fb35c56b..99c6bbb110 100644 --- a/src/main/QueryServer.cpp +++ b/src/main/QueryServer.cpp @@ -67,6 +67,8 @@ QueryServer::QueryServer(std::string const& address, unsigned short port, ) : mServer(address, port, maxClient, threadPoolSize) , mAppConnector(appConnector) + // Always keep the LCL snapshot plus any additional historical snapshots. + , mMaxSnapshots(appConnector.getConfig().QUERY_SNAPSHOT_LEDGERS + 1) { LOG_INFO(DEFAULT_LOG, "Listening on {}:{} for Query requests", address, port); @@ -78,8 +80,8 @@ QueryServer::QueryServer(std::string const& address, unsigned short port, #ifdef BUILD_TESTS if (useMainThreadForTesting) { - mLedgerViews.emplace(std::this_thread::get_id(), - mAppConnector.copyImmutableLedgerView()); + // Register the main thread for per-thread snapshot cache + mPerThreadSnapshots[std::this_thread::get_id()]; } else #endif @@ -87,7 +89,7 @@ QueryServer::QueryServer(std::string const& address, unsigned short port, auto workerPids = mServer.start(); for (auto pid : workerPids) { - mLedgerViews.emplace(pid, mAppConnector.copyImmutableLedgerView()); + mPerThreadSnapshots[pid]; } } } @@ -104,6 +106,96 @@ QueryServer::setReady() mIsReady = true; } +void +QueryServer::addSnapshot(ImmutableLedgerDataPtr state) +{ + releaseAssert(state); + if (mMaxSnapshots == 0) + { + return; + } + + SharedLockExclusive guard(mMutex); + auto seq = state->getLastClosedLedgerHeader().header.ledgerSeq; + + // Make sure we don't have gaps in our snapshots. + if (!mStates.empty()) + { + releaseAssert(mStates.rbegin()->first == seq - 1); + } + + mStates.emplace(seq, std::move(state)); + + // Clean up outdated snapshots + while (mStates.size() > mMaxSnapshots) + { + mStates.erase(mStates.begin()); + } +} + +ImmutableLedgerView* +QueryServer::getSnapshotForLedger(std::optional ledgerSeq) +{ + auto it = mPerThreadSnapshots.find(std::this_thread::get_id()); + releaseAssert(it != mPerThreadSnapshots.end()); + auto& cache = it->second; + + // If a specific ledger was requested, check thread-local cache first + if (ledgerSeq) + { + auto cacheIt = cache.find(*ledgerSeq); + if (cacheIt != cache.end()) + { + return &cacheIt->second; + } + } + + // Look up in the main snapshot map under read lock. If no ledgerSeq + // was specified, resolve to the latest available. + ImmutableLedgerDataPtr state; + uint32_t oldestValid = 0; + { + SharedLockShared guard(mMutex); + if (mStates.empty()) + { + return nullptr; + } + + if (ledgerSeq) + { + auto it = mStates.find(*ledgerSeq); + if (it == mStates.end()) + { + return nullptr; + } + state = it->second; + } + else + { + // Check cache for latest snapshot and return it if found + auto latestLedgerSeq = mStates.rbegin()->first; + auto cacheIt = cache.find(latestLedgerSeq); + if (cacheIt != cache.end()) + { + return &cacheIt->second; + } + + state = mStates.rbegin()->second; + } + + oldestValid = mStates.begin()->first; + } + + // GC outdated snapshots from the local thread cache. + cache.erase(cache.begin(), cache.lower_bound(oldestValid)); + + // Create a thread local snapshot. + auto seq = state->getLastClosedLedgerHeader().header.ledgerSeq; + auto [inserted, _] = cache.emplace( + seq, ImmutableLedgerView(state, mAppConnector.getMetrics())); + return &inserted->second; +} + bool QueryServer::notFound(std::string const& params, std::string const& body, std::string& retStr) @@ -171,8 +263,13 @@ QueryServer::getLedgerEntryRaw(std::string const& params, if (!keys.empty()) { - auto& ledgerView = mLedgerViews.at(std::this_thread::get_id()); - mAppConnector.maybeUpdateImmutableLedgerView(ledgerView); + auto* snapshotPtr = getSnapshotForLedger(snapshotLedger); + if (!snapshotPtr) + { + retStr = "Ledger not found\n"; + return false; + } + root["ledgerSeq"] = snapshotPtr->getLedgerSeq(); LedgerKeySet orderedKeys; for (auto const& key : keys) @@ -182,31 +279,7 @@ QueryServer::getLedgerEntryRaw(std::string const& params, orderedKeys.emplace(k); } - std::vector loadedKeys; - - // If a ledgerView ledger is specified, use it to get the ledger entry - if (snapshotLedger) - { - root["ledgerSeq"] = *snapshotLedger; - - auto loadedKeysOp = - ledgerView.loadLiveKeysFromLedger(orderedKeys, *snapshotLedger); - - // Return 404 if ledgerSeq not found - if (!loadedKeysOp) - { - retStr = "Ledger not found\n"; - return false; - } - - loadedKeys = std::move(*loadedKeysOp); - } - // Otherwise default to current ledger - else - { - loadedKeys = ledgerView.loadLiveKeys(orderedKeys, "query"); - root["ledgerSeq"] = ledgerView.getLedgerSeq(); - } + auto loadedKeys = snapshotPtr->loadLiveKeys(orderedKeys, "query"); for (auto const& le : loadedKeys) { @@ -253,8 +326,6 @@ QueryServer::getLedgerEntry(std::string const& params, std::string const& body, return false; } - auto& ledgerView = mLedgerViews.at(std::this_thread::get_id()); - mAppConnector.maybeUpdateImmutableLedgerView(ledgerView); LedgerKeySet keysToSearch; // Keep track of keys in their original order for response ordering @@ -280,23 +351,19 @@ QueryServer::getLedgerEntry(std::string const& params, std::string const& body, inputOrderedKeys.push_back(k); } - std::vector liveEntries; - std::vector archivedEntries; - uint32_t ledgerSeq = - snapshotLedger ? *snapshotLedger : ledgerView.getLedgerSeq(); - root["ledgerSeq"] = ledgerSeq; - - auto liveEntriesOp = - ledgerView.loadLiveKeysFromLedger(keysToSearch, ledgerSeq); - - // Return 404 if ledgerSeq not found - if (!liveEntriesOp) + auto* snapshotPtr = getSnapshotForLedger(snapshotLedger); + if (!snapshotPtr) { retStr = "Ledger not found\n"; return false; } + uint32_t ledgerSeq = snapshotPtr->getLedgerSeq(); + root["ledgerSeq"] = ledgerSeq; + + std::vector liveEntries; + std::vector archivedEntries; - liveEntries = std::move(*liveEntriesOp); + liveEntries = snapshotPtr->loadLiveKeys(keysToSearch, "query"); // Remove keys found in live bucketList from subsequent searches for (auto const& le : liveEntries) @@ -316,14 +383,8 @@ QueryServer::getLedgerEntry(std::string const& params, std::string const& body, // Only query archive for soroban keys we didn't find in the live bucketList if (!hotArchiveKeysToSearch.empty()) { - auto archivedEntriesOp = ledgerView.loadArchiveKeysFromLedger( - hotArchiveKeysToSearch, ledgerSeq); - if (!archivedEntriesOp) - { - retStr = "Ledger not found\n"; - return false; - } - archivedEntries = std::move(*archivedEntriesOp); + archivedEntries = + snapshotPtr->loadArchiveKeys(hotArchiveKeysToSearch, "hot-query"); } // Collect TTL keys for Soroban entries in the live BucketList @@ -339,10 +400,7 @@ QueryServer::getLedgerEntry(std::string const& params, std::string const& body, std::vector ttlEntries; if (!ttlKeys.empty()) { - // We haven't updated the live ledgerView so we know the have a - // ledgerView available for ledgerSeq - ttlEntries = std::move( - ledgerView.loadLiveKeysFromLedger(ttlKeys, ledgerSeq).value()); + ttlEntries = snapshotPtr->loadLiveKeys(ttlKeys, "query"); } std::unordered_map ttlMap; diff --git a/src/main/QueryServer.h b/src/main/QueryServer.h index 5159acf726..9440d003ff 100644 --- a/src/main/QueryServer.h +++ b/src/main/QueryServer.h @@ -7,8 +7,11 @@ #include "lib/httpthreaded/server.hpp" #include "ledger/ImmutableLedgerView.h" +#include "util/ThreadAnnotations.h" #include #include +#include +#include #include #include #include @@ -26,12 +29,32 @@ class QueryServer httpThreaded::server::server mServer; - std::unordered_map mLedgerViews; + // Per-thread cache of ImmutableLedgerView objects. Each thread owns its + // cache exclusively, so no synchronization is needed for access. Entries + // are created lazily from mStates and garbage-collected when no longer + // present in the shared map. + std::unordered_map> + mPerThreadSnapshots; AppConnector& mAppConnector; std::atomic mIsReady{false}; + // Ledger states for query lookups, containing both the current and recent + // historical states. Protected by a shared mutex so that worker threads + // can read concurrently while the main thread writes on ledger close. + mutable ANNOTATED_SHARED_MUTEX(mMutex); + std::map mStates GUARDED_BY(mMutex); + uint32_t const mMaxSnapshots; + + // Returns a cached ImmutableLedgerView for the given ledger seq, or + // the latest available snapshot if ledgerSeq is nullopt. The pointer + // is into the per-thread cache and remains valid until the next call + // to getSnapshotForLedger on the same thread. Returns nullptr if no + // snapshot is found. + ImmutableLedgerView* + getSnapshotForLedger(std::optional ledgerSeq); + bool safeRouter(HandlerRoute route, std::string const& params, std::string const& body, std::string& retStr); @@ -42,7 +65,35 @@ class QueryServer #ifdef BUILD_TESTS public: + // Register the calling thread for per-thread snapshot caching. Must be + // called before any query methods are called from that thread. + void + registerThread() + { + SharedLockExclusive guard(mMutex); + mPerThreadSnapshots[std::this_thread::get_id()]; + } + + ImmutableLedgerView* + getSnapshotForLedgerForTesting(std::optional ledgerSeq) + { + return getSnapshotForLedger(ledgerSeq); + } + + // Clear all snapshot state. Used between newDB() and start() in tests + // to avoid the duplicate-seq assertion when both paths push the same LCL. + void + resetForTesting() + { + SharedLockExclusive guard(mMutex); + mStates.clear(); + for (auto& [tid, cache] : mPerThreadSnapshots) + { + cache.clear(); + } + } #endif + // Returns raw LedgerKeys for the given keys from the Live BucketList. Does // not query other BucketLists or reason about archival. bool getLedgerEntryRaw(std::string const& params, std::string const& body, @@ -64,5 +115,10 @@ class QueryServer // Called by CommandHandler::setReady() to unblock query endpoints. void setReady(); + + // Called from main thread when a new ledger state is published. The state + // is added to the snapshot map so query workers can serve current and + // historical ledger lookups. + void addSnapshot(ImmutableLedgerDataPtr state); }; } diff --git a/src/main/test/QueryServerTests.cpp b/src/main/test/QueryServerTests.cpp index bb15d14375..0b7be52ca3 100644 --- a/src/main/test/QueryServerTests.cpp +++ b/src/main/test/QueryServerTests.cpp @@ -9,6 +9,7 @@ #include "ledger/test/LedgerTestUtils.h" #include "lib/catch.hpp" #include "main/Application.h" +#include "main/CommandHandler.h" #include "main/Config.h" #include "main/QueryServer.h" #include "test/TestUtils.h" @@ -28,23 +29,60 @@ using namespace stellar; +namespace +{ + +std::string +buildRequestBody(std::optional ledgerSeq, + std::vector const& keys) +{ + std::string body; + if (ledgerSeq) + { + body = "ledgerSeq=" + std::to_string(*ledgerSeq); + } + for (auto const& key : keys) + { + body += (body.empty() ? "" : "&") + std::string("key=") + + toOpaqueBase64(key); + } + return body; +} + +// Performs a query and parses the JSON response. Returns true if the query +// succeeded, populating root with the parsed JSON. On failure, retStr contains +// the error message. +bool +queryAndParse(QueryServer& qServer, std::optional ledgerSeq, + std::vector const& keys, Json::Value& root, + std::string& retStr) +{ + auto reqBody = buildRequestBody(ledgerSeq, keys); + std::string empty; + retStr.clear(); + if (!qServer.getLedgerEntry(empty, reqBody, retStr)) + { + return false; + } + + Json::Reader reader; + REQUIRE(reader.parse(retStr, root)); + return true; +} + +} // namespace + TEST_CASE("getledgerentry", "[queryserver]") { VirtualClock clock; auto cfg = getTestConfig(); cfg.QUERY_SNAPSHOT_LEDGERS = 5; + cfg.QUERY_SERVER_FOR_TESTING = true; auto app = createTestApplication( clock, cfg); auto& lm = app->getLedgerManager(); - - // Query Server is disabled by default in cfg. Instead of enabling it, we're - // going to manage a version here manually so we can directly call functions - // and avoid sending network requests. - auto qServer = std::make_unique("127.0.0.1", 0, - 1, // maxClient - 2, // threadPoolSize - app->getAppConnector(), true); + auto& qServer = app->getCommandHandler().getQueryServer(); std::unordered_map liveEntryMap; @@ -106,24 +144,6 @@ TEST_CASE("getledgerentry", "[queryserver]") closeLedger(*app); } - // Build HTTP request string body - auto buildRequestBody = - [](std::optional ledgerSeq, - std::vector const& keys) -> std::string { - std::string body; - if (ledgerSeq) - { - body = "ledgerSeq=" + std::to_string(*ledgerSeq); - } - - for (auto const& key : keys) - { - body += (body.empty() ? "" : "&") + std::string("key=") + - toOpaqueBase64(key); - } - return body; - }; - // Check response for entries that exist from returned JSON string auto checkEntry = [](auto const& entries, LedgerEntry const& le, std::optional expectedTTL, @@ -244,16 +264,12 @@ TEST_CASE("getledgerentry", "[queryserver]") keysToSearch.push_back(key); } - auto reqBody = buildRequestBody(std::nullopt, keysToSearch); - std::string retStr; - std::string empty; - REQUIRE(qServer->getLedgerEntry(empty, reqBody, retStr)); - auto ledgerSeq = lm.getLastClosedLedgerNum(); Json::Value root; - Json::Reader reader; - REQUIRE(reader.parse(retStr, root)); + std::string retStr; + REQUIRE( + queryAndParse(qServer, std::nullopt, keysToSearch, root, retStr)); REQUIRE(root.isMember("entries")); REQUIRE(root.isMember("ledgerSeq")); REQUIRE(root["ledgerSeq"].asUInt() == ledgerSeq); @@ -377,14 +393,10 @@ TEST_CASE("getledgerentry", "[queryserver]") SECTION("current values") { - auto reqBody = buildRequestBody(newLedger, keysToSearch); - std::string retStr; - std::string empty; - REQUIRE(qServer->getLedgerEntry(empty, reqBody, retStr)); - Json::Value root; - Json::Reader reader; - REQUIRE(reader.parse(retStr, root)); + std::string retStr; + REQUIRE( + queryAndParse(qServer, newLedger, keysToSearch, root, retStr)); REQUIRE(root.isMember("entries")); REQUIRE(root.isMember("ledgerSeq")); REQUIRE(root["ledgerSeq"].asUInt() == newLedger); @@ -412,14 +424,10 @@ TEST_CASE("getledgerentry", "[queryserver]") SECTION("snapshot values") { - auto reqBody = buildRequestBody(oldLedger, keysToSearch); - std::string retStr; - std::string empty; - REQUIRE(qServer->getLedgerEntry(empty, reqBody, retStr)); - Json::Value root; - Json::Reader reader; - REQUIRE(reader.parse(retStr, root)); + std::string retStr; + REQUIRE( + queryAndParse(qServer, oldLedger, keysToSearch, root, retStr)); REQUIRE(root.isMember("entries")); REQUIRE(root.isMember("ledgerSeq")); REQUIRE(root["ledgerSeq"].asUInt() == oldLedger); @@ -450,10 +458,9 @@ TEST_CASE("getledgerentry", "[queryserver]") SECTION("empty keys") { + Json::Value root; std::string retStr; - std::string empty; - auto body = "ledgerSeq=10"; // No keys provided - REQUIRE(!qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE(!queryAndParse(qServer, 10, {}, root, retStr)); REQUIRE(retStr == "Must specify key in POST body: key=\n"); @@ -464,10 +471,9 @@ TEST_CASE("getledgerentry", "[queryserver]") LedgerKey ttlKey = LedgerEntryKey( LedgerTestUtils::generateValidLedgerEntryOfType(TTL)); - auto body = buildRequestBody(std::nullopt, {ttlKey}); + Json::Value root; std::string retStr; - std::string empty; - REQUIRE(!qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE(!queryAndParse(qServer, std::nullopt, {ttlKey}, root, retStr)); REQUIRE(retStr == "TTL keys are not allowed\n"); } @@ -475,20 +481,22 @@ TEST_CASE("getledgerentry", "[queryserver]") { auto liveEntry = liveEntryMap.begin()->first; auto currentLedger = lm.getLastClosedLedgerNum(); - auto body = buildRequestBody(currentLedger + 1000, {liveEntry}); + + Json::Value root; std::string retStr; - std::string empty; - REQUIRE(!qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE(!queryAndParse(qServer, currentLedger + 1000, {liveEntry}, root, + retStr)); REQUIRE(retStr == "Ledger not found\n"); } SECTION("duplicate keys") { auto liveEntry = liveEntryMap.begin()->first; - auto body = buildRequestBody(std::nullopt, {liveEntry, liveEntry}); + + Json::Value root; std::string retStr; - std::string empty; - REQUIRE(!qServer->getLedgerEntry(empty, body, retStr)); + REQUIRE(!queryAndParse(qServer, std::nullopt, {liveEntry, liveEntry}, + root, retStr)); REQUIRE(retStr == "Duplicate keys\n"); } @@ -514,14 +522,10 @@ TEST_CASE("getledgerentry", "[queryserver]") auto testKeyOrder = [&](std::vector const& keyOrder, bool liveFirst) { - auto reqBody = buildRequestBody(std::nullopt, keyOrder); - std::string retStr; - std::string empty; - REQUIRE(qServer->getLedgerEntry(empty, reqBody, retStr)); - Json::Value root; - Json::Reader reader; - REQUIRE(reader.parse(retStr, root)); + std::string retStr; + REQUIRE( + queryAndParse(qServer, std::nullopt, keyOrder, root, retStr)); REQUIRE(root.isMember("entries")); auto entries = root["entries"]; @@ -556,3 +560,153 @@ TEST_CASE("getledgerentry", "[queryserver]") testKeyOrder({newKey, liveKey}, false); } } + +TEST_CASE("query server with zero snapshot ledgers", "[queryserver]") +{ + VirtualClock clock; + auto cfg = getTestConfig(); + cfg.QUERY_SNAPSHOT_LEDGERS = 0; + cfg.QUERY_SERVER_FOR_TESTING = true; + + auto app = createTestApplication( + clock, cfg); + auto& lm = app->getLedgerManager(); + auto& qServer = app->getCommandHandler().getQueryServer(); + + // Insert some entries and close a few ledgers + auto entries = LedgerTestUtils::generateValidUniqueLedgerEntriesWithTypes( + {ACCOUNT}, 5); + std::vector keys; + for (auto const& le : entries) + { + keys.push_back(LedgerEntryKey(le)); + } + + lm.setNextLedgerEntryBatchForBucketTesting(entries, {}, {}); + closeLedger(*app); + + auto currentLedger = lm.getLastClosedLedgerNum(); + + SECTION("current ledger query works") + { + // Query without specifying ledgerSeq — should return LCL data + Json::Value root; + std::string retStr; + REQUIRE(queryAndParse(qServer, std::nullopt, keys, root, retStr)); + REQUIRE(root["ledgerSeq"].asUInt() == currentLedger); + REQUIRE(root["entries"].size() == keys.size()); + } + + SECTION("explicit current ledger query works") + { + // Query with explicit current ledgerSeq + Json::Value root; + std::string retStr; + REQUIRE(queryAndParse(qServer, currentLedger, keys, root, retStr)); + REQUIRE(root["ledgerSeq"].asUInt() == currentLedger); + REQUIRE(root["entries"].size() == keys.size()); + } + + SECTION("historical query fails") + { + // Close another ledger so the previous one becomes historical + lm.setNextLedgerEntryBatchForBucketTesting({}, entries, {}); + closeLedger(*app); + + Json::Value root; + std::string retStr; + REQUIRE(!queryAndParse(qServer, currentLedger, keys, root, retStr)); + REQUIRE(retStr == "Ledger not found\n"); + } +} + +TEST_CASE("query server historical snapshots", "[queryserver]") +{ + VirtualClock clock; + auto cfg = getTestConfig(); + cfg.QUERY_SNAPSHOT_LEDGERS = 5; + cfg.QUERY_SERVER_FOR_TESTING = true; + + auto app = createTestApplication( + clock, cfg); + auto& lm = app->getLedgerManager(); + auto& qServer = app->getCommandHandler().getQueryServer(); + + // Create a single account entry that we modify each ledger + auto entry = LedgerTestUtils::generateValidLedgerEntryOfType(ACCOUNT); + auto key = LedgerEntryKey(entry); + + lm.setNextLedgerEntryBatchForBucketTesting({entry}, {}, {}); + closeLedger(*app); + uint32_t firstLedger = lm.getLastClosedLedgerNum(); + + // Close several more ledgers, modifying the entry each time + uint32_t const additionalLedgers = 8; + for (uint32_t i = 0; i < additionalLedgers; ++i) + { + entry.lastModifiedLedgerSeq = lm.getLastClosedLedgerNum(); + lm.setNextLedgerEntryBatchForBucketTesting({}, {entry}, {}); + closeLedger(*app); + } + + uint32_t currentLedger = lm.getLastClosedLedgerNum(); + + SECTION("current ledger returns data") + { + Json::Value root; + std::string retStr; + REQUIRE(queryAndParse(qServer, currentLedger, {key}, root, retStr)); + REQUIRE(root["ledgerSeq"].asUInt() == currentLedger); + REQUIRE(root["entries"].size() == 1); + REQUIRE(root["entries"][0]["state"].asString() == "live"); + } + + SECTION("recent historical ledgers return data") + { + // Query each ledger in the historical window + for (uint32_t seq = currentLedger - 1; + seq >= currentLedger - cfg.QUERY_SNAPSHOT_LEDGERS + 1; --seq) + { + Json::Value root; + std::string retStr; + REQUIRE(queryAndParse(qServer, seq, {key}, root, retStr)); + REQUIRE(root["ledgerSeq"].asUInt() == seq); + REQUIRE(root["entries"].size() == 1); + } + } + + SECTION("ledger outside window returns not found") + { + // Query a ledger that has been evicted from the window + Json::Value root; + std::string retStr; + REQUIRE(!queryAndParse(qServer, firstLedger, {key}, root, retStr)); + REQUIRE(retStr == "Ledger not found\n"); + } + + SECTION("future ledger returns not found") + { + Json::Value root; + std::string retStr; + REQUIRE( + !queryAndParse(qServer, currentLedger + 100, {key}, root, retStr)); + REQUIRE(retStr == "Ledger not found\n"); + } + + SECTION("default query returns latest after advancing") + { + // Close one more ledger + entry.lastModifiedLedgerSeq = lm.getLastClosedLedgerNum(); + lm.setNextLedgerEntryBatchForBucketTesting({}, {entry}, {}); + closeLedger(*app); + + uint32_t newLedger = lm.getLastClosedLedgerNum(); + REQUIRE(newLedger == currentLedger + 1); + + // Query without ledgerSeq should return the new latest + Json::Value root; + std::string retStr; + REQUIRE(queryAndParse(qServer, std::nullopt, {key}, root, retStr)); + REQUIRE(root["ledgerSeq"].asUInt() == newLedger); + } +} diff --git a/src/simulation/test/LoadGeneratorTests.cpp b/src/simulation/test/LoadGeneratorTests.cpp index dc1fee9760..5f76b50f88 100644 --- a/src/simulation/test/LoadGeneratorTests.cpp +++ b/src/simulation/test/LoadGeneratorTests.cpp @@ -1078,7 +1078,7 @@ TEST_CASE("apply load", "[loadgen][applyload][acceptance]") sampleKeys.insert(ApplyLoad::getKeyForArchivedEntry(idx)); } - auto sampleEntries = ledgerView.loadArchiveKeys(sampleKeys); + auto sampleEntries = ledgerView.loadArchiveKeys(sampleKeys, "test"); REQUIRE(sampleEntries.size() == sampleKeys.size()); al.execute(); diff --git a/src/transactions/test/InvokeHostFunctionTests.cpp b/src/transactions/test/InvokeHostFunctionTests.cpp index 5311bf62a4..a66e10f8d0 100644 --- a/src/transactions/test/InvokeHostFunctionTests.cpp +++ b/src/transactions/test/InvokeHostFunctionTests.cpp @@ -10090,7 +10090,7 @@ TEST_CASE("autorestore from another contract", "[tx][soroban][archival]") auto archivedSnap = test.getApp().getLedgerManager().copyImmutableLedgerView(); - REQUIRE(archivedSnap.loadArchiveKeys({lk1, lk2}).size() == 2); + REQUIRE(archivedSnap.loadArchiveKeys({lk1, lk2}, "load").size() == 2); REQUIRE(archivedSnap.loadLiveKeys({lk1, lk2}, "load").size() == 0); // Now, invoke contract2, but also autorestore state from contract1. @@ -10120,7 +10120,7 @@ TEST_CASE("autorestore from another contract", "[tx][soroban][archival]") test.getApp().getLedgerManager().copyImmutableLedgerView(); REQUIRE(restoredSnap.loadLiveKeys({lk1, lk2}, "load").size() == 2); - REQUIRE(restoredSnap.loadArchiveKeys({lk1, lk2}).size() == 0); + REQUIRE(restoredSnap.loadArchiveKeys({lk1, lk2}, "load").size() == 0); // Verify that the correct values were restored REQUIRE(client1.get("key1", ContractDataDurability::PERSISTENT, 111) ==