diff --git a/Cargo.lock b/Cargo.lock index fc10fc725e..2a275f63d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2906,6 +2906,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] @@ -2957,6 +2958,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/crates/aggregator/src/committee_finalizer.rs b/crates/aggregator/src/committee_finalizer.rs index 587c8311ea..dc2b582d3c 100644 --- a/crates/aggregator/src/committee_finalizer.rs +++ b/crates/aggregator/src/committee_finalizer.rs @@ -107,6 +107,7 @@ impl Handler for CommitteeFinalizer { let bus = act.bus.clone(); let e3_id_clone = e3_id_for_async.clone(); + // XXX: refactor to use blockchain time let handle = ctx.run_later( Duration::from_secs(seconds_until_deadline), move |act, _ctx| { diff --git a/crates/evm-helpers/Cargo.toml b/crates/evm-helpers/Cargo.toml index f5a5848237..8470dcf60c 100644 --- a/crates/evm-helpers/Cargo.toml +++ b/crates/evm-helpers/Cargo.toml @@ -14,3 +14,4 @@ futures.workspace = true futures-util.workspace = true once_cell.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/evm-helpers/src/contracts.rs b/crates/evm-helpers/src/contracts.rs index e56eddd510..7ece6a6959 100644 --- a/crates/evm-helpers/src/contracts.rs +++ b/crates/evm-helpers/src/contracts.rs @@ -4,7 +4,9 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use alloy::network::NetworkWallet; use alloy::providers::fillers::BlobGasFiller; +use alloy::providers::WalletProvider; use alloy::{ network::{Ethereum, EthereumWallet}, primitives::{Address, Bytes, U256}, @@ -206,6 +208,8 @@ impl EnclaveContract { pub fn address(&self) -> &Address { &self.contract_address } + + // pub fn create_new_read_contract() } impl EnclaveContract { @@ -261,18 +265,26 @@ impl EnclaveContractFactory { contract_address: &str, private_key: &str, ) -> Result> { - let contract_address = contract_address.parse()?; - let signer: PrivateKeySigner = private_key.parse()?; - let wallet_address = signer.address(); let wallet = EthereumWallet::from(signer); - let provider = ProviderBuilder::new() - .wallet(wallet) - .connect(http_rpc_url) - .await?; + let provider = Arc::new( + ProviderBuilder::new() + .wallet(wallet) + .connect(http_rpc_url) + .await?, + ); + EnclaveContractFactory::create_write_from_provider(contract_address, provider) + } + pub fn create_write_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result> { + let contract_address = contract_address.parse()?; + let wallet = provider.wallet(); + let wallet_address = wallet.default_signer().address(); Ok(EnclaveContract:: { - provider: Arc::new(provider), + provider, contract_address, wallet_address: Some(wallet_address), _marker: PhantomData, @@ -284,12 +296,17 @@ impl EnclaveContractFactory { http_rpc_url: &str, contract_address: &str, ) -> Result> { - let contract_address = contract_address.parse()?; - - let provider = ProviderBuilder::new().connect(http_rpc_url).await?; + let provider = Arc::new(ProviderBuilder::new().connect(http_rpc_url).await?); + EnclaveContractFactory::create_read_from_provider(contract_address, provider) + } + pub fn create_read_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result> { + let contract_address = contract_address.parse()?; Ok(EnclaveContract:: { - provider: Arc::new(provider), + provider, contract_address, wallet_address: None, _marker: PhantomData, diff --git a/crates/evm-helpers/src/lib.rs b/crates/evm-helpers/src/lib.rs index da02ad0519..7c343cc998 100644 --- a/crates/evm-helpers/src/lib.rs +++ b/crates/evm-helpers/src/lib.rs @@ -7,3 +7,4 @@ pub mod contracts; pub mod events; pub mod listener; +pub mod threshold_queue; diff --git a/crates/evm-helpers/src/listener.rs b/crates/evm-helpers/src/listener.rs index ccb00b1620..59264e727b 100644 --- a/crates/evm-helpers/src/listener.rs +++ b/crates/evm-helpers/src/listener.rs @@ -5,6 +5,7 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use alloy::{ + consensus::Header, network::Ethereum, primitives::{Address, B256}, providers::{Provider, ProviderBuilder}, @@ -14,17 +15,27 @@ use alloy::{ use eyre::Result; use futures::stream::StreamExt; use futures_util::future::FutureExt; -use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc}; -use tokio::{sync::RwLock, task::JoinHandle}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration}; +use tokio::{sync::RwLock, time::sleep}; +use tracing::{error, info, warn}; + +use crate::contracts::{EnclaveContractFactory, EnclaveReadOnlyProvider}; type EventHandler = Box Pin> + Send>> + Send + Sync>; +type BlockHandler = + Box Pin> + Send>> + Send + Sync>; + #[derive(Clone)] +/// Listens for contract events pub struct EventListener { provider: Arc>, filter: Filter, handlers: Arc>>>, + block_handlers: Arc>>, + event_started: bool, + block_started: bool, } impl EventListener { @@ -33,6 +44,9 @@ impl EventListener { provider, filter, handlers: Arc::new(RwLock::new(HashMap::new())), + block_handlers: Arc::new(RwLock::new(Vec::new())), + event_started: false, + block_started: false, } } @@ -63,7 +77,20 @@ impl EventListener { .push(wrapped_handler); } - async fn listen(&self) -> Result<()> { + pub async fn add_block_handler(&mut self, handler: F) + where + F: Fn(&Header) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + info!("add_block_handler"); + self.block_handlers + .write() + .await + .push(Box::new(move |h: &Header| Box::pin(handler(h)))); + } + + async fn event_listen_once(&self) -> Result<()> { + info!("event_listen_once()"); let mut stream = self .provider .subscribe_logs(&self.filter) @@ -89,13 +116,84 @@ impl EventListener { Ok(()) } - pub fn start(&self) -> JoinHandle> { + async fn block_listen_once(&self) -> Result<()> { + info!("block_listen_once()"); + let mut stream = self.provider.subscribe_blocks().await?.into_stream(); + while let Some(block) = stream.next().await { + let handlers = self.block_handlers.read().await; + for handler in handlers.iter() { + let fut = handler(&block); + tokio::spawn(async move { + if let Err(e) = fut.await { + eprintln!("Error processing block: {:?}", e); + } + }); + } + } + Ok(()) + } + + fn ensure_block_listen_loop(&mut self) { + info!("start_block_listen_loop"); + self.block_started = true; let this = self.clone(); - tokio::spawn(async move { this.listen().await }) + tokio::spawn(async move { + let len = { this.block_handlers.read().await.len() }; + + if len > 0 { + this.retry_loop(|| this.block_listen_once()).await; + } + }); + } + + fn ensure_event_listen_loop(&mut self) { + info!("ensure_event_listen_loop"); + self.event_started = true; + let this = self.clone(); + tokio::spawn(async move { + let len = { this.handlers.read().await.len() }; + if len > 0 { + this.retry_loop(|| this.event_listen_once()).await; + } + }); + } + + async fn retry_loop(&self, mut operation: F) + where + F: FnMut() -> Fut, + Fut: Future>, + E: std::fmt::Display, + { + loop { + match operation().await { + Ok(_) => { + sleep(Duration::from_secs(1)).await; + } + Err(e) => { + error!("\n**********************************************************"); + error!("Error occurred: {}. Retrying in 5 seconds...", e); + error!("**********************************************************\n\n"); + sleep(Duration::from_secs(5)).await; + } + } + warn!("Ongoing operation finished unexpectedly"); + } + } + + pub fn start(&mut self) { + self.ensure_event_listen_loop(); + self.ensure_block_listen_loop(); } pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result { let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?); + EventListener::create_contract_listener_from_provider(contract_address, provider) + } + + pub fn create_contract_listener_from_provider( + contract_address: &str, + provider: Arc, + ) -> Result { let address = contract_address.parse::
()?; let filter = Filter::new() .address(address) diff --git a/crates/evm-helpers/src/threshold_queue.rs b/crates/evm-helpers/src/threshold_queue.rs new file mode 100644 index 0000000000..b705cc14af --- /dev/null +++ b/crates/evm-helpers/src/threshold_queue.rs @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::{ + cmp::Reverse, + collections::BinaryHeap, + sync::{Arc, RwLock}, +}; + +#[derive(Clone)] +/// An implementation of a ThresholdQueue +pub struct ThresholdQueue { + inner: Arc>>>, +} + +/// An item that can be added to a threshold queue +pub trait ThresholdItem: Ord { + type Item; + fn within_threshold(&self, threshold: u64) -> bool; + fn item(&self) -> Self::Item; +} + +impl ThresholdQueue +where + T: ThresholdItem, +{ + /// Create a new ThresholdQueue + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(BinaryHeap::new())), + } + } + + /// Push an item onto the queue + pub fn push(&self, item: T) { + self.inner + .write() + .expect("Poisoned write in ThresholdQueue") + .push(Reverse(item)); + } + + /// Keep taking items off the queue until `item.within_threshold(threshold)` returns false + pub fn take_until_including(&self, threshold: u64) -> Vec { + let mut found = Vec::new(); + let mut inner = self + .inner + .write() + .expect("Poisoned write in ThresholdQueue"); + + while let Some(Reverse(item)) = inner.peek() { + if item.within_threshold(threshold) { + if let Some(Reverse(item)) = inner.pop() { + found.push(item.item()); + } + } else { + break; + } + } + + found + } +} + +#[cfg(test)] +mod tests { + use super::{ThresholdItem, ThresholdQueue}; + + struct ThreshItem { + val: u64, + rank: u64, + } + + impl Eq for ThreshItem {} + + impl PartialEq for ThreshItem { + fn eq(&self, other: &Self) -> bool { + self.rank == other.rank + } + } + + impl PartialOrd for ThreshItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + + impl Ord for ThreshItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.rank.cmp(&other.rank) + } + } + + impl ThresholdItem for ThreshItem { + type Item = u64; + fn item(&self) -> Self::Item { + self.val + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.rank <= threshold + } + } + + #[test] + fn test_collection_is_ordered() { + let queue = ThresholdQueue::new(); + queue.push(ThreshItem { val: 111, rank: 25 }); + queue.push(ThreshItem { + val: 666, + rank: 100, + }); + queue.push(ThreshItem { val: 444, rank: 70 }); + queue.push(ThreshItem { val: 222, rank: 26 }); + let items = queue.take_until_including(70); + + assert_eq!(items, vec![111, 222, 444]); + + let items = queue.take_until_including(101); + + assert_eq!(items, vec![666]); + } +} diff --git a/crates/evm/src/event_reader.rs b/crates/evm/src/event_reader.rs index b206b1568e..d3c0f18014 100644 --- a/crates/evm/src/event_reader.rs +++ b/crates/evm/src/event_reader.rs @@ -33,6 +33,9 @@ pub enum EnclaveEvmEvent { event: EnclaveEvent, block: Option, }, + // Log { + // data,topic, chain_id, address + // } } impl EnclaveEvmEvent { @@ -182,6 +185,8 @@ impl Actor for EvmEventReader

{ } } +// TODO: +// - extract and combine all filters #[instrument(name = "evm_event_reader", skip_all)] async fn stream_from_evm( provider: EthProvider

