Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/main/java/io/roastedroot/proxywasm/impl/Imports.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.roastedroot.proxywasm.v1.LogLevel;
import io.roastedroot.proxywasm.v1.MapType;
import io.roastedroot.proxywasm.v1.MetricType;
import io.roastedroot.proxywasm.v1.QueueName;
import io.roastedroot.proxywasm.v1.StreamType;
import io.roastedroot.proxywasm.v1.WasmException;
import io.roastedroot.proxywasm.v1.WasmResult;
Expand Down Expand Up @@ -1002,4 +1003,77 @@ int proxySetSharedData(int keyDataPtr, int keySize, int valueDataPtr, int valueS
return e.result().getValue();
}
}

@WasmExport
int proxyRegisterSharedQueue(int queueNameDataPtr, int queueNameSize, int returnQueueId) {
try {
// Get queue name from memory
String queueName = string(readMemory(queueNameDataPtr, queueNameSize));

var vmId = handler.getProperty("vm_id");

// Register shared queue using handler
int queueId = handler.registerSharedQueue(new QueueName(vmId, queueName));
putUint32(returnQueueId, queueId);
return WasmResult.OK.getValue();

} catch (WasmException e) {
return e.result().getValue();
}
}

@WasmExport
int proxyResolveSharedQueue(
int vmIdDataPtr,
int vmIdSize,
int queueNameDataPtr,
int queueNameSize,
int returnQueueId) {
try {
// Get vm id from memory
String vmId = string(readMemory(vmIdDataPtr, vmIdSize));
// Get queue name from memory
String queueName = string(readMemory(queueNameDataPtr, queueNameSize));

// Resolve shared queue using handler
int queueId = handler.resolveSharedQueue(new QueueName(vmId, queueName));
putUint32(returnQueueId, queueId);
return WasmResult.OK.getValue();

} catch (WasmException e) {
return e.result().getValue();
}
}

@WasmExport
int proxyEnqueueSharedQueue(int queueId, int valueDataPtr, int valueSize) {
try {
// Get value from memory
byte[] value = readMemory(valueDataPtr, valueSize);

// Enqueue shared queue using handler
WasmResult result = handler.enqueueSharedQueue(queueId, value);
return result.getValue();

} catch (WasmException e) {
return e.result().getValue();
}
}

