Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ subprojects {
}

dependencies {
implementation(rootProject.libs.slf4j.api)
implementation(rootProject.libs.slog)

compileOnly(rootProject.libs.lombok)
annotationProcessor(rootProject.libs.lombok)
Expand Down
7 changes: 4 additions & 3 deletions client/src/main/java/io/oxia/client/SequenceUpdates.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.oxia.client;

import io.github.merlimat.slog.Logger;
import io.grpc.ClientCall;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
Expand All @@ -31,11 +32,11 @@
import java.util.function.Consumer;
import java.util.function.Function;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SequenceUpdates implements Closeable, StreamObserver<GetSequenceUpdatesResponse> {

private static final Logger log = Logger.get(SequenceUpdates.class);

private final String key;
private final String partitionKey;

Expand Down Expand Up @@ -108,7 +109,7 @@ public synchronized void onError(Throwable t) {
if (closed || isClientClosed.apply(null)) {
return;
}
log.warn("Failure while processing sequence updates: {}", t.getMessage(), t);
log.warn().exception(t).log("Failure while processing sequence updates");
createStream();
}

Expand Down
2 changes: 0 additions & 2 deletions client/src/main/java/io/oxia/client/grpc/OxiaStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class OxiaStub implements AutoCloseable {
public static String TLS_SCHEMA = "tls://";
private final ManagedChannel channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public WriteStreamWrapper getWriteStream(long shardId) {
final var asyncStub = provider.getStubForShard(shardId).async();
return new WriteStreamWrapper(
asyncStub.withInterceptors(
MetadataUtils.newAttachHeadersInterceptor(headers)));
MetadataUtils.newAttachHeadersInterceptor(headers)),
shardId);
});
}
if (wrapper.isValid()) {
Expand Down
28 changes: 15 additions & 13 deletions client/src/main/java/io/oxia/client/grpc/WriteStreamWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.oxia.client.grpc;

import io.github.merlimat.slog.Logger;
import io.grpc.stub.StreamObserver;
import io.oxia.proto.OxiaClientGrpc;
import io.oxia.proto.WriteRequest;
Expand All @@ -24,18 +25,19 @@
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public final class WriteStreamWrapper implements StreamObserver<WriteResponse> {

private final Logger log;

private final StreamObserver<WriteRequest> clientStream;
private final Deque<CompletableFuture<WriteResponse>> pendingWrites;

private volatile boolean completed;
private volatile Throwable completedException;

public WriteStreamWrapper(OxiaClientGrpc.OxiaClientStub stub) {
public WriteStreamWrapper(OxiaClientGrpc.OxiaClientStub stub, long shardId) {
this.log = Logger.get(WriteStreamWrapper.class).with().attr("shard", shardId).build();
this.pendingWrites = new ArrayDeque<>();
this.completed = false;
this.completedException = null;
Expand All @@ -62,10 +64,11 @@ public void onError(Throwable error) {
completedException = error;
completed = true;
if (!pendingWrites.isEmpty()) {
log.warn(
"Receive error when writing data to server through the stream, prepare to fail pending requests. pendingWrites={} {}",
pendingWrites.size(),
completedException.getMessage());
log.warn()
.attr("pendingWrites", pendingWrites.size())
.exceptionMessage(completedException)
.log(
"Receive error when writing data to server through the stream, prepare to fail pending requests");
}
pendingWrites.forEach(f -> f.completeExceptionally(completedException));
pendingWrites.clear();
Expand All @@ -77,9 +80,10 @@ public void onCompleted() {
synchronized (WriteStreamWrapper.this) {
completed = true;
if (!pendingWrites.isEmpty()) {
log.info(
"Receive stream close signal when writing data to server through the stream, prepare to cancel pending requests. pendingWrites={}",
pendingWrites.size());
log.info()
.attr("pendingWrites", pendingWrites.size())
.log(
"Receive stream close signal when writing data to server through the stream, prepare to cancel pending requests");
}
pendingWrites.forEach(f -> f.completeExceptionally(new CancellationException()));
pendingWrites.clear();
Expand All @@ -98,9 +102,7 @@ public CompletableFuture<WriteResponse> send(WriteRequest request) {
}
final CompletableFuture<WriteResponse> future = new CompletableFuture<>();
try {
if (log.isDebugEnabled()) {
log.debug("Sending request {}", request);
}
log.debug().attr("request", request).log("Sending request");
clientStream.onNext(request);
pendingWrites.add(future);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import java.util.function.Consumer;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NotificationManager implements AutoCloseable, Consumer<ShardAssignmentChanges> {
private final ConcurrentMap<Long, ShardNotificationReceiver> shardReceivers =
new ConcurrentHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static io.oxia.client.api.Notification.KeyModified;
import static lombok.AccessLevel.PACKAGE;

import io.github.merlimat.slog.Logger;
import io.grpc.stub.StreamObserver;
import io.oxia.client.CompositeConsumer;
import io.oxia.client.api.Notification;
Expand All @@ -35,11 +36,11 @@
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ShardNotificationReceiver implements Closeable, StreamObserver<NotificationBatch> {

private final Logger log;

private final OxiaStubManager manager;
private final String leader;
private final NotificationManager notificationManager;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class ShardNotificationReceiver implements Closeable, StreamObserver<Noti
this.shardId = shardId;
this.callback = callback;
this.offset = offset;
this.log = Logger.get(ShardNotificationReceiver.class).with().attr("shard", shardId).build();

start();
}
Expand Down Expand Up @@ -98,9 +100,7 @@ public void onNext(NotificationBatch batch) {

batch.forEachNotifications(
(key, notification) -> {
if (log.isDebugEnabled()) {
log.debug("--- Got notification: {} - {}", key, notification.getType());
}
log.debug().attr("key", key).attr("type", notification.getType()).log("Got notification");

var n =
switch (notification.getType()) {
Expand All @@ -125,17 +125,16 @@ public void onError(Throwable t) {
}

long retryDelayMillis = backoff.nextDelayMillis();
log.warn(
"Error while receiving notifications for shard={}: {} - Retrying in {} seconds",
shardId,
t.getMessage(),
retryDelayMillis / 1000.0);
log.warn()
.attr("retryInSeconds", retryDelayMillis / 1000.0)
.exceptionMessage(t)
.log("Error while receiving notifications");
notificationManager
.getExecutor()
.schedule(
() -> {
if (!closed) {
log.info("Retrying getting notifications for shard={}", shardId);
log.info("Retrying getting notifications");
start();
}
},
Expand Down
49 changes: 18 additions & 31 deletions client/src/main/java/io/oxia/client/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import io.github.merlimat.slog.Logger;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.oxia.client.ClientConfig;
Expand All @@ -38,11 +39,11 @@
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Session implements StreamObserver<KeepAliveResponse> {

private final @NonNull Logger log;

private final @NonNull OxiaStubProvider stubProvider;
private final @NonNull Duration sessionTimeout;
private final @NonNull Duration heartbeatInterval;
Expand Down Expand Up @@ -89,12 +90,15 @@ public class Session implements StreamObserver<KeepAliveResponse> {
this.heartbeat = new SessionHeartbeat();
this.heartbeat.setShard(shardId).setSessionId(sessionId);
this.listener = listener;
this.log =
Logger.get(Session.class)
.with()
.attr("shard", shardId)
.attr("sessionId", sessionId)
.attr("clientIdentity", config.clientIdentifier())
.build();

log.info(
"Session created shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
config.clientIdentifier());
log.info("Session created");

this.sessionsOpened =
instrumentProvider.newCounter(
Expand Down Expand Up @@ -125,7 +129,9 @@ public class Session implements StreamObserver<KeepAliveResponse> {
try {
sendKeepAlive();
} catch (Throwable ex) {
log.warn("receive error when send keep-alive request", Throwables.getRootCause(ex));
log.warn()
.exception(Throwables.getRootCause(ex))
.log("receive error when send keep-alive request");
}
},
heartbeatInterval.toMillis(),
Expand All @@ -147,23 +153,12 @@ private void sendKeepAlive() {
@Override
public void onNext(KeepAliveResponse value) {
lastSuccessfullResponse = Instant.now();
if (log.isDebugEnabled()) {
log.debug(
"Received keep-alive response shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
}
log.debug("Received keep-alive response");
}

@Override
public void onError(Throwable t) {
log.warn(
"Error during session keep-alive shard={} sessionId={} clientIdentity={}: {}",
shardId,
sessionId,
clientIdentifier,
t.getMessage());
log.warn().exceptionMessage(t).log("Error during session keep-alive");
}

@Override
Expand All @@ -173,11 +168,7 @@ public void onCompleted() {

private void handleSessionExpired() {
sessionsExpired.increment();
log.warn(
"Session expired shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
log.warn("Session expired");
close();
}

Expand All @@ -198,11 +189,7 @@ public CompletableFuture<Void> close() {
return future.whenComplete(
(__, ignore) -> {
listener.onSessionClosed(Session.this);
log.info(
"Session closed shard={} sessionId={} clientIdentity={}",
shardId,
sessionId,
clientIdentifier);
log.info("Session closed");
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SessionManager
implements AutoCloseable, Consumer<ShardAssignmentChanges>, SessionNotificationListener {

Expand Down
24 changes: 13 additions & 11 deletions client/src/main/java/io/oxia/client/shard/ShardManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static java.util.stream.Collectors.toSet;

import com.google.common.annotations.VisibleForTesting;
import io.github.merlimat.slog.Logger;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
Expand All @@ -48,10 +49,11 @@
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ShardManager implements AutoCloseable, StreamObserver<ShardAssignments> {

private static final Logger LOG = Logger.get(ShardManager.class);
private final Logger log;
private final ScheduledExecutorService executor;
private final OxiaStub stub;
private final @NonNull ShardAssignmentsContainer assignments;
Expand All @@ -75,6 +77,7 @@ public class ShardManager implements AutoCloseable, StreamObserver<ShardAssignme
this.executor = executor;
this.assignments = assignments;
this.callbacks = callbacks;
this.log = LOG.with().attr("namespace", assignments.getNamespace()).build();

this.shardAssignmentsEvents =
instrumentProvider.newCounter(
Expand Down Expand Up @@ -134,7 +137,7 @@ public void onError(Throwable error) {
if (description != null) {
var customStatusCode = CustomStatusCode.fromDescription(description);
if (customStatusCode == CustomStatusCode.ErrorNamespaceNotFound) {
log.error("Namespace not found: {}", assignments.getNamespace());
log.error("Namespace not found");
if (!initialAssignmentsFuture.isDone()) {
if (initialAssignmentsFuture.completeExceptionally(
new NamespaceNotFoundException(assignments.getNamespace()))) {
Expand All @@ -145,13 +148,11 @@ public void onError(Throwable error) {
}
}
}
log.warn("Failed receiving shard assignments.", getRootCause(error));
log.warn().exception(getRootCause(error)).log("Failed receiving shard assignments");
executor.schedule(
() -> {
if (!closed) {
log.info(
"Retry creating stream for shard assignments namespace={}",
assignments.getNamespace());
log.info("Retry creating stream for shard assignments");
start();
}
},
Expand All @@ -169,9 +170,7 @@ public void onCompleted() {
executor.schedule(
() -> {
if (!closed) {
log.info(
"Retry creating stream for shard assignments after stream closed namespace={}",
assignments.getNamespace());
log.info("Retry creating stream for shard assignments after stream closed");
start();
}
},
Expand Down Expand Up @@ -213,7 +212,10 @@ static Map<Long, Shard> recomputeShardHashBoundaries(
.findOverlapping(assignments.values())
.forEach(
existing -> {
log.info("Deleting shard {} as it overlaps with {}", existing, update);
LOG.info()
.attr("existing", existing)
.attr("update", update)
.log("Deleting shard as it overlaps");
toDelete.add(existing.id());
}));

Expand Down
Loading
Loading