diff --git a/pom.xml b/pom.xml index cbf44d5..db8b27a 100644 --- a/pom.xml +++ b/pom.xml @@ -225,6 +225,7 @@ quarkus-proxy-wasm quarkus-proxy-wasm-example quarkus-x-corazawaf-example + quarkus-x-kuadrant-example diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java index 5a76d3e..fb05194 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ABI.java @@ -13,6 +13,7 @@ import com.dylibso.chicory.runtime.WasmRuntimeException; import com.dylibso.chicory.wasm.InvalidException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -487,7 +488,11 @@ void proxyOnTick(int arg0) { */ @WasmExport int proxyGetBufferBytes( - int bufferType, int start, int length, int returnBufferData, int returnBufferSize) { + int bufferType, + int start, + int chunkLength, + int returnBufferData, + int returnBufferSize) { try { // Get the buffer based on the buffer type @@ -496,29 +501,34 @@ int proxyGetBufferBytes( return WasmResult.NOT_FOUND.getValue(); } - if (start > start + length) { + if (start < 0) { return WasmResult.BAD_ARGUMENT.getValue(); } + int maxChunkLength = b.length - start; + if (chunkLength < 0 || chunkLength > maxChunkLength) { + chunkLength = maxChunkLength; + } + ByteBuffer buffer = ByteBuffer.wrap(b); - if (start + length > buffer.capacity()) { - length = buffer.capacity() - start; + if (start + chunkLength > buffer.capacity()) { + chunkLength = buffer.capacity() - start; } try { buffer.position(start); - buffer.limit(start + length); + buffer.limit(start + chunkLength); } catch (IllegalArgumentException e) { return WasmResult.BAD_ARGUMENT.getValue(); } // Allocate memory in the WebAssembly instance - int addr = malloc(length); + int addr = malloc(chunkLength); putMemory(addr, buffer); // Write the address to the return pointer putUint32(returnBufferData, addr); // Write the length to the return size pointer - putUint32(returnBufferSize, length); + putUint32(returnBufferSize, chunkLength); return WasmResult.OK.getValue(); } catch (WasmException e) { @@ -713,16 +723,24 @@ int proxyGetHeaderMapPairs(int mapType, int returnDataPtr, int returnDataSize) { return WasmResult.NOT_FOUND.getValue(); } - // to clone the headers so that they don't change on while we process them in the loop - var cloneMap = new ArrayProxyMap(header); + var cloneMap = new ArrayList>(); int totalBytesLen = U32_LEN; // Start with space for the count - for (Map.Entry entry : cloneMap.entries()) { - String key = entry.getKey(); - String value = entry.getValue(); - totalBytesLen += U32_LEN + U32_LEN; // keyLen + valueLen - totalBytesLen += key.length() + 1 + value.length() + 1; // key + \0 + value + \0 - } + totalBytesLen += + header.streamBytes() + .mapToInt( + entry -> { + var key = entry.getKey(); + var value = entry.getValue(); + cloneMap.add(Map.entry(key, value)); + return U32_LEN + + U32_LEN // keyLen + valueLen + + key.length + + 1 + + value.length + + 1; // key + \0 + value + \0 + }) + .sum(); // Allocate memory in the WebAssembly instance int addr = malloc(totalBytesLen); @@ -735,29 +753,29 @@ int proxyGetHeaderMapPairs(int mapType, int returnDataPtr, int returnDataSize) { int dataPtr = lenPtr + ((U32_LEN + U32_LEN) * cloneMap.size()); // Write each key-value pair to memory - for (Map.Entry entry : cloneMap.entries()) { - String key = entry.getKey(); - String value = entry.getValue(); + for (Map.Entry entry : cloneMap) { + var key = entry.getKey(); + var value = entry.getValue(); // Write key length - putUint32(lenPtr, key.length()); + putUint32(lenPtr, key.length); lenPtr += U32_LEN; // Write value length - putUint32(lenPtr, value.length()); + putUint32(lenPtr, value.length); lenPtr += U32_LEN; // Write key bytes - putMemory(dataPtr, key.getBytes()); - dataPtr += key.length(); + putMemory(dataPtr, key); + dataPtr += key.length; // Write null terminator for key putByte(dataPtr, (byte) 0); dataPtr++; // Write value bytes - putMemory(dataPtr, value.getBytes()); - dataPtr += value.length(); + putMemory(dataPtr, value); + dataPtr += value.length; // Write null terminator for value putByte(dataPtr, (byte) 0); @@ -1447,7 +1465,7 @@ int proxyGrpcCall( message, timeout); putUint32(returnCalloutID, callId); - return callId; + return WasmResult.OK.getValue(); } catch (WasmException e) { return e.result().getValue(); } @@ -1478,7 +1496,7 @@ int proxyGrpcStream( int streamId = handler.grpcStream(upstreamName, serviceName, methodName, initialMetadata); putUint32(returnStreamId, streamId); - return streamId; + return WasmResult.OK.getValue(); } catch (WasmException e) { return e.result().getValue(); } @@ -1539,21 +1557,21 @@ void proxyOnGrpcReceive(int contextId, int callId, int messageSize) { /** * implements https://github.com/proxy-wasm/spec/tree/main/abi-versions/vNEXT#proxy_on_grpc_receive_trailing_metadata */ - void proxyOnGrpcReceiveTrailingMetadata(int arg0, int arg1, int arg2) { + void proxyOnGrpcReceiveTrailingMetadata(int contextId, int callId, int numElements) { if (proxyOnGrpcReceiveTrailingMetadataFn == null) { return; } - proxyOnGrpcReceiveTrailingMetadataFn.apply(arg0, arg1, arg2); + proxyOnGrpcReceiveTrailingMetadataFn.apply(contextId, callId, numElements); } /** * implements https://github.com/proxy-wasm/spec/tree/main/abi-versions/vNEXT#proxy_on_grpc_close */ - void proxyOnGrpcClose(int arg0, int arg1, int arg2) { + void proxyOnGrpcClose(int contextId, int callId, int statusCode) { if (proxyOnGrpcCloseFn == null) { return; } - proxyOnGrpcCloseFn.apply(arg0, arg1, arg2); + proxyOnGrpcCloseFn.apply(contextId, callId, statusCode); } // ////////////////////////////////////////////////////////////////////// diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java new file mode 100644 index 0000000..af6c8a7 --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ArrayBytesProxyMap.java @@ -0,0 +1,131 @@ +package io.roastedroot.proxywasm; + +import static io.roastedroot.proxywasm.Helpers.bytes; +import static io.roastedroot.proxywasm.Helpers.len; +import static io.roastedroot.proxywasm.Helpers.string; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ArrayBytesProxyMap implements ProxyMap { + + final ArrayList> entries; + + public ArrayBytesProxyMap() { + this.entries = new ArrayList<>(); + } + + public ArrayBytesProxyMap(int mapSize) { + this.entries = new ArrayList<>(mapSize); + } + + @Override + public int size() { + return entries.size(); + } + + @Override + public void add(String key, String value) { + entries.add(Map.entry(key, bytes(value))); + } + + public void add(String key, byte[] value) { + entries.add(Map.entry(key, value)); + } + + @Override + public void put(String key, String value) { + this.remove(key); + entries.add(Map.entry(key, bytes(value))); + } + + public void put(String key, byte[] value) { + this.remove(key); + entries.add(Map.entry(key, value)); + } + + @Override + public Iterable> entries() { + return entries.stream() + .map(x -> Map.entry(x.getKey(), string(x.getValue()))) + .collect(Collectors.toList()); + } + + @Override + public Stream> streamBytes() { + return entries.stream().map(x -> Map.entry(bytes(x.getKey()), x.getValue())); + } + + @Override + public String get(String key) { + return entries.stream() + .filter(x -> x.getKey().equals(key)) + .map(Map.Entry::getValue) + .map(Helpers::string) + .findFirst() + .orElse(null); + } + + @Override + public void remove(String key) { + entries.removeIf(x -> x.getKey().equals(key)); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + ArrayBytesProxyMap that = (ArrayBytesProxyMap) o; + return Objects.equals(entries, that.entries); + } + + @Override + public int hashCode() { + return Objects.hashCode(entries); + } + + @Override + public String toString() { + return entries.toString(); + } + + /** + * Encode the map into a byte array. + */ + @Override + public byte[] encode() { + try { + var baos = new ByteArrayOutputStream(); + var o = new DataOutputStream(baos); + // Write header size (number of entries) + int mapSize = this.size(); + o.writeInt(mapSize); + + // write all the key / value sizes. + for (var entry : entries) { + o.writeInt(len(entry.getKey())); + o.writeInt(len(entry.getValue())); + } + + // write all the key / values + for (var entry : entries) { + o.write(bytes(entry.getKey())); + o.write(0); + o.write(entry.getValue()); + o.write(0); + } + o.close(); + return baos.toByteArray(); + } catch (IOException e) { + // this should never happen since we are not really doing IO + throw new RuntimeException(e); + } + } +} diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java index de897a4..b2c0165 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyMap.java @@ -8,6 +8,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Map; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; public interface ProxyMap { @@ -37,6 +39,11 @@ static ProxyMap copyOf(Map headers) { Iterable> entries(); + default Stream> streamBytes() { + return StreamSupport.stream(entries().spliterator(), false) + .map(x -> Map.entry(bytes(x.getKey()), bytes(x.getValue()))); + } + String get(String key); void remove(String key); diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java index a039d21..f502aff 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/ProxyWasm.java @@ -36,6 +36,9 @@ public final class ProxyWasm implements Closeable { private ProxyMap httpCallResponseTrailers; private byte[] httpCallResponseBody; private Handler pluginHandler; + private ArrayBytesProxyMap grpcReceiveInitialMetadata; + private byte[] grpcReceive; + private ArrayBytesProxyMap grpcReceiveTrailingMetadata; private ProxyWasm(Builder other) throws StartException { this.pluginHandler = Objects.requireNonNullElse(other.pluginHandler, new Handler() {}); @@ -126,6 +129,21 @@ public ProxyMap getHttpCallResponseTrailers() { public byte[] getHttpCallResponseBody() { return httpCallResponseBody; } + + @Override + public ProxyMap getGrpcReceiveInitialMetaData() { + return grpcReceiveInitialMetadata; + } + + @Override + public byte[] getGrpcReceiveBuffer() { + return grpcReceive; + } + + @Override + public ProxyMap getGrpcReceiveTrailerMetaData() { + return grpcReceiveTrailingMetadata; + } }; } @@ -195,7 +213,6 @@ public void sendHttpCallResponse( public void sendHttpCallResponse( int calloutID, ProxyMap headers, ProxyMap trailers, byte[] body) { - this.httpCallResponseHeaders = headers; this.httpCallResponseTrailers = trailers; this.httpCallResponseBody = body; @@ -208,6 +225,28 @@ public void sendHttpCallResponse( this.httpCallResponseBody = null; } + public void sendGrpcReceiveInitialMetadata(int calloutID, ArrayBytesProxyMap headers) { + this.grpcReceiveInitialMetadata = headers; + this.abi.proxyOnGrpcReceiveInitialMetadata(pluginContext.id(), calloutID, headers.size()); + this.grpcReceiveInitialMetadata = null; + } + + public void sendGrpcReceive(int calloutID, byte[] body) { + this.grpcReceive = body; + this.abi.proxyOnGrpcReceive(pluginContext.id(), calloutID, len(body)); + this.grpcReceive = null; + } + + public void sendGrpcReceiveTrailingMetadata(int calloutID, ArrayBytesProxyMap headers) { + this.grpcReceiveTrailingMetadata = headers; + this.abi.proxyOnGrpcReceiveTrailingMetadata(pluginContext.id(), calloutID, headers.size()); + this.grpcReceiveTrailingMetadata = null; + } + + public void sendGrpcClose(int calloutID, int statusCode) { + this.abi.proxyOnGrpcClose(pluginContext.id(), calloutID, statusCode); + } + public void sendOnQueueReady(int queueId) { this.abi.proxyOnQueueReady(pluginContext.id(), queueId); } diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java new file mode 100644 index 0000000..e9fd1f3 --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponse.java @@ -0,0 +1,16 @@ +package io.roastedroot.proxywasm.plugin; + +import io.roastedroot.proxywasm.ProxyMap; + +public class GrpcCallResponse { + + public final int statusCode; + public final ProxyMap headers; + public final byte[] body; + + public GrpcCallResponse(int statusCode, ProxyMap headers, byte[] body) { + this.statusCode = statusCode; + this.headers = headers; + this.body = body; + } +} diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java new file mode 100644 index 0000000..fdb2949 --- /dev/null +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/GrpcCallResponseHandler.java @@ -0,0 +1,14 @@ +package io.roastedroot.proxywasm.plugin; + +import io.roastedroot.proxywasm.ArrayBytesProxyMap; + +public interface GrpcCallResponseHandler { + + void onHeaders(ArrayBytesProxyMap trailerMap); + + void onMessage(byte[] data); + + void onTrailers(ArrayBytesProxyMap trailers); + + void onClose(int status); +} diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java index 9d0a82a..a2dd6b6 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/Plugin.java @@ -13,6 +13,7 @@ import com.dylibso.chicory.runtime.Machine; import com.dylibso.chicory.wasi.WasiOptions; import com.dylibso.chicory.wasm.WasmModule; +import io.roastedroot.proxywasm.ArrayBytesProxyMap; import io.roastedroot.proxywasm.ArrayProxyMap; import io.roastedroot.proxywasm.ChainedHandler; import io.roastedroot.proxywasm.ForeignFunction; @@ -55,8 +56,9 @@ private Plugin(Builder builder, ProxyWasm proxyWasm) throws StartException { Objects.requireNonNull(proxyWasm); this.name = Objects.requireNonNullElse(builder.name, "default"); this.shared = builder.shared; - this.foreignFunctions = builder.foreignFunctions; - this.upstreams = builder.upstreams; + this.foreignFunctions = + Objects.requireNonNullElseGet(builder.foreignFunctions, HashMap::new); + this.upstreams = Objects.requireNonNullElseGet(builder.upstreams, HashMap::new); this.strictUpstreams = builder.strictUpstreams; this.minTickPeriodMilliseconds = builder.minTickPeriodMilliseconds; this.vmConfig = builder.vmConfig; @@ -119,10 +121,14 @@ public void close() { cancelTick.run(); cancelTick = null; } - for (var cancelHttpCall : httpCalls.values()) { - cancelHttpCall.run(); + for (var cancel : httpCalls.values()) { + cancel.run(); } httpCalls.clear(); + for (var cancel : grpcCalls.values()) { + cancel.run(); + } + grpcCalls.clear(); } finally { unlock(); @@ -253,12 +259,13 @@ public Plugin build(ProxyWasm proxyWasm) throws StartException { byte[] pluginConfig; private final AtomicInteger lastCallId = new AtomicInteger(0); private final HashMap httpCalls = new HashMap<>(); - HashMap upstreams = new HashMap<>(); + private final HashMap grpcCalls = new HashMap<>(); + private final HashMap upstreams; boolean strictUpstreams; int minTickPeriodMilliseconds; private int tickPeriodMilliseconds; private Runnable cancelTick; - HashMap foreignFunctions; + private final HashMap foreignFunctions; private byte[] funcCallData = new byte[0]; private final HashMap, byte[]> properties = new HashMap<>(); @@ -470,6 +477,8 @@ public int httpCall( return id; } catch (InterruptedException e) { throw new WasmException(WasmResult.INTERNAL_FAILURE); + } catch (UnsupportedOperationException e) { + throw new WasmException(WasmResult.UNIMPLEMENTED); } } @@ -484,6 +493,129 @@ public int dispatchHttpCall( return httpCall(upstreamName, headers, body, trailers, timeoutMilliseconds); } + // ////////////////////////////////////////////////////////////////////// + // GRPC calls + // ////////////////////////////////////////////////////////////////////// + + @Override + public int grpcCall( + String upstreamName, + String serviceName, + String methodName, + ProxyMap headers, + byte[] body, + int timeoutMilliseconds) + throws WasmException { + + var connectHostPort = upstreams.get(upstreamName); + if (connectHostPort == null && strictUpstreams) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + if (connectHostPort == null) { + connectHostPort = upstreamName; + } + + URI connectUri = null; + try { + connectUri = URI.create(connectHostPort); + } catch (IllegalArgumentException e) { + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + + if (!("http".equals(connectUri.getScheme()) + || "https".equals(connectUri.getScheme()))) { + logger.log( + LogLevel.ERROR, + "grpc call upstream not mapped to URL with a http/https scheme: " + + upstreamName); + throw new WasmException(WasmResult.BAD_ARGUMENT); + } + + var connectHost = connectUri.getHost(); + var connectPort = connectUri.getPort(); + if (connectPort == -1) { + connectPort = "https".equals(connectUri.getScheme()) ? 443 : 80; + } + + try { + + var id = lastCallId.incrementAndGet(); + var callHandler = + new GrpcCallResponseHandler() { + + @Override + public void onHeaders(ArrayBytesProxyMap headers) { + lock(); + try { + if (grpcCalls.get(id) == null) { + return; // the call could have already been cancelled + } + wasm.sendGrpcReceiveInitialMetadata(id, headers); + } finally { + unlock(); + } + } + + @Override + public void onMessage(byte[] data) { + lock(); + try { + if (grpcCalls.get(id) == null) { + return; // the call could have already been cancelled + } + wasm.sendGrpcReceive(id, data); + } finally { + unlock(); + } + } + + @Override + public void onTrailers(ArrayBytesProxyMap trailers) { + lock(); + try { + if (grpcCalls.get(id) == null) { + return; // the call could have already been cancelled + } + wasm.sendGrpcReceiveTrailingMetadata(id, trailers); + } finally { + unlock(); + } + } + + @Override + public void onClose(int status) { + lock(); + try { + if (grpcCalls.get(id) == null) { + return; // the call could have already been cancelled + } + wasm.sendGrpcClose(id, status); + } finally { + unlock(); + } + } + }; + + var future = + serverAdaptor.scheduleGrpcCall( + connectHost, + connectPort, + "http".equals(connectUri.getScheme()), + serviceName, + methodName, + headers, + body, + timeoutMilliseconds, + callHandler); + grpcCalls.put(id, future); + return id; + } catch (InterruptedException e) { + throw new WasmException(WasmResult.INTERNAL_FAILURE); + } catch (UnsupportedOperationException e) { + throw new WasmException(WasmResult.UNIMPLEMENTED); + } + } + // ////////////////////////////////////////////////////////////////////// // Metrics // ////////////////////////////////////////////////////////////////////// @@ -519,9 +651,6 @@ public WasmResult removeMetric(int metricId) { @Override public ForeignFunction getForeignFunction(String name) { - if (foreignFunctions == null) { - return null; - } return foreignFunctions.get(name); } diff --git a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java index 66af5e1..54b4557 100644 --- a/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java +++ b/proxy-wasm-java-host/src/main/java/io/roastedroot/proxywasm/plugin/ServerAdaptor.java @@ -12,7 +12,7 @@ public interface ServerAdaptor { HttpRequestAdaptor httpRequestAdaptor(Object context); - Runnable scheduleHttpCall( + default Runnable scheduleHttpCall( String method, String host, int port, @@ -22,5 +22,21 @@ Runnable scheduleHttpCall( ProxyMap trailers, int timeout, HttpCallResponseHandler handler) - throws InterruptedException; + throws InterruptedException { + throw new UnsupportedOperationException("scheduleHttpCall not implemented"); + } + + default Runnable scheduleGrpcCall( + String host, + int port, + boolean plainText, + String serviceName, + String methodName, + ProxyMap headers, + byte[] message, + int timeoutMillis, + GrpcCallResponseHandler handler) + throws InterruptedException { + throw new UnsupportedOperationException("scheduleGrpcCall not implemented"); + } } diff --git a/quarkus-proxy-wasm/deployment/pom.xml b/quarkus-proxy-wasm/deployment/pom.xml index 5a82137..7e58a8c 100644 --- a/quarkus-proxy-wasm/deployment/pom.xml +++ b/quarkus-proxy-wasm/deployment/pom.xml @@ -21,6 +21,10 @@ io.quarkus quarkus-arc-deployment + + io.quarkus + quarkus-grpc-deployment + io.quarkus quarkus-jaxrs-spi-deployment diff --git a/quarkus-proxy-wasm/runtime/pom.xml b/quarkus-proxy-wasm/runtime/pom.xml index 9ad43ea..7ba6e84 100644 --- a/quarkus-proxy-wasm/runtime/pom.xml +++ b/quarkus-proxy-wasm/runtime/pom.xml @@ -17,6 +17,10 @@ io.quarkus quarkus-arc + + io.quarkus + quarkus-grpc + io.quarkus quarkus-rest diff --git a/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java b/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java index 73f6ebb..8a2c854 100644 --- a/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java +++ b/quarkus-proxy-wasm/runtime/src/main/java/io/quarkiverse/proxywasm/runtime/VertxServerAdaptor.java @@ -1,6 +1,15 @@ package io.quarkiverse.proxywasm.runtime; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.roastedroot.proxywasm.ArrayBytesProxyMap; import io.roastedroot.proxywasm.ProxyMap; +import io.roastedroot.proxywasm.plugin.GrpcCallResponseHandler; import io.roastedroot.proxywasm.plugin.HttpCallResponse; import io.roastedroot.proxywasm.plugin.HttpCallResponseHandler; import io.roastedroot.proxywasm.plugin.HttpRequestAdaptor; @@ -17,7 +26,12 @@ import jakarta.enterprise.inject.Alternative; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; +import java.util.Map; +import java.util.concurrent.TimeUnit; @Alternative @Priority(200) @@ -107,4 +121,129 @@ public Runnable scheduleHttpCall( // There doesn't seem to be a way to cancel the request. }; } + + @Override + public Runnable scheduleGrpcCall( + String host, + int port, + boolean plainText, + String serviceName, + String methodName, + ProxyMap headers, + byte[] message, + int timeoutMillis, + GrpcCallResponseHandler handler) + throws InterruptedException { + + ManagedChannelBuilder managedChannelBuilder = + ManagedChannelBuilder.forAddress(host, port); + if (plainText) { + managedChannelBuilder.usePlaintext(); + } else { + managedChannelBuilder.useTransportSecurity(); + } + ManagedChannel channel = managedChannelBuilder.build(); + + // Construct method descriptor (assuming unary request/response and protobuf) + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName( + MethodDescriptor.generateFullMethodName(serviceName, methodName)) + .setRequestMarshaller(new BytesMessageMarshaller()) + .setResponseMarshaller(new BytesMessageMarshaller()) + .build(); + + ClientCall call = + channel.newCall( + methodDescriptor, + CallOptions.DEFAULT.withDeadlineAfter( + timeoutMillis * 1000, TimeUnit.MILLISECONDS)); + + Metadata metadata = new Metadata(); + for (Map.Entry entry : headers.entries()) { + Metadata.Key key = + Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER); + metadata.put(key, entry.getValue()); + } + + call.start( + new ClientCall.Listener<>() { + + @Override + public void onReady() { + super.onReady(); + } + + @Override + public void onHeaders(Metadata metadata) { + if (metadata.keys().isEmpty()) { + return; + } + ArrayBytesProxyMap trailerMap = new ArrayBytesProxyMap(); + for (var key : metadata.keys()) { + if (key.endsWith("-bin")) { + var value = + metadata.get( + Metadata.Key.of( + key, Metadata.BINARY_BYTE_MARSHALLER)); + trailerMap.add(key, value); + } else { + var value = + metadata.get( + Metadata.Key.of( + key, Metadata.ASCII_STRING_MARSHALLER)); + trailerMap.add(key, value); + } + } + handler.onHeaders(trailerMap); + } + + @Override + public void onMessage(byte[] data) { + handler.onMessage(data); + } + + @Override + public void onClose(Status status, Metadata metadata) { + if (!metadata.keys().isEmpty()) { + ArrayBytesProxyMap trailerMap = new ArrayBytesProxyMap(); + for (var key : metadata.keys()) { + var value = + metadata.get( + Metadata.Key.of( + key, Metadata.BINARY_BYTE_MARSHALLER)); + trailerMap.add(key, value); + } + handler.onTrailers(trailerMap); + } + handler.onClose(status.getCode().value()); + channel.shutdownNow(); + } + }, + metadata); + call.sendMessage(message); + call.halfClose(); + call.request(1); // Request a single response + return () -> { + call.cancel("shutdown", null); + channel.shutdownNow(); + }; + } + + static class BytesMessageMarshaller implements io.grpc.MethodDescriptor.Marshaller { + @Override + public InputStream stream(byte[] value) { + return new ByteArrayInputStream(value); + } + + @Override + public byte[] parse(InputStream stream) { + try { + return stream.readAllBytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } } diff --git a/quarkus-x-kuadrant-example/.gitignore b/quarkus-x-kuadrant-example/.gitignore new file mode 100644 index 0000000..91a800a --- /dev/null +++ b/quarkus-x-kuadrant-example/.gitignore @@ -0,0 +1,45 @@ +#Maven +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +release.properties +.flattened-pom.xml + +# Eclipse +.project +.classpath +.settings/ +bin/ + +# IntelliJ +.idea +*.ipr +*.iml +*.iws + +# NetBeans +nb-configuration.xml + +# Visual Studio Code +.vscode +.factorypath + +# OSX +.DS_Store + +# Vim +*.swp +*.swo + +# patch +*.orig +*.rej + +# Local environment +.env + +# Plugin directory +/.quarkus/cli/plugins/ +# TLS Certificates +.certs/ diff --git a/quarkus-x-kuadrant-example/pom.xml b/quarkus-x-kuadrant-example/pom.xml new file mode 100644 index 0000000..53f8ccf --- /dev/null +++ b/quarkus-x-kuadrant-example/pom.xml @@ -0,0 +1,136 @@ + + + 4.0.0 + + + io.roastedroot + proxy-wasm-java-host-parent + 1.0-SNAPSHOT + ../pom.xml + + + quarkus-kuadrant-example + jar + quarkus-kuadrant-example + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + + + + + com.google.code.gson + gson + 2.12.1 + + + io.quarkiverse.proxy-wasm + quarkus-proxy-wasm + ${project.version} + + + io.quarkus + quarkus-arc + true + + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.testcontainers + testcontainers + 1.20.6 + test + + + uk.co.datumedge + hamcrest-json + 0.2 + test + + + + + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus.platform.version} + true + + + + build + generate-code + generate-code-tests + native-image-agent + + + + + + maven-failsafe-plugin + ${surefire-plugin.version} + + + ${project.build.directory}/${project.build.finalName}-runner + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + integration-test + verify + + + + + + maven-surefire-plugin + ${surefire-plugin.version} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + + + + native + + + native + + + + false + true + + + + + diff --git a/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java new file mode 100644 index 0000000..8648e3f --- /dev/null +++ b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/App.java @@ -0,0 +1,51 @@ +package io.roastedroot.proxywasm.kuadrant.example; + +import com.dylibso.chicory.wasm.Parser; +import com.dylibso.chicory.wasm.WasmModule; +import io.roastedroot.proxywasm.LogHandler; +import io.roastedroot.proxywasm.StartException; +import io.roastedroot.proxywasm.plugin.Plugin; +import io.roastedroot.proxywasm.plugin.PluginFactory; +import io.roastedroot.proxywasm.plugin.SimpleMetricsHandler; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +@ApplicationScoped +public class App { + + private static WasmModule module = + Parser.parse(App.class.getResourceAsStream("wasm_shim.wasm")); + + static final String CONFIG; + + static { + try (InputStream is = App.class.getResourceAsStream("config.json")) { + CONFIG = new String(is.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + static final boolean DEBUG = "true".equals(System.getenv("DEBUG")); + + @ConfigProperty(name = "limitador.rls.url") + String limitadorUrl; + + @Produces + public PluginFactory kuadrant() throws StartException { + return () -> + Plugin.builder() + .withName("kuadrant") + .withLogger(DEBUG ? LogHandler.SYSTEM : null) + .withPluginConfig(CONFIG) + .withUpstreams(Map.of("limitador", limitadorUrl)) + .withMetricsHandler(new SimpleMetricsHandler()) + .build(module); + } +} diff --git a/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java new file mode 100644 index 0000000..6e03cd2 --- /dev/null +++ b/quarkus-x-kuadrant-example/src/main/java/io/roastedroot/proxywasm/kuadrant/example/Resources.java @@ -0,0 +1,15 @@ +package io.roastedroot.proxywasm.kuadrant.example; + +import io.roastedroot.proxywasm.jaxrs.WasmPlugin; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; + +@WasmPlugin("kuadrant") // use the corsaWAF filter +@Path("/") +public class Resources { + + @GET + public String root() { + return "Hello World"; + } +} diff --git a/quarkus-x-kuadrant-example/src/main/resources/application.properties b/quarkus-x-kuadrant-example/src/main/resources/application.properties new file mode 100644 index 0000000..1c824cb --- /dev/null +++ b/quarkus-x-kuadrant-example/src/main/resources/application.properties @@ -0,0 +1,3 @@ +quarkus.log.level=INFO +quarkus.log.category."org.hibernate".level=DEBUG +limitador.rls.url=http://localhost:18080 \ No newline at end of file diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md new file mode 100644 index 0000000..3e05a1d --- /dev/null +++ b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/README.md @@ -0,0 +1,6 @@ +## Attribution + +The wasm_shim.wasm plugin comes from: + +https://github.com/Kuadrant/wasm-shim + diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json new file mode 100644 index 0000000..ff024b9 --- /dev/null +++ b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/config.json @@ -0,0 +1,50 @@ +{ + "services": { + "limitadorA": { + "type": "ratelimit", + "endpoint": "limitador", + "failureMode": "deny" + }, + "limitadorB": { + "type": "ratelimit", + "endpoint": "limitador", + "failureMode": "deny" + } + }, + "actionSets": [ + { + "name": "basic", + "routeRuleConditions": { + "hostnames": [ + "*.example.com" + ] + }, + "actions": [ + { + "service": "limitadorA", + "scope": "basic", + "data": [ + { + "expression": { + "key": "a", + "value": "1" + } + } + ] + }, + { + "service": "limitadorB", + "scope": "basic", + "data": [ + { + "expression": { + "key": "a", + "value": "1" + } + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm new file mode 100755 index 0000000..ab8618a Binary files /dev/null and b/quarkus-x-kuadrant-example/src/main/resources/io/roastedroot/proxywasm/kuadrant/example/wasm_shim.wasm differ diff --git a/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java new file mode 100644 index 0000000..1d72645 --- /dev/null +++ b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/LimitadorTestContainer.java @@ -0,0 +1,52 @@ +package io.roastedroot.proxywasm.kuadrant.example; + +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.images.builder.Transferable; + +public class LimitadorTestContainer implements QuarkusTestResourceLifecycleManager { + + GenericContainer container; + + @Override + public Map start() { + + String config; + try (InputStream is = LimitadorTestContainer.class.getResourceAsStream("limits.yaml")) { + config = new String(is.readAllBytes(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + container = + new GenericContainer("quay.io/kuadrant/limitador:v2.0.0") + .withCommand("limitador-server", "-vvv", "/opt/kuadrant/limits/limits.yaml") + .withCopyToContainer( + Transferable.of(config, 0777), "/opt/kuadrant/limits/limits.yaml") + .withExposedPorts(8080, 8081); + container.start(); + Map result = + Map.of( + "limitador.http.url", + "http://" + + container.getHost() + + ":" + + container.getMappedPort(8080), + "limitador.rls.url", + "http://" + + container.getHost() + + ":" + + container.getMappedPort(8081)); + return result; + } + + @Override + public void stop() { + container.stop(); + } +} diff --git a/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java new file mode 100644 index 0000000..f395c6b --- /dev/null +++ b/quarkus-x-kuadrant-example/src/test/java/io/roastedroot/proxywasm/kuadrant/example/ResourcesTest.java @@ -0,0 +1,52 @@ +package io.roastedroot.proxywasm.kuadrant.example; + +import static io.restassured.RestAssured.given; +import static uk.co.datumedge.hamcrest.json.SameJSONAs.sameJSONAs; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Test; + +@QuarkusTest +@QuarkusTestResource(value = LimitadorTestContainer.class) +public class ResourcesTest { + + @ConfigProperty(name = "limitador.http.url") + String limitadorUrl; + + @Test + public void testCountersRemaining() throws InterruptedException { + + var remaining = 28; + while (remaining > 0) { + given().header("Host", "test.example.com").when().get("/").then().statusCode(200); + given().when() + .get(limitadorUrl + "/counters/basic") + .then() + .statusCode(200) + .body( + sameJSONAs(String.format("[{\"remaining\":%s}]", remaining)) + .allowingExtraUnexpectedFields()); + remaining -= 2; + } + + given().header("Host", "test.example.com").when().get("/").then().statusCode(200); + given().when() + .get(limitadorUrl + "/counters/basic") + .then() + .statusCode(200) + .body( + sameJSONAs(String.format("[{\"remaining\":%s}]", remaining)) + .allowingExtraUnexpectedFields()); + + given().header("Host", "test.example.com").when().get("/").then().statusCode(429); + given().when() + .get(limitadorUrl + "/counters/basic") + .then() + .statusCode(200) + .body( + sameJSONAs(String.format("[{\"remaining\":%s}]", remaining)) + .allowingExtraUnexpectedFields()); + } +} diff --git a/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml b/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml new file mode 100644 index 0000000..03e2ae2 --- /dev/null +++ b/quarkus-x-kuadrant-example/src/test/resources/io/roastedroot/proxywasm/kuadrant/example/limits.yaml @@ -0,0 +1,7 @@ +--- +- namespace: basic + max_value: 30 + seconds: 60 + conditions: + - "descriptors[0]['a'] == '1'" + variables: []