Skip to content
6 changes: 4 additions & 2 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,15 @@ impl<E: EthSpec> RpcBlock<E> {
custody_columns: Vec<CustodyDataColumn<E>>,
expected_custody_indices: Vec<ColumnIndex>,
spec: &ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
) -> Result<Self, String> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

let custody_columns_count = expected_custody_indices.len();
let inner = RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)?,
RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)
// This is an internal error that should never happen
.map_err(|e| format!("custody_columns variable list error: {e:?}"))?,
expected_custody_indices,
);
Ok(Self {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2418,7 +2418,8 @@ where
columns,
expected_custody_indices,
&self.spec,
)?
)
.map_err(BlockError::InternalError)?
} else {
RpcBlock::new_without_blobs(Some(block_root), block, sampling_column_count)
}
Expand Down
19 changes: 17 additions & 2 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::discovery::enr::PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY;
use crate::discovery::{peer_id_to_node_id, CombinedKey};
use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId};
use crate::{
metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub, PeerId, SyncInfo,
};
use itertools::Itertools;
use logging::crit;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
Expand All @@ -15,7 +17,7 @@ use std::{
use sync_status::SyncStatus;
use tracing::{debug, error, trace, warn};
use types::data_column_custody_group::compute_subnets_for_node;
use types::{ChainSpec, DataColumnSubnetId, EthSpec};
use types::{ChainSpec, DataColumnSubnetId, Epoch, EthSpec, Hash256, Slot};

pub mod client;
pub mod peer_info;
Expand Down Expand Up @@ -735,6 +737,19 @@ impl<E: EthSpec> PeerDB<E> {
},
);

self.update_sync_status(
&peer_id,
SyncStatus::Synced {
// Fill in mock SyncInfo, only for the peer to return `is_synced() == true`.
info: SyncInfo {
head_slot: Slot::new(0),
head_root: Hash256::ZERO,
finalized_epoch: Epoch::new(0),
finalized_root: Hash256::ZERO,
},
},
);

