diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index a5c8e7ca7a..c871a68f78 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -55,6 +55,8 @@ pub struct CiphernodeBuilder { keyshare: Option, logging: bool, multithread_cache: Option>, + multithread_concurrent_jobs: Option, + multithread_capture_events: bool, plaintext_agg: bool, pubkey_agg: bool, rng: SharedRng, @@ -99,12 +101,14 @@ impl CiphernodeBuilder { multithread_cache: None, plaintext_agg: false, pubkey_agg: false, + multithread_concurrent_jobs: None, rng, source_bus: None, sortition_backend: SortitionBackend::score(), testmode_errors: false, testmode_history: false, threads: None, + multithread_capture_events: false, threshold_plaintext_agg: false, } } @@ -192,13 +196,36 @@ impl CiphernodeBuilder { self } - /// Setup how many threads to use within the multithread actor - #[deprecated(note = "This method is under construction and should not be used yet")] + /// Setup how many threads to use within the multithread actor for it's rayon based workload pub fn with_threads(mut self, threads: usize) -> Self { self.threads = Some(threads); self } + /// This will provide one thread for the actor model and use all other threads for + /// rayon based workloads + pub fn with_max_threads(mut self) -> Self { + self.threads = Some(Multithread::get_max_threads_minus(1)); + self + } + + /// This will save the given number of threads from being used by the rayon threadpool + pub fn with_max_threads_minus(mut self, threads: usize) -> Self { + self.threads = Some(Multithread::get_max_threads_minus(threads)); + self + } + + /// Set the number of concurrent jobs defaults to 1 + pub fn with_multithread_concurrent_jobs(mut self, jobs: usize) -> Self { + self.multithread_concurrent_jobs = if jobs >= 1 { Some(jobs) } else { None }; + self + } + + pub fn with_multithread_capture_events(mut self) -> Self { + self.multithread_capture_events = true; + self + } + /// Setup a ThresholdPlaintextAggregator pub fn with_threshold_plaintext_aggregation(mut self) -> Self { self.threshold_plaintext_agg = true; @@ -474,6 +501,8 @@ impl CiphernodeBuilder { self.rng.clone(), self.cipher.clone(), self.threads.unwrap_or(1), + self.multithread_concurrent_jobs.unwrap_or(1), + self.multithread_capture_events, ); // Set the cache diff --git a/crates/entrypoint/src/start/aggregator_start.rs b/crates/entrypoint/src/start/aggregator_start.rs index 8260ddf199..c9a93f386a 100644 --- a/crates/entrypoint/src/start/aggregator_start.rs +++ b/crates/entrypoint/src/start/aggregator_start.rs @@ -43,6 +43,7 @@ pub async fn execute( .with_contract_enclave_full() .with_contract_bonding_registry() .with_contract_ciphernode_registry() + .with_max_threads() .with_pubkey_aggregation(); if experimental_trbfv { diff --git a/crates/entrypoint/src/start/start.rs b/crates/entrypoint/src/start/start.rs index 6ad6930cb6..1cc8bec79a 100644 --- a/crates/entrypoint/src/start/start.rs +++ b/crates/entrypoint/src/start/start.rs @@ -43,6 +43,7 @@ pub async fn execute( .with_chains(&config.chains()) .with_contract_enclave_reader() .with_contract_bonding_registry() + .with_max_threads() .with_contract_ciphernode_registry(); if experimental_trbfv { diff --git a/crates/events/src/enclave_event/compute_request/mod.rs b/crates/events/src/enclave_event/compute_request/mod.rs index 75ff87d3a4..64ff467424 100644 --- a/crates/events/src/enclave_event/compute_request/mod.rs +++ b/crates/events/src/enclave_event/compute_request/mod.rs @@ -27,6 +27,25 @@ pub enum ComputeRequest { // Eg. TFHE(TFHERequest) } +impl ToString for ComputeRequest { + fn to_string(&self) -> String { + match self { + Self::TrBFV(e3_trbfv::TrBFVRequest::GenEsiSss(_)) => "GenEsiSss", + Self::TrBFV(e3_trbfv::TrBFVRequest::GenPkShareAndSkSss(_)) => "GenPkShareAndSkSss", + Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateDecryptionKey(_)) => { + "CalculateDecryptionKey" + } + Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateDecryptionShare(_)) => { + "CalculateDecryptionShare" + } + Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateThresholdDecryption(_)) => { + "CalculateThresholdDecryption" + } + } + .to_string() + } +} + /// The compute result from a threadpool computation /// This enum provides protocol disambiguation #[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -43,6 +62,8 @@ pub enum ComputeResponse { pub enum ComputeRequestError { /// By Protocol TrBFV(e3_trbfv::TrBFVError), + RecvError(String), + SemaphoreError(String), // Eg. TFHE(TFHEError) } @@ -50,6 +71,7 @@ impl std::error::Error for ComputeRequestError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { ComputeRequestError::TrBFV(err) => Some(err), + _ => None, } } } @@ -58,7 +80,13 @@ impl fmt::Display for ComputeRequestError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ComputeRequestError::TrBFV(err) => { - write!(f, "TrBFV error: {:?}", err) + write!(f, "We had an error number crunching: {:?}", err) + } + ComputeRequestError::SemaphoreError(name) => { + write!(f, "Multithread SemaphoreError. This means there was a problem acquiring the semaphore lock for this ComputeRequest: '{name}'") + } + ComputeRequestError::RecvError(name) => { + write!(f, "Multithread RecvError. This means there was a problem receiving a response for this ComputeRequest: '{name}'") } } } diff --git a/crates/keyshare/src/threshold_keyshare.rs b/crates/keyshare/src/threshold_keyshare.rs index d141672973..8d8a5b5918 100644 --- a/crates/keyshare/src/threshold_keyshare.rs +++ b/crates/keyshare/src/threshold_keyshare.rs @@ -717,7 +717,7 @@ impl ThresholdKeyshare { /// handler with the results. Errors at this stage are simply /// logged. Eventually we will need to configure a policy here /// For example retry with exponential backoff - fn handle_compute_request( + fn multithread_request( &mut self, request_fn: F, response_fn: R, @@ -778,7 +778,7 @@ impl Handler for ThresholdKeyshare { impl Handler for ThresholdKeyshare { type Result = ResponseActFuture; fn handle(&mut self, msg: GenEsiSss, _: &mut Self::Context) -> Self::Result { - self.handle_compute_request( + self.multithread_request( |act| act.handle_gen_esi_sss_requested(msg), |act, res, _| act.handle_gen_esi_sss_response(res), ) @@ -788,7 +788,7 @@ impl Handler for ThresholdKeyshare { impl Handler for ThresholdKeyshare { type Result = ResponseActFuture; fn handle(&mut self, msg: GenPkShareAndSkSss, _: &mut Self::Context) -> Self::Result { - self.handle_compute_request( + self.multithread_request( |act| act.handle_gen_pk_share_and_sk_sss_requested(msg), |act, res, _| act.handle_gen_pk_share_and_sk_sss_response(res), ) @@ -798,7 +798,7 @@ impl Handler for ThresholdKeyshare { impl Handler for ThresholdKeyshare { type Result = ResponseActFuture; fn handle(&mut self, msg: AllThresholdSharesCollected, _: &mut Self::Context) -> Self::Result { - self.handle_compute_request( + self.multithread_request( |act| act.handle_all_threshold_shares_collected(msg), |act, res, _| act.handle_calculate_decryption_key_response(res), ) @@ -808,7 +808,7 @@ impl Handler for ThresholdKeyshare { impl Handler for ThresholdKeyshare { type Result = ResponseActFuture; fn handle(&mut self, msg: CiphertextOutputPublished, _: &mut Self::Context) -> Self::Result { - self.handle_compute_request( + self.multithread_request( |act| act.handle_ciphertext_output_published(msg), |act, res, _| act.handle_calculate_decryption_share_response(res), ) diff --git a/crates/multithread/src/lib.rs b/crates/multithread/src/lib.rs index 4dce9f8d7e..3cb3e8a78a 100644 --- a/crates/multithread/src/lib.rs +++ b/crates/multithread/src/lib.rs @@ -4,10 +4,11 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering; +mod report; + use std::sync::Arc; use std::thread; +use std::time::Duration; use std::time::Instant; use actix::prelude::*; @@ -24,42 +25,58 @@ use e3_trbfv::{TrBFVError, TrBFVRequest, TrBFVResponse}; use e3_utils::SharedRng; use rand::Rng; use rayon::{self, ThreadPool}; +use report::MultithreadReport; +use tokio::sync::Semaphore; use tracing::error; use tracing::info; +use tracing::warn; /// Multithread actor pub struct Multithread { rng: SharedRng, cipher: Arc, - thread_pool: Option>, + rayon_limit: Arc, + thread_pool: Arc, + report: Option, } impl Multithread { - pub fn new(rng: SharedRng, cipher: Arc, threads: usize) -> Self { - let thread_pool = if threads == 1 { - None - } else { - let thread_pool = Arc::new( - rayon::ThreadPoolBuilder::new() - .num_threads(threads) - .build() - .expect("Failed to create Rayon thread pool"), - ); - info!( - "Created threadpool with {} threads.", - thread_pool.current_num_threads() - ); - - Some(thread_pool) - }; + pub fn new( + rng: SharedRng, + cipher: Arc, + rayon_threads: usize, + max_simultaneous_rayon_tasks: usize, + capture_events: bool, + ) -> Self { + let thread_pool = Arc::new( + rayon::ThreadPoolBuilder::new() + .num_threads(rayon_threads) + .build() + .expect("Failed to create Rayon thread pool"), + ); + info!( + "Created threadpool with {} threads.", + thread_pool.current_num_threads() + ); + let rayon_limit = Arc::new(Semaphore::new(max_simultaneous_rayon_tasks)); Self { rng, cipher, thread_pool, + rayon_limit, + report: if capture_events { + Some(MultithreadReport::new( + rayon_threads, + max_simultaneous_rayon_tasks, + )) + } else { + None + }, } } + /// Subtract the given amount from the total number of available threads and return the result pub fn get_max_threads_minus(amount: usize) -> usize { let total_threads = thread::available_parallelism() .map(|n| n.get()) @@ -68,8 +85,21 @@ impl Multithread { threads_to_use } - pub fn attach(rng: SharedRng, cipher: Arc, threads: usize) -> Addr { - Self::new(rng.clone(), cipher.clone(), threads).start() + pub fn attach( + rng: SharedRng, + cipher: Arc, + rayon_threads: usize, + max_simultaneous_rayon_tasks: usize, + capture_events: bool, + ) -> Addr { + Self::new( + rng.clone(), + cipher.clone(), + rayon_threads, + max_simultaneous_rayon_tasks, + capture_events, + ) + .start() } } @@ -77,48 +107,122 @@ impl Actor for Multithread { type Context = actix::Context; } -static PENDING_TASKS: AtomicUsize = AtomicUsize::new(0); -static COMPLETED_TASKS: AtomicUsize = AtomicUsize::new(0); - impl Handler for Multithread { type Result = ResponseFuture>; - fn handle(&mut self, msg: ComputeRequest, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: ComputeRequest, ctx: &mut Self::Context) -> Self::Result { let cipher = self.cipher.clone(); let rng = self.rng.clone(); let thread_pool = self.thread_pool.clone(); + let semaphore = self.rayon_limit.clone(); + let msg_string = msg.to_string(); + let self_addr = ctx.address(); + let capture_events = self.report.is_some(); + let job_name = msg_string.clone(); Box::pin(async move { - // This uses channels to traack pending and complete tasks when - // using the thread pool - let res = if let Some(pool) = thread_pool { - let pending = PENDING_TASKS.fetch_add(1, Ordering::Relaxed); - info!( - "Spawning task. Pending: {}, Completed: {}", - pending + 1, - COMPLETED_TASKS.load(Ordering::Relaxed) + // Block until we have enough task slots available we have to do this this way as + // because we use do_send() everywhere there is no backpressure on the actors + let _permit = semaphore + .acquire() + .await + .map_err(|_| ComputeRequestError::SemaphoreError(msg_string.to_string()))?; + + // Warn of long running jobs + let warning_handle = tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + warn!( + "Job '{}' has been running for more than 10 seconds", + job_name ); - let (tx, rx) = tokio::sync::oneshot::channel(); - pool.spawn(move || { - let res = handle_compute_request(rng, cipher, msg); - PENDING_TASKS.fetch_sub(1, Ordering::Relaxed); - COMPLETED_TASKS.fetch_add(1, Ordering::Relaxed); - - let _ = tx.send(res); - }); - // TODO: handle recv error - rx.await.unwrap() - } else { - // If not using the thread pool simply call inline - handle_compute_request(rng, cipher, msg) + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; + error!( + "Job '{}' has been running for more than 30 seconds", + job_name + ); + }); + + // This uses channels to track pending and complete tasks when + // using the thread pool + let (tx, rx) = tokio::sync::oneshot::channel(); + + // We spawn a thread on rayon moving to "sync"-land + thread_pool.spawn(move || { + // Do the actual work this is gonna take a while... + let (result, duration) = handle_compute_request(rng, cipher, msg); + + // try to return the result and it's duration note this is sync as it is a oneshot sender. + if let Err(res) = tx.send((result, Some(duration))) { + error!( + "There was an error sending the result from the multithread actor: result = {:?}", + res + ); + } + }); + // we are back in async io land... + + // await the oneshot + let (result, duration) = rx.await.unwrap_or_else(|_| { + ( + Err(ComputeRequestError::RecvError(msg_string.to_string())), + None, + ) + }); + + warning_handle.abort(); + + // incase we are collecting events for a report + if capture_events { + if let Some(dur) = duration { + self_addr.do_send(TrackDuration::new(msg_string, dur)) + } }; - res + result }) } } -// TODO: implement tracing for this -// This enabled us to get insight into the timing of our long running functions -fn timefunc(name: &str, id: u8, func: F) -> Result +impl Handler for Multithread { + type Result = (); + fn handle(&mut self, msg: TrackDuration, _: &mut Self::Context) -> Self::Result { + // If the report is there we are tracking durations + if let Some(report) = &mut self.report { + report.track(msg); + }; + } +} + +impl Handler for Multithread { + type Result = Option; + fn handle(&mut self, _: GetReport, _: &mut Self::Context) -> Self::Result { + if let Some(ref report) = self.report { + return Some(report.to_report().to_string()); + } + None + } +} + +#[derive(Message, Debug)] +#[rtype("()")] +pub struct TrackDuration { + name: String, + duration: Duration, +} + +impl TrackDuration { + pub fn new(name: String, duration: Duration) -> Self { + Self { name, duration } + } +} + +#[derive(Message, Debug)] +#[rtype("Option")] +pub struct GetReport; + +fn timefunc( + name: &str, + id: u8, + func: F, +) -> (Result, Duration) where F: FnOnce() -> Result, { @@ -127,14 +231,15 @@ where let out = func(); let dur = start.elapsed(); info!("\nFINISHED MULTITHREAD `{}`({}) in {:?}\n", name, id, dur); - out + (out, dur) // return output as well as timing info } +/// Handle our compute request. This function is run on a rayon threadpool. fn handle_compute_request( rng: SharedRng, cipher: Arc, request: ComputeRequest, -) -> Result { +) -> (Result, Duration) { let id: u8 = rand::thread_rng().gen(); match request { ComputeRequest::TrBFV(TrBFVRequest::GenPkShareAndSkSss(req)) => timefunc( diff --git a/crates/multithread/src/report.rs b/crates/multithread/src/report.rs new file mode 100644 index 0000000000..13c72f209d --- /dev/null +++ b/crates/multithread/src/report.rs @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::{collections::HashMap, thread, time::Duration}; + +use crate::TrackDuration; + +#[derive(Default)] +pub struct MultithreadReport { + rayon_threads: usize, + max_simultaneous_rayon_tasks: usize, + events: Vec, +} + +impl MultithreadReport { + pub fn new(rayon_threads: usize, max_simultaneous_rayon_tasks: usize) -> Self { + Self { + rayon_threads, + max_simultaneous_rayon_tasks, + events: Vec::new(), + } + } + + pub fn track(&mut self, msg: TrackDuration) { + self.events.push(msg); + } + + pub fn to_report(&self) -> FlattenedReport { + let mut total_dur: HashMap = HashMap::new(); + let mut runs: HashMap = HashMap::new(); + let cores_available: usize = match thread::available_parallelism() { + Ok(count) => count.into(), + Err(_) => 0usize, + }; + + // Accumulate durations and count runs + for event in &self.events { + *runs.entry(event.name.clone()).or_insert(0) += 1; + + total_dur + .entry(event.name.clone()) + .and_modify(|d| *d += event.duration) + .or_insert(event.duration); + } + + // Calculate averages + let avg_dur = total_dur + .clone() + .into_iter() + .map(|(name, total)| { + let count = runs[&name]; + let avg = Duration::from_nanos((total.as_nanos() / count as u128) as u64); + (name, avg) + }) + .collect(); + + let mt_total = total_dur + .clone() + .into_iter() + .fold(Duration::ZERO, |acc, (_, item)| acc + item); + + FlattenedReport { + cores_available, + total_dur, + avg_dur, + mt_total, + rayon_threads: self.rayon_threads, + max_simultaneous_rayon_tasks: self.max_simultaneous_rayon_tasks, + runs, + } + } +} + +pub struct FlattenedReport { + cores_available: usize, + rayon_threads: usize, + max_simultaneous_rayon_tasks: usize, + avg_dur: HashMap, + total_dur: HashMap, + mt_total: Duration, + runs: HashMap, +} + +impl std::fmt::Display for FlattenedReport { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, "{:<32} {:>5}", "Rayon Threads:", self.rayon_threads)?; + writeln!( + f, + "{:<32} {:>5}", + "Max Simultaneous Rayon Tasks:", self.max_simultaneous_rayon_tasks + )?; + writeln!(f, "{:<32} {:>5}", "Cores Available:", self.cores_available)?; + writeln!(f)?; + writeln!( + f, + "{:<30} {:>15} {:>10} {:>15}", + "Name", "Avg Duration", "Runs", "Total Duration" + )?; + writeln!(f, "{}", "-".repeat(73))?; + + let mut entries: Vec<_> = self.avg_dur.iter().collect(); + entries.sort_by(|a, b| a.0.cmp(b.0)); + + for (name, avg_dur) in entries { + let runs = self.runs.get(name).unwrap_or(&0); + let total_dur = self.total_dur.get(name).unwrap(); + writeln!( + f, + "{:<30} {:>15?} {:>10} {:>15?}", + name, avg_dur, runs, total_dur + )?; + } + + writeln!(f, "{:<50} {:>22?}", "Total time", self.mt_total)?; + + Ok(()) + } +} diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index 2035e5abd7..0e5728b6cd 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -14,7 +14,7 @@ use e3_events::{ EnclaveEvent, EventBus, EventBusConfig, OperatorActivationChanged, PlaintextAggregated, TicketBalanceUpdated, }; -use e3_multithread::Multithread; +use e3_multithread::{GetReport, Multithread}; use e3_sdk::bfv_helpers::{build_bfv_params_arc, decode_bytes_to_vec_u64, encode_bfv_params}; use e3_test_helpers::ciphernode_system::CiphernodeSystemBuilder; use e3_test_helpers::{create_seed_from_u64, create_shared_rng_from_u64, AddToCommittee}; @@ -24,7 +24,7 @@ use e3_utils::utility_types::ArcBytes; use fhe::bfv::PublicKey; use fhe_traits::{DeserializeParametrized, Serialize}; use num_bigint::BigUint; -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{fs, sync::Arc}; pub fn save_snapshot(file_name: &str, bytes: &[u8]) { @@ -69,10 +69,29 @@ async fn setup_score_sortition_environment( Ok(()) } +fn serialize_report(report: &[(&str, Duration)]) -> String { + let max_key_len = report.iter().map(|(k, _)| k.len()).max().unwrap_or(0); + + report + .iter() + .map(|(key, duration)| { + format!( + "{:width$}: {:.3}s", + key, + duration.as_secs_f64(), + width = max_key_len + ) + }) + .collect::>() + .join("\n") +} + /// Test trbfv #[actix::test] #[serial_test::serial] async fn test_trbfv_actor() -> Result<()> { + let mut report: Vec<(&str, Duration)> = vec![]; + let whole_test = Instant::now(); use tracing_subscriber::{fmt, EnvFilter}; let subscriber = fmt() @@ -96,6 +115,8 @@ async fn test_trbfv_actor() -> Result<()> { // - Loopback libp2p simulation /////////////////////////////////////////////////////////////////////////////////// + let setup = Instant::now(); + // Create rng let rng = create_shared_rng_from_u64(42); @@ -115,6 +136,7 @@ async fn test_trbfv_actor() -> Result<()> { ); // Params for BFV + // TODO: use params set with secure params in test let params_raw = build_bfv_params_arc(degree, plaintext_modulus, moduli, None); // Encoded Params @@ -135,12 +157,15 @@ async fn test_trbfv_actor() -> Result<()> { let cipher = Arc::new(Cipher::from_password("I am the music man.").await?); // Actor system setup + // Seems like you cannot send more than one job at a time to rayon + let concurrent_jobs = 1; // leaving at 1 + let max_threadroom = Multithread::get_max_threads_minus(1); let multithread = Multithread::attach( rng.clone(), cipher.clone(), - // Multithread::get_max_threads_minus(2), - 1, // TODO: There is a bug running multithread around thread starvation. We may have to - // setup a queue + max_threadroom, + concurrent_jobs, + true, ); let nodes = CiphernodeSystemBuilder::new() @@ -178,12 +203,16 @@ async fn test_trbfv_actor() -> Result<()> { .build() .await?; + report.push(("Setup", setup.elapsed())); + + let committee_setup = Instant::now(); let chain_id = 1u64; let eth_addrs: Vec = nodes.iter().map(|n| n.address()).collect(); setup_score_sortition_environment(&bus, ð_addrs, chain_id).await?; // Flush all events nodes.flush_all_history(100).await?; + report.push(("Committee Setup", committee_setup.elapsed())); /////////////////////////////////////////////////////////////////////////////////// // 2. Trigger E3Requested @@ -195,7 +224,7 @@ async fn test_trbfv_actor() -> Result<()> { /////////////////////////////////////////////////////////////////////////////////// // Prepare round - + let e3_requested_timer = Instant::now(); // Trigger actor DKG let e3_id = E3id::new("0", 1); @@ -240,14 +269,34 @@ async fn test_trbfv_actor() -> Result<()> { })) .await?; + let committee_finalized_timer = Instant::now(); + + let expected = vec!["E3Requested", "CommitteeFinalized"]; + + let _ = nodes + .take_history_with_timeout(0, expected.len(), Duration::from_secs(1000)) + .await?; + + report.push(( + "Committee Finalization", + committee_finalized_timer.elapsed(), + )); + + let shares_timer = Instant::now(); let expected = vec![ - "E3Requested", - "CommitteeFinalized", "ThresholdShareCreated", "ThresholdShareCreated", "ThresholdShareCreated", "ThresholdShareCreated", "ThresholdShareCreated", + ]; + let _ = nodes + .take_history_with_timeout(0, expected.len(), Duration::from_secs(1000)) + .await?; + report.push(("All ThresholdShareCreated events", shares_timer.elapsed())); + + let shares_to_pubkey_agg_timer = Instant::now(); + let expected = vec![ "KeyshareCreated", "KeyshareCreated", "KeyshareCreated", @@ -255,11 +304,19 @@ async fn test_trbfv_actor() -> Result<()> { "KeyshareCreated", "PublicKeyAggregated", ]; - let h = nodes .take_history_with_timeout(0, expected.len(), Duration::from_secs(1000)) .await?; - + report.push(( + "ThresholdShares -> PublicKeyAggregated", + shares_to_pubkey_agg_timer.elapsed(), + )); + + report.push(( + "E3Request -> PublicKeyAggregated", + e3_requested_timer.elapsed(), + )); + let app_gen_timer = Instant::now(); assert_eq!(h.event_types(), expected); // Aggregate decryption @@ -287,11 +344,15 @@ async fn test_trbfv_actor() -> Result<()> { num_voters, num_votes_per_voter, ); + report.push(("Application CT Gen", app_gen_timer.elapsed())); + let running_app_timer = Instant::now(); println!("Running application to generate outputs..."); let outputs = e3_test_helpers::application::run_application(&inputs, params_raw, num_votes_per_voter); + report.push(("Running FHE Application", running_app_timer.elapsed())); + let publishing_ct_timer = Instant::now(); println!("Have outputs. Creating ciphertexts..."); let ciphertexts = outputs .into_iter() @@ -325,6 +386,10 @@ async fn test_trbfv_actor() -> Result<()> { .await?; assert_eq!(h.event_types(), expected); + report.push(( + "Ciphertext published -> PlaintextAggregated", + publishing_ct_timer.elapsed(), + )); let Some(EnclaveEvent::PlaintextAggregated { data: @@ -361,5 +426,11 @@ async fn test_trbfv_actor() -> Result<()> { assert_eq!(res, exp); } + let mt_report = multithread.send(GetReport).await.unwrap().unwrap(); + println!("{}", mt_report); + + report.push(("Entire Test", whole_test.elapsed())); + println!("{}", serialize_report(&report)); + Ok(()) }