From 63d21402dcd081f9ca2c2ad2caf761e3a87416aa Mon Sep 17 00:00:00 2001 From: MalikHou Date: Mon, 13 Sep 2021 12:50:18 +0800 Subject: [PATCH 1/8] fix negative_acks_tracker.go --- pulsar/consumer.go | 4 + pulsar/consumer_impl.go | 32 +++++- pulsar/consumer_partition.go | 12 ++- pulsar/error.go | 4 + pulsar/impl_message.go | 7 ++ pulsar/internal/time_wheel.go | 154 +++++++++++++++++++++++++++ pulsar/negative_acks_tracker.go | 103 ++++++++---------- pulsar/negative_acks_tracker_test.go | 12 +-- 8 files changed, 264 insertions(+), 64 deletions(-) create mode 100644 pulsar/internal/time_wheel.go diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 1c52b29935..fee6ef2a35 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -132,6 +132,10 @@ type ConsumerOptions struct { // processed. Default is 1min. (See `Consumer.Nack()`) NackRedeliveryDelay time.Duration + // The AckTimeOut after which to redeliver the messages that timeout to be + // processed. Default is close + AckTimeOut time.Duration + // Set the consumer name. Name string diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 78ae0d7759..a151f86702 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -30,11 +30,15 @@ import ( "github.com/apache/pulsar-client-go/pulsar/log" ) -const defaultNackRedeliveryDelay = 1 * time.Minute +const ( + defaultNackRedeliveryDelay = 1 * time.Minute + defaultAckTimeOut = 1 * time.Second +) type acker interface { AckID(id trackingMessageID) NackID(id trackingMessageID) + NackIDDelay(id trackingMessageID, delay time.Duration) } type consumer struct { @@ -138,6 +142,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } + if options.AckTimeOut != 0 && options.AckTimeOut < defaultAckTimeOut { + return nil, newError(AckTimeoutLess, "Ack timeout should be greater than 1000 ms") + } + // normalize as FQDN topics var tns []*internal.TopicName // single topic consumer @@ -325,6 +333,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { partitionIdx: idx, receiverQueueSize: receiverQueueSize, nackRedeliveryDelay: nackRedeliveryDelay, + ackTimeout: c.options.AckTimeOut, metadata: metadata, replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: trackingMessageID{}, @@ -414,6 +423,9 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) { if !ok { return nil, newError(ConsumerClosed, "consumer closed") } + if c.options.AckTimeOut != 0 { + c.NackDelay(cm.Message, c.options.AckTimeOut) + } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -499,6 +511,10 @@ func (c *consumer) Nack(msg Message) { c.NackID(msg.ID()) } +func (c *consumer) NackDelay(msg Message, delay time.Duration) { + c.NackIDDelay(msg.ID(), delay) +} + func (c *consumer) NackID(msgID MessageID) { mid, ok := c.messageID(msgID) if !ok { @@ -513,6 +529,20 @@ func (c *consumer) NackID(msgID MessageID) { c.consumers[mid.partitionIdx].NackID(mid) } +func (c *consumer) NackIDDelay(msgID MessageID, delay time.Duration) { + mid, ok := c.messageID(msgID) + if !ok { + return + } + + if mid.consumer != nil { + mid.NackDelay(delay) + return + } + + c.consumers[mid.partitionIdx].NackIDDelay(mid, delay) +} + func (c *consumer) Close() { c.closeOnce.Do(func() { c.stopDiscovery() diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index cf9294978d..21f4bdfdc1 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -86,6 +86,7 @@ type partitionConsumerOpts struct { partitionIdx int receiverQueueSize int nackRedeliveryDelay time.Duration + ackTimeout time.Duration metadata map[string]string replicateSubscriptionState bool startMessageID trackingMessageID @@ -294,6 +295,9 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) { req := &ackRequest{ msgID: msgID, } + if pc.options.ackTimeout != 0 { + pc.nackTracker.Remove(msgID.messageID) + } pc.eventsCh <- req pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) @@ -301,11 +305,17 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) { } func (pc *partitionConsumer) NackID(msgID trackingMessageID) { - pc.nackTracker.Add(msgID.messageID) + pc.nackTracker.Add(msgID.messageID, pc.options.nackRedeliveryDelay) + pc.metrics.NacksCounter.Inc() +} + +func (pc *partitionConsumer) NackIDDelay(msgID trackingMessageID, dalay time.Duration) { + pc.nackTracker.Add(msgID.messageID, dalay) pc.metrics.NacksCounter.Inc() } func (pc *partitionConsumer) Redeliver(msgIds []messageID) { + fmt.Println("msgIds:", msgIds) pc.eventsCh <- &redeliveryRequest{msgIds} iMsgIds := make([]MessageID, len(msgIds)) diff --git a/pulsar/error.go b/pulsar/error.go index f433bfc973..b396bd41d6 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -101,6 +101,8 @@ const ( SeekFailed // ProducerClosed means producer already been closed ProducerClosed + // AckTimeoutLess means Ack timeout should be greater than 1000 ms + AckTimeoutLess ) // Error implement error interface, composed of two parts: msg and result. @@ -205,6 +207,8 @@ func getResultStr(r Result) string { return "SeekFailed" case ProducerClosed: return "ProducerClosed" + case AckTimeoutLess: + return "AckTimeoutLess" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index c4f215c5e3..1d81d98df4 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -79,6 +79,13 @@ func (id trackingMessageID) Nack() { id.consumer.NackID(id) } +func (id trackingMessageID) NackDelay(delay time.Duration) { + if id.consumer == nil { + return + } + id.consumer.NackIDDelay(id, delay) +} + func (id trackingMessageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { return id.tracker.ack(int(id.batchIdx)) diff --git a/pulsar/internal/time_wheel.go b/pulsar/internal/time_wheel.go new file mode 100644 index 0000000000..c0bb02c351 --- /dev/null +++ b/pulsar/internal/time_wheel.go @@ -0,0 +1,154 @@ +package internal + +import ( + "errors" + "time" +) + +// Task means handle unit in time wheel +type Task struct { + delay time.Duration + key interface{} + round int // optimize time wheel to handle delay bigger than bucketsNum * tick + callback func() +} + +// TimeWheel means time wheel +type TimeWheel struct { + tick time.Duration + ticker *time.Ticker + + bucketsNum int + buckets []map[interface{}]*Task // key: added item, value: *Task + bucketIndexes map[interface{}]int // key: added item, value: bucket position + + currentIndex int + + addC chan *Task + removeC chan interface{} + stopC chan struct{} +} + +// NewTimeWheel create new time wheel +func NewTimeWheel(tick time.Duration, bucketsNum int) (*TimeWheel, error) { + if bucketsNum <= 0 { + return nil, errors.New("bucket number must be greater than 0") + } + if int(tick.Seconds()) < 1 { + return nil, errors.New("tick cannot be less than 1s") + } + + tw := &TimeWheel{ + tick: tick, + bucketsNum: bucketsNum, + bucketIndexes: make(map[interface{}]int, 1024), + buckets: make([]map[interface{}]*Task, bucketsNum), + currentIndex: 0, + addC: make(chan *Task, 1024), + removeC: make(chan interface{}, 1024), + stopC: make(chan struct{}), + } + + for i := 0; i < bucketsNum; i++ { + tw.buckets[i] = make(map[interface{}]*Task, 16) + } + + return tw, nil +} + +// Start start the time wheel +func (tw *TimeWheel) Start() { + tw.ticker = time.NewTicker(tw.tick) + go tw.start() +} + +func (tw *TimeWheel) start() { + for { + select { + case <-tw.ticker.C: + tw.handleTick() + case task := <-tw.addC: + tw.add(task) + case key := <-tw.removeC: + tw.remove(key) + case <-tw.stopC: + tw.ticker.Stop() + return + } + } +} + +// Stop stop the time wheel +func (tw *TimeWheel) Stop() { + tw.stopC <- struct{}{} +} + +func (tw *TimeWheel) handleTick() { + bucket := tw.buckets[tw.currentIndex] + for k := range bucket { + if bucket[k].round > 0 { + bucket[k].round-- + continue + } + go bucket[k].callback() + delete(bucket, k) + delete(tw.bucketIndexes, k) + } + if tw.currentIndex == tw.bucketsNum-1 { + tw.currentIndex = 0 + return + } + tw.currentIndex++ +} + +// Add add an item into time wheel +func (tw *TimeWheel) Add(delay time.Duration, key interface{}, callback func()) error { + if delay <= 0 || key == nil { + return errors.New("invalid params") + } + tw.addC <- &Task{delay: delay, key: key, callback: callback} + return nil +} + +func (tw *TimeWheel) add(task *Task) { + round := tw.calculateRound(task.delay) + index := tw.calculateIndex(task.delay) + task.round = round + if originIndex, ok := tw.bucketIndexes[task.key]; ok { + delete(tw.buckets[originIndex], task.key) + } + tw.bucketIndexes[task.key] = index + tw.buckets[index][task.key] = task +} + +func (tw *TimeWheel) calculateRound(delay time.Duration) (round int) { + delaySeconds := int(delay.Seconds()) + tickSeconds := int(tw.tick.Seconds()) + round = delaySeconds / tickSeconds / tw.bucketsNum + return +} + +func (tw *TimeWheel) calculateIndex(delay time.Duration) (index int) { + delaySeconds := int(delay.Seconds()) + tickSeconds := int(tw.tick.Seconds()) + index = (tw.currentIndex + delaySeconds/tickSeconds) % tw.bucketsNum + return +} + +// Remove remove an item from time wheel +func (tw *TimeWheel) Remove(key interface{}) error { + if key == nil { + return errors.New("invalid params") + } + tw.removeC <- key + return nil +} + +// don't need to call callback +func (tw *TimeWheel) remove(key interface{}) { + if index, ok := tw.bucketIndexes[key]; ok { + delete(tw.bucketIndexes, key) + delete(tw.buckets[index], key) + } + return +} diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index e10ab49c2d..8dccc8af0b 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/internal" log "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -31,30 +32,36 @@ type redeliveryConsumer interface { type negativeAcksTracker struct { sync.Mutex - doneCh chan interface{} - doneOnce sync.Once - negativeAcks map[messageID]time.Time - rc redeliveryConsumer - tick *time.Ticker - delay time.Duration - log log.Logger + doneOnce sync.Once + rc redeliveryConsumer + log log.Logger + msgIds []messageID + tw *internal.TimeWheel } +const ( + defaultCheckBatchKey = "check_batch_key" + + batchSize = 1024 + checkBatchinterval = time.Second * 5 +) + func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker { + tw, _ := internal.NewTimeWheel(time.Second*1, 1024) t := &negativeAcksTracker{ - doneCh: make(chan interface{}), - negativeAcks: make(map[messageID]time.Time), - rc: rc, - tick: time.NewTicker(delay / 3), - delay: delay, - log: logger, + rc: rc, + log: logger, + msgIds: make([]messageID, 0), + tw: tw, } - go t.track() + t.tw.Start() + t.tw.Add(checkBatchinterval, defaultCheckBatchKey, t.checkBatch) + return t } -func (t *negativeAcksTracker) Add(msgID messageID) { +func (t *negativeAcksTracker) Add(msgID messageID, negativeAckDelay time.Duration) { // Always clear up the batch index since we want to track the nack // for the entire batch batchMsgID := messageID{ @@ -63,57 +70,41 @@ func (t *negativeAcksTracker) Add(msgID messageID) { batchIdx: 0, } - t.Lock() - defer t.Unlock() + t.tw.Add(negativeAckDelay, batchMsgID, func() { + t.Lock() + t.msgIds = append(t.msgIds, batchMsgID) + if len(t.msgIds) >= batchSize { + t.rc.Redeliver(t.msgIds) + t.msgIds = make([]messageID, 0) + } + t.Unlock() + }) +} - _, present := t.negativeAcks[batchMsgID] - if present { - // The batch is already being tracked - return +func (t *negativeAcksTracker) Remove(msgID messageID) { + batchMsgID := messageID{ + ledgerID: msgID.ledgerID, + entryID: msgID.entryID, + batchIdx: 0, } - targetTime := time.Now().Add(t.delay) - t.negativeAcks[batchMsgID] = targetTime + t.tw.Remove(batchMsgID) } -func (t *negativeAcksTracker) track() { - for { - select { - case <-t.doneCh: - t.log.Debug("Closing nack tracker") - return - - case <-t.tick.C: - { - now := time.Now() - msgIds := make([]messageID, 0) - - t.Lock() - - for msgID, targetTime := range t.negativeAcks { - t.log.Debugf("MsgId: %v -- targetTime: %v -- now: %v", msgID, targetTime, now) - if targetTime.Before(now) { - t.log.Debugf("Adding MsgId: %v", msgID) - msgIds = append(msgIds, msgID) - delete(t.negativeAcks, msgID) - } - } - - t.Unlock() - - if len(msgIds) > 0 { - t.rc.Redeliver(msgIds) - } - } - - } +func (t *negativeAcksTracker) checkBatch() { + t.Lock() + if len(t.msgIds) > 0 { + t.rc.Redeliver(t.msgIds) + t.msgIds = make([]messageID, 0) } + t.Unlock() + + t.tw.Add(checkBatchinterval, defaultCheckBatchKey, t.checkBatch) } func (t *negativeAcksTracker) Close() { // allow Close() to be invoked multiple times by consumer_partition to avoid panic t.doneOnce.Do(func() { - t.tick.Stop() - t.doneCh <- nil + t.tw.Stop() }) } diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index e587f3f19e..d5b2ce1787 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -81,13 +81,13 @@ func TestNacksTracker(t *testing.T) { ledgerID: 1, entryID: 1, batchIdx: 1, - }) + }, testNackDelay) nacks.Add(messageID{ ledgerID: 2, entryID: 2, batchIdx: 1, - }) + }, testNackDelay) msgIds := make([]messageID, 0) for id := range nmc.Wait() { @@ -114,25 +114,25 @@ func TestNacksWithBatchesTracker(t *testing.T) { ledgerID: 1, entryID: 1, batchIdx: 1, - }) + }, testNackDelay) nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 2, - }) + }, testNackDelay) nacks.Add(messageID{ ledgerID: 1, entryID: 1, batchIdx: 3, - }) + }, testNackDelay) nacks.Add(messageID{ ledgerID: 2, entryID: 2, batchIdx: 1, - }) + }, testNackDelay) msgIds := make([]messageID, 0) for id := range nmc.Wait() { From ec278ed738f8adfbd14c3a8b55fefd91ffe905ca Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Mon, 13 Sep 2021 13:52:56 +0800 Subject: [PATCH 2/8] del return in internal time_wheel remove() --- pulsar/internal/time_wheel.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/internal/time_wheel.go b/pulsar/internal/time_wheel.go index c0bb02c351..4785b9882d 100644 --- a/pulsar/internal/time_wheel.go +++ b/pulsar/internal/time_wheel.go @@ -150,5 +150,4 @@ func (tw *TimeWheel) remove(key interface{}) { delete(tw.bucketIndexes, key) delete(tw.buckets[index], key) } - return } From adb1f953979de4d0f39c6b5b72073a9f3e5ecebf Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Mon, 13 Sep 2021 13:57:23 +0800 Subject: [PATCH 3/8] Remove newNegativeAcksTracker parameters --- pulsar/consumer_partition.go | 2 +- pulsar/negative_acks_tracker.go | 2 +- pulsar/negative_acks_tracker_test.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 21f4bdfdc1..89af5310a8 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -176,7 +176,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon "subscription": options.subscription, "consumerID": pc.consumerID, }) - pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log) + pc.nackTracker = newNegativeAcksTracker(pc, pc.log) err := pc.grabConn() if err != nil { diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 8dccc8af0b..97344720a5 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -46,7 +46,7 @@ const ( checkBatchinterval = time.Second * 5 ) -func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger log.Logger) *negativeAcksTracker { +func newNegativeAcksTracker(rc redeliveryConsumer, logger log.Logger) *negativeAcksTracker { tw, _ := internal.NewTimeWheel(time.Second*1, 1024) t := &negativeAcksTracker{ rc: rc, diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go index d5b2ce1787..bec58a260b 100644 --- a/pulsar/negative_acks_tracker_test.go +++ b/pulsar/negative_acks_tracker_test.go @@ -75,7 +75,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID { func TestNacksTracker(t *testing.T) { nmc := newNackMockedConsumer() - nacks := newNegativeAcksTracker(nmc, testNackDelay, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, log.DefaultNopLogger()) nacks.Add(messageID{ ledgerID: 1, @@ -108,7 +108,7 @@ func TestNacksTracker(t *testing.T) { func TestNacksWithBatchesTracker(t *testing.T) { nmc := newNackMockedConsumer() - nacks := newNegativeAcksTracker(nmc, testNackDelay, log.DefaultNopLogger()) + nacks := newNegativeAcksTracker(nmc, log.DefaultNopLogger()) nacks.Add(messageID{ ledgerID: 1, From c38073c6fe2686e37a352e134648c4eccf831db1 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Mon, 13 Sep 2021 14:00:08 +0800 Subject: [PATCH 4/8] Remove print --- pulsar/consumer_partition.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 89af5310a8..c46eb06a2b 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -315,7 +315,6 @@ func (pc *partitionConsumer) NackIDDelay(msgID trackingMessageID, dalay time.Dur } func (pc *partitionConsumer) Redeliver(msgIds []messageID) { - fmt.Println("msgIds:", msgIds) pc.eventsCh <- &redeliveryRequest{msgIds} iMsgIds := make([]MessageID, len(msgIds)) From ed034c12115dc812f142ba9f0f0bcbf635428eb0 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Mon, 13 Sep 2021 14:52:08 +0800 Subject: [PATCH 5/8] add function in consumer_multitopic --- pulsar/consumer_multitopic.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index dc4ad7b9b1..102291cc65 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -102,6 +102,9 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err if !ok { return nil, newError(ConsumerClosed, "consumer closed") } + if c.options.AckTimeOut != 0 { + c.NackDelay(cm.Message, c.options.AckTimeOut) + } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -159,6 +162,10 @@ func (c *multiTopicConsumer) Nack(msg Message) { c.NackID(msg.ID()) } +func (c *multiTopicConsumer) NackDelay(msg Message, delay time.Duration) { + c.NackIDDelay(msg.ID(), delay) +} + func (c *multiTopicConsumer) NackID(msgID MessageID) { mid, ok := toTrackingMessageID(msgID) if !ok { @@ -174,6 +181,20 @@ func (c *multiTopicConsumer) NackID(msgID MessageID) { mid.Nack() } +func (c *multiTopicConsumer) NackIDDelay(msgID MessageID, delay time.Duration) { + mid, ok := toTrackingMessageID(msgID) + if !ok { + return + } + + if mid.consumer == nil { + c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) + return + } + + mid.NackDelay(delay) +} + func (c *multiTopicConsumer) Close() { c.closeOnce.Do(func() { var wg sync.WaitGroup From 92426b626065fde97cb9f493eb48e85e7038b394 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Tue, 14 Sep 2021 12:53:58 +0800 Subject: [PATCH 6/8] Optimize negative_acks_tracker --- pulsar/consumer.go | 4 ---- pulsar/consumer_impl.go | 27 --------------------------- pulsar/consumer_multitopic.go | 21 --------------------- pulsar/consumer_partition.go | 9 --------- pulsar/impl_message.go | 7 ------- pulsar/negative_acks_tracker.go | 12 ++++++------ 6 files changed, 6 insertions(+), 74 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index fee6ef2a35..1c52b29935 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -132,10 +132,6 @@ type ConsumerOptions struct { // processed. Default is 1min. (See `Consumer.Nack()`) NackRedeliveryDelay time.Duration - // The AckTimeOut after which to redeliver the messages that timeout to be - // processed. Default is close - AckTimeOut time.Duration - // Set the consumer name. Name string diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index a151f86702..ceea1a6408 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -38,7 +38,6 @@ const ( type acker interface { AckID(id trackingMessageID) NackID(id trackingMessageID) - NackIDDelay(id trackingMessageID, delay time.Duration) } type consumer struct { @@ -142,10 +141,6 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } - if options.AckTimeOut != 0 && options.AckTimeOut < defaultAckTimeOut { - return nil, newError(AckTimeoutLess, "Ack timeout should be greater than 1000 ms") - } - // normalize as FQDN topics var tns []*internal.TopicName // single topic consumer @@ -333,7 +328,6 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { partitionIdx: idx, receiverQueueSize: receiverQueueSize, nackRedeliveryDelay: nackRedeliveryDelay, - ackTimeout: c.options.AckTimeOut, metadata: metadata, replicateSubscriptionState: c.options.ReplicateSubscriptionState, startMessageID: trackingMessageID{}, @@ -423,9 +417,6 @@ func (c *consumer) Receive(ctx context.Context) (message Message, err error) { if !ok { return nil, newError(ConsumerClosed, "consumer closed") } - if c.options.AckTimeOut != 0 { - c.NackDelay(cm.Message, c.options.AckTimeOut) - } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -511,10 +502,6 @@ func (c *consumer) Nack(msg Message) { c.NackID(msg.ID()) } -func (c *consumer) NackDelay(msg Message, delay time.Duration) { - c.NackIDDelay(msg.ID(), delay) -} - func (c *consumer) NackID(msgID MessageID) { mid, ok := c.messageID(msgID) if !ok { @@ -529,20 +516,6 @@ func (c *consumer) NackID(msgID MessageID) { c.consumers[mid.partitionIdx].NackID(mid) } -func (c *consumer) NackIDDelay(msgID MessageID, delay time.Duration) { - mid, ok := c.messageID(msgID) - if !ok { - return - } - - if mid.consumer != nil { - mid.NackDelay(delay) - return - } - - c.consumers[mid.partitionIdx].NackIDDelay(mid, delay) -} - func (c *consumer) Close() { c.closeOnce.Do(func() { c.stopDiscovery() diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index 102291cc65..dc4ad7b9b1 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -102,9 +102,6 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err if !ok { return nil, newError(ConsumerClosed, "consumer closed") } - if c.options.AckTimeOut != 0 { - c.NackDelay(cm.Message, c.options.AckTimeOut) - } return cm.Message, nil case <-ctx.Done(): return nil, ctx.Err() @@ -162,10 +159,6 @@ func (c *multiTopicConsumer) Nack(msg Message) { c.NackID(msg.ID()) } -func (c *multiTopicConsumer) NackDelay(msg Message, delay time.Duration) { - c.NackIDDelay(msg.ID(), delay) -} - func (c *multiTopicConsumer) NackID(msgID MessageID) { mid, ok := toTrackingMessageID(msgID) if !ok { @@ -181,20 +174,6 @@ func (c *multiTopicConsumer) NackID(msgID MessageID) { mid.Nack() } -func (c *multiTopicConsumer) NackIDDelay(msgID MessageID, delay time.Duration) { - mid, ok := toTrackingMessageID(msgID) - if !ok { - return - } - - if mid.consumer == nil { - c.log.Warnf("unable to nack messageID=%+v can not determine topic", msgID) - return - } - - mid.NackDelay(delay) -} - func (c *multiTopicConsumer) Close() { c.closeOnce.Do(func() { var wg sync.WaitGroup diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index c46eb06a2b..8c24b7e452 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -86,7 +86,6 @@ type partitionConsumerOpts struct { partitionIdx int receiverQueueSize int nackRedeliveryDelay time.Duration - ackTimeout time.Duration metadata map[string]string replicateSubscriptionState bool startMessageID trackingMessageID @@ -295,9 +294,6 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) { req := &ackRequest{ msgID: msgID, } - if pc.options.ackTimeout != 0 { - pc.nackTracker.Remove(msgID.messageID) - } pc.eventsCh <- req pc.options.interceptors.OnAcknowledge(pc.parentConsumer, msgID) @@ -309,11 +305,6 @@ func (pc *partitionConsumer) NackID(msgID trackingMessageID) { pc.metrics.NacksCounter.Inc() } -func (pc *partitionConsumer) NackIDDelay(msgID trackingMessageID, dalay time.Duration) { - pc.nackTracker.Add(msgID.messageID, dalay) - pc.metrics.NacksCounter.Inc() -} - func (pc *partitionConsumer) Redeliver(msgIds []messageID) { pc.eventsCh <- &redeliveryRequest{msgIds} diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index 1d81d98df4..c4f215c5e3 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -79,13 +79,6 @@ func (id trackingMessageID) Nack() { id.consumer.NackID(id) } -func (id trackingMessageID) NackDelay(delay time.Duration) { - if id.consumer == nil { - return - } - id.consumer.NackIDDelay(id, delay) -} - func (id trackingMessageID) ack() bool { if id.tracker != nil && id.batchIdx > -1 { return id.tracker.ack(int(id.batchIdx)) diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go index 97344720a5..1d50828929 100644 --- a/pulsar/negative_acks_tracker.go +++ b/pulsar/negative_acks_tracker.go @@ -40,10 +40,10 @@ type negativeAcksTracker struct { } const ( - defaultCheckBatchKey = "check_batch_key" + defaultCheckNegativeAcksBatchKey = "negative_acks_check_batch_key" - batchSize = 1024 - checkBatchinterval = time.Second * 5 + negativeAcksbatchSize = 1024 + checkNegativeAcksBatchinterval = time.Second * 5 ) func newNegativeAcksTracker(rc redeliveryConsumer, logger log.Logger) *negativeAcksTracker { @@ -56,7 +56,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, logger log.Logger) *negativeA } t.tw.Start() - t.tw.Add(checkBatchinterval, defaultCheckBatchKey, t.checkBatch) + t.tw.Add(checkNegativeAcksBatchinterval, defaultCheckNegativeAcksBatchKey, t.checkBatch) return t } @@ -73,7 +73,7 @@ func (t *negativeAcksTracker) Add(msgID messageID, negativeAckDelay time.Duratio t.tw.Add(negativeAckDelay, batchMsgID, func() { t.Lock() t.msgIds = append(t.msgIds, batchMsgID) - if len(t.msgIds) >= batchSize { + if len(t.msgIds) >= negativeAcksbatchSize { t.rc.Redeliver(t.msgIds) t.msgIds = make([]messageID, 0) } @@ -99,7 +99,7 @@ func (t *negativeAcksTracker) checkBatch() { } t.Unlock() - t.tw.Add(checkBatchinterval, defaultCheckBatchKey, t.checkBatch) + t.tw.Add(checkNegativeAcksBatchinterval, defaultCheckNegativeAcksBatchKey, t.checkBatch) } func (t *negativeAcksTracker) Close() { From d124cb86519f04dfaba35a2eadbca27116240608 Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Tue, 14 Sep 2021 13:09:15 +0800 Subject: [PATCH 7/8] del defaultAckTimeOut --- pulsar/consumer_impl.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index ceea1a6408..0f9493788f 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -32,7 +32,6 @@ import ( const ( defaultNackRedeliveryDelay = 1 * time.Minute - defaultAckTimeOut = 1 * time.Second ) type acker interface { From 326ac6cde0458bde28e3231123a76802363a6d5b Mon Sep 17 00:00:00 2001 From: EAHITechnology <2806701873@qq.com> Date: Tue, 14 Sep 2021 13:10:07 +0800 Subject: [PATCH 8/8] del AckTimeoutLess --- pulsar/error.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar/error.go b/pulsar/error.go index b396bd41d6..f433bfc973 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -101,8 +101,6 @@ const ( SeekFailed // ProducerClosed means producer already been closed ProducerClosed - // AckTimeoutLess means Ack timeout should be greater than 1000 ms - AckTimeoutLess ) // Error implement error interface, composed of two parts: msg and result. @@ -207,8 +205,6 @@ func getResultStr(r Result) string { return "SeekFailed" case ProducerClosed: return "ProducerClosed" - case AckTimeoutLess: - return "AckTimeoutLess" default: return fmt.Sprintf("Result(%d)", r) }