From 4e6e50b7050feb5b9e9ae881517840006a93ebbe Mon Sep 17 00:00:00 2001 From: Harsh Date: Mon, 8 Jun 2026 12:56:54 +0530 Subject: [PATCH] fix: MockRelayPool live broadcast now respects subscription filters --- src/relay/mock.rs | 312 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 243 insertions(+), 69 deletions(-) diff --git a/src/relay/mock.rs b/src/relay/mock.rs index 301d639..0b0b26e 100644 --- a/src/relay/mock.rs +++ b/src/relay/mock.rs @@ -1,14 +1,19 @@ //! In-memory mock relay pool for network-free testing. //! //! Mirrors the design of the TypeScript SDK's `MockRelayHub`: -//! - `publish_event` stores the event and broadcasts it to all `notifications()` receivers. -//! - `subscribe` registers filters and immediately replays matching stored events through the -//! broadcast, so listeners that called `notifications()` before `subscribe()` see the replay. +//! - `publish_event` stores the event and delivers it only to pools whose +//! `subscribe()` filters match it — like a real relay, a subscriber sees only +//! the events its subscription selected. A pool that never subscribed (or +//! subscribed with no matching filter) receives nothing from live publishes. +//! - `subscribe` registers a pool's filters and immediately replays matching +//! stored events through that pool's channel, so listeners that called +//! `notifications()` before `subscribe()` see the replay. //! - `connect` / `disconnect` are no-ops — no sockets are opened. //! - Signing uses a freshly generated ephemeral `Keys`; `signer()` returns it wrapped in `Arc` //! so encryption code can call it without any real relay connection. use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -20,21 +25,32 @@ use nostr_sdk::prelude::*; use crate::core::error::{Error, Result}; use crate::relay::RelayPoolTrait; +/// Process-global source of unique pool ids, so each pool can key its own +/// subscription slot in the shared store without locking at construction time. +static NEXT_POOL_ID: AtomicU64 = AtomicU64::new(0); + // ── Internal state ──────────────────────────────────────────────────────────── +/// One linked pool's delivery slot: its broadcast sender plus the filters it +/// registered via `subscribe()`. A live publish is delivered through `tx` only +/// when one of `filters` matches the event. +struct Subscriber { + tx: tokio::sync::broadcast::Sender, + filters: Vec, +} + struct MockRelayInner { events: Vec, - /// Active subscriptions: id → filters registered by that subscription. - subscriptions: HashMap>, - next_sub_id: u32, + /// Per-pool subscription slots, keyed by `MockRelayPool::pool_id`. Populated + /// lazily on a pool's first `subscribe()` call. + subscribers: HashMap, } impl MockRelayInner { fn new() -> Self { Self { events: Vec::new(), - subscriptions: HashMap::new(), - next_sub_id: 0, + subscribers: HashMap::new(), } } } @@ -47,9 +63,12 @@ impl MockRelayInner { /// an `Arc` is expected. pub struct MockRelayPool { inner: Arc>, - /// Broadcast sender — every published event is sent here so that all - /// `notifications()` receivers see it. + /// This pool's own broadcast sender. `notifications()` receivers read from + /// it, and a live publish reaches it only when this pool's registered + /// `subscribe()` filters match the event. notification_tx: tokio::sync::broadcast::Sender, + /// Unique id keying this pool's subscription slot in the shared store. + pool_id: u64, /// Ephemeral key used for signing in `publish` / `sign` / `signer`. keys: Keys, } @@ -57,13 +76,7 @@ pub struct MockRelayPool { impl MockRelayPool { /// Create a new mock relay pool with a freshly generated ephemeral signing key. pub fn new() -> Self { - let keys = Keys::generate(); - let (tx, _rx) = tokio::sync::broadcast::channel(1024); - Self { - inner: Arc::new(Mutex::new(MockRelayInner::new())), - notification_tx: tx, - keys, - } + Self::with_keys(Keys::generate()) } /// The ephemeral public key used by this mock for signing. @@ -78,49 +91,43 @@ impl MockRelayPool { /// Like [`new`](Self::new) but with caller-provided signing keys. pub fn with_keys(keys: Keys) -> Self { + Self::linked(Arc::new(Mutex::new(MockRelayInner::new())), keys) + } + + /// Build a pool bound to an existing shared store, with its own channel, + /// pool id, and signing keys. + fn linked(inner: Arc>, keys: Keys) -> Self { let (tx, _rx) = tokio::sync::broadcast::channel(1024); Self { - inner: Arc::new(Mutex::new(MockRelayInner::new())), + inner, notification_tx: tx, + pool_id: NEXT_POOL_ID.fetch_add(1, Ordering::Relaxed), keys, } } /// Create a pair of linked mock relay pools with different signing keys. /// - /// Both pools share the same event store and notification channel; events - /// published by one are visible to the other's `notifications()` receivers. + /// Both pools share the same event store. Each has its own notification + /// channel; an event published by one reaches the other only when the + /// other's `subscribe()` filters match it (as a real relay would). pub fn create_pair() -> (Self, Self) { - let (tx, _rx) = tokio::sync::broadcast::channel(1024); let inner = Arc::new(Mutex::new(MockRelayInner::new())); - let a = Self { - inner: Arc::clone(&inner), - notification_tx: tx.clone(), - keys: Keys::generate(), - }; - let b = Self { - inner, - notification_tx: tx, - keys: Keys::generate(), - }; + let a = Self::linked(Arc::clone(&inner), Keys::generate()); + let b = Self::linked(inner, Keys::generate()); (a, b) } /// Create `n` linked mock relay pools with different signing keys. /// - /// All pools share the same event store and notification channel so events - /// published by any one pool are visible to all others' `notifications()` - /// receivers. Useful for multi-client integration tests. + /// All pools share the same event store; each gets its own notification + /// channel and delivery is scoped to each pool's `subscribe()` filters. + /// Useful for multi-client integration tests. pub fn create_linked_group(n: usize) -> Vec { assert!(n > 0, "group must have at least one pool"); - let (tx, _rx) = tokio::sync::broadcast::channel(1024); let inner = Arc::new(Mutex::new(MockRelayInner::new())); (0..n) - .map(|_| Self { - inner: Arc::clone(&inner), - notification_tx: tx.clone(), - keys: Keys::generate(), - }) + .map(|_| Self::linked(Arc::clone(&inner), Keys::generate())) .collect() } @@ -157,21 +164,27 @@ impl RelayPoolTrait for MockRelayPool { Ok(()) } - /// Store the event and broadcast it to all current `notifications()` receivers. + /// Store the event and deliver it to every linked pool whose registered + /// `subscribe()` filters match it, mirroring a real relay: a subscriber sees + /// only the events its subscription selected. Pools with no matching filter + /// (or no subscription at all) receive nothing. async fn publish_event(&self, event: &Event) -> Result { let event_id = event.id; - { - let mut inner = self.inner.lock().await; - inner.events.push(event.clone()); + let mut inner = self.inner.lock().await; + inner.events.push(event.clone()); + + for subscriber in inner.subscribers.values() { + let matches = subscriber + .filters + .iter() + .any(|f| f.match_event(event, MatchEventOptions::default())); + if matches { + // Ignore send errors: they just mean this pool has no active receiver. + let _ = subscriber.tx.send(make_notification(event.clone())); + } } - // Always broadcast — consumers filter by kind/pubkey/tag themselves, - // which mirrors how nostr-sdk's real notification stream works. - let notification = make_notification(event.clone()); - // Ignore send errors: they just mean there are no active receivers yet. - let _ = self.notification_tx.send(notification); - Ok(event_id) } @@ -193,9 +206,10 @@ impl RelayPoolTrait for MockRelayPool { Ok(Arc::new(self.keys.clone()) as Arc) } - /// Return a new broadcast receiver. Each call gets an independent receiver - /// that sees all events published *after* this call, plus any replayed by - /// a subsequent `subscribe()`. + /// Return a new broadcast receiver for this pool. It only sees events that + /// match the filters this pool registers via `subscribe()` — events + /// published *after* this call that match, plus any replayed by a subsequent + /// `subscribe()`. Without a matching subscription it sees nothing. fn notifications(&self) -> tokio::sync::broadcast::Receiver { self.notification_tx.subscribe() } @@ -205,26 +219,36 @@ impl RelayPoolTrait for MockRelayPool { Ok(self.keys.public_key()) } - /// Register the filters and immediately replay any already-stored events that - /// match them through the broadcast channel, mirroring the behaviour of a - /// real relay that sends historical events before EOSE. + /// Register this pool's filters (accumulating across calls, like multiple + /// active REQs) and immediately replay any already-stored events that match + /// the new filters through this pool's channel, mirroring a real relay that + /// sends historical events before EOSE. async fn subscribe(&self, filters: Vec) -> Result<()> { let replay = { let mut inner = self.inner.lock().await; - let sub_id = inner.next_sub_id; - inner.next_sub_id += 1; - - // Store filters first so the replay read comes from the stored value, - // ensuring the field is both written and read (no dead-code warning). - inner.subscriptions.insert(sub_id, filters); - // Clone events so we can release the events borrow before borrowing subscriptions. + // Snapshot events before touching the subscriber slot to keep the + // borrows disjoint. let events_snapshot = inner.events.clone(); - let stored = inner.subscriptions.get(&sub_id).expect("just inserted"); + + // Upsert this pool's slot (created on first subscribe), binding it to + // this pool's own channel and accumulating the new filters. + let tx = self.notification_tx.clone(); + let subscriber = inner + .subscribers + .entry(self.pool_id) + .or_insert_with(|| Subscriber { + tx, + filters: Vec::new(), + }); + subscriber.filters.extend(filters.iter().cloned()); + + // Replay only events matching the newly-added filters, as a real + // relay replays historical events per REQ. events_snapshot .into_iter() .filter(|e| { - stored + filters .iter() .any(|f| f.match_event(e, MatchEventOptions::default())) }) @@ -281,6 +305,7 @@ fn make_notification(event: Event) -> RelayPoolNotification { #[cfg(test)] mod tests { use super::*; + use crate::core::constants::CTXVM_MESSAGES_KIND; #[tokio::test] async fn connect_and_disconnect_are_noops() { @@ -290,9 +315,13 @@ mod tests { } #[tokio::test] - async fn publish_event_stores_and_broadcasts() { + async fn publish_event_stores_and_delivers_to_matching_subscriber() { let pool = MockRelayPool::new(); let mut rx = pool.notifications(); + // A live publish is only delivered to a pool that subscribed for it. + pool.subscribe(vec![Filter::new().kind(Kind::TextNote)]) + .await + .unwrap(); let keys = Keys::generate(); let event = EventBuilder::new(Kind::TextNote, "hello") @@ -310,6 +339,149 @@ mod tests { } } + #[tokio::test] + async fn publish_without_subscription_delivers_nothing() { + let pool = MockRelayPool::new(); + let mut rx = pool.notifications(); + + let keys = Keys::generate(); + let event = EventBuilder::new(Kind::TextNote, "unheard") + .sign_with_keys(&keys) + .unwrap(); + pool.publish_event(&event).await.unwrap(); + + // Stored, but never delivered live without a matching subscription. + assert_eq!(pool.stored_events().await.len(), 1); + assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn live_publish_respects_subscriber_filters() { + let pool = MockRelayPool::new(); + let mut rx = pool.notifications(); + pool.subscribe(vec![Filter::new().kind(Kind::TextNote)]) + .await + .unwrap(); + + let keys = Keys::generate(); + let matching = EventBuilder::new(Kind::TextNote, "keep") + .sign_with_keys(&keys) + .unwrap(); + let other = EventBuilder::new(Kind::Custom(9999), "drop") + .sign_with_keys(&keys) + .unwrap(); + pool.publish_event(&matching).await.unwrap(); + pool.publish_event(&other).await.unwrap(); + + // Only the kind that matches the subscription is delivered. + let notif = rx.try_recv().unwrap(); + if let RelayPoolNotification::Event { event: e, .. } = notif { + assert_eq!(e.id, matching.id); + } else { + panic!("expected Event notification"); + } + assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn subscribe_accumulates_filters_across_calls() { + // Multiple subscribe() calls behave like multiple active REQs: every + // registered filter stays live, so events matching any of them deliver. + let pool = MockRelayPool::new(); + let mut rx = pool.notifications(); + pool.subscribe(vec![Filter::new().kind(Kind::TextNote)]) + .await + .unwrap(); + pool.subscribe(vec![Filter::new().kind(Kind::Custom(7777))]) + .await + .unwrap(); + + let keys = Keys::generate(); + let kind_a = EventBuilder::new(Kind::TextNote, "a") + .sign_with_keys(&keys) + .unwrap(); + let kind_b = EventBuilder::new(Kind::Custom(7777), "b") + .sign_with_keys(&keys) + .unwrap(); + let neither = EventBuilder::new(Kind::Custom(9999), "c") + .sign_with_keys(&keys) + .unwrap(); + + pool.publish_event(&kind_a).await.unwrap(); + pool.publish_event(&kind_b).await.unwrap(); + pool.publish_event(&neither).await.unwrap(); + + // Both accumulated filters remain active; the unmatched kind is dropped. + let first = rx.try_recv().unwrap(); + let second = rx.try_recv().unwrap(); + let received: Vec = [first, second] + .into_iter() + .map(|n| match n { + RelayPoolNotification::Event { event, .. } => event.id, + _ => panic!("expected Event notification"), + }) + .collect(); + assert!(received.contains(&kind_a.id), "kind A must be delivered"); + assert!(received.contains(&kind_b.id), "kind B must be delivered"); + assert!( + !received.contains(&neither.id), + "unmatched kind must not be delivered" + ); + assert!( + rx.try_recv().is_err(), + "only the two matching events deliver" + ); + } + + #[tokio::test] + async fn linked_pools_only_receive_their_subscribed_events() { + // The fix that makes EncryptionMode::Disabled e2e tests work: a pool + // subscribed to its own p-tag never receives an event addressed to a peer. + let (a, b) = MockRelayPool::create_pair(); + let a_pubkey = a.mock_public_key(); + let b_pubkey = b.mock_public_key(); + let mut a_rx = a.notifications(); + let mut b_rx = b.notifications(); + + // Each pool subscribes for events p-tagged to itself. + a.subscribe(vec![Filter::new() + .kind(Kind::Custom(CTXVM_MESSAGES_KIND)) + .custom_tag( + SingleLetterTag::lowercase(Alphabet::P), + a_pubkey.to_hex(), + )]) + .await + .unwrap(); + b.subscribe(vec![Filter::new() + .kind(Kind::Custom(CTXVM_MESSAGES_KIND)) + .custom_tag( + SingleLetterTag::lowercase(Alphabet::P), + b_pubkey.to_hex(), + )]) + .await + .unwrap(); + + // `a` publishes a response addressed to `b` (p-tag = b). + let keys = Keys::generate(); + let response = EventBuilder::new(Kind::Custom(CTXVM_MESSAGES_KIND), "to-b") + .tag(Tag::public_key(b_pubkey)) + .sign_with_keys(&keys) + .unwrap(); + a.publish_event(&response).await.unwrap(); + + // `b` receives it; `a` does NOT echo its own peer-addressed event. + let notif = b_rx.try_recv().unwrap(); + if let RelayPoolNotification::Event { event: e, .. } = notif { + assert_eq!(e.id, response.id); + } else { + panic!("expected Event notification for b"); + } + assert!( + a_rx.try_recv().is_err(), + "a must not receive its own b-addressed event" + ); + } + #[tokio::test] async fn publish_signs_and_stores() { let pool = MockRelayPool::new(); @@ -353,9 +525,8 @@ mod tests { pool.publish_event(&e1).await.unwrap(); pool.publish_event(&e2).await.unwrap(); - // Drain the two publish notifications - rx.try_recv().unwrap(); - rx.try_recv().unwrap(); + // No subscription yet, so nothing was delivered live. + assert!(rx.try_recv().is_err()); // Subscribe for TextNote only — e1 should be replayed, e2 not let filter = Filter::new().kind(Kind::TextNote); @@ -375,6 +546,9 @@ mod tests { async fn notifications_receives_future_publishes() { let pool = MockRelayPool::new(); let mut rx = pool.notifications(); + pool.subscribe(vec![Filter::new().kind(Kind::TextNote)]) + .await + .unwrap(); let keys = Keys::generate(); let event = EventBuilder::new(Kind::TextNote, "future")