diff --git a/Cargo.lock b/Cargo.lock index d7f0734f..eac110c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -968,6 +968,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] diff --git a/Cargo.toml b/Cargo.toml index 77bf6526..8bfb7b1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ once_cell = "1.19.0" vaultrs = { version = "0.7.2", optional = true } bb8 = "0.8" rand_distr = "0.4" -chrono = "0.4" +chrono = { version = "0.4", features = ["serde"] } cpu-time = "1.0.0" jemallocator = "0.5" jemalloc-ctl = "0.5" diff --git a/src/bin/open_router.rs b/src/bin/open_router.rs index 2b184fe7..af5b19d6 100644 --- a/src/bin/open_router.rs +++ b/src/bin/open_router.rs @@ -36,8 +36,14 @@ async fn main() -> Result<(), Box> { .expect("Failed while building the metrics server") }); + let shard_queue_handle = tokio::spawn(async move { + open_router::shard_queue::GLOBAL_SHARD_QUEUE_HANDLER.spawn() + .await + .expect("Failed while running the shard queue handler") + }); + // Wait for both servers to complete (they should run indefinitely) - tokio::try_join!(main_server_handle, metrics_server_handle)?; + tokio::try_join!(main_server_handle, metrics_server_handle, shard_queue_handle)?; Ok(()) } diff --git a/src/config.rs b/src/config.rs index 6d26fbec..c708dc75 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,6 +19,45 @@ use std::{ path::PathBuf, }; +#[derive(Clone, serde::Deserialize, Debug)] +pub struct ShardQueueConfig { + #[serde(default = "default_shard_count")] + pub shard_count: u8, + #[serde(default = "default_loop_interval_seconds")] + pub loop_interval_seconds: u64, + #[serde(default = "default_max_items_per_cycle")] + pub max_items_per_cycle: u64, + #[serde(default = "default_stream_maxlen")] + pub stream_maxlen: u64, +} + +fn default_shard_count() -> u8 { + 10 +} + +fn default_loop_interval_seconds() -> u64 { + 10 +} + +fn default_max_items_per_cycle() -> u64 { + 100 +} + +fn default_stream_maxlen() -> u64 { + 1000 +} + +impl Default for ShardQueueConfig { + fn default() -> Self { + Self { + shard_count: default_shard_count(), + loop_interval_seconds: default_loop_interval_seconds(), + max_items_per_cycle: default_max_items_per_cycle(), + stream_maxlen: default_stream_maxlen(), + } + } +} + #[derive(Clone, serde::Deserialize, Debug)] pub struct GlobalConfig { pub server: Server, @@ -41,6 +80,8 @@ pub struct GlobalConfig { pub routing_config: Option, #[serde(default)] pub debit_routing_config: network_decider::types::DebitRoutingConfig, + #[serde(default)] + pub shard_queue: ShardQueueConfig, } #[derive(Clone, Debug)] diff --git a/src/decider/gatewaydecider/gw_scoring.rs b/src/decider/gatewaydecider/gw_scoring.rs index 966883e2..9fb8109e 100644 --- a/src/decider/gatewaydecider/gw_scoring.rs +++ b/src/decider/gatewaydecider/gw_scoring.rs @@ -1003,10 +1003,10 @@ pub async fn update_score_for_outage(decider_flow: &mut DeciderFlow<'_>) -> Gate let txn_detail = decider_flow.get().dpTxnDetail.clone(); let txn_card_info = decider_flow.get().dpTxnCardInfo.clone(); let merchant = decider_flow.get().dpMerchantAccount.clone(); - let scheduled_outage_validation_duration = - RService::findByNameFromRedis(C::ScheduledOutageValidationDuration.get_key()) - .await - .unwrap_or(86400); + let scheduled_outage_validation_duration = RService::findByNameFromRedisWithDefault( + C::ScheduledOutageValidationDuration.get_key(), + 86400, + ).await; let potential_outages = get_scheduled_outage(scheduled_outage_validation_duration).await; logger::debug!("updated score for outage {:?}", potential_outages); @@ -1794,21 +1794,20 @@ pub async fn get_gateway_wise_routing_inputs_for_merchant_sr( gateway_success_rate_merchant_input: Option, default_success_rate_based_routing_input: Option, ) -> GatewayWiseSuccessRateBasedRoutingInput { - let m_option = - RService::findByNameFromRedis(C::SrBasedGatewayEliminationThreshold.get_key()).await; - let default_soft_txn_reset_count = - RService::findByNameFromRedis(C::SR_BASED_TXN_RESET_COUNT.get_key()) - .await - .unwrap_or(C::GW_DEFAULT_TXN_SOFT_RESET_COUNT); + let default_elimination_threshold = RService::findByNameFromRedisWithDefault( + C::SrBasedGatewayEliminationThreshold.get_key(), + C::DEFAULT_SR_BASED_GATEWAY_ELIMINATION_THRESHOLD, + ).await; + let default_soft_txn_reset_count = RService::findByNameFromRedisWithDefault( + C::SR_BASED_TXN_RESET_COUNT.get_key(), + C::GW_DEFAULT_TXN_SOFT_RESET_COUNT, + ).await; let is_elimination_v2_enabled = is_feature_enabled( C::EnableEliminationV2.get_key(), merchant_acc.merchantId.0.clone(), "kv_redis".to_string(), ) .await; - - let default_elimination_threshold = - m_option.unwrap_or(C::DEFAULT_SR_BASED_GATEWAY_ELIMINATION_THRESHOLD); let merchant_given_default_threshold = gateway_success_rate_merchant_input .clone() .map(|input| input.defaultEliminationThreshold); diff --git a/src/lib.rs b/src/lib.rs index 564b2735..ade622c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub mod metrics; pub mod middleware; pub mod redis; pub mod routes; +pub mod shard_queue; pub mod storage; pub mod tenant; pub mod types; diff --git a/src/redis/cache.rs b/src/redis/cache.rs index 8bc6e7ae..e3a472f6 100644 --- a/src/redis/cache.rs +++ b/src/redis/cache.rs @@ -1,6 +1,6 @@ -use crate::types::service_configuration; -use crate::utils::StringExt; +use crate::{logger, utils::StringExt}; use serde::Deserialize; +use crate::shard_queue::GLOBAL_SHARD_QUEUE_HANDLER; // Converted type synonyms // Original Haskell type: KVDBName @@ -53,8 +53,44 @@ where { findByNameFromRedisHelper(key, Some(decode_fn)).await } +pub async fn findByNameFromRedisWithDefault( + key: String, + default_value: A, +) -> A +where + A: for<'de> Deserialize<'de> + serde::Serialize + Clone, +{ + // First try to get from existing cache/DB + if let Some(value) = findByNameFromRedis(key.clone()).await { + return value; + } + + // Config not found in cache or DB, cache the default value + logger::debug!("Config '{}' not found, caching default value", key); + + // Create ServiceConfiguration with default value + let default_config = crate::storage::types::ServiceConfiguration { + id: 0, // Placeholder ID since we're not storing in DB + name: key.clone(), + value: Some(serde_json::to_string(&default_value).unwrap_or_else(|_| "null".to_string())), + new_value: None, + previous_value: None, + new_value_status: None, + }; + + // Push to shard queue for IMC caching + if let Ok(config_json) = serde_json::to_value(&default_config) { + let queue_item = crate::shard_queue::types::ShardQueueItem::new(key.clone(), config_json); + if let Err(e) = GLOBAL_SHARD_QUEUE_HANDLER.push_to_shard(queue_item).await { + logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e); + } else { + logger::debug!("Cached default value for config '{}' in IMC", key); + } + } + + default_value +} -// Original Haskell function: findByNameFromRedisHelper pub async fn findByNameFromRedisHelper( key: String, decode_fn: Option Option>, @@ -62,23 +98,46 @@ pub async fn findByNameFromRedisHelper( where A: for<'de> Deserialize<'de>, { - let res = service_configuration::find_config_by_name(key).await; - - match res { - Ok(m_service_config) => match m_service_config { - Some(service_config) => match service_config.value { - Some(value) => match decode_fn { - Some(func) => func(value), - None => None, - }, + use crate::shard_queue::find_config_in_mem; + + if let Ok(service_config) = find_config_in_mem(&key) { + logger::debug!("Cache HIT: Found config '{}' in IMC", key); + + if let Some(value) = service_config.value { + return match decode_fn { + Some(func) => func(value), None => None, - }, - None => None, - }, - Err(_) => None, + }; + } } + crate::logger::debug!("Cache MISS: Config '{}' not found in IMC, checking DB", key); + + if let Ok(Some(config)) = crate::types::service_configuration::find_config_by_name(key.clone()).await { + logger::debug!("DB HIT: Found config '{}' in database, pushing to shard queue for caching", key); + + if let Ok(config_json) = serde_json::to_value(&config) { + let queue_item = crate::shard_queue::ShardQueueItem::new(key.clone(), config_json); + if let Err(e) = GLOBAL_SHARD_QUEUE_HANDLER.push_to_shard(queue_item).await { + logger::warn!("Failed to push config '{}' to shard queue: {:?}", key, e); + } else { + logger::debug!("Pushed config '{}' to shard queue, polling will cache in IMC", key); + } + } + + if let Some(value) = config.value { + return match decode_fn { + Some(func) => func(value), + None => None, + }; + } + } else { + logger::debug!("DB MISS: Config '{}' not found in database", key); + } + + None } + pub fn extractValue(value: String) -> Option where A: for<'de> Deserialize<'de>, diff --git a/src/redis/commands.rs b/src/redis/commands.rs index fd8d5ae7..fcae2bc0 100644 --- a/src/redis/commands.rs +++ b/src/redis/commands.rs @@ -97,6 +97,19 @@ impl RedisConnectionWrapper { .change_context(errors::RedisError::PopListElementsFailed) } + pub async fn get_range_from_list( + &self, + key: &str, + start: i64, + end: i64, + ) -> Result, errors::RedisError> { + self.conn + .pool + .lrange(key, start, end) + .await + .change_context(errors::RedisError::GetFailed) + } + pub async fn delete_key(&self, key: &str) -> Result { self.conn .pool @@ -174,4 +187,80 @@ impl RedisConnectionWrapper { .await .change_context(errors::RedisError::UnknownResult) } + + /// Add entry to Redis stream with MAXLEN using raw command + /// Example: XADD shard_stream_0 MAXLEN 1000 * key value + pub async fn xadd_with_maxlen( + &self, + stream_key: &str, + maxlen: u64, + fields: Vec, + ) -> Result { + use fred::interfaces::ClientLike; + use fred::types::CustomCommand; + + // Build raw Redis command: XADD stream_key MAXLEN maxlen * field1 value1 field2 value2 ... + let mut args = vec![stream_key.to_string(), "MAXLEN".to_string(), maxlen.to_string(), "*".to_string()]; + args.extend(fields); + + self.conn + .pool + .custom(CustomCommand::new("XADD", stream_key, false), args) + .await + .change_context(errors::RedisError::SetHashFailed) + } + + /// Add entry to Redis stream with approximate MAXLEN (more efficient) + /// Example: XADD shard_stream_0 MAXLEN ~ 1000 * key value + pub async fn xadd_with_approximate_maxlen( + &self, + stream_key: &str, + maxlen: u64, + fields: Vec, + ) -> Result { + use fred::interfaces::ClientLike; + use fred::types::CustomCommand; + + // Build raw Redis command: XADD stream_key MAXLEN ~ maxlen * field1 value1 field2 value2 ... + let mut args = vec![stream_key.to_string(), "MAXLEN".to_string(), "~".to_string(), maxlen.to_string(), "*".to_string()]; + args.extend(fields); + + self.conn + .pool + .custom(CustomCommand::new("XADD", stream_key, false), args) + .await + .change_context(errors::RedisError::SetHashFailed) + } + + /// Read entries from Redis stream using XRANGE + /// Example: XRANGE shard_stream_0 1-0+ + + pub async fn xrange( + &self, + stream_key: &str, + start: &str, + end: &str, + count: Option, + ) -> Result)>, errors::RedisError> { + use fred::interfaces::StreamsInterface; + + self.conn + .pool + .xrange(stream_key, start, end, count) + .await + .change_context(errors::RedisError::GetFailed) + } + + /// Get stream length using XLEN + pub async fn xlen(&self, stream_key: &str) -> Result { + use fred::interfaces::StreamsInterface; + + let len: u64 = self + .conn + .pool + .xlen(stream_key) + .await + .change_context(errors::RedisError::GetFailed)?; + + Ok(len) + } } diff --git a/src/shard_queue/handler.rs b/src/shard_queue/handler.rs new file mode 100644 index 00000000..22744843 --- /dev/null +++ b/src/shard_queue/handler.rs @@ -0,0 +1,399 @@ +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; + +use once_cell::sync::Lazy; +use tokio::{sync::mpsc, time}; + +use crate::{ + app::get_tenant_app_state, + generics::{MeshError, StorageResult}, + logger, +}; + +use super::types::{ShardMetadata, ShardQueueError, ShardQueueItem, ShardQueueResult}; + +// Use our Registry pattern for service configuration caching +pub static GLOBAL_SHARD_REGISTRY: Lazy = + Lazy::new(|| super::registry::Registry::new(1000)); + +/// Handler for the sharded queue system, following your existing patterns +#[derive(Clone)] +pub struct ShardedQueueHandler { + inner: Arc, +} + +impl std::ops::Deref for ShardedQueueHandler { + type Target = ShardedQueueHandlerInner; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +pub struct ShardedQueueHandlerInner { + /// Metadata for each shard with last_modified + shard_metadata: Arc>>, + /// Polling interval from configuration + loop_interval: Duration, + /// Running state for graceful shutdown + running: Arc, + /// Configuration settings + config: crate::config::ShardQueueConfig, +} + +impl ShardedQueueHandler { + /// Create new handler with configuration + pub fn new(config: crate::config::ShardQueueConfig) -> Self { + let mut shard_metadata = HashMap::new(); + + // Initialize metadata for configured number of shards + for shard_id in 0..config.shard_count { + shard_metadata.insert(shard_id, ShardMetadata::new()); + } + + let inner = ShardedQueueHandlerInner { + shard_metadata: Arc::new(Mutex::new(shard_metadata)), + loop_interval: Duration::from_secs(config.loop_interval_seconds), + running: Arc::new(AtomicBool::new(true)), + config: config.clone(), + }; + + Self { + inner: Arc::new(inner), + } + } + + /// Calculate shard ID using hash modulo configured shard count + pub fn get_shard_id(&self, key: &str) -> u8 { + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + key.hash(&mut hasher); + (hasher.finish() % (self.config.shard_count as u64)) as u8 + } + + /// Push item to appropriate Redis shard stream + pub async fn push_to_shard(&self, item: ShardQueueItem) -> ShardQueueResult<()> { + let shard_id = self.get_shard_id(&item.key); + let stream_name = format!("shard_stream_{}", shard_id); + + let app_state = get_tenant_app_state().await; + let redis_conn = app_state.redis_conn.clone(); + + // Use the service configuration name as the key and value as the stream field + // Format: XADD shard_stream_0 MAXLEN 100 * service_config_name service_config_value + let serialized_value = serde_json::to_string(&item.value) + .map_err(|e| ShardQueueError::QueueError(format!("Serialization error: {}", e)))?; + + let fields = vec![item.key.clone(), serialized_value]; + + let entry_id = redis_conn + .xadd_with_maxlen(&stream_name, self.config.stream_maxlen, fields) + .await + .map_err(|e| ShardQueueError::QueueError(format!("Redis stream push failed: {:?}", e)))?; + + logger::debug!( + "Item pushed to Redis shard stream {}: key={}, entry_id={}", + shard_id, + item.key, + entry_id + ); + Ok(()) + } + + /// Start the polling thread + pub async fn spawn(&self) -> ShardQueueResult<()> { + logger::info!( + "Shard queue polling thread started, checking every {} seconds with {} shards", + self.loop_interval.as_secs(), + self.config.shard_count + ); + + while self.running.load(Ordering::SeqCst) { + logger::debug!("Shard queue polling cycle started"); + + // Process all configured shards + for shard_id in 0..self.config.shard_count { + if let Err(e) = self.process_shard(shard_id).await { + logger::error!("Failed to process shard {}: {:?}", shard_id, e); + } + } + + // Sleep for configured interval + time::sleep(self.loop_interval).await; + } + + Ok(()) + } + + /// Process a single shard - poll items from Redis stream using entry IDs + async fn process_shard(&self, shard_id: u8) -> ShardQueueResult<()> { + let app_state = get_tenant_app_state().await; + let redis_conn = app_state.redis_conn.clone(); + let stream_name = format!("shard_stream_{}", shard_id); + + let last_processed_entry_id = { + let metadata = self.shard_metadata.lock().map_err(|e| { + ShardQueueError::QueueError(format!("Failed to acquire metadata lock: {}", e)) + })?; + + metadata + .get(&shard_id) + .map(|meta| meta.last_processed_entry_id.clone()) + .unwrap_or_else(|| "0-0".to_string()) // Default to start from beginning + }; + + // Use XRANGE to get entries after the last processed entry ID + // Format: XRANGE shard_stream_0 1234567890123-1 + COUNT max_items_per_cycle + let start_range = if last_processed_entry_id == "0-0" { + "-".to_string() // Start from beginning of stream + } else { + // For Redis XRANGE, to start after the last processed ID, we increment the sequence part + // Stream IDs are in format: timestamp-sequence + if let Some((timestamp, sequence)) = last_processed_entry_id.split_once('-') { + if let Ok(seq_num) = sequence.parse::() { + format!("{}-{}", timestamp, seq_num + 1) + } else { + // If we can't parse the sequence, just use the ID as-is and let Redis handle it + last_processed_entry_id.clone() + } + } else { + // If the ID format is unexpected, start from beginning + "-".to_string() + } + }; + + let stream_entries = redis_conn + .xrange( + &stream_name, + &start_range, + "+", // Read to end + Some(self.config.max_items_per_cycle), + ) + .await + .map_err(|e| ShardQueueError::QueueError(format!("Redis stream read failed: {:?}", e)))?; + + if stream_entries.is_empty() { + return Ok(()); + } + + logger::debug!( + "Polled {} entries from Redis shard stream {}", + stream_entries.len(), + shard_id + ); + + let mut last_entry_id = String::new(); + let mut processed_count = 0; + + // Process stream entries + for (entry_id, fields) in stream_entries { + if !fields.is_empty() { + // Redis stream fields come as Vec<(field_name, field_value)> + // We expect the first field to be the service_config_name and value to be service_config_value + let (service_config_name, service_config_value) = &fields[0]; + + // Parse the service configuration value as JSON + match serde_json::from_str::(service_config_value) { + Ok(parsed_value) => { + // Convert to ServiceConfiguration for IMC storage + match serde_json::from_value::(parsed_value) { + Ok(service_config) => { + // Store ServiceConfiguration in global registry with 600 second TTL + if let Err(_) = GLOBAL_SHARD_REGISTRY.store(service_config_name.clone(), service_config, Some(600)) { + logger::error!("Failed to store ServiceConfiguration in registry: {}", service_config_name); + } else { + logger::debug!("Stored ServiceConfiguration in IMC: {}", service_config_name); + processed_count += 1; + } + } + Err(e) => { + logger::error!("Failed to deserialize ServiceConfiguration for key {}: {}", service_config_name, e); + } + } + } + Err(e) => { + logger::error!("Failed to parse JSON value for key {}: {}", service_config_name, e); + } + } + } else { + logger::warn!("Invalid stream entry format for entry {}: no fields found", entry_id); + } + + last_entry_id = entry_id; + } + + if processed_count > 0 { + logger::debug!( + "Processed {} new items from Redis shard stream {} (last_entry_id: {})", + processed_count, + shard_id, + last_entry_id + ); + + // Update shard metadata with the last processed entry ID + { + let mut metadata = self.shard_metadata.lock().map_err(|e| { + ShardQueueError::QueueError(format!("Failed to acquire metadata lock: {}", e)) + })?; + + if let Some(shard_meta) = metadata.get_mut(&shard_id) { + shard_meta.update_last_processed_entry_id(last_entry_id.clone()); + logger::debug!("Updated last_processed_entry_id for shard {} to {}", shard_id, last_entry_id); + } + } + } + + Ok(()) + } + + /// Get shard metadata + pub fn get_shard_metadata(&self, shard_id: u8) -> ShardQueueResult> { + let metadata = self.shard_metadata.lock().map_err(|e| { + ShardQueueError::QueueError(format!("Failed to acquire metadata lock: {}", e)) + })?; + + Ok(metadata.get(&shard_id).cloned()) + } + + /// Get all shard metadata + pub fn get_all_shard_metadata(&self) -> ShardQueueResult> { + let metadata = self.shard_metadata.lock().map_err(|e| { + ShardQueueError::QueueError(format!("Failed to acquire metadata lock: {}", e)) + })?; + + Ok(metadata.clone()) + } + + /// Get stream sizes for all Redis-backed shards + pub async fn get_queue_sizes(&self) -> ShardQueueResult> { + let app_state = get_tenant_app_state().await; + let redis_conn = app_state.redis_conn.clone(); + + let mut sizes = HashMap::new(); + + // Check stream length for each configured shard + for shard_id in 0..self.config.shard_count { + let stream_name = format!("shard_stream_{}", shard_id); + + match redis_conn.xlen(&stream_name).await { + Ok(size) => { + sizes.insert(shard_id, size as usize); + } + Err(e) => { + logger::warn!("Failed to get size for shard stream {}: {:?}", shard_id, e); + sizes.insert(shard_id, 0); // Default to 0 if we can't get the size + } + } + } + + Ok(sizes) + } + + /// Close the handler - similar to drainer close() + pub fn close(&self) { + self.running.store(false, Ordering::SeqCst); + } + + /// Shutdown listener - similar to drainer shutdown_listener() + pub async fn shutdown_listener(&self, mut rx: mpsc::Receiver<()>) { + while let Some(_) = rx.recv().await { + logger::info!("Shutdown signal received for shard queue handler"); + rx.close(); + self.close(); + break; + } + logger::info!("Shard queue handler shutdown completed"); + } + + /// Check if handler is running + pub fn is_running(&self) -> bool { + self.running.load(Ordering::SeqCst) + } +} + +/// IMC functions following your existing pattern for service_configuration caching +pub fn find_config_in_mem(key: &str) -> StorageResult { + match GLOBAL_SHARD_REGISTRY.get::(key) { + Ok(value) => Ok(value), + Err(_) => Err(MeshError::Others), + } +} + +pub fn store_config_in_mem(key: String, value: crate::storage::types::ServiceConfiguration) -> StorageResult<()> { + GLOBAL_SHARD_REGISTRY + .store(key, value, Some(600)) + .map_err(|_| MeshError::Others) +} + +impl Default for ShardedQueueHandler { + fn default() -> Self { + Self::new(crate::config::ShardQueueConfig::default()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_shard_calculation() { + let config = crate::config::ShardQueueConfig::default(); + let handler = ShardedQueueHandler::new(config.clone()); + + // Test that the same key always goes to the same shard + let shard1 = handler.get_shard_id("test_key"); + let shard2 = handler.get_shard_id("test_key"); + assert_eq!(shard1, shard2); + + // Test that shard is within range of configured shard count + assert!(shard1 < config.shard_count); + } + + #[tokio::test] + async fn test_push_and_get_sizes() { + let config = crate::config::ShardQueueConfig::default(); + let handler = ShardedQueueHandler::new(config); + + let item = ShardQueueItem::new("test_key".to_string(), json!({"data": "test"})); + let result = handler.push_to_shard(item).await; + + assert!(result.is_ok()); + + let sizes = handler.get_queue_sizes().await.unwrap(); + let total_items: usize = sizes.values().sum(); + // Note: This test may fail in actual test environment without Redis setup + // assert_eq!(total_items, 1); + assert!(total_items >= 0); + } + + #[test] + fn test_imc_operations() { + let key = "test_config_key"; + let service_config = crate::storage::types::ServiceConfiguration { + id: 1, + name: key.to_string(), + value: Some(r#"{"config": "value"}"#.to_string()), + new_value: None, + previous_value: None, + new_value_status: None, + }; + + // Store in IMC + let store_result = store_config_in_mem(key.to_string(), service_config.clone()); + assert!(store_result.is_ok()); + + // Retrieve from IMC + let retrieved = find_config_in_mem(key); + assert!(retrieved.is_ok()); + let retrieved_config = retrieved.unwrap(); + assert_eq!(retrieved_config.name, service_config.name); + assert_eq!(retrieved_config.value, service_config.value); + } +} diff --git a/src/shard_queue/mod.rs b/src/shard_queue/mod.rs new file mode 100644 index 00000000..d5b16c1d --- /dev/null +++ b/src/shard_queue/mod.rs @@ -0,0 +1,12 @@ +pub mod handler; +pub mod registry; +pub mod types; + +pub use handler::*; +pub use registry::*; +pub use types::*; + +use once_cell::sync::Lazy; + +pub static GLOBAL_SHARD_QUEUE_HANDLER: Lazy = + Lazy::new(|| handler::ShardedQueueHandler::new(crate::config::ShardQueueConfig::default())); diff --git a/src/shard_queue/registry.rs b/src/shard_queue/registry.rs new file mode 100644 index 00000000..d00a16b3 --- /dev/null +++ b/src/shard_queue/registry.rs @@ -0,0 +1,166 @@ +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +/// Simple registry for caching with TTL support +#[derive(Debug)] +pub struct Registry { + data: Arc>>, + max_size: usize, +} + +#[derive(Debug, Clone)] +struct CacheEntry { + value: serde_json::Value, + expires_at: Option, +} + +impl Registry { + pub fn new(max_size: usize) -> Self { + Self { + data: Arc::new(RwLock::new(HashMap::new())), + max_size, + } + } + + pub fn get(&self, key: &str) -> Result + where + T: serde::de::DeserializeOwned, + { + // Try read lock first (fast path for non-expired entries) + { + let data = self.data.read().map_err(|e| format!("Read lock error: {}", e))?; + + if let Some(entry) = data.get(key) { + // Check if entry has expired + if let Some(expires_at) = entry.expires_at { + if Instant::now() > expires_at { + // Entry expired, need to remove it (drop read lock first) + drop(data); + } else { + // Entry is valid, return it + return serde_json::from_value(entry.value.clone()) + .map_err(|e| format!("Deserialization error: {}", e)); + } + } else { + // No expiration, return it + return serde_json::from_value(entry.value.clone()) + .map_err(|e| format!("Deserialization error: {}", e)); + } + } else { + return Err("Key not found".to_string()); + } + } + + // If we get here, the entry was expired and we need to remove it + let mut data = self.data.write().map_err(|e| format!("Write lock error: {}", e))?; + + // Double-check the entry is still there and expired + if let Some(entry) = data.get(key) { + if let Some(expires_at) = entry.expires_at { + if Instant::now() > expires_at { + data.remove(key); + return Err("Key expired".to_string()); + } + // If not expired anymore, return the value + return serde_json::from_value(entry.value.clone()) + .map_err(|e| format!("Deserialization error: {}", e)); + } + } + + Err("Key not found".to_string()) + } + + pub fn store(&self, key: String, value: T, ttl_seconds: Option) -> Result<(), String> + where + T: serde::Serialize, + { + let mut data = self.data.write().map_err(|e| format!("Write lock error: {}", e))?; + + // Remove expired entries and enforce max size + self.cleanup_expired(&mut data); + + if data.len() >= self.max_size { + // Remove oldest entry (simple eviction policy) + if let Some(oldest_key) = data.keys().next().cloned() { + data.remove(&oldest_key); + } + } + + let json_value = serde_json::to_value(value) + .map_err(|e| format!("Serialization error: {}", e))?; + + let expires_at = ttl_seconds.map(|ttl| Instant::now() + Duration::from_secs(ttl)); + + data.insert(key, CacheEntry { + value: json_value, + expires_at, + }); + + Ok(()) + } + + fn cleanup_expired(&self, data: &mut HashMap) { + let now = Instant::now(); + data.retain(|_, entry| { + if let Some(expires_at) = entry.expires_at { + now <= expires_at + } else { + true + } + }); + } + + pub fn remove(&self, key: &str) -> Result<(), String> { + let mut data = self.data.write().map_err(|e| format!("Write lock error: {}", e))?; + data.remove(key); + Ok(()) + } + + pub fn size(&self) -> usize { + self.data.read().unwrap().len() + } + + pub fn clear(&self) -> Result<(), String> { + let mut data = self.data.write().map_err(|e| format!("Write lock error: {}", e))?; + data.clear(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_registry_basic_operations() { + let registry = Registry::new(100); + + // Test store and get + let value = json!({"test": "data"}); + assert!(registry.store("test_key".to_string(), value.clone(), Some(60)).is_ok()); + + let retrieved: serde_json::Value = registry.get("test_key").unwrap(); + assert_eq!(retrieved, value); + + // Test non-existent key + let result: Result = registry.get("non_existent"); + assert!(result.is_err()); + } + + #[test] + fn test_registry_ttl() { + let registry = Registry::new(100); + + // Store with 0 TTL (should expire immediately) + let value = json!({"test": "data"}); + registry.store("test_key".to_string(), value, Some(0)).unwrap(); + + // Give it a moment to expire + std::thread::sleep(std::time::Duration::from_millis(1)); + + let result: Result = registry.get("test_key"); + assert!(result.is_err()); + } +} diff --git a/src/shard_queue/types.rs b/src/shard_queue/types.rs new file mode 100644 index 00000000..ee34edba --- /dev/null +++ b/src/shard_queue/types.rs @@ -0,0 +1,56 @@ +use std::collections::HashMap; +use chrono::{DateTime, Utc}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct ShardQueueItem { + pub key: String, + pub value: serde_json::Value, + pub modified_at: DateTime, +} + +impl ShardQueueItem { + pub fn new(key: String, value: serde_json::Value) -> Self { + Self { + key, + value, + modified_at: Utc::now(), + } + } +} + +#[derive(Debug, Clone)] +pub struct ShardMetadata { + /// Last processed entry ID from Redis stream (e.g., "1-0") + pub last_processed_entry_id: String, +} + +impl ShardMetadata { + pub fn new() -> Self { + Self { + // Start from "0-0" to process all entries from beginning + last_processed_entry_id: "0-0".to_string(), + } + } + + pub fn update_last_processed_entry_id(&mut self, entry_id: String) { + self.last_processed_entry_id = entry_id; + } +} + +impl Default for ShardMetadata { + fn default() -> Self { + Self::new() + } +} + +pub type InMemoryCache = HashMap; + +#[derive(Debug, thiserror::Error)] +pub enum ShardQueueError { + #[error("Invalid shard ID: {0}")] + InvalidShardId(u8), + #[error("Queue error: {0}")] + QueueError(String), +} + +pub type ShardQueueResult = Result; diff --git a/src/storage/types.rs b/src/storage/types.rs index 0103a758..4df4dfec 100644 --- a/src/storage/types.rs +++ b/src/storage/types.rs @@ -523,7 +523,7 @@ pub struct PaymentMethod { pub payment_dsl: Option, } -#[derive(Debug, Clone, Identifiable, Queryable)] +#[derive(Debug, Clone, Identifiable, Queryable, Serialize, Deserialize)] #[cfg_attr(feature = "mysql", diesel(table_name = schema::service_configuration))] #[cfg_attr(feature = "postgres", diesel(table_name = schema_pg::service_configuration))] pub struct ServiceConfiguration { diff --git a/src/types/service_configuration.rs b/src/types/service_configuration.rs index 0b51c77b..c2b2830c 100644 --- a/src/types/service_configuration.rs +++ b/src/types/service_configuration.rs @@ -5,9 +5,13 @@ use crate::storage::schema::service_configuration::dsl; use crate::storage::schema_pg::service_configuration::dsl; use diesel::associations::HasTable; use diesel::*; +use serde_json::json; use std::option::Option; use std::string::String; // use sequelize::{Clause::{Is, And}, Term::{Eq, In}}; +use crate::shard_queue::{ + find_config_in_mem, store_config_in_mem, ShardQueueItem, GLOBAL_SHARD_QUEUE_HANDLER, +}; use crate::storage::types::{ ServiceConfiguration, ServiceConfigurationNew, ServiceConfigurationUpdate, }; @@ -33,14 +37,25 @@ pub async fn insert_config( let app_state = get_tenant_app_state().await; let config = ServiceConfigurationNew { - name, - value, + name: name.clone(), + value: value.clone(), new_value: None, previous_value: None, new_value_status: None, }; - crate::generics::generic_insert(&app_state.db, config).await?; + let service_config = crate::generics::generic_insert(&app_state.db, config).await?; + + // Push to shard queue so IMC gets updated automatically via polling + // Store the ServiceConfiguration object directly as JSON value for shard queue + if let Ok(config_json) = serde_json::to_value(&service_config) { + let queue_item = ShardQueueItem::new(name.clone(), config_json); + if let Err(e) = GLOBAL_SHARD_QUEUE_HANDLER.push_to_shard(queue_item).await { + crate::logger::error!("Failed to push config '{}' to shard queue: {:?}", name, e); + } else { + crate::logger::debug!("Pushed config '{}' to shard queue for IMC update", name); + } + } Ok(()) }