if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
Expand Down
22 changes: 17 additions & 5 deletions beacon_node/lighthouse_network/src/service/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ pub struct BlobsByRangeRequestId {
pub struct DataColumnsByRangeRequestId {
/// Id to identify this attempt at a data_columns_by_range request for `parent_request_id`
pub id: Id,
/// The Id of the parent custody by range request that issued this data_columns_by_range request
pub parent_request_id: CustodyByRangeRequestId,
}

#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct CustodyByRangeRequestId {
/// Id to identify this attempt at a meta custody by range request for `parent_request_id`
pub id: Id,
/// The Id of the overall By Range request for block components.
pub parent_request_id: ComponentsByRangeRequestId,
}
Expand Down Expand Up @@ -221,6 +229,7 @@ macro_rules! impl_display {
impl_display!(BlocksByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(BlobsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(DataColumnsByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(CustodyByRangeRequestId, "{}/{}", id, parent_request_id);
impl_display!(ComponentsByRangeRequestId, "{}/{}", id, requester);
impl_display!(DataColumnsByRootRequestId, "{}/{}", id, requester);
impl_display!(SingleLookupReqId, "{}/Lookup/{}", req_id, lookup_id);
Expand Down Expand Up @@ -299,14 +308,17 @@ mod tests {
fn display_id_data_columns_by_range() {
let id = DataColumnsByRangeRequestId {
id: 123,
parent_request_id: ComponentsByRangeRequestId {
parent_request_id: CustodyByRangeRequestId {
id: 122,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
parent_request_id: ComponentsByRangeRequestId {
id: 121,
requester: RangeRequestId::RangeSync {
chain_id: 54,
batch_id: Epoch::new(0),
},
},
},
};
assert_eq!(format!("{id}"), "123/122/RangeSync/0/54");
assert_eq!(format!("{id}"), "123/122/121/RangeSync/0/54");
}
}
19 changes: 19 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,25 @@ impl<E: EthSpec> NetworkGlobals<E> {
Self::new_test_globals_with_metadata(trusted_peers, metadata, config, spec)
}

pub fn new_test_globals_as_supernode(
trusted_peers: Vec<PeerId>,
config: Arc<NetworkConfig>,
spec: Arc<ChainSpec>,
is_supernode: bool,
) -> NetworkGlobals<E> {
let metadata = MetaData::V3(MetaDataV3 {
seq_number: 0,
attnets: Default::default(),
syncnets: Default::default(),
custody_group_count: if is_supernode {
spec.number_of_custody_groups
} else {
spec.custody_requirement
},
});
Self::new_test_globals_with_metadata(trusted_peers, metadata, config, spec)
}

pub(crate) fn new_test_globals_with_metadata(
trusted_peers: Vec<PeerId>,
metadata: MetaData<E>,
Expand Down
34 changes: 18 additions & 16 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::service::api_types::Id;
use lighthouse_network::types::{BackFillState, NetworkGlobals};
use lighthouse_network::{PeerAction, PeerId};
use lighthouse_network::PeerAction;
use logging::crit;
use std::collections::{
btree_map::{BTreeMap, Entry},
HashSet,
HashMap, HashSet,
};
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
Expand Down Expand Up @@ -311,7 +311,6 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
&mut self,
network: &mut SyncNetworkContext<T>,
batch_id: BatchId,
peer_id: &PeerId,
request_id: Id,
err: RpcResponseError,
) -> Result<(), BackFillError> {
Expand All @@ -325,11 +324,18 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
debug!(batch_epoch = %batch_id, error = ?err, "Batch download failed");
match batch.download_failed(Some(*peer_id)) {
// TODO(das): Is it necessary for the batch to track failed peers? Can we make this
// mechanism compatible with PeerDAS and before PeerDAS?
match batch.download_failed(None) {
Err(e) => self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0)),
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => {
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
}
Ok(BatchOperationOutcome::Failed { blacklist: _ }) => self.fail_sync(match err {
RpcResponseError::RpcError(_)
| RpcResponseError::VerifyError(_)
| RpcResponseError::InternalError(_) => {
BackFillError::BatchDownloadFailed(batch_id)
}
RpcResponseError::RequestExpired(_) => BackFillError::Paused,
}),
Ok(BatchOperationOutcome::Continue) => self.send_batch(network, batch_id),
}
} else {
Expand Down Expand Up @@ -920,6 +926,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
RangeRequestId::BackfillSync { batch_id },
&synced_peers,
&failed_peers,
// Does not track total requests per peers for now
&HashMap::new(),
) {
Ok(request_id) => {
// inform the batch about the new request
Expand All @@ -931,15 +939,9 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
return Ok(());
}
Err(e) => match e {
RpcRequestSendError::NoPeer(no_peer) => {
// If we are here the chain has no more synced peers
info!(
"reason" = format!("insufficient_synced_peers({no_peer:?})"),
"Backfill sync paused"
);
self.set_state(BackFillState::Paused);
return Err(BackFillError::Paused);
}
// TODO(das): block_components_by_range requests can now hang out indefinitely.
// Is that fine? Maybe we should fail the requests from the network_context
// level without involving the BackfillSync itself.
RpcRequestSendError::InternalError(e) => {
// NOTE: under normal conditions this shouldn't happen but we handle it anyway
warn!(%batch_id, error = ?e, %batch,"Could not send batch request");
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(?id, "Block returned for single block lookup not present");
debug!(%id, "Block returned for single block lookup not present");
return Err(LookupRequestError::UnknownLookup);
};

Expand All @@ -507,7 +507,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
Ok((response, peer_group, seen_timestamp)) => {
debug!(
?block_root,
?id,
%id,
?peer_group,
?response_type,
"Received lookup download success"
Expand Down Expand Up @@ -540,7 +540,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
// the peer and the request ID which is linked to this `id` value here.
debug!(
?block_root,
?id,
%id,
?response_type,
error = ?e,
"Received lookup download failure"
Expand Down
Loading