Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,16 @@ default int getHttp2MaxConcurrentStreams() {
return -1;
}

/**
* @return the maximum number of bytes a single HTTP/2 response body may decompress to before the stream
* is failed. Guards against decompression-bomb responses — a tiny compressed body that inflates
* to gigabytes and can OOM the client (the limit is enforced transparently when automatic
* decompression is enabled). {@code 0} disables the limit. Defaults to 256 MiB.
*/
default long getHttp2MaxDecompressedResponseSize() {
return 256L * 1024 * 1024;
}

/**
* @return the interval between HTTP/2 PING keepalive frames, {@link Duration#ZERO} disables pinging
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2HeaderTableSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2InitialWindowSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxConcurrentStreams;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxDecompressedResponseSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxFrameSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2MaxHeaderListSize;
import static org.asynchttpclient.config.AsyncHttpClientConfigDefaults.defaultHttp2PingInterval;
Expand Down Expand Up @@ -181,6 +182,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final int http2HeaderTableSize;
private final int http2MaxHeaderListSize;
private final int http2MaxConcurrentStreams;
private final long http2MaxDecompressedResponseSize;
private final Duration http2PingInterval;
private final boolean http2CleartextEnabled;

Expand Down Expand Up @@ -277,6 +279,7 @@ private DefaultAsyncHttpClientConfig(// http
int http2HeaderTableSize,
int http2MaxHeaderListSize,
int http2MaxConcurrentStreams,
long http2MaxDecompressedResponseSize,
Duration http2PingInterval,
boolean http2CleartextEnabled,

Expand Down Expand Up @@ -381,6 +384,7 @@ private DefaultAsyncHttpClientConfig(// http
this.http2HeaderTableSize = http2HeaderTableSize;
this.http2MaxHeaderListSize = http2MaxHeaderListSize;
this.http2MaxConcurrentStreams = http2MaxConcurrentStreams;
this.http2MaxDecompressedResponseSize = http2MaxDecompressedResponseSize;
this.http2PingInterval = http2PingInterval;
this.http2CleartextEnabled = http2CleartextEnabled;

Expand Down Expand Up @@ -682,6 +686,11 @@ public int getHttp2MaxConcurrentStreams() {
return http2MaxConcurrentStreams;
}

@Override
public long getHttp2MaxDecompressedResponseSize() {
return http2MaxDecompressedResponseSize;
}

@Override
public Duration getHttp2PingInterval() {
return http2PingInterval;
Expand Down Expand Up @@ -942,6 +951,7 @@ public static class Builder {
private int http2HeaderTableSize = defaultHttp2HeaderTableSize();
private int http2MaxHeaderListSize = defaultHttp2MaxHeaderListSize();
private int http2MaxConcurrentStreams = defaultHttp2MaxConcurrentStreams();
private long http2MaxDecompressedResponseSize = defaultHttp2MaxDecompressedResponseSize();
private Duration http2PingInterval = defaultHttp2PingInterval();
private boolean http2CleartextEnabled = defaultHttp2CleartextEnabled();

Expand Down Expand Up @@ -1043,6 +1053,7 @@ public Builder(AsyncHttpClientConfig config) {
http2HeaderTableSize = config.getHttp2HeaderTableSize();
http2MaxHeaderListSize = config.getHttp2MaxHeaderListSize();
http2MaxConcurrentStreams = config.getHttp2MaxConcurrentStreams();
http2MaxDecompressedResponseSize = config.getHttp2MaxDecompressedResponseSize();
http2PingInterval = config.getHttp2PingInterval();
http2CleartextEnabled = config.isHttp2CleartextEnabled();

Expand Down Expand Up @@ -1391,6 +1402,11 @@ public Builder setHttp2MaxConcurrentStreams(int http2MaxConcurrentStreams) {
return this;
}

public Builder setHttp2MaxDecompressedResponseSize(long http2MaxDecompressedResponseSize) {
this.http2MaxDecompressedResponseSize = http2MaxDecompressedResponseSize;
return this;
}

public Builder setHttp2PingInterval(Duration http2PingInterval) {
this.http2PingInterval = http2PingInterval;
return this;
Expand Down Expand Up @@ -1658,6 +1674,7 @@ public DefaultAsyncHttpClientConfig build() {
http2HeaderTableSize,
http2MaxHeaderListSize,
http2MaxConcurrentStreams,
http2MaxDecompressedResponseSize,
http2PingInterval,
http2CleartextEnabled,
requestFilters.isEmpty() ? Collections.emptyList() : Collections.unmodifiableList(requestFilters),
Expand Down
12 changes: 12 additions & 0 deletions client/src/main/java/org/asynchttpclient/RequestBuilderBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,18 @@ public T setBody(ByteBuffer data) {
return asDerivedType();
}

/**
* Sets the request body from a Netty {@link ByteBuf}.
* <p>
* <strong>Ownership:</strong> the caller retains ownership of {@code data}. AsyncHttpClient sends a
* retained duplicate per attempt (so redirects, auth replays and retries each get their own reference and
* the body survives across them) and never releases {@code data} itself. The caller is responsible for
* releasing {@code data} once the request has completed. (This differs from older releases, which consumed
* and released the buffer on the first send — and double-freed it on any retry.)
*
* @param data the request body; the caller keeps ownership and must release it after the request completes
* @return this builder
*/
public T setBody(ByteBuf data) {
resetBody();
byteBufData = data;
Expand Down
20 changes: 20 additions & 0 deletions client/src/main/java/org/asynchttpclient/SslEngineFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ public interface SslEngineFactory {
*/
SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort);

/**
* Creates a new {@link SSLEngine}, optionally permitting HTTP/2 (ALPN {@code h2}) negotiation.
* <p>
* WebSocket connections pass {@code http2Allowed = false}: AsyncHttpClient does not implement RFC 8441
* (WebSocket over HTTP/2), so a WebSocket connection must not negotiate {@code h2} — otherwise the
* handshake is written as a plain HTTP/2 request and corrupts the connection. The default implementation
* ignores the flag and delegates to {@link #newSslEngine(AsyncHttpClientConfig, String, int)} for
* backwards compatibility; {@code DefaultSslEngineFactory} overrides it to advertise only {@code http/1.1}
* in ALPN when {@code http2Allowed} is {@code false}.
*
* @param config the client config
* @param peerHost the peer hostname
* @param peerPort the peer port
* @param http2Allowed whether HTTP/2 (ALPN {@code h2}) may be negotiated on this connection
* @return new engine
*/
default SSLEngine newSslEngine(AsyncHttpClientConfig config, String peerHost, int peerPort, boolean http2Allowed) {
return newSslEngine(config, peerHost, peerPort);
}

/**
* Perform any necessary one-time configuration. This will be called just once before {@code newSslEngine} is called
* for the first time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public final class AsyncHttpClientConfigDefaults {
public static final String HTTP2_HEADER_TABLE_SIZE_CONFIG = "http2HeaderTableSize";
public static final String HTTP2_MAX_HEADER_LIST_SIZE_CONFIG = "http2MaxHeaderListSize";
public static final String HTTP2_MAX_CONCURRENT_STREAMS_CONFIG = "http2MaxConcurrentStreams";
public static final String HTTP2_MAX_DECOMPRESSED_RESPONSE_SIZE_CONFIG = "http2MaxDecompressedResponseSize";
public static final String HTTP2_PING_INTERVAL_CONFIG = "http2PingInterval";
public static final String HTTP2_CLEARTEXT_ENABLED_CONFIG = "http2CleartextEnabled";

Expand Down Expand Up @@ -360,6 +361,11 @@ public static int defaultHttp2MaxConcurrentStreams() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_CONCURRENT_STREAMS_CONFIG);
}

public static long defaultHttp2MaxDecompressedResponseSize() {
// getInt suffices for the 256 MiB default; values above Integer.MAX_VALUE are set via the builder.
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_MAX_DECOMPRESSED_RESPONSE_SIZE_CONFIG);
}

public static Duration defaultHttp2PingInterval() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getDuration(ASYNC_CLIENT_CONFIG_ROOT + HTTP2_PING_INTERVAL_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public boolean cancel(boolean force) {
return false;
}

releaseRequestIfNotHandedToChannel();

final Channel ch = channel; //atomic read, so that it won't end up in TOCTOU
if (ch != null) {
Channels.setDiscard(ch);
Expand Down Expand Up @@ -256,7 +258,29 @@ private boolean terminateAndExit() {
cancelTimeouts();
channel = null;
reuseChannel = false;
return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
boolean alreadyTerminated = IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
if (!alreadyTerminated) {
releaseRequestIfNotHandedToChannel();
}
return alreadyTerminated;
}

/**
* Frees the request body buffer when the request was never handed to a channel encoder. On the HTTP/1.1
* success path Netty's encoder releases {@code httpRequest} after the write; but on an abort/cancel BEFORE
* the write (connect failure, onRequestSend crash, pool closed, cancellation) — or on replacement during a
* redirect/retry — nothing else would, leaking a {@code setBody(ByteBuf)} retained duplicate. In the
* HTTP/2 path {@code httpRequest} is never written to a channel, so AHC always owns its release.
* {@link NettyRequest#release()} is idempotent (CAS), so this never double-frees the encoder or the
* explicit HTTP/2 releases.
*/
private void releaseRequestIfNotHandedToChannel() {
NettyRequest request = nettyRequest;
if (request != null) {
// release() atomically no-ops if the request was already handed to the channel encoder (which then
// owns the release), so there is no check-then-act race with the concurrent event-loop write.
request.release();
}
}

@Override
Expand Down Expand Up @@ -353,6 +377,13 @@ public NettyRequest getNettyRequest() {
}

public void setNettyRequest(NettyRequest nettyRequest) {
// On a redirect/auth/retry the request is rebuilt; release the previous one if it was never written,
// so its body buffer is not leaked. The replaced request is normally already written (handed to the
// channel) or already released, in which case this is a no-op (release() is idempotent).
NettyRequest previous = this.nettyRequest;
if (previous != null && previous != nettyRequest && !previous.isHandedToChannel()) {
previous.release();
}
this.nettyRequest = nettyRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,20 +360,44 @@ public void registerHttp2Connection(Object partitionKey, Channel channel) {
if (state != null) {
state.setPartitionKey(partitionKey);
}
http2Connections.put(partitionKey, channel);
// Coalesce connections opened concurrently for the same partition (thundering herd): keep the
// first live one canonical in the registry. A connection that lost the race is marked redundant,
// so it serves only its own opening request and then closes (its stream's closeFuture listener
// closes the parent once activeStreams hits 0) instead of lingering open and unregistered. A dead
// registered entry is replaced.
Channel existing = http2Connections.putIfAbsent(partitionKey, channel);
if (existing != null && existing != channel) {
boolean replacedDead = !existing.isActive() && http2Connections.replace(partitionKey, existing, channel);
if (!replacedDead && state != null) {
state.markRedundant();
}
}
// When the connection closes, remove it from the registry AND fail any requests still queued
// for a stream slot. Without the latter, requests sitting in pendingOpeners when the parent
// connection drops have no stream channel (so no channelInactive is ever delivered for them)
// and would survive only until the request timeout fires — the silent-timeout bug of #2160.
channel.closeFuture().addListener(future -> {
removeHttp2Connection(partitionKey, channel);
if (state != null) {
state.failPendingOpeners(orphan ->
orphan.abort(new IOException("HTTP/2 connection closed before a stream could be opened")));
state.failPendingOpeners(orphan -> failOrphanedH2Opener(orphan,
"HTTP/2 connection closed before a stream could be opened"));
}
});
}

/**
* Fails a request that was queued in {@link Http2ConnectionState} waiting for a stream slot but can
* never get one (the connection dropped or started draining). Releases its request body first —
* {@code sendHttp2Frames} never ran for it, so nothing else will — to avoid leaking the body ByteBuf.
* {@code NettyRequest.release()} is idempotent, so this is safe even if another path also releases.
*/
private static void failOrphanedH2Opener(NettyResponseFuture<?> orphan, String message) {
if (orphan.getNettyRequest() != null) {
orphan.getNettyRequest().release();
}
orphan.abort(new IOException(message));
}

/**
* Removes an HTTP/2 connection from the registry, but only if it's the currently registered
* connection for that partition key (avoids removing a replacement connection).
Expand Down Expand Up @@ -466,8 +490,8 @@ private HttpClientCodec newHttpClientCodec() {
config.getHttpClientCodecInitialBufferSize());
}

private SslHandler createSslHandler(String peerHost, int peerPort) {
SSLEngine sslEngine = sslEngineFactory.newSslEngine(config, peerHost, peerPort);
private SslHandler createSslHandler(String peerHost, int peerPort, boolean http2Allowed) {
SSLEngine sslEngine = sslEngineFactory.newSslEngine(config, peerHost, peerPort, http2Allowed);
SslHandler sslHandler = new SslHandler(sslEngine);
if (handshakeTimeout > 0) {
sslHandler.setHandshakeTimeoutMillis(handshakeTimeout);
Expand All @@ -489,7 +513,7 @@ public Future<Channel> updatePipelineForHttpTunneling(ChannelPipeline pipeline,
// Remove existing SSL handler (for proxy) and replace with SSL handler for target
pipeline.remove(SSL_HANDLER);
}
SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort());
SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort(), !requestUri.isWebSocket());
whenHandshaked = sslHandler.handshakeFuture();
pipeline.addBefore(INFLATER_HANDLER, SSL_HANDLER, sslHandler);
pipeline.addAfter(SSL_HANDLER, HTTP_CLIENT_CODEC, newHttpClientCodec());
Expand Down Expand Up @@ -527,9 +551,9 @@ public Future<Channel> updatePipelineForHttpsTunneling(ChannelPipeline pipeline,
// The proxy SSL handler should remain as it provides the tunnel transport
// We need to add target SSL handler that will negotiate with the target through the tunnel

SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort());
SslHandler sslHandler = createSslHandler(requestUri.getHost(), requestUri.getExplicitPort(), !requestUri.isWebSocket());
whenHandshaked = sslHandler.handshakeFuture();

// For HTTPS proxy tunnel, add target SSL handler after the existing proxy SSL handler
// This creates a nested SSL setup: Target SSL -> Proxy SSL -> Network
if (isSslHandlerConfigured(pipeline)) {
Expand Down Expand Up @@ -580,7 +604,8 @@ public SslHandler addSslHandler(ChannelPipeline pipeline, Uri uri, String virtua
peerPort = uri.getExplicitPort();
}

SslHandler sslHandler = createSslHandler(peerHost, peerPort);
// A WebSocket connection must not negotiate h2 (no RFC 8441 support), so advertise only http/1.1 in ALPN.
SslHandler sslHandler = createSslHandler(peerHost, peerPort, !uri.isWebSocket());
// Check if SOCKS handler actually exists in the pipeline before trying to add after it
if (hasSocksProxyHandler && pipeline.get(SOCKS_HANDLER) != null) {
pipeline.addAfter(SOCKS_HANDLER, SSL_HANDLER, sslHandler);
Expand Down Expand Up @@ -710,7 +735,11 @@ public void upgradePipelineToHttp2(ChannelPipeline pipeline) {
.initialWindowSize(config.getHttp2InitialWindowSize())
.maxFrameSize(config.getHttp2MaxFrameSize())
.headerTableSize(config.getHttp2HeaderTableSize())
.maxHeaderListSize(config.getHttp2MaxHeaderListSize());
.maxHeaderListSize(config.getHttp2MaxHeaderListSize())
// RFC 9113 §8.4: AsyncHttpClient never consumes server push, so advertise ENABLE_PUSH=0.
// A conformant server then never opens push streams; without this the client relies on
// Netty's default and a pushing server could trip a connection-level PROTOCOL_ERROR.
.pushEnabled(false);

Http2FrameCodec frameCodec = Http2FrameCodecBuilder.forClient()
.initialSettings(settings)
Expand All @@ -734,7 +763,9 @@ protected void initChannel(Channel ch) {
Http2ConnectionState state = new Http2ConnectionState();
int configMaxStreams = config.getHttp2MaxConcurrentStreams();
if (configMaxStreams > 0) {
state.updateMaxConcurrentStreams(configMaxStreams);
// Client's own cap; the server-advertised value (applied by the http2-settings-listener below)
// can only lower the effective limit, never raise it above this.
state.setClientMaxConcurrentStreams(configMaxStreams);
}
pipeline.channel().attr(Http2ConnectionState.HTTP2_STATE_KEY).set(state);

Expand Down Expand Up @@ -772,6 +803,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
if (pk != null) {
removeHttp2Connection(pk, ctx.channel());
}
// Fail requests still queued for a stream slot: a draining connection accepts no
// new streams, so they can never be opened here and would otherwise wait until the
// connection finally closes. Fail them now so they retry on a fresh connection (the
// registry no longer offers this one). Already-open streams below lastStreamId are
// untouched and complete normally. #12
connState.failPendingOpeners(orphan -> failOrphanedH2Opener(orphan,
"HTTP/2 connection received GOAWAY; request must retry on a new connection"));
}
LOGGER.debug("HTTP/2 GOAWAY received on {}, lastStreamId={}, errorCode={}",
ctx.channel(), lastStreamId, goAwayFrame.errorCode());
Expand Down
Loading