Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.oxia.client.batch.Operation.WriteOperation.PutOperation;
import io.oxia.client.grpc.OxiaStubManager;
import io.oxia.client.grpc.OxiaStubProvider;
import io.oxia.client.util.BlockedThreadChecker;
import io.oxia.client.metrics.Counter;
import io.oxia.client.metrics.InstrumentProvider;
import io.oxia.client.metrics.LatencyHistogram;
Expand Down Expand Up @@ -67,14 +68,16 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import lombok.NonNull;

class AsyncOxiaClientImpl implements AsyncOxiaClient {

static @NonNull CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig config) {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-client"));
var stubManager = new OxiaStubManager(config);
var blockedThreadChecker = BlockedThreadChecker.createIfEnabled();
var stubManager = new OxiaStubManager(config, blockedThreadChecker);

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
Expand Down Expand Up @@ -102,6 +105,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
readBatchManager,
writeBatchManager,
sessionManager,
blockedThreadChecker,
config.requestTimeout());
return shardManager.start().thenApply(v -> client);
}
Expand All @@ -114,6 +118,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
private final @NonNull BatchManager readBatchManager;
private final @NonNull BatchManager writeBatchManager;
private final @NonNull SessionManager sessionManager;
private final BlockedThreadChecker blockedThreadChecker;
private final long requestTimeoutMs;
private volatile boolean closed;

