[PIP] Add BatchReceive to Consumer #1496

@PavelZeger

Is your feature request related to a problem? Please describe.

The Java client exposes a batched receive mode on every Consumer:

Messages<T> msgs = consumer.batchReceive();
CompletableFuture<Messages<T>> f = consumer.batchReceiveAsync();

Behaviour is governed by BatchReceivePolicy, configured on the builder via consumerBuilder.batchReceivePolicy(...). The policy lets the user say "give me up to N messages, or up to S bytes, or wait at most T, whichever happens first."

In the Go client, the Consumer interface only offers single-message receive:

Receive(context.Context) (Message, error)

There is no equivalent of batchReceive. Users who want batched processing have to loop over Receive themselves, but that approach:

  • Cannot honour a "max bytes" boundary without re-implementing accounting.
  • Cannot signal "I want partial batches if the timeout fires" cleanly.
  • Forces an extra goroutine if you want to bound wait time independently of the per-message context deadline.

Batched receive is a major feature for high-throughput sinks: connectors, ETL processors, anything that wants to amortize a downstream write across many messages. It is one of the most-requested features when teams compare the Go client against the Java client.

Describe the solution you'd like

The change should not break existing callers. Add a BatchReceivePolicy type and two methods to the Consumer interface. To stay compatible with existing implementations, give the multi-topic consumer default no-op implementations of the new methods first, then roll out real support one implementation at a time.

Describe alternatives you've considered

Edge cases for testing

  • The first message arrives, then the channel becomes empty for longer than Timeout — return the partial batch.
  • With MaxNumBytes set to 1 and a single 100-byte payload, BatchReceive should return that one message rather than get stuck.
  • BatchReceive on a multi-topic consumer must round-robin or merge fairly, not starve some topics.
  • Cancelling the parent ctx mid-batch should return what we already have, plus the context error.

Additional context

  • Java: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java (batchReceive, batchReceiveAsync)
  • Java: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java

Are you willing to submit a PR?

  • I'm willing to submit a PR!
