From 0d5d242e57f795e236212bdd8faaee1237f88869 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 02:51:32 +0000 Subject: [PATCH 01/23] prefactor integrate registry --- crates/evm-helpers/src/contracts.rs | 29 +- crates/evm-helpers/src/listener.rs | 10 +- crates/evm-helpers/tests/integration.rs | 8 +- crates/indexer/src/indexer.rs | 346 ++++++++++++-------- examples/CRISP/server/src/server/indexer.rs | 72 ++-- 5 files changed, 260 insertions(+), 205 deletions(-) diff --git a/crates/evm-helpers/src/contracts.rs b/crates/evm-helpers/src/contracts.rs index e56eddd510..2fae264586 100644 --- a/crates/evm-helpers/src/contracts.rs +++ b/crates/evm-helpers/src/contracts.rs @@ -164,8 +164,8 @@ pub trait EnclaveWrite { } /// Generic type to represent different provider types -pub trait ProviderType: Send { - type Provider: Provider + Send + Sync + 'static; +pub trait ProviderType: Clone + Send + Sync + 'static { + type Provider: Provider + Clone + Send + Sync + 'static; } /// Marker type for read-only provider @@ -190,6 +190,15 @@ pub struct EnclaveContract { _marker: PhantomData, } +impl EnclaveContract { + pub fn address(&self) -> &Address { + &self.contract_address + } + pub fn get_provider(&self) -> Arc { + self.provider.clone() + } +} + impl EnclaveContract { pub async fn new( http_rpc_url: &str, @@ -198,14 +207,6 @@ impl EnclaveContract { ) -> Result> { EnclaveContractFactory::create_write(http_rpc_url, contract_address, private_key).await } - - pub fn get_provider(&self) -> Arc { - self.provider.clone() - } - - pub fn address(&self) -> &Address { - &self.contract_address - } } impl EnclaveContract { @@ -215,14 +216,6 @@ impl EnclaveContract { ) -> Result> { EnclaveContractFactory::create_read(http_rpc_url, contract_address).await } - - pub fn get_provider(&self) -> Arc { - self.provider.clone() - } - - pub fn address(&self) -> &Address { - &self.contract_address - } } /// Type alias for read-only provider diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index ccb00b1620..27105c36e4 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -36,7 +36,7 @@ impl EventListener { } } - pub async fn add_event_handler(&mut self, handler: F) + pub async fn add_event_handler(&self, handler: F) where E: SolEvent + Send + Clone + 'static, F: Fn(E) -> Fut + Send + Sync + 'static, @@ -94,9 +94,13 @@ impl EventListener { tokio::spawn(async move { this.listen().await }) } - pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result { + pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); - let address = contract_address.parse::
()?; + + let address = addresses + .iter() + .map(|a| a.parse::
().map_err(|e| eyre::eyre!("{e}"))) + .collect::>>()?; let filter = Filter::new() .address(address) .from_block(BlockNumberOrTag::Latest); diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index dc2c4d5c48..9b04bc3817 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -25,9 +25,9 @@ async fn test_event_listener() -> Result<()> { let (tx, mut rx) = tokio::sync::mpsc::channel::(10); let (tx_addr, mut rx_addr) = tokio::sync::mpsc::channel::(10); - let mut event_listener = EventListener::create_contract_listener( + let event_listener = EventListener::create_contract_listener( &anvil.ws_endpoint(), - &contract.address().to_string(), + &[&contract.address().to_string()], ) .await?; @@ -104,9 +104,9 @@ async fn test_overlapping_listener_handlers() -> Result<()> { let (contract, _, _, anvil) = setup_logs_contract().await?; let (tx, mut rx) = tokio::sync::mpsc::channel::(10); - let mut event_listener = EventListener::create_contract_listener( + let event_listener = EventListener::create_contract_listener( &anvil.ws_endpoint(), - &contract.address().to_string(), + &[&contract.address().to_string()], ) .await?; diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 4d6c8a8c9b..c3295e2236 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -13,7 +13,9 @@ use alloy::providers::Provider; use alloy::sol_types::SolEvent; use async_trait::async_trait; use e3_evm_helpers::{ - contracts::{EnclaveContract, EnclaveContractFactory, EnclaveRead, ReadOnly}, + contracts::{ + EnclaveContract, EnclaveContractFactory, EnclaveRead, ProviderType, ReadOnly, ReadWrite, + }, events::{CiphertextOutputPublished, E3Activated, InputPublished, PlaintextOutputPublished}, listener::EventListener, }; @@ -143,38 +145,130 @@ impl DataStore for SharedStore { } #[derive(Clone)] -pub struct EnclaveIndexer { +pub struct EnclaveIndexer { listener: EventListener, - contract: EnclaveContract, + contract: EnclaveContract, store: Arc>, contract_address: String, chain_id: u64, } -impl EnclaveIndexer { +#[derive(Clone)] +pub struct IndexerContext { + store: SharedStore, + listener: EventListener, + contract: EnclaveContract, + contract_address: String, + chain_id: u64, +} + +impl IndexerContext { + pub fn from_indexer(indexer: &EnclaveIndexer) -> Self { + Self { + store: SharedStore::new(indexer.store.clone()), + contract: indexer.contract.clone(), + listener: indexer.listener.clone(), + contract_address: indexer.contract_address.clone(), + chain_id: indexer.chain_id, + } + } + + pub fn store(&self) -> SharedStore { + self.store.clone() + } + + pub fn listener(&self) -> EventListener { + self.listener.clone() + } + pub fn contract(&self) -> EnclaveContract { + self.contract.clone() + } + pub fn enclave_address(&self) -> String { + self.contract_address.clone() + } + + pub fn chain_id(&self) -> u64 { + self.chain_id + } +} + +impl EnclaveIndexer { pub async fn new_with_in_mem_store( listener: EventListener, - contract: EnclaveContract, - ) -> Result> { + contract: EnclaveContract, + ) -> Result> { let store = InMemoryStore::new(); EnclaveIndexer::new(listener, contract, store).await } +} +impl EnclaveIndexer { pub async fn from_endpoint_address_in_mem( ws_url: &str, contract_address: &str, - ) -> Result> { - let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; + ) -> Result { + let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + } + pub async fn from_endpoint_address( + ws_url: &str, + contract_address: &str, + store: InMemoryStore, + ) -> Result { + let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; + let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; + EnclaveIndexer::new(listener, contract, store).await + } +} + +impl EnclaveIndexer { + pub async fn from_endpoint_address_in_mem_write( + ws_url: &str, + contract_address: &str, + private_key: &str, + ) -> Result { + let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; + let contract = + EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; + EnclaveIndexer::::new_with_in_mem_store(listener, contract).await + } + pub async fn from_endpoint_address_write( + ws_url: &str, + contract_address: &str, + private_key: &str, + store: InMemoryStore, + ) -> Result { + let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; + let contract = + EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; + EnclaveIndexer::new(listener, contract, store).await + } +} + +impl EnclaveIndexer { + pub async fn new_write( + ws_url: &str, + contract_address: &str, + registry_address: &str, + store: S, + private_key: &str, + ) -> Result { + EnclaveIndexer::new( + EventListener::create_contract_listener(ws_url, &[contract_address, registry_address]) + .await?, + EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?, + store, + ) + .await } } -impl EnclaveIndexer { +impl EnclaveIndexer { pub async fn new( listener: EventListener, - contract: EnclaveContract, + contract: EnclaveContract, store: S, ) -> Result { let chain_id = contract.provider.get_chain_id().await?; @@ -190,160 +284,132 @@ impl EnclaveIndexer { Ok(instance) } - pub async fn from_endpoint_address( - ws_url: &str, - contract_address: &str, - store: S, - ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - EnclaveIndexer::new(listener, contract, store).await - } - - pub async fn add_event_handler(&mut self, handler: F) + pub async fn add_event_handler(&self, handler: F) where E: SolEvent + Send + Clone + 'static, - F: Fn(E, SharedStore) -> Fut + Send + Sync + 'static, + F: Fn(E, Arc>) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - let store = SharedStore::new(self.store.clone()); let handler = Arc::new(handler); + let ctx = Arc::new(IndexerContext::from_indexer(self)); self.listener .add_event_handler(move |e: E| { let handler = Arc::clone(&handler); - let store = store.clone(); - async move { handler(e, store).await } + let ctx = ctx.clone(); + async move { handler(e, ctx).await } }) .await; } async fn register_e3_activated(&mut self) -> Result<()> { - let db = self.store.clone(); - let contract = self.contract.clone(); - let chain_id = self.chain_id; - let enclave_address = self.contract_address.clone(); - self.listener - .add_event_handler(move |e: E3Activated| { - let db = SharedStore::new(db.clone()); - let enclave_address = enclave_address.clone(); - let contract = contract.clone(); - - async move { - println!( - "E3Activated: id={}, expiration={}, pubkey=0x{}...", - e.e3Id, - e.expiration, - hex::encode(&e.committeePublicKey[..8.min(e.committeePublicKey.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - let e3 = contract.get_e3(e.e3Id).await?; - let duration = u64_try_from(e3.duration)?; - let expiration = u64_try_from(e.expiration)?; - let seed = e3.seed.to_be_bytes(); - let request_block = u64_try_from(e3.requestBlock)?; - let start_window = [ - u64_try_from(e3.startWindow[0])?, - u64_try_from(e3.startWindow[1])?, - ]; - // NOTE: we are only saving protocol specific info - // here and not CRISP specific info so E3 corresponds to the solidity E3 - let e3_obj = E3 { - chain_id, - ciphertext_inputs: vec![], - ciphertext_output: vec![], - committee_public_key: e.committeePublicKey.to_vec(), - duration, - custom_params: e3.customParams.to_vec(), - e3_params: e3.e3ProgramParams.to_vec(), - enclave_address, - encryption_scheme_id: e3.encryptionSchemeId.to_vec(), - expiration, - id: e3_id, - plaintext_output: vec![], - request_block, - seed, - start_window, - threshold: e3.threshold, - }; - - let mut repo = E3Repository::new(db, e3_id); - - repo.set_e3(e3_obj).await?; - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: E3Activated, ctx| { + async move { + let contract = ctx.contract(); + let db = ctx.store(); + let enclave_address = ctx.enclave_address(); + println!( + "E3Activated: id={}, expiration={}, pubkey=0x{}...", + e.e3Id, + e.expiration, + hex::encode(&e.committeePublicKey[..8.min(e.committeePublicKey.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + let e3 = contract.get_e3(e.e3Id).await?; + let duration = u64_try_from(e3.duration)?; + let expiration = u64_try_from(e.expiration)?; + let seed = e3.seed.to_be_bytes(); + let request_block = u64_try_from(e3.requestBlock)?; + let start_window = [ + u64_try_from(e3.startWindow[0])?, + u64_try_from(e3.startWindow[1])?, + ]; + // NOTE: we are only saving protocol specific info + // here and not CRISP specific info so E3 corresponds to the solidity E3 + let e3_obj = E3 { + chain_id: ctx.chain_id(), + ciphertext_inputs: vec![], + ciphertext_output: vec![], + committee_public_key: e.committeePublicKey.to_vec(), + duration, + custom_params: e3.customParams.to_vec(), + e3_params: e3.e3ProgramParams.to_vec(), + enclave_address, + encryption_scheme_id: e3.encryptionSchemeId.to_vec(), + expiration, + id: e3_id, + plaintext_output: vec![], + request_block, + seed, + start_window, + threshold: e3.threshold, + }; + + let mut repo = E3Repository::new(db, e3_id); + + repo.set_e3(e3_obj).await?; + Ok(()) + } + }) + .await; Ok(()) } async fn register_input_published(&mut self) -> Result<()> { - let store = self.store.clone(); - self.listener - .add_event_handler(move |e: InputPublished| { - let store = SharedStore::new(store.clone()); - async move { - println!( - "InputPublished: e3_id={}, index={}, data=0x{}...", - e.e3Id, - e.index, - hex::encode(&e.data[..8.min(e.data.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - - let mut repo = E3Repository::new(store, e3_id); - repo.insert_ciphertext_input(e.data.to_vec(), e.index.to::()) - .await?; - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: InputPublished, ctx| async move { + let store = ctx.store(); + println!( + "InputPublished: e3_id={}, index={}, data=0x{}...", + e.e3Id, + e.index, + hex::encode(&e.data[..8.min(e.data.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + + let mut repo = E3Repository::new(store, e3_id); + repo.insert_ciphertext_input(e.data.to_vec(), e.index.to::()) + .await?; + Ok(()) + }) + .await; Ok(()) } async fn register_ciphertext_output_published(&mut self) -> Result<()> { - let store = self.store.clone(); - self.listener - .add_event_handler(move |e: CiphertextOutputPublished| { - let store = SharedStore::new(store.clone()); - async move { - println!( - "CiphertextOutputPublished: e3_id={}, output=0x{}...", - e.e3Id, - hex::encode(&e.ciphertextOutput[..8.min(e.ciphertextOutput.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - - let mut repo = E3Repository::new(store, e3_id); - repo.set_ciphertext_output(e.ciphertextOutput.to_vec()) - .await?; - - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: CiphertextOutputPublished, ctx| async move { + let store = ctx.store(); + println!( + "CiphertextOutputPublished: e3_id={}, output=0x{}...", + e.e3Id, + hex::encode(&e.ciphertextOutput[..8.min(e.ciphertextOutput.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + + let mut repo = E3Repository::new(store, e3_id); + repo.set_ciphertext_output(e.ciphertextOutput.to_vec()) + .await?; + + Ok(()) + }) + .await; Ok(()) } async fn register_plaintext_output_published(&mut self) -> Result<()> { - let store = self.store.clone(); - self.listener - .add_event_handler(move |e: PlaintextOutputPublished| { - let store = SharedStore::new(store.clone()); - async move { - println!( - "PlaintextOutputPublished: e3_id={}, output=0x{}...", - e.e3Id, - hex::encode(&e.plaintextOutput[..8.min(e.plaintextOutput.len())]) - ); - let e3_id = u64_try_from(e.e3Id)?; - let mut repo = E3Repository::new(store, e3_id); - repo.set_plaintext_output(e.plaintextOutput.to_vec()) - .await?; - - Ok(()) - } - }) - .await; + self.add_event_handler(move |e: PlaintextOutputPublished, ctx| async move { + let store = ctx.store(); + println!( + "PlaintextOutputPublished: e3_id={}, output=0x{}...", + e.e3Id, + hex::encode(&e.plaintextOutput[..8.min(e.plaintextOutput.len())]) + ); + let e3_id = u64_try_from(e.e3Id)?; + let mut repo = E3Repository::new(store, e3_id); + repo.set_plaintext_output(e.plaintextOutput.to_vec()) + .await?; + + Ok(()) + }) + .await; Ok(()) } diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 2e029b5f55..3b926e64cd 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -18,14 +18,11 @@ use alloy_primitives::{Address, U256}; use e3_sdk::{ bfv_helpers::decode_bytes_to_vec_u64, evm_helpers::{ - contracts::{ - EnclaveContract, EnclaveContractFactory, EnclaveRead, EnclaveWrite, ReadWrite, - }, + contracts::{EnclaveRead, EnclaveWrite, ReadWrite}, events::{ CiphertextOutputPublished, CommitteePublished, E3Activated, E3Requested, PlaintextOutputPublished, }, - listener::EventListener, }, indexer::{DataStore, EnclaveIndexer}, }; @@ -40,11 +37,12 @@ use tokio::time::sleep; type Result = std::result::Result>; pub async fn register_e3_requested( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // E3Requested indexer - .add_event_handler(move |event: E3Requested, store| { + .add_event_handler(move |event: E3Requested, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); @@ -179,11 +177,12 @@ pub async fn register_e3_requested( } pub async fn register_e3_activated( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // E3Activated indexer - .add_event_handler(move |event: E3Activated, store| { + .add_event_handler(move |event: E3Activated, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); let mut current_round_repo = CurrentRoundRepository::new(store); @@ -267,11 +266,12 @@ pub async fn register_e3_activated( } pub async fn register_ciphertext_output_published( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // CiphertextOutputPublished indexer - .add_event_handler(move |event: CiphertextOutputPublished, store| { + .add_event_handler(move |event: CiphertextOutputPublished, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store, e3_id); async move { @@ -285,11 +285,12 @@ pub async fn register_ciphertext_output_published( } pub async fn register_plaintext_output_published( - mut indexer: EnclaveIndexer, -) -> Result> { + indexer: EnclaveIndexer, +) -> Result> { // PlaintextOutputPublished indexer - .add_event_handler(move |event: PlaintextOutputPublished, store| { + .add_event_handler(move |event: PlaintextOutputPublished, ctx| { + let store = ctx.store(); let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store, e3_id); async move { @@ -327,14 +328,13 @@ pub async fn register_plaintext_output_published( } pub async fn register_committee_published( - mut listener: EventListener, - contract: EnclaveContract, -) -> Result { + indexer: EnclaveIndexer, +) -> Result> { // CommitteePublished - listener - .add_event_handler(move |event: CommitteePublished| { - let contract = contract.clone(); + indexer + .add_event_handler(move |event: CommitteePublished, ctx| { async move { + let contract = ctx.contract(); // We need to do this to ensure this is idempotent. // TODO: conserve bandwidth and check for E3AlreadyActivated error instead of // making two calls to contract @@ -381,7 +381,7 @@ pub async fn register_committee_published( } }) .await; - Ok(listener) + Ok(indexer) } pub async fn get_current_timestamp_rpc() -> eyre::Result { @@ -401,29 +401,21 @@ pub async fn start_indexer( store: impl DataStore, private_key: &str, ) -> Result<()> { - let readonly_contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - - let readwrite_contract = - EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; - - let enclave_contract_listener = - EventListener::create_contract_listener(ws_url, contract_address).await?; - // CRISP indexer - let crisp_indexer = - EnclaveIndexer::new(enclave_contract_listener, readonly_contract, store).await?; + let crisp_indexer = EnclaveIndexer::new_write( + ws_url, + contract_address, + registry_address, + store, + private_key, + ) + .await?; let crisp_indexer = register_e3_requested(crisp_indexer).await?; let crisp_indexer = register_e3_activated(crisp_indexer).await?; let crisp_indexer = register_ciphertext_output_published(crisp_indexer).await?; let crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; + let crisp_indexer = register_committee_published(crisp_indexer).await?; crisp_indexer.start(); - // Registry Listener - let registry_contract_listener = - EventListener::create_contract_listener(&ws_url, registry_address).await?; - let registry_listener = - register_committee_published(registry_contract_listener, readwrite_contract).await?; - registry_listener.start(); - Ok(()) } From d34c6133902a81b63eb31293d681dfeabe46a839 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 13:45:04 +0000 Subject: [PATCH 02/23] fix test --- crates/indexer/tests/integration.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index ff892c5f0f..950ef72850 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -9,6 +9,7 @@ use alloy::{ primitives::{Bytes, Uint}, sol, }; +use e3_evm_helpers::contracts::ReadOnly; use e3_indexer::{DataStore, EnclaveIndexer, InMemoryStore}; use eyre::Result; use helpers::setup_fake_enclave; @@ -28,11 +29,14 @@ async fn test_indexer() -> Result<()> { let address = address.to_string(); let endpoint = endpoint.to_string(); - let mut indexer = - EnclaveIndexer::::from_endpoint_address_in_mem(&endpoint, &address).await?; + let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( + &endpoint, &address, + ) + .await?; indexer - .add_event_handler(move |_: InputPublished, mut store| async move { + .add_event_handler(move |_: InputPublished, ctx| async move { + let mut store = ctx.store(); store .modify("input_count", |counter: Option| { Some(counter.map_or(1, |c| c + 1)) From 8607483efbb05a473f56a7aaec44632dfbc230a3 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 14:19:54 +0000 Subject: [PATCH 03/23] test multiple contracts --- crates/indexer/src/indexer.rs | 6 ++-- crates/indexer/tests/helpers.rs | 19 +++++++++++ crates/indexer/tests/integration.rs | 49 ++++++++++++++++++++++++++--- 3 files changed, 66 insertions(+), 8 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index c3295e2236..3ee2d6d3e9 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -206,10 +206,10 @@ impl EnclaveIndexer { impl EnclaveIndexer { pub async fn from_endpoint_address_in_mem( ws_url: &str, - contract_address: &str, + contract_address: &[&str], ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; + let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; + let contract = EnclaveContractFactory::create_read(ws_url, contract_address[0]).await?; EnclaveIndexer::::new_with_in_mem_store(listener, contract).await } pub async fn from_endpoint_address( diff --git a/crates/indexer/tests/helpers.rs b/crates/indexer/tests/helpers.rs index 8e24dfaa6f..43dfdb1d7b 100644 --- a/crates/indexer/tests/helpers.rs +++ b/crates/indexer/tests/helpers.rs @@ -4,6 +4,8 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use std::sync::Arc; + // helpers.rs use alloy::{ network::Ethereum, @@ -40,6 +42,23 @@ pub async fn setup_fake_enclave() -> Result<( Ok((contract, address, endpoint, anvil)) } +pub async fn setup_two_contracts() -> Result<( + EnclaveInstance, + String, + EmitLogsInstance, + String, + String, + AnvilInstance, +)> { + let (provider, endpoint, anvil) = setup_provider().await?; + let provider = Arc::new(provider); + let contract1 = Enclave::deploy(provider.clone()).await?; + let contract2 = EmitLogsInstance::deploy(provider.clone()).await?; + let address1 = contract1.address().to_string(); + let address2 = contract2.address().to_string(); + Ok((contract1, address1, contract2, address2, endpoint, anvil)) +} + pub async fn setup_provider() -> Result<(impl Provider, String, AnvilInstance)> { // Set anvil with fast blocktimes for testing let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?; diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 950ef72850..759607bf1c 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -12,9 +12,13 @@ use alloy::{ use e3_evm_helpers::contracts::ReadOnly; use e3_indexer::{DataStore, EnclaveIndexer, InMemoryStore}; use eyre::Result; -use helpers::setup_fake_enclave; -use std::time::Duration; +use helpers::setup_two_contracts; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::time::sleep; +use EmitLogs::PublishMessage; use Enclave::InputPublished; sol!( @@ -23,17 +27,26 @@ sol!( "tests/fixtures/fake_enclave.json" ); +sol!( + #[sol(rpc)] + EmitLogs, + "tests/fixtures/emit_logs.json" +); + #[tokio::test] async fn test_indexer() -> Result<()> { - let (contract, address, endpoint, _anvil) = setup_fake_enclave().await?; + let (contract, address, contract2, address2, endpoint, _anvil) = setup_two_contracts().await?; let address = address.to_string(); let endpoint = endpoint.to_string(); let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( - &endpoint, &address, + &endpoint, + &[&address, &address2], ) .await?; + let published: Arc>> = Arc::new(Mutex::new(vec![])); + indexer .add_event_handler(move |_: InputPublished, ctx| async move { let mut store = ctx.store(); @@ -47,6 +60,18 @@ async fn test_indexer() -> Result<()> { }) .await; + let published_clone = published.clone(); + indexer + .add_event_handler(move |msg: PublishMessage, _ctx| { + let published_clone = published_clone.clone(); + async move { + let mut guard = published_clone.lock().unwrap(); + guard.push(msg.value); + Ok(()) + } + }) + .await; + // Start tracking state let _ = indexer.start(); @@ -79,6 +104,15 @@ async fn test_indexer() -> Result<()> { .watch() .await?; + // Here we check that we can emit events on a second contract and have the indexer still listen + // for it + contract2 + .emitPublishMessage("Hello from contract2!".into()) + .send() + .await? + .watch() + .await?; + contract .emitInputPublished( Uint::from(e3_id), @@ -104,7 +138,12 @@ async fn test_indexer() -> Result<()> { .await?; sleep(Duration::from_millis(10)).await; - + let published_clone = published.clone(); + let published_guard = published_clone.lock().unwrap(); + assert_eq!( + published_guard.iter().cloned().collect::>(), + vec!["Hello from contract2!".to_string()] + ); assert_eq!(indexer.get_e3(e3_id).await?.ciphertext_inputs.len(), 3); assert_eq!( indexer.get_e3(e3_id).await?.ciphertext_inputs, From c400e00658f4e1ea8c2a4e3e8e3920de526a15eb Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 14:25:37 +0000 Subject: [PATCH 04/23] update tests --- crates/indexer/tests/integration.rs | 130 ++++++++++++++++------------ 1 file changed, 76 insertions(+), 54 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 759607bf1c..745e8faf3b 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -35,18 +35,26 @@ sol!( #[tokio::test] async fn test_indexer() -> Result<()> { - let (contract, address, contract2, address2, endpoint, _anvil) = setup_two_contracts().await?; - let address = address.to_string(); - let endpoint = endpoint.to_string(); + const E3_ID: u64 = 10; + const THRESHOLD: u64 = 10; + const INDEXER_DELAY_MS: u64 = 10; + + let ( + enclave_contract, + enclave_address, + emit_logs_contract, + emit_logs_address, + endpoint, + _anvil, + ) = setup_two_contracts().await?; let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( - &endpoint, - &[&address, &address2], + &endpoint.to_string(), + &[&enclave_address.to_string(), &emit_logs_address.to_string()], ) .await?; - let published: Arc>> = Arc::new(Mutex::new(vec![])); - + // Track InputPublished event count in store indexer .add_event_handler(move |_: InputPublished, ctx| async move { let mut store = ctx.store(); @@ -55,47 +63,47 @@ async fn test_indexer() -> Result<()> { Some(counter.map_or(1, |c| c + 1)) }) .await?; - Ok(()) }) .await; - let published_clone = published.clone(); + // Collect PublishMessage events + let captured_messages: Arc>> = Arc::new(Mutex::new(vec![])); + let captured_messages_for_handler = captured_messages.clone(); + indexer .add_event_handler(move |msg: PublishMessage, _ctx| { - let published_clone = published_clone.clone(); + // Collect message + let messages = captured_messages_for_handler.clone(); async move { - let mut guard = published_clone.lock().unwrap(); - guard.push(msg.value); + messages.lock().unwrap().push(msg.value); Ok(()) } }) .await; - // Start tracking state let _ = indexer.start(); - // E3Activated - let e3_id = 10; + let public_key = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; + let input_data = "Random data that wont actually be a string".to_string(); + let input_data_bytes = Bytes::from(input_data.clone().into_bytes()); + let ciphertext_output_data = vec![9, 8, 7, 6, 5, 4, 3, 2, 1]; - let pubkey = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; - contract + enclave_contract .emitE3Activated( - Uint::from(e3_id), - Uint::from(10), - Bytes::from(pubkey.clone()), + Uint::from(E3_ID), + Uint::from(THRESHOLD), + Bytes::from(public_key.clone()), ) .send() .await? .watch() .await?; - // InputPublished - let data = "Random data that wont actually be a string".to_string(); - contract + enclave_contract .emitInputPublished( - Uint::from(e3_id), - Bytes::from(data.clone().into_bytes()), + Uint::from(E3_ID), + input_data_bytes.clone(), Uint::from(1111), Uint::from(1), ) @@ -104,19 +112,18 @@ async fn test_indexer() -> Result<()> { .watch() .await?; - // Here we check that we can emit events on a second contract and have the indexer still listen - // for it - contract2 + // Sending message from logs contract which indexer is listening to + emit_logs_contract .emitPublishMessage("Hello from contract2!".into()) .send() .await? .watch() .await?; - contract + enclave_contract .emitInputPublished( - Uint::from(e3_id), - Bytes::from(data.clone().into_bytes()), + Uint::from(E3_ID), + input_data_bytes.clone(), Uint::from(2222), Uint::from(2), ) @@ -125,10 +132,10 @@ async fn test_indexer() -> Result<()> { .watch() .await?; - contract + enclave_contract .emitInputPublished( - Uint::from(e3_id), - Bytes::from(data.clone().into_bytes()), + Uint::from(E3_ID), + input_data_bytes.clone(), Uint::from(3333), Uint::from(3), ) @@ -137,39 +144,54 @@ async fn test_indexer() -> Result<()> { .watch() .await?; - sleep(Duration::from_millis(10)).await; - let published_clone = published.clone(); - let published_guard = published_clone.lock().unwrap(); + sleep(Duration::from_millis(INDEXER_DELAY_MS)).await; + + let messages_from_second_contract = captured_messages.lock().unwrap(); assert_eq!( - published_guard.iter().cloned().collect::>(), + messages_from_second_contract + .iter() + .cloned() + .collect::>(), vec!["Hello from contract2!".to_string()] ); - assert_eq!(indexer.get_e3(e3_id).await?.ciphertext_inputs.len(), 3); + drop(messages_from_second_contract); + + let e3_state = indexer.get_e3(E3_ID).await?; + let expected_input_count = 3; + assert_eq!( - indexer.get_e3(e3_id).await?.ciphertext_inputs, - vec![ - (Bytes::from(data.clone().into_bytes()).to_vec(), 1), - (Bytes::from(data.clone().into_bytes()).to_vec(), 2), - (Bytes::from(data.clone().into_bytes()).to_vec(), 3), - ] + e3_state.ciphertext_inputs.len(), + expected_input_count as usize ); - let ciphertext_output = vec![9, 8, 7, 6, 5, 4, 3, 2, 1]; - contract - .emitCiphertextOutputPublished(Uint::from(e3_id), Bytes::from(ciphertext_output.clone())) + let expected_inputs = vec![ + (input_data_bytes.to_vec(), 1), + (input_data_bytes.to_vec(), 2), + (input_data_bytes.to_vec(), 3), + ]; + assert_eq!(e3_state.ciphertext_inputs, expected_inputs); + + enclave_contract + .emitCiphertextOutputPublished( + Uint::from(E3_ID), + Bytes::from(ciphertext_output_data.clone()), + ) .send() .await? .watch() .await?; - sleep(Duration::from_millis(10)).await; - - let e3 = indexer.get_e3(e3_id).await?; + sleep(Duration::from_millis(INDEXER_DELAY_MS)).await; - assert_eq!(e3.ciphertext_output, ciphertext_output); + let e3_state_after_output = indexer.get_e3(E3_ID).await?; + assert_eq!( + e3_state_after_output.ciphertext_output, + ciphertext_output_data + ); let store = indexer.get_store(); - let val = store.get::("input_count").await?.unwrap(); - assert_eq!(val, 3); + let total_inputs_processed = store.get::("input_count").await?.unwrap(); + assert_eq!(total_inputs_processed, expected_input_count); + Ok(()) } From 2dd4832781a9225ebd525670eb90dbf13a9cc87b Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 14:40:34 +0000 Subject: [PATCH 05/23] update comments --- crates/evm-helpers/src/listener.rs | 1 + crates/indexer/src/indexer.rs | 44 ++++++++++-------------------- 2 files changed, 15 insertions(+), 30 deletions(-) diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index 27105c36e4..3a8aaf29cf 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -94,6 +94,7 @@ impl EventListener { tokio::spawn(async move { this.listen().await }) } + /// Create a contract listener that will listen to events from all addresses. pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 3ee2d6d3e9..12f54bbf4d 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -204,50 +204,34 @@ impl EnclaveIndexer { } impl EnclaveIndexer { + /// Creates an `EnclaveIndexer` with an in-memory store. + /// + /// Note: `contract_addresses[0]` must be the enclave contract address. pub async fn from_endpoint_address_in_mem( ws_url: &str, - contract_address: &[&str], + contract_addresses: &[&str], ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, contract_address).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address[0]).await?; + let listener = EventListener::create_contract_listener(ws_url, contract_addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, contract_addresses[0]).await?; EnclaveIndexer::::new_with_in_mem_store(listener, contract).await } - pub async fn from_endpoint_address( - ws_url: &str, - contract_address: &str, - store: InMemoryStore, - ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; - EnclaveIndexer::new(listener, contract, store).await - } -} -impl EnclaveIndexer { - pub async fn from_endpoint_address_in_mem_write( - ws_url: &str, - contract_address: &str, - private_key: &str, - ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; - let contract = - EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; - EnclaveIndexer::::new_with_in_mem_store(listener, contract).await - } - pub async fn from_endpoint_address_write( + /// Creates an `EnclaveIndexer` with a provided in-memory store. + /// + /// Note: `contract_addresses[0]` must be the enclave contract address. + pub async fn from_endpoint_address( ws_url: &str, - contract_address: &str, - private_key: &str, + contract_addresses: &[&str], store: InMemoryStore, ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, &[contract_address]).await?; - let contract = - EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; + let listener = EventListener::create_contract_listener(ws_url, contract_addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, contract_addresses[0]).await?; EnclaveIndexer::new(listener, contract, store).await } } impl EnclaveIndexer { + /// Creates a new EnclaveIndexer with a writeable contract. pub async fn new_write( ws_url: &str, contract_address: &str, From 254be0a8ff4d41d66e5fb495e0a7ddd99fc16546 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 14:46:35 +0000 Subject: [PATCH 06/23] rename constructor --- crates/indexer/src/indexer.rs | 2 +- examples/CRISP/server/src/server/indexer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 12f54bbf4d..aa0384d8f1 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -232,7 +232,7 @@ impl EnclaveIndexer { impl EnclaveIndexer { /// Creates a new EnclaveIndexer with a writeable contract. - pub async fn new_write( + pub async fn new_with_write_contract( ws_url: &str, contract_address: &str, registry_address: &str, diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 3b926e64cd..05dd959c89 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -402,7 +402,7 @@ pub async fn start_indexer( private_key: &str, ) -> Result<()> { // CRISP indexer - let crisp_indexer = EnclaveIndexer::new_write( + let crisp_indexer = EnclaveIndexer::new_with_write_contract( ws_url, contract_address, registry_address, From 421a92e45088fe4c0d57bf120be710c3dab23da1 Mon Sep 17 00:00:00 2001 From: ryardley Date: Sun, 23 Nov 2025 16:56:49 +0000 Subject: [PATCH 07/23] add comment --- crates/indexer/src/indexer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index aa0384d8f1..e213fc4b76 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -268,6 +268,8 @@ impl EnclaveIndexer { Ok(instance) } + /// Listen for contract events from all contracts. + /// Callback will provide the event and a context object. pub async fn add_event_handler(&self, handler: F) where E: SolEvent + Send + Clone + 'static, From c82cbae48abf4dba115803ff44ccfefab944a139 Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 15:47:59 +0000 Subject: [PATCH 08/23] add test to demonstrate memory leak --- crates/indexer/tests/integration.rs | 117 ++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 745e8faf3b..ec84cb6e2c 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -195,3 +195,120 @@ async fn test_indexer() -> Result<()> { Ok(()) } + +mod memory_leak { + + use e3_evm_helpers::{contracts::ProviderType, listener::EventListener}; + + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + // Track how many instances exist + static INDEXER_COUNT: AtomicUsize = AtomicUsize::new(0); + static LISTENER_COUNT: AtomicUsize = AtomicUsize::new(0); + static CONTEXT_COUNT: AtomicUsize = AtomicUsize::new(0); + + // Wrapper types that track creation/destruction + struct TrackedIndexer { + inner: EnclaveIndexer, + } + + impl Drop for TrackedIndexer { + fn drop(&mut self) { + INDEXER_COUNT.fetch_sub(1, Ordering::SeqCst); + println!("Dropped TrackedIndexer"); + } + } + + struct TrackedListener { + inner: EventListener, + } + + impl Drop for TrackedListener { + fn drop(&mut self) { + LISTENER_COUNT.fetch_sub(1, Ordering::SeqCst); + println!("Dropped TrackedListener"); + } + } + + impl Clone for TrackedListener { + fn clone(&self) -> Self { + LISTENER_COUNT.fetch_add(1, Ordering::SeqCst); + Self { + inner: self.inner.clone(), + } + } + } + + async fn create_test_indexer() -> Result> { + EnclaveIndexer::::from_endpoint_address_in_mem( + "ws://example.com", + &[""], + ) + .await + } + + #[tokio::test] + async fn test_memory_leak() -> Result<()> { + sol! { + #[derive(Debug)] + event TestEvent(); + + } + + let (_, enclave_address, _, _, endpoint, _anvil) = setup_two_contracts().await?; + + // Reset counters + INDEXER_COUNT.store(0, Ordering::SeqCst); + LISTENER_COUNT.store(0, Ordering::SeqCst); + CONTEXT_COUNT.store(0, Ordering::SeqCst); + + // Create indexer + let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( + &endpoint, + &[&enclave_address], + ) + .await?; + + INDEXER_COUNT.fetch_add(1, Ordering::SeqCst); + + println!("Created indexer"); + println!("Indexer count: {}", INDEXER_COUNT.load(Ordering::SeqCst)); + println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); + + // Add an event handler that captures context + indexer + .add_event_handler(|event: TestEvent, ctx| async move { + // This closure captures ctx, which contains a listener clone + println!("Event received: {:?}", event); + Ok(()) + }) + .await; + + println!("Added handler"); + println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); + + // Drop the indexer + drop(indexer); + + println!("Dropped indexer"); + println!("Indexer count: {}", INDEXER_COUNT.load(Ordering::SeqCst)); + println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); + + // Force garbage collection attempts + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Check if everything was cleaned up + assert_eq!( + INDEXER_COUNT.load(Ordering::SeqCst), + 0, + "Indexer was not dropped!" + ); + assert_eq!( + LISTENER_COUNT.load(Ordering::SeqCst), + 0, + "Listener was not dropped - MEMORY LEAK!" + ); + Ok(()) + } +} From f30db0d27d52acda26eb29120bfaa8ee66d963f6 Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 16:54:08 +0000 Subject: [PATCH 09/23] fix test --- crates/indexer/tests/integration.rs | 130 +++++++++++----------------- 1 file changed, 50 insertions(+), 80 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index ec84cb6e2c..23ccd1a69a 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -198,54 +198,42 @@ async fn test_indexer() -> Result<()> { mod memory_leak { - use e3_evm_helpers::{contracts::ProviderType, listener::EventListener}; + use e3_evm_helpers::{contracts::EnclaveContractFactory, listener::EventListener}; use super::*; use std::sync::atomic::{AtomicUsize, Ordering}; // Track how many instances exist - static INDEXER_COUNT: AtomicUsize = AtomicUsize::new(0); - static LISTENER_COUNT: AtomicUsize = AtomicUsize::new(0); - static CONTEXT_COUNT: AtomicUsize = AtomicUsize::new(0); + static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); + static CREATE_COUNT: AtomicUsize = AtomicUsize::new(0); - // Wrapper types that track creation/destruction - struct TrackedIndexer { - inner: EnclaveIndexer, - } - - impl Drop for TrackedIndexer { - fn drop(&mut self) { - INDEXER_COUNT.fetch_sub(1, Ordering::SeqCst); - println!("Dropped TrackedIndexer"); - } - } + #[derive(Clone)] + struct LeakDetector(Arc); - struct TrackedListener { - inner: EventListener, - } + struct DropCounter; - impl Drop for TrackedListener { + impl Drop for DropCounter { fn drop(&mut self) { - LISTENER_COUNT.fetch_sub(1, Ordering::SeqCst); - println!("Dropped TrackedListener"); + DROP_COUNT.fetch_add(1, Ordering::SeqCst); } } - impl Clone for TrackedListener { - fn clone(&self) -> Self { - LISTENER_COUNT.fetch_add(1, Ordering::SeqCst); - Self { - inner: self.inner.clone(), - } + impl LeakDetector { + fn new() -> Self { + CREATE_COUNT.fetch_add(1, Ordering::SeqCst); + Self(Arc::new(DropCounter)) } } - async fn create_test_indexer() -> Result> { - EnclaveIndexer::::from_endpoint_address_in_mem( - "ws://example.com", - &[""], - ) - .await + async fn create_indexer() -> Result> { + let (_, enclave_address, _, _, endpoint, _anvil) = setup_two_contracts().await?; + + // Create indexer + let listener = + EventListener::create_contract_listener(&endpoint, &[&enclave_address]).await?; + let contract = EnclaveContractFactory::create_read(&endpoint, &enclave_address).await?; + + EnclaveIndexer::::new_with_in_mem_store(listener, contract).await } #[tokio::test] @@ -256,59 +244,41 @@ mod memory_leak { } - let (_, enclave_address, _, _, endpoint, _anvil) = setup_two_contracts().await?; - - // Reset counters - INDEXER_COUNT.store(0, Ordering::SeqCst); - LISTENER_COUNT.store(0, Ordering::SeqCst); - CONTEXT_COUNT.store(0, Ordering::SeqCst); - - // Create indexer - let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( - &endpoint, - &[&enclave_address], - ) - .await?; - - INDEXER_COUNT.fetch_add(1, Ordering::SeqCst); - - println!("Created indexer"); - println!("Indexer count: {}", INDEXER_COUNT.load(Ordering::SeqCst)); - println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); - - // Add an event handler that captures context - indexer - .add_event_handler(|event: TestEvent, ctx| async move { - // This closure captures ctx, which contains a listener clone - println!("Event received: {:?}", event); - Ok(()) - }) - .await; - - println!("Added handler"); - println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); + DROP_COUNT.store(0, Ordering::SeqCst); + CREATE_COUNT.store(0, Ordering::SeqCst); + + { + // Add an event handler that captures context + let indexer = create_indexer().await?; + let detector = LeakDetector::new(); + + indexer + .add_event_handler(move |event: TestEvent, ctx| { + let _captured = detector.clone(); + async move { + // This closure captures ctx, which contains a listener clone + println!("Event received: {:?}", event); + Ok(()) + } + }) + .await; + } - // Drop the indexer - drop(indexer); + // Delay to ensure everything is dropped. + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - println!("Dropped indexer"); - println!("Indexer count: {}", INDEXER_COUNT.load(Ordering::SeqCst)); - println!("Listener count: {}", LISTENER_COUNT.load(Ordering::SeqCst)); + let created = CREATE_COUNT.load(Ordering::SeqCst); + let dropped = DROP_COUNT.load(Ordering::SeqCst); - // Force garbage collection attempts - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + println!("Created: {}, Dropped: {}", created, dropped); - // Check if everything was cleaned up + // This assertion will FAIL if there's a leak assert_eq!( - INDEXER_COUNT.load(Ordering::SeqCst), - 0, - "Indexer was not dropped!" - ); - assert_eq!( - LISTENER_COUNT.load(Ordering::SeqCst), - 0, - "Listener was not dropped - MEMORY LEAK!" + created, dropped, + "Memory leak detected! Created {} objects but only dropped {}", + created, dropped ); + Ok(()) } } From 7b697a51295ee1471d1bb32e9b1339c8de9c377a Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 16:56:05 +0000 Subject: [PATCH 10/23] ensure nothing is compiled away --- crates/indexer/tests/integration.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 23ccd1a69a..4a8a5c794b 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -210,6 +210,7 @@ mod memory_leak { #[derive(Clone)] struct LeakDetector(Arc); + #[derive(Debug)] struct DropCounter; impl Drop for DropCounter { @@ -253,8 +254,9 @@ mod memory_leak { let detector = LeakDetector::new(); indexer - .add_event_handler(move |event: TestEvent, ctx| { + .add_event_handler(move |event: TestEvent, _ctx| { let _captured = detector.clone(); + println!("{:?}", _captured.0); async move { // This closure captures ctx, which contains a listener clone println!("Event received: {:?}", event); From 002220cdbe491329d184558870ae52a77b179e65 Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 17:07:46 +0000 Subject: [PATCH 11/23] failing.. --- crates/indexer/src/indexer.rs | 16 ++++++++++++++-- crates/indexer/tests/integration.rs | 2 ++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index e213fc4b76..743e75156c 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -278,11 +278,23 @@ impl EnclaveIndexer { { let handler = Arc::new(handler); let ctx = Arc::new(IndexerContext::from_indexer(self)); + // In order to avoid a memory leak we create a weak reference here + let ctx_weak = Arc::downgrade(&ctx); + self.listener .add_event_handler(move |e: E| { let handler = Arc::clone(&handler); - let ctx = ctx.clone(); - async move { handler(e, ctx).await } + let ctx_weak = ctx_weak.clone(); + + async move { + // We check the weak reference if it can be upgraded + // if not it must have been destroyed + if let Some(ctx) = ctx_weak.upgrade() { + handler(e, ctx).await + } else { + Ok(()) + } + } }) .await; } diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 4a8a5c794b..2152fe22dd 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -34,6 +34,7 @@ sol!( ); #[tokio::test] +// #[ignore] async fn test_indexer() -> Result<()> { const E3_ID: u64 = 10; const THRESHOLD: u64 = 10; @@ -238,6 +239,7 @@ mod memory_leak { } #[tokio::test] + #[ignore] async fn test_memory_leak() -> Result<()> { sol! { #[derive(Debug)] From 9ebb6b25beb434f9a29c1a1f038e26d42be6fb93 Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 17:26:23 +0000 Subject: [PATCH 12/23] fix tests --- crates/indexer/src/indexer.rs | 43 +++++++++++------------------ crates/indexer/tests/integration.rs | 1 - 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 743e75156c..9d6de13d6b 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -146,14 +146,9 @@ impl DataStore for SharedStore { #[derive(Clone)] pub struct EnclaveIndexer { - listener: EventListener, - contract: EnclaveContract, - store: Arc>, - contract_address: String, - chain_id: u64, + ctx: Arc>, } -#[derive(Clone)] pub struct IndexerContext { store: SharedStore, listener: EventListener, @@ -163,16 +158,6 @@ pub struct IndexerContext { } impl IndexerContext { - pub fn from_indexer(indexer: &EnclaveIndexer) -> Self { - Self { - store: SharedStore::new(indexer.store.clone()), - contract: indexer.contract.clone(), - listener: indexer.listener.clone(), - contract_address: indexer.contract_address.clone(), - chain_id: indexer.chain_id, - } - } - pub fn store(&self) -> SharedStore { self.store.clone() } @@ -258,11 +243,13 @@ impl EnclaveIndexer { let chain_id = contract.provider.get_chain_id().await?; let contract_address = contract.address().to_string(); let mut instance = Self { - store: Arc::new(RwLock::new(store)), - contract, - listener, - contract_address, - chain_id, + ctx: Arc::new(IndexerContext { + store: SharedStore::new(Arc::new(RwLock::new(store))), + contract, + listener, + contract_address, + chain_id, + }), }; instance.setup_listeners().await?; Ok(instance) @@ -277,11 +264,12 @@ impl EnclaveIndexer { Fut: Future> + Send + 'static, { let handler = Arc::new(handler); - let ctx = Arc::new(IndexerContext::from_indexer(self)); + let ctx = self.ctx.clone(); // In order to avoid a memory leak we create a weak reference here let ctx_weak = Arc::downgrade(&ctx); - self.listener + self.ctx + .listener .add_event_handler(move |e: E| { let handler = Arc::clone(&handler); let ctx_weak = ctx_weak.clone(); @@ -292,6 +280,7 @@ impl EnclaveIndexer { if let Some(ctx) = ctx_weak.upgrade() { handler(e, ctx).await } else { + println!("Context was dropped!"); Ok(()) } } @@ -420,20 +409,20 @@ impl EnclaveIndexer { } pub fn start(&self) -> JoinHandle> { - self.listener.start() + self.ctx.listener.start() } pub async fn get_e3(&self, e3_id: u64) -> Result { - let (e3, _) = get_e3(self.store.clone(), e3_id).await?; + let (e3, _) = get_e3(self.ctx.store.inner.clone(), e3_id).await?; Ok(e3) } pub fn get_listener(&self) -> EventListener { - self.listener.clone() + self.ctx.listener.clone() } pub fn get_store(&self) -> SharedStore { - SharedStore::new(self.store.clone()) + self.ctx.store.clone() } } diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 2152fe22dd..7521a94a1d 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -239,7 +239,6 @@ mod memory_leak { } #[tokio::test] - #[ignore] async fn test_memory_leak() -> Result<()> { sol! { #[derive(Debug)] From f5e62980f906d21a01d298045446b7904405e955 Mon Sep 17 00:00:00 2001 From: ryardley Date: Mon, 24 Nov 2025 17:32:44 +0000 Subject: [PATCH 13/23] tidy up comments --- crates/indexer/tests/integration.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 7521a94a1d..d8b88f519f 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -34,7 +34,6 @@ sol!( ); #[tokio::test] -// #[ignore] async fn test_indexer() -> Result<()> { const E3_ID: u64 = 10; const THRESHOLD: u64 = 10; @@ -204,7 +203,6 @@ mod memory_leak { use super::*; use std::sync::atomic::{AtomicUsize, Ordering}; - // Track how many instances exist static DROP_COUNT: AtomicUsize = AtomicUsize::new(0); static CREATE_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -230,7 +228,6 @@ mod memory_leak { async fn create_indexer() -> Result> { let (_, enclave_address, _, _, endpoint, _anvil) = setup_two_contracts().await?; - // Create indexer let listener = EventListener::create_contract_listener(&endpoint, &[&enclave_address]).await?; let contract = EnclaveContractFactory::create_read(&endpoint, &enclave_address).await?; @@ -256,10 +253,10 @@ mod memory_leak { indexer .add_event_handler(move |event: TestEvent, _ctx| { + // This closure captures a ref to detector let _captured = detector.clone(); println!("{:?}", _captured.0); async move { - // This closure captures ctx, which contains a listener clone println!("Event received: {:?}", event); Ok(()) } @@ -275,7 +272,7 @@ mod memory_leak { println!("Created: {}, Dropped: {}", created, dropped); - // This assertion will FAIL if there's a leak + // If the handler was dropped then the detector will be dropped too assert_eq!( created, dropped, "Memory leak detected! Created {} objects but only dropped {}", From 24da2ce7f738774b599e3d9aaf0ac50b544838ad Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 15:42:44 +0000 Subject: [PATCH 14/23] --wip-- [skip ci] --- Cargo.lock | 1 + crates/indexer/Cargo.toml | 1 + crates/indexer/src/indexer.rs | 11 ++ examples/CRISP/Cargo.lock | 1 + .../crisp-contracts/deployed_contracts.json | 123 ++++++++++++++++++ examples/CRISP/server/src/server/indexer.rs | 7 +- .../IBondingRegistry.json | 2 +- .../ICiphernodeRegistry.json | 2 +- .../interfaces/IEnclave.sol/IEnclave.json | 2 +- 9 files changed, 146 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc10fc725e..a5ee1b5dc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2957,6 +2957,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index 4e83410aa6..d1dbfca2df 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -15,3 +15,4 @@ eyre.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 9d6de13d6b..e5133e3b21 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -27,6 +27,7 @@ use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; use tokio::task::JoinHandle; +use tracing::info; type E3Id = u64; @@ -149,6 +150,12 @@ pub struct EnclaveIndexer { ctx: Arc>, } +impl Drop for EnclaveIndexer { + fn drop(&mut self) { + info!("EnclaveIndexer is DROPPED"); + } +} + pub struct IndexerContext { store: SharedStore, listener: EventListener, @@ -252,6 +259,7 @@ impl EnclaveIndexer { }), }; instance.setup_listeners().await?; + info!("EnclaveIndexer has been configured"); Ok(instance) } @@ -401,14 +409,17 @@ impl EnclaveIndexer { } async fn setup_listeners(&mut self) -> Result<()> { + info!("Setting up listeners for EnclaveIndexer..."); self.register_e3_activated().await?; self.register_input_published().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; + info!("Listeners have been setup!"); Ok(()) } pub fn start(&self) -> JoinHandle> { + info!("Starting EnclaveIndexer..."); self.ctx.listener.start() } diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 72614a8689..c308875714 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2312,6 +2312,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json index 9d3e9ce8e9..299c3161e2 100644 --- a/examples/CRISP/packages/crisp-contracts/deployed_contracts.json +++ b/examples/CRISP/packages/crisp-contracts/deployed_contracts.json @@ -124,5 +124,128 @@ "CRISPInputValidator": { "address": "0x764364c0e4C3072A7DF777F4037907fdf2f1A89f" } + }, + "localhost": { + "PoseidonT3": { + "blockNumber": 3, + "address": "0x3333333C0A88F9BE4fd23ed0536F9B6c427e3B93" + }, + "MockUSDC": { + "constructorArgs": { + "initialSupply": "1000000" + }, + "blockNumber": 4, + "address": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" + }, + "EnclaveToken": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" + }, + "blockNumber": 5, + "address": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9" + }, + "EnclaveTicketToken": { + "constructorArgs": { + "baseToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", + "registry": "0x0000000000000000000000000000000000000001", + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" + }, + "blockNumber": 7, + "address": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707" + }, + "SlashingManager": { + "constructorArgs": { + "admin": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "bondingRegistry": "0x0000000000000000000000000000000000000001" + }, + "blockNumber": 8, + "address": "0x0165878A594ca255338adfa4d48449f69242Eb8F" + }, + "BondingRegistry": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "ticketToken": "0x5FC8d32690cc91D4c39d9d3abcBD16989F875707", + "licenseToken": "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9", + "registry": "0x0000000000000000000000000000000000000001", + "slashedFundsTreasury": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "ticketPrice": "10000000", + "licenseRequiredBond": "100000000000000000000", + "minTicketBalance": "1", + "exitDelay": "604800" + }, + "proxyRecords": { + "initData": "0x7333fa82000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000005fc8d32690cc91d4c39d9d3abcbd16989f875707000000000000000000000000cf7ed3acca5a467e9e704c703e8d87f634fb0fc90000000000000000000000000000000000000000000000000000000000000001000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb9226600000000000000000000000000000000000000000000000000000000009896800000000000000000000000000000000000000000000000056bc75e2d6310000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000093a80", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + "proxyAdminAddress": "0x94099942864EA81cCF197E9D71ac53310b1468D8", + "implementationAddress": "0xa513E6E4b8f2a923D98304ec87F64353C4D5C853" + }, + "blockNumber": 8, + "address": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" + }, + "CiphernodeRegistryOwnable": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "enclaveAddress": "0x0000000000000000000000000000000000000001", + "submissionWindow": "10" + }, + "proxyRecords": { + "initData": "0x1794bb3c000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb922660000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000000a", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", + "proxyAdminAddress": "0x6F1216D1BFe15c98520CA1434FC1d9D57AC95321", + "implementationAddress": "0x8A791620dd6260079BF849Dc5567aDC3F2FdC318" + }, + "blockNumber": 11, + "address": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788" + }, + "Enclave": { + "constructorArgs": { + "owner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "registry": "0x610178dA211FEF7D417bC0e6FeD39F05609AD788", + "bondingRegistry": "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6", + "feeToken": "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0", + "maxDuration": "2592000", + "params": [ + "0x000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001" + ] + }, + "proxyRecords": { + "initData": "0xefe0308b000000000000000000000000f39fd6e51aad88f6f4ce6ab8827279cfffb92266000000000000000000000000610178da211fef7d417bc0e6fed39f05609ad7880000000000000000000000002279b7a0a67db372996a5fab50d91eaa73d2ebe60000000000000000000000009fe46736679d2d9a65f0992f2272de9f3c7fa6e00000000000000000000000000000000000000000000000000000000000278d0000000000000000000000000000000000000000000000000000000000000000c00000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000fc00100000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000003fffffff000001", + "initialOwner": "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266", + "proxyAddress": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", + "proxyAdminAddress": "0x1F708C24a0D3A740cD47cC0444E9480899f3dA7D", + "implementationAddress": "0xB7f8BC63BbcaD18155201308C8f3540b07f84F5e" + }, + "blockNumber": 13, + "address": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0" + }, + "MockComputeProvider": { + "blockNumber": 23, + "address": "0x59b670e9fA9D0A427751Af201D676719a970857b" + }, + "MockDecryptionVerifier": { + "blockNumber": 24, + "address": "0x4ed7c70F96B99c776995fB64377f0d4aB3B0e1C1" + }, + "MockE3Program": { + "blockNumber": 25, + "address": "0x322813Fd9A801c5507c9de605d63CEA4f2CE6c44" + }, + "MockRISC0Verifier": { + "address": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F" + }, + "HonkVerifier": { + "address": "0xc5a5C42992dECbae36851359345FE25997F5C42d" + }, + "CRISPProgram": { + "address": "0x67d269191c92Caf3cD7723F116c85e6E9bf55933", + "constructorArgs": { + "enclave": "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0", + "verifierAddress": "0x7a2088a1bFc9d81c55368AE168C2C02570cB814F", + "honkVerifierAddress": "0xc5a5C42992dECbae36851359345FE25997F5C42d", + "imageId": "0x23734b77b0f76e85623a88d7a82f24c34c94834f2501964ea123b7a2027013a2" + } + } } } \ No newline at end of file diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 05dd959c89..3d1352d2a6 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -401,6 +401,7 @@ pub async fn start_indexer( store: impl DataStore, private_key: &str, ) -> Result<()> { + info!("CRISP: Creating indexer..."); // CRISP indexer let crisp_indexer = EnclaveIndexer::new_with_write_contract( ws_url, @@ -410,12 +411,16 @@ pub async fn start_indexer( private_key, ) .await?; + info!("CRISP: Indexer registering handlers..."); + let crisp_indexer = register_e3_requested(crisp_indexer).await?; let crisp_indexer = register_e3_activated(crisp_indexer).await?; let crisp_indexer = register_ciphertext_output_published(crisp_indexer).await?; let crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; let crisp_indexer = register_committee_published(crisp_indexer).await?; - crisp_indexer.start(); + info!("CRISP: Indexer finished registering handlers!"); + crisp_indexer.start(); + info!("Start has been executed!"); Ok(()) } diff --git a/packages/enclave-contracts/artifacts/contracts/interfaces/IBondingRegistry.sol/IBondingRegistry.json b/packages/enclave-contracts/artifacts/contracts/interfaces/IBondingRegistry.sol/IBondingRegistry.json index fad57ed6bf..cd7a86eb69 100644 --- a/packages/enclave-contracts/artifacts/contracts/interfaces/IBondingRegistry.sol/IBondingRegistry.json +++ b/packages/enclave-contracts/artifacts/contracts/interfaces/IBondingRegistry.sol/IBondingRegistry.json @@ -851,5 +851,5 @@ "deployedLinkReferences": {}, "immutableReferences": {}, "inputSourceName": "project/contracts/interfaces/IBondingRegistry.sol", - "buildInfoId": "solc-0_8_28-47e0db0804e3f0de0bf2198d3ad75eb1054de690" + "buildInfoId": "solc-0_8_28-3a44114abab506c12d9565d53dc5305c7fed9813" } \ No newline at end of file diff --git a/packages/enclave-contracts/artifacts/contracts/interfaces/ICiphernodeRegistry.sol/ICiphernodeRegistry.json b/packages/enclave-contracts/artifacts/contracts/interfaces/ICiphernodeRegistry.sol/ICiphernodeRegistry.json index 80bb8bea50..9d7ead7b44 100644 --- a/packages/enclave-contracts/artifacts/contracts/interfaces/ICiphernodeRegistry.sol/ICiphernodeRegistry.json +++ b/packages/enclave-contracts/artifacts/contracts/interfaces/ICiphernodeRegistry.sol/ICiphernodeRegistry.json @@ -535,5 +535,5 @@ "deployedLinkReferences": {}, "immutableReferences": {}, "inputSourceName": "project/contracts/interfaces/ICiphernodeRegistry.sol", - "buildInfoId": "solc-0_8_28-47e0db0804e3f0de0bf2198d3ad75eb1054de690" + "buildInfoId": "solc-0_8_28-3a44114abab506c12d9565d53dc5305c7fed9813" } \ No newline at end of file diff --git a/packages/enclave-contracts/artifacts/contracts/interfaces/IEnclave.sol/IEnclave.json b/packages/enclave-contracts/artifacts/contracts/interfaces/IEnclave.sol/IEnclave.json index b6491bcf45..f5b3cadea5 100644 --- a/packages/enclave-contracts/artifacts/contracts/interfaces/IEnclave.sol/IEnclave.json +++ b/packages/enclave-contracts/artifacts/contracts/interfaces/IEnclave.sol/IEnclave.json @@ -962,5 +962,5 @@ "deployedLinkReferences": {}, "immutableReferences": {}, "inputSourceName": "project/contracts/interfaces/IEnclave.sol", - "buildInfoId": "solc-0_8_28-e5cbacc29cffac21a8890a9017b7aa0e2311220f" + "buildInfoId": "solc-0_8_28-3a44114abab506c12d9565d53dc5305c7fed9813" } \ No newline at end of file From a9f92d21096c0bc49ade6777378f3726d8cf266b Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 16:13:17 +0000 Subject: [PATCH 15/23] make sure listen is being awaited on --- Cargo.lock | 1 + crates/evm-helpers/Cargo.toml | 1 + crates/evm-helpers/src/listener.rs | 14 ++++++++------ crates/indexer/src/indexer.rs | 6 +++--- examples/CRISP/Cargo.lock | 1 + examples/CRISP/server/src/server/indexer.rs | 4 ++-- 6 files changed, 16 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5ee1b5dc6..2a275f63d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2906,6 +2906,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] diff --git a/crates/evm-helpers/Cargo.toml b/crates/evm-helpers/Cargo.toml index f5a5848237..8470dcf60c 100644 --- a/crates/evm-helpers/Cargo.toml +++ b/crates/evm-helpers/Cargo.toml @@ -14,3 +14,4 @@ futures.workspace = true futures-util.workspace = true once_cell.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index 3a8aaf29cf..68d1ae7e3a 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -16,6 +16,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; use tokio::{sync::RwLock, task::JoinHandle}; +use tracing::info; type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -27,6 +28,12 @@ pub struct EventListener { handlers: Arc>>>, } +impl Drop for EventListener { + fn drop(&mut self) { + info!("Event Listener was DROPPED"); + } +} + impl EventListener { pub fn new(provider: Arc>, filter: Filter) -> Self { Self { @@ -63,7 +70,7 @@ impl EventListener { .push(wrapped_handler); } - async fn listen(&self) -> Result<()> { + pub async fn listen(&self) -> Result<()> { let mut stream = self .provider .subscribe_logs(&self.filter) @@ -89,11 +96,6 @@ impl EventListener { Ok(()) } - pub fn start(&self) -> JoinHandle> { - let this = self.clone(); - tokio::spawn(async move { this.listen().await }) - } - /// Create a contract listener that will listen to events from all addresses. pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index e5133e3b21..36ec70bcf4 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -418,9 +418,9 @@ impl EnclaveIndexer { Ok(()) } - pub fn start(&self) -> JoinHandle> { - info!("Starting EnclaveIndexer..."); - self.ctx.listener.start() + pub async fn listen(&self) -> Result<()> { + info!("Starting EnclaveIndexer listening..."); + self.ctx.listener.listen().await } pub async fn get_e3(&self, e3_id: u64) -> Result { diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index c308875714..c9891247b1 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2298,6 +2298,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 3d1352d2a6..ce137e1af9 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -420,7 +420,7 @@ pub async fn start_indexer( let crisp_indexer = register_committee_published(crisp_indexer).await?; info!("CRISP: Indexer finished registering handlers!"); - crisp_indexer.start(); - info!("Start has been executed!"); + crisp_indexer.listen().await?; + info!("Indexer listen loop has finished!"); Ok(()) } From 8e9d94f77a55b9e14dcbc3495d10e6d0726a4036 Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 17:44:07 +0000 Subject: [PATCH 16/23] update enclave --- examples/CRISP/enclave.config.yaml | 31 +++++++++++------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/examples/CRISP/enclave.config.yaml b/examples/CRISP/enclave.config.yaml index 92bb4b9d16..c79f839aea 100644 --- a/examples/CRISP/enclave.config.yaml +++ b/examples/CRISP/enclave.config.yaml @@ -3,59 +3,50 @@ chains: rpc_url: ws://localhost:8545 contracts: e3_program: - address: "0xc5a5C42992dECbae36851359345FE25997F5C42d" + address: '0x67d269191c92Caf3cD7723F116c85e6E9bf55933' deploy_block: 1 enclave: - address: "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0" + address: '0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0' deploy_block: 13 ciphernode_registry: - address: "0x610178dA211FEF7D417bC0e6FeD39F05609AD788" + address: '0x610178dA211FEF7D417bC0e6FeD39F05609AD788' deploy_block: 11 bonding_registry: - address: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" + address: '0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6' deploy_block: 8 fee_token: - address: "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" + address: '0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0' deploy_block: 4 program: dev: true - # risc0: - # risc0_dev_mode: 0 # 0 = production (Boundless), 1 = dev mode (fake proofs) - # boundless: - # rpc_url: "https://sepolia.infura.io/v3/YOUR_KEY" - # private_key: "PRIVATE_KEY" # Use env vars for secrets - # pinata_jwt: "PINATA_JWT" # For uploading programs - # program_url: "https://gateway.pinata.cloud/ipfs/QmNMRAB7DW43JSmENfzGmD96G6sqaeBBNfTVrrq5WQae3D" # Pre-uploaded program - # onchain: true # true = onchain requests, false = offchain - nodes: cn1: - address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" + address: '0x70997970C51812dc3A010C7d01b50e0d17dc79C8' quic_port: 9201 autonetkey: true autopassword: true cn2: - address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" + address: '0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC' quic_port: 9202 autonetkey: true autopassword: true cn3: - address: "0x90F79bf6EB2c4f870365E785982E1f101E93b906" + address: '0x90F79bf6EB2c4f870365E785982E1f101E93b906' quic_port: 9203 autonetkey: true autopassword: true cn4: - address: "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65" + address: '0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65' quic_port: 9204 autonetkey: true autopassword: true cn5: - address: "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc" + address: '0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc' quic_port: 9205 autonetkey: true autopassword: true ag: - address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" + address: '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266' quic_port: 9206 autonetkey: true autopassword: true From baf7c5f201ce47bd0ddc13720303e6f4977cd49a Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:04:53 +0000 Subject: [PATCH 17/23] update format --- examples/CRISP/packages/crisp-contracts/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/CRISP/packages/crisp-contracts/README.md b/examples/CRISP/packages/crisp-contracts/README.md index 5afed88e19..a3f1160e03 100644 --- a/examples/CRISP/packages/crisp-contracts/README.md +++ b/examples/CRISP/packages/crisp-contracts/README.md @@ -3,7 +3,8 @@ This directory contains the Solidity contracts for CRISP - Coercion-Resistant Impartial Selection Protocol. -Contracts are built and tested with [Hardhat](https://hardhat.org). Tests are defined in the `test` directory. +Contracts are built and tested with [Hardhat](https://hardhat.org). Tests are defined in the `test` +directory. ## Running Tests From 391192a612213d99d9f7bd2385ebd9a04075a619 Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:07:51 +0000 Subject: [PATCH 18/23] revert enclave config --- examples/CRISP/enclave.config.yaml | 31 +++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/examples/CRISP/enclave.config.yaml b/examples/CRISP/enclave.config.yaml index c79f839aea..92bb4b9d16 100644 --- a/examples/CRISP/enclave.config.yaml +++ b/examples/CRISP/enclave.config.yaml @@ -3,50 +3,59 @@ chains: rpc_url: ws://localhost:8545 contracts: e3_program: - address: '0x67d269191c92Caf3cD7723F116c85e6E9bf55933' + address: "0xc5a5C42992dECbae36851359345FE25997F5C42d" deploy_block: 1 enclave: - address: '0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0' + address: "0xA51c1fc2f0D1a1b8494Ed1FE312d7C3a78Ed91C0" deploy_block: 13 ciphernode_registry: - address: '0x610178dA211FEF7D417bC0e6FeD39F05609AD788' + address: "0x610178dA211FEF7D417bC0e6FeD39F05609AD788" deploy_block: 11 bonding_registry: - address: '0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6' + address: "0x2279B7A0a67DB372996a5FaB50D91eAA73d2eBe6" deploy_block: 8 fee_token: - address: '0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0' + address: "0x9fE46736679d2D9a65F0992F2272dE9f3c7fa6e0" deploy_block: 4 program: dev: true + # risc0: + # risc0_dev_mode: 0 # 0 = production (Boundless), 1 = dev mode (fake proofs) + # boundless: + # rpc_url: "https://sepolia.infura.io/v3/YOUR_KEY" + # private_key: "PRIVATE_KEY" # Use env vars for secrets + # pinata_jwt: "PINATA_JWT" # For uploading programs + # program_url: "https://gateway.pinata.cloud/ipfs/QmNMRAB7DW43JSmENfzGmD96G6sqaeBBNfTVrrq5WQae3D" # Pre-uploaded program + # onchain: true # true = onchain requests, false = offchain + nodes: cn1: - address: '0x70997970C51812dc3A010C7d01b50e0d17dc79C8' + address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" quic_port: 9201 autonetkey: true autopassword: true cn2: - address: '0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC' + address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" quic_port: 9202 autonetkey: true autopassword: true cn3: - address: '0x90F79bf6EB2c4f870365E785982E1f101E93b906' + address: "0x90F79bf6EB2c4f870365E785982E1f101E93b906" quic_port: 9203 autonetkey: true autopassword: true cn4: - address: '0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65' + address: "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65" quic_port: 9204 autonetkey: true autopassword: true cn5: - address: '0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc' + address: "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc" quic_port: 9205 autonetkey: true autopassword: true ag: - address: '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266' + address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" quic_port: 9206 autonetkey: true autopassword: true From 20f05d1e939b60e1ed1e1973e51da380c8e67293 Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:23:11 +0000 Subject: [PATCH 19/23] fix indexer test --- crates/indexer/tests/integration.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index d8b88f519f..1cf46b2982 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -48,11 +48,13 @@ async fn test_indexer() -> Result<()> { _anvil, ) = setup_two_contracts().await?; - let indexer = EnclaveIndexer::::from_endpoint_address_in_mem( - &endpoint.to_string(), - &[&enclave_address.to_string(), &emit_logs_address.to_string()], - ) - .await?; + let indexer = Arc::new( + EnclaveIndexer::::from_endpoint_address_in_mem( + &endpoint.to_string(), + &[&enclave_address.to_string(), &emit_logs_address.to_string()], + ) + .await?, + ); // Track InputPublished event count in store indexer @@ -82,7 +84,8 @@ async fn test_indexer() -> Result<()> { }) .await; - let _ = indexer.start(); + let indexer_listening = indexer.clone(); + let _ = tokio::spawn(async move { indexer_listening.listen().await }); let public_key = vec![1, 2, 3, 4, 5, 6, 7, 8, 9]; let input_data = "Random data that wont actually be a string".to_string(); From fca878da36b9c26d789101787f7e730f7ee1342d Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:32:26 +0000 Subject: [PATCH 20/23] fix names --- crates/evm-helpers/src/listener.rs | 8 +------- crates/indexer/src/indexer.rs | 19 ++++++++----------- 2 files changed, 9 insertions(+), 18 deletions(-) diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index 68d1ae7e3a..76e2d97d9e 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -15,7 +15,7 @@ use eyre::Result; use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; -use tokio::{sync::RwLock, task::JoinHandle}; +use tokio::sync::RwLock; use tracing::info; type EventHandler = @@ -28,12 +28,6 @@ pub struct EventListener { handlers: Arc>>>, } -impl Drop for EventListener { - fn drop(&mut self) { - info!("Event Listener was DROPPED"); - } -} - impl EventListener { pub fn new(provider: Arc>, filter: Filter) -> Self { Self { diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index aee67a9145..e17b755133 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -197,26 +197,23 @@ impl EnclaveIndexer { impl EnclaveIndexer { /// Creates an `EnclaveIndexer` with an in-memory store. /// - /// Note: `contract_addresses[0]` must be the enclave contract address. - pub async fn from_endpoint_address_in_mem( - ws_url: &str, - contract_addresses: &[&str], - ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, contract_addresses).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_addresses[0]).await?; + /// Note: `addresses[0]` must be the enclave contract address. + pub async fn from_endpoint_address_in_mem(ws_url: &str, addresses: &[&str]) -> Result { + let listener = EventListener::create_contract_listener(ws_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; EnclaveIndexer::::new_with_in_mem_store(listener, contract).await } /// Creates an `EnclaveIndexer` with a provided in-memory store. /// - /// Note: `contract_addresses[0]` must be the enclave contract address. + /// Note: `addresses[0]` must be the enclave contract address. pub async fn from_endpoint_address( ws_url: &str, - contract_addresses: &[&str], + addresses: &[&str], store: InMemoryStore, ) -> Result { - let listener = EventListener::create_contract_listener(ws_url, contract_addresses).await?; - let contract = EnclaveContractFactory::create_read(ws_url, contract_addresses[0]).await?; + let listener = EventListener::create_contract_listener(ws_url, addresses).await?; + let contract = EnclaveContractFactory::create_read(ws_url, addresses[0]).await?; EnclaveIndexer::new(listener, contract, store).await } } From 531df2514d74b382ff15deffd45069a48f3697f0 Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:33:38 +0000 Subject: [PATCH 21/23] fix names --- crates/indexer/tests/integration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 1cf46b2982..05cafa0509 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -199,7 +199,7 @@ async fn test_indexer() -> Result<()> { Ok(()) } -mod memory_leak { +mod test_memory_leak { use e3_evm_helpers::{contracts::EnclaveContractFactory, listener::EventListener}; From 810140c847a059e38cd2726ec7e09d16834f567c Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 18:45:09 +0000 Subject: [PATCH 22/23] update integration --- crates/evm-helpers/tests/integration.rs | 35 ++++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/crates/evm-helpers/tests/integration.rs b/crates/evm-helpers/tests/integration.rs index 9b04bc3817..34077de664 100644 --- a/crates/evm-helpers/tests/integration.rs +++ b/crates/evm-helpers/tests/integration.rs @@ -9,7 +9,10 @@ use alloy::sol; use e3_evm_helpers::listener::EventListener; use eyre::Result; use helpers::setup_logs_contract; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use tokio::time::sleep; sol!( @@ -25,11 +28,13 @@ async fn test_event_listener() -> Result<()> { let (tx, mut rx) = tokio::sync::mpsc::channel::(10); let (tx_addr, mut rx_addr) = tokio::sync::mpsc::channel::(10); - let event_listener = EventListener::create_contract_listener( - &anvil.ws_endpoint(), - &[&contract.address().to_string()], - ) - .await?; + let event_listener = Arc::new( + EventListener::create_contract_listener( + &anvil.ws_endpoint(), + &[&contract.address().to_string()], + ) + .await?, + ); event_listener .add_event_handler(move |event: EmitLogs::ValueChanged| { @@ -51,7 +56,8 @@ async fn test_event_listener() -> Result<()> { }) .await; - event_listener.start(); + let spawn_event_listener = event_listener.clone(); + let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); contract .setValue("hello".to_string()) @@ -104,11 +110,13 @@ async fn test_overlapping_listener_handlers() -> Result<()> { let (contract, _, _, anvil) = setup_logs_contract().await?; let (tx, mut rx) = tokio::sync::mpsc::channel::(10); - let event_listener = EventListener::create_contract_listener( - &anvil.ws_endpoint(), - &[&contract.address().to_string()], - ) - .await?; + let event_listener = Arc::new( + EventListener::create_contract_listener( + &anvil.ws_endpoint(), + &[&contract.address().to_string()], + ) + .await?, + ); let tx1 = tx.clone(); event_listener @@ -140,7 +148,8 @@ async fn test_overlapping_listener_handlers() -> Result<()> { }) .await; - event_listener.start(); + let spawn_event_listener = event_listener.clone(); + let _ = tokio::spawn(async move { spawn_event_listener.listen().await }); // Events should be returned roughly in this order: // 0ms : one From 9b77dde672eddd4e81e695d3db66c745a0f1f6a5 Mon Sep 17 00:00:00 2001 From: ryardley Date: Tue, 25 Nov 2025 19:40:00 +0000 Subject: [PATCH 23/23] remove rendant part of test --- crates/indexer/tests/integration.rs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/crates/indexer/tests/integration.rs b/crates/indexer/tests/integration.rs index 05cafa0509..dc63325587 100644 --- a/crates/indexer/tests/integration.rs +++ b/crates/indexer/tests/integration.rs @@ -159,21 +159,6 @@ async fn test_indexer() -> Result<()> { ); drop(messages_from_second_contract); - let e3_state = indexer.get_e3(E3_ID).await?; - let expected_input_count = 3; - - assert_eq!( - e3_state.ciphertext_inputs.len(), - expected_input_count as usize - ); - - let expected_inputs = vec![ - (input_data_bytes.to_vec(), 1), - (input_data_bytes.to_vec(), 2), - (input_data_bytes.to_vec(), 3), - ]; - assert_eq!(e3_state.ciphertext_inputs, expected_inputs); - enclave_contract .emitCiphertextOutputPublished( Uint::from(E3_ID), @@ -194,7 +179,7 @@ async fn test_indexer() -> Result<()> { let store = indexer.get_store(); let total_inputs_processed = store.get::("input_count").await?.unwrap(); - assert_eq!(total_inputs_processed, expected_input_count); + assert_eq!(total_inputs_processed, 3); Ok(()) }