Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 4 additions & 8 deletions crates/events/src/eventstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::{
};
use actix::{Actor, Handler};
use anyhow::{bail, Result};
use e3_utils::{major_issue, MAILBOX_LIMIT};
use tracing::{error, warn};

const MAX_STORAGE_ERRORS: u64 = 10;
Expand Down Expand Up @@ -40,7 +39,7 @@ impl<I: SequenceIndex, L: EventLog> EventStore<I, L> {
}
let seq = self.log.append(&event)?;
self.index.insert(ts, seq)?;
sender.try_send(StoreEventResponse(event.into_sequenced(seq)))?;
sender.do_send(StoreEventResponse(event.into_sequenced(seq)));
Ok(())
}

Expand Down Expand Up @@ -93,18 +92,15 @@ impl<I: SequenceIndex, L: EventLog> EventStore<I, L> {

impl<I: SequenceIndex, L: EventLog> Actor for EventStore<I, L> {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(MAILBOX_LIMIT);
}
}

impl<I: SequenceIndex, L: EventLog> Handler<StoreEventRequested> for EventStore<I, L> {
type Result = ();
fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result {
if let Err(e) = self.handle_store_event_requested(msg) {
panic!("{}", major_issue("Could not store event in eventstore.", e))
// panic here because when event storage fails we really need
// to just give up
// Log append or index insert failed — storage is broken.
error!("Event storage failed: {e}");
panic!("Unrecoverable event storage failure: {e}");
}
}
Comment thread
hmzakhalid marked this conversation as resolved.
}
Expand Down
14 changes: 5 additions & 9 deletions crates/events/src/eventstore_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
use crate::{CorrelationId, Die, EnclaveEvent, EventStoreQueryBy, Seq, SeqAgg, Ts, TsAgg};
use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Recipient};
use anyhow::Result;
use e3_utils::{major_issue, MAILBOX_LIMIT};
use e3_utils::MAILBOX_LIMIT_LARGE;
use std::collections::HashMap;
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -98,7 +98,7 @@ impl<I: SequenceIndex, L: EventLog> EventStoreRouter<I, L> {
Self { stores }
}

pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) -> Result<()> {
pub fn handle_store_event_requested(&mut self, msg: StoreEventRequested) {
debug!("Handling store event requested....");
let aggregate_id = msg.event.aggregate_id();
let store_addr = self.stores.get(&aggregate_id).unwrap_or_else(|| {
Expand All @@ -108,9 +108,7 @@ impl<I: SequenceIndex, L: EventLog> EventStoreRouter<I, L> {
});
let event = msg.event;
let sender = msg.sender;
let forwarded_msg = StoreEventRequested::new(event, sender);
store_addr.try_send(forwarded_msg)?;
Ok(())
store_addr.do_send(StoreEventRequested::new(event, sender));
}

pub fn handle_event_store_query_ts(
Expand Down Expand Up @@ -205,17 +203,15 @@ impl<I: SequenceIndex, L: EventLog> Actor for EventStoreRouter<I, L> {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(MAILBOX_LIMIT);
ctx.set_mailbox_capacity(MAILBOX_LIMIT_LARGE);
}
}

impl<I: SequenceIndex, L: EventLog> Handler<StoreEventRequested> for EventStoreRouter<I, L> {
type Result = ();

fn handle(&mut self, msg: StoreEventRequested, _: &mut Self::Context) -> Self::Result {
if let Err(e) = self.handle_store_event_requested(msg) {
panic!("{}", major_issue("Could not store event in eventstore.", e))
}
self.handle_store_event_requested(msg);
}
}

Expand Down
60 changes: 39 additions & 21 deletions crates/events/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use crate::{
EnclaveEvent, EventBus, Sequenced, Unsequenced,
};
use actix::{Actor, Addr, AsyncContext, Handler, Recipient};
use anyhow::Result;
use e3_utils::{major_issue, MAILBOX_LIMIT_LARGE};

/// Component to sequence the storage of events
pub struct Sequencer {
Expand All @@ -29,48 +27,35 @@ impl Sequencer {
}
}

fn handle_store_event_response(&self, msg: StoreEventResponse) -> Result<()> {
fn handle_store_event_response(&self, msg: StoreEventResponse) {
let event = msg.into_event();
self.bus.try_send(event)?;
Ok(())
self.bus.do_send(event);
}
}

impl Actor for Sequencer {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(MAILBOX_LIMIT_LARGE)
}
}

impl Handler<EnclaveEvent<Unsequenced>> for Sequencer {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent<Unsequenced>, ctx: &mut Self::Context) -> Self::Result {
if let Err(e) = self
.eventstore
.try_send(StoreEventRequested::new(msg, ctx.address()))
{
panic!("{}", major_issue("Could not store event in eventstore.", e))
}
self.eventstore
.do_send(StoreEventRequested::new(msg, ctx.address()));
Comment thread
hmzakhalid marked this conversation as resolved.
}
}

