Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions agent/flow-trace/05_FAILURE_REFUND_SLASHING.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,6 @@ check_quorum(accusation_id):
│ ├─ Multiple data_hashes across ALL votes?
│ │ └─ YES → AccusationOutcome::Equivocation (SLASHABLE)
│ │
│ ├─ Only accuser says bad, others disagree?
│ │ └─ AccusationOutcome::AccuserLied (NOT slashable)
│ │
│ └─ Otherwise → AccusationOutcome::Inconclusive (NOT slashable)
└─ CASE C: Still waiting for more votes
Expand Down
205 changes: 205 additions & 0 deletions crates/aggregator/src/domain/failover.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

//! Aggregator liveness / failover policy.
//!
//! The active aggregator for an E3 is chosen deterministically as the lowest
//! non-skipped party in the (score-ordered) committee
//! ([`e3_events::Committee::active_aggregator_party_id`]). If that node crashes
//! after the committee is finalised but before it publishes the aggregated
//! result on-chain, the round stalls indefinitely: no on-chain expulsion is
//! triggered (the node committed no fault), so nothing demotes it.
//!
//! This module is the pure, testable brain of an automatic failover: it tracks
//! how long the *expected on-chain progress* from the active aggregator has been
//! absent and, once a wall-clock budget elapses, decides to mark the current
//! aggregator as locally unresponsive and promote the next standby. Because the
//! committee order and the skip set are derived from signals every node shares,
//! all honest nodes converge on the same replacement without a leader-election
//! round. A brief overlap (old + new aggregator both publishing) is bounded by
//! the timeout and made harmless by the on-chain publish being single-shot.
//!
//! All decisions here are deterministic given their inputs (no clock access),
//! so the policy is exercised entirely by unit tests; the driving actor owns the
//! wall clock and feeds `elapsed` in.

use std::time::Duration;

/// The progress an aggregator round can be waiting on. Used to scope which
/// absence of progress should arm the failover timer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregatorPhase {
/// Waiting for the public key to be published on-chain (DKG output).
AwaitingPublicKey,
/// Waiting for the plaintext output to be published on-chain (decryption).
AwaitingPlaintext,
/// The round is finished; no aggregator action is pending.
Settled,
}

/// Policy parameters for failover. A single wall-clock budget governs how long
/// the active aggregator may be silent before the next standby is promoted.
#[derive(Debug, Clone, Copy)]
pub struct FailoverPolicy {
/// How long the expected on-chain progress may be absent before the active
/// aggregator is presumed down and the next standby promoted.
timeout: Duration,
}

impl FailoverPolicy {
pub fn new(timeout: Duration) -> Self {
Self { timeout }
}

pub fn timeout(&self) -> Duration {
self.timeout
}
}

/// The outcome of a liveness evaluation for one E3.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailoverDecision {
/// The round is settled or making progress within budget; do nothing.
Hold,
/// The active aggregator (`demote`, a party_id) has been silent past the
/// budget. Add it to the locally-presumed-unresponsive set and promote the
/// next standby (`promote_to`, with its address).
Promote {
demote: u64,
promote_to: u64,
new_addr: String,
},
/// Every standby has been exhausted; the round cannot make progress and must
/// be failed by the caller.
Exhausted { demote: u64 },
}

/// Decide whether to fail over for a single E3.
///
/// Inputs:
/// - `phase`: what on-chain progress is pending (Settled => always `Hold`).
/// - `elapsed`: time since the active aggregator was (re)assigned or last made
/// observable progress.
/// - `active`: the current active aggregator party_id.
/// - `standbys`: ordered `[(party_id, addr), ...]` of non-skipped members in
/// promotion order, as produced by
/// [`e3_events::Committee::aggregator_standbys`]. `active` is expected to be
/// `standbys[0].0` when present.
///
/// Deterministic and clock-free.
pub fn decide_failover(
policy: &FailoverPolicy,
phase: AggregatorPhase,
elapsed: Duration,
active: u64,
standbys: &[(u64, String)],
) -> FailoverDecision {
if phase == AggregatorPhase::Settled {
return FailoverDecision::Hold;
}
if elapsed < policy.timeout {
return FailoverDecision::Hold;
}
// Promote the first standby strictly after the active aggregator in order.
match standbys.iter().find(|(party_id, _)| *party_id > active) {
Some((next_party, next_addr)) => FailoverDecision::Promote {
demote: active,
promote_to: *next_party,
new_addr: next_addr.clone(),
},
None => FailoverDecision::Exhausted { demote: active },
}
}

