diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index d32bff1..99b144b 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -53,6 +53,7 @@ export interface EventsServerOptions { contractAddresses?: ContractConfig[]; discordWebhookUrl?: string; webhookSecrets?: WebhookSecret[]; + apiKeys?: Array<{ key: string; name?: string }>; notificationAPI?: NotificationAPI | null; rateLimit?: RateLimitConfig; /** @@ -782,8 +783,27 @@ export function createEventsServer(options: EventsServerOptions): http.Server { return; } + function isValidApiKey(apiKey: string | undefined, allowedKeys: Array<{ key: string; name?: string }> | undefined): boolean { + if (!allowedKeys || allowedKeys.length === 0) { + // If no API keys are configured, allow unauthenticated access is allowed (for backward compatibility) + return true; + } + if (!apiKey) { + return false; + } + return allowedKeys.some(k => k.key === apiKey); + } + // Get notification delivery history endpoint if (req.method === 'GET' && req.url?.startsWith('/api/notifications/history')) { + // Check API key first + const apiKey = req.headers['x-api-key'] as string | undefined; + if (!isValidApiKey(apiKey, options.apiKeys)) { + res.writeHead(401, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Unauthorized: Invalid or missing API key' })); + return; + } + const url = new URL(req.url, 'http://localhost'); const limit = url.searchParams.get('limit') ? parseInt(url.searchParams.get('limit')!, 10) : undefined; const offset = url.searchParams.get('offset') ? parseInt(url.searchParams.get('offset')!, 10) : undefined; diff --git a/listener/src/config.ts b/listener/src/config.ts index a25292e..46eb975 100644 --- a/listener/src/config.ts +++ b/listener/src/config.ts @@ -112,6 +112,29 @@ function validateWebhookSecrets(value: unknown): WebhookSecret[] { }); } +function validateApiKeys(value: unknown): ApiKey[] { + if (!Array.isArray(value)) { + throw new ConfigError('API_KEYS must be a JSON array of key objects.'); + } + + return value.map((item, index) => { + if (typeof item !== 'object' || item === null) { + throw new ConfigError( + `API_KEYS[${index}] must be an object with key (and optional name).` + ); + } + + const key = (item as any).key; + const name = (item as any).name; + + if (typeof key !== 'string' || !key.trim()) { + throw new ConfigError(`API_KEYS[${index}].key must be a non-empty string.`); + } + + return { key: key.trim(), name: name?.trim() }; + }); +} + function loadCleanupConfig(): AppCleanupConfig { return { intervalMs: parseIntegerEnv('CLEANUP_INTERVAL_MS', String(60 * 60 * 1000)), @@ -186,6 +209,7 @@ export function loadConfig(): Config { pollIntervalMs: parseIntegerEnv('EVENT_QUEUE_POLL_INTERVAL_MS', '1000'), }, webhookSecrets: validateWebhookSecrets(rawWebhookSecrets), + apiKeys: validateApiKeys(rawApiKeys), scheduler: { enabled: trimEnv('SCHEDULER_ENABLED') !== 'false', pollIntervalMs: parseIntegerEnv('SCHEDULER_POLL_INTERVAL_MS', '10000'), diff --git a/listener/src/index.ts b/listener/src/index.ts index f7dd5ea..dabd4c8 100644 --- a/listener/src/index.ts +++ b/listener/src/index.ts @@ -131,6 +131,8 @@ async function main() { stellarNetworkPassphrase: config.stellarNetworkPassphrase, contractAddresses: config.contractAddresses, discordWebhookUrl: config.discord?.webhookUrl, + webhookSecrets: config.webhookSecrets, + apiKeys: config.apiKeys, notificationAPI, templateService, rateLimit: config.rateLimit, diff --git a/listener/src/services/event-processing-queue.ts b/listener/src/services/event-processing-queue.ts index ba44649..e83cb16 100644 --- a/listener/src/services/event-processing-queue.ts +++ b/listener/src/services/event-processing-queue.ts @@ -54,6 +54,15 @@ export class EventProcessingQueue { private timer: ReturnType | null = null; private priorityCounters: { high: number; medium: number; low: number } = { high: 0, medium: 0, low: 0 }; + // Metrics + private metrics = { + totalEnqueued: 0, + totalProcessed: 0, + totalSucceeded: 0, + totalFailed: 0, + processingTimes: [] as number[], + }; + constructor(processor: EventProcessor, options?: EventProcessingQueueOptions) { this.processor = processor; this.maxConcurrency = Math.max(1, options?.maxConcurrency ?? DEFAULTS.maxConcurrency); @@ -105,6 +114,7 @@ export class EventProcessingQueue { priority, enqueuedAt: Date.now(), }); + this.metrics.totalEnqueued++; return true; } @@ -192,13 +202,18 @@ export class EventProcessingQueue { private async processItem(item: QueuedEvent): Promise { this.activeFingerprints.add(item.fingerprint); + const startTime = Date.now(); try { const success = await this.processor(item.event, item.contractConfig, item.requestId); + const duration = Date.now() - startTime; if (success) { this.queuedFingerprints.delete(item.fingerprint); this.activeFingerprints.delete(item.fingerprint); + this.metrics.totalProcessed++; + this.metrics.totalSucceeded++; + this.metrics.processingTimes.push(duration); logger.info('Event processing succeeded', { requestId: item.requestId, eventId: item.event.id, @@ -212,6 +227,9 @@ export class EventProcessingQueue { if (attempt >= this.maxRetries) { this.queuedFingerprints.delete(item.fingerprint); this.activeFingerprints.delete(item.fingerprint); + this.metrics.totalProcessed++; + this.metrics.totalFailed++; + this.metrics.processingTimes.push(duration); logger.error('Event processing permanently failed after max retries', { requestId: item.requestId, eventId: item.event.id, @@ -237,11 +255,15 @@ export class EventProcessingQueue { this.queue.push({ ...item, retryCount: attempt, nextRetryAt }); } catch (error) { this.activeFingerprints.delete(item.fingerprint); + const duration = Date.now() - startTime; const attempt = item.retryCount + 1; if (attempt >= this.maxRetries) { this.queuedFingerprints.delete(item.fingerprint); + this.metrics.totalProcessed++; + this.metrics.totalFailed++; + this.metrics.processingTimes.push(duration); logger.error('Event processing crashed after max retries', { requestId: item.requestId, eventId: item.event.id, @@ -268,6 +290,24 @@ export class EventProcessingQueue { } } + getMetrics() { + const times = this.metrics.processingTimes; + const avg = times.length > 0 ? times.reduce((a, b) => a + b, 0) / times.length : 0; + const min = times.length > 0 ? Math.min(...times) : 0; + const max = times.length > 0 ? Math.max(...times) : 0; + + return { + queueSize: this.queue.length, + activeCount: this.activeFingerprints.size, + ...this.metrics, + processingTime: { + min, + max, + avg, + }, + }; + } + private calculateDelay(retryCount: number): number { return this.baseDelayMs * Math.pow(2, retryCount); } diff --git a/listener/src/services/event-subscriber.ts b/listener/src/services/event-subscriber.ts index 9569ace..c4d98d9 100644 --- a/listener/src/services/event-subscriber.ts +++ b/listener/src/services/event-subscriber.ts @@ -364,4 +364,11 @@ export class EventSubscriber { private delay(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } + + getQueueMetrics() { + return { + eventQueue: this.eventQueue?.getMetrics() || null, + retryQueue: this.retryQueue?.getMetrics() || null, + }; + } } diff --git a/listener/src/services/notification-retry-queue.ts b/listener/src/services/notification-retry-queue.ts index 6c66e9b..39fc817 100644 --- a/listener/src/services/notification-retry-queue.ts +++ b/listener/src/services/notification-retry-queue.ts @@ -59,6 +59,15 @@ export class NotificationRetryQueue { private readonly analytics: NotificationAnalyticsAggregator | null; private priorityCounters: { high: number; medium: number; low: number } = { high: 0, medium: 0, low: 0 }; + // Metrics + private metrics = { + totalEnqueued: 0, + totalProcessed: 0, + totalSucceeded: 0, + totalFailed: 0, + processingTimes: [] as number[], + }; + constructor(notificationFn: NotificationFn, options?: RetryQueueOptions) { this.notificationFn = notificationFn; this.baseDelayMs = options?.baseDelayMs ?? DEFAULTS.baseDelayMs; @@ -102,6 +111,8 @@ export class NotificationRetryQueue { }); this.queuedFingerprints.add(fingerprint); + this.queue.push({ event, contractConfig, retryCount: 0, nextRetryAt, requestId }); + this.metrics.totalEnqueued++; this.queue.push({ event, contractConfig, retryCount: 0, nextRetryAt, requestId, priority, enqueuedAt: Date.now() }); } @@ -184,9 +195,13 @@ export class NotificationRetryQueue { }); const success = await this.notificationFn(item.event, item.contractConfig, item.requestId); + const duration = Date.now() - retryStart; if (success) { this.queuedFingerprints.delete(fingerprint); + this.metrics.totalProcessed++; + this.metrics.totalSucceeded++; + this.metrics.processingTimes.push(duration); this.analytics?.record({ notificationType: NotificationType.DISCORD, contractAddress: item.contractConfig.address, @@ -205,11 +220,14 @@ export class NotificationRetryQueue { if (attempt >= this.maxRetries) { this.queuedFingerprints.delete(fingerprint); + this.metrics.totalProcessed++; + this.metrics.totalFailed++; + this.metrics.processingTimes.push(duration); this.analytics?.record({ notificationType: NotificationType.DISCORD, contractAddress: item.contractConfig.address, outcome: 'failure', - durationMs: Date.now() - retryStart, + durationMs: duration, errorReason: `exhausted ${this.maxRetries} retries`, timestamp: Date.now(), }); @@ -237,6 +255,23 @@ export class NotificationRetryQueue { this.queue.push({ ...item, retryCount: attempt, nextRetryAt }); } + getMetrics() { + const times = this.metrics.processingTimes; + const avg = times.length > 0 ? times.reduce((a, b) => a + b, 0) / times.length : 0; + const min = times.length > 0 ? Math.min(...times) : 0; + const max = times.length > 0 ? Math.max(...times) : 0; + + return { + queueSize: this.queue.length, + ...this.metrics, + processingTime: { + min, + max, + avg, + }, + }; + } + private calculateDelay(retryCount: number): number { const base = this.baseDelayMs * Math.pow(this.multiplier, retryCount); return this.jitter ? base * (0.5 + Math.random() * 0.5) : base; diff --git a/listener/src/types/index.ts b/listener/src/types/index.ts index 62e40a3..17783ed 100644 --- a/listener/src/types/index.ts +++ b/listener/src/types/index.ts @@ -33,6 +33,11 @@ export interface RateLimitConfig { clientOverrides: Record; } +export interface ApiKey { + key: string; + name?: string; +} + export interface Config { stellarNetwork: string; stellarRpcUrl: string; @@ -47,6 +52,7 @@ export interface Config { retryQueue?: RetryQueueConfig; eventQueue?: EventQueueConfig; webhookSecrets?: WebhookSecret[]; + apiKeys?: ApiKey[]; scheduler?: SchedulerConfig; retryScheduler?: RetrySchedulerOptions; databasePath?: string;