Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions openless-all/app/src-tauri/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ struct Inner {
qa_asr: Mutex<Option<Arc<VolcengineStreamingASR>>>,
/// QA 用的 Recorder 句柄。
qa_recorder: Mutex<Option<Recorder>>,
/// QA SSE 流取消标志。begin_qa_session 重置为 false;cancel_qa_session 设 true;
/// polish::chat_completion_history_streaming 的 loop 每帧检查,true 时 break loop
/// 避免取消后 LLM 仍 drain HTTP body 烧 token。详见 issue #161。
qa_stream_cancelled: Arc<AtomicBool>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -187,6 +191,7 @@ impl Coordinator {
capsule_layout: Mutex::new(None),
qa_asr: Mutex::new(None),
qa_recorder: Mutex::new(None),
qa_stream_cancelled: Arc::new(AtomicBool::new(false)),
}),
}
}
Expand Down Expand Up @@ -755,7 +760,18 @@ async fn handle_window_hotkey_event(
repeat: bool,
) -> Result<(), String> {
if event_type == "keydown" && key == "Escape" {
cancel_session(inner);
// Esc 路由(issue #161):QA 浮窗可见时优先取消 QA(不动 dictation);
// 否则走 dictation 取消通路。之前无条件 cancel_session 导致 QA 浮窗
// 按 Esc 杀的是 dictation 而 QA 流还在烧 token。
let qa_active = {
let st = inner.qa_state.lock();
st.panel_visible || st.phase != QaPhase::Idle
};
if qa_active {
close_qa_panel(inner);
} else {
cancel_session(inner);
}
return Ok(());
}

Expand Down Expand Up @@ -1716,6 +1732,8 @@ async fn begin_qa_session(inner: &Arc<Inner>) -> Result<(), String> {
state.front_app = capture_frontmost_app();
state.selection = None;
}
// 重置 SSE 取消标志:上一轮可能 set 过的 true 留着会让本轮流式立即 break。
inner.qa_stream_cancelled.store(false, Ordering::SeqCst);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Scope QA stream cancel flag per session

qa_stream_cancelled is shared across all QA sessions, and this reset runs at the start of every new session. If the user closes QA (sets cancel=true) and quickly starts another session before the previous streaming task exits, this line flips the old task back to false, so the previous SSE loop keeps draining tokens again. That reintroduces the cancellation race this patch is trying to fix; the cancel signal should be session-scoped (e.g., token/id) rather than a single global boolean.

Useful? React with 👍 / 👎.


// 抓选区。每轮按 Option 都重新抓一次:用户多轮提问中可以重新选别处文字。
// 浮窗 focus:false,原 app 仍是 frontmost,AX/Cmd+C fallback 都能拿到。
Expand Down Expand Up @@ -1951,8 +1969,17 @@ async fn end_qa_session(inner: &Arc<Inner>) -> Result<(), String> {

// 流式回调:每个 SSE delta 立刻推一帧 qa:state{kind:"answer_delta"} 给前端,
// 浮窗里气泡边收边长。最终的 messages 由 answer 事件统一下发(保证一致性)。
//
// session_id 守卫(issue #161):闭包捕获本会话 id;用户取消 → 关浮窗 → 开新浮窗
// 开新一轮时,旧的 in-flight LLM 流仍可能 emit chunk,必须在 emit 前比对当前
// qa_state.session_id == 捕获 id,否则跳过——避免旧会话的字漏进新气泡。
let captured_session_id = inner.qa_state.lock().session_id;
let inner_for_delta = Arc::clone(inner);
let on_delta = move |chunk: &str| {
let cur_id = inner_for_delta.qa_state.lock().session_id;
if cur_id != captured_session_id {
return; // 旧 session 漏来的 chunk,丢弃
}
if let Some(app) = inner_for_delta.app.lock().clone() {
let _ = app.emit_to(
"qa",
Expand All @@ -1965,11 +1992,17 @@ async fn end_qa_session(inner: &Arc<Inner>) -> Result<(), String> {
}
};

// SSE 流取消旗标:cancel_qa_session / close_qa_panel 会 set true,
// polish 的 SSE loop 每帧检查 → break,释放 HTTP body。详见 issue #161。
let cancel_flag = Arc::clone(&inner.qa_stream_cancelled);
let should_cancel = move || cancel_flag.load(Ordering::Relaxed);

let answer = match answer_chat_dispatch(
&messages_for_llm,
&working_languages,
front_app.as_deref(),
on_delta,
should_cancel,
)
.await
{
Expand Down Expand Up @@ -2093,6 +2126,10 @@ fn cancel_qa_session(inner: &Arc<Inner>) {
return;
}
inner.qa_state.lock().cancelled = true;
// SSE 流取消旗标——polish::chat_completion_history_streaming 的 loop 每帧检查
// 这个 flag,true 时立即 break 不再 drain HTTP body,避免取消后 LLM 仍烧 token。
// 详见 issue #161。
inner.qa_stream_cancelled.store(true, Ordering::SeqCst);
if let Some(rec) = inner.qa_recorder.lock().take() {
rec.stop();
}
Expand All @@ -2107,14 +2144,16 @@ fn cancel_qa_session(inner: &Arc<Inner>) {
log::info!("[coord] QA session cancelled (was {phase:?})");
}

async fn answer_chat_dispatch<F>(
async fn answer_chat_dispatch<F, C>(
messages: &[crate::types::QaChatMessage],
working_languages: &[String],
front_app: Option<&str>,
on_delta: F,
should_cancel: C,
) -> anyhow::Result<String>
where
F: Fn(&str) + Send + Sync,
C: Fn() -> bool + Send + Sync,
{
let api_key = CredentialsVault::get(CredentialAccount::ArkApiKey)?.unwrap_or_default();
if api_key.is_empty() {
Expand All @@ -2133,7 +2172,7 @@ where
let config = OpenAICompatibleConfig::new("ark", "Doubao Ark", base_url, api_key, model);
let provider = OpenAICompatibleLLMProvider::new(config);
Ok(provider
.answer_chat_streaming(messages, working_languages, front_app, on_delta)
.answer_chat_streaming(messages, working_languages, front_app, on_delta, should_cancel)
.await?)
}

Expand Down
16 changes: 13 additions & 3 deletions openless-all/app/src-tauri/src/polish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,23 @@ impl OpenAICompatibleLLMProvider {
/// 最后一条必须是新一轮的 user 提问。第一条 user 消息里如果有选区,调用方应在
/// content 里就把选区原文注入。`on_delta` 在每个 SSE chunk 到达时被调;最终返回
/// 拼好的完整字符串(用于写入 messages 历史)。详见 issue #118 v2。
pub async fn answer_chat_streaming<F>(
pub async fn answer_chat_streaming<F, C>(
&self,
messages: &[QaChatMessage],
working_languages: &[String],
front_app: Option<&str>,
on_delta: F,
should_cancel: C,
) -> Result<String, LLMError>
where
F: Fn(&str) + Send + Sync,
C: Fn() -> bool + Send + Sync,
{
let mut system_prompt = prompts::qa_system_prompt();
if let Some(premise) = context_premise(working_languages, front_app) {
system_prompt = format!("{}\n\n{}", premise, system_prompt);
}
self.chat_completion_history_streaming(&system_prompt, messages, on_delta)
self.chat_completion_history_streaming(&system_prompt, messages, on_delta, should_cancel)
.await
}

Expand Down Expand Up @@ -209,14 +211,16 @@ impl OpenAICompatibleLLMProvider {
/// 与 `chat_completion` 同条 HTTP 通路,但开 `stream: true` 并把 SSE chunk 一边
/// 解析、一边通过 `on_delta` 推给调用方(用于实时把答案塞进浮窗气泡)。
/// 最终返回拼好的完整字符串供调用方写入对话历史。
async fn chat_completion_history_streaming<F>(
async fn chat_completion_history_streaming<F, C>(
&self,
system_prompt: &str,
history: &[QaChatMessage],
on_delta: F,
should_cancel: C,
) -> Result<String, LLMError>
where
F: Fn(&str) + Send + Sync,
C: Fn() -> bool + Send + Sync,
{
if self.config.api_key.trim().is_empty() {
return Err(LLMError::MissingCredentials);
Expand Down Expand Up @@ -287,6 +291,12 @@ impl OpenAICompatibleLLMProvider {
let mut buffer = String::new();
let mut full_text = String::new();
loop {
// 取消旗标:用户取消 / 关浮窗时立即 break,不再 drain HTTP body。
// 否则 reqwest 会读完整个流(包括 LLM 后续 token)烧 quota。详见 issue #161。
if should_cancel() {
log::info!("[llm] stream cancelled by caller; breaking SSE loop");
break;
Comment on lines +296 to +298
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Return cancellation explicitly instead of generic stream break

Breaking the SSE loop on should_cancel() without a cancellation-specific return path means the function can fall through with empty full_text, which is later treated as InvalidResponse("empty stream"). In the QA flow this turns user-initiated cancel/close into an error path (finish_qa_with_error), causing spurious failure handling instead of a silent cancel. The loop should propagate an explicit cancelled outcome so callers can short-circuit normally.

Useful? React with 👍 / 👎.

}
let chunk_opt = response
.chunk()
.await
Expand Down
1 change: 1 addition & 0 deletions openless-all/app/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export interface UserPreferences {
export type QaStateKind =
| 'idle'
| 'recording'
| 'loading'
| 'thinking'
| 'answer_delta'
| 'answer'
Expand Down
9 changes: 9 additions & 0 deletions openless-all/app/src/pages/QaPanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ export function QaPanel() {
setErrorMsg('');
setStreamingAnswer('');
break;
case 'loading':
// ASR 在 finalize、user message 还没 push 的过渡帧。提前切到 thinking
// 视图避免 UI 卡 recording 几百 ms 反馈缺失。详见 issue #161。
setStatus('thinking');
setSelectionPreview('');
setErrorMsg('');
setStreamingAnswer('');
setLevel(0);
break;
case 'thinking':
setStatus('thinking');
setSelectionPreview('');
Expand Down
Loading