Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _examples/queues/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func produceBytes(q *queues.Producer, req *http.Request) error {
return nil
}

func consumeBatch(batch *queues.ConsumerMessageBatch) error {
func consumeBatch(batch *queues.MessageBatch) error {
for _, msg := range batch.Messages {
log.Printf("Received message: %v\n", msg.Body.Get("name").String())
}
Expand Down
2 changes: 2 additions & 0 deletions cloudflare/queues/batchmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/syumai/workers/internal/jsutil"
)

// FIXME: rename to MessageSendRequest
// see: https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagesendrequest
type BatchMessage struct {
body js.Value
options *sendOptions
Expand Down
4 changes: 2 additions & 2 deletions cloudflare/queues/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// A returned error will cause the batch to be retried (unless the batch or individual messages are acked).
// NOTE: to do long-running message processing task within the Consumer, use cloudflare.WaitUntil, this will postpone the message
// acknowledgment until the task is completed witout blocking the queue consumption.
type Consumer func(batch *ConsumerMessageBatch) error
type Consumer func(batch *MessageBatch) error

var consumer Consumer

Expand Down Expand Up @@ -44,7 +44,7 @@ func init() {
}

func consumeBatch(batch js.Value) error {
b, err := newConsumerMessageBatch(batch)
b, err := newMessageBatch(batch)
if err != nil {
return fmt.Errorf("failed to parse message batch: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/syumai/workers/internal/jsutil"
)

// ConsumerMessage represents a message of the batch received by the consumer.
// Message represents a message of the batch received by the consumer.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
type ConsumerMessage struct {
type Message struct {
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value

// Id - The unique Cloudflare-generated identifier of the message
Id string
// ID - The unique Cloudflare-generated identifier of the message
ID string
// Timestamp - The time when the message was enqueued
Timestamp time.Time
// Body - The message body. Could be accessed directly or using converting helpers as StringBody, BytesBody, IntBody, FloatBody.
Expand All @@ -24,15 +24,15 @@ type ConsumerMessage struct {
Attempts int
}

func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) {
func newMessage(obj js.Value) (*Message, error) {
timestamp, err := jsutil.DateToTime(obj.Get("timestamp"))
if err != nil {
return nil, fmt.Errorf("failed to parse message timestamp: %v", err)
}

return &ConsumerMessage{
return &Message{
instance: obj,
Id: obj.Get("id").String(),
ID: obj.Get("id").String(),
Body: obj.Get("body"),
Attempts: obj.Get("attempts").Int(),
Timestamp: timestamp,
Expand All @@ -41,13 +41,13 @@ func newConsumerMessage(obj js.Value) (*ConsumerMessage, error) {

// Ack acknowledges the message as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#message
func (m *ConsumerMessage) Ack() {
func (m *Message) Ack() {
m.instance.Call("ack")
}

// Retry marks the message to be re-delivered.
// The message will be retried after the optional delay configured with RetryOption.
func (m *ConsumerMessage) Retry(opts ...RetryOption) {
func (m *Message) Retry(opts ...RetryOption) {
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
Expand All @@ -59,40 +59,19 @@ func (m *ConsumerMessage) Retry(opts ...RetryOption) {
m.instance.Call("retry", o.toJS())
}

func (m *ConsumerMessage) StringBody() (string, error) {
func (m *Message) StringBody() (string, error) {
if m.Body.Type() != js.TypeString {
return "", fmt.Errorf("message body is not a string: %v", m.Body)
}
return m.Body.String(), nil
}

func (m *ConsumerMessage) BytesBody() ([]byte, error) {
switch m.Body.Type() {
case js.TypeString:
return []byte(m.Body.String()), nil
case js.TypeObject:
if m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass) {
b := make([]byte, m.Body.Get("byteLength").Int())
js.CopyBytesToGo(b, m.Body)
return b, nil
}
}

return nil, fmt.Errorf("message body is not a byte array: %v", m.Body)
}

func (m *ConsumerMessage) IntBody() (int, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Int(), nil
func (m *Message) BytesBody() ([]byte, error) {
if m.Body.Type() != js.TypeObject ||
!(m.Body.InstanceOf(jsutil.Uint8ArrayClass) || m.Body.InstanceOf(jsutil.Uint8ClampedArrayClass)) {
return nil, fmt.Errorf("message body is not a byte array: %v", m.Body)
}

return 0, fmt.Errorf("message body is not a number: %v", m.Body)
}

func (m *ConsumerMessage) FloatBody() (float64, error) {
if m.Body.Type() == js.TypeNumber {
return m.Body.Float(), nil
}

return 0, fmt.Errorf("message body is not a number: %v", m.Body)
b := make([]byte, m.Body.Get("byteLength").Int())
js.CopyBytesToGo(b, m.Body)
return b, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ func TestNewConsumerMessage(t *testing.T) {
"attempts": 1,
}

got, err := newConsumerMessage(js.ValueOf(m))
got, err := newMessage(js.ValueOf(m))
if err != nil {
t.Fatalf("newConsumerMessage failed: %v", err)
t.Fatalf("newMessage failed: %v", err)
}

if body := got.Body.String(); body != "hello" {
t.Fatalf("Body() = %v, want %v", body, "hello")
}

if got.Id != id {
t.Fatalf("Id = %v, want %v", got.Id, id)
if got.ID != id {
t.Fatalf("ID = %v, want %v", got.ID, id)
}

if got.Attempts != 1 {
Expand All @@ -49,7 +49,7 @@ func TestConsumerMessage_Ack(t *testing.T) {
ackCalled = true
return nil
}))
m := &ConsumerMessage{
m := &Message{
instance: jsObj,
}

Expand All @@ -67,7 +67,7 @@ func TestConsumerMessage_Retry(t *testing.T) {
retryCalled = true
return nil
}))
m := &ConsumerMessage{
m := &Message{
instance: jsObj,
}

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestConsumerMessage_RetryWithDelay(t *testing.T) {
return nil
}))

m := &ConsumerMessage{
m := &Message{
instance: jsObj,
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestNewConsumerMessage_StringBody(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
m := &Message{
Body: tt.body(),
}

Expand All @@ -174,13 +174,6 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
want []byte
wantErr bool
}{
{
name: "string",
body: func() js.Value {
return js.ValueOf("hello")
},
want: []byte("hello"),
},
{
name: "uint8 array",
body: func() js.Value {
Expand All @@ -202,15 +195,15 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
{
name: "incorrect type",
body: func() js.Value {
return js.ValueOf(42)
return js.ValueOf("hello")
},
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
m := &Message{
Body: tt.body(),
}

Expand All @@ -225,97 +218,3 @@ func TestConsumerMessage_BytesBody(t *testing.T) {
})
}
}

func TestConsumerMessage_IntBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want int
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}

got, err := m.IntBody()
if (err != nil) != tt.wantErr {
t.Fatalf("IntBody() error = %v, wantErr %v", err, tt.wantErr)
}

if got != tt.want {
t.Fatalf("IntBody() = %v, want %v", got, tt.want)
}
})
}
}

func TestConsumerMessage_FloatBody(t *testing.T) {
tests := []struct {
name string
body js.Value
want float64
wantErr bool
}{
{
name: "int",
body: js.ValueOf(42),
want: 42.0,
},
{
name: "float",
body: js.ValueOf(42.5),
want: 42.5,
},
{
name: "string",
body: js.ValueOf("42"),
wantErr: true,
},
{
name: "undefined",
body: js.Undefined(),
wantErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &ConsumerMessage{
Body: tt.body,
}

got, err := m.FloatBody()
if (err != nil) != tt.wantErr {
t.Fatalf("FloatBody() error = %v, wantErr %v", err, tt.wantErr)
}

if got != tt.want {
t.Fatalf("FloatBody() = %v, want %v", got, tt.want)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,33 @@ import (
"syscall/js"
)

// ConsumerMessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the
// MessageBatch represents a batch of messages received by the consumer. The size of the batch is determined by the
// worker configuration.
// - https://developers.cloudflare.com/queues/configuration/configure-queues/#consumer
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
type ConsumerMessageBatch struct {
type MessageBatch struct {
// instance - The underlying instance of the JS message object passed by the cloudflare
instance js.Value

// Queue - The name of the queue from which the messages were received
Queue string

// Messages - The messages in the batch
Messages []*ConsumerMessage
Messages []*Message
}

func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) {
func newMessageBatch(obj js.Value) (*MessageBatch, error) {
msgArr := obj.Get("messages")
messages := make([]*ConsumerMessage, msgArr.Length())
messages := make([]*Message, msgArr.Length())
for i := 0; i < msgArr.Length(); i++ {
m, err := newConsumerMessage(msgArr.Index(i))
m, err := newMessage(msgArr.Index(i))
if err != nil {
return nil, fmt.Errorf("failed to parse message %d: %v", i, err)
}
messages[i] = m
}

return &ConsumerMessageBatch{
return &MessageBatch{
instance: obj,
Queue: obj.Get("queue").String(),
Messages: messages,
Expand All @@ -40,14 +40,14 @@ func newConsumerMessageBatch(obj js.Value) (*ConsumerMessageBatch, error) {

// AckAll acknowledges all messages in the batch as successfully delivered despite the result returned from the consuming function.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) AckAll() {
func (b *MessageBatch) AckAll() {
b.instance.Call("ackAll")
}

// RetryAll marks all messages in the batch to be re-delivered.
// The messages will be retried after the optional delay configured with RetryOption.
// - https://developers.cloudflare.com/queues/configuration/javascript-apis/#messagebatch
func (b *ConsumerMessageBatch) RetryAll(opts ...RetryOption) {
func (b *MessageBatch) RetryAll(opts ...RetryOption) {
var o *retryOptions
if len(opts) > 0 {
o = &retryOptions{}
Expand Down
Loading
Loading