Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.littleshoot.proxy.ratelimit.RateLimiter;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;

/**
* Configures and starts an {@link HttpProxyServer}. The HttpProxyServer is
Expand Down Expand Up @@ -243,14 +242,6 @@ HttpProxyServerBootstrap withProxyToServerExHandler(
HttpProxyServerBootstrap withCustomGlobalState(
GlobalStateHandler globalStateHandler);

/**
*
* @param messageProcessorExecutor
* @return
*/
HttpProxyServerBootstrap withMessageProcessingExecutor(
ExecutorService messageProcessorExecutor);

/**
* <p>
* Specify a {@link HttpFiltersSource} to use for filtering requests and/or
Expand Down
163 changes: 45 additions & 118 deletions src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
Expand All @@ -22,17 +20,14 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import org.apache.commons.lang3.StringUtils;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.DefaultFailureHttpResponseComposer;
import org.littleshoot.proxy.ExceptionHandler;
import org.littleshoot.proxy.FailureHttpResponseComposer;
import org.littleshoot.proxy.GlobalStateHandler;
import org.littleshoot.proxy.ratelimit.RateLimiter;
import org.littleshoot.proxy.FlowContext;
import org.littleshoot.proxy.FullFlowContext;
Expand All @@ -49,7 +44,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
Expand Down Expand Up @@ -149,17 +143,15 @@ public class ClientToProxyConnection extends ProxyConnection<HttpRequest> {
final DefaultHttpProxyServer proxyServer,
SslEngineSource sslEngineSource,
boolean authenticateClients,
GlobalTrafficShapingHandler globalTrafficShapingHandler,
Channel channel) {
ChannelPipeline pipeline,
GlobalTrafficShapingHandler globalTrafficShapingHandler) {
super(AWAITING_INITIAL, proxyServer, false);

this.channel = channel;

initChannelPipeline(channel.pipeline());
initChannelPipeline(pipeline);

if (sslEngineSource != null) {
LOG.debug("Enabling encryption of traffic from client to proxy");
encrypt(channel.pipeline(), sslEngineSource.newSslEngine(),
encrypt(pipeline, sslEngineSource.newSslEngine(),
authenticateClients)
.addListener(
new GenericFutureListener<Future<? super Channel>>() {
Expand All @@ -185,8 +177,7 @@ public void operationComplete(
**************************************************************************/

@Override
protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) {
HttpRequest httpRequest = (HttpRequest) httpRequestObj;
protected ConnectionState readHTTPInitial(HttpRequest httpRequest) {
LOG.debug("Received raw request: {}", httpRequest);

// if we cannot parse the request, immediately return a 400 and close the connection, since we do not know what state
Expand All @@ -204,9 +195,14 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
return DISCONNECT_REQUESTED;
}

ctx.fireChannelRead(httpRequest);
boolean authenticationRequired = authenticationRequired(httpRequest);

return getCurrentState();
if (authenticationRequired) {
LOG.debug("Not authenticated!!");
return AWAITING_PROXY_AUTHENTICATION;
} else {
return doReadHTTPInitial(httpRequest);
}
}

