diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
index 2f67d4ca14a..037b5644765 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java
@@ -37,6 +37,14 @@ public class MessageReceiptHandle {
private final long consumeTimestamp;
private String liteTopic;
private volatile String receiptHandleStr;
+ /**
+ * Indicates whether this handle needs periodic renewal by the proxy.
+ * When true, the proxy will periodically call changeInvisibleTime to extend the handle's lifetime.
+ * When false, the handle was popped with a long invisibleTime (e.g., consumeTimeoutMinute) and
+ * only needs expiry cleanup. This flag is set at handle creation time based on the
+ * enableGrpcChannelReceiptHandleRenew config, ensuring safe transitions when the config changes.
+ */
+ private final boolean needRenew;
public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
long queueOffset, int reconsumeTimes) {
@@ -45,6 +53,11 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece
public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
long queueOffset, int reconsumeTimes, String liteTopic) {
+ this(group, topic, queueId, receiptHandleStr, messageId, queueOffset, reconsumeTimes, liteTopic, true);
+ }
+
+ public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
+ long queueOffset, int reconsumeTimes, String liteTopic, boolean needRenew) {
this.originalReceiptHandle = ReceiptHandle.decode(receiptHandleStr);
this.group = group;
this.topic = topic;
@@ -56,6 +69,7 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece
this.reconsumeTimes = reconsumeTimes;
this.consumeTimestamp = originalReceiptHandle.getRetrieveTime();
this.liteTopic = liteTopic;
+ this.needRenew = needRenew;
}
@Override
@@ -169,4 +183,8 @@ public String getLiteTopic() {
public void setLiteTopic(String liteTopic) {
this.liteTopic = liteTopic;
}
+
+ public boolean isNeedRenew() {
+ return needRenew;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 5a1a5859305..a2d5bc2a8ee 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -214,6 +214,18 @@ public class ProxyConfig implements ConfigFile {
private long invisibleTimeMillisWhenClear = 1000L;
private boolean enableProxyAutoRenew = true;
+ /**
+ * When enableProxyAutoRenew is true, this controls whether the proxy periodically renews
+ * (extends) the invisible time of receipt handles for gRPC (GrpcClientChannel) consumers
+ * via changeInvisibleTime calls to the broker.
+ *
+ * When set to false, the proxy will NOT periodically renew handles for gRPC clients. Instead,
+ * it will use the consumer group's consumeTimeoutMinute as the initial invisible time during
+ * pop, and only save handles for client-disconnect cleanup (nack). This avoids the
+ * handle-mapping complexity and makes the client's original receipt handle remain valid
+ * even if the proxy crashes.
+ */
+ private boolean enableGrpcChannelReceiptHandleRenew = true;
private int maxRenewRetryTimes = 3;
private int renewThreadPoolNums = 2;
private int renewMaxThreadPoolNums = 4;
@@ -1113,6 +1125,14 @@ public void setEnableProxyAutoRenew(boolean enableProxyAutoRenew) {
this.enableProxyAutoRenew = enableProxyAutoRenew;
}
+ public boolean isEnableGrpcChannelReceiptHandleRenew() {
+ return enableGrpcChannelReceiptHandleRenew;
+ }
+
+ public void setEnableGrpcChannelReceiptHandleRenew(boolean enableGrpcChannelReceiptHandleRenew) {
+ this.enableGrpcChannelReceiptHandleRenew = enableGrpcChannelReceiptHandleRenew;
+ }
+
public int getMaxRenewRetryTimes() {
return maxRenewRetryTimes;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
index f5e1c7b76f3..150acd948e0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java
@@ -25,8 +25,6 @@
import apache.rocketmq.v2.Subscription;
import com.google.protobuf.util.Durations;
import io.grpc.stub.StreamObserver;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
@@ -50,9 +48,19 @@
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
public class ReceiveMessageActivity extends AbstractMessagingActivity {
private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3";
+ /**
+ * Default fallback invisibleTime when renewal is disabled and SubscriptionGroupConfig is unavailable.
+ * Uses the same default as SubscriptionGroupConfig.consumeTimeoutMinute (15 minutes).
+ */
+ private static final long DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(15);
public ReceiveMessageActivity(MessagingProcessor messagingProcessor,
GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
@@ -107,7 +115,11 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request,
long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration());
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) {
- actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
+ if (proxyConfig.isEnableGrpcChannelReceiptHandleRenew()) {
+ actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills();
+ } else {
+ actualInvisibleTime = getConsumeTimeoutInvisibleTime(ctx, group, proxyConfig);
+ }
} else {
validateInvisibleTime(actualInvisibleTime,
ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv());
@@ -207,6 +219,10 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request
writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt));
throw e;
}
+ // Capture the renew mode at pop time. This ensures that when the config switches dynamically,
+ // handles already created continue to be handled correctly (renewed or not) regardless of
+ // the current config state.
+ final boolean needRenew = ConfigurationManager.getProxyConfig().isEnableGrpcChannelReceiptHandleRenew();
return () -> {
List messageExtList = popResult.getMsgFoundList();
for (MessageExt messageExt : messageExtList) {
@@ -214,7 +230,7 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request
if (receiptHandle != null) {
MessageReceiptHandle messageReceiptHandle =
new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(),
- messageExt.getQueueOffset(), messageExt.getReconsumeTimes());
+ messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), null, needRenew);
messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle);
}
}
@@ -229,6 +245,35 @@ protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx,
);
}
+ private long getConsumeTimeoutInvisibleTime(ProxyContext ctx, String group, ProxyConfig proxyConfig) {
+ long invisibleTime;
+ try {
+ SubscriptionGroupConfig groupConfig = this.messagingProcessor.getSubscriptionGroupConfig(ctx, group);
+ if (groupConfig != null && groupConfig.getConsumeTimeoutMinute() > 0) {
+ invisibleTime = TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute());
+ } else {
+ // Use default consumeTimeoutMinute as fallback when config is null or invalid.
+ // This is intentionally NOT defaultInvisibleTimeMills (60s) because without renewal,
+ // 60s is too short and would cause duplicate consumption.
+ invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS;
+ }
+ } catch (Exception e) {
+ log.warn("Failed to get SubscriptionGroupConfig for group: {}, using default consumeTimeoutMinute (15min) "
+ + "as invisibleTime.", group, e);
+ // Safe fallback: use default consumeTimeoutMinute instead of defaultInvisibleTimeMills (60s).
+ // Since renewal is disabled, the handle must live long enough for the client to process it.
+ invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS;
+ }
+ // Clamp to maxInvisibleTimeMills to avoid exceeding broker's allowed range
+ long maxInvisibleTime = proxyConfig.getMaxInvisibleTimeMills();
+ if (maxInvisibleTime > 0 && invisibleTime > maxInvisibleTime) {
+ log.warn("Calculated invisibleTime {}ms for group {} exceeds maxInvisibleTimeMills {}ms, clamping.",
+ invisibleTime, group, maxInvisibleTime);
+ invisibleTime = maxInvisibleTime;
+ }
+ return invisibleTime;
+ }
+
protected static class ReceiveMessageQueueSelector implements QueueSelector {
private final String brokerName;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
index f9dfd825337..f5bdd8361fd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java
@@ -166,7 +166,20 @@ protected void scheduleRenewTask() {
}
ReceiptHandleGroup group = entry.getValue();
+
group.scan((msgID, handleStr, v) -> {
+ // Per-handle renewal decision: handles that were created with needRenew=false
+ // (popped when enableGrpcChannelReceiptHandleRenew was disabled) only need
+ // expiry cleanup, NOT periodic renewal. This ensures safe transitions when
+ // the config is dynamically switched.
+ if (!v.isNeedRenew()) {
+ ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
+ if (handle.isExpired()) {
+ group.computeIfPresent(msgID, handleStr,
+ messageReceiptHandle -> CompletableFuture.completedFuture(null), 0);
+ }
+ return;
+ }
long current = System.currentTimeMillis();
ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr());
if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) {
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
index f7074dedd63..baebcf834ae 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java
@@ -70,6 +70,8 @@
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import org.apache.rocketmq.proxy.config.ProxyConfig;
+import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
public class ReceiveMessageActivityTest extends BaseActivityTest {
@@ -409,4 +411,131 @@ public void testReceiveMessageQueueSelector() throws Exception {
assertEquals(BROKER_NAME + i, selectorBrokerName.select(ProxyContext.create(), messageQueueView).getBrokerName());
}
}
+
+ @Test
+ public void testReceiveMessageWithRenewDisabledUsesConsumeTimeout() {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ proxyConfig.setEnableProxyAutoRenew(true);
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false);
+
+ StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class);
+ doNothing().when(receiveStreamObserver).onNext(any());
+ when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setConsumeTimeoutMinute(20);
+ when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig);
+
+ ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class);
+ when(this.messagingProcessor.popMessage(
+ any(), any(), anyString(), anyString(), anyInt(),
+ invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
+
+ this.receiveMessageActivity.receiveMessage(
+ createContext(),
+ ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+ .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(true)
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build(),
+ receiveStreamObserver
+ );
+
+ // Should use consumeTimeoutMinute (20min = 1200000ms), not defaultInvisibleTimeMills (60s)
+ assertEquals(20 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue());
+
+ // Restore
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true);
+ }
+
+ @Test
+ public void testReceiveMessageWithRenewDisabledFallbackOnException() {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ proxyConfig.setEnableProxyAutoRenew(true);
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false);
+
+ StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class);
+ doNothing().when(receiveStreamObserver).onNext(any());
+ when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+ // Simulate exception when getting subscription group config
+ when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString()))
+ .thenThrow(new RuntimeException("broker unavailable"));
+
+ ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class);
+ when(this.messagingProcessor.popMessage(
+ any(), any(), anyString(), anyString(), anyInt(),
+ invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
+
+ this.receiveMessageActivity.receiveMessage(
+ createContext(),
+ ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+ .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(true)
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build(),
+ receiveStreamObserver
+ );
+
+ // Should fallback to 15min (900000ms), NOT defaultInvisibleTimeMills (60s)
+ assertEquals(15 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue());
+
+ // Restore
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true);
+ }
+
+ @Test
+ public void testReceiveMessageWithRenewDisabledClampsToMaxInvisibleTime() {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
+ proxyConfig.setEnableProxyAutoRenew(true);
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false);
+ long originalMax = proxyConfig.getMaxInvisibleTimeMills();
+ // Set maxInvisibleTimeMills to 10 min for testing
+ proxyConfig.setMaxInvisibleTimeMills(10 * 60 * 1000L);
+
+ StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class);
+ doNothing().when(receiveStreamObserver).onNext(any());
+ when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType());
+
+ SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
+ groupConfig.setConsumeTimeoutMinute(30); // 30min > maxInvisibleTimeMills (10min)
+ when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig);
+
+ ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class);
+ when(this.messagingProcessor.popMessage(
+ any(), any(), anyString(), anyString(), anyInt(),
+ invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong()))
+ .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())));
+
+ this.receiveMessageActivity.receiveMessage(
+ createContext(),
+ ReceiveMessageRequest.newBuilder()
+ .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build())
+ .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build())
+ .setAutoRenew(true)
+ .setFilterExpression(FilterExpression.newBuilder()
+ .setType(FilterType.TAG)
+ .setExpression("*")
+ .build())
+ .build(),
+ receiveStreamObserver
+ );
+
+ // Should be clamped to maxInvisibleTimeMills (10min = 600000ms)
+ assertEquals(10 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue());
+
+ // Restore
+ proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true);
+ proxyConfig.setMaxInvisibleTimeMills(originalMax);
+ }
}
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
index a01c356f779..854cb71f601 100644
--- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
+++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java
@@ -463,4 +463,103 @@ public void testClientOffline() {
listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0));
assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty());
}
+
+ @Test
+ public void testNoRenewHandleSkipsRenewalAndCleansExpired() {
+ // Create a handle with needRenew=false (simulating enableGrpcChannelReceiptHandleRenew=false at pop time)
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ long invisibleTime = 900000L; // 15min
+ String noRenewReceiptHandle = ReceiptHandle.builder()
+ .startOffset(0L)
+ .retrieveTime(System.currentTimeMillis() - invisibleTime - 1000) // already expired
+ .invisibleTime(invisibleTime)
+ .reviveQueueId(1)
+ .topicType(ReceiptHandle.NORMAL_TOPIC)
+ .brokerName(BROKER_NAME)
+ .queueId(QUEUE_ID)
+ .offset(OFFSET)
+ .commitLogOffset(0L)
+ .build().encode();
+ MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle,
+ MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false);
+
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+
+ receiptHandleManager.scheduleRenewTask();
+
+ // Should NOT call changeInvisibleTime (no renewal for needRenew=false)
+ Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
+ .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(),
+ Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
+
+ // The expired handle should be cleaned up
+ await().atMost(Duration.ofSeconds(1)).until(() -> {
+ try {
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
+ return receiptHandleGroup.isEmpty();
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ }
+
+ @Test
+ public void testNoRenewHandleNotExpiredStaysInMemory() {
+ // Create a handle with needRenew=false that is NOT yet expired
+ long invisibleTime = 900000L; // 15min
+ String noRenewReceiptHandle = ReceiptHandle.builder()
+ .startOffset(0L)
+ .retrieveTime(System.currentTimeMillis()) // not expired yet
+ .invisibleTime(invisibleTime)
+ .reviveQueueId(1)
+ .topicType(ReceiptHandle.NORMAL_TOPIC)
+ .brokerName(BROKER_NAME)
+ .queueId(QUEUE_ID)
+ .offset(OFFSET)
+ .commitLogOffset(0L)
+ .build().encode();
+ MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle,
+ MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false);
+
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle);
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+
+ receiptHandleManager.scheduleRenewTask();
+
+ // Should NOT call changeInvisibleTime
+ Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0))
+ .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(),
+ Mockito.anyString(), Mockito.anyString(), Mockito.anyLong());
+
+ // Handle should still be in memory (not expired, kept for nack on disconnect)
+ ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get();
+ assertEquals(false, receiptHandleGroup.isEmpty());
+ }
+
+ @Test
+ public void testTransitionNeedRenewTrueHandleStillRenewed() {
+ // Simulate a handle created when enableGrpcChannelReceiptHandleRenew=true (needRenew=true)
+ // Even if config later changes to false, this handle should still be renewed
+ ProxyConfig config = ConfigurationManager.getProxyConfig();
+ Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL);
+
+ // Create handle with needRenew=true (default)
+ MessageReceiptHandle needRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle,
+ MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, true);
+ receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, needRenewHandle);
+
+ Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class));
+ Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig());
+
+ // Even though config says no renew now, the handle has needRenew=true
+ // so it should still be renewed
+ receiptHandleManager.scheduleRenewTask();
+
+ Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1))
+ .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID),
+ Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.anyLong());
+ }
}
\ No newline at end of file