Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/commands/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ async fn monitor_contract(
let mut printed_any = false;

while running.load(Ordering::SeqCst) {
match stream.next_batch() {
match stream.next_batch().await {
Ok(batch) => {
for event in batch {
let as_text = event.value.to_string();
Expand All @@ -178,7 +178,7 @@ async fn monitor_contract(
}
break;
}
stream.sleep();
stream.sleep().await;
}
Err(err) => {
if !follow && !printed_any {
Expand All @@ -188,7 +188,7 @@ async fn monitor_contract(
"Event stream error: {}. Reconnecting with backoff…",
err
));
stream.sleep_backoff();
stream.sleep_backoff().await;
}
}
}
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn monitor_wallet(
}
}

std::thread::sleep(std::time::Duration::from_secs(interval.max(1)));
tokio::time::sleep(std::time::Duration::from_secs(interval.max(1))).await;
}

Ok(())
Expand Down
30 changes: 21 additions & 9 deletions src/commands/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::utils::{config, print as p};
use anyhow::Result;
use clap::Subcommand;
use std::time::Duration;

#[derive(Subcommand)]
pub enum NetworkCommands {
Expand Down Expand Up @@ -188,10 +189,20 @@ async fn test_network(network_name: Option<String>) -> Result<()> {
p::info(&format!("Testing connectivity to '{}'…", test_network));
p::info(&format!("Horizon: {}", net_cfg.horizon_url));

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.pool_max_idle_per_host(10)
.build()?;

// Test Horizon endpoint
match ureq::get(&format!("{}/health", net_cfg.horizon_url)).call() {
Ok(_) => {
p::success("✓ Horizon endpoint is reachable");
let health_url = format!("{}/health", net_cfg.horizon_url.trim_end_matches('/'));
match client.get(&health_url).send().await {
Ok(response) => {
if response.status().is_success() {
p::success("✓ Horizon endpoint is reachable");
} else {
p::warn(&format!("✗ Horizon endpoint failed: HTTP {}", response.status()));
}
}
Err(e) => {
p::warn(&format!("✗ Horizon endpoint failed: {}", e));
Expand All @@ -208,12 +219,13 @@ async fn test_network(network_name: Option<String>) -> Result<()> {
"params": []
});

match ureq::post(soroban_url)
.set("Content-Type", "application/json")
.send_json(&req)
{
Ok(_) => {
p::success("✓ Soroban RPC endpoint is reachable");
match client.post(soroban_url).json(&req).send().await {
Ok(response) => {
if response.status().is_success() {
p::success("✓ Soroban RPC endpoint is reachable");
} else {
p::warn(&format!("✗ Soroban RPC endpoint failed: HTTP {}", response.status()));
}
}
Err(e) => {
p::warn(&format!("✗ Soroban RPC endpoint failed: {}", e));
Expand Down
10 changes: 5 additions & 5 deletions src/commands/security.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub async fn handle(cmd: SecurityCommands) -> Result<()> {
SecurityCommands::Checklist(args) => handle_checklist(args),
SecurityCommands::Validate(args) => handle_validate(args),
SecurityCommands::Report(args) => handle_report(args),
SecurityCommands::Monitor(args) => handle_monitor(args),
SecurityCommands::Monitor(args) => handle_monitor(args).await,
SecurityCommands::Incident(args) => handle_incident(args),
SecurityCommands::Audit(args) => handle_audit(args),
SecurityCommands::Pentest(args) => handle_pentest(args),
Expand Down Expand Up @@ -268,7 +268,7 @@ fn handle_report(args: ReportArgs) -> Result<()> {
Ok(())
}

fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> {
async fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> {
config::validate_contract_id(&args.contract)?;
config::validate_network(&args.network)?;

Expand All @@ -295,7 +295,7 @@ fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> {
fs::create_dir_all(&report_dir)?;

while running.load(Ordering::SeqCst) {
match stream.next_batch() {
match stream.next_batch().await {
Ok(batch) => {
for event in batch {
let security_events = evaluate_event(
Expand Down Expand Up @@ -336,11 +336,11 @@ fn handle_monitor(args: SecurityMonitorArgs) -> Result<()> {
if !args.follow {
break;
}
stream.sleep();
stream.sleep().await;
}
Err(err) => {
notifications::warn(&format!("Stream error: {}. Retrying…", err));
stream.sleep_backoff();
stream.sleep_backoff().await;
}
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/utils/horizon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;

static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
fn build_http_client(timeout: Duration) -> Result<Client> {
Client::builder()
.timeout(Duration::from_secs(30))
.timeout(timeout)
.pool_max_idle_per_host(10)
.build()
.expect("Failed to create HTTP client")
.context("Failed to create Horizon HTTP client")
}

static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
build_http_client(Duration::from_secs(30))
.expect("Failed to create shared Horizon HTTP client")
});

pub fn network_config(network: &str) -> Result<config::NetworkConfig> {
Expand Down
32 changes: 17 additions & 15 deletions src/utils/soroban.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
use crate::utils::config::{self, WalletEntry};
use anyhow::{Context, Result};
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use stellar_strkey::{ed25519, Contract};
use std::time::Duration;
use stellar_xdr::curr::{
AccountId, ContractDataDurability, ContractExecutable, Hash, LedgerEntryData, LedgerKey,
LedgerKeyContractData, PublicKey, ScAddress, ScMap, ScString, ScSymbol, ScVal, Uint256,
};

fn build_http_client(timeout: Duration) -> Result<Client> {
Client::builder()
.timeout(timeout)
.pool_max_idle_per_host(10)
.build()
.context("Failed to create Soroban HTTP client")
}

static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
build_http_client(Duration::from_secs(30)).expect("Failed to create shared Soroban HTTP client")
});

#[derive(Debug, Serialize, Deserialize)]
pub struct SimulationResult {
pub return_value: String,
Expand Down Expand Up @@ -313,15 +328,7 @@ pub async fn check_soroban_rpc_url(url: &str) -> bool {
params: serde_json::json!({}),
};

let client = match reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
{
Ok(c) => c,
Err(_) => return false,
};

match client.post(url).json(&request).send().await {
match HTTP_CLIENT.post(url).json(&request).send().await {
Ok(response) => {
if response.status() != 200 {
return false;
Expand All @@ -342,12 +349,7 @@ async fn rpc_request_with_url<T>(rpc_url: &str, request: SorobanRpcRequest) -> R
where
T: DeserializeOwned,
{
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.with_context(|| format!("Failed to create HTTP client for {}", rpc_url))?;

let response: SorobanRpcResponse<T> = client
let response: SorobanRpcResponse<T> = HTTP_CLIENT
.post(rpc_url)
.json(&request)
.send()
Expand Down
34 changes: 23 additions & 11 deletions src/utils/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{Context, Result};
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use once_cell::sync::Lazy;
use reqwest::Client;
use serde::Deserialize;
use std::thread;
use std::time::Duration;
use stellar_xdr::curr::{Limited, Limits, ScSymbol, ScVal, WriteXdr};

Expand All @@ -13,6 +14,14 @@ pub struct EventStreamFilters {
pub value_match: Option<String>,
}

static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.timeout(Duration::from_secs(10))
.pool_max_idle_per_host(10)
.build()
.expect("Failed to create Soroban event stream client")
});

#[derive(Debug, Clone)]
pub struct SorobanEventStream {
rpc_url: String,
Expand Down Expand Up @@ -90,7 +99,7 @@ impl SorobanEventStream {
self
}

pub fn next_batch(&mut self) -> Result<Vec<SorobanEvent>> {
pub async fn next_batch(&mut self) -> Result<Vec<SorobanEvent>> {
let filter = self.build_rpc_filter();
let request = serde_json::json!({
"jsonrpc": "2.0",
Expand All @@ -99,17 +108,20 @@ impl SorobanEventStream {
"params": {
"filters": [filter],
"pagination": {
"cursor": self.cursor,
"cursor": self.cursor.clone(),
"limit": 10
}
}
});

let response: SorobanGetEventsResponse = ureq::post(&self.rpc_url)
.set("Content-Type", "application/json")
.send_json(&request)
let response: SorobanGetEventsResponse = HTTP_CLIENT
.post(&self.rpc_url)
.json(&request)
.send()
.await
.with_context(|| format!("Soroban RPC request to {} failed", self.rpc_url))?
.into_json()
.json::<SorobanGetEventsResponse>()
.await
.with_context(|| "Failed to decode Soroban getEvents response")?;

if let Some(error) = response.error {
Expand Down Expand Up @@ -138,12 +150,12 @@ impl SorobanEventStream {
Ok(events)
}

pub fn sleep(&self) {
thread::sleep(self.poll_interval);
pub async fn sleep(&self) {
tokio::time::sleep(self.poll_interval).await;
}

pub fn sleep_backoff(&mut self) {
thread::sleep(self.backoff.next_delay());
pub async fn sleep_backoff(&mut self) {
tokio::time::sleep(self.backoff.next_delay()).await;
}

fn build_rpc_filter(&self) -> serde_json::Value {
Expand Down