Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ once_cell = "1.19.0"
vaultrs = { version = "0.7.2", optional = true }
bb8 = "0.8"
rand_distr = "0.4"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
cpu-time = "1.0.0"
jemallocator = "0.5"
jemalloc-ctl = "0.5"
Expand Down
8 changes: 7 additions & 1 deletion src/bin/open_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.expect("Failed while building the metrics server")
});

let shard_queue_handle = tokio::spawn(async move {
open_router::shard_queue::GLOBAL_SHARD_QUEUE_HANDLER.spawn()
.await
.expect("Failed while running the shard queue handler")
});

// Wait for both servers to complete (they should run indefinitely)
tokio::try_join!(main_server_handle, metrics_server_handle)?;
tokio::try_join!(main_server_handle, metrics_server_handle, shard_queue_handle)?;

Ok(())
}
41 changes: 41 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,45 @@ use std::{
path::PathBuf,
};

/// Configuration for the shard queue that propagates service-configuration
/// entries into the in-memory cache (IMC).
///
/// Every field has a serde-level default (the `default_*` helpers below),
/// and `Default` for the whole struct mirrors those same values, so a
/// missing `[shard_queue]` section and an empty one behave identically.
#[derive(Clone, serde::Deserialize, Debug)]
pub struct ShardQueueConfig {
    /// Number of shards the queue is split across — presumably one Redis
    /// stream per shard (see the `shard_stream_*` examples in
    /// `src/redis/commands.rs`); confirm against the `shard_queue` module.
    #[serde(default = "default_shard_count")]
    pub shard_count: u8,
    /// Interval, in seconds, between polling cycles of the queue handler.
    #[serde(default = "default_loop_interval_seconds")]
    pub loop_interval_seconds: u64,
    /// Upper bound on the number of queue items processed in one cycle.
    #[serde(default = "default_max_items_per_cycle")]
    pub max_items_per_cycle: u64,
    /// MAXLEN used when trimming the backing Redis stream(s) on XADD.
    #[serde(default = "default_stream_maxlen")]
    pub stream_maxlen: u64,
}

/// Default number of queue shards when `shard_count` is absent.
fn default_shard_count() -> u8 {
    10
}

/// Default polling interval, in seconds, when `loop_interval_seconds`
/// is absent.
fn default_loop_interval_seconds() -> u64 {
    10
}

/// Default per-cycle processing cap when `max_items_per_cycle` is absent.
fn default_max_items_per_cycle() -> u64 {
    100
}

/// Default Redis stream MAXLEN when `stream_maxlen` is absent.
fn default_stream_maxlen() -> u64 {
    1000
}

impl Default for ShardQueueConfig {
fn default() -> Self {
Self {
shard_count: default_shard_count(),
loop_interval_seconds: default_loop_interval_seconds(),
max_items_per_cycle: default_max_items_per_cycle(),
stream_maxlen: default_stream_maxlen(),
}
}
}

