From a3faf2673b266251b87765db6e039e4446b662bf Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Tue, 17 Mar 2026 20:10:31 -0700 Subject: [PATCH 01/17] Add tiered send/receive recovery matching Go SDK parity Add RecoveryKind error classification and recovery-aware retry to azure-core-amqp. Apply tiered recovery to all Service Bus sender, receiver, and session paths. On LINK errors: dispose stale link/session, retry with fresh resources. On CONNECTION errors: force-close the cached connection, retry with fresh connection. Includes quick-retry optimization and didQuickRetry deduplication. Fixes #44688 --- .../implementation/AmqpChannelProcessor.java | 9 + .../ReactorConnectionCache.java | 22 +++ .../amqp/implementation/RecoveryKind.java | 146 ++++++++++++++ .../core/amqp/implementation/RetryUtil.java | 102 ++++++++++ .../amqp/implementation/RecoveryKindTest.java | 180 ++++++++++++++++++ .../servicebus/ConnectionCacheWrapper.java | 13 ++ .../ServiceBusReceiverAsyncClient.java | 30 ++- .../ServiceBusSenderAsyncClient.java | 83 ++++++-- .../servicebus/ServiceBusSessionAcquirer.java | 15 +- .../servicebus/ServiceBusSessionManager.java | 19 +- .../ServiceBusSenderAsyncClientTest.java | 5 +- 11 files changed, 595 insertions(+), 29 deletions(-) create mode 100644 sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java index 3c1eef933a38..4f23f86e92da 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java @@ -347,6 +347,15 @@ private 
void setAndClearChannel() { close(oldChannel); } + /** + * Force-closes the current cached channel so that the next subscriber receives a fresh one. + * This is used for connection-level recovery when the current connection is stale + * but the processor has not detected it (e.g., heartbeats echoed by intermediate infrastructure). + */ + public void forceCloseChannel() { + setAndClearChannel(); + } + /** * Checks the current state of the channel for this channel and returns true if the channel is null or if this * processor is disposed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index d772cfdf0698..ee155d982b15 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -172,6 +172,28 @@ public boolean isCurrentConnectionClosed() { return (currentConnection != null && currentConnection.isDisposed()) || terminated; } + /** + * Closes the current cached connection (if any) so that the next {@link #get()} call creates + * a fresh connection. This is used for connection-level recovery when the current connection + * is in a stale state that the cache's normal error detection (via endpoint state signals) + * has not detected — for example, when intermediate infrastructure (load balancers, NAT gateways) + * is echoing AMQP heartbeats on behalf of a dead connection. + * + *

This is modeled after the Go SDK's {@code Namespace.Recover()} which explicitly closes + * the old connection and increments the connection revision.

+ * + *

This method is safe to call concurrently. If the connection is already closed or being + * closed, this is a no-op.

+ */ + public void forceCloseConnection() { + final T connection = currentConnection; + if (connection != null && !connection.isDisposed()) { + withConnectionId(logger, connection.getId()) + .log("Force-closing connection for recovery. Next get() will create a fresh connection."); + closeConnection(connection, logger, "Force-close for connection recovery."); + } + } + /** * Terminate so that consumers will no longer be able to request connection. If there is a current (cached) * connection then it will be closed. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java new file mode 100644 index 000000000000..3c210c51cf45 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -0,0 +1,146 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; + +import java.util.concurrent.TimeoutException; + +/** + * Classifies errors into recovery tiers, determining what resources should be closed + * between retry attempts. This follows the tiered recovery pattern used by the Go, .NET, + * Python, and JS Azure SDKs. + * + * + */ +public enum RecoveryKind { + /** + * No recovery needed — retry on the same link and connection. + * Applies to: server-busy, timeouts, operation-cancelled. + */ + NONE, + + /** + * Close the link (and its session) before retrying. The next retry creates a fresh link + * on the same connection. + * Applies to: link:detach-forced, link:stolen, transient AMQP errors on the link. + */ + LINK, + + /** + * Close the entire connection before retrying. The next retry creates a fresh connection, + * session, and link. 
+ * Applies to: connection:forced, connection:framing-error, proton:io, internal-error. + */ + CONNECTION, + + /** + * Do not retry — the error is permanent. + * Applies to: unauthorized-access, not-found, message-size-exceeded. + */ + FATAL; + + /** + * Classifies the given error into a {@link RecoveryKind} that determines what resources + * should be invalidated between retry attempts. + * + * @param error The error to classify. + * @return The recovery kind for the given error. + */ + public static RecoveryKind classify(Throwable error) { + if (error == null) { + return NONE; + } + + // Timeouts — retry on same link, the link may still be healthy. + if (error instanceof TimeoutException) { + return NONE; + } + + if (error instanceof AmqpException) { + final AmqpException amqpError = (AmqpException) error; + final AmqpErrorCondition condition = amqpError.getErrorCondition(); + + if (condition != null) { + switch (condition) { + // Connection-level errors — close the entire connection. + case CONNECTION_FORCED: + case CONNECTION_FRAMING_ERROR: + case CONNECTION_REDIRECT: + case PROTON_IO: + case INTERNAL_ERROR: + return CONNECTION; + + // Link-level errors — close the link, keep the connection. + case LINK_DETACH_FORCED: + case LINK_STOLEN: + case LINK_REDIRECT: + case PARTITION_NOT_OWNED_ERROR: + case TRANSFER_LIMIT_EXCEEDED: + return LINK; + + // Fatal errors — do not retry. + case NOT_FOUND: + case UNAUTHORIZED_ACCESS: + case LINK_PAYLOAD_SIZE_EXCEEDED: + case RESOURCE_LIMIT_EXCEEDED: + case NOT_ALLOWED: + case NOT_IMPLEMENTED: + case ENTITY_DISABLED_ERROR: + case ENTITY_ALREADY_EXISTS: + case PUBLISHER_REVOKED_ERROR: + case ARGUMENT_ERROR: + case ARGUMENT_OUT_OF_RANGE_ERROR: + case ILLEGAL_STATE: + case MESSAGE_LOCK_LOST: + case STORE_LOCK_LOST_ERROR: + return FATAL; + + // Server-busy and timeouts — retry on same link. + case SERVER_BUSY_ERROR: + case TIMEOUT_ERROR: + case OPERATION_CANCELLED: + return NONE; + + // Session/lock errors — link-level recovery. 
+ // Session lock loss means the session link is invalid and + // a fresh link must be acquired for a new session. + case SESSION_LOCK_LOST: + case SESSION_CANNOT_BE_LOCKED: + case SESSION_NOT_FOUND: + case MESSAGE_NOT_FOUND: + return LINK; + + default: + break; + } + } + + // Transient AMQP errors without a specific condition — link recovery. + if (amqpError.isTransient()) { + return LINK; + } + + // Non-transient AMQP errors without a recognized condition — fatal. + return FATAL; + } + + // RequestResponseChannelClosedException — link-level (parent connection disposing). + if (error instanceof RequestResponseChannelClosedException) { + return LINK; + } + + // Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs). + // The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer + // errors (io.EOF, net.Error). Java's non-AMQP exceptions (e.g., AzureException, + // RuntimeException) should fail fast rather than trigger connection recovery. + return FATAL; + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index c22c8ac928b3..79af3ea9c78c 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -17,6 +17,8 @@ import java.time.Duration; import java.util.Locale; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; /** * Helper class to help with retry policies. @@ -106,6 +108,44 @@ public static Mono withRetry(Mono source, AmqpRetryOptions retryOption return withRetry(source, retryOptions, timeoutMessage, false); } + /** + * Applies the retry policy with tiered recovery between attempts. 
Before each retry, + * the error is classified via {@link RecoveryKind#classify(Throwable)} and the recovery + * callback is invoked so the caller can close the appropriate resources (link or connection). + * + *

This matches the tiered recovery pattern used by the Go, .NET, Python, and JS SDKs.

+ * + * @param Type of value in the {@link Mono}. + * @param source The publisher to apply the retry policy to. + * @param retryOptions A {@link AmqpRetryOptions}. + * @param errorMessage Text added to error logs. + * @param recoveryAction Called between retry attempts with the classified {@link RecoveryKind}. + * The caller should close the link (for {@link RecoveryKind#LINK}) or connection + * (for {@link RecoveryKind#CONNECTION}) so the next retry creates fresh resources. + * + * @return A publisher that returns the results of the {@link Mono} if any of the retry attempts + * are successful. Otherwise, propagates the last error. + */ + public static Mono withRetryAndRecovery(Mono source, AmqpRetryOptions retryOptions, String errorMessage, + Consumer recoveryAction) { + return withRetryAndRecovery(source, retryOptions, errorMessage, false, recoveryAction); + } + + /** + * Like {@link #withRetryAndRecovery(Mono, AmqpRetryOptions, String, Consumer)} but with an option to allow + * long-running operations that should not be subject to the per-attempt timeout. + * + * @param allowsLongOperation If true, the source Mono will not be wrapped with a per-attempt timeout. 
+ */ + public static Mono withRetryAndRecovery(Mono source, AmqpRetryOptions retryOptions, String errorMessage, + boolean allowsLongOperation, Consumer recoveryAction) { + if (!allowsLongOperation) { + source = source.timeout(retryOptions.getTryTimeout()); + } + return source.retryWhen(createRetryWithRecovery(retryOptions, recoveryAction)) + .doOnError(error -> LOGGER.error(errorMessage, error)); + } + static Retry createRetry(AmqpRetryOptions options) { final Duration delay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME); final RetryBackoffSpec retrySpec; @@ -129,4 +169,66 @@ static Retry createRetry(AmqpRetryOptions options) { .filter(error -> error instanceof TimeoutException || (error instanceof AmqpException && ((AmqpException) error).isTransient())); } + + /** + * Creates a Reactor {@link Retry} spec that performs tiered recovery between retry attempts. + * Before each retry, the error is classified and the recovery callback is invoked. + * + *

Includes a quick-retry optimization matching the Go SDK: on the first LINK or CONNECTION + * error, the retry fires immediately (no backoff) since the error may come from a previously + * stale link and recovery has just created a fresh one.

+     */
+    static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer recoveryAction) {
+        final int maxRetries = options.getMaxRetries();
+        final Duration baseDelay = options.getDelay().plus(SERVER_BUSY_WAIT_TIME);
+        final Duration maxDelay = options.getMaxDelay();
+        final boolean isFixed = options.getMode() == com.azure.core.amqp.AmqpRetryMode.FIXED;
+
+        return Retry.from(retrySignals -> {
+            // The companion function is invoked once per subscription, so keeping the flag here scopes the
+            // quick retry to a single retry sequence. A flag held by the Retry instance itself would be
+            // shared by every subscription of a reused Mono and could only ever fire once.
+            final AtomicBoolean didQuickRetry = new AtomicBoolean(false);
+
+            return retrySignals.flatMap(signal -> {
+                final Throwable failure = signal.failure();
+                final long attempt = signal.totalRetriesInARow();
+                final RecoveryKind kind = RecoveryKind.classify(failure);
+
+                // FATAL errors — do not retry.
+                if (kind == RecoveryKind.FATAL) {
+                    return Mono.error(failure);
+                }
+
+                // Check retry budget.
+                if (attempt >= maxRetries) {
+                    return Mono.error(failure);
+                }
+
+                // Perform recovery before retry. A failing recovery callback must not abort the retry
+                // sequence, so it is logged and the retry proceeds.
+                if (kind != RecoveryKind.NONE && recoveryAction != null) {
+                    try {
+                        recoveryAction.accept(kind);
+                    } catch (Exception e) {
+                        LOGGER.atWarning().log("Recovery action failed.", e);
+                    }
+                }
+
+                // Quick retry: on the FIRST LINK/CONNECTION error, retry immediately (no backoff).
+                // The kind is checked BEFORE the flag is claimed so that a preceding NONE-kind failure
+                // (e.g. a timeout) does not consume the one-shot quick retry. The didQuickRetry flag
+                // prevents repeated immediate retries under persistent errors — matching the Go SDK's
+                // didQuickRetry + ResetAttempts() pattern.
+                final boolean recovered = kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION;
+                if (recovered && didQuickRetry.compareAndSet(false, true)) {
+                    LOGGER.atInfo().log("Quick retry after {} recovery (first occurrence).", kind);
+                    return Mono.just(attempt);
+                }
+
+                // Standard backoff delay: fixed, or exponential capped at maxDelay (shift capped at 30
+                // to avoid overflowing the multiplier).
+                final Duration delay;
+                if (isFixed) {
+                    delay = baseDelay;
+                } else {
+                    long millis = baseDelay.toMillis() * (1L << Math.min(attempt, 30));
+                    delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis()));
+                }
+                final double jitter = 1.0 + (Math.random() * 2 - 1) * JITTER_FACTOR;
+                final Duration jitteredDelay = Duration.ofMillis((long) (delay.toMillis() * jitter));
+
+                return Mono.delay(jitteredDelay).thenReturn(attempt);
+            });
+        });
+    }
 }
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java
new file mode 100644
index 000000000000..6009f40b6d18
--- /dev/null
+++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java
@@ -0,0 +1,180 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.core.amqp.implementation;
+
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link RecoveryKind#classify(Throwable)}.
+ */ +class RecoveryKindTest { + + @Test + void nullErrorReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(null)); + } + + @Test + void timeoutExceptionReturnsNone() { + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(new TimeoutException("timed out"))); + } + + @Test + void serverBusyReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "server busy", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void timeoutErrorConditionReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "timeout", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } + + @Test + void linkDetachForcedReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach forced", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void linkStolenReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_STOLEN, "link stolen", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transientAmqpErrorWithoutConditionReturnsLink() { + final AmqpException error = new AmqpException(true, "transient error", null, null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void connectionForcedReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FORCED, "connection forced", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionFramingErrorReturnsConnection() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.CONNECTION_FRAMING_ERROR, "framing error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void internalErrorReturnsConnection() { + final 
AmqpException error = new AmqpException(true, AmqpErrorCondition.INTERNAL_ERROR, "internal error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void protonIoReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.PROTON_IO, "io error", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void connectionRedirectReturnsConnection() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.CONNECTION_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.CONNECTION, RecoveryKind.classify(error)); + } + + @Test + void linkRedirectReturnsLink() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.LINK_REDIRECT, "redirect", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void transferLimitExceededReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.TRANSFER_LIMIT_EXCEEDED, "transfer limit", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void argumentErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.ARGUMENT_ERROR, "bad argument", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notFoundReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "not found", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void unauthorizedAccessReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.UNAUTHORIZED_ACCESS, "unauthorized", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void payloadSizeExceededReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, "too large", null); + 
assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void notAllowedReturnsFatal() { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.NOT_ALLOWED, "not allowed", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void nonTransientAmqpErrorReturnsFatal() { + final AmqpException error = new AmqpException(false, "permanent error", null, null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void requestResponseChannelClosedReturnsLink() { + final RequestResponseChannelClosedException error = new RequestResponseChannelClosedException("channel closed"); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void unknownExceptionReturnsFatal() { + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new RuntimeException("unknown"))); + } + + @Test + void sessionLockLostReturnsLink() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.SESSION_LOCK_LOST, "session lock lost", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void messageLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.MESSAGE_LOCK_LOST, "message lock lost", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void storeLockLostReturnsFatal() { + final AmqpException error + = new AmqpException(false, AmqpErrorCondition.STORE_LOCK_LOST_ERROR, "store lock lost", null); + assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(error)); + } + + @Test + void operationCancelledReturnsNone() { + final AmqpException error = new AmqpException(true, AmqpErrorCondition.OPERATION_CANCELLED, "cancelled", null); + assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); + } +} diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java 
b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java index 1d4aa44b4c42..885ec94e606d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ConnectionCacheWrapper.java @@ -51,4 +51,17 @@ AmqpRetryOptions getRetryOptions() { boolean isChannelClosed() { return isV2 ? cache.isCurrentConnectionClosed() : processor.isChannelClosed(); } + + /** + * Force-closes the current cached connection so the next get() creates a fresh one. + * Used for connection-level recovery when the connection is stale but the cache + * has not detected it via endpoint state signals. + */ + void forceCloseConnection() { + if (isV2) { + cache.forceCloseConnection(); + } else { + processor.forceCloseChannel(); + } + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 0b3154ec33ef..ad5fbba1f930 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -11,6 +11,7 @@ import com.azure.core.amqp.implementation.MessageFlux; import com.azure.core.amqp.implementation.MessageSerializer; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException; @@ -1734,17 +1735,26 @@ private ServiceBusAsyncConsumer 
getOrCreateConsumer() { // [2]. When we try to create a new session (to host the new link) but on a connection being disposed, // the retry can eventually receive a new connection and then proceed with creating session and link. // - final Mono retryableReceiveLinkMono - = RetryUtil.withRetry(receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> { - // When the current connection is being disposed, the V1 ConnectionProcessor or V2 ReactorConnectionCache - // can produce a new connection if downstream request. In this context, treat - // RequestResponseChannelClosedException error from the following two sources as retry-able so that - // retry can obtain a new connection - - // 1. error from the RequestResponseChannel scoped to the current connection being disposed, - // 2. error from the V2 RequestResponseChannelCache scoped to the current connection being disposed. - // + final Mono retryableReceiveLinkMono = RetryUtil + .withRetryAndRecovery(receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> { return new AmqpException(true, e.getMessage(), e, null); - }), connectionCacheWrapper.getRetryOptions(), "Failed to create receive link " + linkName, true); + }), connectionCacheWrapper.getRetryOptions(), "Failed to create receive link " + linkName, true, + recoveryKind -> { + if (recoveryKind == RecoveryKind.LINK || recoveryKind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .addKeyValue("recoveryKind", recoveryKind) + .log("Receive link creation failed, performing {} recovery.", recoveryKind); + + // For LINK errors during link creation, the session hosting the link may be stale. + // Ask the connection to remove it so the next retry creates a fresh session + link. + // The entityPath is the session name used by createReceiveLink(). 
+ connectionProcessor.subscribe(connection -> connection.removeSession(entityPath)); + } + if (recoveryKind == RecoveryKind.CONNECTION) { + connectionCacheWrapper.forceCloseConnection(); + } + }); // A Flux that produces a new AmqpReceiveLink each time it receives a request from the below // 'AmqpReceiveLinkProcessor'. Obviously, the processor requests a link when there is a downstream subscriber. diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 58db3b129e65..030cf772243b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -10,7 +10,9 @@ import com.azure.core.amqp.implementation.AmqpSendLink; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.RequestResponseChannelClosedException; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.annotation.ServiceClient; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; @@ -228,12 +230,14 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { private final ClientLogger logger; private final AtomicReference linkName = new AtomicReference<>(); + private final AtomicReference lastSendLink = new AtomicReference<>(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final MessageSerializer messageSerializer; private final AmqpRetryOptions retryOptions; private final MessagingEntityType entityType; private final Runnable onClientClose; private final 
String entityName; + private final ConnectionCacheWrapper connectionCacheWrapper; private final Mono connectionProcessor; private final String fullyQualifiedNamespace; private final String viaEntityName; @@ -254,6 +258,7 @@ public final class ServiceBusSenderAsyncClient implements AutoCloseable { this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' cannot be null."); this.entityName = Objects.requireNonNull(entityName, "'entityPath' cannot be null."); Objects.requireNonNull(connectionCacheWrapper, "'connectionCacheWrapper' cannot be null."); + this.connectionCacheWrapper = connectionCacheWrapper; this.connectionProcessor = connectionCacheWrapper.getConnection(); this.fullyQualifiedNamespace = connectionCacheWrapper.getFullyQualifiedNamespace(); this.instrumentation = Objects.requireNonNull(instrumentation, "'instrumentation' cannot be null."); @@ -810,8 +815,9 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate return monoError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return tracer.traceScheduleMono("ServiceBus.scheduleMessage", - getSendLinkWithRetry("schedule-message").flatMap(link -> link.getLinkSize().flatMap(size -> { + return tracer.traceScheduleMono("ServiceBus.scheduleMessage", getSendLink("schedule-message").flatMap(link -> { + lastSendLink.set(link); + return link.getLinkSize().flatMap(size -> { final int maxSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; return connectionProcessor.flatMap(connection -> connection.getManagementNode(entityName, entityType)) .flatMap( @@ -819,7 +825,8 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), transactionContext) .next()); - })), message, message.getContext()).onErrorMap(this::mapError); + }); + }), message, message.getContext()).onErrorMap(this::mapError); } /** @@ -860,6 +867,7 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, }); final Mono sendMessage = getSendLink("send-batch").flatMap(link -> { + lastSendLink.set(link); if (transactionContext != null && transactionContext.getTransactionId() != null) { final TransactionalState deliveryState = new TransactionalState(); deliveryState.setTxnId(Binary.create(transactionContext.getTransactionId())); @@ -871,8 +879,11 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, } }); - final String message = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); - final Mono withRetry = withRetry(sendMessage, retryOptions, message).onErrorMap(this::mapError); + final String timeoutMessage = "Sending messages timed out. 
message-count:" + batch.getCount() + entityId(); + final Mono withRetry + = RetryUtil.withRetryAndRecovery(sendMessage, retryOptions, timeoutMessage, recoveryKind -> { + performRecovery(recoveryKind, "sendBatch"); + }).onErrorMap(this::mapError); return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } @@ -883,19 +894,26 @@ private Mono sendFluxInternal(Flux messages, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } - final Mono> batchList - = getSendLinkWithRetry("send-batches").flatMap(link -> link.getLinkSize().flatMap(size -> { + final Mono> batchList = getSendLink("send-batches").flatMap(link -> { + lastSendLink.set(link); + return link.getLinkSize().flatMap(size -> { final int batchSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); return messages.collect( new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); - })); + }); + }); - return batchList.flatMap(list -> Flux.fromIterable(list) + final Mono sendOperation = batchList.flatMap(list -> Flux.fromIterable(list) .flatMap(batch -> sendBatchInternal(batch, transactionContext)) .then() - .doOnError(error -> logger.error("Error sending batch.", error))).onErrorMap(this::mapError); + .doOnError(error -> logger.error("Error sending batch.", error))); + + return RetryUtil.withRetryAndRecovery(sendOperation, retryOptions, "Sending messages timed out." 
+ entityId(), + recoveryKind -> { + performRecovery(recoveryKind, "sendFlux"); + }).onErrorMap(this::mapError); } private Mono getSendLink(String callSite) { @@ -926,7 +944,10 @@ private Mono getSendLink(String callSite) { } private Mono getSendLinkWithRetry(String callSite) { - return withRetry(getSendLink(callSite), retryOptions, String.format(retryGetLinkErrorMessageFormat, callSite)); + return RetryUtil.withRetryAndRecovery(getSendLink(callSite), retryOptions, + String.format(retryGetLinkErrorMessageFormat, callSite), recoveryKind -> { + performRecovery(recoveryKind, "getSendLink-" + callSite); + }); } private Throwable mapError(Throwable throwable) { @@ -936,6 +957,46 @@ private Throwable mapError(Throwable throwable) { return throwable; } + /** + * Performs tiered recovery by disposing stale resources based on the classified error. + * For LINK recovery, disposes the send link so the next retry creates a fresh one. + * For CONNECTION recovery, disposes the link and the connection so the connection + * processor creates everything fresh. + * + *

This matches the Go SDK's RecoverIfNeeded() and the .NET SDK's + * FaultTolerantAmqpObject pattern.

+ */ + private void performRecovery(RecoveryKind recoveryKind, String callSite) { + if (recoveryKind == RecoveryKind.NONE || recoveryKind == RecoveryKind.FATAL) { + return; + } + + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .addKeyValue("recoveryKind", recoveryKind) + .addKeyValue("callSite", callSite) + .log("Performing {} recovery before retry.", recoveryKind); + + // Dispose the cached send link so the next retry creates a fresh one. + final AmqpSendLink link = lastSendLink.getAndSet(null); + if (link != null) { + link.dispose(); + } + linkName.set(null); + + // For CONNECTION errors, the link disposal above is the sender's responsibility. + // Connection-level recovery (closing and recreating the AMQP connection) is handled + // by the ReactorConnectionCache when it detects the connection's endpoint state has + // changed. My job is to reset the cached link reference so the next retry goes through the fresh path — the connection disposal happens organically. + if (recoveryKind == RecoveryKind.CONNECTION) { + // Force-close the cached connection so the next get() on the connection + // processor creates a fresh one. This handles the stale-connection scenario + // where heartbeats are echoed by intermediate infrastructure and the cache + // hasn't detected the failure. Matches Go SDK's Namespace.Recover(). + connectionCacheWrapper.forceCloseConnection(); + } + } + private String entityId() { return " " + ENTITY_PATH_KEY + ":" + entityName + (viaEntityName != null ? 
" " + VIA_ENTITY_NAME_KEY + ":" + viaEntityName : "") + " "; diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java index ee9a0a9e0399..147524ab1c7b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.MessagingEntityType; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; @@ -137,12 +138,24 @@ private Mono acquireIntern(String sessionId) { return acquireSession(sessionId).timeout(tryTimeout) .retryWhen(Retry.from(signals -> signals.flatMap(signal -> { final Throwable t = signal.failure(); + final RecoveryKind kind = RecoveryKind.classify(t); + if (kind == RecoveryKind.CONNECTION) { + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error acquiring session, forcing connection recovery.", t); + connectionCacheWrapper.forceCloseConnection(); + } if (isTimeoutError(t)) { logger.atVerbose() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) .log("Timeout while acquiring session '{}'.", sessionName(sessionId), t); - // retry session acquire using Schedulers.parallel() and free the QPid thread. 
+ return Mono.delay(Duration.ZERO); + } + if (kind == RecoveryKind.LINK) { + logger.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Link-level error acquiring session, retrying.", t); return Mono.delay(Duration.ZERO); } return publishError(sessionId, t, true); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 3a6343c98710..0a111a305795 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -8,6 +8,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.SessionErrorContext; import com.azure.core.amqp.implementation.MessageSerializer; +import com.azure.core.amqp.implementation.RecoveryKind; import com.azure.core.amqp.implementation.StringUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.servicebus.implementation.DispositionStatus; @@ -281,11 +282,20 @@ Mono getActiveLink() { .timeout(operationTimeout) .then(Mono.just(link)))).retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> { final Throwable failure = signal.failure(); + final RecoveryKind kind = RecoveryKind.classify(failure); LOGGER.atInfo() .addKeyValue(ENTITY_PATH_KEY, entityPath) .addKeyValue("attempt", signal.totalRetriesInARow()) + .addKeyValue("recoveryKind", kind) .log("Error occurred while getting unnamed session.", failure); + if (kind == RecoveryKind.CONNECTION) { + LOGGER.atWarning() + .addKeyValue(ENTITY_PATH_KEY, entityPath) + .log("Connection-level error in session manager, forcing connection recovery."); + connectionCacheWrapper.forceCloseConnection(); + } + if (isDisposed.get()) { return 
Mono.error( new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())); @@ -293,12 +303,9 @@ Mono getActiveLink() { return Mono.delay(Duration.ZERO); } else if (failure instanceof AmqpException && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) { - // The link closed remotely with 'Detach {errorCondition:com.microsoft:timeout}' frame because - // the broker waited for N seconds (60 sec hard limit today) but there was no free or new session. - // - // Given N seconds elapsed since the last session acquire attempt, request for a session on - // the 'parallel' Scheduler and free the 'QPid' thread for other IO. - // + return Mono.delay(Duration.ZERO); + } else if (kind == RecoveryKind.LINK) { + // Link-level error — retry to get a fresh link. return Mono.delay(Duration.ZERO); } else { final long id = System.nanoTime(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java index 20ca897316e9..204eb1db52f7 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClientTest.java @@ -6,6 +6,8 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpRetryMode; import com.azure.core.amqp.AmqpRetryOptions; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.AmqpTransaction; import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.FixedAmqpRetryPolicy; @@ -668,7 +670,8 @@ void failedSendMessageReportsMetrics(boolean isV2) { when(connection.createSendLink(eq(ENTITY_NAME), eq(ENTITY_NAME), 
eq(retryOptions), isNull(), eq(CLIENT_IDENTIFIER))).thenReturn(Mono.just(sendLink)); - when(sendLink.send(any(Message.class))).thenThrow(new RuntimeException("foo")); + when(sendLink.send(any(Message.class))) + .thenReturn(Mono.error(new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "entity not found", null))); // Act StepVerifier.create(sender.sendMessage(new ServiceBusMessage(TEST_CONTENTS))) From 5e28c3366e093ae8c5f8c18dcc4338badf255c08 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Tue, 17 Mar 2026 21:09:01 -0700 Subject: [PATCH 02/17] fix(ci): build azure-core-amqp from source for tiered recovery PR Add azure-core-amqp to AdditionalModules in ci.yml and trigger paths so CI builds it from source alongside servicebus. Update pom.xml to reference 2.12.0-beta.1 with current tag (cross-module PR -- to be revisited after azure-core-amqp is released to Maven Central). Note: uses current tag temporarily; reviewer to confirm release sequencing. --- sdk/servicebus/azure-messaging-servicebus/pom.xml | 2 +- sdk/servicebus/ci.yml | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/pom.xml b/sdk/servicebus/azure-messaging-servicebus/pom.xml index 47390eae5860..d55656f00bbc 100644 --- a/sdk/servicebus/azure-messaging-servicebus/pom.xml +++ b/sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -69,7 +69,7 @@ com.azure azure-core-amqp - 2.11.3 + 2.12.0-beta.1 com.azure diff --git a/sdk/servicebus/ci.yml b/sdk/servicebus/ci.yml index 58823bab7a69..b957c80a24d4 100644 --- a/sdk/servicebus/ci.yml +++ b/sdk/servicebus/ci.yml @@ -13,6 +13,7 @@ trigger: - sdk/servicebus/azure-messaging-servicebus-stress/ - sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -33,6 +34,7 @@ pr: - sdk/servicebus/azure-messaging-servicebus-stress/ - 
sdk/servicebus/azure-messaging-servicebus-track2-perf/ - sdk/servicebus/build/ + - sdk/core/azure-core-amqp/ exclude: - sdk/servicebus/pom.xml - sdk/servicebus/azure-messaging-servicebus/pom.xml @@ -59,3 +61,6 @@ extends: # required by the above perf libraries - name: perf-test-core groupId: com.azure + # Build azure-core-amqp from source (needed for RecoveryKind, tiered retry) + - name: azure-core-amqp + groupId: com.azure From 6116b7b30cb099d57615a947339e78032216b057 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 10:41:38 -0700 Subject: [PATCH 03/17] fix(review): address Copilot PR review feedback on tiered recovery Address 6 comments from Copilot PR review on #48460: - sendFluxInternal: wrap only batchList (link acquisition) with withRetryAndRecovery, not the full sendOperation. Wrapping the outer operation caused the user-provided Flux to be re-subscribed on each retry and nested retries with sendBatchInternal. - scheduleMessageInternal: change getSendLink to getSendLinkWithRetry so schedule operations get the same tiered recovery as other send paths. - ReactorConnectionCache.forceCloseConnection: use connection.dispose() instead of closeAsync() so isDisposed() returns true synchronously. This ensures cacheInvalidateIf invalidates the cached reference immediately on the next get() call. - RetryUtil: replace Math.random() with ThreadLocalRandom.current() to eliminate shared RNG contention and improve testability. - performRecovery comment: remove contradictory 'happens organically' comment that conflicted with the explicit forceCloseConnection() call. - ServiceBusReceiverAsyncClient: add error handler to the subscribe() call inside the LINK recovery callback so failures are logged and do not silently leak. 
--- .../ReactorConnectionCache.java | 5 +- .../core/amqp/implementation/RetryUtil.java | 3 +- .../ServiceBusReceiverAsyncClient.java | 5 +- .../ServiceBusSenderAsyncClient.java | 55 ++++++++++--------- 4 files changed, 40 insertions(+), 28 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index ee155d982b15..248877d56563 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -190,7 +190,10 @@ public void forceCloseConnection() { if (connection != null && !connection.isDisposed()) { withConnectionId(logger, connection.getId()) .log("Force-closing connection for recovery. Next get() will create a fresh connection."); - closeConnection(connection, logger, "Force-close for connection recovery."); + // Call dispose() rather than closeAsync() so that isDisposed() returns true synchronously. + // This ensures cacheInvalidateIf immediately invalidates the cached reference on the next + // get() call, rather than waiting for the async close handshake to complete. 
+ connection.dispose(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index 79af3ea9c78c..a590fe3c034c 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.Locale; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -225,7 +226,7 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer connection.removeSession(entityPath)); + connectionProcessor.subscribe(connection -> connection.removeSession(entityPath), + error -> LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .log("Error removing stale session during LINK recovery.", error)); } if (recoveryKind == RecoveryKind.CONNECTION) { connectionCacheWrapper.forceCloseConnection(); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 030cf772243b..99dc2b8ddb69 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -815,18 +815,21 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate return monoError(logger, new NullPointerException("'scheduledEnqueueTime' cannot be null.")); } - return tracer.traceScheduleMono("ServiceBus.scheduleMessage", 
getSendLink("schedule-message").flatMap(link -> { - lastSendLink.set(link); - return link.getLinkSize().flatMap(size -> { - final int maxSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; - return connectionProcessor.flatMap(connection -> connection.getManagementNode(entityName, entityType)) - .flatMap( - managementNode -> managementNode - .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), - transactionContext) - .next()); - }); - }), message, message.getContext()).onErrorMap(this::mapError); + return tracer + .traceScheduleMono("ServiceBus.scheduleMessage", getSendLinkWithRetry("schedule-message").flatMap(link -> { + lastSendLink.set(link); + return link.getLinkSize().flatMap(size -> { + final int maxSize = size > 0 ? size : MAX_MESSAGE_LENGTH_BYTES; + return connectionProcessor + .flatMap(connection -> connection.getManagementNode(entityName, entityType)) + .flatMap( + managementNode -> managementNode + .schedule(Arrays.asList(message), scheduledEnqueueTime, maxSize, link.getLinkName(), + transactionContext) + .next()); + }); + }), message, message.getContext()) + .onErrorMap(this::mapError); } /** @@ -905,15 +908,20 @@ private Mono sendFluxInternal(Flux messages, }); }); - final Mono sendOperation = batchList.flatMap(list -> Flux.fromIterable(list) + // Wrap only the link-acquisition+batch-collection phase with retry+recovery. + // Individual batch sends are already retried inside sendBatchInternal, so wrapping the + // entire sendOperation (which includes messages.collect()) would cause the user-provided + // Flux to be re-subscribed on each retry and could duplicate batches that already sent. + final Mono> batchListWithRecovery = RetryUtil.withRetryAndRecovery(batchList, + retryOptions, "Failed to acquire send link for batch collection." 
+ entityId(), + recoveryKind -> performRecovery(recoveryKind, "sendFlux-link")); + + final Mono sendOperation = batchListWithRecovery.flatMap(list -> Flux.fromIterable(list) .flatMap(batch -> sendBatchInternal(batch, transactionContext)) .then() .doOnError(error -> logger.error("Error sending batch.", error))); - return RetryUtil.withRetryAndRecovery(sendOperation, retryOptions, "Sending messages timed out." + entityId(), - recoveryKind -> { - performRecovery(recoveryKind, "sendFlux"); - }).onErrorMap(this::mapError); + return sendOperation.onErrorMap(this::mapError); } private Mono getSendLink(String callSite) { @@ -984,15 +992,12 @@ private void performRecovery(RecoveryKind recoveryKind, String callSite) { } linkName.set(null); - // For CONNECTION errors, the link disposal above is the sender's responsibility. - // Connection-level recovery (closing and recreating the AMQP connection) is handled - // by the ReactorConnectionCache when it detects the connection's endpoint state has - // changed. My job is to reset the cached link reference so the next retry goes through the fresh path — the connection disposal happens organically. + // For CONNECTION errors, explicitly force-close the cached connection so the + // next get() on the connection processor creates a fresh one. This handles the + // stale-connection scenario where heartbeats are echoed by intermediate + // infrastructure and the cache has not yet detected the failure. + // Matches Go SDK's Namespace.Recover(). if (recoveryKind == RecoveryKind.CONNECTION) { - // Force-close the cached connection so the next get() on the connection - // processor creates a fresh one. This handles the stale-connection scenario - // where heartbeats are echoed by intermediate infrastructure and the cache - // hasn't detected the failure. Matches Go SDK's Namespace.Recover(). 
connectionCacheWrapper.forceCloseConnection(); } } From f3a29a7bef1d38626453e336962420372a4fb416 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 13:24:11 -0700 Subject: [PATCH 04/17] fix(review): address second Copilot review on tiered recovery - T1: ServiceBusSessionAcquirer - return delay after forceCloseConnection for RecoveryKind.CONNECTION so session acquisition retries after connection recovery - T2: ServiceBusSessionManager - merge LINK and CONNECTION into a single delay branch so CONNECTION errors retry instead of falling through to publishError - T3: ServiceBusSenderAsyncClient - narrow withRetryAndRecovery to wrap only link acquisition (getSendLink); messages.collect() moved outside retry boundary to avoid re-subscribing user Flux on retry - T4: RecoveryKind - reclassify OPERATION_CANCELLED from NONE to LINK because core-amqp raises it when AMQP layer unexpectedly aborts or disconnects, which requires link recovery (e.g. ReceiverUnsettledDeliveries remote Released outcome) - T5: RecoveryKind - reclassify RESOURCE_LIMIT_EXCEEDED from FATAL to NONE to match ReactorSender.isGeneralSendError() which treats it as retriable alongside SERVER_BUSY and TIMEOUT - T6: RetryUtilTest - add four tests for createRetryWithRecovery covering FATAL no-retry, LINK recovery callback, CONNECTION recovery callback, and retry budget exhaustion; use virtual time for backoff-delay scenarios - T7: RecoveryKindTest - rename operationCancelled test to expect LINK result, add new test asserting RESOURCE_LIMIT_EXCEEDED classifies as NONE --- .../amqp/implementation/RecoveryKind.java | 12 +- .../amqp/implementation/RecoveryKindTest.java | 9 +- .../amqp/implementation/RetryUtilTest.java | 104 ++++++++++++++++++ .../ServiceBusSenderAsyncClient.java | 26 ++--- .../servicebus/ServiceBusSessionAcquirer.java | 1 + .../servicebus/ServiceBusSessionManager.java | 4 +- 6 files changed, 135 insertions(+), 21 deletions(-) diff --git 
a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java index 3c210c51cf45..31ccc5be35d4 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -23,7 +23,7 @@ public enum RecoveryKind { /** * No recovery needed — retry on the same link and connection. - * Applies to: server-busy, timeouts, operation-cancelled. + * Applies to: server-busy, timeouts, resource-limit-exceeded. */ NONE, @@ -84,13 +84,15 @@ public static RecoveryKind classify(Throwable error) { case LINK_REDIRECT: case PARTITION_NOT_OWNED_ERROR: case TRANSFER_LIMIT_EXCEEDED: + // operation-cancelled can signal "AMQP layer unexpectedly aborted or disconnected" + // (e.g. ReceiverUnsettledDeliveries remote Released outcome), requiring link recovery. + case OPERATION_CANCELLED: return LINK; // Fatal errors — do not retry. case NOT_FOUND: case UNAUTHORIZED_ACCESS: case LINK_PAYLOAD_SIZE_EXCEEDED: - case RESOURCE_LIMIT_EXCEEDED: case NOT_ALLOWED: case NOT_IMPLEMENTED: case ENTITY_DISABLED_ERROR: @@ -103,10 +105,12 @@ public static RecoveryKind classify(Throwable error) { case STORE_LOCK_LOST_ERROR: return FATAL; - // Server-busy and timeouts — retry on same link. + // Server-busy, timeouts, and resource-limit errors — retry on same link. + // RESOURCE_LIMIT_EXCEEDED is treated as transient here because ReactorSender + // groups it alongside SERVER_BUSY and TIMEOUT in its send-error retry logic. case SERVER_BUSY_ERROR: case TIMEOUT_ERROR: - case OPERATION_CANCELLED: + case RESOURCE_LIMIT_EXCEEDED: return NONE; // Session/lock errors — link-level recovery. 
diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java index 6009f40b6d18..7c36e7d61d9f 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java @@ -173,8 +173,15 @@ void storeLockLostReturnsFatal() { } @Test - void operationCancelledReturnsNone() { + void operationCancelledReturnsLink() { final AmqpException error = new AmqpException(true, AmqpErrorCondition.OPERATION_CANCELLED, "cancelled", null); + assertEquals(RecoveryKind.LINK, RecoveryKind.classify(error)); + } + + @Test + void resourceLimitExceededReturnsNone() { + final AmqpException error + = new AmqpException(true, AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED, "resource limit", null); assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java index b25d920b31f6..cd5e08a81558 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -9,6 +9,7 @@ import com.azure.core.amqp.AmqpTransportType; import com.azure.core.amqp.ExponentialAmqpRetryPolicy; import com.azure.core.amqp.FixedAmqpRetryPolicy; +import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; import org.junit.jupiter.api.Assertions; @@ -24,9 +25,12 @@ import reactor.util.retry.RetryBackoffSpec; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import 
java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Stream; @@ -203,4 +207,104 @@ void retryFilter(Throwable throwable, boolean expected) { // Assert assertEquals(expected, actual); } + + // ---- createRetryWithRecovery tests ---- + + /** + * FATAL errors must not be retried and must not invoke the recovery callback. + */ + @Test + void createRetryWithRecovery_fatalErrorTerminatesImmediately() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); + final AtomicInteger recoveryCount = new AtomicInteger(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, kind -> recoveryCount.incrementAndGet()); + final AmqpException fatalError = new AmqpException(false, AmqpErrorCondition.NOT_FOUND, "not found", null); + + // Act & Assert + StepVerifier.create(Mono.error(fatalError).retryWhen(retry)) + .expectErrorSatisfies(e -> assertEquals(fatalError, e)) + .verify(Duration.ofSeconds(5)); + + assertEquals(0, recoveryCount.get(), "Recovery callback must not be called for FATAL errors"); + } + + /** + * LINK errors must invoke the recovery callback and retry. The first occurrence uses + * the quick-retry path (no backoff delay). 
+ */ + @Test + void createRetryWithRecovery_linkErrorInvokesRecoveryAndRetries() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); + final List recoveries = new ArrayList<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, recoveries::add); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + if (attempt.getAndIncrement() < 2) { + return Mono.error(new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null)); + } + return Mono.just(42); + }); + + // Act & Assert — use virtual time because the second retry applies a SERVER_BUSY backoff delay. + StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectNext(42) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + assertEquals(2, recoveries.size(), "Recovery callback called once per LINK error"); + assertTrue(recoveries.stream().allMatch(k -> k == RecoveryKind.LINK)); + } + + /** + * CONNECTION errors must invoke the recovery callback with CONNECTION kind. 
+ */ + @Test + void createRetryWithRecovery_connectionErrorInvokesRecovery() { + // Arrange + final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(2).setDelay(Duration.ofMillis(100)); + final AtomicReference capturedKind = new AtomicReference<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, capturedKind::set); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + if (attempt.getAndIncrement() == 0) { + return Mono.error(new AmqpException(true, AmqpErrorCondition.CONNECTION_FORCED, "forced", null)); + } + return Mono.just(1); + }); + + // Act & Assert + StepVerifier.create(source.retryWhen(retry)).expectNext(1).expectComplete().verify(Duration.ofSeconds(5)); + + assertEquals(RecoveryKind.CONNECTION, capturedKind.get(), + "Recovery callback must receive CONNECTION kind for connection errors"); + } + + /** + * After the retry budget is exhausted the error must propagate without further retries. + */ + @Test + void createRetryWithRecovery_exhaustedRetriesTerminateWithError() { + // Arrange + final int maxRetries = 2; + final AmqpRetryOptions options + = new AmqpRetryOptions().setMaxRetries(maxRetries).setDelay(Duration.ofMillis(10)); + final AtomicInteger recoveryCount = new AtomicInteger(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, kind -> recoveryCount.incrementAndGet()); + final AmqpException transientError + = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null); + + // Act & Assert — use virtual time because the second retry applies a SERVER_BUSY backoff delay. 
+ StepVerifier.withVirtualTime(() -> Mono.error(transientError).retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectError(AmqpException.class) + .verify(Duration.ofSeconds(5)); + + // Recovery called on each retry attempt (not the final one which terminates) + assertEquals(maxRetries, recoveryCount.get(), "Recovery callback called once per non-terminal retry"); + } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java index 99dc2b8ddb69..1a639f39ff73 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSenderAsyncClient.java @@ -897,26 +897,24 @@ private Mono sendFluxInternal(Flux messages, new IllegalStateException(String.format(INVALID_OPERATION_DISPOSED_SENDER, "sendMessage"))); } - final Mono> batchList = getSendLink("send-batches").flatMap(link -> { - lastSendLink.set(link); - return link.getLinkSize().flatMap(size -> { + // Apply retry+recovery only to link acquisition. Keeping messages.collect() outside the + // retry boundary avoids re-subscribing the user-provided Flux on each retry attempt, + // which could duplicate side-effects or re-consume a hot publisher. + final Mono linkWithRecovery + = RetryUtil.withRetryAndRecovery(getSendLink("send-batches").doOnNext(link -> lastSendLink.set(link)), + retryOptions, "Failed to acquire send link for batch collection." + entityId(), + recoveryKind -> performRecovery(recoveryKind, "sendFlux-link")); + + final Mono> batchListMono + = linkWithRecovery.flatMap(link -> link.getLinkSize().flatMap(size -> { final int batchSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; final CreateMessageBatchOptions batchOptions = new CreateMessageBatchOptions().setMaximumSizeInBytes(batchSize); return messages.collect( new AmqpMessageCollector(isV2, batchOptions, 1, link::getErrorContext, tracer, messageSerializer)); - }); - }); - - // Wrap only the link-acquisition+batch-collection phase with retry+recovery. - // Individual batch sends are already retried inside sendBatchInternal, so wrapping the - // entire sendOperation (which includes messages.collect()) would cause the user-provided - // Flux to be re-subscribed on each retry and could duplicate batches that already sent. - final Mono> batchListWithRecovery = RetryUtil.withRetryAndRecovery(batchList, - retryOptions, "Failed to acquire send link for batch collection." + entityId(), - recoveryKind -> performRecovery(recoveryKind, "sendFlux-link")); + })); - final Mono sendOperation = batchListWithRecovery.flatMap(list -> Flux.fromIterable(list) + final Mono sendOperation = batchListMono.flatMap(list -> Flux.fromIterable(list) .flatMap(batch -> sendBatchInternal(batch, transactionContext)) .then() .doOnError(error -> logger.error("Error sending batch.", error))); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java index 147524ab1c7b..d3a715e9976b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java @@ -144,6 +144,7 @@ private Mono acquireIntern(String sessionId) { .addKeyValue(ENTITY_PATH_KEY, entityPath) .log("Connection-level error acquiring session, forcing connection recovery.", t); connectionCacheWrapper.forceCloseConnection(); + return 
Mono.delay(Duration.ZERO); } if (isTimeoutError(t)) { logger.atVerbose() diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 0a111a305795..0335fbf6b9e9 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -304,8 +304,8 @@ Mono getActiveLink() { } else if (failure instanceof AmqpException && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) { return Mono.delay(Duration.ZERO); - } else if (kind == RecoveryKind.LINK) { - // Link-level error — retry to get a fresh link. + } else if (kind == RecoveryKind.LINK || kind == RecoveryKind.CONNECTION) { + // Link or connection-level error — retry to acquire a fresh link (or connection). return Mono.delay(Duration.ZERO); } else { final long id = System.nanoTime(); From 79c52f9ccf7cbe60f9dbc4f777577fcf51f32c8f Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 13:46:10 -0700 Subject: [PATCH 05/17] fix(review): address third Copilot review on tiered recovery - T8: RetryUtil - clamp final jittered delay to maxDelay so retryOptions are consistently respected. Previously jitter was applied after the pre-jitter cap which could produce a delay exceeding retryOptions.getMaxDelay(). Also cap baseDelay to maxDelay in FIXED mode (FIXED previously used baseDelay without checking against maxDelay, unlike the EXPONENTIAL path). - T9: ServiceBusReceiverAsyncClient - fix misleading log message in the LINK/ CONNECTION recovery callback. 
The error handler on connectionProcessor.subscribe fires only when obtaining the connection fails (not when removeSession fails, since it returns a boolean). Renamed to "Error obtaining connection during {} recovery." Also log the boolean result of removeSession at VERBOSE level to confirm whether a stale session was actually evicted. --- .../azure/core/amqp/implementation/RetryUtil.java | 7 +++++-- .../servicebus/ServiceBusReceiverAsyncClient.java | 13 ++++++++++--- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index a590fe3c034c..9476ea51882f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -221,13 +221,16 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer 0 ? maxDelay : baseDelay; } else { long millis = baseDelay.toMillis() * (1L << Math.min(attempt, 30)); delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis())); } final double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER_FACTOR; - final Duration jitteredDelay = Duration.ofMillis((long) (delay.toMillis() * jitter)); + // Clamp the final jittered delay to maxDelay so retryOptions are consistently respected. 
+ final Duration jitteredDelay + = Duration.ofMillis(Math.min((long) (delay.toMillis() * jitter), maxDelay.toMillis())); return Mono.delay(jitteredDelay).thenReturn(attempt); })); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 8096f73c6c6d..21d0753618d2 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -1749,10 +1749,17 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() { // For LINK errors during link creation, the session hosting the link may be stale. // Ask the connection to remove it so the next retry creates a fresh session + link. // The entityPath is the session name used by createReceiveLink(). - connectionProcessor.subscribe(connection -> connection.removeSession(entityPath), - error -> LOGGER.atWarning() + // Note: the error handler fires only if obtaining the connection fails, not if removeSession fails + // (removeSession returns a boolean and never propagates an error into the reactive stream). 
+ connectionProcessor.subscribe(connection -> { + final boolean removed = connection.removeSession(entityPath); + LOGGER.atVerbose() .addKeyValue(LINK_NAME_KEY, linkName) - .log("Error removing stale session during LINK recovery.", error)); + .addKeyValue("sessionRemoved", removed) + .log("Attempted stale session removal during {} recovery.", recoveryKind); + }, error -> LOGGER.atWarning() + .addKeyValue(LINK_NAME_KEY, linkName) + .log("Error obtaining connection during {} recovery.", recoveryKind, error)); } if (recoveryKind == RecoveryKind.CONNECTION) { connectionCacheWrapper.forceCloseConnection(); From 1fec9e58ed368fd2eac9a645b9c2019724049a33 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 14:11:58 -0700 Subject: [PATCH 06/17] fix(servicebus): make link/connection force-close non-blocking in recovery T10: In ServiceBusSenderAsyncClient.performRecovery(), replace link.dispose() with link.closeAsync().subscribe(...). ReactorSender.dispose() calls closeAsync().block(tryTimeout), which blocks the Reactor thread when invoked from a recovery callback on a non-blocking scheduler. T11: In ReactorConnectionCache.forceCloseConnection(), replace connection.dispose() with a non-blocking equivalent: set a new forceInvalidate AtomicBoolean flag before starting connection.closeAsync().subscribe(...). The cacheInvalidateIf predicate now checks forceInvalidate in addition to isDisposed(), ensuring the cache is invalidated synchronously (by the flag) while the close handshake completes asynchronously. ReactorConnection.dispose() has the same blocking pattern. T12: Update comment in RetryUtil.createRetryWithRecovery() to remove the misleading claim that the quick-retry path matches Go's ResetAttempts(). The Java implementation uses a didQuickRetry flag only (no attempt counter reset); subsequent retries continue standard exponential backoff from the running count. 
--- .../implementation/ReactorConnectionCache.java | 18 +++++++++++++----- .../core/amqp/implementation/RetryUtil.java | 4 +++- .../ServiceBusSenderAsyncClient.java | 10 ++++++++-- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index 248877d56563..94f3179e6d8f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -48,6 +49,7 @@ public final class ReactorConnectionCache implement // any dependent type; instead, the dependent type must acquire Connection only through the cache route, // i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter. private volatile T currentConnection; + private final AtomicBoolean forceInvalidate = new AtomicBoolean(false); private final State state = new State(); /** @@ -112,7 +114,7 @@ public ReactorConnectionCache(Supplier connectionSupplier, String fullyQualif sink.next(connection); } }).cacheInvalidateIf(c -> { - if (c.isDisposed()) { + if (c.isDisposed() || forceInvalidate.compareAndSet(true, false)) { withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); return true; } else { @@ -190,10 +192,16 @@ public void forceCloseConnection() { if (connection != null && !connection.isDisposed()) { withConnectionId(logger, connection.getId()) .log("Force-closing connection for recovery. 
Next get() will create a fresh connection."); - // Call dispose() rather than closeAsync() so that isDisposed() returns true synchronously. - // This ensures cacheInvalidateIf immediately invalidates the cached reference on the next - // get() call, rather than waiting for the async close handshake to complete. - connection.dispose(); + // Set forceInvalidate before starting async close so that cacheInvalidateIf immediately + // invalidates this connection on the next get() call, without blocking the caller + // while the AMQP close handshake completes. ReactorConnection.dispose() calls + // closeAsync().block(), which is illegal on a non-blocking Reactor thread. + forceInvalidate.set(true); + connection.closeAsync() + .subscribe(null, + error -> logger.atVerbose() + .addKeyValue(CONNECTION_ID_KEY, connection.getId()) + .log("Error during async connection force-close.", error)); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index 9476ea51882f..73175615fbc4 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -212,7 +212,9 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer logger.atVerbose() + .addKeyValue(ENTITY_PATH_KEY, entityName) + .log("Error closing stale send link during recovery.", error)); } linkName.set(null); From d4c64fd99edeefb9f328ca5035e240c40ea59929 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 14:26:53 -0700 Subject: [PATCH 07/17] fix(amqp): prevent NONE failure from consuming quick-retry flag T13: In RetryUtil.createRetryWithRecovery(), the condition if (!didQuickRetry.getAndSet(true) && (kind == LINK || kind == CONNECTION)) evaluated getAndSet(true) unconditionally before the kind check. 
When the first retryable failure was NONE (e.g. SERVER_BUSY_ERROR, TimeoutException), the flag was set to true even though no quick retry occurred. The subsequent first LINK/CONNECTION failure therefore skipped the quick-retry optimization. Fix: reverse the operand order so the kind check comes first. Java short-circuit evaluation now prevents getAndSet(true) from being called on NONE/FATAL failures: if ((kind == LINK || kind == CONNECTION) && !didQuickRetry.getAndSet(true)) Added RetryUtilTest.createRetryWithRecovery_noneFailureBeforeLinkPreservesQuickRetry to directly exercise this scenario. --- .../core/amqp/implementation/RetryUtil.java | 4 +- .../amqp/implementation/RetryUtilTest.java | 42 +++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index 73175615fbc4..66b80924268e 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -215,7 +215,9 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer recoveries = new ArrayList<>(); + final Retry retry = RetryUtil.createRetryWithRecovery(options, recoveries::add); + final AtomicInteger attempt = new AtomicInteger(); + final Mono source = Mono.defer(() -> { + switch (attempt.getAndIncrement()) { + case 0: + // NONE kind — server-busy; should not consume the quick-retry flag. + return Mono.error(new AmqpException(true, AmqpErrorCondition.SERVER_BUSY_ERROR, "busy", null)); + + case 1: + // LINK kind — first occurrence; flag must still be available → quick-retry fires. 
+ return Mono.error(new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null)); + + default: + return Mono.just(99); + } + }); + + // Act & Assert — virtual time to skip the SERVER_BUSY backoff; LINK quick-retry fires without delay. + StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) + .expectSubscription() + .thenAwait(Duration.ofMinutes(1)) + .expectNext(99) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + // NONE failures do not invoke recovery; only LINK does. + assertEquals(1, recoveries.size(), "Only the LINK failure should invoke recovery"); + assertEquals(RecoveryKind.LINK, recoveries.get(0), + "Recovery callback must be called with LINK kind for the detach error"); + } } From 52d4435b62b70ce59bc1c33674facd19f0a60b8b Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 14:42:15 -0700 Subject: [PATCH 08/17] fix(amqp): classify disposed-link IllegalStateException as LINK recovery T14: RecoveryKind.classify() previously returned FATAL for all non-AMQP exceptions, including IllegalStateException thrown by ReactorSender.send() when the link is disposed: 'connectionId[%s] linkName[%s] Cannot publish message when disposed.' 'connectionId[%s] linkName[%s] Cannot publish data batch when disposed.' These exceptions signal link staleness (a race between link.closeAsync() and an in-flight send), not a permanent application error. They should trigger LINK recovery so the retry path creates a fresh link. Added explicit check before the FATAL fallthrough: if the exception is an IllegalStateException with 'disposed' in the message, return LINK. 
Added three tests in RecoveryKindTest: - illegalStateExceptionDisposedMessageReturnsLink - illegalStateExceptionDisposedDataBatchReturnsLink - illegalStateExceptionUnrelatedToDisposedReturnsFatal --- .../amqp/implementation/RecoveryKind.java | 12 +++++++++++ .../amqp/implementation/RecoveryKindTest.java | 20 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java index 31ccc5be35d4..ff0690f9c4f7 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -141,6 +141,18 @@ public static RecoveryKind classify(Throwable error) { return LINK; } + // IllegalStateException thrown by a disposed ReactorSender (e.g., "Cannot publish + // message when disposed." or "Cannot publish data batch when disposed."). This is + // a link-staleness signal: the link was closed (possibly by a concurrent recovery + // path) before the in-flight send could complete. LINK recovery creates a fresh + // link on the next retry. + if (error instanceof IllegalStateException) { + final String msg = error.getMessage(); + if (msg != null && msg.contains("disposed")) { + return LINK; + } + } + // Unknown non-AMQP errors — treat as fatal (don't retry application or SDK bugs). // The Go SDK defaults to CONNECTION for unknown errors, but those are AMQP-layer // errors (io.EOF, net.Error). 
Java's non-AMQP exceptions (e.g., AzureException, diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java index 7c36e7d61d9f..4655fc72ddb7 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java @@ -184,4 +184,24 @@ void resourceLimitExceededReturnsNone() { = new AmqpException(true, AmqpErrorCondition.RESOURCE_LIMIT_EXCEEDED, "resource limit", null); assertEquals(RecoveryKind.NONE, RecoveryKind.classify(error)); } + + @Test + void illegalStateExceptionDisposedMessageReturnsLink() { + // Matches ReactorSender.send() message: "connectionId[%s] linkName[%s] Cannot publish message when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish message when disposed."))); + } + + @Test + void illegalStateExceptionDisposedDataBatchReturnsLink() { + // Matches ReactorSender.send(List) message: "connectionId[%s] linkName[%s] Cannot publish data batch when disposed." + assertEquals(RecoveryKind.LINK, RecoveryKind.classify( + new IllegalStateException("connectionId[abc] linkName[xyz] Cannot publish data batch when disposed."))); + } + + @Test + void illegalStateExceptionUnrelatedToDisposedReturnsFatal() { + // Non-disposed IllegalStateException must remain FATAL (genuine application or SDK bug). 
+ assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new IllegalStateException("some unexpected state"))); + } } From ce10aa262f6cd21c48ea0670f7e1383394523a84 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 15:03:36 -0700 Subject: [PATCH 09/17] fix(amqp): narrow disposed-ISE match to prevent tier misclassification T15: The 'disposed' substring check in RecoveryKind.classify() was too broad. 'Connection is disposed. Cannot get management instance.' would also match, incorrectly returning LINK instead of FATAL for connection-level errors. Narrowed to require both 'Cannot publish' AND 'disposed' in the message, which precisely matches ReactorSender's two disposal messages and nothing else. Added illegalStateExceptionConnectionDisposedReturnsFatal test to RecoveryKindTest to pin the regression. --- .../com/azure/core/amqp/implementation/RecoveryKind.java | 4 +++- .../azure/core/amqp/implementation/RecoveryKindTest.java | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java index ff0690f9c4f7..2fb64031fc1b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -146,9 +146,11 @@ public static RecoveryKind classify(Throwable error) { // a link-staleness signal: the link was closed (possibly by a concurrent recovery // path) before the in-flight send could complete. LINK recovery creates a fresh // link on the next retry. + // Match both "Cannot publish" and "disposed" to avoid misclassifying unrelated + // disposal signals (e.g., "Connection is disposed. Cannot get management instance."). 
if (error instanceof IllegalStateException) { final String msg = error.getMessage(); - if (msg != null && msg.contains("disposed")) { + if (msg != null && msg.contains("Cannot publish") && msg.contains("disposed")) { return LINK; } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java index 4655fc72ddb7..d123a2c8963c 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RecoveryKindTest.java @@ -204,4 +204,12 @@ void illegalStateExceptionUnrelatedToDisposedReturnsFatal() { // Non-disposed IllegalStateException must remain FATAL (genuine application or SDK bug). assertEquals(RecoveryKind.FATAL, RecoveryKind.classify(new IllegalStateException("some unexpected state"))); } + + @Test + void illegalStateExceptionConnectionDisposedReturnsFatal() { + // "Connection is disposed. Cannot get management instance." contains "disposed" but NOT + // "Cannot publish" — must remain FATAL to avoid misclassifying connection-level disposal. + assertEquals(RecoveryKind.FATAL, RecoveryKind + .classify(new IllegalStateException("Connection is disposed. Cannot get management instance."))); + } } From 5448c974d0d6fffaec64ca3713a2d49d9d01199b Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 15:43:11 -0700 Subject: [PATCH 10/17] fix(amqp): clarify virtual-time reason in retry tests T17: Three RetryUtilTest comments claimed virtual time was needed because of a 'SERVER_BUSY backoff delay'. The actual reason is that createRetryWithRecovery() unconditionally adds SERVER_BUSY_WAIT_TIME (4 s) to the base delay for every retry, regardless of error type (line 184: baseDelay = options.getDelay() + SERVER_BUSY_WAIT_TIME). 
Updated all three comments to reflect the actual mechanism so future maintainers do not infer incorrect retry semantics. --- .../azure/core/amqp/implementation/RetryUtilTest.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java index d7a12816e51f..06bdf33ab211 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -247,7 +247,9 @@ void createRetryWithRecovery_linkErrorInvokesRecoveryAndRetries() { return Mono.just(42); }); - // Act & Assert — use virtual time because the second retry applies a SERVER_BUSY backoff delay. + // Act & Assert — use virtual time because the base retry delay unconditionally includes + // SERVER_BUSY_WAIT_TIME (4 s), regardless of error type — the cumulative + // wait would exceed real-time test limits. StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) .expectSubscription() .thenAwait(Duration.ofMinutes(1)) @@ -297,7 +299,9 @@ void createRetryWithRecovery_exhaustedRetriesTerminateWithError() { final AmqpException transientError = new AmqpException(true, AmqpErrorCondition.LINK_DETACH_FORCED, "detach", null); - // Act & Assert — use virtual time because the second retry applies a SERVER_BUSY backoff delay. + // Act & Assert — use virtual time because the base retry delay unconditionally includes + // SERVER_BUSY_WAIT_TIME (4 s), regardless of error type — the cumulative + // wait would exceed real-time test limits. 
StepVerifier.withVirtualTime(() -> Mono.error(transientError).retryWhen(retry)) .expectSubscription() .thenAwait(Duration.ofMinutes(1)) @@ -336,7 +340,8 @@ void createRetryWithRecovery_noneFailureBeforeLinkPreservesQuickRetry() { } }); - // Act & Assert — virtual time to skip the SERVER_BUSY backoff; LINK quick-retry fires without delay. + // Act & Assert — virtual time because the base delay logic unconditionally adds SERVER_BUSY_WAIT_TIME + // (4 s) to every retry; the first LINK error uses the quick-retry path (no delay). StepVerifier.withVirtualTime(() -> source.retryWhen(retry)) .expectSubscription() .thenAwait(Duration.ofMinutes(1)) From 4bd93bb8dfb60f68c46b891e2cc7abcc2424cbba Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 15:59:21 -0700 Subject: [PATCH 11/17] fix(amqp): tie forceCloseConnection invalidation to specific connection id MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit T18: The shared AtomicBoolean forceInvalidate flag was not bound to a specific connection instance. Under concurrency, if the cache refreshed and created a new connection C' between forceCloseConnection() setting the flag and the next cacheInvalidateIf() check, C' would be spuriously invalidated — causing unnecessary connection churn. Replace AtomicBoolean with AtomicReference holding the connection ID to be force-invalidated. cacheInvalidateIf() now only consumes the flag when the cached connection's ID exactly matches the stored target; a fresh connection with a different ID passes through the cache unaffected. 
--- .../ReactorConnectionCache.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index 94f3179e6d8f..a7affe9284cb 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -18,7 +18,6 @@ import java.util.Objects; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -49,7 +48,10 @@ public final class ReactorConnectionCache implement // any dependent type; instead, the dependent type must acquire Connection only through the cache route, // i.e., by subscribing to 'createOrGetCachedConnection' via 'get()' getter. private volatile T currentConnection; - private final AtomicBoolean forceInvalidate = new AtomicBoolean(false); + // Holds the ID of the connection that forceCloseConnection() asked to force-invalidate. + // Only the connection whose getId() matches this value will be invalidated by cacheInvalidateIf; + // a freshly created connection with a different ID is never accidentally invalidated. + private final AtomicReference forceInvalidateConnectionId = new AtomicReference<>(null); private final State state = new State(); /** @@ -114,13 +116,24 @@ public ReactorConnectionCache(Supplier connectionSupplier, String fullyQualif sink.next(connection); } }).cacheInvalidateIf(c -> { - if (c.isDisposed() || forceInvalidate.compareAndSet(true, false)) { + if (c.isDisposed()) { + // Connection disposed for any reason. 
Clean up the force-invalidate marker if it + // was targeting this connection so it is not accidentally consumed by a future + // connection that happens to have the same ID. + forceInvalidateConnectionId.compareAndSet(c.getId(), null); + withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); + return true; + } + final String targetId = forceInvalidateConnectionId.get(); + if (targetId != null + && targetId.equals(c.getId()) + && forceInvalidateConnectionId.compareAndSet(targetId, null)) { + // forceCloseConnection() asked to invalidate exactly this connection. withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); return true; - } else { - // Emit cached connection. - return false; } + // No forced invalidation targeted this connection — emit it from cache. + return false; }); } @@ -196,7 +209,7 @@ public void forceCloseConnection() { // invalidates this connection on the next get() call, without blocking the caller // while the AMQP close handshake completes. ReactorConnection.dispose() calls // closeAsync().block(), which is illegal on a non-blocking Reactor thread. - forceInvalidate.set(true); + forceInvalidateConnectionId.set(connection.getId()); connection.closeAsync() .subscribe(null, error -> logger.atVerbose() From 2e80396bdd2efb18c2d7a10a631726b470ef9ce8 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 16:28:19 -0700 Subject: [PATCH 12/17] fix(amqp): rename test methods to camelCase for checkstyle compliance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CI Build Analyze failed with checkstyle MethodName violations — the pattern (?=^[a-z][a-zA-Z0-9]*$) forbids underscores. Renamed all 5 createRetryWithRecovery_* test methods in RetryUtilTest to camelCase. 
--- .../azure/core/amqp/implementation/RetryUtilTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java index 06bdf33ab211..fd48b0ecfddc 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -214,7 +214,7 @@ void retryFilter(Throwable throwable, boolean expected) { * FATAL errors must not be retried and must not invoke the recovery callback. */ @Test - void createRetryWithRecovery_fatalErrorTerminatesImmediately() { + void createRetryWithRecoveryFatalErrorTerminatesImmediately() { // Arrange final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); final AtomicInteger recoveryCount = new AtomicInteger(); @@ -234,7 +234,7 @@ void createRetryWithRecovery_fatalErrorTerminatesImmediately() { * the quick-retry path (no backoff delay). */ @Test - void createRetryWithRecovery_linkErrorInvokesRecoveryAndRetries() { + void createRetryWithRecoveryLinkErrorInvokesRecoveryAndRetries() { // Arrange final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); final List recoveries = new ArrayList<>(); @@ -265,7 +265,7 @@ void createRetryWithRecovery_linkErrorInvokesRecoveryAndRetries() { * CONNECTION errors must invoke the recovery callback with CONNECTION kind. 
*/ @Test - void createRetryWithRecovery_connectionErrorInvokesRecovery() { + void createRetryWithRecoveryConnectionErrorInvokesRecovery() { // Arrange final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(2).setDelay(Duration.ofMillis(100)); final AtomicReference capturedKind = new AtomicReference<>(); @@ -289,7 +289,7 @@ void createRetryWithRecovery_connectionErrorInvokesRecovery() { * After the retry budget is exhausted the error must propagate without further retries. */ @Test - void createRetryWithRecovery_exhaustedRetriesTerminateWithError() { + void createRetryWithRecoveryExhaustedRetriesTerminateWithError() { // Arrange final int maxRetries = 2; final AmqpRetryOptions options @@ -319,7 +319,7 @@ void createRetryWithRecovery_exhaustedRetriesTerminateWithError() { * unconditionally; this test verifies the kind check comes first. */ @Test - void createRetryWithRecovery_noneFailureBeforeLinkPreservesQuickRetry() { + void createRetryWithRecoveryNoneFailureBeforeLinkPreservesQuickRetry() { // Arrange final AmqpRetryOptions options = new AmqpRetryOptions().setMaxRetries(3).setDelay(Duration.ofMillis(100)); final List recoveries = new ArrayList<>(); From 1860dd38b6e009f08a37399871096d5cb66f3cf8 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Wed, 18 Mar 2026 16:40:14 -0700 Subject: [PATCH 13/17] fix(amqp): use distinct log message for force-invalidation path The force-invalidation path in cacheInvalidateIf used the same log message as the disposed-connection path ('The connection is closed, requesting a new connection.'), which was misleading because the connection may not yet be disposed when a force-close is requested. Changed the force path message to 'Forcing connection close, requesting a new connection.' 
--- .../azure/core/amqp/implementation/ReactorConnectionCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java index a7affe9284cb..20aa2f90c7aa 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnectionCache.java @@ -129,7 +129,7 @@ public ReactorConnectionCache(Supplier connectionSupplier, String fullyQualif && targetId.equals(c.getId()) && forceInvalidateConnectionId.compareAndSet(targetId, null)) { // forceCloseConnection() asked to invalidate exactly this connection. - withConnectionId(logger, c.getId()).log("The connection is closed, requesting a new connection."); + withConnectionId(logger, c.getId()).log("Forcing connection close, requesting a new connection."); return true; } // No forced invalidation targeted this connection — emit it from cache. 
From bf2b5605b8265be9cbab99ade4e06a2bd9c9806c Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 19 Mar 2026 09:49:37 -0700 Subject: [PATCH 14/17] fix(amqp): guard backoff overflow and normalize RecoveryKind message matching --- .../com/azure/core/amqp/implementation/RecoveryKind.java | 8 ++++++-- .../com/azure/core/amqp/implementation/RetryUtil.java | 7 ++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java index 2fb64031fc1b..df3dc4c1d651 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RecoveryKind.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpException; +import java.util.Locale; import java.util.concurrent.TimeoutException; /** @@ -150,8 +151,11 @@ public static RecoveryKind classify(Throwable error) { // disposal signals (e.g., "Connection is disposed. Cannot get management instance."). 
if (error instanceof IllegalStateException) { final String msg = error.getMessage(); - if (msg != null && msg.contains("Cannot publish") && msg.contains("disposed")) { - return LINK; + if (msg != null) { + final String normalizedMsg = msg.toLowerCase(Locale.ROOT); + if (normalizedMsg.contains("cannot publish") && normalizedMsg.contains("disposed")) { + return LINK; + } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index 66b80924268e..40c33f030063 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -228,7 +228,12 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer 0 ? maxDelay : baseDelay; } else { - long millis = baseDelay.toMillis() * (1L << Math.min(attempt, 30)); + final long multiplier = 1L << Math.min(attempt, 30); + final long baseMillis = baseDelay.toMillis(); + // Guard against overflow: if baseMillis * multiplier would exceed Long.MAX_VALUE, + // saturate to maxDelay (the clamp below would cap it there anyway). + final long millis + = baseMillis > Long.MAX_VALUE / multiplier ? 
maxDelay.toMillis() : baseMillis * multiplier; delay = Duration.ofMillis(Math.min(millis, maxDelay.toMillis())); } final double jitter = 1.0 + (ThreadLocalRandom.current().nextDouble() * 2 - 1) * JITTER_FACTOR; From e76c2d831284b4fa562d4e47e86dd62c22ae177d Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 19 Mar 2026 10:37:10 -0700 Subject: [PATCH 15/17] fix(servicebus): scope send-link recovery to per-operation reference, guard session disposal, improve recovery log context --- .../core/amqp/implementation/RetryUtil.java | 2 +- .../ServiceBusSenderAsyncClient.java | 24 ++++++++++--------- .../servicebus/ServiceBusSessionManager.java | 10 ++++---- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java index 40c33f030063..d71ea5b064a6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -206,7 +206,7 @@ static Retry createRetryWithRecovery(AmqpRetryOptions options, Consumer linkName = new AtomicReference<>(); - private final AtomicReference lastSendLink = new AtomicReference<>(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final MessageSerializer messageSerializer; private final AmqpRetryOptions retryOptions; @@ -817,7 +816,6 @@ private Mono scheduleMessageInternal(ServiceBusMessage message, OffsetDate return tracer .traceScheduleMono("ServiceBus.scheduleMessage", getSendLinkWithRetry("schedule-message").flatMap(link -> { - lastSendLink.set(link); return link.getLinkSize().flatMap(size -> { final int maxSize = size > 0 ? 
size : MAX_MESSAGE_LENGTH_BYTES; return connectionProcessor @@ -869,8 +867,9 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, messages.add(message); }); + final AtomicReference operationLink = new AtomicReference<>(); final Mono sendMessage = getSendLink("send-batch").flatMap(link -> { - lastSendLink.set(link); + operationLink.set(link); if (transactionContext != null && transactionContext.getTransactionId() != null) { final TransactionalState deliveryState = new TransactionalState(); deliveryState.setTxnId(Binary.create(transactionContext.getTransactionId())); @@ -885,7 +884,7 @@ private Mono sendBatchInternal(ServiceBusMessageBatch batch, final String timeoutMessage = "Sending messages timed out. message-count:" + batch.getCount() + entityId(); final Mono withRetry = RetryUtil.withRetryAndRecovery(sendMessage, retryOptions, timeoutMessage, recoveryKind -> { - performRecovery(recoveryKind, "sendBatch"); + performRecovery(recoveryKind, "sendBatch", operationLink); }).onErrorMap(this::mapError); return instrumentation.instrumentSendBatch("ServiceBus.send", withRetry, batch.getMessages()); } @@ -900,10 +899,11 @@ private Mono sendFluxInternal(Flux messages, // Apply retry+recovery only to link acquisition. Keeping messages.collect() outside the // retry boundary avoids re-subscribing the user-provided Flux on each retry attempt, // which could duplicate side-effects or re-consume a hot publisher. + final AtomicReference operationLink = new AtomicReference<>(); final Mono linkWithRecovery - = RetryUtil.withRetryAndRecovery(getSendLink("send-batches").doOnNext(link -> lastSendLink.set(link)), - retryOptions, "Failed to acquire send link for batch collection." + entityId(), - recoveryKind -> performRecovery(recoveryKind, "sendFlux-link")); + = RetryUtil.withRetryAndRecovery(getSendLink("send-batches").doOnNext(operationLink::set), retryOptions, + "Failed to acquire send link for batch collection." 
+ entityId(), + recoveryKind -> performRecovery(recoveryKind, "sendFlux-link", operationLink)); final Mono> batchListMono = linkWithRecovery.flatMap(link -> link.getLinkSize().flatMap(size -> { @@ -952,7 +952,7 @@ private Mono getSendLink(String callSite) { private Mono getSendLinkWithRetry(String callSite) { return RetryUtil.withRetryAndRecovery(getSendLink(callSite), retryOptions, String.format(retryGetLinkErrorMessageFormat, callSite), recoveryKind -> { - performRecovery(recoveryKind, "getSendLink-" + callSite); + performRecovery(recoveryKind, "getSendLink-" + callSite, null); }); } @@ -972,7 +972,7 @@ private Throwable mapError(Throwable throwable) { *

This matches the Go SDK's RecoverIfNeeded() and the .NET SDK's * FaultTolerantAmqpObject pattern.

*/ - private void performRecovery(RecoveryKind recoveryKind, String callSite) { + private void performRecovery(RecoveryKind recoveryKind, String callSite, AtomicReference linkRef) { if (recoveryKind == RecoveryKind.NONE || recoveryKind == RecoveryKind.FATAL) { return; } @@ -983,10 +983,12 @@ private void performRecovery(RecoveryKind recoveryKind, String callSite) { .addKeyValue("callSite", callSite) .log("Performing {} recovery before retry.", recoveryKind); - // Start async close of the cached send link so the next retry creates a fresh one. + // Start async close of the operation-scoped send link so the next retry creates a fresh one. + // Using a per-operation AtomicReference (not a class-level field) prevents concurrent send + // operations from accidentally closing each other's links. // Use closeAsync() rather than dispose() to avoid blocking the Reactor thread; ReactorSender // dispose() calls closeAsync().block(tryTimeout), which is illegal on a non-blocking scheduler. - final AmqpSendLink link = lastSendLink.getAndSet(null); + final AmqpSendLink link = linkRef != null ? 
linkRef.getAndSet(null) : null; if (link != null) { link.closeAsync() .subscribe(null, diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 0335fbf6b9e9..5ac863ebb57d 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -289,6 +289,11 @@ Mono getActiveLink() { .addKeyValue("recoveryKind", kind) .log("Error occurred while getting unnamed session.", failure); + if (isDisposed.get()) { + return Mono.error( + new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())); + } + if (kind == RecoveryKind.CONNECTION) { LOGGER.atWarning() .addKeyValue(ENTITY_PATH_KEY, entityPath) @@ -296,10 +301,7 @@ Mono getActiveLink() { connectionCacheWrapper.forceCloseConnection(); } - if (isDisposed.get()) { - return Mono.error( - new AmqpException(false, "SessionManager is already disposed.", failure, getErrorContext())); - } else if (failure instanceof TimeoutException) { + if (failure instanceof TimeoutException) { return Mono.delay(Duration.ZERO); } else if (failure instanceof AmqpException && ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) { From 3eea3b2f9b6c73e560e144d91343a3cfca197cbb Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 19 Mar 2026 12:09:31 -0700 Subject: [PATCH 16/17] fix(servicebus): include exception in connection-recovery warning log --- .../azure/messaging/servicebus/ServiceBusSessionManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java index 5ac863ebb57d..59971f46fc80 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionManager.java @@ -297,7 +297,7 @@ Mono getActiveLink() { if (kind == RecoveryKind.CONNECTION) { LOGGER.atWarning() .addKeyValue(ENTITY_PATH_KEY, entityPath) - .log("Connection-level error in session manager, forcing connection recovery."); + .log("Connection-level error in session manager, forcing connection recovery.", failure); connectionCacheWrapper.forceCloseConnection(); } From 1f9bf9231f57c33ddb42cf7dba37c1902070c9b9 Mon Sep 17 00:00:00 2001 From: "Eldert Grootenboer (from Dev Box)" Date: Thu, 19 Mar 2026 13:51:54 -0700 Subject: [PATCH 17/17] fix(ci): move azure-core-amqp source comment to correct entry --- sdk/servicebus/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/ci.yml b/sdk/servicebus/ci.yml index b957c80a24d4..49564ad549aa 100644 --- a/sdk/servicebus/ci.yml +++ b/sdk/servicebus/ci.yml @@ -61,6 +61,6 @@ extends: # required by the above perf libraries - name: perf-test-core groupId: com.azure - # Build azure-core-amqp from source (needed for RecoveryKind, tiered retry) - name: azure-core-amqp groupId: com.azure + # Build azure-core-amqp from source (needed for RecoveryKind, tiered retry)