From 58d96c75a094ab99c307cbeb840ec060a3355d0f Mon Sep 17 00:00:00 2001 From: kishansinghifs1 Date: Tue, 16 Jun 2026 21:52:11 +0530 Subject: [PATCH 1/4] feat: implement client-side failover logic with endpoint health tracking and selection support --- .../com/clickhouse/client/api/Client.java | 44 ++- .../client/api/ServerException.java | 7 +- .../api/transport/ClientNodeSelector.java | 64 +++++ .../client/api/transport/EndpointState.java | 30 +++ .../clickhouse/client/ClientFailoverTest.java | 253 ++++++++++++++++++ .../api/transport/ClientNodeSelectorTest.java | 119 ++++++++ 6 files changed, 502 insertions(+), 15 deletions(-) create mode 100644 client-v2/src/main/java/com/clickhouse/client/api/transport/ClientNodeSelector.java create mode 100644 client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java create mode 100644 client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java create mode 100644 client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 58d94da9b..288517916 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -34,6 +34,7 @@ import com.clickhouse.client.api.serde.POJOFieldDeserializer; import com.clickhouse.client.api.serde.POJOFieldSerializer; import com.clickhouse.client.api.serde.POJOSerDe; +import com.clickhouse.client.api.transport.ClientNodeSelector; import com.clickhouse.client.api.transport.Endpoint; import com.clickhouse.client.api.transport.HttpEndpoint; import com.clickhouse.client.config.ClickHouseClientOption; @@ -60,8 +61,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import 
java.util.Map; import java.util.Set; @@ -141,6 +142,7 @@ public class Client implements AutoCloseable { private final int retries; private LZ4Factory lz4Factory = null; private final Supplier queryIdGenerator; + private final ClientNodeSelector nodeSelector; private final CredentialsManager credentialsManager; private Client(Collection endpoints, Map configuration, @@ -185,6 +187,7 @@ private Client(Collection endpoints, Map configuration, } this.endpoints = tmpEndpoints.build(); + this.nodeSelector = new ClientNodeSelector(this.endpoints); String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); this.retries = retry == null ? 0 : Integer.parseInt(retry); @@ -270,7 +273,7 @@ public static class Builder { private Supplier queryIdGenerator; public Builder() { - this.endpoints = new HashSet<>(); + this.endpoints = new LinkedHashSet<>(); this.configuration = new HashMap<>(); for (ClientConfigProperties p : ClientConfigProperties.values()) { @@ -1314,7 +1317,7 @@ public CompletableFuture insert(String tableName, List data, Supplier supplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= maxRetries; i++) { @@ -1344,7 +1347,7 @@ public CompletableFuture insert(String tableName, List data, // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. 
(Duration: {})", httpResponse.getCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); continue; } @@ -1361,7 +1364,7 @@ public CompletableFuture insert(String tableName, List data, lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); } else { throw lastException; } @@ -1536,7 +1539,7 @@ public CompletableFuture insert(String tableName, responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= retries; i++) { @@ -1552,7 +1555,7 @@ public CompletableFuture insert(String tableName, // Check response if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. 
(Duration: {})", httpResponse.getCode(), durationSince(startTime)); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); continue; } @@ -1568,7 +1571,7 @@ public CompletableFuture insert(String tableName, lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); - selectedEndpoint = getNextAliveNode(); + selectedEndpoint = getNextAliveNode(selectedEndpoint); } else { throw lastException; } @@ -1665,7 +1668,7 @@ public CompletableFuture query(String sqlQuery, Map responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node - Endpoint selectedEndpoint = getNextAliveNode(); + Endpoint selectedEndpoint = getEndpoint(); RuntimeException lastException = null; for (int i = 0; i <= retries; i++) { ClassicHttpResponse httpResponse = null; @@ -1682,7 +1685,7 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, MapEndpoints are tried in the order they were registered (index 0 is the + * "primary"). When a request fails, the failed endpoint is quarantined for + * {@link #DEFAULT_QUARANTINE_MS} milliseconds and the next alive endpoint + * is returned. Once the quarantine expires the node automatically becomes + * eligible again, so traffic returns to the primary without any explicit + * reset.

+ *
+ * <p>If all endpoints are quarantined, the primary (index 0) is returned
+ * as a fallback to avoid a complete lockout.
+ *
+ * <p>This class is thread-safe: concurrent callers may invoke
+ * {@link #getEndpoint()} and {@link #getNextAliveNode(Endpoint)}
+ * from different threads.
+ */
+public class ClientNodeSelector {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientNodeSelector.class);
+
+    /** Time, in milliseconds, a failed endpoint is skipped before it becomes eligible again. */
+    static final long DEFAULT_QUARANTINE_MS = 30_000;
+
+    /** Health state per endpoint, in registration order (index 0 is the primary). */
+    private final List<EndpointState> endpointStates;
+
+    /**
+     * Creates a selector over the given endpoints, keeping their order.
+     *
+     * @param endpoints endpoints in priority order; must contain at least one element
+     * @throws IllegalArgumentException if {@code endpoints} is empty
+     */
+    public ClientNodeSelector(List<Endpoint> endpoints) {
+        if (endpoints.isEmpty()) {
+            // Reject this up front: the all-quarantined fallback in getEndpoint() reads index 0.
+            throw new IllegalArgumentException("At least one endpoint is required");
+        }
+        List<EndpointState> states = new ArrayList<>(endpoints.size());
+        for (Endpoint ep : endpoints) {
+            states.add(new EndpointState(ep));
+        }
+        this.endpointStates = Collections.unmodifiableList(states);
+    }
+
+    /**
+     * Returns the first endpoint, in registration order, that is not quarantined.
+     *
+     * @return the first alive endpoint, or the primary (index 0) when every endpoint is quarantined
+     */
+    public Endpoint getEndpoint() {
+        for (EndpointState state : endpointStates) {
+            if (state.isAlive()) {
+                return state.getEndpoint();
+            }
+        }
+        LOG.warn("All endpoints are non-responsive, falling back to primary endpoint");
+        return endpointStates.get(0).getEndpoint();
+    }
+
+    /**
+     * Quarantines {@code failedEndpoint} for {@link #DEFAULT_QUARANTINE_MS} milliseconds and
+     * selects the endpoint to use for the next attempt.
+     *
+     * @param failedEndpoint endpoint the last request failed on; nothing is quarantined when it
+     *                       does not equal any endpoint of this selector
+     * @return the result of {@link #getEndpoint()} after the quarantine was recorded
+     */
+    public Endpoint getNextAliveNode(Endpoint failedEndpoint) {
+        for (EndpointState state : endpointStates) {
+            if (state.getEndpoint().equals(failedEndpoint)) {
+                state.markFailed(DEFAULT_QUARANTINE_MS);
+                LOG.warn("Endpoint {} quarantined for {} ms", failedEndpoint.getHost(), DEFAULT_QUARANTINE_MS);
+                break;
+            }
+        }
+        return getEndpoint();
+    }
+}
diff --git a/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java b/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java
new file mode 100644
index 000000000..6ee52cd00
--- /dev/null
+++ b/client-v2/src/main/java/com/clickhouse/client/api/transport/EndpointState.java
@@ -0,0 +1,56 @@
+package com.clickhouse.client.api.transport;
+
+/**
+ * Wraps an {@link Endpoint} with the health state used for failover.
+ * When a node fails it can be quarantined for a fixed duration; once the
+ * quarantine has elapsed the node is considered alive again.
+ *
+ * <p>State is held in a single {@code volatile} field, so instances may be
+ * shared between threads.
+ */
+class EndpointState {
+
+    private final Endpoint endpoint;
+
+    /**
+     * Quarantine deadline on the {@link System#nanoTime()} clock, or {@code null}
+     * while the endpoint has never been marked as failed.
+     */
+    private volatile Long failedUntilNanos;
+
+    /**
+     * @param endpoint endpoint whose health is tracked
+     */
+    EndpointState(Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /** Returns the wrapped endpoint. */
+    Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    /**
+     * Quarantines the endpoint for the given duration, starting now.
+     *
+     * @param quarantineMs quarantine length in milliseconds; zero or a negative
+     *                     value leaves the endpoint immediately usable
+     */
+    void markFailed(long quarantineMs) {
+        // Monotonic clock: wall-clock adjustments (NTP, manual changes) must not
+        // stretch or cut the quarantine. toNanos() saturates instead of overflowing.
+        long quarantineNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(Math.max(0L, quarantineMs));
+        this.failedUntilNanos = System.nanoTime() + quarantineNanos;
+    }
+
+    /**
+     * Tells whether the endpoint may be used.
+     *
+     * @return {@code true} if the endpoint never failed or its quarantine has elapsed
+     */
+    boolean isAlive() {
+        Long deadline = failedUntilNanos; // read the volatile field once
+        // nanoTime values are only comparable through their difference.
+        return deadline == null || System.nanoTime() - deadline >= 0;
+    }
+}
diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java b/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java
new file mode 100644
index 000000000..6a724151d
--- /dev/null
+++ b/client-v2/src/test/java/com/clickhouse/client/ClientFailoverTest.java
@@ -0,0 +1,253 @@
+package com.clickhouse.client;
+
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.ClientException;
+import com.clickhouse.client.api.ClientFaultCause;
+import com.clickhouse.client.api.enums.Protocol;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.data.ClickHouseFormat;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for endpoint failover behavior in client-v2.
+ *
+ * <p>Verifies that when multiple endpoints are configured and the first
+ * endpoint is unreachable, the client automatically fails over to the
+ * next available endpoint.
+ */
+public class ClientFailoverTest extends BaseIntegrationTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientFailoverTest.class);
+
+    /**
+     * Configures a dead endpoint (port 1, nothing listens) as the primary
+     * and the actual test server as the backup. Verifies that a query
+     * succeeds by failing over to the backup node.
+     */
+    @Test(groups = {"integration"})
+    public void testQueryFailoverToBackupNode() {
+        ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
+        boolean isSecure = isCloud();
+        try (Client client = new Client.Builder()
+                .addEndpoint("http://127.0.0.1:1")
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse)
+                .setMaxRetries(3)
+                .build()) {
+
+            List<GenericRecord> result = client.queryAll("SELECT 1 AS val");
+            Assert.assertFalse(result.isEmpty(), "Expected at least one record");
+            Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1),
+                    "Query should succeed via failover to the backup node");
+        }
+    }
+
+    /**
+     * Verifies that when the primary (first) endpoint is healthy it is
+     * consistently used. The backup is a dead endpoint and retries are not
+     * enabled, so any query routed away from the primary fails the test.
+     */
+    @Test(groups = {"integration"})
+    public void testPrimaryEndpointAffinityWhenHealthy() {
+        ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
+        boolean isSecure = isCloud();
+        try (Client client = new Client.Builder()
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                .addEndpoint("http://127.0.0.1:1") // dead backup: must never be selected
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .build()) {
+
+            for (int i = 0; i < 5; i++) {
+                List<GenericRecord> result = client.queryAll("SELECT " + i + " AS val");
+                Assert.assertFalse(result.isEmpty());
+                Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(i));
+            }
+        }
+    }
+
+    /**
+     * Verifies that insert operations also failover when the primary
+     * endpoint is down.
+     */
+    @Test(groups = {"integration"})
+    public void testInsertFailoverToBackupNode() throws Exception {
+        if (isCloud()) {
+            return;
+        }
+
+        ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
+        boolean isSecure = isCloud();
+
+        try (Client adminClient = new Client.Builder()
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .build()) {
+            adminClient.execute("DROP TABLE IF EXISTS failover_insert_test").get(10, TimeUnit.SECONDS).close();
+            adminClient.execute("CREATE TABLE failover_insert_test (val UInt32) ENGINE MergeTree ORDER BY ()").get(10, TimeUnit.SECONDS).close();
+        }
+
+        try (Client client = new Client.Builder()
+                .addEndpoint("http://127.0.0.1:1")
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse)
+                .setMaxRetries(3)
+                .build()) {
+
+            String csvData = "42\n";
+            // Explicit charset: getBytes() without one depends on the platform default.
+            client.insert("failover_insert_test",
+                    new ByteArrayInputStream(csvData.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+                    ClickHouseFormat.CSV).get(30, TimeUnit.SECONDS).close();
+        }
+
+        try (Client verifyClient = new Client.Builder()
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .build()) {
+            List<GenericRecord> result = verifyClient.queryAll("SELECT val FROM failover_insert_test");
+            Assert.assertFalse(result.isEmpty(), "Expected at least one row after failover insert");
+            Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(42));
+
+            verifyClient.execute("DROP TABLE IF EXISTS failover_insert_test").get(10, TimeUnit.SECONDS).close();
+        }
+    }
+
+    /**
+     * Verifies that a query succeeds when the first two endpoints are dead
+     * and only the third one is reachable.
+     */
+    @Test(groups = {"integration"})
+    public void testMultipleBackups() {
+        ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
+        boolean isSecure = isCloud();
+        try (Client client = new Client.Builder()
+                .addEndpoint("http://127.0.0.1:1")
+                .addEndpoint("http://127.0.0.1:2")
+                .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) // working
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse)
+                .setMaxRetries(3)
+                .build()) {
+
+            List<GenericRecord> result = client.queryAll("SELECT 1 AS val");
+            Assert.assertFalse(result.isEmpty());
+            Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1));
+        }
+    }
+
+    /**
+     * Verifies that a query fails with {@link ClientException} when every
+     * configured endpoint is dead.
+     */
+    @Test(groups = {"integration"}, expectedExceptions = ClientException.class)
+    public void testAllEndpointsDead() {
+        try (Client client = new Client.Builder()
+                .addEndpoint("http://127.0.0.1:1")
+                .addEndpoint("http://127.0.0.1:2")
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse)
+                .setMaxRetries(3)
+                .build()) {
+
+            client.queryAll("SELECT 1 AS val");
+        }
+    }
+
+    /**
+     * Same setup as {@link #testAllEndpointsDead()} with a single retry:
+     * the query must still end with {@link ClientException}.
+     */
+    @Test(groups = {"integration"}, expectedExceptions = ClientException.class)
+    public void testRetryLimitReached() {
+        try (Client client = new Client.Builder()
+                .addEndpoint("http://127.0.0.1:1")
+                .addEndpoint("http://127.0.0.1:2")
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .retryOnFailures(ClientFaultCause.ConnectTimeout, ClientFaultCause.NoHttpResponse)
+                .setMaxRetries(1)
+                .build()) {
+
+            client.queryAll("SELECT 1 AS val");
+        }
+    }
+
+    /**
+     * Registers two URIs twice each and checks, by reading the client's
+     * private {@code endpoints} field, that duplicates are dropped and the
+     * registration order is kept. No server is contacted.
+     */
+    @Test(groups = {"integration"})
+    public void testDuplicateEndpointRegistrationAndOrderPreservation() throws Exception {
+        String uriA = "http://127.0.0.1:1";
+        String uriB = "http://127.0.0.1:2";
+
+        try (Client client = new Client.Builder()
+                .addEndpoint(uriA)
+                .addEndpoint(uriA)
+                .addEndpoint(uriB)
+                .addEndpoint(uriB)
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .build()) {
+
+            java.lang.reflect.Field field = Client.class.getDeclaredField("endpoints");
+            field.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            List<com.clickhouse.client.api.transport.Endpoint> endpoints =
+                    (List<com.clickhouse.client.api.transport.Endpoint>) field.get(client);
+
+            Assert.assertEquals(endpoints.size(), 2, "Should only have unique endpoints");
+            Assert.assertEquals(endpoints.get(0).getURI().toString(), uriA + "/", "First endpoint should be " + uriA + "/");
+            Assert.assertEquals(endpoints.get(1).getURI().toString(), uriB + "/", "Second endpoint should be " + uriB + "/");
+        }
+    }
+
+    /**
+     * Verifies that building a client without any endpoint is rejected
+     * with {@link IllegalArgumentException}.
+     */
+    @Test(groups = {"integration"}, expectedExceptions = IllegalArgumentException.class)
+    public void testNoEndpointsConfigured() {
+        new Client.Builder()
+                .setUsername("default")
+                .setPassword(ClickHouseServerForTest.getPassword())
+                .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                .build();
+    }
+
+    /**
+     * Puts a WireMock server that answers every POST with HTTP 503 in front
+     * of the real server and verifies that the query is answered by the
+     * backup after the 503 endpoint was actually contacted.
+     */
+    @Test(groups = {"integration"})
+    public void testHTTP503Failover() throws Exception {
+        ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
+        boolean isSecure = isCloud();
+
+        WireMockServer mockServer = new WireMockServer(WireMockConfiguration.options().dynamicPort());
+        mockServer.start();
+        try {
+            mockServer.stubFor(WireMock.post(WireMock.anyUrl())
+                    .willReturn(WireMock.aResponse().withStatus(503).withBody("Service Unavailable")));
+
+            try (Client client = new Client.Builder()
+                    .addEndpoint("http://localhost:" + mockServer.port())
+                    .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure)
+                    .setUsername("default")
+                    .setPassword(ClickHouseServerForTest.getPassword())
+                    .setDefaultDatabase(ClickHouseServerForTest.getDatabase())
+                    .setMaxRetries(3)
+                    .build()) {
+
+                List<GenericRecord> result = client.queryAll("SELECT 1 AS val");
+                Assert.assertFalse(result.isEmpty(), "Expected at least one record");
+                Assert.assertEquals(result.get(0).getInteger("val"), Integer.valueOf(1),
+                        "Query should succeed via failover after 503");
+                // Without this the test also passes when the 503 endpoint is never tried.
+                mockServer.verify(WireMock.postRequestedFor(WireMock.anyUrl()));
+            }
+        } finally {
+            mockServer.stop();
+        }
+    }
+}
diff --git a/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java b/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java
new file mode 100644
index 000000000..cbc547175
--- /dev/null
+++ b/client-v2/src/test/java/com/clickhouse/client/api/transport/ClientNodeSelectorTest.java
@@ -0,0 +1,119 @@
+package com.clickhouse.client.api.transport;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public 
class ClientNodeSelectorTest {
+
+    /**
+     * A fresh state is alive, a quarantined one is not, and the state
+     * becomes alive again once a (short) quarantine has elapsed.
+     */
+    @Test
+    public void testEndpointStateQuarantineAndExpiry() throws InterruptedException {
+        Endpoint ep = new HttpEndpoint("localhost", 8123, false, "/");
+        EndpointState state = new EndpointState(ep);
+
+        Assert.assertTrue(state.isAlive(), "New endpoint state should be alive");
+        Assert.assertSame(state.getEndpoint(), ep);
+
+        // Long quarantine, so a scheduling pause cannot let it expire before the assertion runs.
+        state.markFailed(TimeUnit.MINUTES.toMillis(1));
+        Assert.assertFalse(state.isAlive(), "Endpoint state should not be alive immediately after failure");
+
+        // Marking again replaces the deadline; sleep past the short quarantine.
+        state.markFailed(20);
+        Thread.sleep(40);
+        Assert.assertTrue(state.isAlive(), "Endpoint state should be alive again after quarantine expires");
+    }
+
+    /**
+     * The first alive endpoint in registration order is selected; failing
+     * every endpoint in turn ends with the fallback to the primary.
+     */
+    @Test
+    public void testClientNodeSelectorAffinityAndQuarantine() {
+        Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/");
+        Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/");
+        Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/");
+
+        ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC));
+
+        Assert.assertEquals(selector.getEndpoint(), epA);
+
+        Endpoint next = selector.getNextAliveNode(epA);
+        Assert.assertEquals(next, epB);
+        Assert.assertEquals(selector.getEndpoint(), epB);
+
+        next = selector.getNextAliveNode(epB);
+        Assert.assertEquals(next, epC);
+        Assert.assertEquals(selector.getEndpoint(), epC);
+
+        // Everything is quarantined now: the primary is returned as the fallback.
+        next = selector.getNextAliveNode(epC);
+        Assert.assertEquals(next, epA);
+        Assert.assertEquals(selector.getEndpoint(), epA);
+    }
+
+    /** With all endpoints quarantined the primary is returned. */
+    @Test
+    public void testClientNodeSelectorFallbackWhenAllDead() {
+        Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/");
+        Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/");
+
+        ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB));
+
+        selector.getNextAliveNode(epA);
+        selector.getNextAliveNode(epB);
+
+        Assert.assertEquals(selector.getEndpoint(), epA);
+    }
+
+    /** Consecutive failures walk through the endpoints in registration order. */
+    @Test
+    public void testClientNodeSelectorSequentialFailures() {
+        Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/");
+        Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/");
+        Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/");
+
+        ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC));
+
+        Assert.assertEquals(selector.getEndpoint(), epA);
+
+        Assert.assertEquals(selector.getNextAliveNode(epA), epB);
+        Assert.assertEquals(selector.getNextAliveNode(epB), epC);
+        Assert.assertEquals(selector.getNextAliveNode(epC), epA);
+    }
+
+    /**
+     * Ten threads select and quarantine endpoints concurrently; every
+     * selection must return a non-null endpoint and no task may throw.
+     */
+    @Test
+    public void testClientNodeSelectorConcurrency() throws Exception {
+        Endpoint epA = new HttpEndpoint("localhost", 8123, false, "/");
+        Endpoint epB = new HttpEndpoint("localhost", 8124, false, "/");
+        Endpoint epC = new HttpEndpoint("localhost", 8125, false, "/");
+
+        ClientNodeSelector selector = new ClientNodeSelector(Arrays.asList(epA, epB, epC));
+
+        int threadCount = 10;
+        int loopCount = 1000;
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        List<Future<Void>> futures = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < threadCount; i++) {
+                futures.add(executor.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        for (int j = 0; j < loopCount; j++) {
+                            Endpoint ep = selector.getEndpoint();
+                            Assert.assertNotNull(ep);
+                            if (j % 10 == 0) {
+                                selector.getNextAliveNode(ep);
+                            }
+                        }
+                        return null;
+                    }
+                }));
+            }
+
+            executor.shutdown();
+            Assert.assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
+
+            // get() rethrows any assertion failure raised inside a worker.
+            for (Future<Void> future : futures) {
+                future.get();
+            }
+        } finally {
+            // No-op after a clean shutdown; stops the workers if a check above failed.
+            executor.shutdownNow();
+        }
+    }
+}
From 881ecfb5740f70b3bc65b4c37617a864a9eb88d9 Mon Sep 17 00:00:00 2001
From: kishansinghifs1
Date: Tue, 16 Jun 2026 22:13:09 +0530
Subject: [PATCH 2/4] feat: add endpoint failover support with automatic retries and quarantine for client-v2

