From cce458cdff23453e7705474cb1a75219bab451f6 Mon Sep 17 00:00:00 2001 From: nachatz Date: Tue, 13 Jan 2026 14:33:55 -0800 Subject: [PATCH 1/3] feat: add individual negative acknowledgement --- pulsar/asyncio.py | 28 ++++++++++++++++++++++++++++ src/consumer.cc | 12 ++++++++++++ tests/asyncio_test.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 5c3178a..e9bb34a 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -320,6 +320,34 @@ async def acknowledge_cumulative( ) await future + async def negative_acknowledge( + self, + message: Union[pulsar.Message, pulsar.MessageId, + _pulsar.Message, _pulsar.MessageId] + ) -> None: + """ + Acknowledge the failure to process a single message asynchronously. + + When a message is "negatively acked" it will be marked for redelivery after + some fixed delay. The delay is configurable when constructing the consumer + with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}. + + This call is not blocking. + + Parameters + ---------- + + message: + The received message or message id. + """ + future = asyncio.get_running_loop().create_future() + if isinstance(message, pulsar.Message): + msg = message._message + else: + msg = message + self._consumer.negative_acknowledge_async(msg, functools.partial(_set_future, future, value=None)) + await future + async def unsubscribe(self) -> None: """ Unsubscribe the current consumer from the topic asynchronously. diff --git a/src/consumer.cc b/src/consumer.cc index f1d7367..170741e 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -133,6 +133,16 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me consumer.acknowledgeCumulativeAsync(msgId, callback); } +void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { + py::gil_scoped_release release; + consumer.negativeAcknowledgeAsync(msg, callback); +} + +void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) { + py::gil_scoped_release release; + consumer.negativeAcknowledgeAsync(msg, callback); +} + void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) { py::gil_scoped_release release; consumer.closeAsync(callback); @@ -183,6 +193,8 @@ void export_consumer(py::module_& m) { .def("acknowledge_async", &Consumer_acknowledgeAsync_message_id) .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync) .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id) + .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync) + .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id) .def("close_async", &Consumer_closeAsync) .def("unsubscribe_async", &Consumer_unsubscribeAsync) .def("seek_async", &Consumer_seekAsync) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 048dc43..66ff0fd 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -203,6 +203,36 @@ async def test_consumer_individual_acknowledge(self): msg = await consumer.receive() self.assertEqual(msg.data(), b'msg-3') + async def test_consumer_negative_acknowledge(self): + topic = f'asyncio-test-consumer-negative-ack-{time.time()}' + sub = 'sub' + consumer = await self._client.subscribe(topic, sub, + consumer_type=pulsar.ConsumerType.Shared, + negative_ack_redelivery_delay_ms=100) + + producer = await self._client.create_producer(topic) + await self._prepare_messages(producer) + msgs = [] + for _ in range(5): + msg = await consumer.receive() + msgs.append(msg) + + await consumer.acknowledge(msgs[1]) + await consumer.acknowledge(msgs[3]) + + await consumer.negative_acknowledge(msgs[0]) + await consumer.negative_acknowledge(msgs[2]) + await consumer.negative_acknowledge(msgs[4]) + await asyncio.sleep(0.2) + + received = [] + for _ in range(3): + msg = await consumer.receive() + received.append(msg.data()) + + self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4']) + await consumer.close() + async def test_multi_topic_consumer(self): topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2'] producers = [] From 34ca90c7424d7ee793f8efc24444f8d785639bc0 Mon Sep 17 00:00:00 2001 From: nachatz Date: Tue, 13 Jan 2026 17:45:50 -0800 Subject: [PATCH 2/3] refactor: leverage asyncio to thread --- pulsar/asyncio.py | 24 ++++-------------------- src/consumer.cc | 4 ++-- 2 files changed, 6 insertions(+), 22 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index e9bb34a..13a82df 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -322,31 +322,15 @@ async def acknowledge_cumulative( async def negative_acknowledge( self, - message: Union[pulsar.Message, pulsar.MessageId, - _pulsar.Message, _pulsar.MessageId] + message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId] ) -> None: - """ - Acknowledge the failure to process a single message asynchronously. - - When a message is "negatively acked" it will be marked for redelivery after - some fixed delay. The delay is configurable when constructing the consumer - with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}. - - This call is not blocking. - - Parameters - ---------- - - message: - The received message or message id. - """ - future = asyncio.get_running_loop().create_future() if isinstance(message, pulsar.Message): msg = message._message + elif isinstance(message, pulsar.MessageId): + msg = message._msg_id else: msg = message - self._consumer.negative_acknowledge_async(msg, functools.partial(_set_future, future, value=None)) - await future + await asyncio.to_thread(self._consumer.negative_acknowledge, msg) async def unsubscribe(self) -> None: """ diff --git a/src/consumer.cc b/src/consumer.cc index 170741e..fa52720 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -135,12 +135,12 @@ void Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message& msg, ResultCallback callback) { py::gil_scoped_release release; - consumer.negativeAcknowledgeAsync(msg, callback); + consumer.negativeAcknowledge(msg); } void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const MessageId& msgId, ResultCallback callback) { py::gil_scoped_release release; - consumer.negativeAcknowledgeAsync(msg, callback); + consumer.negativeAcknowledge(msgId); } void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) { From 784b1ed15d7d21e56ea032796d775954d6c46012 Mon Sep 17 00:00:00 2001 From: nachatz Date: Tue, 13 Jan 2026 17:49:41 -0800 Subject: [PATCH 3/3] docs: add docstring back: --- pulsar/asyncio.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 13a82df..064e353 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -324,6 +324,19 @@ async def negative_acknowledge( self, message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message, _pulsar.MessageId] ) -> None: + """ + Acknowledge the failure to process a single message asynchronously. + + When a message is "negatively acked" it will be marked for redelivery after + some fixed delay. The delay is configurable when constructing the consumer + with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}. + This call is not blocking. + + Parameters + ---------- + message: + The received message or message id. + """ if isinstance(message, pulsar.Message): msg = message._message elif isinstance(message, pulsar.MessageId):