Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions crates/ciphernode-builder/src/ciphernode_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct CiphernodeBuilder {
keyshare: Option<KeyshareKind>,
logging: bool,
multithread_cache: Option<Addr<Multithread>>,
multithread_concurrent_jobs: Option<usize>,
multithread_capture_events: bool,
Comment thread
ryardley marked this conversation as resolved.
plaintext_agg: bool,
pubkey_agg: bool,
rng: SharedRng,
Expand Down Expand Up @@ -99,12 +101,14 @@ impl CiphernodeBuilder {
multithread_cache: None,
plaintext_agg: false,
pubkey_agg: false,
multithread_concurrent_jobs: None,
rng,
source_bus: None,
sortition_backend: SortitionBackend::score(),
testmode_errors: false,
testmode_history: false,
threads: None,
multithread_capture_events: false,
threshold_plaintext_agg: false,
}
}
Expand Down Expand Up @@ -192,13 +196,36 @@ impl CiphernodeBuilder {
self
}

/// Setup how many threads to use within the multithread actor
#[deprecated(note = "This method is under construction and should not be used yet")]
/// Setup how many threads to use within the multithread actor for it's rayon based workload
pub fn with_threads(mut self, threads: usize) -> Self {
self.threads = Some(threads);
self
}

/// This will provide one thread for the actor model and use all other threads for
/// rayon based workloads
pub fn with_max_threads(mut self) -> Self {
self.threads = Some(Multithread::get_max_threads_minus(1));
self
}

/// This will save the given number of threads from being used by the rayon threadpool
pub fn with_max_threads_minus(mut self, threads: usize) -> Self {
self.threads = Some(Multithread::get_max_threads_minus(threads));
self
}

/// Set the number of concurrent jobs defaults to 1
pub fn with_multithread_concurrent_jobs(mut self, jobs: usize) -> Self {
self.multithread_concurrent_jobs = if jobs >= 1 { Some(jobs) } else { None };
self
}

pub fn with_multithread_capture_events(mut self) -> Self {
self.multithread_capture_events = true;
self
}

/// Setup a ThresholdPlaintextAggregator
pub fn with_threshold_plaintext_aggregation(mut self) -> Self {
self.threshold_plaintext_agg = true;
Expand Down Expand Up @@ -474,6 +501,8 @@ impl CiphernodeBuilder {
self.rng.clone(),
self.cipher.clone(),
self.threads.unwrap_or(1),
self.multithread_concurrent_jobs.unwrap_or(1),
self.multithread_capture_events,
);

// Set the cache
Expand Down
1 change: 1 addition & 0 deletions crates/entrypoint/src/start/aggregator_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn execute(
.with_contract_enclave_full()
.with_contract_bonding_registry()
.with_contract_ciphernode_registry()
.with_max_threads()
.with_pubkey_aggregation();

if experimental_trbfv {
Expand Down
1 change: 1 addition & 0 deletions crates/entrypoint/src/start/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub async fn execute(
.with_chains(&config.chains())
.with_contract_enclave_reader()
.with_contract_bonding_registry()
.with_max_threads()
.with_contract_ciphernode_registry();

if experimental_trbfv {
Expand Down
30 changes: 29 additions & 1 deletion crates/events/src/enclave_event/compute_request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ pub enum ComputeRequest {
// Eg. TFHE(TFHERequest)
}

impl ToString for ComputeRequest {
fn to_string(&self) -> String {
match self {
Self::TrBFV(e3_trbfv::TrBFVRequest::GenEsiSss(_)) => "GenEsiSss",
Self::TrBFV(e3_trbfv::TrBFVRequest::GenPkShareAndSkSss(_)) => "GenPkShareAndSkSss",
Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateDecryptionKey(_)) => {
"CalculateDecryptionKey"
}
Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateDecryptionShare(_)) => {
"CalculateDecryptionShare"
}
Self::TrBFV(e3_trbfv::TrBFVRequest::CalculateThresholdDecryption(_)) => {
"CalculateThresholdDecryption"
}
}
.to_string()
}
}
Comment thread
ryardley marked this conversation as resolved.

/// The compute result from a threadpool computation
/// This enum provides protocol disambiguation
#[derive(Message, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand All @@ -43,13 +62,16 @@ pub enum ComputeResponse {
pub enum ComputeRequestError {
/// By Protocol
TrBFV(e3_trbfv::TrBFVError),
RecvError(String),
SemaphoreError(String),
// Eg. TFHE(TFHEError)
}

impl std::error::Error for ComputeRequestError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ComputeRequestError::TrBFV(err) => Some(err),
_ => None,
}
}
}
Expand All @@ -58,7 +80,13 @@ impl fmt::Display for ComputeRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ComputeRequestError::TrBFV(err) => {
write!(f, "TrBFV error: {:?}", err)
write!(f, "We had an error number crunching: {:?}", err)
}
ComputeRequestError::SemaphoreError(name) => {
write!(f, "Multithread SemaphoreError. This means there was a problem acquiring the semaphore lock for this ComputeRequest: '{name}'")
}
ComputeRequestError::RecvError(name) => {
write!(f, "Multithread RecvError. This means there was a problem receiving a response for this ComputeRequest: '{name}'")
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions crates/keyshare/src/threshold_keyshare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ impl ThresholdKeyshare {
/// handler with the results. Errors at this stage are simply
/// logged. Eventually we will need to configure a policy here
/// For example retry with exponential backoff
fn handle_compute_request<F, R>(
fn multithread_request<F, R>(
&mut self,
request_fn: F,
response_fn: R,
Expand Down Expand Up @@ -778,7 +778,7 @@ impl Handler<CiphernodeSelected> for ThresholdKeyshare {
impl Handler<GenEsiSss> for ThresholdKeyshare {
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: GenEsiSss, _: &mut Self::Context) -> Self::Result {
self.handle_compute_request(
self.multithread_request(
|act| act.handle_gen_esi_sss_requested(msg),
|act, res, _| act.handle_gen_esi_sss_response(res),
)
Expand All @@ -788,7 +788,7 @@ impl Handler<GenEsiSss> for ThresholdKeyshare {
impl Handler<GenPkShareAndSkSss> for ThresholdKeyshare {
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: GenPkShareAndSkSss, _: &mut Self::Context) -> Self::Result {
self.handle_compute_request(
self.multithread_request(
|act| act.handle_gen_pk_share_and_sk_sss_requested(msg),
|act, res, _| act.handle_gen_pk_share_and_sk_sss_response(res),
)
Expand All @@ -798,7 +798,7 @@ impl Handler<GenPkShareAndSkSss> for ThresholdKeyshare {
impl Handler<AllThresholdSharesCollected> for ThresholdKeyshare {
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: AllThresholdSharesCollected, _: &mut Self::Context) -> Self::Result {
self.handle_compute_request(
self.multithread_request(
|act| act.handle_all_threshold_shares_collected(msg),
|act, res, _| act.handle_calculate_decryption_key_response(res),
)
Expand All @@ -808,7 +808,7 @@ impl Handler<AllThresholdSharesCollected> for ThresholdKeyshare {
impl Handler<CiphertextOutputPublished> for ThresholdKeyshare {
type Result = ResponseActFuture<Self, ()>;
fn handle(&mut self, msg: CiphertextOutputPublished, _: &mut Self::Context) -> Self::Result {
self.handle_compute_request(
self.multithread_request(
|act| act.handle_ciphertext_output_published(msg),
|act, res, _| act.handle_calculate_decryption_share_response(res),
)
Expand Down
Loading
Loading