/**
Expand All @@ -227,11 +223,34 @@ protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object http
* @param httpRequest
* @return
*/
public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse, HttpRequest httpRequest) {
if (shortCircuitResponse != null) {
LOG.debug("Responding to client with short-circuit response from filter: {}", shortCircuitResponse);
private ConnectionState doReadHTTPInitial(HttpRequest httpRequest) {
// Make a copy of the original request
final HttpRequest currentRequest = copy(httpRequest);

// Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
// should not be filtered), fall back to the default no-op filter source.
HttpFilters filterInstance;
try {
filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
} finally {
// releasing a copied http request
if (currentRequest instanceof ReferenceCounted) {
((ReferenceCounted)currentRequest).release();
}
}
if (filterInstance != null) {
currentFilters = filterInstance;
} else {
currentFilters = HttpFiltersAdapter.NOOP_FILTER;
}

boolean keepAlive = respondWithShortCircuitResponse(shortCircuitResponse);
// Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
HttpResponse clientToProxyFilterResponse = currentFilters.clientToProxyRequest(httpRequest);

if (clientToProxyFilterResponse != null) {
LOG.debug("Responding to client with short-circuit response from filter: {}", clientToProxyFilterResponse);

boolean keepAlive = respondWithShortCircuitResponse(clientToProxyFilterResponse);
if (keepAlive) {
return AWAITING_INITIAL;
} else {
Expand Down Expand Up @@ -347,93 +366,6 @@ public ConnectionState setupUpstreamConnection(HttpResponse shortCircuitResponse
}
}

@Sharable
protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final HttpRequest httpRequest = (HttpRequest) msg;

if (ProxyUtils.isChunked(httpRequest)) {
process(ctx, httpRequest, true);
} else {
ReferenceCountUtil.retain(httpRequest);

proxyServer.getMessageProcessingExecutor()
.execute(() -> {
try {
wrapTask(() -> process(ctx, httpRequest, false)).run();
} catch (Exception e) {
ctx.fireExceptionCaught(e);
} finally {
ReferenceCountUtil.release(httpRequest);
}
});
}
}

private void process(ChannelHandlerContext ctx, HttpRequest httpRequest, boolean chunked) {

boolean authenticationRequired = authenticationRequired(httpRequest);

if (authenticationRequired) {
LOG.debug("Not authenticated!!");
become(AWAITING_PROXY_AUTHENTICATION);
} else {
// Make a copy of the original request
final HttpRequest currentRequest = copy(httpRequest);

// Set up our filters based on the original request. If the HttpFiltersSource returns null (meaning the request/response
// should not be filtered), fall back to the default no-op filter source.
HttpFilters filterInstance;
try {
filterInstance = proxyServer.getFiltersSource().filterRequest(currentRequest, ctx);
} finally {
// releasing a copied http request
ReferenceCountUtil.release(currentRequest);
}
if (filterInstance != null) {
currentFilters = filterInstance;
} else {
currentFilters = HttpFiltersAdapter.NOOP_FILTER;
}

// Send the request through the clientToProxyRequest filter, and respond with the short-circuit response if required
final HttpResponse shortCircuitResponse = currentFilters.clientToProxyRequest(httpRequest);

if (chunked) {
ctx.fireChannelRead(new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse));
} else {
ReferenceCountUtil.retain(httpRequest);
channel.eventLoop().execute(() -> {
try {
wrapTask(() ->
ctx.fireChannelRead(
new UpstreamConnectionHandler.Request(httpRequest, shortCircuitResponse))
).run();
} finally {
ReferenceCountUtil.release(httpRequest);
}
});
}

}
}
}

Runnable wrapTask(Runnable task) {
return () -> {
final Optional<GlobalStateHandler> globalStateHandler =
Optional.ofNullable(proxyServer.getGlobalStateHandler());
try {
globalStateHandler.ifPresent(it -> it.restoreFromChannel(channel));
task.run();
} finally {
globalStateHandler.ifPresent(GlobalStateHandler::clear);
}
};
}

/**
* Returns true if the specified request is a request to an origin server, rather than to a proxy server. If this
* request is being MITM'd, this method always returns false. The format of requests to a proxy server are defined
Expand Down Expand Up @@ -868,10 +800,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("inboundGlobalStateHandler", new InboundGlobalStateHandler(this));
}

EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this);

pipeline.addLast(globalStateWrapperEvenLoop, "bytesReadMonitor", bytesReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "bytesWrittenMonitor", bytesWrittenMonitor);
pipeline.addLast("bytesReadMonitor", bytesReadMonitor);
pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor);

pipeline.addLast("proxyProtocolReader", new ProtocolHeadersRequestDecoder());

Expand All @@ -890,8 +820,8 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
aggregateContentForFiltering(pipeline, numberOfBytesToBuffer);
}

pipeline.addLast(globalStateWrapperEvenLoop, "requestReadMonitor", requestReadMonitor);
pipeline.addLast(globalStateWrapperEvenLoop, "responseWrittenMonitor", responseWrittenMonitor);
pipeline.addLast("requestReadMonitor", requestReadMonitor);
pipeline.addLast("responseWrittenMonitor", responseWrittenMonitor);

pipeline.addLast(
"idle",
Expand All @@ -902,10 +832,7 @@ private void initChannelPipeline(ChannelPipeline pipeline) {
pipeline.addLast("outboundGlobalStateHandler", new OutboundGlobalStateHandler(this));
}

pipeline.addLast(globalStateWrapperEvenLoop, "router", this);
pipeline.addLast(globalStateWrapperEvenLoop, "httpInitialHandler", new HttpInitialHandler<>(this));
pipeline.addLast(globalStateWrapperEvenLoop, "clientToProxyMessageProcessor", new ClientToProxyMessageProcessor());
pipeline.addLast(globalStateWrapperEvenLoop, "upstreamConnectionHandler", new UpstreamConnectionHandler(this));
pipeline.addLast("handler", this);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -552,8 +551,8 @@ protected void initChannel(Channel ch) throws Exception {
DefaultHttpProxyServer.this,
sslEngineSource,
authenticateSslClients,
globalTrafficShapingHandler,
ch);
ch.pipeline(),
globalTrafficShapingHandler);
};
};
switch (transportProtocol) {
Expand Down Expand Up @@ -621,10 +620,6 @@ protected GlobalStateHandler getGlobalStateHandler() {
return globalStateHandler;
}

protected ExecutorService getMessageProcessingExecutor() {
return serverGroup.getMessageProcessingExecutor();
}

protected RequestTracer getRequestTracer() {
return requestTracer;
}
Expand Down Expand Up @@ -675,7 +670,6 @@ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerB
private ExceptionHandler proxyToServerExHandler = null;
private RequestTracer requestTracer = null;
private GlobalStateHandler globalStateHandler = null;
private ExecutorService messageProcessorExecutor = null;
private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter();
private FailureHttpResponseComposer unrecoverableFailureHttpResponseComposer = new DefaultFailureHttpResponseComposer();
private boolean transparent = false;
Expand Down Expand Up @@ -901,13 +895,6 @@ public HttpProxyServerBootstrap withCustomGlobalState(
return this;
}

@Override
public HttpProxyServerBootstrap withMessageProcessingExecutor(
ExecutorService messageProcessorExecutor) {
this.messageProcessorExecutor = messageProcessorExecutor;
return this;
}

@Override
public HttpProxyServerBootstrap withFiltersSource(
HttpFiltersSource filtersSource) {
Expand Down Expand Up @@ -1023,8 +1010,7 @@ private DefaultHttpProxyServer build() {
serverGroup = this.serverGroup;
}
else {
serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads,
clientToProxyWorkerThreads, proxyToServerWorkerThreads, messageProcessorExecutor);
serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads);
}

return new DefaultHttpProxyServer(serverGroup,
Expand Down
Loading