diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/WellKnownHeaders.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/WellKnownHeaders.java new file mode 100644 index 0000000..7bab923 --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/WellKnownHeaders.java @@ -0,0 +1,13 @@ +package io.roastedroot.proxywasm; + +/** + * Holds constants for the well-known header keys. + */ +public final class WellKnownHeaders { + private WellKnownHeaders() {} + + public static final String SCHEME = ":scheme"; + public static final String AUTHORITY = ":authority"; + public static final String PATH = ":path"; + public static final String METHOD = ":method"; +} diff --git a/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/App.java b/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/App.java index 4176ece..ac248b6 100644 --- a/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/App.java +++ b/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/App.java @@ -49,4 +49,14 @@ public WasmPluginFactory httpHeaders() throws StartException { .withPluginConfig("{\"header\": \"x-wasm-header\", \"value\": \"foo\"}") .build(parseTestModule("/go-examples/http_headers/main.wasm")); } + + @Produces + public WasmPluginFactory dispatchCallOnTickTest() throws StartException { + return () -> + WasmPlugin.builder() + .withName("dispatchCallOnTickTest") + .withLogger(new MockLogger()) + .withUpstreams(Map.of("web_service", "localhost:8081")) + .build(parseTestModule("/go-examples/dispatch_call_on_tick/main.wasm")); + } } diff --git a/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/Resources.java b/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/Resources.java index dae22da..770c29f 100644 --- a/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/Resources.java +++ b/proxy-wasm-jaxrs-quarkus-example/src/main/java/io/roastedroot/proxywasm/jaxrs/example/Resources.java @@ -3,21 +3,46 @@ import io.roastedroot.proxywasm.jaxrs.NamedWasmPlugin; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response; -@Path("/test") +@Path("/") public class Resources { - @Path("/httpHeaders") + @Context ContainerRequestContext requestContext; + + @Path("/test/httpHeaders") @GET @NamedWasmPlugin("httpHeaders") public String httpHeaders() { return "hello world"; } - @Path("/notSharedHttpHeaders") + @Path("/test/notSharedHttpHeaders") @GET @NamedWasmPlugin("notSharedHttpHeaders") public String notSharedHttpHeaders() { return "hello world"; } + + @Path("/fail") + @GET + public Response fail() { + Response.ResponseBuilder builder = Response.status(Response.Status.BAD_REQUEST); + for (String header : requestContext.getHeaders().keySet()) { + builder.header("echo-" + header, requestContext.getHeaderString(header)); + } + return builder.build(); + } + + @Path("/ok") + @GET + public Response ok() { + Response.ResponseBuilder builder = Response.status(Response.Status.OK); + for (String header : requestContext.getHeaders().keySet()) { + builder.header("echo-" + header, requestContext.getHeaderString(header)); + } + return builder.entity("ok").build(); + } } diff --git a/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/DispatchCallOnTickTest.java b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/DispatchCallOnTickTest.java new file mode 100644 index 0000000..9e659b7 --- /dev/null +++ b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/DispatchCallOnTickTest.java @@ -0,0 +1,40 @@ +package io.roastedroot.proxywasm.jaxrs.example; + +import static io.roastedroot.proxywasm.jaxrs.example.Helpers.assertLogsContain; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import io.quarkus.test.junit.QuarkusTest; +import io.roastedroot.proxywasm.StartException; +import io.roastedroot.proxywasm.jaxrs.WasmPlugin; +import io.roastedroot.proxywasm.jaxrs.WasmPluginFeature; +import jakarta.inject.Inject; +import org.junit.jupiter.api.Test; + +@QuarkusTest +public class DispatchCallOnTickTest { + + @Inject WasmPluginFeature feature; + + @Test + public void test() throws InterruptedException, StartException { + WasmPlugin plugin = feature.pool("dispatchCallOnTickTest").borrow(); + assertNotNull(plugin); + + var logger = (MockLogger) plugin.logger(); + Thread.sleep(300); + + // for (var l : logger.loggedMessages()) { + // System.out.println(l); + // } + assertLogsContain( + logger.loggedMessages(), + "set tick period milliseconds: 100", + "called 1 for contextID=1", + "called 2 for contextID=1", + "response header for the dispatched call: Content-Type: text/plain;charset=UTF-8", + "response header for the dispatched call: echo-accept: */*", + "response header for the dispatched call: echo-content-length: 0", + "response header for the dispatched call: echo-Host: some_authority"); + plugin.close(); // so that the ticks don't keep running in the background. + } +} diff --git a/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/ForeignCallOnTickTest.java b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/ForeignCallOnTickTest.java index 4d1ecda..905172b 100644 --- a/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/ForeignCallOnTickTest.java +++ b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/ForeignCallOnTickTest.java @@ -1,5 +1,6 @@ package io.roastedroot.proxywasm.jaxrs.example; +import static io.roastedroot.proxywasm.jaxrs.example.Helpers.assertLogsContain; import static org.junit.jupiter.api.Assertions.assertNotNull; import io.quarkus.test.junit.QuarkusTest; @@ -7,8 +8,6 @@ import io.roastedroot.proxywasm.jaxrs.WasmPlugin; import io.roastedroot.proxywasm.jaxrs.WasmPluginFeature; import jakarta.inject.Inject; -import java.util.ArrayList; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @QuarkusTest @@ -30,12 +29,4 @@ public void testRequest() throws InterruptedException, StartException { 1, "68656c6c6f20776f726c6421")); plugin.close(); // so that the ticks don't keep running in the background. } - - public synchronized void assertLogsContain( - ArrayList loggedMessages, String... message) { - for (String m : message) { - Assertions.assertTrue( - loggedMessages.contains(m), "logged messages does not contain: " + m); - } - } } diff --git a/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/Helpers.java b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/Helpers.java new file mode 100644 index 0000000..66b093b --- /dev/null +++ b/proxy-wasm-jaxrs-quarkus-example/src/test/java/io/roastedroot/proxywasm/jaxrs/example/Helpers.java @@ -0,0 +1,15 @@ +package io.roastedroot.proxywasm.jaxrs.example; + +import java.util.ArrayList; +import org.junit.jupiter.api.Assertions; + +public class Helpers { + private Helpers() {} + + public static void assertLogsContain(ArrayList loggedMessages, String... message) { + for (String m : message) { + Assertions.assertTrue( + loggedMessages.contains(m), "logged messages does not contain: " + m); + } + } +} diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/PluginHandler.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/PluginHandler.java index bdd4976..24aa2dc 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/PluginHandler.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/PluginHandler.java @@ -1,9 +1,14 @@ package io.roastedroot.proxywasm.jaxrs; import static io.roastedroot.proxywasm.Helpers.bytes; +import static io.roastedroot.proxywasm.WellKnownHeaders.AUTHORITY; +import static io.roastedroot.proxywasm.WellKnownHeaders.METHOD; +import static io.roastedroot.proxywasm.WellKnownHeaders.PATH; +import static io.roastedroot.proxywasm.WellKnownHeaders.SCHEME; import static io.roastedroot.proxywasm.WellKnownProperties.PLUGIN_NAME; import static io.roastedroot.proxywasm.WellKnownProperties.PLUGIN_VM_ID; +import io.roastedroot.proxywasm.ArrayProxyMap; import io.roastedroot.proxywasm.ChainedHandler; import io.roastedroot.proxywasm.ForeignFunction; import io.roastedroot.proxywasm.Handler; @@ -12,6 +17,8 @@ import io.roastedroot.proxywasm.ProxyMap; import io.roastedroot.proxywasm.WasmException; import io.roastedroot.proxywasm.WasmResult; +import jakarta.ws.rs.core.UriBuilder; +import java.net.URI; import java.util.HashMap; import java.util.List; import java.util.Objects; @@ -46,6 +53,10 @@ public void close() { cancelTick.run(); cancelTick = null; } + for (var cancelHttpCall : httpCalls.values()) { + cancelHttpCall.run(); + } + httpCalls.clear(); } // ////////////////////////////////////////////////////////////////////// @@ -200,61 +211,105 @@ public void setPlugin(WasmPlugin plugin) { // HTTP calls // ////////////////////////////////////////////////////////////////////// - public static class HttpCall { - public enum Type { - REGULAR, - DISPATCH - } - - public final int id; - public final Type callType; - public final String uri; - public final Object headers; - public final byte[] body; - public final ProxyMap trailers; - public final int timeoutMilliseconds; - - public HttpCall( - int id, - Type callType, - String uri, - ProxyMap headers, - byte[] body, - ProxyMap trailers, - int timeoutMilliseconds) { - this.id = id; - this.callType = callType; - this.uri = uri; - this.headers = headers; - this.body = body; - this.trailers = trailers; - this.timeoutMilliseconds = timeoutMilliseconds; - } - } - private final AtomicInteger lastCallId = new AtomicInteger(0); - private final HashMap httpCalls = new HashMap(); + private final HashMap httpCalls = new HashMap<>(); - public HashMap getHttpCalls() { - return httpCalls; - } + HashMap upstreams = new HashMap<>(); + boolean strictUpstreams; @Override public int httpCall( - String uri, ProxyMap headers, byte[] body, ProxyMap trailers, int timeoutMilliseconds) + String upstreamName, + ProxyMap headers, + byte[] body, + ProxyMap trailers, + int timeoutMilliseconds) throws WasmException { - var id = lastCallId.incrementAndGet(); - HttpCall value = - new HttpCall( - id, - HttpCall.Type.REGULAR, - uri, - headers, - body, - trailers, - timeoutMilliseconds); - httpCalls.put(id, value); - return id; + + var method = headers.get(METHOD); + if (method == null) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + + var scheme = headers.get(SCHEME); + if (scheme == null) { + scheme = "http"; + } + var authority = headers.get(AUTHORITY); + if (authority == null) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + headers.put("Host", authority); + + var connectHostPort = upstreams.get(upstreamName); + if (connectHostPort == null && strictUpstreams) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + if (connectHostPort == null) { + connectHostPort = authority; + } + + var connectUri = UriBuilder.newInstance().scheme(scheme).host(connectHostPort).build(); + var connectHost = connectUri.getHost(); + var connectPort = connectUri.getPort(); + if (connectPort == -1) { + connectPort = "https".equals(scheme) ? 443 : 80; + } + + var path = headers.get(PATH); + if (path == null) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + if (!path.isEmpty() && !path.startsWith("/")) { + path = "/" + path; + } + + var uri = + URI.create( + UriBuilder.newInstance() + .scheme(scheme) + .host(authority) + .port(connectPort) + .build() + .toString() + + path); + + // Remove all the pseudo headers + for (var r : new ArrayProxyMap(headers).entries()) { + if (r.getKey().startsWith(":")) { + headers.remove(r.getKey()); + } + } + + try { + var id = lastCallId.incrementAndGet(); + var future = + this.plugin.httpServer.scheduleHttpCall( + method, + connectHost, + connectPort, + uri, + headers, + body, + trailers, + timeoutMilliseconds, + (resp) -> { + this.plugin.lock(); + try { + if (httpCalls.remove(id) == null) { + return; // the call could have already been cancelled + } + this.plugin.wasm.sendHttpCallResponse( + id, resp.headers, new ArrayProxyMap(), resp.body); + } finally { + this.plugin.unlock(); + } + }); + httpCalls.put(id, future); + return id; + } catch (InterruptedException e) { + throw new WasmException(WasmResult.INTERNAL_FAILURE); + } } @Override @@ -265,18 +320,7 @@ public int dispatchHttpCall( ProxyMap trailers, int timeoutMilliseconds) throws WasmException { - var id = lastCallId.incrementAndGet(); - HttpCall value = - new HttpCall( - id, - HttpCall.Type.DISPATCH, - upstreamName, - headers, - body, - trailers, - timeoutMilliseconds); - httpCalls.put(id, value); - return id; + return httpCall(upstreamName, headers, body, trailers, timeoutMilliseconds); } // ////////////////////////////////////////////////////////////////////// @@ -298,8 +342,8 @@ public Metric(int id, MetricType type, String name) { } private final AtomicInteger lastMetricId = new AtomicInteger(0); - private HashMap metrics = new HashMap(); - private HashMap metricsByName = new HashMap(); + private HashMap metrics = new HashMap<>(); + private HashMap metricsByName = new HashMap<>(); @Override public int defineMetric(MetricType type, String name) throws WasmException { diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/WasmPlugin.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/WasmPlugin.java index 6a16ca1..2d29b8c 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/WasmPlugin.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/WasmPlugin.java @@ -17,7 +17,8 @@ public class WasmPlugin { final PluginHandler handler; - private final ReentrantLock lock; + private final ReentrantLock lock = new ReentrantLock(); + private final boolean shared; final ProxyWasm wasm; HttpServer httpServer; @@ -28,9 +29,9 @@ public Logger logger() { private WasmPlugin(ProxyWasm proxyWasm, PluginHandler handler, boolean shared) { Objects.requireNonNull(proxyWasm); Objects.requireNonNull(handler); + this.shared = shared; this.wasm = proxyWasm; this.handler = handler; - this.lock = shared ? new ReentrantLock() : null; this.handler.setPlugin(this); } @@ -43,21 +44,15 @@ public static WasmPlugin.Builder builder() { } public void lock() { - if (lock == null) { - return; - } lock.lock(); } public void unlock() { - if (lock == null) { - return; - } lock.unlock(); } public boolean isShared() { - return lock != null; + return shared; } public void setHttpServer(HttpServer httpServer) { @@ -91,6 +86,16 @@ public Builder withForeignFunctions(Map functions) { return this; } + public Builder withUpstreams(Map upstreams) { + this.handler.upstreams = new HashMap<>(upstreams); + return this; + } + + public Builder withStrictUpstreams(boolean strictUpstreams) { + this.handler.strictUpstreams = strictUpstreams; + return this; + } + public Builder withMinTickPeriodMilliseconds(int minTickPeriodMilliseconds) { this.handler.minTickPeriodMilliseconds = minTickPeriodMilliseconds; return this; diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/servlet/ServletHttpServer.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/servlet/ServletHttpServer.java index 6c363b2..b24afee 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/servlet/ServletHttpServer.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/servlet/ServletHttpServer.java @@ -1,9 +1,19 @@ package io.roastedroot.proxywasm.jaxrs.servlet; +import io.roastedroot.proxywasm.ArrayProxyMap; +import io.roastedroot.proxywasm.ProxyMap; import io.roastedroot.proxywasm.jaxrs.spi.HttpServer; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Alternative; +import jakarta.ws.rs.core.UriBuilder; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -13,14 +23,85 @@ @ApplicationScoped public class ServletHttpServer implements HttpServer { - ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + ScheduledExecutorService tickExecutorService = Executors.newScheduledThreadPool(1); + ExecutorService executorService = Executors.newWorkStealingPool(5); + HttpClient client = HttpClient.newHttpClient(); @Override public Runnable scheduleTick(long delay, Runnable task) { - var f = executorService.scheduleAtFixedRate(task, delay, delay, TimeUnit.MILLISECONDS); + var f = tickExecutorService.scheduleAtFixedRate(task, delay, delay, TimeUnit.MILLISECONDS); return () -> { ; f.cancel(false); }; } + + @Override + public Runnable scheduleHttpCall( + String method, + String host, + int port, + URI uri, + ProxyMap headers, + byte[] body, + ProxyMap trailers, + int timeout, + HandlerHttpResponseHandler handler) + throws InterruptedException { + + var f = + executorService + .invokeAll( + List.of( + () -> { + var resp = + httpCall( + method, host, port, uri, headers, body); + handler.call(resp); + return null; + }), + timeout, + TimeUnit.MILLISECONDS) + .get(0); + return () -> { + f.cancel(true); + }; + } + + private HandlerHttpResponse httpCall( + String method, String host, int port, URI uri, ProxyMap headers, byte[] body) + throws IOException, InterruptedException { + + var connectUri = UriBuilder.fromUri(uri).host(host).port(port).build(); + + var builder = HttpRequest.newBuilder().uri(connectUri); + for (var e : headers.entries()) { + builder.header(e.getKey(), e.getValue()); + } + builder.method(method, HttpRequest.BodyPublishers.ofByteArray(body)); + var request = builder.build(); + + HttpResponse response = + client.send(request, HttpResponse.BodyHandlers.ofByteArray()); + response.headers() + .map() + .forEach( + (k, v) -> { + for (var s : v) { + headers.add(k, s); + } + }); + + var h = new ArrayProxyMap(); + response.headers() + .map() + .forEach( + (k, v) -> { + for (var s : v) { + h.add(k, s); + } + }); + + return new HandlerHttpResponse(response.statusCode(), h, response.body()); + } } diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/spi/HttpServer.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/spi/HttpServer.java index 5d34bb5..8fdd7e0 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/spi/HttpServer.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/spi/HttpServer.java @@ -1,9 +1,41 @@ package io.roastedroot.proxywasm.jaxrs.spi; +import io.roastedroot.proxywasm.ProxyMap; +import java.net.URI; + /** * This interface will help us deal with differences in the http server impl. */ public interface HttpServer { Runnable scheduleTick(long delay, Runnable task); + + Runnable scheduleHttpCall( + String method, + String host, + int port, + URI uri, + ProxyMap headers, + byte[] body, + ProxyMap trailers, + int timeout, + HandlerHttpResponseHandler handler) + throws InterruptedException; + + class HandlerHttpResponse { + + public final int statusCode; + public final ProxyMap headers; + public final byte[] body; + + public HandlerHttpResponse(int statusCode, ProxyMap headers, byte[] body) { + this.statusCode = statusCode; + this.headers = headers; + this.body = body; + } + } + + interface HandlerHttpResponseHandler { + void call(HandlerHttpResponse response); + } } diff --git a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/vertx/VertxHttpServer.java b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/vertx/VertxHttpServer.java index f5e610b..c6ebe73 100644 --- a/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/vertx/VertxHttpServer.java +++ b/proxy-wasm-jaxrs/src/main/java/io/roastedroot/proxywasm/jaxrs/vertx/VertxHttpServer.java @@ -1,11 +1,19 @@ package io.roastedroot.proxywasm.jaxrs.vertx; +import io.roastedroot.proxywasm.ProxyMap; import io.roastedroot.proxywasm.jaxrs.spi.HttpServer; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpMethod; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Alternative; import jakarta.inject.Inject; +import java.net.URI; @Alternative @Priority(200) @@ -14,6 +22,18 @@ public class VertxHttpServer implements HttpServer { @Inject Vertx vertx; + HttpClient client; + + @PostConstruct + public void setup() { + this.client = vertx.createHttpClient(); + } + + @PreDestroy + public void close() { + client.close(); + } + @Override public Runnable scheduleTick(long delay, Runnable task) { var id = vertx.setPeriodic(delay, x -> task.run()); @@ -21,4 +41,59 @@ public Runnable scheduleTick(long delay, Runnable task) { vertx.cancelTimer(id); }; } + + @Override + public Runnable scheduleHttpCall( + String method, + String host, + int port, + URI uri, + ProxyMap headers, + byte[] body, + ProxyMap trailers, + int timeout, + HandlerHttpResponseHandler handler) + throws InterruptedException { + var f = + client.request(HttpMethod.valueOf(method), port, host, uri.toString()) + .compose( + req -> { + for (var e : headers.entries()) { + req.headers().add(e.getKey(), e.getValue()); + } + req.idleTimeout(timeout); + return req.send(Buffer.buffer(body)); + }) + .onComplete( + resp -> { + if (resp.succeeded()) { + HttpClientResponse result = resp.result(); + result.bodyHandler( + bodyHandler -> { + var h = ProxyMap.of(); + result.headers() + .forEach( + e -> + h.add( + e.getKey(), + e.getValue())); + handler.call( + new HandlerHttpResponse( + result.statusCode(), + h, + bodyHandler.getBytes())); + }); + } else { + handler.call( + new HandlerHttpResponse( + 500, + ProxyMap.of(), + resp.cause().getMessage().getBytes())); + } + }); + + return () -> { + // There doesn't seem to be a way to cancel the request. + }; + } }