diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index ed4abb7..72d443b 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -301,6 +301,11 @@ private Optional createBatch(final BlockingQueue> requests) { final String message = String.format("The maximum allowed message size exceeding 256KB (262,144 bytes). Payload: %s, Headers: %s", stringPayload, request.getMessageHeaders()); handleError(publishBatchRequest, new MaximumAllowedMessageException(message, requests.take())); + + // This entry was rejected and already removed from the queue above; its size + // must NOT be folded into batchSizeBytes, or it would wrongly cut the batch + // short even when smaller, valid entries are still waiting right behind it. + continue; } if (canAddPayload(batchSizeBytes.addAndGet(messageSize))) { diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java index 9c0068e..d4bc6d6 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java @@ -16,6 +16,8 @@ package com.amazon.sns.messaging.lib.core; +import static java.util.function.Function.identity; + import java.util.function.Consumer; import com.amazon.sns.messaging.lib.model.ResponseFailEntry; @@ -45,7 +47,7 @@ public interface ListenableFuture { * @param successCallback the callback to invoke on success */ default void addCallback(final Consumer successCallback) { - addCallback(successCallback, result -> { }); + addCallback(successCallback, identity()::apply); } /** diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java index 7d8f085..443b138 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java @@ -16,8 +16,10 @@ package com.amazon.sns.messaging.lib.core; +import static java.util.function.Function.identity; + import java.util.LinkedList; -import java.util.Objects; +import java.util.Optional; import java.util.Queue; import java.util.function.Consumer; @@ -54,8 +56,8 @@ class ListenableFutureImpl implements ListenableFuture successCallback, final Consumer failureCallback) { synchronized (mutex) { - final Consumer success = Objects.nonNull(successCallback) ? successCallback : result -> { }; - final Consumer failure = Objects.nonNull(failureCallback) ? failureCallback : result -> { }; + final Consumer success = Optional.ofNullable(successCallback).orElse(identity()::apply); + final Consumer failure = Optional.ofNullable(failureCallback).orElse(identity()::apply); switch (state) { case NEW: