diff --git a/Cargo.lock b/Cargo.lock index 92202148e4..d08c19a95f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2907,6 +2907,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] @@ -2958,6 +2959,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/crates/evm-helpers/Cargo.toml b/crates/evm-helpers/Cargo.toml index f5a5848237..8470dcf60c 100644 --- a/crates/evm-helpers/Cargo.toml +++ b/crates/evm-helpers/Cargo.toml @@ -14,3 +14,4 @@ futures.workspace = true futures-util.workspace = true once_cell.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/evm-helpers/src/contracts.rs b/crates/evm-helpers/src/contracts.rs index e56eddd510..2fae264586 100644 --- a/crates/evm-helpers/src/contracts.rs +++ b/crates/evm-helpers/src/contracts.rs @@ -164,8 +164,8 @@ pub trait EnclaveWrite { } /// Generic type to represent different provider types -pub trait ProviderType: Send { - type Provider: Provider + Send + Sync + 'static; +pub trait ProviderType: Clone + Send + Sync + 'static { + type Provider: Provider + Clone + Send + Sync + 'static; } /// Marker type for read-only provider @@ -190,6 +190,15 @@ pub struct EnclaveContract { _marker: PhantomData, } +impl EnclaveContract { + pub fn address(&self) -> &Address { + &self.contract_address + } + pub fn get_provider(&self) -> Arc { + self.provider.clone() + } +} + impl EnclaveContract { pub async fn new( http_rpc_url: &str, @@ -198,14 +207,6 @@ impl EnclaveContract { ) -> Result> { EnclaveContractFactory::create_write(http_rpc_url, contract_address, private_key).await } - - pub fn get_provider(&self) -> Arc { - self.provider.clone() - } - - pub fn address(&self) -> &Address { - &self.contract_address - } } impl EnclaveContract { @@ -215,14 +216,6 @@ impl EnclaveContract { ) -> Result> { EnclaveContractFactory::create_read(http_rpc_url, contract_address).await } - - pub fn get_provider(&self) -> Arc { - self.provider.clone() - } - - pub fn address(&self) -> &Address { - &self.contract_address - } } /// Type alias for read-only provider diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index ccb00b1620..76e2d97d9e 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -15,7 +15,8 @@ use eyre::Result; use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; -use tokio::{sync::RwLock, task::JoinHandle}; +use tokio::sync::RwLock; +use tracing::info; type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -36,7 +37,7 @@ impl EventListener { } } - pub async fn add_event_handler(&mut self, handler: F) + pub async fn add_event_handler(&self, handler: F) where E: SolEvent + Send + Clone + 'static, F: Fn(E) -> Fut + Send + Sync + 'static, @@ -63,7 +64,7 @@ impl EventListener { .push(wrapped_handler); } - async fn listen(&self) -> Result<()> { + pub async fn listen(&self) -> Result<()> { let mut stream = self .provider .subscribe_logs(&self.filter) @@ -89,14 +90,14 @@ impl EventListener { Ok(()) } - pub fn start(&self) -> JoinHandle> { - let this = self.clone(); - tokio::spawn(async move { this.listen().await }) - } - - pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result { + /// Create a contract listener that will listen to events from all addresses. + pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); - let address = contract_address.parse::
()?; + + let address = addresses + .iter() + .map(|a| a.parse::
().map_err(|e| eyre::eyre!("{e}"))) + .collect::>>()?; let filter = Filter::new() .address(address) .from_block(BlockNumberOrTag::Latest); diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index dc2c4d5c48..34077de664 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -9,7 +9,10 @@ use alloy::sol; use e3_evm_helpers::listener::EventListener; use eyre::Result; use helpers::setup_logs_contract; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use tokio::time::sleep; sol!( @@ -25,11 +28,13 @@ async fn test_event_listener() -> Result<()> { let (tx, mut rx) = tokio::sync::mpsc::channel::(10); let (tx_addr, mut rx_addr) = tokio::sync::mpsc::channel::(10); - let mut event_listener = EventListener::create_contract_listener( - &anvil.ws_endpoint(), - &contract.address().to_string(), - ) - .await?; + let event_listener = Arc::new( + EventListener::create_contract_listener( + &anvil.ws_endpoint(), + &[&contract.address().to_string()], + ) + .await?, + ); event_listener .add_event_handler(move |event: EmitLogs::ValueChanged| { @@ -51,7 +56,8 @@ async fn test_event_listener() -> Result<()> { }) .await; - event_listener.start(); + let spawn_event_listener = event_listener.clone(); + let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); contract .setValue("hello".to_string()) @@ -104,11 +110,13 @@ async fn test_overlapping_listener_handlers() -> Result<()> { let (contract, _, _, anvil) = setup_logs_contract().await?; let (tx, mut rx) = tokio::sync::mpsc::channel::(10); - let mut event_listener = EventListener::create_contract_listener( - &anvil.ws_endpoint(), - &contract.address().to_string(), - ) - .await?; + let event_listener = Arc::new( + EventListener::create_contract_listener( + &anvil.ws_endpoint(), + &[&contract.address().to_string()], + ) + .await?, + ); let tx1 = tx.clone(); event_listener @@ -140,7 +148,8 @@ async fn test_overlapping_listener_handlers() -> Result<()> { }) .await; - event_listener.start(); + let spawn_event_listener = event_listener.clone(); + let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); // Events should be returned roughly in this order: // 0ms : one diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index 4e83410aa6..d1dbfca2df 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -15,3 +15,4 @@ eyre.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 953fcff33e..e17b755133 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -13,7 +13,9 @@ use alloy::providers::Provider; use alloy::sol_types::SolEvent; use async_trait::async_trait; use e3_evm_helpers::{ - contracts::{EnclaveContract, EnclaveContractFactory, EnclaveRead, ReadOnly}, + contracts::{ + EnclaveContract, EnclaveContractFactory, EnclaveRead, ProviderType, ReadOnly, ReadWrite, + }, events::{CiphertextOutputPublished, E3Activated, PlaintextOutputPublished}, listener::EventListener, }; @@ -24,7 +26,7 @@ use std::future::Future; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; -use tokio::task::JoinHandle; +use tracing::info; type E3Id = u64; @@ -143,208 +145,271 @@ impl DataStore for SharedStore { } #[derive(Clone)] -pub struct EnclaveIndexer { +pub struct EnclaveIndexer { + ctx: Arc>, +} + +impl Drop for EnclaveIndexer { + fn drop(&mut self) { + info!("EnclaveIndexer is DROPPED"); + } +} + +pub struct IndexerContext { + store: SharedStore, listener: EventListener, - contract: EnclaveContract, - store: Arc>, + contract: EnclaveContract, contract_address: String, chain_id: u64, } -impl EnclaveIndexer { +impl IndexerContext { + pub fn store(&self) -> SharedStore { + self.store.clone() + } + + pub fn listener(&self) -> EventListener { + self.listener.clone() + } + pub fn contract(&self) -> EnclaveContract { + self.contract.clone() + } + pub fn enclave_address(&self) -> String { + self.contract_address.clone() + } + + pub fn chain_id(&self) -> u64 { + self.chain_id + } +} + +impl EnclaveIndexer { pub async fn new_with_in_mem_store( listener: EventListener, - contract: EnclaveContract, - ) -> Result> { + contract: EnclaveContract, + ) -> Result> { let store = InMemoryStore::new(); EnclaveIndexer::new(listener, contract, store).await } +} + +impl EnclaveIndexer { + /// Creates an `EnclaveIndexer` with an in-memory store. + /// + /// Note: `addresses[0]` must be the enclave contract address. + pub async fn from_endpoint_address_in_mem(ws_url: &str, addresses: &[&str]) -> Result { + let listener = EventListener::create_contract_listener(ws_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; + EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + } - pub async fn from_endpoint_address_in_mem( + /// Creates an `EnclaveIndexer` with a provided in-memory store. + /// + /// Note: `addresses[0]` must be the enclave contract address. + pub async fn from_endpoint_address( ws_url: &str, - contract_address: &str, - ) -> Result> { - let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + addresses: &[&str], + store: InMemoryStore, + ) -> Result { + let listener = EventListener::create_contract_listener(ws_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; + EnclaveIndexer::new(listener, contract, store).await } } -impl EnclaveIndexer { +impl EnclaveIndexer { + /// Creates a new EnclaveIndexer with a writeable contract. + pub async fn new_with_write_contract( + ws_url: &str, + addresses: &[&str], // First address must be contract_address + store: S, + private_key: &str, + ) -> Result { + let Some(contract_address) = addresses.first() else { + return Err(eyre::eyre!("No addresses provided")); + }; + EnclaveIndexer::new( + EventListener::create_contract_listener(ws_url, addresses).await?, + EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?, + store, + ) + .await + } +} + +impl EnclaveIndexer { pub async fn new( listener: EventListener, - contract: EnclaveContract, + contract: EnclaveContract, store: S, ) -> Result { let chain_id = contract.provider.get_chain_id().await?; let contract_address = contract.address().to_string(); let mut instance = Self { - store: Arc::new(RwLock::new(store)), - contract, - listener, - contract_address, - chain_id, + ctx: Arc::new(IndexerContext { + store: SharedStore::new(Arc::new(RwLock::new(store))), + contract, + listener, + contract_address, + chain_id, + }), }; instance.setup_listeners().await?; + info!("EnclaveIndexer has been configured"); Ok(instance) } - pub async fn from_endpoint_address( - ws_url: &str, - contract_address: &str, - store: S, - ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - EnclaveIndexer::new(listener, contract, store).await - } - - pub async fn add_event_handler(&mut self, handler: F) + /// Listen for contract events from all contracts. + /// Callback will provide the event and a context object. + pub async fn add_event_handler(&self, handler: F) where E: SolEvent + Send + Clone + 'static, - F: Fn(E, SharedStore) -> Fut + Send + Sync + 'static, + F: Fn(E, Arc>) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - let store = SharedStore::new(self.store.clone()); let handler = Arc::new(handler); - self.listener + let ctx = self.ctx.clone(); + // In order to avoid a memory leak we create a weak reference here + let ctx_weak = Arc::downgrade(&ctx); + + self.ctx + .listener .add_event_handler(move |e: E| { let handler = Arc::clone(&handler); - let store = store.clone(); - async move { handler(e, store).await } - }) - .await; - } - - async fn register_e3_activated(&mut self) -> Result<()> { - let db = self.store.clone(); - let contract = self.contract.clone(); - let chain_id = self.chain_id; - let enclave_address = self.contract_address.clone(); - self.listener - .add_event_handler(move |e: E3Activated| { - let db = SharedStore::new(db.clone()); - let enclave_address = enclave_address.clone(); - let contract = contract.clone(); + let ctx_weak = ctx_weak.clone(); async move { - println!( - "E3Activated: id={}, expiration={}, pubkey=0x{}...", - e.e3Id, - e.expiration, - hex::encode(&e.committeePublicKey[..8.min(e.committeePublicKey.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - let e3 = contract.get_e3(e.e3Id).await?; - let duration = u64_try_from(e3.duration)?; - let expiration = u64_try_from(e.expiration)?; - let seed = e3.seed.to_be_bytes(); - let request_block = u64_try_from(e3.requestBlock)?; - let start_window = [ - u64_try_from(e3.startWindow[0])?, - u64_try_from(e3.startWindow[1])?, - ]; - // NOTE: we are only saving protocol specific info - // here and not CRISP specific info so E3 corresponds to the solidity E3 - let e3_obj = E3 { - chain_id, - ciphertext_inputs: vec![], - ciphertext_output: vec![], - committee_public_key: e.committeePublicKey.to_vec(), - duration, - custom_params: e3.customParams.to_vec(), - e3_params: e3.e3ProgramParams.to_vec(), - enclave_address, - encryption_scheme_id: e3.encryptionSchemeId.to_vec(), - expiration, - id: e3_id, - plaintext_output: vec![], - request_block, - seed, - start_window, - threshold: e3.threshold, - }; - - let mut repo = E3Repository::new(db, e3_id); - - repo.set_e3(e3_obj).await?; - Ok(()) + // We check the weak reference if it can be upgraded + // if not it must have been destroyed + if let Some(ctx) = ctx_weak.upgrade() { + handler(e, ctx).await + } else { + println!("Context was dropped!"); + Ok(()) + } } }) .await; + } + + async fn register_e3_activated(&mut self) -> Result<()> { + self.add_event_handler(move |e: E3Activated, ctx| { + async move { + let contract = ctx.contract(); + let db = ctx.store(); + let enclave_address = ctx.enclave_address(); + println!( + "E3Activated: id={}, expiration={}, pubkey=0x{}...", + e.e3Id, + e.expiration, + hex::encode(&e.committeePublicKey[..8.min(e.committeePublicKey.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + let e3 = contract.get_e3(e.e3Id).await?; + let duration = u64_try_from(e3.duration)?; + let expiration = u64_try_from(e.expiration)?; + let seed = e3.seed.to_be_bytes(); + let request_block = u64_try_from(e3.requestBlock)?; + let start_window = [ + u64_try_from(e3.startWindow[0])?, + u64_try_from(e3.startWindow[1])?, + ]; + // NOTE: we are only saving protocol specific info + // here and not CRISP specific info so E3 corresponds to the solidity E3 + let e3_obj = E3 { + chain_id: ctx.chain_id(), + ciphertext_inputs: vec![], + ciphertext_output: vec![], + committee_public_key: e.committeePublicKey.to_vec(), + duration, + custom_params: e3.customParams.to_vec(), + e3_params: e3.e3ProgramParams.to_vec(), + enclave_address, + encryption_scheme_id: e3.encryptionSchemeId.to_vec(), + expiration, + id: e3_id, + plaintext_output: vec![], + request_block, + seed, + start_window, + threshold: e3.threshold, + }; + + let mut repo = E3Repository::new(db, e3_id); + + repo.set_e3(e3_obj).await?; + Ok(()) + } + }) + .await; Ok(()) } async fn register_ciphertext_output_published(&mut self) -> Result<()> { - let store = self.store.clone(); - self.listener - .add_event_handler(move |e: CiphertextOutputPublished| { - let store = SharedStore::new(store.clone()); - async move { - println!( - "CiphertextOutputPublished: e3_id={}, output=0x{}...", - e.e3Id, - hex::encode(&e.ciphertextOutput[..8.min(e.ciphertextOutput.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - - let mut repo = E3Repository::new(store, e3_id); - repo.set_ciphertext_output(e.ciphertextOutput.to_vec()) - .await?; - - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: CiphertextOutputPublished, ctx| async move { + let store = ctx.store(); + println!( + "CiphertextOutputPublished: e3_id={}, output=0x{}...", + e.e3Id, + hex::encode(&e.ciphertextOutput[..8.min(e.ciphertextOutput.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + + let mut repo = E3Repository::new(store, e3_id); + repo.set_ciphertext_output(e.ciphertextOutput.to_vec()) + .await?; + + Ok(()) + }) + .await; Ok(()) } async fn register_plaintext_output_published(&mut self) -> Result<()> { - let store = self.store.clone(); - self.listener - .add_event_handler(move |e: PlaintextOutputPublished| { - let store = SharedStore::new(store.clone()); - async move { - println!( - "PlaintextOutputPublished: e3_id={}, output=0x{}...", - e.e3Id, - hex::encode(&e.plaintextOutput[..8.min(e.plaintextOutput.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - let mut repo = E3Repository::new(store, e3_id); - repo.set_plaintext_output(e.plaintextOutput.to_vec()) - .await?; - - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: PlaintextOutputPublished, ctx| async move { + let store = ctx.store(); + println!( + "PlaintextOutputPublished: e3_id={}, output=0x{}...", + e.e3Id, + hex::encode(&e.plaintextOutput[..8.min(e.plaintextOutput.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + let mut repo = E3Repository::new(store, e3_id); + repo.set_plaintext_output(e.plaintextOutput.to_vec()) + .await?; + + Ok(()) + }) + .await; Ok(()) } async fn setup_listeners(&mut self) -> Result<()> { + info!("Setting up listeners for EnclaveIndexer..."); self.register_e3_activated().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; + info!("Listeners have been setup!"); Ok(()) } - pub fn start(&self) -> JoinHandle> { - self.listener.start() + pub async fn listen(&self) -> Result<()> { + info!("Starting EnclaveIndexer listening..."); + self.ctx.listener.listen().await } pub async fn get_e3(&self, e3_id: u64) -> Result { - let (e3, _) = get_e3(self.store.clone(), e3_id).await?; + let (e3, _) = get_e3(self.ctx.store.inner.clone(), e3_id).await?; Ok(e3) } pub fn get_listener(&self) -> EventListener { - self.listener.clone() + self.ctx.listener.clone() } pub fn get_store(&self) -> SharedStore { - SharedStore::new(self.store.clone()) + self.ctx.store.clone() } } diff --git a/crates/indexer/tests/helpers.rs b/crates/indexer/tests/helpers.rs index 8e24dfaa6f..43dfdb1d7b 100644 --- a/crates/indexer/tests/helpers.rs +++ b/crates/indexer/tests/helpers.rs @@ -4,6 +4,8 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use std::sync::Arc; + // helpers.rs use alloy::{ network::Ethereum, @@ -40,6 +42,23 @@ pub async fn setup_fake_enclave() -> Result<( Ok((contract, address, endpoint, anvil)) } +pub async fn setup_two_contracts() -> Result<( + EnclaveInstance, + String, + EmitLogsInstance, + String, + String, + AnvilInstance, +)> { + let (provider, endpoint, anvil) = setup_provider().await?; + let provider = Arc::new(provider); + let contract1 = Enclave::deploy(provider.clone()).await?; + let contract2 = EmitLogsInstance::deploy(provider.clone()).await?; + let address1 = contract1.address().to_string(); + let address2 = contract2.address().to_string(); + Ok((contract1, address1, contract2, address2, endpoint, anvil)) +} + pub async fn setup_provider() -> Result<(impl Provider, String, AnvilInstance)> { // Set anvil with fast blocktimes for testing let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?; diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 535bcaed44..dc63325587 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -9,11 +9,16 @@ use alloy::{ primitives::{Bytes, Uint}, sol, }; +use e3_evm_helpers::contracts::ReadOnly; use e3_indexer::{DataStore, EnclaveIndexer, InMemoryStore}; use eyre::Result; -use helpers::setup_fake_enclave; -use std::time::Duration; +use helpers::setup_two_contracts; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::time::sleep; +use EmitLogs::PublishMessage; use Enclave::InputPublished; sol!( @@ -22,58 +27,246 @@ sol!( "tests/fixtures/fake_enclave.json" ); +sol!( + #[sol(rpc)] + EmitLogs, + "tests/fixtures/emit_logs.json" +); + #[tokio::test] async fn test_indexer() -> Result<()> { - let (contract, address, endpoint, _anvil) = setup_fake_enclave().await?; - let address = address.to_string(); - let endpoint = endpoint.to_string(); + const E3_ID: u64 = 10; + const THRESHOLD: u64 = 10; + const INDEXER_DELAY_MS: u64 = 10; + + let ( + enclave_contract, + enclave_address, + emit_logs_contract, + emit_logs_address, + endpoint, + _anvil, + ) = setup_two_contracts().await?; - let mut indexer = - EnclaveIndexer::::from_endpoint_address_in_mem(&endpoint, &address).await?; + let indexer = Arc::new( + EnclaveIndexer::::from_endpoint_address_in_mem( + &endpoint.to_string(), + &[&enclave_address.to_string(), &emit_logs_address.to_string()], + ) + .await?, + ); + // Track InputPublished event count in store indexer - .add_event_handler(move |_: InputPublished, mut store| async move { + .add_event_handler(move |_: InputPublished, ctx| async move { + let mut store = ctx.store(); store .modify("input_count", |counter: Option| { Some(counter.map_or(1, |c| c + 1)) }) .await?; - Ok(()) }) .await; - // Start tracking state - let _ = indexer.start(); + // Collect PublishMessage events + let captured_messages: Arc>> = Arc::new(Mutex::new(vec![])); + let captured_messages_for_handler = captured_messages.clone(); - // E3Activated - let e3_id = 10; + indexer + .add_event_handler(move |msg: PublishMessage, _ctx| { + // Collect message + let messages = captured_messages_for_handler.clone(); + async move { + messages.lock().unwrap().push(msg.value); + Ok(()) + } + }) + .await; + + let indexer_listening = indexer.clone(); + let _ = tokio::spawn(async move { indexer_listening.listen().await }); - let pubkey = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; - contract + let public_key = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + let input_data = "Random data that wont actually be a string".to_string(); + let input_data_bytes = Bytes::from(input_data.clone().into_bytes()); + let ciphertext_output_data = vec![9, 8, 7, 6, 5, 4, 3, 2, 1]; + + enclave_contract .emitE3Activated( - Uint::from(e3_id), - Uint::from(10), - Bytes::from(pubkey.clone()), + Uint::from(E3_ID), + Uint::from(THRESHOLD), + Bytes::from(public_key.clone()), + ) + .send() + .await? + .watch() + .await?; + + enclave_contract + .emitInputPublished( + Uint::from(E3_ID), + input_data_bytes.clone(), + Uint::from(1111), + Uint::from(1), + ) + .send() + .await? + .watch() + .await?; + + // Sending message from logs contract which indexer is listening to + emit_logs_contract + .emitPublishMessage("Hello from contract2!".into()) + .send() + .await? + .watch() + .await?; + + enclave_contract + .emitInputPublished( + Uint::from(E3_ID), + input_data_bytes.clone(), + Uint::from(2222), + Uint::from(2), + ) + .send() + .await? + .watch() + .await?; + + enclave_contract + .emitInputPublished( + Uint::from(E3_ID), + input_data_bytes.clone(), + Uint::from(3333), + Uint::from(3), ) .send() .await? .watch() .await?; - let ciphertext_output = vec![9, 8, 7, 6, 5, 4, 3, 2, 1]; - contract - .emitCiphertextOutputPublished(Uint::from(e3_id), Bytes::from(ciphertext_output.clone())) + sleep(Duration::from_millis(INDEXER_DELAY_MS)).await; + + let messages_from_second_contract = captured_messages.lock().unwrap(); + assert_eq!( + messages_from_second_contract + .iter() + .cloned() + .collect::>(), + vec!["Hello from contract2!".to_string()] + ); + drop(messages_from_second_contract); + + enclave_contract + .emitCiphertextOutputPublished( + Uint::from(E3_ID), + Bytes::from(ciphertext_output_data.clone()), + ) .send() .await? .watch() .await?; - sleep(Duration::from_millis(10)).await; + sleep(Duration::from_millis(INDEXER_DELAY_MS)).await; - let e3 = indexer.get_e3(e3_id).await?; + let e3_state_after_output = indexer.get_e3(E3_ID).await?; + assert_eq!( + e3_state_after_output.ciphertext_output, + ciphertext_output_data + ); - assert_eq!(e3.ciphertext_output, ciphertext_output); + let store = indexer.get_store(); + let total_inputs_processed = store.get::("input_count").await?.unwrap(); + assert_eq!(total_inputs_processed, 3); Ok(()) } + +mod test_memory_leak { + + use e3_evm_helpers::{contracts::EnclaveContractFactory, listener::EventListener}; + + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); + static CREATE_COUNT: AtomicUsize = AtomicUsize::new(0); + + #[derive(Clone)] + struct LeakDetector(Arc); + + #[derive(Debug)] + struct DropCounter; + + impl Drop for DropCounter { + fn drop(&mut self) { + DROP_COUNT.fetch_add(1, Ordering::SeqCst); + } + } + + impl LeakDetector { + fn new() -> Self { + CREATE_COUNT.fetch_add(1, Ordering::SeqCst); + Self(Arc::new(DropCounter)) + } + } + + async fn create_indexer() -> Result> { + let (_, enclave_address, _, _, endpoint, _anvil) = setup_two_contracts().await?; + + let listener = + EventListener::create_contract_listener(&endpoint, &[&enclave_address]).await?; + let contract = EnclaveContractFactory::create_read(&endpoint, &enclave_address).await?; + + EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + } + + #[tokio::test] + async fn test_memory_leak() -> Result<()> { + sol! { + #[derive(Debug)] + event TestEvent(); + + } + + DROP_COUNT.store(0, Ordering::SeqCst); + CREATE_COUNT.store(0, Ordering::SeqCst); + + { + // Add an event handler that captures context + let indexer = create_indexer().await?; + let detector = LeakDetector::new(); + + indexer + .add_event_handler(move |event: TestEvent, _ctx| { + // This closure captures a ref to detector + let _captured = detector.clone(); + println!("{:?}", _captured.0); + async move { + println!("Event received: {:?}", event); + Ok(()) + } + }) + .await; + } + + // Delay to ensure everything is dropped. + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let created = CREATE_COUNT.load(Ordering::SeqCst); + let dropped = DROP_COUNT.load(Ordering::SeqCst); + + println!("Created: {}, Dropped: {}", created, dropped); + + // If the handler was dropped then the detector will be dropped too + assert_eq!( + created, dropped, + "Memory leak detected! Created {} objects but only dropped {}", + created, dropped + ); + + Ok(()) + } +} diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 8d0f5ff0ba..0df54a6e5d 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2299,6 +2299,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] @@ -2313,6 +2314,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/examples/CRISP/packages/crisp-contracts/README.md b/examples/CRISP/packages/crisp-contracts/README.md index 5afed88e19..a3f1160e03 100644 --- a/examples/CRISP/packages/crisp-contracts/README.md +++ b/examples/CRISP/packages/crisp-contracts/README.md @@ -3,7 +3,8 @@ This directory contains the Solidity contracts for CRISP - Coercion-Resistant Impartial Selection Protocol. -Contracts are built and tested with [Hardhat](https://hardhat.org). Tests are defined in the `test` directory. +Contracts are built and tested with [Hardhat](https://hardhat.org). Tests are defined in the `test` +directory. ## Running Tests diff --git a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json index 0b5a15cb3e..3c2f539839 100644 --- a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json +++ b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json @@ -109,5 +109,128 @@ "MockRISC0Verifier": { "address": "0xa85233C63b9Ee964Add6F2cffe00Fd84eb32338f" } + }, + "localhost": { + "PoseidonT3": { + "blockNumber": 3, + "address": "0x3333333C0A88F9BE4fd23ed0536F9B6c427e3B93" + }, + "MockUSDC": { + "constructorArgs": { + "initialSupply": "1000000" + }, + "blockNumber": 4, + "address": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" + }, + "EnclaveToken": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" + }, + "blockNumber": 5, + "address": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" + }, + "EnclaveTicketToken": { + "constructorArgs": { + "baseToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", + "registry": "0x0000000000000000000000000000000000000001", + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" + }, + "blockNumber": 7, + "address": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707" + }, + "SlashingManager": { + "constructorArgs": { + "admin": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "bondingRegistry": "0x0000000000000000000000000000000000000001" + }, + "blockNumber": 8, + "address": "0x0165878A594ca255338adfa4d48449f69242Eb8F" + }, + "BondingRegistry": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "ticketToken": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707", + "licenseToken": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9", + "registry": "0x0000000000000000000000000000000000000001", + "slashedFundsTreasury": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "ticketPrice": "10000000", + "licenseRequiredBond": "100000000000000000000", + "minTicketBalance": "1", + "exitDelay": "604800" + }, + "proxyRecords": { + "initData": "0x7333fa82000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000005fc8d32690cc91d4c39d9d3abcbd16989f875707000000000000000000000000cf7ed3acca5a467e9e704c703e8d87f634fb0fc90000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb9226600000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000056bc75e2d6310000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000093a80", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + "proxyAdminAddress": "0x94099942864EA81cCF197E9D71ac53310b1468D8", + "implementationAddress": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853" + }, + "blockNumber": 8, + "address": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" + }, + "CiphernodeRegistryOwnable": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "enclaveAddress": "0x0000000000000000000000000000000000000001", + "submissionWindow": "10" + }, + "proxyRecords": { + "initData": "0x1794bb3c000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000a", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", + "proxyAdminAddress": "0x6F1216D1BFe15c98520CA1434FC1d9D57AC95321", + "implementationAddress": "0x8A791620dd6260079BF849Dc5567aDC3F2FdC318" + }, + "blockNumber": 11, + "address": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788" + }, + "Enclave": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "registry": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", + "bondingRegistry": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + "feeToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", + "maxDuration": "2592000", + "params": [ + "0x000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001" + ] + }, + "proxyRecords": { + "initData": "0xefe0308b000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266000000000000000000000000610178da211fef7d417bc0e6fed39f05609ad7880000000000000000000000002279b7a0a67db372996a5fab50d91eaa73d2ebe60000000000000000000000009fe46736679d2d9a65f0992f2272de9f3c7fa6e00000000000000000000000000000000000000000000000000000000000278d0000000000000000000000000000000000000000000000000000000000000000c00000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", + "proxyAdminAddress": "0x1F708C24a0D3A740cD47cC0444E9480899f3dA7D", + "implementationAddress": "0xB7f8BC63BbcaD18155201308C8f3540b07f84F5e" + }, + "blockNumber": 13, + "address": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0" + }, + "MockComputeProvider": { + "blockNumber": 23, + "address": "0x59b670e9fA9D0A427751Af201D676719a970857b" + }, + "MockDecryptionVerifier": { + "blockNumber": 24, + "address": "0x4ed7c70F96B99c776995fB64377f0d4aB3B0e1C1" + }, + "MockE3Program": { + "blockNumber": 25, + "address": "0x322813Fd9A801c5507c9de605d63CEA4f2CE6c44" + }, + "MockRISC0Verifier": { + "address": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F" + }, + "HonkVerifier": { + "address": "0xc5a5C42992dECbae36851359345FE25997F5C42d" + }, + "CRISPProgram": { + "address": "0x67d269191c92Caf3cD7723F116c85e6E9bf55933", + "constructorArgs": { + "enclave": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", + "verifierAddress": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F", + "honkVerifierAddress": "0xc5a5C42992dECbae36851359345FE25997F5C42d", + "imageId": "0x23734b77b0f76e85623a88d7a82f24c34c94834f2501964ea123b7a2027013a2" + } + } } } \ No newline at end of file diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 93f0828e93..f7f15d9a95 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -18,14 +18,11 @@ use alloy_primitives::{Address, U256}; use e3_sdk::{ bfv_helpers::decode_bytes_to_vec_u64, evm_helpers::{ - contracts::{ - EnclaveContract, EnclaveContractFactory, EnclaveRead, EnclaveWrite, ReadWrite, - }, + contracts::{EnclaveRead, EnclaveWrite, ReadWrite}, events::{ CiphertextOutputPublished, CommitteePublished, E3Activated, E3Requested, PlaintextOutputPublished, }, - listener::EventListener, }, indexer::{DataStore, EnclaveIndexer, SharedStore}, }; @@ -40,11 +37,12 @@ use tokio::time::sleep; type Result = std::result::Result>; pub async fn register_e3_requested( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // E3Requested indexer - .add_event_handler(move |event: E3Requested, store| { + .add_event_handler(move |event: E3Requested, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); @@ -182,11 +180,12 @@ pub async fn register_e3_requested( } pub async fn register_e3_activated( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // E3Activated indexer - .add_event_handler(move |event: E3Activated, store| { + .add_event_handler(move |event: E3Activated, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); let mut current_round_repo = CurrentRoundRepository::new(store); @@ -272,11 +271,12 @@ pub async fn register_e3_activated( } pub async fn register_ciphertext_output_published( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // CiphertextOutputPublished indexer - .add_event_handler(move |event: CiphertextOutputPublished, store| { + .add_event_handler(move |event: CiphertextOutputPublished, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store, e3_id); async move { @@ -290,11 +290,12 @@ pub async fn register_ciphertext_output_published( } pub async fn register_plaintext_output_published( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // PlaintextOutputPublished indexer - .add_event_handler(move |event: PlaintextOutputPublished, store| { + .add_event_handler(move |event: PlaintextOutputPublished, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store, e3_id); async move { @@ -332,14 +333,13 @@ pub async fn register_plaintext_output_published( } pub async fn register_committee_published( - mut listener: EventListener, - contract: EnclaveContract, -) -> Result { + indexer: EnclaveIndexer, +) -> Result> { // CommitteePublished - listener - .add_event_handler(move |event: CommitteePublished| { - let contract = contract.clone(); + indexer + .add_event_handler(move |event: CommitteePublished, ctx| { async move { + let contract = ctx.contract(); // We need to do this to ensure this is idempotent. // TODO: conserve bandwidth and check for E3AlreadyActivated error instead of // making two calls to contract @@ -386,7 +386,7 @@ pub async fn register_committee_published( } }) .await; - Ok(listener) + Ok(indexer) } pub async fn get_current_timestamp_rpc() -> eyre::Result { @@ -400,13 +400,12 @@ pub async fn get_current_timestamp_rpc() -> eyre::Result { } pub async fn register_input_published( - mut listener: EventListener, - store: SharedStore -) -> Result { - listener - .add_event_handler(move |event: InputPublished| { - + indexer: EnclaveIndexer, +) -> Result> { + indexer + .add_event_handler(move |event: InputPublished, ctx| { let e3_id = event.e3Id.to::(); + let store = ctx.store(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); async move { println!( @@ -422,7 +421,7 @@ pub async fn register_input_published( } }) .await; - Ok(listener) + Ok(indexer) } pub async fn start_indexer( @@ -433,34 +432,24 @@ pub async fn start_indexer( store: SharedStore, private_key: &str, ) -> Result<()> { - let readonly_contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; + info!("CRISP: Creating indexer..."); + let crisp_indexer = EnclaveIndexer::new_with_write_contract( + ws_url, + &[contract_address, registry_address, crisp_address], + store, + private_key, + ) + .await?; + info!("CRISP: Indexer registering handlers..."); - let readwrite_contract = - EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; - - let enclave_contract_listener = - EventListener::create_contract_listener(ws_url, contract_address).await?; - - // CRISP indexer - let crisp_indexer = - EnclaveIndexer::new(enclave_contract_listener, readonly_contract, store.clone()).await?; let crisp_indexer = register_e3_requested(crisp_indexer).await?; let crisp_indexer = register_e3_activated(crisp_indexer).await?; let crisp_indexer = register_ciphertext_output_published(crisp_indexer).await?; let crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; - crisp_indexer.start(); - - // Registry Listener - let registry_contract_listener = - EventListener::create_contract_listener(&ws_url, registry_address).await?; - let registry_listener = - register_committee_published(registry_contract_listener, readwrite_contract.clone()).await?; - registry_listener.start(); - - // CRISP Listener - let crisp_contract_listener = EventListener::create_contract_listener(ws_url, crisp_address).await?; - let crisp_listener = register_input_published(crisp_contract_listener, store).await?; - crisp_listener.start(); - + let crisp_indexer = register_committee_published(crisp_indexer).await?; + let crisp_indexer = register_input_published(crisp_indexer).await?; + info!("CRISP: Indexer finished registering handlers!"); + crisp_indexer.listen().await?; + info!("CRISP: Indexer listen loop has finished!"); Ok(()) }