Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -793,12 +793,39 @@ impl<E: EthSpec> PeerDB<E> {
);
}

/// Updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_testing_only(
/// Adds a connected peer to the PeerDB and sets the custody subnets.
/// WARNING: This updates the connection state. MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer_with_custody_subnets(
&mut self,
supernode: bool,
spec: &ChainSpec,
enr_key: CombinedKey,
) -> PeerId {
let peer_id = self.__add_connected_peer(supernode, enr_key, spec);

let subnets = if supernode {
(0..spec.data_column_sidecar_subnet_count)
.map(|subnet_id| subnet_id.into())
.collect()
} else {
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
.expect("should compute custody subnets")
};

let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
peer_info.set_custody_subnets(subnets);

peer_id
}

/// Adds a connected peer to the PeerDB and updates the connection state.
/// MUST ONLY BE USED IN TESTS.
pub fn __add_connected_peer(
&mut self,
supernode: bool,
enr_key: CombinedKey,
spec: &ChainSpec,
) -> PeerId {
let mut enr = Enr::builder().build(&enr_key).unwrap();
let peer_id = enr.peer_id();
Expand Down Expand Up @@ -835,24 +862,21 @@ impl<E: EthSpec> PeerDB<E> {
},
);

if supernode {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let all_subnets = (0..spec.data_column_sidecar_subnet_count)
.map(|subnet_id| subnet_id.into())
.collect();
peer_info.set_custody_subnets(all_subnets);
} else {
let peer_info = self.peers.get_mut(&peer_id).expect("peer exists");
let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id");
let subnets =
compute_subnets_for_node::<E>(node_id.raw(), spec.custody_requirement, spec)
.expect("should compute custody subnets");
peer_info.set_custody_subnets(subnets);
}

peer_id
}

