Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions crates/multithread/src/multithread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ use e3_zk_prover::{Provable, ZkBackend, ZkProver};
use fhe::bfv::PublicKey;
use fhe_traits::DeserializeParametrized;
use rand::Rng;
use tracing::error;
use tracing::info;
use tracing::{error, info};

/// Multithread actor
pub struct Multithread {
Expand Down Expand Up @@ -187,13 +186,46 @@ async fn handle_compute_request_event(
let msg_string = msg.to_string();
let job_name = msg_string.clone();
let (msg, ctx) = msg.into_components();
// We spawn a thread on rayon moving to "sync"-land
let request_snapshot = msg.clone();

let (result, duration) = pool
let pool_result = pool
.spawn(job_name, TaskTimeouts::default(), move || {
handle_compute_request(rng, cipher, zk_prover, msg)
})
.await?;
.await;

let (result, duration) = match pool_result {
Ok(v) => v,
Err(pool_err) => {
error!(
"Task pool error for compute request '{}': {pool_err}",
msg_string
);
let error_kind = match &request_snapshot.request {
ComputeRequestKind::Zk(_) => ComputeRequestErrorKind::Zk(
ZkEventError::ProofGenerationFailed(format!("Pool error: {pool_err}")),
),
ComputeRequestKind::TrBFV(ref trbfv_req) => {
let msg = format!("Pool error: {pool_err}");
ComputeRequestErrorKind::TrBFV(match trbfv_req {
TrBFVRequest::GenPkShareAndSkSss(_) => TrBFVError::GenPkShareAndSkSss(msg),
TrBFVRequest::GenEsiSss(_) => TrBFVError::GenEsiSss(msg),
TrBFVRequest::CalculateDecryptionKey(_) => {
TrBFVError::CalculateDecryptionKey(msg)
}
TrBFVRequest::CalculateDecryptionShare(_) => {
TrBFVError::CalculateDecryptionShare(msg)
}
TrBFVRequest::CalculateThresholdDecryption(_) => {
TrBFVError::CalculateThresholdDecryption(msg)
}
})
}
};
bus.publish(ComputeRequestError::new(error_kind, request_snapshot), ctx)?;
return Ok(());
}
};

if let Some(report) = report {
report.do_send(TrackDuration::new(msg_string, duration))
Expand Down
37 changes: 28 additions & 9 deletions crates/multithread/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub enum TaskPoolError {

#[error("{0}")]
RecvError(RecvError),

#[error("Task panicked: {0}")]
Panic(String),
}

impl TaskPool {
Expand Down Expand Up @@ -86,18 +89,34 @@ impl TaskPool {
// This uses channels to track pending and complete tasks when
// using the thread pool
let (tx, rx) = tokio::sync::oneshot::channel();
self.thread_pool.spawn(|| {
let t = op();
// Try to return the result and its duration. Note this is sync, as it is a oneshot sender.
if let Err(res) = tx.send(t) {
error!(
"There was an error sending the result from the multithread actor: result = {:?}",
res
);
self.thread_pool.spawn(move || {
// Catch panics inside the Rayon thread so we can report them
// as errors instead of silently dropping the oneshot sender.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(op));
match result {
Ok(t) => {
if let Err(res) = tx.send(Ok(t)) {
error!(
"There was an error sending the result from the multithread actor: result = {:?}",
res
);
}
}
Err(panic_info) => {
let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = panic_info.downcast_ref::<String>() {
s.clone()
} else {
"unknown panic".to_string()
};
error!("Rayon task panicked: {}", panic_msg);
let _ = tx.send(Err(TaskPoolError::Panic(panic_msg)));
}
}
});

let output = rx.await.map_err(|r| TaskPoolError::RecvError(r))?;
let output = rx.await.map_err(|r| TaskPoolError::RecvError(r))??;

warning_handle.abort();

Expand Down
12 changes: 6 additions & 6 deletions crates/net/src/net_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async fn process_swarm_event(
match record {
Ok(record) => {
let key = ContentHash(record.key.to_vec());
info!("PUT RECORD SUCCESS: {:?}", key);
debug!("PUT RECORD SUCCESS: {:?}", key);
event_tx.send(NetEvent::DhtPutRecordSucceeded {
key,
correlation_id,
Expand Down Expand Up @@ -411,7 +411,7 @@ async fn process_swarm_event(
peer_id,
topic,
})) => {
info!("Peer {} subscribed to {}", peer_id, topic);
debug!("Peer {} subscribed to {}", peer_id, topic);
let count = swarm.behaviour().gossipsub.mesh_peers(&topic).count();
event_tx.send(NetEvent::GossipSubscribed { count, topic })?;
}
Expand All @@ -425,7 +425,7 @@ async fn process_swarm_event(
},
..
})) => {
info!("Incoming sync request received (id={})", request_id);
debug!("Incoming sync request received (id={})", request_id);

// received a request for events
event_tx.send(NetEvent::SyncRequestReceived(SyncRequestReceived {
Expand Down Expand Up @@ -635,7 +635,7 @@ fn handle_put_record(
Ok(qid) => {
// QueryId is returned synchronously and we immediately add it to the correlator, so race conditions should not be an issue.
correlator.track(qid, correlation_id);
info!("PUT RECORD OK qid={:?} cid={}", qid, correlation_id);
debug!("PUT RECORD OK qid={:?} cid={}", qid, correlation_id);
}
Err(error) => {
event_tx.send(NetEvent::DhtPutRecordError {
Expand All @@ -660,7 +660,7 @@ fn handle_get_record(

// QueryId is returned synchronously and we immediately add it to the correlator, so race conditions should not be an issue.
correlator.track(query_id, correlation_id);
info!(
debug!(
"GET RECORD CORRELATED! query_id={:?} correlation_id={}",
query_id, correlation_id
);
Expand All @@ -687,7 +687,7 @@ fn handle_outgoing_sync_request(
correlation_id: CorrelationId,
value: SyncRequestValue,
) -> Result<()> {
info!("Outgoing sync request (cid={})", correlation_id);
debug!("Outgoing sync request (cid={})", correlation_id);
// TODO:
// This is a first pass.
// Lots of stuff to work through here:
Expand Down
8 changes: 8 additions & 0 deletions crates/zk-prover/src/actors/proof_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ impl ProofRequestActor {
ec: &EventContext<Sequenced>,
) {
let Some(pending) = self.pending_threshold.remove(correlation_id) else {
error!(
"Received PkGeneration ComputeResponse with correlation_id {:?} but no matching pending request found.",
correlation_id
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return;
};

Expand Down Expand Up @@ -214,6 +218,10 @@ impl ProofRequestActor {
ec: &EventContext<Sequenced>,
) {
let Some(pending) = self.pending.remove(&correlation_id) else {
error!(
"Received PkBfv ComputeResponse with correlation_id {:?} but no matching pending request found.",
correlation_id
);
return;
};

Expand Down
Loading