Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 95 additions & 37 deletions crates/hotfix-store-mongodb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use hotfix_store::error::{Result, StoreError};

pub use mongodb::Client;

const DEFAULT_COLLECTION_NAME: &str = "messages";

#[derive(Debug, Deserialize, Serialize)]
struct SequenceMeta {
#[serde(rename = "_id")]
Expand Down Expand Up @@ -77,7 +79,7 @@ impl MongoDbMessageStore {
db: Database,
collection_name: Option<&str>,
) -> mongodb::error::Result<Self> {
let collection_name = collection_name.unwrap_or("messages");
let collection_name = collection_name.unwrap_or(DEFAULT_COLLECTION_NAME);
let meta_collection = db.collection(collection_name);
let message_collection = db.collection(collection_name);

Expand Down Expand Up @@ -159,48 +161,104 @@ impl MongoDbMessageStore {
///
/// Returns `StoreError::Cleanup` if the cleanup operation fails.
pub async fn cleanup_older_than(&self, age: Duration) -> Result<u64> {
let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis());
cleanup_older_than_inner(
&self.meta_collection,
&self.message_collection,
age,
Some(self.current_sequence.object_id),
)
.await
}
}

// Find old sequence IDs (excluding current sequence)
let filter = doc! {
"meta": true,
"creation_time": { "$lt": cutoff },
"_id": { "$ne": self.current_sequence.object_id }
};
let mut cursor = self
.meta_collection
.find(filter)
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;
/// Deletes sequences older than the specified age, along with their associated messages.
///
/// This function is useful for cleaning up old session data from MongoDB without
/// needing to instantiate a full [`MongoDbMessageStore`].
/// The latest sequence is never deleted, even if it matches the age criteria.
///
/// # Arguments
///
/// * `db` - The MongoDB database to use
/// * `collection_name` - Optional collection name (defaults to "messages")
/// * `age` - The minimum age of sequences to delete
///
/// # Returns
///
/// The number of deleted sequences.
///
/// # Errors
///
/// Returns `StoreError::Cleanup` if the cleanup operation fails.
pub async fn cleanup_older_than(
db: &Database,
collection_name: Option<&str>,
age: Duration,
) -> Result<u64> {
let collection_name = collection_name.unwrap_or(DEFAULT_COLLECTION_NAME);
let meta_collection: Collection<SequenceMeta> = db.collection(collection_name);
let message_collection: Collection<Message> = db.collection(collection_name);

// Find latest sequence to exclude
let options = FindOneOptions::builder().sort(doc! { "_id": -1 }).build();
let latest = meta_collection
.find_one(doc! { "meta": true })
.with_options(options)
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;
let exclude_id = latest.map(|m| m.object_id);

cleanup_older_than_inner(&meta_collection, &message_collection, age, exclude_id).await
}

let mut old_sequence_ids = Vec::new();
while let Some(meta) = cursor
.try_next()
.await
.map_err(|e| StoreError::Cleanup(e.into()))?
{
old_sequence_ids.push(meta.object_id);
}
async fn cleanup_older_than_inner(
meta_collection: &Collection<SequenceMeta>,
message_collection: &Collection<Message>,
age: Duration,
exclude_id: Option<ObjectId>,
) -> Result<u64> {
let cutoff = BsonDateTime::from_millis((Utc::now() - age).timestamp_millis());

// Find old sequence IDs (excluding the specified sequence if any)
let mut filter = doc! {
"meta": true,
"creation_time": { "$lt": cutoff },
};
if let Some(id) = exclude_id {
filter.insert("_id", doc! { "$ne": id });
}

if old_sequence_ids.is_empty() {
return Ok(0);
}
let mut cursor = meta_collection
.find(filter)
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

let mut old_sequence_ids = Vec::new();
while let Some(meta) = cursor
.try_next()
.await
.map_err(|e| StoreError::Cleanup(e.into()))?
{
old_sequence_ids.push(meta.object_id);
}

// Delete messages first to avoid orphaned meta documents
self.message_collection
.delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;
if old_sequence_ids.is_empty() {
return Ok(0);
}

// Delete sequence metas
let result = self
.meta_collection
.delete_many(doc! { "_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;
// Delete messages first to avoid orphaned meta documents
message_collection
.delete_many(doc! { "sequence_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

Ok(result.deleted_count)
}
// Delete sequence metas
let result = meta_collection
.delete_many(doc! { "_id": { "$in": &old_sequence_ids } })
.await
.map_err(|e| StoreError::Cleanup(e.into()))?;

Ok(result.deleted_count)
}

#[async_trait]
Expand Down
45 changes: 44 additions & 1 deletion crates/hotfix-store-mongodb/tests/mongodb_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use chrono::Duration;
use hotfix_store::MessageStore;
use hotfix_store::error::StoreError;
use hotfix_store_mongodb::{Client, MongoDbMessageStore};
use hotfix_store_mongodb::{Client, MongoDbMessageStore, cleanup_older_than};
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage};

Expand Down Expand Up @@ -193,3 +193,46 @@ async fn test_cleanup_after_connection_drop() {

assert!(matches!(result, Err(StoreError::Cleanup(_))));
}

#[tokio::test]
async fn test_standalone_cleanup_removes_old_sequences() {
let container = GenericImage::new("mongo", "8.0").start().await.unwrap();
let host = container.get_host().await.unwrap();
let port = container.get_host_port_ipv4(MONGO_PORT).await.unwrap();

let client = Client::with_uri_str(format!("mongodb://{host}:{port}"))
.await
.unwrap();
let db = client.database("test_standalone_cleanup");
let mut store = MongoDbMessageStore::new(db.clone(), Some("test"))
.await
.unwrap();

// Add a message to the initial sequence
store.add(1, b"message in sequence 1").await.unwrap();

// Reset creates a new sequence, making the first one "old"
store.reset().await.unwrap();
store.add(1, b"message in sequence 2").await.unwrap();

// Reset again to have two old sequences
store.reset().await.unwrap();
store.add(1, b"message in sequence 3").await.unwrap();

// Small delay to ensure old sequences have earlier timestamps than the cutoff
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

// Use the standalone function to cleanup (without needing the store)
let deleted = cleanup_older_than(&db, Some("test"), Duration::zero())
.await
.unwrap();

assert_eq!(deleted, 2);

// Verify current sequence messages are still accessible via the store
let messages = store.get_slice(1, 1).await.unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0], b"message in sequence 3");

drop(container);
}