#[derive(Clone, serde::Deserialize, Debug)]
pub struct GlobalConfig {
pub server: Server,
Expand All @@ -41,6 +80,8 @@ pub struct GlobalConfig {
pub routing_config: Option<TomlConfig>,
#[serde(default)]
pub debit_routing_config: network_decider::types::DebitRoutingConfig,
#[serde(default)]
pub shard_queue: ShardQueueConfig,
}

#[derive(Clone, Debug)]
Expand Down
25 changes: 12 additions & 13 deletions src/decider/gatewaydecider/gw_scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,10 +1003,10 @@ pub async fn update_score_for_outage(decider_flow: &mut DeciderFlow<'_>) -> Gate
let txn_detail = decider_flow.get().dpTxnDetail.clone();
let txn_card_info = decider_flow.get().dpTxnCardInfo.clone();
let merchant = decider_flow.get().dpMerchantAccount.clone();
let scheduled_outage_validation_duration =
RService::findByNameFromRedis(C::ScheduledOutageValidationDuration.get_key())
.await
.unwrap_or(86400);
let scheduled_outage_validation_duration = RService::findByNameFromRedisWithDefault(
C::ScheduledOutageValidationDuration.get_key(),
86400,
).await;

let potential_outages = get_scheduled_outage(scheduled_outage_validation_duration).await;
logger::debug!("updated score for outage {:?}", potential_outages);
Expand Down Expand Up @@ -1794,21 +1794,20 @@ pub async fn get_gateway_wise_routing_inputs_for_merchant_sr(
gateway_success_rate_merchant_input: Option<GatewaySuccessRateBasedRoutingInput>,
default_success_rate_based_routing_input: Option<GatewaySuccessRateBasedRoutingInput>,
) -> GatewayWiseSuccessRateBasedRoutingInput {
let m_option =
RService::findByNameFromRedis(C::SrBasedGatewayEliminationThreshold.get_key()).await;
let default_soft_txn_reset_count =
RService::findByNameFromRedis(C::SR_BASED_TXN_RESET_COUNT.get_key())
.await
.unwrap_or(C::GW_DEFAULT_TXN_SOFT_RESET_COUNT);
let default_elimination_threshold = RService::findByNameFromRedisWithDefault(
C::SrBasedGatewayEliminationThreshold.get_key(),
C::DEFAULT_SR_BASED_GATEWAY_ELIMINATION_THRESHOLD,
).await;
let default_soft_txn_reset_count = RService::findByNameFromRedisWithDefault(
C::SR_BASED_TXN_RESET_COUNT.get_key(),
C::GW_DEFAULT_TXN_SOFT_RESET_COUNT,
).await;
let is_elimination_v2_enabled = is_feature_enabled(
C::EnableEliminationV2.get_key(),
merchant_acc.merchantId.0.clone(),
"kv_redis".to_string(),
)
.await;

let default_elimination_threshold =
m_option.unwrap_or(C::DEFAULT_SR_BASED_GATEWAY_ELIMINATION_THRESHOLD);
let merchant_given_default_threshold = gateway_success_rate_merchant_input
.clone()
.map(|input| input.defaultEliminationThreshold);
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod metrics;
pub mod middleware;
pub mod redis;
pub mod routes;
pub mod shard_queue;
pub mod storage;
pub mod tenant;
pub mod types;
Expand Down
91 changes: 75 additions & 16 deletions src/redis/cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::types::service_configuration;
use crate::utils::StringExt;
use crate::{logger, utils::StringExt};
use serde::Deserialize;
use crate::shard_queue::GLOBAL_SHARD_QUEUE_HANDLER;

// Converted type synonyms
// Original Haskell type: KVDBName
Expand Down Expand Up @@ -53,32 +53,91 @@ where
{
findByNameFromRedisHelper(key, Some(decode_fn)).await
}
pub async fn findByNameFromRedisWithDefault<A>(
key: String,
default_value: A,
) -> A
where
A: for<'de> Deserialize<'de> + serde::Serialize + Clone,
{
// First try to get from existing cache/DB
if let Some(value) = findByNameFromRedis(key.clone()).await {
return value;
}

// Config not found in cache or DB, cache the default value
logger::debug!("Config '{}' not found, caching default value", key);

// Create ServiceConfiguration with default value
let default_config = crate::storage::types::ServiceConfiguration {
id: 0, // Placeholder ID since we're not storing in DB
name: key.clone(),
value: Some(serde_json::to_string(&default_value).unwrap_or_else(|_| "null".to_string())),
new_value: None,
previous_value: None,
new_value_status: None,
};
Comment on lines +72 to +79
Copy link

Copilot AI Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serialization logic here may cause issues. You're serializing default_value to a JSON string and storing it in the value field of ServiceConfiguration. However, the value field is Option<String>, which typically stores a string representation. When this is later retrieved and deserialized, the code expects to deserialize from this string value, which could lead to double-encoding issues (JSON string containing a JSON string). Consider whether the value field should contain the direct JSON representation or if the deserialization logic needs adjustment.

Copilot uses AI. Check for mistakes.

// Push to shard queue for IMC caching
if let Ok(config_json) = serde_json::to_value(&default_config) {
let queue_item = crate::shard_queue::types::ShardQueueItem::new(key.clone(), config_json);
if let Err(e) = GLOBAL_SHARD_QUEUE_HANDLER.push_to_shard(queue_item).await {
logger::warn!("Failed to push default config '{}' to shard queue: {:?}", key, e);
} else {
logger::debug!("Cached default value for config '{}' in IMC", key);
}
}

default_value
}

// Original Haskell function: findByNameFromRedisHelper
pub async fn findByNameFromRedisHelper<A>(
key: String,
decode_fn: Option<impl Fn(String) -> Option<A>>,
) -> Option<A>
where
A: for<'de> Deserialize<'de>,
{
let res = service_configuration::find_config_by_name(key).await;

match res {
Ok(m_service_config) => match m_service_config {
Some(service_config) => match service_config.value {
Some(value) => match decode_fn {
Some(func) => func(value),
None => None,
},
use crate::shard_queue::find_config_in_mem;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this `use` statement to the top of the file instead of placing it inside the function body.


if let Ok(service_config) = find_config_in_mem(&key) {
logger::debug!("Cache HIT: Found config '{}' in IMC", key);

if let Some(value) = service_config.value {
return match decode_fn {
Some(func) => func(value),
None => None,
},
None => None,
},
Err(_) => None,
};
}
}
crate::logger::debug!("Cache MISS: Config '{}' not found in IMC, checking DB", key);

if let Ok(Some(config)) = crate::types::service_configuration::find_config_by_name(key.clone()).await {
logger::debug!("DB HIT: Found config '{}' in database, pushing to shard queue for caching", key);

if let Ok(config_json) = serde_json::to_value(&config) {
let queue_item = crate::shard_queue::ShardQueueItem::new(key.clone(), config_json);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we pushing to the Redis queue here? The queue should be updated only during an update operation, not on a read path.

if let Err(e) = GLOBAL_SHARD_QUEUE_HANDLER.push_to_shard(queue_item).await {
logger::warn!("Failed to push config '{}' to shard queue: {:?}", key, e);
} else {
logger::debug!("Pushed config '{}' to shard queue, polling will cache in IMC", key);
}
}

if let Some(value) = config.value {
return match decode_fn {
Some(func) => func(value),
None => None,
};
}
} else {
logger::debug!("DB MISS: Config '{}' not found in database", key);
}

None
}


pub fn extractValue<A>(value: String) -> Option<A>
where
A: for<'de> Deserialize<'de>,
Expand Down
89 changes: 89 additions & 0 deletions src/redis/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ impl RedisConnectionWrapper {
.change_context(errors::RedisError::PopListElementsFailed)
}

/// Fetch the elements of the Redis list stored at `key` between `start`
/// and `end` (LRANGE semantics: inclusive bounds, negative indices count
/// from the tail).
pub async fn get_range_from_list(
    &self,
    key: &str,
    start: i64,
    end: i64,
) -> Result<Vec<String>, errors::RedisError> {
    let elements = self.conn.pool.lrange(key, start, end).await;
    elements.change_context(errors::RedisError::GetFailed)
}

pub async fn delete_key(&self, key: &str) -> Result<DelReply, errors::RedisError> {
self.conn
.pool
Expand Down Expand Up @@ -174,4 +187,80 @@ impl RedisConnectionWrapper {
.await
.change_context(errors::RedisError::UnknownResult)
}

/// Append an entry to a Redis stream, trimming it to an exact MAXLEN,
/// by issuing the raw command:
/// `XADD <stream_key> MAXLEN <maxlen> * <field1> <value1> ...`
///
/// `fields` must already be a flattened field/value sequence.
pub async fn xadd_with_maxlen(
    &self,
    stream_key: &str,
    maxlen: u64,
    fields: Vec<String>,
) -> Result<String, errors::RedisError> {
    use fred::interfaces::ClientLike;
    use fred::types::CustomCommand;

    // Fixed command prefix, then the caller-supplied field/value pairs.
    let mut args = Vec::with_capacity(4 + fields.len());
    args.push(stream_key.to_string());
    args.push("MAXLEN".to_string());
    args.push(maxlen.to_string());
    args.push("*".to_string());
    args.extend(fields);

    self.conn
        .pool
        .custom(CustomCommand::new("XADD", stream_key, false), args)
        .await
        .change_context(errors::RedisError::SetHashFailed)
}

/// Append an entry to a Redis stream with *approximate* MAXLEN trimming
/// (`MAXLEN ~`), which lets Redis trim lazily and is cheaper than exact
/// trimming. Raw command issued:
/// `XADD <stream_key> MAXLEN ~ <maxlen> * <field1> <value1> ...`
pub async fn xadd_with_approximate_maxlen(
    &self,
    stream_key: &str,
    maxlen: u64,
    fields: Vec<String>,
) -> Result<String, errors::RedisError> {
    use fred::interfaces::ClientLike;
    use fred::types::CustomCommand;

    // Fixed command prefix (note the `~` marker), then the caller's
    // flattened field/value pairs.
    let mut args = Vec::with_capacity(5 + fields.len());
    args.push(stream_key.to_string());
    args.push("MAXLEN".to_string());
    args.push("~".to_string());
    args.push(maxlen.to_string());
    args.push("*".to_string());
    args.extend(fields);

    self.conn
        .pool
        .custom(CustomCommand::new("XADD", stream_key, false), args)
        .await
        .change_context(errors::RedisError::SetHashFailed)
}

/// Read entries from a Redis stream via XRANGE.
/// Example: `XRANGE shard_stream_0 - +` (optionally with `COUNT`).
///
/// Returns `(entry_id, [(field, value), ...])` pairs.
pub async fn xrange(
    &self,
    stream_key: &str,
    start: &str,
    end: &str,
    count: Option<u64>,
) -> Result<Vec<(String, Vec<(String, String)>)>, errors::RedisError> {
    use fred::interfaces::StreamsInterface;

    let entries = self.conn.pool.xrange(stream_key, start, end, count).await;
    entries.change_context(errors::RedisError::GetFailed)
}

/// Return the number of entries in a Redis stream via XLEN.
pub async fn xlen(&self, stream_key: &str) -> Result<u64, errors::RedisError> {
    use fred::interfaces::StreamsInterface;

    self.conn
        .pool
        .xlen(stream_key)
        .await
        .change_context(errors::RedisError::GetFailed)
}
}
Loading
Loading