From 452cc26ccc4f5efcfcaaae96386e40414bfb90a2 Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Mon, 10 Nov 2025 17:19:55 -0600 Subject: [PATCH 1/5] Add retry logic and backoff mechanism for client requests - Implemented exponential backoff with jitter for retryable errors. - Added `max_retries` parameter to clients for configurable retry limits. - Enhanced error handling for transport, timeout, and server-side errors. - Updated unit tests to validate retry behavior, including success after retries and error propagation after retry limits. - Covered edge cases, such as negative retry limits and proper delay calculations. Assisted-by: Codex --- src/pdfrest/client.py | 168 +++++++++++++++++++++++++++++++++++++----- tests/test_client.py | 91 +++++++++++++++++++++++ 2 files changed, 241 insertions(+), 18 deletions(-) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 8b8ea380..a31696ca 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -6,12 +6,29 @@ import importlib.metadata import json import os +import random +import time import uuid -from collections.abc import AsyncIterator, Iterator, Mapping, Sequence +from collections.abc import ( + AsyncIterator, + Awaitable, + Callable, + Iterator, + Mapping, + Sequence, +) from contextlib import ExitStack from os import PathLike from pathlib import Path -from typing import IO, Any, Generic, Literal, TypeAlias, TypeVar, cast +from typing import ( + IO, + Any, + Generic, + Literal, + TypeAlias, + TypeVar, + cast, +) import httpx from httpx import URL @@ -21,6 +38,10 @@ PdfRestApiError, PdfRestAuthenticationError, PdfRestConfigurationError, + PdfRestError, + PdfRestRequestError, + PdfRestTimeoutError, + PdfRestTransportError, translate_httpx_error, ) from .models import ( @@ -65,6 +86,10 @@ DEFAULT_READ_TIMEOUT_SECONDS = 120.0 FILE_UPLOAD_FIELD_NAME = "file" DEFAULT_FILE_INFO_CONCURRENCY = 8 +DEFAULT_MAX_RETRIES = 2 +INITIAL_BACKOFF_SECONDS = 0.5 +MAX_BACKOFF_SECONDS = 8.0 +BACKOFF_JITTER_SECONDS = 0.1 HttpMethod = Literal["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"] QueryParamValue = str | int | float | bool | None @@ -245,6 +270,7 @@ def _normalize_file_id(file_ref: PdfRestFileID | str) -> PdfRestFileID: ClientType = TypeVar("ClientType", httpx.Client, httpx.AsyncClient) +ReturnType = TypeVar("ReturnType") class _ClientConfig(BaseModel): @@ -341,7 +367,12 @@ def __init__( base_url: str | URL | None = None, timeout: TimeoutTypes | None = None, headers: AnyMapping | None = None, + max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: + if not isinstance(max_retries, int) or max_retries < 0: + msg = "max_retries must be a non-negative integer." + raise PdfRestConfigurationError(msg) + self._max_retries = max_retries raw_api_key = api_key if api_key is not None else os.getenv(API_KEY_ENV_VAR) resolved_api_key = ( raw_api_key.strip() if raw_api_key and raw_api_key.strip() else None @@ -387,6 +418,27 @@ def __init__( except ValidationError as exc: # pragma: no cover - defensive raise PdfRestConfigurationError(str(exc)) from exc + @staticmethod + def _is_retryable_status(status_code: int) -> bool: + return status_code == 429 or 500 <= status_code < 600 + + def _should_retry_exception(self, exc: PdfRestError) -> bool: + if isinstance(exc, PdfRestApiError): + return self._is_retryable_status(exc.status_code) + return isinstance( + exc, (PdfRestTimeoutError, PdfRestTransportError, PdfRestRequestError) + ) + + def _compute_backoff_delay(self, retry_number: int) -> float: + base_delay = min( + INITIAL_BACKOFF_SECONDS * (2**retry_number), + MAX_BACKOFF_SECONDS, + ) + # ignoring S311 because this isn't being used for cryptography + jitter = random.uniform(-BACKOFF_JITTER_SECONDS, BACKOFF_JITTER_SECONDS) # noqa: S311 + delay = base_delay + jitter + return delay if delay > 0 else 0.0 + @staticmethod def _base_url_requires_api_key(url: URL) -> bool: host = url.host or "" @@ -567,12 +619,14 @@ def __init__( headers: AnyMapping | None = None, http_client: httpx.Client | None = None, transport: httpx.BaseTransport | None = None, + max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: super().__init__( api_key=api_key, base_url=base_url, timeout=timeout, headers=headers, + max_retries=max_retries, ) self._owns_http_client = http_client is None self._client = http_client or httpx.Client( @@ -592,8 +646,30 @@ def __enter__(self) -> _SyncApiClient: def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: self.close() + def _execute_with_retry(self, func: Callable[[], ReturnType]) -> ReturnType: + for attempt in range(self._max_retries + 1): + try: + return func() + except PdfRestError as exc: + if attempt == self._max_retries or not self._should_retry_exception( + exc + ): + raise + delay = self._compute_backoff_delay(attempt) + if delay > 0: + time.sleep(delay) + msg = "Retry loop exited unexpectedly." + raise RuntimeError(msg) # pragma: no cover + def _send_request(self, request: _RequestModel) -> Any: http_client = self._client + return self._execute_with_retry( + lambda: self._perform_request(http_client, request) + ) + + def _perform_request( + self, http_client: httpx.Client, request: _RequestModel + ) -> Any: try: response = http_client.request( method=request.method, @@ -607,7 +683,13 @@ def _send_request(self, request: _RequestModel) -> Any: ) except httpx.HTTPError as exc: raise translate_httpx_error(exc) from exc - return self._handle_response(response) + try: + payload = self._handle_response(response) + except PdfRestApiError: + response.close() + raise + response.close() + return payload def _post_file_operation( self, @@ -668,13 +750,18 @@ def download_file( extra_headers: AnyMapping | None = None, timeout: TimeoutTypes | None = None, ) -> httpx.Response: - request = self.prepare_request( + request_model = self.prepare_request( "GET", f"/resource/{file_id}", extra_query=extra_query, extra_headers=extra_headers, timeout=timeout, ) + return self._execute_with_retry( + lambda: self._download_with_retry(request_model) + ) + + def _download_with_retry(self, request: _RequestModel) -> httpx.Response: http_request = self._client.build_request( request.method, request.endpoint, @@ -692,12 +779,14 @@ def download_file( response = self._client.send(http_request, stream=True) except httpx.HTTPError as exc: raise translate_httpx_error(exc) from exc - if not response.is_success: - try: - self._handle_response(response) - finally: - response.close() - return response + if response.is_success: + return response + try: + self._handle_response(response) + finally: + response.close() + msg = "Unreachable" + raise RuntimeError(msg) # pragma: no cover def fetch_file_info( self, @@ -734,12 +823,14 @@ def __init__( http_client: httpx.AsyncClient | None = None, transport: httpx.AsyncBaseTransport | None = None, concurrency_limit: int = DEFAULT_FILE_INFO_CONCURRENCY, + max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: super().__init__( api_key=api_key, base_url=base_url, timeout=timeout, headers=headers, + max_retries=max_retries, ) self._owns_http_client = http_client is None self._client = http_client or httpx.AsyncClient( @@ -760,8 +851,32 @@ async def __aenter__(self) -> _AsyncApiClient: async def __aexit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: await self.aclose() + async def _execute_with_retry( + self, func: Callable[[], Awaitable[ReturnType]] + ) -> ReturnType: + for attempt in range(self._max_retries + 1): + try: + return await func() + except PdfRestError as exc: + if attempt == self._max_retries or not self._should_retry_exception( + exc + ): + raise + delay = self._compute_backoff_delay(attempt) + if delay > 0: + await asyncio.sleep(delay) + msg = "Retry loop exited unexpectedly." + raise RuntimeError(msg) # pragma: no cover + async def _send_request(self, request: _RequestModel) -> Any: http_client = self._client + return await self._execute_with_retry( + lambda: self._perform_request(http_client, request) + ) + + async def _perform_request( + self, http_client: httpx.AsyncClient, request: _RequestModel + ) -> Any: try: response = await http_client.request( method=request.method, @@ -775,7 +890,13 @@ async def _send_request(self, request: _RequestModel) -> Any: ) except httpx.HTTPError as exc: raise translate_httpx_error(exc) from exc - return self._handle_response(response) + try: + payload = self._handle_response(response) + except PdfRestApiError: + await response.aclose() + raise + await response.aclose() + return payload async def _post_file_operation( self, @@ -844,13 +965,18 @@ async def download_file( extra_headers: AnyMapping | None = None, timeout: TimeoutTypes | None = None, ) -> httpx.Response: - request = self.prepare_request( + request_model = self.prepare_request( "GET", f"/resource/{file_id}", extra_query=extra_query, extra_headers=extra_headers, timeout=timeout, ) + return await self._execute_with_retry( + lambda: self._download_with_retry(request_model) + ) + + async def _download_with_retry(self, request: _RequestModel) -> httpx.Response: http_request = self._client.build_request( request.method, request.endpoint, @@ -868,12 +994,14 @@ async def download_file( response = await self._client.send(http_request, stream=True) except httpx.HTTPError as exc: raise translate_httpx_error(exc) from exc - if not response.is_success: - try: - self._handle_response(response) - finally: - await response.aclose() - return response + if response.is_success: + return response + try: + self._handle_response(response) + finally: + await response.aclose() + msg = "Unreachable" + raise RuntimeError(msg) # pragma: no cover async def fetch_file_info( self, @@ -1441,6 +1569,7 @@ def __init__( headers: AnyMapping | None = None, http_client: httpx.Client | None = None, transport: httpx.BaseTransport | None = None, + max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: """Create a synchronous pdfRest client.""" @@ -1451,6 +1580,7 @@ def __init__( headers=headers, http_client=http_client, transport=transport, + max_retries=max_retries, ) self._files_client = _FilesClient(self) @@ -1867,6 +1997,7 @@ def __init__( http_client: httpx.AsyncClient | None = None, transport: httpx.AsyncBaseTransport | None = None, concurrency_limit: int = DEFAULT_FILE_INFO_CONCURRENCY, + max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: """Create an asynchronous pdfRest client.""" @@ -1878,6 +2009,7 @@ def __init__( http_client=http_client, transport=transport, concurrency_limit=concurrency_limit, + max_retries=max_retries, ) self._files_client = _AsyncFilesClient(self) diff --git a/tests/test_client.py b/tests/test_client.py index a664de3e..56d7215f 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -193,6 +193,97 @@ def handler(_: httpx.Request) -> httpx.Response: client.up(extra_body={"unexpected": "value"}) +def test_client_retries_on_server_error(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] < 3: + return httpx.Response(500, json={"error": "try-again"}) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.up() + + assert attempts["count"] == 3 + assert response.status == "OK" + assert sleep_calls == [0.5, 1.0] + + +def test_client_raises_after_retry_limit(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + return httpx.Response(503, json={"error": "busy"}) + + transport = httpx.MockTransport(handler) + with ( + pytest.raises(PdfRestApiError), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=1 + ) as client, + ): + client.up() + + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + +@pytest.mark.asyncio +async def test_async_client_retries_on_server_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] < 3: + return httpx.Response(503, json={"error": "retry"}) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.up() + + assert attempts["count"] == 3 + assert response.status == "OK" + assert sleep_calls == [0.5, 1.0] + + +def test_client_rejects_negative_max_retries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + + with pytest.raises(PdfRestConfigurationError): + PdfRestClient(max_retries=-1) + + def test_prepare_request_merges_queries(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("PDFREST_API_KEY", "key") with PdfRestClient(api_key=VALID_API_KEY) as client: From 1b70258e92445b09378e0740487d7e768a1431ab Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Mon, 10 Nov 2025 18:01:13 -0600 Subject: [PATCH 2/5] Add logging to pdfRest clients - Introduced structured logging for request and response details, including retry attempts, delays, payloads, and errors. - Integrated sanitized logging for sensitive header information. - Improved debugging of request execution and retry behavior for better traceability. - Updated exception handling to include detailed log outputs. Assisted-by: Codex --- src/pdfrest/client.py | 199 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 181 insertions(+), 18 deletions(-) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index a31696ca..48cfe0b2 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -5,6 +5,7 @@ import asyncio import importlib.metadata import json +import logging import os import random import time @@ -98,6 +99,48 @@ Query = Mapping[str, QueryParamValue] Body = Mapping[str, Any] +PDFREST_LOGGER = logging.getLogger("pdfrest") +PDFREST_LOGGER.addHandler(logging.NullHandler()) +LOGGER = logging.getLogger("pdfrest.client") +_PDFREST_HANDLER_IDS: set[int] = set() + + +def _ensure_stream_handler( + logger: logging.Logger, formatter: logging.Formatter +) -> None: + for handler in logger.handlers: + if id(handler) in _PDFREST_HANDLER_IDS: + return + handler = logging.StreamHandler() + handler.setFormatter(formatter) + logger.addHandler(handler) + _PDFREST_HANDLER_IDS.add(id(handler)) + + +def _configure_logging() -> None: + level_name = os.getenv("PDFREST_LOG") + if not level_name: + return + normalized = level_name.strip().lower() + level_map = {"debug": logging.DEBUG, "info": logging.INFO} + level = level_map.get(normalized) + if level is None: + return + formatter = logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s") + pdfrest_logger = PDFREST_LOGGER + pdfrest_logger.setLevel(level) + _ensure_stream_handler(pdfrest_logger, formatter) + pdfrest_logger.propagate = False + + httpx_logger = logging.getLogger("httpx") + httpx_logger.setLevel(level) + _ensure_stream_handler(httpx_logger, formatter) + httpx_logger.propagate = False + + +_configure_logging() + + FileContent = IO[bytes] | bytes | str FileTuple2 = tuple[str | None, FileContent] FileTuple3 = tuple[str | None, FileContent, str | None] @@ -369,6 +412,7 @@ def __init__( headers: AnyMapping | None = None, max_retries: int = DEFAULT_MAX_RETRIES, ) -> None: + self._logger = LOGGER if not isinstance(max_retries, int) or max_retries < 0: msg = "max_retries must be a non-negative integer." raise PdfRestConfigurationError(msg) @@ -439,6 +483,42 @@ def _compute_backoff_delay(self, retry_number: int) -> float: delay = base_delay + jitter return delay if delay > 0 else 0.0 + @staticmethod + def _sanitize_headers(headers: Mapping[str, Any] | None) -> dict[str, Any]: + if not headers: + return {} + sanitized: dict[str, Any] = {} + for key, value in headers.items(): + if key.lower() == API_KEY_HEADER_NAME.lower(): + sanitized[key] = "******" + else: + sanitized[key] = value + return sanitized + + def _log_request(self, request: _RequestModel) -> None: + if not self._logger.isEnabledFor(logging.DEBUG): + return + sanitized_headers = self._sanitize_headers(request.headers) + self._logger.debug( + "Request %s %s params=%s timeout=%s headers=%s", + request.method, + request.endpoint, + request.params, + request.timeout, + sanitized_headers, + ) + if request.method in {"POST", "PUT", "PATCH"} and request.json_body is not None: + self._logger.debug( + "Request payload %s %s: %s", + request.method, + request.endpoint, + request.json_body, + ) + + @staticmethod + def _describe_request(request: _RequestModel) -> str: + return f"{request.method} {request.endpoint}" + @staticmethod def _base_url_requires_api_key(url: URL) -> bool: host = url.host or "" @@ -567,19 +647,46 @@ def _compose_json_body( return payload def _handle_response(self, response: httpx.Response) -> Any: + request = response.request + request_label = ( + f"{getattr(request, 'method', 'UNKNOWN')} {getattr(request, 'url', '')}" + if request is not None + else "UNKNOWN" + ) if response.is_success: + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Response %s status=%s", request_label, response.status_code + ) return self._decode_json(response) message, error_payload = self._extract_error_details(response) if response.status_code == 401: auth_message = message or "Authentication with pdfRest failed." + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Authentication error response %s status=%s message=%s payload=%s", + request_label, + response.status_code, + auth_message, + error_payload, + ) raise PdfRestAuthenticationError( response.status_code, message=auth_message, response_content=error_payload, ) + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Error response %s status=%s message=%s payload=%s", + request_label, + response.status_code, + message, + error_payload, + ) + raise PdfRestApiError( response.status_code, message=message, response_content=error_payload ) @@ -646,16 +753,29 @@ def __enter__(self) -> _SyncApiClient: def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: self.close() - def _execute_with_retry(self, func: Callable[[], ReturnType]) -> ReturnType: - for attempt in range(self._max_retries + 1): + def _execute_with_retry( + self, func: Callable[[], ReturnType], *, operation: str + ) -> ReturnType: + total_attempts = self._max_retries + 1 + for attempt in range(total_attempts): try: return func() except PdfRestError as exc: - if attempt == self._max_retries or not self._should_retry_exception( - exc - ): + self._logger.debug( + "Exception during %s attempt %d/%d: %s", + operation, + attempt + 1, + total_attempts, + exc, + ) + should_retry = ( + attempt < self._max_retries and self._should_retry_exception(exc) + ) + if not should_retry: + self._logger.debug("No retry for %s; raising exception.", operation) raise delay = self._compute_backoff_delay(attempt) + self._logger.debug("Retrying %s after %.2f seconds.", operation, delay) if delay > 0: time.sleep(delay) msg = "Retry loop exited unexpectedly." @@ -664,12 +784,14 @@ def _execute_with_retry(self, func: Callable[[], ReturnType]) -> ReturnType: def _send_request(self, request: _RequestModel) -> Any: http_client = self._client return self._execute_with_retry( - lambda: self._perform_request(http_client, request) + lambda: self._perform_request(http_client, request), + operation=self._describe_request(request), ) def _perform_request( self, http_client: httpx.Client, request: _RequestModel ) -> Any: + self._log_request(request) try: response = http_client.request( method=request.method, @@ -682,7 +804,13 @@ def _perform_request( data=request.data, ) except httpx.HTTPError as exc: - raise translate_httpx_error(exc) from exc + translated = translate_httpx_error(exc) + self._logger.debug( + "HTTPX exception for %s: %s", + self._describe_request(request), + translated, + ) + raise translated from exc try: payload = self._handle_response(response) except PdfRestApiError: @@ -758,10 +886,12 @@ def download_file( timeout=timeout, ) return self._execute_with_retry( - lambda: self._download_with_retry(request_model) + lambda: self._download_with_retry(request_model), + operation=f"{request_model.method} {request_model.endpoint} (download)", ) def _download_with_retry(self, request: _RequestModel) -> httpx.Response: + self._log_request(request) http_request = self._client.build_request( request.method, request.endpoint, @@ -778,7 +908,13 @@ def _download_with_retry(self, request: _RequestModel) -> httpx.Response: try: response = self._client.send(http_request, stream=True) except httpx.HTTPError as exc: - raise translate_httpx_error(exc) from exc + translated = translate_httpx_error(exc) + self._logger.debug( + "HTTPX exception for %s: %s", + self._describe_request(request), + translated, + ) + raise translated from exc if response.is_success: return response try: @@ -852,17 +988,28 @@ async def __aexit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: await self.aclose() async def _execute_with_retry( - self, func: Callable[[], Awaitable[ReturnType]] + self, func: Callable[[], Awaitable[ReturnType]], *, operation: str ) -> ReturnType: - for attempt in range(self._max_retries + 1): + total_attempts = self._max_retries + 1 + for attempt in range(total_attempts): try: return await func() except PdfRestError as exc: - if attempt == self._max_retries or not self._should_retry_exception( - exc - ): + self._logger.debug( + "Exception during %s attempt %d/%d: %s", + operation, + attempt + 1, + total_attempts, + exc, + ) + should_retry = ( + attempt < self._max_retries and self._should_retry_exception(exc) + ) + if not should_retry: + self._logger.debug("No retry for %s; raising exception.", operation) raise delay = self._compute_backoff_delay(attempt) + self._logger.debug("Retrying %s after %.2f seconds.", operation, delay) if delay > 0: await asyncio.sleep(delay) msg = "Retry loop exited unexpectedly." @@ -871,12 +1018,14 @@ async def _execute_with_retry( async def _send_request(self, request: _RequestModel) -> Any: http_client = self._client return await self._execute_with_retry( - lambda: self._perform_request(http_client, request) + lambda: self._perform_request(http_client, request), + operation=self._describe_request(request), ) async def _perform_request( self, http_client: httpx.AsyncClient, request: _RequestModel ) -> Any: + self._log_request(request) try: response = await http_client.request( method=request.method, @@ -889,7 +1038,13 @@ async def _perform_request( data=request.data, ) except httpx.HTTPError as exc: - raise translate_httpx_error(exc) from exc + translated = translate_httpx_error(exc) + self._logger.debug( + "HTTPX exception for %s: %s", + self._describe_request(request), + translated, + ) + raise translated from exc try: payload = self._handle_response(response) except PdfRestApiError: @@ -973,10 +1128,12 @@ async def download_file( timeout=timeout, ) return await self._execute_with_retry( - lambda: self._download_with_retry(request_model) + lambda: self._download_with_retry(request_model), + operation=f"{request_model.method} {request_model.endpoint} (download)", ) async def _download_with_retry(self, request: _RequestModel) -> httpx.Response: + self._log_request(request) http_request = self._client.build_request( request.method, request.endpoint, @@ -993,7 +1150,13 @@ async def _download_with_retry(self, request: _RequestModel) -> httpx.Response: try: response = await self._client.send(http_request, stream=True) except httpx.HTTPError as exc: - raise translate_httpx_error(exc) from exc + translated = translate_httpx_error(exc) + self._logger.debug( + "HTTPX exception for %s: %s", + self._describe_request(request), + translated, + ) + raise translated from exc if response.is_success: return response try: From 9186eb72e8b99afba7f90f114819a9f78183d557 Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Tue, 25 Nov 2025 13:26:35 -0600 Subject: [PATCH 3/5] Add retryable stream handling, more retry cases - Implemented logic to track and rewind file streams between retries. - Enhanced retry mechanism to respect `Retry-After` headers with both seconds and HTTP-date formats. - Improved handling of non-seekable streams, logging errors and aborting retries gracefully. - Updated tests to cover stream behavior during retries, including seekable and non-seekable streams. - Refactored retry logic to include customizable pre-attempt hooks and delay adjustments based on responses. - Handle 408, 429, and 499 retries. Assisted-by: Codex --- src/pdfrest/client.py | 258 ++++++++++++++++++++++- src/pdfrest/exceptions.py | 3 + tests/test_client.py | 418 +++++++++++++++++++++++++++++++++++++- 3 files changed, 668 insertions(+), 11 deletions(-) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 48cfe0b2..1a466980 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -19,6 +19,8 @@ Sequence, ) from contextlib import ExitStack +from datetime import datetime, timezone +from email.utils import parsedate_to_datetime from os import PathLike from pathlib import Path from typing import ( @@ -33,7 +35,14 @@ import httpx from httpx import URL -from pydantic import BaseModel, ConfigDict, Field, ValidationError, field_validator +from pydantic import ( + BaseModel, + ConfigDict, + Field, + PrivateAttr, + ValidationError, + field_validator, +) from .exceptions import ( PdfRestApiError, @@ -91,6 +100,13 @@ INITIAL_BACKOFF_SECONDS = 0.5 MAX_BACKOFF_SECONDS = 8.0 BACKOFF_JITTER_SECONDS = 0.1 +RETRYABLE_STATUS_CODES = {408, 425, 429, 499} +FileStreamSnapshot = tuple[IO[Any], int] + + +def _empty_snapshot_list() -> list[FileStreamSnapshot]: + return [] + HttpMethod = Literal["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"] QueryParamValue = str | int | float | bool | None @@ -141,6 +157,26 @@ def _configure_logging() -> None: _configure_logging() +def _parse_retry_after_header(header_value: str | None) -> float | None: + if not header_value: + return None + trimmed = header_value.strip() + if not trimmed: + return None + try: + seconds = float(trimmed) + except ValueError: + try: + retry_datetime = parsedate_to_datetime(trimmed) + except (TypeError, ValueError): + return None + if retry_datetime.tzinfo is None: + retry_datetime = retry_datetime.replace(tzinfo=timezone.utc) + delay = (retry_datetime - datetime.now(timezone.utc)).total_seconds() + return delay if delay > 0 else 0.0 + return seconds if seconds > 0 else 0.0 + + FileContent = IO[bytes] | bytes | str FileTuple2 = tuple[str | None, FileContent] FileTuple3 = tuple[str | None, FileContent, str | None] @@ -387,6 +423,12 @@ class _RequestModel(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) + _stream_snapshots: list[FileStreamSnapshot] = PrivateAttr( + default_factory=_empty_snapshot_list + ) + _stream_tracking_ready: bool = PrivateAttr(default=False) + _has_non_rewindable_streams: bool = PrivateAttr(default=False) + @field_validator("endpoint") @classmethod def _validate_endpoint(cls, value: str) -> str: @@ -395,6 +437,26 @@ def _validate_endpoint(cls, value: str) -> str: raise PdfRestConfigurationError(msg) return value + def stream_tracking_ready(self) -> bool: + return self._stream_tracking_ready + + def mark_stream_tracking_ready(self) -> None: + self._stream_tracking_ready = True + + def has_non_rewindable_streams(self) -> bool: + return self._has_non_rewindable_streams + + def mark_non_rewindable_streams(self) -> None: + self._has_non_rewindable_streams = True + self._stream_snapshots.clear() + self._stream_tracking_ready = True + + def record_stream_snapshot(self, stream: IO[Any], position: int) -> None: + self._stream_snapshots.append((stream, position)) + + def stream_snapshots(self) -> tuple[FileStreamSnapshot, ...]: + return tuple(self._stream_snapshots) + class _BaseApiClient(Generic[ClientType]): """Shared logic between sync and async client variants.""" @@ -464,9 +526,14 @@ def __init__( @staticmethod def _is_retryable_status(status_code: int) -> bool: - return status_code == 429 or 500 <= status_code < 600 + if status_code in RETRYABLE_STATUS_CODES: + return True + return 500 <= status_code < 600 def _should_retry_exception(self, exc: PdfRestError) -> bool: + allow_retry = getattr(exc, "allow_retry", True) + if not allow_retry: + return False if isinstance(exc, PdfRestApiError): return self._is_retryable_status(exc.status_code) return isinstance( @@ -483,6 +550,13 @@ def _compute_backoff_delay(self, retry_number: int) -> float: delay = base_delay + jitter return delay if delay > 0 else 0.0 + def _determine_retry_delay(self, attempt: int, exc: PdfRestError) -> float: + delay = self._compute_backoff_delay(attempt) + retry_after_value = getattr(exc, "retry_after", None) + if isinstance(retry_after_value, (int, float)): + delay = max(delay, float(retry_after_value)) + return delay + @staticmethod def _sanitize_headers(headers: Mapping[str, Any] | None) -> dict[str, Any]: if not headers: @@ -565,6 +639,10 @@ def _prepare_request( raise PdfRestConfigurationError(msg) timeout_value = timeout if timeout is not None else self._config.timeout + files_payload: Any | None = files + if isinstance(files_payload, Iterator): + files_payload = list(files_payload) + try: request = _RequestModel( method=method, @@ -573,7 +651,7 @@ def _prepare_request( headers=headers, timeout=timeout_value, json_body=json_payload, - files=files, + files=files_payload, data=data, ) except PdfRestConfigurationError: @@ -646,6 +724,111 @@ def _compose_json_body( payload[str(key)] = value return payload + @staticmethod + def _iterate_file_like_objects(value: Any) -> Iterator[IO[Any]]: + if value is None: + return + if hasattr(value, "read"): + yield cast(IO[Any], value) + return + if isinstance(value, (bytes, bytearray, str)): + return + if isinstance(value, Mapping): + for item in value.values(): + yield from _BaseApiClient._iterate_file_like_objects(item) + return + if isinstance(value, Sequence): + for item in value: + yield from _BaseApiClient._iterate_file_like_objects(item) + + def _log_non_rewindable_stream(self, request: _RequestModel, reason: str) -> None: + self._logger.error( + "Cannot retry %s because %s", + self._describe_request(request), + reason, + ) + + def _capture_file_stream_positions(self, request: _RequestModel) -> None: + if request.stream_tracking_ready(): + return + if request.files is None or self._max_retries == 0: + request.mark_stream_tracking_ready() + return + snapshots = request.stream_snapshots() + seen_ids: set[int] = {id(stream) for stream, _ in snapshots} + for stream in self._iterate_file_like_objects(request.files): + stream_id = id(stream) + if stream_id in seen_ids: + continue + seek_fn = getattr(stream, "seek", None) + tell_fn = getattr(stream, "tell", None) + if not callable(seek_fn) or not callable(tell_fn): + request.mark_non_rewindable_streams() + self._log_non_rewindable_stream( + request, + "one or more upload streams do not provide seek/tell", + ) + return + try: + position_value = tell_fn() + except (OSError, ValueError): + request.mark_non_rewindable_streams() + self._log_non_rewindable_stream( + request, + "reading the current position failed for an upload stream", + ) + return + position = cast(int, position_value) + request.record_stream_snapshot(stream, position) + seen_ids.add(stream_id) + request.mark_stream_tracking_ready() + + def _rewind_stream_snapshots(self, request: _RequestModel) -> bool: + for stream, position in request.stream_snapshots(): + seek_fn = getattr(stream, "seek", None) + if not callable(seek_fn): + request.mark_non_rewindable_streams() + self._log_non_rewindable_stream( + request, + "one or more upload streams do not support seek", + ) + return False + try: + seek_fn(position) + except (OSError, ValueError) as exc: # pragma: no cover - defensive + request.mark_non_rewindable_streams() + self._log_non_rewindable_stream( + request, + "resetting an upload stream failed", + ) + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.debug( + "Failed to reset upload stream for %s: %s", + self._describe_request(request), + exc, + ) + return False + return True + + def _prepare_request_files_for_attempt( + self, + request: _RequestModel, + *, + is_retry: bool, + ) -> bool: + if request.files is None or self._max_retries == 0: + return True + if not request.stream_tracking_ready(): + self._capture_file_stream_positions(request) + if not is_retry: + return True + if request.has_non_rewindable_streams(): + return False + snapshots = request.stream_snapshots() + if not snapshots: + return True + return self._rewind_stream_snapshots(request) + def _handle_response(self, response: httpx.Response) -> Any: request = response.request request_label = ( @@ -661,6 +844,7 @@ def _handle_response(self, response: httpx.Response) -> Any: return self._decode_json(response) message, error_payload = self._extract_error_details(response) + retry_after = _parse_retry_after_header(response.headers.get("Retry-After")) if response.status_code == 401: auth_message = message or "Authentication with pdfRest failed." @@ -676,6 +860,7 @@ def _handle_response(self, response: httpx.Response) -> Any: response.status_code, message=auth_message, response_content=error_payload, + retry_after=retry_after, ) if self._logger.isEnabledFor(logging.DEBUG): @@ -688,7 +873,10 @@ def _handle_response(self, response: httpx.Response) -> Any: ) raise PdfRestApiError( - response.status_code, message=message, response_content=error_payload + response.status_code, + message=message, + response_content=error_payload, + retry_after=retry_after, ) def _decode_json(self, response: httpx.Response) -> Any: @@ -754,13 +942,28 @@ def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: self.close() def _execute_with_retry( - self, func: Callable[[], ReturnType], *, operation: str + self, + func: Callable[[], ReturnType], + *, + operation: str, + before_attempt: Callable[[bool], bool] | None = None, + should_continue: Callable[[PdfRestError], bool] | None = None, ) -> ReturnType: total_attempts = self._max_retries + 1 + last_exception: PdfRestError | None = None for attempt in range(total_attempts): + is_retry = attempt > 0 + if before_attempt is not None: + can_continue = before_attempt(is_retry) + if not can_continue: + if last_exception is not None: + raise last_exception + msg = "Retry aborted before the initial attempt." + raise RuntimeError(msg) try: return func() except PdfRestError as exc: + last_exception = exc self._logger.debug( "Exception during %s attempt %d/%d: %s", operation, @@ -768,13 +971,18 @@ def _execute_with_retry( total_attempts, exc, ) + additional_retry_allowed = ( + should_continue(exc) if should_continue is not None else True + ) should_retry = ( - attempt < self._max_retries and self._should_retry_exception(exc) + attempt < self._max_retries + and self._should_retry_exception(exc) + and additional_retry_allowed ) if not should_retry: self._logger.debug("No retry for %s; raising exception.", operation) raise - delay = self._compute_backoff_delay(attempt) + delay = self._determine_retry_delay(attempt, exc) self._logger.debug("Retrying %s after %.2f seconds.", operation, delay) if delay > 0: time.sleep(delay) @@ -783,9 +991,14 @@ def _execute_with_retry( def _send_request(self, request: _RequestModel) -> Any: http_client = self._client + + def before_attempt(is_retry: bool) -> bool: + return self._prepare_request_files_for_attempt(request, is_retry=is_retry) + return self._execute_with_retry( lambda: self._perform_request(http_client, request), operation=self._describe_request(request), + before_attempt=before_attempt, ) def _perform_request( @@ -988,13 +1201,28 @@ async def __aexit__(self, exc_type: Any, exc: Any, traceback: Any) -> None: await self.aclose() async def _execute_with_retry( - self, func: Callable[[], Awaitable[ReturnType]], *, operation: str + self, + func: Callable[[], Awaitable[ReturnType]], + *, + operation: str, + before_attempt: Callable[[bool], bool] | None = None, + should_continue: Callable[[PdfRestError], bool] | None = None, ) -> ReturnType: total_attempts = self._max_retries + 1 + last_exception: PdfRestError | None = None for attempt in range(total_attempts): + is_retry = attempt > 0 + if before_attempt is not None: + can_continue = before_attempt(is_retry) + if not can_continue: + if last_exception is not None: + raise last_exception + msg = "Retry aborted before the initial attempt." + raise RuntimeError(msg) try: return await func() except PdfRestError as exc: + last_exception = exc self._logger.debug( "Exception during %s attempt %d/%d: %s", operation, @@ -1002,13 +1230,18 @@ async def _execute_with_retry( total_attempts, exc, ) + additional_retry_allowed = ( + should_continue(exc) if should_continue is not None else True + ) should_retry = ( - attempt < self._max_retries and self._should_retry_exception(exc) + attempt < self._max_retries + and self._should_retry_exception(exc) + and additional_retry_allowed ) if not should_retry: self._logger.debug("No retry for %s; raising exception.", operation) raise - delay = self._compute_backoff_delay(attempt) + delay = self._determine_retry_delay(attempt, exc) self._logger.debug("Retrying %s after %.2f seconds.", operation, delay) if delay > 0: await asyncio.sleep(delay) @@ -1017,9 +1250,14 @@ async def _execute_with_retry( async def _send_request(self, request: _RequestModel) -> Any: http_client = self._client + + def before_attempt(is_retry: bool) -> bool: + return self._prepare_request_files_for_attempt(request, is_retry=is_retry) + return await self._execute_with_retry( lambda: self._perform_request(http_client, request), operation=self._describe_request(request), + before_attempt=before_attempt, ) async def _perform_request( diff --git a/src/pdfrest/exceptions.py b/src/pdfrest/exceptions.py index d08556e1..d7d74ea2 100644 --- a/src/pdfrest/exceptions.py +++ b/src/pdfrest/exceptions.py @@ -46,9 +46,12 @@ def __init__( status_code: int, message: str | None = None, response_content: Any | None = None, + *, + retry_after: float | None = None, ) -> None: self.status_code = status_code self.response_content = response_content + self.retry_after = retry_after detail = message or f"pdfRest API returned status code {status_code}" super().__init__(detail) diff --git a/tests/test_client.py b/tests/test_client.py index 56d7215f..ce6c34d8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,6 +1,8 @@ from __future__ import annotations -from datetime import date +from datetime import date, datetime, timedelta, timezone +from email.utils import format_datetime +from io import BytesIO, UnsupportedOperation from typing import Any import httpx @@ -31,6 +33,33 @@ def _build_up_response() -> dict[str, Any]: } +def _build_file_info( + file_id: str = "1de305d2-b6a0-4b5d-9a55-4e4e6d8c2d39", +) -> dict[str, Any]: + return { + "id": file_id, + "name": "sample.pdf", + "url": "https://files.example.com/sample.pdf", + "type": "application/pdf", + "size": 1024, + "modified": "2024-01-01T00:00:00+00:00", + "scheduledDeletionTimeUtc": "2024-01-02T00:00:00+00:00", + } + + +class NonSeekableByteStream(BytesIO): + def __init__(self, payload: bytes) -> None: + super().__init__(payload) + + def seek(self, *args: Any, **kwargs: Any) -> int: + msg = "non-seekable" + raise UnsupportedOperation(msg) + + def tell(self) -> int: + msg = "non-seekable" + raise UnsupportedOperation(msg) + + def test_client_uses_provided_api_key(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("PDFREST_API_KEY", raising=False) @@ -218,6 +247,123 @@ def handler(request: httpx.Request) -> httpx.Response: assert sleep_calls == [0.5, 1.0] +def test_client_retry_honors_retry_after_seconds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response( + 429, + headers={"Retry-After": "2"}, + json={"error": "slow down"}, + ) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls == [pytest.approx(2.0)] + + +def test_client_retry_honors_retry_after_http_date( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + retry_after = format_datetime(datetime.now(timezone.utc) + timedelta(seconds=3)) + + def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response( + 503, + headers={"Retry-After": retry_after}, + json={"error": "busy"}, + ) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls + assert sleep_calls[0] >= 2.0 + + +def test_client_retries_on_timeout_exception( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "timeout" + raise httpx.TimeoutException(msg) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + +def test_client_retries_on_408_status(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(408, text="timeout") + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + def test_client_raises_after_retry_limit(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) @@ -275,6 +421,71 @@ async def handler(request: httpx.Request) -> httpx.Response: assert sleep_calls == [0.5, 1.0] +@pytest.mark.asyncio +async def test_async_client_retries_on_timeout_exception( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "timeout" + raise httpx.TimeoutException(msg) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + +@pytest.mark.asyncio +async def test_async_client_retry_honors_retry_after_seconds( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(_: httpx.Request) -> httpx.Response: + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response( + 429, + headers={"Retry-After": "4"}, + json={"error": "slow"}, + ) + return httpx.Response(200, json=_build_up_response()) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.up() + + assert response.status == "OK" + assert attempts["count"] == 2 + assert sleep_calls == [pytest.approx(4.0)] + + def test_client_rejects_negative_max_retries( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -321,6 +532,75 @@ def test_prepare_request_rejects_files_with_json( ) +def test_download_file_retries_on_error(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + monkeypatch.setattr( + client_module.time, "sleep", lambda delay: sleep_calls.append(delay) + ) + + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/resource/file-123": + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(503, json={"error": "retry"}) + return httpx.Response(200, content=b"file-bytes") + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + response = client.download_file("file-123") + try: + payload = response.read() + finally: + response.close() + + assert payload == b"file-bytes" + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + +@pytest.mark.asyncio +async def test_async_download_file_retries_on_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + sleep_calls: list[float] = [] + + async def fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/resource/async-file": + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(500, json={"error": "retry"}) + return httpx.Response(200, content=b"async-bytes") + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + response = await client.download_file("async-file") + try: + payload = await response.aread() + finally: + await response.aclose() + + assert payload == b"async-bytes" + assert attempts["count"] == 2 + assert sleep_calls == [0.5] + + def test_authentication_error_raises_specific_exception( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -448,6 +728,142 @@ def handler(_: httpx.Request) -> httpx.Response: await client.up(extra_body={"unexpected": "value"}) +def test_client_rewinds_file_streams_between_retries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + file_id = _build_file_info()["id"] + upload_attempts = {"count": 0} + payload_sizes: list[int] = [] + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + payload_sizes.append(len(request.content)) + upload_attempts["count"] += 1 + if upload_attempts["count"] == 1: + return httpx.Response(500, json={"error": "retry"}) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + file_stream = BytesIO(b"payload bytes") + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create([("doc.pdf", file_stream, "application/pdf")]) + + assert payload_sizes[0] == payload_sizes[1] > 0 + assert upload_attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +def test_client_retry_fails_for_non_seekable_stream( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + return httpx.Response(500, json={"error": "retry"}) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = NonSeekableByteStream(b"payload") + with ( + pytest.raises(PdfRestApiError, match="retry"), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=1 + ) as client, + ): + client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +@pytest.mark.asyncio +async def test_async_client_rewinds_file_streams_between_retries( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + file_id = _build_file_info()["id"] + upload_attempts = {"count": 0} + payload_sizes: list[int] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + payload_sizes.append(len(request.content)) + upload_attempts["count"] += 1 + if upload_attempts["count"] == 1: + return httpx.Response(503, json={"error": "retry"}) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + file_stream = BytesIO(b"async payload") + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create([("doc.pdf", file_stream, "application/pdf")]) + + assert payload_sizes[0] == payload_sizes[1] > 0 + assert upload_attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +@pytest.mark.asyncio +async def test_async_retry_fails_for_non_seekable_stream( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + return httpx.Response(500, json={"error": "retry"}) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = NonSeekableByteStream(b"payload") + with pytest.raises(PdfRestApiError, match="retry"): + async with AsyncPdfRestClient( + api_key=ASYNC_API_KEY, + transport=transport, + max_retries=1, + ) as client: + await client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + def test_live_client_up(pdfrest_api_key: str, pdfrest_live_base_url: str) -> None: with PdfRestClient( api_key=pdfrest_api_key, base_url=pdfrest_live_base_url From b45a81f6a128adee77e3184ffd30b126ad602ad2 Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Mon, 1 Dec 2025 14:32:59 -0600 Subject: [PATCH 4/5] Retry only unused streams without rewinding - To prevent complexity, the file stream and its position belong to the caller. - Any failure that happens after the connection starts might have read from the stream. After that, doing the retry is the responsibility of the caller. - Removed complex stream tracking logic in favor of a simplified `_has_stream_uploads` attribute. - Introduced exceptions `PdfRestConnectTimeoutError` and `PdfRestPoolTimeoutError` for granularity in timeout scenarios, including new cases for retry logic. - Updated `translate_httpx_error` method for finer mapping between `httpx` exceptions and retry behavior. - Enhanced `_contains_open_stream` method for better stream detection in nested structures. - Updated tests for refined handling of retries and no-retry cases, including transport errors, timeouts, and server errors. Assisted-by: Codex --- src/pdfrest/client.py | 191 ++++----------------- src/pdfrest/exceptions.py | 18 ++ tests/test_client.py | 350 +++++++++++++++++++++++++++++++++++--- 3 files changed, 383 insertions(+), 176 deletions(-) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 1a466980..9de8bfb7 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -48,7 +48,9 @@ PdfRestApiError, PdfRestAuthenticationError, PdfRestConfigurationError, + PdfRestConnectTimeoutError, PdfRestError, + PdfRestPoolTimeoutError, PdfRestRequestError, PdfRestTimeoutError, PdfRestTransportError, @@ -101,11 +103,6 @@ MAX_BACKOFF_SECONDS = 8.0 BACKOFF_JITTER_SECONDS = 0.1 RETRYABLE_STATUS_CODES = {408, 425, 429, 499} -FileStreamSnapshot = tuple[IO[Any], int] - - -def _empty_snapshot_list() -> list[FileStreamSnapshot]: - return [] HttpMethod = Literal["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS", "HEAD"] @@ -423,11 +420,7 @@ class _RequestModel(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) - _stream_snapshots: list[FileStreamSnapshot] = PrivateAttr( - default_factory=_empty_snapshot_list - ) - _stream_tracking_ready: bool = PrivateAttr(default=False) - _has_non_rewindable_streams: bool = PrivateAttr(default=False) + _has_stream_uploads: bool = PrivateAttr(default=False) @field_validator("endpoint") @classmethod @@ -437,25 +430,11 @@ def _validate_endpoint(cls, value: str) -> str: raise PdfRestConfigurationError(msg) return value - def stream_tracking_ready(self) -> bool: - return self._stream_tracking_ready - - def mark_stream_tracking_ready(self) -> None: - self._stream_tracking_ready = True - - def has_non_rewindable_streams(self) -> bool: - return self._has_non_rewindable_streams + def mark_has_stream_uploads(self) -> None: + self._has_stream_uploads = True - def mark_non_rewindable_streams(self) -> None: - self._has_non_rewindable_streams = True - self._stream_snapshots.clear() - self._stream_tracking_ready = True - - def record_stream_snapshot(self, stream: IO[Any], position: int) -> None: - self._stream_snapshots.append((stream, position)) - - def stream_snapshots(self) -> tuple[FileStreamSnapshot, ...]: - return tuple(self._stream_snapshots) + def has_stream_uploads(self) -> bool: + return self._has_stream_uploads class _BaseApiClient(Generic[ClientType]): @@ -658,6 +637,8 @@ def _prepare_request( raise except ValidationError as exc: # pragma: no cover - defensive raise PdfRestConfigurationError(str(exc)) from exc + if self._contains_open_stream(files_payload): + request.mark_has_stream_uploads() return request def prepare_request( @@ -725,109 +706,35 @@ def _compose_json_body( return payload @staticmethod - def _iterate_file_like_objects(value: Any) -> Iterator[IO[Any]]: + def _contains_open_stream(value: Any) -> bool: if value is None: - return - if hasattr(value, "read"): - yield cast(IO[Any], value) - return + return False if isinstance(value, (bytes, bytearray, str)): - return + return False + if hasattr(value, "read"): + return True if isinstance(value, Mapping): - for item in value.values(): - yield from _BaseApiClient._iterate_file_like_objects(item) - return - if isinstance(value, Sequence): - for item in value: - yield from _BaseApiClient._iterate_file_like_objects(item) + return any( + _BaseApiClient._contains_open_stream(item) for item in value.values() + ) + if isinstance(value, Sequence) and not isinstance( + value, (str, bytes, bytearray) + ): + return any(_BaseApiClient._contains_open_stream(item) for item in value) + return False - def _log_non_rewindable_stream(self, request: _RequestModel, reason: str) -> None: - self._logger.error( - "Cannot retry %s because %s", - self._describe_request(request), - reason, - ) + def _build_stream_retry_checker( + self, request: _RequestModel + ) -> Callable[[PdfRestError], bool] | None: + if not request.has_stream_uploads(): + return None - def _capture_file_stream_positions(self, request: _RequestModel) -> None: - if request.stream_tracking_ready(): - return - if request.files is None or self._max_retries == 0: - request.mark_stream_tracking_ready() - return - snapshots = request.stream_snapshots() - seen_ids: set[int] = {id(stream) for stream, _ in snapshots} - for stream in self._iterate_file_like_objects(request.files): - stream_id = id(stream) - if stream_id in seen_ids: - continue - seek_fn = getattr(stream, "seek", None) - tell_fn = getattr(stream, "tell", None) - if not callable(seek_fn) or not callable(tell_fn): - request.mark_non_rewindable_streams() - self._log_non_rewindable_stream( - request, - "one or more upload streams do not provide seek/tell", - ) - return - try: - position_value = tell_fn() - except (OSError, ValueError): - request.mark_non_rewindable_streams() - self._log_non_rewindable_stream( - request, - "reading the current position failed for an upload stream", - ) - return - position = cast(int, position_value) - request.record_stream_snapshot(stream, position) - seen_ids.add(stream_id) - request.mark_stream_tracking_ready() - - def _rewind_stream_snapshots(self, request: _RequestModel) -> bool: - for stream, position in request.stream_snapshots(): - seek_fn = getattr(stream, "seek", None) - if not callable(seek_fn): - request.mark_non_rewindable_streams() - self._log_non_rewindable_stream( - request, - "one or more upload streams do not support seek", - ) - return False - try: - seek_fn(position) - except (OSError, ValueError) as exc: # pragma: no cover - defensive - request.mark_non_rewindable_streams() - self._log_non_rewindable_stream( - request, - "resetting an upload stream failed", - ) - if self._logger.isEnabledFor(logging.DEBUG): - self._logger.debug( - "Failed to reset upload stream for %s: %s", - self._describe_request(request), - exc, - ) - return False - return True - - def _prepare_request_files_for_attempt( - self, - request: _RequestModel, - *, - is_retry: bool, - ) -> bool: - if request.files is None or self._max_retries == 0: - return True - if not request.stream_tracking_ready(): - self._capture_file_stream_positions(request) - if not is_retry: - return True - if request.has_non_rewindable_streams(): - return False - snapshots = request.stream_snapshots() - if not snapshots: - return True - return self._rewind_stream_snapshots(request) + def checker(exc: PdfRestError) -> bool: + return bool( + isinstance(exc, (PdfRestConnectTimeoutError, PdfRestPoolTimeoutError)) + ) + + return checker def _handle_response(self, response: httpx.Response) -> Any: request = response.request @@ -946,24 +853,13 @@ def _execute_with_retry( func: Callable[[], ReturnType], *, operation: str, - before_attempt: Callable[[bool], bool] | None = None, should_continue: Callable[[PdfRestError], bool] | None = None, ) -> ReturnType: total_attempts = self._max_retries + 1 - last_exception: PdfRestError | None = None for attempt in range(total_attempts): - is_retry = attempt > 0 - if before_attempt is not None: - can_continue = before_attempt(is_retry) - if not can_continue: - if last_exception is not None: - raise last_exception - msg = "Retry aborted before the initial attempt." - raise RuntimeError(msg) try: return func() except PdfRestError as exc: - last_exception = exc self._logger.debug( "Exception during %s attempt %d/%d: %s", operation, @@ -992,13 +888,12 @@ def _execute_with_retry( def _send_request(self, request: _RequestModel) -> Any: http_client = self._client - def before_attempt(is_retry: bool) -> bool: - return self._prepare_request_files_for_attempt(request, is_retry=is_retry) + stream_retry_checker = self._build_stream_retry_checker(request) return self._execute_with_retry( lambda: self._perform_request(http_client, request), operation=self._describe_request(request), - before_attempt=before_attempt, + should_continue=stream_retry_checker, ) def _perform_request( @@ -1205,24 +1100,13 @@ async def _execute_with_retry( func: Callable[[], Awaitable[ReturnType]], *, operation: str, - before_attempt: Callable[[bool], bool] | None = None, should_continue: Callable[[PdfRestError], bool] | None = None, ) -> ReturnType: total_attempts = self._max_retries + 1 - last_exception: PdfRestError | None = None for attempt in range(total_attempts): - is_retry = attempt > 0 - if before_attempt is not None: - can_continue = before_attempt(is_retry) - if not can_continue: - if last_exception is not None: - raise last_exception - msg = "Retry aborted before the initial attempt." - raise RuntimeError(msg) try: return await func() except PdfRestError as exc: - last_exception = exc self._logger.debug( "Exception during %s attempt %d/%d: %s", operation, @@ -1251,13 +1135,12 @@ async def _execute_with_retry( async def _send_request(self, request: _RequestModel) -> Any: http_client = self._client - def before_attempt(is_retry: bool) -> bool: - return self._prepare_request_files_for_attempt(request, is_retry=is_retry) + stream_retry_checker = self._build_stream_retry_checker(request) return await self._execute_with_retry( lambda: self._perform_request(http_client, request), operation=self._describe_request(request), - before_attempt=before_attempt, + should_continue=stream_retry_checker, ) async def _perform_request( diff --git a/src/pdfrest/exceptions.py b/src/pdfrest/exceptions.py index d7d74ea2..421c51d8 100644 --- a/src/pdfrest/exceptions.py +++ b/src/pdfrest/exceptions.py @@ -10,7 +10,9 @@ "PdfRestApiError", "PdfRestAuthenticationError", "PdfRestConfigurationError", + "PdfRestConnectTimeoutError", "PdfRestError", + "PdfRestPoolTimeoutError", "PdfRestRequestError", "PdfRestTimeoutError", "PdfRestTransportError", @@ -30,6 +32,14 @@ class PdfRestTimeoutError(PdfRestError): """Raised when a request to pdfrest exceeds the configured timeout.""" +class PdfRestConnectTimeoutError(PdfRestTimeoutError): + """Raised when the client cannot establish a connection before timeout.""" + + +class PdfRestPoolTimeoutError(PdfRestTimeoutError): + """Raised when the connection pool cannot provide a connection in time.""" + + class PdfRestTransportError(PdfRestError): """Raised when a transport-level error occurs while communicating with pdfrest.""" @@ -69,6 +79,14 @@ class PdfRestAuthenticationError(PdfRestApiError): def translate_httpx_error(exc: httpx.HTTPError) -> PdfRestError: """Convert an httpx exception into a library-specific exception.""" + if isinstance(exc, httpx.ConnectTimeout): + return PdfRestConnectTimeoutError( + str(exc) or "Connection timed out while calling pdfRest." + ) + if isinstance(exc, httpx.PoolTimeout): + return PdfRestPoolTimeoutError( + str(exc) or "Connection pool timed out while calling pdfRest." + ) if isinstance(exc, httpx.TimeoutException): return PdfRestTimeoutError( str(exc) or "Request timed out while calling pdfRest." diff --git a/tests/test_client.py b/tests/test_client.py index ce6c34d8..5f61120d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -15,6 +15,7 @@ PdfRestClient, PdfRestConfigurationError, PdfRestTimeoutError, + PdfRestTransportError, UpResponse, client as client_module, ) @@ -728,7 +729,7 @@ def handler(_: httpx.Request) -> httpx.Response: await client.up(extra_body={"unexpected": "value"}) -def test_client_rewinds_file_streams_between_retries( +def test_stream_upload_does_not_retry_on_429( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) @@ -736,15 +737,13 @@ def test_client_rewinds_file_streams_between_retries( monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) file_id = _build_file_info()["id"] - upload_attempts = {"count": 0} - payload_sizes: list[int] = [] + attempts = {"count": 0} def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/upload": - payload_sizes.append(len(request.content)) - upload_attempts["count"] += 1 - if upload_attempts["count"] == 1: - return httpx.Response(500, json={"error": "retry"}) + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(429, json={"error": "slow"}) return httpx.Response(200, json={"files": [{"id": file_id}]}) if request.url.path == f"/resource/{file_id}": return httpx.Response(200, json=_build_file_info(file_id)) @@ -753,15 +752,175 @@ def handler(request: httpx.Request) -> httpx.Response: transport = httpx.MockTransport(handler) file_stream = BytesIO(b"payload bytes") + with ( + pytest.raises(PdfRestApiError, match="slow"), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=2 + ) as client, + ): + client.files.create([("doc.pdf", file_stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +def test_stream_upload_does_not_retry_on_transport_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "boom" + raise httpx.TransportError(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"payload") + with ( + pytest.raises(PdfRestTransportError, match="boom"), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=2 + ) as client, + ): + client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +def test_stream_upload_retries_on_connect_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "connect timeout" + raise httpx.ConnectTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"payload") with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: - files = client.files.create([("doc.pdf", file_stream, "application/pdf")]) + files = client.files.create([("doc.pdf", stream, "application/pdf")]) - assert payload_sizes[0] == payload_sizes[1] > 0 - assert upload_attempts["count"] == 2 - assert len(files) == 1 + assert attempts["count"] == 2 + assert files[0].id == file_id + + +def test_stream_upload_retries_on_pool_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "pool timeout" + raise httpx.PoolTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"payload") + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 2 assert files[0].id == file_id +def test_stream_upload_does_not_retry_on_server_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + return httpx.Response(500, json={"error": "retry"}) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"payload") + with ( + pytest.raises(PdfRestApiError, match="retry"), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=2 + ) as client, + ): + client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +def test_stream_upload_does_not_retry_on_read_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "read timeout" + raise httpx.ReadTimeout(msg) + return httpx.Response(200, json={"files": []}) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"payload") + with ( + pytest.raises(PdfRestTimeoutError, match="timeout"), + PdfRestClient( + api_key=VALID_API_KEY, transport=transport, max_retries=2 + ) as client, + ): + client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + def test_client_retry_fails_for_non_seekable_stream( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -792,7 +951,7 @@ def handler(request: httpx.Request) -> httpx.Response: @pytest.mark.asyncio -async def test_async_client_rewinds_file_streams_between_retries( +async def test_async_stream_upload_retries_on_429( monkeypatch: pytest.MonkeyPatch, ) -> None: monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) @@ -804,15 +963,51 @@ async def fake_sleep(_delay: float) -> None: monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) file_id = _build_file_info()["id"] - upload_attempts = {"count": 0} - payload_sizes: list[int] = [] + attempts = {"count": 0} async def handler(request: httpx.Request) -> httpx.Response: if request.url.path == "/upload": - payload_sizes.append(len(request.content)) - upload_attempts["count"] += 1 - if upload_attempts["count"] == 1: - return httpx.Response(503, json={"error": "retry"}) + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(429, json={"error": "slow"}) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"async payload") + with pytest.raises(PdfRestApiError, match="slow"): + async with AsyncPdfRestClient( + api_key=ASYNC_API_KEY, transport=transport, max_retries=2 + ) as client: + await client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +@pytest.mark.asyncio +async def test_async_stream_upload_does_not_retry_on_transport_error( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "boom" + raise httpx.TransportError(msg) return httpx.Response(200, json={"files": [{"id": file_id}]}) if request.url.path == f"/resource/{file_id}": return httpx.Response(200, json=_build_file_info(file_id)) @@ -820,16 +1015,127 @@ async def handler(request: httpx.Request) -> httpx.Response: raise AssertionError(msg) transport = httpx.MockTransport(handler) - file_stream = BytesIO(b"async payload") + stream = BytesIO(b"async payload") + with pytest.raises(PdfRestTransportError, match="boom"): + async with AsyncPdfRestClient( + api_key=ASYNC_API_KEY, transport=transport, max_retries=2 + ) as client: + await client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + +@pytest.mark.asyncio +async def test_async_stream_upload_retries_on_connect_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "connect timeout" + raise httpx.ConnectTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"async payload") async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: - files = await client.files.create([("doc.pdf", file_stream, "application/pdf")]) + files = await client.files.create([("doc.pdf", stream, "application/pdf")]) - assert payload_sizes[0] == payload_sizes[1] > 0 - assert upload_attempts["count"] == 2 + assert attempts["count"] == 2 assert len(files) == 1 assert files[0].id == file_id +@pytest.mark.asyncio +async def test_async_stream_upload_retries_on_pool_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "pool timeout" + raise httpx.PoolTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"async payload") + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +@pytest.mark.asyncio +async def test_async_stream_upload_does_not_retry_on_read_timeout( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + async def fake_sleep(_delay: float) -> None: + return None + + monkeypatch.setattr(client_module.asyncio, "sleep", fake_sleep) + + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "read timeout" + raise httpx.ReadTimeout(msg) + return httpx.Response(200, json={"files": []}) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + stream = BytesIO(b"async payload") + with pytest.raises(PdfRestTimeoutError, match="timeout"): + async with AsyncPdfRestClient( + api_key=ASYNC_API_KEY, + transport=transport, + max_retries=2, + ) as client: + await client.files.create([("doc.pdf", stream, "application/pdf")]) + + assert attempts["count"] == 1 + + @pytest.mark.asyncio async def test_async_retry_fails_for_non_seekable_stream( monkeypatch: pytest.MonkeyPatch, From b670bec4d5a8e369b7ce35f1067bab5b9c2b85b7 Mon Sep 17 00:00:00 2001 From: "Kevin A. Mitchell" Date: Mon, 1 Dec 2025 15:53:10 -0600 Subject: [PATCH 5/5] create_from_paths: Allow complete retry - Introduced `send_request_once` and `run_with_retry` for consistent request execution and retry handling. - Implemented retries for `create_from_paths` in both sync and async clients, covering 429, transport, and timeout errors. - Added concurrency control using semaphores for async file info fetches after uploads. - Updated tests to validate retry behavior, including various error scenarios and proper retry counts. Assisted-by: Codex --- src/pdfrest/client.py | 158 +++++++++++++++++++------ tests/test_client.py | 261 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 381 insertions(+), 38 deletions(-) diff --git a/src/pdfrest/client.py b/src/pdfrest/client.py index 9de8bfb7..7cec145c 100644 --- a/src/pdfrest/client.py +++ b/src/pdfrest/client.py @@ -896,6 +896,22 @@ def _send_request(self, request: _RequestModel) -> Any: should_continue=stream_retry_checker, ) + def send_request_once(self, request: _RequestModel) -> Any: + return self._perform_request(self._client, request) + + def run_with_retry( + self, + func: Callable[[], ReturnType], + *, + operation: str, + should_continue: Callable[[PdfRestError], bool] | None = None, + ) -> ReturnType: + return self._execute_with_retry( + func, + operation=operation, + should_continue=should_continue, + ) + def _perform_request( self, http_client: httpx.Client, request: _RequestModel ) -> Any: @@ -1143,6 +1159,22 @@ async def _send_request(self, request: _RequestModel) -> Any: should_continue=stream_retry_checker, ) + async def send_request_once(self, request: _RequestModel) -> Any: + return await self._perform_request(self._client, request) + + async def run_with_retry( + self, + func: Callable[[], Awaitable[ReturnType]], + *, + operation: str, + should_continue: Callable[[PdfRestError], bool] | None = None, + ) -> ReturnType: + return await self._execute_with_retry( + func, + operation=operation, + should_continue=should_continue, + ) + async def _perform_request( self, http_client: httpx.AsyncClient, request: _RequestModel ) -> Any: @@ -1441,25 +1473,44 @@ def create_from_paths( closed once the request completes. """ normalized_paths = _normalize_path_inputs(file_paths) - - with ExitStack() as stack: - upload_specs: list[FileTypes] = [] - for spec in normalized_paths: - path, content_type, headers = _parse_path_spec(spec) - file_obj = stack.enter_context(path.open("rb")) - filename = path.name - if headers: - upload_specs.append((filename, file_obj, content_type, headers)) - elif content_type is not None: - upload_specs.append((filename, file_obj, content_type)) - else: - upload_specs.append((filename, file_obj)) - return self.create( - upload_specs, - extra_query=extra_query, - extra_headers=extra_headers, - timeout=timeout, - ) + path_specs = [_parse_path_spec(spec) for spec in normalized_paths] + + def attempt() -> list[PdfRestFile]: + with ExitStack() as stack: + upload_specs: list[tuple[str, FileTypes]] = [] + for path, content_type, headers in path_specs: + file_obj = stack.enter_context(path.open("rb")) + filename = path.name + normalized_spec: FileTypes + if headers: + normalized_spec = (filename, file_obj, content_type, headers) + elif content_type is not None: + normalized_spec = (filename, file_obj, content_type) + else: + normalized_spec = (filename, file_obj) + upload_specs.append((FILE_UPLOAD_FIELD_NAME, normalized_spec)) + + request = self._client.prepare_request( + "POST", + "/upload", + files=upload_specs, + extra_query=extra_query, + extra_headers=extra_headers, + timeout=timeout, + ) + payload = self._client.send_request_once(request) + file_ids = _extract_uploaded_file_ids(payload) + return [ + self._client.fetch_file_info( + file_id, + extra_query=extra_query, + extra_headers=extra_headers, + timeout=timeout, + ) + for file_id in file_ids + ] + + return self._client.run_with_retry(attempt, operation="POST /upload (paths)") def create_from_urls( self, @@ -1681,25 +1732,56 @@ async def create_from_paths( closed once the request completes. """ normalized_paths = _normalize_path_inputs(file_paths) - - with ExitStack() as stack: - upload_specs: list[FileTypes] = [] - for spec in normalized_paths: - path, content_type, headers = _parse_path_spec(spec) - file_obj = stack.enter_context(path.open("rb")) - filename = path.name - if headers: - upload_specs.append((filename, file_obj, content_type, headers)) - elif content_type is not None: - upload_specs.append((filename, file_obj, content_type)) - else: - upload_specs.append((filename, file_obj)) - return await self.create( - upload_specs, - extra_query=extra_query, - extra_headers=extra_headers, - timeout=timeout, - ) + path_specs = [_parse_path_spec(spec) for spec in normalized_paths] + + async def attempt() -> list[PdfRestFile]: + with ExitStack() as stack: + upload_specs: list[tuple[str, FileTypes]] = [] + for path, content_type, headers in path_specs: + file_obj = stack.enter_context(path.open("rb")) + filename = path.name + normalized_spec: FileTypes + if headers: + normalized_spec = (filename, file_obj, content_type, headers) + elif content_type is not None: + normalized_spec = (filename, file_obj, content_type) + else: + normalized_spec = (filename, file_obj) + upload_specs.append((FILE_UPLOAD_FIELD_NAME, normalized_spec)) + + request = self._client.prepare_request( + "POST", + "/upload", + files=upload_specs, + extra_query=extra_query, + extra_headers=extra_headers, + timeout=timeout, + ) + payload = await self._client.send_request_once(request) + file_ids = _extract_uploaded_file_ids(payload) + results: list[PdfRestFile] = [] + semaphore = asyncio.Semaphore(self._concurrency_limit) + + async def throttled_fetch(file_id: str) -> PdfRestFile: + async with semaphore: + return await self._client.fetch_file_info( + file_id, + extra_query=extra_query, + extra_headers=extra_headers, + timeout=timeout, + ) + + if file_ids: + results = list( + await asyncio.gather( + *(throttled_fetch(fid) for fid in file_ids) + ) + ) + return results + + return await self._client.run_with_retry( + attempt, operation="POST /upload (paths)" + ) async def create_from_urls( self, diff --git a/tests/test_client.py b/tests/test_client.py index 5f61120d..3c21bd8d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -3,6 +3,7 @@ from datetime import date, datetime, timedelta, timezone from email.utils import format_datetime from io import BytesIO, UnsupportedOperation +from pathlib import Path from typing import Any import httpx @@ -889,6 +890,139 @@ def handler(request: httpx.Request) -> httpx.Response: assert attempts["count"] == 1 +def _write_temp_file(tmp_path: Path, content: bytes = b"payload") -> Path: + path = tmp_path / "doc.pdf" + path.write_bytes(content) + return path + + +def test_create_from_paths_retries_on_429( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(429, json={"error": "slow"}) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +def test_create_from_paths_retries_on_transport_error( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "boom" + raise httpx.TransportError(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +def test_create_from_paths_retries_on_connect_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "connect timeout" + raise httpx.ConnectTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +def test_create_from_paths_retries_on_pool_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", VALID_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + monkeypatch.setattr(client_module.time, "sleep", lambda _delay: None) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "pool timeout" + raise httpx.PoolTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + with PdfRestClient(api_key=VALID_API_KEY, transport=transport) as client: + files = client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + def test_stream_upload_does_not_retry_on_read_timeout( monkeypatch: pytest.MonkeyPatch, ) -> None: @@ -1170,6 +1304,133 @@ async def handler(request: httpx.Request) -> httpx.Response: assert attempts["count"] == 1 +@pytest.mark.asyncio +async def test_async_create_from_paths_retries_on_429( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + return httpx.Response(429, json={"error": "slow"}) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +@pytest.mark.asyncio +async def test_async_create_from_paths_retries_on_transport_error( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "boom" + raise httpx.TransportError(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +@pytest.mark.asyncio +async def test_async_create_from_paths_retries_on_connect_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "connect timeout" + raise httpx.ConnectTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + +@pytest.mark.asyncio +async def test_async_create_from_paths_retries_on_pool_timeout( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("PDFREST_API_KEY", ASYNC_API_KEY) + monkeypatch.setattr(client_module.random, "uniform", lambda *_: 0.0) + + upload_path = _write_temp_file(tmp_path) + file_id = _build_file_info()["id"] + attempts = {"count": 0} + + async def handler(request: httpx.Request) -> httpx.Response: + if request.url.path == "/upload": + attempts["count"] += 1 + if attempts["count"] == 1: + msg = "pool timeout" + raise httpx.PoolTimeout(msg) + return httpx.Response(200, json={"files": [{"id": file_id}]}) + if request.url.path == f"/resource/{file_id}": + return httpx.Response(200, json=_build_file_info(file_id)) + msg = f"Unexpected path {request.url.path}" + raise AssertionError(msg) + + transport = httpx.MockTransport(handler) + async with AsyncPdfRestClient(api_key=ASYNC_API_KEY, transport=transport) as client: + files = await client.files.create_from_paths(upload_path) + + assert attempts["count"] == 2 + assert len(files) == 1 + assert files[0].id == file_id + + def test_live_client_up(pdfrest_api_key: str, pdfrest_live_base_url: str) -> None: with PdfRestClient( api_key=pdfrest_api_key, base_url=pdfrest_live_base_url