From e48db65be93956b2fc067673862cb70354aae101 Mon Sep 17 00:00:00 2001 From: Hamza Khalid Date: Mon, 16 Mar 2026 22:25:49 +0500 Subject: [PATCH] fix: replace panic in snapshot buffer --- crates/events/src/bus_handle.rs | 4 + crates/events/src/sequencer.rs | 4 + crates/events/src/snapshot_buffer/batch.rs | 13 +- .../src/snapshot_buffer/batch_router.rs | 161 ++++++++---------- .../src/snapshot_buffer/snapshot_buffer.rs | 58 +++---- .../src/snapshot_buffer/timelock_queue.rs | 34 ++-- crates/sync/src/sync.rs | 2 +- 7 files changed, 124 insertions(+), 152 deletions(-) diff --git a/crates/events/src/bus_handle.rs b/crates/events/src/bus_handle.rs index 87ce6d08da..73e2505622 100644 --- a/crates/events/src/bus_handle.rs +++ b/crates/events/src/bus_handle.rs @@ -178,6 +178,10 @@ impl EventPublisher> for BusHandle { } impl BusHandle { + pub async fn naked_dispatch_async(&self, event: EnclaveEvent) -> Result<()> { + self.sequencer.send(event).await?; + Ok(()) + } fn publish_from_remote_impl( &self, data: impl Into, diff --git a/crates/events/src/sequencer.rs b/crates/events/src/sequencer.rs index 6e8b6cb40b..8298cbb387 100644 --- a/crates/events/src/sequencer.rs +++ b/crates/events/src/sequencer.rs @@ -9,6 +9,7 @@ use crate::{ EnclaveEvent, EventBus, Sequenced, Unsequenced, }; use actix::{Actor, Addr, AsyncContext, Handler, Recipient}; +use e3_utils::MAILBOX_LIMIT; /// Component to sequence the storage of events pub struct Sequencer { @@ -35,6 +36,9 @@ impl Sequencer { impl Actor for Sequencer { type Context = actix::Context; + fn started(&mut self, ctx: &mut Self::Context) { + ctx.set_mailbox_capacity(MAILBOX_LIMIT); + } } impl Handler> for Sequencer { diff --git a/crates/events/src/snapshot_buffer/batch.rs b/crates/events/src/snapshot_buffer/batch.rs index 722cb069c5..1fa1cea774 100644 --- a/crates/events/src/snapshot_buffer/batch.rs +++ b/crates/events/src/snapshot_buffer/batch.rs @@ -8,7 +8,7 @@ use std::mem::replace; use actix::{Actor, ActorContext, Addr, AsyncContext, Handler, Message, Recipient}; use e3_utils::MAILBOX_LIMIT; -use crate::{trap, Die, EType, Insert, InsertBatch, PanicDispatcher}; +use crate::{Die, Insert, InsertBatch}; #[derive(Message)] #[rtype(result = "()")] @@ -49,13 +49,10 @@ impl Handler for Batch { type Result = (); fn handle(&mut self, _: Flush, ctx: &mut Self::Context) -> Self::Result { let inserts = replace(&mut self.inserts, Vec::new()); - trap(EType::IO, &PanicDispatcher::new(), || { - if inserts.len() > 0 { - self.db.try_send(InsertBatch::new(inserts))?; - } - ctx.notify(Die); - Ok(()) - }) + if !inserts.is_empty() { + self.db.do_send(InsertBatch::new(inserts)); + } + ctx.notify(Die); } } diff --git a/crates/events/src/snapshot_buffer/batch_router.rs b/crates/events/src/snapshot_buffer/batch_router.rs index a7882a25da..f6cbd49e56 100644 --- a/crates/events/src/snapshot_buffer/batch_router.rs +++ b/crates/events/src/snapshot_buffer/batch_router.rs @@ -9,14 +9,13 @@ use super::{ AggregateConfig, UpdateDestination, }; use crate::{ - trap, AggregateId, EType, EnclaveEvent, EventContextAccessors, EventContextSeq, Insert, - InsertBatch, PanicDispatcher, Sequenced, StoreKeys, + AggregateId, EnclaveEvent, EventContextAccessors, EventContextSeq, Insert, InsertBatch, + Sequenced, StoreKeys, }; use actix::{Actor, Addr, Handler, Message, Recipient}; -use anyhow::Context; use e3_utils::MAILBOX_LIMIT; use std::{collections::HashMap, sync::Arc, time::Duration}; -use tracing::debug; +use tracing::{debug, error}; type Seq = u64; @@ -93,32 +92,29 @@ impl BatchRouter { impl Handler for BatchRouter { type Result = (); fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - // Messages without context go straight to disk - // This is probably direct datastore manipulation - let Some(ctx) = msg.ctx() else { - debug!("Message without context. Flushing straight to disk."); - self.db.try_send(InsertBatch::new(vec![msg]))?; - return Ok(()); - }; - - // Route to existing batch, or fall back to disk - match self.batches.get(&ctx.seq()) { - Some(batch) => { - debug!("Forwarding to batch actor for seq={}", ctx.seq()); - batch.try_send(msg)?; - } - // This must mean that this insert is late - None => { - debug!( - "No batch available for seq={} assuming this is late. Flushing to disk.", - ctx.seq() - ); - self.db.try_send(InsertBatch::new(vec![msg]))?; - } + // Messages without context go straight to disk + // This is probably direct datastore manipulation + let Some(ctx) = msg.ctx() else { + debug!("Message without context. Flushing straight to disk."); + self.db.do_send(InsertBatch::new(vec![msg])); + return; + }; + + // Route to existing batch, or fall back to disk + match self.batches.get(&ctx.seq()) { + Some(batch) => { + debug!("Forwarding to batch actor for seq={}", ctx.seq()); + batch.do_send(msg); + } + // This must mean that this insert is late + None => { + debug!( + "No batch available for seq={} assuming this is late. Flushing to disk.", + ctx.seq() + ); + self.db.do_send(InsertBatch::new(vec![msg])); } - Ok(()) - }) + } } } @@ -126,80 +122,73 @@ impl Handler> for BatchRouter { type Result = (); fn handle(&mut self, msg: EnclaveEvent, _: &mut Self::Context) -> Self::Result { let ec = msg.get_ctx(); - trap(EType::IO, &PanicDispatcher::new(), || { - let prev_seq = ec.seq() - 1; - if self.batches.contains_key(&prev_seq) { - let prev_agg = self - .aggregates - .get(&prev_seq) - .context("invariant: prev_agg MUST exist if batches has a batch")?; - - debug!( - "Preparing timelock to clear batch for seq={}, agg={}", - prev_seq, prev_agg + let prev_seq = ec.seq() - 1; + if self.batches.contains_key(&prev_seq) { + let Some(prev_agg) = self.aggregates.get(&prev_seq) else { + error!( + "invariant violation: prev_agg must exist if batches has a batch for seq={}", + prev_seq ); - let delay = self.config.get_delay(prev_agg); - - let now = Duration::from_micros(self.clock.now_micros()); - - self.timelock_queue - .try_send(StartTimelock::new(prev_seq, now, delay))?; - } + return; + }; - debug!("Creating batch for {}", ec.seq()); - let agg_id = ec.aggregate_id(); - let highest_block = self.get_highest_block(agg_id, ec.block()); - let batch = Batch::spawn( - self.db.clone(), - vec![ - Insert::new_with_context( - &StoreKeys::aggregate_seq(agg_id), - encode_u64(ec.seq()), - ec.clone(), - ), - Insert::new_with_context( - &StoreKeys::aggregate_block(agg_id), - encode_u64(highest_block), - ec.clone(), - ), - Insert::new_with_context( - &StoreKeys::aggregate_ts(agg_id), - encode_u128(ec.ts()), - ec.clone(), - ), - ], + debug!( + "Preparing timelock to clear batch for seq={}, agg={}", + prev_seq, prev_agg ); + let delay = self.config.get_delay(prev_agg); - self.batches.insert(ec.seq(), batch); - self.aggregates.insert(ec.seq(), ec.aggregate_id()); + let now = Duration::from_micros(self.clock.now_micros()); - Ok(()) - }) + self.timelock_queue + .do_send(StartTimelock::new(prev_seq, now, delay)); + } + + debug!("Creating batch for {}", ec.seq()); + let agg_id = ec.aggregate_id(); + let highest_block = self.get_highest_block(agg_id, ec.block()); + let batch = Batch::spawn( + self.db.clone(), + vec![ + Insert::new_with_context( + &StoreKeys::aggregate_seq(agg_id), + encode_u64(ec.seq()), + ec.clone(), + ), + Insert::new_with_context( + &StoreKeys::aggregate_block(agg_id), + encode_u64(highest_block), + ec.clone(), + ), + Insert::new_with_context( + &StoreKeys::aggregate_ts(agg_id), + encode_u128(ec.ts()), + ec.clone(), + ), + ], + ); + + self.batches.insert(ec.seq(), batch); + self.aggregates.insert(ec.seq(), ec.aggregate_id()); } } impl Handler for BatchRouter { type Result = (); fn handle(&mut self, msg: FlushSeq, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - debug!("Flushing sequence... {}", msg.seq()); - if let Some(batch) = self.batches.get(&msg.seq()) { - batch.try_send(Flush)?; - self.batches.remove(&msg.seq()); - self.aggregates.remove(&msg.seq()); - } - Ok(()) - }) + debug!("Flushing sequence... {}", msg.seq()); + if let Some(batch) = self.batches.get(&msg.seq()) { + batch.do_send(Flush); + self.batches.remove(&msg.seq()); + self.aggregates.remove(&msg.seq()); + } } } impl Handler for BatchRouter { type Result = (); fn handle(&mut self, msg: UpdateDestination, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - self.db = msg.0; - Ok(()) - }) + self.db = msg.0; } } diff --git a/crates/events/src/snapshot_buffer/snapshot_buffer.rs b/crates/events/src/snapshot_buffer/snapshot_buffer.rs index b28ddb14ea..68b6f1aa72 100644 --- a/crates/events/src/snapshot_buffer/snapshot_buffer.rs +++ b/crates/events/src/snapshot_buffer/snapshot_buffer.rs @@ -8,7 +8,7 @@ use super::{ timelock_queue::{Clock, StartTimelock, SystemClock, Tick, TimelockQueue}, AggregateConfig, }; -use crate::{trap, EType, EnclaveEvent, Insert, InsertBatch, PanicDispatcher}; +use crate::{EnclaveEvent, Insert, InsertBatch}; use actix::{Actor, Addr, Handler, Message, Recipient}; use anyhow::Result; use e3_utils::MAILBOX_LIMIT; @@ -94,24 +94,18 @@ impl Actor for SnapshotBuffer { impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: FlushSeq, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref router) = self.router { - router.try_send(msg)?; - } - Ok(()) - }) + if let Some(ref router) = self.router { + router.do_send(msg); + } } } impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: StartTimelock, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref timelock) = self.timelock { - timelock.try_send(msg)?; - } - Ok(()) - }) + if let Some(ref timelock) = self.timelock { + timelock.do_send(msg); + } } } @@ -128,49 +122,37 @@ impl Handler for SnapshotBuffer { impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: Insert, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref router) = self.router { - trace!("Forwarding Insert message to batch router..."); - router.try_send(msg)?; - }; - Ok(()) - }) + if let Some(ref router) = self.router { + trace!("Forwarding Insert message to batch router..."); + router.do_send(msg); + } } } impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: EnclaveEvent, _ctx: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref router) = self.router { - router.try_send(msg)?; - } - Ok(()) - }) + if let Some(ref router) = self.router { + router.do_send(msg); + } } } impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: Tick, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref tickable) = self.tickable { - tickable.try_send(msg)?; - } - Ok(()) - }) + if let Some(ref tickable) = self.tickable { + tickable.do_send(msg); + } } } impl Handler for SnapshotBuffer { type Result = (); fn handle(&mut self, msg: UpdateDestination, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - if let Some(ref router) = self.router { - router.try_send(msg)?; - } - Ok(()) - }) + if let Some(ref router) = self.router { + router.do_send(msg); + } } } diff --git a/crates/events/src/snapshot_buffer/timelock_queue.rs b/crates/events/src/snapshot_buffer/timelock_queue.rs index 0b04cc577a..2902a7a40e 100644 --- a/crates/events/src/snapshot_buffer/timelock_queue.rs +++ b/crates/events/src/snapshot_buffer/timelock_queue.rs @@ -3,7 +3,6 @@ // This file is provided WITHOUT ANY WARRANTY; // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::{trap, EType, PanicDispatcher}; use actix::{Actor, Addr, AsyncContext, Handler, Message, Recipient}; use e3_utils::MAILBOX_LIMIT; use std::{ @@ -150,25 +149,22 @@ impl Handler for TimelockQueue { impl Handler for TimelockQueue { type Result = (); fn handle(&mut self, _: Tick, _: &mut Self::Context) -> Self::Result { - trap(EType::IO, &PanicDispatcher::new(), || { - let now_time = Duration::from_micros(self.clock.now_micros()); - debug!( - "Running timelock tick. waiting times: {:?}.", - self.timelocks - .iter() - .map(|t| t.0.expiry.saturating_sub(now_time)) - .collect::>(), - ); - - while self.timelocks.len() > 0 && self.next_timelock_lt(now_time) { - if let Some(tl) = self.timelocks.pop() { - let seq = tl.0.seq; - debug!("Flushing seq {}", seq); - self.batch_router.try_send(FlushSeq(seq))?; - } + let now_time = Duration::from_micros(self.clock.now_micros()); + debug!( + "Running timelock tick. waiting times: {:?}.", + self.timelocks + .iter() + .map(|t| t.0.expiry.saturating_sub(now_time)) + .collect::>(), + ); + + while self.timelocks.len() > 0 && self.next_timelock_lt(now_time) { + if let Some(tl) = self.timelocks.pop() { + let seq = tl.0.seq; + debug!("Flushing seq {}", seq); + self.batch_router.do_send(FlushSeq(seq)); } - Ok(()) - }) + } } } diff --git a/crates/sync/src/sync.rs b/crates/sync/src/sync.rs index b5f20e0b56..4fe10ec225 100644 --- a/crates/sync/src/sync.rs +++ b/crates/sync/src/sync.rs @@ -131,7 +131,7 @@ pub async fn sync( // 9. Publish the new sorted events to the eventstore info!("Publishing historical events to actors..."); for event in historical { - bus.naked_dispatch(event); + bus.naked_dispatch_async(event).await?; } info!("Historical events published.");