diff --git a/CHANGELOG.md b/CHANGELOG.md index c259d114..819fea0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,10 @@ - Document supported Java types for Tarantool data mapping in `tuple_pojo_mapping` docs (RU/EN), including Tarantool extension types (`decimal`, `uuid`, `datetime`, `interval`, `tuple`) and related mapping notes. - Document Jackson MsgPack deserialization: integers, `bin`/`str` vs `byte[]`/`String`, floating-point vs `decimal`; reference `jackson-dataformat-msgpack` for defaults and type coercion. +### Pooling + +- Fix race conditions, ABBA deadlock between `PoolEntry` and `ConnectionImpl` monitors, NPE on inline connect failure, and connection leak after a KILL/reconnect cycle in `PoolEntry` and `IProtoClientPoolImpl`. + ### Dependencies - Updated dependencies: diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java index 953c1bb2..dfee5346 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/IProtoClientPoolImpl.java @@ -524,11 +524,15 @@ public void setReconnectAfter(long reconnectAfter) throws IllegalArgumentExcepti @Override public void forEach(Consumer action) { - for (List group : entries.values()) { - for (PoolEntry entry : group) { - action.accept(entry.getClient()); + List clients = new ArrayList<>(); + synchronized (connectionPoolLock) { + for (List group : entries.values()) { + for (PoolEntry entry : group) { + clients.add(entry.getClient()); + } } } + clients.forEach(action); } /** diff --git a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java index a746b9c2..2c3e0e13 100644 --- a/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java +++ b/tarantool-pooling/src/main/java/io/tarantool/pool/PoolEntry.java @@ -8,6 +8,7 @@ import java.util.ArrayDeque; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -146,16 +147,16 @@ final class PoolEntry { private CompletableFuture connectFuture; /** Last heartbeat state/event. */ - private HeartbeatEvent lastHeartbeatEvent; + private volatile HeartbeatEvent lastHeartbeatEvent; /** Heartbeat timer/task. */ - private Timeout heartbeatTask; + private volatile Timeout heartbeatTask; /** Reconnection task. */ private Timeout reconnectTask; /** Flag signaling if heartbeat started or not. */ - private boolean isHeartbeatStarted; + private volatile boolean isHeartbeatStarted; /** * Flag signaling if connection is available or not. @@ -163,7 +164,13 @@ final class PoolEntry { *

