diff --git a/.ai/plans/gloas-lookup-levels.md b/.ai/plans/gloas-lookup-levels.md new file mode 100644 index 00000000000..34ce1938b3f --- /dev/null +++ b/.ai/plans/gloas-lookup-levels.md @@ -0,0 +1,299 @@ +# Gloas block lookup: three-stream model + +Design for handling block lookups across forks with independent block, data, +and payload streams. Avoids state machine mutation — streams are additive only. + +## Problem + +In Gloas, a block has three independent components: **block**, **data** +(blobs/columns), and **payload** (execution payload envelope). Different +triggers need different subsets: + +- **Unknown root** (attestation reference): only need the block +- **Unknown parent, child is empty**: need block + data +- **Unknown parent, child is full**: need block + data + payload + +A lookup's completeness requirement can increase over time as children arrive. +Mutating a live state machine is error-prone, so the design uses additive-only +independent streams. + +## Design: three independent streams + +Each lookup contains three independent streams with their own lifecycle: + +``` +BLOCK stream: + 1. download block (has peers from lookup creation) + 2. wait for parent ready: + - Pre-Gloas: parent block is in fork choice (block_is_known_to_fork_choice) + - Gloas: parent block is in fork choice AND (parent is empty OR parent envelope imported) + 3. send block for processing + 4. block processing done + +DATA stream (created after block downloaded, if block has blobs): + preconditions: block downloaded + parent processed + starts with: 0 peers (Gloas) or lookup peers (pre-Gloas) + 1. wait for preconditions + peers > 0 + 2. download data + 3. wait for block processing done + 4. import data (send_blobs_for_processing / send_custody_columns_for_processing) + 5. data imported → continue empty children + +PAYLOAD stream (created after block downloaded, if block is full, Gloas only): + preconditions: block downloaded + parent processed + starts with: 0 peers + 1. wait for preconditions + peers > 0 + 2. download payload + 3. wait for block processing done + 4. import payload + 5. payload imported → continue full children +``` + +### Key properties + +- **Additive only**: streams are created, never mutated or removed. +- **0-peers gating**: streams exist proactively but can't download without + peers. Children arriving add peers to the right stream. +- **No upgrade logic**: no levels, no replacement. Just add peers. +- **Independent lifecycles**: each stream has its own download + processing + state. + +## When streams are created + +### Block stream + +Always created on lookup creation. Has the lookup's initial peers. + +### Data stream + +Created after block is downloaded, if `block.num_expected_blobs() > 0`. +Fork-dependent type: +- Deneb/Electra: blob request +- Fulu/Gloas: custody column request + +Initial peers: +- Pre-Gloas forks: same peers as block stream (always need data) +- Gloas: 0 peers (data only needed when a child arrives) + +### Payload stream + +Created after block is downloaded, if block is "full" (Gloas only). +Initial peers: 0 (payload only needed when a full child arrives). + +## What children do + +When child Y arrives with unknown parent X and lookup for X exists: + +1. Determine if parent X is full or empty using `parent_hash` from Y +2. Add Y's peers to X's **data** stream (if data stream exists) +3. If X is full: also add Y's peers to X's **payload** stream + +The child sets `awaiting_parent` with enough info to know what it waits for. + +## AwaitingParent enum + +```rust +enum AwaitingParent { + /// Pre-Gloas: wait for parent lookup to fully complete (block + data) + PreGloas { parent_root: Hash256 }, + /// Post-Gloas: track parent_hash to determine full/empty dependency + PostGloas { + parent_root: Hash256, + parent_hash: ExecutionBlockHash, + }, +} +``` + +For PostGloas children: +- If parent is empty → child waits for parent's **data** import +- If parent is full → child waits for parent's **payload** import + +## Child continuation + +Currently: `continue_child_lookups(block_root)` runs when a lookup completes. + +New model — three separate continuation events: + +- `on_block_processing_result(id, result)`: block processed. If the block + stream completes, advance data/payload streams (they can now download and + process). Does NOT unblock children yet. +- `on_data_processing_result(id, result)`: data imported. Unblock children + that are waiting on data (empty children in PostGloas, all children in + PreGloas). +- `on_payload_processing_result(id, result)`: payload imported. Unblock + children that are waiting on payload (full children in PostGloas). + +For PreGloas: `on_data_processing_result` completes the lookup (no payload). +For Gloas without data/payload streams: `on_block_processing_result` completes. + +## Completion conditions + +A lookup is complete when ALL active streams have finished processing: + +- If only block stream (Gloas unknown root, no children ever arrive): + complete after block processing +- If block + data (pre-Gloas, or Gloas with empty child): + complete after data import +- If block + data + payload (Gloas with full child): + complete after both data and payload import + +## Processing infrastructure (already exists) + +The codebase already has the message types for per-component processing: + +- `BlockProcessType::SingleBlock { id }` → block processing result +- `BlockProcessType::SingleBlob { id }` → data (blob) processing result +- `BlockProcessType::SingleCustodyColumn(id)` → data (column) processing result + +All flow through `SyncMessage::BlockComponentProcessed` and back to +`on_processing_result`. The handler needs to dispatch based on `process_type`. + +Methods already exist (currently dead code): +- `send_blobs_for_processing(id, block_root, blobs, ...)` in network_context.rs +- `send_custody_columns_for_processing(id, block_root, columns, ...)` in network_context.rs + +## Dependency diagram + +``` + block downloaded ─────────────────────────┐ + │ │ + ▼ ▼ + create data stream create payload stream + (if has blobs) (if block is full, Gloas) + │ │ + │ parent processed │ + │◄────────────┤────────────────────► │ + │ │ │ + │ ▼ │ + │ send block for processing │ + │ │ │ + has peers?───────┤ │ has peers?─┤ + │ yes │ │ │ yes │ + ▼ │ ▼ ▼ │ + download data │ block processing done download payload + │ │ │ │ + ▼ │ │ ▼ + wait for block done◄──┘─────────────┘──────►wait for block done + │ │ + ▼ ▼ + import data import payload + │ │ + ▼ ▼ + continue empty children continue full children +``` + +## Pre-Gloas fork behavior + +Minimal changes. Data stream always created with lookup peers (not 0). +No payload stream. AwaitingParent::PreGloas. Lookup completes after data +import (or after block processing if no data needed). + +## Error handling and retry + +Each stream retries independently: +- Block download failure → retry block download +- Data download failure → retry data download +- Payload download failure → retry payload download +- Block processing failure → reset ALL streams, retry from scratch +- Data processing failure → retry data download only +- Payload processing failure → retry payload download only + +## Peer tracking + +- Block stream: uses the lookup's main peer set (Arc>>) +- Data stream: own peer set, starts at 0 for Gloas, shared with block for pre-Gloas +- Payload stream: own peer set, always starts at 0 + +## Interaction with tree sync (future) + +The three-stream model enables tree sync naturally: + +**Phase 1 — Chain discovery:** +- Walk back the chain. Each lookup only runs its block stream. +- Data/payload streams exist (if block has blobs) but have 0 peers → idle. +- Fast and lightweight. + +**Phase 2 — When chain anchors to fork choice:** +- Process blocks in order (parent first via awaiting_parent). +- Children arriving add peers to parent's data/payload streams. +- Streams activate and start downloading. + +**Phase 3 — Processing:** +- Block processed → data/payload download starts (if peers available). +- Data imported → continue empty children. +- Payload imported → continue full children. + +No upgrade logic needed. The 0-peers pattern means data/payload streams +naturally activate when children provide peers, regardless of timing. + +### Depth limit implications + +With block streams being lightweight (no data/payload downloading until peers +arrive), `PARENT_DEPTH_TOLERANCE` can be increased. Only block downloads +contribute to bandwidth during chain discovery. + +## Implementation changes + +### SingleBlockLookup restructure + +```rust +struct SingleBlockLookup { + id: Id, + block_root: Hash256, + // Block stream — always present + block_request: BlockRequestState, + block_processing: ProcessingState, + // Data stream — created after block downloaded if has blobs + data_request: Option>, + data_processing: ProcessingState, + // Payload stream — created after block downloaded if full (Gloas) + payload_request: Option>, + payload_processing: ProcessingState, + // Peer sets + peers: Arc>>, + data_peers: Arc>>, + payload_peers: Arc>>, + // Parent tracking + awaiting_parent: Option, + created: Instant, + span: Span, +} + +enum ProcessingState { + NotSent, + Sent, + Done, +} +``` + +### DataRequestState (fork-dependent) + +```rust +enum DataRequestState { + Blobs(BlobRequestState), + Columns(CustodyRequestState), +} +``` + +### Modified: on_processing_result (mod.rs) + +Dispatch based on `BlockProcessType`: +- `SingleBlock` → update block_processing state, drive data/payload +- `SingleBlob` / `SingleCustodyColumn` → update data_processing, continue + empty children +- (future) payload type → update payload_processing, continue full children + +### Modified: continue_child_lookups + +Split into component-specific continuation: +- `on_data_imported(block_root)` → continue children awaiting data +- `on_payload_imported(block_root)` → continue children awaiting payload + +### Types removed + +- `LookupState` enum (Downloading | Processing) +- `BlockComponentsByRootRequest` +- `DownloadPhase` +- `BlockExtraRequests` +- `BlockExtras` +- `BlockComponentsResult` diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e226c707a4e..656068aac21 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5851,6 +5851,12 @@ impl BeaconChain { .contains_block(root) } + // TODO(gloas): implement this once issue #8956 is resolved + pub fn envelope_is_known_to_fork_choice(&self, root: &Hash256) -> bool { + // for now just check the database + self.store.payload_envelope_exists(root).unwrap_or(false) + } + /// Determines the beacon proposer for the next slot. If that proposer is registered in the /// `execution_layer`, provide the `execution_layer` with the necessary information to produce /// `PayloadAttributes` for future calls to fork choice. diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 486a4438579..4ddd58c19cf 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -23,6 +23,8 @@ pub enum SyncRequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request searching for a payload envelope given a hash. + SinglePayloadEnvelope { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Blocks by range request diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 1f55d9a8789..1f86cde6986 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -3486,6 +3486,13 @@ impl NetworkBeaconProcessor { "Processing execution payload bid" ); + // Keep bids ignored for now. + // + // Note: bids are weak signals for lookup-sync purposes compared with payload envelopes and + // payload attestations, which directly carry a beacon block root that can be used as an + // unknown-block trigger. We intentionally avoid adding lookup triggers from bids to reduce + // noise until bid verification/integration is fully designed. + // For now, ignore all payload bids since verification is not implemented self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } @@ -3507,6 +3514,13 @@ impl NetworkBeaconProcessor { "Processing payload attestation message" ); + // Trigger lookup sync by beacon block root. Treat payload attestations as unknown block + // root signals (same as attestation-style lookup trigger). + self.send_sync_message(SyncMessage::UnknownBlockHashFromAttestation( + peer_id, + payload_attestation_message.data.beacon_block_root, + )); + // For now, ignore all payload attestation messages since verification is not implemented self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Ignore); } diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index e6982e6a847..ede43e677c7 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -24,7 +24,10 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, trace, warn}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock}; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, +}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router { @@ -327,10 +330,13 @@ impl Router { Response::DataColumnsByRange(data_column) => { self.on_data_columns_by_range_response(peer_id, app_request_id, data_column); } - // TODO(EIP-7732): implement outgoing payload envelopes by range and root - // responses once sync manager requests them. - Response::PayloadEnvelopesByRoot(_) | Response::PayloadEnvelopesByRange(_) => { - debug!("Requesting envelopes by root and by range not supported yet"); + Response::PayloadEnvelopesByRoot(envelope) => { + self.on_payload_envelopes_by_root_response(peer_id, app_request_id, envelope); + } + // TODO(EIP-7732): implement outgoing payload envelopes by range responses + // once sync manager requests them. + Response::PayloadEnvelopesByRange(_) => { + debug!("Requesting envelopes by range not supported yet"); } // Light client responses should not be received Response::LightClientBootstrap(_) @@ -794,6 +800,40 @@ impl Router { } } + /// Handle a `PayloadEnvelopesByRoot` response from the peer. + pub fn on_payload_envelopes_by_root_response( + &mut self, + peer_id: PeerId, + app_request_id: AppRequestId, + envelope: Option>>, + ) { + let sync_request_id = match app_request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::SinglePayloadEnvelope { .. } => id, + other => { + crit!(request = ?other, "PayloadEnvelopesByRoot response on incorrect request"); + return; + } + }, + AppRequestId::Router => { + crit!(%peer_id, "All PayloadEnvelopesByRoot requests belong to sync"); + return; + } + AppRequestId::Internal => unreachable!("Handled internally"), + }; + + trace!( + %peer_id, + "Received PayloadEnvelopesByRoot Response" + ); + self.send_to_sync(SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp: timestamp_now(), + }); + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs deleted file mode 100644 index edd99345b43..00000000000 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ /dev/null @@ -1,217 +0,0 @@ -use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, -}; -use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, -}; -use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; -use beacon_chain::BeaconChainTypes; -use lighthouse_network::service::api_types::Id; -use parking_lot::RwLock; -use std::collections::HashSet; -use std::sync::Arc; -use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; - -use super::SingleLookupId; -use super::single_block_lookup::{ComponentRequests, DownloadResult}; - -#[derive(Debug, Copy, Clone)] -pub enum ResponseType { - Block, - Blob, - CustodyColumn, -} - -/// This trait unifies common single block lookup functionality across blocks and blobs. This -/// includes making requests, verifying responses, and handling processing results. A -/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is -/// implemented for each. -/// -/// The use of the `ResponseType` associated type gives us a degree of type -/// safety when handling a block/blob response ensuring we only mutate the correct corresponding -/// state. -pub trait RequestState { - /// The type created after validation. - type VerifiedResponseType: Clone; - - /// Request the network context to prepare a request of a component of `block_root`. If the - /// request is not necessary because the component is already known / processed, return false. - /// Return true if it sent a request and we can expect an event back from the network. - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result; - - /* Response handling methods */ - - /// Send the response to the beacon processor. - fn send_for_processing( - id: Id, - result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; - - /* Utility methods */ - - /// Returns the `ResponseType` associated with this trait implementation. Useful in logging. - fn response_type() -> ResponseType; - - /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str>; - - /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. - fn get_state(&self) -> &SingleLookupRequestState; - - /// A getter for a mutable reference to the SingleLookupRequestState associated with this trait. - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; -} - -impl RequestState for BlockRequestState { - type VerifiedResponseType = Arc>; - - fn make_request( - &self, - id: SingleLookupId, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.block_lookup_request(id, lookup_peers, self.requested_block_root) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: SingleLookupId, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_block_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Block - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - Ok(&mut request.block_request_state) - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for BlobRequestState { - type VerifiedResponseType = FixedBlobSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Blob - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), - ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for CustodyRequestState { - type VerifiedResponseType = DataColumnSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_custody_columns_for_processing( - id, - block_root, - value, - seen_timestamp, - BlockProcessType::SingleCustodyColumn(id), - ) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::CustodyColumn - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest { .. } => Err("expecting blob request"), - ComponentRequests::ActiveCustodyRequest(request) => Ok(request), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 394f2fc37d5..ff2194cb503 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -22,32 +22,33 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; -use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; +use self::single_block_lookup::{ + AwaitingParent, LookupRequestError, LookupResult, PeerType, SingleBlockLookup, +}; use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::SyncMessage; -use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::{ AvailabilityCheckError, AvailabilityCheckErrorCategory, }; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; -pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::service::api_types::SingleLookupReqId; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; use store::Hash256; use tracing::{debug, error, warn}; -use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; +use types::data::FixedBlobSidecarList; +use types::{ + BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock, SignedExecutionPayloadEnvelope, +}; -pub mod common; pub mod parent_chain; mod single_block_lookup; @@ -77,6 +78,15 @@ const LOOKUP_MAX_DURATION_NO_PEERS_SECS: u64 = 10; /// take at most 2 GB. 200 lookups allow 3 parallel chains of depth 64 (current maximum). const MAX_LOOKUPS: usize = 200; +type BlockDownloadResponse = + Result<(Arc>, PeerGroup, Duration), RpcResponseError>; +type BlobDownloadResponse = + Result<(FixedBlobSidecarList, PeerGroup, Duration), RpcResponseError>; +type CustodyDownloadResponse = + Result<(types::DataColumnSidecarList, PeerGroup, Duration), RpcResponseError>; +type PayloadDownloadResponse = + Result<(Arc>, PeerGroup, Duration), RpcResponseError>; + pub enum BlockComponent { Block(DownloadResult>>), Blob(DownloadResult>>), @@ -106,13 +116,6 @@ impl BlockComponent { pub type SingleLookupId = u32; -enum Action { - Retry, - ParentUnknown { parent_root: Hash256 }, - Drop(/* reason: */ String), - Continue, -} - pub struct BlockLookups { /// A cache of block roots that must be ignored for some time to prevent useless searches. For /// example if a chain is too long, its lookup chain is dropped, and range sync is expected to @@ -205,8 +208,11 @@ impl BlockLookups { ) -> bool { let parent_root = block_component.parent_root(); + // We don't know the child's fork yet (no block downloaded), use PreGloas conservatively. + // The correct AwaitingParent will be set when the child's block downloads. + let awaiting = AwaitingParent::PreGloas(parent_root); let parent_lookup_exists = - self.search_parent_of_child(parent_root, block_root, &[peer_id], cx); + self.search_parent_of_child(awaiting, block_root, &[peer_id], cx); // Only create the child lookup if the parent exists if parent_lookup_exists { // `search_parent_of_child` ensures that the parent lookup exists so we can safely wait for it @@ -218,6 +224,10 @@ impl BlockLookups { // to have the rest of the block components (refer to decoupled blob gossip). Create // the lookup with zero peers to house the block components. &[], + &PeerType { + data: false, + payload: false, + }, cx, ) } else { @@ -225,7 +235,7 @@ impl BlockLookups { } } - /// Seach a block whose parent root is unknown. + /// Search a block whose parent root is unknown. /// /// Returns true if the lookup is created or already exists #[must_use = "only reference the new lookup if returns true"] @@ -235,7 +245,41 @@ impl BlockLookups { peer_source: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { - self.new_current_lookup(block_root, None, None, peer_source, cx) + self.new_current_lookup( + block_root, + None, + None, + peer_source, + &PeerType { + data: false, + payload: false, + }, + cx, + ) + } + + /// Search for a block triggered by a Gloas data column. The peer that sent the data column + /// is a valid data source, so mark it as data-capable. + /// + /// Returns true if the lookup is created or already exists + #[must_use = "only reference the new lookup if returns true"] + pub fn search_unknown_block_with_data_peer( + &mut self, + block_root: Hash256, + peer_source: &[PeerId], + cx: &mut SyncNetworkContext, + ) -> bool { + self.new_current_lookup( + block_root, + None, + None, + peer_source, + &PeerType { + data: true, + payload: false, + }, + cx, + ) } /// A block or blob triggers the search of a parent. @@ -247,11 +291,19 @@ impl BlockLookups { #[must_use = "only reference the new lookup if returns true"] pub fn search_parent_of_child( &mut self, - block_root_to_search: Hash256, + awaiting_parent: AwaitingParent, child_block_root_trigger: Hash256, peers: &[PeerId], cx: &mut SyncNetworkContext, ) -> bool { + let block_root_to_search = awaiting_parent.parent_root(); + + // The zero hash is the parent root of the genesis block, not a real block. + if block_root_to_search == Hash256::ZERO { + debug!("Not searching for zero hash (parent of genesis)"); + return false; + } + let parent_chains = self.active_parent_lookups(); for (chain_idx, parent_chain) in parent_chains.iter().enumerate() { @@ -339,8 +391,30 @@ impl BlockLookups { } } + // Child's peers can serve block, and data + payload if the parent is full. + // In Gloas, data and payload are coupled: empty blocks have neither. + // Pre-Gloas: data is always needed with block, payload is never needed. + let peer_type = match &awaiting_parent { + AwaitingParent::PreGloas(_) => PeerType { + data: true, + payload: false, + }, + AwaitingParent::PostGloas(..) => { + let is_full = self + .single_block_lookups + .values() + .find(|l| l.is_for_block(block_root_to_search)) + .map(|parent| parent.is_full_payload(&awaiting_parent)) + .unwrap_or(false); + PeerType { + data: is_full, + payload: is_full, + } + } + }; + // `block_root_to_search` is a failed chain check happens inside new_current_lookup - self.new_current_lookup(block_root_to_search, None, None, peers, cx) + self.new_current_lookup(block_root_to_search, None, None, peers, &peer_type, cx) } /// Searches for a single block hash. If the blocks parent is unknown, a chain of blocks is @@ -353,6 +427,7 @@ impl BlockLookups { block_component: Option>, awaiting_parent: Option, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> bool { // If this block or it's parent is part of a known ignored chain, ignore it. @@ -378,7 +453,8 @@ impl BlockLookups { } } - if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, cx) { + if let Err(e) = self.add_peers_to_lookup_and_ancestors(lookup_id, peers, peer_type, cx) + { warn!(error = ?e, "Error adding peers to ancestor lookup"); } @@ -405,7 +481,13 @@ impl BlockLookups { // If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve), // signal here to hold processing downloaded data. - let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent); + let mut lookup = SingleBlockLookup::new( + block_root, + peers, + peer_type, + cx.next_id(), + awaiting_parent.map(AwaitingParent::PreGloas), + ); let _guard = lookup.span.clone().entered(); // Add block components to the new request @@ -446,88 +528,78 @@ impl BlockLookups { /* Lookup responses */ - /// Process a block or blob response received from a single lookup request. - pub fn on_download_response>( + /// Process a block response received from a single lookup request. + pub fn on_block_download_response( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: BlockDownloadResponse, cx: &mut SyncNetworkContext, ) { - let result = self.on_download_response_inner::(id, response, cx); - self.on_lookup_result(id.lookup_id, result, "download_response", cx); + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!(?id, "Block returned for single block lookup not present"); + return; + }; + let block_root = lookup.block_root(); + debug!( + ?block_root, + ?id, + is_ok = response.is_ok(), + "Block download response" + ); + + let result = lookup.on_block_download_response(id.req_id, response.map_err(|_| ()), cx); + self.on_lookup_result(id.lookup_id, result, "block_download_response", cx); } - /// Process a block or blob response received from a single lookup request. - pub fn on_download_response_inner>( + pub fn on_blob_download_response( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: BlobDownloadResponse, cx: &mut SyncNetworkContext, - ) -> Result { - // Note: no need to downscore peers here, already downscored on network context - - let response_type = R::response_type(); + ) { let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { - // We don't have the ability to cancel in-flight RPC requests. So this can happen - // if we started this RPC request, and later saw the block/blobs via gossip. - debug!(?id, "Block returned for single block lookup not present"); - return Err(LookupRequestError::UnknownLookup); + debug!(?id, "Blob returned for single block lookup not present"); + return; }; + debug!(block_root = ?lookup.block_root(), ?id, is_ok = response.is_ok(), "Blob download response"); - let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); + let result = lookup.on_blob_download_response(id.req_id, response.map_err(|_| ()), cx); + self.on_lookup_result(id.lookup_id, result, "blob_download_response", cx); + } - match response { - Ok((response, peer_group, seen_timestamp)) => { - debug!( - ?block_root, - ?id, - ?peer_group, - ?response_type, - "Received lookup download success" - ); + pub fn on_custody_download_response( + &mut self, + id: SingleLookupReqId, + response: CustodyDownloadResponse, + cx: &mut SyncNetworkContext, + ) { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!(?id, "Custody returned for single block lookup not present"); + return; + }; + debug!(block_root = ?lookup.block_root(), ?id, is_ok = response.is_ok(), "Custody download response"); - // Here we could check if response extends a parent chain beyond its max length. - // However we defer that check to the handling of a processing error ParentUnknown. - // - // Here we could check if there's already a lookup for parent_root of `response`. In - // that case we know that sending the response for processing will likely result in - // a `ParentUnknown` error. However, for simplicity we choose to not implement this - // optimization. - - // Register the download peer here. Once we have received some data over the wire we - // attribute it to this peer for scoring latter regardless of how the request was - // done. - request_state.on_download_success( - id.req_id, - DownloadResult { - value: response, - block_root, - seen_timestamp, - peer_group, - }, - )?; - // continue_request will send for processing as the request state is AwaitingProcessing - } - Err(e) => { - // No need to log peer source here. When sending a DataColumnsByRoot request we log - // the peer and the request ID which is linked to this `id` value here. - debug!( - ?block_root, - ?id, - ?response_type, - error = ?e, - "Received lookup download failure" - ); + let result = lookup.on_custody_download_response(id.req_id, response.map_err(|_| ()), cx); + self.on_lookup_result(id.lookup_id, result, "custody_download_response", cx); + } - request_state.on_download_failure(id.req_id)?; - // continue_request will retry a download as the request state is AwaitingDownload - } - } + pub fn on_payload_download_response( + &mut self, + id: SingleLookupReqId, + response: PayloadDownloadResponse, + cx: &mut SyncNetworkContext, + ) { + let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { + debug!( + ?id, + "Payload envelope returned for single block lookup not present" + ); + return; + }; + debug!(block_root = ?lookup.block_root(), ?id, is_ok = response.is_ok(), "Payload download response"); - lookup.continue_requests(cx) + let result = lookup.on_payload_download_response(id.req_id, response.map_err(|_| ()), cx); + self.on_lookup_result(id.lookup_id, result, "payload_download_response", cx); } /* Error responses */ @@ -549,21 +621,22 @@ impl BlockLookups { result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { + let lookup_id = process_type.id(); let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleBlob { id } => { - self.on_processing_result_inner::>(id, result, cx) + BlockProcessType::SingleBlock { .. } => { + self.on_block_processing_result(lookup_id, result, cx) } - BlockProcessType::SingleCustodyColumn(id) => { - self.on_processing_result_inner::>(id, result, cx) + BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { + self.on_data_processing_result(lookup_id, result, cx) } }; - self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); + self.on_lookup_result(lookup_id, lookup_result, "processing_result", cx); } - pub fn on_processing_result_inner>( + /// Handle block processing result. The block is sent for processing alone (without data). + /// On success: marks block processing done and advances data/payload streams. + /// On error: penalizes block peer, resets all streams, retries from scratch. + fn on_block_processing_result( &mut self, lookup_id: SingleLookupId, result: BlockProcessingResult, @@ -575,180 +648,147 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); debug!( - component = ?R::response_type(), ?block_root, id = lookup_id, ?result, - "Received lookup processing result" + "Received block processing result" ); - let action = match result { + match result { + // Block processed successfully (imported or missing components — both are ok since + // we send the block alone first, data follows independently) BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { + .. + }) | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) | BlockProcessingResult::Err(BlockError::GenesisBlock) => { - // Successfully imported - request_state.on_processing_success()?; - Action::Continue - } - - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) => { - // `on_processing_success` is called here to ensure the request state is updated prior to checking - // if both components have been processed. - request_state.on_processing_success()?; - - if lookup.all_components_processed() { - // We don't request for other block components until being sure that the block has - // data. If we request blobs / columns to a peer we are sure those must exist. - // Therefore if all components are processed and we still receive `MissingComponents` - // it indicates an internal bug. - return Err(LookupRequestError::MissingComponentsAfterAllProcessed); - } else { - // Continue request, potentially request blobs - Action::Retry - } - } - BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => { - // This is unreachable because RPC blocks do not undergo gossip verification, and - // this error can *only* come from gossip verification. - error!(?block_root, "Single block lookup hit unreachable condition"); - Action::Drop("DuplicateImportStatusUnknown".to_owned()) + lookup.on_block_processing_result(true, cx) } BlockProcessingResult::Ignored => { - // Beacon processor signalled to ignore the block processing result. - // This implies that the cpu is overloaded. Drop the request. - warn!( - component = ?R::response_type(), - "Lookup component processing ignored, cpu might be overloaded" - ); - Action::Drop("Block processing ignored".to_owned()) + warn!("Block processing ignored, cpu might be overloaded"); + Err(LookupRequestError::Failed( + "Block processing ignored".to_owned(), + )) } BlockProcessingResult::Err(e) => { - match e { - BlockError::BeaconChainError(e) => { - // Internal error - error!(%block_root, error = ?e, "Beacon chain error processing lookup component"); - Action::Drop(format!("{e:?}")) - } - BlockError::ParentUnknown { parent_root, .. } => { - // Reverts the status of this request to `AwaitingProcessing` holding the - // downloaded data. A future call to `continue_requests` will re-submit it - // once there are no pending parent requests. - // Note: `BlockError::ParentUnknown` is only returned when processing - // blocks, not blobs. - request_state.revert_to_awaiting_processing()?; - Action::ParentUnknown { parent_root } - } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - ?block_root, - error = ?e, - "Single block lookup failed. Execution layer is offline / unsynced / misconfigured" - ); - Action::Drop(format!("{e:?}")) + debug!(?block_root, error = ?e, "Block processing error, retrying"); + + match &e { + BlockError::ParentUnknown { .. } => { + return Err(LookupRequestError::InternalError( + "ParentUnknown on processing".to_string(), + )); } + // No penalization for internal / non-attributable errors + BlockError::BeaconChainError(_) + | BlockError::DuplicateImportStatusUnknown(..) => {} + BlockError::ExecutionPayloadError(epe) if !epe.penalize_peer() => {} BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => - { - // There errors indicate internal problems and should not downscore the peer - warn!(?block_root, error = ?e, "Internal availability check failure"); - - // Here we choose *not* to call `on_processing_failure` because this could result in a bad - // lookup state transition. This error invalidates both blob and block requests, and we don't know the - // state of both requests. Blobs may have already successfullly processed for example. - // We opt to drop the lookup instead. - Action::Drop(format!("{e:?}")) - } - other => { - debug!( - ?block_root, - component = ?R::response_type(), - error = ?other, - "Invalid lookup component" - ); - let peer_group = request_state.on_processing_failure()?; - let peers_to_penalize: Vec<_> = match other { - // Note: currenlty only InvalidColumn errors have index granularity, - // but future errors may follow the same pattern. Generalize this - // pattern with https://github.com/sigp/lighthouse/pull/6321 - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn((index_opt, _)), - ) => { - match index_opt { - Some(index) => peer_group.of_index(index as usize).collect(), - // If no index supplied this is an un-attributable fault. In practice - // this should never happen. - None => vec![], - } - } - _ => peer_group.all().collect(), - }; - for peer in peers_to_penalize { + if e.category() == AvailabilityCheckErrorCategory::Internal => {} + // All other attributable errors: penalize the block peer + _ => { + if let Some(block_peer) = lookup.block_peer() { cx.report_peer( - *peer, + block_peer, PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - ResponseType::CustodyColumn => { - "lookup_custody_column_processing_failure" - } - }, + "lookup_block_processing_failure", ); } - - Action::Retry } } + + // Block processing failed — reset everything and retry from scratch + lookup.on_block_processing_result(false, cx) } + } + } + + /// Handle data processing result (blobs or custody columns). + /// On success: marks data processing done, may complete the lookup. + /// On error: penalizes data peers, retries data download only. + fn on_data_processing_result( + &mut self, + lookup_id: SingleLookupId, + result: BlockProcessingResult, + cx: &mut SyncNetworkContext, + ) -> Result { + let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else { + debug!(id = lookup_id, "Unknown single block lookup"); + return Err(LookupRequestError::UnknownLookup); }; - match action { - Action::Retry => { - // Trigger download for all components in case `MissingComponents` failed the blob - // request. Also if blobs are `AwaitingProcessing` and need to be progressed - lookup.continue_requests(cx) + let block_root = lookup.block_root(); + + debug!( + ?block_root, + id = lookup_id, + ?result, + "Received data processing result" + ); + + match result { + BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) + | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) + | BlockProcessingResult::Err(BlockError::GenesisBlock) => { + lookup.on_data_processing_result(true, cx) } - Action::ParentUnknown { parent_root } => { - let peers = lookup.all_peers(); - // Mark lookup as awaiting **before** creating the parent lookup. At this point the - // lookup maybe inconsistent. - lookup.set_awaiting_parent(parent_root); - let parent_lookup_exists = - self.search_parent_of_child(parent_root, block_root, &peers, cx); - if parent_lookup_exists { - // The parent lookup exist or has been created. It's safe for `lookup` to - // reference the parent as awaiting. - debug!( - id = lookup_id, - ?block_root, - ?parent_root, - "Marking lookup as awaiting parent" - ); - Ok(LookupResult::Pending) - } else { - // The parent lookup is faulty and was not created, we must drop the `lookup` as - // it's in an inconsistent state. We must drop all of its children too. - Err(LookupRequestError::Failed(format!( - "Parent lookup is faulty {parent_root:?}" - ))) - } + BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { + .. + }) => { + // Data sent for processing but still missing components — this can happen if + // the block hasn't been fully validated yet. Treat as success for the data + // stream; completion check will handle the rest. + lookup.on_data_processing_result(true, cx) } - Action::Drop(reason) => { - // Drop with noop - Err(LookupRequestError::Failed(reason)) + BlockProcessingResult::Ignored => { + warn!("Data processing ignored, cpu might be overloaded"); + Err(LookupRequestError::Failed( + "Data processing ignored".to_owned(), + )) } - Action::Continue => { - // Drop this completed lookup only - Ok(LookupResult::Completed) + BlockProcessingResult::Err(e) => { + debug!(?block_root, error = ?e, "Data processing error, retrying"); + + match &e { + // No penalization for internal / non-attributable errors + BlockError::BeaconChainError(_) + | BlockError::DuplicateImportStatusUnknown(..) => {} + BlockError::AvailabilityCheck(e) + if e.category() == AvailabilityCheckErrorCategory::Internal => {} + // InvalidColumn: penalize only the peer(s) that served the bad column + BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn(( + index_opt, + _, + ))) => { + if let Some(custody_pg) = lookup.data_peer_group() + && let Some(index) = index_opt + { + for peer in custody_pg.of_index(*index as usize) { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + "lookup_data_processing_failure", + ); + } + } + } + // All other attributable errors: penalize the block peer (who also serves blobs) + _ => { + if let Some(block_peer) = lookup.block_peer() { + cx.report_peer( + block_peer, + PeerAction::MidToleranceError, + "lookup_data_processing_failure", + ); + } + } + } + + // Data processing failed — retry data download only + lookup.on_data_processing_result(false, cx) } } } @@ -771,14 +811,6 @@ impl BlockLookups { let lookup_result = if imported { Ok(LookupResult::Completed) } else { - // A lookup may be in the following state: - // - Block awaiting processing from a different source - // - Blobs downloaded processed, and inserted into the da_checker - // - // At this point the block fails processing (e.g. execution engine offline) and it is - // removed from the da_checker. Note that ALL components are removed from the da_checker - // so when we re-download and process the block we get the error - // MissingComponentsAfterAllProcessed and get stuck. lookup.reset_requests(); lookup.continue_requests(cx) }; @@ -791,7 +823,7 @@ impl BlockLookups { let mut lookup_results = vec![]; // < need to buffer lookup results to not re-borrow &mut self for (id, lookup) in self.single_block_lookups.iter_mut() { - if lookup.awaiting_parent() == Some(block_root) { + if lookup.awaiting_parent().map(|a| a.parent_root()) == Some(block_root) { lookup.resolve_awaiting_parent(); debug!( parent_root = ?block_root, @@ -827,7 +859,10 @@ impl BlockLookups { let child_lookups = self .single_block_lookups .iter() - .filter(|(_, lookup)| lookup.awaiting_parent() == Some(dropped_lookup.block_root())) + .filter(|(_, lookup)| { + lookup.awaiting_parent().map(|a| a.parent_root()) + == Some(dropped_lookup.block_root()) + }) .map(|(id, _)| *id) .collect::>(); @@ -847,7 +882,21 @@ impl BlockLookups { cx: &mut SyncNetworkContext, ) -> bool { match result { - Ok(LookupResult::Pending) => true, // no action + Ok(LookupResult::Pending) => true, + Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root, + peers, + .. + }) => { + if self.search_parent_of_child(awaiting_parent, block_root, &peers, cx) { + true + } else { + self.drop_lookup_and_children(id, "Failed"); + self.update_metrics(); + false + } + } Ok(LookupResult::Completed) => { if let Some(lookup) = self.single_block_lookups.remove(&id) { debug!( @@ -995,17 +1044,16 @@ impl BlockLookups { &'a self, lookup: &'a SingleBlockLookup, ) -> Result<&'a SingleBlockLookup, String> { - if let Some(awaiting_parent) = lookup.awaiting_parent() { + if let Some(awaiting) = lookup.awaiting_parent() { + let parent_root = awaiting.parent_root(); if let Some(lookup) = self .single_block_lookups .values() - .find(|l| l.block_root() == awaiting_parent) + .find(|l| l.block_root() == parent_root) { self.find_oldest_ancestor_lookup(lookup) } else { - Err(format!( - "Lookup references unknown parent {awaiting_parent:?}" - )) + Err(format!("Lookup references unknown parent {parent_root:?}")) } } else { Ok(lookup) @@ -1013,12 +1061,14 @@ impl BlockLookups { } /// Adds peers to a lookup and its ancestors recursively. - /// Note: Takes a `lookup_id` as argument to allow recursion on mutable lookups, without having - /// to duplicate the code to add peers to a lookup + /// - Block peers are added at each level (needed for block download). + /// - When recursing from child to parent, also adds to parent's data/payload peer sets, + /// since children arriving activates the parent's data/payload downloads. fn add_peers_to_lookup_and_ancestors( &mut self, lookup_id: SingleLookupId, peers: &[PeerId], + peer_type: &PeerType, cx: &mut SyncNetworkContext, ) -> Result<(), String> { let lookup = self @@ -1028,7 +1078,7 @@ impl BlockLookups { let mut added_some_peer = false; for peer in peers { - if lookup.add_peer(*peer) { + if lookup.add_peer(*peer, peer_type) { added_some_peer = true; debug!( block_root = ?lookup.block_root(), @@ -1038,13 +1088,27 @@ impl BlockLookups { } } - if let Some(parent_root) = lookup.awaiting_parent() { - if let Some((&child_id, _)) = self + if let Some(awaiting) = lookup.awaiting_parent() { + let parent_root = awaiting.parent_root(); + if let Some((&parent_id, parent_lookup)) = self .single_block_lookups .iter() .find(|(_, l)| l.block_root() == parent_root) { - self.add_peers_to_lookup_and_ancestors(child_id, peers, cx) + let peer_type = match &awaiting { + AwaitingParent::PreGloas(_) => PeerType { + data: true, + payload: false, + }, + AwaitingParent::PostGloas(..) => { + let is_full = parent_lookup.is_full_payload(&awaiting); + PeerType { + data: is_full, + payload: is_full, + } + } + }; + self.add_peers_to_lookup_and_ancestors(parent_id, peers, &peer_type, cx) } else { Err(format!("Lookup references unknown parent {parent_root:?}")) } diff --git a/beacon_node/network/src/sync/block_lookups/parent_chain.rs b/beacon_node/network/src/sync/block_lookups/parent_chain.rs index 5deea1dd94e..120ce5b1cc2 100644 --- a/beacon_node/network/src/sync/block_lookups/parent_chain.rs +++ b/beacon_node/network/src/sync/block_lookups/parent_chain.rs @@ -13,7 +13,7 @@ impl From<&SingleBlockLookup> for Node { fn from(value: &SingleBlockLookup) -> Self { Self { block_root: value.block_root(), - parent_root: value.awaiting_parent(), + parent_root: value.awaiting_parent().map(|a| a.parent_root()), } } } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 919526c2386..3a8420a8259 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,30 +1,53 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; -use crate::sync::block_lookups::common::RequestState; +use crate::sync::manager::BlockProcessType; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, }; -use beacon_chain::{BeaconChainTypes, BlockProcessStatus}; +use beacon_chain::BeaconChainTypes; +use beacon_chain::block_verification_types::AsBlock; use educe::Educe; use lighthouse_network::service::api_types::Id; use parking_lot::RwLock; use std::collections::HashSet; -use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; use store::Hash256; use strum::IntoStaticStr; use tracing::{Span, debug_span}; use types::data::FixedBlobSidecarList; -use types::{DataColumnSidecarList, EthSpec, SignedBeaconBlock, Slot}; +use types::{ + DataColumnSidecarList, EthSpec, ExecutionBlockHash, ForkName, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, +}; -// Dedicated enum for LookupResult to force its usage -#[must_use = "LookupResult must be handled with on_lookup_result"] -pub enum LookupResult { - /// Lookup completed successfully - Completed, - /// Lookup is expecting some future event from the network - Pending, +// === AwaitingParent — tracks what a child lookup waits for === + +#[derive(Debug, Clone, Copy)] +pub enum AwaitingParent { + /// Pre-Gloas: wait for parent lookup to fully complete (block + data) + PreGloas(Hash256), + /// Post-Gloas: also tracks parent_hash to determine full/empty dependency + PostGloas(Hash256, ExecutionBlockHash), +} + +impl AwaitingParent { + pub fn parent_root(&self) -> Hash256 { + match self { + AwaitingParent::PreGloas(root) | AwaitingParent::PostGloas(root, _) => *root, + } + } +} + +// === Public types re-exported by mod.rs === + +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct DownloadResult { + pub value: T, + pub block_root: Hash256, + pub seen_timestamp: Duration, + pub peer_group: PeerGroup, } #[derive(Debug, PartialEq, Eq, IntoStaticStr)] @@ -42,9 +65,6 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed(/* reason: */ String), - /// Received MissingComponents when all components have been processed. This should never - /// happen, and indicates some internal bug - MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, /// Received a download result for a different request id than the in-flight request. @@ -54,42 +74,355 @@ pub enum LookupRequestError { expected_req_id: ReqId, req_id: ReqId, }, + InternalError(String), +} + +// Dedicated enum for LookupResult to force its usage +#[must_use = "LookupResult must be handled with on_lookup_result"] +pub enum LookupResult { + /// Lookup completed successfully + Completed, + /// Lookup is expecting some future event from the network + Pending, + /// Block's parent is not known to fork-choice, a parent lookup is needed + ParentUnknown { + awaiting_parent: AwaitingParent, + block_root: Hash256, + peers: Vec, + }, +} + +// === Block request: Downloading → Downloaded → Processing → Done === + +#[derive(Educe)] +#[educe(Debug)] +enum BlockRequest { + /// Block downloading or awaiting download + Downloading { + block_root: Hash256, + state: SingleLookupRequestState>>, + }, + /// Block downloaded, waiting for parent check + send for processing + Downloaded { + #[educe(Debug(ignore))] + block: Arc>, + peer: PeerId, + }, + /// Block sent for processing, awaiting result + Processing { + #[educe(Debug(ignore))] + block: Arc>, + peer: PeerId, + }, + /// Block processing complete + Done { + #[educe(Debug(ignore))] + block: Arc>, + }, +} + +impl BlockRequest { + fn new(block_root: Hash256) -> Self { + BlockRequest::Downloading { + block_root, + state: SingleLookupRequestState::new(), + } + } + + fn new_with_processing_failures(block_root: Hash256, failed_processing: u8) -> Self { + BlockRequest::Downloading { + block_root, + state: SingleLookupRequestState::new_with_processing_failures(failed_processing), + } + } + + fn peek_block(&self) -> Option<&Arc>> { + match self { + BlockRequest::Downloading { state, .. } => state.peek_downloaded_data(), + BlockRequest::Downloaded { block, .. } + | BlockRequest::Processing { block, .. } + | BlockRequest::Done { block } => Some(block), + } + } + + fn peek_slot(&self) -> Option { + self.peek_block().map(|b| b.slot()) + } + + /// Returns the block peer for error attribution. Available in Downloaded/Processing states. + fn peer(&self) -> Option { + match self { + BlockRequest::Downloaded { peer, .. } | BlockRequest::Processing { peer, .. } => { + Some(*peer) + } + BlockRequest::Downloading { state, .. } => state + .peek_downloaded_peer_group() + .and_then(|pg| pg.all().next().copied()), + BlockRequest::Done { .. } => None, + } + } + + fn is_awaiting_event(&self) -> bool { + match self { + BlockRequest::Downloading { state, .. } => state.is_awaiting_event(), + BlockRequest::Processing { .. } => true, + _ => false, + } + } + + fn is_done(&self) -> bool { + matches!(self, BlockRequest::Done { .. }) + } + + fn insert_verified_response( + &mut self, + result: DownloadResult>>, + ) -> bool { + if let BlockRequest::Downloading { state, .. } = self { + state.insert_verified_response(result) + } else { + false + } + } +} + +// === Data request: WaitingForBlock → Downloading → Downloaded → Processing → Done === + +#[derive(Debug)] +enum DataRequest { + /// Waiting for block to be downloaded to determine what data is needed + WaitingForBlock, + /// Data downloading or awaiting download + Downloading(DataDownload), + /// Data downloaded, waiting for block processing to complete before import + Downloaded { + data: DownloadedData, + peer_group: PeerGroup, + }, + /// Data sent for processing, awaiting result + Processing { + kind: DataDownloadKind, + peer_group: PeerGroup, + }, + /// Data processing complete (or no data needed) + Done, +} + +impl DataRequest { + fn is_awaiting_event(&self) -> bool { + match self { + DataRequest::Downloading(dl) => dl.is_awaiting_event(), + DataRequest::Processing { .. } => true, + _ => false, + } + } + + fn peer_group(&self) -> Option<&PeerGroup> { + match self { + DataRequest::Downloading(dl) => dl.peek_downloaded_peer_group(), + DataRequest::Downloaded { peer_group, .. } + | DataRequest::Processing { peer_group, .. } => Some(peer_group), + DataRequest::WaitingForBlock | DataRequest::Done => None, + } + } +} + +/// Fork-dependent data download state +#[derive(Debug)] +enum DataDownload { + Blobs { + block_root: Hash256, + expected_blobs: usize, + state: SingleLookupRequestState>, + }, + Columns { + block_root: Hash256, + state: SingleLookupRequestState>, + }, +} + +impl DataDownload { + fn continue_requests>( + &mut self, + id: Id, + peers: Arc>>, + cx: &mut SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + match self { + DataDownload::Blobs { + block_root, + expected_blobs, + state, + } => { + let br = *block_root; + let eb = *expected_blobs; + state.make_request(|| cx.blob_lookup_request(id, peers, br, eb))?; + } + DataDownload::Columns { + block_root, state, .. + } => { + let br = *block_root; + state.make_request(|| cx.custody_lookup_request(id, br, peers))?; + } + } + Ok(()) + } + + fn take_download_result(&mut self) -> Option<(DownloadedData, PeerGroup)> { + match self { + DataDownload::Blobs { + expected_blobs, + state, + .. + } => state.take_download_result().map(|r| { + ( + DownloadedData::Blobs { + blobs: r.value, + expected_blobs: *expected_blobs, + }, + r.peer_group, + ) + }), + DataDownload::Columns { state, .. } => state + .take_download_result() + .map(|r| (DownloadedData::Columns(r.value), r.peer_group)), + } + } + + fn is_awaiting_event(&self) -> bool { + match self { + DataDownload::Blobs { state, .. } => state.is_awaiting_event(), + DataDownload::Columns { state, .. } => state.is_awaiting_event(), + } + } + + fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> { + match self { + DataDownload::Blobs { state, .. } => state.peek_downloaded_peer_group(), + DataDownload::Columns { state, .. } => state.peek_downloaded_peer_group(), + } + } +} + +/// Downloaded data, waiting to be sent for processing +#[derive(Debug)] +enum DownloadedData { + Blobs { + blobs: FixedBlobSidecarList, + expected_blobs: usize, + }, + Columns(DataColumnSidecarList), +} + +impl DownloadedData { + fn kind(&self) -> DataDownloadKind { + match self { + DownloadedData::Blobs { expected_blobs, .. } => DataDownloadKind::Blobs { + expected_blobs: *expected_blobs, + }, + DownloadedData::Columns(_) => DataDownloadKind::Columns, + } + } } +/// Enough info to reconstruct a fresh DataDownload on reset +#[derive(Debug, Clone, Copy)] +enum DataDownloadKind { + Blobs { expected_blobs: usize }, + Columns, +} + +impl DataDownloadKind { + fn into_fresh_download(self, block_root: Hash256) -> DataDownload { + match self { + DataDownloadKind::Blobs { expected_blobs } => DataDownload::Blobs { + block_root, + expected_blobs, + state: SingleLookupRequestState::new(), + }, + DataDownloadKind::Columns => DataDownload::Columns { + block_root, + state: SingleLookupRequestState::new(), + }, + } + } +} + +// === Payload request: WaitingForBlock → Downloading → Downloaded → Processing → Done === + +#[derive(Educe)] +#[educe(Debug)] +enum PayloadRequest { + /// Waiting for block to be downloaded to determine if payload is needed + WaitingForBlock, + Downloading { + block_root: Hash256, + state: SingleLookupRequestState>>, + }, + Downloaded { + peer_group: PeerGroup, + }, + Processing { + peer_group: PeerGroup, + }, + /// Done (payload processed, or no payload needed) + Done, +} + +impl PayloadRequest { + fn is_awaiting_event(&self) -> bool { + match self { + PayloadRequest::Downloading { state, .. } => state.is_awaiting_event(), + PayloadRequest::Processing { .. } => true, + _ => false, + } + } +} + +// === SingleBlockLookup — three independent requests === + #[derive(Educe)] #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { pub id: Id, - pub block_request_state: BlockRequestState, - pub component_requests: ComponentRequests, - /// Peers that claim to have imported this set of block components. This state is shared with - /// the custody request to have an updated view of the peers that claim to have imported the - /// block associated with this lookup. The peer set of a lookup can change rapidly, and faster - /// than the lifetime of a custody request. + block_root: Hash256, + + // Block request — always present + block_request: BlockRequest, + + // Data request — starts as WaitingForBlock, set after block downloaded + data_request: DataRequest, + + // Payload request — starts as WaitingForBlock, set after block downloaded + payload_request: PayloadRequest, + + // Peer sets + /// Peers for block download (also used for data in pre-Gloas forks) #[educe(Debug(method(fmt_peer_set_as_len)))] peers: Arc>>, - block_root: Hash256, - awaiting_parent: Option, + /// Peers for data download (0 initially for Gloas, shared with block for pre-Gloas) + #[educe(Debug(method(fmt_peer_set_as_len)))] + data_peers: Arc>>, + /// Peers for payload download (0 initially, Gloas only) + #[educe(Debug(method(fmt_peer_set_as_len)))] + payload_peers: Arc>>, + + // Parent tracking + awaiting_parent: Option, created: Instant, pub(crate) span: Span, -} -#[derive(Debug)] -pub(crate) enum ComponentRequests { - WaitingForBlock, - ActiveBlobRequest(BlobRequestState, usize), - ActiveCustodyRequest(CustodyRequestState), - // When printing in debug this state display the reason why it's not needed - #[allow(dead_code)] - NotNeeded(&'static str), + // Retry tracking + failed_processing: u8, } impl SingleBlockLookup { pub fn new( requested_block_root: Hash256, peers: &[PeerId], + peer_type: &PeerType, id: Id, - awaiting_parent: Option, + awaiting_parent: Option, ) -> Self { let lookup_span = debug_span!( "lh_single_block_lookup", @@ -97,30 +430,64 @@ impl SingleBlockLookup { id = id, ); + let peer_set: HashSet = peers.iter().copied().collect(); + let data_peers = if peer_type.data { + peer_set.clone() + } else { + HashSet::new() + }; + let payload_peers = if peer_type.payload { + peer_set.clone() + } else { + HashSet::new() + }; + Self { id, - block_request_state: BlockRequestState::new(requested_block_root), - component_requests: ComponentRequests::WaitingForBlock, - peers: Arc::new(RwLock::new(HashSet::from_iter(peers.iter().copied()))), block_root: requested_block_root, + block_request: BlockRequest::new(requested_block_root), + data_request: DataRequest::WaitingForBlock, + payload_request: PayloadRequest::WaitingForBlock, + data_peers: Arc::new(RwLock::new(data_peers)), + payload_peers: Arc::new(RwLock::new(payload_peers)), + peers: Arc::new(RwLock::new(peer_set)), awaiting_parent, created: Instant::now(), + failed_processing: 0, span: lookup_span, } } - /// Reset the status of all internal requests + pub fn is_full_payload(&self, awaiting_parent: &AwaitingParent) -> bool { + match awaiting_parent { + AwaitingParent::PreGloas(_) => false, + AwaitingParent::PostGloas(_, parent_hash) => { + match self.block_request.peek_block() { + Some(block) => match block.message().body().signed_execution_payload_bid() { + Ok(payload) => payload.message.block_hash == *parent_hash, + Err(_) => false, + }, + // TODO(gloas): We should cache peers instead of dropping if we don't know + None => false, + } + } + } + } + + /// Reset the status of all requests (used on block processing failure) pub fn reset_requests(&mut self) { - self.block_request_state = BlockRequestState::new(self.block_root); - self.component_requests = ComponentRequests::WaitingForBlock; + // Increment processing failure counter (we're resetting due to processing error) + self.failed_processing = self.failed_processing.saturating_add(1); + // Reset to fresh Downloading state with the updated counter + self.block_request = + BlockRequest::new_with_processing_failures(self.block_root, self.failed_processing); + self.data_request = DataRequest::WaitingForBlock; + self.payload_request = PayloadRequest::WaitingForBlock; } - /// Return the slot of this lookup's block if it's currently cached as `AwaitingProcessing` + /// Return the slot of this lookup's block if it's currently cached pub fn peek_downloaded_block_slot(&self) -> Option { - self.block_request_state - .state - .peek_downloaded_data() - .map(|block| block.slot()) + self.block_request.peek_slot() } /// Get the block root that is being requested. @@ -128,16 +495,10 @@ impl SingleBlockLookup { self.block_root } - pub fn awaiting_parent(&self) -> Option { + pub fn awaiting_parent(&self) -> Option { self.awaiting_parent } - /// Mark this lookup as awaiting a parent lookup from being processed. Meanwhile don't send - /// components for processing. - pub fn set_awaiting_parent(&mut self, parent_root: Hash256) { - self.awaiting_parent = Some(parent_root) - } - /// Mark this lookup as no longer awaiting a parent lookup. Components can be sent for /// processing. pub fn resolve_awaiting_parent(&mut self) { @@ -152,15 +513,10 @@ impl SingleBlockLookup { /// Maybe insert a verified response into this lookup. Returns true if imported pub fn add_child_components(&mut self, block_component: BlockComponent) -> bool { match block_component { - BlockComponent::Block(block) => self - .block_request_state - .state - .insert_verified_response(block), + BlockComponent::Block(block) => self.block_request.insert_verified_response(block), BlockComponent::Blob(_) | BlockComponent::DataColumn(_) => { - // For now ignore single blobs and columns, as the blob request state assumes all blobs are - // attributed to the same peer = the peer serving the remaining blobs. Ignoring this - // block component has a minor effect, causing the node to re-request this blob - // once the parent chain is successfully resolved + // For now ignore single blobs and columns, as the blob request state assumes all + // blobs are attributed to the same peer = the peer serving the remaining blobs. false } } @@ -171,36 +527,26 @@ impl SingleBlockLookup { self.block_root() == block_root } - /// Returns true if the block has already been downloaded. - pub fn all_components_processed(&self) -> bool { - self.block_request_state.state.is_processed() - && match &self.component_requests { - ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => request.state.is_processed(), - ComponentRequests::ActiveCustodyRequest(request) => request.state.is_processed(), - ComponentRequests::NotNeeded { .. } => true, - } - } - /// Returns true if this request is expecting some event to make progress pub fn is_awaiting_event(&self) -> bool { self.awaiting_parent.is_some() - || self.block_request_state.state.is_awaiting_event() - || match &self.component_requests { - // If components are waiting for the block request to complete, here we should - // check if the`block_request_state.state.is_awaiting_event(). However we already - // checked that above, so `WaitingForBlock => false` is equivalent. - ComponentRequests::WaitingForBlock => false, - ComponentRequests::ActiveBlobRequest(request, _) => { - request.state.is_awaiting_event() - } - ComponentRequests::ActiveCustodyRequest(request) => { - request.state.is_awaiting_event() - } - ComponentRequests::NotNeeded { .. } => false, - } + || self.block_request.is_awaiting_event() + || self.data_request.is_awaiting_event() + || self.payload_request.is_awaiting_event() } + /// Returns the block peer if block has been downloaded. Used for peer penalization. + pub fn block_peer(&self) -> Option { + self.block_request.peer() + } + + /// Returns custody column peer group if data has been downloaded. Used for peer penalization. + pub fn data_peer_group(&self) -> Option<&PeerGroup> { + self.data_request.peer_group() + } + + // -- Main state machine driver -- + /// Makes progress on all requests of this lookup. Any error is not recoverable and must result /// in dropping the lookup. May mark the lookup as completed. pub fn continue_requests( @@ -208,147 +554,486 @@ impl SingleBlockLookup { cx: &mut SyncNetworkContext, ) -> Result { let _guard = self.span.clone().entered(); - // TODO: Check what's necessary to download, specially for blobs - self.continue_request::>(cx, 0)?; - - if let ComponentRequests::WaitingForBlock = self.component_requests { - let downloaded_block = self - .block_request_state - .state - .peek_downloaded_data() - .cloned(); - - if let Some(block) = downloaded_block.or_else(|| { - // If the block is already being processed or fully validated, retrieve how many blobs - // it expects. Consider any stage of the block. If the block root has been validated, we - // can assert that this is the correct value of `blob_kzg_commitments_count`. - match cx.chain.get_block_process_status(&self.block_root) { - BlockProcessStatus::Unknown => None, - BlockProcessStatus::NotValidated(block, _) - | BlockProcessStatus::ExecutionValidated(block) => Some(block.clone()), + let id = self.id; + + // === Block request === + loop { + match &mut self.block_request { + BlockRequest::Downloading { block_root, state } => { + let peers = self.peers.clone(); + let br = *block_root; + state.make_request(|| cx.block_lookup_request(id, peers, br))?; + + if let Some(result) = state.take_download_result() { + // Block downloaded :) Transition state and break out of loop + let peer = result.peer_group.all().next().copied().ok_or_else(|| { + LookupRequestError::BadState("block download has no peer".into()) + })?; + self.block_request = BlockRequest::Downloaded { + block: result.value, + peer, + }; + } else { + // Awaiting download + break; + } } - }) { - let expected_blobs = block.num_expected_blobs(); - let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); - if expected_blobs == 0 { - self.component_requests = ComponentRequests::NotNeeded("no data"); - } else if cx.chain.should_fetch_blobs(block_epoch) { - self.component_requests = ComponentRequests::ActiveBlobRequest( - BlobRequestState::new(self.block_root), - expected_blobs, - ); - } else if cx.chain.should_fetch_custody_columns(block_epoch) { - self.component_requests = ComponentRequests::ActiveCustodyRequest( - CustodyRequestState::new(self.block_root), - ); - } else { - self.component_requests = ComponentRequests::NotNeeded("outside da window"); + BlockRequest::Downloaded { block, peer } => { + if self.awaiting_parent.is_some() { + break; + } + + let parent_root = block.parent_root(); + // Zero hash is the parent of the genesis block — not a real block. + if parent_root != Hash256::ZERO { + let Some(_parent_proto_block) = cx + .chain + .canonical_head + .fork_choice_read_lock() + .get_block(&parent_root) + else { + let awaiting_parent = if let Ok(bid) = + block.message().body().signed_execution_payload_bid() + { + AwaitingParent::PostGloas( + parent_root, + bid.message.parent_block_hash, + ) + } else { + AwaitingParent::PreGloas(parent_root) + }; + self.awaiting_parent = Some(awaiting_parent); + return Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root: self.block_root, + peers: self.all_peers(), + }); + }; + // post-gloas we need to also check if the envelope is known to fork choice + if let Ok(child_bid) = block.message().body().signed_execution_payload_bid() + { + // TODO(gloas): after fork-choice: use parent_proto_block.execution_payload_block_hash here + let parent_is_full = cx + .chain + .get_blinded_block(&parent_root) + .map(|maybe_parent_block| { + if let Some(parent_block) = maybe_parent_block { + parent_block + .message() + .body() + .signed_execution_payload_bid() + .map(|parent_bid| { + parent_bid.message.block_hash + == child_bid.message.parent_block_hash + }) + .unwrap_or(false) + } else { + false + } + }) + .unwrap_or(false); + + if parent_is_full + && !cx.chain.envelope_is_known_to_fork_choice(&parent_root) + { + let awaiting_parent = AwaitingParent::PostGloas( + parent_root, + child_bid.message.parent_block_hash, + ); + self.awaiting_parent = Some(awaiting_parent); + return Ok(LookupResult::ParentUnknown { + awaiting_parent, + block_root: self.block_root, + peers: self.all_peers(), + }); + } + } + } + + let block = block.clone(); + let peer = *peer; + cx.send_block_for_processing( + id, + self.block_root, + block.clone(), + Duration::ZERO, + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + self.block_request = BlockRequest::Processing { block, peer }; } - } else { - // Wait to download the block before downloading blobs. Then we can be sure that the - // block has data, so there's no need to do "blind" requests for all possible blobs and - // latter handle the case where if the peer sent no blobs, penalize. - // - // Lookup sync event safety: Reaching this code means that a block is not in any pre-import - // cache nor in the request state of this lookup. Therefore, the block must either: (1) not - // be downloaded yet or (2) the block is already imported into the fork-choice. - // In case (1) the lookup must either successfully download the block or get dropped. - // In case (2) the block will be downloaded, processed, reach `DuplicateFullyImported` - // and get dropped as completed. + BlockRequest::Processing { .. } | BlockRequest::Done { .. } => break, } } - match &self.component_requests { - ComponentRequests::WaitingForBlock => {} // do nothing - ComponentRequests::ActiveBlobRequest(_, expected_blobs) => { - self.continue_request::>(cx, *expected_blobs)? + // === Data request === + loop { + match &mut self.data_request { + DataRequest::WaitingForBlock => { + if let Some(block) = self.block_request.peek_block() { + let block = block.clone(); + self.create_data_request_for_block(&block, cx); + } else { + // Wait for block to be downloaded + break; + } + } + DataRequest::Downloading(dl) => { + if !self.data_peers.read().is_empty() { + dl.continue_requests(id, self.data_peers.clone(), cx)?; + } + if let Some((data, peer_group)) = dl.take_download_result() { + self.data_request = DataRequest::Downloaded { data, peer_group }; + } else { + // Wait for data to be downloaded + break; + } + } + DataRequest::Downloaded { data, peer_group } => { + if !self.block_request.is_done() { + // Wait for block to be imported before importing data + break; + } + match data { + DownloadedData::Blobs { blobs, .. } => { + cx.send_blobs_for_processing( + id, + self.block_root, + blobs.clone(), + Duration::ZERO, + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + DownloadedData::Columns(columns) => { + cx.send_custody_columns_for_processing( + id, + self.block_root, + columns.clone(), + Duration::ZERO, + BlockProcessType::SingleCustodyColumn(id), + ) + .map_err(LookupRequestError::SendFailedProcessor)?; + } + } + let kind = data.kind(); + let peer_group = peer_group.clone(); + self.data_request = DataRequest::Processing { kind, peer_group }; + } + DataRequest::Processing { .. } | DataRequest::Done => break, } - ComponentRequests::ActiveCustodyRequest(_) => { - self.continue_request::>(cx, 0)? + } + + // === Payload request === + loop { + match &mut self.payload_request { + PayloadRequest::WaitingForBlock => { + if let Some(block) = self.block_request.peek_block() { + let block = block.clone(); + self.create_payload_request_for_block(&block, cx); + } else { + break; + } + } + PayloadRequest::Downloading { block_root, state } => { + if !self.payload_peers.read().is_empty() { + let peers = self.payload_peers.clone(); + let br = *block_root; + match cx.payload_lookup_request(id, peers, br) { + Ok(LookupRequestResult::RequestSent(req_id)) => { + state.on_download_start(req_id)?; + } + Ok(LookupRequestResult::NoRequestNeeded(_reason)) => { + // Payload RPC not wired yet — skip download, mark as done + self.payload_request = PayloadRequest::Done; + continue; + } + Ok(LookupRequestResult::Pending(reason)) => { + state.update_awaiting_download_status(reason); + } + Err(e) => { + return Err(LookupRequestError::SendFailedNetwork(e)); + } + } + } + if let Some(result) = state.take_download_result() { + self.payload_request = PayloadRequest::Downloaded { + peer_group: result.peer_group, + }; + } else { + break; + } + } + PayloadRequest::Downloaded { peer_group } => { + if !self.block_request.is_done() { + break; + } + // TODO(gloas): send payload for processing + // cx.send_payload_for_processing(...) + let peer_group = peer_group.clone(); + self.payload_request = PayloadRequest::Processing { peer_group }; + } + PayloadRequest::Processing { .. } | PayloadRequest::Done => break, } - ComponentRequests::NotNeeded { .. } => {} // do nothing } - // If all components of this lookup are already processed, there will be no future events - // that can make progress so it must be dropped. Consider the lookup completed. - // This case can happen if we receive the components from gossip during a retry. - if self.all_components_processed() { - self.span = Span::none(); - Ok(LookupResult::Completed) - } else { - Ok(LookupResult::Pending) + // === Check completion === + if self.block_request.is_done() + && matches!(self.data_request, DataRequest::Done) + && matches!(self.payload_request, PayloadRequest::Done) + { + return Ok(LookupResult::Completed); } + + Ok(LookupResult::Pending) } - /// Potentially makes progress on this request if it's in a progress-able state - fn continue_request>( + /// Create data request based on the downloaded block's content and fork. + fn create_data_request_for_block( &mut self, - cx: &mut SyncNetworkContext, - expected_blobs: usize, - ) -> Result<(), LookupRequestError> { - let id = self.id; - let awaiting_parent = self.awaiting_parent.is_some(); - let request = - R::request_state_mut(self).map_err(|e| LookupRequestError::BadState(e.to_owned()))?; - - // Attempt to progress awaiting downloads - if request.get_state().is_awaiting_download() { - // Verify the current request has not exceeded the maximum number of attempts. - let request_state = request.get_state(); - if request_state.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { - let cannot_process = request_state.more_failed_processing_attempts(); - return Err(LookupRequestError::TooManyAttempts { cannot_process }); - } - - let peers = self.peers.clone(); - let request = R::request_state_mut(self) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))?; + block: &Arc>, + cx: &SyncNetworkContext, + ) { + let expected_blobs = block.num_expected_blobs(); + let block_fork = cx.chain.spec.fork_name_at_slot::(block.slot()); - match request.make_request(id, peers, expected_blobs, cx)? { - LookupRequestResult::RequestSent(req_id) => { - // Lookup sync event safety: If make_request returns `RequestSent`, we are - // guaranteed that `BlockLookups::on_download_response` will be called exactly - // with this `req_id`. - request.get_state_mut().on_download_start(req_id)? + match block_fork { + ForkName::Base | ForkName::Altair | ForkName::Bellatrix | ForkName::Capella => { + self.data_request = DataRequest::Done; + } + ForkName::Deneb | ForkName::Electra => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Blobs { + block_root: self.block_root, + expected_blobs, + state: SingleLookupRequestState::new(), + }); + // Pre-Gloas: data peers = block peers (always need data with block) + self.data_peers = self.peers.clone(); + } else { + self.data_request = DataRequest::Done; } - LookupRequestResult::NoRequestNeeded(reason) => { - // Lookup sync event safety: Advances this request to the terminal `Processed` - // state. If all requests reach this state, the request is marked as completed - // in `Self::continue_requests`. - request.get_state_mut().on_completed_request(reason)? + } + ForkName::Fulu => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Columns { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }); + // Pre-Gloas: data peers = block peers + self.data_peers = self.peers.clone(); + } else { + self.data_request = DataRequest::Done; } - // Sync will receive a future event to make progress on the request, do nothing now - LookupRequestResult::Pending(reason) => { - // Lookup sync event safety: Refer to the code paths constructing - // `LookupRequestResult::Pending` - request - .get_state_mut() - .update_awaiting_download_status(reason); - return Ok(()); + } + ForkName::Gloas => { + if expected_blobs > 0 { + self.data_request = DataRequest::Downloading(DataDownload::Columns { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }); + // Gloas: data peers start at 0, populated when children arrive + } else { + self.data_request = DataRequest::Done; } } + } + } + + /// Create payload request based on the downloaded block's content and fork. + fn create_payload_request_for_block( + &mut self, + block: &Arc>, + cx: &SyncNetworkContext, + ) { + let expected_blobs = block.num_expected_blobs(); + let block_fork = cx.chain.spec.fork_name_at_slot::(block.slot()); - // Otherwise, attempt to progress awaiting processing - // If this request is awaiting a parent lookup to be processed, do not send for processing. - // The request will be rejected with unknown parent error. - } else if !awaiting_parent { - // maybe_start_processing returns Some if state == AwaitingProcess. This pattern is - // useful to conditionally access the result data. - if let Some(result) = request.get_state_mut().maybe_start_processing() { - // Lookup sync event safety: If `send_for_processing` returns Ok() we are guaranteed - // that `BlockLookups::on_processing_result` will be called exactly once with this - // lookup_id - return R::send_for_processing(id, result, cx); + match block_fork { + ForkName::Base + | ForkName::Altair + | ForkName::Bellatrix + | ForkName::Capella + | ForkName::Deneb + | ForkName::Electra + | ForkName::Fulu => { + self.payload_request = PayloadRequest::Done; + } + ForkName::Gloas => { + if expected_blobs > 0 { + self.payload_request = PayloadRequest::Downloading { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }; + // Payload peers start at 0, download gated until children provide peers + } else { + // Empty blocks have no payload and no data — both are Done + self.payload_request = PayloadRequest::Done; + } } - // Lookup sync event safety: If the request is not in `AwaitingDownload` or - // `AwaitingProcessing` state it is guaranteed to receive some event to make progress. } + } - // Lookup sync event safety: If a lookup is awaiting a parent we are guaranteed to either: - // (1) attempt to make progress with `BlockLookups::continue_child_lookups` if the parent - // lookup completes, or (2) get dropped if the parent fails and is dropped. + // -- Processing result handlers -- - Ok(()) + /// Handle block processing result. Advances the lookup state machine. + pub fn on_block_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + let BlockRequest::Processing { block, .. } = &self.block_request else { + return Err(LookupRequestError::BadState( + "block processing result but not in Processing state".to_owned(), + )); + }; + if result_is_ok { + let block = block.clone(); + self.block_request = BlockRequest::Done { block }; + self.continue_requests(cx) + } else { + // Block processing failed — reset everything and retry from scratch + self.reset_requests(); + self.continue_requests(cx) + } + } + + /// Handle data processing result (blobs or custody columns imported). + pub fn on_data_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + if !matches!(self.data_request, DataRequest::Processing { .. }) { + return Err(LookupRequestError::BadState( + "data processing result but not in Processing state".to_owned(), + )); + } + if result_is_ok { + self.data_request = DataRequest::Done; + self.continue_requests(cx) + } else { + // Data processing failed — retry data download only + self.reset_data_request(); + self.continue_requests(cx) + } + } + + /// Handle payload processing result. + #[allow(dead_code)] + pub fn on_payload_processing_result( + &mut self, + result_is_ok: bool, + cx: &mut SyncNetworkContext, + ) -> Result { + if !matches!(self.payload_request, PayloadRequest::Processing { .. }) { + return Err(LookupRequestError::BadState( + "payload processing result but not in Processing state".to_owned(), + )); + } + if result_is_ok { + self.payload_request = PayloadRequest::Done; + self.continue_requests(cx) + } else { + self.payload_request = PayloadRequest::Downloading { + block_root: self.block_root, + state: SingleLookupRequestState::new(), + }; + self.continue_requests(cx) + } + } + + /// Reset data request to a fresh download, preserving the download kind. + fn reset_data_request(&mut self) { + let kind = match &self.data_request { + DataRequest::Downloading(dl) => match dl { + DataDownload::Blobs { expected_blobs, .. } => Some(DataDownloadKind::Blobs { + expected_blobs: *expected_blobs, + }), + DataDownload::Columns { .. } => Some(DataDownloadKind::Columns), + }, + DataRequest::Downloaded { data, .. } => Some(data.kind()), + DataRequest::Processing { kind, .. } => Some(*kind), + DataRequest::WaitingForBlock | DataRequest::Done => None, + }; + if let Some(kind) = kind { + self.data_request = DataRequest::Downloading(kind.into_fresh_download(self.block_root)); + } + } + + // -- Download response handlers -- + + /// Handle a block download response. Updates download state and advances the lookup. + #[allow(clippy::type_complexity)] + pub fn on_block_download_response( + &mut self, + req_id: ReqId, + result: Result<(Arc>, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let BlockRequest::Downloading { state, .. } = &mut self.block_request else { + return Err(LookupRequestError::BadState( + "block response but not downloading".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a blob download response. Updates download state and advances the lookup. + pub fn on_blob_download_response( + &mut self, + req_id: ReqId, + result: Result<(FixedBlobSidecarList, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let DataRequest::Downloading(DataDownload::Blobs { state, .. }) = &mut self.data_request + else { + return Err(LookupRequestError::BadState( + "blob response but not downloading blobs".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a custody columns download response. Updates download state and advances the lookup. + pub fn on_custody_download_response( + &mut self, + req_id: ReqId, + result: Result<(DataColumnSidecarList, PeerGroup, Duration), ()>, + cx: &mut SyncNetworkContext, + ) -> Result { + let DataRequest::Downloading(DataDownload::Columns { state, .. }) = &mut self.data_request + else { + return Err(LookupRequestError::BadState( + "custody response but not downloading columns".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) + } + + /// Handle a payload envelope download response. Updates download state and advances the lookup. + #[allow(clippy::type_complexity)] + pub fn on_payload_download_response( + &mut self, + req_id: ReqId, + result: Result< + ( + Arc>, + PeerGroup, + Duration, + ), + (), + >, + cx: &mut SyncNetworkContext, + ) -> Result { + let PayloadRequest::Downloading { state, .. } = &mut self.payload_request else { + return Err(LookupRequestError::BadState( + "payload envelope response but not downloading payload".to_owned(), + )); + }; + state.on_download_response(req_id, self.block_root, result)?; + self.continue_requests(cx) } /// Get all unique peers that claim to have imported this set of block components @@ -357,14 +1042,24 @@ impl SingleBlockLookup { } /// Add peer to all request states. The peer must be able to serve this request. - /// Returns true if the peer was newly inserted into some request state. - pub fn add_peer(&mut self, peer_id: PeerId) -> bool { - self.peers.write().insert(peer_id) + /// Returns true if the peer was newly inserted into any peer set. + pub fn add_peer(&mut self, peer_id: PeerId, peer_type: &PeerType) -> bool { + let mut added = false; + if peer_type.payload { + added |= self.payload_peers.write().insert(peer_id); + } + if peer_type.data { + added |= self.data_peers.write().insert(peer_id); + } + added |= self.peers.write().insert(peer_id); + added } /// Remove peer from available peers. pub fn remove_peer(&mut self, peer_id: &PeerId) { self.peers.write().remove(peer_id); + self.data_peers.write().remove(peer_id); + self.payload_peers.write().remove(peer_id); } /// Returns true if this lookup has zero peers @@ -373,171 +1068,120 @@ impl SingleBlockLookup { } } -/// The state of the blob request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct BlobRequestState { - #[educe(Debug(ignore))] - pub block_root: Hash256, - pub state: SingleLookupRequestState>, -} - -impl BlobRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - block_root, - state: SingleLookupRequestState::new(), - } - } -} - -/// The state of the custody request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct CustodyRequestState { - #[educe(Debug(ignore))] - pub block_root: Hash256, - pub state: SingleLookupRequestState>, -} - -impl CustodyRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - block_root, - state: SingleLookupRequestState::new(), - } - } -} - -/// The state of the block request component of a `SingleBlockLookup`. -#[derive(Educe)] -#[educe(Debug)] -pub struct BlockRequestState { - #[educe(Debug(ignore))] - pub requested_block_root: Hash256, - pub state: SingleLookupRequestState>>, -} - -impl BlockRequestState { - pub fn new(block_root: Hash256) -> Self { - Self { - requested_block_root: block_root, - state: SingleLookupRequestState::new(), - } - } +pub struct PeerType { + pub data: bool, + pub payload: bool, } -#[derive(Debug, Clone)] -pub struct DownloadResult { - pub value: T, - pub block_root: Hash256, - pub seen_timestamp: Duration, - pub peer_group: PeerGroup, -} +// === Generic download state machine === #[derive(IntoStaticStr)] -pub enum State { +enum DownloadState { AwaitingDownload(/* reason */ &'static str), Downloading(ReqId), - AwaitingProcess(DownloadResult), - /// Request is processing, sent by lookup sync - Processing(DownloadResult), - /// Request is processed - Processed(/* reason */ &'static str), + Downloaded(DownloadResult), + /// Download completed with no request needed (e.g. all components already imported) + Completed(/* reason */ &'static str), } /// Object representing the state of a single block or blob lookup request. #[derive(Debug)] -pub struct SingleLookupRequestState { - /// State of this request. - state: State, - /// How many times have we attempted to process this block or blob. +struct SingleLookupRequestState { + state: DownloadState, failed_processing: u8, - /// How many times have we attempted to download this block or blob. failed_downloading: u8, } impl SingleLookupRequestState { - pub fn new() -> Self { + fn new() -> Self { Self { - state: State::AwaitingDownload("not started"), + state: DownloadState::AwaitingDownload("not started"), failed_processing: 0, failed_downloading: 0, } } - pub fn is_awaiting_download(&self) -> bool { - match self.state { - State::AwaitingDownload { .. } => true, - State::Downloading { .. } - | State::AwaitingProcess { .. } - | State::Processing { .. } - | State::Processed { .. } => false, + fn new_with_processing_failures(failed_processing: u8) -> Self { + Self { + state: DownloadState::AwaitingDownload("reset after processing failure"), + failed_processing, + failed_downloading: 0, } } - pub fn is_processed(&self) -> bool { - match self.state { - State::AwaitingDownload { .. } - | State::Downloading { .. } - | State::AwaitingProcess { .. } - | State::Processing { .. } => false, - State::Processed { .. } => true, + fn is_awaiting_download(&self) -> bool { + matches!(self.state, DownloadState::AwaitingDownload { .. }) + } + + /// Drive download: check max attempts, issue request, handle result. + fn make_request( + &mut self, + request_fn: impl FnOnce() -> Result, + ) -> Result<(), LookupRequestError> { + if !self.is_awaiting_download() { + return Ok(()); + } + if self.failed_attempts() >= SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS { + let cannot_process = self.more_failed_processing_attempts(); + return Err(LookupRequestError::TooManyAttempts { cannot_process }); } + match request_fn().map_err(LookupRequestError::SendFailedNetwork)? { + LookupRequestResult::RequestSent(req_id) => self.on_download_start(req_id)?, + LookupRequestResult::NoRequestNeeded(reason) => self.on_completed_request(reason)?, + LookupRequestResult::Pending(reason) => self.update_awaiting_download_status(reason), + } + Ok(()) } - /// Returns true if we can expect some future event to progress this block component request - /// specifically. - pub fn is_awaiting_event(&self) -> bool { - match self.state { - // No event will progress this request specifically, but the request may be put on hold - // due to some external event - State::AwaitingDownload { .. } => false, - // Network will emit a download success / error event - State::Downloading { .. } => true, - // Not awaiting any external event - State::AwaitingProcess { .. } => false, - // Beacon processor will emit a processing result event - State::Processing { .. } => true, - // Request complete, no future event left - State::Processed { .. } => false, - } - } - - pub fn peek_downloaded_data(&self) -> Option<&T> { + fn is_awaiting_event(&self) -> bool { + matches!(self.state, DownloadState::Downloading { .. }) + } + + fn peek_downloaded_data(&self) -> Option<&T> { + match &self.state { + DownloadState::Downloaded(data) => Some(&data.value), + _ => None, + } + } + + fn peek_downloaded_peer_group(&self) -> Option<&PeerGroup> { match &self.state { - State::AwaitingDownload { .. } => None, - State::Downloading { .. } => None, - State::AwaitingProcess(result) => Some(&result.value), - State::Processing(result) => Some(&result.value), - State::Processed { .. } => None, + DownloadState::Downloaded(data) => Some(&data.peer_group), + _ => None, + } + } + + /// Take the download result out, transitioning back to AwaitingDownload. + /// Returns None if not in Downloaded state. + fn take_download_result(&mut self) -> Option> { + let old = std::mem::replace(&mut self.state, DownloadState::AwaitingDownload("taken")); + if let DownloadState::Downloaded(result) = old { + Some(result) + } else { + self.state = old; + None } } - /// Switch to `AwaitingProcessing` if the request is in `AwaitingDownload` state, otherwise - /// ignore. - pub fn insert_verified_response(&mut self, result: DownloadResult) -> bool { - if let State::AwaitingDownload { .. } = &self.state { - self.state = State::AwaitingProcess(result); + fn insert_verified_response(&mut self, result: DownloadResult) -> bool { + if let DownloadState::AwaitingDownload { .. } = &self.state { + self.state = DownloadState::Downloaded(result); true } else { false } } - /// Append metadata on why this request is in AwaitingDownload status. Very helpful to debug - /// stuck lookups. Not fallible as it's purely informational. - pub fn update_awaiting_download_status(&mut self, new_status: &'static str) { - if let State::AwaitingDownload(status) = &mut self.state { - *status = new_status + fn update_awaiting_download_status(&mut self, new_status: &'static str) { + if let DownloadState::AwaitingDownload(status) = &mut self.state { + *status = new_status; } } - /// Switch to `Downloading` if the request is in `AwaitingDownload` state, otherwise returns None. - pub fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + fn on_download_start(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - State::AwaitingDownload { .. } => { - self.state = State::Downloading(req_id); + DownloadState::AwaitingDownload { .. } => { + self.state = DownloadState::Downloading(req_id); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -546,11 +1190,30 @@ impl SingleLookupRequestState { } } - /// Registers a failure in downloading a block. This might be a peer disconnection or a wrong - /// block. - pub fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { + /// Handle a download response: dispatch success or failure based on result. + fn on_download_response( + &mut self, + req_id: ReqId, + block_root: Hash256, + result: Result<(T, PeerGroup, Duration), ()>, + ) -> Result<(), LookupRequestError> { + match result { + Ok((value, peer_group, seen_timestamp)) => self.on_download_success( + req_id, + DownloadResult { + value, + block_root, + seen_timestamp, + peer_group, + }, + ), + Err(()) => self.on_download_failure(req_id), + } + } + + fn on_download_failure(&mut self, req_id: ReqId) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading(expected_req_id) => { + DownloadState::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, @@ -558,7 +1221,7 @@ impl SingleLookupRequestState { }); } self.failed_downloading = self.failed_downloading.saturating_add(1); - self.state = State::AwaitingDownload("not started"); + self.state = DownloadState::AwaitingDownload("not started"); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -567,20 +1230,20 @@ impl SingleLookupRequestState { } } - pub fn on_download_success( + fn on_download_success( &mut self, req_id: ReqId, result: DownloadResult, ) -> Result<(), LookupRequestError> { match &self.state { - State::Downloading(expected_req_id) => { + DownloadState::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(LookupRequestError::UnexpectedRequestId { expected_req_id: *expected_req_id, req_id, }); } - self.state = State::AwaitingProcess(result); + self.state = DownloadState::Downloaded(result); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -589,65 +1252,10 @@ impl SingleLookupRequestState { } } - /// Switch to `Processing` if the request is in `AwaitingProcess` state, otherwise returns None. - pub fn maybe_start_processing(&mut self) -> Option> { - // For 2 lines replace state with placeholder to gain ownership of `result` - match &self.state { - State::AwaitingProcess(result) => { - let result = result.clone(); - self.state = State::Processing(result.clone()); - Some(result) - } - _ => None, - } - } - - /// Revert into `AwaitingProcessing`, if the payload if not invalid and can be submitted for - /// processing latter. - pub fn revert_to_awaiting_processing(&mut self) -> Result<(), LookupRequestError> { - match &self.state { - State::Processing(result) => { - self.state = State::AwaitingProcess(result.clone()); - Ok(()) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on revert_to_awaiting_processing expected Processing got {other}" - ))), - } - } - - /// Registers a failure in processing a block. - pub fn on_processing_failure(&mut self) -> Result { - match &self.state { - State::Processing(result) => { - let peers_source = result.peer_group.clone(); - self.failed_processing = self.failed_processing.saturating_add(1); - self.state = State::AwaitingDownload("not started"); - Ok(peers_source) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_processing_failure expected Processing got {other}" - ))), - } - } - - pub fn on_processing_success(&mut self) -> Result<(), LookupRequestError> { - match &self.state { - State::Processing(_) => { - self.state = State::Processed("processing success"); - Ok(()) - } - other => Err(LookupRequestError::BadState(format!( - "Bad state on_processing_success expected Processing got {other}" - ))), - } - } - - /// Mark a request as complete without any download or processing - pub fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> { + fn on_completed_request(&mut self, reason: &'static str) -> Result<(), LookupRequestError> { match &self.state { - State::AwaitingDownload { .. } => { - self.state = State::Processed(reason); + DownloadState::AwaitingDownload { .. } => { + self.state = DownloadState::Completed(reason); Ok(()) } other => Err(LookupRequestError::BadState(format!( @@ -656,33 +1264,28 @@ impl SingleLookupRequestState { } } - /// The total number of failures, whether it be processing or downloading. - pub fn failed_attempts(&self) -> u8 { + fn failed_attempts(&self) -> u8 { self.failed_processing + self.failed_downloading } - pub fn more_failed_processing_attempts(&self) -> bool { + fn more_failed_processing_attempts(&self) -> bool { self.failed_processing >= self.failed_downloading } } -// Display is used in the BadState assertions above -impl std::fmt::Display for State { +impl std::fmt::Display for DownloadState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", Into::<&'static str>::into(self)) } } -// Debug is used in the log_stuck_lookups print to include some more info. Implements custom Debug -// to not dump an entire block or blob to terminal which don't add valuable data. -impl std::fmt::Debug for State { +impl std::fmt::Debug for DownloadState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::AwaitingDownload(reason) => write!(f, "AwaitingDownload({})", reason), Self::Downloading(req_id) => write!(f, "Downloading({:?})", req_id), - Self::AwaitingProcess(d) => write!(f, "AwaitingProcess({:?})", d.peer_group), - Self::Processing(d) => write!(f, "Processing({:?})", d.peer_group), - Self::Processed(reason) => write!(f, "Processed({})", reason), + Self::Downloaded(_) => write!(f, "Downloaded()"), + Self::Completed(reason) => write!(f, "Completed({})", reason), } } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7e618d89808..841c3d6da50 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -43,9 +43,7 @@ use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; -use crate::sync::block_lookups::{ - BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, -}; +use crate::sync::block_lookups::{BlockComponent, DownloadResult}; use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; @@ -74,7 +72,8 @@ use strum::IntoStaticStr; use tokio::sync::mpsc; use tracing::{debug, error, info, trace}; use types::{ - BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, DataColumnSidecar, EthSpec, ForkContext, Hash256, SignedBeaconBlock, + SignedExecutionPayloadEnvelope, Slot, }; /// The number of slots ahead of us that is allowed before requesting a long-range (batch) Sync @@ -133,6 +132,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A payload envelope has been received from the RPC. + RpcPayloadEnvelope { + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, Arc>, Hash256), @@ -493,6 +500,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::SinglePayloadEnvelope { id } => { + self.on_single_payload_envelope_response(id, peer_id, RpcEvent::RPCError(error)) + } SyncRequestId::DataColumnsByRoot(req_id) => { self.on_data_columns_by_root_response(req_id, peer_id, RpcEvent::RPCError(error)) } @@ -839,6 +849,17 @@ impl SyncManager { } => { self.rpc_data_column_received(sync_request_id, peer_id, data_column, seen_timestamp) } + SyncMessage::RpcPayloadEnvelope { + sync_request_id, + peer_id, + envelope, + seen_timestamp, + } => self.rpc_payload_envelope_received( + sync_request_id, + peer_id, + envelope, + seen_timestamp, + ), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -894,9 +915,33 @@ impl SyncManager { }), ); } - // TODO(gloas) support gloas data column variant + // In Gloas, data columns identify the beacon block root but do not carry + // parent root. Treat as an unknown block-root trigger (attestation-style). + // The peer is marked as data-capable since it sent us a data column. DataColumnSidecar::Gloas(_) => { - error!("Gloas variant not yet supported") + match self.should_search_for_block(Some(data_column_slot), &peer_id) { + Ok(_) => { + if self.block_lookups.search_unknown_block_with_data_peer( + block_root, + &[peer_id], + &mut self.network, + ) { + debug!( + ?block_root, + "Created unknown block lookup from Gloas data column" + ); + } else { + debug!(?block_root, "No lookup created from Gloas data column"); + } + } + Err(reason) => { + debug!( + %block_root, + reason, + "Ignoring Gloas data column unknown block request" + ); + } + } } } } @@ -1137,14 +1182,13 @@ impl SyncManager { block: RpcEvent>>, ) { if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { - self.block_lookups - .on_download_response::>( - id, - resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), - &mut self.network, - ) + self.block_lookups.on_block_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } @@ -1207,14 +1251,53 @@ impl SyncManager { blob: RpcEvent>>, ) { if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { - self.block_lookups - .on_download_response::>( + self.block_lookups.on_blob_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) + } + } + + fn rpc_payload_envelope_received( + &mut self, + sync_request_id: SyncRequestId, + peer_id: PeerId, + envelope: Option>>, + seen_timestamp: Duration, + ) { + match sync_request_id { + SyncRequestId::SinglePayloadEnvelope { id } => self + .on_single_payload_envelope_response( id, - resp.map(|(value, seen_timestamp)| { - (value, PeerGroup::from_single(peer_id), seen_timestamp) - }), - &mut self.network, - ) + peer_id, + RpcEvent::from_chunk(envelope, seen_timestamp), + ), + _ => { + crit!(%peer_id, "bad request id for payload_envelope"); + } + } + } + + fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + envelope: RpcEvent>>, + ) { + if let Some(resp) = self + .network + .on_single_payload_envelope_response(id, peer_id, envelope) + { + self.block_lookups.on_payload_download_response( + id, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), + &mut self.network, + ) } } @@ -1306,11 +1389,7 @@ impl SyncManager { response: CustodyByRootResult, ) { self.block_lookups - .on_download_response::>( - requester.0, - response, - &mut self.network, - ); + .on_custody_download_response(requester.0, response, &mut self.network); } /// Handles receiving a response for a range sync request that should have both blocks and diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index ff630bb470a..ade71cbede2 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -2,7 +2,10 @@ //! channel and stores a global RPC ID to perform requests. use self::custody::{ActiveCustodyRequest, Error as CustodyRequestError}; -pub use self::requests::{BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest}; +pub use self::requests::{ + BlocksByRootSingleRequest, DataColumnsByRootSingleBlockRequest, + PayloadEnvelopesByRootSingleRequest, +}; use super::SyncMessage; use super::block_sidecar_coupling::RangeBlockComponentsRequest; use super::manager::BlockProcessType; @@ -37,6 +40,7 @@ pub use requests::LookupVerifyError; use requests::{ ActiveRequests, BlobsByRangeRequestItems, BlobsByRootRequestItems, BlocksByRangeRequestItems, BlocksByRootRequestItems, DataColumnsByRangeRequestItems, DataColumnsByRootRequestItems, + PayloadEnvelopesByRootRequestItems, }; #[cfg(test)] use slot_clock::SlotClock; @@ -52,7 +56,7 @@ use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + ForkContext, Hash256, SignedBeaconBlock, SignedExecutionPayloadEnvelope, Slot, }; pub mod custody; @@ -201,6 +205,9 @@ pub struct SyncNetworkContext { ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: ActiveRequests>, + /// A mapping of active PayloadEnvelopesByRoot requests + payload_envelopes_by_root_requests: + ActiveRequests>, /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, @@ -294,6 +301,7 @@ impl SyncNetworkContext { request_id: 1, blocks_by_root_requests: ActiveRequests::new("blocks_by_root"), blobs_by_root_requests: ActiveRequests::new("blobs_by_root"), + payload_envelopes_by_root_requests: ActiveRequests::new("payload_envelopes_by_root"), data_columns_by_root_requests: ActiveRequests::new("data_columns_by_root"), blocks_by_range_requests: ActiveRequests::new("blocks_by_range"), blobs_by_range_requests: ActiveRequests::new("blobs_by_range"), @@ -322,6 +330,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -345,6 +354,10 @@ impl SyncNetworkContext { .active_requests_of_peer(peer_id) .into_iter() .map(|id| SyncRequestId::SingleBlob { id: *id }); + let payload_envelopes_by_root_ids = payload_envelopes_by_root_requests + .active_requests_of_peer(peer_id) + .into_iter() + .map(|id| SyncRequestId::SinglePayloadEnvelope { id: *id }); let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -363,6 +376,7 @@ impl SyncNetworkContext { .map(|req_id| SyncRequestId::DataColumnsByRange(*req_id)); blocks_by_root_ids .chain(blobs_by_root_ids) + .chain(payload_envelopes_by_root_ids) .chain(data_column_by_root_ids) .chain(blocks_by_range_ids) .chain(blobs_by_range_ids) @@ -419,6 +433,7 @@ impl SyncNetworkContext { request_id: _, blocks_by_root_requests, blobs_by_root_requests, + payload_envelopes_by_root_requests, data_columns_by_root_requests, blocks_by_range_requests, blobs_by_range_requests, @@ -441,6 +456,7 @@ impl SyncNetworkContext { for peer_id in blocks_by_root_requests .iter_request_peers() .chain(blobs_by_root_requests.iter_request_peers()) + .chain(payload_envelopes_by_root_requests.iter_request_peers()) .chain(data_columns_by_root_requests.iter_request_peers()) .chain(blocks_by_range_requests.iter_request_peers()) .chain(blobs_by_range_requests.iter_request_peers()) @@ -927,6 +943,72 @@ impl SyncNetworkContext { Ok(LookupRequestResult::RequestSent(id.req_id)) } + /// Request a payload envelope for a block root via PayloadEnvelopesByRoot RPC. + pub fn payload_lookup_request( + &mut self, + lookup_id: SingleLookupId, + lookup_peers: Arc>>, + block_root: Hash256, + ) -> Result { + let active_request_count_by_peer = self.active_request_count_by_peer(); + let Some(peer_id) = lookup_peers + .read() + .iter() + .map(|peer| { + ( + active_request_count_by_peer.get(peer).copied().unwrap_or(0), + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + return Ok(LookupRequestResult::Pending("no peers")); + }; + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + let request = PayloadEnvelopesByRootSingleRequest { block_root }; + + let network_request = RequestType::PayloadEnvelopesByRoot( + request + .clone() + .into_request(&self.fork_context) + .map_err(RpcRequestSendError::InternalError)?, + ); + self.network_send + .send(NetworkMessage::SendRequest { + peer_id, + request: network_request, + app_request_id: AppRequestId::Sync(SyncRequestId::SinglePayloadEnvelope { id }), + }) + .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; + + debug!( + method = "PayloadEnvelopesByRoot", + ?block_root, + peer = %peer_id, + %id, + "Sync RPC request sent" + ); + + self.payload_envelopes_by_root_requests.insert( + id, + peer_id, + // true = enforce that the peer returns a response. We only request a single envelope + // and the peer must have it. + true, + PayloadEnvelopesByRootRequestItems::new(request), + Span::none(), + ); + + Ok(LookupRequestResult::RequestSent(id.req_id)) + } + /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: /// - If we have a downloaded but not yet processed block /// - If the da_checker has a pending block @@ -1464,6 +1546,27 @@ impl SyncNetworkContext { self.on_rpc_response_result(resp, peer_id) } + pub(crate) fn on_single_payload_envelope_response( + &mut self, + id: SingleLookupReqId, + peer_id: PeerId, + rpc_event: RpcEvent>>, + ) -> Option>>> { + let resp = self + .payload_envelopes_by_root_requests + .on_response(id, rpc_event); + let resp = resp.map(|res| { + res.and_then(|(mut envelopes, seen_timestamp)| { + match envelopes.pop() { + Some(envelope) => Ok((envelope, seen_timestamp)), + // Should never happen, we enforce at least 1 chunk. + None => Err(LookupVerifyError::NotEnoughResponsesReturned { actual: 0 }.into()), + } + }) + }); + self.on_rpc_response_result(resp, peer_id) + } + #[allow(clippy::type_complexity)] pub(crate) fn on_data_columns_by_root_response( &mut self, diff --git a/beacon_node/network/src/sync/network_context/requests.rs b/beacon_node/network/src/sync/network_context/requests.rs index 8f9540693e1..5b5e779d9bf 100644 --- a/beacon_node/network/src/sync/network_context/requests.rs +++ b/beacon_node/network/src/sync/network_context/requests.rs @@ -16,6 +16,9 @@ pub use data_columns_by_range::DataColumnsByRangeRequestItems; pub use data_columns_by_root::{ DataColumnsByRootRequestItems, DataColumnsByRootSingleBlockRequest, }; +pub use payload_envelopes_by_root::{ + PayloadEnvelopesByRootRequestItems, PayloadEnvelopesByRootSingleRequest, +}; use crate::metrics; @@ -27,6 +30,7 @@ mod blocks_by_range; mod blocks_by_root; mod data_columns_by_range; mod data_columns_by_root; +mod payload_envelopes_by_root; #[derive(Debug, PartialEq, Eq, IntoStaticStr)] pub enum LookupVerifyError { diff --git a/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs new file mode 100644 index 00000000000..a142d86e905 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/requests/payload_envelopes_by_root.rs @@ -0,0 +1,54 @@ +use lighthouse_network::rpc::methods::PayloadEnvelopesByRootRequest; +use std::sync::Arc; +use types::{EthSpec, ForkContext, Hash256, SignedExecutionPayloadEnvelope}; + +use super::{ActiveRequestItems, LookupVerifyError}; + +#[derive(Debug, Clone)] +pub struct PayloadEnvelopesByRootSingleRequest { + pub block_root: Hash256, +} + +impl PayloadEnvelopesByRootSingleRequest { + pub fn into_request( + self, + fork_context: &ForkContext, + ) -> Result { + PayloadEnvelopesByRootRequest::new(vec![self.block_root], fork_context) + } +} + +pub struct PayloadEnvelopesByRootRequestItems { + request: PayloadEnvelopesByRootSingleRequest, + items: Vec>>, +} + +impl PayloadEnvelopesByRootRequestItems { + pub fn new(request: PayloadEnvelopesByRootSingleRequest) -> Self { + Self { + request, + items: vec![], + } + } +} + +impl ActiveRequestItems for PayloadEnvelopesByRootRequestItems { + type Item = Arc>; + + /// Append a response to the single chunk request. We expect exactly one envelope per + /// block root. Returns `true` when the single expected item has been received. + fn add(&mut self, envelope: Self::Item) -> Result { + let block_root = envelope.message.beacon_block_root; + if self.request.block_root != block_root { + return Err(LookupVerifyError::UnrequestedBlockRoot(block_root)); + } + + self.items.push(envelope); + // Always returns true, we expect a single envelope per block root + Ok(true) + } + + fn consume(&mut self) -> Vec { + std::mem::take(&mut self.items) + } +}