Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,17 @@ private synchronized void reput(Timeout timeout) {
@Override
public synchronized void run(Timeout timeout) throws Exception {
Collection<Channel> channels = channelProvider.getChannels();
boolean allChannelsClosed = !channels.isEmpty();
for (Channel channel : channels) {
if (!channel.isClosed()) {
allChannelsClosed = false;
doTask(channel);
}
}
if (allChannelsClosed) {
cancel();
return;
}
reput(timeout);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,14 @@ public void send(Object message, boolean sent) throws RemotingException {
channel.send(message, sent);
}

/**
* Check both HeaderExchangeChannel and underlying transport client,
* because the transport client may be closed independently (e.g., by protocol destroy)
* without going through HeaderExchangeClient.close().
*/
@Override
public boolean isClosed() {
return channel.isClosed();
return channel.isClosed() || client.isClosed();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.exchange.support.header;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.timer.HashedWheelTimer;
import org.apache.dubbo.remoting.Channel;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;

class AbstractTimerTaskTest {

private HashedWheelTimer timer;
private MockChannel channel;
private AtomicInteger taskExecutionCount;

@BeforeEach
public void setup() {
long tickDuration = 1000;
timer = new HashedWheelTimer(tickDuration / HEARTBEAT_CHECK_TICK, TimeUnit.MILLISECONDS);
channel = new MockChannel() {
@Override
public URL getUrl() {
return URL.valueOf("dubbo://localhost:20880");
}
};
taskExecutionCount = new AtomicInteger(0);
}

@AfterEach
public void teardown() {
timer.stop();
}

@Test
void testAutoCancelWhenAllChannelsClosed() throws Exception {
long tick = 1000 / HEARTBEAT_CHECK_TICK;
AbstractTimerTask task = new AbstractTimerTask(() -> Collections.singleton(channel), timer, tick) {
@Override
protected void doTask(Channel channel) {
taskExecutionCount.incrementAndGet();
}
};
task.start();

// Let the task run a few times while channel is open
Thread.sleep(1500L);
Assertions.assertTrue(taskExecutionCount.get() > 0, "Task should have executed at least once");

int countBeforeClose = taskExecutionCount.get();

// Close the channel
channel.close();

// Wait for the task to detect closure and auto-cancel
Thread.sleep(1500L);

int countAfterClose = taskExecutionCount.get();

// Task should not have executed after channel was closed
Assertions.assertEquals(
countBeforeClose, countAfterClose, "Task should not execute after all channels are closed");

// Verify the task was cancelled
Assertions.assertTrue(task.cancel, "Task should be cancelled when all channels are closed");
}

@Test
void testTaskContinuesWhenChannelIsOpen() throws Exception {
long tick = 1000 / HEARTBEAT_CHECK_TICK;
AbstractTimerTask task = new AbstractTimerTask(() -> Collections.singleton(channel), timer, tick) {
@Override
protected void doTask(Channel channel) {
taskExecutionCount.incrementAndGet();
}
};
task.start();

Thread.sleep(2000L);

Assertions.assertTrue(taskExecutionCount.get() > 1, "Task should keep executing when channel is open");
Assertions.assertFalse(task.cancel, "Task should not be cancelled when channel is open");

task.cancel();
}

@Test
void testTaskNotCancelledWhenChannelCollectionIsEmpty() throws Exception {
long tick = 1000 / HEARTBEAT_CHECK_TICK;
// Server-side scenario: ChannelProvider returns empty collection when no clients are connected
AbstractTimerTask task = new AbstractTimerTask(ArrayList::new, timer, tick) {
@Override
protected void doTask(Channel channel) {
taskExecutionCount.incrementAndGet();
}
};
task.start();

// Let the task run several ticks with empty channel collection
Thread.sleep(2000L);

// Task should NOT be cancelled — empty collection is not the same as all-closed
Assertions.assertFalse(task.cancel, "Task should not be cancelled when channel collection is empty");
Assertions.assertEquals(0, taskExecutionCount.get(), "doTask should not be called when there are no channels");

task.cancel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,31 @@ void testReconnect() {
Assertions.assertFalse(headerExchangeClient.shouldReconnect(URL.valueOf("localhost?reconnect=false")));
Assertions.assertFalse(headerExchangeClient.shouldReconnect(URL.valueOf("localhost?reconnect=FALSE")));
}

@Test
void testIsClosedWhenUnderlyingClientClosed() {
Client mockClient = Mockito.mock(Client.class);
// Underlying client is closed, but HeaderExchangeChannel is not
Mockito.when(mockClient.isClosed()).thenReturn(true);
Mockito.when(mockClient.getUrl()).thenReturn(URL.valueOf("dubbo://localhost:20880"));

HeaderExchangeClient headerExchangeClient = new HeaderExchangeClient(mockClient, false);

Assertions.assertTrue(
headerExchangeClient.isClosed(),
"HeaderExchangeClient should report closed when underlying client is closed");
}

@Test
void testIsNotClosedWhenBothOpen() {
Client mockClient = Mockito.mock(Client.class);
Mockito.when(mockClient.isClosed()).thenReturn(false);
Mockito.when(mockClient.getUrl()).thenReturn(URL.valueOf("dubbo://localhost:20880"));

HeaderExchangeClient headerExchangeClient = new HeaderExchangeClient(mockClient, false);

Assertions.assertFalse(
headerExchangeClient.isClosed(),
"HeaderExchangeClient should report open when both channel and client are open");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,47 @@ void testReconnect() throws Exception {
Thread.sleep(2000L);
Assertions.assertTrue(channel.getReconnectCount() > 1);
}

@Test
void testStopReconnectWhenClientClosed() throws Exception {
url = url.addParameter(DUBBO_VERSION_KEY, "2.1.1");

// Let the task attempt reconnection while client is open and disconnected
Thread.sleep(1500L);
int reconnectCountBeforeClose = channel.getReconnectCount();
Assertions.assertTrue(
reconnectCountBeforeClose > 0, "Should have attempted reconnection while channel is not connected");

// Close the client - simulates provider going offline and client being destroyed
channel.close();

// Wait for the timer to fire again
Thread.sleep(1500L);

// After closing, reconnect count should not increase
Assertions.assertEquals(
reconnectCountBeforeClose,
channel.getReconnectCount(),
"Should stop reconnecting after client is closed");

// The timer task should have been cancelled
Assertions.assertTrue(reconnectTimerTask.cancel, "Timer task should be cancelled when client is closed");
}

@Test
void testReconnectContinuesWhenNotClosed() throws Exception {
url = url.addParameter(DUBBO_VERSION_KEY, "2.1.1");

// Channel is disconnected but not closed - reconnection should continue
Thread.sleep(2000L);
int count1 = channel.getReconnectCount();
Assertions.assertTrue(count1 > 0, "Should attempt reconnection when disconnected but not closed");

Thread.sleep(1500L);
int count2 = channel.getReconnectCount();
Assertions.assertTrue(count2 > count1, "Should keep trying to reconnect");

Assertions.assertFalse(
reconnectTimerTask.cancel, "Timer task should not be cancelled when client is still open");
}
}
Loading