Skip to content

Commit 5d46e02

Browse files
OmarAlJarrahclaude
andcommitted
fix(transport): correct timeout classification and release connections on error
Classify connect-phase and read-phase timeouts consistently across the adapters: a read timeout maps to a response timeout and a connect timeout to a request timeout, so the retry policy charges them to the right budget and never auto-retries a non-idempotent request whose response may already be in flight. Map the asyncio transport's read-phase timeouts and stream errors into the SDK error hierarchy instead of leaking raw stdlib exceptions, and stop overwriting a caller-supplied Host header. Release the underlying connection when a response carries a status code outside the known set before raising, so an unrecognised code no longer leaks a pooled connection, and make the aiohttp client refuse use after close instead of silently opening a fresh session. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 3d26c0b commit 5d46e02

12 files changed

Lines changed: 457 additions & 48 deletions

File tree

packages/dexpace-sdk-http-aiohttp/src/dexpace/sdk/http/aiohttp/client.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
# Copyright (c) 2026 dexpace and Omar Aljarrah.
22
# Licensed under the MIT License. See LICENSE.md in the repository root for details.
33

4-
"""``AsyncHttpClient`` implementation backed by :mod:`aiohttp`.
4+
"""``AsyncHttpClient`` implementation backed by `aiohttp`.
55
66
``aiohttp`` exposes only an async API; this package therefore ships an
77
async transport without a sync twin. The adapter is a thin pass-through:
88
99
- Request bodies are forwarded to ``aiohttp`` via an async-iterable shim
10-
over :meth:`RequestBody.iter_bytes`, so uploads stream without buffering
10+
over `RequestBody.iter_bytes`, so uploads stream without buffering
1111
the full payload into memory.
1212
- Response content streams through ``aiohttp.StreamReader``; we wrap it as
13-
an :class:`AsyncResponseBody` so the SDK's body lifecycle (deferred
13+
an `AsyncResponseBody` so the SDK's body lifecycle (deferred
1414
read, deterministic close) is preserved.
1515
- Transport exceptions are mapped to the SDK's typed error hierarchy.
1616
@@ -27,6 +27,7 @@
2727
import aiohttp
2828
from dexpace.sdk.core.errors import (
2929
ServiceRequestError,
30+
ServiceRequestTimeoutError,
3031
ServiceResponseError,
3132
ServiceResponseTimeoutError,
3233
)
@@ -45,7 +46,7 @@
4546

4647

