diff --git a/pulsar/producer.go b/pulsar/producer.go index b41415a4aa..ae2afae1ea 100644 --- a/pulsar/producer.go +++ b/pulsar/producer.go @@ -20,6 +20,8 @@ package pulsar import ( "context" "time" + + "github.com/apache/pulsar-client-go/pulsar/internal" ) type HashingScheme int @@ -199,3 +201,15 @@ type Producer interface { // of errors, pending writes will not be retried. Close() } + +// GetHashingFunction return the corresponding hashing function for the hashing scheme +func GetHashingFunction(s HashingScheme) func(string) uint32 { + switch s { + case JavaStringHash: + return internal.JavaStringHash + case Murmur3_32Hash: + return internal.Murmur3_32Hash + default: + return internal.JavaStringHash + } +} diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index e8d43e04b1..85b120962f 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -59,17 +59,6 @@ type producer struct { var partitionsAutoDiscoveryInterval = 1 * time.Minute -func getHashingFunction(s HashingScheme) func(string) uint32 { - switch s { - case JavaStringHash: - return internal.JavaStringHash - case Murmur3_32Hash: - return internal.Murmur3_32Hash - default: - return internal.JavaStringHash - } -} - func newProducer(client *client, options *ProducerOptions) (*producer, error) { if options.Topic == "" { return nil, newError(InvalidTopicName, "Topic name is required for producer") @@ -102,7 +91,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { if options.MessageRouter == nil { internalRouter := NewDefaultRouter( - getHashingFunction(options.HashingScheme), + GetHashingFunction(options.HashingScheme), options.BatchingMaxMessages, options.BatchingMaxSize, options.BatchingMaxPublishDelay, diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 4d62cac169..e0ac56cfef 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -21,6 +21,8 @@ import ( "context" "fmt" "net/http" + "reflect" + "runtime" "strconv" "sync" "sync/atomic" @@ -41,6 +43,12 @@ func TestInvalidURL(t *testing.T) { } } +func TestGetHashingFunction(t *testing.T) { + assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(-1)) + assertHashingFunctionEqual(t, internal.JavaStringHash, GetHashingFunction(JavaStringHash)) + assertHashingFunctionEqual(t, internal.Murmur3_32Hash, GetHashingFunction(Murmur3_32Hash)) +} + func TestProducerConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://invalid-hostname:6650", @@ -1031,3 +1039,9 @@ func TestProducerWithInterceptors(t *testing.T) { assert.Equal(t, 10, metric.sendn) assert.Equal(t, 10, metric.ackn) } + +func assertHashingFunctionEqual(t *testing.T, func1, func2 func(string) uint32) { + funcName1 := runtime.FuncForPC(reflect.ValueOf(func1).Pointer()).Name() + funcName2 := runtime.FuncForPC(reflect.ValueOf(func2).Pointer()).Name() + assert.Equal(t, funcName1, funcName2) +}