diff --git a/Cargo.lock b/Cargo.lock index 3d071b5094..95088c2a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3409,7 +3409,7 @@ dependencies = [ [[package]] name = "e3-hamt" -version = "0.1.14" +version = "0.1.15" dependencies = [ "anyhow", "bincode 1.3.3", diff --git a/crates/events/src/eventstore.rs b/crates/events/src/eventstore.rs index 41fb44fbf2..fb52fe74c4 100644 --- a/crates/events/src/eventstore.rs +++ b/crates/events/src/eventstore.rs @@ -11,7 +11,6 @@ use crate::{ }; use actix::{Actor, Handler}; use anyhow::{bail, Result}; -use e3_utils::{major_issue, MAILBOX_LIMIT}; use tracing::{error, warn}; const MAX_STORAGE_ERRORS: u64 = 10; @@ -40,7 +39,7 @@ impl EventStore { } let seq = self.log.append(&event)?; self.index.insert(ts, seq)?; - sender.try_send(StoreEventResponse(event.into_sequenced(seq)))?; + sender.do_send(StoreEventResponse(event.into_sequenced(seq))); Ok(()) } @@ -93,18 +92,15 @@ impl EventStore { impl Actor for EventStore { type Context = actix::Context; - fn started(&mut self, ctx: &mut Self::Context) { - ctx.set_mailbox_capacity(MAILBOX_LIMIT); - } } impl Handler for EventStore { type Result = (); fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result { if let Err(e) = self.handle_store_event_requested(msg) { - panic!("{}", major_issue("Could not store event in eventstore.", e)) - // panic here because when event storage fails we really need - // to just give up + // Log append or index insert failed — storage is broken. + error!("Event storage failed: {e}"); + panic!("Unrecoverable event storage failure: {e}"); } } } diff --git a/crates/events/src/eventstore_router.rs b/crates/events/src/eventstore_router.rs index ba214df7ff..3f3da61f3b 100644 --- a/crates/events/src/eventstore_router.rs +++ b/crates/events/src/eventstore_router.rs @@ -11,7 +11,7 @@ use crate::{ use crate::{CorrelationId, Die, EnclaveEvent, EventStoreQueryBy, Seq, SeqAgg, Ts, TsAgg}; use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Recipient}; use anyhow::Result; -use e3_utils::{major_issue, MAILBOX_LIMIT}; +use e3_utils::MAILBOX_LIMIT_LARGE; use std::collections::HashMap; use tracing::{debug, error, info, warn}; @@ -98,7 +98,7 @@ impl EventStoreRouter { Self { stores } } - pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) -> Result<()> { + pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) { debug!("Handling store event requested...."); let aggregate_id = msg.event.aggregate_id(); let store_addr = self.stores.get(&aggregate_id).unwrap_or_else(|| { @@ -108,9 +108,7 @@ impl EventStoreRouter { }); let event = msg.event; let sender = msg.sender; - let forwarded_msg = StoreEventRequested::new(event, sender); - store_addr.try_send(forwarded_msg)?; - Ok(()) + store_addr.do_send(StoreEventRequested::new(event, sender)); } pub fn handle_event_store_query_ts( @@ -205,7 +203,7 @@ impl Actor for EventStoreRouter { type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { - ctx.set_mailbox_capacity(MAILBOX_LIMIT); + ctx.set_mailbox_capacity(MAILBOX_LIMIT_LARGE); } } @@ -213,9 +211,7 @@ impl Handler for EventStoreR type Result = (); fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result { - if let Err(e) = self.handle_store_event_requested(msg) { - panic!("{}", major_issue("Could not store event in eventstore.", e)) - } + self.handle_store_event_requested(msg); } } diff --git a/crates/events/src/sequencer.rs b/crates/events/src/sequencer.rs index 976143f742..73e33314da 100644 --- a/crates/events/src/sequencer.rs +++ b/crates/events/src/sequencer.rs @@ -9,8 +9,6 @@ use crate::{ EnclaveEvent, EventBus, Sequenced, Unsequenced, }; use actix::{Actor, Addr, AsyncContext, Handler, Recipient}; -use anyhow::Result; -use e3_utils::{major_issue, MAILBOX_LIMIT_LARGE}; /// Component to sequence the storage of events pub struct Sequencer { @@ -29,48 +27,35 @@ impl Sequencer { } } - fn handle_store_event_response(&self, msg: StoreEventResponse) -> Result<()> { + fn handle_store_event_response(&self, msg: StoreEventResponse) { let event = msg.into_event(); - self.bus.try_send(event)?; - Ok(()) + self.bus.do_send(event); } } impl Actor for Sequencer { type Context = actix::Context; - fn started(&mut self, ctx: &mut Self::Context) { - ctx.set_mailbox_capacity(MAILBOX_LIMIT_LARGE) - } } impl Handler> for Sequencer { type Result = (); fn handle(&mut self, msg: EnclaveEvent, ctx: &mut Self::Context) -> Self::Result { - if let Err(e) = self - .eventstore - .try_send(StoreEventRequested::new(msg, ctx.address())) - { - panic!("{}", major_issue("Could not store event in eventstore.", e)) - } + self.eventstore + .do_send(StoreEventRequested::new(msg, ctx.address())); } } impl Handler for Sequencer { type Result = (); fn handle(&mut self, msg: StoreEventResponse, _: &mut Self::Context) -> Self::Result { - if let Err(e) = self.handle_store_event_response(msg) { - panic!( - "{}", - major_issue("Could not send event to snapshot_buffer or bus.", e) - ) - } + self.handle_store_event_response(msg); } } #[cfg(test)] mod tests { use e3_ciphernode_builder::EventSystem; - use e3_events::{EnclaveEvent, EventPublisher, TakeEvents, TestEvent}; + use e3_events::{EnclaveEvent, EventPublisher, GetEvents, TakeEvents, TestEvent}; #[actix::test] async fn it_adds_seqence_numbers_to_events() -> anyhow::Result<()> { @@ -103,4 +88,37 @@ mod tests { ); Ok(()) } + + #[actix::test] + async fn it_handles_event_burst_without_overflow() -> anyhow::Result<()> { + let count = 500usize; + let system = EventSystem::new().with_fresh_bus(); + let bus = system.handle()?.enable("test-burst"); + let history = bus.history(); + + let start = std::time::Instant::now(); + + for i in 0..count { + bus.publish_without_context(TestEvent::new(&format!("evt-{i}"), i as u64))?; + } + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30); + loop { + let events: Vec = history.send(GetEvents::new()).await?; + if events.len() >= count { + let elapsed = start.elapsed(); + println!("All {count} events arrived in {elapsed:?}"); + assert_eq!(events.len(), count, "all events must arrive"); + break; + } + if tokio::time::Instant::now() > deadline { + let got = events.len(); + panic!("test timed out — only {got}/{count} events arrived after 30s"); + } + // Yield to let the actor system make progress. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } + + Ok(()) + } } diff --git a/crates/net/src/document_publisher.rs b/crates/net/src/document_publisher.rs index 610a556805..63d98a6dc2 100644 --- a/crates/net/src/document_publisher.rs +++ b/crates/net/src/document_publisher.rs @@ -56,6 +56,8 @@ pub struct DocumentPublisher { topic: String, /// Set of E3ids we are interested in ids: HashMap, + /// Track DHT content hashes per E3 for cleanup on completion + dht_keys: HashMap>, } impl DocumentPublisher { @@ -72,6 +74,7 @@ impl DocumentPublisher { rx: rx.clone(), topic: topic.into(), ids: HashMap::new(), + dht_keys: HashMap::new(), } } @@ -129,6 +132,16 @@ impl DocumentPublisher { fn handle_e3_request_complete(&mut self, event: E3RequestComplete) -> Result<()> { self.ids.remove(&event.e3_id); + if let Some(keys) = self.dht_keys.remove(&event.e3_id) { + if !keys.is_empty() { + info!( + "Pruning {} DHT records for completed E3 {}", + keys.len(), + event.e3_id + ); + let _ = self.tx.try_send(NetCommand::DhtRemoveRecords { keys }); + } + } Ok(()) } } @@ -168,6 +181,13 @@ impl Handler> for DocumentPublisher { ) -> Self::Result { let tx = self.tx.clone(); let (msg, ec) = msg.into_components(); + + let key = ContentHash::from_content(&msg.value); + self.dht_keys + .entry(msg.meta.e3_id.clone()) + .or_default() + .push(key); + let rx = self.rx.clone(); let bus = self.bus.clone(); let topic = self.topic.clone(); diff --git a/crates/net/src/events.rs b/crates/net/src/events.rs index 929bc3a1a1..c172f1f595 100644 --- a/crates/net/src/events.rs +++ b/crates/net/src/events.rs @@ -122,6 +122,8 @@ pub enum NetCommand { correlation_id: CorrelationId, key: ContentHash, }, + /// Remove DHT records associated with a completed E3 + DhtRemoveRecords { keys: Vec }, /// Shutdown signal Shutdown, /// Called from the syning node to request libp2p events from a random peer node starting diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index d5d32b2df0..156b8a867d 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -16,7 +16,7 @@ use libp2p::{ identity::Keypair, kad::{ self, - store::{MemoryStore, MemoryStoreConfig}, + store::{MemoryStore, MemoryStoreConfig, RecordStore}, Behaviour as KademliaBehaviour, Config as KademliaConfig, GetRecordOk, QueryResult, Quorum, Record, RecordKey, }, @@ -39,6 +39,7 @@ use tracing::{debug, error, info, trace, warn}; const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/enclave/kad/1.0.0"); const MAX_KADEMLIA_PAYLOAD_MB: usize = 10; +const DHT_MAX_RECORDS: usize = 4096; const MAX_GOSSIP_MSG_SIZE_KB: usize = 700; const MAX_CONSECUTIVE_DIAL_FAILURES: u32 = 3; @@ -175,6 +176,7 @@ impl NetInterface { Err(e) => error!("Error processing NetEvent: {e}") } } + } } @@ -220,10 +222,10 @@ fn create_behaviour( .set_max_packet_size(MAX_KADEMLIA_PAYLOAD_MB * 1024 * 1024) .set_query_timeout(Duration::from_secs(30)); let store_config = MemoryStoreConfig { - max_records: 1024, + max_records: DHT_MAX_RECORDS, max_value_bytes: MAX_KADEMLIA_PAYLOAD_MB * 1024 * 1024, max_providers_per_key: usize::MAX, - max_provided_keys: 1024, + max_provided_keys: DHT_MAX_RECORDS, }; let store = MemoryStore::with_config(peer_id, store_config); // Force Server mode: in a private network all nodes should fully participate @@ -544,6 +546,10 @@ async fn process_swarm_command( handle_get_record(swarm, correlator, correlation_id, key)?; Ok(()) } + NetCommand::DhtRemoveRecords { keys } => { + handle_remove_records(swarm, keys); + Ok(()) + } NetCommand::OutgoingSyncRequest { correlation_id, value, @@ -607,6 +613,49 @@ fn handle_dial( Ok(()) } +/// Remove specific DHT records by key. +/// +/// Called when an E3 completes to free up local DHT store space. +/// Records on remote peers are left to expire naturally. +fn handle_remove_records(swarm: &mut Swarm, keys: Vec) { + let store = swarm.behaviour_mut().kademlia.store_mut(); + let mut removed = 0usize; + for key in &keys { + store.remove(&RecordKey::new(key)); + removed += 1; + } + if removed > 0 { + info!( + "DHT removed {} records for completed E3 ({} remaining)", + removed, + store.records().count() + ); + } +} + +/// Evict expired records from the DHT store. +/// +/// `MemoryStore` does not check expiration on `put()` — it simply counts +/// all records, expired or not. This helper removes stale entries so that +/// the `max_records` budget reflects only live data. +/// +/// This is a fallback safety net — primary cleanup happens per-E3 via +/// `handle_remove_records` when an E3 completes. +fn prune_expired_dht_records(swarm: &mut Swarm) { + let now = Instant::now(); + let store = swarm.behaviour_mut().kademlia.store_mut(); + let before = store.records().count(); + store.retain(|_, r| r.expires.map_or(true, |e| e > now)); + let after = store.records().count(); + if before != after { + info!( + "DHT pruned {} expired records ({} remaining)", + before - after, + after + ); + } +} + fn handle_put_record( swarm: &mut Swarm, event_tx: &broadcast::Sender, @@ -630,13 +679,36 @@ fn handle_put_record( // not the actual cluster size. With a routing table of ~21 entries, // it required 11 peers to acknowledge the record, which is impossible // in a 4-node cluster. - .put_record(record, Quorum::One) + .put_record(record.clone(), Quorum::One) { Ok(qid) => { - // QueryId is returned synchronously and we immediately add it to the correlator so race conditions should not be an issue. correlator.track(qid, correlation_id); debug!("PUT RECORD OK qid={:?} cid={}", qid, correlation_id); } + Err(kad::store::Error::MaxRecords) => { + warn!("DHT store full (MaxRecords) — attempting fallback expired-record prune"); + prune_expired_dht_records(swarm); + match swarm + .behaviour_mut() + .kademlia + .put_record(record, Quorum::One) + { + Ok(qid) => { + correlator.track(qid, correlation_id); + debug!( + "PUT RECORD OK (after prune) qid={:?} cid={}", + qid, correlation_id + ); + } + Err(error) => { + error!("DHT put failed even after pruning expired records: {error:?}"); + event_tx.send(NetEvent::DhtPutRecordError { + correlation_id, + error: PutOrStoreError::StoreError(error), + })?; + } + } + } Err(error) => { event_tx.send(NetEvent::DhtPutRecordError { correlation_id, @@ -757,3 +829,108 @@ impl PeerFailureTracker { self.failures.remove(peer_id); } } + +#[cfg(test)] +mod tests { + use libp2p::kad::store::{MemoryStore, MemoryStoreConfig, RecordStore}; + use libp2p::kad::{Record, RecordKey}; + use libp2p::PeerId; + use std::time::{Duration, Instant}; + + #[test] + fn expired_records_are_pruned_on_full_store() { + let peer_id = PeerId::random(); + let config = MemoryStoreConfig { + max_records: 5, + max_value_bytes: 1024, + max_providers_per_key: 1, + max_provided_keys: 5, + }; + let mut store = MemoryStore::with_config(peer_id, config); + + let past = Instant::now().checked_sub(Duration::from_secs(1)).unwrap(); + for i in 0..5 { + let record = Record { + key: RecordKey::new(&format!("expired-{i}").into_bytes()), + value: vec![i as u8], + publisher: None, + expires: Some(past), + }; + store.put(record).expect("should succeed while under limit"); + } + + // Store is full — new put must fail + let new_record = Record { + key: RecordKey::new(&b"new-record".to_vec()), + value: vec![42], + publisher: None, + expires: Some(Instant::now() + Duration::from_secs(3600)), + }; + assert!( + store.put(new_record.clone()).is_err(), + "put should fail when store is at max_records" + ); + + let now = Instant::now(); + store.retain(|_, r| r.expires.map_or(true, |e| e > now)); + + assert_eq!( + store.records().count(), + 0, + "all expired records should be pruned" + ); + + store + .put(new_record) + .expect("put should succeed after pruning expired records"); + assert_eq!(store.records().count(), 1); + } + + #[test] + fn non_expired_records_survive_pruning() { + let peer_id = PeerId::random(); + let config = MemoryStoreConfig { + max_records: 5, + max_value_bytes: 1024, + max_providers_per_key: 1, + max_provided_keys: 5, + }; + let mut store = MemoryStore::with_config(peer_id, config); + + let future = Instant::now() + Duration::from_secs(3600); + let past = Instant::now().checked_sub(Duration::from_secs(1)).unwrap(); + + // 3 live records, 2 expired + for i in 0..3 { + store + .put(Record { + key: RecordKey::new(&format!("live-{i}").into_bytes()), + value: vec![i as u8], + publisher: None, + expires: Some(future), + }) + .unwrap(); + } + for i in 0..2 { + store + .put(Record { + key: RecordKey::new(&format!("dead-{i}").into_bytes()), + value: vec![i as u8], + publisher: None, + expires: Some(past), + }) + .unwrap(); + } + + assert_eq!(store.records().count(), 5); + + let now = Instant::now(); + store.retain(|_, r| r.expires.map_or(true, |e| e > now)); + + assert_eq!( + store.records().count(), + 3, + "only live records should remain" + ); + } +}