From ce94a0da39fed8f053dd4720640abc66f0296c03 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Fri, 26 Jun 2026 18:51:05 -0400 Subject: [PATCH 1/6] feat(python): add topic listing, update, delete and purge --- foreign/python/apache_iggy.pyi | 55 +++ foreign/python/src/client.rs | 123 ++++++- foreign/python/src/lib.rs | 3 +- foreign/python/src/topic.rs | 47 +++ foreign/python/tests/test_topic.py | 548 ++++++++++++++++++++++++++++- 5 files changed, 773 insertions(+), 3 deletions(-) diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 4340949c72..157f714206 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -34,6 +34,7 @@ __all__ = [ "ReceiveMessage", "SendMessage", "StreamDetails", + "Topic", "TopicDetails", ] @@ -302,6 +303,45 @@ class IggyClient: Gets topic by stream and id. Returns Option of topic details or a PyRuntimeError on failure. """ + def get_topics( + self, stream_id: builtins.str | builtins.int + ) -> collections.abc.Awaitable[list[Topic]]: + r""" + Gets all topics in the given stream. + Returns a list of topics or a PyRuntimeError on failure. + """ + def update_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + name: builtins.str, + compression_algorithm: builtins.str | None = None, + replication_factor: builtins.int | None = None, + message_expiry: datetime.timedelta | None = None, + max_topic_size: builtins.int | None = None, + ) -> collections.abc.Awaitable[None]: + r""" + Updates an existing topic with the given parameters. + Returns Ok(()) on successful topic update or a PyRuntimeError on failure. + """ + def delete_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + ) -> collections.abc.Awaitable[None]: + r""" + Deletes the topic with the given id from the given stream. + Returns Ok(()) on successful topic deletion or a PyRuntimeError on failure. + """ + def purge_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + ) -> collections.abc.Awaitable[None]: + r""" + Purges all messages from the topic with the given id in the given stream. + Returns Ok(()) on successful topic purge or a PyRuntimeError on failure. + """ def send_messages( self, stream: builtins.str | builtins.int, @@ -519,6 +559,17 @@ class StreamDetails: @property def topics_count(self) -> builtins.int: ... +@typing.final +class Topic: + @property + def id(self) -> builtins.int: ... + @property + def name(self) -> builtins.str: ... + @property + def messages_count(self) -> builtins.int: ... + @property + def partitions_count(self) -> builtins.int: ... + @typing.final class TopicDetails: @property @@ -529,3 +580,7 @@ class TopicDetails: def messages_count(self) -> builtins.int: ... @property def partitions_count(self) -> builtins.int: ... + @property + def compression_algorithm(self) -> builtins.str: ... + @property + def replication_factor(self) -> builtins.int: ... diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index 0c19c10aee..33c8aec2fc 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -33,7 +33,7 @@ use crate::identifier::PyIdentifier; use crate::receive_message::{PollingStrategy, ReceiveMessage}; use crate::send_message::SendMessage; use crate::stream::StreamDetails; -use crate::topic::TopicDetails; +use crate::topic::{Topic, TopicDetails}; use tokio::sync::Mutex; /// A Python class representing the Iggy client. @@ -243,6 +243,127 @@ impl IggyClient { }) } + /// Gets all topics in the given stream. + /// Returns a list of topics or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[Topic]]", imports=("collections.abc")))] + fn get_topics<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + let topics = inner + .get_topics(&stream_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(topics.into_iter().map(Topic::from).collect::>()) + }) + } + + /// Updates an existing topic with the given parameters. + /// Returns Ok(()) on successful topic update or a PyRuntimeError on failure. + #[pyo3( + signature = (stream_id, topic_id, name, compression_algorithm = None, replication_factor = None, message_expiry = None, max_topic_size = None) + )] + #[allow(clippy::too_many_arguments)] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn update_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + name: String, + #[gen_stub(override_type(type_repr = "builtins.str | None"))] compression_algorithm: Option< + String, + >, + #[gen_stub(override_type(type_repr = "builtins.int | None"))] replication_factor: Option< + u8, + >, + #[gen_stub(override_type(type_repr = "datetime.timedelta | None", imports=("datetime")))] + message_expiry: Option>, + #[gen_stub(override_type(type_repr = "builtins.int | None"))] max_topic_size: Option, + ) -> PyResult> { + let compression_algorithm = match compression_algorithm { + Some(algo) => CompressionAlgorithm::from_str(&algo) + .map_err(|e| PyErr::new::(e.to_string()))?, + None => CompressionAlgorithm::default(), + }; + + let expiry = match message_expiry { + Some(delta) => IggyExpiry::ExpireDuration(py_delta_to_iggy_duration(&delta)), + None => IggyExpiry::ServerDefault, + }; + + let max_size = max_topic_size.map_or(MaxTopicSize::ServerDefault, MaxTopicSize::from); + + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .update_topic( + &stream_id, + &topic_id, + &name, + compression_algorithm, + replication_factor, + expiry, + max_size, + ) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + + /// Deletes the topic with the given id from the given stream. + /// Returns Ok(()) on successful topic deletion or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn delete_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .delete_topic(&stream_id, &topic_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + + /// Purges all messages from the topic with the given id in the given stream. + /// Returns Ok(()) on successful topic purge or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn purge_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .purge_topic(&stream_id, &topic_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + /// Sends a list of messages to the specified topic. /// Returns Ok(()) on successful sending or a PyRuntimeError on failure. #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs index d7b675ce0d..59b62608ba 100644 --- a/foreign/python/src/lib.rs +++ b/foreign/python/src/lib.rs @@ -29,7 +29,7 @@ use pyo3::prelude::*; use receive_message::{PollingStrategy, ReceiveMessage}; use send_message::SendMessage; use stream::StreamDetails; -use topic::TopicDetails; +use topic::{Topic, TopicDetails}; /// A Python module implemented in Rust. #[pymodule] @@ -38,6 +38,7 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/foreign/python/src/topic.rs b/foreign/python/src/topic.rs index 6784b6e4ee..9a4af870c2 100644 --- a/foreign/python/src/topic.rs +++ b/foreign/python/src/topic.rs @@ -15,10 +15,47 @@ // specific language governing permissions and limitations // under the License. +use iggy::prelude::Topic as RustTopic; use iggy::prelude::TopicDetails as RustTopicDetails; use pyo3::prelude::*; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +#[gen_stub_pyclass] +#[pyclass] +pub struct Topic { + pub(crate) inner: RustTopic, +} + +impl From for Topic { + fn from(topic: RustTopic) -> Self { + Self { inner: topic } + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl Topic { + #[getter] + pub fn id(&self) -> u32 { + self.inner.id + } + + #[getter] + pub fn name(&self) -> String { + self.inner.name.to_string() + } + + #[getter] + pub fn messages_count(&self) -> u64 { + self.inner.messages_count + } + + #[getter] + pub fn partitions_count(&self) -> u32 { + self.inner.partitions_count + } +} + #[gen_stub_pyclass] #[pyclass] pub struct TopicDetails { @@ -55,4 +92,14 @@ impl TopicDetails { pub fn partitions_count(&self) -> u32 { self.inner.partitions_count } + + #[getter] + pub fn compression_algorithm(&self) -> String { + self.inner.compression_algorithm.to_string() + } + + #[getter] + pub fn replication_factor(&self) -> u8 { + self.inner.replication_factor + } } diff --git a/foreign/python/tests/test_topic.py b/foreign/python/tests/test_topic.py index 76f60b0a44..58eca2a5e8 100644 --- a/foreign/python/tests/test_topic.py +++ b/foreign/python/tests/test_topic.py @@ -19,7 +19,7 @@ import pytest -from apache_iggy import IggyClient +from apache_iggy import IggyClient, SendMessage from .utils import get_server_config, wait_for_ping, wait_for_server @@ -588,3 +588,549 @@ async def test_create_topic_before_login_fails(self, unique_name): await client.create_topic( stream=unique_name(), name=unique_name(), partitions_count=1 ) + + +class TestGetTopics: + """Test listing topics in a stream via get_topics.""" + + @pytest.mark.asyncio + async def test_get_topics_in_empty_stream_returns_empty_list( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics returns an empty list for a stream with no topics.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + topics = await iggy_client.get_topics(stream_name) + assert topics == [] + + @pytest.mark.asyncio + @pytest.mark.parametrize("topic_count", [1, 3, 5]) + async def test_get_topics_returns_all_created_topics( + self, iggy_client: IggyClient, unique_name, topic_count: int + ): + """Test get_topics returns every topic created in the stream.""" + stream_name = unique_name() + topic_names = {unique_name() for _ in range(topic_count)} + + await iggy_client.create_stream(stream_name) + for name in topic_names: + await iggy_client.create_topic( + stream=stream_name, name=name, partitions_count=1 + ) + + topics = await iggy_client.get_topics(stream_name) + assert len(topics) == topic_count + assert {topic.name for topic in topics} == topic_names + + @pytest.mark.asyncio + async def test_get_topics_accepts_numeric_stream_id( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics works when the stream is referenced by numeric id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + stream = await iggy_client.get_stream(stream_name) + assert stream is not None + + topics_by_name = await iggy_client.get_topics(stream_name) + topics_by_id = await iggy_client.get_topics(stream.id) + assert len(topics_by_name) == 1 + assert len(topics_by_id) == 1 + assert topics_by_id[0].name == topic_name + + @pytest.mark.asyncio + async def test_get_topics_in_nonexistent_stream_returns_empty( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics returns an empty list for a non-existent stream.""" + topics = await iggy_client.get_topics(unique_name()) + assert topics == [] + + @pytest.mark.asyncio + async def test_get_topics_before_connect_fails(self, unique_name): + """Test get_topics requires an established connection.""" + host, port = get_server_config() + client = IggyClient(f"{host}:{port}") + + with pytest.raises(RuntimeError): + await client.get_topics(unique_name()) + + @pytest.mark.asyncio + async def test_get_topics_before_login_fails(self, unique_name): + """Test get_topics requires authentication.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + await client.connect() + + with pytest.raises(RuntimeError): + await client.get_topics(unique_name()) + + +class TestUpdateTopic: + """Test updating topics via update_topic.""" + + @pytest.mark.asyncio + async def test_update_topic_renames_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic renames a topic; old name no longer resolves.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=new_name + ) + + renamed = await iggy_client.get_topic(stream_name, new_name) + assert renamed is not None + assert renamed.name == new_name + + old = await iggy_client.get_topic(stream_name, topic_name) + assert old is None + + @pytest.mark.asyncio + async def test_update_topic_preserves_id( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic keeps the same numeric id after a rename.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + before = await iggy_client.get_topic(stream_name, topic_name) + assert before is not None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=new_name + ) + + after = await iggy_client.get_topic(stream_name, new_name) + assert after is not None + assert after.id == before.id + + @pytest.mark.asyncio + async def test_update_topic_by_numeric_id( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic accepts a numeric topic id.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic.id, name=new_name + ) + + renamed = await iggy_client.get_topic(stream_name, new_name) + assert renamed is not None + assert renamed.name == new_name + + @pytest.mark.asyncio + @pytest.mark.parametrize("compression_algorithm", ["gzip", "none"]) + async def test_update_topic_with_valid_compression_algorithm( + self, iggy_client: IggyClient, unique_name, compression_algorithm: str + ): + """Test update_topic accepts a supported compression algorithm.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + compression_algorithm=compression_algorithm, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + assert topic.compression_algorithm == compression_algorithm + + @pytest.mark.asyncio + @pytest.mark.parametrize("compression_algorithm", ["brotli", "gzipp", ""]) + async def test_update_topic_invalid_compression_algorithm( + self, iggy_client: IggyClient, unique_name, compression_algorithm: str + ): + """Test update_topic rejects unsupported compression algorithm values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(RuntimeError, match="Unknown compression type"): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + compression_algorithm=compression_algorithm, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("invalid_message_expiry", [1, "1s", object()]) + async def test_update_topic_invalid_message_expiry( + self, iggy_client: IggyClient, unique_name, invalid_message_expiry + ): + """Test update_topic rejects message_expiry values that are not timedeltas.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(TypeError): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + message_expiry=invalid_message_expiry, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("replication_factor", [0, 1, 255]) + async def test_update_topic_with_valid_replication_factor( + self, iggy_client: IggyClient, unique_name, replication_factor: int + ): + """Test update_topic accepts a supported replication factor.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + replication_factor=replication_factor, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + # The server normalizes a replication factor of 0 to 1. + assert topic.replication_factor == (replication_factor or 1) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("replication_factor", "expected_exception"), + [ + (-1, OverflowError), + (256, OverflowError), + ("1", TypeError), + (1.0, TypeError), + ], + ) + async def test_update_topic_invalid_replication_factor( + self, + iggy_client: IggyClient, + unique_name, + replication_factor, + expected_exception, + ): + """Test update_topic rejects invalid replication factor values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(expected_exception): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + replication_factor=replication_factor, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("max_topic_size", "expected_exception"), + [ + (-1, OverflowError), + (2e64, TypeError), + ], + ) + async def test_update_topic_invalid_max_topic_size( + self, + iggy_client: IggyClient, + unique_name, + max_topic_size, + expected_exception, + ): + """Test update_topic rejects invalid maximum topic size values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(expected_exception): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + max_topic_size=max_topic_size, + ) + + @pytest.mark.asyncio + async def test_update_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic raises for a non-existent topic.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.update_topic( + stream_id=stream_name, topic_id=unique_name(), name=unique_name() + ) + + @pytest.mark.asyncio + async def test_update_topic_before_connect_fails(self, unique_name): + """Test update_topic requires an established connection.""" + host, port = get_server_config() + client = IggyClient(f"{host}:{port}") + + with pytest.raises(RuntimeError): + await client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + + @pytest.mark.asyncio + async def test_update_topic_before_login_fails(self, unique_name): + """Test update_topic requires authentication.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + await client.connect() + + with pytest.raises(RuntimeError): + await client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + + +class TestDeleteTopic: + """Test deleting topics via delete_topic.""" + + @pytest.mark.asyncio + async def test_delete_topic_removes_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic removes the topic and drops the stream count.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_name) + + assert await iggy_client.get_topic(stream_name, topic_name) is None + assert await iggy_client.get_topics(stream_name) == [] + + stream = await iggy_client.get_stream(stream_name) + assert stream is not None + assert stream.topics_count == 0 + + @pytest.mark.asyncio + async def test_delete_topic_by_numeric_id( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic accepts a numeric topic id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + + await iggy_client.delete_topic(stream_name, topic.id) + + assert await iggy_client.get_topic(stream_name, topic_name) is None + + @pytest.mark.asyncio + async def test_delete_topic_leaves_other_topics( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic removes only the targeted topic.""" + stream_name = unique_name() + topic_to_delete = unique_name() + topic_to_keep = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_to_delete, partitions_count=1 + ) + await iggy_client.create_topic( + stream=stream_name, name=topic_to_keep, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_to_delete) + + remaining = await iggy_client.get_topics(stream_name) + assert len(remaining) == 1 + assert remaining[0].name == topic_to_keep + + @pytest.mark.asyncio + async def test_delete_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic raises for a non-existent topic.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.delete_topic(stream_name, unique_name()) + + @pytest.mark.asyncio + async def test_delete_topic_before_connect_fails(self, unique_name): + """Test delete_topic requires an established connection.""" + host, port = get_server_config() + client = IggyClient(f"{host}:{port}") + + with pytest.raises(RuntimeError): + await client.delete_topic(unique_name(), unique_name()) + + @pytest.mark.asyncio + async def test_delete_topic_before_login_fails(self, unique_name): + """Test delete_topic requires authentication.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + await client.connect() + + with pytest.raises(RuntimeError): + await client.delete_topic(unique_name(), unique_name()) + + +class TestPurgeTopic: + """Test purging topic messages via purge_topic.""" + + @pytest.mark.asyncio + async def test_purge_topic_clears_messages_but_keeps_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic empties the topic while leaving it in place.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + messages = [SendMessage(f"payload-{index}") for index in range(5)] + await iggy_client.send_messages(stream_name, topic_name, 0, messages) + + before = await iggy_client.get_topic(stream_name, topic_name) + assert before is not None + assert before.messages_count == 5 + + await iggy_client.purge_topic(stream_name, topic_name) + + after = await iggy_client.get_topic(stream_name, topic_name) + assert after is not None + assert after.messages_count == 0 + + @pytest.mark.asyncio + async def test_purge_empty_topic_succeeds( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic is a no-op on a topic with no messages.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.purge_topic(stream_name, topic_name) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.messages_count == 0 + + @pytest.mark.asyncio + async def test_purge_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic raises for a non-existent topic.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.purge_topic(stream_name, unique_name()) + + @pytest.mark.asyncio + async def test_purge_topic_before_connect_fails(self, unique_name): + """Test purge_topic requires an established connection.""" + host, port = get_server_config() + client = IggyClient(f"{host}:{port}") + + with pytest.raises(RuntimeError): + await client.purge_topic(unique_name(), unique_name()) + + @pytest.mark.asyncio + async def test_purge_topic_before_login_fails(self, unique_name): + """Test purge_topic requires authentication.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + await client.connect() + + with pytest.raises(RuntimeError): + await client.purge_topic(unique_name(), unique_name()) From 93042e364ab068ae1befaf074c751fa259ebfd74 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Sat, 27 Jun 2026 07:20:05 -0400 Subject: [PATCH 2/6] refactor(python): merge Topic and TopicDetails imports into one use --- foreign/python/src/topic.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/foreign/python/src/topic.rs b/foreign/python/src/topic.rs index 9a4af870c2..7d8b6b69b2 100644 --- a/foreign/python/src/topic.rs +++ b/foreign/python/src/topic.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use iggy::prelude::Topic as RustTopic; -use iggy::prelude::TopicDetails as RustTopicDetails; +use iggy::prelude::{Topic as RustTopic, TopicDetails as RustTopicDetails}; use pyo3::prelude::*; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; From 98f00e1cc76d7c01930b7ed5809d84ca408a22e1 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Sat, 27 Jun 2026 09:03:13 -0400 Subject: [PATCH 3/6] test(python): expand topic management test coverage --- foreign/python/tests/test_topic.py | 241 ++++++++++++++++++++++++----- 1 file changed, 202 insertions(+), 39 deletions(-) diff --git a/foreign/python/tests/test_topic.py b/foreign/python/tests/test_topic.py index 58eca2a5e8..bbda2562af 100644 --- a/foreign/python/tests/test_topic.py +++ b/foreign/python/tests/test_topic.py @@ -621,8 +621,33 @@ async def test_get_topics_returns_all_created_topics( ) topics = await iggy_client.get_topics(stream_name) - assert len(topics) == topic_count + assert len(topics) == len(topic_names) assert {topic.name for topic in topics} == topic_names + # Only assert fields supplied at creation or deterministically set by + # the server: each topic keeps its partition count and a fresh topic + # holds no messages. The numeric id is server-assigned and not checked. + assert all(topic.partitions_count == 1 for topic in topics) + assert all(topic.messages_count == 0 for topic in topics) + + @pytest.mark.asyncio + async def test_get_topics_returns_same_result_when_called_repeatedly( + self, iggy_client: IggyClient, unique_name + ): + """Test repeated get_topics calls return a stable view of the stream.""" + stream_name = unique_name() + topic_names = {unique_name() for _ in range(3)} + + await iggy_client.create_stream(stream_name) + for name in topic_names: + await iggy_client.create_topic( + stream=stream_name, name=name, partitions_count=1 + ) + + first = await iggy_client.get_topics(stream_name) + second = await iggy_client.get_topics(stream_name) + assert {topic.name for topic in first} == topic_names + assert {topic.name for topic in second} == topic_names + assert {topic.id for topic in first} == {topic.id for topic in second} @pytest.mark.asyncio async def test_get_topics_accepts_numeric_stream_id( @@ -654,23 +679,16 @@ async def test_get_topics_in_nonexistent_stream_returns_empty( assert topics == [] @pytest.mark.asyncio - async def test_get_topics_before_connect_fails(self, unique_name): - """Test get_topics requires an established connection.""" + async def test_get_topics_requires_connection_and_auth(self, unique_name): + """Test get_topics fails both before connecting and before logging in.""" host, port = get_server_config() - client = IggyClient(f"{host}:{port}") + wait_for_server(host, port) + client = IggyClient(f"{host}:{port}") with pytest.raises(RuntimeError): await client.get_topics(unique_name()) - @pytest.mark.asyncio - async def test_get_topics_before_login_fails(self, unique_name): - """Test get_topics requires authentication.""" - host, port = get_server_config() - wait_for_server(host, port) - - client = IggyClient(f"{host}:{port}") await client.connect() - with pytest.raises(RuntimeError): await client.get_topics(unique_name()) @@ -913,6 +931,86 @@ async def test_update_topic_invalid_max_topic_size( max_topic_size=max_topic_size, ) + @pytest.mark.asyncio + async def test_update_topic_with_message_expiry( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic accepts an explicit message expiry.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + message_expiry=timedelta(minutes=10), + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + + @pytest.mark.asyncio + @pytest.mark.parametrize("max_topic_size", [0, 2_000_000_000, 2**64 - 1]) + async def test_update_topic_with_valid_max_topic_size( + self, iggy_client: IggyClient, unique_name, max_topic_size: int + ): + """Test update_topic accepts supported maximum topic size values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + max_topic_size=max_topic_size, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + + @pytest.mark.asyncio + async def test_update_topic_applies_repeated_updates( + self, iggy_client: IggyClient, unique_name + ): + """Test successive update_topic calls each take effect.""" + stream_name = unique_name() + topic_name = unique_name() + first_rename = unique_name() + second_rename = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=first_rename + ) + after_first = await iggy_client.get_topic(stream_name, first_rename) + assert after_first is not None + assert after_first.name == first_rename + assert await iggy_client.get_topic(stream_name, topic_name) is None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=first_rename, name=second_rename + ) + after_second = await iggy_client.get_topic(stream_name, second_rename) + assert after_second is not None + assert after_second.name == second_rename + assert await iggy_client.get_topic(stream_name, first_rename) is None + @pytest.mark.asyncio async def test_update_nonexistent_topic_fails( self, iggy_client: IggyClient, unique_name @@ -928,25 +1026,50 @@ async def test_update_nonexistent_topic_fails( ) @pytest.mark.asyncio - async def test_update_topic_before_connect_fails(self, unique_name): - """Test update_topic requires an established connection.""" - host, port = get_server_config() - client = IggyClient(f"{host}:{port}") - + async def test_update_topic_in_nonexistent_stream_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic raises when the stream does not exist.""" with pytest.raises(RuntimeError): - await client.update_topic( + await iggy_client.update_topic( stream_id=unique_name(), topic_id=unique_name(), name=unique_name() ) @pytest.mark.asyncio - async def test_update_topic_before_login_fails(self, unique_name): - """Test update_topic requires authentication.""" + async def test_update_topic_to_existing_name_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic rejects renaming a topic to a name already in use.""" + stream_name = unique_name() + first_topic = unique_name() + second_topic = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=first_topic, partitions_count=1 + ) + await iggy_client.create_topic( + stream=stream_name, name=second_topic, partitions_count=1 + ) + + with pytest.raises(RuntimeError): + await iggy_client.update_topic( + stream_id=stream_name, topic_id=second_topic, name=first_topic + ) + + @pytest.mark.asyncio + async def test_update_topic_requires_connection_and_auth(self, unique_name): + """Test update_topic fails both before connecting and before logging in.""" host, port = get_server_config() wait_for_server(host, port) client = IggyClient(f"{host}:{port}") - await client.connect() + with pytest.raises(RuntimeError): + await client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + await client.connect() with pytest.raises(RuntimeError): await client.update_topic( stream_id=unique_name(), topic_id=unique_name(), name=unique_name() @@ -1033,23 +1156,33 @@ async def test_delete_nonexistent_topic_fails( await iggy_client.delete_topic(stream_name, unique_name()) @pytest.mark.asyncio - async def test_delete_topic_before_connect_fails(self, unique_name): - """Test delete_topic requires an established connection.""" - host, port = get_server_config() - client = IggyClient(f"{host}:{port}") + async def test_delete_topic_twice_fails_second_time( + self, iggy_client: IggyClient, unique_name + ): + """Test deleting an already-deleted topic raises on the second call.""" + stream_name = unique_name() + topic_name = unique_name() + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_name) with pytest.raises(RuntimeError): - await client.delete_topic(unique_name(), unique_name()) + await iggy_client.delete_topic(stream_name, topic_name) @pytest.mark.asyncio - async def test_delete_topic_before_login_fails(self, unique_name): - """Test delete_topic requires authentication.""" + async def test_delete_topic_requires_connection_and_auth(self, unique_name): + """Test delete_topic fails both before connecting and before logging in.""" host, port = get_server_config() wait_for_server(host, port) client = IggyClient(f"{host}:{port}") - await client.connect() + with pytest.raises(RuntimeError): + await client.delete_topic(unique_name(), unique_name()) + await client.connect() with pytest.raises(RuntimeError): await client.delete_topic(unique_name(), unique_name()) @@ -1082,6 +1215,12 @@ async def test_purge_topic_clears_messages_but_keeps_topic( after = await iggy_client.get_topic(stream_name, topic_name) assert after is not None assert after.messages_count == 0 + # Purging clears messages only; every other field is left unchanged. + assert after.id == before.id + assert after.name == before.name + assert after.partitions_count == before.partitions_count + assert after.compression_algorithm == before.compression_algorithm + assert after.replication_factor == before.replication_factor @pytest.mark.asyncio async def test_purge_empty_topic_succeeds( @@ -1102,11 +1241,34 @@ async def test_purge_empty_topic_succeeds( assert topic is not None assert topic.messages_count == 0 + @pytest.mark.asyncio + async def test_purge_topic_is_idempotent_when_called_repeatedly( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic succeeds when called repeatedly on the same topic.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + messages = [SendMessage(f"payload-{index}") for index in range(5)] + await iggy_client.send_messages(stream_name, topic_name, 0, messages) + + await iggy_client.purge_topic(stream_name, topic_name) + await iggy_client.purge_topic(stream_name, topic_name) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.messages_count == 0 + @pytest.mark.asyncio async def test_purge_nonexistent_topic_fails( self, iggy_client: IggyClient, unique_name ): - """Test purge_topic raises for a non-existent topic.""" + """Test purge_topic raises for a non-existent topic in an existing stream.""" stream_name = unique_name() await iggy_client.create_stream(stream_name) @@ -1115,22 +1277,23 @@ async def test_purge_nonexistent_topic_fails( await iggy_client.purge_topic(stream_name, unique_name()) @pytest.mark.asyncio - async def test_purge_topic_before_connect_fails(self, unique_name): - """Test purge_topic requires an established connection.""" - host, port = get_server_config() - client = IggyClient(f"{host}:{port}") - + async def test_purge_topic_in_nonexistent_stream_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic raises when the stream does not exist.""" with pytest.raises(RuntimeError): - await client.purge_topic(unique_name(), unique_name()) + await iggy_client.purge_topic(unique_name(), unique_name()) @pytest.mark.asyncio - async def test_purge_topic_before_login_fails(self, unique_name): - """Test purge_topic requires authentication.""" + async def test_purge_topic_requires_connection_and_auth(self, unique_name): + """Test purge_topic fails both before connecting and before logging in.""" host, port = get_server_config() wait_for_server(host, port) client = IggyClient(f"{host}:{port}") - await client.connect() + with pytest.raises(RuntimeError): + await client.purge_topic(unique_name(), unique_name()) + await client.connect() with pytest.raises(RuntimeError): await client.purge_topic(unique_name(), unique_name()) From 50d5505e9c584a4b138a03e35cb0297493ee19f8 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Sat, 27 Jun 2026 20:07:20 -0400 Subject: [PATCH 4/6] test(python): split topic tests into TestCreateTopic and TestGetTopic --- foreign/python/tests/test_topic.py | 138 +++++++++++++++-------------- 1 file changed, 71 insertions(+), 67 deletions(-) diff --git a/foreign/python/tests/test_topic.py b/foreign/python/tests/test_topic.py index bbda2562af..c40260cef6 100644 --- a/foreign/python/tests/test_topic.py +++ b/foreign/python/tests/test_topic.py @@ -24,8 +24,8 @@ from .utils import get_server_config, wait_for_ping, wait_for_server -class TestTopicOperations: - """Test topic creation, retrieval, and management.""" +class TestCreateTopic: + """Test topic creation via create_topic.""" @pytest.mark.asyncio @pytest.mark.parametrize( @@ -109,30 +109,6 @@ async def test_create_topic_invalid_names( stream=stream_name, name=topic_name, partitions_count=1 ) - @pytest.mark.asyncio - async def test_get_topic_by_name_and_id(self, iggy_client: IggyClient, unique_name): - """Test repeated topic lookup works by both name and numeric id.""" - stream_name = unique_name() - topic_name = unique_name() - - await iggy_client.create_stream(stream_name) - await iggy_client.create_topic( - stream=stream_name, name=topic_name, partitions_count=1 - ) - - topic_by_name = await iggy_client.get_topic(stream_name, topic_name) - assert topic_by_name is not None - - topic_by_name_again = await iggy_client.get_topic(stream_name, topic_name) - assert topic_by_name_again is not None - assert topic_by_name_again.id == topic_by_name.id - assert topic_by_name_again.name == topic_by_name.name - - topic_by_id = await iggy_client.get_topic(stream_name, topic_by_name.id) - assert topic_by_id is not None - assert topic_by_id.id == topic_by_name.id - assert topic_by_id.name == topic_by_name.name - @pytest.mark.asyncio async def test_create_and_get_topic_with_numeric_stream_id( self, iggy_client: IggyClient, unique_name @@ -177,31 +153,6 @@ async def test_duplicate_topic_creation(self, iggy_client: IggyClient, unique_na assert "already exists" in str(exc_info.value) - @pytest.mark.asyncio - async def test_get_nonexistent_topic(self, iggy_client: IggyClient, unique_name): - """Test getting a non-existent topic by name or numeric id.""" - stream_name = unique_name() - nonexistent_topic_name = unique_name() - - await iggy_client.create_stream(stream_name) - - topic_by_name = await iggy_client.get_topic(stream_name, nonexistent_topic_name) - assert topic_by_name is None - - topic_by_id = await iggy_client.get_topic(stream_name, 999999) - assert topic_by_id is None - - @pytest.mark.asyncio - async def test_get_topic_in_nonexistent_stream( - self, iggy_client: IggyClient, unique_name - ): - """Test getting a topic from a non-existent stream returns no topic.""" - nonexistent_stream = unique_name() - topic_name = unique_name() - - topic = await iggy_client.get_topic(nonexistent_stream, topic_name) - assert topic is None - @pytest.mark.asyncio async def test_topic_names_can_repeat_across_different_streams( self, iggy_client: IggyClient, unique_name @@ -544,17 +495,19 @@ async def test_create_topic_in_nonexistent_stream( ) @pytest.mark.asyncio - async def test_get_topic_before_connect_fails(self, unique_name): - """Test get_topic requires an established connection.""" + async def test_create_topic_before_connect_fails(self, unique_name): + """Test create_topic requires an established connection.""" host, port = get_server_config() client = IggyClient(f"{host}:{port}") with pytest.raises(RuntimeError): - await client.get_topic(unique_name(), unique_name()) + await client.create_topic( + stream=unique_name(), name=unique_name(), partitions_count=1 + ) @pytest.mark.asyncio - async def test_get_topic_before_login_fails(self, unique_name): - """Test get_topic requires authentication.""" + async def test_create_topic_before_login_fails(self, unique_name): + """Test create_topic requires authentication.""" host, port = get_server_config() wait_for_server(host, port) @@ -562,22 +515,75 @@ async def test_get_topic_before_login_fails(self, unique_name): await client.connect() with pytest.raises(RuntimeError): - await client.get_topic(unique_name(), unique_name()) + await client.create_topic( + stream=unique_name(), name=unique_name(), partitions_count=1 + ) + + +class TestGetTopic: + """Test topic retrieval via get_topic.""" @pytest.mark.asyncio - async def test_create_topic_before_connect_fails(self, unique_name): - """Test create_topic requires an established connection.""" + async def test_get_topic_by_name_and_id(self, iggy_client: IggyClient, unique_name): + """Test repeated topic lookup works by both name and numeric id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + topic_by_name = await iggy_client.get_topic(stream_name, topic_name) + assert topic_by_name is not None + + topic_by_name_again = await iggy_client.get_topic(stream_name, topic_name) + assert topic_by_name_again is not None + assert topic_by_name_again.id == topic_by_name.id + assert topic_by_name_again.name == topic_by_name.name + + topic_by_id = await iggy_client.get_topic(stream_name, topic_by_name.id) + assert topic_by_id is not None + assert topic_by_id.id == topic_by_name.id + assert topic_by_id.name == topic_by_name.name + + @pytest.mark.asyncio + async def test_get_nonexistent_topic(self, iggy_client: IggyClient, unique_name): + """Test getting a non-existent topic by name or numeric id.""" + stream_name = unique_name() + nonexistent_topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + + topic_by_name = await iggy_client.get_topic(stream_name, nonexistent_topic_name) + assert topic_by_name is None + + topic_by_id = await iggy_client.get_topic(stream_name, 999999) + assert topic_by_id is None + + @pytest.mark.asyncio + async def test_get_topic_in_nonexistent_stream( + self, iggy_client: IggyClient, unique_name + ): + """Test getting a topic from a non-existent stream returns no topic.""" + nonexistent_stream = unique_name() + topic_name = unique_name() + + topic = await iggy_client.get_topic(nonexistent_stream, topic_name) + assert topic is None + + @pytest.mark.asyncio + async def test_get_topic_before_connect_fails(self, unique_name): + """Test get_topic requires an established connection.""" host, port = get_server_config() client = IggyClient(f"{host}:{port}") with pytest.raises(RuntimeError): - await client.create_topic( - stream=unique_name(), name=unique_name(), partitions_count=1 - ) + await client.get_topic(unique_name(), unique_name()) @pytest.mark.asyncio - async def test_create_topic_before_login_fails(self, unique_name): - """Test create_topic requires authentication.""" + async def test_get_topic_before_login_fails(self, unique_name): + """Test get_topic requires authentication.""" host, port = get_server_config() wait_for_server(host, port) @@ -585,9 +591,7 @@ async def test_create_topic_before_login_fails(self, unique_name): await client.connect() with pytest.raises(RuntimeError): - await client.create_topic( - stream=unique_name(), name=unique_name(), partitions_count=1 - ) + await client.get_topic(unique_name(), unique_name()) class TestGetTopics: From 15310b38957ae5bd86103682aa52d35b31c203f9 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Sat, 27 Jun 2026 20:12:30 -0400 Subject: [PATCH 5/6] test(python): combine before-connect and before-login precondition tests --- foreign/python/tests/test_topic.py | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/foreign/python/tests/test_topic.py b/foreign/python/tests/test_topic.py index c40260cef6..ab94b6545a 100644 --- a/foreign/python/tests/test_topic.py +++ b/foreign/python/tests/test_topic.py @@ -495,25 +495,18 @@ async def test_create_topic_in_nonexistent_stream( ) @pytest.mark.asyncio - async def test_create_topic_before_connect_fails(self, unique_name): - """Test create_topic requires an established connection.""" + async def test_create_topic_requires_connection_and_auth(self, unique_name): + """Test create_topic fails both before connecting and before logging in.""" host, port = get_server_config() - client = IggyClient(f"{host}:{port}") + wait_for_server(host, port) + client = IggyClient(f"{host}:{port}") with pytest.raises(RuntimeError): await client.create_topic( stream=unique_name(), name=unique_name(), partitions_count=1 ) - @pytest.mark.asyncio - async def test_create_topic_before_login_fails(self, unique_name): - """Test create_topic requires authentication.""" - host, port = get_server_config() - wait_for_server(host, port) - - client = IggyClient(f"{host}:{port}") await client.connect() - with pytest.raises(RuntimeError): await client.create_topic( stream=unique_name(), name=unique_name(), partitions_count=1 @@ -573,23 +566,16 @@ async def test_get_topic_in_nonexistent_stream( assert topic is None @pytest.mark.asyncio - async def test_get_topic_before_connect_fails(self, unique_name): - """Test get_topic requires an established connection.""" + async def test_get_topic_requires_connection_and_auth(self, unique_name): + """Test get_topic fails both before connecting and before logging in.""" host, port = get_server_config() - client = IggyClient(f"{host}:{port}") + wait_for_server(host, port) + client = IggyClient(f"{host}:{port}") with pytest.raises(RuntimeError): await client.get_topic(unique_name(), unique_name()) - @pytest.mark.asyncio - async def test_get_topic_before_login_fails(self, unique_name): - """Test get_topic requires authentication.""" - host, port = get_server_config() - wait_for_server(host, port) - - client = IggyClient(f"{host}:{port}") await client.connect() - with pytest.raises(RuntimeError): await client.get_topic(unique_name(), unique_name()) From 6db5ccf76be7059928f6b2ca2fbd6f50d475bd73 Mon Sep 17 00:00:00 2001 From: Matthew Patton Date: Sat, 27 Jun 2026 20:37:08 -0400 Subject: [PATCH 6/6] docs(python): add Args/Returns/Raises docstrings to topic methods --- foreign/python/apache_iggy.pyi | 59 +++++++++++++++++++++++++++++----- foreign/python/src/client.rs | 59 +++++++++++++++++++++++++++++----- 2 files changed, 102 insertions(+), 16 deletions(-) diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 157f714206..8f5fd2ff05 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -307,8 +307,16 @@ class IggyClient: self, stream_id: builtins.str | builtins.int ) -> collections.abc.Awaitable[list[Topic]]: r""" - Gets all topics in the given stream. - Returns a list of topics or a PyRuntimeError on failure. + Get all topics in a stream. + + Args: + stream_id: Stream identifier as `str | int`. + + Returns: + An awaitable that resolves to `list[Topic]`. + + Raises: + PyRuntimeError: If the identifier is invalid or the request fails. """ def update_topic( self, @@ -321,8 +329,25 @@ class IggyClient: max_topic_size: builtins.int | None = None, ) -> collections.abc.Awaitable[None]: r""" - Updates an existing topic with the given parameters. - Returns Ok(()) on successful topic update or a PyRuntimeError on failure. + Update an existing topic. + + This is a full replacement: any optional parameter left unset is reset to + its server default rather than preserved. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + name: New topic name as `str`. + compression_algorithm: Compression algorithm as `str | None`. + replication_factor: Replication factor as `int | None`. + message_expiry: Message expiry as `datetime.timedelta | None`. + max_topic_size: Maximum topic size in bytes as `int | None`. + + Returns: + An awaitable that resolves to `None` when the topic is updated. + + Raises: + PyRuntimeError: If an argument is invalid or the request fails. """ def delete_topic( self, @@ -330,8 +355,17 @@ class IggyClient: topic_id: builtins.str | builtins.int, ) -> collections.abc.Awaitable[None]: r""" - Deletes the topic with the given id from the given stream. - Returns Ok(()) on successful topic deletion or a PyRuntimeError on failure. + Delete a topic from a stream. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + + Returns: + An awaitable that resolves to `None` when the topic is deleted. + + Raises: + PyRuntimeError: If an identifier is invalid or the request fails. """ def purge_topic( self, @@ -339,8 +373,17 @@ class IggyClient: topic_id: builtins.str | builtins.int, ) -> collections.abc.Awaitable[None]: r""" - Purges all messages from the topic with the given id in the given stream. - Returns Ok(()) on successful topic purge or a PyRuntimeError on failure. + Purge all messages from a topic. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + + Returns: + An awaitable that resolves to `None` when the topic is purged. + + Raises: + PyRuntimeError: If an identifier is invalid or the request fails. """ def send_messages( self, diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index 33c8aec2fc..85a567d1dc 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -243,8 +243,16 @@ impl IggyClient { }) } - /// Gets all topics in the given stream. - /// Returns a list of topics or a PyRuntimeError on failure. + /// Get all topics in a stream. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `list[Topic]`. + /// + /// Raises: + /// PyRuntimeError: If the identifier is invalid or the request fails. #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[Topic]]", imports=("collections.abc")))] fn get_topics<'a>( &self, @@ -263,8 +271,25 @@ impl IggyClient { }) } - /// Updates an existing topic with the given parameters. - /// Returns Ok(()) on successful topic update or a PyRuntimeError on failure. + /// Update an existing topic. + /// + /// This is a full replacement: any optional parameter left unset is reset to + /// its server default rather than preserved. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// name: New topic name as `str`. + /// compression_algorithm: Compression algorithm as `str | None`. + /// replication_factor: Replication factor as `int | None`. + /// message_expiry: Message expiry as `datetime.timedelta | None`. + /// max_topic_size: Maximum topic size in bytes as `int | None`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is updated. + /// + /// Raises: + /// PyRuntimeError: If an argument is invalid or the request fails. #[pyo3( signature = (stream_id, topic_id, name, compression_algorithm = None, replication_factor = None, message_expiry = None, max_topic_size = None) )] @@ -320,8 +345,17 @@ impl IggyClient { }) } - /// Deletes the topic with the given id from the given stream. - /// Returns Ok(()) on successful topic deletion or a PyRuntimeError on failure. + /// Delete a topic from a stream. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is deleted. + /// + /// Raises: + /// PyRuntimeError: If an identifier is invalid or the request fails. #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn delete_topic<'a>( &self, @@ -342,8 +376,17 @@ impl IggyClient { }) } - /// Purges all messages from the topic with the given id in the given stream. - /// Returns Ok(()) on successful topic purge or a PyRuntimeError on failure. + /// Purge all messages from a topic. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is purged. + /// + /// Raises: + /// PyRuntimeError: If an identifier is invalid or the request fails. #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] fn purge_topic<'a>( &self,