Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare
bucketlistDB-live.bulk.prefetch | timer | time to prefetch
bucketlistDB-live.bulk.eviction | timer | time to load for eviction scan
bucketlistDB-live.bulk.query | timer | time to load for query server
bucketlistDB-hotArchive.bulk.hot-query | timer | time to load hot archive entries for query server
bucketlistDB-<X>.<Y>.sum | counter | sum of time (microseconds) to load single entry of type <Y> on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.<Y>.count | counter | number of times single entry of type <Y> on BucketList <X> (live/hotArchive) is loaded
bucketlistDB-<X>.<Y>.max | counter | max (since last metrics call) of time (microseconds) to load single entry of type <Y> on BucketList <X> (live/hotArchive)
Expand Down
110 changes: 19 additions & 91 deletions src/bucket/BucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,8 @@ BucketListSnapshotData<BucketT>::BucketListSnapshotData(
template <class BucketT>
SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots,
uint32_t ledgerSeq)
std::shared_ptr<BucketListSnapshotData<BucketT> const> data)
: mData(std::move(data))
, mHistoricalSnapshots(std::move(historicalSnapshots))
, mLedgerSeq(ledgerSeq)
, mMetrics(metrics)
, mBulkLoadMeter(
metrics.NewMeter({BucketT::METRIC_STRING, "query", "loads"}, "query"))
Expand All @@ -85,8 +80,6 @@ template <class BucketT>
SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
SearchableBucketListSnapshot const& other)
: mData(other.mData)
, mHistoricalSnapshots(other.mHistoricalSnapshots)
, mLedgerSeq(other.mLedgerSeq)
// mStreams intentionally left empty — each copy gets its own stream cache
, mMetrics(other.mMetrics)
, mPointTimers(other.mPointTimers)
Expand All @@ -103,8 +96,6 @@ SearchableBucketListSnapshot<BucketT>::operator=(
if (this != &other)
{
mData = other.mData;
mHistoricalSnapshots = other.mHistoricalSnapshots;
mLedgerSeq = other.mLedgerSeq;
mStreams.clear();
mMetrics = other.mMetrics;
mPointTimers = other.mPointTimers;
Expand Down Expand Up @@ -345,51 +336,6 @@ SearchableBucketListSnapshot<BucketT>::load(LedgerKey const& k) const
return result;
}

template <class BucketT>
std::optional<std::vector<typename BucketT::LoadT>>
SearchableBucketListSnapshot<BucketT>::loadKeysInternal(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::optional<uint32_t> ledgerSeq) const
{
ZoneScoped;
releaseAssert(mData);

// Make a copy of the key set, this loop is destructive
auto keys = inKeys;
std::vector<typename BucketT::LoadT> entries;

auto loadKeysLoop = [&](std::shared_ptr<BucketT const> const& bucket) {
loadKeysFromBucket(bucket, keys, entries);
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};

if (!ledgerSeq || *ledgerSeq == mLedgerSeq)
{
loopAllBuckets(loadKeysLoop, *mData);
}
else
{
auto iter = mHistoricalSnapshots.find(*ledgerSeq);
if (iter == mHistoricalSnapshots.end())
{
return std::nullopt;
}
releaseAssert(iter->second);
loopAllBuckets(loadKeysLoop, *iter->second);
}

return entries;
}

template <class BucketT>
std::optional<std::vector<typename BucketT::LoadT>>
SearchableBucketListSnapshot<BucketT>::loadKeysFromLedger(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const
{
return loadKeysInternal(inKeys, ledgerSeq);
}

template <class BucketT>
medida::Timer&
SearchableBucketListSnapshot<BucketT>::getBulkLoadTimer(
Expand Down Expand Up @@ -418,39 +364,35 @@ SearchableBucketListSnapshot<BucketT>::getSnapshotData() const
return mData;
}

template <class BucketT>
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>> const&
SearchableBucketListSnapshot<BucketT>::getHistoricalSnapshots() const
{
return mHistoricalSnapshots;
}

//
// SearchableLiveBucketListSnapshot
//

SearchableLiveBucketListSnapshot::SearchableLiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<LiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data)
: SearchableBucketListSnapshot<LiveBucket>(metrics, std::move(data))
{
}

std::vector<LedgerEntry>
SearchableLiveBucketListSnapshot::loadKeys(
template <class BucketT>
std::vector<typename BucketT::LoadT>
SearchableBucketListSnapshot<BucketT>::loadKeys(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::string const& label) const
{
ZoneScoped;
releaseAssert(mData);
auto timer = getBulkLoadTimer(label, inKeys.size()).TimeScope();
auto op = loadKeysInternal(inKeys, std::nullopt);
releaseAssertOrThrow(op);
return std::move(*op);

auto keys = inKeys;
std::vector<typename BucketT::LoadT> entries;
auto loadKeysLoop = [&](std::shared_ptr<BucketT const> const& bucket) {
loadKeysFromBucket(bucket, keys, entries);
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};
loopAllBuckets(loadKeysLoop, *mData);
return entries;
}

// This query has two steps:
Expand Down Expand Up @@ -864,23 +806,9 @@ SearchableLiveBucketListSnapshot::scanForEvictionInBucket(

SearchableHotArchiveBucketListSnapshot::SearchableHotArchiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<HotArchiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
{
}

std::vector<HotArchiveBucketEntry>
SearchableHotArchiveBucketListSnapshot::loadKeys(
std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data)
: SearchableBucketListSnapshot<HotArchiveBucket>(metrics, std::move(data))
{
auto op = loadKeysInternal(inKeys, std::nullopt);
releaseAssertOrThrow(op);
return std::move(*op);
}

void
Expand Down
59 changes: 9 additions & 50 deletions src/bucket/BucketListSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

#include <functional>
#include <list>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <vector>
Expand Down Expand Up @@ -81,13 +79,6 @@ template <class BucketT> class SearchableBucketListSnapshot
protected:
// Shared immutable snapshot data
std::shared_ptr<BucketListSnapshotData<BucketT> const> mData;
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
mHistoricalSnapshots;

// Ledger sequence number for this snapshot, used internally to route
// queries between current and historical data. Not exposed publicly;
// callers should get ledger metadata from ImmutableLedgerData.
uint32_t mLedgerSeq;

// Per-snapshot mutable stream cache
mutable UnorderedMap<BucketT const*, std::unique_ptr<XDRInputFileStream>>
Expand Down Expand Up @@ -133,29 +124,19 @@ template <class BucketT> class SearchableBucketListSnapshot
std::set<LedgerKey, LedgerEntryIdCmp>& keys,
std::vector<typename BucketT::LoadT>& result) const;

std::optional<std::vector<typename BucketT::LoadT>>
loadKeysInternal(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::optional<uint32_t> ledgerSeq) const;

medida::Timer& getBulkLoadTimer(std::string const& label,
size_t numEntries) const;

// Iterate over all buckets in a snapshot in order, calling f on each
// non-empty bucket. Exits early if function returns Loop::COMPLETE.
// The first overload operates on an explicit snapshot (used for historical
// queries).
template <typename Func>
void loopAllBuckets(Func&& f,
BucketListSnapshotData<BucketT> const& snapshot) const;
template <typename Func> void loopAllBuckets(Func&& f) const;

SearchableBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<BucketT> const> data);

public:
// Copy: copies all state except mStreams, which is reset to empty.
Expand All @@ -171,23 +152,16 @@ template <class BucketT> class SearchableBucketListSnapshot
std::shared_ptr<typename BucketT::LoadT const>
load(LedgerKey const& k) const;

// Loads inKeys from the specified historical snapshot. Returns
// load_result_vec if the snapshot for the given ledger is
// available, std::nullopt otherwise. Note that ledgerSeq is defined
// as the state of the BucketList at the beginning of the ledger. This means
// that for ledger N, the maximum lastModifiedLedgerSeq of any LedgerEntry
// in the BucketList is N - 1.
std::optional<std::vector<typename BucketT::LoadT>>
loadKeysFromLedger(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const;
// Bulk load. Walks all buckets, accumulating found entries until all
// keys have been resolved. label identifies the call site for the bulk
// load timer metric.
std::vector<typename BucketT::LoadT>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::string const& label) const;

// Access to underlying data (for copying/refreshing)
std::shared_ptr<BucketListSnapshotData<BucketT> const> const&
getSnapshotData() const;

std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>> const&
getHistoricalSnapshots() const;
};

