Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions foreign/python/apache_iggy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ __all__ = [
"ReceiveMessage",
"SendMessage",
"StreamDetails",
"Topic",
"TopicDetails",
]

Expand Down Expand Up @@ -302,6 +303,88 @@ class IggyClient:
Gets topic by stream and id.
Returns Option of topic details or a PyRuntimeError on failure.
"""
def get_topics(
self, stream_id: builtins.str | builtins.int
) -> collections.abc.Awaitable[list[Topic]]:
r"""
Get all topics in a stream.

Args:
stream_id: Stream identifier as `str | int`.

Returns:
An awaitable that resolves to `list[Topic]`.

Raises:
PyRuntimeError: If the identifier is invalid or the request fails.
"""
def update_topic(
self,
stream_id: builtins.str | builtins.int,
topic_id: builtins.str | builtins.int,
name: builtins.str,
compression_algorithm: builtins.str | None = None,
replication_factor: builtins.int | None = None,
message_expiry: datetime.timedelta | None = None,
max_topic_size: builtins.int | None = None,
) -> collections.abc.Awaitable[None]:
r"""
Update an existing topic.

This is a full replacement: any optional parameter left unset is reset to
its server default rather than preserved.

Args:
stream_id: Stream identifier as `str | int`.
topic_id: Topic identifier as `str | int`.
name: New topic name as `str`.
compression_algorithm: Compression algorithm as `str | None`.
replication_factor: Replication factor as `int | None`.
message_expiry: Message expiry as `datetime.timedelta | None`.
max_topic_size: Maximum topic size in bytes as `int | None`.

Returns:
An awaitable that resolves to `None` when the topic is updated.

Raises:
PyRuntimeError: If an argument is invalid or the request fails.
"""
def delete_topic(
self,
stream_id: builtins.str | builtins.int,
topic_id: builtins.str | builtins.int,
) -> collections.abc.Awaitable[None]:
r"""
Delete a topic from a stream.

Args:
stream_id: Stream identifier as `str | int`.
topic_id: Topic identifier as `str | int`.

Returns:
An awaitable that resolves to `None` when the topic is deleted.

Raises:
PyRuntimeError: If an identifier is invalid or the request fails.
"""
def purge_topic(
self,
stream_id: builtins.str | builtins.int,
topic_id: builtins.str | builtins.int,
) -> collections.abc.Awaitable[None]:
r"""
Purge all messages from a topic.

Args:
stream_id: Stream identifier as `str | int`.
topic_id: Topic identifier as `str | int`.

Returns:
An awaitable that resolves to `None` when the topic is purged.

