Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
- Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes.
- Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion.

### Pooling

- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`.

### Dependencies
- Updated dependencies:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti

@Override
public void forEach(Consumer<IProtoClient> action) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
action.accept(entry.getClient());
List<IProtoClient> clients = new ArrayList<>();
synchronized (connectionPoolLock) {
for (List<PoolEntry> group : entries.values()) {
for (PoolEntry entry : group) {
clients.add(entry.getClient());
}
}
}
clients.forEach(action);
}

/**
Expand Down
91 changes: 65 additions & 26 deletions tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -146,24 +147,30 @@ final class PoolEntry {
private CompletableFuture<IProtoClient> connectFuture;

/** Last heartbeat state/event. */
private HeartbeatEvent lastHeartbeatEvent;
private volatile HeartbeatEvent lastHeartbeatEvent;

/** Heartbeat timer/task. */
private Timeout heartbeatTask;
private volatile Timeout heartbeatTask;

/** Reconnection task. */
private Timeout reconnectTask;

/** Flag signaling if heartbeat started or not. */
private boolean isHeartbeatStarted;
private volatile boolean isHeartbeatStarted;

/**
* Flag signaling if connection is available or not.
*
* <p>When connection comes to invalidated state or killed, pool entry is locked and connection
* will not be returned to outer client.
*/
private boolean isLocked;
private volatile boolean isLocked;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Т.к. подключение в обойме может быть затронуто из нескольких потоков, для его блокировки и разблокировки, то вероятно, полезнее будет сделать isLocked как AtomicBoolean и переделать методы блокировки через него. Возможно, этого изменения будет достаточно


/**
* Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link
* #internalConnect()} when a new connection generation begins.
*/
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

/** Count of failed pings occurred in invalidated state. */
private int currentDeathPings;
Expand Down Expand Up @@ -305,7 +312,7 @@ public IProtoClient getClient() {
*
* <p>Also increments count of unavailable clients.
*/
public void lock() {
public synchronized void lock() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тоже давай отразим в комментарии, из каких потоков может быть вызван этот метод, и что может случиться:

  1. Из потока ввода/вывода Netty в случае, когда подключение не удаётся: вызывается handleConnectError. (обозначим как событие A).
  2. Из потока ввода/вывода Netty в случае, когда происходит обрыв подключения: также вызывается handleConnectError. (обозначим как событие Б).
  3. Из потока ввода/вывода Netty в случае, когда происходит анализ ответа на запрос PING и принимается решение заблокировать соединение, чтобы вывести его из распределения нагрузки. (обозначим как событие В).

Есть вероятность, что могут случиться одновременно события Б и В (решили вывести соединение из под нагрузки и тут же прилетает его завершение).2

В этом случае может увеличиться дважды счётчик unavailable.

Какие еще события могут произойти?

if (!isLocked) {
unavailable.incrementAndGet();
isLocked = true;
Expand All @@ -317,7 +324,7 @@ public void lock() {
*
* <p>Also decrements count of unavailable clients and cancels reconnect task.
*/
public void unlock() {
public synchronized void unlock() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хорошо бы отразить здесь в комментарии, из каких потоков может быть вызован этот метод.

  1. Из потока пользовательского кода: при вызове функции изменения настроек обоймы подключений может вызываться unlock(), когда закрываются лишние). Не вызывается практически никогда. (Событие А).
  2. Из потока ввода/вывода Netty при обработке ответов на запросы IPROTO_PING (или другого метода для проверки): при анализе ответа может вызываться как lock, так и unlock. (Событие Б).
  3. Из потока ввода/вывода Netty при обработке события успешного подключения: вызывается unlock. (Событие В).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь указанные события одновременно произойти не могут:

  1. Событием А можно пренебречь.
  2. Событие Б не может произойти одновременно с событием В, так как событие Б случается только после того, как успешно подключились и запустили задачу проверки соединения.

if (isLocked) {
stopReconnectTask();
unavailable.decrementAndGet();
Expand All @@ -340,27 +347,36 @@ public void close() {
shutdown();
}

/** Closes client and stops heartbeat task is started. */
/**
* Closes the underlying client and stops the heartbeat task.
*
* <p>Performs field mutations under the entry monitor, then releases it before calling {@code
* client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close
* event. Holding the entry monitor across either of those calls would create an ABBA deadlock
* with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and
* then re-enters {@link #handleConnectError(Object, Throwable)} on the entry.
*/
public void shutdown() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shutdown() также надо проанализировать, из каких потоков он может быть вызван.
Как минимум, из потока пользовательского кода (закрытие обоймы соединений) ии... каких еще?

connectFuture = null;
stopHeartbeat();
synchronized (this) {
connectFuture = null;
stopHeartbeat();
}
try {
client.close();
} catch (Exception e) {
log.warn("Cannot close client in pool", e);
}
emit(listener -> listener.onConnectionClosed(tag, index));
if (isShutdown.compareAndSet(false, true)) {
emit(listener -> listener.onConnectionClosed(tag, index));
}
}

/**
* Start client connection process and returns futures.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
public synchronized CompletableFuture<IProtoClient> connect() {
if (connectFuture != null) {
return connectFuture;
}
public CompletableFuture<IProtoClient> connect() {
return internalConnect();
}

Expand Down Expand Up @@ -410,15 +426,23 @@ public void stopHeartbeat() {
/**
* Internal method used by reconnect task and public connect.
*
* <p>See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs
* outside the entry monitor for the same reason.
*
* @return {@link java.util.concurrent.CompletableFuture} with client
*/
private CompletableFuture<IProtoClient> internalConnect() {
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
}
log.info("connect {}/{}", tag, index);
LongTaskTimer.Sample timer = startTimer(connectTime);
CompletableFuture<?> future =
client.connect(group.getAddress(), connectTimeout, gracefulShutdown);
String user = group.getUser();
connectFuture =
CompletableFuture<IProtoClient> cf =
future
.thenCompose(
greeting -> {
Expand All @@ -428,9 +452,16 @@ private CompletableFuture<IProtoClient> internalConnect() {
}
return client.ping(firstPingOpts);
})
.thenApply(r -> client)
.whenComplete(this::onConnectComplete);
return connectFuture;
.thenApply(r -> client);
synchronized (this) {
if (connectFuture != null) {
return connectFuture;
}
connectFuture = cf;
isShutdown.set(false);
}
cf.whenComplete(this::onConnectComplete);
return cf;
}

/**
Expand Down Expand Up @@ -470,7 +501,9 @@ private void handleConnectError(Object r, Throwable exc) {
return;
}
Throwable failure = exc.getCause() != null ? exc.getCause() : exc;
connectFuture = null;
synchronized (this) {
connectFuture = null;
}
log.error("connect error {}/{}: {}", tag, index, failure.toString());
emit(listener -> listener.onConnectionFailed(tag, index, failure));
lock();
Expand All @@ -480,13 +513,19 @@ private void handleConnectError(Object r, Throwable exc) {

/** Reconnect task scheduler. */
private void connectAfter() {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask == null) {
reconnecting.incrementAndGet();
synchronized (this) {
log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter);
if (reconnectTask != null) {
// existing task is being replaced; the existing increment in `reconnecting` carries over
// to the new task, so no counter change is needed here.
reconnectTask.cancel();
} else {
reconnecting.incrementAndGet();
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
}
reconnectTask =
timerService.newTimeout(
timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS);
emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter));
}

Expand Down Expand Up @@ -658,7 +697,7 @@ private void incHeartbeatCounters(int fail) {
}

/** Stops reconnecting task if it is active. */
private void stopReconnectTask() {
private synchronized void stopReconnectTask() {
if (reconnectTask != null) {
reconnecting.decrementAndGet();
reconnectTask.cancel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer<?> container, String command) {

protected int getActiveConnectionsCount(TarantoolContainer<?> tt) {
try {
List<? extends Object> result =
TarantoolContainerClientHelper.executeCommandDecoded(
tt, "return box.stat.net().CONNECTIONS.current");
return (Integer) result.get(0) - 1;
// box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker;
// the loop's fiber.sleep lets it drain pending connections before we read.
String lua =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Мне кажется, что именно эти изменения, вкупе с изменениями ниже (waitForActiveConnections) и дают наибольшй эффект, нежели попытки обмазать всё блокировками в PoolEntry

"local last = box.stat.net().CONNECTIONS.current;"
+ " for i = 1, 50 do"
+ " require('fiber').sleep(0.05);"
+ " local cur = box.stat.net().CONNECTIONS.current;"
+ " if cur == last then return cur - 1 end;"
+ " last = cur;"
+ " end;"
+ " return last - 1";
List<? extends Object> result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua);
return (Integer) result.get(0);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer<?> tt, int basel
return getActiveConnectionsCount(tt) - baseline;
}

/**
* Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why
* a single read is unreliable.
*
* @param tt the Tarantool container under test
* @param expected the expected number of active connections
*/
protected void waitForActiveConnections(TarantoolContainer<?> tt, int expected) {
try {
waitFor(
"Active connections count never reached " + expected,
Duration.ofSeconds(10),
() -> assertEquals(expected, getActiveConnectionsCount(tt)));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

protected MeterRegistry createMetricsRegistry() {
MeterRegistry metricsRegistry = new SimpleMeterRegistry();
LongTaskTimer.builder("request.timer")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
assertTrue(pool.hasAvailableClients());
List<IProtoClient> clients = getConnects(pool, "node-a", count1);
assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

tt.stop();
Thread.sleep(1000);
Expand Down Expand Up @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception {
});

assertTrue(pingClients(clients));
assertEquals(count1, getActiveConnectionsCount(tt));
waitForActiveConnections(tt, count1);

assertEquals(count1, metricsRegistry.get("pool.size").gauge().value());
assertEquals(count1, metricsRegistry.get("pool.available").gauge().value());
Expand Down