[PIP] Add Pause() / Resume() to Consumer #1504

@PavelZeger

Gap

Java exposes:

void pause();
void resume();

on every Consumer. While paused, the consumer stops issuing flow permits to the broker, so no new messages are delivered. Already-delivered (in-flight) messages are unaffected.
The Go client has no equivalent. The closest workaround is to close the consumer and re-subscribe, which is heavyweight: it re-resolves the topic, triggers redelivery of any unacked messages, and may rebalance other consumers in KeyShared/Failover subscriptions.

Why it's important

Pause/resume is the standard backpressure pattern for when a downstream dependency is temporarily unavailable: a database is failing over, a feature flag is in the "halt ingestion" state, and so on. It is also useful for orderly shutdown: "stop accepting new work, finish what's in flight, then close."

Proposed Go API

type Consumer interface {
  // ... existing methods ...

  // Pause stops the consumer from requesting more messages from the broker.
  // In-flight messages already delivered to this consumer remain available
  // via Receive() / Chan(); ack/nack still work. Idempotent.
  Pause()

  // Resume reverses Pause. Idempotent.
  Resume()
}

A small accessor is also useful for tests and for users who want to expose the state via metrics:

// IsPaused reports whether Pause() has been called more recently than Resume().
IsPaused() bool

(Java doesn't expose an isPaused getter, but it's cheap and harmless and helps reduce flaky tests.)
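To illustrate how an application might drive the proposed methods for backpressure, here is a minimal sketch. It does not use pulsar-client-go: the `pausable` interface, `fakeConsumer`, and `applyBackpressure` are hypothetical names introduced only for this example, and the fake stands in for a real consumer so the code runs without a broker.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pausable is the subset of the proposed Consumer API used here.
// It is a local, hypothetical interface, not part of pulsar-client-go today.
type pausable interface {
	Pause()
	Resume()
	IsPaused() bool
}

// fakeConsumer is a stand-in so the sketch runs without a broker.
type fakeConsumer struct {
	paused atomic.Bool
}

func (f *fakeConsumer) Pause()         { f.paused.Store(true) }
func (f *fakeConsumer) Resume()        { f.paused.Store(false) }
func (f *fakeConsumer) IsPaused() bool { return f.paused.Load() }

// applyBackpressure pauses the consumer while the downstream is unhealthy
// and resumes it once the downstream recovers. Because both calls are
// idempotent, it is safe to invoke this on every health-check tick.
func applyBackpressure(c pausable, downstreamHealthy bool) {
	if downstreamHealthy {
		c.Resume()
		return
	}
	c.Pause()
}

func main() {
	c := &fakeConsumer{}
	for _, healthy := range []bool{true, false, false, true} {
		applyBackpressure(c, healthy)
		fmt.Printf("downstream healthy=%v paused=%v\n", healthy, c.IsPaused())
	}
}
```

Idempotency is what keeps the caller simple here: the health-check loop does not need to track whether it has already paused the consumer.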

Implementation

The Go consumer maintains an availablePermits counter on each partitionConsumer that gets refilled when the application drains the queue. Pausing means: stop sending CommandFlow until resumed.

// partitionConsumer fields:
paused atomic.Bool

// In the place that decides whether to send a flow update:
if pc.paused.Load() {
  return
}

// Public API (consumer_impl.go):
func (c *consumer) Pause() {
  for _, pc := range c.consumers {
    pc.paused.Store(true)
  }
}

func (c *consumer) Resume() {
  for _, pc := range c.consumers {
    if pc.paused.CompareAndSwap(true, false) {
      pc.requestMorePermits() // a small helper that re-checks thresholds
    }
  }
}

Pause must be applied per partition consumer. The multi-topic and regex consumers must propagate to all underlying partition consumers.
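The gating described above can be modeled in isolation. The following is a simplified, single-goroutine sketch, not the client's actual internals: the `threshold` and `flowsSent` fields and the `messageConsumed` helper are assumptions made for illustration, and `flowsSent` merely counts the CommandFlow requests that would have been sent.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// partitionConsumer is a simplified stand-in for the client's internal type.
type partitionConsumer struct {
	paused           atomic.Bool
	availablePermits int // permits accumulated since the last flow request
	threshold        int // hypothetical refill threshold
	flowsSent        int // counts flow requests that would go to the broker
}

// requestMorePermits issues a flow request once enough permits have
// accumulated, unless the consumer is paused. Permits keep accumulating
// while paused so that Resume can flush them in a single request.
func (pc *partitionConsumer) requestMorePermits() {
	if pc.paused.Load() {
		return
	}
	if pc.availablePermits >= pc.threshold {
		pc.availablePermits = 0
		pc.flowsSent++
	}
}

// messageConsumed models the application draining one message from the queue.
func (pc *partitionConsumer) messageConsumed() {
	pc.availablePermits++
	pc.requestMorePermits()
}

// consumer fans Pause and Resume out to every partition consumer.
type consumer struct {
	consumers []*partitionConsumer
}

func (c *consumer) Pause() {
	for _, pc := range c.consumers {
		pc.paused.Store(true)
	}
}

func (c *consumer) Resume() {
	for _, pc := range c.consumers {
		// Only the call that actually flips the flag re-requests permits,
		// so a redundant Resume does not send an extra flow request.
		if pc.paused.CompareAndSwap(true, false) {
			pc.requestMorePermits()
		}
	}
}

func main() {
	c := &consumer{consumers: []*partitionConsumer{{threshold: 2}, {threshold: 2}}}
	c.Pause()
	for _, pc := range c.consumers {
		pc.messageConsumed()
		pc.messageConsumed()
		pc.messageConsumed()
	}
	fmt.Println("flows sent while paused:", c.consumers[0].flowsSent)
	c.Resume()
	c.Resume() // idempotent: no second flow request
	fmt.Println("flows sent after resume:", c.consumers[0].flowsSent)
}
```

Keeping the permit counter running while paused is one possible design choice; it lets Resume catch up with a single flow request instead of waiting for further messages to be drained.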

Edge cases

  • Pausing right before a reconnect: when the partition consumer reconnects, it must re-check paused and not blindly send a fresh flow command.
  • KeyShared semantics: in a key-shared subscription, the broker redistributes a consumer's keys to other consumers only when that consumer disconnects. Pause does not change membership, so messages for the paused consumer's key range simply accumulate as backlog. This should be documented as well.
  • Interaction with BatchReceive (suggestion 01): a paused consumer should return an empty batch on timeout rather than blocking forever.
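The reconnect and BatchReceive cases above can be sketched without a broker. Everything here is an assumption for illustration: `onReconnect` stands in for whatever hook the partition consumer runs after re-establishing its connection, `batchReceive` is a hypothetical helper (the Go client's eventual BatchReceive may look different), and `demoTimeout` is an arbitrary value.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// demoTimeout is an arbitrary batch timeout used by this sketch.
const demoTimeout = 20 * time.Millisecond

// partitionConsumer is a simplified stand-in for the client's internal type.
type partitionConsumer struct {
	paused    atomic.Bool
	flowsSent int // counts flow requests that would go to the broker
}

// onReconnect models the hypothetical post-reconnect hook. It re-checks the
// paused flag instead of unconditionally sending a fresh flow command.
func (pc *partitionConsumer) onReconnect() {
	if pc.paused.Load() {
		return
	}
	pc.flowsSent++
}

// batchReceive collects up to maxMessages from queue and returns whatever it
// has when the timeout elapses. With a paused consumer the queue eventually
// runs dry, so the call returns a short or empty batch instead of blocking.
func batchReceive(queue <-chan string, maxMessages int, timeout time.Duration) []string {
	batch := make([]string, 0, maxMessages)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < maxMessages {
		select {
		case m := <-queue:
			batch = append(batch, m)
		case <-timer.C:
			return batch
		}
	}
	return batch
}

func main() {
	pc := &partitionConsumer{}
	pc.paused.Store(true)
	pc.onReconnect()
	fmt.Println("flows sent after paused reconnect:", pc.flowsSent)

	// One message was already delivered before the pause took effect.
	queue := make(chan string, 4)
	queue <- "in-flight"
	fmt.Println("first batch size:", len(batchReceive(queue, 3, demoTimeout)))
	fmt.Println("second batch size:", len(batchReceive(queue, 3, demoTimeout)))
}
```

The first batch drains the in-flight message and then times out; the second finds nothing and returns an empty, non-nil slice once the timeout fires.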

References

  • Java: Consumer.pause() / Consumer.resume() in pulsar-client-api/.../api/Consumer.java.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
