Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci-maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [8, 11, 17, 21]
java-version: [8, 11, 17, 21, 25]
steps:
- name: Checkout repository
uses: actions/checkout@v6
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
java-version: [8, 11, 17, 21]
java-version: [8, 11, 17, 21, 25]
steps:
- name: Checkout repository
uses: actions/checkout@v6
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Combine multiple requests to optimally utilise the network.

Article [Martin Fowler](https://martinfowler.com) [Request Batch](https://martinfowler.com/articles/patterns-of-distributed-systems/request-batch.html)

_**Compatible JDK 8, 11, 17 and 21**_
_**Compatible JDK 8, 11, 17, 21 and 25**_

_**Compatible AWS JDK v1 >= 1.12**_

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,96 +16,26 @@

package com.amazon.sns.messaging.lib.concurrent;

import java.util.Objects;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// @formatter:off
/**
* A custom {@link ThreadPoolExecutor} that tracks active, failed, and succeeded task counts.
* Uses a blocking submission policy to handle rejection and a synchronous queue for task handoff.
* A {@link ThreadPoolExecutor} configured for Amazon SNS publishing. Uses a
* {@link SynchronousQueue} with zero core threads, allowing threads to be created
* on demand up to the specified maximum pool size. Tasks that cannot be accepted
* immediately by the queue will block up to 30 seconds via {@link BlockingSubmissionPolicy}.
*/
public class AmazonSnsThreadPoolExecutor extends ThreadPoolExecutor {

private final AtomicInteger activeTaskCount = new AtomicInteger();

private final AtomicInteger failedTaskCount = new AtomicInteger();

private final AtomicInteger succeededTaskCount = new AtomicInteger();

/**
* Creates a new executor with the specified maximum pool size.
* Creates a new thread pool executor with the given maximum pool size.
*
* @param maximumPoolSize the maximum number of threads to allow in the pool
* @param maximumPoolSize the maximum number of threads allowed in the pool
*/
public AmazonSnsThreadPoolExecutor(final int maximumPoolSize) {
super(0, maximumPoolSize, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), ThreadFactoryProvider.getThreadFactory(), new BlockingSubmissionPolicy(30000));
}

/**
* Returns the number of currently active tasks.
*
* @return the active task count
*/
public int getActiveTaskCount() {
return activeTaskCount.get();
}

/**
* Returns the number of tasks that have failed.
*
* @return the failed task count
*/
public int getFailedTaskCount() {
return failedTaskCount.get();
}

/**
* Returns the number of tasks that have completed successfully.
*
* @return the succeeded task count
*/
public int getSucceededTaskCount() {
return succeededTaskCount.get();
}

/**
* Returns the current size of the task queue.
*
* @return the queue size
*/
public int getQueueSize() {
return getQueue().size();
}

/**
* {@inheritDoc}
*/
@Override
protected void beforeExecute(final Thread thread, final Runnable runnable) {
try {
super.beforeExecute(thread, runnable);
} finally {
activeTaskCount.incrementAndGet();
}
}

/**
* {@inheritDoc}
*/
@Override
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
try {
super.afterExecute(runnable, throwable);
} finally {
if (Objects.nonNull(throwable)) {
failedTaskCount.incrementAndGet();
} else {
succeededTaskCount.incrementAndGet();
}
activeTaskCount.decrementAndGet();
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ public int drainTo(final Collection<? super E> collection, final int maxElements
throw new UnsupportedOperationException();
}

/**
* Internal entry wrapper that holds a value within the ring buffer.
*
* @param <E> the type of the value
*/
@Getter
@Setter
static class Entry<E> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,14 @@
*/
abstract class AbstractAmazonSnsConsumer<C, R, O, E> implements Runnable {

/**
* Kilobyte constant used for size calculations.
*/
private static final Integer KB = 1024;

/**
* Maximum batch size threshold of 256 KB imposed by Amazon SNS.
*/
private static final Integer BATCH_SIZE_BYTES_THRESHOLD = 256 * AbstractAmazonSnsConsumer.KB;

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsConsumer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,9 @@

package com.amazon.sns.messaging.lib.core;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicReference;

import com.amazon.sns.messaging.lib.model.RequestEntry;
import com.amazon.sns.messaging.lib.model.ResponseFailEntry;
Expand All @@ -44,14 +39,12 @@
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class AbstractAmazonSnsProducer<E> {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class);
private final AtomicReference<State> state = new AtomicReference<>(State.RUNNIG);

private final ConcurrentMap<String, ListenableFuture<ResponseSuccessEntry, ResponseFailEntry>> pendingRequests;

private final BlockingQueue<RequestEntry<E>> topicRequests;

private final ExecutorService executorService;

/**
* Sends a request entry by enqueuing it for batch processing.
*
Expand All @@ -60,23 +53,29 @@ abstract class AbstractAmazonSnsProducer<E> {
*/
@SneakyThrows
public ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> send(final RequestEntry<E> requestEntry) {
return enqueueRequest(requestEntry);
if (State.RUNNIG.equals(state.get())) {
return enqueueRequest(requestEntry);
} else {
final ListenableFutureImpl listenableFutureImpl = new ListenableFutureImpl();

listenableFutureImpl.fail(ResponseFailEntry.builder()
.withCode("000")
.withId(requestEntry.getId())
.withMessage(String.format("Producer is currently in %s mode; no further messages will be accepted.", state.get().name()))
.withSenderFault(true)
.build()
);

return listenableFutureImpl;
}
}

/**
* Shuts down the producer's executor service gracefully, waiting up to 60 seconds
* for termination.
* Transitions the producer to the shutdown state. No further messages will be
* accepted once shutdown.
*/
@SneakyThrows
public void shutdown() {
LOGGER.warn("Shutdown producer {}", getClass().getSimpleName());

executorService.shutdown();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
LOGGER.warn("Executor service did not terminate in the specified time.");
final List<Runnable> droppedTasks = executorService.shutdownNow();
LOGGER.warn("Executor service was abruptly shut down. {} tasks will not be executed.", droppedTasks.size());
}
state.compareAndSet(State.RUNNIG, State.SHUTDOWN);
}

/**
Expand All @@ -94,5 +93,12 @@ private ListenableFuture<ResponseSuccessEntry, ResponseFailEntry> enqueueRequest
return trackPendingRequest;
}

/**
* Lifecycle states of the producer.
*/
enum State {
RUNNIG, SHUTDOWN
}

}
// @formatter:on
Loading