From b89e4e6226aa048236c96b08e84faf06b0760a7a Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sat, 22 May 2021 15:27:31 -0600 Subject: [PATCH 1/3] Add methods to deal with string message ids. If we want to deal with string message ids, we need methods for being able to create them from either a string or a using the individual parts of the message id. This exposes some constructors for this. Tests are needed for the string message parsing, but it is taking from pulsarctl where it is well used --- pulsar/message.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/pulsar/message.go b/pulsar/message.go index 397c51e992..fc33e293c6 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -18,7 +18,10 @@ package pulsar import ( + "github.com/pkg/errors" "math" + "strconv" + "strings" "time" ) @@ -120,6 +123,8 @@ type Message interface { type MessageID interface { // Serialize the message id into a sequence of bytes that can be stored somewhere else Serialize() []byte + // String the message id represented as a string + String() string } // DeserializeMessageID reconstruct a MessageID object from its serialized representation @@ -127,6 +132,48 @@ func DeserializeMessageID(data []byte) (MessageID, error) { return deserializeMessageID(data) } +func MessageIDFromParts(ledgerId, entryId int64, batchIdx, partitionIdx int32) MessageID { + return newMessageID(ledgerId, entryId, batchIdx, partitionIdx) +} + +func MessageIDFromString(str string) (MessageID, error) { + + s := strings.Split(str, ":") + + if len(s) < 2 || len(s) > 4 { + return nil, errors.Errorf("invalid message id string. %s", str) + } + + ledgerId, err := strconv.ParseInt(s[0], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid ledger id. %s", str) + } + + entryId, err := strconv.ParseInt(s[1], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid entry id. %s", str) + } + + partitionIdx := int32(-1) + if len(s) > 2 { + pi, err := strconv.Atoi(s[2]) + if err != nil { + return nil, errors.Errorf("invalid partition index. %s", str) + } + partitionIdx = int32(pi) + } + + batchIdx := int32(-1) + if len(s) == 4 { + bi, err := strconv.Atoi(s[3]) + if err != nil { + return nil, errors.Errorf("invalid batch index. %s", str) + } + batchIdx = int32(bi) + } + return newMessageID(ledgerId, entryId, batchIdx, partitionIdx), nil +} + // EarliestMessageID returns a messageID that points to the earliest message available in a topic func EarliestMessageID() MessageID { return newMessageID(-1, -1, -1, -1) From 0bd27e79654018afa3cdf464728da172b88e4215 Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sat, 22 May 2021 15:53:25 -0600 Subject: [PATCH 2/3] add tests --- pulsar/impl_message.go | 8 +++++++ pulsar/message.go | 4 +++- pulsar/message_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++ pulsar/reader_test.go | 8 +++++++ 4 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 pulsar/message_test.go diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index c358c22e66..4a34768c3e 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -112,6 +112,14 @@ func (id messageID) String() string { return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx) } +func (id messageID) Equals(other MessageID) bool { + rmsgid, ok := other.(messageID) + if !ok { + return false + } + return id.equal(rmsgid) +} + func deserializeMessageID(data []byte) (MessageID, error) { msgID := &pb.MessageIdData{} err := proto.Unmarshal(data, msgID) diff --git a/pulsar/message.go b/pulsar/message.go index fc33e293c6..4131726f3a 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -125,6 +125,8 @@ type MessageID interface { Serialize() []byte // String the message id represented as a string String() string + // Equals indicates to message IDs are equal + Equals(other MessageID) bool } // DeserializeMessageID reconstruct a MessageID object from its serialized representation @@ -179,7 +181,7 @@ func EarliestMessageID() MessageID { return newMessageID(-1, -1, -1, -1) } -// LatestMessage returns a messageID that points to the latest message +// LatestMessageID returns a messageID that points to the latest message func LatestMessageID() MessageID { return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1) } diff --git a/pulsar/message_test.go b/pulsar/message_test.go new file mode 100644 index 0000000000..1467a23488 --- /dev/null +++ b/pulsar/message_test.go @@ -0,0 +1,47 @@ +package pulsar + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestMessageIDFromString(t *testing.T) { + id, err := MessageIDFromString("1:2") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1))) + + id, err = MessageIDFromString("1:2:3") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3))) + + id, err = MessageIDFromString("1:2:3:4") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3))) +} + +func TestMessageIDFromStringErrors(t *testing.T) { + id, err := MessageIDFromString("1;1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid message id string. 1;1", err.Error()) + + id, err = MessageIDFromString("a:1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid ledger id. a:1", err.Error()) + + id, err = MessageIDFromString("1:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid entry id. 1:a", err.Error()) + + id, err = MessageIDFromString("1:2:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid partition index. 1:2:a", err.Error()) + + id, err = MessageIDFromString("1:2:3:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error()) +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index f72ba1d1a0..fe99c793af 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -391,6 +391,14 @@ func (id *myMessageID) Serialize() []byte { return id.data } +func (id *myMessageID) String() string { + return "" +} + +func (id *myMessageID) Equals(other MessageID) bool { + return true +} + func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, From dfbbaad1e2d94a5610230228d01e72268412f77e Mon Sep 17 00:00:00 2001 From: Addison Higham Date: Sat, 22 May 2021 16:01:58 -0600 Subject: [PATCH 3/3] fix test errors --- pulsar/message.go | 13 +++++++------ pulsar/message_test.go | 7 ++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/pulsar/message.go b/pulsar/message.go index 4131726f3a..fc5f038830 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -18,11 +18,12 @@ package pulsar import ( - "github.com/pkg/errors" "math" "strconv" "strings" "time" + + "github.com/pkg/errors" ) // ProducerMessage abstraction used in Pulsar producer @@ -134,8 +135,8 @@ func DeserializeMessageID(data []byte) (MessageID, error) { return deserializeMessageID(data) } -func MessageIDFromParts(ledgerId, entryId int64, batchIdx, partitionIdx int32) MessageID { - return newMessageID(ledgerId, entryId, batchIdx, partitionIdx) +func MessageIDFromParts(ledgerID, entryID int64, batchIdx, partitionIdx int32) MessageID { + return newMessageID(ledgerID, entryID, batchIdx, partitionIdx) } func MessageIDFromString(str string) (MessageID, error) { @@ -146,12 +147,12 @@ func MessageIDFromString(str string) (MessageID, error) { return nil, errors.Errorf("invalid message id string. %s", str) } - ledgerId, err := strconv.ParseInt(s[0], 10, 64) + ledgerID, err := strconv.ParseInt(s[0], 10, 64) if err != nil { return nil, errors.Errorf("invalid ledger id. %s", str) } - entryId, err := strconv.ParseInt(s[1], 10, 64) + entryID, err := strconv.ParseInt(s[1], 10, 64) if err != nil { return nil, errors.Errorf("invalid entry id. %s", str) } @@ -173,7 +174,7 @@ func MessageIDFromString(str string) (MessageID, error) { } batchIdx = int32(bi) } - return newMessageID(ledgerId, entryId, batchIdx, partitionIdx), nil + return newMessageID(ledgerID, entryID, batchIdx, partitionIdx), nil } // EarliestMessageID returns a messageID that points to the earliest message available in a topic diff --git a/pulsar/message_test.go b/pulsar/message_test.go index 1467a23488..14d7d8335b 100644 --- a/pulsar/message_test.go +++ b/pulsar/message_test.go @@ -2,21 +2,22 @@ package pulsar import ( "github.com/stretchr/testify/assert" + "testing" ) func TestMessageIDFromString(t *testing.T) { id, err := MessageIDFromString("1:2") assert.Nil(t, err) - assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1))) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1))) id, err = MessageIDFromString("1:2:3") assert.Nil(t, err) - assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3))) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3))) id, err = MessageIDFromString("1:2:3:4") assert.Nil(t, err) - assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3))) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3))) } func TestMessageIDFromStringErrors(t *testing.T) {