Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use libp2p::PeerId;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use types::{
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, LightClientBootstrap,
BlobSidecar, DataColumnSidecar, Epoch, EthSpec, Hash256, LightClientBootstrap,
LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock,
};

Expand Down Expand Up @@ -71,7 +71,6 @@ pub struct DataColumnsByRangeRequestId {

/// The parent request on whose behalf a `DataColumnsByRangeRequestId` was issued
/// (stored in that id's `parent_request_id` field).
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub enum DataColumnsByRangeRequester {
ComponentsByRange(ComponentsByRangeRequestId),
CustodyBackfillSync(CustodyBackFillBatchRequestId),
}

Expand Down Expand Up @@ -130,9 +129,17 @@ pub struct CustodyId {
}

/// Downstream components that perform custody by root requests.
/// Currently, it's only single block lookups, so not using an enum
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct CustodyRequester(pub SingleLookupReqId);
pub enum CustodyRequester {
SingleLookup(SingleLookupReqId),
RangeSync(RangeSyncCustodyId),
}

/// Identifier carried by `CustodyRequester::RangeSync`.
///
/// Pairs a components-by-range request id with a block root. Its `Display` form is
/// `<id>/<block_root:?>`.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct RangeSyncCustodyId {
/// The components-by-range request this custody request is associated with.
pub id: ComponentsByRangeRequestId,
/// Block root for this custody request.
/// NOTE(review): presumably the root of the block whose custody columns are being fetched — confirm.
pub block_root: Hash256,
}

/// Application level requests sent to the network.
#[derive(Debug, Clone, Copy, PartialEq)]
Expand Down Expand Up @@ -254,7 +261,16 @@ impl Display for DataColumnsByRootRequester {

impl Display for CustodyRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
match self {
Self::SingleLookup(id) => write!(f, "{id}"),
Self::RangeSync(id) => write!(f, "RangeSync/{id}"),
}
}
}

impl Display for RangeSyncCustodyId {
    /// Renders the id as `<id>/<block_root>`, where the request id is formatted with
    /// `Display` and the block root with `Debug`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { id, block_root } = self;
        write!(f, "{id}/{block_root:?}")
    }
}