#[cfg(test)]
mod tests {
use super::*;

fn policy() -> FailoverPolicy {
FailoverPolicy::new(Duration::from_secs(60))
}

fn standbys() -> Vec<(u64, String)> {
vec![(0, "0xa".into()), (1, "0xb".into()), (2, "0xc".into())]
}

#[test]
fn settled_always_holds() {
let d = decide_failover(
&policy(),
AggregatorPhase::Settled,
Duration::from_secs(10_000),
0,
&standbys(),
);
assert_eq!(d, FailoverDecision::Hold);
}

#[test]
fn within_budget_holds() {
let d = decide_failover(
&policy(),
AggregatorPhase::AwaitingPublicKey,
Duration::from_secs(59),
0,
&standbys(),
);
assert_eq!(d, FailoverDecision::Hold);
}

#[test]
fn past_budget_promotes_next_standby() {
let d = decide_failover(
&policy(),
AggregatorPhase::AwaitingPlaintext,
Duration::from_secs(61),
0,
&standbys(),
);
assert_eq!(
d,
FailoverDecision::Promote {
demote: 0,
promote_to: 1,
new_addr: "0xb".into()
}
);
}

#[test]
fn promotes_past_already_skipped_members() {
// active is party 1 (party 0 already skipped); next is party 2.
let remaining = vec![(1, "0xb".to_string()), (2, "0xc".to_string())];
let d = decide_failover(
&policy(),
AggregatorPhase::AwaitingPlaintext,
Duration::from_secs(120),
1,
&remaining,
);
assert_eq!(
d,
FailoverDecision::Promote {
demote: 1,
promote_to: 2,
new_addr: "0xc".into()
}
);
}

#[test]
fn exhausted_when_no_standby_remains() {
let only_last = vec![(2, "0xc".to_string())];
let d = decide_failover(
&policy(),
AggregatorPhase::AwaitingPlaintext,
Duration::from_secs(120),
2,
&only_last,
);
assert_eq!(d, FailoverDecision::Exhausted { demote: 2 });
}
}
1 change: 1 addition & 0 deletions crates/aggregator/src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@

