diff --git a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java
index 6bdb2248d..766797a87 100644
--- a/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java
+++ b/client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java
@@ -321,6 +321,16 @@ default int getHttp2MaxConcurrentStreams() {
return -1;
}
+ /**
+ * @return the maximum number of bytes a single HTTP/2 response body may decompress to before the stream
+ * is failed. Guards against decompression-bomb responses — a tiny compressed body that inflates
+ * to gigabytes and can OOM the client (the limit is enforced transparently when automatic
+ * decompression is enabled). {@code 0} disables the limit. Defaults to 256 MiB.
+ */
+ default long getHttp2MaxDecompressedResponseSize() {
+ return 256L * 1024 * 1024;
+ }
+
/**
* @return the interval between HTTP/2 PING keepalive frames, {@link Duration#ZERO} disables pinging
*/
diff --git a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java
index 74f9937b5..4f1ce426f 100644
--- a/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java
+++ b/client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java
@@ -107,6 +107,7 @@
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2HeaderTableSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2InitialWindowSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxConcurrentStreams;
+import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxDecompressedResponseSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxFrameSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxHeaderListSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2PingInterval;
@@ -181,6 +182,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final int http2HeaderTableSize;
private final int http2MaxHeaderListSize;
private final int http2MaxConcurrentStreams;
+ private final long http2MaxDecompressedResponseSize;
private final Duration http2PingInterval;
private final boolean http2CleartextEnabled;
@@ -277,6 +279,7 @@ private DefaultAsyncHttpClientConfig(// http
int http2HeaderTableSize,
int http2MaxHeaderListSize,
int http2MaxConcurrentStreams,
+ long http2MaxDecompressedResponseSize,
Duration http2PingInterval,
boolean http2CleartextEnabled,
@@ -381,6 +384,7 @@ private DefaultAsyncHttpClientConfig(// http
this.http2HeaderTableSize = http2HeaderTableSize;
this.http2MaxHeaderListSize = http2MaxHeaderListSize;
this.http2MaxConcurrentStreams = http2MaxConcurrentStreams;
+ this.http2MaxDecompressedResponseSize = http2MaxDecompressedResponseSize;
this.http2PingInterval = http2PingInterval;
this.http2CleartextEnabled = http2CleartextEnabled;
@@ -682,6 +686,11 @@ public int getHttp2MaxConcurrentStreams() {
return http2MaxConcurrentStreams;
}
+ @Override
+ public long getHttp2MaxDecompressedResponseSize() {
+ return http2MaxDecompressedResponseSize;
+ }
+
@Override
public Duration getHttp2PingInterval() {
return http2PingInterval;
@@ -942,6 +951,7 @@ public static class Builder {
private int http2HeaderTableSize = defaultHttp2HeaderTableSize();
private int http2MaxHeaderListSize = defaultHttp2MaxHeaderListSize();
private int http2MaxConcurrentStreams = defaultHttp2MaxConcurrentStreams();
+ private long http2MaxDecompressedResponseSize = defaultHttp2MaxDecompressedResponseSize();
private Duration http2PingInterval = defaultHttp2PingInterval();
private boolean http2CleartextEnabled = defaultHttp2CleartextEnabled();
@@ -1043,6 +1053,7 @@ public Builder(AsyncHttpClientConfig config) {
http2HeaderTableSize = config.getHttp2HeaderTableSize();
http2MaxHeaderListSize = config.getHttp2MaxHeaderListSize();
http2MaxConcurrentStreams = config.getHttp2MaxConcurrentStreams();
+ http2MaxDecompressedResponseSize = config.getHttp2MaxDecompressedResponseSize();
http2PingInterval = config.getHttp2PingInterval();
http2CleartextEnabled = config.isHttp2CleartextEnabled();
@@ -1391,6 +1402,11 @@ public Builder setHttp2MaxConcurrentStreams(int http2MaxConcurrentStreams) {
return this;
}
+ public Builder setHttp2MaxDecompressedResponseSize(long http2MaxDecompressedResponseSize) {
+ this.http2MaxDecompressedResponseSize = http2MaxDecompressedResponseSize;
+ return this;
+ }
+
public Builder setHttp2PingInterval(Duration http2PingInterval) {
this.http2PingInterval = http2PingInterval;
return this;
@@ -1658,6 +1674,7 @@ public DefaultAsyncHttpClientConfig build() {
http2HeaderTableSize,
http2MaxHeaderListSize,
http2MaxConcurrentStreams,
+ http2MaxDecompressedResponseSize,
http2PingInterval,
http2CleartextEnabled,
requestFilters.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(requestFilters),
diff --git a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java
index 29bbaa670..b185cbdc9 100644
--- a/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java
+++ b/client/src/main/java/org/asynchttpclient/RequestBuilderBase.java
@@ -478,6 +478,18 @@ public T setBody(ByteBuffer data) {
return asDerivedType();
}
+ /**
+ * Sets the request body from a Netty {@link ByteBuf}.
+ *
+ * Ownership: the caller retains ownership of {@code data}. AsyncHttpClient sends a
+ * retained duplicate per attempt (so redirects, auth replays and retries each get their own reference and
+ * the body survives across them) and never releases {@code data} itself. The caller is responsible for
+ * releasing {@code data} once the request has completed. (This differs from older releases, which consumed
+ * and released the buffer on the first send — and double-freed it on any retry.)
+ *
+ * @param data the request body; the caller keeps ownership and must release it after the request completes
+ * @return this builder
+ */
public T setBody(ByteBuf data) {
resetBody();
byteBufData = data;
diff --git a/client/src/main/java/org/asynchttpclient/SslEngineFactory.java b/client/src/main/java/org/asynchttpclient/SslEngineFactory.java
index 15ec9748e..54b91e5fe 100644
--- a/client/src/main/java/org/asynchttpclient/SslEngineFactory.java
+++ b/client/src/main/java/org/asynchttpclient/SslEngineFactory.java
@@ -31,6 +31,26 @@ public interface SslEngineFactory {
*/
SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort);
+ /**
+ * Creates a new {@link SSLEngine}, optionally permitting HTTP/2 (ALPN {@code h2}) negotiation.
+ *
+ * WebSocket connections pass {@code http2Allowed = false}: AsyncHttpClient does not implement RFC 8441
+ * (WebSocket over HTTP/2), so a WebSocket connection must not negotiate {@code h2} — otherwise the
+ * handshake is written as a plain HTTP/2 request and corrupts the connection. The default implementation
+ * ignores the flag and delegates to {@link #newSslEngine(AsyncHttpClientConfig, String, int)} for
+ * backwards compatibility; {@code DefaultSslEngineFactory} overrides it to advertise only {@code http/1.1}
+ * in ALPN when {@code http2Allowed} is {@code false}.
+ *
+ * @param config the client config
+ * @param peerHost the peer hostname
+ * @param peerPort the peer port
+ * @param http2Allowed whether HTTP/2 (ALPN {@code h2}) may be negotiated on this connection
+ * @return new engine
+ */
+ default SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort, boolean http2Allowed) {
+ return newSslEngine(config, peerHost, peerPort);
+ }
+
/**
* Perform any necessary one-time configuration. This will be called just once before {@code newSslEngine} is called
* for the first time.
diff --git a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java
index 4f97926bd..7c7356a95 100644
--- a/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java
+++ b/client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java
@@ -88,6 +88,7 @@ public final class AsyncHttpClientConfigDefaults {
public static final String HTTP2_HEADER_TABLE_SIZE_CONFIG = "http2HeaderTableSize";
public static final String HTTP2_MAX_HEADER_LIST_SIZE_CONFIG = "http2MaxHeaderListSize";
public static final String HTTP2_MAX_CONCURRENT_STREAMS_CONFIG = "http2MaxConcurrentStreams";
+ public static final String HTTP2_MAX_DECOMPRESSED_RESPONSE_SIZE_CONFIG = "http2MaxDecompressedResponseSize";
public static final String HTTP2_PING_INTERVAL_CONFIG = "http2PingInterval";
public static final String HTTP2_CLEARTEXT_ENABLED_CONFIG = "http2CleartextEnabled";
@@ -360,6 +361,11 @@ public static int defaultHttp2MaxConcurrentStreams() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_CONCURRENT_STREAMS_CONFIG);
}
+ public static long defaultHttp2MaxDecompressedResponseSize() {
+ // getInt suffices for the 256 MiB default; values above Integer.MAX_VALUE are set via the builder.
+ return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_DECOMPRESSED_RESPONSE_SIZE_CONFIG);
+ }
+
public static Duration defaultHttp2PingInterval() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getDuration(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_PING_INTERVAL_CONFIG);
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java
index 8cbbcfc50..ce8f602ec 100755
--- a/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java
+++ b/client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java
@@ -189,6 +189,8 @@ public boolean cancel(boolean force) {
return false;
}
+ releaseRequestIfNotHandedToChannel();
+
final Channel ch = channel; //atomic read, so that it won't end up in TOCTOU
if (ch != null) {
Channels.setDiscard(ch);
@@ -256,7 +258,29 @@ private boolean terminateAndExit() {
cancelTimeouts();
channel = null;
reuseChannel = false;
- return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
+ boolean alreadyTerminated = IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
+ if (!alreadyTerminated) {
+ releaseRequestIfNotHandedToChannel();
+ }
+ return alreadyTerminated;
+ }
+
+ /**
+ * Frees the request body buffer when the request was never handed to a channel encoder. On the HTTP/1.1
+ * success path Netty's encoder releases {@code httpRequest} after the write; but on an abort/cancel BEFORE
+ * the write (connect failure, onRequestSend crash, pool closed, cancellation) — or on replacement during a
+ * redirect/retry — nothing else would, leaking a {@code setBody(ByteBuf)} retained duplicate. In the
+ * HTTP/2 path {@code httpRequest} is never written to a channel, so AHC always owns its release.
+ * {@link NettyRequest#release()} is idempotent (CAS), so this never double-frees the encoder or the
+ * explicit HTTP/2 releases.
+ */
+ private void releaseRequestIfNotHandedToChannel() {
+ NettyRequest request = nettyRequest;
+ if (request != null) {
+ // release() atomically no-ops if the request was already handed to the channel encoder (which then
+ // owns the release), so there is no check-then-act race with the concurrent event-loop write.
+ request.release();
+ }
}
@Override
@@ -353,6 +377,13 @@ public NettyRequest getNettyRequest() {
}
public void setNettyRequest(NettyRequest nettyRequest) {
+ // On a redirect/auth/retry the request is rebuilt; release the previous one if it was never written,
+ // so its body buffer is not leaked. The replaced request is normally already written (handed to the
+ // channel) or already released, in which case this is a no-op (release() is idempotent).
+ NettyRequest previous = this.nettyRequest;
+ if (previous != null && previous != nettyRequest && !previous.isHandedToChannel()) {
+ previous.release();
+ }
this.nettyRequest = nettyRequest;
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
index 50599f86e..f2aa4b653 100755
--- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
@@ -360,7 +360,18 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
if (state != null) {
state.setPartitionKey(partitionKey);
}
- http2Connections.put(partitionKey, channel);
+ // Coalesce connections opened concurrently for the same partition (thundering herd): keep the
+ // first live one canonical in the registry. A connection that lost the race is marked redundant,
+ // so it serves only its own opening request and then closes (its stream's closeFuture listener
+ // closes the parent once activeStreams hits 0) instead of lingering open and unregistered. A dead
+ // registered entry is replaced.
+ Channel existing = http2Connections.putIfAbsent(partitionKey, channel);
+ if (existing != null && existing != channel) {
+ boolean replacedDead = !existing.isActive() && http2Connections.replace(partitionKey, existing, channel);
+ if (!replacedDead && state != null) {
+ state.markRedundant();
+ }
+ }
// When the connection closes, remove it from the registry AND fail any requests still queued
// for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
// connection drops have no stream channel (so no channelInactive is ever delivered for them)
@@ -368,12 +379,25 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
channel.closeFuture().addListener(future -> {
removeHttp2Connection(partitionKey, channel);
if (state != null) {
- state.failPendingOpeners(orphan ->
- orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened")));
+ state.failPendingOpeners(orphan -> failOrphanedH2Opener(orphan,
+ "HTTP/2 connection closed before a stream could be opened"));
}
});
}
+ /**
+ * Fails a request that was queued in {@link Http2ConnectionState} waiting for a stream slot but can
+ * never get one (the connection dropped or started draining). Releases its request body first —
+ * {@code sendHttp2Frames} never ran for it, so nothing else will — to avoid leaking the body ByteBuf.
+ * {@code NettyRequest.release()} is idempotent, so this is safe even if another path also releases.
+ */
+ private static void failOrphanedH2Opener(NettyResponseFuture> orphan, String message) {
+ if (orphan.getNettyRequest() != null) {
+ orphan.getNettyRequest().release();
+ }
+ orphan.abort(new IOException(message));
+ }
+
/**
* Removes an HTTP/2 connection from the registry, but only if it's the currently registered
* connection for that partition key (avoids removing a replacement connection).
@@ -466,8 +490,8 @@ private HttpClientCodec newHttpClientCodec() {
config.getHttpClientCodecInitialBufferSize());
}
- private SslHandler createSslHandler(String peerHost, int peerPort) {
- SSLEngine sslEngine = sslEngineFactory.newSslEngine(config, peerHost, peerPort);
+ private SslHandler createSslHandler(String peerHost, int peerPort, boolean http2Allowed) {
+ SSLEngine sslEngine = sslEngineFactory.newSslEngine(config, peerHost, peerPort, http2Allowed);
SslHandler sslHandler = new SslHandler(sslEngine);
if (handshakeTimeout > 0) {
sslHandler.setHandshakeTimeoutMillis(handshakeTimeout);
@@ -489,7 +513,7 @@ public Future updatePipelineForHttpTunneling(ChannelPipeline pipeline,
// Remove existing SSL handler (for proxy) and replace with SSL handler for target
pipeline.remove(SSL_HANDLER);
}
- SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort());
+ SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort(), !requestUri.isWebSocket());
whenHandshaked = sslHandler.handshakeFuture();
pipeline.addBefore(INFLATER_HANDLER, SSL_HANDLER, sslHandler);
pipeline.addAfter(SSL_HANDLER, HTTP_CLIENT_CODEC, newHttpClientCodec());
@@ -527,9 +551,9 @@ public Future updatePipelineForHttpsTunneling(ChannelPipeline pipeline,
// The proxy SSL handler should remain as it provides the tunnel transport
// We need to add target SSL handler that will negotiate with the target through the tunnel
- SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort());
+ SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort(), !requestUri.isWebSocket());
whenHandshaked = sslHandler.handshakeFuture();
-
+
// For HTTPS proxy tunnel, add target SSL handler after the existing proxy SSL handler
// This creates a nested SSL setup: Target SSL -> Proxy SSL -> Network
if (isSslHandlerConfigured(pipeline)) {
@@ -580,7 +604,8 @@ public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtua
peerPort = uri.getExplicitPort();
}
- SslHandler sslHandler = createSslHandler(peerHost, peerPort);
+ // A WebSocket connection must not negotiate h2 (no RFC 8441 support), so advertise only http/1.1 in ALPN.
+ SslHandler sslHandler = createSslHandler(peerHost, peerPort, !uri.isWebSocket());
// Check if SOCKS handler actually exists in the pipeline before trying to add after it
if (hasSocksProxyHandler && pipeline.get(SOCKS_HANDLER) != null) {
pipeline.addAfter(SOCKS_HANDLER, SSL_HANDLER, sslHandler);
@@ -710,7 +735,11 @@ public void upgradePipelineToHttp2(ChannelPipeline pipeline) {
.initialWindowSize(config.getHttp2InitialWindowSize())
.maxFrameSize(config.getHttp2MaxFrameSize())
.headerTableSize(config.getHttp2HeaderTableSize())
- .maxHeaderListSize(config.getHttp2MaxHeaderListSize());
+ .maxHeaderListSize(config.getHttp2MaxHeaderListSize())
+ // RFC 9113 §8.4: AsyncHttpClient never consumes server push, so advertise ENABLE_PUSH=0.
+ // A conformant server then never opens push streams; without this the client relies on
+ // Netty's default and a pushing server could trip a connection-level PROTOCOL_ERROR.
+ .pushEnabled(false);
Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient()
.initialSettings(settings)
@@ -734,7 +763,9 @@ protected void initChannel(Channel ch) {
Http2ConnectionState state = new Http2ConnectionState();
int configMaxStreams = config.getHttp2MaxConcurrentStreams();
if (configMaxStreams > 0) {
- state.updateMaxConcurrentStreams(configMaxStreams);
+ // Client's own cap; the server-advertised value (applied by the http2-settings-listener below)
+ // can only lower the effective limit, never raise it above this.
+ state.setClientMaxConcurrentStreams(configMaxStreams);
}
pipeline.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).set(state);
@@ -772,6 +803,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (pk != null) {
removeHttp2Connection(pk, ctx.channel());
}
+ // Fail requests still queued for a stream slot: a draining connection accepts no
+ // new streams, so they can never be opened here and would otherwise wait until the
+ // connection finally closes. Fail them now so they retry on a fresh connection (the
+ // registry no longer offers this one). Already-open streams below lastStreamId are
+ // untouched and complete normally. #12
+ connState.failPendingOpeners(orphan -> failOrphanedH2Opener(orphan,
+ "HTTP/2 connection received GOAWAY; request must retry on a new connection"));
}
LOGGER.debug("HTTP/2 GOAWAY received on {}, lastStreamId={}, errorCode={}",
ctx.channel(), lastStreamId, goAwayFrame.errorCode());
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java
index 3a081cf5a..d1dee9e0a 100644
--- a/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/Http2ConnectionState.java
@@ -49,12 +49,31 @@ private static final class PendingOpener {
}
private final AtomicInteger activeStreams = new AtomicInteger(0);
+ // The effective cap on concurrently open client-initiated streams is the MIN of the client's
+ // configured limit and the server-advertised SETTINGS_MAX_CONCURRENT_STREAMS (RFC 9113 §5.1.2 — a
+ // peer may not exceed the limit the other side advertises). Both are tracked so a later server
+ // SETTINGS frame can't silently raise the client's own configured ceiling.
+ private volatile int clientMaxConcurrentStreams = Integer.MAX_VALUE;
+ private volatile int serverMaxConcurrentStreams = Integer.MAX_VALUE;
private volatile int maxConcurrentStreams = Integer.MAX_VALUE;
private final AtomicBoolean draining = new AtomicBoolean(false);
private volatile int lastGoAwayStreamId = Integer.MAX_VALUE;
+ // Hard cap on requests queued waiting for a free stream slot. A peer that accepts the connection but never
+ // grants slots — SETTINGS_MAX_CONCURRENT_STREAMS=0, or a small limit with streams it never completes — would
+ // otherwise make every subsequent request queue forever, each pinning a NettyResponseFuture and its request
+ // body buffer, until the client OOMs. Past the cap, offerPendingOpener rejects and the caller fails the
+ // request fast. All queue mutations happen under pendingLock, so pendingCount tracks size in O(1)
+ // (ConcurrentLinkedQueue.size() is O(n)).
+ private static final int MAX_PENDING_OPENERS = 10_000;
private final ConcurrentLinkedQueue pendingOpeners = new ConcurrentLinkedQueue<>();
+ private int pendingCount;
private final Object pendingLock = new Object();
private final AtomicBoolean closed = new AtomicBoolean(false);
+ // Set when this connection lost the per-partition registration race (thundering herd): it is not in
+ // the registry, so it only ever serves its own opening request, then closes once its last stream ends
+ // — rather than lingering open and unregistered. Distinct from draining: it does NOT block its own
+ // opening request from acquiring a slot.
+ private final AtomicBoolean redundant = new AtomicBoolean(false);
private volatile Object partitionKey;
public boolean tryAcquireStream() {
@@ -77,30 +96,68 @@ public void releaseStream() {
drainPendingOpeners();
}
+ /**
+ * Binary-compatibility wrapper for the {@code void addPendingOpener(Runnable)} member released in 3.0.9
+ * (descriptor {@code ()V}); the branch's change to a {@code boolean} return was binary-incompatible. This
+ * restores the original member. Delegates to {@link #offerPendingOpener(Runnable)}, ignoring the result;
+ * internal callers that must observe the draining/closed rejection (the GOAWAY-orphan fix) use
+ * {@code offerPendingOpener} directly.
+ */
public void addPendingOpener(Runnable opener) {
- addPendingOpener(null, opener);
+ offerPendingOpener(opener);
+ }
+
+ public boolean offerPendingOpener(Runnable opener) {
+ return offerPendingOpener(null, opener);
}
- public void addPendingOpener(NettyResponseFuture> future, Runnable opener) {
+ /**
+ * Runs {@code opener} immediately if a stream slot is free, otherwise queues it for a later
+ * {@link #releaseStream()}. Returns {@code false} — without queuing — when the connection is
+ * already draining or closed, or when the pending queue is already at {@link #MAX_PENDING_OPENERS}: in
+ * each case the caller MUST fail the request itself rather than let it sit until the request timeout fires
+ * (Issue #2160). A draining/closed connection never runs a queued opener ({@link #drainPendingOpeners} only
+ * re-offers it, and {@link #failPendingOpeners} has already drained the queue); a full queue means the peer
+ * is starving slots and the request would otherwise grow heap without bound.
+ *
+ * Race-free against {@link #failPendingOpeners}: that method sets {@code closed} and drains the queue under
+ * {@code pendingLock}. An opener enqueued before the drain runs is caught by the drain; an enqueue attempt
+ * sequenced after it observes {@code closed} here (the lock provides the happens-before) and is rejected.
+ * Either way no opener is left stranded.
+ *
+ * @return {@code true} if the opener was run inline or queued; {@code false} if rejected because the
+ * connection is draining/closed or the pending queue is full (caller must fail the request)
+ */
+ public boolean offerPendingOpener(NettyResponseFuture> future, Runnable opener) {
synchronized (pendingLock) {
+ if (draining.get() || closed.get()) {
+ return false;
+ }
if (tryAcquireStream()) {
opener.run();
} else {
+ if (pendingCount >= MAX_PENDING_OPENERS) {
+ return false;
+ }
pendingOpeners.add(new PendingOpener(future, opener));
+ pendingCount++;
}
+ return true;
}
}
private void drainPendingOpeners() {
synchronized (pendingLock) {
- PendingOpener pending = pendingOpeners.poll();
- if (pending != null) {
- if (tryAcquireStream()) {
- pending.opener.run();
- } else {
- // Put it back — another releaseStream() will pick it up
- pendingOpeners.offer(pending);
- }
+ // Open as many queued requests as there are now-free stream slots. A single stream completion
+ // frees exactly one slot (so this usually runs one opener), but a SETTINGS frame that RAISES
+ // SETTINGS_MAX_CONCURRENT_STREAMS frees several at once — drain them all here rather than waking
+ // only one and stalling the rest until the next completion (a missed-wakeup; the Issue #2160
+ // silent-timeout class). tryAcquireStream() enforces the cap and the draining/closed gate, so
+ // this never over-opens; every poll is under pendingLock, so a non-empty queue always yields a
+ // non-null opener.
+ while (!pendingOpeners.isEmpty() && tryAcquireStream()) {
+ pendingCount--;
+ pendingOpeners.poll().opener.run();
}
}
}
@@ -119,13 +176,18 @@ private void drainPendingOpeners() {
* @param failer invoked once per orphaned request future (e.g. to abort it)
*/
public void failPendingOpeners(Consumer> failer) {
- closed.set(true);
List drained = new ArrayList<>();
synchronized (pendingLock) {
+ // Set closed UNDER pendingLock, before draining: an offerPendingOpener that already holds the lock
+ // finishes its enqueue and is caught by the drain below; one that has not yet acquired the lock
+ // observes closed==true (lock happens-before) and is rejected. Setting it outside the lock would
+ // leave the invariant the offerPendingOpener javadoc relies on resting on luck, not the lock.
+ closed.set(true);
PendingOpener p;
while ((p = pendingOpeners.poll()) != null) {
drained.add(p);
}
+ pendingCount = 0;
}
// Fail outside the lock — failer may re-enter client code.
for (PendingOpener p : drained) {
@@ -135,8 +197,34 @@ public void failPendingOpeners(Consumer> failer) {
}
}
- public void updateMaxConcurrentStreams(int maxConcurrentStreams) {
- this.maxConcurrentStreams = maxConcurrentStreams;
+ /**
+ * Sets the client's own configured cap (from {@code config.getHttp2MaxConcurrentStreams()}). The
+ * effective limit becomes the min of this and any server-advertised value.
+ */
+ public void setClientMaxConcurrentStreams(int clientMaxConcurrentStreams) {
+ this.clientMaxConcurrentStreams = clientMaxConcurrentStreams;
+ recomputeMaxConcurrentStreams();
+ }
+
+ /**
+ * Applies the server-advertised SETTINGS_MAX_CONCURRENT_STREAMS. The effective limit is the min of
+ * this and the client's configured cap — a server raising its limit never overrides the client's.
+ */
+ public void updateMaxConcurrentStreams(int serverMaxConcurrentStreams) {
+ int previous = this.maxConcurrentStreams;
+ this.serverMaxConcurrentStreams = serverMaxConcurrentStreams;
+ recomputeMaxConcurrentStreams();
+ // If the effective cap ROSE (the server raised SETTINGS_MAX_CONCURRENT_STREAMS), the newly-available
+ // slots must wake queued openers now. Otherwise they wait for the next stream completion to call
+ // drainPendingOpeners — and if no in-flight stream completes (e.g. all are long-lived), the queue
+ // stalls until the request timeout fires (a missed-wakeup, the Issue #2160 silent-timeout class).
+ if (this.maxConcurrentStreams > previous) {
+ drainPendingOpeners();
+ }
+ }
+
+ private void recomputeMaxConcurrentStreams() {
+ this.maxConcurrentStreams = Math.min(clientMaxConcurrentStreams, serverMaxConcurrentStreams);
}
public int getMaxConcurrentStreams() {
@@ -151,6 +239,15 @@ public boolean isDraining() {
return draining.get();
}
+ /** Marks this connection as a redundant duplicate that should close once its last stream ends. */
+ public void markRedundant() {
+ redundant.set(true);
+ }
+
+ public boolean isRedundant() {
+ return redundant.get();
+ }
+
public void setDraining(int lastStreamId) {
this.lastGoAwayStreamId = lastStreamId;
this.draining.set(true);
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
index 165e3de4e..167a83c2f 100755
--- a/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java
@@ -197,9 +197,20 @@ protected void onSuccess(Channel value) {
NettyConnectListener.this.onFailure(channel, e);
return;
}
- // Detect ALPN-negotiated protocol and upgrade pipeline to HTTP/2 if "h2" was selected
+ // Detect ALPN-negotiated protocol and upgrade pipeline to HTTP/2 if "h2" was selected.
+ // WebSocket is excluded: AsyncHttpClient does not implement RFC 8441 (WebSocket over
+ // HTTP/2), so a wss:// request must stay on HTTP/1.1. The WebSocket SSL engine advertises
+ // only http/1.1 (DefaultSslEngineFactory), so a conformant server will not select h2 here;
+ // this guard is the backstop for a custom SslEngineFactory that still advertises h2.
+ // Without it, the WebSocket handshake would be written as a plain HTTP/2 request and the
+ // broken connection pooled in the H2 registry, mis-routing later wss:// requests. See #2160.
String alpnProtocol = sslHandler.applicationProtocol();
- if (ApplicationProtocolNames.HTTP_2.equals(alpnProtocol)) {
+ boolean http2Negotiated = ApplicationProtocolNames.HTTP_2.equals(alpnProtocol);
+ if (http2Negotiated && uri.isWebSocket()) {
+ LOGGER.warn("Server negotiated HTTP/2 for WebSocket request to {}; WebSocket over HTTP/2 "
+ + "(RFC 8441) is not supported — continuing on HTTP/1.1", uri);
+ }
+ if (http2Negotiated && !uri.isWebSocket()) {
channelManager.upgradePipelineToHttp2(channel.pipeline());
registerHttp2AndReleaseSemaphore(channel);
releaseSemaphoreImmediately(partitionKeyLock);
@@ -223,8 +234,9 @@ protected void onFailure(Throwable cause) {
});
} else {
- // h2c (cleartext HTTP/2 prior knowledge): upgrade to HTTP/2 without TLS
- if (!uri.isSecured() && channelManager.isHttp2CleartextEnabled()) {
+ // h2c (cleartext HTTP/2 prior knowledge): upgrade to HTTP/2 without TLS. WebSocket (ws://) is
+ // excluded for the same RFC 8441 reason as the TLS path above — it stays on HTTP/1.1.
+ if (!uri.isSecured() && channelManager.isHttp2CleartextEnabled() && !uri.isWebSocket()) {
channelManager.upgradePipelineToHttp2(channel.pipeline());
registerHttp2AndReleaseSemaphore(channel);
releaseSemaphoreImmediately(partitionKeyLock);
diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java
index b53c8fe65..c6c6aa71c 100644
--- a/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java
+++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2ContentDecompressor.java
@@ -20,12 +20,15 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.compression.DecompressionException;
import io.netty.handler.codec.compression.JdkZlibDecoder;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2HeadersFrame;
+import java.util.Locale;
+
/**
* HTTP/2 content decompressor that transparently decompresses gzip/deflate response bodies.
* Installed on stream child channels when automatic decompression is enabled.
@@ -36,10 +39,20 @@
public class Http2ContentDecompressor extends ChannelInboundHandlerAdapter {
private final boolean keepEncodingHeader;
+ // Maximum cumulative decompressed bytes for this stream's response; 0 disables the limit. Guards against
+ // decompression-bomb responses — a tiny, highly compressible body that inflates to gigabytes and OOMs
+ // the client. One handler instance lives for one stream, so this counter spans the whole response.
+ private final long maxDecompressedBytes;
private EmbeddedChannel decompressor;
+ private long totalDecompressedBytes;
public Http2ContentDecompressor(boolean keepEncodingHeader) {
+ this(keepEncodingHeader, 0L);
+ }
+
+ public Http2ContentDecompressor(boolean keepEncodingHeader, long maxDecompressedBytes) {
this.keepEncodingHeader = keepEncodingHeader;
+ this.maxDecompressedBytes = maxDecompressedBytes;
}
@Override
@@ -48,7 +61,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
Http2HeadersFrame headersFrame = (Http2HeadersFrame) msg;
CharSequence contentEncoding = headersFrame.headers().get("content-encoding");
if (contentEncoding != null) {
- String enc = contentEncoding.toString().toLowerCase();
+ // Locale.ROOT: a default-locale toLowerCase() would mangle "GZIP" under the Turkish locale
+ // ("gzıp", dotless i), so the contains() checks below would miss it and the body would be
+ // forwarded still-compressed. Matches the toLowerCaseHeaderName idiom on the request path.
+ String enc = contentEncoding.toString().toLowerCase(Locale.ROOT);
if (enc.contains("gzip") || enc.contains("deflate")) {
ZlibWrapper wrapper = enc.contains("gzip") ? ZlibWrapper.GZIP : ZlibWrapper.ZLIB_OR_NONE;
decompressor = new EmbeddedChannel(false, new JdkZlibDecoder(wrapper));
@@ -61,29 +77,76 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.fireChannelRead(msg);
} else if (msg instanceof Http2DataFrame && decompressor != null) {
Http2DataFrame dataFrame = (Http2DataFrame) msg;
- ByteBuf content = dataFrame.content();
boolean endStream = dataFrame.isEndStream();
- if (content.isReadable()) {
- decompressor.writeInbound(content.retain());
- }
-
- // Release the original frame
- dataFrame.release();
-
- // Read all decompressed output from the embedded channel
+ // Accumulate decompressed output here. On ANY failure (typically a DecompressionException
+ // from a corrupt gzip/deflate body) the decode throws before the dataFrame.release() below,
+ // which would otherwise leak both the retained content (now held by the embedded decoder)
+ // and the EmbeddedChannel itself, and surface a raw codec error. The catch releases all
+ // three and re-raises a clean DecompressionException so the stream fails predictably.
CompositeByteBuf decompressed = ctx.alloc().compositeBuffer();
- ByteBuf decoded;
- while ((decoded = decompressor.readInbound()) != null) {
- decompressed.addComponent(true, decoded);
- }
+ try {
+ ByteBuf content = dataFrame.content();
+ if (content.isReadable()) {
+ decompressor.writeInbound(content.retain());
+ }
- if (endStream) {
- decompressor.finish();
+ ByteBuf decoded;
while ((decoded = decompressor.readInbound()) != null) {
decompressed.addComponent(true, decoded);
}
- releaseDecompressor();
+
+ if (endStream) {
+ decompressor.finish();
+ while ((decoded = decompressor.readInbound()) != null) {
+ decompressed.addComponent(true, decoded);
+ }
+ releaseDecompressor();
+ }
+ } catch (Throwable t) {
+ // Release everything we own BEFORE tearing the decoder down. releaseDecompressor() closes
+ // the embedded channel, which re-runs the decoder over any readable leftover (a body
+ // corrupted PAST the gzip header) and can throw a SECOND DecompressionException out of
+ // finishAndReleaseAll(). Releasing the frame first — and swallowing that cleanup throw —
+ // guarantees the frame is freed and the caller sees the clean exception below instead of
+ // the raw codec error. The decoder's own cumulation is released in its channelInputClosed
+ // finally regardless, so swallowing here leaks nothing.
+ decompressed.release();
+ dataFrame.release();
+ try {
+ releaseDecompressor();
+ } catch (Throwable cleanupError) {
+ // best-effort decoder teardown; see above
+ }
+ throw new DecompressionException("Failed to decompress HTTP/2 response body", t);
+ }
+
+ // Release the original frame — its readable content was retained into the decoder above.
+ dataFrame.release();
+
+ // Decompression-bomb guard: bound the cumulative decompressed size for this stream. A malicious
+ // server can send a tiny, highly compressible body that inflates to gigabytes; without this the
+ // forwarded output (and the byte[] copies the body-part factory makes) can OOM the client. The
+ // frame is already released above, so release the accumulator and tear down the decoder, then
+ // fail just this stream — the thrown DecompressionException routes via exceptionCaught ->
+ // handleException -> streamFailed, leaving sibling multiplexed streams untouched.
+ if (maxDecompressedBytes > 0) {
+ totalDecompressedBytes += decompressed.readableBytes();
+ if (totalDecompressedBytes > maxDecompressedBytes) {
+ decompressed.release();
+ // Swallow any teardown throw (finishAndReleaseAll() can re-run the decoder over a leftover
+ // and throw a second DecompressionException) so the caller sees the bomb-limit message below,
+ // not the raw codec error. The decoder's own cumulation is freed in channelInputClosed's
+ // finally regardless, so swallowing here leaks nothing — same as the corrupt-body catch.
+ try {
+ releaseDecompressor();
+ } catch (Throwable cleanupError) {
+ // best-effort decoder teardown
+ }
+ throw new DecompressionException(
+ "HTTP/2 response body exceeds the maximum decompressed size of "
+ + maxDecompressedBytes + " bytes");
+ }
}
if (decompressed.isReadable() || endStream) {
diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java
index 0d8e7c6c4..b83634b68 100644
--- a/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java
+++ b/client/src/main/java/org/asynchttpclient/netty/handler/Http2Handler.java
@@ -18,6 +18,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
@@ -25,7 +26,6 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2ResetFrame;
@@ -37,7 +37,7 @@
import org.asynchttpclient.netty.NettyResponseFuture;
import org.asynchttpclient.netty.NettyResponseStatus;
import org.asynchttpclient.netty.channel.ChannelManager;
-import org.asynchttpclient.netty.channel.Http2ConnectionState;
+import org.asynchttpclient.netty.channel.Channels;
import org.asynchttpclient.netty.request.NettyRequestSender;
import java.io.IOException;
@@ -83,21 +83,48 @@ public void handleRead(final Channel channel, final NettyResponseFuture> futur
}
} else if (e instanceof Http2DataFrame) {
handleHttp2DataFrame((Http2DataFrame) e, channel, future, handler);
- } else if (e instanceof Http2ResetFrame) {
- handleHttp2ResetFrame((Http2ResetFrame) e, channel, future);
- } else if (e instanceof Http2GoAwayFrame) {
- handleHttp2GoAwayFrame((Http2GoAwayFrame) e, channel, future);
}
+ // RST_STREAM is delivered as a user event (see userEventTriggered), never via channelRead.
+ // GOAWAY is a connection-level frame handled on the PARENT pipeline (ChannelManager's
+ // http2-goaway-listener); Http2MultiplexHandler closes any streams above lastStreamId itself,
+ // surfacing here as channelInactive. Neither is dispatched to this child handleRead.
} catch (Exception t) {
if (hasIOExceptionFilters && t instanceof IOException
&& requestSender.applyIoExceptionFiltersAndReplayRequest(future, (IOException) t, channel)) {
return;
}
- readFailed(channel, future, t);
- throw t;
+ // Stream-scoped failure (RFC 7540 §5.4.2): a processing error on ONE stream — a malformed frame,
+ // or far more commonly a user AsyncHandler callback (onStatusReceived/onHeadersReceived/
+ // onBodyPartReceived/onTrailingHeadersReceived) that throws — must fail only this stream, not
+ // close the parent TCP connection and take down every sibling multiplexed request. streamFailed
+ // routes through finishUpdate(close=false), closing only the single-use stream child channel.
+ // Do NOT re-throw: that would reach exceptionCaught and close the channel a second time.
+ streamFailed(channel, future, t);
}
}
+ /**
+ * Netty's {@link io.netty.handler.codec.http2.Http2MultiplexHandler} delivers RST_STREAM to the
+ * stream child channel as a user event ({@link Http2ResetFrame} is an
+ * {@link io.netty.handler.codec.http2.Http2StreamFrame}), NOT via {@code channelRead}. Without this
+ * override {@link #handleHttp2ResetFrame} never runs and the stream is failed only later by the
+ * generic {@code channelInactive}, discarding the server's RST error code. Handle it here so the
+ * stream fails promptly carrying that code, while staying stream-scoped (RFC 7540 §6.4): the parent
+ * connection and its sibling multiplexed streams are left untouched.
+ */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof Http2ResetFrame) {
+ Channel channel = ctx.channel();
+ Object attribute = Channels.getAttribute(channel);
+ if (attribute instanceof NettyResponseFuture) {
+ handleHttp2ResetFrame((Http2ResetFrame) evt, channel, (NettyResponseFuture>) attribute);
+ }
+ return;
+ }
+ super.userEventTriggered(ctx, evt);
+ }
+
/**
* Processes an HTTP/2 HEADERS frame, which carries the response status and headers.
* Builds a synthetic {@link HttpResponse} from the HTTP/2 pseudo-headers so the existing
@@ -107,9 +134,16 @@ private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel cha
NettyResponseFuture> future, AsyncHandler> handler) throws Exception {
Http2Headers h2Headers = headersFrame.headers();
- // Extract :status pseudo-header and convert to HTTP status
+ // Extract :status pseudo-header and convert to HTTP status. Netty's header validation normally
+ // rejects a malformed :status upstream, but guard the parse so a bad value fails just this stream
+ // (via handleRead's catch) instead of throwing an unwrapped NumberFormatException.
CharSequence statusValue = h2Headers.status();
- int statusCode = statusValue != null ? Integer.parseInt(statusValue.toString()) : 200;
+ int statusCode;
+ try {
+ statusCode = statusValue != null ? Integer.parseInt(statusValue.toString()) : 200;
+ } catch (NumberFormatException nfe) {
+ throw new IOException("Malformed HTTP/2 :status pseudo-header: " + statusValue, nfe);
+ }
HttpResponseStatus nettyStatus = HttpResponseStatus.valueOf(statusCode);
// Build HTTP/1.1-style headers, skipping HTTP/2 pseudo-headers (start with ':')
@@ -129,7 +163,22 @@ private void handleHttp2HeadersFrame(Http2HeadersFrame headersFrame, Channel cha
NettyResponseStatus status = new NettyResponseStatus(future.getUri(), syntheticResponse, channel);
+ // RFC 9110 §15.2: a 1xx is an INTERIM response, not the final one. 100-continue must still run the
+ // interceptor chain (Continue100Interceptor resumes the deferred body), but any OTHER interim — 102
+ // Processing, 103 Early Hints — must NOT touch the chain at all: running it would persist the interim's
+ // Set-Cookie into the CookieStore and execute response filters against a non-final response, then do it
+ // again on the real response. The interim HEADERS has endStream=false, so just wait for the final frame.
+ if (statusCode > 100 && statusCode < 200) {
+ return;
+ }
+
if (!interceptors.exitAfterIntercept(channel, future, handler, syntheticResponse, status, responseHeaders)) {
+ // A 100 that the interceptor chain did not consume (no Expect/100-continue in flight) is still
+ // interim and must not be delivered to the AsyncHandler as the final status — that would fire
+ // onStatusReceived/onHeadersReceived a second time when the real response arrives.
+ if (statusCode == 100) {
+ return;
+ }
boolean abort = handler.onStatusReceived(status) == State.ABORT;
if (!abort) {
abort = handler.onHeadersReceived(responseHeaders) == State.ABORT;
@@ -202,43 +251,6 @@ private void handleHttp2ResetFrame(Http2ResetFrame resetFrame, Channel channel,
streamFailed(channel, future, new IOException("HTTP/2 stream reset by server, error code: " + errorCode));
}
- /**
- * Processes an HTTP/2 GOAWAY frame, which indicates the server is shutting down the connection.
- * The parent connection is removed from the pool to prevent new streams from being created on it.
- * The current stream's future is failed so the request can be retried on a new connection.
- */
- private void handleHttp2GoAwayFrame(Http2GoAwayFrame goAwayFrame, Channel channel, NettyResponseFuture> future) {
- long errorCode = goAwayFrame.errorCode();
- int lastStreamId = goAwayFrame.lastStreamId();
-
- // Remove the parent connection from the HTTP/2 registry so no new streams are opened on it
- Channel parentChannel = (channel instanceof Http2StreamChannel)
- ? ((Http2StreamChannel) channel).parent()
- : channel;
-
- // Mark the connection as draining and remove from registry
- Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
- if (state != null) {
- state.setDraining(lastStreamId);
- Object partitionKey = state.getPartitionKey();
- if (partitionKey != null) {
- channelManager.removeHttp2Connection(partitionKey, parentChannel);
- }
- }
-
- // Check if this stream's ID is within the allowed range
- if (channel instanceof Http2StreamChannel) {
- int streamId = ((Http2StreamChannel) channel).stream().id();
- if (streamId <= lastStreamId) {
- // This stream is allowed to complete — don't fail it
- return;
- }
- }
-
- readFailed(channel, future, new IOException("HTTP/2 connection GOAWAY received, error code: " + errorCode
- + ", lastStreamId: " + lastStreamId));
- }
-
/**
* Overrides the base {@link AsyncHttpClientHandler#finishUpdate} to correctly handle HTTP/2
* connection pooling. HTTP/2 stream channels are single-use — after the stream completes,
@@ -263,15 +275,12 @@ void finishUpdate(NettyResponseFuture> future, Channel streamChannel, boolean
: null;
if (parentChannel != null) {
- Http2ConnectionState state = parentChannel.attr(Http2ConnectionState.HTTP2_STATE_KEY).get();
- if (state != null) {
- state.releaseStream();
-
- // If connection is draining and no more active streams, close it
- if (state.isDraining() && state.getActiveStreams() <= 0) {
- channelManager.closeChannel(parentChannel);
- }
- }
+ // The stream slot is released — and a fully-drained draining connection closed — by the
+ // closeFuture listener bound in NettyRequestSender.openHttp2Stream, which the
+ // streamChannel.close() above triggers. Releasing here instead would leak the slot on the
+ // paths where an exception completes the future before finishUpdate runs (e.g. a
+ // DecompressionException routed through exceptionCaught -> streamFailed, whose isDone() guard
+ // then skips finishUpdate entirely).
// Fire onConnectionOffer to maintain event lifecycle contract
try {
@@ -293,16 +302,6 @@ void finishUpdate(NettyResponseFuture> future, Channel streamChannel, boolean
}
}
- private void readFailed(Channel channel, NettyResponseFuture> future, Throwable t) {
- try {
- requestSender.abort(channel, future, t);
- } catch (Exception abortException) {
- logger.debug("Abort failed", abortException);
- } finally {
- finishUpdate(future, channel, true);
- }
- }
-
/**
* Fails a single stream's future WITHOUT closing the parent connection. Used for stream-scoped
* events (RST_STREAM, and the {@code channelInactive}/exception Netty delivers when one stream
diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java
index 5c3968858..fec5daa6e 100644
--- a/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java
+++ b/client/src/main/java/org/asynchttpclient/netty/handler/intercept/Continue100Interceptor.java
@@ -31,13 +31,31 @@ class Continue100Interceptor {
}
public boolean exitAfterHandling100(final Channel channel, final NettyResponseFuture> future) {
+ // Capture whether THIS request actually deferred its body for a 100-continue, before resetting the
+ // flag below. A server may send an (unsolicited) 100 even when the client did not send
+ // Expect: 100-continue (RFC 9110 §15.2.1); in that case the body was already sent and must not be
+ // sent a second time.
+ boolean bodyWasDeferred = future.isDontWriteBodyBecauseExpectContinue();
future.setHeadersAlreadyWrittenOnContinue(true);
future.setDontWriteBodyBecauseExpectContinue(false);
if (channel instanceof Http2StreamChannel) {
- // HTTP/2 stream channels don't produce LastHttpContent.
- // Directly write the body on the stream channel.
- requestSender.writeRequest(future, channel);
+ // HTTP/2: the HEADERS frame was already sent with endStream=false; now send the deferred body
+ // as DATA frame(s). writeRequest() can't be reused here — its isHttp2() check looks for the
+ // parent connection's multiplex handler, which a stream child channel doesn't have, so it would
+ // mis-route to the HTTP/1.1 writer (UnsupportedMessageTypeException + use-after-free).
+ //
+ // Only resume when the body was genuinely deferred. For a request WITHOUT Expect: 100-continue
+ // the body was already written with endStream=true, so writing DATA now would be a frame after
+ // endStream on a half-closed (local) stream — a STREAM_CLOSED protocol error (RFC 9113 §5.1).
+ // An unsolicited interim 100 on such a request is ignored (keep waiting for the final response).
+ if (bodyWasDeferred) {
+ try {
+ requestSender.sendHttp2RequestBody(future, (Http2StreamChannel) channel);
+ } catch (Exception e) {
+ requestSender.abort(channel, future, e);
+ }
+ }
} else {
// HTTP/1.1: wait for LastHttpContent before sending the body
Channels.setAttribute(channel, new OnLastHttpContentCallback(future) {
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequest.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequest.java
index 71cc658a0..8a4951b0a 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequest.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequest.java
@@ -16,12 +16,29 @@
package org.asynchttpclient.netty.request;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.util.ReferenceCountUtil;
import org.asynchttpclient.netty.request.body.NettyBody;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
public final class NettyRequest {
+ @SuppressWarnings("rawtypes")
+ private static final AtomicIntegerFieldUpdater STATE_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(NettyRequest.class, "state");
+
+ // Single atomic state machine guarding who releases httpRequest (and the request-body ByteBuf it holds),
+ // so it is released EXACTLY once — no double-free, no leak — even when an abort/cancel/timeout on one
+ // thread races the channel write on the event loop. The two transitions out of OWNED_BY_AHC are mutually
+ // exclusive (CAS), so exactly one of {Netty's encoder, AHC} ends up owning the release.
+ private static final int OWNED_BY_AHC = 0; // initial: AHC owns the release
+ private static final int HANDED_TO_CHANNEL = 1; // handed to Netty's HTTP/1.1 encoder; IT releases
+ private static final int RELEASED = 2; // AHC released it
+
private final HttpRequest httpRequest;
private final NettyBody body;
+ @SuppressWarnings("unused")
+ private volatile int state;
NettyRequest(HttpRequest httpRequest, NettyBody body) {
this.httpRequest = httpRequest;
@@ -35,4 +52,38 @@ public HttpRequest getHttpRequest() {
public NettyBody getBody() {
return body;
}
+
+ /**
+ * Releases the underlying HTTP/1.1 {@link HttpRequest} (and the request-body {@link io.netty.buffer.ByteBuf}
+ * it holds) — but only while AsyncHttpClient still owns it (it was never handed to a channel write). Once
+ * {@link #markHandedToChannel()} has claimed it, Netty's encoder owns the release and this becomes a no-op,
+ * so the two can never double-free. The atomic CAS also closes the check-then-release TOCTOU an external
+ * {@code isHandedToChannel()} test would leave between an abort thread and the event-loop write.
+ *
+ * In the HTTP/2 path the {@code httpRequest} object is never written to a channel — its content is
+ * re-encoded as HTTP/2 frames — so AHC always owns its release and this performs it. The early-abort paths
+ * (a draining/closed/queued-then-dropped connection, or a crashing {@code onRequestSend}) call it too.
+ * Idempotent and thread-safe — extra calls are no-ops.
+ */
+ public void release() {
+ if (STATE_UPDATER.compareAndSet(this, OWNED_BY_AHC, RELEASED)) {
+ ReferenceCountUtil.release(httpRequest);
+ }
+ }
+
+ /**
+ * Atomically claims {@link #getHttpRequest()} for a channel write (the HTTP/1.1 path), after which Netty's
+ * encoder owns its release. Returns {@code true} if the caller may proceed with the write; returns
+ * {@code false} when a concurrent abort/cancel/timeout has ALREADY released the request body — in which case
+ * the caller MUST NOT write the now-freed buffer. This single CAS resolves the double-free/use-after-free
+ * race a separate {@code handedToChannel} flag plus a non-atomic {@code release()} would otherwise allow
+ * (e.g. a {@code setBody(ByteBuf)} request cancelled on one thread while the event loop writes it).
+ */
+ public boolean markHandedToChannel() {
+ return STATE_UPDATER.compareAndSet(this, OWNED_BY_AHC, HANDED_TO_CHANNEL);
+ }
+
+ public boolean isHandedToChannel() {
+ return state == HANDED_TO_CHANNEL;
+ }
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
index 7609218b5..3fdef5769 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java
@@ -38,8 +38,8 @@
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.AsciiString;
-import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timer;
+import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
@@ -457,6 +457,25 @@ private static CharSequence toLowerCaseHeaderName(CharSequence name) {
return name.toString().toLowerCase(Locale.ROOT);
}
+ /**
+ * Removes any userinfo subcomponent from an authority before it becomes the HTTP/2 {@code :authority}
+ * pseudo-header. RFC 9113 §8.3.1: ":authority MUST NOT include the deprecated userinfo subcomponent". Drops
+ * everything up to and including the last {@code '@'} (userinfo itself cannot contain a raw '@', RFC 3986
+ * §3.2.1), leaving {@code host[:port]} — an IPv6 literal such as {@code [::1]:443} contains no '@' and is
+ * untouched. Allocation-free when there is no userinfo (the common case).
+ */
+ private static CharSequence stripUserInfo(CharSequence authority) {
+ if (authority == null) {
+ return null;
+ }
+ for (int i = authority.length() - 1; i >= 0; i--) {
+ if (authority.charAt(i) == '@') {
+ return authority.subSequence(i + 1, authority.length());
+ }
+ }
+ return authority;
+ }
+
public void writeRequest(NettyResponseFuture future, Channel channel) {
// if the channel is dead because it was pooled and the remote server decided to close it,
// we just let it go and the channelInactive do its work
@@ -489,6 +508,15 @@ public void writeRequest(NettyResponseFuture future, Channel channel) {
return;
}
+ // Atomically claim httpRequest for the channel write — from here Netty's HTTP/1.1 encoder owns
+ // and releases it, so a later abort must not release it again (NettyRequest.release() then
+ // no-ops). If the claim FAILS, a concurrent abort/cancel/timeout has already released the
+ // request body on another thread; writing the freed buffer would be a use-after-free, so bail
+ // (the future is already terminal). Before this point an abort releases the request body itself.
+ if (!nettyRequest.markHandedToChannel()) {
+ return;
+ }
+
// if the request has a body, we want to track progress
if (writeBody) {
// FIXME does this really work??? the promise is for the request without body!!!
@@ -530,17 +558,30 @@ private void writeHttp2Request(NettyResponseFuture future, Channel parent
if (state != null && !state.tryAcquireStream()) {
if (state.isDraining()) {
// Connection is draining from GOAWAY — fail the future so it retries on a new connection.
- // Don't close the parent channel since it may still have active streams.
+ // Don't close the parent channel since it may still have active streams. sendHttp2Frames
+ // never runs for this future, so release its request body here to avoid leaking it.
+ releaseHttp2Request(future);
future.abort(new java.io.IOException("HTTP/2 connection is draining (GOAWAY received)"));
return;
}
- // Queue for later when a stream slot opens up
- state.addPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state));
+ // Queue for later when a stream slot opens up. offerPendingOpener returns false if the
+ // connection started draining/closing (e.g. a GOAWAY processed on the event loop) between our
+ // tryAcquireStream() above and this enqueue — such an opener would never run, so fail the
+ // request now instead of leaving it to be completed only by the request timeout (Issue #2160).
+ if (!state.offerPendingOpener(future, () -> openHttp2Stream(future, parentChannel, state))) {
+ releaseHttp2Request(future);
+ // Fail ONLY this future (future.abort, not abort(parentChannel, ...)): the parent may be
+ // draining-but-still-active with healthy sibling streams, and abort(channel, ...) would close
+ // it — the multiplexed-connection blast radius. The parent closes itself once its streams drain.
+ future.abort(new java.io.IOException("HTTP/2 connection draining or closed while request was queued"));
+ return;
+ }
// The parent connection may have closed concurrently with the enqueue above; if so its
// close listener already drained the queue and this future would be orphaned (it has no
// stream channel, so no channelInactive is ever delivered for it). Detect that race and
// fail it here so it never survives only to the request timeout (Issue #2160).
if (!parentChannel.isActive() && !future.isDone()) {
+ releaseHttp2Request(future);
abort(parentChannel, future,
new java.io.IOException("HTTP/2 connection closed while request was queued"));
}
@@ -549,12 +590,6 @@ private void writeHttp2Request(NettyResponseFuture future, Channel parent
openHttp2Stream(future, parentChannel, state);
}
- private static void releaseHttp2Stream(Http2ConnectionState state) {
- if (state != null) {
- state.releaseStream();
- }
- }
-
private void openHttp2Stream(NettyResponseFuture future, Channel parentChannel, Http2ConnectionState state) {
new Http2StreamChannelBootstrap(parentChannel)
.handler(new ChannelInitializer() {
@@ -562,7 +597,8 @@ private void openHttp2Stream(NettyResponseFuture future, Channel parentCh
protected void initChannel(Http2StreamChannel streamCh) {
if (config.isEnableAutomaticDecompression()) {
streamCh.pipeline().addLast("http2-decompressor",
- new Http2ContentDecompressor(config.isKeepEncodingHeader()));
+ new Http2ContentDecompressor(config.isKeepEncodingHeader(),
+ config.getHttp2MaxDecompressedResponseSize()));
}
streamCh.pipeline().addLast(channelManager.getHttp2Handler());
}
@@ -571,6 +607,40 @@ protected void initChannel(Http2StreamChannel streamCh) {
.addListener((Future f) -> {
if (f.isSuccess()) {
Http2StreamChannel streamChannel = f.getNow();
+
+ // Pin the request THIS stream is opened for so the closeFuture listener below frees
+ // exactly that buffer — even if the future is later replayed/redirected onto a
+ // freshly-built NettyRequest (re-reading future.getNettyRequest() at close time could
+ // otherwise free the new request's body before it is written). release() is idempotent.
+ final NettyRequest openedRequest = future.getNettyRequest();
+
+ // Release the acquired stream slot exactly once, when this stream channel closes —
+ // no matter HOW it closes: normal completion, abort, a non-IOException such as a
+ // DecompressionException that completes the future before finishUpdate runs, or a
+ // parent-connection drop. Binding the slot to the channel lifecycle (as
+ // NettyConnectListener does for the HTTP/1.1 connection semaphore) is the one place
+ // guaranteed to run exactly once per opened stream, so activeStreams cannot leak and
+ // wedge the connection. (The open-FAILURE branch below has no channel and releases
+ // inline.) The listener also closes a draining connection once its last stream ends.
+ streamChannel.closeFuture().addListener(closed -> {
+ // Safety net: free the request body when the stream ends, however it ends —
+ // covers an Expect/100-continue request whose server answered the final response
+ // without a 100, so the deferred body was never sent (hence never released).
+ // Idempotent with the prompt releases on the normal/abort paths.
+ if (openedRequest != null) {
+ openedRequest.release();
+ }
+ if (state != null) {
+ state.releaseStream();
+ // Close the parent once it has no active streams AND it is either draining
+ // (GOAWAY) or a redundant duplicate (#10 thundering-herd loser) — neither
+ // will serve further requests, so it must not linger open.
+ if ((state.isDraining() || state.isRedundant()) && state.getActiveStreams() <= 0) {
+ channelManager.closeChannel(parentChannel);
+ }
+ }
+ });
+
channelManager.registerOpenChannel(streamChannel);
Channels.setAttribute(streamChannel, future);
Channels.setActiveToken(streamChannel);
@@ -581,11 +651,10 @@ protected void initChannel(Http2StreamChannel streamCh) {
asyncHandler.onRequestSend(future.getNettyRequest());
} catch (Exception e) {
LOGGER.error("onRequestSend crashed", e);
- // The slot was acquired before open(); aborting here completes the
- // future, so the stream's later channelInactive won't run finishUpdate
- // (its !future.isDone() guard fails). Release the slot explicitly,
- // otherwise activeStreams leaks and eventually wedges the connection.
- releaseHttp2Stream(state);
+ // sendHttp2Frames never ran, so it never released the request body — do it
+ // here. The slot is released by the closeFuture listener above once the
+ // abort closes the stream channel.
+ releaseHttp2Request(future);
abort(streamChannel, future, e);
return;
}
@@ -598,16 +667,26 @@ protected void initChannel(Http2StreamChannel streamCh) {
scheduleReadTimeout(future);
} catch (Exception e) {
LOGGER.error("Can't write HTTP/2 request", e);
- // See above: release the slot the failed stream will never release itself.
- releaseHttp2Stream(state);
+ // If the throw happened before sendHttp2Frames released the request body,
+ // release it now (idempotent). The slot is released via the closeFuture
+ // listener above once the abort closes the stream channel.
+ releaseHttp2Request(future);
abort(streamChannel, future, e);
}
} else {
- // Stream channel was never opened — release the acquired stream slot
+ // Stream channel was never opened — no closeFuture will fire, so release the
+ // acquired slot and the unsent request body inline.
if (state != null) {
state.releaseStream();
}
- abort(parentChannel, future, f.cause());
+ releaseHttp2Request(future);
+ // Fail ONLY this future (future.abort, not abort(parentChannel, ...)): opening one stream
+ // can fail for a stream-local reason (e.g. Netty rejecting it as the outbound max-streams
+ // bookkeeping races AHC's own cap) while the parent connection is healthy with sibling
+ // streams. abort(channel, ...) would closeChannel(parentChannel) and take them all down —
+ // the multiplexed-connection blast radius. Keep it stream-scoped, like the draining/queued
+ // paths above and Http2Handler.streamFailed(close=false).
+ future.abort(f.cause());
}
});
}
@@ -628,15 +707,25 @@ private void sendHttp2Frames(NettyResponseFuture future, Http2StreamChann
NettyRequest nettyRequest = future.getNettyRequest();
HttpRequest httpRequest = nettyRequest.getHttpRequest();
Uri uri = future.getUri();
+ boolean releaseRequest = true;
try {
// Build HTTP/2 pseudo-headers + regular headers. :path reuses Uri.toRelativeUrl() (pooled
// StringBuilder) instead of re-concatenating path + "?" + query on every request.
+ // :authority must carry the effective Host — RFC 9113 §8.3.1 makes :authority authoritative and
+ // it replaces the Host header (which isHttp2ExcludedHeader strips below). Reuse the Host the
+ // request factory already computed (which honours setVirtualHost(...) and any explicit Host,
+ // NettyRequestFactory) instead of re-deriving from the URI, so a virtual host is not dropped on
+ // HTTP/2 the way it would be with a bare hostHeader(uri).
+ // RFC 9113 §8.3.1: :authority MUST NOT include the deprecated userinfo subcomponent, so strip any
+ // "user@" / "user:pass@" prefix from the effective Host before it becomes :authority.
+ CharSequence hostValue = httpRequest.headers().get(HttpHeaderNames.HOST);
+ CharSequence authority = stripUserInfo(hostValue != null ? hostValue : hostHeader(uri));
Http2Headers h2Headers = new DefaultHttp2Headers()
.method(httpRequest.method().name())
.path(uri.toRelativeUrl())
.scheme(uri.getScheme())
- .authority(hostHeader(uri));
+ .authority(authority);
// Copy the HTTP/1.1 headers, dropping connection-specific names forbidden in HTTP/2 (RFC 7540
// §8.1.2.2). iteratorCharSequence() avoids the per-name String the String-typed iterator forces;
@@ -645,52 +734,127 @@ private void sendHttp2Frames(NettyResponseFuture future, Http2StreamChann
while (it.hasNext()) {
Map.Entry entry = it.next();
CharSequence name = entry.getKey();
- if (!isHttp2ExcludedHeader(name)) {
- h2Headers.add(toLowerCaseHeaderName(name), entry.getValue());
+ CharSequence value = entry.getValue();
+ if (isHttp2ExcludedHeader(name)) {
+ continue;
}
- }
-
- // Determine if we have a body to write.
- // Support both DefaultFullHttpRequest (inline content) and NettyDirectBody (byte array/buffer bodies).
- ByteBuf bodyBuf = null;
- if (httpRequest instanceof DefaultFullHttpRequest) {
- ByteBuf content = ((DefaultFullHttpRequest) httpRequest).content();
- if (content != null && content.isReadable()) {
- bodyBuf = content;
+ // RFC 9113 §8.2.2: TE MUST NOT be sent over HTTP/2 with any value other than "trailers".
+ // Forwarding a user's "TE: gzip" verbatim makes a conformant server reset the stream
+ // (PROTOCOL_ERROR), so drop every TE value except the permitted "trailers".
+ if (HttpHeaderNames.TE.contentEqualsIgnoreCase(name)
+ && !HttpHeaderValues.TRAILERS.contentEqualsIgnoreCase(value)) {
+ continue;
}
+ h2Headers.add(toLowerCaseHeaderName(name), value);
}
+ // Determine the body to send: an in-memory buffer (DefaultFullHttpRequest content or a
+ // NettyDirectBody) or a streaming body written via NettyBody.writeHttp2. See http2BodyBuf.
NettyBody nettyBody = nettyRequest.getBody();
- if (bodyBuf == null && nettyBody != null) {
- if (nettyBody instanceof NettyDirectBody) {
- ByteBuf directBuf = ((NettyDirectBody) nettyBody).byteBuf();
- if (directBuf != null && directBuf.isReadable()) {
- bodyBuf = directBuf;
- }
- }
- }
-
- // Determine if we have a streaming body that needs writeHttp2()
+ ByteBuf bodyBuf = http2BodyBuf(httpRequest, nettyBody);
boolean hasStreamingBody = bodyBuf == null && nettyBody != null && !(nettyBody instanceof NettyDirectBody);
boolean hasBody = bodyBuf != null || hasStreamingBody;
+ if (hasBody && future.isDontWriteBodyBecauseExpectContinue()) {
+ // Expect: 100-continue (RFC 9110 §10.1.1) — send only the HEADERS frame with
+ // endStream=false and wait for the server's 100 (Continue) before sending the body. The
+ // request (and its body buffer) must survive until the resume, so don't release it here:
+ // Continue100Interceptor -> sendHttp2RequestBody() sends the body and releases. If the
+ // server answers the final response WITHOUT a 100, the stream's closeFuture safety-net in
+ // openHttp2Stream releases the never-sent body.
+ streamChannel.write(new DefaultHttp2HeadersFrame(h2Headers, false));
+ streamChannel.flush();
+ releaseRequest = false;
+ return;
+ }
+
// Write HEADERS frame (endStream=true when there is no body)
streamChannel.write(new DefaultHttp2HeadersFrame(h2Headers, !hasBody));
+ writeHttp2BodyFrames(future, streamChannel, bodyBuf, hasStreamingBody, nettyBody);
+ } finally {
+ // Release the original HTTP/1.1 request — in the HTTP/2 path it is not written to the channel,
+ // so we must release it manually to avoid leaking its content ByteBuf. Routed through the
+ // idempotent NettyRequest.release() so the early-abort paths (writeHttp2Request /
+ // openHttp2Stream / failPendingOpeners) can release it too with no risk of a double-free here.
+ // Skipped only when an Expect/100-continue request is parked waiting for its 100 (above).
+ if (releaseRequest) {
+ nettyRequest.release();
+ }
+ }
+ }
- if (hasStreamingBody) {
- streamChannel.flush();
- nettyBody.writeHttp2(streamChannel, future);
- } else if (bodyBuf != null) {
- // Write DATA frame with endStream=true — body is sent as a single frame
- streamChannel.write(new DefaultHttp2DataFrame(bodyBuf.retainedDuplicate(), true));
- streamChannel.flush();
+ /**
+ * Sends the body of an HTTP/2 request whose HEADERS were already written with {@code endStream=false}
+ * because it carried {@code Expect: 100-continue}. Invoked by {@code Continue100Interceptor} once the
+ * server's 100 (Continue) arrives: writes the body as DATA frame(s) with {@code endStream=true} and
+ * releases the request.
+ */
+ public void sendHttp2RequestBody(NettyResponseFuture> future, Http2StreamChannel streamChannel) throws IOException {
+ NettyRequest nettyRequest = future.getNettyRequest();
+ NettyBody nettyBody = nettyRequest.getBody();
+ ByteBuf bodyBuf = http2BodyBuf(nettyRequest.getHttpRequest(), nettyBody);
+ boolean hasStreamingBody = bodyBuf == null && nettyBody != null && !(nettyBody instanceof NettyDirectBody);
+ try {
+ if (bodyBuf != null || hasStreamingBody) {
+ writeHttp2BodyFrames(future, streamChannel, bodyBuf, hasStreamingBody, nettyBody);
} else {
- streamChannel.flush();
+ // No body materialised (unexpected on this path) — just end the stream.
+ streamChannel.writeAndFlush(new DefaultHttp2DataFrame(streamChannel.alloc().buffer(0), true));
}
} finally {
- // Release the original HTTP/1.1 request — in the HTTP/2 path it is not written to the channel,
- // so we must release it manually to avoid leaking its content ByteBuf.
- ReferenceCountUtil.release(httpRequest);
+ nettyRequest.release();
+ }
+ }
+
+ /**
+ * Extracts the in-memory request body buffer for the HTTP/2 path: the inline content of a
+ * {@link DefaultFullHttpRequest}, or a {@link NettyDirectBody}'s buffer. Returns {@code null} when
+ * there is no in-memory body (a streaming body, if any, is written via {@link NettyBody#writeHttp2}).
+ */
+ private static ByteBuf http2BodyBuf(HttpRequest httpRequest, NettyBody nettyBody) {
+ if (httpRequest instanceof DefaultFullHttpRequest) {
+ ByteBuf content = ((DefaultFullHttpRequest) httpRequest).content();
+ if (content != null && content.isReadable()) {
+ return content;
+ }
+ }
+ if (nettyBody instanceof NettyDirectBody) {
+ ByteBuf directBuf = ((NettyDirectBody) nettyBody).byteBuf();
+ if (directBuf != null && directBuf.isReadable()) {
+ return directBuf;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Writes the request body as HTTP/2 DATA frame(s) with {@code endStream=true} (the HEADERS frame has
+ * already been written).
+ */
+ private void writeHttp2BodyFrames(NettyResponseFuture> future, Http2StreamChannel streamChannel,
+ ByteBuf bodyBuf, boolean hasStreamingBody, NettyBody nettyBody) throws IOException {
+ if (hasStreamingBody) {
+ streamChannel.flush();
+ nettyBody.writeHttp2(streamChannel, future);
+ } else if (bodyBuf != null) {
+ // Single DATA frame; retainedDuplicate so releasing the request doesn't free the bytes
+ // mid-write (Netty releases the duplicate after the write completes).
+ streamChannel.write(new DefaultHttp2DataFrame(bodyBuf.retainedDuplicate(), true));
+ streamChannel.flush();
+ } else {
+ streamChannel.flush();
+ }
+ }
+
+ /**
+ * Releases the request body buffer for an HTTP/2 request that is being aborted before
+ * {@link #sendHttp2Frames} runs (and would otherwise release it). Idempotent — safe to call on a
+ * path that may or may not have already reached {@code sendHttp2Frames}.
+ */
+ private static void releaseHttp2Request(NettyResponseFuture> future) {
+ NettyRequest nettyRequest = future.getNettyRequest();
+ if (nettyRequest != null) {
+ nettyRequest.release();
}
}
@@ -824,11 +988,29 @@ private static void validateWebSocketRequest(Request request, AsyncHandler> as
*/
private Channel waitForHttp2Connection(Request request, ProxyServer proxy) {
Uri uri = request.getUri();
+ // WebSocket requests must never multiplex onto an HTTP/2 connection (no RFC 8441 support). See #2160.
+ if (uri.isWebSocket()) {
+ return null;
+ }
String virtualHost = request.getVirtualHost();
- long deadline = System.nanoTime() + config.getConnectTimeout().toNanos();
+ Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
+ if (h2Channel != null) {
+ return h2Channel;
+ }
+
+ // NEVER block an event-loop thread here. A redirect / 401 / 407 retry re-enters sendRequest ON the
+ // event loop, and the HTTP/2 connection we would wait for is being established on that SAME loop —
+ // a Thread.sleep would freeze the loop and can self-deadlock (the connection never finishes because
+ // its loop is parked here). On the loop, do the single non-blocking poll above and give up; the
+ // caller then proceeds as if no poolable connection was found.
+ if (isOnEventLoop()) {
+ return null;
+ }
+
+ long deadline = System.nanoTime() + config.getConnectTimeout().toNanos();
while (System.nanoTime() < deadline) {
- Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
+ h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
if (h2Channel != null) {
return h2Channel;
}
@@ -842,6 +1024,15 @@ private Channel waitForHttp2Connection(Request request, ProxyServer proxy) {
return null;
}
+ private boolean isOnEventLoop() {
+ for (EventExecutor executor : channelManager.getEventLoopGroup()) {
+ if (executor.inEventLoop()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private Channel pollPooledChannel(Request request, ProxyServer proxy, AsyncHandler> asyncHandler) {
try {
asyncHandler.onConnectionPoolAttempt();
@@ -852,12 +1043,18 @@ private Channel pollPooledChannel(Request request, ProxyServer proxy, AsyncHandl
Uri uri = request.getUri();
String virtualHost = request.getVirtualHost();
- // Check HTTP/2 connection registry first — these connections support multiplexing
- // and are not removed from the registry on poll (unlike the regular pool)
- Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
- if (h2Channel != null) {
- LOGGER.debug("Using HTTP/2 multiplexed Channel '{}' for '{}' to '{}'", h2Channel, request.getMethod(), uri);
- return h2Channel;
+ // Check HTTP/2 connection registry first — these connections support multiplexing and are not
+ // removed from the registry on poll (unlike the regular pool). WebSocket requests are excluded:
+ // AsyncHttpClient does not implement RFC 8441 (WebSocket over HTTP/2), so reusing a pooled h2
+ // connection would send the WS handshake as a plain HTTP/2 request and the WebSocket handler would
+ // receive raw frames ("Invalid message ... AdaptiveByteBuf"). Fall through to an HTTP/1.1 connection.
+ // See Issue #2160.
+ if (!uri.isWebSocket()) {
+ Channel h2Channel = channelManager.pollHttp2(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
+ if (h2Channel != null) {
+ LOGGER.debug("Using HTTP/2 multiplexed Channel '{}' for '{}' to '{}'", h2Channel, request.getMethod(), uri);
+ return h2Channel;
+ }
}
final Channel channel = channelManager.poll(uri, virtualHost, proxy, request.getChannelPoolPartitioning());
@@ -884,7 +1081,19 @@ public void replayRequest(final NettyResponseFuture> future, FilterContext fc,
return;
}
- channelManager.drainChannelAndOffer(channel, future);
+ // An HTTP/2 stream child channel is single-use and never emits LastHttpContent, so the HTTP/1.1
+ // drain-and-pool path (drainChannelAndOffer installs an OnLastHttpContentCallback) would wait for it
+ // forever — leaking the stream slot (the closeFuture listener that releases the slot never fires)
+ // and orphaning the channel until the connection wedges (Issue #2160). Instead detach the future from
+ // the old stream (so the stream's imminent channelInactive does not fail the just-replayed request —
+ // the same disconnect drainChannelAndOffer performs by swapping the channel attribute) and close it,
+ // which fires the closeFuture slot release. The replay opens a fresh stream/connection below.
+ if (channel instanceof Http2StreamChannel) {
+ Channels.setDiscard(channel);
+ channelManager.closeChannel(channel);
+ } else {
+ channelManager.drainChannelAndOffer(channel, future);
+ }
sendNextRequest(newRequest, future);
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/Http2BodyWriter.java b/client/src/main/java/org/asynchttpclient/netty/request/body/Http2BodyWriter.java
new file mode 100644
index 000000000..77ebdb8cf
--- /dev/null
+++ b/client/src/main/java/org/asynchttpclient/netty/request/body/Http2BodyWriter.java
@@ -0,0 +1,317 @@
+/*
+ * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.asynchttpclient.netty.request.body;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Streams a request body to an HTTP/2 stream child channel as a sequence of
+ * {@link DefaultHttp2DataFrame DATA frames}, honouring HTTP/2 flow control and channel writability.
+ *
+ * The previous implementation looped over the whole body up front, queuing one DATA frame per chunk
+ * in the channel's outbound buffer (heap) and flushing once at the end — O(body size) memory and an
+ * inline read of the entire source on the event loop, which could OOM on large uploads.
+ *
+ * This writer is a one-chunk-at-a-time pump driven entirely on the stream channel's event loop:
+ *
+ *
It produces and writes exactly one chunk, then flushes, so frames actually leave the process
+ * instead of accumulating.
+ *
It only produces the next chunk while {@link Http2StreamChannel#isWritable()} is {@code true}.
+ * A stream child channel becomes unwritable when the HTTP/2 flow-control window is exhausted or
+ * the local high-water mark is reached (child writes go through {@code incrementPendingOutboundBytes}).
+ * When it goes unwritable the pump parks; a transient {@link ChannelInboundHandlerAdapter} added to
+ * the stream pipeline resumes it from {@code channelWritabilityChanged}.
+ *
It reads one chunk ahead so the final DATA frame can carry {@code endStream=true};
+ * an empty body still sends a single empty DATA frame with {@code endStream=true}.
+ *
This writer holds at most one buffered "pending" chunk across a writability wait. Production stops
+ * once the channel goes unwritable, so total in-flight heap is bounded by the channel's write
+ * high-water mark (the already-written chunks the channel's outbound buffer still owns) plus that one
+ * pending chunk — not by the size of the whole body.
+ *
+ *
+ * Lifecycle / cleanup. Because the pump completes asynchronously (after {@code writeHttp2}
+ * returns), source cleanup ({@link ChunkSource#close()}) happens when the pump finishes — on success after
+ * the last write completes, or on any read/write error. {@link #finish(Throwable)} is idempotent: it closes
+ * the source, removes the transient writability handler, releases any unwritten chunk it still owns, and on
+ * error closes the stream channel. Closing the stream channel fires {@code channelInactive}, which
+ * AsyncHttpClient's {@code Http2Handler.handleChannelInactive} turns into a stream-scoped failure of the
+ * request future — matching how the rest of the HTTP/2 path signals stream errors without touching sibling
+ * multiplexed streams.
+ *
+ * Reference counting. Each chunk {@link ByteBuf} is owned by this writer from allocation
+ * until it is wrapped in a {@link DefaultHttp2DataFrame} and handed to {@link Http2StreamChannel#write},
+ * after which Netty owns and releases it (including on write failure). The only buffer held across an await
+ * is the single {@code pending} chunk, which {@link #finish(Throwable)} releases if it was never written.
+ */
+final class Http2BodyWriter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Http2BodyWriter.class);
+
+ /**
+ * Sentinel returned by {@link ChunkSource#nextChunk(ByteBufAllocator)} to mean "no chunk is available
+ * right now, but the body is not finished" (e.g. a {@code FeedableBodyGenerator} awaiting a feed). The
+ * pump parks when it sees this; the source is responsible for arranging a resume via the {@link Runnable}
+ * passed to {@link ChunkSource#onResume(Runnable)}. It is never written and never released.
+ */
+ static final ByteBuf SUSPEND = Unpooled.EMPTY_BUFFER;
+
+ /**
+ * Supplies the body one bounded chunk at a time and owns the underlying source's cleanup. Implementations
+ * read at most one chunk per {@link #nextChunk(ByteBufAllocator)} call (no read-ahead of the whole source),
+ * matching the bounded behaviour of the HTTP/1.1 {@code ChunkedInput} path.
+ */
+ interface ChunkSource {
+
+ /**
+ * Reads the next chunk of the body.
+ *
+ * @param alloc allocator to use for the returned buffer
+ * @return a readable buffer with the next chunk; {@code null} when the body is fully consumed; or
+ * {@link #SUSPEND} when no chunk is available yet but more is expected. A returned data buffer
+ * is owned by the caller. If this method throws, it must not leak a buffer.
+ * @throws IOException if the chunk could not be read
+ */
+ ByteBuf nextChunk(ByteBufAllocator alloc) throws IOException;
+
+ /**
+ * Registers a one-shot callback the source must invoke (on or off the event loop) when, after having
+ * returned {@link #SUSPEND}, a chunk becomes available again. Only sources that can return
+ * {@code SUSPEND} need to honour this; others may ignore it. The pump arranges for the callback to be
+ * re-marshalled onto the event loop.
+ *
+ * @param resume callback that resumes the pump
+ */
+ default void onResume(Runnable resume) {
+ }
+
+ /**
+ * Releases the underlying source (stream, file, body). Called exactly once when the pump finishes,
+ * whether it succeeded or failed. Must not throw.
+ */
+ void close();
+ }
+
+ private final Http2StreamChannel channel;
+ private final ChunkSource source;
+
+ // Single buffered chunk read ahead so the final DATA frame can carry endStream=true. Owned by this
+ // writer until written; released by finish() if still set when the pump ends.
+ private ByteBuf pending;
+ private boolean done;
+
+ // Transient handler that resumes the pump when the channel becomes writable again. Added lazily the
+ // first time the pump parks, removed by finish().
+ private WritabilityResumeHandler resumeHandler;
+
+ // Set while the pump is parked on a ChunkSource.SUSPEND, to ignore spurious/duplicate feed resumes.
+ private boolean suspended;
+
+ private Http2BodyWriter(Http2StreamChannel channel, ChunkSource source) {
+ this.channel = channel;
+ this.source = source;
+ source.onResume(this::resumeFromSuspend);
+ // Guarantee cleanup if the stream is closed out from under a PARKED pump — i.e. one waiting on
+ // channelWritabilityChanged (flow-control window exhausted) or on a feed (ChunkSource.SUSPEND).
+ // In those states the last write already completed, so no write-future listener will fire; an
+ // external close (request timeout, server RST_STREAM, GOAWAY) would otherwise leave finish()
+ // uncalled, leaking the body source (file descriptor / InputStream / Body) and the pending chunk.
+ // finish() is idempotent and runs on the event loop (closeFuture fires there), so this is a no-op
+ // once the pump has completed normally. cause=null: the stream is already closed, so finish() only
+ // needs to release resources — the request future is failed separately via handleChannelInactive.
+ channel.closeFuture().addListener(f -> finish(null));
+ }
+
+ /**
+ * Resumes a pump parked on {@link #SUSPEND}. Safe to call from any thread; re-marshals onto the event
+ * loop and is a no-op unless the pump is currently suspended.
+ */
+ private void resumeFromSuspend() {
+ if (channel.eventLoop().inEventLoop()) {
+ if (suspended && !done) {
+ suspended = false;
+ pump();
+ }
+ } else {
+ channel.eventLoop().execute(this::resumeFromSuspend);
+ }
+ }
+
+ /**
+ * Starts streaming {@code source} to {@code channel}. Returns immediately; the body is written
+ * asynchronously on the channel's event loop. The caller (which has already written the HEADERS frame
+ * with {@code endStream=false}) must not write further frames on this stream.
+ */
+ static void start(Http2StreamChannel channel, ChunkSource source) {
+ Http2BodyWriter writer = new Http2BodyWriter(channel, source);
+ if (channel.eventLoop().inEventLoop()) {
+ writer.pump();
+ } else {
+ channel.eventLoop().execute(writer::pump);
+ }
+ }
+
+ /**
+ * Produces and writes chunks until the channel goes unwritable (then parks for
+ * {@code channelWritabilityChanged}) or the body is exhausted. Always runs on the event loop.
+ */
+ private void pump() {
+ if (done) {
+ return;
+ }
+ try {
+ while (true) {
+ if (done) {
+ // A write-failure listener can complete the pump synchronously (an already-failed write
+ // future fires inline); stop immediately rather than reading from the now-closed source.
+ return;
+ }
+ // Read one chunk ahead of what we write so we can mark the last DATA frame endStream=true.
+ ByteBuf next = source.nextChunk(channel.alloc());
+
+ if (next == SUSPEND) {
+ // No data available yet but the body is not finished (feedable body). Park until the
+ // source signals more via the onResume callback. Any already-buffered `pending` chunk is
+ // retained (O(1)); we deliberately do not flush an early endStream.
+ suspended = true;
+ return;
+ }
+
+ if (next == null) {
+ // End of body. `pending` (if any) is the last real chunk and must carry endStream=true;
+ // it can only be null here when the body was empty (no chunk was ever read).
+ ByteBuf last = pending;
+ pending = null;
+ ByteBuf terminal = last != null ? last
+ // Empty body — preserve existing behaviour: a single empty DATA frame ends the stream.
+ : channel.alloc().buffer(0);
+ writeLastFrame(terminal);
+ channel.flush();
+ return;
+ }
+
+ // We now hold two chunks at most: the previously buffered `pending` and the freshly read
+ // `next`. Write `pending` (not last, since `next` exists) and keep `next` buffered.
+ ByteBuf toWrite = pending;
+ pending = next;
+ if (toWrite != null) {
+ writeFrame(toWrite, false);
+ channel.flush();
+
+ if (!channel.isWritable()) {
+ // Flow-control window exhausted / high-water mark reached: stop producing and resume
+ // from channelWritabilityChanged. `pending` (one chunk) is retained until then.
+ ensureResumeHandler();
+ return;
+ }
+ }
+ // First iteration (toWrite == null): nothing written yet, loop to read the second chunk.
+ }
+ } catch (Throwable t) {
+ finish(t);
+ }
+ }
+
+ private void writeFrame(ByteBuf buf, boolean endStream) {
+ // Netty takes ownership of `buf` here and releases it (including on write failure). We only react
+ // to failure to fail the request future and clean up the source.
+ ChannelFuture wf = channel.write(new DefaultHttp2DataFrame(buf, endStream));
+ wf.addListener(f -> {
+ if (!f.isSuccess()) {
+ finish(f.cause());
+ }
+ });
+ }
+
+ /**
+ * Writes the terminal DATA frame (endStream=true) and defers cleanup until that write completes, mirroring
+ * the HTTP/1.1 path's {@code WriteProgressListener.operationComplete -> closeSilently}. On success the
+ * source is closed; on failure the stream is failed via {@link #finish(Throwable)}.
+ */
+ private void writeLastFrame(ByteBuf buf) {
+ ChannelFuture wf = channel.write(new DefaultHttp2DataFrame(buf, true));
+ wf.addListener(f -> finish(f.isSuccess() ? null : f.cause()));
+ }
+
+ private void ensureResumeHandler() {
+ if (resumeHandler == null) {
+ resumeHandler = new WritabilityResumeHandler();
+ channel.pipeline().addLast(resumeHandler);
+ }
+ }
+
+ /**
+ * Completes the pump exactly once. Closes the source (this is where the body / input stream / file is
+ * released), removes the transient writability handler, releases any unwritten chunk, and on error closes
+ * the stream channel so the request future is failed via {@code handleChannelInactive}.
+ */
+ private void finish(Throwable cause) {
+ if (done) {
+ return;
+ }
+ done = true;
+
+ if (pending != null) {
+ pending.release();
+ pending = null;
+ }
+
+ if (resumeHandler != null) {
+ // removeLast/remove may be called from within the handler's own callback; Netty handles this.
+ if (channel.pipeline().context(resumeHandler) != null) {
+ channel.pipeline().remove(resumeHandler);
+ }
+ resumeHandler = null;
+ }
+
+ try {
+ source.close();
+ } catch (Throwable t) {
+ LOGGER.warn("Failed to close HTTP/2 request body source", t);
+ }
+
+ if (cause != null) {
+ LOGGER.debug("HTTP/2 request body streaming failed; closing stream", cause);
+ // Signal the failure the way the rest of the HTTP/2 path does: closing the stream child channel
+ // fires channelInactive -> Http2Handler.handleChannelInactive -> streamFailed, which aborts this
+ // stream's future without disturbing sibling multiplexed streams.
+ channel.close();
+ }
+ }
+
+ /**
+ * Resumes the parked pump when the stream channel becomes writable again.
+ */
+ private final class WritabilityResumeHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) {
+ if (!done && ctx.channel().isWritable()) {
+ pump();
+ }
+ ctx.fireChannelWritabilityChanged();
+ }
+ }
+}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java
index b4145ef07..0b6192f72 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyBodyBody.java
@@ -16,10 +16,10 @@
package org.asynchttpclient.netty.request.body;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -95,31 +95,85 @@ public void operationComplete(ChannelProgressiveFuture cf) {
@Override
public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture> future) throws IOException {
- try {
- ByteBuf buf = channel.alloc().buffer(8192);
- ByteBuf pending = null;
- while (true) {
- buf.clear();
- BodyState state = body.transferTo(buf);
- if (buf.isReadable()) {
- if (pending != null) {
- channel.write(new DefaultHttp2DataFrame(pending, false));
+ // Stream the body one bounded chunk at a time with HTTP/2 flow control / writability backpressure,
+ // so a large body does not buffer in heap or get drained inline on the event loop. Cleanup
+ // (closeSilently(body)) happens when the async pump completes — see BodyChunkSource.close.
+ BodyGenerator bg = future.getTargetRequest().getBodyGenerator();
+ FeedableBodyGenerator feedable = bg instanceof FeedableBodyGenerator ? (FeedableBodyGenerator) bg : null;
+ Http2BodyWriter.start(channel, new BodyChunkSource(body, feedable));
+ }
+
+ /**
+ * Drains a {@link Body} in {@link #CHUNK_SIZE}-bounded chunks for {@link Http2BodyWriter}. A
+ * {@link FeedableBodyGenerator} that has no data yet ({@link BodyState#SUSPEND}) parks the pump; the
+ * generator's {@link FeedListener} resumes it when more content is fed — mirroring how the HTTP/1.1 path
+ * uses {@code ChunkedWriteHandler.resumeTransfer()}.
+ */
+ private static final class BodyChunkSource implements Http2BodyWriter.ChunkSource {
+
+ private static final int CHUNK_SIZE = 8192;
+
+ private final Body body;
+ private final FeedableBodyGenerator feedable;
+
+ BodyChunkSource(Body body, FeedableBodyGenerator feedable) {
+ this.body = body;
+ this.feedable = feedable;
+ }
+
+ @Override
+ public ByteBuf nextChunk(ByteBufAllocator alloc) throws IOException {
+ ByteBuf buf = alloc.buffer(CHUNK_SIZE);
+ try {
+ while (true) {
+ buf.clear();
+ BodyState state = body.transferTo(buf);
+ if (buf.isReadable()) {
+ ByteBuf chunk = buf;
+ buf = null; // ownership transferred to caller
+ return chunk;
+ }
+ switch (state) {
+ case STOP:
+ return null;
+ case SUSPEND:
+ return Http2BodyWriter.SUSPEND;
+ case CONTINUE:
+ // No data produced this turn but the body continues; retry. Finite in-memory
+ // bodies never hit this — it only matters for generators that momentarily yield
+ // nothing without suspending.
+ break;
+ default:
+ throw new IllegalStateException("Unknown body state: " + state);
}
- pending = buf;
- buf = channel.alloc().buffer(8192);
}
- if (state == BodyState.STOP) {
- break;
+ } finally {
+ if (buf != null) {
+ buf.release();
}
}
- buf.release();
- if (pending != null) {
- channel.write(new DefaultHttp2DataFrame(pending, true));
- } else {
- channel.write(new DefaultHttp2DataFrame(channel.alloc().buffer(0), true));
+ }
+
+ @Override
+ public void onResume(Runnable resume) {
+ // Only feedable generators can return SUSPEND; wire their feed notification to resume the pump.
+ if (feedable != null) {
+ feedable.setListener(new FeedListener() {
+ @Override
+ public void onContentAdded() {
+ resume.run();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ // The pump's own read/write error handling closes the stream; nothing to do here.
+ }
+ });
}
- channel.flush();
- } finally {
+ }
+
+ @Override
+ public void close() {
closeSilently(body);
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyByteBufBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyByteBufBody.java
index d236cdade..6be21e1f6 100644
--- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyByteBufBody.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyByteBufBody.java
@@ -45,6 +45,13 @@ public CharSequence getContentTypeOverride() {
@Override
public ByteBuf byteBuf() {
- return bb;
+ // Hand out a retained duplicate, NOT the user's buffer itself. The returned buffer is wrapped into
+ // a DefaultFullHttpRequest that gets read and released when the request is written (or released
+ // on an HTTP/2 early-abort) — if that were the user's buffer, it would be consumed and freed, so a
+ // retry (redirect / auth replay / IOExceptionFilter) that rebuilds the request from the same
+ // user buffer would double-free it (IllegalReferenceCountException) and re-send an emptied body.
+ // The duplicate has independent reader/writer indices and shares the refcount, so each send owns
+ // its own reference and the user's original buffer (and its content) is preserved across retries.
+ return bb.retainedDuplicate();
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java
index 01fb5d175..66ee644fa 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyFileBody.java
@@ -16,10 +16,10 @@
package org.asynchttpclient.netty.request.body;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.stream.ChunkedNioFile;
import org.asynchttpclient.AsyncHttpClientConfig;
@@ -30,9 +30,10 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import static org.asynchttpclient.util.MiscUtils.closeSilently;
+
public class NettyFileBody implements NettyBody {
private final File file;
@@ -78,25 +79,60 @@ public void write(Channel channel, NettyResponseFuture> future) throws IOExcep
@Override
public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture> future) throws IOException {
- int chunkSize = config.getChunkedFileChunkSize();
- try (RandomAccessFile raf = new RandomAccessFile(file, "r");
- FileChannel fileChannel = raf.getChannel()) {
- long remaining = length;
- long pos = offset;
- while (remaining > 0) {
- int toRead = (int) Math.min(chunkSize, remaining);
- ByteBuf buf = channel.alloc().buffer(toRead);
+ // Stream the file region one bounded chunk at a time with HTTP/2 flow control / writability
+ // backpressure, so a large file upload does not buffer in heap or read the whole file inline on the
+ // event loop. The file is opened here so an open failure still surfaces synchronously to the caller's
+ // openHttp2Stream catch; cleanup happens when the async pump completes (see FileChunkSource.close).
+ Http2BodyWriter.start(channel, new FileChunkSource(file, offset, length, config.getChunkedFileChunkSize()));
+ }
+
+ /**
+ * Reads a file region in {@code chunkSize}-bounded chunks for {@link Http2BodyWriter}.
+ */
+ private static final class FileChunkSource implements Http2BodyWriter.ChunkSource {
+
+ private final RandomAccessFile raf;
+ private final FileChannel fileChannel;
+ private final int chunkSize;
+ private long remaining;
+ private long pos;
+
+ FileChunkSource(File file, long offset, long length, int chunkSize) throws IOException {
+ this.raf = new RandomAccessFile(file, "r");
+ this.fileChannel = raf.getChannel();
+ this.chunkSize = chunkSize;
+ this.remaining = length;
+ this.pos = offset;
+ }
+
+ @Override
+ public ByteBuf nextChunk(ByteBufAllocator alloc) throws IOException {
+ if (remaining <= 0) {
+ return null;
+ }
+ int toRead = (int) Math.min(chunkSize, remaining);
+ ByteBuf buf = alloc.buffer(toRead);
+ try {
int read = buf.writeBytes(fileChannel, pos, toRead);
if (read <= 0) {
+ // Truncated file relative to the declared length — treat as end of body.
buf.release();
- break;
+ remaining = 0;
+ return null;
}
remaining -= read;
pos += read;
- boolean last = remaining <= 0;
- channel.write(new DefaultHttp2DataFrame(buf, last));
+ return buf;
+ } catch (IOException | RuntimeException e) {
+ buf.release();
+ throw e;
}
- channel.flush();
+ }
+
+ @Override
+ public void close() {
+ closeSilently(fileChannel);
+ closeSilently(raf);
}
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java
index 61517b651..32dbdc0fe 100755
--- a/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java
+++ b/client/src/main/java/org/asynchttpclient/netty/request/body/NettyInputStreamBody.java
@@ -16,10 +16,10 @@
package org.asynchttpclient.netty.request.body;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.stream.ChunkedStream;
import org.asynchttpclient.netty.NettyResponseFuture;
@@ -91,32 +91,64 @@ public void writeHttp2(Http2StreamChannel channel, NettyResponseFuture> future
if (is.markSupported()) {
is.reset();
} else {
- LOGGER.warn("Stream has already been consumed and cannot be reset");
- return;
+ // The HEADERS frame was already written with endStream=false (sendHttp2Frames), so silently
+ // returning would leave the stream half-open with no terminating DATA frame — the request
+ // would then hang until it times out (the Issue #2160 silent-timeout class). A non-resettable
+ // InputStream cannot be replayed (retry / redirect / auth), so fail the stream explicitly:
+ // the caller (openHttp2Stream / sendHttp2RequestBody) aborts this single stream on the
+ // IOException, leaving sibling multiplexed streams untouched.
+ throw new IOException("HTTP/2 request body InputStream already consumed and cannot be reset for a retry");
}
} else {
future.setStreamConsumed(true);
}
- try {
- // Read all data into chunks, then send with last frame having endStream=true
- byte[] buffer = new byte[8192];
- ByteBuf pending = null;
+ // Stream the InputStream one bounded chunk at a time with HTTP/2 flow control / writability
+ // backpressure, so a large upload does not buffer the whole stream in heap or read it all inline on
+ // the event loop. Cleanup (closeSilently) happens when the async pump completes — see
+ // InputStreamChunkSource.close.
+ Http2BodyWriter.start(channel, new InputStreamChunkSource(is));
+ }
+
+ /**
+ * Reads an {@link InputStream} in fixed-size chunks for {@link Http2BodyWriter}.
+ */
+ private static final class InputStreamChunkSource implements Http2BodyWriter.ChunkSource {
+
+ private static final int CHUNK_SIZE = 8192;
+
+ private final InputStream is;
+ private final byte[] buffer = new byte[CHUNK_SIZE];
+
+ InputStreamChunkSource(InputStream is) {
+ this.is = is;
+ }
+
+ @Override
+ public ByteBuf nextChunk(ByteBufAllocator alloc) throws IOException {
+ // Blocking InputStream.read returns >0 (data), or -1 (EOF). A transient 0 is possible for some
+ // streams; loop on it so we never emit an empty non-final DATA frame nor end the stream early. The
+ // read blocks until data or EOF, so this does not busy-spin.
int read;
- while ((read = is.read(buffer)) != -1) {
- if (pending != null) {
- channel.write(new DefaultHttp2DataFrame(pending, false));
- }
- pending = channel.alloc().buffer(read);
- pending.writeBytes(buffer, 0, read);
+ do {
+ read = is.read(buffer);
+ } while (read == 0);
+
+ if (read == -1) {
+ return null;
}
- if (pending != null) {
- channel.write(new DefaultHttp2DataFrame(pending, true));
- } else {
- channel.write(new DefaultHttp2DataFrame(channel.alloc().buffer(0), true));
+ ByteBuf buf = alloc.buffer(read);
+ try {
+ buf.writeBytes(buffer, 0, read);
+ return buf;
+ } catch (RuntimeException e) {
+ buf.release();
+ throw e;
}
- channel.flush();
- } finally {
+ }
+
+ @Override
+ public void close() {
closeSilently(is);
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java b/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java
index da0feae8c..e5fd00e8e 100644
--- a/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java
+++ b/client/src/main/java/org/asynchttpclient/netty/ssl/DefaultSslEngineFactory.java
@@ -35,8 +35,11 @@
public class DefaultSslEngineFactory extends SslEngineFactoryBase {
private volatile SslContext sslContext;
+ // WebSocket connections use a context that advertises only http/1.1 in ALPN: AsyncHttpClient does not
+ // implement RFC 8441 (WebSocket over HTTP/2), so the server must not be able to negotiate h2 for them.
+ private volatile SslContext http1OnlySslContext;
- private SslContext buildSslContext(AsyncHttpClientConfig config) throws SSLException {
+ private SslContext buildSslContext(AsyncHttpClientConfig config, boolean http2Allowed) throws SSLException {
if (config.getSslContext() != null) {
return config.getSslContext();
}
@@ -64,12 +67,16 @@ private SslContext buildSslContext(AsyncHttpClientConfig config) throws SSLExcep
config.isDisableHttpsEndpointIdentificationAlgorithm() ? "" : "HTTPS");
if (config.isHttp2Enabled()) {
+ // For a WebSocket connection (http2Allowed=false) advertise only http/1.1, so the server cannot
+ // select h2 (which AHC cannot speak for WebSocket — no RFC 8441). Otherwise advertise h2 then http/1.1.
+ String[] protocols = http2Allowed
+ ? new String[]{ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1}
+ : new String[]{ApplicationProtocolNames.HTTP_1_1};
sslContextBuilder.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
- ApplicationProtocolNames.HTTP_2,
- ApplicationProtocolNames.HTTP_1_1));
+ protocols));
}
return configureSslContextBuilder(sslContextBuilder).build();
@@ -77,21 +84,65 @@ private SslContext buildSslContext(AsyncHttpClientConfig config) throws SSLExcep
@Override
public SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort) {
+ return newSslEngine(config, peerHost, peerPort, true);
+ }
+
+ @Override
+ public SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort, boolean http2Allowed) {
+ SslContext context = http2Allowed ? sslContext : http1OnlySslContext(config);
SSLEngine sslEngine = config.isDisableHttpsEndpointIdentificationAlgorithm() ?
- sslContext.newEngine(ByteBufAllocator.DEFAULT) :
- sslContext.newEngine(ByteBufAllocator.DEFAULT, domain(peerHost), peerPort);
+ context.newEngine(ByteBufAllocator.DEFAULT) :
+ context.newEngine(ByteBufAllocator.DEFAULT, domain(peerHost), peerPort);
configureSslEngine(sslEngine, config);
return sslEngine;
}
+ /**
+ * Returns the context for a WebSocket connection, which must advertise only http/1.1 (AsyncHttpClient does
+ * not implement RFC 8441, WebSocket over HTTP/2). Built lazily and cached on first use so a client that
+ * never opens a {@code wss://} connection never pays for a second {@link SslContext}.
+ *
+ * Only a self-built, h2-enabled context advertises h2 and therefore needs a separate http/1.1-only variant;
+ * a user-supplied context or an h2-disabled one already negotiates http/1.1, so it is reused (which also
+ * avoids double-releasing it in {@link #destroy()}). Note: a user-supplied
+ * {@link AsyncHttpClientConfig#getSslContext()} is used as-is for every connection type — if it advertises
+ * h2 in ALPN, a {@code wss://} connection may still negotiate h2, which AHC cannot speak for WebSocket. A
+ * caller needing WebSocket with a custom context must supply one that negotiates http/1.1.
+ */
+ private SslContext http1OnlySslContext(AsyncHttpClientConfig config) {
+ SslContext ctx = http1OnlySslContext;
+ if (ctx == null) {
+ synchronized (this) {
+ ctx = http1OnlySslContext;
+ if (ctx == null) {
+ if (config.getSslContext() != null || !config.isHttp2Enabled()) {
+ ctx = sslContext;
+ } else {
+ try {
+ ctx = buildSslContext(config, false);
+ } catch (SSLException e) {
+ throw new RuntimeException("Failed to build the http/1.1-only SslContext for WebSocket", e);
+ }
+ }
+ http1OnlySslContext = ctx;
+ }
+ }
+ }
+ return ctx;
+ }
+
@Override
public void init(AsyncHttpClientConfig config) throws SSLException {
- sslContext = buildSslContext(config);
+ sslContext = buildSslContext(config, true);
+ // http1OnlySslContext is built lazily on the first WebSocket connection — see http1OnlySslContext().
}
@Override
public void destroy() {
ReferenceCountUtil.release(sslContext);
+ if (http1OnlySslContext != null && http1OnlySslContext != sslContext) {
+ ReferenceCountUtil.release(http1OnlySslContext);
+ }
}
/**
diff --git a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties
index 9a9e06c99..1a41778b8 100644
--- a/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties
+++ b/client/src/main/resources/org/asynchttpclient/config/ahc-default.properties
@@ -60,5 +60,6 @@ org.asynchttpclient.http2MaxFrameSize=16384
org.asynchttpclient.http2HeaderTableSize=4096
org.asynchttpclient.http2MaxHeaderListSize=8192
org.asynchttpclient.http2MaxConcurrentStreams=-1
+org.asynchttpclient.http2MaxDecompressedResponseSize=268435456
org.asynchttpclient.http2PingInterval=PT0S
org.asynchttpclient.http2CleartextEnabled=false
diff --git a/client/src/test/java/org/asynchttpclient/ByteBufBodyReleaseOnAbortTest.java b/client/src/test/java/org/asynchttpclient/ByteBufBodyReleaseOnAbortTest.java
new file mode 100644
index 000000000..23e32ee23
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/ByteBufBodyReleaseOnAbortTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.asynchttpclient;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import org.asynchttpclient.netty.request.body.NettyByteBufBody;
+import org.junit.jupiter.api.Test;
+
+import java.net.ServerSocket;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+import static org.asynchttpclient.Dsl.config;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Regression test for the {@code setBody(ByteBuf)} request-body leak on an abort-before-write.
+ *
+ * {@link org.asynchttpclient.netty.request.body.NettyByteBufBody#byteBuf()} hands a {@code retainedDuplicate}
+ * of the caller's buffer to the {@code DefaultFullHttpRequest} (so a retry/redirect can re-send the body
+ * without double-freeing the user's buffer). On the normal HTTP/1.1 path Netty's encoder releases that
+ * duplicate after the write; but on an abort BEFORE the request is written (connection failure,
+ * {@code onRequestSend} crash, pool closed) nothing in the HTTP/1.1 path used to release it — leaking the
+ * extra reference (the user's own release would only take the shared count {@code 2 -> 1}, never freeing the
+ * orphaned duplicate). The fix releases the never-written request on the terminal/abort path, gated on
+ * whether it was handed to a channel encoder, so it can never double-free.
+ */
+public class ByteBufBodyReleaseOnAbortTest {
+
+ @Test
+ public void byteBufBodyReleasedOnConnectFailureBeforeWrite() throws Exception {
+ // A reference-counted request body owned by the caller (refCnt == 1).
+ ByteBuf body = UnpooledByteBufAllocator.DEFAULT.buffer().writeBytes("payload-bytes".getBytes());
+ assertEquals(1, body.refCnt());
+
+ // A port nothing listens on, so the request aborts at connect — before it is ever written.
+ int closedPort;
+ try (ServerSocket ss = new ServerSocket(0)) {
+ closedPort = ss.getLocalPort();
+ }
+
+ try (AsyncHttpClient client = asyncHttpClient(config()
+ .setConnectTimeout(Duration.ofSeconds(2))
+ .setMaxRequestRetry(0))) {
+ assertThrows(ExecutionException.class, () ->
+ client.preparePost("http://localhost:" + closedPort + "/")
+ .setBody(body)
+ .execute().get(10, SECONDS));
+ }
+
+ assertEquals(1, body.refCnt(),
+ "AHC must release its internal retained-duplicate of the request body on an abort-before-write, "
+ + "leaving the caller's buffer at its original refCnt (otherwise the duplicate leaks)");
+
+ // The caller still fully owns the buffer and can release it.
+ body.release();
+ assertEquals(0, body.refCnt());
+ }
+
+ /**
+ * Directly pins the ownership contract that the abort test above cannot distinguish from a pre-fix revert
+ * (both leave the caller's buffer at refCnt 1 on an abort-before-write). {@code byteBuf()} must hand out a
+ * distinct retained duplicate — not the caller's own buffer — so each send/retry owns its own
+ * reference and the caller's buffer survives. Reverting it to {@code return bb} makes this fail.
+ */
+ @Test
+ public void byteBufBodyHandsOutDistinctRetainedDuplicate() {
+ ByteBuf body = UnpooledByteBufAllocator.DEFAULT.buffer().writeBytes("payload-bytes".getBytes());
+ assertEquals(1, body.refCnt());
+
+ ByteBuf handed = new NettyByteBufBody(body).byteBuf();
+
+ assertNotSame(body, handed, "byteBuf() must hand out a duplicate, never the caller's own buffer");
+ assertEquals(2, body.refCnt(),
+ "byteBuf() must retain (shared refcount 1 -> 2) so the caller's buffer survives the send and retries");
+
+ // Releasing the duplicate (as Netty's encoder, or an abort, does) returns to the caller's single reference.
+ handed.release();
+ assertEquals(1, body.refCnt());
+
+ body.release();
+ assertEquals(0, body.refCnt());
+ }
+}
diff --git a/client/src/test/java/org/asynchttpclient/DefaultAsyncHttpClientConfigTest.java b/client/src/test/java/org/asynchttpclient/DefaultAsyncHttpClientConfigTest.java
index 1548d6812..d97a61d8a 100644
--- a/client/src/test/java/org/asynchttpclient/DefaultAsyncHttpClientConfigTest.java
+++ b/client/src/test/java/org/asynchttpclient/DefaultAsyncHttpClientConfigTest.java
@@ -2,10 +2,36 @@
import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
class DefaultAsyncHttpClientConfigTest {
+
+ @Test
+ void testHttp2MaxDecompressedResponseSize_DefaultIs256MiB() {
+ DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().build();
+ assertEquals(256L * 1024 * 1024, config.getHttp2MaxDecompressedResponseSize(),
+ "Default HTTP/2 max decompressed response size should be 256 MiB");
+ }
+
+ @Test
+ void testHttp2MaxDecompressedResponseSize_SetCustom() {
+ DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
+ .setHttp2MaxDecompressedResponseSize(123_456L)
+ .build();
+ assertEquals(123_456L, config.getHttp2MaxDecompressedResponseSize(),
+ "Builder must plumb the configured value through (the knob was previously unconfigurable)");
+ }
+
+ @Test
+ void testHttp2MaxDecompressedResponseSize_ZeroDisables() {
+ DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
+ .setHttp2MaxDecompressedResponseSize(0L)
+ .build();
+ assertEquals(0L, config.getHttp2MaxDecompressedResponseSize(), "0 must disable the limit");
+ }
+
@Test
void testStripAuthorizationOnRedirect_DefaultIsFalse() {
DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().build();
diff --git a/client/src/test/java/org/asynchttpclient/Http2ConformanceRegressionTest.java b/client/src/test/java/org/asynchttpclient/Http2ConformanceRegressionTest.java
new file mode 100644
index 000000000..01b7b856d
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/Http2ConformanceRegressionTest.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (c) 2014-2026 AsyncHttpClient Project. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.asynchttpclient;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.pkitesting.CertificateBuilder;
+import io.netty.pkitesting.X509Bundle;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import org.asynchttpclient.AsyncHandler.State;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.asynchttpclient.Dsl.asyncHttpClient;
+import static org.asynchttpclient.Dsl.config;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Protocol-conformance regression tests for the residual HTTP/2 review fixes:
+ *
+ *
#6 1xx interim responses (103 Early Hints) must NOT be delivered as the final status
+ * ({@code onStatusReceived} must fire exactly once, for the real response).
+ *
#7 {@code Expect: 100-continue} over HTTP/2 must defer the body until the server's 100 and
+ * then send it (previously the resume mis-routed to the HTTP/1.1 writer and failed).
+ *
#11 {@code TE} must be stripped unless its value is exactly {@code trailers} (RFC 9113 §8.2.2).