impl Handler<StoreEventResponse> for Sequencer {
type Result = ();
fn handle(&mut self, msg: StoreEventResponse, _: &mut Self::Context) -> Self::Result {
if let Err(e) = self.handle_store_event_response(msg) {
panic!(
"{}",
major_issue("Could not send event to snapshot_buffer or bus.", e)
)
}
self.handle_store_event_response(msg);
}
}

#[cfg(test)]
mod tests {
use e3_ciphernode_builder::EventSystem;
use e3_events::{EnclaveEvent, EventPublisher, TakeEvents, TestEvent};
use e3_events::{EnclaveEvent, EventPublisher, GetEvents, TakeEvents, TestEvent};

#[actix::test]
async fn it_adds_seqence_numbers_to_events() -> anyhow::Result<()> {
Expand Down Expand Up @@ -103,4 +88,37 @@ mod tests {
);
Ok(())
}

#[actix::test]
async fn it_handles_event_burst_without_overflow() -> anyhow::Result<()> {
let count = 500usize;
let system = EventSystem::new().with_fresh_bus();
let bus = system.handle()?.enable("test-burst");
let history = bus.history();

let start = std::time::Instant::now();

for i in 0..count {
bus.publish_without_context(TestEvent::new(&format!("evt-{i}"), i as u64))?;
}

let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
let events: Vec<EnclaveEvent> = history.send(GetEvents::new()).await?;
if events.len() >= count {
let elapsed = start.elapsed();
println!("All {count} events arrived in {elapsed:?}");
assert_eq!(events.len(), count, "all events must arrive");
break;
}
if tokio::time::Instant::now() > deadline {
let got = events.len();
panic!("test timed out — only {got}/{count} events arrived after 30s");
}
// Yield to let the actor system make progress.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

Ok(())
}
}
20 changes: 20 additions & 0 deletions crates/net/src/document_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ pub struct DocumentPublisher {
topic: String,
/// Set of E3ids we are interested in
ids: HashMap<E3id, PartyId>,
/// Track DHT content hashes per E3 for cleanup on completion
dht_keys: HashMap<E3id, Vec<ContentHash>>,
}

impl DocumentPublisher {
Expand All @@ -72,6 +74,7 @@ impl DocumentPublisher {
rx: rx.clone(),
topic: topic.into(),
ids: HashMap::new(),
dht_keys: HashMap::new(),
}
}

Expand Down Expand Up @@ -129,6 +132,16 @@ impl DocumentPublisher {

fn handle_e3_request_complete(&mut self, event: E3RequestComplete) -> Result<()> {
self.ids.remove(&event.e3_id);
if let Some(keys) = self.dht_keys.remove(&event.e3_id) {
if !keys.is_empty() {
info!(
"Pruning {} DHT records for completed E3 {}",
keys.len(),
event.e3_id
);
let _ = self.tx.try_send(NetCommand::DhtRemoveRecords { keys });
}
}
Ok(())
}
}
Expand Down Expand Up @@ -168,6 +181,13 @@ impl Handler<TypedEvent<PublishDocumentRequested>> for DocumentPublisher {
) -> Self::Result {
let tx = self.tx.clone();
let (msg, ec) = msg.into_components();

let key = ContentHash::from_content(&msg.value);
self.dht_keys
.entry(msg.meta.e3_id.clone())
.or_default()
.push(key);

let rx = self.rx.clone();
let bus = self.bus.clone();
let topic = self.topic.clone();
Expand Down
2 changes: 2 additions & 0 deletions crates/net/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pub enum NetCommand {
correlation_id: CorrelationId,
key: ContentHash,
},
/// Remove DHT records associated with a completed E3
DhtRemoveRecords { keys: Vec<ContentHash> },
/// Shutdown signal
Shutdown,
/// Called from the syning node to request libp2p events from a random peer node starting
Expand Down
Loading
Loading