diff --git a/backend/src/metrics.ts b/backend/src/metrics.ts index 2c8efc07..4f82bc32 100644 --- a/backend/src/metrics.ts +++ b/backend/src/metrics.ts @@ -197,3 +197,19 @@ export const endpointSloBreach = new Gauge({ export function recordSloBreachAlert(path: string, tier: string, type: string): void { endpointSloBreachTotal.inc({ path, tier, type }); } + +// --- Adaptive Throttle Metrics --- + +export const adaptiveThrottleBlockCount = new Counter({ + name: 'adaptive_throttle_block_count', + help: 'Total number of IPs blocked by adaptive throttle', + labelNames: ['using_redis'], + registers: [register], +}); + +export const adaptiveThrottleActiveBlocks = new Gauge({ + name: 'adaptive_throttle_active_blocks', + help: 'Current number of IPs actively blocked by adaptive throttle', + labelNames: ['using_redis'], + registers: [register], +}); diff --git a/backend/src/middleware/adaptiveThrottle.ts b/backend/src/middleware/adaptiveThrottle.ts index 11b081c7..3feff15a 100644 --- a/backend/src/middleware/adaptiveThrottle.ts +++ b/backend/src/middleware/adaptiveThrottle.ts @@ -1,5 +1,8 @@ import { NextFunction, Request, Response } from 'express'; import { logger } from './structuredLogging'; +import { redisClientManager } from '../rateLimiter'; +import { adaptiveThrottleBlockCount } from '../metrics'; +import type { Redis } from 'ioredis'; interface AbuseState { score: number; @@ -7,6 +10,7 @@ interface AbuseState { lastSeenAt: number; } +// In-memory fallback store const abuseByIp = new Map(); const HALFLIFE_MS = parseInt(process.env.ADAPTIVE_THROTTLE_HALFLIFE_MS || '300000', 10); @@ -14,6 +18,73 @@ const BASE_BLOCK_MS = parseInt(process.env.ADAPTIVE_THROTTLE_BASE_BLOCK_MS || '1 const SCORE_THRESHOLD = parseFloat(process.env.ADAPTIVE_THROTTLE_SCORE_THRESHOLD || '6'); const MAX_BLOCK_MS = parseInt(process.env.ADAPTIVE_THROTTLE_MAX_BLOCK_MS || '300000', 10); +// Redis-backed abuse store +class RedisAbuseStore { + private redis: Redis | null; + private keyPrefix = 'throttle:'; + + constructor() { + this.redis = redisClientManager.getClient(); + if (!this.redis) { + console.log( + JSON.stringify({ + level: 'warn', + event: 'adaptive_throttle_fallback', + message: 'REDIS_URL not set; adaptive throttle using in-memory store', + }) + ); + } + } + + private buildKey(ip: string): string { + return `${this.keyPrefix}${ip}`; + } + + async get(ip: string): Promise { + if (!this.redis || !redisClientManager.isReady()) { + return abuseByIp.get(ip) || null; + } + + try { + const key = this.buildKey(ip); + const data = await this.redis.get(key); + if (!data) return null; + return JSON.parse(data) as AbuseState; + } catch (err) { + logger.log('error', 'Redis get error in adaptive throttle', { + error: err instanceof Error ? err.message : String(err), + }); + return abuseByIp.get(ip) || null; + } + } + + async set(ip: string, state: AbuseState): Promise { + if (!this.redis || !redisClientManager.isReady()) { + abuseByIp.set(ip, state); + return; + } + + try { + const key = this.buildKey(ip); + const ttlMs = Math.max(state.blockedUntil - Date.now(), HALFLIFE_MS); + await this.redis.set(key, JSON.stringify(state), 'PX', ttlMs); + // Also store in memory as fallback + abuseByIp.set(ip, state); + } catch (err) { + logger.log('error', 'Redis set error in adaptive throttle', { + error: err instanceof Error ? err.message : String(err), + }); + abuseByIp.set(ip, state); + } + } + + isUsingRedis(): boolean { + return this.redis !== null && redisClientManager.isReady(); + } +} + +const abuseStore = new RedisAbuseStore(); + function getIp(req: Request): string { const forwarded = req.headers['x-forwarded-for']; if (typeof forwarded === 'string' && forwarded.trim().length > 0) { @@ -40,58 +111,67 @@ function scoreForStatus(statusCode: number): number { export function adaptiveThrottleMiddleware(req: Request, res: Response, next: NextFunction): void { const ip = getIp(req); const now = Date.now(); - const existing = abuseByIp.get(ip); - - if (existing) { - existing.score = decayScore(existing.score, now - existing.lastSeenAt); - existing.lastSeenAt = now; - - if (existing.blockedUntil > now) { - const retryAfter = Math.ceil((existing.blockedUntil - now) / 1000); - res.setHeader('Retry-After', retryAfter); - res.status(429).json({ - error: 'Too many invalid requests', - status: 429, - message: 'Adaptive throttle activated due to repeated invalid requests.', - retryAfter, - }); - return; - } - } - res.on('finish', () => { - if (res.statusCode < 400 || res.statusCode >= 500) { - return; + // Use async IIFE to handle Redis operations + void (async () => { + const existing = await abuseStore.get(ip); + + if (existing) { + existing.score = decayScore(existing.score, now - existing.lastSeenAt); + existing.lastSeenAt = now; + + if (existing.blockedUntil > now) { + const retryAfter = Math.ceil((existing.blockedUntil - now) / 1000); + res.setHeader('Retry-After', retryAfter); + res.status(429).json({ + error: 'Too many invalid requests', + status: 429, + message: 'Adaptive throttle activated due to repeated invalid requests.', + retryAfter, + }); + return; + } } - const state = abuseByIp.get(ip) || { - score: 0, - blockedUntil: 0, - lastSeenAt: now, - }; - - const current = Date.now(); - state.score = decayScore(state.score, current - state.lastSeenAt); - state.lastSeenAt = current; - state.score += scoreForStatus(res.statusCode); - - if (state.score >= SCORE_THRESHOLD) { - const multiplier = Math.max(1, Math.floor(state.score / SCORE_THRESHOLD)); - const blockMs = Math.min(MAX_BLOCK_MS, BASE_BLOCK_MS * multiplier); - state.blockedUntil = current + blockMs; - - logger.log('warn', 'Adaptive throttle triggered', { - ip, - score: Number(state.score.toFixed(2)), - blockMs, - path: req.path, - }); - } - - abuseByIp.set(ip, state); - }); - - next(); + res.on('finish', () => { + void (async () => { + if (res.statusCode < 400 || res.statusCode >= 500) { + return; + } + + const state = (await abuseStore.get(ip)) || { + score: 0, + blockedUntil: 0, + lastSeenAt: now, + }; + + const current = Date.now(); + state.score = decayScore(state.score, current - state.lastSeenAt); + state.lastSeenAt = current; + state.score += scoreForStatus(res.statusCode); + + if (state.score >= SCORE_THRESHOLD) { + const multiplier = Math.max(1, Math.floor(state.score / SCORE_THRESHOLD)); + const blockMs = Math.min(MAX_BLOCK_MS, BASE_BLOCK_MS * multiplier); + state.blockedUntil = current + blockMs; + + adaptiveThrottleBlockCount.inc({ using_redis: String(abuseStore.isUsingRedis()) }); + + logger.log('warn', 'Adaptive throttle triggered', { + ip, + score: Number(state.score.toFixed(2)), + blockMs, + path: req.path, + usingRedis: abuseStore.isUsingRedis(), + }); + } + + await abuseStore.set(ip, state); + })(); + }); + + next(); + })(); } export function resetAdaptiveThrottleStateForTests(): void { diff --git a/backend/src/middleware/cache.ts b/backend/src/middleware/cache.ts index 780c868e..d05513d2 100644 --- a/backend/src/middleware/cache.ts +++ b/backend/src/middleware/cache.ts @@ -223,6 +223,51 @@ export function cacheMiddleware(options: CacheOptions) { // ── Invalidation ───────────────────────────────────────────────────────────── +type InvalidationHook = (eventType: string, metadata?: Record) => string[]; + +const invalidationHooks: InvalidationHook[] = []; + +/** + * Register a hook that returns patterns to invalidate when a write event occurs. + * Hooks receive the event type and optional metadata and return an array of cache key patterns. + */ +export function registerInvalidationHook(hook: InvalidationHook): void { + invalidationHooks.push(hook); +} + +/** + * Trigger cache invalidation for a specific event type. + * All registered hooks are invoked and their returned patterns are invalidated. + */ +export function triggerCacheInvalidation( + eventType: string, + metadata?: Record, +): { patternsInvalidated: string[]; keysRemoved: number } { + const patterns: string[] = []; + + for (const hook of invalidationHooks) { + try { + const hookPatterns = hook(eventType, metadata); + patterns.push(...hookPatterns); + } catch (err) { + console.error( + JSON.stringify({ + level: 'error', + event: 'invalidation_hook_error', + error: err instanceof Error ? err.message : String(err), + }), + ); + } + } + + let totalRemoved = 0; + for (const pattern of patterns) { + totalRemoved += invalidateCache(pattern); + } + + return { patternsInvalidated: patterns, keysRemoved: totalRemoved }; +} + export function invalidateCache(pattern?: string): number { if (!pattern) { const count = responseCache.size; diff --git a/backend/src/retryBudget.ts b/backend/src/retryBudget.ts new file mode 100644 index 00000000..705f94ce --- /dev/null +++ b/backend/src/retryBudget.ts @@ -0,0 +1,143 @@ +/** + * Retry budget service for external RPC dependencies. + * Prevents runaway retries under upstream failure by tracking success/failure ratio. + */ + +import { logger } from './middleware/structuredLogging'; + +interface RetryBudgetConfig { + /** Maximum number of retries allowed per window */ + maxRetries: number; + /** Time window in milliseconds */ + windowMs: number; + /** Minimum success rate required (0.0 to 1.0) */ + minSuccessRate: number; + /** Number of consecutive failures before circuit opens */ + failureThreshold: number; +} + +interface RetryAttempt { + timestamp: number; + success: boolean; +} + +const DEFAULT_CONFIG: RetryBudgetConfig = { + maxRetries: parseInt(process.env.RETRY_BUDGET_MAX_RETRIES || '10', 10), + windowMs: parseInt(process.env.RETRY_BUDGET_WINDOW_MS || '60000', 10), + minSuccessRate: parseFloat(process.env.RETRY_BUDGET_MIN_SUCCESS_RATE || '0.5'), + failureThreshold: parseInt(process.env.RETRY_BUDGET_FAILURE_THRESHOLD || '5', 10), +}; + +class RetryBudgetService { + private attempts: RetryAttempt[] = []; + private consecutiveFailures = 0; + private config: RetryBudgetConfig; + + constructor(config: Partial = {}) { + this.config = { ...DEFAULT_CONFIG, ...config }; + } + + /** + * Check if a retry is allowed based on current budget. + * Returns true if retry is allowed, false if budget is exhausted. + */ + canRetry(): boolean { + this.pruneOldAttempts(); + + // Check retry count within window + const retryCount = this.attempts.filter((a) => !a.success).length; + if (retryCount >= this.config.maxRetries) { + logger.log('warn', 'Retry budget exhausted', { + retryCount, + maxRetries: this.config.maxRetries, + windowMs: this.config.windowMs, + }); + return false; + } + + // Check consecutive failures + if (this.consecutiveFailures >= this.config.failureThreshold) { + logger.log('warn', 'Retry budget failure threshold exceeded', { + consecutiveFailures: this.consecutiveFailures, + failureThreshold: this.config.failureThreshold, + }); + return false; + } + + // Check success rate + const totalAttempts = this.attempts.length; + if (totalAttempts > 0) { + const successCount = this.attempts.filter((a) => a.success).length; + const successRate = successCount / totalAttempts; + if (successRate < this.config.minSuccessRate) { + logger.log('warn', 'Retry budget success rate too low', { + successRate: Number(successRate.toFixed(2)), + minSuccessRate: this.config.minSuccessRate, + }); + return false; + } + } + + return true; + } + + /** + * Record a retry attempt result. + */ + recordAttempt(success: boolean): void { + const attempt: RetryAttempt = { + timestamp: Date.now(), + success, + }; + this.attempts.push(attempt); + + if (success) { + this.consecutiveFailures = 0; + } else { + this.consecutiveFailures++; + } + + this.pruneOldAttempts(); + } + + /** + * Get current retry budget stats. + */ + getStats(): { + remainingRetries: number; + consecutiveFailures: number; + successRate: number; + totalAttempts: number; + } { + this.pruneOldAttempts(); + const totalAttempts = this.attempts.length; + const successCount = this.attempts.filter((a) => a.success).length; + const retryCount = this.attempts.filter((a) => !a.success).length; + + return { + remainingRetries: Math.max(0, this.config.maxRetries - retryCount), + consecutiveFailures: this.consecutiveFailures, + successRate: totalAttempts > 0 ? successCount / totalAttempts : 1.0, + totalAttempts, + }; + } + + /** + * Reset the retry budget (for testing or manual intervention). + */ + reset(): void { + this.attempts = []; + this.consecutiveFailures = 0; + } + + private pruneOldAttempts(): void { + const cutoff = Date.now() - this.config.windowMs; + this.attempts = this.attempts.filter((a) => a.timestamp > cutoff); + } +} + +// Singleton instance for Soroban RPC retries +export const sorobanRetryBudget = new RetryBudgetService(); + +// Export for testing and custom instances +export { RetryBudgetService, RetryBudgetConfig }; diff --git a/backend/src/sorobanClient.ts b/backend/src/sorobanClient.ts index 54f613a6..c9f92496 100644 --- a/backend/src/sorobanClient.ts +++ b/backend/src/sorobanClient.ts @@ -16,6 +16,7 @@ import { } from '@stellar/stellar-sdk'; import { logger } from './middleware/structuredLogging'; import { getCurrentTraceId } from './tracing'; +import { sorobanRetryBudget } from './retryBudget'; // Well-known Stellar network passphrases (avoids importing Networks which is // not consistently re-exported across stellar-sdk minor versions). @@ -103,6 +104,60 @@ export class SorobanSimulationError extends Error implements SorobanTxError { // ─── Core RPC call ──────────────────────────────────────────────────────────── +const MAX_RPC_RETRIES = parseInt(process.env.SOROBAN_MAX_RETRIES || '3', 10); +const RETRY_DELAY_MS = parseInt(process.env.SOROBAN_RETRY_DELAY_MS || '1000', 10); + +/** + * Retry helper with exponential backoff and budget control. + */ +async function retryWithBudget( + operation: () => Promise, + operationName: string, + maxRetries: number = MAX_RPC_RETRIES, +): Promise { + let lastError: Error | null = null; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + const result = await operation(); + sorobanRetryBudget.recordAttempt(true); + return result; + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + + // Don't retry on validation errors + if (err instanceof SorobanSimulationError && err.statusCode === 422) { + sorobanRetryBudget.recordAttempt(false); + throw err; + } + + const isLastAttempt = attempt === maxRetries; + if (isLastAttempt || !sorobanRetryBudget.canRetry()) { + sorobanRetryBudget.recordAttempt(false); + logger.log('error', `${operationName} failed after ${attempt + 1} attempts`, { + error: lastError.message, + retryBudgetStats: sorobanRetryBudget.getStats(), + traceId: getCurrentTraceId(), + }); + throw lastError; + } + + const delay = RETRY_DELAY_MS * Math.pow(2, attempt); + logger.log('warn', `${operationName} failed, retrying`, { + attempt: attempt + 1, + maxRetries, + delayMs: delay, + error: lastError.message, + traceId: getCurrentTraceId(), + }); + + await new Promise((resolve) => setTimeout(resolve, delay)); + } + } + + throw lastError || new Error(`${operationName} failed with no error`); +} + /** * Submit a Soroban vault contract invocation (deposit or withdrawal) to the * Stellar network. Steps: @@ -177,7 +232,10 @@ export async function submitVaultOperation( traceId: getCurrentTraceId(), }); - const simulated = await rpcClient.simulateTransaction(tx); + const simulated = await retryWithBudget( + () => rpcClient.simulateTransaction(tx), + 'Soroban simulation', + ); if (rpc.Api.isSimulationError(simulated)) { const errorMessage = `Soroban simulation failed: ${ @@ -213,7 +271,10 @@ export async function submitVaultOperation( traceId: getCurrentTraceId(), }); - const txResponse = await rpcClient.sendTransaction(assembled); + const txResponse = await retryWithBudget( + () => rpcClient.sendTransaction(assembled), + 'Soroban transaction submission', + ); if (txResponse.status === 'ERROR') { const detail = txResponse.errorResult?.toXDR?.('base64') ?? 'unknown error'; diff --git a/backend/src/vaultEndpoints.ts b/backend/src/vaultEndpoints.ts index 0d465eed..d8abc139 100644 --- a/backend/src/vaultEndpoints.ts +++ b/backend/src/vaultEndpoints.ts @@ -2,7 +2,7 @@ import { Router, Request, Response, NextFunction } from 'express'; import { emailService } from './emailService'; import { logger } from './middleware/structuredLogging'; import { allowlistMiddleware } from './middleware/allowlist'; -import { invalidateCache } from './middleware/cache'; +import { triggerCacheInvalidation, registerInvalidationHook } from './middleware/cache'; import { depositsLimiter } from './rateLimiter'; import { idempotencyStore, IdempotencyConflictError } from './idempotency'; import { sorobanCircuitBreaker, CircuitOpenError } from './circuitBreaker'; @@ -30,11 +30,21 @@ const router = Router(); const ZERO = new Decimal(0); const DEFAULT_SHARE_PRICE = new Decimal(1); +// Register cache invalidation hooks for transaction state changes +registerInvalidationHook((eventType) => { + if (eventType.startsWith('transaction.')) { + return [ + 'GET:/api/v1/vault', + 'GET:/api/v1/transactions', + 'GET:/api/v1/portfolio', + ]; + } + return []; +}); + function invalidateReadCaches(_req: Request, _res: Response, next: NextFunction): void { - // R5: pattern-scoped invalidation — only clear vault, transactions, and portfolio entries - invalidateCache('GET:/api/v1/vault'); - invalidateCache('GET:/api/v1/transactions'); - invalidateCache('GET:/api/v1/portfolio'); + // Trigger adaptive cache invalidation via hooks + triggerCacheInvalidation('transaction.write'); next(); } @@ -271,18 +281,20 @@ async function handleVaultOperation( operation, ); if (replayed) res.setHeader('idempotency-status', 'replayed'); - // R5: pattern-scoped invalidation on successful write - invalidateCache('GET:/api/v1/vault'); - invalidateCache('GET:/api/v1/transactions'); - invalidateCache('GET:/api/v1/portfolio'); + // Trigger adaptive cache invalidation via hooks + triggerCacheInvalidation(`transaction.${type}.completed`, { + wallet: normalizedWallet, + amount: String(amount), + }); return res.status(result.statusCode).json(result.body); } const result = await operation(); - // R5: pattern-scoped invalidation on successful write - invalidateCache('GET:/api/v1/vault'); - invalidateCache('GET:/api/v1/transactions'); - invalidateCache('GET:/api/v1/portfolio'); + // Trigger adaptive cache invalidation via hooks + triggerCacheInvalidation(`transaction.${type}.completed`, { + wallet: normalizedWallet, + amount: String(amount), + }); return res.status(result.statusCode).json(result.body); } catch (err) { if (err instanceof IdempotencyConflictError) { diff --git a/frontend/src/context/ToastContext.tsx b/frontend/src/context/ToastContext.tsx index ff33cccb..27d7cc4e 100644 --- a/frontend/src/context/ToastContext.tsx +++ b/frontend/src/context/ToastContext.tsx @@ -7,12 +7,15 @@ export interface ToastOptions { description?: string; duration?: number; variant?: ToastVariant; + /** Unique key for deduplication. If not provided, deduplication uses title+description */ + dedupeKey?: string; } interface ToastItem extends ToastOptions { id: string; variant: ToastVariant; duration: number; + timestamp: number; } interface ToastContextType { @@ -22,11 +25,24 @@ interface ToastContextType { error: (options: Omit) => string; warning: (options: Omit) => string; info: (options: Omit) => string; + clearAll: () => void; } const ToastContext = createContext(undefined); const DEFAULT_DURATION = 5000; +const DEDUPE_WINDOW_MS = 3000; // Don't show duplicate toasts within 3 seconds + +/** + * Generate a deduplication key from toast content. + * Toasts with the same key within DEDUPE_WINDOW_MS are considered duplicates. + */ +function generateDedupeKey(options: ToastOptions): string { + if (options.dedupeKey) { + return options.dedupeKey; + } + return `${options.title}|${options.description || ''}|${options.variant || 'info'}`; +} export const ToastProvider: React.FC<{ children: React.ReactNode }> = ({ children, @@ -34,6 +50,8 @@ export const ToastProvider: React.FC<{ children: React.ReactNode }> = ({ const [toasts, setToasts] = useState([]); const nextToastId = useRef(0); const timeoutRefs = useRef>(new Map()); + // Track recent toast keys for deduplication + const recentToasts = useRef>(new Map()); const dismissToast = (id: string) => { setToasts((currentToasts) => currentToasts.filter((toast) => toast.id !== id)); @@ -45,17 +63,40 @@ export const ToastProvider: React.FC<{ children: React.ReactNode }> = ({ } }; + const clearAll = () => { + setToasts([]); + timeoutRefs.current.forEach((timeoutId) => { + window.clearTimeout(timeoutId); + }); + timeoutRefs.current.clear(); + }; + const showToast = ({ variant = "info", duration = DEFAULT_DURATION, ...options }: ToastOptions) => { + const dedupeKey = generateDedupeKey({ ...options, variant }); + const now = Date.now(); + + // Check for duplicate within dedupe window + const lastShown = recentToasts.current.get(dedupeKey); + if (lastShown && now - lastShown < DEDUPE_WINDOW_MS) { + // Duplicate detected - return the existing toast ID (we don't have it, so return empty) + // In a production system, you might want to bump the existing toast or extend its duration + return ''; + } + + // Record this toast for deduplication + recentToasts.current.set(dedupeKey, now); + nextToastId.current += 1; const id = `toast-${nextToastId.current}`; const nextToast: ToastItem = { id, variant, duration, + timestamp: now, ...options, }; @@ -69,6 +110,22 @@ export const ToastProvider: React.FC<{ children: React.ReactNode }> = ({ return id; }; + // Clean up old dedupe entries periodically + useEffect(() => { + const cleanupInterval = setInterval(() => { + const now = Date.now(); + const staleKeys: string[] = []; + recentToasts.current.forEach((timestamp, key) => { + if (now - timestamp > DEDUPE_WINDOW_MS) { + staleKeys.push(key); + } + }); + staleKeys.forEach((key) => recentToasts.current.delete(key)); + }, DEDUPE_WINDOW_MS); + + return () => clearInterval(cleanupInterval); + }, []); + useEffect(() => { const timeouts = timeoutRefs.current; @@ -85,6 +142,7 @@ export const ToastProvider: React.FC<{ children: React.ReactNode }> = ({ value={{ showToast, dismissToast, + clearAll, success: (options) => showToast({ ...options, variant: "success" }), error: (options) => showToast({ ...options, variant: "error" }), warning: (options) => showToast({ ...options, variant: "warning" }),