From 1845df93a96ceeb8138d04cd8fd10824cca6bcbf Mon Sep 17 00:00:00 2001 From: wangtao_ Date: Thu, 7 May 2026 15:18:40 +0800 Subject: [PATCH] Fix consumerOffset deserialization error and add test --- .../rocketmq/broker/offset/ConsumerOffsetManager.java | 2 +- .../broker/processor/AdminBrokerProcessorTest.java | 6 ++++++ .../protocol/body/ConsumerOffsetSerializeWrapper.java | 8 ++++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index e062ceca96a..f9debf38579 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -51,7 +51,7 @@ public class ConsumerOffsetManager extends ConfigManager { protected final ConcurrentMap> resetOffsetTable = new ConcurrentHashMap<>(512); - private final ConcurrentMap> pullOffsetTable = + private final transient ConcurrentMap> pullOffsetTable = new ConcurrentHashMap<>(512); protected transient BrokerController brokerController; diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 656c783e1f4..c342400d141 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -74,6 +74,7 @@ import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.AclInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; import org.apache.rocketmq.remoting.protocol.body.GroupList; import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; @@ -170,7 +171,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -946,7 +949,10 @@ public void testGetAllConsumerOffset() throws RemotingCommandException { when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset)); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + ConsumerOffsetSerializeWrapper consumerOffsetSerializeWrapper = ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + assertFalse(new String(response.getBody()).contains("pullOffsetTable")); + assertTrue(consumerOffsetSerializeWrapper.getPullOffsetTable().isEmpty()); } @Test diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConsumerOffsetSerializeWrapper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConsumerOffsetSerializeWrapper.java index 407be4670e8..1d5897e4436 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConsumerOffsetSerializeWrapper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/ConsumerOffsetSerializeWrapper.java @@ -25,6 +25,10 @@ public class ConsumerOffsetSerializeWrapper extends RemotingSerializable { private ConcurrentMap> offsetTable = new ConcurrentHashMap<>(512); + + private final ConcurrentMap> pullOffsetTable = + new ConcurrentHashMap<>(512); + private DataVersion dataVersion; public ConcurrentMap> getOffsetTable() { @@ -35,6 +39,10 @@ public void setOffsetTable(ConcurrentMap> o this.offsetTable = offsetTable; } + public ConcurrentMap> getPullOffsetTable() { + return pullOffsetTable; + } + public DataVersion getDataVersion() { return dataVersion; }