diff --git a/dash-spv/src/bridge/mod.rs b/dash-spv/src/bridge/mod.rs index bbd15c084..19de03801 100644 --- a/dash-spv/src/bridge/mod.rs +++ b/dash-spv/src/bridge/mod.rs @@ -13,7 +13,7 @@ use dashcore::sml::llmq_entry_verification::LLMQEntryVerificationStatus; use dashcore::Network; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet_manager::wallet_manager::WalletManager; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use crate::client::{ClientConfig, DashSpvClient}; use crate::error::SpvError; @@ -486,6 +486,8 @@ type ConcreteClient = #[derive(uniffi::Object)] pub struct SpvClient { inner: ConcreteClient, + /// Handle to the background event-forwarding task, if a subscription is active. + subscription_handle: Mutex<Option<tokio::task::JoinHandle<()>>>, } #[uniffi::export] @@ -509,6 +511,7 @@ impl SpvClient { Ok(Arc::new(Self { inner, + subscription_handle: Mutex::new(None), })) } @@ -963,6 +966,225 @@ impl SpvClient { } } +// ============ Event conversion helpers ============ + +/// Convert an internal [`crate::sync::SyncEvent`] to the bridge [`SyncEvent`]. +/// +/// Returns `None` for internal events that have no bridge equivalent +/// (e.g. `MempoolActivated`).
+fn convert_sync_event(event: crate::sync::SyncEvent) -> Option<SyncEvent> { + use crate::sync::SyncEvent as I; + match event { + I::SyncStart { + identifier, + } => Some(SyncEvent::SyncStart { + identifier: identifier.to_string(), + }), + I::BlockHeadersStored { + tip_height, + } => Some(SyncEvent::BlockHeadersStored { + tip_height, + }), + I::BlockHeaderSyncComplete { + tip_height, + } => Some(SyncEvent::BlockHeaderSyncComplete { + tip_height, + }), + I::FilterHeadersStored { + start_height, + end_height, + tip_height, + } => Some(SyncEvent::FilterHeadersStored { + start_height, + end_height, + tip_height, + }), + I::FilterHeadersSyncComplete { + tip_height, + } => Some(SyncEvent::FilterHeadersSyncComplete { + tip_height, + }), + I::FiltersStored { + start_height, + end_height, + } => Some(SyncEvent::FiltersStored { + start_height, + end_height, + }), + I::FiltersSyncComplete { + tip_height, + } => Some(SyncEvent::FiltersSyncComplete { + tip_height, + }), + I::BlocksNeeded { + blocks, + } => Some(SyncEvent::BlocksNeeded { + block_count: blocks.len() as u32, + }), + I::BlockProcessed { + block_hash, + height, + new_addresses, + .. + } => Some(SyncEvent::BlockProcessed { + block_hash: block_hash.to_string(), + height, + new_address_count: new_addresses.len() as u32, + }), + I::MasternodeStateUpdated { + height, + } => Some(SyncEvent::MasternodeStateUpdated { + height, + }), + I::ManagerError { + manager, + error, + } => Some(SyncEvent::ManagerError { + manager: manager.to_string(), + error, + }), + I::ChainLockReceived { + chain_lock, + validated, + } => Some(SyncEvent::ChainLockReceived { + block_height: chain_lock.block_height, + validated, + }), + I::InstantLockReceived { + instant_lock, + validated, + } => Some(SyncEvent::InstantLockReceived { + txid: instant_lock.txid.to_string(), + validated, + }), + // MempoolActivated has no bridge equivalent — silently drop it. + I::MempoolActivated { + ..
+ } => None, + I::SyncComplete { + header_tip, + cycle, + } => Some(SyncEvent::SyncComplete { + header_tip, + cycle, + }), + } +} + +/// Convert an internal [`crate::network::NetworkEvent`] to the bridge [`NetworkEvent`]. +fn convert_network_event(event: crate::network::NetworkEvent) -> NetworkEvent { + use crate::network::NetworkEvent as I; + match event { + I::PeerConnected { + address, + } => NetworkEvent::PeerConnected { + address: address.to_string(), + }, + I::PeerDisconnected { + address, + } => NetworkEvent::PeerDisconnected { + address: address.to_string(), + }, + I::PeersUpdated { + connected_count, + addresses, + best_height, + } => NetworkEvent::PeersUpdated { + connected_count: connected_count as u64, + addresses: addresses.into_iter().map(|a| a.to_string()).collect(), + best_height, + }, + } +} + +// ============ Subscription methods ============ + +#[uniffi::export] +impl SpvClient { + /// Subscribe to SPV client events via the given listener. + /// + /// Spawns a single background tokio task that reads from the client's + /// internal broadcast channels and forwards sync events, network events, + /// and sync-progress updates to `listener`. + /// + /// Only one subscription is active at a time. Calling `subscribe` again + /// cancels the previous task before starting a new one. + pub async fn subscribe(&self, listener: Arc<dyn SpvClientListener>) { + // Cancel any existing subscription first. + self.unsubscribe().await; + + // Obtain broadcast/watch receivers from the inner client. + let mut sync_rx = self.inner.subscribe_sync_events().await; + let mut net_rx = self.inner.subscribe_network_events().await; + let mut progress_rx = self.inner.subscribe_progress().await; + + let handle = tokio::spawn(async move { + loop { + tokio::select!
{ + result = sync_rx.recv() => { + match result { + Ok(event) => { + if let Some(bridge_event) = convert_sync_event(event) { + listener.on_sync_event(bridge_event); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "SPV event subscriber lagged; some sync events were dropped" + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + result = net_rx.recv() => { + match result { + Ok(event) => { + listener.on_network_event(convert_network_event(event)); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + tracing::warn!( + skipped = n, + "SPV network-event subscriber lagged; some events were dropped" + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + result = progress_rx.changed() => { + if result.is_err() { + // Sender dropped — client is shutting down. + break; + } + let (percentage, current_height, target_height) = { + let progress = progress_rx.borrow_and_update(); + let pct = progress.percentage(); + let (cur, tgt) = progress + .headers() + .map(|h| (h.current_height(), h.target_height())) + .unwrap_or((0, 0)); + (pct, cur, tgt) + }; + listener.on_sync_progress(percentage, current_height, target_height); + } + } + } + }); + + *self.subscription_handle.lock().await = Some(handle); + } + + /// Cancel the active event subscription, if any. + /// + /// Aborts the background task that was spawned by [`subscribe`](Self::subscribe). + /// No-op when no subscription is active. + pub async fn unsubscribe(&self) { + if let Some(handle) = self.subscription_handle.lock().await.take() { + handle.abort(); + } + } +} + // ============ Stub functions ============ /// Returns a greeting string (sanity-check export). @@ -1860,6 +2082,292 @@ mod tests { assert_eq!(result, cloned); } + // ---- subscribe / unsubscribe tests ---- + + /// `subscribe` stores a handle; `unsubscribe` clears it. 
+ #[tokio::test] + async fn test_subscribe_and_unsubscribe() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let config = ClientConfig::regtest() + .without_filters() + .without_masternodes() + .with_storage_path(temp_dir.path()); + + let client = SpvClient::new(config).await.expect("SpvClient construction must succeed"); + + // No subscription handle before subscribing. + assert!( + client.subscription_handle.lock().await.is_none(), + "no subscription handle before subscribe()" + ); + + let listener = Arc::new(MockListener::new()); + client.subscribe(listener.clone()).await; + + // A handle should be present after subscribing. + assert!( + client.subscription_handle.lock().await.is_some(), + "subscription handle should exist after subscribe()" + ); + + client.unsubscribe().await; + + // Handle should be cleared after unsubscribe. + assert!( + client.subscription_handle.lock().await.is_none(), + "subscription handle should be gone after unsubscribe()" + ); + } + + /// Calling `subscribe` twice replaces the first subscription. + #[tokio::test] + async fn test_subscribe_replaces_previous_subscription() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let config = ClientConfig::regtest() + .without_filters() + .without_masternodes() + .with_storage_path(temp_dir.path()); + + let client = SpvClient::new(config).await.expect("SpvClient construction must succeed"); + + let listener1 = Arc::new(MockListener::new()); + client.subscribe(listener1).await; + + let listener2 = Arc::new(MockListener::new()); + client.subscribe(listener2).await; + + // Only one handle should exist. + assert!( + client.subscription_handle.lock().await.is_some(), + "exactly one subscription handle should exist after two subscribe() calls" + ); + + client.unsubscribe().await; + } + + /// `unsubscribe` is a no-op when no subscription is active. 
+ #[tokio::test] + async fn test_unsubscribe_no_op_when_not_subscribed() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let config = ClientConfig::regtest() + .without_filters() + .without_masternodes() + .with_storage_path(temp_dir.path()); + + let client = SpvClient::new(config).await.expect("SpvClient construction must succeed"); + + // Should not panic. + client.unsubscribe().await; + client.unsubscribe().await; + } + + // ---- convert_sync_event tests ---- + + /// All internal SyncEvent variants produce the expected bridge variant. + #[test] + fn test_convert_sync_event_all_variants() { + use crate::sync::ManagerIdentifier; + use crate::sync::SyncEvent as I; + use std::collections::BTreeSet; + + // SyncStart + let e = convert_sync_event(I::SyncStart { + identifier: ManagerIdentifier::BlockHeader, + }); + assert!( + matches!(e, Some(SyncEvent::SyncStart { identifier }) if identifier == "BlockHeader") + ); + + // BlockHeadersStored + let e = convert_sync_event(I::BlockHeadersStored { + tip_height: 42, + }); + assert!(matches!( + e, + Some(SyncEvent::BlockHeadersStored { + tip_height: 42 + }) + )); + + // BlockHeaderSyncComplete + let e = convert_sync_event(I::BlockHeaderSyncComplete { + tip_height: 100, + }); + assert!(matches!( + e, + Some(SyncEvent::BlockHeaderSyncComplete { + tip_height: 100 + }) + )); + + // FilterHeadersStored + let e = convert_sync_event(I::FilterHeadersStored { + start_height: 0, + end_height: 99, + tip_height: 100, + }); + assert!(matches!( + e, + Some(SyncEvent::FilterHeadersStored { + start_height: 0, + end_height: 99, + tip_height: 100 + }) + )); + + // FilterHeadersSyncComplete + let e = convert_sync_event(I::FilterHeadersSyncComplete { + tip_height: 200, + }); + assert!(matches!( + e, + Some(SyncEvent::FilterHeadersSyncComplete { + tip_height: 200 + }) + )); + + // FiltersStored + let e = convert_sync_event(I::FiltersStored { + start_height: 10, + end_height: 20, + }); + assert!(matches!( + e, + 
Some(SyncEvent::FiltersStored { + start_height: 10, + end_height: 20 + }) + )); + + // FiltersSyncComplete + let e = convert_sync_event(I::FiltersSyncComplete { + tip_height: 300, + }); + assert!(matches!( + e, + Some(SyncEvent::FiltersSyncComplete { + tip_height: 300 + }) + )); + + // BlocksNeeded — 3 items in the set → block_count == 3 + use dashcore_hashes::Hash as _; + use key_wallet_manager::wallet_manager::FilterMatchKey; + let mut blocks = BTreeSet::new(); + blocks.insert(FilterMatchKey::new(100, dashcore::BlockHash::all_zeros())); + blocks.insert(FilterMatchKey::new(101, dashcore::BlockHash::all_zeros())); + blocks.insert(FilterMatchKey::new(102, dashcore::BlockHash::all_zeros())); + let e = convert_sync_event(I::BlocksNeeded { + blocks, + }); + assert!(matches!( + e, + Some(SyncEvent::BlocksNeeded { + block_count: 3 + }) + )); + + // MasternodeStateUpdated + let e = convert_sync_event(I::MasternodeStateUpdated { + height: 500, + }); + assert!(matches!( + e, + Some(SyncEvent::MasternodeStateUpdated { + height: 500 + }) + )); + + // ManagerError + let e = convert_sync_event(I::ManagerError { + manager: ManagerIdentifier::Filter, + error: "timeout".to_string(), + }); + assert!(matches!( + e, + Some(SyncEvent::ManagerError { ref manager, ref error }) + if manager == "Filter" && error == "timeout" + )); + + // SyncComplete + let e = convert_sync_event(I::SyncComplete { + header_tip: 1000, + cycle: 1, + }); + assert!(matches!( + e, + Some(SyncEvent::SyncComplete { + header_tip: 1000, + cycle: 1 + }) + )); + + // MempoolActivated — should return None (no bridge equivalent) + let e = convert_sync_event(I::MempoolActivated { + peer: "127.0.0.1:9999".parse().unwrap(), + }); + assert!(e.is_none(), "MempoolActivated should map to None"); + } + + // ---- convert_network_event tests ---- + + /// All internal NetworkEvent variants are converted correctly. 
+ #[test] + fn test_convert_network_event_all_variants() { + use crate::network::NetworkEvent as I; + + // PeerConnected + let addr: std::net::SocketAddr = "192.0.2.1:9999".parse().unwrap(); + let e = convert_network_event(I::PeerConnected { + address: addr, + }); + assert!( + matches!(e, NetworkEvent::PeerConnected { ref address } if address == "192.0.2.1:9999") + ); + + // PeerDisconnected + let addr: std::net::SocketAddr = "10.0.0.1:9999".parse().unwrap(); + let e = convert_network_event(I::PeerDisconnected { + address: addr, + }); + assert!( + matches!(e, NetworkEvent::PeerDisconnected { ref address } if address == "10.0.0.1:9999") + ); + + // PeersUpdated + let addrs: Vec<std::net::SocketAddr> = + vec!["1.2.3.4:9999".parse().unwrap(), "5.6.7.8:9999".parse().unwrap()]; + let e = convert_network_event(I::PeersUpdated { + connected_count: 2, + addresses: addrs, + best_height: Some(500), + }); + match e { + NetworkEvent::PeersUpdated { + connected_count, + addresses, + best_height, + } => { + assert_eq!(connected_count, 2); + assert_eq!(addresses.len(), 2); + assert_eq!(best_height, Some(500)); + } + other => panic!("unexpected variant: {other:?}"), + } + + // PeersUpdated with no best_height + let e = convert_network_event(I::PeersUpdated { + connected_count: 0, + addresses: vec![], + best_height: None, + }); + assert!(matches!( + e, + NetworkEvent::PeersUpdated { connected_count: 0, ref addresses, best_height: None } + if addresses.is_empty() + )); + } + // ---- send_transaction error-path tests ---- /// `send_transaction` with invalid hex returns `SpvClientError::Transaction`.