From 33c3c2744f00efb6fa14d5c8dcc91fa87b6ec53e Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:24:13 +0800 Subject: [PATCH 1/6] fix: avoid query fail when query timerMsg by realTopic --- .../java/org/apache/rocketmq/client/impl/MQAdminImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java index b6fd72ad013..4f5acb62647 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java @@ -326,11 +326,12 @@ public QueryResult queryMessage(String clusterName, String topic, String key, in public QueryResult queryMessage(String clusterName, String topic, String key, int maxNum, long begin, long end, boolean isUniqKey, String indexType, String lastKey) throws MQClientException, InterruptedException { boolean isLmq = MixAll.isLmq(topic); + boolean isSysWheelTimer = topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"); String routeTopic = topic; // if topic is lmq ,then use clusterName as lmq parent topic // Use clusterName or lmq parent topic to get topic route for lmq or rmq_sys_wheel_timer - if (!StringUtils.isEmpty(topic) && (isLmq || topic.equals(TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer")) + if (!StringUtils.isEmpty(topic) && (isLmq || isSysWheelTimer) && !StringUtils.isEmpty(clusterName)) { routeTopic = clusterName; } @@ -344,7 +345,7 @@ public QueryResult queryMessage(String clusterName, String topic, String key, in if (topicRouteData != null) { List brokerAddrs = new LinkedList<>(); for (BrokerData brokerData : topicRouteData.getBrokerDatas()) { - if (!isLmq && clusterName != null && !clusterName.isEmpty() + if (!isLmq && !isSysWheelTimer && clusterName != null && !clusterName.isEmpty() && !clusterName.equals(brokerData.getCluster())) { continue; } From b249ff9ca51b86db4271a0bae0c6c26c2bcb2621 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:53:17 +0800 Subject: [PATCH 2/6] add test --- .../rocketmq/client/impl/MQAdminImplTest.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index f52aba2dc00..cfbae850d33 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -22,10 +22,12 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.sysflag.MessageSysFlag; +import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -51,6 +53,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -79,6 +82,8 @@ public class MQAdminImplTest { private final long defaultTimeout = 3000L; + private final String sysWheelTimerTopic = TopicValidator.SYSTEM_TOPIC_PREFIX + "wheel_timer"; + @Before public void init() throws RemotingException, InterruptedException, MQClientException { when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mQClientAPIImpl); @@ -200,6 +205,33 @@ public void assertQueryMessageByUniqKey() throws InterruptedException, MQClientE assertEquals(defaultTopic, queryResult.getMessageList().get(0).getTopic()); } + @Test + public void testQueryMessageBySysWheelTimerTopicShouldUseClusterNameAsRouteTopic() throws Exception { + String realTopic = "realTopic"; + TopicRouteData routeData = createRouteData(); + + when(mQClientFactory.getAnExistTopicRouteData(sysWheelTimerTopic)).thenReturn(null); + when(mQClientFactory.getAnExistTopicRouteData(realTopic)).thenReturn(routeData); + + doAnswer(invocation -> { + InvokeCallback callback = invocation.getArgument(3); + QueryMessageResponseHeader responseHeader = new QueryMessageResponseHeader(); + responseHeader.setIndexLastUpdatePhyoffset(1L); + responseHeader.setIndexLastUpdateTimestamp(System.currentTimeMillis()); + RemotingCommand response = mock(RemotingCommand.class); + when(response.decodeCommandCustomHeader(QueryMessageResponseHeader.class)).thenReturn(responseHeader); + when(response.getBody()).thenReturn(getMessageResult()); + when(response.getCode()).thenReturn(ResponseCode.SUCCESS); + callback.operationSucceed(response); + return null; + }).when(mQClientAPIImpl).queryMessage(anyString(), any(), anyLong(), any(InvokeCallback.class), any()); + String msgId = buildMsgId(); + QueryResult result = mqAdminImpl.queryMessage(realTopic, sysWheelTimerTopic, msgId, 32, + 0L, Long.MAX_VALUE, true, MessageConst.INDEX_UNIQUE_TYPE, null); + assertNotNull(result); + assertNotNull(result.getMessageList()); + } + private String buildMsgId() { MessageExt msgExt = createMessageExt(); int storeHostIPLength = (msgExt.getFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 : 16; From f1244840b038eb54843c9a6f62f096ca4170b617 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 11:53:39 +0800 Subject: [PATCH 3/6] add test --- .../java/org/apache/rocketmq/client/impl/MQAdminImplTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index cfbae850d33..af408f57648 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; From 79d3d3303bbfe358245611618987d962af3658a1 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 12:03:00 +0800 Subject: [PATCH 4/6] add test --- .../java/org/apache/rocketmq/client/impl/MQAdminImplTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java index af408f57648..d01a9a4d390 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQAdminImplTest.java @@ -207,10 +207,6 @@ public void assertQueryMessageByUniqKey() throws InterruptedException, MQClientE @Test public void testQueryMessageBySysWheelTimerTopicShouldUseClusterNameAsRouteTopic() throws Exception { String realTopic = "realTopic"; - TopicRouteData routeData = createRouteData(); - - when(mQClientFactory.getAnExistTopicRouteData(sysWheelTimerTopic)).thenReturn(null); - when(mQClientFactory.getAnExistTopicRouteData(realTopic)).thenReturn(routeData); doAnswer(invocation -> { InvokeCallback callback = invocation.getArgument(3); From ce8f8745bfd55d67faa344f9e43e699280d14e84 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 14:21:08 +0800 Subject: [PATCH 5/6] trigger ci From a63b83a61551829cdc938e40bde3bd1237d6211c Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 15 May 2026 15:02:09 +0800 Subject: [PATCH 6/6] trigger ci