From bf876595b0960ba0099cb2f424ba09d6375e4b86 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 11:37:43 +0800 Subject: [PATCH 1/6] Replace CustomStatusCode with OxiaStatus enum - Map all 12 Oxia gRPC status codes with retriability flags - Extract codes from grpc-status-details-bin trailer with description-based fallback for plain gRPC errors - Update ShardManager to use OxiaStatus for namespace-not-found Co-Authored-By: Claude Opus 4.6 --- .../io/oxia/client/grpc/CustomStatusCode.java | 34 ---- .../java/io/oxia/client/grpc/OxiaStatus.java | 169 ++++++++++++++++++ .../io/oxia/client/shard/ShardManager.java | 28 +-- .../io/oxia/client/grpc/OxiaStatusTest.java | 142 +++++++++++++++ 4 files changed, 319 insertions(+), 54 deletions(-) delete mode 100644 client/src/main/java/io/oxia/client/grpc/CustomStatusCode.java create mode 100644 client/src/main/java/io/oxia/client/grpc/OxiaStatus.java create mode 100644 client/src/test/java/io/oxia/client/grpc/OxiaStatusTest.java diff --git a/client/src/main/java/io/oxia/client/grpc/CustomStatusCode.java b/client/src/main/java/io/oxia/client/grpc/CustomStatusCode.java deleted file mode 100644 index 4c27ba14..00000000 --- a/client/src/main/java/io/oxia/client/grpc/CustomStatusCode.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright © 2022-2025 The Oxia Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.oxia.client.grpc; - -import java.util.Objects; -import lombok.NonNull; - -/** Customised GRPC status code. */ -public enum CustomStatusCode { - ErrorNamespaceNotFound, - // fallback to Status.code - UNKNOWN; - - public static @NonNull CustomStatusCode fromDescription(String description) { - Objects.requireNonNull(description); - return switch (description) { - case "oxia: namespace not found" -> ErrorNamespaceNotFound; - default -> UNKNOWN; - }; - } -} diff --git a/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java b/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java new file mode 100644 index 00000000..57f2b943 --- /dev/null +++ b/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java @@ -0,0 +1,169 @@ +/* + * Copyright © 2022-2026 The Oxia Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.oxia.client.grpc; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.ProtoUtils; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.annotation.Nullable; + +/** + * Custom Oxia gRPC status codes, matching the Go server definitions in + * common/constant/grpc_errors.go. 
+ */ +public enum OxiaStatus { + NOT_INITIALIZED(100, false), + INVALID_TERM(101, false), + INVALID_STATUS(102, true), + CANCELLED(103, false), + ALREADY_CLOSED(104, true), + LEADER_ALREADY_CONNECTED(105, false), + NODE_IS_NOT_LEADER(106, true), + NODE_IS_NOT_FOLLOWER(107, false), + SESSION_NOT_FOUND(108, false), + INVALID_SESSION_TIMEOUT(109, false), + NAMESPACE_NOT_FOUND(110, false), + NOTIFICATIONS_NOT_ENABLED(111, false), + UNKNOWN(-1, false); + + private final int code; + private final boolean retriable; + + OxiaStatus(int code, boolean retriable) { + this.code = code; + this.retriable = retriable; + } + + public int code() { + return code; + } + + public boolean isRetriable() { + return retriable; + } + + private static final Map BY_CODE = + Stream.of(values()) + .filter(s -> s.code >= 0) + .collect(Collectors.toMap(OxiaStatus::code, Function.identity())); + + public static OxiaStatus fromCode(int code) { + return BY_CODE.getOrDefault(code, UNKNOWN); + } + + // ---- Static helpers for extracting status from gRPC errors ---- + + private static final Metadata.Key STATUS_DETAILS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", + ProtoUtils.metadataMarshaller(com.google.rpc.Status.getDefaultInstance())); + + /** + * Returns whether the error is retriable. Standard gRPC UNAVAILABLE and custom Oxia retriable + * codes return true. + */ + public static boolean isRetriable(@Nullable Throwable err) { + if (err == null) { + return false; + } + + Status grpcStatus = grpcStatusFromThrowable(err); + if (grpcStatus != null && grpcStatus.getCode() == Status.Code.UNAVAILABLE) { + return true; + } + + return fromError(err).retriable; + } + + /** + * Extracts the OxiaStatus from a gRPC error. Reads the custom code from the + * grpc-status-details-bin trailer, then falls back to the gRPC status description. The fallback + * is needed because Go gRPC only sends the trailer when details are explicitly attached. 
+ */ + public static OxiaStatus fromError(@Nullable Throwable err) { + if (err == null) { + return UNKNOWN; + } + com.google.rpc.Status rpcStatus = rpcStatusFromThrowable(err); + if (rpcStatus != null) { + return fromCode(rpcStatus.getCode()); + } + Status grpcStatus = grpcStatusFromThrowable(err); + if (grpcStatus != null && grpcStatus.getDescription() != null) { + return fromDescription(grpcStatus.getDescription()); + } + return UNKNOWN; + } + + static OxiaStatus fromDescription(String description) { + if (description.startsWith("oxia: namespace not found")) { + return NAMESPACE_NOT_FOUND; + } + return UNKNOWN; + } + + @Nullable + private static com.google.rpc.Status rpcStatusFromThrowable(@Nullable Throwable err) { + if (err == null) { + return null; + } + Metadata trailers = trailersFromThrowable(err); + if (trailers != null) { + com.google.rpc.Status status = trailers.get(STATUS_DETAILS_KEY); + if (status != null) { + return status; + } + } + Throwable cause = err.getCause(); + if (cause != null && cause != err) { + return rpcStatusFromThrowable(cause); + } + return null; + } + + @Nullable + private static Status grpcStatusFromThrowable(@Nullable Throwable err) { + if (err == null) { + return null; + } + if (err instanceof StatusRuntimeException sre) { + return sre.getStatus(); + } else if (err instanceof StatusException se) { + return se.getStatus(); + } + Throwable cause = err.getCause(); + if (cause != null && cause != err) { + return grpcStatusFromThrowable(cause); + } + return null; + } + + @Nullable + private static Metadata trailersFromThrowable(@Nullable Throwable err) { + if (err instanceof StatusRuntimeException sre) { + return sre.getTrailers(); + } else if (err instanceof StatusException se) { + return se.getTrailers(); + } + return null; + } +} diff --git a/client/src/main/java/io/oxia/client/shard/ShardManager.java b/client/src/main/java/io/oxia/client/shard/ShardManager.java index c41554c0..7381628e 100644 --- 
a/client/src/main/java/io/oxia/client/shard/ShardManager.java +++ b/client/src/main/java/io/oxia/client/shard/ShardManager.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,10 @@ import static java.util.stream.Collectors.toSet; import com.google.common.annotations.VisibleForTesting; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.common.Attributes; import io.oxia.client.CompositeConsumer; -import io.oxia.client.grpc.CustomStatusCode; +import io.oxia.client.grpc.OxiaStatus; import io.oxia.client.grpc.OxiaStub; import io.oxia.client.metrics.Counter; import io.oxia.client.metrics.InstrumentProvider; @@ -124,22 +122,12 @@ public void onError(Throwable error) { return; } - if (error instanceof StatusRuntimeException statusError) { - var status = statusError.getStatus(); - if (status.getCode() == Status.Code.UNKNOWN) { - // Suppress unknown errors - final var description = status.getDescription(); - if (description != null) { - var customStatusCode = CustomStatusCode.fromDescription(description); - if (customStatusCode == CustomStatusCode.ErrorNamespaceNotFound) { - log.error("Namespace not found: {}", assignments.getNamespace()); - if (!initialAssignmentsFuture.isDone()) { - if (initialAssignmentsFuture.completeExceptionally( - new NamespaceNotFoundException(assignments.getNamespace()))) { - close(); - } - } - } + if (OxiaStatus.fromError(error) == OxiaStatus.NAMESPACE_NOT_FOUND) { + log.error("Namespace not found: {}", assignments.getNamespace()); + if (!initialAssignmentsFuture.isDone()) { + if (initialAssignmentsFuture.completeExceptionally( + new NamespaceNotFoundException(assignments.getNamespace()))) { + close(); } } } diff --git 
a/client/src/test/java/io/oxia/client/grpc/OxiaStatusTest.java b/client/src/test/java/io/oxia/client/grpc/OxiaStatusTest.java new file mode 100644 index 00000000..d930373a --- /dev/null +++ b/client/src/test/java/io/oxia/client/grpc/OxiaStatusTest.java @@ -0,0 +1,142 @@ +/* + * Copyright © 2022-2026 The Oxia Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.oxia.client.grpc; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.ProtoUtils; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class OxiaStatusTest { + + private static final Metadata.Key STATUS_DETAILS_KEY = + Metadata.Key.of( + "grpc-status-details-bin", + ProtoUtils.metadataMarshaller(com.google.rpc.Status.getDefaultInstance())); + + /** Simulates a gRPC error with the custom Oxia code in grpc-status-details-bin trailer. */ + private static StatusRuntimeException oxiaErrorWithTrailer( + OxiaStatus oxiaStatus, String message) { + var rpcStatus = + com.google.rpc.Status.newBuilder().setCode(oxiaStatus.code()).setMessage(message).build(); + Metadata trailers = new Metadata(); + trailers.put(STATUS_DETAILS_KEY, rpcStatus); + return Status.UNKNOWN.withDescription(message).asRuntimeException(trailers); + } + + /** Simulates a plain gRPC error without trailer (how Go server sends simple errors). 
*/ + private static StatusRuntimeException oxiaErrorWithoutTrailer(String message) { + return Status.UNKNOWN.withDescription(message).asRuntimeException(); + } + + @Nested + class FromCode { + @Test + void knownCodes() { + assertThat(OxiaStatus.fromCode(100)).isEqualTo(OxiaStatus.NOT_INITIALIZED); + assertThat(OxiaStatus.fromCode(106)).isEqualTo(OxiaStatus.NODE_IS_NOT_LEADER); + assertThat(OxiaStatus.fromCode(110)).isEqualTo(OxiaStatus.NAMESPACE_NOT_FOUND); + } + + @Test + void unknownCode() { + assertThat(OxiaStatus.fromCode(999)).isEqualTo(OxiaStatus.UNKNOWN); + } + } + + @Nested + class FromError { + @Test + void extractsFromTrailer() { + var err = + oxiaErrorWithTrailer(OxiaStatus.NODE_IS_NOT_LEADER, "node is not leader for shard 0"); + assertThat(OxiaStatus.fromError(err)).isEqualTo(OxiaStatus.NODE_IS_NOT_LEADER); + } + + @Test + void fallsBackToDescriptionWhenNoTrailer() { + var err = oxiaErrorWithoutTrailer("oxia: namespace not found"); + assertThat(OxiaStatus.fromError(err)).isEqualTo(OxiaStatus.NAMESPACE_NOT_FOUND); + } + + @Test + void returnsUnknownForPlainGrpcError() { + var err = Status.INTERNAL.asRuntimeException(); + assertThat(OxiaStatus.fromError(err)).isEqualTo(OxiaStatus.UNKNOWN); + } + + @Test + void returnsUnknownForNull() { + assertThat(OxiaStatus.fromError(null)).isEqualTo(OxiaStatus.UNKNOWN); + } + } + + @Nested + class IsRetriableTests { + + @Test + void nullIsNotRetriable() { + assertThat(OxiaStatus.isRetriable(null)).isFalse(); + } + + @Test + void unavailableIsRetriable() { + var err = Status.UNAVAILABLE.withDescription("connection refused").asRuntimeException(); + assertThat(OxiaStatus.isRetriable(err)).isTrue(); + } + + @Test + void unknownIsNotRetriable() { + var err = Status.UNKNOWN.asRuntimeException(); + assertThat(OxiaStatus.isRetriable(err)).isFalse(); + } + + @Test + void nodeIsNotLeaderIsRetriable() { + var err = + oxiaErrorWithTrailer(OxiaStatus.NODE_IS_NOT_LEADER, "node is not leader for shard 0"); + 
assertThat(OxiaStatus.isRetriable(err)).isTrue(); + } + + @Test + void invalidStatusIsRetriable() { + var err = oxiaErrorWithTrailer(OxiaStatus.INVALID_STATUS, "oxia: invalid status"); + assertThat(OxiaStatus.isRetriable(err)).isTrue(); + } + + @Test + void alreadyClosedIsRetriable() { + var err = oxiaErrorWithTrailer(OxiaStatus.ALREADY_CLOSED, "oxia: resource is already closed"); + assertThat(OxiaStatus.isRetriable(err)).isTrue(); + } + + @Test + void sessionNotFoundIsNotRetriable() { + var err = oxiaErrorWithTrailer(OxiaStatus.SESSION_NOT_FOUND, "oxia: session not found"); + assertThat(OxiaStatus.isRetriable(err)).isFalse(); + } + + @Test + void namespaceNotFoundIsNotRetriable() { + var err = oxiaErrorWithTrailer(OxiaStatus.NAMESPACE_NOT_FOUND, "oxia: namespace not found"); + assertThat(OxiaStatus.isRetriable(err)).isFalse(); + } + } +} From e6f7354bc6d1790bbf5bfb494e75a633fae67d43 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 12:15:04 +0800 Subject: [PATCH 2/6] Simplify OxiaStatus.fromError() by inlining namespace-not-found fallback Remove the separate fromDescription() method and inline the description-based check directly in fromError(). The fallback is needed because Go gRPC only sends grpc-status-details-bin when WithDetails() is used. Co-Authored-By: Claude Opus 4.6 --- .../java/io/oxia/client/grpc/OxiaStatus.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java b/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java index 57f2b943..f464eb9d 100644 --- a/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java +++ b/client/src/main/java/io/oxia/client/grpc/OxiaStatus.java @@ -94,11 +94,7 @@ public static boolean isRetriable(@Nullable Throwable err) { return fromError(err).retriable; } - /** - * Extracts the OxiaStatus from a gRPC error. 
Reads the custom code from the - * grpc-status-details-bin trailer, then falls back to the gRPC status description. The fallback - * is needed because Go gRPC only sends the trailer when details are explicitly attached. - */ + /** Extracts the OxiaStatus from a gRPC error's grpc-status-details-bin trailer. */ public static OxiaStatus fromError(@Nullable Throwable err) { if (err == null) { return UNKNOWN; @@ -107,15 +103,12 @@ public static OxiaStatus fromError(@Nullable Throwable err) { if (rpcStatus != null) { return fromCode(rpcStatus.getCode()); } + // Fallback: Go gRPC only sends the trailer when details are attached. + // For plain errors, match the description. Status grpcStatus = grpcStatusFromThrowable(err); - if (grpcStatus != null && grpcStatus.getDescription() != null) { - return fromDescription(grpcStatus.getDescription()); - } - return UNKNOWN; - } - - static OxiaStatus fromDescription(String description) { - if (description.startsWith("oxia: namespace not found")) { + if (grpcStatus != null + && grpcStatus.getDescription() != null + && grpcStatus.getDescription().startsWith("oxia: namespace not found")) { return NAMESPACE_NOT_FOUND; } return UNKNOWN; From 92b31cae14389c20107fa3d9ca7281651f4fce57 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 11:37:57 +0800 Subject: [PATCH 3/6] Add async retry with backoff for batch operations - WriteBatch and ReadBatch retry on retriable errors using CompletableFuture.exceptionallyCompose() with exponential backoff - Retry is non-blocking: uses CompletableFuture.delayedExecutor() instead of Thread.sleep() to avoid blocking the batcher thread - Convert ReadBatch from StreamObserver to CompletableFuture-based to support retry composition - Bounded by requestTimeout deadline Co-Authored-By: Claude Opus 4.6 --- .../java/io/oxia/client/batch/ReadBatch.java | 124 +++++++++++++----- .../oxia/client/batch/ReadBatchFactory.java | 4 +- .../java/io/oxia/client/batch/WriteBatch.java | 94 +++++++++---- 
.../oxia/client/batch/WriteBatchFactory.java | 10 +- .../java/io/oxia/client/batch/BatchTest.java | 17 ++- 5 files changed, 184 insertions(+), 65 deletions(-) diff --git a/client/src/main/java/io/oxia/client/batch/ReadBatch.java b/client/src/main/java/io/oxia/client/batch/ReadBatch.java index 09e0b752..5791cf5c 100644 --- a/client/src/main/java/io/oxia/client/batch/ReadBatch.java +++ b/client/src/main/java/io/oxia/client/batch/ReadBatch.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,27 +17,38 @@ import com.google.common.annotations.VisibleForTesting; import io.grpc.stub.StreamObserver; +import io.oxia.client.grpc.OxiaStatus; import io.oxia.client.grpc.OxiaStubProvider; -import io.oxia.proto.GetResponse; +import io.oxia.client.util.Backoff; import io.oxia.proto.ReadRequest; import io.oxia.proto.ReadResponse; +import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; -final class ReadBatch extends BatchBase implements Batch, StreamObserver { +@Slf4j +final class ReadBatch extends BatchBase implements Batch { private final ReadBatchFactory factory; @VisibleForTesting final List gets = new ArrayList<>(); - private int responseIndex = 0; + private final Duration requestTimeout; long startSendTimeNanos; - ReadBatch(ReadBatchFactory factory, OxiaStubProvider stubProvider, long shardId) { + ReadBatch( + ReadBatchFactory factory, + OxiaStubProvider stubProvider, + long shardId, + @NonNull Duration requestTimeout) { super(stubProvider, shardId); this.factory = factory; + this.requestTimeout = requestTimeout; } @Override @@ 
-59,39 +70,90 @@ public int size() { @Override public void send() { startSendTimeNanos = System.nanoTime(); - try { - getStub().async().read(toProto(), this); - } catch (Throwable t) { - onError(t); - } + ReadRequest request = toProto(); + long deadlineNanos = System.nanoTime() + requestTimeout.toNanos(); + + doRequestWithRetries(request, deadlineNanos, new Backoff()) + .thenAccept( + response -> { + factory + .getReadRequestLatencyHistogram() + .recordSuccess(System.nanoTime() - startSendTimeNanos); + + for (int i = 0; i < response.getGetsCount(); i++) { + gets.get(i).complete(response.getGets(i)); + } + }) + .exceptionally( + ex -> { + onError(ex); + return null; + }); } - @Override - public void onNext(ReadResponse response) { - for (int i = 0; i < response.getGetsCount(); i++) { - GetResponse gr = response.getGets(i); - gets.get(responseIndex).complete(gr); + CompletableFuture doRequestWithRetries( + ReadRequest request, long deadlineNanos, Backoff backoff) { + return doRequest(request) + .exceptionallyCompose( + ex -> { + if (!OxiaStatus.isRetriable(ex)) { + return CompletableFuture.failedFuture(ex); + } - ++responseIndex; + long remainingMillis = + TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()); + if (remainingMillis <= 0) { + return CompletableFuture.failedFuture(ex); + } + + long delayMillis = Math.min(backoff.nextDelayMillis(), remainingMillis); + log.warn( + "Read request failed, retrying. 
shard={} retry-after={}ms error={}", + getShardId(), + delayMillis, + ex.getMessage()); + + Executor delayed = + CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayed) + .thenCompose(__ -> doRequestWithRetries(request, deadlineNanos, backoff)); + }); + } + + private CompletableFuture doRequest(ReadRequest request) { + CompletableFuture future = new CompletableFuture<>(); + try { + getStub() + .async() + .read( + request, + new StreamObserver<>() { + ReadResponse result; + + @Override + public void onNext(ReadResponse response) { + result = response; + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + future.complete(result); + } + }); + } catch (Exception e) { + future.completeExceptionally(e); } + return future; } - @Override public void onError(Throwable batchError) { - gets.forEach(g -> g.fail(batchError)); factory.getReadRequestLatencyHistogram().recordFailure(System.nanoTime() - startSendTimeNanos); - } - - @Override - public void onCompleted() { - // complete pending request if the server close stream without any response - gets.forEach( - g -> { - if (!g.callback().isDone()) { - g.fail(new CancellationException()); - } - }); - factory.getReadRequestLatencyHistogram().recordSuccess(System.nanoTime() - startSendTimeNanos); + gets.forEach(g -> g.fail(batchError)); } @NonNull diff --git a/client/src/main/java/io/oxia/client/batch/ReadBatchFactory.java b/client/src/main/java/io/oxia/client/batch/ReadBatchFactory.java index 92b055c0..16969fb3 100644 --- a/client/src/main/java/io/oxia/client/batch/ReadBatchFactory.java +++ b/client/src/main/java/io/oxia/client/batch/ReadBatchFactory.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in 
compliance with the License. @@ -42,6 +42,6 @@ public ReadBatchFactory( @Override public Batch getBatch(long shardId) { - return new ReadBatch(this, stubProvider, shardId); + return new ReadBatch(this, stubProvider, shardId, getConfig().requestTimeout()); } } diff --git a/client/src/main/java/io/oxia/client/batch/WriteBatch.java b/client/src/main/java/io/oxia/client/batch/WriteBatch.java index 26e394b7..3d6245c5 100644 --- a/client/src/main/java/io/oxia/client/batch/WriteBatch.java +++ b/client/src/main/java/io/oxia/client/batch/WriteBatch.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,13 +19,22 @@ import static java.util.stream.Collectors.toList; import com.google.common.annotations.VisibleForTesting; +import io.oxia.client.grpc.OxiaStatus; import io.oxia.client.grpc.OxiaStubProvider; import io.oxia.client.session.SessionManager; +import io.oxia.client.util.Backoff; import io.oxia.proto.WriteRequest; +import io.oxia.proto.WriteResponse; +import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +@Slf4j final class WriteBatch extends BatchBase implements Batch { private final WriteBatchFactory factory; @@ -40,6 +49,7 @@ final class WriteBatch extends BatchBase implements Batch { private final SessionManager sessionManager; private final int maxBatchSize; + private final Duration requestTimeout; private int byteSize; private long bytes; private long startSendTimeNanos; @@ -49,12 +59,14 @@ final class WriteBatch extends BatchBase implements Batch { @NonNull OxiaStubProvider stubProvider, @NonNull SessionManager sessionManager, long shardId, - int maxBatchSize) { + 
int maxBatchSize, + @NonNull Duration requestTimeout) { super(stubProvider, shardId); this.factory = factory; this.sessionManager = sessionManager; this.byteSize = 0; this.maxBatchSize = maxBatchSize; + this.requestTimeout = requestTimeout; } int sizeOf(@NonNull Operation operation) { @@ -95,32 +107,62 @@ public int size() { @Override public void send() { startSendTimeNanos = System.nanoTime(); + WriteRequest request = toProto(); + long deadlineNanos = System.nanoTime() + requestTimeout.toNanos(); + + doRequestWithRetries(request, deadlineNanos, new Backoff()) + .thenAccept( + response -> { + factory.writeRequestLatencyHistogram.recordSuccess( + System.nanoTime() - startSendTimeNanos); + + for (var i = 0; i < deletes.size(); i++) { + deletes.get(i).complete(response.getDeletes(i)); + } + for (var i = 0; i < deleteRanges.size(); i++) { + deleteRanges.get(i).complete(response.getDeleteRanges(i)); + } + for (var i = 0; i < puts.size(); i++) { + puts.get(i).complete(response.getPuts(i)); + } + }) + .exceptionally( + ex -> { + handleError(ex); + return null; + }); + } + + CompletableFuture doRequestWithRetries( + WriteRequest request, long deadlineNanos, Backoff backoff) { + CompletableFuture attempt; try { - getWriteStream() - .send(toProto()) - .thenAccept( - response -> { - factory.writeRequestLatencyHistogram.recordSuccess( - System.nanoTime() - startSendTimeNanos); - - for (var i = 0; i < deletes.size(); i++) { - deletes.get(i).complete(response.getDeletes(i)); - } - for (var i = 0; i < deleteRanges.size(); i++) { - deleteRanges.get(i).complete(response.getDeleteRanges(i)); - } - for (var i = 0; i < puts.size(); i++) { - puts.get(i).complete(response.getPuts(i)); - } - }) - .exceptionally( - ex -> { - handleError(ex); - return null; - }); - } catch (Throwable t) { - handleError(t); + attempt = getWriteStream().send(request); + } catch (Exception e) { + attempt = CompletableFuture.failedFuture(e); } + return attempt.exceptionallyCompose( + ex -> { + if 
(!OxiaStatus.isRetriable(ex)) { + return CompletableFuture.failedFuture(ex); + } + + long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()); + if (remainingMillis <= 0) { + return CompletableFuture.failedFuture(ex); + } + + long delayMillis = Math.min(backoff.nextDelayMillis(), remainingMillis); + log.warn( + "Write request failed, retrying. shard={} retry-after={}ms error={}", + getShardId(), + delayMillis, + ex.getMessage()); + + Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS); + return CompletableFuture.supplyAsync(() -> null, delayed) + .thenCompose(__ -> doRequestWithRetries(request, deadlineNanos, backoff)); + }); } public void handleError(Throwable batchError) { diff --git a/client/src/main/java/io/oxia/client/batch/WriteBatchFactory.java b/client/src/main/java/io/oxia/client/batch/WriteBatchFactory.java index 551c1235..093f92d4 100644 --- a/client/src/main/java/io/oxia/client/batch/WriteBatchFactory.java +++ b/client/src/main/java/io/oxia/client/batch/WriteBatchFactory.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -45,6 +45,12 @@ public WriteBatchFactory( @Override public Batch getBatch(long shardId) { - return new WriteBatch(this, stubProvider, sessionManager, shardId, getConfig().maxBatchSize()); + return new WriteBatch( + this, + stubProvider, + sessionManager, + shardId, + getConfig().maxBatchSize(), + getConfig().requestTimeout()); } } diff --git a/client/src/test/java/io/oxia/client/batch/BatchTest.java b/client/src/test/java/io/oxia/client/batch/BatchTest.java index 89a3fc05..980de51f 100644 --- a/client/src/test/java/io/oxia/client/batch/BatchTest.java +++ b/client/src/test/java/io/oxia/client/batch/BatchTest.java @@ -235,7 +235,14 @@ void setup() { mock(SessionManager.class), config, InstrumentProvider.NOOP); - batch = new WriteBatch(factory, clientByShardId, sessionManager, shardId, 1024 * 1024); + batch = + new WriteBatch( + factory, + clientByShardId, + sessionManager, + shardId, + 1024 * 1024, + config.requestTimeout()); } @Test @@ -350,7 +357,8 @@ public void sendFailNoClient() { stubProvider, sessionManager, shardId, - 1024 * 1024); + 1024 * 1024, + config.requestTimeout()); batch.add(put); batch.add(delete); batch.add(deleteRange); @@ -406,7 +414,7 @@ class ReadBatchTests { void setup() { var factory = new ReadBatchFactory(mock(OxiaStubProvider.class), config, InstrumentProvider.NOOP); - batch = new ReadBatch(factory, clientByShardId, shardId); + batch = new ReadBatch(factory, clientByShardId, shardId, config.requestTimeout()); } @Test @@ -464,7 +472,8 @@ public void sendFailNoClient() { new ReadBatch( new ReadBatchFactory(mock(OxiaStubProvider.class), config, InstrumentProvider.NOOP), stubProvider, - shardId); + shardId, + config.requestTimeout()); batch.add(get); batch.send(); From 3e974b714228e0457b963da5b1e7e8fcdbd9605e Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 13:12:43 +0800 Subject: [PATCH 4/6] Add BlockedThreadChecker to detect blocking on gRPC direct executor threads Similar to Vert.x's "Don't block me" feature, this 
monitors the internal direct executor thread used by gRPC/Netty and logs a warning with stack trace when a callback blocks longer than the threshold (default 500ms). Co-Authored-By: Claude Opus 4.6 --- .../io/oxia/client/AsyncOxiaClientImpl.java | 9 +- .../java/io/oxia/client/grpc/OxiaStub.java | 30 +++- .../io/oxia/client/grpc/OxiaStubManager.java | 14 +- .../client/util/BlockedThreadChecker.java | 158 ++++++++++++++++++ 4 files changed, 203 insertions(+), 8 deletions(-) create mode 100644 client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java diff --git a/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java index 20ec197b..fba77789 100644 --- a/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java @@ -38,6 +38,7 @@ import io.oxia.client.batch.Operation.WriteOperation.PutOperation; import io.oxia.client.grpc.OxiaStubManager; import io.oxia.client.grpc.OxiaStubProvider; +import io.oxia.client.util.BlockedThreadChecker; import io.oxia.client.metrics.Counter; import io.oxia.client.metrics.InstrumentProvider; import io.oxia.client.metrics.LatencyHistogram; @@ -74,7 +75,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { static @NonNull CompletableFuture newInstance(@NonNull ClientConfig config) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-client")); - var stubManager = new OxiaStubManager(config); + var blockedThreadChecker = new BlockedThreadChecker(); + var stubManager = new OxiaStubManager(config, blockedThreadChecker); var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace()); var serviceAddrStub = stubManager.getStub(config.serviceAddress()); @@ -102,6 +104,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { readBatchManager, writeBatchManager, sessionManager, + blockedThreadChecker, 
config.requestTimeout()); return shardManager.start().thenApply(v -> client); } @@ -114,6 +117,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { private final @NonNull BatchManager readBatchManager; private final @NonNull BatchManager writeBatchManager; private final @NonNull SessionManager sessionManager; + private final @NonNull BlockedThreadChecker blockedThreadChecker; private final long requestTimeoutMs; private volatile boolean closed; @@ -150,6 +154,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { @NonNull BatchManager readBatchManager, @NonNull BatchManager writeBatchManager, @NonNull SessionManager sessionManager, + @NonNull BlockedThreadChecker blockedThreadChecker, Duration requestTimeout) { this.clientIdentifier = clientIdentifier; this.instrumentProvider = instrumentProvider; @@ -159,6 +164,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { this.readBatchManager = readBatchManager; this.writeBatchManager = writeBatchManager; this.sessionManager = sessionManager; + this.blockedThreadChecker = blockedThreadChecker; this.scheduledExecutor = scheduledExecutor; this.requestTimeoutMs = requestTimeout.toMillis(); @@ -863,6 +869,7 @@ public void close() throws Exception { notificationManager.close(); shardManager.close(); stubManager.close(); + blockedThreadChecker.close(); scheduledExecutor.shutdownNow(); } diff --git a/client/src/main/java/io/oxia/client/grpc/OxiaStub.java b/client/src/main/java/io/oxia/client/grpc/OxiaStub.java index 025378a4..87395c21 100644 --- a/client/src/main/java/io/oxia/client/grpc/OxiaStub.java +++ b/client/src/main/java/io/oxia/client/grpc/OxiaStub.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -28,6 +28,7 @@ import io.grpc.stub.StreamObserver; import io.oxia.client.ClientConfig; import io.oxia.client.api.Authentication; +import io.oxia.client.util.BlockedThreadChecker; import io.oxia.proto.CloseSessionRequest; import io.oxia.proto.CloseSessionResponse; import io.oxia.proto.OxiaClientGrpc; @@ -57,16 +58,35 @@ static ChannelCredentials getChannelCredential(String address, boolean tlsEnable } public OxiaStub(String address, ClientConfig clientConfig) { + this(address, clientConfig, null); + } + + public OxiaStub( + String address, + ClientConfig clientConfig, + @Nullable BlockedThreadChecker blockedThreadChecker) { this( + buildChannel(address, clientConfig, blockedThreadChecker), + clientConfig.authentication()); + } + + private static ManagedChannel buildChannel( + String address, + ClientConfig clientConfig, + @Nullable BlockedThreadChecker blockedThreadChecker) { + var builder = Grpc.newChannelBuilder( getAddress(address), getChannelCredential(address, clientConfig.enableTls())) .keepAliveTime(clientConfig.connectionKeepAliveTime().toMillis(), MILLISECONDS) .keepAliveTimeout(clientConfig.connectionKeepAliveTimeout().toMillis(), MILLISECONDS) .keepAliveWithoutCalls(true) - .disableRetry() - .directExecutor() - .build(), - clientConfig.authentication()); + .disableRetry(); + if (blockedThreadChecker != null) { + builder.executor(blockedThreadChecker.checkedDirectExecutor()); + } else { + builder.directExecutor(); + } + return builder.build(); } public OxiaStub(ManagedChannel channel) { diff --git a/client/src/main/java/io/oxia/client/grpc/OxiaStubManager.java b/client/src/main/java/io/oxia/client/grpc/OxiaStubManager.java index 6243526a..77b8a8f6 100644 --- a/client/src/main/java/io/oxia/client/grpc/OxiaStubManager.java +++ b/client/src/main/java/io/oxia/client/grpc/OxiaStubManager.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2025 The Oxia Authors + * Copyright © 2022-2026 The Oxia Authors * * Licensed under the Apache License, Version 2.0 (the 
"License"); * you may not use this file except in compliance with the License. @@ -17,19 +17,28 @@ import com.google.common.annotations.VisibleForTesting; import io.oxia.client.ClientConfig; +import io.oxia.client.util.BlockedThreadChecker; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import javax.annotation.Nullable; public class OxiaStubManager implements AutoCloseable { @VisibleForTesting final Map stubs = new ConcurrentHashMap<>(); private final int maxConnectionPerNode; private final ClientConfig clientConfig; + private final @Nullable BlockedThreadChecker blockedThreadChecker; public OxiaStubManager(ClientConfig clientConfig) { + this(clientConfig, null); + } + + public OxiaStubManager( + ClientConfig clientConfig, @Nullable BlockedThreadChecker blockedThreadChecker) { this.clientConfig = clientConfig; this.maxConnectionPerNode = clientConfig.maxConnectionPerNode(); + this.blockedThreadChecker = blockedThreadChecker; } public OxiaStub getStub(String address) { @@ -39,7 +48,8 @@ public OxiaStub getStub(String address) { modKey += maxConnectionPerNode; } return stubs.computeIfAbsent( - new Key(address, modKey), key -> new OxiaStub(key.address, clientConfig)); + new Key(address, modKey), + key -> new OxiaStub(key.address, clientConfig, blockedThreadChecker)); } @Override diff --git a/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java b/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java new file mode 100644 index 00000000..70eca62d --- /dev/null +++ b/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java @@ -0,0 +1,158 @@ +/* + * Copyright © 2022-2026 The Oxia Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.oxia.client.util; + +import java.time.Duration; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; + +/** + * Monitors threads for blocking operations, similar to Vert.x's BlockedThreadChecker. + * + *

When the gRPC channel is configured with a direct executor, callbacks run on Netty I/O + * threads. If a callback blocks for too long, it stalls all gRPC I/O on that channel. This checker + * periodically inspects registered threads and logs a warning (with stack trace) when a thread has + * been executing a task longer than the configured threshold. + */ +@Slf4j +public class BlockedThreadChecker implements AutoCloseable { + + static final Duration DEFAULT_CHECK_INTERVAL = Duration.ofSeconds(1); + static final Duration DEFAULT_WARN_THRESHOLD = Duration.ofMillis(500); + + private final Timer timer; + private final Map trackedThreads = new ConcurrentHashMap<>(); + private final long warnThresholdNanos; + private volatile boolean closed; + + public BlockedThreadChecker() { + this(DEFAULT_CHECK_INTERVAL, DEFAULT_WARN_THRESHOLD); + } + + public BlockedThreadChecker(Duration checkInterval, Duration warnThreshold) { + this.warnThresholdNanos = warnThreshold.toNanos(); + this.timer = new Timer("oxia-blocked-thread-checker", true); + this.timer.schedule( + new TimerTask() { + @Override + public void run() { + checkAll(); + } + }, + checkInterval.toMillis(), + checkInterval.toMillis()); + } + + /** + * Creates an executor that wraps direct (caller-thread) execution with blocked-thread + * monitoring. Tasks executed through this executor will be tracked, and a warning is logged if + * any task exceeds the configured threshold. 
+ */ + public Executor checkedDirectExecutor() { + return new CheckedDirectExecutor(this); + } + + void taskStarted(Thread thread) { + trackedThreads.put(thread, new TaskExecution(System.nanoTime())); + } + + void taskFinished(Thread thread) { + trackedThreads.remove(thread); + } + + private void checkAll() { + if (closed) { + return; + } + long now = System.nanoTime(); + trackedThreads.forEach( + (thread, execution) -> { + long elapsed = now - execution.startNanos(); + if (elapsed > warnThresholdNanos && execution.tryWarn()) { + log.warn( + "Thread {} has been blocked for {} ms (threshold: {} ms)", + thread.getName(), + elapsed / 1_000_000, + warnThresholdNanos / 1_000_000, + new BlockedThreadException(thread)); + } + }); + } + + @Override + public void close() { + closed = true; + timer.cancel(); + trackedThreads.clear(); + } + + private static class TaskExecution { + private final long startNanos; + private final AtomicLong warnedAt = new AtomicLong(0); + + TaskExecution(long startNanos) { + this.startNanos = startNanos; + } + + long startNanos() { + return startNanos; + } + + /** Returns true only on the first warn for this execution, to avoid log spam. */ + boolean tryWarn() { + return warnedAt.compareAndSet(0, System.nanoTime()); + } + } + + /** + * Exception used to capture the stack trace of a blocked thread. Not thrown — only used for + * diagnostic logging. + */ + public static class BlockedThreadException extends Exception { + BlockedThreadException(Thread thread) { + super("Thread " + thread.getName() + " blocked"); + setStackTrace(thread.getStackTrace()); + } + } + + /** + * An executor that runs tasks on the calling thread (like gRPC's directExecutor) but tracks + * execution time via the {@link BlockedThreadChecker}. 
+ */ + private static class CheckedDirectExecutor implements Executor { + private final BlockedThreadChecker checker; + + CheckedDirectExecutor(BlockedThreadChecker checker) { + this.checker = checker; + } + + @Override + public void execute(Runnable command) { + Thread current = Thread.currentThread(); + checker.taskStarted(current); + try { + command.run(); + } finally { + checker.taskFinished(current); + } + } + } +} From 23b87f2abbfd08d87295fa4dfbf5211dc3e62983 Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 13:17:57 +0800 Subject: [PATCH 5/6] Make BlockedThreadChecker opt-in via JVM system properties - Controlled by -Doxia.client.blockedThreadChecker.enabled=true - Configurable interval via -Doxia.client.blockedThreadChecker.intervalMs - Configurable threshold via -Doxia.client.blockedThreadChecker.warnThresholdMs - Replace Timer with ScheduledExecutorService for robustness - Re-warn every 5s for long-running blocks instead of warn-once - Disabled by default: zero overhead in production unless opted in Co-Authored-By: Claude Opus 4.6 --- .../io/oxia/client/AsyncOxiaClientImpl.java | 11 +- .../client/util/BlockedThreadChecker.java | 104 ++++++++++++++---- 2 files changed, 88 insertions(+), 27 deletions(-) diff --git a/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java index fba77789..dae3bf28 100644 --- a/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/oxia/client/AsyncOxiaClientImpl.java @@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import javax.annotation.Nullable; import lombok.NonNull; class AsyncOxiaClientImpl implements AsyncOxiaClient { @@ -75,7 +76,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { static @NonNull CompletableFuture newInstance(@NonNull ClientConfig config) { ScheduledExecutorService executor = 
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("oxia-client")); - var blockedThreadChecker = new BlockedThreadChecker(); + var blockedThreadChecker = BlockedThreadChecker.createIfEnabled(); var stubManager = new OxiaStubManager(config, blockedThreadChecker); var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace()); @@ -117,7 +118,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { private final @NonNull BatchManager readBatchManager; private final @NonNull BatchManager writeBatchManager; private final @NonNull SessionManager sessionManager; - private final @NonNull BlockedThreadChecker blockedThreadChecker; + private final BlockedThreadChecker blockedThreadChecker; private final long requestTimeoutMs; private volatile boolean closed; @@ -154,7 +155,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { @NonNull BatchManager readBatchManager, @NonNull BatchManager writeBatchManager, @NonNull SessionManager sessionManager, - @NonNull BlockedThreadChecker blockedThreadChecker, + BlockedThreadChecker blockedThreadChecker, Duration requestTimeout) { this.clientIdentifier = clientIdentifier; this.instrumentProvider = instrumentProvider; @@ -869,7 +870,9 @@ public void close() throws Exception { notificationManager.close(); shardManager.close(); stubManager.close(); - blockedThreadChecker.close(); + if (blockedThreadChecker != null) { + blockedThreadChecker.close(); + } scheduledExecutor.shutdownNow(); } diff --git a/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java b/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java index 70eca62d..44f9c1e6 100644 --- a/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java +++ b/client/src/main/java/io/oxia/client/util/BlockedThreadChecker.java @@ -17,10 +17,11 @@ import java.time.Duration; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import lombok.extern.slf4j.Slf4j; @@ -31,34 +32,69 @@ * threads. If a callback blocks for too long, it stalls all gRPC I/O on that channel. This checker * periodically inspects registered threads and logs a warning (with stack trace) when a thread has * been executing a task longer than the configured threshold. + * + *

<p>Controlled via JVM system properties:
+ *
+ * <ul>
+ *   <li>{@code -Doxia.client.blockedThreadChecker.enabled=true} — enable the checker (default:
+ *       disabled)
+ *   <li>{@code -Doxia.client.blockedThreadChecker.intervalMs=1000} — check interval in
+ *       milliseconds (default: 1000)
+ *   <li>{@code -Doxia.client.blockedThreadChecker.warnThresholdMs=500} — warning threshold in
+ *       milliseconds (default: 500)
+ * </ul>
*/ @Slf4j public class BlockedThreadChecker implements AutoCloseable { - static final Duration DEFAULT_CHECK_INTERVAL = Duration.ofSeconds(1); - static final Duration DEFAULT_WARN_THRESHOLD = Duration.ofMillis(500); + static final String PROP_ENABLED = "oxia.client.blockedThreadChecker.enabled"; + static final String PROP_INTERVAL_MS = "oxia.client.blockedThreadChecker.intervalMs"; + static final String PROP_WARN_THRESHOLD_MS = "oxia.client.blockedThreadChecker.warnThresholdMs"; - private final Timer timer; + static final long DEFAULT_CHECK_INTERVAL_MS = 1000; + static final long DEFAULT_WARN_THRESHOLD_MS = 500; + + private final ScheduledExecutorService scheduler; private final Map trackedThreads = new ConcurrentHashMap<>(); private final long warnThresholdNanos; private volatile boolean closed; - public BlockedThreadChecker() { - this(DEFAULT_CHECK_INTERVAL, DEFAULT_WARN_THRESHOLD); + /** Returns {@code true} if the checker is enabled via system property. */ + public static boolean isEnabled() { + return Boolean.getBoolean(PROP_ENABLED); + } + + /** + * Creates a checker configured from system properties, or {@code null} if not enabled. Call + * sites should use {@link #createIfEnabled()} and null-check. 
+ */ + public static BlockedThreadChecker createIfEnabled() { + if (!isEnabled()) { + return null; + } + long intervalMs = getLongProperty(PROP_INTERVAL_MS, DEFAULT_CHECK_INTERVAL_MS); + long warnMs = getLongProperty(PROP_WARN_THRESHOLD_MS, DEFAULT_WARN_THRESHOLD_MS); + log.info( + "Blocked thread checker enabled (interval={}ms, warnThreshold={}ms)", + intervalMs, + warnMs); + return new BlockedThreadChecker(Duration.ofMillis(intervalMs), Duration.ofMillis(warnMs)); } - public BlockedThreadChecker(Duration checkInterval, Duration warnThreshold) { + BlockedThreadChecker(Duration checkInterval, Duration warnThreshold) { this.warnThresholdNanos = warnThreshold.toNanos(); - this.timer = new Timer("oxia-blocked-thread-checker", true); - this.timer.schedule( - new TimerTask() { - @Override - public void run() { - checkAll(); - } - }, + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "oxia-blocked-thread-checker"); + t.setDaemon(true); + return t; + }); + this.scheduler.scheduleAtFixedRate( + this::checkAll, + checkInterval.toMillis(), checkInterval.toMillis(), - checkInterval.toMillis()); + TimeUnit.MILLISECONDS); } /** @@ -86,7 +122,7 @@ private void checkAll() { trackedThreads.forEach( (thread, execution) -> { long elapsed = now - execution.startNanos(); - if (elapsed > warnThresholdNanos && execution.tryWarn()) { + if (elapsed > warnThresholdNanos && execution.shouldWarn(now)) { log.warn( "Thread {} has been blocked for {} ms (threshold: {} ms)", thread.getName(), @@ -100,13 +136,28 @@ private void checkAll() { @Override public void close() { closed = true; - timer.cancel(); + scheduler.shutdownNow(); trackedThreads.clear(); } + private static long getLongProperty(String key, long defaultValue) { + String value = System.getProperty(key); + if (value == null) { + return defaultValue; + } + try { + return Long.parseLong(value); + } catch (NumberFormatException e) { + log.warn("Invalid value for system property {}: '{}', 
using default {}", key, value, defaultValue); + return defaultValue; + } + } + private static class TaskExecution { + private static final long WARN_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(5); + private final long startNanos; - private final AtomicLong warnedAt = new AtomicLong(0); + private final AtomicLong lastWarnedNanos = new AtomicLong(0); TaskExecution(long startNanos) { this.startNanos = startNanos; @@ -116,9 +167,16 @@ long startNanos() { return startNanos; } - /** Returns true only on the first warn for this execution, to avoid log spam. */ - boolean tryWarn() { - return warnedAt.compareAndSet(0, System.nanoTime()); + /** + * Returns true if enough time has passed since the last warning. Warns on first detection, + * then at most once every 5 seconds to avoid log spam. + */ + boolean shouldWarn(long nowNanos) { + long lastWarned = lastWarnedNanos.get(); + if (lastWarned == 0 || (nowNanos - lastWarned) >= WARN_INTERVAL_NANOS) { + return lastWarnedNanos.compareAndSet(lastWarned, nowNanos); + } + return false; } } From 20efd09fb9f2580b139ec1cf9f1cf16c2e30781f Mon Sep 17 00:00:00 2001 From: mattisonchao Date: Mon, 9 Mar 2026 13:23:34 +0800 Subject: [PATCH 6/6] Fix test compilation: pass null BlockedThreadChecker in test Co-Authored-By: Claude Opus 4.6 --- client/src/test/java/io/oxia/client/AsyncOxiaClientImplTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/client/src/test/java/io/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/oxia/client/AsyncOxiaClientImplTest.java index 898a5cbd..287b4160 100644 --- a/client/src/test/java/io/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/oxia/client/AsyncOxiaClientImplTest.java @@ -96,6 +96,7 @@ void setUp() { readBatchManager, writeBatchManager, sessionManager, + null, requestTimeout); }