Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@

- **[jdbc-v2]** `ResultSet#getObject(int|String, Map<String, Class<?>>)` now accepts ClickHouse type names as map keys in addition to the JDBC `SQLType` names it has always accepted. Only unwrapped type names are used for the lookup — `Nullable(...)` and `LowCardinality(...)` wrappers are stripped and do not affect resolution, so a key like `"Int32"` matches both `Int32` and `Nullable(Int32)` columns; keys like `"Nullable(Int32)"` are not recognized. Lookup order is the `ClickHouseDataType` enum name (e.g. `"Int32"`, `"String"`, `"DateTime"`) then the JDBC `SQLType` name (e.g. `"INTEGER"`, `"VARCHAR"`, `"TIMESTAMP"`); a missing entry leaves the value uncoerced. The feature is supported for primitive ClickHouse types only — `Array`, `Tuple`, `Map`, `Nested`, and geometry types are not supported and continue to be returned in their native form regardless of the user-supplied map. Existing maps keyed only by JDBC `SQLType` names continue to work unchanged.

- **[client-v2]** Added endpoint failover support: when multiple endpoints are configured and a request fails with a retryable error (connect timeout, connection refused, HTTP 503, etc.), the client now automatically retries against the next available endpoint instead of always targeting the first one. A failed endpoint is quarantined for 30 seconds, after which it becomes eligible again; if every endpoint is quarantined at the same time, the first configured endpoint is used as a fallback. (https://github.com/ClickHouse/clickhouse-java/issues/2855)

### Bug Fixes

- **[jdbc-v2]** Fixed `Statement.cancel()` throwing `SESSION_IS_LOCKED` when the statement was running inside a ClickHouse session (e.g. via `clickhouse_setting_session_id`). The `KILL QUERY` request issued by `cancel()` now runs outside the session, so it no longer contends with the running query for the session lock. (https://github.com/ClickHouse/clickhouse-java/issues/2690)
Expand Down
51 changes: 38 additions & 13 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.clickhouse.client.api.serde.POJOFieldDeserializer;
import com.clickhouse.client.api.serde.POJOFieldSerializer;
import com.clickhouse.client.api.serde.POJOSerDe;
import com.clickhouse.client.api.transport.ClientNodeSelector;
import com.clickhouse.client.api.transport.Endpoint;
import com.clickhouse.client.api.transport.HttpEndpoint;
import com.clickhouse.client.config.ClickHouseClientOption;
Expand All @@ -60,8 +61,8 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -141,6 +142,7 @@ public class Client implements AutoCloseable {
private final int retries;
private LZ4Factory lz4Factory = null;
private final Supplier<String> queryIdGenerator;
private final ClientNodeSelector nodeSelector;
private final CredentialsManager credentialsManager;

private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
Expand Down Expand Up @@ -185,6 +187,7 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
}

this.endpoints = tmpEndpoints.build();
this.nodeSelector = new ClientNodeSelector(this.endpoints);

String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
this.retries = retry == null ? 0 : Integer.parseInt(retry);
Expand Down Expand Up @@ -270,7 +273,7 @@ public static class Builder {
private Supplier<String> queryIdGenerator;

public Builder() {
this.endpoints = new HashSet<>();
this.endpoints = new LinkedHashSet<>();
this.configuration = new HashMap<>();

for (ClientConfigProperties p : ClientConfigProperties.values()) {
Expand Down Expand Up @@ -1314,7 +1317,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
Supplier<InsertResponse> supplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Endpoint selectedEndpoint = getNextAliveNode();
Endpoint selectedEndpoint = getEndpoint();

RuntimeException lastException = null;
for (int i = 0; i <= maxRetries; i++) {
Expand Down Expand Up @@ -1344,7 +1347,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
// Check response
if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
Comment thread
cursor[bot] marked this conversation as resolved.
continue;
}

Expand All @@ -1361,7 +1364,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
} else {
throw lastException;
}
Expand Down Expand Up @@ -1536,7 +1539,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Endpoint selectedEndpoint = getNextAliveNode();
Endpoint selectedEndpoint = getEndpoint();

RuntimeException lastException = null;
for (int i = 0; i <= retries; i++) {
Expand All @@ -1552,7 +1555,14 @@ public CompletableFuture<InsertResponse> insert(String tableName,
// Check response
if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
Comment thread
cursor[bot] marked this conversation as resolved.
if (i < retries) {
try {
writer.onRetry();
} catch (IOException ioe) {
throw new ClientException("Failed to reset stream before next attempt", ioe);
}
}
continue;
}

Expand All @@ -1568,7 +1578,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
} else {
throw lastException;
}
Expand Down Expand Up @@ -1665,7 +1675,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
Supplier<QueryResponse> responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Endpoint selectedEndpoint = getNextAliveNode();
Endpoint selectedEndpoint = getEndpoint();
RuntimeException lastException = null;
for (int i = 0; i <= retries; i++) {
ClassicHttpResponse httpResponse = null;
Expand All @@ -1682,7 +1692,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
// Check response
if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
HttpAPIClientHelper.closeQuietly(httpResponse);
continue;
}
Expand Down Expand Up @@ -1710,7 +1720,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
selectedEndpoint = getNextAliveNode(selectedEndpoint);
} else {
throw lastException;
}
Expand Down Expand Up @@ -2222,8 +2232,23 @@ public void updateAccessToken(String accessToken) {
this.credentialsManager.setAccessToken(accessToken);
}

private Endpoint getNextAliveNode() {
return endpoints.get(0);
/**
 * Chooses the endpoint for the initial attempt of a request.
 * The decision is delegated entirely to the node selector.
 *
 * @return the endpoint the node selector currently considers usable
 */
private Endpoint getEndpoint() {
    final Endpoint initialEndpoint = this.nodeSelector.getEndpoint();
    return initialEndpoint;
}

/**
 * Reports a failed endpoint to the node selector and asks it for a replacement.
 * Intended for retry loops, after an attempt ended with a retryable failure.
 *
 * @param failedEndpoint the endpoint the last attempt was sent to
 * @return the endpoint the node selector picks for the next attempt
 */
private Endpoint getNextAliveNode(Endpoint failedEndpoint) {
    final Endpoint replacement = this.nodeSelector.getNextAliveNode(failedEndpoint);
    return replacement;
}

public static final String VALUES_LIST_DELIMITER = ",";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ public String getQueryId() {
}

private boolean discoverIsRetryable(int code, String message, int transportProtocolCode) {
//Let's check if we have a ServerException to reference the error code
//https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
if (transportProtocolCode == 503) {
return true;
}
// Let's check if we have a ServerException to reference the error code
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
switch (code) { // UNEXPECTED_END_OF_FILE
case 3: // UNEXPECTED_END_OF_FILE
case 107: // FILE_DOESNT_EXIST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,8 @@ private ClassicHttpResponse doPostRequest(Map<String, Object> requestConfig, Htt
} else if (httpResponse.getCode() == HttpStatus.SC_BAD_GATEWAY) {
httpResponse.close();
throw new ClientException("Server returned '502 Bad gateway'. Check network and proxy settings.");
} else if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
return httpResponse;
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST || httpResponse.containsHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE)) {
try {
throw readError(req, httpResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.clickhouse.client.api.transport;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Selects the {@link Endpoint} a request should be sent to, preferring the
 * first registered endpoint that is not currently quarantined.
 *
 * <p>Endpoints are tried in the order they were registered (index 0 is the
 * "primary"). When a request fails, the failed endpoint is quarantined for
 * {@link #DEFAULT_QUARANTINE_MS} milliseconds and the next alive endpoint
 * is returned. Once the quarantine expires the node automatically becomes
 * eligible again, so traffic returns to the primary without any explicit
 * reset.</p>
 *
 * <p>If all endpoints are quarantined, the primary (index 0) is returned
 * as a fallback to avoid a complete lockout.</p>
 *
 * <p>This class is intended for concurrent use: callers may invoke
 * {@link #getEndpoint()} and {@link #getNextAliveNode(Endpoint)}
 * from different threads. The list of endpoints is fixed at construction
 * time; the only state that changes afterwards is the per-endpoint
 * quarantine information held by {@code EndpointState}.</p>
 */
public class ClientNodeSelector {

    private static final Logger LOG = LoggerFactory.getLogger(ClientNodeSelector.class);

    /** How long a failed endpoint is skipped, in milliseconds. */
    static final long DEFAULT_QUARANTINE_MS = 30_000;

    /** Immutable, registration-ordered view of the tracked endpoints. */
    private final List<EndpointState> endpointStates;

    /**
     * Creates a selector over the given endpoints, preserving their order.
     *
     * @param endpoints endpoints in priority order; the first one is the primary
     */
    public ClientNodeSelector(List<Endpoint> endpoints) {
        List<EndpointState> states = new ArrayList<>(endpoints.size());
        for (Endpoint ep : endpoints) {
            states.add(new EndpointState(ep));
        }
        this.endpointStates = Collections.unmodifiableList(states);
    }

    /**
     * Returns the first endpoint, in registration order, that is not quarantined.
     * When every endpoint is quarantined the primary endpoint is returned instead.
     *
     * @return the endpoint to use for the next request
     * @throws IllegalStateException if the selector was created without any endpoints
     */
    public Endpoint getEndpoint() {
        // Without this guard an empty selector would fail in the fallback below
        // with an IndexOutOfBoundsException that says nothing about the cause.
        if (endpointStates.isEmpty()) {
            throw new IllegalStateException("No endpoints configured");
        }
        for (EndpointState state : endpointStates) {
            if (state.isAlive()) {
                return state.getEndpoint();
            }
        }
        LOG.warn("All endpoints are non-responsive, falling back to primary endpoint");
        return endpointStates.get(0).getEndpoint();
    }

    /**
     * Quarantines the given endpoint for {@link #DEFAULT_QUARANTINE_MS} milliseconds
     * and returns the endpoint to use next. An endpoint that is not tracked by this
     * selector (including {@code null}) is ignored and nothing is quarantined.
     *
     * @param failedEndpoint the endpoint that just failed
     * @return the endpoint to use for the next attempt, chosen as in {@link #getEndpoint()}
     * @throws IllegalStateException if the selector was created without any endpoints
     */
    public Endpoint getNextAliveNode(Endpoint failedEndpoint) {
        for (EndpointState state : endpointStates) {
            if (state.getEndpoint().equals(failedEndpoint)) {
                state.markFailed(DEFAULT_QUARANTINE_MS);
                LOG.warn("Endpoint {} quarantined for {} ms", failedEndpoint.getHost(), DEFAULT_QUARANTINE_MS);
                // Only the first matching entry is quarantined.
                break;
            }
        }
        return getEndpoint();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.clickhouse.client.api.transport;

/**
 * Wraps an {@link Endpoint} with the quarantine state used for failover.
 * When a node fails it can be quarantined for a fixed duration; once that
 * duration has elapsed the node is considered alive again.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()} rather than the
 * wall clock, so adjusting the system time cannot shorten or extend a
 * quarantine.</p>
 */
class EndpointState {

    private final Endpoint endpoint;

    /**
     * Quarantine deadline on the {@link System#nanoTime()} scale.
     * Only meaningful once {@link #everFailed} is {@code true}.
     */
    private volatile long failedUntilNanos;

    /**
     * Whether {@link #markFailed(long)} has been called at least once.
     * {@code nanoTime()} has an arbitrary origin (it may even be negative), so no
     * deadline value can serve as a "never failed" marker; a separate flag is needed.
     */
    private volatile boolean everFailed;

    EndpointState(Endpoint endpoint) {
        this.endpoint = endpoint;
    }

    Endpoint getEndpoint() {
        return endpoint;
    }

    /**
     * Quarantines the endpoint for the given duration, replacing any previous deadline.
     * A non-positive duration leaves the endpoint immediately alive.
     *
     * @param quarantineMs quarantine duration in milliseconds
     */
    void markFailed(long quarantineMs) {
        // toNanos saturates at Long.MAX_VALUE instead of overflowing, and negative
        // durations are clamped to zero, so the subtraction in isAlive() stays valid.
        long quarantineNanos =
                java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(Math.max(0L, quarantineMs));
        // Publish the deadline before the flag: a reader that observes the flag
        // is then guaranteed to observe a deadline written by markFailed.
        this.failedUntilNanos = System.nanoTime() + quarantineNanos;
        this.everFailed = true;
    }

    /**
     * Tells whether the endpoint may be used.
     *
     * @return {@code true} if the endpoint was never marked failed or its
     *         quarantine has elapsed; {@code false} while it is quarantined
     */
    boolean isAlive() {
        if (!everFailed) {
            return true;
        }
        // nanoTime values must be compared by subtraction to tolerate numeric wrap-around.
        return System.nanoTime() - failedUntilNanos >= 0;
    }
}
Loading
Loading