Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/events/src/bus_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ impl EventPublisher<EnclaveEvent<Unsequenced>> for BusHandle<Enabled> {
}

impl BusHandle<Enabled> {
pub async fn naked_dispatch_async(&self, event: EnclaveEvent<Unsequenced>) -> Result<()> {
self.sequencer.send(event).await?;
Ok(())
}
fn publish_from_remote_impl(
&self,
data: impl Into<EnclaveEventData>,
Expand Down
4 changes: 4 additions & 0 deletions crates/events/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
EnclaveEvent, EventBus, Sequenced, Unsequenced,
};
use actix::{Actor, Addr, AsyncContext, Handler, Recipient};
use e3_utils::MAILBOX_LIMIT;

/// Component to sequence the storage of events
pub struct Sequencer {
Expand All @@ -35,6 +36,9 @@ impl Sequencer {

impl Actor for Sequencer {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(MAILBOX_LIMIT);
}
}

impl Handler<EnclaveEvent<Unsequenced>> for Sequencer {
Expand Down
13 changes: 5 additions & 8 deletions crates/events/src/snapshot_buffer/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::mem::replace;
use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, Recipient};
use e3_utils::MAILBOX_LIMIT;

use crate::{trap, Die, EType, Insert, InsertBatch, PanicDispatcher};
use crate::{Die, Insert, InsertBatch};

#[derive(Message)]
#[rtype(result = "()")]
Expand Down Expand Up @@ -49,13 +49,10 @@ impl Handler<Flush> for Batch {
type Result = ();
fn handle(&mut self, _: Flush, ctx: &mut Self::Context) -> Self::Result {
let inserts = replace(&mut self.inserts, Vec::new());
trap(EType::IO, &PanicDispatcher::new(), || {
if inserts.len() > 0 {
self.db.try_send(InsertBatch::new(inserts))?;
}
ctx.notify(Die);
Ok(())
})
if !inserts.is_empty() {
self.db.do_send(InsertBatch::new(inserts));
}
ctx.notify(Die);
}
}

Expand Down
161 changes: 75 additions & 86 deletions crates/events/src/snapshot_buffer/batch_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ use super::{
AggregateConfig, UpdateDestination,
};
use crate::{
trap, AggregateId, EType, EnclaveEvent, EventContextAccessors, EventContextSeq, Insert,
InsertBatch, PanicDispatcher, Sequenced, StoreKeys,
AggregateId, EnclaveEvent, EventContextAccessors, EventContextSeq, Insert, InsertBatch,
Sequenced, StoreKeys,
};
use actix::{Actor, Addr, Handler, Message, Recipient};
use anyhow::Context;
use e3_utils::MAILBOX_LIMIT;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tracing::debug;
use tracing::{debug, error};

type Seq = u64;

Expand Down Expand Up @@ -93,113 +92,103 @@ impl BatchRouter {
impl Handler<Insert> for BatchRouter {
type Result = ();
fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
// Messages without context go straight to disk
// This is probably direct datastore manipulation
let Some(ctx) = msg.ctx() else {
debug!("Message without context. Flushing straight to disk.");
self.db.try_send(InsertBatch::new(vec![msg]))?;
return Ok(());
};

// Route to existing batch, or fall back to disk
match self.batches.get(&ctx.seq()) {
Some(batch) => {
debug!("Forwarding to batch actor for seq={}", ctx.seq());
batch.try_send(msg)?;
}
// This must mean that this insert is late
None => {
debug!(
"No batch available for seq={} assuming this is late. Flushing to disk.",
ctx.seq()
);
self.db.try_send(InsertBatch::new(vec![msg]))?;
}
// Messages without context go straight to disk
// This is probably direct datastore manipulation
let Some(ctx) = msg.ctx() else {
debug!("Message without context. Flushing straight to disk.");
self.db.do_send(InsertBatch::new(vec![msg]));
return;
};

// Route to existing batch, or fall back to disk
match self.batches.get(&ctx.seq()) {
Some(batch) => {
debug!("Forwarding to batch actor for seq={}", ctx.seq());
batch.do_send(msg);
}
// This must mean that this insert is late
None => {
debug!(
"No batch available for seq={} assuming this is late. Flushing to disk.",
ctx.seq()
);
self.db.do_send(InsertBatch::new(vec![msg]));
}
Ok(())
})
}
}
}

impl Handler<EnclaveEvent<Sequenced>> for BatchRouter {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent<Sequenced>, _: &mut Self::Context) -> Self::Result {
let ec = msg.get_ctx();
trap(EType::IO, &PanicDispatcher::new(), || {
let prev_seq = ec.seq() - 1;
if self.batches.contains_key(&prev_seq) {
let prev_agg = self
.aggregates
.get(&prev_seq)
.context("invariant: prev_agg MUST exist if batches has a batch")?;

debug!(
"Preparing timelock to clear batch for seq={}, agg={}",
prev_seq, prev_agg
let prev_seq = ec.seq() - 1;
if self.batches.contains_key(&prev_seq) {
let Some(prev_agg) = self.aggregates.get(&prev_seq) else {
error!(
"invariant violation: prev_agg must exist if batches has a batch for seq={}",
prev_seq
);
let delay = self.config.get_delay(prev_agg);

let now = Duration::from_micros(self.clock.now_micros());

self.timelock_queue
.try_send(StartTimelock::new(prev_seq, now, delay))?;
}
return;
};

debug!("Creating batch for {}", ec.seq());
let agg_id = ec.aggregate_id();
let highest_block = self.get_highest_block(agg_id, ec.block());
let batch = Batch::spawn(
self.db.clone(),
vec![
Insert::new_with_context(
&StoreKeys::aggregate_seq(agg_id),
encode_u64(ec.seq()),
ec.clone(),
),
Insert::new_with_context(
&StoreKeys::aggregate_block(agg_id),
encode_u64(highest_block),
ec.clone(),
),
Insert::new_with_context(
&StoreKeys::aggregate_ts(agg_id),
encode_u128(ec.ts()),
ec.clone(),
),
],
debug!(
"Preparing timelock to clear batch for seq={}, agg={}",
prev_seq, prev_agg
);
let delay = self.config.get_delay(prev_agg);

self.batches.insert(ec.seq(), batch);
self.aggregates.insert(ec.seq(), ec.aggregate_id());
let now = Duration::from_micros(self.clock.now_micros());

Ok(())
})
self.timelock_queue
.do_send(StartTimelock::new(prev_seq, now, delay));
}

debug!("Creating batch for {}", ec.seq());
let agg_id = ec.aggregate_id();
let highest_block = self.get_highest_block(agg_id, ec.block());
let batch = Batch::spawn(
self.db.clone(),
vec![
Insert::new_with_context(
&StoreKeys::aggregate_seq(agg_id),
encode_u64(ec.seq()),
ec.clone(),
),
Insert::new_with_context(
&StoreKeys::aggregate_block(agg_id),
encode_u64(highest_block),
ec.clone(),
),
Insert::new_with_context(
&StoreKeys::aggregate_ts(agg_id),
encode_u128(ec.ts()),
ec.clone(),
),
],
);

self.batches.insert(ec.seq(), batch);
self.aggregates.insert(ec.seq(), ec.aggregate_id());
}
}

impl Handler<FlushSeq> for BatchRouter {
type Result = ();
fn handle(&mut self, msg: FlushSeq, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
debug!("Flushing sequence... {}", msg.seq());
if let Some(batch) = self.batches.get(&msg.seq()) {
batch.try_send(Flush)?;
self.batches.remove(&msg.seq());
self.aggregates.remove(&msg.seq());
}
Ok(())
})
debug!("Flushing sequence... {}", msg.seq());
if let Some(batch) = self.batches.get(&msg.seq()) {
batch.do_send(Flush);
self.batches.remove(&msg.seq());
self.aggregates.remove(&msg.seq());
}
}
}

