Skip to content

Commit d685824

Browse files
committed
Fixed: Preserve buffered output on tokio bash timeout
Avoid canceling stdout/stderr drains by spawning independent pipe-read tasks and racing only child completion against timeout. On timeout, kill the process tree, await drain tasks briefly to capture buffered output, and include captured stdout/stderr in timeout errors; add a regression test for pre-timeout output.
1 parent 4cac3b1 commit d685824

1 file changed

Lines changed: 120 additions & 31 deletions

File tree

src/llm-coding-tools-core/src/tools/bash/tokio_impl.rs

Lines changed: 120 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,77 @@
22
33
use super::{BashOutput, PIPE_BUFFER_CAPACITY};
44
use crate::error::{ToolError, ToolResult};
5+
use core::fmt::Write;
56
use process_wrap::tokio::*;
67
use std::path::Path;
8+
use std::pin::Pin;
79
use std::process::Stdio;
810
use std::time::Duration;
9-
use tokio::io::AsyncReadExt;
11+
use tokio::io::{AsyncRead, AsyncReadExt};
12+
use tokio::task::JoinHandle;
13+
14+
/// Maximum time to wait for each stdout/stderr drain task to finish after a timed-out command has been killed; a drain still running after this is aborted.
15+
const PIPE_DRAIN_GRACE_PERIOD: Duration = Duration::from_millis(100);
16+
17+
#[inline]
18+
fn spawn_pipe_drain_task<R>(mut pipe: R) -> JoinHandle<Vec<u8>>
19+
where
20+
R: AsyncRead + Unpin + Send + 'static,
21+
{
22+
tokio::spawn(async move {
23+
let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY);
24+
let _ = pipe.read_to_end(&mut buf).await;
25+
buf
26+
})
27+
}
28+
29+
#[inline]
30+
async fn await_pipe_drain_task(task: JoinHandle<Vec<u8>>) -> Vec<u8> {
31+
task.await.unwrap_or_default()
32+
}
33+
34+
#[inline]
35+
async fn await_pipe_drain_task_with_grace(
36+
mut task: JoinHandle<Vec<u8>>,
37+
grace: Duration,
38+
) -> Vec<u8> {
39+
tokio::select! {
40+
result = &mut task => result.unwrap_or_default(),
41+
_ = tokio::time::sleep(grace) => {
42+
task.abort();
43+
task.await.unwrap_or_default()
44+
}
45+
}
46+
}
47+
48+
#[inline]
49+
fn timeout_with_buffered_output(
50+
timeout: Duration,
51+
stdout_data: &[u8],
52+
stderr_data: &[u8],
53+
) -> ToolError {
54+
let stdout = String::from_utf8_lossy(stdout_data);
55+
let stderr = String::from_utf8_lossy(stderr_data);
56+
57+
// Base message + outputs + stderr label.
58+
let mut message = String::with_capacity(stdout.len() + stderr.len() + 64);
59+
let _ = write!(message, "command timed out after {}ms", timeout.as_millis());
60+
61+
if !stdout.is_empty() {
62+
message.push('\n');
63+
message.push_str(&stdout);
64+
}
65+
66+
if !stderr.is_empty() {
67+
if stdout.is_empty() || !stdout.ends_with('\n') {
68+
message.push('\n');
69+
}
70+
message.push_str("[stderr]\n");
71+
message.push_str(&stderr);
72+
}
73+
74+
ToolError::Timeout(message)
75+
}
1076

