Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Subprocess transport implementation using Claude Code CLI."""

import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(
self._stdout_stream: TextReceiveStream | None = None
self._stdin_stream: TextSendStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stderr_task_group: anyio.abc.TaskGroup | None = None
self._stderr_task: asyncio.Task[None] | None = None
self._ready = False
self._exit_error: Exception | None = None # Track process exit errors
self._max_buffer_size = (
Expand Down Expand Up @@ -393,10 +394,11 @@ async def connect(self) -> None:
# Setup stderr stream if piped
if should_pipe_stderr and self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
# Start async task to read stderr
self._stderr_task_group = anyio.create_task_group()
await self._stderr_task_group.__aenter__()
self._stderr_task_group.start_soon(self._handle_stderr)
# Use an asyncio task instead of a manually-entered anyio
# task group so close() can safely run from a different task.
self._stderr_task = asyncio.create_task(
self._handle_stderr(), name="claude-sdk-stderr-reader"
)

# Setup stdin for streaming (always used now)
if self._process.stdin:
Expand Down Expand Up @@ -454,12 +456,13 @@ async def close(self) -> None:
self._ready = False
return

# Close stderr task group if active
if self._stderr_task_group:
with suppress(Exception):
self._stderr_task_group.cancel_scope.cancel()
await self._stderr_task_group.__aexit__(None, None, None)
self._stderr_task_group = None
# Close stderr reader task if active. This may run from a different
# task context than connect(), so avoid anyio task groups here.
if self._stderr_task is not None:
self._stderr_task.cancel()
with suppress(asyncio.CancelledError, Exception):
await self._stderr_task
self._stderr_task = None

# Close stdin stream (acquire lock to prevent race with concurrent writes)
async with self._write_lock:
Expand Down
4 changes: 3 additions & 1 deletion src/claude_agent_sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
]

# SDK Beta features - see https://docs.anthropic.com/en/api/beta-headers
SdkBeta = Literal["context-1m-2025-08-07"]
# Keep known values discoverable while allowing callers to opt into newly
# released beta headers without waiting for an SDK release.
SdkBeta = Literal["context-1m-2025-08-07"] | str

# Agent definitions
SettingSource = Literal["user", "project", "local"]
Expand Down
71 changes: 71 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for Claude SDK transport layer."""

import asyncio
import os
import uuid
from contextlib import nullcontext
Expand Down Expand Up @@ -174,6 +175,25 @@ def test_build_command_with_options(self):
assert "--max-turns" in cmd
assert "5" in cmd

def test_build_command_with_custom_betas(self):
"""Test that arbitrary beta header values flow through to the CLI."""
transport = SubprocessCLITransport(
prompt="test",
options=make_options(
betas=[
"context-1m-2025-08-07",
"token-efficient-tools-2025-02-19",
]
),
)

cmd = transport._build_command()
idx = cmd.index("--betas")
assert cmd[idx : idx + 2] == [
"--betas",
"context-1m-2025-08-07,token-efficient-tools-2025-02-19",
]

def test_build_command_with_dont_ask_permission_mode(self):
"""Test building CLI command with dontAsk permission mode."""
transport = SubprocessCLITransport(
Expand Down Expand Up @@ -376,6 +396,57 @@ async def _test():

anyio.run(_test)

def test_close_from_different_task_context(self):
"""Test close() can safely run from a different task than connect()."""

async def _test():
with patch("anyio.open_process") as mock_exec:
mock_version_process = MagicMock()
mock_version_process.stdout = MagicMock()
mock_version_process.stdout.receive = AsyncMock(
return_value=b"2.0.0 (Claude Code)"
)
mock_version_process.terminate = MagicMock()
mock_version_process.wait = AsyncMock()

mock_process = MagicMock()
mock_process.returncode = None
mock_process.terminate = MagicMock()
mock_process.wait = AsyncMock()
mock_process.stdout = MagicMock()
mock_process.stderr = MagicMock()

mock_stdin = MagicMock()
mock_stdin.aclose = AsyncMock()
mock_process.stdin = mock_stdin

mock_exec.side_effect = [mock_version_process, mock_process]

transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
)

await transport.connect()
assert transport._stderr_task is not None

close_error = None

async def close_in_other_task():
nonlocal close_error
try:
await transport.close()
except Exception as exc: # pragma: no cover - regression guard
close_error = exc

task = asyncio.create_task(close_in_other_task())
await task

assert close_error is None
assert transport._stderr_task is None

anyio.run(_test)

def test_read_messages(self):
"""Test reading messages from CLI output."""
# This test is simplified to just test the transport creation
Expand Down