Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions backend/src/db/migrations/005_add_webhook_deliveries.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Migration: 005_add_webhook_deliveries (down)
-- Description: Remove webhook_deliveries table.

DROP INDEX IF EXISTS idx_webhook_deliveries_created;
DROP INDEX IF EXISTS idx_webhook_deliveries_user;
DROP INDEX IF EXISTS idx_webhook_deliveries_status;
DROP TABLE IF EXISTS webhook_deliveries;
18 changes: 18 additions & 0 deletions backend/src/db/migrations/005_add_webhook_deliveries.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Migration: 005_add_webhook_deliveries (up)
-- Description: Add webhook_deliveries table for tracking delivery attempts and enabling retries.
-- Rollback: See 005_add_webhook_deliveries.down.sql

CREATE TABLE IF NOT EXISTS webhook_deliveries (
event_id VARCHAR(64) PRIMARY KEY,
user_id VARCHAR(256) NOT NULL,
event_type VARCHAR(64) NOT NULL,
url VARCHAR(1024) NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'delivered', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_status ON webhook_deliveries(status);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_user ON webhook_deliveries(user_id);
CREATE INDEX IF NOT EXISTS idx_webhook_deliveries_created ON webhook_deliveries(created_at DESC);
90 changes: 89 additions & 1 deletion backend/src/db/notificationDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,95 @@ export function dbDeleteNotificationPreferences(userId: string): boolean {
export function dbUpdateWebhookSecret(userId: string, secret: string): void {
ensureNotificationTable()
const db = getDb()

const now = new Date().toISOString()
db.prepare('UPDATE notification_preferences SET webhook_secret = ?, updated_at = ? WHERE user_id = ?').run(secret, now, userId)
}

// ─── Webhook Delivery Tracking ─────────────────────────────────────────────

export interface WebhookDeliveryRow {
event_id: string
user_id: string
event_type: string
url: string
status: string
attempts: number
last_attempt_at: string | null
created_at: string
}

export interface WebhookDeliveryRecord {
eventId: string
userId: string
eventType: string
url: string
status: 'pending' | 'delivered' | 'failed'
attempts: number
}

function ensureWebhookDeliveriesTable() {
const db = getDb()
db.exec(`
CREATE TABLE IF NOT EXISTS webhook_deliveries (
event_id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
event_type TEXT NOT NULL,
url TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
last_attempt_at TEXT,
created_at TEXT NOT NULL
);
`)
}

export function dbRecordWebhookDelivery(record: WebhookDeliveryRecord): void {
ensureWebhookDeliveriesTable()
const db = getDb()
const now = new Date().toISOString()

db.prepare(`
INSERT OR REPLACE INTO webhook_deliveries
(event_id, user_id, event_type, url, status, attempts, last_attempt_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`).run(
record.eventId,
record.userId,
record.eventType,
record.url,
record.status,
record.attempts,
now,
now
)
}

export function dbUpdateWebhookDeliveryStatus(
eventId: string,
status: 'pending' | 'delivered' | 'failed',
attempts?: number
): void {
ensureWebhookDeliveriesTable()
const db = getDb()
const now = new Date().toISOString()

if (attempts !== undefined) {
db.prepare(
'UPDATE webhook_deliveries SET status = ?, attempts = ?, last_attempt_at = ? WHERE event_id = ?'
).run(status, attempts, now, eventId)
} else {
db.prepare(
'UPDATE webhook_deliveries SET status = ?, last_attempt_at = ? WHERE event_id = ?'
).run(status, now, eventId)
}
}

export function dbGetPendingWebhookDeliveries(): WebhookDeliveryRow[] {
ensureWebhookDeliveriesTable()
const db = getDb()

return db.prepare<[], WebhookDeliveryRow>(
"SELECT * FROM webhook_deliveries WHERE status = 'pending' ORDER BY created_at ASC"
).all()
}
5 changes: 4 additions & 1 deletion backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { startQueueScheduler } from './queue/scheduler.js'
import { startPortfolioCheckWorker, stopPortfolioCheckWorker } from './queue/workers/portfolioCheckWorker.js'
import { startRebalanceWorker, stopRebalanceWorker } from './queue/workers/rebalanceWorker.js'
import { startAnalyticsSnapshotWorker, stopAnalyticsSnapshotWorker } from './queue/workers/analyticsSnapshotWorker.js'
import { startWebhookDeliveryWorker, stopWebhookDeliveryWorker } from './queue/workers/webhookDeliveryWorker.js'
import { contractEventIndexerService } from './services/contractEventIndexer.js'

let startupConfig: StartupConfig
Expand Down Expand Up @@ -248,10 +249,11 @@ server.listen(port, async () => {
logQueueStartup(redisAvailable)

if (redisAvailable) {
// Start all three workers
// Start all workers
startPortfolioCheckWorker()
startRebalanceWorker()
startAnalyticsSnapshotWorker()
startWebhookDeliveryWorker()

// Register repeatable jobs (scheduler)
try {
Expand Down Expand Up @@ -324,6 +326,7 @@ const gracefulShutdown = async (signal: string) => {
stopPortfolioCheckWorker(),
stopRebalanceWorker(),
stopAnalyticsSnapshotWorker(),
stopWebhookDeliveryWorker(),
])
console.log('[SHUTDOWN] BullMQ workers stopped')
} catch (error) {
Expand Down
35 changes: 35 additions & 0 deletions backend/src/queue/queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const QUEUE_NAMES = {
PORTFOLIO_CHECK: 'portfolio-check',
REBALANCE: 'rebalance',
ANALYTICS_SNAPSHOT: 'analytics-snapshot',
WEBHOOK_DELIVERY: 'webhook-delivery',
} as const

// ─── Job Data Types ───────────────────────────────────────────────────────────
Expand All @@ -25,11 +26,20 @@ export interface AnalyticsSnapshotJobData {
triggeredBy?: 'scheduler' | 'manual' | 'startup'
}

export interface WebhookDeliveryJobData {
eventId: string
url: string
payload: any
webhookSecret?: string
attempt: number
}

// ─── Singleton Queues ─────────────────────────────────────────────────────────

let portfolioCheckQueue: Queue<PortfolioCheckJobData> | null = null
let rebalanceQueue: Queue<RebalanceJobData> | null = null
let analyticsSnapshotQueue: Queue<AnalyticsSnapshotJobData> | null = null
let webhookDeliveryQueue: Queue<WebhookDeliveryJobData> | null = null

function getDefaultJobOptions() {
return {
Expand Down Expand Up @@ -88,16 +98,41 @@ export function getAnalyticsSnapshotQueue(): Queue<AnalyticsSnapshotJobData> | n
}
}

export function getWebhookDeliveryQueue(): Queue<WebhookDeliveryJobData> | null {
try {
if (!webhookDeliveryQueue) {
webhookDeliveryQueue = new Queue(QUEUE_NAMES.WEBHOOK_DELIVERY, {
connection: getConnectionOptions(),
defaultJobOptions: {
removeOnComplete: { count: 50 },
removeOnFail: { count: 100 },
attempts: 4,
backoff: {
type: 'exponential' as const,
delay: 10000,
},
},
})
logger.info(`[QUEUE] Created queue: ${QUEUE_NAMES.WEBHOOK_DELIVERY}`)
}
return webhookDeliveryQueue
} catch {
return null
}
}

// ─── Graceful Close ───────────────────────────────────────────────────────────

export async function closeAllQueues(): Promise<void> {
await Promise.all([
portfolioCheckQueue?.close(),
rebalanceQueue?.close(),
analyticsSnapshotQueue?.close(),
webhookDeliveryQueue?.close(),
])
portfolioCheckQueue = null
rebalanceQueue = null
analyticsSnapshotQueue = null
webhookDeliveryQueue = null
logger.info('[QUEUE] All queues closed')
}
122 changes: 122 additions & 0 deletions backend/src/queue/workers/webhookDeliveryWorker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { Worker, Job } from 'bullmq'
import { getConnectionOptions } from '../connection.js'
import { dbUpdateWebhookDeliveryStatus } from '../../db/notificationDb.js'
import { logger } from '../../utils/logger.js'
import type { WebhookDeliveryJobData } from '../queues.js'

let worker: Worker | null = null

const TIMEOUT_MS = 5000

export async function processWebhookDeliveryJob(
job: Job<WebhookDeliveryJobData>
): Promise<void> {
const { eventId, url, payload, webhookSecret, attempt } = job.data

logger.info('[WORKER:webhook-delivery] Attempting webhook delivery', {
jobId: job.id,
eventId,
attempt: attempt + 1,
url,
})

try {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), TIMEOUT_MS)

const headers: Record<string, string> = {
'Content-Type': 'application/json',
'User-Agent': 'StellarPortfolioRebalancer/1.0',
'X-SentientFi-Event-Id': eventId,
}

if (webhookSecret) {
const { createHmac } = await import('crypto')
const timestamp = Math.floor(Date.now() / 1000).toString()
const payloadString = JSON.stringify(payload)
const signatureInput = `${timestamp}.${eventId}.${payloadString}`
const signature = createHmac('sha256', webhookSecret)
.update(signatureInput)
.digest('hex')
headers['X-Webhook-Signature'] = `sha256=${signature}`
headers['X-Webhook-Timestamp'] = timestamp
}

const response = await fetch(url, {
method: 'POST',
headers,
body: JSON.stringify(payload),
signal: controller.signal,
})

clearTimeout(timeoutId)

if (!response.ok) {
throw new Error(`Webhook returned status ${response.status}`)
}

dbUpdateWebhookDeliveryStatus(eventId, 'delivered', attempt + 1)

logger.info('[WORKER:webhook-delivery] Webhook delivered successfully', {
eventId,
attempt: attempt + 1,
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)

dbUpdateWebhookDeliveryStatus(eventId, 'failed', attempt + 1)

logger.error('[WORKER:webhook-delivery] Webhook delivery failed', {
eventId,
attempt: attempt + 1,
error: errorMessage,
})

// BullMQ will handle retry based on job options
throw error
}
}

export function startWebhookDeliveryWorker(): Worker | null {
try {
worker = new Worker('webhook-delivery', processWebhookDeliveryJob, {
connection: getConnectionOptions(),
concurrency: 3,
limiter: {
max: 10,
duration: 60000,
},
})

worker.on('completed', (job) => {
logger.info('[WORKER:webhook-delivery] Job completed', {
jobId: job.id,
eventId: job.data.eventId,
})
})

worker.on('failed', (job, err) => {
logger.error('[WORKER:webhook-delivery] Job failed', {
jobId: job?.id,
eventId: job?.data.eventId,
error: err.message,
})
})

logger.info('[WORKER:webhook-delivery] Worker started')
return worker
} catch (error) {
logger.error('[WORKER:webhook-delivery] Failed to start worker', {
error: error instanceof Error ? error.message : String(error),
})
return null
}
}

export async function stopWebhookDeliveryWorker(): Promise<void> {
if (worker) {
await worker.close()
worker = null
logger.info('[WORKER:webhook-delivery] Worker stopped')
}
}
Loading
Loading