From af791a0665b28cb6389caca1a895c9ac9d00d4b7 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Wed, 3 Jun 2026 20:25:31 -0300 Subject: [PATCH 1/2] feat: add decorator metrics Signed-off-by: Marcos Tischer Vallim --- .../concurrent/BlockingSubmissionPolicy.java | 3 +- .../concurrent/RingBufferBlockingQueue.java | 9 + .../lib/concurrent/ThreadFactoryProvider.java | 4 +- .../lib/core/AbstractAmazonSnsConsumer.java | 36 +- .../lib/core/AbstractAmazonSnsProducer.java | 7 +- .../lib/core/AbstractAmazonSnsTemplate.java | 9 +- .../messaging/lib/core/AmazonSnsConsumer.java | 52 ++ .../messaging/lib/core/AmazonSnsProducer.java | 45 ++ .../lib/core/RequestEntryInternalFactory.java | 7 + ...ractAmazonSnsConsumerMetricsDecorator.java | 165 ++++++ .../BlockingQueueMetricsDecorator.java | 22 +- .../ExecutorServiceMetricsDecorator.java | 16 +- .../lib/model/PublishRequestBuilder.java | 3 + .../core/AbstractAmazonSnsConsumerTest.java | 6 +- .../core/AbstractAmazonSnsTemplateTest.java | 4 +- ...nsumer.java => AmazonSnsConsumerImpl.java} | 16 +- ...oducer.java => AmazonSnsProducerImpl.java} | 4 +- .../messaging/lib/core/AmazonSnsTemplate.java | 23 +- .../AmazonSnsConsumerMetricsDecorator.java | 95 ++++ ...AmazonSnsConsumerMetricsDecoratorTest.java | 462 +++++++++++++++++ ...nsumer.java => AmazonSnsConsumerImpl.java} | 16 +- ...oducer.java => AmazonSnsProducerImpl.java} | 4 +- .../messaging/lib/core/AmazonSnsTemplate.java | 23 +- .../AmazonSnsConsumerMetricsDecorator.java | 95 ++++ ...AmazonSnsConsumerMetricsDecoratorTest.java | 478 ++++++++++++++++++ 25 files changed, 1520 insertions(+), 84 deletions(-) create mode 100644 amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java create mode 100644 amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java create mode 100644 amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java rename amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/{AmazonSnsConsumer.java => AmazonSnsConsumerImpl.java} (90%) rename amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/{AmazonSnsProducer.java => AmazonSnsProducerImpl.java} (91%) create mode 100644 amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java create mode 100644 amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java rename amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/{AmazonSnsConsumer.java => AmazonSnsConsumerImpl.java} (90%) rename amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/{AmazonSnsProducer.java => AmazonSnsProducerImpl.java} (91%) create mode 100644 amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java create mode 100644 amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java index b8a9b03..86af6ba 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicy.java @@ -30,7 +30,8 @@ * is thrown. */ public class BlockingSubmissionPolicy implements RejectedExecutionHandler { - + + /** The maximum time to wait for queue insertion, in milliseconds. */ private final long timeout; /** diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java index 07f2984..fe1f2de 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java @@ -42,22 +42,31 @@ */ public class RingBufferBlockingQueue extends AbstractQueue implements BlockingQueue { + /** Default capacity when no explicit capacity is provided. */ private static final int DEFAULT_CAPACITY = 2048; + /** The ring buffer array holding queue entries. */ private final AtomicReferenceArray> buffer; + /** The fixed maximum number of elements the queue can hold. */ private final int capacity; + /** Sequence number tracking the next write position (starts at -1 indicating no writes). */ private final AtomicLong writeSequence = new AtomicLong(-1); + /** Sequence number tracking the next read position. */ private final AtomicLong readSequence = new AtomicLong(0); + /** Current number of elements in the queue. */ private final AtomicInteger size = new AtomicInteger(0); + /** Fair reentrant lock for coordinating producer/consumer access. */ private final ReentrantLock reentrantLock; + /** Condition for consumers waiting when the queue is empty. */ private final Condition waitingConsumer; + /** Condition for producers waiting when the queue is full. */ private final Condition waitingProducer; /** diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java index 669cb7c..32ceec3 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ThreadFactoryProvider.java @@ -35,8 +35,10 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class ThreadFactoryProvider { + /** Class logger. */ private static final Logger LOGGER = LoggerFactory.getLogger(ThreadFactoryProvider.class); - + + /** Cached supplier of the appropriate thread factory for the runtime Java version. */ private static Supplier supplierThreadFactory; static { diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index 58fc259..924f3f7 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -63,7 +63,7 @@ * @param the publish batch result type * @param the request entry payload type */ -abstract class AbstractAmazonSnsConsumer implements Runnable { +abstract class AbstractAmazonSnsConsumer implements Runnable, AmazonSnsConsumer { /** * Kilobyte constant used for size calculations. @@ -75,22 +75,31 @@ abstract class AbstractAmazonSnsConsumer implements Runnable { */ private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB; + /** Class logger. */ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class); + /** Single-thread scheduler that periodically triggers batch draining. */ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory()); + /** The Amazon SNS client used for publishing batches. */ protected final C amazonSnsClient; + /** The topic configuration properties. */ private final TopicProperty topicProperty; + /** Factory for creating internal request entry representations. */ private final RequestEntryInternalFactory requestEntryInternalFactory; + /** Shared map of pending requests keyed by request ID for async completion. */ protected final ConcurrentMap> pendingRequests; + /** The blocking queue that buffers incoming topic requests. */ private final BlockingQueue> topicRequests; + /** Optional decorator applied to the publish batch request before sending. */ private final UnaryOperator publishDecorator; + /** Executor service for asynchronous (non-FIFO) publishing. */ private final ExecutorService executorService; /** @@ -124,29 +133,6 @@ protected AbstractAmazonSnsConsumer( scheduledExecutorService.scheduleAtFixedRate(this, 0, topicProperty.getLinger(), TimeUnit.MILLISECONDS); } - /** - * Publishes a batch request to Amazon SNS. - * - * @param publishBatchRequest the batch request to publish - * @return the publish result - */ - protected abstract O publish(final R publishBatchRequest); - - /** - * Handles an error that occurred during publishing. - * - * @param publishBatchRequest the batch request that failed - * @param throwable the exception that was thrown - */ - protected abstract void handleError(final R publishBatchRequest, final Throwable throwable); - - /** - * Handles the response from a successful publish call. - * - * @param publishBatchResult the result of the publish operation - */ - protected abstract void handleResponse(final O publishBatchResult); - /** * Returns a factory function that creates a publish batch request from a topic ARN * and a list of internal request entries. @@ -206,6 +192,7 @@ public void run() { * Shuts down the consumer, waiting up to 60 seconds for both the scheduled and * worker executor services to terminate. */ + @Override @SneakyThrows public void shutdown() { await().thenRun(() -> { @@ -340,6 +327,7 @@ private Optional createBatch(final BlockingQueue> requests) { * * @return a future that completes when all requests are drained */ + @Override @SneakyThrows public CompletableFuture await() { return CompletableFuture.runAsync(() -> { diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java index e3eb708..af6685a 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java @@ -37,12 +37,15 @@ * @param the request entry payload type */ @RequiredArgsConstructor(access = AccessLevel.PROTECTED) -abstract class AbstractAmazonSnsProducer { +abstract class AbstractAmazonSnsProducer implements AmazonSnsProducer { + /** The producer lifecycle state, initially {@link State#RUNNIG}. */ private final AtomicReference state = new AtomicReference<>(State.RUNNIG); + /** Map of pending requests keyed by request ID for asynchronous completion. */ private final ConcurrentMap> pendingRequests; + /** The blocking queue for buffering requests before batch processing. */ private final BlockingQueue> topicRequests; /** @@ -51,6 +54,7 @@ abstract class AbstractAmazonSnsProducer { * @param requestEntry the request to enqueue * @return a {@link ListenableFuture} that tracks the completion of this request */ + @Override @SneakyThrows public ListenableFuture send(final RequestEntry requestEntry) { if (State.RUNNIG.equals(state.get())) { @@ -74,6 +78,7 @@ public ListenableFuture send(final Requ * Transitions the producer to the shutdown state. No further messages will be * accepted once shutdown. */ + @Override public void shutdown() { state.compareAndSet(State.RUNNIG, State.SHUTDOWN); } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java index 075a18d..c159664 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java @@ -47,17 +47,16 @@ * sending messages, shutting down, and awaiting completion. Delegates to a producer * and consumer for actual processing. * - * @param the Amazon SNS client type * @param the publish batch request type * @param the publish batch result type * @param the request entry payload type */ @RequiredArgsConstructor(access = AccessLevel.PROTECTED) -abstract class AbstractAmazonSnsTemplate { +abstract class AbstractAmazonSnsTemplate { - private final AbstractAmazonSnsProducer amazonSnsProducer; + private final AmazonSnsProducer amazonSnsProducer; - private final AbstractAmazonSnsConsumer amazonSnsConsumer; + private final AmazonSnsConsumer amazonSnsConsumer; /** * Sends a request entry to the SNS topic asynchronously. @@ -108,7 +107,7 @@ protected static ExecutorService getExecutorService(final TopicProperty topicPro } @Getter - public static final class Builder> { + public static final class Builder> { /** * The Amazon SNS client used for publishing. diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java new file mode 100644 index 0000000..14776d5 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -0,0 +1,52 @@ +package com.amazon.sns.messaging.lib.core; + +import java.util.concurrent.CompletableFuture; + +/** + * Consumer interface for Amazon SNS messaging. Implementations handle batch publishing + * of requests and dispatching of responses or errors to pending request futures. + * + * @param the publish batch request type + * @param the publish batch result type + */ +public interface AmazonSnsConsumer { + + /** + * Publishes a batch request to Amazon SNS. + * + * @param publishBatchRequest the batch request to publish + * @return the publish result + */ + public abstract O publish(final R publishBatchRequest); + + /** + * Handles an error that occurred during publishing. + * + * @param publishBatchRequest the batch request that failed + * @param throwable the exception that was thrown + */ + public void handleError(final R publishBatchRequest, final Throwable throwable); + + /** + * Handles the response from a successful publish call. + * + * @param publishBatchResult the result of the publish operation + */ + public void handleResponse(final O publishBatchResult); + + /** + * Shuts down the consumer, waiting up to 60 seconds for both the scheduled and + * worker executor services to terminate. + */ + public void shutdown(); + + /** + * Returns a {@link CompletableFuture} that completes once all pending requests + * have been processed (i.e., both the pending requests map and the topic + * requests queue are empty). + * + * @return a future that completes when all requests are drained + */ + public CompletableFuture await(); + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java new file mode 100644 index 0000000..91f9b14 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.core; + +import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; + +/** + * Producer interface for Amazon SNS messaging. Implementations enqueue request entries + * for batch publishing and track pending requests for asynchronous completion. + * + * @param the request entry payload type + */ +public interface AmazonSnsProducer { + + /** + * Sends a request entry for asynchronous publishing to an SNS topic. + * + * @param requestEntry the request entry containing the message payload and metadata + * @return a {@link ListenableFuture} that completes when the request is processed + */ + public ListenableFuture send(final RequestEntry requestEntry); + + /** + * Shuts down the producer, preventing any further messages from being accepted. + */ + public void shutdown(); + +} +// @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java index ec3dc90..95f1d59 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java @@ -116,19 +116,26 @@ public Integer messageAttributesSize(final RequestEntry requestEntry) { @Builder(setterPrefix = "with") static class RequestEntryInternal { + /** The creation timestamp in nanoseconds. */ private final long createTime; + /** The unique identifier of the request. */ private final String id; + /** The serialized payload as a byte buffer. */ @Getter(value = AccessLevel.PRIVATE) private final ByteBuffer value; + /** Optional message attributes / headers. */ private final Map messageHeaders; + /** Optional subject line for the message. */ private final String subject; + /** The message group ID for FIFO topics. */ private final String groupId; + /** The message deduplication ID for FIFO topics. */ private final String deduplicationId; /** diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java new file mode 100644 index 0000000..e31034d --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/AbstractAmazonSnsConsumerMetricsDecorator.java @@ -0,0 +1,165 @@ +package com.amazon.sns.messaging.lib.metrics; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; +import com.amazon.sns.messaging.lib.model.TopicProperty; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tags; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; + +// @formatter:off +/** + * Abstract base class for decorating an {@link AmazonSnsConsumer} with Micrometer metrics. + * Tracks publish attempts, successes, failures, latency, batch size, and inflight counts. + * + * @param the publish batch request type + * @param the publish batch result type + */ +abstract class AbstractAmazonSnsConsumerMetricsDecorator implements AmazonSnsConsumer { + + /** Base tag prefix for all SNS metrics. */ + private static final String TAG_SNS = "sns"; + + /** Metric name for total publish attempts. */ + protected static final String METRIC_PUBLISH_ATTEMPTS = TAG_SNS.concat(".publish.attempts"); + + /** Metric name for successful publish operations. */ + protected static final String METRIC_PUBLISH_SUCCESS = TAG_SNS.concat(".publish.success"); + + /** Metric name for failed publish operations. */ + protected static final String METRIC_PUBLISH_FAILURE = TAG_SNS.concat(".publish.failure"); + + /** Metric name for publish latency duration. */ + protected static final String METRIC_PUBLISH_DURATION = TAG_SNS.concat(".publish.duration"); + + /** Metric name for publish batch size distribution. */ + protected static final String METRIC_PUBLISH_BATCH_SIZE = TAG_SNS.concat(".publish.batch.size"); + + /** Metric name for inflight publish count. */ + protected static final String METRIC_PUBLISH_INFLIGHT = TAG_SNS.concat(".publish.inflight"); + + /** Tag key for error code dimension. */ + protected static final String TAG_ERROR_CODE = "error_code"; + + /** Tag key for error type dimension. */ + protected static final String TAG_ERROR_TYPE = "error_type"; + + /** Error type value for Amazon service exceptions. */ + protected static final String ERROR_TYPE_AMAZON = "amazon_service_exception"; + + /** Error type value for unknown exceptions. */ + protected static final String ERROR_TYPE_OTHER = "unknown"; + + /** The composite Micrometer meter registry. */ + protected final MeterRegistry registry; + + /** Metric tags identifying the SNS topic. */ + protected final Tags tags; + + /** Counter for total publish attempts. */ + protected final Counter publishAttemptsCounter; + + /** Counter for successful publishes. */ + protected final Counter successCounter; + + /** Timer for publish latency. */ + protected final Timer publishTimer; + + /** Distribution summary for batch sizes. */ + protected final DistributionSummary batchSizeSummary; + + /** Atomic gauge tracking the number of inflight publish operations. */ + protected final AtomicInteger inflightGauge = new AtomicInteger(); + + /** The decorated {@link AmazonSnsConsumer} delegate. */ + protected final AmazonSnsConsumer delegate; + + /** + * Creates a new metrics decorator. + * + * @param delegate the consumer to decorate + * @param topicProperty the topic configuration (used for topic tags) + * @param meterRegistry the Micrometer meter registry (may be null) + */ + AbstractAmazonSnsConsumerMetricsDecorator( + final AmazonSnsConsumer delegate, + final TopicProperty topicProperty, + final MeterRegistry meterRegistry) { + + this.delegate = delegate; + + final CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry(); + + Optional.ofNullable(meterRegistry).ifPresent(compositeMeterRegistry::add); + + registry = compositeMeterRegistry; + + tags = Tags.of("topic", topicProperty.getTopicArn()); + + publishAttemptsCounter = Counter.builder(METRIC_PUBLISH_ATTEMPTS) + .tags(tags) + .description("Total number of SNS PublishBatch calls attempted") + .register(compositeMeterRegistry); + + successCounter = Counter.builder(METRIC_PUBLISH_SUCCESS) + .tags(tags) + .description("Individual SNS messages acknowledged as successful") + .register(compositeMeterRegistry); + + publishTimer = Timer.builder(METRIC_PUBLISH_DURATION) + .tags(tags) + .description("End-to-end latency of SNS PublishBatch calls") + .publishPercentiles(0.5, 0.95, 0.99) + .register(compositeMeterRegistry); + + batchSizeSummary = DistributionSummary.builder(METRIC_PUBLISH_BATCH_SIZE) + .tags(tags) + .description("Number of entries per SNS PublishBatch request") + .register(compositeMeterRegistry); + + Gauge.builder(METRIC_PUBLISH_INFLIGHT, inflightGauge, AtomicInteger::get) + .tags(tags) + .description("PublishBatches currently in progress") + .register(compositeMeterRegistry); + } + + /** + * Returns (creating if necessary) a failure counter tagged with the given error code and type. + * + * @param errorCode the error code for the failure tag + * @param errorType the error type for the failure tag + * @return the failure counter + */ + protected Counter failureCounter(final String errorCode, final String errorType) { + return Counter.builder(METRIC_PUBLISH_FAILURE) + .description("Individual SNS messages that failed to be published") + .tags(tags.and(TAG_ERROR_CODE, errorCode) + .and(TAG_ERROR_TYPE, errorType)) + .register(registry); + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + delegate.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture await() { + return delegate.await(); + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java index ab1a9a6..a948a69 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java @@ -1,10 +1,7 @@ package com.amazon.sns.messaging.lib.metrics; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -12,7 +9,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; @@ -25,34 +22,49 @@ */ public class BlockingQueueMetricsDecorator implements BlockingQueue { + /** Base tag prefix for blocking queue metrics. */ private static final String TAG_QUEUE = "blocking.queue"; + /** Metric name for total put operations. */ private static final String METRIC_PUTS_TOTAL = TAG_QUEUE.concat(".puts.total"); + /** Metric name for failed put operations. */ private static final String METRIC_PUTS_FAILED = TAG_QUEUE.concat(".puts.failed"); + /** Metric name for put operation latency. */ private static final String METRIC_PUT_DURATION = TAG_QUEUE.concat(".put.duration"); + /** Metric name for total take operations. */ private static final String METRIC_TAKES_TOTAL = TAG_QUEUE.concat(".takes.total"); + /** Metric name for failed take operations. */ private static final String METRIC_TAKES_FAILED = TAG_QUEUE.concat(".takes.failed"); + /** Metric name for take operation latency. */ private static final String METRIC_TAKE_DURATION = TAG_QUEUE.concat(".take.duration"); + /** Metric name for queue size gauge. */ private static final String METRIC_SIZE = TAG_QUEUE.concat(".size"); + /** The decorated blocking queue. */ private final BlockingQueue delegate; + /** Counter for successful put operations. */ private final Counter putsTotal; + /** Counter for put operations that threw an exception. */ private final Counter putsFailed; + /** Timer for put operation latency. */ private final Timer putDuration; + /** Counter for successful take operations. */ private final Counter takesTotal; + /** Counter for take operations that threw an exception. */ private final Counter takesFailed; + /** Timer for take operation latency. */ private final Timer takeDuration; /** @@ -73,7 +85,7 @@ public BlockingQueueMetricsDecorator( Optional.ofNullable(registry).ifPresent(compositeMeterRegistry::add); - final List tags = new LinkedList<>(Collections.singleton(Tag.of("name", queueName))); + final Tags tags = Tags.of("name", queueName); putsTotal = Counter.builder(METRIC_PUTS_TOTAL) .description("Total number of successful put operations") diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java index 83849b5..0decc2c 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java @@ -1,8 +1,6 @@ package com.amazon.sns.messaging.lib.metrics; import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; @@ -16,7 +14,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.Timer; import io.micrometer.core.instrument.composite.CompositeMeterRegistry; @@ -28,24 +26,34 @@ */ public class ExecutorServiceMetricsDecorator implements ExecutorService { + /** Base tag prefix for executor metrics. */ private static final String TAG_EXECUTOR = "executor"; + /** Metric name for active task count gauge. */ private static final String METRIC_ACTIVE = TAG_EXECUTOR.concat(".active"); + /** Metric name for succeeded task counter. */ private static final String METRIC_TASKS_SUCCEEDED = TAG_EXECUTOR.concat(".tasks.succeeded"); + /** Metric name for failed task counter. */ private static final String METRIC_TASKS_FAILED = TAG_EXECUTOR.concat(".tasks.failed"); + /** Metric name for task duration timer. */ private static final String METRIC_TASK_DURATION = TAG_EXECUTOR.concat(".task.duration"); + /** The decorated executor service. */ private final ExecutorService delegate; + /** Atomic gauge tracking the number of active tasks. */ private final AtomicInteger activeTaskCount = new AtomicInteger(); + /** Counter for tasks that completed without throwing an exception. */ private final Counter succeededCounter; + /** Counter for tasks that completed by throwing an exception. */ private final Counter failedCounter; + /** Timer for wall-clock duration of task execution. */ private final Timer taskTimer; /** @@ -66,7 +74,7 @@ public ExecutorServiceMetricsDecorator( Optional.ofNullable(registry).ifPresent(compositeMeterRegistry::add); - final List tags = new LinkedList<>(Collections.singleton(Tag.of("name", executorName))); + final Tags tags = Tags.of("name", executorName); Gauge.builder(METRIC_ACTIVE, activeTaskCount, AtomicInteger::get) .tags(tags) diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java index d186ab9..74516e3 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java @@ -54,10 +54,13 @@ public static Builder builder() { @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public static class Builder { + /** The supplier function that creates a publish request from a topic ARN and entries. */ private BiFunction, R> supplier; + /** The SNS topic ARN to publish to. */ private String topicArn; + /** The list of entries to include in the publish request. */ private List entries; /** diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java index afd6e30..0944284 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumerTest.java @@ -557,7 +557,7 @@ static class TestableAmazonSnsConsumer extends AbstractAmazonSnsConsumer consumerMock; - private AbstractAmazonSnsTemplate template; + private AbstractAmazonSnsTemplate template; @BeforeEach void setUp() { - template = new AbstractAmazonSnsTemplate(producerMock, consumerMock) { }; + template = new AbstractAmazonSnsTemplate(producerMock, consumerMock) { }; } @Test diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java similarity index 90% rename from amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java rename to amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java index f689af7..4f2c24e 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java @@ -50,9 +50,9 @@ * @param the request entry payload type */ @SuppressWarnings("java:S6204") -class AmazonSnsConsumer extends AbstractAmazonSnsConsumer { +class AmazonSnsConsumerImpl extends AbstractAmazonSnsConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumerImpl.class); private static final MessageAttributes messageAttributes = new MessageAttributes(); @@ -67,7 +67,7 @@ class AmazonSnsConsumer extends AbstractAmazonSnsConsumer, PublishBatchRequest> su .withSubject(StringUtils.isNotBlank(entry.getSubject()) ? entry.getSubject() : null) .withMessageGroupId(StringUtils.isNotBlank(entry.getGroupId()) ? entry.getGroupId() : null) .withMessageDeduplicationId(StringUtils.isNotBlank(entry.getDeduplicationId()) ? entry.getDeduplicationId() : null) - .withMessageAttributes(AmazonSnsConsumer.messageAttributes.messageAttributes(entry.getMessageHeaders())) + .withMessageAttributes(AmazonSnsConsumerImpl.messageAttributes.messageAttributes(entry.getMessageHeaders())) .withMessage(entry.getMessage())) .collect(Collectors.toList()); return new PublishBatchRequest().withPublishBatchRequestEntries(entries).withTopicArn(topicArn); @@ -109,11 +109,11 @@ protected BiFunction, PublishBatchRequest> su * {@inheritDoc} */ @Override - protected void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { + public void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { final String code = throwable instanceof AmazonServiceException ? AmazonServiceException.class.cast(throwable).getErrorCode() : "000"; final String message = throwable instanceof AmazonServiceException ? AmazonServiceException.class.cast(throwable).getErrorMessage() : throwable.getMessage(); - AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); + AmazonSnsConsumerImpl.LOGGER.error(throwable.getMessage(), throwable); publishBatchRequest.getPublishBatchRequestEntries().forEach(entry -> Optional.ofNullable(pendingRequests.remove(entry.getId())).ifPresent(listenableFuture -> @@ -131,7 +131,7 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final * {@inheritDoc} */ @Override - protected void handleResponse(final PublishBatchResult publishBatchResult) { + public void handleResponse(final PublishBatchResult publishBatchResult) { publishBatchResult.getSuccessful().forEach(entry -> Optional.ofNullable(pendingRequests.remove(entry.getId())).ifPresent(listenableFuture -> listenableFuture.success(ResponseSuccessEntry.builder() diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java similarity index 91% rename from amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java rename to amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java index fdcab93..9c25f96 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java @@ -30,7 +30,7 @@ * * @param the request entry payload type */ -class AmazonSnsProducer extends AbstractAmazonSnsProducer { +class AmazonSnsProducerImpl extends AbstractAmazonSnsProducer { /** * Creates a new v1 producer. @@ -38,7 +38,7 @@ class AmazonSnsProducer extends AbstractAmazonSnsProducer { * @param pendingRequests the shared map of pending requests keyed by request ID * @param topicRequests the shared blocking queue for topic requests */ - public AmazonSnsProducer( + public AmazonSnsProducerImpl( final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests) { super(pendingRequests, topicRequests); diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index 8127504..d4c9e0f 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -20,6 +20,7 @@ import java.util.function.UnaryOperator; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; +import com.amazon.sns.messaging.lib.metrics.AmazonSnsConsumerMetricsDecorator; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.amazonaws.services.sns.AmazonSNS; @@ -34,22 +35,26 @@ * * @param the request entry payload type */ -public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { +public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { private AmazonSnsTemplate(final Builder> builder) { super( - new AmazonSnsProducer<>( + new AmazonSnsProducerImpl<>( builder.getPendingRequests(), builder.getTopicRequests() ), - new AmazonSnsConsumer<>( - builder.getAmazonSnsClient(), + new AmazonSnsConsumerMetricsDecorator( + new AmazonSnsConsumerImpl<>( + builder.getAmazonSnsClient(), + builder.getTopicProperty(), + builder.getObjectMapper(), + builder.getPendingRequests(), + builder.getTopicRequests(), + getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), + builder.getPublishDecorator() + ), builder.getTopicProperty(), - builder.getObjectMapper(), - builder.getPendingRequests(), - builder.getTopicRequests(), - getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), - builder.getPublishDecorator() + builder.getMeterRegistry() ) ); } diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java new file mode 100644 index 0000000..ab75eed --- /dev/null +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java @@ -0,0 +1,95 @@ +package com.amazon.sns.messaging.lib.metrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; +import com.amazon.sns.messaging.lib.model.TopicProperty; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.sns.model.PublishBatchRequest; +import com.amazonaws.services.sns.model.PublishBatchResult; + +import io.micrometer.core.instrument.MeterRegistry; + +// @formatter:off +/** + * AWS SDK v1 metrics decorator for {@link AmazonSnsConsumer}. Records publish attempt/success/failure + * counters, latency, batch size, and inflight gauges using Micrometer. Handles + * {@link AmazonServiceException} error codes for failure tagging. + */ +public class AmazonSnsConsumerMetricsDecorator extends AbstractAmazonSnsConsumerMetricsDecorator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumerMetricsDecorator.class); + + /** + * Creates a new v1 SNS consumer metrics decorator. + * + * @param delegate the consumer to decorate + * @param topicProperty the topic configuration + * @param meterRegistry the Micrometer meter registry + */ + public AmazonSnsConsumerMetricsDecorator( + final AmazonSnsConsumer delegate, + final TopicProperty topicProperty, + final MeterRegistry meterRegistry) { + super(delegate, topicProperty, meterRegistry); + } + + /** + * {@inheritDoc} + */ + @Override + public PublishBatchResult publish(final PublishBatchRequest publishBatchRequest) { + publishAttemptsCounter.increment(); + batchSizeSummary.record(publishBatchRequest.getPublishBatchRequestEntries().size()); + inflightGauge.incrementAndGet(); + + try { + return publishTimer.recordCallable(() -> delegate.publish(publishBatchRequest)); + } catch (final RuntimeException ex) { + throw ex; + } catch (final Exception ex) { + throw new RuntimeException(ex); + } finally { + inflightGauge.decrementAndGet(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void handleResponse(final PublishBatchResult publishBatchResult) { + delegate.handleResponse(publishBatchResult); + + final int successCount = publishBatchResult.getSuccessful().size(); + final int failureCount = publishBatchResult.getFailed().size(); + + if (successCount > 0) { + successCounter.increment(successCount); + } + + publishBatchResult.getFailed().forEach(entry -> failureCounter(entry.getCode(), ERROR_TYPE_AMAZON).increment()); + + if (failureCount > 0) { + LOGGER.warn("SNS batch partially failed: {} succeeded, {} failed", successCount, failureCount); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { + delegate.handleError(publishBatchRequest, throwable); + + final String errorCode = throwable instanceof AmazonServiceException ? AmazonServiceException.class.cast(throwable).getErrorCode() : "000"; + + final String errorType = throwable instanceof AmazonServiceException ? ERROR_TYPE_AMAZON : ERROR_TYPE_OTHER; + + final int failedEntries = publishBatchRequest.getPublishBatchRequestEntries().size(); + + failureCounter(errorCode, errorType).increment(failedEntries); + } + +} diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java new file mode 100644 index 0000000..e03b07c --- /dev/null +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java @@ -0,0 +1,462 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; +import com.amazon.sns.messaging.lib.model.TopicProperty; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.sns.model.BatchResultErrorEntry; +import com.amazonaws.services.sns.model.PublishBatchRequest; +import com.amazonaws.services.sns.model.PublishBatchRequestEntry; +import com.amazonaws.services.sns.model.PublishBatchResult; +import com.amazonaws.services.sns.model.PublishBatchResultEntry; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +// @formatter:off +@ExtendWith(MockitoExtension.class) +class AmazonSnsConsumerMetricsDecoratorTest { + + private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:my-topic"; + + @Mock + private AmazonSnsConsumer delegate; + + @Mock + private TopicProperty topicProperty; + + private MeterRegistry registry; + + private AmazonSnsConsumerMetricsDecorator sut; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + when(topicProperty.getTopicArn()).thenReturn(TOPIC_ARN); + sut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, registry); + } + + @Nested + @DisplayName("publish()") + class Publish { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchRequest request = batchRequest(2); + when(delegate.publish(request)).thenReturn(successResult("id-1", "id-2")); + + sut.publish(request); + + verify(delegate, times(1)).publish(request); + } + + @Test + @DisplayName("should increment attempt counter on success") + void shouldIncrementAttemptCounterOnSuccess() { + when(delegate.publish(any())).thenReturn(successResult("id-1")); + + sut.publish(batchRequest(1)); + + assertThat(attemptsCount()).isEqualTo(1.0); + } + + @Test + @DisplayName("should increment attempt counter even when delegate throws") + void shouldIncrementAttemptCounterOnException() { + when(delegate.publish(any())).thenThrow(new RuntimeException("connection refused")); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + + assertThat(attemptsCount()).isEqualTo(1.0); + } + + @Test + @DisplayName("should record batch size in distribution summary") + void shouldRecordBatchSize() { + when(delegate.publish(any())).thenReturn(successResult("a", "b", "c")); + + sut.publish(batchRequest(3)); + + assertThat(batchSizeCount()).isEqualTo(1L); + assertThat(batchSizeMean()).isEqualTo(3.0); + } + + @Test + @DisplayName("should record duration in timer") + void shouldRecordDurationInTimer() { + when(delegate.publish(any())).thenReturn(successResult("x")); + + sut.publish(batchRequest(1)); + + assertThat(timerCount()).isEqualTo(1L); + } + + @Test + @DisplayName("should decrement inflight gauge after successful publish") + void shouldDecrementInflightAfterSuccess() { + when(delegate.publish(any())).thenReturn(successResult("y")); + + sut.publish(batchRequest(1)); + + assertThat(inflightValue()).isZero(); + } + + @Test + @DisplayName("should decrement inflight gauge even when delegate throws") + void shouldDecrementInflightAfterException() { + when(delegate.publish(any())).thenThrow(new RuntimeException("timeout")); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + + assertThat(inflightValue()).isZero(); + } + + @Test + @DisplayName("should propagate RuntimeException from delegate unchanged") + void shouldPropagateRuntimeException() { + final RuntimeException cause = new RuntimeException("sns down"); + when(delegate.publish(any())).thenThrow(cause); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isSameAs(cause); + } + + @Test + @DisplayName("should wrap checked Exception from delegate in RuntimeException") + void shouldWrapCheckedExceptionInRuntimeException() { + when(delegate.publish(any())).thenAnswer(inv -> { + throw new Exception("checked"); + }); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class).hasMessageContaining("checked"); + } + } + + @Nested + @DisplayName("handleResponse()") + class HandleResponse { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchResult result = successResult("id-1"); + + sut.handleResponse(result); + + verify(delegate, times(1)).handleResponse(result); + } + + @Test + @DisplayName("should increment success counter once per successful message") + void shouldIncrementSuccessCounterPerMessage() { + final PublishBatchResult result = successResult("id-1", "id-2", "id-3"); + + sut.handleResponse(result); + + assertThat(successCount()).isEqualTo(3.0); + } + + @Test + @DisplayName("should not increment success counter when there are no successful entries") + void shouldNotIncrementSuccessCounterWhenEmpty() { + final PublishBatchResult result = new PublishBatchResult(); + result.setSuccessful(Collections.emptyList()); + result.setFailed(Collections.singleton(failedEntry("id-1", "InvalidParameter"))); + + sut.handleResponse(result); + + assertThat(successCount()).isZero(); + } + + @Test + @DisplayName("should increment failure counter once per failed message") + void shouldIncrementFailureCounterPerFailedMessage() { + final PublishBatchResult result = new PublishBatchResult(); + result.setSuccessful(Collections.emptyList()); + result.setFailed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "MessageTooLong"))); + + sut.handleResponse(result); + + assertThat(failureCountByCode("InvalidParameter")).isEqualTo(1.0); + assertThat(failureCountByCode("MessageTooLong")).isEqualTo(1.0); + } + + @Test + @DisplayName("should accumulate failures with the same error code") + void shouldAccumulateFailuresWithSameErrorCode() { + final PublishBatchResult result = new PublishBatchResult(); + result.setSuccessful(Collections.emptyList()); + result.setFailed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "InvalidParameter"), failedEntry("id-c", "InvalidParameter"))); + + sut.handleResponse(result); + + assertThat(failureCountByCode("InvalidParameter")).isEqualTo(3.0); + } + + @Test + @DisplayName("should handle mixed batch with both successes and failures") + void shouldHandleMixedBatch() { + final PublishBatchResult result = new PublishBatchResult(); + result.setSuccessful(Arrays.asList(successEntry("id-ok-1"), successEntry("id-ok-2"))); + result.setFailed(Collections.singleton(failedEntry("id-bad", "KMSDisabled"))); + + sut.handleResponse(result); + + assertThat(successCount()).isEqualTo(2.0); + assertThat(failureCountByCode("KMSDisabled")).isEqualTo(1.0); + } + + @Test + @DisplayName("should tag failures with error_type 'amazon_service_exception'") + void shouldTagFailuresWithAmazonErrorType() { + final PublishBatchResult result = new PublishBatchResult(); + result.setSuccessful(Collections.emptyList()); + result.setFailed(Collections.singleton(failedEntry("id-x", "ThrottledException"))); + + sut.handleResponse(result); + + final Counter failureCounter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "ThrottledException") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); + + assertThat(failureCounter).isNotNull(); + assertThat(failureCounter.count()).isEqualTo(1.0); + } + } + + @Nested + @DisplayName("handleError()") + class HandleError { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchRequest request = batchRequest(1); + final RuntimeException cause = new RuntimeException("transport error"); + + sut.handleError(request, cause); + + verify(delegate, times(1)).handleError(request, cause); + } + + @Test + @DisplayName("should count all batch entries as failures on AmazonServiceException") + void shouldCountAllEntriesAsFailures_onAmazonServiceException() { + final PublishBatchRequest request = batchRequest(3); + final AmazonServiceException cause = serviceException("InternalError"); + + sut.handleError(request, cause); + + assertThat(failureCountByCode("InternalError")).isEqualTo(3.0); + } + + @Test + @DisplayName("should tag AmazonServiceException failures with correct error_type") + void shouldTagAmazonServiceExceptionWithCorrectErrorType() { + final PublishBatchRequest request = batchRequest(1); + final AmazonServiceException cause = serviceException("InternalError"); + + sut.handleError(request, cause); + + final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "InternalError") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); + + assertThat(counter).isNotNull(); + assertThat(counter.count()).isEqualTo(1.0); + } + + @Test + @DisplayName("should use error code '000' for non-AmazonServiceException") + void shouldUseDefaultErrorCode_forGenericException() { + final PublishBatchRequest request = batchRequest(2); + + sut.handleError(request, new RuntimeException("network timeout")); + + assertThat(failureCountByCode("000")).isEqualTo(2.0); + } + + @Test + @DisplayName("should tag generic exceptions with error_type 'unknown'") + void shouldTagGenericExceptionsWithUnknownErrorType() { + final PublishBatchRequest request = batchRequest(1); + + sut.handleError(request, new RuntimeException("network timeout")); + + final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "000") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter(); + + assertThat(counter).isNotNull(); + assertThat(counter.count()).isEqualTo(1.0); + } + + @Test + @DisplayName("should count each entry individually when batch has multiple entries") + void shouldCountEachEntryIndividually() { + final PublishBatchRequest request = batchRequest(5); + final AmazonServiceException cause = serviceException("ServiceUnavailable"); + + sut.handleError(request, cause); + + assertThat(failureCountByCode("ServiceUnavailable")).isEqualTo(5.0); + } + } + + @Nested + @DisplayName("lifecycle methods") + class Lifecycle { + + @Test + @DisplayName("shutdown() should delegate to the wrapped consumer") + void shutdown_shouldDelegate() { + sut.shutdown(); + verify(delegate, times(1)).shutdown(); + } + + @Test + @DisplayName("await() should delegate to the wrapped consumer") + void await_shouldDelegate() { + sut.await(); + verify(delegate, times(1)).await(); + } + } + + @Nested + @DisplayName("null MeterRegistry") + class NullRegistry { + + @Test + @DisplayName("should not throw when MeterRegistry is null") + void shouldNotThrowWhenRegistryIsNull() { + final AmazonSnsConsumerMetricsDecorator nullRegistrySut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); + + when(delegate.publish(any())).thenReturn(successResult("id-1")); + + // should execute without NullPointerException + nullRegistrySut.publish(batchRequest(1)); + } + } + + private double attemptsCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); + } + + private double successCount() { + final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter(); + + return c == null ? 0.0 : c.count(); + } + + private double failureCountByCode(final String code) { + final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag("error_code", code) + .counter(); + + return c == null ? 0.0 : c.count(); + } + + private long batchSizeCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .count(); + } + + private double batchSizeMean() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .mean(); + } + + private long timerCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer() + .count(); + } + + private double inflightValue() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); + } + + private static PublishBatchRequest batchRequest(final int count) { + final PublishBatchRequest request = new PublishBatchRequest().withTopicArn(TOPIC_ARN); + for (int i = 0; i < count; i++) { + request.getPublishBatchRequestEntries().add(new PublishBatchRequestEntry().withId("id-" + i).withMessage("msg-" + i)); + } + return request; + } + + private static PublishBatchResult successResult(final String... ids) { + final PublishBatchResult result = new PublishBatchResult(); + for (final String id : ids) { + result.getSuccessful().add(successEntry(id)); + } + result.setFailed(Collections.emptyList()); + return result; + } + + private static PublishBatchResultEntry successEntry(final String id) { + return new PublishBatchResultEntry().withId(id).withMessageId("msg-" + id); + } + + private static BatchResultErrorEntry failedEntry(final String id, final String code) { + return new BatchResultErrorEntry().withId(id).withCode(code).withMessage("error detail").withSenderFault(true); + } + + private static AmazonServiceException serviceException(final String code) { + final AmazonServiceException ex = new AmazonServiceException("Service error"); + ex.setErrorCode(code); + return ex; + } + +} +// @formatter:on \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java similarity index 90% rename from amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java rename to amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java index ffbd720..c88efb7 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumerImpl.java @@ -51,9 +51,9 @@ * @param the request entry payload type */ @SuppressWarnings("java:S6204") -class AmazonSnsConsumer extends AbstractAmazonSnsConsumer { +class AmazonSnsConsumerImpl extends AbstractAmazonSnsConsumer { - private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumer.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumerImpl.class); private static final MessageAttributes messageAttributes = new MessageAttributes(); @@ -68,7 +68,7 @@ class AmazonSnsConsumer extends AbstractAmazonSnsConsumer, PublishBatchRequest> su .subject(StringUtils.isNotBlank(entry.getSubject()) ? entry.getSubject() : null) .messageGroupId(StringUtils.isNotBlank(entry.getGroupId()) ? entry.getGroupId() : null) .messageDeduplicationId(StringUtils.isNotBlank(entry.getDeduplicationId()) ? entry.getDeduplicationId() : null) - .messageAttributes(AmazonSnsConsumer.messageAttributes.messageAttributes(entry.getMessageHeaders())) + .messageAttributes(AmazonSnsConsumerImpl.messageAttributes.messageAttributes(entry.getMessageHeaders())) .message(entry.getMessage()) .build()) .collect(Collectors.toList()); @@ -111,11 +111,11 @@ protected BiFunction, PublishBatchRequest> su * {@inheritDoc} */ @Override - protected void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { + public void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { final String code = throwable instanceof AwsServiceException ? AwsServiceException.class.cast(throwable).awsErrorDetails().errorCode() : "000"; final String message = throwable instanceof AwsServiceException ? AwsServiceException.class.cast(throwable).awsErrorDetails().errorMessage() : throwable.getMessage(); - AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); + AmazonSnsConsumerImpl.LOGGER.error(throwable.getMessage(), throwable); publishBatchRequest.publishBatchRequestEntries().forEach(entry -> Optional.ofNullable(pendingRequests.remove(entry.id())).ifPresent(listenableFuture -> @@ -133,7 +133,7 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final * {@inheritDoc} */ @Override - protected void handleResponse(final PublishBatchResponse publishBatchResult) { + public void handleResponse(final PublishBatchResponse publishBatchResult) { publishBatchResult.successful().forEach(entry -> Optional.ofNullable(pendingRequests.remove(entry.id())).ifPresent(listenableFuture -> listenableFuture.success(ResponseSuccessEntry.builder() diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java similarity index 91% rename from amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java rename to amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java index 3b5d1dc..8b06b39 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerImpl.java @@ -30,7 +30,7 @@ * * @param the request entry payload type */ -class AmazonSnsProducer extends AbstractAmazonSnsProducer { +class AmazonSnsProducerImpl extends AbstractAmazonSnsProducer { /** * Creates a new v2 producer. @@ -38,7 +38,7 @@ class AmazonSnsProducer extends AbstractAmazonSnsProducer { * @param pendingRequests the shared map of pending requests keyed by request ID * @param topicRequests the shared blocking queue for topic requests */ - public AmazonSnsProducer( + public AmazonSnsProducerImpl( final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests) { super(pendingRequests, topicRequests); diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index d3f84fd..d9403ae 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -20,6 +20,7 @@ import java.util.function.UnaryOperator; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; +import com.amazon.sns.messaging.lib.metrics.AmazonSnsConsumerMetricsDecorator; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -35,22 +36,26 @@ * * @param the request entry payload type */ -public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { +public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { private AmazonSnsTemplate(final Builder> builder) { super( - new AmazonSnsProducer<>( + new AmazonSnsProducerImpl<>( builder.getPendingRequests(), builder.getTopicRequests() ), - new AmazonSnsConsumer<>( - builder.getAmazonSnsClient(), + new AmazonSnsConsumerMetricsDecorator( + new AmazonSnsConsumerImpl<>( + builder.getAmazonSnsClient(), + builder.getTopicProperty(), + builder.getObjectMapper(), + builder.getPendingRequests(), + builder.getTopicRequests(), + getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), + builder.getPublishDecorator() + ), builder.getTopicProperty(), - builder.getObjectMapper(), - builder.getPendingRequests(), - builder.getTopicRequests(), - getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), - builder.getPublishDecorator() + builder.getMeterRegistry() ) ); } diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java new file mode 100644 index 0000000..775a53a --- /dev/null +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecorator.java @@ -0,0 +1,95 @@ +package com.amazon.sns.messaging.lib.metrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; +import com.amazon.sns.messaging.lib.model.TopicProperty; + +import io.micrometer.core.instrument.MeterRegistry; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sns.model.PublishBatchRequest; +import software.amazon.awssdk.services.sns.model.PublishBatchResponse; + +// @formatter:off +/** + * AWS SDK v2 metrics decorator for {@link AmazonSnsConsumer}. Records publish attempt/success/failure + * counters, latency, batch size, and inflight gauges using Micrometer. Handles + * {@link AwsServiceException} error codes for failure tagging. + */ +public class AmazonSnsConsumerMetricsDecorator extends AbstractAmazonSnsConsumerMetricsDecorator { + + private static final Logger LOGGER = LoggerFactory.getLogger(AmazonSnsConsumerMetricsDecorator.class); + + /** + * Creates a new v2 SNS consumer metrics decorator. + * + * @param delegate the consumer to decorate + * @param topicProperty the topic configuration + * @param meterRegistry the Micrometer meter registry + */ + public AmazonSnsConsumerMetricsDecorator( + final AmazonSnsConsumer delegate, + final TopicProperty topicProperty, + final MeterRegistry meterRegistry) { + super(delegate, topicProperty, meterRegistry); + } + + /** + * {@inheritDoc} + */ + @Override + public PublishBatchResponse publish(final PublishBatchRequest publishBatchRequest) { + publishAttemptsCounter.increment(); + batchSizeSummary.record(publishBatchRequest.publishBatchRequestEntries().size()); + inflightGauge.incrementAndGet(); + + try { + return publishTimer.recordCallable(() -> delegate.publish(publishBatchRequest)); + } catch (final RuntimeException ex) { + throw ex; + } catch (final Exception ex) { + throw new RuntimeException(ex); + } finally { + inflightGauge.decrementAndGet(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void handleResponse(final PublishBatchResponse publishBatchResult) { + delegate.handleResponse(publishBatchResult); + + final int successCount = publishBatchResult.successful().size(); + final int failureCount = publishBatchResult.failed().size(); + + if (successCount > 0) { + successCounter.increment(successCount); + } + + publishBatchResult.failed().forEach(entry -> failureCounter(entry.code(), ERROR_TYPE_AMAZON).increment()); + + if (failureCount > 0) { + LOGGER.warn("SNS batch partially failed: {} succeeded, {} failed", successCount, failureCount); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void handleError(final PublishBatchRequest publishBatchRequest, final Throwable throwable) { + delegate.handleError(publishBatchRequest, throwable); + + final String errorCode = throwable instanceof AwsServiceException ? AwsServiceException.class.cast(throwable).awsErrorDetails().errorCode() : "000"; + + final String errorType = throwable instanceof AwsServiceException ? ERROR_TYPE_AMAZON : ERROR_TYPE_OTHER; + + final int failedEntries = publishBatchRequest.publishBatchRequestEntries().size(); + + failureCounter(errorCode, errorType).increment(failedEntries); + } + +} diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java new file mode 100644 index 0000000..179ef13 --- /dev/null +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/metrics/AmazonSnsConsumerMetricsDecoratorTest.java @@ -0,0 +1,478 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.metrics; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.amazon.sns.messaging.lib.core.AmazonSnsConsumer; +import com.amazon.sns.messaging.lib.model.TopicProperty; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sns.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sns.model.PublishBatchRequest; +import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry; +import software.amazon.awssdk.services.sns.model.PublishBatchResponse; +import software.amazon.awssdk.services.sns.model.PublishBatchResultEntry; + +// @formatter:off +@ExtendWith(MockitoExtension.class) +class AmazonSnsConsumerMetricsDecoratorTest { + + private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:000000000000:my-topic"; + + @Mock + private AmazonSnsConsumer delegate; + + @Mock + private TopicProperty topicProperty; + + private MeterRegistry registry; + + private AmazonSnsConsumerMetricsDecorator sut; + + @BeforeEach + void setUp() { + registry = new SimpleMeterRegistry(); + when(topicProperty.getTopicArn()).thenReturn(TOPIC_ARN); + sut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, registry); + } + + @Nested + @DisplayName("publish()") + class Publish { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchRequest request = batchRequest(2); + when(delegate.publish(request)).thenReturn(successResult("id-1", "id-2")); + + sut.publish(request); + + verify(delegate, times(1)).publish(request); + } + + @Test + @DisplayName("should increment attempt counter on success") + void shouldIncrementAttemptCounterOnSuccess() { + when(delegate.publish(any())).thenReturn(successResult("id-1")); + + sut.publish(batchRequest(1)); + + assertThat(attemptsCount()).isEqualTo(1.0); + } + + @Test + @DisplayName("should increment attempt counter even when delegate throws") + void shouldIncrementAttemptCounterOnException() { + when(delegate.publish(any())).thenThrow(new RuntimeException("connection refused")); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + + assertThat(attemptsCount()).isEqualTo(1.0); + } + + @Test + @DisplayName("should record batch size in distribution summary") + void shouldRecordBatchSize() { + when(delegate.publish(any())).thenReturn(successResult("a", "b", "c")); + + sut.publish(batchRequest(3)); + + assertThat(batchSizeCount()).isEqualTo(1L); + assertThat(batchSizeMean()).isEqualTo(3.0); + } + + @Test + @DisplayName("should record duration in timer") + void shouldRecordDurationInTimer() { + when(delegate.publish(any())).thenReturn(successResult("x")); + + sut.publish(batchRequest(1)); + + assertThat(timerCount()).isEqualTo(1L); + } + + @Test + @DisplayName("should decrement inflight gauge after successful publish") + void shouldDecrementInflightAfterSuccess() { + when(delegate.publish(any())).thenReturn(successResult("y")); + + sut.publish(batchRequest(1)); + + assertThat(inflightValue()).isZero(); + } + + @Test + @DisplayName("should decrement inflight gauge even when delegate throws") + void shouldDecrementInflightAfterException() { + when(delegate.publish(any())).thenThrow(new RuntimeException("timeout")); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class); + + assertThat(inflightValue()).isZero(); + } + + @Test + @DisplayName("should propagate RuntimeException from delegate unchanged") + void shouldPropagateRuntimeException() { + final RuntimeException cause = new RuntimeException("sns down"); + when(delegate.publish(any())).thenThrow(cause); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isSameAs(cause); + } + + @Test + @DisplayName("should wrap checked Exception from delegate in RuntimeException") + void shouldWrapCheckedExceptionInRuntimeException() { + when(delegate.publish(any())).thenAnswer(inv -> { + throw new Exception("checked"); + }); + + assertThatThrownBy(() -> sut.publish(batchRequest(1))).isInstanceOf(RuntimeException.class).hasMessageContaining("checked"); + } + } + + @Nested + @DisplayName("handleResponse()") + class HandleResponse { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchResponse result = successResult("id-1"); + + sut.handleResponse(result); + + verify(delegate, times(1)).handleResponse(result); + } + + @Test + @DisplayName("should increment success counter once per successful message") + void shouldIncrementSuccessCounterPerMessage() { + final PublishBatchResponse result = successResult("id-1", "id-2", "id-3"); + + sut.handleResponse(result); + + assertThat(successCount()).isEqualTo(3.0); + } + + @Test + @DisplayName("should not increment success counter when there are no successful entries") + void shouldNotIncrementSuccessCounterWhenEmpty() { + final PublishBatchResponse result = PublishBatchResponse.builder() + .successful(Collections.emptyList()) + .failed(Collections.singleton(failedEntry("id-1", "InvalidParameter"))) + .build(); + + sut.handleResponse(result); + + assertThat(successCount()).isZero(); + } + + @Test + @DisplayName("should increment failure counter once per failed message") + void shouldIncrementFailureCounterPerFailedMessage() { + final PublishBatchResponse result = PublishBatchResponse.builder() + .successful(Collections.emptyList()) + .failed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "MessageTooLong"))) + .build(); + + sut.handleResponse(result); + + assertThat(failureCountByCode("InvalidParameter")).isEqualTo(1.0); + assertThat(failureCountByCode("MessageTooLong")).isEqualTo(1.0); + } + + @Test + @DisplayName("should accumulate failures with the same error code") + void shouldAccumulateFailuresWithSameErrorCode() { + final PublishBatchResponse result = PublishBatchResponse.builder() + .successful(Collections.emptyList()) + .failed(Arrays.asList(failedEntry("id-a", "InvalidParameter"), failedEntry("id-b", "InvalidParameter"), failedEntry("id-c", "InvalidParameter"))) + .build(); + + sut.handleResponse(result); + + assertThat(failureCountByCode("InvalidParameter")).isEqualTo(3.0); + } + + @Test + @DisplayName("should handle mixed batch with both successes and failures") + void shouldHandleMixedBatch() { + final PublishBatchResponse result = PublishBatchResponse.builder() + .successful(Arrays.asList(successEntry("id-ok-1"), successEntry("id-ok-2"))) + .failed(Collections.singleton(failedEntry("id-bad", "KMSDisabled"))) + .build(); + + sut.handleResponse(result); + + assertThat(successCount()).isEqualTo(2.0); + assertThat(failureCountByCode("KMSDisabled")).isEqualTo(1.0); + } + + @Test + @DisplayName("should tag failures with error_type 'amazon_service_exception'") + void shouldTagFailuresWithAmazonErrorType() { + final PublishBatchResponse result = PublishBatchResponse.builder() + .successful(Collections.emptyList()) + .failed(Collections.singleton(failedEntry("id-x", "ThrottledException"))) + .build(); + + sut.handleResponse(result); + + final Counter failureCounter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "ThrottledException") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); + + assertThat(failureCounter).isNotNull(); + assertThat(failureCounter.count()).isEqualTo(1.0); + } + } + + @Nested + @DisplayName("handleError()") + class HandleError { + + @Test + @DisplayName("should delegate to the wrapped consumer") + void shouldDelegateToWrappedConsumer() { + final PublishBatchRequest request = batchRequest(1); + final RuntimeException cause = new RuntimeException("transport error"); + + sut.handleError(request, cause); + + verify(delegate, times(1)).handleError(request, cause); + } + + @Test + @DisplayName("should count all batch entries as failures on AwsServiceException") + void shouldCountAllEntriesAsFailures_onAwsServiceException() { + final PublishBatchRequest request = batchRequest(3); + final AwsServiceException cause = serviceException("InternalError"); + + sut.handleError(request, cause); + + assertThat(failureCountByCode("InternalError")).isEqualTo(3.0); + } + + @Test + @DisplayName("should tag AwsServiceException failures with correct error_type") + void shouldTagAwsServiceExceptionWithCorrectErrorType() { + final PublishBatchRequest request = batchRequest(1); + final AwsServiceException cause = serviceException("InternalError"); + + sut.handleError(request, cause); + + final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "InternalError") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_AMAZON) + .counter(); + + assertThat(counter).isNotNull(); + assertThat(counter.count()).isEqualTo(1.0); + } + + @Test + @DisplayName("should use error code '000' for non-AwsServiceException") + void shouldUseDefaultErrorCode_forGenericException() { + final PublishBatchRequest request = batchRequest(2); + + sut.handleError(request, new RuntimeException("network timeout")); + + assertThat(failureCountByCode("000")).isEqualTo(2.0); + } + + @Test + @DisplayName("should tag generic exceptions with error_type 'unknown'") + void shouldTagGenericExceptionsWithUnknownErrorType() { + final PublishBatchRequest request = batchRequest(1); + + sut.handleError(request, new RuntimeException("network timeout")); + + final Counter counter = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("error_code", "000") + .tag("error_type", AbstractAmazonSnsConsumerMetricsDecorator.ERROR_TYPE_OTHER) + .counter(); + + assertThat(counter).isNotNull(); + assertThat(counter.count()).isEqualTo(1.0); + } + + @Test + @DisplayName("should count each entry individually when batch has multiple entries") + void shouldCountEachEntryIndividually() { + final PublishBatchRequest request = batchRequest(5); + final AwsServiceException cause = serviceException("ServiceUnavailable"); + + sut.handleError(request, cause); + + assertThat(failureCountByCode("ServiceUnavailable")).isEqualTo(5.0); + } + } + + @Nested + @DisplayName("lifecycle methods") + class Lifecycle { + + @Test + @DisplayName("shutdown() should delegate to the wrapped consumer") + void shutdown_shouldDelegate() { + sut.shutdown(); + verify(delegate, times(1)).shutdown(); + } + + @Test + @DisplayName("await() should delegate to the wrapped consumer") + void await_shouldDelegate() { + sut.await(); + verify(delegate, times(1)).await(); + } + } + + @Nested + @DisplayName("null MeterRegistry") + class NullRegistry { + + @Test + @DisplayName("should not throw when MeterRegistry is null") + void shouldNotThrowWhenRegistryIsNull() { + final AmazonSnsConsumerMetricsDecorator nullRegistrySut = new AmazonSnsConsumerMetricsDecorator(delegate, topicProperty, null); + + when(delegate.publish(any())).thenReturn(successResult("id-1")); + + // should execute without NullPointerException + nullRegistrySut.publish(batchRequest(1)); + } + } + + private double attemptsCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_ATTEMPTS) + .tag("topic", TOPIC_ARN) + .counter() + .count(); + } + + private double successCount() { + final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_SUCCESS) + .tag("topic", TOPIC_ARN) + .counter(); + + return c == null ? 0.0 : c.count(); + } + + private double failureCountByCode(final String code) { + final Counter c = registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_FAILURE) + .tag("topic", TOPIC_ARN) + .tag("error_code", code) + .counter(); + + return c == null ? 0.0 : c.count(); + } + + private long batchSizeCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .count(); + } + + private double batchSizeMean() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_BATCH_SIZE) + .tag("topic", TOPIC_ARN) + .summary() + .mean(); + } + + private long timerCount() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_DURATION) + .tag("topic", TOPIC_ARN) + .timer() + .count(); + } + + private double inflightValue() { + return registry.find(AbstractAmazonSnsConsumerMetricsDecorator.METRIC_PUBLISH_INFLIGHT) + .tag("topic", TOPIC_ARN) + .gauge() + .value(); + } + + private static PublishBatchRequest batchRequest(final int count) { + final List entries = new LinkedList<>(); + + IntStream.range(0, count).forEach(i -> { + entries.add(PublishBatchRequestEntry.builder().id("id-" + i).message("msg-" + i).build()); + }); + + return PublishBatchRequest.builder().topicArn(TOPIC_ARN).publishBatchRequestEntries(entries).build(); + } + + private static PublishBatchResponse successResult(final String... ids) { + + final List successEntries = new LinkedList<>(); + + for (final String id : ids) { + successEntries.add(successEntry(id)); + } + + return PublishBatchResponse.builder().successful(successEntries).failed(Collections.emptyList()).build(); + } + + private static PublishBatchResultEntry successEntry(final String id) { + return PublishBatchResultEntry.builder().id(id).messageId("msg-" + id).build(); + } + + private static BatchResultErrorEntry failedEntry(final String id, final String code) { + return BatchResultErrorEntry.builder().id(id).code(code).message("error detail").senderFault(true).build(); + } + + private static AwsServiceException serviceException(final String code) { + return AwsServiceException.builder() + .message("Service error") + .awsErrorDetails(AwsErrorDetails.builder() + .errorCode(code) + .build() + ).build(); + } + +} +// @formatter:on \ No newline at end of file From a2b2b408073fb79bf15c1ba25492559abdaea1e3 Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Wed, 3 Jun 2026 20:30:34 -0300 Subject: [PATCH 2/2] feat: add decorator metrics Signed-off-by: Marcos Tischer Vallim --- amazon-sns-java-messaging-lib-v1/src/test/resources/logback.xml | 2 +- amazon-sns-java-messaging-lib-v2/src/test/resources/logback.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-sns-java-messaging-lib-v1/src/test/resources/logback.xml b/amazon-sns-java-messaging-lib-v1/src/test/resources/logback.xml index cbec016..5ea5380 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/resources/logback.xml +++ b/amazon-sns-java-messaging-lib-v1/src/test/resources/logback.xml @@ -10,6 +10,6 @@ - + \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-v2/src/test/resources/logback.xml b/amazon-sns-java-messaging-lib-v2/src/test/resources/logback.xml index cbec016..5ea5380 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/resources/logback.xml +++ b/amazon-sns-java-messaging-lib-v2/src/test/resources/logback.xml @@ -10,6 +10,6 @@ - + \ No newline at end of file