From 3651e038ab476793b10a0ed4f517c10bcd151985 Mon Sep 17 00:00:00 2001 From: Junhyuk Lee Date: Thu, 16 Apr 2026 15:01:08 -0500 Subject: [PATCH 1/5] fix: add 429 rate limit retry with exponential backoff (#812) --- src/claude_agent_sdk/__init__.py | 2 + src/claude_agent_sdk/_errors.py | 14 + src/claude_agent_sdk/_internal/client.py | 255 +++++++++++------- .../_internal/transport/subprocess_cli.py | 34 ++- src/claude_agent_sdk/types.py | 4 + tests/test_rate_limit_retry.py | 135 ++++++++++ 6 files changed, 335 insertions(+), 109 deletions(-) create mode 100644 tests/test_rate_limit_retry.py diff --git a/src/claude_agent_sdk/__init__.py b/src/claude_agent_sdk/__init__.py index 6403415b..6b255b51 100644 --- a/src/claude_agent_sdk/__init__.py +++ b/src/claude_agent_sdk/__init__.py @@ -24,6 +24,7 @@ CLIJSONDecodeError, CLINotFoundError, ProcessError, + RateLimitError, ) from ._internal.session_mutations import ( ForkSessionResult, @@ -604,4 +605,5 @@ async def call_tool(name: str, arguments: dict[str, Any]) -> Any: "CLINotFoundError", "ProcessError", "CLIJSONDecodeError", + "RateLimitError", ] diff --git a/src/claude_agent_sdk/_errors.py b/src/claude_agent_sdk/_errors.py index c86bf235..8d086690 100644 --- a/src/claude_agent_sdk/_errors.py +++ b/src/claude_agent_sdk/_errors.py @@ -54,3 +54,17 @@ class MessageParseError(ClaudeSDKError): def __init__(self, message: str, data: dict[str, Any] | None = None): self.data = data super().__init__(message) + + +class RateLimitError(ClaudeSDKError): + """Raised when the API returns a 429 rate limit error.""" + + def __init__( + self, + message: str, + retry_after: float | None = None, + original_error: Exception | None = None, + ): + self.retry_after = retry_after + self.original_error = original_error + super().__init__(message) diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 76323323..73badb9d 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ 
b/src/claude_agent_sdk/_internal/client.py @@ -1,11 +1,16 @@ """Internal client implementation.""" +import asyncio import json +import logging import os +import random +import re from collections.abc import AsyncIterable, AsyncIterator from dataclasses import asdict, replace from typing import Any +from .._errors import ProcessError, RateLimitError from ..types import ( ClaudeAgentOptions, HookEvent, @@ -17,6 +22,36 @@ from .transport import Transport from .transport.subprocess_cli import SubprocessCLITransport +logger = logging.getLogger(__name__) + + +def _is_rate_limit_error(error: Exception) -> tuple[bool, float | None]: + """Detect if an error is a 429 rate limit error.""" + error_str = str(error) + + if "rate_limit_error" in error_str or "429" in error_str: + retry_after: float | None = None + match = re.search(r'"retryAfter"\s*:\s*(\d+(?:\.\d+)?)', error_str) + if match: + retry_after = float(match.group(1)) + else: + match = re.search( + r'Retry-After["\s:]+(\d+(?:\.\d+)?)', error_str, re.IGNORECASE + ) + if match: + retry_after = float(match.group(1)) + return True, retry_after + + if hasattr(error, "stderr") and error.stderr: + stderr_str = str(error.stderr) + if "rate_limit_error" in stderr_str or "429" in stderr_str: + match = re.search(r'"retryAfter"\s*:\s*(\d+(?:\.\d+)?)', stderr_str) + if match: + return True, float(match.group(1)) + return True, None + + return False, None + class InternalClient: """Internal client implementation.""" @@ -32,7 +67,6 @@ def _convert_hooks_to_internal_format( for event, matchers in hooks.items(): internal_hooks[event] = [] for matcher in matchers: - # Convert HookMatcher to internal dict format internal_matcher: dict[str, Any] = { "matcher": matcher.matcher if hasattr(matcher, "matcher") else None, "hooks": matcher.hooks if hasattr(matcher, "hooks") else [], @@ -48,116 +82,141 @@ async def process_query( options: ClaudeAgentOptions, transport: Transport | None = None, ) -> AsyncIterator[Message]: - """Process a query 
through transport and Query.""" - - # Validate and configure permission settings (matching TypeScript SDK logic) + """Process a query through transport and Query with automatic 429 retry.""" configured_options = options if options.can_use_tool: - # canUseTool callback requires streaming mode (AsyncIterable prompt) if isinstance(prompt, str): raise ValueError( "can_use_tool callback requires streaming mode. " "Please provide prompt as an AsyncIterable instead of a string." ) - # canUseTool and permission_prompt_tool_name are mutually exclusive if options.permission_prompt_tool_name: raise ValueError( - "can_use_tool callback cannot be used with permission_prompt_tool_name. " + "can_use_tool cannot be used with permission_prompt_tool_name. " "Please use one or the other." ) - # Automatically set permission_prompt_tool_name to "stdio" for control protocol configured_options = replace(options, permission_prompt_tool_name="stdio") - # Use provided transport or create subprocess transport - if transport is not None: - chosen_transport = transport - else: - chosen_transport = SubprocessCLITransport( - prompt=prompt, - options=configured_options, - ) + max_retries = configured_options.rate_limit_max_retries + attempt = 0 + + while True: + is_retry = attempt > 0 + chosen_transport: Transport + query: Query | None = None + + try: + if transport is not None and not is_retry: + chosen_transport = transport + else: + if transport is not None and is_retry: + await transport.close() + chosen_transport = SubprocessCLITransport( + prompt=prompt, + options=configured_options, + ) + await chosen_transport.connect() + + sdk_mcp_servers = {} + if configured_options.mcp_servers and isinstance( + configured_options.mcp_servers, dict + ): + for name, config in configured_options.mcp_servers.items(): + if isinstance(config, dict) and config.get("type") == "sdk": + sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] + + exclude_dynamic_sections: bool | None = None + sp 
= configured_options.system_prompt + if isinstance(sp, dict) and sp.get("type") == "preset": + eds = sp.get("exclude_dynamic_sections") + if isinstance(eds, bool): + exclude_dynamic_sections = eds + + agents_dict = None + if configured_options.agents: + agents_dict = { + name: { + k: v for k, v in asdict(agent_def).items() if v is not None + } + for name, agent_def in configured_options.agents.items() + } + + initialize_timeout_ms = int( + os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") + ) + initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) + + query = Query( + transport=chosen_transport, + is_streaming_mode=True, + can_use_tool=configured_options.can_use_tool, + hooks=self._convert_hooks_to_internal_format( + configured_options.hooks + ) + if configured_options.hooks + else None, + sdk_mcp_servers=sdk_mcp_servers, + initialize_timeout=initialize_timeout, + agents=agents_dict, + exclude_dynamic_sections=exclude_dynamic_sections, + ) - # Connect transport - await chosen_transport.connect() - - # Extract SDK MCP servers from configured options - sdk_mcp_servers = {} - if configured_options.mcp_servers and isinstance( - configured_options.mcp_servers, dict - ): - for name, config in configured_options.mcp_servers.items(): - if isinstance(config, dict) and config.get("type") == "sdk": - sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] - - # Extract exclude_dynamic_sections from preset system prompt for the - # initialize request (older CLIs ignore unknown initialize fields). 
- exclude_dynamic_sections: bool | None = None - sp = configured_options.system_prompt - if isinstance(sp, dict) and sp.get("type") == "preset": - eds = sp.get("exclude_dynamic_sections") - if isinstance(eds, bool): - exclude_dynamic_sections = eds - - # Convert agents to dict format for initialize request - agents_dict = None - if configured_options.agents: - agents_dict = { - name: {k: v for k, v in asdict(agent_def).items() if v is not None} - for name, agent_def in configured_options.agents.items() - } - - # Match ClaudeSDKClient.connect() — without this, query() ignores the env var - initialize_timeout_ms = int( - os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") - ) - initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) - - # Create Query to handle control protocol - # Always use streaming mode internally (matching TypeScript SDK) - # This ensures agents are always sent via initialize request - query = Query( - transport=chosen_transport, - is_streaming_mode=True, # Always streaming internally - can_use_tool=configured_options.can_use_tool, - hooks=self._convert_hooks_to_internal_format(configured_options.hooks) - if configured_options.hooks - else None, - sdk_mcp_servers=sdk_mcp_servers, - initialize_timeout=initialize_timeout, - agents=agents_dict, - exclude_dynamic_sections=exclude_dynamic_sections, - ) - - try: - # Start reading messages - await query.start() - - # Always initialize to send agents via stdin (matching TypeScript SDK) - await query.initialize() - - # Handle prompt input - if isinstance(prompt, str): - # For string prompts, write user message to stdin after initialize - # (matching TypeScript SDK behavior) - user_message = { - "type": "user", - "session_id": "", - "message": {"role": "user", "content": prompt}, - "parent_tool_use_id": None, - } - await chosen_transport.write(json.dumps(user_message) + "\n") - query.spawn_task(query.wait_for_result_and_end_input()) - elif isinstance(prompt, AsyncIterable): - # Stream input 
in background for async iterables - query.spawn_task(query.stream_input(prompt)) - - # Yield parsed messages, skipping unknown message types - async for data in query.receive_messages(): - message = parse_message(data) - if message is not None: - yield message - - finally: - await query.close() + await query.start() + await query.initialize() + + if isinstance(prompt, str): + user_message = { + "type": "user", + "session_id": "", + "message": {"role": "user", "content": prompt}, + "parent_tool_use_id": None, + } + await chosen_transport.write(json.dumps(user_message) + "\n") + query.spawn_task(query.wait_for_result_and_end_input()) + elif isinstance(prompt, AsyncIterable): + query.spawn_task(query.stream_input(prompt)) + + async for data in query.receive_messages(): + message = parse_message(data) + if message is not None: + yield message + + return + + except ProcessError as e: + is_rl, retry_after = _is_rate_limit_error(e) + + if is_rl and attempt < max_retries: + attempt += 1 + if retry_after is None: + base_delay = min(2.0 * (2 ** (attempt - 1)), 60.0) + delay = base_delay + random.uniform(0, 1) + else: + delay = retry_after + + logger.warning( + "Rate limit hit (attempt %d/%d). 
Retrying in %.1fs.", + attempt, + max_retries, + delay, + ) + + if query is not None: + await query.close() + elif chosen_transport is not None: + await chosen_transport.close() + + await asyncio.sleep(delay) + continue + + raise RateLimitError( + str(e), + retry_after=retry_after, + original_error=e, + ) from e + + finally: + if query is not None: + await query.close() diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index a764f625..98a2ce29 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -4,10 +4,13 @@ import logging import os import platform +import random import re import shutil +import time from collections.abc import AsyncIterable, AsyncIterator from contextlib import suppress +from email.utils import parsedate_to_datetime from pathlib import Path from subprocess import PIPE from typing import Any, cast @@ -17,7 +20,12 @@ from anyio.abc import Process from anyio.streams.text import TextReceiveStream, TextSendStream -from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError +from ..._errors import ( + CLIConnectionError, + CLINotFoundError, + ProcessError, + RateLimitError, +) from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ..._version import __version__ from ...types import ClaudeAgentOptions, SystemPromptFile, SystemPromptPreset @@ -27,6 +35,8 @@ _DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit MINIMUM_CLAUDE_CODE_VERSION = "2.0.0" +_DEFAULT_RATE_LIMIT_MAX_RETRIES = 3 +_RETRY_AFTER_MAX = 60.0 class SubprocessCLITransport(Transport): @@ -59,6 +69,12 @@ def __init__( else _DEFAULT_MAX_BUFFER_SIZE ) self._write_lock: anyio.Lock = anyio.Lock() + self._rate_limit_max_retries = ( + options.rate_limit_max_retries or _DEFAULT_RATE_LIMIT_MAX_RETRIES + ) + self._session_id: str | None = None + self._retry_count = 0 + self._stderr_lines: list[str] = 
[] def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -394,14 +410,12 @@ async def connect(self) -> None: if self._cwd: process_env["PWD"] = self._cwd - # Pipe stderr if we have a callback OR debug mode is enabled should_pipe_stderr = ( self._options.stderr is not None or "debug-to-stderr" in self._options.extra_args ) - # For backward compat: use debug_stderr file object if no callback and debug is on - stderr_dest = PIPE if should_pipe_stderr else None + stderr_dest = PIPE self._process = await anyio.open_process( cmd, @@ -416,10 +430,8 @@ async def connect(self) -> None: if self._process.stdout: self._stdout_stream = TextReceiveStream(self._process.stdout) - # Setup stderr stream if piped - if should_pipe_stderr and self._process.stderr: + if self._process.stderr: self._stderr_stream = TextReceiveStream(self._process.stderr) - # Start async task to read stderr self._stderr_task_group = anyio.create_task_group() await self._stderr_task_group.__aenter__() self._stderr_task_group.start_soon(self._handle_stderr) @@ -457,11 +469,11 @@ async def _handle_stderr(self) -> None: if not line_str: continue - # Call the stderr callback if provided + self._stderr_lines.append(line_str) + if self._options.stderr: self._options.stderr(line_str) - # For backward compatibility: write to debug_stderr if in debug mode elif ( "debug-to-stderr" in self._options.extra_args and self._options.debug_stderr @@ -470,9 +482,9 @@ async def _handle_stderr(self) -> None: if hasattr(self._options.debug_stderr, "flush"): self._options.debug_stderr.flush() except anyio.ClosedResourceError: - pass # Stream closed, exit normally + pass except Exception: - pass # Ignore other errors during stderr reading + pass async def close(self) -> None: """Close the transport and clean up resources.""" diff --git a/src/claude_agent_sdk/types.py b/src/claude_agent_sdk/types.py index a82a8b9b..f0b5a4d1 100644 --- a/src/claude_agent_sdk/types.py +++ b/src/claude_agent_sdk/types.py @@ -1246,6 
+1246,10 @@ class ClaudeAgentOptions: # its remaining token budget so it can pace tool use and wrap up before # the limit. task_budget: TaskBudget | None = None + # Maximum number of automatic retries on 429 rate limit errors (default: 3). + # Set to 0 to disable automatic retries. Each retry waits with exponential + # backoff. When a Retry-After header is present, that value is used instead. + rate_limit_max_retries: int = 3 # SDK Control Protocol diff --git a/tests/test_rate_limit_retry.py b/tests/test_rate_limit_retry.py new file mode 100644 index 00000000..a31b86ca --- /dev/null +++ b/tests/test_rate_limit_retry.py @@ -0,0 +1,135 @@ +"""Tests for 429 rate limit retry with exponential backoff.""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from claude_agent_sdk import ClaudeAgentOptions +from claude_agent_sdk._errors import ProcessError, RateLimitError + + +class TestIsRateLimitError: + """Test _is_rate_limit_error helper function.""" + + def test_detects_rate_limit_in_error_message(self): + """Error message containing rate_limit_error is detected.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = Exception( + 'API Error: 429 {"type":"error","error":{"type":"rate_limit_error","message":"Rate limit exceeded"}}' + ) + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is True + assert retry_after is None + + def test_detects_429_in_error_message(self): + """Error message containing 429 is detected.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = Exception("Command failed with exit code 1: 429 rate limit") + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is True + assert retry_after is None + + def test_parses_retry_after_from_error_message(self): + """retryAfter field is extracted from error message.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = 
Exception('{"error":{"type":"rate_limit_error","retryAfter":45}}') + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is True + assert retry_after == 45.0 + + def test_parses_retry_after_float(self): + """retryAfter field handles float values.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = Exception('{"error":{"type":"rate_limit_error","retryAfter":12.5}}') + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is True + assert retry_after == 12.5 + + def test_detects_rate_limit_from_stderr_attribute(self): + """Error with stderr attribute containing rate_limit_error is detected.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + class ErrorWithStderr(Exception): + stderr = ( + '{"type":"error","error":{"type":"rate_limit_error","retryAfter":30}}' + ) + + error = ErrorWithStderr("Process failed") + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is True + assert retry_after == 30.0 + + def test_non_rate_limit_error_returns_false(self): + """Generic errors without rate limit indicators return False.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = Exception("Something went wrong") + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is False + assert retry_after is None + + def test_process_error_without_rate_limit_returns_false(self): + """ProcessError without rate limit indicators returns False.""" + from claude_agent_sdk._internal.client import _is_rate_limit_error + + error = ProcessError("Process exited with code 1", exit_code=1) + is_rl, retry_after = _is_rate_limit_error(error) + assert is_rl is False + assert retry_after is None + + +class TestRateLimitRetryOptions: + """Test rate_limit_max_retries option.""" + + def test_rate_limit_max_retries_option(self): + """ClaudeAgentOptions accepts rate_limit_max_retries.""" + opts = ClaudeAgentOptions(rate_limit_max_retries=5) + assert opts.rate_limit_max_retries == 
5 + + def test_rate_limit_max_retries_default(self): + """rate_limit_max_retries defaults to 3.""" + opts = ClaudeAgentOptions() + assert opts.rate_limit_max_retries == 3 + + def test_rate_limit_max_retries_zero_disables_retry(self): + """rate_limit_max_retries=0 disables automatic retry.""" + opts = ClaudeAgentOptions(rate_limit_max_retries=0) + assert opts.rate_limit_max_retries == 0 + + +class TestRateLimitError: + """Test RateLimitError exception class.""" + + def test_rate_limit_error_attributes(self): + """RateLimitError stores retry_after and original_error.""" + original = ProcessError("429", exit_code=1) + error = RateLimitError( + "Rate limit exceeded", retry_after=30.0, original_error=original + ) + + assert error.retry_after == 30.0 + assert error.original_error is original + assert "Rate limit exceeded" in str(error) + + def test_rate_limit_error_inherits_from_claude_sdk_error(self): + """RateLimitError is a subclass of ClaudeSDKError.""" + from claude_agent_sdk._errors import ClaudeSDKError + + error = RateLimitError("test") + assert isinstance(error, ClaudeSDKError) + + def test_rate_limit_error_without_retry_after(self): + """RateLimitError works without retry_after.""" + error = RateLimitError("Rate limit exceeded") + assert error.retry_after is None + assert error.original_error is None + + def test_rate_limit_error_repr(self): + """RateLimitError message includes original error info.""" + original = ProcessError("429", exit_code=1) + error = RateLimitError("Rate limit exceeded", original_error=original) + assert "Rate limit exceeded" in str(error) From 4317f433ff0b64c300257e523c7d49637cff5133 Mon Sep 17 00:00:00 2001 From: Junhyuk Lee Date: Thu, 16 Apr 2026 21:55:07 -0500 Subject: [PATCH 2/5] refactor: remove unnecessary comments from transport layer Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent) Co-authored-by: Sisyphus --- .../_internal/transport/subprocess_cli.py | 170 +----------------- 1 file changed, 2 
insertions(+), 168 deletions(-) diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index 98a2ce29..41f32fac 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -4,13 +4,10 @@ import logging import os import platform -import random import re import shutil -import time from collections.abc import AsyncIterable, AsyncIterator from contextlib import suppress -from email.utils import parsedate_to_datetime from pathlib import Path from subprocess import PIPE from typing import Any, cast @@ -24,7 +21,6 @@ CLIConnectionError, CLINotFoundError, ProcessError, - RateLimitError, ) from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ..._version import __version__ @@ -35,8 +31,6 @@ _DEFAULT_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit MINIMUM_CLAUDE_CODE_VERSION = "2.0.0" -_DEFAULT_RATE_LIMIT_MAX_RETRIES = 3 -_RETRY_AFTER_MAX = 60.0 class SubprocessCLITransport(Transport): @@ -69,12 +63,6 @@ def __init__( else _DEFAULT_MAX_BUFFER_SIZE ) self._write_lock: anyio.Lock = anyio.Lock() - self._rate_limit_max_retries = ( - options.rate_limit_max_retries or _DEFAULT_RATE_LIMIT_MAX_RETRIES - ) - self._session_id: str | None = None - self._retry_count = 0 - self._stderr_lines: list[str] = [] def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -415,7 +403,7 @@ async def connect(self) -> None: or "debug-to-stderr" in self._options.extra_args ) - stderr_dest = PIPE + stderr_dest = PIPE if should_pipe_stderr else None self._process = await anyio.open_process( cmd, @@ -430,13 +418,12 @@ async def connect(self) -> None: if self._process.stdout: self._stdout_stream = TextReceiveStream(self._process.stdout) - if self._process.stderr: + if should_pipe_stderr and self._process.stderr: self._stderr_stream = TextReceiveStream(self._process.stderr) self._stderr_task_group = anyio.create_task_group() 
await self._stderr_task_group.__aenter__() self._stderr_task_group.start_soon(self._handle_stderr) - # Setup stdin for streaming (always used now) if self._process.stdin: self._stdin_stream = TextSendStream(self._process.stdin) @@ -469,8 +456,6 @@ async def _handle_stderr(self) -> None: if not line_str: continue - self._stderr_lines.append(line_str) - if self._options.stderr: self._options.stderr(line_str) @@ -485,157 +470,6 @@ async def _handle_stderr(self) -> None: pass except Exception: pass - - async def close(self) -> None: - """Close the transport and clean up resources.""" - if not self._process: - self._ready = False - return - - # Close stderr task group if active - if self._stderr_task_group: - with suppress(Exception): - self._stderr_task_group.cancel_scope.cancel() - await self._stderr_task_group.__aexit__(None, None, None) - self._stderr_task_group = None - - # Close stdin stream (acquire lock to prevent race with concurrent writes) - async with self._write_lock: - self._ready = False # Set inside lock to prevent TOCTOU with write() - if self._stdin_stream: - with suppress(Exception): - await self._stdin_stream.aclose() - self._stdin_stream = None - - if self._stderr_stream: - with suppress(Exception): - await self._stderr_stream.aclose() - self._stderr_stream = None - - # Wait for graceful shutdown after stdin EOF, then terminate if needed. - # The subprocess needs time to flush its session file after receiving - # EOF on stdin. Without this grace period, SIGTERM can interrupt the - # write and cause the last assistant message to be lost (see #625). 
- if self._process.returncode is None: - try: - with anyio.fail_after(5): - await self._process.wait() - except TimeoutError: - # Graceful shutdown timed out — force terminate - with suppress(ProcessLookupError): - self._process.terminate() - try: - with anyio.fail_after(5): - await self._process.wait() - except TimeoutError: - # SIGTERM handler blocked — force kill (SIGKILL) - with suppress(ProcessLookupError): - self._process.kill() - with suppress(Exception): - await self._process.wait() - - self._process = None - self._stdout_stream = None - self._stdin_stream = None - self._stderr_stream = None - self._exit_error = None - - async def write(self, data: str) -> None: - """Write raw data to the transport.""" - async with self._write_lock: - # All checks inside lock to prevent TOCTOU races with close()/end_input() - if not self._ready or not self._stdin_stream: - raise CLIConnectionError("ProcessTransport is not ready for writing") - - if self._process and self._process.returncode is not None: - raise CLIConnectionError( - f"Cannot write to terminated process (exit code: {self._process.returncode})" - ) - - if self._exit_error: - raise CLIConnectionError( - f"Cannot write to process that exited with error: {self._exit_error}" - ) from self._exit_error - - try: - await self._stdin_stream.send(data) - except Exception as e: - self._ready = False - self._exit_error = CLIConnectionError( - f"Failed to write to process stdin: {e}" - ) - raise self._exit_error from e - - async def end_input(self) -> None: - """End the input stream (close stdin).""" - async with self._write_lock: - if self._stdin_stream: - with suppress(Exception): - await self._stdin_stream.aclose() - self._stdin_stream = None - - def read_messages(self) -> AsyncIterator[dict[str, Any]]: - """Read and parse messages from the transport.""" - return self._read_messages_impl() - - async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]: - """Internal implementation of read_messages.""" - if 
not self._process or not self._stdout_stream: - raise CLIConnectionError("Not connected") - - json_buffer = "" - - # Process stdout messages - try: - async for line in self._stdout_stream: - line_str = line.strip() - if not line_str: - continue - - # Accumulate partial JSON until we can parse it - # Note: TextReceiveStream can truncate long lines, so we need to buffer - # and speculatively parse until we get a complete JSON object - json_lines = line_str.split("\n") - - for json_line in json_lines: - json_line = json_line.strip() - if not json_line: - continue - - # Skip non-JSON lines (e.g. [SandboxDebug]) when not - # mid-parse — they corrupt the buffer otherwise (#347). - if not json_buffer and not json_line.startswith("{"): - logger.debug( - "Skipping non-JSON line from CLI stdout: %s", - json_line[:200], - ) - continue - - # Keep accumulating partial JSON until we can parse it - json_buffer += json_line - - if len(json_buffer) > self._max_buffer_size: - buffer_length = len(json_buffer) - json_buffer = "" - raise SDKJSONDecodeError( - f"JSON message exceeded maximum buffer size of {self._max_buffer_size} bytes", - ValueError( - f"Buffer size {buffer_length} exceeds limit {self._max_buffer_size}" - ), - ) - - try: - data = json.loads(json_buffer) - json_buffer = "" - yield data - except json.JSONDecodeError: - # We are speculatively decoding the buffer until we get - # a full JSON object. If there is an actual issue, we - # raise an error after exceeding the configured limit. 
- continue - - except anyio.ClosedResourceError: - pass except GeneratorExit: # Client disconnected pass From 798b0a76c183bc6fa45cda4a00f4c92fb62f37cc Mon Sep 17 00:00:00 2001 From: Junhyuk Lee Date: Thu, 16 Apr 2026 22:12:13 -0500 Subject: [PATCH 3/5] fix: restore accidentally deleted transport methods and comments from 429 retry PR --- src/claude_agent_sdk/_internal/client.py | 25 ++- .../_internal/transport/subprocess_cli.py | 166 +++++++++++++++++- 2 files changed, 184 insertions(+), 7 deletions(-) diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 73badb9d..fe57059a 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -67,6 +67,7 @@ def _convert_hooks_to_internal_format( for event, matchers in hooks.items(): internal_hooks[event] = [] for matcher in matchers: + # Convert HookMatcher to internal dict format internal_matcher: dict[str, Any] = { "matcher": matcher.matcher if hasattr(matcher, "matcher") else None, "hooks": matcher.hooks if hasattr(matcher, "hooks") else [], @@ -83,20 +84,24 @@ async def process_query( transport: Transport | None = None, ) -> AsyncIterator[Message]: """Process a query through transport and Query with automatic 429 retry.""" + # Validate and configure permission settings (matching TypeScript SDK logic) configured_options = options if options.can_use_tool: + # canUseTool callback requires streaming mode (AsyncIterable prompt) if isinstance(prompt, str): raise ValueError( "can_use_tool callback requires streaming mode. " "Please provide prompt as an AsyncIterable instead of a string." ) + # canUseTool and permission_prompt_tool_name are mutually exclusive if options.permission_prompt_tool_name: raise ValueError( "can_use_tool cannot be used with permission_prompt_tool_name. " "Please use one or the other." 
) + # Automatically set permission_prompt_tool_name to "stdio" for control protocol configured_options = replace(options, permission_prompt_tool_name="stdio") max_retries = configured_options.rate_limit_max_retries @@ -108,6 +113,7 @@ async def process_query( query: Query | None = None try: + # Use provided transport or create subprocess transport if transport is not None and not is_retry: chosen_transport = transport else: @@ -119,6 +125,7 @@ async def process_query( ) await chosen_transport.connect() + # Extract SDK MCP servers from configured options sdk_mcp_servers = {} if configured_options.mcp_servers and isinstance( configured_options.mcp_servers, dict @@ -127,6 +134,8 @@ async def process_query( if isinstance(config, dict) and config.get("type") == "sdk": sdk_mcp_servers[name] = config["instance"] # type: ignore[typeddict-item] + # Extract exclude_dynamic_sections from preset system prompt for the + # initialize request (older CLIs ignore unknown initialize fields). exclude_dynamic_sections: bool | None = None sp = configured_options.system_prompt if isinstance(sp, dict) and sp.get("type") == "preset": @@ -134,6 +143,7 @@ async def process_query( if isinstance(eds, bool): exclude_dynamic_sections = eds + # Convert agents to dict format for initialize request agents_dict = None if configured_options.agents: agents_dict = { @@ -143,14 +153,18 @@ async def process_query( for name, agent_def in configured_options.agents.items() } + # Match ClaudeSDKClient.connect() — without this, query() ignores the env var initialize_timeout_ms = int( os.environ.get("CLAUDE_CODE_STREAM_CLOSE_TIMEOUT", "60000") ) initialize_timeout = max(initialize_timeout_ms / 1000.0, 60.0) + # Create Query to handle control protocol + # Always use streaming mode internally (matching TypeScript SDK) + # This ensures agents are always sent via initialize request query = Query( transport=chosen_transport, - is_streaming_mode=True, + is_streaming_mode=True, # Always streaming internally 
can_use_tool=configured_options.can_use_tool, hooks=self._convert_hooks_to_internal_format( configured_options.hooks @@ -163,10 +177,17 @@ async def process_query( exclude_dynamic_sections=exclude_dynamic_sections, ) + # Start reading messages + # Start reading messages await query.start() + + # Always initialize to send agents via stdin (matching TypeScript SDK) await query.initialize() + # Handle prompt input if isinstance(prompt, str): + # For string prompts, write user message to stdin after initialize + # (matching TypeScript SDK behavior) user_message = { "type": "user", "session_id": "", @@ -176,8 +197,10 @@ async def process_query( await chosen_transport.write(json.dumps(user_message) + "\n") query.spawn_task(query.wait_for_result_and_end_input()) elif isinstance(prompt, AsyncIterable): + # Stream input in background for async iterables query.spawn_task(query.stream_input(prompt)) + # Yield parsed messages, skipping unknown message types async for data in query.receive_messages(): message = parse_message(data) if message is not None: diff --git a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py index 41f32fac..a764f625 100644 --- a/src/claude_agent_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_agent_sdk/_internal/transport/subprocess_cli.py @@ -17,11 +17,7 @@ from anyio.abc import Process from anyio.streams.text import TextReceiveStream, TextSendStream -from ..._errors import ( - CLIConnectionError, - CLINotFoundError, - ProcessError, -) +from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError from ..._version import __version__ from ...types import ClaudeAgentOptions, SystemPromptFile, SystemPromptPreset @@ -398,11 +394,13 @@ async def connect(self) -> None: if self._cwd: process_env["PWD"] = self._cwd + # Pipe stderr if we have a callback OR debug mode is enabled should_pipe_stderr = ( 
    async def close(self) -> None:
        """Close the transport and clean up resources.

        Idempotent with respect to the process: if no subprocess exists,
        only the ready flag is cleared. Shutdown order is deliberate:
        stderr reader task first, then stdin (under the write lock), then
        a graceful wait with escalating termination.
        """
        if not self._process:
            # Nothing to tear down — just mark the transport unusable.
            self._ready = False
            return

        # Close stderr task group if active. Cancelling the scope stops the
        # _handle_stderr background task before its stream is torn down.
        if self._stderr_task_group:
            with suppress(Exception):
                self._stderr_task_group.cancel_scope.cancel()
                await self._stderr_task_group.__aexit__(None, None, None)
            self._stderr_task_group = None

        # Close stdin stream (acquire lock to prevent race with concurrent writes)
        async with self._write_lock:
            self._ready = False  # Set inside lock to prevent TOCTOU with write()
            if self._stdin_stream:
                with suppress(Exception):
                    await self._stdin_stream.aclose()
                self._stdin_stream = None

        if self._stderr_stream:
            with suppress(Exception):
                await self._stderr_stream.aclose()
            self._stderr_stream = None

        # Wait for graceful shutdown after stdin EOF, then terminate if needed.
        # The subprocess needs time to flush its session file after receiving
        # EOF on stdin. Without this grace period, SIGTERM can interrupt the
        # write and cause the last assistant message to be lost (see #625).
        if self._process.returncode is None:
            try:
                with anyio.fail_after(5):
                    await self._process.wait()
            except TimeoutError:
                # Graceful shutdown timed out — force terminate
                with suppress(ProcessLookupError):
                    self._process.terminate()
                try:
                    with anyio.fail_after(5):
                        await self._process.wait()
                except TimeoutError:
                    # SIGTERM handler blocked — force kill (SIGKILL)
                    with suppress(ProcessLookupError):
                        self._process.kill()
                    with suppress(Exception):
                        await self._process.wait()

        # Drop all handles so a later connect() starts from a clean slate.
        self._process = None
        self._stdout_stream = None
        self._stdin_stream = None
        self._stderr_stream = None
        self._exit_error = None

    async def write(self, data: str) -> None:
        """Write raw data to the transport.

        Raises:
            CLIConnectionError: if the transport is not ready, the process
                has terminated, a previous write already failed, or this
                write itself fails (in which case the error is also cached
                in ``self._exit_error`` so later writes fail fast).
        """
        async with self._write_lock:
            # All checks inside lock to prevent TOCTOU races with close()/end_input()
            if not self._ready or not self._stdin_stream:
                raise CLIConnectionError("ProcessTransport is not ready for writing")

            if self._process and self._process.returncode is not None:
                raise CLIConnectionError(
                    f"Cannot write to terminated process (exit code: {self._process.returncode})"
                )

            if self._exit_error:
                raise CLIConnectionError(
                    f"Cannot write to process that exited with error: {self._exit_error}"
                ) from self._exit_error

            try:
                await self._stdin_stream.send(data)
            except Exception as e:
                # Poison the transport: the stdin pipe is broken, so mark
                # not-ready and remember the failure for subsequent callers.
                self._ready = False
                self._exit_error = CLIConnectionError(
                    f"Failed to write to process stdin: {e}"
                )
                raise self._exit_error from e

    async def end_input(self) -> None:
        """End the input stream (close stdin).

        Signals EOF to the subprocess; takes the write lock so it cannot
        race with an in-flight write() or close().
        """
        async with self._write_lock:
            if self._stdin_stream:
                with suppress(Exception):
                    await self._stdin_stream.aclose()
                self._stdin_stream = None

    def read_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Read and parse messages from the transport.

        Synchronous wrapper returning the async iterator produced by
        ``_read_messages_impl`` — callers get the iterator without awaiting.
        """
        return self._read_messages_impl()
+ if not json_buffer and not json_line.startswith("{"): + logger.debug( + "Skipping non-JSON line from CLI stdout: %s", + json_line[:200], + ) + continue + + # Keep accumulating partial JSON until we can parse it + json_buffer += json_line + + if len(json_buffer) > self._max_buffer_size: + buffer_length = len(json_buffer) + json_buffer = "" + raise SDKJSONDecodeError( + f"JSON message exceeded maximum buffer size of {self._max_buffer_size} bytes", + ValueError( + f"Buffer size {buffer_length} exceeds limit {self._max_buffer_size}" + ), + ) + + try: + data = json.loads(json_buffer) + json_buffer = "" + yield data + except json.JSONDecodeError: + # We are speculatively decoding the buffer until we get + # a full JSON object. If there is an actual issue, we + # raise an error after exceeding the configured limit. + continue + + except anyio.ClosedResourceError: pass except GeneratorExit: # Client disconnected From 4344681130ad69d5cb62c8c6f43d952b560218c3 Mon Sep 17 00:00:00 2001 From: Junhyuk Lee Date: Thu, 16 Apr 2026 22:18:53 -0500 Subject: [PATCH 4/5] fix: scope 429 retry to only RateLimitError, preserve caller transport, clean imports --- src/claude_agent_sdk/_internal/client.py | 18 +++++++++--------- tests/test_rate_limit_retry.py | 4 ---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index fe57059a..91d296ab 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -97,7 +97,7 @@ async def process_query( # canUseTool and permission_prompt_tool_name are mutually exclusive if options.permission_prompt_tool_name: raise ValueError( - "can_use_tool cannot be used with permission_prompt_tool_name. " + "can_use_tool callback cannot be used with permission_prompt_tool_name. " "Please use one or the other." 
) @@ -114,11 +114,9 @@ async def process_query( try: # Use provided transport or create subprocess transport - if transport is not None and not is_retry: + if transport is not None: chosen_transport = transport else: - if transport is not None and is_retry: - await transport.close() chosen_transport = SubprocessCLITransport( prompt=prompt, options=configured_options, @@ -234,11 +232,13 @@ async def process_query( await asyncio.sleep(delay) continue - raise RateLimitError( - str(e), - retry_after=retry_after, - original_error=e, - ) from e + if is_rl: + raise RateLimitError( + str(e), + retry_after=retry_after, + original_error=e, + ) from e + raise finally: if query is not None: diff --git a/tests/test_rate_limit_retry.py b/tests/test_rate_limit_retry.py index a31b86ca..4c9aaf25 100644 --- a/tests/test_rate_limit_retry.py +++ b/tests/test_rate_limit_retry.py @@ -1,9 +1,5 @@ """Tests for 429 rate limit retry with exponential backoff.""" -from unittest.mock import AsyncMock, MagicMock, patch - -import pytest - from claude_agent_sdk import ClaudeAgentOptions from claude_agent_sdk._errors import ProcessError, RateLimitError From 2bee4ae2cb020864291f7fef6ddb7fff62cfcf63 Mon Sep 17 00:00:00 2001 From: Junhyuk Lee Date: Thu, 16 Apr 2026 22:24:34 -0500 Subject: [PATCH 5/5] fix: restore connect() for caller-provided transports in retry loop --- src/claude_agent_sdk/_internal/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/claude_agent_sdk/_internal/client.py b/src/claude_agent_sdk/_internal/client.py index 91d296ab..0c6a40df 100644 --- a/src/claude_agent_sdk/_internal/client.py +++ b/src/claude_agent_sdk/_internal/client.py @@ -121,7 +121,7 @@ async def process_query( prompt=prompt, options=configured_options, ) - await chosen_transport.connect() + await chosen_transport.connect() # Extract SDK MCP servers from configured options sdk_mcp_servers = {}