4748
class AiohttpHttpClient:
48-
"""Async ``HttpClient`` over an :class:`aiohttp.ClientSession`.
49+
"""Async ``HttpClient`` over an `aiohttp.ClientSession`.
4950
5051
The client owns the session by default and releases it on ``aclose``.
5152
Pass an existing ``session`` to share connection pooling with other
@@ -57,7 +58,7 @@ class AiohttpHttpClient:
5758
timeout entirely (not recommended).
5859
"""
5960

60-
__slots__ = ("_owns_session", "_session", "_session_factory", "timeout")
61+
__slots__ = ("_closed", "_owns_session", "_session", "timeout")
6162

6263
def __init__(
6364
self,
@@ -70,9 +71,11 @@ def __init__(
7071
self.timeout = timeout
7172
self._session = session
7273
self._owns_session = session is None
73-
self._session_factory = aiohttp.ClientSession
74+
self._closed = False
7475

7576
async def execute(self, request: Request) -> AsyncResponse:
77+
if self._closed:
78+
raise ServiceRequestError("AiohttpHttpClient is closed")
7679
session = await self._ensure_session()
7780
timeout_cfg = (
7881
aiohttp.ClientTimeout(total=self.timeout) if self.timeout is not None else None
@@ -90,6 +93,10 @@ async def execute(self, request: Request) -> AsyncResponse:
9093
aio_response = await ctx
9194
except aiohttp.ClientConnectorError as err:
9295
raise ServiceRequestError(f"Connect failed: {err}", error=err) from err
96+
except aiohttp.ConnectionTimeoutError as err:
97+
raise ServiceRequestTimeoutError(
98+
f"Connection to {request.url} timed out", error=err
99+
) from err
93100
except TimeoutError as err:
94101
raise ServiceResponseTimeoutError(
95102
f"Request to {request.url} timed out", error=err
@@ -101,6 +108,9 @@ async def execute(self, request: Request) -> AsyncResponse:
101108
return _wrap_response(request, aio_response)
102109

103110
async def aclose(self) -> None:
111+
if self._closed:
112+
return
113+
self._closed = True
104114
if self._session is not None and self._owns_session:
105115
await self._session.close()
106116
self._session = None
@@ -119,7 +129,8 @@ async def __aexit__(
119129

120130
async def _ensure_session(self) -> aiohttp.ClientSession:
121131
if self._session is None:
122-
self._session = self._session_factory()
132+
# Construct lazily so the session binds the running event loop.
133+
self._session = aiohttp.ClientSession()
123134
self._owns_session = True
124135
return self._session
125136

@@ -159,6 +170,9 @@ def _wrap_response(request: Request, aio_response: aiohttp.ClientResponse) -> As
159170
try:
160171
status = Status(aio_response.status)
161172
except ValueError as err:
173+
# Release the handle before bailing so the connection returns to the
174+
# pool instead of leaking (aiohttp's release() is synchronous).
175+
aio_response.release()
162176
raise ServiceResponseError(
163177
f"Unknown status code: {aio_response.status}", error=err
164178
) from err
@@ -179,7 +193,7 @@ def _wrap_response(request: Request, aio_response: aiohttp.ClientResponse) -> As
179193

180194

181195
class _StreamReaderAdapter:
182-
"""``SupportsAsyncRead`` adapter over an :class:`aiohttp.ClientResponse`.
196+
"""``SupportsAsyncRead`` adapter over an `aiohttp.ClientResponse`.
183197
184198
Owns the response handle; closing the adapter releases the connection
185199
back to the pool.

packages/dexpace-sdk-http-aiohttp/tests/test_aiohttp_client.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@
1313

1414
from dexpace.sdk.core.errors import (
1515
ServiceRequestError,
16+
ServiceRequestTimeoutError,
17+
ServiceResponseError,
1618
ServiceResponseTimeoutError,
1719
)
1820
from dexpace.sdk.core.http.common import Url
1921
from dexpace.sdk.core.http.request import Method, Request
2022
from dexpace.sdk.core.http.request.request_body import RequestBody
2123
from dexpace.sdk.core.http.response import Status
2224
from dexpace.sdk.http.aiohttp import AiohttpHttpClient
25+
from dexpace.sdk.http.aiohttp.client import _wrap_response
2326

2427
# ---------------------------------------------------------------------- handlers
2528

@@ -170,3 +173,85 @@ async def test_content_length_extracted_from_response(base_url: str) -> None:
170173
assert response.body is not None
171174
# /ok returns a small JSON; Content-Length should be set by aiohttp.
172175
assert response.body.content_length() > 0
176+
177+
178+
# ----------------------------------------------------------- unknown status (M21)
179+
180+
181+
class _FakeAioResponse:
182+
"""Minimal stand-in for an ``aiohttp.ClientResponse`` carrying a raw status."""
183+
184+
def __init__(self, status: int) -> None:
185+
self.status = status
186+
self.released = False
187+
188+
def release(self) -> None: # aiohttp's release() is synchronous
189+
self.released = True
190+
191+
192+
def test_unknown_status_releases_connection_and_raises() -> None:
193+
"""An unregistered status code must release the response before raising."""
194+
request = Request(method=Method.GET, url=Url.parse("http://example.test/"))
195+
fake = _FakeAioResponse(520) # Cloudflare 'Web Server Returned an Unknown Error'
196+
197+
with pytest.raises(ServiceResponseError) as exc_info:
198+
_wrap_response(request, fake) # type: ignore[arg-type]
199+
200+
assert fake.released, "connection leaked: release() was not called"
201+
assert "520" in str(exc_info.value)
202+
203+
204+
# ----------------------------------------------------------- post-close (M22)
205+
206+
207+
async def test_execute_after_aclose_raises() -> None:
208+
"""A closed client must not resurrect; execute() raises ServiceRequestError."""
209+
client = AiohttpHttpClient(timeout=5.0)
210+
await client.aclose()
211+
with pytest.raises(ServiceRequestError, match="closed"):
212+
await client.execute(Request(method=Method.GET, url=Url.parse("http://example.test/")))
213+
214+
215+
async def test_aclose_is_idempotent() -> None:
216+
"""Calling aclose() twice is a no-op and stays in the closed state."""
217+
client = AiohttpHttpClient(timeout=5.0)
218+
await client.aclose()
219+
await client.aclose() # must not raise
220+
with pytest.raises(ServiceRequestError, match="closed"):
221+
await client.execute(Request(method=Method.GET, url=Url.parse("http://example.test/")))
222+
223+
224+
# ----------------------------------------------------- connect timeout (L36)
225+
226+
227+
class _ConnectTimeoutSession:
228+
"""Stub session whose ``request`` raises aiohttp's connect-phase timeout.
229+
230+
aiohttp only surfaces ``ConnectionTimeoutError`` for a connect-scoped
231+
timeout (``connect=`` / ``sock_connect=``); a plain ``total=`` timeout that
232+
happens to expire mid-connect raises a bare ``TimeoutError`` instead, which
233+
is indistinguishable from a read-phase timeout. We therefore drive the
234+
branch directly with the exception aiohttp actually raises for a connect
235+
timeout, keeping the test hermetic.
236+
"""
237+
238+
def request(self, **_kwargs: object) -> _ConnectTimeoutSession:
239+
return self
240+
241+
def __await__(self) -> object:
242+
raise aiohttp.ConnectionTimeoutError("connect timed out")
243+
yield # pragma: no cover - makes this an awaitable generator
244+
245+
246+
async def test_connect_timeout_maps_to_request_timeout() -> None:
247+
"""A connect-phase timeout maps to ServiceRequestTimeoutError, not a response timeout."""
248+
client = AiohttpHttpClient(timeout=5.0, session=_ConnectTimeoutSession()) # type: ignore[arg-type]
249+
with pytest.raises(ServiceRequestTimeoutError):
250+
await client.execute(Request(method=Method.GET, url=Url.parse("http://example.test/")))
251+
252+
253+
async def test_total_timeout_maps_to_response_timeout(base_url: str) -> None:
254+
"""A bare total timeout (read phase) still maps to ServiceResponseTimeoutError."""
255+
async with AiohttpHttpClient(timeout=0.25) as client:
256+
with pytest.raises(ServiceResponseTimeoutError):
257+
await client.execute(Request(method=Method.GET, url=Url.parse(f"{base_url}/slow")))

packages/dexpace-sdk-http-httpx/src/dexpace/sdk/http/httpx/async_.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
# Copyright (c) 2026 dexpace and Omar Aljarrah.
22
# Licensed under the MIT License. See LICENSE.md in the repository root for details.
33

4-
"""Async ``HttpClient`` implementation built on :class:`httpx.AsyncClient`.
4+
"""Async ``HttpClient`` implementation built on `httpx.AsyncClient`.
55
66
Supports streaming uploads and downloads, per-phase timeouts (connect /
7-
read / write / pool), HTTP/2 (opt-in via :mod:`httpx`), and proxies.
7+
read / write / pool), HTTP/2 (opt-in via `httpx`), and proxies.
88
Production-grade alternative to the asyncio reference client shipped in
99
``dexpace-sdk-http-stdlib``.
1010
11-
The transport delegates to :class:`httpx.AsyncClient`, streaming the
12-
request body via :meth:`AsyncRequestBody.aiter_bytes` (when provided) or
11+
The transport delegates to `httpx.AsyncClient`, streaming the
12+
request body via `AsyncRequestBody.aiter_bytes` (when provided) or
1313
the sync ``RequestBody.iter_bytes`` (drained eagerly into bytes; httpx
1414
accepts both). The response exposes ``AsyncResponseBody.from_async_stream``
1515
wrapping httpx's ``aiter_bytes`` iterator.
@@ -40,11 +40,11 @@
4040

4141

4242
class AsyncHttpxHttpClient:
43-
"""Async transport over :class:`httpx.AsyncClient`.
43+
"""Async transport over `httpx.AsyncClient`.
4444
4545
Per-phase timeouts (``connect_timeout``, ``read_timeout``,
4646
``write_timeout``, ``pool_timeout``) are forwarded to
47-
:class:`httpx.Timeout`. ``None`` disables the phase's timeout.
47+
`httpx.Timeout`. ``None`` disables the phase's timeout.
4848
4949
Args:
5050
connect_timeout: Seconds allowed for connection establishment.
@@ -54,9 +54,9 @@ class AsyncHttpxHttpClient:
5454
request body.
5555
pool_timeout: Seconds allowed to acquire a connection from the
5656
pool.
57-
transport: Optional :class:`httpx.AsyncBaseTransport`; the primary
57+
transport: Optional `httpx.AsyncBaseTransport`; the primary
5858
extension hook for tests (``httpx.MockTransport``).
59-
client: Optional pre-built :class:`httpx.AsyncClient` — overrides
59+
client: Optional pre-built `httpx.AsyncClient` — overrides
6060
the timeout / transport kwargs entirely. Ownership transfers
6161
to this transport.
6262
"""

