diff --git a/.github/workflows/ci-maven.yml b/.github/workflows/ci-maven.yml index d1c2ef3..d1572e6 100644 --- a/.github/workflows/ci-maven.yml +++ b/.github/workflows/ci-maven.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java-version: [8, 11, 17, 21] + java-version: [8, 11, 17, 21, 25] steps: - name: Checkout repository uses: actions/checkout@v6 @@ -34,7 +34,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - java-version: [8, 11, 17, 21] + java-version: [8, 11, 17, 21, 25] steps: - name: Checkout repository uses: actions/checkout@v6 diff --git a/README.md b/README.md index 75ac036..350b9d7 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network. Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html) -_**Compatible JDK 8, 11, 17 and 21**_ +_**Compatible JDK 8, 11, 17, 21 and 25**_ _**Compatible AWS JDK v1 >= 1.12**_ diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java index c20a0a9..8f1a220 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutor.java @@ -16,96 +16,26 @@ package com.amazon.sns.messaging.lib.concurrent; -import java.util.Objects; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +// @formatter:off /** - * A custom {@link ThreadPoolExecutor} that tracks active, failed, and succeeded task counts. - * Uses a blocking submission policy to handle rejection and a synchronous queue for task handoff. + * A {@link ThreadPoolExecutor} configured for Amazon SNS publishing. Uses a + * {@link SynchronousQueue} with zero core threads, allowing threads to be created + * on demand up to the specified maximum pool size. Tasks that cannot be accepted + * immediately by the queue will block up to 30 seconds via {@link BlockingSubmissionPolicy}. */ public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor { - private final AtomicInteger activeTaskCount = new AtomicInteger(); - - private final AtomicInteger failedTaskCount = new AtomicInteger(); - - private final AtomicInteger succeededTaskCount = new AtomicInteger(); - /** - * Creates a new executor with the specified maximum pool size. + * Creates a new thread pool executor with the given maximum pool size. * - * @param maximumPoolSize the maximum number of threads to allow in the pool + * @param maximumPoolSize the maximum number of threads allowed in the pool */ public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) { super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000)); } - /** - * Returns the number of currently active tasks. - * - * @return the active task count - */ - public int getActiveTaskCount() { - return activeTaskCount.get(); - } - - /** - * Returns the number of tasks that have failed. - * - * @return the failed task count - */ - public int getFailedTaskCount() { - return failedTaskCount.get(); - } - - /** - * Returns the number of tasks that have completed successfully. - * - * @return the succeeded task count - */ - public int getSucceededTaskCount() { - return succeededTaskCount.get(); - } - - /** - * Returns the current size of the task queue. - * - * @return the queue size - */ - public int getQueueSize() { - return getQueue().size(); - } - - /** - * {@inheritDoc} - */ - @Override - protected void beforeExecute(final Thread thread, final Runnable runnable) { - try { - super.beforeExecute(thread, runnable); - } finally { - activeTaskCount.incrementAndGet(); - } - } - - /** - * {@inheritDoc} - */ - @Override - protected void afterExecute(final Runnable runnable, final Throwable throwable) { - try { - super.afterExecute(runnable, throwable); - } finally { - if (Objects.nonNull(throwable)) { - failedTaskCount.incrementAndGet(); - } else { - succeededTaskCount.incrementAndGet(); - } - activeTaskCount.decrementAndGet(); - } - } - } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java deleted file mode 100644 index ba0b92f..0000000 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/ExecutorsProvider.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright 2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sns.messaging.lib.concurrent; - -import java.lang.reflect.Method; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Supplier; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.AccessLevel; -import lombok.NoArgsConstructor; -import lombok.SneakyThrows; - -/** - * Provides {@link ExecutorService} instances, selecting between virtual thread executors - * (Java 21+) and default single-thread executors based on the runtime Java version. - */ -@NoArgsConstructor(access = AccessLevel.PRIVATE) -public final class ExecutorsProvider { - - private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorsProvider.class); - - private static Supplier supplierExecutorService; - - static { - if (ExecutorsProvider.getJavaVersion() >= 21) { - ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getVirtualExecutorService; - ExecutorsProvider.LOGGER.info("Java version is {}, using virtual thread executor", ExecutorsProvider.getJavaVersion()); - } else { - ExecutorsProvider.supplierExecutorService = ExecutorsProvider::getDefaultExecutorService; - ExecutorsProvider.LOGGER.info("Java version is {}, using default thread executor", ExecutorsProvider.getJavaVersion()); - } - } - - /** - * Returns an {@link ExecutorService} appropriate for the current Java version. - * - * @return a virtual thread executor (Java 21+) or a single-thread executor - */ - public static ExecutorService getExecutorService() { - return ExecutorsProvider.supplierExecutorService.get(); - } - - /** - * Creates a single-thread executor for Java versions below 21. - * - * @return a single-thread executor - */ - @SneakyThrows - private static ExecutorService getDefaultExecutorService() { - return Executors.newSingleThreadExecutor(); - } - - /** - * Creates a virtual thread executor using reflection (Java 21+). - * - * @return a virtual thread per task executor - */ - @SneakyThrows - private static ExecutorService getVirtualExecutorService() { - final Class clazzThread = Executors.class; - final Method ofVirtualMethod = clazzThread.getMethod("newVirtualThreadPerTaskExecutor"); - return ExecutorService.class.cast(ofVirtualMethod.invoke(null)); - } - - /** - * Parses the Java runtime version. - * - * @return the major Java version number - */ - private static int getJavaVersion() { - String version = System.getProperty("java.version"); - - if (version.startsWith("1.")) { - version = version.substring(2); - } - - final int dotPos = version.indexOf('.'); - final int dashPos = version.indexOf('-'); - final int endIndex = dotPos > -1 ? dotPos : dashPos > -1 ? dashPos : 1; - - return Integer.parseInt(version.substring(0, endIndex)); - } - -} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java index ace27d3..07f2984 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java @@ -283,6 +283,11 @@ public int drainTo(final Collection collection, final int maxElements throw new UnsupportedOperationException(); } + /** + * Internal entry wrapper that holds a value within the ring buffer. + * + * @param the type of the value + */ @Getter @Setter static class Entry { diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index 6baf27f..58fc259 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -65,8 +65,14 @@ */ abstract class AbstractAmazonSnsConsumer implements Runnable { + /** + * Kilobyte constant used for size calculations. + */ private static final Integer KB = 1024; + /** + * Maximum batch size threshold of 256 KB imposed by Amazon SNS. + */ private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB; private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class); diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java index 3a28dc7..e3eb708 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java @@ -16,14 +16,9 @@ package com.amazon.sns.messaging.lib.core; -import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicReference; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; @@ -44,14 +39,12 @@ @RequiredArgsConstructor(access = AccessLevel.PROTECTED) abstract class AbstractAmazonSnsProducer { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class); + private final AtomicReference state = new AtomicReference<>(State.RUNNIG); private final ConcurrentMap> pendingRequests; private final BlockingQueue> topicRequests; - private final ExecutorService executorService; - /** * Sends a request entry by enqueuing it for batch processing. * @@ -60,23 +53,29 @@ abstract class AbstractAmazonSnsProducer { */ @SneakyThrows public ListenableFuture send(final RequestEntry requestEntry) { - return enqueueRequest(requestEntry); + if (State.RUNNIG.equals(state.get())) { + return enqueueRequest(requestEntry); + } else { + final ListenableFutureImpl listenableFutureImpl = new ListenableFutureImpl(); + + listenableFutureImpl.fail(ResponseFailEntry.builder() + .withCode("000") + .withId(requestEntry.getId()) + .withMessage(String.format("Producer is currently in %s mode; no further messages will be accepted.", state.get().name())) + .withSenderFault(true) + .build() + ); + + return listenableFutureImpl; + } } /** - * Shuts down the producer's executor service gracefully, waiting up to 60 seconds - * for termination. + * Transitions the producer to the shutdown state. No further messages will be + * accepted once shutdown. */ - @SneakyThrows public void shutdown() { - LOGGER.warn("Shutdown producer {}", getClass().getSimpleName()); - - executorService.shutdown(); - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - LOGGER.warn("Executor service did not terminate in the specified time."); - final List droppedTasks = executorService.shutdownNow(); - LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size()); - } + state.compareAndSet(State.RUNNIG, State.SHUTDOWN); } /** @@ -94,5 +93,12 @@ private ListenableFuture enqueueRequest return trackPendingRequest; } + /** + * Lifecycle states of the producer. + */ + enum State { + RUNNIG, SHUTDOWN + } + } // @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java index 06833fe..075a18d 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplate.java @@ -16,15 +16,29 @@ package com.amazon.sns.messaging.lib.core; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import java.util.function.UnaryOperator; import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor; +import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; +import com.amazon.sns.messaging.lib.metrics.BlockingQueueMetricsDecorator; +import com.amazon.sns.messaging.lib.metrics.ExecutorServiceMetricsDecorator; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import lombok.AccessLevel; +import lombok.Getter; import lombok.RequiredArgsConstructor; // @formatter:off @@ -79,8 +93,148 @@ public CompletableFuture await() { * @param topicProperty the topic configuration * @return a configured thread pool executor */ - protected static AmazonSnsThreadPoolExecutor getAmazonSnsThreadPoolExecutor(final TopicProperty topicProperty) { - return topicProperty.isFifo() ? new AmazonSnsThreadPoolExecutor(1) : new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize()); + protected static ExecutorService getExecutorService(final TopicProperty topicProperty, final MeterRegistry meterRegistry) { + return topicProperty.isFifo() + ? new ExecutorServiceMetricsDecorator( + new AmazonSnsThreadPoolExecutor(1), + meterRegistry, + topicProperty.getTopicArn() + ) + : new ExecutorServiceMetricsDecorator( + new AmazonSnsThreadPoolExecutor(topicProperty.getMaximumPoolSize()), + meterRegistry, + topicProperty.getTopicArn() + ); + } + + @Getter + public static final class Builder> { + + /** + * The Amazon SNS client used for publishing. + */ + private final C amazonSnsClient; + + /** + * The topic configuration properties. + */ + private final TopicProperty topicProperty; + + /** + * Map of pending requests tracked by request ID for asynchronous completion. + */ + private ConcurrentMap> pendingRequests = new ConcurrentHashMap<>(); + + /** + * The blocking queue for buffering topic requests before batching. + */ + private BlockingQueue> topicRequests; + + /** + * The Jackson ObjectMapper for serializing payloads. + */ + private ObjectMapper objectMapper = new ObjectMapper(); + + /** + * Decorator function applied to the publish batch request before sending. + */ + private UnaryOperator publishDecorator = UnaryOperator.identity(); + + /** + * The Micrometer meter registry for collecting metrics. + */ + private MeterRegistry meterRegistry = new SimpleMeterRegistry(); + + /** + * Internal constructor reference used to create the template instance. + */ + private final Function, T> constructor; + + /** + * Creates a new builder with the required constructor reference, client, and topic. + * + * @param constructor the constructor function for creating the template instance + * @param amazonSnsClient the Amazon SNS client + * @param topicProperty the topic configuration properties + */ + Builder(final Function, T> constructor, final C amazonSnsClient, final TopicProperty topicProperty) { + this.amazonSnsClient = Objects.requireNonNull(amazonSnsClient, "amazonSnsClient"); + this.topicProperty = Objects.requireNonNull(topicProperty, "topicProperty"); + this.constructor = Objects.requireNonNull(constructor, "constructor"); + } + + /** + * Sets the map of pending requests. + * + * @param pendingRequests the concurrent map keyed by request ID + * @return this builder + */ + public Builder pendingRequests(final ConcurrentMap> pendingRequests) { + this.pendingRequests = Objects.requireNonNull(pendingRequests, "pendingRequests"); + return this; + } + + /** + * Sets the blocking queue for topic requests. + * + * @param topicRequests the blocking queue for topic requests + * @return this builder + */ + public Builder topicRequests(final BlockingQueue> topicRequests) { + this.topicRequests = Objects.requireNonNull(topicRequests, "topicRequests"); + return this; + } + + /** + * Sets the Jackson ObjectMapper for serializing payloads. + * + * @param objectMapper the Jackson ObjectMapper + * @return this builder + */ + public Builder objectMapper(final ObjectMapper objectMapper) { + this.objectMapper = Objects.requireNonNull(objectMapper, "objectMapper"); + return this; + } + + /** + * Sets the decorator for the publish batch request. + * + * @param publishDecorator the unary operator to apply before publishing + * @return this builder + */ + public Builder publishDecorator(final UnaryOperator publishDecorator) { + this.publishDecorator = Objects.requireNonNull(publishDecorator, "publishDecorator"); + return this; + } + + /** + * Sets the Micrometer meter registry. + * + * @param meterRegistry the meter registry for metrics + * @return this builder + */ + public Builder meterRegistry(final MeterRegistry meterRegistry) { + this.meterRegistry = Objects.requireNonNull(meterRegistry, "meterRegistry"); + return this; + } + + /** + * Builds the template instance. If no topic requests queue was provided, a default + * {@link RingBufferBlockingQueue} is created. The queue is then decorated with + * {@link BlockingQueueMetricsDecorator}. + * + * @return the constructed template instance + */ + public T build() { + if (Objects.isNull(topicRequests)) { + topicRequests = new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()); + } + + topicRequests = new BlockingQueueMetricsDecorator<>(topicRequests, meterRegistry, topicProperty.getTopicArn()); + + return constructor.apply(this); + } + } } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java index 00a6d5b..90368ab 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractMessageAttributes.java @@ -33,12 +33,24 @@ @SuppressWarnings("java:S6204") abstract class AbstractMessageAttributes { + /** + * Data type constant for binary message attributes. + */ protected static final String BINARY = "Binary"; + /** + * Data type constant for string message attributes. + */ protected static final String STRING = "String"; + /** + * Data type constant for number message attributes. + */ protected static final String NUMBER = "Number"; + /** + * Data type constant for string array message attributes. + */ protected static final String STRING_ARRAY = "String.Array"; /** diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java index c4bf16d..ec3dc90 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/RequestEntryInternalFactory.java @@ -107,6 +107,8 @@ public Integer messageAttributesSize(final RequestEntry requestEntry) { /** * Internal representation of a batched request entry with a serialized payload. + * + * @param the payload type (unused here, payload is serialized to bytes) */ @Getter @ToString @@ -153,28 +155,47 @@ public String getMessage() { @NoArgsConstructor(access = AccessLevel.PRIVATE) static class MessageAttributesInternal extends AbstractMessageAttributes { + /** + * Singleton instance of the internal message attributes calculator. + */ + public static final MessageAttributesInternal INSTANCE = new MessageAttributesInternal(); + /** + * {@inheritDoc} + */ @Override public Integer getEnumMessageAttribute(final Enum value) { return value.name().length(); } + /** + * {@inheritDoc} + */ @Override public Integer getStringMessageAttribute(final String value) { return value.length(); } + /** + * {@inheritDoc} + */ @Override public Integer getNumberMessageAttribute(final Number value) { return value.toString().length(); } + /** + * {@inheritDoc} + */ @Override public Integer getBinaryMessageAttribute(final ByteBuffer value) { return value.remaining(); } + /** + * {@inheritDoc} + */ @Override public Integer getStringArrayMessageAttribute(final List values) { return stringArray(values).length(); diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java new file mode 100644 index 0000000..ab1a9a6 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecorator.java @@ -0,0 +1,335 @@ +package com.amazon.sns.messaging.lib.metrics; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; + +// @formatter:off +/** + * A decorator around {@link BlockingQueue} that collects Micrometer metrics for + * put/take operations, including counters, latency histograms, and queue size gauges. + * + * @param the type of elements held in the queue + */ +public class BlockingQueueMetricsDecorator implements BlockingQueue { + + private static final String TAG_QUEUE = "blocking.queue"; + + private static final String METRIC_PUTS_TOTAL = TAG_QUEUE.concat(".puts.total"); + + private static final String METRIC_PUTS_FAILED = TAG_QUEUE.concat(".puts.failed"); + + private static final String METRIC_PUT_DURATION = TAG_QUEUE.concat(".put.duration"); + + private static final String METRIC_TAKES_TOTAL = TAG_QUEUE.concat(".takes.total"); + + private static final String METRIC_TAKES_FAILED = TAG_QUEUE.concat(".takes.failed"); + + private static final String METRIC_TAKE_DURATION = TAG_QUEUE.concat(".take.duration"); + + private static final String METRIC_SIZE = TAG_QUEUE.concat(".size"); + + private final BlockingQueue delegate; + + private final Counter putsTotal; + + private final Counter putsFailed; + + private final Timer putDuration; + + private final Counter takesTotal; + + private final Counter takesFailed; + + private final Timer takeDuration; + + /** + * Creates a new metrics decorator for the given blocking queue. + * + * @param delegate the blocking queue to decorate + * @param registry the Micrometer meter registry + * @param queueName the name tag for the metrics + */ + public BlockingQueueMetricsDecorator( + final BlockingQueue delegate, + final MeterRegistry registry, + final String queueName) { + + this.delegate = delegate; + + final CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry(); + + Optional.ofNullable(registry).ifPresent(compositeMeterRegistry::add); + + final List tags = new LinkedList<>(Collections.singleton(Tag.of("name", queueName))); + + putsTotal = Counter.builder(METRIC_PUTS_TOTAL) + .description("Total number of successful put operations") + .tags(tags) + .register(compositeMeterRegistry); + + putsFailed = Counter.builder(METRIC_PUTS_FAILED) + .description("Total number of put operations that threw an exception") + .tags(tags) + .register(compositeMeterRegistry); + + takesTotal = Counter.builder(METRIC_TAKES_TOTAL) + .description("Total number of successful take operations") + .tags(tags) + .register(compositeMeterRegistry); + + takesFailed = Counter.builder(METRIC_TAKES_FAILED) + .description("Total number of take operations that threw an exception") + .tags(tags) + .register(compositeMeterRegistry); + + putDuration = Timer.builder(METRIC_PUT_DURATION) + .description("Latency of put operations (including wait time when queue is full)") + .tags(tags) + .publishPercentileHistogram() + .register(compositeMeterRegistry); + + takeDuration = Timer.builder(METRIC_TAKE_DURATION) + .description("Latency of take operations (including wait time when queue is empty)") + .tags(tags) + .publishPercentileHistogram() + .register(compositeMeterRegistry); + + Gauge.builder(METRIC_SIZE, this.delegate, BlockingQueue::size) + .description("Current number of elements in the queue") + .tags(tags) + .register(compositeMeterRegistry); + } + + /** + * {@inheritDoc} + */ + @Override + public void put(final E element) throws InterruptedException { + final Timer.Sample sample = Timer.start(); + try { + delegate.put(element); + putsTotal.increment(); + } catch (final Exception ex) { + putsFailed.increment(); + throw ex; + } finally { + sample.stop(putDuration); + } + } + + /** + * {@inheritDoc} + */ + @Override + public E take() throws InterruptedException { + final Timer.Sample sample = Timer.start(); + try { + final E value = delegate.take(); + takesTotal.increment(); + return value; + } catch (final Exception ex) { + takesFailed.increment(); + throw ex; + } finally { + sample.stop(takeDuration); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int size() { + return delegate.size(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + /** + * {@inheritDoc} + */ + @Override + public E peek() { + return delegate.peek(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean offer(final E element) { + return delegate.offer(element); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean offer(final E element, final long timeout, final TimeUnit unit) throws InterruptedException { + return delegate.offer(element, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public E poll() { + return delegate.poll(); + } + + /** + * {@inheritDoc} + */ + @Override + public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { + return delegate.poll(timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean add(final E element) { + return delegate.add(element); + } + + /** + * {@inheritDoc} + */ + @Override + public int remainingCapacity() { + return delegate.remainingCapacity(); + } + + /** + * {@inheritDoc} + */ + @Override + public int drainTo(final Collection collection) { + return delegate.drainTo(collection); + } + + /** + * {@inheritDoc} + */ + @Override + public int drainTo(final Collection collection, final int maxElements) { + return delegate.drainTo(collection, maxElements); + } + + /** + * {@inheritDoc} + */ + @Override + public E remove() { + return delegate.remove(); + } + + /** + * {@inheritDoc} + */ + @Override + public E element() { + return delegate.element(); + } + + /** + * {@inheritDoc} + */ + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + /** + * {@inheritDoc} + */ + @Override + public T[] toArray(final T[] a) { + return delegate.toArray(a); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean containsAll(final Collection c) { + return delegate.containsAll(c); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean addAll(final Collection c) { + return delegate.addAll(c); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean removeAll(final Collection c) { + return delegate.removeAll(c); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean retainAll(final Collection c) { + return delegate.retainAll(c); + } + + /** + * {@inheritDoc} + */ + @Override + public void clear() { + delegate.clear(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean remove(final Object o) { + return delegate.remove(o); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean contains(final Object o) { + return delegate.contains(o); + } + +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java new file mode 100644 index 0000000..83849b5 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecorator.java @@ -0,0 +1,217 @@ +package com.amazon.sns.messaging.lib.metrics; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; + +// @formatter:off +/** + * A decorator around {@link ExecutorService} that collects Micrometer metrics for + * task execution, including active task count, succeeded/failed counters, and + * task duration histograms. + */ +public class ExecutorServiceMetricsDecorator implements ExecutorService { + + private static final String TAG_EXECUTOR = "executor"; + + private static final String METRIC_ACTIVE = TAG_EXECUTOR.concat(".active"); + + private static final String METRIC_TASKS_SUCCEEDED = TAG_EXECUTOR.concat(".tasks.succeeded"); + + private static final String METRIC_TASKS_FAILED = TAG_EXECUTOR.concat(".tasks.failed"); + + private static final String METRIC_TASK_DURATION = TAG_EXECUTOR.concat(".task.duration"); + + private final ExecutorService delegate; + + private final AtomicInteger activeTaskCount = new AtomicInteger(); + + private final Counter succeededCounter; + + private final Counter failedCounter; + + private final Timer taskTimer; + + /** + * Creates a new metrics decorator for the given executor service. + * + * @param delegate the executor service to decorate + * @param registry the Micrometer meter registry + * @param executorName the name tag for the metrics + */ + public ExecutorServiceMetricsDecorator( + final ExecutorService delegate, + final MeterRegistry registry, + final String executorName) { + + this.delegate = delegate; + + final CompositeMeterRegistry compositeMeterRegistry = new CompositeMeterRegistry(); + + Optional.ofNullable(registry).ifPresent(compositeMeterRegistry::add); + + final List tags = new LinkedList<>(Collections.singleton(Tag.of("name", executorName))); + + Gauge.builder(METRIC_ACTIVE, activeTaskCount, AtomicInteger::get) + .tags(tags) + .description("Number of tasks currently being executed by pool threads") + .register(compositeMeterRegistry); + + succeededCounter = Counter.builder(METRIC_TASKS_SUCCEEDED) + .tags(tags) + .description("Total number of tasks that completed without throwing an exception") + .register(compositeMeterRegistry); + + failedCounter = Counter.builder(METRIC_TASKS_FAILED) + .tags(tags) + .description("Total number of tasks that completed by throwing an exception") + .register(compositeMeterRegistry); + + taskTimer = Timer.builder(METRIC_TASK_DURATION) + .tags(tags) + .description("Wall-clock duration of each task execution, from beforeExecute to afterExecute") + .register(compositeMeterRegistry); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(final Runnable task) { + return delegate.submit(task); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(final Runnable task, final T result) { + return delegate.submit(task, result); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(final Callable task) { + return delegate.submit(task); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(final Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(final Collection> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(final Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(final Collection> tasks, final long timeout, final TimeUnit unit) throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + delegate.shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public List shutdownNow() { + return delegate.shutdownNow(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + /** + * Delegates execution to the decorated executor, wrapping the command with + * metrics tracking for active count, success/failure counters, and duration. + * + * @param command the task to execute + */ + @Override + public void execute(final Runnable command) { + delegate.execute(() -> { + activeTaskCount.incrementAndGet(); + + final Timer.Sample sample = Timer.start(); + + try { + command.run(); + + succeededCounter.increment(); + } + catch (final Exception ex) { + failedCounter.increment(); + throw ex; + } + finally { + sample.stop(taskTimer); + activeTaskCount.decrementAndGet(); + } + }); + } + +} diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java index c33a658..d186ab9 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/model/PublishRequestBuilder.java @@ -33,6 +33,13 @@ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class PublishRequestBuilder { + /** + * Creates a new builder instance. + * + * @param the publish request type + * @param the entry type + * @return a new builder + */ public static Builder builder() { return new Builder<>(); } diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java index 7c56e39..3eb0ab3 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/AmazonSnsThreadPoolExecutorTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2023 the original author or authors. + * Copyright 2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,110 +16,136 @@ package com.amazon.sns.messaging.lib.concurrent; -import static org.assertj.core.api.Assertions.catchThrowableOfType; -import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; -// @formatter:off class AmazonSnsThreadPoolExecutorTest { @Test - void testSuccessCounters() { - final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = new AmazonSnsThreadPoolExecutor(10); - - assertThat(amazonSnsThreadPoolExecutor.getActiveTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getFailedTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getCorePoolSize(), is(0)); + void testConstructorCreatesInstance() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor, is(notNullValue())); + executor.shutdownNow(); } @Test - void testSuccessSucceededTaskCount() throws InterruptedException { - final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = new AmazonSnsThreadPoolExecutor(10); - - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); - - for(int i = 0; i < 300; i++) { - amazonSnsThreadPoolExecutor.execute(() -> { - await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); - }); - } - - amazonSnsThreadPoolExecutor.shutdown(); - - if (!amazonSnsThreadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - amazonSnsThreadPoolExecutor.shutdownNow(); - } - - assertThat(amazonSnsThreadPoolExecutor.getActiveTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(300)); - assertThat(amazonSnsThreadPoolExecutor.getFailedTaskCount(), is(0)); + void testExtendsThreadPoolExecutor() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor, is(instanceOf(ThreadPoolExecutor.class))); + executor.shutdownNow(); } @Test - void testSuccessFailedTaskCount() throws InterruptedException { - final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = new AmazonSnsThreadPoolExecutor(10); - - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); + void testCorePoolSizeIsZero() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getCorePoolSize(), is(equalTo(0))); + executor.shutdownNow(); + } - for(int i = 0; i < 300; i++) { - amazonSnsThreadPoolExecutor.execute(() -> { throw new RuntimeException(); }); - } + @Test + void testMaximumPoolSizeMatchesConstructorArgument() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(8); + assertThat(executor.getMaximumPoolSize(), is(equalTo(8))); + executor.shutdownNow(); + } - amazonSnsThreadPoolExecutor.shutdown(); + @Test + void testMaximumPoolSizeOfOne() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(1); + assertThat(executor.getMaximumPoolSize(), is(equalTo(1))); + executor.shutdownNow(); + } - if (!amazonSnsThreadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - amazonSnsThreadPoolExecutor.shutdownNow(); - } + @Test + void testKeepAliveTimeIsSetTo60Seconds() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getKeepAliveTime(TimeUnit.SECONDS), is(equalTo(60L))); + executor.shutdownNow(); + } - assertThat(amazonSnsThreadPoolExecutor.getActiveTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getFailedTaskCount(), is(300)); + @Test + void testQueueIsSynchronousQueue() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getQueue(), is(instanceOf(SynchronousQueue.class))); + executor.shutdownNow(); } @Test - void testSuccessActiveTaskCount() throws InterruptedException { - final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = new AmazonSnsThreadPoolExecutor(10); + void testRejectedExecutionHandlerIsBlockingSubmissionPolicy() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getRejectedExecutionHandler(), is(instanceOf(BlockingSubmissionPolicy.class))); + executor.shutdownNow(); + } - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); + @Test + void testThreadFactoryIsNotNull() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getThreadFactory(), is(notNullValue())); + executor.shutdownNow(); + } - for(int i = 0; i < 10; i++) { - amazonSnsThreadPoolExecutor.execute(() -> { - while(true) { - await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); - } - }); - } + @Test + void testIsNotShutdownAfterCreation() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.isShutdown(), is(false)); + executor.shutdownNow(); + } - amazonSnsThreadPoolExecutor.shutdown(); + @Test + void testIsShutdownAfterShutdownNow() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + executor.shutdownNow(); + assertThat(executor.isShutdown(), is(true)); + } - if (!amazonSnsThreadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS)) { - amazonSnsThreadPoolExecutor.shutdownNow(); - } + @Test + void testActiveCountIsZeroAfterCreation() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getActiveCount(), is(equalTo(0))); + executor.shutdownNow(); + } - assertThat(amazonSnsThreadPoolExecutor.getActiveTaskCount(), is(10)); - assertThat(amazonSnsThreadPoolExecutor.getSucceededTaskCount(), is(0)); - assertThat(amazonSnsThreadPoolExecutor.getFailedTaskCount(), is(0)); + @Test + void testTaskCountIsZeroAfterCreation() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getTaskCount(), is(equalTo(0L))); + executor.shutdownNow(); } @Test - void testSuccessBlockingSubmissionPolicy() { - final AmazonSnsThreadPoolExecutor amazonSnsThreadPoolExecutor = new AmazonSnsThreadPoolExecutor(1); + void testCompletedTaskCountIsZeroAfterCreation() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getCompletedTaskCount(), is(equalTo(0L))); + executor.shutdownNow(); + } - amazonSnsThreadPoolExecutor.execute(() -> { - while(true) { - await().pollDelay(1, TimeUnit.MILLISECONDS).until(() -> true); - } - }); + @Test + void testQueueIsEmptyAfterCreation() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getQueue().isEmpty(), is(true)); + executor.shutdownNow(); + } - catchThrowableOfType(() -> amazonSnsThreadPoolExecutor.execute(() -> { }), RejectedExecutionException.class); + @Test + void testPoolSizeIsZeroBeforeAnyTask() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getPoolSize(), is(equalTo(0))); + executor.shutdownNow(); } -} -// @formatter:on + @Test + void testLargestPoolSizeIsZeroBeforeAnyTask() { + final AmazonSnsThreadPoolExecutor executor = new AmazonSnsThreadPoolExecutor(4); + assertThat(executor.getLargestPoolSize(), is(equalTo(0))); + executor.shutdownNow(); + } +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java new file mode 100644 index 0000000..bdfe763 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/BlockingSubmissionPolicyTest.java @@ -0,0 +1,168 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.concurrent; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mock.Strictness; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class BlockingSubmissionPolicyTest { + + @Mock(strictness = Strictness.LENIENT) + private ThreadPoolExecutor executorMock; + + @Mock(strictness = Strictness.LENIENT) + private BlockingQueue queueMock; + + private BlockingSubmissionPolicy policy; + + @BeforeEach + void setUp() { + policy = new BlockingSubmissionPolicy(30000); + when(executorMock.getQueue()).thenReturn(queueMock); + } + + @Test + void testConstructorCreatesInstance() { + assertThat(new BlockingSubmissionPolicy(1000), is(notNullValue())); + } + + @Test + void testImplementsRejectedExecutionHandler() { + assertThat(policy, is(instanceOf(RejectedExecutionHandler.class))); + } + + @Test + void testRejectedExecutionOffersRunnableToQueue() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + policy.rejectedExecution(task, executorMock); + + verify(queueMock).offer(eq(task), eq(30000L), eq(TimeUnit.MILLISECONDS)); + } + + @Test + void testRejectedExecutionUsesConfiguredTimeout() throws InterruptedException { + final long customTimeout = 5000L; + final BlockingSubmissionPolicy customPolicy = new BlockingSubmissionPolicy(customTimeout); + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + customPolicy.rejectedExecution(task, executorMock); + + verify(queueMock).offer(eq(task), eq(customTimeout), eq(TimeUnit.MILLISECONDS)); + } + + @Test + void testRejectedExecutionUsesMillisecondsTimeUnit() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + policy.rejectedExecution(task, executorMock); + + verify(queueMock).offer(any(), anyLong(), eq(TimeUnit.MILLISECONDS)); + } + + @Test + void testRejectedExecutionSucceedsWhenQueueAcceptsTask() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + policy.rejectedExecution(task, executorMock); + + verify(executorMock).getQueue(); + } + + @Test + void testRejectedExecutionThrowsRejectedExecutionExceptionWhenQueueReturnsFalse() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(false); + + assertThrows(RejectedExecutionException.class, () -> policy.rejectedExecution(task, executorMock)); + } + + @Test + void testRejectedExecutionExceptionMessageIsTimeout() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(false); + + final RejectedExecutionException thrown = assertThrows(RejectedExecutionException.class, () -> policy.rejectedExecution(task, executorMock)); + + assertThat(thrown.getMessage(), is("Timeout")); + } + + @Test + void testRejectedExecutionPropagatesInterruptedException() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> policy.rejectedExecution(task, executorMock)); + } + + @Test + void testRejectedExecutionRetrievesQueueFromExecutor() throws InterruptedException { + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + policy.rejectedExecution(task, executorMock); + + verify(executorMock).getQueue(); + } + + @Test + void testRejectedExecutionWithZeroTimeoutThrowsWhenQueueFull() throws InterruptedException { + final BlockingSubmissionPolicy zeroTimeoutPolicy = new BlockingSubmissionPolicy(0); + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(false); + + assertThrows(RejectedExecutionException.class, () -> zeroTimeoutPolicy.rejectedExecution(task, executorMock)); + } + + @Test + void testRejectedExecutionWithZeroTimeoutSucceedsWhenQueueAccepts() throws InterruptedException { + final BlockingSubmissionPolicy zeroTimeoutPolicy = new BlockingSubmissionPolicy(0); + final Runnable task = mock(Runnable.class); + when(queueMock.offer(any(), anyLong(), any())).thenReturn(true); + + zeroTimeoutPolicy.rejectedExecution(task, executorMock); + + verify(queueMock).offer(eq(task), eq(0L), eq(TimeUnit.MILLISECONDS)); + } +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index c686473..0a0764c 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.spy; import java.util.Collections; import java.util.LinkedList; @@ -38,164 +39,162 @@ @SuppressWarnings({ "java:S2925", "java:S5778" }) class RingBufferBlockingQueueTest { - + @Test void testSuccess() { - final ExecutorService producer = ExecutorsProvider.getExecutorService(); - + final ExecutorService producer = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory()); - + final List> requestEntriesOut = new LinkedList<>(); - - final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); - + + final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(5120); + producer.submit(() -> { IntStream.range(0, 100_000).forEach(value -> { ringBlockingQueue.put(RequestEntry.builder().withValue(value).build()); }); }); - + consumer.scheduleAtFixedRate(() -> { while (!ringBlockingQueue.isEmpty()) { final List> requestEntries = new LinkedList<>(); - + while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) { - final RequestEntry take = ringBlockingQueue.take(); - requestEntries.add(take); + requestEntries.add(ringBlockingQueue.take()); } - + requestEntriesOut.addAll(requestEntries); } }, 0, 100L, TimeUnit.MILLISECONDS); - + await().pollInterval(5, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).until(() -> { return (ringBlockingQueue.writeSequence() == 99_999) && (ringBlockingQueue.readSequence() == 100_000); }); - - producer.shutdown(); - consumer.shutdown(); - - assertThat(ringBlockingQueue.size(), is(0)); + + producer.shutdownNow(); + consumer.shutdownNow(); + assertThat(ringBlockingQueue.isEmpty(), is(true)); - + assertThat(requestEntriesOut, hasSize(100_000)); requestEntriesOut.sort((a, b) -> a.getValue() - b.getValue()); - + for (int i = 0; i < 100_000; i++) { assertThat(requestEntriesOut.get(i).getValue(), is(i)); } } - + @Test void testSuccessWhenIsEmpty() { - final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); - - final ExecutorService producer = ExecutorsProvider.getExecutorService(); - - final ExecutorService consumer = ExecutorsProvider.getExecutorService(); - + final RingBufferBlockingQueue> ringBlockingQueue = spy(new RingBufferBlockingQueue<>()); + + final ExecutorService producer = Executors.newSingleThreadExecutor(); + + final ExecutorService consumer = Executors.newSingleThreadExecutor(); + consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - - await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); - + + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); + producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); producer.shutdownNow(); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); consumer.shutdownNow(); - + assertThat(ringBlockingQueue.isEmpty(), is(true)); } - + @Test void testSuccessWhenIsFull() { - final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(1); - - final ExecutorService producer = ExecutorsProvider.getExecutorService(); - - final ExecutorService consumer = ExecutorsProvider.getExecutorService(); - + final RingBufferBlockingQueue> ringBlockingQueue = spy(new RingBufferBlockingQueue<>(1)); + + final ExecutorService producer = Executors.newSingleThreadExecutor(); + + final ExecutorService consumer = Executors.newSingleThreadExecutor(); + producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - - await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); - + + await().pollDelay(2000, TimeUnit.MILLISECONDS).until(() -> true); + consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); producer.shutdownNow(); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); consumer.shutdownNow(); - + assertThat(ringBlockingQueue.isEmpty(), is(true)); } - + @Test void testFailOffer() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.builder().withValue(0).build())); } - + @Test void testFailOfferWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.builder().withValue(0).build(), 1, TimeUnit.MILLISECONDS)); } - + @Test void testFailPoll() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::poll); } - + @Test void testFailPollWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.poll(1, TimeUnit.MILLISECONDS)); } - + @Test void testFailIterator() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::iterator); } - + @Test void testFailAdd() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.add(RequestEntry.builder().withValue(0).build())); } - + @Test void testFailRemainingCapacity() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::remainingCapacity); } - + @Test void testFailDrainTo() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList())); } - + @Test void testFailDrainToWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList(), 1)); } - + } diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java index 5ec7c25..8d1c7ee 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducerTest.java @@ -20,27 +20,20 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,9 +51,6 @@ class AbstractAmazonSnsProducerTest { @Mock private BlockingQueue> topicRequests; - @Mock - private ExecutorService executorService; - private ConcurrentMap> pendingRequests; private AbstractAmazonSnsProducer producer; @@ -68,7 +58,37 @@ class AbstractAmazonSnsProducerTest { @BeforeEach void setUp() { pendingRequests = new ConcurrentHashMap<>(); - producer = new AbstractAmazonSnsProducer(pendingRequests, topicRequests, executorService) { }; + producer = new AbstractAmazonSnsProducer(pendingRequests, topicRequests) { }; + } + + @AfterEach + void tearDown() { + if (Objects.nonNull(producer)) { + producer.shutdown(); + } + } + + @Test + void testSendReturnsShutdownState() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + + final RequestEntry entry = requestEntry(); + + producer.shutdown(); + + final ListenableFuture future = producer.send(entry); + + assertThat(future, is(notNullValue())); + + future.addCallback(null, fail -> { + assertThat(fail.getId(), is(entry.getId())); + assertThat(fail.getCode(), is("000")); + assertThat(fail.getMessage(), is("Producer is currently in SHUTDOWN mode; no further messages will be accepted.")); + assertThat(fail.getSenderFault(), is(true)); + countDownLatch.countDown(); + }); + + countDownLatch.await(1, TimeUnit.MINUTES); } @Test @@ -152,62 +172,6 @@ void testSendPropagatesInterruptedExceptionFromQueue() throws InterruptedExcepti assertThrows(InterruptedException.class, () -> producer.send(entry)); } - @Test - void testShutdownInvokesExecutorServiceShutdown() throws InterruptedException { - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true); - - producer.shutdown(); - - verify(executorService).shutdown(); - } - - @Test - void testShutdownAwaitsTerminationWith60Seconds() throws InterruptedException { - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true); - - producer.shutdown(); - - verify(executorService).awaitTermination(60L, TimeUnit.SECONDS); - } - - @Test - void testShutdownDoesNotCallShutdownNowWhenTerminatesInTime() throws InterruptedException { - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(true); - - producer.shutdown(); - - verify(executorService, never()).shutdownNow(); - } - - @Test - void testShutdownCallsShutdownNowWhenTerminationTimeoutExpires() throws InterruptedException { - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false); - doReturn(Collections.emptyList()).when(executorService).shutdownNow(); - - producer.shutdown(); - - verify(executorService).shutdownNow(); - } - - @Test - void testShutdownForcesShutdownWhenPendingTasksRemain() throws InterruptedException { - final List pendingTasks = Arrays.asList(mock(Runnable.class), mock(Runnable.class)); - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false); - doReturn(pendingTasks).when(executorService).shutdownNow(); - - producer.shutdown(); - - verify(executorService).shutdownNow(); - } - - @Test - void testShutdownCompletesGracefullyWhenNoTasksAreDropped() throws InterruptedException { - when(executorService.awaitTermination(anyLong(), any(TimeUnit.class))).thenReturn(false); - doReturn(Collections.emptyList()).when(executorService).shutdownNow(); - - assertDoesNotThrow(() -> producer.shutdown()); - } - private RequestEntry requestEntry() { return RequestEntry.builder().withId(UUID.randomUUID().toString()).withValue("payload-" + UUID.randomUUID()).build(); } diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java index 4dd1415..e758bd1 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsTemplateTest.java @@ -17,14 +17,22 @@ package com.amazon.sns.messaging.lib.core; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.UnaryOperator; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,119 +40,369 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import com.amazon.sns.messaging.lib.concurrent.AmazonSnsThreadPoolExecutor; +import com.amazon.sns.messaging.lib.metrics.BlockingQueueMetricsDecorator; +import com.amazon.sns.messaging.lib.metrics.ExecutorServiceMetricsDecorator; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +// @formatter:off @ExtendWith(MockitoExtension.class) +@SuppressWarnings({ "rawtypes", "unchecked"}) class AbstractAmazonSnsTemplateTest { @Mock - private AbstractAmazonSnsProducer amazonSnsProducer; - - @Mock - private AbstractAmazonSnsConsumer amazonSnsConsumer; - - @Mock - private RequestEntry entry; + private AbstractAmazonSnsProducer producerMock; @Mock - private ListenableFuture future; + private AbstractAmazonSnsConsumer consumerMock; private AbstractAmazonSnsTemplate template; @BeforeEach void setUp() { - template = new AbstractAmazonSnsTemplate(amazonSnsProducer, amazonSnsConsumer) { - }; + template = new AbstractAmazonSnsTemplate(producerMock, consumerMock) { }; } @Test - void testSendDelegatesRequestToProducer() { - when(amazonSnsProducer.send(entry)).thenReturn(future); + void testSendDelegatesToProducer() { + final RequestEntry requestEntry = RequestEntry.builder().build(); + final ListenableFuture expectedFuture = new ListenableFutureImpl(); + when(producerMock.send(requestEntry)).thenReturn(expectedFuture); - template.send(entry); + final ListenableFuture result = template.send(requestEntry); - verify(amazonSnsProducer).send(entry); + assertThat(result, is(equalTo(expectedFuture))); + verify(producerMock).send(requestEntry); } @Test - void testSendReturnsProducerFuture() { - when(amazonSnsProducer.send(entry)).thenReturn(future); + void testShutdownDelegatesToProducer() { + template.shutdown(); + verify(producerMock).shutdown(); + } - final ListenableFuture result = template.send(entry); + @Test + void testShutdownDelegatesToConsumer() { + template.shutdown(); + verify(consumerMock).shutdown(); + } + + @Test + void testAwaitDelegatesToConsumer() { + final CompletableFuture expectedFuture = CompletableFuture.completedFuture(null); + when(consumerMock.await()).thenReturn(expectedFuture); - assertThat(result, is(future)); + final CompletableFuture result = template.await(); + + assertThat(result, is(equalTo(expectedFuture))); + verify(consumerMock).await(); } @Test - void testShutdownDelegatesShutdownToProducer() { - template.shutdown(); + void testGetExecutorServiceReturnsSingleThreadPoolForFifoTopic() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(true) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic.fifo") + .maximumPoolSize(10) + .maxBatchSize(10) + .build(); + + final ExecutorService executorService = AbstractAmazonSnsTemplate.getExecutorService(topicProperty, new SimpleMeterRegistry()); + + assertThat(executorService, is(notNullValue())); + assertThat(executorService, is(instanceOf(ExecutorServiceMetricsDecorator.class))); + executorService.shutdownNow(); + } - verify(amazonSnsProducer).shutdown(); + @Test + void testGetExecutorServiceReturnsMultiThreadPoolForNonFifoTopic() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(5) + .maxBatchSize(10) + .build(); + + final ExecutorService executorService = AbstractAmazonSnsTemplate.getExecutorService(topicProperty, new SimpleMeterRegistry()); + + assertThat(executorService, is(notNullValue())); + assertThat(executorService, is(instanceOf(ExecutorServiceMetricsDecorator.class))); + executorService.shutdownNow(); } @Test - void testShutdownDelegatesShutdownToConsumer() { - template.shutdown(); + void testGetExecutorServiceWithNullMeterRegistryDoesNotThrow() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(4) + .maxBatchSize(10) + .build(); + + final ExecutorService executorService = AbstractAmazonSnsTemplate.getExecutorService(topicProperty, null); + + assertThat(executorService, is(notNullValue())); + executorService.shutdownNow(); + } + + @Test + void testBuilderThrowsWhenAmazonSnsClientIsNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + assertThrows(NullPointerException.class, () -> new AbstractAmazonSnsTemplate.Builder<>(builder -> null, null, topicProperty)); + } - verify(amazonSnsConsumer).shutdown(); + @Test + void testBuilderThrowsWhenTopicPropertyIsNull() { + assertThrows(NullPointerException.class, () -> new AbstractAmazonSnsTemplate.Builder<>(builder -> null, new Object(), null)); } @Test - void testAwaitDelegatesAwaitToConsumer() { - final CompletableFuture expected = new CompletableFuture<>(); - when(amazonSnsConsumer.await()).thenReturn(expected); + void testBuilderThrowsWhenConstructorIsNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + assertThrows(NullPointerException.class, () -> new AbstractAmazonSnsTemplate.Builder<>(null, new Object(), topicProperty)); + } - template.await(); + @Test + void testBuilderPendingRequestsThrowsWhenNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); - verify(amazonSnsConsumer).await(); + assertThrows(NullPointerException.class, () -> builder.pendingRequests(null)); } @Test - void testAwaitReturnsConsumerCompletableFuture() { - final CompletableFuture expected = new CompletableFuture<>(); - when(amazonSnsConsumer.await()).thenReturn(expected); + void testBuilderTopicRequestsThrowsWhenNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); - final CompletableFuture result = template.await(); + assertThrows(NullPointerException.class, () -> builder.topicRequests(null)); + } + + @Test + void testBuilderObjectMapperThrowsWhenNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + assertThrows(NullPointerException.class, () -> builder.objectMapper(null)); + } + + @Test + void testBuilderPublishDecoratorThrowsWhenNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + assertThrows(NullPointerException.class, () -> builder.publishDecorator(null)); + } + + @Test + void testBuilderMeterRegistryThrowsWhenNull() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + assertThrows(NullPointerException.class, () -> builder.meterRegistry(null)); + } + + @Test + void testBuilderStoresAmazonSnsClient() { + final Object client = new Object(); + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, client, topicProperty); + + assertThat(builder.getAmazonSnsClient(), is(equalTo(client))); + } + + @Test + void testBuilderStoresTopicProperty() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + assertThat(builder.getTopicProperty(), is(equalTo(topicProperty))); + } + + @Test + void testBuilderDefaultPendingRequestsIsConcurrentHashMap() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); - assertThat(result, is(expected)); + assertThat(builder.getPendingRequests(), is(instanceOf(ConcurrentHashMap.class))); } @Test - void testGetAmazonSnsThreadPoolExecutorReturnsSingleThreadForFifoTopic() { + void testBuilderDefaultObjectMapperIsNotNull() { final TopicProperty topicProperty = mock(TopicProperty.class); - when(topicProperty.isFifo()).thenReturn(true); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + assertThat(builder.getObjectMapper(), is(notNullValue())); + assertThat(builder.getObjectMapper(), is(instanceOf(ObjectMapper.class))); + } - final AmazonSnsThreadPoolExecutor executor = AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty); + @Test + void testBuilderDefaultMeterRegistryIsSimpleMeterRegistry() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); - assertThat(executor, is(notNullValue())); - assertThat(executor.getMaximumPoolSize(), is(1)); + assertThat(builder.getMeterRegistry(), is(instanceOf(SimpleMeterRegistry.class))); } @Test - void testGetAmazonSnsThreadPoolExecutorReturnsConfiguredPoolSizeForStandardTopic() { + void testBuilderPendingRequestsReturnsSelf() { final TopicProperty topicProperty = mock(TopicProperty.class); - when(topicProperty.isFifo()).thenReturn(false); - when(topicProperty.getMaximumPoolSize()).thenReturn(4); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + final ConcurrentMap> map = new ConcurrentHashMap<>(); - final AmazonSnsThreadPoolExecutor executor = AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty); + final AbstractAmazonSnsTemplate.Builder result = builder.pendingRequests(map); - assertThat(executor, is(notNullValue())); - assertThat(executor.getMaximumPoolSize(), is(4)); + assertThat(result, is(equalTo(builder))); } @Test - void testGetAmazonSnsThreadPoolExecutorReturnsCorrectType() { + void testBuilderTopicRequestsReturnsSelf() { final TopicProperty topicProperty = mock(TopicProperty.class); - when(topicProperty.isFifo()).thenReturn(false); - when(topicProperty.getMaximumPoolSize()).thenReturn(2); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + final BlockingQueue> queue = new LinkedBlockingDeque<>(); - final AmazonSnsThreadPoolExecutor executor = AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty); + final AbstractAmazonSnsTemplate.Builder result = builder.topicRequests(queue); - assertThat(executor, instanceOf(AmazonSnsThreadPoolExecutor.class)); + assertThat(result, is(equalTo(builder))); } -} \ No newline at end of file + @Test + void testBuilderObjectMapperReturnsSelf() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + final AbstractAmazonSnsTemplate.Builder result = builder.objectMapper(new ObjectMapper()); + + assertThat(result, is(equalTo(builder))); + } + + @Test + void testBuilderPublishDecoratorReturnsSelf() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + final AbstractAmazonSnsTemplate.Builder result = builder.publishDecorator(UnaryOperator.identity()); + + assertThat(result, is(equalTo(builder))); + } + + @Test + void testBuilderMeterRegistryReturnsSelf() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + + final AbstractAmazonSnsTemplate.Builder result = builder.meterRegistry(new SimpleMeterRegistry()); + + assertThat(result, is(equalTo(builder))); + } + + @Test + void testBuilderBuildWrapsTopicRequestsWithMetricsDecorator() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(4) + .maxBatchSize(10) + .build(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> { + assertThat(b.getTopicRequests(), is(instanceOf(BlockingQueueMetricsDecorator.class))); + return null; + }, new Object(), topicProperty); + + builder.build(); + } + + @Test + void testBuilderBuildCreatesDefaultTopicRequestsWhenNotProvided() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(4) + .maxBatchSize(10) + .build(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> { + assertThat(b.getTopicRequests(), is(notNullValue())); + return null; + }, new Object(), topicProperty); + + builder.build(); + } + + @Test + void testBuilderBuildUsesProvidedTopicRequests() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(4) + .maxBatchSize(10) + .build(); + + final BlockingQueue> customQueue = new LinkedBlockingDeque<>(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> { + assertThat(b.getTopicRequests(), is(instanceOf(BlockingQueueMetricsDecorator.class))); + return null; + }, new Object(), topicProperty); + + builder.topicRequests(customQueue).build(); + } + + @Test + void testBuilderBuildInvokesConstructorFunction() { + final TopicProperty topicProperty = TopicProperty.builder() + .fifo(false) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .maximumPoolSize(4) + .maxBatchSize(10) + .build(); + + final AbstractAmazonSnsTemplate sentinel = new AbstractAmazonSnsTemplate(producerMock, consumerMock) { }; + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> sentinel, new Object(), topicProperty); + + final Object result = builder.build(); + + assertThat(result, is(equalTo(sentinel))); + } + + @Test + void testBuilderSetsPendingRequests() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final ConcurrentMap> customMap = new ConcurrentHashMap<>(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + builder.pendingRequests(customMap); + + assertThat(builder.getPendingRequests(), is(equalTo(customMap))); + } + + @Test + void testBuilderSetsObjectMapper() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final ObjectMapper customMapper = new ObjectMapper(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + builder.objectMapper(customMapper); + + assertThat(builder.getObjectMapper(), is(equalTo(customMapper))); + } + + @Test + void testBuilderSetsMeterRegistry() { + final TopicProperty topicProperty = mock(TopicProperty.class); + final MeterRegistry customRegistry = new SimpleMeterRegistry(); + + final AbstractAmazonSnsTemplate.Builder builder = new AbstractAmazonSnsTemplate.Builder<>(b -> null, new Object(), topicProperty); + builder.meterRegistry(customRegistry); + + assertThat(builder.getMeterRegistry(), is(equalTo(customRegistry))); + } +} diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java index c05c379..aca744c 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java @@ -114,7 +114,7 @@ void testAddCallbackDefaultFailureCallbackIsNoOp() { final boolean[] called = { false }; final ResponseFailEntry entry = mock(ResponseFailEntry.class); - listenableFuture.addCallback(successCallback -> called[0] = true); + listenableFuture.addCallback(callback -> called[0] = true); listenableFuture.fail(entry); assertThat(called[0], is(false)); diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java index 83b80cb..832f2eb 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/helpers/TryConsumer.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.amazon.sns.messaging.lib.helpers; @FunctionalInterface diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java new file mode 100644 index 0000000..5a2bd88 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/BlockingQueueMetricsDecoratorTest.java @@ -0,0 +1,521 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.metrics; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +@ExtendWith(MockitoExtension.class) +class BlockingQueueMetricsDecoratorTest { + + @Mock + private BlockingQueue delegateMock; + + private MeterRegistry meterRegistry; + + private BlockingQueueMetricsDecorator decorator; + + @BeforeEach + void setUp() { + meterRegistry = new SimpleMeterRegistry(); + decorator = new BlockingQueueMetricsDecorator<>(delegateMock, meterRegistry, "test-queue"); + } + + @Test + void testConstructorWithNullRegistryDoesNotThrow() { + final BlockingQueueMetricsDecorator instance = new BlockingQueueMetricsDecorator<>(delegateMock, null, "test-queue"); + assertThat(instance, is(notNullValue())); + } + + @Test + void testConstructorWithValidRegistryCreatesInstance() { + assertThat(decorator, is(notNullValue())); + } + + @Test + void testPutDelegatesToDelegate() throws InterruptedException { + decorator.put("element"); + verify(delegateMock).put("element"); + } + + @Test + void testPutIncrementsPutsTotalOnSuccess() throws InterruptedException { + decorator.put("element"); + final double count = meterRegistry.counter("blocking.queue.puts.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testPutDoesNotIncrementPutsFailedOnSuccess() throws InterruptedException { + decorator.put("element"); + final double count = meterRegistry.counter("blocking.queue.puts.failed", "name", "test-queue").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testPutIncrementsPutsFailedOnException() throws InterruptedException { + doThrow(new InterruptedException()).when(delegateMock).put(any()); + assertThrows(InterruptedException.class, () -> decorator.put("element")); + final double count = meterRegistry.counter("blocking.queue.puts.failed", "name", "test-queue").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testPutDoesNotIncrementPutsTotalOnException() throws InterruptedException { + doThrow(new InterruptedException()).when(delegateMock).put(any()); + assertThrows(InterruptedException.class, () -> decorator.put("element")); + final double count = meterRegistry.counter("blocking.queue.puts.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testPutRethrowsInterruptedException() throws InterruptedException { + doThrow(new InterruptedException()).when(delegateMock).put(any()); + assertThrows(InterruptedException.class, () -> decorator.put("element")); + } + + @Test + void testPutRecordsDuration() throws InterruptedException { + decorator.put("element"); + final long count = meterRegistry.get("blocking.queue.put.duration").tag("name", "test-queue").timer().count(); + assertThat(count, is(equalTo(1L))); + } + + @Test + void testPutRecordsDurationEvenOnException() throws InterruptedException { + doThrow(new InterruptedException()).when(delegateMock).put(any()); + assertThrows(InterruptedException.class, () -> decorator.put("element")); + final long count = meterRegistry.get("blocking.queue.put.duration").tag("name", "test-queue").timer().count(); + assertThat(count, is(equalTo(1L))); + } + + @Test + void testPutMultipleSuccessesAccumulateCount() throws InterruptedException { + decorator.put("a"); + decorator.put("b"); + decorator.put("c"); + final double count = meterRegistry.counter("blocking.queue.puts.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(3.0))); + } + + @Test + void testTakeDelegatesToDelegate() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + decorator.take(); + verify(delegateMock).take(); + } + + @Test + void testTakeReturnsValueFromDelegate() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + final String result = decorator.take(); + assertThat(result, is(equalTo("element"))); + } + + @Test + void testTakeIncrementsTakesTotalOnSuccess() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + decorator.take(); + final double count = meterRegistry.counter("blocking.queue.takes.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testTakeDoesNotIncrementTakesFailedOnSuccess() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + decorator.take(); + final double count = meterRegistry.counter("blocking.queue.takes.failed", "name", "test-queue").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testTakeIncrementsTakesFailedOnException() throws InterruptedException { + when(delegateMock.take()).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.take()); + final double count = meterRegistry.counter("blocking.queue.takes.failed", "name", "test-queue").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testTakeDoesNotIncrementTakesTotalOnException() throws InterruptedException { + when(delegateMock.take()).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.take()); + final double count = meterRegistry.counter("blocking.queue.takes.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testTakeRethrowsInterruptedException() throws InterruptedException { + when(delegateMock.take()).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.take()); + } + + @Test + void testTakeRecordsDuration() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + decorator.take(); + final long count = meterRegistry.get("blocking.queue.take.duration").tag("name", "test-queue").timer().count(); + assertThat(count, is(equalTo(1L))); + } + + @Test + void testTakeRecordsDurationEvenOnException() throws InterruptedException { + when(delegateMock.take()).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.take()); + final long count = meterRegistry.get("blocking.queue.take.duration").tag("name", "test-queue").timer().count(); + assertThat(count, is(equalTo(1L))); + } + + @Test + void testTakeMultipleSuccessesAccumulateCount() throws InterruptedException { + when(delegateMock.take()).thenReturn("a", "b", "c"); + decorator.take(); + decorator.take(); + decorator.take(); + final double count = meterRegistry.counter("blocking.queue.takes.total", "name", "test-queue").count(); + assertThat(count, is(equalTo(3.0))); + } + + @Test + void testSizeDelegatesToDelegate() { + when(delegateMock.size()).thenReturn(5); + assertThat(decorator.size(), is(equalTo(5))); + verify(delegateMock).size(); + } + + @Test + void testIsEmptyReturnsTrueWhenDelegateReturnsTrue() { + when(delegateMock.isEmpty()).thenReturn(true); + assertThat(decorator.isEmpty(), is(true)); + } + + @Test + void testIsEmptyReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.isEmpty()).thenReturn(false); + assertThat(decorator.isEmpty(), is(false)); + } + + @Test + void testPeekDelegatesToDelegate() { + when(delegateMock.peek()).thenReturn("element"); + assertThat(decorator.peek(), is(equalTo("element"))); + verify(delegateMock).peek(); + } + + @Test + void testPeekReturnsNullWhenDelegateReturnsNull() { + when(delegateMock.peek()).thenReturn(null); + assertThat(decorator.peek(), is(equalTo(null))); + } + + @Test + void testOfferDelegatesToDelegate() { + when(delegateMock.offer("element")).thenReturn(true); + assertThat(decorator.offer("element"), is(true)); + verify(delegateMock).offer("element"); + } + + @Test + void testOfferReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.offer("element")).thenReturn(false); + assertThat(decorator.offer("element"), is(false)); + } + + @Test + void testOfferWithTimeoutDelegatesToDelegate() throws InterruptedException { + when(delegateMock.offer("element", 5L, TimeUnit.SECONDS)).thenReturn(true); + assertThat(decorator.offer("element", 5L, TimeUnit.SECONDS), is(true)); + verify(delegateMock).offer("element", 5L, TimeUnit.SECONDS); + } + + @Test + void testOfferWithTimeoutReturnsFalseWhenDelegateReturnsFalse() throws InterruptedException { + when(delegateMock.offer("element", 5L, TimeUnit.SECONDS)).thenReturn(false); + assertThat(decorator.offer("element", 5L, TimeUnit.SECONDS), is(false)); + } + + @Test + void testOfferWithTimeoutPropagatesInterruptedException() throws InterruptedException { + when(delegateMock.offer(any(), anyLong(), any())).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.offer("element", 5L, TimeUnit.SECONDS)); + } + + @Test + void testPollDelegatesToDelegate() { + when(delegateMock.poll()).thenReturn("element"); + assertThat(decorator.poll(), is(equalTo("element"))); + verify(delegateMock).poll(); + } + + @Test + void testPollReturnsNullWhenDelegateReturnsNull() { + when(delegateMock.poll()).thenReturn(null); + assertThat(decorator.poll(), is(equalTo(null))); + } + + @Test + void testPollWithTimeoutDelegatesToDelegate() throws InterruptedException { + when(delegateMock.poll(5L, TimeUnit.SECONDS)).thenReturn("element"); + assertThat(decorator.poll(5L, TimeUnit.SECONDS), is(equalTo("element"))); + verify(delegateMock).poll(5L, TimeUnit.SECONDS); + } + + @Test + void testPollWithTimeoutPropagatesInterruptedException() throws InterruptedException { + when(delegateMock.poll(anyLong(), any())).thenThrow(new InterruptedException()); + assertThrows(InterruptedException.class, () -> decorator.poll(5L, TimeUnit.SECONDS)); + } + + @Test + void testIteratorDelegatesToDelegate() { + final Iterator expectedIterator = mock(Iterator.class); + when(delegateMock.iterator()).thenReturn(expectedIterator); + assertThat(decorator.iterator(), is(equalTo(expectedIterator))); + verify(delegateMock).iterator(); + } + + @Test + void testAddDelegatesToDelegate() { + when(delegateMock.add("element")).thenReturn(true); + assertThat(decorator.add("element"), is(true)); + verify(delegateMock).add("element"); + } + + @Test + void testAddReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.add("element")).thenReturn(false); + assertThat(decorator.add("element"), is(false)); + } + + @Test + void testRemainingCapacityDelegatesToDelegate() { + when(delegateMock.remainingCapacity()).thenReturn(10); + assertThat(decorator.remainingCapacity(), is(equalTo(10))); + verify(delegateMock).remainingCapacity(); + } + + @Test + void testDrainToDelegatesToDelegate() { + final List collection = new java.util.ArrayList<>(); + when(delegateMock.drainTo(collection)).thenReturn(3); + assertThat(decorator.drainTo(collection), is(equalTo(3))); + verify(delegateMock).drainTo(collection); + } + + @Test + void testDrainToWithMaxElementsDelegatesToDelegate() { + final List collection = new java.util.ArrayList<>(); + when(delegateMock.drainTo(collection, 5)).thenReturn(5); + assertThat(decorator.drainTo(collection, 5), is(equalTo(5))); + verify(delegateMock).drainTo(collection, 5); + } + + @Test + void testRemoveDelegatesToDelegate() { + when(delegateMock.remove()).thenReturn("element"); + assertThat(decorator.remove(), is(equalTo("element"))); + verify(delegateMock).remove(); + } + + @Test + void testRemoveObjectDelegatesToDelegate() { + when(delegateMock.remove("element")).thenReturn(true); + assertThat(decorator.remove("element"), is(true)); + verify(delegateMock).remove("element"); + } + + @Test + void testRemoveObjectReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.remove("element")).thenReturn(false); + assertThat(decorator.remove("element"), is(false)); + } + + @Test + void testElementDelegatesToDelegate() { + when(delegateMock.element()).thenReturn("element"); + assertThat(decorator.element(), is(equalTo("element"))); + verify(delegateMock).element(); + } + + @Test + void testToArrayDelegatesToDelegate() { + final Object[] expected = new Object[] { "a", "b" }; + when(delegateMock.toArray()).thenReturn(expected); + assertThat(decorator.toArray(), is(equalTo(expected))); + verify(delegateMock).toArray(); + } + + @Test + void testToArrayWithTypeDelegatesToDelegate() { + final String[] input = new String[0]; + final String[] expected = new String[] { "a", "b" }; + when(delegateMock.toArray(input)).thenReturn(expected); + assertThat(decorator.toArray(input), is(equalTo(expected))); + verify(delegateMock).toArray(input); + } + + @Test + void testContainsAllDelegatesToDelegate() { + final Collection c = Arrays.asList("a", "b"); + when(delegateMock.containsAll(c)).thenReturn(true); + assertThat(decorator.containsAll(c), is(true)); + verify(delegateMock).containsAll(c); + } + + @Test + void testContainsAllReturnsFalseWhenDelegateReturnsFalse() { + final Collection c = Arrays.asList("a", "b"); + when(delegateMock.containsAll(c)).thenReturn(false); + assertThat(decorator.containsAll(c), is(false)); + } + + @Test + void testAddAllDelegatesToDelegate() { + final Collection c = Arrays.asList("a", "b"); + when(delegateMock.addAll(c)).thenReturn(true); + assertThat(decorator.addAll(c), is(true)); + verify(delegateMock).addAll(c); + } + + @Test + void testRemoveAllDelegatesToDelegate() { + final Collection c = Arrays.asList("a", "b"); + when(delegateMock.removeAll(c)).thenReturn(true); + assertThat(decorator.removeAll(c), is(true)); + verify(delegateMock).removeAll(c); + } + + @Test + void testRetainAllDelegatesToDelegate() { + final Collection c = Arrays.asList("a", "b"); + when(delegateMock.retainAll(c)).thenReturn(true); + assertThat(decorator.retainAll(c), is(true)); + verify(delegateMock).retainAll(c); + } + + @Test + void testClearDelegatesToDelegate() { + decorator.clear(); + verify(delegateMock).clear(); + } + + @Test + void testContainsDelegatesToDelegate() { + when(delegateMock.contains("element")).thenReturn(true); + assertThat(decorator.contains("element"), is(true)); + verify(delegateMock).contains("element"); + } + + @Test + void testContainsReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.contains("element")).thenReturn(false); + assertThat(decorator.contains("element"), is(false)); + } + + @Test + void testPutsTotalCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.puts.total").tag("name", "test-queue").counter(), is(notNullValue())); + } + + @Test + void testPutsFailedCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.puts.failed").tag("name", "test-queue").counter(), is(notNullValue())); + } + + @Test + void testPutDurationTimerIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.put.duration").tag("name", "test-queue").timer(), is(notNullValue())); + } + + @Test + void testTakesTotalCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.takes.total").tag("name", "test-queue").counter(), is(notNullValue())); + } + + @Test + void testTakesFailedCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.takes.failed").tag("name", "test-queue").counter(), is(notNullValue())); + } + + @Test + void testTakeDurationTimerIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.take.duration").tag("name", "test-queue").timer(), is(notNullValue())); + } + + @Test + void testSizeGaugeIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("blocking.queue.size").tag("name", "test-queue").gauge(), is(notNullValue())); + } + + @Test + void testSizeGaugeReflectsDelegateSize() { + when(delegateMock.size()).thenReturn(7); + final double gaugeValue = meterRegistry.get("blocking.queue.size").tag("name", "test-queue").gauge().value(); + assertThat(gaugeValue, is(equalTo(7.0))); + } + + @Test + void testPutAndTakeCountersAreIndependent() throws InterruptedException { + when(delegateMock.take()).thenReturn("element"); + decorator.put("element"); + decorator.take(); + + final double puts = meterRegistry.counter("blocking.queue.puts.total", "name", "test-queue").count(); + final double takes = meterRegistry.counter("blocking.queue.takes.total", "name", "test-queue").count(); + assertThat(puts, is(equalTo(1.0))); + assertThat(takes, is(equalTo(1.0))); + } + + @Test + void testPutsFailedAndTakesFailedAreIndependent() throws InterruptedException { + doThrow(new InterruptedException()).when(delegateMock).put(any()); + when(delegateMock.take()).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> decorator.put("element")); + assertThrows(InterruptedException.class, () -> decorator.take()); + + final double putsFailed = meterRegistry.counter("blocking.queue.puts.failed", "name", "test-queue").count(); + final double takesFailed = meterRegistry.counter("blocking.queue.takes.failed", "name", "test-queue").count(); + assertThat(putsFailed, is(equalTo(1.0))); + assertThat(takesFailed, is(equalTo(1.0))); + } +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java new file mode 100644 index 0000000..05e9135 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/metrics/ExecutorServiceMetricsDecoratorTest.java @@ -0,0 +1,498 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.metrics; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; + +@ExtendWith(MockitoExtension.class) +class ExecutorServiceMetricsDecoratorTest { + + private ExecutorServiceMetricsDecorator decorator; + + @Mock + private ExecutorService delegateMock; + + @Spy + private SimpleMeterRegistry meterRegistry; + + @BeforeEach + void setUp() { + decorator = new ExecutorServiceMetricsDecorator(delegateMock, meterRegistry, "test-executor"); + } + + @Test + void testConstructorWithNullRegistryDoesNotThrow() { + final ExecutorServiceMetricsDecorator instance = new ExecutorServiceMetricsDecorator(delegateMock, null, "test-executor"); + assertThat(instance, is(notNullValue())); + } + + @Test + void testConstructorWithValidRegistryCreatesInstance() { + assertThat(decorator, is(notNullValue())); + } + + @Test + void testSubmitRunnableDelegatesToDelegate() { + final Runnable task = mock(Runnable.class); + final Future expectedFuture = mock(Future.class); + when(delegateMock.submit(task)).thenReturn(expectedFuture); + + final Future result = decorator.submit(task); + + assertThat(result, is(equalTo(expectedFuture))); + verify(delegateMock).submit(task); + } + + @Test + void testSubmitRunnableWithResultDelegatesToDelegate() { + final Runnable task = mock(Runnable.class); + final String expectedResult = "result"; + final Future expectedFuture = mock(Future.class); + when(delegateMock.submit(task, expectedResult)).thenReturn(expectedFuture); + + final Future result = decorator.submit(task, expectedResult); + + assertThat(result, is(equalTo(expectedFuture))); + verify(delegateMock).submit(task, expectedResult); + } + + @Test + void testSubmitCallableDelegatesToDelegate() { + final Callable task = mock(Callable.class); + final Future expectedFuture = mock(Future.class); + when(delegateMock.submit(task)).thenReturn(expectedFuture); + + final Future result = decorator.submit(task); + + assertThat(result, is(equalTo(expectedFuture))); + verify(delegateMock).submit(task); + } + + @Test + void testInvokeAnyDelegatesToDelegate() throws InterruptedException, ExecutionException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAny(tasks)).thenReturn("done"); + + final String result = decorator.invokeAny(tasks); + + assertThat(result, is(equalTo("done"))); + verify(delegateMock).invokeAny(tasks); + } + + @Test + void testInvokeAnyWithTimeoutDelegatesToDelegate() throws InterruptedException, ExecutionException, TimeoutException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAny(tasks, 5L, TimeUnit.SECONDS)).thenReturn("done"); + + final String result = decorator.invokeAny(tasks, 5L, TimeUnit.SECONDS); + + assertThat(result, is(equalTo("done"))); + verify(delegateMock).invokeAny(tasks, 5L, TimeUnit.SECONDS); + } + + @Test + void testInvokeAllDelegatesToDelegate() throws InterruptedException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + final List> expectedFutures = Collections.singletonList(mock(Future.class)); + when(delegateMock.invokeAll(tasks)).thenReturn(expectedFutures); + + final List> result = decorator.invokeAll(tasks); + + assertThat(result, is(equalTo(expectedFutures))); + verify(delegateMock).invokeAll(tasks); + } + + @Test + void testInvokeAllWithTimeoutDelegatesToDelegate() throws InterruptedException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + final List> expectedFutures = Collections.singletonList(mock(Future.class)); + when(delegateMock.invokeAll(tasks, 5L, TimeUnit.SECONDS)).thenReturn(expectedFutures); + + final List> result = decorator.invokeAll(tasks, 5L, TimeUnit.SECONDS); + + assertThat(result, is(equalTo(expectedFutures))); + verify(delegateMock).invokeAll(tasks, 5L, TimeUnit.SECONDS); + } + + @Test + void testShutdownDelegatesToDelegate() { + decorator.shutdown(); + verify(delegateMock).shutdown(); + } + + @Test + void testShutdownNowDelegatesToDelegate() { + final List pending = Collections.singletonList(mock(Runnable.class)); + when(delegateMock.shutdownNow()).thenReturn(pending); + + final List result = decorator.shutdownNow(); + + assertThat(result, is(equalTo(pending))); + verify(delegateMock).shutdownNow(); + } + + @Test + void testIsShutdownReturnsTrueWhenDelegateReturnsTrue() { + when(delegateMock.isShutdown()).thenReturn(true); + assertThat(decorator.isShutdown(), is(true)); + } + + @Test + void testIsShutdownReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.isShutdown()).thenReturn(false); + assertThat(decorator.isShutdown(), is(false)); + } + + @Test + void testIsTerminatedReturnsTrueWhenDelegateReturnsTrue() { + when(delegateMock.isTerminated()).thenReturn(true); + assertThat(decorator.isTerminated(), is(true)); + } + + @Test + void testIsTerminatedReturnsFalseWhenDelegateReturnsFalse() { + when(delegateMock.isTerminated()).thenReturn(false); + assertThat(decorator.isTerminated(), is(false)); + } + + @Test + void testAwaitTerminationReturnsTrueWhenDelegateReturnsTrue() throws InterruptedException { + when(delegateMock.awaitTermination(10L, TimeUnit.SECONDS)).thenReturn(true); + assertThat(decorator.awaitTermination(10L, TimeUnit.SECONDS), is(true)); + } + + @Test + void testAwaitTerminationReturnsFalseWhenDelegateReturnsFalse() throws InterruptedException { + when(delegateMock.awaitTermination(10L, TimeUnit.SECONDS)).thenReturn(false); + assertThat(decorator.awaitTermination(10L, TimeUnit.SECONDS), is(false)); + } + + @Test + void testExecuteWrapsCommandAndDelegatesToDelegate() { + final Runnable command = mock(Runnable.class); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(command); + + verify(delegateMock).execute(captor.capture()); + assertThat(captor.getValue(), is(notNullValue())); + } + + @Test + void testExecuteWrappedRunnableRunsOriginalCommand() { + final Runnable command = mock(Runnable.class); + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(command); + verify(delegateMock).execute(captor.capture()); + + captor.getValue().run(); + + verify(command).run(); + } + + @Test + void testExecuteIncrementsSucceededCounterOnSuccess() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(() -> { + }); + verify(delegateMock).execute(captor.capture()); + captor.getValue().run(); + + final double count = meterRegistry.counter("executor.tasks.succeeded", "name", "test-executor").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testExecuteDoesNotIncrementFailedCounterOnSuccess() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(() -> { + }); + verify(delegateMock).execute(captor.capture()); + captor.getValue().run(); + + final double count = meterRegistry.counter("executor.tasks.failed", "name", "test-executor").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testExecuteIncrementsFailedCounterOnException() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + final Runnable failingCommand = () -> { + throw new RuntimeException("boom"); + }; + + decorator.execute(failingCommand); + verify(delegateMock).execute(captor.capture()); + + try { + captor.getValue().run(); + } catch (final RuntimeException ignored) { + } + + final double count = meterRegistry.counter("executor.tasks.failed", "name", "test-executor").count(); + assertThat(count, is(equalTo(1.0))); + } + + @Test + void testExecuteDoesNotIncrementSucceededCounterOnException() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + final Runnable failingCommand = () -> { + throw new RuntimeException("boom"); + }; + + decorator.execute(failingCommand); + verify(delegateMock).execute(captor.capture()); + + try { + captor.getValue().run(); + } catch (final RuntimeException ignored) { + } + + final double count = meterRegistry.counter("executor.tasks.succeeded", "name", "test-executor").count(); + assertThat(count, is(equalTo(0.0))); + } + + @Test + void testExecuteRethrowsExceptionFromCommand() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + final RuntimeException expectedException = new RuntimeException("test error"); + final Runnable failingCommand = () -> { + throw expectedException; + }; + + decorator.execute(failingCommand); + verify(delegateMock).execute(captor.capture()); + + RuntimeException thrown = null; + try { + captor.getValue().run(); + } catch (final RuntimeException ex) { + thrown = ex; + } + + assertThat(thrown, is(equalTo(expectedException))); + } + + @Test + void testExecuteDecrementsActiveTaskCountAfterSuccessfulRun() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(() -> { + }); + verify(delegateMock).execute(captor.capture()); + captor.getValue().run(); + + final double gaugeValue = meterRegistry.get("executor.active").tag("name", "test-executor").gauge().value(); + assertThat(gaugeValue, is(equalTo(0.0))); + } + + @Test + void testExecuteDecrementsActiveTaskCountAfterFailedRun() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + final Runnable failingCommand = () -> { + throw new RuntimeException("fail"); + }; + + decorator.execute(failingCommand); + verify(delegateMock).execute(captor.capture()); + + try { + captor.getValue().run(); + } catch (final RuntimeException ignored) { + } + + final double gaugeValue = meterRegistry.get("executor.active").tag("name", "test-executor").gauge().value(); + assertThat(gaugeValue, is(equalTo(0.0))); + } + + @Test + void testExecuteRecordsTaskDuration() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + + decorator.execute(() -> { + }); + verify(delegateMock).execute(captor.capture()); + captor.getValue().run(); + + final long timerCount = meterRegistry.get("executor.task.duration").tag("name", "test-executor").timer().count(); + assertThat(timerCount, is(equalTo(1L))); + } + + @Test + void testExecuteRecordsTaskDurationEvenOnException() { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + final Runnable failingCommand = () -> { + throw new RuntimeException("fail"); + }; + + decorator.execute(failingCommand); + verify(delegateMock).execute(captor.capture()); + + try { + captor.getValue().run(); + } catch (final RuntimeException ignored) { + } + + final long timerCount = meterRegistry.get("executor.task.duration").tag("name", "test-executor").timer().count(); + assertThat(timerCount, is(equalTo(1L))); + } + + @Test + void testExecuteMultipleSuccessesAccumulateCount() { + final int runs = 5; + for (int i = 0; i < runs; i++) { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + decorator.execute(() -> { + }); + verify(delegateMock, atLeast(1)).execute(captor.capture()); + captor.getValue().run(); + } + + final double count = meterRegistry.counter("executor.tasks.succeeded", "name", "test-executor").count(); + assertThat(count, is(equalTo((double) runs))); + } + + @Test + void testExecuteMultipleFailuresAccumulateCount() { + final Runnable failingCommand = () -> { + throw new RuntimeException("fail"); + }; + final int runs = 3; + + for (int i = 0; i < runs; i++) { + final ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + decorator.execute(failingCommand); + verify(delegateMock, atLeast(1)).execute(captor.capture()); + try { + captor.getValue().run(); + } catch (final RuntimeException ignored) { + } + } + + final double count = meterRegistry.counter("executor.tasks.failed", "name", "test-executor").count(); + assertThat(count, is(equalTo((double) runs))); + } + + @Test + void testActiveGaugeIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("executor.active").tag("name", "test-executor").gauge(), is(notNullValue())); + } + + @Test + void testSucceededCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("executor.tasks.succeeded").tag("name", "test-executor").counter(), is(notNullValue())); + } + + @Test + void testFailedCounterIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("executor.tasks.failed").tag("name", "test-executor").counter(), is(notNullValue())); + } + + @Test + void testTaskTimerIsRegisteredWithCorrectName() { + assertThat(meterRegistry.get("executor.task.duration").tag("name", "test-executor").timer(), is(notNullValue())); + } + + @Test + void testInvokeAnyPropagatesInterruptedException() throws InterruptedException, ExecutionException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAny(tasks)).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> decorator.invokeAny(tasks)); + } + + @Test + void testInvokeAnyPropagatesExecutionException() throws InterruptedException, ExecutionException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAny(tasks)).thenThrow(new ExecutionException(new RuntimeException())); + + assertThrows(ExecutionException.class, () -> decorator.invokeAny(tasks)); + } + + @Test + void testInvokeAnyWithTimeoutPropagatesTimeoutException() throws InterruptedException, ExecutionException, TimeoutException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAny(tasks, 1L, TimeUnit.SECONDS)).thenThrow(new TimeoutException()); + + assertThrows(TimeoutException.class, () -> decorator.invokeAny(tasks, 1L, TimeUnit.SECONDS)); + } + + @Test + void testInvokeAllPropagatesInterruptedException() throws InterruptedException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAll(tasks)).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> decorator.invokeAll(tasks)); + } + + @Test + void testInvokeAllWithTimeoutPropagatesInterruptedException() throws InterruptedException { + final Collection> tasks = Collections.singletonList(mock(Callable.class)); + when(delegateMock.invokeAll(tasks, 1L, TimeUnit.SECONDS)).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> decorator.invokeAll(tasks, 1L, TimeUnit.SECONDS)); + } + + @Test + void testAwaitTerminationPropagatesInterruptedException() throws InterruptedException { + when(delegateMock.awaitTermination(anyLong(), any())).thenThrow(new InterruptedException()); + + assertThrows(InterruptedException.class, () -> decorator.awaitTermination(1L, TimeUnit.SECONDS)); + } + + @Test + void testShutdownNowReturnsEmptyListWhenNoPendingTasks() { + when(delegateMock.shutdownNow()).thenReturn(Collections.emptyList()); + + final List result = decorator.shutdownNow(); + + assertThat(result.isEmpty(), is(true)); + } +} \ No newline at end of file diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java index 004477d..fdcab93 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -18,7 +18,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; @@ -38,13 +37,11 @@ class AmazonSnsProducer extends AbstractAmazonSnsProducer { * * @param pendingRequests the shared map of pending requests keyed by request ID * @param topicRequests the shared blocking queue for topic requests - * @param executorService the executor service for async operations */ public AmazonSnsProducer( final ConcurrentMap> pendingRequests, - final BlockingQueue> topicRequests, - final ExecutorService executorService) { - super(pendingRequests, topicRequests, executorService); + final BlockingQueue> topicRequests) { + super(pendingRequests, topicRequests); } } diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index ef94eb5..ff53fbf 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -17,15 +17,10 @@ package com.amazon.sns.messaging.lib.core; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.UnaryOperator; -import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; -import com.amazon.sns.messaging.lib.model.ResponseFailEntry; -import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.model.PublishBatchRequest; @@ -41,27 +36,36 @@ */ public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { + private AmazonSnsTemplate(final Builder> builder) { + super( + new AmazonSnsProducer<>( + builder.getPendingRequests(), + builder.getTopicRequests() + ), + new AmazonSnsConsumer<>( + builder.getAmazonSnsClient(), + builder.getTopicProperty(), + builder.getObjectMapper(), + builder.getPendingRequests(), + builder.getTopicRequests(), + getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), + builder.getPublishDecorator() + ) + ); + } + /** - * Internal constructor that wires together the producer and consumer for the v1 SDK. + * Creates a new builder for constructing an {@link AmazonSnsTemplate}. * + * @param the request entry payload type * @param amazonSnsClient the v1 {@link AmazonSNS} client * @param topicProperty the topic configuration - * @param pendingRequests the shared map of pending requests - * @param topicRequests the shared blocking queue of requests - * @param objectMapper the Jackson ObjectMapper for payload serialization - * @param publishDecorator a decorator for the publish batch request + * @return a new builder instance */ - private AmazonSnsTemplate( + public static Builder> builder( final AmazonSNS amazonSnsClient, - final TopicProperty topicProperty, - final ConcurrentMap> pendingRequests, - final BlockingQueue> topicRequests, - final ObjectMapper objectMapper, - final UnaryOperator publishDecorator) { - super( - new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()), - new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) - ); + final TopicProperty topicProperty) { + return new Builder<>(AmazonSnsTemplate::new, amazonSnsClient, topicProperty); } /** @@ -70,7 +74,10 @@ private AmazonSnsTemplate( * @param amazonSnsClient the v1 {@link AmazonSNS} client * @param topicProperty the topic configuration */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty) { this(amazonSnsClient, topicProperty, UnaryOperator.identity()); } @@ -81,7 +88,11 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, new ObjectMapper(), publishDecorator); } @@ -92,7 +103,11 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param topicRequests the blocking queue for topic requests */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests) { this(amazonSnsClient, topicProperty, topicRequests, UnaryOperator.identity()); } @@ -104,7 +119,12 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param topicRequests the blocking queue for topic requests * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, topicRequests, new ObjectMapper(), publishDecorator); } @@ -115,7 +135,11 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param objectMapper the Jackson ObjectMapper for payload serialization */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final ObjectMapper objectMapper) { this(amazonSnsClient, topicProperty, objectMapper, UnaryOperator.identity()); } @@ -127,7 +151,12 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param objectMapper the Jackson ObjectMapper for payload serialization * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final ObjectMapper objectMapper, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper, publishDecorator); } @@ -139,7 +168,12 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param topicRequests the blocking queue for topic requests * @param objectMapper the Jackson ObjectMapper for payload serialization */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final ObjectMapper objectMapper) { + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final ObjectMapper objectMapper) { this(amazonSnsClient, topicProperty, topicRequests, objectMapper, UnaryOperator.identity()); } @@ -152,8 +186,18 @@ public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty to * @param objectMapper the Jackson ObjectMapper for payload serialization * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { - this(amazonSnsClient, topicProperty, new ConcurrentHashMap<>(), topicRequests, objectMapper, publishDecorator); + @Deprecated + public AmazonSnsTemplate( + final AmazonSNS amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final ObjectMapper objectMapper, + final UnaryOperator publishDecorator) { + this(AmazonSnsTemplate.builder(amazonSnsClient, topicProperty) + .topicRequests(topicRequests) + .objectMapper(objectMapper) + .publishDecorator(publishDecorator) + ); } } diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index 9f196c2..dcb64eb 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -74,7 +74,9 @@ void before() { .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new RingBufferBlockingQueue<>(2048)); + snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty) + .topicRequests(new RingBufferBlockingQueue<>(2048)) + .build(); } @Test diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java index 09a20c2..82284a1 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java @@ -69,7 +69,7 @@ void before() { .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); + snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); } @Test diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java index 2089866..87b2541 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java @@ -69,6 +69,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import lombok.SneakyThrows; // @formatter:off @@ -178,7 +179,10 @@ private AmazonSnsTemplate createTemplate( .topicArn(topicArn) .build(); - return new AmazonSnsTemplate<>(snsClient, topicProperty, new RingBufferBlockingQueue<>(1024)); + return AmazonSnsTemplate.builder(snsClient, topicProperty) + .meterRegistry(new SimpleMeterRegistry()) + .topicRequests(new RingBufferBlockingQueue<>(1024)) + .build(); } private void purgeQueue(final String queueUrl) { @@ -187,11 +191,11 @@ private void purgeQueue(final String queueUrl) { private ReceiveMessageResult receiveMessage(final String queueUrl, final Integer maxNumberOfMessages, final Integer waitTimeSeconds) { final ReceiveMessageResult result = sqsClient.receiveMessage( - new ReceiveMessageRequest(queueUrl) - .withMaxNumberOfMessages(maxNumberOfMessages) - .withWaitTimeSeconds(waitTimeSeconds) - .withAttributeNames(QueueAttributeName.All) - .withMessageAttributeNames("All")); + new ReceiveMessageRequest(queueUrl) + .withMaxNumberOfMessages(maxNumberOfMessages) + .withWaitTimeSeconds(waitTimeSeconds) + .withAttributeNames(QueueAttributeName.All) + .withMessageAttributeNames("All")); result.getMessages().forEach(message -> sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()))); diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java index 271d672..3b5d1dc 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -18,7 +18,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import com.amazon.sns.messaging.lib.model.RequestEntry; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; @@ -38,13 +37,11 @@ class AmazonSnsProducer extends AbstractAmazonSnsProducer { * * @param pendingRequests the shared map of pending requests keyed by request ID * @param topicRequests the shared blocking queue for topic requests - * @param executorService the executor service for async operations */ public AmazonSnsProducer( final ConcurrentMap> pendingRequests, - final BlockingQueue> topicRequests, - final ExecutorService executorService) { - super(pendingRequests, topicRequests, executorService); + final BlockingQueue> topicRequests) { + super(pendingRequests, topicRequests); } } diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index 384c62d..d206f88 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -17,15 +17,10 @@ package com.amazon.sns.messaging.lib.core; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.function.UnaryOperator; -import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; -import com.amazon.sns.messaging.lib.model.ResponseFailEntry; -import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -42,27 +37,36 @@ */ public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate { + private AmazonSnsTemplate(final Builder> builder) { + super( + new AmazonSnsProducer<>( + builder.getPendingRequests(), + builder.getTopicRequests() + ), + new AmazonSnsConsumer<>( + builder.getAmazonSnsClient(), + builder.getTopicProperty(), + builder.getObjectMapper(), + builder.getPendingRequests(), + builder.getTopicRequests(), + getExecutorService(builder.getTopicProperty(), builder.getMeterRegistry()), + builder.getPublishDecorator() + ) + ); + } + /** - * Internal constructor that wires together the producer and consumer for the v2 SDK. + * Creates a new builder for constructing an {@link AmazonSnsTemplate}. * + * @param the request entry payload type * @param amazonSnsClient the v2 {@link SnsClient} * @param topicProperty the topic configuration - * @param pendingRequests the shared map of pending requests - * @param topicRequests the shared blocking queue of requests - * @param objectMapper the Jackson ObjectMapper for payload serialization - * @param publishDecorator a decorator for the publish batch request + * @return a new builder instance */ - private AmazonSnsTemplate( + public static Builder> builder( final SnsClient amazonSnsClient, - final TopicProperty topicProperty, - final ConcurrentMap> pendingRequests, - final BlockingQueue> topicRequests, - final ObjectMapper objectMapper, - final UnaryOperator publishDecorator) { - super( - new AmazonSnsProducer<>(pendingRequests, topicRequests, ExecutorsProvider.getExecutorService()), - new AmazonSnsConsumer<>(amazonSnsClient, topicProperty, objectMapper, pendingRequests, topicRequests, AbstractAmazonSnsTemplate.getAmazonSnsThreadPoolExecutor(topicProperty), publishDecorator) - ); + final TopicProperty topicProperty) { + return new Builder<>(AmazonSnsTemplate::new, amazonSnsClient, topicProperty); } /** @@ -71,7 +75,10 @@ private AmazonSnsTemplate( * @param amazonSnsClient the v2 {@link SnsClient} * @param topicProperty the topic configuration */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty) { this(amazonSnsClient, topicProperty, UnaryOperator.identity()); } @@ -82,7 +89,11 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, new ObjectMapper(), publishDecorator); } @@ -93,7 +104,11 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param topicRequests the blocking queue for topic requests */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests) { this(amazonSnsClient, topicProperty, topicRequests, UnaryOperator.identity()); } @@ -105,7 +120,12 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param topicRequests the blocking queue for topic requests * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, topicRequests, new ObjectMapper(), publishDecorator); } @@ -116,7 +136,11 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param topicProperty the topic configuration * @param objectMapper the Jackson ObjectMapper for payload serialization */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final ObjectMapper objectMapper) { this(amazonSnsClient, topicProperty, objectMapper, UnaryOperator.identity()); } @@ -128,7 +152,12 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param objectMapper the Jackson ObjectMapper for payload serialization * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final ObjectMapper objectMapper, + final UnaryOperator publishDecorator) { this(amazonSnsClient, topicProperty, new RingBufferBlockingQueue<>(topicProperty.getMaximumPoolSize() * topicProperty.getMaxBatchSize()), objectMapper, publishDecorator); } @@ -140,7 +169,12 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param topicRequests the blocking queue for topic requests * @param objectMapper the Jackson ObjectMapper for payload serialization */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final ObjectMapper objectMapper) { + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final ObjectMapper objectMapper) { this(amazonSnsClient, topicProperty, topicRequests, objectMapper, UnaryOperator.identity()); } @@ -153,8 +187,18 @@ public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty to * @param objectMapper the Jackson ObjectMapper for payload serialization * @param publishDecorator a decorator for the publish batch request */ - public AmazonSnsTemplate(final SnsClient amazonSnsClient, final TopicProperty topicProperty, final BlockingQueue> topicRequests, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { - this(amazonSnsClient, topicProperty, new ConcurrentHashMap<>(), topicRequests, objectMapper, publishDecorator); + @Deprecated + public AmazonSnsTemplate( + final SnsClient amazonSnsClient, + final TopicProperty topicProperty, + final BlockingQueue> topicRequests, + final ObjectMapper objectMapper, + final UnaryOperator publishDecorator) { + this(AmazonSnsTemplate.builder(amazonSnsClient, topicProperty) + .topicRequests(topicRequests) + .objectMapper(objectMapper) + .publishDecorator(publishDecorator) + ); } } diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index 04e62ea..5ccf1e5 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -74,7 +74,7 @@ void before() { .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new RingBufferBlockingQueue<>(1024)); + snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).topicRequests(new RingBufferBlockingQueue<>(1024)).build(); } @Test diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java index ec206cd..18d71e7 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerSyncTest.java @@ -71,7 +71,7 @@ void before() { .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); - snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); + snsTemplate = AmazonSnsTemplate.builder(amazonSNS, topicProperty).build(); } @Test diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java index c12924a..3c1d6a1 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplateIntegrationTest.java @@ -54,6 +54,7 @@ import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import lombok.SneakyThrows; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; @@ -178,7 +179,10 @@ private AmazonSnsTemplate createTemplate( .topicArn(topicArn) .build(); - return new AmazonSnsTemplate<>(snsClient, topicProperty, new RingBufferBlockingQueue<>(1024)); + return AmazonSnsTemplate.builder(snsClient, topicProperty) + .meterRegistry(new SimpleMeterRegistry()) + .topicRequests(new RingBufferBlockingQueue<>(1024)) + .build(); } private void purgeQueue(final String queueUrl) { diff --git a/pom.xml b/pom.xml index 1f82c71..e23ce3f 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ true - 1.18.38 + 1.18.42 1.3.14 4.11.0 5.10.2 @@ -34,6 +34,7 @@ 4.5.0 2.0.6 3.0 + 1.16.3 1.20.4 @@ -47,7 +48,7 @@ 2.22.2 3.2.0 3.2.8 - 0.8.12 + 0.8.14 4.0.1.6619 0.8.0 @@ -108,6 +109,13 @@ amazon-sns-java-messaging-lib-v2 ${project.version} + + io.micrometer + micrometer-bom + ${micrometer.version} + pom + import + org.testcontainers testcontainers-bom @@ -142,6 +150,11 @@ jackson-databind ${jackson-databind.version} + + + io.micrometer + micrometer-core + org.projectlombok @@ -447,7 +460,17 @@ 21 - + + + 25 + + 25 + + + 25 + + + sonar