diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 5978e97c4d9..068660b5f50 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -152,16 +152,20 @@ impl RpcBlock { block_root: Option, block: Arc>, custody_columns: Vec>, - ) -> Result { + ) -> Result { let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); if block.num_expected_blobs() > 0 && custody_columns.is_empty() { // The number of required custody columns is out of scope here. - return Err(AvailabilityCheckError::MissingCustodyColumns); + return Err("missing expected columns".to_string()); } // Treat empty data column lists as if they are missing. let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns(block, VariableList::new(custody_columns)?) + RpcBlockInner::BlockAndCustodyColumns( + block, + VariableList::new(custody_columns) + .map_err(|e| format!("Too many columns {e:?}"))?, + ) } else { RpcBlockInner::Block(block) }; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 05d67e4504a..77bbd0b4542 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -2460,7 +2460,8 @@ where .filter(|d| sampling_columns.contains(&d.index)) .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, columns)? 
+ RpcBlock::new_with_custody_columns(Some(block_root), block, columns) + .expect("cannot build RpcBlock with columns") } else { RpcBlock::new_without_blobs(Some(block_root), block) } diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index f1a4d87de76..94750df3ed8 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -19,9 +19,9 @@ pub struct SingleLookupReqId { #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum SyncRequestId { /// Request searching for a block given a hash. - SingleBlock { id: SingleLookupReqId }, + SingleBlock(BlocksByRootRequestId), /// Request searching for a set of blobs given a hash. - SingleBlob { id: SingleLookupReqId }, + SingleBlob(BlobsByRootRequestId), /// Request searching for a set of data columns given a hash and list of column indices. DataColumnsByRoot(DataColumnsByRootRequestId), /// Blocks by range request @@ -32,6 +32,22 @@ pub enum SyncRequestId { DataColumnsByRange(DataColumnsByRangeRequestId), } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlocksByRootRequestId { + /// Id to identify this attempt at a blocks_by_root request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Root request for block components. + pub parent_request_id: ComponentsByRootRequestId, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct BlobsByRootRequestId { + /// Id to identify this attempt at a blobs_by_root request for `parent_request_id` + pub id: Id, + /// The Id of the overall By Root request for block components. + pub parent_request_id: ComponentsByRootRequestId, +} + /// Request ID for data_columns_by_root requests. Block lookups do not issue this request directly. /// Wrapping this particular req_id, ensures not mixing this request with a custody req_id. 
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -86,6 +102,17 @@ pub struct ComponentsByRangeRequestId { pub requester: RangeRequestId, } +/// Block components by root request for lookup sync. Includes an ID for downstream consumers to +/// handle retries and tie all their sub requests together. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct ComponentsByRootRequestId { + /// Each `Id` may request the same data in a later retry. This Id identifies the + /// current attempt. + pub id: Id, + /// What sync component is issuing a components by root request and expecting data back + pub requester: RangeRequestId, +} + /// A batch of data columns by range request for custody sync. Includes an ID for downstream consumers to /// handle retries and tie all the range requests for the given epoch together. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] @@ -234,10 +261,13 @@ macro_rules! impl_display { // Since each request Id is deeply nested with various types, if rendered with Debug on logs they // take too much visual space. 
This custom Display implementations make the overall Id short while // not losing information +impl_display!(BlocksByRootRequestId, "{}/{}", id, parent_request_id); impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id); +impl_display!(BlobsByRootRequestId, "{}/{}", id, parent_request_id); impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id); impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester); +impl_display!(ComponentsByRootRequestId, "{}/{}", id, requester); impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester); impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id); impl_display!(CustodyId, "{}", requester); diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index c6b05190871..e69de29bb2d 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -1,217 +0,0 @@ -use crate::sync::block_lookups::single_block_lookup::{ - LookupRequestError, SingleBlockLookup, SingleLookupRequestState, -}; -use crate::sync::block_lookups::{ - BlobRequestState, BlockRequestState, CustodyRequestState, PeerId, -}; -use crate::sync::manager::BlockProcessType; -use crate::sync::network_context::{LookupRequestResult, SyncNetworkContext}; -use beacon_chain::BeaconChainTypes; -use lighthouse_network::service::api_types::Id; -use parking_lot::RwLock; -use std::collections::HashSet; -use std::sync::Arc; -use types::blob_sidecar::FixedBlobSidecarList; -use types::{DataColumnSidecarList, SignedBeaconBlock}; - -use super::SingleLookupId; -use super::single_block_lookup::{ComponentRequests, DownloadResult}; - -#[derive(Debug, Copy, Clone)] -pub enum ResponseType { - Block, - Blob, - CustodyColumn, -} - -/// This trait unifies common single block lookup functionality across blocks and blobs. 
This -/// includes making requests, verifying responses, and handling processing results. A -/// `SingleBlockLookup` includes both a `BlockRequestState` and a `BlobRequestState`, this trait is -/// implemented for each. -/// -/// The use of the `ResponseType` associated type gives us a degree of type -/// safety when handling a block/blob response ensuring we only mutate the correct corresponding -/// state. -pub trait RequestState { - /// The type created after validation. - type VerifiedResponseType: Clone; - - /// Request the network context to prepare a request of a component of `block_root`. If the - /// request is not necessary because the component is already known / processed, return false. - /// Return true if it sent a request and we can expect an event back from the network. - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result; - - /* Response handling methods */ - - /// Send the response to the beacon processor. - fn send_for_processing( - id: Id, - result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError>; - - /* Utility methods */ - - /// Returns the `ResponseType` associated with this trait implementation. Useful in logging. - fn response_type() -> ResponseType; - - /// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait. - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str>; - - /// A getter for a reference to the `SingleLookupRequestState` associated with this trait. - fn get_state(&self) -> &SingleLookupRequestState; - - /// A getter for a mutable reference to the SingleLookupRequestState associated with this trait. 
- fn get_state_mut(&mut self) -> &mut SingleLookupRequestState; -} - -impl RequestState for BlockRequestState { - type VerifiedResponseType = Arc>; - - fn make_request( - &self, - id: SingleLookupId, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.block_lookup_request(id, lookup_peers, self.requested_block_root) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: SingleLookupId, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_block_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Block - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - Ok(&mut request.block_request_state) - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for BlobRequestState { - type VerifiedResponseType = FixedBlobSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - expected_blobs: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.blob_lookup_request(id, lookup_peers, self.block_root, expected_blobs) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. 
- } = download_result; - cx.send_blobs_for_processing(id, block_root, value, seen_timestamp) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::Blob - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest(request, _) => Ok(request), - ComponentRequests::ActiveCustodyRequest { .. } => Err("expecting custody request"), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} - -impl RequestState for CustodyRequestState { - type VerifiedResponseType = DataColumnSidecarList; - - fn make_request( - &self, - id: Id, - lookup_peers: Arc>>, - _: usize, - cx: &mut SyncNetworkContext, - ) -> Result { - cx.custody_lookup_request(id, self.block_root, lookup_peers) - .map_err(LookupRequestError::SendFailedNetwork) - } - - fn send_for_processing( - id: Id, - download_result: DownloadResult, - cx: &SyncNetworkContext, - ) -> Result<(), LookupRequestError> { - let DownloadResult { - value, - block_root, - seen_timestamp, - .. - } = download_result; - cx.send_custody_columns_for_processing( - id, - block_root, - value, - seen_timestamp, - BlockProcessType::SingleCustodyColumn(id), - ) - .map_err(LookupRequestError::SendFailedProcessor) - } - - fn response_type() -> ResponseType { - ResponseType::CustodyColumn - } - fn request_state_mut(request: &mut SingleBlockLookup) -> Result<&mut Self, &'static str> { - match &mut request.component_requests { - ComponentRequests::WaitingForBlock => Err("waiting for block"), - ComponentRequests::ActiveBlobRequest { .. 
} => Err("expecting blob request"), - ComponentRequests::ActiveCustodyRequest(request) => Ok(request), - ComponentRequests::NotNeeded { .. } => Err("not needed"), - } - } - fn get_state(&self) -> &SingleLookupRequestState { - &self.state - } - fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { - &mut self.state - } -} diff --git a/beacon_node/network/src/sync/block_lookups/header_chain.rs b/beacon_node/network/src/sync/block_lookups/header_chain.rs new file mode 100644 index 00000000000..b2142f3fd83 --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/header_chain.rs @@ -0,0 +1,190 @@ +use beacon_chain::BeaconChainTypes; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::HeaderChainId; +use std::collections::{HashMap, VecDeque}; +use types::{BeaconBlockHeader, Hash256, Slot}; + +use crate::sync::block_lookups::Error; +use crate::sync::block_lookups::header_request::HeaderRequest; +use crate::sync::network_context::{InternalError, SyncNetworkContext}; + +/// Minimum data that HeaderChain needs to track for already downloaded headers +type PendingBlock = (Hash256, Slot); + +#[derive(Copy, Clone, Debug)] +pub struct PeerStatusSummary { + pub max_slot: Slot, + pub min_slot: Slot, +} + +pub(crate) struct HeaderChain { + id: HeaderChainId, + /// Headers descendant of `next_header_request.block_root` that are already downloaded. + /// Does not include `next_header_request.block_root`. + /// Sorting: tip first, oldest ancestor last + block_roots: VecDeque, + status: HeaderChainStatus, + /// Peers that claim to have imported the oldest ancestor of this chain + peers: HashMap, +} + +enum HeaderChainStatus { + Backfill { + /// Oldest ancestor block root of this Chain. 
+ next_request: HeaderRequest, + }, + WaitingParent { + /// Parent root of the last block_root in `block_roots` + parent_root: Hash256, + /// True if the oldest ancestor can start downloading + ready_to_sync: bool, + }, +} + +impl HeaderChain { + pub fn new( + initial_block_root: Hash256, + id: HeaderChainId, + initial_peer: PeerId, + initial_peer_status: PeerStatusSummary, + ) -> Self { + Self { + id, + block_roots: <_>::default(), + status: HeaderChainStatus::Backfill { + next_request: HeaderRequest::new(initial_block_root, id), + }, + peers: HashMap::from_iter([(initial_peer, initial_peer_status)]), + } + } + + /// Continues the header or blocks requests of this chain + pub fn continue_requests( + &mut self, + cx: &mut SyncNetworkContext, + ) -> Result<(), Error> { + match &mut self.status { + HeaderChainStatus::Backfill { next_request } => { + Ok(next_request.continue_request(self.peers.keys(), cx)?) + } + _ => Ok(()), + } + } + + fn add_ancestor(&mut self, header: BeaconBlockHeader) -> Result<(), InternalError> { + match &mut self.status { + HeaderChainStatus::Backfill { next_request, .. 
} => { + self.block_roots + .push_back((next_request.block_root, header.slot)); + *next_request = HeaderRequest::new(header.parent_root, self.id); + Ok(()) + } + _ => Err(InternalError( + "Expected lookup to be in DownloadingHeader state".to_owned(), + )), + } + } + + fn extend_with_children(&mut self, mut child_chain: Self) { + while let Some(block) = child_chain.block_roots.pop_back() { + // pop_back gives oldest first, pushing to front restores tip-first + self.block_roots.push_front(block); + } + + // All the peers of the child chain have imported the ancestors + self.peers.extend(child_chain.peers.drain()); + } + + fn to_waiting_parent( + &mut self, + parent_root: Hash256, + ready_to_sync: bool, + ) -> Result<(), Error> { + self.status = HeaderChainStatus::WaitingParent { + parent_root, + ready_to_sync, + }; + Ok(()) + } + + fn parent_root(&self) -> Option { + match &self.status { + HeaderChainStatus::Backfill { .. } => None, + HeaderChainStatus::WaitingParent { parent_root, .. } => Some(*parent_root), + } + } + + /// Returns true if the peer has been added to the map + fn add_peer(&mut self, peer: PeerId, status: PeerStatusSummary) -> bool { + let contains_key = self.peers.contains_key(&peer); + self.peers.insert(peer, status); + !contains_key + } + + /// Returns true if a peer was removed from the map + fn remove_peer(&mut self, peer: &PeerId) -> bool { + self.peers.remove(peer).is_some() + } + + fn pop_oldest_ancestor(&mut self) -> Option { + match &mut self.status { + HeaderChainStatus::WaitingParent { + parent_root, + ready_to_sync, + } => { + if !*ready_to_sync { + return None; + } + if let Some((block_root, block_slot)) = self.block_roots.pop_back() { + *parent_root = block_root; + Some((block_root, block_slot)) + } else { + None + } + } + _ => None, + } + } + + fn peers_of_block_slot(&self, block_slot: Slot) -> Vec { + self.peers + .iter() + .filter(|(_, status)| block_slot >= status.min_slot && block_slot < status.max_slot) + .map(|(peer, _)| *peer) + 
.collect() + } + + /// Returns true if this chain transitioned into ready to sync + fn on_parent_imported(&mut self, imported_block_root: &Hash256) -> bool { + match &mut self.status { + HeaderChainStatus::WaitingParent { + parent_root, + ready_to_sync, + } => { + if parent_root == imported_block_root && !*ready_to_sync { + *ready_to_sync = true; + true + } else { + false + } + } + _ => false, + } + } + + fn block_count(&self) -> usize { + self.block_roots.len() + } + + fn min_slot(&self) -> Option { + self.block_roots.back().map(|b| b.1) + } + + fn max_slot(&self) -> Option { + self.block_roots.front().map(|b| b.1) + } + + fn peer_count(&self) -> usize { + self.peers.len() + } +} diff --git a/beacon_node/network/src/sync/block_lookups/header_request.rs b/beacon_node/network/src/sync/block_lookups/header_request.rs new file mode 100644 index 00000000000..f00afe98a35 --- /dev/null +++ b/beacon_node/network/src/sync/block_lookups/header_request.rs @@ -0,0 +1,81 @@ +use crate::sync::block_lookups::Error; +use crate::sync::network_context::{ + DownloadRequest, DownloadRequestError, RpcRequestSendError, SyncNetworkContext, +}; +use beacon_chain::BeaconChainTypes; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::{ + BlocksByRootRequestId, BlocksByRootRequester, HeaderChainId, HeaderLookupId, Id, +}; +use std::collections::HashSet; +use types::{BeaconBlockHeader, Hash256}; + +/// Tracks a request to download a BeaconBlockHeader by block root +pub(crate) struct HeaderRequest { + id: Option, + chain_id: HeaderChainId, + block_root: Hash256, + failed_peers: HashSet, + request: DownloadRequest, +} + +impl HeaderRequest { + pub fn new(block_root: Hash256, chain_id: HeaderChainId) -> Self { + Self { + id: None, + chain_id, + block_root, + failed_peers: <_>::default(), + request: DownloadRequest::new(), + } + } + + fn empty() -> Self { + Self::new(Hash256::ZERO, HeaderChainId(0)) + } + + fn continue_request( + &mut self, + peers: I, + cx: &mut 
SyncNetworkContext, + ) -> Result<(), Error> + where + T: BeaconChainTypes, + I: Iterator, + { + if self.request.is_awaiting_download() { + let Some(peer) = peers + .map(|peer| { + ( + // If contains -> 1 (order after), not contains -> 0 (order first) + self.failed_peers.contains(peer), + // Random factor to break ties, otherwise the PeerID breaks ties + rand::random::(), + peer, + ) + }) + .min() + .map(|(_, _, peer)| *peer) + else { + // When a peer disconnects and is removed from the SyncingChain peer set, if the set + // reaches zero the lookup is removed + return Err(Error::InternalError("No peers".to_owned())); + }; + + let id = self.id.get_or_insert_with(|| cx.next_id()).clone(); + + // TODO(tree-sync): send headers_by_root request if available + let req_id = cx.send_blocks_by_root_request( + peer, + self.block_root, + BlocksByRootRequester::Header(HeaderLookupId { + id, + chain_id: self.chain_id, + }), + )?; + + self.request.on_download_start(req_id)?; + } + Ok(()) + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8ffd298caf..eaf39b5602c 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -23,20 +23,18 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; -use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; +use super::manager::{BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::SyncMessage; -use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; -use beacon_chain::block_verification_types::AsBlock; +use beacon_chain::block_verification_types::{AsBlock, 
RpcBlock}; use beacon_chain::data_availability_checker::{ AvailabilityCheckError, AvailabilityCheckErrorCategory, }; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; -pub use common::RequestState; use fnv::FnvHashMap; -use lighthouse_network::service::api_types::SingleLookupReqId; +use lighthouse_network::service::api_types::{Id, SingleLookupReqId}; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; @@ -423,26 +421,25 @@ impl BlockLookups { /* Lookup responses */ /// Process a block or blob response received from a single lookup request. - pub fn on_download_response>( + pub fn on_download_response( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: Result<(RpcBlock, PeerGroup, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) { - let result = self.on_download_response_inner::(id, response, cx); + let result = self.on_download_response_inner(id, response, cx); self.on_lookup_result(id.lookup_id, result, "download_response", cx); } /// Process a block or blob response received from a single lookup request. - pub fn on_download_response_inner>( + pub fn on_download_response_inner( &mut self, id: SingleLookupReqId, - response: Result<(R::VerifiedResponseType, PeerGroup, Duration), RpcResponseError>, + response: Result<(RpcBlock, PeerGroup, Duration), RpcResponseError>, cx: &mut SyncNetworkContext, ) -> Result { // Note: no need to downscore peers here, already downscored on network context - let response_type = R::response_type(); let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else { // We don't have the ability to cancel in-flight RPC requests. So this can happen // if we started this RPC request, and later saw the block/blobs via gossip. 
@@ -451,9 +448,6 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); match response { Ok((response, peer_group, seen_timestamp)) => { @@ -461,7 +455,6 @@ impl BlockLookups { ?block_root, ?id, ?peer_group, - ?response_type, "Received lookup download success" ); @@ -476,7 +469,7 @@ impl BlockLookups { // Register the download peer here. Once we have received some data over the wire we // attribute it to this peer for scoring latter regardless of how the request was // done. - request_state.on_download_success( + self.request_state.on_download_success( id.req_id, DownloadResult { value: response, @@ -493,12 +486,11 @@ impl BlockLookups { debug!( ?block_root, ?id, - ?response_type, error = ?e, "Received lookup download failure" ); - request_state.on_download_failure(id.req_id)?; + self.request_state.on_download_failure(id.req_id)?; // continue_request will retry a download as the request state is AwaitingDownload } } @@ -518,25 +510,16 @@ impl BlockLookups { pub fn on_processing_result( &mut self, - process_type: BlockProcessType, + id: Id, result: BlockProcessingResult, cx: &mut SyncNetworkContext, ) { - let lookup_result = match process_type { - BlockProcessType::SingleBlock { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleBlob { id } => { - self.on_processing_result_inner::>(id, result, cx) - } - BlockProcessType::SingleCustodyColumn(id) => { - self.on_processing_result_inner::>(id, result, cx) - } - }; - self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx); + let lookup_result = + self.on_processing_result_inner::>(id, result, cx); + self.on_lookup_result(id, lookup_result, "processing_result", cx); } - pub fn on_processing_result_inner>( + pub fn on_processing_result_inner( &mut self, lookup_id: SingleLookupId, result: BlockProcessingResult, @@ 
-548,12 +531,8 @@ impl BlockLookups { }; let block_root = lookup.block_root(); - let request_state = R::request_state_mut(lookup) - .map_err(|e| LookupRequestError::BadState(e.to_owned()))? - .get_state_mut(); debug!( - component = ?R::response_type(), ?block_root, id = lookup_id, ?result, @@ -564,7 +543,7 @@ impl BlockLookups { BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) | BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) => { // Successfully imported - request_state.on_processing_success()?; + self.request_state.on_processing_success()?; Action::Continue } @@ -573,7 +552,7 @@ impl BlockLookups { }) => { // `on_processing_success` is called here to ensure the request state is updated prior to checking // if both components have been processed. - request_state.on_processing_success()?; + self.request_state.on_processing_success()?; if lookup.all_components_processed() { // We don't request for other block components until being sure that the block has @@ -595,10 +574,7 @@ impl BlockLookups { BlockProcessingResult::Ignored => { // Beacon processor signalled to ignore the block processing result. // This implies that the cpu is overloaded. Drop the request. - warn!( - component = ?R::response_type(), - "Lookup component processing ignored, cpu might be overloaded" - ); + warn!("Lookup component processing ignored, cpu might be overloaded"); Action::Drop("Block processing ignored".to_owned()) } BlockProcessingResult::Err(e) => { @@ -614,7 +590,7 @@ impl BlockLookups { // once there are no pending parent requests. // Note: `BlockError::ParentUnknown` is only returned when processing // blocks, not blobs. 
- request_state.revert_to_awaiting_processing()?; + self.request_state.revert_to_awaiting_processing()?; Action::ParentUnknown { parent_root } } ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { @@ -642,11 +618,10 @@ impl BlockLookups { other => { debug!( ?block_root, - component = ?R::response_type(), error = ?other, "Invalid lookup component" ); - let peer_group = request_state.on_processing_failure()?; + let peer_group = self.request_state.on_processing_failure()?; let peers_to_penalize: Vec<_> = match other { // Note: currenlty only InvalidColumn errors have index granularity, // but future errors may follow the same pattern. Generalize this @@ -667,13 +642,7 @@ impl BlockLookups { cx.report_peer( *peer, PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - ResponseType::CustodyColumn => { - "lookup_custody_column_processing_failure" - } - }, + "lookup_processing_failure", ); } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 46897b2283b..e5491ce1d4c 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -1,5 +1,4 @@ use super::{BlockComponent, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS}; -use crate::sync::block_lookups::common::RequestState; use crate::sync::network_context::{ LookupRequestResult, PeerGroup, ReqId, RpcRequestSendError, SendErrorProcessor, SyncNetworkContext, @@ -61,7 +60,7 @@ pub enum LookupRequestError { #[educe(Debug(bound(T: BeaconChainTypes)))] pub struct SingleBlockLookup { pub id: Id, - pub block_request_state: BlockRequestState, + pub request_state: BlockRequestState, pub component_requests: ComponentRequests, /// Peers that claim to have imported this set of block components. 
This state is shared with /// the custody request to have an updated view of the peers that claim to have imported the @@ -144,23 +143,6 @@ impl SingleBlockLookup { self.created.elapsed() } - /// Maybe insert a verified response into this lookup. Returns true if imported - pub fn add_child_components(&mut self, block_component: BlockComponent) -> bool { - match block_component { - BlockComponent::Block(block) => self - .block_request_state - .state - .insert_verified_response(block), - BlockComponent::Blob(_) | BlockComponent::DataColumn(_) => { - // For now ignore single blobs and columns, as the blob request state assumes all blobs are - // attributed to the same peer = the peer serving the remaining blobs. Ignoring this - // block component has a minor effect, causing the node to re-request this blob - // once the parent chain is successfully resolved - false - } - } - } - /// Check the block root matches the requested block root. pub fn is_for_block(&self, block_root: Hash256) -> bool { self.block_root() == block_root @@ -276,7 +258,7 @@ impl SingleBlockLookup { } /// Potentially makes progress on this request if it's in a progress-able state - fn continue_request>( + fn continue_request( &mut self, cx: &mut SyncNetworkContext, expected_blobs: usize, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 2e0c56db23f..96735c2af62 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -13,7 +13,6 @@ use crate::network_beacon_processor::TestBeaconChainType; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::batch::ByRangeRequestType; -use crate::sync::block_lookups::SingleLookupId; use crate::sync::block_sidecar_coupling::CouplingError; use crate::sync::network_context::requests::BlobsByRootSingleBlockRequest; use crate::sync::range_data_column_batch_request::RangeDataColumnBatchRequest; @@ -21,14 +20,16 
@@ use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::{BeaconChain, BeaconChainTypes, BlockProcessStatus, EngineState}; use custody::CustodyRequestResult; use fnv::FnvHashMap; +use itertools::Itertools; use lighthouse_network::rpc::methods::{BlobsByRangeRequest, DataColumnsByRangeRequest}; use lighthouse_network::rpc::{BlocksByRangeRequest, GoodbyeReason, RPCError, RequestType}; pub use lighthouse_network::service::api_types::RangeRequestId; use lighthouse_network::service::api_types::{ - AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId, - CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyId, CustodyRequester, - DataColumnsByRangeRequestId, DataColumnsByRangeRequester, DataColumnsByRootRequestId, - DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, + AppRequestId, BlobsByRangeRequestId, BlobsByRootRequestId, BlocksByRangeRequestId, + BlocksByRootRequestId, ComponentsByRangeRequestId, ComponentsByRootRequestId, + CustodyBackFillBatchRequestId, CustodyBackfillBatchId, CustodyByRootRequestId, CustodyId, + CustodyRequester, DataColumnsByRangeRequestId, DataColumnsByRangeRequester, + DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, SingleLookupReqId, SyncRequestId, }; use lighthouse_network::{Client, NetworkGlobals, PeerAction, PeerId, ReportSource}; use lighthouse_tracing::{SPAN_OUTGOING_BLOCK_BY_ROOT_REQUEST, SPAN_OUTGOING_RANGE_REQUEST}; @@ -51,11 +52,13 @@ use tokio::sync::mpsc; use tracing::{Span, debug, debug_span, error, warn}; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, EthSpec, - ForkContext, Hash256, SignedBeaconBlock, Slot, + BlobSidecar, BlockImportSource, ChainSpec, ColumnIndex, DataColumnSidecar, + DataColumnSidecarList, EthSpec, ForkContext, Hash256, SignedBeaconBlock, Slot, }; +mod block_components_by_root; pub mod custody; +mod download_request; mod 
requests; macro_rules! new_range_request_span { @@ -101,6 +104,7 @@ pub enum RpcResponseError { VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), BlockComponentCouplingError(CouplingError), + InternalError(String), } #[derive(Debug, PartialEq, Eq)] @@ -171,6 +175,57 @@ impl PeerGroup { } } +#[derive(Clone, Debug)] +pub struct BatchPeers { + block_peer: PeerId, + column_peers: PeerGroup, +} + +impl BatchPeers { + pub(crate) fn new_from_block_peer(block_peer: PeerId) -> Self { + Self { + block_peer, + column_peers: PeerGroup { + peers: <_>::default(), + }, + } + } + pub(crate) fn new(block_peer: PeerId, column_peers: PeerGroup) -> Self { + Self { + block_peer, + column_peers, + } + } + + pub(crate) fn blame(&self, peer_action: PeerGroupAction) -> Vec<(PeerId, PeerAction)> { + // Penalize each peer only once. Currently a peer_action does not mix different + // PeerAction levels. + let mut peer_penalties = peer_action + .column_peer + .iter() + .flat_map(|(column_index, penalty)| { + self.column_peers + .of_index(*column_index as usize) + .map(|peer| (*peer, *penalty)) + }) + .collect::>(); + + if let Some(penalty) = peer_action.block_peer { + // Penalize the peer appropiately. + peer_penalties.push((self.block_peer, penalty)); + } + + peer_penalties + } +} + +/// Tracks which block(s) component caused the block to be invalid. Used to attribute fault in sync. +#[derive(Debug)] +pub struct PeerGroupAction { + pub block_peer: Option, + pub column_peer: HashMap, +} + /// Sequential ID that uniquely identifies ReqResp outgoing requests pub type ReqId = u32; @@ -198,9 +253,10 @@ pub struct SyncNetworkContext { /// A mapping of active BlocksByRoot requests, including both current slot and parent lookups. blocks_by_root_requests: - ActiveRequests>, + ActiveRequests>, /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. 
- blobs_by_root_requests: ActiveRequests>, + blobs_by_root_requests: + ActiveRequests>, /// A mapping of active DataColumnsByRoot requests data_columns_by_root_requests: ActiveRequests>, @@ -214,7 +270,7 @@ pub struct SyncNetworkContext { data_columns_by_range_requests: ActiveRequests>, /// Mapping of active custody column requests for a block root - custody_by_root_requests: FnvHashMap>, + custody_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with other ByRange requests for data components components_by_range_requests: @@ -340,11 +396,11 @@ impl SyncNetworkContext { let blocks_by_root_ids = blocks_by_root_requests .active_requests_of_peer(peer_id) .into_iter() - .map(|id| SyncRequestId::SingleBlock { id: *id }); + .map(|id| SyncRequestId::SingleBlock(*id)); let blobs_by_root_ids = blobs_by_root_requests .active_requests_of_peer(peer_id) .into_iter() - .map(|id| SyncRequestId::SingleBlob { id: *id }); + .map(|id| SyncRequestId::SingleBlob(*id)); let data_column_by_root_ids = data_columns_by_root_requests .active_requests_of_peer(peer_id) .into_iter() @@ -379,6 +435,10 @@ impl SyncNetworkContext { &self.network_beacon_processor.network_globals } + pub fn spec(&self) -> &ChainSpec { + &self.chain.spec + } + /// Returns the Client type of the peer if known pub fn client_type(&self, peer_id: &PeerId) -> Client { self.network_globals() @@ -809,12 +869,12 @@ impl SyncNetworkContext { /// - If the da_checker has a pending block from gossip or a previous request /// /// Returns false if no request was made, because the block is already imported - pub fn block_lookup_request( + pub fn send_blocks_by_root_request( &mut self, - lookup_id: SingleLookupId, + lookup_id: ComponentsByRootRequestId, lookup_peers: Arc>>, block_root: Hash256, - ) -> Result { + ) -> Result, RpcRequestSendError> { let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() @@ -872,9 +932,9 @@ impl SyncNetworkContext { } } - let id 
= SingleLookupReqId { - lookup_id, - req_id: self.next_id(), + let id = BlocksByRootRequestId { + id: self.next_id(), + parent_request_id: lookup_id, }; let request = BlocksByRootSingleRequest(block_root); @@ -894,7 +954,7 @@ impl SyncNetworkContext { .send(NetworkMessage::SendRequest { peer_id, request: network_request, - app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock { id }), + app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlock(id)), }) .map_err(|_| RpcRequestSendError::InternalError("network send error".to_owned()))?; @@ -921,7 +981,7 @@ impl SyncNetworkContext { request_span, ); - Ok(LookupRequestResult::RequestSent(id.req_id)) + Ok(LookupRequestResult::RequestSent(id)) } /// Request necessary blobs for `block_root`. Requests only the necessary blobs by checking: @@ -930,13 +990,13 @@ impl SyncNetworkContext { /// - If the da_checker has pending blobs from gossip /// /// Returns false if no request was made, because we don't need to import (more) blobs. - pub fn blob_lookup_request( + pub fn send_blobs_by_root_request( &mut self, - lookup_id: SingleLookupId, + lookup_id: ComponentsByRootRequestId, lookup_peers: Arc>>, block_root: Hash256, expected_blobs: usize, - ) -> Result { + ) -> Result, RpcRequestSendError> { let active_request_count_by_peer = self.active_request_count_by_peer(); let Some(peer_id) = lookup_peers .read() @@ -977,9 +1037,9 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), + let id = BlobsByRootRequestId { + id: self.next_id(), + parent_request_id: lookup_id, }; let request = BlobsByRootSingleBlockRequest { @@ -998,7 +1058,7 @@ impl SyncNetworkContext { .send(NetworkMessage::SendRequest { peer_id, request: network_request, - app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob { id }), + app_request_id: AppRequestId::Sync(SyncRequestId::SingleBlob(id)), }) .map_err(|_| 
RpcRequestSendError::InternalError("network send error".to_owned()))?; @@ -1023,7 +1083,7 @@ impl SyncNetworkContext { Span::none(), ); - Ok(LookupRequestResult::RequestSent(id.req_id)) + Ok(LookupRequestResult::RequestSent(id)) } /// Request to send a single `data_columns_by_root` request to the network. @@ -1076,10 +1136,11 @@ impl SyncNetworkContext { /// any request to the network if no columns have to be fetched based on the import state of the /// node. A custody request is a "super request" that may trigger 0 or more `data_columns_by_root` /// requests. - pub fn custody_lookup_request( + pub fn send_custody_by_root_request( &mut self, - lookup_id: SingleLookupId, + lookup_id: ComponentsByRootRequestId, block_root: Hash256, + block_slot: Slot, lookup_peers: Arc>>, ) -> Result { let custody_indexes_imported = self @@ -1088,14 +1149,10 @@ impl SyncNetworkContext { .cached_data_column_indexes(&block_root) .unwrap_or_default(); - let current_epoch = self.chain.epoch().map_err(|e| { - RpcRequestSendError::InternalError(format!("Unable to read slot clock {:?}", e)) - })?; - // Include only the blob indexes not yet imported (received through gossip) let custody_indexes_to_fetch = self .chain - .sampling_columns_for_epoch(current_epoch) + .sampling_columns_for_epoch(block_slot.epoch(T::EthSpec::slots_per_epoch())) .iter() .copied() .filter(|index| !custody_indexes_imported.contains(index)) @@ -1106,9 +1163,9 @@ impl SyncNetworkContext { return Ok(LookupRequestResult::NoRequestNeeded("no indices to fetch")); } - let id = SingleLookupReqId { - lookup_id, - req_id: self.next_id(), + let id = CustodyByRootRequestId { + id: self.next_id(), + parent_request_id: lookup_id, }; debug!( @@ -1411,7 +1468,7 @@ impl SyncNetworkContext { pub(crate) fn on_single_block_response( &mut self, - id: SingleLookupReqId, + id: BlocksByRootRequestId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>>> { @@ -1433,7 +1490,7 @@ impl SyncNetworkContext { pub(crate) fn 
on_single_blob_response( &mut self, - id: SingleLookupReqId, + id: BlobsByRootRequestId, peer_id: PeerId, rpc_event: RpcEvent>>, ) -> Option>> { diff --git a/beacon_node/network/src/sync/network_context/block_components_by_root.rs b/beacon_node/network/src/sync/network_context/block_components_by_root.rs new file mode 100644 index 00000000000..76cb1996990 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/block_components_by_root.rs @@ -0,0 +1,400 @@ +use crate::sync::network_context::download_request::{ + DownloadRequest, Error as DownloadRequestError, +}; +use crate::sync::network_context::{ + BatchPeers, PeerGroup, RpcRequestSendError, RpcResponseError, SyncNetworkContext, +}; +use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; +use beacon_chain::{BeaconChainTypes, get_block_root}; +use lighthouse_network::PeerId; +use lighthouse_network::service::api_types::{ + BlobsByRootRequestId, BlocksByRootRequestId, ComponentsByRootRequestId, CustodyByRootRequestId, +}; +use parking_lot::RwLock; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use types::{ + BlobSidecar, ChainSpec, DataColumnSidecarList, EthSpec, Hash256, RuntimeVariableList, + SignedBeaconBlock, +}; + +/// Given a `BlocksByRootRequest` (a collection of block roots) fetches all necessary data to +/// return potentially available RpcBlocks. +/// +/// See [`State`] for the set of `*_by_root` it may issue depending on the fork. +pub struct BlockComponentsByRootRequest { + id: ComponentsByRootRequestId, + peers: Arc>>, + block_root: Hash256, + state: State, +} + +// Request blocks first, then columns. Assuming the block peer is honest we can attribute +// custody failures to the peers serving us columns. 
We want to get rid of the honest block +// peer assumption in the future, see https://github.com/sigp/lighthouse/issues/6258 +enum State { + BlocksRequest { + blocks_request: DownloadRequest>>, + }, + DataRequest { + block: Arc>, + block_peer: PeerId, + data_request: DataRequest, + }, +} + +enum DataRequest { + Deneb { + blobs_request: DownloadRequest>>>, + }, + Fulu { + custody_request: + DownloadRequest, PeerGroup>, + }, +} + +enum Request { + /// Active(RequestIndex) + Active(I), + /// Complete(DownloadedData, Peers) + Complete(T, P), +} + +pub type BlockComponentsByRootRequestResult = Result, BatchPeers)>, Error>; + +pub enum Error { + InternalError(String), +} + +impl From for RpcResponseError { + fn from(e: Error) -> Self { + match e { + Error::InternalError(e) => RpcResponseError::InternalError(e), + } + } +} + +impl From for RpcRequestSendError { + fn from(e: Error) -> Self { + match e { + Error::InternalError(e) => RpcRequestSendError::InternalError(e), + } + } +} + +impl From for Error { + fn from(e: DownloadRequestError) -> Self { + match e { + DownloadRequestError::InternalError(e) => Self::InternalError(e), + DownloadRequestError::TooManyErrors(e) => todo!(), + } + } +} + +/// Used to typesafe assertions of state in range sync tests +#[cfg(test)] +#[derive(Debug)] +pub enum BlockComponentsByRootRequestStep { + BlocksRequest, + CustodyRequest, +} + +impl BlockComponentsByRootRequest { + pub fn new( + id: ComponentsByRootRequestId, + block_root: Hash256, + peers: Arc>>, + peers_to_deprioritize: &HashSet, + cx: &mut SyncNetworkContext, + ) -> Result { + let blocks_req_id = cx.send_blocks_by_root_request(id, peers.clone(), block_root)?; + + let state = State::BlocksRequest { + blocks_request: Request::Active(blocks_req_id), + }; + + Ok(Self { + id, + peers, + block_root, + state, + }) + } + + pub fn continue_requests( + &mut self, + cx: &mut SyncNetworkContext, + ) -> BlockComponentsByRootRequestResult { + match &mut self.state { + State::BlocksRequest { + 
blocks_request: blocks_by_range_request, + } => { + if let Some((block, block_peer)) = blocks_by_range_request.to_finished() { + let fork = cx.spec().fork_name_at_slot::(block.slot()); + let block_has_data = block.has_data(); + + if block_has_data && fork.fulu_enabled() { + let req_id = cx + .send_custody_by_root_request( + self.id, + self.block_root, + block.slot(), + self.peers.clone(), + ) + .map_err(|e| match e { + RpcRequestSendError::InternalError(e) => Error::InternalError(e), + RpcRequestSendError::NoPeer(_) => Error::InternalError( + "send_custody_by_range_request does not error with NoPeers" + .to_owned(), + ), + })?; + + self.state = State::DataRequest { + block: block.clone(), + block_peer: *block_peer, + data_request: DataRequest::Fulu { + custody_request: Request::Active(req_id), + }, + }; + Ok(None) + } else if block_has_data && fork.deneb_enabled() { + // TODO(deneb): is it okay to send blobs_by_range requests outside the DA window? I + // would like the beacon processor / da_checker to be the one that decides if an + // RpcBlock is valid or not with respect to containing blobs. Having sync not even + // attempt a requests seems like an added limitation. 
+ let req_id = cx + .send_blobs_by_root_request( + self.id, + *block_peer, + self.block_root, + block.num_expected_blobs(), + ) + .map_err(|e| match e { + RpcRequestSendError::InternalError(e) => Error::InternalError(e), + RpcRequestSendError::NoPeer(_) => Error::InternalError( + "send_custody_by_range_request does not error with NoPeers" + .to_owned(), + ), + })?; + + let blobs_request = DownloadRequest::new(); + blobs_request.on_download_start(req_id)?; + self.state = State::DataRequest { + block: block.clone(), + block_peer: *block_peer, + data_request: DataRequest::Deneb { blobs_request }, + }; + Ok(None) + } else { + let peer_group = BatchPeers::new_from_block_peer(*block_peer); + let rpc_block = couple_block_base(block.clone()); + Ok(Some((rpc_block, peer_group))) + } + } else { + // Wait for blocks_by_range requests to complete + Ok(None) + } + } + State::DataRequest { + block, + block_peer, + data_request, + } => match data_request { + DataRequest::Deneb { + blobs_request: blobs_by_range_request, + } => { + if let Some((blobs, _)) = blobs_by_range_request.to_finished() { + // We use the same block_peer for the blobs request + let peer_group = BatchPeers::new_from_block_peer(*block_peer); + let rpc_block = + couple_block_deneb(block.clone(), blobs.to_vec(), cx.spec())?; + Ok(Some((rpc_block, peer_group))) + } else { + // Wait for blocks_by_range and blobs_by_range requests to complete + Ok(None) + } + } + DataRequest::Fulu { + custody_request: custody_by_range_request, + } => { + if let Some((columns, column_peers)) = custody_by_range_request.to_finished() { + let peer_group = BatchPeers::new(*block_peer, column_peers.clone()); + let rpc_block = couple_block_fulu(block.clone(), columns.to_vec())?; + Ok(Some((rpc_block, peer_group))) + } else { + // Wait for the custody_by_range request to complete + Ok(None) + } + } + }, + } + } + + pub fn on_blocks_by_root_result( + &mut self, + id: BlocksByRootRequestId, + data: Result>, RpcResponseError>, + peer_id: 
PeerId, + cx: &mut SyncNetworkContext, + ) -> BlockComponentsByRootRequestResult { + match &mut self.state { + State::BlocksRequest { blocks_request } => { + blocks_request.on_result(id, data, peer_id)?; + } + _ => { + return Err(Error::InternalError( + "Received unexpected blocks_by_range response".to_string(), + )); + } + } + + self.continue_requests(cx) + } + + pub fn on_blobs_by_root_result( + &mut self, + id: BlobsByRootRequestId, + data: Result>>, RpcResponseError>, + peer_id: PeerId, + cx: &mut SyncNetworkContext, + ) -> BlockComponentsByRootRequestResult { + match &mut self.state { + State::DataRequest { + data_request: DataRequest::Deneb { blobs_request }, + .. + } => { + blobs_request.on_result(id, peer_id, data)?; + } + _ => { + return Err(Error::InternalError( + "Received unexpected blobs_by_range response".to_string(), + )); + } + } + + self.continue_requests(cx) + } + + pub fn on_custody_by_root_result( + &mut self, + id: CustodyByRootRequestId, + data: Result, RpcResponseError>, + peers: PeerGroup, + cx: &mut SyncNetworkContext, + ) -> BlockComponentsByRootRequestResult { + match &mut self.state { + State::DataRequest { + data_request: DataRequest::Fulu { custody_request }, + .. + } => { + custody_request.finish(id, data, peers)?; + } + _ => { + return Err(Error::InternalError( + "Received unexpected custody_by_range response".to_string(), + )); + } + } + + self.continue_requests(cx) + } + + #[cfg(test)] + pub fn state_step(&self) -> BlockComponentsByRootRequestStep { + match &self.state { + State::BlocksRequest { .. } => BlockComponentsByRootRequestStep::BlocksRequest, + State::DataRequest { .. 
} => BlockComponentsByRootRequestStep::CustodyRequest, + } + } +} + +fn couple_block_base(block: Arc>) -> RpcBlock { + RpcBlock::new_without_blobs(None, block) +} + +fn couple_block_deneb( + block: Arc>, + blobs: Vec>>, + spec: &ChainSpec, +) -> Result, Error> { + let mut blobs_by_block = HashMap::>>>::new(); + for blob in blobs { + let block_root = blob.block_root(); + blobs_by_block.entry(block_root).or_default().push(blob); + } + + // Now collect all blobs that match to the block by block root. BlobsByRange request checks + // the inclusion proof so we know that the commitment is the expected. + // + // BlobsByRange request handler ensures that we don't receive more blobs than possible. + // If the peer serving the request sends us blobs that don't pair well we'll send to the + // processor blocks without expected blobs, resulting in a downscoring event. A serving peer + // could serve fake blobs for blocks that don't have data, but it would gain nothing by it + // wasting theirs and our bandwidth 1:1. Therefore blobs that don't pair well are just ignored. 
+ // + // RpcBlock::new ensures that the count of blobs is consistent with the block + let block_root = get_block_root(&block); + let max_blobs_per_block = spec.max_blobs_per_block(block.epoch()) as usize; + let blobs = blobs_by_block.remove(&block_root).unwrap_or_default(); + // BlobsByRange request handler enforces that blobs are sorted by index + let blobs = RuntimeVariableList::new(blobs, max_blobs_per_block) + .map_err(|_| Error::InternalError("Blobs returned exceeds max length".to_string()))?; + Ok(RpcBlock::new(Some(block_root), block, Some(blobs)).expect("TODO: don't do matching here")) +} + +fn couple_block_fulu( + block: Arc>, + data_columns: DataColumnSidecarList, +) -> Result, Error> { + // Group data columns by block_root and index + let mut custody_columns_by_block = HashMap::>>::new(); + + for column in data_columns { + let block_root = column.block_root(); + + custody_columns_by_block + .entry(block_root) + .or_default() + // Safe to convert to `CustodyDataColumn`: we have asserted that the index of + // this column is in the set of `expects_custody_columns` and with the expected + // block root, so for the expected epoch of this batch. 
+ .push(CustodyDataColumn::from_asserted_custody(column)); + } + + // Now iterate all blocks ensuring that the block roots of each block and data column match, + let block_root = get_block_root(&block); + let data_columns_with_block_root = custody_columns_by_block + // Remove to only use columns once + .remove(&block_root) + .unwrap_or_default(); + + RpcBlock::new_with_custody_columns(Some(block_root), block, data_columns_with_block_root) + .map_err(Error::InternalError) +} + +impl Request { + fn finish(&mut self, id: I, data: T, peer_id: P) -> Result<(), Error> { + match self { + Self::Active(expected_id) => { + if expected_id != &id { + return Err(Error::InternalError(format!( + "unexpected req_id expected {expected_id} got {id}" + ))); + } + *self = Self::Complete(data, peer_id); + Ok(()) + } + Self::Complete(_, _) => Err(Error::InternalError(format!( + "request already complete {id}" + ))), + } + } + + fn to_finished(&self) -> Option<(&T, &P)> { + match self { + Self::Active(_) => None, + Self::Complete(data, peer_id) => Some((data, peer_id)), + } + } +} diff --git a/beacon_node/network/src/sync/network_context/download_request.rs b/beacon_node/network/src/sync/network_context/download_request.rs new file mode 100644 index 00000000000..57e7666cf86 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/download_request.rs @@ -0,0 +1,129 @@ +use crate::sync::network_context::RpcResponseError; +use lighthouse_network::PeerId; +use std::time::Duration; +use strum::IntoStaticStr; + +/// TODO(das): Reconsider this retry count, it was choosen as a placeholder value. 
Each +/// `custody_by_*` request is already retried multiple inside of a lookup or batch +const MAX_DOWNLOAD_ATTEMPTS: usize = 5; + +pub struct DownloadRequest { + status: Status, + download_failures: Vec, +} + +#[derive(Debug, Clone, IntoStaticStr)] +pub enum Status { + NotStarted, + Downloading(I), + Downloaded(PeerId, T), +} + +#[derive(Debug)] +pub enum Error { + InternalError(String), + TooManyErrors(RpcResponseError), +} + +impl DownloadRequest { + pub fn new() -> Self { + Self { + status: Status::NotStarted, + download_failures: vec![], + } + } + + pub fn status_str(&self) -> &'static str { + (&self.status).into() + } + + pub fn is_awaiting_download(&self) -> bool { + match self.status { + Status::NotStarted => true, + Status::Downloading { .. } | Status::Downloaded { .. } => false, + } + } + + pub fn is_downloading(&self) -> bool { + match self.status { + Status::NotStarted => false, + Status::Downloading { .. } => true, + Status::Downloaded { .. } => false, + } + } + + pub fn is_downloaded(&self) -> bool { + match self.status { + Status::NotStarted | Status::Downloading { .. } => false, + Status::Downloaded { .. 
} => true, + } + } + + pub fn on_download_start(&mut self, req_id: I) -> Result<(), Error> { + match &self.status { + Status::NotStarted => { + self.status = Status::Downloading(req_id); + Ok(()) + } + other => Err(Error::InternalError(format!( + "bad state on_download_start expected NotStarted got {}", + Into::<&'static str>::into(other), + ))), + } + } + + pub fn on_result( + &mut self, + req_id: I, + peer_id: PeerId, + result: Result, + ) -> Result<(), Error> { + match &self.status { + Status::Downloading(expected_req_id) => { + if req_id != *expected_req_id { + return Err(Error::InternalError(format!( + "Received download result for req_id {req_id} expecting {expected_req_id}" + ))); + } + + match result { + Ok(data) => { + self.status = Status::Downloaded(peer_id, data); + } + Err(e) => { + self.download_failures.push(e); + if self.download_failures.len() > MAX_DOWNLOAD_ATTEMPTS { + if let Some(last_error) = self.download_failures.pop() { + return Err(Error::TooManyErrors(last_error)); + } + } + self.status = Status::NotStarted; + } + } + + Ok(()) + } + other => Err(Error::InternalError(format!( + "bad state on_result expected Downloading got {}", + Into::<&'static str>::into(other), + ))), + } + } + + pub fn is_complete(&self) -> Option<&T> { + match &self.status { + Status::Downloaded(_, data) => Some(data), + _ => None, + } + } + + pub fn complete(self) -> Result<(PeerId, T), Error> { + match self.status { + Status::Downloaded(peer_id, data) => Ok((peer_id, data)), + other => Err(Error::InternalError(format!( + "bad state complete expected Downloaded got {}", + Into::<&'static str>::into(other), + ))), + } + } +}