diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java index 6a5fc92..c05c379 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureTest.java @@ -18,6 +18,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -85,7 +86,7 @@ void testAddCallbackWithSuccessOnlyDoesNotThrowOnFail() { listenableFuture.addCallback(successCallback); - org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> listenableFuture.fail(entry)); + assertDoesNotThrow(() -> listenableFuture.fail(entry)); } @Test @@ -111,10 +112,9 @@ void testFailDoesNotInvokeSuccessCallback() { @Test void testAddCallbackDefaultFailureCallbackIsNoOp() { final boolean[] called = { false }; - final Consumer successCallback = result -> called[0] = true; final ResponseFailEntry entry = mock(ResponseFailEntry.class); - listenableFuture.addCallback(successCallback); + listenableFuture.addCallback(successCallback -> called[0] = true); listenableFuture.fail(entry); assertThat(called[0], is(false)); diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index a51f9cb..f689af7 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -17,6 +17,7 @@ package com.amazon.sns.messaging.lib.core; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -114,15 +115,16 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); - publishBatchRequest.getPublishBatchRequestEntries().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); - listenableFuture.fail(ResponseFailEntry.builder() - .withId(entry.getId()) - .withCode(code) - .withMessage(message) - .withSenderFault(true) - .build()); - }); + publishBatchRequest.getPublishBatchRequestEntries().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.getId())).ifPresent(listenableFuture -> + listenableFuture.fail(ResponseFailEntry.builder() + .withId(entry.getId()) + .withCode(code) + .withMessage(message) + .withSenderFault(true) + .build()) + ) + ); } /** @@ -130,24 +132,26 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final */ @Override protected void handleResponse(final PublishBatchResult publishBatchResult) { - publishBatchResult.getSuccessful().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); - listenableFuture.success(ResponseSuccessEntry.builder() - .withId(entry.getId()) - .withMessageId(entry.getMessageId()) - .withSequenceNumber(entry.getSequenceNumber()) - .build()); - }); - - publishBatchResult.getFailed().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); - listenableFuture.fail(ResponseFailEntry.builder() - .withId(entry.getId()) - .withCode(entry.getCode()) - .withMessage(entry.getMessage()) - .withSenderFault(entry.getSenderFault()) - .build()); - }); + publishBatchResult.getSuccessful().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.getId())).ifPresent(listenableFuture -> + listenableFuture.success(ResponseSuccessEntry.builder() + .withId(entry.getId()) + .withMessageId(entry.getMessageId()) + .withSequenceNumber(entry.getSequenceNumber()) + .build()) + ) + ); + + publishBatchResult.getFailed().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.getId())).ifPresent(listenableFuture -> + listenableFuture.fail(ResponseFailEntry.builder() + .withId(entry.getId()) + .withCode(entry.getCode()) + .withMessage(entry.getMessage()) + .withSenderFault(entry.getSenderFault()) + .build()) + ) + ); } } diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index 5f88583..ffbd720 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -17,6 +17,7 @@ package com.amazon.sns.messaging.lib.core; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -116,15 +117,16 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); - publishBatchRequest.publishBatchRequestEntries().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); - listenableFuture.fail(ResponseFailEntry.builder() - .withId(entry.id()) - .withCode(code) - .withMessage(message) - .withSenderFault(true) - .build()); - }); + publishBatchRequest.publishBatchRequestEntries().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.id())).ifPresent(listenableFuture -> + listenableFuture.fail(ResponseFailEntry.builder() + .withId(entry.id()) + .withCode(code) + .withMessage(message) + .withSenderFault(true) + .build()) + ) + ); } /** @@ -132,24 +134,26 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final */ @Override protected void handleResponse(final PublishBatchResponse publishBatchResult) { - publishBatchResult.successful().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); - listenableFuture.success(ResponseSuccessEntry.builder() - .withId(entry.id()) - .withMessageId(entry.messageId()) - .withSequenceNumber(entry.sequenceNumber()) - .build()); - }); - - publishBatchResult.failed().forEach(entry -> { - final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); - listenableFuture.fail(ResponseFailEntry.builder() - .withId(entry.id()) - .withCode(entry.code()) - .withMessage(entry.message()) - .withSenderFault(entry.senderFault()) - .build()); - }); + publishBatchResult.successful().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.id())).ifPresent(listenableFuture -> + listenableFuture.success(ResponseSuccessEntry.builder() + .withId(entry.id()) + .withMessageId(entry.messageId()) + .withSequenceNumber(entry.sequenceNumber()) + .build()) + ) + ); + + publishBatchResult.failed().forEach(entry -> + Optional.ofNullable(pendingRequests.remove(entry.id())).ifPresent(listenableFuture -> + listenableFuture.fail(ResponseFailEntry.builder() + .withId(entry.id()) + .withCode(entry.code()) + .withMessage(entry.message()) + .withSenderFault(entry.senderFault()) + .build()) + ) + ); } }