impl Handler<UpdateDestination> for BatchRouter {
type Result = ();
fn handle(&mut self, msg: UpdateDestination, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
self.db = msg.0;
Ok(())
})
self.db = msg.0;
}
}

Expand Down
58 changes: 20 additions & 38 deletions crates/events/src/snapshot_buffer/snapshot_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{
timelock_queue::{Clock, StartTimelock, SystemClock, Tick, TimelockQueue},
AggregateConfig,
};
use crate::{trap, EType, EnclaveEvent, Insert, InsertBatch, PanicDispatcher};
use crate::{EnclaveEvent, Insert, InsertBatch};
use actix::{Actor, Addr, Handler, Message, Recipient};
use anyhow::Result;
use e3_utils::MAILBOX_LIMIT;
Expand Down Expand Up @@ -94,24 +94,18 @@ impl Actor for SnapshotBuffer {
impl Handler<FlushSeq> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: FlushSeq, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref router) = self.router {
router.try_send(msg)?;
}
Ok(())
})
if let Some(ref router) = self.router {
router.do_send(msg);
}
}
}

impl Handler<StartTimelock> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: StartTimelock, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref timelock) = self.timelock {
timelock.try_send(msg)?;
}
Ok(())
})
if let Some(ref timelock) = self.timelock {
timelock.do_send(msg);
}
}
}

Expand All @@ -128,49 +122,37 @@ impl Handler<SetDependencies> for SnapshotBuffer {
impl Handler<Insert> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref router) = self.router {
trace!("Forwarding Insert message to batch router...");
router.try_send(msg)?;
};
Ok(())
})
if let Some(ref router) = self.router {
trace!("Forwarding Insert message to batch router...");
router.do_send(msg);
}
}
}

impl Handler<EnclaveEvent> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref router) = self.router {
router.try_send(msg)?;
}
Ok(())
})
if let Some(ref router) = self.router {
router.do_send(msg);
}
}
}

impl Handler<Tick> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: Tick, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref tickable) = self.tickable {
tickable.try_send(msg)?;
}
Ok(())
})
if let Some(ref tickable) = self.tickable {
tickable.do_send(msg);
}
}
}

impl Handler<UpdateDestination> for SnapshotBuffer {
type Result = ();
fn handle(&mut self, msg: UpdateDestination, _: &mut Self::Context) -> Self::Result {
trap(EType::IO, &PanicDispatcher::new(), || {
if let Some(ref router) = self.router {
router.try_send(msg)?;
}
Ok(())
})
if let Some(ref router) = self.router {
router.do_send(msg);
}
}
}

Expand Down
Loading
Loading