From 64c58e8498ea9857f3b9d8b05b00cc9f03a957f0 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 30 Jan 2026 10:20:19 +0100 Subject: [PATCH 1/2] Make cleanup functional available as a standalone function in MongoDB store --- crates/hotfix-store-mongodb/src/lib.rs | 132 ++++++++++++++++++------- 1 file changed, 95 insertions(+), 37 deletions(-) diff --git a/crates/hotfix-store-mongodb/src/lib.rs b/crates/hotfix-store-mongodb/src/lib.rs index 50c2990..fa641d1 100644 --- a/crates/hotfix-store-mongodb/src/lib.rs +++ b/crates/hotfix-store-mongodb/src/lib.rs @@ -29,6 +29,8 @@ use hotfix_store::error::{Result, StoreError}; pub use mongodb::Client; +const DEFAULT_COLLECTION_NAME: &str = "messages"; + #[derive(Debug, Deserialize, Serialize)] struct SequenceMeta { #[serde(rename = "_id")] @@ -77,7 +79,7 @@ impl MongoDbMessageStore { db: Database, collection_name: Option<&str>, ) -> mongodb::error::Result { - let collection_name = collection_name.unwrap_or("messages"); + let collection_name = collection_name.unwrap_or(DEFAULT_COLLECTION_NAME); let meta_collection = db.collection(collection_name); let message_collection = db.collection(collection_name); @@ -159,48 +161,104 @@ impl MongoDbMessageStore { /// /// Returns `StoreError::Cleanup` if the cleanup operation fails. pub async fn cleanup_older_than(&self, age: Duration) -> Result { - let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis()); + cleanup_older_than_inner( + &self.meta_collection, + &self.message_collection, + age, + Some(self.current_sequence.object_id), + ) + .await + } +} - // Find old sequence IDs (excluding current sequence) - let filter = doc! { - "meta": true, - "creation_time": { "$lt": cutoff }, - "_id": { "$ne": self.current_sequence.object_id } - }; - let mut cursor = self - .meta_collection - .find(filter) - .await - .map_err(|e| StoreError::Cleanup(e.into()))?; +/// Deletes sequences older than the specified age, along with their associated messages. +/// +/// This function is useful for cleaning up old session data from MongoDB without +/// needing to instantiate a full [`MongoDbMessageStore`]. +/// The latest sequence is never deleted, even if it matches the age criteria. +/// +/// # Arguments +/// +/// * `db` - The MongoDB database to use +/// * `collection_name` - Optional collection name (defaults to "messages") +/// * `age` - The minimum age of sequences to delete +/// +/// # Returns +/// +/// The number of deleted sequences. +/// +/// # Errors +/// +/// Returns `StoreError::Cleanup` if the cleanup operation fails. +pub async fn cleanup_older_than( + db: &Database, + collection_name: Option<&str>, + age: Duration, +) -> Result { + let collection_name = collection_name.unwrap_or(DEFAULT_COLLECTION_NAME); + let meta_collection: Collection = db.collection(collection_name); + let message_collection: Collection = db.collection(collection_name); + + // Find latest sequence to exclude + let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build(); + let latest = meta_collection + .find_one(doc! { "meta": true }) + .with_options(options) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + let exclude_id = latest.map(|m| m.object_id); + + cleanup_older_than_inner(&meta_collection, &message_collection, age, exclude_id).await +} - let mut old_sequence_ids = Vec::new(); - while let Some(meta) = cursor - .try_next() - .await - .map_err(|e| StoreError::Cleanup(e.into()))? - { - old_sequence_ids.push(meta.object_id); - } +async fn cleanup_older_than_inner( + meta_collection: &Collection, + message_collection: &Collection, + age: Duration, + exclude_id: Option, +) -> Result { + let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis()); + + // Find old sequence IDs (excluding the specified sequence if any) + let mut filter = doc! { + "meta": true, + "creation_time": { "$lt": cutoff }, + }; + if let Some(id) = exclude_id { + filter.insert("_id", doc! { "$ne": id }); + } - if old_sequence_ids.is_empty() { - return Ok(0); - } + let mut cursor = meta_collection + .find(filter) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + let mut old_sequence_ids = Vec::new(); + while let Some(meta) = cursor + .try_next() + .await + .map_err(|e| StoreError::Cleanup(e.into()))? + { + old_sequence_ids.push(meta.object_id); + } - // Delete messages first to avoid orphaned meta documents - self.message_collection - .delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } }) - .await - .map_err(|e| StoreError::Cleanup(e.into()))?; + if old_sequence_ids.is_empty() { + return Ok(0); + } - // Delete sequence metas - let result = self - .meta_collection - .delete_many(doc! { "_id": { "$in": &old_sequence_ids } }) - .await - .map_err(|e| StoreError::Cleanup(e.into()))?; + // Delete messages first to avoid orphaned meta documents + message_collection + .delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; - Ok(result.deleted_count) - } + // Delete sequence metas + let result = meta_collection + .delete_many(doc! { "_id": { "$in": &old_sequence_ids } }) + .await + .map_err(|e| StoreError::Cleanup(e.into()))?; + + Ok(result.deleted_count) } #[async_trait] From 973a195a52cfaff7f5c7c62794ba58fc39c97b50 Mon Sep 17 00:00:00 2001 From: David Steiner Date: Fri, 30 Jan 2026 10:26:21 +0100 Subject: [PATCH 2/2] Add test case for new standalone cleanup function --- .../tests/mongodb_tests.rs | 45 ++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/crates/hotfix-store-mongodb/tests/mongodb_tests.rs b/crates/hotfix-store-mongodb/tests/mongodb_tests.rs index 0629b73..6ddd1b4 100644 --- a/crates/hotfix-store-mongodb/tests/mongodb_tests.rs +++ b/crates/hotfix-store-mongodb/tests/mongodb_tests.rs @@ -6,7 +6,7 @@ use chrono::Duration; use hotfix_store::MessageStore; use hotfix_store::error::StoreError; -use hotfix_store_mongodb::{Client, MongoDbMessageStore}; +use hotfix_store_mongodb::{Client, MongoDbMessageStore, cleanup_older_than}; use testcontainers::runners::AsyncRunner; use testcontainers::{ContainerAsync, GenericImage}; @@ -193,3 +193,46 @@ async fn test_cleanup_after_connection_drop() { assert!(matches!(result, Err(StoreError::Cleanup(_)))); } + +#[tokio::test] +async fn test_standalone_cleanup_removes_old_sequences() { + let container = GenericImage::new("mongo", "8.0").start().await.unwrap(); + let host = container.get_host().await.unwrap(); + let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap(); + + let client = Client::with_uri_str(format!("mongodb://{host}:{port}")) + .await + .unwrap(); + let db = client.database("test_standalone_cleanup"); + let mut store = MongoDbMessageStore::new(db.clone(), Some("test")) + .await + .unwrap(); + + // Add a message to the initial sequence + store.add(1, b"message in sequence 1").await.unwrap(); + + // Reset creates a new sequence, making the first one "old" + store.reset().await.unwrap(); + store.add(1, b"message in sequence 2").await.unwrap(); + + // Reset again to have two old sequences + store.reset().await.unwrap(); + store.add(1, b"message in sequence 3").await.unwrap(); + + // Small delay to ensure old sequences have earlier timestamps than the cutoff + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + // Use the standalone function to cleanup (without needing the store) + let deleted = cleanup_older_than(&db, Some("test"), Duration::zero()) + .await + .unwrap(); + + assert_eq!(deleted, 2); + + // Verify current sequence messages are still accessible via the store + let messages = store.get_slice(1, 1).await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0], b"message in sequence 3"); + + drop(container); +}