diff --git a/pom.xml b/pom.xml
index cbf44d5..db8b27a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -225,6 +225,7 @@
quarkus-proxy-wasm
quarkus-proxy-wasm-example
quarkus-x-corazawaf-example
+ quarkus-x-kuadrant-example
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java
index 5a76d3e..fb05194 100644
--- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java
@@ -13,6 +13,7 @@
import com.dylibso.chicory.runtime.WasmRuntimeException;
import com.dylibso.chicory.wasm.InvalidException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -487,7 +488,11 @@ void proxyOnTick(int arg0) {
*/
@WasmExport
int proxyGetBufferBytes(
- int bufferType, int start, int length, int returnBufferData, int returnBufferSize) {
+ int bufferType,
+ int start,
+ int chunkLength,
+ int returnBufferData,
+ int returnBufferSize) {
try {
// Get the buffer based on the buffer type
@@ -496,29 +501,34 @@ int proxyGetBufferBytes(
return WasmResult.NOT_FOUND.getValue();
}
- if (start > start + length) {
+ if (start < 0) {
return WasmResult.BAD_ARGUMENT.getValue();
}
+ int maxChunkLength = b.length - start;
+ if (chunkLength < 0 || chunkLength > maxChunkLength) {
+ chunkLength = maxChunkLength;
+ }
+
ByteBuffer buffer = ByteBuffer.wrap(b);
- if (start + length > buffer.capacity()) {
- length = buffer.capacity() - start;
+ if (start + chunkLength > buffer.capacity()) {
+ chunkLength = buffer.capacity() - start;
}
try {
buffer.position(start);
- buffer.limit(start + length);
+ buffer.limit(start + chunkLength);
} catch (IllegalArgumentException e) {
return WasmResult.BAD_ARGUMENT.getValue();
}
// Allocate memory in the WebAssembly instance
- int addr = malloc(length);
+ int addr = malloc(chunkLength);
putMemory(addr, buffer);
// Write the address to the return pointer
putUint32(returnBufferData, addr);
// Write the length to the return size pointer
- putUint32(returnBufferSize, length);
+ putUint32(returnBufferSize, chunkLength);
return WasmResult.OK.getValue();
} catch (WasmException e) {
@@ -713,16 +723,24 @@ int proxyGetHeaderMapPairs(int mapType, int returnDataPtr, int returnDataSize) {
return WasmResult.NOT_FOUND.getValue();
}
- // to clone the headers so that they don't change on while we process them in the loop
- var cloneMap = new ArrayProxyMap(header);
+ var cloneMap = new ArrayList>();
int totalBytesLen = U32_LEN; // Start with space for the count
- for (Map.Entry entry : cloneMap.entries()) {
- String key = entry.getKey();
- String value = entry.getValue();
- totalBytesLen += U32_LEN + U32_LEN; // keyLen + valueLen
- totalBytesLen += key.length() + 1 + value.length() + 1; // key + \0 + value + \0
- }
+ totalBytesLen +=
+ header.streamBytes()
+ .mapToInt(
+ entry -> {
+ var key = entry.getKey();
+ var value = entry.getValue();
+ cloneMap.add(Map.entry(key, value));
+ return U32_LEN
+ + U32_LEN // keyLen + valueLen
+ + key.length
+ + 1
+ + value.length
+ + 1; // key + \0 + value + \0
+ })
+ .sum();
// Allocate memory in the WebAssembly instance
int addr = malloc(totalBytesLen);
@@ -735,29 +753,29 @@ int proxyGetHeaderMapPairs(int mapType, int returnDataPtr, int returnDataSize) {
int dataPtr = lenPtr + ((U32_LEN + U32_LEN) * cloneMap.size());
// Write each key-value pair to memory
- for (Map.Entry entry : cloneMap.entries()) {
- String key = entry.getKey();
- String value = entry.getValue();
+ for (Map.Entry entry : cloneMap) {
+ var key = entry.getKey();
+ var value = entry.getValue();
// Write key length
- putUint32(lenPtr, key.length());
+ putUint32(lenPtr, key.length);
lenPtr += U32_LEN;
// Write value length
- putUint32(lenPtr, value.length());
+ putUint32(lenPtr, value.length);
lenPtr += U32_LEN;
// Write key bytes
- putMemory(dataPtr, key.getBytes());
- dataPtr += key.length();
+ putMemory(dataPtr, key);
+ dataPtr += key.length;
// Write null terminator for key
putByte(dataPtr, (byte) 0);
dataPtr++;
// Write value bytes
- putMemory(dataPtr, value.getBytes());
- dataPtr += value.length();
+ putMemory(dataPtr, value);
+ dataPtr += value.length;
// Write null terminator for value
putByte(dataPtr, (byte) 0);
@@ -1447,7 +1465,7 @@ int proxyGrpcCall(
message,
timeout);
putUint32(returnCalloutID, callId);
- return callId;
+ return WasmResult.OK.getValue();
} catch (WasmException e) {
return e.result().getValue();
}
@@ -1478,7 +1496,7 @@ int proxyGrpcStream(
int streamId =
handler.grpcStream(upstreamName, serviceName, methodName, initialMetadata);
putUint32(returnStreamId, streamId);
- return streamId;
+ return WasmResult.OK.getValue();
} catch (WasmException e) {
return e.result().getValue();
}
@@ -1539,21 +1557,21 @@ void proxyOnGrpcReceive(int contextId, int callId, int messageSize) {
/**
* implements https://github.com/proxy-wasm/spec/tree/main/abi-versions/vNEXT#proxy_on_grpc_receive_trailing_metadata
*/
- void proxyOnGrpcReceiveTrailingMetadata(int arg0, int arg1, int arg2) {
+ void proxyOnGrpcReceiveTrailingMetadata(int contextId, int callId, int numElements) {
if (proxyOnGrpcReceiveTrailingMetadataFn == null) {
return;
}
- proxyOnGrpcReceiveTrailingMetadataFn.apply(arg0, arg1, arg2);
+ proxyOnGrpcReceiveTrailingMetadataFn.apply(contextId, callId, numElements);
}
/**
* implements https://github.com/proxy-wasm/spec/tree/main/abi-versions/vNEXT#proxy_on_grpc_close
*/
- void proxyOnGrpcClose(int arg0, int arg1, int arg2) {
+ void proxyOnGrpcClose(int contextId, int callId, int statusCode) {
if (proxyOnGrpcCloseFn == null) {
return;
}
- proxyOnGrpcCloseFn.apply(arg0, arg1, arg2);
+ proxyOnGrpcCloseFn.apply(contextId, callId, statusCode);
}
// //////////////////////////////////////////////////////////////////////
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java
new file mode 100644
index 0000000..af6c8a7
--- /dev/null
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java
@@ -0,0 +1,131 @@
+package io.roastedroot.proxywasm;
+
+import static io.roastedroot.proxywasm.Helpers.bytes;
+import static io.roastedroot.proxywasm.Helpers.len;
+import static io.roastedroot.proxywasm.Helpers.string;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ArrayBytesProxyMap implements ProxyMap {
+
+ final ArrayList> entries;
+
+ public ArrayBytesProxyMap() {
+ this.entries = new ArrayList<>();
+ }
+
+ public ArrayBytesProxyMap(int mapSize) {
+ this.entries = new ArrayList<>(mapSize);
+ }
+
+ @Override
+ public int size() {
+ return entries.size();
+ }
+
+ @Override
+ public void add(String key, String value) {
+ entries.add(Map.entry(key, bytes(value)));
+ }
+
+ public void add(String key, byte[] value) {
+ entries.add(Map.entry(key, value));
+ }
+
+ @Override
+ public void put(String key, String value) {
+ this.remove(key);
+ entries.add(Map.entry(key, bytes(value)));
+ }
+
+ public void put(String key, byte[] value) {
+ this.remove(key);
+ entries.add(Map.entry(key, value));
+ }
+
+ @Override
+ public Iterable extends Map.Entry> entries() {
+ return entries.stream()
+ .map(x -> Map.entry(x.getKey(), string(x.getValue())))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Stream> streamBytes() {
+ return entries.stream().map(x -> Map.entry(bytes(x.getKey()), x.getValue()));
+ }
+
+ @Override
+ public String get(String key) {
+ return entries.stream()
+ .filter(x -> x.getKey().equals(key))
+ .map(Map.Entry::getValue)
+ .map(Helpers::string)
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Override
+ public void remove(String key) {
+ entries.removeIf(x -> x.getKey().equals(key));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ArrayBytesProxyMap that = (ArrayBytesProxyMap) o;
+ return Objects.equals(entries, that.entries);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(entries);
+ }
+
+ @Override
+ public String toString() {
+ return entries.toString();
+ }
+
+ /**
+ * Encode the map into a byte array.
+ */
+ @Override
+ public byte[] encode() {
+ try {
+ var baos = new ByteArrayOutputStream();
+ var o = new DataOutputStream(baos);
+ // Write header size (number of entries)
+ int mapSize = this.size();
+ o.writeInt(mapSize);
+
+ // write all the key / value sizes.
+ for (var entry : entries) {
+ o.writeInt(len(entry.getKey()));
+ o.writeInt(len(entry.getValue()));
+ }
+
+ // write all the key / values
+ for (var entry : entries) {
+ o.write(bytes(entry.getKey()));
+ o.write(0);
+ o.write(entry.getValue());
+ o.write(0);
+ }
+ o.close();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ // this should never happen since we are not really doing IO
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java
index de897a4..b2c0165 100644
--- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java
@@ -8,6 +8,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
public interface ProxyMap {
@@ -37,6 +39,11 @@ static ProxyMap copyOf(Map headers) {
Iterable extends Map.Entry> entries();
+ default Stream> streamBytes() {
+ return StreamSupport.stream(entries().spliterator(), false)
+ .map(x -> Map.entry(bytes(x.getKey()), bytes(x.getValue())));
+ }
+
String get(String key);
void remove(String key);
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java
index a039d21..f502aff 100644
--- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java
@@ -36,6 +36,9 @@ public final class ProxyWasm implements Closeable {
private ProxyMap httpCallResponseTrailers;
private byte[] httpCallResponseBody;
private Handler pluginHandler;
+ private ArrayBytesProxyMap grpcReceiveInitialMetadata;
+ private byte[] grpcReceive;
+ private ArrayBytesProxyMap grpcReceiveTrailingMetadata;
private ProxyWasm(Builder other) throws StartException {
this.pluginHandler = Objects.requireNonNullElse(other.pluginHandler, new Handler() {});
@@ -126,6 +129,21 @@ public ProxyMap getHttpCallResponseTrailers() {
public byte[] getHttpCallResponseBody() {
return httpCallResponseBody;
}
+
+ @Override
+ public ProxyMap getGrpcReceiveInitialMetaData() {
+ return grpcReceiveInitialMetadata;
+ }
+
+ @Override
+ public byte[] getGrpcReceiveBuffer() {
+ return grpcReceive;
+ }
+
+ @Override
+ public ProxyMap getGrpcReceiveTrailerMetaData() {
+ return grpcReceiveTrailingMetadata;
+ }
};
}
@@ -195,7 +213,6 @@ public void sendHttpCallResponse(
public void sendHttpCallResponse(
int calloutID, ProxyMap headers, ProxyMap trailers, byte[] body) {
-
this.httpCallResponseHeaders = headers;
this.httpCallResponseTrailers = trailers;
this.httpCallResponseBody = body;
@@ -208,6 +225,28 @@ public void sendHttpCallResponse(
this.httpCallResponseBody = null;
}
+ public void sendGrpcReceiveInitialMetadata(int calloutID, ArrayBytesProxyMap headers) {
+ this.grpcReceiveInitialMetadata = headers;
+ this.abi.proxyOnGrpcReceiveInitialMetadata(pluginContext.id(), calloutID, headers.size());
+ this.grpcReceiveInitialMetadata = null;
+ }
+
+ public void sendGrpcReceive(int calloutID, byte[] body) {
+ this.grpcReceive = body;
+ this.abi.proxyOnGrpcReceive(pluginContext.id(), calloutID, len(body));
+ this.grpcReceive = null;
+ }
+
+ public void sendGrpcReceiveTrailingMetadata(int calloutID, ArrayBytesProxyMap headers) {
+ this.grpcReceiveTrailingMetadata = headers;
+ this.abi.proxyOnGrpcReceiveTrailingMetadata(pluginContext.id(), calloutID, headers.size());
+ this.grpcReceiveTrailingMetadata = null;
+ }
+
+ public void sendGrpcClose(int calloutID, int statusCode) {
+ this.abi.proxyOnGrpcClose(pluginContext.id(), calloutID, statusCode);
+ }
+
public void sendOnQueueReady(int queueId) {
this.abi.proxyOnQueueReady(pluginContext.id(), queueId);
}
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java
new file mode 100644
index 0000000..e9fd1f3
--- /dev/null
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java
@@ -0,0 +1,16 @@
+package io.roastedroot.proxywasm.plugin;
+
+import io.roastedroot.proxywasm.ProxyMap;
+
+public class GrpcCallResponse {
+
+ public final int statusCode;
+ public final ProxyMap headers;
+ public final byte[] body;
+
+ public GrpcCallResponse(int statusCode, ProxyMap headers, byte[] body) {
+ this.statusCode = statusCode;
+ this.headers = headers;
+ this.body = body;
+ }
+}
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java
new file mode 100644
index 0000000..fdb2949
--- /dev/null
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java
@@ -0,0 +1,14 @@
+package io.roastedroot.proxywasm.plugin;
+
+import io.roastedroot.proxywasm.ArrayBytesProxyMap;
+
+public interface GrpcCallResponseHandler {
+
+ void onHeaders(ArrayBytesProxyMap trailerMap);
+
+ void onMessage(byte[] data);
+
+ void onTrailers(ArrayBytesProxyMap trailers);
+
+ void onClose(int status);
+}
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java
index 9d0a82a..a2dd6b6 100644
--- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java
@@ -13,6 +13,7 @@
import com.dylibso.chicory.runtime.Machine;
import com.dylibso.chicory.wasi.WasiOptions;
import com.dylibso.chicory.wasm.WasmModule;
+import io.roastedroot.proxywasm.ArrayBytesProxyMap;
import io.roastedroot.proxywasm.ArrayProxyMap;
import io.roastedroot.proxywasm.ChainedHandler;
import io.roastedroot.proxywasm.ForeignFunction;
@@ -55,8 +56,9 @@ private Plugin(Builder builder, ProxyWasm proxyWasm) throws StartException {
Objects.requireNonNull(proxyWasm);
this.name = Objects.requireNonNullElse(builder.name, "default");
this.shared = builder.shared;
- this.foreignFunctions = builder.foreignFunctions;
- this.upstreams = builder.upstreams;
+ this.foreignFunctions =
+ Objects.requireNonNullElseGet(builder.foreignFunctions, HashMap::new);
+ this.upstreams = Objects.requireNonNullElseGet(builder.upstreams, HashMap::new);
this.strictUpstreams = builder.strictUpstreams;
this.minTickPeriodMilliseconds = builder.minTickPeriodMilliseconds;
this.vmConfig = builder.vmConfig;
@@ -119,10 +121,14 @@ public void close() {
cancelTick.run();
cancelTick = null;
}
- for (var cancelHttpCall : httpCalls.values()) {
- cancelHttpCall.run();
+ for (var cancel : httpCalls.values()) {
+ cancel.run();
}
httpCalls.clear();
+ for (var cancel : grpcCalls.values()) {
+ cancel.run();
+ }
+ grpcCalls.clear();
} finally {
unlock();
@@ -253,12 +259,13 @@ public Plugin build(ProxyWasm proxyWasm) throws StartException {
byte[] pluginConfig;
private final AtomicInteger lastCallId = new AtomicInteger(0);
private final HashMap httpCalls = new HashMap<>();
- HashMap upstreams = new HashMap<>();
+ private final HashMap grpcCalls = new HashMap<>();
+ private final HashMap upstreams;
boolean strictUpstreams;
int minTickPeriodMilliseconds;
private int tickPeriodMilliseconds;
private Runnable cancelTick;
- HashMap foreignFunctions;
+ private final HashMap foreignFunctions;
private byte[] funcCallData = new byte[0];
private final HashMap, byte[]> properties = new HashMap<>();
@@ -470,6 +477,8 @@ public int httpCall(
return id;
} catch (InterruptedException e) {
throw new WasmException(WasmResult.INTERNAL_FAILURE);
+ } catch (UnsupportedOperationException e) {
+ throw new WasmException(WasmResult.UNIMPLEMENTED);
}
}
@@ -484,6 +493,129 @@ public int dispatchHttpCall(
return httpCall(upstreamName, headers, body, trailers, timeoutMilliseconds);
}
+ // //////////////////////////////////////////////////////////////////////
+ // GRPC calls
+ // //////////////////////////////////////////////////////////////////////
+
+ @Override
+ public int grpcCall(
+ String upstreamName,
+ String serviceName,
+ String methodName,
+ ProxyMap headers,
+ byte[] body,
+ int timeoutMilliseconds)
+ throws WasmException {
+
+ var connectHostPort = upstreams.get(upstreamName);
+ if (connectHostPort == null && strictUpstreams) {
+ throw new WasmException(WasmResult.BAD_ARGUMENT);
+ }
+ if (connectHostPort == null) {
+ connectHostPort = upstreamName;
+ }
+
+ URI connectUri = null;
+ try {
+ connectUri = URI.create(connectHostPort);
+ } catch (IllegalArgumentException e) {
+ throw new WasmException(WasmResult.BAD_ARGUMENT);
+ }
+
+ if (!("http".equals(connectUri.getScheme())
+ || "https".equals(connectUri.getScheme()))) {
+ logger.log(
+ LogLevel.ERROR,
+ "grpc call upstream not mapped to URL with a http/https scheme: "
+ + upstreamName);
+ throw new WasmException(WasmResult.BAD_ARGUMENT);
+ }
+
+ var connectHost = connectUri.getHost();
+ var connectPort = connectUri.getPort();
+ if (connectPort == -1) {
+ connectPort = "https".equals(connectUri.getScheme()) ? 443 : 80;
+ }
+
+ try {
+
+ var id = lastCallId.incrementAndGet();
+ var callHandler =
+ new GrpcCallResponseHandler() {
+
+ @Override
+ public void onHeaders(ArrayBytesProxyMap headers) {
+ lock();
+ try {
+ if (grpcCalls.get(id) == null) {
+ return; // the call could have already been cancelled
+ }
+ wasm.sendGrpcReceiveInitialMetadata(id, headers);
+ } finally {
+ unlock();
+ }
+ }
+
+ @Override
+ public void onMessage(byte[] data) {
+ lock();
+ try {
+ if (grpcCalls.get(id) == null) {
+ return; // the call could have already been cancelled
+ }
+ wasm.sendGrpcReceive(id, data);
+ } finally {
+ unlock();
+ }
+ }
+
+ @Override
+ public void onTrailers(ArrayBytesProxyMap trailers) {
+ lock();
+ try {
+ if (grpcCalls.get(id) == null) {
+ return; // the call could have already been cancelled
+ }
+ wasm.sendGrpcReceiveTrailingMetadata(id, trailers);
+ } finally {
+ unlock();
+ }
+ }
+
+ @Override
+ public void onClose(int status) {
+ lock();
+ try {
+ if (grpcCalls.get(id) == null) {
+ return; // the call could have already been cancelled
+ }
+ wasm.sendGrpcClose(id, status);
+ } finally {
+ unlock();
+ }
+ }
+ };
+
+ var future =
+ serverAdaptor.scheduleGrpcCall(
+ connectHost,
+ connectPort,
+ "http".equals(connectUri.getScheme()),
+ serviceName,
+ methodName,
+ headers,
+ body,
+ timeoutMilliseconds,
+ callHandler);
+ grpcCalls.put(id, future);
+ return id;
+ } catch (InterruptedException e) {
+ throw new WasmException(WasmResult.INTERNAL_FAILURE);
+ } catch (UnsupportedOperationException e) {
+ throw new WasmException(WasmResult.UNIMPLEMENTED);
+ }
+ }
+
// //////////////////////////////////////////////////////////////////////
// Metrics
// //////////////////////////////////////////////////////////////////////
@@ -519,9 +651,6 @@ public WasmResult removeMetric(int metricId) {
@Override
public ForeignFunction getForeignFunction(String name) {
- if (foreignFunctions == null) {
- return null;
- }
return foreignFunctions.get(name);
}
diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java
index 66af5e1..54b4557 100644
--- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java
+++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java
@@ -12,7 +12,7 @@ public interface ServerAdaptor {
HttpRequestAdaptor httpRequestAdaptor(Object context);
- Runnable scheduleHttpCall(
+ default Runnable scheduleHttpCall(
String method,
String host,
int port,
@@ -22,5 +22,21 @@ Runnable scheduleHttpCall(
ProxyMap trailers,
int timeout,
HttpCallResponseHandler handler)
- throws InterruptedException;
+ throws InterruptedException {
+ throw new UnsupportedOperationException("scheduleHttpCall not implemented");
+ }
+
+ default Runnable scheduleGrpcCall(
+ String host,
+ int port,
+ boolean plainText,
+ String serviceName,
+ String methodName,
+ ProxyMap headers,
+ byte[] message,
+ int timeoutMillis,
+ GrpcCallResponseHandler handler)
+ throws InterruptedException {
+ throw new UnsupportedOperationException("scheduleGrpcCall not implemented");
+ }
}
diff --git a/quarkus-proxy-wasm/deployment/pom.xml b/quarkus-proxy-wasm/deployment/pom.xml
index 5a82137..7e58a8c 100644
--- a/quarkus-proxy-wasm/deployment/pom.xml
+++ b/quarkus-proxy-wasm/deployment/pom.xml
@@ -21,6 +21,10 @@
io.quarkus
quarkus-arc-deployment
+
+ io.quarkus
+ quarkus-grpc-deployment
+
io.quarkus
quarkus-jaxrs-spi-deployment
diff --git a/quarkus-proxy-wasm/runtime/pom.xml b/quarkus-proxy-wasm/runtime/pom.xml
index 9ad43ea..7ba6e84 100644
--- a/quarkus-proxy-wasm/runtime/pom.xml
+++ b/quarkus-proxy-wasm/runtime/pom.xml
@@ -17,6 +17,10 @@
io.quarkus
quarkus-arc
+
+ io.quarkus
+ quarkus-grpc
+
io.quarkus
quarkus-rest
diff --git a/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java b/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java
index 73f6ebb..8a2c854 100644
--- a/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java
+++ b/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java
@@ -1,6 +1,15 @@
package io.quarkiverse.proxywasm.runtime;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.roastedroot.proxywasm.ArrayBytesProxyMap;
import io.roastedroot.proxywasm.ProxyMap;
+import io.roastedroot.proxywasm.plugin.GrpcCallResponseHandler;
import io.roastedroot.proxywasm.plugin.HttpCallResponse;
import io.roastedroot.proxywasm.plugin.HttpCallResponseHandler;
import io.roastedroot.proxywasm.plugin.HttpRequestAdaptor;
@@ -17,7 +26,12 @@
import jakarta.enterprise.inject.Alternative;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
@Alternative
@Priority(200)
@@ -107,4 +121,129 @@ public Runnable scheduleHttpCall(
// There doesn't seem to be a way to cancel the request.
};
}
+
+ @Override
+ public Runnable scheduleGrpcCall(
+ String host,
+ int port,
+ boolean plainText,
+ String serviceName,
+ String methodName,
+ ProxyMap headers,
+ byte[] message,
+ int timeoutMillis,
+ GrpcCallResponseHandler handler)
+ throws InterruptedException {
+
+ ManagedChannelBuilder> managedChannelBuilder =
+ ManagedChannelBuilder.forAddress(host, port);
+ if (plainText) {
+ managedChannelBuilder.usePlaintext();
+ } else {
+ managedChannelBuilder.useTransportSecurity();
+ }
+ ManagedChannel channel = managedChannelBuilder.build();
+
+ // Construct method descriptor (assuming unary request/response and protobuf)
+ MethodDescriptor methodDescriptor =
+ MethodDescriptor.newBuilder()
+ .setType(MethodDescriptor.MethodType.UNARY)
+ .setFullMethodName(
+ MethodDescriptor.generateFullMethodName(serviceName, methodName))
+ .setRequestMarshaller(new BytesMessageMarshaller())
+ .setResponseMarshaller(new BytesMessageMarshaller())
+ .build();
+
+ ClientCall call =
+ channel.newCall(
+ methodDescriptor,
+ CallOptions.DEFAULT.withDeadlineAfter(
+ timeoutMillis * 1000, TimeUnit.MILLISECONDS));
+
+ Metadata metadata = new Metadata();
+ for (Map.Entry entry : headers.entries()) {
+ Metadata.Key key =
+ Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER);
+ metadata.put(key, entry.getValue());
+ }
+
+ call.start(
+ new ClientCall.Listener<>() {
+
+ @Override
+ public void onReady() {
+ super.onReady();
+ }
+
+ @Override
+ public void onHeaders(Metadata metadata) {
+ if (metadata.keys().isEmpty()) {
+ return;
+ }
+ ArrayBytesProxyMap trailerMap = new ArrayBytesProxyMap();
+ for (var key : metadata.keys()) {
+ if (key.endsWith("-bin")) {
+ var value =
+ metadata.get(
+ Metadata.Key.of(
+ key, Metadata.BINARY_BYTE_MARSHALLER));
+ trailerMap.add(key, value);
+ } else {
+ var value =
+ metadata.get(
+ Metadata.Key.of(
+ key, Metadata.ASCII_STRING_MARSHALLER));
+ trailerMap.add(key, value);
+ }
+ }
+ handler.onHeaders(trailerMap);
+ }
+
+ @Override
+ public void onMessage(byte[] data) {
+ handler.onMessage(data);
+ }
+
+ @Override
+ public void onClose(Status status, Metadata metadata) {
+ if (!metadata.keys().isEmpty()) {
+ ArrayBytesProxyMap trailerMap = new ArrayBytesProxyMap();
+ for (var key : metadata.keys()) {
+ var value =
+ metadata.get(
+ Metadata.Key.of(
+ key, Metadata.BINARY_BYTE_MARSHALLER));
+ trailerMap.add(key, value);
+ }
+ handler.onTrailers(trailerMap);
+ }
+ handler.onClose(status.getCode().value());
+ channel.shutdownNow();
+ }
+ },
+ metadata);
+ call.sendMessage(message);
+ call.halfClose();
+ call.request(1); // Request a single response
+ return () -> {
+ call.cancel("shutdown", null);
+ channel.shutdownNow();
+ };
+ }
+
+ static class BytesMessageMarshaller implements io.grpc.MethodDescriptor.Marshaller {
+ @Override
+ public InputStream stream(byte[] value) {
+ return new ByteArrayInputStream(value);
+ }
+
+ @Override
+ public byte[] parse(InputStream stream) {
+ try {
+ return stream.readAllBytes();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
diff --git a/quarkus-x-kuadrant-example/.gitignore b/quarkus-x-kuadrant-example/.gitignore
new file mode 100644
index 0000000..91a800a
--- /dev/null
+++ b/quarkus-x-kuadrant-example/.gitignore
@@ -0,0 +1,45 @@
+#Maven
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+release.properties
+.flattened-pom.xml
+
+# Eclipse
+.project
+.classpath
+.settings/
+bin/
+
+# IntelliJ
+.idea
+*.ipr
+*.iml
+*.iws
+
+# NetBeans
+nb-configuration.xml
+
+# Visual Studio Code
+.vscode
+.factorypath
+
+# OSX
+.DS_Store
+
+# Vim
+*.swp
+*.swo
+
+# patch
+*.orig
+*.rej
+
+# Local environment
+.env
+
+# Plugin directory
+/.quarkus/cli/plugins/
+# TLS Certificates
+.certs/
diff --git a/quarkus-x-kuadrant-example/pom.xml b/quarkus-x-kuadrant-example/pom.xml
new file mode 100644
index 0000000..53f8ccf
--- /dev/null
+++ b/quarkus-x-kuadrant-example/pom.xml
@@ -0,0 +1,136 @@
+
+
+ 4.0.0
+
+
+ io.roastedroot
+ proxy-wasm-java-host-parent
+ 1.0-SNAPSHOT
+ ../pom.xml
+
+
+ quarkus-kuadrant-example
+ jar
+ quarkus-kuadrant-example
+
+
+
+
+ ${quarkus.platform.group-id}
+ ${quarkus.platform.artifact-id}
+ ${quarkus.platform.version}
+ pom
+ import
+
+
+
+
+
+
+ com.google.code.gson
+ gson
+ 2.12.1
+
+
+ io.quarkiverse.proxy-wasm
+ quarkus-proxy-wasm
+ ${project.version}
+
+
+ io.quarkus
+ quarkus-arc
+ true
+
+
+
+
+ io.quarkus
+ quarkus-junit5
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+ org.testcontainers
+ testcontainers
+ 1.20.6
+ test
+
+
+ uk.co.datumedge
+ hamcrest-json
+ 0.2
+ test
+
+
+
+
+
+
+
+ ${quarkus.platform.group-id}
+ quarkus-maven-plugin
+ ${quarkus.platform.version}
+ true
+
+
+
+ build
+ generate-code
+ generate-code-tests
+ native-image-agent
+
+
+
+
+
+ maven-failsafe-plugin
+ ${surefire-plugin.version}
+
+
+ ${project.build.directory}/${project.build.finalName}-runner
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
+ maven-surefire-plugin
+ ${surefire-plugin.version}
+
+
+ org.jboss.logmanager.LogManager
+ ${maven.home}
+
+
+
+
+
+
+
+
+ native
+
+
+ native
+
+
+
+ false
+ true
+
+
+
+
+
diff --git a/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java
new file mode 100644
index 0000000..8648e3f
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java
@@ -0,0 +1,51 @@
+package io.roastedroot.proxywasm.kuadrant.example;
+
+import com.dylibso.chicory.wasm.Parser;
+import com.dylibso.chicory.wasm.WasmModule;
+import io.roastedroot.proxywasm.LogHandler;
+import io.roastedroot.proxywasm.StartException;
+import io.roastedroot.proxywasm.plugin.Plugin;
+import io.roastedroot.proxywasm.plugin.PluginFactory;
+import io.roastedroot.proxywasm.plugin.SimpleMetricsHandler;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+public class App {
+
+ private static WasmModule module =
+ Parser.parse(App.class.getResourceAsStream("wasm_shim.wasm"));
+
+ static final String CONFIG;
+
+ static {
+ try (InputStream is = App.class.getResourceAsStream("config.json")) {
+ CONFIG = new String(is.readAllBytes(), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ static final boolean DEBUG = "true".equals(System.getenv("DEBUG"));
+
+ @ConfigProperty(name = "limitador.rls.url")
+ String limitadorUrl;
+
+ @Produces
+ public PluginFactory kuadrant() throws StartException {
+ return () ->
+ Plugin.builder()
+ .withName("kuadrant")
+ .withLogger(DEBUG ? LogHandler.SYSTEM : null)
+ .withPluginConfig(CONFIG)
+ .withUpstreams(Map.of("limitador", limitadorUrl))
+ .withMetricsHandler(new SimpleMetricsHandler())
+ .build(module);
+ }
+}
diff --git a/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java
new file mode 100644
index 0000000..6e03cd2
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java
@@ -0,0 +1,15 @@
+package io.roastedroot.proxywasm.kuadrant.example;
+
+import io.roastedroot.proxywasm.jaxrs.WasmPlugin;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+
+@WasmPlugin("kuadrant") // use the corsaWAF filter
+@Path("/")
+public class Resources {
+
+ @GET
+ public String root() {
+ return "Hello World";
+ }
+}
diff --git a/quarkus-x-kuadrant-example/src/main/resources/application.properties b/quarkus-x-kuadrant-example/src/main/resources/application.properties
new file mode 100644
index 0000000..1c824cb
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/main/resources/application.properties
@@ -0,0 +1,3 @@
+quarkus.log.level=INFO
+quarkus.log.category."org.hibernate".level=DEBUG
+limitador.rls.url=http://localhost:18080
\ No newline at end of file
diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md
new file mode 100644
index 0000000..3e05a1d
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md
@@ -0,0 +1,6 @@
+## Attribution
+
+The wasm_shim.wasm plugin comes from:
+
+https://github.com/Kuadrant/wasm-shim
+
diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json
new file mode 100644
index 0000000..ff024b9
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json
@@ -0,0 +1,50 @@
+{
+ "services": {
+ "limitadorA": {
+ "type": "ratelimit",
+ "endpoint": "limitador",
+ "failureMode": "deny"
+ },
+ "limitadorB": {
+ "type": "ratelimit",
+ "endpoint": "limitador",
+ "failureMode": "deny"
+ }
+ },
+ "actionSets": [
+ {
+ "name": "basic",
+ "routeRuleConditions": {
+ "hostnames": [
+ "*.example.com"
+ ]
+ },
+ "actions": [
+ {
+ "service": "limitadorA",
+ "scope": "basic",
+ "data": [
+ {
+ "expression": {
+ "key": "a",
+ "value": "1"
+ }
+ }
+ ]
+ },
+ {
+ "service": "limitadorB",
+ "scope": "basic",
+ "data": [
+ {
+ "expression": {
+ "key": "a",
+ "value": "1"
+ }
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm
new file mode 100755
index 0000000..ab8618a
Binary files /dev/null and b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm differ
diff --git a/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java
new file mode 100644
index 0000000..1d72645
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java
@@ -0,0 +1,52 @@
+package io.roastedroot.proxywasm.kuadrant.example;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.images.builder.Transferable;
+
+public class LimitadorTestContainer implements QuarkusTestResourceLifecycleManager {
+
+ GenericContainer container;
+
+ @Override
+ public Map start() {
+
+ String config;
+ try (InputStream is = LimitadorTestContainer.class.getResourceAsStream("limits.yaml")) {
+ config = new String(is.readAllBytes(), StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ container =
+ new GenericContainer("quay.io/kuadrant/limitador:v2.0.0")
+ .withCommand("limitador-server", "-vvv", "/opt/kuadrant/limits/limits.yaml")
+ .withCopyToContainer(
+ Transferable.of(config, 0777), "/opt/kuadrant/limits/limits.yaml")
+ .withExposedPorts(8080, 8081);
+ container.start();
+ Map result =
+ Map.of(
+ "limitador.http.url",
+ "http://"
+ + container.getHost()
+ + ":"
+ + container.getMappedPort(8080),
+ "limitador.rls.url",
+ "http://"
+ + container.getHost()
+ + ":"
+ + container.getMappedPort(8081));
+ return result;
+ }
+
+ @Override
+ public void stop() {
+ container.stop();
+ }
+}
diff --git a/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java
new file mode 100644
index 0000000..f395c6b
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java
@@ -0,0 +1,52 @@
+package io.roastedroot.proxywasm.kuadrant.example;
+
+import static io.restassured.RestAssured.given;
+import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs;
+
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
+@QuarkusTestResource(value = LimitadorTestContainer.class)
+public class ResourcesTest {
+
+ @ConfigProperty(name = "limitador.http.url")
+ String limitadorUrl;
+
+ @Test
+ public void testCountersRemaining() throws InterruptedException {
+
+ var remaining = 28;
+ while (remaining > 0) {
+ given().header("Host", "test.example.com").when().get("/").then().statusCode(200);
+ given().when()
+ .get(limitadorUrl + "/counters/basic")
+ .then()
+ .statusCode(200)
+ .body(
+ sameJSONAs(String.format("[{\"remaining\":%s}]", remaining))
+ .allowingExtraUnexpectedFields());
+ remaining -= 2;
+ }
+
+ given().header("Host", "test.example.com").when().get("/").then().statusCode(200);
+ given().when()
+ .get(limitadorUrl + "/counters/basic")
+ .then()
+ .statusCode(200)
+ .body(
+ sameJSONAs(String.format("[{\"remaining\":%s}]", remaining))
+ .allowingExtraUnexpectedFields());
+
+ given().header("Host", "test.example.com").when().get("/").then().statusCode(429);
+ given().when()
+ .get(limitadorUrl + "/counters/basic")
+ .then()
+ .statusCode(200)
+ .body(
+ sameJSONAs(String.format("[{\"remaining\":%s}]", remaining))
+ .allowingExtraUnexpectedFields());
+ }
+}
diff --git a/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml b/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml
new file mode 100644
index 0000000..03e2ae2
--- /dev/null
+++ b/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml
@@ -0,0 +1,7 @@
+---
+- namespace: basic
+ max_value: 30
+ seconds: 60
+ conditions:
+ - "descriptors[0]['a'] == '1'"
+ variables: []