diff --git a/src/transport/client/correlation_store.rs b/src/transport/client/correlation_store.rs index a0678b6..2b1ddd2 100644 --- a/src/transport/client/correlation_store.rs +++ b/src/transport/client/correlation_store.rs @@ -96,6 +96,20 @@ impl ClientCorrelationStore { .map(|r| r.original_id.clone()) } + /// Refresh a pending request's registration timestamp without disturbing its + /// `original_id`/`is_initialize`. Used by CEP-22 oversized transfers (OD-2): + /// each inbound frame "touches" the entry so [`sweep_expired`](Self::sweep_expired) + /// does not evict it mid-transfer. Returns `true` if the entry existed. + pub async fn touch(&self, event_id: &str) -> bool { + let mut cache = self.pending_requests.write().await; + if let Some(entry) = cache.get_mut(event_id) { + entry.registered_at = Instant::now(); + true + } else { + false + } + } + /// Number of pending requests currently tracked. pub async fn count(&self) -> usize { self.pending_requests.read().await.len() diff --git a/src/transport/client/mod.rs b/src/transport/client/mod.rs index d114455..4f87f5c 100644 --- a/src/transport/client/mod.rs +++ b/src/transport/client/mod.rs @@ -10,6 +10,7 @@ pub mod server_relay_discovery; pub use correlation_store::ClientCorrelationStore; +use std::collections::HashMap; use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -17,6 +18,7 @@ use std::time::Duration; use lru::LruCache; use nostr_sdk::prelude::*; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use crate::core::constants::*; @@ -28,7 +30,11 @@ use crate::encryption; use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::{parse_discovered_peer_capabilities, PeerCapabilities}; -use crate::transport::oversized_transfer::OversizedTransferConfig; +use crate::transport::oversized_transfer::{ + build_oversized_frames, resolve_safe_chunk_size, send_oversized_transfer, OversizedFrame, + OversizedSenderOptions, OversizedTransferConfig, OversizedTransferReceiver, + NOTIFICATIONS_PROGRESS_METHOD, +}; const LOG_TARGET: &str = "contextvm_sdk::transport::client"; @@ -158,6 +164,13 @@ pub struct NostrClientTransport { /// Duplicate outer ids are skipped before decrypt; ids are inserted only after success /// so failed decrypt/verify can be retried on redelivery. seen_gift_wrap_ids: Arc>>, + /// CEP-22: reassembly engine for inbound oversized responses from the server + /// (single peer). Cleared on [`close`](Self::close). + oversized_receiver: Arc>, + /// CEP-22: outstanding `accept` handshake waiters keyed by `progressToken`. A + /// `send()` awaiting the server's `accept` registers a one-shot here before + /// publishing `start`; the event loop fires it when the `accept` frame arrives. + accept_waiters: Arc>>>, /// Channel for receiving processed MCP messages from the event loop. message_tx: Option>, message_rx: Option>, @@ -216,12 +229,19 @@ impl NostrClientTransport { .clone() .unwrap_or_default(); + let oversized_receiver = Arc::new(Mutex::new(OversizedTransferReceiver::with_policy( + (&config.oversized_transfer).into(), + ))); + let accept_waiters = Arc::new(Mutex::new(HashMap::new())); + Ok(Self { base: BaseTransport { relay_pool, encryption_mode: config.encryption_mode, is_connected: false, }, + oversized_receiver, + accept_waiters, config, server_pubkey, hinted_relay_urls, @@ -279,12 +299,19 @@ impl NostrClientTransport { encryption_mode = ?config.encryption_mode, "Created client transport (with_relay_pool)" ); + let oversized_receiver = Arc::new(Mutex::new(OversizedTransferReceiver::with_policy( + (&config.oversized_transfer).into(), + ))); + let accept_waiters = Arc::new(Mutex::new(HashMap::new())); + Ok(Self { base: BaseTransport { relay_pool, encryption_mode: config.encryption_mode, is_connected: false, }, + oversized_receiver, + accept_waiters, config, server_pubkey, hinted_relay_urls, @@ -374,6 +401,8 @@ impl NostrClientTransport { let init_event = self.server_initialize_event.clone(); let server_supports_ephemeral = self.server_supports_ephemeral.clone(); let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone(); + let oversized_receiver = self.oversized_receiver.clone(); + let accept_waiters = self.accept_waiters.clone(); let timeout = self.config.timeout; let token = self.cancellation_token.child_token(); @@ -389,6 +418,8 @@ impl NostrClientTransport { init_event, server_supports_ephemeral, seen_gift_wrap_ids, + oversized_receiver, + accept_waiters, timeout, token, ) @@ -410,6 +441,23 @@ impl NostrClientTransport { let _ = handle.await; } self.message_tx.take(); + // CEP-22: release reassembly state and drop any accept waiters so an + // in-flight `send()` awaiter unblocks (cancelled) instead of hanging to + // its accept timeout. + { + let mut receiver = match self.oversized_receiver.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + receiver.clear(); + } + { + let mut waiters = match self.accept_waiters.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + waiters.clear(); + } self.base.disconnect().await } @@ -438,7 +486,111 @@ impl NostrClientTransport { vec![] }; let tags = BaseTransport::compose_outbound_tags(&base_tags, &discovery_tags, &[]); + let gift_wrap_kind = self.choose_outbound_gift_wrap_kind(); + let discovery_sent = !discovery_tags.is_empty(); + + // CEP-22: only a request carrying a `progressToken` is eligible for oversized + // fragmentation (the token addresses the frames); extract it once up front. + let oversized_token: Option = + if is_request && self.config.oversized_transfer.enabled { + match message { + JsonRpcMessage::Request(req) => req + .params + .as_ref() + .and_then(|p| p.get("_meta")) + .and_then(|m| m.get("progressToken")) + .and_then(|t| t.as_str()) + .map(String::from), + _ => None, + } + } else { + None + }; + // CEP-22: fragment when the message would not fit in a single Nostr event. + // Relay size limits apply to the *published* event, so the decision is made + // on the published byte size — not the raw payload — which is what actually + // grows under JSON escaping and gift-wrap encryption (mirrors TS + // `measurePublishedMcpMessageSize`). The raw serialized length is a cheap + // lower bound: when it already meets the threshold the message is + // conclusively oversized and we fragment without building a single event — + // an escape-heavy payload could otherwise overflow NIP-44's plaintext limit + // while we measure. + if let Some(token) = oversized_token.as_deref() { + let content = serde_json::to_string(message)?; + let threshold = self.config.oversized_transfer.threshold; + if content.len() >= threshold { + return self + .send_oversized_request( + message, + &content, + token, + base_tags, + tags, + discovery_sent, + ) + .await; + } + // Borderline: a sub-threshold payload can still cross the threshold once + // signed, JSON-escaped, and (when enabled) gift-wrapped. Build the single + // event once, measure its real published size, and reuse it if it fits. + match self + .base + .prepare_mcp_message( + message, + &self.server_pubkey, + CTXVM_MESSAGES_KIND, + tags.clone(), + None, + Some(gift_wrap_kind), + ) + .await + { + Ok((event_id, publishable_event)) => { + let published_len = serde_json::to_string(&publishable_event) + .map(|s| s.len()) + .unwrap_or(usize::MAX); + if published_len > threshold { + return self + .send_oversized_request( + message, + &content, + token, + base_tags, + tags, + discovery_sent, + ) + .await; + } + return self + .publish_single_event(message, event_id, publishable_event, discovery_sent) + .await; + } + Err(error) => { + // Could not build even one event (e.g. NIP-44 plaintext overflow + // from an escape-heavy payload) → it cannot be sent as a single + // event; fragment it. + tracing::debug!( + target: LOG_TARGET, + error = %error, + "Single-event build failed; sending as oversized transfer" + ); + return self + .send_oversized_request( + message, + &content, + token, + base_tags, + tags, + discovery_sent, + ) + .await; + } + } + } + + // Single-event path: not oversized-eligible (notification, feature disabled, + // or no progressToken). let (event_id, publishable_event) = self .base .prepare_mcp_message( @@ -447,7 +599,7 @@ impl NostrClientTransport { CTXVM_MESSAGES_KIND, tags, None, - Some(self.choose_outbound_gift_wrap_kind()), + Some(gift_wrap_kind), ) .await .map_err(|error| { @@ -461,6 +613,21 @@ impl NostrClientTransport { error })?; + self.publish_single_event(message, event_id, publishable_event, discovery_sent) + .await + } + + /// Register (for requests) and publish one prepared MCP event, flipping the + /// one-shot discovery flag after a successful publish. Shared by the + /// non-oversized send paths so the event built for the CEP-22 size check is + /// reused for publishing rather than re-encrypted. + async fn publish_single_event( + &self, + message: &JsonRpcMessage, + event_id: EventId, + publishable_event: Event, + discovery_sent: bool, + ) -> Result<()> { if let JsonRpcMessage::Request(ref req) = message { let is_initialize = req.method == INITIALIZE_METHOD; self.pending_requests @@ -481,7 +648,7 @@ impl NostrClientTransport { } // Flip one-shot flag only after successful publish - if is_request && !discovery_tags.is_empty() { + if discovery_sent { self.has_sent_discovery_tags.store(true, Ordering::Relaxed); } @@ -494,6 +661,146 @@ impl NostrClientTransport { Ok(()) } + /// CEP-22: publish a request as an ordered oversized-transfer sequence. + /// + /// Builds `start → chunks… → end` frames, registers an `accept` waiter before + /// publishing `start` when the server's support is not yet known, drives the + /// §1 [`send_oversized_transfer`] sequencer, and registers the pending request + /// against the **end** frame's event id (the value the server correlates its + /// response to). One-shot discovery tags ride the `start` frame only. + async fn send_oversized_request( + &self, + message: &JsonRpcMessage, + content: &str, + token: &str, + base_tags: Vec, + start_tags: Vec, + discovery_sent: bool, + ) -> Result<()> { + // The handshake is required until the server is known to support oversized + // transfer; once learned, chunks start immediately (no accept slot). + let needs_accept = !self + .discovered_server_capabilities() + .supports_oversized_transfer; + + let gift_wrap_kind = self.choose_outbound_gift_wrap_kind(); + // Effective encryption for these frames (the publish closure passes `None`, + // letting `should_encrypt` decide from the mode — resolve the same boolean + // here so the sizing measurement matches the real published frames). + let is_encrypted = self.base.should_encrypt(CTXVM_MESSAGES_KIND, None); + + // CEP-22: derive a per-chunk payload budget so every published frame stays + // under the threshold even after the JSON-RPC envelope, signature, and + // (when encrypted) gift-wrap expansion. Mirrors TS `resolveSafeOversizedChunkSize`. + // Continuation (chunk) frames carry the bare recipient `p`-tags (`base_tags`), + // so size against those. + let chunk_size = resolve_safe_chunk_size( + self.config.oversized_transfer.chunk_size, + &self.base, + &self.server_pubkey, + &base_tags, + is_encrypted, + Kind::Custom(gift_wrap_kind), + self.config.oversized_transfer.threshold, + ) + .await?; + + let options = OversizedSenderOptions::new(token) + .with_chunk_size(chunk_size) + .with_accept_handshake(needs_accept); + let frames = build_oversized_frames(content, &options)?; + + // Register the accept-waiter BEFORE publishing `start` so an early `accept` + // (decoded on the event-loop task) is never lost. + let await_accept = if needs_accept { + let (accept_tx, accept_rx) = oneshot::channel(); + { + let mut waiters = match self.accept_waiters.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + waiters.insert(token.to_string(), accept_tx); + } + Some(accept_rx) + } else { + None + }; + + // Per-frame publish: the start frame carries one-shot discovery tags; the + // rest carry bare recipient tags. Mirrors the prepare+publish pair in `send`. + let base = &self.base; + let server_pubkey = self.server_pubkey; + let mut start_tags = Some(start_tags); + let publish = move |frame: JsonRpcNotification| { + let tags = start_tags.take().unwrap_or_else(|| base_tags.clone()); + async move { + let msg = JsonRpcMessage::Notification(frame); + let (event_id, publishable) = base + .prepare_mcp_message( + &msg, + &server_pubkey, + CTXVM_MESSAGES_KIND, + tags, + None, + Some(gift_wrap_kind), + ) + .await?; + base.relay_pool.publish_event(&publishable).await?; + Ok::(event_id) + } + }; + + let accept_timeout = + Duration::from_millis(self.config.oversized_transfer.accept_timeout_ms); + let result = + send_oversized_transfer(frames, needs_accept, await_accept, accept_timeout, publish) + .await; + + // Drop the accept-waiter entry regardless of outcome. + if needs_accept { + let mut waiters = match self.accept_waiters.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + waiters.remove(token); + } + + let end_id = match result { + Ok(id) => id, + Err(error) => { + tracing::error!( + target: LOG_TARGET, + error = %error, + server_pubkey = %self.server_pubkey.to_hex(), + method = ?message.method(), + "Failed to send oversized client request" + ); + return Err(error); + } + }; + + // Register the pending request against the END frame's event id. + if let JsonRpcMessage::Request(ref req) = message { + let is_initialize = req.method == INITIALIZE_METHOD; + self.pending_requests + .register(end_id.to_hex(), req.id.clone(), is_initialize) + .await; + } + + // Flip the one-shot discovery flag after a successful transfer. + if discovery_sent { + self.has_sent_discovery_tags.store(true, Ordering::Relaxed); + } + + tracing::debug!( + target: LOG_TARGET, + end_event_id = %end_id.to_hex(), + method = ?message.method(), + "Sent oversized client request" + ); + Ok(()) + } + /// Take the message receiver for consuming incoming messages. pub fn take_message_receiver( &mut self, @@ -535,6 +842,8 @@ impl NostrClientTransport { init_event: Arc>>, server_supports_ephemeral: Arc, seen_gift_wrap_ids: Arc>>, + oversized_receiver: Arc>, + accept_waiters: Arc>>>, timeout: Duration, cancel: CancellationToken, ) { @@ -577,6 +886,8 @@ impl NostrClientTransport { &init_event, &server_supports_ephemeral, &seen_gift_wrap_ids, + &oversized_receiver, + &accept_waiters, &relay_pool, ) .await; @@ -659,12 +970,35 @@ impl NostrClientTransport { Ok(g) => g, Err(p) => p.into_inner(), }; - if stored.is_none() { - *stored = Some(event.clone()); + match stored.as_ref() { + // First discovery-tag-carrying event becomes the session baseline. + None => *stored = Some(event.clone()), + // CEP-35 upgrade (mirrors TS `inbound-coordinator`): if the baseline was + // captured from a non-initialize event (e.g. the first discovery tags + // arrived on a notification) and this event carries a full + // `InitializeResult` (has `protocolVersion`), upgrade the baseline to the + // richer initialize response so `get_server_initialize_event` exposes the + // full server identity/capabilities. Never downgrades. + Some(existing) => { + if !Self::event_has_initialize_result(existing) + && Self::event_has_initialize_result(event) + { + *stored = Some(event.clone()); + } + } } - // Note: TS SDK has an upgrade path where a later event with an InitializeResult - // replaces a non-initialize baseline. Not implemented here -- edge case only - // relevant if the first server message with discovery tags is a notification. + } + + /// Returns `true` when the event's `content` parses to a JSON-RPC response + /// whose `result` is a full MCP `InitializeResult` (keyed on `protocolVersion`, + /// matching the TS `InitializeResultSchema` marker). + fn event_has_initialize_result(event: &Event) -> bool { + serde_json::from_str::(&event.content) + .ok() + .as_ref() + .and_then(|content| content.get("result")) + .and_then(|result| result.get("protocolVersion")) + .is_some() } /// Returns a clone of the first inbound event that carried server discovery tags. @@ -697,6 +1031,8 @@ impl NostrClientTransport { init_event: &Arc>>, server_supports_ephemeral: &Arc, seen_gift_wrap_ids: &Arc>>, + oversized_receiver: &Arc>, + accept_waiters: &Arc>>>, relay_pool: &Arc, ) { let event = match notification { @@ -840,6 +1176,85 @@ impl NostrClientTransport { server_supports_ephemeral.store(true, Ordering::Relaxed); } + // CEP-22: intercept oversized-transfer frames ABOVE the correlation gate + // below. This is mandatory: an `accept` is e-tagged to the start frame + // (not in `pending`), and chunk/end response frames must be reassembled + // rather than delivered raw. Plain `notifications/progress` (no `cvm`) and + // ordinary responses fall through untouched. + if let Ok(notif) = serde_json::from_str::(&actual_event_content) { + if notif.method == NOTIFICATIONS_PROGRESS_METHOD + && OversizedTransferReceiver::is_oversized_frame(¬if) + { + let token = notif + .params + .as_ref() + .and_then(|p| p.get("progressToken")) + .and_then(|t| t.as_str()); + + // Route `accept` frames to the waiting sender by progressToken + // (their e-tag is the start-frame id, which is not in `pending`). + let is_accept = notif + .params + .as_ref() + .and_then(|p| p.get("cvm")) + .and_then(OversizedFrame::from_cvm_value) + .is_some_and(|f| matches!(f, OversizedFrame::Accept)); + if is_accept { + if let Some(token) = token { + let waiter = { + let mut waiters = match accept_waiters.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + waiters.remove(token) + }; + if let Some(waiter) = waiter { + let _ = waiter.send(()); + } + } + return; + } + + // OD-2: touch the pending entry so the sweep does not evict the + // request mid-transfer (chunks do not otherwise refresh it). + if let Some(ref correlated_id) = e_tag { + pending.touch(correlated_id.as_str()).await; + } + + // Feed the frame to the reassembler (process_frame is sync; the + // guard is dropped before any await). + let outcome = { + let mut receiver = match oversized_receiver.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; + receiver.process_frame(¬if) + }; + match outcome { + // start/chunk consumed — do NOT touch `pending` or validate. + Ok(None) => return, + // end frame: deliver the reassembled (already-validated, may + // exceed 1 MB) message and clear the pending entry. + Ok(Some(message)) => { + if let Some(ref correlated_id) = e_tag { + pending.remove(correlated_id.as_str()).await; + } + let _ = tx.send(message); + return; + } + // Failure: clean up locally, let the request time out. + Err(error) => { + tracing::warn!( + target: LOG_TARGET, + error = %error, + "Inbound oversized transfer failed" + ); + return; + } + } + } + } + // Correlate response if let Some(ref correlated_id) = e_tag { let is_pending = pending.contains(correlated_id.as_str()).await; @@ -1198,6 +1613,8 @@ mod tests { server_initialize_event: Arc::new(Mutex::new(None)), server_supports_ephemeral: Arc::new(AtomicBool::new(false)), seen_gift_wrap_ids: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(10).unwrap()))), + oversized_receiver: Arc::new(Mutex::new(OversizedTransferReceiver::new())), + accept_waiters: Arc::new(Mutex::new(HashMap::new())), message_tx: Some(tokio::sync::mpsc::unbounded_channel().0), message_rx: None, cancellation_token: CancellationToken::new(), @@ -1296,13 +1713,31 @@ mod tests { // ── CEP-35 client capability learning ─────────────────────── fn make_event_with_tags(tag_parts: &[&[&str]]) -> Event { + make_event_with_content_and_tags("{}", tag_parts) + } + + fn make_event_with_content_and_tags(content: &str, tag_parts: &[&[&str]]) -> Event { let keys = Keys::generate(); let tags: Vec = tag_parts.iter().map(|p| make_tag(p)).collect(); - let builder = EventBuilder::new(Kind::Custom(CTXVM_MESSAGES_KIND), "{}").tags(tags); + let builder = EventBuilder::new(Kind::Custom(CTXVM_MESSAGES_KIND), content).tags(tags); let unsigned = builder.build(keys.public_key()); unsigned.sign_with_keys(&keys).unwrap() } + /// A JSON-RPC response carrying a full `InitializeResult` (has `protocolVersion`). + fn initialize_result_content() -> String { + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "protocolVersion": "2025-06-18", + "capabilities": {}, + "serverInfo": { "name": "UpgradedServer", "version": "1.0.0" } + } + }) + .to_string() + } + #[test] fn client_learn_server_discovery_sets_baseline() { let caps = Mutex::new(PeerCapabilities::default()); @@ -1357,4 +1792,36 @@ mod tests { "baseline must not be replaced" ); } + + #[test] + fn client_baseline_upgraded_to_initialize_result() { + let caps = Mutex::new(PeerCapabilities::default()); + let init = Mutex::new(None); + + // First discovery tags arrive on a non-initialize event (e.g. a notification). + let baseline = make_event_with_tags(&[&["support_encryption"]]); + NostrClientTransport::learn_server_discovery(&caps, &init, &baseline); + assert_eq!(init.lock().unwrap().as_ref().unwrap().id, baseline.id); + + // A later event carries a full InitializeResult → baseline is upgraded. + let init_event = make_event_with_content_and_tags( + &initialize_result_content(), + &[&["support_encryption"]], + ); + NostrClientTransport::learn_server_discovery(&caps, &init, &init_event); + assert_eq!( + init.lock().unwrap().as_ref().unwrap().id, + init_event.id, + "baseline must upgrade to the initialize-result event" + ); + + // A still-later non-initialize event must NOT downgrade the baseline. + let later = make_event_with_tags(&[&["support_encryption_ephemeral"]]); + NostrClientTransport::learn_server_discovery(&caps, &init, &later); + assert_eq!( + init.lock().unwrap().as_ref().unwrap().id, + init_event.id, + "baseline must not downgrade away from the initialize result" + ); + } } diff --git a/src/transport/oversized_transfer/mod.rs b/src/transport/oversized_transfer/mod.rs index 88a665b..f517725 100644 --- a/src/transport/oversized_transfer/mod.rs +++ b/src/transport/oversized_transfer/mod.rs @@ -47,6 +47,8 @@ pub mod constants; pub mod errors; pub mod frame; pub mod receiver; +pub mod sender; +pub mod sizing; pub use codec::{ build_oversized_frames, sha256_digest, split_string_by_byte_size, utf8_byte_len, @@ -56,6 +58,8 @@ pub use constants::*; pub use errors::OversizedTransferError; pub use frame::{CompletionMode, OversizedFrame}; pub use receiver::{OversizedTransferReceiver, TransferPolicy}; +pub use sender::send_oversized_transfer; +pub use sizing::{measure_published_event_size, resolve_safe_chunk_size}; /// CEP-22 oversized-transfer configuration shared by both transports. /// @@ -180,3 +184,18 @@ impl OversizedTransferConfig { self } } + +impl From<&OversizedTransferConfig> for TransferPolicy { + /// Project the receiver-relevant knobs of an [`OversizedTransferConfig`] into + /// a [`TransferPolicy`] (D6 → receiver admission policy). + fn from(config: &OversizedTransferConfig) -> Self { + TransferPolicy { + max_transfer_bytes: config.max_transfer_bytes, + max_transfer_chunks: config.max_transfer_chunks, + max_concurrent_transfers: config.max_concurrent_transfers, + max_out_of_order_window: config.max_out_of_order_window, + max_out_of_order_chunks: config.max_out_of_order_chunks, + transfer_timeout_ms: config.transfer_timeout_ms, + } + } +} diff --git a/src/transport/oversized_transfer/sender.rs b/src/transport/oversized_transfer/sender.rs new file mode 100644 index 0000000..e3dbb36 --- /dev/null +++ b/src/transport/oversized_transfer/sender.rs @@ -0,0 +1,241 @@ +//! Transport-agnostic async driver for sending an oversized transfer. +//! +//! Given the ordered frames produced by [`build_oversized_frames`](super::codec::build_oversized_frames), +//! this sequences `start → [await accept] → chunks → end`, publishing each frame +//! through a caller-supplied closure, and returns the **end-frame `EventId`** so +//! the transport can correlate the eventual response. +//! +//! The driver carries no transport itself: the client publishes via +//! `prepare_mcp_message` + `publish_event` (registering a pending entry between +//! the two), the server via `base.send_mcp_message`. Those signatures differ, so +//! the publish step is injected as a closure and this module stays transport-free +//! (D11 / OD-5: there is no active `abort` frame in v1 — on a missed `accept` the +//! sender fails and cleans up locally, letting the peer's own timeout fire). + +use std::future::Future; +use std::time::Duration; + +use nostr_sdk::prelude::EventId; +use tokio::sync::oneshot; + +use crate::core::types::JsonRpcNotification; + +use super::codec::BuiltOversizedFrames; +use super::errors::OversizedTransferError; + +/// Extract the outer `progressToken` from a frame's `params`, used to label a +/// locally-raised [`OversizedTransferError::Abort`]. +fn progress_token_of(frame: &JsonRpcNotification) -> String { + frame + .params + .as_ref() + .and_then(|params| params.get("progressToken")) + .and_then(|value| value.as_str()) + .unwrap_or_default() + .to_string() +} + +/// Publish an oversized transfer in canonical frame order, returning the +/// `EventId` of the **end** frame (the id the peer correlates its response to). +/// +/// `publish` is invoked once per frame, in order, and must return the published +/// event's inner `EventId`. When `needs_accept` is set the driver waits up to +/// `accept_timeout` for the receiver's `accept` to fire `await_accept` before +/// sending any chunk; on timeout (or a dropped waiter) it returns +/// [`OversizedTransferError::Abort`] without emitting an `abort` frame. +/// +/// `await_accept` is `Some` iff `needs_accept` is `true`. +pub async fn send_oversized_transfer( + frames: BuiltOversizedFrames, + needs_accept: bool, + await_accept: Option>, + accept_timeout: Duration, + mut publish: P, +) -> crate::Result +where + P: FnMut(JsonRpcNotification) -> Fut, + Fut: Future>, +{ + // The token is only needed to label a potential abort; capture it before the + // start frame is moved into `publish`. + let token = progress_token_of(&frames.start); + + // 1. Publish `start`. Its id is not used for correlation — the end id is. + publish(frames.start).await?; + + // 2. If a handshake is required, block until the receiver's `accept` arrives. + if needs_accept { + let outcome = match await_accept { + Some(rx) => tokio::time::timeout(accept_timeout, rx).await, + None => { + // Defensive: `needs_accept` implies a waiter was provided. + return Err(OversizedTransferError::abort( + token, + Some("no accept waiter registered".to_string()), + ) + .into()); + } + }; + + // `Err(_)` → timed out; `Ok(Err(_))` → waiter dropped. Either way we abort + // locally and let the peer's own timeout fire (no active abort frame). + if matches!(outcome, Err(_) | Ok(Err(_))) { + return Err(OversizedTransferError::abort( + token, + Some("timed out waiting for accept frame".to_string()), + ) + .into()); + } + } + + // 3. Publish each chunk sequentially in canonical (`progress`) order. + for chunk in frames.chunks { + publish(chunk).await?; + } + + // 4. Publish `end`; its id correlates the eventual response. + let end_id = publish(frames.end).await?; + Ok(end_id) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::cell::RefCell; + use std::rc::Rc; + + use crate::transport::oversized_transfer::codec::{ + build_oversized_frames, OversizedSenderOptions, + }; + + /// `frameType` discriminator of a published frame. + fn frame_type(notification: &JsonRpcNotification) -> String { + notification.params.as_ref().unwrap()["cvm"]["frameType"] + .as_str() + .unwrap() + .to_string() + } + + /// Deterministic [`EventId`] for the n-th published frame (1-based). + fn nth_event_id(n: usize) -> EventId { + EventId::from_hex(&format!("{n:064x}")).unwrap() + } + + /// Build frames for a payload large enough to span several chunks. + fn sample_frames(token: &str, needs_accept: bool) -> BuiltOversizedFrames { + let payload = "x".repeat(50); + let options = OversizedSenderOptions::new(token) + .with_chunk_size(8) + .with_accept_handshake(needs_accept); + build_oversized_frames(&payload, &options).unwrap() + } + + /// An in-memory publish closure: records each frame and hands back a + /// deterministic id keyed on publish order (matching [`nth_event_id`]). + fn recording_publisher( + log: Rc>>, + ) -> impl FnMut(JsonRpcNotification) -> std::pin::Pin>>> + { + move |frame: JsonRpcNotification| { + let log = log.clone(); + Box::pin(async move { + log.borrow_mut().push(frame); + let n = log.borrow().len(); + Ok(nth_event_id(n)) + }) + } + } + + #[tokio::test] + async fn publishes_frames_in_canonical_order_and_returns_end_id() { + let frames = sample_frames("tok", false); + let expected_count = frames.frame_count(); + + let log = Rc::new(RefCell::new(Vec::new())); + let publish = recording_publisher(log.clone()); + + let end_id = send_oversized_transfer(frames, false, None, Duration::from_secs(1), publish) + .await + .unwrap(); + + let frames_log = log.borrow(); + assert_eq!(frames_log.len(), expected_count); + assert!(expected_count > 2, "payload should span multiple chunks"); + + // start → chunks… → end + assert_eq!(frame_type(&frames_log[0]), "start"); + assert_eq!(frame_type(frames_log.last().unwrap()), "end"); + for mid in &frames_log[1..frames_log.len() - 1] { + assert_eq!(frame_type(mid), "chunk"); + } + + // Returned id is the end frame's id (the last one published). + assert_eq!(end_id, nth_event_id(expected_count)); + } + + #[tokio::test] + async fn accept_is_awaited_and_satisfied_when_needed() { + let frames = sample_frames("tok", true); + let expected_count = frames.frame_count(); + + let log = Rc::new(RefCell::new(Vec::new())); + let publish = recording_publisher(log.clone()); + + // Pre-arm the oneshot so the awaited accept resolves immediately. + let (tx, rx) = oneshot::channel(); + tx.send(()).unwrap(); + + let end_id = + send_oversized_transfer(frames, true, Some(rx), Duration::from_secs(1), publish) + .await + .unwrap(); + + let frames_log = log.borrow(); + assert_eq!(frames_log.len(), expected_count); + assert_eq!(frame_type(&frames_log[0]), "start"); + assert_eq!(frame_type(frames_log.last().unwrap()), "end"); + assert_eq!(end_id, nth_event_id(expected_count)); + } + + #[tokio::test] + async fn accept_is_not_awaited_when_not_needed() { + let frames = sample_frames("tok", false); + + let log = Rc::new(RefCell::new(Vec::new())); + let publish = recording_publisher(log.clone()); + + // A waiter is supplied but never fired; because `needs_accept` is false + // the driver must ignore it and complete without abort. + let (_tx, rx) = oneshot::channel(); + let result = + send_oversized_transfer(frames, false, Some(rx), Duration::from_millis(5), publish) + .await; + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn missed_accept_returns_abort() { + let frames = sample_frames("tok", true); + + let log = Rc::new(RefCell::new(Vec::new())); + let publish = recording_publisher(log.clone()); + + // Keep `_tx` alive so the waiter is not "closed" — this exercises the + // timeout (elapsed) path specifically. + let (_tx, rx) = oneshot::channel(); + let result = + send_oversized_transfer(frames, true, Some(rx), Duration::from_millis(20), publish) + .await; + + match result { + Err(crate::Error::OversizedTransfer(OversizedTransferError::Abort { + token, .. + })) => assert_eq!(token, "tok"), + other => panic!("expected abort error, got {other:?}"), + } + + // Only the start frame was published before the failed handshake. + assert_eq!(log.borrow().len(), 1); + } +} diff --git a/src/transport/oversized_transfer/sizing.rs b/src/transport/oversized_transfer/sizing.rs new file mode 100644 index 0000000..975b5c3 --- /dev/null +++ b/src/transport/oversized_transfer/sizing.rs @@ -0,0 +1,278 @@ +//! CEP-22 published-event sizing helpers. +//! +//! Ports the TypeScript `measurePublishedMcpMessageSize` and +//! `resolveSafeOversizedChunkSize` from `base-nostr-transport.ts`. +//! +//! Each oversized frame is published as an independently signed (and, when +//! encrypted, gift-wrapped) Nostr event. The *published* event is materially +//! larger than its raw chunk payload: the JSON-RPC `notifications/progress` +//! envelope, the 64-byte id / 64-byte pubkey / 128-byte signature, the tags, and +//! — when encrypted — the NIP-44 + base64 gift-wrap expansion all add overhead, +//! and the chunk `data` string is escaped at every JSON layer. Picking a fixed +//! chunk size therefore risks frames much larger than intended. These helpers +//! measure the *real* published size so the sender can choose a per-chunk budget +//! that keeps every frame within a byte ceiling. + +use nostr_sdk::prelude::{Kind, PublicKey, Tag}; + +use crate::core::constants::CTXVM_MESSAGES_KIND; +use crate::core::error::Result; +use crate::core::types::{JsonRpcMessage, JsonRpcNotification}; +use crate::transport::base::BaseTransport; + +use super::frame::OversizedFrame; + +/// Probe `progressToken` used when sizing chunk frames. A UUID-length placeholder +/// so the measured envelope is representative of (and slightly conservative for) +/// real progress tokens, which the sizing signature deliberately does not thread +/// through (cf. TS, which passes the real token — a negligible byte difference). +const SIZING_PROBE_TOKEN: &str = "00000000-0000-0000-0000-000000000000"; + +/// Probe `progress` slot used when sizing chunk frames (the no-handshake +/// first-chunk slot). The slot only affects size by a digit or two. +const SIZING_PROBE_PROGRESS: u64 = 2; + +/// Build the final outbound Nostr event for `frame` — signing, and gift-wrapping +/// when `is_encrypted` — and return its serialized UTF-8 byte length. +/// +/// Mirrors TS `measurePublishedMcpMessageSize`: it reuses +/// [`BaseTransport::prepare_mcp_message`] (the rs-sdk equivalent of TS +/// `buildPublishedMcpEvent`) to produce the exact event that would hit the relay, +/// then measures `serde_json::to_string(event).len()` (Rust strings are UTF-8, so +/// `.len()` is the byte length). The caller passes the continuation-frame `tags` +/// so the measurement reflects exactly what the real chunk frames carry — the +/// client's recipient `p`-tag, or the server's response `p`+`e` tags. +pub async fn measure_published_event_size( + frame: &JsonRpcNotification, + base: &BaseTransport, + recipient: &PublicKey, + tags: &[Tag], + is_encrypted: bool, + gift_wrap_kind: Kind, +) -> Result { + let message = JsonRpcMessage::Notification(frame.clone()); + let (_event_id, published) = base + .prepare_mcp_message( + &message, + recipient, + CTXVM_MESSAGES_KIND, + tags.to_vec(), + Some(is_encrypted), + Some(gift_wrap_kind.as_u16()), + ) + .await?; + + Ok(serde_json::to_string(&published)?.len()) +} + +/// Binary-search the largest per-chunk payload size in `[1, desired]` whose +/// published frame event stays within `max_event_bytes`. +/// +/// Mirrors TS `resolveSafeOversizedChunkSize`: the probe chunk's payload is a run +/// of backslash bytes — the JSON worst case, since a backslash escapes to two +/// bytes at *every* serialization layer — so the chosen budget stays safe for +/// arbitrary real payloads. `tags` are the continuation-frame tags (passed +/// straight to [`measure_published_event_size`]). Always returns at least 1 +/// (matching TS, which never returns 0, even when a single-byte payload already +/// overflows the ceiling). +pub async fn resolve_safe_chunk_size( + desired: usize, + base: &BaseTransport, + recipient: &PublicKey, + tags: &[Tag], + is_encrypted: bool, + gift_wrap_kind: Kind, + max_event_bytes: usize, +) -> Result { + let mut low: usize = 1; + let mut high: usize = desired.max(1); + let mut best: usize = 1; + + while low <= high { + let mid = low + (high - low) / 2; + let probe = OversizedFrame::Chunk { + data: "\\".repeat(mid), + } + .into_progress_notification(SIZING_PROBE_TOKEN, SIZING_PROBE_PROGRESS, None)?; + + // A frame we cannot even build at this size is, by definition, too big and + // must be searched lower. This is not hypothetical: in the encrypted path a + // large probe's JSON double-escapes past NIP-44's ~64 KiB plaintext ceiling + // and `gift_wrap` errors — without this, the very first probe (mid = + // desired/2 ≈ 24 000 for the default config) would abort the whole resolve. + let fits = match measure_published_event_size( + &probe, + base, + recipient, + tags, + is_encrypted, + gift_wrap_kind, + ) + .await + { + Ok(size) => size <= max_event_bytes, + Err(_) => false, + }; + + if fits { + best = mid; + low = mid + 1; + } else { + // `mid >= 1` always (low starts at 1), so `mid - 1` cannot underflow. + high = mid - 1; + } + } + + Ok(best) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + use nostr_sdk::prelude::Keys; + + use crate::core::types::EncryptionMode; + use crate::relay::mock::MockRelayPool; + use crate::relay::RelayPoolTrait; + + fn test_base(mode: EncryptionMode) -> BaseTransport { + BaseTransport { + relay_pool: Arc::new(MockRelayPool::new()) as Arc, + encryption_mode: mode, + is_connected: true, + } + } + + fn chunk_frame(data_len: usize) -> JsonRpcNotification { + OversizedFrame::Chunk { + data: "x".repeat(data_len), + } + .into_progress_notification("tok", 2, None) + .unwrap() + } + + #[tokio::test] + async fn measure_grows_with_data_length() { + let base = test_base(EncryptionMode::Disabled); + let recipient = Keys::generate().public_key(); + let tags = BaseTransport::create_recipient_tags(&recipient); + + let small = measure_published_event_size( + &chunk_frame(10), + &base, + &recipient, + &tags, + false, + Kind::Custom(crate::core::constants::GIFT_WRAP_KIND), + ) + .await + .unwrap(); + let large = measure_published_event_size( + &chunk_frame(1000), + &base, + &recipient, + &tags, + false, + Kind::Custom(crate::core::constants::GIFT_WRAP_KIND), + ) + .await + .unwrap(); + + assert!(large > small, "larger data must yield a larger event"); + assert!(small > 0); + } + + #[tokio::test] + async fn resolve_never_exceeds_desired_and_is_at_least_one() { + let base = test_base(EncryptionMode::Disabled); + let recipient = Keys::generate().public_key(); + let tags = BaseTransport::create_recipient_tags(&recipient); + + // A tiny ceiling that even a 1-byte frame overflows still yields 1. + let clamped = resolve_safe_chunk_size( + 5000, + &base, + &recipient, + &tags, + false, + Kind::Custom(crate::core::constants::GIFT_WRAP_KIND), + 10, + ) + .await + .unwrap(); + assert_eq!(clamped, 1, "an unsatisfiable ceiling must still return 1"); + + // The result is capped at `desired`. + let capped = resolve_safe_chunk_size( + 8, + &base, + &recipient, + &tags, + false, + Kind::Custom(crate::core::constants::GIFT_WRAP_KIND), + 10_000_000, + ) + .await + .unwrap(); + assert_eq!(capped, 8, "a generous ceiling must cap at `desired`"); + } + + #[tokio::test] + async fn resolve_is_monotonic_in_budget() { + let base = test_base(EncryptionMode::Disabled); + let recipient = Keys::generate().public_key(); + let tags = BaseTransport::create_recipient_tags(&recipient); + let gw = Kind::Custom(crate::core::constants::GIFT_WRAP_KIND); + + let small_budget = + resolve_safe_chunk_size(48_000, &base, &recipient, &tags, false, gw, 4_000) + .await + .unwrap(); + let large_budget = + resolve_safe_chunk_size(48_000, &base, &recipient, &tags, false, gw, 16_000) + .await + .unwrap(); + + assert!( + large_budget >= small_budget, + "a larger byte ceiling must allow an equal-or-larger chunk ({large_budget} >= {small_budget})" + ); + } + + #[tokio::test] + async fn encrypted_frames_force_smaller_chunks_than_plaintext() { + let recipient = Keys::generate().public_key(); + let tags = BaseTransport::create_recipient_tags(&recipient); + let gw = Kind::Custom(crate::core::constants::GIFT_WRAP_KIND); + + let plain = resolve_safe_chunk_size( + 48_000, + &test_base(EncryptionMode::Disabled), + &recipient, + &tags, + false, + gw, + 8_000, + ) + .await + .unwrap(); + let encrypted = resolve_safe_chunk_size( + 48_000, + &test_base(EncryptionMode::Required), + &recipient, + &tags, + true, + gw, + 8_000, + ) + .await + .unwrap(); + + assert!( + encrypted < plain, + "gift-wrap expansion must shrink the safe chunk ({encrypted} < {plain})" + ); + } +} diff --git a/src/transport/server/mod.rs b/src/transport/server/mod.rs index 6063728..251b79c 100644 --- a/src/transport/server/mod.rs +++ b/src/transport/server/mod.rs @@ -29,7 +29,10 @@ use crate::encryption; use crate::relay::{RelayPool, RelayPoolTrait}; use crate::transport::base::BaseTransport; use crate::transport::discovery_tags::learn_peer_capabilities; -use crate::transport::oversized_transfer::OversizedTransferConfig; +use crate::transport::oversized_transfer::{ + build_oversized_frames, resolve_safe_chunk_size, OversizedFrame, OversizedSenderOptions, + OversizedTransferConfig, OversizedTransferReceiver, TransferPolicy, ACCEPT_PROGRESS, +}; const LOG_TARGET: &str = "contextvm_sdk::transport::server"; @@ -46,6 +49,17 @@ fn oversized_support_tags(config: &NostrServerTransportConfig) -> Vec { } } +/// CEP-22: build the empty per-peer reassembly store, bounded to `max_sessions` +/// peers (one [`OversizedTransferReceiver`] per client pubkey, inserted lazily by +/// the inbound event loop). +fn new_oversized_receiver_store( + max_sessions: usize, +) -> Arc>> { + Arc::new(RwLock::new(LruCache::new( + NonZeroUsize::new(max_sessions).unwrap_or(NonZeroUsize::new(1).unwrap()), + ))) +} + /// Configuration for the server transport. #[derive(Debug, Clone)] #[non_exhaustive] @@ -134,6 +148,11 @@ pub struct NostrServerTransport { /// Duplicate outer ids are skipped before decrypt; ids are inserted only after success /// so failed decrypt/verify can be retried on redelivery. seen_gift_wrap_ids: Arc>>, + /// CEP-22: per-peer reassembly engines for inbound oversized transfers, keyed + /// by client pubkey (hex) and bounded to `max_sessions` peers. Each receiver + /// enforces the configured per-peer admission policy. Populated by the inbound + /// event loop; cleared on [`close`](Self::close). + oversized_receiver: Arc>>, /// Channel for incoming MCP messages (consumed by the MCP server). message_tx: Option>, message_rx: Option>, @@ -295,6 +314,7 @@ impl NostrServerTransport { is_connected: false, }, sessions: SessionStore::with_capacity(config.max_sessions), + oversized_receiver: new_oversized_receiver_store(config.max_sessions), config, event_routes: ServerEventRouteStore::new(), request_wrap_kinds: Arc::new(RwLock::new(HashMap::new())), @@ -345,6 +365,7 @@ impl NostrServerTransport { is_connected: false, }, sessions: SessionStore::with_capacity(config.max_sessions), + oversized_receiver: new_oversized_receiver_store(config.max_sessions), config, request_wrap_kinds: Arc::new(RwLock::new(HashMap::new())), event_routes: ServerEventRouteStore::new(), @@ -413,6 +434,8 @@ impl NostrServerTransport { let gift_wrap_mode = self.config.gift_wrap_mode; let is_announced_server = self.config.is_announced_server; let oversized_enabled = self.config.oversized_transfer.enabled; + let oversized_receiver = self.oversized_receiver.clone(); + let transfer_policy: TransferPolicy = (&self.config.oversized_transfer).into(); let common_tags_snapshot = self.announcement_manager.common_tags_snapshot(); let seen_gift_wrap_ids = self.seen_gift_wrap_ids.clone(); let event_loop_token = self.cancellation_token.child_token(); @@ -430,6 +453,8 @@ impl NostrServerTransport { gift_wrap_mode, is_announced_server, oversized_enabled, + oversized_receiver, + transfer_policy, common_tags_snapshot, seen_gift_wrap_ids, event_loop_token, @@ -519,6 +544,7 @@ impl NostrServerTransport { self.base.disconnect().await?; self.sessions.clear().await; self.event_routes.clear().await; + self.oversized_receiver.write().await.clear(); Ok(()) } @@ -556,7 +582,14 @@ impl NostrServerTransport { _ => {} } + // CEP-22: serialize once, *after* id restoration. The threshold check and + // the oversized split must both derive from this exact post-restoration + // string so the client reassembles bytes whose `id` matches the digest. + let serialized = serde_json::to_string(&response)?; + let is_encrypted = session.is_encrypted; + // CEP-22: capture the peer's oversized support while the session lock is held. + let supports_oversized_transfer = session.supports_oversized_transfer; // CEP-35: include discovery tags on first response to this client let discovery_tags = self.take_pending_server_discovery_tags(session); @@ -593,23 +626,105 @@ impl NostrServerTransport { let base_tags = BaseTransport::create_response_tags(&client_pubkey, &event_id_parsed); let tags = BaseTransport::compose_outbound_tags(&base_tags, &discovery_tags, &[]); + let gift_wrap_kind = Self::select_outbound_gift_wrap_kind( + self.config.gift_wrap_mode, + is_encrypted, + mirrored_wrap_kind, + ); - if let Err(error) = self - .base - .send_mcp_message( - &response, + // CEP-22: a response is eligible for oversized fragmentation only when the + // feature is enabled, the peer advertised support, and the request carried a + // progressToken to address the frames with. + let oversized_eligible = self.config.oversized_transfer.enabled + && progress_token.is_some() + && supports_oversized_transfer; + let threshold = self.config.oversized_transfer.threshold; + + // CEP-22: relay size limits apply to the *published* Nostr event, so decide + // on the published byte size, not the raw payload (mirrors TS + // `measurePublishedMcpMessageSize`). The raw serialized length is a cheap + // lower bound: at/above the threshold the response is conclusively oversized + // and we fragment without building a single event — an escape-heavy payload + // could otherwise overflow NIP-44's plaintext limit while measuring. Below + // it, build the single event once, measure it, and reuse it when it fits. + let mut reuse_event: Option = None; + let fragment = if !oversized_eligible { + false + } else if serialized.len() >= threshold { + true + } else { + match self + .base + .prepare_mcp_message( + &response, + &client_pubkey, + CTXVM_MESSAGES_KIND, + tags.clone(), + Some(is_encrypted), + gift_wrap_kind, + ) + .await + { + Ok((_id, publishable)) => { + let published_len = serde_json::to_string(&publishable) + .map(|s| s.len()) + .unwrap_or(usize::MAX); + if published_len > threshold { + true + } else { + reuse_event = Some(publishable); + false + } + } + // Could not build one event (e.g. NIP-44 plaintext overflow from an + // escape-heavy payload) → it cannot be sent as a single event. + Err(error) => { + tracing::debug!( + target: LOG_TARGET, + error = %error, + event_id = %event_id, + "Single-event build failed; sending response as oversized transfer" + ); + true + } + } + }; + + // Both paths converge on the cleanup tail below — neither early-returns on + // success. + let send_result: Result<()> = if fragment { + self.send_oversized_response( + &serialized, + progress_token.as_deref().unwrap_or_default(), &client_pubkey, - CTXVM_MESSAGES_KIND, + &base_tags, tags, - Some(is_encrypted), - Self::select_outbound_gift_wrap_kind( - self.config.gift_wrap_mode, - is_encrypted, - mirrored_wrap_kind, - ), + is_encrypted, + gift_wrap_kind, ) .await - { + } else if let Some(publishable) = reuse_event { + // Reuse the event already built for the size check — no re-encryption. + self.base + .relay_pool + .publish_event(&publishable) + .await + .map(|_| ()) + } else { + self.base + .send_mcp_message( + &response, + &client_pubkey, + CTXVM_MESSAGES_KIND, + tags, + Some(is_encrypted), + gift_wrap_kind, + ) + .await + .map(|_| ()) + }; + + if let Err(error) = send_result { tracing::error!( target: LOG_TARGET, error = %error, @@ -655,6 +770,65 @@ impl NostrServerTransport { Ok(()) } + /// CEP-22: publish a response as an ordered oversized-transfer frame sequence. + /// + /// Splits the post-restoration `serialized` string into `start → chunks… → + /// end` frames (digest and split both derived from that exact string) and + /// publishes each as a `notifications/progress` event to `recipient`. The + /// server never reserves the `accept` slot or waits for a handshake — it only + /// fragments for peers already known to support the feature. One-shot + /// discovery tags ride the `start` frame only (`start_tags`); every later + /// frame carries bare recipient + `e`-tags (`base_tags`). + #[allow(clippy::too_many_arguments)] + async fn send_oversized_response( + &self, + serialized: &str, + progress_token: &str, + recipient: &PublicKey, + base_tags: &[Tag], + start_tags: Vec, + is_encrypted: bool, + gift_wrap_kind: Option, + ) -> Result<()> { + // CEP-22: derive a per-chunk payload budget so every published frame stays + // under the threshold even after the JSON-RPC envelope, signature, and + // (when encrypted) gift-wrap expansion. Mirrors TS `resolveSafeOversizedChunkSize`. + // Continuation (chunk) frames carry the response `p`+`e` tags (`base_tags`), + // so size against those — not the bare recipient tag — or the budget would + // be ~70 bytes optimistic. + let chunk_size = resolve_safe_chunk_size( + self.config.oversized_transfer.chunk_size, + &self.base, + recipient, + base_tags, + is_encrypted, + Kind::Custom(gift_wrap_kind.unwrap_or(GIFT_WRAP_KIND)), + self.config.oversized_transfer.threshold, + ) + .await?; + let options = OversizedSenderOptions::new(progress_token).with_chunk_size(chunk_size); + let frames = build_oversized_frames(serialized, &options)?.into_ordered(); + + // Discovery tags ride the start frame; `take` yields them once, then the + // remaining frames fall back to bare recipient + `e`-tags. + let mut start_tags = Some(start_tags); + for frame in frames { + let tags = start_tags.take().unwrap_or_else(|| base_tags.to_vec()); + let message = JsonRpcMessage::Notification(frame); + self.base + .send_mcp_message( + &message, + recipient, + CTXVM_MESSAGES_KIND, + tags, + Some(is_encrypted), + gift_wrap_kind, + ) + .await?; + } + Ok(()) + } + /// Send a notification to a specific client. pub async fn send_notification( &self, @@ -915,6 +1089,8 @@ impl NostrServerTransport { gift_wrap_mode: GiftWrapMode, is_announced_server: bool, oversized_enabled: bool, + oversized_receiver: Arc>>, + transfer_policy: TransferPolicy, common_tags_snapshot: announcement_manager::CommonTagsSnapshot, seen_gift_wrap_ids: Arc>>, cancel: CancellationToken, @@ -1196,10 +1372,44 @@ impl NostrServerTransport { let discovered = learn_peer_capabilities(&inner_tags); session.supports_encryption |= discovered.supports_encryption; session.supports_ephemeral_encryption |= discovered.supports_ephemeral_encryption; + // CEP-22 (OD-4): snapshot the flag BEFORE the learning gate mutates + // it — the very `start` frame carries the client's support tag, so + // without this snapshot the first transfer would never get an `accept`. + let client_already_supported = session.supports_oversized_transfer; // CEP-22: only learn oversized support if it is enabled on this server. session.supports_oversized_transfer |= oversized_enabled && discovered.supports_oversized_transfer; + // CEP-22: intercept oversized-transfer frames before request + // correlation/dispatch. A disabled server forwards raw progress + // notifications as before (OD-6). + if oversized_enabled { + if let JsonRpcMessage::Notification(ref n) = mcp_msg { + if OversizedTransferReceiver::is_oversized_frame(n) { + drop(sessions_w); + Self::handle_oversized_frame( + n, + &sender_pubkey, + &event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + client_already_supported, + &oversized_receiver, + transfer_policy, + &relay_pool, + encryption_mode, + gift_wrap_mode, + &event_routes, + &request_wrap_kinds, + &tx, + ) + .await; + continue; + } + } + } + // Track request for correlation if let JsonRpcMessage::Request(ref req) = mcp_msg { let original_id = req.id.clone(); @@ -1270,6 +1480,198 @@ impl NostrServerTransport { } } + /// CEP-22 server inbound: process one oversized-transfer frame. + /// + /// Emits an `accept` on the opening frame when the client's support is not yet + /// known (OD-4), feeds the frame to this peer's reassembler, and — on the + /// `end` frame — registers a response route and dispatches the reassembled + /// request as a synthetic [`IncomingRequest`] (keyed by the end frame's real + /// carrying event id, collision-free against the reserved sentinels). + #[allow(clippy::too_many_arguments)] + async fn handle_oversized_frame( + frame: &JsonRpcNotification, + sender_pubkey: &str, + event_id: &str, + is_encrypted: bool, + is_gift_wrap: bool, + outer_kind: u16, + client_already_supported: bool, + oversized_receiver: &Arc>>, + transfer_policy: TransferPolicy, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + event_routes: &ServerEventRouteStore, + request_wrap_kinds: &Arc>>>, + tx: &tokio::sync::mpsc::UnboundedSender, + ) { + // The outer progressToken keys the transfer (needed for accept + route). + let token = frame + .params + .as_ref() + .and_then(|p| p.get("progressToken")) + .and_then(|t| t.as_str()) + .map(String::from); + + // 1. Emit `accept` on the opening frame if support is not yet known. + let is_start = frame + .params + .as_ref() + .and_then(|p| p.get("cvm")) + .and_then(OversizedFrame::from_cvm_value) + .is_some_and(|f| matches!(f, OversizedFrame::Start { .. })); + let issued_accept = is_start && !client_already_supported && token.is_some(); + if issued_accept { + if let Some(ref token) = token { + Self::emit_accept_frame( + token, + sender_pubkey, + event_id, + is_encrypted, + is_gift_wrap, + outer_kind, + relay_pool, + encryption_mode, + gift_wrap_mode, + ) + .await; + } + } + + // 2. Feed the frame to this peer's reassembler (process_frame is sync; the + // write guard is held only across the sync call, never an await). When we + // issued an `accept`, the sender reserved progress slot 2 and its chunks + // begin at slot 3 — but we never *receive* an `accept`, so feed a synthetic + // one into our own receiver to align chunk-slot tracking with the handshake + // layout (otherwise the frontier sticks at slot 2 and chunks pile up as + // out-of-order until the gap exceeds the window). + let outcome = { + let mut store = oversized_receiver.write().await; + if !store.contains(sender_pubkey) { + store.put( + sender_pubkey.to_string(), + OversizedTransferReceiver::with_policy(transfer_policy), + ); + } + let receiver = store.get_mut(sender_pubkey).unwrap(); + let outcome = receiver.process_frame(frame); + if issued_accept && matches!(outcome, Ok(None)) { + if let Some(ref token) = token { + if let Ok(accept) = OversizedFrame::Accept.into_progress_notification( + token, + ACCEPT_PROGRESS, + None, + ) { + let _ = receiver.process_frame(&accept); + } + } + } + outcome + }; + + match outcome { + // start/accept/chunk consumed — nothing to dispatch yet. + Ok(None) => {} + // The `end` frame: reassembled request ready to dispatch. + Ok(Some(message)) => { + let original_id = message.id().cloned().unwrap_or(serde_json::Value::Null); + // Mirror the incoming wrap kind for the eventual response (CEP-19). + { + let mut kinds_w = request_wrap_kinds.write().await; + kinds_w.insert( + event_id.to_string(), + if is_gift_wrap { Some(outer_kind) } else { None }, + ); + } + event_routes + .register( + event_id.to_string(), + sender_pubkey.to_string(), + original_id, + token, + ) + .await; + let _ = tx.send(IncomingRequest { + message, + client_pubkey: sender_pubkey.to_string(), + event_id: event_id.to_string(), + is_encrypted, + }); + } + // D11: clean up locally, let the peer's own timeout fire. + Err(error) => { + tracing::warn!( + target: LOG_TARGET, + error = %error, + sender_pubkey = %sender_pubkey, + "Oversized transfer frame rejected; cleaning up locally" + ); + } + } + } + + /// CEP-22: publish a single `accept` frame back to `sender_pubkey`, e-tagged to + /// the `start` frame's carrying event. Best-effort — failures are logged only + /// (the sender falls back to its own accept timeout). + #[allow(clippy::too_many_arguments)] + async fn emit_accept_frame( + token: &str, + sender_pubkey: &str, + start_event_id: &str, + is_encrypted: bool, + is_gift_wrap: bool, + outer_kind: u16, + relay_pool: &Arc, + encryption_mode: EncryptionMode, + gift_wrap_mode: GiftWrapMode, + ) { + let client_pk = match PublicKey::from_hex(sender_pubkey) { + Ok(pk) => pk, + Err(_) => return, + }; + let event_id_parsed = EventId::from_hex(start_event_id).unwrap_or(EventId::all_zeros()); + let accept = + match OversizedFrame::Accept.into_progress_notification(token, ACCEPT_PROGRESS, None) { + Ok(n) => JsonRpcMessage::Notification(n), + Err(error) => { + tracing::error!( + target: LOG_TARGET, + error = %error, + "Failed to build oversized-transfer accept frame" + ); + return; + } + }; + let tags = BaseTransport::create_response_tags(&client_pk, &event_id_parsed); + let base = BaseTransport { + relay_pool: Arc::clone(relay_pool), + encryption_mode, + is_connected: true, + }; + if let Err(error) = base + .send_mcp_message( + &accept, + &client_pk, + CTXVM_MESSAGES_KIND, + tags, + Some(is_encrypted), + Self::select_outbound_gift_wrap_kind( + gift_wrap_mode, + is_encrypted, + if is_gift_wrap { Some(outer_kind) } else { None }, + ), + ) + .await + { + tracing::error!( + target: LOG_TARGET, + error = %error, + sender_pubkey = %sender_pubkey, + "Failed to send oversized-transfer accept frame" + ); + } + } + async fn cleanup_sessions( sessions: &SessionStore, event_routes: &ServerEventRouteStore, diff --git a/tests/transport_integration.rs b/tests/transport_integration.rs index f6f2946..55fdbea 100644 --- a/tests/transport_integration.rs +++ b/tests/transport_integration.rs @@ -15,13 +15,19 @@ use async_trait::async_trait; use contextvm_sdk::core::constants::tags; use contextvm_sdk::core::constants::{ mcp_protocol_version, CTXVM_MESSAGES_KIND, EPHEMERAL_GIFT_WRAP_KIND, GIFT_WRAP_KIND, - PROMPTS_LIST_KIND, RESOURCES_LIST_KIND, RESOURCETEMPLATES_LIST_KIND, SERVER_ANNOUNCEMENT_KIND, - TOOLS_LIST_KIND, + MAX_MESSAGE_SIZE, PROMPTS_LIST_KIND, RESOURCES_LIST_KIND, RESOURCETEMPLATES_LIST_KIND, + SERVER_ANNOUNCEMENT_KIND, TOOLS_LIST_KIND, }; use contextvm_sdk::core::types::{EncryptionMode, GiftWrapMode}; use contextvm_sdk::relay::mock::MockRelayPool; +use contextvm_sdk::transport::base::BaseTransport; use contextvm_sdk::transport::client::{NostrClientTransport, NostrClientTransportConfig}; -use contextvm_sdk::transport::server::{NostrServerTransport, NostrServerTransportConfig}; +use contextvm_sdk::transport::oversized_transfer::{ + build_oversized_frames, OversizedFrame, OversizedSenderOptions, OversizedTransferConfig, +}; +use contextvm_sdk::transport::server::{ + IncomingRequest, NostrServerTransport, NostrServerTransportConfig, +}; use contextvm_sdk::{ CapabilityExclusion, JsonRpcMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, RelayPoolTrait, ServerInfo, @@ -2788,12 +2794,15 @@ async fn server_gate_blocks_oversized_when_disabled() { async fn server_gate_blocks_oversized_when_client_does_not_advertise() { let (client_pool, server_pool) = MockRelayPool::create_pair(); let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + // Server is oversized-capable with a low threshold; the only thing that should + // keep the (large) response un-fragmented is the client not advertising support. let mut server = NostrServerTransport::with_relay_pool( NostrServerTransportConfig::default() .with_encryption_mode(EncryptionMode::Disabled) - .with_oversized_enabled(true), - as_pool(server_pool), + .with_oversized_transfer(OversizedTransferConfig::enabled().with_threshold(2000)), + Arc::clone(&server_pool) as Arc, ) .await .unwrap(); @@ -2814,12 +2823,14 @@ async fn server_gate_blocks_oversized_when_client_does_not_advertise() { client.start().await.unwrap(); let_event_loops_start().await; + // Request carries a progressToken so the server's only remaining fragmentation + // blocker is the missing client capability (not a missing token). client .send(&JsonRpcMessage::Request(JsonRpcRequest { jsonrpc: "2.0".to_string(), id: serde_json::json!(1), - method: "initialize".to_string(), - params: None, + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": "tok-block" } })), })) .await .unwrap(); @@ -2837,6 +2848,30 @@ async fn server_gate_blocks_oversized_when_client_does_not_advertise() { !snap.supports_oversized_transfer, "server must not learn oversized support the client never advertised" ); + + // Server replies with a payload well over the threshold. Because the client + // never advertised support, the gate must block fragmentation: the response is + // a single event, not start/chunk/end frames. + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(1), + result: serde_json::json!({ "blob": "Z".repeat(8000) }), + }); + server + .send_response(&incoming.event_id, response) + .await + .expect("server send response"); + + let server_response_frames = server_pool + .stored_events() + .await + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND) && e.pubkey == server_pubkey) + .count(); + assert_eq!( + server_response_frames, 1, + "gate off (client did not advertise) must send the response as exactly one event" + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -3614,3 +3649,819 @@ async fn server_close_stops_event_loop() { "after close(), message receiver must yield None (channel closed)" ); } + +// ── CEP-22 PR 3 (M3): oversized transfer end-to-end wiring ─────────────────── + +/// C→S: a request whose serialized size exceeds the threshold is fragmented into +/// multiple kind-25910 frames and reassembled into exactly one IncomingRequest. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_request_roundtrip_client_to_server() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // A request carrying a progressToken and a payload well over the threshold. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("big-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": "tok-req" }, + "blob": "A".repeat(10000), + })), + }); + client.send(&request).await.expect("send oversized request"); + + // The wire must carry more than one kind-25910 frame (start + chunks + end). + let frame_count = server_pool + .stored_events() + .await + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .count(); + assert!( + frame_count > 1, + "oversized request must publish multiple frames, got {frame_count}" + ); + + // Exactly one reassembled request arrives at the server. + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.method(), Some("tools/call")); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("big-1"))); + + // No second message should surface. + let second = tokio::time::timeout(Duration::from_millis(100), server_rx.recv()).await; + assert!(second.is_err(), "only one reassembled request expected"); +} + +/// S→C: a large response is fragmented by the server and reassembled by the +/// client into a single response delivered on its receiver. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_response_roundtrip_server_to_client() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + let mut client_rx = client.take_message_receiver().expect("client rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // Small request (single event) but carrying a progressToken so the server may + // fragment its response. + let request = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("rsp-1"), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ "_meta": { "progressToken": "tok-rsp" } })), + }); + client.send(&request).await.expect("send request"); + + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for request") + .expect("server channel closed"); + + // Server replies with a payload well over the threshold. + let response = JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: serde_json::json!("rsp-1"), + result: serde_json::json!({ "blob": "B".repeat(10000) }), + }); + server + .send_response(&incoming.event_id, response) + .await + .expect("server send oversized response"); + + // The client reassembles and delivers the full response. + let client_msg = tokio::time::timeout(Duration::from_millis(1000), client_rx.recv()) + .await + .expect("timeout waiting for reassembled response") + .expect("client channel closed"); + assert!(client_msg.is_response()); + assert_eq!(client_msg.id(), Some(&serde_json::json!("rsp-1"))); + if let JsonRpcMessage::Response(r) = client_msg { + assert_eq!(r.result["blob"], serde_json::json!("B".repeat(10000))); + } else { + panic!("expected a response"); + } + + // Test gap 6: the start frame of the oversized response carried the server's + // discovery tags, so the client must have learned the server supports oversized + // transfer by the time the reassembled response is delivered. + assert!( + client + .discovered_server_capabilities() + .supports_oversized_transfer, + "client must learn server oversized support from the first oversized response" + ); +} + +// ── CEP-22 PR 3 (M4): remaining §7 oversized-transfer tests ────────────────── + +/// Start an oversized-enabled server over `server_pool`, returning the live +/// transport (keep alive) and its request receiver. +async fn start_oversized_server( + server_pool: Arc, + encryption_mode: EncryptionMode, + oversized: OversizedTransferConfig, +) -> ( + NostrServerTransport, + tokio::sync::mpsc::UnboundedReceiver, +) { + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(encryption_mode) + .with_oversized_transfer(oversized), + server_pool as Arc, + ) + .await + .expect("create server transport"); + let rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + (server, rx) +} + +/// A plaintext `BaseTransport` over `client_pool` used to inject raw frames +/// (grey-box: bypasses `NostrClientTransport::send`). +fn greybox_client_base(client_pool: MockRelayPool) -> BaseTransport { + BaseTransport { + relay_pool: as_pool(client_pool), + encryption_mode: EncryptionMode::Disabled, + is_connected: true, + } +} + +/// Publish one frame as a plaintext kind-25910 event tagged to `server_pubkey`. +async fn publish_plain_frame( + base: &BaseTransport, + server_pubkey: &PublicKey, + tags: &[Tag], + frame: JsonRpcNotification, +) { + let message = JsonRpcMessage::Notification(frame); + base.send_mcp_message( + &message, + server_pubkey, + CTXVM_MESSAGES_KIND, + tags.to_vec(), + Some(false), + None, + ) + .await + .expect("publish frame"); +} + +/// A request whose serialized form carries `progressToken` and a payload of +/// `blob_len` bytes (used to force fragmentation). +fn oversized_request(id: &str, token: &str, blob_len: usize) -> JsonRpcMessage { + JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: serde_json::json!(id), + method: "tools/call".to_string(), + params: Some(serde_json::json!({ + "_meta": { "progressToken": token }, + "blob": "Q".repeat(blob_len), + })), + }) +} + +/// Out-of-order delivery: chunks published in a permuted order (within the +/// out-of-order window) still reassemble into the original request. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_out_of_order_delivery_reassembles() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(64) + .with_chunk_size(32); + let (_server, mut server_rx) = start_oversized_server( + Arc::clone(&server_pool), + EncryptionMode::Disabled, + oversized, + ) + .await; + let_event_loops_start().await; + + let base = greybox_client_base(client_pool); + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + + let request = oversized_request("ooo-1", "tok-ooo", 300); + let serialized = serde_json::to_string(&request).unwrap(); + let opts = OversizedSenderOptions::new("tok-ooo") + .with_chunk_size(32) + .with_accept_handshake(true); + let frames = build_oversized_frames(&serialized, &opts).unwrap(); + let start = frames.start; + let mut chunks = frames.chunks; + let end = frames.end; + assert!( + chunks.len() > 2, + "need several chunks to permute meaningfully" + ); + + // start first; then the base chunk (so the reserved-accept-slot guard is + // satisfied); then the remaining chunks reversed; then end. + publish_plain_frame(&base, &server_pubkey, &tags, start).await; + let base_chunk = chunks.remove(0); + publish_plain_frame(&base, &server_pubkey, &tags, base_chunk).await; + for chunk in chunks.into_iter().rev() { + publish_plain_frame(&base, &server_pubkey, &tags, chunk).await; + } + publish_plain_frame(&base, &server_pubkey, &tags, end).await; + + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.method(), Some("tools/call")); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("ooo-1"))); +} + +/// Digest mismatch: corrupting one chunk's `cvm.data` fails reassembly, and no +/// partial message is ever surfaced (recv times out). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_digest_mismatch_surfaces_nothing() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(64) + .with_chunk_size(32); + let (_server, mut server_rx) = start_oversized_server( + Arc::clone(&server_pool), + EncryptionMode::Disabled, + oversized, + ) + .await; + let_event_loops_start().await; + + let base = greybox_client_base(client_pool); + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + + let request = oversized_request("dig-1", "tok-dig", 300); + let serialized = serde_json::to_string(&request).unwrap(); + let opts = OversizedSenderOptions::new("tok-dig") + .with_chunk_size(32) + .with_accept_handshake(true); + let frames = build_oversized_frames(&serialized, &opts).unwrap(); + let start = frames.start; + let mut chunks = frames.chunks; + let end = frames.end; + + // Corrupt a middle chunk's payload so the reassembled bytes/digest mismatch. + let mid = chunks.len() / 2; + if let Some(params) = chunks[mid].params.as_mut() { + params["cvm"]["data"] = serde_json::json!("corrupted-bytes-not-original"); + } + + publish_plain_frame(&base, &server_pubkey, &tags, start).await; + for chunk in chunks { + publish_plain_frame(&base, &server_pubkey, &tags, chunk).await; + } + publish_plain_frame(&base, &server_pubkey, &tags, end).await; + + let result = tokio::time::timeout(Duration::from_millis(400), server_rx.recv()).await; + assert!( + result.is_err(), + "digest mismatch must not surface any (partial) message" + ); +} + +/// Abort: a `start` followed by an `abort` frame terminates the transfer; no +/// message surfaces. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_abort_surfaces_nothing() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(64) + .with_chunk_size(32); + let (_server, mut server_rx) = start_oversized_server( + Arc::clone(&server_pool), + EncryptionMode::Disabled, + oversized, + ) + .await; + let_event_loops_start().await; + + let base = greybox_client_base(client_pool); + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + + let request = oversized_request("abt-1", "tok-abt", 300); + let serialized = serde_json::to_string(&request).unwrap(); + let opts = OversizedSenderOptions::new("tok-abt") + .with_chunk_size(32) + .with_accept_handshake(true); + let frames = build_oversized_frames(&serialized, &opts).unwrap(); + + publish_plain_frame(&base, &server_pubkey, &tags, frames.start).await; + let abort = OversizedFrame::Abort { reason: None } + .into_progress_notification("tok-abt", 2, None) + .expect("build abort frame"); + publish_plain_frame(&base, &server_pubkey, &tags, abort).await; + + let result = tokio::time::timeout(Duration::from_millis(400), server_rx.recv()).await; + assert!( + result.is_err(), + "an aborted transfer must not surface a message" + ); +} + +/// Per-frame size bound: every published kind-25910 frame stays under the 1 MiB +/// single-message cap, even for a payload far larger than one frame. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_each_frame_under_max_message_size() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // Default chunk size (48_000) keeps each frame well under MAX_MESSAGE_SIZE. + let oversized = OversizedTransferConfig::enabled(); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // ~200 KB payload → several ~48 KB frames. + let request = oversized_request("sz-1", "tok-sz", 200_000); + client.send(&request).await.expect("send oversized request"); + + let incoming = tokio::time::timeout(Duration::from_millis(1500), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("sz-1"))); + + let frames: Vec<_> = server_pool + .stored_events() + .await + .into_iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .collect(); + assert!(frames.len() > 1, "payload must have been fragmented"); + for frame in &frames { + assert!( + frame.content.len() < MAX_MESSAGE_SIZE, + "every frame must stay under MAX_MESSAGE_SIZE, got {}", + frame.content.len() + ); + } +} + +/// A reassembled payload larger than the 1 MiB single-message cap is delivered +/// in full (the per-transfer cap defaults to 100 MiB). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_reassembled_over_one_mib_succeeds() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled(); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // ~1.2 MB payload, comfortably over MAX_MESSAGE_SIZE (1 MiB). + let blob_len = 1_200_000; + let request = oversized_request("big-mib", "tok-mib", blob_len); + let serialized_len = serde_json::to_string(&request).unwrap().len(); + assert!( + serialized_len > MAX_MESSAGE_SIZE, + "request must exceed 1 MiB" + ); + + client.send(&request).await.expect("send >1MiB request"); + + let incoming = tokio::time::timeout(Duration::from_millis(3000), server_rx.recv()) + .await + .expect("timeout waiting for >1MiB reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("big-mib"))); + if let JsonRpcMessage::Request(req) = incoming.message { + assert_eq!( + req.params.unwrap()["blob"], + serde_json::json!("Q".repeat(blob_len)) + ); + } else { + panic!("expected a request"); + } +} + +/// Gate off: with oversized transfer disabled on the client, an above-threshold +/// request is published as a single event (no fragmentation). +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_gate_off_sends_single_event() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + // Server is oversized-capable, but the client's gate is OFF. + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(EncryptionMode::Disabled) + .with_oversized_transfer(OversizedTransferConfig::enabled().with_threshold(256)), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(EncryptionMode::Disabled) + // Disabled gate; threshold present only to prove it is above-threshold. + .with_oversized_transfer(OversizedTransferConfig::default().with_threshold(256)), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + // Above the 256-byte threshold, but below the 1 MiB single-event cap. + let request = oversized_request("gate-off", "tok-gate", 2000); + client.send(&request).await.expect("send request"); + + let incoming = tokio::time::timeout(Duration::from_millis(1000), server_rx.recv()) + .await + .expect("timeout waiting for request") + .expect("server channel closed"); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("gate-off"))); + + let frame_count = server_pool + .stored_events() + .await + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .count(); + assert_eq!( + frame_count, 1, + "gate off must publish exactly one single event, got {frame_count}" + ); +} + +/// Oversized request roundtrip under a given encryption mode. +async fn run_oversized_request_roundtrip_mode(encryption_mode: EncryptionMode) { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(6000) + .with_chunk_size(6000); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(encryption_mode) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(encryption_mode) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + let request = oversized_request("mode-1", "tok-mode", 10000); + client.send(&request).await.expect("send oversized request"); + + let incoming = tokio::time::timeout(Duration::from_millis(1500), server_rx.recv()) + .await + .expect("timeout waiting for reassembled request") + .expect("server channel closed"); + assert_eq!(incoming.message.method(), Some("tools/call")); + assert_eq!(incoming.message.id(), Some(&serde_json::json!("mode-1"))); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_request_roundtrip_encryption_disabled() { + run_oversized_request_roundtrip_mode(EncryptionMode::Disabled).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_request_roundtrip_encryption_optional() { + run_oversized_request_roundtrip_mode(EncryptionMode::Optional).await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_request_roundtrip_encryption_required() { + run_oversized_request_roundtrip_mode(EncryptionMode::Required).await; +} + +// ── CEP-22 Gap #1: oversized decision measures the published event ──────────── + +/// Send `request` through a fresh client/server pair under `encryption_mode` with +/// the given oversized `threshold`, then return how many kind-25910 (plaintext +/// frames) and kind-1059 (gift-wrapped frames) events the client published. A +/// single non-oversized send is one event; a fragmented send is several. +async fn oversized_published_frame_counts( + encryption_mode: EncryptionMode, + threshold: usize, + request: &JsonRpcMessage, +) -> (usize, usize) { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled().with_threshold(threshold); + + let mut server = NostrServerTransport::with_relay_pool( + NostrServerTransportConfig::default() + .with_encryption_mode(encryption_mode) + .with_oversized_transfer(oversized.clone()), + Arc::clone(&server_pool) as Arc, + ) + .await + .expect("create server transport"); + + let mut client = NostrClientTransport::with_relay_pool( + NostrClientTransportConfig::default() + .with_relay_urls(vec!["wss://mock.relay".to_string()]) + .with_server_pubkey(server_pubkey.to_hex()) + .with_encryption_mode(encryption_mode) + .with_oversized_transfer(oversized), + as_pool(client_pool), + ) + .await + .expect("create client transport"); + + let mut server_rx = server.take_message_receiver().expect("server rx"); + server.start().await.expect("server start"); + client.start().await.expect("client start"); + let_event_loops_start().await; + + client.send(request).await.expect("send request"); + + // Wait for the request to surface (single event or fully reassembled transfer), + // then settle briefly so every frame has landed in the shared store. + let _ = tokio::time::timeout(Duration::from_millis(1500), server_rx.recv()).await; + tokio::time::sleep(Duration::from_millis(50)).await; + + let events = server_pool.stored_events().await; + let kind_25910 = events + .iter() + .filter(|e| e.kind == Kind::Custom(CTXVM_MESSAGES_KIND)) + .count(); + let kind_1059 = events + .iter() + .filter(|e| e.kind == Kind::Custom(GIFT_WRAP_KIND)) + .count(); + (kind_25910, kind_1059) +} + +/// Gap #1 regression: the oversized decision is made on the *published* Nostr +/// event size, not the raw payload. A request whose raw serialized length is below +/// the threshold is sent as a single event in plaintext, but the SAME payload must +/// be fragmented once gift-wrap encryption inflates the published event past the +/// threshold (base64 ×1.333 + NIP-44 padding). Mirrors TS +/// `measurePublishedMcpMessageSize`; the old raw-length check sent both as a single +/// (over-cap) event. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn oversized_decision_accounts_for_encryption_inflation() { + // Margins (measured): payload=5900, threshold=8000. + // raw serialized ≈ 6013 bytes (< threshold — old check fragmented neither) + // plaintext published ≈ 6455 bytes (< threshold → single event) + // encrypted published ≈ 10065 bytes (> threshold → fragments) + // The 5900 payload is chosen so gift-wrap inflation (base64 ×1.333 + NIP-44 + // padding) is what crosses the threshold; adjust it if the envelope changes. + let threshold = 8000; + let request = oversized_request("enc-infl", "tok-infl", 5900); + + // Precondition: the raw payload is below the threshold, so the *old* raw-length + // decision would have sent a single event in BOTH modes. + let raw_len = serde_json::to_string(&request) + .expect("serialize request") + .len(); + assert!( + raw_len < threshold, + "precondition: raw payload ({raw_len}) must be below the threshold ({threshold})" + ); + + // Plaintext: the published event stays under the threshold → exactly one event. + let (plain_25910, _) = + oversized_published_frame_counts(EncryptionMode::Disabled, threshold, &request).await; + assert_eq!( + plain_25910, 1, + "a below-threshold plaintext request must publish exactly one kind-25910 event, got {plain_25910}" + ); + + // Encrypted: gift-wrap inflation pushes the published event over the threshold, + // so the SAME payload is now fragmented into multiple gift-wrapped frames. + let (_, enc_1059) = + oversized_published_frame_counts(EncryptionMode::Required, threshold, &request).await; + assert!( + enc_1059 > 1, + "the same request must fragment once gift-wrap inflates it past the threshold, got {enc_1059} gift-wrap frames" + ); +} + +// ── CEP-22 PR 3 gap 4: server accept-frame emission shape ──────────────────── + +/// Poll the shared event store for the server's emitted `accept` frame (a +/// plaintext kind-25910 notification authored by the server whose `cvm.frameType` +/// is `accept`). Panics if none appears within the window. +async fn poll_for_accept_frame( + server_pool: &Arc, + server_pubkey: &PublicKey, +) -> JsonRpcNotification { + for _ in 0..50 { + for event in server_pool.stored_events().await { + if event.kind != Kind::Custom(CTXVM_MESSAGES_KIND) || event.pubkey != *server_pubkey { + continue; + } + if let Ok(notif) = serde_json::from_str::(&event.content) { + let is_accept = notif + .params + .as_ref() + .and_then(|p| p.get("cvm")) + .and_then(|cvm| cvm.get("frameType")) + .and_then(|ft| ft.as_str()) + == Some("accept"); + if is_accept { + return notif; + } + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("server did not emit an accept frame within the poll window"); +} + +/// When a client sends an oversized `start` before the server knows it supports +/// oversized transfer, the server emits an `accept` frame with `progress == 2` +/// and `cvm.frameType == "accept"`. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn server_emits_accept_frame_with_expected_shape() { + let (client_pool, server_pool) = MockRelayPool::create_pair(); + let server_pubkey = server_pool.mock_public_key(); + let server_pool = Arc::new(server_pool); + + let oversized = OversizedTransferConfig::enabled() + .with_threshold(2000) + .with_chunk_size(2000); + let (_server, _server_rx) = start_oversized_server( + Arc::clone(&server_pool), + EncryptionMode::Disabled, + oversized, + ) + .await; + let_event_loops_start().await; + + // Grey-box: inject only the `start` frame (handshake slot reserved) from a + // client that has not advertised oversized support. + let base = greybox_client_base(client_pool); + let tags = BaseTransport::create_recipient_tags(&server_pubkey); + let request = oversized_request("acc-1", "tok-acc", 300); + let serialized = serde_json::to_string(&request).unwrap(); + let opts = OversizedSenderOptions::new("tok-acc") + .with_chunk_size(64) + .with_accept_handshake(true); + let frames = build_oversized_frames(&serialized, &opts).unwrap(); + publish_plain_frame(&base, &server_pubkey, &tags, frames.start).await; + + let accept = poll_for_accept_frame(&server_pool, &server_pubkey).await; + let params = accept.params.as_ref().expect("accept frame has params"); + assert_eq!( + params["progress"].as_u64(), + Some(2), + "accept frame must be published at progress slot 2" + ); + assert_eq!( + params["cvm"]["frameType"].as_str(), + Some("accept"), + "frame must be an oversized-transfer accept" + ); + assert_eq!( + params["cvm"]["type"].as_str(), + Some("oversized-transfer"), + "accept frame must carry the oversized-transfer cvm type" + ); + assert_eq!( + params["progressToken"].as_str(), + Some("tok-acc"), + "accept frame must echo the transfer's progressToken" + ); +}