diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 32fb8710d0..e9807e3141 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -473,7 +473,7 @@ jobs: strategy: matrix: # TODO removed base test for now - test-suite: [persist] + test-suite: [base, persist] fail-fast: false steps: - name: 'Check out the repo' diff --git a/Cargo.lock b/Cargo.lock index 026e46afe4..2f4746ac98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3126,12 +3126,15 @@ dependencies = [ "compile-time", "dialoguer", "dirs 5.0.1", + "e3-ciphernode-builder", "e3-config", + "e3-console", "e3-crypto", "e3-entrypoint", "e3-events", "e3-evm", "e3-init", + "e3-socket-server", "e3-support-scripts", "e3-utils", "e3-zk-prover", @@ -3143,6 +3146,7 @@ dependencies = [ "petname", "rand 0.8.5", "serde", + "serde_json", "tokio", "toml 0.8.23", "tracing", @@ -3192,6 +3196,13 @@ dependencies = [ "url", ] +[[package]] +name = "e3-console" +version = "0.1.15" +dependencies = [ + "tokio", +] + [[package]] name = "e3-crypto" version = "0.1.15" @@ -3253,6 +3264,7 @@ dependencies = [ "e3-logger", "e3-net", "e3-request", + "e3-socket-server", "e3-sortition", "e3-test-helpers", "e3-zk-prover", @@ -3635,6 +3647,19 @@ dependencies = [ "e3-indexer", ] +[[package]] +name = "e3-socket-server" +version = "0.1.15" +dependencies = [ + "anyhow", + "e3-config", + "e3-console", + "serde", + "serde_json", + "tokio", + "tracing", +] + [[package]] name = "e3-sortition" version = "0.1.15" diff --git a/Cargo.toml b/Cargo.toml index d72b7f4da4..f32c173941 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/cli", "crates/compute-provider", "crates/config", + "crates/console", "crates/crypto", "crates/data", "crates/enclaveup", @@ -23,11 +24,13 @@ members = [ "crates/logger", "crates/multithread", "crates/net", - "crates/zk-prover", + "crates/parity-matrix", + "crates/polynomial", "crates/program-server", "crates/request", "crates/safe", "crates/sdk", + "crates/socket-server", "crates/sortition", "crates/support-scripts", "crates/sync", @@ -36,9 +39,8 @@ members = [ "crates/trbfv", "crates/utils", "crates/wasm", - "crates/parity-matrix", - "crates/polynomial", "crates/zk-helpers", + "crates/zk-prover", ] exclude = [ "examples/CRISP", @@ -75,6 +77,7 @@ repository = "https://github.com/gnosisguild/enclave" e3-aggregator = { version = "0.1.15", path = "./crates/aggregator" } e3-bfv-client = { version = "0.1.15", path = "./crates/bfv-client" } e3-config = { version = "0.1.15", path = "./crates/config" } +e3-console = { version = "0.1.15", path = "./crates/console" } e3-ciphernode-builder = { version = "0.1.15", path = "./crates/ciphernode-builder" } e3-crypto = { version = "0.1.15", path = "./crates/crypto" } e3-data = { version = "0.1.15", path = "./crates/data" } @@ -96,6 +99,7 @@ e3-logger = { version = "0.1.15", path = "./crates/logger" } e3-net = { version = "0.1.15", path = "./crates/net" } e3-compute-provider = { version = "0.1.15", path = "./crates/compute-provider" } e3-sortition = { version = "0.1.15", path = "./crates/sortition" } +e3-socket-server = { version = "0.1.15", path = "./crates/socket-server" } e3-program-server = { version = "0.1.15", path = "./crates/program-server" } e3-polynomial = { version = "0.1.15", path = "./crates/polynomial" } e3-support-scripts = { version = "0.1.15", path = "./crates/support-scripts" } diff --git a/crates/Dockerfile b/crates/Dockerfile index 467a8082d7..54f81f8187 100644 --- a/crates/Dockerfile +++ b/crates/Dockerfile @@ -48,6 +48,7 @@ COPY crates/cli/Cargo.toml ./cli/Cargo.toml COPY crates/ciphernode-builder/Cargo.toml ./ciphernode-builder/Cargo.toml COPY crates/compute-provider/Cargo.toml ./compute-provider/Cargo.toml COPY crates/config/Cargo.toml ./config/Cargo.toml +COPY crates/console/Cargo.toml ./console/Cargo.toml COPY crates/crypto/Cargo.toml ./crypto/Cargo.toml COPY crates/data/Cargo.toml ./data/Cargo.toml COPY crates/enclaveup/Cargo.toml ./enclaveup/Cargo.toml @@ -74,6 +75,7 @@ COPY crates/request/Cargo.toml ./request/Cargo.toml COPY crates/safe/Cargo.toml ./safe/Cargo.toml COPY crates/sdk/Cargo.toml ./sdk/Cargo.toml COPY crates/sortition/Cargo.toml ./sortition/Cargo.toml +COPY crates/socket-server/Cargo.toml ./socket-server/Cargo.toml COPY crates/support-scripts/Cargo.toml ./support-scripts/Cargo.toml COPY crates/sync/Cargo.toml ./sync/Cargo.toml COPY crates/test-helpers/Cargo.toml ./test-helpers/Cargo.toml diff --git a/crates/ciphernode-builder/src/ciphernode_builder.rs b/crates/ciphernode-builder/src/ciphernode_builder.rs index 2fb75df292..210f827e71 100644 --- a/crates/ciphernode-builder/src/ciphernode_builder.rs +++ b/crates/ciphernode-builder/src/ciphernode_builder.rs @@ -81,6 +81,8 @@ pub struct CiphernodeBuilder { zk_backend: Option, net_config: Option, ignore_address_check: bool, + global_shared_store: bool, + global_shared_eventstore: bool, } // Simple Net Configuration @@ -148,6 +150,8 @@ impl CiphernodeBuilder { net_config: None, zk_backend: None, ignore_address_check: false, + global_shared_store: false, + global_shared_eventstore: false, } } @@ -311,6 +315,18 @@ impl CiphernodeBuilder { self } + /// Share the store this ciphernode uses with socket server commands + pub fn with_shared_store(mut self) -> Self { + self.global_shared_store = true; + self + } + + /// Share the eventstore this ciphernode uses with socket server commands + pub fn with_shared_eventstore(mut self) -> Self { + self.global_shared_eventstore = true; + self + } + /// Setup net package components. pub fn with_net(mut self, peers: Vec, quic_port: u16) -> Self { self.net_config = Some(NetConfig::new(peers, quic_port)); @@ -388,21 +404,22 @@ impl CiphernodeBuilder { EventSystem::persisted(log_path, kv_path) .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) + .with_global_shared_store(self.global_shared_store) } else { if let Some(ref store) = self.in_mem_store { EventSystem::in_mem_from_store(store) .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) + .with_global_shared_store(self.global_shared_store) } else { EventSystem::in_mem() .with_event_bus(local_bus) .with_aggregate_config(aggregate_config.clone()) + .with_global_shared_store(self.global_shared_store) } }; - let store = event_system.store()?; - let eventstore_ts = event_system.eventstore_getter_ts()?; - let eventstore_seq = event_system.eventstore_getter_seq()?; + let eventstore = event_system.eventstore_reader()?; let cipher = &self.cipher; let repositories = Arc::new(store.repositories()); let mut provider_cache = @@ -456,6 +473,8 @@ impl CiphernodeBuilder { .as_ref() .ok_or_else(|| anyhow::anyhow!("ZK backend is required for threshold keyshare"))?; + backend.ensure_installed().await?; + // Ensure signer is available before setting up extensions that need it let signer = provider_cache.ensure_signer().await?; @@ -539,7 +558,7 @@ impl CiphernodeBuilder { (peer_id, interface, channel_bridge) }; - setup_net(topic, bus.clone(), eventstore_ts, interface)?; + setup_net(topic, bus.clone(), eventstore.ts(), interface)?; // Run the sync routine sync( @@ -547,7 +566,7 @@ impl CiphernodeBuilder { &evm_config, &repositories, &aggregate_config, - &eventstore_seq, + &eventstore.seq(), ) .await?; diff --git a/crates/ciphernode-builder/src/event_system.rs b/crates/ciphernode-builder/src/event_system.rs index 055030f417..3e1eb01489 100644 --- a/crates/ciphernode-builder/src/event_system.rs +++ b/crates/ciphernode-builder/src/event_system.rs @@ -5,6 +5,8 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use crate::get_enclave_event_bus; +use crate::global_eventstore_cache::{share_eventstore_reader, EventStoreReader}; +use crate::global_store_cache::share_store; use actix::{Actor, Addr, Handler, Recipient}; use anyhow::{anyhow, Result}; use e3_data::{ @@ -87,6 +89,11 @@ pub struct EventSystem { aggregate_config: OnceCell, /// Cached EventStoreAddrs for idempotency eventstore_addrs: OnceCell, + /// Global shared store. This can allow commands to access the database while a node is running. + global_shared_store: bool, + /// Global shared eventstore. This can allow commands to access the eventstore while a node is + /// running. + global_shared_eventstore: bool, } impl EventSystem { @@ -108,6 +115,8 @@ impl EventSystem { handle: OnceCell::new(), aggregate_config: OnceCell::new(), eventstore_addrs: OnceCell::new(), + global_shared_store: false, + global_shared_eventstore: false, } } @@ -124,6 +133,8 @@ impl EventSystem { handle: OnceCell::new(), aggregate_config: OnceCell::new(), eventstore_addrs: OnceCell::new(), + global_shared_store: false, + global_shared_eventstore: false, } } @@ -142,6 +153,8 @@ impl EventSystem { handle: OnceCell::new(), aggregate_config: OnceCell::new(), eventstore_addrs: OnceCell::new(), + global_shared_store: false, + global_shared_eventstore: false, } } @@ -165,6 +178,18 @@ impl EventSystem { self } + /// Share the store with other processes + pub fn with_global_shared_store(mut self, value: bool) -> Self { + self.global_shared_store = value; + self + } + + /// Share the store with other processes + pub fn with_global_shared_eventstore(mut self, value: bool) -> Self { + self.global_shared_eventstore = value; + self + } + /// Get the eventbus address pub fn eventbus(&self) -> Addr> { self.eventbus.get_or_init(get_enclave_event_bus).clone() @@ -287,20 +312,24 @@ impl EventSystem { } } - pub fn eventstore_getter_seq(&self) -> Result>> { + pub fn eventstore_reader(&self) -> Result { let eventstores = self.eventstore_addrs()?; - match &eventstores { - EventStoreAddrs::InMem(_) => Ok(self.in_mem_eventstore_router()?.recipient()), - EventStoreAddrs::Persisted(_) => Ok(self.persisted_eventstore_router()?.recipient()), - } - } + let reader = match &eventstores { + EventStoreAddrs::InMem(_) => { + let router = self.in_mem_eventstore_router()?; + EventStoreReader::new(router.clone().recipient(), router.recipient()) + } + EventStoreAddrs::Persisted(_) => { + let router = self.persisted_eventstore_router()?; + EventStoreReader::new(router.clone().recipient(), router.recipient()) + } + }; - pub fn eventstore_getter_ts(&self) -> Result>> { - let eventstores = self.eventstore_addrs()?; - match &eventstores { - EventStoreAddrs::InMem(_) => Ok(self.in_mem_eventstore_router()?.recipient()), - EventStoreAddrs::Persisted(_) => Ok(self.persisted_eventstore_router()?.recipient()), + if self.global_shared_eventstore { + share_eventstore_reader(&reader); } + + Ok(reader) } /// Get the BusHandle @@ -337,6 +366,10 @@ impl EventSystem { } }; + if self.global_shared_store { + share_store(&store) + } + Ok(store) } } diff --git a/crates/ciphernode-builder/src/global_eventstore_cache.rs b/crates/ciphernode-builder/src/global_eventstore_cache.rs new file mode 100644 index 0000000000..4f60f4e936 --- /dev/null +++ b/crates/ciphernode-builder/src/global_eventstore_cache.rs @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::sync::OnceLock; + +use actix::Recipient; +use e3_events::{EventStoreQueryBy, SeqAgg, TsAgg}; + +#[derive(Clone)] +pub struct EventStoreReader { + query_by_seq: Recipient>, + query_by_ts: Recipient>, +} + +impl EventStoreReader { + pub fn new( + ts: Recipient>, + seq: Recipient>, + ) -> Self { + Self { + query_by_ts: ts, + query_by_seq: seq, + } + } + + pub fn seq(&self) -> Recipient> { + self.query_by_seq.clone() + } + + pub fn ts(&self) -> Recipient> { + self.query_by_ts.clone() + } +} + +// Hold shared eventstore seq - this is a singleton for production only +static CACHED_EVENTSTORE_READER: OnceLock = OnceLock::new(); + +/// Save the eventstore to a cache for use by socket commands. This solves the problem of reusing a +/// commitlog connection while the node is running in start mode. We can use this during node start. +/// Only the first call to this is shared. +pub fn share_eventstore_reader(store: &EventStoreReader) { + CACHED_EVENTSTORE_READER.get_or_init(|| store.clone()); +} + +pub fn get_shared_eventstore() -> Option { + CACHED_EVENTSTORE_READER.get().cloned() +} diff --git a/crates/ciphernode-builder/src/global_store_cache.rs b/crates/ciphernode-builder/src/global_store_cache.rs new file mode 100644 index 0000000000..8fcd038260 --- /dev/null +++ b/crates/ciphernode-builder/src/global_store_cache.rs @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use std::sync::OnceLock; + +use e3_data::DataStore; + +// Hold shared data store - this is for production only - for testing we can create new stores per +// routine +static CACHED_STORE: OnceLock = OnceLock::new(); + +/// Save the store to a cache for use by socket commands. This solves the problem of reusing a +/// database connection while the node is running in start mode. We can use this during node start. +/// Only the first call to this is satisfied. +pub fn share_store(store: &DataStore) { + CACHED_STORE.get_or_init(|| store.clone()); +} + +pub fn get_cached_store() -> Option { + CACHED_STORE.get().cloned() +} diff --git a/crates/ciphernode-builder/src/lib.rs b/crates/ciphernode-builder/src/lib.rs index 70a5a3079d..5d83b23719 100644 --- a/crates/ciphernode-builder/src/lib.rs +++ b/crates/ciphernode-builder/src/lib.rs @@ -9,6 +9,8 @@ mod ciphernode_builder; mod event_system; mod eventbus_factory; mod evm_system; +pub mod global_eventstore_cache; +pub mod global_store_cache; mod provider_caches; pub use ciphernode::*; pub use ciphernode_builder::*; diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 1ed13553a6..8a4492cab1 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -20,12 +20,15 @@ compile-time = { workspace = true } dialoguer = { workspace = true } dirs = { workspace = true } e3-config = { workspace = true } +e3-console = { workspace = true } +e3-ciphernode-builder = { workspace = true } e3-crypto = { workspace = true } e3-entrypoint = { workspace = true } e3-events = { workspace = true } e3-evm = { workspace = true } e3-init = { workspace = true } e3-support-scripts = { workspace = true } +e3-socket-server = { workspace = true } e3-utils = { workspace = true } e3-zk-prover = { workspace = true } hex = { workspace = true } @@ -36,6 +39,7 @@ opentelemetry_sdk = { workspace = true } petname = { workspace = true } rand = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } toml = { workspace = true } tracing = { workspace = true } diff --git a/crates/cli/src/ciphernode/license.rs b/crates/cli/src/ciphernode/license.rs index ac6761f336..f7d8c48153 100644 --- a/crates/cli/src/ciphernode/license.rs +++ b/crates/cli/src/ciphernode/license.rs @@ -6,15 +6,20 @@ use alloy::primitives::U256; use anyhow::Result; +use e3_console::{log, Console}; use super::context::ChainContext; use super::utils::{ensure_allowance, parse_amount}; use super::LicenseCommands; -pub(crate) async fn execute(ctx: &ChainContext, command: LicenseCommands) -> Result<()> { +pub(crate) async fn execute( + out: Console, + ctx: &ChainContext, + command: LicenseCommands, +) -> Result<()> { match command { LicenseCommands::Bond { amount } => { - bond_license(ctx, &amount).await?; + bond_license(out, ctx, &amount).await?; } LicenseCommands::Unbond { amount } => { let license = ctx.license_token_address().await?; @@ -27,9 +32,11 @@ pub(crate) async fn execute(ctx: &ChainContext, command: LicenseCommands) -> Res .await? .get_receipt() .await?; - println!( + log!( + out, "Queued {} ENCL for exit (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); } LicenseCommands::Claim { @@ -64,14 +71,14 @@ pub(crate) async fn execute(ctx: &ChainContext, command: LicenseCommands) -> Res .await? .get_receipt() .await?; - println!("Claimed exits (tx: {:#x})", receipt.transaction_hash); + log!(out, "Claimed exits (tx: {:#x})", receipt.transaction_hash); } } Ok(()) } -async fn bond_license(ctx: &ChainContext, amount: &str) -> Result<()> { +async fn bond_license(out: Console, ctx: &ChainContext, amount: &str) -> Result<()> { let license = ctx.license_token_address().await?; let erc20 = ctx.erc20(license); let decimals = erc20.decimals().call().await?; @@ -84,9 +91,11 @@ async fn bond_license(ctx: &ChainContext, amount: &str) -> Result<()> { .await? .get_receipt() .await?; - println!( + log!( + out, "Bonded {} ENCL (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); Ok(()) } diff --git a/crates/cli/src/ciphernode/lifecycle.rs b/crates/cli/src/ciphernode/lifecycle.rs index dbfda59305..a7ab02d405 100644 --- a/crates/cli/src/ciphernode/lifecycle.rs +++ b/crates/cli/src/ciphernode/lifecycle.rs @@ -6,11 +6,12 @@ use alloy::primitives::U256; use anyhow::{bail, Result}; +use e3_console::{log, Console}; use super::context::ChainContext; use super::utils::{format_amount, parse_amount}; -pub(crate) async fn register(ctx: &ChainContext) -> Result<()> { +pub(crate) async fn register(out: Console, ctx: &ChainContext) -> Result<()> { let receipt = ctx .bonding() .registerOperator() @@ -18,7 +19,8 @@ pub(crate) async fn register(ctx: &ChainContext) -> Result<()> { .await? .get_receipt() .await?; - println!( + log!( + out, "Registered ciphernode on {} (tx: {:#x})", ctx.chain_label(), receipt.transaction_hash @@ -26,7 +28,7 @@ pub(crate) async fn register(ctx: &ChainContext) -> Result<()> { Ok(()) } -pub(crate) async fn deregister(ctx: &ChainContext) -> Result<()> { +pub(crate) async fn deregister(out: Console, ctx: &ChainContext) -> Result<()> { let receipt = ctx .bonding() .deregisterOperator() @@ -34,18 +36,20 @@ pub(crate) async fn deregister(ctx: &ChainContext) -> Result<()> { .await? .get_receipt() .await?; - println!( + log!( + out, "Deregistration requested (tx: {:#x})", receipt.transaction_hash ); Ok(()) } -pub(crate) async fn activate(ctx: &ChainContext) -> Result<()> { - register(ctx).await +pub(crate) async fn activate(out: Console, ctx: &ChainContext) -> Result<()> { + register(out, ctx).await } pub(crate) async fn deactivate( + out: Console, ctx: &ChainContext, ticket_amount: Option, license_amount: Option, @@ -67,9 +71,11 @@ pub(crate) async fn deactivate( .await? .get_receipt() .await?; - println!( + log!( + out, "Removed {} tickets (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); } @@ -84,17 +90,22 @@ pub(crate) async fn deactivate( .await? .get_receipt() .await?; - println!( + log!( + out, "Queued {} ENCL for exit (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); } - println!("Submitted deactivation transactions; monitor exit delays before claiming."); + log!( + out, + "Submitted deactivation transactions; monitor exit delays before claiming." + ); Ok(()) } -pub(crate) async fn status(ctx: &ChainContext) -> Result<()> { +pub(crate) async fn status(out: Console, ctx: &ChainContext) -> Result<()> { let contract = ctx.bonding(); let operator = ctx.operator(); let ticket_balance: U256 = contract.getTicketBalance(operator).call().await?; @@ -115,26 +126,30 @@ pub(crate) async fn status(ctx: &ChainContext) -> Result<()> { let ticket_decimals = ctx.erc20(ticket_token).decimals().call().await?; let license_decimals = ctx.erc20(license_token).decimals().call().await?; - println!("Ciphernode status on {}:", ctx.chain_label()); - println!(" Address: {:#x}", operator); - println!(" Registered: {}", is_registered); - println!(" Active: {}", is_active); - println!(" Exit pending: {}", has_exit); - println!( + log!(out, "Ciphernode status on {}:", ctx.chain_label()); + log!(out, " Address: {:#x}", operator); + log!(out, " Registered: {}", is_registered); + log!(out, " Active: {}", is_active); + log!(out, " Exit pending: {}", has_exit); + log!( + out, " Ticket balance: {} ({} available)", format_amount(ticket_balance, ticket_decimals), format_amount(available_tickets, ticket_decimals) ); - println!( + log!( + out, " License bond: {}", format_amount(license_bond, license_decimals) ); - println!( + log!( + out, " Pending exits: tickets={}, license={}", format_amount(pending_tickets, ticket_decimals), format_amount(pending_license, license_decimals) ); - println!( + log!( + out, " Requirements: minTickets={}, ticketPrice={} EKT, licenseBond={} ENCL", format_amount(min_ticket_balance, ticket_decimals), format_amount(ticket_price, ticket_decimals), diff --git a/crates/cli/src/ciphernode/mod.rs b/crates/cli/src/ciphernode/mod.rs index 1f5b2667d1..c654fd8824 100644 --- a/crates/cli/src/ciphernode/mod.rs +++ b/crates/cli/src/ciphernode/mod.rs @@ -16,11 +16,13 @@ mod tickets; mod utils; use context::ChainContext; +use e3_console::Console; +use serde::{Deserialize, Serialize}; use zeroize::Zeroizing; use crate::helpers::{ensure_hex_zeroizing, parse_zeroizing}; -#[derive(Debug, Args, Clone, Default)] +#[derive(Debug, Args, Clone, Default, Serialize, Deserialize)] pub struct ChainArgs { /// Chain name as defined in the enclave config (defaults to the first entry) #[arg(long = "chain")] @@ -33,7 +35,7 @@ impl ChainArgs { } } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum CiphernodeCommands { /// Setup local ciphernode configuration Setup { @@ -42,11 +44,11 @@ pub enum CiphernodeCommands { rpc_url: Option, /// The password - #[arg(short, long, value_parser = parse_zeroizing)] + #[arg(short='p', long, value_parser = parse_zeroizing)] password: Option>, /// Wallet Private Key - #[arg(short, long, value_parser = ensure_hex_zeroizing)] + #[arg(short='k', long, value_parser = ensure_hex_zeroizing)] private_key: Option>, }, /// Manage ENCL license tokens and bonding state @@ -94,7 +96,7 @@ pub enum CiphernodeCommands { }, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum LicenseCommands { /// Bond ENCL into the bonding registry Bond { @@ -115,7 +117,7 @@ pub enum LicenseCommands { }, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum TicketCommands { /// Deposit stablecoins to mint tickets Buy { @@ -129,27 +131,27 @@ pub enum TicketCommands { }, } -pub async fn execute(command: CiphernodeCommands, config: &AppConfig) -> Result<()> { +pub async fn execute(out: Console, command: CiphernodeCommands, config: &AppConfig) -> Result<()> { match command { CiphernodeCommands::License { chain, command } => { let ctx = ChainContext::new(config, chain.selection()).await?; - license::execute(&ctx, command).await? + license::execute(out, &ctx, command).await? } CiphernodeCommands::Tickets { chain, command } => { let ctx = ChainContext::new(config, chain.selection()).await?; - tickets::execute(&ctx, command).await? + tickets::execute(out, &ctx, command).await? } CiphernodeCommands::Register { chain } => { let ctx = ChainContext::new(config, chain.selection()).await?; - lifecycle::register(&ctx).await? + lifecycle::register(out, &ctx).await? } CiphernodeCommands::Deregister { chain } => { let ctx = ChainContext::new(config, chain.selection()).await?; - lifecycle::deregister(&ctx).await? + lifecycle::deregister(out, &ctx).await? } CiphernodeCommands::Activate { chain } => { let ctx = ChainContext::new(config, chain.selection()).await?; - lifecycle::activate(&ctx).await? + lifecycle::activate(out, &ctx).await? } CiphernodeCommands::Deactivate { chain, @@ -157,11 +159,11 @@ pub async fn execute(command: CiphernodeCommands, config: &AppConfig) -> Result< license_amount, } => { let ctx = ChainContext::new(config, chain.selection()).await?; - lifecycle::deactivate(&ctx, ticket_amount, license_amount).await? + lifecycle::deactivate(out, &ctx, ticket_amount, license_amount).await? } CiphernodeCommands::Status { chain } => { let ctx = ChainContext::new(config, chain.selection()).await?; - lifecycle::status(&ctx).await? + lifecycle::status(out, &ctx).await? } CiphernodeCommands::Setup { .. } => { bail!( diff --git a/crates/cli/src/ciphernode/setup.rs b/crates/cli/src/ciphernode/setup.rs index e95a910807..a35f893af3 100644 --- a/crates/cli/src/ciphernode/setup.rs +++ b/crates/cli/src/ciphernode/setup.rs @@ -8,6 +8,7 @@ use alloy::primitives::Address; use anyhow::Result; use dialoguer::{theme::ColorfulTheme, Input}; use e3_config::AppConfig; +use e3_console::{log, Console}; use e3_entrypoint::config::setup; use e3_utils::{colorize, Color}; use std::path::PathBuf; @@ -19,6 +20,7 @@ use crate::wallet_set::ask_for_private_key; #[instrument(name = "app", skip_all)] pub async fn execute( + out: Console, rpc_url: Option, password: Option>, private_key: Option>, @@ -64,26 +66,34 @@ pub async fn execute( e3_entrypoint::password::set::execute(&config, pw).await?; let (address, peer_id) = e3_entrypoint::wallet::set::execute(&config, private_key).await?; - print_info(&config, address, &peer_id.to_string(), &rpc_url)?; + print_info(out, &config, address, &peer_id.to_string(), &rpc_url)?; Ok(()) } -fn print_info(config: &AppConfig, address: Address, peer_id: &str, rpc_url: &str) -> Result<()> { +fn print_info( + out: Console, + config: &AppConfig, + address: Address, + peer_id: &str, + rpc_url: &str, +) -> Result<()> { let abs_config = config.config_file().canonicalize()?; - println!("\nEnclave configuration successfully created!"); - println!( + log!(out, "\nEnclave configuration successfully created!"); + log!( + out, "Editable configuration has been written to:\n\n {}", colorize(abs_config.to_string_lossy(), Color::Yellow) ); - println!(""); - println!("Data written:"); - println!(" address: {}", colorize(address, Color::Cyan)); - println!(" peer_id: {}", colorize(peer_id, Color::Cyan)); - println!(" rpc_url: {}", colorize(rpc_url, Color::Cyan)); - println!(""); + log!(out, ""); + log!(out, "Data written:"); + log!(out, " address: {}", colorize(address, Color::Cyan)); + log!(out, " peer_id: {}", colorize(peer_id, Color::Cyan)); + log!(out, " rpc_url: {}", colorize(rpc_url, Color::Cyan)); + log!(out, ""); if config.using_custom_config() { - println!( + log!( + out, "Run future commands from within this directory tree, or pass\n {}\n", colorize( format!("--config {}", abs_config.to_string_lossy()), @@ -91,7 +101,8 @@ fn print_info(config: &AppConfig, address: Address, peer_id: &str, rpc_url: &str ) ); } - println!( + log!( + out, "You can start your node using:\n `{}`\n", colorize("enclave start", Color::Yellow) ); diff --git a/crates/cli/src/ciphernode/tickets.rs b/crates/cli/src/ciphernode/tickets.rs index 6a439af705..55fbbd2fef 100644 --- a/crates/cli/src/ciphernode/tickets.rs +++ b/crates/cli/src/ciphernode/tickets.rs @@ -5,12 +5,17 @@ // or FITNESS FOR A PARTICULAR PURPOSE. use anyhow::Result; +use e3_console::{log, Console}; use super::context::ChainContext; use super::utils::{ensure_allowance, parse_amount}; use super::TicketCommands; -pub(crate) async fn execute(ctx: &ChainContext, command: TicketCommands) -> Result<()> { +pub(crate) async fn execute( + out: Console, + ctx: &ChainContext, + command: TicketCommands, +) -> Result<()> { match command { TicketCommands::Buy { amount } => { let ticket_contract = ctx.ticket_token_address().await?; @@ -26,9 +31,11 @@ pub(crate) async fn execute(ctx: &ChainContext, command: TicketCommands) -> Resu .await? .get_receipt() .await?; - println!( + log!( + out, "Purchased {} tickets (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); } TicketCommands::Burn { amount } => { @@ -42,9 +49,11 @@ pub(crate) async fn execute(ctx: &ChainContext, command: TicketCommands) -> Resu .await? .get_receipt() .await?; - println!( + log!( + out, "Removed {} tickets (tx: {:#x})", - amount, receipt.transaction_hash + amount, + receipt.transaction_hash ); } } diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index 1a43d45e79..d42eed453c 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -4,9 +4,7 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use std::path::PathBuf; - -use crate::ciphernode::{self, CiphernodeCommands}; +use crate::ciphernode::{self, ChainArgs, CiphernodeCommands}; use crate::helpers::telemetry::{setup_simple_tracing, setup_tracing}; use crate::net::{self, NetCommands}; use crate::nodes::{self, NodeCommands}; @@ -20,10 +18,14 @@ use anyhow::{bail, Result}; use clap::{command, ArgAction, Parser, Subcommand}; use e3_config::validation::ValidUrl; use e3_config::{load_config, AppConfig}; +use e3_console::{log, Console}; use e3_entrypoint::helpers::datastore::close_all_connections; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::str::FromStr; use tracing::{info, instrument, Level}; -#[derive(Parser, Debug)] +#[derive(Parser, Clone, Debug)] #[command(name = "enclave")] #[command(about = "A CLI for interacting with Enclave the open-source protocol for Encrypted Execution Environments (E3)", long_about = None)] #[command(version = env!("CARGO_PKG_VERSION"))] @@ -79,11 +81,11 @@ impl Cli { } #[instrument(skip_all)] - pub async fn execute(self) -> Result<()> { + pub async fn execute(self, out: Console, config_result: Result) -> Result<()> { let log_level = self.log_level(); // Attempt to load the config, but only treat "not found" as // the trigger for the init flow. All other errors bubble up. - let config = match self.load_config() { + let config = match config_result { Ok(cfg) => cfg, // If the file truly doesn't exist, fall back to init Err(e) @@ -94,7 +96,7 @@ impl Cli { { // Existing init branch match self.command { - Commands::Rev => rev::execute().await?, + Commands::Rev => rev::execute(out).await?, Commands::Init {path, template, skip_cleanup} => { setup_simple_tracing(log_level); init::execute(path, template, skip_cleanup, self.verbose > 0).await? @@ -107,6 +109,7 @@ impl Cli { } } => { ciphernode::setup::execute( + out, rpc_url, password, private_key, @@ -114,8 +117,9 @@ impl Cli { .await?; } Commands::Start { .. } => { - println!("No configuration found. Setting up enclave configuration..."); + log!(out,"No configuration found. Setting up enclave configuration..."); ciphernode::setup::execute( + out, None, None, None, @@ -124,7 +128,7 @@ impl Cli { }, Commands::Noir { command } => { setup_simple_tracing(log_level); - noir::execute_without_config(command).await? + noir::execute_without_config(out, command).await? }, _ => bail!( "Configuration file not found. Run `enclave ciphernode setup` to create a configuration." @@ -155,7 +159,9 @@ impl Cli { Commands::Compile { dev } => { e3_support_scripts::program_compile(config.program().clone(), dev).await? } - Commands::PrintEnv { vite, chain } => print_env::execute(&config, &chain, vite).await?, + Commands::PrintEnv { vite, chain } => { + print_env::execute(out, &config, &chain, vite).await? + } Commands::Program { command } => program::execute(command, &config).await?, Commands::PurgeAll => { purge_all::execute().await?; @@ -170,12 +176,12 @@ impl Cli { ) .await? } - Commands::Password { command } => password::execute(command, &config).await?, - Commands::Wallet { command } => wallet::execute(command, config).await?, - Commands::Ciphernode { command } => ciphernode::execute(command, &config).await?, - Commands::Noir { command } => noir::execute(command, &config).await?, - Commands::Net { command } => net::execute(command, &config).await?, - Commands::Rev => rev::execute().await?, + Commands::Password { command } => password::execute(out, command, &config).await?, + Commands::Wallet { command } => wallet::execute(out, command, config).await?, + Commands::Ciphernode { command } => ciphernode::execute(out, command, &config).await?, + Commands::Noir { command } => noir::execute(out, command, &config).await?, + Commands::Net { command } => net::execute(&out, command, &config).await?, + Commands::Rev => rev::execute(out).await?, } close_all_connections(); @@ -198,7 +204,7 @@ impl Cli { } } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum Commands { /// Start the application Start { @@ -292,3 +298,99 @@ pub enum Commands { command: NetCommands, }, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct RemoteCli { + name: Option, + otel: Option, + quiet: bool, + config: Option, + verbose: u8, + command: RemoteCommand, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum RemoteCommand { + NetGetPeerId, + CiphernodeStatus { chain: ChainArgs }, + NoirStatus, + WalletGet, + Rev, + PrintEnv { vite: bool, chain: String }, +} + +impl TryFrom for RemoteCommand { + type Error = anyhow::Error; + + fn try_from(value: Commands) -> std::result::Result { + match value { + Commands::Rev => Ok(RemoteCommand::Rev), + Commands::Net { + command: NetCommands::GetPeerId, + } => Ok(RemoteCommand::NetGetPeerId), + Commands::Noir { + command: NoirCommands::Status, + } => Ok(RemoteCommand::NoirStatus), + Commands::Ciphernode { + command: CiphernodeCommands::Status { chain }, + } => Ok(RemoteCommand::CiphernodeStatus { chain }), + Commands::PrintEnv { chain, vite } => Ok(RemoteCommand::PrintEnv { vite, chain }), + Commands::Wallet { + command: WalletCommands::Get, + } => Ok(RemoteCommand::WalletGet), + _ => bail!("Command not allowed while node is running."), + } + } +} + +impl TryFrom for RemoteCli { + type Error = anyhow::Error; + fn try_from(value: Cli) -> Result { + Ok(RemoteCli { + otel: value.otel.map(|o| o.to_string()), + verbose: value.verbose, + config: value.config, + name: value.name, + quiet: value.quiet, + command: value.command.try_into()?, + }) + } +} + +impl TryFrom for Cli { + type Error = anyhow::Error; + fn try_from(value: RemoteCli) -> std::result::Result { + Ok(Cli { + verbose: value.verbose, + config: value.config, + quiet: value.quiet, + otel: value.otel.and_then(|o| ValidUrl::from_str(&o).ok()), + command: value.command.try_into()?, + name: value.name, + }) + } +} + +impl TryFrom for Commands { + type Error = anyhow::Error; + fn try_from(value: RemoteCommand) -> std::result::Result { + let command = match value { + RemoteCommand::Rev => Commands::Rev, + RemoteCommand::WalletGet => Commands::Wallet { + command: WalletCommands::Get, + }, + RemoteCommand::PrintEnv { vite, chain } => Commands::PrintEnv { vite, chain }, + RemoteCommand::CiphernodeStatus { chain } => Commands::Ciphernode { + command: CiphernodeCommands::Status { chain }, + }, + RemoteCommand::NoirStatus => Commands::Noir { + command: NoirCommands::Status, + }, + RemoteCommand::NetGetPeerId => Commands::Net { + command: NetCommands::GetPeerId, + }, + }; + // We might have to hold this stuff on RemoteCommand + Ok(command) + } +} diff --git a/crates/cli/src/helpers/telemetry.rs b/crates/cli/src/helpers/telemetry.rs index 8525db2dc0..d7e7d3ce4c 100644 --- a/crates/cli/src/helpers/telemetry.rs +++ b/crates/cli/src/helpers/telemetry.rs @@ -20,7 +20,8 @@ pub fn setup_simple_tracing(log_level: Level) { .with(tracing_subscriber::filter::LevelFilter::from_level( log_level, )) - .init(); + .try_init() + .ok(); } pub fn setup_tracing(config: &AppConfig, log_level: Level) -> Result<()> { @@ -51,7 +52,8 @@ pub fn setup_tracing(config: &AppConfig, log_level: Level) -> Result<()> { .with(tracing_subscriber::filter::LevelFilter::from_level( log_level, )) - .init(); + .try_init() + .ok(); } None => { // TODO: we might be able to dedupe this with above but there were @@ -61,7 +63,8 @@ pub fn setup_tracing(config: &AppConfig, log_level: Level) -> Result<()> { .with(tracing_subscriber::filter::LevelFilter::from_level( log_level, )) - .init(); + .try_init() + .ok(); } } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index ca271a26cd..56e50674e8 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -4,8 +4,11 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use anyhow::Result; use clap::Parser; -use cli::Cli; +use cli::{Cli, RemoteCli}; +use e3_console::Console; +use e3_socket_server::{connect_socket, run_on_socket}; use e3_utils::{colorize, Color}; use tracing::info; @@ -59,12 +62,27 @@ pub fn owo() { } #[actix::main] -pub async fn main() { +pub async fn main() -> Result<()> { info!("COMPILATION ID: '{}'", helpers::compile_id::generate_id()); + let handle = Console::stdout(); + let out = handle.writer(); + let cli = Cli::parse(); - // Execute the cli - if let Err(err) = Cli::parse().execute().await { + let config_result = cli.load_config(); + let maybe_stream = connect_socket(config_result.as_ref().ok()).await; + let maybe_remote_command = TryInto::::try_into(cli.clone()).ok(); + + // If the socket exists and the command can be parsed as remote + if let Err(err) = if let (Some(stream), Some(command)) = (maybe_stream, maybe_remote_command) { + // Run the command over the socket + run_on_socket(out, stream, command).await + } else { + // Run the command locally + cli.execute(out, config_result).await + } { eprintln!("{}", colorize(err, Color::Red)); std::process::exit(1); } + handle.flush().await; + Ok(()) } diff --git a/crates/cli/src/net.rs b/crates/cli/src/net.rs index bcb54d0e6c..3160dc7b1d 100644 --- a/crates/cli/src/net.rs +++ b/crates/cli/src/net.rs @@ -7,18 +7,19 @@ use anyhow::*; use clap::Subcommand; use e3_config::AppConfig; +use e3_console::Console; use crate::net_get_peer_id; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum NetCommands { /// Get the ciphernode's libp2p PeerId GetPeerId, } -pub async fn execute(command: NetCommands, config: &AppConfig) -> Result<()> { +pub async fn execute(out: &Console, command: NetCommands, config: &AppConfig) -> Result<()> { match command { - NetCommands::GetPeerId => net_get_peer_id::execute(config).await?, + NetCommands::GetPeerId => net_get_peer_id::execute(out, config).await?, }; Ok(()) diff --git a/crates/cli/src/net_get_peer_id.rs b/crates/cli/src/net_get_peer_id.rs index be7a397208..e486e1d122 100644 --- a/crates/cli/src/net_get_peer_id.rs +++ b/crates/cli/src/net_get_peer_id.rs @@ -6,9 +6,10 @@ use anyhow::Result; use e3_config::AppConfig; +use e3_console::{log, Console}; -pub async fn execute(config: &AppConfig) -> Result<()> { +pub async fn execute(out: &Console, config: &AppConfig) -> Result<()> { let peer_id = e3_entrypoint::net::get_peer_id::execute(config).await?; - println!("{}", peer_id); + log!(out, "{}", peer_id); Ok(()) } diff --git a/crates/cli/src/nodes.rs b/crates/cli/src/nodes.rs index bcd4f0866b..dd4d8403f5 100644 --- a/crates/cli/src/nodes.rs +++ b/crates/cli/src/nodes.rs @@ -13,7 +13,7 @@ use crate::{ nodes_stop, nodes_up, }; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum NodeCommands { /// Launch all nodes Up { diff --git a/crates/cli/src/noir.rs b/crates/cli/src/noir.rs index 2b7e1bad16..081689d7c0 100644 --- a/crates/cli/src/noir.rs +++ b/crates/cli/src/noir.rs @@ -7,9 +7,10 @@ use anyhow::*; use clap::Subcommand; use e3_config::AppConfig; +use e3_console::{log, Console}; use e3_zk_prover::{SetupStatus, ZkBackend}; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum NoirCommands { Status, Setup { @@ -18,119 +19,123 @@ pub enum NoirCommands { }, } -pub async fn execute(command: NoirCommands, config: &AppConfig) -> Result<()> { +pub async fn execute(out: Console, command: NoirCommands, config: &AppConfig) -> Result<()> { let backend = ZkBackend::new(config.bb_binary(), config.circuits_dir(), config.work_dir()); match command { NoirCommands::Status => { - execute_status(&backend).await?; + execute_status(out, &backend).await?; } NoirCommands::Setup { force } => { - execute_setup(&backend, force).await?; + execute_setup(out, &backend, force).await?; } } Ok(()) } -pub async fn execute_without_config(command: NoirCommands) -> Result<()> { +pub async fn execute_without_config(out: Console, command: NoirCommands) -> Result<()> { let backend = ZkBackend::with_default_dir("default") .map_err(|e| anyhow!("Failed to initialize ZK backend: {}", e))?; match command { NoirCommands::Status => { - execute_status(&backend).await?; + execute_status(out, &backend).await?; } NoirCommands::Setup { force } => { - execute_setup(&backend, force).await?; + execute_setup(out, &backend, force).await?; } } Ok(()) } -async fn execute_status(backend: &ZkBackend) -> Result<()> { +async fn execute_status(out: Console, backend: &ZkBackend) -> Result<()> { let status = backend.check_status().await; let version_info = backend.load_version_info().await; - println!("=== ZK Prover Status ===\n"); + log!(out, "=== ZK Prover Status ===\n"); - println!("Barretenberg (bb):"); - println!(" Path: {}", backend.bb_binary.display()); + log!(out, "Barretenberg (bb):"); + log!(out, " Path: {}", backend.bb_binary.display()); if let Some(ref v) = version_info.bb_version { - println!(" Version: {}", v); + log!(out, " Version: {}", v); } if backend.bb_binary.exists() { - println!(" Installed"); + log!(out, " Installed"); } else { - println!(" Not installed"); + log!(out, " Not installed"); } - println!(); + log!(out, ""); - println!("Circuits:"); - println!(" Path: {}", backend.circuits_dir.display()); + log!(out, "Circuits:"); + log!(out, " Path: {}", backend.circuits_dir.display()); if let Some(ref v) = version_info.circuits_version { - println!(" Version: {}", v); + log!(out, " Version: {}", v); } if backend.circuits_dir.exists() { - println!(" Installed"); + log!(out, " Installed"); } else { - println!(" Not installed"); + log!(out, " Not installed"); } - println!(); + log!(out, ""); match status { SetupStatus::Ready => { - println!("Status: Ready"); + log!(out, "Status: Ready"); } SetupStatus::BbNeedsUpdate { installed, required, } => { - println!("Status: Barretenberg needs update"); - println!( + log!(out, "Status: Barretenberg needs update"); + log!( + out, " Installed: {}", installed.as_deref().unwrap_or("not installed") ); - println!(" Required: {}", required); - println!("\nRun `enclave noir setup` to update"); + log!(out, " Required: {}", required); + log!(out, "\nRun `enclave noir setup` to update"); } SetupStatus::CircuitsNeedUpdate { installed, required, } => { - println!("Status: Circuits need update"); - println!( + log!(out, "Status: Circuits need update"); + log!( + out, " Installed: {}", installed.as_deref().unwrap_or("not installed") ); - println!(" Required: {}", required); - println!("\nRun `enclave noir setup` to update"); + log!(out, " Required: {}", required); + log!(out, "\nRun `enclave noir setup` to update"); } SetupStatus::FullSetupNeeded => { - println!("Status: Setup required"); - println!("\nRun `enclave noir setup` to install"); + log!(out, "Status: Setup required"); + log!(out, "\nRun `enclave noir setup` to install"); } } Ok(()) } -async fn execute_setup(backend: &ZkBackend, force: bool) -> Result<()> { - println!("Setting up ZK prover...\n"); - println!( +async fn execute_setup(out: Console, backend: &ZkBackend, force: bool) -> Result<()> { + log!(out, "Setting up ZK prover...\n"); + log!( + out, " target bb version: {}", backend.config.required_bb_version ); - println!( + log!( + out, " target circuits version: {}\n", backend.config.required_circuits_version ); if force { - println!("Force reinstalling ZK prover components...\n"); + log!(out, "Force reinstalling ZK prover components...\n"); // Force reinstall by directly downloading components backend @@ -145,19 +150,21 @@ async fn execute_setup(backend: &ZkBackend, force: bool) -> Result<()> { let status = backend.check_status().await; if matches!(status, SetupStatus::Ready) { let version_info = backend.load_version_info().await; - println!("ZK prover is already set up and up to date."); - println!( + log!(out, "ZK prover is already set up and up to date."); + log!( + out, " bb version: {}", version_info.bb_version.as_deref().unwrap_or("unknown") ); - println!( + log!( + out, " circuits version: {}", version_info .circuits_version .as_deref() .unwrap_or("unknown") ); - println!(" Use --force to reinstall."); + log!(out, " Use --force to reinstall."); return Ok(()); } @@ -169,15 +176,21 @@ async fn execute_setup(backend: &ZkBackend, force: bool) -> Result<()> { let version_info = backend.load_version_info().await; - println!("\nZK prover setup complete!"); - println!(); - println!(" bb binary: {}", backend.bb_binary.display()); - println!( + log!(out, "\nZK prover setup complete!"); + log!(out, ""); + log!(out, " bb binary: {}", backend.bb_binary.display()); + log!( + out, " bb version: {}", version_info.bb_version.as_deref().unwrap_or("unknown") ); - println!(" circuits dir: {}", backend.circuits_dir.display()); - println!( + log!( + out, + " circuits dir: {}", + backend.circuits_dir.display() + ); + log!( + out, " circuits version: {}", version_info .circuits_version @@ -185,7 +198,7 @@ async fn execute_setup(backend: &ZkBackend, force: bool) -> Result<()> { .unwrap_or("unknown") ); if let Some(ref ts) = version_info.last_updated { - println!(" last updated: {}", ts); + log!(out, " last updated: {}", ts); } Ok(()) diff --git a/crates/cli/src/password.rs b/crates/cli/src/password.rs index 4cc08b9553..338f7501ee 100644 --- a/crates/cli/src/password.rs +++ b/crates/cli/src/password.rs @@ -7,11 +7,12 @@ use anyhow::*; use clap::Subcommand; use e3_config::AppConfig; +use e3_console::Console; use zeroize::Zeroizing; use crate::{helpers::parse_zeroizing, password_delete, password_set}; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum PasswordCommands { /// Set (or overwrite) a password Set { @@ -24,10 +25,10 @@ pub enum PasswordCommands { Delete, } -pub async fn execute(command: PasswordCommands, config: &AppConfig) -> Result<()> { +pub async fn execute(out: Console, command: PasswordCommands, config: &AppConfig) -> Result<()> { match command { - PasswordCommands::Set { password } => password_set::execute(&config, password).await?, - PasswordCommands::Delete => password_delete::execute(&config).await?, + PasswordCommands::Set { password } => password_set::execute(out, &config, password).await?, + PasswordCommands::Delete => password_delete::execute(&out, &config).await?, }; Ok(()) diff --git a/crates/cli/src/password_delete.rs b/crates/cli/src/password_delete.rs index 9c5ed85308..d6792d73d3 100644 --- a/crates/cli/src/password_delete.rs +++ b/crates/cli/src/password_delete.rs @@ -8,9 +8,10 @@ use crate::helpers::prompt_password::prompt_password; use anyhow::Result; use dialoguer::{theme::ColorfulTheme, Confirm}; use e3_config::AppConfig; +use e3_console::{log, Console}; use zeroize::Zeroize; -pub async fn prompt_delete(config: &AppConfig) -> Result { +pub async fn prompt_delete(out: Console, config: &AppConfig) -> Result { if !Confirm::with_theme(&ColorfulTheme::default()) .with_prompt("Are you sure you want to delete the key? This action cannot be undone.") .default(false) @@ -20,7 +21,7 @@ pub async fn prompt_delete(config: &AppConfig) -> Result { } let Ok(mut cur_pw) = e3_entrypoint::password::delete::get_current_password(config).await else { - println!("Password is not set. Nothing to do."); + log!(out, "Password is not set. Nothing to do."); return Ok(false); }; @@ -35,12 +36,12 @@ pub async fn prompt_delete(config: &AppConfig) -> Result { Ok(true) } -pub async fn execute(config: &AppConfig) -> Result<()> { - if prompt_delete(config).await? { +pub async fn execute(out: &Console, config: &AppConfig) -> Result<()> { + if prompt_delete(out.clone(), config).await? { e3_entrypoint::password::delete::execute(config).await?; - println!("Password successfully deleted."); + log!(out, "Password successfully deleted."); } else { - println!("Operation cancelled."); + log!(out, "Operation cancelled."); } Ok(()) } diff --git a/crates/cli/src/password_set.rs b/crates/cli/src/password_set.rs index 5b302098eb..755d402441 100644 --- a/crates/cli/src/password_set.rs +++ b/crates/cli/src/password_set.rs @@ -6,6 +6,7 @@ use anyhow::{bail, Result}; use e3_config::AppConfig; +use e3_console::{log, Console}; use zeroize::{Zeroize, Zeroizing}; use crate::helpers::prompt_password::prompt_password; @@ -43,15 +44,19 @@ pub fn ask_for_password(input: Option>) -> Result>) -> Result<()> { - println!("Setting password..."); +pub async fn execute( + out: Console, + config: &AppConfig, + input: Option>, +) -> Result<()> { + log!(out, "Setting password..."); e3_entrypoint::password::set::preflight(config).await?; let pw = ask_for_password(input)?; e3_entrypoint::password::set::execute(config, pw).await?; - println!("Password successfully set."); + log!(out, "Password successfully set."); Ok(()) } diff --git a/crates/cli/src/print_env.rs b/crates/cli/src/print_env.rs index 99ccba221d..3bd3f95633 100644 --- a/crates/cli/src/print_env.rs +++ b/crates/cli/src/print_env.rs @@ -6,6 +6,7 @@ use anyhow::Result; use e3_config::AppConfig; +use e3_console::{log, Console}; pub fn extract_env_vars_vite(config: &AppConfig, chain: &str) -> String { let mut env_vars = Vec::new(); @@ -70,11 +71,11 @@ pub fn extract_env_vars(config: &AppConfig, chain: &str) -> String { env_vars.join(" ") } -pub async fn execute(config: &AppConfig, chain: &str, as_vite: bool) -> Result<()> { +pub async fn execute(out: Console, config: &AppConfig, chain: &str, as_vite: bool) -> Result<()> { if as_vite { - println!("{}", extract_env_vars_vite(config, chain)); + log!(out, "{}", extract_env_vars_vite(config, chain)); } else { - println!("{}", extract_env_vars(config, chain)); + log!(out, "{}", extract_env_vars(config, chain)); } Ok(()) } diff --git a/crates/cli/src/program.rs b/crates/cli/src/program.rs index 9f4578465a..6cc9bfe009 100644 --- a/crates/cli/src/program.rs +++ b/crates/cli/src/program.rs @@ -8,7 +8,7 @@ use anyhow::Result; use clap::Subcommand; use e3_config::AppConfig; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum ProgramCommands { /// Start the program Start { @@ -38,7 +38,7 @@ pub enum ProgramCommands { }, } -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum ProgramCacheCommands { /// Purge program compilation caches. Will make program compilation take longer. Purge, diff --git a/crates/cli/src/rev.rs b/crates/cli/src/rev.rs index faca607142..b44d302958 100644 --- a/crates/cli/src/rev.rs +++ b/crates/cli/src/rev.rs @@ -4,9 +4,11 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. +use e3_console::{log, Console}; + pub const GIT_SHA: &str = env!("GIT_SHA"); -pub async fn execute() -> anyhow::Result<()> { - println!("{}", GIT_SHA); +pub async fn execute(out: Console) -> anyhow::Result<()> { + log!(out, "{}", GIT_SHA); Ok(()) } diff --git a/crates/cli/src/start.rs b/crates/cli/src/start.rs index 94d3e59410..3f5c54416e 100644 --- a/crates/cli/src/start.rs +++ b/crates/cli/src/start.rs @@ -4,16 +4,83 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use crate::owo; +use std::time::Duration; + +use crate::{ + cli::{Cli, RemoteCli}, + owo, +}; use anyhow::Result; +use e3_ciphernode_builder::CiphernodeHandle; use e3_config::{AppConfig, NodeRole}; -use e3_entrypoint::helpers::listen_for_shutdown; -use tracing::{info, instrument}; +use e3_console::Console; +use e3_events::{prelude::*, Shutdown}; +use e3_socket_server::start_socket_server; +use e3_utils::{colorize, Color}; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::signal::unix::{signal, SignalKind}; +use tracing::{error, info, instrument}; #[instrument(skip_all)] pub async fn execute(mut config: AppConfig, peers: Vec) -> Result<()> { + // Register signal listeners immediately at startup + let shutdown = shutdown_signal(); + tokio::pin!(shutdown); + owo(); + launch_socket_server(config.ctrl_port()); + + let node = tokio::select! { + // build the ciphernode and if it completes first return the result + result = build_ciphernode(&mut config, peers) => result, + // if the shutdown signal completes first then do shutdown without the node + _ = &mut shutdown => { + graceful_shutdown(None).await; + return Ok(()); + } + }?; + + info!( + "LAUNCHING CIPHERNODE: ({}/{}/{})", + config.name(), + node.address, + node.peer_id + ); + + shutdown.await; + graceful_shutdown(Some(node)).await; + + Ok(()) +} + +/// Launch a socket server to read RemoteCli commands +pub fn launch_socket_server(ctrl_port: u16) { + // Setup socket server for daemon + tokio::task::spawn_local(start_socket_server(ctrl_port, |stream| async move { + let (reader, mut writer) = stream.into_split(); + let mut lines = BufReader::new(reader).lines(); + + if let Some(line) = lines.next_line().await? { + let (out, mut rx) = Console::channel(); + info!("CMD: {}", &colorize(&line, Color::Blue)); + let remote_cli: RemoteCli = serde_json::from_str(&line)?; + let cli: Cli = remote_cli.try_into()?; + let config_result = cli.load_config(); + cli.execute(out, config_result).await?; + while let Some(msg) = rx.recv().await { + writer.write_all(format!("{msg}\n").as_bytes()).await?; + } + } + + writer.shutdown().await?; + Ok(()) + })); +} +pub async fn build_ciphernode( + config: &mut AppConfig, + peers: Vec, +) -> Result { // add cli peers to the config config.add_peers(peers); @@ -35,14 +102,32 @@ pub async fn execute(mut config: AppConfig, peers: Vec) -> Result<()> { NodeRole::Ciphernode => e3_entrypoint::start::start::execute(&config).await?, }; - info!( - "LAUNCHING CIPHERNODE: ({}/{}/{})", - config.name(), - node.address, - node.peer_id - ); + Ok(node) +} - tokio::spawn(listen_for_shutdown(node)).await?; +pub fn shutdown_signal() -> impl std::future::Future { + let mut sigint = + signal(SignalKind::interrupt()).expect("Failed to create SIGINT signal stream"); + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal stream"); - Ok(()) + async move { + tokio::select! { + _ = sigint.recv() => info!("SIGINT received"), + _ = sigterm.recv() => info!("SIGTERM received"), + } + } +} + +pub async fn graceful_shutdown(node: Option) { + info!("initiating graceful shutdown..."); + + if let Some(node) = node { + if let Err(e) = node.bus.publish_without_context(Shutdown) { + error!("Shutdown failed to publish! {e}"); + } + } + + tokio::time::sleep(Duration::from_secs(2)).await; + info!("Graceful shutdown complete"); } diff --git a/crates/cli/src/wallet.rs b/crates/cli/src/wallet.rs index 36225b6ebc..63f9bd15aa 100644 --- a/crates/cli/src/wallet.rs +++ b/crates/cli/src/wallet.rs @@ -7,11 +7,12 @@ use anyhow::*; use clap::Subcommand; use e3_config::AppConfig; +use e3_console::Console; use zeroize::Zeroizing; use crate::{helpers::ensure_hex_zeroizing, wallet_get, wallet_set}; -#[derive(Subcommand, Debug)] +#[derive(Subcommand, Clone, Debug)] pub enum WalletCommands { /// Set wallet private key Set { @@ -24,10 +25,12 @@ pub enum WalletCommands { Get, } -pub async fn execute(command: WalletCommands, config: AppConfig) -> Result<()> { +pub async fn execute(out: Console, command: WalletCommands, config: AppConfig) -> Result<()> { match command { - WalletCommands::Set { private_key } => wallet_set::execute(&config, private_key).await?, - WalletCommands::Get => wallet_get::execute(&config).await?, + WalletCommands::Set { private_key } => { + wallet_set::execute(out, &config, private_key).await? + } + WalletCommands::Get => wallet_get::execute(out, &config).await?, }; Ok(()) diff --git a/crates/cli/src/wallet_get.rs b/crates/cli/src/wallet_get.rs index 383ae256f1..7b3d3fc218 100644 --- a/crates/cli/src/wallet_get.rs +++ b/crates/cli/src/wallet_get.rs @@ -6,10 +6,11 @@ use anyhow::Result; use e3_config::AppConfig; +use e3_console::{log, Console}; -pub async fn execute(config: &AppConfig) -> Result<()> { +pub async fn execute(out: Console, config: &AppConfig) -> Result<()> { let address = e3_entrypoint::wallet::get::execute(config).await?; - println!("{}", address); + log!(out, "{}", address); Ok(()) } diff --git a/crates/cli/src/wallet_set.rs b/crates/cli/src/wallet_set.rs index 27b38d2964..6973b98dd3 100644 --- a/crates/cli/src/wallet_set.rs +++ b/crates/cli/src/wallet_set.rs @@ -7,6 +7,7 @@ use anyhow::Result; use dialoguer::{theme::ColorfulTheme, Password}; use e3_config::AppConfig; +use e3_console::{log, Console}; use e3_entrypoint::wallet::set::validate_private_key; use zeroize::Zeroizing; @@ -28,10 +29,17 @@ pub fn ask_for_private_key(given_key: Option>) -> Result>) -> Result<()> { +pub async fn execute( + out: Console, + config: &AppConfig, + private_key: Option>, +) -> Result<()> { let input = ask_for_private_key(private_key)?; e3_entrypoint::wallet::set::execute(config, input).await?; - println!("Wallet key has been successfully stored and encrypted."); + log!( + out, + "Wallet key has been successfully stored and encrypted." + ); Ok(()) } diff --git a/crates/config/src/app_config.rs b/crates/config/src/app_config.rs index a5f7f27c8f..7e027c5a0c 100644 --- a/crates/config/src/app_config.rs +++ b/crates/config/src/app_config.rs @@ -53,6 +53,8 @@ pub struct NodeDefinition { pub peers: Vec, /// The port to use for the quic listener pub quic_port: u16, + /// The port to use for the ctrl socket listener + pub ctrl_port: u16, /// The name for the database pub db_file: PathBuf, /// The name for the keyfile @@ -80,6 +82,7 @@ impl Default for NodeDefinition { peers: vec![], // NOTE: We should look at generation via ipns fetch for the latest nodes address: None, quic_port: 9091, + ctrl_port: 50505, key_file: PathBuf::from("key"), // ~/.config/enclave/key db_file: PathBuf::from("db"), // ~/.config/enclave/db log_file: PathBuf::from("log"), // ~/.config/enclave/log @@ -335,6 +338,11 @@ impl AppConfig { self.node_def().quic_port } + /// get the ctrl port + pub fn ctrl_port(&self) -> u16 { + self.node_def().ctrl_port + } + /// Get the config file path pub fn config_file(&self) -> PathBuf { self.paths.config_file() diff --git a/crates/config/src/validation.rs b/crates/config/src/validation.rs index c6a4f0369b..8c6246881a 100644 --- a/crates/config/src/validation.rs +++ b/crates/config/src/validation.rs @@ -18,6 +18,12 @@ impl FromStr for ValidUrl { } } +impl ToString for ValidUrl { + fn to_string(&self) -> String { + self.0.to_string() + } +} + impl From for String { fn from(value: ValidUrl) -> Self { value.0.to_string() diff --git a/crates/console/Cargo.toml b/crates/console/Cargo.toml new file mode 100644 index 0000000000..804a00634f --- /dev/null +++ b/crates/console/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "e3-console" +version.workspace = true +edition.workspace = true +license.workspace = true +description.workspace = true +repository.workspace = true + +[dependencies] +tokio.workspace = true diff --git a/crates/console/src/lib.rs b/crates/console/src/lib.rs new file mode 100644 index 0000000000..4cb39ac66e --- /dev/null +++ b/crates/console/src/lib.rs @@ -0,0 +1,83 @@ +// SPDX-License-Identifier: LGPL-3.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use tokio::io::{AsyncWrite, AsyncWriteExt}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +#[derive(Clone)] +pub struct Console { + tx: mpsc::UnboundedSender, +} +pub struct ConsoleHandle { + console: Console, + join: JoinHandle<()>, +} +impl Console { + /// Output goes to stdout. + pub fn stdout() -> ConsoleHandle { + let (tx, mut rx) = mpsc::unbounded_channel(); + let join = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + println!("{msg}"); + } + }); + ConsoleHandle { + console: Console { tx }, + join, + } + } + + pub fn writer(mut w: impl AsyncWrite + Unpin + Send + 'static) -> ConsoleHandle { + let (tx, mut rx) = mpsc::unbounded_channel::(); + let join = tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + if w.write_all(msg.as_bytes()).await.is_err() { + break; + } + if w.write_all(b"\n").await.is_err() { + break; + } + } + let _ = w.flush().await; + }); + ConsoleHandle { + console: Console { tx }, + join, + } + } + + /// Output goes to the returned receiver. Caller decides the destination. + pub fn channel() -> (Self, mpsc::UnboundedReceiver) { + let (tx, rx) = mpsc::unbounded_channel(); + (Self { tx }, rx) + } + + /// Emit a message to whatever destination this context is wired to. + pub fn log(&self, msg: String) { + let _ = self.tx.send(msg); + } +} + +impl ConsoleHandle { + /// Get a cheap cloneable reference to pass around. + pub fn writer(&self) -> Console { + self.console.clone() + } + + /// Drop the sender and wait for the printer task to drain. + pub async fn flush(self) { + drop(self.console); + let _ = self.join.await; + } +} + +#[macro_export] +macro_rules! log { + ($ctx:expr, $($arg:tt)*) => { + $ctx.log(format!($($arg)*)) + }; +} diff --git a/crates/entrypoint/Cargo.toml b/crates/entrypoint/Cargo.toml index 947ecdba46..a7e090a404 100644 --- a/crates/entrypoint/Cargo.toml +++ b/crates/entrypoint/Cargo.toml @@ -23,6 +23,7 @@ dirs = { workspace = true } e3-events = { workspace = true } e3-evm = { workspace = true } e3-fhe = { workspace = true } +e3-socket-server = { workspace = true } hex = { workspace = true } e3-keyshare = { workspace = true } e3-logger = { workspace = true } diff --git a/crates/entrypoint/src/helpers/datastore.rs b/crates/entrypoint/src/helpers/datastore.rs index eea5ee5a09..ab965b339c 100644 --- a/crates/entrypoint/src/helpers/datastore.rs +++ b/crates/entrypoint/src/helpers/datastore.rs @@ -4,15 +4,16 @@ // without even the implied warranty of MERCHANTABILITY // or FITNESS FOR A PARTICULAR PURPOSE. -use std::path::PathBuf; - use actix::Actor; use anyhow::Result; -use e3_ciphernode_builder::get_enclave_bus_handle; +use e3_ciphernode_builder::global_eventstore_cache::{get_shared_eventstore, EventStoreReader}; +use e3_ciphernode_builder::global_store_cache::get_cached_store; +use e3_ciphernode_builder::{get_enclave_bus_handle, EventSystem}; use e3_config::AppConfig; use e3_data::{DataStore, InMemStore, SledDb, SledStore}; use e3_data::{Repositories, RepositoriesFactory}; use e3_events::{BusHandle, Disabled}; +use std::path::PathBuf; pub fn get_sled_store(bus: &BusHandle, db_file: &PathBuf) -> Result { Ok((&SledStore::new(bus, db_file)?).into()) @@ -31,12 +32,32 @@ pub fn setup_datastore(config: &AppConfig, bus: &BusHandle) -> Result< Ok(store) } +/// Command helper to get a store pub fn get_repositories(config: &AppConfig) -> Result { + // We are probably in a socket command so get the shared store + if let Some(store) = get_cached_store() { + return Ok(store.repositories()); + } + + // We are probably in a standalone command so setup a fresh data store let bus = get_enclave_bus_handle()?; let store = setup_datastore(config, &bus)?; Ok(store.repositories()) } +/// Command helper to get an eventstore reader for reading events from the event store +pub fn get_eventstore_reader(config: &AppConfig) -> Result { + // We are probably in a socket command so get the shared eventstore reader + if let Some(es) = get_shared_eventstore() { + return Ok(es); + } + + // We are probably in a standalone command so get a new reader + let system = EventSystem::persisted(config.log_file(), config.db_file()); + let es = system.eventstore_reader()?; + Ok(es) +} + pub fn close_all_connections() { SledDb::close_all_connections(); } diff --git a/crates/entrypoint/src/helpers/mod.rs b/crates/entrypoint/src/helpers/mod.rs index ea837e2a25..30d12518ce 100644 --- a/crates/entrypoint/src/helpers/mod.rs +++ b/crates/entrypoint/src/helpers/mod.rs @@ -6,6 +6,4 @@ pub mod datastore; pub mod rand; -pub mod shutdown; pub mod termtable; -pub use shutdown::*; diff --git a/crates/entrypoint/src/helpers/shutdown.rs b/crates/entrypoint/src/helpers/shutdown.rs deleted file mode 100644 index b81f126092..0000000000 --- a/crates/entrypoint/src/helpers/shutdown.rs +++ /dev/null @@ -1,26 +0,0 @@ -// SPDX-License-Identifier: LGPL-3.0-only -// -// This file is provided WITHOUT ANY WARRANTY; -// without even the implied warranty of MERCHANTABILITY -// or FITNESS FOR A PARTICULAR PURPOSE. - -use e3_ciphernode_builder::CiphernodeHandle; -use e3_events::{prelude::*, Shutdown}; -use std::time::Duration; -use tokio::signal::unix::{signal, SignalKind}; -use tracing::{error, info}; - -pub async fn listen_for_shutdown(node: CiphernodeHandle) { - let bus = node.bus; - let mut sigterm = - signal(SignalKind::terminate()).expect("Failed to create SIGTERM signal stream"); - sigterm.recv().await; - info!("SIGTERM received, initiating graceful shutdown..."); - - if let Err(e) = bus.publish_without_context(Shutdown) { - error!("Shutdown failed to publish! {e}"); - } - - tokio::time::sleep(Duration::from_secs(2)).await; - info!("Graceful shutdown complete"); -} diff --git a/crates/entrypoint/src/start/aggregator_start.rs b/crates/entrypoint/src/start/aggregator_start.rs index b628a5d810..32b428aa85 100644 --- a/crates/entrypoint/src/start/aggregator_start.rs +++ b/crates/entrypoint/src/start/aggregator_start.rs @@ -25,7 +25,6 @@ pub async fn execute( let rng = Arc::new(Mutex::new(ChaCha20Rng::from_rng(OsRng)?)); let cipher = Arc::new(Cipher::from_file(config.key_file()).await?); let backend = ZkBackend::new(config.bb_binary(), config.circuits_dir(), config.work_dir()); - backend.ensure_installed().await?; let node = CiphernodeBuilder::new(rng.clone(), cipher.clone()) .with_persistence(&config.log_file(), &config.db_file()) @@ -40,6 +39,8 @@ pub async fn execute( .with_pubkey_aggregation() .with_threshold_plaintext_aggregation() .with_net(config.peers(), config.quic_port()) + .with_shared_store() + .with_shared_eventstore() .build() .await?; diff --git a/crates/entrypoint/src/start/start.rs b/crates/entrypoint/src/start/start.rs index 497022f90d..f1db8bd827 100644 --- a/crates/entrypoint/src/start/start.rs +++ b/crates/entrypoint/src/start/start.rs @@ -19,7 +19,6 @@ pub async fn execute(config: &AppConfig) -> Result { let rng = Arc::new(Mutex::new(rand_chacha::ChaCha20Rng::from_rng(OsRng)?)); let cipher = Arc::new(Cipher::from_file(&config.key_file()).await?); let backend = ZkBackend::new(config.bb_binary(), config.circuits_dir(), config.work_dir()); - backend.ensure_installed().await?; let node = CiphernodeBuilder::new(rng.clone(), cipher.clone()) .with_persistence(&config.log_file(), &config.db_file()) @@ -33,6 +32,8 @@ pub async fn execute(config: &AppConfig) -> Result { .with_trbfv() .with_zkproof(backend) .with_net(config.peers(), config.quic_port()) + .with_shared_store() + .with_shared_eventstore() .build() .await?; diff --git a/crates/net/src/net_interface.rs b/crates/net/src/net_interface.rs index 84185705d0..6f69cfc711 100644 --- a/crates/net/src/net_interface.rs +++ b/crates/net/src/net_interface.rs @@ -210,7 +210,7 @@ impl Libp2pNetInterface { // Process commands Some(command) = cmd_rx.recv() => { if let NetCommand::Shutdown = command { - if let Err(e) = handle_shutdown(&mut self.swarm) { + if let Err(e) = handle_shutdown(&mut self.swarm).await { error!("Error processing NetCommand: {e}"); } break; @@ -907,16 +907,20 @@ fn handle_get_record( Ok(()) } -fn handle_shutdown(swarm: &mut Swarm) -> Result<()> { +async fn handle_shutdown(swarm: &mut Swarm) -> Result<()> { info!("Starting graceful shutdown"); - - // Disconnect all peers let peers: Vec<_> = swarm.connected_peers().copied().collect(); for peer in peers { - info!("Disconnecting from peer: {}", peer); let _ = swarm.disconnect_peer_id(peer); } - + // Drive the swarm briefly to flush QUIC CONNECTION_CLOSE frames + let drain_deadline = Instant::now() + Duration::from_secs(2); + while Instant::now() < drain_deadline { + match tokio::time::timeout(Duration::from_millis(100), swarm.select_next_some()).await { + Ok(_event) => continue, + Err(_timeout) => break, // No more events, frames flushed + } + } info!("Graceful shutdown complete"); Ok(()) } diff --git a/crates/net/src/net_sync_manager.rs b/crates/net/src/net_sync_manager.rs index 5a0f7c35e9..e18a28bf50 100644 --- a/crates/net/src/net_sync_manager.rs +++ b/crates/net/src/net_sync_manager.rs @@ -30,7 +30,7 @@ use crate::{ const NET_READY_CONNECT_TIMEOUT: Duration = Duration::from_secs(60); /// Maximum time to wait for the `AllPeersDialed` event before giving up. -const ALL_PEERS_DIALED_TIMEOUT: Duration = Duration::from_secs(30); +const ALL_PEERS_DIALED_TIMEOUT: Duration = Duration::from_secs(120); /// Maximum time to wait for a peer to reconnect after sync fetch fails. /// On restart, peers may briefly connect then disconnect (the remote side still @@ -347,20 +347,36 @@ async fn handle_sync_request_event( let (event, ctx) = event.into_components(); info!("Checking for AllPeersDialed..."); if wait_for_event { - await_event( + info!("Waiting for peer connection..."); + let has_peers = await_event( &net_events, - |e| { - if matches!(e, &NetEvent::AllPeersDialed { .. }) { - info!("AllPeersDialed matched!"); - Some(e.clone()) - } else { - None + |e| match e { + NetEvent::ConnectionEstablished { .. } => { + info!("Peer connection established"); + Some(true) } + NetEvent::AllPeersDialed { total: 0, .. } => { + info!("No peers configured, proceeding without sync"); + Some(false) + } + _ => None, }, - ALL_PEERS_DIALED_TIMEOUT, + NET_READY_CONNECT_TIMEOUT, ) .await - .ok(); // Timeout is non-fatal — proceed regardless + .context("No peer connections established within timeout")?; + + if !has_peers { + let value = SyncRequestSucceeded { + response: SyncResponseValue { + events: vec![], + ts: 0, + }, + }; + + address.into().try_send(TypedEvent::new(value, ctx))?; + return Ok(()); + } } info!("handle_sync_request_event: ready to sync"); @@ -373,7 +389,10 @@ async fn handle_sync_request_event( "Requesting batched events for aggregate_id={} since={}", aggregate_id, since ); - let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()).build(); + let requester = DirectRequester::builder(net_cmds.clone(), net_events.clone()) + .max_retries(10) + .retry_timeout(Duration::from_secs(5)) + .build(); match fetch_all_batched_events::>( requester, PeerTarget::Random, diff --git a/crates/socket-server/Cargo.toml b/crates/socket-server/Cargo.toml new file mode 100644 index 0000000000..c21bbc1420 --- /dev/null +++ b/crates/socket-server/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "e3-socket-server" +version.workspace = true +edition.workspace = true +license.workspace = true +description.workspace = true +repository.workspace = true + +[dependencies] +anyhow.workspace = true +e3-console.workspace = true +e3-config.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true diff --git a/crates/socket-server/src/lib.rs b/crates/socket-server/src/lib.rs new file mode 100644 index 0000000000..c3ff33ae64 --- /dev/null +++ b/crates/socket-server/src/lib.rs @@ -0,0 +1,78 @@ +// SPDX-License-Identifier: LGPL-2.0-only +// +// This file is provided WITHOUT ANY WARRANTY; +// without even the implied warranty of MERCHANTABILITY +// or FITNESS FOR A PARTICULAR PURPOSE. + +use anyhow::Result; +use e3_config::AppConfig; +use e3_console::{log, Console}; +use serde::Serialize; +use std::future::Future; +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::TcpStream; +use tracing::error; + +const TCP_ADDRESS: &str = "127.0.0.1"; // using localhost specifically so that it is not mounted + // externally. We might change this if we need to control + // externally and add authentication or TLS + +pub async fn connect_socket(maybe_config: Option<&AppConfig>) -> Option { + let config = maybe_config?; + let addr = format!("{}:{}", TCP_ADDRESS, config.ctrl_port()); + TcpStream::connect(addr).await.ok() +} + +pub async fn run_on_socket( + out: Console, + stream: TcpStream, + cli: T, +) -> anyhow::Result<()> { + let (reader, mut writer) = stream.into_split(); + let payload = serde_json::to_string(&cli)?; + writer.write_all(payload.as_bytes()).await?; + writer.write_all(b"\n").await?; + writer.shutdown().await?; + + let mut lines = BufReader::new(reader).lines(); + while let Some(line) = lines.next_line().await? { + log!(out, "{}", line); + } + + Ok(()) +} + +pub async fn start_socket_server(tcp_port: u16, handler: F) +where + F: Fn(TcpStream) -> Fut + Send + Sync + 'static, + Fut: Future> + 'static, +{ + let addr = format!("{}:{}", TCP_ADDRESS, tcp_port); + let listener = match tokio::net::TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + error!("Failed to bind socket: {e}"); + return; + } + }; + + let handler = Arc::new(handler); + loop { + match listener.accept().await { + Ok((stream, _)) => { + let handler = Arc::clone(&handler); + + tokio::task::spawn_local(async move { + if let Err(e) = handler(stream).await { + error!("Connection error: {e}"); + } + }); + } + Err(e) => { + error!("Accept error: {e}"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + } +} diff --git a/docs/pages/ciphernode-operators/running.mdx b/docs/pages/ciphernode-operators/running.mdx index 267c551d6f..157ce2720a 100644 --- a/docs/pages/ciphernode-operators/running.mdx +++ b/docs/pages/ciphernode-operators/running.mdx @@ -289,6 +289,9 @@ Open the following ports: | ------ | -------- | -------------------------- | | `9091` | UDP | QUIC/libp2p P2P networking | +> **Important:** Port `50505` (TCP) is used for local CLI commands and binds to `localhost` only. +> **Do not expose this port externally** - it provides control plane access to your node. + ### Bootstrap Peers Connect to the Interfold bootstrap network: diff --git a/examples/CRISP/enclave.config.yaml b/examples/CRISP/enclave.config.yaml index 6a37ccad3a..bbb720bc06 100644 --- a/examples/CRISP/enclave.config.yaml +++ b/examples/CRISP/enclave.config.yaml @@ -34,33 +34,38 @@ nodes: cn1: address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" quic_port: 9201 + ctrl_port: 50501 autonetkey: true autopassword: true cn2: address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" quic_port: 9202 + ctrl_port: 50502 autonetkey: true autopassword: true cn3: address: "0x90F79bf6EB2c4f870365E785982E1f101E93b906" quic_port: 9203 + ctrl_port: 50503 autonetkey: true autopassword: true cn4: address: "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65" quic_port: 9204 + ctrl_port: 50504 autonetkey: true autopassword: true cn5: address: "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc" quic_port: 9205 + ctrl_port: 50505 autonetkey: true autopassword: true ag: address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" quic_port: 9206 + ctrl_port: 50506 autonetkey: true autopassword: true role: type: "aggregator" - diff --git a/examples/CRISP/server/Dockerfile b/examples/CRISP/server/Dockerfile index 8c6ad43ff6..888feee3f5 100644 --- a/examples/CRISP/server/Dockerfile +++ b/examples/CRISP/server/Dockerfile @@ -64,6 +64,7 @@ COPY crates/ciphernode-builder/Cargo.toml crates/ciphernode-builder/Cargo.toml COPY crates/cli/Cargo.toml crates/cli/Cargo.toml COPY crates/compute-provider/Cargo.toml crates/compute-provider/Cargo.toml COPY crates/config/Cargo.toml crates/config/Cargo.toml +COPY crates/console/Cargo.toml crates/console/Cargo.toml COPY crates/crypto/Cargo.toml crates/crypto/Cargo.toml COPY crates/data/Cargo.toml crates/data/Cargo.toml COPY crates/entrypoint/Cargo.toml crates/entrypoint/Cargo.toml @@ -84,6 +85,7 @@ COPY crates/program-server/Cargo.toml crates/program-server/Cargo.toml COPY crates/request/Cargo.toml crates/request/Cargo.toml COPY crates/sdk/Cargo.toml crates/sdk/Cargo.toml COPY crates/sortition/Cargo.toml crates/sortition/Cargo.toml +COPY crates/socket-server/Cargo.toml crates/socket-server/Cargo.toml COPY crates/support-scripts/Cargo.toml crates/support-scripts/Cargo.toml COPY crates/sync/Cargo.toml crates/sync/Cargo.toml COPY crates/test-helpers/Cargo.toml crates/test-helpers/Cargo.toml diff --git a/templates/default/enclave.config.yaml b/templates/default/enclave.config.yaml index 27a411623c..682b02a85f 100644 --- a/templates/default/enclave.config.yaml +++ b/templates/default/enclave.config.yaml @@ -26,33 +26,38 @@ nodes: cn1: address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" quic_port: 9201 + ctrl_port: 50501 autonetkey: true autopassword: true cn2: address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" quic_port: 9202 + ctrl_port: 50502 autonetkey: true autopassword: true cn3: address: "0x90F79bf6EB2c4f870365E785982E1f101E93b906" quic_port: 9203 + ctrl_port: 50503 autonetkey: true autopassword: true cn4: address: "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65" quic_port: 9204 + ctrl_port: 50504 autonetkey: true autopassword: true cn5: address: "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc" quic_port: 9205 + ctrl_port: 50505 autonetkey: true autopassword: true ag: address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" quic_port: 9206 + ctrl_port: 50506 autonetkey: true autopassword: true role: type: "aggregator" - diff --git a/tests/integration/enclave.config.yaml b/tests/integration/enclave.config.yaml index 6e50b6d7c4..3dc7463498 100644 --- a/tests/integration/enclave.config.yaml +++ b/tests/integration/enclave.config.yaml @@ -36,31 +36,37 @@ nodes: cn1: address: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" quic_port: 9201 + ctrl_port: 50501 autonetkey: true autopassword: true cn2: address: "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" quic_port: 9202 + ctrl_port: 50502 autonetkey: true autopassword: true cn3: address: "0x90F79bf6EB2c4f870365E785982E1f101E93b906" quic_port: 9203 + ctrl_port: 50503 autonetkey: true autopassword: true cn4: address: "0x15d34AAf54267DB7D7c367839AAf71A00a2C6A65" quic_port: 9204 + ctrl_port: 50504 autonetkey: true autopassword: true cn5: address: "0x9965507D1a55bcC2695C58ba16FB37d819B0A4dc" quic_port: 9205 + ctrl_port: 50505 autonetkey: true autopassword: true ag: address: "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" quic_port: 9206 + ctrl_port: 50506 autonetkey: true autopassword: true role: diff --git a/tests/integration/persist.sh b/tests/integration/persist.sh index f75326601d..20fce089cd 100755 --- a/tests/integration/persist.sh +++ b/tests/integration/persist.sh @@ -76,12 +76,12 @@ waiton "$SCRIPT_DIR/output/pubkey.bin" # kill aggregator enclave_nodes_stop ag -sleep 2 +sleep 8 # relaunch the aggregator enclave_nodes_start ag -sleep 4 +sleep 8 heading "Mock encrypted plaintext" $SCRIPT_DIR/lib/fake_encrypt.sh --input "$SCRIPT_DIR/output/pubkey.bin" --output "$SCRIPT_DIR/output/output.bin" --plaintext $PLAINTEXT --params "$ENCODED_PARAMS"