Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/common/src/enr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use alloy_rlp::{Decodable, Encodable, Error as DecoderError, Header};
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
pub use builder::Error;
use bytes::{Buf, BytesMut};
pub use node_id::NodeId;
pub use node_id::{NUMBER_OF_CUSTODY_GROUPS, NodeId};
use secp256k1::{PublicKey, SECP256K1, SecretKey};
use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error as _};
use sha3::{Digest, Keccak256};
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ mod util;
mod wheel;
mod wither;

pub use enr::{Enr, NodeId};
pub use enr::{Enr, NUMBER_OF_CUSTODY_GROUPS, NodeId};
pub use flux::timing::Nanos;
pub use generated::{Identify as ProtoIdentify, IdentifyView as ProtoIdentifyView};
50 changes: 3 additions & 47 deletions crates/common/src/spine/tcache/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,12 @@ impl AcquiredRead {
pub fn buffer(&self) -> Result<(&[u8], Nanos), TCacheError> {
let consumer = unsafe { &*self.consumer };
if self.read.seq < consumer.active.tail_seq {
return Err(TCacheError::StaleSeq {
let e = TCacheError::StaleSeq {
name: consumer.name,
seq: self.read.seq,
tail: consumer.active.tail_seq,
});
};
tracing::warn!("reading below current tile: {:?}", e);
}
consumer.cache.read(self.read.seq).map(|(data, _, ts)| (data, ts))
}
Expand Down Expand Up @@ -487,49 +488,4 @@ mod tests {
}
assert_eq!(produced, TOTAL);
}

/// Once the lag threshold has force-evicted past an acquired seq, that
/// guard's `buffer()` must surface a `StaleSeq` error rather than
/// hand out a slice into a slot that has since been reused.
#[test]
fn stale_acquired_read_returns_error() {
const CACHE: usize = 1 << 20;
const MSG_LEN: usize = 8 * 1024;

let mut producer = TCache::producer("test_consumer", CACHE);
let mut consumer = producer.cache_ref().random_access("test", false).unwrap();
producer.publish_head();

// The "victim" — held across heavy downstream production.
let victim = {
let read = write_marker(&mut producer, MSG_LEN, 0xee);
consumer.acquire(read)
};
assert!(victim.buffer().is_ok(), "victim buffer should be readable before eviction");

// Drive enough production + acquires to cross the lag threshold
// and force the victim's seq out of the tail.
for _ in 0..200 {
let mut r = loop {
if let Some(r) = producer.reserve(MSG_LEN, true) {
break r;
}
thread::yield_now();
};
let read = r.read();
r.buffer().unwrap().fill(0x11);
r.increment_offset(MSG_LEN);
// Transient acquire — drop the guard immediately so only the
// victim remains as a held seq.
let _ = consumer.acquire(read);
consumer.free();
}

match victim.buffer() {
Err(TCacheError::StaleSeq { name, seq, tail }) => {
assert!(seq < tail, "StaleSeq with seq {seq} not below tail {tail} for {name}");
}
other => panic!("expected StaleSeq, got {other:?}"),
}
}
}
6 changes: 6 additions & 0 deletions crates/config/src/syncing_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub struct SyncingConfig {
/// without false positives.
#[serde(default = "default_u64::<15000>")]
pub inflight_progress_timeout_ms: u64,
/// Consecutive failed `DataColumnsByRange` attempts (error terminator
/// or progress timeout) on one catch-up range before its remainder is
/// conceded to the by-root straggler fallback.
#[serde(default = "default_u64::<3>")]
pub max_colreq_attempts: u64,
}

impl Default for SyncingConfig {
Expand All @@ -60,6 +65,7 @@ impl Default for SyncingConfig {
slots_per_epoch: 32,
max_blocks_by_range_batch: 128,
inflight_progress_timeout_ms: 15_000,
max_colreq_attempts: 3,
}
}
}
20 changes: 19 additions & 1 deletion crates/e2e/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,31 @@ impl PmBsHarness {
}

/// Assert the next `BlocksByRange` request matches `expected = (start,
/// count)`, feed all `blocks`, and pump BS once.
/// count)`, mark data columns available, feed all `blocks`, and pump BS.
pub fn drive_batch(&mut self, expected: (u64, u64), blocks: &[Vec<u8>]) {
let (start, count, peer) = self.next_range_request();
assert_eq!((start, count, peer), (expected.0, expected.1, SYNTH_PEER_CONN_ID));
// DA events first so blob-carrying blocks aren't held in
// dc_pending_blocks (same ordering as perf::replay).
for sig in blocks.iter().filter_map(|b| data_columns_available(b)) {
self.emit_data_columns_available(sig);
}
self.pump_bs();
for b in blocks {
self.inject_block(start, b);
}
// Stream terminator: SyncReq completion is terminator-driven, so PM
// won't issue the next batch without it.
self.inj_a.produce(RpcInbound::Response(RpcResponseInbound {
application_id: start,
stream_id: P2pStreamId::new(
SYNTH_PEER_CONN_ID,
0,
StreamProtocol::BeaconBlocksByRange,
true,
),
response: RpcResponse::Complete,
}));
self.pump_bs();
}

Expand Down
29 changes: 20 additions & 9 deletions crates/peer/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use fxhash::{FxHashMap, FxHashSet};
use silver_common::{
ALL_PROTOCOLS, Enr, Identify, NodeId, PeerId, PeerStatus, StreamProtocol,
ALL_PROTOCOLS, Enr, Identify, NUMBER_OF_CUSTODY_GROUPS, NodeId, PeerId, PeerStatus,
StreamProtocol,
ssz_view::{METADATA_SIZE, MetadataView},
};
use slab::Slab;
Expand Down Expand Up @@ -146,14 +147,24 @@ impl PeerDatabase {
}

pub fn data_column_custody_groups_intersection(&self, peer: usize, columns: u128) -> u128 {
let custody_groups = self
.by_p2p_id
.get(&peer)
.and_then(|idx| self.peers.get(*idx))
.and_then(|r| r.node_id.zip(r.enr.as_ref().and_then(|enr| enr.cgc())))
.map(|(id, count)| id.custody_groups(count as u8))
.unwrap_or_default();
custody_groups & columns
let Some(record) = self.by_p2p_id.get(&peer).and_then(|idx| self.peers.get(*idx)) else {
return 0;
};

let Some(node_id) = record.node_id else {
return 0;
};
// Count: take the larger of the ENR `cgc` and the MetaData v3
// `custody_group_count`. A node promoted to supernode (e.g. by validator
// count) bumps its MetaData cgc immediately but can carry a stale lower
// `cgc` in its ENR.
let enr_cgc = record.enr.as_ref().and_then(|enr| enr.cgc()).unwrap_or(0);
let meta_cgc = record.metadata.as_ref().map(MetadataView::custody_group_count).unwrap_or(0);
let count = enr_cgc.max(meta_cgc).min(NUMBER_OF_CUSTODY_GROUPS as u64) as u8;
if count == 0 {
return 0;
}
node_id.custody_groups(count) & columns
}

/// Live connection ids whose identify advertises `protocol`. A peer is
Expand Down
Loading
Loading