diff --git a/Cargo.lock b/Cargo.lock index 1f3226d48..6518e25e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3060,6 +3060,7 @@ dependencies = [ "e3-polynomial", "e3-request", "e3-sortition", + "e3-test-helpers", "e3-trbfv", "e3-utils", "e3-zk-helpers", diff --git a/agent/flow-trace/00_INDEX.md b/agent/flow-trace/00_INDEX.md index ff709f803..565b7f8a1 100644 --- a/agent/flow-trace/00_INDEX.md +++ b/agent/flow-trace/00_INDEX.md @@ -176,19 +176,20 @@ _Found during source-code cross-referencing of these trace documents._ | 4 | `activate()` calls `register()` → `registerOperator()` which has `require(!registered, AlreadyRegistered())`. So activate **reverts** for already-registered operators. It only works for re-registration after deregistration. | BondingRegistry.sol:308 | 01_REGISTRATION | | 5 | `E3Requested` event is `(uint256 e3Id, E3 e3, IE3Program indexed e3Program)` — seed and params are inside the E3 struct, not separate parameters. | IEnclave.sol:82 | 03_E3_REQUEST | | 6 | `finalizeCommittee()` checks `>=` deadline, not `>`. | CiphernodeRegistryOwnable.sol | 03_E3_REQUEST | -| 7 | `publishCommittee()` is now permissionless. The effective access control is the C5 proof verification plus the single-publish guard `publicKeyHashes[e3Id] == 0`; the old `onlyOwner` note is obsolete. | CiphernodeRegistryOwnable.sol | 04_DKG | -| 8 | `CommitteePublished` event emits `(e3Id, nodes, publicKey, proof)` — full PK bytes and C5 proof, not just pkHash. | CiphernodeRegistryOwnable.sol | 04_DKG | +| 7 | `publishCommittee()` is now permissionless. The effective access control is DKG proof verification plus the single-publish guard `publicKeyHashes[e3Id] == 0`; the old `onlyOwner` note is obsolete. | CiphernodeRegistryOwnable.sol | 04_DKG | +| 8 | `CommitteePublished` event emits `(e3Id, nodes, publicKey, pkCommitment, proof)` — full PK bytes, pkCommitment, and proof bytes (DkgAggregator when proof aggregation is enabled), not just pkHash. | CiphernodeRegistryOwnable.sol | 04_DKG | | 9 | `_validateNodeEligibility` calls `bondingRegistry.getTicketBalanceAtBlock()` (not `ticketToken.getPastVotes()` directly). | CiphernodeRegistryOwnable.sol:668 | 03_E3_REQUEST | | 10 | Lane A slashing uses **attestation-based** verification (committee quorum votes), not direct ZK proof re-verification on-chain. `proposeSlash()` decodes voter addresses, agrees, data hashes, and ECDSA signatures — not ZK proofs. | SlashingManager.sol | 05_FAILURE | ### Protocol Design Concerns -| # | Concern | Severity | Detail | -| --- | ------------------------------------------ | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 1 | **Deregister-before-slash race** | Accepted | SlashingManager Lane B (evidence+appeal) has a window during which the operator can deregister and claim their exit. If they do, the slash executes against 0 funds. The contract comments acknowledge this as an accepted tradeoff for the appeal window design. | -| 2 | **Committee publication decentralized** | Resolved | `publishCommittee()` is permissionless. Off-chain role selection chooses the active aggregator, while on-chain C5 proof verification and the single-publish guard prevent invalid or duplicate committee publication. | -| 3 | **`gracePeriod` is dead code** | Medium | `gracePeriod` is stored and validated during config updates but never actually used in any timeout check. Either the deadlines already bake in sufficient buffer, or this is a missing feature. | -| 4 | **`activate` CLI command is misleading** | Low | Named "activate" but actually calls "register" — will fail for already-registered operators. There's no standalone way to trigger re-evaluation of active status; instead, `_updateOperatorStatus()` runs automatically inside `addTicketBalance()`, `bondLicense()`, etc. | -| 5 | **Active-job load balancing bug fixed** | Info | The Rust `NodeStateStore.available_tickets()` subtracts `active_jobs` from total tickets, reducing the chance of busy nodes being selected for new E3s. Previously, the `Sortition` actor's `Handler` was missing match arms for `E3Failed` and `E3StageChanged`, causing these events to fall to the default `_ => ()` — the typed handlers for decrementing jobs were dead code. This has been fixed: E3Failed and E3StageChanged are now routed to their handlers, and `finalized_committees` is cleaned up in `decrement_jobs_for_e3` to prevent unbounded memory growth. | -| 6 | **Committee member expulsion** | Info | `SlashingManager` can call `expelCommitteeMember()` mid-DKG. The `Sortition` actor enriches the raw `CommitteeMemberExpelled` event with the expelled member's `party_id` (resolved from its stored `Committee` list) and re-publishes it. `ThresholdKeyshare` then uses the enriched `party_id` to update its collectors, potentially completing DKG with fewer parties. `ThresholdKeyshare` itself does not hold committee state. | -| 7 | **NodeProofAggregator stall bridge fixed** | Info | `NodeProofAggregator` no longer drops `DKGInnerProofReady` events that arrive before `ThresholdSharePending`; it prebuffers them until collection state exists. It also converts `NodeDkgFold` `ComputeRequestError` into `DKGRecursiveAggregationComplete { aggregated_proof: None }` instead of silently discarding actor state, preventing DKG proof aggregation stalls when fold workers fail or events arrive slightly out of order. | +| # | Concern | Severity | Detail | +| --- | ---------------------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 1 | **Deregister-before-slash race** | Accepted | SlashingManager Lane B (evidence+appeal) has a window during which the operator can deregister and claim their exit. If they do, the slash executes against 0 funds. The contract comments acknowledge this as an accepted tradeoff for the appeal window design. | +| 2 | **Committee publication decentralized** | Resolved | `publishCommittee()` is permissionless. Off-chain role selection chooses the active aggregator, while on-chain DKG proof verification and the single-publish guard prevent invalid or duplicate committee publication. | +| 3 | **`gracePeriod` is dead code** | Medium | `gracePeriod` is stored and validated during config updates but never actually used in any timeout check. Either the deadlines already bake in sufficient buffer, or this is a missing feature. | +| 4 | **`activate` CLI command is misleading** | Low | Named "activate" but actually calls "register" — will fail for already-registered operators. There's no standalone way to trigger re-evaluation of active status; instead, `_updateOperatorStatus()` runs automatically inside `addTicketBalance()`, `bondLicense()`, etc. | +| 5 | **Active-job load balancing bug fixed** | Info | The Rust `NodeStateStore.available_tickets()` subtracts `active_jobs` from total tickets, reducing the chance of busy nodes being selected for new E3s. Previously, the `Sortition` actor's `Handler` was missing match arms for `E3Failed` and `E3StageChanged`, causing these events to fall to the default `_ => ()` — the typed handlers for decrementing jobs were dead code. This has been fixed: E3Failed and E3StageChanged are now routed to their handlers, and `finalized_committees` is cleaned up in `decrement_jobs_for_e3` to prevent unbounded memory growth. | +| 6 | **Committee member expulsion** | Info | `SlashingManager` can call `expelCommitteeMember()` mid-DKG. The `Sortition` actor enriches the raw `CommitteeMemberExpelled` event with the expelled member's `party_id` (resolved from its stored `Committee` list) and re-publishes it. `ThresholdKeyshare` then uses the enriched `party_id` to update its collectors, potentially completing DKG with fewer parties. `ThresholdKeyshare` itself does not hold committee state. | +| 7 | **PublicKeyAggregator failure bridge fixed** | Info | `PublicKeyAggregator` now routes `ComputeRequestError` for `ZkRequest::DkgAggregation` and converts mixed Some/None honest NodeFold proof sets into `E3Failed { failed_at_stage: CommitteeFinalized, reason: DKGInvalidShares }`, preventing PK aggregation stalls that previously surfaced only as `EnclaveError`. | +| 8 | **ThresholdPlaintextAggregator failure bridge fixed** | Info | `ThresholdPlaintextAggregator` now routes `ComputeRequestError` for `CalculateThresholdDecryption` and `DecryptionAggregation`, and fatal C6/C7/decryption-aggregation checks now emit `E3Failed { failed_at_stage: CiphertextReady, reason: DecryptionInvalidShares }` instead of halting locally under `trap()`. | diff --git a/agent/flow-trace/04_DKG_AND_COMPUTATION.md b/agent/flow-trace/04_DKG_AND_COMPUTATION.md index a331fa80f..98eb079eb 100644 --- a/agent/flow-trace/04_DKG_AND_COMPUTATION.md +++ b/agent/flow-trace/04_DKG_AND_COMPUTATION.md @@ -171,11 +171,27 @@ ThresholdKeyshare receives AllEncryptionKeysCollected │ │ Output: esi_sss[num_ciphertexts][N] │ │ └─────────────────────────────────────────────────────────┘ ``` + │ + ├─ ThresholdKeyshare tracks the correlation id for both TrBFV requests: + │ ├─ `GenPkShareAndSkSss` + │ └─ `GenEsiSss` + │ → If the worker returns `ComputeRequestError` for either request, + │ `ThresholdKeyshare` now emits `E3Failed { + │ failed_at_stage: CommitteeFinalized, + │ reason: DKGInvalidShares + │ }` and stops instead of remaining stuck in `GeneratingThresholdShare` ### Step 5: Encrypt & Broadcast Shares (with C1, C2, C3 Proofs) ``` Both GenPkShareAndSkSss and GenEsiSss complete + │ + ├─ `ThresholdKeyshare` tracks the `CalculateDecryptionKey` correlation id: + │ → `ComputeRequestError` for this request now emits + │ `E3Failed { + │ failed_at_stage: CommitteeFinalized, + │ reason: DKGInvalidShares + │ }` and stops before C4 proof dispatch │ ├─ handle_shares_generated(): │ │ @@ -555,8 +571,21 @@ ThresholdKeyshare receives AllThresholdSharesCollected │ │ e3_id, party_id, signed_proof(C5) │ │ } │ │ -│ └─ 5. Publish PublicKeyAggregated { -│ e3_id, aggregate_pk, pk_hash, node_list +│ ├─ 5. DKG AGGREGATION REQUEST (when proof aggregation is enabled): +│ │ ├─ PublicKeyAggregator buffers one optional NodeFold proof per honest party from +│ │ │ DKGRecursiveAggregationComplete +│ │ ├─ Dispatches ComputeRequest::zk(ZkRequest::DkgAggregation { +│ │ │ node_fold_proofs, c5_proof, party_ids, params_preset +│ │ │ }) +│ │ ├─ Tracks the in-flight correlation id +│ │ ├─ ComputeRequestError now emits +│ │ │ E3Failed { failed_at_stage: CommitteeFinalized, reason: DKGInvalidShares } +│ │ └─ A mixed Some/None honest NodeFold-proof set is treated as the same terminal DKG +│ │ failure instead of only surfacing as EnclaveError telemetry +│ │ +│ └─ 6. Publish PublicKeyAggregated { +│ e3_id, pubkey: aggregate_pk, pk_commitment, nodes, +│ dkg_aggregator_proof │ } │ └─ CiphernodeRegistrySolWriter receives PublicKeyAggregated: @@ -663,6 +692,13 @@ EnclaveSolReader decodes CiphertextOutputPublished event │ │ │ │ │ │ │ Output: Vec │ │ │ └─────────────────────────────────────────────────────┘ + │ + ├─ `ThresholdKeyshare` tracks the `CalculateDecryptionShare` correlation id: + │ → `ComputeRequestError` for this request now emits + │ `E3Failed { + │ failed_at_stage: CiphertextReady, + │ reason: DecryptionInvalidShares + │ }` and stops before C6 proof generation │ ├─ REQUEST C6 PROOF: │ Publish ShareDecryptionProofPending { @@ -736,6 +772,12 @@ EnclaveSolReader decodes CiphertextOutputPublished event │ │ │ │ Output: plaintext_bytes │ │ │ │ └─────────────────────────────────────────────────────┘ │ │ +│ ├─ ThresholdPlaintextAggregator tracks the `CalculateThresholdDecryption` correlation id: +│ │ ├─ `ComputeRequestError` now emits +│ │ │ `E3Failed { failed_at_stage: CiphertextReady, reason: DecryptionInvalidShares }` +│ │ └─ Fatal C6 filtering failures (too few honest shares or post-proof commitment +│ │ mismatches) emit the same terminal failure instead of only trapping locally +│ │ │ ├─ REQUEST C7 PROOF: │ │ Publish AggregationProofPending { │ │ proof_request: DecryptedSharesAggregationProofRequest, @@ -755,7 +797,23 @@ EnclaveSolReader decodes CiphertextOutputPublished event │ │ e3_id, party_id, signed_proof(C7) │ │ } │ │ -│ └─ Publish PlaintextAggregated { e3_id, decrypted_output } +│ ├─ DECRYPTION AGGREGATION REQUEST: +│ │ ├─ ThresholdPlaintextAggregator stores the signed C7 proofs plus the honest C6 inner +│ │ │ proofs for the first `T + 1` parties after sorting by `party_id` +│ │ ├─ Dispatches ComputeRequest::zk(ZkRequest::DecryptionAggregation { +│ │ │ c6_total_slots, jobs, params_preset +│ │ │ }) +│ │ ├─ Each job folds the selected C6 proofs for one ciphertext index and checks them +│ │ │ against the matching C7 proof inside `DecryptionAggregator` +│ │ ├─ Tracks the in-flight correlation id +│ │ ├─ ComputeRequestError, missing C6 inner proofs, or C7/decryption-aggregator proof-count +│ │ │ mismatches now emit +│ │ │ `E3Failed { failed_at_stage: CiphertextReady, reason: DecryptionInvalidShares }` +│ │ └─ On success, stores `decryption_aggregator_proofs` +│ │ +│ └─ Publish PlaintextAggregated { +│ e3_id, decrypted_output, decryption_aggregator_proofs +│ } │ └─ EnclaveSolWriter receives PlaintextAggregated: ├─ Requires EffectsEnabled diff --git a/crates/aggregator/Cargo.toml b/crates/aggregator/Cargo.toml index 70abbb63d..66132a749 100644 --- a/crates/aggregator/Cargo.toml +++ b/crates/aggregator/Cargo.toml @@ -29,3 +29,6 @@ e3-zk-helpers = { workspace = true } e3-utils = { workspace = true } serde = { workspace = true } tracing = { workspace = true } + +[dev-dependencies] +e3-test-helpers = { workspace = true } diff --git a/crates/aggregator/src/publickey_aggregator.rs b/crates/aggregator/src/publickey_aggregator.rs index 6d4680804..3f720cbe8 100644 --- a/crates/aggregator/src/publickey_aggregator.rs +++ b/crates/aggregator/src/publickey_aggregator.rs @@ -8,13 +8,13 @@ use actix::prelude::*; use anyhow::Result; use e3_data::Persistable; use e3_events::{ - prelude::*, BusHandle, CircuitName, ComputeRequest, ComputeResponse, ComputeResponseKind, - CorrelationId, DKGRecursiveAggregationComplete, Die, DkgAggregationRequest, E3Failed, E3Stage, - E3id, EnclaveEvent, EnclaveEventData, EventContext, FailureReason, KeyshareCreated, OrderedSet, - PartyProofsToVerify, PkAggregationProofPending, PkAggregationProofRequest, - PkAggregationProofSigned, Proof, ProofType, PublicKeyAggregated, Seed, Sequenced, - ShareVerificationComplete, ShareVerificationDispatched, SignedProofFailed, SignedProofPayload, - TypedEvent, VerificationKind, ZkRequest, ZkResponse, + prelude::*, BusHandle, CircuitName, ComputeRequest, ComputeRequestError, ComputeResponse, + ComputeResponseKind, CorrelationId, DKGRecursiveAggregationComplete, Die, + DkgAggregationRequest, E3Failed, E3Stage, E3id, EnclaveEvent, EnclaveEventData, EventContext, + FailureReason, KeyshareCreated, OrderedSet, PartyProofsToVerify, PkAggregationProofPending, + PkAggregationProofRequest, PkAggregationProofSigned, Proof, ProofType, PublicKeyAggregated, + Seed, Sequenced, ShareVerificationComplete, ShareVerificationDispatched, SignedProofFailed, + SignedProofPayload, TypedEvent, VerificationKind, ZkRequest, ZkResponse, }; use e3_events::{trap, EType}; use e3_fhe::{Fhe, GetAggregatePublicKey}; @@ -649,11 +649,51 @@ impl PublicKeyAggregator { }) .count(); if some_count != 0 && some_count != honest_party_ids.len() { - anyhow::bail!( + error!( "PublicKeyAggregator: mixed Some/None DKG node proofs across honest parties \ - ({some_count} of {} present); refusing to dispatch a truncated DkgAggregation", - honest_party_ids.len() + ({some_count} of {} present); failing E3 {}", + honest_party_ids.len(), + self.e3_id ); + self.bus.publish( + E3Failed { + e3_id: self.e3_id.clone(), + failed_at_stage: E3Stage::CommitteeFinalized, + reason: FailureReason::DKGInvalidShares, + }, + ec.clone(), + )?; + self.state.try_mutate(ec, |state| { + let PublicKeyAggregatorState::GeneratingC5Proof { + public_key, + keyshare_bytes, + nodes, + dkg_node_proofs, + honest_party_ids, + dishonest_parties, + dkg_aggregation_correlation: _, + dkg_aggregated_proof, + c5_proof_pending: _, + last_ec, + } = state + else { + return Ok(state); + }; + + Ok(PublicKeyAggregatorState::GeneratingC5Proof { + public_key, + keyshare_bytes, + nodes, + dkg_node_proofs, + honest_party_ids, + dishonest_parties, + dkg_aggregation_correlation: None, + dkg_aggregated_proof, + c5_proof_pending: None, + last_ec, + }) + })?; + return Ok(()); } let mut pairs: Vec<_> = dkg_node_proofs @@ -887,6 +927,73 @@ impl PublicKeyAggregator { Ok(()) } + fn handle_compute_request_error(&mut self, msg: TypedEvent) -> Result<()> { + let (msg, ec) = msg.into_components(); + if msg.request().e3_id != self.e3_id { + return Ok(()); + } + + let matched_correlation = matches!( + self.state.get(), + Some(PublicKeyAggregatorState::GeneratingC5Proof { + dkg_aggregation_correlation, + .. + }) if dkg_aggregation_correlation.as_ref() == Some(msg.correlation_id()) + ); + + if !matched_correlation { + return Ok(()); + } + + error!( + "PublicKeyAggregator: DkgAggregation failed for E3 {}: {:?}", + self.e3_id, + msg.get_err() + ); + + self.bus.publish( + E3Failed { + e3_id: self.e3_id.clone(), + failed_at_stage: E3Stage::CommitteeFinalized, + reason: FailureReason::DKGInvalidShares, + }, + ec.clone(), + )?; + + self.state.try_mutate(&ec, |state| { + let PublicKeyAggregatorState::GeneratingC5Proof { + public_key, + keyshare_bytes, + nodes, + dkg_node_proofs, + honest_party_ids, + dishonest_parties, + dkg_aggregation_correlation: _, + dkg_aggregated_proof, + c5_proof_pending: _, + last_ec, + } = state + else { + return Ok(state); + }; + + Ok(PublicKeyAggregatorState::GeneratingC5Proof { + public_key, + keyshare_bytes, + nodes, + dkg_node_proofs, + honest_party_ids, + dishonest_parties, + dkg_aggregation_correlation: None, + dkg_aggregated_proof, + c5_proof_pending: None, + last_ec, + }) + })?; + + Ok(()) + } + pub fn handle_member_expelled( &mut self, node: &str, @@ -977,6 +1084,9 @@ impl Handler for PublicKeyAggregator { EnclaveEventData::ComputeResponse(data) => { self.notify_sync(ctx, TypedEvent::new(data, ec)) } + EnclaveEventData::ComputeRequestError(data) => { + self.notify_sync(ctx, TypedEvent::new(data, ec)) + } EnclaveEventData::E3RequestComplete(_) => self.notify_sync(ctx, Die), EnclaveEventData::CommitteeMemberExpelled(data) => { // Only process raw events from chain (party_id not yet resolved). @@ -1131,9 +1241,201 @@ impl Handler> for PublicKeyAggregator { } } +impl Handler> for PublicKeyAggregator { + type Result = (); + + fn handle( + &mut self, + msg: TypedEvent, + _ctx: &mut Self::Context, + ) -> Self::Result { + trap( + EType::PublickeyAggregation, + &self.bus.with_ec(msg.get_ctx()), + || self.handle_compute_request_error(msg), + ) + } +} + impl Handler for PublicKeyAggregator { type Result = (); fn handle(&mut self, _: Die, ctx: &mut Self::Context) -> Self::Result { ctx.stop(); } } + +#[cfg(test)] +mod tests { + use super::*; + use e3_data::{AutoPersist, DataStore, InMemStore, Repository}; + use e3_events::{ComputeRequestErrorKind, HistoryCollector, TakeEvents, Unsequenced, ZkError}; + use e3_test_helpers::get_common_setup; + + fn test_ctx(data: impl Into) -> EventContext { + EventContext::::from(data.into()).sequence(0) + } + + fn test_state( + initial_state: PublicKeyAggregatorState, + ) -> Persistable { + let repo = Repository::::new(DataStore::from_in_mem( + &InMemStore::new(false).start(), + )); + repo.to_connector().send(Some(initial_state)) + } + + fn dummy_proof(circuit: CircuitName) -> Proof { + Proof::new( + circuit, + ArcBytes::from_bytes(&[1]), + ArcBytes::from_bytes(&[2]), + ) + } + + fn generating_c5_state(correlation_id: CorrelationId) -> PublicKeyAggregatorState { + PublicKeyAggregatorState::GeneratingC5Proof { + public_key: ArcBytes::from_bytes(&[1, 2, 3]), + keyshare_bytes: Vec::new(), + nodes: OrderedSet::new(), + dkg_node_proofs: HashMap::new(), + honest_party_ids: BTreeSet::new(), + dishonest_parties: BTreeSet::new(), + dkg_aggregation_correlation: Some(correlation_id), + dkg_aggregated_proof: None, + c5_proof_pending: Some(dummy_proof(CircuitName::PkAggregation)), + last_ec: None, + } + } + + async fn build_public_key_aggregator( + initial_state: PublicKeyAggregatorState, + ) -> Result<( + PublicKeyAggregator, + Addr>, + E3id, + )> { + let (bus, rng, _seed, params, crp, _errors, history) = + get_common_setup(Some(BfvPreset::InsecureThreshold512.into()))?; + let e3_id = E3id::new("42", 1); + let fhe = Arc::new(Fhe::new(params, crp, rng)); + let aggregator = PublicKeyAggregator::new( + PublicKeyAggregatorParams { + fhe, + bus, + e3_id: e3_id.clone(), + params_preset: BfvPreset::InsecureThreshold512, + }, + test_state(initial_state), + ); + + Ok((aggregator, history, e3_id)) + } + + async fn next_event(history: &Addr>) -> Result { + let mut result = history.send(TakeEvents::::new(1)).await?; + assert!(!result.timed_out, "timed out waiting for an event"); + Ok(result.events.pop().expect("expected one event")) + } + + #[actix::test] + async fn dkg_aggregation_compute_error_emits_e3_failed() -> Result<()> { + let correlation_id = CorrelationId::new(); + let (mut aggregator, history, e3_id) = + build_public_key_aggregator(generating_c5_state(correlation_id.clone())).await?; + + let request = ComputeRequest::zk( + ZkRequest::DkgAggregation(DkgAggregationRequest { + node_fold_proofs: vec![dummy_proof(CircuitName::PkAggregation)], + c5_proof: dummy_proof(CircuitName::PkAggregation), + party_ids: vec![0], + params_preset: BfvPreset::InsecureThreshold512, + }), + correlation_id, + e3_id.clone(), + ); + + aggregator.handle_compute_request_error(TypedEvent::new( + ComputeRequestError::new( + ComputeRequestErrorKind::Zk(ZkError::ProofGenerationFailed("boom".to_string())), + request, + ), + test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }), + ))?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CommitteeFinalized + && data.reason == FailureReason::DKGInvalidShares + )); + + let Some(PublicKeyAggregatorState::GeneratingC5Proof { + dkg_aggregation_correlation, + c5_proof_pending, + .. + }) = aggregator.state.get() + else { + panic!("expected GeneratingC5Proof state"); + }; + assert!(dkg_aggregation_correlation.is_none()); + assert!(c5_proof_pending.is_none()); + + Ok(()) + } + + #[actix::test] + async fn mixed_dkg_proofs_emit_e3_failed() -> Result<()> { + let correlation_id = CorrelationId::new(); + let mut initial_state = generating_c5_state(correlation_id); + let PublicKeyAggregatorState::GeneratingC5Proof { + ref mut dkg_aggregation_correlation, + ref mut dkg_node_proofs, + ref mut honest_party_ids, + .. + } = initial_state + else { + unreachable!(); + }; + *dkg_aggregation_correlation = None; + honest_party_ids.extend([0, 1]); + dkg_node_proofs.insert(0, Some(dummy_proof(CircuitName::PkAggregation))); + dkg_node_proofs.insert(1, None); + + let (mut aggregator, history, e3_id) = build_public_key_aggregator(initial_state).await?; + let ec = test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }); + + aggregator.try_dispatch_dkg_aggregation(&ec)?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CommitteeFinalized + && data.reason == FailureReason::DKGInvalidShares + )); + + let Some(PublicKeyAggregatorState::GeneratingC5Proof { + dkg_aggregation_correlation, + c5_proof_pending, + .. + }) = aggregator.state.get() + else { + panic!("expected GeneratingC5Proof state"); + }; + assert!(dkg_aggregation_correlation.is_none()); + assert!(c5_proof_pending.is_none()); + + Ok(()) + } +} diff --git a/crates/aggregator/src/threshold_plaintext_aggregator.rs b/crates/aggregator/src/threshold_plaintext_aggregator.rs index 056a0d496..73cc1ec3b 100644 --- a/crates/aggregator/src/threshold_plaintext_aggregator.rs +++ b/crates/aggregator/src/threshold_plaintext_aggregator.rs @@ -11,12 +11,13 @@ use anyhow::{anyhow, bail, ensure, Result}; use e3_data::Persistable; use e3_events::{ prelude::*, trap, AggregationProofPending, AggregationProofSigned, BusHandle, CircuitName, - CommitteeMemberExpelled, ComputeRequest, ComputeResponse, ComputeResponseKind, CorrelationId, - DecryptedSharesAggregationProofRequest, DecryptionAggregationJobRequest, - DecryptionAggregationRequest, DecryptionshareCreated, Die, E3id, EType, EnclaveEvent, - EnclaveEventData, EventContext, PartyProofsToVerify, PlaintextAggregated, Proof, Seed, - Sequenced, ShareVerificationComplete, ShareVerificationDispatched, SignedProofPayload, - TypedEvent, VerificationKind, ZkRequest, ZkResponse, + CommitteeMemberExpelled, ComputeRequest, ComputeRequestError, ComputeResponse, + ComputeResponseKind, CorrelationId, DecryptedSharesAggregationProofRequest, + DecryptionAggregationJobRequest, DecryptionAggregationRequest, DecryptionshareCreated, Die, + E3Failed, E3Stage, E3id, EType, EnclaveEvent, EnclaveEventData, EventContext, FailureReason, + PartyProofsToVerify, PlaintextAggregated, Proof, Seed, Sequenced, ShareVerificationComplete, + ShareVerificationDispatched, SignedProofPayload, TypedEvent, VerificationKind, ZkRequest, + ZkResponse, }; use e3_fhe_params::BfvPreset; use e3_sortition::{E3CommitteeContainsRequest, E3CommitteeContainsResponse, Sortition}; @@ -175,6 +176,8 @@ pub struct ThresholdPlaintextAggregator { state: Persistable, /// Honest parties' C6 inner proofs (sorted by party id) for [`ZkRequest::DecryptionAggregation`]. honest_c6_proofs_for_agg: Option)>>, + /// In-flight threshold decryption request. + threshold_decryption_correlation: Option, /// In-flight decryption aggregation request. decryption_aggregation_correlation: Option, /// C7 proofs stored while waiting for decryption aggregation. @@ -206,6 +209,7 @@ impl ThresholdPlaintextAggregator { proof_aggregation_enabled: params.proof_aggregation_enabled, state, honest_c6_proofs_for_agg: None, + threshold_decryption_correlation: None, decryption_aggregation_correlation: None, c7_proofs_pending: None, decryption_aggregator_proofs: None, @@ -390,12 +394,14 @@ impl ThresholdPlaintextAggregator { .map(|(id, s)| (*id, s.clone())) .collect(); - ensure!( - honest_shares.len() > state.threshold_m as usize, - "Not enough honest shares after C6 verification: {} honest shares, {} required", - honest_shares.len(), - state.threshold_m + 1 - ); + if honest_shares.len() <= state.threshold_m as usize { + warn!( + "Not enough honest shares after C6 verification: {} honest shares, {} required", + honest_shares.len(), + state.threshold_m + 1 + ); + return self.fail_decryption_round(ec); + } // Verify each honest party's raw decryption share matches the // d_commitment attested by their verified C6 proof. Catches the attack @@ -412,12 +418,14 @@ impl ThresholdPlaintextAggregator { dishonest_parties.extend(&share_mismatch_parties); honest_shares.retain(|(id, _)| !share_mismatch_parties.contains(id)); - ensure!( - honest_shares.len() > state.threshold_m as usize, - "Not enough honest shares after d_commitment check: {} honest, {} required", - honest_shares.len(), - state.threshold_m + 1 - ); + if honest_shares.len() <= state.threshold_m as usize { + warn!( + "Not enough honest shares after d_commitment check: {} honest, {} required", + honest_shares.len(), + state.threshold_m + 1 + ); + return self.fail_decryption_round(ec); + } } info!( @@ -446,6 +454,7 @@ impl ThresholdPlaintextAggregator { let trbfv_config = TrBFVConfig::new(state.params.clone(), state.threshold_n, state.threshold_m); + let correlation_id = CorrelationId::new(); let event = ComputeRequest::trbfv( TrBFVRequest::CalculateThresholdDecryption( CalculateThresholdDecryptionRequest { @@ -455,12 +464,13 @@ impl ThresholdPlaintextAggregator { } .into(), ), - CorrelationId::new(), + correlation_id.clone(), self.e3_id.clone(), ); self.bus.publish(event, ec.clone())?; self.honest_c6_proofs_for_agg = Some(honest_c6); + self.threshold_decryption_correlation = Some(correlation_id); self.state.try_mutate(&ec, |_| { Ok(ThresholdPlaintextAggregatorState::Computing(Computing { @@ -630,12 +640,14 @@ impl ThresholdPlaintextAggregator { .map(|sp| sp.payload.proof.clone()) .collect(); - ensure!( - proofs.len() == state.plaintext.len(), - "C7 proof count mismatch: got {} proofs for {} ciphertext indices", - proofs.len(), - state.plaintext.len() - ); + if proofs.len() != state.plaintext.len() { + warn!( + "C7 proof count mismatch: got {} proofs for {} ciphertext indices", + proofs.len(), + state.plaintext.len() + ); + return self.fail_decryption_round(ec); + } info!("C7 proof signed — awaiting DecryptionAggregation..."); self.c7_proofs_pending = Some(proofs); @@ -663,10 +675,12 @@ impl ThresholdPlaintextAggregator { // With proof aggregation enabled we must have a complete C6 set; otherwise we'd publish // `decryption_aggregator_proofs = Vec::new()`, which downstream consumers interpret as // "aggregation disabled". Fail loudly instead so the missing shares are surfaced. - ensure!( - !honest_c6.is_empty() && honest_c6.iter().all(|(_, w)| !w.is_empty()), - "DecryptionAggregation: honest C6 inner proofs missing while proof aggregation is enabled" - ); + if honest_c6.is_empty() || honest_c6.iter().any(|(_, w)| w.is_empty()) { + warn!( + "DecryptionAggregation: honest C6 inner proofs missing while proof aggregation is enabled" + ); + return self.fail_decryption_round(ec.clone()); + } let state: GeneratingC7Proof = self .state .get() @@ -676,12 +690,14 @@ impl ThresholdPlaintextAggregator { // first `T + 1` parties after sorting by party id (`handle_decrypted_shares_aggregation_proof` // truncates); fold slot indices must stay in `0..T+1` and use that same party subset. let c6_total_slots = state.threshold_m as usize + 1; - ensure!( - honest_c6.len() >= c6_total_slots, - "DecryptionAggregation needs at least {} honest C6 parties, have {}", - c6_total_slots, - honest_c6.len() - ); + if honest_c6.len() < c6_total_slots { + warn!( + "DecryptionAggregation needs at least {} honest C6 parties, have {}", + c6_total_slots, + honest_c6.len() + ); + return self.fail_decryption_round(ec.clone()); + } let num_ct = c7_proofs.len(); let mut jobs = Vec::with_capacity(num_ct); for ct_idx in 0..num_ct { @@ -689,7 +705,8 @@ impl ThresholdPlaintextAggregator { let c6_slot_indices: Vec = (0..c6_total_slots as u32).collect(); for (_, wps) in honest_c6.iter().take(c6_total_slots) { let Some(p) = wps.get(ct_idx) else { - bail!("C6 inner proof missing for party at ct index {}", ct_idx); + warn!("C6 inner proof missing for party at ct index {}", ct_idx); + return self.fail_decryption_round(ec.clone()); }; c6_inner_proofs.push(p.clone()); } @@ -727,6 +744,10 @@ impl ThresholdPlaintextAggregator { match msg.response { // TrBFV threshold decryption response -> transition to GeneratingC7Proof ComputeResponseKind::TrBFV(TrBFVResponse::CalculateThresholdDecryption(response)) => { + if self.threshold_decryption_correlation.as_ref() != Some(&correlation_id) { + return Ok(()); + } + self.threshold_decryption_correlation = None; info!("Received TrBFV threshold decryption response"); let plaintext = response.plaintext; @@ -769,12 +790,14 @@ impl ThresholdPlaintextAggregator { self.decryption_aggregation_correlation = None; // Worker must return one DecryptionAggregator proof per pending C7 ciphertext. if let Some(c7_proofs) = self.c7_proofs_pending.as_ref() { - ensure!( - resp.proofs.len() == c7_proofs.len(), - "DecryptionAggregation response proof count {} != expected {}", - resp.proofs.len(), - c7_proofs.len() - ); + if resp.proofs.len() != c7_proofs.len() { + warn!( + "DecryptionAggregation response proof count {} != expected {}", + resp.proofs.len(), + c7_proofs.len() + ); + return self.fail_decryption_round(ec); + } } self.decryption_aggregator_proofs = Some(resp.proofs); self.try_publish_complete()?; @@ -789,6 +812,43 @@ impl ThresholdPlaintextAggregator { Ok(()) } + fn fail_decryption_round(&mut self, ec: EventContext) -> Result<()> { + self.bus.publish( + E3Failed { + e3_id: self.e3_id.clone(), + failed_at_stage: E3Stage::CiphertextReady, + reason: FailureReason::DecryptionInvalidShares, + }, + ec, + )?; + + self.honest_c6_proofs_for_agg = None; + self.threshold_decryption_correlation = None; + self.decryption_aggregation_correlation = None; + self.c7_proofs_pending = None; + self.decryption_aggregator_proofs = None; + + Ok(()) + } + + fn handle_compute_request_error(&mut self, msg: TypedEvent) -> Result<()> { + let (msg, ec) = msg.into_components(); + if msg.request().e3_id != self.e3_id { + return Ok(()); + } + + let threshold_decryption_failed = + self.threshold_decryption_correlation.as_ref() == Some(msg.correlation_id()); + let decryption_aggregation_failed = + self.decryption_aggregation_correlation.as_ref() == Some(msg.correlation_id()); + + if !threshold_decryption_failed && !decryption_aggregation_failed { + return Ok(()); + } + + self.fail_decryption_round(ec) + } + /// Publish `PlaintextAggregated` when both C7 proofs and decryption aggregation are complete. fn try_publish_complete(&mut self) -> Result<()> { let Some(c7_proofs) = self.c7_proofs_pending.clone() else { @@ -874,6 +934,9 @@ impl Handler for ThresholdPlaintextAggregator { EnclaveEventData::ComputeResponse(data) => { self.notify_sync(ctx, TypedEvent::new(data, ec)) } + EnclaveEventData::ComputeRequestError(data) => { + self.notify_sync(ctx, TypedEvent::new(data, ec)) + } EnclaveEventData::CommitteeMemberExpelled(data) => { self.notify_sync(ctx, TypedEvent::new(data, ec)) } @@ -977,6 +1040,22 @@ impl Handler> for ThresholdPlaintextAggregator { } } +impl Handler> for ThresholdPlaintextAggregator { + type Result = (); + + fn handle( + &mut self, + msg: TypedEvent, + _: &mut Self::Context, + ) -> Self::Result { + trap( + EType::PlaintextAggregation, + &self.bus.with_ec(msg.get_ctx()), + || self.handle_compute_request_error(msg), + ) + } +} + impl Handler> for ThresholdPlaintextAggregator { type Result = (); @@ -1044,3 +1123,283 @@ impl Handler for ThresholdPlaintextAggregator { ctx.stop() } } + +#[cfg(test)] +mod tests { + use super::*; + use e3_data::{AutoPersist, DataStore, InMemStore, PersistableData, Repository}; + use e3_events::{ + Committee, ComputeRequestErrorKind, HistoryCollector, TakeEvents, Unsequenced, ZkError, + }; + use e3_fhe_params::{encode_bfv_params, BfvParamSet, DEFAULT_BFV_PRESET}; + use e3_sortition::{ + CiphernodeSelector, CiphernodeSelectorState, NodeStateStore, SortitionBackend, + SortitionParams, + }; + use e3_test_helpers::get_common_setup; + use std::collections::{BTreeMap, BTreeSet, HashMap}; + + fn test_ctx(data: impl Into) -> EventContext { + EventContext::::from(data.into()).sequence(0) + } + + fn test_persistable(value: T) -> Persistable { + let repo = Repository::::new(DataStore::from_in_mem(&InMemStore::new(false).start())); + repo.to_connector().send(Some(value)) + } + + fn test_params() -> ArcBytes { + ArcBytes::from_bytes(&encode_bfv_params( + &BfvParamSet::from(DEFAULT_BFV_PRESET).build_arc(), + )) + } + + fn dummy_proof(circuit: CircuitName) -> Proof { + Proof::new( + circuit, + ArcBytes::from_bytes(&[1]), + ArcBytes::from_bytes(&[2]), + ) + } + + fn computing_state() -> ThresholdPlaintextAggregatorState { + ThresholdPlaintextAggregatorState::Computing(Computing { + threshold_m: 1, + threshold_n: 2, + shares: vec![(0, vec![ArcBytes::from_bytes(&[7])])], + ciphertext_output: vec![ArcBytes::from_bytes(&[8])], + params: test_params(), + }) + } + + fn verifying_c6_state() -> ThresholdPlaintextAggregatorState { + ThresholdPlaintextAggregatorState::VerifyingC6(VerifyingC6 { + threshold_m: 1, + threshold_n: 2, + shares: BTreeMap::from([ + (0, vec![ArcBytes::from_bytes(&[7])]), + (1, vec![ArcBytes::from_bytes(&[8])]), + ]), + c6_proofs: BTreeMap::new(), + ciphertext_output: vec![ArcBytes::from_bytes(&[9])], + params: test_params(), + }) + } + + fn generating_c7_state() -> ThresholdPlaintextAggregatorState { + ThresholdPlaintextAggregatorState::GeneratingC7Proof(GeneratingC7Proof { + threshold_m: 1, + threshold_n: 2, + shares: vec![(0, vec![ArcBytes::from_bytes(&[7])])], + plaintext: vec![ArcBytes::from_bytes(&[9])], + }) + } + + fn start_sortition(bus: &BusHandle) -> Addr { + let selector = CiphernodeSelector::new( + bus, + test_persistable(CiphernodeSelectorState::default()), + "node-1", + ) + .start(); + + Sortition::new(SortitionParams { + bus: bus.clone(), + backends: test_persistable(HashMap::::new()), + node_state: test_persistable(HashMap::::new()), + finalized_committees: test_persistable(HashMap::::new()), + ciphernode_selector: selector, + address: "node-1".to_string(), + }) + .start() + } + + async fn build_plaintext_aggregator( + initial_state: ThresholdPlaintextAggregatorState, + ) -> Result<( + ThresholdPlaintextAggregator, + Addr>, + E3id, + )> { + let (bus, _rng, _seed, _params, _crp, _errors, history) = + get_common_setup(Some(BfvPreset::InsecureThreshold512.into()))?; + let e3_id = E3id::new("42", 1); + let aggregator = ThresholdPlaintextAggregator::new( + ThresholdPlaintextAggregatorParams { + bus: bus.clone(), + sortition: start_sortition(&bus), + e3_id: e3_id.clone(), + params_preset: BfvPreset::InsecureThreshold512, + proof_aggregation_enabled: true, + }, + test_persistable(initial_state), + ); + + Ok((aggregator, history, e3_id)) + } + + async fn next_event(history: &Addr>) -> Result { + let mut result = history.send(TakeEvents::::new(1)).await?; + assert!(!result.timed_out, "timed out waiting for an event"); + Ok(result.events.pop().expect("expected one event")) + } + + #[actix::test] + async fn threshold_decryption_compute_error_emits_e3_failed() -> Result<()> { + let correlation_id = CorrelationId::new(); + let (mut aggregator, history, e3_id) = + build_plaintext_aggregator(computing_state()).await?; + aggregator.threshold_decryption_correlation = Some(correlation_id.clone()); + + let request = ComputeRequest::trbfv( + TrBFVRequest::CalculateThresholdDecryption(CalculateThresholdDecryptionRequest { + ciphertexts: vec![ArcBytes::from_bytes(&[8])], + trbfv_config: TrBFVConfig::new(test_params(), 2, 1), + d_share_polys: vec![(0, vec![ArcBytes::from_bytes(&[7])])], + }), + correlation_id, + e3_id.clone(), + ); + + aggregator.handle_compute_request_error(TypedEvent::new( + ComputeRequestError::new( + ComputeRequestErrorKind::TrBFV(e3_trbfv::TrBFVError::CalculateThresholdDecryption( + "boom".to_string(), + )), + request, + ), + test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }), + ))?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CiphertextReady + && data.reason == FailureReason::DecryptionInvalidShares + )); + assert!(aggregator.threshold_decryption_correlation.is_none()); + + Ok(()) + } + + #[actix::test] + async fn insufficient_honest_c6_shares_emit_e3_failed() -> Result<()> { + let (mut aggregator, history, e3_id) = + build_plaintext_aggregator(verifying_c6_state()).await?; + + aggregator.handle_c6_verification_complete(TypedEvent::new( + ShareVerificationComplete { + e3_id: e3_id.clone(), + kind: VerificationKind::ThresholdDecryptionProofs, + dishonest_parties: BTreeSet::from([1]), + }, + test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }), + ))?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CiphertextReady + && data.reason == FailureReason::DecryptionInvalidShares + )); + + Ok(()) + } + + #[actix::test] + async fn decryption_aggregation_compute_error_emits_e3_failed() -> Result<()> { + let correlation_id = CorrelationId::new(); + let (mut aggregator, history, e3_id) = + build_plaintext_aggregator(generating_c7_state()).await?; + aggregator.c7_proofs_pending = Some(vec![dummy_proof(CircuitName::PkAggregation)]); + aggregator.honest_c6_proofs_for_agg = Some(vec![( + 0, + vec![dummy_proof(CircuitName::ThresholdShareDecryption)], + )]); + aggregator.decryption_aggregation_correlation = Some(correlation_id.clone()); + aggregator.last_ec = Some(test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + })); + + let request = ComputeRequest::zk( + ZkRequest::DecryptionAggregation(DecryptionAggregationRequest { + c6_total_slots: 1, + jobs: Vec::new(), + params_preset: BfvPreset::InsecureThreshold512, + }), + correlation_id, + e3_id.clone(), + ); + + aggregator.handle_compute_request_error(TypedEvent::new( + ComputeRequestError::new( + ComputeRequestErrorKind::Zk(ZkError::ProofGenerationFailed("boom".to_string())), + request, + ), + test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }), + ))?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CiphertextReady + && data.reason == FailureReason::DecryptionInvalidShares + )); + assert!(aggregator.decryption_aggregation_correlation.is_none()); + assert!(aggregator.c7_proofs_pending.is_none()); + + Ok(()) + } + + #[actix::test] + async fn missing_c6_inner_proofs_emit_e3_failed() -> Result<()> { + let (mut aggregator, history, e3_id) = + build_plaintext_aggregator(generating_c7_state()).await?; + aggregator.c7_proofs_pending = Some(vec![dummy_proof(CircuitName::PkAggregation)]); + aggregator.honest_c6_proofs_for_agg = Some(vec![ + (0, vec![]), + (1, vec![dummy_proof(CircuitName::ThresholdShareDecryption)]), + ]); + + aggregator.dispatch_decryption_aggregation(&test_ctx(E3Failed { + e3_id: e3_id.clone(), + failed_at_stage: E3Stage::None, + reason: FailureReason::None, + }))?; + + let event = next_event(&history).await?; + assert!(matches!( + event.into_data(), + EnclaveEventData::E3Failed(data) + if data.e3_id == e3_id + && data.failed_at_stage == E3Stage::CiphertextReady + && data.reason == FailureReason::DecryptionInvalidShares + )); + assert!(aggregator.honest_c6_proofs_for_agg.is_none()); + assert!(aggregator.decryption_aggregation_correlation.is_none()); + assert!(aggregator.c7_proofs_pending.is_none()); + assert!(aggregator.decryption_aggregator_proofs.is_none()); + + Ok(()) + } +}