feat(python): add topic listing, update, delete and purge#3572
feat(python): add topic listing, update, delete and purge#3572mattp5657 wants to merge 6 commits into
Conversation
2db95ac to
02a1008
Compare
02a1008 to
ce94a0d
Compare
There was a problem hiding this comment.
Looks good at a first glance.
As an aside, please do not force push new code. Instead, address the review in well-formed commits. This helps me understand what changed since my last review.
You can write /ready on a new line in a comment to indicate that your PR is ready for review.
| /// Gets all topics in the given stream. | ||
| /// Returns a list of topics or a PyRuntimeError on failure. |
There was a problem hiding this comment.
Let's change this to
/// Get all topics in a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `list[TopicDetails]`.
///
/// Raises:
/// PyRuntimeError: If the identifier is invalid or the request fails.
There was a problem hiding this comment.
Wouldn't this break the pattern already established in client.rs. This also returns a list[Topic] similar to the Rust client. ex.
/// Logs in the user with the given credentials.
/// Returns `Ok(())` on success, or a PyRuntimeError on failure.| /// Updates an existing topic with the given parameters. | ||
| /// Returns Ok(()) on successful topic update or a PyRuntimeError on failure. |
There was a problem hiding this comment.
Change to
/// Update a topic in a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
/// name: Topic name as `str`.
/// compression_algorithm: Compression algorithm as `str | None`. Supported
/// values are `\"none\"` and `\"gzip\"`, case-insensitive.
/// replication_factor: Replication factor as `int | None`. Must be greater
/// than `0` when provided. If `None`, the server uses `1`.
/// message_expiry: Message retention period as `datetime.timedelta | None`.
/// If `None`, the server default is used, which currently means
/// messages do not expire.
/// max_topic_size: Maximum topic size in bytes as `int | None`. Use `0` to
/// request the server default, which is currently unlimited. If `None`,
/// the server default is also used.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is updated.
///
/// Raises:
/// PyRuntimeError: If an identifier or argument is invalid, or the update request fails.
There was a problem hiding this comment.
I'm already doing it for the rest of the functions right now. that's why i asked you to do it for your PR :)
There was a problem hiding this comment.
I'm already doing it for the rest of the functions right now. that's why i asked you to do it for your PR :)
There was a problem hiding this comment.
Awesome, will do thank you for the clarification :)
| /// Deletes the topic with the given id from the given stream. | ||
| /// Returns Ok(()) on successful topic deletion or a PyRuntimeError on failure. |
There was a problem hiding this comment.
Change to
/// Delete a topic from a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is deleted.
///
/// Raises:
/// PyRuntimeError: If an identifier is invalid or the delete request fails.
|
|
||
| /// Purges all messages from the topic with the given id in the given stream. | ||
| /// Returns Ok(()) on successful topic purge or a PyRuntimeError on failure. |
There was a problem hiding this comment.
change to
/// Purge all messages from a topic in a stream.
///
/// Args:
/// stream_id: Stream identifier as `str | int`.
/// topic_id: Topic identifier as `str | int`.
///
/// Returns:
/// An awaitable that resolves to `None` when the topic is purged.
///
/// Raises:
/// PyRuntimeError: If an identifier is invalid or the purge request fails.
| use iggy::prelude::Topic as RustTopic; | ||
| use iggy::prelude::TopicDetails as RustTopicDetails; |
There was a problem hiding this comment.
These can be merged together
| impl Topic { | ||
| #[getter] | ||
| pub fn id(&self) -> u32 { | ||
| self.inner.id | ||
| } | ||
|
|
||
| #[getter] | ||
| pub fn name(&self) -> String { | ||
| self.inner.name.to_string() | ||
| } | ||
|
|
||
| #[getter] | ||
| pub fn messages_count(&self) -> u64 { | ||
| self.inner.messages_count | ||
| } | ||
|
|
||
| #[getter] | ||
| pub fn partitions_count(&self) -> u32 { | ||
| self.inner.partitions_count | ||
| } | ||
| } |
There was a problem hiding this comment.
Lets add getters for the following as well:
- created_at
- size
- message_expiry
- compression_algorithm
- max_topic_size
- replication_factor
Please also add docs to Topic and its getters in the style as shown in the review previously.
There was a problem hiding this comment.
I added getters for compression_algorithm and replication_factor since those are simple scalars. The rest wrap richer types. message_expiry and max_topic_size are tri-state enums (IggyExpiry, MaxTopicSize) that the Rust client returns without losing information, so they need a proper Python representation rather than a one-line getter. I'd like to keep this PR scoped to the topic operations and handle message_expiry, max_topic_size, size, and created_at in a follow-up. I'll open an issue to track it and link it here.
There was a problem hiding this comment.
sure, please go ahead and open an issue
There was a problem hiding this comment.
sure, please go ahead and open an issue
|
|
||
| #[getter] | ||
| pub fn compression_algorithm(&self) -> String { | ||
| self.inner.compression_algorithm.to_string() | ||
| } | ||
|
|
||
| #[getter] | ||
| pub fn replication_factor(&self) -> u8 { | ||
| self.inner.replication_factor | ||
| } |
There was a problem hiding this comment.
Let's add getters for the rest of the fields as well:
- created_at
- size
- message_expiry
- max_topic_size
- partitions
With docs.
There was a problem hiding this comment.
Same situation as the Topic getters. created_at, size, message_expiry, and max_topic_size wrap richer types (timestamp, byte size, and the IggyExpiry / MaxTopicSize tri-state enums), so they need a proper Python representation rather than one-line getters. partitions is a Vec, so exposing it also means adding a Partition class with its own getters. I'd like to handle all of these in the same follow-up to keep this PR scoped to the topic operations, and I'll cover them in the tracking issue.
There was a problem hiding this comment.
The tests look good. I would like you to add some more tests though:
- for all 4 functions, test what happens when you call the function repeatedly.
- merge the tests calling the function before connect, and before login into one. No need for two tests.
- purge_topic on non-existent stream
- purge_topic on existing stream with non-existing topic
- update_topic on non-existent stream
- testing naming collision by trying to update a topic's name to one that another topic already possesses.
- happy path tests for message_expiry and max_topic_size args in update_topic
Some additions is the current tests that can be made:
- after purging the topic, check that all fields except message count remain the same.
- for get_topics, check all fields you supply, or those that are deterministically set by the server. Do not check fields that are set randomly by the server.
I also liked your way of creating a class per function. Could you do that for the rest of the tests in this file? ie. create TestCreateTopic and TestGetTopic classes and split and move the existing tests under TestTopicOperations into these?
|
/author |
|
Appreciate the comments and have addressed most of them. The remaining Topic/TopicDetails getters are tracked in #3577 as a follow-up I'll take on after this merges. |
|
/ready |
Which issue does this PR address?
Closes #3521
Rationale
Completes the Python SDK's topic management by adding the missing list, update, delete, and purge operations, which previously had no Python binding and forced callers to another SDK.
What changed?
The Python SDK exposed only
create_topicandget_topic, so listing, updating, deleting, or purging topics required falling back to another SDK.TopicDetailsalso hidcompression_algorithmandreplication_factor, leaving the result of an update impossible to assert from Python.get_topics,update_topic,delete_topic, andpurge_topicnow bind through to the RustTopicClient, a newTopictype backs the list view, andTopicDetailsexposes the two previously hidden fields. The new methods takestream_idto stay consistent with the Rust trait and the existingget_topic.Local Execution
AI Usage
Claude was used to help generate and review this PR.