From f00a82c60daec055748f97de16ed143095817b12 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 18:25:01 +0000 Subject: [PATCH 01/19] add timestamp driven callbacks to indexer --- Cargo.lock | 1 + crates/aggregator/src/committee_finalizer.rs | 1 + crates/evm-helpers/Cargo.toml | 1 + crates/evm-helpers/src/lib.rs | 1 + crates/evm-helpers/src/listener.rs | 92 +++++++++- crates/evm-helpers/src/threshold_queue.rs | 121 +++++++++++++ crates/evm/src/event_reader.rs | 5 + crates/indexer/src/delayed_handlers.rs | 170 +++++++++++++++++++ crates/indexer/src/indexer.rs | 35 +++- crates/indexer/src/lib.rs | 1 + crates/wasm/init_node.cjs | 2 +- examples/CRISP/server/src/server/indexer.rs | 10 ++ package.json | 6 +- packages/enclave-sdk/src/enclave-sdk.ts | 30 ---- packages/enclave-sdk/src/types.ts | 10 +- templates/default/server/index.ts | 6 + 16 files changed, 449 insertions(+), 43 deletions(-) create mode 100644 crates/evm-helpers/src/threshold_queue.rs create mode 100644 crates/indexer/src/delayed_handlers.rs diff --git a/Cargo.lock b/Cargo.lock index fc10fc725e..e88bd0660b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2906,6 +2906,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] diff --git a/crates/aggregator/src/committee_finalizer.rs b/crates/aggregator/src/committee_finalizer.rs index 587c8311ea..dc2b582d3c 100644 --- a/crates/aggregator/src/committee_finalizer.rs +++ b/crates/aggregator/src/committee_finalizer.rs @@ -107,6 +107,7 @@ impl Handler for CommitteeFinalizer { let bus = act.bus.clone(); let e3_id_clone = e3_id_for_async.clone(); + // XXX: refactor to use blockchain time let handle = ctx.run_later( Duration::from_secs(seconds_until_deadline), move |act, _ctx| { diff --git a/crates/evm-helpers/Cargo.toml b/crates/evm-helpers/Cargo.toml index f5a5848237..8470dcf60c 100644 --- a/crates/evm-helpers/Cargo.toml +++ b/crates/evm-helpers/Cargo.toml @@ -14,3 +14,4 @@ futures.workspace = true futures-util.workspace = true once_cell.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/evm-helpers/src/lib.rs b/crates/evm-helpers/src/lib.rs index da02ad0519..7c343cc998 100644 --- a/crates/evm-helpers/src/lib.rs +++ b/crates/evm-helpers/src/lib.rs @@ -7,3 +7,4 @@ pub mod contracts; pub mod events; pub mod listener; +pub mod threshold_queue; diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index ccb00b1620..f703bd57b1 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -5,6 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use alloy::{ + consensus::Header, network::Ethereum, primitives::{Address, B256}, providers::{Provider, ProviderBuilder}, @@ -14,17 +15,23 @@ use alloy::{ use eyre::Result; use futures::stream::StreamExt; use futures_util::future::FutureExt; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; -use tokio::{sync::RwLock, task::JoinHandle}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; +use tokio::{sync::RwLock, time::sleep}; +use tracing::error; type EventHandler = Box Pin> + Send>> + Send + Sync>; +type BlockHandler = + Box Pin> + Send>> + Send + Sync>; + #[derive(Clone)] +/// Listens for contract events pub struct EventListener { provider: Arc>, filter: Filter, handlers: Arc>>>, + block_handlers: Arc>>, } impl EventListener { @@ -33,6 +40,7 @@ impl EventListener { provider, filter, handlers: Arc::new(RwLock::new(HashMap::new())), + block_handlers: Arc::new(RwLock::new(Vec::new())), } } @@ -63,7 +71,18 @@ impl EventListener { .push(wrapped_handler); } - async fn listen(&self) -> Result<()> { + pub async fn add_block_handler(&mut self, handler: F) + where + F: Fn(&Header) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.block_handlers + .write() + .await + .push(Box::new(move |h: &Header| Box::pin(handler(h)))); + } + + async fn listen_once(&self) -> Result<()> { let mut stream = self .provider .subscribe_logs(&self.filter) @@ -89,9 +108,54 @@ impl EventListener { Ok(()) } - pub fn start(&self) -> JoinHandle> { + async fn block_listen_once(&self) -> Result<()> { + let mut stream = self.provider.subscribe_blocks().await?.into_stream(); + while let Some(block) = stream.next().await { + let handlers = self.block_handlers.read().await; + for handler in handlers.iter() { + let fut = handler(&block); + tokio::spawn(async move { + if let Err(e) = fut.await { + eprintln!("Error processing block: {:?}", e); + } + }); + } + } + Ok(()) + } + + fn start_block_listen_loop(&self) { + let this = self.clone(); + tokio::spawn(async move { this.retry_loop(|| this.block_listen_once()).await }); + } + + fn start_listen_loop(&self) { let this = self.clone(); - tokio::spawn(async move { this.listen().await }) + tokio::spawn(async move { this.retry_loop(|| this.listen_once()).await }); + } + + async fn retry_loop(&self, mut operation: F) + where + F: FnMut() -> Fut, + Fut: Future>, + E: std::fmt::Display, + { + loop { + match operation().await { + Ok(_) => { + sleep(Duration::from_secs(1)).await; + } + Err(e) => { + error!("Error occurred: {}. Retrying in 5 seconds...", e); + sleep(Duration::from_secs(5)).await; + } + } + } + } + + pub fn start(&self) { + self.start_listen_loop(); + self.start_block_listen_loop(); } pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result { @@ -103,3 +167,21 @@ impl EventListener { Ok(EventListener::new(provider, filter)) } } + +async fn retry_with_backoff(mut f: F) +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + loop { + match f().await { + Ok(_) => { + sleep(Duration::from_secs(1)).await; + } + Err(e) => { + error!("Error occurred: {}. Retrying in 5 seconds...", e); + sleep(Duration::from_secs(5)).await; + } + } + } +} diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs new file mode 100644 index 0000000000..728c43db62 --- /dev/null +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::{ + cmp::Reverse, + collections::BinaryHeap, + sync::{Arc, RwLock}, +}; + +/// An implementation of a ThresholdQueue +#[derive(Clone)] +pub struct ThresholdQueue { + inner: Arc>>>, +} + +pub trait ThresholdItem { + type Item; + fn within_threshold(&self, threshold: u64) -> bool; + fn item(&self) -> Self::Item; +} + +impl ThresholdQueue +where + T: Ord + ThresholdItem, +{ + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(BinaryHeap::new())), + } + } + + pub fn insert(&self, item: T) { + self.inner + .write() + .expect("Poisoned write in ThresholdQueue") + .push(Reverse(item)); + } + + pub fn take_until_including(&self, threshold: u64) -> Vec { + let mut found = Vec::new(); + let mut inner = self + .inner + .write() + .expect("Poisoned write in ThresholdQueue"); + + while let Some(Reverse(item)) = inner.peek() { + if item.within_threshold(threshold) { + if let Some(Reverse(item)) = inner.pop() { + found.push(item.item()); + } + } else { + break; + } + } + + found + } +} + +#[cfg(test)] +mod tests { + use super::{ThresholdItem, ThresholdQueue}; + + struct ThreshItem { + val: u64, + rank: u64, + } + + impl Eq for ThreshItem {} + + impl PartialEq for ThreshItem { + fn eq(&self, other: &Self) -> bool { + self.rank == other.rank + } + } + + impl PartialOrd for ThreshItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for ThreshItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.rank.cmp(&other.rank) + } + } + + impl ThresholdItem for ThreshItem { + type Item = u64; + fn item(&self) -> Self::Item { + self.val + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.rank <= threshold + } + } + + #[test] + fn test_collection_is_ordered() { + let queue = ThresholdQueue::new(); + queue.insert(ThreshItem { val: 111, rank: 25 }); + queue.insert(ThreshItem { + val: 666, + rank: 100, + }); + queue.insert(ThreshItem { val: 444, rank: 70 }); + queue.insert(ThreshItem { val: 222, rank: 26 }); + let items = queue.take_until_including(70); + + assert_eq!(items, vec![111, 222, 444]); + + let items = queue.take_until_including(101); + + assert_eq!(items, vec![666]); + } +} diff --git a/crates/evm/src/event_reader.rs b/crates/evm/src/event_reader.rs index b206b1568e..d3c0f18014 100644 --- a/crates/evm/src/event_reader.rs +++ b/crates/evm/src/event_reader.rs @@ -33,6 +33,9 @@ pub enum EnclaveEvmEvent { event: EnclaveEvent, block: Option, }, + // Log { + // data,topic, chain_id, address + // } } impl EnclaveEvmEvent { @@ -182,6 +185,8 @@ impl Actor for EvmEventReader

{ } } +// TODO: +// - extract and combine all filters #[instrument(name = "evm_event_reader", skip_all)] async fn stream_from_evm( provider: EthProvider

