From d1abec125aefb7d6ef4e2bdc447f906cee71924a Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 2 Mar 2026 13:38:15 +1100 Subject: [PATCH 1/3] Regression test for range sync CGC race condition. --- .../src/peer_manager/peerdb.rs | 58 ++++++++++---- .../network/src/sync/backfill_sync/mod.rs | 2 +- beacon_node/network/src/sync/tests/lookups.rs | 34 +++++++- beacon_node/network/src/sync/tests/range.rs | 78 +++++++++++++++++-- 4 files changed, 145 insertions(+), 27 deletions(-) diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs index 11ce7853507..937278b23ab 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb.rs @@ -793,12 +793,39 @@ impl PeerDB { ); } - /// Updates the connection state. MUST ONLY BE USED IN TESTS. - pub fn __add_connected_peer_testing_only( + /// Adds a connected peer to the PeerDB and sets the custody subnets. + /// WARNING: This updates the connection state. MUST ONLY BE USED IN TESTS. + pub fn __add_connected_peer_with_custody_subnets( &mut self, supernode: bool, spec: &ChainSpec, enr_key: CombinedKey, + ) -> PeerId { + let peer_id = self.__add_connected_peer(supernode, enr_key, spec); + + let subnets = if supernode { + (0..spec.data_column_sidecar_subnet_count) + .map(|subnet_id| subnet_id.into()) + .collect() + } else { + let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); + compute_subnets_for_node::(node_id.raw(), spec.custody_requirement, spec) + .expect("should compute custody subnets") + }; + + let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); + peer_info.set_custody_subnets(subnets); + + peer_id + } + + /// Adds a connected peer to the PeerDB and updates the connection state. + /// MUST ONLY BE USED IN TESTS. + pub fn __add_connected_peer( + &mut self, + supernode: bool, + enr_key: CombinedKey, + spec: &ChainSpec, ) -> PeerId { let mut enr = Enr::builder().build(&enr_key).unwrap(); let peer_id = enr.peer_id(); @@ -835,24 +862,21 @@ impl PeerDB { }, ); - if supernode { - let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); - let all_subnets = (0..spec.data_column_sidecar_subnet_count) - .map(|subnet_id| subnet_id.into()) - .collect(); - peer_info.set_custody_subnets(all_subnets); - } else { - let peer_info = self.peers.get_mut(&peer_id).expect("peer exists"); - let node_id = peer_id_to_node_id(&peer_id).expect("convert peer_id to node_id"); - let subnets = - compute_subnets_for_node::(node_id.raw(), spec.custody_requirement, spec) - .expect("should compute custody subnets"); - peer_info.set_custody_subnets(subnets); - } - peer_id } + /// MUST ONLY BE USED IN TESTS. + pub fn __set_custody_subnets( + &mut self, + peer_id: &PeerId, + custody_subnets: HashSet, + ) -> Result<(), String> { + self.peers + .get_mut(peer_id) + .map(|info| info.set_custody_subnets(custody_subnets)) + .ok_or("Cannot set custody subnets, peer not found".to_string()) + } + /// The connection state of the peer has been changed. Modify the peer in the db to ensure all /// variables are in sync with libp2p. /// Updating the state can lead to a `BanOperation` which needs to be processed via the peer diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 7ef72c7f3a3..502715d0a16 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -1229,7 +1229,7 @@ mod tests { let peer_id = network_globals .peers .write() - .__add_connected_peer_testing_only( + .__add_connected_peer_with_custody_subnets( true, &beacon_chain.spec, k256::ecdsa::SigningKey::random(&mut rng).into(), diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 769a11d9767..a904338aa73 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -28,13 +28,14 @@ use lighthouse_network::{ types::SyncState, }; use slot_clock::{SlotClock, TestingSlotClock}; +use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; use tracing::info; use types::{ - BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, EthSpec, ForkContext, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + BlobSidecar, BlockImportSource, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, EthSpec, + ForkContext, ForkName, Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, test_utils::{SeedableRng, XorShiftRng}, }; @@ -1311,7 +1312,7 @@ impl TestRig { .network_globals .peers .write() - .__add_connected_peer_testing_only(false, &self.harness.spec, key); + .__add_connected_peer_with_custody_subnets(false, &self.harness.spec, key); // Assumes custody subnet count == column count let custody_subnets = self @@ -1342,13 +1343,38 @@ impl TestRig { .network_globals .peers .write() - .__add_connected_peer_testing_only(true, &self.harness.spec, key); + .__add_connected_peer_with_custody_subnets(true, &self.harness.spec, key); self.log(&format!( "Added new peer for testing {peer_id:?}, custody: supernode" )); peer_id } + /// Add a connected supernode peer, but without setting the peers' custody subnet. + /// This is to simulate the real behaviour where metadata is only received some time after + /// a connection is established. + pub fn new_connected_supernode_peer_no_metadata_custody_subnet(&mut self) -> PeerId { + let key = self.determinstic_key(); + self.network_globals + .peers + .write() + .__add_connected_peer(true, key, &self.harness.spec) + } + + /// Update the peer's custody subnet in PeerDB and send a `UpdatedPeerCgc` message to sync. + pub fn send_peer_cgc_update_to_sync( + &mut self, + peer_id: &PeerId, + subnets: HashSet, + ) { + self.network_globals + .peers + .write() + .__set_custody_subnets(peer_id, subnets) + .unwrap(); + self.send_sync_message(SyncMessage::UpdatedPeerCgc(*peer_id)) + } + fn determinstic_key(&mut self) -> CombinedKey { k256::ecdsa::SigningKey::random(&mut self.rng_08).into() } diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 67395ccd25a..8862f20e906 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -9,7 +9,7 @@ use beacon_chain::BeaconChain; use beacon_chain::block_verification_types::AvailableBlockData; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::data_column_verification::CustodyDataColumn; -use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; +use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy, test_spec}; use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock}; use beacon_processor::WorkType; use lighthouse_network::rpc::RequestType; @@ -22,10 +22,11 @@ use lighthouse_network::service::api_types::{ SyncRequestId, }; use lighthouse_network::{PeerId, SyncInfo}; +use std::collections::HashSet; use std::time::Duration; use types::{ - BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, - SignedBeaconBlock, SignedBeaconBlockHash, Slot, + BlobSidecarList, BlockImportSource, DataColumnSubnetId, Epoch, EthSpec, Hash256, + MinimalEthSpec as E, SignedBeaconBlock, SignedBeaconBlockHash, Slot, }; const D: Duration = Duration::new(0, 0); @@ -101,7 +102,7 @@ impl TestRig { finalized_root, head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), head_root: Hash256::random(), - earliest_available_slot: None, + earliest_available_slot: Some(Slot::new(0)), }) } @@ -113,7 +114,7 @@ impl TestRig { finalized_root: Hash256::random(), head_slot: finalized_epoch.start_slot(E::slots_per_epoch()), head_root: Hash256::random(), - earliest_available_slot: None, + earliest_available_slot: Some(Slot::new(0)), } } @@ -631,3 +632,70 @@ fn finalized_sync_not_enough_custody_peers_on_start() { let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; r.complete_and_process_range_sync_until(last_epoch, filter()); } + +/// This is a regression test for the following race condition scenario: +/// 1. A node is connected to 3 supernode peers: peer 1 is synced, & peer 2 and 3 are advanced. +/// 2. No metadata has been received yet (i.e. no custody info), so the node cannot start data +/// column range sync yet. +/// 3. Now peer 1 sends the CGC via metadata response, we now have one peer on all custody subnets, +/// BUT not on the finalized syncing chain. +/// 4. The node tries to `send_batch` but fails repeatedly with `NoPeers`, as there's no peer +/// that is able to serve columns for the advanced epochs. The chain is removed after 5 failed attempts. +/// 5. Now peer 2 & 3 send CGC updates, BUT because there's no syncing chain, nothing happens - +/// sync is stuck until finding new peers. +/// +/// The expected behaivour in this scenario should be: +/// 4. not finding suitable peers, chain is kept and batch remains in AwaitingDownload +/// 5. finalized sync should resume as soon as CGC updates are received from peer 2 or 3. +#[test] +fn finalized_sync_not_enough_custody_peers_resume_after_peer_cgc_update() { + // Only run post-PeerDAS + if !test_spec::().is_fulu_scheduled() { + return; + } + let mut r = TestRig::default(); + + // GIVEN: the node is connected to 3 supernode peers: + + // Peer 1 is synced (same finalized epoch) + let peer_1 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + let remote_info = r.local_info().clone(); + r.send_sync_message(SyncMessage::AddPeer(peer_1, remote_info)); + + // Peer 2 is advanced (local finalized epoch + 2) + let advanced_epochs: u64 = 2; + let peer_2 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + let remote_info = r.finalized_remote_info_advanced_by(advanced_epochs.into()); + r.send_sync_message(SyncMessage::AddPeer(peer_2, remote_info.clone())); + // We expect a finalized chain to be created with peer 2, but no requests sent out yet due to missing custody info. + r.assert_state(RangeSyncType::Finalized); + r.assert_empty_network(); + + // Peer 3 is connected and advanced + let peer_3 = r.new_connected_supernode_peer_no_metadata_custody_subnet(); + r.send_sync_message(SyncMessage::AddPeer(peer_3, remote_info)); + // We are still in finalized sync state (now with peer 3 added) + r.assert_state(RangeSyncType::Finalized); + + for (i, p) in [peer_1, peer_2, peer_3].iter().enumerate() { + let peer_idx = i + 1; + r.log(&format!("Peer {peer_idx}: {p:?}")); + } + + // WHEN: peer 1 sends its CGC via metadata response + let all_custody_subnets = (0..r.harness.spec.data_column_sidecar_subnet_count) + .map(|i| i.into()) + .collect::>(); + r.send_peer_cgc_update_to_sync(&peer_1, all_custody_subnets.clone()); + + // We still don't have any peers on the syncing chain with custody columns (peer 1 & 2) + // The node won't send the batch and will remain in the finalized sync state (this was failing before!) + r.assert_state(RangeSyncType::Finalized); + + // Now we receive peer 2 & 3's CGC updates, the node will resume syncing from these two peers + r.send_peer_cgc_update_to_sync(&peer_2, all_custody_subnets.clone()); + r.send_peer_cgc_update_to_sync(&peer_3, all_custody_subnets); + + let last_epoch = advanced_epochs + EXTRA_SYNCED_EPOCHS; + r.complete_and_process_range_sync_until(last_epoch, filter()); +} From b524f01f56f3cf3f9bfb6806a4dc825dd99127d8 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 2 Mar 2026 22:47:38 +1100 Subject: [PATCH 2/3] Fix clippy --- beacon_node/network/src/sync/tests/range.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 8862f20e906..18ae31910e1 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -25,7 +25,7 @@ use lighthouse_network::{PeerId, SyncInfo}; use std::collections::HashSet; use std::time::Duration; use types::{ - BlobSidecarList, BlockImportSource, DataColumnSubnetId, Epoch, EthSpec, Hash256, + BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, SignedBeaconBlock, SignedBeaconBlockHash, Slot, }; From 8bc7472026e3b839c27c4dd1ff11b94265509f86 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 2 Mar 2026 22:55:41 +1100 Subject: [PATCH 3/3] Fix fmt --- beacon_node/network/src/sync/tests/range.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 18ae31910e1..1d7e72294d7 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -25,8 +25,8 @@ use lighthouse_network::{PeerId, SyncInfo}; use std::collections::HashSet; use std::time::Duration; use types::{ - BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, - MinimalEthSpec as E, SignedBeaconBlock, SignedBeaconBlockHash, Slot, + BlobSidecarList, BlockImportSource, Epoch, EthSpec, Hash256, MinimalEthSpec as E, + SignedBeaconBlock, SignedBeaconBlockHash, Slot, }; const D: Duration = Duration::new(0, 0);