diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java index 53dcbdcb05e..18dfdacd5c2 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java @@ -85,11 +85,17 @@ private synchronized void reput(Timeout timeout) { @Override public synchronized void run(Timeout timeout) throws Exception { Collection channels = channelProvider.getChannels(); + boolean allChannelsClosed = !channels.isEmpty(); for (Channel channel : channels) { if (!channel.isClosed()) { + allChannelsClosed = false; doTask(channel); } } + if (allChannelsClosed) { + cancel(); + return; + } reput(timeout); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index e654b327c09..b96f0937898 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -148,9 +148,14 @@ public void send(Object message, boolean sent) throws RemotingException { channel.send(message, sent); } + /** + * Check both HeaderExchangeChannel and underlying transport client, + * because the transport client may be closed independently (e.g., by protocol destroy) + * without going through HeaderExchangeClient.close(). + */ @Override public boolean isClosed() { - return channel.isClosed(); + return channel.isClosed() || client.isClosed(); } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTaskTest.java new file mode 100644 index 00000000000..998853f4a03 --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTaskTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.exchange.support.header; + +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.timer.HashedWheelTimer; +import org.apache.dubbo.remoting.Channel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK; + +class AbstractTimerTaskTest { + + private HashedWheelTimer timer; + private MockChannel channel; + private AtomicInteger taskExecutionCount; + + @BeforeEach + public void setup() { + long tickDuration = 1000; + timer = new HashedWheelTimer(tickDuration / HEARTBEAT_CHECK_TICK, TimeUnit.MILLISECONDS); + channel = new MockChannel() { + @Override + public URL getUrl() { + return URL.valueOf("dubbo://localhost:20880"); + } + }; + taskExecutionCount = new AtomicInteger(0); + } + + @AfterEach + public void teardown() { + timer.stop(); + } + + @Test + void testAutoCancelWhenAllChannelsClosed() throws Exception { + long tick = 1000 / HEARTBEAT_CHECK_TICK; + AbstractTimerTask task = new AbstractTimerTask(() -> Collections.singleton(channel), timer, tick) { + @Override + protected void doTask(Channel channel) { + taskExecutionCount.incrementAndGet(); + } + }; + task.start(); + + // Let the task run a few times while channel is open + Thread.sleep(1500L); + Assertions.assertTrue(taskExecutionCount.get() > 0, "Task should have executed at least once"); + + int countBeforeClose = taskExecutionCount.get(); + + // Close the channel + channel.close(); + + // Wait for the task to detect closure and auto-cancel + Thread.sleep(1500L); + + int countAfterClose = taskExecutionCount.get(); + + // Task should not have executed after channel was closed + Assertions.assertEquals( + countBeforeClose, countAfterClose, "Task should not execute after all channels are closed"); + + // Verify the task was cancelled + Assertions.assertTrue(task.cancel, "Task should be cancelled when all channels are closed"); + } + + @Test + void testTaskContinuesWhenChannelIsOpen() throws Exception { + long tick = 1000 / HEARTBEAT_CHECK_TICK; + AbstractTimerTask task = new AbstractTimerTask(() -> Collections.singleton(channel), timer, tick) { + @Override + protected void doTask(Channel channel) { + taskExecutionCount.incrementAndGet(); + } + }; + task.start(); + + Thread.sleep(2000L); + + Assertions.assertTrue(taskExecutionCount.get() > 1, "Task should keep executing when channel is open"); + Assertions.assertFalse(task.cancel, "Task should not be cancelled when channel is open"); + + task.cancel(); + } + + @Test + void testTaskNotCancelledWhenChannelCollectionIsEmpty() throws Exception { + long tick = 1000 / HEARTBEAT_CHECK_TICK; + // Server-side scenario: ChannelProvider returns empty collection when no clients are connected + AbstractTimerTask task = new AbstractTimerTask(ArrayList::new, timer, tick) { + @Override + protected void doTask(Channel channel) { + taskExecutionCount.incrementAndGet(); + } + }; + task.start(); + + // Let the task run several ticks with empty channel collection + Thread.sleep(2000L); + + // Task should NOT be cancelled — empty collection is not the same as all-closed + Assertions.assertFalse(task.cancel, "Task should not be cancelled when channel collection is empty"); + Assertions.assertEquals(0, taskExecutionCount.get(), "doTask should not be called when there are no channels"); + + task.cancel(); + } +} diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClientTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClientTest.java index 7fad611e7f9..fdc11688dae 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClientTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClientTest.java @@ -37,4 +37,31 @@ void testReconnect() { Assertions.assertFalse(headerExchangeClient.shouldReconnect(URL.valueOf("localhost?reconnect=false"))); Assertions.assertFalse(headerExchangeClient.shouldReconnect(URL.valueOf("localhost?reconnect=FALSE"))); } + + @Test + void testIsClosedWhenUnderlyingClientClosed() { + Client mockClient = Mockito.mock(Client.class); + // Underlying client is closed, but HeaderExchangeChannel is not + Mockito.when(mockClient.isClosed()).thenReturn(true); + Mockito.when(mockClient.getUrl()).thenReturn(URL.valueOf("dubbo://localhost:20880")); + + HeaderExchangeClient headerExchangeClient = new HeaderExchangeClient(mockClient, false); + + Assertions.assertTrue( + headerExchangeClient.isClosed(), + "HeaderExchangeClient should report closed when underlying client is closed"); + } + + @Test + void testIsNotClosedWhenBothOpen() { + Client mockClient = Mockito.mock(Client.class); + Mockito.when(mockClient.isClosed()).thenReturn(false); + Mockito.when(mockClient.getUrl()).thenReturn(URL.valueOf("dubbo://localhost:20880")); + + HeaderExchangeClient headerExchangeClient = new HeaderExchangeClient(mockClient, false); + + Assertions.assertFalse( + headerExchangeClient.isClosed(), + "HeaderExchangeClient should report open when both channel and client are open"); + } } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTaskTest.java index 6c758b372a9..72413959811 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTaskTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTaskTest.java @@ -80,4 +80,47 @@ void testReconnect() throws Exception { Thread.sleep(2000L); Assertions.assertTrue(channel.getReconnectCount() > 1); } + + @Test + void testStopReconnectWhenClientClosed() throws Exception { + url = url.addParameter(DUBBO_VERSION_KEY, "2.1.1"); + + // Let the task attempt reconnection while client is open and disconnected + Thread.sleep(1500L); + int reconnectCountBeforeClose = channel.getReconnectCount(); + Assertions.assertTrue( + reconnectCountBeforeClose > 0, "Should have attempted reconnection while channel is not connected"); + + // Close the client - simulates provider going offline and client being destroyed + channel.close(); + + // Wait for the timer to fire again + Thread.sleep(1500L); + + // After closing, reconnect count should not increase + Assertions.assertEquals( + reconnectCountBeforeClose, + channel.getReconnectCount(), + "Should stop reconnecting after client is closed"); + + // The timer task should have been cancelled + Assertions.assertTrue(reconnectTimerTask.cancel, "Timer task should be cancelled when client is closed"); + } + + @Test + void testReconnectContinuesWhenNotClosed() throws Exception { + url = url.addParameter(DUBBO_VERSION_KEY, "2.1.1"); + + // Channel is disconnected but not closed - reconnection should continue + Thread.sleep(2000L); + int count1 = channel.getReconnectCount(); + Assertions.assertTrue(count1 > 0, "Should attempt reconnection when disconnected but not closed"); + + Thread.sleep(1500L); + int count2 = channel.getReconnectCount(); + Assertions.assertTrue(count2 > count1, "Should keep trying to reconnect"); + + Assertions.assertFalse( + reconnectTimerTask.cancel, "Timer task should not be cancelled when client is still open"); + } }