When connection comes to invalidated state or killed, pool entry is locked and connection * will not be returned to outer client. */ - private boolean isLocked; + private final AtomicBoolean isLocked = new AtomicBoolean(false); + + /** + * Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link + * #internalConnect()} when a new connection generation begins. + */ + private final AtomicBoolean isShutdown = new AtomicBoolean(false); /** Count of failed pings occurred in invalidated state. */ private int currentDeathPings; @@ -273,7 +280,6 @@ public PoolEntry( this.gracefulShutdown = gracefulShutdown; this.group = group; this.index = index; - this.isLocked = false; this.reconnectAfter = reconnectAfter; this.tag = group.getTag(); this.timerService = timerService; @@ -301,27 +307,63 @@ public IProtoClient getClient() { } /** - * Method for locking pool entry. + * Locks the pool entry and increments the pool-wide {@code unavailable} counter on the {@code + * false → true} transition of {@link #isLocked}. * - *

Also increments count of unavailable clients. + *

All callers run on Netty IO threads: + * + *

    + *
  • {@code ConnectFailure} — initial connect attempt fails; reaches this method via {@link + * #handleConnectError(Object, Throwable)} from the {@code CLOSE_BY_REMOTE} or {@code + * CLOSE_BY_SHUTDOWN} listener registered on {@link #client}. + *
  • {@code ConnectionBreak} — an established connection is closed by the remote side or shut + * down locally; same path as {@code ConnectFailure}. + *
  • {@code HeartbeatInvalidate} — the sliding-window failure rate in {@link #pong} crosses + * the invalidation threshold; reaches this method via {@link #fire(HeartbeatEvent)} for + * {@code INVALIDATE}. + *
+ * + *

{@code ConnectionBreak} and {@code HeartbeatInvalidate} can fire concurrently (the heartbeat + * decides to invalidate the connection at the same instant the close callback runs). The {@link + * AtomicBoolean#compareAndSet} on {@link #isLocked} ensures only one of the two callers wins the + * {@code false → true} transition, so {@code unavailable} is incremented at most once per lock + * acquisition. */ public void lock() { - if (!isLocked) { + if (isLocked.compareAndSet(false, true)) { unavailable.incrementAndGet(); - isLocked = true; } } /** - * Method for unlocking pool entry. + * Unlocks the pool entry, cancels any pending reconnect task, and decrements the pool-wide {@code + * unavailable} counter on the {@code true → false} transition of {@link #isLocked}. + * + *

Callers: * - *

Also decrements count of unavailable clients and cancels reconnect task. + *

    + *
  • {@code HeartbeatResponse} — Netty IO thread, {@link #pong} reports a healthy response and + * {@link #fire(HeartbeatEvent)} for {@code ACTIVATE} reaches this method. + *
  • {@code ConnectSuccess} — Netty IO thread, {@link #onConnectComplete} reaches this method + * after a successful connect (and {@link #startHeartbeat}). + *
  • {@code UserConfigChange} — user code, when {@code pool.setGroups(...)} shrinks a group. + * Runs on the user thread while it holds {@code connectionPoolLock} (see {@code + * IProtoClientPoolImpl#shrinkGroup}); that lock serialises this path against other pool + * mutators, which is the only reason it is safe for user code to call {@code unlock()} + * directly — no other user-code path reaches this method. Rare in practice; not driven by + * Netty. + *
+ * + *

{@code HeartbeatResponse} and {@code ConnectSuccess} cannot run at the same time for the + * same entry: the heartbeat is started only after a successful connect, and {@code + * HeartbeatResponse} only follows a prior {@code HeartbeatInvalidate} that already locked the + * entry. The {@link AtomicBoolean#compareAndSet} ensures only one of possibly several concurrent + * unlockers runs {@link #stopReconnectTask} and decrements {@code unavailable}. */ public void unlock() { - if (isLocked) { + if (isLocked.compareAndSet(true, false)) { stopReconnectTask(); unavailable.decrementAndGet(); - isLocked = false; } } @@ -331,7 +373,7 @@ public void unlock() { * @return {@link #isLocked} value. */ public boolean isLocked() { - return isLocked; + return isLocked.get(); } /** Closes client and stops heartbeat and reconnect tasks if started. */ @@ -340,16 +382,46 @@ public void close() { shutdown(); } - /** Closes client and stops heartbeat task is started. */ + /** + * Closes the underlying client and stops the heartbeat task. + * + *

Performs field mutations under the entry monitor, then releases it before calling {@code + * client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close + * event. Holding the entry monitor across either of those calls would create an ABBA deadlock + * with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and + * then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. + * + *

Callers: + * + *

    + *
  • {@code PoolClose} — user code, via {@code pool.close()} → {@link #close()} → this method. + * Runs on the user thread that closes the pool. + *
  • {@code ConnectError} — Netty IO thread, via {@link #handleConnectError(Object, + * Throwable)} which fires for both {@code ConnectFailure} (initial connect fails) and + * {@code ConnectionBreak} (established connection drops). Runs on the Netty IO thread + * delivering the close event. + *
  • {@code HeartbeatKill} — Netty IO thread, via {@link #fire(HeartbeatEvent)} for {@code + * KILL} when the death-ping counter crosses the death threshold. Runs on the Netty IO + * thread processing the heartbeat pong. + *
+ * + *

The {@code isShutdown} {@link AtomicBoolean} guard on the {@code onConnectionClosed} emit + * makes the listener invocation one-shot regardless of how many of the above paths reach this + * method for the same entry. + */ public void shutdown() { - connectFuture = null; - stopHeartbeat(); + synchronized (this) { + connectFuture = null; + stopHeartbeat(); + } try { client.close(); } catch (Exception e) { log.warn("Cannot close client in pool", e); } - emit(listener -> listener.onConnectionClosed(tag, index)); + if (isShutdown.compareAndSet(false, true)) { + emit(listener -> listener.onConnectionClosed(tag, index)); + } } /** @@ -357,10 +429,7 @@ public void shutdown() { * * @return {@link java.util.concurrent.CompletableFuture} with client */ - public synchronized CompletableFuture connect() { - if (connectFuture != null) { - return connectFuture; - } + public CompletableFuture connect() { return internalConnect(); } @@ -410,15 +479,23 @@ public void stopHeartbeat() { /** * Internal method used by reconnect task and public connect. * + *

See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs + * outside the entry monitor for the same reason. + * * @return {@link java.util.concurrent.CompletableFuture} with client */ private CompletableFuture internalConnect() { + synchronized (this) { + if (connectFuture != null) { + return connectFuture; + } + } log.info("connect {}/{}", tag, index); LongTaskTimer.Sample timer = startTimer(connectTime); CompletableFuture future = client.connect(group.getAddress(), connectTimeout, gracefulShutdown); String user = group.getUser(); - connectFuture = + CompletableFuture cf = future .thenCompose( greeting -> { @@ -428,9 +505,16 @@ private CompletableFuture internalConnect() { } return client.ping(firstPingOpts); }) - .thenApply(r -> client) - .whenComplete(this::onConnectComplete); - return connectFuture; + .thenApply(r -> client); + synchronized (this) { + if (connectFuture != null) { + return connectFuture; + } + connectFuture = cf; + isShutdown.set(false); + } + cf.whenComplete(this::onConnectComplete); + return cf; } /** @@ -470,7 +554,9 @@ private void handleConnectError(Object r, Throwable exc) { return; } Throwable failure = exc.getCause() != null ? exc.getCause() : exc; - connectFuture = null; + synchronized (this) { + connectFuture = null; + } log.error("connect error {}/{}: {}", tag, index, failure.toString()); emit(listener -> listener.onConnectionFailed(tag, index, failure)); lock(); @@ -480,13 +566,19 @@ private void handleConnectError(Object r, Throwable exc) { /** Reconnect task scheduler. */ private void connectAfter() { - log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); - if (reconnectTask == null) { - reconnecting.incrementAndGet(); + synchronized (this) { + log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); + if (reconnectTask != null) { + // existing task is being replaced; the existing increment in `reconnecting` carries over + // to the new task, so no counter change is needed here. + reconnectTask.cancel(); + } else { + reconnecting.incrementAndGet(); + } + reconnectTask = + timerService.newTimeout( + timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); } - reconnectTask = - timerService.newTimeout( - timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter)); } @@ -658,7 +750,7 @@ private void incHeartbeatCounters(int fail) { } /** Stops reconnecting task if it is active. */ - private void stopReconnectTask() { + private synchronized void stopReconnectTask() { if (reconnectTask != null) { reconnecting.decrementAndGet(); reconnectTask.cancel(); diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java index a3a048b0..0b69b8ff 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/BasePoolTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ThreadLocalRandom; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import io.micrometer.core.instrument.Counter; @@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer container, String command) { protected int getActiveConnectionsCount(TarantoolContainer tt) { try { - List result = - TarantoolContainerClientHelper.executeCommandDecoded( - tt, "return box.stat.net().CONNECTIONS.current"); - return (Integer) result.get(0) - 1; + // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker; + // the loop's fiber.sleep lets it drain pending connections before we read. + String lua = + "local last = box.stat.net().CONNECTIONS.current;" + + " for i = 1, 50 do" + + " require('fiber').sleep(0.05);" + + " local cur = box.stat.net().CONNECTIONS.current;" + + " if cur == last then return cur - 1 end;" + + " last = cur;" + + " end;" + + " return last - 1"; + List result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua); + return (Integer) result.get(0); } catch (Exception e) { throw new RuntimeException(e); } @@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer tt, int basel return getActiveConnectionsCount(tt) - baseline; } + /** + * Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why + * a single read is unreliable. + * + * @param tt the Tarantool container under test + * @param expected the expected number of active connections + */ + protected void waitForActiveConnections(TarantoolContainer tt, int expected) { + try { + waitFor( + "Active connections count never reached " + expected, + Duration.ofSeconds(10), + () -> assertEquals(expected, getActiveConnectionsCount(tt))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + protected MeterRegistry createMetricsRegistry() { MeterRegistry metricsRegistry = new SimpleMeterRegistry(); LongTaskTimer.builder("request.timer") diff --git a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java index 1050061e..6106a473 100644 --- a/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java +++ b/tarantool-pooling/src/test/java/io/tarantool/pool/integration/ConnectionPoolReconnectsTest.java @@ -77,7 +77,7 @@ public void testReconnectAfterNodeFailure() throws Exception { assertTrue(pool.hasAvailableClients()); List clients = getConnects(pool, "node-a", count1); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); tt.stop(); Thread.sleep(1000); @@ -110,7 +110,7 @@ public void testReconnectAfterNodeFailure() throws Exception { }); assertTrue(pingClients(clients)); - assertEquals(count1, getActiveConnectionsCount(tt)); + waitForActiveConnections(tt, count1); assertEquals(count1, metricsRegistry.get("pool.size").gauge().value()); assertEquals(count1, metricsRegistry.get("pool.available").gauge().value());