fix: deadlock when increasing partitioned consumers #1500
Pull request overview
This PR addresses a deadlock risk introduced by prior thread-safety changes when partitioned consumers auto-discover and subscribe to new partitions, by switching partition-consumer management to a lock-free publication approach and delaying dispatcher startup until after the new consumer list is published.
Changes:
- Replace the partition-consumer container on `consumer` with an `atomic.Value`-backed, copy-on-write list and update call sites to read via `partitionConsumers()`.
- Delay starting newly-created partition dispatchers until after the updated partition-consumer list is published.
- Add targeted regression tests to reproduce the deadlock and to ensure consumers are published before dispatching begins.
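The copy-on-write publication named in the first bullet can be sketched roughly as follows. This is a simplified illustration, not the PR's code: `consumerList`, `snapshot`, `publish`, and the stripped-down `partitionConsumer` type are hypothetical names, and the writer-side mutex is an assumption about how concurrent writers would be serialized.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// partitionConsumer is a hypothetical stand-in for the real sub-consumer type.
type partitionConsumer struct {
	partition int
}

// consumerList is a hypothetical copy-on-write container.
// Readers never take a lock; writers serialize only among themselves.
type consumerList struct {
	writeMu sync.Mutex   // guards the load-copy-store sequence of writers
	value   atomic.Value // always holds a []*partitionConsumer
}

func newConsumerList() *consumerList {
	l := &consumerList{}
	l.value.Store([]*partitionConsumer{})
	return l
}

// snapshot returns the currently published slice.
// Callers must treat the result as read-only.
func (l *consumerList) snapshot() []*partitionConsumer {
	return l.value.Load().([]*partitionConsumer)
}

// publish copies the current slice, appends the new consumers to the copy,
// and swaps the copy in atomically. Earlier snapshots are left untouched.
func (l *consumerList) publish(added ...*partitionConsumer) {
	l.writeMu.Lock()
	defer l.writeMu.Unlock()
	old := l.snapshot()
	next := make([]*partitionConsumer, 0, len(old)+len(added))
	next = append(next, old...)
	next = append(next, added...)
	l.value.Store(next)
}

func main() {
	l := newConsumerList()
	l.publish(&partitionConsumer{partition: 0})
	before := l.snapshot()
	l.publish(&partitionConsumer{partition: 1}, &partitionConsumer{partition: 2})
	fmt.Println(len(before), len(l.snapshot())) // prints "1 3"
}
```

Because a reader only ever sees a fully built slice, a lookup on an existing partition does not have to wait for an in-flight expansion.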
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| pulsar/reader_test.go | Updates tests to use partitionConsumers() instead of direct consumers indexing. |
| pulsar/reader_impl.go | Uses partitionConsumers() snapshot when checking/using last message ID. |
| pulsar/message_chunking_test.go | Updates tests to use partitionConsumers() accessors. |
| pulsar/consumer_zero_queue.go | Updates newPartitionConsumer call to pass the new dispatcher-start flag. |
| pulsar/consumer_test.go | Adds regression tests + helpers for partition expansion deadlock and publication ordering. |
| pulsar/consumer_partition.go | Adds startDispatcher parameter and defers dispatcher startup accordingly; buffers connectedCh. |
| pulsar/consumer_impl.go | Reworks partition subscription to publish consumers atomically, updates partition lookups to use snapshots, and introduces partitionConsumers() helper. |
Comments suppressed due to low confidence (1)
pulsar/consumer_impl.go:770
- Seek/SeekByTime no longer coordinate with background partition discovery/expansion. If internalTopicSubscribeToPartitions publishes/starts dispatchers concurrently, newly-added partitions won’t be paused and can dispatch into messageCh while these methods are draining it, leading to inconsistent seek results and possible busy-drain loops. Consider introducing a lightweight barrier (eg, an RWMutex or atomic ‘update in progress’ gate) so seek operations can prevent partition publication/dispatcher start during their critical sections, without holding a lock across RPCs.
func (c *consumer) Seek(msgID MessageID) error {
	consumers := c.partitionConsumers()
	if len(consumers) > 1 {
		return newError(SeekFailed, "for partition topic, seek command should perform on the individual partitions")
	}
	consumer, err := findPartitionConsumer(consumers, msgID)
	if err != nil {
		return err
	}
	consumer.pauseDispatchMessage()
	// clear messageCh
	for len(c.messageCh) > 0 {
		<-c.messageCh
	}
	return consumer.Seek(msgID)
}
func (c *consumer) SeekByTime(time time.Time) error {
	var errs error
	consumers := c.partitionConsumers()
	for _, cons := range consumers {
		cons.pauseDispatchMessage()
	}
	// clear messageCh
	for len(c.messageCh) > 0 {
		<-c.messageCh
	}
	// run SeekByTime on every partition of topic
	for _, cons := range consumers {
		if err := cons.SeekByTime(time); err != nil {
			msg := fmt.Sprintf("unable to SeekByTime for topic=%s subscription=%s", c.topic, c.Subscription())
			errs = pkgerrors.Wrap(newError(SeekFailed, err.Error()), msg)
		}
	}
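The "lightweight barrier" suggested in the suppressed comment could look roughly like the sketch below. It is only one possible reading of that suggestion: `expansionGate`, `duringSeek`, and `publish` are hypothetical names that do not exist in the client, and the sketch assumes the gate covers only the pause-and-drain step, so the seek RPC itself runs outside the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// expansionGate is a hypothetical barrier between seek and partition expansion.
// Seek holds the read side while it pauses dispatchers and drains messageCh;
// expansion holds the write side only around publish + dispatcher start.
type expansionGate struct {
	mu sync.RWMutex
}

// duringSeek runs the pause-and-drain step; expansion cannot publish meanwhile.
func (g *expansionGate) duringSeek(pauseAndDrain func()) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	pauseAndDrain()
}

// publish runs the publish-and-start step; it waits for in-flight drains.
func (g *expansionGate) publish(publishAndStart func()) {
	g.mu.Lock()
	defer g.mu.Unlock()
	publishAndStart()
}

func main() {
	var g expansionGate
	messageCh := make(chan int, 4)
	messageCh <- 1
	messageCh <- 2

	// Seek side: drain the channel under the read lock.
	g.duringSeek(func() {
		for len(messageCh) > 0 {
			<-messageCh
		}
	})

	// Expansion side: a new partition may only dispatch after the drain.
	g.publish(func() { messageCh <- 3 })

	fmt.Println(len(messageCh)) // prints "1"
}
```

Several seeks can hold the read side at once, while a publication waits for all of them. Whether this is enough depends on what should happen to partitions added while the seek RPC is still in flight, which the sketch does not address.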
RobertIndie left a comment
Please take a look at Copilot's comments. The rest looks good to me.
@RobertIndie addressed, PTAL again
Great fix on the deadlock path from #1494: the lock-free […]

I think there is still a new concurrency window introduced by removing the old mutex guarding […]. In the new code:

So this interleaving seems possible:

Result: newly added partition consumers may be published/started after close has already closed only the old snapshot. Before this PR, the parent lock serialized expansion and close-related operations over […].

Could we gate expansion with context cancellation or a closing flag (checked before create and before Store/startDispatcher), so in-flight expansion stops/cleans up once close starts?
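A closing-flag gate of the kind proposed here might be sketched as below. The types and the `expand` method are simplified, hypothetical stand-ins rather than the client's code. The sketch assumes a single expansion goroutine at a time, an idempotent sub-consumer `Close`, and that starting a dispatcher on an already-closed sub-consumer is a no-op.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// partitionConsumer is a hypothetical sub-consumer with an idempotent Close.
type partitionConsumer struct {
	closeOnce sync.Once
	closed    atomic.Bool
}

func (pc *partitionConsumer) Close() {
	pc.closeOnce.Do(func() { pc.closed.Store(true) })
}

func closeAll(pcs []*partitionConsumer) {
	for _, pc := range pcs {
		pc.Close()
	}
}

// consumer is a hypothetical parent holding the published snapshot.
type consumer struct {
	closing   atomic.Bool
	consumers atomic.Value // always holds a []*partitionConsumer
}

func newConsumer() *consumer {
	c := &consumer{}
	c.consumers.Store([]*partitionConsumer{})
	return c
}

// expand publishes the added consumers unless close has started.
// It reports whether the caller may go on to start their dispatchers.
func (c *consumer) expand(added []*partitionConsumer) bool {
	if c.closing.Load() { // check before publishing
		closeAll(added)
		return false
	}
	old := c.consumers.Load().([]*partitionConsumer)
	next := append(append([]*partitionConsumer{}, old...), added...)
	c.consumers.Store(next)
	// Re-check after publishing: if Close set the flag in between, it may
	// have read the old snapshot, so this goroutine cleans up what it added.
	if c.closing.Load() {
		closeAll(added)
		return false
	}
	return true
}

// Close sets the flag first, then closes whatever snapshot is visible.
func (c *consumer) Close() {
	c.closing.Store(true)
	closeAll(c.consumers.Load().([]*partitionConsumer))
}

func main() {
	c := newConsumer()
	first := &partitionConsumer{}
	fmt.Println(c.expand([]*partitionConsumer{first})) // prints "true"
	c.Close()
	late := &partitionConsumer{}
	fmt.Println(c.expand([]*partitionConsumer{late}))    // prints "false"
	fmt.Println(first.closed.Load(), late.closed.Load()) // prints "true true"
}
```

With the flag set before `Close` reads the snapshot and re-checked after `expand` stores it, at least one side sees the other's write, so every added sub-consumer is closed by one of the two goroutines (possibly both, hence the idempotent `Close`).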
@nodece pulsar-client-go/pulsar/consumer_impl.go, lines 346 to 349 in a1765ef
Motivation
#1494 introduced a possible deadlock while fixing a thread safety issue.

To fix the thread safety issue, every API that needs to find a specific sub-consumer, like `Ack` and `Seek`, now requires locking the parent consumer. However, `internalTopicSubscribeToPartitions` waits until all sub-consumers have been created:

pulsar-client-go/pulsar/consumer_impl.go, lines 414 to 416 in 87ce8f9
pulsar-client-go/pulsar/consumer_impl.go, line 428 in 87ce8f9

`range ch` waits until the `err` returned by `newPartitionConsumer` is sent to `ch`. However, it could be blocked by `grabConn`:

pulsar-client-go/pulsar/consumer_partition.go, line 434 in 87ce8f9

This could start a Subscribe RPC on the connection that might receive the user's Ack or Seek requests.
Modifications
- Add `TestInternalTopicSubscribeToPartitionsDoesNotBlockExistingPartitionLookup` to reproduce the deadlock issue.
- Start the dispatchers of new sub-consumers only after those consumers are added to `c.consumers`; otherwise, a message could be received from a new sub-consumer and acknowledged before that consumer is added to `c.consumers`.
- Add `TestInternalTopicSubscribeToPartitionsPublishesConsumersBeforeDispatchingMessages` to verify it works.
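The publish-before-dispatch ordering described in these modifications can be illustrated with the following sketch. All types here are simplified stand-ins that only borrow names from the PR (`partitionConsumers`, `startDispatcher`); the `start` channel, the `find` helper, and the integer "messages" are assumptions made for the illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// partitionConsumer is a simplified stand-in: its dispatcher goroutine is
// parked on the start channel until startDispatcher is called.
type partitionConsumer struct {
	partition int
	start     chan struct{}
}

func newPartitionConsumer(partition int, out chan<- int) *partitionConsumer {
	pc := &partitionConsumer{partition: partition, start: make(chan struct{})}
	go func() {
		<-pc.start          // wait until the parent has published this consumer
		out <- pc.partition // "dispatch" one message tagged with the partition
	}()
	return pc
}

func (pc *partitionConsumer) startDispatcher() { close(pc.start) }

type consumer struct {
	consumers atomic.Value // always holds a []*partitionConsumer
	messageCh chan int
}

func newConsumer(buffer int) *consumer {
	c := &consumer{messageCh: make(chan int, buffer)}
	c.consumers.Store([]*partitionConsumer{})
	return c
}

func (c *consumer) partitionConsumers() []*partitionConsumer {
	return c.consumers.Load().([]*partitionConsumer)
}

// subscribeToPartitions creates consumers for [from, to), publishes them,
// and only then lets their dispatchers run.
func (c *consumer) subscribeToPartitions(from, to int) {
	next := append([]*partitionConsumer{}, c.partitionConsumers()...)
	var added []*partitionConsumer
	for p := from; p < to; p++ {
		added = append(added, newPartitionConsumer(p, c.messageCh))
	}
	c.consumers.Store(append(next, added...)) // step 1: publish
	for _, pc := range added {
		pc.startDispatcher() // step 2: start dispatching
	}
}

// find mimics the lookup that an Ack would perform.
func (c *consumer) find(partition int) (*partitionConsumer, bool) {
	for _, pc := range c.partitionConsumers() {
		if pc.partition == partition {
			return pc, true
		}
	}
	return nil, false
}

func main() {
	c := newConsumer(8)
	c.subscribeToPartitions(0, 3)
	for i := 0; i < 3; i++ {
		p := <-c.messageCh
		_, ok := c.find(p)
		fmt.Println(ok) // always true: the owner was published before dispatch
	}
}
```

If the two steps were swapped, a message could reach `messageCh` before its owner appears in the snapshot, and the lookup in `find` could fail.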