diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index d0323bab52f..375b2867ee8 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -3,7 +3,7 @@ use libp2p::PeerId; use std::fmt::{Display, Formatter}; use std::sync::Arc; use types::{ - BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap, + BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }; @@ -71,7 +71,6 @@ pub struct DataColumnsByRangeRequestId { #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRangeRequester { - ComponentsByRange(ComponentsByRangeRequestId), CustodyBackfillSync(CustodyBackFillBatchRequestId), } @@ -130,9 +129,17 @@ pub struct CustodyId { } /// Downstream components that perform custody by root requests. -/// Currently, it's only single block lookups, so not using an enum #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] -pub struct CustodyRequester(pub SingleLookupReqId); +pub enum CustodyRequester { + SingleLookup(SingleLookupReqId), + RangeSync(RangeSyncCustodyId), +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct RangeSyncCustodyId { + pub id: ComponentsByRangeRequestId, + pub block_root: Hash256, +} /// Application level requests sent to the network. #[derive(Debug, Clone, Copy, PartialEq)] @@ -254,7 +261,16 @@ impl Display for DataColumnsByRootRequester { impl Display for CustodyRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) + match self { + Self::SingleLookup(id) => write!(f, "{id}"), + Self::RangeSync(id) => write!(f, "RangeSync/{id}"), + } + } +} + +impl Display for RangeSyncCustodyId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{:?}", self.id, self.block_root) } } @@ -270,7 +286,6 @@ impl Display for RangeRequestId { impl Display for DataColumnsByRangeRequester { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - Self::ComponentsByRange(id) => write!(f, "ByRange/{id}"), Self::CustodyBackfillSync(id) => write!(f, "CustodyBackfill/{id}"), } } @@ -285,7 +300,7 @@ mod tests { let id = DataColumnsByRootRequestId { id: 123, requester: DataColumnsByRootRequester::Custody(CustodyId { - requester: CustodyRequester(SingleLookupReqId { + requester: CustodyRequester::SingleLookup(SingleLookupReqId { req_id: 121, lookup_id: 101, }), @@ -298,17 +313,17 @@ mod tests { fn display_id_data_columns_by_range() { let id = DataColumnsByRangeRequestId { id: 123, - parent_request_id: DataColumnsByRangeRequester::ComponentsByRange( - ComponentsByRangeRequestId { + parent_request_id: DataColumnsByRangeRequester::CustodyBackfillSync( + CustodyBackFillBatchRequestId { id: 122, - requester: RangeRequestId::RangeSync { - chain_id: 54, - batch_id: Epoch::new(0), + batch_id: CustodyBackfillBatchId { + epoch: Epoch::new(0), + run_id: 54, }, }, ), peer: PeerId::random(), }; - assert_eq!(format!("{id}"), "123/ByRange/122/RangeSync/0/54"); + assert_eq!(format!("{id}"), "123/CustodyBackfill/122/0/54"); } } diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index debe30b34f0..a25c56ffc6e 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -9,7 +9,8 @@ use libp2p::PeerId; use lighthouse_network::rpc::{RequestType, methods::*}; use lighthouse_network::service::api_types::{ AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId, + CustodyBackFillBatchRequestId, CustodyBackfillBatchId, DataColumnsByRangeRequestId, + DataColumnsByRangeRequester, RangeRequestId, SyncRequestId, }; use lighthouse_network::{NetworkEvent, ReportSource, Response}; use ssz::Encode; @@ -1847,12 +1848,12 @@ fn test_request_too_large_data_columns_by_range() { AppRequestId::Sync(SyncRequestId::DataColumnsByRange( DataColumnsByRangeRequestId { id: 1, - parent_request_id: DataColumnsByRangeRequester::ComponentsByRange( - ComponentsByRangeRequestId { + parent_request_id: DataColumnsByRangeRequester::CustodyBackfillSync( + CustodyBackFillBatchRequestId { id: 1, - requester: RangeRequestId::RangeSync { - chain_id: 1, - batch_id: Epoch::new(1), + batch_id: CustodyBackfillBatchId { + epoch: Epoch::new(1), + run_id: 1, }, }, ), diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 240fd70e01f..3fb12832656 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -469,6 +469,30 @@ pub static SYNCING_CHAIN_BATCHES: LazyLock> = LazyLock::new( &["sync_type", "state"], ) }); +pub static SYNCING_CHAIN_BATCH_DOWNLOADING: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( + "sync_range_chain_batch_downloading_seconds", + "Time range sync batches spend downloading", + Ok(vec![0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 30.0, 60.0]), + ) +}); +pub static SYNCING_CHAIN_BATCH_PROCESSING: LazyLock> = LazyLock::new(|| { + try_create_histogram_with_buckets( + "sync_range_chain_batch_processing_seconds", + "Time range sync batches spend in processing", + Ok(vec![ + 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, + ]), + ) +}); +pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING_COUNT: LazyLock> = + LazyLock::new(|| { + try_create_histogram_with_buckets( + "sync_range_chain_batch_awaiting_processing_count", + "Number of batches in AwaitingProcessing when a batch starts processing", + Ok(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]), + ) + }); pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock> = LazyLock::new(|| { try_create_int_gauge( "sync_single_block_lookups", diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 801c9eca4d5..e511e887aa9 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -35,7 +35,7 @@ use std::marker::PhantomData; use std::sync::Arc; use strum::IntoEnumIterator; use tracing::{debug, error, info, warn}; -use types::{ColumnIndex, Epoch, EthSpec}; +use types::{Epoch, EthSpec}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -315,45 +315,22 @@ impl BackFillSync { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: &PeerId, + peer_id: Option<&PeerId>, request_id: Id, err: RpcResponseError, ) -> Result<(), BackFillError> { if let Some(batch) = self.batches.get_mut(&batch_id) { if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err { match coupling_error { - CouplingError::DataColumnPeerFailure { - error, - faulty_peers, - exceeded_retries, - } => { - debug!(?batch_id, error, "Block components coupling error"); - // Note: we don't fail the batch here because a `CouplingError` is - // recoverable by requesting from other honest peers. - let mut failed_columns = HashSet::new(); - let mut failed_peers = HashSet::new(); - for (column, peer) in faulty_peers { - failed_columns.insert(*column); - failed_peers.insert(*peer); - } - - // Only retry if peer failure **and** retries haven't been exceeded - if !*exceeded_retries { - return self.retry_partial_batch( - network, - batch_id, - request_id, - failed_columns, - failed_peers, - ); - } - } CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } + CouplingError::DataColumnPeerFailure { error, .. } => { + debug!(?batch_id, error, "Data column peer failure"); + } } } // A batch could be retried without the peer failing the request (disconnecting/ @@ -365,7 +342,7 @@ impl BackFillSync { return Ok(()); } debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed"); - match batch.download_failed(Some(*peer_id)) { + match batch.download_failed(peer_id.copied()) { Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)), Ok(BatchOperationOutcome::Failed { blacklist: _ }) => { self.fail_sync(BackFillError::BatchDownloadFailed(batch_id)) @@ -699,7 +676,7 @@ impl BackFillSync { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(ProcessResult::Successful), - BatchState::Failed | BatchState::Processing(_) => { + BatchState::Failed | BatchState::Processing(..) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - Processing -> `self.current_processing_batch` is None @@ -805,7 +782,7 @@ impl BackFillSync { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} - BatchState::Processing(_) => { + BatchState::Processing(..) => { debug!(batch = %id, %batch, "Advancing chain while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id >= processing_id @@ -902,14 +879,11 @@ impl BackFillSync { .collect::>(); let (request, is_blob_batch) = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_peers(); match network.block_components_by_range_request( is_blob_batch, request, RangeRequestId::BackfillSync { batch_id }, &synced_peers, - &synced_peers, // All synced peers have imported up to the finalized slot so they must have their custody columns available - &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request @@ -957,53 +931,6 @@ impl BackFillSync { Ok(()) } - /// Retries partial column requests within the batch by creating new requests for the failed columns. - pub fn retry_partial_batch( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - id: Id, - failed_columns: HashSet, - mut failed_peers: HashSet, - ) -> Result<(), BackFillError> { - if let Some(batch) = self.batches.get_mut(&batch_id) { - failed_peers.extend(&batch.failed_peers()); - let req = batch.to_blocks_by_range_request().0; - - let synced_peers = network - .network_globals() - .peers - .read() - .synced_peers_for_epoch(batch_id) - .cloned() - .collect::>(); - - match network.retry_columns_by_range( - id, - &synced_peers, - &failed_peers, - req, - &failed_columns, - ) { - Ok(_) => { - debug!( - ?batch_id, - id, "Retried column requests from different peers" - ); - return Ok(()); - } - Err(e) => { - debug!(?batch_id, id, e, "Failed to retry partial batch"); - } - } - } else { - return Err(BackFillError::InvalidSyncState( - "Batch should exist to be retried".to_string(), - )); - } - Ok(()) - } - /// When resuming a chain, this function searches for batches that need to be re-downloaded and /// transitions their state to redownload the batch. fn resume_batches(&mut self, network: &mut SyncNetworkContext) -> Result<(), BackFillError> { diff --git a/beacon_node/network/src/sync/batch.rs b/beacon_node/network/src/sync/batch.rs index e87ffd119e2..d30356bc7a2 100644 --- a/beacon_node/network/src/sync/batch.rs +++ b/beacon_node/network/src/sync/batch.rs @@ -131,11 +131,11 @@ pub enum BatchState { /// The batch has failed either downloading or processing, but can be requested again. AwaitingDownload, /// The batch is being downloaded. - Downloading(Id), + Downloading(Id, Instant), /// The batch has been completely downloaded and is ready for processing. AwaitingProcessing(PeerId, D, Instant), /// The batch is being processed. - Processing(Attempt), + Processing(Attempt, Instant), /// The batch was successfully processed and is waiting to be validated. /// /// It is not sufficient to process a batch successfully to consider it correct. This is @@ -159,9 +159,9 @@ impl BatchState { pub fn metrics_state(&self) -> BatchMetricsState { match self { BatchState::AwaitingDownload => BatchMetricsState::AwaitingDownload, - BatchState::Downloading(_) => BatchMetricsState::Downloading, + BatchState::Downloading(..) => BatchMetricsState::Downloading, BatchState::AwaitingProcessing(..) => BatchMetricsState::AwaitingProcessing, - BatchState::Processing(_) => BatchMetricsState::Processing, + BatchState::Processing(..) => BatchMetricsState::Processing, BatchState::AwaitingValidation(_) => BatchMetricsState::AwaitingValidation, BatchState::Poisoned | BatchState::Failed => BatchMetricsState::Failed, } @@ -217,18 +217,36 @@ impl BatchInfo { /// Verifies if an incoming request id to this batch. pub fn is_expecting_request_id(&self, request_id: &Id) -> bool { - if let BatchState::Downloading(expected_id) = &self.state { + if let BatchState::Downloading(expected_id, _) = &self.state { return expected_id == request_id; } false } + /// Returns the elapsed time since the batch entered the Downloading state. + pub fn time_since_downloading(&self) -> Option { + if let BatchState::Downloading(_, start) = &self.state { + Some(start.elapsed()) + } else { + None + } + } + + /// Returns the elapsed time since the batch entered the Processing state. + pub fn time_since_processing(&self) -> Option { + if let BatchState::Processing(_, start) = &self.state { + Some(start.elapsed()) + } else { + None + } + } + /// Returns the peer that is currently responsible for progressing the state of the batch. pub fn processing_peer(&self) -> Option<&PeerId> { match &self.state { BatchState::AwaitingDownload | BatchState::Failed | BatchState::Downloading(..) => None, BatchState::AwaitingProcessing(peer_id, _, _) - | BatchState::Processing(Attempt { peer_id, .. }) + | BatchState::Processing(Attempt { peer_id, .. }, _) | BatchState::AwaitingValidation(Attempt { peer_id, .. }) => Some(peer_id), BatchState::Poisoned => unreachable!("Poisoned batch"), } @@ -264,7 +282,7 @@ impl BatchInfo { #[must_use = "Batch may have failed"] pub fn download_completed(&mut self, data_columns: D, peer: PeerId) -> Result<(), WrongState> { match self.state.poison() { - BatchState::Downloading(_) => { + BatchState::Downloading(..) => { self.state = BatchState::AwaitingProcessing(peer, data_columns, Instant::now()); Ok(()) } @@ -284,15 +302,14 @@ impl BatchInfo { /// This can happen if a peer disconnects or some error occurred that was not the peers fault. /// The `peer` parameter, when set to `None`, still counts toward /// `max_batch_download_attempts` (to prevent infinite retries on persistent failures) - /// but does not register a peer in `failed_peers()`. Use - /// [`Self::downloading_to_awaiting_download`] to retry without counting a failed attempt. + /// but does not register a peer in `failed_peers()`. #[must_use = "Batch may have failed"] pub fn download_failed( &mut self, peer: Option, ) -> Result { match self.state.poison() { - BatchState::Downloading(_) => { + BatchState::Downloading(..) => { // register the attempt and check if the batch can be tried again self.failed_download_attempts.push(peer); @@ -316,35 +333,10 @@ impl BatchInfo { } } - /// Change the batch state from `Self::Downloading` to `Self::AwaitingDownload` without - /// registering a failed attempt. - /// - /// Note: must use this cautiously with some level of retry protection - /// as not registering a failed attempt could lead to requesting in a loop. - #[must_use = "Batch may have failed"] - pub fn downloading_to_awaiting_download( - &mut self, - ) -> Result { - match self.state.poison() { - BatchState::Downloading(_) => { - self.state = BatchState::AwaitingDownload; - Ok(self.outcome()) - } - BatchState::Poisoned => unreachable!("Poisoned batch"), - other => { - self.state = other; - Err(WrongState(format!( - "Download failed for batch in wrong state {:?}", - self.state - ))) - } - } - } - pub fn start_downloading(&mut self, request_id: Id) -> Result<(), WrongState> { match self.state.poison() { BatchState::AwaitingDownload => { - self.state = BatchState::Downloading(request_id); + self.state = BatchState::Downloading(request_id, Instant::now()); Ok(()) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -361,7 +353,8 @@ impl BatchInfo { pub fn start_processing(&mut self) -> Result<(D, Duration), WrongState> { match self.state.poison() { BatchState::AwaitingProcessing(peer, data_columns, start_instant) => { - self.state = BatchState::Processing(Attempt::new::(peer, &data_columns)); + self.state = + BatchState::Processing(Attempt::new::(peer, &data_columns), Instant::now()); Ok((data_columns, start_instant.elapsed())) } BatchState::Poisoned => unreachable!("Poisoned batch"), @@ -380,7 +373,7 @@ impl BatchInfo { processing_result: BatchProcessingResult, ) -> Result { match self.state.poison() { - BatchState::Processing(attempt) => { + BatchState::Processing(attempt, _start) => { self.state = match processing_result { BatchProcessingResult::Success => BatchState::AwaitingValidation(attempt), BatchProcessingResult::FaultyFailure => { @@ -518,7 +511,7 @@ impl Attempt { impl std::fmt::Debug for BatchState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - BatchState::Processing(Attempt { peer_id, .. }) => { + BatchState::Processing(Attempt { peer_id, .. }, _) => { write!(f, "Processing({})", peer_id) } BatchState::AwaitingValidation(Attempt { peer_id, .. }) => { @@ -529,7 +522,7 @@ impl std::fmt::Debug for BatchState { BatchState::AwaitingProcessing(peer, ..) => { write!(f, "AwaitingProcessing({})", peer) } - BatchState::Downloading(request_id) => { + BatchState::Downloading(request_id, _) => { write!(f, "Downloading({})", request_id) } BatchState::Poisoned => f.write_str("Poisoned"), @@ -543,8 +536,8 @@ impl BatchState { fn visualize(&self) -> char { match self { BatchState::Downloading(..) => 'D', - BatchState::Processing(_) => 'P', - BatchState::AwaitingValidation(_) => 'v', + BatchState::Processing(..) => 'P', + BatchState::AwaitingValidation(..) => 'v', BatchState::AwaitingDownload => 'd', BatchState::Failed => 'F', BatchState::AwaitingProcessing(..) => 'p', @@ -599,7 +592,7 @@ mod tests { assert!(matches!(batch.state(), BatchState::AwaitingDownload)); batch.start_downloading(1).unwrap(); - assert!(matches!(batch.state(), BatchState::Downloading(1))); + assert!(matches!(batch.state(), BatchState::Downloading(1, _))); batch.download_completed(vec![10, 20], p).unwrap(); assert!(matches!(batch.state(), BatchState::AwaitingProcessing(..))); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index edd99345b43..cd4a9ee65bf 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -5,9 +5,9 @@ use crate::sync::block_lookups::{ BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, }; use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; +use crate::sync::network_context::{LookupRequestResult, RpcRequestSendError, SyncNetworkContext}; use beacon_chain::BeaconChainTypes; -use lighthouse_network::service::api_types::Id; +use lighthouse_network::service::api_types::{CustodyRequester, Id, SingleLookupReqId}; use parking_lot::RwLock; use std::collections::HashSet; use std::sync::Arc; @@ -172,8 +172,26 @@ impl RequestState for CustodyRequestState { _: usize, cx: &mut SyncNetworkContext, ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) - .map_err(LookupRequestError::SendFailedNetwork) + let requester = CustodyRequester::SingleLookup(SingleLookupReqId { + lookup_id: id, + req_id: cx.next_id(), + }); + // For single lookups, compute custody columns for the current epoch, + // excluding columns already imported via gossip. + let current_epoch = cx.chain.epoch().map_err(|e| { + LookupRequestError::SendFailedNetwork(RpcRequestSendError::InternalError(format!( + "Unable to read slot clock {:?}", + e + ))) + })?; + cx.custody_lookup_request( + requester, + self.block_root, + current_epoch, + false, + lookup_peers, + ) + .map_err(LookupRequestError::SendFailedNetwork) } fn send_for_processing( diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index a2877718548..9106b2e6afd 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -8,18 +8,22 @@ use beacon_chain::{ use lighthouse_network::{ PeerId, service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, + BlocksByRangeRequestId, ComponentsByRangeRequestId, CustodyRequester, RangeSyncCustodyId, }, }; +use parking_lot::RwLock; use ssz_types::RuntimeVariableList; -use std::{collections::HashMap, sync::Arc}; -use tracing::{Span, debug}; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tracing::debug; use types::{ BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, }; -use crate::sync::network_context::MAX_COLUMN_RETRIES; +use crate::sync::network_context::{LookupRequestResult, PeerGroup, SyncNetworkContext}; + +pub use lighthouse_network::service::api_types::BlobsByRangeRequestId; /// Accumulates and couples beacon blocks with their associated data (blobs or data columns) /// from range sync network responses. @@ -27,18 +31,15 @@ use crate::sync::network_context::MAX_COLUMN_RETRIES; /// This struct acts as temporary storage while multiple network responses arrive: /// - Blocks themselves (always required) /// - Blob sidecars (pre-Fulu fork) -/// - Data columns (Fulu fork and later) +/// - Data columns (Fulu fork and later, via custody-by-root) /// /// It accumulates responses until all expected components are received, then couples -/// them together and returns complete `RpcBlock`s ready for processing. Handles validation -/// and peer failure detection during the coupling process. +/// them together and returns complete `RpcBlock`s ready for processing. pub struct RangeBlockComponentsRequest { /// Blocks we have received awaiting for their corresponding sidecar. blocks_request: ByRangeRequest>>>, /// Sidecars we have received awaiting for their corresponding block. block_data_request: RangeBlockDataRequest, - /// Span to track the range request and all children range requests. - pub(crate) request_span: Span, } pub enum ByRangeRequest { @@ -50,17 +51,17 @@ enum RangeBlockDataRequest { NoData, Blobs(ByRangeRequest>>>), DataColumns { - requests: HashMap< - DataColumnsByRangeRequestId, - ByRangeRequest>, - >, - /// The column indices corresponding to the request - column_peers: HashMap>, + custody_columns_by_root: HashMap>, expected_custody_columns: Vec, - attempt: usize, }, } +#[derive(Clone)] +enum DataColumnsState { + Requesting, + Complete(DataColumnSidecarList, PeerGroup), +} + #[derive(Debug)] pub(crate) enum CouplingError { InternalError(String), @@ -68,7 +69,6 @@ pub(crate) enum CouplingError { DataColumnPeerFailure { error: String, faulty_peers: Vec<(ColumnIndex, PeerId)>, - exceeded_retries: bool, }, BlobPeerFailure(String), } @@ -79,29 +79,18 @@ impl RangeBlockComponentsRequest { /// # Arguments /// * `blocks_req_id` - Request ID for the blocks /// * `blobs_req_id` - Optional request ID for blobs (pre-Fulu fork) - /// * `data_columns` - Optional tuple of (request_id->column_indices pairs, expected_custody_columns) for Fulu fork - #[allow(clippy::type_complexity)] + /// * `expects_custody_columns` - If Some, custody-by-root will be used after blocks arrive pub fn new( blocks_req_id: BlocksByRangeRequestId, blobs_req_id: Option, - data_columns: Option<( - Vec<(DataColumnsByRangeRequestId, Vec)>, - Vec, - )>, - request_span: Span, + expects_custody_columns: Option>, ) -> Self { let block_data_request = if let Some(blobs_req_id) = blobs_req_id { RangeBlockDataRequest::Blobs(ByRangeRequest::Active(blobs_req_id)) - } else if let Some((requests, expected_custody_columns)) = data_columns { - let column_peers: HashMap<_, _> = requests.into_iter().collect(); + } else if let Some(expected_custody_columns) = expects_custody_columns { RangeBlockDataRequest::DataColumns { - requests: column_peers - .keys() - .map(|id| (*id, ByRangeRequest::Active(*id))) - .collect(), - column_peers, + custody_columns_by_root: HashMap::new(), expected_custody_columns, - attempt: 0, } } else { RangeBlockDataRequest::NoData @@ -110,31 +99,12 @@ impl RangeBlockComponentsRequest { Self { blocks_request: ByRangeRequest::Active(blocks_req_id), block_data_request, - request_span, } } - /// Modifies `self` by inserting a new `DataColumnsByRangeRequestId` for a formerly failed - /// request for some columns. - pub fn reinsert_failed_column_requests( - &mut self, - failed_column_requests: Vec<(DataColumnsByRangeRequestId, Vec)>, - ) -> Result<(), String> { - match &mut self.block_data_request { - RangeBlockDataRequest::DataColumns { - requests, - expected_custody_columns: _, - column_peers, - attempt: _, - } => { - for (request, columns) in failed_column_requests.into_iter() { - requests.insert(request, ByRangeRequest::Active(request)); - column_peers.insert(request, columns); - } - Ok(()) - } - _ => Err("not a column request".to_string()), - } + /// Returns true if the blocks component of this request has been received. + pub fn blocks_received(&self) -> bool { + self.blocks_request.to_finished().is_some() } /// Adds received blocks to the request. @@ -166,29 +136,121 @@ impl RangeBlockComponentsRequest { } } - /// Adds received custody columns to the request. + /// Adds received custody columns for a specific block root. /// - /// Returns an error if this request expects blobs instead of data columns, - /// or if the request ID is unknown. + /// Returns an error if not in DataColumns mode, or if columns for this root + /// were already completed. pub fn add_custody_columns( &mut self, - req_id: DataColumnsByRangeRequestId, - columns: Vec>>, + block_root: Hash256, + columns: DataColumnSidecarList, + peer_group: PeerGroup, ) -> Result<(), String> { - match &mut self.block_data_request { - RangeBlockDataRequest::NoData => { - Err("received data columns but expected no data".to_owned()) + let RangeBlockDataRequest::DataColumns { + custody_columns_by_root, + .. + } = &mut self.block_data_request + else { + return Err("received custody columns but not in DataColumns mode".to_owned()); + }; + match custody_columns_by_root.get(&block_root) { + Some(DataColumnsState::Complete(..)) => Err(format!( + "duplicate custody columns for block root {block_root:?}" + )), + None => Err(format!( + "received custody columns for unregistered block root {block_root:?}" + )), + Some(DataColumnsState::Requesting) => { + custody_columns_by_root + .insert(block_root, DataColumnsState::Complete(columns, peer_group)); + Ok(()) + } + } + } + + /// After blocks arrive, initiates custody-by-root requests for blocks that need data columns. + /// + /// Only does work when blocks have arrived and we're in DataColumns mode. For each block + /// with data, inserts a `Requesting` entry and fires a custody request via the network context. + pub fn continue_requests>( + &mut self, + id: ComponentsByRangeRequestId, + cx: &mut SyncNetworkContext, + ) -> Result<(), String> { + let Some(blocks) = self.blocks_request.to_finished() else { + return Ok(()); + }; + let RangeBlockDataRequest::DataColumns { + custody_columns_by_root, + .. + } = &mut self.block_data_request + else { + return Ok(()); + }; + + let mut errors = vec![]; + + for block in blocks { + if block.num_expected_blobs() == 0 { + continue; } - RangeBlockDataRequest::Blobs(_) => { - Err("received data columns but expected blobs".to_owned()) + let block_root = get_block_root(block); + if custody_columns_by_root.contains_key(&block_root) { + continue; } - RangeBlockDataRequest::DataColumns { requests, .. } => { - let req = requests - .get_mut(&req_id) - .ok_or(format!("unknown data columns by range req_id {req_id}"))?; - req.finish(req_id, columns) + + let block_epoch = block.slot().epoch(E::slots_per_epoch()); + let requester = CustodyRequester::RangeSync(RangeSyncCustodyId { id, block_root }); + match cx.custody_lookup_request( + requester, + block_root, + block_epoch, + true, // ignore_cache: range blocks won't have gossip-imported columns + Arc::new(RwLock::new(HashSet::new())), + ) { + Ok(LookupRequestResult::RequestSent(_)) => { + custody_columns_by_root.insert(block_root, DataColumnsState::Requesting); + debug!(?block_root, %id, "Initiated custody-by-root for range block"); + } + Ok(LookupRequestResult::NoRequestNeeded(reason)) => { + // All columns already available (e.g. arrived via gossip). Mark as complete. + debug!(?block_root, %id, %reason, "Custody-by-root not needed for range block"); + custody_columns_by_root.insert( + block_root, + DataColumnsState::Complete(vec![], PeerGroup::from_set(Default::default())), + ); + } + Ok(LookupRequestResult::Pending(reason)) => { + errors.push(format!( + "Custody request for {block_root:?} pending: {reason}" + )); + } + Err(e) => { + errors.push(format!( + "Failed to initiate custody for {block_root:?}: {e:?}" + )); + } } } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors.join("; ")) + } + } + + /// Registers a block root as awaiting custody columns. Used in tests to simulate + /// the effect of `continue_requests` without requiring a full network context. + #[cfg(test)] + pub fn register_custody_block(&mut self, block_root: Hash256) { + if let RangeBlockDataRequest::DataColumns { + custody_columns_by_root, + .. + } = &mut self.block_data_request + { + custody_columns_by_root.insert(block_root, DataColumnsState::Requesting); + } } /// Attempts to construct RPC blocks from all received components. @@ -208,8 +270,7 @@ impl RangeBlockComponentsRequest { return None; }; - // Increment the attempt once this function returns the response or errors - match &mut self.block_data_request { + match &self.block_data_request { RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( blocks.to_vec(), vec![], @@ -228,57 +289,23 @@ impl RangeBlockComponentsRequest { )) } RangeBlockDataRequest::DataColumns { - requests, + custody_columns_by_root, expected_custody_columns, - column_peers, - attempt, } => { - let mut data_columns = vec![]; - let mut column_to_peer_id: HashMap = HashMap::new(); - for req in requests.values() { - let Some(data) = req.to_finished() else { - return None; - }; - data_columns.extend(data.clone()) - } - - // An "attempt" is complete here after we have received a response for all the - // requests we made. i.e. `req.to_finished()` returns Some for all requests. - *attempt += 1; - - // Note: this assumes that only 1 peer is responsible for a column - // with a batch. - for (id, columns) in column_peers { - for column in columns { - column_to_peer_id.insert(*column, id.peer); - } + if custody_columns_by_root + .values() + .any(|s| matches!(s, DataColumnsState::Requesting)) + { + return None; } - let resp = Self::responses_with_custody_columns( + Some(Self::responses_with_custody_columns( blocks.to_vec(), - data_columns, - column_to_peer_id, + custody_columns_by_root.clone(), expected_custody_columns, - *attempt, da_checker, spec, - ); - - if let Err(CouplingError::DataColumnPeerFailure { - error: _, - faulty_peers, - exceeded_retries: _, - }) = &resp - { - for (_, peer) in faulty_peers.iter() { - // find the req id associated with the peer and - // delete it from the entries as we are going to make - // a separate attempt for those components. - requests.retain(|&k, _| k.peer != *peer); - } - } - - Some(resp) + )) } } } @@ -354,84 +381,50 @@ impl RangeBlockComponentsRequest { fn responses_with_custody_columns( blocks: Vec>>, - data_columns: DataColumnSidecarList, - column_to_peer: HashMap, + custody_columns_by_root: HashMap>, expects_custody_columns: &[ColumnIndex], - attempt: usize, da_checker: Arc>, spec: Arc, ) -> Result>, CouplingError> where T: BeaconChainTypes, { - // Group data columns by block_root and index - let mut data_columns_by_block = - HashMap::>>>::new(); - - for column in data_columns { - let block_root = column.block_root(); - let index = *column.index(); - if data_columns_by_block - .entry(block_root) - .or_default() - .insert(index, column) - .is_some() - { - // `DataColumnsByRangeRequestItems` ensures that we do not request any duplicated indices across all peers - // we request the data from. - // If there are duplicated indices, its likely a peer sending us the same index multiple times. - // However we can still proceed even if there are extra columns, just log an error. - debug!(?block_root, ?index, "Repeated column for block_root"); - continue; - } - } - - // Now iterate all blocks ensuring that the block roots of each block and data column match, - // plus we have columns for our custody requirements + let mut custody_columns_by_root = custody_columns_by_root; let mut rpc_blocks = Vec::with_capacity(blocks.len()); - let exceeded_retries = attempt >= MAX_COLUMN_RETRIES; for block in blocks { let block_root = get_block_root(&block); rpc_blocks.push(if block.num_expected_blobs() > 0 { - let Some(mut data_columns_by_index) = data_columns_by_block.remove(&block_root) + let Some(DataColumnsState::Complete(data_columns, _peer_group)) = + custody_columns_by_root.remove(&block_root) else { - let responsible_peers = column_to_peer.iter().map(|c| (*c.0, *c.1)).collect(); - return Err(CouplingError::DataColumnPeerFailure { - error: format!("No columns for block {block_root:?} with data"), - faulty_peers: responsible_peers, - exceeded_retries, - - }); + return Err(CouplingError::InternalError(format!( + "No columns for block {block_root:?} with data" + ))); }; + let mut data_columns_by_index = + HashMap::>>::new(); + for column in data_columns { + let index = *column.index(); + if data_columns_by_index.insert(index, column).is_some() { + debug!(?block_root, ?index, "Repeated column for block_root"); + } + } + let mut custody_columns = vec![]; - let mut naughty_peers = vec![]; for index in expects_custody_columns { - // Safe to convert to `CustodyDataColumn`: we have asserted that the index of - // this column is in the set of `expects_custody_columns` and with the expected - // block root, so for the expected epoch of this batch. if let Some(data_column) = data_columns_by_index.remove(index) { custody_columns.push(CustodyDataColumn::from_asserted_custody(data_column)); } else { - let Some(responsible_peer) = column_to_peer.get(index) else { - return Err(CouplingError::InternalError(format!("Internal error, no request made for column {}", index))); - }; - naughty_peers.push((*index, *responsible_peer)); + return Err(CouplingError::InternalError(format!( + "Missing custody column {index} for block {block_root:?}" + ))); } } - if !naughty_peers.is_empty() { - return Err(CouplingError::DataColumnPeerFailure { - error: format!("Peers did not return column for block_root {block_root:?} {naughty_peers:?}"), - faulty_peers: naughty_peers, - exceeded_retries - }); - } - // Assert that there are no columns left if !data_columns_by_index.is_empty() { let remaining_indices = data_columns_by_index.keys().collect::>(); - // log the error but don't return an error, we can still progress with extra columns. debug!( ?block_root, ?remaining_indices, @@ -439,22 +432,30 @@ impl RangeBlockComponentsRequest { ); } - let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); + let block_data = AvailableBlockData::new_with_data_columns( + custody_columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); RpcBlock::new(block, Some(block_data), &da_checker, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { // Block has no data, expects zero columns - RpcBlock::new(block, Some(AvailableBlockData::NoData), &da_checker, spec.clone()) - .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? + RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + &da_checker, + spec.clone(), + ) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? }); } // Assert that there are no columns left for other blocks - if !data_columns_by_block.is_empty() { - let remaining_roots = data_columns_by_block.keys().collect::>(); - // log the error but don't return an error, we can still progress with responses. - // this is most likely an internal error with overrequesting or a client bug. + if !custody_columns_by_root.is_empty() { + let remaining_roots = custody_columns_by_root.keys().collect::>(); debug!(?remaining_roots, "Not all columns consumed for block"); } @@ -486,25 +487,22 @@ impl ByRangeRequest { #[cfg(test)] mod tests { - use crate::sync::network_context::MAX_COLUMN_RETRIES; - use super::RangeBlockComponentsRequest; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::test_utils::{ NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_da_checker, test_spec, }; - use lighthouse_network::{ - PeerId, - service::api_types::{ - BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - DataColumnsByRangeRequestId, DataColumnsByRangeRequester, Id, RangeRequestId, - }, + use lighthouse_network::service::api_types::{ + BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, RangeRequestId, }; use rand::SeedableRng; - use std::{collections::HashMap, sync::Arc}; - use tracing::Span; - use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; + use std::sync::Arc; + use types::{ + Epoch, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng, + }; + + use crate::sync::network_context::PeerGroup; fn components_id() -> ComponentsByRangeRequestId { ComponentsByRangeRequestId { @@ -530,23 +528,6 @@ mod tests { } } - fn columns_id( - id: Id, - parent_request_id: DataColumnsByRangeRequester, - ) -> DataColumnsByRangeRequestId { - DataColumnsByRangeRequestId { - id, - parent_request_id, - peer: PeerId::random(), - } - } - - fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { - let spec = Arc::new(test_spec::()); - let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - info.responses(da_checker, spec).is_some() - } - #[test] fn no_blobs_into_responses() { let mut rng = XorShiftRng::from_seed([42; 16]); @@ -559,8 +540,7 @@ mod tests { .collect::>>>(); let blocks_req_id = blocks_id(components_id()); - let mut info = - RangeBlockComponentsRequest::::new(blocks_req_id, None, None, Span::none()); + let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, None); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -587,12 +567,8 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); let blobs_req_id = blobs_id(components_id); - let mut info = RangeBlockComponentsRequest::::new( - blocks_req_id, - Some(blobs_req_id), - None, - Span::none(), - ); + let mut info = + RangeBlockComponentsRequest::::new(blocks_req_id, Some(blobs_req_id), None); // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); @@ -633,52 +609,37 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expects_custody_columns - .iter() - .enumerate() - .map(|(i, column)| { - ( - columns_id( - i as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ), - vec![*column], - ) - }) - .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expects_custody_columns.clone())), - Span::none(), + Some(expects_custody_columns.clone()), ); - // Send blocks and complete terminate response + + // Send blocks info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); - // Assert response is not finished - assert!(!is_finished(&mut info)); - // Send data columns - for (i, &column_index) in expects_custody_columns.iter().enumerate() { - let (req, _columns) = columns_req_id.get(i).unwrap(); + // Register block roots as awaiting custody, then add columns + for (block, _) in &blocks { + let block_root = beacon_chain::get_block_root(block); + info.register_custody_block(block_root); + } + for (block, data_columns) in &blocks { + let block_root = beacon_chain::get_block_root(block); + let custody_columns: Vec<_> = data_columns + .iter() + .filter(|d| expects_custody_columns.contains(d.index())) + .cloned() + .collect(); info.add_custody_columns( - *req, - blocks - .iter() - .flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned()) - .collect(), + block_root, + custody_columns, + PeerGroup::from_set(Default::default()), ) .unwrap(); - - if i < expects_custody_columns.len() - 1 { - assert!( - !is_finished(&mut info), - "requested should not be finished at loop {i}" - ); - } } // All completed construct response @@ -686,325 +647,181 @@ mod tests { } #[test] - fn rpc_block_with_custody_columns_batched() { + fn add_custody_columns_rejects_unregistered_root() { + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, Some(vec![0, 1])); + let root = Hash256::random(); + let result = + info.add_custody_columns(root, vec![], PeerGroup::from_set(Default::default())); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("unregistered")); + } + + #[test] + fn add_custody_columns_rejects_duplicate() { + let components_id = components_id(); + let blocks_req_id = blocks_id(components_id); + let mut info = RangeBlockComponentsRequest::::new(blocks_req_id, None, Some(vec![0, 1])); + let root = Hash256::random(); + info.register_custody_block(root); + info.add_custody_columns(root, vec![], PeerGroup::from_set(Default::default())) + .unwrap(); + let result = + info.add_custody_columns(root, vec![], PeerGroup::from_set(Default::default())); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("duplicate")); + } + + #[test] + fn responses_returns_none_while_custody_requesting() { let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker + let expects_custody_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); - // Split sampling columns into two batches - let mid = expected_sampling_columns.len() / 2; - let batched_column_requests = [ - expected_sampling_columns[..mid].to_vec(), - expected_sampling_columns[mid..].to_vec(), - ]; - let custody_column_request_ids = - (0..batched_column_requests.len() as u32).collect::>(); - let num_of_data_column_requests = custody_column_request_ids.len(); + let mut rng = XorShiftRng::from_seed([42; 16]); + let (block, _data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = batched_column_requests - .iter() - .enumerate() - .map(|(i, columns)| { - ( - columns_id( - i as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ), - columns.clone(), - ) - }) - .collect::>(); - let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_sampling_columns.clone())), - Span::none(), + Some(expects_custody_columns), ); - let mut rng = XorShiftRng::from_seed([42; 16]); - let blocks = (0..4) - .map(|_| { - generate_rand_block_and_data_columns::( - ForkName::Fulu, - NumBlobs::Number(1), - &mut rng, - &spec, - ) - }) - .collect::>(); - - // Send blocks and complete terminate response - info.add_blocks( - blocks_req_id, - blocks.iter().map(|b| b.0.clone().into()).collect(), - ) - .unwrap(); - // Assert response is not finished - assert!(!is_finished(&mut info)); + info.add_blocks(blocks_req_id, vec![block.into()]).unwrap(); + let block_root = Hash256::random(); + info.register_custody_block(block_root); - for (i, column_indices) in batched_column_requests.iter().enumerate() { - let (req, _columns) = columns_req_id.get(i).unwrap(); - // Send the set of columns in the same batch request - info.add_custody_columns( - *req, - blocks - .iter() - .flat_map(|b| { - b.1.iter() - .filter(|d| column_indices.contains(d.index())) - .cloned() - }) - .collect::>(), - ) - .unwrap(); - - if i < num_of_data_column_requests - 1 { - assert!( - !is_finished(&mut info), - "requested should not be finished at loop {i}" - ); - } - } - - // All completed construct response - info.responses(da_checker, spec).unwrap().unwrap(); + // Still requesting — responses should return None + assert!(info.responses(da_checker, spec).is_none()); } #[test] - fn missing_custody_columns_from_faulty_peers() { - // GIVEN: A request expecting sampling columns from multiple peers - let spec = Arc::new(test_spec::()); + fn responses_error_on_missing_custody_columns() { + // Block with data but no custody columns registered → error + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker + let expects_custody_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); - let blocks = (0..2) - .map(|_| { - generate_rand_block_and_data_columns::( - ForkName::Fulu, - NumBlobs::Number(1), - &mut rng, - &spec, - ) - }) - .collect::>(); + let (block, _data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_sampling_columns - .iter() - .enumerate() - .map(|(i, column)| { - ( - columns_id( - i as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ), - vec![*column], - ) - }) - .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_sampling_columns.clone())), - Span::none(), + Some(expects_custody_columns), ); - // AND: All blocks are received successfully - info.add_blocks( - blocks_req_id, - blocks.iter().map(|b| b.0.clone().into()).collect(), - ) - .unwrap(); + info.add_blocks(blocks_req_id, vec![block.into()]).unwrap(); - // AND: Only the first 2 sampling columns are received successfully - for (i, &column_index) in expected_sampling_columns.iter().take(2).enumerate() { - let (req, _columns) = columns_req_id.get(i).unwrap(); - info.add_custody_columns( - *req, - blocks - .iter() - .flat_map(|b| b.1.iter().filter(|d| *d.index() == column_index).cloned()) - .collect(), - ) - .unwrap(); - } - - // AND: Remaining column requests are completed with empty data (simulating faulty peers) - for i in 2..expected_sampling_columns.len() { - let (req, _columns) = columns_req_id.get(i).unwrap(); - info.add_custody_columns(*req, vec![]).unwrap(); - } - - // WHEN: Attempting to construct RPC blocks + // No custody columns registered or completed — block has data so this should error let result = info.responses(da_checker, spec).unwrap(); - - // THEN: Should fail with PeerFailure identifying the faulty peers assert!(result.is_err()); - if let Err(super::CouplingError::DataColumnPeerFailure { - error, - faulty_peers, - exceeded_retries, - }) = result - { - assert!(error.contains("Peers did not return column")); - // All columns after the first 2 should be reported as faulty - let expected_faulty_count = expected_sampling_columns.len() - 2; - assert_eq!(faulty_peers.len(), expected_faulty_count); - // Verify the faulty column indices match - for (i, (column_index, _peer)) in faulty_peers.iter().enumerate() { - assert_eq!(*column_index, expected_sampling_columns[i + 2]); - } - assert!(!exceeded_retries); // First attempt, should be false - } else { - panic!("Expected PeerFailure error"); - } } #[test] - fn retry_logic_after_peer_failures() { - // GIVEN: A request expecting sampling columns where some peers initially fail + fn mixed_blocks_with_and_without_data() { + // Mix of blocks: some with blobs (need custody), some without let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker + let expects_custody_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); - let blocks = (0..2) - .map(|_| { - generate_rand_block_and_data_columns::( - ForkName::Fulu, - NumBlobs::Number(1), - &mut rng, - &spec, - ) - }) - .collect::>(); + + let (block_with_data, data_columns) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::Number(1), + &mut rng, + &spec, + ); + let (block_no_data, _) = generate_rand_block_and_data_columns::( + ForkName::Fulu, + NumBlobs::None, + &mut rng, + &spec, + ); let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_sampling_columns - .iter() - .enumerate() - .map(|(i, column)| { - ( - columns_id( - i as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ), - vec![*column], - ) - }) - .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_sampling_columns.clone())), - Span::none(), + Some(expects_custody_columns.clone()), ); - // AND: All blocks are received + let block_root_with_data = beacon_chain::get_block_root(&block_with_data); info.add_blocks( blocks_req_id, - blocks.iter().map(|b| b.0.clone().into()).collect(), - ) - .unwrap(); - - // AND: Only partial sampling columns are received (first column but not others) - let (req0, _) = columns_req_id.first().unwrap(); - info.add_custody_columns( - *req0, - blocks - .iter() - .flat_map(|b| { - b.1.iter() - .filter(|d| *d.index() == expected_sampling_columns[0]) - .cloned() - }) - .collect(), + vec![block_with_data.into(), block_no_data.into()], ) .unwrap(); - // AND: The remaining column requests are completed with empty data (peer failure) - for i in 1..expected_sampling_columns.len() { - let (req, _) = columns_req_id.get(i).unwrap(); - info.add_custody_columns(*req, vec![]).unwrap(); - } - - let result: Result< - Vec>, - crate::sync::block_sidecar_coupling::CouplingError, - > = info.responses(da_checker.clone(), spec.clone()).unwrap(); - assert!(result.is_err()); - - // AND: We retry with a new peer for the failed columns - let new_columns_req_id = columns_id( - 10 as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ); - for column in &expected_sampling_columns[1..] { - let failed_column_requests = vec![(new_columns_req_id, vec![*column])]; - info.reinsert_failed_column_requests(failed_column_requests) - .unwrap(); - } - - // AND: The new peer provides the missing column data - let failed_column_indices: Vec<_> = expected_sampling_columns[1..].to_vec(); + // Only register and complete custody for the block with data + info.register_custody_block(block_root_with_data); + let custody_columns: Vec<_> = data_columns + .iter() + .filter(|d| expects_custody_columns.contains(d.index())) + .cloned() + .collect(); info.add_custody_columns( - new_columns_req_id, - blocks - .iter() - .flat_map(|b| { - b.1.iter() - .filter(|d| failed_column_indices.contains(d.index())) - .cloned() - }) - .collect(), + block_root_with_data, + custody_columns, + PeerGroup::from_set(Default::default()), ) .unwrap(); - // WHEN: Attempting to get responses again - let result = info.responses(da_checker, spec).unwrap(); - - // THEN: Should succeed with complete RPC blocks - assert!(result.is_ok()); - let rpc_blocks = result.unwrap(); - assert_eq!(rpc_blocks.len(), 2); + // Both blocks should resolve — one with columns, one with NoData + info.responses(da_checker, spec).unwrap().unwrap(); } #[test] - fn max_retries_exceeded_behavior() { - // GIVEN: A request where peers consistently fail to provide required columns + fn rpc_block_with_custody_columns_no_data_blocks() { + // Test blocks that don't have blob commitments don't need custody let mut spec = test_spec::(); spec.deneb_fork_epoch = Some(Epoch::new(0)); spec.fulu_fork_epoch = Some(Epoch::new(0)); let spec = Arc::new(spec); let da_checker = Arc::new(test_da_checker(spec.clone(), NodeCustodyType::Fullnode)); - let expected_sampling_columns = da_checker + let expects_custody_columns = da_checker .custody_context() .sampling_columns_for_epoch(Epoch::new(0), &spec) .to_vec(); let mut rng = XorShiftRng::from_seed([42; 16]); - let blocks = (0..1) + // Generate blocks with NO blobs + let blocks = (0..4) .map(|_| { generate_rand_block_and_data_columns::( ForkName::Fulu, - NumBlobs::Number(1), + NumBlobs::None, &mut rng, &spec, ) @@ -1013,92 +830,20 @@ mod tests { let components_id = components_id(); let blocks_req_id = blocks_id(components_id); - let columns_req_id = expected_sampling_columns - .iter() - .enumerate() - .map(|(i, column)| { - ( - columns_id( - i as Id, - DataColumnsByRangeRequester::ComponentsByRange(components_id), - ), - vec![*column], - ) - }) - .collect::>(); let mut info = RangeBlockComponentsRequest::::new( blocks_req_id, None, - Some((columns_req_id.clone(), expected_sampling_columns.clone())), - Span::none(), + Some(expects_custody_columns), ); - // AND: All blocks are received + // Send blocks info.add_blocks( blocks_req_id, blocks.iter().map(|b| b.0.clone().into()).collect(), ) .unwrap(); - // AND: Only the first sampling column is provided successfully - let (req0, _) = columns_req_id.first().unwrap(); - info.add_custody_columns( - *req0, - blocks - .iter() - .flat_map(|b| { - b.1.iter() - .filter(|d| *d.index() == expected_sampling_columns[0]) - .cloned() - }) - .collect(), - ) - .unwrap(); - - // AND: All other column requests complete with empty data (persistent peer failure) - for i in 1..expected_sampling_columns.len() { - let (req, _) = columns_req_id.get(i).unwrap(); - info.add_custody_columns(*req, vec![]).unwrap(); - } - - // WHEN: Multiple retry attempts are made (up to max retries) - for _ in 0..MAX_COLUMN_RETRIES { - let result = info.responses(da_checker.clone(), spec.clone()).unwrap(); - assert!(result.is_err()); - - if let Err(super::CouplingError::DataColumnPeerFailure { - exceeded_retries, .. - }) = &result - && *exceeded_retries - { - break; - } - } - - // AND: One final attempt after exceeding max retries - let result = info.responses(da_checker, spec).unwrap(); - - // THEN: Should fail with exceeded_retries = true - assert!(result.is_err()); - if let Err(super::CouplingError::DataColumnPeerFailure { - error: _, - faulty_peers, - exceeded_retries, - }) = result - { - // All columns except the first one should be faulty - let expected_faulty_count = expected_sampling_columns.len() - 1; - assert_eq!(faulty_peers.len(), expected_faulty_count); - - let mut faulty_peers = faulty_peers.into_iter().collect::>(); - // Only the columns that failed (indices 1..N) should be in faulty_peers - for column in &expected_sampling_columns[1..] { - faulty_peers.remove(column); - } - assert!(faulty_peers.is_empty()); - assert!(exceeded_retries); // Should be true after max retries - } else { - panic!("Expected PeerFailure error with exceeded_retries=true"); - } + // Response should be ready immediately (no blocks need custody columns) + info.responses(da_checker, spec).unwrap().unwrap(); } } diff --git a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs index fe4c7dfe4cc..5e0d10ace6c 100644 --- a/beacon_node/network/src/sync/custody_backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/custody_backfill_sync/mod.rs @@ -598,7 +598,6 @@ impl CustodyBackFillSync { && let CouplingError::DataColumnPeerFailure { error, faulty_peers, - exceeded_retries: _, } = coupling_error { for (column_index, faulty_peer) in faulty_peers { @@ -858,7 +857,7 @@ impl CustodyBackFillSync { // The batch is validated } BatchState::Poisoned => unreachable!("Poisoned batch"), - BatchState::Failed | BatchState::Processing(_) => { + BatchState::Failed | BatchState::Processing(..) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Columns should have been removed // - AwaitingDownload -> A recoverable failed batch should have been @@ -908,7 +907,7 @@ impl CustodyBackFillSync { crit!("Batch indicates inconsistent data columns while advancing custody sync") } BatchState::AwaitingProcessing(..) => {} - BatchState::Processing(_) => { + BatchState::Processing(..) => { debug!(batch = %id, %batch, "Advancing custody sync while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id >= processing_id diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7e618d89808..1cc04503cc1 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -1250,7 +1250,7 @@ impl SyncManager { if let Some(resp) = self.network.on_blocks_by_range_response(id, peer_id, block) { self.on_range_components_response( id.parent_request_id, - peer_id, + Some(peer_id), RangeBlockComponent::Block(id, resp), ); } @@ -1265,7 +1265,7 @@ impl SyncManager { if let Some(resp) = self.network.on_blobs_by_range_response(id, peer_id, blob) { self.on_range_components_response( id.parent_request_id, - peer_id, + Some(peer_id), RangeBlockComponent::Blob(id, resp), ); } @@ -1281,22 +1281,9 @@ impl SyncManager { .network .on_data_columns_by_range_response(id, peer_id, data_column) { - match id.parent_request_id { - DataColumnsByRangeRequester::ComponentsByRange(components_by_range_req_id) => { - self.on_range_components_response( - components_by_range_req_id, - peer_id, - RangeBlockComponent::CustodyColumns(id, resp), - ); - } - DataColumnsByRangeRequester::CustodyBackfillSync(custody_backfill_req_id) => self - .on_custody_backfill_columns_response( - custody_backfill_req_id, - id, - peer_id, - resp, - ), - } + let DataColumnsByRangeRequester::CustodyBackfillSync(custody_backfill_req_id) = + id.parent_request_id; + self.on_custody_backfill_columns_response(custody_backfill_req_id, id, peer_id, resp); } } @@ -1305,12 +1292,31 @@ impl SyncManager { requester: CustodyRequester, response: CustodyByRootResult, ) { - self.block_lookups - .on_download_response::>( - requester.0, - response, - &mut self.network, - ); + match requester { + CustodyRequester::SingleLookup(id) => { + self.block_lookups + .on_download_response::>( + id, + response, + &mut self.network, + ); + } + CustodyRequester::RangeSync(range_id) => { + // Route custody-by-root results through the standard range components + // response path, reusing the same dispatch to range_sync / backfill. + let peer_group = response + .as_ref() + .ok() + .map(|(_, peer_group, _)| peer_group.clone()) + .unwrap_or_else(|| PeerGroup::from_set(Default::default())); + let peer_id = peer_group.all().next().copied(); + self.on_range_components_response( + range_id.id, + peer_id, + RangeBlockComponent::CustodyResult(range_id.block_root, response, peer_group), + ); + } + } } /// Handles receiving a response for a range sync request that should have both blocks and @@ -1318,15 +1324,19 @@ impl SyncManager { fn on_range_components_response( &mut self, range_request_id: ComponentsByRangeRequestId, - peer_id: PeerId, + peer_id: Option, range_block_component: RangeBlockComponent, ) { + let is_block_response = matches!(range_block_component, RangeBlockComponent::Block(..)); + if let Some(resp) = self .network .range_block_component_response(range_request_id, range_block_component) { match resp { Ok(blocks) => { + // Success path: peer_id should always be available + let peer_id = peer_id.unwrap_or(PeerId::random()); match range_request_id.requester { RangeRequestId::RangeSync { chain_id, batch_id } => { self.range_sync.blocks_by_range_response( @@ -1374,7 +1384,7 @@ impl SyncManager { match self.backfill_sync.inject_error( &mut self.network, batch_id, - &peer_id, + peer_id.as_ref(), range_request_id.id, e, ) { @@ -1384,6 +1394,13 @@ impl SyncManager { } }, } + } else if is_block_response { + // Blocks arrived but columns are still pending. Notify the chain so it can + // start downloading the next batch (the block download gate is now lifted). + if let RangeRequestId::RangeSync { chain_id, .. } = range_request_id.requester { + self.range_sync + .trigger_batch_downloads(&mut self.network, chain_id); + } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 7e2c0d9a94c..2159dc10487 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -50,8 +50,8 @@ use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::data::FixedBlobSidecarList; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, + EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; pub mod custody; @@ -69,9 +69,6 @@ macro_rules! new_range_request_span { }}; } -/// Max retries for block components after which we fail the batch. -pub const MAX_COLUMN_RETRIES: usize = 3; - #[derive(Debug)] pub enum RpcEvent { StreamTermination, @@ -245,10 +242,9 @@ pub enum RangeBlockComponent { BlobsByRangeRequestId, RpcResponseResult>>>, ), - CustodyColumns( - DataColumnsByRangeRequestId, - RpcResponseResult>>>, - ), + /// Custody-by-root result for a specific block root. Arrives after blocks and carries + /// the columns fetched via ActiveCustodyRequest. + CustodyResult(Hash256, CustodyByRootResult, PeerGroup), } #[cfg(test)] @@ -451,104 +447,21 @@ impl SyncNetworkContext { active_request_count_by_peer } - /// Retries only the specified failed columns by requesting them again. - /// - /// Note: This function doesn't retry the whole batch, but retries specific requests within - /// the batch. - pub fn retry_columns_by_range( - &mut self, - id: Id, - peers: &HashSet, - peers_to_deprioritize: &HashSet, - request: BlocksByRangeRequest, - failed_columns: &HashSet, - ) -> Result<(), String> { - let Some((requester, parent_request_span)) = self - .components_by_range_requests - .iter() - .find_map(|(key, value)| { - if key.id == id { - Some((key.requester, value.request_span.clone())) - } else { - None - } - }) - else { - return Err("request id not present".to_string()); - }; - - let active_request_count_by_peer = self.active_request_count_by_peer(); - - debug!( - ?failed_columns, - ?id, - ?requester, - "Retrying only failed column requests from other peers" - ); - - // Attempt to find all required custody peers to request the failed columns from - let columns_by_range_peers_to_request = self - .select_columns_by_range_peers_to_request( - failed_columns, - peers, - active_request_count_by_peer, - peers_to_deprioritize, - ) - .map_err(|e| format!("{:?}", e))?; - - // Reuse the id for the request that received partially correct responses - let id = ComponentsByRangeRequestId { id, requester }; - - let data_column_requests = columns_by_range_peers_to_request - .into_iter() - .map(|(peer_id, columns)| { - self.send_data_columns_by_range_request( - peer_id, - DataColumnsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - columns, - }, - DataColumnsByRangeRequester::ComponentsByRange(id), - new_range_request_span!( - self, - "outgoing_columns_by_range_retry", - parent_request_span.clone(), - peer_id - ), - ) - }) - .collect::, _>>() - .map_err(|e| format!("{:?}", e))?; - - // instead of creating a new `RangeBlockComponentsRequest`, we reinsert - // the new requests created for the failed requests - let Some(range_request) = self.components_by_range_requests.get_mut(&id) else { - return Err( - "retrying custody request for range request that does not exist".to_string(), - ); - }; - - range_request.reinsert_failed_column_requests(data_column_requests)?; - Ok(()) - } - - /// A blocks by range request sent by the range sync algorithm + /// A blocks by range request sent by the range sync algorithm. + /// For Fulu+ epochs (BlocksAndColumns), only sends BlocksByRange upfront. + /// Custody-by-root requests for columns are initiated after blocks arrive. pub fn block_components_by_range_request( &mut self, batch_type: ByRangeRequestType, request: BlocksByRangeRequest, requester: RangeRequestId, block_peers: &HashSet, - column_peers: &HashSet, - peers_to_deprioritize: &HashSet, ) -> Result { let range_request_span = debug_span!( parent: None, "lh_outgoing_range_request", range_req_id = %requester, block_peers = block_peers.len(), - column_peers = column_peers.len() ); let _guard = range_request_span.clone().entered(); let active_request_count_by_peer = self.active_request_count_by_peer(); @@ -557,8 +470,6 @@ impl SyncNetworkContext { .iter() .map(|peer| { ( - // If contains -> 1 (order after), not contains -> 0 (order first) - peers_to_deprioritize.contains(peer), // Prefer peers with less overall requests active_request_count_by_peer.get(peer).copied().unwrap_or(0), // Random factor to break ties, otherwise the PeerID breaks ties @@ -567,7 +478,7 @@ impl SyncNetworkContext { ) }) .min() - .map(|(_, _, _, peer)| *peer) + .map(|(_, _, peer)| *peer) else { // Backfill and forward sync handle this condition gracefully. // - Backfill sync: will pause waiting for more peers to join @@ -575,26 +486,6 @@ impl SyncNetworkContext { return Err(RpcRequestSendError::NoPeer(NoPeerError::BlockPeer)); }; - // Attempt to find all required custody peers before sending any request or creating an ID - let columns_by_range_peers_to_request = - if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) { - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let column_indexes = self - .chain - .sampling_columns_for_epoch(epoch) - .iter() - .cloned() - .collect(); - Some(self.select_columns_by_range_peers_to_request( - &column_indexes, - column_peers, - active_request_count_by_peer, - peers_to_deprioritize, - )?) - } else { - None - }; - // Create the overall components_by_range request ID before its individual components let id = ComponentsByRangeRequestId { id: self.next_id(), @@ -632,48 +523,34 @@ impl SyncNetworkContext { None }; - let data_column_requests = columns_by_range_peers_to_request - .map(|columns_by_range_peers_to_request| { - columns_by_range_peers_to_request - .into_iter() - .map(|(peer_id, columns)| { - self.send_data_columns_by_range_request( - peer_id, - DataColumnsByRangeRequest { - start_slot: *request.start_slot(), - count: *request.count(), - columns, - }, - DataColumnsByRangeRequester::ComponentsByRange(id), - new_range_request_span!( - self, - "outgoing_columns_by_range", - range_request_span.clone(), - peer_id - ), - ) - }) - .collect::, _>>() - }) - .transpose()?; + // For BlocksAndColumns, compute expected custody columns but don't send requests yet. + // Custody-by-root requests will be initiated after blocks arrive. + let expects_custody_columns = if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) + { + let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); + Some(self.chain.sampling_columns_for_epoch(epoch).to_vec()) + } else { + None + }; - let epoch = Slot::new(*request.start_slot()).epoch(T::EthSpec::slots_per_epoch()); - let info = RangeBlockComponentsRequest::new( - blocks_req_id, - blobs_req_id, - data_column_requests.map(|data_column_requests| { - ( - data_column_requests, - self.chain.sampling_columns_for_epoch(epoch).to_vec(), - ) - }), - range_request_span, - ); + let info = + RangeBlockComponentsRequest::new(blocks_req_id, blobs_req_id, expects_custody_columns); self.components_by_range_requests.insert(id, info); Ok(id.id) } + /// Returns true if there is a pending range sync request for the given chain where + /// blocks have not yet been received. Used to serialize block downloads for PeerDAS. + pub fn has_pending_block_range_download(&self, chain_id: Id) -> bool { + self.components_by_range_requests.iter().any(|(id, req)| { + matches!( + id.requester, + RangeRequestId::RangeSync { chain_id: cid, .. } if cid == chain_id + ) && !req.blocks_received() + }) + } + fn select_columns_by_range_peers_to_request( &self, custody_indexes: &HashSet, @@ -729,80 +606,61 @@ impl SyncNetworkContext { Ok(columns_to_request_by_peer) } - /// Received a blocks by range or blobs by range response for a request that couples blocks ' - /// and blobs. + /// Received a blocks by range, blobs by range, or custody-by-root response for a request + /// that couples blocks with their data. The coupling struct handles initiating custody-by-root + /// requests when blocks arrive. pub fn range_block_component_response( &mut self, id: ComponentsByRangeRequestId, range_block_component: RangeBlockComponent, ) -> Option>, RpcResponseError>> { - let Entry::Occupied(mut entry) = self.components_by_range_requests.entry(id) else { - metrics::inc_counter_vec(&metrics::SYNC_UNKNOWN_NETWORK_REQUESTS, &["range_blocks"]); - return None; - }; - - if let Err(e) = { - let request = entry.get_mut(); - match range_block_component { - RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { - request.add_blocks(req_id, blocks).map_err(|e| { - RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( - e, - )) - }) - }), - RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { - request.add_blobs(req_id, blobs).map_err(|e| { - RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError( - e, - )) - }) - }), - RangeBlockComponent::CustodyColumns(req_id, resp) => { - resp.and_then(|(custody_columns, _)| { - request - .add_custody_columns(req_id, custody_columns) - .map_err(|e| { - RpcResponseError::BlockComponentCouplingError( - CouplingError::InternalError(e), - ) - }) - }) - } + // Remove from map to allow passing &mut self to continue_requests + let mut request = self.components_by_range_requests.remove(&id)?; + + // Add the incoming component + let add_result = match range_block_component { + RangeBlockComponent::Block(req_id, resp) => resp.and_then(|(blocks, _)| { + request.add_blocks(req_id, blocks).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(e)) + }) + }), + RangeBlockComponent::Blob(req_id, resp) => resp.and_then(|(blobs, _)| { + request.add_blobs(req_id, blobs).map_err(|e| { + RpcResponseError::BlockComponentCouplingError(CouplingError::InternalError(e)) + }) + }), + RangeBlockComponent::CustodyResult(block_root, resp, peer_group) => { + resp.and_then(|(columns, _peer_group, _seen_timestamp)| { + request + .add_custody_columns(block_root, columns, peer_group) + .map_err(|e| { + RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ) + }) + }) } - } { - entry.remove(); + }; + if let Err(e) = add_result { return Some(Err(e)); } - let range_req = entry.get_mut(); - if let Some(blocks_result) = range_req.responses( + // Let the coupling struct initiate any follow-up requests (custody-by-root) + if let Err(e) = request.continue_requests(id, self) { + return Some(Err(RpcResponseError::BlockComponentCouplingError( + CouplingError::InternalError(e), + ))); + } + + // Check if all components have arrived + if let Some(blocks_result) = request.responses( self.chain.data_availability_checker.clone(), self.chain.spec.clone(), ) { - if let Err(CouplingError::DataColumnPeerFailure { - error, - faulty_peers: _, - exceeded_retries, - }) = &blocks_result - { - // Remove the entry if it's a peer failure **and** retry counter is exceeded - if *exceeded_retries { - debug!( - entry=?entry.key(), - msg = error, - "Request exceeded max retries, failing batch" - ); - entry.remove(); - }; - } else { - // also remove the entry only if it coupled successfully - // or if it isn't a column peer failure. - entry.remove(); - } - // If the request is finished, dequeue everything Some(blocks_result.map_err(RpcResponseError::BlockComponentCouplingError)) } else { + // Re-insert — still waiting for more components + self.components_by_range_requests.insert(id, request); None } } @@ -1078,26 +936,36 @@ impl SyncNetworkContext { /// any request to the network if no columns have to be fetched based on the import state of the /// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root` /// requests. + /// + /// The caller provides `custody_indexes_to_fetch` — the set of column indices needed. For + /// single lookups this should be derived from the current epoch minus already-imported columns. + /// For range sync this should use the batch epoch columns (already computed at request creation). + /// Initiate a custody-by-root request for the given block root. + /// + /// When `ignore_cache` is true, the DA checker cache is not consulted and all custody + /// columns are fetched. This is used by range sync where blocks are historical and + /// won't have gossip-imported columns in the cache. pub fn custody_lookup_request( &mut self, - lookup_id: SingleLookupId, + requester: CustodyRequester, block_root: Hash256, + block_epoch: Epoch, + ignore_cache: bool, lookup_peers: Arc>>, ) -> Result { - let custody_indexes_imported = self - .chain - .data_availability_checker - .cached_data_column_indexes(&block_root) - .unwrap_or_default(); - - let current_epoch = self.chain.epoch().map_err(|e| { - RpcRequestSendError::InternalError(format!("Unable to read slot clock {:?}", e)) - })?; + let custody_indexes_imported = if ignore_cache { + Default::default() + } else { + self.chain + .data_availability_checker + .cached_data_column_indexes(&block_root) + .unwrap_or_default() + }; - // Include only the blob indexes not yet imported (received through gossip) + // Include only the column indexes not yet imported (received through gossip) let mut custody_indexes_to_fetch = self .chain - .sampling_columns_for_epoch(current_epoch) + .sampling_columns_for_epoch(block_epoch) .iter() .copied() .filter(|index| !custody_indexes_imported.contains(index)) @@ -1109,19 +977,21 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), - }; - debug!( ?block_root, indices = ?custody_indexes_to_fetch, - %id, + %requester, "Starting custody columns request" ); - let requester = CustodyRequester(id); + // Extract the caller-allocated req_id before continue_requests() increments + // self.request_id internally. For single lookups, the caller stores this req_id in + // State::Downloading and later matches it against response req_ids. + let caller_req_id = match &requester { + CustodyRequester::SingleLookup(id) => id.req_id, + CustodyRequester::RangeSync(id) => id.id.id, + }; + let mut request = ActiveCustodyRequest::new( block_root, CustodyId { requester }, @@ -1136,7 +1006,7 @@ impl SyncNetworkContext { // created cannot return data immediately, it must send some request to the network // first. And there must exist some request, `custody_indexes_to_fetch` is not empty. self.custody_by_root_requests.insert(requester, request); - Ok(LookupRequestResult::RequestSent(id.req_id)) + Ok(LookupRequestResult::RequestSent(caller_req_id)) } Err(e) => Err(match e { CustodyRequestError::NoPeer(column_index) => { diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs index ae0eee99648..b9bac9bce7b 100644 --- a/beacon_node/network/src/sync/network_context/custody.rs +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -45,8 +45,8 @@ pub enum Error { /// There should only exist a single request at a time. Having multiple requests is a bug and /// can result in undefined state, so it's treated as a hard error and the lookup is dropped. UnexpectedRequestId { - expected_req_id: DataColumnsByRootRequestId, - req_id: DataColumnsByRootRequestId, + expected_req_id: Box, + req_id: Box, }, } @@ -441,8 +441,8 @@ impl ColumnRequest { Status::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(Error::UnexpectedRequestId { - expected_req_id: *expected_req_id, - req_id, + expected_req_id: Box::new(*expected_req_id), + req_id: Box::new(req_id), }); } self.status = Status::NotStarted(Instant::now()); @@ -474,8 +474,8 @@ impl ColumnRequest { Status::Downloading(expected_req_id) => { if req_id != *expected_req_id { return Err(Error::UnexpectedRequestId { - expected_req_id: *expected_req_id, - req_id, + expected_req_id: Box::new(*expected_req_id), + req_id: Box::new(req_id), }); } self.status = Status::Downloaded(peer_id, data_column, seen_timestamp); diff --git a/beacon_node/network/src/sync/range_data_column_batch_request.rs b/beacon_node/network/src/sync/range_data_column_batch_request.rs index 4a6987a752b..95f4ca2af2f 100644 --- a/beacon_node/network/src/sync/range_data_column_batch_request.rs +++ b/beacon_node/network/src/sync/range_data_column_batch_request.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, HashSet}; use crate::sync::block_sidecar_coupling::{ByRangeRequest, CouplingError}; -use crate::sync::network_context::MAX_COLUMN_RETRIES; use beacon_chain::{BeaconChain, BeaconChainTypes}; use itertools::Itertools; use lighthouse_network::PeerId; @@ -105,7 +104,6 @@ impl RangeDataColumnBatchRequest { if let Err(CouplingError::DataColumnPeerFailure { error: _, faulty_peers, - exceeded_retries: _, }) = &resp { for (_, peer) in faulty_peers.iter() { @@ -123,7 +121,7 @@ impl RangeDataColumnBatchRequest { mut received_columns_for_slot: HashMap>, column_to_peer: HashMap, expected_custody_columns: &HashSet, - attempt: usize, + _attempt: usize, ) -> Result, CouplingError> { let mut naughty_peers = vec![]; let mut result: DataColumnSidecarList = vec![]; @@ -297,7 +295,6 @@ impl RangeDataColumnBatchRequest { return Err(CouplingError::DataColumnPeerFailure { error: "Bad or missing columns for some slots".to_string(), faulty_peers: naughty_peers, - exceeded_retries: attempt >= MAX_COLUMN_RETRIES, }); } diff --git a/beacon_node/network/src/sync/range_sync/chain.rs b/beacon_node/network/src/sync/range_sync/chain.rs index e3ff638121f..6557bc95dbc 100644 --- a/beacon_node/network/src/sync/range_sync/chain.rs +++ b/beacon_node/network/src/sync/range_sync/chain.rs @@ -19,7 +19,7 @@ use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use strum::IntoStaticStr; use tracing::{Span, debug, error, instrument, warn}; -use types::{ColumnIndex, Epoch, EthSpec, Hash256, Slot}; +use types::{Epoch, EthSpec, Hash256, Slot}; /// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of /// blocks per batch are requested _at most_. A batch may request less blocks to account for @@ -302,6 +302,9 @@ impl SyncingChain { // TODO(das): should use peer group here https://github.com/sigp/lighthouse/issues/6258 let received = blocks.len(); + if let Some(duration) = batch.time_since_downloading() { + metrics::observe_duration(&metrics::SYNCING_CHAIN_BATCH_DOWNLOADING, duration); + } batch.download_completed(blocks, *peer_id)?; let awaiting_batches = batch_id .saturating_sub(self.optimistic_start.unwrap_or(self.processing_target)) @@ -352,6 +355,15 @@ impl SyncingChain { &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING, duration_in_awaiting_processing, ); + let awaiting_processing_count = self + .batches + .values() + .filter(|b| matches!(b.state(), BatchState::AwaitingProcessing(..))) + .count(); + metrics::observe( + &metrics::SYNCING_CHAIN_BATCH_AWAITING_PROCESSING_COUNT, + awaiting_processing_count as f64, + ); let process_id = ChainSegmentProcessId::RangeBatchId(self.id, batch_id); self.current_processing_batch = Some(batch_id); @@ -401,7 +413,7 @@ impl SyncingChain { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(KeepChain), - BatchState::Processing(_) | BatchState::Failed => { + BatchState::Processing(..) | BatchState::Failed => { // these are all inconsistent states: // - Processing -> `self.current_processing_batch` is None // - Failed -> non recoverable batch. For an optimistic batch, it should @@ -438,7 +450,7 @@ impl SyncingChain { // Batches can be in `AwaitingDownload` state if there weren't good data column subnet // peers to send the request to. BatchState::AwaitingDownload => return Ok(KeepChain), - BatchState::Failed | BatchState::Processing(_) => { + BatchState::Failed | BatchState::Processing(..) => { // these are all inconsistent states: // - Failed -> non recoverable batch. Chain should have been removed // - AwaitingDownload -> A recoverable failed batch should have been @@ -524,6 +536,10 @@ impl SyncingChain { )) })?; + if let Some(duration) = batch.time_since_processing() { + metrics::observe_duration(&metrics::SYNCING_CHAIN_BATCH_PROCESSING, duration); + } + // Log the process result and the batch for debugging purposes. debug!( result = ?result, @@ -741,7 +757,7 @@ impl SyncingChain { crit!("batch indicates inconsistent chain state while advancing chain") } BatchState::AwaitingProcessing(..) => {} - BatchState::Processing(_) => { + BatchState::Processing(..) => { debug!(batch = %id, %batch, "Advancing chain while processing a batch"); if let Some(processing_id) = self.current_processing_batch && id <= processing_id @@ -901,7 +917,7 @@ impl SyncingChain { &mut self, network: &mut SyncNetworkContext, batch_id: BatchId, - peer_id: &PeerId, + peer_id: Option<&PeerId>, request_id: Id, err: RpcResponseError, ) -> ProcessingResult { @@ -910,51 +926,15 @@ impl SyncingChain { if let Some(batch) = self.batches.get_mut(&batch_id) { if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err { match coupling_error { - CouplingError::DataColumnPeerFailure { - error, - faulty_peers, - exceeded_retries, - } => { - debug!(?batch_id, error, "Block components coupling error"); - // Note: we don't fail the batch here because a `CouplingError` is - // recoverable by requesting from other honest peers. - let mut failed_columns = HashSet::new(); - let mut failed_peers = HashSet::new(); - for (column, peer) in faulty_peers { - failed_columns.insert(*column); - failed_peers.insert(*peer); - } - // Retry the failed columns if the column requests haven't exceeded the - // max retries. Otherwise, remove treat it as a failed batch below. - if !*exceeded_retries { - // Set the batch back to `AwaitingDownload` before retrying. - // This is to ensure that the batch doesn't get stuck in `Downloading` state. - // - // DataColumn retries has a retry limit so calling `downloading_to_awaiting_download` - // is safe. - if let BatchOperationOutcome::Failed { blacklist } = - batch.downloading_to_awaiting_download()? - { - return Err(RemoveChain::ChainFailed { - blacklist, - failing_batch: batch_id, - }); - } - return self.retry_partial_batch( - network, - batch_id, - request_id, - failed_columns, - failed_peers, - ); - } - } CouplingError::BlobPeerFailure(msg) => { debug!(?batch_id, msg, "Blob peer failure"); } CouplingError::InternalError(msg) => { error!(?batch_id, msg, "Block components coupling internal error"); } + CouplingError::DataColumnPeerFailure { error, .. } => { + debug!(?batch_id, error, "Data column peer failure"); + } } } // A batch could be retried without the peer failing the request (disconnecting/ @@ -964,7 +944,7 @@ impl SyncingChain { debug!( batch_epoch = %batch_id, batch_state = ?batch.state(), - %peer_id, + ?peer_id, %request_id, ?batch_state, "Batch not expecting block" @@ -975,13 +955,12 @@ impl SyncingChain { batch_epoch = %batch_id, batch_state = ?batch.state(), error = ?err, - %peer_id, + ?peer_id, %request_id, "Batch download error" ); - if let BatchOperationOutcome::Failed { blacklist } = - batch.download_failed(Some(*peer_id))? - { + let dl_outcome = batch.download_failed(peer_id.copied())?; + if let BatchOperationOutcome::Failed { blacklist } = dl_outcome { return Err(RemoveChain::ChainFailed { blacklist, failing_batch: batch_id, @@ -994,7 +973,7 @@ impl SyncingChain { } else { debug!( batch_epoch = %batch_id, - %peer_id, + ?peer_id, %request_id, batch_state, "Batch not found" @@ -1050,15 +1029,6 @@ impl SyncingChain { let batch_state = self.visualize_batch_state(); if let Some(batch) = self.batches.get_mut(&batch_id) { let (request, batch_type) = batch.to_blocks_by_range_request(); - let failed_peers = batch.failed_peers(); - - let synced_column_peers = network - .network_globals() - .peers - .read() - .synced_peers_for_epoch(batch_id) - .cloned() - .collect::>(); match network.block_components_by_range_request( batch_type, @@ -1069,12 +1039,6 @@ impl SyncingChain { }, // Request blocks only from peers of this specific chain &self.peers, - // Request column from all synced peers, even if they are not part of this chain. - // This is to avoid splitting of good column peers across many head chains in a heavy forking - // environment. If the column peers and block peer are on different chains, then we return - // a coupling error and retry only the columns that failed to couple. See `Self::retry_partial_batch`. - &synced_column_peers, - &failed_peers, ) { Ok(request_id) => { // inform the batch about the new request @@ -1122,55 +1086,6 @@ impl SyncingChain { Ok(KeepChain) } - /// Retries partial column requests within the batch by creating new requests for the failed columns. - fn retry_partial_batch( - &mut self, - network: &mut SyncNetworkContext, - batch_id: BatchId, - id: Id, - failed_columns: HashSet, - mut failed_peers: HashSet, - ) -> ProcessingResult { - let _guard = self.span.clone().entered(); - debug!(%batch_id, %id, ?failed_columns, "Retrying partial batch"); - if let Some(batch) = self.batches.get_mut(&batch_id) { - failed_peers.extend(&batch.failed_peers()); - let req = batch.to_blocks_by_range_request().0; - - let synced_peers = network - .network_globals() - .peers - .read() - .synced_peers_for_epoch(batch_id) - .cloned() - .collect::>(); - - match network.retry_columns_by_range( - id, - &synced_peers, - &failed_peers, - req, - &failed_columns, - ) { - Ok(_) => { - // inform the batch about the new request - batch.start_downloading(id)?; - debug!( - ?batch_id, - id, "Retried column requests from different peers" - ); - return Ok(KeepChain); - } - Err(e) => { - // No need to explicitly fail the batch since its in `AwaitingDownload` state - // before we attempted to retry. - debug!(?batch_id, id, e, "Failed to retry partial batch"); - } - } - } - Ok(KeepChain) - } - /// Returns true if this chain is currently syncing. pub fn is_syncing(&self) -> bool { match self.state { @@ -1198,7 +1113,10 @@ impl SyncingChain { /// Attempts to request the next required batches from the peer pool if the chain is syncing. It will exhaust the peer /// pool and left over batches until the batch buffer is reached or all peers are exhausted. - fn request_batches(&mut self, network: &mut SyncNetworkContext) -> ProcessingResult { + pub(super) fn request_batches( + &mut self, + network: &mut SyncNetworkContext, + ) -> ProcessingResult { if !matches!(self.state, ChainSyncingState::Syncing) { return Ok(KeepChain); } @@ -1291,6 +1209,13 @@ impl SyncingChain { return None; } + // Don't start downloading the next batch until the previous batch's blocks have + // arrived. This serializes the block download phase while allowing custody-by-root + // requests to run in parallel across batches. + if network.has_pending_block_range_download(self.id) { + return None; + } + // don't send batch requests until we have peers on sampling subnets // TODO(das): this is a workaround to avoid sending out excessive block requests because // block and data column requests are currently coupled. This can be removed once we find a diff --git a/beacon_node/network/src/sync/range_sync/range.rs b/beacon_node/network/src/sync/range_sync/range.rs index 9fd72ac98a6..b83f21a9a09 100644 --- a/beacon_node/network/src/sync/range_sync/range.rs +++ b/beacon_node/network/src/sync/range_sync/range.rs @@ -301,7 +301,7 @@ where pub fn inject_error( &mut self, network: &mut SyncNetworkContext, - peer_id: PeerId, + peer_id: Option, batch_id: BatchId, chain_id: ChainId, request_id: Id, @@ -309,7 +309,7 @@ where ) { // check that this request is pending match self.chains.call_by_id(chain_id, |chain| { - chain.inject_error(network, batch_id, &peer_id, request_id, err) + chain.inject_error(network, batch_id, peer_id.as_ref(), request_id, err) }) { Ok((removed_chain, sync_type)) => { if let Some((removed_chain, remove_reason)) = removed_chain { @@ -380,6 +380,33 @@ where self.chains.register_metrics(); } + /// Notifies the chain that a block download has completed (blocks received, columns may still + /// be pending). This allows the chain to start downloading the next batch immediately. + pub fn trigger_batch_downloads( + &mut self, + network: &mut SyncNetworkContext, + chain_id: ChainId, + ) { + match self + .chains + .call_by_id(chain_id, |chain| chain.request_batches(network)) + { + Ok((None, _)) => {} + Ok((Some((removed_chain, remove_reason)), sync_type)) => { + self.on_chain_removed( + removed_chain, + sync_type, + remove_reason, + network, + "trigger_batch_downloads", + ); + } + Err(_) => { + debug!(%chain_id, "trigger_batch_downloads for removed chain"); + } + } + } + /// Kickstarts sync. pub fn resume(&mut self, network: &mut SyncNetworkContext) { for (removed_chain, sync_type, remove_reason) in diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 67395ccd25a..3dc8bf75698 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -14,12 +14,10 @@ use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types:: use beacon_processor::WorkType; use lighthouse_network::rpc::RequestType; use lighthouse_network::rpc::methods::{ - BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, - OldBlocksByRangeRequestV2, StatusMessageV2, + BlobsByRangeRequest, OldBlocksByRangeRequest, OldBlocksByRangeRequestV2, StatusMessageV2, }; use lighthouse_network::service::api_types::{ - AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, DataColumnsByRangeRequestId, - SyncRequestId, + AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, SyncRequestId, }; use lighthouse_network::{PeerId, SyncInfo}; use std::time::Duration; @@ -38,7 +36,7 @@ pub(crate) enum DataSidecars { enum ByRangeDataRequestIds { PreDeneb, PrePeerDAS(BlobsByRangeRequestId, PeerId), - PostPeerDAS(Vec<(DataColumnsByRangeRequestId, PeerId)>), + PostPeerDAS, } /// Sync tests are usually written in the form: @@ -236,24 +234,9 @@ impl TestRig { }); let by_range_data_requests = if self.is_after_fulu() { - let mut data_columns_requests = vec![]; - while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { - NetworkMessage::SendRequest { - peer_id, - request: - RequestType::DataColumnsByRange(DataColumnsByRangeRequest { - start_slot, .. - }), - app_request_id: AppRequestId::Sync(SyncRequestId::DataColumnsByRange(id)), - } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), - _ => None, - }) { - data_columns_requests.push(data_columns_request); - } - if data_columns_requests.is_empty() { - panic!("Found zero DataColumnsByRange requests, filter {request_filter:?}"); - } - ByRangeDataRequestIds::PostPeerDAS(data_columns_requests) + // Post-PeerDAS: no DataColumnsByRange requests are sent upfront. + // Custody-by-root requests are triggered after blocks arrive via continue_requests(). + ByRangeDataRequestIds::PostPeerDAS } else if self.is_after_deneb() { let (id, peer) = self .pop_received_network_event(|ev| match ev { @@ -307,19 +290,9 @@ impl TestRig { seen_timestamp: D, }); } - ByRangeDataRequestIds::PostPeerDAS(data_column_req_ids) => { - // Complete the request with a single stream termination - for (id, peer_id) in data_column_req_ids { - self.log(&format!( - "Completing DataColumnsByRange request {id:?} with empty stream" - )); - self.send_sync_message(SyncMessage::RpcDataColumn { - sync_request_id: SyncRequestId::DataColumnsByRange(id), - peer_id, - data_column: None, - seen_timestamp: D, - }); - } + ByRangeDataRequestIds::PostPeerDAS => { + // Post-PeerDAS: no DataColumnsByRange requests to complete. + // With empty blocks, continue_requests() won't trigger custody-by-root. } }