-
Notifications
You must be signed in to change notification settings - Fork 0
fix(pooling): fix race conditions, deadlock, and connection leak in PoolEntry #100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5d045cb
6ba4d21
3d7d7a1
f8ca923
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ | |
| import java.util.ArrayDeque; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.function.BiFunction; | ||
| import java.util.function.Consumer; | ||
|
|
@@ -146,24 +147,30 @@ final class PoolEntry { | |
| private CompletableFuture<IProtoClient> connectFuture; | ||
|
|
||
| /** Last heartbeat state/event. */ | ||
| private HeartbeatEvent lastHeartbeatEvent; | ||
| private volatile HeartbeatEvent lastHeartbeatEvent; | ||
|
|
||
| /** Heartbeat timer/task. */ | ||
| private Timeout heartbeatTask; | ||
| private volatile Timeout heartbeatTask; | ||
|
|
||
| /** Reconnection task. */ | ||
| private Timeout reconnectTask; | ||
|
|
||
| /** Flag signaling if heartbeat started or not. */ | ||
| private boolean isHeartbeatStarted; | ||
| private volatile boolean isHeartbeatStarted; | ||
|
|
||
| /** | ||
| * Flag signaling if connection is available or not. | ||
| * | ||
| * <p>When connection comes to invalidated state or killed, pool entry is locked and connection | ||
| * will not be returned to outer client. | ||
| */ | ||
| private boolean isLocked; | ||
| private volatile boolean isLocked; | ||
|
|
||
| /** | ||
| * Per-generation idempotency flag for {@link #shutdown()} close-event emit; reset in {@link | ||
| * #internalConnect()} when a new connection generation begins. | ||
| */ | ||
| private final AtomicBoolean isShutdown = new AtomicBoolean(false); | ||
|
|
||
| /** Count of failed pings occurred in invalidated state. */ | ||
| private int currentDeathPings; | ||
|
|
@@ -305,7 +312,7 @@ public IProtoClient getClient() { | |
| * | ||
| * <p>Also increments count of unavailable clients. | ||
| */ | ||
| public void lock() { | ||
| public synchronized void lock() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Тоже давай отразим в комментарии, из каких потоков может быть вызван этот метод, и что может случиться:
Есть вероятность, что могут случиться одновременно события Б и В (решили вывести соединение из под нагрузки и тут же прилетает его завершение).2 В этом случае может увеличиться дважды счётчик unavailable. Какие еще события могут произойти? |
||
| if (!isLocked) { | ||
| unavailable.incrementAndGet(); | ||
| isLocked = true; | ||
|
|
@@ -317,7 +324,7 @@ public void lock() { | |
| * | ||
| * <p>Also decrements count of unavailable clients and cancels reconnect task. | ||
| */ | ||
| public void unlock() { | ||
| public synchronized void unlock() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Хорошо бы отразить здесь в комментарии, из каких потоков может быть вызован этот метод.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Здесь указанные события одновременно произойти не могут:
|
||
| if (isLocked) { | ||
| stopReconnectTask(); | ||
| unavailable.decrementAndGet(); | ||
|
|
@@ -340,27 +347,36 @@ public void close() { | |
| shutdown(); | ||
| } | ||
|
|
||
| /** Closes client and stops heartbeat task is started. */ | ||
| /** | ||
| * Closes the underlying client and stops the heartbeat task. | ||
| * | ||
| * <p>Performs field mutations under the entry monitor, then releases it before calling {@code | ||
| * client.close()} (which acquires the {@code ConnectionImpl} monitor) and emitting the close | ||
| * event. Holding the entry monitor across either of those calls would create an ABBA deadlock | ||
| * with the Netty close-callback path, which takes the {@code ConnectionImpl} monitor first and | ||
| * then re-enters {@link #handleConnectError(Object, Throwable)} on the entry. | ||
| */ | ||
| public void shutdown() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shutdown() также надо проанализировать, из каких потоков он может быть вызван. |
||
| connectFuture = null; | ||
| stopHeartbeat(); | ||
| synchronized (this) { | ||
| connectFuture = null; | ||
| stopHeartbeat(); | ||
| } | ||
| try { | ||
| client.close(); | ||
| } catch (Exception e) { | ||
| log.warn("Cannot close client in pool", e); | ||
| } | ||
| emit(listener -> listener.onConnectionClosed(tag, index)); | ||
| if (isShutdown.compareAndSet(false, true)) { | ||
| emit(listener -> listener.onConnectionClosed(tag, index)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Start client connection process and returns futures. | ||
| * | ||
| * @return {@link java.util.concurrent.CompletableFuture} with client | ||
| */ | ||
| public synchronized CompletableFuture<IProtoClient> connect() { | ||
| if (connectFuture != null) { | ||
| return connectFuture; | ||
| } | ||
| public CompletableFuture<IProtoClient> connect() { | ||
| return internalConnect(); | ||
| } | ||
|
|
||
|
|
@@ -410,15 +426,23 @@ public void stopHeartbeat() { | |
| /** | ||
| * Internal method used by reconnect task and public connect. | ||
| * | ||
| * <p>See {@link #shutdown()} for the monitor-ordering reasoning; {@code client.connect()} runs | ||
| * outside the entry monitor for the same reason. | ||
| * | ||
| * @return {@link java.util.concurrent.CompletableFuture} with client | ||
| */ | ||
| private CompletableFuture<IProtoClient> internalConnect() { | ||
| synchronized (this) { | ||
| if (connectFuture != null) { | ||
| return connectFuture; | ||
| } | ||
| } | ||
| log.info("connect {}/{}", tag, index); | ||
| LongTaskTimer.Sample timer = startTimer(connectTime); | ||
| CompletableFuture<?> future = | ||
| client.connect(group.getAddress(), connectTimeout, gracefulShutdown); | ||
| String user = group.getUser(); | ||
| connectFuture = | ||
| CompletableFuture<IProtoClient> cf = | ||
| future | ||
| .thenCompose( | ||
| greeting -> { | ||
|
|
@@ -428,9 +452,16 @@ private CompletableFuture<IProtoClient> internalConnect() { | |
| } | ||
| return client.ping(firstPingOpts); | ||
| }) | ||
| .thenApply(r -> client) | ||
| .whenComplete(this::onConnectComplete); | ||
| return connectFuture; | ||
| .thenApply(r -> client); | ||
| synchronized (this) { | ||
| if (connectFuture != null) { | ||
| return connectFuture; | ||
| } | ||
| connectFuture = cf; | ||
| isShutdown.set(false); | ||
| } | ||
| cf.whenComplete(this::onConnectComplete); | ||
| return cf; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -470,7 +501,9 @@ private void handleConnectError(Object r, Throwable exc) { | |
| return; | ||
| } | ||
| Throwable failure = exc.getCause() != null ? exc.getCause() : exc; | ||
| connectFuture = null; | ||
| synchronized (this) { | ||
| connectFuture = null; | ||
| } | ||
| log.error("connect error {}/{}: {}", tag, index, failure.toString()); | ||
| emit(listener -> listener.onConnectionFailed(tag, index, failure)); | ||
| lock(); | ||
|
|
@@ -480,13 +513,19 @@ private void handleConnectError(Object r, Throwable exc) { | |
|
|
||
| /** Reconnect task scheduler. */ | ||
| private void connectAfter() { | ||
| log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); | ||
| if (reconnectTask == null) { | ||
| reconnecting.incrementAndGet(); | ||
| synchronized (this) { | ||
| log.info("reconnect {}/{} after {} ms", tag, index, reconnectAfter); | ||
| if (reconnectTask != null) { | ||
| // existing task is being replaced; the existing increment in `reconnecting` carries over | ||
| // to the new task, so no counter change is needed here. | ||
| reconnectTask.cancel(); | ||
| } else { | ||
| reconnecting.incrementAndGet(); | ||
| } | ||
| reconnectTask = | ||
| timerService.newTimeout( | ||
| timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); | ||
| } | ||
| reconnectTask = | ||
| timerService.newTimeout( | ||
| timeout -> internalConnect(), reconnectAfter, TimeUnit.MILLISECONDS); | ||
| emit(listener -> listener.onReconnectScheduled(tag, index, reconnectAfter)); | ||
| } | ||
|
|
||
|
|
@@ -658,7 +697,7 @@ private void incHeartbeatCounters(int fail) { | |
| } | ||
|
|
||
| /** Stops reconnecting task if it is active. */ | ||
| private void stopReconnectTask() { | ||
| private synchronized void stopReconnectTask() { | ||
| if (reconnectTask != null) { | ||
| reconnecting.decrementAndGet(); | ||
| reconnectTask.cancel(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| import java.util.concurrent.CompletionException; | ||
| import java.util.concurrent.ThreadLocalRandom; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
| import static org.junit.jupiter.api.Assertions.fail; | ||
| import io.micrometer.core.instrument.Counter; | ||
|
|
@@ -93,10 +94,19 @@ protected void execLua(TarantoolContainer<?> container, String command) { | |
|
|
||
| protected int getActiveConnectionsCount(TarantoolContainer<?> tt) { | ||
| try { | ||
| List<? extends Object> result = | ||
| TarantoolContainerClientHelper.executeCommandDecoded( | ||
| tt, "return box.stat.net().CONNECTIONS.current"); | ||
| return (Integer) result.get(0) - 1; | ||
| // box.stat.net().CONNECTIONS.current is updated asynchronously by the IProto worker; | ||
| // the loop's fiber.sleep lets it drain pending connections before we read. | ||
| String lua = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Мне кажется, что именно эти изменения, вкупе с изменениями ниже (waitForActiveConnections) и дают наибольшй эффект, нежели попытки обмазать всё блокировками в PoolEntry |
||
| "local last = box.stat.net().CONNECTIONS.current;" | ||
| + " for i = 1, 50 do" | ||
| + " require('fiber').sleep(0.05);" | ||
| + " local cur = box.stat.net().CONNECTIONS.current;" | ||
| + " if cur == last then return cur - 1 end;" | ||
| + " last = cur;" | ||
| + " end;" | ||
| + " return last - 1"; | ||
| List<? extends Object> result = TarantoolContainerClientHelper.executeCommandDecoded(tt, lua); | ||
| return (Integer) result.get(0); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
|
|
@@ -106,6 +116,24 @@ protected int getActiveConnectionsCountDelta(TarantoolContainer<?> tt, int basel | |
| return getActiveConnectionsCount(tt) - baseline; | ||
| } | ||
|
|
||
| /** | ||
| * Retries {@link #getActiveConnectionsCount} until it equals {@code expected} — see there for why | ||
| * a single read is unreliable. | ||
| * | ||
| * @param tt the Tarantool container under test | ||
| * @param expected the expected number of active connections | ||
| */ | ||
| protected void waitForActiveConnections(TarantoolContainer<?> tt, int expected) { | ||
| try { | ||
| waitFor( | ||
| "Active connections count never reached " + expected, | ||
| Duration.ofSeconds(10), | ||
| () -> assertEquals(expected, getActiveConnectionsCount(tt))); | ||
| } catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
| } | ||
|
|
||
| protected MeterRegistry createMetricsRegistry() { | ||
| MeterRegistry metricsRegistry = new SimpleMeterRegistry(); | ||
| LongTaskTimer.builder("request.timer") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Т.к. подключение в обойме может быть затронуто из нескольких потоков, для его блокировки и разблокировки, то вероятно, полезнее будет сделать isLocked как AtomicBoolean и переделать методы блокировки через него. Возможно, этого изменения будет достаточно