diff --git a/lean_client/Dockerfile b/lean_client/Dockerfile index 2786ff4..15456e5 100644 --- a/lean_client/Dockerfile +++ b/lean_client/Dockerfile @@ -1,3 +1,29 @@ +# ============================================================================== +# Stage 1: Builder +# ============================================================================== +FROM rustlang/rust:nightly-bookworm AS builder + +# Install build dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + pkg-config \ + libssl-dev \ + libclang-dev \ + cmake \ + git \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Copy source code +COPY . . + +# Build release binary +RUN cargo build --release + +# ============================================================================== +# Stage 2: Runtime +# ============================================================================== FROM ubuntu:22.04 ARG COMMIT_SHA @@ -13,8 +39,13 @@ LABEL org.opencontainers.image.revision=$COMMIT_SHA LABEL org.opencontainers.image.ref.name=$GIT_BRANCH LABEL org.opencontainers.image.created=$BUILD_DATE -ARG TARGETARCH +# Install runtime dependencies +RUN apt-get update && apt-get install -y \ + ca-certificates \ + libssl3 \ + && rm -rf /var/lib/apt/lists/* -COPY ./bin/$TARGETARCH/lean_client /usr/local/bin/lean_client +# Copy binary from builder stage +COPY --from=builder /app/target/release/lean_client /usr/local/bin/lean_client ENTRYPOINT ["lean_client"] diff --git a/lean_client/containers/src/attestation.rs b/lean_client/containers/src/attestation.rs index 3fdced1..263ea32 100644 --- a/lean_client/containers/src/attestation.rs +++ b/lean_client/containers/src/attestation.rs @@ -18,13 +18,13 @@ pub type AttestationSignatures = PersistentList Result<()> { + // Structure: { data: AttestationData, proof: AggregatedSignatureProof } + let attestation_data = signed_aggregated_attestation.data.clone(); + let proof = signed_aggregated_attestation.proof.clone(); + + // Validate attestation data (slot bounds, target validity, etc.) + // TODO(production): Add signature verification here or in caller + validate_attestation_data(store, &attestation_data)?; + + // Store attestation data indexed by hash for later extraction + let data_root = attestation_data.hash_tree_root(); + store + .attestation_data_by_root + .insert(data_root, attestation_data.clone()); + + // Per leanSpec: Store the proof in latest_new_aggregated_payloads + // Each participating validator gets an entry via their SignatureKey + for (bit_idx, bit) in proof.participants.0.iter().enumerate() { + if *bit { + let validator_id = bit_idx as u64; + let sig_key = SignatureKey::new(validator_id, data_root); + store + .latest_new_aggregated_payloads + .entry(sig_key) + .or_default() + .push(proof.clone()); + } + } + + METRICS.get().map(|metrics| { + metrics + .lean_attestations_valid_total + .with_label_values(&["aggregation"]) + .inc() + }); + + Ok(()) +} + /// Internal attestation processing - stores AttestationData fn on_attestation_internal( store: &mut Store, @@ -383,6 +459,12 @@ fn process_block_internal( for (att_idx, aggregated_attestation) in aggregated_attestations.into_iter().enumerate() { let data_root = aggregated_attestation.data.hash_tree_root(); + // Store attestation data for safe target extraction + // This is critical: without this, block attestations are invisible to update_safe_target() + store + .attestation_data_by_root + .insert(data_root, aggregated_attestation.data.clone()); + // Get the corresponding proof from attestation_signatures if let Ok(proof_data) = signatures.attestation_signatures.get(att_idx as u64) { // Store proof for each validator in the aggregation @@ -391,9 +473,9 @@ fn process_block_internal( let validator_id = bit_idx as u64; let sig_key = SignatureKey::new(validator_id, data_root); store - .aggregated_payloads + .latest_known_aggregated_payloads .entry(sig_key) - .or_insert_with(Vec::new) + .or_default() .push(proof_data.clone()); } } diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index ab79210..98f7bde 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -10,9 +10,11 @@ use ssz::{H256, SszHash}; use xmss::Signature; pub type Interval = u64; -pub const INTERVALS_PER_SLOT: Interval = 4; +pub const INTERVALS_PER_SLOT: Interval = 5; pub const SECONDS_PER_SLOT: u64 = 4; -pub const SECONDS_PER_INTERVAL: u64 = SECONDS_PER_SLOT / INTERVALS_PER_SLOT; +/// Milliseconds per interval: (4 * 1000) / 5 = 800ms +/// Using milliseconds avoids integer division truncation (4/5 = 0 in integer math) +pub const MILLIS_PER_INTERVAL: u64 = (SECONDS_PER_SLOT * 1000) / INTERVALS_PER_SLOT; /// Forkchoice store tracking chain state and validator attestations @@ -42,7 +44,20 @@ pub struct Store { pub gossip_signatures: HashMap, - pub aggregated_payloads: HashMap>, + /// Devnet-3: Aggregated signature proofs from block bodies (on-chain). + /// These are attestations that have been included in blocks and are part of + /// the "known" pool for safe target computation. + pub latest_known_aggregated_payloads: HashMap>, + + /// Devnet-3: Aggregated signature proofs from gossip aggregation topic. + /// These are newly received aggregations that haven't been migrated to "known" yet. + /// At interval 3, we merge this with latest_known_aggregated_payloads for safe target. + pub latest_new_aggregated_payloads: HashMap>, + + /// Attestation data indexed by hash (data_root). + /// Used to look up the exact attestation data that was signed, + /// matching ream's attestation_data_by_root_provider design. + pub attestation_data_by_root: HashMap, } const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; @@ -142,7 +157,9 @@ pub fn get_forkchoice_store( latest_new_attestations: HashMap::new(), blocks_queue: HashMap::new(), gossip_signatures: HashMap::new(), - aggregated_payloads: HashMap::new(), + latest_known_aggregated_payloads: HashMap::new(), + latest_new_aggregated_payloads: HashMap::new(), + attestation_data_by_root: HashMap::new(), } } @@ -253,6 +270,52 @@ pub fn update_head(store: &mut Store) { ); } +/// Extract per-validator attestations from aggregated payloads. +/// +/// Per leanSpec: walks through all aggregated proofs and extracts the latest +/// attestation data for each validator based on their participation bits. +fn extract_attestations_from_aggregated_payloads( + payloads: &HashMap>, + attestation_data_by_root: &HashMap, +) -> HashMap { + let mut attestations: HashMap = HashMap::new(); + + for (sig_key, proofs) in payloads { + // Look up the attestation data for this signature key's data_root + let Some(attestation_data) = attestation_data_by_root.get(&sig_key.data_root) else { + continue; + }; + + // For each proof, extract participating validators + for proof in proofs { + for (bit_idx, bit) in proof.participants.0.iter().enumerate() { + if *bit { + let validator_id = bit_idx as u64; + // Only update if this is a newer attestation for this validator + if attestations + .get(&validator_id) + .map_or(true, |existing| existing.slot < attestation_data.slot) + { + attestations.insert(validator_id, attestation_data.clone()); + } + } + } + } + } + + attestations +} + +/// Devnet-3: Update safe target from aggregated attestations +/// +/// Per leanSpec: Safe target is computed by merging BOTH aggregated payload pools: +/// - latest_known_aggregated_payloads: from block bodies (on-chain) +/// - latest_new_aggregated_payloads: from gossip aggregation topic +/// +/// This merge is critical because at interval 3 (when this runs), the migration +/// to "known" (interval 4) hasn't happened yet. Without merging: +/// - Proposer's own attestation in block body (goes directly to known) would be invisible +/// - Node's self-attestation (goes directly to known) would be invisible pub fn update_safe_target(store: &mut Store) { let n_validators = if let Some(state) = store.states.get(&store.head) { state.validators.len_usize() @@ -260,12 +323,35 @@ pub fn update_safe_target(store: &mut Store) { 0 }; + // Compute 2/3 supermajority threshold using ceiling division + // Formula: ceil(2n/3) = (2n + 2) / 3 for integer math let min_score = (n_validators * 2 + 2) / 3; let root = store.latest_justified.root; - let new_safe_target = - get_fork_choice_head(store, root, &store.latest_new_attestations, min_score); + + // Per leanSpec: Merge both aggregated payload pools + // This ensures we see all attestations including proposer's own and self-attestations + let mut all_payloads: HashMap> = + store.latest_known_aggregated_payloads.clone(); + + for (sig_key, proofs) in &store.latest_new_aggregated_payloads { + all_payloads + .entry(sig_key.clone()) + .or_default() + .extend(proofs.clone()); + } + + // Extract per-validator attestations from merged payloads + let attestations = + extract_attestations_from_aggregated_payloads(&all_payloads, &store.attestation_data_by_root); + + // Run LMD-GHOST with 2/3 threshold to find safe target + let new_safe_target = get_fork_choice_head(store, root, &attestations, min_score); store.safe_target = new_safe_target; + // Clear the "new" pool after processing (will be repopulated by gossip) + // Note: We do NOT clear latest_known_aggregated_payloads as those persist + store.latest_new_aggregated_payloads.clear(); + set_gauge_u64( |metrics| &metrics.lean_safe_target_slot, || { @@ -289,21 +375,25 @@ pub fn accept_new_attestations(store: &mut Store) { pub fn tick_interval(store: &mut Store, has_proposal: bool) { store.time += 1; // Calculate current interval within slot: time % INTERVALS_PER_SLOT + // Devnet-3: 5 intervals per slot (800ms each) let curr_interval = store.time % INTERVALS_PER_SLOT; match curr_interval { - 0 if has_proposal => accept_new_attestations(store), - 2 => update_safe_target(store), - 3 => accept_new_attestations(store), + 0 if has_proposal => accept_new_attestations(store), // Interval 0: Block proposal + 1 => {} // Interval 1: Attestation phase + 2 => {} // Interval 2: Aggregation phase (handled in main.rs) + 3 => update_safe_target(store), // Interval 3: Safe target update + 4 => accept_new_attestations(store), // Interval 4: Accept attestations _ => {} } } #[inline] pub fn get_proposal_head(store: &mut Store, slot: Slot) -> H256 { - let slot_time = store.config.genesis_time + (slot.0 * SECONDS_PER_SLOT); + // Convert to milliseconds for on_tick (devnet-3 uses 800ms intervals) + let slot_time_millis = (store.config.genesis_time + (slot.0 * SECONDS_PER_SLOT)) * 1000; - crate::handlers::on_tick(store, slot_time, true); + crate::handlers::on_tick(store, slot_time_millis, true); accept_new_attestations(store); store.head } @@ -374,7 +464,7 @@ pub fn produce_block_with_signatures( Some(available_attestations), Some(&known_block_roots), Some(&store.gossip_signatures), - Some(&store.aggregated_payloads), + Some(&store.latest_known_aggregated_payloads), )?; // Compute block root using the header hash (canonical block root) diff --git a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs index 2b4c5ef..f977b85 100644 --- a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs +++ b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs @@ -568,10 +568,10 @@ fn forkchoice(spec_file: &str) { let block_root = signed_block.message.block.hash_tree_root(); // Advance time to the block's slot to ensure attestations are processable - // SECONDS_PER_SLOT is 4 (not 12) - let block_time = - store.config.genesis_time + (signed_block.message.block.slot.0 * 4); - on_tick(&mut store, block_time, false); + // SECONDS_PER_SLOT is 4. Convert to milliseconds for devnet-3 + let block_time_millis = + (store.config.genesis_time + (signed_block.message.block.slot.0 * 4)) * 1000; + on_tick(&mut store, block_time_millis, false); on_block(&mut store, signed_block).unwrap(); Ok(block_root) @@ -610,7 +610,9 @@ fn forkchoice(spec_file: &str) { .tick .or(step.time) .expect(&format!("Step {step_idx}: Missing tick/time data")); - on_tick(&mut store, time_value, false); + // Convert seconds to milliseconds for devnet-3 + let time_value_millis = time_value * 1000; + on_tick(&mut store, time_value_millis, false); if step.valid { verify_checks(&store, &step.checks, &block_labels, step_idx).expect( diff --git a/lean_client/fork_choice/tests/unit_tests/time.rs b/lean_client/fork_choice/tests/unit_tests/time.rs index a4db1b5..05fe4f6 100644 --- a/lean_client/fork_choice/tests/unit_tests/time.rs +++ b/lean_client/fork_choice/tests/unit_tests/time.rs @@ -1,14 +1,15 @@ use super::common::create_test_store; use fork_choice::handlers::on_tick; -use fork_choice::store::{INTERVALS_PER_SLOT, SECONDS_PER_SLOT, tick_interval}; +use fork_choice::store::{INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, SECONDS_PER_SLOT, tick_interval}; #[test] fn test_on_tick_basic() { let mut store = create_test_store(); let initial_time = store.time; - let target_time = store.config.genesis_time + 200; + // on_tick now expects milliseconds (devnet-3) + let target_time_millis = (store.config.genesis_time + 200) * 1000; - on_tick(&mut store, target_time, true); + on_tick(&mut store, target_time_millis, true); assert!(store.time > initial_time); } @@ -17,9 +18,10 @@ fn test_on_tick_basic() { fn test_on_tick_no_proposal() { let mut store = create_test_store(); let initial_time = store.time; - let target_time = store.config.genesis_time + 100; + // on_tick now expects milliseconds (devnet-3) + let target_time_millis = (store.config.genesis_time + 100) * 1000; - on_tick(&mut store, target_time, false); + on_tick(&mut store, target_time_millis, false); assert!(store.time >= initial_time); } @@ -28,10 +30,12 @@ fn test_on_tick_no_proposal() { fn test_on_tick_already_current() { let mut store = create_test_store(); let initial_time = store.time; - let current_target = store.config.genesis_time + initial_time; + // on_tick now expects milliseconds (devnet-3) + // Convert initial_time (in intervals) to seconds, then to milliseconds + let current_target_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL); // Try to advance to current time - on_tick(&mut store, current_target, true); + on_tick(&mut store, current_target_millis, true); // Should not change significantly assert_eq!(store.time, initial_time); @@ -41,10 +45,11 @@ fn test_on_tick_already_current() { fn test_on_tick_small_increment() { let mut store = create_test_store(); let initial_time = store.time; - // Advance by just 1 second - let target_time = store.config.genesis_time + initial_time + 1; + // on_tick now expects milliseconds (devnet-3) + // Advance by just 1 second (1000ms) + let target_time_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL) + 1000; - on_tick(&mut store, target_time, false); + on_tick(&mut store, target_time_millis, false); // Should advance or stay same depending on interval rounding, but definitely not go back assert!(store.time >= initial_time); @@ -144,10 +149,18 @@ fn test_interval_calculations() { let slot_number = total_intervals / INTERVALS_PER_SLOT; let interval_in_slot = total_intervals % INTERVALS_PER_SLOT; - // INTERVALS_PER_SLOT is 4 (from store.rs) - // 10 intervals = 2 slots (8 intervals) + 2 intervals + // INTERVALS_PER_SLOT is 5 (devnet-3) + // 10 intervals = 2 slots (10 intervals) + 0 intervals assert_eq!(slot_number, 2); - assert_eq!(interval_in_slot, 2); + assert_eq!(interval_in_slot, 0); + + // Test with 8 intervals + let total_8 = 8; + let slot_8 = total_8 / INTERVALS_PER_SLOT; + let interval_8 = total_8 % INTERVALS_PER_SLOT; + // 8 intervals = 1 slot (5 intervals) + 3 intervals + assert_eq!(slot_8, 1); + assert_eq!(interval_8, 3); // Test boundary cases let boundary_intervals = INTERVALS_PER_SLOT; @@ -157,3 +170,28 @@ fn test_interval_calculations() { assert_eq!(boundary_slot, 1); // Start of next slot assert_eq!(boundary_interval, 0); // First interval of slot } + +#[test] +fn test_millis_per_interval() { + // Devnet-3: 4 seconds per slot, 5 intervals = 800ms per interval + assert_eq!(MILLIS_PER_INTERVAL, 800); + + // Verify no integer truncation: 4 * 1000 = 5 * 800 + assert_eq!(SECONDS_PER_SLOT * 1000, MILLIS_PER_INTERVAL * INTERVALS_PER_SLOT); +} + +#[test] +fn test_devnet3_interval_schedule() { + // Test that 5 intervals are properly cycled through + let mut store = create_test_store(); + store.time = 0; + + // Tick through a complete slot (5 intervals) + for i in 0..5 { + tick_interval(&mut store, i == 0); + } + + // After 5 ticks, we should be at interval 0 of the next slot + assert_eq!(store.time, 5); + assert_eq!(store.time % INTERVALS_PER_SLOT, 0); +} diff --git a/lean_client/networking/src/gossipsub/message.rs b/lean_client/networking/src/gossipsub/message.rs index 6e770f4..ab9df7d 100644 --- a/lean_client/networking/src/gossipsub/message.rs +++ b/lean_client/networking/src/gossipsub/message.rs @@ -1,13 +1,16 @@ use crate::gossipsub::topic::GossipsubKind; use crate::gossipsub::topic::GossipsubTopic; -use containers::SignedAttestation; -use containers::SignedBlockWithAttestation; +use containers::{SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation}; use libp2p::gossipsub::TopicHash; use ssz::SszReadDefault as _; +/// Devnet-3 gossipsub message types pub enum GossipsubMessage { Block(SignedBlockWithAttestation), - Attestation(SignedAttestation), + /// Attestation from a specific subnet (devnet-3) + AttestationSubnet { subnet_id: u64, attestation: SignedAttestation }, + /// Aggregated attestation (devnet-3) + Aggregation(SignedAggregatedAttestation), } impl GossipsubMessage { @@ -17,8 +20,14 @@ impl GossipsubMessage { SignedBlockWithAttestation::from_ssz_default(data) .map_err(|e| format!("{:?}", e))?, )), - GossipsubKind::Attestation => Ok(Self::Attestation( - SignedAttestation::from_ssz_default(data).map_err(|e| format!("{:?}", e))?, + GossipsubKind::AttestationSubnet(subnet_id) => Ok(Self::AttestationSubnet { + subnet_id, + attestation: SignedAttestation::from_ssz_default(data) + .map_err(|e| format!("{:?}", e))?, + }), + GossipsubKind::Aggregation => Ok(Self::Aggregation( + SignedAggregatedAttestation::from_ssz_default(data) + .map_err(|e| format!("{:?}", e))?, )), } } diff --git a/lean_client/networking/src/gossipsub/tests/config.rs b/lean_client/networking/src/gossipsub/tests/config.rs index 4fa245d..5390d69 100644 --- a/lean_client/networking/src/gossipsub/tests/config.rs +++ b/lean_client/networking/src/gossipsub/tests/config.rs @@ -1,5 +1,5 @@ use crate::gossipsub::config::GossipsubConfig; -use crate::gossipsub::topic::{GossipsubKind, get_topics}; +use crate::gossipsub::topic::{ATTESTATION_SUBNET_COUNT, GossipsubKind, get_topics}; #[test] fn test_default_parameters() { @@ -43,9 +43,22 @@ fn test_set_topics() { config.set_topics(topics.clone()); - assert_eq!(config.topics.len(), 2); + // Block + Aggregation + ATTESTATION_SUBNET_COUNT subnets (no legacy Attestation) + let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; + assert_eq!(config.topics.len(), expected_count); + + // Verify topics assert_eq!(config.topics[0].fork, "genesis"); assert_eq!(config.topics[0].kind, GossipsubKind::Block); assert_eq!(config.topics[1].fork, "genesis"); - assert_eq!(config.topics[1].kind, GossipsubKind::Attestation); + assert_eq!(config.topics[1].kind, GossipsubKind::Aggregation); + + // Verify subnet topics + for i in 0..ATTESTATION_SUBNET_COUNT as usize { + assert_eq!(config.topics[2 + i].fork, "genesis"); + assert_eq!( + config.topics[2 + i].kind, + GossipsubKind::AttestationSubnet(i as u64) + ); + } } diff --git a/lean_client/networking/src/gossipsub/tests/message.rs b/lean_client/networking/src/gossipsub/tests/message.rs index 9fd25dd..458dac0 100644 --- a/lean_client/networking/src/gossipsub/tests/message.rs +++ b/lean_client/networking/src/gossipsub/tests/message.rs @@ -1,6 +1,6 @@ use crate::gossipsub::message::GossipsubMessage; use crate::gossipsub::topic::{ - ATTESTATION_TOPIC, BLOCK_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, + ATTESTATION_SUBNET_PREFIX, BLOCK_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, }; use libp2p::gossipsub::TopicHash; @@ -27,10 +27,10 @@ fn test_message_decode_invalid_ssz_for_block() { } #[test] -fn test_message_decode_invalid_ssz_for_attestation() { +fn test_message_decode_invalid_ssz_for_attestation_subnet() { let topic_str = format!( - "/{}/{}/{}/{}", - TOPIC_PREFIX, "genesis", ATTESTATION_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX + "/{}/{}/{}{}/{}", + TOPIC_PREFIX, "genesis", ATTESTATION_SUBNET_PREFIX, "0", SSZ_SNAPPY_ENCODING_POSTFIX ); let topic = TopicHash::from_raw(topic_str); let invalid_ssz = b"not_valid_ssz"; diff --git a/lean_client/networking/src/gossipsub/tests/message_id.rs b/lean_client/networking/src/gossipsub/tests/message_id.rs index 17a0b51..61576aa 100644 --- a/lean_client/networking/src/gossipsub/tests/message_id.rs +++ b/lean_client/networking/src/gossipsub/tests/message_id.rs @@ -1,6 +1,6 @@ use crate::gossipsub::config::compute_message_id; use crate::gossipsub::topic::{ - ATTESTATION_TOPIC, BLOCK_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, + ATTESTATION_SUBNET_PREFIX, BLOCK_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, }; use crate::types::MESSAGE_DOMAIN_VALID_SNAPPY; use libp2p::gossipsub::{Message, TopicHash}; @@ -163,10 +163,10 @@ fn test_realistic_blockchain_scenarios() { ), ( format!( - "/{}/genesis/{}/{}", - TOPIC_PREFIX, ATTESTATION_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX + "/{}/genesis/{}0/{}", + TOPIC_PREFIX, ATTESTATION_SUBNET_PREFIX, SSZ_SNAPPY_ENCODING_POSTFIX ), - b"aggregate_proof_ssz".to_vec(), + b"attestation_ssz".to_vec(), ), ]; diff --git a/lean_client/networking/src/gossipsub/tests/topic.rs b/lean_client/networking/src/gossipsub/tests/topic.rs index 7e3d70b..ae49a4f 100644 --- a/lean_client/networking/src/gossipsub/tests/topic.rs +++ b/lean_client/networking/src/gossipsub/tests/topic.rs @@ -1,6 +1,7 @@ use crate::gossipsub::topic::{ - ATTESTATION_TOPIC, BLOCK_TOPIC, GossipsubKind, GossipsubTopic, SSZ_SNAPPY_ENCODING_POSTFIX, - TOPIC_PREFIX, get_topics, + AGGREGATION_TOPIC, ATTESTATION_SUBNET_COUNT, ATTESTATION_SUBNET_PREFIX, BLOCK_TOPIC, + GossipsubKind, GossipsubTopic, SSZ_SNAPPY_ENCODING_POSTFIX, TOPIC_PREFIX, + get_subscription_topics, get_topics, }; use libp2p::gossipsub::TopicHash; @@ -18,20 +19,6 @@ fn test_topic_decode_valid_block() { assert_eq!(decoded.kind, GossipsubKind::Block); } -#[test] -fn test_topic_decode_valid_attestation() { - let topic_str = format!( - "/{}/{}/{}/{}", - TOPIC_PREFIX, "genesis", ATTESTATION_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX - ); - let topic_hash = TopicHash::from_raw(topic_str); - - let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); - - assert_eq!(decoded.fork, "genesis"); - assert_eq!(decoded.kind, GossipsubKind::Attestation); -} - #[test] fn test_topic_decode_invalid_prefix() { let topic_str = format!( @@ -97,7 +84,7 @@ fn test_topic_to_string() { fn test_topic_encoding_decoding_roundtrip() { let original = GossipsubTopic { fork: "testfork".to_string(), - kind: GossipsubKind::Attestation, + kind: GossipsubKind::AttestationSubnet(0), }; let topic_hash: TopicHash = original.clone().into(); @@ -111,11 +98,18 @@ fn test_topic_encoding_decoding_roundtrip() { fn test_get_topics_all_same_fork() { let topics = get_topics("myfork".to_string()); - assert_eq!(topics.len(), 2); + // Block + Aggregation + ATTESTATION_SUBNET_COUNT subnets (no legacy Attestation) + let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; + assert_eq!(topics.len(), expected_count); - let kinds: Vec<_> = topics.iter().map(|t| t.kind).collect(); + let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); assert!(kinds.contains(&GossipsubKind::Block)); - assert!(kinds.contains(&GossipsubKind::Attestation)); + assert!(kinds.contains(&GossipsubKind::Aggregation)); + + // Check subnet topics + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); + } // All should have the same fork for topic in &topics { @@ -126,7 +120,97 @@ fn test_get_topics_all_same_fork() { #[test] fn test_gossipsub_kind_display() { assert_eq!(GossipsubKind::Block.to_string(), BLOCK_TOPIC); - assert_eq!(GossipsubKind::Attestation.to_string(), ATTESTATION_TOPIC); + assert_eq!(GossipsubKind::Aggregation.to_string(), AGGREGATION_TOPIC); + assert_eq!( + GossipsubKind::AttestationSubnet(0).to_string(), + format!("{ATTESTATION_SUBNET_PREFIX}0") + ); + assert_eq!( + GossipsubKind::AttestationSubnet(5).to_string(), + format!("{ATTESTATION_SUBNET_PREFIX}5") + ); +} + +#[test] +fn test_topic_decode_valid_aggregation() { + let topic_str = format!( + "/{}/{}/{}/{}", + TOPIC_PREFIX, "genesis", AGGREGATION_TOPIC, SSZ_SNAPPY_ENCODING_POSTFIX + ); + let topic_hash = TopicHash::from_raw(topic_str); + + let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); + + assert_eq!(decoded.fork, "genesis"); + assert_eq!(decoded.kind, GossipsubKind::Aggregation); +} + +#[test] +fn test_topic_decode_valid_attestation_subnet() { + let topic_str = format!( + "/{}/{}/{}/{}", + TOPIC_PREFIX, "genesis", "attestation_0", SSZ_SNAPPY_ENCODING_POSTFIX + ); + let topic_hash = TopicHash::from_raw(topic_str); + + let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); + + assert_eq!(decoded.fork, "genesis"); + assert_eq!(decoded.kind, GossipsubKind::AttestationSubnet(0)); +} + +#[test] +fn test_topic_decode_valid_attestation_subnet_large_id() { + let topic_str = format!( + "/{}/{}/{}/{}", + TOPIC_PREFIX, "genesis", "attestation_42", SSZ_SNAPPY_ENCODING_POSTFIX + ); + let topic_hash = TopicHash::from_raw(topic_str); + + let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); + + assert_eq!(decoded.fork, "genesis"); + assert_eq!(decoded.kind, GossipsubKind::AttestationSubnet(42)); +} + +#[test] +fn test_topic_decode_invalid_attestation_subnet_id() { + let topic_str = format!( + "/{}/{}/{}/{}", + TOPIC_PREFIX, "genesis", "attestation_abc", SSZ_SNAPPY_ENCODING_POSTFIX + ); + let topic_hash = TopicHash::from_raw(topic_str); + + let result = GossipsubTopic::decode(&topic_hash); + assert!(result.is_err()); +} + +#[test] +fn test_topic_aggregation_roundtrip() { + let original = GossipsubTopic { + fork: "testfork".to_string(), + kind: GossipsubKind::Aggregation, + }; + + let topic_hash: TopicHash = original.clone().into(); + let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); + + assert_eq!(original.fork, decoded.fork); + assert_eq!(original.kind, decoded.kind); +} + +#[test] +fn test_topic_attestation_subnet_roundtrip() { + let original = GossipsubTopic { + fork: "testfork".to_string(), + kind: GossipsubKind::AttestationSubnet(7), + }; + + let topic_hash: TopicHash = original.clone().into(); + let decoded = GossipsubTopic::decode(&topic_hash).unwrap(); + + assert_eq!(original.fork, decoded.fork); + assert_eq!(original.kind, decoded.kind); } #[test] @@ -141,16 +225,16 @@ fn test_topic_equality() { }; let topic3 = GossipsubTopic { fork: "genesis".to_string(), - kind: GossipsubKind::Attestation, + kind: GossipsubKind::Aggregation, }; let topic4 = GossipsubTopic { fork: "genesis2".to_string(), - kind: GossipsubKind::Attestation, + kind: GossipsubKind::Aggregation, }; assert_eq!(topic1, topic2); assert_ne!(topic1, topic3); - assert_ne!(topic1, topic4); + assert_ne!(topic3, topic4); // Same kind, different fork } #[test] @@ -168,3 +252,41 @@ fn test_topic_hash_conversion() { assert_eq!(hash.as_str(), expected); } + +#[test] +fn test_get_subscription_topics_aggregator() { + // Aggregators subscribe to all topics including attestation subnets + let topics = get_subscription_topics("myfork".to_string(), true); + + // Block + Aggregation + ATTESTATION_SUBNET_COUNT subnets + let expected_count = 2 + ATTESTATION_SUBNET_COUNT as usize; + assert_eq!(topics.len(), expected_count); + + let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); + assert!(kinds.contains(&GossipsubKind::Block)); + assert!(kinds.contains(&GossipsubKind::Aggregation)); + + // Aggregators should have subnet topics + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + assert!(kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); + } +} + +#[test] +fn test_get_subscription_topics_non_aggregator() { + // Non-aggregators only subscribe to Block and Aggregation + // They do NOT subscribe to attestation subnet topics (they publish to them but don't subscribe) + let topics = get_subscription_topics("myfork".to_string(), false); + + // Block + Aggregation only (no subnet topics) + assert_eq!(topics.len(), 2); + + let kinds: Vec<_> = topics.iter().map(|t| t.kind.clone()).collect(); + assert!(kinds.contains(&GossipsubKind::Block)); + assert!(kinds.contains(&GossipsubKind::Aggregation)); + + // Non-aggregators should NOT have subnet topics + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + assert!(!kinds.contains(&GossipsubKind::AttestationSubnet(subnet_id))); + } +} diff --git a/lean_client/networking/src/gossipsub/topic.rs b/lean_client/networking/src/gossipsub/topic.rs index 09fcd33..ef4d080 100644 --- a/lean_client/networking/src/gossipsub/topic.rs +++ b/lean_client/networking/src/gossipsub/topic.rs @@ -4,7 +4,17 @@ pub const TOPIC_PREFIX: &str = "leanconsensus"; pub const SSZ_SNAPPY_ENCODING_POSTFIX: &str = "ssz_snappy"; pub const BLOCK_TOPIC: &str = "block"; -pub const ATTESTATION_TOPIC: &str = "attestation"; +pub const ATTESTATION_SUBNET_PREFIX: &str = "attestation_"; +pub const AGGREGATION_TOPIC: &str = "aggregation"; + +/// Number of attestation subnets (devnet-3) +pub const ATTESTATION_SUBNET_COUNT: u64 = 1; + +/// Compute the subnet ID for a validator (devnet-3) +/// Subnet assignment: validator_id % ATTESTATION_SUBNET_COUNT +pub fn compute_subnet_id(validator_id: u64) -> u64 { + validator_id % ATTESTATION_SUBNET_COUNT +} #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub struct GossipsubTopic { @@ -12,23 +22,62 @@ pub struct GossipsubTopic { pub kind: GossipsubKind, } -#[derive(Debug, Hash, Clone, Copy, PartialEq, Eq)] +/// Devnet-3 gossipsub topic kinds +/// Note: Legacy global "attestation" topic removed - use AttestationSubnet(subnet_id) instead +#[derive(Debug, Hash, Clone, PartialEq, Eq)] pub enum GossipsubKind { Block, - Attestation, + /// Subnet-specific attestation topic (devnet-3) + /// Format: attestation_{subnet_id} + AttestationSubnet(u64), + /// Aggregated attestation topic (devnet-3) + Aggregation, +} + +impl GossipsubKind { + /// Check if this kind matches another, treating AttestationSubnet as matching any subnet + pub fn matches(&self, other: &Self) -> bool { + match (self, other) { + (GossipsubKind::AttestationSubnet(_), GossipsubKind::AttestationSubnet(_)) => true, + _ => self == other, + } + } } +/// Get all topics (for testing or full subscription) pub fn get_topics(fork: String) -> Vec { - vec![ + get_subscription_topics(fork, true) +} + +/// Get topics for subscription based on node role (devnet-3) +/// - All nodes subscribe to Block and Aggregation topics +/// - Only aggregators subscribe to AttestationSubnet topics to collect attestations +/// - Non-aggregators publish to subnet topics but don't subscribe +pub fn get_subscription_topics(fork: String, is_aggregator: bool) -> Vec { + let mut topics = vec![ GossipsubTopic { fork: fork.clone(), kind: GossipsubKind::Block, }, + // Aggregation topic - all nodes subscribe to receive aggregated attestations GossipsubTopic { fork: fork.clone(), - kind: GossipsubKind::Attestation, + kind: GossipsubKind::Aggregation, }, - ] + ]; + + // Only aggregators subscribe to attestation subnet topics (devnet-3) + // Non-aggregators publish to these topics but don't subscribe + if is_aggregator { + for subnet_id in 0..ATTESTATION_SUBNET_COUNT { + topics.push(GossipsubTopic { + fork: fork.clone(), + kind: GossipsubKind::AttestationSubnet(subnet_id), + }); + } + } + + topics } impl GossipsubTopic { @@ -63,10 +112,19 @@ impl GossipsubTopic { } fn extract_kind(parts: &[&str]) -> Result { - match parts[2] { - BLOCK_TOPIC => Ok(GossipsubKind::Block), - ATTESTATION_TOPIC => Ok(GossipsubKind::Attestation), - other => Err(format!("Invalid topic kind: {other:?}")), + let topic_name = parts[2]; + + if topic_name == BLOCK_TOPIC { + Ok(GossipsubKind::Block) + } else if topic_name == AGGREGATION_TOPIC { + Ok(GossipsubKind::Aggregation) + } else if let Some(subnet_str) = topic_name.strip_prefix(ATTESTATION_SUBNET_PREFIX) { + let subnet_id = subnet_str.parse::().map_err(|e| { + format!("Invalid attestation subnet id: {subnet_str:?}, error: {e}") + })?; + Ok(GossipsubKind::AttestationSubnet(subnet_id)) + } else { + Err(format!("Invalid topic kind: {topic_name:?}")) } } } @@ -96,8 +154,11 @@ impl From for String { impl From for TopicHash { fn from(val: GossipsubTopic) -> Self { let kind_str = match &val.kind { - GossipsubKind::Block => BLOCK_TOPIC, - GossipsubKind::Attestation => ATTESTATION_TOPIC, + GossipsubKind::Block => BLOCK_TOPIC.to_string(), + GossipsubKind::AttestationSubnet(subnet_id) => { + format!("{ATTESTATION_SUBNET_PREFIX}{subnet_id}") + } + GossipsubKind::Aggregation => AGGREGATION_TOPIC.to_string(), }; TopicHash::from_raw(format!( "/{}/{}/{}/{}", @@ -110,7 +171,10 @@ impl std::fmt::Display for GossipsubKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { GossipsubKind::Block => write!(f, "{BLOCK_TOPIC}"), - GossipsubKind::Attestation => write!(f, "{ATTESTATION_TOPIC}"), + GossipsubKind::AttestationSubnet(subnet_id) => { + write!(f, "{ATTESTATION_SUBNET_PREFIX}{subnet_id}") + } + GossipsubKind::Aggregation => write!(f, "{AGGREGATION_TOPIC}"), } } } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 2874f25..4784ea6 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -496,24 +496,44 @@ where ); } } - Ok(GossipsubMessage::Attestation(signed_attestation)) => { + Ok(GossipsubMessage::AttestationSubnet { subnet_id, attestation }) => { info!( - validator = %signed_attestation.validator_id, - slot = %signed_attestation.message.slot.0, - "received attestation via gossip" + validator = %attestation.validator_id, + slot = %attestation.message.slot.0, + subnet_id = subnet_id, + "received attestation via subnet gossip" ); - let slot = signed_attestation.message.slot.0; + let slot = attestation.message.slot.0; if let Err(err) = self .chain_message_sink .send(ChainMessage::ProcessAttestation { - signed_attestation: signed_attestation, + signed_attestation: attestation, is_trusted: false, should_gossip: true, }) .await { - warn!("failed to send vote for slot {slot} to chain: {err:?}"); + warn!("failed to send subnet attestation for slot {slot} to chain: {err:?}"); + } + } + Ok(GossipsubMessage::Aggregation(signed_aggregated_attestation)) => { + info!( + slot = %signed_aggregated_attestation.data.slot.0, + "received aggregated attestation via gossip" + ); + let slot = signed_aggregated_attestation.data.slot.0; + + if let Err(err) = self + .chain_message_sink + .send(ChainMessage::ProcessAggregation { + signed_aggregated_attestation, + is_trusted: false, + should_gossip: true, + }) + .await + { + warn!("failed to send aggregation for slot {slot} to chain: {err:?}"); } } Err(err) => { @@ -722,15 +742,23 @@ where } } } - OutboundP2pRequest::GossipAttestation(signed_attestation) => { + OutboundP2pRequest::GossipAttestation(signed_attestation, subnet_id) => { let slot = signed_attestation.message.slot.0; + let validator_id = signed_attestation.validator_id; match signed_attestation.to_ssz() { Ok(bytes) => { - if let Err(err) = self.publish_to_topic(GossipsubKind::Attestation, bytes) { - warn!(slot = slot, ?err, "Publish attestation failed"); + // Devnet-3: Publish to subnet-specific topic only + let topic_kind = GossipsubKind::AttestationSubnet(subnet_id); + if let Err(err) = self.publish_to_topic(topic_kind, bytes) { + warn!(slot = slot, subnet_id = subnet_id, ?err, "Publish attestation to subnet failed"); } else { - info!(slot = slot, "Broadcasted attestation"); + info!( + slot = slot, + validator = validator_id, + subnet_id = subnet_id, + "Broadcasted attestation to subnet" + ); } } Err(err) => { @@ -738,6 +766,21 @@ where } } } + OutboundP2pRequest::GossipAggregation(signed_aggregated_attestation) => { + let slot = signed_aggregated_attestation.data.slot.0; + match signed_aggregated_attestation.to_ssz() { + Ok(bytes) => { + if let Err(err) = self.publish_to_topic(GossipsubKind::Aggregation, bytes) { + warn!(slot = slot, ?err, "Publish aggregation failed"); + } else { + info!(slot = slot, "Broadcasted aggregated attestation"); + } + } + Err(err) => { + warn!(slot = slot, ?err, "Serialize aggregation failed"); + } + } + } OutboundP2pRequest::RequestBlocksByRoot(roots) => { if let Some(peer_id) = self.get_random_connected_peer() { info!( diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index ff1b732..fae2185 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, fmt::Display}; use anyhow::{Result, anyhow}; use async_trait::async_trait; -use containers::{SignedAttestation, SignedBlockWithAttestation}; +use containers::{SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation}; use metrics::METRICS; use serde::{Deserialize, Serialize}; use ssz::H256; @@ -136,6 +136,12 @@ pub enum ChainMessage { is_trusted: bool, should_gossip: bool, }, + /// Devnet-3: Process aggregated attestation from aggregation topic + ProcessAggregation { + signed_aggregated_attestation: SignedAggregatedAttestation, + is_trusted: bool, + should_gossip: bool, + }, } impl ChainMessage { @@ -180,6 +186,16 @@ impl Display for ChainMessage { signed_attestation.message.slot.0 ) } + ChainMessage::ProcessAggregation { + signed_aggregated_attestation, + .. + } => { + write!( + f, + "ProcessAggregation(slot={})", + signed_aggregated_attestation.data.slot.0 + ) + } } } } @@ -187,7 +203,11 @@ impl Display for ChainMessage { #[derive(Debug, Clone)] pub enum OutboundP2pRequest { GossipBlockWithAttestation(SignedBlockWithAttestation), - GossipAttestation(SignedAttestation), + /// Devnet-3: Gossip attestation to subnet-specific topic + /// Contains (attestation, subnet_id) + GossipAttestation(SignedAttestation, u64), + /// Devnet-3: Gossip aggregated attestation to aggregation topic + GossipAggregation(SignedAggregatedAttestation), RequestBlocksByRoot(Vec), } diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 949aadc..cb9a241 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -7,14 +7,14 @@ use containers::{ use ethereum_types::H256; use features::Feature; use fork_choice::{ - handlers::{on_attestation, on_block, on_tick}, + handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick}, store::{INTERVALS_PER_SLOT, Store, get_forkchoice_store}, }; use http_api::HttpServerConfig; use libp2p_identity::Keypair; use metrics::{METRICS, Metrics}; use networking::gossipsub::config::GossipsubConfig; -use networking::gossipsub::topic::get_topics; +use networking::gossipsub::topic::{compute_subnet_id, get_subscription_topics}; use networking::network::{NetworkService, NetworkServiceConfig}; use networking::types::{ChainMessage, OutboundP2pRequest}; use ssz::{PersistentList, SszHash}; @@ -136,6 +136,16 @@ struct Args { /// List of optional runtime features to enable #[clap(long, value_delimiter = ',')] features: Vec, + + /// Enable aggregator mode (devnet-3) + /// When enabled, this node will aggregate attestations at interval 2 + #[arg(long = "is-aggregator", default_value_t = false)] + is_aggregator: bool, + + /// Override attestation committee count (devnet-3) + /// When set, uses this value instead of the hardcoded default + #[arg(long = "attestation-committee-count")] + attestation_committee_count: Option, } #[tokio::main] @@ -264,16 +274,18 @@ async fn main() -> Result<()> { if let Some(ref keys_dir) = args.hash_sig_key_dir { let keys_path = std::path::Path::new(keys_dir); if keys_path.exists() { - match ValidatorService::new_with_keys( + match ValidatorService::new_with_keys_and_aggregator( config.clone(), num_validators, keys_path, + args.is_aggregator, ) { Ok(service) => { info!( node_id = %node_id, indices = ?config.validator_indices, keys_dir = ?keys_path, + aggregator = args.is_aggregator, "Validator mode enabled with XMSS signing" ); Some(service) @@ -283,7 +295,7 @@ async fn main() -> Result<()> { "Failed to load XMSS keys: {}, falling back to zero signatures", e ); - Some(ValidatorService::new(config, num_validators)) + Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) } } } else { @@ -291,15 +303,16 @@ async fn main() -> Result<()> { "Hash-sig key directory not found: {:?}, using zero signatures", keys_path ); - Some(ValidatorService::new(config, num_validators)) + Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) } } else { info!( node_id = %node_id, indices = ?config.validator_indices, + aggregator = args.is_aggregator, "Validator mode enabled (no --hash-sig-key-dir specified - using zero signatures)" ); - Some(ValidatorService::new(config, num_validators)) + Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) } } Err(e) => { @@ -313,7 +326,9 @@ async fn main() -> Result<()> { }; let fork = "devnet0".to_string(); - let gossipsub_topics = get_topics(fork); + // Devnet-3: Non-aggregators only subscribe to Block, Attestation, Aggregation + // Aggregators also subscribe to AttestationSubnet topics to collect attestations + let gossipsub_topics = get_subscription_topics(fork, args.is_aggregator); let mut gossipsub_config = GossipsubConfig::new(); gossipsub_config.set_topics(gossipsub_topics); @@ -387,7 +402,8 @@ async fn main() -> Result<()> { }); let chain_handle = task::spawn(async move { - let mut tick_interval = interval(Duration::from_millis(1000)); + // Devnet-3: 5 intervals per slot at 800ms each (4 second slots) + let mut tick_interval = interval(Duration::from_millis(800)); let mut last_logged_slot = 0u64; let mut last_status_slot: Option = None; let mut last_proposal_slot: Option = None; @@ -399,11 +415,12 @@ async fn main() -> Result<()> { loop { tokio::select! { _ = tick_interval.tick() => { - let now = SystemTime::now() + // Devnet-3: on_tick expects time in milliseconds + let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs(); - on_tick(&mut store, now, false); + .as_millis() as u64; + on_tick(&mut store, now_millis, false); let current_slot = store.time / INTERVALS_PER_SLOT; let current_interval = store.time % INTERVALS_PER_SLOT; @@ -435,11 +452,11 @@ async fn main() -> Result<()> { ); // Synchronize store time with wall clock before processing own block - let now = SystemTime::now() + let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs(); - on_tick(&mut store, now, false); + .as_millis() as u64; + on_tick(&mut store, now_millis, false); match on_block(&mut store, signed_block.clone()) { Ok(()) => { @@ -467,16 +484,18 @@ async fn main() -> Result<()> { let attestations = vs.create_attestations(&store, Slot(current_slot)); for signed_att in attestations { let validator_id = signed_att.validator_id; + let subnet_id = compute_subnet_id(validator_id); info!( slot = current_slot, validator = validator_id, - "Broadcasting attestation" + subnet_id = subnet_id, + "Broadcasting attestation to subnet" ); match on_attestation(&mut store, signed_att.clone(), false) { Ok(()) => { if let Err(e) = chain_outbound_sender.send( - OutboundP2pRequest::GossipAttestation(signed_att) + OutboundP2pRequest::GossipAttestation(signed_att, subnet_id) ) { warn!("Failed to gossip attestation: {}", e); } @@ -489,9 +508,28 @@ async fn main() -> Result<()> { } } 2 => { - info!(slot = current_slot, tick = store.time, "Computing safe target"); + // Interval 2: Aggregation phase (devnet-3) + if let Some(ref vs) = validator_service { + if let Some(aggregations) = vs.maybe_aggregate(&store, Slot(current_slot)) { + for aggregation in aggregations { + if let Err(e) = chain_outbound_sender.send( + OutboundP2pRequest::GossipAggregation(aggregation) + ) { + warn!("Failed to gossip aggregation: {}", e); + } + } + info!(slot = current_slot, tick = store.time, "Aggregation phase - broadcast aggregated attestations"); + } else { + info!(slot = current_slot, tick = store.time, "Aggregation phase - no aggregation duty or no attestations"); + } + } } 3 => { + // Interval 3: Safe target update (devnet-3) + info!(slot = current_slot, tick = store.time, "Computing safe target"); + } + 4 => { + // Interval 4: Accept attestations (devnet-3) info!(slot = current_slot, tick = store.time, "Accepting new attestations"); } _ => {} @@ -526,11 +564,11 @@ async fn main() -> Result<()> { ); // Synchronize store time with wall clock before processing block - let now = SystemTime::now() + let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() - .as_secs(); - on_tick(&mut store, now, false); + .as_millis() as u64; + on_tick(&mut store, now_millis, false); match on_block(&mut store, signed_block_with_attestation.clone()) { Ok(()) => { @@ -584,18 +622,62 @@ async fn main() -> Result<()> { match on_attestation(&mut store, signed_attestation.clone(), false) { Ok(()) => { if should_gossip { + let subnet_id = compute_subnet_id(validator_id); if let Err(e) = outbound_p2p_sender.send( - OutboundP2pRequest::GossipAttestation(signed_attestation) + OutboundP2pRequest::GossipAttestation(signed_attestation, subnet_id) ) { warn!("Failed to gossip attestation: {}", e); } else { - info!(slot = att_slot, "Broadcasted attestation"); + info!(slot = att_slot, subnet_id = subnet_id, "Broadcasted attestation to subnet"); } } } Err(e) => warn!("Error processing attestation: {}", e), } } + ChainMessage::ProcessAggregation { + signed_aggregated_attestation, + should_gossip, + .. + } => { + let agg_slot = signed_aggregated_attestation.data.slot.0; + let validator_count = signed_aggregated_attestation + .proof + .participants + .0 + .iter() + .filter(|b| **b) + .count(); + + // Devnet-3: Process aggregated attestation for safe target computation + match on_aggregated_attestation(&mut store, signed_aggregated_attestation.clone()) { + Ok(_) => { + info!( + slot = agg_slot, + validators = validator_count, + "Processed aggregated attestation for safe target" + ); + } + Err(e) => { + warn!( + slot = agg_slot, + error = %e, + "Failed to process aggregated attestation" + ); + } + } + + // Gossip the aggregation if needed + if should_gossip { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::GossipAggregation(signed_aggregated_attestation) + ) { + warn!("Failed to gossip aggregation: {}", e); + } else { + info!(slot = agg_slot, "Broadcasted aggregation"); + } + } + } } } } diff --git a/lean_client/test_vectors/fork_choice/devnet/fc/test_fork_choice_reorgs/test_reorg_with_slot_gaps.json b/lean_client/test_vectors/fork_choice/devnet/fc/test_fork_choice_reorgs/test_reorg_with_slot_gaps.json index 4be5081..21e28a3 100644 --- a/lean_client/test_vectors/fork_choice/devnet/fc/test_fork_choice_reorgs/test_reorg_with_slot_gaps.json +++ b/lean_client/test_vectors/fork_choice/devnet/fc/test_fork_choice_reorgs/test_reorg_with_slot_gaps.json @@ -258,7 +258,7 @@ "headRootLabel": "fork_a_7" }, "stepType": "tick", - "time": 31 + "time": 32 }, { "valid": true, @@ -345,7 +345,7 @@ "headRootLabel": "fork_b_9" }, "stepType": "tick", - "time": 39 + "time": 40 } ], "maxSlot": 9, diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index 6d85891..a67b745 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -4,12 +4,13 @@ use std::path::Path; use anyhow::{Context, Result, anyhow, bail}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, AttestationSignatures, Block, - BlockSignatures, BlockWithAttestation, Checkpoint, SignedAttestation, - SignedBlockWithAttestation, Slot, + AggregatedSignatureProof, AggregationBits, + Attestation, AttestationData, AttestationSignatures, Block, BlockSignatures, + BlockWithAttestation, Checkpoint, SignedAggregatedAttestation, SignedAttestation, + SignedBlockWithAttestation, Slot, SignatureKey, }; -use ethereum_types::H256; -use fork_choice::store::{Store, get_proposal_head, produce_block_with_signatures}; +use fork_choice::store::{Store, produce_block_with_signatures}; +use ssz::H256; use metrics::{METRICS, stop_and_discard, stop_and_record}; use ssz::SszHash; use tracing::{info, warn}; @@ -56,14 +57,21 @@ pub struct ValidatorService { pub config: ValidatorConfig, pub num_validators: u64, key_manager: Option, + /// Whether this node performs aggregation duties (devnet-3) + is_aggregator: bool, } impl ValidatorService { pub fn new(config: ValidatorConfig, num_validators: u64) -> Self { + Self::new_with_aggregator(config, num_validators, false) + } + + pub fn new_with_aggregator(config: ValidatorConfig, num_validators: u64, is_aggregator: bool) -> Self { info!( node_id = %config.node_id, indices = ?config.validator_indices, total_validators = num_validators, + is_aggregator = is_aggregator, "VALIDATOR INITIALIZED SUCCESSFULLY" ); @@ -77,6 +85,7 @@ impl ValidatorService { config, num_validators, key_manager: None, + is_aggregator, } } @@ -84,6 +93,15 @@ impl ValidatorService { config: ValidatorConfig, num_validators: u64, keys_dir: impl AsRef, + ) -> Result> { + Self::new_with_keys_and_aggregator(config, num_validators, keys_dir, false) + } + + pub fn new_with_keys_and_aggregator( + config: ValidatorConfig, + num_validators: u64, + keys_dir: impl AsRef, + is_aggregator: bool, ) -> Result> { let mut key_manager = KeyManager::new(keys_dir)?; @@ -97,6 +115,7 @@ impl ValidatorService { indices = ?config.validator_indices, total_validators = num_validators, keys_loaded = config.validator_indices.len(), + is_aggregator = is_aggregator, "VALIDATOR INITIALIZED WITH XMSS KEYS" ); @@ -110,6 +129,7 @@ impl ValidatorService { config, num_validators, key_manager: Some(key_manager), + is_aggregator, }) } @@ -126,6 +146,125 @@ impl ValidatorService { } } + /// Check if this node is an aggregator for the given slot (devnet-3) + /// For devnet-3, aggregator selection is simplified: a node is an aggregator + /// if it has validator duties and is_aggregator is enabled via config + pub fn is_aggregator_for_slot(&self, _slot: Slot) -> bool { + self.is_aggregator && !self.config.validator_indices.is_empty() + } + + /// Perform aggregation duty if this node is an aggregator (devnet-3) + /// Collects signatures from gossip_signatures and creates aggregated attestations + /// Returns None if not an aggregator or no signatures to aggregate + pub fn maybe_aggregate(&self, store: &Store, slot: Slot) -> Option> { + if !self.is_aggregator_for_slot(slot) { + return None; + } + + // Get the head state to access validator public keys + let head_state = store.states.get(&store.head)?; + + // Group signatures by data_root + // SignatureKey contains (validator_id, data_root) + let mut groups: HashMap> = HashMap::new(); + + for (sig_key, signature) in &store.gossip_signatures { + groups + .entry(sig_key.data_root) + .or_default() + .push((sig_key.validator_id, signature.clone())); + } + + if groups.is_empty() { + info!(slot = slot.0, "No signatures to aggregate"); + return None; + } + + let mut aggregated_attestations = Vec::new(); + + for (data_root, validator_sigs) in groups { + // Look up attestation data by its hash (data_root) + // This ensures we get the exact attestation that was signed, + // matching ream's attestation_data_by_root_provider approach + let Some(attestation_data) = store.attestation_data_by_root.get(&data_root).cloned() else { + warn!( + data_root = %format!("0x{:x}", data_root), + "Could not find attestation data for aggregation group" + ); + continue; + }; + + // Only aggregate attestations for the current slot + if attestation_data.slot != slot { + continue; + } + + // Collect validator IDs, public keys, and signatures + // IMPORTANT: Must sort by validator_id to match ream/zeam behavior. + // The participants bitfield is iterated in ascending order during verification, + // so the proof must be created with public_keys/signatures in the same order. + let mut entries: Vec<(u64, Signature)> = validator_sigs + .into_iter() + .filter(|(vid, _)| head_state.validators.get(*vid).is_ok()) + .collect(); + entries.sort_by_key(|(vid, _)| *vid); + + let mut validator_ids = Vec::new(); + let mut public_keys = Vec::new(); + let mut signatures = Vec::new(); + + for (vid, sig) in entries { + // Get public key from state validators (already filtered above) + let validator = head_state.validators.get(vid).unwrap(); + validator_ids.push(vid); + public_keys.push(validator.pubkey.clone()); + signatures.push(sig); + } + + if validator_ids.is_empty() { + continue; + } + + // Create aggregation bits from validator IDs + let participants = AggregationBits::from_validator_indices(&validator_ids); + + // Create the aggregated signature proof + // Uses attestation_data.slot as epoch (matches ream's approach) + let proof = match AggregatedSignatureProof::aggregate( + participants, + public_keys, + signatures, + data_root, + attestation_data.slot.0 as u32, + ) { + Ok(p) => p, + Err(e) => { + warn!(error = %e, "Failed to create aggregated signature proof"); + continue; + } + }; + + info!( + slot = slot.0, + validators = validator_ids.len(), + data_root = %format!("0x{:x}", data_root), + "Created aggregated attestation" + ); + + // Create SignedAggregatedAttestation matching ream/zeam structure + aggregated_attestations.push(SignedAggregatedAttestation { + data: attestation_data, + proof, + }); + } + + if aggregated_attestations.is_empty() { + None + } else { + Some(aggregated_attestations) + } + } + /// Build a block proposal for the given slot pub fn build_block_proposal( &self,