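Before Creating the Bug Report
I found a bug, not just asking a question, which should be created in GitHub Discussions.
I have searched the GitHub Issues and GitHub Discussions of this repository and believe that this is not a duplicate.
I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.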
Runtime platform environment
All platforms
RocketMQ version
develop branch
JDK Version
JDK 8+
Describe the Bug
In PopBufferMergeService.scan(), when enablePopBatchAck is true, the method collects ACK indices into a class-level shared variable, batchAckIndexList, and passes this same list reference (indexList) to putBatchAckToStore(). Immediately afterwards, the finally block calls indexList.clear() on that same list.
When appendAckAsync is also true, putBatchAckToStore() performs the write asynchronously, and the async callback handleBatchAckPutMessageResult captures the same list reference. By the time the callback executes, the list has already been cleared, or it already holds indices belonging to the next loop iteration.
As a result, pointWrapper.getToStoreBits() is never updated correctly. This causes isCkDoneForFinish() to always return false, preventing PopCheckPointWrapper objects from ever being removed from the commitOffsets queue (an unbounded LinkedBlockingDeque). Over time, this leads to unbounded memory growth and eventually OOM.
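The aliasing problem can be reproduced outside the broker. The following is a hypothetical, simplified model, not RocketMQ code: a CompletableFuture completed by hand stands in for the async store, and scanOnce() and putBatchAckToStore() only mimic the shape of the real methods. Because the simulated IO completes after the finally block has run, the callback reads the already-cleared list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the scan()/putBatchAckToStore() interaction.
// This is NOT RocketMQ code; names only echo the issue text.
public class SharedListRaceDemo {

    // Stands in for the shared batchAckIndexList field that scan() reuses.
    private final List<Integer> batchAckIndexList = new ArrayList<>();

    // Simulated async store: the "IO" finishes only when this future is completed.
    private final CompletableFuture<Void> pendingIo = new CompletableFuture<>();

    // Records what the callback actually observed when it ran.
    private final List<Integer> seenByCallback = new ArrayList<>();

    // Mimics putBatchAckToStore(): the callback reads indexList only after the IO completes.
    private void putBatchAckToStore(List<Integer> indexList) {
        pendingIo.thenRun(() -> seenByCallback.addAll(indexList));
    }

    // Mimics one iteration of scan() with the shared-reference bug.
    public List<Integer> scanOnce() {
        List<Integer> indexList = batchAckIndexList; // alias of the shared field
        try {
            indexList.add(0);
            indexList.add(1);
            indexList.add(2);
            putBatchAckToStore(indexList); // BUG: hands out the shared reference
        } finally {
            indexList.clear(); // runs before the simulated IO completes
        }
        pendingIo.complete(null); // IO finishes now; the callback runs in this thread
        return seenByCallback;
    }

    public static void main(String[] args) {
        System.out.println(new SharedListRaceDemo().scanOnce()); // prints []
    }
}
```

Running main prints [] even though three indices were collected, which is the same state handleBatchAckPutMessageResult ends up seeing in the broker.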
Steps to Reproduce
Set enablePopBatchAck=true and appendAckAsync=true
Run a Pop consumer with moderate traffic
Observe that the size of the commitOffsets queue grows indefinitely
What Did You Expect to See?
toStoreBits should be updated correctly once the async IO completes, and the commitOffsets queue should be drained normally.
What Did You See Instead?
toStoreBits remains 0, and the commitOffsets queue grows without bound.
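Why a cleared index list keeps a checkpoint alive can be illustrated with a hypothetical bitmask model. It assumes, as a simplification, that a checkpoint counts as finished only when its acked bits and its to-store bits agree for every message slot. The field names mirror the issue text, but CheckPointModel and onBatchAckStored are invented for illustration, and RocketMQ's actual isCkDoneForFinish() may be written differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the checkpoint bookkeeping.
// Field and method names echo the issue text; this is NOT RocketMQ's implementation.
public class CheckpointDoneModel {

    static final class CheckPointModel {
        final int num;                                          // messages covered by this checkpoint
        final AtomicInteger bits = new AtomicInteger();         // one bit per acked message
        final AtomicInteger toStoreBits = new AtomicInteger();  // one bit per ack that was persisted

        CheckPointModel(int num) {
            this.num = num;
        }

        // Assumed rule: finished only when bits and toStoreBits agree on every slot,
        // i.e. every acked message has also been persisted.
        boolean isCkDoneForFinish() {
            int pending = bits.get() ^ toStoreBits.get();
            for (int i = 0; i < num; i++) {
                if ((pending & (1 << i)) != 0) {
                    return false;
                }
            }
            return true;
        }
    }

    // Stands in for the async store callback: marks each index in the list as persisted.
    static void onBatchAckStored(CheckPointModel ck, List<Integer> indexList) {
        for (int index : indexList) {
            ck.toStoreBits.accumulateAndGet(1 << index, (a, b) -> a | b);
        }
    }

    // Acks all three messages, then runs the callback with whatever list it observed.
    public static boolean simulate(List<Integer> indicesSeenByCallback) {
        CheckPointModel ck = new CheckPointModel(3);
        ck.bits.set(0b111);
        onBatchAckStored(ck, indicesSeenByCallback);
        return ck.isCkDoneForFinish();
    }

    public static void main(String[] args) {
        System.out.println(simulate(Collections.emptyList()));    // false: callback saw a cleared list
        System.out.println(simulate(Arrays.asList(0, 1, 2)));     // true: every ack was marked as stored
    }
}
```

With an empty list the check never becomes true, so under this model the corresponding PopCheckPointWrapper would never leave commitOffsets.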
Additional Context
Fix: pass a defensive copy, new ArrayList<>(indexList), to putBatchAckToStore() instead of the shared reference.
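A minimal sketch of the proposed fix, again using a hypothetical simplified model rather than RocketMQ code: the async callback receives new ArrayList<>(indexList), a snapshot taken before the finally block clears the shared list. The hand-completed CompletableFuture is an assumption standing in for the real async store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model (NOT RocketMQ code) showing the proposed fix:
// hand the async store a snapshot of the shared list instead of the list itself.
public class DefensiveCopyFixDemo {

    private final List<Integer> batchAckIndexList = new ArrayList<>(); // shared, reused per scan
    private final CompletableFuture<Void> pendingIo = new CompletableFuture<>();
    private final List<Integer> seenByCallback = new ArrayList<>();

    // The callback reads indexList only after the simulated IO completes.
    private void putBatchAckToStore(List<Integer> indexList) {
        pendingIo.thenRun(() -> seenByCallback.addAll(indexList));
    }

    public List<Integer> scanOnce() {
        List<Integer> indexList = batchAckIndexList;
        try {
            indexList.add(0);
            indexList.add(1);
            indexList.add(2);
            // Fix: pass a defensive copy so clear() below cannot affect the callback.
            putBatchAckToStore(new ArrayList<>(indexList));
        } finally {
            indexList.clear(); // only the shared buffer is reset; the snapshot is untouched
        }
        pendingIo.complete(null); // simulated IO finishes after the finally block
        return seenByCallback;
    }

    public static void main(String[] args) {
        System.out.println(new DefensiveCopyFixDemo().scanOnce()); // prints [0, 1, 2]
    }
}
```

The copy costs one small allocation per batch. Allocating a fresh list per iteration instead of reusing batchAckIndexList would avoid the aliasing as well; either way, the callback must not share a list that scan() mutates afterwards.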