, diff --git a/crates/indexer/src/delayed_handlers.rs b/crates/indexer/src/delayed_handlers.rs new file mode 100644 index 0000000000..fb0e4f4b8d --- /dev/null +++ b/crates/indexer/src/delayed_handlers.rs @@ -0,0 +1,170 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use e3_evm_helpers::threshold_queue::{ThresholdItem, ThresholdQueue}; +use eyre::Result; +use std::{future::Future, pin::Pin, sync::Arc}; + +type AsyncCallback = + Arc Pin> + Send>> + Send + Sync>; + +#[derive(Clone)] +pub struct TimedHandler { + time: u64, + handler: AsyncCallback, +} + +impl ThresholdItem for TimedHandler { + type Item = AsyncCallback; + + fn item(&self) -> Self::Item { + self.handler.clone() + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.time <= threshold + } +} + +impl Eq for TimedHandler {} + +impl PartialEq for TimedHandler { + fn eq(&self, other: &Self) -> bool { + self.time == other.time + } +} + +impl PartialOrd for TimedHandler { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedHandler { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.time.cmp(&other.time) + } +} + +#[derive(Clone)] +pub struct CallbackQueue { + queue: ThresholdQueue, +} + +impl CallbackQueue { + pub fn new() -> Self { + Self { + queue: ThresholdQueue::new(), + } + } + + pub fn dispatch_later(&mut self, when: u64, handler: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.queue.insert(TimedHandler { + time: when, + handler: Arc::new(move || Box::pin(handler())), + }) + } + + pub async fn execute_until_including(&self, when: u64) -> Result<()> { + let handlers = self.queue.take_until_including(when); + for handler in handlers { + handler().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + #[tokio::test] + async fn test_single_callback_executes() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.dispatch_later(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert!(*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_callback_not_executed_before_threshold() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.dispatch_later(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(50).await.unwrap(); + assert!(!*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_multiple_callbacks_execute() { + let mut queue = CallbackQueue::new(); + let counter = Arc::new(Mutex::new(0)); + + let c1 = counter.clone(); + queue.dispatch_later(50, move || { + let c = c1.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c2 = counter.clone(); + queue.dispatch_later(100, move || { + let c = c2.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c3 = counter.clone(); + queue.dispatch_later(150, move || { + let c = c3.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert_eq!(*counter.lock().unwrap(), 2); + } + + #[tokio::test] + async fn test_error_propagation() { + let mut queue = CallbackQueue::new(); + + queue.dispatch_later(100, || async { Err(eyre::eyre!("test error")) }); + + let result = queue.execute_until_including(100).await; + assert!(result.is_err()); + } +} diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 4d6c8a8c9b..0c76f6efba 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -4,19 +4,21 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use crate::delayed_handlers::CallbackQueue; use crate::E3Repository; use super::{models::E3, DataStore}; -use alloy::hex; use alloy::primitives::Uint; use alloy::providers::Provider; use alloy::sol_types::SolEvent; +use alloy::{consensus::BlockHeader, hex}; use async_trait::async_trait; use e3_evm_helpers::{ contracts::{EnclaveContract, EnclaveContractFactory, EnclaveRead, ReadOnly}, events::{CiphertextOutputPublished, E3Activated, InputPublished, PlaintextOutputPublished}, listener::EventListener, }; +// TODO: Remove eyre in favour of thiserror use eyre::eyre; use eyre::Result; use serde::{de::DeserializeOwned, Serialize}; @@ -24,7 +26,6 @@ use std::future::Future; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; -use tokio::task::JoinHandle; type E3Id = u64; @@ -143,8 +144,10 @@ impl DataStore for SharedStore { } #[derive(Clone)] +/// Stores E3 event data on store for easy querying. pub struct EnclaveIndexer { listener: EventListener, + callbacks: CallbackQueue, contract: EnclaveContract, store: Arc>, contract_address: String, @@ -182,6 +185,7 @@ impl EnclaveIndexer { let mut instance = Self { store: Arc::new(RwLock::new(store)), contract, + callbacks: CallbackQueue::new(), listener, contract_address, chain_id, @@ -217,6 +221,15 @@ impl EnclaveIndexer { .await; } + /// Register a callback for execution after the given timestap as returned by the chain. + pub fn dispatch_later(&mut self, when: u64, handler: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.callbacks.dispatch_later(when, handler); + } + async fn register_e3_activated(&mut self) -> Result<()> { let db = self.store.clone(); let contract = self.contract.clone(); @@ -347,15 +360,31 @@ impl EnclaveIndexer { Ok(()) } + async fn register_input_window_expired(&mut self) -> Result<()> { + let callbacks = self.callbacks.clone(); + self.listener + .add_block_handler(move |block| { + let timestamp = block.timestamp(); + let callbacks = callbacks.clone(); + async move { + callbacks.execute_until_including(timestamp).await?; + Ok(()) + } + }) + .await; + Ok(()) + } + async fn setup_listeners(&mut self) -> Result<()> { self.register_e3_activated().await?; self.register_input_published().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; + self.register_input_window_expired().await?; Ok(()) } - pub fn start(&self) -> JoinHandle> { + pub fn start(&self) { self.listener.start() } diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index b42c0bff8c..e47134bfd4 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -4,6 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +pub mod delayed_handlers; mod indexer; pub mod models; mod repo; diff --git a/crates/wasm/init_node.cjs b/crates/wasm/init_node.cjs index 42be6ce9b1..e5fe3553bf 100644 --- a/crates/wasm/init_node.cjs +++ b/crates/wasm/init_node.cjs @@ -6,4 +6,4 @@ module.exports = async function initializeWasm() { // Node does not need to be loaded async -} \ No newline at end of file +} diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 2e029b5f55..4211337b41 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -198,6 +198,9 @@ pub async fn register_e3_activated( .await?; // Calculate expiration time to sleep until + // XXX: This should not happen like this. During E3Activated the EnclaveIndexer + // should calculate the expired timestamp and listen for each block. When a block + // is broadcast that has the expected timestamp we then trigger an event let now = get_current_timestamp_rpc().await?; info!("[e3_id={}] Current time before sleep: {}", e3_id, now); let wait_duration = if expiration > now { @@ -211,6 +214,12 @@ pub async fn register_e3_activated( info!("[e3_id={}] Expired E3", e3_id); Duration::ZERO }; + + /* + * indexer.dispatch_after(expiration, move async |block| { + * // rest of the computation + * }) + */ if !wait_duration.is_zero() { sleep(wait_duration).await; } @@ -367,6 +376,7 @@ pub async fn register_committee_published( info!("[e3_id={}] Wait duration: {:?}", event.e3Id, wait_duration); // Sleep until start time + // XXX: refactor to use blocktime if !wait_duration.is_zero() { sleep(wait_duration).await; } diff --git a/package.json b/package.json index 98932aee3c..4102eaec97 100644 --- a/package.json +++ b/package.json @@ -12,9 +12,9 @@ "bump:versions": "tsx scripts/bump-versions.ts", "clean": "tsx scripts/clean.ts", "compile": "pnpm build:ts && pnpm rust:build", - "lint": "eslint . && pnpm evm:lint && pnpm rust:lint && pnpm noir:lint", - "format": "prettier --write \"**/*.{js,json,md,mdx,ts,tsx,yml,yaml,css}\"", - "format:check": "prettier --check \"**/*.{js,json,md,mdx,ts,tsx,yml,yaml,css}\"", + "lint": "pnpm eslint . && pnpm evm:lint && pnpm rust:lint && pnpm noir:lint", + "format": "pnpm prettier --write \"**/*.{js,json,md,mdx,ts,tsx,yml,yaml,css}\"", + "format:check": "pnpm prettier --check \"**/*.{js,json,md,mdx,ts,tsx,yml,yaml,css}\"", "check:license": "./scripts/check-license-headers.sh", "check:size": "./scripts/check-size.sh", "check:pnpm": "./scripts/check-pnpm.sh", diff --git a/packages/enclave-sdk/src/enclave-sdk.ts b/packages/enclave-sdk/src/enclave-sdk.ts index 1b1089670c..754b814cde 100644 --- a/packages/enclave-sdk/src/enclave-sdk.ts +++ b/packages/enclave-sdk/src/enclave-sdk.ts @@ -450,36 +450,6 @@ export class EnclaveSDK { this.eventListener.cleanup() } - /** - * Update SDK configuration - */ - // TODO: We should delete this as we don't want a stateful client. - public updateConfig(newConfig: Partial): void { - if (newConfig.publicClient) { - this.config.publicClient = newConfig.publicClient - this.eventListener = new EventListener(newConfig.publicClient) - } - - if (newConfig.walletClient) { - this.config.walletClient = newConfig.walletClient - } - - if (newConfig.contracts) { - this.config.contracts = { - ...this.config.contracts, - ...newConfig.contracts, - } - } - - if (newConfig.chainId) { - this.config.chainId = newConfig.chainId - } - - this.contractClient = new ContractClient(this.config.publicClient, this.config.walletClient, this.config.contracts) - - this.initialized = false - } - public static create(options: { rpcUrl: string contracts: { diff --git a/packages/enclave-sdk/src/types.ts b/packages/enclave-sdk/src/types.ts index b424a1ed5b..9b380fabec 100644 --- a/packages/enclave-sdk/src/types.ts +++ b/packages/enclave-sdk/src/types.ts @@ -74,6 +74,7 @@ export interface ContractInstances { } // Unified Event System +// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. export enum EnclaveEventType { // E3 Lifecycle Events E3_REQUESTED = 'E3Requested', @@ -100,6 +101,7 @@ export enum EnclaveEventType { INITIALIZED = 'Initialized', } +// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. export enum RegistryEventType { // Committee Management COMMITTEE_REQUESTED = 'CommitteeRequested', @@ -114,8 +116,14 @@ export enum RegistryEventType { INITIALIZED = 'Initialized', } +// Events that don't come from the contracts but are part of the protocol +// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. +export enum ArtificialEventType { + E3_INPUT_PUBLISH_WINDOW_EXPIRED = 'E3InputPublishWindowExpired', +} + // Union type for all events -export type AllEventTypes = EnclaveEventType | RegistryEventType +export type AllEventTypes = EnclaveEventType | RegistryEventType | ArtificialEventType // Event data interfaces based on TypeChain types export interface E3 { diff --git a/templates/default/server/index.ts b/templates/default/server/index.ts index ce9eaca08c..51900cbf47 100644 --- a/templates/default/server/index.ts +++ b/templates/default/server/index.ts @@ -143,6 +143,7 @@ async function handleE3ActivatedEvent(event: any) { def.resolve() } + // XXX: This needs to be based off blocktime and driven by an event from the sdk const currentTime = BigInt(Math.floor(Date.now() / 1000)) const sleepSeconds = expiration > currentTime ? Number(expiration - currentTime) : 0 @@ -157,6 +158,10 @@ async function handleE3ActivatedEvent(event: any) { } } +async function handleE3InputWindowExpired(event: any) { + /// +} + async function handleInputPublishedEvent(event: any) { const data = event.data as InputPublishedData const e3Id = data.e3Id @@ -188,6 +193,7 @@ async function setupEventListeners() { sdk.onEnclaveEvent(EnclaveEventType.E3_ACTIVATED, handleE3ActivatedEvent) sdk.onEnclaveEvent(EnclaveEventType.INPUT_PUBLISHED, handleInputPublishedEvent) + // sdk.onEnclaveEvent(EnclaveEventType.E3_INPUT_WINDOW_EXPIRED, handleE3InputWindowExpired); console.log('✅ Event listeners set up successfully') } From f6677f794e06924f7392c261b6a2f8cde1f0e94b Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 18:29:01 +0000 Subject: [PATCH 02/19] update ts file --- templates/default/server/index.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/templates/default/server/index.ts b/templates/default/server/index.ts index 51900cbf47..5cf065a01e 100644 --- a/templates/default/server/index.ts +++ b/templates/default/server/index.ts @@ -84,8 +84,8 @@ async function runProgram(e3Id: bigint): Promise { } function defer() { - let resolve: () => void = () => {} - let reject: (e?: any) => void = () => {} + let resolve: () => void = () => { } + let reject: (e?: any) => void = () => { } const promise = new Promise((res, rej) => { resolve = res @@ -158,9 +158,9 @@ async function handleE3ActivatedEvent(event: any) { } } -async function handleE3InputWindowExpired(event: any) { - /// -} +// XXX: handle function +// async function handleE3InputWindowExpired(event: any) { +// } async function handleInputPublishedEvent(event: any) { const data = event.data as InputPublishedData From b59f4b084df00084a34db6cd9886fd3d10645800 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 18:42:35 +0000 Subject: [PATCH 03/19] update ts file --- templates/default/server/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/templates/default/server/index.ts b/templates/default/server/index.ts index 5cf065a01e..ed63c7056f 100644 --- a/templates/default/server/index.ts +++ b/templates/default/server/index.ts @@ -84,8 +84,8 @@ async function runProgram(e3Id: bigint): Promise { } function defer() { - let resolve: () => void = () => { } - let reject: (e?: any) => void = () => { } + let resolve: () => void = () => {} + let reject: (e?: any) => void = () => {} const promise = new Promise((res, rej) => { resolve = res From c2b11e2ae0b3549f4031814f6b703ffcc52fbb82 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 19:28:45 +0000 Subject: [PATCH 04/19] update dispatch to blocktime --- crates/evm-helpers/src/threshold_queue.rs | 5 +- crates/indexer/src/delayed_handlers.rs | 25 +++-- crates/indexer/src/indexer.rs | 35 ++++-- examples/CRISP/Cargo.lock | 1 + examples/CRISP/server/src/server/indexer.rs | 118 ++++++++------------ 5 files changed, 91 insertions(+), 93 deletions(-) diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs index 728c43db62..021203e79a 100644 --- a/crates/evm-helpers/src/threshold_queue.rs +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -16,7 +16,8 @@ pub struct ThresholdQueue { inner: Arc>>>, } -pub trait ThresholdItem { +/// An item that can be added to a threshold queue +pub trait ThresholdItem: Ord { type Item; fn within_threshold(&self, threshold: u64) -> bool; fn item(&self) -> Self::Item; @@ -24,7 +25,7 @@ pub trait ThresholdItem { impl ThresholdQueue where - T: Ord + ThresholdItem, + T: ThresholdItem, { pub fn new() -> Self { Self { diff --git a/crates/indexer/src/delayed_handlers.rs b/crates/indexer/src/delayed_handlers.rs index fb0e4f4b8d..25bad16490 100644 --- a/crates/indexer/src/delayed_handlers.rs +++ b/crates/indexer/src/delayed_handlers.rs @@ -12,6 +12,7 @@ type AsyncCallback = Arc Pin> + Send>> + Send + Sync>; #[derive(Clone)] +/// A callback that has an execute time associated with it pub struct TimedHandler { time: u64, handler: AsyncCallback, @@ -50,30 +51,34 @@ impl Ord for TimedHandler { } #[derive(Clone)] +/// A queue of callbacks that can be executed when a given timestamp has been passed pub struct CallbackQueue { queue: ThresholdQueue, } impl CallbackQueue { + /// Create a new queue pub fn new() -> Self { Self { queue: ThresholdQueue::new(), } } - pub fn dispatch_later(&mut self, when: u64, handler: F) + /// Push a handler to the queue to be executed at or before the given time. + pub fn push(&mut self, time: u64, handler: F) where F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { self.queue.insert(TimedHandler { - time: when, + time, handler: Arc::new(move || Box::pin(handler())), }) } - pub async fn execute_until_including(&self, when: u64) -> Result<()> { - let handlers = self.queue.take_until_including(when); + /// Execute all pending callbacks up to and including the given time + pub async fn execute_until_including(&self, time: u64) -> Result<()> { + let handlers = self.queue.take_until_including(time); for handler in handlers { handler().await?; } @@ -92,7 +97,7 @@ mod tests { let called = Arc::new(Mutex::new(false)); let called_clone = called.clone(); - queue.dispatch_later(100, move || { + queue.push(100, move || { let called = called_clone.clone(); async move { *called.lock().unwrap() = true; @@ -110,7 +115,7 @@ mod tests { let called = Arc::new(Mutex::new(false)); let called_clone = called.clone(); - queue.dispatch_later(100, move || { + queue.push(100, move || { let called = called_clone.clone(); async move { *called.lock().unwrap() = true; @@ -128,7 +133,7 @@ mod tests { let counter = Arc::new(Mutex::new(0)); let c1 = counter.clone(); - queue.dispatch_later(50, move || { + queue.push(50, move || { let c = c1.clone(); async move { *c.lock().unwrap() += 1; @@ -137,7 +142,7 @@ mod tests { }); let c2 = counter.clone(); - queue.dispatch_later(100, move || { + queue.push(100, move || { let c = c2.clone(); async move { *c.lock().unwrap() += 1; @@ -146,7 +151,7 @@ mod tests { }); let c3 = counter.clone(); - queue.dispatch_later(150, move || { + queue.push(150, move || { let c = c3.clone(); async move { *c.lock().unwrap() += 1; @@ -162,7 +167,7 @@ mod tests { async fn test_error_propagation() { let mut queue = CallbackQueue::new(); - queue.dispatch_later(100, || async { Err(eyre::eyre!("test error")) }); + queue.push(100, || async { Err(eyre::eyre!("test error")) }); let result = queue.execute_until_including(100).await; assert!(result.is_err()); diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 0c76f6efba..abfdbd6967 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -143,9 +143,8 @@ impl DataStore for SharedStore { } } -#[derive(Clone)] /// Stores E3 event data on store for easy querying. -pub struct EnclaveIndexer { +pub struct EnclaveIndexer { listener: EventListener, callbacks: CallbackQueue, contract: EnclaveContract, @@ -154,6 +153,19 @@ pub struct EnclaveIndexer { chain_id: u64, } +impl Clone for EnclaveIndexer { + fn clone(&self) -> Self { + Self { + listener: self.listener.clone(), + callbacks: self.callbacks.clone(), + contract: self.contract.clone(), + store: self.store.clone(), + contract_address: self.contract_address.clone(), + chain_id: self.chain_id, + } + } +} + impl EnclaveIndexer { pub async fn new_with_in_mem_store( listener: EventListener, @@ -222,12 +234,19 @@ impl EnclaveIndexer { } /// Register a callback for execution after the given timestap as returned by the chain. - pub fn dispatch_later(&mut self, when: u64, handler: F) + pub fn dispatch_after_timestamp(&mut self, when: u64, handler: F) where - F: Fn() -> Fut + Send + Sync + 'static, + F: Fn(SharedStore) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - self.callbacks.dispatch_later(when, handler); + let store = SharedStore::new(self.store.clone()); + let handler = Arc::new(handler); + + self.callbacks.push(when, move || { + let handler = Arc::clone(&handler); + let store = store.clone(); + handler(store) + }); } async fn register_e3_activated(&mut self) -> Result<()> { @@ -240,7 +259,6 @@ impl EnclaveIndexer { let db = SharedStore::new(db.clone()); let enclave_address = enclave_address.clone(); let contract = contract.clone(); - async move { println!( "E3Activated: id={}, expiration={}, pubkey=0x{}...", @@ -258,6 +276,7 @@ impl EnclaveIndexer { u64_try_from(e3.startWindow[0])?, u64_try_from(e3.startWindow[1])?, ]; + // NOTE: we are only saving protocol specific info // here and not CRISP specific info so E3 corresponds to the solidity E3 let e3_obj = E3 { @@ -360,7 +379,7 @@ impl EnclaveIndexer { Ok(()) } - async fn register_input_window_expired(&mut self) -> Result<()> { + async fn register_blocktime_callbacks(&mut self) -> Result<()> { let callbacks = self.callbacks.clone(); self.listener .add_block_handler(move |block| { @@ -380,7 +399,7 @@ impl EnclaveIndexer { self.register_input_published().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; - self.register_input_window_expired().await?; + self.register_blocktime_callbacks().await?; Ok(()) } diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 72614a8689..6b2495b53b 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2298,6 +2298,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 4211337b41..a708fc7dbc 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -181,92 +181,64 @@ pub async fn register_e3_requested( pub async fn register_e3_activated( mut indexer: EnclaveIndexer, ) -> Result> { + let indexer_clone = indexer.clone(); // E3Activated indexer .add_event_handler(move |event: E3Activated, store| { - let e3_id = event.e3Id.to::(); - let mut repo = CrispE3Repository::new(store.clone(), e3_id); - let mut current_round_repo = CurrentRoundRepository::new(store); - let expiration = event.expiration.to::(); - - info!("[e3_id={}] Handling E3 request", e3_id); + let mut indexer = indexer_clone.clone(); async move { + let e3_id = event.e3Id.to::(); + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let mut current_round_repo = CurrentRoundRepository::new(store); + let expiration = event.expiration.to::(); + + info!("[e3_id={}] Handling E3 request", e3_id); repo.start_round().await?; current_round_repo .set_current_round(CurrentRound { id: e3_id }) .await?; - // Calculate expiration time to sleep until - // XXX: This should not happen like this. During E3Activated the EnclaveIndexer - // should calculate the expired timestamp and listen for each block. When a block - // is broadcast that has the expected timestamp we then trigger an event - let now = get_current_timestamp_rpc().await?; - info!("[e3_id={}] Current time before sleep: {}", e3_id, now); - let wait_duration = if expiration > now { - let secs = expiration - now; - info!( - "[e3_id={}] Need to wait {} seconds until expiration", - e3_id, secs - ); - Duration::from_secs(secs) - } else { - info!("[e3_id={}] Expired E3", e3_id); - Duration::ZERO - }; - - /* - * indexer.dispatch_after(expiration, move async |block| { - * // rest of the computation - * }) - */ - if !wait_duration.is_zero() { - sleep(wait_duration).await; - } - let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; - repo.update_status("Expired").await?; - - if repo.get_vote_count().await? > 0 { - info!("[e3_id={}] Starting computation for E3", e3_id); - repo.update_status("Computing").await?; - - let (id, status) = run_compute( - e3_id, - e3.e3_params, - e3.ciphertext_inputs, - format!("{}/state/add-result", CONFIG.enclave_server_url), - ) - .await - .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; - - if id != e3_id { - return Err(eyre::eyre!( - "Computation request returned unexpected E3 ID: expected {}, got {}", + indexer.dispatch_after_timestamp(expiration, move |store| async move { + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; + repo.update_status("Expired").await?; + if repo.get_vote_count().await? > 0 { + info!("[e3_id={}] Starting computation for E3", e3_id); + repo.update_status("Computing").await?; + let (id, status) = run_compute( e3_id, - id - ) - .into()); - } - - if status != "processing" { - return Err(eyre::eyre!( - "Computation request failed with status: {}", - status + e3.e3_params, + e3.ciphertext_inputs, + format!("{}/state/add-result", CONFIG.enclave_server_url), ) - .into()); + .await + .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; + if id != e3_id { + return Err(eyre::eyre!( + "Computation request returned unexpected E3 ID: expected {}, got {}", + e3_id, + id + ).into()); + }; + if status != "processing" { + return Err(eyre::eyre!( + "Computation request failed with status: {}", + status + ).into()); + }; + info!("[e3_id={}] Request Computation for E3", e3_id); + repo.update_status("PublishingCiphertext").await?; + } else { + info!( + "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", + e3_id + ); + repo.update_status("Finished").await?; } - - info!("[e3_id={}] Request Computation for E3", e3_id); - - repo.update_status("PublishingCiphertext").await?; - } else { - info!( - "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", - e3_id - ); - repo.update_status("Finished").await?; - } - info!("[e3_id={}] E3 request handled successfully.", e3_id); + info!("[e3_id={}] E3 request handled successfully.", e3_id); + Ok(()) + }); Ok(()) } From 7f78030f26ee2c87abe0308ddb67dc63014ba358 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 19:30:34 +0000 Subject: [PATCH 05/19] rename file --- crates/indexer/src/{delayed_handlers.rs => callback_queue.rs} | 0 crates/indexer/src/lib.rs | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename crates/indexer/src/{delayed_handlers.rs => callback_queue.rs} (100%) diff --git a/crates/indexer/src/delayed_handlers.rs b/crates/indexer/src/callback_queue.rs similarity index 100% rename from crates/indexer/src/delayed_handlers.rs rename to crates/indexer/src/callback_queue.rs diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index e47134bfd4..0a3b0466bc 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -4,7 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -pub mod delayed_handlers; +pub mod callback_queue; mod indexer; pub mod models; mod repo; From ab082b1853ff481f3411410fcf770e7e7f76fd19 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 19:46:19 +0000 Subject: [PATCH 06/19] update documentation --- crates/evm-helpers/src/threshold_queue.rs | 7 ++- crates/indexer/src/callback_queue.rs | 2 +- crates/indexer/src/indexer.rs | 47 +++++++++++-------- .../src/server/program_server_request.rs | 1 + 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs index 021203e79a..38541496ac 100644 --- a/crates/evm-helpers/src/threshold_queue.rs +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -10,8 +10,8 @@ use std::{ sync::{Arc, RwLock}, }; -/// An implementation of a ThresholdQueue #[derive(Clone)] +/// An implementation of a ThresholdQueue pub struct ThresholdQueue { inner: Arc>>>, } @@ -27,19 +27,22 @@ impl ThresholdQueue where T: ThresholdItem, { + /// Create a new ThresholdQueue pub fn new() -> Self { Self { inner: Arc::new(RwLock::new(BinaryHeap::new())), } } - pub fn insert(&self, item: T) { + /// Push an item onto the queue + pub fn push(&self, item: T) { self.inner .write() .expect("Poisoned write in ThresholdQueue") .push(Reverse(item)); } + /// Keep taking items off the queue until `item.within_threshold(threshold)` returns false pub fn take_until_including(&self, threshold: u64) -> Vec { let mut found = Vec::new(); let mut inner = self diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs index 25bad16490..5215adc5bb 100644 --- a/crates/indexer/src/callback_queue.rs +++ b/crates/indexer/src/callback_queue.rs @@ -70,7 +70,7 @@ impl CallbackQueue { F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - self.queue.insert(TimedHandler { + self.queue.push(TimedHandler { time, handler: Arc::new(move || Box::pin(handler())), }) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index abfdbd6967..7b7ef7b7d4 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -4,7 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::delayed_handlers::CallbackQueue; +use crate::callback_queue::CallbackQueue; use crate::E3Repository; use super::{models::E3, DataStore}; @@ -143,7 +143,7 @@ impl DataStore for SharedStore { } } -/// Stores E3 event data on store for easy querying. +/// Stores E3 event data on a datastore (persisted or otherwise) for easy querying. pub struct EnclaveIndexer { listener: EventListener, callbacks: CallbackQueue, @@ -187,6 +187,7 @@ impl EnclaveIndexer { } impl EnclaveIndexer { + /// Try to create a new EnclaveIndexer pub async fn new( listener: EventListener, contract: EnclaveContract, @@ -206,6 +207,7 @@ impl EnclaveIndexer { Ok(instance) } + /// Try to create a new EnclaveIndexer from an endpoint and an address pub async fn from_endpoint_address( ws_url: &str, contract_address: &str, @@ -216,6 +218,7 @@ impl EnclaveIndexer { EnclaveIndexer::new(listener, contract, store).await } + /// Add a new Solidity event handler to the indexer pub async fn add_event_handler(&mut self, handler: F) where E: SolEvent + Send + Clone + 'static, @@ -233,7 +236,7 @@ impl EnclaveIndexer { .await; } - /// Register a callback for execution after the given timestap as returned by the chain. + /// Register a callback for execution after the given timestap as returned by the blockchain. pub fn dispatch_after_timestamp(&mut self, when: u64, handler: F) where F: Fn(SharedStore) -> Fut + Send + Sync + 'static, @@ -249,6 +252,27 @@ impl EnclaveIndexer { }); } + /// Start listening + pub fn start(&self) { + self.listener.start() + } + + /// Get E3 data by ID + pub async fn get_e3(&self, e3_id: u64) -> Result { + let (e3, _) = get_e3(self.store.clone(), e3_id).await?; + Ok(e3) + } + + /// Get a handle to the listener + pub fn get_listener(&self) -> EventListener { + self.listener.clone() + } + + /// Get a handle to the store + pub fn get_store(&self) -> SharedStore { + SharedStore::new(self.store.clone()) + } + async fn register_e3_activated(&mut self) -> Result<()> { let db = self.store.clone(); let contract = self.contract.clone(); @@ -402,23 +426,6 @@ impl EnclaveIndexer { self.register_blocktime_callbacks().await?; Ok(()) } - - pub fn start(&self) { - self.listener.start() - } - - pub async fn get_e3(&self, e3_id: u64) -> Result { - let (e3, _) = get_e3(self.store.clone(), e3_id).await?; - Ok(e3) - } - - pub fn get_listener(&self) -> EventListener { - self.listener.clone() - } - - pub fn get_store(&self) -> SharedStore { - SharedStore::new(self.store.clone()) - } } pub async fn get_e3( diff --git a/examples/CRISP/server/src/server/program_server_request.rs b/examples/CRISP/server/src/server/program_server_request.rs index 10687f82e4..05ffbdd014 100644 --- a/examples/CRISP/server/src/server/program_server_request.rs +++ b/examples/CRISP/server/src/server/program_server_request.rs @@ -44,6 +44,7 @@ pub struct ProcessingResponse { pub e3_id: u64, } +/// Run the computation remotely pub async fn run_compute( e3_id: u64, params: Vec, From d017a7f741e00ec2856c38500c44aab2bce68dda Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 19:49:38 +0000 Subject: [PATCH 07/19] update language --- crates/indexer/src/callback_queue.rs | 36 ++++++++++++++-------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs index 5215adc5bb..e9dffb45d6 100644 --- a/crates/indexer/src/callback_queue.rs +++ b/crates/indexer/src/callback_queue.rs @@ -8,21 +8,21 @@ use e3_evm_helpers::threshold_queue::{ThresholdItem, ThresholdQueue}; use eyre::Result; use std::{future::Future, pin::Pin, sync::Arc}; -type AsyncCallback = - Arc Pin> + Send>> + Send + Sync>; +/// Callback for CallbackQueue +type Callback = Arc Pin> + Send>> + Send + Sync>; #[derive(Clone)] /// A callback that has an execute time associated with it -pub struct TimedHandler { +pub struct TimedCallback { time: u64, - handler: AsyncCallback, + callback: Callback, } -impl ThresholdItem for TimedHandler { - type Item = AsyncCallback; +impl ThresholdItem for TimedCallback { + type Item = Callback; fn item(&self) -> Self::Item { - self.handler.clone() + self.callback.clone() } fn within_threshold(&self, threshold: u64) -> bool { @@ -30,21 +30,21 @@ impl ThresholdItem for TimedHandler { } } -impl Eq for TimedHandler {} +impl Eq for TimedCallback {} -impl PartialEq for TimedHandler { +impl PartialEq for TimedCallback { fn eq(&self, other: &Self) -> bool { self.time == other.time } } -impl PartialOrd for TimedHandler { +impl PartialOrd for TimedCallback { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for TimedHandler { +impl Ord for TimedCallback { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.time.cmp(&other.time) } @@ -53,7 +53,7 @@ impl Ord for TimedHandler { #[derive(Clone)] /// A queue of callbacks that can be executed when a given timestamp has been passed pub struct CallbackQueue { - queue: ThresholdQueue, + queue: ThresholdQueue, } impl CallbackQueue { @@ -64,23 +64,23 @@ impl CallbackQueue { } } - /// Push a handler to the queue to be executed at or before the given time. - pub fn push(&mut self, time: u64, handler: F) + /// Push a callback to the queue to be executed at or before the given time. + pub fn push(&mut self, time: u64, callback: F) where F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - self.queue.push(TimedHandler { + self.queue.push(TimedCallback { time, - handler: Arc::new(move || Box::pin(handler())), + callback: Arc::new(move || Box::pin(callback())), }) } /// Execute all pending callbacks up to and including the given time pub async fn execute_until_including(&self, time: u64) -> Result<()> { let handlers = self.queue.take_until_including(time); - for handler in handlers { - handler().await?; + for callback in handlers { + callback().await?; } Ok(()) } From cdf4e823f0b9111872bb46a14a4856459531c269 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 20:09:20 +0000 Subject: [PATCH 08/19] update docs to be more accurate and extract handler --- crates/indexer/src/indexer.rs | 12 +-- examples/CRISP/server/src/server/indexer.rs | 96 ++++++++++--------- .../contracts/interfaces/IE3.sol | 2 +- .../contracts/interfaces/IEnclave.sol | 2 +- 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 7b7ef7b7d4..da116eedfa 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -237,18 +237,18 @@ impl EnclaveIndexer { } /// Register a callback for execution after the given timestap as returned by the blockchain. - pub fn dispatch_after_timestamp(&mut self, when: u64, handler: F) + pub fn dispatch_after_timestamp(&mut self, when: u64, callback: F) where F: Fn(SharedStore) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { let store = SharedStore::new(self.store.clone()); - let handler = Arc::new(handler); + let callback = Arc::new(callback); self.callbacks.push(when, move || { - let handler = Arc::clone(&handler); + let callback = Arc::clone(&callback); let store = store.clone(); - handler(store) + callback(store) }); } @@ -403,7 +403,7 @@ impl EnclaveIndexer { Ok(()) } - async fn register_blocktime_callbacks(&mut self) -> Result<()> { + async fn register_blocktime_callback_handler(&mut self) -> Result<()> { let callbacks = self.callbacks.clone(); self.listener .add_block_handler(move |block| { @@ -423,7 +423,7 @@ impl EnclaveIndexer { self.register_input_published().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; - self.register_blocktime_callbacks().await?; + self.register_blocktime_callback_handler().await?; Ok(()) } } diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index a708fc7dbc..b98fcabbb8 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -15,6 +15,7 @@ use crate::server::{ use alloy::providers::{Provider, ProviderBuilder}; use alloy::sol_types::{sol_data, SolType}; use alloy_primitives::{Address, U256}; +use e3_sdk::indexer::SharedStore; use e3_sdk::{ bfv_helpers::decode_bytes_to_vec_u64, evm_helpers::{ @@ -31,15 +32,13 @@ use e3_sdk::{ }; use evm_helpers::CRISPContractFactory; use eyre::Context; +use eyre::Result; use log::info; use num_bigint::BigUint; -use std::error::Error; use std::time::Duration; use tokio::time::sleep; -type Result = std::result::Result>; - -pub async fn register_e3_requested( +async fn register_e3_requested( mut indexer: EnclaveIndexer, ) -> Result> { // E3Requested @@ -178,7 +177,7 @@ pub async fn register_e3_requested( Ok(indexer) } -pub async fn register_e3_activated( +async fn register_e3_activated( mut indexer: EnclaveIndexer, ) -> Result> { let indexer_clone = indexer.clone(); @@ -199,45 +198,8 @@ pub async fn register_e3_activated( .set_current_round(CurrentRound { id: e3_id }) .await?; - indexer.dispatch_after_timestamp(expiration, move |store| async move { - let mut repo = CrispE3Repository::new(store.clone(), e3_id); - let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; - repo.update_status("Expired").await?; - if repo.get_vote_count().await? > 0 { - info!("[e3_id={}] Starting computation for E3", e3_id); - repo.update_status("Computing").await?; - let (id, status) = run_compute( - e3_id, - e3.e3_params, - e3.ciphertext_inputs, - format!("{}/state/add-result", CONFIG.enclave_server_url), - ) - .await - .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; - if id != e3_id { - return Err(eyre::eyre!( - "Computation request returned unexpected E3 ID: expected {}, got {}", - e3_id, - id - ).into()); - }; - if status != "processing" { - return Err(eyre::eyre!( - "Computation request failed with status: {}", - status - ).into()); - }; - info!("[e3_id={}] Request Computation for E3", e3_id); - repo.update_status("PublishingCiphertext").await?; - } else { - info!( - "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", - e3_id - ); - repo.update_status("Finished").await?; - } - info!("[e3_id={}] E3 request handled successfully.", e3_id); - Ok(()) + indexer.dispatch_after_timestamp(expiration, move |store| { + handle_e3_input_deadline_expiration(e3_id, store) }); Ok(()) @@ -247,7 +209,49 @@ pub async fn register_e3_activated( Ok(indexer) } -pub async fn register_ciphertext_output_published( +async fn handle_e3_input_deadline_expiration( + e3_id: u64, + store: SharedStore, +) -> Result<()> { + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; + repo.update_status("Expired").await?; + if repo.get_vote_count().await? > 0 { + info!("[e3_id={}] Starting computation for E3", e3_id); + repo.update_status("Computing").await?; + let (id, status) = run_compute( + e3_id, + e3.e3_params, + e3.ciphertext_inputs, + format!("{}/state/add-result", CONFIG.enclave_server_url), + ) + .await + .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; + if id != e3_id { + return Err(eyre::eyre!( + "Computation request returned unexpected E3 ID: expected {}, got {}", + e3_id, + id + ) + .into()); + }; + if status != "processing" { + return Err(eyre::eyre!("Computation request failed with status: {}", status).into()); + }; + info!("[e3_id={}] Request Computation for E3", e3_id); + repo.update_status("PublishingCiphertext").await?; + } else { + info!( + "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", + e3_id + ); + repo.update_status("Finished").await?; + } + info!("[e3_id={}] E3 request handled successfully.", e3_id); + Ok(()) +} + +async fn register_ciphertext_output_published( mut indexer: EnclaveIndexer, ) -> Result> { // CiphertextOutputPublished @@ -265,7 +269,7 @@ pub async fn register_ciphertext_output_published( Ok(indexer) } -pub async fn register_plaintext_output_published( +async fn register_plaintext_output_published( mut indexer: EnclaveIndexer, ) -> Result> { // PlaintextOutputPublished diff --git a/packages/enclave-contracts/contracts/interfaces/IE3.sol b/packages/enclave-contracts/contracts/interfaces/IE3.sol index e18023d8df..d2d787eb51 100644 --- a/packages/enclave-contracts/contracts/interfaces/IE3.sol +++ b/packages/enclave-contracts/contracts/interfaces/IE3.sol @@ -18,7 +18,7 @@ import { IDecryptionVerifier } from "./IDecryptionVerifier.sol"; * @param requestBlock Block number when the E3 computation was requested * @param startWindow Start window for the computation: index 0 is minimum block, index 1 is the maximum block * @param duration Duration of the E3 computation in blocks or time units - * @param expiration Timestamp when committee duties expire and computation is considered failed + * @param expiration Timestamp when input deadline has expired and computation should commence * @param encryptionSchemeId Identifier for the encryption scheme used in this computation * @param e3Program Address of the E3 Program contract that validates and verifies the computation * @param e3ProgramParams ABI encoded computation parameters specific to the E3 program diff --git a/packages/enclave-contracts/contracts/interfaces/IEnclave.sol b/packages/enclave-contracts/contracts/interfaces/IEnclave.sol index f8b926a4d9..2a7a36cbce 100644 --- a/packages/enclave-contracts/contracts/interfaces/IEnclave.sol +++ b/packages/enclave-contracts/contracts/interfaces/IEnclave.sol @@ -26,7 +26,7 @@ interface IEnclave { /// @notice This event MUST be emitted when an Encrypted Execution Environment (E3) is successfully activated. /// @param e3Id ID of the E3. - /// @param expiration Timestamp when committee duties expire. + /// @param expiration Timestamp when input deadline has expired. /// @param committeePublicKey Public key of the committee. event E3Activated( uint256 e3Id, From 31fa80cc80646d0c136c0de487554841603cdaea Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 20:14:05 +0000 Subject: [PATCH 09/19] revert comments in ts to keep this focussed --- packages/enclave-sdk/src/types.ts | 10 +--------- templates/default/server/index.ts | 6 ------ 2 files changed, 1 insertion(+), 15 deletions(-) diff --git a/packages/enclave-sdk/src/types.ts b/packages/enclave-sdk/src/types.ts index 9b380fabec..b424a1ed5b 100644 --- a/packages/enclave-sdk/src/types.ts +++ b/packages/enclave-sdk/src/types.ts @@ -74,7 +74,6 @@ export interface ContractInstances { } // Unified Event System -// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. export enum EnclaveEventType { // E3 Lifecycle Events E3_REQUESTED = 'E3Requested', @@ -101,7 +100,6 @@ export enum EnclaveEventType { INITIALIZED = 'Initialized', } -// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. export enum RegistryEventType { // Committee Management COMMITTEE_REQUESTED = 'CommitteeRequested', @@ -116,14 +114,8 @@ export enum RegistryEventType { INITIALIZED = 'Initialized', } -// Events that don't come from the contracts but are part of the protocol -// TODO: bad practice to use TypeScript enums - should use discriminated unions instead. -export enum ArtificialEventType { - E3_INPUT_PUBLISH_WINDOW_EXPIRED = 'E3InputPublishWindowExpired', -} - // Union type for all events -export type AllEventTypes = EnclaveEventType | RegistryEventType | ArtificialEventType +export type AllEventTypes = EnclaveEventType | RegistryEventType // Event data interfaces based on TypeChain types export interface E3 { diff --git a/templates/default/server/index.ts b/templates/default/server/index.ts index ed63c7056f..ce9eaca08c 100644 --- a/templates/default/server/index.ts +++ b/templates/default/server/index.ts @@ -143,7 +143,6 @@ async function handleE3ActivatedEvent(event: any) { def.resolve() } - // XXX: This needs to be based off blocktime and driven by an event from the sdk const currentTime = BigInt(Math.floor(Date.now() / 1000)) const sleepSeconds = expiration > currentTime ? Number(expiration - currentTime) : 0 @@ -158,10 +157,6 @@ async function handleE3ActivatedEvent(event: any) { } } -// XXX: handle function -// async function handleE3InputWindowExpired(event: any) { -// } - async function handleInputPublishedEvent(event: any) { const data = event.data as InputPublishedData const e3Id = data.e3Id @@ -193,7 +188,6 @@ async function setupEventListeners() { sdk.onEnclaveEvent(EnclaveEventType.E3_ACTIVATED, handleE3ActivatedEvent) sdk.onEnclaveEvent(EnclaveEventType.INPUT_PUBLISHED, handleInputPublishedEvent) - // sdk.onEnclaveEvent(EnclaveEventType.E3_INPUT_WINDOW_EXPIRED, handleE3InputWindowExpired); console.log('✅ Event listeners set up successfully') } From 367bb687b90bbe9962ebf51f8ca738b571e1fbda Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 22:01:59 +0000 Subject: [PATCH 10/19] fix test compilation bug --- crates/evm-helpers/src/threshold_queue.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs index 38541496ac..b705cc14af 100644 --- a/crates/evm-helpers/src/threshold_queue.rs +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -107,13 +107,13 @@ mod tests { #[test] fn test_collection_is_ordered() { let queue = ThresholdQueue::new(); - queue.insert(ThreshItem { val: 111, rank: 25 }); - queue.insert(ThreshItem { + queue.push(ThreshItem { val: 111, rank: 25 }); + queue.push(ThreshItem { val: 666, rank: 100, }); - queue.insert(ThreshItem { val: 444, rank: 70 }); - queue.insert(ThreshItem { val: 222, rank: 26 }); + queue.push(ThreshItem { val: 444, rank: 70 }); + queue.push(ThreshItem { val: 222, rank: 26 }); let items = queue.take_until_including(70); assert_eq!(items, vec![111, 222, 444]); From 66c9f0c338eafe603b5d738a924795aa53d33690 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 22:08:14 +0000 Subject: [PATCH 11/19] fix test bug --- Cargo.lock | 1 + crates/indexer/Cargo.toml | 1 + crates/indexer/src/indexer.rs | 3 +++ examples/CRISP/Cargo.lock | 1 + 4 files changed, 6 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index e88bd0660b..2a275f63d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2958,6 +2958,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index 4e83410aa6..d1dbfca2df 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -15,3 +15,4 @@ eyre.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index da116eedfa..9636b961b6 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -26,6 +26,7 @@ use std::future::Future; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; +use tracing::info; type E3Id = u64; @@ -408,8 +409,10 @@ impl EnclaveIndexer { self.listener .add_block_handler(move |block| { let timestamp = block.timestamp(); + let blockheight = block.number(); let callbacks = callbacks.clone(); async move { + info!("on block: {}:{}", blockheight, timestamp); callbacks.execute_until_including(timestamp).await?; Ok(()) } diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 6b2495b53b..c9891247b1 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2313,6 +2313,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] From 26dcc14b445887be2f6a9b22a59da8751d7fc100 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 22:54:54 +0000 Subject: [PATCH 12/19] add logging --- crates/evm-helpers/src/listener.rs | 9 ++++++++- crates/indexer/src/indexer.rs | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index f703bd57b1..9510e9e51e 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -17,7 +17,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{sync::RwLock, time::sleep}; -use tracing::error; +use tracing::{error, info}; type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -76,6 +76,7 @@ impl EventListener { F: Fn(&Header) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { + info!("add_block_handler"); self.block_handlers .write() .await @@ -83,6 +84,7 @@ impl EventListener { } async fn listen_once(&self) -> Result<()> { + info!("listen_once()"); let mut stream = self .provider .subscribe_logs(&self.filter) @@ -109,8 +111,10 @@ impl EventListener { } async fn block_listen_once(&self) -> Result<()> { + info!("block_listen_once()"); let mut stream = self.provider.subscribe_blocks().await?.into_stream(); while let Some(block) = stream.next().await { + info!("GOT BLOCK! {:?}", block); let handlers = self.block_handlers.read().await; for handler in handlers.iter() { let fut = handler(&block); @@ -125,11 +129,13 @@ impl EventListener { } fn start_block_listen_loop(&self) { + info!("start_block_listen_loop"); let this = self.clone(); tokio::spawn(async move { this.retry_loop(|| this.block_listen_once()).await }); } fn start_listen_loop(&self) { + info!("start_listen_loop"); let this = self.clone(); tokio::spawn(async move { this.retry_loop(|| this.listen_once()).await }); } @@ -154,6 +160,7 @@ impl EventListener { } pub fn start(&self) { + info!("Starting event listener!"); self.start_listen_loop(); self.start_block_listen_loop(); } diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 9636b961b6..b1fee26f33 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -405,6 +405,7 @@ impl EnclaveIndexer { } async fn register_blocktime_callback_handler(&mut self) -> Result<()> { + info!("register_blocktime_callback_handler()..."); let callbacks = self.callbacks.clone(); self.listener .add_block_handler(move |block| { From b194b2082b7db330df71834e5a394f9f46e3f8e8 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 23:02:06 +0000 Subject: [PATCH 13/19] add logging --- crates/evm-helpers/src/listener.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index 9510e9e51e..bb25599709 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -150,9 +150,14 @@ impl EventListener { match operation().await { Ok(_) => { sleep(Duration::from_secs(1)).await; + info!("\n**********************************************************"); + info!("Operation finished unexpectedly!\nRestarting..."); + info!("**********************************************************\n\n"); } Err(e) => { + error!("\n**********************************************************"); error!("Error occurred: {}. Retrying in 5 seconds...", e); + error!("**********************************************************\n\n"); sleep(Duration::from_secs(5)).await; } } From 431ebb5a36750d8a744992df891d7391b800aaed Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 23:16:48 +0000 Subject: [PATCH 14/19] add logging --- crates/evm-helpers/src/listener.rs | 8 ++------ crates/indexer/src/callback_queue.rs | 3 +++ crates/indexer/src/indexer.rs | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index bb25599709..9511448cd8 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -17,7 +17,7 @@ use futures::stream::StreamExt; use futures_util::future::FutureExt; use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; use tokio::{sync::RwLock, time::sleep}; -use tracing::{error, info}; +use tracing::{error, info, warn}; type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -114,7 +114,6 @@ impl EventListener { info!("block_listen_once()"); let mut stream = self.provider.subscribe_blocks().await?.into_stream(); while let Some(block) = stream.next().await { - info!("GOT BLOCK! {:?}", block); let handlers = self.block_handlers.read().await; for handler in handlers.iter() { let fut = handler(&block); @@ -150,9 +149,6 @@ impl EventListener { match operation().await { Ok(_) => { sleep(Duration::from_secs(1)).await; - info!("\n**********************************************************"); - info!("Operation finished unexpectedly!\nRestarting..."); - info!("**********************************************************\n\n"); } Err(e) => { error!("\n**********************************************************"); @@ -161,11 +157,11 @@ impl EventListener { sleep(Duration::from_secs(5)).await; } } + warn!("Ongoing operation finished unexpectedly"); } } pub fn start(&self) { - info!("Starting event listener!"); self.start_listen_loop(); self.start_block_listen_loop(); } diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs index e9dffb45d6..b6956a2e1d 100644 --- a/crates/indexer/src/callback_queue.rs +++ b/crates/indexer/src/callback_queue.rs @@ -7,6 +7,7 @@ use e3_evm_helpers::threshold_queue::{ThresholdItem, ThresholdQueue}; use eyre::Result; use std::{future::Future, pin::Pin, sync::Arc}; +use tracing::info; /// Callback for CallbackQueue type Callback = Arc Pin> + Send>> + Send + Sync>; @@ -78,7 +79,9 @@ impl CallbackQueue { /// Execute all pending callbacks up to and including the given time pub async fn execute_until_including(&self, time: u64) -> Result<()> { + info!("execute_until_including..."); let handlers = self.queue.take_until_including(time); + info!("found {} handlers", handlers.len()); for callback in handlers { callback().await?; } diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index b1fee26f33..51c02ea279 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -413,7 +413,7 @@ impl EnclaveIndexer { let blockheight = block.number(); let callbacks = callbacks.clone(); async move { - info!("on block: {}:{}", blockheight, timestamp); + info!("ON BLOCK: {}:{}", blockheight, timestamp); callbacks.execute_until_including(timestamp).await?; Ok(()) } From 3084d67e0bdad553aad7515b624d23b9d82c4cc7 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 23:19:52 +0000 Subject: [PATCH 15/19] add logging --- crates/indexer/src/callback_queue.rs | 1 + crates/indexer/src/indexer.rs | 2 ++ 2 files changed, 3 insertions(+) diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs index b6956a2e1d..002204b47f 100644 --- a/crates/indexer/src/callback_queue.rs +++ b/crates/indexer/src/callback_queue.rs @@ -71,6 +71,7 @@ impl CallbackQueue { F: Fn() -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { + info!("ADDING CALLBACK TO time={}", time); self.queue.push(TimedCallback { time, callback: Arc::new(move || Box::pin(callback())), diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 51c02ea279..4341e4ee7d 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -243,10 +243,12 @@ impl EnclaveIndexer { F: Fn(SharedStore) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { + info!("dispatch_after_timestamp time={}", when); let store = SharedStore::new(self.store.clone()); let callback = Arc::new(callback); self.callbacks.push(when, move || { + info!("Running callback: time={}", when); let callback = Arc::clone(&callback); let store = store.clone(); callback(store) From b35e7813ed2a0b5d2718e8942fd034b0f4457be0 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 23:21:38 +0000 Subject: [PATCH 16/19] add logging --- examples/CRISP/server/src/server/indexer.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index b98fcabbb8..e10394ba0a 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -198,7 +198,9 @@ async fn register_e3_activated( .set_current_round(CurrentRound { id: e3_id }) .await?; + info!("[e3_id={}] Registering hook for {}", e3_id, expiration); indexer.dispatch_after_timestamp(expiration, move |store| { + info!("Running...."); handle_e3_input_deadline_expiration(e3_id, store) }); From 1a12de0c520a032c19333a6b15ba2cda2819e826 Mon Sep 17 00:00:00 2001 From: ryardley Date: Fri, 21 Nov 2025 23:31:36 +0000 Subject: [PATCH 17/19] add logging --- examples/CRISP/scripts/dev_server.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/CRISP/scripts/dev_server.sh b/examples/CRISP/scripts/dev_server.sh index 928ad6c88b..27096d8aa8 100755 --- a/examples/CRISP/scripts/dev_server.sh +++ b/examples/CRISP/scripts/dev_server.sh @@ -4,4 +4,7 @@ set -e export CARGO_INCREMENTAL=1 +echo "<<................RUNNING SERVER...............>>" + + (cd ./server && cargo run --bin server) From 88df99f2b7639a4f2c5bc48a410f889344cf42ac Mon Sep 17 00:00:00 2001 From: ryardley Date: Sat, 22 Nov 2025 01:36:35 +0000 Subject: [PATCH 18/19] unify providers --- crates/evm-helpers/src/contracts.rs | 41 +++++++++---- crates/evm-helpers/src/listener.rs | 64 ++++++++++++--------- crates/indexer/src/indexer.rs | 2 +- examples/CRISP/server/src/server/indexer.rs | 23 +++++--- 4 files changed, 81 insertions(+), 49 deletions(-) diff --git a/crates/evm-helpers/src/contracts.rs b/crates/evm-helpers/src/contracts.rs index e56eddd510..7ece6a6959 100644 --- a/crates/evm-helpers/src/contracts.rs +++ b/crates/evm-helpers/src/contracts.rs @@ -4,7 +4,9 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use alloy::network::NetworkWallet; use alloy::providers::fillers::BlobGasFiller; +use alloy::providers::WalletProvider; use alloy::{ network::{Ethereum, EthereumWallet}, primitives::{Address, Bytes, U256}, @@ -206,6 +208,8 @@ impl EnclaveContract { pub fn address(&self) -> &Address { &self.contract_address } + + // pub fn create_new_read_contract() } impl EnclaveContract { @@ -261,18 +265,26 @@ impl EnclaveContractFactory { contract_address: &str, private_key: &str, ) -> Result> { - let contract_address = contract_address.parse()?; - let signer: PrivateKeySigner = private_key.parse()?; - let wallet_address = signer.address(); let wallet = EthereumWallet::from(signer); - let provider = ProviderBuilder::new() - .wallet(wallet) - .connect(http_rpc_url) - .await?; + let provider = Arc::new( + ProviderBuilder::new() + .wallet(wallet) + .connect(http_rpc_url) + .await?, + ); + EnclaveContractFactory::create_write_from_provider(contract_address, provider) + } + pub fn create_write_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result> { + let contract_address = contract_address.parse()?; + let wallet = provider.wallet(); + let wallet_address = wallet.default_signer().address(); Ok(EnclaveContract:: { - provider: Arc::new(provider), + provider, contract_address, wallet_address: Some(wallet_address), _marker: PhantomData, @@ -284,12 +296,17 @@ impl EnclaveContractFactory { http_rpc_url: &str, contract_address: &str, ) -> Result> { - let contract_address = contract_address.parse()?; - - let provider = ProviderBuilder::new().connect(http_rpc_url).await?; + let provider = Arc::new(ProviderBuilder::new().connect(http_rpc_url).await?); + EnclaveContractFactory::create_read_from_provider(contract_address, provider) + } + pub fn create_read_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result> { + let contract_address = contract_address.parse()?; Ok(EnclaveContract:: { - provider: Arc::new(provider), + provider, contract_address, wallet_address: None, _marker: PhantomData, diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index 9511448cd8..59264e727b 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -19,6 +19,8 @@ use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Durat use tokio::{sync::RwLock, time::sleep}; use tracing::{error, info, warn}; +use crate::contracts::{EnclaveContractFactory, EnclaveReadOnlyProvider}; + type EventHandler = Box Pin> + Send>> + Send + Sync>; @@ -32,6 +34,8 @@ pub struct EventListener { filter: Filter, handlers: Arc>>>, block_handlers: Arc>>, + event_started: bool, + block_started: bool, } impl EventListener { @@ -41,6 +45,8 @@ impl EventListener { filter, handlers: Arc::new(RwLock::new(HashMap::new())), block_handlers: Arc::new(RwLock::new(Vec::new())), + event_started: false, + block_started: false, } } @@ -83,8 +89,8 @@ impl EventListener { .push(Box::new(move |h: &Header| Box::pin(handler(h)))); } - async fn listen_once(&self) -> Result<()> { - info!("listen_once()"); + async fn event_listen_once(&self) -> Result<()> { + info!("event_listen_once()"); let mut stream = self .provider .subscribe_logs(&self.filter) @@ -127,16 +133,29 @@ impl EventListener { Ok(()) } - fn start_block_listen_loop(&self) { + fn ensure_block_listen_loop(&mut self) { info!("start_block_listen_loop"); + self.block_started = true; let this = self.clone(); - tokio::spawn(async move { this.retry_loop(|| this.block_listen_once()).await }); + tokio::spawn(async move { + let len = { this.block_handlers.read().await.len() }; + + if len > 0 { + this.retry_loop(|| this.block_listen_once()).await; + } + }); } - fn start_listen_loop(&self) { - info!("start_listen_loop"); + fn ensure_event_listen_loop(&mut self) { + info!("ensure_event_listen_loop"); + self.event_started = true; let this = self.clone(); - tokio::spawn(async move { this.retry_loop(|| this.listen_once()).await }); + tokio::spawn(async move { + let len = { this.handlers.read().await.len() }; + if len > 0 { + this.retry_loop(|| this.event_listen_once()).await; + } + }); } async fn retry_loop(&self, mut operation: F) @@ -161,13 +180,20 @@ impl EventListener { } } - pub fn start(&self) { - self.start_listen_loop(); - self.start_block_listen_loop(); + pub fn start(&mut self) { + self.ensure_event_listen_loop(); + self.ensure_block_listen_loop(); } pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); + EventListener::create_contract_listener_from_provider(contract_address, provider) + } + + pub fn create_contract_listener_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result { let address = contract_address.parse::

