From b7e3344da57665977a72e52bda1adec3a481adf3 Mon Sep 17 00:00:00 2001 From: Enrique Moreno <403428+chuwik@users.noreply.github.com> Date: Mon, 8 Jun 2026 20:34:47 -0700 Subject: [PATCH 1/2] fix(jsonrpc): isolate malformed frames so one bad message can't cancel all in-flight requests The read loop parsed each Content-Length frame inside read_message and treated a serde_json body error the same as a fatal I/O error: it logged "error reading from CLI", broke the loop, and drained every pending request. One corrupt frame therefore cancelled all concurrent in-flight requests sharing the connection (e.g. list_models + list_global_skills + account.getQuota), leaving the desktop model picker empty on launch (github/app#836). Content-Length framing is honest: the reader assembles a full frame body before parsing and stays byte-aligned to the next frame regardless of any single body's content. So a body-level JSON error is self-contained and must not tear down the shared connection. Separate framing from parsing: read_message becomes read_frame returning the raw body bytes (I/O and protocol/framing errors still propagate as fatal). read_loop parses the body itself; on a parse error it recovers the response id from the frame head and fails only that one awaiter with a -32700 parse error, then continues serving the connection. Frames with no recoverable id are dropped. If framing were ever desynced, the next read_frame hits a framing error and the loop still breaks and reconnects. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- rust/src/jsonrpc.rs | 253 +++++++++++++++++++++++++++---------- rust/tests/jsonrpc_test.rs | 103 +++++++++++++++ 2 files changed, 289 insertions(+), 67 deletions(-) diff --git a/rust/src/jsonrpc.rs b/rust/src/jsonrpc.rs index fbdc96505..0f59b89b8 100644 --- a/rust/src/jsonrpc.rs +++ b/rust/src/jsonrpc.rs @@ -77,6 +77,8 @@ pub struct JsonRpcError { /// Standard JSON-RPC 2.0 error codes. pub mod error_codes { + /// Parse error (-32700): the server sent a message that is not valid JSON. + pub const PARSE_ERROR: i32 = -32700; /// Method not found (-32601). pub const METHOD_NOT_FOUND: i32 = -32601; /// Invalid method parameters (-32602). @@ -169,6 +171,38 @@ impl JsonRpcResponse { const CONTENT_LENGTH_HEADER: &str = "Content-Length: "; +/// Best-effort recovery of a response `id` from a frame whose body failed to +/// parse. JSON-RPC responses serialize `id` near the start of the object — +/// ahead of the `result`/`error` payload that may be truncated — so a bounded +/// scan of the leading bytes finds it without a full parse. Returns `None` for +/// notifications (no `id`) or when no numeric id is present in the prefix. +fn extract_response_id(body: &[u8]) -> Option { + const SCAN_LIMIT: usize = 256; + let head = &body[..body.len().min(SCAN_LIMIT)]; + // Only responses carry a recoverable awaiter. Notifications and server + // requests are distinguished by a `method` field; bail so a stray numeric + // `id` nested in their params can't fail an unrelated pending request. + if head.windows(8).any(|window| window == b"\"method\"") { + return None; + } + let key = b"\"id\""; + let key_pos = head.windows(key.len()).position(|window| window == key)?; + let after = &head[key_pos + key.len()..]; + + let mut i = 0; + while i < after.len() && matches!(after[i], b' ' | b'\t' | b':') { + i += 1; + } + let start = i; + while i < after.len() && after[i].is_ascii_digit() { + i += 1; + } + if i == start { + return None; + } + std::str::from_utf8(&after[start..i]).ok()?.parse().ok() +} + /// One framed JSON-RPC message handed to the writer actor. /// /// `frame` is the fully serialized bytes (header + body); the caller pays @@ -308,77 +342,91 @@ impl JsonRpcClient { let mut reader = BufReader::new(reader); loop { - match Self::read_message(&mut reader).await { - Ok(Some(message)) => match message { - JsonRpcMessage::Response(mut response) => { - let id = response.id; - let pending = pending_requests.write().remove(&id); - if let Some(PendingRequest { - sender, - inline_callback, - }) = pending + let body = match Self::read_frame(&mut reader).await { + Ok(Some(body)) => body, + Ok(None) => break, + Err(e) => { + error!(error = %e, "error reading from CLI"); + break; + } + }; + + // Parse the fully assembled frame. A body-level JSON error means + // this single message is corrupt, not that the transport is + // broken: Content-Length framing has already left the reader + // aligned to the next frame. Fail only the implicated request and + // keep serving every other in-flight request, rather than tearing + // down the shared connection and cancelling them all. + let message = match serde_json::from_slice::(&body) { + Ok(message) => message, + Err(parse_error) => { + Self::fail_unparseable_frame(&body, &parse_error, &pending_requests); + continue; + } + }; + + match message { + JsonRpcMessage::Response(mut response) => { + let id = response.id; + let pending = pending_requests.write().remove(&id); + if let Some(PendingRequest { + sender, + inline_callback, + }) = pending + { + // Run the inline callback synchronously on the + // read loop so any state it mutates (e.g. + // registering a server-assigned session id with + // the router) is visible before the loop reads + // and dispatches the next message. + if let Some(cb) = inline_callback + && response.error.is_none() { - // Run the inline callback synchronously on the - // read loop so any state it mutates (e.g. - // registering a server-assigned session id with - // the router) is visible before the loop reads - // and dispatches the next message. - if let Some(cb) = inline_callback - && response.error.is_none() - { - let cb_outcome = - std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - cb(&response) - })); - match cb_outcome { - Ok(Ok(())) => {} - Ok(Err(error)) => { - response.result = None; - response.error = Some(JsonRpcError { - code: -32603, - message: error.to_string(), - data: None, - }); - } - Err(panic) => { - let message = panic - .downcast_ref::<&'static str>() - .map(|s| (*s).to_string()) - .or_else(|| panic.downcast_ref::().cloned()) - .unwrap_or_else(|| { - "inline response callback panicked".to_string() - }); - response.result = None; - response.error = Some(JsonRpcError { - code: -32603, - message, - data: None, + let cb_outcome = + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + cb(&response) + })); + match cb_outcome { + Ok(Ok(())) => {} + Ok(Err(error)) => { + response.result = None; + response.error = Some(JsonRpcError { + code: -32603, + message: error.to_string(), + data: None, + }); + } + Err(panic) => { + let message = panic + .downcast_ref::<&'static str>() + .map(|s| (*s).to_string()) + .or_else(|| panic.downcast_ref::().cloned()) + .unwrap_or_else(|| { + "inline response callback panicked".to_string() }); - } + response.result = None; + response.error = Some(JsonRpcError { + code: -32603, + message, + data: None, + }); } } - if sender.send(response).is_err() { - warn!(request_id = %id, "failed to send response for request"); - } - } else { - warn!(request_id = %id, "received response for unknown request id"); } - } - JsonRpcMessage::Notification(notification) => { - let _ = notification_tx.send(notification); - } - JsonRpcMessage::Request(request) => { - if request_tx.send(request).is_err() { - warn!("failed to forward JSON-RPC request, channel closed"); + if sender.send(response).is_err() { + warn!(request_id = %id, "failed to send response for request"); } + } else { + warn!(request_id = %id, "received response for unknown request id"); } - }, - Ok(None) => { - break; } - Err(e) => { - error!(error = %e, "error reading from CLI"); - break; + JsonRpcMessage::Notification(notification) => { + let _ = notification_tx.send(notification); + } + JsonRpcMessage::Request(request) => { + if request_tx.send(request).is_err() { + warn!("failed to forward JSON-RPC request, channel closed"); + } } } } @@ -395,9 +443,56 @@ impl JsonRpcClient { } } - async fn read_message( + /// Deliver a parse-error response to the request implicated by a corrupt + /// frame, then return so the read loop can keep serving the connection. + /// + /// Honest Content-Length framing keeps the stream aligned to the next + /// frame whether or not a body is valid JSON, so a single unparseable + /// message must not cancel every concurrent request. The offending + /// request's `id` sits at the head of the frame — before the possibly + /// truncated payload — so we recover it without a full parse and fail just + /// that one awaiter. Frames with no recoverable id (notifications, server + /// requests) carry no client-side awaiter and are simply dropped. + fn fail_unparseable_frame( + body: &[u8], + error: &serde_json::Error, + pending_requests: &RwLock>, + ) { + let recovered_id = extract_response_id(body); + warn!( + error = %error, + frame_len = body.len(), + request_id = ?recovered_id, + "skipping unparseable JSON-RPC frame; connection preserved" + ); + let Some(id) = recovered_id else { + return; + }; + if let Some(PendingRequest { sender, .. }) = pending_requests.write().remove(&id) { + let response = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + id, + result: None, + error: Some(JsonRpcError { + code: error_codes::PARSE_ERROR, + message: format!("malformed JSON-RPC response from CLI: {error}"), + data: None, + }), + }; + let _ = sender.send(response); + } + } + + /// Read a single Content-Length-framed message body from the transport. + /// + /// Returns `Ok(Some(body))` with the exact frame bytes, `Ok(None)` on a + /// clean EOF at a frame boundary, or `Err` for an I/O or framing error + /// (which the read loop treats as a fatal transport failure). Parsing the + /// returned bytes is deliberately left to the caller so a JSON error can + /// be handled per-message without tearing down the connection. + async fn read_frame( reader: &mut BufReader, - ) -> Result, Error> { + ) -> Result>, Error> { let mut line = String::new(); let mut content_length = None; @@ -428,8 +523,7 @@ impl JsonRpcClient { let mut body = vec![0u8; length]; reader.read_exact(&mut body).await?; - let message: JsonRpcMessage = serde_json::from_slice(&body)?; - Ok(Some(message)) + Ok(Some(body)) } /// Send a JSON-RPC request and wait for the matching response. @@ -660,6 +754,31 @@ mod tests { assert!(result.is_err()); } + #[test] + fn extract_response_id_recovers_id_from_truncated_body() { + // Body cut off mid-`\u` escape — the failure mode from issue github/app#836. + let body = br#"{"jsonrpc":"2.0","id":4271,"result":{"text":"\u00"#; + assert_eq!(extract_response_id(body), Some(4271)); + } + + #[test] + fn extract_response_id_handles_whitespace_and_error_frames() { + assert_eq!( + extract_response_id(br#"{ "id" : 12 , "result": null}"#), + Some(12) + ); + assert_eq!( + extract_response_id(br#"{"jsonrpc":"2.0","id":9,"error":{"code":-32603"#), + Some(9) + ); + } + + #[test] + fn extract_response_id_returns_none_for_notifications() { + let body = br#"{"jsonrpc":"2.0","method":"session.event","params":{"id":"e1"}}"#; + assert_eq!(extract_response_id(body), None); + } + #[test] fn request_new_sets_version() { let req = JsonRpcRequest::new(42, "test.method", None); diff --git a/rust/tests/jsonrpc_test.rs b/rust/tests/jsonrpc_test.rs index 7f7d43213..a67aff2ca 100644 --- a/rust/tests/jsonrpc_test.rs +++ b/rust/tests/jsonrpc_test.rs @@ -410,3 +410,106 @@ async fn send_request_cancellation_does_not_leak_pending() { assert_eq!(response.result.unwrap()["ok"], true); server_task.await.unwrap(); } + +/// Regression for issue github/app#836: a single unparseable response frame +/// (a body truncated mid-`\uXXXX` escape) must fail only its own request and +/// leave every other concurrent request untouched — not tear down the shared +/// read loop and cancel them all (which left the model picker empty on launch). +#[tokio::test] +async fn malformed_frame_fails_only_its_own_request() { + use std::time::Duration; + use tokio::time::timeout; + + async fn read_one_request(reader: &mut tokio::io::DuplexStream) -> JsonRpcRequest { + let mut header = String::new(); + loop { + let mut byte = [0u8; 1]; + tokio::io::AsyncReadExt::read_exact(reader, &mut byte) + .await + .unwrap(); + header.push(byte[0] as char); + if header.ends_with("\r\n\r\n") { + break; + } + } + let length: usize = header + .trim() + .strip_prefix("Content-Length: ") + .unwrap() + .parse() + .unwrap(); + let mut body = vec![0u8; length]; + tokio::io::AsyncReadExt::read_exact(reader, &mut body) + .await + .unwrap(); + serde_json::from_slice(&body).unwrap() + } + + let (client_write, mut server_read) = duplex(8192); + let (mut server_write, client_read) = duplex(8192); + + let (notification_tx, _) = broadcast::channel(16); + let (request_tx, _) = mpsc::unbounded_channel(); + let client = std::sync::Arc::new(JsonRpcClient::new( + client_write, + client_read, + notification_tx, + request_tx, + )); + + // Two concurrent in-flight requests sharing the one read loop. + let bad = tokio::spawn({ + let client = client.clone(); + async move { client.send_request("bad", None).await } + }); + let good = tokio::spawn({ + let client = client.clone(); + async move { client.send_request("good", None).await } + }); + + // Learn the wire ids regardless of which request was written first. + let server = tokio::spawn(async move { + let mut ids = std::collections::HashMap::new(); + for _ in 0..2 { + let req = read_one_request(&mut server_read).await; + ids.insert(req.method.clone(), req.id); + } + + // Respond to "bad" with a correctly framed but truncated body — the + // Content-Length is honest, the JSON ends mid-`\u` escape. + let truncated = format!(r#"{{"jsonrpc":"2.0","id":{},"result":"\u00"#, ids["bad"]); + write_framed(&mut server_write, truncated.as_bytes()).await; + + // Respond to "good" with a well-formed response. + let ok = serde_json::json!({ + "jsonrpc": "2.0", + "id": ids["good"], + "result": {"ok": true} + }); + write_framed(&mut server_write, &serde_json::to_vec(&ok).unwrap()).await; + + // Hold the transport open so the read loop never sees EOF (which would + // itself cancel pending requests and mask the behavior under test). + tokio::time::sleep(Duration::from_millis(200)).await; + }); + + // The healthy request must succeed — it is NOT collateral damage. + let good_response = timeout(Duration::from_secs(2), good) + .await + .expect("good request hung") + .unwrap() + .unwrap(); + assert_eq!(good_response.result.unwrap()["ok"], true); + + // The malformed request resolves promptly with a parse error instead of + // hanging forever. + let bad_response = timeout(Duration::from_secs(2), bad) + .await + .expect("bad request hung") + .unwrap() + .unwrap(); + assert!(bad_response.is_error()); + assert_eq!(bad_response.error.unwrap().code, -32700); + + server.await.unwrap(); +} From f30cb259bfc4dbf112a4214f3df2f5878a44925a Mon Sep 17 00:00:00 2001 From: Enrique Moreno <403428+chuwik@users.noreply.github.com> Date: Tue, 9 Jun 2026 09:28:15 -0700 Subject: [PATCH 2/2] fix(jsonrpc): recover response id across JSON whitespace Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- rust/src/jsonrpc.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/src/jsonrpc.rs b/rust/src/jsonrpc.rs index 0f59b89b8..6a2c02848 100644 --- a/rust/src/jsonrpc.rs +++ b/rust/src/jsonrpc.rs @@ -190,7 +190,7 @@ fn extract_response_id(body: &[u8]) -> Option { let after = &head[key_pos + key.len()..]; let mut i = 0; - while i < after.len() && matches!(after[i], b' ' | b'\t' | b':') { + while i < after.len() && matches!(after[i], b' ' | b'\t' | b'\n' | b'\r' | b':') { i += 1; } let start = i; @@ -773,6 +773,12 @@ mod tests { ); } + #[test] + fn extract_response_id_handles_json_whitespace_after_id_key() { + let body = b"{\r\n \"jsonrpc\": \"2.0\",\r\n \"id\"\r\n :\r\n 4271,\r\n \"result\": null\r\n}"; + assert_eq!(extract_response_id(body), Some(4271)); + } + #[test] fn extract_response_id_returns_none_for_notifications() { let body = br#"{"jsonrpc":"2.0","method":"session.event","params":{"id":"e1"}}"#;