packages/dexpace-sdk-http-httpx/src/dexpace/sdk/http/httpx/sync.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
# Copyright (c) 2026 dexpace and Omar Aljarrah.
22
# Licensed under the MIT License. See LICENSE.md in the repository root for details.
33

4-
"""Synchronous ``HttpClient`` implementation built on :mod:`httpx`.
4+
"""Synchronous ``HttpClient`` implementation built on `httpx`.
55
66
Supports streaming uploads and downloads, per-phase timeouts (connect /
7-
read / write / pool), HTTP/2 (opt-in via :mod:`httpx`), and proxies.
7+
read / write / pool), HTTP/2 (opt-in via `httpx`), and proxies.
88
Production-grade alternative to the urllib reference client shipped in
99
``dexpace-sdk-http-stdlib``.
1010
11-
The transport delegates to :class:`httpx.Client`, streaming the request
12-
body via :meth:`RequestBody.iter_bytes` and exposing the response as a
13-
:class:`ResponseBody` whose ``iter_bytes`` walks the httpx response's own
11+
The transport delegates to `httpx.Client`, streaming the request
12+
body via `RequestBody.iter_bytes` and exposing the response as a
13+
`ResponseBody` whose ``iter_bytes`` walks the httpx response's own
1414
``iter_bytes`` iterator. Closing the SDK response closes the underlying
1515
httpx response and releases the connection back to the pool.
1616
"""
@@ -40,15 +40,15 @@
4040

4141

4242
class HttpxHttpClient:
43-
"""Synchronous transport over :class:`httpx.Client`.
43+
"""Synchronous transport over `httpx.Client`.
4444
4545
Behaves as a structural ``HttpClient`` (single ``execute`` method) plus
4646
a context-manager surface so a ``Pipeline`` can take ownership and call
4747
``close`` on exit.
4848
4949
Per-phase timeouts (``connect_timeout``, ``read_timeout``,
5050
``write_timeout``, ``pool_timeout``) are forwarded to
51-
:class:`httpx.Timeout`. ``None`` disables the phase's timeout.
51+
`httpx.Timeout`. ``None`` disables the phase's timeout.
5252
5353
Args:
5454
connect_timeout: Seconds allowed for connection establishment.
@@ -58,9 +58,9 @@ class HttpxHttpClient:
5858
request body.
5959
pool_timeout: Seconds allowed to acquire a connection from the
6060
pool.
61-
transport: Optional :class:`httpx.BaseTransport`; the primary
61+
transport: Optional `httpx.BaseTransport`; the primary
6262
extension hook for tests (``httpx.MockTransport``).
63-
client: Optional pre-built :class:`httpx.Client` — overrides the
63+
client: Optional pre-built `httpx.Client` — overrides the
6464
timeout / transport kwargs entirely. Ownership transfers to
6565
this transport.
6666
"""

