Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ func (id messageID) String() string {
return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
}

func (id messageID) Equals(other MessageID) bool {
rmsgid, ok := other.(messageID)
if !ok {
return false
}
return id.equal(rmsgid)
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MessageID is an interface, Are we here trying to compare whether two interfaces are equal?

func deserializeMessageID(data []byte) (MessageID, error) {
msgID := &pb.MessageIdData{}
err := proto.Unmarshal(data, msgID)
Expand Down
52 changes: 51 additions & 1 deletion pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ package pulsar

import (
"math"
"strconv"
"strings"
"time"

"github.com/pkg/errors"
)

// ProducerMessage abstraction used in Pulsar producer
Expand Down Expand Up @@ -120,19 +124,65 @@ type Message interface {
type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
// String the message id represented as a string
String() string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed the struct just needs to implement the Stringer interface
https://golang.org/pkg/fmt/#Stringer

// Equals indicates to message IDs are equal
Equals(other MessageID) bool
Copy link
Copy Markdown
Contributor

@cckellogg cckellogg May 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should add this either. We should avoid changing the interfaces since it's a breaking change and this can be accomplished with out doing that. We can add a util method like messageIDsEqual or MessageIDsEqual.

}

// DeserializeMessageID reconstruct a MessageID object from its serialized representation
func DeserializeMessageID(data []byte) (MessageID, error) {
return deserializeMessageID(data)
}

func MessageIDFromParts(ledgerID, entryID int64, batchIdx, partitionIdx int32) MessageID {
return newMessageID(ledgerID, entryID, batchIdx, partitionIdx)
}

func MessageIDFromString(str string) (MessageID, error) {

s := strings.Split(str, ":")

if len(s) < 2 || len(s) > 4 {
return nil, errors.Errorf("invalid message id string. %s", str)
}

ledgerID, err := strconv.ParseInt(s[0], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid ledger id. %s", str)
}

entryID, err := strconv.ParseInt(s[1], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid entry id. %s", str)
}

partitionIdx := int32(-1)
if len(s) > 2 {
pi, err := strconv.Atoi(s[2])
if err != nil {
return nil, errors.Errorf("invalid partition index. %s", str)
}
partitionIdx = int32(pi)
}

batchIdx := int32(-1)
if len(s) == 4 {
bi, err := strconv.Atoi(s[3])
if err != nil {
return nil, errors.Errorf("invalid batch index. %s", str)
}
batchIdx = int32(bi)
}
return newMessageID(ledgerID, entryID, batchIdx, partitionIdx), nil
}

// EarliestMessageID returns a messageID that points to the earliest message available in a topic
func EarliestMessageID() MessageID {
return newMessageID(-1, -1, -1, -1)
}

// LatestMessage returns a messageID that points to the latest message
// LatestMessageID returns a messageID that points to the latest message
func LatestMessageID() MessageID {
return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1)
}
48 changes: 48 additions & 0 deletions pulsar/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package pulsar

import (
"github.com/stretchr/testify/assert"

"testing"
)

func TestMessageIDFromString(t *testing.T) {
id, err := MessageIDFromString("1:2")
assert.Nil(t, err)
assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1)))

id, err = MessageIDFromString("1:2:3")
assert.Nil(t, err)
assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3)))

id, err = MessageIDFromString("1:2:3:4")
assert.Nil(t, err)
assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3)))
}

func TestMessageIDFromStringErrors(t *testing.T) {
id, err := MessageIDFromString("1;1")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid message id string. 1;1", err.Error())

id, err = MessageIDFromString("a:1")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid ledger id. a:1", err.Error())

id, err = MessageIDFromString("1:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid entry id. 1:a", err.Error())

id, err = MessageIDFromString("1:2:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid partition index. 1:2:a", err.Error())

id, err = MessageIDFromString("1:2:3:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error())
}
8 changes: 8 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,14 @@ func (id *myMessageID) Serialize() []byte {
return id.data
}

func (id *myMessageID) String() string {
return ""
}

func (id *myMessageID) Equals(other MessageID) bool {
return true
}

func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
Expand Down