Expand Down Expand Up @@ -150,6 +155,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
@NonNull BatchManager readBatchManager,
@NonNull BatchManager writeBatchManager,
@NonNull SessionManager sessionManager,
BlockedThreadChecker blockedThreadChecker,
Duration requestTimeout) {
this.clientIdentifier = clientIdentifier;
this.instrumentProvider = instrumentProvider;
Expand All @@ -159,6 +165,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
this.readBatchManager = readBatchManager;
this.writeBatchManager = writeBatchManager;
this.sessionManager = sessionManager;
this.blockedThreadChecker = blockedThreadChecker;
this.scheduledExecutor = scheduledExecutor;
this.requestTimeoutMs = requestTimeout.toMillis();

Expand Down Expand Up @@ -863,6 +870,9 @@ public void close() throws Exception {
notificationManager.close();
shardManager.close();
stubManager.close();
if (blockedThreadChecker != null) {
blockedThreadChecker.close();
}
scheduledExecutor.shutdownNow();
}

Expand Down
124 changes: 93 additions & 31 deletions client/src/main/java/io/oxia/client/batch/ReadBatch.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,27 +17,38 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.stub.StreamObserver;
import io.oxia.client.grpc.OxiaStatus;
import io.oxia.client.grpc.OxiaStubProvider;
import io.oxia.proto.GetResponse;
import io.oxia.client.util.Backoff;
import io.oxia.proto.ReadRequest;
import io.oxia.proto.ReadResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

final class ReadBatch extends BatchBase implements Batch, StreamObserver<ReadResponse> {
@Slf4j
final class ReadBatch extends BatchBase implements Batch {

private final ReadBatchFactory factory;

@VisibleForTesting final List<Operation.ReadOperation.GetOperation> gets = new ArrayList<>();

private int responseIndex = 0;
private final Duration requestTimeout;
long startSendTimeNanos;

ReadBatch(ReadBatchFactory factory, OxiaStubProvider stubProvider, long shardId) {
ReadBatch(
ReadBatchFactory factory,
OxiaStubProvider stubProvider,
long shardId,
@NonNull Duration requestTimeout) {
super(stubProvider, shardId);
this.factory = factory;
this.requestTimeout = requestTimeout;
}

@Override
Expand All @@ -59,39 +70,90 @@ public int size() {
@Override
public void send() {
startSendTimeNanos = System.nanoTime();
try {
getStub().async().read(toProto(), this);
} catch (Throwable t) {
onError(t);
}
ReadRequest request = toProto();
long deadlineNanos = System.nanoTime() + requestTimeout.toNanos();

doRequestWithRetries(request, deadlineNanos, new Backoff())
.thenAccept(
response -> {
factory
.getReadRequestLatencyHistogram()
.recordSuccess(System.nanoTime() - startSendTimeNanos);

for (int i = 0; i < response.getGetsCount(); i++) {
gets.get(i).complete(response.getGets(i));
}
})
.exceptionally(
ex -> {
onError(ex);
return null;
});
}

@Override
public void onNext(ReadResponse response) {
for (int i = 0; i < response.getGetsCount(); i++) {
GetResponse gr = response.getGets(i);
gets.get(responseIndex).complete(gr);
CompletableFuture<ReadResponse> doRequestWithRetries(
ReadRequest request, long deadlineNanos, Backoff backoff) {
return doRequest(request)
.exceptionallyCompose(
ex -> {
if (!OxiaStatus.isRetriable(ex)) {
return CompletableFuture.failedFuture(ex);
}

++responseIndex;
long remainingMillis =
TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
if (remainingMillis <= 0) {
return CompletableFuture.failedFuture(ex);
}

long delayMillis = Math.min(backoff.nextDelayMillis(), remainingMillis);
log.warn(
"Read request failed, retrying. shard={} retry-after={}ms error={}",
getShardId(),
delayMillis,
ex.getMessage());

Executor delayed =
CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
return CompletableFuture.supplyAsync(() -> null, delayed)
.thenCompose(__ -> doRequestWithRetries(request, deadlineNanos, backoff));
});
}

private CompletableFuture<ReadResponse> doRequest(ReadRequest request) {
CompletableFuture<ReadResponse> future = new CompletableFuture<>();
try {
getStub()
.async()
.read(
request,
new StreamObserver<>() {
ReadResponse result;

@Override
public void onNext(ReadResponse response) {
result = response;
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onCompleted() {
future.complete(result);
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}

@Override
public void onError(Throwable batchError) {
gets.forEach(g -> g.fail(batchError));
factory.getReadRequestLatencyHistogram().recordFailure(System.nanoTime() - startSendTimeNanos);
}

@Override
public void onCompleted() {
// complete pending request if the server close stream without any response
gets.forEach(
g -> {
if (!g.callback().isDone()) {
g.fail(new CancellationException());
}
});
factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos);
gets.forEach(g -> g.fail(batchError));
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,6 @@ public ReadBatchFactory(

@Override
public Batch getBatch(long shardId) {
return new ReadBatch(this, stubProvider, shardId);
return new ReadBatch(this, stubProvider, shardId, getConfig().requestTimeout());
}
}
94 changes: 68 additions & 26 deletions client/src/main/java/io/oxia/client/batch/WriteBatch.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2025 The Oxia Authors
* Copyright © 2022-2026 The Oxia Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,13 +19,22 @@
import static java.util.stream.Collectors.toList;

import com.google.common.annotations.VisibleForTesting;
import io.oxia.client.grpc.OxiaStatus;
import io.oxia.client.grpc.OxiaStubProvider;
import io.oxia.client.session.SessionManager;
import io.oxia.client.util.Backoff;
import io.oxia.proto.WriteRequest;
import io.oxia.proto.WriteResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
final class WriteBatch extends BatchBase implements Batch {

private final WriteBatchFactory factory;
Expand All @@ -40,6 +49,7 @@ final class WriteBatch extends BatchBase implements Batch {

private final SessionManager sessionManager;
private final int maxBatchSize;
private final Duration requestTimeout;
private int byteSize;
private long bytes;
private long startSendTimeNanos;
Expand All @@ -49,12 +59,14 @@ final class WriteBatch extends BatchBase implements Batch {
@NonNull OxiaStubProvider stubProvider,
@NonNull SessionManager sessionManager,
long shardId,
int maxBatchSize) {
int maxBatchSize,
@NonNull Duration requestTimeout) {
super(stubProvider, shardId);
this.factory = factory;
this.sessionManager = sessionManager;
this.byteSize = 0;
this.maxBatchSize = maxBatchSize;
this.requestTimeout = requestTimeout;
}

int sizeOf(@NonNull Operation<?> operation) {
Expand Down Expand Up @@ -95,32 +107,62 @@ public int size() {
@Override
public void send() {
startSendTimeNanos = System.nanoTime();
WriteRequest request = toProto();
long deadlineNanos = System.nanoTime() + requestTimeout.toNanos();

doRequestWithRetries(request, deadlineNanos, new Backoff())
.thenAccept(
response -> {
factory.writeRequestLatencyHistogram.recordSuccess(
System.nanoTime() - startSendTimeNanos);

for (var i = 0; i < deletes.size(); i++) {
deletes.get(i).complete(response.getDeletes(i));
}
for (var i = 0; i < deleteRanges.size(); i++) {
deleteRanges.get(i).complete(response.getDeleteRanges(i));
}
for (var i = 0; i < puts.size(); i++) {
puts.get(i).complete(response.getPuts(i));
}
})
.exceptionally(
ex -> {
handleError(ex);
return null;
});
}

CompletableFuture<WriteResponse> doRequestWithRetries(
WriteRequest request, long deadlineNanos, Backoff backoff) {
CompletableFuture<WriteResponse> attempt;
try {
getWriteStream()
.send(toProto())
.thenAccept(
response -> {
factory.writeRequestLatencyHistogram.recordSuccess(
System.nanoTime() - startSendTimeNanos);

for (var i = 0; i < deletes.size(); i++) {
deletes.get(i).complete(response.getDeletes(i));
}
for (var i = 0; i < deleteRanges.size(); i++) {
deleteRanges.get(i).complete(response.getDeleteRanges(i));
}
for (var i = 0; i < puts.size(); i++) {
puts.get(i).complete(response.getPuts(i));
}
})
.exceptionally(
ex -> {
handleError(ex);
return null;
});
} catch (Throwable t) {
handleError(t);
attempt = getWriteStream().send(request);
} catch (Exception e) {
attempt = CompletableFuture.failedFuture(e);
}
return attempt.exceptionallyCompose(
ex -> {
if (!OxiaStatus.isRetriable(ex)) {
return CompletableFuture.failedFuture(ex);
}

long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
if (remainingMillis <= 0) {
return CompletableFuture.failedFuture(ex);
}

long delayMillis = Math.min(backoff.nextDelayMillis(), remainingMillis);
log.warn(
"Write request failed, retrying. shard={} retry-after={}ms error={}",
getShardId(),
delayMillis,
ex.getMessage());

Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
return CompletableFuture.supplyAsync(() -> null, delayed)
.thenCompose(__ -> doRequestWithRetries(request, deadlineNanos, backoff));
});
}

public void handleError(Throwable batchError) {
Expand Down
Loading
Loading