pub mod committee;
pub mod committee_hash;
pub mod failover;
pub mod publickey_aggregation;
pub mod threshold_plaintext_aggregation;
9 changes: 6 additions & 3 deletions crates/ciphernode-builder/src/evm_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ impl<P: Provider + Clone + 'static> EvmSystemChainBuilder<P> {
// The event is defined here
move |msg| {
// Extract config
let deploy_block = msg.get_evm_config(chain_id)?.deploy_block();
let chain_config = msg.get_evm_config(chain_id)?;
let deploy_block = chain_config.deploy_block();
let confirmations = chain_config.confirmations();

// Pass next to the router
let router = configure_router(next, route_factories);

// Extract filters from the router
let filters = filters_from_router(&router, deploy_block);
let filters = filters_from_router(&router, deploy_block, confirmations);

// Setup and start the read interface and the router
EvmReadInterface::setup_with_factory(
Expand Down Expand Up @@ -117,6 +119,7 @@ fn configure_router(
router
}

fn filters_from_router(router: &EvmRouter, deploy_block: u64) -> Filters {
fn filters_from_router(router: &EvmRouter, deploy_block: u64, confirmations: u64) -> Filters {
Filters::from_routing_table(router.get_routing_table(), deploy_block)
.with_confirmations(confirmations)
}
8 changes: 8 additions & 0 deletions crates/cli/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::config::{self, ConfigCommands};
use crate::events::{self, EventsCommands};
use crate::helpers::telemetry::{setup_simple_tracing, setup_tracing};
use crate::net::{self, NetCommands};
use crate::node::{self, NodeCommands as NodeStateCommands};
use crate::nodes::{self, NodeCommands};
use crate::noir::NoirCommands;
use crate::password::PasswordCommands;
Expand Down Expand Up @@ -184,6 +185,7 @@ impl Cli {
Commands::Noir { command } => noir::execute(out, command, &config).await?,
Commands::Net { command } => net::execute(&out, command, &config).await?,
Commands::Events { command } => events::execute(out, command, &config).await?,
Commands::Node { command } => node::execute(out, command, &config).await?,
Commands::Rev => rev::execute(out).await?,
Commands::Config { command } => config::execute(out, command, &config).await?,
}
Expand Down Expand Up @@ -296,6 +298,12 @@ pub enum Commands {
command: NodeCommands,
},

/// Single-node maintenance commands (validate on-disk state, etc.)
Node {
#[command(subcommand)]
command: NodeStateCommands,
},

/// Manage net configuration
Net {
#[command(subcommand)]
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/src/config_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ chains:
#[cfg(test)]
mod tests {
use super::{execute, validate_eth_address, BOOTSTRAP_PEER};
use alloy::primitives::Address;
use anyhow::Result;
use std::fs;

Expand All @@ -125,7 +126,7 @@ mod tests {
));
let _ = fs::remove_dir_all(&dir);

let config = execute("ws://localhost:8545", &dir)?;
let config = execute("ws://localhost:8545", &Address::ZERO, &dir)?;

// The loader must actually read the prepopulated peer from `node.peers`.
assert_eq!(config.peers(), vec![BOOTSTRAP_PEER.to_string()]);
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod helpers;
mod init;
mod net;
mod net_get_peer_id;
mod node;
mod nodes;
mod nodes_daemon;
mod nodes_down;
Expand Down
41 changes: 41 additions & 0 deletions crates/cli/src/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use anyhow::{bail, Result};
use clap::Subcommand;
use e3_config::AppConfig;
use e3_console::{log, Console};
use e3_entrypoint::validate::validate_node;

#[derive(Subcommand, Clone, Debug)]
pub enum NodeCommands {
/// Validate the on-disk state of a single node without starting it.
///
/// Opens the node's persisted stores read-only and checks that the schema
/// is loadable by this binary, the event log is intact, the snapshot cursor
/// is consistent, and there are no orphaned committee tickets ("loose
/// ends"). Safe to run while the node is stopped; intended as the
/// pre-upgrade and post-crash health check. Exits non-zero on failure.
Validate,
}

pub async fn execute(out: Console, command: NodeCommands, config: &AppConfig) -> Result<()> {
match command {
NodeCommands::Validate => {
// Offline-only contract: hold the same cross-host fence `start` uses so the
// validator cannot read state out from under a live node or race a concurrent
// `enclave start`. Released when this scope ends.
let _fence =
e3_entrypoint::fence::ProcessFence::acquire(&config.db_file(), &config.name())?;
let report = validate_node(config).await?;
log!(out, "{}", report.render());
if report.has_failure() {
bail!("node validation failed");
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
}
Ok(())
}
7 changes: 7 additions & 0 deletions crates/cli/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ pub async fn execute(mut config: AppConfig, peers: Vec<String>) -> Result<()> {
tokio::pin!(shutdown);

owo();

// Cross-host fence: ensure only one instance runs against this data directory.
// Acquired *before* binding the control port or spawning background work so a second
// instance fails fast instead of racing on the shared data directory.
// Held for the lifetime of this function (the running process); released on exit.
let _fence = e3_entrypoint::fence::ProcessFence::acquire(&config.db_file(), &config.name())?;

Comment thread
coderabbitai[bot] marked this conversation as resolved.
launch_socket_server(config.ctrl_port());

if let Some(dashboard_port) = config.dashboard_port() {
Expand Down
Loading
Loading