diff --git a/crates/multithread/src/multithread.rs b/crates/multithread/src/multithread.rs index a62106fbc3..096bd7db37 100644 --- a/crates/multithread/src/multithread.rs +++ b/crates/multithread/src/multithread.rs @@ -46,8 +46,7 @@ use e3_zk_prover::{Provable, ZkBackend, ZkProver}; use fhe::bfv::PublicKey; use fhe_traits::DeserializeParametrized; use rand::Rng; -use tracing::error; -use tracing::info; +use tracing::{error, info}; /// Multithread actor pub struct Multithread { @@ -187,13 +186,46 @@ async fn handle_compute_request_event( let msg_string = msg.to_string(); let job_name = msg_string.clone(); let (msg, ctx) = msg.into_components(); - // We spawn a thread on rayon moving to "sync"-land + let request_snapshot = msg.clone(); - let (result, duration) = pool + let pool_result = pool .spawn(job_name, TaskTimeouts::default(), move || { handle_compute_request(rng, cipher, zk_prover, msg) }) - .await?; + .await; + + let (result, duration) = match pool_result { + Ok(v) => v, + Err(pool_err) => { + error!( + "Task pool error for compute request '{}': {pool_err}", + msg_string + ); + let error_kind = match &request_snapshot.request { + ComputeRequestKind::Zk(_) => ComputeRequestErrorKind::Zk( + ZkEventError::ProofGenerationFailed(format!("Pool error: {pool_err}")), + ), + ComputeRequestKind::TrBFV(ref trbfv_req) => { + let msg = format!("Pool error: {pool_err}"); + ComputeRequestErrorKind::TrBFV(match trbfv_req { + TrBFVRequest::GenPkShareAndSkSss(_) => TrBFVError::GenPkShareAndSkSss(msg), + TrBFVRequest::GenEsiSss(_) => TrBFVError::GenEsiSss(msg), + TrBFVRequest::CalculateDecryptionKey(_) => { + TrBFVError::CalculateDecryptionKey(msg) + } + TrBFVRequest::CalculateDecryptionShare(_) => { + TrBFVError::CalculateDecryptionShare(msg) + } + TrBFVRequest::CalculateThresholdDecryption(_) => { + TrBFVError::CalculateThresholdDecryption(msg) + } + }) + } + }; + bus.publish(ComputeRequestError::new(error_kind, request_snapshot), ctx)?; + return Ok(()); + } + }; if let Some(report) = report { report.do_send(TrackDuration::new(msg_string, duration)) diff --git a/crates/multithread/src/pool.rs b/crates/multithread/src/pool.rs index 152b3a25b5..d6fe9d7956 100644 --- a/crates/multithread/src/pool.rs +++ b/crates/multithread/src/pool.rs @@ -27,6 +27,9 @@ pub enum TaskPoolError { #[error("{0}")] RecvError(RecvError), + + #[error("Task panicked: {0}")] + Panic(String), } impl TaskPool { @@ -86,18 +89,34 @@ impl TaskPool { // This uses channels to track pending and complete tasks when // using the thread pool let (tx, rx) = tokio::sync::oneshot::channel(); - self.thread_pool.spawn(|| { - let t = op(); - // try to return the result and it's duration note this is sync as it is a oneshot sender. - if let Err(res) = tx.send(t) { - error!( - "There was an error sending the result from the multithread actor: result = {:?}", - res - ); + self.thread_pool.spawn(move || { + // Catch panics inside the Rayon thread so we can report them + // as errors instead of silently dropping the oneshot sender. + let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(op)); + match result { + Ok(t) => { + if let Err(res) = tx.send(Ok(t)) { + error!( + "There was an error sending the result from the multithread actor: result = {:?}", + res + ); + } + } + Err(panic_info) => { + let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() { + s.to_string() + } else if let Some(s) = panic_info.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_string() + }; + error!("Rayon task panicked: {}", panic_msg); + let _ = tx.send(Err(TaskPoolError::Panic(panic_msg))); + } } }); - let output = rx.await.map_err(|r| TaskPoolError::RecvError(r))?; + let output = rx.await.map_err(|r| TaskPoolError::RecvError(r))??; warning_handle.abort(); diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 246a3c87eb..d5d32b2df0 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -377,7 +377,7 @@ async fn process_swarm_event( match record { Ok(record) => { let key = ContentHash(record.key.to_vec()); - info!("PUT RECORD SUCCESS: {:?}", key); + debug!("PUT RECORD SUCCESS: {:?}", key); event_tx.send(NetEvent::DhtPutRecordSucceeded { key, correlation_id, @@ -411,7 +411,7 @@ async fn process_swarm_event( peer_id, topic, })) => { - info!("Peer {} subscribed to {}", peer_id, topic); + debug!("Peer {} subscribed to {}", peer_id, topic); let count = swarm.behaviour().gossipsub.mesh_peers(&topic).count(); event_tx.send(NetEvent::GossipSubscribed { count, topic })?; } @@ -425,7 +425,7 @@ async fn process_swarm_event( }, .. })) => { - info!("Incoming sync request received (id={})", request_id); + debug!("Incoming sync request received (id={})", request_id); // received a request for events event_tx.send(NetEvent::SyncRequestReceived(SyncRequestReceived { @@ -635,7 +635,7 @@ fn handle_put_record( Ok(qid) => { // QueryId is returned synchronously and we immediately add it to the correlator so race conditions should not be an issue. correlator.track(qid, correlation_id); - info!("PUT RECORD OK qid={:?} cid={}", qid, correlation_id); + debug!("PUT RECORD OK qid={:?} cid={}", qid, correlation_id); } Err(error) => { event_tx.send(NetEvent::DhtPutRecordError { @@ -660,7 +660,7 @@ fn handle_get_record( // QueryId is returned synchronously and we immediately add it to the correlator so race conditions should not be an issue. correlator.track(query_id, correlation_id); - info!( + debug!( "GET RECORD CORRELATED! query_id={:?} correlation_id={}", query_id, correlation_id ); @@ -687,7 +687,7 @@ fn handle_outgoing_sync_request( correlation_id: CorrelationId, value: SyncRequestValue, ) -> Result<()> { - info!("Outgoing sync request (cid={})", correlation_id); + debug!("Outgoing sync request (cid={})", correlation_id); // TODO: // This is a first pass. // Lots of stuff to work through here: diff --git a/crates/zk-prover/src/actors/proof_request.rs b/crates/zk-prover/src/actors/proof_request.rs index 2cf1d8d35f..170942d18b 100644 --- a/crates/zk-prover/src/actors/proof_request.rs +++ b/crates/zk-prover/src/actors/proof_request.rs @@ -134,6 +134,10 @@ impl ProofRequestActor { ec: &EventContext, ) { let Some(pending) = self.pending_threshold.remove(correlation_id) else { + error!( + "Received PkGeneration ComputeResponse with correlation_id {:?} but no matching pending request found.", + correlation_id + ); return; }; @@ -214,6 +218,10 @@ impl ProofRequestActor { ec: &EventContext, ) { let Some(pending) = self.pending.remove(&correlation_id) else { + error!( + "Received PkBfv ComputeResponse with correlation_id {:?} but no matching pending request found.", + correlation_id + ); return; };