Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Subprocess transport implementation using Claude Code CLI."""

import asyncio
import json
import logging
import os
Expand Down Expand Up @@ -50,7 +51,7 @@ def __init__(
self._stdout_stream: TextReceiveStream | None = None
self._stdin_stream: TextSendStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stderr_task_group: anyio.abc.TaskGroup | None = None
self._stderr_task: asyncio.Task[None] | None = None
self._ready = False
self._exit_error: Exception | None = None # Track process exit errors
self._max_buffer_size = (
Expand Down Expand Up @@ -393,10 +394,11 @@ async def connect(self) -> None:
# Setup stderr stream if piped
if should_pipe_stderr and self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
# Start async task to read stderr
self._stderr_task_group = anyio.create_task_group()
await self._stderr_task_group.__aenter__()
self._stderr_task_group.start_soon(self._handle_stderr)
# Use an asyncio task instead of a manually-entered anyio
# task group so close() can safely run from a different task.
self._stderr_task = asyncio.create_task(
self._handle_stderr(), name="claude-sdk-stderr-reader"
)

# Setup stdin for streaming (always used now)
if self._process.stdin:
Expand Down Expand Up @@ -454,12 +456,13 @@ async def close(self) -> None:
self._ready = False
return

# Close stderr task group if active
if self._stderr_task_group:
with suppress(Exception):
self._stderr_task_group.cancel_scope.cancel()
await self._stderr_task_group.__aexit__(None, None, None)
self._stderr_task_group = None
# Close stderr reader task if active. This may run from a different
# task context than connect(), so avoid anyio task groups here.
if self._stderr_task is not None:
self._stderr_task.cancel()
with suppress(asyncio.CancelledError, Exception):
await self._stderr_task
self._stderr_task = None

# Close stdin stream (acquire lock to prevent race with concurrent writes)
async with self._write_lock:
Expand Down
52 changes: 52 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for Claude SDK transport layer."""

import asyncio
import os
import uuid
from contextlib import nullcontext
Expand Down Expand Up @@ -376,6 +377,57 @@ async def _test():

anyio.run(_test)

def test_close_from_different_task_context(self):
"""Test close() can safely run from a different task than connect()."""

async def _test():
with patch("anyio.open_process") as mock_exec:
mock_version_process = MagicMock()
mock_version_process.stdout = MagicMock()
mock_version_process.stdout.receive = AsyncMock(
return_value=b"2.0.0 (Claude Code)"
)
mock_version_process.terminate = MagicMock()
mock_version_process.wait = AsyncMock()

mock_process = MagicMock()
mock_process.returncode = None
mock_process.terminate = MagicMock()
mock_process.wait = AsyncMock()
mock_process.stdout = MagicMock()
mock_process.stderr = MagicMock()

mock_stdin = MagicMock()
mock_stdin.aclose = AsyncMock()
mock_process.stdin = mock_stdin

mock_exec.side_effect = [mock_version_process, mock_process]

transport = SubprocessCLITransport(
prompt="test",
options=make_options(),
)

await transport.connect()
assert transport._stderr_task is not None

close_error = None

async def close_in_other_task():
nonlocal close_error
try:
await transport.close()
except Exception as exc: # pragma: no cover - regression guard
close_error = exc

task = asyncio.create_task(close_in_other_task())
await task

assert close_error is None
assert transport._stderr_task is None

anyio.run(_test)

def test_read_messages(self):
"""Test reading messages from CLI output."""
# This test is simplified to just test the transport creation
Expand Down