diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index f5e1c7b76f3..becf2c2165d 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -214,7 +214,8 @@ private Runnable handleAutoRenew(ProxyContext ctx, ReceiveMessageRequest request if (receiptHandle != null) { MessageReceiptHandle messageReceiptHandle = new MessageReceiptHandle(group, topic, messageExt.getQueueId(), receiptHandle, messageExt.getMsgId(), - messageExt.getQueueOffset(), messageExt.getReconsumeTimes()); + messageExt.getQueueOffset(), messageExt.getReconsumeTimes(), + messageExt.getProperty(MessageConst.PROPERTY_LITE_TOPIC)); messagingProcessor.addReceiptHandle(ctx, clientChannel, group, messageExt.getMsgId(), messageReceiptHandle); } } diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java index f7074dedd63..6478f90cb64 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivityTest.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer; import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.ClientType; import apache.rocketmq.v2.FilterExpression; import apache.rocketmq.v2.FilterType; import apache.rocketmq.v2.MessageQueue; @@ -45,6 +46,7 @@ import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.proxy.common.MessageReceiptHandle; import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.proxy.config.ConfigurationManager; import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest; @@ -64,6 +66,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -319,6 +322,64 @@ public void testReceiveMessageAddReceiptHandle() { assertEquals(Arrays.asList(popCk1, popCk2), receiptHandleCaptor.getAllValues().stream().map(ReceiptHandle::encode).collect(Collectors.toList())); } + @Test + public void testReceiveLiteMessageAddReceiptHandleWithLiteTopic() { + ConfigurationManager.getProxyConfig().setEnableProxyAutoRenew(true); + StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class); + doNothing().when(receiveStreamObserver).onNext(any()); + when(this.grpcClientSettingsManager.getClientSettings(any())).thenReturn(Settings.newBuilder() + .setClientType(ClientType.LITE_PUSH_CONSUMER) + .build()); + + String msgId = "liteMsgId"; + String liteTopic = "liteTopic"; + String popCk = "0 0 60000 0 0 broker 0 0 0"; + MessageExt messageExt = new MessageExt(); + messageExt.setTopic(TOPIC); + messageExt.setMsgId(msgId); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK, popCk); + MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_LITE_TOPIC, liteTopic); + messageExt.setBody("body".getBytes()); + + PopResult popResult = new PopResult(PopStatus.FOUND, Collections.singletonList(messageExt)); + when(this.messagingProcessor.popLiteMessage( + any(), + any(), + anyString(), + anyString(), + anyInt(), + anyLong(), + anyLong(), + any(), + any(), + isNull(), + anyLong())).thenReturn(CompletableFuture.completedFuture(popResult)); + + ArgumentCaptor messageReceiptHandleCaptor = ArgumentCaptor.forClass(MessageReceiptHandle.class); + + ProxyContext ctx = createContext(); + this.grpcChannelManager.createChannel(ctx, ctx.getClientID()); + ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.newBuilder() + .setGroup(Resource.newBuilder().setName(CONSUMER_GROUP).build()) + .setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(TOPIC).build()).build()) + .setAutoRenew(true) + .setFilterExpression(FilterExpression.newBuilder() + .setType(FilterType.TAG) + .setExpression("*") + .build()) + .build(); + + this.receiveMessageActivity.receiveMessage(ctx, receiveMessageRequest, receiveStreamObserver); + + verify(this.messagingProcessor).addReceiptHandle( + any(), + any(), + eq(CONSUMER_GROUP), + eq(msgId), + messageReceiptHandleCaptor.capture()); + assertEquals(liteTopic, messageReceiptHandleCaptor.getValue().getLiteTopic()); + } + @Test public void testReceiveMessage() { StreamObserver receiveStreamObserver = mock(ServerCallStreamObserver.class);