From cc71999ea6a6cc8b5b38eb59597b531a44a704f1 Mon Sep 17 00:00:00 2001 From: Bright CLI Date: Mon, 29 Jun 2026 11:17:44 +0100 Subject: [PATCH 1/3] feat: add webhook delivery persistence, idempotency keys, and retry queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Webhook notifications were delivered fire-and-forget with no persistence, no retry, and no idempotency key. If the consumer endpoint timed out, returned 5xx, or the backend restarted mid-delivery, the event was permanently lost. Changes: - Add eventId (UUID v4) to NotificationPayload for consumer-side deduplication and included it in HMAC signature input - Add webhook_deliveries table to track delivery attempts with status - Persist delivery attempts before and after sending - Add BullMQ webhook-delivery queue with exponential backoff retries (4 attempts: 10s → 20s → 40s → 80s) - Create webhookDeliveryWorker with rate limiting (10/min) - Include X-SentientFi-Event-Id header in webhook requests Fixes #30 --- .../005_add_webhook_deliveries.down.sql | 7 + .../005_add_webhook_deliveries.up.sql | 18 +++ backend/src/db/notificationDb.ts | 90 ++++++++++++- backend/src/index.ts | 5 +- backend/src/queue/queues.ts | 35 +++++ .../queue/workers/webhookDeliveryWorker.ts | 122 ++++++++++++++++++ backend/src/services/notificationService.ts | 51 +++++++- 7 files changed, 323 insertions(+), 5 deletions(-) create mode 100644 backend/src/db/migrations/005_add_webhook_deliveries.down.sql create mode 100644 backend/src/db/migrations/005_add_webhook_deliveries.up.sql create mode 100644 backend/src/queue/workers/webhookDeliveryWorker.ts diff --git a/backend/src/db/migrations/005_add_webhook_deliveries.down.sql b/backend/src/db/migrations/005_add_webhook_deliveries.down.sql new file mode 100644 index 0000000..6325c6e --- /dev/null +++ b/backend/src/db/migrations/005_add_webhook_deliveries.down.sql @@ -0,0 +1,7 @@ +-- Migration: 005_add_webhook_deliveries (down) +-- Description: Remove webhook_deliveries table. + +DROP INDEX IF EXISTS idx_webhook_deliveries_created; +DROP INDEX IF EXISTS idx_webhook_deliveries_user; +DROP INDEX IF EXISTS idx_webhook_deliveries_status; +DROP TABLE IF EXISTS webhook_deliveries; diff --git a/backend/src/db/migrations/005_add_webhook_deliveries.up.sql b/backend/src/db/migrations/005_add_webhook_deliveries.up.sql new file mode 100644 index 0000000..0cb5ed7 --- /dev/null +++ b/backend/src/db/migrations/005_add_webhook_deliveries.up.sql @@ -0,0 +1,18 @@ +-- Migration: 005_add_webhook_deliveries (up) +-- Description: Add webhook_deliveries table for tracking delivery attempts and enabling retries. +-- Rollback: See 005_add_webhook_deliveries.down.sql + +CREATE TABLE IF NOT EXISTS webhook_deliveries ( + event_id VARCHAR(64) PRIMARY KEY, + user_id VARCHAR(256) NOT NULL, + event_type VARCHAR(64) NOT NULL, + url VARCHAR(1024) NOT NULL, + status VARCHAR(32) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'delivered', 'failed')), + attempts INTEGER NOT NULL DEFAULT 0, + last_attempt_at TIMESTAMPTZ, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status ON webhook_deliveries(status); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_user ON webhook_deliveries(user_id); +CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_created ON webhook_deliveries(created_at DESC); diff --git a/backend/src/db/notificationDb.ts b/backend/src/db/notificationDb.ts index 5954944..ce86e0f 100644 --- a/backend/src/db/notificationDb.ts +++ b/backend/src/db/notificationDb.ts @@ -150,7 +150,95 @@ export function dbDeleteNotificationPreferences(userId: string): boolean { export function dbUpdateWebhookSecret(userId: string, secret: string): void { ensureNotificationTable() const db = getDb() - + const now = new Date().toISOString() db.prepare('UPDATE notification_preferences SET webhook_secret = ?, updated_at = ? WHERE user_id = ?').run(secret, now, userId) } + +// ─── Webhook Delivery Tracking ───────────────────────────────────────────── + +export interface WebhookDeliveryRow { + event_id: string + user_id: string + event_type: string + url: string + status: string + attempts: number + last_attempt_at: string | null + created_at: string +} + +export interface WebhookDeliveryRecord { + eventId: string + userId: string + eventType: string + url: string + status: 'pending' | 'delivered' | 'failed' + attempts: number +} + +function ensureWebhookDeliveriesTable() { + const db = getDb() + db.exec(` + CREATE TABLE IF NOT EXISTS webhook_deliveries ( + event_id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + event_type TEXT NOT NULL, + url TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + last_attempt_at TEXT, + created_at TEXT NOT NULL + ); + `) +} + +export function dbRecordWebhookDelivery(record: WebhookDeliveryRecord): void { + ensureWebhookDeliveriesTable() + const db = getDb() + const now = new Date().toISOString() + + db.prepare(` + INSERT OR REPLACE INTO webhook_deliveries + (event_id, user_id, event_type, url, status, attempts, last_attempt_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `).run( + record.eventId, + record.userId, + record.eventType, + record.url, + record.status, + record.attempts, + now, + now + ) +} + +export function dbUpdateWebhookDeliveryStatus( + eventId: string, + status: 'pending' | 'delivered' | 'failed', + attempts?: number +): void { + ensureWebhookDeliveriesTable() + const db = getDb() + const now = new Date().toISOString() + + if (attempts !== undefined) { + db.prepare( + 'UPDATE webhook_deliveries SET status = ?, attempts = ?, last_attempt_at = ? WHERE event_id = ?' + ).run(status, attempts, now, eventId) + } else { + db.prepare( + 'UPDATE webhook_deliveries SET status = ?, last_attempt_at = ? WHERE event_id = ?' + ).run(status, now, eventId) + } +} + +export function dbGetPendingWebhookDeliveries(): WebhookDeliveryRow[] { + ensureWebhookDeliveriesTable() + const db = getDb() + + return db.prepare<[], WebhookDeliveryRow>( + "SELECT * FROM webhook_deliveries WHERE status = 'pending' ORDER BY created_at ASC" + ).all() +} diff --git a/backend/src/index.ts b/backend/src/index.ts index aac0d31..4f3d071 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -19,6 +19,7 @@ import { startQueueScheduler } from './queue/scheduler.js' import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js' import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js' import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js' +import { startWebhookDeliveryWorker, stopWebhookDeliveryWorker } from './queue/workers/webhookDeliveryWorker.js' import { contractEventIndexerService } from './services/contractEventIndexer.js' let startupConfig: StartupConfig @@ -248,10 +249,11 @@ server.listen(port, async () => { logQueueStartup(redisAvailable) if (redisAvailable) { - // Start all three workers + // Start all workers startPortfolioCheckWorker() startRebalanceWorker() startAnalyticsSnapshotWorker() + startWebhookDeliveryWorker() // Register repeatable jobs (scheduler) try { @@ -324,6 +326,7 @@ const gracefulShutdown = async (signal: string) => { stopPortfolioCheckWorker(), stopRebalanceWorker(), stopAnalyticsSnapshotWorker(), + stopWebhookDeliveryWorker(), ]) console.log('[SHUTDOWN] BullMQ workers stopped') } catch (error) { diff --git a/backend/src/queue/queues.ts b/backend/src/queue/queues.ts index c6cb9fc..45a1350 100644 --- a/backend/src/queue/queues.ts +++ b/backend/src/queue/queues.ts @@ -8,6 +8,7 @@ export const QUEUE_NAMES = { PORTFOLIO_CHECK: 'portfolio-check', REBALANCE: 'rebalance', ANALYTICS_SNAPSHOT: 'analytics-snapshot', + WEBHOOK_DELIVERY: 'webhook-delivery', } as const // ─── Job Data Types ─────────────────────────────────────────────────────────── @@ -25,11 +26,20 @@ export interface AnalyticsSnapshotJobData { triggeredBy?: 'scheduler' | 'manual' | 'startup' } +export interface WebhookDeliveryJobData { + eventId: string + url: string + payload: any + webhookSecret?: string + attempt: number +} + // ─── Singleton Queues ───────────────────────────────────────────────────────── let portfolioCheckQueue: Queue | null = null let rebalanceQueue: Queue | null = null let analyticsSnapshotQueue: Queue | null = null +let webhookDeliveryQueue: Queue | null = null function getDefaultJobOptions() { return { @@ -88,6 +98,29 @@ export function getAnalyticsSnapshotQueue(): Queue | n } } +export function getWebhookDeliveryQueue(): Queue | null { + try { + if (!webhookDeliveryQueue) { + webhookDeliveryQueue = new Queue(QUEUE_NAMES.WEBHOOK_DELIVERY, { + connection: getConnectionOptions(), + defaultJobOptions: { + removeOnComplete: { count: 50 }, + removeOnFail: { count: 100 }, + attempts: 4, + backoff: { + type: 'exponential' as const, + delay: 10000, + }, + }, + }) + logger.info(`[QUEUE] Created queue: ${QUEUE_NAMES.WEBHOOK_DELIVERY}`) + } + return webhookDeliveryQueue + } catch { + return null + } +} + // ─── Graceful Close ─────────────────────────────────────────────────────────── export async function closeAllQueues(): Promise { @@ -95,9 +128,11 @@ export async function closeAllQueues(): Promise { portfolioCheckQueue?.close(), rebalanceQueue?.close(), analyticsSnapshotQueue?.close(), + webhookDeliveryQueue?.close(), ]) portfolioCheckQueue = null rebalanceQueue = null analyticsSnapshotQueue = null + webhookDeliveryQueue = null logger.info('[QUEUE] All queues closed') } diff --git a/backend/src/queue/workers/webhookDeliveryWorker.ts b/backend/src/queue/workers/webhookDeliveryWorker.ts new file mode 100644 index 0000000..a75effc --- /dev/null +++ b/backend/src/queue/workers/webhookDeliveryWorker.ts @@ -0,0 +1,122 @@ +import { Worker, Job } from 'bullmq' +import { getConnectionOptions } from '../connection.js' +import { dbUpdateWebhookDeliveryStatus } from '../../db/notificationDb.js' +import { logger } from '../../utils/logger.js' +import type { WebhookDeliveryJobData } from '../queues.js' + +let worker: Worker | null = null + +const TIMEOUT_MS = 5000 + +export async function processWebhookDeliveryJob( + job: Job +): Promise { + const { eventId, url, payload, webhookSecret, attempt } = job.data + + logger.info('[WORKER:webhook-delivery] Attempting webhook delivery', { + jobId: job.id, + eventId, + attempt: attempt + 1, + url, + }) + + try { + const controller = new AbortController() + const timeoutId = setTimeout(() => controller.abort(), TIMEOUT_MS) + + const headers: Record = { + 'Content-Type': 'application/json', + 'User-Agent': 'StellarPortfolioRebalancer/1.0', + 'X-SentientFi-Event-Id': eventId, + } + + if (webhookSecret) { + const { createHmac } = await import('crypto') + const timestamp = Math.floor(Date.now() / 1000).toString() + const payloadString = JSON.stringify(payload) + const signatureInput = `${timestamp}.${eventId}.${payloadString}` + const signature = createHmac('sha256', webhookSecret) + .update(signatureInput) + .digest('hex') + headers['X-Webhook-Signature'] = `sha256=${signature}` + headers['X-Webhook-Timestamp'] = timestamp + } + + const response = await fetch(url, { + method: 'POST', + headers, + body: JSON.stringify(payload), + signal: controller.signal, + }) + + clearTimeout(timeoutId) + + if (!response.ok) { + throw new Error(`Webhook returned status ${response.status}`) + } + + dbUpdateWebhookDeliveryStatus(eventId, 'delivered', attempt + 1) + + logger.info('[WORKER:webhook-delivery] Webhook delivered successfully', { + eventId, + attempt: attempt + 1, + }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error) + + dbUpdateWebhookDeliveryStatus(eventId, 'failed', attempt + 1) + + logger.error('[WORKER:webhook-delivery] Webhook delivery failed', { + eventId, + attempt: attempt + 1, + error: errorMessage, + }) + + // BullMQ will handle retry based on job options + throw error + } +} + +export function startWebhookDeliveryWorker(): Worker | null { + try { + worker = new Worker('webhook-delivery', processWebhookDeliveryJob, { + connection: getConnectionOptions(), + concurrency: 3, + limiter: { + max: 10, + duration: 60000, + }, + }) + + worker.on('completed', (job) => { + logger.info('[WORKER:webhook-delivery] Job completed', { + jobId: job.id, + eventId: job.data.eventId, + }) + }) + + worker.on('failed', (job, err) => { + logger.error('[WORKER:webhook-delivery] Job failed', { + jobId: job?.id, + eventId: job?.data.eventId, + error: err.message, + }) + }) + + logger.info('[WORKER:webhook-delivery] Worker started') + return worker + } catch (error) { + logger.error('[WORKER:webhook-delivery] Failed to start worker', { + error: error instanceof Error ? error.message : String(error), + }) + return null + } +} + +export async function stopWebhookDeliveryWorker(): Promise { + if (worker) { + await worker.close() + worker = null + logger.info('[WORKER:webhook-delivery] Worker stopped') + } +} diff --git a/backend/src/services/notificationService.ts b/backend/src/services/notificationService.ts index a2de156..478d238 100644 --- a/backend/src/services/notificationService.ts +++ b/backend/src/services/notificationService.ts @@ -1,11 +1,16 @@ +import { randomUUID } from "crypto"; import { logger } from "../utils/logger.js"; import { dbSaveNotificationPreferences, dbGetNotificationPreferences, dbGetAllNotificationPreferences, dbUpdateWebhookSecret, + dbRecordWebhookDelivery, + dbUpdateWebhookDeliveryStatus, + dbGetPendingWebhookDeliveries, type NotificationPreferences, } from "../db/notificationDb.js"; +import { getWebhookDeliveryQueue } from "../queue/queues.js"; import nodemailer from "nodemailer"; import { createHmac, timingSafeEqual, randomBytes } from "crypto"; @@ -14,6 +19,7 @@ import { createHmac, timingSafeEqual, randomBytes } from "crypto"; // ───────────────────────────────────────────── export interface NotificationPayload { + eventId: string; userId: string; eventType: "rebalance" | "circuitBreaker" | "priceMovement" | "riskChange"; title: string; @@ -40,7 +46,7 @@ interface NotificationProvider { function signPayload(payload: any, secret: string): { signature: string; timestamp: string } { const timestamp = Math.floor(Date.now() / 1000).toString(); const payloadString = JSON.stringify(payload); - const signatureInput = `${timestamp}.${payloadString}`; + const signatureInput = `${timestamp}.${payload.eventId}.${payloadString}`; const signature = createHmac('sha256', secret) .update(signatureInput) .digest('hex'); @@ -62,7 +68,7 @@ function verifyWebhookSignature( // Compute expected signature const payloadString = JSON.stringify(payload); - const signatureInput = `${timestamp}.${payloadString}`; + const signatureInput = `${timestamp}.${payload.eventId}.${payloadString}`; const expectedSignature = createHmac('sha256', secret) .update(signatureInput) .digest('hex'); @@ -94,6 +100,7 @@ class WebhookProvider implements NotificationProvider { } const webhookPayload = { + eventId: payload.eventId, event: payload.eventType, title: payload.title, message: payload.message, @@ -102,7 +109,40 @@ class WebhookProvider implements NotificationProvider { userId: payload.userId, }; - await this.sendWithRetry(preferences.webhookUrl, webhookPayload, 0, preferences.webhookSecret); + // Persist delivery attempt before sending + dbRecordWebhookDelivery({ + eventId: payload.eventId, + userId: payload.userId, + eventType: payload.eventType, + url: preferences.webhookUrl, + status: 'pending', + attempts: 0, + }); + + try { + await this.sendWithRetry(preferences.webhookUrl, webhookPayload, 0, preferences.webhookSecret); + dbUpdateWebhookDeliveryStatus(payload.eventId, 'delivered'); + } catch (error) { + dbUpdateWebhookDeliveryStatus(payload.eventId, 'failed'); + // Enqueue for async retry via BullMQ + const queue = getWebhookDeliveryQueue(); + if (queue) { + await queue.add('webhook-retry', { + eventId: payload.eventId, + url: preferences.webhookUrl, + payload: webhookPayload, + webhookSecret: preferences.webhookSecret, + attempt: 0, + }, { + attempts: 4, + backoff: { type: 'exponential', delay: 10000 }, + }); + logger.info("Webhook delivery enqueued for retry", { + eventId: payload.eventId, + }); + } + throw error; + } } private async sendWithRetry( @@ -393,6 +433,11 @@ export class NotificationService { * Send notification to user */ async notify(payload: NotificationPayload): Promise { + // Ensure every notification has a stable event ID for deduplication + if (!payload.eventId) { + payload.eventId = randomUUID(); + } + const preferences = this.getPreferences(payload.userId); if (!preferences) { logger.info("No notification preferences found for user", { From 432def333e2252fe73cf6a2a00b4485cea967614 Mon Sep 17 00:00:00 2001 From: Bright CLI Date: Mon, 29 Jun 2026 15:05:39 +0100 Subject: [PATCH 2/3] fix: make eventId optional in NotificationPayload type The notify() method already auto-generates eventId via randomUUID() when not provided. Making it optional in the type definition allows existing callers (like rebalanceWorker) to work without changes. --- backend/src/services/notificationService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/services/notificationService.ts b/backend/src/services/notificationService.ts index 478d238..233126c 100644 --- a/backend/src/services/notificationService.ts +++ b/backend/src/services/notificationService.ts @@ -19,7 +19,7 @@ import { createHmac, timingSafeEqual, randomBytes } from "crypto"; // ───────────────────────────────────────────── export interface NotificationPayload { - eventId: string; + eventId?: string; userId: string; eventType: "rebalance" | "circuitBreaker" | "priceMovement" | "riskChange"; title: string; From 59ec8a2d7acc97ae38fff49dda62b7785f344c1a Mon Sep 17 00:00:00 2001 From: Bright CLI Date: Mon, 29 Jun 2026 15:12:39 +0100 Subject: [PATCH 3/3] fix: use non-null assertion for eventId in WebhookProvider eventId is guaranteed set by notify() before WebhookProvider.send() is called. Use non-null assertion to satisfy TypeScript while keeping eventId optional in the public interface. --- backend/src/services/notificationService.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/backend/src/services/notificationService.ts b/backend/src/services/notificationService.ts index 233126c..d0ee381 100644 --- a/backend/src/services/notificationService.ts +++ b/backend/src/services/notificationService.ts @@ -110,8 +110,10 @@ class WebhookProvider implements NotificationProvider { }; // Persist delivery attempt before sending + // eventId is guaranteed set by notify() before this method is called + const eventId = payload.eventId!; dbRecordWebhookDelivery({ - eventId: payload.eventId, + eventId, userId: payload.userId, eventType: payload.eventType, url: preferences.webhookUrl, @@ -121,14 +123,14 @@ class WebhookProvider implements NotificationProvider { try { await this.sendWithRetry(preferences.webhookUrl, webhookPayload, 0, preferences.webhookSecret); - dbUpdateWebhookDeliveryStatus(payload.eventId, 'delivered'); + dbUpdateWebhookDeliveryStatus(eventId, 'delivered'); } catch (error) { - dbUpdateWebhookDeliveryStatus(payload.eventId, 'failed'); + dbUpdateWebhookDeliveryStatus(eventId, 'failed'); // Enqueue for async retry via BullMQ const queue = getWebhookDeliveryQueue(); if (queue) { await queue.add('webhook-retry', { - eventId: payload.eventId, + eventId, url: preferences.webhookUrl, payload: webhookPayload, webhookSecret: preferences.webhookSecret,