()?; let filter = Filter::new() .address(address) @@ -175,21 +201,3 @@ impl EventListener { Ok(EventListener::new(provider, filter)) } } - -async fn retry_with_backoff(mut f: F) -where - F: FnMut() -> Fut, - Fut: std::future::Future>, -{ - loop { - match f().await { - Ok(_) => { - sleep(Duration::from_secs(1)).await; - } - Err(e) => { - error!("Error occurred: {}. Retrying in 5 seconds...", e); - sleep(Duration::from_secs(5)).await; - } - } - } -} diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 4341e4ee7d..f4f8b0ce5c 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -256,7 +256,7 @@ impl EnclaveIndexer { } /// Start listening - pub fn start(&self) { + pub fn start(&mut self) { self.listener.start() } diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index e10394ba0a..545bba1f4b 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -389,13 +389,25 @@ pub async fn start_indexer( store: impl DataStore, private_key: &str, ) -> Result<()> { + info!("== START INDEXER! =="); + let readonly_contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; + let enclave_contract_listener = EventListener::create_contract_listener_from_provider( + contract_address, + readonly_contract.get_provider(), + )?; + let readwrite_contract = EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; - let enclave_contract_listener = - EventListener::create_contract_listener(ws_url, contract_address).await?; + // Registry Listener + let registry_contract_listener = EventListener::create_contract_listener_from_provider( + registry_address, + readonly_contract.get_provider(), + )?; + let mut registry_listener = + register_committee_published(registry_contract_listener, readwrite_contract).await?; // CRISP indexer let crisp_indexer = @@ -403,14 +415,9 @@ pub async fn start_indexer( let crisp_indexer = register_e3_requested(crisp_indexer).await?; let crisp_indexer = register_e3_activated(crisp_indexer).await?; let crisp_indexer = register_ciphertext_output_published(crisp_indexer).await?; - let crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; + let mut crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; crisp_indexer.start(); - // Registry Listener - let registry_contract_listener = - EventListener::create_contract_listener(&ws_url, registry_address).await?; - let registry_listener = - register_committee_published(registry_contract_listener, readwrite_contract).await?; registry_listener.start(); Ok(()) From fad316c6c058648af3833b78faa1a0ad5454be6f Mon Sep 17 00:00:00 2001 From: ryardley Date: Sat, 22 Nov 2025 02:29:01 +0000 Subject: [PATCH 19/19] add logging --- crates/indexer/src/indexer.rs | 2 +- examples/CRISP/server/src/server/indexer.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index f4f8b0ce5c..08723b78a5 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -243,7 +243,7 @@ impl EnclaveIndexer { F: Fn(SharedStore) -> Fut + Send + Sync + 'static, Fut: Future> + Send + 'static, { - info!("dispatch_after_timestamp time={}", when); + info!("%%%%% ***** >>>>>> dispatch_after_timestamp time={}", when); let store = SharedStore::new(self.store.clone()); let callback = Arc::new(callback); diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 545bba1f4b..fa9a54ccda 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -180,11 +180,13 @@ async fn register_e3_requested( async fn register_e3_activated( mut indexer: EnclaveIndexer, ) -> Result> { + info!("** register_e3_activated"); let indexer_clone = indexer.clone(); // E3Activated indexer .add_event_handler(move |event: E3Activated, store| { - let mut indexer = indexer_clone.clone(); + info!("E3Activated!"); + let mut indexer_clone = indexer_clone.clone(); async move { let e3_id = event.e3Id.to::(); let mut repo = CrispE3Repository::new(store.clone(), e3_id); @@ -199,7 +201,7 @@ async fn register_e3_activated( .await?; info!("[e3_id={}] Registering hook for {}", e3_id, expiration); - indexer.dispatch_after_timestamp(expiration, move |store| { + indexer_clone.dispatch_after_timestamp(expiration, move |store| { info!("Running...."); handle_e3_input_deadline_expiration(e3_id, store) });