// Live bucket list snapshot with additional query methods
Expand All @@ -196,11 +170,7 @@ class SearchableLiveBucketListSnapshot
{
SearchableLiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data);

Loop scanForEvictionInBucket(
std::shared_ptr<LiveBucket const> const& bucket, EvictionIterator& iter,
Expand All @@ -212,10 +182,6 @@ class SearchableLiveBucketListSnapshot
SearchableLiveBucketListSnapshot(SearchableLiveBucketListSnapshot const&) =
default;

std::vector<LedgerEntry>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::string const& label) const;

std::vector<LedgerEntry>
loadPoolShareTrustLinesByAccountAndAsset(AccountID const& accountID,
Asset const& asset) const;
Expand Down Expand Up @@ -244,19 +210,12 @@ class SearchableHotArchiveBucketListSnapshot
{
SearchableHotArchiveBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t, std::shared_ptr<
BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots,
uint32_t ledgerSeq);
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data);

public:
SearchableHotArchiveBucketListSnapshot(
SearchableHotArchiveBucketListSnapshot const&) = default;

std::vector<HotArchiveBucketEntry>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const;

// Iterate over all entries in all buckets. Note this iterates over all
// HotArchiveBucketEntry, so some may be shadowed and outdated.
void scanAllEntries(
Expand Down
Loading
Loading