diff --git a/Cargo.toml b/Cargo.toml index c90b711..6105f39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,10 @@ tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" +# Hashing for CEP-22 oversized payload transfer (SHA-256 digest of reassembled payloads) +sha2 = "0.10" +hex = "0.4" + # Error handling thiserror = "2.0" async-trait = "0.1" diff --git a/src/core/error.rs b/src/core/error.rs index fe8891a..505a17d 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -34,6 +34,10 @@ pub enum Error { #[error("Serialization error: {0}")] Serialization(#[from] serde_json::Error), + /// CEP-22 oversized payload transfer error (framing/reassembly) + #[error("Oversized transfer error: {0}")] + OversizedTransfer(#[from] crate::transport::oversized_transfer::OversizedTransferError), + /// Generic error #[error("{0}")] Other(String), diff --git a/src/core/validation.rs b/src/core/validation.rs index 9e6dc5d..355c60d 100644 --- a/src/core/validation.rs +++ b/src/core/validation.rs @@ -19,6 +19,29 @@ pub fn validate_and_parse(content: &str) -> Option { validate_message(&value) } +/// Validate size against a custom byte bound, then parse into a [`JsonRpcMessage`]. +/// +/// Unlike [`validate_and_parse`], which enforces the 1 MB per-event cap +/// ([`MAX_MESSAGE_SIZE`]), this bounds `content` to `max_bytes` — the receiver +/// policy's `maxTransferBytes` (100 MiB default). It is the dedicated entrypoint +/// for **reassembled** CEP-22 oversized payloads, which legitimately exceed the +/// per-event cap (the very limit the transfer profile works around). The bound +/// keeps a single ceiling on reassembled-payload memory rather than leaving it +/// unbounded. +pub fn validate_and_parse_oversized(content: &str, max_bytes: usize) -> Option { + if content.len() > max_bytes { + tracing::warn!( + "Oversized message exceeds max transfer bytes: {} > {}", + content.len(), + max_bytes + ); + return None; + } + + let value: serde_json::Value = serde_json::from_str(content).ok()?; + validate_message(&value) +} + /// Validate that a JSON value is a well-formed JSON-RPC 2.0 message. /// /// Checks: @@ -98,4 +121,35 @@ mod tests { fn test_validate_and_parse_rejects_invalid_json() { assert!(validate_and_parse("not json").is_none()); } + + #[test] + fn test_validate_and_parse_oversized_bypasses_1mb_cap() { + // A valid message larger than the 1 MB per-event cap. + let big_value = "m".repeat(MAX_MESSAGE_SIZE + 1024); + let content = format!(r#"{{"jsonrpc":"2.0","id":1,"result":{{"value":"{big_value}"}}}}"#); + assert!(content.len() > MAX_MESSAGE_SIZE); + + // The standard entrypoint rejects it on size... + assert!(validate_and_parse(&content).is_none()); + // ...but the oversized entrypoint accepts it under a higher bound. + let parsed = validate_and_parse_oversized(&content, 100 * 1024 * 1024).unwrap(); + assert!(parsed.is_response()); + } + + #[test] + fn test_validate_and_parse_oversized_enforces_its_own_bound() { + let content = r#"{"jsonrpc":"2.0","id":1,"method":"tools/list"}"#; + // Rejected when the content exceeds the supplied max_bytes. + assert!(validate_and_parse_oversized(content, 8).is_none()); + // Accepted under a sufficient bound. + assert!(validate_and_parse_oversized(content, 1024) + .unwrap() + .is_request()); + } + + #[test] + fn test_validate_and_parse_oversized_rejects_invalid_version() { + let content = r#"{"jsonrpc":"1.0","id":1,"method":"test"}"#; + assert!(validate_and_parse_oversized(content, 1024).is_none()); + } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index a5d53e8..7b17104 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -6,6 +6,7 @@ pub mod base; pub mod client; pub mod discovery_tags; +pub mod oversized_transfer; pub mod server; pub use client::{ClientCorrelationStore, NostrClientTransport, NostrClientTransportConfig}; diff --git a/src/transport/oversized_transfer/codec.rs b/src/transport/oversized_transfer/codec.rs new file mode 100644 index 0000000..f5b1bf4 --- /dev/null +++ b/src/transport/oversized_transfer/codec.rs @@ -0,0 +1,360 @@ +//! Pure framing codec: digest, UTF-8 char-aware splitting, and frame building. +//! +//! Ports `sdk/src/transport/oversized-transfer/sender.ts`. No transport or I/O — +//! given a serialized JSON-RPC string this produces the ordered sequence of +//! `notifications/progress` frames a sender publishes. + +use sha2::{Digest, Sha256}; + +use crate::core::types::JsonRpcNotification; + +use super::constants::{ACCEPT_PROGRESS, DEFAULT_CHUNK_SIZE, DIGEST_PREFIX, START_PROGRESS}; +use super::errors::OversizedTransferError; +use super::frame::{CompletionMode, OversizedFrame}; + +/// Options controlling how a payload is split into frames. +#[derive(Debug, Clone)] +pub struct OversizedSenderOptions { + /// The MCP `progressToken` stamped on every frame in the transfer. + pub progress_token: String, + /// Per-chunk byte size. Defaults to [`DEFAULT_CHUNK_SIZE`]. + pub chunk_size_bytes: usize, + /// Whether to reserve the `accept` slot (progress 2) for a handshake, so the + /// first `chunk` starts at progress 3. When `false`, chunks start at 2. + pub needs_accept_handshake: bool, +} + +impl OversizedSenderOptions { + /// Create options for `progress_token` with default chunk size and no handshake. + pub fn new(progress_token: impl Into) -> Self { + Self { + progress_token: progress_token.into(), + chunk_size_bytes: DEFAULT_CHUNK_SIZE, + needs_accept_handshake: false, + } + } + + /// Override the per-chunk byte size. + pub fn with_chunk_size(mut self, chunk_size_bytes: usize) -> Self { + self.chunk_size_bytes = chunk_size_bytes; + self + } + + /// Set whether the `accept` handshake slot is reserved. + pub fn with_accept_handshake(mut self, needs_accept_handshake: bool) -> Self { + self.needs_accept_handshake = needs_accept_handshake; + self + } +} + +/// The ordered frames produced from a serialized payload. +#[derive(Debug, Clone)] +pub struct BuiltOversizedFrames { + /// The opening `start` frame (progress [`START_PROGRESS`]). + pub start: JsonRpcNotification, + /// The `chunk` frames in ascending `progress` order. + pub chunks: Vec, + /// The closing `end` frame. + pub end: JsonRpcNotification, +} + +impl BuiltOversizedFrames { + /// Total number of frames (`start` + chunks + `end`). + pub fn frame_count(&self) -> usize { + self.chunks.len() + 2 + } + + /// Consume into a flat vector in canonical send order: `start`, chunks…, `end`. + pub fn into_ordered(self) -> Vec { + let mut frames = Vec::with_capacity(self.frame_count()); + frames.push(self.start); + frames.extend(self.chunks); + frames.push(self.end); + frames + } +} + +/// UTF-8 byte length of a string (Rust strings are already UTF-8). +pub fn utf8_byte_len(value: &str) -> usize { + value.len() +} + +/// Compute the CEP-22 digest of `value`: `"sha256:"` + lowercase hex of the +/// SHA-256 of its UTF-8 bytes. +pub fn sha256_digest(value: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(value.as_bytes()); + format!("{DIGEST_PREFIX}{}", hex::encode(hasher.finalize())) +} + +/// Split `value` into fragments each at most `max_bytes` UTF-8 bytes, never +/// breaking a multibyte codepoint across a boundary. +/// +/// Concatenating the fragments in order reproduces `value` exactly. Returns an +/// error if `max_bytes` is zero or a single character is larger than `max_bytes`. +pub fn split_string_by_byte_size( + value: &str, + max_bytes: usize, +) -> Result, OversizedTransferError> { + if max_bytes == 0 { + return Err(OversizedTransferError::Policy( + "chunk_size_bytes must be greater than zero".to_string(), + )); + } + + let mut chunks: Vec = Vec::new(); + let mut current = String::new(); + let mut current_bytes = 0usize; + + for ch in value.chars() { + let char_bytes = ch.len_utf8(); + if char_bytes > max_bytes { + return Err(OversizedTransferError::Policy(format!( + "Unable to split message: single character exceeds chunk size ({char_bytes} > {max_bytes})" + ))); + } + + if current_bytes > 0 && current_bytes + char_bytes > max_bytes { + chunks.push(std::mem::take(&mut current)); + current_bytes = 0; + } + + current.push(ch); + current_bytes += char_bytes; + } + + if !current.is_empty() { + chunks.push(current); + } + + Ok(chunks) +} + +/// Build the ordered `start` / `chunk…` / `end` frames for `serialized`. +/// +/// Serializes once at the call site (the caller passes the exact JSON-RPC +/// string), then: computes the digest over that string, char-aware splits it, +/// and lays the chunks out on `progress` slots. When +/// [`needs_accept_handshake`](OversizedSenderOptions::needs_accept_handshake) is +/// set, progress 2 is reserved for the receiver's `accept` and chunks begin at 3. +pub fn build_oversized_frames( + serialized: &str, + options: &OversizedSenderOptions, +) -> crate::core::error::Result { + let total_bytes = utf8_byte_len(serialized) as u64; + let digest = sha256_digest(serialized); + + let text_chunks = split_string_by_byte_size(serialized, options.chunk_size_bytes)?; + let total_chunks = text_chunks.len() as u64; + + // No-handshake: chunks reuse the reserved accept slot (progress 2). + // Handshake: accept occupies slot 2, so chunks begin at slot 3. + let chunk_base_progress = if options.needs_accept_handshake { + ACCEPT_PROGRESS + 1 + } else { + ACCEPT_PROGRESS + }; + + let token = options.progress_token.as_str(); + + let start = OversizedFrame::Start { + completion_mode: CompletionMode::Render, + digest, + total_bytes, + total_chunks, + } + .into_progress_notification(token, START_PROGRESS, Some("starting oversized transfer"))?; + + let mut chunks = Vec::with_capacity(text_chunks.len()); + for (i, data) in text_chunks.into_iter().enumerate() { + let progress = chunk_base_progress + i as u64; + let frame = OversizedFrame::Chunk { data }; + chunks.push(frame.into_progress_notification(token, progress, None)?); + } + + let end = OversizedFrame::End.into_progress_notification( + token, + chunk_base_progress + total_chunks, + Some("oversized transfer complete"), + )?; + + Ok(BuiltOversizedFrames { start, chunks, end }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::Value; + + fn frame_type(notification: &JsonRpcNotification) -> String { + notification.params.as_ref().unwrap()["cvm"]["frameType"] + .as_str() + .unwrap() + .to_string() + } + + fn progress(notification: &JsonRpcNotification) -> u64 { + notification.params.as_ref().unwrap()["progress"] + .as_u64() + .unwrap() + } + + // ── digest ────────────────────────────────────────────────────── + + #[test] + fn sha256_digest_known_vectors() { + // Reference SHA-256 vectors with the CEP-22 "sha256:" prefix. + assert_eq!( + sha256_digest(""), + "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + ); + assert_eq!( + sha256_digest("abc"), + "sha256:ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" + ); + } + + #[test] + fn sha256_digest_has_prefix_and_lowercase_hex() { + let digest = sha256_digest("hello world"); + assert!(digest.starts_with("sha256:")); + let hex_part = &digest["sha256:".len()..]; + assert_eq!(hex_part.len(), 64); + assert!(hex_part + .chars() + .all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase())); + } + + #[test] + fn utf8_byte_len_counts_bytes_not_chars() { + assert_eq!(utf8_byte_len("abc"), 3); + assert_eq!(utf8_byte_len("é"), 2); + assert_eq!(utf8_byte_len("🦀"), 4); + assert_eq!(utf8_byte_len("a🦀"), 5); + } + + // ── split_string_by_byte_size ─────────────────────────────────── + + #[test] + fn split_ascii_exact_and_remainder() { + let chunks = split_string_by_byte_size("abcdefghij", 4).unwrap(); + assert_eq!(chunks, vec!["abcd", "efgh", "ij"]); + assert_eq!(chunks.concat(), "abcdefghij"); + } + + #[test] + fn split_empty_yields_no_chunks() { + assert!(split_string_by_byte_size("", 4).unwrap().is_empty()); + } + + #[test] + fn split_rejects_zero_max() { + let err = split_string_by_byte_size("abc", 0).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + } + + #[test] + fn split_multibyte_never_breaks_codepoints() { + // Mixed 1/2/3/4-byte codepoints; chunk size 5 forces boundaries mid-string. + let original = "aé🦀b日x☃yz🦀"; + let chunks = split_string_by_byte_size(original, 5).unwrap(); + + // Concatenation in order reproduces the exact bytes. + assert_eq!(chunks.concat(), original); + // No chunk exceeds the byte budget, and every chunk is valid UTF-8 + // (guaranteed by `String`, i.e. no codepoint was split). + for chunk in &chunks { + assert!(utf8_byte_len(chunk) <= 5, "chunk {chunk:?} exceeds 5 bytes"); + assert!(!chunk.is_empty()); + } + } + + #[test] + fn split_packs_multibyte_up_to_budget() { + // Each 🦀 is 4 bytes; with a budget of 8 two fit per chunk. + assert_eq!( + split_string_by_byte_size("🦀🦀🦀", 8).unwrap(), + vec!["🦀🦀", "🦀"] + ); + } + + #[test] + fn split_rejects_single_char_larger_than_budget() { + // 🦀 is 4 bytes; a 3-byte budget cannot hold it. + let err = split_string_by_byte_size("🦀", 3).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + } + + // ── build_oversized_frames ────────────────────────────────────── + + #[test] + fn build_lays_out_progress_slots_without_handshake() { + let opts = OversizedSenderOptions::new("tok").with_chunk_size(4); + let frames = build_oversized_frames("hello world", &opts).unwrap(); + + // "hello world" (11 bytes) / 4 → 3 chunks. + assert_eq!(frames.chunks.len(), 3); + assert_eq!(frames.frame_count(), 5); + + assert_eq!(progress(&frames.start), 1); + assert_eq!(frame_type(&frames.start), "start"); + // No handshake: chunks reuse the reserved accept slot (start at 2). + assert_eq!(progress(&frames.chunks[0]), 2); + assert_eq!(progress(&frames.chunks[1]), 3); + assert_eq!(progress(&frames.chunks[2]), 4); + assert_eq!(frame_type(&frames.chunks[0]), "chunk"); + assert_eq!(progress(&frames.end), 5); + assert_eq!(frame_type(&frames.end), "end"); + } + + #[test] + fn build_reserves_accept_slot_with_handshake() { + let opts = OversizedSenderOptions::new("tok") + .with_chunk_size(4) + .with_accept_handshake(true); + let frames = build_oversized_frames("hello world", &opts).unwrap(); + + assert_eq!(progress(&frames.start), 1); + // Handshake: slot 2 reserved for accept, chunks begin at 3. + assert_eq!(progress(&frames.chunks[0]), 3); + assert_eq!(progress(&frames.chunks[2]), 5); + assert_eq!(progress(&frames.end), 6); + } + + #[test] + fn build_start_frame_declares_digest_bytes_chunks() { + let opts = OversizedSenderOptions::new("tok").with_chunk_size(4); + let frames = build_oversized_frames("hello world", &opts).unwrap(); + let cvm = &frames.start.params.as_ref().unwrap()["cvm"]; + + assert_eq!(cvm["type"].as_str(), Some("oversized-transfer")); + assert_eq!(cvm["completionMode"], Value::String("render".to_string())); + assert_eq!(cvm["digest"], Value::String(sha256_digest("hello world"))); + assert_eq!(cvm["totalBytes"].as_u64(), Some(11)); + assert_eq!(cvm["totalChunks"].as_u64(), Some(3)); + } + + #[test] + fn build_propagates_single_char_over_budget_error() { + let opts = OversizedSenderOptions::new("tok").with_chunk_size(3); + let err = build_oversized_frames("🦀", &opts).unwrap_err(); + // Surfaced through the crate error as an oversized-transfer policy error. + assert!(matches!( + err, + crate::core::error::Error::OversizedTransfer(OversizedTransferError::Policy(_)) + )); + } + + #[test] + fn build_into_ordered_is_start_chunks_end() { + let opts = OversizedSenderOptions::new("tok").with_chunk_size(4); + let frames = build_oversized_frames("hello world", &opts).unwrap(); + let ordered = frames.into_ordered(); + assert_eq!(ordered.len(), 5); + assert_eq!(frame_type(&ordered[0]), "start"); + assert_eq!(frame_type(&ordered[4]), "end"); + // Progress is strictly increasing across the whole sequence. + let progresses: Vec = ordered.iter().map(progress).collect(); + assert_eq!(progresses, vec![1, 2, 3, 4, 5]); + } +} diff --git a/src/transport/oversized_transfer/constants.rs b/src/transport/oversized_transfer/constants.rs new file mode 100644 index 0000000..ab308dc --- /dev/null +++ b/src/transport/oversized_transfer/constants.rs @@ -0,0 +1,60 @@ +//! CEP-22 oversized-transfer constants. +//! +//! The normative CEP leaves these numeric defaults to implementers, so we adopt +//! the TypeScript SDK's values (`sdk/src/transport/oversized-transfer/constants.ts`) +//! for cross-implementation interop. + +/// The `cvm.type` discriminator carried in every oversized-transfer frame. +pub const OVERSIZED_TRANSFER_TYPE: &str = "oversized-transfer"; + +/// Prefix applied to SHA-256 digest values (lowercase hex follows). +pub const DIGEST_PREFIX: &str = "sha256:"; + +/// JSON-RPC method that carries oversized-transfer frames in `params.cvm`. +pub const NOTIFICATIONS_PROGRESS_METHOD: &str = "notifications/progress"; + +/// Default per-chunk data size (bytes). +/// +/// Conservative: leaves ~16 KiB of headroom under the ~64 KiB relay event +/// threshold so a single gift-wrapped frame stays well below it. +pub const DEFAULT_CHUNK_SIZE: usize = 48_000; + +/// Serialized byte length at or above which the sender switches to oversized transfer. +pub const DEFAULT_OVERSIZED_THRESHOLD: usize = 48_000; + +/// Default upper bound on the total reassembled payload a receiver will accept (100 MiB). +pub const DEFAULT_MAX_TRANSFER_BYTES: u64 = 100 * 1024 * 1024; + +/// Default upper bound on the number of chunks a receiver will accept. +pub const DEFAULT_MAX_TRANSFER_CHUNKS: u64 = 10_000; + +/// Default upper bound on concurrently active receiver-side transfers. +pub const DEFAULT_MAX_CONCURRENT_TRANSFERS: usize = 64; + +/// Default hard timeout for an in-flight transfer (milliseconds). +/// +/// Not enforced by the PR-1 pure engine (no timers yet); wired into the +/// transport watchdog in a later PR. +pub const DEFAULT_TRANSFER_TIMEOUT_MS: u64 = 5 * 60 * 1000; + +/// Default maximum forward gap between the next expected chunk and an +/// out-of-order chunk that will still be buffered. +pub const DEFAULT_MAX_OUT_OF_ORDER_WINDOW: u64 = 21; + +/// Default maximum number of buffered out-of-order chunks. +pub const DEFAULT_MAX_OUT_OF_ORDER_CHUNKS: usize = 42; + +/// Default timeout a sender waits for an `accept` frame before giving up (milliseconds). +/// +/// Used by the sender handshake in a later PR; defined here for parity. +pub const DEFAULT_ACCEPT_TIMEOUT_MS: u64 = 30_000; + +/// Canonical progress slot for the `start` frame. +pub const START_PROGRESS: u64 = 1; + +/// Progress slot reserved for the `accept` frame. +/// +/// In the handshake case the `accept` frame occupies this slot and the first +/// `chunk` starts at [`START_PROGRESS`] + 2. In the no-handshake case no +/// `accept` is sent, so this slot is reused as the first `chunk`. +pub const ACCEPT_PROGRESS: u64 = 2; diff --git a/src/transport/oversized_transfer/errors.rs b/src/transport/oversized_transfer/errors.rs new file mode 100644 index 0000000..baf9a23 --- /dev/null +++ b/src/transport/oversized_transfer/errors.rs @@ -0,0 +1,54 @@ +//! Error taxonomy for CEP-22 oversized transfers. +//! +//! Mirrors the TypeScript SDK's error classes +//! (`sdk/src/transport/oversized-transfer/errors.ts`) so failure classification +//! is identical across implementations. Surfaced through the crate-level +//! [`crate::Error`] via a `#[from]` conversion. + +/// Errors raised while building, framing, or reassembling an oversized transfer. +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum OversizedTransferError { + /// The remote peer aborted the transfer (terminal). + #[error("Transfer aborted (token: {token}){}", .reason.as_deref().map(|r| format!(": {r}")).unwrap_or_default())] + Abort { + /// The `progressToken` of the aborted transfer. + token: String, + /// Optional advisory reason supplied in the `abort` frame. + reason: Option, + }, + + /// A frame violated a declared or configured policy limit + /// (`totalBytes`, `totalChunks`, concurrency, out-of-order bounds, chunk size). + #[error("Oversized transfer policy violation: {0}")] + Policy(String), + + /// The byte length or SHA-256 digest of the reassembled payload did not match + /// the values declared in the `start` frame. + #[error("Oversized transfer integrity error: {0}")] + Digest(String), + + /// Chunks could not be reassembled (missing frames, unresolved gaps, or the + /// reassembled payload is not a valid JSON-RPC message). + #[error("Oversized transfer reassembly error: {0}")] + Reassembly(String), + + /// Frames violated CEP-22 sequencing rules (bad progress ordering, + /// duplicate `start`, conflicting duplicate chunk, missing token). + #[error("Oversized transfer sequence error: {0}")] + Sequence(String), +} + +impl OversizedTransferError { + /// Construct an [`OversizedTransferError::Abort`]. + pub fn abort(token: impl Into, reason: Option) -> Self { + Self::Abort { + token: token.into(), + reason, + } + } + + /// Returns `true` if this is a terminal [`abort`](OversizedTransferError::Abort). + pub fn is_abort(&self) -> bool { + matches!(self, Self::Abort { .. }) + } +} diff --git a/src/transport/oversized_transfer/frame.rs b/src/transport/oversized_transfer/frame.rs new file mode 100644 index 0000000..b5242e2 --- /dev/null +++ b/src/transport/oversized_transfer/frame.rs @@ -0,0 +1,247 @@ +//! CEP-22 `cvm` frame types and their `notifications/progress` envelope. +//! +//! A frame is the ContextVM `cvm` extension object embedded in the `params` of +//! an MCP `notifications/progress` notification. The outer `params` also carry +//! the `progressToken` and a strictly-monotonic `progress` value (the canonical +//! reassembly index). See `frame` shapes in +//! `sdk/src/transport/oversized-transfer/types.ts`. + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +use crate::core::types::JsonRpcNotification; + +use super::constants::{NOTIFICATIONS_PROGRESS_METHOD, OVERSIZED_TRANSFER_TYPE}; + +/// Completion mode of a `start` frame. CEP-22 v1 mandates `render`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum CompletionMode { + /// The only completion mode defined in v1: the receiver renders the + /// reassembled payload as a single MCP message. + #[default] + Render, +} + +/// A CEP-22 oversized-transfer frame (the `cvm` object). +/// +/// Internally tagged on `frameType`. The constant `cvm.type` +/// (`"oversized-transfer"`) is handled separately by +/// [`to_cvm_value`](Self::to_cvm_value) / [`from_cvm_value`](Self::from_cvm_value) +/// rather than encoded as an enum field. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "frameType", rename_all = "camelCase")] +pub enum OversizedFrame { + /// Opens a transfer: declares the digest, total byte length, and chunk count. + #[serde(rename_all = "camelCase")] + Start { + /// Completion mode (MUST be [`CompletionMode::Render`] in v1). + completion_mode: CompletionMode, + /// `"sha256:"` + lowercase hex of the SHA-256 of the serialized payload. + digest: String, + /// Total UTF-8 byte length of the serialized JSON-RPC payload. + total_bytes: u64, + /// Number of `chunk` frames that follow. + total_chunks: u64, + }, + /// Receiver handshake acknowledgement; lets the sender begin sending chunks. + Accept, + /// One ordered fragment of the serialized JSON-RPC string. + Chunk { + /// A UTF-8 substring fragment; concatenating chunks in `progress` order + /// reproduces the exact serialized payload bytes. + data: String, + }, + /// Closes a transfer; the receiver validates and materializes the payload. + End, + /// Terminates a transfer early (terminal). + Abort { + /// Optional advisory reason. + #[serde(default, skip_serializing_if = "Option::is_none")] + reason: Option, + }, +} + +impl OversizedFrame { + /// Returns the `frameType` discriminator string for this frame. + pub fn frame_type(&self) -> &'static str { + match self { + Self::Start { .. } => "start", + Self::Accept => "accept", + Self::Chunk { .. } => "chunk", + Self::End => "end", + Self::Abort { .. } => "abort", + } + } + + /// Serialize this frame into a `cvm` object [`Value`], injecting the constant + /// `type` discriminator (`"oversized-transfer"`). + pub fn to_cvm_value(&self) -> Result { + let mut value = serde_json::to_value(self)?; + if let Value::Object(map) = &mut value { + map.insert( + "type".to_string(), + Value::String(OVERSIZED_TRANSFER_TYPE.to_string()), + ); + } + Ok(value) + } + + /// Parse a `cvm` object [`Value`] into a typed frame, verifying the `type` + /// discriminator. Returns `None` when `value` is not an oversized-transfer + /// frame or fails to parse. + pub fn from_cvm_value(value: &Value) -> Option { + if value.get("type").and_then(Value::as_str) != Some(OVERSIZED_TRANSFER_TYPE) { + return None; + } + serde_json::from_value(value.clone()).ok() + } + + /// Returns `true` when `value` structurally looks like an oversized-transfer + /// frame (`type == "oversized-transfer"` and a string `frameType`). + /// + /// Mirrors the TS `isOversizedTransferFrame` structural narrowing. + pub fn is_frame_value(value: &Value) -> bool { + value.get("type").and_then(Value::as_str) == Some(OVERSIZED_TRANSFER_TYPE) + && value.get("frameType").and_then(Value::as_str).is_some() + } + + /// Wrap this frame in a `notifications/progress` [`JsonRpcNotification`]. + /// + /// Builds the outer `params` with `progressToken`, `progress`, an optional + /// human-readable `message` (non-normative UX), and the `cvm` frame. + pub fn into_progress_notification( + &self, + progress_token: &str, + progress: u64, + message: Option<&str>, + ) -> Result { + let mut params = Map::new(); + params.insert( + "progressToken".to_string(), + Value::String(progress_token.to_string()), + ); + params.insert("progress".to_string(), Value::Number(progress.into())); + if let Some(message) = message { + params.insert("message".to_string(), Value::String(message.to_string())); + } + params.insert("cvm".to_string(), self.to_cvm_value()?); + + Ok(JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: NOTIFICATIONS_PROGRESS_METHOD.to_string(), + params: Some(Value::Object(params)), + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn assert_cvm_roundtrip(frame: OversizedFrame) { + let value = frame.to_cvm_value().unwrap(); + assert_eq!(value["type"], json!("oversized-transfer")); + assert_eq!(value["frameType"], json!(frame.frame_type())); + assert_eq!(OversizedFrame::from_cvm_value(&value), Some(frame)); + } + + #[test] + fn start_frame_serializes_with_camel_case_fields() { + let frame = OversizedFrame::Start { + completion_mode: CompletionMode::Render, + digest: "sha256:abcd".to_string(), + total_bytes: 100, + total_chunks: 3, + }; + let value = frame.to_cvm_value().unwrap(); + assert_eq!(value["type"], json!("oversized-transfer")); + assert_eq!(value["frameType"], json!("start")); + assert_eq!(value["completionMode"], json!("render")); + assert_eq!(value["digest"], json!("sha256:abcd")); + assert_eq!(value["totalBytes"], json!(100)); + assert_eq!(value["totalChunks"], json!(3)); + assert_cvm_roundtrip(frame); + } + + #[test] + fn all_frame_variants_roundtrip() { + assert_cvm_roundtrip(OversizedFrame::Start { + completion_mode: CompletionMode::Render, + digest: "sha256:abcd".to_string(), + total_bytes: 8, + total_chunks: 2, + }); + assert_cvm_roundtrip(OversizedFrame::Accept); + assert_cvm_roundtrip(OversizedFrame::Chunk { + data: "payload".to_string(), + }); + assert_cvm_roundtrip(OversizedFrame::End); + assert_cvm_roundtrip(OversizedFrame::Abort { + reason: Some("boom".to_string()), + }); + assert_cvm_roundtrip(OversizedFrame::Abort { reason: None }); + } + + #[test] + fn abort_reason_omitted_when_none() { + let value = OversizedFrame::Abort { reason: None } + .to_cvm_value() + .unwrap(); + assert!(!value.as_object().unwrap().contains_key("reason")); + } + + #[test] + fn from_cvm_value_rejects_wrong_type() { + let value = json!({ "type": "something-else", "frameType": "end" }); + assert_eq!(OversizedFrame::from_cvm_value(&value), None); + assert!(!OversizedFrame::is_frame_value(&value)); + } + + #[test] + fn from_cvm_value_rejects_unknown_frame_type() { + let value = json!({ "type": "oversized-transfer", "frameType": "bogus" }); + assert_eq!(OversizedFrame::from_cvm_value(&value), None); + } + + #[test] + fn is_frame_value_requires_type_and_frame_type() { + assert!(OversizedFrame::is_frame_value( + &json!({ "type": "oversized-transfer", "frameType": "chunk", "data": "x" }) + )); + assert!(!OversizedFrame::is_frame_value( + &json!({ "type": "oversized-transfer" }) + )); + assert!(!OversizedFrame::is_frame_value( + &json!({ "frameType": "chunk" }) + )); + assert!(!OversizedFrame::is_frame_value(&json!("not an object"))); + } + + #[test] + fn into_progress_notification_builds_progress_envelope() { + let notification = OversizedFrame::Chunk { + data: "frag".to_string(), + } + .into_progress_notification("tok-1", 7, Some("hi")) + .unwrap(); + + assert_eq!(notification.method, "notifications/progress"); + let params = notification.params.as_ref().unwrap(); + assert_eq!(params["progressToken"], json!("tok-1")); + assert_eq!(params["progress"], json!(7)); + assert_eq!(params["message"], json!("hi")); + assert_eq!(params["cvm"]["frameType"], json!("chunk")); + assert_eq!(params["cvm"]["data"], json!("frag")); + } + + #[test] + fn into_progress_notification_omits_absent_message() { + let notification = OversizedFrame::End + .into_progress_notification("tok-1", 9, None) + .unwrap(); + let params = notification.params.as_ref().unwrap(); + assert!(!params.as_object().unwrap().contains_key("message")); + } +} diff --git a/src/transport/oversized_transfer/mod.rs b/src/transport/oversized_transfer/mod.rs new file mode 100644 index 0000000..66759ac --- /dev/null +++ b/src/transport/oversized_transfer/mod.rs @@ -0,0 +1,57 @@ +//! CEP-22 oversized payload transfer — transport-agnostic framing engine. +//! +//! A serialized JSON-RPC message too large to publish as a single relay event is +//! split into an ordered sequence of frames carried inside MCP +//! `notifications/progress` messages, transmitted as ordinary kind-`25910` +//! events, and reassembled by the receiver after SHA-256 + size validation. +//! See the CEP-22 spec and the TypeScript reference at +//! `sdk/src/transport/oversized-transfer/`. +//! +//! This module is the **pure engine**: building frames ([`codec`]) and +//! reassembling them ([`receiver`]). It carries no transport, I/O, or timers — +//! those are wired in by the client and server transports in later PRs. Until +//! then the module is intentionally unused by the rest of the crate. +//! +//! ``` +//! use contextvm_sdk::transport::oversized_transfer::{ +//! build_oversized_frames, OversizedSenderOptions, OversizedTransferReceiver, +//! }; +//! use contextvm_sdk::core::types::{JsonRpcMessage, JsonRpcResponse}; +//! use serde_json::json; +//! +//! let message = JsonRpcMessage::Response(JsonRpcResponse { +//! jsonrpc: "2.0".to_string(), +//! id: json!(1), +//! result: json!({ "value": "a large payload" }), +//! }); +//! let serialized = serde_json::to_string(&message).unwrap(); +//! +//! // Sender: split into ordered frames. +//! let opts = OversizedSenderOptions::new("token-1").with_chunk_size(8); +//! let frames = build_oversized_frames(&serialized, &opts).unwrap(); +//! +//! // Receiver: feed frames back; the last frame yields the reassembled message. +//! let mut receiver = OversizedTransferReceiver::new(); +//! let mut reassembled = None; +//! for frame in frames.into_ordered() { +//! if let Some(message) = receiver.process_frame(&frame).unwrap() { +//! reassembled = Some(message); +//! } +//! } +//! assert_eq!(reassembled.unwrap().id(), message.id()); +//! ``` + +pub mod codec; +pub mod constants; +pub mod errors; +pub mod frame; +pub mod receiver; + +pub use codec::{ + build_oversized_frames, sha256_digest, split_string_by_byte_size, utf8_byte_len, + BuiltOversizedFrames, OversizedSenderOptions, +}; +pub use constants::*; +pub use errors::OversizedTransferError; +pub use frame::{CompletionMode, OversizedFrame}; +pub use receiver::{OversizedTransferReceiver, TransferPolicy}; diff --git a/src/transport/oversized_transfer/receiver.rs b/src/transport/oversized_transfer/receiver.rs new file mode 100644 index 0000000..4b4f4e7 --- /dev/null +++ b/src/transport/oversized_transfer/receiver.rs @@ -0,0 +1,1083 @@ +//! Stateful reassembly engine for CEP-22 oversized transfers. +//! +//! Ports `sdk/src/transport/oversized-transfer/receiver.ts`. Feeds inbound +//! `notifications/progress` frames through [`OversizedTransferReceiver::process_frame`] +//! and returns the reassembled [`JsonRpcMessage`] once a transfer completes and +//! passes byte-length, SHA-256, and JSON-RPC validation. +//! +//! This PR-1 engine is **pure and synchronous**: it owns no timers. The hard +//! per-transfer watchdog (`transfer_timeout_ms`) and the sender-side +//! accept-waiter are added when the engine is wired into the transport. + +use std::collections::{BTreeMap, HashMap}; + +use serde_json::Value; + +use crate::core::types::{JsonRpcMessage, JsonRpcNotification}; +use crate::core::validation::validate_and_parse_oversized; + +use super::codec::sha256_digest; +use super::constants::{ + DEFAULT_MAX_CONCURRENT_TRANSFERS, DEFAULT_MAX_OUT_OF_ORDER_CHUNKS, + DEFAULT_MAX_OUT_OF_ORDER_WINDOW, DEFAULT_MAX_TRANSFER_BYTES, DEFAULT_MAX_TRANSFER_CHUNKS, + DEFAULT_TRANSFER_TIMEOUT_MS, DIGEST_PREFIX, +}; +use super::errors::OversizedTransferError; +use super::frame::OversizedFrame; + +/// Receiver-side admission and out-of-order policy. +/// +/// Mirrors the TS `TransferPolicy`. Construct via [`TransferPolicy::default`] +/// and override individual fields with struct-update syntax. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TransferPolicy { + /// Maximum total reassembled payload size accepted (bytes). + pub max_transfer_bytes: u64, + /// Maximum number of chunks accepted. + pub max_transfer_chunks: u64, + /// Maximum number of concurrently active transfers. + pub max_concurrent_transfers: usize, + /// Maximum forward gap from the contiguous frontier that is still buffered. + pub max_out_of_order_window: u64, + /// Maximum number of buffered out-of-order chunks. + pub max_out_of_order_chunks: usize, + /// Hard per-transfer timeout (milliseconds). Reserved for the transport + /// watchdog in a later PR; the pure engine does not enforce it. + pub transfer_timeout_ms: u64, +} + +impl Default for TransferPolicy { + fn default() -> Self { + Self { + max_transfer_bytes: DEFAULT_MAX_TRANSFER_BYTES, + max_transfer_chunks: DEFAULT_MAX_TRANSFER_CHUNKS, + max_concurrent_transfers: DEFAULT_MAX_CONCURRENT_TRANSFERS, + max_out_of_order_window: DEFAULT_MAX_OUT_OF_ORDER_WINDOW, + max_out_of_order_chunks: DEFAULT_MAX_OUT_OF_ORDER_CHUNKS, + transfer_timeout_ms: DEFAULT_TRANSFER_TIMEOUT_MS, + } + } +} + +/// In-flight state for a single transfer, keyed by `progressToken`. +#[derive(Debug)] +struct ActiveTransfer { + digest: String, + total_bytes: u64, + total_chunks: u64, + start_progress: u64, + accept_progress: Option, + first_chunk_progress: Option, + next_expected_chunk_progress: Option, + highest_observed_progress: u64, + /// Chunk fragments keyed by the outer `progress` value (canonical index). + chunks: BTreeMap, +} + +impl ActiveTransfer { + /// The progress slot the first chunk occupies, given handshake state. + fn first_chunk_slot(&self) -> u64 { + if self.accept_progress.is_some() { + self.start_progress + 2 + } else { + self.start_progress + 1 + } + } + + /// The next contiguous chunk slot the assembler is waiting for. + fn next_expected_chunk_progress(&self) -> u64 { + self.next_expected_chunk_progress + .unwrap_or_else(|| self.first_chunk_slot()) + } + + /// Recompute the contiguous-frontier bookkeeping after a chunk insert. + fn refresh_chunk_progress_state(&mut self) { + if self.chunks.is_empty() { + self.first_chunk_progress = None; + self.next_expected_chunk_progress = None; + return; + } + + let first = self.first_chunk_slot(); + let mut next = first; + while self.chunks.contains_key(&next) { + next += 1; + } + + self.first_chunk_progress = Some(first); + self.next_expected_chunk_progress = Some(next); + } + + /// Count of buffered chunks sitting beyond the contiguous frontier. + fn buffered_out_of_order_count(&self) -> usize { + match (self.first_chunk_progress, self.next_expected_chunk_progress) { + (Some(first), Some(next)) => self.chunks.len() - (next - first) as usize, + _ => 0, + } + } + + /// Whether every slot in `[first, first + total_chunks)` is present. + fn has_complete_chunk_range(&self, first: u64) -> bool { + (0..self.total_chunks).all(|i| self.chunks.contains_key(&(first + i))) + } + + /// Resolve the first chunk slot for assembly, tolerating the reserved accept + /// slot being used or skipped. Returns `None` if neither layout is complete. + fn assembly_first_chunk_progress(&self) -> Option { + let direct = self.start_progress + 1; + let accept_gated = self.start_progress + 2; + if self.has_complete_chunk_range(direct) { + Some(direct) + } else if self.has_complete_chunk_range(accept_gated) { + Some(accept_gated) + } else { + None + } + } + + /// Concatenate chunks in `progress` order. `None` on any unresolved gap. + fn assemble(&self) -> Option { + let first = self.assembly_first_chunk_progress()?; + let mut out = String::with_capacity(self.total_bytes as usize); + for i in 0..self.total_chunks { + out.push_str(self.chunks.get(&(first + i))?); + } + Some(out) + } +} + +/// Stateful CEP-22 reassembly engine. +/// +/// Tracks per-`progressToken` transfer state, enforces admission and +/// out-of-order policy, and validates integrity before surfacing a reassembled +/// message. Never surfaces partial payloads. +#[derive(Debug)] +pub struct OversizedTransferReceiver { + max_transfer_bytes: u64, + max_transfer_chunks: u64, + max_concurrent_transfers: usize, + max_out_of_order_window: u64, + max_out_of_order_chunks: usize, + transfers: HashMap, +} + +impl Default for OversizedTransferReceiver { + fn default() -> Self { + Self::new() + } +} + +impl OversizedTransferReceiver { + /// Create a receiver with the default [`TransferPolicy`]. + pub fn new() -> Self { + Self::with_policy(TransferPolicy::default()) + } + + /// Create a receiver with an explicit policy. + pub fn with_policy(policy: TransferPolicy) -> Self { + Self { + max_transfer_bytes: policy.max_transfer_bytes, + max_transfer_chunks: policy.max_transfer_chunks, + max_concurrent_transfers: policy.max_concurrent_transfers, + max_out_of_order_window: policy.max_out_of_order_window, + max_out_of_order_chunks: policy.max_out_of_order_chunks, + transfers: HashMap::new(), + } + } + + /// Number of currently active in-flight transfers. + pub fn active_transfer_count(&self) -> usize { + self.transfers.len() + } + + /// Release all in-flight transfer state. + pub fn clear(&mut self) { + self.transfers.clear(); + } + + /// Returns `true` when `notification` carries an oversized-transfer frame in + /// its `params.cvm` field. + pub fn is_oversized_frame(notification: &JsonRpcNotification) -> bool { + notification + .params + .as_ref() + .and_then(|params| params.get("cvm")) + .map(OversizedFrame::is_frame_value) + .unwrap_or(false) + } + + /// Process one inbound `notifications/progress` frame. + /// + /// Returns `Ok(Some(message))` once the transfer is complete and validated, + /// `Ok(None)` when more frames are needed (or the notification is not an + /// oversized frame), and `Err(..)` on abort, policy, integrity, sequence, or + /// reassembly failure. A failing or aborting transfer is cleaned up before + /// the error is returned. + pub fn process_frame( + &mut self, + notification: &JsonRpcNotification, + ) -> Result, OversizedTransferError> { + let params = match notification.params.as_ref() { + Some(params) => params, + None => return Ok(None), + }; + let cvm = match params.get("cvm") { + Some(cvm) => cvm, + None => return Ok(None), + }; + let frame = match OversizedFrame::from_cvm_value(cvm) { + Some(frame) => frame, + None => return Ok(None), + }; + + // The outer progressToken keys the transfer; the outer progress is the + // canonical ordering index. Both are validated before dispatch. + let token = token_to_string(params.get("progressToken")); + assert_valid_token(&token)?; + let progress = parse_progress(params.get("progress"), &token)?; + + match frame { + OversizedFrame::Start { + digest, + total_bytes, + total_chunks, + .. + } => self.handle_start(&token, progress, digest, total_bytes, total_chunks), + OversizedFrame::Accept => self.handle_accept(&token, progress), + OversizedFrame::Chunk { data } => self.handle_chunk(&token, progress, data), + OversizedFrame::End => self.handle_end(&token, progress), + OversizedFrame::Abort { reason } => self.handle_abort(&token, reason), + } + } + + fn handle_start( + &mut self, + token: &str, + progress: u64, + digest: String, + total_bytes: u64, + total_chunks: u64, + ) -> Result, OversizedTransferError> { + if self.transfers.contains_key(token) { + return Err(OversizedTransferError::Sequence(format!( + "Duplicate start frame for active transfer (token: {token})" + ))); + } + if total_bytes > self.max_transfer_bytes { + return Err(OversizedTransferError::Policy(format!( + "totalBytes {total_bytes} exceeds policy limit {} (token: {token})", + self.max_transfer_bytes + ))); + } + if total_chunks > self.max_transfer_chunks { + return Err(OversizedTransferError::Policy(format!( + "totalChunks {total_chunks} exceeds policy limit {} (token: {token})", + self.max_transfer_chunks + ))); + } + if self.transfers.len() >= self.max_concurrent_transfers { + return Err(OversizedTransferError::Policy(format!( + "Active transfers exceed policy limit {} (token: {token})", + self.max_concurrent_transfers + ))); + } + if !digest.starts_with(DIGEST_PREFIX) { + return Err(OversizedTransferError::Reassembly(format!( + "Invalid digest format in start frame (token: {token})" + ))); + } + + self.transfers.insert( + token.to_string(), + ActiveTransfer { + digest, + total_bytes, + total_chunks, + start_progress: progress, + accept_progress: None, + first_chunk_progress: None, + next_expected_chunk_progress: None, + highest_observed_progress: progress, + chunks: BTreeMap::new(), + }, + ); + Ok(None) + } + + fn handle_accept( + &mut self, + token: &str, + progress: u64, + ) -> Result, OversizedTransferError> { + // Dropping `transfer` on the error path is the cleanup (failTransfer). + let mut transfer = match self.transfers.remove(token) { + Some(transfer) => transfer, + // Late or duplicated accept frames are ignored after cleanup. + None => return Ok(None), + }; + + if progress <= transfer.start_progress { + return Err(OversizedTransferError::Sequence(format!( + "Accept frame progress must be greater than start progress (token: {token})" + ))); + } + + transfer.highest_observed_progress = transfer.highest_observed_progress.max(progress); + transfer.accept_progress = Some(progress); + self.transfers.insert(token.to_string(), transfer); + Ok(None) + } + + fn handle_chunk( + &mut self, + token: &str, + progress: u64, + data: String, + ) -> Result, OversizedTransferError> { + let max_window = self.max_out_of_order_window; + let max_ooo_chunks = self.max_out_of_order_chunks; + + let mut transfer = match self.transfers.remove(token) { + Some(transfer) => transfer, + // Late or duplicated chunk frames are ignored after cleanup. + None => return Ok(None), + }; + + let minimum_chunk_progress = transfer.start_progress + 1; + let maximum_chunk_progress = transfer.start_progress + transfer.total_chunks + 1; + + if progress < minimum_chunk_progress { + return Err(OversizedTransferError::Sequence(format!( + "Chunk progress must be greater than start progress (token: {token})" + ))); + } + + let next_expected = transfer.next_expected_chunk_progress(); + // i128 so a hostile `progress` near u64::MAX cannot overflow the subtraction. + let forward_gap = i128::from(progress) - i128::from(next_expected); + if forward_gap > i128::from(max_window) { + return Err(OversizedTransferError::Policy(format!( + "Out-of-order gap {forward_gap} exceeds policy limit {max_window} (token: {token})" + ))); + } + + if progress > maximum_chunk_progress { + return Err(OversizedTransferError::Sequence(format!( + "Chunk progress exceeds declared transfer bounds (token: {token})" + ))); + } + + if progress > transfer.start_progress + 2 + && !transfer.chunks.contains_key(&(transfer.start_progress + 1)) + && !transfer.chunks.contains_key(&(transfer.start_progress + 2)) + { + return Err(OversizedTransferError::Sequence(format!( + "First chunk skips beyond the reserved accept slot (token: {token})" + ))); + } + + if let Some(existing) = transfer.chunks.get(&progress) { + if existing != &data { + return Err(OversizedTransferError::Sequence(format!( + "Conflicting duplicate chunk detected (token: {token}, progress: {progress})" + ))); + } + // Idempotent identical duplicate: keep state, await more frames. + self.transfers.insert(token.to_string(), transfer); + return Ok(None); + } + + transfer.chunks.insert(progress, data); + transfer.highest_observed_progress = transfer.highest_observed_progress.max(progress); + transfer.refresh_chunk_progress_state(); + + if forward_gap > 0 && transfer.buffered_out_of_order_count() > max_ooo_chunks { + return Err(OversizedTransferError::Policy(format!( + "Buffered out-of-order chunks exceed policy limit {max_ooo_chunks} (token: {token})" + ))); + } + + self.transfers.insert(token.to_string(), transfer); + Ok(None) + } + + fn handle_end( + &mut self, + token: &str, + progress: u64, + ) -> Result, OversizedTransferError> { + let max_bytes = self.max_transfer_bytes; + // Removed up front: every end-path outcome (success or failure) is terminal. + let transfer = match self.transfers.remove(token) { + Some(transfer) => transfer, + // Late or duplicated end frames are ignored after cleanup. + None => return Ok(None), + }; + + if progress <= transfer.highest_observed_progress { + return Err(OversizedTransferError::Sequence(format!( + "End frame progress must be greater than all prior transfer frames (token: {token})" + ))); + } + if transfer.total_chunks > 0 && transfer.chunks.is_empty() { + return Err(OversizedTransferError::Reassembly(format!( + "Transfer ended before any chunks were received (token: {token})" + ))); + } + if transfer.chunks.len() as u64 != transfer.total_chunks { + return Err(OversizedTransferError::Reassembly(format!( + "Expected {} chunks but received {} (token: {token})", + transfer.total_chunks, + transfer.chunks.len() + ))); + } + + let assembled = match transfer.assemble() { + Some(assembled) => assembled, + None => { + return Err(OversizedTransferError::Reassembly(format!( + "Transfer ended with unresolved chunk gaps (token: {token})" + ))) + } + }; + + // 1. Byte-length validation. + if assembled.len() as u64 != transfer.total_bytes { + return Err(OversizedTransferError::Digest(format!( + "Byte length mismatch: expected {}, got {} (token: {token})", + transfer.total_bytes, + assembled.len() + ))); + } + + // 2. SHA-256 digest validation. + if sha256_digest(&assembled) != transfer.digest { + return Err(OversizedTransferError::Digest(format!( + "SHA-256 digest mismatch (token: {token})" + ))); + } + + // 3. Parse + validate as a JSON-RPC message, bypassing the 1 MB + // per-event cap in favor of the receiver policy's maxTransferBytes. + match validate_and_parse_oversized(&assembled, max_bytes as usize) { + Some(message) => Ok(Some(message)), + None => Err(OversizedTransferError::Reassembly(format!( + "Reassembled payload is not a valid JSON-RPC message (token: {token})" + ))), + } + } + + fn handle_abort( + &mut self, + token: &str, + reason: Option, + ) -> Result, OversizedTransferError> { + // Abort is terminal whether or not a transfer is currently tracked. + self.transfers.remove(token); + Err(OversizedTransferError::abort(token, reason)) + } +} + +/// Coerce a `progressToken` value to a string (mirrors TS `String(token ?? '')`). +fn token_to_string(value: Option<&Value>) -> String { + match value { + Some(Value::String(s)) => s.clone(), + Some(Value::Number(n)) => n.to_string(), + _ => String::new(), + } +} + +/// A non-empty token is required on every frame. +fn assert_valid_token(token: &str) -> Result<(), OversizedTransferError> { + if token.is_empty() { + return Err(OversizedTransferError::Sequence( + "Oversized transfer frame is missing progressToken".to_string(), + )); + } + Ok(()) +} + +/// Parse and validate the outer `progress`: a positive integer. +fn parse_progress(value: Option<&Value>, token: &str) -> Result { + let progress = match value { + Some(Value::Number(n)) => n.as_u64().or_else(|| { + n.as_f64().and_then(|f| { + if f.fract() == 0.0 && f > 0.0 { + Some(f as u64) + } else { + None + } + }) + }), + _ => None, + }; + + match progress { + Some(progress) if progress > 0 => Ok(progress), + _ => Err(OversizedTransferError::Sequence(format!( + "Invalid progress value (token: {token})" + ))), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::types::{JsonRpcRequest, JsonRpcResponse}; + use crate::transport::oversized_transfer::codec::{ + build_oversized_frames, BuiltOversizedFrames, OversizedSenderOptions, + }; + use crate::transport::oversized_transfer::frame::{CompletionMode, OversizedFrame}; + use crate::transport::oversized_transfer::START_PROGRESS; + use serde_json::json; + + const TOKEN: &str = "tok"; + + fn sample_response(id: i64, value: &str) -> JsonRpcMessage { + JsonRpcMessage::Response(JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id: json!(id), + result: json!({ "value": value }), + }) + } + + fn build(message: &JsonRpcMessage, chunk_size: usize) -> BuiltOversizedFrames { + let serialized = serde_json::to_string(message).unwrap(); + let opts = OversizedSenderOptions::new(TOKEN).with_chunk_size(chunk_size); + build_oversized_frames(&serialized, &opts).unwrap() + } + + fn start_frame( + token: &str, + progress: u64, + digest: &str, + total_bytes: u64, + total_chunks: u64, + ) -> JsonRpcNotification { + OversizedFrame::Start { + completion_mode: CompletionMode::Render, + digest: digest.to_string(), + total_bytes, + total_chunks, + } + .into_progress_notification(token, progress, None) + .unwrap() + } + + fn chunk_frame(token: &str, progress: u64, data: &str) -> JsonRpcNotification { + OversizedFrame::Chunk { + data: data.to_string(), + } + .into_progress_notification(token, progress, None) + .unwrap() + } + + fn end_frame(token: &str, progress: u64) -> JsonRpcNotification { + OversizedFrame::End + .into_progress_notification(token, progress, None) + .unwrap() + } + + /// Drive a full ordered frame set through the receiver, returning the message. + fn run_to_completion( + receiver: &mut OversizedTransferReceiver, + frames: BuiltOversizedFrames, + ) -> JsonRpcMessage { + let mut out = None; + for frame in frames.into_ordered() { + if let Some(message) = receiver.process_frame(&frame).unwrap() { + out = Some(message); + } + } + out.expect("transfer should complete on the end frame") + } + + // ── roundtrip ─────────────────────────────────────────────────── + + #[test] + fn roundtrip_in_order() { + let message = sample_response(1, &"x".repeat(60)); + let frames = build(&message, 8); + assert!(frames.chunks.len() > 1); + + let mut receiver = OversizedTransferReceiver::new(); + let reconstructed = run_to_completion(&mut receiver, frames); + + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn roundtrip_request_message() { + // Reassembly validates any JSON-RPC variant, not just responses. + let message = JsonRpcMessage::Request(JsonRpcRequest { + jsonrpc: "2.0".to_string(), + id: json!("abc"), + method: "tools/call".to_string(), + params: Some(json!({ "blob": "y".repeat(40) })), + }); + let frames = build(&message, 10); + + let mut receiver = OversizedTransferReceiver::new(); + let reconstructed = run_to_completion(&mut receiver, frames); + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + } + + #[test] + fn roundtrip_multibyte_payload() { + // Small chunks force boundaries inside multibyte runs (CEP-22 D4). + let message = sample_response(7, "héllo 🦀 wörld 日本語 ☃ même 🚀🚀"); + let frames = build(&message, 5); + + let mut receiver = OversizedTransferReceiver::new(); + let reconstructed = run_to_completion(&mut receiver, frames); + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + } + + #[test] + fn roundtrip_out_of_order_within_window() { + let message = sample_response(2, &"z".repeat(60)); + let frames = build(&message, 8); + assert!(frames.chunks.len() >= 3); + + let mut receiver = OversizedTransferReceiver::with_policy(TransferPolicy { + max_out_of_order_window: 4, + max_out_of_order_chunks: 4, + ..Default::default() + }); + + receiver.process_frame(&frames.start).unwrap(); + // Deliver the first two chunks swapped. + receiver.process_frame(&frames.chunks[1]).unwrap(); + receiver.process_frame(&frames.chunks[0]).unwrap(); + for chunk in &frames.chunks[2..] { + receiver.process_frame(chunk).unwrap(); + } + let reconstructed = receiver.process_frame(&frames.end).unwrap().unwrap(); + + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn roundtrip_accept_gated_handshake() { + let message = sample_response(6, &"q".repeat(40)); + let serialized = serde_json::to_string(&message).unwrap(); + let opts = OversizedSenderOptions::new(TOKEN) + .with_chunk_size(8) + .with_accept_handshake(true); + let frames = build_oversized_frames(&serialized, &opts).unwrap(); + + let mut receiver = OversizedTransferReceiver::new(); + receiver.process_frame(&frames.start).unwrap(); + // Sender waits for the receiver's accept on the reserved slot 2. + let accept = OversizedFrame::Accept + .into_progress_notification(TOKEN, 2, None) + .unwrap(); + receiver.process_frame(&accept).unwrap(); + for chunk in &frames.chunks { + receiver.process_frame(chunk).unwrap(); + } + let reconstructed = receiver.process_frame(&frames.end).unwrap().unwrap(); + + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + } + + // ── out-of-order policy ───────────────────────────────────────── + + #[test] + fn out_of_order_gap_over_window_is_policy_error() { + let message = sample_response(2, "abcdefghijklmnop"); + let frames = build(&message, 4); + assert!(frames.chunks.len() >= 3); + + let mut receiver = OversizedTransferReceiver::with_policy(TransferPolicy { + max_out_of_order_window: 1, + ..Default::default() + }); + receiver.process_frame(&frames.start).unwrap(); + + // Jump straight to the third chunk: forward gap 2 > window 1. + let err = receiver.process_frame(&frames.chunks[2]).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn chunk_with_u64_max_progress_does_not_panic() { + // Regression: the forward-gap subtraction must not overflow when a + // hostile peer sends progress = u64::MAX (previously panicked under i64). + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + + let err = receiver + .process_frame(&chunk_frame(TOKEN, u64::MAX, "x")) + .unwrap_err(); + // Rejected cleanly (huge forward gap → Policy) rather than panicking. + assert!(matches!( + err, + OversizedTransferError::Policy(_) | OversizedTransferError::Sequence(_) + )); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn first_chunk_skipping_reserved_accept_slot_is_sequence_error() { + // 5 declared chunks keep slot 4 within the declared bounds (slots 2..=6), + // so this exercises the reserved-slot guard, not the bounds check. + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 20, 5)) + .unwrap(); + + // First chunk jumps to slot 4, skipping the reserved accept slot (2) and + // the first chunk slot (3) while both are still empty. + let err = receiver + .process_frame(&chunk_frame(TOKEN, 4, "data")) + .unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + assert!(err.to_string().contains("reserved accept slot")); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn accept_not_advancing_past_start_is_sequence_error() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, START_PROGRESS, "sha256:abcd", 4, 1)) + .unwrap(); + + // The accept frame MUST advance past the start slot; progress equal to + // START_PROGRESS does not. + let accept = OversizedFrame::Accept + .into_progress_notification(TOKEN, START_PROGRESS, None) + .unwrap(); + let err = receiver.process_frame(&accept).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + assert_eq!(receiver.active_transfer_count(), 0); + } + + // ── integrity ─────────────────────────────────────────────────── + + #[test] + fn digest_mismatch_is_digest_error() { + let message = sample_response(1, "hello"); + let serialized = serde_json::to_string(&message).unwrap(); + let total_bytes = serialized.len() as u64; + + let mut receiver = OversizedTransferReceiver::new(); + // Correct byte length, but a wrong (well-formed) digest. + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:deadbeef", total_bytes, 1)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, &serialized)) + .unwrap(); + let err = receiver.process_frame(&end_frame(TOKEN, 3)).unwrap_err(); + + assert!(matches!(err, OversizedTransferError::Digest(_))); + assert!(err.to_string().to_lowercase().contains("digest")); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn byte_length_mismatch_is_digest_error() { + let message = sample_response(1, "hello"); + let serialized = serde_json::to_string(&message).unwrap(); + let digest = sha256_digest(&serialized); + let wrong_total = serialized.len() as u64 + 1; + + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, &digest, wrong_total, 1)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, &serialized)) + .unwrap(); + let err = receiver.process_frame(&end_frame(TOKEN, 3)).unwrap_err(); + + assert!(matches!(err, OversizedTransferError::Digest(_))); + assert!(err.to_string().to_lowercase().contains("byte length")); + } + + // ── sequencing ────────────────────────────────────────────────── + + #[test] + fn duplicate_start_is_sequence_error() { + let start = start_frame(TOKEN, 1, "sha256:abcd", 4, 1); + let mut receiver = OversizedTransferReceiver::new(); + receiver.process_frame(&start).unwrap(); + + let err = receiver.process_frame(&start).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + assert!(err.to_string().contains("Duplicate start")); + // The pre-existing transfer is left intact (duplicate is rejected, not reset). + assert_eq!(receiver.active_transfer_count(), 1); + } + + #[test] + fn chunk_count_mismatch_is_reassembly_error() { + let mut receiver = OversizedTransferReceiver::new(); + // Declares two chunks but only one arrives before end. + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 8, 2)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, "abcd")) + .unwrap(); + let err = receiver.process_frame(&end_frame(TOKEN, 3)).unwrap_err(); + + assert!(matches!(err, OversizedTransferError::Reassembly(_))); + assert!(err.to_string().contains("Expected 2 chunks but received 1")); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn conflicting_duplicate_chunk_is_sequence_error() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 8, 2)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, "abcd")) + .unwrap(); + let err = receiver + .process_frame(&chunk_frame(TOKEN, 2, "wxyz")) + .unwrap_err(); + + assert!(matches!(err, OversizedTransferError::Sequence(_))); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn identical_duplicate_chunk_is_idempotent() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 8, 2)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, "abcd")) + .unwrap(); + // Re-delivering the identical chunk is a no-op, not an error. + assert!(receiver + .process_frame(&chunk_frame(TOKEN, 2, "abcd")) + .unwrap() + .is_none()); + assert_eq!(receiver.active_transfer_count(), 1); + } + + #[test] + fn end_not_advancing_progress_is_sequence_error() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + receiver + .process_frame(&chunk_frame(TOKEN, 2, "test")) + .unwrap(); + // End at the same progress as the last chunk must not advance. + let err = receiver.process_frame(&end_frame(TOKEN, 2)).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + } + + #[test] + fn missing_token_is_sequence_error() { + let frame = chunk_frame("", 2, "abcd"); + let mut receiver = OversizedTransferReceiver::new(); + let err = receiver.process_frame(&frame).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + assert!(err.to_string().contains("missing progressToken")); + } + + #[test] + fn non_positive_progress_is_sequence_error() { + let frame = start_frame(TOKEN, 0, "sha256:abcd", 4, 1); + let mut receiver = OversizedTransferReceiver::new(); + let err = receiver.process_frame(&frame).unwrap_err(); + assert!(matches!(err, OversizedTransferError::Sequence(_))); + } + + // ── abort ─────────────────────────────────────────────────────── + + #[test] + fn abort_terminates_active_transfer() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + + let abort = OversizedFrame::Abort { + reason: Some("peer cancelled".to_string()), + } + .into_progress_notification(TOKEN, 5, None) + .unwrap(); + let err = receiver.process_frame(&abort).unwrap_err(); + + assert!(err.is_abort()); + match err { + OversizedTransferError::Abort { token, reason } => { + assert_eq!(token, TOKEN); + assert_eq!(reason.as_deref(), Some("peer cancelled")); + } + other => panic!("expected abort, got {other:?}"), + } + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn orphan_abort_still_errors() { + let mut receiver = OversizedTransferReceiver::new(); + let abort = OversizedFrame::Abort { reason: None } + .into_progress_notification(TOKEN, 2, None) + .unwrap(); + assert!(receiver.process_frame(&abort).unwrap_err().is_abort()); + } + + // ── admission limits ──────────────────────────────────────────── + + #[test] + fn start_over_byte_limit_is_policy_error() { + let mut receiver = OversizedTransferReceiver::with_policy(TransferPolicy { + max_transfer_bytes: 10, + ..Default::default() + }); + let err = receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 11, 1)) + .unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn start_over_chunk_limit_is_policy_error() { + let mut receiver = OversizedTransferReceiver::with_policy(TransferPolicy { + max_transfer_chunks: 1, + ..Default::default() + }); + let err = receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 2)) + .unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + } + + #[test] + fn start_over_concurrency_limit_is_policy_error() { + let mut receiver = OversizedTransferReceiver::with_policy(TransferPolicy { + max_concurrent_transfers: 1, + ..Default::default() + }); + receiver + .process_frame(&start_frame("tok-a", 1, "sha256:abcd", 4, 1)) + .unwrap(); + let err = receiver + .process_frame(&start_frame("tok-b", 1, "sha256:efgh", 4, 1)) + .unwrap_err(); + assert!(matches!(err, OversizedTransferError::Policy(_))); + } + + #[test] + fn start_with_malformed_digest_is_reassembly_error() { + let mut receiver = OversizedTransferReceiver::new(); + // Missing the "sha256:" prefix. + let err = receiver + .process_frame(&start_frame(TOKEN, 1, "abcd", 4, 1)) + .unwrap_err(); + assert!(matches!(err, OversizedTransferError::Reassembly(_))); + } + + // ── orphan / non-frame handling ───────────────────────────────── + + #[test] + fn orphan_late_frames_are_ignored() { + let mut receiver = OversizedTransferReceiver::new(); + let accept = OversizedFrame::Accept + .into_progress_notification("orphan-accept", 2, None) + .unwrap(); + assert!(receiver.process_frame(&accept).unwrap().is_none()); + assert!(receiver + .process_frame(&chunk_frame("orphan-chunk", 2, "x")) + .unwrap() + .is_none()); + assert!(receiver + .process_frame(&end_frame("orphan-end", 2)) + .unwrap() + .is_none()); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn non_frame_progress_notification_is_passthrough_none() { + let mut receiver = OversizedTransferReceiver::new(); + let plain = JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/progress".to_string(), + params: Some(json!({ "progressToken": "t", "progress": 3 })), + }; + assert!(receiver.process_frame(&plain).unwrap().is_none()); + + let no_params = JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/initialized".to_string(), + params: None, + }; + assert!(receiver.process_frame(&no_params).unwrap().is_none()); + } + + #[test] + fn is_oversized_frame_detects_cvm_payload() { + let frame = end_frame(TOKEN, 3); + assert!(OversizedTransferReceiver::is_oversized_frame(&frame)); + + let plain = JsonRpcNotification { + jsonrpc: "2.0".to_string(), + method: "notifications/progress".to_string(), + params: Some(json!({ "progressToken": "t", "progress": 3 })), + }; + assert!(!OversizedTransferReceiver::is_oversized_frame(&plain)); + } + + #[test] + fn clear_releases_in_flight_state() { + let mut receiver = OversizedTransferReceiver::new(); + receiver + .process_frame(&start_frame(TOKEN, 1, "sha256:abcd", 4, 1)) + .unwrap(); + assert_eq!(receiver.active_transfer_count(), 1); + receiver.clear(); + assert_eq!(receiver.active_transfer_count(), 0); + } + + #[test] + fn reassembled_payload_exceeds_one_megabyte() { + // The whole point of CEP-22: reassemble a payload larger than the 1 MB + // per-event cap. Chunks stay small; the result is ~1.2 MB. + let message = sample_response(1, &"m".repeat(1_200_000)); + let serialized = serde_json::to_string(&message).unwrap(); + assert!(serialized.len() > crate::core::constants::MAX_MESSAGE_SIZE); + + let frames = build(&message, 48_000); + let mut receiver = OversizedTransferReceiver::new(); + let reconstructed = run_to_completion(&mut receiver, frames); + assert_eq!( + serde_json::to_value(&reconstructed).unwrap(), + serde_json::to_value(&message).unwrap() + ); + } +}