Gap
Java exposes:
void pause();
void resume();
on every Consumer. While paused, the consumer stops issuing flow permits to the broker, so no new messages are delivered. Already-delivered (in-flight) messages are unaffected.
The Go client has no equivalent. The closest workaround is closing the consumer and re-subscribing, which is heavyweight: it re-resolves the topic, redelivers any unacked messages, and may rebalance other consumers in KeyShared/Failover subscriptions.
Why it's important
Pause/resume is the standard pattern for backpressure when downstream is temporarily unavailable: a database is failing over, a feature flag is in the "halt ingestion" state, etc. It is also useful for orderly shutdown - "stop accepting new work, finish what's in flight, then close."
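The orderly-shutdown pattern could look roughly like the sketch below. It is written against a hypothetical `pausable` interface (only `Pause` and `Close`), with a plain `<-chan string` standing in for the consumer's message channel; `drainAndClose`, `fakeConsumer`, and the idle-timeout heuristic for deciding that in-flight work is finished are all assumptions made for illustration, not part of the existing client.

```go
package main

import "time"

// pausable is a hypothetical subset of the proposed Consumer API.
type pausable interface {
	Pause()
	Close()
}

// fakeConsumer is a stand-in used only to exercise the sketch.
type fakeConsumer struct{ paused, closed bool }

func (f *fakeConsumer) Pause() { f.paused = true }
func (f *fakeConsumer) Close() { f.closed = true }

// drainAndClose pauses the consumer, handles messages that were already
// delivered, and closes once the channel is closed or nothing has arrived
// for the idle duration. It returns the number of messages handled.
func drainAndClose(c pausable, msgs <-chan string, handle func(string), idle time.Duration) int {
	c.Pause() // stop requesting new messages from the broker
	handled := 0
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				c.Close()
				return handled
			}
			handle(m)
			handled++
		case <-time.After(idle): // fresh idle timer on every iteration
			c.Close()
			return handled
		}
	}
}
```

With the real client the channel element would be a consumer message and `handle` would ack it; the idle timeout is only a heuristic for "nothing left in flight".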
Proposed Go API
type Consumer interface {
	// ... existing methods ...

	// Pause stops the consumer from requesting more messages from the broker.
	// In-flight messages already delivered to this consumer remain available
	// via Receive() / Chan(); ack/nack still work. Idempotent.
	Pause()

	// Resume reverses Pause. Idempotent.
	Resume()
}
A small accessor is also useful for tests and for users who want to expose the state via metrics:
// IsPaused reports whether Pause() has been called more recently than Resume().
IsPaused() bool
(Java doesn't expose an isPaused getter, but it's cheap and harmless and helps reduce flaky tests.)
Implementation
The Go consumer maintains an availablePermits counter on each partitionConsumer that gets refilled when the application drains the queue. Pausing means: stop sending CommandFlow until resumed.
// partitionConsumer fields:
paused atomic.Bool

// In the place that decides whether to send a flow update:
if pc.paused.Load() {
	return
}

// Public API (consumer_impl.go):
func (c *consumer) Pause() {
	for _, pc := range c.consumers {
		pc.paused.Store(true)
	}
}

func (c *consumer) Resume() {
	for _, pc := range c.consumers {
		if pc.paused.CompareAndSwap(true, false) {
			pc.requestMorePermits() // a small helper that re-checks thresholds
		}
	}
}
Pause must be applied per partition consumer. The multi-topic and regex consumers must propagate to all underlying partition consumers.
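To make the gating behaviour concrete, here is a minimal, self-contained model of it. `flowGate` is a hypothetical stand-in for the relevant part of `partitionConsumer`, not the real client code: the `threshold` field, the `sent` slice (which records simulated flow commands instead of writing to a connection), and `messageConsumed` are assumptions for illustration. It assumes Go 1.19+ for `atomic.Bool` and `atomic.Int32`.

```go
package main

import "sync/atomic"

// flowGate models the permit bookkeeping of one partition consumer.
// Hypothetical: the real client sends CommandFlow over a connection.
type flowGate struct {
	paused    atomic.Bool
	available atomic.Int32 // permits freed since the last flow command
	threshold int32        // send a flow once this many permits accumulate
	sent      []int32      // simulated flow commands; single goroutine only
}

// messageConsumed is called when the application takes one message
// off the local queue, freeing one permit.
func (g *flowGate) messageConsumed() {
	g.available.Add(1)
	g.requestMorePermits()
}

// requestMorePermits re-checks the threshold and sends a flow command
// only when the gate is not paused.
func (g *flowGate) requestMorePermits() {
	if g.paused.Load() {
		return // permits keep accumulating while paused
	}
	n := g.available.Load()
	if n < g.threshold {
		return
	}
	if g.available.CompareAndSwap(n, 0) {
		g.sent = append(g.sent, n)
	}
}

// Pause is idempotent: storing true twice has no further effect.
func (g *flowGate) Pause() { g.paused.Store(true) }

// Resume is idempotent: only the call that flips paused from true to
// false flushes the accumulated permits.
func (g *flowGate) Resume() {
	if g.paused.CompareAndSwap(true, false) {
		g.requestMorePermits()
	}
}
```

In this model, permits freed while paused are not lost; they are sent as a single flow on the first `Resume`, and a second `Resume` does nothing.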
Edge cases
- Pausing right before a reconnect: when the partition consumer reconnects, it must re-check paused and not blindly send a fresh flow command.
- KeyShared semantics: in a key-shared subscription, a consumer's keys are redistributed to other consumers only if it disconnects. Pause does not change membership, so it just builds a backlog on that key range. This should be documented as well.
- Interaction with BatchReceive (suggestion 01): a paused consumer should return an empty batch on timeout rather than blocking forever.
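The BatchReceive case can be sketched as follows. `batchReceive` here is a hypothetical helper, not an existing Go client API, and a `<-chan string` stands in for the consumer's local queue. The point is only that the timeout bounds the wait: if a paused consumer has nothing queued, the caller gets an empty batch back instead of blocking.

```go
package main

import "time"

// batchReceive collects up to maxMessages from queue, returning early
// with whatever it has (possibly nothing) once timeout elapses.
// Hypothetical helper for illustration only.
func batchReceive(queue <-chan string, maxMessages int, timeout time.Duration) []string {
	batch := make([]string, 0, maxMessages)
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for len(batch) < maxMessages {
		select {
		case msg := <-queue:
			batch = append(batch, msg)
		case <-timer.C:
			// Paused or idle consumer: return the partial or empty batch.
			return batch
		}
	}
	return batch
}
```

Returning a non-nil empty slice lets callers range over the result without a nil check; whether a real implementation should also return an error on timeout is left open here.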
References
- Java: Consumer.pause() / Consumer.resume() in pulsar-client-api/.../api/Consumer.java.
Are you willing to submit a PR?