@WasmExport
int proxyDequeueSharedQueue(int queueId, int returnValueData, int returnValueSize) {
try {
// Dequeue shared queue using handler
byte[] value = handler.dequeueSharedQueue(queueId);
if (value == null) {
return WasmResult.EMPTY.getValue();
}

copyIntoInstance(value, returnValueData, returnValueSize);
return WasmResult.OK.getValue();

} catch (WasmException e) {
return e.result().getValue();
}
}
}
20 changes: 20 additions & 0 deletions src/main/java/io/roastedroot/proxywasm/v1/ChainedHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,24 @@ public SharedData getSharedData(String key) throws WasmException {
public WasmResult setSharedData(String key, byte[] value, int cas) {
return next().setSharedData(key, value, cas);
}

@Override
public int registerSharedQueue(QueueName queueName) throws WasmException {
return next().registerSharedQueue(queueName);
}

@Override
public int resolveSharedQueue(QueueName queueName) throws WasmException {
return next().resolveSharedQueue(queueName);
}

@Override
public byte[] dequeueSharedQueue(int queueId) throws WasmException {
return next().dequeueSharedQueue(queueId);
}

@Override
public WasmResult enqueueSharedQueue(int queueId, byte[] value) {
return next().enqueueSharedQueue(queueId, value);
}
}
16 changes: 16 additions & 0 deletions src/main/java/io/roastedroot/proxywasm/v1/Handler.java
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,20 @@ default SharedData getSharedData(String key) throws WasmException {
default WasmResult setSharedData(String key, byte[] value, int cas) {
return WasmResult.UNIMPLEMENTED;
}

default int registerSharedQueue(QueueName name) throws WasmException {
throw new WasmException(WasmResult.UNIMPLEMENTED);
}

default int resolveSharedQueue(QueueName name) throws WasmException {
throw new WasmException(WasmResult.UNIMPLEMENTED);
}

default byte[] dequeueSharedQueue(int queueId) throws WasmException {
throw new WasmException(WasmResult.UNIMPLEMENTED);
}

default WasmResult enqueueSharedQueue(int queueId, byte[] value) {
return WasmResult.UNIMPLEMENTED;
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/roastedroot/proxywasm/v1/ProxyWasm.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,10 @@ public void sendHttpCallResponse(
this.httpCallResponseBody = null;
}

public void sendOnQueueReady(int queueId) {
this.exports.proxyOnQueueReady(pluginContext.id(), queueId);
}

public int contextId() {
return pluginContext.id();
}
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/roastedroot/proxywasm/v1/QueueName.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.roastedroot.proxywasm.v1;

import java.util.Objects;

public class QueueName {
private final String vmId;
private final String name;

public QueueName(String vmId, String name) {
this.vmId = vmId;
this.name = name;
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
QueueName queue = (QueueName) o;
return Objects.equals(vmId, queue.vmId) && Objects.equals(name, queue.name);
}

@Override
public int hashCode() {
return Objects.hash(vmId, name);
}

public String vmId() {
return vmId;
}

public String name() {
return name;
}
}
4 changes: 4 additions & 0 deletions src/test/go-examples/shared_queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## Attribution

This example originally came from:
https://github.com/proxy-wasm/proxy-wasm-go-sdk/blob/ab4161dcf9246a828008b539a82a1556cf0f2e24/examples/shared_queue
5 changes: 5 additions & 0 deletions src/test/go-examples/shared_queue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module github.com/proxy-wasm/proxy-wasm-go-sdk/examples/shared_queue

go 1.24

require github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924
10 changes: 10 additions & 0 deletions src/test/go-examples/shared_queue/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924 h1:wTcK6gcyTKJMeDka69AMjZYvisdI8CBXzTEfZ+2pOxI=
github.com/proxy-wasm/proxy-wasm-go-sdk v0.0.0-20250212164326-ab4161dcf924/go.mod h1:9mBRvh8I6Td6sg3CwEY+zGFE4DKaIoieCaca1kQnDBE=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
80 changes: 80 additions & 0 deletions src/test/go-examples/shared_queue/receiver/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2020-2024 Tetrate
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"

"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm"
"github.com/proxy-wasm/proxy-wasm-go-sdk/proxywasm/types"
)

func main() {}
func init() {
proxywasm.SetVMContext(&vmContext{})
}

// vmContext implements types.VMContext.
type vmContext struct {
// Embed the default VM context here,
// so that we don't need to reimplement all the methods.
types.DefaultVMContext
}

// NewPluginContext implements types.VMContext.
func (*vmContext) NewPluginContext(contextID uint32) types.PluginContext {
return &receiverPluginContext{contextID: contextID}
}

// receiverPluginContext implements types.PluginContext.
type receiverPluginContext struct {
// Embed the default plugin context here,
// so that we don't need to reimplement all the methods.
contextID uint32
types.DefaultPluginContext
queueName string
}

// OnPluginStart implements types.PluginContext.
func (ctx *receiverPluginContext) OnPluginStart(pluginConfigurationSize int) types.OnPluginStartStatus {
// Get Plugin configuration.
config, err := proxywasm.GetPluginConfiguration()
if err != nil {
panic(fmt.Sprintf("failed to get plugin config: %v", err))
}

// Treat the config as the queue name for receiving.
ctx.queueName = string(config)

queueID, err := proxywasm.RegisterSharedQueue(ctx.queueName)
if err != nil {
panic("failed register queue")
}
proxywasm.LogInfof("queue \"%s\" registered as queueID=%d by contextID=%d", ctx.queueName, queueID, ctx.contextID)
return types.OnPluginStartStatusOK
}

// OnQueueReady implements types.PluginContext.
func (ctx *receiverPluginContext) OnQueueReady(queueID uint32) {
data, err := proxywasm.DequeueSharedQueue(queueID)
switch err {
case types.ErrorStatusEmpty:
return
case nil:
proxywasm.LogInfof("(contextID=%d) dequeued data from %s(queueID=%d): %s", ctx.contextID, ctx.queueName, queueID, string(data))
default:
proxywasm.LogCriticalf("error retrieving data from queue %d: %v", queueID, err)
}
}
Binary file not shown.
Loading