diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index 4b47f115..f889ab0b 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -1,5 +1,6 @@ """Subprocess transport implementation using Claude Code CLI.""" +import asyncio import json import logging import os @@ -50,7 +51,7 @@ def __init__( self._stdout_stream: TextReceiveStream | None = None self._stdin_stream: TextSendStream | None = None self._stderr_stream: TextReceiveStream | None = None - self._stderr_task_group: anyio.abc.TaskGroup | None = None + self._stderr_task: asyncio.Task[None] | None = None self._ready = False self._exit_error: Exception | None = None # Track process exit errors self._max_buffer_size = ( @@ -393,10 +394,11 @@ async def connect(self) -> None: # Setup stderr stream if piped if should_pipe_stderr and self._process.stderr: self._stderr_stream = TextReceiveStream(self._process.stderr) - # Start async task to read stderr - self._stderr_task_group = anyio.create_task_group() - await self._stderr_task_group.__aenter__() - self._stderr_task_group.start_soon(self._handle_stderr) + # Use an asyncio task instead of a manually-entered anyio + # task group so close() can safely run from a different task. + self._stderr_task = asyncio.create_task( + self._handle_stderr(), name="claude-sdk-stderr-reader" + ) # Setup stdin for streaming (always used now) if self._process.stdin: @@ -454,12 +456,13 @@ async def close(self) -> None: self._ready = False return - # Close stderr task group if active - if self._stderr_task_group: - with suppress(Exception): - self._stderr_task_group.cancel_scope.cancel() - await self._stderr_task_group.__aexit__(None, None, None) - self._stderr_task_group = None + # Close stderr reader task if active. This may run from a different + # task context than connect(), so avoid anyio task groups here. + if self._stderr_task is not None: + self._stderr_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await self._stderr_task + self._stderr_task = None # Close stdin stream (acquire lock to prevent race with concurrent writes) async with self._write_lock: diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index a82a8b9b..36222e2a 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -26,7 +26,9 @@ ] # SDK Beta features - see https://docs.anthropic.com/en/api/beta-headers -SdkBeta = Literal["context-1m-2025-08-07"] +# Keep known values discoverable while allowing callers to opt into newly +# released beta headers without waiting for an SDK release. +SdkBeta = Literal["context-1m-2025-08-07"] | str # Agent definitions SettingSource = Literal["user", "project", "local"] diff --git a/tests/test_transport.py b/tests/test_transport.py index b2c40923..bb519d36 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1,5 +1,6 @@ """Tests for Claude SDK transport layer.""" +import asyncio import os import uuid from contextlib import nullcontext @@ -174,6 +175,25 @@ def test_build_command_with_options(self): assert "--max-turns" in cmd assert "5" in cmd + def test_build_command_with_custom_betas(self): + """Test that arbitrary beta header values flow through to the CLI.""" + transport = SubprocessCLITransport( + prompt="test", + options=make_options( + betas=[ + "context-1m-2025-08-07", + "token-efficient-tools-2025-02-19", + ] + ), + ) + + cmd = transport._build_command() + idx = cmd.index("--betas") + assert cmd[idx : idx + 2] == [ + "--betas", + "context-1m-2025-08-07,token-efficient-tools-2025-02-19", + ] + def test_build_command_with_dont_ask_permission_mode(self): """Test building CLI command with dontAsk permission mode.""" transport = SubprocessCLITransport( @@ -376,6 +396,57 @@ async def _test(): anyio.run(_test) + def test_close_from_different_task_context(self): + """Test close() can safely run from a different task than connect().""" + + async def _test(): + with patch("anyio.open_process") as mock_exec: + mock_version_process = MagicMock() + mock_version_process.stdout = MagicMock() + mock_version_process.stdout.receive = AsyncMock( + return_value=b"2.0.0 (Claude Code)" + ) + mock_version_process.terminate = MagicMock() + mock_version_process.wait = AsyncMock() + + mock_process = MagicMock() + mock_process.returncode = None + mock_process.terminate = MagicMock() + mock_process.wait = AsyncMock() + mock_process.stdout = MagicMock() + mock_process.stderr = MagicMock() + + mock_stdin = MagicMock() + mock_stdin.aclose = AsyncMock() + mock_process.stdin = mock_stdin + + mock_exec.side_effect = [mock_version_process, mock_process] + + transport = SubprocessCLITransport( + prompt="test", + options=make_options(), + ) + + await transport.connect() + assert transport._stderr_task is not None + + close_error = None + + async def close_in_other_task(): + nonlocal close_error + try: + await transport.close() + except Exception as exc: # pragma: no cover - regression guard + close_error = exc + + task = asyncio.create_task(close_in_other_task()) + await task + + assert close_error is None + assert transport._stderr_task is None + + anyio.run(_test) + def test_read_messages(self): """Test reading messages from CLI output.""" # This test is simplified to just test the transport creation