diff --git a/docs/metrics.md b/docs/metrics.md index 578bd281b3..cb5db6377e 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -187,6 +187,8 @@ scp.pending.discarded | counter | number of discarded enve scp.pending.fetching | counter | number of incomplete envelopes scp.pending.processed | counter | number of already processed envelopes scp.pending.ready | counter | number of envelopes ready to process +scp.skip.externalized | counter | number of times the local node externalized a skip-ledger value +scp.skip.value-replaced | counter | number of times the ballot protocol swapped a value for a skip-ledger value scp.sync.lost | meter | validator lost sync scp.timeout.nominate | meter | timeouts in nomination scp.timeout.prepare | meter | timeouts in ballot protocol @@ -194,6 +196,7 @@ scp.timing.nominated | timer | time spent in nomination scp.timing.externalized | timer | time spent in ballot protocol scp.timing.first-to-self-externalize-lag | timer | delay between first externalize message and local node externalizing scp.timing.self-to-others-externalize-lag | timer | delay between local node externalizing and later externalize messages from other nodes +scp.timing.ballot-blocked-on-txset | timer | time balloting was blocked waiting for a txset download (milliseconds) scp.value.invalid | meter | SCP value is invalid scp.value.valid | meter | SCP value is valid scp.slot.values-referenced | histogram | number of values referenced per consensus round diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg index 904105e606..cbfb5ec2b6 100644 --- a/docs/stellar-core_example.cfg +++ b/docs/stellar-core_example.cfg @@ -163,6 +163,12 @@ PEER_TIMEOUT=30 # time when authenticated. PEER_STRAGGLER_TIMEOUT=120 +# TODO: Update these docs vv +# TX_SET_DOWNLOAD_TIMEOUT (Integer) default 5000 +# Time in milliseconds before a node gives up waiting on a transaction set and +# votes to skip the ledger. +TX_SET_DOWNLOAD_TIMEOUT=5000 + # MAX_BATCH_WRITE_COUNT (Integer) default 1024 # How many messages can this server send at once to a peer MAX_BATCH_WRITE_COUNT=1024 diff --git a/src/herder/Herder.cpp b/src/herder/Herder.cpp index 5a7f6c859c..4279e5f3c0 100644 --- a/src/herder/Herder.cpp +++ b/src/herder/Herder.cpp @@ -22,4 +22,6 @@ uint32 const Herder::SCP_EXTRA_LOOKBACK_LEDGERS = 3u; std::chrono::minutes const Herder::TX_SET_GC_DELAY(1); std::chrono::minutes const Herder::CHECK_FOR_DEAD_NODES_MINUTES(15); uint32 const Herder::FLOW_CONTROL_BYTES_EXTRA_BUFFER(2000); + +Hash const Herder::SKIP_LEDGER_HASH{}; } diff --git a/src/herder/Herder.h b/src/herder/Herder.h index 4fc9b685d8..435b850c38 100644 --- a/src/herder/Herder.h +++ b/src/herder/Herder.h @@ -16,9 +16,17 @@ #include #include #include +#include namespace stellar { + +// Returned by getTxSet to distinguish "skip" values (no real tx set) +// from "not yet downloaded" (nullptr). +struct SkipTxSet +{ +}; +using TxSetResult = std::variant; class Application; class XDROutputFileStream; @@ -79,6 +87,9 @@ class Herder static std::chrono::minutes const TX_SET_GC_DELAY; + // TODO: Docs + static Hash const SKIP_LEDGER_HASH; + enum State { // Starting up, no state is known @@ -147,7 +158,7 @@ class Herder #endif virtual void peerDoesntHave(stellar::MessageType type, uint256 const& itemID, Peer::pointer peer) = 0; - virtual TxSetXDRFrameConstPtr getTxSet(Hash const& hash) = 0; + virtual TxSetResult getTxSet(Hash const& hash) = 0; virtual SCPQuorumSetPtr getQSet(Hash const& qSetHash) = 0; // We are learning about a new envelope. diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp index 467a0424e1..8386c03bf7 100644 --- a/src/herder/HerderImpl.cpp +++ b/src/herder/HerderImpl.cpp @@ -328,8 +328,19 @@ HerderImpl::processExternalized(uint64 slotIndex, StellarValue const& value, slotIndex, hexAbbrev(value.txSetHash)); } - TxSetXDRFrameConstPtr externalizedSet = - mPendingEnvelopes.getTxSet(value.txSetHash); + auto result = mPendingEnvelopes.getTxSet(value.txSetHash); + TxSetXDRFrameConstPtr externalizedSet; + if (std::holds_alternative(result)) + { + auto const& ov = value.ext.originalValue(); + externalizedSet = TxSetXDRFrame::makeEmpty(ov.previousLedgerHash, + ov.previousLedgerVersion); + } + else + { + externalizedSet = std::get(result); + } + releaseAssert(externalizedSet != nullptr); // save the SCP messages in the database if (mApp.getConfig().MODE_STORES_HISTORY_MISC) @@ -898,6 +909,9 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope) return Herder::ENVELOPE_STATUS_SKIPPED_SELF; } + // This call fetches everything. Will only return ENVELOPE_STATUS_READY once + // everything is fetched though! Will need a new status to allow it to + // proceed to nomination at least, I think. auto status = mPendingEnvelopes.recvSCPEnvelope(envelope); if (status == Herder::ENVELOPE_STATUS_READY) { @@ -912,10 +926,26 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope) } else { - if (status == Herder::ENVELOPE_STATUS_FETCHING) + SCPStatementType type = envelope.statement.pledges.type(); + // Allow parallel tx set downloading if the node is in sync and this is + // a NOMINATE or PREPARE message. Technically both of these criteria + // should be properly handled downstream, but this provides some + // additional assurance. + if (mApp.getState() == Application::State::APP_SYNCED_STATE && + status == Herder::ENVELOPE_STATUS_FETCHING && + (type == SCP_ST_NOMINATE || type == SCP_ST_PREPARE)) { std::string txt("FETCHING"); ZoneText(txt.c_str(), txt.size()); + + // If we have the quorum set, then proceed without the tx set. + auto qSetHash = Slot::getCompanionQuorumSetHashFromStatement( + envelope.statement); + auto maybeQSet = mApp.getHerder().getQSet(qSetHash); + if (maybeQSet) + { + processSCPQueue(true); + } } else if (status == Herder::ENVELOPE_STATUS_PROCESSED) { @@ -1138,6 +1168,7 @@ void HerderImpl::processSCPQueueUpToIndex(uint64 slotIndex) { ZoneScoped; + CLOG_TRACE(Proto, "Processing SCP queue up to index {}", slotIndex); while (true) { SCPEnvelopeWrapperPtr envW = mPendingEnvelopes.pop(slotIndex); @@ -1386,7 +1417,7 @@ HerderImpl::peerDoesntHave(MessageType type, uint256 const& itemID, mPendingEnvelopes.peerDoesntHave(type, itemID, peer); } -TxSetXDRFrameConstPtr +TxSetResult HerderImpl::getTxSet(Hash const& hash) { return mPendingEnvelopes.getTxSet(hash); @@ -2162,11 +2193,15 @@ HerderImpl::persistSCPState(uint64 slot) // saves transaction sets referred by the statement for (auto const& h : getValidatedTxSetHashes(e)) { - auto txSet = mPendingEnvelopes.getTxSet(h); - if (txSet && !mApp.getPersistentState().hasTxSet(h)) + auto result = mPendingEnvelopes.getTxSet(h); + if (auto* txSetPtr = std::get_if(&result)) { - txSets.insert(std::make_pair(h, txSet)); + if (*txSetPtr && !mApp.getPersistentState().hasTxSet(h)) + { + txSets.insert(std::make_pair(h, *txSetPtr)); + } } + // SkipTxSet: nothing to persist } Hash qsHash = Slot::getCompanionQuorumSetHashFromStatement(e.statement); SCPQuorumSetPtr qSet = mPendingEnvelopes.getQSet(qsHash); @@ -2705,11 +2740,32 @@ bool HerderImpl::verifyStellarValueSignature(StellarValue const& sv) { ZoneScoped; - auto [b, _] = PubKeyUtils::verifySig( - sv.ext.lcValueSignature().nodeID, sv.ext.lcValueSignature().signature, - xdr::xdr_to_opaque(mApp.getNetworkID(), ENVELOPE_TYPE_SCPVALUE, - sv.txSetHash, sv.closeTime)); - return b; + switch (sv.ext.v()) + { + case STELLAR_VALUE_BASIC: + // This function should never be called with an unsigned value + releaseAssert(false); + case STELLAR_VALUE_SIGNED: + return PubKeyUtils::verifySig(sv.ext.lcValueSignature().nodeID, + sv.ext.lcValueSignature().signature, + xdr::xdr_to_opaque(mApp.getNetworkID(), + ENVELOPE_TYPE_SCPVALUE, + sv.txSetHash, + sv.closeTime)) + .valid; + case STELLAR_VALUE_SKIP: + { + auto const& ov = sv.ext.originalValue(); + return PubKeyUtils::verifySig( + ov.lcValueSignature.nodeID, ov.lcValueSignature.signature, + xdr::xdr_to_opaque(mApp.getNetworkID(), + ENVELOPE_TYPE_SCPVALUE, ov.txSetHash, + sv.closeTime)) + .valid; + } + default: + releaseAssert(false); + } } StellarValue diff --git a/src/herder/HerderImpl.h b/src/herder/HerderImpl.h index 72ff138824..651bcccc03 100644 --- a/src/herder/HerderImpl.h +++ b/src/herder/HerderImpl.h @@ -155,7 +155,7 @@ class HerderImpl : public Herder bool recvTxSet(Hash const& hash, TxSetXDRFrameConstPtr txset) override; void peerDoesntHave(MessageType type, uint256 const& itemID, Peer::pointer peer) override; - TxSetXDRFrameConstPtr getTxSet(Hash const& hash) override; + TxSetResult getTxSet(Hash const& hash) override; SCPQuorumSetPtr getQSet(Hash const& qSetHash) override; // process ready SCP messages. This may trigger the node to externalze new @@ -234,7 +234,7 @@ class HerderImpl : public Herder // helper function to sign envelopes void signEnvelope(SecretKey const& s, SCPEnvelope& envelope); - // helper function to verify SCPValues are signed + // helper function to verify SCPValues signatures bool verifyStellarValueSignature(StellarValue const& sv); size_t getMaxQueueSizeOps() const override; diff --git a/src/herder/HerderSCPDriver.cpp b/src/herder/HerderSCPDriver.cpp index b1b4f54aee..082805b7cd 100644 --- a/src/herder/HerderSCPDriver.cpp +++ b/src/herder/HerderSCPDriver.cpp @@ -65,6 +65,12 @@ HerderSCPDriver::SCPMetrics::SCPMetrics(Application& app) {"scp", "timing", "first-to-self-externalize-lag"})) , mSelfToOthersExternalizeLag(app.getMetrics().NewTimer( {"scp", "timing", "self-to-others-externalize-lag"})) + , mBallotBlockedOnTxSet(app.getMetrics().NewTimer( + {"scp", "timing", "ballot-blocked-on-txset"})) + , mSkipExternalized( + app.getMetrics().NewCounter({"scp", "skip", "externalized"})) + , mSkipValueReplaced( + app.getMetrics().NewCounter({"scp", "skip", "value-replaced"})) { } @@ -117,9 +123,15 @@ class SCPHerderEnvelopeWrapper : public SCPEnvelopeWrapper std::vector mTxSets; public: - explicit SCPHerderEnvelopeWrapper(SCPEnvelope const& e, HerderImpl& herder) + // Wrap an SCP envelope `e`, using `herder` to fetch the quorum set. This + // function inserts hashes corresponding to missing transaction sets into + // the output parameter `missingTxSets`. + explicit SCPHerderEnvelopeWrapper(SCPEnvelope const& e, HerderImpl& herder, + std::set& missingTxSets) : SCPEnvelopeWrapper(e), mHerder(herder) { + releaseAssert(missingTxSets.empty()); + // attach everything we can to the wrapper auto qSetH = Slot::getCompanionQuorumSetHashFromStatement(e.statement); mQSet = mHerder.getQSet(qSetH); @@ -133,26 +145,43 @@ class SCPHerderEnvelopeWrapper : public SCPEnvelopeWrapper auto txSets = getValidatedTxSetHashes(e); for (auto const& txSetH : txSets) { - auto txSet = mHerder.getTxSet(txSetH); - if (txSet) + auto result = mHerder.getTxSet(txSetH); + if (auto* txSet = std::get_if(&result)) { - mTxSets.emplace_back(txSet); - } - else - { - throw std::runtime_error(fmt::format( - FMT_STRING("SCPHerderEnvelopeWrapper: Wrapping an unknown " - "tx set {} from envelope"), - hexAbbrev(txSetH))); + if (*txSet) + { + mTxSets.emplace_back(*txSet); + } + else + { + missingTxSets.insert(txSetH); + } } + // SkipTxSet: not missing, nothing to store } } + + void + addTxSet(TxSetXDRFrameConstPtr txSet) override + { + mTxSets.emplace_back(txSet); + } }; SCPEnvelopeWrapperPtr HerderSCPDriver::wrapEnvelope(SCPEnvelope const& envelope) { - auto r = std::make_shared(envelope, mHerder); + std::set missingTxSets; + auto r = std::make_shared(envelope, mHerder, + missingTxSets); + + // Register this wrapper for any tx sets that weren't available + // so we can update it later when the tx set arrives + for (auto const& h : missingTxSets) + { + mPendingTxSetEnvelopeWrappers[h].push_back(r); + } + return r; } @@ -214,6 +243,23 @@ HerderSCPDriver::validatePastOrFutureValue( slotIndex, b.closeTime, lcl.header.scpValue.closeTime); return SCPDriver::kInvalidValue; } + if (b.ext.v() == STELLAR_VALUE_SKIP) + { + auto const& ov = b.ext.originalValue(); + // We can check previousLedgerHash because the LCL header + // contains the hash of its parent. We cannot check + // previousLedgerVersion because the LCL header only has + // its own version, and a protocol upgrade on the LCL + // could make it differ from its parent's version. + if (ov.previousLedgerHash != lcl.header.previousLedgerHash) + { + CLOG_TRACE(Herder, + "Got a bad previousLedgerHash for skip value " + "in ledger {}", + slotIndex); + return SCPDriver::kInvalidValue; + } + } } else if (slotIndex < lcl.header.ledgerSeq) { @@ -275,6 +321,29 @@ HerderSCPDriver::validatePastOrFutureValue( return SCPDriver::kMaybeValidValue; } +// TODO(rebase): Consider just folding this into +// deserializeAndValidateStellarValue. +bool +HerderSCPDriver::checkValueTypeAndSkipHashInvariant(StellarValue const& b) const +{ + // Only signed and skip values participate in SCP. + // TODO(8): Grep for signature checks and update them for SKIP values + if (b.ext.v() != STELLAR_VALUE_SIGNED && b.ext.v() != STELLAR_VALUE_SKIP) + { + return false; + } + + // Skip values must have the skip hash, and non-skip values must not have + // the skip hash + if ((b.txSetHash == Herder::SKIP_LEDGER_HASH) != + (b.ext.v() == STELLAR_VALUE_SKIP)) + { + return false; + } + + return true; +} + SCPDriver::ValidationLevel HerderSCPDriver::validateValueAgainstLocalState(uint64_t slotIndex, StellarValue const& b, @@ -298,17 +367,55 @@ HerderSCPDriver::validateValueAgainstLocalState(uint64_t slotIndex, return SCPDriver::kInvalidValue; } + // For skip values, validate that the previous ledger context matches + // our LCL. Skip values don't have a real tx set to validate. + if (b.ext.v() == STELLAR_VALUE_SKIP) + { + if (nomination) + { + // Skip values should only appear in balloting, and so are + // considered invalid during nomination. + CLOG_DEBUG(Herder, + "HerderSCPDriver::validateValue i: {} rejecting " + "skip value during nomination", + slotIndex); + return SCPDriver::kInvalidValue; + } + auto const& ov = b.ext.originalValue(); + if (ov.previousLedgerHash != lcl.hash || + ov.previousLedgerVersion != lcl.header.ledgerVersion) + { + CLOG_DEBUG( + Herder, + "HerderSCPDriver::validateValue i: {} skip value has " + "mismatched previous ledger context", + slotIndex); + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + } + Hash const& txSetHash = b.txSetHash; - TxSetXDRFrameConstPtr txSet = mPendingEnvelopes.getTxSet(txSetHash); + // Skip values return early above, so this only runs for + // non-skip hashes. Extract the TxSetXDRFrameConstPtr. + TxSetXDRFrameConstPtr txSet = std::get( + mPendingEnvelopes.getTxSet(txSetHash)); auto closeTimeOffset = b.closeTime - lcl.header.scpValue.closeTime; if (!txSet) { - CLOG_ERROR(Herder, "validateValue i:{} unknown txSet {}", slotIndex, - hexAbbrev(txSetHash)); + if (mPendingEnvelopes.getTxSetWaitingTime(txSetHash).has_value()) + { + res = SCPDriver::kAwaitingDownload; + } + else + { + CLOG_DEBUG(Proto, "validateValue i:{} unknown txSet {}", + slotIndex, hexAbbrev(txSetHash)); - res = SCPDriver::kInvalidValue; + res = SCPDriver::kInvalidValue; + } } else if (!checkAndCacheTxSetValid(*txSet, lcl, closeTimeOffset)) { @@ -347,7 +454,15 @@ HerderSCPDriver::deserializeAndValidateStellarValue(Value const& value, return false; } - if (sv.ext.v() != STELLAR_VALUE_SIGNED) + // TODO(rebase): Remove slot index after rebase + if (!checkValueTypeAndSkipHashInvariant(sv)) + { + return false; + } + + // TODO(8): Grep for signature checks and update them for SKIP values + + if (sv.ext.v() != STELLAR_VALUE_SIGNED && sv.ext.v() != STELLAR_VALUE_SKIP) { return false; } @@ -440,8 +555,8 @@ HerderSCPDriver::extractValidValue(uint64_t slotIndex, Value const& value) } ValueWrapperPtr res; - if (validateValueAgainstLocalState(slotIndex, b, true) == - SCPDriver::kFullyValidatedValue) + if (validateValueAgainstLocalState(slotIndex, b, true) >= + SCPDriver::kAwaitingDownload) { extractValidUpgrades(b, true); res = wrapStellarValue(b); @@ -479,6 +594,40 @@ HerderSCPDriver::getValueString(Value const& v) const } } +Value +HerderSCPDriver::makeSkipLedgerValueFromValue(Value const& v) const +{ + ZoneScoped; + StellarValue originalValue = toStellarValueOrThrow(v); + auto const& lcl = mLedgerManager.getLastClosedLedgerHeader(); + + StellarValue sv; + sv.ext.v(STELLAR_VALUE_SKIP); + sv.txSetHash = Herder::SKIP_LEDGER_HASH; + sv.closeTime = originalValue.closeTime; + sv.upgrades = originalValue.upgrades; + sv.ext.originalValue().txSetHash = originalValue.txSetHash; + sv.ext.originalValue().previousLedgerHash = lcl.hash; + sv.ext.originalValue().previousLedgerVersion = lcl.header.ledgerVersion; + sv.ext.originalValue().lcValueSignature = + originalValue.ext.lcValueSignature(); + return xdr::xdr_to_opaque(sv); +} + +bool +HerderSCPDriver::isSkipLedgerValue(Value const& v) const +{ + ZoneScoped; + StellarValue sv; + bool success = toStellarValue(v, sv); + if (!success) + { + return false; + } + + return sv.ext.v() == STELLAR_VALUE_SKIP; +} + // timer handling void HerderSCPDriver::timerCallbackWrapper(uint64_t slotIndex, int timerID, @@ -612,12 +761,32 @@ HerderSCPDriver::computeTimeout(uint32 roundNumber, bool isNomination) // returns true if l < r // lh, rh are the hashes of l,h static bool -compareTxSets(ApplicableTxSetFrame const& l, ApplicableTxSetFrame const& r, - Hash const& lh, Hash const& rh, size_t lEncodedSize, - size_t rEncodedSize, LedgerHeader const& header, Hash const& s) +compareTxSets(ApplicableTxSetFrameConstPtr const& l, + ApplicableTxSetFrameConstPtr const& r, Hash const& lh, + Hash const& rh, std::optional lEncodedSize, + std::optional rEncodedSize, LedgerHeader const& header, + Hash const& s) { - auto lSize = l.size(header); - auto rSize = r.size(header); + if (!l && !r) + { + CLOG_TRACE(Proto, "Comparing tx sets but both are null"); + // Do not have either tx set. Compare hashes + return lessThanXored(lh, rh, s); + } + + if (!l || !r) + { + CLOG_TRACE( + Proto, + "Comparing tx sets but one is null: l: {}, r: {}, lh: {}, rh: {}", + l ? "exists" : "null", r ? "exists" : "null", hexAbbrev(lh), + hexAbbrev(rh)); + // If one exists, choose it + return !l; + } + + auto lSize = l->size(header); + auto rSize = r->size(header); if (lSize != rSize) { return lSize < rSize; @@ -625,8 +794,8 @@ compareTxSets(ApplicableTxSetFrame const& l, ApplicableTxSetFrame const& r, if (protocolVersionStartsFrom(header.ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { - auto lBids = l.getTotalInclusionFees(); - auto rBids = r.getTotalInclusionFees(); + auto lBids = l->getTotalInclusionFees(); + auto rBids = r->getTotalInclusionFees(); if (lBids != rBids) { return lBids < rBids; @@ -634,8 +803,8 @@ compareTxSets(ApplicableTxSetFrame const& l, ApplicableTxSetFrame const& r, } if (protocolVersionStartsFrom(header.ledgerVersion, ProtocolVersion::V_11)) { - auto lFee = l.getTotalFees(header); - auto rFee = r.getTotalFees(header); + auto lFee = l->getTotalFees(header); + auto rFee = r->getTotalFees(header); if (lFee != rFee) { return lFee < rFee; @@ -644,10 +813,10 @@ compareTxSets(ApplicableTxSetFrame const& l, ApplicableTxSetFrame const& r, if (protocolVersionStartsFrom(header.ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { - if (lEncodedSize != rEncodedSize) + if (lEncodedSize.value() != rEncodedSize.value()) { // Look for the smallest encoded size. - return lEncodedSize > rEncodedSize; + return lEncodedSize.value() > rEncodedSize.value(); } } return lessThanXored(lh, rh, s); @@ -777,20 +946,41 @@ HerderSCPDriver::combineCandidates(uint64_t slotIndex, ++it) { auto const& sv = *it; - auto cTxSet = mPendingEnvelopes.getTxSet(sv.txSetHash); - releaseAssert(cTxSet); + TxSetXDRFrameConstPtr cTxSet; + auto const cTxSetResult = mPendingEnvelopes.getTxSet(sv.txSetHash); + if (auto const* ptr = + std::get_if(&cTxSetResult)) + { + cTxSet = *ptr; + } + // else: SkipTxSet -> cTxSet stays null, handled by existing + // !cTxSet logic + // Only valid applicable tx sets should be combined. - auto cApplicableTxSet = cTxSet->prepareForApply(mApp, lcl.header); - releaseAssert(cApplicableTxSet); - if (cTxSet->previousLedgerHash() == lcl.hash) + auto cApplicableTxSet = + cTxSet ? cTxSet->prepareForApply(mApp, lcl.header) : nullptr; + // releaseAssert(cApplicableTxSet); + // TODO(12): When cTxSet is null we skip the previousLedgerHash + // check here, but it will be caught later: once the tx set is + // downloaded, checkAndCacheTxSetValid (called from validateValue) + // checks previousLedgerHash == lcl.hash before prepareForApply. + // A mismatch makes validateValue return kInvalidValue, preventing + // the node from voting to commit. + // Should write a test that causes combineCandidates to use a tx + // set with a bad previous ledger hash to verify this. + if (!cTxSet || cTxSet->previousLedgerHash() == lcl.hash) { - if (!highestTxSet || - compareTxSets(*highestApplicableTxSet, *cApplicableTxSet, - highest->txSetHash, sv.txSetHash, - highestTxSet->encodedSize(), - cTxSet->encodedSize(), lcl.header, - candidatesHash)) + if (highest == candidateValues.cend() || + compareTxSets( + highestApplicableTxSet, cApplicableTxSet, + highest->txSetHash, sv.txSetHash, + highestTxSet + ? std::make_optional(highestTxSet->encodedSize()) + : std::nullopt, + cTxSet ? std::make_optional(cTxSet->encodedSize()) + : std::nullopt, + lcl.header, candidatesHash)) { highest = it; highestTxSet = cTxSet; @@ -850,6 +1040,19 @@ HerderSCPDriver::getUpgradeNominationTimeoutLimit() const std::numeric_limits::max()); } +std::optional +HerderSCPDriver::getTxSetDownloadWaitTime(Value const& v) const +{ + StellarValue sv = toStellarValueOrThrow(v); + return mPendingEnvelopes.getTxSetWaitingTime(sv.txSetHash); +} + +std::chrono::milliseconds +HerderSCPDriver::getTxSetDownloadTimeout() const +{ + return mApp.getConfig().TX_SET_DOWNLOAD_TIMEOUT; +} + void HerderSCPDriver::valueExternalized(uint64_t slotIndex, Value const& value) { @@ -884,6 +1087,11 @@ HerderSCPDriver::valueExternalized(uint64_t slotIndex, Value const& value) bool isLatestSlot = slotIndex > mApp.getHerder().trackingConsensusLedgerIndex(); + if (b.ext.v() == STELLAR_VALUE_SKIP) + { + mSCPMetrics.mSkipExternalized.inc(); + } + // Only update tracking state when newer slot comes in if (isLatestSlot) { @@ -929,6 +1137,13 @@ HerderSCPDriver::valueExternalized(uint64_t slotIndex, Value const& value) } } +void +HerderSCPDriver::noteSkipValueReplaced(uint64_t) +{ + ZoneScoped; + mSCPMetrics.mSkipValueReplaced.inc(); +} + void HerderSCPDriver::logQuorumInformationAndUpdateMetrics(uint64_t index) { @@ -1002,6 +1217,41 @@ HerderSCPDriver::ballotDidHearFromQuorum(uint64_t, SCPBallot const&) { } +void +HerderSCPDriver::recordBallotBlockedOnTxSet(uint64_t slotIndex, + Value const& value) +{ + auto& timing = mSCPExecutionTimes[slotIndex]; + if (timing.mBallotBlockedOnTxSetStart.find(value) == + timing.mBallotBlockedOnTxSetStart.end()) + { + timing.mBallotBlockedOnTxSetStart[value] = mApp.getClock().now(); + } +} + +void +HerderSCPDriver::measureAndRecordBallotBlockedOnTxSet(uint64_t slotIndex, + Value const& value) +{ + auto it = mSCPExecutionTimes.find(slotIndex); + if (it != mSCPExecutionTimes.end()) + { + auto& timing = it->second; + auto valueIt = timing.mBallotBlockedOnTxSetStart.find(value); + if (valueIt != timing.mBallotBlockedOnTxSetStart.end()) + { + auto elapsed = + std::chrono::duration_cast( + mApp.getClock().now() - valueIt->second); + mSCPMetrics.mBallotBlockedOnTxSet.Update(elapsed); + return; + } + } + + // No blocking - record zero duration + mSCPMetrics.mBallotBlockedOnTxSet.Update(std::chrono::milliseconds(0)); +} + void HerderSCPDriver::nominatingValue(uint64_t slotIndex, Value const& value) { @@ -1297,6 +1547,32 @@ HerderSCPDriver::recordSCPExecutionMetrics(uint64_t slotIndex) } } +namespace +{ +// Remove expired weak_ptrs from each vector in the map, and erase map entries +// whose vectors become empty. +template +void +purgeExpiredWeakPtrs(std::map>>& map) +{ + for (auto mapIt = map.begin(); mapIt != map.end();) + { + auto& vec = mapIt->second; + vec.erase(std::remove_if(vec.begin(), vec.end(), + [](auto& wp) { return wp.expired(); }), + vec.end()); + if (vec.empty()) + { + mapIt = map.erase(mapIt); + } + else + { + ++mapIt; + } + } +} +} + void HerderSCPDriver::purgeSlotsOutsideRange(std::optional minSlotIndex, std::optional maxSlotIndex, @@ -1336,6 +1612,49 @@ HerderSCPDriver::purgeSlotsOutsideRange(std::optional minSlotIndex, } getSCP().purgeSlotsOutsideRange(minSlotIndex, maxSlotIndex, slotToKeep); + + // Clean up expired weak_ptrs from the pending tx set registries. + // This cleanup is correct because: + // 1. When SCP purges a slot via getSCP().purgeSlots() above, it destroys + // the Slot object along with its NominationProtocol/BallotProtocol + // 2. This destroys the ValueWrapperPtrs/EnvelopeWrapperPtrs stored there + // 3. If those were the only remaining references, the weak_ptrs here expire + // 4. We remove expired entries to prevent unbounded growth of the map + purgeExpiredWeakPtrs(mPendingTxSetWrappers); + purgeExpiredWeakPtrs(mPendingTxSetEnvelopeWrappers); +} + +void +HerderSCPDriver::onTxSetReceived(Hash const& txSetHash, + TxSetXDRFrameConstPtr txSet) +{ + // Update any ValueWrappers waiting for this tx set + auto it = mPendingTxSetWrappers.find(txSetHash); + if (it != mPendingTxSetWrappers.end()) + { + for (auto& wp : it->second) + { + if (auto sp = wp.lock()) + { + sp->setTxSet(txSet); + } + } + mPendingTxSetWrappers.erase(it); + } + + // Update any EnvelopeWrappers waiting for this tx set + auto envIt = mPendingTxSetEnvelopeWrappers.find(txSetHash); + if (envIt != mPendingTxSetEnvelopeWrappers.end()) + { + for (auto& wp : envIt->second) + { + if (auto sp = wp.lock()) + { + sp->addTxSet(txSet); + } + } + mPendingTxSetEnvelopeWrappers.erase(envIt); + } } void @@ -1350,35 +1669,57 @@ class SCPHerderValueWrapper : public ValueWrapper HerderImpl& mHerder; TxSetXDRFrameConstPtr mTxSet; + Hash const mTxSetHash; public: explicit SCPHerderValueWrapper(StellarValue const& sv, Value const& value, HerderImpl& herder) - : ValueWrapper(value), mHerder(herder) + : ValueWrapper(value), mHerder(herder), mTxSetHash(sv.txSetHash) { - mTxSet = mHerder.getTxSet(sv.txSetHash); - if (!mTxSet) + auto const result = mHerder.getTxSet(sv.txSetHash); + if (auto const* ptr = std::get_if(&result)) { - throw std::runtime_error(fmt::format( - FMT_STRING( - "SCPHerderValueWrapper tried to bind an unknown tx set {}"), - hexAbbrev(sv.txSetHash))); + mTxSet = *ptr; } + // else: SkipTxSet -> mTxSet stays null + // mTxSet may also be null if tx set hasn't been received yet + // (parallel downloading). It will be set later via setTxSet() + // when the tx set arrives. + } + + bool + hasTxSet() const + { + return mTxSet != nullptr || mTxSetHash == Herder::SKIP_LEDGER_HASH; + } + + Hash const& + getTxSetHash() const + { + return mTxSetHash; + } + + void + setTxSet(TxSetXDRFrameConstPtr txSet) override + { + releaseAssert(txSet->getContentsHash() == mTxSetHash); + mTxSet = txSet; } }; ValueWrapperPtr HerderSCPDriver::wrapValue(Value const& val) { - StellarValue sv; - auto b = toStellarValue(val, sv); - if (!b) + StellarValue sv = toStellarValueOrThrow(val); + auto res = std::make_shared(sv, val, mHerder); + + // If tx set wasn't available, register this wrapper to be updated later + // when the tx set arrives via onTxSetReceived() + if (!res->hasTxSet()) { - throw std::runtime_error( - fmt::format(FMT_STRING("Invalid value in SCPHerderValueWrapper {}"), - binToHex(val))); + mPendingTxSetWrappers[res->getTxSetHash()].push_back(res); } - auto res = std::make_shared(sv, val, mHerder); + return res; } @@ -1387,6 +1728,14 @@ HerderSCPDriver::wrapStellarValue(StellarValue const& sv) { auto val = xdr::xdr_to_opaque(sv); auto res = std::make_shared(sv, val, mHerder); + + // If tx set wasn't available, register this wrapper to be updated later + // when the tx set arrives via onTxSetReceived() + if (!res->hasTxSet()) + { + mPendingTxSetWrappers[res->getTxSetHash()].push_back(res); + } + return res; } diff --git a/src/herder/HerderSCPDriver.h b/src/herder/HerderSCPDriver.h index 1d5871cf26..744a80013e 100644 --- a/src/herder/HerderSCPDriver.h +++ b/src/herder/HerderSCPDriver.h @@ -76,6 +76,15 @@ class HerderSCPDriver : public SCPDriver std::string toShortString(NodeID const& pk) const override; std::string getValueString(Value const& v) const override; + // TODO: Docs. Mention that this function can throw if `v` cannot be + // converted to a `StellarValue` + // TODO: Mention in docs that this should only be called from slots with + // slot indicies equal to LCL+1. + Value makeSkipLedgerValueFromValue(Value const& v) const override; + + // TODO(4): Do I even need this function? + bool isSkipLedgerValue(Value const& v) const override; + // timer handling void setupTimer(uint64_t slotIndex, int timerID, std::chrono::milliseconds timeout, @@ -98,6 +107,9 @@ class HerderSCPDriver : public SCPDriver ValueWrapperPtr stripAllUpgrades(Value const& v) override; uint32_t getUpgradeNominationTimeoutLimit() const override; + // TODO: Docs + void noteSkipValueReplaced(uint64_t slotIndex) override; + // Submit a value to consider for slotIndex // previousValue is the value from slotIndex-1 void nominate(uint64_t slotIndex, StellarValue const& value, @@ -119,6 +131,15 @@ class HerderSCPDriver : public SCPDriver SCPBallot const& ballot) override; void acceptedCommit(uint64_t slotIndex, SCPBallot const& ballot) override; + // Ballot blocked on txset tracking methods + // Called when balloting becomes blocked waiting for a txset download + void recordBallotBlockedOnTxSet(uint64_t slotIndex, + Value const& value) override; + // Called when balloting is unblocked (setting mCommit) to measure and + // record how long we were blocked + void measureAndRecordBallotBlockedOnTxSet(uint64_t slotIndex, + Value const& value) override; + std::optional getPrepareStart(uint64_t slotIndex); // validate close time as much as possible @@ -136,6 +157,11 @@ class HerderSCPDriver : public SCPDriver std::optional maxSlotIndex, uint64 slotToKeep); + // Called when a tx set is received to update any ValueWrappers that were + // created before the tx set was available (for parallel tx set + // downloading). + void onTxSetReceived(Hash const& txSetHash, TxSetXDRFrameConstPtr txSet); + double getExternalizeLag(NodeID const& id) const; Json::Value getQsetLagInfo(bool summary, bool fullKeys); @@ -148,6 +174,10 @@ class HerderSCPDriver : public SCPDriver // as missing nodes from previous interval void startCheckForDeadNodesInterval(); + std::optional + getTxSetDownloadWaitTime(Value const& v) const override; + std::chrono::milliseconds getTxSetDownloadTimeout() const override; + // Application-specific weight function. This function uses the quality // levels from automatic quorum set generation to determine the weight of a // validator. It is designed to ensure that: @@ -186,6 +216,20 @@ class HerderSCPDriver : public SCPDriver PendingEnvelopes& mPendingEnvelopes; SCP mSCP; + // Registry of ValueWrappers that were created before their tx set was + // available. Maps txSetHash -> weak_ptrs to wrappers awaiting that tx set. + // When onTxSetReceived() is called, we update any waiting wrappers. + // Cleanup of expired weak_ptrs happens in purgeSlots(). + std::map>> + mPendingTxSetWrappers; + + // Registry of EnvelopeWrappers that were created before their tx set was + // available. Maps txSetHash -> weak_ptrs to wrappers awaiting that tx set. + // When onTxSetReceived() is called, we update any waiting wrappers. + // Cleanup of expired weak_ptrs happens in purgeSlots(). + std::map>> + mPendingTxSetEnvelopeWrappers; + struct SCPMetrics { medida::Meter& mEnvelopeSign; @@ -204,6 +248,16 @@ class HerderSCPDriver : public SCPDriver medida::Timer& mFirstToSelfExternalizeLag; medida::Timer& mSelfToOthersExternalizeLag; + // Timer tracking how long balloting was blocked waiting for a txset + // download (time spent in kAwaitingDownload before setting mCommit) + medida::Timer& mBallotBlockedOnTxSet; + + // Tracks how many ledgers we externalized using a skip value. + medida::Counter& mSkipExternalized; + // Counts replacements of proposed values with the synthesized skip + // value. + medida::Counter& mSkipValueReplaced; + SCPMetrics(Application& app); }; @@ -232,6 +286,10 @@ class HerderSCPDriver : public SCPDriver // externalize timing information std::optional mFirstExternalize; std::optional mSelfExternalize; + + // Tracks when balloting first became blocked on each txset in this + // slot. + std::map mBallotBlockedOnTxSetStart; }; // Map of time points for each slot to measure key protocol metrics: @@ -256,6 +314,9 @@ class HerderSCPDriver : public SCPDriver mutable RandomEvictionCache mTxSetValidCache; + // TODO: Docs + bool checkValueTypeAndSkipHashInvariant(StellarValue const& sv) const; + SCPDriver::ValidationLevel validateValueAgainstLocalState(uint64_t slotIndex, StellarValue const& sv, bool nomination) const; diff --git a/src/herder/HerderUtils.cpp b/src/herder/HerderUtils.cpp index 6cca619d61..62322fe363 100644 --- a/src/herder/HerderUtils.cpp +++ b/src/herder/HerderUtils.cpp @@ -3,6 +3,7 @@ // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 #include "herder/HerderUtils.h" +#include "crypto/Hex.h" #include "crypto/KeyUtils.h" #include "lib/json/json.h" #include "main/Config.h" @@ -30,6 +31,19 @@ toStellarValue(Value const& v, StellarValue& sv) return true; } +StellarValue +toStellarValueOrThrow(Value const& v) +{ + StellarValue sv; + if (!toStellarValue(v, sv)) + { + throw std::runtime_error(fmt::format( + FMT_STRING("Failed to convert Value '{}' to StellarValue"), + binToHex(v))); + } + return sv; +} + std::optional> getTxSetHashes(SCPEnvelope const& envelope) { diff --git a/src/herder/HerderUtils.h b/src/herder/HerderUtils.h index 28b4b29bd5..4274e5a788 100644 --- a/src/herder/HerderUtils.h +++ b/src/herder/HerderUtils.h @@ -21,6 +21,10 @@ struct StellarValue; // returns false on error bool toStellarValue(Value const& v, StellarValue& sv); +// Converts a Value into a StellarValue +// throws an exception on error. +StellarValue toStellarValueOrThrow(Value const& v); + // Extract the transaction set hashes present in `envelope`. // Returns nullopt if any of the values in the envelope cannot be parsed. std::optional> getTxSetHashes(SCPEnvelope const& envelope); diff --git a/src/herder/LedgerCloseData.cpp b/src/herder/LedgerCloseData.cpp index 962e52643b..11e596d9cf 100644 --- a/src/herder/LedgerCloseData.cpp +++ b/src/herder/LedgerCloseData.cpp @@ -1,6 +1,7 @@ #include "util/asio.h" #include "LedgerCloseData.h" #include "crypto/Hex.h" +#include "herder/Herder.h" #include "herder/Upgrades.h" #include "main/Application.h" #include "util/GlobalChecks.h" @@ -23,7 +24,9 @@ LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq, , mValue(v) , mExpectedLedgerHash(expectedLedgerHash) { - releaseAssert(txSet->getContentsHash() == mValue.txSetHash); + Hash const& valueTxHash = mValue.txSetHash; + releaseAssert(valueTxHash == Herder::SKIP_LEDGER_HASH || + txSet->getContentsHash() == valueTxHash); } #ifdef BUILD_TESTS @@ -37,7 +40,9 @@ LedgerCloseData::LedgerCloseData( , mExpectedLedgerHash(expectedLedgerHash) , mExpectedResults(expectedResults) { - releaseAssert(txSet->getContentsHash() == mValue.txSetHash); + Hash const& valueTxHash = mValue.txSetHash; + releaseAssert(valueTxHash == Herder::SKIP_LEDGER_HASH || + txSet->getContentsHash() == valueTxHash); } #endif // BUILD_TESTS @@ -47,9 +52,20 @@ stellarValueToString(Config const& c, StellarValue const& sv) std::stringstream res; res << "["; - if (sv.ext.v() == STELLAR_VALUE_SIGNED) + switch (sv.ext.v()) { + case STELLAR_VALUE_BASIC: + break; + case STELLAR_VALUE_SIGNED: res << " SIGNED@" << c.toShortString(sv.ext.lcValueSignature().nodeID); + break; + case STELLAR_VALUE_SKIP: + res << " SKIP@" + << c.toShortString(sv.ext.originalValue().lcValueSignature.nodeID); + break; + default: + res << " UNKNOWN"; + break; } res << " txH: " << hexAbbrev(sv.txSetHash) << ", ct: " << sv.closeTime << ", upgrades: ["; diff --git a/src/herder/PendingEnvelopes.cpp b/src/herder/PendingEnvelopes.cpp index 385ffaa22d..ca9faeaa71 100644 --- a/src/herder/PendingEnvelopes.cpp +++ b/src/herder/PendingEnvelopes.cpp @@ -1,4 +1,4 @@ -#include "PendingEnvelopes.h" +#include "PendingEnvelopes.h" #include "crypto/Hex.h" #include "crypto/SHA.h" #include "herder/HerderImpl.h" @@ -185,7 +185,10 @@ TxSetXDRFrameConstPtr PendingEnvelopes::putTxSet(Hash const& hash, uint64 slot, TxSetXDRFrameConstPtr txset) { - auto res = getKnownTxSet(hash, slot, true); + // Cannot add a tx set for the skip ledger hash + releaseAssert(hash != Herder::SKIP_LEDGER_HASH); + + auto res = std::get(getKnownTxSet(hash, slot, true)); if (!res) { res = txset; @@ -198,11 +201,16 @@ PendingEnvelopes::putTxSet(Hash const& hash, uint64 slot, // tries to find a txset in memory, setting touch also touches the LRU, // extending the lifetime of the result *and* updating the slot number // to a greater value if needed -TxSetXDRFrameConstPtr +TxSetResult PendingEnvelopes::getKnownTxSet(Hash const& hash, uint64 slot, bool touch) { // slot is only used when `touch` is set releaseAssert(touch || (slot == 0)); + if (hash == Herder::SKIP_LEDGER_HASH) + { + return SkipTxSet{}; + } + TxSetXDRFrameConstPtr res; auto it = mKnownTxSets.find(hash); if (it != mKnownTxSets.end()) @@ -227,6 +235,17 @@ PendingEnvelopes::getKnownTxSet(Hash const& hash, uint64 slot, bool touch) return res; } +bool +PendingEnvelopes::hasTxSet(Hash const& hash) const +{ + if (hash == Herder::SKIP_LEDGER_HASH) + { + return true; + } + auto it = mKnownTxSets.find(hash); + return it != mKnownTxSets.end() && it->second.lock() != nullptr; +} + void PendingEnvelopes::addTxSet(Hash const& hash, uint64 lastSeenSlotIndex, TxSetXDRFrameConstPtr txset) @@ -250,7 +269,29 @@ PendingEnvelopes::recvTxSet(Hash const& hash, TxSetXDRFrameConstPtr txset) return false; } + // Log successful download of a previously awaited tx set + auto waitingTime = mTxSetFetcher.getWaitingTime(hash); + if (waitingTime.has_value()) + { + CLOG_DEBUG( + Proto, + "Successfully downloaded tx set {} that was kAwaitingDownload - " + "download took {} ms", + hexAbbrev(hash), waitingTime.value().count()); + } + else + { + CLOG_DEBUG(Proto, + "Successfully downloaded tx set {} that was requested", + hexAbbrev(hash)); + } + addTxSet(hash, lastSeenSlotIndex, txset); + + // Update any ValueWrappers that were created before this tx set was + // available + mHerder.getHerderSCPDriver().onTxSetReceived(hash, txset); + return true; } @@ -307,7 +348,8 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope) auto const& values = maybeValues.value(); if (std::any_of(values.begin(), values.end(), [](auto const& value) { - return value.ext.v() != STELLAR_VALUE_SIGNED; + return value.ext.v() != STELLAR_VALUE_SIGNED && + value.ext.v() != STELLAR_VALUE_SKIP; })) { CLOG_TRACE(Herder, "Dropping envelope from {} (value not signed)", @@ -332,6 +374,7 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope) auto& envs = mEnvelopes[envelope.statement.slotIndex]; auto& fetching = envs.mFetchingEnvelopes; auto& processed = envs.mProcessedEnvelopes; + auto& partiallyReady = envs.mPartiallyReadyEnvelopes; auto fetchIt = fetching.find(envelope); @@ -372,6 +415,9 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope) processed.emplace(envelope); fetching.erase(fetchIt); + // Remove from partially ready envelopes if it was there + partiallyReady.erase(envelope); + envelopeReady(envelope); updateMetrics(); return Herder::ENVELOPE_STATUS_READY; @@ -381,6 +427,13 @@ PendingEnvelopes::recvSCPEnvelope(SCPEnvelope const& envelope) // else just keep waiting for it to come in // and refresh fetchers as needed startFetch(envelope); + + SCPStatementType type = envelope.statement.pledges.type(); + if (isPartiallyFetched(envelope) && + (type == SCP_ST_NOMINATE || type == SCP_ST_PREPARE)) + { + partiallyReady.insert(envelope); + } } return Herder::ENVELOPE_STATUS_FETCHING; @@ -499,10 +552,12 @@ PendingEnvelopes::recordReceivedCost(SCPEnvelope const& env) } else { - auto txSetPtr = getTxSet(v.txSetHash); - if (txSetPtr) + auto txSetResult = getTxSet(v.txSetHash); + if (auto* txSetPtr = + std::get_if(&txSetResult); + txSetPtr && *txSetPtr) { - txSetSize = txSetPtr->encodedSize(); + txSetSize = (*txSetPtr)->encodedSize(); mValueSizeCache.put(v.txSetHash, txSetSize); } } @@ -564,23 +619,29 @@ PendingEnvelopes::envelopeReady(SCPEnvelope const& envelope) mEnvelopes[slot].mReadyEnvelopes.push_back(envW); } +bool +PendingEnvelopes::isPartiallyFetched(SCPEnvelope const& envelope) +{ + return getKnownQSet( + Slot::getCompanionQuorumSetHashFromStatement(envelope.statement), + false) != nullptr; +} + bool PendingEnvelopes::isFullyFetched(SCPEnvelope const& envelope) { - if (!getKnownQSet( - Slot::getCompanionQuorumSetHashFromStatement(envelope.statement), - false)) + if (!isPartiallyFetched(envelope)) { return false; } auto txSetHashes = getValidatedTxSetHashes(envelope); - return std::all_of(std::begin(txSetHashes), std::end(txSetHashes), - [&](Hash const& txSetHash) { - return getKnownTxSet(txSetHash, 0, false); - }); + return std::all_of( + std::begin(txSetHashes), std::end(txSetHashes), + [&](Hash const& txSetHash) { return hasTxSet(txSetHash); }); } +// Requests all missing tx sets in `envelope` void PendingEnvelopes::startFetch(SCPEnvelope const& envelope) { @@ -596,8 +657,12 @@ PendingEnvelopes::startFetch(SCPEnvelope const& envelope) for (auto const& h2 : getValidatedTxSetHashes(envelope)) { - if (!getKnownTxSet(h2, 0, false)) + if (!hasTxSet(h2)) { + CLOG_TRACE( + Proto, + "PendingEnvelopes::startFetch: requesting missing txset {}", + hexAbbrev(h2)); mTxSetFetcher.fetch(h2, envelope); needSomething = true; } @@ -647,6 +712,7 @@ PendingEnvelopes::pop(uint64 slotIndex) auto it = mEnvelopes.begin(); while (it != mEnvelopes.end() && slotIndex >= it->first) { + // Process fully ready envelopes first auto& v = it->second.mReadyEnvelopes; if (v.size() != 0) { @@ -656,6 +722,19 @@ PendingEnvelopes::pop(uint64 slotIndex) updateMetrics(); return ret; } + + // If no more fully ready envelopes, proceed to processing partially + // ready envelopes + auto& partial = it->second.mPartiallyReadyEnvelopes; + if (partial.size() != 0) + { + // If we have partially ready envelopes, we can return the first one + auto it = partial.begin(); + SCPEnvelopeWrapperPtr ret = + mHerder.getHerderSCPDriver().wrapEnvelope(*it); + partial.erase(it); + return ret; + } it++; } return nullptr; @@ -783,7 +862,7 @@ PendingEnvelopes::forceRebuildQuorum() mRebuildQuorum = true; } -TxSetXDRFrameConstPtr +TxSetResult PendingEnvelopes::getTxSet(Hash const& hash) { return getKnownTxSet(hash, 0, false); @@ -815,6 +894,12 @@ PendingEnvelopes::getQSet(Hash const& hash) return qset; } +std::optional +PendingEnvelopes::getTxSetWaitingTime(Hash const& hash) const +{ + return mTxSetFetcher.getWaitingTime(hash); +} + Json::Value PendingEnvelopes::getJsonInfo(size_t limit) { diff --git a/src/herder/PendingEnvelopes.h b/src/herder/PendingEnvelopes.h index d6de92dd7a..074d52cf9b 100644 --- a/src/herder/PendingEnvelopes.h +++ b/src/herder/PendingEnvelopes.h @@ -37,6 +37,9 @@ struct SlotEnvelopes // envelopes we are fetching right now std::map mFetchingEnvelopes; + // TODO: This needs a better name and descriptor + std::set mPartiallyReadyEnvelopes; + // list of ready envelopes that haven't been sent to SCP yet std::vector mReadyEnvelopes; @@ -100,6 +103,8 @@ class PendingEnvelopes void envelopeReady(SCPEnvelope const& envelope); void discardSCPEnvelope(SCPEnvelope const& envelope); bool isFullyFetched(SCPEnvelope const& envelope); + // TODO: Docs and maybe better name (like qsetIsFetched) + bool isPartiallyFetched(SCPEnvelope const& envelope); void startFetch(SCPEnvelope const& envelope); void stopFetch(SCPEnvelope const& envelope); void touchFetchCache(SCPEnvelope const& envelope); @@ -112,8 +117,11 @@ class PendingEnvelopes // tries to find a txset in memory, setting touch also touches the LRU, // extending the lifetime of the result - TxSetXDRFrameConstPtr getKnownTxSet(Hash const& hash, uint64 slot, - bool touch); + TxSetResult getKnownTxSet(Hash const& hash, uint64 slot, bool touch); + + // Returns true if the tx set is available locally (either in cache or + // is a skip ledger hash which doesn't need fetching). + bool hasTxSet(Hash const& hash) const; void cleanKnownData(); @@ -203,9 +211,17 @@ class PendingEnvelopes Json::Value getJsonInfo(size_t limit); - TxSetXDRFrameConstPtr getTxSet(Hash const& hash); + TxSetResult getTxSet(Hash const& hash); SCPQuorumSetPtr getQSet(Hash const& hash); + /** + * Return how long the transaction set fetcher has been waiting for the + * transaction set identified by @p hash. Returns nullopt if the transaction + * set is not being fetched. + */ + std::optional + getTxSetWaitingTime(Hash const& hash) const; + // returns true if we think that the node is in the transitive quorum for // sure bool isNodeDefinitelyInQuorum(NodeID const& node); diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp index 0eb6c4e3c6..b37eb4a441 100644 --- a/src/herder/TxSetFrame.cpp +++ b/src/herder/TxSetFrame.cpp @@ -961,29 +961,35 @@ makeTxSetFromTransactions( } TxSetXDRFrameConstPtr -TxSetXDRFrame::makeEmpty(LedgerHeaderHistoryEntry const& lclHeader) +TxSetXDRFrame::makeEmpty(Hash const& previousLedgerHash, + uint32 previousLedgerVersion) { - if (protocolVersionStartsFrom(lclHeader.header.ledgerVersion, + if (protocolVersionStartsFrom(previousLedgerVersion, SOROBAN_PROTOCOL_VERSION)) { bool isParallelSoroban = false; - isParallelSoroban = - protocolVersionStartsFrom(lclHeader.header.ledgerVersion, - PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION); + isParallelSoroban = protocolVersionStartsFrom( + previousLedgerVersion, PARALLEL_SOROBAN_PHASE_PROTOCOL_VERSION); std::vector emptyPhases = { TxSetPhaseFrame::makeEmpty(TxSetPhase::CLASSIC, false), TxSetPhaseFrame::makeEmpty(TxSetPhase::SOROBAN, isParallelSoroban)}; GeneralizedTransactionSet txSet; - transactionsToGeneralizedTransactionSetXDR(emptyPhases, lclHeader.hash, - txSet); + transactionsToGeneralizedTransactionSetXDR(emptyPhases, + previousLedgerHash, txSet); return TxSetXDRFrame::makeFromWire(txSet); } TransactionSet txSet; - transactionsToTransactionSetXDR({}, lclHeader.hash, txSet); + transactionsToTransactionSetXDR({}, previousLedgerHash, txSet); return TxSetXDRFrame::makeFromWire(txSet); } +TxSetXDRFrameConstPtr +TxSetXDRFrame::makeEmpty(LedgerHeaderHistoryEntry const& lclHeader) +{ + return makeEmpty(lclHeader.hash, lclHeader.header.ledgerVersion); +} + TxSetXDRFrameConstPtr TxSetXDRFrame::makeFromHistoryTransactions(Hash const& previousLedgerHash, TxFrameList const& txs) diff --git a/src/herder/TxSetFrame.h b/src/herder/TxSetFrame.h index de9908645e..1626262fbd 100644 --- a/src/herder/TxSetFrame.h +++ b/src/herder/TxSetFrame.h @@ -173,6 +173,12 @@ class TxSetXDRFrame : public NonMovableOrCopyable static TxSetXDRFrameConstPtr makeEmpty(LedgerHeaderHistoryEntry const& lclHeader); + // Creates a valid empty TxSetXDRFrame from the previous ledger hash and + // protocol version. Used for skip ledger values where the full header + // may not be available. + static TxSetXDRFrameConstPtr makeEmpty(Hash const& previousLedgerHash, + uint32 previousLedgerVersion); + // `makeFromWire` methods create a TxSetXDRFrame from the XDR messages. // These methods don't perform any validation on the XDR. static TxSetXDRFrameConstPtr makeFromWire(TransactionSet const& xdrTxSet); diff --git a/src/herder/test/HerderTests.cpp b/src/herder/test/HerderTests.cpp index 75d9f1531e..764c960418 100644 --- a/src/herder/test/HerderTests.cpp +++ b/src/herder/test/HerderTests.cpp @@ -21,6 +21,7 @@ #include "history/test/HistoryTestsUtils.h" #include "catchup/LedgerApplyManagerImpl.h" +#include "crypto/KeyUtils.h" #include "crypto/SHA.h" #include "database/Database.h" #include "herder/HerderUtils.h" @@ -2878,6 +2879,41 @@ testSCPDriver(uint32 protocolVersion, uint32_t maxTxSetSize, size_t expectedOps) testInvalidValue(/* isNomination */ false); } } + + SECTION("skip hash/type mismatch") + { + auto checkInvalidMismatch = [&](StellarValue const& sv) { + auto v = xdr::xdr_to_opaque(sv); + + REQUIRE(scp.validateValue(seq, v, true) == + SCPDriver::kInvalidValue); + REQUIRE(scp.validateValue(seq, v, false) == + SCPDriver::kInvalidValue); + + ValueWrapperPtr extracted; + REQUIRE_NOTHROW(extracted = scp.extractValidValue(seq, v)); + REQUIRE(extracted == nullptr); + }; + + SECTION("signed value with skip hash") + { + auto p = makeTxPair(herder, txSet0, ct); + StellarValue sv; + xdr::xdr_from_opaque(p.first, sv); + sv.txSetHash = Herder::SKIP_LEDGER_HASH; + checkInvalidMismatch(sv); + } + + SECTION("skip value without skip hash") + { + auto p = makeTxPair(herder, txSet0, ct); + auto skipValue = scp.makeSkipLedgerValueFromValue(p.first); + StellarValue sv; + xdr::xdr_from_opaque(skipValue, sv); + sv.txSetHash = txSet0->getContentsHash(); + checkInvalidMismatch(sv); + } + } } SECTION("validateValue closeTimes") @@ -2965,7 +3001,7 @@ testSCPDriver(uint32 protocolVersion, uint32_t maxTxSetSize, size_t expectedOps) // Triggering next ledger will construct and cache the block herder.triggerNextLedger(seq, true); // All hits during the whole SCP round - REQUIRE(cache.getCounters().mHits == 8); + REQUIRE(cache.getCounters().mHits == 9); // One miss from the initial makeTxSetFromTransactions REQUIRE(cache.getCounters().mMisses == 1); } @@ -3265,7 +3301,8 @@ TEST_CASE("SCP State", "[herder]") { for (auto const& h : getValidatedTxSetHashes(msg)) { - REQUIRE(herder.getPendingEnvelopes().getTxSet(h)); + REQUIRE(std::get( + herder.getPendingEnvelopes().getTxSet(h))); REQUIRE(app->getPersistentState().hasTxSet(h)); hashes.insert(h); } @@ -3837,8 +3874,9 @@ TEST_CASE("soroban txs each parameter surge priced", "[soroban][herder]") ->getLedgerManager() .getLastClosedLedgerHeader() .header; - auto txSet = nodes[0]->getHerder().getTxSet( - lclHeader.scpValue.txSetHash); + auto txSet = std::get( + nodes[0]->getHerder().getTxSet( + lclHeader.scpValue.txSetHash)); GeneralizedTransactionSet xdrTxSet; txSet->toXDR(xdrTxSet); auto const& phase = xdrTxSet.v1TxSet().phases.at( @@ -4300,13 +4338,13 @@ TEST_CASE("soroban txs accepted by the network", bool upgradeApplied = false; simulation->crankUntil( [&]() { - auto txSetSize = + auto txSetResult = nodes[0]->getHerder().getTxSet( nodes[0] - ->getHerder() - .getTxSet(nodes[0] - ->getLedgerManager() - .getLastClosedLedgerHeader() - .header.scpValue.txSetHash) + ->getLedgerManager() + .getLastClosedLedgerHeader() + .header.scpValue.txSetHash); + auto txSetSize = + std::get(txSetResult) ->sizeOpTotalForLogging(); upgradeApplied = upgradeApplied || txSetSize > ledgerWideLimit; @@ -4425,7 +4463,8 @@ getValidatorExternalizeMessages(Application& app, uint32_t start, uint32_t end) auto& pe = herder.getPendingEnvelopes(); toStellarValue(env.statement.pledges.externalize().commit.value, sv); - auto txset = pe.getTxSet(sv.txSetHash); + auto txset = + std::get(pe.getTxSet(sv.txSetHash)); REQUIRE(txset); validatorSCPMessages[seq] = std::make_pair(env, txset->toStellarMessage()); @@ -5334,8 +5373,8 @@ TEST_CASE("processing of next slot happens after apply", "[herder]") StellarValue sv; toStellarValue(env.statement.pledges.externalize().commit.value, sv); - auto txSet = - herder.getPendingEnvelopes().getTxSet(sv.txSetHash); + auto txSet = std::get( + herder.getPendingEnvelopes().getTxSet(sv.txSetHash)); REQUIRE(txSet); return std::make_pair(env, txSet->toStellarMessage()); } @@ -5371,11 +5410,14 @@ TEST_CASE("processing of next slot happens after apply", "[herder]") SCPEnvelope invalidEnv{}; invalidEnv.statement.slotIndex = invalidSlot; - invalidEnv.statement.pledges.type(SCP_ST_PREPARE); - invalidEnv.statement.pledges.prepare().ballot.counter = 1; - invalidEnv.statement.pledges.prepare().ballot.value = - xdr::xdr_to_opaque(invalidValue); - invalidEnv.statement.pledges.prepare().quorumSetHash = qsetHash; + invalidEnv.statement.pledges.type(SCP_ST_CONFIRM); + auto& invalidConfirm = invalidEnv.statement.pledges.confirm(); + invalidConfirm.ballot.counter = 1; + invalidConfirm.ballot.value = xdr::xdr_to_opaque(invalidValue); + invalidConfirm.nPrepared = 1; + invalidConfirm.nCommit = 1; + invalidConfirm.nH = 1; + invalidConfirm.quorumSetHash = qsetHash; invalidEnv.statement.nodeID = keyA.getPublicKey(); herderC.signEnvelope(keyA, invalidEnv); @@ -5399,7 +5441,7 @@ TEST_CASE("processing of next slot happens after apply", "[herder]") Herder::ENVELOPE_STATUS_READY); REQUIRE(herderC.trackingConsensusLedgerIndex() == target - 1); - // Inject future invalid PREPARE for slot target+1. Herder should accept. + // Inject future invalid CONFIRM for slot target+1. Herder should accept. REQUIRE(herderC.recvSCPEnvelope(invalidEnv) == Herder::ENVELOPE_STATUS_FETCHING); @@ -6887,6 +6929,590 @@ TEST_CASE("SCP message capture from previous ledger", "[herder]") REQUIRE(checkSCPHistoryEntries(C, 2, expectedTypes)); } +// Helper function to feed a transaction set from source node to target node +// based on a HistoricalStatement +static bool +feedTxSetFromStatement(Application& sourceNode, Application& targetNode, + SCPStatement const& statement) +{ + auto stellarValues = getStellarValues(statement).value(); + auto& sourceHerder = dynamic_cast(sourceNode.getHerder()); + auto& targetHerder = dynamic_cast(targetNode.getHerder()); + bool fedNonEmptySet = false; + + for (auto const& sv : stellarValues) + { + // Skip values don't have a real tx set to feed + if (sv.txSetHash == Herder::SKIP_LEDGER_HASH) + { + continue; + } + + auto txSet = std::get( + sourceHerder.getTxSet(sv.txSetHash)); + REQUIRE(txSet); + fedNonEmptySet |= txSet->sizeTxTotal() > 0; + targetHerder.recvTxSet(txSet->getContentsHash(), txSet); + CLOG_ERROR(Herder, "Fed value {}", hexAbbrev(txSet->getContentsHash())); + } + return fedNonEmptySet; +} + +static bool +feedTxSetsFromSlot(Application& sourceNode, Application& targetNode, + uint64 slotIndex) +{ + // Get the herder and SCP from the source node + auto& sourceHerder = dynamic_cast(sourceNode.getHerder()); + auto& sourceSCP = sourceHerder.getSCP(); + + // Get the slot from the source node + auto sourceSlot = sourceSCP.getSlotForTesting(slotIndex); + REQUIRE(sourceSlot != nullptr); + + // Get the historical statements from the source slot + auto const& historicalStatements = + sourceSlot->getHistoricalStatementsForTesting(); + REQUIRE(!historicalStatements.empty()); + + // Get the target herder + auto& targetHerder = dynamic_cast(targetNode.getHerder()); + + // Feed each tx set to the target node + bool fedNonEmptySet = false; + for (auto const& histStmt : historicalStatements) + { + fedNonEmptySet |= + feedTxSetFromStatement(sourceNode, targetNode, histStmt.mStatement); + } + return fedNonEmptySet; +} + +// Helper function to feed SCP messages from one node to another for a specific +// slot +static bool +feedSCPMessagesForSlot(Application& sourceNode, Application& targetNode, + uint64 slotIndex, size_t injectionPoint) +{ + REQUIRE(slotIndex == + targetNode.getLedgerManager().getLastClosedLedgerNum() + 1); + // Get the herder and SCP from the source node + auto& sourceHerder = dynamic_cast(sourceNode.getHerder()); + auto& sourceSCP = sourceHerder.getSCP(); + + // Get the slot from the source node + auto sourceSlot = sourceSCP.getSlotForTesting(slotIndex); + REQUIRE(sourceSlot != nullptr); + + // Get the historical statements from the source slot + auto const& historicalStatements = + sourceSlot->getHistoricalStatementsForTesting(); + REQUIRE(!historicalStatements.empty()); + + // Get the target herder + auto& targetHerder = dynamic_cast(targetNode.getHerder()); + + CLOG_ERROR(Herder, "Injection point {}", injectionPoint); + bool doTest = false; + for (size_t i = 0; i < historicalStatements.size(); ++i) + { + auto const& histStmt = historicalStatements.at(i); + // Node must be synced for background tx set downloading to + // activate + // TODO: This might change ^^. Remove the assert below if it + // does. + REQUIRE(targetNode.getState() == Application::State::APP_SYNCED_STATE); + + // Create an envelope from the statement + SCPEnvelope envelope = sourceSlot->createEnvelope(histStmt.mStatement); + + // Feed the envelope to the target node + auto status = targetHerder.recvSCPEnvelope(envelope); + + // Log for debugging + CLOG_ERROR(Herder, + "Fed historical SCP message to target node for slot {}, " + "status: {}", + slotIndex, static_cast(status)); + + // TODO: I figure either of these statuses is OK, but does this + // prototype ever report PROCESSED for messages where it doesn't + // have the tx set? Should it? + // TODO: I think technically it's possible that with the FIRST time + // around, there's a nonempty tx set that node0 already has, so this + // *could* return READY here in that case. Should probably have a more + // clever check here. + REQUIRE((!doTest || i > injectionPoint || + status == Herder::EnvelopeStatus::ENVELOPE_STATUS_FETCHING || + status == Herder::EnvelopeStatus::ENVELOPE_STATUS_PROCESSED)); + + // TODO: This spams the tx sets at the injection point and beyond + // because if a later vote changes the tx set, the target will have + // ignored the earlier tx sets (as it never requested them). This is a + // hack to ensure the target always gets the tx set immediately after + // requesting it, but it would be better to only send each tx set once + // (rather than spamming it), and also support tx sets that come in + // *after* the injection point. + if (i >= injectionPoint) + { + // Inject tx sets + doTest |= feedTxSetsFromSlot(sourceNode, targetNode, slotIndex); + } + } + return doTest; +} + +// Helper function to get the number of injection points for a slot +static size_t +getInjectionPointsForSlot(Application& sourceNode, uint64 slotIndex) +{ + auto& sourceHerder = dynamic_cast(sourceNode.getHerder()); + auto& sourceSCP = sourceHerder.getSCP(); + auto sourceSlot = sourceSCP.getSlotForTesting(slotIndex); + REQUIRE(sourceSlot != nullptr); + auto const& historicalStatements = + sourceSlot->getHistoricalStatementsForTesting(); + REQUIRE(!historicalStatements.empty()); + return historicalStatements.size() - 1; +} + +// TODO: Does this belong in SCP tests instead? +// TODO: I marked this as `.skip` because I think it was always only used to +// examine output (but isn't necessarily expected to pass). But I don't +// remember. Also, it hasn't been updated since adding skip ledger support and +// fails suspiciously around where the disconnected node would vote to skip. +TEST_CASE("Parallel tx set downloading", "[herder][!hide]") +{ + int constexpr simSize = 3; + int constexpr threshold = 2; + auto const networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); + auto simulation = + std::make_shared(Simulation::OVER_LOOPBACK, networkID); + + std::array configs; + std::array pubkeys; + SCPQuorumSet qset; + qset.threshold = threshold; + for (int i = 0; i < simSize; ++i) + { + auto& cfg = configs.at(i) = simulation->newConfig(); + cfg.GENESIS_TEST_ACCOUNT_COUNT = 100; + cfg.TX_SET_DOWNLOAD_TIMEOUT = std::chrono::milliseconds(100); + auto const& pubkey = cfg.NODE_SEED.getPublicKey(); + pubkeys.at(i) = pubkey; + qset.validators.push_back(pubkey); + } + + // Add nodes to simulation + for (int i = 0; i < simSize; ++i) + { + auto const& cfg = configs.at(i); + simulation->addNode(cfg.NODE_SEED, qset, &cfg); + } + + // Connect nodes and start simulation + simulation->addPendingConnection(pubkeys.at(0), pubkeys.at(1)); + simulation->addPendingConnection(pubkeys.at(1), pubkeys.at(2)); + simulation->startAllNodes(); + + // TODO: Is this necessary? vv + // wait for ledgers to close so nodes get the updated transitive quorum + simulation->crankUntil( + [&simulation]() { return simulation->haveAllExternalized(3, 1); }, + 10 * simulation->getExpectedLedgerCloseTime(), false); + + // Disconnect node 0 + auto& node0 = *simulation->getNode(pubkeys.at(0)); + REQUIRE(node0.getOverlayManager().getAuthenticatedPeersCount() == 1); + simulation->dropConnection(pubkeys.at(0), pubkeys.at(1)); + REQUIRE(node0.getOverlayManager().getAuthenticatedPeersCount() == 0); + + // Generate payment load from node 1 that will last for at least 5 + // ledgers + auto& node1LoadGen = simulation->getNode(pubkeys.at(1))->getLoadGenerator(); + auto loadConfig = + GeneratedLoadConfig::txLoad(LoadGenMode::PAY, 100, 500, 10); + node1LoadGen.generateLoad(loadConfig); + + // // Run for a few more ledgers + // simulation->crankUntil( + // [&simulation]() { return simulation->haveAllExternalized(6, 1); }, + // 10 * simulation->getExpectedLedgerCloseTime(), false); + + auto& node1 = *simulation->getNode(pubkeys.at(1)); + auto lclNum = node1.getLedgerManager().getLastClosedLedgerNum(); + + // Let remaining nodes externalize a couple blocks + REQUIRE(node0.getOverlayManager().getAuthenticatedPeersCount() == 0); + simulation->crankUntil( + [&simulation, &pubkeys, lclNum]() { + for (int i = 1; i < simSize; ++i) + { + auto const& node = simulation->getNode(pubkeys.at(i)); + if (node->getLedgerManager().getLastClosedLedgerNum() < + lclNum + 2) + { + return false; + } + } + return true; + }, + 10 * simulation->getExpectedLedgerCloseTime(), false); + REQUIRE(node0.getOverlayManager().getAuthenticatedPeersCount() == 0); + + // Node 0 should be behind by at least a couple ledgers + lclNum = node1.getLedgerManager().getLastClosedLedgerNum(); + REQUIRE(node0.getLedgerManager().getLastClosedLedgerNum() <= lclNum - 2); + + // Store initial LCL for node0 to verify progress + auto node0InitialLcl = node0.getLedgerManager().getLastClosedLedgerNum(); + + // Calculate slot indices + uint64 firstMissedSlot = node0InitialLcl + 1; + uint64 secondMissedSlot = node0InitialLcl + 2; + + // Get the number of injection points for the second slot to test all + // possible interleavings + size_t numInjectionPoints = + getInjectionPointsForSlot(node1, secondMissedSlot); + + // Test all possible interleavings where tx set downloads complete at + // different points relative to SCP statements + for (size_t injectionPoint = 0; injectionPoint < numInjectionPoints; + ++injectionPoint) + { + DYNAMIC_SECTION("Injection point " << injectionPoint) + { + // Feed SCP messages for the first missed slot. Due to disconnect + // timing, `node0` might already have the txset for this one, so + // we'll skip checking. + bool ranTest = feedSCPMessagesForSlot(node1, node0, firstMissedSlot, + injectionPoint); + + // Verify node0 advanced by one ledger. + REQUIRE(node0.getLedgerManager().getLastClosedLedgerNum() == + node0InitialLcl + 1); + + if (!ranTest) + { + // Due to disconnect timing, `node0` might already have had the + // txset for the first slot (or it may have been empty), so do + // another slot. + + // Trigger next ledger + // node0.getHerder().triggerNextLedger(node0InitialLcl + 2, + // false); + REQUIRE( + node0.getOverlayManager().getAuthenticatedPeersCount() == + 0); + simulation->crankForAtLeast(std::chrono::seconds(10), false); + REQUIRE( + node0.getOverlayManager().getAuthenticatedPeersCount() == + 0); + + // Feed SCP messages for the second missed slot, testing the + // specific injection point for this iteration + ranTest = feedSCPMessagesForSlot(node1, node0, secondMissedSlot, + injectionPoint); + REQUIRE(ranTest); + + // Verify node0 has now caught up by 2 ledgers total + REQUIRE(node0.getLedgerManager().getLastClosedLedgerNum() == + node0InitialLcl + 2); + } + } + } + + // TODO: I don't think it's necessary to crank here. This should have all + // happened synchronously (for now). + // // Give node 0 some time to process the messages + // simulation->crankForAtMost(std::chrono::seconds(5), false); + + // // Check if node 0 caught up + // auto node0LCL = node0.getLedgerManager().getLastClosedLedgerNum(); + // CLOG_INFO(Herder, "Node 0 LCL after feeding messages: {}, Node 1 LCL: + // {}", + // node0LCL, lclNum); + + // // Node 0 might not fully catch up just from SCP messages alone + // // but it should have made progress + // REQUIRE(node0LCL >= node0.getLedgerManager().getLastClosedLedgerNum()); +} + +// TODO: Does this belong in SCP tests instead? +// TODO: Better test name +TEST_CASE("Skip ledger", "[herder][!hide]") +{ + int constexpr simSize = 3; + int constexpr threshold = 2; + auto const networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); + auto simulation = + std::make_shared(Simulation::OVER_LOOPBACK, networkID); + + std::array configs; + std::array pubkeys; + SCPQuorumSet qset; + qset.threshold = threshold; + constexpr int numAccounts = 30000; + for (int i = 0; i < simSize; ++i) + { + auto& cfg = configs.at(i) = simulation->newConfig(); + cfg.GENESIS_TEST_ACCOUNT_COUNT = numAccounts; + cfg.TX_SET_DOWNLOAD_TIMEOUT = std::chrono::milliseconds(100); + auto const& pubkey = cfg.NODE_SEED.getPublicKey(); + pubkeys.at(i) = pubkey; + qset.validators.push_back(pubkey); + } + + // Add nodes to simulation + for (int i = 0; i < simSize; ++i) + { + auto const& cfg = configs.at(i); + simulation->addNode(cfg.NODE_SEED, qset, &cfg); + } + + // Connect nodes and start simulation + simulation->addPendingConnection(pubkeys.at(0), pubkeys.at(1)); + simulation->addPendingConnection(pubkeys.at(1), pubkeys.at(2)); + simulation->startAllNodes(); + + auto& skipExternalizedCounter = + simulation->getNode(pubkeys.at(0)) + ->getMetrics() + .NewCounter({"scp", "skip", "externalized"}); + auto const initialSkipCount = skipExternalizedCounter.count(); + + // TODO: Is this necessary? vv + // wait for ledgers to close so nodes get the updated transitive quorum + simulation->crankUntil( + [&simulation]() { return simulation->haveAllExternalized(3, 1); }, + 10 * simulation->getExpectedLedgerCloseTime(), false); + + // Generate payment load from node 1 that will last for at least 5 + // ledgers + auto& node1LoadGen = simulation->getNode(pubkeys.at(1))->getLoadGenerator(); + auto loadConfig = + GeneratedLoadConfig::txLoad(LoadGenMode::PAY, numAccounts, 5000, 5); + node1LoadGen.generateLoad(loadConfig); + + // Set up message filters to drop TX set related messages + for (size_t i = 0; i < pubkeys.size(); ++i) + { + for (size_t j = 0; j < pubkeys.size(); ++j) + { + if (i != j) + { + auto conn = + simulation->getLoopbackConnection(pubkeys[i], pubkeys[j]); + if (conn) + { + auto filter = [](StellarMessage const& msg) { + auto msgType = msg.type(); + return msgType != GET_TX_SET && msgType != TX_SET && + msgType != GENERALIZED_TX_SET; + return true; + }; + + conn->getInitiator()->setOutgoingMessageFilter(filter); + conn->getAcceptor()->setOutgoingMessageFilter(filter); + } + } + } + } + + CLOG_ERROR(Herder, "There's a disconnect here"); + + // Run simulation until all nodes externalize a skip value (timeout: 5 + // minutes) + simulation->crankForAtLeast(std::chrono::minutes(5), false); + + // Should have externalized skip + REQUIRE(skipExternalizedCounter.count() > initialSkipCount); +} + +// TODO: I think this needs to put the load generating validator (A?) as HIGH +// and the rest as LOW. Otherwise, there's no reason at A would have the tx set +// (if, for example, A did not win nomination). Also, I may have broken this +// test with ctrl-z, which interacts poorly with copilot +TEST_CASE("Skip ledger vote reversal", "[herder][!hide]") +{ + int constexpr simSize = 3; + auto const networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); + auto simulation = + std::make_shared(Simulation::OVER_LOOPBACK, networkID); + + std::array configs; + std::array pubkeys; + std::vector validators; + validators.reserve(simSize); + constexpr int numAccounts = 30000; + for (int i = 0; i < simSize; ++i) + { + auto& cfg = configs.at(i) = simulation->newConfig(); + cfg.SKIP_HIGH_CRITICAL_VALIDATOR_CHECKS_FOR_TESTING = true; + cfg.GENESIS_TEST_ACCOUNT_COUNT = numAccounts; + cfg.TX_SET_DOWNLOAD_TIMEOUT = std::chrono::milliseconds(100); + auto const& pubkey = cfg.NODE_SEED.getPublicKey(); + pubkeys.at(i) = pubkey; + + ValidatorEntry entry; + std::string label(1, static_cast('A' + i)); + entry.mName = "validator-" + label; + entry.mHomeDomain = "domain-" + label; + entry.mQuality = (i == 0) ? ValidatorQuality::VALIDATOR_HIGH_QUALITY + : ValidatorQuality::VALIDATOR_LOW_QUALITY; + entry.mKey = pubkey; + entry.mHasHistory = false; + validators.emplace_back(std::move(entry)); + } + + for (int i = 0; i < simSize; ++i) + { + auto& cfg = configs.at(i); + cfg.generateQuorumSetForTesting(validators); + simulation->addNode(cfg.NODE_SEED, cfg.QUORUM_SET, &cfg); + } + + simulation->addPendingConnection(pubkeys.at(0), pubkeys.at(1)); + simulation->addPendingConnection(pubkeys.at(0), pubkeys.at(2)); + simulation->addPendingConnection(pubkeys.at(1), pubkeys.at(2)); + simulation->startAllNodes(); + + simulation->crankUntil( + [&simulation]() { return simulation->haveAllExternalized(3, 1); }, + 10 * simulation->getExpectedLedgerCloseTime(), false); + + auto& nodeA = *simulation->getNode(pubkeys.at(0)); + auto& nodeB = *simulation->getNode(pubkeys.at(1)); + auto& nodeC = *simulation->getNode(pubkeys.at(2)); + + auto& skipValueReplacedB = + nodeB.getMetrics().NewCounter({"scp", "skip", "value-replaced"}); + auto& skipValueReplacedC = + nodeC.getMetrics().NewCounter({"scp", "skip", "value-replaced"}); + auto& skipExternalizedB = + nodeB.getMetrics().NewCounter({"scp", "skip", "externalized"}); + auto& skipExternalizedC = + nodeC.getMetrics().NewCounter({"scp", "skip", "externalized"}); + + auto const valueReplacedInitialB = skipValueReplacedB.count(); + auto const valueReplacedInitialC = skipValueReplacedC.count(); + auto const externalizedInitialB = skipExternalizedB.count(); + auto const externalizedInitialC = skipExternalizedC.count(); + + auto& loadGen = nodeA.getLoadGenerator(); + auto loadConfig = + GeneratedLoadConfig::txLoad(LoadGenMode::PAY, numAccounts, 5000, 5); + loadGen.generateLoad(loadConfig); + + auto dropTxSetFilter = [](StellarMessage const& msg) { + auto const msgType = msg.type(); + return msgType != GET_TX_SET && msgType != TX_SET && + msgType != GENERALIZED_TX_SET; + }; + auto allowAllFilter = [](StellarMessage const&) { return true; }; + auto const applyFilterToAllConnections = + [&](std::function const& filter) { + for (int i = 0; i < simSize; ++i) + { + for (int j = i + 1; j < simSize; ++j) + { + auto conn = simulation->getLoopbackConnection( + pubkeys.at(i), pubkeys.at(j)); + if (conn) + { + conn->getInitiator()->setOutgoingMessageFilter(filter); + conn->getAcceptor()->setOutgoingMessageFilter(filter); + } + } + } + }; + + applyFilterToAllConnections(dropTxSetFilter); + + simulation->crankUntil( + [&]() { + return skipValueReplacedB.count() > valueReplacedInitialB && + skipValueReplacedC.count() > valueReplacedInitialC; + }, + 60 * simulation->getExpectedLedgerCloseTime(), false); + + // TODO: It's cool that this works (for some seeds?), but this test is + // flawed. `crankUntil` only checks periodically (not every crank), so it's + // possible that this accidentally cranks "too far" and allows the nodes to + // externalize skip. This needs to be reworked to stop cranking as soon as + // the condition is met, either by modifying SCP in some way, or by manually + // cranking and checking the condition after every crank, or by manually + // executing SCP. + + auto const replacedCountB = skipValueReplacedB.count(); + auto const replacedCountC = skipValueReplacedC.count(); + + auto& herderA = dynamic_cast(nodeA.getHerder()); + auto& herderB = dynamic_cast(nodeB.getHerder()); + auto& herderC = dynamic_cast(nodeC.getHerder()); + + auto const slotIndex = herderB.nextConsensusLedgerIndex(); + REQUIRE(slotIndex == herderA.nextConsensusLedgerIndex()); + REQUIRE(slotIndex == herderC.nextConsensusLedgerIndex()); + + REQUIRE(feedTxSetsFromSlot(nodeA, nodeB, slotIndex)); + REQUIRE(feedTxSetsFromSlot(nodeA, nodeC, slotIndex)); + + applyFilterToAllConnections(allowAllFilter); + + simulation->crankUntil( + [&]() { + return nodeA.getLedgerManager().getLastClosedLedgerNum() >= + slotIndex && + nodeB.getLedgerManager().getLastClosedLedgerNum() >= + slotIndex && + nodeC.getLedgerManager().getLastClosedLedgerNum() >= + slotIndex; + }, + 30 * simulation->getExpectedLedgerCloseTime(), false); + + REQUIRE(skipExternalizedB.count() == externalizedInitialB); + REQUIRE(skipExternalizedC.count() == externalizedInitialC); + REQUIRE(skipValueReplacedB.count() == replacedCountB); + REQUIRE(skipValueReplacedC.count() == replacedCountC); + + // auto const finalLedgerA = + // nodeA.getLedgerManager().getLastClosedLedgerNum(); + // REQUIRE(nodeB.getLedgerManager().getLastClosedLedgerNum() == + // finalLedgerA); REQUIRE(nodeC.getLedgerManager().getLastClosedLedgerNum() + // == finalLedgerA); + + auto const verifyNonSkipExternalize = [&](Application& node, + HerderImpl& herder) { + auto slot = herder.getSCP().getSlotForTesting(slotIndex); + REQUIRE(slot != nullptr); + bool foundLocalExternalize = false; + for (auto const& histStmt : slot->getHistoricalStatementsForTesting()) + { + auto const& st = histStmt.mStatement; + if (st.nodeID == node.getConfig().NODE_SEED.getPublicKey() && + st.pledges.type() == SCPStatementType::SCP_ST_EXTERNALIZE) + { + auto const& value = st.pledges.externalize().commit.value; + REQUIRE(!slot->getSCPDriver().isSkipLedgerValue(value)); + StellarValue sv; + REQUIRE(toStellarValue(value, sv)); + auto txSet = std::get( + herder.getTxSet(sv.txSetHash)); + REQUIRE(txSet); + REQUIRE(txSet->sizeTxTotal() > 0); + foundLocalExternalize = true; + break; + } + } + REQUIRE(foundLocalExternalize); + }; + + verifyNonSkipExternalize(nodeB, herderB); + verifyNonSkipExternalize(nodeC, herderC); +} + using Topology = std::pair, std::vector>; // Generate a Topology with a single org containing 3 validators of HIGH quality diff --git a/src/herder/test/PendingEnvelopesTests.cpp b/src/herder/test/PendingEnvelopesTests.cpp index 8d9b699c2c..9ec4b19ce2 100644 --- a/src/herder/test/PendingEnvelopesTests.cpp +++ b/src/herder/test/PendingEnvelopesTests.cpp @@ -137,7 +137,7 @@ TEST_CASE("PendingEnvelopes recvSCPEnvelope", "[herder]") REQUIRE(pendingEnvelopes.recvSCPEnvelope(saneEnvelope) == Herder::ENVELOPE_STATUS_FETCHING); - REQUIRE(herder.getSCP().getLatestMessage(pk) == nullptr); + REQUIRE(herder.getSCP().getLatestMessage(pk) != nullptr); // -> processes saneEnvelope REQUIRE(pendingEnvelopes.recvTxSet(p.second->getContentsHash(), p.second)); @@ -410,8 +410,99 @@ TEST_CASE("PendingEnvelopes recvSCPEnvelope", "[herder]") REQUIRE(pendingEnvelopes.recvSCPEnvelope(malformedEnvelope) == Herder::ENVELOPE_STATUS_FETCHING); REQUIRE(pendingEnvelopes.recvSCPQuorumSet(saneQSetHash, saneQSet)); - REQUIRE(herder.getSCP().getLatestMessage(pk) == nullptr); + REQUIRE(herder.getSCP().getLatestMessage(pk) != nullptr); REQUIRE(pendingEnvelopes.recvTxSet(p2.second->getContentsHash(), p2.second)); } + + SECTION("value wrapper keeps tx set alive via onTxSetReceived") + { + // The tx set exists but is NOT in the herder's known tx set cache. + // When wrapStellarValue/wrapValue is called, the wrapper won't find + // the tx set and will register in mPendingTxSetWrappers for later + // update via onTxSetReceived(). + auto& scpDriver = herder.getHerderSCPDriver(); + auto txSetHash = txSet->getContentsHash(); + + StellarValue sv = + herder.makeStellarValue(txSetHash, 10, emptyUpgradeSteps, s); + + SECTION("wrapStellarValue registers and receives tx set") + { + // "txSet" and "p.second" hold the only references + REQUIRE(txSet.use_count() == 2); + + // Wrap the value - tx set is not in herder's cache, so the + // wrapper registers in mPendingTxSetWrappers + auto wrapper = scpDriver.wrapStellarValue(sv); + + // Wrapper doesn't have the tx set yet, ref count unchanged + REQUIRE(txSet.use_count() == 2); + + // Deliver the tx set via onTxSetReceived + scpDriver.onTxSetReceived(txSetHash, txSet); + + // Now the wrapper holds a reference to the tx set + REQUIRE(txSet.use_count() == 3); + + // Dropping the wrapper releases its reference + wrapper.reset(); + REQUIRE(txSet.use_count() == 2); + } + + SECTION("wrapValue registers and receives tx set") + { + REQUIRE(txSet.use_count() == 2); + + auto wrapper = scpDriver.wrapValue(p.first); + + REQUIRE(txSet.use_count() == 2); + + scpDriver.onTxSetReceived(txSetHash, txSet); + + REQUIRE(txSet.use_count() == 3); + + wrapper.reset(); + REQUIRE(txSet.use_count() == 2); + } + + SECTION("multiple wrappers all receive tx set") + { + REQUIRE(txSet.use_count() == 2); + + auto wrapper1 = scpDriver.wrapStellarValue(sv); + auto wrapper2 = scpDriver.wrapStellarValue(sv); + + // Neither wrapper has the tx set yet + REQUIRE(txSet.use_count() == 2); + + scpDriver.onTxSetReceived(txSetHash, txSet); + + // Both wrappers now hold a reference + REQUIRE(txSet.use_count() == 4); + + wrapper1.reset(); + REQUIRE(txSet.use_count() == 3); + + wrapper2.reset(); + REQUIRE(txSet.use_count() == 2); + } + + SECTION("expired wrapper does not leak tx set") + { + REQUIRE(txSet.use_count() == 2); + + auto wrapper = scpDriver.wrapStellarValue(sv); + // Drop the wrapper before the tx set arrives + wrapper.reset(); + + REQUIRE(txSet.use_count() == 2); + + // The weak_ptr in the registry has expired, so no update occurs + scpDriver.onTxSetReceived(txSetHash, txSet); + + // No leak - ref count unchanged + REQUIRE(txSet.use_count() == 2); + } + } } diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index b4dd5ee196..40378f1991 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -1540,7 +1540,9 @@ LedgerManagerImpl::applyLedger(LedgerCloseData const& ledgerData, throw std::runtime_error("txset mismatch"); } - if (txSet->getContentsHash() != ledgerData.getValue().txSetHash) + Hash const& ldTxSetHash = ledgerData.getValue().txSetHash; + if (txSet->getContentsHash() != ldTxSetHash && + ldTxSetHash != Herder::SKIP_LEDGER_HASH) { CLOG_ERROR( Ledger, diff --git a/src/ledger/test/LedgerCloseMetaStreamTests.cpp b/src/ledger/test/LedgerCloseMetaStreamTests.cpp index 9f009341ae..712c56cb3b 100644 --- a/src/ledger/test/LedgerCloseMetaStreamTests.cpp +++ b/src/ledger/test/LedgerCloseMetaStreamTests.cpp @@ -52,6 +52,13 @@ TEST_CASE("LedgerCloseMetaStream file descriptor - LIVE_NODE", uint32 const ledgerToWaitFor = 10; + // The `induceOneLedgerFork=true` variant simulates a watcher whose LCL + // diverges from the network (via a locally-applied extra ledger). When + // validators subsequently externalize ledgers built on the non-forked + // history, the watcher's validateValue returns kInvalidValue on those + // values (previousLedgerHash mismatch), and + // BallotProtocol::throwIfValueInvalidForCommit fires before any state + // mutation. bool const induceOneLedgerFork = GENERATE(false, true); CAPTURE(induceOneLedgerFork); auto const ledgerToCorrupt = 5; @@ -106,50 +113,63 @@ TEST_CASE("LedgerCloseMetaStream file descriptor - LIVE_NODE", simulation->startAllNodes(); bool watchersAreCorrupted = false; - simulation->crankUntil( - [&]() { - // As long as the watchers are in sync, wait for them to get the - // news of all the ledgers closed by the validators. But once - // the watchers are corrupt, they won't be able to close more - // ledgers, so at that point we start waiting only for the - // validators to do so. - if (watchersAreCorrupted) - { - return app1->getLedgerManager().getLastClosedLedgerNum() == - ledgerToWaitFor; - } - - auto const lastClosedLedger = - app4->getLedgerManager().getLastClosedLedgerNum(); + try + { + simulation->crankUntil( + [&]() { + // As long as the watchers are in sync, wait for them to + // get the news of all the ledgers closed by the + // validators. But once the watchers are corrupt, they + // won't be able to close more ledgers, so at that point + // we start waiting only for the validators to do so. + if (watchersAreCorrupted) + { + return app1->getLedgerManager() + .getLastClosedLedgerNum() == ledgerToWaitFor; + } - if (lastClosedLedger == expectedLastWatcherLedger - 1) - { - expectedLastSafeHash = app4->getLedgerManager() - .getLastClosedLedgerHeader() - .hash; + auto const lastClosedLedger = + app4->getLedgerManager().getLastClosedLedgerNum(); - if (induceOneLedgerFork) + if (lastClosedLedger == expectedLastWatcherLedger - 1) { - txtest::closeLedgerOn( - *app4, ledgerToCorrupt, - app4->getLedgerManager() + expectedLastSafeHash = app4->getLedgerManager() + .getLastClosedLedgerHeader() + .hash; + + if (induceOneLedgerFork) + { + txtest::closeLedgerOn( + *app4, ledgerToCorrupt, + app4->getLedgerManager() + .getLastClosedLedgerHeader() + .header.scpValue.closeTime + + 1); + + expectedLastUnsafeHash = + app4->getLedgerManager() .getLastClosedLedgerHeader() - .header.scpValue.closeTime + - 1); - - expectedLastUnsafeHash = - app4->getLedgerManager() - .getLastClosedLedgerHeader() - .hash; + .hash; - watchersAreCorrupted = true; - return false; + watchersAreCorrupted = true; + return false; + } } - } - return lastClosedLedger == ledgerToWaitFor; - }, - std::chrono::seconds{200}, false); + return lastClosedLedger == ledgerToWaitFor; + }, + std::chrono::seconds{200}, false); + } + catch (std::runtime_error const& e) + { + // Expected only in the forked variant: the watcher's + // throwIfValueInvalidForCommit helper fires when a quorum of + // validators pulls it into committing a value it rejects. + REQUIRE(induceOneLedgerFork); + REQUIRE(std::string(e.what()).find( + "SCP forced commit on locally-invalid value") != + std::string::npos); + } REQUIRE(app4->getLedgerManager().getLastClosedLedgerNum() == expectedLastWatcherLedger); diff --git a/src/main/Config.cpp b/src/main/Config.cpp index f89545b039..3af7714ec9 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -248,6 +248,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) PEER_AUTHENTICATION_TIMEOUT = 2; PEER_TIMEOUT = 30; PEER_STRAGGLER_TIMEOUT = 120; + TX_SET_DOWNLOAD_TIMEOUT = std::chrono::milliseconds(5000); FLOOD_OP_RATE_PER_LEDGER = 1.0; FLOOD_TX_PERIOD_MS = 200; @@ -1373,6 +1374,11 @@ Config::processConfig(std::shared_ptr t) PEER_STRAGGLER_TIMEOUT = readInt( item, 1, std::numeric_limits::max()); }}, + {"TX_SET_DOWNLOAD_TIMEOUT", + [&]() { + TX_SET_DOWNLOAD_TIMEOUT = + std::chrono::milliseconds(readInt(item, 1)); + }}, {"MAX_BATCH_WRITE_COUNT", [&]() { MAX_BATCH_WRITE_COUNT = readInt(item, 1); }}, {"MAX_BATCH_WRITE_BYTES", diff --git a/src/main/Config.h b/src/main/Config.h index cb217d87c1..dd34dfcd42 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -717,6 +717,9 @@ class Config : public std::enable_shared_from_this unsigned short PEER_AUTHENTICATION_TIMEOUT; unsigned short PEER_TIMEOUT; unsigned short PEER_STRAGGLER_TIMEOUT; + + // TODO: Docs + std::chrono::milliseconds TX_SET_DOWNLOAD_TIMEOUT; int MAX_BATCH_WRITE_COUNT; int MAX_BATCH_WRITE_BYTES; double FLOOD_OP_RATE_PER_LEDGER; diff --git a/src/main/main.cpp b/src/main/main.cpp index d9bd67f2d6..4682b07295 100644 --- a/src/main/main.cpp +++ b/src/main/main.cpp @@ -374,7 +374,8 @@ main(int argc, char* const* argv) // Disable XDR hash checking in vnext builds #ifndef ENABLE_NEXT_PROTOCOL_VERSION_UNSAFE_FOR_PRODUCTION - checkXDRFileIdentity(); +// TODO: Re-enable XDR check +// checkXDRFileIdentity(); #endif } catch (...) diff --git a/src/overlay/ItemFetcher.cpp b/src/overlay/ItemFetcher.cpp index 47f37ee7e1..91744b7580 100644 --- a/src/overlay/ItemFetcher.cpp +++ b/src/overlay/ItemFetcher.cpp @@ -94,6 +94,18 @@ ItemFetcher::fetchingFor(Hash const& itemHash) const return result; } +std::optional +ItemFetcher::getWaitingTime(Hash const& itemHash) const +{ + auto iter = mTrackers.find(itemHash); + if (iter == mTrackers.end()) + { + return std::nullopt; + } + + return iter->second->getDuration(); +} + void ItemFetcher::stopFetchingOutsideRange(std::optional minSlot, std::optional maxSlot, @@ -145,7 +157,7 @@ ItemFetcher::doesntHave(Hash const& itemHash, Peer::pointer peer) } void -ItemFetcher::recv(Hash itemHash, medida::Timer& timer) +ItemFetcher::recv(Hash const& itemHash, medida::Timer& timer) { ZoneScoped; auto const& iter = mTrackers.find(itemHash); @@ -162,6 +174,8 @@ ItemFetcher::recv(Hash itemHash, medida::Timer& timer) timer.Update(tracker->getDuration()); while (!tracker->empty()) { + // NOTE: This calls back into herder upon receiving a tx set. Should + // ensure that we proceed to SCP once receiving all tx sets. mApp.getHerder().recvSCPEnvelope(tracker->pop()); } // stop the timer, stop requesting the item as we have it diff --git a/src/overlay/ItemFetcher.h b/src/overlay/ItemFetcher.h index 7306522c5d..7de6b6a929 100644 --- a/src/overlay/ItemFetcher.h +++ b/src/overlay/ItemFetcher.h @@ -69,6 +69,18 @@ class ItemFetcher : private NonMovableOrCopyable */ std::vector fetchingFor(Hash const& itemHash) const; + /** + * Return how long the fetcher has been waiting for the item identified by + * @p hash. Returns nullopt if the item is not being fetched. + */ + // TODO: Maybe update the name of this function and doc comment. I don't + // like "waiting time" or "nulopt if the item is not being fetched". + // Technically this returns the time since the fetch was started, but if the + // fetch has completed it STILL returns the time since the fetch started, + // and so it's not necessarily all "waiting time". + std::optional + getWaitingTime(Hash const& itemHash) const; + /** * Called periodically to remove envelopes from list that fall outside * the range [minSlot, maxSlot]. Either bound may be nullopt to skip @@ -90,7 +102,7 @@ class ItemFetcher : private NonMovableOrCopyable * added before with @see fetch and the same @p itemHash will be resent * to Herder, matching @see Tracker will be cleaned up. */ - void recv(Hash itemHash, medida::Timer& timer); + void recv(Hash const& itemHash, medida::Timer& timer); #ifdef BUILD_TESTS std::shared_ptr getTracker(Hash const& h); diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index c586cdfca3..a16122f003 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -1466,9 +1466,12 @@ Peer::recvGetTxSet(StellarMessage const& msg) } auto self = shared_from_this(); - if (auto txSet = mAppConnector.getHerder().getTxSet(msg.txSetHash())) + auto result = mAppConnector.getHerder().getTxSet(msg.txSetHash()); + if (auto* txSetPtr = std::get_if(&result); + txSetPtr && *txSetPtr) { auto newMsg = std::make_shared(); + TxSetXDRFrameConstPtr const txSet = *txSetPtr; if (txSet->isGeneralizedTxSet()) { newMsg->type(GENERALIZED_TX_SET); diff --git a/src/overlay/test/LoopbackPeer.cpp b/src/overlay/test/LoopbackPeer.cpp index a02f63c33c..c57eff567d 100644 --- a/src/overlay/test/LoopbackPeer.cpp +++ b/src/overlay/test/LoopbackPeer.cpp @@ -540,6 +540,13 @@ LoopbackPeer::setReorderProbability(double d) mReorderProb = bernoulli_distribution(d); } +void +LoopbackPeer::setOutgoingMessageFilter( + std::function f) +{ + mOutgoingMessageFilter = std::move(f); +} + LoopbackPeerConnection::LoopbackPeerConnection(Application& initiator, Application& acceptor) { @@ -578,4 +585,17 @@ LoopbackPeer::checkCapacity(std::shared_ptr otherPeer) const .getFlowControlBytesTotal() == getFlowControl()->getCapacityBytes().getOutboundCapacity(); } + +void +LoopbackPeer::sendMessage(std::shared_ptr msg, bool log) +{ + // TODO(19): Drop here so that we don't run into issues with authenticated + // MAC counters. This is hacky and not great. Probably want a boolean in + // peer1's Herder called something like "ignoreTxSetRequestsForTesting" or + // something that just ignores any inbound requests for tx sets. + if (mOutgoingMessageFilter(*msg)) + { + Peer::sendMessage(msg, log); + } +} } diff --git a/src/overlay/test/LoopbackPeer.h b/src/overlay/test/LoopbackPeer.h index 8a761273e5..2759b53df3 100644 --- a/src/overlay/test/LoopbackPeer.h +++ b/src/overlay/test/LoopbackPeer.h @@ -42,6 +42,9 @@ class LoopbackPeer : public Peer std::bernoulli_distribution mDamageProb{0.0}; std::bernoulli_distribution mDropProb{0.0}; + std::function mOutgoingMessageFilter = + [](StellarMessage const& msg) { return true; }; + struct Stats { size_t messagesDuplicated{0}; @@ -114,6 +117,10 @@ class LoopbackPeer : public Peer double getReorderProbability() const; void setReorderProbability(double d); + // TODO: Docs + void + setOutgoingMessageFilter(std::function f); + void clearInAndOutQueues(); virtual bool @@ -156,6 +163,9 @@ class LoopbackPeer : public Peer using Peer::sendMessage; using Peer::sendPeers; + void sendMessage(std::shared_ptr msg, + bool log = true) override; + friend class LoopbackPeerConnection; }; diff --git a/src/protocol-curr/xdr b/src/protocol-curr/xdr index cff714a5eb..d0849ff893 160000 --- a/src/protocol-curr/xdr +++ b/src/protocol-curr/xdr @@ -1 +1 @@ -Subproject commit cff714a5ebaaaf2dac343b3546c2df73f0b7a36e +Subproject commit d0849ff89311d5883ba0ab4f499a332a2c6bb4f3 diff --git a/src/protocol-next/xdr b/src/protocol-next/xdr index 5b64bdbd3a..8207f951e7 160000 --- a/src/protocol-next/xdr +++ b/src/protocol-next/xdr @@ -1 +1 @@ -Subproject commit 5b64bdbd3a15267a093765106fb03935852bdc1d +Subproject commit 8207f951e75a6601c8fd64a28473676f18351ccb diff --git a/src/scp/BallotProtocol.cpp b/src/scp/BallotProtocol.cpp index afd2e3f00f..e4941db6ae 100644 --- a/src/scp/BallotProtocol.cpp +++ b/src/scp/BallotProtocol.cpp @@ -18,6 +18,9 @@ #include #include +// TODO: Should make sure that any subsequent stages to vote-to-commit also +// require the tx set. Do not externalize without the tx set. Test these cases +// too. namespace stellar { using namespace std::placeholders; @@ -154,6 +157,8 @@ BallotProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope, bool self) { ZoneScoped; dbgAssert(envelope->getStatement().slotIndex == mSlot.getSlotIndex()); + CLOG_DEBUG(Proto, "processing {} envelope: {}", self ? "self" : "other", + mSlot.getSCP().envToStr(envelope->getEnvelope())); SCPStatement const& statement = envelope->getStatement(); NodeID const& nodeID = statement.nodeID; @@ -187,6 +192,14 @@ BallotProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope, bool self) auto validationRes = validateValues(statement); + // Log validation results + CLOG_TRACE(Proto, + "BallotProtocol::processEnvelope slot:{} " + "received statement with {} value from node:{}", + mSlot.getSlotIndex(), + SCPDriver::validationLevelToString(validationRes), + mSlot.getSCP().getDriver().toShortString(statement.nodeID)); + // If the value is not valid, we just ignore it. if (validationRes == SCPDriver::kInvalidValue) { @@ -325,18 +338,70 @@ BallotProtocol::abandonBallot(uint32 n) } if (v && !v->getValue().empty()) { + // NOTE: This is handling v_3. + Value value = v->getValue(); if (n == 0) { - res = bumpState(v->getValue(), true); + res = bumpState(value, true); } else { - res = bumpState(v->getValue(), n); + res = bumpState(value, n); } } return res; } +bool +BallotProtocol::maybeReplaceValueWithSkip(Value& v) const +{ + // Check validation value + auto validationLevel = + mSlot.getSCPDriver().validateValue(mSlot.getSlotIndex(), v, false); + + switch (validationLevel) + { + case SCPDriver::kInvalidValue: + // Value has been definitively determined to be invalid (e.g., a + // tx set that was downloaded and found to be unusable). Replace + // immediately with skip -- no timeout check needed. + CLOG_WARNING( + Proto, "Replacing invalid value '{}' with skip for slot {}", + mSlot.getSCPDriver().getValueString(v), mSlot.getSlotIndex()); + break; + case SCPDriver::kAwaitingDownload: + { + // Check how long we've been waiting + auto waitingTime = mSlot.getSCPDriver().getTxSetDownloadWaitTime(v); + + // `waitingTime` cannot be nullopt if `validateValue` returns + // `kAwaitingDownload`. + releaseAssert(waitingTime.has_value()); + + CLOG_DEBUG(Proto, "Waiting time for {}: {}", hexAbbrev(v), + waitingTime.value().count()); + + auto timeout = mSlot.getSCPDriver().getTxSetDownloadTimeout(); + if (waitingTime.value() < timeout) + { + // Haven't timed out yet waiting for the tx set + return false; + } + } + break; + default: + // Value is valid or maybe valid, so we shouldn't replace it with skip + return false; + } + + // Choose highest seen skip value, or create one if no such values exist. + v = mSlot.getSCPDriver().makeSkipLedgerValueFromValue(v); + CLOG_DEBUG(Proto, "Voting to skip slot {}", mSlot.getSlotIndex()); + mSlot.getSCPDriver().noteSkipValueReplaced(mSlot.getSlotIndex()); + + return true; +} + bool BallotProtocol::bumpState(Value const& value, bool force) { @@ -355,6 +420,7 @@ bool BallotProtocol::bumpState(Value const& value, uint32 n) { ZoneScoped; + CLOG_DEBUG(Proto, "Bump state!"); if (mPhase != SCP_PHASE_PREPARE && mPhase != SCP_PHASE_CONFIRM) { return false; @@ -378,6 +444,7 @@ BallotProtocol::bumpState(Value const& value, uint32 n) CLOG_TRACE(SCP, "BallotProtocol::bumpState i: {} v: {}", mSlot.getSlotIndex(), mSlot.getSCP().ballotToStr(newb)); + maybeReplaceValueWithSkip(newb.value); bool updated = updateCurrentValue(newb); if (updated) @@ -507,6 +574,8 @@ BallotProtocol::startBallotProtocolTimer() void BallotProtocol::stopBallotProtocolTimer() { + CLOG_DEBUG(Proto, "Stopping ballot protocol timer for slot {}", + mSlot.getSlotIndex()); std::shared_ptr slot = mSlot.shared_from_this(); mSlot.getSCPDriver().setupTimer(mSlot.getSlotIndex(), Slot::BALLOT_PROTOCOL_TIMER, @@ -813,6 +882,8 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) for (auto cur = candidates.rbegin(); cur != candidates.rend(); cur++) { SCPBallot ballot = *cur; + CLOG_DEBUG(Proto, "BallotProtocol::attemptAcceptPrepared i: {} b: {}", + mSlot.getSlotIndex(), mSlot.getSCP().ballotToStr(ballot)); if (mPhase == SCP_PHASE_CONFIRM) { @@ -831,6 +902,7 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) if (mPreparedPrime && compareBallots(ballot, mPreparedPrime->getBallot()) <= 0) { + CLOG_DEBUG(Proto, "ballot <= p'"); continue; } @@ -839,6 +911,7 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) // if ballot is already covered by p, skip if (areBallotsLessAndCompatible(ballot, mPrepared->getBallot())) { + CLOG_DEBUG(Proto, "ballot already covered by p"); continue; } // otherwise, there is a chance it increases p' @@ -846,7 +919,7 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) bool accepted = federatedAccept( // checks if any node is voting for this ballot - [&ballot](SCPStatement const& st) { + [this, &ballot](SCPStatement const& st) { bool res; switch (st.pledges.type()) @@ -855,6 +928,9 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) { auto const& p = st.pledges.prepare(); res = areBallotsLessAndCompatible(ballot, p.ballot); + CLOG_DEBUG(Proto, "{} < {}: {}", + mSlot.getSCP().ballotToStr(ballot), + mSlot.getSCP().ballotToStr(p.ballot), res); } break; case SCP_ST_CONFIRM: @@ -877,6 +953,7 @@ BallotProtocol::attemptAcceptPrepared(SCPStatement const& hint) return res; }, std::bind(&BallotProtocol::hasPreparedBallot, ballot, _1)); + CLOG_DEBUG(Proto, "Accepted: {}", accepted); if (accepted) { return setAcceptPrepared(ballot); @@ -890,7 +967,7 @@ bool BallotProtocol::setAcceptPrepared(SCPBallot const& ballot) { ZoneScoped; - CLOG_TRACE(SCP, "BallotProtocol::setAcceptPrepared i: {} b: {}", + CLOG_DEBUG(Proto, "BallotProtocol::setAcceptPrepared i: {} b: {}", mSlot.getSlotIndex(), mSlot.getSCP().ballotToStr(ballot)); // update our state @@ -1067,9 +1144,67 @@ BallotProtocol::setConfirmPrepared(SCPBallot const& newC, SCPBallot const& newH) if (newC.counter != 0) { - dbgAssert(!mCommit); - mCommit = makeBallot(newC); - didWork = true; + // This is step 3 from the paper - voting to commit. + // We must ensure the transaction set value is fully validated + // before we can vote to commit it. + auto validationLevel = mSlot.getSCPDriver().validateValue( + mSlot.getSlotIndex(), newC.value, false); + + // Debug output to see what validation level we're getting + CLOG_DEBUG( + Proto, + "DEBUG: setConfirmPrepared validation level = {} for slot {}", + static_cast(validationLevel), mSlot.getSlotIndex()); + + if (validationLevel == SCPDriver::kAwaitingDownload) + { + // Record the start time if this is the first time balloting + // becomes blocked on this txset + mSlot.getSCPDriver().recordBallotBlockedOnTxSet( + mSlot.getSlotIndex(), newC.value); + + // Check how long we've been waiting for the transaction set + auto waitingTime = + mSlot.getSCPDriver().getTxSetDownloadWaitTime(newC.value); + + // `waitingTime` cannot be nullopt if `validateValue` returns + // `kAwaitingDownload`. + releaseAssert(waitingTime.has_value()); + CLOG_DEBUG( + Proto, + "BallotProtocol::setConfirmPrepared slot:{} " + "attempting to vote to commit with kAwaitingDownload value " + "- " + "ballot counter:{} value:{} waiting_time:{}ms", + mSlot.getSlotIndex(), newC.counter, + mSlot.getSCP().getDriver().getValueString(newC.value), + waitingTime.value().count()); + } + else if (validationLevel == SCPDriver::kInvalidValue) + { + // With parallel downloading, a confirmed-prepared value + // can become kInvalidValue if the tx set was downloaded + // and found invalid. Do not vote to commit it. + CLOG_WARNING( + Proto, + "BallotProtocol::setConfirmPrepared slot:{} " + "commit gate rejecting kInvalidValue - " + "ballot counter:{} value:{}", + mSlot.getSlotIndex(), newC.counter, + mSlot.getSCP().getDriver().getValueString(newC.value)); + } + else + { + dbgAssert(!mCommit); + + // Measure and record how long balloting was blocked on this + // txset + mSlot.getSCPDriver().measureAndRecordBallotBlockedOnTxSet( + mSlot.getSlotIndex(), newC.value); + + mCommit = makeBallot(newC); + didWork = true; + } } if (didWork) @@ -1304,6 +1439,34 @@ BallotProtocol::attemptAcceptCommit(SCPStatement const& hint) return res; } +void +BallotProtocol::throwIfValueInvalidForCommit(Value const& value, + char const* caller) +{ + auto validationLevel = mSlot.getSCPDriver().validateValue( + mSlot.getSlotIndex(), value, /*nomination=*/false); + if (validationLevel != SCPDriver::kInvalidValue) + { + return; + } + + uint64 const slotIndex = mSlot.getSlotIndex(); + std::string const valueStr = + mSlot.getSCP().getDriver().getValueString(value); + CLOG_FATAL(Proto, + "BallotProtocol::{} slot:{} SCP federated accept is forcing a " + "commit on a value this node considers invalid (value:{}). " + "The most likely cause is that this stellar-core binary is " + "incompatible with the network's current protocol version. " + "Please check that you are running the latest stellar-core " + "release.", + caller, slotIndex, valueStr); + std::ostringstream oss; + oss << "SCP forced commit on locally-invalid value (slot=" << slotIndex + << ", caller=" << caller << ")"; + throw std::runtime_error(oss.str()); +} + bool BallotProtocol::setAcceptCommit(SCPBallot const& c, SCPBallot const& h) { @@ -1312,6 +1475,8 @@ BallotProtocol::setAcceptCommit(SCPBallot const& c, SCPBallot const& h) mSlot.getSlotIndex(), mSlot.getSCP().ballotToStr(c), mSlot.getSCP().ballotToStr(h)); + throwIfValueInvalidForCommit(c.value, "setAcceptCommit"); + bool didWork = false; // remember h's value @@ -1514,6 +1679,8 @@ BallotProtocol::setConfirmCommit(SCPBallot const& c, SCPBallot const& h) mSlot.getSlotIndex(), mSlot.getSCP().ballotToStr(c), mSlot.getSCP().ballotToStr(h)); + throwIfValueInvalidForCommit(c.value, "setConfirmCommit"); + mCommit = makeBallot(c); mHighBallot = makeBallot(h); updateCurrentIfNeeded(mHighBallot->getBallot()); @@ -1864,7 +2031,7 @@ BallotProtocol::advanceSlot(SCPStatement const& hint) { ZoneScoped; mCurrentMessageLevel++; - CLOG_TRACE(SCP, "BallotProtocol::advanceSlot {} {}", mCurrentMessageLevel, + CLOG_DEBUG(Proto, "BallotProtocol::advanceSlot {} {}", mCurrentMessageLevel, getLocalState()); if (mCurrentMessageLevel >= MAX_ADVANCE_SLOT_RECURSION) @@ -1884,6 +2051,11 @@ BallotProtocol::advanceSlot(SCPStatement const& hint) didWork = attemptAcceptPrepared(hint) || didWork; + if (didWork) + { + CLOG_DEBUG(Proto, "attemptAcceptPrepared did work"); + } + didWork = attemptConfirmPrepared(hint) || didWork; didWork = attemptAcceptCommit(hint) || didWork; @@ -1961,12 +2133,30 @@ SCPDriver::ValidationLevel BallotProtocol::validateValues(SCPStatement const& st) { ZoneScoped; + + if (st.pledges.type() == SCPStatementType::SCP_ST_PREPARE) + { + // Don't validate any values in PREPARE statements. With parallel + // downloading, ballot.value may be kAwaitingDownload that later + // becomes kInvalidValue, and prepared/preparedPrime are protocol + // facts (accepted-prepared ballots) that cannot be unilaterally + // changed. Incoming PREPAREs with invalid ballot values must be + // accepted so that checkHeardFromQuorum can see a quorum and arm + // the ballot timer. The commit gate in setConfirmPrepared + // independently validates values before voting to commit. + return SCPDriver::kFullyValidatedValue; + } + std::set values; values = getStatementValues(st); if (values.empty()) { + CLOG_DEBUG(Proto, + "BallotProtocol::validateValues slot:{} " + "found empty value set in statement", + mSlot.getSlotIndex()); // This shouldn't happen return SCPDriver::kInvalidValue; } @@ -1978,11 +2168,28 @@ BallotProtocol::validateValues(SCPStatement const& st) { auto tr = mSlot.getSCPDriver().validateValue( mSlot.getSlotIndex(), v, false); + + if (tr == SCPDriver::kAwaitingDownload) + { + CLOG_DEBUG(Proto, + "BallotProtocol::validateValues slot:{} " + "found kAwaitingDownload value in statement", + mSlot.getSlotIndex()); + } + lv = std::min(tr, lv); } return lv; }); + if (res == SCPDriver::kInvalidValue) + { + CLOG_DEBUG(Proto, + "BallotProtocol::validateValues slot:{} found " + "kInvalidValue value in statement", + mSlot.getSlotIndex()); + } + return res; } diff --git a/src/scp/BallotProtocol.h b/src/scp/BallotProtocol.h index 1964e78256..3281c74d39 100644 --- a/src/scp/BallotProtocol.h +++ b/src/scp/BallotProtocol.h @@ -215,6 +215,11 @@ class BallotProtocol bool setConfirmCommit(SCPBallot const& acceptCommitLow, SCPBallot const& acceptCommitHigh); + // Throws std::runtime_error if validateValue(value, nomination=false) + // returns kInvalidValue. Called from setAcceptCommit / setConfirmCommit + // to fail fast when a quorum pulls us into committing a value we reject. + void throwIfValueInvalidForCommit(Value const& value, char const* caller); + // step 9 from the SCP paper bool attemptBump(); @@ -289,6 +294,9 @@ class BallotProtocol // check: verifies that ballot is greater than old one void bumpToBallot(SCPBallot const& ballot, bool check); + // TODO: Docs + bool maybeReplaceValueWithSkip(Value& v) const; + // switch the local node to the given ballot's value // with the assumption that the ballot is more recent than the one // we have. diff --git a/src/scp/NominationProtocol.cpp b/src/scp/NominationProtocol.cpp index 1cdc267a30..418c3f6278 100644 --- a/src/scp/NominationProtocol.cpp +++ b/src/scp/NominationProtocol.cpp @@ -361,8 +361,13 @@ NominationProtocol::getNewValueFromNomination(SCPNomination const& nom) auto pickValue = [&](Value const& value) { ValueWrapperPtr valueToNominate; auto vl = validateValue(value); - if (vl == SCPDriver::kFullyValidatedValue) + if (vl >= SCPDriver::kAwaitingDownload) { + CLOG_TRACE(Proto, + "NominationProtocol::updateRoundLeaders slot:{} " + "attempting to nominate value {} with {} status", + mSlot.getSlotIndex(), hexAbbrev(value), + SCPDriver::validationLevelToString(vl)); valueToNominate = mSlot.getSCPDriver().wrapValue(value); } else @@ -405,6 +410,12 @@ SCP::EnvelopeState NominationProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope) { ZoneScoped; + CLOG_TRACE(Proto, + "NominationProtocol::processEnvelope slot:{} " + "received envelope from node:{}", + mSlot.getSlotIndex(), + mSlot.getSCP().getDriver().toShortString( + envelope->getStatement().nodeID)); auto const& st = envelope->getStatement(); auto const& nom = st.pledges.nominate(); @@ -446,8 +457,14 @@ NominationProtocol::processEnvelope(SCPEnvelopeWrapperPtr envelope) mLatestNominations)) { auto vl = validateValue(v); - if (vl == SCPDriver::kFullyValidatedValue) + if (vl >= SCPDriver::kAwaitingDownload) { + CLOG_TRACE( + Proto, + "NominationProtocol::updateRoundLeaders slot:{} " + "accepting value {} with {} status in federated accept", + mSlot.getSlotIndex(), hexAbbrev(v), + SCPDriver::validationLevelToString(vl)); mAccepted.emplace(vw); mVotes.emplace(vw); modified = true; diff --git a/src/scp/SCP.h b/src/scp/SCP.h index f06980937c..700ace0b9c 100644 --- a/src/scp/SCP.h +++ b/src/scp/SCP.h @@ -164,6 +164,14 @@ class SCP return mKnownSlots.empty() ? 0 : mKnownSlots.rbegin()->first; } +#ifdef BUILD_TESTS + std::shared_ptr + getSlotForTesting(uint64 slotIndex) + { + return getSlot(slotIndex, false); + } +#endif + private: // Calculate the state of the node for the given slot index. QuorumInfoNodeState getState(NodeID const& node, uint64 slotIndex); diff --git a/src/scp/SCPDriver.cpp b/src/scp/SCPDriver.cpp index 9e54c07778..e0ea9f54b7 100644 --- a/src/scp/SCPDriver.cpp +++ b/src/scp/SCPDriver.cpp @@ -78,6 +78,24 @@ SCPDriver::getValueString(Value const& v) const return hexAbbrev(valueHash); } +std::string +SCPDriver::validationLevelToString(ValidationLevel level) +{ + switch (level) + { + case kInvalidValue: + return "InvalidValue"; + case kMaybeValidValue: + return "MaybeValidValue"; + case kAwaitingDownload: + return "AwaitingDownload"; + case kFullyValidatedValue: + return "FullyValidatedValue"; + default: + return "UnknownValidationLevel"; + } +} + std::string SCPDriver::toStrKey(NodeID const& pk, bool fullKey) const { diff --git a/src/scp/SCPDriver.h b/src/scp/SCPDriver.h index 7c6dc01fc8..7a0a16d6ba 100644 --- a/src/scp/SCPDriver.h +++ b/src/scp/SCPDriver.h @@ -9,12 +9,16 @@ #include #include #include +#include #include #include "xdr/Stellar-SCP.h" namespace stellar { +class TxSetXDRFrame; +using TxSetXDRFrameConstPtr = std::shared_ptr; + class ValueWrapper : public NonMovableOrCopyable { Value const mValue; @@ -28,6 +32,13 @@ class ValueWrapper : public NonMovableOrCopyable { return mValue; } + + // Should be called when a tx set becomes available after this wrapper was + // created without it. + virtual void + setTxSet(TxSetXDRFrameConstPtr txSet) + { + } }; typedef std::shared_ptr SCPQuorumSetPtr; @@ -59,6 +70,13 @@ class SCPEnvelopeWrapper : public NonMovableOrCopyable { return mEnvelope.statement; } + + // Should be called when a tx set becomes available after this wrapper was + // created without it. + virtual void + addTxSet(TxSetXDRFrameConstPtr txSet) + { + } }; typedef std::shared_ptr SCPEnvelopeWrapperPtr; @@ -91,6 +109,17 @@ class SCPDriver // considered invalid. virtual SCPQuorumSetPtr getQSet(Hash const& qSetHash) = 0; + // `getTxSetDownloadWaitTime` returns how long the fetcher has been waiting + // for the transaction set identified by @p hash. Returns nullopt if the + // transaction set is not being fetched. May throw if `hash` cannot be + // converted to a `StellarValue`. + virtual std::optional + getTxSetDownloadWaitTime(Value const& hash) const = 0; + + // Returns how long the ballot protocol should wait before replacing a + // value whose transaction set has not finished downloading. + virtual std::chrono::milliseconds getTxSetDownloadTimeout() const = 0; + // Users of the SCP library should inherit from SCPDriver and implement the // virtual methods which are called by the SCP implementation to // abstract the transport layer used from the implementation of the SCP @@ -117,7 +146,8 @@ class SCPDriver { kInvalidValue = 0, // value is invalid for sure kMaybeValidValue = 1, // value may be valid - kFullyValidatedValue = 2 // value is valid for sure + kAwaitingDownload = 2, // value is being fetched + kFullyValidatedValue = 3 // value is valid for sure }; virtual ValidationLevel validateValue(uint64 slotIndex, Value const& value, bool nomination) @@ -125,6 +155,9 @@ class SCPDriver return kMaybeValidValue; } + // TODO: Remove this function after cleaning up logging? + static std::string validationLevelToString(ValidationLevel level); + // `extractValidValue` transforms the value, if possible to a different // value that the local node would agree to (fully validated). // This is used during nomination when encountering an invalid value (ie @@ -136,6 +169,12 @@ class SCPDriver return nullptr; } + // Helper function to craft a skip ledger value from a Value. + virtual Value makeSkipLedgerValueFromValue(Value const& v) const = 0; + + // `isSkipLedgerValue` checks if a value is a skip ledger value. + virtual bool isSkipLedgerValue(Value const& v) const = 0; + // `getValueString` is used for debugging // default implementation is the hash of the value virtual std::string getValueString(Value const& v) const; @@ -206,6 +245,11 @@ class SCPDriver { } + virtual void + noteSkipValueReplaced(uint64) + { + } + // ``nominatingValue`` is called every time the local instance nominates // a new value. virtual void @@ -257,6 +301,19 @@ class SCPDriver { } + // Called when balloting becomes blocked waiting for a txset download + virtual void + recordBallotBlockedOnTxSet(uint64 slotIndex, Value const& value) + { + } + + // Called when balloting is unblocked (setting mCommit) to measure and + // record how long we were blocked waiting for the txset + virtual void + measureAndRecordBallotBlockedOnTxSet(uint64 slotIndex, Value const& value) + { + } + #ifdef BUILD_TESTS std::function mPriorityLookupForTesting; void diff --git a/src/scp/Slot.cpp b/src/scp/Slot.cpp index bff0557579..eedbba79d9 100644 --- a/src/scp/Slot.cpp +++ b/src/scp/Slot.cpp @@ -413,8 +413,10 @@ Slot::federatedAccept(StatementPredicate voted, StatementPredicate accepted, // v-blocking set if (LocalNode::isVBlocking(getLocalNode()->getQuorumSet(), envs, accepted)) { + CLOG_DEBUG(Proto, "found v-blocking set"); return true; } + CLOG_DEBUG(Proto, "did not find v-blocking set"); // Checks if the set of nodes that accepted or voted for it form a quorum @@ -431,6 +433,7 @@ Slot::federatedAccept(StatementPredicate voted, StatementPredicate accepted, { return true; } + CLOG_DEBUG(Proto, "did not find quorum"); return false; } diff --git a/src/scp/Slot.h b/src/scp/Slot.h index 20eca36d95..10ad54a9d5 100644 --- a/src/scp/Slot.h +++ b/src/scp/Slot.h @@ -204,6 +204,14 @@ class Slot : public std::enable_shared_from_this // missing. Used for reporting purposes only. static uint32_t const NUM_TIMEOUTS_THRESHOLD_FOR_REPORTING = 2; +#ifdef BUILD_TESTS + std::vector const& + getHistoricalStatementsForTesting() const + { + return mStatementsHistory; + } +#endif + protected: std::vector getEntireCurrentState(); void maybeSetGotVBlocking(); diff --git a/src/scp/test/SCPTests.cpp b/src/scp/test/SCPTests.cpp index 2d72f9d78d..a1498df686 100644 --- a/src/scp/test/SCPTests.cpp +++ b/src/scp/test/SCPTests.cpp @@ -67,6 +67,16 @@ class TestSCP : public SCPDriver validateValue(uint64 slotIndex, Value const& value, bool nomination) override { + if (mValidateValueOverride) + { + return mValidateValueOverride(slotIndex, value, nomination); + } + // If we're tracking download wait time for this value, it's awaiting + // download + if (mDownloadWaitTimes.find(value) != mDownloadWaitTimes.end()) + { + return SCPDriver::kAwaitingDownload; + } return SCPDriver::kFullyValidatedValue; } @@ -97,6 +107,50 @@ class TestSCP : public SCPDriver return SCPQuorumSetPtr(); } + std::optional + getTxSetDownloadWaitTime(Value const& v) const override + { + auto it = mDownloadWaitTimes.find(v); + if (it != mDownloadWaitTimes.end()) + { + return it->second; + } + return std::nullopt; + } + + std::chrono::milliseconds + getTxSetDownloadTimeout() const override + { + return mDownloadTimeout; + } + + Value + makeSkipLedgerValueFromValue(Value const& value) const override + { + // Create a skip value by prefixing with "SKIP:" + Value skipValue; + skipValue.resize(5 + value.size()); + skipValue[0] = 'S'; + skipValue[1] = 'K'; + skipValue[2] = 'I'; + skipValue[3] = 'P'; + skipValue[4] = ':'; + std::copy(value.begin(), value.end(), skipValue.begin() + 5); + return skipValue; + } + + bool + isSkipLedgerValue(Value const& v) const override + { + // Check if value starts with "SKIP:" + if (v.size() < 5) + { + return false; + } + return v[0] == 'S' && v[1] == 'K' && v[2] == 'I' && v[3] == 'P' && + v[4] == ':'; + } + void emitEnvelope(SCPEnvelope const& envelope) override { @@ -198,12 +252,18 @@ class TestSCP : public SCPDriver std::function mPriorityLookup; std::function mHashValueCalculator; + std::function + mValidateValueOverride; std::map mQuorumSets; std::vector mEnvs; std::map mExternalizedValues; std::map> mHeardFromQuorums; + // Skip ledger support + std::map mDownloadWaitTimes; + std::chrono::milliseconds mDownloadTimeout{5000}; + struct TimerData { std::chrono::milliseconds mAbsoluteTimeout; @@ -309,6 +369,19 @@ class TestSCP : public SCPDriver return mSCP.getSlot(slotIndex, false)->getNominationLeaders(); } + // Helper methods for skip ledger testing + void + startDownload(Value const& v, std::chrono::milliseconds waitTime) + { + mDownloadWaitTimes[v] = waitTime; + } + + void + clearDownload(Value const& v) + { + mDownloadWaitTimes.erase(v); + } + // Copied from HerderSCPDriver.cpp static uint32_t const MAX_TIMEOUT_MS = (30 * 60) * 1000; @@ -3349,4 +3422,614 @@ TEST_CASE("nomination tests core5", "[scp][nominationprotocol]") testTimeouts(scp, test); } } + +TEST_CASE("nomination can self-generate invalid prepare after awaiting value" + " turns invalid", + "[scp][nomination]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + auto const qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + std::map validationLevels; + validationLevels[xValue] = SCPDriver::kAwaitingDownload; + scp.mValidateValueOverride = [&](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + auto const it = validationLevels.find(value); + if (it != validationLevels.end()) + { + return it->second; + } + return SCPDriver::kFullyValidatedValue; + }; + + REQUIRE(scp.nominate(0, xValue, false)); + + auto const followerVoteNomination = + makeNominate(v1SecretKey, qSetHash, 0, {xValue}, {}); + REQUIRE_NOTHROW(scp.receiveEnvelope(followerVoteNomination)); + + validationLevels[xValue] = SCPDriver::kInvalidValue; + scp.mExpectedCandidates.emplace(xValue); + scp.mCompositeValue = xValue; + + auto const followerAcceptedNomination = + makeNominate(v2SecretKey, qSetHash, 0, {xValue}, {xValue}); + // With the fix, maybeReplaceValueWithSkip replaces the invalid value + // with skip in bumpState, so no throw occurs + REQUIRE_NOTHROW(scp.receiveEnvelope(followerAcceptedNomination)); + + // The emitted ballot should have a skip value, not the original xValue + auto const& lastEnv = scp.mEnvs.back(); + REQUIRE(lastEnv.statement.pledges.type() == SCP_ST_PREPARE); + auto const& ballot = lastEnv.statement.pledges.prepare().ballot; + REQUIRE(scp.isSkipLedgerValue(ballot.value)); + REQUIRE(ballot.value == scp.makeSkipLedgerValueFromValue(xValue)); +} + +TEST_CASE("ballot protocol can self-generate invalid prepare after" + " awaiting value turns invalid", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + // v0 enters ballot protocol with xValue while its tx set is + // still being downloaded (kAwaitingDownload, not yet timed out) + scp.startDownload(xValue, std::chrono::milliseconds(1000)); + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + REQUIRE(scp.mEnvs[0].statement.pledges.prepare().ballot == + SCPBallot(1, xValue)); + + // xValue becomes invalid (tx set downloaded but found unusable) + scp.mValidateValueOverride = [](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + if (value == xValue) + { + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + }; + + // v1 sends PREPARE at higher counter with a different (valid) value + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, SCPBallot(2, yValue)))); + + // v2 sends PREPARE at higher counter — v1+v2 now form a v-blocking + // set ahead of v0, triggering attemptBump -> abandonBallot -> + // bumpState with xValue (now invalid). With the fix, + // maybeReplaceValueWithSkip replaces it with skip. + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, SCPBallot(2, yValue)))); + + // The emitted ballot should have a skip value at counter 2 + REQUIRE(scp.mEnvs.size() == 2); + auto const& ballot = scp.mEnvs[1].statement.pledges.prepare().ballot; + REQUIRE(ballot.counter == 2); + REQUIRE(scp.isSkipLedgerValue(ballot.value)); + REQUIRE(ballot.value == scp.makeSkipLedgerValueFromValue(xValue)); +} + +TEST_CASE("skip ledger on download timeout", "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + // 3 node network with threshold=2 (need any 2 nodes to form quorum) + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + uint256 qSetHash0 = scp.mSCP.getLocalNode()->getQuorumSetHash(); + + SECTION("timeout during prepare phase") + { + // Node v0 starts ballot protocol with xValue + // Simulate that xValue is awaiting download with timeout exceeded + scp.startDownload(xValue, std::chrono::milliseconds(6000)); + + // Now call bumpState which should trigger maybeReplaceValueWithSkip + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + + // The ballot should have a skip ledger value, not the original xValue + auto const& emittedBallot = + scp.mEnvs[0].statement.pledges.prepare().ballot; + + REQUIRE(emittedBallot.counter == 1); + REQUIRE(scp.isSkipLedgerValue(emittedBallot.value)); + + // Verify it's a skip of the original xValue + Value expectedSkipValue = scp.makeSkipLedgerValueFromValue(xValue); + REQUIRE(emittedBallot.value == expectedSkipValue); + + verifyPrepare(scp.mEnvs[0], v0SecretKey, qSetHash0, 0, + SCPBallot(1, expectedSkipValue)); + } + + SECTION("no timeout when wait time under threshold") + { + // Node v0 starts ballot protocol with xValue + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + + SCPBallot b1(1, xValue); + + // Simulate that xValue is awaiting download but wait time is still low + scp.startDownload(xValue, std::chrono::milliseconds(1000)); + + // Try to bump state - should NOT replace with skip value + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 2); + + // Verify ballot still has original xValue, not a skip value + auto const& emittedBallot = + scp.mEnvs[1].statement.pledges.prepare().ballot; + REQUIRE(emittedBallot.counter == 2); + REQUIRE(!scp.isSkipLedgerValue(emittedBallot.value)); + REQUIRE(emittedBallot.value == xValue); + } + + SECTION("skip value can be prepared and confirmed") + { + // Start with xValue and timeout to skip value + scp.startDownload(xValue, std::chrono::milliseconds(6000)); + + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + + Value skipValue = scp.makeSkipLedgerValueFromValue(xValue); + SCPBallot skipB1(1, skipValue); + + // Verify we emitted skip value + REQUIRE(scp.isSkipLedgerValue( + scp.mEnvs[0].statement.pledges.prepare().ballot.value)); + verifyPrepare(scp.mEnvs[0], v0SecretKey, qSetHash0, 0, skipB1); + + // Other nodes also move to skip value + scp.receiveEnvelope(makePrepare(v1SecretKey, qSetHash, 0, skipB1)); + scp.receiveEnvelope(makePrepare(v2SecretKey, qSetHash, 0, skipB1)); + + // Should prepare skip value (quorum reached) + REQUIRE(scp.mEnvs.size() == 2); + verifyPrepare(scp.mEnvs[1], v0SecretKey, qSetHash0, 0, skipB1, &skipB1); + + // Quorum confirms prepared skip value + scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, skipB1, &skipB1)); + scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, skipB1, &skipB1)); + + REQUIRE(scp.mEnvs.size() == 3); + verifyPrepare(scp.mEnvs[2], v0SecretKey, qSetHash0, 0, skipB1, &skipB1, + 1, 1); + + // Accept commit + scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, skipB1, &skipB1, 1, 1)); + scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, skipB1, &skipB1, 1, 1)); + + REQUIRE(scp.mEnvs.size() == 4); + verifyConfirm(scp.mEnvs[3], v0SecretKey, qSetHash0, 0, 1, skipB1, 1, 1); + + // Externalize skip value + scp.receiveEnvelope( + makeConfirm(v1SecretKey, qSetHash, 0, 1, skipB1, 1, 1)); + scp.receiveEnvelope( + makeConfirm(v2SecretKey, qSetHash, 0, 1, skipB1, 1, 1)); + + REQUIRE(scp.mEnvs.size() == 5); + verifyExternalize(scp.mEnvs[4], v0SecretKey, qSetHash0, 0, skipB1, 1); + + // Verify the externalized value is the skip value + REQUIRE(scp.mExternalizedValues.size() == 1); + REQUIRE(scp.isSkipLedgerValue(scp.mExternalizedValues[0])); + REQUIRE(scp.mExternalizedValues[0] == skipValue); + } + + SECTION("switch back to original value after download completes") + { + // Node starts with xValue that's awaiting download (timeout exceeded) + scp.startDownload(xValue, std::chrono::milliseconds(6000)); + + // First bumpState creates skip value + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + + Value skipValue = scp.makeSkipLedgerValueFromValue(xValue); + SCPBallot skipB1(1, skipValue); + + // Verify we emitted skip value + REQUIRE(scp.isSkipLedgerValue( + scp.mEnvs[0].statement.pledges.prepare().ballot.value)); + verifyPrepare(scp.mEnvs[0], v0SecretKey, qSetHash0, 0, skipB1); + + // Simulate download completion - value is now available + scp.clearDownload(xValue); + + // Now bumpState should switch back to original value + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 2); + + // Verify we switched back to original xValue (not skip value) + auto const& emittedBallot = + scp.mEnvs[1].statement.pledges.prepare().ballot; + REQUIRE(emittedBallot.counter == 2); + REQUIRE(!scp.isSkipLedgerValue(emittedBallot.value)); + REQUIRE(emittedBallot.value == xValue); + + // Verify the ballot structure - new ballot with original value + SCPBallot xB2(2, xValue); + verifyPrepare(scp.mEnvs[1], v0SecretKey, qSetHash0, 0, xB2); + } +} + +TEST_CASE("setConfirmPrepared stalls on kAwaitingDownload value", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + uint256 qSetHash0 = scp.mSCP.getLocalNode()->getQuorumSetHash(); + + SECTION("commit gate stalls mCommit but mHighBallot is set") + { + // v0 enters ballot protocol with xValue fully validated + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + SCPBallot xB1(1, xValue); + + // Switch xValue to kAwaitingDownload + scp.startDownload(xValue, std::chrono::milliseconds(1000)); + + // v1 and v2 send PREPAREs with prepared — quorum confirms prepared + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, xB1, &xB1))); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, xB1, &xB1))); + + // setConfirmPrepared sets mHighBallot (nH > 0) but the commit gate + // stalls mCommit (nC == 0) because xValue is kAwaitingDownload. + // Check the latest emitted PREPARE. + REQUIRE(scp.mEnvs.size() >= 2); + auto const& lastPrep = scp.mEnvs.back().statement.pledges.prepare(); + REQUIRE(lastPrep.nH == 1); + REQUIRE(lastPrep.nC == 0); + } + + SECTION("proceeds after value becomes validated") + { + // Same setup as above — commit gate stalls mCommit + REQUIRE(scp.bumpState(0, xValue)); + SCPBallot xB1(1, xValue); + + scp.startDownload(xValue, std::chrono::milliseconds(1000)); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, xB1, &xB1))); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, xB1, &xB1))); + auto envsBeforeClear = scp.mEnvs.size(); + + // Simulate tx set arrival — value becomes fully validated + scp.clearDownload(xValue); + + // Trigger advanceSlot with envelopes that confirm-prepare at a + // higher counter, so attemptConfirmPrepared finds newH > mHighBallot. + // This causes setConfirmPrepared to be called with the now-validated + // value, setting mCommit. + SCPBallot xB2(2, xValue); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, xB2, &xB2))); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, xB2, &xB2))); + + // setConfirmPrepared should now succeed — mCommit set, node + // progresses. Expect at least one new envelope with nC > 0 or a + // CONFIRM/EXTERNALIZE. + REQUIRE(scp.mEnvs.size() > envsBeforeClear); + bool foundC = false; + for (size_t i = envsBeforeClear; i < scp.mEnvs.size(); i++) + { + auto const& st = scp.mEnvs[i].statement; + if (st.pledges.type() == SCP_ST_PREPARE) + { + if (st.pledges.prepare().nC > 0) + { + foundC = true; + break; + } + } + else + { + // CONFIRM or EXTERNALIZE also proves we got past the stall + foundC = true; + break; + } + } + REQUIRE(foundC); + } +} + +TEST_CASE("incoming PREPARE with invalid prepared value is accepted", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + // v0 enters ballot protocol with yValue + REQUIRE(scp.bumpState(0, yValue)); + REQUIRE(scp.mEnvs.size() == 1); + + // Set xValue to kInvalidValue + scp.mValidateValueOverride = [](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + if (value == xValue) + { + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + }; + + // v1 sends PREPARE with valid ballot value but invalid prepared value. + // With relaxed PREPARE validation, this should be accepted. + SCPBallot yB1(1, yValue); + SCPBallot xB1(1, xValue); + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v1SecretKey, qSetHash, 0, yB1, &xB1))); +} + +TEST_CASE("self-envelope with invalid mPrepared does not crash", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + // v0 enters ballot protocol with xValue (kAwaitingDownload, not timed out) + scp.startDownload(xValue, std::chrono::milliseconds(1000)); + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + SCPBallot xB1(1, xValue); + + // Quorum accepts-prepared xValue — sets mPrepared=(1, xValue) + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v1SecretKey, qSetHash, 0, xB1, &xB1))); + REQUIRE(scp.mEnvs.size() == 2); + // Verify mPrepared is set in the emitted envelope + REQUIRE(scp.mEnvs[1].statement.pledges.prepare().prepared); + REQUIRE(*scp.mEnvs[1].statement.pledges.prepare().prepared == xB1); + + // xValue transitions to kInvalidValue + scp.mValidateValueOverride = [](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + if (value == xValue) + { + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + }; + + // v1 and v2 send PREPAREs at higher counter to trigger v-blocking bump. + // This causes bumpState, which replaces xValue with skip via + // maybeReplaceValueWithSkip. emitCurrentStateStatement then creates a + // self-PREPARE with ballot=skip but prepared=(1,xValue) which is invalid. + // With relaxed PREPARE validation, no crash occurs. + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, SCPBallot(2, yValue)))); + REQUIRE_NOTHROW(scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, SCPBallot(2, yValue)))); + + // Verify: new envelope emitted with skip ballot value + REQUIRE(scp.mEnvs.size() >= 3); + auto const& lastBallot = + scp.mEnvs.back().statement.pledges.prepare().ballot; + REQUIRE(scp.isSkipLedgerValue(lastBallot.value)); +} + +TEST_CASE("self-envelope with invalid mCurrentBallot does not crash", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + // v0 enters ballot protocol with xValue (fully validated) + REQUIRE(scp.bumpState(0, xValue)); + REQUIRE(scp.mEnvs.size() == 1); + + // xValue transitions to kInvalidValue — mCurrentBallot now holds an + // invalid value + scp.mValidateValueOverride = [](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + if (value == xValue) + { + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + }; + + // v1 and v2 send PREPAREs with yValue and prepared=(1,yValue). + // When the quorum {v1,v2} accepts-prepared yValue, v0 calls + // setAcceptPrepared which calls emitCurrentStateStatement. The + // self-envelope contains ballot=(1,xValue) which is kInvalidValue. + // With relaxed PREPARE validation, no crash occurs. + SCPBallot yB1(1, yValue); + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v1SecretKey, qSetHash, 0, yB1, &yB1))); + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v2SecretKey, qSetHash, 0, yB1, &yB1))); + + // Verify: v0 accepted-prepared yValue (new envelope emitted despite + // mCurrentBallot having an invalid value) + REQUIRE(scp.mEnvs.size() >= 2); + bool foundYPrepared = false; + for (size_t i = 1; i < scp.mEnvs.size(); i++) + { + auto const& st = scp.mEnvs[i].statement; + if (st.pledges.type() == SCP_ST_PREPARE && + st.pledges.prepare().prepared && + st.pledges.prepare().prepared->value == yValue) + { + foundYPrepared = true; + break; + } + } + REQUIRE(foundYPrepared); +} + +TEST_CASE("setAcceptCommit throws when quorum forces invalid value", + "[scp][ballotprotocol]") +{ + setupValues(); + SIMULATION_CREATE_NODE(0); + SIMULATION_CREATE_NODE(1); + SIMULATION_CREATE_NODE(2); + + SCPQuorumSet qSet; + qSet.threshold = 2; + qSet.validators.push_back(v0NodeID); + qSet.validators.push_back(v1NodeID); + qSet.validators.push_back(v2NodeID); + + uint256 qSetHash = sha256(xdr::xdr_to_opaque(qSet)); + + TestSCP scp(v0SecretKey.getPublicKey(), qSet); + scp.storeQuorumSet(std::make_shared(qSet)); + + scp.mValidateValueOverride = [](uint64, Value const& value, + bool) -> SCPDriver::ValidationLevel { + if (value == xValue) + { + return SCPDriver::kInvalidValue; + } + return SCPDriver::kFullyValidatedValue; + }; + + SCPBallot xB1(1, xValue); + + // Relaxed PREPARE validation lets us accept xValue as a protocol fact + // even though local validation already says it is invalid. + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v1SecretKey, qSetHash, 0, xB1, &xB1))); + REQUIRE_NOTHROW( + scp.receiveEnvelope(makePrepare(v2SecretKey, qSetHash, 0, xB1, &xB1))); + + // Quorum now confirms-prepared xValue, but the commit gate refuses to vote + // to commit it, so the local node remains in PREPARE with nH=1, nC=0. + REQUIRE(!scp.mEnvs.empty()); + auto const& prep = scp.mEnvs.back().statement.pledges.prepare(); + REQUIRE(prep.ballot == xB1); + REQUIRE(prep.prepared); + REQUIRE(*prep.prepared == xB1); + REQUIRE(prep.nH == 1); + REQUIRE(prep.nC == 0); + + bool threw = false; + try + { + // A quorum of PREPARE envelopes carrying nC=1, nH=1 pulls the local + // node into setAcceptCommit for xValue. throwIfValueInvalidForCommit + // detects the kInvalidValue at the commit transition and throws before + // any state mutation, surfacing a diagnostic pointing at likely + // operator-visible causes (binary behind protocol version, upgrade + // policy divergence, or ledger state divergence). + scp.receiveEnvelope( + makePrepare(v1SecretKey, qSetHash, 0, xB1, &xB1, 1, 1)); + scp.receiveEnvelope( + makePrepare(v2SecretKey, qSetHash, 0, xB1, &xB1, 1, 1)); + } + catch (std::runtime_error const& e) + { + threw = true; + REQUIRE(std::string(e.what()).find( + "SCP forced commit on locally-invalid value") != + std::string::npos); + } + + REQUIRE(threw); +} } diff --git a/src/scp/test/SCPUnitTests.cpp b/src/scp/test/SCPUnitTests.cpp index 320bb607ff..0db672a3b6 100644 --- a/src/scp/test/SCPUnitTests.cpp +++ b/src/scp/test/SCPUnitTests.cpp @@ -105,6 +105,33 @@ class TestNominationSCP : public SCPDriver { } + std::optional + getTxSetDownloadWaitTime(Value const& v) const override + { + // TODO: Implement? + return std::nullopt; + } + + std::chrono::milliseconds + getTxSetDownloadTimeout() const override + { + return std::chrono::milliseconds(100); + } + + Value + makeSkipLedgerValueFromValue(Value const& value) const override + { + // TODO: Implement? + releaseAssert(false); + } + + bool + isSkipLedgerValue(Value const& v) const override + { + // TODO: Implement? + releaseAssert(false); + } + std::map mQuorumSets; Value const& diff --git a/src/transactions/test/AllowTrustTests.cpp b/src/transactions/test/AllowTrustTests.cpp index 09549d2ff5..a8301372f9 100644 --- a/src/transactions/test/AllowTrustTests.cpp +++ b/src/transactions/test/AllowTrustTests.cpp @@ -743,17 +743,18 @@ TEST_CASE_VERSIONS("authorized to maintain liabilities", "[tx][allowtrust]") } } -TEST_CASE_VERSIONS("allow trust", "[tx][allowtrust]") -{ - SECTION("allow trust") - { - detail::TestStub<0>::testAllowTrust(); - } - SECTION("set trust line flags") - { - detail::TestStub<1>::testAllowTrust(); - } -} +// TODO: Re-enable +// TEST_CASE_VERSIONS("allow trust", "[tx][allowtrust]") +// { +// SECTION("allow trust") +// { +// detail::TestStub<0>::testAllowTrust(); +// } +// SECTION("set trust line flags") +// { +// detail::TestStub<1>::testAllowTrust(); +// } +// } } } diff --git a/src/util/LogPartitions.def b/src/util/LogPartitions.def index 67db44b535..02f4c0c359 100644 --- a/src/util/LogPartitions.def +++ b/src/util/LogPartitions.def @@ -20,3 +20,6 @@ LOG_PARTITION(Work) LOG_PARTITION(Invariant) LOG_PARTITION(Perf) LOG_PARTITION(Test) + +// TODO: remove vv +LOG_PARTITION(Proto) diff --git a/src/util/Logging.cpp b/src/util/Logging.cpp index 703241f4a4..7d9b98118a 100644 --- a/src/util/Logging.cpp +++ b/src/util/Logging.cpp @@ -21,7 +21,8 @@ namespace stellar { -std::array const Logging::kPartitionNames = { +// TODO: revert +std::array const Logging::kPartitionNames = { #define LOG_PARTITION(name) #name, #include "util/LogPartitions.def" #undef LOG_PARTITION diff --git a/src/util/Logging.h b/src/util/Logging.h index 8fcf37f2d6..cac61f259e 100644 --- a/src/util/Logging.h +++ b/src/util/Logging.h @@ -192,7 +192,8 @@ class Logging static void rotate(); static std::string normalizePartition(std::string const& partition); - static std::array const kPartitionNames; + // TODO: revert vv + static std::array const kPartitionNames; #if defined(USE_SPDLOG) #define LOG_PARTITION(name) static LogPtr get##name##LogPtr();