-
Notifications
You must be signed in to change notification settings - Fork 918
fix: preserve ProcessError context from failed CLI subprocesses #843
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,13 @@ | ||
| """Subprocess transport implementation using Claude Code CLI.""" | ||
|
|
||
| import inspect | ||
| import json | ||
| import logging | ||
| import os | ||
| import platform | ||
| import re | ||
| import shutil | ||
| from collections import deque | ||
| from collections.abc import AsyncIterable, AsyncIterator | ||
| from contextlib import suppress | ||
| from pathlib import Path | ||
|
|
@@ -26,6 +28,7 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
| _DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit | ||
| _STDERR_CAPTURE_LIMIT = 8 * 1024 # Keep the tail of stderr for ProcessError. | ||
| MINIMUM_CLAUDE_CODE_VERSION = "2.0.0" | ||
|
|
||
|
|
||
|
|
@@ -50,7 +53,11 @@ def __init__( | |
| self._stdout_stream: TextReceiveStream | None = None | ||
| self._stdin_stream: TextSendStream | None = None | ||
| self._stderr_stream: TextReceiveStream | None = None | ||
| self._stderr_process_stream: Any | None = None | ||
| self._stderr_task_group: anyio.abc.TaskGroup | None = None | ||
| self._stderr_reader_finished: anyio.Event | None = None | ||
| self._stderr_buffer: deque[str] = deque() | ||
| self._stderr_buffer_size = 0 | ||
| self._ready = False | ||
| self._exit_error: Exception | None = None # Track process exit errors | ||
| self._max_buffer_size = ( | ||
|
|
@@ -436,20 +443,18 @@ async def connect(self) -> None: | |
| if self._cwd: | ||
| process_env["PWD"] = self._cwd | ||
|
|
||
| # Pipe stderr if we have a callback OR debug mode is enabled | ||
| # Pipe stderr so ProcessError can include the CLI's actual stderr. | ||
| # We only stream it live when callers asked for callback/debug output. | ||
| should_pipe_stderr = ( | ||
| self._options.stderr is not None | ||
| or "debug-to-stderr" in self._options.extra_args | ||
| ) | ||
|
Comment on lines +446 to 451
|
||
|
|
||
| # For backward compat: use debug_stderr file object if no callback and debug is on | ||
| stderr_dest = PIPE if should_pipe_stderr else None | ||
|
|
||
| self._process = await anyio.open_process( | ||
| cmd, | ||
| stdin=PIPE, | ||
| stdout=PIPE, | ||
| stderr=stderr_dest, | ||
| stderr=PIPE, | ||
| cwd=self._cwd, | ||
| env=process_env, | ||
| user=self._options.user, | ||
|
|
@@ -458,9 +463,14 @@ async def connect(self) -> None: | |
| if self._process.stdout: | ||
| self._stdout_stream = TextReceiveStream(self._process.stdout) | ||
|
|
||
| # Setup stderr stream if piped | ||
| if should_pipe_stderr and self._process.stderr: | ||
| self._stderr_stream = TextReceiveStream(self._process.stderr) | ||
| # Setup stderr stream if available. | ||
| if self._process.stderr: | ||
| self._stderr_process_stream = self._process.stderr | ||
| if self._stderr_process_stream and ( | ||
| should_pipe_stderr or self._supports_live_stderr_reading() | ||
| ): | ||
| self._stderr_stream = TextReceiveStream(self._stderr_process_stream) | ||
| self._stderr_reader_finished = anyio.Event() | ||
| # Start async task to read stderr | ||
| self._stderr_task_group = anyio.create_task_group() | ||
| await self._stderr_task_group.__aenter__() | ||
|
|
@@ -488,6 +498,35 @@ async def connect(self) -> None: | |
| self._exit_error = error | ||
| raise error from e | ||
|
|
||
| def _capture_stderr_line(self, line: str) -> None: | ||
| """Keep a bounded tail of stderr for ProcessError diagnostics.""" | ||
| self._stderr_buffer.append(line) | ||
| self._stderr_buffer_size += len(line) + 1 | ||
|
|
||
| while self._stderr_buffer and self._stderr_buffer_size > _STDERR_CAPTURE_LIMIT: | ||
| removed = self._stderr_buffer.popleft() | ||
| self._stderr_buffer_size -= len(removed) + 1 | ||
|
|
||
| async def _drain_stderr_stream(self, stream: AsyncIterator[str]) -> None: | ||
| """Read any remaining stderr lines into the capture buffer.""" | ||
| try: | ||
| async for line in stream: | ||
| line_str = line.rstrip() | ||
| if line_str: | ||
| self._capture_stderr_line(line_str) | ||
| except anyio.ClosedResourceError: | ||
| pass | ||
| except Exception: | ||
| pass | ||
|
|
||
| def _supports_live_stderr_reading(self) -> bool: | ||
| """Return True when the stderr stream can be consumed concurrently.""" | ||
| if self._stderr_process_stream is None: | ||
| return False | ||
|
|
||
| receive = getattr(self._stderr_process_stream, "receive", None) | ||
| return inspect.iscoroutinefunction(receive) | ||
|
|
||
| async def _handle_stderr(self) -> None: | ||
| """Handle stderr stream - read and invoke callbacks.""" | ||
| if not self._stderr_stream: | ||
|
|
@@ -499,22 +538,25 @@ async def _handle_stderr(self) -> None: | |
| if not line_str: | ||
| continue | ||
|
|
||
| self._capture_stderr_line(line_str) | ||
|
|
||
| # Call the stderr callback if provided | ||
| if self._options.stderr: | ||
| self._options.stderr(line_str) | ||
|
|
||
| # For backward compatibility: write to debug_stderr if in debug mode | ||
| elif ( | ||
| "debug-to-stderr" in self._options.extra_args | ||
| and self._options.debug_stderr | ||
| ): | ||
| # Preserve inherited-stderr behavior by forwarding stderr to | ||
| # the configured sink (defaults to sys.stderr). | ||
| elif self._options.debug_stderr: | ||
| self._options.debug_stderr.write(line_str + "\n") | ||
| if hasattr(self._options.debug_stderr, "flush"): | ||
| self._options.debug_stderr.flush() | ||
|
Comment on lines +547 to 552
|
||
| except anyio.ClosedResourceError: | ||
| pass # Stream closed, exit normally | ||
| except Exception: | ||
| pass # Ignore other errors during stderr reading | ||
| finally: | ||
| if self._stderr_reader_finished is not None: | ||
| self._stderr_reader_finished.set() | ||
|
|
||
| async def close(self) -> None: | ||
| """Close the transport and clean up resources.""" | ||
|
|
@@ -528,6 +570,7 @@ async def close(self) -> None: | |
| self._stderr_task_group.cancel_scope.cancel() | ||
| await self._stderr_task_group.__aexit__(None, None, None) | ||
| self._stderr_task_group = None | ||
| self._stderr_reader_finished = None | ||
|
|
||
| # Close stdin stream (acquire lock to prevent race with concurrent writes) | ||
| async with self._write_lock: | ||
|
|
@@ -541,6 +584,7 @@ async def close(self) -> None: | |
| with suppress(Exception): | ||
| await self._stderr_stream.aclose() | ||
| self._stderr_stream = None | ||
| self._stderr_process_stream = None | ||
|
|
||
| # Wait for graceful shutdown after stdin EOF, then terminate if needed. | ||
| # The subprocess needs time to flush its session file after receiving | ||
|
|
@@ -568,6 +612,10 @@ async def close(self) -> None: | |
| self._stdout_stream = None | ||
| self._stdin_stream = None | ||
| self._stderr_stream = None | ||
| self._stderr_process_stream = None | ||
| self._stderr_reader_finished = None | ||
| self._stderr_buffer.clear() | ||
| self._stderr_buffer_size = 0 | ||
| self._exit_error = None | ||
|
|
||
| async def write(self, data: str) -> None: | ||
|
|
@@ -678,10 +726,22 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]: | |
|
|
||
| # Use exit code for error detection | ||
| if returncode is not None and returncode != 0: | ||
| if self._stderr_task_group is None: | ||
| if self._stderr_stream is not None: | ||
| await self._drain_stderr_stream(self._stderr_stream) | ||
| elif self._stderr_process_stream is not None: | ||
| stderr_stream = TextReceiveStream(self._stderr_process_stream) | ||
| await self._drain_stderr_stream(stderr_stream) | ||
| with suppress(Exception): | ||
| await stderr_stream.aclose() | ||
| elif self._stderr_reader_finished is not None: | ||
| await self._stderr_reader_finished.wait() | ||
|
|
||
| stderr_output = "\n".join(self._stderr_buffer) or None | ||
| self._exit_error = ProcessError( | ||
| f"Command failed with exit code {returncode}", | ||
| exit_code=returncode, | ||
| stderr="Check stderr output for details", | ||
| stderr=stderr_output, | ||
| ) | ||
| raise self._exit_error | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the error message is still logged via `logger.error(... {e})` (just above), adding the raw exception object here will commonly mean the log line now includes captured CLI stderr (since `ProcessError` embeds stderr in `__str__`). This may leak sensitive subprocess output into logs and duplicates stderr already forwarded to the sink. Consider redacting what gets logged (e.g., exception type + `exit_code`) or switching to `logger.error("Fatal error in message reader", exc_info=True)` without interpolating the message.