diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java index 2f67d4ca14a..037b5644765 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/MessageReceiptHandle.java @@ -37,6 +37,14 @@ public class MessageReceiptHandle { private final long consumeTimestamp; private String liteTopic; private volatile String receiptHandleStr; + /** + * Indicates whether this handle needs periodic renewal by the proxy. + * When true, the proxy will periodically call changeInvisibleTime to extend the handle's lifetime. + * When false, the handle was popped with a long invisibleTime (e.g., consumeTimeoutMinute) and + * only needs expiry cleanup. This flag is set at handle creation time based on the + * enableGrpcChannelReceiptHandleRenew config, ensuring safe transitions when the config changes. + */ + private final boolean needRenew; public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes) { @@ -45,6 +53,11 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, long queueOffset, int reconsumeTimes, String liteTopic) { + this(group, topic, queueId, receiptHandleStr, messageId, queueOffset, reconsumeTimes, liteTopic, true); + } + + public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId, + long queueOffset, int reconsumeTimes, String liteTopic, boolean needRenew) { this.originalReceiptHandle = ReceiptHandle.decode(receiptHandleStr); this.group = group; this.topic = topic; @@ -56,6 +69,7 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece this.reconsumeTimes = reconsumeTimes; this.consumeTimestamp = originalReceiptHandle.getRetrieveTime(); this.liteTopic = liteTopic; + this.needRenew = needRenew; } @Override @@ -169,4 +183,8 @@ public String getLiteTopic() { public void setLiteTopic(String liteTopic) { this.liteTopic = liteTopic; } + + public boolean isNeedRenew() { + return needRenew; + } } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 5a1a5859305..a2d5bc2a8ee 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -214,6 +214,18 @@ public class ProxyConfig implements ConfigFile { private long invisibleTimeMillisWhenClear = 1000L; private boolean enableProxyAutoRenew = true; + /** + * When enableProxyAutoRenew is true, this controls whether the proxy periodically renews + * (extends) the invisible time of receipt handles for gRPC (GrpcClientChannel) consumers + * via changeInvisibleTime calls to the broker. + *

+ * When set to false, the proxy will NOT periodically renew handles for gRPC clients. Instead, + * it will use the consumer group's consumeTimeoutMinute as the initial invisible time during + * pop, and only save handles for client-disconnect cleanup (nack). This avoids the + * handle-mapping complexity and makes the client's original receipt handle remain valid + * even if the proxy crashes. + */ + private boolean enableGrpcChannelReceiptHandleRenew = true; private int maxRenewRetryTimes = 3; private int renewThreadPoolNums = 2; private int renewMaxThreadPoolNums = 4; @@ -1113,6 +1125,14 @@ public void setEnableProxyAutoRenew(boolean enableProxyAutoRenew) { this.enableProxyAutoRenew = enableProxyAutoRenew; } + public boolean isEnableGrpcChannelReceiptHandleRenew() { + return enableGrpcChannelReceiptHandleRenew; + } + + public void setEnableGrpcChannelReceiptHandleRenew(boolean enableGrpcChannelReceiptHandleRenew) { + this.enableGrpcChannelReceiptHandleRenew = enableGrpcChannelReceiptHandleRenew; + } + public int getMaxRenewRetryTimes() { return maxRenewRetryTimes; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index f5e1c7b76f3..150acd948e0 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -25,8 +25,6 @@ import apache.rocketmq.v2.Subscription; import com.google.protobuf.util.Durations; import io.grpc.stub.StreamObserver; -import java.util.List; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PopStatus; @@ -50,9 +48,19 @@ import org.apache.rocketmq.proxy.service.route.MessageQueueView; import org.apache.rocketmq.remoting.protocol.filter.FilterAPI; import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class ReceiveMessageActivity extends AbstractMessagingActivity { private static final String ILLEGAL_POLLING_TIME_INTRODUCED_CLIENT_VERSION = "5.0.3"; + /** + * Default fallback invisibleTime when renewal is disabled and SubscriptionGroupConfig is unavailable. + * Uses the same default as SubscriptionGroupConfig.consumeTimeoutMinute (15 minutes). + */ + private static final long DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS = TimeUnit.MINUTES.toMillis(15); public ReceiveMessageActivity(MessagingProcessor messagingProcessor, GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) { @@ -107,7 +115,11 @@ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, long actualInvisibleTime = Durations.toMillis(request.getInvisibleDuration()); ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); if (proxyConfig.isEnableProxyAutoRenew() && request.getAutoRenew()) { - actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills(); + if (proxyConfig.isEnableGrpcChannelReceiptHandleRenew()) { + actualInvisibleTime = proxyConfig.getDefaultInvisibleTimeMills(); + } else { + actualInvisibleTime = getConsumeTimeoutInvisibleTime(ctx, group, proxyConfig); + } } else { validateInvisibleTime(actualInvisibleTime, ConfigurationManager.getProxyConfig().getMinInvisibleTimeMillsForRecv()); @@ -207,6 +219,10 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request writer.processThrowableWhenWriteMessage(e, ctx, request, messageExt)); throw e; } + // Capture the renew mode at pop time. This ensures that when the config switches dynamically, + // handles already created continue to be handled correctly (renewed or not) regardless of + // the current config state. + final boolean needRenew = ConfigurationManager.getProxyConfig().isEnableGrpcChannelReceiptHandleRenew(); return () -> { List messageExtList = popResult.getMsgFoundList(); for (MessageExt messageExt : messageExtList) { @@ -214,7 +230,7 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request if (receiptHandle != null) { MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), - messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); + messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), null, needRenew); messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle); } } @@ -229,6 +245,35 @@ protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx, ); } + private long getConsumeTimeoutInvisibleTime(ProxyContext ctx, String group, ProxyConfig proxyConfig) { + long invisibleTime; + try { + SubscriptionGroupConfig groupConfig = this.messagingProcessor.getSubscriptionGroupConfig(ctx, group); + if (groupConfig != null && groupConfig.getConsumeTimeoutMinute() > 0) { + invisibleTime = TimeUnit.MINUTES.toMillis(groupConfig.getConsumeTimeoutMinute()); + } else { + // Use default consumeTimeoutMinute as fallback when config is null or invalid. + // This is intentionally NOT defaultInvisibleTimeMills (60s) because without renewal, + // 60s is too short and would cause duplicate consumption. + invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS; + } + } catch (Exception e) { + log.warn("Failed to get SubscriptionGroupConfig for group: {}, using default consumeTimeoutMinute (15min) " + + "as invisibleTime.", group, e); + // Safe fallback: use default consumeTimeoutMinute instead of defaultInvisibleTimeMills (60s). + // Since renewal is disabled, the handle must live long enough for the client to process it. + invisibleTime = DEFAULT_CONSUME_TIMEOUT_INVISIBLE_TIME_MILLIS; + } + // Clamp to maxInvisibleTimeMills to avoid exceeding broker's allowed range + long maxInvisibleTime = proxyConfig.getMaxInvisibleTimeMills(); + if (maxInvisibleTime > 0 && invisibleTime > maxInvisibleTime) { + log.warn("Calculated invisibleTime {}ms for group {} exceeds maxInvisibleTimeMills {}ms, clamping.", + invisibleTime, group, maxInvisibleTime); + invisibleTime = maxInvisibleTime; + } + return invisibleTime; + } + protected static class ReceiveMessageQueueSelector implements QueueSelector { private final String brokerName; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java index f9dfd825337..f5bdd8361fd 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManager.java @@ -166,7 +166,20 @@ protected void scheduleRenewTask() { } ReceiptHandleGroup group = entry.getValue(); + group.scan((msgID, handleStr, v) -> { + // Per-handle renewal decision: handles that were created with needRenew=false + // (popped when enableGrpcChannelReceiptHandleRenew was disabled) only need + // expiry cleanup, NOT periodic renewal. This ensures safe transitions when + // the config is dynamically switched. + if (!v.isNeedRenew()) { + ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); + if (handle.isExpired()) { + group.computeIfPresent(msgID, handleStr, + messageReceiptHandle -> CompletableFuture.completedFuture(null), 0); + } + return; + } long current = System.currentTimeMillis(); ReceiptHandle handle = ReceiptHandle.decode(v.getReceiptHandleStr()); if (handle.getNextVisibleTime() - current > proxyConfig.getRenewAheadTimeMillis()) { diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index f7074dedd63..baebcf834ae 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -70,6 +70,8 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.rocketmq.proxy.config.ProxyConfig; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; public class ReceiveMessageActivityTest extends BaseActivityTest { @@ -409,4 +411,131 @@ public void testReceiveMessageQueueSelector() throws Exception { assertEquals(BROKER_NAME + i, selectorBrokerName.select(ProxyContext.create(), messageQueueView).getBrokerName()); } } + + @Test + public void testReceiveMessageWithRenewDisabledUsesConsumeTimeout() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + groupConfig.setConsumeTimeoutMinute(20); + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should use consumeTimeoutMinute (20min = 1200000ms), not defaultInvisibleTimeMills (60s) + assertEquals(20 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + } + + @Test + public void testReceiveMessageWithRenewDisabledFallbackOnException() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + // Simulate exception when getting subscription group config + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())) + .thenThrow(new RuntimeException("broker unavailable")); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should fallback to 15min (900000ms), NOT defaultInvisibleTimeMills (60s) + assertEquals(15 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + } + + @Test + public void testReceiveMessageWithRenewDisabledClampsToMaxInvisibleTime() { + ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig(); + proxyConfig.setEnableProxyAutoRenew(true); + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(false); + long originalMax = proxyConfig.getMaxInvisibleTimeMills(); + // Set maxInvisibleTimeMills to 10 min for testing + proxyConfig.setMaxInvisibleTimeMills(10 * 60 * 1000L); + + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder().getDefaultInstanceForType()); + + SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig(); + groupConfig.setConsumeTimeoutMinute(30); // 30min > maxInvisibleTimeMills (10min) + when(this.messagingProcessor.getSubscriptionGroupConfig(any(), anyString())).thenReturn(groupConfig); + + ArgumentCaptor invisibleTimeCaptor = ArgumentCaptor.forClass(Long.class); + when(this.messagingProcessor.popMessage( + any(), any(), anyString(), anyString(), anyInt(), + invisibleTimeCaptor.capture(), anyLong(), anyInt(), any(), anyBoolean(), any(), isNull(), anyLong())) + .thenReturn(CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()))); + + this.receiveMessageActivity.receiveMessage( + createContext(), + ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(), + receiveStreamObserver + ); + + // Should be clamped to maxInvisibleTimeMills (10min = 600000ms) + assertEquals(10 * 60 * 1000L, invisibleTimeCaptor.getValue().longValue()); + + // Restore + proxyConfig.setEnableGrpcChannelReceiptHandleRenew(true); + proxyConfig.setMaxInvisibleTimeMills(originalMax); + } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java index a01c356f779..854cb71f601 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/service/receipt/DefaultReceiptHandleManagerTest.java @@ -463,4 +463,103 @@ public void testClientOffline() { listenerArgumentCaptor.getValue().handle(ConsumerGroupEvent.CLIENT_UNREGISTER, GROUP, new ClientChannelInfo(channel, "", LanguageCode.JAVA, 0)); assertTrue(receiptHandleManager.receiptHandleGroupMap.isEmpty()); } + + @Test + public void testNoRenewHandleSkipsRenewalAndCleansExpired() { + // Create a handle with needRenew=false (simulating enableGrpcChannelReceiptHandleRenew=false at pop time) + ProxyConfig config = ConfigurationManager.getProxyConfig(); + long invisibleTime = 900000L; // 15min + String noRenewReceiptHandle = ReceiptHandle.builder() + .startOffset(0L) + .retrieveTime(System.currentTimeMillis() - invisibleTime - 1000) // already expired + .invisibleTime(invisibleTime) + .reviveQueueId(1) + .topicType(ReceiptHandle.NORMAL_TOPIC) + .brokerName(BROKER_NAME) + .queueId(QUEUE_ID) + .offset(OFFSET) + .commitLogOffset(0L) + .build().encode(); + MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false); + + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + + receiptHandleManager.scheduleRenewTask(); + + // Should NOT call changeInvisibleTime (no renewal for needRenew=false) + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); + + // The expired handle should be cleaned up + await().atMost(Duration.ofSeconds(1)).until(() -> { + try { + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + return receiptHandleGroup.isEmpty(); + } catch (Exception e) { + return false; + } + }); + } + + @Test + public void testNoRenewHandleNotExpiredStaysInMemory() { + // Create a handle with needRenew=false that is NOT yet expired + long invisibleTime = 900000L; // 15min + String noRenewReceiptHandle = ReceiptHandle.builder() + .startOffset(0L) + .retrieveTime(System.currentTimeMillis()) // not expired yet + .invisibleTime(invisibleTime) + .reviveQueueId(1) + .topicType(ReceiptHandle.NORMAL_TOPIC) + .brokerName(BROKER_NAME) + .queueId(QUEUE_ID) + .offset(OFFSET) + .commitLogOffset(0L) + .build().encode(); + MessageReceiptHandle noRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, noRenewReceiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, false); + + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, noRenewHandle); + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + + receiptHandleManager.scheduleRenewTask(); + + // Should NOT call changeInvisibleTime + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(0)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.anyString(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyLong()); + + // Handle should still be in memory (not expired, kept for nack on disconnect) + ReceiptHandleGroup receiptHandleGroup = receiptHandleManager.receiptHandleGroupMap.values().stream().findFirst().get(); + assertEquals(false, receiptHandleGroup.isEmpty()); + } + + @Test + public void testTransitionNeedRenewTrueHandleStillRenewed() { + // Simulate a handle created when enableGrpcChannelReceiptHandleRenew=true (needRenew=true) + // Even if config later changes to false, this handle should still be renewed + ProxyConfig config = ConfigurationManager.getProxyConfig(); + Channel channel = PROXY_CONTEXT.getVal(ContextVariable.CHANNEL); + + // Create handle with needRenew=true (default) + MessageReceiptHandle needRenewHandle = new MessageReceiptHandle(GROUP, TOPIC, QUEUE_ID, receiptHandle, + MESSAGE_ID, OFFSET, RECONSUME_TIMES, null, true); + receiptHandleManager.addReceiptHandle(PROXY_CONTEXT, channel, GROUP, MSG_ID, needRenewHandle); + + Mockito.when(consumerManager.findChannel(Mockito.eq(GROUP), Mockito.eq(channel))).thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(metadataService.getSubscriptionGroupConfig(Mockito.any(), Mockito.eq(GROUP))).thenReturn(new SubscriptionGroupConfig()); + + // Even though config says no renew now, the handle has needRenew=true + // so it should still be renewed + receiptHandleManager.scheduleRenewTask(); + + Mockito.verify(messagingProcessor, Mockito.timeout(1000).times(1)) + .changeInvisibleTime(Mockito.any(ProxyContext.class), Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), + Mockito.eq(GROUP), Mockito.eq(TOPIC), Mockito.anyLong()); + } } \ No newline at end of file