diff --git a/crates/evm-helpers/src/block_listener.rs b/crates/evm-helpers/src/block_listener.rs new file mode 100644 index 0000000000..ec7af25ad3 --- /dev/null +++ b/crates/evm-helpers/src/block_listener.rs @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use alloy::{network::Ethereum, providers::Provider, rpc::types::Header}; +use eyre::Result; +use futures::stream::StreamExt; +use std::{future::Future, pin::Pin, sync::Arc}; +use tokio::sync::RwLock; +use tracing::info; + +type BlockHandler = + Box Pin> + Send>> + Send + Sync>; + +#[derive(Clone)] +pub struct BlockListener { + provider: Arc>, + block_handlers: Arc>>, +} + +impl BlockListener { + pub fn new(provider: Arc>) -> Self { + Self { + provider, + block_handlers: Arc::new(RwLock::new(Vec::new())), + } + } + + pub async fn add_block_handler(&self, handler: F) + where + F: Fn(&Header) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + info!("add_block_handler"); + self.block_handlers + .write() + .await + .push(Box::new(move |h: &Header| Box::pin(handler(h)))); + } + + pub async fn listen(&self) -> Result<()> { + let mut stream = self.provider.subscribe_blocks().await?.into_stream(); + while let Some(block) = stream.next().await { + let handlers = self.block_handlers.read().await; + for handler in handlers.iter() { + let fut = handler(&block); + tokio::spawn(async move { + if let Err(e) = fut.await { + eprintln!("Error processing block: {:?}", e); + } + }); + } + } + Ok(()) + } +} diff --git a/crates/evm-helpers/src/contracts.rs b/crates/evm-helpers/src/contracts.rs index 2fae264586..82926e8088 100644 --- a/crates/evm-helpers/src/contracts.rs +++ b/crates/evm-helpers/src/contracts.rs @@ -250,7 +250,7 @@ pub struct EnclaveContractFactory; impl EnclaveContractFactory { /// Create a write-capable contract pub async fn create_write( - http_rpc_url: &str, + rpc_url: &str, contract_address: &str, private_key: &str, ) -> Result> { @@ -261,7 +261,7 @@ impl EnclaveContractFactory { let wallet = EthereumWallet::from(signer); let provider = ProviderBuilder::new() .wallet(wallet) - .connect(http_rpc_url) + .connect(rpc_url) .await?; Ok(EnclaveContract:: { @@ -274,12 +274,12 @@ impl EnclaveContractFactory { /// Create a read-only contract pub async fn create_read( - http_rpc_url: &str, + rpc_url: &str, contract_address: &str, ) -> Result> { let contract_address = contract_address.parse()?; - let provider = ProviderBuilder::new().connect(http_rpc_url).await?; + let provider = ProviderBuilder::new().connect(rpc_url).await?; Ok(EnclaveContract:: { provider: Arc::new(provider), diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/event_listener.rs similarity index 92% rename from crates/evm-helpers/src/listener.rs rename to crates/evm-helpers/src/event_listener.rs index 76e2d97d9e..738d77451c 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/event_listener.rs @@ -16,7 +16,6 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; use tokio::sync::RwLock; -use tracing::info; type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -90,9 +89,13 @@ impl EventListener { Ok(()) } + pub fn provider(&self) -> Arc> { + self.provider.clone() + } + /// Create a contract listener that will listen to events from all addresses. - pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result { - let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); + pub async fn create_contract_listener(rpc_url: &str, addresses: &[&str]) -> Result { + let provider = Arc::new(ProviderBuilder::new().connect(rpc_url).await?); let address = addresses .iter() diff --git a/crates/evm-helpers/src/lib.rs b/crates/evm-helpers/src/lib.rs index da02ad0519..9af64694c9 100644 --- a/crates/evm-helpers/src/lib.rs +++ b/crates/evm-helpers/src/lib.rs @@ -4,6 +4,8 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +pub mod block_listener; pub mod contracts; +pub mod event_listener; pub mod events; -pub mod listener; +pub mod threshold_queue; diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs new file mode 100644 index 0000000000..f93a4bffe9 --- /dev/null +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::{ + cmp::Reverse, + collections::BinaryHeap, + sync::{Arc, RwLock}, +}; + +#[derive(Clone)] +/// An implementation of a ThresholdQueue. This enables releasing elements from a collection when +/// the given timestamp is reached or a given count has reached a certain threshold. +pub struct ThresholdQueue { + inner: Arc>>>, +} + +/// An item that can be added to a threshold queue +pub trait ThresholdItem: Ord { + type Item; + + /// Defines what it means to be withing the threshold eg self.myprop <= threshold + fn within_threshold(&self, threshold: u64) -> bool; + + /// Access the inner item this wraps + fn item(&self) -> Self::Item; +} + +impl ThresholdQueue +where + T: ThresholdItem, +{ + /// Create a new ThresholdQueue + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(BinaryHeap::new())), + } + } + + /// Push an item onto the queue + pub fn push(&self, item: T) { + self.inner + .write() + .expect("Poisoned write in ThresholdQueue") + .push(Reverse(item)); + } + + /// Keep taking items off the queue until `item.within_threshold(threshold)` returns false + pub fn take_until_including(&self, threshold: u64) -> Vec { + let mut found = Vec::new(); + let mut inner = self + .inner + .write() + .expect("Poisoned write in ThresholdQueue"); + + while let Some(Reverse(item)) = inner.peek() { + if item.within_threshold(threshold) { + if let Some(Reverse(item)) = inner.pop() { + found.push(item.item()); + } + } else { + break; + } + } + + found + } +} + +#[cfg(test)] +mod tests { + use super::{ThresholdItem, ThresholdQueue}; + + struct ThreshItem { + val: u64, + rank: u64, + } + + impl Eq for ThreshItem {} + + impl PartialEq for ThreshItem { + fn eq(&self, other: &Self) -> bool { + self.rank == other.rank + } + } + + impl PartialOrd for ThreshItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for ThreshItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.rank.cmp(&other.rank) + } + } + + impl ThresholdItem for ThreshItem { + type Item = u64; + fn item(&self) -> Self::Item { + self.val + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.rank <= threshold + } + } + + #[test] + fn test_collection_is_ordered() { + let queue = ThresholdQueue::new(); + queue.push(ThreshItem { val: 111, rank: 25 }); + queue.push(ThreshItem { + val: 666, + rank: 100, + }); + queue.push(ThreshItem { val: 444, rank: 70 }); + queue.push(ThreshItem { val: 222, rank: 26 }); + let items = queue.take_until_including(70); + + assert_eq!(items, vec![111, 222, 444]); + + let items = queue.take_until_including(101); + + assert_eq!(items, vec![666]); + } +} diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index 34077de664..48a15de85a 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -5,14 +5,18 @@ // or FITNESS FOR A PARTICULAR PURPOSE. mod helpers; -use alloy::sol; -use e3_evm_helpers::listener::EventListener; +use alloy::consensus::BlockHeader; +use alloy::providers::ext::AnvilApi; +use alloy::{node_bindings::Anvil, providers::ProviderBuilder, sol}; +use e3_evm_helpers::block_listener; +use e3_evm_helpers::{block_listener::BlockListener, event_listener::EventListener}; use eyre::Result; use helpers::setup_logs_contract; use std::{ sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use tokio::sync::Mutex; use tokio::time::sleep; sol!( @@ -202,3 +206,47 @@ async fn test_overlapping_listener_handlers() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_block_listener() -> Result<()> { + let anvil = Anvil::new().try_spawn()?; + let provider = Arc::new(ProviderBuilder::new().connect(&anvil.ws_endpoint()).await?); + let block_listener = Arc::new(BlockListener::new(provider.clone())); + let events: Arc>> = Arc::new(Mutex::new(vec![])); + let events_handler = events.clone(); + + // Save each block number to a vector. + block_listener + .add_block_handler(move |block| { + let events = events_handler.clone(); + let blockheight = block.number(); + async move { + let mut events = events.lock().await; + events.push(blockheight); + Ok(()) + } + }) + .await; + + // Start up a listener + let listen_handle = tokio::spawn(async move { + let _ = block_listener.listen().await; + }); + + // Give the listener time to start + sleep(Duration::from_millis(100)).await; + + // Mine a few blocks + provider.anvil_mine(Some(5), None).await?; + + // Wait for the block to be processed + sleep(Duration::from_secs(1)).await; + + // Cancel the listener + listen_handle.abort(); + + let guard = events.lock().await; + assert_eq!(*guard, vec![1, 2, 3, 4, 5]); + + Ok(()) +} diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs new file mode 100644 index 0000000000..4a5694a152 --- /dev/null +++ b/crates/indexer/src/callback_queue.rs @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use e3_evm_helpers::threshold_queue::{ThresholdItem, ThresholdQueue}; +use eyre::Result; +use std::{future::Future, pin::Pin, sync::Arc}; +use tracing::info; + +/// Callback for CallbackQueue +type Callback = Arc Pin> + Send>> + Send + Sync>; + +#[derive(Clone)] +/// A callback that has an execute time associated with it +pub struct TimedCallback { + time: u64, + callback: Callback, +} + +impl ThresholdItem for TimedCallback { + type Item = Callback; + + fn item(&self) -> Self::Item { + self.callback.clone() + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.time <= threshold + } +} + +// We need to ensure Ord is satisfied to use this in the threshold queue +impl Eq for TimedCallback {} + +impl PartialEq for TimedCallback { + fn eq(&self, other: &Self) -> bool { + self.time == other.time + } +} + +impl PartialOrd for TimedCallback { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedCallback { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.time.cmp(&other.time) + } +} + +#[derive(Clone)] +/// A queue of callbacks that can be executed when a given timestamp has been passed. This is a +/// specialization of a ThresholdQueue. +pub struct CallbackQueue { + inner: ThresholdQueue, +} + +impl CallbackQueue { + /// Create a new queue + pub fn new() -> Self { + Self { + inner: ThresholdQueue::new(), + } + } + + /// Push a callback to the queue to be executed at or before the given time. + pub fn push(&self, time: u64, callback: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + info!("ADDING CALLBACK TO time={}", time); + self.inner.push(TimedCallback { + time, + callback: Arc::new(move || Box::pin(callback())), + }) + } + + /// Execute all pending callbacks up to and including the given time + pub async fn execute_until_including(&self, time: u64) -> Result<()> { + info!("execute_until_including..."); + let handlers = self.inner.take_until_including(time); + info!("found {} handlers", handlers.len()); + for callback in handlers { + callback().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + #[tokio::test] + async fn test_single_callback_executes() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.push(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert!(*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_callback_not_executed_before_threshold() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.push(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(50).await.unwrap(); + assert!(!*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_multiple_callbacks_execute() { + let mut queue = CallbackQueue::new(); + let counter = Arc::new(Mutex::new(0)); + + let c1 = counter.clone(); + queue.push(50, move || { + let c = c1.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c2 = counter.clone(); + queue.push(100, move || { + let c = c2.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c3 = counter.clone(); + queue.push(150, move || { + let c = c3.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert_eq!(*counter.lock().unwrap(), 2); + } + + #[tokio::test] + async fn test_error_propagation() { + let mut queue = CallbackQueue::new(); + + queue.push(100, || async { Err(eyre::eyre!("test error")) }); + + let result = queue.execute_until_including(100).await; + assert!(result.is_err()); + } +} diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index e17b755133..155823c962 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -4,29 +4,31 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::E3Repository; - use super::{models::E3, DataStore}; +use crate::callback_queue::CallbackQueue; +use crate::E3Repository; +use alloy::consensus::BlockHeader; use alloy::hex; use alloy::primitives::Uint; use alloy::providers::Provider; use alloy::sol_types::SolEvent; use async_trait::async_trait; use e3_evm_helpers::{ + block_listener::BlockListener, contracts::{ EnclaveContract, EnclaveContractFactory, EnclaveRead, ProviderType, ReadOnly, ReadWrite, }, + event_listener::EventListener, events::{CiphertextOutputPublished, E3Activated, PlaintextOutputPublished}, - listener::EventListener, }; use eyre::eyre; use eyre::Result; use serde::{de::DeserializeOwned, Serialize}; -use std::future::Future; use std::{collections::HashMap, sync::Arc}; +use std::{future::Future, time::Duration}; use thiserror::Error; -use tokio::sync::RwLock; -use tracing::info; +use tokio::{sync::RwLock, time::sleep}; +use tracing::{error, info, warn}; type E3Id = u64; @@ -157,10 +159,12 @@ impl Drop for EnclaveIndexer { pub struct IndexerContext { store: SharedStore, - listener: EventListener, + event_listener: EventListener, + block_listener: BlockListener, contract: EnclaveContract, contract_address: String, chain_id: u64, + callbacks: CallbackQueue, } impl IndexerContext { @@ -168,9 +172,14 @@ impl IndexerContext { self.store.clone() } - pub fn listener(&self) -> EventListener { - self.listener.clone() + pub fn event_listener(&self) -> EventListener { + self.event_listener.clone() + } + + pub fn block_listener(&self) -> BlockListener { + self.block_listener.clone() } + pub fn contract(&self) -> EnclaveContract { self.contract.clone() } @@ -181,16 +190,38 @@ impl IndexerContext { pub fn chain_id(&self) -> u64 { self.chain_id } + + /// Schedule a callback to execute as or after a block with the given timestamp is processed. + /// + /// Useful for handling deadlines or expirations. The callback receives the scheduled + /// timestamp and a reference to the indexer context. + pub fn do_later(self: &Arc, timestamp: u64, callback: F) + where + F: Fn(u64, Arc) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + let callback = Arc::new(callback); + let ctx = Arc::clone(self); + self.callbacks.push(timestamp, move || { + info!("Running callback: time={}", timestamp); + let callback = Arc::clone(&callback); + let ctx = Arc::clone(&ctx); + async move { + callback(timestamp, ctx).await?; + Ok(()) + } + }) + } } impl EnclaveIndexer { pub async fn new_with_in_mem_store( - listener: EventListener, + event_listener: EventListener, contract: EnclaveContract, ) -> Result> { let store = InMemoryStore::new(); - EnclaveIndexer::new(listener, contract, store).await + EnclaveIndexer::new(event_listener, contract, store).await } } @@ -198,30 +229,31 @@ impl EnclaveIndexer { /// Creates an `EnclaveIndexer` with an in-memory store. /// /// Note: `addresses[0]` must be the enclave contract address. - pub async fn from_endpoint_address_in_mem(ws_url: &str, addresses: &[&str]) -> Result { - let listener = EventListener::create_contract_listener(ws_url, addresses).await?; - let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; - EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + pub async fn from_endpoint_address_in_mem(rpc_url: &str, addresses: &[&str]) -> Result { + let event_listener = EventListener::create_contract_listener(rpc_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(rpc_url, addresses[0]).await?; + EnclaveIndexer::::new_with_in_mem_store(event_listener, contract) + .await } /// Creates an `EnclaveIndexer` with a provided in-memory store. /// /// Note: `addresses[0]` must be the enclave contract address. pub async fn from_endpoint_address( - ws_url: &str, + rpc_url: &str, addresses: &[&str], store: InMemoryStore, ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, addresses).await?; - let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; - EnclaveIndexer::new(listener, contract, store).await + let event_listener = EventListener::create_contract_listener(rpc_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(rpc_url, addresses[0]).await?; + EnclaveIndexer::new(event_listener, contract, store).await } } impl EnclaveIndexer { /// Creates a new EnclaveIndexer with a writeable contract. pub async fn new_with_write_contract( - ws_url: &str, + rpc_url: &str, addresses: &[&str], // First address must be contract_address store: S, private_key: &str, @@ -229,9 +261,10 @@ impl EnclaveIndexer { let Some(contract_address) = addresses.first() else { return Err(eyre::eyre!("No addresses provided")); }; + let event_listener = EventListener::create_contract_listener(rpc_url, addresses).await?; EnclaveIndexer::new( - EventListener::create_contract_listener(ws_url, addresses).await?, - EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?, + event_listener, + EnclaveContractFactory::create_write(rpc_url, contract_address, private_key).await?, store, ) .await @@ -240,19 +273,22 @@ impl EnclaveIndexer { impl EnclaveIndexer { pub async fn new( - listener: EventListener, + event_listener: EventListener, contract: EnclaveContract, store: S, ) -> Result { let chain_id = contract.provider.get_chain_id().await?; let contract_address = contract.address().to_string(); + let block_listener = BlockListener::new(event_listener.provider()); let mut instance = Self { ctx: Arc::new(IndexerContext { store: SharedStore::new(Arc::new(RwLock::new(store))), contract, - listener, + event_listener, + block_listener, contract_address, chain_id, + callbacks: CallbackQueue::new(), }), }; instance.setup_listeners().await?; @@ -274,7 +310,7 @@ impl EnclaveIndexer { let ctx_weak = Arc::downgrade(&ctx); self.ctx - .listener + .event_listener .add_event_handler(move |e: E| { let handler = Arc::clone(&handler); let ctx_weak = ctx_weak.clone(); @@ -285,7 +321,7 @@ impl EnclaveIndexer { if let Some(ctx) = ctx_weak.upgrade() { handler(e, ctx).await } else { - println!("Context was dropped!"); + warn!("Context was dropped!"); Ok(()) } } @@ -349,7 +385,7 @@ impl EnclaveIndexer { async fn register_ciphertext_output_published(&mut self) -> Result<()> { self.add_event_handler(move |e: CiphertextOutputPublished, ctx| async move { let store = ctx.store(); - println!( + info!( "CiphertextOutputPublished: e3_id={}, output=0x{}...", e.e3Id, hex::encode(&e.ciphertextOutput[..8.min(e.ciphertextOutput.len())]) @@ -369,7 +405,7 @@ impl EnclaveIndexer { async fn register_plaintext_output_published(&mut self) -> Result<()> { self.add_event_handler(move |e: PlaintextOutputPublished, ctx| async move { let store = ctx.store(); - println!( + info!( "PlaintextOutputPublished: e3_id={}, output=0x{}...", e.e3Id, hex::encode(&e.plaintextOutput[..8.min(e.plaintextOutput.len())]) @@ -385,18 +421,58 @@ impl EnclaveIndexer { Ok(()) } + async fn register_blocktime_callback_handler(&mut self) -> Result<()> { + let callbacks = self.ctx.callbacks.clone(); + self.ctx + .block_listener + .add_block_handler(move |block| { + let timestamp = block.timestamp(); + let blockheight = block.number(); + let callbacks = callbacks.clone(); + async move { + info!("ON BLOCK: {}:{}", blockheight, timestamp); + callbacks.execute_until_including(timestamp).await?; + Ok(()) + } + }) + .await; + Ok(()) + } + async fn setup_listeners(&mut self) -> Result<()> { info!("Setting up listeners for EnclaveIndexer..."); self.register_e3_activated().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; + self.register_blocktime_callback_handler().await?; info!("Listeners have been setup!"); Ok(()) } pub async fn listen(&self) -> Result<()> { info!("Starting EnclaveIndexer listening..."); - self.ctx.listener.listen().await + loop { + let res = tokio::select! { + res = self.ctx.event_listener.listen() => { + match &res { + Ok(_) => warn!("EventListener curiously halted naturally."), + Err(e) => error!("EventListener halted with an error: {e}") + }; + res + } + res = self.ctx.block_listener.listen() => { + match &res { + Ok(_) => warn!("BlockListener curiously halted naturally."), + Err(e) => error!("BlockListener halted with an error: {e}") + }; + res + } + }; + + let secs = res.map(|_| 1).unwrap_or(5); + warn!("Restarting listeners in {}s...", secs); + sleep(Duration::from_secs(secs)).await + } } pub async fn get_e3(&self, e3_id: u64) -> Result { @@ -404,10 +480,6 @@ impl EnclaveIndexer { Ok(e3) } - pub fn get_listener(&self) -> EventListener { - self.ctx.listener.clone() - } - pub fn get_store(&self) -> SharedStore { self.ctx.store.clone() } diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index b42c0bff8c..1a377cd1ac 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -4,6 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +mod callback_queue; mod indexer; pub mod models; mod repo; diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index dc63325587..ebf31046d2 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -186,7 +186,7 @@ async fn test_indexer() -> Result<()> { mod test_memory_leak { - use e3_evm_helpers::{contracts::EnclaveContractFactory, listener::EventListener}; + use e3_evm_helpers::{contracts::EnclaveContractFactory, event_listener::EventListener}; use super::*; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -223,14 +223,13 @@ mod test_memory_leak { EnclaveIndexer::::new_with_in_mem_store(listener, contract).await } + sol! { + #[derive(Debug)] + event TestEvent(); + } + #[tokio::test] async fn test_memory_leak() -> Result<()> { - sol! { - #[derive(Debug)] - event TestEvent(); - - } - DROP_COUNT.store(0, Ordering::SeqCst); CREATE_COUNT.store(0, Ordering::SeqCst); @@ -269,4 +268,50 @@ mod test_memory_leak { Ok(()) } + + #[tokio::test] + async fn test_do_later_memory_leak() -> Result<()> { + DROP_COUNT.store(0, Ordering::SeqCst); + CREATE_COUNT.store(0, Ordering::SeqCst); + + { + let indexer = create_indexer().await?; + let detector = LeakDetector::new(); + + // Schedule a callback far in the future that will never execute + indexer + .add_event_handler(move |_e: TestEvent, ctx| { + let detector = detector.clone(); + async move { + ctx.do_later(u64::MAX, { + move |_timestamp, _ctx| { + let _captured = detector.clone(); + async move { + println!("This should never run"); + Ok(()) + } + } + }); + + Ok(()) + } + }) + .await; + } + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let created = CREATE_COUNT.load(Ordering::SeqCst); + let dropped = DROP_COUNT.load(Ordering::SeqCst); + + println!("Created: {}, Dropped: {}", created, dropped); + + assert_eq!( + created, dropped, + "Memory leak detected in do_later! Created {} objects but only dropped {}", + created, dropped + ); + + Ok(()) + } } diff --git a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json index 3c2f539839..0b5a15cb3e 100644 --- a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json +++ b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json @@ -109,128 +109,5 @@ "MockRISC0Verifier": { "address": "0xa85233C63b9Ee964Add6F2cffe00Fd84eb32338f" } - }, - "localhost": { - "PoseidonT3": { - "blockNumber": 3, - "address": "0x3333333C0A88F9BE4fd23ed0536F9B6c427e3B93" - }, - "MockUSDC": { - "constructorArgs": { - "initialSupply": "1000000" - }, - "blockNumber": 4, - "address": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" - }, - "EnclaveToken": { - "constructorArgs": { - "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" - }, - "blockNumber": 5, - "address": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" - }, - "EnclaveTicketToken": { - "constructorArgs": { - "baseToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", - "registry": "0x0000000000000000000000000000000000000001", - "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" - }, - "blockNumber": 7, - "address": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707" - }, - "SlashingManager": { - "constructorArgs": { - "admin": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "bondingRegistry": "0x0000000000000000000000000000000000000001" - }, - "blockNumber": 8, - "address": "0x0165878A594ca255338adfa4d48449f69242Eb8F" - }, - "BondingRegistry": { - "constructorArgs": { - "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "ticketToken": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707", - "licenseToken": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9", - "registry": "0x0000000000000000000000000000000000000001", - "slashedFundsTreasury": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "ticketPrice": "10000000", - "licenseRequiredBond": "100000000000000000000", - "minTicketBalance": "1", - "exitDelay": "604800" - }, - "proxyRecords": { - "initData": "0x7333fa82000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000005fc8d32690cc91d4c39d9d3abcbd16989f875707000000000000000000000000cf7ed3acca5a467e9e704c703e8d87f634fb0fc90000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb9226600000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000056bc75e2d6310000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000093a80", - "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "proxyAddress": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", - "proxyAdminAddress": "0x94099942864EA81cCF197E9D71ac53310b1468D8", - "implementationAddress": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853" - }, - "blockNumber": 8, - "address": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" - }, - "CiphernodeRegistryOwnable": { - "constructorArgs": { - "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "enclaveAddress": "0x0000000000000000000000000000000000000001", - "submissionWindow": "10" - }, - "proxyRecords": { - "initData": "0x1794bb3c000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000a", - "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "proxyAddress": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", - "proxyAdminAddress": "0x6F1216D1BFe15c98520CA1434FC1d9D57AC95321", - "implementationAddress": "0x8A791620dd6260079BF849Dc5567aDC3F2FdC318" - }, - "blockNumber": 11, - "address": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788" - }, - "Enclave": { - "constructorArgs": { - "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "registry": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", - "bondingRegistry": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", - "feeToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", - "maxDuration": "2592000", - "params": [ - "0x000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001" - ] - }, - "proxyRecords": { - "initData": "0xefe0308b000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266000000000000000000000000610178da211fef7d417bc0e6fed39f05609ad7880000000000000000000000002279b7a0a67db372996a5fab50d91eaa73d2ebe60000000000000000000000009fe46736679d2d9a65f0992f2272de9f3c7fa6e00000000000000000000000000000000000000000000000000000000000278d0000000000000000000000000000000000000000000000000000000000000000c00000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001", - "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", - "proxyAddress": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", - "proxyAdminAddress": "0x1F708C24a0D3A740cD47cC0444E9480899f3dA7D", - "implementationAddress": "0xB7f8BC63BbcaD18155201308C8f3540b07f84F5e" - }, - "blockNumber": 13, - "address": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0" - }, - "MockComputeProvider": { - "blockNumber": 23, - "address": "0x59b670e9fA9D0A427751Af201D676719a970857b" - }, - "MockDecryptionVerifier": { - "blockNumber": 24, - "address": "0x4ed7c70F96B99c776995fB64377f0d4aB3B0e1C1" - }, - "MockE3Program": { - "blockNumber": 25, - "address": "0x322813Fd9A801c5507c9de605d63CEA4f2CE6c44" - }, - "MockRISC0Verifier": { - "address": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F" - }, - "HonkVerifier": { - "address": "0xc5a5C42992dECbae36851359345FE25997F5C42d" - }, - "CRISPProgram": { - "address": "0x67d269191c92Caf3cD7723F116c85e6E9bf55933", - "constructorArgs": { - "enclave": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", - "verifierAddress": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F", - "honkVerifierAddress": "0xc5a5C42992dECbae36851359345FE25997F5C42d", - "imageId": "0x23734b77b0f76e85623a88d7a82f24c34c94834f2501964ea123b7a2027013a2" - } - } } } \ No newline at end of file diff --git a/examples/CRISP/packages/crisp-contracts/hardhat.config.ts b/examples/CRISP/packages/crisp-contracts/hardhat.config.ts index f66e4fc52f..f2ae3a7493 100644 --- a/examples/CRISP/packages/crisp-contracts/hardhat.config.ts +++ b/examples/CRISP/packages/crisp-contracts/hardhat.config.ts @@ -65,13 +65,26 @@ const config: HardhatUserConfig = { plugins: [hardhatTypechainPlugin, hardhatEthersChaiMatchers, hardhatNetworkHelpers, hardhatToolboxMochaEthersPlugin, hardhatVerify], tasks: [cleanDeploymentsTask, ciphernodeAdd, ciphernodeAdminAdd, ciphernodeMintTokens], networks: { - localhost: { + default: { accounts: { mnemonic, }, chainId: chainIds.hardhat, type: 'edr-simulated', chainType: 'l1', + mining: { + auto: true, + interval: 1000, + }, + }, + localhost: { + accounts: { + mnemonic, + }, + chainId: chainIds.hardhat, + type: 'http', + url: 'http://localhost:8545', + timeout: 60000, }, ganache: { accounts: { diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index f7f15d9a95..aeaa446576 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -15,6 +15,7 @@ use crate::server::{ use alloy::providers::{Provider, ProviderBuilder}; use alloy::sol_types::{sol_data, SolType}; use alloy_primitives::{Address, U256}; +use e3_sdk::indexer::IndexerContext; use e3_sdk::{ bfv_helpers::decode_bytes_to_vec_u64, evm_helpers::{ @@ -31,8 +32,7 @@ use eyre::Context; use log::info; use num_bigint::BigUint; use std::error::Error; -use std::time::Duration; -use tokio::time::sleep; +use std::sync::Arc; type Result = std::result::Result>; @@ -199,70 +199,10 @@ pub async fn register_e3_activated( .set_current_round(CurrentRound { id: e3_id }) .await?; - // Calculate expiration time to sleep until - let now = get_current_timestamp_rpc().await?; - info!("[e3_id={}] Current time before sleep: {}", e3_id, now); - let wait_duration = if expiration > now { - let secs = expiration - now; - info!( - "[e3_id={}] Need to wait {} seconds until expiration", - e3_id, secs - ); - Duration::from_secs(secs) - } else { - info!("[e3_id={}] Expired E3", e3_id); - Duration::ZERO - }; - if !wait_duration.is_zero() { - sleep(wait_duration).await; - } - let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; - repo.update_status("Expired").await?; - - if repo.get_vote_count().await? > 0 { - info!("[e3_id={}] Starting computation for E3", e3_id); - repo.update_status("Computing").await?; - - let votes = repo.get_ciphertext_inputs().await?; - - let (id, status) = run_compute( - e3_id, - e3.e3_params, - votes, - format!("{}/state/add-result", CONFIG.enclave_server_url), - ) - .await - .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; - - if id != e3_id { - return Err(eyre::eyre!( - "Computation request returned unexpected E3 ID: expected {}, got {}", - e3_id, - id - ) - .into()); - } - - if status != "processing" { - return Err(eyre::eyre!( - "Computation request failed with status: {}", - status - ) - .into()); - } - - info!("[e3_id={}] Request Computation for E3", e3_id); - - repo.update_status("PublishingCiphertext").await?; - } else { - info!( - "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", - e3_id - ); - repo.update_status("Finished").await?; - } - info!("[e3_id={}] E3 request handled successfully.", e3_id); - + info!("[e3_id={}] Registering hook for {}", e3_id, expiration); + ctx.do_later(expiration, move |_, ctx| { + handle_e3_input_deadline_expiration(e3_id, ctx.store()) + }); Ok(()) } }) @@ -270,6 +210,58 @@ pub async fn register_e3_activated( Ok(indexer) } +async fn handle_e3_input_deadline_expiration( + e3_id: u64, + store: SharedStore, +) -> eyre::Result<()> { + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; + + repo.update_status("Expired").await?; + + if repo.get_vote_count().await? > 0 { + info!("[e3_id={}] Starting computation for E3", e3_id); + repo.update_status("Computing").await?; + + let votes = repo.get_ciphertext_inputs().await?; + + let (id, status) = run_compute( + e3_id, + e3.e3_params, + votes, + format!("{}/state/add-result", CONFIG.enclave_server_url), + ) + .await + .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; + + if id != e3_id { + return Err(eyre::eyre!( + "Computation request returned unexpected E3 ID: expected {}, got {}", + e3_id, + id + ) + .into()); + } + + if status != "processing" { + return Err(eyre::eyre!("Computation request failed with status: {}", status).into()); + } + + info!("[e3_id={}] Request Computation for E3", e3_id); + + repo.update_status("PublishingCiphertext").await?; + } else { + info!( + "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", + e3_id + ); + repo.update_status("Finished").await?; + } + info!("[e3_id={}] E3 request handled successfully.", e3_id); + + Ok(()) +} + pub async fn register_ciphertext_output_published( indexer: EnclaveIndexer, ) -> Result> { @@ -357,31 +349,12 @@ pub async fn register_committee_published( let now = get_current_timestamp_rpc().await?; info!("[e3_id={}] Current time: {}", event.e3Id, now); - // Calculate wait duration - let wait_duration = if start_time > now { - let secs = start_time - now; - info!( - "[e3_id={}] Need to wait {} seconds until activation", - event.e3Id, secs - ); - Duration::from_secs(secs) - } else { - info!("[e3_id={}] Activating E3", event.e3Id); - Duration::ZERO - }; - info!("[e3_id={}] Wait duration: {:?}", event.e3Id, wait_duration); + let later_event = event.clone(); + ctx.do_later(start_time, move |_, ctx| { + let event = later_event.clone(); + handle_committee_time_expired(event, ctx) + }); - // Sleep until start time - if !wait_duration.is_zero() { - sleep(wait_duration).await; - } - - // If not activated activate - let tx = contract.activate(event.e3Id, event.publicKey).await?; - info!( - "[e3_id={}] E3 activated with tx: {:?}", - event.e3Id, tx.transaction_hash - ); Ok(()) } }) @@ -389,6 +362,19 @@ pub async fn register_committee_published( Ok(indexer) } +async fn handle_committee_time_expired( + event: CommitteePublished, + ctx: Arc>, +) -> eyre::Result<()> { + // If not activated activate + let tx = ctx.contract().activate(event.e3Id, event.publicKey).await?; + info!( + "[e3_id={}] E3 activated with tx: {:?}", + event.e3Id, tx.transaction_hash + ); + Ok(()) +} + pub async fn get_current_timestamp_rpc() -> eyre::Result { let provider = ProviderBuilder::new().connect(&CONFIG.http_rpc_url).await?; let block = provider @@ -425,7 +411,7 @@ pub async fn register_input_published( } pub async fn start_indexer( - ws_url: &str, + url: &str, contract_address: &str, registry_address: &str, crisp_address: &str, @@ -434,7 +420,7 @@ pub async fn start_indexer( ) -> Result<()> { info!("CRISP: Creating indexer..."); let crisp_indexer = EnclaveIndexer::new_with_write_contract( - ws_url, + url, &[contract_address, registry_address, crisp_address], store, private_key, diff --git a/examples/CRISP/test/crisp.spec.ts b/examples/CRISP/test/crisp.spec.ts index 1d0faa287f..601c278787 100644 --- a/examples/CRISP/test/crisp.spec.ts +++ b/examples/CRISP/test/crisp.spec.ts @@ -84,7 +84,8 @@ test('CRISP smoke test', async ({ context, page, metamaskPage, extensionId }) => const e3id = await runCliInit() log(`Got e3 id: ${e3id}`) - await page.goto('/') + await page.goto('/', { waitUntil: 'domcontentloaded' }) + await page.waitForLoadState('load') log(`ensureHomePageLoaded...`) await ensureHomePageLoaded(page)