From b6eb654be361cf1d0d1ba7694a11544c7a58cf11 Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 14:32:22 +0800 Subject: [PATCH 1/6] [ISSUE #10278] Add enableGrpcChannelReceiptHandleRenew switch to disable proxy periodic handle renewal for gRPC clients - Add enableGrpcChannelReceiptHandleRenew config in ProxyConfig (default true for backward compatibility) - When disabled, use group's consumeTimeoutMinute as invisibleTime for gRPC clients instead of short defaultInvisibleTimeMills - Skip periodic changeInvisibleTime renewal in DefaultReceiptHandleManager only for GrpcClientChannel when disabled - Still save handles and monitor client connectivity for disconnect cleanup (nack) --- .../rocketmq/proxy/config/ProxyConfig.java | 20 +++++++++++++++++++ .../v2/consumer/ReceiveMessageActivity.java | 20 ++++++++++++++++++- .../receipt/DefaultReceiptHandleManager.java | 6 ++++++ 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 5a1a5859305..a2d5bc2a8ee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -214,6 +214,18 @@ public class ProxyConfig implements ConfigFile { private long invisibleTimeMillisWhenClear = 1000L; private boolean enableProxyAutoRenew = true; + /** + * When enableProxyAutoRenew is true, this controls whether the proxy periodically renews + * (extends) the invisible time of receipt handles for gRPC (GrpcClientChannel) consumers + * via changeInvisibleTime calls to the broker. + *

+ * When set to false, the proxy will NOT periodically renew handles for gRPC clients. Instead, + * it will use the consumer group's consumeTimeoutMinute as the initial invisible time during + * pop, and only save handles for client-disconnect cleanup (nack). This avoids the + * handle-mapping complexity and makes the client's original receipt handle remain valid + * even if the proxy crashes. + */ + private boolean enableGrpcChannelReceiptHandleRenew = true; private int maxRenewRetryTimes = 3; private int renewThreadPoolNums = 2; private int renewMaxThreadPoolNums = 4; @@ -1113,6 +1125,14 @@ public void setEnableProxyAutoRenew(boolean enableProxyAutoRenew) { this.enableProxyAutoRenew = enableProxyAutoRenew; } + public boolean isEnableGrpcChannelReceiptHandleRenew() { + return enableGrpcChannelReceiptHandleRenew; + } + + public void setEnableGrpcChannelReceiptHandleRenew(boolean enableGrpcChannelReceiptHandleRenew) { + this.enableGrpcChannelReceiptHandleRenew = enableGrpcChannelReceiptHandleRenew; + } + public int getMaxRenewRetryTimes() { return maxRenewRetryTimes; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index f5e1c7b76f3..cada05df7e4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -27,6 +27,7 @@ import io.grpc.stub.StreamObserver; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PopStatus; @@ -50,6 +51,7 @@ import org.apache.rocketmq.proxy.service.route.MessageQueueView; import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class ReceiveMessageActivity extends AbstractMessagingActivity { private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; @@ -107,7 +109,11 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration()); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) { - actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills(); + if (proxyConfig.isEnableGrpcChannelReceiptHandleRenew()) { + actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills(); + } else { + actualInvisibleTime = getConsumeTimeoutInvisibleTime(ctx, group, proxyConfig); + } } else { validateInvisibleTime(actualInvisibleTime, ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv()); @@ -229,6 +235,18 @@ protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx, ); } + private long getConsumeTimeoutInvisibleTime(ProxyContext ctx, String group, ProxyConfig proxyConfig) { + try { + SubscriptionGroupConfig groupConfig = this.messagingProcessor.getSubscriptionGroupConfig(ctx, group); + if (groupConfig != null && groupConfig.getConsumeTimeoutMinute() > 0) { + return TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute()); + } + } catch (Exception e) { + // fall through to default + } + return proxyConfig.getDefaultInvisibleTimeMills(); + } + protected static class ReceiveMessageQueueSelector implements QueueSelector { private final String brokerName; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index f9dfd825337..377b45f1269 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -54,6 +54,7 @@ import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.metadata.MetadataService; @@ -165,6 +166,11 @@ protected void scheduleRenewTask() { continue; } + if (!proxyConfig.isEnableGrpcChannelReceiptHandleRenew() + && key.getChannel() instanceof GrpcClientChannel) { + continue; + } + ReceiptHandleGroup group = entry.getValue(); group.scan((msgID, handleStr, v) -> { long current = System.currentTimeMillis(); From c71312cc43baf282d69f82eaa604ef3b1e961ca5 Mon Sep 17 00:00:00 2001 From: qianye Date: Tue, 28 Apr 2026 16:22:27 +0800 Subject: [PATCH 2/6] fix --- .../service/receipt/DefaultReceiptHandleManager.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index 377b45f1269..12d4bd79aff 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -166,12 +166,21 @@ protected void scheduleRenewTask() { continue; } + ReceiptHandleGroup group = entry.getValue(); + if (!proxyConfig.isEnableGrpcChannelReceiptHandleRenew() && key.getChannel() instanceof GrpcClientChannel) { + // When renew is disabled, only clean up expired handles to avoid memory leak + group.scan((msgID, handleStr, v) -> { + ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); + if (handle.isExpired()) { + group.computeIfPresent(msgID, handleStr, + messageReceiptHandle -> CompletableFuture.completedFuture(null), 0); + } + }); continue; } - ReceiptHandleGroup group = entry.getValue(); group.scan((msgID, handleStr, v) -> { long current = System.currentTimeMillis(); ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); From 982b3a00d3b98bbf585264aefffdee5f9da498d7 Mon Sep 17 00:00:00 2001 From: qianye Date: Fri, 8 May 2026 11:01:32 +0800 Subject: [PATCH 3/6] Empty commit message From 0216c4b806ea0d8f5aaab240a0ca17cf4f3ffb02 Mon Sep 17 00:00:00 2001 From: qianye Date: Fri, 8 May 2026 14:31:47 +0800 Subject: [PATCH 4/6] fix --- .../proxy/common/MessageReceiptHandle.java | 18 +++ .../v2/consumer/ReceiveMessageActivity.java | 49 +++++-- .../receipt/DefaultReceiptHandleManager.java | 18 ++- .../consumer/ReceiveMessageActivityTest.java | 129 ++++++++++++++++++ .../DefaultReceiptHandleManagerTest.java | 99 ++++++++++++++ 5 files changed, 289 insertions(+), 24 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java index 2f67d4ca14a..037b5644765 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java @@ -37,6 +37,14 @@ public class MessageReceiptHandle { private final long consumeTimestamp; private String liteTopic; private volatile String receiptHandleStr; + /** + * Indicates whether this handle needs periodic renewal by the proxy. + * When true, the proxy will periodically call changeInvisibleTime to extend the handle's lifetime. + * When false, the handle was popped with a long invisibleTime (e.g., consumeTimeoutMinute) and + * only needs expiry cleanup. This flag is set at handle creation time based on the + * enableGrpcChannelReceiptHandleRenew config, ensuring safe transitions when the config changes. + */ + private final boolean needRenew; public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes) { @@ -45,6 +53,11 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes, String liteTopic) { + this(group, topic, queueId, receiptHandleStr, messageId, queueOffset, reconsumeTimes, liteTopic, true); + } + + public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, + long queueOffset, int reconsumeTimes, String liteTopic, boolean needRenew) { this.originalReceiptHandle = ReceiptHandle.decode(receiptHandleStr); this.group = group; this.topic = topic; @@ -56,6 +69,7 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece this.reconsumeTimes = reconsumeTimes; this.consumeTimestamp = originalReceiptHandle.getRetrieveTime(); this.liteTopic = liteTopic; + this.needRenew = needRenew; } @Override @@ -169,4 +183,8 @@ public String getLiteTopic() { public void setLiteTopic(String liteTopic) { this.liteTopic = liteTopic; } + + public boolean isNeedRenew() { + return needRenew; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index cada05df7e4..e194470e222 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -16,18 +16,9 @@ */ package org.apache.rocketmq.proxy.grpc.v2.consumer; -import apache.rocketmq.v2.ClientType; -import apache.rocketmq.v2.Code; -import apache.rocketmq.v2.FilterExpression; -import apache.rocketmq.v2.ReceiveMessageRequest; -import apache.rocketmq.v2.ReceiveMessageResponse; -import apache.rocketmq.v2.Settings; -import apache.rocketmq.v2.Subscription; +import apache.rocketmq.v2.*; import com.google.protobuf.util.Durations; import io.grpc.stub.StreamObserver; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PopStatus; @@ -53,8 +44,17 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + public class ReceiveMessageActivity extends AbstractMessagingActivity { private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; + /** + * Default fallback invisibleTime when renewal is disabled and SubscriptionGroupConfig is unavailable. + * Uses the same default as SubscriptionGroupConfig.consumeTimeoutMinute (15 minutes). + */ + private static final long DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(15); public ReceiveMessageActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { @@ -213,6 +213,10 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt)); throw e; } + // Capture the renew mode at pop time. This ensures that when the config switches dynamically, + // handles already created continue to be handled correctly (renewed or not) regardless of + // the current config state. + final boolean needRenew = ConfigurationManager.getProxyConfig().isEnableGrpcChannelReceiptHandleRenew(); return () -> { List messageExtList = popResult.getMsgFoundList(); for (MessageExt messageExt : messageExtList) { @@ -220,7 +224,7 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request if (receiptHandle != null) { MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), - messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); + messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), null, needRenew); messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle); } } @@ -236,15 +240,32 @@ protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx, } private long getConsumeTimeoutInvisibleTime(ProxyContext ctx, String group, ProxyConfig proxyConfig) { + long invisibleTime; try { SubscriptionGroupConfig groupConfig = this.messagingProcessor.getSubscriptionGroupConfig(ctx, group); if (groupConfig != null && groupConfig.getConsumeTimeoutMinute() > 0) { - return TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute()); + invisibleTime = TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute()); + } else { + // Use default consumeTimeoutMinute as fallback when config is null or invalid. + // This is intentionally NOT defaultInvisibleTimeMills (60s) because without renewal, + // 60s is too short and would cause duplicate consumption. + invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS; } } catch (Exception e) { - // fall through to default + log.warn("Failed to get SubscriptionGroupConfig for group: {}, using default consumeTimeoutMinute (15min) " + + "as invisibleTime.", group, e); + // Safe fallback: use default consumeTimeoutMinute instead of defaultInvisibleTimeMills (60s). + // Since renewal is disabled, the handle must live long enough for the client to process it. + invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS; + } + // Clamp to maxInvisibleTimeMills to avoid exceeding broker's allowed range + long maxInvisibleTime = proxyConfig.getMaxInvisibleTimeMills(); + if (maxInvisibleTime > 0 && invisibleTime > maxInvisibleTime) { + log.warn("Calculated invisibleTime {}ms for group {} exceeds maxInvisibleTimeMills {}ms, clamping.", + invisibleTime, group, maxInvisibleTime); + invisibleTime = maxInvisibleTime; } - return proxyConfig.getDefaultInvisibleTimeMills(); + return invisibleTime; } protected static class ReceiveMessageQueueSelector implements QueueSelector { diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index 12d4bd79aff..f5bdd8361fd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -54,7 +54,6 @@ import org.apache.rocketmq.proxy.common.RenewEvent; import org.apache.rocketmq.proxy.common.RenewStrategyPolicy; import org.apache.rocketmq.proxy.common.channel.ChannelHelper; -import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcClientChannel; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.metadata.MetadataService; @@ -168,20 +167,19 @@ protected void scheduleRenewTask() { ReceiptHandleGroup group = entry.getValue(); - if (!proxyConfig.isEnableGrpcChannelReceiptHandleRenew() - && key.getChannel() instanceof GrpcClientChannel) { - // When renew is disabled, only clean up expired handles to avoid memory leak - group.scan((msgID, handleStr, v) -> { + group.scan((msgID, handleStr, v) -> { + // Per-handle renewal decision: handles that were created with needRenew=false + // (popped when enableGrpcChannelReceiptHandleRenew was disabled) only need + // expiry cleanup, NOT periodic renewal. This ensures safe transitions when + // the config is dynamically switched. + if (!v.isNeedRenew()) { ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); if (handle.isExpired()) { group.computeIfPresent(msgID, handleStr, messageReceiptHandle -> CompletableFuture.completedFuture(null), 0); } - }); - continue; - } - - group.scan((msgID, handleStr, v) -> { + return; + } long current = System.currentTimeMillis(); ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index f7074dedd63..baebcf834ae 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -70,6 +70,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class ReceiveMessageActivityTest extends BaseActivityTest { @@ -409,4 +411,131 @@ public void testReceiveMessageQueueSelector() throws Exception { assertEquals(BROKER_NAME + i, selectorBrokerName.select(ProxyContext.create(), messageQueueView).getBrokerName()); } } + + @Test + public void testReceiveMessageWithRenewDisabledUsesConsumeTimeout() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + groupConfig.setConsumeTimeoutMinute(20); + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should use consumeTimeoutMinute (20min = 1200000ms), not defaultInvisibleTimeMills (60s) + assertEquals(20 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + } + + @Test + public void testReceiveMessageWithRenewDisabledFallbackOnException() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + // Simulate exception when getting subscription group config + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())) + .thenThrow(new RuntimeException("broker unavailable")); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should fallback to 15min (900000ms), NOT defaultInvisibleTimeMills (60s) + assertEquals(15 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + } + + @Test + public void testReceiveMessageWithRenewDisabledClampsToMaxInvisibleTime() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + long originalMax = proxyConfig.getMaxInvisibleTimeMills(); + // Set maxInvisibleTimeMills to 10 min for testing + proxyConfig.setMaxInvisibleTimeMills(10 * 60 * 1000L); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + groupConfig.setConsumeTimeoutMinute(30); // 30min > maxInvisibleTimeMills (10min) + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should be clamped to maxInvisibleTimeMills (10min = 600000ms) + assertEquals(10 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + proxyConfig.setMaxInvisibleTimeMills(originalMax); + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index a01c356f779..854cb71f601 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -463,4 +463,103 @@ public void testClientOffline() { listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } + + @Test + public void testNoRenewHandleSkipsRenewalAndCleansExpired() { + // Create a handle with needRenew=false (simulating enableGrpcChannelReceiptHandleRenew=false at pop time) + ProxyConfig config = ConfigurationManager.getProxyConfig(); + long invisibleTime = 900000L; // 15min + String noRenewReceiptHandle = ReceiptHandle.builder() + .startOffset(0L) + .retrieveTime(System.currentTimeMillis() - invisibleTime - 1000) // already expired + .invisibleTime(invisibleTime) + .reviveQueueId(1) + .topicType(ReceiptHandle.NORMAL_TOPIC) + .brokerName(BROKER_NAME) + .queueId(QUEUE_ID) + .offset(OFFSET) + .commitLogOffset(0L) + .build().encode(); + MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false); + + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + + receiptHandleManager.scheduleRenewTask(); + + // Should NOT call changeInvisibleTime (no renewal for needRenew=false) + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); + + // The expired handle should be cleaned up + await().atMost(Duration.ofSeconds(1)).until(() -> { + try { + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; + } + }); + } + + @Test + public void testNoRenewHandleNotExpiredStaysInMemory() { + // Create a handle with needRenew=false that is NOT yet expired + long invisibleTime = 900000L; // 15min + String noRenewReceiptHandle = ReceiptHandle.builder() + .startOffset(0L) + .retrieveTime(System.currentTimeMillis()) // not expired yet + .invisibleTime(invisibleTime) + .reviveQueueId(1) + .topicType(ReceiptHandle.NORMAL_TOPIC) + .brokerName(BROKER_NAME) + .queueId(QUEUE_ID) + .offset(OFFSET) + .commitLogOffset(0L) + .build().encode(); + MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false); + + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + + receiptHandleManager.scheduleRenewTask(); + + // Should NOT call changeInvisibleTime + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); + + // Handle should still be in memory (not expired, kept for nack on disconnect) + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + assertEquals(false, receiptHandleGroup.isEmpty()); + } + + @Test + public void testTransitionNeedRenewTrueHandleStillRenewed() { + // Simulate a handle created when enableGrpcChannelReceiptHandleRenew=true (needRenew=true) + // Even if config later changes to false, this handle should still be renewed + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + + // Create handle with needRenew=true (default) + MessageReceiptHandle needRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, true); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, needRenewHandle); + + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); + + // Even though config says no renew now, the handle has needRenew=true + // so it should still be renewed + receiptHandleManager.scheduleRenewTask(); + + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.anyLong()); + } } \ No newline at end of file From b27eb267396235f031daa7ab536b48f8c88fb0a5 Mon Sep 17 00:00:00 2001 From: qianye Date: Fri, 8 May 2026 19:05:25 +0800 Subject: [PATCH 5/6] fix --- .../proxy/grpc/v2/consumer/ReceiveMessageActivity.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index e194470e222..150acd948e0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -16,7 +16,13 @@ */ package org.apache.rocketmq.proxy.grpc.v2.consumer; -import apache.rocketmq.v2.*; +import apache.rocketmq.v2.ClientType; +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.FilterExpression; +import apache.rocketmq.v2.ReceiveMessageRequest; +import apache.rocketmq.v2.ReceiveMessageResponse; +import apache.rocketmq.v2.Settings; +import apache.rocketmq.v2.Subscription; import com.google.protobuf.util.Durations; import io.grpc.stub.StreamObserver; import org.apache.commons.lang3.StringUtils; From b9c16ebbee59cbeb4f47319cac6c5105aeee7b37 Mon Sep 17 00:00:00 2001 From: qianye Date: Sat, 9 May 2026 10:15:05 +0800 Subject: [PATCH 6/6] Empty commit message