packages/dexpace-sdk-http-httpx/tests/test_async_httpx_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2026 dexpace and Omar Aljarrah.
22
# Licensed under the MIT License. See LICENSE.md in the repository root for details.
33

4-
"""Tests for ``AsyncHttpxHttpClient`` using :class:`httpx.MockTransport`."""
4+
"""Tests for ``AsyncHttpxHttpClient`` using `httpx.MockTransport`."""
55

66
from __future__ import annotations
77

packages/dexpace-sdk-http-httpx/tests/test_httpx_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Copyright (c) 2026 dexpace and Omar Aljarrah.
22
# Licensed under the MIT License. See LICENSE.md in the repository root for details.
33

4-
"""Tests for ``HttpxHttpClient`` using :class:`httpx.MockTransport`."""
4+
"""Tests for ``HttpxHttpClient`` using `httpx.MockTransport`."""
55

66
from __future__ import annotations
77

packages/dexpace-sdk-http-requests/src/dexpace/sdk/http/requests/client.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
66
``RequestsHttpClient`` wraps a ``requests.Session`` configured with
77
``stream=True`` so response bodies are read lazily and surfaced through the
8-
SDK's :class:`ResponseBody` streaming API. Request bodies are produced via
9-
:meth:`RequestBody.iter_bytes` in 8 KiB chunks.
8+
SDK's `ResponseBody` streaming API. Request bodies are produced via
9+
`RequestBody.iter_bytes` in 8 KiB chunks.
1010
1111
Exception mapping (``requests`` -> SDK):
1212
13-
- ``requests.ConnectTimeout`` -> :class:`ServiceRequestTimeoutError`
14-
- ``requests.ReadTimeout`` -> :class:`ServiceResponseTimeoutError`
15-
- ``requests.ConnectionError`` -> :class:`ServiceRequestError`
16-
- ``requests.RequestException`` (catch-all) -> :class:`ServiceRequestError`
13+
- ``requests.ConnectTimeout`` -> `ServiceRequestTimeoutError`
14+
- ``requests.ReadTimeout`` -> `ServiceResponseTimeoutError`
15+
- ``requests.ConnectionError`` -> `ServiceRequestError`
16+
- ``requests.RequestException`` (catch-all) -> `ServiceRequestError`
1717
"""
1818

1919
from __future__ import annotations
@@ -50,7 +50,7 @@ class RequestsHttpClient:
5050
a context-manager surface so a ``Pipeline`` can take ownership and call
5151
``close`` on exit. Each call sends one ``requests`` request with
5252
``stream=True`` and wraps the streamed response into a
53-
:class:`ResponseBody`.
53+
`ResponseBody`.
5454
5555
Attributes:
5656
timeout: Single timeout in seconds applied to ``Session.request``;
@@ -145,6 +145,7 @@ def _build_response(request: Request, raw: requests.Response) -> Response:
145145
try:
146146
status = Status(raw.status_code)
147147
except ValueError as err:
148+
raw.close()
148149
raise ServiceResponseError(f"Unknown status code: {raw.status_code}", error=err) from err
149150
headers = Headers(list(raw.headers.items()))
150151
body = ResponseBody.from_stream(_IterContentStream(raw)) # type: ignore[arg-type]
@@ -162,7 +163,7 @@ def _build_response(request: Request, raw: requests.Response) -> Response:
162163
class _IterContentStream:
163164
"""Adapter that exposes ``requests.Response.iter_content`` as a stream.
164165
165-
:class:`ResponseBody.from_stream` calls ``read(chunk_size)`` and
166+
`ResponseBody.from_stream` calls ``read(chunk_size)`` and
166167
``close()`` on its argument. ``requests`` doesn't expose a file-like
167168
object that honours chunk-size hints once decoded, but ``iter_content``
168169
does — this adapter buffers what the iterator yields and serves it in
@@ -182,7 +183,7 @@ def read(self, size: int = -1) -> bytes:
182183
return b""
183184
if self._iter is None:
184185
self._iter = self._response.iter_content(chunk_size=_CHUNK_SIZE)
185-
if size is None or size < 0:
186+
if size < 0:
186187
for chunk in self._iter:
187188
if chunk:
188189
self._buf.extend(chunk)

0 commit comments

Comments
 (0)