Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.roastedroot.proxywasm;

/**
* Holds constants for the well-known header keys.
*/
public final class WellKnownHeaders {
private WellKnownHeaders() {}

public static final String SCHEME = ":scheme";
public static final String AUTHORITY = ":authority";
public static final String PATH = ":path";
public static final String METHOD = ":method";
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,14 @@ public WasmPluginFactory httpHeaders() throws StartException {
.withPluginConfig("{\"header\": \"x-wasm-header\", \"value\": \"foo\"}")
.build(parseTestModule("/go-examples/http_headers/main.wasm"));
}

@Produces
public WasmPluginFactory dispatchCallOnTickTest() throws StartException {
return () ->
WasmPlugin.builder()
.withName("dispatchCallOnTickTest")
.withLogger(new MockLogger())
.withUpstreams(Map.of("web_service", "localhost:8081"))
.build(parseTestModule("/go-examples/dispatch_call_on_tick/main.wasm"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,46 @@
import io.roastedroot.proxywasm.jaxrs.NamedWasmPlugin;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.Response;

@Path("/test")
@Path("/")
public class Resources {

@Path("/httpHeaders")
@Context ContainerRequestContext requestContext;

@Path("/test/httpHeaders")
@GET
@NamedWasmPlugin("httpHeaders")
public String httpHeaders() {
return "hello world";
}

@Path("/notSharedHttpHeaders")
@Path("/test/notSharedHttpHeaders")
@GET
@NamedWasmPlugin("notSharedHttpHeaders")
public String notSharedHttpHeaders() {
return "hello world";
}

@Path("/fail")
@GET
public Response fail() {
Response.ResponseBuilder builder = Response.status(Response.Status.BAD_REQUEST);
for (String header : requestContext.getHeaders().keySet()) {
builder.header("echo-" + header, requestContext.getHeaderString(header));
}
return builder.build();
}

@Path("/ok")
@GET
public Response ok() {
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
for (String header : requestContext.getHeaders().keySet()) {
builder.header("echo-" + header, requestContext.getHeaderString(header));
}
return builder.entity("ok").build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.roastedroot.proxywasm.jaxrs.example;

import static io.roastedroot.proxywasm.jaxrs.example.Helpers.assertLogsContain;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.quarkus.test.junit.QuarkusTest;
import io.roastedroot.proxywasm.StartException;
import io.roastedroot.proxywasm.jaxrs.WasmPlugin;
import io.roastedroot.proxywasm.jaxrs.WasmPluginFeature;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

@QuarkusTest
public class DispatchCallOnTickTest {

@Inject WasmPluginFeature feature;

@Test
public void test() throws InterruptedException, StartException {
WasmPlugin plugin = feature.pool("dispatchCallOnTickTest").borrow();
assertNotNull(plugin);

var logger = (MockLogger) plugin.logger();
Thread.sleep(300);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to trigger plugin.close() instead waiting here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We basically have to wait for the tick events to get delivered to the plugin. . They are getting delivered every 100 ms (configured by the plugin). If we want avoid the wait, we would need to implement a new wasm module, avoiding the use of the async tick event to trigger the http call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for explaining, (the config is not in the diff).
A Thread.sleep always makes me a little anxious should we use Awaitility or something similar to avoid this becoming flaky over time?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah.. I think I might just implement a better wasm plugin that allows us to test things better.


// for (var l : logger.loggedMessages()) {
// System.out.println(l);
// }
assertLogsContain(
logger.loggedMessages(),
"set tick period milliseconds: 100",
"called 1 for contextID=1",
"called 2 for contextID=1",
"response header for the dispatched call: Content-Type: text/plain;charset=UTF-8",
"response header for the dispatched call: echo-accept: */*",
"response header for the dispatched call: echo-content-length: 0",
"response header for the dispatched call: echo-Host: some_authority");
plugin.close(); // so that the ticks don't keep running in the background.
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package io.roastedroot.proxywasm.jaxrs.example;

import static io.roastedroot.proxywasm.jaxrs.example.Helpers.assertLogsContain;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.quarkus.test.junit.QuarkusTest;
import io.roastedroot.proxywasm.StartException;
import io.roastedroot.proxywasm.jaxrs.WasmPlugin;
import io.roastedroot.proxywasm.jaxrs.WasmPluginFeature;
import jakarta.inject.Inject;
import java.util.ArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@QuarkusTest
Expand All @@ -30,12 +29,4 @@ public void testRequest() throws InterruptedException, StartException {
1, "68656c6c6f20776f726c6421"));
plugin.close(); // so that the ticks don't keep running in the background.
}

public synchronized void assertLogsContain(
ArrayList<String> loggedMessages, String... message) {
for (String m : message) {
Assertions.assertTrue(
loggedMessages.contains(m), "logged messages does not contain: " + m);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.roastedroot.proxywasm.jaxrs.example;

import java.util.ArrayList;
import org.junit.jupiter.api.Assertions;

public class Helpers {
private Helpers() {}

public static void assertLogsContain(ArrayList<String> loggedMessages, String... message) {
for (String m : message) {
Assertions.assertTrue(
loggedMessages.contains(m), "logged messages does not contain: " + m);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.roastedroot.proxywasm.jaxrs;

import static io.roastedroot.proxywasm.Helpers.bytes;
import static io.roastedroot.proxywasm.WellKnownHeaders.AUTHORITY;
import static io.roastedroot.proxywasm.WellKnownHeaders.METHOD;
import static io.roastedroot.proxywasm.WellKnownHeaders.PATH;
import static io.roastedroot.proxywasm.WellKnownHeaders.SCHEME;
import static io.roastedroot.proxywasm.WellKnownProperties.PLUGIN_NAME;
import static io.roastedroot.proxywasm.WellKnownProperties.PLUGIN_VM_ID;

import io.roastedroot.proxywasm.ArrayProxyMap;
import io.roastedroot.proxywasm.ChainedHandler;
import io.roastedroot.proxywasm.ForeignFunction;
import io.roastedroot.proxywasm.Handler;
Expand All @@ -12,6 +17,8 @@
import io.roastedroot.proxywasm.ProxyMap;
import io.roastedroot.proxywasm.WasmException;
import io.roastedroot.proxywasm.WasmResult;
import jakarta.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -46,6 +53,10 @@ public void close() {
cancelTick.run();
cancelTick = null;
}
for (var cancelHttpCall : httpCalls.values()) {
cancelHttpCall.run();
}
httpCalls.clear();
}

// //////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -200,61 +211,105 @@ public void setPlugin(WasmPlugin plugin) {
// HTTP calls
// //////////////////////////////////////////////////////////////////////

public static class HttpCall {
public enum Type {
REGULAR,
DISPATCH
}

public final int id;
public final Type callType;
public final String uri;
public final Object headers;
public final byte[] body;
public final ProxyMap trailers;
public final int timeoutMilliseconds;

public HttpCall(
int id,
Type callType,
String uri,
ProxyMap headers,
byte[] body,
ProxyMap trailers,
int timeoutMilliseconds) {
this.id = id;
this.callType = callType;
this.uri = uri;
this.headers = headers;
this.body = body;
this.trailers = trailers;
this.timeoutMilliseconds = timeoutMilliseconds;
}
}

private final AtomicInteger lastCallId = new AtomicInteger(0);
private final HashMap<Integer, HttpCall> httpCalls = new HashMap();
private final HashMap<Integer, Runnable> httpCalls = new HashMap<>();

public HashMap<Integer, HttpCall> getHttpCalls() {
return httpCalls;
}
HashMap<String, String> upstreams = new HashMap<>();
boolean strictUpstreams;

@Override
public int httpCall(
String uri, ProxyMap headers, byte[] body, ProxyMap trailers, int timeoutMilliseconds)
String upstreamName,
ProxyMap headers,
byte[] body,
ProxyMap trailers,
int timeoutMilliseconds)
throws WasmException {
var id = lastCallId.incrementAndGet();
HttpCall value =
new HttpCall(
id,
HttpCall.Type.REGULAR,
uri,
headers,
body,
trailers,
timeoutMilliseconds);
httpCalls.put(id, value);
return id;

var method = headers.get(METHOD);
if (method == null) {
throw new WasmException(WasmResult.BAD_ARGUMENT);
}

var scheme = headers.get(SCHEME);
if (scheme == null) {
scheme = "http";
}
var authority = headers.get(AUTHORITY);
if (authority == null) {
throw new WasmException(WasmResult.BAD_ARGUMENT);
}
headers.put("Host", authority);

var connectHostPort = upstreams.get(upstreamName);
if (connectHostPort == null && strictUpstreams) {
throw new WasmException(WasmResult.BAD_ARGUMENT);
}
if (connectHostPort == null) {
connectHostPort = authority;
}

var connectUri = UriBuilder.newInstance().scheme(scheme).host(connectHostPort).build();
var connectHost = connectUri.getHost();
var connectPort = connectUri.getPort();
if (connectPort == -1) {
connectPort = "https".equals(scheme) ? 443 : 80;
}

var path = headers.get(PATH);
if (path == null) {
throw new WasmException(WasmResult.BAD_ARGUMENT);
}
if (!path.isEmpty() && !path.startsWith("/")) {
path = "/" + path;
}

var uri =
URI.create(
UriBuilder.newInstance()
.scheme(scheme)
.host(authority)
.port(connectPort)
.build()
.toString()
+ path);

// Remove all the pseudo headers
for (var r : new ArrayProxyMap(headers).entries()) {
if (r.getKey().startsWith(":")) {
headers.remove(r.getKey());
}
}

try {
var id = lastCallId.incrementAndGet();
var future =
this.plugin.httpServer.scheduleHttpCall(
method,
connectHost,
connectPort,
uri,
headers,
body,
trailers,
timeoutMilliseconds,
(resp) -> {
this.plugin.lock();
try {
if (httpCalls.remove(id) == null) {
return; // the call could have already been cancelled
}
this.plugin.wasm.sendHttpCallResponse(
id, resp.headers, new ArrayProxyMap(), resp.body);
} finally {
this.plugin.unlock();
}
});
httpCalls.put(id, future);
return id;
} catch (InterruptedException e) {
throw new WasmException(WasmResult.INTERNAL_FAILURE);
}
}

@Override
Expand All @@ -265,18 +320,7 @@ public int dispatchHttpCall(
ProxyMap trailers,
int timeoutMilliseconds)
throws WasmException {
var id = lastCallId.incrementAndGet();
HttpCall value =
new HttpCall(
id,
HttpCall.Type.DISPATCH,
upstreamName,
headers,
body,
trailers,
timeoutMilliseconds);
httpCalls.put(id, value);
return id;
return httpCall(upstreamName, headers, body, trailers, timeoutMilliseconds);
}

// //////////////////////////////////////////////////////////////////////
Expand All @@ -298,8 +342,8 @@ public Metric(int id, MetricType type, String name) {
}

private final AtomicInteger lastMetricId = new AtomicInteger(0);
private HashMap<Integer, Metric> metrics = new HashMap();
private HashMap<String, Metric> metricsByName = new HashMap();
private HashMap<Integer, Metric> metrics = new HashMap<>();
private HashMap<String, Metric> metricsByName = new HashMap<>();

@Override
public int defineMetric(MetricType type, String name) throws WasmException {
Expand Down
Loading