From 476b01d02c4fa990a6e3526eff64ac0472afdf19 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Wed, 29 Dec 2021 14:13:06 +0800 Subject: [PATCH 1/2] add parition reader --- pulsar/reader_impl.go | 127 +++++++++++++++------------------ pulsar/reader_partition.go | 142 +++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+), 70 deletions(-) create mode 100644 pulsar/reader_partition.go diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index c8543259a3..6e209660d4 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -33,10 +33,13 @@ const ( type reader struct { sync.Mutex + topic string client *client - pc *partitionConsumer + options ReaderOptions + consumers []*partitionConsumer messageCh chan ConsumerMessage lastMessageInBroker trackingMessageID + dlq *dlqRouter log log.Logger metrics *internal.LeveledMetrics } @@ -50,74 +53,36 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, newError(InvalidConfiguration, "StartMessageID is required") } - startMessageID, ok := toTrackingMessageID(options.StartMessageID) - if !ok { - // a custom type satisfying MessageID may not be a messageID or trackingMessageID - // so re-create messageID using its data - deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize()) - if err != nil { - return nil, err - } - // de-serialized MessageID is a messageID - startMessageID = trackingMessageID{ - messageID: deserMsgID.(messageID), - receivedTime: time.Now(), - } - } - - subscriptionName := options.SubscriptionRolePrefix - if subscriptionName == "" { - subscriptionName = "reader" - } - subscriptionName += "-" + generateRandomName() - - receiverQueueSize := options.ReceiverQueueSize - if receiverQueueSize <= 0 { - receiverQueueSize = defaultReceiverQueueSize + if options.ReceiverQueueSize <= 0 { + options.ReceiverQueueSize = defaultReceiverQueueSize } - consumerOptions := &partitionConsumerOpts{ - topic: options.Topic, - consumerName: options.Name, - subscription: subscriptionName, - subscriptionType: Exclusive, - receiverQueueSize: receiverQueueSize, - startMessageID: startMessageID, - startMessageIDInclusive: options.StartMessageIDInclusive, - subscriptionMode: nonDurable, - readCompacted: options.ReadCompacted, - metadata: options.Properties, - nackRedeliveryDelay: defaultNackRedeliveryDelay, - replicateSubscriptionState: false, - decryption: options.Decryption, + // Provide dummy dlq router with not dlq policy + dlq, err := newDlqRouter(client, nil, client.log) + if err != nil { + return nil, err } reader := &reader{ + topic: options.Topic, client: client, + options: options, messageCh: make(chan ConsumerMessage), - log: client.log.SubLogger(log.Fields{"topic": options.Topic}), + dlq: dlq, metrics: client.metrics.GetLeveledMetrics(options.Topic), + log: client.log.SubLogger(log.Fields{"topic": options.Topic}), } - // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, client.log) - if err != nil { - return nil, err - } - - pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics) - if err != nil { - close(reader.messageCh) + if err := reader.internalTopicReadToPartitions(); err != nil { return nil, err } - reader.pc = pc reader.metrics.ReadersOpened.Inc() return reader, nil } func (r *reader) Topic() string { - return r.pc.topic + return r.topic } func (r *reader) Next(ctx context.Context) (Message, error) { @@ -132,8 +97,8 @@ func (r *reader) Next(ctx context.Context) (Message, error) { // it will specify the subscription position anyway msgID := cm.Message.ID() if mid, ok := toTrackingMessageID(msgID); ok { - r.pc.lastDequeuedMsg = mid - r.pc.AckID(mid) + r.consumers[mid.partitionIdx].lastDequeuedMsg = mid + r.consumers[mid.partitionIdx].AckID(mid) return cm.Message, nil } return nil, newError(InvalidMessage, fmt.Sprintf("invalid message id type %T", msgID)) @@ -148,35 +113,54 @@ func (r *reader) HasNext() bool { return true } +retryLoop: for { - lastMsgID, err := r.pc.getLastMessageID() - if err != nil { - r.log.WithError(err).Error("Failed to get last message id from broker") - continue - } else { + consumerLoop: + for _, consumer := range r.consumers { + lastMsgID, err := consumer.getLastMessageID() + if err != nil { + r.log.WithError(err).Error("Failed to get last message id from broker") + continue retryLoop + } + if r.lastMessageInBroker.greater(lastMsgID.messageID) { + continue consumerLoop + } r.lastMessageInBroker = lastMsgID - break } + break retryLoop } return r.hasMoreMessages() } func (r *reader) hasMoreMessages() bool { - if !r.pc.lastDequeuedMsg.Undefined() { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.lastDequeuedMsg.messageID) - } + moreMessagesCheck := func(idx int) bool { + if !r.consumers[idx].lastDequeuedMsg.Undefined() { + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].lastDequeuedMsg.messageID) + } - if r.pc.options.startMessageIDInclusive { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.pc.startMessageID.messageID) - } + if r.consumers[idx].options.startMessageIDInclusive { + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.consumers[idx].startMessageID.messageID) + } - // Non-inclusive - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.pc.startMessageID.messageID) + // Non-inclusive + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].startMessageID.messageID) + } + for idx := range r.consumers { + if moreMessagesCheck(idx) { + return true + } + } + return false } func (r *reader) Close() { - r.pc.Close() + for _, consumer := range r.consumers { + if consumer != nil { + consumer.Close() + } + } + r.dlq.close() r.client.handlers.Del(r) r.metrics.ReadersClosed.Inc() } @@ -207,12 +191,15 @@ func (r *reader) Seek(msgID MessageID) error { return nil } - return r.pc.Seek(mid) + return r.consumers[mid.partitionIdx].Seek(mid) } func (r *reader) SeekByTime(time time.Time) error { r.Lock() defer r.Unlock() - return r.pc.SeekByTime(time) + if len(r.consumers) > 1 { + return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions") + } + return r.consumers[0].SeekByTime(time) } diff --git a/pulsar/reader_partition.go b/pulsar/reader_partition.go new file mode 100644 index 0000000000..9eab5cbacb --- /dev/null +++ b/pulsar/reader_partition.go @@ -0,0 +1,142 @@ +package pulsar + +import ( + "sync" + "time" +) + +func (r *reader) internalTopicReadToPartitions() error { + partitions, err := r.client.TopicPartitions(r.topic) + if err != nil { + return err + } + + oldNumPartitions, newNumPartitions := 0, len(partitions) + + r.Lock() + defer r.Unlock() + + oldReaders, oldNumPartitions := r.consumers, len(r.consumers) + if oldReaders != nil { + if oldNumPartitions == newNumPartitions { + r.log.Debug("Number of partitions in topic has not changed") + return nil + } + + r.log.WithField("old_partitions", oldNumPartitions). + WithField("new_partitions", newNumPartitions). + Info("Changed number of partitions in topic") + } + + r.consumers = make([]*partitionConsumer, newNumPartitions) + + // When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions, + // we need to rebuild the cache of new consumers, otherwise the array will be out of bounds. + if oldReaders != nil && oldNumPartitions < newNumPartitions { + // Copy over the existing consumer instances + for i := 0; i < oldNumPartitions; i++ { + r.consumers[i] = oldReaders[i] + } + } + + type ConsumerError struct { + err error + partition int + consumer *partitionConsumer + } + + subscriptionName := r.options.SubscriptionRolePrefix + if subscriptionName == "" { + subscriptionName = "reader" + } + subscriptionName += "-" + generateRandomName() + + startMessageID, ok := toTrackingMessageID(r.options.StartMessageID) + if !ok { + // a custom type satisfying MessageID may not be a messageID or trackingMessageID + // so re-create messageID using its data + deserMsgID, err := deserializeMessageID(r.options.StartMessageID.Serialize()) + if err != nil { + return err + } + // de-serialized MessageID is a messageID + startMessageID = trackingMessageID{ + messageID: deserMsgID.(messageID), + receivedTime: time.Now(), + } + } + + startPartition := oldNumPartitions + partitionsToAdd := newNumPartitions - oldNumPartitions + + if partitionsToAdd < 0 { + partitionsToAdd = newNumPartitions + startPartition = 0 + } + + var wg sync.WaitGroup + ch := make(chan ConsumerError, partitionsToAdd) + wg.Add(partitionsToAdd) + + for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ { + partitionTopic := partitions[partitionIdx] + + go func(idx int, pt string) { + defer wg.Done() + + opts := &partitionConsumerOpts{ + topic: pt, + consumerName: r.options.Name, + subscription: subscriptionName, + subscriptionType: Exclusive, + partitionIdx: idx, + receiverQueueSize: r.options.ReceiverQueueSize, + nackRedeliveryDelay: defaultNackRedeliveryDelay, + metadata: r.options.Properties, + replicateSubscriptionState: false, + startMessageID: startMessageID, + subscriptionMode: durable, + readCompacted: r.options.ReadCompacted, + decryption: r.options.Decryption, + } + + cons, err := newPartitionConsumer(nil, r.client, opts, r.messageCh, r.dlq, r.metrics) + ch <- ConsumerError{ + err: err, + partition: idx, + consumer: cons, + } + }(partitionIdx, partitionTopic) + } + + go func() { + wg.Wait() + close(ch) + }() + + for ce := range ch { + if ce.err != nil { + err = ce.err + } else { + r.consumers[ce.partition] = ce.consumer + } + } + + if err != nil { + // Since there were some failures, + // cleanup all the partitions that succeeded in creating the consumer + for _, c := range r.consumers { + if c != nil { + c.Close() + } + } + return err + } + + if newNumPartitions < oldNumPartitions { + r.metrics.ConsumersPartitions.Set(float64(newNumPartitions)) + } else { + r.metrics.ConsumersPartitions.Add(float64(partitionsToAdd)) + } + return nil +} From 8ad69abdd64e2f3ddc9d8074e73335fcef3cd130 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Fri, 14 Jan 2022 16:47:43 +0800 Subject: [PATCH 2/2] fix comment --- pulsar/reader_impl.go | 80 ++++++++++++++++++++++++++++---------- pulsar/reader_partition.go | 30 ++++++++++---- pulsar/reader_test.go | 48 +++++++++++++++++++++++ 3 files changed, 129 insertions(+), 29 deletions(-) diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 6e209660d4..b4bb6aec8a 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -42,6 +42,8 @@ type reader struct { dlq *dlqRouter log log.Logger metrics *internal.LeveledMetrics + stopDiscovery func() + closeOnce sync.Once } func newReader(client *client, options ReaderOptions) (Reader, error) { @@ -77,6 +79,8 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { return nil, err } + reader.stopDiscovery = reader.runBackgroundPartitionDiscovery(time.Second * 60) + reader.metrics.ReadersOpened.Inc() return reader, nil } @@ -134,35 +138,43 @@ retryLoop: } func (r *reader) hasMoreMessages() bool { - moreMessagesCheck := func(idx int) bool { - if !r.consumers[idx].lastDequeuedMsg.Undefined() { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].lastDequeuedMsg.messageID) - } - - if r.consumers[idx].options.startMessageIDInclusive { - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(r.consumers[idx].startMessageID.messageID) - } - - // Non-inclusive - return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(r.consumers[idx].startMessageID.messageID) - } - for idx := range r.consumers { - if moreMessagesCheck(idx) { + for _, c := range r.consumers { + if r.consumerHasMoreMessages(c) { return true } } return false } +func (r *reader) consumerHasMoreMessages(pc *partitionConsumer) bool { + if !pc.lastDequeuedMsg.Undefined() { + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(pc.lastDequeuedMsg.messageID) + } + + if pc.options.startMessageIDInclusive { + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greaterEqual(pc.startMessageID.messageID) + } + + // Non-inclusive + return r.lastMessageInBroker.isEntryIDValid() && r.lastMessageInBroker.greater(pc.startMessageID.messageID) +} + func (r *reader) Close() { - for _, consumer := range r.consumers { - if consumer != nil { - consumer.Close() + r.closeOnce.Do(func() { + r.stopDiscovery() + + r.Lock() + defer r.Unlock() + + for _, consumer := range r.consumers { + if consumer != nil { + consumer.Close() + } } - } - r.dlq.close() - r.client.handlers.Del(r) - r.metrics.ReadersClosed.Inc() + r.dlq.close() + r.client.handlers.Del(r) + r.metrics.ReadersClosed.Inc() + }) } func (r *reader) messageID(msgID MessageID) (trackingMessageID, bool) { @@ -203,3 +215,29 @@ func (r *reader) SeekByTime(time time.Time) error { } return r.consumers[0].SeekByTime(time) } + +func (r *reader) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) { + var wg sync.WaitGroup + stopDiscoveryCh := make(chan struct{}) + ticker := time.NewTicker(period) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stopDiscoveryCh: + return + case <-ticker.C: + r.log.Debug("Auto discovering new partitions") + r.internalTopicReadToPartitions() + } + } + }() + + return func() { + ticker.Stop() + close(stopDiscoveryCh) + wg.Wait() + } +} diff --git a/pulsar/reader_partition.go b/pulsar/reader_partition.go index 9eab5cbacb..106bdd9f14 100644 --- a/pulsar/reader_partition.go +++ b/pulsar/reader_partition.go @@ -1,3 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. package pulsar import ( @@ -5,6 +21,10 @@ import ( "time" ) +const ( + ReaderSubNamePrefix = "reader" +) + func (r *reader) internalTopicReadToPartitions() error { partitions, err := r.client.TopicPartitions(r.topic) if err != nil { @@ -45,12 +65,6 @@ func (r *reader) internalTopicReadToPartitions() error { consumer *partitionConsumer } - subscriptionName := r.options.SubscriptionRolePrefix - if subscriptionName == "" { - subscriptionName = "reader" - } - subscriptionName += "-" + generateRandomName() - startMessageID, ok := toTrackingMessageID(r.options.StartMessageID) if !ok { // a custom type satisfying MessageID may not be a messageID or trackingMessageID @@ -87,7 +101,7 @@ func (r *reader) internalTopicReadToPartitions() error { opts := &partitionConsumerOpts{ topic: pt, consumerName: r.options.Name, - subscription: subscriptionName, + subscription: ReaderSubNamePrefix + "-" + generateRandomName(), subscriptionType: Exclusive, partitionIdx: idx, receiverQueueSize: r.options.ReceiverQueueSize, @@ -95,7 +109,7 @@ func (r *reader) internalTopicReadToPartitions() error { metadata: r.options.Properties, replicateSubscriptionState: false, startMessageID: startMessageID, - subscriptionMode: durable, + subscriptionMode: nonDurable, readCompacted: r.options.ReadCompacted, decryption: r.options.Decryption, } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index bdafea086f..4d27504ae3 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "net/http" "testing" "time" @@ -92,6 +93,53 @@ func TestReader(t *testing.T) { } } +func TestMultipleTopicReader(t *testing.T) { + ctx := context.Background() + + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + testURL := adminURL + "/" + "admin/v2/persistent/public/default/" + topic + "/partitions" + makeHTTPCall(t, http.MethodPut, testURL, "3") + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) + + expectMsg := fmt.Sprintf("hello-%d", i) + assert.Equal(t, []byte(expectMsg), msg.Payload()) + } +} + func TestReaderConnectError(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://invalid-hostname:6650",