Raises:
PyRuntimeError: If an identifier is invalid or the request fails.
"""
def send_messages(
self,
stream: builtins.str | builtins.int,
Expand Down Expand Up @@ -519,6 +602,17 @@ class StreamDetails:
@property
def topics_count(self) -> builtins.int: ...

@typing.final
class Topic:
@property
def id(self) -> builtins.int: ...
@property
def name(self) -> builtins.str: ...
@property
def messages_count(self) -> builtins.int: ...
@property
def partitions_count(self) -> builtins.int: ...

@typing.final
class TopicDetails:
@property
Expand All @@ -529,3 +623,7 @@ class TopicDetails:
def messages_count(self) -> builtins.int: ...
@property
def partitions_count(self) -> builtins.int: ...
@property
def compression_algorithm(self) -> builtins.str: ...
@property
def replication_factor(self) -> builtins.int: ...
166 changes: 165 additions & 1 deletion foreign/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::identifier::PyIdentifier;
use crate::receive_message::{PollingStrategy, ReceiveMessage};
use crate::send_message::SendMessage;
use crate::stream::StreamDetails;
use crate::topic::TopicDetails;
use crate::topic::{Topic, TopicDetails};
use tokio::sync::Mutex;

/// A Python class representing the Iggy client.
Expand Down Expand Up @@ -243,6 +243,170 @@ impl IggyClient {
})
}

/// Get all topics in a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `list[Topic]`.
///
/// Raises:
/// PyRuntimeError: If the identifier is invalid or the request fails.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[list[Topic]]", imports=("collections.abc")))]
fn get_topics<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
) -> PyResult<Bound<'a, PyAny>> {
let stream_id = Identifier::try_from(stream_id)?;
let inner = self.inner.clone();

future_into_py(py, async move {
let topics = inner
.get_topics(&stream_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(topics.into_iter().map(Topic::from).collect::<Vec<_>>())
})
}

/// Update an existing topic.
///
/// This is a full replacement: any optional parameter left unset is reset to
/// its server default rather than preserved.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
/// name: New topic name as `str`.
/// compression_algorithm: Compression algorithm as `str | None`.
/// replication_factor: Replication factor as `int | None`.
/// message_expiry: Message expiry as `datetime.timedelta | None`.
/// max_topic_size: Maximum topic size in bytes as `int | None`.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is updated.
///
/// Raises:
/// PyRuntimeError: If an argument is invalid or the request fails.
#[pyo3(
signature = (stream_id, topic_id, name, compression_algorithm = None, replication_factor = None, message_expiry = None, max_topic_size = None)
)]
#[allow(clippy::too_many_arguments)]
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn update_topic<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
name: String,
#[gen_stub(override_type(type_repr = "builtins.str | None"))] compression_algorithm: Option<
String,
>,
#[gen_stub(override_type(type_repr = "builtins.int | None"))] replication_factor: Option<
u8,
>,
#[gen_stub(override_type(type_repr = "datetime.timedelta | None", imports=("datetime")))]
message_expiry: Option<Py<PyDelta>>,
#[gen_stub(override_type(type_repr = "builtins.int | None"))] max_topic_size: Option<u64>,
) -> PyResult<Bound<'a, PyAny>> {
let compression_algorithm = match compression_algorithm {
Some(algo) => CompressionAlgorithm::from_str(&algo)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?,
None => CompressionAlgorithm::default(),
};

let expiry = match message_expiry {
Some(delta) => IggyExpiry::ExpireDuration(py_delta_to_iggy_duration(&delta)),
None => IggyExpiry::ServerDefault,
};

let max_size = max_topic_size.map_or(MaxTopicSize::ServerDefault, MaxTopicSize::from);

let stream_id = Identifier::try_from(stream_id)?;
let topic_id = Identifier::try_from(topic_id)?;
let inner = self.inner.clone();

future_into_py(py, async move {
inner
.update_topic(
&stream_id,
&topic_id,
&name,
compression_algorithm,
replication_factor,
expiry,
max_size,
)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(())
})
}

/// Delete a topic from a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is deleted.
///
/// Raises:
/// PyRuntimeError: If an identifier is invalid or the request fails.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn delete_topic<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
) -> PyResult<Bound<'a, PyAny>> {
let stream_id = Identifier::try_from(stream_id)?;
let topic_id = Identifier::try_from(topic_id)?;
let inner = self.inner.clone();

future_into_py(py, async move {
inner
.delete_topic(&stream_id, &topic_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(())
})
}

/// Purge all messages from a topic.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is purged.
///
/// Raises:
/// PyRuntimeError: If an identifier is invalid or the request fails.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
fn purge_topic<'a>(
&self,
py: Python<'a>,
stream_id: PyIdentifier,
topic_id: PyIdentifier,
) -> PyResult<Bound<'a, PyAny>> {
let stream_id = Identifier::try_from(stream_id)?;
let topic_id = Identifier::try_from(topic_id)?;
let inner = self.inner.clone();

future_into_py(py, async move {
inner
.purge_topic(&stream_id, &topic_id)
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
Ok(())
})
}

/// Sends a list of messages to the specified topic.
/// Returns Ok(()) on successful sending or a PyRuntimeError on failure.
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))]
Expand Down
3 changes: 2 additions & 1 deletion foreign/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use pyo3::prelude::*;
use receive_message::{PollingStrategy, ReceiveMessage};
use send_message::SendMessage;
use stream::StreamDetails;
use topic::TopicDetails;
use topic::{Topic, TopicDetails};

/// A Python module implemented in Rust.
#[pymodule]
Expand All @@ -38,6 +38,7 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<ReceiveMessage>()?;
m.add_class::<IggyClient>()?;
m.add_class::<StreamDetails>()?;
m.add_class::<Topic>()?;
m.add_class::<TopicDetails>()?;
m.add_class::<PollingStrategy>()?;
m.add_class::<IggyConsumer>()?;
Expand Down
48 changes: 47 additions & 1 deletion foreign/python/src/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,46 @@
// specific language governing permissions and limitations
// under the License.

use iggy::prelude::TopicDetails as RustTopicDetails;
use iggy::prelude::{Topic as RustTopic, TopicDetails as RustTopicDetails};
use pyo3::prelude::*;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};

#[gen_stub_pyclass]
#[pyclass]
pub struct Topic {
pub(crate) inner: RustTopic,
}

impl From<RustTopic> for Topic {
fn from(topic: RustTopic) -> Self {
Self { inner: topic }
}
}

#[gen_stub_pymethods]
#[pymethods]
impl Topic {
#[getter]
pub fn id(&self) -> u32 {
self.inner.id
}

#[getter]
pub fn name(&self) -> String {
self.inner.name.to_string()
}

#[getter]
pub fn messages_count(&self) -> u64 {
self.inner.messages_count
}

#[getter]
pub fn partitions_count(&self) -> u32 {
self.inner.partitions_count
}
}
Comment on lines +36 to +56

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add getters for the following as well:

  • created_at
  • size
  • message_expiry
  • compression_algorithm
  • max_topic_size
  • replication_factor

Please also add docs to Topic and its getters in the style as shown in the review previously.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added getters for compression_algorithm and replication_factor since those are simple scalars. The rest wrap richer types. message_expiry and max_topic_size are tri-state enums (IggyExpiry, MaxTopicSize) that the Rust client returns without losing information, so they need a proper Python representation rather than a one-line getter. I'd like to keep this PR scoped to the topic operations and handle message_expiry, max_topic_size, size, and created_at in a follow-up. I'll open an issue to track it and link it here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, please go ahead and open an issue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, please go ahead and open an issue


#[gen_stub_pyclass]
#[pyclass]
pub struct TopicDetails {
Expand Down Expand Up @@ -55,4 +91,14 @@ impl TopicDetails {
pub fn partitions_count(&self) -> u32 {
self.inner.partitions_count
}

#[getter]
pub fn compression_algorithm(&self) -> String {
self.inner.compression_algorithm.to_string()
}

#[getter]
pub fn replication_factor(&self) -> u8 {
self.inner.replication_factor
}
Comment on lines +94 to +103

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add getters for the rest of the fields as well:

  • created_at
  • size
  • message_expiry
  • max_topic_size
  • partitions

With docs.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same situation as the Topic getters. created_at, size, message_expiry, and max_topic_size wrap richer types (timestamp, byte size, and the IggyExpiry / MaxTopicSize tri-state enums), so they need a proper Python representation rather than one-line getters. partitions is a Vec, so exposing it also means adding a Partition class with its own getters. I'd like to handle all of these in the same follow-up to keep this PR scoped to the topic operations, and I'll cover them in the tracking issue.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

}
Loading