/// MUST ONLY BE USED IN TESTS.
pub fn __set_custody_subnets(
&mut self,
peer_id: &PeerId,
custody_subnets: HashSet<DataColumnSubnetId>,
) -> Result<(), String> {
self.peers
.get_mut(peer_id)
.map(|info| info.set_custody_subnets(custody_subnets))
.ok_or_else(|| "Cannot set custody subnets, peer not found".to_string())
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
/// variables are in sync with libp2p.
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,7 +1247,7 @@ mod tests {
let peer_id = network_globals
.peers
.write()
.__add_connected_peer_testing_only(
.__add_connected_peer_with_custody_subnets(
true,
&beacon_chain.spec,
k256::ecdsa::SigningKey::random(&mut rng).into(),
Expand Down
34 changes: 30 additions & 4 deletions beacon_node/network/src/sync/tests/lookups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ use lighthouse_network::{
types::SyncState,
};
use slot_clock::{SlotClock, TestingSlotClock};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::info;
use types::{
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, ForkContext, ForkName, Hash256,
MinimalEthSpec as E, SignedBeaconBlock, Slot,
BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSubnetId,
ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot,
};

const D: Duration = Duration::new(0, 0);
Expand Down Expand Up @@ -1454,7 +1455,7 @@ impl TestRig {
.network_globals
.peers
.write()
.__add_connected_peer_testing_only(false, &self.harness.spec, key);
.__add_connected_peer_with_custody_subnets(false, &self.harness.spec, key);

// Assumes custody subnet count == column count
let custody_subnets = self
Expand Down Expand Up @@ -1485,13 +1486,38 @@ impl TestRig {
.network_globals
.peers
.write()
.__add_connected_peer_testing_only(true, &self.harness.spec, key);
.__add_connected_peer_with_custody_subnets(true, &self.harness.spec, key);
self.log(&format!(
"Added new peer for testing {peer_id:?}, custody: supernode"
));
peer_id
}

/// Add a connected supernode peer, but without setting the peers' custody subnet.
/// This is to simulate the real behaviour where metadata is only received some time after
/// a connection is established.
pub fn new_connected_supernode_peer_no_metadata_custody_subnet(&mut self) -> PeerId {
let key = self.determinstic_key();
self.network_globals
.peers
.write()
.__add_connected_peer(true, key, &self.harness.spec)
}

/// Update the peer's custody subnet in PeerDB and send a `UpdatedPeerCgc` message to sync.
pub fn send_peer_cgc_update_to_sync(
&mut self,
peer_id: &PeerId,
subnets: HashSet<DataColumnSubnetId>,
) {
self.network_globals
.peers
.write()
.__set_custody_subnets(peer_id, subnets)
.unwrap();
self.send_sync_message(SyncMessage::UpdatedPeerCgc(*peer_id))
}

fn determinstic_key(&mut self) -> CombinedKey {
k256::ecdsa::SigningKey::random(&mut self.rng_08).into()
}
Expand Down
76 changes: 75 additions & 1 deletion beacon_node/network/src/sync/tests/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::sync::range_sync::RangeSyncType;
use lighthouse_network::rpc::RPCError;
use lighthouse_network::rpc::methods::StatusMessageV2;
use lighthouse_network::{PeerId, SyncInfo};
use std::collections::HashSet;
use types::{Epoch, EthSpec, Hash256, MinimalEthSpec as E, Slot};

/// MinimalEthSpec has 8 slots per epoch
Expand All @@ -50,7 +51,7 @@ impl TestRig {
finalized_root: Hash256::random(),
head_slot: finalized_epoch.start_slot(E::slots_per_epoch()),
head_root: Hash256::random(),
earliest_available_slot: None,
earliest_available_slot: Some(Slot::new(0)),
}
}

Expand Down Expand Up @@ -476,3 +477,76 @@ async fn not_enough_custody_peers_then_peers_arrive() {
r.simulate(SimulateConfig::happy_path()).await;
r.assert_range_sync_completed();
}

/// This is a regression test for the following race condition scenario:
/// 1. A node is connected to 3 supernode peers: peer 1 is synced, & peer 2 and 3 are advanced.
/// 2. No metadata has been received yet (i.e. no custody info), so the node cannot start data
/// column range sync yet.
/// 3. Now peer 1 sends the CGC via metadata response, we now have one peer on all custody subnets,
/// BUT not on the finalized syncing chain.
/// 4. The node tries to `send_batch` but fails repeatedly with `NoPeers`, as there's no peer
/// that is able to serve columns for the advanced epochs. The chain is removed after 5 failed attempts.
/// 5. Now peer 2 & 3 send CGC updates, BUT because there's no syncing chain, nothing happens -
/// sync is stuck until finding new peers.
///
/// The expected behaviour in this scenario should be:
/// 4. not finding suitable peers, chain is kept and batch remains in AwaitingDownload
/// 5. finalized sync should resume as soon as CGC updates are received from peer 2 or 3.
#[tokio::test]
async fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() {
let mut r = TestRig::default();
if !r.fork_name.fulu_enabled() {
return;
}

// GIVEN: the node is connected to 3 supernode peers:
let advanced_epochs: usize = 2;
let sync_epochs = advanced_epochs + 3;
let sync_slots = sync_epochs * SLOTS_PER_EPOCH - 1;
r.build_chain(sync_slots).await;
r.harness.set_current_slot(Slot::new(sync_slots as u64 + 1));

// Peer 1 is synced (same finalized epoch), but its earliest available slot means it
// cannot serve the batches needed for this sync.
let peer_1 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
let mut remote_info = r.local_info().clone();
remote_info.earliest_available_slot = Some(Slot::new(sync_slots as u64));
r.send_sync_message(SyncMessage::AddPeer(peer_1, remote_info));

// Peer 2 is advanced (local finalized epoch + 2)
let peer_2 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
let remote_info = r.finalized_remote_info_advanced_by((advanced_epochs as u64).into());
r.send_sync_message(SyncMessage::AddPeer(peer_2, remote_info.clone()));
// We expect a finalized chain to be created with peer 2, but no requests sent out yet due to missing custody info.
r.assert_state(RangeSyncType::Finalized);
r.assert_empty_network();

// Peer 3 is connected and advanced
let peer_3 = r.new_connected_supernode_peer_no_metadata_custody_subnet();
r.send_sync_message(SyncMessage::AddPeer(peer_3, remote_info));
// We are still in finalized sync state (now with peer 3 added)
r.assert_state(RangeSyncType::Finalized);

for (i, p) in [peer_1, peer_2, peer_3].iter().enumerate() {
let peer_idx = i + 1;
r.log(&format!("Peer {peer_idx}: {p:?}"));
}

// WHEN: peer 1 sends its CGC via metadata response
let all_custody_subnets = (0..r.harness.spec.data_column_sidecar_subnet_count)
.map(|i| i.into())
.collect::<HashSet<_>>();
r.send_peer_cgc_update_to_sync(&peer_1, all_custody_subnets.clone());

// We still don't have any peers on the syncing chain with custody columns (only peer 1)
// The node won't send the batch and will remain in the finalized sync state (this was failing before!)
r.assert_state(RangeSyncType::Finalized);
r.assert_empty_network();

// Now we receive peer 2 & 3's CGC updates, the node will resume syncing from these two peers
r.send_peer_cgc_update_to_sync(&peer_2, all_custody_subnets.clone());
r.send_peer_cgc_update_to_sync(&peer_3, all_custody_subnets);

r.simulate(SimulateConfig::happy_path()).await;
r.assert_range_sync_completed();
}
Loading