diff --git a/src/db_cleanup.rs b/src/db_cleanup.rs index e1094fe..997ea37 100644 --- a/src/db_cleanup.rs +++ b/src/db_cleanup.rs @@ -1,44 +1,40 @@ +use crate::config::{ + FIFTEEN_MINUTE_DATA_RETENTION_DAYS, FIVE_MINUTE_DATA_RETENTION_DAYS, + MINUTE_DATA_RETENTION_DAYS, RAW_DATA_RETENTION_HOURS, +}; +use crate::database::PriceDatabase; use crate::errors::{BotError, BotResult}; -use crate::config::{RAW_DATA_RETENTION_HOURS, MINUTE_DATA_RETENTION_DAYS, FIVE_MINUTE_DATA_RETENTION_DAYS, FIFTEEN_MINUTE_DATA_RETENTION_DAYS, DATABASE_PATH}; use crate::health::HealthState; use rusqlite::Connection; use std::sync::Arc; use std::time::Duration; use tokio::time::sleep; -use tracing::{info, error, debug}; +use tracing::{debug, error, info, warn}; /// Database cleanup service for aggregating and compacting price data pub struct DatabaseCleanup { health: Arc, + database: Arc, } impl DatabaseCleanup { - pub fn new() -> Self { + pub fn new(database: Arc) -> Self { let health = Arc::new(HealthState::new("DB-CLEANUP".to_string())); - Self { health } + Self { health, database } } - /// Get a database connection with WAL mode and busy timeout - fn get_connection(&self) -> BotResult { - let conn = Connection::open(DATABASE_PATH).map_err(BotError::Database)?; - - // Enable WAL mode for better concurrent access - if let Err(e) = conn.pragma_update(None, "journal_mode", "WAL") { - error!("Failed to enable WAL mode: {}", e); - } - // Set busy timeout to handle locks better (30 seconds) - if let Err(e) = conn.pragma_update(None, "busy_timeout", 30000) { - error!("Failed to set busy timeout: {}", e); - } - - Ok(conn) + /// Get a database connection from the pool + fn get_connection( + &self, + ) -> BotResult> { + self.database.get_connection() } /// Initialize the aggregated data table fn init_aggregated_table(&self) -> BotResult<()> { let conn = self.get_connection()?; - + conn.execute( "CREATE TABLE IF NOT EXISTS price_aggregates ( id INTEGER PRIMARY KEY AUTOINCREMENT, @@ -74,27 +70,37 @@ impl DatabaseCleanup { } /// Aggregate raw data into time buckets with batching to reduce lock time - fn aggregate_data(&self, bucket_duration_seconds: u64, older_than_seconds: u64) -> BotResult { - info!(" ๐Ÿ” Checking for data older than {} seconds to aggregate into {}-second buckets", older_than_seconds, bucket_duration_seconds); - + fn aggregate_data( + &self, + bucket_duration_seconds: u64, + older_than_seconds: u64, + ) -> BotResult { + info!( + " ๐Ÿ” Checking for data older than {} seconds to aggregate into {}-second buckets", + older_than_seconds, bucket_duration_seconds + ); + let conn = self.get_connection()?; let current_time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| BotError::SystemTime(format!("System time error: {}", e)))? .as_secs(); - + let cutoff_time = current_time - older_than_seconds; let bucket_duration = bucket_duration_seconds as i64; - + // Process in smaller batches to reduce lock contention let batch_size = 100; let mut total_aggregated = 0u64; let mut batch_number = 0; - + loop { batch_number += 1; - debug!(" ๐Ÿ“ฆ Processing batch {} for {}-second aggregation", batch_number, bucket_duration_seconds); - + debug!( + " ๐Ÿ“ฆ Processing batch {} for {}-second aggregation", + batch_number, bucket_duration_seconds + ); + // Get a small batch of data to aggregate let mut stmt = conn.prepare( "SELECT crypto_name, @@ -114,43 +120,64 @@ impl DatabaseCleanup { GROUP BY crypto_name, bucket_start HAVING COUNT(*) > 0 ORDER BY crypto_name, bucket_start - LIMIT ?" + LIMIT ?", )?; - let rows = stmt.query_map([ - bucket_duration, bucket_duration, // bucket_start calculation - cutoff_time as i64, // WHERE timestamp < cutoff - bucket_duration, bucket_duration, bucket_duration, // NOT EXISTS check - batch_size as i64 // LIMIT - ], |row| { - Ok(( - row.get::<_, String>(0)?, // crypto_name - row.get::<_, i64>(1)?, // bucket_start - row.get::<_, f64>(2)?, // low_price - row.get::<_, f64>(3)?, // high_price - row.get::<_, f64>(4)?, // avg_price - row.get::<_, i64>(5)?, // sample_count - )) - })?; + let rows = stmt.query_map( + [ + bucket_duration, + bucket_duration, // bucket_start calculation + cutoff_time as i64, // WHERE timestamp < cutoff + bucket_duration, + bucket_duration, + bucket_duration, // NOT EXISTS check + batch_size as i64, // LIMIT + ], + |row| { + Ok(( + row.get::<_, String>(0)?, // crypto_name + row.get::<_, i64>(1)?, // bucket_start + row.get::<_, f64>(2)?, // low_price + row.get::<_, f64>(3)?, // high_price + row.get::<_, f64>(4)?, // avg_price + row.get::<_, i64>(5)?, // sample_count + )) + }, + )?; let batch_data: Vec<_> = rows.collect::, _>>()?; - + if batch_data.is_empty() { - debug!(" โœ… No more data to aggregate for {}-second buckets", bucket_duration_seconds); + debug!( + " โœ… No more data to aggregate for {}-second buckets", + bucket_duration_seconds + ); break; // No more data to process } - - debug!(" ๐Ÿ“Š Found {} records to aggregate in batch {}", batch_data.len(), batch_number); - + + debug!( + " ๐Ÿ“Š Found {} records to aggregate in batch {}", + batch_data.len(), + batch_number + ); + // Process this batch in a transaction let tx = conn.unchecked_transaction()?; let mut batch_count = 0u64; - - for (crypto_name, bucket_start, low_price, high_price, avg_price, sample_count) in batch_data { + + for (crypto_name, bucket_start, low_price, high_price, avg_price, sample_count) in + batch_data + { // Get open and close prices separately for accuracy - let open_price = self.get_bucket_open_price(&conn, &crypto_name, bucket_start, bucket_duration)?; - let close_price = self.get_bucket_close_price(&conn, &crypto_name, bucket_start, bucket_duration)?; - + let open_price = + self.get_bucket_open_price(&conn, &crypto_name, bucket_start, bucket_duration)?; + let close_price = self.get_bucket_close_price( + &conn, + &crypto_name, + bucket_start, + bucket_duration, + )?; + // Insert the aggregated data tx.execute( "INSERT INTO price_aggregates @@ -168,88 +195,122 @@ impl DatabaseCleanup { &sample_count.to_string(), ] )?; - + batch_count += 1; } - + // Commit this batch tx.commit()?; total_aggregated += batch_count; - - debug!(" โœ… Batch {} completed: {} buckets aggregated (total: {})", batch_number, batch_count, total_aggregated); - + + debug!( + " โœ… Batch {} completed: {} buckets aggregated (total: {})", + batch_number, batch_count, total_aggregated + ); + // Small delay between batches to allow other processes to access DB std::thread::sleep(std::time::Duration::from_millis(100)); } if total_aggregated > 0 { - info!("๐Ÿ“Š Aggregated {} buckets of {}-second data", total_aggregated, bucket_duration_seconds); + info!( + "๐Ÿ“Š Aggregated {} buckets of {}-second data", + total_aggregated, bucket_duration_seconds + ); } Ok(total_aggregated) } /// Get the opening price for a bucket - fn get_bucket_open_price(&self, conn: &Connection, crypto_name: &str, bucket_start: i64, bucket_duration: i64) -> BotResult { + fn get_bucket_open_price( + &self, + conn: &Connection, + crypto_name: &str, + bucket_start: i64, + bucket_duration: i64, + ) -> BotResult { let bucket_end = bucket_start + bucket_duration; let mut stmt = conn.prepare( "SELECT price FROM prices WHERE crypto_name = ? AND timestamp >= ? AND timestamp < ? - ORDER BY timestamp ASC LIMIT 1" + ORDER BY timestamp ASC LIMIT 1", )?; - - let price: f64 = stmt.query_row([crypto_name, &bucket_start.to_string(), &bucket_end.to_string()], |row| { - row.get(0) - })?; - + + let price: f64 = stmt.query_row( + [ + crypto_name, + &bucket_start.to_string(), + &bucket_end.to_string(), + ], + |row| row.get(0), + )?; + Ok(price) } /// Get the closing price for a bucket - fn get_bucket_close_price(&self, conn: &Connection, crypto_name: &str, bucket_start: i64, bucket_duration: i64) -> BotResult { + fn get_bucket_close_price( + &self, + conn: &Connection, + crypto_name: &str, + bucket_start: i64, + bucket_duration: i64, + ) -> BotResult { let bucket_end = bucket_start + bucket_duration; let mut stmt = conn.prepare( "SELECT price FROM prices WHERE crypto_name = ? AND timestamp >= ? AND timestamp < ? - ORDER BY timestamp DESC LIMIT 1" + ORDER BY timestamp DESC LIMIT 1", )?; - - let price: f64 = stmt.query_row([crypto_name, &bucket_start.to_string(), &bucket_end.to_string()], |row| { - row.get(0) - })?; - + + let price: f64 = stmt.query_row( + [ + crypto_name, + &bucket_start.to_string(), + &bucket_end.to_string(), + ], + |row| row.get(0), + )?; + Ok(price) } /// Delete raw data that has been successfully aggregated fn cleanup_aggregated_raw_data(&self, older_than_seconds: u64) -> BotResult { - info!(" ๐Ÿ” Checking how many raw records are older than {} seconds", older_than_seconds); - + info!( + " ๐Ÿ” Checking how many raw records are older than {} seconds", + older_than_seconds + ); + let conn = self.get_connection()?; let current_time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| BotError::SystemTime(format!("System time error: {}", e)))? .as_secs(); - + let cutoff_time = current_time - older_than_seconds; - + // First, count how many records we're about to delete let mut count_stmt = conn.prepare("SELECT COUNT(*) FROM prices WHERE timestamp < ?")?; let count: i64 = count_stmt.query_row([cutoff_time as i64], |row| row.get(0))?; - - info!(" ๐Ÿ“Š Found {} raw records older than {} seconds", count, older_than_seconds); - + + info!( + " ๐Ÿ“Š Found {} raw records older than {} seconds", + count, older_than_seconds + ); + if count == 0 { info!(" โœ… No old raw data to clean up"); return Ok(0); } - + info!(" ๐Ÿ—‘๏ธ Deleting old raw records in batches..."); - + // Delete in batches to avoid hanging let mut total_deleted = 0i64; let batch_size = 10000; - + loop { let deleted = conn.execute( "DELETE FROM prices @@ -263,42 +324,63 @@ impl DatabaseCleanup { LIMIT ?", rusqlite::params![cutoff_time as i64, batch_size], )?; - + if deleted == 0 { break; } - + total_deleted += deleted as i64; info!(" Deleted {} records (total: {})", deleted, total_deleted); } - info!(" {} raw price records โœ… Successfully deleted older than {} seconds", total_deleted, older_than_seconds); + info!( + " {} raw price records โœ… Successfully deleted older than {} seconds", + total_deleted, older_than_seconds + ); Ok(total_deleted as u64) } /// Delete old aggregated data beyond retention period - fn cleanup_old_aggregates(&self, bucket_duration_seconds: u64, older_than_seconds: u64) -> BotResult { - info!(" ๐Ÿงน Cleaning up {}-second aggregates older than {} seconds", bucket_duration_seconds, older_than_seconds); - + fn cleanup_old_aggregates( + &self, + bucket_duration_seconds: u64, + older_than_seconds: u64, + ) -> BotResult { + info!( + " ๐Ÿงน Cleaning up {}-second aggregates older than {} seconds", + bucket_duration_seconds, older_than_seconds + ); + let conn = self.get_connection()?; let current_time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| BotError::SystemTime(format!("System time error: {}", e)))? .as_secs(); - + let cutoff_time = current_time - older_than_seconds; - + // First count what we're about to delete - let mut count_stmt = conn.prepare("SELECT COUNT(*) FROM price_aggregates WHERE bucket_start < ? AND bucket_duration = ?")?; - let count: i64 = count_stmt.query_row([cutoff_time as i64, bucket_duration_seconds as i64], |row| row.get(0))?; - + let mut count_stmt = conn.prepare( + "SELECT COUNT(*) FROM price_aggregates WHERE bucket_start < ? AND bucket_duration = ?", + )?; + let count: i64 = count_stmt.query_row( + [cutoff_time as i64, bucket_duration_seconds as i64], + |row| row.get(0), + )?; + if count > 0 { - info!(" ๐Ÿ—‘๏ธ Deleting {} old {}-second aggregate records...", count, bucket_duration_seconds); + info!( + " ๐Ÿ—‘๏ธ Deleting {} old {}-second aggregate records...", + count, bucket_duration_seconds + ); } else { - info!(" โœ… No old {}-second aggregates to clean up", bucket_duration_seconds); + info!( + " โœ… No old {}-second aggregates to clean up", + bucket_duration_seconds + ); } - + let deleted = conn.execute( "DELETE FROM price_aggregates WHERE bucket_start < ? AND bucket_duration = ?", @@ -306,7 +388,10 @@ impl DatabaseCleanup { )?; if deleted > 0 { - info!(" โœ… Deleted {} aggregated records ({}-second buckets)", deleted, bucket_duration_seconds); + info!( + " โœ… Deleted {} aggregated records ({}-second buckets)", + deleted, bucket_duration_seconds + ); } Ok(deleted as u64) @@ -315,33 +400,36 @@ impl DatabaseCleanup { /// Vacuum the database to reclaim space fn vacuum_database(&self) -> BotResult<()> { let conn = self.get_connection()?; - + info!("๐Ÿงน Starting database vacuum..."); conn.execute("VACUUM", [])?; info!("โœ… Database vacuum completed"); - + Ok(()) } /// Get database statistics fn get_database_stats(&self) -> BotResult<()> { let conn = self.get_connection()?; - + // Count raw price records - let raw_count: i64 = conn.query_row("SELECT COUNT(*) FROM prices", [], |row: &rusqlite::Row| row.get(0))?; - + let raw_count: i64 = + conn.query_row("SELECT COUNT(*) FROM prices", [], |row: &rusqlite::Row| { + row.get(0) + })?; + // Count aggregated records by bucket size let mut stmt = conn.prepare( "SELECT bucket_duration, COUNT(*) FROM price_aggregates GROUP BY bucket_duration ORDER BY bucket_duration" )?; - + let rows = stmt.query_map([], |row: &rusqlite::Row| { Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)) })?; info!("๐Ÿ“Š Database Statistics:"); info!(" Raw price records: {}", raw_count); - + for row in rows { let (duration, count) = row?; info!(" {}-second aggregates: {}", duration, count); @@ -353,7 +441,7 @@ impl DatabaseCleanup { /// Perform complete cleanup cycle with retry logic pub async fn perform_cleanup(&self) -> BotResult<()> { const MAX_RETRIES: u32 = 3; - + for attempt in 1..=MAX_RETRIES { match self.perform_cleanup_attempt().await { Ok(()) => return Ok(()), @@ -372,25 +460,33 @@ impl DatabaseCleanup { } /// Aggregate data from smaller buckets into larger buckets (e.g., 1m -> 5m) - fn aggregate_buckets(&self, source_duration: u64, target_duration: u64, older_than_seconds: u64) -> BotResult { - info!(" ๐Ÿ” Aggregating {}-second buckets older than {} seconds into {}-second buckets", source_duration, older_than_seconds, target_duration); - + fn aggregate_buckets( + &self, + source_duration: u64, + target_duration: u64, + older_than_seconds: u64, + ) -> BotResult { + info!( + " ๐Ÿ” Aggregating {}-second buckets older than {} seconds into {}-second buckets", + source_duration, older_than_seconds, target_duration + ); + let conn = self.get_connection()?; let current_time = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map_err(|e| BotError::SystemTime(format!("System time error: {}", e)))? .as_secs(); - + let cutoff_time = current_time - older_than_seconds; - + // Process in batches let batch_size = 100; let mut total_aggregated = 0u64; let mut batch_number = 0; - + loop { batch_number += 1; - + // Get a batch of source buckets to aggregate // We group by the NEW bucket start time let mut stmt = conn.prepare( @@ -412,63 +508,103 @@ impl DatabaseCleanup { GROUP BY crypto_name, new_bucket_start HAVING COUNT(*) > 0 ORDER BY crypto_name, new_bucket_start - LIMIT ?" + LIMIT ?", )?; - let rows = stmt.query_map([ - target_duration, target_duration, // new_bucket_start calculation - source_duration, // WHERE bucket_duration = source - cutoff_time, // AND bucket_start < cutoff - target_duration, target_duration, target_duration, // NOT EXISTS check - batch_size as u64 // LIMIT - ], |row| { - Ok(( - row.get::<_, String>(0)?, // crypto_name - row.get::<_, i64>(1)?, // new_bucket_start - row.get::<_, f64>(2)?, // low_price - row.get::<_, f64>(3)?, // high_price - row.get::<_, f64>(4)?, // avg_price - row.get::<_, i64>(5)?, // sample_count - )) - })?; + let rows = stmt.query_map( + [ + target_duration, + target_duration, // new_bucket_start calculation + source_duration, // WHERE bucket_duration = source + cutoff_time, // AND bucket_start < cutoff + target_duration, + target_duration, + target_duration, // NOT EXISTS check + batch_size as u64, // LIMIT + ], + |row| { + Ok(( + row.get::<_, String>(0)?, // crypto_name + row.get::<_, i64>(1)?, // new_bucket_start + row.get::<_, f64>(2)?, // low_price + row.get::<_, f64>(3)?, // high_price + row.get::<_, f64>(4)?, // avg_price + row.get::<_, i64>(5)?, // sample_count + )) + }, + )?; let batch_data: Vec<_> = rows.collect::, _>>()?; - + if batch_data.is_empty() { break; // No more data to process } - - debug!(" ๐Ÿ“Š Found {} bucket groups to aggregate in batch {}", batch_data.len(), batch_number); - + + debug!( + " ๐Ÿ“Š Found {} bucket groups to aggregate in batch {}", + batch_data.len(), + batch_number + ); + // Process this batch in a transaction let tx = conn.unchecked_transaction()?; let mut batch_count = 0u64; - - for (crypto_name, bucket_start, low_price, high_price, avg_price, sample_count) in batch_data { + + for (crypto_name, bucket_start, low_price, high_price, avg_price, sample_count) in + batch_data + { // For open/close, we need to query the source buckets // Open price = Open price of the earliest source bucket in this range // Close price = Close price of the latest source bucket in this range let bucket_end = bucket_start + target_duration as i64; - + // Get open price - let open_price: f64 = tx.query_row( + let open_price: f64 = match tx.query_row( "SELECT open_price FROM price_aggregates WHERE crypto_name = ? AND bucket_duration = ? AND bucket_start >= ? AND bucket_start < ? ORDER BY bucket_start ASC LIMIT 1", - [&crypto_name, &source_duration.to_string(), &bucket_start.to_string(), &bucket_end.to_string()], - |row| row.get(0) - ).unwrap_or(avg_price); // Fallback to avg if query fails (shouldn't happen) + [ + &crypto_name, + &source_duration.to_string(), + &bucket_start.to_string(), + &bucket_end.to_string(), + ], + |row| row.get(0), + ) { + Ok(price) => price, + Err(e) => { + warn!( + "Failed to get open price for {}: {}, using avg", + crypto_name, e + ); + avg_price + } + }; // Get close price - let close_price: f64 = tx.query_row( + let close_price: f64 = match tx.query_row( "SELECT close_price FROM price_aggregates WHERE crypto_name = ? AND bucket_duration = ? AND bucket_start >= ? AND bucket_start < ? ORDER BY bucket_start DESC LIMIT 1", - [&crypto_name, &source_duration.to_string(), &bucket_start.to_string(), &bucket_end.to_string()], - |row| row.get(0) - ).unwrap_or(avg_price); + [ + &crypto_name, + &source_duration.to_string(), + &bucket_start.to_string(), + &bucket_end.to_string(), + ], + |row| row.get(0), + ) { + Ok(price) => price, + Err(e) => { + warn!( + "Failed to get close price for {}: {}, using avg", + crypto_name, e + ); + avg_price + } + }; // Insert the aggregated data tx.execute( @@ -487,20 +623,23 @@ impl DatabaseCleanup { &sample_count.to_string(), ] )?; - + batch_count += 1; } - + // Commit this batch tx.commit()?; total_aggregated += batch_count; - + // Small delay std::thread::sleep(std::time::Duration::from_millis(50)); } if total_aggregated > 0 { - info!("๐Ÿ“Š Aggregated {} buckets of {}-second data (sourced from {}-second buckets)", total_aggregated, target_duration, source_duration); + info!( + "๐Ÿ“Š Aggregated {} buckets of {}-second data (sourced from {}-second buckets)", + total_aggregated, target_duration, source_duration + ); } Ok(total_aggregated) @@ -510,55 +649,68 @@ impl DatabaseCleanup { async fn perform_cleanup_attempt(&self) -> BotResult<()> { info!("๐Ÿงน Starting database cleanup cycle..."); self.health.update_price_timestamp(); // Use as "last activity" timestamp - + // Initialize aggregated table if needed info!("๐Ÿ“‹ Step 1/7: Initializing aggregated data table..."); self.init_aggregated_table()?; - + // Tier 1: Aggregate raw data older than 24 hours into 1-minute buckets info!("๐Ÿ“Š Step 2/7: Aggregating raw data into 1-minute buckets..."); let aggregated_1m = self.aggregate_data(60, RAW_DATA_RETENTION_HOURS * 3600)?; - - // Tier 2: Aggregate 1-minute data older than 7 days into 5-minute buckets + + // Tier 2: Aggregate 1-minute data older than 7 days into 5-minute buckets info!("๐Ÿ“Š Step 3/7: Aggregating 1-minute data into 5-minute buckets..."); // CHANGED: Source from 60s buckets instead of raw data - let aggregated_5m = self.aggregate_buckets(60, 300, MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; - + let aggregated_5m = + self.aggregate_buckets(60, 300, MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; + // Tier 3: Aggregate 5-minute data older than 30 days into 15-minute buckets info!("๐Ÿ“Š Step 4/7: Aggregating 5-minute data into 15-minute buckets..."); // CHANGED: Source from 300s buckets instead of raw data - let aggregated_15m = self.aggregate_buckets(300, 900, FIVE_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; - + let aggregated_15m = + self.aggregate_buckets(300, 900, FIVE_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; + // Clean up raw data that has been aggregated (older than 24 hours) info!("๐Ÿ—‘๏ธ Step 5/7: Cleaning up old raw data (older than 24 hours)..."); let deleted_raw = self.cleanup_aggregated_raw_data(RAW_DATA_RETENTION_HOURS * 3600)?; - + // Clean up old aggregated data beyond retention periods info!("๐Ÿ—‘๏ธ Step 6/7: Cleaning up old aggregated data..."); let deleted_1m = self.cleanup_old_aggregates(60, MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; - let deleted_5m = self.cleanup_old_aggregates(300, FIVE_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; - let deleted_15m = self.cleanup_old_aggregates(900, FIFTEEN_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; - + let deleted_5m = + self.cleanup_old_aggregates(300, FIVE_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; + let deleted_15m = + self.cleanup_old_aggregates(900, FIFTEEN_MINUTE_DATA_RETENTION_DAYS * 24 * 3600)?; + // Vacuum database if significant cleanup occurred let total_deleted = deleted_raw + deleted_1m + deleted_5m + deleted_15m; if total_deleted > 1000 { - info!("๐Ÿ”ง Step 7/7: Running database vacuum (deleted {} records)...", total_deleted); + info!( + "๐Ÿ”ง Step 7/7: Running database vacuum (deleted {} records)...", + total_deleted + ); self.vacuum_database()?; } else { - info!("โญ๏ธ Step 7/7: Skipping vacuum (only {} records deleted)", total_deleted); + info!( + "โญ๏ธ Step 7/7: Skipping vacuum (only {} records deleted)", + total_deleted + ); } - + // Update health timestamp self.health.update_db_timestamp(); - + // Show final statistics info!("๐Ÿ“ˆ Generating final database statistics..."); self.get_database_stats()?; - + info!("โœ… Cleanup cycle completed:"); - info!(" ๐Ÿ“Š Aggregated: {}x1m + {}x5m + {}x15m buckets", aggregated_1m, aggregated_5m, aggregated_15m); + info!( + " ๐Ÿ“Š Aggregated: {}x1m + {}x5m + {}x15m buckets", + aggregated_1m, aggregated_5m, aggregated_15m + ); info!(" ๐Ÿ—‘๏ธ Deleted: {} total records", total_deleted); - + Ok(()) } @@ -568,17 +720,17 @@ impl DatabaseCleanup { .unwrap_or_else(|_| "24".to_string()) .parse::() .unwrap_or(24); - + let interval = Duration::from_secs(interval_hours * 3600); - + info!("๐Ÿš€ Database cleanup service started"); info!("โฐ Cleanup interval: {} hours", interval_hours); - + // Note: Health server is now started by main.rs with aggregated health from all bots - + // Run initial cleanup after a short delay sleep(Duration::from_secs(30)).await; - + loop { match self.perform_cleanup().await { Ok(_) => { @@ -590,9 +742,9 @@ impl DatabaseCleanup { self.health.increment_failures(); } } - + info!("โฐ Next cleanup in {} hours", interval_hours); sleep(interval).await; } } -} \ No newline at end of file +} diff --git a/src/health.rs b/src/health.rs index 832065c..04e9c4e 100644 --- a/src/health.rs +++ b/src/health.rs @@ -107,7 +107,7 @@ impl HealthState { .unwrap_or_default() .as_secs(); - let start_time = self.start_time.load(Ordering::Relaxed); + let _start_time = self.start_time.load(Ordering::Relaxed); let last_price = self.last_price_update.load(Ordering::Relaxed); let last_db = self.last_db_write.load(Ordering::Relaxed); let last_discord = self.last_discord_update.load(Ordering::Relaxed); @@ -116,9 +116,6 @@ impl HealthState { let gateway_failures = self.gateway_failures.load(Ordering::Relaxed); let discord_test_failures = self.discord_test_failures.load(Ordering::Relaxed); - // For newly started bots, use start_time as baseline (value of 0 means never updated) - let _effective_start = if start_time > 0 { start_time } else { now }; - // Consider unhealthy if: // - No price update in last 5 minutes // - No database write in last 5 minutes diff --git a/src/main.rs b/src/main.rs index 455f534..0d76d5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,8 +52,9 @@ async fn main() -> BotResult<()> { // Start Database Cleanup Service info!("๐Ÿงน Starting Database Cleanup Service..."); { + let db_clone = db.clone(); tokio::spawn(async move { - let cleanup = DatabaseCleanup::new(); + let cleanup = DatabaseCleanup::new(db_clone); if let Err(e) = cleanup.run().await { error!("Cleanup service crashed: {}", e); }