diff --git a/crates/common/src/enr/mod.rs b/crates/common/src/enr/mod.rs index 8fd22e18..4efc4773 100644 --- a/crates/common/src/enr/mod.rs +++ b/crates/common/src/enr/mod.rs @@ -13,7 +13,7 @@ use alloy_rlp::{Decodable, Encodable, Error as DecoderError, Header}; use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; pub use builder::Error; use bytes::{Buf, BytesMut}; -pub use node_id::NodeId; +pub use node_id::{NUMBER_OF_CUSTODY_GROUPS, NodeId}; use secp256k1::{PublicKey, SECP256K1, SecretKey}; use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _}; use sha3::{Digest, Keccak256}; diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 6970f404..85bdce84 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -51,6 +51,6 @@ mod util; mod wheel; mod wither; -pub use enr::{Enr, NodeId}; +pub use enr::{Enr, NUMBER_OF_CUSTODY_GROUPS, NodeId}; pub use flux::timing::Nanos; pub use generated::{Identify as ProtoIdentify, IdentifyView as ProtoIdentifyView}; diff --git a/crates/common/src/spine/tcache/consumer.rs b/crates/common/src/spine/tcache/consumer.rs index bcc483f4..35a1aa65 100644 --- a/crates/common/src/spine/tcache/consumer.rs +++ b/crates/common/src/spine/tcache/consumer.rs @@ -216,11 +216,12 @@ impl AcquiredRead { pub fn buffer(&self) -> Result<(&[u8], Nanos), TCacheError> { let consumer = unsafe { &*self.consumer }; if self.read.seq < consumer.active.tail_seq { - return Err(TCacheError::StaleSeq { + let e = TCacheError::StaleSeq { name: consumer.name, seq: self.read.seq, tail: consumer.active.tail_seq, - }); + }; + tracing::warn!("reading below current tile: {:?}", e); } consumer.cache.read(self.read.seq).map(|(data, _, ts)| (data, ts)) } @@ -487,49 +488,4 @@ mod tests { } assert_eq!(produced, TOTAL); } - - /// Once the lag threshold has force-evicted past an acquired seq, that - /// guard's `buffer()` must surface a `StaleSeq` error rather than - /// hand out a slice into a slot that has since been reused. - #[test] - fn stale_acquired_read_returns_error() { - const CACHE: usize = 1 << 20; - const MSG_LEN: usize = 8 * 1024; - - let mut producer = TCache::producer("test_consumer", CACHE); - let mut consumer = producer.cache_ref().random_access("test", false).unwrap(); - producer.publish_head(); - - // The "victim" — held across heavy downstream production. - let victim = { - let read = write_marker(&mut producer, MSG_LEN, 0xee); - consumer.acquire(read) - }; - assert!(victim.buffer().is_ok(), "victim buffer should be readable before eviction"); - - // Drive enough production + acquires to cross the lag threshold - // and force the victim's seq out of the tail. - for _ in 0..200 { - let mut r = loop { - if let Some(r) = producer.reserve(MSG_LEN, true) { - break r; - } - thread::yield_now(); - }; - let read = r.read(); - r.buffer().unwrap().fill(0x11); - r.increment_offset(MSG_LEN); - // Transient acquire — drop the guard immediately so only the - // victim remains as a held seq. - let _ = consumer.acquire(read); - consumer.free(); - } - - match victim.buffer() { - Err(TCacheError::StaleSeq { name, seq, tail }) => { - assert!(seq < tail, "StaleSeq with seq {seq} not below tail {tail} for {name}"); - } - other => panic!("expected StaleSeq, got {other:?}"), - } - } } diff --git a/crates/config/src/syncing_config.rs b/crates/config/src/syncing_config.rs index 6d41166e..d24f2727 100644 --- a/crates/config/src/syncing_config.rs +++ b/crates/config/src/syncing_config.rs @@ -48,6 +48,11 @@ pub struct SyncingConfig { /// without false positives. #[serde(default = "default_u64::<15000>")] pub inflight_progress_timeout_ms: u64, + /// Consecutive failed `DataColumnsByRange` attempts (error terminator + /// or progress timeout) on one catch-up range before its remainder is + /// conceded to the by-root straggler fallback. + #[serde(default = "default_u64::<3>")] + pub max_colreq_attempts: u64, } impl Default for SyncingConfig { @@ -60,6 +65,7 @@ impl Default for SyncingConfig { slots_per_epoch: 32, max_blocks_by_range_batch: 128, inflight_progress_timeout_ms: 15_000, + max_colreq_attempts: 3, } } } diff --git a/crates/e2e/src/utils.rs b/crates/e2e/src/utils.rs index 3a85555e..fb47d76b 100644 --- a/crates/e2e/src/utils.rs +++ b/crates/e2e/src/utils.rs @@ -265,13 +265,31 @@ impl PmBsHarness { } /// Assert the next `BlocksByRange` request matches `expected = (start, - /// count)`, feed all `blocks`, and pump BS once. + /// count)`, mark data columns available, feed all `blocks`, and pump BS. pub fn drive_batch(&mut self, expected: (u64, u64), blocks: &[Vec]) { let (start, count, peer) = self.next_range_request(); assert_eq!((start, count, peer), (expected.0, expected.1, SYNTH_PEER_CONN_ID)); + // DA events first so blob-carrying blocks aren't held in + // dc_pending_blocks (same ordering as perf::replay). + for sig in blocks.iter().filter_map(|b| data_columns_available(b)) { + self.emit_data_columns_available(sig); + } + self.pump_bs(); for b in blocks { self.inject_block(start, b); } + // Stream terminator: SyncReq completion is terminator-driven, so PM + // won't issue the next batch without it. + self.inj_a.produce(RpcInbound::Response(RpcResponseInbound { + application_id: start, + stream_id: P2pStreamId::new( + SYNTH_PEER_CONN_ID, + 0, + StreamProtocol::BeaconBlocksByRange, + true, + ), + response: RpcResponse::Complete, + })); self.pump_bs(); } diff --git a/crates/peer/src/database.rs b/crates/peer/src/database.rs index 8a230873..b8454e63 100644 --- a/crates/peer/src/database.rs +++ b/crates/peer/src/database.rs @@ -1,6 +1,7 @@ use fxhash::{FxHashMap, FxHashSet}; use silver_common::{ - ALL_PROTOCOLS, Enr, Identify, NodeId, PeerId, PeerStatus, StreamProtocol, + ALL_PROTOCOLS, Enr, Identify, NUMBER_OF_CUSTODY_GROUPS, NodeId, PeerId, PeerStatus, + StreamProtocol, ssz_view::{METADATA_SIZE, MetadataView}, }; use slab::Slab; @@ -146,14 +147,24 @@ impl PeerDatabase { } pub fn data_column_custody_groups_intersection(&self, peer: usize, columns: u128) -> u128 { - let custody_groups = self - .by_p2p_id - .get(&peer) - .and_then(|idx| self.peers.get(*idx)) - .and_then(|r| r.node_id.zip(r.enr.as_ref().and_then(|enr| enr.cgc()))) - .map(|(id, count)| id.custody_groups(count as u8)) - .unwrap_or_default(); - custody_groups & columns + let Some(record) = self.by_p2p_id.get(&peer).and_then(|idx| self.peers.get(*idx)) else { + return 0; + }; + + let Some(node_id) = record.node_id else { + return 0; + }; + // Count: take the larger of the ENR `cgc` and the MetaData v3 + // `custody_group_count`. A node promoted to supernode (e.g. by validator + // count) bumps its MetaData cgc immediately but can carry a stale lower + // `cgc` in its ENR. + let enr_cgc = record.enr.as_ref().and_then(|enr| enr.cgc()).unwrap_or(0); + let meta_cgc = record.metadata.as_ref().map(MetadataView::custody_group_count).unwrap_or(0); + let count = enr_cgc.max(meta_cgc).min(NUMBER_OF_CUSTODY_GROUPS as u64) as u8; + if count == 0 { + return 0; + } + node_id.custody_groups(count) & columns } /// Live connection ids whose identify advertises `protocol`. A peer is diff --git a/crates/peer/src/manager.rs b/crates/peer/src/manager.rs index d1d32cc1..80a836f0 100644 --- a/crates/peer/src/manager.rs +++ b/crates/peer/src/manager.rs @@ -167,6 +167,22 @@ pub struct PeerManager { /// block is rejected — the contiguity assumption no longer holds. pub(crate) synced_through: u64, + /// Outstanding catch-up `DataColumnsByRange` work. At most one range, + /// one peer attempt at a time. Driven by `maybe_issue_colreq`. + pub(crate) col_syncreq: Option, + + /// Highest slot whose custody columns a peer has confirmed-served + /// (`Complete`) this catch-up. Reset alongside `synced_through`. + pub(crate) columns_synced_through: u64, + + /// Peers that failed (error/timeout) the active column range; the + /// picker skips them so the remainder goes elsewhere. Cleared per + /// range, and when no eligible peer is left (the attempts cap still + /// bounds the range). + pub(crate) col_tried_for_range: FxHashSet, + + col_stall_logged: bool, + /// Slot of the highest block BS has imported (`last_applied`), from the /// `latest_block_slot` on the Status event. pub(crate) local_head_imported_slot: u64, @@ -240,6 +256,41 @@ pub struct SyncReq { pub delivered: bool, } +/// One catch-up `DataColumnsByRange` range over our custody set. +/// Unlike `SyncReq` the range outlives individual peer attempts: +/// `remaining` shrinks as attempts deliver, and each attempt requests one +/// peer's claimed-custody overlap of it. +#[derive(Debug, Clone, Copy)] +pub struct ColSyncReq { + pub start_slot: u64, + pub count: u64, + /// Custody columns not yet `Complete`-covered for this range. + pub remaining: u128, + /// Consecutive failed attempts (error terminator / progress timeout). + /// Reset on any delivered attempt; `max_colreq_attempts` caps it + /// before the remainder is conceded to the by-root wheel. + pub attempts: u64, + /// Min claimed head_slot across delivering peers (u64::MAX until the + /// first delivery). A `Complete` covers `[start, min(end, peer_head)]` + /// — the watermark advance is capped by this so slots past a serving + /// peer's tip re-open as the next range. Mirrors the block driver's + /// peer-head capping. + pub served_through: u64, + /// In-flight per-peer attempt, if any. + pub attempt: Option, +} + +#[derive(Debug, Clone, Copy)] +pub struct ColAttempt { + pub peer_id: usize, + /// Columns requested from `peer_id`: claimed custody ∩ `remaining`. + pub columns: u128, + /// Bumped on every `DataColumnSidecar` chunk for this request. + pub last_progress_at: Instant, + pub responded: bool, + pub delivered: bool, +} + impl PeerManager { pub fn new( our_topics: Vec, @@ -287,6 +338,10 @@ impl PeerManager { custody_columns, inflight_syncreq: None, synced_through: 0, + col_syncreq: None, + columns_synced_through: 0, + col_tried_for_range: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()), + col_stall_logged: false, local_head_imported_slot: 0, burnt_for_target: FxHashSet::with_capacity_and_hasher(PEERS_CAP, Default::default()), finalized_counts: FxHashMap::with_capacity_and_hasher(SYNC_AGG_CAP, Default::default()), @@ -326,7 +381,7 @@ impl PeerManager { ssz[84..].copy_from_slice(&self.earliest_available_slot.to_le_bytes()); } - tracing::info!("set status"); + tracing::debug!("set status"); self.status = Some(ssz); self.target_dirty = true; } @@ -363,12 +418,22 @@ impl PeerManager { self.rejected.mark(target_root); } - // A rejected block invalidates the delivered chain past our head; - // re-derive the next request from the applied head. + // A rejected block breaks the delivered chain past our applied head: + // blocks AND their custody columns at/after the reject are bound to the + // now-orphaned roots. Reset both watermarks so block + column sync + // re-derive from the applied head (re-fetch of the still-valid prefix is + // idempotent — deduped on arrival). self.synced_through = 0; + self.reset_col_sync(); self.target_dirty = true; } + pub(crate) fn reset_col_sync(&mut self) { + self.col_syncreq = None; + self.columns_synced_through = 0; + self.col_tried_for_range.clear(); + } + pub fn record_finalized_rejected(&mut self, root: [u8; 32]) { self.rejected.mark(root); self.target_dirty = true; @@ -461,6 +526,7 @@ impl PeerManager { // New chain segment to catch up — the delivered watermark from the // old target no longer guarantees contiguity with our head. self.synced_through = 0; + self.reset_col_sync(); } Some(new_target) } @@ -797,6 +863,17 @@ impl PeerManager { if protocol.is_request_response() && protocol != StreamProtocol::Unset { tracing::warn!(?protocol, "stream close misbehaviour"); self.on_rpc_misbehaviour(stream_id.peer(), RpcSeverity::MidTolerance); + // No terminal response will arrive for this stream — + // release the outbound in-flight slot, else each + // abnormal close permanently burns one of the peer's + // `MAX_RPC_PROTOCOL_IN_FLIGHT` slots and the protocol + // goes dark for the connection's lifetime. + if !stream_id.is_incoming() && + let Some(peer) = self.peers.get_mut(&stream_id.peer()) + { + peer.outbound_in_flight[protocol.ordinal() as usize] = + peer.outbound_in_flight[protocol.ordinal() as usize].saturating_sub(1); + } } } PeerEvent::P2pGossipTopicSubscribe { p2p_peer, topic } => { @@ -1053,6 +1130,11 @@ impl PeerManager { { self.inflight_syncreq = None; } + if let Some(req) = self.col_syncreq.as_mut() && + req.attempt.is_some_and(|a| a.peer_id == conn) + { + req.attempt = None; + } } // ── Gossip event handlers ─────────────────────────────────────────── @@ -1401,10 +1483,7 @@ impl PeerManager { tracing::trace!(?peer_id, "already dialing peer id"); return; } - if self.archived.contains_key(&peer_id) { - tracing::trace!(?peer_id, "archived peer id"); - return; - } + if enr.quic4_socket().is_none() && enr.quic6_socket().is_none() { tracing::debug!(udp4=?enr.udp4(), udp6=enr.udp6(), tcp4=?enr.tcp4(), tcp6=enr.tcp6(), "Peer does not support quic"); return; @@ -2803,6 +2882,35 @@ mod tests { ); } + #[test] + fn disc_node_found_redials_archived_peer() { + // Archive persists reputation but must not veto redial: a benign + // disconnect (not a ban) has to be re-dialable or we strand ourselves. + let now = Instant::now(); + let mut params = ScoreParams::default(); + params.target_peers = 4; + let (mut mgr, mut cap) = fixture(vec![], params); + + connect(&mut mgr, &mut cap, 1, 7, now); + mgr.handle_event( + PeerEvent::P2pDisconnect { p2p_peer: 1, peer_id: peer_id(7) }, + now, + &mut |c| cap.0.push(c), + ); + assert_eq!(mgr.archived_count(), 1); + cap.0.clear(); + + let enr = + test_enr_with(7, std::net::Ipv4Addr::new(10, 0, 0, 7), Some([0u8; 16]), None, None); + mgr.handle_event(PeerEvent::DiscNodeFound { enr }, now, &mut |c| cap.0.push(c)); + + assert!( + cap.0.iter().any(|e| matches!(e, PeerControl::P2pDial { .. })), + "archived (non-banned) peer must be redialed, got {:?}", + cap.0 + ); + } + #[test] fn disc_node_found_at_target_does_not_dial() { let now = Instant::now(); @@ -4027,6 +4135,274 @@ mod tests { assert!(mgr.inflight_syncreq.is_none(), "large head jump resets inflight"); } + /// Connect a peer advertising `DataColumnSidecarsByRange` with a cgc=4 + /// ENR and a Status claiming `head_slot`; returns its deterministic + /// custody mask. + fn connect_column_peer( + mgr: &mut PeerManager, + cap: &mut Captured, + conn: usize, + seed: u8, + head_slot: u64, + now: Instant, + ) -> u128 { + let kp = Keypair::from_secret(&[seed; 32]).unwrap(); + let enr = Enr::builder().cgc(4).build(kp.secret_key()).unwrap(); + mgr.database.add_enr(enr); + mgr.handle_event( + PeerEvent::P2pNewConnection { + p2p_peer_id: conn, + peer_id_full: kp.peer_id(), + ip: IpBytes::V4([10, 0, 0, seed]), + port: 4000 + seed as u16, + local_dial: false, + }, + now, + &mut |c| cap.0.push(c), + ); + let mut identify = Identify::default(); + identify.protocols |= 1 << StreamProtocol::DataColumnSidecarsByRange.ordinal(); + mgr.handle_event(PeerEvent::P2pPeerIdentity { p2p_peer: conn, identify }, now, &mut |c| { + cap.0.push(c) + }); + // Digest matches the fixture's `[0u8; 4]` so the status is stored. + send_status(mgr, cap, conn, make_status_v2([0u8; 4], [0xAA; 32], 0, [0xBB; 32], head_slot)); + mgr.database.data_column_custody_groups_intersection(conn, u128::MAX) + } + + #[test] + fn colreq_complete_advances_column_watermark() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100_000, now); + mgr.custody_columns = mask; // full overlap with peer 1 + mgr.synced_through = 128; // blocks confirmed-served [1, 128] + + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("range opened"); + assert_eq!(req.start_slot, 1); + assert_eq!(req.count, 128); + let att = req.attempt.expect("attempt issued"); + assert_eq!(att.peer_id, 1); + assert_eq!(att.columns, mask); + assert_eq!( + mgr.peers.get(&1).unwrap().outbound_in_flight + [StreamProtocol::DataColumnSidecarsByRange.ordinal() as usize], + 1 + ); + + // Complete terminator (stamped by `on_rpc_inbound` in prod). + let mut req = req; + req.attempt = Some(ColAttempt { responded: true, delivered: true, ..att }); + mgr.col_syncreq = Some(req); + + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + assert_eq!(mgr.columns_synced_through, 128); + assert!(mgr.col_syncreq.is_none(), "range done; nothing further to request"); + } + + #[test] + fn colreq_partial_overlap_chases_remainder_on_second_peer() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask1 = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100_000, now); + let mask2 = connect_column_peer(&mut mgr, &mut cap, 2, 2, 100_000, now); + // Custody is deterministic from node_id+cgc; these seeds must give + // each peer columns the other lacks. + assert_ne!(mask1 & !mask2, 0); + assert_ne!(mask2 & !mask1, 0); + mgr.custody_columns = mask1 | mask2; + mgr.synced_through = 64; + + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("range opened"); + let first = req.attempt.expect("attempt issued"); + // Only the picked peer's overlap is requested, not full custody. + assert_ne!(first.columns, mgr.custody_columns); + + let mut req = req; + req.attempt = Some(ColAttempt { responded: true, delivered: true, ..first }); + mgr.col_syncreq = Some(req); + + // Remainder goes to the other peer in the same sweep. + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("remainder still in flight"); + let second = req.attempt.expect("remainder attempt issued"); + assert_ne!(second.peer_id, first.peer_id); + assert_eq!(second.columns, mgr.custody_columns & !first.columns); + + let mut req = req; + req.attempt = Some(ColAttempt { responded: true, delivered: true, ..second }); + mgr.col_syncreq = Some(req); + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + assert_eq!(mgr.columns_synced_through, 64, "full custody covered across two peers"); + assert!(mgr.col_syncreq.is_none()); + } + + #[test] + fn colreq_error_concedes_to_by_root_after_attempts_cap() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100_000, now); + mgr.custody_columns = mask; + mgr.synced_through = 32; + + // Sole peer answers every attempt with an Error terminator. Each + // failure re-picks it (tried set cleared when no one is left); + // `max_colreq_attempts` must bound the loop, then concede the + // range so the by-root wheel owns the remainder. + let mut calls = 0; + while mgr.columns_synced_through < 32 { + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + if let Some(mut req) = mgr.col_syncreq && + let Some(att) = req.attempt && + !att.responded + { + req.attempt = Some(ColAttempt { responded: true, ..att }); + mgr.col_syncreq = Some(req); + // The Error terminator also releases the in-flight slot + // (the `terminal_protocol` bookkeeping in prod). + mgr.peers.get_mut(&1).unwrap().outbound_in_flight + [StreamProtocol::DataColumnSidecarsByRange.ordinal() as usize] -= 1; + } + calls += 1; + assert!(calls < 20, "driver failed to converge"); + } + assert!(mgr.col_syncreq.is_none(), "range conceded"); + assert!(mgr.burnt_for_target.is_empty(), "column failures must not burn block backers"); + } + + #[test] + fn colreq_timeout_scores_peer_and_reissues() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100_000, now); + mgr.custody_columns = mask; + mgr.synced_through = 16; + + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + assert!(mgr.col_syncreq.is_some_and(|r| r.attempt.is_some())); + + // No terminator, no sidecar chunk for a full window: stall. + let timeout = Duration::from_millis(mgr.syncing.inflight_progress_timeout_ms); + let later = now + timeout + Duration::from_millis(1); + mgr.maybe_issue_colreq(later, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("range survives the timeout"); + assert!(req.attempt.is_none(), "stalled attempt dropped"); + assert!(mgr.peers.get(&1).unwrap().application_score < 0.0, "stalling peer scored"); + + // Sole peer: tried set was cleared, next sweep re-issues to it. + mgr.maybe_issue_colreq(later, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("range still active"); + assert!(req.attempt.is_some_and(|a| a.peer_id == 1), "re-issued"); + assert_eq!(req.attempts, 2); + } + + #[test] + fn colreq_complete_capped_by_peer_head() { + // Tip wedge regression: a peer `Complete`s a range that extends past + // its claimed head. The watermark must only advance to the peer's + // head — the tail re-opens as the next range — or the uncovered + // columns would never be re-requested and the DA check would wedge + // the chain (observed live: head stuck at the serving peer's tip). + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100, now); + mgr.custody_columns = mask; + mgr.synced_through = 120; // blocks confirmed past the peer's head + + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("range opened"); + assert_eq!((req.start_slot, req.count), (1, 120)); + let att = req.attempt.expect("attempt issued"); + + let mut req = req; + req.attempt = Some(ColAttempt { responded: true, delivered: true, ..att }); + mgr.col_syncreq = Some(req); + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + + assert_eq!(mgr.columns_synced_through, 100, "watermark capped by peer head"); + // Same sweep re-opened the tail [101, 120] and re-issued. + let req = mgr.col_syncreq.expect("tail range re-opened"); + assert_eq!((req.start_slot, req.start_slot + req.count - 1), (101, 120)); + } + + #[test] + fn colreq_paced_to_block_driver() { + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let mask = connect_column_peer(&mut mgr, &mut cap, 1, 1, 100_000, now); + mgr.custody_columns = mask; + + // No block range requested or served → nothing to pair with. + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + assert!(mgr.col_syncreq.is_none()); + + // An inflight block request opens the same range for columns. + mgr.inflight_syncreq = Some(SyncReq { + peer_id: 1, + start_slot: 1, + count: 128, + last_observed_head_slot: 0, + last_progress_at: now, + responded: false, + delivered: false, + }); + mgr.maybe_issue_colreq(now, &mut |c| cap.0.push(c)); + let req = mgr.col_syncreq.expect("paired range opened"); + assert_eq!((req.start_slot, req.count), (1, 128)); + } + + #[test] + fn syncreq_paused_when_column_fetch_stalls() { + // Block issuance must not outrun the column fetch: batches past the + // column watermark only buffer in BS awaiting DA. A stalled column + // watermark halts block batches; advancing it (delivery or concede) + // releases them. + let now = Instant::now(); + let (mut mgr, mut cap) = fixture(vec![], ScoreParams::default()); + let target = [0xBB; 32]; + set_snapshot(&mut mgr, 10, 320); + connect(&mut mgr, &mut cap, 1, 1, now); + send_status(&mut mgr, &mut cap, 1, make_status_v2(fork_a(), target, 20, [0; 32], 640)); + let _ = mgr.maybe_emit_sync_target(); + + // We custody columns, but the column fetch is frozen at the start head + // (no column peer here, so `maybe_issue_colreq` never advances it). + mgr.custody_columns = (1u128 << 3) | (1u128 << 7); + mgr.columns_synced_through = 320; + let batch = mgr.syncing.max_blocks_by_range_batch; + + // Drive block deliveries with the applied head kept level, isolating + // the column-coupling gate from the apply-lag gate. + mgr.maybe_issue_syncreq(now, &mut |c| cap.0.push(c)); + let mut guard = 0; + while let Some(req) = mgr.inflight_syncreq { + let end = req.start_slot + req.count - 1; + let mut r = req; + r.delivered = true; + mgr.inflight_syncreq = Some(r); + mgr.set_local_head_imported(end); // apply keeps up; columns do not + // Free the slot the terminal response would (test bypasses that path). + mgr.peers.get_mut(&1).unwrap().outbound_in_flight + [StreamProtocol::BeaconBlocksByRange.ordinal() as usize] = 0; + mgr.maybe_issue_syncreq(now, &mut |c| cap.0.push(c)); + guard += 1; + assert!(guard < 8, "block issuance never paused on stalled columns"); + } + assert_eq!(mgr.columns_synced_through, 320, "columns never advanced here"); + assert!( + mgr.synced_through <= mgr.columns_synced_through + 2 * batch, + "blocks ran to {} — more than one batch beyond the stalled column watermark", + mgr.synced_through + ); + + // Column watermark advances → block issuance resumes. + mgr.columns_synced_through = mgr.synced_through; + mgr.maybe_issue_syncreq(now, &mut |c| cap.0.push(c)); + assert!(mgr.inflight_syncreq.is_some(), "blocks resume once columns catch up"); + } + #[test] fn fell_behind_detects_silent_stall_in_following() { // Following + head trailing wall_slot by > head_lag_threshold_slots diff --git a/crates/peer/src/manager/rpc.rs b/crates/peer/src/manager/rpc.rs index 052555c7..72c15886 100644 --- a/crates/peer/src/manager/rpc.rs +++ b/crates/peer/src/manager/rpc.rs @@ -13,6 +13,7 @@ use std::{ time::{Duration, Instant}, }; +use fxhash::FxHashSet; use silver_common::{ BASE_REQUEST_ID, P2pSend, PeerControl, PeerEvent, PeerStatus, RequestCategory, RpcInbound, RpcOutbound, RpcRequest, RpcRequestInbound, RpcRequestOutbound, RpcResponse, @@ -20,7 +21,10 @@ use silver_common::{ ssz_view::{BLOCKS_BY_RANGE_REQ_SIZE, DC_BY_RANGE_REQ_MAX, MetadataView, StatusView}, }; -use crate::{PeerManager, manager::SyncReq}; +use crate::{ + PeerManager, + manager::{ColAttempt, ColSyncReq, SyncReq}, +}; /// Per-peer cap on outstanding RPC requests per protocol. Bounds load on any /// single peer and keeps fan-out useful when many ranges are pending. @@ -66,12 +70,14 @@ const INBOUND_QUOTAS: [Option; N_STREAM_PROTOCOLS] = [ ]; /// Result-byte values for eth2 RPC error chunks. Per -/// `consensus-specs/p2p-interface.md`, only 0x01..=0x03 are spec-defined; -/// 0x14 is a lighthouse extension some clients emit and others tolerate. +/// `consensus-specs/p2p-interface.md`, only 0x01..=0x03 are spec-defined and +/// [0x04, 0x7f] is RESERVED; codes >= 0x80 are client extensions. 0x8b is +/// lighthouse's `RateLimited`. Prysm overloads 0x01 (`InvalidRequest`) for +/// rate limiting, which is indistinguishable from a genuine malformed request. const RPC_ERR_INVALID_REQUEST: u8 = 0x01; const RPC_ERR_SERVER_ERROR: u8 = 0x02; const RPC_ERR_RESOURCE_UNAVAILABLE: u8 = 0x03; -const RPC_ERR_RATE_LIMITED: u8 = 0x14; +const RPC_ERR_RATE_LIMITED: u8 = 0x8b; #[derive(Default)] pub(crate) struct PeerInboundState { @@ -114,8 +120,10 @@ fn severity_for_error_response(code: u8, protocol: StreamProtocol) -> Option Some(RpcSeverity::HighTolerance), }, - // Lighthouse-emitted rate-limit signal. Means we're hammering them, - // not that they're misbehaving — back off, don't ban. + // Lighthouse's rate-limit signal (0x8b). We're hammering them, not + // misbehaviour — moderate, decaying penalty rotates us off this backer + // without banning. Prysm's code-0x01 rate limit can't be told apart + // from a real malformed request and lands on the arm above. RPC_ERR_RATE_LIMITED => Some(RpcSeverity::MidTolerance), // Unknown / reserved code. Spec may add new codes — forward-compat // mild penalty rather than crash. @@ -216,12 +224,25 @@ impl PeerManager { protocol: StreamProtocol, columns: u128, request_id: u64, - slot: Option, + exclude: &FxHashSet, + min_head: u64, ) -> Option<(usize, u128)> { let is_backfill = RequestCategory::from_request_id(request_id).is_backfill(); self.database .live_peers_supporting(protocol) .filter_map(|p| { + if exclude.contains(&p) { + return None; + } + // By-range only: skip peers whose claimed head is below the + // range start (no status ⇒ head 0). Their `Complete` could + // not cover a single requested slot. + if min_head > 0 && + self.database.peer_status_bytes(p).map(StatusView::head_slot).unwrap_or(0) < + min_head + { + return None; + } let peer = self.peers.get(&p)?; let in_flight = peer.outbound_in_flight[protocol.ordinal() as usize]; @@ -241,11 +262,14 @@ impl PeerManager { return None; } - if let Some(slot) = slot && - let Some(earliest) = self.database.earliest_available_slot(p) && - slot < earliest + if let Some(earliest) = self.database.earliest_available_slot(p) && + self.local_head_imported_slot < earliest { - tracing::warn!(slot, earliest, "slot out of bounds"); + tracing::warn!( + slot = self.local_head_imported_slot, + earliest, + "slot out of bounds" + ); return None; } @@ -533,6 +557,22 @@ impl PeerManager { req.delivered |= completed_ok; } } + // Column catch-up correlation: sidecar chunks bump progress, + // terminators resolve the attempt (swept by + // `maybe_issue_colreq`). + if stream_id.protocol() == StreamProtocol::DataColumnSidecarsByRange && + let Some(req) = self.col_syncreq.as_mut() && + let Some(att) = req.attempt.as_mut() && + att.peer_id == stream_id.peer() && + application_id == (BASE_REQUEST_ID | req.start_slot) + { + if terminal_protocol.is_some() { + att.responded = true; + att.delivered |= completed_ok; + } else { + att.last_progress_at = now; + } + } } } } @@ -553,6 +593,12 @@ impl PeerManager { /// BlocksByRange capacity, build the SSZ request, emit it, and set /// `inflight_syncreq`. pub fn maybe_issue_syncreq(&mut self, now: Instant, emit: &mut impl FnMut(PeerControl)) { + // Columns first, unconditionally: the early returns below (flow + // control, target reached) must not starve the column driver — + // with the DA check, missing columns are exactly what stalls the + // applied head that those returns key off. + self.maybe_issue_colreq(now, emit); + let head_slot = self.local_head_imported_slot; // Phase 1 + 2: completion (terminator-driven) / progress / timeout. @@ -634,6 +680,25 @@ impl PeerManager { if self.synced_through > local_head_slot + self.syncing.max_blocks_by_range_batch { return; } + // Couple block issuance to the column fetch. + if self.custody_columns != 0 && + self.synced_through > + self.columns_synced_through + self.syncing.max_blocks_by_range_batch + { + if !self.col_stall_logged { + tracing::warn!( + synced_through = self.synced_through, + columns_synced_through = self.columns_synced_through, + "block sync stalled on data columns: column fetch is more than one \ + batch behind; pausing block batches until it catches up" + ); + self.col_stall_logged = true; + } + return; + } + + self.col_stall_logged = false; + let start_slot = next_base + 1; let remaining = target_end_slot.saturating_sub(next_base); let count = remaining.min(self.syncing.max_blocks_by_range_batch); @@ -685,63 +750,173 @@ impl PeerManager { responded: false, delivered: false, }); + } - // Pair a best-effort data-column-by-range fetch over our custody set - // for the same range, from the peer with the largest custody overlap. - // Paced by the block watermark; no separate retry. Columns the peer - // can't serve fall to the by-root straggler fallback. `BASE_REQUEST_ID` - // routes the response to storage's live `data_columns` path. - if self.custody_columns != 0 { - let col_app_id = BASE_REQUEST_ID | local_head_slot; - match self.best_peer_for_data_columns( - StreamProtocol::DataColumnSidecarsByRange, - self.custody_columns, - col_app_id, - Some(local_head_slot), - ) { - Some((col_peer, overlap)) => { - tracing::info!( - col_peer, - local_head_slot, - count, - custody = self.custody_columns, - overlap, - "issuing DataColumnsByRange" - ); - let (ssz, len) = Self::data_columns_by_range_ssz( - local_head_slot, - count, - self.custody_columns, - ); - if let Some(peer) = self.peers.get_mut(&col_peer) { - peer.outbound_in_flight - [StreamProtocol::DataColumnSidecarsByRange.ordinal() as usize] += 1; - } - emit(PeerControl::P2pSend(P2pSend::Rpc(RpcOutbound::Request( - RpcRequestOutbound { - application_id: col_app_id, - peer: col_peer, - request: RpcRequest::DataColumnsByRange { ssz, len }, - }, - )))); + /// Drive the PM-owned catch-up `DataColumnsByRange` lifecycle: one + /// range over our custody set, one peer attempt at a time. `remaining` + /// shrinks as attempts deliver (`Complete` covers the claimed-custody + /// overlap that was requested); an error terminator or progress + /// timeout marks the peer tried and re-issues the remainder elsewhere. + /// After `max_colreq_attempts` consecutive failures the remainder is + /// conceded to storage's by-root straggler wheel. Paced to the block + /// driver: never requests columns past + /// `max(synced_through, inflight block range end)`. + pub(crate) fn maybe_issue_colreq(&mut self, now: Instant, emit: &mut impl FnMut(PeerControl)) { + if self.custody_columns == 0 { + return; + } + self.resolve_colreq_attempt(now); + self.open_colreq_range(); + self.issue_colreq_attempt(now, emit); + } + + fn resolve_colreq_attempt(&mut self, now: Instant) { + let Some(mut req) = self.col_syncreq else { return }; + let Some(att) = req.attempt else { return }; + + if att.delivered { + // `Complete` covers the overlap only up to the peer's claimed + // head — never claim slots past its tip. Under-capping on a stale + // Status is safe (re-fetch is idempotent); over-capping would skip + // real columns and wedge the DA check. + let peer_head = self + .database + .peer_status_bytes(att.peer_id) + .map(StatusView::head_slot) + .unwrap_or(0); + req.served_through = req.served_through.min(peer_head); + req.remaining &= !att.columns; + req.attempts = 0; + req.attempt = None; + if req.remaining == 0 { + let end = (req.start_slot + req.count - 1).min(req.served_through); + self.columns_synced_through = self.columns_synced_through.max(end); + self.col_tried_for_range.clear(); + self.col_syncreq = None; + } else { + self.col_syncreq = Some(req); + } + } else if att.responded { + // Error terminator — peer already scored via + // `severity_for_error_response`. Re-issue elsewhere. + self.col_tried_for_range.insert(att.peer_id); + req.attempt = None; + self.col_syncreq = Some(req); + } else if now.saturating_duration_since(att.last_progress_at) >= + Duration::from_millis(self.syncing.inflight_progress_timeout_ms) + { + // No terminator and no sidecar chunk for a full timeout window: + // peer stall. + self.col_tried_for_range.insert(att.peer_id); + req.attempt = None; + self.col_syncreq = Some(req); + self.on_rpc_misbehaviour(att.peer_id, RpcSeverity::MidTolerance); + } + } + + fn open_colreq_range(&mut self) { + if self.col_syncreq.is_some() { + return; + } + let base = self.columns_synced_through.max(self.local_head_imported_slot); + let block_end = self.inflight_syncreq.map_or(0, |r| r.start_slot + r.count - 1); + let cap = self.synced_through.max(block_end); + if base >= cap { + return; + } + let count = (cap - base).min(self.syncing.max_blocks_by_range_batch); + self.col_tried_for_range.clear(); + self.col_syncreq = Some(ColSyncReq { + start_slot: base + 1, + count, + remaining: self.custody_columns, + attempts: 0, + served_through: u64::MAX, + attempt: None, + }); + } + + fn issue_colreq_attempt(&mut self, now: Instant, emit: &mut impl FnMut(PeerControl)) { + let Some(mut req) = self.col_syncreq else { return }; + if req.attempt.is_some() { + return; + } + if req.attempts >= self.syncing.max_colreq_attempts { + // Concede: advance the watermark past the range; storage's by-root + // wheel picks up the stragglers as blocks land. + tracing::debug!( + start_slot = req.start_slot, + count = req.count, + remaining = req.remaining, + "colreq attempts exhausted; remainder conceded to by-root" + ); + self.columns_synced_through = + self.columns_synced_through.max(req.start_slot + req.count - 1); + self.col_tried_for_range.clear(); + self.col_syncreq = None; + return; + } + + let app_id = BASE_REQUEST_ID | req.start_slot; + match self.best_peer_for_data_columns( + StreamProtocol::DataColumnSidecarsByRange, + req.remaining, + app_id, + &self.col_tried_for_range, + req.start_slot, + ) { + Some((peer, overlap)) => { + // Request only the overlap: the responder omits columns it + // doesn't custody, so `Complete` then implies coverage of + // exactly `overlap`. + let (ssz, len) = + Self::data_columns_by_range_ssz(req.start_slot, req.count, overlap); + if let Some(p) = self.peers.get_mut(&peer) { + p.outbound_in_flight + [StreamProtocol::DataColumnSidecarsByRange.ordinal() as usize] += 1; } - None => { - // No peer to serve our custody columns by range — sync - // falls back to the by-root wheel. - tracing::debug!( - start_slot, - count, - custody = self.custody_columns, - supporting = self - .database - .live_peers_supporting(StreamProtocol::DataColumnSidecarsByRange) - .count(), - "no peer for DataColumnsByRange" - ); + tracing::debug!( + peer, + start_slot = req.start_slot, + count = req.count, + overlap, + remaining = req.remaining, + attempts = req.attempts, + "issuing DataColumnsByRange" + ); + emit(PeerControl::P2pSend(P2pSend::Rpc(RpcOutbound::Request( + RpcRequestOutbound { + application_id: app_id, + peer, + request: RpcRequest::DataColumnsByRange { ssz, len }, + }, + )))); + req.attempts += 1; + req.attempt = Some(ColAttempt { + peer_id: peer, + columns: overlap, + last_progress_at: now, + responded: false, + delivered: false, + }); + self.col_syncreq = Some(req); + } + None => { + // No eligible peer. If some were skipped as tried, allow + // re-picks next tick; `attempts` still bounds the range. + if !self.col_tried_for_range.is_empty() { + self.col_tried_for_range.clear(); } + tracing::debug!( + start_slot = req.start_slot, + remaining = req.remaining, + supporting = self + .database + .live_peers_supporting(StreamProtocol::DataColumnSidecarsByRange) + .count(), + "no peer for DataColumnsByRange; retrying next tick" + ); } - } else { - tracing::debug!("custody_columns is 0 — no by-range column fetch"); } } @@ -828,7 +1003,8 @@ impl PeerManager { StreamProtocol::DataColumnSidecarsByRoot, remaining, request_id, - None, + &FxHashSet::default(), + 0, ) else { tracing::debug!("no peer has data columns: {remaining}"); break; @@ -1142,11 +1318,19 @@ mod tests { #[test] fn rate_limited_is_mid_tolerance() { - // Self-throttle, don't ban. + // Self-throttle, don't ban. Assert the literal wire code (lighthouse + // `RateLimited` = 139) so a regressed constant is caught here. + assert_eq!(RPC_ERR_RATE_LIMITED, 139); assert!(matches!( - severity_for_error_response(RPC_ERR_RATE_LIMITED, StreamProtocol::BeaconBlocksByRange), + severity_for_error_response(139, StreamProtocol::BeaconBlocksByRange), Some(RpcSeverity::MidTolerance) )); + // Prysm overloads InvalidRequest (1) for rate limiting; by code alone + // that is indistinguishable from a malformed request. + assert!(matches!( + severity_for_error_response(1, StreamProtocol::BeaconBlocksByRange), + Some(RpcSeverity::LowTolerance) + )); } #[test] diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index db6c2af9..7eb98a72 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -468,6 +468,10 @@ impl Store { self.head_slot } + pub(super) fn finalized_slot(&self) -> u64 { + self.finalized_slot + } + pub(super) fn head_root(&self) -> &[u8; 32] { &self.head_root } diff --git a/crates/storage/src/tile.rs b/crates/storage/src/tile.rs index 222e8424..40d10cac 100644 --- a/crates/storage/src/tile.rs +++ b/crates/storage/src/tile.rs @@ -125,6 +125,10 @@ impl StorageTile { return; } + if SignedBeaconBlockView::slot(buffer) <= self.store.finalized_slot() { + return; + } + let block_root = util::block_root(buffer); if self.outstanding_requests.contains(&block_root) { @@ -270,17 +274,17 @@ impl StorageTile { }; if !above_finalized { - tracing::warn!(?stream_id, "sidecar slot at or below finalized"); - return Some((block_root, column_bitmask)); + tracing::debug!(?stream_id, "sidecar slot at or below finalized — ignoring"); + return None; } if !parent_validated { - tracing::warn!( + tracing::debug!( ?stream_id, block_slot, parent_root = hex::encode(parent_root), - "sidecar parent_root not in validated set" + "sidecar parent_root not yet validated — ignoring (not penalized)" ); - return Some((block_root, column_bitmask)); + return None; } if !proposer_matches { tracing::warn!(?stream_id, "sidecar proposer_index mismatch");