Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
logger = logging.getLogger(__name__)

_DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit
_MAX_STDERR_TAIL_LINES = 50
MINIMUM_CLAUDE_CODE_VERSION = "2.0.0"


Expand All @@ -51,6 +52,8 @@ def __init__(
self._stdin_stream: TextSendStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stderr_task_group: anyio.abc.TaskGroup | None = None
self._stderr_tail: list[str] = []
self._stderr_complete: anyio.Event | None = None
self._ready = False
self._exit_error: Exception | None = None # Track process exit errors
self._max_buffer_size = (
Expand Down Expand Up @@ -340,6 +343,9 @@ async def connect(self) -> None:
if self._process:
return

self._stderr_tail = []
self._stderr_complete = None

if self._cli_path is None:
self._cli_path = await anyio.to_thread.run_sync(self._find_cli)

Expand Down Expand Up @@ -394,14 +400,11 @@ async def connect(self) -> None:
if self._cwd:
process_env["PWD"] = self._cwd

# Pipe stderr if we have a callback OR debug mode is enabled
should_pipe_stderr = (
self._options.stderr is not None
or "debug-to-stderr" in self._options.extra_args
)

# For backward compat: use debug_stderr file object if no callback and debug is on
stderr_dest = PIPE if should_pipe_stderr else None
# Keep the existing live stderr forwarding behavior in
# _handle_stderr() for callbacks and debug mode, and always pipe
# stderr so non-zero exits can surface CLI error text in
# ProcessError.
stderr_dest = PIPE

self._process = await anyio.open_process(
cmd,
Expand All @@ -417,8 +420,9 @@ async def connect(self) -> None:
self._stdout_stream = TextReceiveStream(self._process.stdout)

# Setup stderr stream if piped
if should_pipe_stderr and self._process.stderr:
if self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
self._stderr_complete = anyio.Event()
# Start async task to read stderr
self._stderr_task_group = anyio.create_task_group()
await self._stderr_task_group.__aenter__()
Expand Down Expand Up @@ -456,6 +460,9 @@ async def _handle_stderr(self) -> None:
line_str = line.rstrip()
if not line_str:
continue
self._stderr_tail.append(line_str)
if len(self._stderr_tail) > _MAX_STDERR_TAIL_LINES:
self._stderr_tail.pop(0)

# Call the stderr callback if provided
if self._options.stderr:
Expand All @@ -473,6 +480,9 @@ async def _handle_stderr(self) -> None:
pass # Stream closed, exit normally
except Exception:
pass # Ignore other errors during stderr reading
finally:
if self._stderr_complete is not None:
self._stderr_complete.set()

async def close(self) -> None:
"""Close the transport and clean up resources."""
Expand Down Expand Up @@ -526,6 +536,8 @@ async def close(self) -> None:
self._stdout_stream = None
self._stdin_stream = None
self._stderr_stream = None
self._stderr_tail = []
self._stderr_complete = None
self._exit_error = None

async def write(self, data: str) -> None:
Expand Down Expand Up @@ -634,12 +646,17 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
except Exception:
returncode = -1

if self._stderr_complete is not None:
with anyio.move_on_after(0.5):
await self._stderr_complete.wait()

# Use exit code for error detection
if returncode is not None and returncode != 0:
stderr_output = "\n".join(self._stderr_tail).strip() or None
self._exit_error = ProcessError(
f"Command failed with exit code {returncode}",
exit_code=returncode,
stderr="Check stderr output for details",
stderr=stderr_output or "Check stderr output for details",
)
raise self._exit_error

Expand Down
5 changes: 3 additions & 2 deletions src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

# Agent definitions
SettingSource = Literal["user", "project", "local"]
EffortLevel = Literal["low", "medium", "high", "xhigh", "max"]


class SystemPromptPreset(TypedDict):
Expand Down Expand Up @@ -95,7 +96,7 @@ class AgentDefinition:
initialPrompt: str | None = None # noqa: N815
maxTurns: int | None = None # noqa: N815
background: bool | None = None
effort: Literal["low", "medium", "high", "max"] | int | None = None
effort: EffortLevel | int | None = None
permissionMode: PermissionMode | None = None # noqa: N815


Expand Down Expand Up @@ -1234,7 +1235,7 @@ class ClaudeAgentOptions:
# Controls extended thinking behavior. Takes precedence over max_thinking_tokens.
thinking: ThinkingConfig | None = None
# Effort level for thinking depth.
effort: Literal["low", "medium", "high", "max"] | None = None
effort: EffortLevel | None = None
# Output format for structured outputs (matches Messages API structure)
# Example: {"type": "json_schema", "schema": {"type": "object", "properties": {...}}}
output_format: dict[str, Any] | None = None
Expand Down
38 changes: 38 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,18 @@ def test_build_command_without_tools(self):
cmd = transport._build_command()
assert "--tools" not in cmd

def test_build_command_with_xhigh_effort(self):
"""Test building CLI command with the xhigh effort option."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(effort="xhigh"),
)

cmd = transport._build_command()
assert "--effort" in cmd
effort_idx = cmd.index("--effort")
assert cmd[effort_idx + 1] == "xhigh"

def test_concurrent_writes_are_serialized(self):
"""Test that concurrent write() calls are serialized by the lock.

Expand Down Expand Up @@ -1655,6 +1667,32 @@ async def _test():

anyio.run(_test)

def test_read_messages_includes_stderr_in_process_error(self):
"""Test non-zero exits include captured stderr output."""

async def _test():
transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
)
transport._process = MagicMock()
transport._process.wait = AsyncMock(return_value=1)
transport._stdout_stream = MagicMock()
transport._stdout_stream.__aiter__.return_value = iter(())
transport._stderr_tail = ["invalid value for --effort: xhigh"]

from claude_agent_sdk._errors import ProcessError

with pytest.raises(ProcessError) as exc_info:
async for _ in transport.read_messages():
pass

assert exc_info.value.exit_code == 1
assert exc_info.value.stderr == "invalid value for --effort: xhigh"
assert "invalid value for --effort: xhigh" in str(exc_info.value)

anyio.run(_test)

def test_build_command_agents_always_via_initialize(self):
"""Test that --agents is NEVER passed via CLI.

Expand Down
17 changes: 17 additions & 0 deletions tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ def test_claude_code_options_with_model_specification(self):
assert options.model == "claude-sonnet-4-5"
assert options.permission_prompt_tool_name == "CustomTool"

def test_claude_code_options_effort_accepts_xhigh(self):
"""Test Options accepts the xhigh effort level."""
options = ClaudeAgentOptions(effort="xhigh")
assert options.effort == "xhigh"


class TestHookInputTypes:
"""Test hook input type definitions."""
Expand Down Expand Up @@ -583,6 +588,18 @@ def test_effort_accepts_named_level(self):

assert payload["effort"] == "high"

def test_effort_accepts_xhigh_level(self):
from claude_agent_sdk import AgentDefinition

agent = AgentDefinition(
description="test",
prompt="p",
effort="xhigh",
)
payload = self._serialize(agent)

assert payload["effort"] == "xhigh"

def test_effort_accepts_integer(self):
from claude_agent_sdk import AgentDefinition

Expand Down