, diff --git a/crates/indexer/Cargo.toml b/crates/indexer/Cargo.toml index 4e83410aa6..d1dbfca2df 100644 --- a/crates/indexer/Cargo.toml +++ b/crates/indexer/Cargo.toml @@ -15,3 +15,4 @@ eyre.workspace = true serde.workspace = true thiserror.workspace = true tokio.workspace = true +tracing.workspace = true diff --git a/crates/indexer/src/callback_queue.rs b/crates/indexer/src/callback_queue.rs new file mode 100644 index 0000000000..002204b47f --- /dev/null +++ b/crates/indexer/src/callback_queue.rs @@ -0,0 +1,179 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use e3_evm_helpers::threshold_queue::{ThresholdItem, ThresholdQueue}; +use eyre::Result; +use std::{future::Future, pin::Pin, sync::Arc}; +use tracing::info; + +/// Callback for CallbackQueue +type Callback = Arc Pin> + Send>> + Send + Sync>; + +#[derive(Clone)] +/// A callback that has an execute time associated with it +pub struct TimedCallback { + time: u64, + callback: Callback, +} + +impl ThresholdItem for TimedCallback { + type Item = Callback; + + fn item(&self) -> Self::Item { + self.callback.clone() + } + + fn within_threshold(&self, threshold: u64) -> bool { + self.time <= threshold + } +} + +impl Eq for TimedCallback {} + +impl PartialEq for TimedCallback { + fn eq(&self, other: &Self) -> bool { + self.time == other.time + } +} + +impl PartialOrd for TimedCallback { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TimedCallback { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.time.cmp(&other.time) + } +} + +#[derive(Clone)] +/// A queue of callbacks that can be executed when a given timestamp has been passed +pub struct CallbackQueue { + queue: ThresholdQueue, +} + +impl CallbackQueue { + /// Create a new queue + pub fn new() -> Self { + Self { + queue: ThresholdQueue::new(), + } + } + + /// Push a callback to the queue to be executed at or before the given time. + pub fn push(&mut self, time: u64, callback: F) + where + F: Fn() -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + info!("ADDING CALLBACK TO time={}", time); + self.queue.push(TimedCallback { + time, + callback: Arc::new(move || Box::pin(callback())), + }) + } + + /// Execute all pending callbacks up to and including the given time + pub async fn execute_until_including(&self, time: u64) -> Result<()> { + info!("execute_until_including..."); + let handlers = self.queue.take_until_including(time); + info!("found {} handlers", handlers.len()); + for callback in handlers { + callback().await?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + #[tokio::test] + async fn test_single_callback_executes() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.push(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert!(*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_callback_not_executed_before_threshold() { + let mut queue = CallbackQueue::new(); + let called = Arc::new(Mutex::new(false)); + let called_clone = called.clone(); + + queue.push(100, move || { + let called = called_clone.clone(); + async move { + *called.lock().unwrap() = true; + Ok(()) + } + }); + + queue.execute_until_including(50).await.unwrap(); + assert!(!*called.lock().unwrap()); + } + + #[tokio::test] + async fn test_multiple_callbacks_execute() { + let mut queue = CallbackQueue::new(); + let counter = Arc::new(Mutex::new(0)); + + let c1 = counter.clone(); + queue.push(50, move || { + let c = c1.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c2 = counter.clone(); + queue.push(100, move || { + let c = c2.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + let c3 = counter.clone(); + queue.push(150, move || { + let c = c3.clone(); + async move { + *c.lock().unwrap() += 1; + Ok(()) + } + }); + + queue.execute_until_including(100).await.unwrap(); + assert_eq!(*counter.lock().unwrap(), 2); + } + + #[tokio::test] + async fn test_error_propagation() { + let mut queue = CallbackQueue::new(); + + queue.push(100, || async { Err(eyre::eyre!("test error")) }); + + let result = queue.execute_until_including(100).await; + assert!(result.is_err()); + } +} diff --git a/crates/indexer/src/indexer.rs b/crates/indexer/src/indexer.rs index 4d6c8a8c9b..08723b78a5 100644 --- a/crates/indexer/src/indexer.rs +++ b/crates/indexer/src/indexer.rs @@ -4,19 +4,21 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use crate::callback_queue::CallbackQueue; use crate::E3Repository; use super::{models::E3, DataStore}; -use alloy::hex; use alloy::primitives::Uint; use alloy::providers::Provider; use alloy::sol_types::SolEvent; +use alloy::{consensus::BlockHeader, hex}; use async_trait::async_trait; use e3_evm_helpers::{ contracts::{EnclaveContract, EnclaveContractFactory, EnclaveRead, ReadOnly}, events::{CiphertextOutputPublished, E3Activated, InputPublished, PlaintextOutputPublished}, listener::EventListener, }; +// TODO: Remove eyre in favour of thiserror use eyre::eyre; use eyre::Result; use serde::{de::DeserializeOwned, Serialize}; @@ -24,7 +26,7 @@ use std::future::Future; use std::{collections::HashMap, sync::Arc}; use thiserror::Error; use tokio::sync::RwLock; -use tokio::task::JoinHandle; +use tracing::info; type E3Id = u64; @@ -142,15 +144,29 @@ impl DataStore for SharedStore { } } -#[derive(Clone)] -pub struct EnclaveIndexer { +/// Stores E3 event data on a datastore (persisted or otherwise) for easy querying. +pub struct EnclaveIndexer { listener: EventListener, + callbacks: CallbackQueue, contract: EnclaveContract, store: Arc>, contract_address: String, chain_id: u64, } +impl Clone for EnclaveIndexer { + fn clone(&self) -> Self { + Self { + listener: self.listener.clone(), + callbacks: self.callbacks.clone(), + contract: self.contract.clone(), + store: self.store.clone(), + contract_address: self.contract_address.clone(), + chain_id: self.chain_id, + } + } +} + impl EnclaveIndexer { pub async fn new_with_in_mem_store( listener: EventListener, @@ -172,6 +188,7 @@ impl EnclaveIndexer { } impl EnclaveIndexer { + /// Try to create a new EnclaveIndexer pub async fn new( listener: EventListener, contract: EnclaveContract, @@ -182,6 +199,7 @@ impl EnclaveIndexer { let mut instance = Self { store: Arc::new(RwLock::new(store)), contract, + callbacks: CallbackQueue::new(), listener, contract_address, chain_id, @@ -190,6 +208,7 @@ impl EnclaveIndexer { Ok(instance) } + /// Try to create a new EnclaveIndexer from an endpoint and an address pub async fn from_endpoint_address( ws_url: &str, contract_address: &str, @@ -200,6 +219,7 @@ impl EnclaveIndexer { EnclaveIndexer::new(listener, contract, store).await } + /// Add a new Solidity event handler to the indexer pub async fn add_event_handler(&mut self, handler: F) where E: SolEvent + Send + Clone + 'static, @@ -217,6 +237,45 @@ impl EnclaveIndexer { .await; } + /// Register a callback for execution after the given timestap as returned by the blockchain. + pub fn dispatch_after_timestamp(&mut self, when: u64, callback: F) + where + F: Fn(SharedStore) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + info!("%%%%% ***** >>>>>> dispatch_after_timestamp time={}", when); + let store = SharedStore::new(self.store.clone()); + let callback = Arc::new(callback); + + self.callbacks.push(when, move || { + info!("Running callback: time={}", when); + let callback = Arc::clone(&callback); + let store = store.clone(); + callback(store) + }); + } + + /// Start listening + pub fn start(&mut self) { + self.listener.start() + } + + /// Get E3 data by ID + pub async fn get_e3(&self, e3_id: u64) -> Result { + let (e3, _) = get_e3(self.store.clone(), e3_id).await?; + Ok(e3) + } + + /// Get a handle to the listener + pub fn get_listener(&self) -> EventListener { + self.listener.clone() + } + + /// Get a handle to the store + pub fn get_store(&self) -> SharedStore { + SharedStore::new(self.store.clone()) + } + async fn register_e3_activated(&mut self) -> Result<()> { let db = self.store.clone(); let contract = self.contract.clone(); @@ -227,7 +286,6 @@ impl EnclaveIndexer { let db = SharedStore::new(db.clone()); let enclave_address = enclave_address.clone(); let contract = contract.clone(); - async move { println!( "E3Activated: id={}, expiration={}, pubkey=0x{}...", @@ -245,6 +303,7 @@ impl EnclaveIndexer { u64_try_from(e3.startWindow[0])?, u64_try_from(e3.startWindow[1])?, ]; + // NOTE: we are only saving protocol specific info // here and not CRISP specific info so E3 corresponds to the solidity E3 let e3_obj = E3 { @@ -347,30 +406,32 @@ impl EnclaveIndexer { Ok(()) } + async fn register_blocktime_callback_handler(&mut self) -> Result<()> { + info!("register_blocktime_callback_handler()..."); + let callbacks = self.callbacks.clone(); + self.listener + .add_block_handler(move |block| { + let timestamp = block.timestamp(); + let blockheight = block.number(); + let callbacks = callbacks.clone(); + async move { + info!("ON BLOCK: {}:{}", blockheight, timestamp); + callbacks.execute_until_including(timestamp).await?; + Ok(()) + } + }) + .await; + Ok(()) + } + async fn setup_listeners(&mut self) -> Result<()> { self.register_e3_activated().await?; self.register_input_published().await?; self.register_ciphertext_output_published().await?; self.register_plaintext_output_published().await?; + self.register_blocktime_callback_handler().await?; Ok(()) } - - pub fn start(&self) -> JoinHandle> { - self.listener.start() - } - - pub async fn get_e3(&self, e3_id: u64) -> Result { - let (e3, _) = get_e3(self.store.clone(), e3_id).await?; - Ok(e3) - } - - pub fn get_listener(&self) -> EventListener { - self.listener.clone() - } - - pub fn get_store(&self) -> SharedStore { - SharedStore::new(self.store.clone()) - } } pub async fn get_e3( diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index b42c0bff8c..0a3b0466bc 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -4,6 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +pub mod callback_queue; mod indexer; pub mod models; mod repo; diff --git a/examples/CRISP/Cargo.lock b/examples/CRISP/Cargo.lock index 72614a8689..c9891247b1 100644 --- a/examples/CRISP/Cargo.lock +++ b/examples/CRISP/Cargo.lock @@ -2298,6 +2298,7 @@ dependencies = [ "futures-util", "once_cell", "tokio", + "tracing", ] [[package]] @@ -2312,6 +2313,7 @@ dependencies = [ "serde", "thiserror 1.0.69", "tokio", + "tracing", ] [[package]] diff --git a/examples/CRISP/scripts/dev_server.sh b/examples/CRISP/scripts/dev_server.sh index 928ad6c88b..27096d8aa8 100755 --- a/examples/CRISP/scripts/dev_server.sh +++ b/examples/CRISP/scripts/dev_server.sh @@ -4,4 +4,7 @@ set -e export CARGO_INCREMENTAL=1 +echo "<<................RUNNING SERVER...............>>" + + (cd ./server && cargo run --bin server) diff --git a/examples/CRISP/server/src/server/indexer.rs b/examples/CRISP/server/src/server/indexer.rs index 2e029b5f55..fa9a54ccda 100644 --- a/examples/CRISP/server/src/server/indexer.rs +++ b/examples/CRISP/server/src/server/indexer.rs @@ -15,6 +15,7 @@ use crate::server::{ use alloy::providers::{Provider, ProviderBuilder}; use alloy::sol_types::{sol_data, SolType}; use alloy_primitives::{Address, U256}; +use e3_sdk::indexer::SharedStore; use e3_sdk::{ bfv_helpers::decode_bytes_to_vec_u64, evm_helpers::{ @@ -31,15 +32,13 @@ use e3_sdk::{ }; use evm_helpers::CRISPContractFactory; use eyre::Context; +use eyre::Result; use log::info; use num_bigint::BigUint; -use std::error::Error; use std::time::Duration; use tokio::time::sleep; -type Result = std::result::Result>; - -pub async fn register_e3_requested( +async fn register_e3_requested( mut indexer: EnclaveIndexer, ) -> Result> { // E3Requested @@ -178,86 +177,34 @@ pub async fn register_e3_requested( Ok(indexer) } -pub async fn register_e3_activated( +async fn register_e3_activated( mut indexer: EnclaveIndexer, ) -> Result> { + info!("** register_e3_activated"); + let indexer_clone = indexer.clone(); // E3Activated indexer .add_event_handler(move |event: E3Activated, store| { - let e3_id = event.e3Id.to::(); - let mut repo = CrispE3Repository::new(store.clone(), e3_id); - let mut current_round_repo = CurrentRoundRepository::new(store); - let expiration = event.expiration.to::(); - - info!("[e3_id={}] Handling E3 request", e3_id); + info!("E3Activated!"); + let mut indexer_clone = indexer_clone.clone(); async move { + let e3_id = event.e3Id.to::(); + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let mut current_round_repo = CurrentRoundRepository::new(store); + let expiration = event.expiration.to::(); + + info!("[e3_id={}] Handling E3 request", e3_id); repo.start_round().await?; current_round_repo .set_current_round(CurrentRound { id: e3_id }) .await?; - // Calculate expiration time to sleep until - let now = get_current_timestamp_rpc().await?; - info!("[e3_id={}] Current time before sleep: {}", e3_id, now); - let wait_duration = if expiration > now { - let secs = expiration - now; - info!( - "[e3_id={}] Need to wait {} seconds until expiration", - e3_id, secs - ); - Duration::from_secs(secs) - } else { - info!("[e3_id={}] Expired E3", e3_id); - Duration::ZERO - }; - if !wait_duration.is_zero() { - sleep(wait_duration).await; - } - let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; - repo.update_status("Expired").await?; - - if repo.get_vote_count().await? > 0 { - info!("[e3_id={}] Starting computation for E3", e3_id); - repo.update_status("Computing").await?; - - let (id, status) = run_compute( - e3_id, - e3.e3_params, - e3.ciphertext_inputs, - format!("{}/state/add-result", CONFIG.enclave_server_url), - ) - .await - .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; - - if id != e3_id { - return Err(eyre::eyre!( - "Computation request returned unexpected E3 ID: expected {}, got {}", - e3_id, - id - ) - .into()); - } - - if status != "processing" { - return Err(eyre::eyre!( - "Computation request failed with status: {}", - status - ) - .into()); - } - - info!("[e3_id={}] Request Computation for E3", e3_id); - - repo.update_status("PublishingCiphertext").await?; - } else { - info!( - "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", - e3_id - ); - repo.update_status("Finished").await?; - } - info!("[e3_id={}] E3 request handled successfully.", e3_id); + info!("[e3_id={}] Registering hook for {}", e3_id, expiration); + indexer_clone.dispatch_after_timestamp(expiration, move |store| { + info!("Running...."); + handle_e3_input_deadline_expiration(e3_id, store) + }); Ok(()) } @@ -266,7 +213,49 @@ pub async fn register_e3_activated( Ok(indexer) } -pub async fn register_ciphertext_output_published( +async fn handle_e3_input_deadline_expiration( + e3_id: u64, + store: SharedStore, +) -> Result<()> { + let mut repo = CrispE3Repository::new(store.clone(), e3_id); + let e3: e3_sdk::indexer::models::E3 = repo.get_e3().await?; + repo.update_status("Expired").await?; + if repo.get_vote_count().await? > 0 { + info!("[e3_id={}] Starting computation for E3", e3_id); + repo.update_status("Computing").await?; + let (id, status) = run_compute( + e3_id, + e3.e3_params, + e3.ciphertext_inputs, + format!("{}/state/add-result", CONFIG.enclave_server_url), + ) + .await + .map_err(|e| eyre::eyre!("Error sending run compute request: {e}"))?; + if id != e3_id { + return Err(eyre::eyre!( + "Computation request returned unexpected E3 ID: expected {}, got {}", + e3_id, + id + ) + .into()); + }; + if status != "processing" { + return Err(eyre::eyre!("Computation request failed with status: {}", status).into()); + }; + info!("[e3_id={}] Request Computation for E3", e3_id); + repo.update_status("PublishingCiphertext").await?; + } else { + info!( + "[e3_id={}] E3 has no votes to decrypt. Setting status to Finished.", + e3_id + ); + repo.update_status("Finished").await?; + } + info!("[e3_id={}] E3 request handled successfully.", e3_id); + Ok(()) +} + +async fn register_ciphertext_output_published( mut indexer: EnclaveIndexer, ) -> Result> { // CiphertextOutputPublished @@ -284,7 +273,7 @@ pub async fn register_ciphertext_output_published( Ok(indexer) } -pub async fn register_plaintext_output_published( +async fn register_plaintext_output_published( mut indexer: EnclaveIndexer, ) -> Result> { // PlaintextOutputPublished @@ -367,6 +356,7 @@ pub async fn register_committee_published( info!("[e3_id={}] Wait duration: {:?}", event.e3Id, wait_duration); // Sleep until start time + // XXX: refactor to use blocktime if !wait_duration.is_zero() { sleep(wait_duration).await; } @@ -401,13 +391,25 @@ pub async fn start_indexer( store: impl DataStore, private_key: &str, ) -> Result<()> { + info!("== START INDEXER! =="); + let readonly_contract = EnclaveContractFactory::create_read(ws_url, contract_address).await?; + let enclave_contract_listener = EventListener::create_contract_listener_from_provider( + contract_address, + readonly_contract.get_provider(), + )?; + let readwrite_contract = EnclaveContractFactory::create_write(ws_url, contract_address, private_key).await?; - let enclave_contract_listener = - EventListener::create_contract_listener(ws_url, contract_address).await?; + // Registry Listener + let registry_contract_listener = EventListener::create_contract_listener_from_provider( + registry_address, + readonly_contract.get_provider(), + )?; + let mut registry_listener = + register_committee_published(registry_contract_listener, readwrite_contract).await?; // CRISP indexer let crisp_indexer = @@ -415,14 +417,9 @@ pub async fn start_indexer( let crisp_indexer = register_e3_requested(crisp_indexer).await?; let crisp_indexer = register_e3_activated(crisp_indexer).await?; let crisp_indexer = register_ciphertext_output_published(crisp_indexer).await?; - let crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; + let mut crisp_indexer = register_plaintext_output_published(crisp_indexer).await?; crisp_indexer.start(); - // Registry Listener - let registry_contract_listener = - EventListener::create_contract_listener(&ws_url, registry_address).await?; - let registry_listener = - register_committee_published(registry_contract_listener, readwrite_contract).await?; registry_listener.start(); Ok(()) diff --git a/examples/CRISP/server/src/server/program_server_request.rs b/examples/CRISP/server/src/server/program_server_request.rs index 10687f82e4..05ffbdd014 100644 --- a/examples/CRISP/server/src/server/program_server_request.rs +++ b/examples/CRISP/server/src/server/program_server_request.rs @@ -44,6 +44,7 @@ pub struct ProcessingResponse { pub e3_id: u64, } +/// Run the computation remotely pub async fn run_compute( e3_id: u64, params: Vec, diff --git a/package.json b/package.json index fed02430e4..d4f78b860e 100644 --- a/package.json +++ b/package.json @@ -12,9 +12,9 @@ "bump:versions": "tsx scripts/bump-versions.ts", "clean": "tsx scripts/clean.ts", "compile": "pnpm build:ts && pnpm rust:build", - "lint": "eslint . && pnpm evm:lint && pnpm rust:lint && pnpm noir:lint", - "format": "prettier --write \"**/*.{js,jsx,mjs,cjs,json,md,mdx,ts,tsx,yml,yaml,css}\"", - "format:check": "prettier --check \"**/*.{js,jsx,mjs,cjs,json,md,mdx,ts,tsx,yml,yaml,css}\"", + "lint": "pnpm eslint . && pnpm evm:lint && pnpm rust:lint && pnpm noir:lint", + "format": "pnpm prettier --write \"**/*.{js,jsx,mjs,cjs,json,md,mdx,ts,tsx,yml,yaml,css}\"", + "format:check": "pnpm prettier --check \"**/*.{js,jsx,mjs,cjs,json,md,mdx,ts,tsx,yml,yaml,css}\"", "check:license": "./scripts/check-license-headers.sh", "check:size": "./scripts/check-size.sh", "check:pnpm": "./scripts/check-pnpm.sh", diff --git a/packages/enclave-contracts/contracts/interfaces/IE3.sol b/packages/enclave-contracts/contracts/interfaces/IE3.sol index e18023d8df..d2d787eb51 100644 --- a/packages/enclave-contracts/contracts/interfaces/IE3.sol +++ b/packages/enclave-contracts/contracts/interfaces/IE3.sol @@ -18,7 +18,7 @@ import { IDecryptionVerifier } from "./IDecryptionVerifier.sol"; * @param requestBlock Block number when the E3 computation was requested * @param startWindow Start window for the computation: index 0 is minimum block, index 1 is the maximum block * @param duration Duration of the E3 computation in blocks or time units - * @param expiration Timestamp when committee duties expire and computation is considered failed + * @param expiration Timestamp when input deadline has expired and computation should commence * @param encryptionSchemeId Identifier for the encryption scheme used in this computation * @param e3Program Address of the E3 Program contract that validates and verifies the computation * @param e3ProgramParams ABI encoded computation parameters specific to the E3 program diff --git a/packages/enclave-contracts/contracts/interfaces/IEnclave.sol b/packages/enclave-contracts/contracts/interfaces/IEnclave.sol index f8b926a4d9..2a7a36cbce 100644 --- a/packages/enclave-contracts/contracts/interfaces/IEnclave.sol +++ b/packages/enclave-contracts/contracts/interfaces/IEnclave.sol @@ -26,7 +26,7 @@ interface IEnclave { /// @notice This event MUST be emitted when an Encrypted Execution Environment (E3) is successfully activated. /// @param e3Id ID of the E3. - /// @param expiration Timestamp when committee duties expire. + /// @param expiration Timestamp when input deadline has expired. /// @param committeePublicKey Public key of the committee. event E3Activated( uint256 e3Id, diff --git a/packages/enclave-sdk/src/enclave-sdk.ts b/packages/enclave-sdk/src/enclave-sdk.ts index 1b1089670c..754b814cde 100644 --- a/packages/enclave-sdk/src/enclave-sdk.ts +++ b/packages/enclave-sdk/src/enclave-sdk.ts @@ -450,36 +450,6 @@ export class EnclaveSDK { this.eventListener.cleanup() } - /** - * Update SDK configuration - */ - // TODO: We should delete this as we don't want a stateful client. - public updateConfig(newConfig: Partial): void { - if (newConfig.publicClient) { - this.config.publicClient = newConfig.publicClient - this.eventListener = new EventListener(newConfig.publicClient) - } - - if (newConfig.walletClient) { - this.config.walletClient = newConfig.walletClient - } - - if (newConfig.contracts) { - this.config.contracts = { - ...this.config.contracts, - ...newConfig.contracts, - } - } - - if (newConfig.chainId) { - this.config.chainId = newConfig.chainId - } - - this.contractClient = new ContractClient(this.config.publicClient, this.config.walletClient, this.config.contracts) - - this.initialized = false - } - public static create(options: { rpcUrl: string contracts: {