Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions src-tauri/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ pub async fn stream_ollama_chat(
};

let mut accumulated = String::new();
// Tracks whether a terminal Done was already emitted, so the stream-end
// branch can emit one when Ollama closes without a done:true line without
// double-emitting on the normal completion path.
let mut done_emitted = false;

let res = client.post(endpoint).json(&request_payload).send().await;

Expand Down Expand Up @@ -447,6 +451,7 @@ pub async fn stream_ollama_chat(
}
if let Some(true) = json.done {
on_chunk(StreamChunk::Done);
done_emitted = true;
}
}
}
Expand All @@ -456,7 +461,19 @@ pub async fn stream_ollama_chat(
on_chunk(StreamChunk::Error(classify_stream_error(&e)));
return accumulated;
}
None => return accumulated,
None => {
// Ollama can drop the stream without its
// terminal done:true line (e.g. a small model
// degenerating on a long repetitive token run
// and the runner closing the connection). Emit
// a terminal Done so the frontend always leaves
// its streaming state instead of spinning
// forever on the missing terminal event.
if !done_emitted {
on_chunk(StreamChunk::Done);
}
return accumulated;
}
}
}
}
Expand Down Expand Up @@ -883,6 +900,67 @@ mod tests {
std::mem::discriminant(&chunks[2]),
std::mem::discriminant(&StreamChunk::Done)
);
assert_eq!(
chunks.len(),
3,
"a single terminal Done; the stream-end branch must not emit a duplicate"
);
assert_eq!(accumulated, "Hello world");
}

/// Ollama can end the response stream without its usual terminal
/// `done:true` line (observed when a small model degenerates on a long
/// repetitive token run and the runner drops the connection). The loop
/// must still emit a terminal `Done` so the frontend exits its streaming
/// state instead of spinning forever.
///
/// The mock server replies with two content lines and then simply closes the
/// body. The test expects exactly three chunks (two tokens followed by one
/// `Done`) and an accumulated string equal to the concatenated tokens.
#[tokio::test]
async fn emits_done_when_stream_ends_without_done_marker() {
    let mut server = mockito::Server::new_async().await;
    // Note: no `chat_line("", true)` line; the stream just stops.
    let body = format!(
        "{}{}",
        chat_line("Hello", false),
        chat_line(" world", false)
    );
    let mock = server
        .mock("POST", "/api/chat")
        .with_body(body)
        .create_async()
        .await;

    let client = reqwest::Client::new();
    let token = CancellationToken::new();
    let (chunks, callback) = collect_chunks();

    let accumulated = stream_ollama_chat(
        OllamaChatParams {
            endpoint: format!("{}/api/chat", server.url()),
            model: "test-model".to_string(),
            messages: vec![ChatMessage {
                role: "user".to_string(),
                content: "hi".to_string(),
                images: None,
            }],
            think: false,
            keep_alive: None,
            num_ctx: DEFAULT_NUM_CTX,
        },
        &client,
        token,
        callback,
    )
    .await;

    mock.assert_async().await;
    let chunks = chunks.lock().unwrap();
    // Check the count before indexing: if the terminal Done is missing,
    // `chunks[2]` would panic with an opaque index-out-of-bounds error and
    // hide the descriptive failure message for the regression this test guards.
    assert_eq!(
        chunks.len(),
        3,
        "expected two tokens followed by exactly one terminal Done"
    );
    assert!(matches!(&chunks[0], StreamChunk::Token(t) if t == "Hello"));
    assert!(matches!(&chunks[1], StreamChunk::Token(t) if t == " world"));
    assert_eq!(
        std::mem::discriminant(&chunks[2]),
        std::mem::discriminant(&StreamChunk::Done),
        "stream ending without done:true must still produce a terminal Done"
    );
    assert_eq!(accumulated, "Hello world");
}

Expand Down Expand Up @@ -1025,7 +1103,13 @@ mod tests {

mock.assert_async().await;
let chunks = chunks.lock().unwrap();
assert!(chunks.is_empty());
// An empty 200 body still ends the stream: emit a single terminal Done
// so the frontend leaves its streaming state, with no content.
assert_eq!(chunks.len(), 1);
assert_eq!(
std::mem::discriminant(&chunks[0]),
std::mem::discriminant(&StreamChunk::Done)
);
assert!(accumulated.is_empty());
}

Expand Down