Is your feature request related to a problem? Please describe.
The Java client exposes a batched receive mode on every Consumer:
```java
Messages<T> msgs = consumer.batchReceive();
CompletableFuture<Messages<T>> f = consumer.batchReceiveAsync();
```
Behaviour is governed by BatchReceivePolicy, configured on the builder via consumerBuilder.batchReceivePolicy(...). The policy lets the user say "give me up to N messages, or up to S bytes, or wait at most T, whichever happens first."
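A Go counterpart of the policy could look like the sketch below. It is hypothetical, not an existing pulsar-client-go API. The type name follows the Java class, but the field names, the convention that a value <= 0 disables a limit, and the Validate and Full helpers are assumptions made for illustration. Full implements the "whichever happens first" rule for the count and size limits, and leaves the timeout to the receive loop.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// BatchReceivePolicy is a hypothetical Go counterpart of the Java class.
// In this sketch a value <= 0 disables the corresponding limit.
type BatchReceivePolicy struct {
	MaxNumMessages int           // complete the batch after this many messages
	MaxNumBytes    int           // complete the batch once payload bytes reach this
	Timeout        time.Duration // complete the batch after waiting this long
}

// Validate rejects a policy with no limit at all, since such a batch
// would have no completion condition.
func (p BatchReceivePolicy) Validate() error {
	if p.MaxNumMessages <= 0 && p.MaxNumBytes <= 0 && p.Timeout <= 0 {
		return errors.New("batch receive policy: set at least one of MaxNumMessages, MaxNumBytes or Timeout")
	}
	return nil
}

// Full reports whether a batch holding n messages totalling b payload bytes
// has reached a count or size limit. The timeout is enforced by the receive loop.
func (p BatchReceivePolicy) Full(n, b int) bool {
	return (p.MaxNumMessages > 0 && n >= p.MaxNumMessages) ||
		(p.MaxNumBytes > 0 && b >= p.MaxNumBytes)
}

func main() {
	p := BatchReceivePolicy{MaxNumMessages: 100, MaxNumBytes: 1 << 20, Timeout: 100 * time.Millisecond}
	fmt.Println(p.Validate() == nil) // true
	fmt.Println(p.Full(99, 512))     // false
	fmt.Println(p.Full(100, 512))    // true
	fmt.Println(p.Full(3, 1<<20))    // true
}
```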
In the Go client, the Consumer interface only offers single-message receive:
```go
Receive(context.Context) (Message, error)
```
There is no equivalent of batchReceive. Users who want batched processing have to loop over Receive themselves, but that approach:
- Cannot honour a "max bytes" boundary without re-implementing accounting.
- Cannot signal "I want partial batches if the timeout fires" cleanly.
- Forces an extra goroutine if you want to bound wait time independently of the per-message context deadline.
Batched receive matters most for high-throughput sinks such as connectors, ETL processors, and anything else that wants to amortize a single downstream write across many messages. It is also one of the most requested features when teams compare the Go client with the Java client.
Describe the solution you'd like
No breaking change. Add a BatchReceivePolicy and two methods to the Consumer interface. To stay compatible with existing implementations, expose the new methods first on the multi-topic consumer with default no-op implementations, then roll out real support implementation by implementation.
Describe alternatives you've considered
Edge cases for testing
- The first message arrives, then the channel stays empty for longer than Timeout: return the partial batch.
- A MaxNumBytes of 1 with a 100-byte payload should return that one message, not get stuck.
- BatchReceive on a multi-topic consumer must round-robin or merge fairly, not starve some topics.
- Cancelling the parent ctx mid-batch should return what we already have, plus the context error.
Additional context
- Java: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java (batchReceive, batchReceiveAsync)
- Java: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/BatchReceivePolicy.java
Are you willing to submit a PR?