From 26c3da568ba30bfa05bf0bb410d5263deafec206 Mon Sep 17 00:00:00 2001 From: Kostiantyn Severynov Date: Wed, 13 May 2020 14:38:52 +0300 Subject: [PATCH] Revert async message processing --- .../proxy/HttpProxyServerBootstrap.java | 9 - .../proxy/impl/ClientToProxyConnection.java | 163 +++----- .../proxy/impl/DefaultHttpProxyServer.java | 20 +- .../impl/GlobalStateWrapperEvenLoop.java | 183 --------- .../proxy/impl/HttpInitialHandler.java | 25 -- .../proxy/impl/ProxyConnection.java | 12 +- .../proxy/impl/ProxyToServerConnection.java | 71 +--- .../littleshoot/proxy/impl/ServerGroup.java | 37 +- .../proxy/impl/UpstreamConnectionHandler.java | 41 -- .../littleshoot/proxy/ServerGroupTest.java | 382 +++--------------- 10 files changed, 134 insertions(+), 809 deletions(-) delete mode 100644 src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java delete mode 100644 src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java delete mode 100644 src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java index 8764d3521..cab76a06a 100644 --- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java +++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java @@ -4,7 +4,6 @@ import org.littleshoot.proxy.ratelimit.RateLimiter; import java.net.InetSocketAddress; -import java.util.concurrent.ExecutorService; /** * Configures and starts an {@link HttpProxyServer}. The HttpProxyServer is @@ -243,14 +242,6 @@ HttpProxyServerBootstrap withProxyToServerExHandler( HttpProxyServerBootstrap withCustomGlobalState( GlobalStateHandler globalStateHandler); - /** - * - * @param messageProcessorExecutor - * @return - */ - HttpProxyServerBootstrap withMessageProcessingExecutor( - ExecutorService messageProcessorExecutor); - /** *

* Specify a {@link HttpFiltersSource} to use for filtering requests and/or diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java index 59abf0335..d492980c6 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java @@ -3,8 +3,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; @@ -22,17 +20,14 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; - import org.apache.commons.lang3.StringUtils; import org.littleshoot.proxy.ActivityTracker; import org.littleshoot.proxy.DefaultFailureHttpResponseComposer; import org.littleshoot.proxy.ExceptionHandler; import org.littleshoot.proxy.FailureHttpResponseComposer; -import org.littleshoot.proxy.GlobalStateHandler; import org.littleshoot.proxy.ratelimit.RateLimiter; import org.littleshoot.proxy.FlowContext; import org.littleshoot.proxy.FullFlowContext; @@ -49,7 +44,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; @@ -149,17 +143,15 @@ public class ClientToProxyConnection extends ProxyConnection { final DefaultHttpProxyServer proxyServer, SslEngineSource sslEngineSource, boolean authenticateClients, - GlobalTrafficShapingHandler globalTrafficShapingHandler, - Channel channel) { + ChannelPipeline pipeline, + GlobalTrafficShapingHandler globalTrafficShapingHandler) { super(AWAITING_INITIAL, proxyServer, false); - this.channel = channel; - - initChannelPipeline(channel.pipeline()); + initChannelPipeline(pipeline); if (sslEngineSource != null) { LOG.debug("Enabling encryption of traffic from client to proxy"); - encrypt(channel.pipeline(), sslEngineSource.newSslEngine(), + encrypt(pipeline, sslEngineSource.newSslEngine(), authenticateClients) .addListener( new GenericFutureListener>() { @@ -185,8 +177,7 @@ public void operationComplete( **************************************************************************/ @Override - protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) { - HttpRequest httpRequest = (HttpRequest) httpRequestObj; + protected ConnectionState readHTTPInitial(HttpRequest httpRequest) { LOG.debug("Received raw request: {}", httpRequest); // if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state @@ -204,9 +195,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http return DISCONNECT_REQUESTED; } - ctx.fireChannelRead(httpRequest); + boolean authenticationRequired = authenticationRequired(httpRequest); - return getCurrentState(); + if (authenticationRequired) { + LOG.debug("Not authenticated!!"); + return AWAITING_PROXY_AUTHENTICATION; + } else { + return doReadHTTPInitial(httpRequest); + } } /** @@ -227,11 +223,34 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http * @param httpRequest * @return */ - public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) { - if (shortCircuitResponse != null) { - LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse); + private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) { + // Make a copy of the original request + final HttpRequest currentRequest = copy(httpRequest); + + // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response + // should not be filtered), fall back to the default no-op filter source. + HttpFilters filterInstance; + try { + filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx); + } finally { + // releasing a copied http request + if (currentRequest instanceof ReferenceCounted) { + ((ReferenceCounted)currentRequest).release(); + } + } + if (filterInstance != null) { + currentFilters = filterInstance; + } else { + currentFilters = HttpFiltersAdapter.NOOP_FILTER; + } - boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse); + // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required + HttpResponse clientToProxyFilterResponse = currentFilters.clientToProxyRequest(httpRequest); + + if (clientToProxyFilterResponse != null) { + LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyFilterResponse); + + boolean keepAlive = respondWithShortCircuitResponse(clientToProxyFilterResponse); if (keepAlive) { return AWAITING_INITIAL; } else { @@ -347,93 +366,6 @@ public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse } } - @Sharable - protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - final HttpRequest httpRequest = (HttpRequest) msg; - - if (ProxyUtils.isChunked(httpRequest)) { - process(ctx, httpRequest, true); - } else { - ReferenceCountUtil.retain(httpRequest); - - proxyServer.getMessageProcessingExecutor() - .execute(() -> { - try { - wrapTask(() -> process(ctx, httpRequest, false)).run(); - } catch (Exception e) { - ctx.fireExceptionCaught(e); - } finally { - ReferenceCountUtil.release(httpRequest); - } - }); - } - } - - private void process(ChannelHandlerContext ctx, HttpRequest httpRequest, boolean chunked) { - - boolean authenticationRequired = authenticationRequired(httpRequest); - - if (authenticationRequired) { - LOG.debug("Not authenticated!!"); - become(AWAITING_PROXY_AUTHENTICATION); - } else { - // Make a copy of the original request - final HttpRequest currentRequest = copy(httpRequest); - - // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response - // should not be filtered), fall back to the default no-op filter source. - HttpFilters filterInstance; - try { - filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx); - } finally { - // releasing a copied http request - ReferenceCountUtil.release(currentRequest); - } - if (filterInstance != null) { - currentFilters = filterInstance; - } else { - currentFilters = HttpFiltersAdapter.NOOP_FILTER; - } - - // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required - final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest); - - if (chunked) { - ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse)); - } else { - ReferenceCountUtil.retain(httpRequest); - channel.eventLoop().execute(() -> { - try { - wrapTask(() -> - ctx.fireChannelRead( - new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse)) - ).run(); - } finally { - ReferenceCountUtil.release(httpRequest); - } - }); - } - - } - } - } - - Runnable wrapTask(Runnable task) { - return () -> { - final Optional globalStateHandler = - Optional.ofNullable(proxyServer.getGlobalStateHandler()); - try { - globalStateHandler.ifPresent(it -> it.restoreFromChannel(channel)); - task.run(); - } finally { - globalStateHandler.ifPresent(GlobalStateHandler::clear); - } - }; - } - /** * Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this * request is being MITM'd, this method always returns false. The format of requests to a proxy server are defined @@ -868,10 +800,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) { pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this)); } - EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this); - - pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor); - pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor); + pipeline.addLast("bytesReadMonitor", bytesReadMonitor); + pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor); pipeline.addLast("proxyProtocolReader", new ProtocolHeadersRequestDecoder()); @@ -890,8 +820,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) { aggregateContentForFiltering(pipeline, numberOfBytesToBuffer); } - pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor); - pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor); + pipeline.addLast("requestReadMonitor", requestReadMonitor); + pipeline.addLast("responseWrittenMonitor", responseWrittenMonitor); pipeline.addLast( "idle", @@ -902,10 +832,7 @@ private void initChannelPipeline(ChannelPipeline pipeline) { pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this)); } - pipeline.addLast(globalStateWrapperEvenLoop, "router", this); - pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this)); - pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor()); - pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this)); + pipeline.addLast("handler", this); } diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java index 65a9a9ce4..aaee33af8 100644 --- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java +++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java @@ -51,7 +51,6 @@ import java.util.Collection; import java.util.Properties; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -552,8 +551,8 @@ protected void initChannel(Channel ch) throws Exception { DefaultHttpProxyServer.this, sslEngineSource, authenticateSslClients, - globalTrafficShapingHandler, - ch); + ch.pipeline(), + globalTrafficShapingHandler); }; }; switch (transportProtocol) { @@ -621,10 +620,6 @@ protected GlobalStateHandler getGlobalStateHandler() { return globalStateHandler; } - protected ExecutorService getMessageProcessingExecutor() { - return serverGroup.getMessageProcessingExecutor(); - } - protected RequestTracer getRequestTracer() { return requestTracer; } @@ -675,7 +670,6 @@ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerB private ExceptionHandler proxyToServerExHandler = null; private RequestTracer requestTracer = null; private GlobalStateHandler globalStateHandler = null; - private ExecutorService messageProcessorExecutor = null; private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter(); private FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer = new DefaultFailureHttpResponseComposer(); private boolean transparent = false; @@ -901,13 +895,6 @@ public HttpProxyServerBootstrap withCustomGlobalState( return this; } - @Override - public HttpProxyServerBootstrap withMessageProcessingExecutor( - ExecutorService messageProcessorExecutor) { - this.messageProcessorExecutor = messageProcessorExecutor; - return this; - } - @Override public HttpProxyServerBootstrap withFiltersSource( HttpFiltersSource filtersSource) { @@ -1023,8 +1010,7 @@ private DefaultHttpProxyServer build() { serverGroup = this.serverGroup; } else { - serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, - clientToProxyWorkerThreads, proxyToServerWorkerThreads, messageProcessorExecutor); + serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads); } return new DefaultHttpProxyServer(serverGroup, diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java deleted file mode 100644 index f3df53ed9..000000000 --- a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java +++ /dev/null @@ -1,183 +0,0 @@ -package org.littleshoot.proxy.impl; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import io.netty.util.concurrent.EventExecutor; -import io.netty.util.concurrent.EventExecutorGroup; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.ProgressivePromise; -import io.netty.util.concurrent.Promise; -import io.netty.util.concurrent.ScheduledFuture; - -public class GlobalStateWrapperEvenLoop implements EventExecutor { - - private final ClientToProxyConnection connection; - - private final EventExecutor eventLoop; - - GlobalStateWrapperEvenLoop(ClientToProxyConnection connection) { - this.connection = connection; - this.eventLoop = connection.channel.eventLoop(); - } - - GlobalStateWrapperEvenLoop(ClientToProxyConnection connection, EventExecutor eventLoop) { - this.connection = connection; - this.eventLoop = eventLoop; - } - - @Override - public void execute(Runnable command) { - eventLoop.execute(connection.wrapTask(command)); - } - - @Override - public boolean isShuttingDown() { - return eventLoop.isShuttingDown(); - } - - @Override - public Future shutdownGracefully() { - return eventLoop.shutdownGracefully(); - } - - @Override - public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { - return eventLoop.shutdownGracefully(quietPeriod, timeout, unit); - } - - @Override - public Future terminationFuture() { - return eventLoop.terminationFuture(); - } - - @Override - public void shutdown() { - eventLoop.shutdown(); - } - - @Override - public List shutdownNow() { - return eventLoop.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return eventLoop.isShutdown(); - } - - @Override - public boolean isTerminated() { - return eventLoop.isTerminated(); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return eventLoop.awaitTermination(timeout, unit); - } - - @Override - public EventExecutor next() { - return this; - } - - @Override - public Iterator iterator() { - return eventLoop.iterator(); - } - - @Override - public Future submit(Runnable task) { - return eventLoop.submit(connection.wrapTask(task)); - } - - @Override - public List> invokeAll(Collection> tasks) throws InterruptedException { - return eventLoop.invokeAll(tasks); - } - - @Override - public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - return eventLoop.invokeAll(tasks, timeout, unit); - } - - @Override - public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - return eventLoop.invokeAny(tasks); - } - - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return eventLoop.invokeAny(tasks, timeout, unit); - } - - @Override - public Future submit(Runnable task, T result) { - return eventLoop.submit(connection.wrapTask(task), result); - } - - @Override - public Future submit(Callable task) { - return eventLoop.submit(task); - } - - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - return eventLoop.schedule(command, delay, unit); - } - - @Override - public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - return eventLoop.schedule(callable, delay, unit); - } - - @Override - public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - @Override - public EventExecutorGroup parent() { - return eventLoop.parent(); - } - - @Override - public boolean inEventLoop() { - return eventLoop.inEventLoop(); - } - - @Override - public boolean inEventLoop(Thread thread) { - return eventLoop.inEventLoop(thread); - } - - @Override - public Promise newPromise() { - return eventLoop.newPromise(); - } - - @Override - public ProgressivePromise newProgressivePromise() { - return eventLoop.newProgressivePromise(); - } - - @Override - public Future newSucceededFuture(V result) { - return eventLoop.newSucceededFuture(result); - } - - @Override - public Future newFailedFuture(Throwable cause) { - return eventLoop.newFailedFuture(cause); - } -} \ No newline at end of file diff --git a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java deleted file mode 100644 index e715bb8ee..000000000 --- a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.littleshoot.proxy.impl; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.HttpObject; - -public class HttpInitialHandler extends ChannelInboundHandlerAdapter { - - private final ProxyConnection proxyConnection; - - HttpInitialHandler(ProxyConnection proxyConnection) { - this.proxyConnection = proxyConnection; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - final ConnectionState connectionState = proxyConnection.readHTTPInitial(ctx, msg); - proxyConnection.become(connectionState); - } - - @Override - public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - proxyConnection.exceptionCaught(cause); - } -} diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java index eed893c22..1f4ee876b 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java @@ -129,10 +129,11 @@ protected void read(Object msg) { */ @SuppressWarnings("unchecked") private void readHTTP(HttpObject httpObject) { + ConnectionState nextState = getCurrentState(); switch (getCurrentState()) { case AWAITING_INITIAL: if (httpObject instanceof HttpMessage) { - ctx.fireChannelRead(httpObject); + nextState = readHTTPInitial((I) httpObject); } else { // Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL // state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit @@ -144,13 +145,13 @@ private void readHTTP(HttpObject httpObject) { case AWAITING_CHUNK: HttpContent chunk = (HttpContent) httpObject; readHTTPChunk(chunk); - become(ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL - : AWAITING_CHUNK); + nextState = ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL + : AWAITING_CHUNK; break; case AWAITING_PROXY_AUTHENTICATION: if (httpObject instanceof HttpRequest) { // Once we get an HttpRequest, try to process it as usual - ctx.fireChannelRead(httpObject); + nextState = readHTTPInitial((I) httpObject); } else { // Anything that's not an HttpRequest that came in while // we're pending authentication gets dropped on the floor. This @@ -178,6 +179,7 @@ private void readHTTP(HttpObject httpObject) { LOG.info("Ignoring message since the connection is closed or about to close"); break; } + become(nextState); } /** @@ -187,7 +189,7 @@ private void readHTTP(HttpObject httpObject) { * @param httpObject * @return */ - protected abstract ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpObject); + protected abstract ConnectionState readHTTPInitial(I httpObject); /** * Implement this to handle reading a chunk in a chunked transfer. diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java index 3cb87c38c..96e470864 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java @@ -8,8 +8,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -32,9 +30,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; -import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.littleshoot.proxy.ActivityTracker; @@ -225,8 +221,7 @@ protected void read(Object msg) { } @Override - protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpResponseObj) { - HttpResponse httpResponse = (HttpResponse) httpResponseObj; + protected ConnectionState readHTTPInitial(HttpResponse httpResponse) { LOG.debug("Received raw response: {}", httpResponse); if (httpResponse.getDecoderResult().isFailure()) { @@ -245,43 +240,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http currentFilters.serverToProxyResponseReceiving(); rememberCurrentResponse(httpResponse); + respondWith(httpResponse); - ctx.fireChannelRead(httpResponse); - - return getCurrentState(); - } - - public class RespondToClientHandler extends ChannelInboundHandlerAdapter { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - final HttpResponse httpResponse = (HttpResponse) msg; - if (ProxyUtils.isChunked(httpResponse)) { - respondWith(httpResponse); - become(AWAITING_CHUNK); - } else { - ReferenceCountUtil.retain(httpResponse); - - proxyServer.getMessageProcessingExecutor() - .execute(() -> { - try { - clientConnection.wrapTask(() -> { - respondWith(httpResponse); - currentFilters.serverToProxyResponseReceived(); - become(AWAITING_INITIAL); - }).run(); - } catch (Exception e) { - exceptionCaught(ctx, e); - } finally { - ReferenceCountUtil.release(httpResponse); - } - }); - } - } + if (ProxyUtils.isChunked(httpResponse)) { + return AWAITING_CHUNK; + } else { + currentFilters.serverToProxyResponseReceived(); - @Override - public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - serverConnection.exceptionCaught(cause); + return AWAITING_INITIAL; } } @@ -677,8 +643,8 @@ public Channel newChannel() { cb.handler(new ChannelInitializer() { protected void initChannel(Channel ch) throws Exception { - initChannelPipeline(ch.pipeline(), ch); - } + initChannelPipeline(ch.pipeline(), initialRequest); + }; }); cb.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, proxyServer.getConnectTimeout()); @@ -863,8 +829,6 @@ protected boolean connectionFailed(Throwable cause) private void resetConnectionForRetry() throws UnknownHostException { // Remove ourselves as handler on the old context this.ctx.pipeline().remove(this); - this.ctx.pipeline().remove("httpInitialHandler"); - this.ctx.pipeline().remove("respondToClientHandler"); this.ctx.close(); this.ctx = null; @@ -931,7 +895,8 @@ private void setupConnectionParameters() throws UnknownHostException { * @param pipeline * @param httpRequest */ - private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) { + private void initChannelPipeline(ChannelPipeline pipeline, + HttpRequest httpRequest) { if (proxyServer.getGlobalStateHandler() != null) { pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection)); @@ -941,10 +906,8 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) { pipeline.addLast("global-traffic-shaping", trafficHandler); } - final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop()); - - pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor); - pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor); + pipeline.addLast("bytesReadMonitor", bytesReadMonitor); + pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor); pipeline.addLast("encoder", new HttpRequestEncoder()); pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder( @@ -959,8 +922,8 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) { aggregateContentForFiltering(pipeline, numberOfBytesToBuffer); } - pipeline.addLast(globalStateWrapperEvenLoop, "responseReadMonitor", responseReadMonitor); - pipeline.addLast(globalStateWrapperEvenLoop, "requestWrittenMonitor", requestWrittenMonitor); + pipeline.addLast("responseReadMonitor", responseReadMonitor); + pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor); // Set idle timeout pipeline.addLast( @@ -972,9 +935,7 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) { pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection)); } - pipeline.addLast(globalStateWrapperEvenLoop, "router", this); - pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this)); - pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToClientHandler()); + pipeline.addLast("handler", this); } /** diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java index dbd898305..d359c7be1 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java +++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java @@ -12,8 +12,6 @@ import java.util.ArrayList; import java.util.EnumMap; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -74,8 +72,6 @@ public class ServerGroup { */ private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class); - private final ExecutorService messageProcessingExecutor; - /** * A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during * transport protocol initialization. @@ -108,19 +104,12 @@ public class ServerGroup { * @param incomingWorkerThreads number of client-to-proxy worker threads per protocol * @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol */ - public ServerGroup(String name, int incomingAcceptorThreads, - int incomingWorkerThreads, int outgoingWorkerThreads, - ExecutorService messageProcessingExecutor) { + public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) { this.name = name; this.serverGroupId = serverGroupCount.getAndIncrement(); this.incomingAcceptorThreads = incomingAcceptorThreads; this.incomingWorkerThreads = incomingWorkerThreads; this.outgoingWorkerThreads = outgoingWorkerThreads; - if (messageProcessingExecutor == null) { - this.messageProcessingExecutor = Executors.newCachedThreadPool(); - } else { - this.messageProcessingExecutor = messageProcessingExecutor; - } } /** @@ -230,8 +219,6 @@ private void shutdown(boolean graceful) { allEventLoopGroups.addAll(threadPools.getAllEventLoops()); } - shutdownAndAwaitTermination(messageProcessingExecutor); - for (EventLoopGroup group : allEventLoopGroups) { if (graceful) { group.shutdownGracefully(); @@ -255,24 +242,6 @@ private void shutdown(boolean graceful) { log.debug("Done shutting down server group"); } - private void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted - try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(60, TimeUnit.SECONDS)) - log.warn("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - /** * Retrieves the client-to-proxy acceptor thread pool for the specified protocol. Initializes the pool if it has not * yet been initialized. @@ -312,10 +281,6 @@ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol p return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool(); } - public ExecutorService getMessageProcessingExecutor() { - return messageProcessingExecutor; - } - /** * @return true if this ServerGroup has already been stopped */ diff --git a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java deleted file mode 100644 index 196256f0c..000000000 --- a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java +++ /dev/null @@ -1,41 +0,0 @@ -package org.littleshoot.proxy.impl; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; - -public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter { - - private final ClientToProxyConnection clientToProxyConnection; - - UpstreamConnectionHandler(ClientToProxyConnection clientToProxyConnection) { - this.clientToProxyConnection = clientToProxyConnection; - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object request) { - final ConnectionState connectionState = - clientToProxyConnection.setupUpstreamConnection(((Request) request).getShortCircuitResponse(), - ((Request) request).getInitialRequest()); - clientToProxyConnection.become(connectionState); - } - - @Override - public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - clientToProxyConnection.exceptionCaught(cause); - } - - public static class Request { - private final HttpRequest initialRequest; - private final HttpResponse shortCircuitResponse; - - public Request(HttpRequest initialRequest, HttpResponse shortCircuitResponse) { - this.initialRequest = initialRequest; - this.shortCircuitResponse = shortCircuitResponse; - } - - HttpRequest getInitialRequest() { return initialRequest; } - HttpResponse getShortCircuitResponse() { return shortCircuitResponse; } - } -} diff --git a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java index e236e4b67..629a47c2f 100644 --- a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java +++ b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java @@ -4,9 +4,7 @@ import io.netty.handler.codec.http.HttpRequest; import org.apache.http.HttpResponse; import org.junit.After; -import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.littleshoot.proxy.impl.DefaultHttpProxyServer; import org.littleshoot.proxy.impl.ThreadPoolConfiguration; @@ -14,280 +12,82 @@ import org.mockserver.integration.ClientAndServer; import org.mockserver.matchers.Times; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.fail; import static org.mockserver.model.HttpRequest.request; import static org.mockserver.model.HttpResponse.response; - -// set up two server responses that will execute more or less simultaneously. the first request has a small -// delay, to reduce the chance that the first request will finish entirely before the second request is finished -// (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is -// not behaving properly). - - -// save the names of the threads that execute the filter methods. filter methods are executed by the worker thread -// handling the request/response, so if there is only one worker thread, the filter methods should be executed -// by the same thread. - public class ServerGroupTest { private ClientAndServer mockServer; private int mockServerPort; - final String firstRequestPath = "/testSingleThreadFirstRequest"; - final String secondRequestPath = "/testSingleThreadSecondRequest"; - final String messageProcessingThreadName = UUID.randomUUID().toString(); - - final AtomicReference firstClientThreadName = new AtomicReference(); - final AtomicReference secondClientThreadName = new AtomicReference(); - - final AtomicReference firstProxyThreadName = new AtomicReference(); - final AtomicReference secondProxyThreadName = new AtomicReference(); + private HttpProxyServer proxyServer; @Before public void setUp() { mockServer = new ClientAndServer(0); mockServerPort = mockServer.getPort(); - - mockServer.when(request() - .withMethod("GET") - .withPath(firstRequestPath), - Times.exactly(1)) - .respond(response() - .withStatusCode(200) - .withBody("first") - .withDelay(TimeUnit.MILLISECONDS, 500) - ); - - mockServer.when(request() - .withMethod("GET") - .withPath(secondRequestPath), - Times.exactly(1)) - .respond(response() - .withStatusCode(200) - .withBody("second") - ); } @After public void tearDown() { - firstClientThreadName.set(null); - secondClientThreadName.set(null); - firstProxyThreadName.set(null); - secondProxyThreadName.set(null); - if (mockServer != null) { - mockServer.stop(); - } - } - - @Test - public void testChunkedRequest() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(2, false, - false, false, false, false, - false, false, false, true); - - final Futures futures = runTwoRequests(proxyServer); - - futures.getFirstFuture().get(); - futures.getSecondFuture().get(); - - assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); - assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); - - assertNotEquals(firstClientThreadName.get(), messageProcessingThreadName); - assertNotEquals(firstProxyThreadName.get(), messageProcessingThreadName); - } - - @Test - public void testBlockFirstRequest() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(2, true, - false, false, false, false, - false, false, false, false); - - final Futures futures = runTwoRequests(proxyServer); - - try { - futures.getSecondFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("Second request took longer than expected"); - } - - boolean firstStillExecuting = false; - try { - futures.getFirstFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - firstStillExecuting = true; - } - - Assert.assertTrue("First request must be still executing", firstStillExecuting); - - try { - futures.getFirstFuture().get(3, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("First request took longer than expected"); - } - - assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); - assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); - - assertEquals(firstClientThreadName.get(), messageProcessingThreadName); - assertEquals(firstProxyThreadName.get(), messageProcessingThreadName); - } - - @Test - public void testBlockFirstResponse() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(2, false, - false, true, false, false, - false, false, false, false); - - final Futures futures = runTwoRequests(proxyServer); - try { - futures.getSecondFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("Second request took longer than expected"); + if (mockServer != null) { + mockServer.stop(); + } + } finally { + if (proxyServer != null) { + proxyServer.abort(); + } } - - boolean firstStillExecuting = false; - try { - futures.getFirstFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - firstStillExecuting = true; - } - - Assert.assertTrue("First request must be still executing", firstStillExecuting); - - try { - futures.getFirstFuture().get(3, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("First request took longer than expected"); - } - - assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); - assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); - - assertEquals(firstClientThreadName.get(), messageProcessingThreadName); - assertEquals(firstProxyThreadName.get(), messageProcessingThreadName); - } - - @Test(expected = ExecutionException.class) - public void testExceptionFirstRequest() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(2, false, - false, false, false, true, - false, false, false, false); - - final Futures futures = runTwoRequests(proxyServer); - - futures.getFirstFuture().get(); - futures.getSecondFuture().get(); - } - - @Test(expected = ExecutionException.class) - @Ignore // for some reason the test hangs even with original logic - public void testExceptionFirstResponse() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(2, false, - false, false, false, false, - false, true, false, false); - - final Futures futures = runTwoRequests(proxyServer); - - futures.getFirstFuture().get(); - futures.getSecondFuture().get(); } @Test - public void testBlockFirstRequestSingleProcessingThread() throws ExecutionException, InterruptedException { - - final HttpProxyServer proxyServer = getProxy(1, true, - false, false, false, false, - false, false, false, false); - - final Futures futures = runTwoRequests(proxyServer); - - boolean secondStillExecuting = false; - try { - futures.getSecondFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - secondStillExecuting = true; - } - - Assert.assertTrue("Second request must be still executing", secondStillExecuting); - - boolean firstStillExecuting = false; - try { - futures.getFirstFuture().get(2, TimeUnit.SECONDS); - } catch (TimeoutException e) { - firstStillExecuting = true; - } - - Assert.assertTrue("First request must be still executing", firstStillExecuting); - - try { - futures.getFirstFuture().get(3, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("First request took longer than expected"); - } - - try { - futures.getSecondFuture().get(3, TimeUnit.SECONDS); - } catch (TimeoutException e) { - fail("Second request took longer than expected"); - } - - assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); - assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); - - assertEquals(firstClientThreadName.get(), messageProcessingThreadName); - assertEquals(firstProxyThreadName.get(), messageProcessingThreadName); - } + public void testSingleWorkerThreadPoolConfiguration() throws ExecutionException, InterruptedException { + final String firstRequestPath = "/testSingleThreadFirstRequest"; + final String secondRequestPath = "/testSingleThreadSecondRequest"; + + // set up two server responses that will execute more or less simultaneously. the first request has a small + // delay, to reduce the chance that the first request will finish entirely before the second request is finished + // (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is + // not behaving properly). + mockServer.when(request() + .withMethod("GET") + .withPath(firstRequestPath), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("first") + .withDelay(TimeUnit.MILLISECONDS, 500) + ); - private HttpProxyServer getProxy(int processingThreads, - boolean blockFirstRequest, - boolean blockSecondRequest, - boolean blockFirstResponse, - boolean blockSecondResponse, - boolean throwFirstRequestException, - boolean throwSecondRequestException, - boolean throwFirstResponseException, - boolean throwSecondResponseException, - boolean withChunks) { - return DefaultHttpProxyServer.bootstrap() + mockServer.when(request() + .withMethod("GET") + .withPath(secondRequestPath), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("second") + ); + + // save the names of the threads that execute the filter methods. filter methods are executed by the worker thread + // handling the request/response, so if there is only one worker thread, the filter methods should be executed + // by the same thread. + final AtomicReference firstClientThreadName = new AtomicReference(); + final AtomicReference secondClientThreadName = new AtomicReference(); + + final AtomicReference firstProxyThreadName = new AtomicReference(); + final AtomicReference secondProxyThreadName = new AtomicReference(); + + proxyServer = DefaultHttpProxyServer.bootstrap() .withPort(0) .withFiltersSource(new HttpFiltersSourceAdapter() { - - // required so chinks for used - @Override - public int getMaximumRequestBufferSizeInBytes() { - if (withChunks) { - return 0; - } - return 8388608 * 2; - } - - @Override - public int getMaximumResponseBufferSizeInBytes() { - if (withChunks) { - return 0; - } - return 8388608 * 2; - } - - @Override public HttpFilters filterRequest(HttpRequest originalRequest) { return new HttpFiltersAdapter(originalRequest) { @@ -295,55 +95,20 @@ public HttpFilters filterRequest(HttpRequest originalRequest) { public io.netty.handler.codec.http.HttpResponse clientToProxyRequest(HttpObject httpObject) { if (originalRequest.getUri().endsWith(firstRequestPath)) { firstClientThreadName.set(Thread.currentThread().getName()); - - if (throwFirstRequestException) { - throw new RuntimeException("first-request"); - } - - if (blockFirstRequest) { - block(); - } - } else if (originalRequest.getUri().endsWith(secondRequestPath)) { secondClientThreadName.set(Thread.currentThread().getName()); - - if (throwSecondRequestException) { - throw new RuntimeException("second-request"); - } - - if (blockSecondRequest) { - block(); - } } return super.clientToProxyRequest(httpObject); } @Override - public HttpObject serverToProxyResponse(HttpObject httpObject) { + public void serverToProxyResponseReceived() { if (originalRequest.getUri().endsWith(firstRequestPath)) { firstProxyThreadName.set(Thread.currentThread().getName()); - - if (throwFirstResponseException) { - throw new RuntimeException("first-response"); - } - - if (blockFirstResponse) { - block(); - } - } else if (originalRequest.getUri().endsWith(secondRequestPath)) { secondProxyThreadName.set(Thread.currentThread().getName()); - - if (throwSecondResponseException) { - throw new RuntimeException("second-response"); - } - - if (blockSecondResponse) { - block(); - } } - return httpObject; } }; } @@ -352,60 +117,37 @@ public HttpObject serverToProxyResponse(HttpObject httpObject) { .withAcceptorThreads(1) .withClientToProxyWorkerThreads(1) .withProxyToServerWorkerThreads(1)) - .withMessageProcessingExecutor(Executors.newFixedThreadPool(processingThreads, r -> { - final Thread thread = new Thread(r); - thread.setName(messageProcessingThreadName); - return thread; - })) .start(); - } - private void block() { - try { - Thread.sleep(4000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - private Futures runTwoRequests(HttpProxyServer proxyServer) throws InterruptedException { // execute both requests in parallel, to increase the chance of blocking due to the single-threaded ThreadPoolConfiguration - final Runnable firstRequest = () -> { - HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer); - assertEquals(200, response.getStatusLine().getStatusCode()); + Runnable firstRequest = new Runnable() { + @Override + public void run() { + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer); + assertEquals(200, response.getStatusLine().getStatusCode()); + } }; - final Runnable secondRequest = () -> { - HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer); - assertEquals(200, response.getStatusLine().getStatusCode()); + Runnable secondRequest = new Runnable () { + @Override + public void run() { + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer); + assertEquals(200, response.getStatusLine().getStatusCode()); + } }; ExecutorService executor = Executors.newFixedThreadPool(2); Future firstFuture = executor.submit(firstRequest); - Thread.sleep(500); Future secondFuture = executor.submit(secondRequest); - return new Futures(firstFuture, secondFuture); - } - - private static class Futures { - Future getFirstFuture() { - return firstFuture; - } + firstFuture.get(); + secondFuture.get(); - Future getSecondFuture() { - return secondFuture; - } - - final Future firstFuture; - final Future secondFuture; + Thread.sleep(500); - private Futures(Future firstFuture, Future secondFuture) { - this.firstFuture = firstFuture; - this.secondFuture = secondFuture; - } + assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); + assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); } - }