diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
index 8764d3521..cab76a06a 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
@@ -4,7 +4,6 @@
import org.littleshoot.proxy.ratelimit.RateLimiter;
import java.net.InetSocketAddress;
-import java.util.concurrent.ExecutorService;
/**
* Configures and starts an {@link HttpProxyServer}. The HttpProxyServer is
@@ -243,14 +242,6 @@ HttpProxyServerBootstrap withProxyToServerExHandler(
HttpProxyServerBootstrap withCustomGlobalState(
GlobalStateHandler globalStateHandler);
- /**
- *
- * @param messageProcessorExecutor
- * @return
- */
- HttpProxyServerBootstrap withMessageProcessingExecutor(
- ExecutorService messageProcessorExecutor);
-
/**
*
* Specify a {@link HttpFiltersSource} to use for filtering requests and/or
diff --git a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
index 59abf0335..d492980c6 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
@@ -3,8 +3,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
@@ -22,17 +20,14 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
-import io.netty.util.ReferenceCountUtil;
-import io.netty.util.concurrent.EventExecutorGroup;
+import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-
import org.apache.commons.lang3.StringUtils;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.DefaultFailureHttpResponseComposer;
import org.littleshoot.proxy.ExceptionHandler;
import org.littleshoot.proxy.FailureHttpResponseComposer;
-import org.littleshoot.proxy.GlobalStateHandler;
import org.littleshoot.proxy.ratelimit.RateLimiter;
import org.littleshoot.proxy.FlowContext;
import org.littleshoot.proxy.FullFlowContext;
@@ -49,7 +44,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
@@ -149,17 +143,15 @@ public class ClientToProxyConnection extends ProxyConnection {
final DefaultHttpProxyServer proxyServer,
SslEngineSource sslEngineSource,
boolean authenticateClients,
- GlobalTrafficShapingHandler globalTrafficShapingHandler,
- Channel channel) {
+ ChannelPipeline pipeline,
+ GlobalTrafficShapingHandler globalTrafficShapingHandler) {
super(AWAITING_INITIAL, proxyServer, false);
- this.channel = channel;
-
- initChannelPipeline(channel.pipeline());
+ initChannelPipeline(pipeline);
if (sslEngineSource != null) {
LOG.debug("Enabling encryption of traffic from client to proxy");
- encrypt(channel.pipeline(), sslEngineSource.newSslEngine(),
+ encrypt(pipeline, sslEngineSource.newSslEngine(),
authenticateClients)
.addListener(
new GenericFutureListener>() {
@@ -185,8 +177,7 @@ public void operationComplete(
**************************************************************************/
@Override
- protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) {
- HttpRequest httpRequest = (HttpRequest) httpRequestObj;
+ protected ConnectionState readHTTPInitial(HttpRequest httpRequest) {
LOG.debug("Received raw request: {}", httpRequest);
// if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state
@@ -204,9 +195,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
return DISCONNECT_REQUESTED;
}
- ctx.fireChannelRead(httpRequest);
+ boolean authenticationRequired = authenticationRequired(httpRequest);
- return getCurrentState();
+ if (authenticationRequired) {
+ LOG.debug("Not authenticated!!");
+ return AWAITING_PROXY_AUTHENTICATION;
+ } else {
+ return doReadHTTPInitial(httpRequest);
+ }
}
/**
@@ -227,11 +223,34 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
* @param httpRequest
* @return
*/
- public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) {
- if (shortCircuitResponse != null) {
- LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse);
+ private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
+ // Make a copy of the original request
+ final HttpRequest currentRequest = copy(httpRequest);
+
+ // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
+ // should not be filtered), fall back to the default no-op filter source.
+ HttpFilters filterInstance;
+ try {
+ filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
+ } finally {
+ // releasing a copied http request
+ if (currentRequest instanceof ReferenceCounted) {
+ ((ReferenceCounted)currentRequest).release();
+ }
+ }
+ if (filterInstance != null) {
+ currentFilters = filterInstance;
+ } else {
+ currentFilters = HttpFiltersAdapter.NOOP_FILTER;
+ }
- boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse);
+ // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
+ HttpResponse clientToProxyFilterResponse = currentFilters.clientToProxyRequest(httpRequest);
+
+ if (clientToProxyFilterResponse != null) {
+ LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyFilterResponse);
+
+ boolean keepAlive = respondWithShortCircuitResponse(clientToProxyFilterResponse);
if (keepAlive) {
return AWAITING_INITIAL;
} else {
@@ -347,93 +366,6 @@ public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse
}
}
- @Sharable
- protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final HttpRequest httpRequest = (HttpRequest) msg;
-
- if (ProxyUtils.isChunked(httpRequest)) {
- process(ctx, httpRequest, true);
- } else {
- ReferenceCountUtil.retain(httpRequest);
-
- proxyServer.getMessageProcessingExecutor()
- .execute(() -> {
- try {
- wrapTask(() -> process(ctx, httpRequest, false)).run();
- } catch (Exception e) {
- ctx.fireExceptionCaught(e);
- } finally {
- ReferenceCountUtil.release(httpRequest);
- }
- });
- }
- }
-
- private void process(ChannelHandlerContext ctx, HttpRequest httpRequest, boolean chunked) {
-
- boolean authenticationRequired = authenticationRequired(httpRequest);
-
- if (authenticationRequired) {
- LOG.debug("Not authenticated!!");
- become(AWAITING_PROXY_AUTHENTICATION);
- } else {
- // Make a copy of the original request
- final HttpRequest currentRequest = copy(httpRequest);
-
- // Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
- // should not be filtered), fall back to the default no-op filter source.
- HttpFilters filterInstance;
- try {
- filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
- } finally {
- // releasing a copied http request
- ReferenceCountUtil.release(currentRequest);
- }
- if (filterInstance != null) {
- currentFilters = filterInstance;
- } else {
- currentFilters = HttpFiltersAdapter.NOOP_FILTER;
- }
-
- // Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
- final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);
-
- if (chunked) {
- ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
- } else {
- ReferenceCountUtil.retain(httpRequest);
- channel.eventLoop().execute(() -> {
- try {
- wrapTask(() ->
- ctx.fireChannelRead(
- new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse))
- ).run();
- } finally {
- ReferenceCountUtil.release(httpRequest);
- }
- });
- }
-
- }
- }
- }
-
- Runnable wrapTask(Runnable task) {
- return () -> {
- final Optional globalStateHandler =
- Optional.ofNullable(proxyServer.getGlobalStateHandler());
- try {
- globalStateHandler.ifPresent(it -> it.restoreFromChannel(channel));
- task.run();
- } finally {
- globalStateHandler.ifPresent(GlobalStateHandler::clear);
- }
- };
- }
-
/**
* Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this
* request is being MITM'd, this method always returns false. The format of requests to a proxy server are defined
@@ -868,10 +800,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
}
- EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);
-
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("proxyProtocolReader", new ProtocolHeadersRequestDecoder());
@@ -890,8 +820,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor);
+ pipeline.addLast("requestReadMonitor", requestReadMonitor);
+ pipeline.addLast("responseWrittenMonitor", responseWrittenMonitor);
pipeline.addLast(
"idle",
@@ -902,10 +832,7 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
}
- pipeline.addLast(globalStateWrapperEvenLoop, "router", this);
- pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
- pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor());
- pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this));
+ pipeline.addLast("handler", this);
}
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index 65a9a9ce4..aaee33af8 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -51,7 +51,6 @@
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -552,8 +551,8 @@ protected void initChannel(Channel ch) throws Exception {
DefaultHttpProxyServer.this,
sslEngineSource,
authenticateSslClients,
- globalTrafficShapingHandler,
- ch);
+ ch.pipeline(),
+ globalTrafficShapingHandler);
};
};
switch (transportProtocol) {
@@ -621,10 +620,6 @@ protected GlobalStateHandler getGlobalStateHandler() {
return globalStateHandler;
}
- protected ExecutorService getMessageProcessingExecutor() {
- return serverGroup.getMessageProcessingExecutor();
- }
-
protected RequestTracer getRequestTracer() {
return requestTracer;
}
@@ -675,7 +670,6 @@ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerB
private ExceptionHandler proxyToServerExHandler = null;
private RequestTracer requestTracer = null;
private GlobalStateHandler globalStateHandler = null;
- private ExecutorService messageProcessorExecutor = null;
private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter();
private FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer = new DefaultFailureHttpResponseComposer();
private boolean transparent = false;
@@ -901,13 +895,6 @@ public HttpProxyServerBootstrap withCustomGlobalState(
return this;
}
- @Override
- public HttpProxyServerBootstrap withMessageProcessingExecutor(
- ExecutorService messageProcessorExecutor) {
- this.messageProcessorExecutor = messageProcessorExecutor;
- return this;
- }
-
@Override
public HttpProxyServerBootstrap withFiltersSource(
HttpFiltersSource filtersSource) {
@@ -1023,8 +1010,7 @@ private DefaultHttpProxyServer build() {
serverGroup = this.serverGroup;
}
else {
- serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads,
- clientToProxyWorkerThreads, proxyToServerWorkerThreads, messageProcessorExecutor);
+ serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads);
}
return new DefaultHttpProxyServer(serverGroup,
diff --git a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java b/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
deleted file mode 100644
index f3df53ed9..000000000
--- a/src/main/java/org/littleshoot/proxy/impl/GlobalStateWrapperEvenLoop.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package org.littleshoot.proxy.impl;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import io.netty.util.concurrent.EventExecutor;
-import io.netty.util.concurrent.EventExecutorGroup;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.ProgressivePromise;
-import io.netty.util.concurrent.Promise;
-import io.netty.util.concurrent.ScheduledFuture;
-
-public class GlobalStateWrapperEvenLoop implements EventExecutor {
-
- private final ClientToProxyConnection connection;
-
- private final EventExecutor eventLoop;
-
- GlobalStateWrapperEvenLoop(ClientToProxyConnection connection) {
- this.connection = connection;
- this.eventLoop = connection.channel.eventLoop();
- }
-
- GlobalStateWrapperEvenLoop(ClientToProxyConnection connection, EventExecutor eventLoop) {
- this.connection = connection;
- this.eventLoop = eventLoop;
- }
-
- @Override
- public void execute(Runnable command) {
- eventLoop.execute(connection.wrapTask(command));
- }
-
- @Override
- public boolean isShuttingDown() {
- return eventLoop.isShuttingDown();
- }
-
- @Override
- public Future> shutdownGracefully() {
- return eventLoop.shutdownGracefully();
- }
-
- @Override
- public Future> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
- return eventLoop.shutdownGracefully(quietPeriod, timeout, unit);
- }
-
- @Override
- public Future> terminationFuture() {
- return eventLoop.terminationFuture();
- }
-
- @Override
- public void shutdown() {
- eventLoop.shutdown();
- }
-
- @Override
- public List shutdownNow() {
- return eventLoop.shutdownNow();
- }
-
- @Override
- public boolean isShutdown() {
- return eventLoop.isShutdown();
- }
-
- @Override
- public boolean isTerminated() {
- return eventLoop.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
- return eventLoop.awaitTermination(timeout, unit);
- }
-
- @Override
- public EventExecutor next() {
- return this;
- }
-
- @Override
- public Iterator iterator() {
- return eventLoop.iterator();
- }
-
- @Override
- public Future> submit(Runnable task) {
- return eventLoop.submit(connection.wrapTask(task));
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
- return eventLoop.invokeAll(tasks);
- }
-
- @Override
- public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- return eventLoop.invokeAll(tasks, timeout, unit);
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
- return eventLoop.invokeAny(tasks);
- }
-
- @Override
- public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- return eventLoop.invokeAny(tasks, timeout, unit);
- }
-
- @Override
- public Future submit(Runnable task, T result) {
- return eventLoop.submit(connection.wrapTask(task), result);
- }
-
- @Override
- public Future submit(Callable task) {
- return eventLoop.submit(task);
- }
-
- @Override
- public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
- return eventLoop.schedule(command, delay, unit);
- }
-
- @Override
- public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
- return eventLoop.schedule(callable, delay, unit);
- }
-
- @Override
- public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
- return eventLoop.scheduleAtFixedRate(command, initialDelay, period, unit);
- }
-
- @Override
- public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
- return eventLoop.scheduleWithFixedDelay(command, initialDelay, delay, unit);
- }
-
- @Override
- public EventExecutorGroup parent() {
- return eventLoop.parent();
- }
-
- @Override
- public boolean inEventLoop() {
- return eventLoop.inEventLoop();
- }
-
- @Override
- public boolean inEventLoop(Thread thread) {
- return eventLoop.inEventLoop(thread);
- }
-
- @Override
- public Promise newPromise() {
- return eventLoop.newPromise();
- }
-
- @Override
- public ProgressivePromise newProgressivePromise() {
- return eventLoop.newProgressivePromise();
- }
-
- @Override
- public Future newSucceededFuture(V result) {
- return eventLoop.newSucceededFuture(result);
- }
-
- @Override
- public Future newFailedFuture(Throwable cause) {
- return eventLoop.newFailedFuture(cause);
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java b/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
deleted file mode 100644
index e715bb8ee..000000000
--- a/src/main/java/org/littleshoot/proxy/impl/HttpInitialHandler.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package org.littleshoot.proxy.impl;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.HttpObject;
-
-public class HttpInitialHandler extends ChannelInboundHandlerAdapter {
-
- private final ProxyConnection proxyConnection;
-
- HttpInitialHandler(ProxyConnection proxyConnection) {
- this.proxyConnection = proxyConnection;
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final ConnectionState connectionState = proxyConnection.readHTTPInitial(ctx, msg);
- proxyConnection.become(connectionState);
- }
-
- @Override
- public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- proxyConnection.exceptionCaught(cause);
- }
-}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
index eed893c22..1f4ee876b 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyConnection.java
@@ -129,10 +129,11 @@ protected void read(Object msg) {
*/
@SuppressWarnings("unchecked")
private void readHTTP(HttpObject httpObject) {
+ ConnectionState nextState = getCurrentState();
switch (getCurrentState()) {
case AWAITING_INITIAL:
if (httpObject instanceof HttpMessage) {
- ctx.fireChannelRead(httpObject);
+ nextState = readHTTPInitial((I) httpObject);
} else {
// Similar to the AWAITING_PROXY_AUTHENTICATION case below, we may enter an AWAITING_INITIAL
// state if the proxy responded to an earlier request with a 502 or 504 response, or a short-circuit
@@ -144,13 +145,13 @@ private void readHTTP(HttpObject httpObject) {
case AWAITING_CHUNK:
HttpContent chunk = (HttpContent) httpObject;
readHTTPChunk(chunk);
- become(ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
- : AWAITING_CHUNK);
+ nextState = ProxyUtils.isLastChunk(chunk) ? AWAITING_INITIAL
+ : AWAITING_CHUNK;
break;
case AWAITING_PROXY_AUTHENTICATION:
if (httpObject instanceof HttpRequest) {
// Once we get an HttpRequest, try to process it as usual
- ctx.fireChannelRead(httpObject);
+ nextState = readHTTPInitial((I) httpObject);
} else {
// Anything that's not an HttpRequest that came in while
// we're pending authentication gets dropped on the floor. This
@@ -178,6 +179,7 @@ private void readHTTP(HttpObject httpObject) {
LOG.info("Ignoring message since the connection is closed or about to close");
break;
}
+ become(nextState);
}
/**
@@ -187,7 +189,7 @@ private void readHTTP(HttpObject httpObject) {
* @param httpObject
* @return
*/
- protected abstract ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpObject);
+ protected abstract ConnectionState readHTTPInitial(I httpObject);
/**
* Implement this to handle reading a chunk in a chunked transfer.
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 3cb87c38c..96e470864 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -8,8 +8,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -32,9 +30,7 @@
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.AttributeKey;
-import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
-import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.littleshoot.proxy.ActivityTracker;
@@ -225,8 +221,7 @@ protected void read(Object msg) {
}
@Override
- protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpResponseObj) {
- HttpResponse httpResponse = (HttpResponse) httpResponseObj;
+ protected ConnectionState readHTTPInitial(HttpResponse httpResponse) {
LOG.debug("Received raw response: {}", httpResponse);
if (httpResponse.getDecoderResult().isFailure()) {
@@ -245,43 +240,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
currentFilters.serverToProxyResponseReceiving();
rememberCurrentResponse(httpResponse);
+ respondWith(httpResponse);
- ctx.fireChannelRead(httpResponse);
-
- return getCurrentState();
- }
-
- public class RespondToClientHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- final HttpResponse httpResponse = (HttpResponse) msg;
- if (ProxyUtils.isChunked(httpResponse)) {
- respondWith(httpResponse);
- become(AWAITING_CHUNK);
- } else {
- ReferenceCountUtil.retain(httpResponse);
-
- proxyServer.getMessageProcessingExecutor()
- .execute(() -> {
- try {
- clientConnection.wrapTask(() -> {
- respondWith(httpResponse);
- currentFilters.serverToProxyResponseReceived();
- become(AWAITING_INITIAL);
- }).run();
- } catch (Exception e) {
- exceptionCaught(ctx, e);
- } finally {
- ReferenceCountUtil.release(httpResponse);
- }
- });
- }
- }
+ if (ProxyUtils.isChunked(httpResponse)) {
+ return AWAITING_CHUNK;
+ } else {
+ currentFilters.serverToProxyResponseReceived();
- @Override
- public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- serverConnection.exceptionCaught(cause);
+ return AWAITING_INITIAL;
}
}
@@ -677,8 +643,8 @@ public Channel newChannel() {
cb.handler(new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
- initChannelPipeline(ch.pipeline(), ch);
- }
+ initChannelPipeline(ch.pipeline(), initialRequest);
+ };
});
cb.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
proxyServer.getConnectTimeout());
@@ -863,8 +829,6 @@ protected boolean connectionFailed(Throwable cause)
private void resetConnectionForRetry() throws UnknownHostException {
// Remove ourselves as handler on the old context
this.ctx.pipeline().remove(this);
- this.ctx.pipeline().remove("httpInitialHandler");
- this.ctx.pipeline().remove("respondToClientHandler");
this.ctx.close();
this.ctx = null;
@@ -931,7 +895,8 @@ private void setupConnectionParameters() throws UnknownHostException {
* @param pipeline
* @param httpRequest
*/
- private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) {
+ private void initChannelPipeline(ChannelPipeline pipeline,
+ HttpRequest httpRequest) {
if (proxyServer.getGlobalStateHandler() != null) {
pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(clientConnection));
@@ -941,10 +906,8 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) {
pipeline.addLast("global-traffic-shaping", trafficHandler);
}
- final EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(clientConnection, channel.eventLoop());
-
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
+ pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
+ pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("decoder", new HeadAwareHttpResponseDecoder(
@@ -959,8 +922,8 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}
- pipeline.addLast(globalStateWrapperEvenLoop, "responseReadMonitor", responseReadMonitor);
- pipeline.addLast(globalStateWrapperEvenLoop, "requestWrittenMonitor", requestWrittenMonitor);
+ pipeline.addLast("responseReadMonitor", responseReadMonitor);
+ pipeline.addLast("requestWrittenMonitor", requestWrittenMonitor);
// Set idle timeout
pipeline.addLast(
@@ -972,9 +935,7 @@ private void initChannelPipeline(ChannelPipeline pipeline, Channel channel) {
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(clientConnection));
}
- pipeline.addLast(globalStateWrapperEvenLoop, "router", this);
- pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
- pipeline.addLast(globalStateWrapperEvenLoop, "respondToClientHandler", new RespondToClientHandler());
+ pipeline.addLast("handler", this);
}
/**
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
index dbd898305..d359c7be1 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -12,8 +12,6 @@
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -74,8 +72,6 @@ public class ServerGroup {
*/
private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class);
- private final ExecutorService messageProcessingExecutor;
-
/**
* A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during
* transport protocol initialization.
@@ -108,19 +104,12 @@ public class ServerGroup {
* @param incomingWorkerThreads number of client-to-proxy worker threads per protocol
* @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol
*/
- public ServerGroup(String name, int incomingAcceptorThreads,
- int incomingWorkerThreads, int outgoingWorkerThreads,
- ExecutorService messageProcessingExecutor) {
+ public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) {
this.name = name;
this.serverGroupId = serverGroupCount.getAndIncrement();
this.incomingAcceptorThreads = incomingAcceptorThreads;
this.incomingWorkerThreads = incomingWorkerThreads;
this.outgoingWorkerThreads = outgoingWorkerThreads;
- if (messageProcessingExecutor == null) {
- this.messageProcessingExecutor = Executors.newCachedThreadPool();
- } else {
- this.messageProcessingExecutor = messageProcessingExecutor;
- }
}
/**
@@ -230,8 +219,6 @@ private void shutdown(boolean graceful) {
allEventLoopGroups.addAll(threadPools.getAllEventLoops());
}
- shutdownAndAwaitTermination(messageProcessingExecutor);
-
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
group.shutdownGracefully();
@@ -255,24 +242,6 @@ private void shutdown(boolean graceful) {
log.debug("Done shutting down server group");
}
- private void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(60, TimeUnit.SECONDS))
- log.warn("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
/**
* Retrieves the client-to-proxy acceptor thread pool for the specified protocol. Initializes the pool if it has not
* yet been initialized.
@@ -312,10 +281,6 @@ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol p
return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool();
}
- public ExecutorService getMessageProcessingExecutor() {
- return messageProcessingExecutor;
- }
-
/**
* @return true if this ServerGroup has already been stopped
*/
diff --git a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java b/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
deleted file mode 100644
index 196256f0c..000000000
--- a/src/main/java/org/littleshoot/proxy/impl/UpstreamConnectionHandler.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.littleshoot.proxy.impl;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-
-public class UpstreamConnectionHandler extends ChannelInboundHandlerAdapter {
-
- private final ClientToProxyConnection clientToProxyConnection;
-
- UpstreamConnectionHandler(ClientToProxyConnection clientToProxyConnection) {
- this.clientToProxyConnection = clientToProxyConnection;
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object request) {
- final ConnectionState connectionState =
- clientToProxyConnection.setupUpstreamConnection(((Request) request).getShortCircuitResponse(),
- ((Request) request).getInitialRequest());
- clientToProxyConnection.become(connectionState);
- }
-
- @Override
- public final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- clientToProxyConnection.exceptionCaught(cause);
- }
-
- public static class Request {
- private final HttpRequest initialRequest;
- private final HttpResponse shortCircuitResponse;
-
- public Request(HttpRequest initialRequest, HttpResponse shortCircuitResponse) {
- this.initialRequest = initialRequest;
- this.shortCircuitResponse = shortCircuitResponse;
- }
-
- HttpRequest getInitialRequest() { return initialRequest; }
- HttpResponse getShortCircuitResponse() { return shortCircuitResponse; }
- }
-}
diff --git a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
index e236e4b67..629a47c2f 100644
--- a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
+++ b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
@@ -4,9 +4,7 @@
import io.netty.handler.codec.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
@@ -14,280 +12,82 @@
import org.mockserver.integration.ClientAndServer;
import org.mockserver.matchers.Times;
-import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.fail;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;
-
-// set up two server responses that will execute more or less simultaneously. the first request has a small
-// delay, to reduce the chance that the first request will finish entirely before the second request is finished
-// (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is
-// not behaving properly).
-
-
-// save the names of the threads that execute the filter methods. filter methods are executed by the worker thread
-// handling the request/response, so if there is only one worker thread, the filter methods should be executed
-// by the same thread.
-
public class ServerGroupTest {
private ClientAndServer mockServer;
private int mockServerPort;
- final String firstRequestPath = "/testSingleThreadFirstRequest";
- final String secondRequestPath = "/testSingleThreadSecondRequest";
- final String messageProcessingThreadName = UUID.randomUUID().toString();
-
- final AtomicReference firstClientThreadName = new AtomicReference();
- final AtomicReference secondClientThreadName = new AtomicReference();
-
- final AtomicReference firstProxyThreadName = new AtomicReference();
- final AtomicReference secondProxyThreadName = new AtomicReference();
+ private HttpProxyServer proxyServer;
@Before
public void setUp() {
mockServer = new ClientAndServer(0);
mockServerPort = mockServer.getPort();
-
- mockServer.when(request()
- .withMethod("GET")
- .withPath(firstRequestPath),
- Times.exactly(1))
- .respond(response()
- .withStatusCode(200)
- .withBody("first")
- .withDelay(TimeUnit.MILLISECONDS, 500)
- );
-
- mockServer.when(request()
- .withMethod("GET")
- .withPath(secondRequestPath),
- Times.exactly(1))
- .respond(response()
- .withStatusCode(200)
- .withBody("second")
- );
}
@After
public void tearDown() {
- firstClientThreadName.set(null);
- secondClientThreadName.set(null);
- firstProxyThreadName.set(null);
- secondProxyThreadName.set(null);
- if (mockServer != null) {
- mockServer.stop();
- }
- }
-
- @Test
- public void testChunkedRequest() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(2, false,
- false, false, false, false,
- false, false, false, true);
-
- final Futures futures = runTwoRequests(proxyServer);
-
- futures.getFirstFuture().get();
- futures.getSecondFuture().get();
-
- assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
- assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
-
- assertNotEquals(firstClientThreadName.get(), messageProcessingThreadName);
- assertNotEquals(firstProxyThreadName.get(), messageProcessingThreadName);
- }
-
- @Test
- public void testBlockFirstRequest() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(2, true,
- false, false, false, false,
- false, false, false, false);
-
- final Futures futures = runTwoRequests(proxyServer);
-
- try {
- futures.getSecondFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("Second request took longer than expected");
- }
-
- boolean firstStillExecuting = false;
- try {
- futures.getFirstFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- firstStillExecuting = true;
- }
-
- Assert.assertTrue("First request must be still executing", firstStillExecuting);
-
- try {
- futures.getFirstFuture().get(3, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("First request took longer than expected");
- }
-
- assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
- assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
-
- assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
- assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
- }
-
- @Test
- public void testBlockFirstResponse() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(2, false,
- false, true, false, false,
- false, false, false, false);
-
- final Futures futures = runTwoRequests(proxyServer);
-
try {
- futures.getSecondFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("Second request took longer than expected");
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ } finally {
+ if (proxyServer != null) {
+ proxyServer.abort();
+ }
}
-
- boolean firstStillExecuting = false;
- try {
- futures.getFirstFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- firstStillExecuting = true;
- }
-
- Assert.assertTrue("First request must be still executing", firstStillExecuting);
-
- try {
- futures.getFirstFuture().get(3, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("First request took longer than expected");
- }
-
- assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
- assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
-
- assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
- assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
- }
-
- @Test(expected = ExecutionException.class)
- public void testExceptionFirstRequest() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(2, false,
- false, false, false, true,
- false, false, false, false);
-
- final Futures futures = runTwoRequests(proxyServer);
-
- futures.getFirstFuture().get();
- futures.getSecondFuture().get();
- }
-
- @Test(expected = ExecutionException.class)
- @Ignore // for some reason the test hangs even with original logic
- public void testExceptionFirstResponse() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(2, false,
- false, false, false, false,
- false, true, false, false);
-
- final Futures futures = runTwoRequests(proxyServer);
-
- futures.getFirstFuture().get();
- futures.getSecondFuture().get();
}
@Test
- public void testBlockFirstRequestSingleProcessingThread() throws ExecutionException, InterruptedException {
-
- final HttpProxyServer proxyServer = getProxy(1, true,
- false, false, false, false,
- false, false, false, false);
-
- final Futures futures = runTwoRequests(proxyServer);
-
- boolean secondStillExecuting = false;
- try {
- futures.getSecondFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- secondStillExecuting = true;
- }
-
- Assert.assertTrue("Second request must be still executing", secondStillExecuting);
-
- boolean firstStillExecuting = false;
- try {
- futures.getFirstFuture().get(2, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- firstStillExecuting = true;
- }
-
- Assert.assertTrue("First request must be still executing", firstStillExecuting);
-
- try {
- futures.getFirstFuture().get(3, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("First request took longer than expected");
- }
-
- try {
- futures.getSecondFuture().get(3, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- fail("Second request took longer than expected");
- }
-
- assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
- assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
-
- assertEquals(firstClientThreadName.get(), messageProcessingThreadName);
- assertEquals(firstProxyThreadName.get(), messageProcessingThreadName);
- }
+ public void testSingleWorkerThreadPoolConfiguration() throws ExecutionException, InterruptedException {
+ final String firstRequestPath = "/testSingleThreadFirstRequest";
+ final String secondRequestPath = "/testSingleThreadSecondRequest";
+
+ // set up two server responses that will execute more or less simultaneously. the first request has a small
+ // delay, to reduce the chance that the first request will finish entirely before the second request is finished
+ // (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is
+ // not behaving properly).
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(firstRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("first")
+ .withDelay(TimeUnit.MILLISECONDS, 500)
+ );
- private HttpProxyServer getProxy(int processingThreads,
- boolean blockFirstRequest,
- boolean blockSecondRequest,
- boolean blockFirstResponse,
- boolean blockSecondResponse,
- boolean throwFirstRequestException,
- boolean throwSecondRequestException,
- boolean throwFirstResponseException,
- boolean throwSecondResponseException,
- boolean withChunks) {
- return DefaultHttpProxyServer.bootstrap()
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(secondRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("second")
+ );
+
+ // save the names of the threads that execute the filter methods. filter methods are executed by the worker thread
+ // handling the request/response, so if there is only one worker thread, the filter methods should be executed
+ // by the same thread.
+ final AtomicReference firstClientThreadName = new AtomicReference();
+ final AtomicReference secondClientThreadName = new AtomicReference();
+
+ final AtomicReference firstProxyThreadName = new AtomicReference();
+ final AtomicReference secondProxyThreadName = new AtomicReference();
+
+ proxyServer = DefaultHttpProxyServer.bootstrap()
.withPort(0)
.withFiltersSource(new HttpFiltersSourceAdapter() {
-
- // required so chinks for used
- @Override
- public int getMaximumRequestBufferSizeInBytes() {
- if (withChunks) {
- return 0;
- }
- return 8388608 * 2;
- }
-
- @Override
- public int getMaximumResponseBufferSizeInBytes() {
- if (withChunks) {
- return 0;
- }
- return 8388608 * 2;
- }
-
-
@Override
public HttpFilters filterRequest(HttpRequest originalRequest) {
return new HttpFiltersAdapter(originalRequest) {
@@ -295,55 +95,20 @@ public HttpFilters filterRequest(HttpRequest originalRequest) {
public io.netty.handler.codec.http.HttpResponse clientToProxyRequest(HttpObject httpObject) {
if (originalRequest.getUri().endsWith(firstRequestPath)) {
firstClientThreadName.set(Thread.currentThread().getName());
-
- if (throwFirstRequestException) {
- throw new RuntimeException("first-request");
- }
-
- if (blockFirstRequest) {
- block();
- }
-
} else if (originalRequest.getUri().endsWith(secondRequestPath)) {
secondClientThreadName.set(Thread.currentThread().getName());
-
- if (throwSecondRequestException) {
- throw new RuntimeException("second-request");
- }
-
- if (blockSecondRequest) {
- block();
- }
}
return super.clientToProxyRequest(httpObject);
}
@Override
- public HttpObject serverToProxyResponse(HttpObject httpObject) {
+ public void serverToProxyResponseReceived() {
if (originalRequest.getUri().endsWith(firstRequestPath)) {
firstProxyThreadName.set(Thread.currentThread().getName());
-
- if (throwFirstResponseException) {
- throw new RuntimeException("first-response");
- }
-
- if (blockFirstResponse) {
- block();
- }
-
} else if (originalRequest.getUri().endsWith(secondRequestPath)) {
secondProxyThreadName.set(Thread.currentThread().getName());
-
- if (throwSecondResponseException) {
- throw new RuntimeException("second-response");
- }
-
- if (blockSecondResponse) {
- block();
- }
}
- return httpObject;
}
};
}
@@ -352,60 +117,37 @@ public HttpObject serverToProxyResponse(HttpObject httpObject) {
.withAcceptorThreads(1)
.withClientToProxyWorkerThreads(1)
.withProxyToServerWorkerThreads(1))
- .withMessageProcessingExecutor(Executors.newFixedThreadPool(processingThreads, r -> {
- final Thread thread = new Thread(r);
- thread.setName(messageProcessingThreadName);
- return thread;
- }))
.start();
- }
- private void block() {
- try {
- Thread.sleep(4000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- private Futures runTwoRequests(HttpProxyServer proxyServer) throws InterruptedException {
// execute both requests in parallel, to increase the chance of blocking due to the single-threaded ThreadPoolConfiguration
- final Runnable firstRequest = () -> {
- HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer);
- assertEquals(200, response.getStatusLine().getStatusCode());
+ Runnable firstRequest = new Runnable() {
+ @Override
+ public void run() {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
};
- final Runnable secondRequest = () -> {
- HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer);
- assertEquals(200, response.getStatusLine().getStatusCode());
+ Runnable secondRequest = new Runnable () {
+ @Override
+ public void run() {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
};
ExecutorService executor = Executors.newFixedThreadPool(2);
Future> firstFuture = executor.submit(firstRequest);
- Thread.sleep(500);
Future> secondFuture = executor.submit(secondRequest);
- return new Futures(firstFuture, secondFuture);
- }
-
- private static class Futures {
- Future> getFirstFuture() {
- return firstFuture;
- }
+ firstFuture.get();
+ secondFuture.get();
- Future> getSecondFuture() {
- return secondFuture;
- }
-
- final Future> firstFuture;
- final Future> secondFuture;
+ Thread.sleep(500);
- private Futures(Future> firstFuture, Future> secondFuture) {
- this.firstFuture = firstFuture;
- this.secondFuture = secondFuture;
- }
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
}
-
}