diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 4340949c72..8f5fd2ff05 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -34,6 +34,7 @@ __all__ = [ "ReceiveMessage", "SendMessage", "StreamDetails", + "Topic", "TopicDetails", ] @@ -302,6 +303,88 @@ class IggyClient: Gets topic by stream and id. Returns Option of topic details or a PyRuntimeError on failure. """ + def get_topics( + self, stream_id: builtins.str | builtins.int + ) -> collections.abc.Awaitable[list[Topic]]: + r""" + Get all topics in a stream. + + Args: + stream_id: Stream identifier as `str | int`. + + Returns: + An awaitable that resolves to `list[Topic]`. + + Raises: + PyRuntimeError: If the identifier is invalid or the request fails. + """ + def update_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + name: builtins.str, + compression_algorithm: builtins.str | None = None, + replication_factor: builtins.int | None = None, + message_expiry: datetime.timedelta | None = None, + max_topic_size: builtins.int | None = None, + ) -> collections.abc.Awaitable[None]: + r""" + Update an existing topic. + + This is a full replacement: any optional parameter left unset is reset to + its server default rather than preserved. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + name: New topic name as `str`. + compression_algorithm: Compression algorithm as `str | None`. + replication_factor: Replication factor as `int | None`. + message_expiry: Message expiry as `datetime.timedelta | None`. + max_topic_size: Maximum topic size in bytes as `int | None`. + + Returns: + An awaitable that resolves to `None` when the topic is updated. + + Raises: + PyRuntimeError: If an argument is invalid or the request fails. + """ + def delete_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + ) -> collections.abc.Awaitable[None]: + r""" + Delete a topic from a stream. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + + Returns: + An awaitable that resolves to `None` when the topic is deleted. + + Raises: + PyRuntimeError: If an identifier is invalid or the request fails. + """ + def purge_topic( + self, + stream_id: builtins.str | builtins.int, + topic_id: builtins.str | builtins.int, + ) -> collections.abc.Awaitable[None]: + r""" + Purge all messages from a topic. + + Args: + stream_id: Stream identifier as `str | int`. + topic_id: Topic identifier as `str | int`. + + Returns: + An awaitable that resolves to `None` when the topic is purged. + + Raises: + PyRuntimeError: If an identifier is invalid or the request fails. + """ def send_messages( self, stream: builtins.str | builtins.int, @@ -519,6 +602,17 @@ class StreamDetails: @property def topics_count(self) -> builtins.int: ... +@typing.final +class Topic: + @property + def id(self) -> builtins.int: ... + @property + def name(self) -> builtins.str: ... + @property + def messages_count(self) -> builtins.int: ... + @property + def partitions_count(self) -> builtins.int: ... + @typing.final class TopicDetails: @property @@ -529,3 +623,7 @@ class TopicDetails: def messages_count(self) -> builtins.int: ... @property def partitions_count(self) -> builtins.int: ... + @property + def compression_algorithm(self) -> builtins.str: ... + @property + def replication_factor(self) -> builtins.int: ... diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index 0c19c10aee..85a567d1dc 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -33,7 +33,7 @@ use crate::identifier::PyIdentifier; use crate::receive_message::{PollingStrategy, ReceiveMessage}; use crate::send_message::SendMessage; use crate::stream::StreamDetails; -use crate::topic::TopicDetails; +use crate::topic::{Topic, TopicDetails}; use tokio::sync::Mutex; /// A Python class representing the Iggy client. @@ -243,6 +243,170 @@ impl IggyClient { }) } + /// Get all topics in a stream. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `list[Topic]`. + /// + /// Raises: + /// PyRuntimeError: If the identifier is invalid or the request fails. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[Topic]]", imports=("collections.abc")))] + fn get_topics<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + let topics = inner + .get_topics(&stream_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(topics.into_iter().map(Topic::from).collect::>()) + }) + } + + /// Update an existing topic. + /// + /// This is a full replacement: any optional parameter left unset is reset to + /// its server default rather than preserved. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// name: New topic name as `str`. + /// compression_algorithm: Compression algorithm as `str | None`. + /// replication_factor: Replication factor as `int | None`. + /// message_expiry: Message expiry as `datetime.timedelta | None`. + /// max_topic_size: Maximum topic size in bytes as `int | None`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is updated. + /// + /// Raises: + /// PyRuntimeError: If an argument is invalid or the request fails. + #[pyo3( + signature = (stream_id, topic_id, name, compression_algorithm = None, replication_factor = None, message_expiry = None, max_topic_size = None) + )] + #[allow(clippy::too_many_arguments)] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn update_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + name: String, + #[gen_stub(override_type(type_repr = "builtins.str | None"))] compression_algorithm: Option< + String, + >, + #[gen_stub(override_type(type_repr = "builtins.int | None"))] replication_factor: Option< + u8, + >, + #[gen_stub(override_type(type_repr = "datetime.timedelta | None", imports=("datetime")))] + message_expiry: Option>, + #[gen_stub(override_type(type_repr = "builtins.int | None"))] max_topic_size: Option, + ) -> PyResult> { + let compression_algorithm = match compression_algorithm { + Some(algo) => CompressionAlgorithm::from_str(&algo) + .map_err(|e| PyErr::new::(e.to_string()))?, + None => CompressionAlgorithm::default(), + }; + + let expiry = match message_expiry { + Some(delta) => IggyExpiry::ExpireDuration(py_delta_to_iggy_duration(&delta)), + None => IggyExpiry::ServerDefault, + }; + + let max_size = max_topic_size.map_or(MaxTopicSize::ServerDefault, MaxTopicSize::from); + + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .update_topic( + &stream_id, + &topic_id, + &name, + compression_algorithm, + replication_factor, + expiry, + max_size, + ) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + + /// Delete a topic from a stream. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is deleted. + /// + /// Raises: + /// PyRuntimeError: If an identifier is invalid or the request fails. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn delete_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .delete_topic(&stream_id, &topic_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + + /// Purge all messages from a topic. + /// + /// Args: + /// stream_id: Stream identifier as `str | int`. + /// topic_id: Topic identifier as `str | int`. + /// + /// Returns: + /// An awaitable that resolves to `None` when the topic is purged. + /// + /// Raises: + /// PyRuntimeError: If an identifier is invalid or the request fails. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn purge_topic<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + topic_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::try_from(stream_id)?; + let topic_id = Identifier::try_from(topic_id)?; + let inner = self.inner.clone(); + + future_into_py(py, async move { + inner + .purge_topic(&stream_id, &topic_id) + .await + .map_err(|e| PyErr::new::(e.to_string()))?; + Ok(()) + }) + } + /// Sends a list of messages to the specified topic. /// Returns Ok(()) on successful sending or a PyRuntimeError on failure. #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] diff --git a/foreign/python/src/lib.rs b/foreign/python/src/lib.rs index d7b675ce0d..59b62608ba 100644 --- a/foreign/python/src/lib.rs +++ b/foreign/python/src/lib.rs @@ -29,7 +29,7 @@ use pyo3::prelude::*; use receive_message::{PollingStrategy, ReceiveMessage}; use send_message::SendMessage; use stream::StreamDetails; -use topic::TopicDetails; +use topic::{Topic, TopicDetails}; /// A Python module implemented in Rust. #[pymodule] @@ -38,6 +38,7 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/foreign/python/src/topic.rs b/foreign/python/src/topic.rs index 6784b6e4ee..7d8b6b69b2 100644 --- a/foreign/python/src/topic.rs +++ b/foreign/python/src/topic.rs @@ -15,10 +15,46 @@ // specific language governing permissions and limitations // under the License. -use iggy::prelude::TopicDetails as RustTopicDetails; +use iggy::prelude::{Topic as RustTopic, TopicDetails as RustTopicDetails}; use pyo3::prelude::*; use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods}; +#[gen_stub_pyclass] +#[pyclass] +pub struct Topic { + pub(crate) inner: RustTopic, +} + +impl From for Topic { + fn from(topic: RustTopic) -> Self { + Self { inner: topic } + } +} + +#[gen_stub_pymethods] +#[pymethods] +impl Topic { + #[getter] + pub fn id(&self) -> u32 { + self.inner.id + } + + #[getter] + pub fn name(&self) -> String { + self.inner.name.to_string() + } + + #[getter] + pub fn messages_count(&self) -> u64 { + self.inner.messages_count + } + + #[getter] + pub fn partitions_count(&self) -> u32 { + self.inner.partitions_count + } +} + #[gen_stub_pyclass] #[pyclass] pub struct TopicDetails { @@ -55,4 +91,14 @@ impl TopicDetails { pub fn partitions_count(&self) -> u32 { self.inner.partitions_count } + + #[getter] + pub fn compression_algorithm(&self) -> String { + self.inner.compression_algorithm.to_string() + } + + #[getter] + pub fn replication_factor(&self) -> u8 { + self.inner.replication_factor + } } diff --git a/foreign/python/tests/test_topic.py b/foreign/python/tests/test_topic.py index 76f60b0a44..ab94b6545a 100644 --- a/foreign/python/tests/test_topic.py +++ b/foreign/python/tests/test_topic.py @@ -19,13 +19,13 @@ import pytest -from apache_iggy import IggyClient +from apache_iggy import IggyClient, SendMessage from .utils import get_server_config, wait_for_ping, wait_for_server -class TestTopicOperations: - """Test topic creation, retrieval, and management.""" +class TestCreateTopic: + """Test topic creation via create_topic.""" @pytest.mark.asyncio @pytest.mark.parametrize( @@ -109,30 +109,6 @@ async def test_create_topic_invalid_names( stream=stream_name, name=topic_name, partitions_count=1 ) - @pytest.mark.asyncio - async def test_get_topic_by_name_and_id(self, iggy_client: IggyClient, unique_name): - """Test repeated topic lookup works by both name and numeric id.""" - stream_name = unique_name() - topic_name = unique_name() - - await iggy_client.create_stream(stream_name) - await iggy_client.create_topic( - stream=stream_name, name=topic_name, partitions_count=1 - ) - - topic_by_name = await iggy_client.get_topic(stream_name, topic_name) - assert topic_by_name is not None - - topic_by_name_again = await iggy_client.get_topic(stream_name, topic_name) - assert topic_by_name_again is not None - assert topic_by_name_again.id == topic_by_name.id - assert topic_by_name_again.name == topic_by_name.name - - topic_by_id = await iggy_client.get_topic(stream_name, topic_by_name.id) - assert topic_by_id is not None - assert topic_by_id.id == topic_by_name.id - assert topic_by_id.name == topic_by_name.name - @pytest.mark.asyncio async def test_create_and_get_topic_with_numeric_stream_id( self, iggy_client: IggyClient, unique_name @@ -177,31 +153,6 @@ async def test_duplicate_topic_creation(self, iggy_client: IggyClient, unique_na assert "already exists" in str(exc_info.value) - @pytest.mark.asyncio - async def test_get_nonexistent_topic(self, iggy_client: IggyClient, unique_name): - """Test getting a non-existent topic by name or numeric id.""" - stream_name = unique_name() - nonexistent_topic_name = unique_name() - - await iggy_client.create_stream(stream_name) - - topic_by_name = await iggy_client.get_topic(stream_name, nonexistent_topic_name) - assert topic_by_name is None - - topic_by_id = await iggy_client.get_topic(stream_name, 999999) - assert topic_by_id is None - - @pytest.mark.asyncio - async def test_get_topic_in_nonexistent_stream( - self, iggy_client: IggyClient, unique_name - ): - """Test getting a topic from a non-existent stream returns no topic.""" - nonexistent_stream = unique_name() - topic_name = unique_name() - - topic = await iggy_client.get_topic(nonexistent_stream, topic_name) - assert topic is None - @pytest.mark.asyncio async def test_topic_names_can_repeat_across_different_streams( self, iggy_client: IggyClient, unique_name @@ -544,47 +495,795 @@ async def test_create_topic_in_nonexistent_stream( ) @pytest.mark.asyncio - async def test_get_topic_before_connect_fails(self, unique_name): - """Test get_topic requires an established connection.""" + async def test_create_topic_requires_connection_and_auth(self, unique_name): + """Test create_topic fails both before connecting and before logging in.""" host, port = get_server_config() + wait_for_server(host, port) + client = IggyClient(f"{host}:{port}") + with pytest.raises(RuntimeError): + await client.create_topic( + stream=unique_name(), name=unique_name(), partitions_count=1 + ) + await client.connect() with pytest.raises(RuntimeError): - await client.get_topic(unique_name(), unique_name()) + await client.create_topic( + stream=unique_name(), name=unique_name(), partitions_count=1 + ) + + +class TestGetTopic: + """Test topic retrieval via get_topic.""" + + @pytest.mark.asyncio + async def test_get_topic_by_name_and_id(self, iggy_client: IggyClient, unique_name): + """Test repeated topic lookup works by both name and numeric id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + topic_by_name = await iggy_client.get_topic(stream_name, topic_name) + assert topic_by_name is not None + + topic_by_name_again = await iggy_client.get_topic(stream_name, topic_name) + assert topic_by_name_again is not None + assert topic_by_name_again.id == topic_by_name.id + assert topic_by_name_again.name == topic_by_name.name + + topic_by_id = await iggy_client.get_topic(stream_name, topic_by_name.id) + assert topic_by_id is not None + assert topic_by_id.id == topic_by_name.id + assert topic_by_id.name == topic_by_name.name + + @pytest.mark.asyncio + async def test_get_nonexistent_topic(self, iggy_client: IggyClient, unique_name): + """Test getting a non-existent topic by name or numeric id.""" + stream_name = unique_name() + nonexistent_topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + + topic_by_name = await iggy_client.get_topic(stream_name, nonexistent_topic_name) + assert topic_by_name is None + + topic_by_id = await iggy_client.get_topic(stream_name, 999999) + assert topic_by_id is None + + @pytest.mark.asyncio + async def test_get_topic_in_nonexistent_stream( + self, iggy_client: IggyClient, unique_name + ): + """Test getting a topic from a non-existent stream returns no topic.""" + nonexistent_stream = unique_name() + topic_name = unique_name() + + topic = await iggy_client.get_topic(nonexistent_stream, topic_name) + assert topic is None @pytest.mark.asyncio - async def test_get_topic_before_login_fails(self, unique_name): - """Test get_topic requires authentication.""" + async def test_get_topic_requires_connection_and_auth(self, unique_name): + """Test get_topic fails both before connecting and before logging in.""" host, port = get_server_config() wait_for_server(host, port) client = IggyClient(f"{host}:{port}") - await client.connect() + with pytest.raises(RuntimeError): + await client.get_topic(unique_name(), unique_name()) + await client.connect() with pytest.raises(RuntimeError): await client.get_topic(unique_name(), unique_name()) + +class TestGetTopics: + """Test listing topics in a stream via get_topics.""" + @pytest.mark.asyncio - async def test_create_topic_before_connect_fails(self, unique_name): - """Test create_topic requires an established connection.""" - host, port = get_server_config() - client = IggyClient(f"{host}:{port}") + async def test_get_topics_in_empty_stream_returns_empty_list( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics returns an empty list for a stream with no topics.""" + stream_name = unique_name() - with pytest.raises(RuntimeError): - await client.create_topic( - stream=unique_name(), name=unique_name(), partitions_count=1 + await iggy_client.create_stream(stream_name) + + topics = await iggy_client.get_topics(stream_name) + assert topics == [] + + @pytest.mark.asyncio + @pytest.mark.parametrize("topic_count", [1, 3, 5]) + async def test_get_topics_returns_all_created_topics( + self, iggy_client: IggyClient, unique_name, topic_count: int + ): + """Test get_topics returns every topic created in the stream.""" + stream_name = unique_name() + topic_names = {unique_name() for _ in range(topic_count)} + + await iggy_client.create_stream(stream_name) + for name in topic_names: + await iggy_client.create_topic( + stream=stream_name, name=name, partitions_count=1 + ) + + topics = await iggy_client.get_topics(stream_name) + assert len(topics) == len(topic_names) + assert {topic.name for topic in topics} == topic_names + # Only assert fields supplied at creation or deterministically set by + # the server: each topic keeps its partition count and a fresh topic + # holds no messages. The numeric id is server-assigned and not checked. + assert all(topic.partitions_count == 1 for topic in topics) + assert all(topic.messages_count == 0 for topic in topics) + + @pytest.mark.asyncio + async def test_get_topics_returns_same_result_when_called_repeatedly( + self, iggy_client: IggyClient, unique_name + ): + """Test repeated get_topics calls return a stable view of the stream.""" + stream_name = unique_name() + topic_names = {unique_name() for _ in range(3)} + + await iggy_client.create_stream(stream_name) + for name in topic_names: + await iggy_client.create_topic( + stream=stream_name, name=name, partitions_count=1 ) + first = await iggy_client.get_topics(stream_name) + second = await iggy_client.get_topics(stream_name) + assert {topic.name for topic in first} == topic_names + assert {topic.name for topic in second} == topic_names + assert {topic.id for topic in first} == {topic.id for topic in second} + + @pytest.mark.asyncio + async def test_get_topics_accepts_numeric_stream_id( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics works when the stream is referenced by numeric id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + stream = await iggy_client.get_stream(stream_name) + assert stream is not None + + topics_by_name = await iggy_client.get_topics(stream_name) + topics_by_id = await iggy_client.get_topics(stream.id) + assert len(topics_by_name) == 1 + assert len(topics_by_id) == 1 + assert topics_by_id[0].name == topic_name + + @pytest.mark.asyncio + async def test_get_topics_in_nonexistent_stream_returns_empty( + self, iggy_client: IggyClient, unique_name + ): + """Test get_topics returns an empty list for a non-existent stream.""" + topics = await iggy_client.get_topics(unique_name()) + assert topics == [] + @pytest.mark.asyncio - async def test_create_topic_before_login_fails(self, unique_name): - """Test create_topic requires authentication.""" + async def test_get_topics_requires_connection_and_auth(self, unique_name): + """Test get_topics fails both before connecting and before logging in.""" host, port = get_server_config() wait_for_server(host, port) client = IggyClient(f"{host}:{port}") - await client.connect() + with pytest.raises(RuntimeError): + await client.get_topics(unique_name()) + await client.connect() with pytest.raises(RuntimeError): - await client.create_topic( - stream=unique_name(), name=unique_name(), partitions_count=1 + await client.get_topics(unique_name()) + + +class TestUpdateTopic: + """Test updating topics via update_topic.""" + + @pytest.mark.asyncio + async def test_update_topic_renames_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic renames a topic; old name no longer resolves.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=new_name + ) + + renamed = await iggy_client.get_topic(stream_name, new_name) + assert renamed is not None + assert renamed.name == new_name + + old = await iggy_client.get_topic(stream_name, topic_name) + assert old is None + + @pytest.mark.asyncio + async def test_update_topic_preserves_id( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic keeps the same numeric id after a rename.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + before = await iggy_client.get_topic(stream_name, topic_name) + assert before is not None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=new_name + ) + + after = await iggy_client.get_topic(stream_name, new_name) + assert after is not None + assert after.id == before.id + + @pytest.mark.asyncio + async def test_update_topic_by_numeric_id( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic accepts a numeric topic id.""" + stream_name = unique_name() + topic_name = unique_name() + new_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic.id, name=new_name + ) + + renamed = await iggy_client.get_topic(stream_name, new_name) + assert renamed is not None + assert renamed.name == new_name + + @pytest.mark.asyncio + @pytest.mark.parametrize("compression_algorithm", ["gzip", "none"]) + async def test_update_topic_with_valid_compression_algorithm( + self, iggy_client: IggyClient, unique_name, compression_algorithm: str + ): + """Test update_topic accepts a supported compression algorithm.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + compression_algorithm=compression_algorithm, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + assert topic.compression_algorithm == compression_algorithm + + @pytest.mark.asyncio + @pytest.mark.parametrize("compression_algorithm", ["brotli", "gzipp", ""]) + async def test_update_topic_invalid_compression_algorithm( + self, iggy_client: IggyClient, unique_name, compression_algorithm: str + ): + """Test update_topic rejects unsupported compression algorithm values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(RuntimeError, match="Unknown compression type"): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + compression_algorithm=compression_algorithm, ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("invalid_message_expiry", [1, "1s", object()]) + async def test_update_topic_invalid_message_expiry( + self, iggy_client: IggyClient, unique_name, invalid_message_expiry + ): + """Test update_topic rejects message_expiry values that are not timedeltas.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(TypeError): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + message_expiry=invalid_message_expiry, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize("replication_factor", [0, 1, 255]) + async def test_update_topic_with_valid_replication_factor( + self, iggy_client: IggyClient, unique_name, replication_factor: int + ): + """Test update_topic accepts a supported replication factor.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + replication_factor=replication_factor, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + # The server normalizes a replication factor of 0 to 1. + assert topic.replication_factor == (replication_factor or 1) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("replication_factor", "expected_exception"), + [ + (-1, OverflowError), + (256, OverflowError), + ("1", TypeError), + (1.0, TypeError), + ], + ) + async def test_update_topic_invalid_replication_factor( + self, + iggy_client: IggyClient, + unique_name, + replication_factor, + expected_exception, + ): + """Test update_topic rejects invalid replication factor values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(expected_exception): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + replication_factor=replication_factor, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + ("max_topic_size", "expected_exception"), + [ + (-1, OverflowError), + (2e64, TypeError), + ], + ) + async def test_update_topic_invalid_max_topic_size( + self, + iggy_client: IggyClient, + unique_name, + max_topic_size, + expected_exception, + ): + """Test update_topic rejects invalid maximum topic size values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + with pytest.raises(expected_exception): + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + max_topic_size=max_topic_size, + ) + + @pytest.mark.asyncio + async def test_update_topic_with_message_expiry( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic accepts an explicit message expiry.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + message_expiry=timedelta(minutes=10), + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + + @pytest.mark.asyncio + @pytest.mark.parametrize("max_topic_size", [0, 2_000_000_000, 2**64 - 1]) + async def test_update_topic_with_valid_max_topic_size( + self, iggy_client: IggyClient, unique_name, max_topic_size: int + ): + """Test update_topic accepts supported maximum topic size values.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, + topic_id=topic_name, + name=topic_name, + max_topic_size=max_topic_size, + ) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.name == topic_name + + @pytest.mark.asyncio + async def test_update_topic_applies_repeated_updates( + self, iggy_client: IggyClient, unique_name + ): + """Test successive update_topic calls each take effect.""" + stream_name = unique_name() + topic_name = unique_name() + first_rename = unique_name() + second_rename = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=topic_name, name=first_rename + ) + after_first = await iggy_client.get_topic(stream_name, first_rename) + assert after_first is not None + assert after_first.name == first_rename + assert await iggy_client.get_topic(stream_name, topic_name) is None + + await iggy_client.update_topic( + stream_id=stream_name, topic_id=first_rename, name=second_rename + ) + after_second = await iggy_client.get_topic(stream_name, second_rename) + assert after_second is not None + assert after_second.name == second_rename + assert await iggy_client.get_topic(stream_name, first_rename) is None + + @pytest.mark.asyncio + async def test_update_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic raises for a non-existent topic.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.update_topic( + stream_id=stream_name, topic_id=unique_name(), name=unique_name() + ) + + @pytest.mark.asyncio + async def test_update_topic_in_nonexistent_stream_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic raises when the stream does not exist.""" + with pytest.raises(RuntimeError): + await iggy_client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + + @pytest.mark.asyncio + async def test_update_topic_to_existing_name_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test update_topic rejects renaming a topic to a name already in use.""" + stream_name = unique_name() + first_topic = unique_name() + second_topic = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=first_topic, partitions_count=1 + ) + await iggy_client.create_topic( + stream=stream_name, name=second_topic, partitions_count=1 + ) + + with pytest.raises(RuntimeError): + await iggy_client.update_topic( + stream_id=stream_name, topic_id=second_topic, name=first_topic + ) + + @pytest.mark.asyncio + async def test_update_topic_requires_connection_and_auth(self, unique_name): + """Test update_topic fails both before connecting and before logging in.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + with pytest.raises(RuntimeError): + await client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + + await client.connect() + with pytest.raises(RuntimeError): + await client.update_topic( + stream_id=unique_name(), topic_id=unique_name(), name=unique_name() + ) + + +class TestDeleteTopic: + """Test deleting topics via delete_topic.""" + + @pytest.mark.asyncio + async def test_delete_topic_removes_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic removes the topic and drops the stream count.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_name) + + assert await iggy_client.get_topic(stream_name, topic_name) is None + assert await iggy_client.get_topics(stream_name) == [] + + stream = await iggy_client.get_stream(stream_name) + assert stream is not None + assert stream.topics_count == 0 + + @pytest.mark.asyncio + async def test_delete_topic_by_numeric_id( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic accepts a numeric topic id.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + + await iggy_client.delete_topic(stream_name, topic.id) + + assert await iggy_client.get_topic(stream_name, topic_name) is None + + @pytest.mark.asyncio + async def test_delete_topic_leaves_other_topics( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic removes only the targeted topic.""" + stream_name = unique_name() + topic_to_delete = unique_name() + topic_to_keep = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_to_delete, partitions_count=1 + ) + await iggy_client.create_topic( + stream=stream_name, name=topic_to_keep, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_to_delete) + + remaining = await iggy_client.get_topics(stream_name) + assert len(remaining) == 1 + assert remaining[0].name == topic_to_keep + + @pytest.mark.asyncio + async def test_delete_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test delete_topic raises for a non-existent topic.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.delete_topic(stream_name, unique_name()) + + @pytest.mark.asyncio + async def test_delete_topic_twice_fails_second_time( + self, iggy_client: IggyClient, unique_name + ): + """Test deleting an already-deleted topic raises on the second call.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.delete_topic(stream_name, topic_name) + with pytest.raises(RuntimeError): + await iggy_client.delete_topic(stream_name, topic_name) + + @pytest.mark.asyncio + async def test_delete_topic_requires_connection_and_auth(self, unique_name): + """Test delete_topic fails both before connecting and before logging in.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + with pytest.raises(RuntimeError): + await client.delete_topic(unique_name(), unique_name()) + + await client.connect() + with pytest.raises(RuntimeError): + await client.delete_topic(unique_name(), unique_name()) + + +class TestPurgeTopic: + """Test purging topic messages via purge_topic.""" + + @pytest.mark.asyncio + async def test_purge_topic_clears_messages_but_keeps_topic( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic empties the topic while leaving it in place.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + messages = [SendMessage(f"payload-{index}") for index in range(5)] + await iggy_client.send_messages(stream_name, topic_name, 0, messages) + + before = await iggy_client.get_topic(stream_name, topic_name) + assert before is not None + assert before.messages_count == 5 + + await iggy_client.purge_topic(stream_name, topic_name) + + after = await iggy_client.get_topic(stream_name, topic_name) + assert after is not None + assert after.messages_count == 0 + # Purging clears messages only; every other field is left unchanged. + assert after.id == before.id + assert after.name == before.name + assert after.partitions_count == before.partitions_count + assert after.compression_algorithm == before.compression_algorithm + assert after.replication_factor == before.replication_factor + + @pytest.mark.asyncio + async def test_purge_empty_topic_succeeds( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic is a no-op on a topic with no messages.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + await iggy_client.purge_topic(stream_name, topic_name) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.messages_count == 0 + + @pytest.mark.asyncio + async def test_purge_topic_is_idempotent_when_called_repeatedly( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic succeeds when called repeatedly on the same topic.""" + stream_name = unique_name() + topic_name = unique_name() + + await iggy_client.create_stream(stream_name) + await iggy_client.create_topic( + stream=stream_name, name=topic_name, partitions_count=1 + ) + + messages = [SendMessage(f"payload-{index}") for index in range(5)] + await iggy_client.send_messages(stream_name, topic_name, 0, messages) + + await iggy_client.purge_topic(stream_name, topic_name) + await iggy_client.purge_topic(stream_name, topic_name) + + topic = await iggy_client.get_topic(stream_name, topic_name) + assert topic is not None + assert topic.messages_count == 0 + + @pytest.mark.asyncio + async def test_purge_nonexistent_topic_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic raises for a non-existent topic in an existing stream.""" + stream_name = unique_name() + + await iggy_client.create_stream(stream_name) + + with pytest.raises(RuntimeError): + await iggy_client.purge_topic(stream_name, unique_name()) + + @pytest.mark.asyncio + async def test_purge_topic_in_nonexistent_stream_fails( + self, iggy_client: IggyClient, unique_name + ): + """Test purge_topic raises when the stream does not exist.""" + with pytest.raises(RuntimeError): + await iggy_client.purge_topic(unique_name(), unique_name()) + + @pytest.mark.asyncio + async def test_purge_topic_requires_connection_and_auth(self, unique_name): + """Test purge_topic fails both before connecting and before logging in.""" + host, port = get_server_config() + wait_for_server(host, port) + + client = IggyClient(f"{host}:{port}") + with pytest.raises(RuntimeError): + await client.purge_topic(unique_name(), unique_name()) + + await client.connect() + with pytest.raises(RuntimeError): + await client.purge_topic(unique_name(), unique_name())