From a40b3aabc6130fd48f94e679b7b6d2d6a1f1b9b9 Mon Sep 17 00:00:00 2001 From: Logan Nguyen Date: Tue, 9 Jun 2026 14:10:41 -0500 Subject: [PATCH] fix: emit terminal Done when the model stream ends without a done marker Signed-off-by: Logan Nguyen --- src-tauri/src/commands.rs | 88 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 2 deletions(-) diff --git a/src-tauri/src/commands.rs b/src-tauri/src/commands.rs index c0af4753..d9d4d7d4 100644 --- a/src-tauri/src/commands.rs +++ b/src-tauri/src/commands.rs @@ -378,6 +378,10 @@ pub async fn stream_ollama_chat( }; let mut accumulated = String::new(); + // Tracks whether a terminal Done was already emitted, so the stream-end + // branch can emit one when Ollama closes without a done:true line without + // double-emitting on the normal completion path. + let mut done_emitted = false; let res = client.post(endpoint).json(&request_payload).send().await; @@ -447,6 +451,7 @@ pub async fn stream_ollama_chat( } if let Some(true) = json.done { on_chunk(StreamChunk::Done); + done_emitted = true; } } } @@ -456,7 +461,19 @@ pub async fn stream_ollama_chat( on_chunk(StreamChunk::Error(classify_stream_error(&e))); return accumulated; } - None => return accumulated, + None => { + // Ollama can drop the stream without its + // terminal done:true line (e.g. a small model + // degenerating on a long repetitive token run + // and the runner closing the connection). Emit + // a terminal Done so the frontend always leaves + // its streaming state instead of spinning + // forever on the missing terminal event. + if !done_emitted { + on_chunk(StreamChunk::Done); + } + return accumulated; + } } } } @@ -883,6 +900,67 @@ mod tests { std::mem::discriminant(&chunks[2]), std::mem::discriminant(&StreamChunk::Done) ); + assert_eq!( + chunks.len(), + 3, + "a single terminal Done; the stream-end branch must not emit a duplicate" + ); + assert_eq!(accumulated, "Hello world"); + } + + /// Ollama can end the response stream without its usual terminal + /// `done:true` line (observed when a small model degenerates on a long + /// repetitive token run and the runner drops the connection). The loop + /// must still emit a terminal `Done` so the frontend exits its streaming + /// state instead of spinning forever. + #[tokio::test] + async fn emits_done_when_stream_ends_without_done_marker() { + let mut server = mockito::Server::new_async().await; + // Note: no `chat_line("", true)` line; the stream just stops. + let body = format!( + "{}{}", + chat_line("Hello", false), + chat_line(" world", false) + ); + let mock = server + .mock("POST", "/api/chat") + .with_body(body) + .create_async() + .await; + + let client = reqwest::Client::new(); + let token = CancellationToken::new(); + let (chunks, callback) = collect_chunks(); + + let accumulated = stream_ollama_chat( + OllamaChatParams { + endpoint: format!("{}/api/chat", server.url()), + model: "test-model".to_string(), + messages: vec![ChatMessage { + role: "user".to_string(), + content: "hi".to_string(), + images: None, + }], + think: false, + keep_alive: None, + num_ctx: DEFAULT_NUM_CTX, + }, + &client, + token, + callback, + ) + .await; + + mock.assert_async().await; + let chunks = chunks.lock().unwrap(); + assert!(matches!(&chunks[0], StreamChunk::Token(t) if t == "Hello")); + assert!(matches!(&chunks[1], StreamChunk::Token(t) if t == " world")); + assert_eq!( + std::mem::discriminant(&chunks[2]), + std::mem::discriminant(&StreamChunk::Done), + "stream ending without done:true must still produce a terminal Done" + ); + assert_eq!(chunks.len(), 3); assert_eq!(accumulated, "Hello world"); } @@ -1025,7 +1103,13 @@ mod tests { mock.assert_async().await; let chunks = chunks.lock().unwrap(); - assert!(chunks.is_empty()); + // An empty 200 body still ends the stream: emit a single terminal Done + // so the frontend leaves its streaming state, with no content. + assert_eq!(chunks.len(), 1); + assert_eq!( + std::mem::discriminant(&chunks[0]), + std::mem::discriminant(&StreamChunk::Done) + ); assert!(accumulated.is_empty()); }