Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions backend/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,19 @@ export const endpointSloBreach = new Gauge({
export function recordSloBreachAlert(path: string, tier: string, type: string): void {
endpointSloBreachTotal.inc({ path, tier, type });
}

// --- Adaptive Throttle Metrics ---

export const adaptiveThrottleBlockCount = new Counter({
name: 'adaptive_throttle_block_count',
help: 'Total number of IPs blocked by adaptive throttle',
labelNames: ['using_redis'],
registers: [register],
});

export const adaptiveThrottleActiveBlocks = new Gauge({
name: 'adaptive_throttle_active_blocks',
help: 'Current number of IPs actively blocked by adaptive throttle',
labelNames: ['using_redis'],
registers: [register],
});
178 changes: 129 additions & 49 deletions backend/src/middleware/adaptiveThrottle.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,90 @@
import { NextFunction, Request, Response } from 'express';
import { logger } from './structuredLogging';
import { redisClientManager } from '../rateLimiter';
import { adaptiveThrottleBlockCount } from '../metrics';
import type { Redis } from 'ioredis';

interface AbuseState {
score: number;
blockedUntil: number;
lastSeenAt: number;
}

// In-memory fallback store
const abuseByIp = new Map<string, AbuseState>();

const HALFLIFE_MS = parseInt(process.env.ADAPTIVE_THROTTLE_HALFLIFE_MS || '300000', 10);
const BASE_BLOCK_MS = parseInt(process.env.ADAPTIVE_THROTTLE_BASE_BLOCK_MS || '15000', 10);
const SCORE_THRESHOLD = parseFloat(process.env.ADAPTIVE_THROTTLE_SCORE_THRESHOLD || '6');
const MAX_BLOCK_MS = parseInt(process.env.ADAPTIVE_THROTTLE_MAX_BLOCK_MS || '300000', 10);

// Redis-backed abuse store
class RedisAbuseStore {
private redis: Redis | null;
private keyPrefix = 'throttle:';

constructor() {
this.redis = redisClientManager.getClient();
if (!this.redis) {
console.log(
JSON.stringify({
level: 'warn',
event: 'adaptive_throttle_fallback',
message: 'REDIS_URL not set; adaptive throttle using in-memory store',
})
);
}
}

private buildKey(ip: string): string {
return `${this.keyPrefix}${ip}`;
}

async get(ip: string): Promise<AbuseState | null> {
if (!this.redis || !redisClientManager.isReady()) {
return abuseByIp.get(ip) || null;
}

try {
const key = this.buildKey(ip);
const data = await this.redis.get(key);
if (!data) return null;
return JSON.parse(data) as AbuseState;
} catch (err) {
logger.log('error', 'Redis get error in adaptive throttle', {
error: err instanceof Error ? err.message : String(err),
});
return abuseByIp.get(ip) || null;
}
}

async set(ip: string, state: AbuseState): Promise<void> {
if (!this.redis || !redisClientManager.isReady()) {
abuseByIp.set(ip, state);
return;
}

try {
const key = this.buildKey(ip);
const ttlMs = Math.max(state.blockedUntil - Date.now(), HALFLIFE_MS);
await this.redis.set(key, JSON.stringify(state), 'PX', ttlMs);
// Also store in memory as fallback
abuseByIp.set(ip, state);
} catch (err) {
logger.log('error', 'Redis set error in adaptive throttle', {
error: err instanceof Error ? err.message : String(err),
});
abuseByIp.set(ip, state);
}
}

isUsingRedis(): boolean {
return this.redis !== null && redisClientManager.isReady();
}
}

const abuseStore = new RedisAbuseStore();

function getIp(req: Request): string {
const forwarded = req.headers['x-forwarded-for'];
if (typeof forwarded === 'string' && forwarded.trim().length > 0) {
Expand All @@ -40,58 +111,67 @@ function scoreForStatus(statusCode: number): number {
export function adaptiveThrottleMiddleware(req: Request, res: Response, next: NextFunction): void {
const ip = getIp(req);
const now = Date.now();
const existing = abuseByIp.get(ip);

if (existing) {
existing.score = decayScore(existing.score, now - existing.lastSeenAt);
existing.lastSeenAt = now;

if (existing.blockedUntil > now) {
const retryAfter = Math.ceil((existing.blockedUntil - now) / 1000);
res.setHeader('Retry-After', retryAfter);
res.status(429).json({
error: 'Too many invalid requests',
status: 429,
message: 'Adaptive throttle activated due to repeated invalid requests.',
retryAfter,
});
return;
}
}

res.on('finish', () => {
if (res.statusCode < 400 || res.statusCode >= 500) {
return;
// Use async IIFE to handle Redis operations
void (async () => {
const existing = await abuseStore.get(ip);

if (existing) {
existing.score = decayScore(existing.score, now - existing.lastSeenAt);
existing.lastSeenAt = now;

if (existing.blockedUntil > now) {
const retryAfter = Math.ceil((existing.blockedUntil - now) / 1000);
res.setHeader('Retry-After', retryAfter);
res.status(429).json({
error: 'Too many invalid requests',
status: 429,
message: 'Adaptive throttle activated due to repeated invalid requests.',
retryAfter,
});
return;
}
}

const state = abuseByIp.get(ip) || {
score: 0,
blockedUntil: 0,
lastSeenAt: now,
};

const current = Date.now();
state.score = decayScore(state.score, current - state.lastSeenAt);
state.lastSeenAt = current;
state.score += scoreForStatus(res.statusCode);

if (state.score >= SCORE_THRESHOLD) {
const multiplier = Math.max(1, Math.floor(state.score / SCORE_THRESHOLD));
const blockMs = Math.min(MAX_BLOCK_MS, BASE_BLOCK_MS * multiplier);
state.blockedUntil = current + blockMs;

logger.log('warn', 'Adaptive throttle triggered', {
ip,
score: Number(state.score.toFixed(2)),
blockMs,
path: req.path,
});
}

abuseByIp.set(ip, state);
});

next();
res.on('finish', () => {
void (async () => {
if (res.statusCode < 400 || res.statusCode >= 500) {
return;
}

const state = (await abuseStore.get(ip)) || {
score: 0,
blockedUntil: 0,
lastSeenAt: now,
};

const current = Date.now();
state.score = decayScore(state.score, current - state.lastSeenAt);
state.lastSeenAt = current;
state.score += scoreForStatus(res.statusCode);

if (state.score >= SCORE_THRESHOLD) {
const multiplier = Math.max(1, Math.floor(state.score / SCORE_THRESHOLD));
const blockMs = Math.min(MAX_BLOCK_MS, BASE_BLOCK_MS * multiplier);
state.blockedUntil = current + blockMs;

adaptiveThrottleBlockCount.inc({ using_redis: String(abuseStore.isUsingRedis()) });

logger.log('warn', 'Adaptive throttle triggered', {
ip,
score: Number(state.score.toFixed(2)),
blockMs,
path: req.path,
usingRedis: abuseStore.isUsingRedis(),
});
}

await abuseStore.set(ip, state);
})();
});

next();
})();
}

export function resetAdaptiveThrottleStateForTests(): void {
Expand Down
45 changes: 45 additions & 0 deletions backend/src/middleware/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,51 @@ export function cacheMiddleware(options: CacheOptions) {

// ── Invalidation ─────────────────────────────────────────────────────────────

type InvalidationHook = (eventType: string, metadata?: Record<string, unknown>) => string[];

const invalidationHooks: InvalidationHook[] = [];

/**
* Register a hook that returns patterns to invalidate when a write event occurs.
* Hooks receive the event type and optional metadata and return an array of cache key patterns.
*/
export function registerInvalidationHook(hook: InvalidationHook): void {
invalidationHooks.push(hook);
}

/**
* Trigger cache invalidation for a specific event type.
* All registered hooks are invoked and their returned patterns are invalidated.
*/
export function triggerCacheInvalidation(
eventType: string,
metadata?: Record<string, unknown>,
): { patternsInvalidated: string[]; keysRemoved: number } {
const patterns: string[] = [];

for (const hook of invalidationHooks) {
try {
const hookPatterns = hook(eventType, metadata);
patterns.push(...hookPatterns);
} catch (err) {
console.error(
JSON.stringify({
level: 'error',
event: 'invalidation_hook_error',
error: err instanceof Error ? err.message : String(err),
}),
);
}
}

let totalRemoved = 0;
for (const pattern of patterns) {
totalRemoved += invalidateCache(pattern);
}

return { patternsInvalidated: patterns, keysRemoved: totalRemoved };
}

export function invalidateCache(pattern?: string): number {
if (!pattern) {
const count = responseCache.size;
Expand Down
Loading
Loading