From 269c2c84004b24183da328380da6e3caf0841bdd Mon Sep 17 00:00:00 2001 From: Nagarjun HP Date: Wed, 24 Dec 2025 12:28:21 +0530 Subject: [PATCH] Fix retry ack handling in DownStreamMsgContext and add tests --- .../session/push/DownStreamMsgContext.java | 34 +++++++-- .../push/DownStreamMSGContextTest.java | 73 +++++++++++++++++++ 2 files changed, 101 insertions(+), 6 deletions(-) create mode 100644 eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMSGContextTest.java diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java index 1a4d53f3dc..e65fbbe829 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.java @@ -185,14 +185,36 @@ private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) { * * @param downStreamMsgContext Down Stream Message Context */ - private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) { - List msgExts = new ArrayList<>(); - msgExts.add(downStreamMsgContext.event); - log.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", downStreamMsgContext.event.getSubject(), - downStreamMsgContext.seq, downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS)); - downStreamMsgContext.consumer.updateOffset(msgExts, downStreamMsgContext.consumeConcurrentlyContext); + private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) { + if (downStreamMsgContext.consumer == null + || downStreamMsgContext.consumeConcurrentlyContext == null + || downStreamMsgContext.event == null) { + + log.warn( + "eventMeshAckMsg skipped, consumer:{}, context:{}, event:{}", + downStreamMsgContext.consumer == null, + downStreamMsgContext.consumeConcurrentlyContext == null, + downStreamMsgContext.event == null + ); + return; } + List msgExts = new ArrayList<>(); + msgExts.add(downStreamMsgContext.event); + + log.warn( + "eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", + downStreamMsgContext.event.getSubject(), + downStreamMsgContext.seq, + downStreamMsgContext.event.getExtension(EventMeshConstants.PROPERTY_MESSAGE_KEYS) + ); + + downStreamMsgContext.consumer.updateOffset( + msgExts, + downStreamMsgContext.consumeConcurrentlyContext + ); +} + @Override public void doRun() { retry(); diff --git a/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMSGContextTest.java b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMSGContextTest.java new file mode 100644 index 0000000000..b895249717 --- /dev/null +++ b/eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMSGContextTest.java @@ -0,0 +1,73 @@ +package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push; + +import org.apache.eventmesh.api.AbstractContext; +import org.apache.eventmesh.common.protocol.SubscriptionItem; +import org.apache.eventmesh.common.protocol.SubscriptionMode; +import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; +import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; + +import org.junit.jupiter.api.Test; + +import java.net.URI; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +class DownStreamMsgContextTest { + + private CloudEvent buildEvent() { + return CloudEventBuilder.v1() + .withId("test-id") + .withSource(URI.create("test://source")) + .withType("test-type") + .withSubject("test-topic") + .build(); + } + + private SubscriptionItem buildSubscriptionItem() { + SubscriptionItem item = new SubscriptionItem(); + item.setMode(SubscriptionMode.CLUSTERING); + return item; + } + + @Test + void retry_shouldNotThrowException_whenConsumerOrContextIsNull() { + + CloudEvent event = buildEvent(); + + // Intentionally set to null to simulate edge case + Session session = null; + MQConsumerWrapper consumer = null; + AbstractContext context = null; + + DownStreamMsgContext msgContext = new DownStreamMsgContext( + event, + session, + consumer, + context, + false, + buildSubscriptionItem() + ); + + assertDoesNotThrow(msgContext::retry); + } + + @Test + void ackMsg_shouldNotThrowException_whenDependenciesAreNull() { + + CloudEvent event = buildEvent(); + + DownStreamMsgContext msgContext = new DownStreamMsgContext( + event, + null, + null, + null, + false, + buildSubscriptionItem() + ); + + assertDoesNotThrow(msgContext::ackMsg); + } +}