diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index c358c22e66..4a34768c3e 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -112,6 +112,14 @@ func (id messageID) String() string { return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx) } +func (id messageID) Equals(other MessageID) bool { + rmsgid, ok := other.(messageID) + if !ok { + return false + } + return id.equal(rmsgid) +} + func deserializeMessageID(data []byte) (MessageID, error) { msgID := &pb.MessageIdData{} err := proto.Unmarshal(data, msgID) diff --git a/pulsar/message.go b/pulsar/message.go index 397c51e992..fc5f038830 100644 --- a/pulsar/message.go +++ b/pulsar/message.go @@ -19,7 +19,11 @@ package pulsar import ( "math" + "strconv" + "strings" "time" + + "github.com/pkg/errors" ) // ProducerMessage abstraction used in Pulsar producer @@ -120,6 +124,10 @@ type Message interface { type MessageID interface { // Serialize the message id into a sequence of bytes that can be stored somewhere else Serialize() []byte + // String the message id represented as a string + String() string + // Equals indicates to message IDs are equal + Equals(other MessageID) bool } // DeserializeMessageID reconstruct a MessageID object from its serialized representation @@ -127,12 +135,54 @@ func DeserializeMessageID(data []byte) (MessageID, error) { return deserializeMessageID(data) } +func MessageIDFromParts(ledgerID, entryID int64, batchIdx, partitionIdx int32) MessageID { + return newMessageID(ledgerID, entryID, batchIdx, partitionIdx) +} + +func MessageIDFromString(str string) (MessageID, error) { + + s := strings.Split(str, ":") + + if len(s) < 2 || len(s) > 4 { + return nil, errors.Errorf("invalid message id string. %s", str) + } + + ledgerID, err := strconv.ParseInt(s[0], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid ledger id. %s", str) + } + + entryID, err := strconv.ParseInt(s[1], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid entry id. %s", str) + } + + partitionIdx := int32(-1) + if len(s) > 2 { + pi, err := strconv.Atoi(s[2]) + if err != nil { + return nil, errors.Errorf("invalid partition index. %s", str) + } + partitionIdx = int32(pi) + } + + batchIdx := int32(-1) + if len(s) == 4 { + bi, err := strconv.Atoi(s[3]) + if err != nil { + return nil, errors.Errorf("invalid batch index. %s", str) + } + batchIdx = int32(bi) + } + return newMessageID(ledgerID, entryID, batchIdx, partitionIdx), nil +} + // EarliestMessageID returns a messageID that points to the earliest message available in a topic func EarliestMessageID() MessageID { return newMessageID(-1, -1, -1, -1) } -// LatestMessage returns a messageID that points to the latest message +// LatestMessageID returns a messageID that points to the latest message func LatestMessageID() MessageID { return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1) } diff --git a/pulsar/message_test.go b/pulsar/message_test.go new file mode 100644 index 0000000000..14d7d8335b --- /dev/null +++ b/pulsar/message_test.go @@ -0,0 +1,48 @@ +package pulsar + +import ( + "github.com/stretchr/testify/assert" + + "testing" +) + +func TestMessageIDFromString(t *testing.T) { + id, err := MessageIDFromString("1:2") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1))) + + id, err = MessageIDFromString("1:2:3") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3))) + + id, err = MessageIDFromString("1:2:3:4") + assert.Nil(t, err) + assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3))) +} + +func TestMessageIDFromStringErrors(t *testing.T) { + id, err := MessageIDFromString("1;1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid message id string. 1;1", err.Error()) + + id, err = MessageIDFromString("a:1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid ledger id. a:1", err.Error()) + + id, err = MessageIDFromString("1:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid entry id. 1:a", err.Error()) + + id, err = MessageIDFromString("1:2:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid partition index. 1:2:a", err.Error()) + + id, err = MessageIDFromString("1:2:3:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error()) +} diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index f72ba1d1a0..fe99c793af 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -391,6 +391,14 @@ func (id *myMessageID) Serialize() []byte { return id.data } +func (id *myMessageID) String() string { + return "" +} + +func (id *myMessageID) Equals(other MessageID) bool { + return true +} + func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL,