Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,21 +153,24 @@ public class ClientToProxyConnection extends ProxyConnection<HttpRequest> {

if (sslEngineSource != null) {
LOG.debug("Enabling encryption of traffic from client to proxy");
encrypt(pipeline, sslEngineSource.newSslEngine(),
authenticateClients)
.addListener(
new GenericFutureListener<Future<? super Channel>>() {
@Override
public void operationComplete(
Future<? super Channel> future)
throws Exception {
if (future.isSuccess()) {
clientSslSession = sslEngine
.getSession();
recordClientSSLHandshakeSucceeded();
}
}
});
GenericFutureListener<Future<Channel>> futureListener = new GenericFutureListener<Future<Channel>>() {
@Override
public void operationComplete(
Future<Channel> future)
throws Exception {
if (future.isSuccess()) {
clientSslSession = sslEngine
.getSession();
recordClientSSLHandshakeSucceeded();
}
}
};
Future<Channel> future = encrypt(pipeline, sslEngineSource.newSslEngine(),
authenticateClients)
.addListener(
futureListener);
future.addListener(futureListener);
listeners.put(future, futureListener);
}
this.globalTrafficShapingHandler = globalTrafficShapingHandler;

Expand Down
116 changes: 68 additions & 48 deletions src/main/java/org/littleshoot/proxy/impl/ConnectionFlow.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.littleshoot.proxy.impl;

import io.netty.channel.Channel;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

Expand All @@ -20,7 +23,8 @@ class ConnectionFlow {
private volatile ConnectionFlowStep currentStep;
private volatile boolean suppressInitialRequest = false;
private final Object connectLock;

private final Map<Future<Channel>, GenericFutureListener<Future<?>>> listeners = new HashMap();

/**
* Construct a new {@link ConnectionFlow} for the given client and server
* connections.
Expand Down Expand Up @@ -138,24 +142,29 @@ public void run() {
*/
@SuppressWarnings("unchecked")
private void doProcessCurrentStep(final ProxyConnectionLogger LOG) {
currentStep.execute().addListener(
new GenericFutureListener<Future<?>>() {
public void operationComplete(
io.netty.util.concurrent.Future<?> future)
throws Exception {
synchronized (connectLock) {
if (future.isSuccess()) {
LOG.debug("ConnectionFlowStep succeeded");
currentStep
.onSuccess(ConnectionFlow.this);
} else {
LOG.debug("ConnectionFlowStep failed",
future.cause());
fail(future.cause());
}
}
};
});
GenericFutureListener<Future<?>> futureListener = new GenericFutureListener<Future<?>>() {
public void operationComplete(
Future<?> future)
throws Exception {
synchronized (connectLock) {
if (future.isSuccess()) {
LOG.debug("ConnectionFlowStep succeeded");
currentStep
.onSuccess(ConnectionFlow.this);
} else {
LOG.debug("ConnectionFlowStep failed",
future.cause());
fail(future.cause());
}
}
}

;
};
Future stepFuture = currentStep.execute();
stepFuture.addListener(
futureListener);
listeners.put(stepFuture, futureListener);
}

/**
Expand All @@ -176,6 +185,7 @@ void succeed() {
if (serverConnection.getInitialRequest() instanceof ReferenceCounted) {
((ReferenceCounted)serverConnection.getInitialRequest()).release();
}
removeListeners();
}
}
}
Expand All @@ -189,47 +199,57 @@ void succeed() {
void fail(final Throwable cause) {
final ConnectionState lastStateBeforeFailure = serverConnection
.getCurrentState();
serverConnection.disconnect().addListener(
new GenericFutureListener() {
@Override
public void operationComplete(Future future)
throws Exception {
synchronized (connectLock) {

boolean fallbackToAnotherChainedProxy = false;

try {
fallbackToAnotherChainedProxy = clientConnection.serverConnectionFailed(
serverConnection,
lastStateBeforeFailure,
cause);
} finally {
// Do not release when there is fallback chained proxy
if (!fallbackToAnotherChainedProxy) {
if (serverConnection.getInitialRequest() instanceof ReferenceCounted) {
((ReferenceCounted)serverConnection.getInitialRequest()).release();
}

// the connection to the server failed and we are not retrying, so transition to the
// DISCONNECTED state
serverConnection.become(ConnectionState.DISCONNECTED);

// We are not retrying our connection, let anyone waiting for a connection know that we're done
notifyThreadsWaitingForConnection();
}
Future disconnectFuture = serverConnection.disconnect();
GenericFutureListener listener = new GenericFutureListener() {
@Override
public void operationComplete(Future future)
throws Exception {
synchronized (connectLock) {

boolean fallbackToAnotherChainedProxy = false;

try {
fallbackToAnotherChainedProxy = clientConnection.serverConnectionFailed(
serverConnection,
lastStateBeforeFailure,
cause);
} finally {
// Do not release when there is fallback chained proxy
if (!fallbackToAnotherChainedProxy) {
if (serverConnection.getInitialRequest() instanceof ReferenceCounted) {
((ReferenceCounted) serverConnection.getInitialRequest()).release();
}

// the connection to the server failed and we are not retrying, so transition to the
// DISCONNECTED state
serverConnection.become(ConnectionState.DISCONNECTED);

// We are not retrying our connection, let anyone waiting for a connection know that we're done
notifyThreadsWaitingForConnection();
}
}
});
}
}
};
disconnectFuture.addListener(
listener);
listeners.put(disconnectFuture, listener);
}

/**
* Like {@link #fail(Throwable)} but with no cause.
*/
void fail() {
removeListeners();
fail(null);
}

private void removeListeners() {
for (Future future : listeners.keySet()) {
future.removeListener(listeners.get(future));
}
}

/**
* Once we've finished recording our connection and written our initial
* request, we can notify anyone who is waiting on the connection that it's
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.HashMap;
import java.util.Map;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.GlobalStateHandler;
import org.littleshoot.proxy.DefaultFailureHttpResponseComposer;
Expand Down Expand Up @@ -165,6 +167,8 @@ public void run() {
}
}, "LittleProxy-JVM-shutdown-hook");

private Map<ChannelFuture, ChannelFutureListener> listeners = new HashMap();

/**
* Bootstrap a new {@link DefaultHttpProxyServer} starting from scratch.
*
Expand Down Expand Up @@ -474,6 +478,11 @@ protected void doStop(boolean graceful) {
LOG.info("Shutting down proxy server immediately (non-graceful)");
}

try {
removeListeners();
} catch (Exception e) {
LOG.warn("Listeners removing error", e);
}
closeAllChannels(graceful);

serverGroup.unregisterProxyServer(this, graceful);
Expand All @@ -489,6 +498,12 @@ protected void doStop(boolean graceful) {
}
}

private void removeListeners() {
for (ChannelFuture channelFuture : listeners.keySet()) {
channelFuture.removeListener(listeners.get(channelFuture));
}
}

/**
* Register a new {@link Channel} with this server, for later closing.
*
Expand Down Expand Up @@ -577,16 +592,18 @@ public ServerChannel newChannel() {
throw new UnknownTransportProtocolException(transportProtocol);
}
serverBootstrap.childHandler(initializer);
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
registerChannel(future.channel());
}
}
};
ChannelFuture future = serverBootstrap.bind(requestedAddress)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
registerChannel(future.channel());
}
}
}).awaitUninterruptibly();
.addListener(listener).awaitUninterruptibly();
listeners.put(future, listener);

Throwable cause = future.cause();
if (cause != null) {
Expand Down
65 changes: 42 additions & 23 deletions src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.util.HashMap;
import java.util.Map;
import org.littleshoot.proxy.HttpFilters;

import javax.net.ssl.SSLEngine;
Expand Down Expand Up @@ -80,6 +82,7 @@ abstract class ProxyConnection<I extends HttpObject> extends
* If using encryption, this holds our {@link SSLEngine}.
*/
protected volatile SSLEngine sslEngine;
protected Map<Future, GenericFutureListener> listeners = new HashMap();

/**
* Construct a new ProxyConnection.
Expand Down Expand Up @@ -465,34 +468,39 @@ Future<Void> disconnect() {
return null;
} else {
final Promise<Void> promise = channel.newPromise();
writeToChannel(Unpooled.EMPTY_BUFFER).addListener(
new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(
Future<? super Void> future)
throws Exception {
closeChannel(promise);
}
});
ChannelFuture channelFuture = writeToChannel(Unpooled.EMPTY_BUFFER);
GenericFutureListener<Future<? super Void>> genericFutureListener = new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(
Future<? super Void> future)
throws Exception {
closeChannel(promise);
}
};
channelFuture.addListener(genericFutureListener);
listeners.put(channelFuture, genericFutureListener);
return promise;
}
}

private void closeChannel(final Promise<Void> promise) {
channel.close().addListener(
new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(
Future<? super Void> future)
throws Exception {
if (future
.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(future
.cause());
}
};
});
ChannelFuture channelFuture = channel.close();
GenericFutureListener<Future<? super Void>> futureListener = new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(
Future<? super Void> future)
throws Exception {
if (future
.isSuccess()) {
promise.setSuccess(null);
} else {
promise.setFailure(future
.cause());
}
};
};
channelFuture.addListener(
futureListener);
listeners.put(channelFuture, futureListener);
}

/**
Expand Down Expand Up @@ -623,6 +631,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
disconnected();
} finally {
super.channelInactive(ctx);
try {
removeListeners();
} catch (Exception e) {
LOG.warn("Listeners removing error", e);
}
}
}

private void removeListeners() {
for (Future channelFuture : listeners.keySet()) {
channelFuture.removeListener(listeners.get(channelFuture));
}
}

Expand Down
Loading