diff --git a/CHANGELOG.md b/CHANGELOG.md index dd347a3a8..8ef2c3f23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,8 @@ - **[jdbc-v2]** `ResultSet#getObject(int|String, Map>)` now accepts ClickHouse type names as map keys in addition to the JDBC `SQLType` names it has always accepted. Only unwrapped type names are used for the lookup — `Nullable(...)` and `LowCardinality(...)` wrappers are stripped and do not affect resolution, so a key like `"Int32"` matches both `Int32` and `Nullable(Int32)` columns; keys like `"Nullable(Int32)"` are not recognized. Lookup order is the `ClickHouseDataType` enum name (e.g. `"Int32"`, `"String"`, `"DateTime"`) then the JDBC `SQLType` name (e.g. `"INTEGER"`, `"VARCHAR"`, `"TIMESTAMP"`); a missing entry leaves the value uncoerced. The feature is supported for primitive ClickHouse types only — `Array`, `Tuple`, `Map`, `Nested`, and geometry types are not supported and continue to be returned in their native form regardless of the user-supplied map. Existing maps keyed only by JDBC `SQLType` names continue to work unchanged. +- **[client-v2]** Added endpoint failover support: when multiple endpoints are configured and a request fails with a retryable error (connect timeout, connection refused, HTTP 503, etc.), the client now automatically retries against the next available endpoint instead of always targeting the first one. Failed endpoints are quarantined for 30 seconds before being retried. (https://github.com/ClickHouse/clickhouse-java/issues/2855) + ### Bug Fixes - **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. (https://github.com/ClickHouse/clickhouse-java/issues/2690) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 58d94da9b..5978b4f2e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -34,6 +34,7 @@ import com.clickhouse.client.api.serde.POJOFieldDeserializer; import com.clickhouse.client.api.serde.POJOFieldSerializer; import com.clickhouse.client.api.serde.POJOSerDe; +import com.clickhouse.client.api.transport.ClientNodeSelector; import com.clickhouse.client.api.transport.Endpoint; import com.clickhouse.client.api.transport.HttpEndpoint; import com.clickhouse.client.config.ClickHouseClientOption; @@ -60,8 +61,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -141,6 +142,7 @@ public class Client implements AutoCloseable { private final int retries; private LZ4Factory lz4Factory = null; private final Supplier queryIdGenerator; + private final ClientNodeSelector nodeSelector; private final CredentialsManager credentialsManager; private Client(Collection endpoints, Map configuration, @@ -185,6 +187,7 @@ private Client(Collection endpoints, Map configuration, } this.endpoints = tmpEndpoints.build(); + this.nodeSelector = new ClientNodeSelector(this.endpoints); String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); this.retries = retry == null ? 0 : Integer.parseInt(retry); @@ -270,7 +273,7 @@ public static class Builder { private Supplier queryIdGenerator; public Builder() { - this.endpoints = new HashSet<>(); + this.endpoints = new LinkedHashSet<>(); this.configuration = new HashMap<>(); for (ClientConfigProperties p : ClientConfigProperties.values()) { @@ -1314,7 +1317,7 @@ public CompletableFuture insert(String tableName, List data, Supplier supplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { @@ -1344,7 +1347,7 @@ public CompletableFuture insert(String tableName, List data, // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); continue; } @@ -1361,7 +1364,7 @@ public CompletableFuture insert(String tableName, List data, lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); } else { throw lastException; } @@ -1536,7 +1539,7 @@ public CompletableFuture insert(String tableName, responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= retries; i++) { @@ -1552,7 +1555,14 @@ public CompletableFuture insert(String tableName, // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); + if (i < retries) { + try { + writer.onRetry(); + } catch (IOException ioe) { + throw new ClientException("Failed to reset stream before next attempt", ioe); + } + } continue; } @@ -1568,7 +1578,7 @@ public CompletableFuture insert(String tableName, lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); } else { throw lastException; } @@ -1665,7 +1675,7 @@ public CompletableFuture query(String sqlQuery, Map responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= retries; i++) { ClassicHttpResponse httpResponse = null; @@ -1682,7 +1692,7 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map requestConfig, Htt } else if (httpResponse.getCode() == HttpStatus.SC_BAD_GATEWAY) { httpResponse.close(); throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings."); + } else if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + return httpResponse; } else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { try { throw readError(req, httpResponse); diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/ClientNodeSelector.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/ClientNodeSelector.java new file mode 100644 index 000000000..1fefff8f2 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/ClientNodeSelector.java @@ -0,0 +1,64 @@ +package com.clickhouse.client.api.transport; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * So we will look up for the first-alive node and then assign that node to + * client to talk. + * + *

Endpoints are tried in the order they were registered (index 0 is the + * "primary"). When a request fails, the failed endpoint is quarantined for + * {@link #DEFAULT_QUARANTINE_MS} milliseconds and the next alive endpoint + * is returned. Once the quarantine expires the node automatically becomes + * eligible again, so traffic returns to the primary without any explicit + * reset.

+ * + *

If all endpoints are quarantined, the primary (index 0) is returned + * as a fallback to avoid a complete lockout.

+ * + *

This class is thread-safe: concurrent callers may invoke + * {@link #getEndpoint()} and {@link #getNextAliveNode(Endpoint)} + * from different threads.

+ */ +public class ClientNodeSelector { + + private static final Logger LOG = LoggerFactory.getLogger(ClientNodeSelector.class); + + static final long DEFAULT_QUARANTINE_MS = 30_000; + + private final List endpointStates; + + public ClientNodeSelector(List endpoints) { + List states = new ArrayList<>(endpoints.size()); + for (Endpoint ep : endpoints) { + states.add(new EndpointState(ep)); + } + this.endpointStates = Collections.unmodifiableList(states); + } + + public Endpoint getEndpoint() { + for (EndpointState state : endpointStates) { + if (state.isAlive()) { + return state.getEndpoint(); + } + } + LOG.warn("All endpoints are non-responsive, falling back to primary endpoint"); + return endpointStates.get(0).getEndpoint(); + } + + public Endpoint getNextAliveNode(Endpoint failedEndpoint) { + for (EndpointState state : endpointStates) { + if (state.getEndpoint().equals(failedEndpoint)) { + state.markFailed(DEFAULT_QUARANTINE_MS); + LOG.warn("Endpoint {} quarantined for {} ms", failedEndpoint.getHost(), DEFAULT_QUARANTINE_MS); + break; + } + } + return getEndpoint(); + } +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java new file mode 100644 index 000000000..6ee52cd00 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java @@ -0,0 +1,30 @@ +package com.clickhouse.client.api.transport; + +/** + *{@link Endpoint} is wrapped to track for failover. + * When a node fails, it can be quarantined for a fixed duration and after + * it is expired, it will be considered as alive again. + */ +class EndpointState { + + private final Endpoint endpoint; + + private volatile long failedUntil; + + EndpointState(Endpoint endpoint) { + this.endpoint = endpoint; + this.failedUntil = 0; + } + + Endpoint getEndpoint() { + return endpoint; + } + + void markFailed(long quarantineMs) { + this.failedUntil = System.currentTimeMillis() + quarantineMs; + } + + boolean isAlive() { + return System.currentTimeMillis() >= failedUntil; + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java b/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java new file mode 100644 index 000000000..6a724151d --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java @@ -0,0 +1,253 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.ClientException; +import com.clickhouse.client.api.ClientFaultCause; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.data.ClickHouseFormat; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Integration tests for endpoint failover behavior in client-v2. + * + *

Verifies that when multiple endpoints are configured and the first + * endpoint is unreachable, the client automatically fails over to the + * next available endpoint.

+ */ +public class ClientFailoverTest extends BaseIntegrationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ClientFailoverTest.class); + + /** + * Configures a dead endpoint (port 1, nothing listens) as the primary + * and the actual test server as the backup. Verifies that a query + * succeeds by failing over to the backup node. + */ + @Test(groups = {"integration"}) + public void testQueryFailoverToBackupNode() { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + try (Client client = new Client.Builder() + .addEndpoint("http://127.0.0.1:1") + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse) + .setMaxRetries(3) + .build()) { + + List result = client.queryAll("SELECT 1 AS val"); + Assert.assertFalse(result.isEmpty(), "Expected at least one record"); + Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1), "Query should succeed via failover to the backup node"); + } + } + + /** + * Verifies that when all endpoints are healthy, the primary (first) + * endpoint is consistently used. This tests the "affinity" behavior. + */ + @Test(groups = {"integration"}) + public void testPrimaryEndpointAffinityWhenHealthy() { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + try (Client client = new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .build()) { + + for (int i = 0; i < 5; i++) { + List result = client.queryAll("SELECT " + i + " AS val"); + Assert.assertFalse(result.isEmpty()); + Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(i)); + } + } + } + + /** + * Verifies that insert operations also failover when the primary + * endpoint is down. + */ + @Test(groups = {"integration"}) + public void testInsertFailoverToBackupNode() throws Exception { + if (isCloud()) { + return; + } + + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + + try (Client adminClient = new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .build()) { + adminClient.execute("DROP TABLE IF EXISTS failover_insert_test").get(10, TimeUnit.SECONDS).close(); + adminClient.execute("CREATE TABLE failover_insert_test (val UInt32) ENGINE MergeTree ORDER BY ()").get(10, TimeUnit.SECONDS).close(); + } + + try (Client client = new Client.Builder() + .addEndpoint("http://127.0.0.1:1") + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse) + .setMaxRetries(3) + .build()) { + + String csvData = "42\n"; + client.insert("failover_insert_test", + new ByteArrayInputStream(csvData.getBytes()), + ClickHouseFormat.CSV).get(30, TimeUnit.SECONDS).close(); + } + + try (Client verifyClient = new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .build()) { + List result = verifyClient.queryAll("SELECT val FROM failover_insert_test"); + Assert.assertFalse(result.isEmpty(), "Expected at least one row after failover insert"); + Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(42)); + + verifyClient.execute("DROP TABLE IF EXISTS failover_insert_test").get(10, TimeUnit.SECONDS).close(); + } + } + + @Test(groups = {"integration"}) + public void testMultipleBackups() { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + try (Client client = new Client.Builder() + .addEndpoint("http://127.0.0.1:1") + .addEndpoint("http://127.0.0.1:2") + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) // working + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse) + .setMaxRetries(3) + .build()) { + + List result = client.queryAll("SELECT 1 AS val"); + Assert.assertFalse(result.isEmpty()); + Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1)); + } + } + + @Test(groups = {"integration"}, expectedExceptions = ClientException.class) + public void testAllEndpointsDead() { + try (Client client = new Client.Builder() + .addEndpoint("http://127.0.0.1:1") + .addEndpoint("http://127.0.0.1:2") + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse) + .setMaxRetries(3) + .build()) { + + client.queryAll("SELECT 1 AS val"); + } + } + + @Test(groups = {"integration"}, expectedExceptions = ClientException.class) + public void testRetryLimitReached() { + try (Client client = new Client.Builder() + .addEndpoint("http://127.0.0.1:1") + .addEndpoint("http://127.0.0.1:2") + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse) + .setMaxRetries(1) + .build()) { + + client.queryAll("SELECT 1 AS val"); + } + } + + @Test(groups = {"integration"}) + public void testDuplicateEndpointRegistrationAndOrderPreservation() throws Exception { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + + String uriA = "http://127.0.0.1:1"; + String uriB = "http://127.0.0.1:2"; + + try (Client client = new Client.Builder() + .addEndpoint(uriA) + .addEndpoint(uriA) + .addEndpoint(uriB) + .addEndpoint(uriB) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .build()) { + + java.lang.reflect.Field field = Client.class.getDeclaredField("endpoints"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + List endpoints = + (List) field.get(client); + + Assert.assertEquals(endpoints.size(), 2, "Should only have unique endpoints"); + Assert.assertEquals(endpoints.get(0).getURI().toString(), uriA + "/", "First endpoint should be " + uriA + "/"); + Assert.assertEquals(endpoints.get(1).getURI().toString(), uriB + "/", "Second endpoint should be " + uriB + "/"); + } + } + + @Test(groups = {"integration"}, expectedExceptions = IllegalArgumentException.class) + public void testNoEndpointsConfigured() { + new Client.Builder() + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .build(); + } + + @Test(groups = {"integration"}) + public void testHTTP503Failover() throws Exception { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + + WireMockServer mockServer = new WireMockServer(WireMockConfiguration.options().dynamicPort()); + mockServer.start(); + try { + mockServer.stubFor(WireMock.post(WireMock.anyUrl()) + .willReturn(WireMock.aResponse().withStatus(503).withBody("Service Unavailable"))); + + try (Client client = new Client.Builder() + .addEndpoint("http://localhost:" + mockServer.port()) + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()) + .setMaxRetries(3) + .build()) { + + List result = client.queryAll("SELECT 1 AS val"); + Assert.assertFalse(result.isEmpty(), "Expected at least one record"); + Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1), + "Query should succeed via failover after 503"); + } + } finally { + mockServer.stop(); + } + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java new file mode 100644 index 000000000..cbc547175 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java @@ -0,0 +1,119 @@ +package com.clickhouse.client.api.transport; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class ClientNodeSelectorTest { + + @Test + public void testEndpointStateQuarantineAndExpiry() throws InterruptedException { + Endpoint ep = new HttpEndpoint("localhost", 8123, false, "/"); + EndpointState state = new EndpointState(ep); + + Assert.assertTrue(state.isAlive(), "New endpoint state should be alive"); + Assert.assertSame(state.getEndpoint(), ep); + + state.markFailed(20); + Assert.assertFalse(state.isAlive(), "Endpoint state should not be alive immediately after failure"); + + Thread.sleep(40); + Assert.assertTrue(state.isAlive(), "Endpoint state should be alive again after quarantine expires"); + } + + @Test + public void testClientNodeSelectorAffinityAndQuarantine() { + Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/"); + Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/"); + Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/"); + + ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC)); + + Assert.assertEquals(selector.getEndpoint(), epA); + + Endpoint next = selector.getNextAliveNode(epA); + Assert.assertEquals(next, epB); + Assert.assertEquals(selector.getEndpoint(), epB); + + next = selector.getNextAliveNode(epB); + Assert.assertEquals(next, epC); + Assert.assertEquals(selector.getEndpoint(), epC); + + next = selector.getNextAliveNode(epC); + Assert.assertEquals(next, epA); + Assert.assertEquals(selector.getEndpoint(), epA); + } + + @Test + public void testClientNodeSelectorFallbackWhenAllDead() { + Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/"); + Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/"); + + ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB)); + + selector.getNextAliveNode(epA); + selector.getNextAliveNode(epB); + + Assert.assertEquals(selector.getEndpoint(), epA); + } + + @Test + public void testClientNodeSelectorSequentialFailures() { + Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/"); + Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/"); + Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/"); + + ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC)); + + Assert.assertEquals(selector.getEndpoint(), epA); + + Assert.assertEquals(selector.getNextAliveNode(epA), epB); + Assert.assertEquals(selector.getNextAliveNode(epB), epC); + Assert.assertEquals(selector.getNextAliveNode(epC), epA); + } + + @Test + public void testClientNodeSelectorConcurrency() throws Exception { + Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/"); + Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/"); + Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/"); + + ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC)); + + int threadCount = 10; + int loopCount = 1000; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + List> futures = new ArrayList<>(); + + for (int i = 0; i < threadCount; i++) { + futures.add(executor.submit(new Callable() { + @Override + public Void call() throws Exception { + for (int j = 0; j < loopCount; j++) { + Endpoint ep = selector.getEndpoint(); + Assert.assertNotNull(ep); + if (j % 10 == 0) { + selector.getNextAliveNode(ep); + } + } + return null; + } + })); + } + + executor.shutdown(); + Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS)); + + for (Future future : futures) { + future.get(); + } + } +}