Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package pulsar
import (
"context"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
)

type HashingScheme int
Expand Down Expand Up @@ -199,3 +201,15 @@ type Producer interface {
// of errors, pending writes will not be retried.
Close()
}

// GetHashingFunction return the corresponding hashing function for the hashing scheme
func GetHashingFunction(s HashingScheme) func(string) uint32 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to expose this publicly? An application is free to use a custom message router function, at that point it can use any hashing function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would like to mimic what the default router does including the hash function. The only difference would be to use a snapshotted numPartitions instead of a latest one that is from TopicMetadata:

if options.MessageRouter == nil {

switch s {
case JavaStringHash:
return internal.JavaStringHash
case Murmur3_32Hash:
return internal.Murmur3_32Hash
default:
return internal.JavaStringHash
}
}
13 changes: 1 addition & 12 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,6 @@ type producer struct {

var partitionsAutoDiscoveryInterval = 1 * time.Minute

func getHashingFunction(s HashingScheme) func(string) uint32 {
switch s {
case JavaStringHash:
return internal.JavaStringHash
case Murmur3_32Hash:
return internal.Murmur3_32Hash
default:
return internal.JavaStringHash
}
}

func newProducer(client *client, options *ProducerOptions) (*producer, error) {
if options.Topic == "" {
return nil, newError(InvalidTopicName, "Topic name is required for producer")
Expand Down Expand Up @@ -102,7 +91,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {

if options.MessageRouter == nil {
internalRouter := NewDefaultRouter(
getHashingFunction(options.HashingScheme),
GetHashingFunction(options.HashingScheme),
options.BatchingMaxMessages,
options.BatchingMaxSize,
options.BatchingMaxPublishDelay,
Expand Down
14 changes: 14 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"fmt"
"net/http"
"reflect"
"runtime"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -41,6 +43,12 @@ func TestInvalidURL(t *testing.T) {
}
}

func TestGetHashingFunction(t *testing.T) {
assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(-1))
assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(JavaStringHash))
assertHashingFunctionEqual(t, internal.Murmur3_32Hash, GetHashingFunction(Murmur3_32Hash))
}

func TestProducerConnectError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://invalid-hostname:6650",
Expand Down Expand Up @@ -1031,3 +1039,9 @@ func TestProducerWithInterceptors(t *testing.T) {
assert.Equal(t, 10, metric.sendn)
assert.Equal(t, 10, metric.ackn)
}

func assertHashingFunctionEqual(t *testing.T, func1, func2 func(string) uint32) {
funcName1 := runtime.FuncForPC(reflect.ValueOf(func1).Pointer()).Name()
funcName2 := runtime.FuncForPC(reflect.ValueOf(func2).Pointer()).Name()
assert.Equal(t, funcName1, funcName2)
}