diff --git a/Cargo.lock b/Cargo.lock index 4d827d245..8af87c239 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3326,7 +3326,9 @@ dependencies = [ "e3-net", "e3-request", "e3-sortition", + "e3-sync", "e3-test-helpers", + "e3-utils", "e3-zk-prover", "hex", "libp2p", @@ -3337,6 +3339,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "tempfile", "tokio", "tracing", "zeroize", @@ -3898,7 +3901,6 @@ dependencies = [ "alloy", "anyhow", "derivative", - "e3-committee-hash", "e3-utils-derive", "hex", "rand 0.9.2", diff --git a/agent/flow-trace/05_FAILURE_REFUND_SLASHING.md b/agent/flow-trace/05_FAILURE_REFUND_SLASHING.md index e5d9481e8..c22e2168e 100644 --- a/agent/flow-trace/05_FAILURE_REFUND_SLASHING.md +++ b/agent/flow-trace/05_FAILURE_REFUND_SLASHING.md @@ -421,9 +421,6 @@ check_quorum(accusation_id): │ ├─ Multiple data_hashes across ALL votes? │ │ └─ YES → AccusationOutcome::Equivocation (SLASHABLE) │ │ -│ ├─ Only accuser says bad, others disagree? -│ │ └─ AccusationOutcome::AccuserLied (NOT slashable) -│ │ │ └─ Otherwise → AccusationOutcome::Inconclusive (NOT slashable) │ └─ CASE C: Still waiting for more votes diff --git a/crates/aggregator/src/domain/failover.rs b/crates/aggregator/src/domain/failover.rs new file mode 100644 index 000000000..3e8af9ab1 --- /dev/null +++ b/crates/aggregator/src/domain/failover.rs @@ -0,0 +1,205 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +//! Aggregator liveness / failover policy. +//! +//! The active aggregator for an E3 is chosen deterministically as the lowest +//! non-skipped party in the (score-ordered) committee +//! ([`e3_events::Committee::active_aggregator_party_id`]). If that node crashes +//! after the committee is finalised but before it publishes the aggregated +//! result on-chain, the round stalls indefinitely: no on-chain expulsion is +//! triggered (the node committed no fault), so nothing demotes it. +//! +//! This module is the pure, testable brain of an automatic failover: it tracks +//! how long the *expected on-chain progress* from the active aggregator has been +//! absent and, once a wall-clock budget elapses, decides to mark the current +//! aggregator as locally unresponsive and promote the next standby. Because the +//! committee order and the skip set are derived from signals every node shares, +//! all honest nodes converge on the same replacement without a leader-election +//! round. A brief overlap (old + new aggregator both publishing) is bounded by +//! the timeout and made harmless by the on-chain publish being single-shot. +//! +//! All decisions here are deterministic given their inputs (no clock access), +//! so the policy is exercised entirely by unit tests; the driving actor owns the +//! wall clock and feeds `elapsed` in. + +use std::time::Duration; + +/// The progress an aggregator round can be waiting on. Used to scope which +/// absence of progress should arm the failover timer. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AggregatorPhase { + /// Waiting for the public key to be published on-chain (DKG output). + AwaitingPublicKey, + /// Waiting for the plaintext output to be published on-chain (decryption). + AwaitingPlaintext, + /// The round is finished; no aggregator action is pending. + Settled, +} + +/// Policy parameters for failover. A single wall-clock budget governs how long +/// the active aggregator may be silent before the next standby is promoted. +#[derive(Debug, Clone, Copy)] +pub struct FailoverPolicy { + /// How long the expected on-chain progress may be absent before the active + /// aggregator is presumed down and the next standby promoted. + timeout: Duration, +} + +impl FailoverPolicy { + pub fn new(timeout: Duration) -> Self { + Self { timeout } + } + + pub fn timeout(&self) -> Duration { + self.timeout + } +} + +/// The outcome of a liveness evaluation for one E3. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FailoverDecision { + /// The round is settled or making progress within budget; do nothing. + Hold, + /// The active aggregator (`demote`, a party_id) has been silent past the + /// budget. Add it to the locally-presumed-unresponsive set and promote the + /// next standby (`promote_to`, with its address). + Promote { + demote: u64, + promote_to: u64, + new_addr: String, + }, + /// Every standby has been exhausted; the round cannot make progress and must + /// be failed by the caller. + Exhausted { demote: u64 }, +} + +/// Decide whether to fail over for a single E3. +/// +/// Inputs: +/// - `phase`: what on-chain progress is pending (Settled => always `Hold`). +/// - `elapsed`: time since the active aggregator was (re)assigned or last made +/// observable progress. +/// - `active`: the current active aggregator party_id. +/// - `standbys`: ordered `[(party_id, addr), ...]` of non-skipped members in +/// promotion order, as produced by +/// [`e3_events::Committee::aggregator_standbys`]. `active` is expected to be +/// `standbys[0].0` when present. +/// +/// Deterministic and clock-free. +pub fn decide_failover( + policy: &FailoverPolicy, + phase: AggregatorPhase, + elapsed: Duration, + active: u64, + standbys: &[(u64, String)], +) -> FailoverDecision { + if phase == AggregatorPhase::Settled { + return FailoverDecision::Hold; + } + if elapsed < policy.timeout { + return FailoverDecision::Hold; + } + // Promote the first standby strictly after the active aggregator in order. + match standbys.iter().find(|(party_id, _)| *party_id > active) { + Some((next_party, next_addr)) => FailoverDecision::Promote { + demote: active, + promote_to: *next_party, + new_addr: next_addr.clone(), + }, + None => FailoverDecision::Exhausted { demote: active }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn policy() -> FailoverPolicy { + FailoverPolicy::new(Duration::from_secs(60)) + } + + fn standbys() -> Vec<(u64, String)> { + vec![(0, "0xa".into()), (1, "0xb".into()), (2, "0xc".into())] + } + + #[test] + fn settled_always_holds() { + let d = decide_failover( + &policy(), + AggregatorPhase::Settled, + Duration::from_secs(10_000), + 0, + &standbys(), + ); + assert_eq!(d, FailoverDecision::Hold); + } + + #[test] + fn within_budget_holds() { + let d = decide_failover( + &policy(), + AggregatorPhase::AwaitingPublicKey, + Duration::from_secs(59), + 0, + &standbys(), + ); + assert_eq!(d, FailoverDecision::Hold); + } + + #[test] + fn past_budget_promotes_next_standby() { + let d = decide_failover( + &policy(), + AggregatorPhase::AwaitingPlaintext, + Duration::from_secs(61), + 0, + &standbys(), + ); + assert_eq!( + d, + FailoverDecision::Promote { + demote: 0, + promote_to: 1, + new_addr: "0xb".into() + } + ); + } + + #[test] + fn promotes_past_already_skipped_members() { + // active is party 1 (party 0 already skipped); next is party 2. + let remaining = vec![(1, "0xb".to_string()), (2, "0xc".to_string())]; + let d = decide_failover( + &policy(), + AggregatorPhase::AwaitingPlaintext, + Duration::from_secs(120), + 1, + &remaining, + ); + assert_eq!( + d, + FailoverDecision::Promote { + demote: 1, + promote_to: 2, + new_addr: "0xc".into() + } + ); + } + + #[test] + fn exhausted_when_no_standby_remains() { + let only_last = vec![(2, "0xc".to_string())]; + let d = decide_failover( + &policy(), + AggregatorPhase::AwaitingPlaintext, + Duration::from_secs(120), + 2, + &only_last, + ); + assert_eq!(d, FailoverDecision::Exhausted { demote: 2 }); + } +} diff --git a/crates/aggregator/src/domain/mod.rs b/crates/aggregator/src/domain/mod.rs index eb31cc8f8..559a56980 100644 --- a/crates/aggregator/src/domain/mod.rs +++ b/crates/aggregator/src/domain/mod.rs @@ -10,5 +10,6 @@ pub mod committee; pub mod committee_hash; +pub mod failover; pub mod publickey_aggregation; pub mod threshold_plaintext_aggregation; diff --git a/crates/ciphernode-builder/src/evm_system.rs b/crates/ciphernode-builder/src/evm_system.rs index fa68574a2..33bbebd21 100644 --- a/crates/ciphernode-builder/src/evm_system.rs +++ b/crates/ciphernode-builder/src/evm_system.rs @@ -77,13 +77,15 @@ impl EvmSystemChainBuilder

{ // The event is defined here move |msg| { // Extract config - let deploy_block = msg.get_evm_config(chain_id)?.deploy_block(); + let chain_config = msg.get_evm_config(chain_id)?; + let deploy_block = chain_config.deploy_block(); + let confirmations = chain_config.confirmations(); // Pass next to the router let router = configure_router(next, route_factories); // Extract filters from the router - let filters = filters_from_router(&router, deploy_block); + let filters = filters_from_router(&router, deploy_block, confirmations); // Setup and start the read interface and the router EvmReadInterface::setup_with_factory( @@ -117,6 +119,7 @@ fn configure_router( router } -fn filters_from_router(router: &EvmRouter, deploy_block: u64) -> Filters { +fn filters_from_router(router: &EvmRouter, deploy_block: u64, confirmations: u64) -> Filters { Filters::from_routing_table(router.get_routing_table(), deploy_block) + .with_confirmations(confirmations) } diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index 0c22d0f3e..512f462c8 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -9,6 +9,7 @@ use crate::config::{self, ConfigCommands}; use crate::events::{self, EventsCommands}; use crate::helpers::telemetry::{setup_simple_tracing, setup_tracing}; use crate::net::{self, NetCommands}; +use crate::node::{self, NodeCommands as NodeStateCommands}; use crate::nodes::{self, NodeCommands}; use crate::noir::NoirCommands; use crate::password::PasswordCommands; @@ -184,6 +185,7 @@ impl Cli { Commands::Noir { command } => noir::execute(out, command, &config).await?, Commands::Net { command } => net::execute(&out, command, &config).await?, Commands::Events { command } => events::execute(out, command, &config).await?, + Commands::Node { command } => node::execute(out, command, &config).await?, Commands::Rev => rev::execute(out).await?, Commands::Config { command } => config::execute(out, command, &config).await?, } @@ -296,6 +298,12 @@ pub enum Commands { command: NodeCommands, }, + /// Single-node maintenance commands (validate on-disk state, etc.) + Node { + #[command(subcommand)] + command: NodeStateCommands, + }, + /// Manage net configuration Net { #[command(subcommand)] diff --git a/crates/cli/src/config_setup.rs b/crates/cli/src/config_setup.rs index de892a780..1084b85e5 100644 --- a/crates/cli/src/config_setup.rs +++ b/crates/cli/src/config_setup.rs @@ -113,6 +113,7 @@ chains: #[cfg(test)] mod tests { use super::{execute, validate_eth_address, BOOTSTRAP_PEER}; + use alloy::primitives::Address; use anyhow::Result; use std::fs; @@ -125,7 +126,7 @@ mod tests { )); let _ = fs::remove_dir_all(&dir); - let config = execute("ws://localhost:8545", &dir)?; + let config = execute("ws://localhost:8545", &Address::ZERO, &dir)?; // The loader must actually read the prepopulated peer from `node.peers`. assert_eq!(config.peers(), vec![BOOTSTRAP_PEER.to_string()]); diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index a99066484..bc7958d61 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -21,6 +21,7 @@ pub mod helpers; mod init; mod net; mod net_get_peer_id; +mod node; mod nodes; mod nodes_daemon; mod nodes_down; diff --git a/crates/cli/src/node.rs b/crates/cli/src/node.rs new file mode 100644 index 000000000..e64d1bed3 --- /dev/null +++ b/crates/cli/src/node.rs @@ -0,0 +1,41 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::{bail, Result}; +use clap::Subcommand; +use e3_config::AppConfig; +use e3_console::{log, Console}; +use e3_entrypoint::validate::validate_node; + +#[derive(Subcommand, Clone, Debug)] +pub enum NodeCommands { + /// Validate the on-disk state of a single node without starting it. + /// + /// Opens the node's persisted stores read-only and checks that the schema + /// is loadable by this binary, the event log is intact, the snapshot cursor + /// is consistent, and there are no orphaned committee tickets ("loose + /// ends"). Safe to run while the node is stopped; intended as the + /// pre-upgrade and post-crash health check. Exits non-zero on failure. + Validate, +} + +pub async fn execute(out: Console, command: NodeCommands, config: &AppConfig) -> Result<()> { + match command { + NodeCommands::Validate => { + // Offline-only contract: hold the same cross-host fence `start` uses so the + // validator cannot read state out from under a live node or race a concurrent + // `enclave start`. Released when this scope ends. + let _fence = + e3_entrypoint::fence::ProcessFence::acquire(&config.db_file(), &config.name())?; + let report = validate_node(config).await?; + log!(out, "{}", report.render()); + if report.has_failure() { + bail!("node validation failed"); + } + } + } + Ok(()) +} diff --git a/crates/cli/src/start.rs b/crates/cli/src/start.rs index 139c6fe0c..636b2959f 100644 --- a/crates/cli/src/start.rs +++ b/crates/cli/src/start.rs @@ -27,6 +27,13 @@ pub async fn execute(mut config: AppConfig, peers: Vec) -> Result<()> { tokio::pin!(shutdown); owo(); + + // Cross-host fence: ensure only one instance runs against this data directory. + // Acquired *before* binding the control port or spawning background work so a second + // instance fails fast instead of racing on the shared data directory. + // Held for the lifetime of this function (the running process); released on exit. + let _fence = e3_entrypoint::fence::ProcessFence::acquire(&config.db_file(), &config.name())?; + launch_socket_server(config.ctrl_port()); if let Some(dashboard_port) = config.dashboard_port() { diff --git a/crates/config/src/chain_config.rs b/crates/config/src/chain_config.rs index 69a203932..392fedee9 100644 --- a/crates/config/src/chain_config.rs +++ b/crates/config/src/chain_config.rs @@ -24,6 +24,10 @@ pub struct ChainConfig { pub rpc_auth: RpcAuth, pub contracts: ContractAddresses, pub finalization_ms: Option, + /// Number of block confirmations to wait before ingesting an on-chain log. + /// `None`/`0` reads to head; a positive value makes historical/backfill + /// ingestion reorg-safe by only acting on logs buried this deep. + pub reorg_confirmations: Option, pub chain_id: Option, } @@ -57,7 +61,8 @@ impl TryFrom<&ChainConfig> for EvmEventConfigChain { lowest_block = [lowest_block, deploy_block].into_iter().flatten().min(); } let start_block = lowest_block.unwrap_or(0); - Ok(EvmEventConfigChain::new(start_block)) + Ok(EvmEventConfigChain::new(start_block) + .with_confirmations(value.reorg_confirmations.unwrap_or(0))) } } diff --git a/crates/data/src/in_mem_kv_store.rs b/crates/data/src/in_mem_kv_store.rs index 93fc2a109..12c2dac27 100644 --- a/crates/data/src/in_mem_kv_store.rs +++ b/crates/data/src/in_mem_kv_store.rs @@ -74,13 +74,13 @@ impl InMemKvStore { self.log.clone() } - /// Serializes the store to the legacy bincode `BTreeMap` dump format. + /// Serializes the store to the bincode `BTreeMap` dump format. pub fn dump(&self) -> Result> { let map: BTreeMap, Vec> = self.db.entries().into_iter().collect(); bincode::serialize(&map).context("Error serializing in-memory store") } - /// Reconstructs a store from a legacy bincode `BTreeMap` dump. + /// Reconstructs a store from a bincode `BTreeMap` dump. pub fn from_dump(bytes: &[u8], capture: bool) -> Result { let map: BTreeMap, Vec> = bincode::deserialize(bytes).context("Error deserializing in-memory store")?; @@ -164,9 +164,8 @@ mod tests { } #[test] - fn dump_format_matches_legacy_btreemap() { - // The dump must be byte-identical to a bincode-encoded BTreeMap so that - // dumps written by older nodes remain readable. + fn dump_format_is_bincode_btreemap() { + // The dump is a bincode-encoded BTreeMap of the store's entries. let mut store = InMemKvStore::new(false); store.insert(b"alpha".to_vec(), b"1".to_vec(), None); store.insert(b"beta".to_vec(), b"2".to_vec(), None); diff --git a/crates/entrypoint/Cargo.toml b/crates/entrypoint/Cargo.toml index 490e4da55..8e651c84e 100644 --- a/crates/entrypoint/Cargo.toml +++ b/crates/entrypoint/Cargo.toml @@ -35,6 +35,8 @@ e3-request = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } e3-sortition = { workspace = true } +e3-sync = { workspace = true } +e3-utils = { workspace = true } serde_yaml = { workspace = true } e3-test-helpers = { workspace = true } e3-zk-prover = { workspace = true } @@ -46,3 +48,6 @@ reqwest = { workspace = true } [build-dependencies] serde_json = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/entrypoint/src/fence.rs b/crates/entrypoint/src/fence.rs new file mode 100644 index 000000000..429e4a640 --- /dev/null +++ b/crates/entrypoint/src/fence.rs @@ -0,0 +1,169 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +//! Cross-host fencing: a process-lifetime advisory lock that guarantees at most +//! one active instance per node data directory. +//! +//! Two copies of the same ciphernode running against the same data directory +//! would double-sign, race on the commitlog, and corrupt derived state. `sled` +//! already takes an OS lock on its own directory, but its error surfaces late +//! and cryptically, and other on-disk state (the key file, snapshots) is not +//! covered. This fence is acquired *before* any store is opened so the second +//! instance fails fast with a clear, actionable message. +//! +//! The lock is an exclusive advisory lock (`flock`-style, via stable +//! [`std::fs::File::try_lock`]) on a dedicated `enclave.lock` file in the node's +//! data directory. The OS releases the lock automatically when the holding +//! process exits (gracefully, by crash, or by kill), so there is no stale-lock +//! recovery problem: a crashed node's lock is immediately reacquirable. +//! +//! Hold the returned [`ProcessFence`] for the lifetime of the process; dropping +//! it releases the lock. + +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use anyhow::{bail, Context, Result}; +use tracing::info; + +/// The lock-file name placed in the node's data directory. +const LOCK_FILE_NAME: &str = "enclave.lock"; + +/// A held cross-host fence. While this value is alive the process holds an +/// exclusive advisory lock on the node's data directory. Dropping it (on exit) +/// releases the lock. +#[derive(Debug)] +pub struct ProcessFence { + /// Keeps the OS lock alive for the lifetime of this guard. + _file: File, + path: PathBuf, +} + +impl ProcessFence { + /// The path to the lock file being held. + pub fn path(&self) -> &Path { + &self.path + } + + /// Acquire the fence for the data directory derived from `db_path` (the + /// directory passed to the store, e.g. `config.db_file()`). + /// + /// The lock file is created alongside the database directory and tagged with + /// the holder's pid and `node_name` for diagnostics. Returns an error if + /// another live process already holds the fence. + pub fn acquire(db_path: &Path, node_name: &str) -> Result { + let lock_path = lock_path_for(db_path); + Self::acquire_at(&lock_path, node_name) + } + + /// Acquire the fence at an explicit lock-file path. Exposed for tests and + /// callers that know the precise lock location. + pub fn acquire_at(lock_path: &Path, node_name: &str) -> Result { + if let Some(parent) = lock_path.parent() { + std::fs::create_dir_all(parent).with_context(|| { + format!("failed to create data directory for fence at {parent:?}") + })?; + } + + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .open(lock_path) + .with_context(|| format!("failed to open fence lock file at {lock_path:?}"))?; + + match file.try_lock() { + Ok(()) => {} + Err(std::fs::TryLockError::WouldBlock) => { + bail!( + "another enclave instance is already running for node '{node_name}' \ + against this data directory (lock held at {lock_path:?}). Refusing to \ + start a second instance, which would double-sign and corrupt state. \ + Stop the other instance first." + ); + } + Err(std::fs::TryLockError::Error(err)) => { + return Err(err) + .with_context(|| format!("failed to acquire fence lock at {lock_path:?}")); + } + } + + // Best-effort diagnostics: record who holds the lock. Failure to write + // does not affect correctness (the OS lock is already held). + let pid = std::process::id(); + let _ = (|| -> std::io::Result<()> { + file.set_len(0)?; + write!(file, "node={node_name}\npid={pid}\n")?; + file.flush() + })(); + + info!("Acquired process fence for node '{node_name}' (pid {pid}) at {lock_path:?}"); + + Ok(Self { + _file: file, + path: lock_path.to_path_buf(), + }) + } +} + +/// Compute the lock-file path for a given database path. The lock lives in the +/// database directory's parent (the node data directory) so it covers all of +/// the node's on-disk state, not just the sled DB. +fn lock_path_for(db_path: &Path) -> PathBuf { + match db_path.parent() { + Some(parent) if !parent.as_os_str().is_empty() => parent.join(LOCK_FILE_NAME), + _ => db_path.with_extension("lock"), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::tempdir; + + #[test] + fn lock_path_is_sibling_of_db_dir() { + let p = lock_path_for(Path::new("/data/node1/db")); + assert_eq!(p, PathBuf::from("/data/node1/enclave.lock")); + } + + #[test] + fn second_acquire_on_same_dir_fails() { + let dir = tempdir().unwrap(); + let db = dir.path().join("db"); + let first = ProcessFence::acquire(&db, "node1").expect("first acquire"); + + let err = ProcessFence::acquire(&db, "node1") + .expect_err("second acquire must fail while first is held"); + assert!( + err.to_string().contains("already running"), + "unexpected error: {err}" + ); + + drop(first); + // After releasing, a fresh acquire succeeds again. + let _again = ProcessFence::acquire(&db, "node1").expect("reacquire after drop"); + } + + #[test] + fn different_data_dirs_do_not_conflict() { + let dir_a = tempdir().unwrap(); + let dir_b = tempdir().unwrap(); + let _a = ProcessFence::acquire(&dir_a.path().join("db"), "a").unwrap(); + let _b = ProcessFence::acquire(&dir_b.path().join("db"), "b").unwrap(); + } + + #[test] + fn lock_file_records_pid() { + let dir = tempdir().unwrap(); + let fence = ProcessFence::acquire(&dir.path().join("db"), "diag").unwrap(); + let contents = std::fs::read_to_string(fence.path()).unwrap(); + assert!(contents.contains("node=diag")); + assert!(contents.contains(&format!("pid={}", std::process::id()))); + } +} diff --git a/crates/entrypoint/src/lib.rs b/crates/entrypoint/src/lib.rs index 7fbfbb024..7931c7db5 100644 --- a/crates/entrypoint/src/lib.rs +++ b/crates/entrypoint/src/lib.rs @@ -5,9 +5,11 @@ // or FITNESS FOR A PARTICULAR PURPOSE. pub mod config; +pub mod fence; pub mod helpers; pub mod net; pub mod nodes; pub mod password; pub mod start; +pub mod validate; pub mod wallet; diff --git a/crates/entrypoint/src/validate.rs b/crates/entrypoint/src/validate.rs new file mode 100644 index 000000000..115bf7a22 --- /dev/null +++ b/crates/entrypoint/src/validate.rs @@ -0,0 +1,590 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +//! Offline node-state validation. +//! +//! Backs the `enclave node validate` CLI command. It opens a node's persisted +//! stores **read-only** (no actors, no network, no chain writes) and answers the +//! operator question: *"Is my on-disk state intact, internally consistent, free +//! of loose ends, and will this binary be able to load it after an upgrade?"* +//! +//! It is deliberately non-destructive. It never mutates the store, never talks to +//! the chain, and never starts the node. It is safe to run while the node is +//! stopped (the recommended pre-upgrade step) and surfaces problems as a +//! structured report with a non-zero exit on failure. +//! +//! ## Checks performed +//! +//! 1. **Event-store integrity** — reads every event for every aggregate from +//! sequence 0 and verifies the sequence numbers are contiguous and strictly +//! increasing. A gap or a decode failure means the commit log (the source of +//! truth) is truncated or corrupt. +//! 2. **Snapshot cursor consistency** — verifies the persisted per-aggregate +//! sequence cursor does not point past the last event actually present in the +//! log (which would indicate a snapshot that is ahead of a truncated log). +//! 3. **Open-loop / loose-ends audit** — loads the persisted sortition state and +//! flags any committee that still holds an active-job slot **even though the +//! event log already contains a terminal event** for that E3. These are the +//! orphaned tickets that a crash mid-E3 can leave behind; they are the +//! "loose ends" a restart should clean up. + +use crate::helpers::datastore::{get_eventstore_reader, get_repositories}; +use anyhow::{anyhow, Result}; +use e3_ciphernode_builder::global_eventstore_cache::EventStoreReader; +use e3_config::AppConfig; +use e3_data::Repositories; +use e3_events::{ + AggregateId, CorrelationId, E3Stage, EnclaveEvent, EnclaveEventData, Event, + EventContextAccessors, EventContextSeq, EventStoreQueryBy, EventStoreQueryResponse, SeqAgg, +}; +use e3_sortition::{committee_key, NodeRegistry, NodeStateRepositoryFactory, NodeStateStore}; +use e3_sync::SyncRepositoryFactory; +use e3_utils::actix::channel as actix_toolbox; +use std::collections::{BTreeMap, HashMap, HashSet}; + +/// Outcome severity for a single validation check. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Severity { + /// Check passed; nothing to do. + Pass, + /// Non-fatal observation the operator should be aware of. + Warn, + /// A real problem that must be resolved before the node can be trusted. + Fail, +} + +impl Severity { + fn label(self) -> &'static str { + match self { + Severity::Pass => "PASS", + Severity::Warn => "WARN", + Severity::Fail => "FAIL", + } + } +} + +/// Result of a single named validation check. +#[derive(Clone, Debug)] +pub struct CheckResult { + /// Short, stable name of the check (e.g. `"schema"`). + pub name: String, + /// Severity of the outcome. + pub severity: Severity, + /// Human-readable detail explaining the outcome. + pub detail: String, +} + +impl CheckResult { + fn pass(name: &str, detail: impl Into) -> Self { + Self { + name: name.into(), + severity: Severity::Pass, + detail: detail.into(), + } + } + fn warn(name: &str, detail: impl Into) -> Self { + Self { + name: name.into(), + severity: Severity::Warn, + detail: detail.into(), + } + } + fn fail(name: &str, detail: impl Into) -> Self { + Self { + name: name.into(), + severity: Severity::Fail, + detail: detail.into(), + } + } +} + +/// Aggregated result of running every validation check. +#[derive(Clone, Debug, Default)] +pub struct ValidationReport { + /// Individual check outcomes, in execution order. + pub checks: Vec, +} + +impl ValidationReport { + fn push(&mut self, check: CheckResult) { + self.checks.push(check); + } + + /// Whether any check failed (i.e. the node should not be trusted/upgraded as-is). + pub fn has_failure(&self) -> bool { + self.checks.iter().any(|c| c.severity == Severity::Fail) + } + + /// Whether any check produced a warning. + pub fn has_warning(&self) -> bool { + self.checks.iter().any(|c| c.severity == Severity::Warn) + } + + /// Render the report as human-readable text. + pub fn render(&self) -> String { + let mut out = String::new(); + out.push_str("Enclave node validation report\n"); + out.push_str("==============================\n"); + for c in &self.checks { + out.push_str(&format!( + "[{}] {}: {}\n", + c.severity.label(), + c.name, + c.detail + )); + } + let verdict = if self.has_failure() { + "VALIDATION FAILED — resolve the FAIL items before starting or upgrading this node." + } else if self.has_warning() { + "VALIDATION PASSED WITH WARNINGS — review the WARN items." + } else { + "VALIDATION PASSED — state is intact and consistent." + }; + out.push_str("------------------------------\n"); + out.push_str(verdict); + out.push('\n'); + out + } +} + +/// Run every validation check against the node configured by `config`. +/// +/// Opens the persisted stores read-only. Returns the full report; callers decide +/// how to surface it (the CLI prints it and exits non-zero on failure). +pub async fn validate_node(config: &AppConfig) -> Result { + let repositories = get_repositories(config)?; + let eventstore = get_eventstore_reader(config)?; + let aggregate_ids = aggregate_ids(config); + + let mut report = ValidationReport::default(); + + // 1 + 2. Read every event per aggregate, check integrity + cursor consistency, + // and collect the terminal-event keys for the open-loop audit. + let mut terminal_keys: HashSet = HashSet::new(); + let mut total_events: u64 = 0; + for agg in &aggregate_ids { + let events = read_all_events(&eventstore, *agg).await?; + total_events += events.len() as u64; + + collect_terminal_keys(&events, &mut terminal_keys); + + let seqs: Vec = events.iter().map(|e| e.seq()).collect(); + report.push(check_sequence_integrity(*agg, &seqs)); + + let cursor = repositories.aggregate_seq(*agg).read().await?.unwrap_or(0); + report.push(check_cursor_consistency(*agg, cursor, &seqs)); + } + report.push(CheckResult::pass( + "event-store", + format!( + "read {total_events} event(s) across {} aggregate(s)", + aggregate_ids.len() + ), + )); + + // 3. Open-loop / loose-ends audit against the persisted sortition state. + report.push(check_open_loops(&repositories, &terminal_keys).await?); + + Ok(report) +} + +/// The set of aggregate ids to inspect: the local aggregate (0) plus one per +/// configured chain. Mirrors [`AggregateId::from_chain_id`] so the validator +/// looks at exactly the aggregates the running node persists. +fn aggregate_ids(config: &AppConfig) -> Vec { + let mut ids: Vec = vec![AggregateId::new(0)]; + for chain in config.chains() { + let id = AggregateId::from_chain_id(chain.chain_id); + if !ids.contains(&id) { + ids.push(id); + } + } + ids +} + +/// Verify the event sequence numbers are contiguous and strictly increasing. +fn check_sequence_integrity(agg: AggregateId, seqs: &[u64]) -> CheckResult { + let name = "event-sequence"; + if seqs.is_empty() { + return CheckResult::pass(name, format!("aggregate {}: no events", agg.to_usize())); + } + // Per-aggregate sequences are 1-indexed (the commit log returns `offset + 1`), + // so a healthy log's first event is seq 1. A higher first seq means the head of + // the log was truncated — catch it explicitly, since an internal-gap scan alone + // treats e.g. [5, 6, 7] as healthy. + if seqs[0] != 1 { + return CheckResult::fail( + name, + format!( + "aggregate {}: first event starts at seq {} instead of 1 (log truncated at head)", + agg.to_usize(), + seqs[0] + ), + ); + } + match detect_sequence_gaps(seqs) { + SequenceCheck::Ok { first, last, count } => CheckResult::pass( + name, + format!( + "aggregate {}: {count} contiguous event(s), seq {first}..={last}", + agg.to_usize() + ), + ), + SequenceCheck::Gaps(gaps) => CheckResult::fail( + name, + format!( + "aggregate {}: commit log has {} gap(s) (truncated/corrupt): {}", + agg.to_usize(), + gaps.len(), + gaps.iter() + .map(|(a, b)| format!("{a}->{b}")) + .collect::>() + .join(", ") + ), + ), + SequenceCheck::NonMonotonic => CheckResult::fail( + name, + format!( + "aggregate {}: event sequence numbers are not strictly increasing (corrupt)", + agg.to_usize() + ), + ), + } +} + +/// Verify the persisted snapshot cursor does not point past the last event in +/// the log. A cursor ahead of the log means the snapshot survived but the commit +/// log behind it was truncated — replay would silently lose state. +fn check_cursor_consistency(agg: AggregateId, cursor: u64, seqs: &[u64]) -> CheckResult { + let name = "snapshot-cursor"; + let max_seq = seqs.iter().copied().max(); + match max_seq { + None => { + if cursor == 0 { + CheckResult::pass( + name, + format!("aggregate {}: empty + cursor 0", agg.to_usize()), + ) + } else { + CheckResult::fail( + name, + format!( + "aggregate {}: snapshot cursor {cursor} but the commit log is empty \ + (log truncated behind snapshot)", + agg.to_usize() + ), + ) + } + } + Some(max) if cursor > max => CheckResult::fail( + name, + format!( + "aggregate {}: snapshot cursor {cursor} is ahead of last event seq {max} \ + (log truncated behind snapshot)", + agg.to_usize() + ), + ), + Some(max) => CheckResult::pass( + name, + format!( + "aggregate {}: cursor {cursor} <= last event seq {max}", + agg.to_usize() + ), + ), + } +} + +/// Cross-check the persisted open committees against terminal events in the log. +async fn check_open_loops( + repositories: &Repositories, + terminal_keys: &HashSet, +) -> Result { + let name = "open-loops"; + let node_state: HashMap = + repositories.node_state().read().await?.unwrap_or_default(); + + let open = NodeRegistry::open_committees(&node_state); + let orphaned = find_orphaned_committees(&open, terminal_keys); + + if open.is_empty() { + return Ok(CheckResult::pass( + name, + "no committees holding active-job slots", + )); + } + if orphaned.is_empty() { + return Ok(CheckResult::pass( + name, + format!( + "{} committee(s) in flight; none have a terminal event in the log", + open.len() + ), + )); + } + Ok(CheckResult::fail( + name, + format!( + "{} orphaned committee(s) still hold active-job slots despite a terminal event in \ + the log (tickets stuck). Affected E3 committee keys: {}. A restart re-applies the \ + terminal events and releases these slots.", + orphaned.len(), + orphaned.join(", ") + ), + )) +} + +/// Outcome of a pure sequence-integrity check. +#[derive(Debug, PartialEq, Eq)] +enum SequenceCheck { + Ok { + first: u64, + last: u64, + count: usize, + }, + /// One or more `(before, after)` gaps where `after > before + 1`. + Gaps(Vec<(u64, u64)>), + /// Sequence numbers did not strictly increase. + NonMonotonic, +} + +/// Pure check that `seqs` (in event order) are strictly increasing by exactly 1. +fn detect_sequence_gaps(seqs: &[u64]) -> SequenceCheck { + let first = match seqs.first() { + Some(f) => *f, + None => { + return SequenceCheck::Ok { + first: 0, + last: 0, + count: 0, + } + } + }; + let mut gaps = Vec::new(); + for w in seqs.windows(2) { + let (a, b) = (w[0], w[1]); + if b <= a { + return SequenceCheck::NonMonotonic; + } + if b != a + 1 { + gaps.push((a, b)); + } + } + if gaps.is_empty() { + SequenceCheck::Ok { + first, + last: *seqs.last().unwrap(), + count: seqs.len(), + } + } else { + SequenceCheck::Gaps(gaps) + } +} + +/// Pure: open committee keys that also have a terminal event in the log. +fn find_orphaned_committees( + open: &[e3_sortition::OpenCommittee], + terminal_keys: &HashSet, +) -> Vec { + let mut out: Vec = open + .iter() + .filter(|c| terminal_keys.contains(&c.committee_key)) + .map(|c| c.committee_key.clone()) + .collect(); + out.sort(); + out.dedup(); + out +} + +/// Collect the committee key of every terminal lifecycle event in `events`. +/// +/// Mirrors the terminal-release dispatch in the `Sortition` actor: an E3 is +/// terminal on `PlaintextOutputPublished`, `E3Failed`, or `E3StageChanged` to +/// `Complete`/`Failed`. +fn collect_terminal_keys(events: &[EnclaveEvent], out: &mut HashSet) { + for event in events { + match event.get_data() { + EnclaveEventData::PlaintextOutputPublished(d) => { + out.insert(committee_key(&d.e3_id)); + } + EnclaveEventData::E3Failed(d) => { + out.insert(committee_key(&d.e3_id)); + } + EnclaveEventData::E3StageChanged(d) + if matches!(d.new_stage, E3Stage::Complete | E3Stage::Failed) => + { + out.insert(committee_key(&d.e3_id)); + } + _ => {} + } + } +} + +/// Read every event for a single aggregate from sequence 0, paginating until the +/// store is exhausted. +async fn read_all_events( + eventstore: &EventStoreReader, + aggregate: AggregateId, +) -> Result> { + const PAGE: u64 = 1024; + let mut all: Vec = Vec::new(); + let mut since: u64 = 0; + loop { + let (addr, rx) = actix_toolbox::oneshot::(); + let msg = EventStoreQueryBy::::new( + CorrelationId::new(), + HashMap::from([(aggregate, since)]), + addr, + ) + .with_limit(PAGE); + eventstore + .seq() + .try_send(msg) + .map_err(|e| anyhow!("event store query failed: {e}"))?; + let page = rx.await?.into_events(); + if page.is_empty() { + break; + } + let max_seq = page.iter().map(|e| e.seq()).max().unwrap_or(since); + all.extend(page); + // Advance past the highest sequence we just read. If the store could not + // advance (defensive), stop to avoid an infinite loop. + let next = max_seq.saturating_add(1); + if next <= since { + break; + } + since = next; + } + // The store may interleave aggregates depending on the query; keep only this + // aggregate's events and order them by sequence for the integrity check. Do not + // deduplicate by seq — repeated sequence numbers are exactly the corruption the + // integrity check must surface (pagination advances strictly past the last seq, + // so it never produces legitimate duplicates). + all.retain(|e| e.aggregate_id() == aggregate); + all.sort_by_key(|e| e.seq()); + Ok(all) +} + +/// A non-empty `BTreeMap` alias kept for readability in tests. +#[allow(dead_code)] +type SeqMap = BTreeMap; + +#[cfg(test)] +mod tests { + use super::*; + use e3_sortition::OpenCommittee; + + #[test] + fn sequence_ok_when_contiguous() { + assert_eq!( + detect_sequence_gaps(&[0, 1, 2, 3]), + SequenceCheck::Ok { + first: 0, + last: 3, + count: 4 + } + ); + } + + #[test] + fn sequence_ok_when_empty() { + assert_eq!( + detect_sequence_gaps(&[]), + SequenceCheck::Ok { + first: 0, + last: 0, + count: 0 + } + ); + } + + #[test] + fn sequence_detects_gap() { + assert_eq!( + detect_sequence_gaps(&[0, 1, 4, 5]), + SequenceCheck::Gaps(vec![(1, 4)]) + ); + } + + #[test] + fn sequence_detects_multiple_gaps() { + assert_eq!( + detect_sequence_gaps(&[2, 5, 6, 9]), + SequenceCheck::Gaps(vec![(2, 5), (6, 9)]) + ); + } + + #[test] + fn sequence_detects_non_monotonic() { + assert_eq!( + detect_sequence_gaps(&[0, 1, 1, 2]), + SequenceCheck::NonMonotonic + ); + assert_eq!( + detect_sequence_gaps(&[3, 2, 1]), + SequenceCheck::NonMonotonic + ); + } + + fn open(key: &str) -> OpenCommittee { + OpenCommittee { + chain_id: 1, + committee_key: key.to_string(), + members: vec!["0xabc".to_string()], + } + } + + #[test] + fn orphans_are_open_committees_with_terminal_events() { + let open_set = vec![open("1:5"), open("1:6"), open("1:7")]; + let mut terminal = HashSet::new(); + terminal.insert("1:5".to_string()); // finished but still open -> orphan + terminal.insert("1:9".to_string()); // finished and not open -> fine + + let orphans = find_orphaned_committees(&open_set, &terminal); + assert_eq!(orphans, vec!["1:5".to_string()]); + } + + #[test] + fn no_orphans_when_no_terminal_overlap() { + let open_set = vec![open("1:5"), open("1:6")]; + let terminal = HashSet::new(); + assert!(find_orphaned_committees(&open_set, &terminal).is_empty()); + } + + #[test] + fn cursor_ahead_of_log_fails() { + let r = check_cursor_consistency(AggregateId::new(1), 10, &[0, 1, 2]); + assert_eq!(r.severity, Severity::Fail); + } + + #[test] + fn cursor_within_log_passes() { + let r = check_cursor_consistency(AggregateId::new(1), 2, &[0, 1, 2, 3]); + assert_eq!(r.severity, Severity::Pass); + } + + #[test] + fn cursor_nonzero_on_empty_log_fails() { + let r = check_cursor_consistency(AggregateId::new(1), 5, &[]); + assert_eq!(r.severity, Severity::Fail); + } + + #[test] + fn report_verdict_reflects_severities() { + let mut report = ValidationReport::default(); + report.push(CheckResult::pass("a", "ok")); + assert!(!report.has_failure()); + assert!(!report.has_warning()); + + report.push(CheckResult::warn("b", "hmm")); + assert!(report.has_warning()); + assert!(!report.has_failure()); + + report.push(CheckResult::fail("c", "bad")); + assert!(report.has_failure()); + assert!(report.render().contains("VALIDATION FAILED")); + } +} diff --git a/crates/events/src/committee.rs b/crates/events/src/committee.rs index 76096cfdd..b65a064ec 100644 --- a/crates/events/src/committee.rs +++ b/crates/events/src/committee.rs @@ -86,6 +86,55 @@ impl Committee { .map(|addr| addr.eq_ignore_ascii_case(my_addr)) .unwrap_or(false) } + + /// The party_id of the active aggregator given a set of `skipped` party_ids + /// (the union of on-chain-expelled members and any locally presumed-down + /// members during failover). Returns the lowest party_id not in `skipped`, + /// or `None` if every member is skipped. + /// + /// Because the committee order is deterministic for all nodes and `skipped` + /// is derived from shared signals, every node computes the same aggregator, + /// so failover needs no leader-election protocol. + pub fn active_aggregator_party_id(&self, skipped: &[u64]) -> Option { + (0..self.members.len() as u64).find(|party_id| !skipped.contains(party_id)) + } + + /// Whether `my_addr` is the active aggregator once both on-chain-expelled and + /// locally-presumed-unresponsive members are skipped. This is the failover + /// generalisation of [`Self::is_active_aggregator`]: passing an empty + /// `unresponsive` slice reproduces the original behaviour exactly. + pub fn effective_aggregator( + &self, + my_addr: &str, + expelled: &[u64], + unresponsive: &[u64], + ) -> bool { + let skipped: Vec = expelled + .iter() + .chain(unresponsive.iter()) + .copied() + .collect(); + self.active_aggregator_party_id(&skipped) + .and_then(|party_id| self.members.get(party_id as usize)) + .map(|addr| addr.eq_ignore_ascii_case(my_addr)) + .unwrap_or(false) + } + + /// Ordered standby list `[(party_id, address), ...]` of members eligible to + /// take over aggregation, in promotion order, excluding `skipped` members. + /// The first entry is the current active aggregator; subsequent entries are + /// the standbys promoted in turn as predecessors are presumed down. + pub fn aggregator_standbys(&self, skipped: &[u64], limit: usize) -> Vec<(u64, String)> { + (0..self.members.len() as u64) + .filter(|party_id| !skipped.contains(party_id)) + .take(limit) + .filter_map(|party_id| { + self.members + .get(party_id as usize) + .map(|addr| (party_id, addr.clone())) + }) + .collect() + } } #[cfg(test)] @@ -106,4 +155,71 @@ mod tests { assert!(committee.is_active_aggregator("0xaaa", &[0, 1])); assert!(!committee.is_active_aggregator("0xaaa", &[0, 1, 2])); } + + fn committee() -> Committee { + Committee::new(vec![ + "0xbbb".to_string(), + "0xccc".to_string(), + "0xaaa".to_string(), + ]) + } + + #[test] + fn effective_aggregator_promotes_next_standby_when_primary_unresponsive() { + let c = committee(); + assert!(c.effective_aggregator("0xbbb", &[], &[])); + assert!(!c.effective_aggregator("0xccc", &[], &[])); + + // Primary (party 0) presumed unresponsive: party 1 (0xccc) takes over. + assert!(!c.effective_aggregator("0xbbb", &[], &[0])); + assert!(c.effective_aggregator("0xccc", &[], &[0])); + + // Expelled and unresponsive combine: 0 expelled, 1 unresponsive -> party 2. + assert!(c.effective_aggregator("0xaaa", &[0], &[1])); + } + + #[test] + fn effective_aggregator_matches_legacy_when_no_unresponsive() { + let c = committee(); + for (addr, expelled) in [ + ("0xbbb", &[][..]), + ("0xccc", &[0][..]), + ("0xaaa", &[0, 1][..]), + ] { + assert_eq!( + c.effective_aggregator(addr, expelled, &[]), + c.is_active_aggregator(addr, expelled), + ); + } + } + + #[test] + fn active_aggregator_party_id_skips_in_order() { + let c = committee(); + assert_eq!(c.active_aggregator_party_id(&[]), Some(0)); + assert_eq!(c.active_aggregator_party_id(&[0]), Some(1)); + assert_eq!(c.active_aggregator_party_id(&[0, 1]), Some(2)); + assert_eq!(c.active_aggregator_party_id(&[0, 1, 2]), None); + } + + #[test] + fn aggregator_standbys_are_ordered_and_filtered() { + let c = committee(); + assert_eq!( + c.aggregator_standbys(&[], 10), + vec![ + (0, "0xbbb".to_string()), + (1, "0xccc".to_string()), + (2, "0xaaa".to_string()), + ] + ); + assert_eq!( + c.aggregator_standbys(&[0], 10), + vec![(1, "0xccc".to_string()), (2, "0xaaa".to_string())] + ); + assert_eq!( + c.aggregator_standbys(&[], 1), + vec![(0, "0xbbb".to_string())] + ); + } } diff --git a/crates/events/src/enclave_event/accusation_quorum_reached.rs b/crates/events/src/enclave_event/accusation_quorum_reached.rs index 08a6717f8..8d158175e 100644 --- a/crates/events/src/enclave_event/accusation_quorum_reached.rs +++ b/crates/events/src/enclave_event/accusation_quorum_reached.rs @@ -15,14 +15,6 @@ use std::fmt::{self, Display}; pub enum AccusationOutcome { /// >= M nodes agree the proof is bad → slash the accused. AccusedFaulted, - /// **Deprecated.** Previously emitted when `votes_against >= M`. The - /// `AccusationVote` gossip wire no longer carries disagreement - /// signatures (a peer who finds the proof passes simply stays silent), - /// so this outcome is no longer produced by the off-chain quorum - /// protocol. Kept in the enum for serialized-event backwards - /// compatibility — downstream consumers should treat any historic - /// `AccuserLied` event the same as `Inconclusive`. - AccuserLied, /// data_hashes differ between voters → accused sent different data to different nodes. Equivocation, /// Vote timeout expired or not enough votes → proceed with E3 timeout. @@ -33,7 +25,6 @@ impl Display for AccusationOutcome { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { AccusationOutcome::AccusedFaulted => write!(f, "AccusedFaulted"), - AccusationOutcome::AccuserLied => write!(f, "AccuserLied"), AccusationOutcome::Equivocation => write!(f, "Equivocation"), AccusationOutcome::Inconclusive => write!(f, "Inconclusive"), } diff --git a/crates/events/src/sync.rs b/crates/events/src/sync.rs index 55ff42285..c2dd55bad 100644 --- a/crates/events/src/sync.rs +++ b/crates/events/src/sync.rs @@ -14,15 +14,33 @@ type DeployBlock = u64; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct EvmEventConfigChain { deploy_block: DeployBlock, + /// Number of block confirmations required before a log is ingested. `0` + /// preserves reading to the chain head; a positive value gates ingestion to + /// make the append-only log reorg-safe by construction. + confirmations: u64, } impl EvmEventConfigChain { pub fn new(deploy_block: DeployBlock) -> Self { - Self { deploy_block } + Self { + deploy_block, + confirmations: 0, + } + } + + /// Builder: set the required confirmation depth for reorg safety. + pub fn with_confirmations(mut self, confirmations: u64) -> Self { + self.confirmations = confirmations; + self } + pub fn deploy_block(&self) -> u64 { self.deploy_block } + + pub fn confirmations(&self) -> u64 { + self.confirmations + } } /// Configuration value object for starting the evm reader for all chains diff --git a/crates/evm/src/actors/evm_read_interface.rs b/crates/evm/src/actors/evm_read_interface.rs index 87f6e0315..0d0eb4a0f 100644 --- a/crates/evm/src/actors/evm_read_interface.rs +++ b/crates/evm/src/actors/evm_read_interface.rs @@ -50,6 +50,10 @@ pub struct Filters { historical: Filter, current: Filter, start_block: u64, + /// Number of confirmations required before a log is ingested. `0` (default) + /// reads to the chain head exactly as before; a positive value clamps the + /// historical and backfill heads to make ingestion reorg-safe. + confirmations: u64, } impl Filters { @@ -65,9 +69,21 @@ impl Filters { historical, current, start_block, + confirmations: 0, } } + /// Builder: require `confirmations` block confirmations before ingestion. + pub fn with_confirmations(mut self, confirmations: u64) -> Self { + self.confirmations = confirmations; + self + } + + /// The configured confirmation depth (0 = read to head). + pub fn confirmations(&self) -> u64 { + self.confirmations + } + pub fn from_routing_table(table: &HashMap, start_block: u64) -> Self { let addresses: Vec

= table.keys().cloned().collect(); Self::new(addresses, start_block) @@ -282,7 +298,7 @@ async fn stream_from_evm( // ── Phase 1: Historical sync (must succeed, fatal on failure) ── let latest_block = match provider.provider().get_block_number().await { - Ok(bn) => bn, + Ok(bn) => crate::domain::reorg::confirmed_head(bn, filters.confirmations()), Err(e) => { error!(chain_id, error = %e, "Failed to get latest block number"); bus.err(EType::Evm, anyhow!(e)); @@ -335,6 +351,7 @@ async fn stream_from_evm( &next, &mut timestamp_tracker, &mut last_block, + filters.confirmations(), ) .await { diff --git a/crates/evm/src/actors/log_fetcher.rs b/crates/evm/src/actors/log_fetcher.rs index 871cdb66e..38a2c20f2 100644 --- a/crates/evm/src/actors/log_fetcher.rs +++ b/crates/evm/src/actors/log_fetcher.rs @@ -151,11 +151,15 @@ pub(crate) async fn backfill_to_head( next: &EvmEventProcessor, timestamp_tracker: &mut TimestampTracker, last_block: &mut u64, + confirmations: u64, ) -> Result<(), anyhow::Error> { - let current_head = provider + let raw_head = provider .fetch_block_number() .await .map_err(|e| anyhow!("Failed to get block number for gap backfill: {}", e))?; + // Clamp to the confirmed head so we never ingest logs that a reorg of depth + // `confirmations` could still orphan. `confirmations == 0` is a no-op. + let current_head = crate::domain::reorg::confirmed_head(raw_head, confirmations); let gap_start = *last_block + 1; if gap_start > current_head { @@ -424,7 +428,7 @@ mod tests { let filter = Filter::new(); let mut last_block = 100u64; - let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block, 0).await; assert!(result.is_ok()); assert_eq!(last_block, 100); @@ -440,7 +444,7 @@ mod tests { let filter = Filter::new(); let mut last_block = 100u64; - let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block, 0).await; assert!(result.is_ok()); assert_eq!(last_block, 200); @@ -478,7 +482,7 @@ mod tests { let filter = Filter::new(); let mut last_block = 100u64; - let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block, 0).await; // Should fail because chunk 3 exhausted retries assert!(result.is_err()); @@ -488,8 +492,25 @@ mod tests { // On retry: gap_start = 20101, head still 25000 → single chunk succeeds mock.push_logs(vec![make_test_log(22000)]); - let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block).await; + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block, 0).await; assert!(result.is_ok()); assert_eq!(last_block, 25000); } + + #[actix::test] + async fn test_backfill_clamps_to_confirmed_head() { + // Head at 200, but require 12 confirmations => only ingest up to 188. + let mock = MockLogProvider::new(200); + mock.push_logs(vec![make_test_log(150)]); + let (next, _rx) = setup_collector(); + let mut ts = TimestampTracker::new(); + let filter = Filter::new(); + let mut last_block = 100u64; + + let result = backfill_to_head(&mock, &filter, 1, &next, &mut ts, &mut last_block, 12).await; + + assert!(result.is_ok()); + // Advanced only to the confirmed head, not the raw head of 200. + assert_eq!(last_block, 188); + } } diff --git a/crates/evm/src/domain/mod.rs b/crates/evm/src/domain/mod.rs index 006b1d97f..aaad362f6 100644 --- a/crates/evm/src/domain/mod.rs +++ b/crates/evm/src/domain/mod.rs @@ -22,6 +22,7 @@ pub(crate) mod enclave_events; pub(crate) mod historical_order_fixer; pub(crate) mod log_timestamp; pub(crate) mod plaintext_publication; +pub(crate) mod reorg; pub(crate) mod slash_submission; pub(crate) mod slashing_events; diff --git a/crates/evm/src/domain/reorg.rs b/crates/evm/src/domain/reorg.rs new file mode 100644 index 000000000..87e3cef28 --- /dev/null +++ b/crates/evm/src/domain/reorg.rs @@ -0,0 +1,39 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +//! Pure reorg-safety primitive for EVM ingestion. +//! +//! The local event log is append-only and has no truncation primitive, so once +//! a chain log is promoted to an `EnclaveEvent` and folded into state it cannot +//! be cleanly un-applied. The only sound defence against a chain reorg is +//! therefore *prevention*: do not ingest a log until it is buried under enough +//! confirmations that a reorg of that depth is infeasible. +//! +//! [`confirmed_head`] is that gate: it clamps how far the reader may advance. It +//! is clock-free and provider-free so it is fully unit-tested; the actors own the +//! wall clock and provider I/O and feed values in. + +/// The highest block height that is safe to ingest given the current chain head +/// and the required confirmation depth. Returns `chain_head` when +/// `confirmations == 0` (no gating), and saturates at 0 for shallow chains. +/// +/// A log at height `h` is safe once `h <= chain_head - confirmations`, i.e. once +/// at least `confirmations` blocks have been built on top of it. +pub fn confirmed_head(chain_head: u64, confirmations: u64) -> u64 { + chain_head.saturating_sub(confirmations) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn confirmed_head_clamps_and_saturates() { + assert_eq!(confirmed_head(100, 0), 100); + assert_eq!(confirmed_head(100, 12), 88); + assert_eq!(confirmed_head(5, 12), 0); + } +} diff --git a/crates/keyshare/src/actors/threshold_keyshare.rs b/crates/keyshare/src/actors/threshold_keyshare.rs index a346548d9..d39adcf63 100644 --- a/crates/keyshare/src/actors/threshold_keyshare.rs +++ b/crates/keyshare/src/actors/threshold_keyshare.rs @@ -1519,9 +1519,17 @@ impl ThresholdKeyshare { party_id, signed_pk_generation_proof: signed_pk_generation_proof.clone(), }, - ec, + ec.clone(), )?; + // Record that publishing was authorized and has occurred, so resume-after-crash + // may safely re-publish (idempotent at the aggregator) without ever emitting a + // keyshare for a state that had not yet passed C4 honest-set filtering. + self.state.try_mutate(&ec, |mut s| { + s.keyshare_published = true; + Ok(s) + })?; + Ok(()) } @@ -1573,6 +1581,83 @@ impl ThresholdKeyshare { Ok(()) } + /// (Re)issue the `CalculateDecryptionShare` compute request from the current + /// `Decrypting` state. Factored out of `handle_ciphertext_output_published` so the + /// boot-time resume path can re-drive the decryption-share computation idempotently + /// (the resulting `DecryptionshareCreated` is deduped by `party_id` at the aggregator). + fn issue_decryption_share_request(&self, ec: EventContext) -> Result<()> { + let state = self.state.try_get()?; + let e3_id = state.get_e3_id(); + let decrypting: Decrypting = state.clone().try_into()?; + let trbfv_config = state.get_trbfv_config(); + let event = ComputeRequest::trbfv( + TrBFVRequest::CalculateDecryptionShare(CalculateDecryptionShareRequest { + name: format!("party_id({})", state.party_id), + ciphertexts: decrypting.ciphertext_output, + sk_poly_sum: decrypting.sk_poly_sum, + es_poly_sum: decrypting.es_poly_sum, + trbfv_config, + }), + CorrelationId::new(), + e3_id.clone(), + ); + self.bus.publish(event, ec)?; + Ok(()) + } + + /// Re-drive this node's own in-flight DKG/decryption work after a crash or restart. + /// + /// Invoked when `EffectsEnabled` is broadcast at the end of boot sync, *after* this + /// actor has been hydrated from its persisted state. The dangerous, value-bearing + /// concern with re-driving is double-emission; this is safe here because every output + /// re-emitted below is deduplicated downstream by `party_id`: + /// * `KeyshareCreated` — `PublicKeyAggregation::add_keyshare` ignores a `party_id` + /// that already submitted (idempotent), and + /// * `DecryptionshareCreated` — threshold plaintext aggregation keys shares by + /// `party_id` (re-insert overwrites with the identical deterministic share). + /// + /// Only states where the local result is already determined are re-driven. Earlier + /// phases depend on peer gossip that cannot be reconstructed locally and are surfaced + /// (non-destructively) by `enclave node validate` instead of being force-re-driven. + fn resume_in_flight_work(&mut self, ec: EventContext) -> Result<()> { + let Some(state) = self.state.get() else { + return Ok(()); + }; + match &state.state { + // We have produced our public-key share but may have crashed before (or while) + // publishing KeyshareCreated. Re-publishing is idempotent at the aggregator, but + // ReadyForDecryption is entered *before* C4 honest-set verification authorizes the + // publish, so only re-drive when a prior authorized publish was recorded. An + // un-published ReadyForDecryption is a loose end surfaced by `enclave node validate`. + KeyshareState::ReadyForDecryption(_) if state.keyshare_published => { + info!( + e3_id = %state.e3_id, + "Resuming in-flight work: re-publishing KeyshareCreated" + ); + self.publish_keyshare_created(ec)?; + } + // The ciphertext to decrypt has arrived. Re-publish our keyshare (in case the + // crash happened before it propagated) and re-issue the decryption-share + // computation so a DecryptionshareCreated is (re)produced. + KeyshareState::Decrypting(_) => { + info!( + e3_id = %state.e3_id, + "Resuming in-flight work: re-publishing KeyshareCreated and re-issuing decryption-share request" + ); + self.publish_keyshare_created(ec.clone())?; + self.issue_decryption_share_request(ec)?; + } + other => { + trace!( + e3_id = %state.e3_id, + state = %other.variant_name(), + "No locally re-drivable work on resume; loose ends are surfaced by `enclave node validate`" + ); + } + } + Ok(()) + } + /// CalculateDecryptionShareResponse — publish ShareDecryptionProofPending /// so ProofRequestActor generates and signs C6 proofs. pub fn handle_calculate_decryption_share_response( @@ -1799,6 +1884,13 @@ impl Handler for ThresholdKeyshare { EnclaveEventData::CommitteeMemberExpelled(data) => { self.handle_committee_member_expelled(data, ec); } + EnclaveEventData::EffectsEnabled(_) => { + // Broadcast once at the end of boot sync. Re-drive any of this node's own + // in-flight work that a crash may have interrupted (idempotent downstream). + if let Err(err) = self.resume_in_flight_work(ec) { + warn!("resume_in_flight_work failed: {err}"); + } + } _ => (), } } diff --git a/crates/keyshare/src/domain/keyshare_state.rs b/crates/keyshare/src/domain/keyshare_state.rs index d28650da6..8c1119703 100644 --- a/crates/keyshare/src/domain/keyshare_state.rs +++ b/crates/keyshare/src/domain/keyshare_state.rs @@ -196,6 +196,12 @@ pub struct ThresholdKeyshareState { pub honest_parties: Option>, pub dkg_started_at_unix_secs: Option, pub proof_aggregation_enabled: bool, + /// Set once `KeyshareCreated` has actually been published from an authorized + /// path (after C4 honest-set verification, the no-C4-proofs path, or the + /// sole-honest fast path). `ReadyForDecryption` is entered *before* that + /// authorization, so resume-after-crash must only re-publish when this is set; + /// otherwise it could emit a keyshare that never passed C4 filtering. + pub keyshare_published: bool, } impl ThresholdKeyshareState { @@ -223,6 +229,7 @@ impl ThresholdKeyshareState { honest_parties: None, dkg_started_at_unix_secs: Some(now_unix_secs()), proof_aggregation_enabled, + keyshare_published: false, } } diff --git a/crates/request/src/domain/routing.rs b/crates/request/src/domain/routing.rs index 1ac66f2b8..89bb7bd9a 100644 --- a/crates/request/src/domain/routing.rs +++ b/crates/request/src/domain/routing.rs @@ -46,8 +46,16 @@ pub struct RequestRouter; impl RequestRouter { /// Decide how an incoming event should be routed given the set of completed requests. pub fn route(msg: &EnclaveEvent, completed: &HashSet) -> RoutingDecision { - // If we are shutting down then broadcast to every context and bail on anything else. - if let EnclaveEventData::Shutdown(_) = msg.get_data() { + // Broadcast non-E3-scoped lifecycle signals to every active context: + // * `Shutdown` so children can tear themselves down, and + // * `EffectsEnabled` so a hydrated request can re-drive its own in-flight work + // once side effects are switched on at the end of boot sync. + // Both carry no `e3_id`, so without this they would be `Ignore`d and never reach the + // per-E3 child actors. + if matches!( + msg.get_data(), + EnclaveEventData::Shutdown(_) | EnclaveEventData::EffectsEnabled(_) + ) { return RoutingDecision::Broadcast; } @@ -118,6 +126,17 @@ mod tests { ); } + #[test] + fn effects_enabled_broadcasts() { + // EffectsEnabled has no e3_id but must reach every hydrated context so each can + // re-drive its own in-flight work after a restart. + let msg = from_data(e3_events::EffectsEnabled::new()); + assert_eq!( + RequestRouter::route(&msg, &HashSet::new()), + RoutingDecision::Broadcast + ); + } + #[test] fn event_without_e3_id_is_ignored() { let msg = EnclaveEvent::::test_event("no-id") diff --git a/crates/sortition/src/actors/ciphernode_selector.rs b/crates/sortition/src/actors/ciphernode_selector.rs index a0f4839ff..6aae1f1e8 100644 --- a/crates/sortition/src/actors/ciphernode_selector.rs +++ b/crates/sortition/src/actors/ciphernode_selector.rs @@ -43,6 +43,11 @@ pub struct CiphernodeSelectorState { pub e3_cache: HashMap, pub committees: HashMap, pub expelled: HashMap>, + /// Party ids the local node presumes unresponsive for failover purposes. + /// Treated identically to `expelled` when computing the active aggregator, + /// but populated by local liveness signals rather than on-chain expulsion. + /// Empty by default, so behaviour is unchanged unless a standby is promoted. + pub unresponsive: HashMap>, pub is_aggregator: HashMap, } @@ -114,7 +119,8 @@ impl CiphernodeSelector { .cloned() .ok_or_else(|| anyhow::anyhow!("Missing finalized committee for {}", e3_id))?; let expelled = state.expelled.get(e3_id).cloned().unwrap_or_default(); - let is_aggregator = committee.is_active_aggregator(&self.address, &expelled); + let unresponsive = state.unresponsive.get(e3_id).cloned().unwrap_or_default(); + let is_aggregator = committee.effective_aggregator(&self.address, &expelled, &unresponsive); let previous = state.is_aggregator.get(e3_id).copied(); self.state.try_mutate(ec, |mut selector_state| { @@ -246,6 +252,7 @@ impl Handler> for CiphernodeSelector { state.e3_cache.remove(&msg.e3_id); state.committees.remove(&msg.e3_id); state.expelled.remove(&msg.e3_id); + state.unresponsive.remove(&msg.e3_id); state.is_aggregator.remove(&msg.e3_id); Ok(state) }) diff --git a/crates/sortition/src/domain/node_registry.rs b/crates/sortition/src/domain/node_registry.rs index 3c81bfc2e..e4dc40b9b 100644 --- a/crates/sortition/src/domain/node_registry.rs +++ b/crates/sortition/src/domain/node_registry.rs @@ -241,6 +241,43 @@ impl NodeRegistry { "E3 completed/failed - decremented active jobs for committee" ); } + + /// Enumerate every committee that still holds active jobs (i.e. has not been + /// released by a completion/failure event). + /// + /// These are the node's "open loops": E3s it is still accounted as busy on. + /// On a clean shutdown/restart this set should only contain genuinely + /// in-flight E3s. If it contains an E3 that has already reached a terminal + /// stage on-chain, the corresponding active-job slot is orphaned and should + /// be released (see `enclave node validate`). The returned `committee_key` + /// matches [`committee_key`] so callers can correlate it with an `E3id`. + pub fn open_committees(store: &HashMap) -> Vec { + let mut out = Vec::new(); + for (chain_id, chain_state) in store { + for (key, members) in &chain_state.e3_committees { + out.push(OpenCommittee { + chain_id: *chain_id, + committee_key: key.clone(), + members: members.clone(), + }); + } + } + out + } +} + +/// A committee that still holds active-job slots in a [`NodeStateStore`]. +/// +/// Produced by [`NodeRegistry::open_committees`]. `committee_key` is the same +/// `"{chain_id}:{e3_id}"` string used internally (see [`committee_key`]). +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct OpenCommittee { + /// Chain the committee belongs to. + pub chain_id: u64, + /// Canonical committee key (`"{chain_id}:{e3_id}"`). + pub committee_key: String, + /// Addresses of the committee members holding the open slot. + pub members: Vec, } #[cfg(test)] @@ -333,4 +370,31 @@ mod tests { assert_eq!(with_tickets[0].0, "active"); assert_eq!(with_tickets[0].1, 3); } + + #[test] + fn open_committees_lists_only_unreleased() { + let mut store = HashMap::new(); + let a = e3(1, "1"); + let b = e3(1, "2"); + NodeRegistry::record_committee_published(&mut store, &a, &["0xabc".into()]); + NodeRegistry::record_committee_published(&mut store, &b, &["0xabc".into(), "0xdef".into()]); + + let open = NodeRegistry::open_committees(&store); + assert_eq!(open.len(), 2); + + // Releasing one removes it from the open set. + NodeRegistry::release_committee_jobs(&mut store, &a, "test"); + let open = NodeRegistry::open_committees(&store); + assert_eq!(open.len(), 1); + assert_eq!(open[0].committee_key, committee_key(&b)); + assert_eq!(open[0].chain_id, 1); + assert_eq!( + open[0].members, + vec!["0xabc".to_string(), "0xdef".to_string()] + ); + + // Fully drained -> empty. + NodeRegistry::release_committee_jobs(&mut store, &b, "test"); + assert!(NodeRegistry::open_committees(&store).is_empty()); + } } diff --git a/crates/sync/src/actors/sync.rs b/crates/sync/src/actors/sync.rs index 779b2329b..4e646aa8e 100644 --- a/crates/sync/src/actors/sync.rs +++ b/crates/sync/src/actors/sync.rs @@ -86,8 +86,31 @@ pub async fn sync( } info!("Events replayed."); - // TODO: Detect open loops - incase we crashed in the middle of a request we need to play the - // request event again once effects are on + // Loose ends after a crash: + // + // Terminal E3 work that *completed while this node was down* is recovered by the + // historical EVM re-fetch in step 5 below: the terminal on-chain events + // (PlaintextOutputPublished / E3Failed / committee completion) are re-delivered once + // effects are enabled, which re-drives the Sortition release path and frees any tickets + // the node was still holding. So "an E3 finished while we were offline" needs no special + // handling here — it is reconciled by replaying the canonical chain state. + // + // What is intentionally NOT auto-re-driven *here in sync* is this node's *own* in-flight + // request work by replaying the originating request events. Blindly re-publishing the + // originating request event is a no-op: the event bus dedups by EventId (payload hash), so + // the replayed event is dropped. Forcibly minting a fresh EventId to force re-execution is + // unsafe on a value-bearing protocol (it can double-emit or race the canonical chain state) + // and is therefore deliberately left out of the sync path. + // + // Note: this is *not* a global absence of restart recovery. Actors that hold determined, + // idempotent in-flight results re-drive themselves when `EffectsEnabled` is broadcast at the + // end of this sync (e.g. `ThresholdKeyshare::resume_in_flight_work` re-publishes a computed + // keyshare / decryption share). What sync deliberately avoids is replaying *request* events. + // + // Detection of loose ends that cannot be locally re-driven is exposed offline and + // non-destructively via `enclave node validate`, which cross-checks the persisted committee + // slots against terminal events in the log and reports orphaned tickets. See + // `crates/entrypoint/src/validate.rs`. // 5. Load the historical evm events to memory from all chains info!("Loading historical blockchain events..."); diff --git a/crates/tests/tests/integration.rs b/crates/tests/tests/integration.rs index 98a84885d..5d6c3e175 100644 --- a/crates/tests/tests/integration.rs +++ b/crates/tests/tests/integration.rs @@ -1290,6 +1290,7 @@ async fn test_trbfv_actor() -> Result<()> { .map(|a| e3_config::Contract::AddressOnly(a.to_string())), }, finalization_ms: None, + reorg_confirmations: None, chain_id: Some(1), }; diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index d09821b42..cebe48427 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -11,7 +11,6 @@ actix.workspace = true alloy.workspace = true anyhow.workspace = true derivative.workspace = true -e3-committee-hash.workspace = true rand.workspace = true hex.workspace = true regex.workspace = true diff --git a/crates/utils/src/committee_hash.rs b/crates/utils/src/committee_hash.rs deleted file mode 100644 index 7dbdde951..000000000 --- a/crates/utils/src/committee_hash.rs +++ /dev/null @@ -1,8 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -//! Re-export from `e3-committee-hash` for backward compatibility. -pub use e3_committee_hash::*; diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index b00bbbe2c..d76f7b6b5 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -7,7 +7,6 @@ extern crate self as e3_utils; // need this for e3_utils_derive to reference this crate pub mod actix; pub mod alloy; -pub mod committee_hash; pub mod constants; pub mod error; pub mod formatters; diff --git a/crates/zk-helpers/src/circuits/commitments.rs b/crates/zk-helpers/src/circuits/commitments.rs index 41264d195..9383aec0a 100644 --- a/crates/zk-helpers/src/circuits/commitments.rs +++ b/crates/zk-helpers/src/circuits/commitments.rs @@ -692,8 +692,6 @@ mod tests { use fhe::bfv::SecretKey; use fhe::mbfv::PublicKeyShare; use fhe_traits::Serialize; - use rand::Rng; - let preset = BfvPreset::InsecureThreshold512; let (params, _) = build_pair_for_preset(preset).unwrap(); let crp = create_deterministic_crp_from_default_seed(¶ms); diff --git a/crates/zk-helpers/src/circuits/threshold/pk_generation/codegen.rs b/crates/zk-helpers/src/circuits/threshold/pk_generation/codegen.rs index ce6866226..5408c6ec0 100644 --- a/crates/zk-helpers/src/circuits/threshold/pk_generation/codegen.rs +++ b/crates/zk-helpers/src/circuits/threshold/pk_generation/codegen.rs @@ -148,8 +148,6 @@ mod tests { use crate::codegen::write_artifacts; use crate::threshold::pk_generation::computation::{Bits, Bounds}; use crate::threshold::pk_generation::PkGenerationCircuitData; - use crate::CiphernodesCommitteeSize; - use e3_fhe_params::BfvPreset; use tempfile::TempDir;