diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpRequestBody.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpRequestBody.java new file mode 100644 index 0000000..efb9d5e --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpRequestBody.java @@ -0,0 +1,55 @@ +package io.roastedroot.proxywasm.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.function.Supplier; + +/** + * Holds the HTTP request body, loading it from a stream when first accessed. + */ +public class HttpRequestBody { + + private byte[] body; + private boolean loaded = false; + private final Supplier streamSupplier; + + public HttpRequestBody(Supplier streamSupplier) { + this.streamSupplier = streamSupplier; + } + + public byte[] get() { + if (!loaded) { + try { + body = streamSupplier.get().readAllBytes(); + } catch (IOException e) { + throw new RuntimeException("Failed to read request body", e); + } + loaded = true; + } + return body; + } + + /** + * Returns true if the request body has been loaded from the stream. + */ + public boolean isLoaded() { + return loaded; + } + + /** + * Sets the request body directly, marking it as loaded. + * This is used when the body is modified by WASM plugins. + */ + public void setBody(byte[] body) { + this.body = body; + this.loaded = true; + } + + /** + * Returns the request body if it has been loaded, null otherwise. + * This allows checking if the body was accessed without triggering a load. + */ + public byte[] getBodyIfLoaded() { + return loaded ? body : null; + } +} diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpResponseBody.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpResponseBody.java new file mode 100644 index 0000000..10f2fe2 --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/HttpResponseBody.java @@ -0,0 +1,64 @@ +package io.roastedroot.proxywasm.internal; + +import java.util.function.Supplier; + +/** + * Holds the HTTP response body, loading it from a supplier when first accessed. + * Unlike request bodies which come from streams, response bodies can be provided + * directly as byte arrays or from suppliers. + */ +public class HttpResponseBody { + + private byte[] body; + private boolean loaded = false; + private final Supplier bodySupplier; + + public HttpResponseBody(Supplier bodySupplier) { + this.bodySupplier = bodySupplier; + } + + /** + * Creates an HttpResponseBody with a fixed byte array (no lazy loading needed). + */ + public HttpResponseBody(byte[] body) { + this.body = body; + this.loaded = true; + this.bodySupplier = null; + } + + public byte[] get() { + if (!loaded) { + if (bodySupplier != null) { + body = bodySupplier.get(); + } else { + body = new byte[0]; + } + loaded = true; + } + return body; + } + + /** + * Returns true if the response body has been loaded. + */ + public boolean isLoaded() { + return loaded; + } + + /** + * Sets the response body directly, marking it as loaded. + * This is used when the body is modified by WASM plugins. + */ + public void setBody(byte[] body) { + this.body = body; + this.loaded = true; + } + + /** + * Returns the response body if it has been loaded, null otherwise. + * This allows checking if the body was accessed without triggering a load. + */ + public byte[] getBodyIfLoaded() { + return loaded ? body : null; + } +} diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java index a579f75..5bac7a4 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/internal/PluginHttpContext.java @@ -13,21 +13,89 @@ public class PluginHttpContext { private final long startedAt = System.currentTimeMillis(); final HashMap, byte[]> properties = new HashMap<>(); - private byte[] httpRequestBody = new byte[0]; + private HttpRequestBody httpRequestBodyState; + private HttpResponseBody httpResponseBodyState; + + // Other body buffers and state fields (not lazy) private byte[] grpcReceiveBuffer = new byte[0]; private byte[] upstreamData = new byte[0]; private byte[] downStreamData = new byte[0]; - private byte[] httpResponseBody = new byte[0]; private SendResponse sendResponse; private Action action; private CountDownLatch resumeLatch; - PluginHttpContext(Plugin plugin, HttpRequestAdaptor requestAdaptor) { + public PluginHttpContext(Plugin plugin, HttpRequestAdaptor requestAdaptor) { this.plugin = plugin; this.requestAdaptor = requestAdaptor; this.context = plugin.wasm.createHttpContext(new HandlerImpl()); } + /** + * Sets the lazy request body supplier. + */ + public void setHttpRequestBodyState(HttpRequestBody supplier) { + this.httpRequestBodyState = supplier; + } + + /** + * Gets the lazy request body supplier. + */ + public HttpRequestBody getHttpRequestBodyState() { + return httpRequestBodyState; + } + + /** + * Gets the HTTP request body, triggering lazy loading if needed. + */ + public byte[] getHttpRequestBody() { + if (httpRequestBodyState != null) { + return httpRequestBodyState.get(); + } + return new byte[0]; + } + + /** + * Sets the HTTP request body, updating the supplier if present. + */ + public void setHttpRequestBody(byte[] body) { + if (httpRequestBodyState != null && body != null) { + httpRequestBodyState.setBody(body); + } + } + + /** + * Sets the HTTP response body state. + */ + public void setHttpResponseBodyState(HttpResponseBody responseBody) { + this.httpResponseBodyState = responseBody; + } + + /** + * Gets the HTTP response body state. + */ + public HttpResponseBody getHttpResponseBodyState() { + return httpResponseBodyState; + } + + /** + * Gets the HTTP response body, triggering lazy loading if needed. + */ + public byte[] getHttpResponseBody() { + if (httpResponseBodyState != null) { + return httpResponseBodyState.get(); + } + return new byte[0]; + } + + /** + * Sets the HTTP response body, updating the state if present. + */ + public void setHttpResponseBody(byte[] httpResponseBody) { + if (httpResponseBodyState != null && httpResponseBody != null) { + httpResponseBodyState.setBody(httpResponseBody); + } + } + public Plugin plugin() { return plugin; } @@ -68,22 +136,6 @@ public void maybePause() { } } - public byte[] getHttpRequestBody() { - return httpRequestBody; - } - - public void setHttpRequestBody(byte[] httpRequestBody) { - this.httpRequestBody = httpRequestBody; - } - - public byte[] getHttpResponseBody() { - return httpResponseBody; - } - - public void setHttpResponseBody(byte[] httpResponseBody) { - this.httpResponseBody = httpResponseBody; - } - public byte[] getGrpcReceiveBuffer() { return grpcReceiveBuffer; } @@ -156,17 +208,21 @@ public ProxyMap getGrpcReceiveTrailerMetaData() { @Override public byte[] getHttpRequestBody() { - return httpRequestBody; + return PluginHttpContext.this + .getHttpRequestBody(); // Delegate to outer class for lazy loading } @Override public WasmResult setHttpRequestBody(byte[] body) { - httpRequestBody = body; + PluginHttpContext.this.setHttpRequestBody(body); // Delegate to outer class return WasmResult.OK; } public void appendHttpRequestBody(byte[] body) { - httpRequestBody = Helpers.append(httpRequestBody, body); + byte[] currentBody = + PluginHttpContext.this + .getHttpRequestBody(); // This will trigger lazy loading if needed + PluginHttpContext.this.setHttpRequestBody(Helpers.append(currentBody, body)); } @Override @@ -204,17 +260,18 @@ public WasmResult setDownStreamData(byte[] data) { @Override public byte[] getHttpResponseBody() { - return httpResponseBody; + return PluginHttpContext.this.getHttpResponseBody(); } @Override public WasmResult setHttpResponseBody(byte[] body) { - httpResponseBody = body; + PluginHttpContext.this.setHttpResponseBody(body); return WasmResult.OK; } public void appendHttpResponseBody(byte[] body) { - httpResponseBody = Helpers.append(httpResponseBody, body); + byte[] currentBody = PluginHttpContext.this.getHttpResponseBody(); + PluginHttpContext.this.setHttpResponseBody(Helpers.append(currentBody, body)); } // ////////////////////////////////////////////////////////////////////// diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java index 98853e7..642f751 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/ProxyWasmFilter.java @@ -4,6 +4,8 @@ import io.roastedroot.proxywasm.StartException; import io.roastedroot.proxywasm.internal.Action; +import io.roastedroot.proxywasm.internal.HttpRequestBody; +import io.roastedroot.proxywasm.internal.HttpResponseBody; import io.roastedroot.proxywasm.internal.Plugin; import io.roastedroot.proxywasm.internal.PluginHttpContext; import io.roastedroot.proxywasm.internal.Pool; @@ -25,14 +27,19 @@ import java.util.logging.Logger; /** - * Implements the JAX-RS {@link ContainerRequestFilter}, {@link ContainerResponseFilter}, - * and {@link WriterInterceptor} interfaces to intercept HTTP requests and responses, + * Implements the JAX-RS {@link ContainerRequestFilter}, + * {@link ContainerResponseFilter}, + * and {@link WriterInterceptor} interfaces to intercept HTTP requests and + * responses, * allowing Proxy-Wasm plugins to process them. * - *

This filter is registered by the {@link ProxyWasmFeature}. It interacts with - * {@link Plugin} instances obtained from configured {@link Pool}s to execute the + *

+ * This filter is registered by the {@link ProxyWasmFeature}. It interacts with + * {@link Plugin} instances obtained from configured {@link Pool}s to execute + * the * appropriate Proxy-Wasm ABI functions (e.g., {@code on_http_request_headers}, - * {@code on_http_response_body}) at different stages of the JAX-RS request/response lifecycle. + * {@code on_http_response_body}) at different stages of the JAX-RS + * request/response lifecycle. * * @see ProxyWasmFeature * @see ProxyWasm @@ -50,7 +57,8 @@ public class ProxyWasmFilter /** * Constructs a ProxyWasmFilter. * - * @param pluginPools A list of {@link Pool} instances, each managing a pool of {@link Plugin} + * @param pluginPools A list of {@link Pool} instances, each managing a pool of + * {@link Plugin} * instances for a specific Wasm module. */ public ProxyWasmFilter(List pluginPools) { @@ -76,10 +84,15 @@ public void release() { /** * Intercepts incoming JAX-RS requests before they reach the resource method. * - *

This method iterates through the configured plugin pools, borrows a {@link Plugin} - * instance from each, creates a {@link PluginHttpContext}, and calls the plugin's - * {@code on_http_request_headers} and potentially {@code on_http_request_body} functions. - * It handles potential early responses or modifications dictated by the plugins. + *

+ * This method iterates through the configured plugin pools, borrows a + * {@link Plugin} + * instance from each, creates a {@link PluginHttpContext}, and calls the + * plugin's + * {@code on_http_request_headers} and potentially {@code on_http_request_body} + * functions. + * It handles potential early responses or modifications dictated by the + * plugins. * * @param requestContext The JAX-RS request context. * @throws IOException If an I/O error occurs, typically during body processing. @@ -89,6 +102,7 @@ public void filter(ContainerRequestContext requestContext) throws IOException { ArrayList filterContexts = new ArrayList<>(); requestContext.setProperty(FILTER_CONTEXT, filterContexts); + for (var pluginPool : pluginPools) { try { Plugin plugin = pluginPool.borrow(); @@ -100,6 +114,7 @@ public void filter(ContainerRequestContext requestContext) throws IOException { serverAdaptor.httpRequestAdaptor(requestContext); requestAdaptor.setRequestContext(requestContext); var httpContext = plugin.createHttpContext(requestAdaptor); + filterContexts.add(new FilterContext(pluginPool, plugin, httpContext)); } finally { plugin.unlock(); @@ -116,6 +131,17 @@ public void filter(ContainerRequestContext requestContext) throws IOException { return; } } + + // Create a shared lazy body supplier for all plugins + HttpRequestBody bodySupplier = new HttpRequestBody(() -> requestContext.getEntityStream()); + + // Set up lazy providers for all plugins that need the body + for (var filterContext : filterContexts) { + if (filterContext.httpContext.context().hasOnRequestBody()) { + filterContext.httpContext.setHttpRequestBodyState(bodySupplier); + } + } + for (var filterContext : filterContexts) { filter(requestContext, filterContext); } @@ -145,21 +171,15 @@ private void filter(ContainerRequestContext requestContext, FilterContext filter } // the plugin may not be interested in the request body. - if (httpContext.context().hasOnRequestBody()) { + if (!httpContext.context().hasOnRequestBody()) { + return; + } - // TODO: find out if it's more efficient to read the body in chunks and do multiple - // callOnRequestBody calls. - byte[] bytes = requestContext.getEntityStream().readAllBytes(); + HttpRequestBody httpRequestBodyState = httpContext.getHttpRequestBodyState(); - httpContext.setHttpRequestBody(bytes); + while (true) { + // if we streamed body updates, then endOfStream would be initially false var action = httpContext.context().callOnRequestBody(true); - bytes = httpContext.getHttpRequestBody(); - if (action == Action.CONTINUE) { - // continue means plugin is done reading the body. - httpContext.setHttpRequestBody(null); - } else { - httpContext.maybePause(); - } // does the plugin want to respond early? var sendResponse = httpContext.consumeSentHttpResponse(); @@ -168,7 +188,15 @@ private void filter(ContainerRequestContext requestContext, FilterContext filter return; } - // plugin may have modified the body + if (action == Action.CONTINUE) { + break; + } + httpContext.maybePause(); + } + + // Body was accessed and potentially modified, update the request stream + if (httpRequestBodyState.isLoaded()) { + byte[] bytes = httpRequestBodyState.getBodyIfLoaded(); requestContext.setEntityStream(new java.io.ByteArrayInputStream(bytes)); } @@ -184,10 +212,14 @@ private static Response internalServerError() { /** * Intercepts outgoing JAX-RS responses before the entity is written. * - *

This method iterates through the configured plugin pools, retrieves the - * {@link PluginHttpContext} created during the request phase, and calls the plugin's - * {@code on_http_response_headers} function. It handles potential modifications to the - * response headers dictated by the plugins. If the response has no entity but the plugin + *

+ * This method iterates through the configured plugin pools, retrieves the + * {@link PluginHttpContext} created during the request phase, and calls the + * plugin's + * {@code on_http_response_headers} function. It handles potential modifications + * to the + * response headers dictated by the plugins. If the response has no entity but + * the plugin * implements {@code on_http_response_body}, it invokes that callback as well. * * @param requestContext The JAX-RS request context. @@ -239,19 +271,17 @@ private void filter( } // aroundWriteTo won't be called if there is no entity to send. - if (responseContext.getEntity() == null - && httpContext.context().hasOnResponseBody()) { + if (responseContext.getEntity() != null + || !httpContext.context().hasOnResponseBody()) { + return; + } - byte[] bytes = new byte[0]; - httpContext.setHttpResponseBody(bytes); + // Set up empty response body for plugins that need it + HttpResponseBody responseBodyState = new HttpResponseBody(new byte[0]); + httpContext.setHttpResponseBodyState(responseBodyState); + + while (true) { action = httpContext.context().callOnResponseBody(true); - bytes = httpContext.getHttpResponseBody(); - if (action == Action.CONTINUE) { - // continue means plugin is done reading the body. - httpContext.setHttpResponseBody(null); - } else { - httpContext.maybePause(); - } // does the plugin want to respond early? sendResponse = httpContext.consumeSentHttpResponse(); @@ -262,6 +292,11 @@ private void filter( responseContext.setEntity(response.getEntity()); return; } + + if (action == Action.CONTINUE) { + break; + } + httpContext.maybePause(); } } finally { @@ -274,15 +309,20 @@ private void filter( /** * Intercepts the response body writing process. * - *

This method is called when the JAX-RS framework is about to serialize and write + *

+ * This method is called when the JAX-RS framework is about to serialize and + * write * the response entity. It captures the original response body, allows plugins - * (via {@code on_http_response_body}) to inspect or modify it, and then writes the + * (via {@code on_http_response_body}) to inspect or modify it, and then writes + * the * potentially modified body to the original output stream. It handles potential * early responses dictated by the plugins during body processing. * * @param ctx The JAX-RS writer interceptor context. - * @throws IOException If an I/O error occurs during stream processing. - * @throws WebApplicationException If a plugin decides to abort processing and send an + * @throws IOException If an I/O error occurs during stream + * processing. + * @throws WebApplicationException If a plugin decides to abort processing and + * send an * alternative response during body filtering. */ @Override @@ -297,41 +337,57 @@ public void aroundWriteTo(WriterInterceptorContext ctx) try { var original = ctx.getOutputStream(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ctx.setOutputStream(baos); - ctx.proceed(); - byte[] bytes = baos.toByteArray(); + HttpResponseBody sharedResponseBody = + new HttpResponseBody( + () -> { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ctx.setOutputStream(baos); + try { + ctx.proceed(); + } catch (Exception e) { + throw new RuntimeException("Failed to read response body", e); + } + return baos.toByteArray(); + }); for (var filterContext : List.copyOf(filterContexts)) { var httpContext = filterContext.httpContext; httpContext.plugin().lock(); - // the plugin may not be interested in the request body. + // the plugin may not be interested in the response body. if (!httpContext.context().hasOnResponseBody()) { - ctx.proceed(); + continue; } - httpContext.setHttpResponseBody(bytes); - var action = httpContext.context().callOnResponseBody(true); - bytes = httpContext.getHttpResponseBody(); - if (action == Action.CONTINUE) { - // continue means plugin is done reading the body. - httpContext.setHttpResponseBody(null); - } else { - httpContext.maybePause(); - } + // Set up lazy response body - will only be accessed if plugin needs it + httpContext.setHttpResponseBodyState(sharedResponseBody); - // does the plugin want to respond early? - var sendResponse = httpContext.consumeSentHttpResponse(); - if (sendResponse != null) { - throw new WebApplicationException(toResponse(sendResponse)); + while (true) { + var action = httpContext.context().callOnResponseBody(true); + + // does the plugin want to respond early? + var sendResponse = httpContext.consumeSentHttpResponse(); + if (sendResponse != null) { + throw new WebApplicationException(toResponse(sendResponse)); + } + + if (action == Action.CONTINUE) { + break; + } + httpContext.maybePause(); } } - // plugin may have modified the body - original.write(bytes); + // Write the response body - if it was accessed and modified, use that, + // otherwise continue with the original stream. + if (sharedResponseBody.isLoaded()) { + original.write(sharedResponseBody.get()); + } else { + // Body was never accessed by any plugin, use original + ctx.proceed(); + } } finally { for (var filterContext : List.copyOf(filterContexts)) {