---
 CHANGELOG.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd347a3a8..8ef2c3f23 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,8 @@
 - **[jdbc-v2]** `ResultSet#getObject(int|String, Map>)` now accepts ClickHouse type names as map keys in 
addition to the JDBC `SQLType` names it has always accepted. Only unwrapped type names are used for the lookup — `Nullable(...)` and `LowCardinality(...)` wrappers are stripped and do not affect resolution, so a key like `"Int32"` matches both `Int32` and `Nullable(Int32)` columns; keys like `"Nullable(Int32)"` are not recognized. Lookup order is the `ClickHouseDataType` enum name (e.g. `"Int32"`, `"String"`, `"DateTime"`) then the JDBC `SQLType` name (e.g. `"INTEGER"`, `"VARCHAR"`, `"TIMESTAMP"`); a missing entry leaves the value uncoerced. The feature is supported for primitive ClickHouse types only — `Array`, `Tuple`, `Map`, `Nested`, and geometry types are not supported and continue to be returned in their native form regardless of the user-supplied map. Existing maps keyed only by JDBC `SQLType` names continue to work unchanged. +- **[client-v2]** Added endpoint failover support: when multiple endpoints are configured and a request fails with a retryable error (connect timeout, connection refused, HTTP 503, etc.), the client now automatically retries against the next available endpoint instead of always targeting the first one. Failed endpoints are quarantined for 30 seconds before being retried. (https://github.com/ClickHouse/clickhouse-java/issues/2855) + ### Bug Fixes - **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. 
(https://github.com/ClickHouse/clickhouse-java/issues/2690) From f4a17cd05e162acd069f341a31f3f59f61ea6a18 Mon Sep 17 00:00:00 2001 From: kishansinghifs1 Date: Tue, 16 Jun 2026 22:28:42 +0530 Subject: [PATCH 3/4] fix: ensure HTTP 503 responses are returned for failover processing instead of throwing ServerException --- .../com/clickhouse/client/api/internal/HttpAPIClientHelper.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 5ae4730b7..6aa20de08 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -578,6 +578,8 @@ private ClassicHttpResponse doPostRequest(Map requestConfig, Htt } else if (httpResponse.getCode() == HttpStatus.SC_BAD_GATEWAY) { httpResponse.close(); throw new ClientException("Server returned '502 Bad gateway'. 
Check network and proxy settings."); + } else if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { + return httpResponse; } else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) { try { throw readError(req, httpResponse); From 790cdebb03054aaaf0517f97673c8d6f98bfa25f Mon Sep 17 00:00:00 2001 From: kishansinghifs1 Date: Tue, 16 Jun 2026 22:33:38 +0530 Subject: [PATCH 4/4] fix: ensure stream is reset on 503 retry in insert loop --- .../src/main/java/com/clickhouse/client/api/Client.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index 288517916..5978b4f2e 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -1556,6 +1556,13 @@ public CompletableFuture insert(String tableName, if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) { LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime)); selectedEndpoint = getNextAliveNode(selectedEndpoint); + if (i < retries) { + try { + writer.onRetry(); + } catch (IOException ioe) { + throw new ClientException("Failed to reset stream before next attempt", ioe); + } + } continue; }