Expand All @@ -270,7 +286,6 @@ impl Display for RangeRequestId {
impl Display for DataColumnsByRangeRequester {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Self::ComponentsByRange(id) => write!(f, "ByRange/{id}"),
Self::CustodyBackfillSync(id) => write!(f, "CustodyBackfill/{id}"),
}
}
Expand All @@ -285,7 +300,7 @@ mod tests {
let id = DataColumnsByRootRequestId {
id: 123,
requester: DataColumnsByRootRequester::Custody(CustodyId {
requester: CustodyRequester(SingleLookupReqId {
requester: CustodyRequester::SingleLookup(SingleLookupReqId {
req_id: 121,
lookup_id: 101,
}),
Expand All @@ -298,17 +313,17 @@ mod tests {
fn display_id_data_columns_by_range() {
let id = DataColumnsByRangeRequestId {
id: 123,
parent_request_id: DataColumnsByRangeRequester::ComponentsByRange(
ComponentsByRangeRequestId {
parent_request_id: DataColumnsByRangeRequester::CustodyBackfillSync(
CustodyBackFillBatchRequestId {
id: 122,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
batch_id: CustodyBackfillBatchId {
epoch: Epoch::new(0),
run_id: 54,
},
},
),
peer: PeerId::random(),
};
assert_eq!(format!("{id}"), "123/ByRange/122/RangeSync/0/54");
assert_eq!(format!("{id}"), "123/CustodyBackfill/122/0/54");
}
}
13 changes: 7 additions & 6 deletions beacon_node/lighthouse_network/tests/rpc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use libp2p::PeerId;
use lighthouse_network::rpc::{RequestType, methods::*};
use lighthouse_network::service::api_types::{
AppRequestId, BlobsByRangeRequestId, BlocksByRangeRequestId, ComponentsByRangeRequestId,
DataColumnsByRangeRequestId, DataColumnsByRangeRequester, RangeRequestId, SyncRequestId,
CustodyBackFillBatchRequestId, CustodyBackfillBatchId, DataColumnsByRangeRequestId,
DataColumnsByRangeRequester, RangeRequestId, SyncRequestId,
};
use lighthouse_network::{NetworkEvent, ReportSource, Response};
use ssz::Encode;
Expand Down Expand Up @@ -1847,12 +1848,12 @@ fn test_request_too_large_data_columns_by_range() {
AppRequestId::Sync(SyncRequestId::DataColumnsByRange(
DataColumnsByRangeRequestId {
id: 1,
parent_request_id: DataColumnsByRangeRequester::ComponentsByRange(
ComponentsByRangeRequestId {
parent_request_id: DataColumnsByRangeRequester::CustodyBackfillSync(
CustodyBackFillBatchRequestId {
id: 1,
requester: RangeRequestId::RangeSync {
chain_id: 1,
batch_id: Epoch::new(1),
batch_id: CustodyBackfillBatchId {
epoch: Epoch::new(1),
run_id: 1,
},
},
),
Expand Down
24 changes: 24 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,30 @@ pub static SYNCING_CHAIN_BATCHES: LazyLock<Result<IntGaugeVec>> = LazyLock::new(
&["sync_type", "state"],
)
});
/// Histogram `sync_range_chain_batch_downloading_seconds`: time range sync batches spend
/// downloading. Created lazily on first access, with explicit bucket bounds from 0.1 to 60.0.
pub static SYNCING_CHAIN_BATCH_DOWNLOADING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram_with_buckets(
"sync_range_chain_batch_downloading_seconds",
"Time range sync batches spend downloading",
Ok(vec![0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0, 30.0, 60.0]),
)
});
/// Histogram `sync_range_chain_batch_processing_seconds`: time range sync batches spend in
/// processing. Created lazily on first access, with explicit bucket bounds from 0.01 to 20.0.
pub static SYNCING_CHAIN_BATCH_PROCESSING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
try_create_histogram_with_buckets(
"sync_range_chain_batch_processing_seconds",
"Time range sync batches spend in processing",
Ok(vec![
0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 20.0,
]),
)
});
/// Histogram `sync_range_chain_batch_awaiting_processing_count`: number of batches in
/// `AwaitingProcessing` when a batch starts processing. Created lazily on first access,
/// with one bucket bound per integer count from 0 through 8.
pub static SYNCING_CHAIN_BATCH_AWAITING_PROCESSING_COUNT: LazyLock<Result<Histogram>> =
LazyLock::new(|| {
try_create_histogram_with_buckets(
"sync_range_chain_batch_awaiting_processing_count",
"Number of batches in AwaitingProcessing when a batch starts processing",
Ok(vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]),
)
});
pub static SYNC_SINGLE_BLOCK_LOOKUPS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
try_create_int_gauge(
"sync_single_block_lookups",
Expand Down
89 changes: 8 additions & 81 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use strum::IntoEnumIterator;
use tracing::{debug, error, info, warn};
use types::{ColumnIndex, Epoch, EthSpec};
use types::{Epoch, EthSpec};

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
Expand Down Expand Up @@ -315,45 +315,22 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
peer_id: &PeerId,
peer_id: Option<&PeerId>,
request_id: Id,
err: RpcResponseError,
) -> Result<(), BackFillError> {
if let Some(batch) = self.batches.get_mut(&batch_id) {
if let RpcResponseError::BlockComponentCouplingError(coupling_error) = &err {
match coupling_error {
CouplingError::DataColumnPeerFailure {
error,
faulty_peers,
exceeded_retries,
} => {
debug!(?batch_id, error, "Block components coupling error");
// Note: we don't fail the batch here because a `CouplingError` is
// recoverable by requesting from other honest peers.
let mut failed_columns = HashSet::new();
let mut failed_peers = HashSet::new();
for (column, peer) in faulty_peers {
failed_columns.insert(*column);
failed_peers.insert(*peer);
}

// Only retry if peer failure **and** retries haven't been exceeded
if !*exceeded_retries {
return self.retry_partial_batch(
network,
batch_id,
request_id,
failed_columns,
failed_peers,
);
}
}
CouplingError::BlobPeerFailure(msg) => {
debug!(?batch_id, msg, "Blob peer failure");
}
CouplingError::InternalError(msg) => {
error!(?batch_id, msg, "Block components coupling internal error");
}
CouplingError::DataColumnPeerFailure { error, .. } => {
debug!(?batch_id, error, "Data column peer failure");
}
}
}
// A batch could be retried without the peer failing the request (disconnecting/
Expand All @@ -365,7 +342,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
match batch.download_failed(Some(*peer_id)) {
match batch.download_failed(peer_id.copied()) {
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
Expand Down Expand Up @@ -699,7 +676,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// Batches can be in `AwaitingDownload` state if there weren't good data column subnet
// peers to send the request to.
BatchState::AwaitingDownload => return Ok(ProcessResult::Successful),
BatchState::Failed | BatchState::Processing(_) => {
BatchState::Failed | BatchState::Processing(..) => {
// these are all inconsistent states:
// - Failed -> non recoverable batch. Chain should have been removed
// - Processing -> `self.current_processing_batch` is None
Expand Down Expand Up @@ -805,7 +782,7 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
crit!("batch indicates inconsistent chain state while advancing chain")
}
BatchState::AwaitingProcessing(..) => {}
BatchState::Processing(_) => {
BatchState::Processing(..) => {
debug!(batch = %id, %batch, "Advancing chain while processing a batch");
if let Some(processing_id) = self.current_processing_batch
&& id >= processing_id
Expand Down Expand Up @@ -902,14 +879,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
.collect::<HashSet<_>>();

let (request, is_blob_batch) = batch.to_blocks_by_range_request();
let failed_peers = batch.failed_peers();
match network.block_components_by_range_request(
is_blob_batch,
request,
RangeRequestId::BackfillSync { batch_id },
&synced_peers,
&synced_peers, // All synced peers have imported up to the finalized slot so they must have their custody columns available
&failed_peers,
) {
Ok(request_id) => {
// inform the batch about the new request
Expand Down Expand Up @@ -957,53 +931,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
Ok(())
}

/// Re-requests only the failed data columns of a batch, avoiding peers known to have failed.
///
/// The peers passed in `failed_peers` are merged with the peers the batch itself has already
/// recorded as failed, and the column retry is issued against the peers currently synced for
/// the batch's epoch.
///
/// Returns `Err(BackFillError::InvalidSyncState)` when no batch is tracked under `batch_id`.
/// Otherwise returns `Ok(())`, including when the retry request itself fails: that failure
/// is only logged at debug level.
pub fn retry_partial_batch(
    &mut self,
    network: &mut SyncNetworkContext<T>,
    batch_id: BatchId,
    id: Id,
    failed_columns: HashSet<ColumnIndex>,
    mut failed_peers: HashSet<PeerId>,
) -> Result<(), BackFillError> {
    // Guard: a retry only makes sense for a batch we are still tracking.
    let Some(batch) = self.batches.get_mut(&batch_id) else {
        return Err(BackFillError::InvalidSyncState(
            "Batch should exist to be retried".to_string(),
        ));
    };

    // Exclude peers that previously failed this batch as well as the newly reported ones.
    failed_peers.extend(&batch.failed_peers());
    let (req, _) = batch.to_blocks_by_range_request();

    let synced_peers: HashSet<_> = network
        .network_globals()
        .peers
        .read()
        .synced_peers_for_epoch(batch_id)
        .cloned()
        .collect();

    match network.retry_columns_by_range(id, &synced_peers, &failed_peers, req, &failed_columns) {
        Ok(_) => {
            debug!(
                ?batch_id,
                id, "Retried column requests from different peers"
            );
        }
        Err(e) => {
            debug!(?batch_id, id, e, "Failed to retry partial batch");
        }
    }
    Ok(())
}

/// When resuming a chain, this function searches for batches that need to be re-downloaded and
/// transitions their state to redownload the batch.
fn resume_batches(&mut self, network: &mut SyncNetworkContext<T>) -> Result<(), BackFillError> {
Expand Down
Loading