1177
/// Executes a shell command with optional working directory and timeout.
1278
///
@@ -67,40 +133,29 @@ pub async fn execute_command(
67133

68134
// Take stdout/stderr handles to drain them concurrently with process wait.
69135
// This prevents deadlock when output exceeds pipe buffer (~64KB Linux, ~4KB Windows).
70-
let mut stdout_pipe = child.stdout().take().expect("stdout was piped");
71-
let mut stderr_pipe = child.stderr().take().expect("stderr was piped");
136+
let stdout_pipe = child.stdout().take().expect("stdout was piped");
137+
let stderr_pipe = child.stderr().take().expect("stderr was piped");
72138

73-
// Race between timeout and (process completion + pipe draining).
74-
// Using join! inside select! avoids tokio::spawn overhead while still
75-
// providing concurrent I/O for the pipe reads.
76-
tokio::select! {
139+
// Keep output drains independent from timeout selection so timed-out
140+
// commands can still return buffered stdout/stderr.
141+
let stdout_task = spawn_pipe_drain_task(stdout_pipe);
142+
let stderr_task = spawn_pipe_drain_task(stderr_pipe);
143+
144+
// Race between timeout and process completion. Pipe drain tasks keep running
145+
// regardless of which branch wins this select.
146+
let wait_result = tokio::select! {
77147
biased; // Check timeout first for consistent behavior
78148

79-
_ = tokio::time::sleep(timeout) => {
80-
// Timeout: explicitly kill the process tree (Job Object on Windows, process group on Unix)
81-
let _ = Pin::from(child.kill()).await;
82-
Err(ToolError::Timeout(format!(
83-
"command timed out after {}ms",
84-
timeout.as_millis()
85-
)))
86-
}
149+
_ = tokio::time::sleep(timeout) => None,
150+
status = child.wait() => Some(status),
151+
};
87152

88-
result = async {
89-
tokio::join!(
90-
child.wait(),
91-
async {
92-
let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY);
93-
let _ = stdout_pipe.read_to_end(&mut buf).await;
94-
buf
95-
},
96-
async {
97-
let mut buf = Vec::with_capacity(PIPE_BUFFER_CAPACITY);
98-
let _ = stderr_pipe.read_to_end(&mut buf).await;
99-
buf
100-
}
101-
)
102-
} => {
103-
let (status, stdout_data, stderr_data) = result;
153+
match wait_result {
154+
Some(status) => {
155+
let (stdout_data, stderr_data) = tokio::join!(
156+
await_pipe_drain_task(stdout_task),
157+
await_pipe_drain_task(stderr_task)
158+
);
104159
let status = status.map_err(|e| ToolError::Execution(e.to_string()))?;
105160

106161
Ok(BashOutput {
@@ -109,6 +164,22 @@ pub async fn execute_command(
109164
stderr: String::from_utf8_lossy(&stderr_data).into_owned(),
110165
})
111166
}
167+
None => {
168+
// Timeout: explicitly kill the process tree (Job Object on Windows,
169+
// process group on Unix), then briefly await pipe drains for buffered output.
170+
let _ = Pin::from(child.kill()).await;
171+
172+
let (stdout_data, stderr_data) = tokio::join!(
173+
await_pipe_drain_task_with_grace(stdout_task, PIPE_DRAIN_GRACE_PERIOD),
174+
await_pipe_drain_task_with_grace(stderr_task, PIPE_DRAIN_GRACE_PERIOD)
175+
);
176+
177+
Err(timeout_with_buffered_output(
178+
timeout,
179+
&stdout_data,
180+
&stderr_data,
181+
))
182+
}
112183
}
113184
}
114185

@@ -157,6 +228,24 @@ mod tests {
157228
assert!(matches!(result, Err(ToolError::Timeout(_))));
158229
}
159230

231+
#[tokio::test]
232+
async fn timeout_preserves_buffered_output() {
233+
let cmd = if cfg!(target_os = "windows") {
234+
"echo stdout-before-timeout & echo stderr-before-timeout 1>&2 & ping -n 10 127.0.0.1 >nul"
235+
} else {
236+
"echo stdout-before-timeout; echo stderr-before-timeout 1>&2; sleep 10"
237+
};
238+
239+
let result = execute_command(cmd, None, Duration::from_millis(100)).await;
240+
match result {
241+
Err(ToolError::Timeout(message)) => {
242+
assert!(message.contains("stdout-before-timeout"));
243+
assert!(message.contains("stderr-before-timeout"));
244+
}
245+
other => panic!("expected timeout error, got: {other:?}"),
246+
}
247+
}
248+
160249
#[tokio::test]
161250
async fn invalid_workdir_returns_error() {
162251
let result = execute_command(

0 commit comments

Comments
 (0)