fix: deadlock when increasing partitioned consumers #1500

Merged
BewareMyPower merged 9 commits into apache:master from BewareMyPower:bewaremypower/fix-increase-deadlock
May 21, 2026

@BewareMyPower
Contributor

Motivation

#1494 fixed a thread-safety issue but introduced a possible deadlock.

To fix the thread-safety issue, every API that needs to find a specific sub-consumer, such as Ack and Seek, now has to lock the parent consumer. However, internalTopicSubscribeToPartitions waits until all sub-consumers have been created:

cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,

for ce := range ch {

range ch blocks until the err returned by newPartitionConsumer has been sent to ch.

However, newPartitionConsumer itself can block in grabConn:

err := pc.grabConn("")

This can start a Subscribe RPC on a connection that might also receive the user's Ack or Seek requests, which now need the parent consumer lock.
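The general hazard described above (holding a lock while waiting on work that is queued behind something needing the same lock) can be sketched in isolation. This is a toy model under stated assumptions, not the client's code: the serial `requests` loop only stands in for whatever processes the Subscribe RPC and user requests, and `simulate` is a hypothetical name. A timeout is used so the stall can be observed without hanging the program.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// simulate models a serial request loop shared by a user request that needs
// the parent lock and a reply that the lock holder is waiting for.
// It reports whether the reply arrived before the timeout.
func simulate(holdLock bool) bool {
	var mu sync.Mutex // hypothetical stand-in for the parent consumer lock
	requests := make(chan func(), 2)

	// Serial loop: one blocked request stalls everything queued behind it.
	go func() {
		for req := range requests {
			req()
		}
	}()

	if holdLock {
		mu.Lock() // the expansion path keeps the lock while it waits
	}

	// A user request (think Ack or Seek) that needs the parent lock.
	requests <- func() {
		mu.Lock()
		mu.Unlock()
	}

	// The reply the expansion path is waiting for, queued behind the request.
	done := make(chan struct{})
	requests <- func() { close(done) }

	var ok bool
	select {
	case <-done:
		ok = true
	case <-time.After(100 * time.Millisecond):
		ok = false // without the timeout this wait would never finish
	}

	if holdLock {
		mu.Unlock()
	}
	close(requests)
	return ok
}

func main() {
	fmt.Println("lock held while waiting, reply arrived:", simulate(true))
	fmt.Println("no lock held while waiting, reply arrived:", simulate(false))
}
```

In this model the only way out is to stop holding the lock across the wait, which is the direction the modifications below take.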

Modifications

  • Add TestInternalTopicSubscribeToPartitionsDoesNotBlockExistingPartitionLookup to reproduce the deadlock issue.
  • Adopt a lock-free implementation to manage consumers; the test above passes after this change.
  • Delay the dispatching logic until after the new sub-consumers are added to c.consumers; otherwise, a message could be received from a new sub-consumer and acknowledged before that consumer is added to c.consumers.
  • Add TestInternalTopicSubscribeToPartitionsPublishesConsumersBeforeDispatchingMessages to verify this ordering.

@BewareMyPower BewareMyPower self-assigned this May 19, 2026
@BewareMyPower BewareMyPower added this to the 0.20.0 milestone May 19, 2026
Contributor

Copilot AI left a comment

Pull request overview

This PR addresses a deadlock risk introduced by prior thread-safety changes when partitioned consumers auto-discover and subscribe to new partitions, by switching partition-consumer management to a lock-free publication approach and delaying dispatcher startup until after the new consumer list is published.

Changes:

  • Replace the partition-consumer container on consumer with an atomic.Value-backed, copy-on-write list and update call sites to read via partitionConsumers().
  • Delay starting newly-created partition dispatchers until after the updated partition-consumer list is published.
  • Add targeted regression tests to reproduce the deadlock and to ensure consumers are published before dispatching begins.
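The copy-on-write publication and the "publish first, start dispatching second" ordering listed above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `partConsumer`, `parent`, `expandTo`, and the writer-only `expandMu` are hypothetical names, and only `partitionConsumers()` and `startDispatcher` echo identifiers mentioned in this conversation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// partConsumer is a hypothetical stand-in for a partition consumer.
type partConsumer struct {
	partition int
	started   atomic.Bool
}

func (c *partConsumer) startDispatcher() { c.started.Store(true) }

// parent publishes an immutable slice of consumers through an atomic.Value.
type parent struct {
	consumers atomic.Value // always holds a []*partConsumer
	expandMu  sync.Mutex   // assumption: serializes writers only; readers never take it
}

func newParent() *parent {
	p := &parent{}
	p.consumers.Store([]*partConsumer{})
	return p
}

// partitionConsumers returns the current snapshot without locking.
func (p *parent) partitionConsumers() []*partConsumer {
	return p.consumers.Load().([]*partConsumer)
}

// expandTo grows the consumer list to n partitions using copy-on-write.
func (p *parent) expandTo(n int) {
	p.expandMu.Lock()
	defer p.expandMu.Unlock()

	old := p.partitionConsumers()
	if n <= len(old) {
		return
	}
	// Copy into a fresh slice so snapshots held by readers are never mutated.
	next := make([]*partConsumer, len(old), n)
	copy(next, old)
	for i := len(old); i < n; i++ {
		next = append(next, &partConsumer{partition: i})
	}
	// Publish first, then start dispatching, so a message from a new
	// partition can always be matched to a published consumer.
	p.consumers.Store(next)
	for _, c := range next[len(old):] {
		c.startDispatcher()
	}
}

func main() {
	p := newParent()
	p.expandTo(2)
	snapshot := p.partitionConsumers()
	p.expandTo(4)
	fmt.Println(len(snapshot), len(p.partitionConsumers()))
}
```

Readers such as Ack or Seek would work on whatever snapshot they loaded, so they never wait for an expansion that is still creating consumers.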

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| pulsar/reader_test.go | Updates tests to use partitionConsumers() instead of direct consumers indexing. |
| pulsar/reader_impl.go | Uses partitionConsumers() snapshot when checking/using last message ID. |
| pulsar/message_chunking_test.go | Updates tests to use partitionConsumers() accessors. |
| pulsar/consumer_zero_queue.go | Updates newPartitionConsumer call to pass the new dispatcher-start flag. |
| pulsar/consumer_test.go | Adds regression tests + helpers for partition expansion deadlock and publication ordering. |
| pulsar/consumer_partition.go | Adds startDispatcher parameter and defers dispatcher startup accordingly; buffers connectedCh. |
| pulsar/consumer_impl.go | Reworks partition subscription to publish consumers atomically, updates partition lookups to use snapshots, and introduces partitionConsumers() helper. |
Comments suppressed due to low confidence (1)

pulsar/consumer_impl.go:770

  • Seek/SeekByTime no longer coordinate with background partition discovery/expansion. If internalTopicSubscribeToPartitions publishes/starts dispatchers concurrently, newly-added partitions won’t be paused and can dispatch into messageCh while these methods are draining it, leading to inconsistent seek results and possible busy-drain loops. Consider introducing a lightweight barrier (eg, an RWMutex or atomic ‘update in progress’ gate) so seek operations can prevent partition publication/dispatcher start during their critical sections, without holding a lock across RPCs.
func (c *consumer) Seek(msgID MessageID) error {
	consumers := c.partitionConsumers()

	if len(consumers) > 1 {
		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
	}

	consumer, err := findPartitionConsumer(consumers, msgID)
	if err != nil {
		return err
	}
	consumer.pauseDispatchMessage()
	// clear messageCh
	for len(c.messageCh) > 0 {
		<-c.messageCh
	}

	return consumer.Seek(msgID)
}

func (c *consumer) SeekByTime(time time.Time) error {
	var errs error
	consumers := c.partitionConsumers()

	for _, cons := range consumers {
		cons.pauseDispatchMessage()
	}
	// clear messageCh
	for len(c.messageCh) > 0 {
		<-c.messageCh
	}

	// run SeekByTime on every partition of topic
	for _, cons := range consumers {
		if err := cons.SeekByTime(time); err != nil {
			msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
			errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
		}
	}


Member

@RobertIndie RobertIndie left a comment

Please take a look at Copilot's comments. The rest looks good to me.

@BewareMyPower
Contributor Author

@RobertIndie addressed, PTAL again

@nodece
Member

nodece commented May 21, 2026

Great fix on the deadlock path from #1494 — the lock-free partitionConsumers() snapshot + delayed startDispatcher() is a solid direction for the original blocking issue.

I think there is still a new concurrency window introduced by removing the old mutex guarding consumers.

In the new code:

  • internalTopicSubscribeToPartitions() builds newConsumers, then publishes via c.consumers.Store(...), then starts dispatchers for new partitions.
  • closeWithCause() closes only one snapshot (consumers := c.partitionConsumers()) and no longer serializes with partition expansion.

So this interleaving seems possible:

  1. Goroutine A enters internalTopicSubscribeToPartitions() and starts creating new partition consumers.
  2. Goroutine B calls closeWithCause(), takes an old snapshot, closes those consumers, and continues close flow.
  3. Goroutine A continues and still executes c.consumers.Store(...) + startDispatcher() for newly created consumers.

Result: newly added partition consumers may be published/started after close has already closed only the old snapshot.

Before this PR, the parent lock serialized expansion and close-related operations over c.consumers; removing that lock fixes deadlock, but also removes that lifecycle serialization.

Could we gate expansion with context cancellation or a closing flag (checked before create and before Store/startDispatcher), so in-flight expansion stops/cleans up once close starts?
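One way to read the closing-flag suggestion is sketched below. It is only an illustration of the idea, not code from the PR: `sub`, `parent`, `expand`, and `Close` are hypothetical, and a small mutex (an assumption of this sketch) guards just the flag and the publish step, so it is never held while consumers are being created.

```go
package main

import (
	"fmt"
	"sync"
)

// sub is a hypothetical stand-in for a partition consumer.
type sub struct{ closed bool }

func (s *sub) Close() { s.closed = true }

type parent struct {
	mu        sync.Mutex // guards closing and consumers; never held across slow calls
	closing   bool
	consumers []*sub
}

// expand creates n consumers without holding the lock, then publishes them
// only if close has not started; otherwise it cleans them up itself.
func (p *parent) expand(n int) bool {
	created := make([]*sub, 0, n)
	for i := 0; i < n; i++ {
		created = append(created, &sub{}) // slow creation would happen here
	}

	p.mu.Lock()
	if p.closing {
		p.mu.Unlock()
		for _, s := range created {
			s.Close()
		}
		return false
	}
	p.consumers = append(p.consumers, created...)
	p.mu.Unlock()
	return true
}

// Close marks the parent as closing and closes everything published so far.
func (p *parent) Close() {
	p.mu.Lock()
	p.closing = true
	toClose := p.consumers
	p.consumers = nil
	p.mu.Unlock()

	for _, s := range toClose {
		s.Close()
	}
}

func main() {
	p := &parent{}
	fmt.Println("published before close:", p.expand(2))
	p.Close()
	fmt.Println("published after close:", p.expand(1))
}
```

Because the flag check and the publish happen under the same short critical section, an expansion that finishes after close has begun cannot publish new consumers.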

@BewareMyPower
Contributor Author

@nodece closeWithCause calls c.stopDiscovery(), which cancels the timer and then waits via wg.Wait() for any in-flight internalTopicSubscribeToPartitions call to finish.

return func() {
ticker.Stop()
close(stopDiscoveryCh)
wg.Wait()

@BewareMyPower BewareMyPower merged commit eade693 into apache:master May 21, 2026
11 of 12 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/fix-increase-deadlock branch May 21, 2026 08:48