-
Notifications
You must be signed in to change notification settings - Fork 376
Add messageID from string and from parts #526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,11 @@ package pulsar | |
|
|
||
| import ( | ||
| "math" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/pkg/errors" | ||
| ) | ||
|
|
||
| // ProducerMessage abstraction used in Pulsar producer | ||
|
|
@@ -120,19 +124,65 @@ type Message interface { | |
| type MessageID interface { | ||
| // Serialize the message id into a sequence of bytes that can be stored somewhere else | ||
| Serialize() []byte | ||
| // String the message id represented as a string | ||
| String() string | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not needed the struct just needs to implement the Stringer interface |
||
| // Equals indicates to message IDs are equal | ||
| Equals(other MessageID) bool | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we should add this either. We should avoid changing the interfaces since it's a breaking change and this can be accomplished with out doing that. We can add a util method like |
||
| } | ||
|
|
||
| // DeserializeMessageID reconstruct a MessageID object from its serialized representation | ||
| func DeserializeMessageID(data []byte) (MessageID, error) { | ||
| return deserializeMessageID(data) | ||
| } | ||
|
|
||
| func MessageIDFromParts(ledgerID, entryID int64, batchIdx, partitionIdx int32) MessageID { | ||
| return newMessageID(ledgerID, entryID, batchIdx, partitionIdx) | ||
| } | ||
|
|
||
| func MessageIDFromString(str string) (MessageID, error) { | ||
|
|
||
| s := strings.Split(str, ":") | ||
|
|
||
| if len(s) < 2 || len(s) > 4 { | ||
| return nil, errors.Errorf("invalid message id string. %s", str) | ||
| } | ||
|
|
||
| ledgerID, err := strconv.ParseInt(s[0], 10, 64) | ||
| if err != nil { | ||
| return nil, errors.Errorf("invalid ledger id. %s", str) | ||
| } | ||
|
|
||
| entryID, err := strconv.ParseInt(s[1], 10, 64) | ||
| if err != nil { | ||
| return nil, errors.Errorf("invalid entry id. %s", str) | ||
| } | ||
|
|
||
| partitionIdx := int32(-1) | ||
| if len(s) > 2 { | ||
| pi, err := strconv.Atoi(s[2]) | ||
| if err != nil { | ||
| return nil, errors.Errorf("invalid partition index. %s", str) | ||
| } | ||
| partitionIdx = int32(pi) | ||
| } | ||
|
|
||
| batchIdx := int32(-1) | ||
| if len(s) == 4 { | ||
| bi, err := strconv.Atoi(s[3]) | ||
| if err != nil { | ||
| return nil, errors.Errorf("invalid batch index. %s", str) | ||
| } | ||
| batchIdx = int32(bi) | ||
| } | ||
| return newMessageID(ledgerID, entryID, batchIdx, partitionIdx), nil | ||
| } | ||
|
|
||
| // EarliestMessageID returns a messageID that points to the earliest message available in a topic | ||
| func EarliestMessageID() MessageID { | ||
| return newMessageID(-1, -1, -1, -1) | ||
| } | ||
|
|
||
| // LatestMessage returns a messageID that points to the latest message | ||
| // LatestMessageID returns a messageID that points to the latest message | ||
| func LatestMessageID() MessageID { | ||
| return newMessageID(math.MaxInt64, math.MaxInt64, -1, -1) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| package pulsar | ||
|
|
||
| import ( | ||
| "github.com/stretchr/testify/assert" | ||
|
|
||
| "testing" | ||
| ) | ||
|
|
||
| func TestMessageIDFromString(t *testing.T) { | ||
| id, err := MessageIDFromString("1:2") | ||
| assert.Nil(t, err) | ||
| assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, -1))) | ||
|
|
||
| id, err = MessageIDFromString("1:2:3") | ||
| assert.Nil(t, err) | ||
| assert.True(t, id.Equals(MessageIDFromParts(1, 2, -1, 3))) | ||
|
|
||
| id, err = MessageIDFromString("1:2:3:4") | ||
| assert.Nil(t, err) | ||
| assert.True(t, id.Equals(MessageIDFromParts(1, 2, 4, 3))) | ||
| } | ||
|
|
||
| func TestMessageIDFromStringErrors(t *testing.T) { | ||
| id, err := MessageIDFromString("1;1") | ||
| assert.Nil(t, id) | ||
| assert.NotNil(t, err) | ||
| assert.Equal(t, "invalid message id string. 1;1", err.Error()) | ||
|
|
||
| id, err = MessageIDFromString("a:1") | ||
| assert.Nil(t, id) | ||
| assert.NotNil(t, err) | ||
| assert.Equal(t, "invalid ledger id. a:1", err.Error()) | ||
|
|
||
| id, err = MessageIDFromString("1:a") | ||
| assert.Nil(t, id) | ||
| assert.NotNil(t, err) | ||
| assert.Equal(t, "invalid entry id. 1:a", err.Error()) | ||
|
|
||
| id, err = MessageIDFromString("1:2:a") | ||
| assert.Nil(t, id) | ||
| assert.NotNil(t, err) | ||
| assert.Equal(t, "invalid partition index. 1:2:a", err.Error()) | ||
|
|
||
| id, err = MessageIDFromString("1:2:3:a") | ||
| assert.Nil(t, id) | ||
| assert.NotNil(t, err) | ||
| assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
MessageIDis an interface, Are we here trying to compare whether two interfaces are equal?