diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java index 25032a032..ad5e48381 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java @@ -153,21 +153,24 @@ public class ClientToProxyConnection extends ProxyConnection { if (sslEngineSource != null) { LOG.debug("Enabling encryption of traffic from client to proxy"); - encrypt(pipeline, sslEngineSource.newSslEngine(), - authenticateClients) - .addListener( - new GenericFutureListener>() { - @Override - public void operationComplete( - Future future) - throws Exception { - if (future.isSuccess()) { - clientSslSession = sslEngine - .getSession(); - recordClientSSLHandshakeSucceeded(); - } - } - }); + GenericFutureListener> futureListener = new GenericFutureListener>() { + @Override + public void operationComplete( + Future future) + throws Exception { + if (future.isSuccess()) { + clientSslSession = sslEngine + .getSession(); + recordClientSSLHandshakeSucceeded(); + } + } + }; + Future future = encrypt(pipeline, sslEngineSource.newSslEngine(), + authenticateClients) + .addListener( + futureListener); + future.addListener(futureListener); + listeners.put(future, futureListener); } this.globalTrafficShapingHandler = globalTrafficShapingHandler; diff --git a/src/main/java/org/littleshoot/proxy/impl/ConnectionFlow.java b/src/main/java/org/littleshoot/proxy/impl/ConnectionFlow.java index 5a044bb58..40d6dfacd 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ConnectionFlow.java +++ b/src/main/java/org/littleshoot/proxy/impl/ConnectionFlow.java @@ -1,9 +1,12 @@ package org.littleshoot.proxy.impl; +import io.netty.channel.Channel; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.HashMap; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -20,7 +23,8 @@ class ConnectionFlow { private volatile ConnectionFlowStep currentStep; private volatile boolean suppressInitialRequest = false; private final Object connectLock; - + private final Map, GenericFutureListener>> listeners = new HashMap(); + /** * Construct a new {@link ConnectionFlow} for the given client and server * connections. @@ -138,24 +142,29 @@ public void run() { */ @SuppressWarnings("unchecked") private void doProcessCurrentStep(final ProxyConnectionLogger LOG) { - currentStep.execute().addListener( - new GenericFutureListener>() { - public void operationComplete( - io.netty.util.concurrent.Future future) - throws Exception { - synchronized (connectLock) { - if (future.isSuccess()) { - LOG.debug("ConnectionFlowStep succeeded"); - currentStep - .onSuccess(ConnectionFlow.this); - } else { - LOG.debug("ConnectionFlowStep failed", - future.cause()); - fail(future.cause()); - } - } - }; - }); + GenericFutureListener> futureListener = new GenericFutureListener>() { + public void operationComplete( + Future future) + throws Exception { + synchronized (connectLock) { + if (future.isSuccess()) { + LOG.debug("ConnectionFlowStep succeeded"); + currentStep + .onSuccess(ConnectionFlow.this); + } else { + LOG.debug("ConnectionFlowStep failed", + future.cause()); + fail(future.cause()); + } + } + } + + ; + }; + Future stepFuture = currentStep.execute(); + stepFuture.addListener( + futureListener); + listeners.put(stepFuture, futureListener); } /** @@ -176,6 +185,7 @@ void succeed() { if (serverConnection.getInitialRequest() instanceof ReferenceCounted) { ((ReferenceCounted)serverConnection.getInitialRequest()).release(); } + removeListeners(); } } } @@ -189,47 +199,57 @@ void succeed() { void fail(final Throwable cause) { final ConnectionState lastStateBeforeFailure = serverConnection .getCurrentState(); - serverConnection.disconnect().addListener( - new GenericFutureListener() { - @Override - public void operationComplete(Future future) - throws Exception { - synchronized (connectLock) { - - boolean fallbackToAnotherChainedProxy = false; - - try { - fallbackToAnotherChainedProxy = clientConnection.serverConnectionFailed( - serverConnection, - lastStateBeforeFailure, - cause); - } finally { - // Do not release when there is fallback chained proxy - if (!fallbackToAnotherChainedProxy) { - if (serverConnection.getInitialRequest() instanceof ReferenceCounted) { - ((ReferenceCounted)serverConnection.getInitialRequest()).release(); - } - - // the connection to the server failed and we are not retrying, so transition to the - // DISCONNECTED state - serverConnection.become(ConnectionState.DISCONNECTED); - - // We are not retrying our connection, let anyone waiting for a connection know that we're done - notifyThreadsWaitingForConnection(); - } + Future disconnectFuture = serverConnection.disconnect(); + GenericFutureListener listener = new GenericFutureListener() { + @Override + public void operationComplete(Future future) + throws Exception { + synchronized (connectLock) { + + boolean fallbackToAnotherChainedProxy = false; + + try { + fallbackToAnotherChainedProxy = clientConnection.serverConnectionFailed( + serverConnection, + lastStateBeforeFailure, + cause); + } finally { + // Do not release when there is fallback chained proxy + if (!fallbackToAnotherChainedProxy) { + if (serverConnection.getInitialRequest() instanceof ReferenceCounted) { + ((ReferenceCounted) serverConnection.getInitialRequest()).release(); } + + // the connection to the server failed and we are not retrying, so transition to the + // DISCONNECTED state + serverConnection.become(ConnectionState.DISCONNECTED); + + // We are not retrying our connection, let anyone waiting for a connection know that we're done + notifyThreadsWaitingForConnection(); } } - }); + } + } + }; + disconnectFuture.addListener( + listener); + listeners.put(disconnectFuture, listener); } /** * Like {@link #fail(Throwable)} but with no cause. */ void fail() { + removeListeners(); fail(null); } + private void removeListeners() { + for (Future future : listeners.keySet()) { + future.removeListener(listeners.get(future)); + } + } + /** * Once we've finished recording our connection and written our initial * request, we can notify anyone who is waiting on the connection that it's diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java index 2f9aa64b7..d4956d360 100644 --- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java +++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java @@ -16,6 +16,8 @@ import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.util.concurrent.GlobalEventExecutor; +import java.util.HashMap; +import java.util.Map; import org.littleshoot.proxy.ActivityTracker; import org.littleshoot.proxy.GlobalStateHandler; import org.littleshoot.proxy.DefaultFailureHttpResponseComposer; @@ -165,6 +167,8 @@ public void run() { } }, "LittleProxy-JVM-shutdown-hook"); + private Map listeners = new HashMap(); + /** * Bootstrap a new {@link DefaultHttpProxyServer} starting from scratch. * @@ -474,6 +478,11 @@ protected void doStop(boolean graceful) { LOG.info("Shutting down proxy server immediately (non-graceful)"); } + try { + removeListeners(); + } catch (Exception e) { + LOG.warn("Listeners removing error", e); + } closeAllChannels(graceful); serverGroup.unregisterProxyServer(this, graceful); @@ -489,6 +498,12 @@ protected void doStop(boolean graceful) { } } + private void removeListeners() { + for (ChannelFuture channelFuture : listeners.keySet()) { + channelFuture.removeListener(listeners.get(channelFuture)); + } + } + /** * Register a new {@link Channel} with this server, for later closing. * @@ -577,16 +592,18 @@ public ServerChannel newChannel() { throw new UnknownTransportProtocolException(transportProtocol); } serverBootstrap.childHandler(initializer); + ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + if (future.isSuccess()) { + registerChannel(future.channel()); + } + } + }; ChannelFuture future = serverBootstrap.bind(requestedAddress) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - if (future.isSuccess()) { - registerChannel(future.channel()); - } - } - }).awaitUninterruptibly(); + .addListener(listener).awaitUninterruptibly(); + listeners.put(future, listener); Throwable cause = future.cause(); if (cause != null) { diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java index c4d3e9c54..857ef8fe6 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java @@ -11,6 +11,8 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; +import java.util.HashMap; +import java.util.Map; import org.littleshoot.proxy.HttpFilters; import javax.net.ssl.SSLEngine; @@ -80,6 +82,7 @@ abstract class ProxyConnection extends * If using encryption, this holds our {@link SSLEngine}. */ protected volatile SSLEngine sslEngine; + protected Map listeners = new HashMap(); /** * Construct a new ProxyConnection. @@ -465,34 +468,39 @@ Future disconnect() { return null; } else { final Promise promise = channel.newPromise(); - writeToChannel(Unpooled.EMPTY_BUFFER).addListener( - new GenericFutureListener>() { - @Override - public void operationComplete( - Future future) - throws Exception { - closeChannel(promise); - } - }); + ChannelFuture channelFuture = writeToChannel(Unpooled.EMPTY_BUFFER); + GenericFutureListener> genericFutureListener = new GenericFutureListener>() { + @Override + public void operationComplete( + Future future) + throws Exception { + closeChannel(promise); + } + }; + channelFuture.addListener(genericFutureListener); + listeners.put(channelFuture, genericFutureListener); return promise; } } private void closeChannel(final Promise promise) { - channel.close().addListener( - new GenericFutureListener>() { - public void operationComplete( - Future future) - throws Exception { - if (future - .isSuccess()) { - promise.setSuccess(null); - } else { - promise.setFailure(future - .cause()); - } - }; - }); + ChannelFuture channelFuture = channel.close(); + GenericFutureListener> futureListener = new GenericFutureListener>() { + public void operationComplete( + Future future) + throws Exception { + if (future + .isSuccess()) { + promise.setSuccess(null); + } else { + promise.setFailure(future + .cause()); + } + }; + }; + channelFuture.addListener( + futureListener); + listeners.put(channelFuture, futureListener); } /** @@ -623,6 +631,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { disconnected(); } finally { super.channelInactive(ctx); + try { + removeListeners(); + } catch (Exception e) { + LOG.warn("Listeners removing error", e); + } + } + } + + private void removeListeners() { + for (Future channelFuture : listeners.keySet()) { + channelFuture.removeListener(listeners.get(channelFuture)); } } diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java index 5b1299965..d7a2958d8 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java @@ -33,6 +33,8 @@ import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; +import java.util.HashMap; +import java.util.Map; import org.littleshoot.proxy.ActivityTracker; import org.littleshoot.proxy.ChainedProxy; import org.littleshoot.proxy.ChainedProxyAdapter; @@ -685,7 +687,7 @@ protected Future execute() { if(isMitmEnabled){ ChannelFuture future = writeToChannel(initialRequest); - future.addListener(new ChannelFutureListener() { + ChannelFutureListener listener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture arg0) throws Exception { @@ -693,7 +695,9 @@ public void operationComplete(ChannelFuture arg0) throws Exception { writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT); } } - }); + }; + future.addListener(listener); + listeners.put(future, listener); return future; } else { return writeToChannel(initialRequest);