From 281376d0633f8b0d644258113b41c3cc82cea415 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:50:36 -0600 Subject: [PATCH 1/8] feat: add webhook event registry and HMAC signature helper Introduce the foundational primitives the webhook subsystem will build on: the catalogue of pool/price event types (with wildcard support) and a timing-safe HMAC-SHA256 sign/verify helper plus secret generator. --- src/services/webhookEvents.js | 41 +++++++++++++++++++++++++ src/services/webhookSignature.js | 36 ++++++++++++++++++++++ test/webhookEvents.test.js | 38 +++++++++++++++++++++++ test/webhookSignature.test.js | 52 ++++++++++++++++++++++++++++++++ 4 files changed, 167 insertions(+) create mode 100644 src/services/webhookEvents.js create mode 100644 src/services/webhookSignature.js create mode 100644 test/webhookEvents.test.js create mode 100644 test/webhookSignature.test.js diff --git a/src/services/webhookEvents.js b/src/services/webhookEvents.js new file mode 100644 index 0000000..7b05280 --- /dev/null +++ b/src/services/webhookEvents.js @@ -0,0 +1,41 @@ +'use strict'; + +const POOL_EVENTS = Object.freeze([ + 'pool.created', + 'pool.assets_locked', + 'pool.assets_unlocked', + 'pool.rewards_distributed', + 'pool.closed', +]); + +const PRICE_EVENTS = Object.freeze(['price.alert']); + +const ALL_EVENTS = Object.freeze([...POOL_EVENTS, ...PRICE_EVENTS]); +const EVENT_SET = new Set(ALL_EVENTS); + +const WILDCARD = '*'; + +function isKnownEvent(eventType) { + return typeof eventType === 'string' && EVENT_SET.has(eventType); +} + +function isValidSubscription(events) { + if (!Array.isArray(events) || events.length === 0) return false; + return events.every((e) => e === WILDCARD || EVENT_SET.has(e)); +} + +function matchesSubscription(subscribedEvents, eventType) { + if (!Array.isArray(subscribedEvents) || subscribedEvents.length === 0) return false; + if (subscribedEvents.includes(WILDCARD)) return true; + return subscribedEvents.includes(eventType); +} + +module.exports = { + POOL_EVENTS, + PRICE_EVENTS, + ALL_EVENTS, + WILDCARD, + isKnownEvent, + isValidSubscription, + matchesSubscription, +}; diff --git a/src/services/webhookSignature.js b/src/services/webhookSignature.js new file mode 100644 index 0000000..867ab8c --- /dev/null +++ b/src/services/webhookSignature.js @@ -0,0 +1,36 @@ +'use strict'; + +const crypto = require('crypto'); + +const SIGNATURE_PREFIX = 'sha256='; + +function sign(secret, body) { + if (typeof secret !== 'string' || secret.length === 0) { + throw new Error('signature secret must be a non-empty string'); + } + const payload = typeof body === 'string' ? body : JSON.stringify(body); + const digest = crypto.createHmac('sha256', secret).update(payload).digest('hex'); + return `${SIGNATURE_PREFIX}${digest}`; +} + +function verify(secret, body, providedSignature) { + if (typeof providedSignature !== 'string' || !providedSignature.startsWith(SIGNATURE_PREFIX)) { + return false; + } + let expected; + try { + expected = sign(secret, body); + } catch { + return false; + } + const a = Buffer.from(expected); + const b = Buffer.from(providedSignature); + if (a.length !== b.length) return false; + return crypto.timingSafeEqual(a, b); +} + +function generateSecret(bytes = 32) { + return `whsec_${crypto.randomBytes(bytes).toString('hex')}`; +} + +module.exports = { sign, verify, generateSecret, SIGNATURE_PREFIX }; diff --git a/test/webhookEvents.test.js b/test/webhookEvents.test.js new file mode 100644 index 0000000..c174077 --- /dev/null +++ b/test/webhookEvents.test.js @@ -0,0 +1,38 @@ +'use strict'; + +const events = require('../src/services/webhookEvents'); + +describe('webhook event registry', () => { + test('pool.assets_locked is a known event', () => { + expect(events.isKnownEvent('pool.assets_locked')).toBe(true); + }); + + test('unknown event types are rejected', () => { + expect(events.isKnownEvent('something.random')).toBe(false); + }); + + test('isValidSubscription accepts a non-empty array of known events', () => { + expect(events.isValidSubscription(['pool.assets_locked'])).toBe(true); + expect(events.isValidSubscription(['pool.assets_locked', 'pool.closed'])).toBe(true); + }); + + test('isValidSubscription accepts wildcard', () => { + expect(events.isValidSubscription(['*'])).toBe(true); + }); + + test('isValidSubscription rejects empty arrays and bad inputs', () => { + expect(events.isValidSubscription([])).toBe(false); + expect(events.isValidSubscription(null)).toBe(false); + expect(events.isValidSubscription(['nope'])).toBe(false); + }); + + test('matchesSubscription exact match', () => { + expect(events.matchesSubscription(['pool.assets_locked'], 'pool.assets_locked')).toBe(true); + expect(events.matchesSubscription(['pool.closed'], 'pool.assets_locked')).toBe(false); + }); + + test('matchesSubscription wildcard subscribes to all', () => { + expect(events.matchesSubscription(['*'], 'pool.assets_locked')).toBe(true); + expect(events.matchesSubscription(['*'], 'price.alert')).toBe(true); + }); +}); diff --git a/test/webhookSignature.test.js b/test/webhookSignature.test.js new file mode 100644 index 0000000..4008bb8 --- /dev/null +++ b/test/webhookSignature.test.js @@ -0,0 +1,52 @@ +'use strict'; + +const signature = require('../src/services/webhookSignature'); + +describe('webhook signature', () => { + const secret = 'whsec_test_supersecret_value'; + const body = JSON.stringify({ event: 'pool.assets_locked', amount: 42 }); + + test('sign produces a sha256= prefixed hex string', () => { + const sig = signature.sign(secret, body); + expect(sig).toMatch(/^sha256=[0-9a-f]{64}$/); + }); + + test('verify returns true for matching body and signature', () => { + const sig = signature.sign(secret, body); + expect(signature.verify(secret, body, sig)).toBe(true); + }); + + test('verify returns false when body is tampered', () => { + const sig = signature.sign(secret, body); + const tampered = body.replace('42', '43'); + expect(signature.verify(secret, tampered, sig)).toBe(false); + }); + + test('verify returns false when signature is tampered', () => { + const sig = signature.sign(secret, body); + const tampered = sig.replace(/.$/, sig.endsWith('a') ? 'b' : 'a'); + expect(signature.verify(secret, body, tampered)).toBe(false); + }); + + test('verify returns false when signature lacks the prefix', () => { + const sig = signature.sign(secret, body).replace('sha256=', ''); + expect(signature.verify(secret, body, sig)).toBe(false); + }); + + test('verify returns false for empty/wrong secret', () => { + const sig = signature.sign(secret, body); + expect(signature.verify('other_secret_value', body, sig)).toBe(false); + }); + + test('generateSecret produces a whsec_-prefixed token', () => { + const s = signature.generateSecret(); + expect(s).toMatch(/^whsec_[0-9a-f]{64}$/); + }); + + test('sign accepts objects by stringifying them', () => { + const obj = { a: 1, b: 'two' }; + const sigFromObj = signature.sign(secret, obj); + const sigFromStr = signature.sign(secret, JSON.stringify(obj)); + expect(sigFromObj).toBe(sigFromStr); + }); +}); From 39a12b4f8afeae7b6cf23304d1f1771debedb67a Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:50:47 -0600 Subject: [PATCH 2/8] feat: add webhook and delivery repositories Model the future PostgreSQL schema for webhooks and webhook_deliveries behind a repository abstraction backed by Redis today. The SQL DDL is documented inline so migrating to Postgres only requires swapping the repository implementation. Includes a reusable in-memory cache mock so the existing Jest pattern keeps working across the new test files. --- src/repositories/deliveryRepository.js | 114 +++++++++++++++++++++++++ src/repositories/webhookRepository.js | 114 +++++++++++++++++++++++++ test/helpers/cacheMock.js | 99 +++++++++++++++++++++ test/webhookRepository.test.js | 78 +++++++++++++++++ 4 files changed, 405 insertions(+) create mode 100644 src/repositories/deliveryRepository.js create mode 100644 src/repositories/webhookRepository.js create mode 100644 test/helpers/cacheMock.js create mode 100644 test/webhookRepository.test.js diff --git a/src/repositories/deliveryRepository.js b/src/repositories/deliveryRepository.js new file mode 100644 index 0000000..e897f40 --- /dev/null +++ b/src/repositories/deliveryRepository.js @@ -0,0 +1,114 @@ +'use strict'; + +/** + * Webhook delivery log repository. + * + * Schema mirrors the future PostgreSQL `webhook_deliveries` table: + * + * webhook_deliveries ( + * id text primary key, + * webhook_id text not null references webhooks(id) on delete cascade, + * event_id text not null, + * event_type text not null, + * status text not null, -- pending | success | failed + * attempts int not null default 0, + * last_error text, + * last_attempt_at timestamptz, + * next_retry_at timestamptz, + * response_status int, + * created_at timestamptz not null default now() + * ) + * + * Indexes that would back the queries below: + * (webhook_id, created_at desc) - listing recent deliveries per webhook + * (next_retry_at) - retry worker scan + */ + +const crypto = require('crypto'); +const cache = require('../services/cache'); + +const RETRY_QUEUE_KEY = 'webhooks:retries'; +const RECENT_DELIVERIES_LIMIT = 100; + +function key(id) { + return `webhook_delivery:${id}`; +} + +function indexKey(webhookId) { + return `webhook:${webhookId}:deliveries`; +} + +function generateId() { + return `dlv_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`; +} + +async function create({ webhook_id, event_id, event_type }) { + const id = generateId(); + const now = new Date().toISOString(); + const record = { + id, + webhook_id, + event_id, + event_type, + status: 'pending', + attempts: 0, + last_error: null, + last_attempt_at: null, + next_retry_at: null, + response_status: null, + created_at: now, + }; + + const redis = cache.getClient(); + await cache.set(key(id), record); + await redis.zadd(indexKey(webhook_id), Date.now(), id); + await redis.zremrangebyrank(indexKey(webhook_id), 0, -(RECENT_DELIVERIES_LIMIT + 1)); + return record; +} + +async function findById(id) { + return cache.get(key(id)); +} + +async function update(id, patch) { + const existing = await cache.get(key(id)); + if (!existing) return null; + const next = { ...existing, ...patch, id: existing.id }; + await cache.set(key(id), next); + return next; +} + +async function listByWebhook(webhookId, limit = 50) { + const redis = cache.getClient(); + const ids = await redis.zrevrange(indexKey(webhookId), 0, Math.max(0, limit - 1)); + const records = await Promise.all(ids.map((id) => cache.get(key(id)))); + return records.filter(Boolean); +} + +async function scheduleRetry(deliveryId, nextRetryAtMs) { + const redis = cache.getClient(); + await redis.zadd(RETRY_QUEUE_KEY, nextRetryAtMs, deliveryId); +} + +async function popDueRetries(nowMs, max = 25) { + const redis = cache.getClient(); + const ids = await redis.zrangebyscore(RETRY_QUEUE_KEY, '-inf', nowMs, 'LIMIT', 0, max); + if (ids.length === 0) return []; + await redis.zrem(RETRY_QUEUE_KEY, ...ids); + return ids; +} + +async function cancelRetry(deliveryId) { + const redis = cache.getClient(); + await redis.zrem(RETRY_QUEUE_KEY, deliveryId); +} + +module.exports = { + create, + findById, + update, + listByWebhook, + scheduleRetry, + popDueRetries, + cancelRetry, +}; diff --git a/src/repositories/webhookRepository.js b/src/repositories/webhookRepository.js new file mode 100644 index 0000000..cb031a8 --- /dev/null +++ b/src/repositories/webhookRepository.js @@ -0,0 +1,114 @@ +'use strict'; + +/** + * Webhook repository. + * + * Schema mirrors the future PostgreSQL `webhooks` table so that swapping the + * Redis backing for a real DB only requires re-implementing this module: + * + * webhooks ( + * id text primary key, + * url text not null, + * events text[] not null, + * secret text not null, + * active boolean not null default true, + * description text, + * created_at timestamptz not null default now(), + * updated_at timestamptz not null default now() + * ) + */ + +const crypto = require('crypto'); +const cache = require('../services/cache'); + +const IDS_KEY = 'webhooks:ids'; + +function key(id) { + return `webhook:${id}`; +} + +function generateId() { + return `wh_${crypto.randomUUID().replace(/-/g, '').slice(0, 20)}`; +} + +function normalize(record) { + if (!record) return null; + return { + id: record.id, + url: record.url, + events: Array.isArray(record.events) ? [...record.events] : [], + secret: record.secret, + active: record.active !== false, + description: record.description || null, + created_at: record.created_at, + updated_at: record.updated_at, + }; +} + +async function create({ url, events, secret, description }) { + const id = generateId(); + const now = new Date().toISOString(); + const record = { + id, + url, + events, + secret, + active: true, + description: description || null, + created_at: now, + updated_at: now, + }; + const redis = cache.getClient(); + await cache.set(key(id), record); + await redis.sadd(IDS_KEY, id); + return normalize(record); +} + +async function findById(id) { + const record = await cache.get(key(id)); + return normalize(record); +} + +async function list() { + const redis = cache.getClient(); + const ids = await redis.smembers(IDS_KEY); + const records = await Promise.all(ids.map((id) => cache.get(key(id)))); + return records.filter(Boolean).map(normalize); +} + +async function listActiveForEvent(eventType, matcher) { + const all = await list(); + return all.filter((w) => w.active && matcher(w.events, eventType)); +} + +async function update(id, patch) { + const existing = await cache.get(key(id)); + if (!existing) return null; + const next = { + ...existing, + ...patch, + id: existing.id, + created_at: existing.created_at, + updated_at: new Date().toISOString(), + }; + await cache.set(key(id), next); + return normalize(next); +} + +async function remove(id) { + const redis = cache.getClient(); + const existing = await cache.get(key(id)); + if (!existing) return null; + await cache.del(key(id)); + await redis.srem(IDS_KEY, id); + return normalize(existing); +} + +module.exports = { + create, + findById, + list, + listActiveForEvent, + update, + remove, +}; diff --git a/test/helpers/cacheMock.js b/test/helpers/cacheMock.js new file mode 100644 index 0000000..e7f3f0c --- /dev/null +++ b/test/helpers/cacheMock.js @@ -0,0 +1,99 @@ +'use strict'; + +/** + * In-memory mock of the ioredis surface used by src/services/cache.js. + * Covers strings (used by cache.get/set/del), SETs, and sorted SETs. + */ +function createCacheMock() { + const store = new Map(); + const sets = new Map(); + const zsets = new Map(); + const counters = new Map(); + + function getSet(key) { + if (!sets.has(key)) sets.set(key, new Set()); + return sets.get(key); + } + function getZSet(key) { + if (!zsets.has(key)) zsets.set(key, new Map()); + return zsets.get(key); + } + + const redis = { + smembers: jest.fn(async (key) => [...(sets.get(key) || [])]), + sadd: jest.fn(async (key, val) => { getSet(key).add(val); }), + srem: jest.fn(async (key, val) => { sets.get(key)?.delete(val); }), + zadd: jest.fn(async (key, score, member) => { getZSet(key).set(member, Number(score)); }), + zrem: jest.fn(async (key, ...members) => { + const z = zsets.get(key); + if (!z) return; + for (const m of members) z.delete(m); + }), + zrevrange: jest.fn(async (key, start, stop) => { + const z = zsets.get(key); + if (!z) return []; + const sorted = [...z.entries()].sort((a, b) => b[1] - a[1]).map(([m]) => m); + return sorted.slice(start, stop + 1); + }), + zrangebyscore: jest.fn(async (key, min, max, ...rest) => { + const z = zsets.get(key); + if (!z) return []; + const minScore = min === '-inf' ? -Infinity : Number(min); + const maxScore = max === '+inf' ? Infinity : Number(max); + let sorted = [...z.entries()] + .filter(([, score]) => score >= minScore && score <= maxScore) + .sort((a, b) => a[1] - b[1]) + .map(([m]) => m); + const limitIdx = rest.indexOf('LIMIT'); + if (limitIdx !== -1) { + const offset = Number(rest[limitIdx + 1]); + const count = Number(rest[limitIdx + 2]); + sorted = sorted.slice(offset, offset + count); + } + return sorted; + }), + zremrangebyrank: jest.fn(async (key, start, stop) => { + const z = zsets.get(key); + if (!z) return; + const sortedAsc = [...z.entries()].sort((a, b) => a[1] - b[1]).map(([m]) => m); + const end = stop < 0 ? sortedAsc.length + stop : stop; + const begin = start < 0 ? sortedAsc.length + start : start; + for (let i = begin; i <= end && i < sortedAsc.length; i += 1) { + z.delete(sortedAsc[i]); + } + }), + incr: jest.fn(async (key) => { + const n = (counters.get(key) || 0) + 1; + counters.set(key, n); + return n; + }), + expire: jest.fn(async () => 1), + }; + + const cacheMock = { + getClient: () => redis, + isConnected: () => true, + get: jest.fn(async (key) => { + const v = store.get(key); + return v !== undefined ? JSON.parse(JSON.stringify(v)) : null; + }), + set: jest.fn(async (key, value) => { store.set(key, JSON.parse(JSON.stringify(value))); }), + del: jest.fn(async (key) => { store.delete(key); }), + disconnect: jest.fn(async () => {}), + }; + + function reset() { + store.clear(); + sets.clear(); + zsets.clear(); + counters.clear(); + Object.values(redis).forEach((fn) => fn.mockClear?.()); + cacheMock.get.mockClear(); + cacheMock.set.mockClear(); + cacheMock.del.mockClear(); + } + + return { cacheMock, redis, store, sets, zsets, counters, reset }; +} + +module.exports = { createCacheMock }; diff --git a/test/webhookRepository.test.js b/test/webhookRepository.test.js new file mode 100644 index 0000000..1897db6 --- /dev/null +++ b/test/webhookRepository.test.js @@ -0,0 +1,78 @@ +'use strict'; + +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const webhookRepo = require('../src/repositories/webhookRepository'); +const events = require('../src/services/webhookEvents'); + +beforeEach(() => reset()); + +describe('webhookRepository', () => { + test('create persists a webhook with generated id and active=true', async () => { + const w = await webhookRepo.create({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + expect(w.id).toMatch(/^wh_/); + expect(w.active).toBe(true); + expect(w.events).toEqual(['pool.assets_locked']); + }); + + test('findById returns the stored webhook', async () => { + const created = await webhookRepo.create({ + url: 'https://example.com/hook', + events: ['*'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const found = await webhookRepo.findById(created.id); + expect(found.id).toBe(created.id); + }); + + test('findById returns null when missing', async () => { + expect(await webhookRepo.findById('wh_nope')).toBeNull(); + }); + + test('list returns all created webhooks', async () => { + await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + await webhookRepo.create({ url: 'https://b.com', events: ['*'], secret: 'whsec_bbbbbbbbbbbbbbbb' }); + const all = await webhookRepo.list(); + expect(all).toHaveLength(2); + }); + + test('update merges patch and bumps updated_at', async () => { + const w = await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const updated = await webhookRepo.update(w.id, { active: false }); + expect(updated.active).toBe(false); + expect(updated.created_at).toBe(w.created_at); + expect(updated.updated_at >= w.updated_at).toBe(true); + }); + + test('remove deletes and returns the previous record', async () => { + const w = await webhookRepo.create({ url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const removed = await webhookRepo.remove(w.id); + expect(removed.id).toBe(w.id); + expect(await webhookRepo.list()).toHaveLength(0); + }); + + test('listActiveForEvent filters by subscription and active flag', async () => { + const a = await webhookRepo.create({ url: 'https://a.com', events: ['pool.assets_locked'], secret: 'whsec_aaaaaaaaaaaaaaaa' }); + const b = await webhookRepo.create({ url: 'https://b.com', events: ['pool.closed'], secret: 'whsec_bbbbbbbbbbbbbbbb' }); + const c = await webhookRepo.create({ url: 'https://c.com', events: ['*'], secret: 'whsec_cccccccccccccccc' }); + await webhookRepo.update(c.id, { active: false }); + + const result = await webhookRepo.listActiveForEvent('pool.assets_locked', events.matchesSubscription); + const ids = result.map((w) => w.id).sort(); + expect(ids).toEqual([a.id].sort()); + expect(ids).not.toContain(b.id); + expect(ids).not.toContain(c.id); + }); +}); From 5efb48268b5e7fde7d7687693ef00bc282e3bc75 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:52:34 -0600 Subject: [PATCH 3/8] feat: add webhook dispatcher with exponential backoff retries The dispatcher fans an event out to every active webhook subscribed to its type, persists a delivery record per webhook, signs the body with HMAC-SHA256, and retries failed deliveries up to WEBHOOK_MAX_ATTEMPTS. Retry decisions follow standard webhook semantics: network errors and 5xx/408/429 responses are retried with exponential backoff, while other 4xx responses are marked failed immediately so a misconfigured consumer cannot be retried into the ground. Retries are queued in a Redis ZSET so they survive process restarts. All webhook subsystem config knobs are introduced here so callers in later commits can rely on them. --- src/config.js | 16 ++ src/services/webhookDispatcher.js | 193 ++++++++++++++++++++++++ test/webhookDispatcher.test.js | 241 ++++++++++++++++++++++++++++++ 3 files changed, 450 insertions(+) create mode 100644 src/services/webhookDispatcher.js create mode 100644 test/webhookDispatcher.test.js diff --git a/src/config.js b/src/config.js index b99a1ac..e29d68d 100644 --- a/src/config.js +++ b/src/config.js @@ -29,4 +29,20 @@ module.exports = { .split(',') .map((o) => o.trim()) .filter(Boolean), + webhooks: { + maxAttempts: parseInt(process.env.WEBHOOK_MAX_ATTEMPTS, 10) || 3, + retryBaseMs: parseInt(process.env.WEBHOOK_RETRY_BASE_MS, 10) || 30000, + retryFactor: parseFloat(process.env.WEBHOOK_RETRY_FACTOR) || 2, + timeoutMs: parseInt(process.env.WEBHOOK_TIMEOUT_MS, 10) || 5000, + retryPollMs: parseInt(process.env.WEBHOOK_RETRY_POLL_MS, 10) || 5000, + retryBatchSize: parseInt(process.env.WEBHOOK_RETRY_BATCH, 10) || 25, + rateLimit: { + windowSeconds: parseInt(process.env.WEBHOOK_RATELIMIT_WINDOW, 10) || 60, + max: parseInt(process.env.WEBHOOK_RATELIMIT_MAX, 10) || 60, + }, + testRateLimit: { + windowSeconds: parseInt(process.env.WEBHOOK_TEST_RATELIMIT_WINDOW, 10) || 60, + max: parseInt(process.env.WEBHOOK_TEST_RATELIMIT_MAX, 10) || 5, + }, + }, }; diff --git a/src/services/webhookDispatcher.js b/src/services/webhookDispatcher.js new file mode 100644 index 0000000..45ce9ca --- /dev/null +++ b/src/services/webhookDispatcher.js @@ -0,0 +1,193 @@ +'use strict'; + +const axios = require('axios'); +const config = require('../config'); +const logger = require('../logger'); +const signature = require('./webhookSignature'); +const events = require('./webhookEvents'); +const webhookRepo = require('../repositories/webhookRepository'); +const deliveryRepo = require('../repositories/deliveryRepository'); + +const USER_AGENT = 'SmartDrop-Webhooks/1.0'; + +function backoffMs(attemptsCompleted) { + const base = config.webhooks.retryBaseMs; + const factor = config.webhooks.retryFactor; + return base * factor ** (attemptsCompleted - 1); +} + +function shouldRetry(responseStatus, networkError) { + if (networkError) return true; + if (responseStatus == null) return true; + if (responseStatus >= 500 && responseStatus < 600) return true; + if (responseStatus === 408 || responseStatus === 429) return true; + return false; +} + +function buildHeaders(secret, body, eventType, deliveryId) { + return { + 'Content-Type': 'application/json', + 'User-Agent': USER_AGENT, + 'X-SmartDrop-Event': eventType, + 'X-SmartDrop-Delivery': deliveryId, + 'X-SmartDrop-Signature': signature.sign(secret, body), + }; +} + +async function postOnce(url, headers, body) { + return axios.post(url, body, { + headers, + timeout: config.webhooks.timeoutMs, + transformRequest: [(data) => data], + validateStatus: () => true, + }); +} + +async function attempt(deliveryId) { + const delivery = await deliveryRepo.findById(deliveryId); + if (!delivery) { + logger.warn('Delivery missing, dropping retry', { delivery_id: deliveryId }); + return null; + } + if (delivery.status === 'success') return delivery; + + const webhook = await webhookRepo.findById(delivery.webhook_id); + if (!webhook || !webhook.active) { + return deliveryRepo.update(deliveryId, { + status: 'failed', + last_error: 'webhook missing or inactive', + last_attempt_at: new Date().toISOString(), + next_retry_at: null, + }); + } + + const payload = delivery.payload || { + event: delivery.event_type, + event_id: delivery.event_id, + delivery_id: delivery.id, + occurred_at: delivery.created_at, + }; + const body = JSON.stringify(payload); + const headers = buildHeaders(webhook.secret, body, delivery.event_type, delivery.id); + + const attempts = delivery.attempts + 1; + let responseStatus = null; + let networkError = null; + + try { + const res = await postOnce(webhook.url, headers, body); + responseStatus = res.status; + } catch (err) { + networkError = err.message || 'network error'; + } + + const succeeded = responseStatus != null && responseStatus >= 200 && responseStatus < 300; + const nowIso = new Date().toISOString(); + + if (succeeded) { + logger.info('Webhook delivered', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + status: responseStatus, + }); + return deliveryRepo.update(deliveryId, { + status: 'success', + attempts, + last_attempt_at: nowIso, + next_retry_at: null, + last_error: null, + response_status: responseStatus, + }); + } + + const errorMessage = networkError || `HTTP ${responseStatus}`; + const retryable = shouldRetry(responseStatus, Boolean(networkError)); + const hasAttemptsLeft = attempts < config.webhooks.maxAttempts; + + if (retryable && hasAttemptsLeft) { + const delayMs = backoffMs(attempts); + const nextRetryAt = new Date(Date.now() + delayMs).toISOString(); + await deliveryRepo.scheduleRetry(delivery.id, Date.now() + delayMs); + logger.warn('Webhook delivery failed, retry scheduled', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + error: errorMessage, + next_retry_at: nextRetryAt, + }); + return deliveryRepo.update(deliveryId, { + status: 'pending', + attempts, + last_attempt_at: nowIso, + next_retry_at: nextRetryAt, + last_error: errorMessage, + response_status: responseStatus, + }); + } + + logger.error('Webhook delivery failed permanently', { + delivery_id: delivery.id, + webhook_id: webhook.id, + attempts, + error: errorMessage, + }); + return deliveryRepo.update(deliveryId, { + status: 'failed', + attempts, + last_attempt_at: nowIso, + next_retry_at: null, + last_error: errorMessage, + response_status: responseStatus, + }); +} + +async function deliverToWebhook(webhook, eventType, eventId, payload) { + const delivery = await deliveryRepo.create({ + webhook_id: webhook.id, + event_id: eventId, + event_type: eventType, + }); + await deliveryRepo.update(delivery.id, { payload }); + return attempt(delivery.id); +} + +async function dispatch({ event_type: eventType, event_id: eventId, data }) { + if (!events.isKnownEvent(eventType)) { + logger.warn('Dispatch skipped, unknown event type', { event_type: eventType }); + return []; + } + if (!eventId || typeof eventId !== 'string') { + throw new Error('event_id is required to dispatch a webhook event'); + } + + const targets = await webhookRepo.listActiveForEvent(eventType, events.matchesSubscription); + if (targets.length === 0) return []; + + const occurredAt = new Date().toISOString(); + const payload = { + event: eventType, + event_id: eventId, + occurred_at: occurredAt, + data: data || {}, + }; + + return Promise.all( + targets.map((webhook) => deliverToWebhook(webhook, eventType, eventId, payload)) + ); +} + +async function sendTest(webhookId) { + const webhook = await webhookRepo.findById(webhookId); + if (!webhook) return null; + const eventType = 'pool.assets_locked'; + const payload = { + event: eventType, + event_id: `evt_test_${Date.now()}`, + occurred_at: new Date().toISOString(), + data: { test: true, message: 'This is a test delivery from SmartDrop' }, + }; + return deliverToWebhook(webhook, eventType, payload.event_id, payload); +} + +module.exports = { dispatch, attempt, sendTest, backoffMs, shouldRetry }; diff --git a/test/webhookDispatcher.test.js b/test/webhookDispatcher.test.js new file mode 100644 index 0000000..34e144a --- /dev/null +++ b/test/webhookDispatcher.test.js @@ -0,0 +1,241 @@ +'use strict'; + +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset, zsets } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const mockAxiosPost = jest.fn(); +jest.mock('axios', () => ({ post: (...args) => mockAxiosPost(...args) })); + +const dispatcher = require('../src/services/webhookDispatcher'); +const webhookRepo = require('../src/repositories/webhookRepository'); +const deliveryRepo = require('../src/repositories/deliveryRepository'); +const signature = require('../src/services/webhookSignature'); + +beforeEach(() => { + reset(); + mockAxiosPost.mockReset(); +}); + +async function createWebhook(overrides = {}) { + return webhookRepo.create({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_aaaaaaaaaaaaaaaa', + ...overrides, + }); +} + +describe('dispatcher delivery success', () => { + test('successful 200 marks delivery as success with attempts=1', async () => { + const w = await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_1', + data: { pool_id: 'p1' }, + }); + + expect(delivery.status).toBe('success'); + expect(delivery.attempts).toBe(1); + expect(delivery.response_status).toBe(200); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + + const [url, body, opts] = mockAxiosPost.mock.calls[0]; + expect(url).toBe(w.url); + expect(typeof body).toBe('string'); + const parsed = JSON.parse(body); + expect(parsed.event).toBe('pool.assets_locked'); + expect(parsed.event_id).toBe('evt_1'); + expect(parsed.data).toEqual({ pool_id: 'p1' }); + expect(opts.headers['X-SmartDrop-Signature']).toBe(signature.sign(w.secret, body)); + expect(opts.headers['X-SmartDrop-Event']).toBe('pool.assets_locked'); + }); +}); + +describe('dispatcher event-type filtering', () => { + test('only webhooks subscribed to the event receive a delivery', async () => { + await createWebhook({ url: 'https://a.com', events: ['pool.assets_locked'] }); + await createWebhook({ url: 'https://b.com', events: ['pool.closed'] }); + await createWebhook({ url: 'https://c.com', events: ['*'] }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + + const results = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_42', + }); + + expect(results).toHaveLength(2); + const urls = mockAxiosPost.mock.calls.map((c) => c[0]).sort(); + expect(urls).toEqual(['https://a.com', 'https://c.com']); + }); + + test('inactive webhooks are skipped', async () => { + const w = await createWebhook(); + await webhookRepo.update(w.id, { active: false }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + + const results = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_skip', + }); + expect(results).toHaveLength(0); + expect(mockAxiosPost).not.toHaveBeenCalled(); + }); + + test('unknown event types do not dispatch', async () => { + await createWebhook(); + const results = await dispatcher.dispatch({ + event_type: 'foo.bar', + event_id: 'evt_x', + }); + expect(results).toEqual([]); + expect(mockAxiosPost).not.toHaveBeenCalled(); + }); +}); + +describe('dispatcher retry semantics', () => { + test('5xx schedules a retry and keeps status pending', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 503 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_retry', + }); + + expect(delivery.status).toBe('pending'); + expect(delivery.attempts).toBe(1); + expect(delivery.next_retry_at).not.toBeNull(); + expect(delivery.last_error).toBe('HTTP 503'); + const queued = zsets.get('webhooks:retries'); + expect(queued.size).toBe(1); + expect([...queued.keys()][0]).toBe(delivery.id); + }); + + test('4xx (non-429) does NOT retry and marks failed', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 400 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_4xx', + }); + + expect(delivery.status).toBe('failed'); + expect(delivery.attempts).toBe(1); + expect(delivery.next_retry_at).toBeNull(); + const queued = zsets.get('webhooks:retries') || new Map(); + expect(queued.size).toBe(0); + }); + + test('network errors trigger a retry', async () => { + await createWebhook(); + mockAxiosPost.mockRejectedValueOnce(new Error('ECONNREFUSED')); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_net', + }); + + expect(delivery.status).toBe('pending'); + expect(delivery.last_error).toBe('ECONNREFUSED'); + expect(delivery.next_retry_at).not.toBeNull(); + }); + + test('after maxAttempts failures the delivery is permanently failed', async () => { + await createWebhook(); + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_max', + }); + + mockAxiosPost.mockResolvedValue({ status: 500 }); + const second = await dispatcher.attempt(delivery.id); + const third = await dispatcher.attempt(delivery.id); + + expect(third.status).toBe('failed'); + expect(third.attempts).toBe(3); + expect(third.next_retry_at).toBeNull(); + expect(second.status).toBe('pending'); + }); + + test('429 is treated as retryable', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 429 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_429', + }); + expect(delivery.status).toBe('pending'); + expect(delivery.next_retry_at).not.toBeNull(); + }); +}); + +describe('exponential backoff', () => { + test('delay grows by retryFactor each attempt', () => { + const d1 = dispatcher.backoffMs(1); + const d2 = dispatcher.backoffMs(2); + const d3 = dispatcher.backoffMs(3); + expect(d2).toBeGreaterThan(d1); + expect(d3).toBeGreaterThan(d2); + }); +}); + +describe('shouldRetry decision table', () => { + test('retries on network error', () => expect(dispatcher.shouldRetry(null, true)).toBe(true)); + test('retries on 500', () => expect(dispatcher.shouldRetry(500, false)).toBe(true)); + test('retries on 503', () => expect(dispatcher.shouldRetry(503, false)).toBe(true)); + test('retries on 408', () => expect(dispatcher.shouldRetry(408, false)).toBe(true)); + test('retries on 429', () => expect(dispatcher.shouldRetry(429, false)).toBe(true)); + test('does not retry on 400', () => expect(dispatcher.shouldRetry(400, false)).toBe(false)); + test('does not retry on 404', () => expect(dispatcher.shouldRetry(404, false)).toBe(false)); + test('does not retry on 200', () => expect(dispatcher.shouldRetry(200, false)).toBe(false)); +}); + +describe('sendTest', () => { + test('sends a test event to a specific webhook', async () => { + const w = await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + const delivery = await dispatcher.sendTest(w.id); + expect(delivery.status).toBe('success'); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + const body = JSON.parse(mockAxiosPost.mock.calls[0][1]); + expect(body.data.test).toBe(true); + }); + + test('returns null for unknown webhook', async () => { + const result = await dispatcher.sendTest('wh_unknown'); + expect(result).toBeNull(); + }); +}); + +describe('delivery payload persistence', () => { + test('payload is persisted so retries do not lose event data', async () => { + await createWebhook(); + mockAxiosPost.mockResolvedValueOnce({ status: 500 }); + + const [delivery] = await dispatcher.dispatch({ + event_type: 'pool.assets_locked', + event_id: 'evt_payload', + data: { important: 'value' }, + }); + + const persisted = await deliveryRepo.findById(delivery.id); + expect(persisted.payload.data.important).toBe('value'); + + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + const retried = await dispatcher.attempt(delivery.id); + expect(retried.status).toBe('success'); + const body = JSON.parse(mockAxiosPost.mock.calls[1][1]); + expect(body.data.important).toBe('value'); + }); +}); From de326605f82ca73f6860238c8edc56990b4c8b19 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:54:01 -0600 Subject: [PATCH 4/8] feat: add background worker that drains the webhook retry queue Poll the Redis ZSET of due delivery retries on a configurable interval and re-invoke the dispatcher for each one. A simple running flag avoids overlapping ticks if a previous batch is still in flight. --- src/jobs/webhookRetryWorker.js | 48 ++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 src/jobs/webhookRetryWorker.js diff --git a/src/jobs/webhookRetryWorker.js b/src/jobs/webhookRetryWorker.js new file mode 100644 index 0000000..924381e --- /dev/null +++ b/src/jobs/webhookRetryWorker.js @@ -0,0 +1,48 @@ +'use strict'; + +const config = require('../config'); +const logger = require('../logger'); +const dispatcher = require('../services/webhookDispatcher'); +const deliveryRepo = require('../repositories/deliveryRepository'); + +let timer = null; +let running = false; + +async function tick() { + if (running) return; + running = true; + try { + const ids = await deliveryRepo.popDueRetries(Date.now(), config.webhooks.retryBatchSize); + if (ids.length === 0) return; + logger.info('Processing webhook retries', { count: ids.length }); + for (const id of ids) { + try { + await dispatcher.attempt(id); + } catch (err) { + logger.error('Retry attempt failed', { delivery_id: id, error: err.message }); + } + } + } catch (err) { + logger.error('Webhook retry worker tick failed', { error: err.message }); + } finally { + running = false; + } +} + +function start() { + if (timer) return; + const interval = config.webhooks.retryPollMs; + timer = setInterval(tick, interval); + if (typeof timer.unref === 'function') timer.unref(); + logger.info('Webhook retry worker started', { intervalMs: interval }); +} + +function stop() { + if (timer) { + clearInterval(timer); + timer = null; + logger.info('Webhook retry worker stopped'); + } +} + +module.exports = { start, stop, tick }; From e7ec548e9e5f2ebed3586e6343c8c7b407e0640b Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:54:25 -0600 Subject: [PATCH 5/8] feat: add Redis-backed fixed-window rate limit middleware Generic per-IP limiter built on INCR + EXPIRE so it can be reused by any route by tagging it with a keyPrefix. The middleware fails open if Redis is unreachable so a cache outage cannot lock users out of management endpoints, and surfaces standard X-RateLimit-* headers on every response. --- src/middleware/rateLimit.js | 50 +++++++++++++++++++++++++++++++++++++ test/rateLimit.test.js | 49 ++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/middleware/rateLimit.js create mode 100644 test/rateLimit.test.js diff --git a/src/middleware/rateLimit.js b/src/middleware/rateLimit.js new file mode 100644 index 0000000..f315ed9 --- /dev/null +++ b/src/middleware/rateLimit.js @@ -0,0 +1,50 @@ +'use strict'; + +const cache = require('../services/cache'); +const logger = require('../logger'); + +/** + * Fixed-window rate limiter backed by Redis INCR + EXPIRE. + * Fails open if Redis is unreachable so a cache outage cannot lock out users. + */ +function buildRateLimit({ windowSeconds, max, keyPrefix }) { + if (!Number.isFinite(windowSeconds) || windowSeconds <= 0) { + throw new Error('windowSeconds must be a positive number'); + } + if (!Number.isFinite(max) || max <= 0) { + throw new Error('max must be a positive number'); + } + if (!keyPrefix || typeof keyPrefix !== 'string') { + throw new Error('keyPrefix is required'); + } + + return async function rateLimit(req, res, next) { + const identifier = req.ip || req.connection?.remoteAddress || 'unknown'; + const bucket = Math.floor(Date.now() / 1000 / windowSeconds); + const key = `ratelimit:${keyPrefix}:${identifier}:${bucket}`; + + try { + const redis = cache.getClient(); + const count = await redis.incr(key); + if (count === 1) { + await redis.expire(key, windowSeconds); + } + const remaining = Math.max(0, max - count); + res.setHeader('X-RateLimit-Limit', String(max)); + res.setHeader('X-RateLimit-Remaining', String(remaining)); + res.setHeader('X-RateLimit-Reset', String((bucket + 1) * windowSeconds)); + if (count > max) { + return res.status(429).json({ + error: 'Too many requests', + message: `Rate limit of ${max} requests per ${windowSeconds}s exceeded`, + }); + } + return next(); + } catch (err) { + logger.warn('Rate limit fail-open due to cache error', { error: err.message }); + return next(); + } + }; +} + +module.exports = buildRateLimit; diff --git a/test/rateLimit.test.js b/test/rateLimit.test.js new file mode 100644 index 0000000..629409e --- /dev/null +++ b/test/rateLimit.test.js @@ -0,0 +1,49 @@ +'use strict'; + +const express = require('express'); +const request = require('supertest'); +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const buildRateLimit = require('../src/middleware/rateLimit'); + +function buildApp(limiter) { + const app = express(); + app.use(limiter); + app.get('/test', (_req, res) => res.json({ ok: true })); + return app; +} + +beforeEach(() => reset()); + +describe('rateLimit middleware', () => { + test('allows requests under the limit and sets rate-limit headers', async () => { + const app = buildApp(buildRateLimit({ windowSeconds: 60, max: 3, keyPrefix: 't' })); + const r1 = await request(app).get('/test'); + expect(r1.status).toBe(200); + expect(r1.headers['x-ratelimit-limit']).toBe('3'); + expect(r1.headers['x-ratelimit-remaining']).toBe('2'); + }); + + test('returns 429 once the limit is exceeded', async () => { + const app = buildApp(buildRateLimit({ windowSeconds: 60, max: 2, keyPrefix: 'lim' })); + await request(app).get('/test'); + await request(app).get('/test'); + const blocked = await request(app).get('/test'); + expect(blocked.status).toBe(429); + expect(blocked.body.error).toBe('Too many requests'); + }); + + test('throws when configured with invalid options', () => { + expect(() => buildRateLimit({ windowSeconds: 0, max: 10, keyPrefix: 'x' })).toThrow(); + expect(() => buildRateLimit({ windowSeconds: 60, max: 0, keyPrefix: 'x' })).toThrow(); + expect(() => buildRateLimit({ windowSeconds: 60, max: 10 })).toThrow(); + }); +}); From c24ae9a58351ab60337d4e09d5bad15c6ced64d6 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:54:34 -0600 Subject: [PATCH 6/8] feat: expose webhook management REST API Add CRUD endpoints for webhook registration, a dedicated test-delivery endpoint (rate-limited tighter than mgmt to prevent abuse as an outbound HTTP cannon) and a deliveries listing endpoint that powers the admin dashboard view of webhook health. The secret is returned in plaintext only on creation and never echoed back from list/get; subsequent reads only expose a short preview. --- src/routes/webhooks.js | 198 +++++++++++++++++++++++++++++++++++ test/webhooks.routes.test.js | 177 +++++++++++++++++++++++++++++++ 2 files changed, 375 insertions(+) create mode 100644 src/routes/webhooks.js create mode 100644 test/webhooks.routes.test.js diff --git a/src/routes/webhooks.js b/src/routes/webhooks.js new file mode 100644 index 0000000..b6d02a0 --- /dev/null +++ b/src/routes/webhooks.js @@ -0,0 +1,198 @@ +'use strict'; + +const express = require('express'); +const logger = require('../logger'); +const config = require('../config'); +const webhookRepo = require('../repositories/webhookRepository'); +const deliveryRepo = require('../repositories/deliveryRepository'); +const dispatcher = require('../services/webhookDispatcher'); +const signatureService = require('../services/webhookSignature'); +const events = require('../services/webhookEvents'); +const buildRateLimit = require('../middleware/rateLimit'); + +const router = express.Router(); + +const manageLimit = buildRateLimit({ + windowSeconds: config.webhooks.rateLimit.windowSeconds, + max: config.webhooks.rateLimit.max, + keyPrefix: 'webhooks', +}); + +const testLimit = buildRateLimit({ + windowSeconds: config.webhooks.testRateLimit.windowSeconds, + max: config.webhooks.testRateLimit.max, + keyPrefix: 'webhooks_test', +}); + +router.use('/webhooks', manageLimit); + +function isValidUrl(str) { + try { + const u = new URL(str); + return u.protocol === 'http:' || u.protocol === 'https:'; + } catch { + return false; + } +} + +function validateCreate(body) { + if (!body || typeof body !== 'object') return 'body must be an object'; + const { url, events: subscribedEvents, secret, description } = body; + + if (!url || !isValidUrl(url)) { + return 'url must be a valid http(s) URL'; + } + if (!events.isValidSubscription(subscribedEvents)) { + return `events must be a non-empty array of: ${events.ALL_EVENTS.join(', ')} or "*"`; + } + if (secret !== undefined) { + if (typeof secret !== 'string' || secret.length < 16) { + return 'secret must be a string of at least 16 characters'; + } + } + if (description !== undefined && typeof description !== 'string') { + return 'description must be a string'; + } + return null; +} + +function publicView(webhook) { + if (!webhook) return null; + return { + id: webhook.id, + url: webhook.url, + events: webhook.events, + active: webhook.active, + description: webhook.description, + created_at: webhook.created_at, + updated_at: webhook.updated_at, + secret_preview: webhook.secret ? `${webhook.secret.slice(0, 10)}…` : null, + }; +} + +router.post('/webhooks', async (req, res) => { + try { + const validationError = validateCreate(req.body); + if (validationError) { + return res.status(400).json({ error: 'Validation error', message: validationError }); + } + + const secret = req.body.secret || signatureService.generateSecret(); + const webhook = await webhookRepo.create({ + url: req.body.url, + events: req.body.events, + secret, + description: req.body.description, + }); + + return res.status(201).json({ + ...publicView(webhook), + secret, + secret_warning: 'Store this secret now — it will not be shown again in plaintext.', + }); + } catch (err) { + logger.error('Create webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/webhooks', async (_req, res) => { + try { + const webhooks = await webhookRepo.list(); + return res.json({ webhooks: webhooks.map(publicView) }); + } catch (err) { + logger.error('List webhooks error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/webhooks/:id', async (req, res) => { + try { + const webhook = await webhookRepo.findById(req.params.id); + if (!webhook) return res.status(404).json({ error: 'Webhook not found' }); + return res.json(publicView(webhook)); + } catch (err) { + logger.error('Get webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.patch('/webhooks/:id', async (req, res) => { + try { + const patch = {}; + if (req.body.url !== undefined) { + if (!isValidUrl(req.body.url)) { + return res.status(400).json({ error: 'Validation error', message: 'url must be a valid http(s) URL' }); + } + patch.url = req.body.url; + } + if (req.body.events !== undefined) { + if (!events.isValidSubscription(req.body.events)) { + return res.status(400).json({ error: 'Validation error', message: 'events invalid' }); + } + patch.events = req.body.events; + } + if (req.body.active !== undefined) { + if (typeof req.body.active !== 'boolean') { + return res.status(400).json({ error: 'Validation error', message: 'active must be boolean' }); + } + patch.active = req.body.active; + } + if (req.body.description !== undefined) { + if (typeof req.body.description !== 'string') { + return res.status(400).json({ error: 'Validation error', message: 'description must be a string' }); + } + patch.description = req.body.description; + } + + const updated = await webhookRepo.update(req.params.id, patch); + if (!updated) return res.status(404).json({ error: 'Webhook not found' }); + return res.json(publicView(updated)); + } catch (err) { + logger.error('Update webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.delete('/webhooks/:id', async (req, res) => { + try { + const deleted = await webhookRepo.remove(req.params.id); + if (!deleted) return res.status(404).json({ error: 'Webhook not found' }); + return res.json({ deleted: true, id: req.params.id }); + } catch (err) { + logger.error('Delete webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.post('/webhooks/:id/test', testLimit, async (req, res) => { + try { + const delivery = await dispatcher.sendTest(req.params.id); + if (!delivery) return res.status(404).json({ error: 'Webhook not found' }); + return res.status(202).json({ + delivery_id: delivery.id, + status: delivery.status, + attempts: delivery.attempts, + response_status: delivery.response_status, + last_error: delivery.last_error, + }); + } catch (err) { + logger.error('Test webhook error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +router.get('/webhooks/:id/deliveries', async (req, res) => { + try { + const webhook = await webhookRepo.findById(req.params.id); + if (!webhook) return res.status(404).json({ error: 'Webhook not found' }); + const limit = Math.min(parseInt(req.query.limit, 10) || 50, 100); + const deliveries = await deliveryRepo.listByWebhook(req.params.id, limit); + return res.json({ deliveries }); + } catch (err) { + logger.error('List deliveries error', { error: err.message }); + return res.status(500).json({ error: 'Internal server error' }); + } +}); + +module.exports = router; diff --git a/test/webhooks.routes.test.js b/test/webhooks.routes.test.js new file mode 100644 index 0000000..6552ddd --- /dev/null +++ b/test/webhooks.routes.test.js @@ -0,0 +1,177 @@ +'use strict'; + +const express = require('express'); +const request = require('supertest'); +const { createCacheMock } = require('./helpers/cacheMock'); + +const mockHelper = createCacheMock(); +const { reset } = mockHelper; + +jest.mock('../src/services/cache', () => mockHelper.cacheMock); +jest.mock('../src/logger', () => ({ + info: jest.fn(), warn: jest.fn(), error: jest.fn(), debug: jest.fn(), +})); + +const mockAxiosPost = jest.fn(); +jest.mock('axios', () => ({ post: (...args) => mockAxiosPost(...args) })); + +const webhooksRouter = require('../src/routes/webhooks'); + +function buildApp() { + const app = express(); + app.use(express.json()); + app.use('/api/v1', webhooksRouter); + return app; +} + +beforeEach(() => { + reset(); + mockAxiosPost.mockReset(); +}); + +describe('POST /api/v1/webhooks', () => { + const app = buildApp(); + + test('creates a webhook with valid input', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ + url: 'https://example.com/hook', + events: ['pool.assets_locked'], + secret: 'whsec_user_supplied_secret_long_enough', + }); + expect(res.status).toBe(201); + expect(res.body.id).toMatch(/^wh_/); + expect(res.body.events).toEqual(['pool.assets_locked']); + expect(res.body.secret).toBe('whsec_user_supplied_secret_long_enough'); + expect(res.body.secret_warning).toMatch(/Store this secret/); + }); + + test('generates a secret when none provided', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['*'] }); + expect(res.status).toBe(201); + expect(res.body.secret).toMatch(/^whsec_[0-9a-f]+$/); + }); + + test('rejects invalid url', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'not-a-url', events: ['*'] }); + expect(res.status).toBe(400); + }); + + test('rejects unknown event types', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['totally.fake'] }); + expect(res.status).toBe(400); + }); + + test('rejects too-short secret', async () => { + const res = await request(app) + .post('/api/v1/webhooks') + .send({ url: 'https://example.com/hook', events: ['*'], secret: 'short' }); + expect(res.status).toBe(400); + }); +}); + +describe('GET /api/v1/webhooks', () => { + const app = buildApp(); + + test('lists registered webhooks without leaking full secrets', async () => { + await request(app).post('/api/v1/webhooks').send({ + url: 'https://a.com', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const res = await request(app).get('/api/v1/webhooks'); + expect(res.status).toBe(200); + expect(res.body.webhooks).toHaveLength(1); + expect(res.body.webhooks[0].secret_preview).toMatch(/^whsec_/); + expect(res.body.webhooks[0]).not.toHaveProperty('secret'); + }); +}); + +describe('GET /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('returns 404 for unknown webhook', async () => { + const res = await request(app).get('/api/v1/webhooks/wh_nope'); + expect(res.status).toBe(404); + }); +}); + +describe('DELETE /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('deletes a registered webhook', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const del = await request(app).delete(`/api/v1/webhooks/${created.body.id}`); + expect(del.status).toBe(200); + expect(del.body.deleted).toBe(true); + + const list = await request(app).get('/api/v1/webhooks'); + expect(list.body.webhooks).toHaveLength(0); + }); + + test('returns 404 when deleting unknown id', async () => { + const res = await request(app).delete('/api/v1/webhooks/wh_nope'); + expect(res.status).toBe(404); + }); +}); + +describe('PATCH /api/v1/webhooks/:id', () => { + const app = buildApp(); + + test('updates active status', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + const res = await request(app) + .patch(`/api/v1/webhooks/${created.body.id}`) + .send({ active: false }); + expect(res.status).toBe(200); + expect(res.body.active).toBe(false); + }); +}); + +describe('POST /api/v1/webhooks/:id/test', () => { + const app = buildApp(); + + test('sends a test delivery and returns delivery summary', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + mockAxiosPost.mockResolvedValueOnce({ status: 200 }); + + const res = await request(app).post(`/api/v1/webhooks/${created.body.id}/test`); + expect(res.status).toBe(202); + expect(res.body.delivery_id).toMatch(/^dlv_/); + expect(res.body.status).toBe('success'); + expect(mockAxiosPost).toHaveBeenCalledTimes(1); + }); + + test('returns 404 when webhook does not exist', async () => { + const res = await request(app).post('/api/v1/webhooks/wh_nope/test'); + expect(res.status).toBe(404); + }); +}); + +describe('GET /api/v1/webhooks/:id/deliveries', () => { + const app = buildApp(); + + test('lists deliveries for a webhook', async () => { + const created = await request(app).post('/api/v1/webhooks').send({ + url: 'https://example.com/hook', events: ['*'], secret: 'whsec_aaaaaaaaaaaaaaaa', + }); + mockAxiosPost.mockResolvedValue({ status: 200 }); + await request(app).post(`/api/v1/webhooks/${created.body.id}/test`); + + const res = await request(app).get(`/api/v1/webhooks/${created.body.id}/deliveries`); + expect(res.status).toBe(200); + expect(Array.isArray(res.body.deliveries)).toBe(true); + expect(res.body.deliveries.length).toBeGreaterThan(0); + }); +}); From 56cf31ed6890cbafe8b73a8f64ec0e13ae957545 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:54:40 -0600 Subject: [PATCH 7/8] feat: mount webhook router and start retry worker on boot Wire the webhook subsystem into the Express bootstrap and shut the retry worker down cleanly on SIGTERM/SIGINT alongside the existing price job. --- src/index.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/index.js b/src/index.js index 07299e0..2f3f3c3 100644 --- a/src/index.js +++ b/src/index.js @@ -7,6 +7,8 @@ const priceRefreshJob = require('./jobs/priceRefresh'); const buildCorsMiddleware = require('./middleware/cors'); const pricesRouter = require('./routes/prices'); const alertsRouter = require('./routes/alerts'); +const webhooksRouter = require('./routes/webhooks'); +const webhookRetryWorker = require('./jobs/webhookRetryWorker'); const app = express(); @@ -26,6 +28,7 @@ app.get('/health', (req, res) => { app.use('/api/v1', pricesRouter); app.use('/api/v1', alertsRouter); +app.use('/api/v1', webhooksRouter); app.use((err, req, res, _next) => { const status = err.status || 500; @@ -36,11 +39,13 @@ app.use((err, req, res, _next) => { const server = app.listen(config.port, () => { logger.info(`SmartDrop backend running on port ${config.port}`); priceRefreshJob.start(); + webhookRetryWorker.start(); }); process.on('SIGTERM', async () => { logger.info('SIGTERM received, shutting down'); priceRefreshJob.stop(); + webhookRetryWorker.stop(); server.close(); await cache.disconnect(); process.exit(0); @@ -49,6 +54,7 @@ process.on('SIGTERM', async () => { process.on('SIGINT', async () => { logger.info('SIGINT received, shutting down'); priceRefreshJob.stop(); + webhookRetryWorker.stop(); server.close(); await cache.disconnect(); process.exit(0); From 91f5dd909717cfd2b58b58c8393227a6aed8d722 Mon Sep 17 00:00:00 2001 From: mariocodecr Date: Thu, 25 Jun 2026 21:54:53 -0600 Subject: [PATCH 8/8] docs: document webhook integration, signature verification and retry semantics Add a full Webhooks section to the README: supported event types, the public REST API surface, the outgoing request shape (headers + body), a Node.js HMAC verification example, retry/backoff behavior, the repository-pattern note explaining the PG migration path, rate-limit defaults and the new env vars. --- README.md | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/README.md b/README.md index 986083f..a16e78b 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,16 @@ cp .env.example .env | `PRICE_STALE_THRESHOLD` | Stale threshold in minutes | 5 | No | | `PRICE_ANOMALY_THRESHOLD` | Anomaly detection threshold % | 10 | No | | `LOG_LEVEL` | Logging level | info | No | +| `WEBHOOK_MAX_ATTEMPTS` | Total delivery attempts (initial + retries) | 3 | No | +| `WEBHOOK_RETRY_BASE_MS` | Base backoff between retries (ms) | 30000 | No | +| `WEBHOOK_RETRY_FACTOR` | Exponential backoff multiplier | 2 | No | +| `WEBHOOK_TIMEOUT_MS` | HTTP timeout per delivery attempt | 5000 | No | +| `WEBHOOK_RETRY_POLL_MS` | Retry worker poll interval | 5000 | No | +| `WEBHOOK_RETRY_BATCH` | Max retries processed per tick | 25 | No | +| `WEBHOOK_RATELIMIT_WINDOW` | Mgmt rate-limit window (s) | 60 | No | +| `WEBHOOK_RATELIMIT_MAX` | Mgmt rate-limit max requests / window / IP | 60 | No | +| `WEBHOOK_TEST_RATELIMIT_WINDOW` | Test endpoint rate-limit window (s) | 60 | No | +| `WEBHOOK_TEST_RATELIMIT_MAX` | Test endpoint rate-limit max / window / IP | 5 | No | ### Running @@ -170,6 +180,119 @@ curl http://localhost:3000/api/v1/prices/XLM/refresh curl http://localhost:3000/health ``` +## Webhooks + +Register endpoints that receive HTTP POST callbacks when SmartDrop indexes farming/pool events. + +### Supported event types + +| Event | Description | +|-------|-------------| +| `pool.created` | A new farming pool was created on-chain | +| `pool.assets_locked` | Assets were locked into a pool | +| `pool.assets_unlocked` | Assets were unlocked from a pool | +| `pool.rewards_distributed` | Pool distributed rewards to participants | +| `pool.closed` | Pool was closed | +| `price.alert` | Existing price-alert event | +| `*` | Wildcard — subscribe to every known event | + +### API + +#### Register a webhook +``` +POST /api/v1/webhooks +Content-Type: application/json + +{ + "url": "https://example.com/webhooks/smartdrop", + "events": ["pool.assets_locked", "pool.rewards_distributed"], + "secret": "whsec_at_least_16_chars", // optional, generated if omitted + "description": "Production webhook" // optional +} +``` + +The response includes the secret in plaintext **exactly once**. Subsequent reads only return `secret_preview`. + +#### Manage webhooks +``` +GET /api/v1/webhooks # list +GET /api/v1/webhooks/:id # fetch one +PATCH /api/v1/webhooks/:id # update url / events / active / description +DELETE /api/v1/webhooks/:id # remove +``` + +#### Test endpoint +``` +POST /api/v1/webhooks/:id/test +``` +Sends a synthetic `pool.assets_locked` payload to the registered URL and returns the resulting delivery summary. Limited to 5 calls/min/IP by default. + +#### Inspect deliveries (admin dashboard feed) +``` +GET /api/v1/webhooks/:id/deliveries?limit=50 +``` +Returns the most recent delivery records: `status` (`success | pending | failed`), `attempts`, `response_status`, `last_error`, `next_retry_at`. + +### Outgoing request shape + +Every delivery is a JSON POST with the following headers: + +| Header | Value | +|--------|-------| +| `Content-Type` | `application/json` | +| `User-Agent` | `SmartDrop-Webhooks/1.0` | +| `X-SmartDrop-Event` | event type (e.g. `pool.assets_locked`) | +| `X-SmartDrop-Delivery` | unique delivery id (`dlv_…`) | +| `X-SmartDrop-Signature` | `sha256=` | + +Body: +```json +{ + "event": "pool.assets_locked", + "event_id": "evt_…", + "occurred_at": "2026-06-25T12:00:00.000Z", + "data": { "...": "event-specific fields" } +} +``` + +### Verifying the signature (Node.js) + +```js +const crypto = require('crypto'); + +function verifySmartDrop(req, secret) { + const provided = req.header('X-SmartDrop-Signature') || ''; + const expected = 'sha256=' + crypto + .createHmac('sha256', secret) + .update(req.rawBody) // verify against the RAW body, not re-stringified JSON + .digest('hex'); + const a = Buffer.from(provided); + const b = Buffer.from(expected); + return a.length === b.length && crypto.timingSafeEqual(a, b); +} +``` + +Express tip: capture the raw body via `express.json({ verify: (req, _res, buf) => { req.rawBody = buf.toString(); } })` so the HMAC matches byte-for-byte. + +### Retry & failure semantics + +- Up to `WEBHOOK_MAX_ATTEMPTS` (default 3) total attempts per event. +- Retries are scheduled in Redis and processed by a background worker, so retries survive process restarts. +- Backoff is exponential: `base * factor^(attempts-1)` (default 30s → 60s → 120s). +- **Retryable**: network errors, HTTP 5xx, 408, 429. +- **Not retried**: HTTP 4xx (except 408/429). These are marked `failed` immediately so a misconfigured consumer cannot be retried into the ground. +- Each delivery is logged in `webhook_deliveries` (Redis-backed today, drop-in PG migration documented in `src/repositories/deliveryRepository.js`). + +### Storage model + +The current implementation stores webhooks and delivery logs in Redis behind a repository abstraction. The repository files document the equivalent PostgreSQL schema verbatim — migrating to PG is a matter of swapping the repository implementation only; no caller code changes. + +### Rate limiting + +- Management endpoints under `/api/v1/webhooks`: 60 req/min/IP (configurable). +- `/test` endpoint: 5 req/min/IP (configurable) — prevents using SmartDrop as an outbound HTTP cannon. +- The limiter fails **open** if Redis is unreachable so a cache outage does not lock you out of management calls. + ## Error Handling The API returns appropriate HTTP status codes: