diff --git a/src/commands/monitor.rs b/src/commands/monitor.rs index 47f63fc7..2d648191 100644 --- a/src/commands/monitor.rs +++ b/src/commands/monitor.rs @@ -151,7 +151,7 @@ async fn monitor_contract( let mut printed_any = false; while running.load(Ordering::SeqCst) { - match stream.next_batch() { + match stream.next_batch().await { Ok(batch) => { for event in batch { let as_text = event.value.to_string(); @@ -178,7 +178,7 @@ async fn monitor_contract( } break; } - stream.sleep(); + stream.sleep().await; } Err(err) => { if !follow && !printed_any { @@ -188,7 +188,7 @@ async fn monitor_contract( "Event stream error: {}. Reconnecting with backoff…", err )); - stream.sleep_backoff(); + stream.sleep_backoff().await; } } } @@ -281,7 +281,7 @@ async fn monitor_wallet( } } - std::thread::sleep(std::time::Duration::from_secs(interval.max(1))); + tokio::time::sleep(std::time::Duration::from_secs(interval.max(1))).await; } Ok(()) diff --git a/src/commands/network.rs b/src/commands/network.rs index a56af9b2..30e426a9 100644 --- a/src/commands/network.rs +++ b/src/commands/network.rs @@ -1,6 +1,7 @@ use crate::utils::{config, print as p}; use anyhow::Result; use clap::Subcommand; +use std::time::Duration; #[derive(Subcommand)] pub enum NetworkCommands { @@ -188,10 +189,20 @@ async fn test_network(network_name: Option) -> Result<()> { p::info(&format!("Testing connectivity to '{}'…", test_network)); p::info(&format!("Horizon: {}", net_cfg.horizon_url)); + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .pool_max_idle_per_host(10) + .build()?; + // Test Horizon endpoint - match ureq::get(&format!("{}/health", net_cfg.horizon_url)).call() { - Ok(_) => { - p::success("✓ Horizon endpoint is reachable"); + let health_url = format!("{}/health", net_cfg.horizon_url.trim_end_matches('/')); + match client.get(&health_url).send().await { + Ok(response) => { + if response.status().is_success() { + p::success("✓ Horizon endpoint is reachable"); + } else { + p::warn(&format!("✗ Horizon endpoint failed: HTTP {}", response.status())); + } } Err(e) => { p::warn(&format!("✗ Horizon endpoint failed: {}", e)); @@ -208,12 +219,13 @@ async fn test_network(network_name: Option) -> Result<()> { "params": [] }); - match ureq::post(soroban_url) - .set("Content-Type", "application/json") - .send_json(&req) - { - Ok(_) => { - p::success("✓ Soroban RPC endpoint is reachable"); + match client.post(soroban_url).json(&req).send().await { + Ok(response) => { + if response.status().is_success() { + p::success("✓ Soroban RPC endpoint is reachable"); + } else { + p::warn(&format!("✗ Soroban RPC endpoint failed: HTTP {}", response.status())); + } } Err(e) => { p::warn(&format!("✗ Soroban RPC endpoint failed: {}", e)); diff --git a/src/commands/security.rs b/src/commands/security.rs index 369b1fbd..7fa9ef9e 100644 --- a/src/commands/security.rs +++ b/src/commands/security.rs @@ -165,7 +165,7 @@ pub async fn handle(cmd: SecurityCommands) -> Result<()> { SecurityCommands::Checklist(args) => handle_checklist(args), SecurityCommands::Validate(args) => handle_validate(args), SecurityCommands::Report(args) => handle_report(args), - SecurityCommands::Monitor(args) => handle_monitor(args), + SecurityCommands::Monitor(args) => handle_monitor(args).await, SecurityCommands::Incident(args) => handle_incident(args), SecurityCommands::Audit(args) => handle_audit(args), SecurityCommands::Pentest(args) => handle_pentest(args), @@ -268,7 +268,7 @@ fn handle_report(args: ReportArgs) -> Result<()> { Ok(()) } -fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> { +async fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> { config::validate_contract_id(&args.contract)?; config::validate_network(&args.network)?; @@ -295,7 +295,7 @@ fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> { fs::create_dir_all(&report_dir)?; while running.load(Ordering::SeqCst) { - match stream.next_batch() { + match stream.next_batch().await { Ok(batch) => { for event in batch { let security_events = evaluate_event( @@ -336,11 +336,11 @@ fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> { if !args.follow { break; } - stream.sleep(); + stream.sleep().await; } Err(err) => { notifications::warn(&format!("Stream error: {}. Retrying…", err)); - stream.sleep_backoff(); + stream.sleep_backoff().await; } } } diff --git a/src/utils/horizon.rs b/src/utils/horizon.rs index 8c4df845..a7538228 100644 --- a/src/utils/horizon.rs +++ b/src/utils/horizon.rs @@ -5,11 +5,17 @@ use reqwest::Client; use serde::Deserialize; use std::time::Duration; -static HTTP_CLIENT: Lazy = Lazy::new(|| { +fn build_http_client(timeout: Duration) -> Result { Client::builder() - .timeout(Duration::from_secs(30)) + .timeout(timeout) + .pool_max_idle_per_host(10) .build() - .expect("Failed to create HTTP client") + .context("Failed to create Horizon HTTP client") +} + +static HTTP_CLIENT: Lazy = Lazy::new(|| { + build_http_client(Duration::from_secs(30)) + .expect("Failed to create shared Horizon HTTP client") }); pub fn network_config(network: &str) -> Result { diff --git a/src/utils/soroban.rs b/src/utils/soroban.rs index c9cb1a63..7b1c7878 100644 --- a/src/utils/soroban.rs +++ b/src/utils/soroban.rs @@ -1,13 +1,28 @@ use crate::utils::config::{self, WalletEntry}; use anyhow::{Context, Result}; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; +use once_cell::sync::Lazy; +use reqwest::Client; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use stellar_strkey::{ed25519, Contract}; +use std::time::Duration; use stellar_xdr::curr::{ AccountId, ContractDataDurability, ContractExecutable, Hash, LedgerEntryData, LedgerKey, LedgerKeyContractData, PublicKey, ScAddress, ScMap, ScString, ScSymbol, ScVal, Uint256, }; +fn build_http_client(timeout: Duration) -> Result { + Client::builder() + .timeout(timeout) + .pool_max_idle_per_host(10) + .build() + .context("Failed to create Soroban HTTP client") +} + +static HTTP_CLIENT: Lazy = Lazy::new(|| { + build_http_client(Duration::from_secs(30)).expect("Failed to create shared Soroban HTTP client") +}); + #[derive(Debug, Serialize, Deserialize)] pub struct SimulationResult { pub return_value: String, @@ -313,15 +328,7 @@ pub async fn check_soroban_rpc_url(url: &str) -> bool { params: serde_json::json!({}), }; - let client = match reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(10)) - .build() - { - Ok(c) => c, - Err(_) => return false, - }; - - match client.post(url).json(&request).send().await { + match HTTP_CLIENT.post(url).json(&request).send().await { Ok(response) => { if response.status() != 200 { return false; @@ -342,12 +349,7 @@ async fn rpc_request_with_url(rpc_url: &str, request: SorobanRpcRequest) -> R where T: DeserializeOwned, { - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(30)) - .build() - .with_context(|| format!("Failed to create HTTP client for {}", rpc_url))?; - - let response: SorobanRpcResponse = client + let response: SorobanRpcResponse = HTTP_CLIENT .post(rpc_url) .json(&request) .send() diff --git a/src/utils/stream.rs b/src/utils/stream.rs index 6277fac2..ed33bf10 100644 --- a/src/utils/stream.rs +++ b/src/utils/stream.rs @@ -1,7 +1,8 @@ use anyhow::{Context, Result}; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; +use once_cell::sync::Lazy; +use reqwest::Client; use serde::Deserialize; -use std::thread; use std::time::Duration; use stellar_xdr::curr::{Limited, Limits, ScSymbol, ScVal, WriteXdr}; @@ -13,6 +14,14 @@ pub struct EventStreamFilters { pub value_match: Option, } +static HTTP_CLIENT: Lazy = Lazy::new(|| { + Client::builder() + .timeout(Duration::from_secs(10)) + .pool_max_idle_per_host(10) + .build() + .expect("Failed to create Soroban event stream client") +}); + #[derive(Debug, Clone)] pub struct SorobanEventStream { rpc_url: String, @@ -90,7 +99,7 @@ impl SorobanEventStream { self } - pub fn next_batch(&mut self) -> Result> { + pub async fn next_batch(&mut self) -> Result> { let filter = self.build_rpc_filter(); let request = serde_json::json!({ "jsonrpc": "2.0", @@ -99,17 +108,20 @@ impl SorobanEventStream { "params": { "filters": [filter], "pagination": { - "cursor": self.cursor, + "cursor": self.cursor.clone(), "limit": 10 } } }); - let response: SorobanGetEventsResponse = ureq::post(&self.rpc_url) - .set("Content-Type", "application/json") - .send_json(&request) + let response: SorobanGetEventsResponse = HTTP_CLIENT + .post(&self.rpc_url) + .json(&request) + .send() + .await .with_context(|| format!("Soroban RPC request to {} failed", self.rpc_url))? - .into_json() + .json::() + .await .with_context(|| "Failed to decode Soroban getEvents response")?; if let Some(error) = response.error { @@ -138,12 +150,12 @@ impl SorobanEventStream { Ok(events) } - pub fn sleep(&self) { - thread::sleep(self.poll_interval); + pub async fn sleep(&self) { + tokio::time::sleep(self.poll_interval).await; } - pub fn sleep_backoff(&mut self) { - thread::sleep(self.backoff.next_delay()); + pub async fn sleep_backoff(&mut self) { + tokio::time::sleep(self.backoff.next_delay()).await; } fn build_rpc_filter(&self) -> serde_json::Value {